Posted to commits@flink.apache.org by se...@apache.org on 2015/03/20 11:06:51 UTC

[12/53] [abbrv] flink git commit: [optimizer] Rename optimizer project to "flink-optimizer" (previously flink-compiler)

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/WorksetIterationPlanNode.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/WorksetIterationPlanNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/WorksetIterationPlanNode.java
new file mode 100644
index 0000000..95adace
--- /dev/null
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/WorksetIterationPlanNode.java
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.plan;
+
+import static org.apache.flink.optimizer.plan.PlanNode.SourceAndDamReport.FOUND_SOURCE;
+import static org.apache.flink.optimizer.plan.PlanNode.SourceAndDamReport.FOUND_SOURCE_AND_DAM;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.flink.api.common.operators.util.FieldList;
+import org.apache.flink.api.common.typeutils.TypeComparatorFactory;
+import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
+import org.apache.flink.optimizer.CompilerException;
+import org.apache.flink.optimizer.costs.Costs;
+import org.apache.flink.optimizer.dag.OptimizerNode;
+import org.apache.flink.optimizer.dag.WorksetIterationNode;
+import org.apache.flink.runtime.operators.DriverStrategy;
+import org.apache.flink.util.Visitor;
+
+/**
+ * A node in a candidate execution plan, representing a workset iteration (delta iteration).
+ */
+public class WorksetIterationPlanNode extends DualInputPlanNode implements IterationPlanNode {
+
+	private final SolutionSetPlanNode solutionSetPlanNode;
+	
+	private final WorksetPlanNode worksetPlanNode;
+	
+	private final PlanNode solutionSetDeltaPlanNode;
+	
+	private final PlanNode nextWorkSetPlanNode;
+	
+	private TypeSerializerFactory<?> worksetSerializer;
+	
+	private TypeSerializerFactory<?> solutionSetSerializer;
+	
+	private TypeComparatorFactory<?> solutionSetComparator;
+	
+	private boolean immediateSolutionSetUpdate;
+	
+	public Object postPassHelper;
+	
+	private TypeSerializerFactory<?> serializerForIterationChannel;
+	
+	// --------------------------------------------------------------------------------------------
+
+	public WorksetIterationPlanNode(WorksetIterationNode template, String nodeName, Channel initialSolutionSet, Channel initialWorkset,
+			SolutionSetPlanNode solutionSetPlanNode, WorksetPlanNode worksetPlanNode,
+			PlanNode nextWorkSetPlanNode, PlanNode solutionSetDeltaPlanNode)
+	{
+		super(template, nodeName, initialSolutionSet, initialWorkset, DriverStrategy.BINARY_NO_OP);
+		this.solutionSetPlanNode = solutionSetPlanNode;
+		this.worksetPlanNode = worksetPlanNode;
+		this.solutionSetDeltaPlanNode = solutionSetDeltaPlanNode;
+		this.nextWorkSetPlanNode = nextWorkSetPlanNode;
+		
+		mergeBranchPlanMaps();
+
+	}
+
+	// --------------------------------------------------------------------------------------------
+	
+	public WorksetIterationNode getIterationNode() {
+		if (this.template instanceof WorksetIterationNode) {
+			return (WorksetIterationNode) this.template;
+		} else {
+			throw new RuntimeException("The template node of a workset iteration plan node must be a WorksetIterationNode.");
+		}
+	}
+	
+	public SolutionSetPlanNode getSolutionSetPlanNode() {
+		return this.solutionSetPlanNode;
+	}
+	
+	public WorksetPlanNode getWorksetPlanNode() {
+		return this.worksetPlanNode;
+	}
+	
+	public PlanNode getSolutionSetDeltaPlanNode() {
+		return this.solutionSetDeltaPlanNode;
+	}
+	
+	public PlanNode getNextWorkSetPlanNode() {
+		return this.nextWorkSetPlanNode;
+	}
+	
+	public Channel getInitialSolutionSetInput() {
+		return getInput1();
+	}
+	
+	public Channel getInitialWorksetInput() {
+		return getInput2();
+	}
+	
+	public void setImmediateSolutionSetUpdate(boolean immediateUpdate) {
+		this.immediateSolutionSetUpdate = immediateUpdate;
+	}
+	
+	public boolean isImmediateSolutionSetUpdate() {
+		return this.immediateSolutionSetUpdate;
+	}
+	
+	public FieldList getSolutionSetKeyFields() {
+		return getIterationNode().getSolutionSetKeyFields();
+	}
+	
+	// --------------------------------------------------------------------------------------------
+	
+	public TypeSerializerFactory<?> getWorksetSerializer() {
+		return worksetSerializer;
+	}
+	
+	public void setWorksetSerializer(TypeSerializerFactory<?> worksetSerializer) {
+		this.worksetSerializer = worksetSerializer;
+	}
+	
+	public TypeSerializerFactory<?> getSolutionSetSerializer() {
+		return solutionSetSerializer;
+	}
+	
+	public void setSolutionSetSerializer(TypeSerializerFactory<?> solutionSetSerializer) {
+		this.solutionSetSerializer = solutionSetSerializer;
+	}
+	
+	public TypeComparatorFactory<?> getSolutionSetComparator() {
+		return solutionSetComparator;
+	}
+	
+	public void setSolutionSetComparator(TypeComparatorFactory<?> solutionSetComparator) {
+		this.solutionSetComparator = solutionSetComparator;
+	}
+
+	// --------------------------------------------------------------------------------------------
+	
+	public void setCosts(Costs nodeCosts) {
+		// add the costs from the step function
+		nodeCosts.addCosts(this.solutionSetDeltaPlanNode.getCumulativeCostsShare());
+		nodeCosts.addCosts(this.nextWorkSetPlanNode.getCumulativeCostsShare());
+
+		super.setCosts(nodeCosts);
+	}
+	
+	public int getMemoryConsumerWeight() {
+		// solution set index and workset back channel
+		return 2;
+	}
+	
+	@Override
+	public SourceAndDamReport hasDamOnPathDownTo(PlanNode source) {
+		if (source == this) {
+			return FOUND_SOURCE;
+		}
+		
+		SourceAndDamReport fromOutside = super.hasDamOnPathDownTo(source);
+
+		if (fromOutside == FOUND_SOURCE_AND_DAM) {
+			return FOUND_SOURCE_AND_DAM;
+		}
+		else if (fromOutside == FOUND_SOURCE) {
+			// we always have a dam in the solution set index
+			return FOUND_SOURCE_AND_DAM;
+		} else {
+			SourceAndDamReport fromNextWorkset = nextWorkSetPlanNode.hasDamOnPathDownTo(source);
+
+			if (fromNextWorkset == FOUND_SOURCE_AND_DAM){
+				return FOUND_SOURCE_AND_DAM;
+			} else if (fromNextWorkset == FOUND_SOURCE){
+				return FOUND_SOURCE_AND_DAM;
+			} else {
+				return this.solutionSetDeltaPlanNode.hasDamOnPathDownTo(source);
+			}
+		}
+	}
+
+	@Override
+	public void acceptForStepFunction(Visitor<PlanNode> visitor) {
+		this.solutionSetDeltaPlanNode.accept(visitor);
+		this.nextWorkSetPlanNode.accept(visitor);
+	}
+
+	/**
+	 * Merging can only take place after the solution set delta and next workset plan nodes have been
+	 * set, because they may also contain some of the branching nodes.
+	 */
+	@Override
+	protected void mergeBranchPlanMaps(Map<OptimizerNode, PlanNode> branchPlan1, Map<OptimizerNode,PlanNode> branchPlan2) {}
+
+	
+	protected void mergeBranchPlanMaps() {
+		Map<OptimizerNode, PlanNode> branchPlan1 = input1.getSource().branchPlan;
+		Map<OptimizerNode, PlanNode> branchPlan2 = input2.getSource().branchPlan;
+
+		// merge the branchPlan maps according to the template's open branches
+		if (this.template.hasUnclosedBranches()) {
+			if (this.branchPlan == null) {
+				this.branchPlan = new HashMap<OptimizerNode, PlanNode>(8);
+			}
+
+			for (OptimizerNode.UnclosedBranchDescriptor uc : this.template.getOpenBranches()) {
+				OptimizerNode brancher = uc.getBranchingNode();
+				PlanNode selectedCandidate = null;
+
+				if (branchPlan1 != null) {
+					// predecessor 1 has branching children, see if it got the branch we are looking for
+					selectedCandidate = branchPlan1.get(brancher);
+				}
+
+				if (selectedCandidate == null && branchPlan2 != null) {
+					// predecessor 2 has branching children, see if it got the branch we are looking for
+					selectedCandidate = branchPlan2.get(brancher);
+				}
+
+				if (selectedCandidate == null && getSolutionSetDeltaPlanNode() != null &&
+						getSolutionSetDeltaPlanNode().branchPlan != null) {
+					selectedCandidate = getSolutionSetDeltaPlanNode().branchPlan.get(brancher);
+				}
+
+				if (selectedCandidate == null && getNextWorkSetPlanNode() != null &&
+						getNextWorkSetPlanNode().branchPlan != null) {
+					selectedCandidate = getNextWorkSetPlanNode().branchPlan.get(brancher);
+				}
+
+				if (selectedCandidate == null) {
+					throw new CompilerException(
+							"Candidates for a node with open branches are missing information about the selected candidate ");
+				}
+
+				this.branchPlan.put(brancher, selectedCandidate);
+			}
+		}
+	}
+	
+	// --------------------------------------------------------------------------------------------
+	
+	public TypeSerializerFactory<?> getSerializerForIterationChannel() {
+		return serializerForIterationChannel;
+	}
+	
+	public void setSerializerForIterationChannel(TypeSerializerFactory<?> serializerForIterationChannel) {
+		this.serializerForIterationChannel = serializerForIterationChannel;
+	}
+}
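
For orientation, a minimal sketch of how a caller might walk the step function of such an
iteration node through acceptForStepFunction(). The helper method is hypothetical and assumes
getNodeName() is available on PlanNode; it only illustrates the Visitor contract used above.

    // Hypothetical helper: print every plan node in the iteration's step function.
    static void printStepFunction(WorksetIterationPlanNode iteration) {
        iteration.acceptForStepFunction(new Visitor<PlanNode>() {
            @Override
            public boolean preVisit(PlanNode node) {
                System.out.println("step function node: " + node.getNodeName());
                return true; // returning true descends into the node's inputs
            }

            @Override
            public void postVisit(PlanNode node) {
                // nothing to do on the way back up
            }
        });
    }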

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/WorksetPlanNode.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/WorksetPlanNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/WorksetPlanNode.java
new file mode 100644
index 0000000..8d044d6
--- /dev/null
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/WorksetPlanNode.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.optimizer.plan;
+
+import static org.apache.flink.optimizer.plan.PlanNode.SourceAndDamReport.FOUND_SOURCE;
+import static org.apache.flink.optimizer.plan.PlanNode.SourceAndDamReport.FOUND_SOURCE_AND_DAM;
+import static org.apache.flink.optimizer.plan.PlanNode.SourceAndDamReport.NOT_FOUND;
+
+import java.util.Collections;
+import java.util.HashMap;
+
+import org.apache.flink.optimizer.costs.Costs;
+import org.apache.flink.optimizer.dag.OptimizerNode;
+import org.apache.flink.optimizer.dag.WorksetNode;
+import org.apache.flink.optimizer.dataproperties.GlobalProperties;
+import org.apache.flink.optimizer.dataproperties.LocalProperties;
+import org.apache.flink.runtime.operators.DamBehavior;
+import org.apache.flink.runtime.operators.DriverStrategy;
+import org.apache.flink.util.Visitor;
+
+/**
+ * Plan candidate node for the workset of a workset iteration (delta iteration).
+ */
+public class WorksetPlanNode extends PlanNode {
+	
+	private static final Costs NO_COSTS = new Costs();
+	
+	private WorksetIterationPlanNode containingIterationNode;
+	
+	private final Channel initialInput;
+	
+	public Object postPassHelper;
+	
+	
+	public WorksetPlanNode(WorksetNode template, String nodeName,
+			GlobalProperties gProps, LocalProperties lProps,
+			Channel initialInput)
+	{
+		super(template, nodeName, DriverStrategy.NONE);
+		
+		this.globalProps = gProps;
+		this.localProps = lProps;
+		this.initialInput = initialInput;
+		
+		// the node incurs no cost
+		this.nodeCosts = NO_COSTS;
+		this.cumulativeCosts = NO_COSTS;
+		
+		if (initialInput.getSource().branchPlan != null && initialInput.getSource().branchPlan.size() > 0) {
+			if (this.branchPlan == null) {
+				this.branchPlan = new HashMap<OptimizerNode, PlanNode>();
+			}
+			
+			this.branchPlan.putAll(initialInput.getSource().branchPlan);
+		}
+	}
+	
+	// --------------------------------------------------------------------------------------------
+	
+	public WorksetNode getWorksetNode() {
+		return (WorksetNode) this.template;
+	}
+	
+	public WorksetIterationPlanNode getContainingIterationNode() {
+		return this.containingIterationNode;
+	}
+	
+	public void setContainingIterationNode(WorksetIterationPlanNode containingIterationNode) {
+		this.containingIterationNode = containingIterationNode;
+	}
+
+	// --------------------------------------------------------------------------------------------
+	
+
+	@Override
+	public void accept(Visitor<PlanNode> visitor) {
+		if (visitor.preVisit(this)) {
+			visitor.postVisit(this);
+		}
+	}
+
+
+	@Override
+	public Iterable<PlanNode> getPredecessors() {
+		return Collections.<PlanNode>emptyList();
+	}
+
+
+	@Override
+	public Iterable<Channel> getInputs() {
+		return Collections.<Channel>emptyList();
+	}
+
+
+	@Override
+	public SourceAndDamReport hasDamOnPathDownTo(PlanNode source) {
+		if (source == this) {
+			return FOUND_SOURCE;
+		}
+		SourceAndDamReport res = this.initialInput.getSource().hasDamOnPathDownTo(source);
+		if (res == FOUND_SOURCE_AND_DAM) {
+			return FOUND_SOURCE_AND_DAM;
+		}
+		else if (res == FOUND_SOURCE) {
+			return (this.initialInput.getLocalStrategy().dams() || 
+					this.initialInput.getTempMode().breaksPipeline() ||
+					getDriverStrategy().firstDam() == DamBehavior.FULL_DAM) ?
+				FOUND_SOURCE_AND_DAM : FOUND_SOURCE;
+		}
+		else {
+			return NOT_FOUND;
+		}
+	}
+}
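
A note on the SourceAndDamReport values used above: the optimizer relies on this report when it
needs to know whether a path between two candidate nodes already contains a pipeline breaker. A
hedged sketch of how a caller might interpret the result (the variables 'candidate' and 'source'
are illustrative):

    // Illustrative only: is 'source' an input of 'candidate', and is there a dam on the way?
    PlanNode.SourceAndDamReport report = candidate.hasDamOnPathDownTo(source);
    if (report == PlanNode.SourceAndDamReport.NOT_FOUND) {
        // 'source' does not feed into 'candidate' at all
    } else if (report == PlanNode.SourceAndDamReport.FOUND_SOURCE) {
        // reachable, but the path is fully pipelined
    } else if (report == PlanNode.SourceAndDamReport.FOUND_SOURCE_AND_DAM) {
        // reachable, and a dam (e.g. a sort or a blocking temp mode) breaks the pipeline
    }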

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plandump/DumpableConnection.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plandump/DumpableConnection.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plandump/DumpableConnection.java
new file mode 100644
index 0000000..3f8cb46
--- /dev/null
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plandump/DumpableConnection.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.plandump;
+
+import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
+
+
+/**
+ * A connection between two {@link DumpableNode}s, as seen by the plan dump utilities.
+ */
+public interface DumpableConnection<T extends DumpableNode<T>> {
+
+	public DumpableNode<T> getSource();
+	
+	public ShipStrategyType getShipStrategy();
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plandump/DumpableNode.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plandump/DumpableNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plandump/DumpableNode.java
new file mode 100644
index 0000000..1bc0f0c
--- /dev/null
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plandump/DumpableNode.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.plandump;
+
+import org.apache.flink.optimizer.dag.OptimizerNode;
+import org.apache.flink.optimizer.plan.PlanNode;
+
+/**
+ * A node in a graph that can be dumped by the plan dump utilities. Both the optimizer's DAG
+ * nodes and the candidate plan nodes implement this interface, so the dump code can traverse
+ * either representation uniformly.
+ */
+public interface DumpableNode<T extends DumpableNode<T>> {
+	
+	/**
+	 * Gets an iterator over the predecessors.
+	 * 
+	 * @return An iterator over the predecessors.
+	 */
+	Iterable<T> getPredecessors();
+	
+	Iterable<DumpableConnection<T>> getDumpableInputs();
+	
+	OptimizerNode getOptimizerNode();
+	
+	PlanNode getPlanNode();
+}
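
Together with DumpableConnection above, this interface is what lets the JSON dump generator in
the next file traverse both the optimizer DAG (DataSinkNode and friends) and the selected plan
(SinkPlanNode and friends) with the same code. A hedged sketch of such a generic walk, roughly
mirroring the generator's visit() method (names are illustrative):

    // Illustrative generic pre-order walk over a dumpable graph; a java.util.Set guards
    // against visiting a node twice, since the graph is a DAG rather than a tree.
    static <T extends DumpableNode<T>> void walk(DumpableNode<T> node, java.util.Set<DumpableNode<T>> seen) {
        if (!seen.add(node)) {
            return; // already visited
        }
        for (T predecessor : node.getPredecessors()) {
            walk(predecessor, seen);
        }
        for (DumpableConnection<T> input : node.getDumpableInputs()) {
            System.out.println("input from " + input.getSource() + " via " + input.getShipStrategy());
        }
    }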

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plandump/PlanJSONDumpGenerator.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plandump/PlanJSONDumpGenerator.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plandump/PlanJSONDumpGenerator.java
new file mode 100644
index 0000000..6f918c0
--- /dev/null
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plandump/PlanJSONDumpGenerator.java
@@ -0,0 +1,657 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.plandump;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+
+import org.apache.commons.lang3.StringEscapeUtils;
+import org.apache.flink.api.common.operators.CompilerHints;
+import org.apache.flink.optimizer.CompilerException;
+import org.apache.flink.optimizer.dag.BinaryUnionNode;
+import org.apache.flink.optimizer.dag.BulkIterationNode;
+import org.apache.flink.optimizer.dag.DataSinkNode;
+import org.apache.flink.optimizer.dag.DataSourceNode;
+import org.apache.flink.optimizer.dag.OptimizerNode;
+import org.apache.flink.optimizer.dag.DagConnection;
+import org.apache.flink.optimizer.dag.TempMode;
+import org.apache.flink.optimizer.dag.WorksetIterationNode;
+import org.apache.flink.optimizer.dataproperties.GlobalProperties;
+import org.apache.flink.optimizer.dataproperties.LocalProperties;
+import org.apache.flink.optimizer.plan.BulkIterationPlanNode;
+import org.apache.flink.optimizer.plan.Channel;
+import org.apache.flink.optimizer.plan.OptimizedPlan;
+import org.apache.flink.optimizer.plan.PlanNode;
+import org.apache.flink.optimizer.plan.SingleInputPlanNode;
+import org.apache.flink.optimizer.plan.SinkPlanNode;
+import org.apache.flink.optimizer.plan.WorksetIterationPlanNode;
+import org.apache.flink.optimizer.util.Utils;
+import org.apache.flink.runtime.operators.DriverStrategy;
+import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
+import org.apache.flink.util.StringUtils;
+
+/**
+ * Utility that renders a plan, either the pre-optimization DAG (given by its data sinks) or an
+ * {@link OptimizedPlan}, as JSON, e.g. for plan visualization.
+ */
+public class PlanJSONDumpGenerator {
+	
+	private Map<DumpableNode<?>, Integer> nodeIds; // resolves pact nodes to ids
+
+	private int nodeCnt;
+	
+	private boolean encodeForHTML;
+
+	// --------------------------------------------------------------------------------------------
+	
+	public void setEncodeForHTML(boolean encodeForHTML) {
+		this.encodeForHTML = encodeForHTML;
+	}
+	
+	public boolean isEncodeForHTML() {
+		return encodeForHTML;
+	}
+	
+	
+	public void dumpPactPlanAsJSON(List<DataSinkNode> nodes, PrintWriter writer) {
+		@SuppressWarnings("unchecked")
+		List<DumpableNode<?>> n = (List<DumpableNode<?>>) (List<?>) nodes;
+		compilePlanToJSON(n, writer);
+	}
+	
+	public String getPactPlanAsJSON(List<DataSinkNode> nodes) {
+		StringWriter sw = new StringWriter();
+		PrintWriter pw = new PrintWriter(sw);
+		dumpPactPlanAsJSON(nodes, pw);
+		return sw.toString();
+	}
+	
+	public void dumpOptimizerPlanAsJSON(OptimizedPlan plan, File toFile) throws IOException {
+		PrintWriter pw = null;
+		try {
+			pw = new PrintWriter(new FileOutputStream(toFile), false);
+			dumpOptimizerPlanAsJSON(plan, pw);
+			pw.flush();
+		} finally {
+			if (pw != null) {
+				pw.close();
+			}
+		}
+	}
+	
+	public String getOptimizerPlanAsJSON(OptimizedPlan plan) {
+		StringWriter sw = new StringWriter();
+		PrintWriter pw = new PrintWriter(sw);
+		dumpOptimizerPlanAsJSON(plan, pw);
+		pw.close();
+		return sw.toString();
+	}
+	
+	public void dumpOptimizerPlanAsJSON(OptimizedPlan plan, PrintWriter writer) {
+		Collection<SinkPlanNode> sinks = plan.getDataSinks();
+		if (sinks instanceof List) {
+			dumpOptimizerPlanAsJSON((List<SinkPlanNode>) sinks, writer);
+		} else {
+			List<SinkPlanNode> n = new ArrayList<SinkPlanNode>();
+			n.addAll(sinks);
+			dumpOptimizerPlanAsJSON(n, writer);
+		}
+	}
+	
+	public void dumpOptimizerPlanAsJSON(List<SinkPlanNode> nodes, PrintWriter writer) {
+		@SuppressWarnings("unchecked")
+		List<DumpableNode<?>> n = (List<DumpableNode<?>>) (List<?>) nodes;
+		compilePlanToJSON(n, writer);
+	}
+	
+	// --------------------------------------------------------------------------------------------
+	
+	private void compilePlanToJSON(List<DumpableNode<?>> nodes, PrintWriter writer) {
+		// initialization to assign node ids
+		this.nodeIds = new HashMap<DumpableNode<?>, Integer>();
+		this.nodeCnt = 0;
+		
+		// JSON header
+		writer.print("{\n\t\"nodes\": [\n\n");
+
+		// Generate JSON for plan
+		for (int i = 0; i < nodes.size(); i++) {
+			visit(nodes.get(i), writer, i == 0);
+		}
+		
+		// JSON Footer
+		writer.println("\n\t]\n}");
+	}
+
+	private boolean visit(DumpableNode<?> node, PrintWriter writer, boolean first) {
+		// check for duplicate traversal
+		if (this.nodeIds.containsKey(node)) {
+			return false;
+		}
+		
+		// assign an id first
+		this.nodeIds.put(node, this.nodeCnt++);
+		
+		// then recurse
+		for (DumpableNode<?> child : node.getPredecessors()) {
+			// important: if the child was already visited, 'first' must not be reset to false
+			if (visit(child, writer, first)) {
+				first = false;
+			}
+		}
+		
+		// get the optimizer node backing this dumpable node
+		final OptimizerNode n = node.getOptimizerNode();
+		
+		// ------------------ dump after the ascend ---------------------
+		// start a new node and output node id
+		if (!first) {
+			writer.print(",\n");	
+		}
+		// open the node
+		writer.print("\t{\n");
+		
+		// recurse if it is an iteration node
+		if (node instanceof BulkIterationNode || node instanceof BulkIterationPlanNode) {
+			
+			DumpableNode<?> innerChild = node instanceof BulkIterationNode ?
+					((BulkIterationNode) node).getNextPartialSolution() :
+					((BulkIterationPlanNode) node).getRootOfStepFunction();
+					
+			DumpableNode<?> begin = node instanceof BulkIterationNode ?
+				((BulkIterationNode) node).getPartialSolution() :
+				((BulkIterationPlanNode) node).getPartialSolutionPlanNode();
+			
+			writer.print("\t\t\"step_function\": [\n");
+			
+			visit(innerChild, writer, true);
+			
+			writer.print("\n\t\t],\n");
+			writer.print("\t\t\"partial_solution\": " + this.nodeIds.get(begin) + ",\n");
+			writer.print("\t\t\"next_partial_solution\": " + this.nodeIds.get(innerChild) + ",\n");
+		} else if (node instanceof WorksetIterationNode || node instanceof WorksetIterationPlanNode) {
+			
+			DumpableNode<?> worksetRoot = node instanceof WorksetIterationNode ?
+					((WorksetIterationNode) node).getNextWorkset() :
+					((WorksetIterationPlanNode) node).getNextWorkSetPlanNode();
+			DumpableNode<?> solutionDelta = node instanceof WorksetIterationNode ?
+					((WorksetIterationNode) node).getSolutionSetDelta() :
+					((WorksetIterationPlanNode) node).getSolutionSetDeltaPlanNode();
+					
+			DumpableNode<?> workset = node instanceof WorksetIterationNode ?
+						((WorksetIterationNode) node).getWorksetNode() :
+						((WorksetIterationPlanNode) node).getWorksetPlanNode();
+			DumpableNode<?> solutionSet = node instanceof WorksetIterationNode ?
+						((WorksetIterationNode) node).getSolutionSetNode() :
+						((WorksetIterationPlanNode) node).getSolutionSetPlanNode();
+			
+			writer.print("\t\t\"step_function\": [\n");
+			
+			visit(worksetRoot, writer, true);
+			visit(solutionDelta, writer, false);
+			
+			writer.print("\n\t\t],\n");
+			writer.print("\t\t\"workset\": " + this.nodeIds.get(workset) + ",\n");
+			writer.print("\t\t\"solution_set\": " + this.nodeIds.get(solutionSet) + ",\n");
+			writer.print("\t\t\"next_workset\": " + this.nodeIds.get(worksetRoot) + ",\n");
+			writer.print("\t\t\"solution_delta\": " + this.nodeIds.get(solutionDelta) + ",\n");
+		}
+		
+		// print the id
+		writer.print("\t\t\"id\": " + this.nodeIds.get(node));
+
+		
+		final String type;
+		String contents;
+		if (n instanceof DataSinkNode) {
+			type = "sink";
+			contents = n.getOperator().toString();
+		} else if (n instanceof DataSourceNode) {
+			type = "source";
+			contents = n.getOperator().toString();
+		}
+		else if (n instanceof BulkIterationNode) {
+			type = "bulk_iteration";
+			contents = n.getOperator().getName();
+		}
+		else if (n instanceof WorksetIterationNode) {
+			type = "workset_iteration";
+			contents = n.getOperator().getName();
+		}
+		else if (n instanceof BinaryUnionNode) {
+			type = "pact";
+			contents = "";
+		}
+		else {
+			type = "pact";
+			contents = n.getOperator().getName();
+		}
+		
+		contents = StringUtils.showControlCharacters(contents);
+		if (encodeForHTML) {
+			contents = StringEscapeUtils.escapeHtml4(contents);
+			contents = contents.replace("\\", "&#92;");
+		}
+		
+		
+		String name = n.getName();
+		if (name.equals("Reduce") && (node instanceof SingleInputPlanNode) && 
+				((SingleInputPlanNode) node).getDriverStrategy() == DriverStrategy.SORTED_GROUP_COMBINE) {
+			name = "Combine";
+		}
+		
+		// output the type identifier
+		writer.print(",\n\t\t\"type\": \"" + type + "\"");
+		
+		// output node name
+		writer.print(",\n\t\t\"pact\": \"" + name + "\"");
+		
+		// output node contents
+		writer.print(",\n\t\t\"contents\": \"" + contents + "\"");
+
+		// degree of parallelism
+		writer.print(",\n\t\t\"parallelism\": \""
+			+ (n.getParallelism() >= 1 ? n.getParallelism() : "default") + "\"");
+		
+		// output node predecessors
+		Iterator<? extends DumpableConnection<?>> inConns = node.getDumpableInputs().iterator();
+		String child1name = "", child2name = "";
+
+		if (inConns != null && inConns.hasNext()) {
+			// start predecessor list
+			writer.print(",\n\t\t\"predecessors\": [");
+			int inputNum = 0;
+			
+			while (inConns.hasNext()) {
+				final DumpableConnection<?> inConn = inConns.next();
+				final DumpableNode<?> source = inConn.getSource();
+				writer.print(inputNum == 0 ? "\n" : ",\n");
+				if (inputNum == 0) {
+					child1name += child1name.length() > 0 ? ", " : ""; 
+					child1name += source.getOptimizerNode().getOperator().getName();
+				} else if (inputNum == 1) {
+					child2name += child2name.length() > 0 ? ", " : ""; 
+					child2name += source.getOptimizerNode().getOperator().getName();
+				}
+
+				// output predecessor id
+				writer.print("\t\t\t{\"id\": " + this.nodeIds.get(source));
+
+				// output connection side
+				if (inConns.hasNext() || inputNum > 0) {
+					writer.print(", \"side\": \"" + (inputNum == 0 ? "first" : "second") + "\"");
+				}
+				// output shipping strategy and channel type
+				final Channel channel = (inConn instanceof Channel) ? (Channel) inConn : null; 
+				final ShipStrategyType shipType = channel != null ? channel.getShipStrategy() :
+						((DagConnection) inConn).getShipStrategy();
+					
+				String shipStrategy = null;
+				if (shipType != null) {
+					switch (shipType) {
+					case NONE:
+						// nothing
+						break;
+					case FORWARD:
+						shipStrategy = "Forward";
+						break;
+					case BROADCAST:
+						shipStrategy = "Broadcast";
+						break;
+					case PARTITION_HASH:
+						shipStrategy = "Hash Partition";
+						break;
+					case PARTITION_RANGE:
+						shipStrategy = "Range Partition";
+						break;
+					case PARTITION_RANDOM:
+						shipStrategy = "Redistribute";
+						break;
+					case PARTITION_FORCED_REBALANCE:
+						shipStrategy = "Rebalance";
+						break;
+					case PARTITION_CUSTOM:
+						shipStrategy = "Custom Partition";
+						break;
+					default:
+						throw new CompilerException("Unknown ship strategy '" + inConn.getShipStrategy().name()
+							+ "' in JSON generator.");
+					}
+				}
+				
+				if (channel != null && channel.getShipStrategyKeys() != null && channel.getShipStrategyKeys().size() > 0) {
+					shipStrategy += " on " + (channel.getShipStrategySortOrder() == null ?
+							channel.getShipStrategyKeys().toString() :
+							Utils.createOrdering(channel.getShipStrategyKeys(), channel.getShipStrategySortOrder()).toString());
+				}
+
+				if (shipStrategy != null) {
+					writer.print(", \"ship_strategy\": \"" + shipStrategy + "\"");
+				}
+				
+				if (channel != null) {
+					String localStrategy = null;
+					switch (channel.getLocalStrategy()) {
+					case NONE:
+						break;
+					case SORT:
+						localStrategy = "Sort";
+						break;
+					case COMBININGSORT:
+						localStrategy = "Sort (combining)";
+						break;
+					default:
+						throw new CompilerException("Unknown local strategy " + channel.getLocalStrategy().name());
+					}
+					
+					if (channel != null && channel.getLocalStrategyKeys() != null && channel.getLocalStrategyKeys().size() > 0) {
+						localStrategy += " on " + (channel.getLocalStrategySortOrder() == null ?
+								channel.getLocalStrategyKeys().toString() :
+								Utils.createOrdering(channel.getLocalStrategyKeys(), channel.getLocalStrategySortOrder()).toString());
+					}
+					
+					if (localStrategy != null) {
+						writer.print(", \"local_strategy\": \"" + localStrategy + "\"");
+					}
+					
+					if (channel != null && channel.getTempMode() != TempMode.NONE) {
+						String tempMode = channel.getTempMode().toString();
+						writer.print(", \"temp_mode\": \"" + tempMode + "\"");
+					}
+				}
+				
+				writer.print('}');
+				inputNum++;
+			}
+			// finish predecessors
+			writer.print("\n\t\t]");
+		}
+		
+		//---------------------------------------------------------------------------------------
+		// the part below here is relevant only to plan nodes with concrete strategies, etc
+		//---------------------------------------------------------------------------------------
+
+		final PlanNode p = node.getPlanNode();
+		if (p == null) {
+			// finish node
+			writer.print("\n\t}");
+			return true;
+		}
+		// local strategy
+		String locString = null;
+		if (p.getDriverStrategy() != null) {
+			switch (p.getDriverStrategy()) {
+			case NONE:
+			case BINARY_NO_OP:
+				break;
+				
+			case UNARY_NO_OP:
+				locString = "No-Op";
+				break;
+				
+			case COLLECTOR_MAP:
+			case MAP:
+				locString = "Map";
+				break;
+				
+			case FLAT_MAP:
+				locString = "FlatMap";
+				break;
+				
+			case MAP_PARTITION:
+				locString = "Map Partition";
+				break;
+			
+			case ALL_REDUCE:
+				locString = "Reduce All";
+				break;
+			
+			case ALL_GROUP_REDUCE:
+			case ALL_GROUP_REDUCE_COMBINE:
+				locString = "Group Reduce All";
+				break;
+				
+			case SORTED_REDUCE:
+				locString = "Sorted Reduce";
+				break;
+				
+			case SORTED_PARTIAL_REDUCE:
+				locString = "Sorted Combine/Reduce";
+				break;
+
+			case SORTED_GROUP_REDUCE:
+				locString = "Sorted Group Reduce";
+				break;
+				
+			case SORTED_GROUP_COMBINE:
+				locString = "Sorted Combine";
+				break;
+
+			case HYBRIDHASH_BUILD_FIRST:
+				locString = "Hybrid Hash (build: " + child1name + ")";
+				break;
+			case HYBRIDHASH_BUILD_SECOND:
+				locString = "Hybrid Hash (build: " + child2name + ")";
+				break;
+				
+			case HYBRIDHASH_BUILD_FIRST_CACHED:
+				locString = "Hybrid Hash (CACHED) (build: " + child1name + ")";
+				break;
+			case HYBRIDHASH_BUILD_SECOND_CACHED:
+				locString = "Hybrid Hash (CACHED) (build: " + child2name + ")";
+				break;
+
+			case NESTEDLOOP_BLOCKED_OUTER_FIRST:
+				locString = "Nested Loops (Blocked Outer: " + child1name + ")";
+				break;
+			case NESTEDLOOP_BLOCKED_OUTER_SECOND:
+				locString = "Nested Loops (Blocked Outer: " + child2name + ")";
+				break;
+			case NESTEDLOOP_STREAMED_OUTER_FIRST:
+				locString = "Nested Loops (Streamed Outer: " + child1name + ")";
+				break;
+			case NESTEDLOOP_STREAMED_OUTER_SECOND:
+				locString = "Nested Loops (Streamed Outer: " + child2name + ")";
+				break;
+
+			case MERGE:
+				locString = "Merge";
+				break;
+
+			case CO_GROUP:
+				locString = "Co-Group";
+				break;
+
+			default:
+				locString = p.getDriverStrategy().name();
+				break;
+			}
+
+			if (locString != null) {
+				writer.print(",\n\t\t\"driver_strategy\": \"");
+				writer.print(locString);
+				writer.print("\"");
+			}
+		}
+		
+		{
+			// output node global properties
+			final GlobalProperties gp = p.getGlobalProperties();
+
+			writer.print(",\n\t\t\"global_properties\": [\n");
+
+			addProperty(writer, "Partitioning", gp.getPartitioning().name(), true);
+			if (gp.getPartitioningFields() != null) {
+				addProperty(writer, "Partitioned on", gp.getPartitioningFields().toString(), false);
+			}
+			if (gp.getPartitioningOrdering() != null) {
+				addProperty(writer, "Partitioning Order", gp.getPartitioningOrdering().toString(), false);	
+			}
+			else {
+				addProperty(writer, "Partitioning Order", "(none)", false);
+			}
+			if (n.getUniqueFields() == null || n.getUniqueFields().size() == 0) {
+				addProperty(writer, "Uniqueness", "not unique", false);
+			}
+			else {
+				addProperty(writer, "Uniqueness", n.getUniqueFields().toString(), false);	
+			}
+
+			writer.print("\n\t\t]");
+		}
+
+		{
+			// output node local properties
+			LocalProperties lp = p.getLocalProperties();
+
+			writer.print(",\n\t\t\"local_properties\": [\n");
+
+			if (lp.getOrdering() != null) {
+				addProperty(writer, "Order", lp.getOrdering().toString(), true);	
+			}
+			else {
+				addProperty(writer, "Order", "(none)", true);
+			}
+			if (lp.getGroupedFields() != null && lp.getGroupedFields().size() > 0) {
+				addProperty(writer, "Grouped on", lp.getGroupedFields().toString(), false);
+			} else {
+				addProperty(writer, "Grouping", "not grouped", false);	
+			}
+			if (n.getUniqueFields() == null || n.getUniqueFields().size() == 0) {
+				addProperty(writer, "Uniqueness", "not unique", false);
+			}
+			else {
+				addProperty(writer, "Uniqueness", n.getUniqueFields().toString(), false);	
+			}
+
+			writer.print("\n\t\t]");
+		}
+
+		// output node size estimates
+		writer.print(",\n\t\t\"estimates\": [\n");
+
+		addProperty(writer, "Est. Output Size", n.getEstimatedOutputSize() == -1 ? "(unknown)"
+			: formatNumber(n.getEstimatedOutputSize(), "B"), true);
+		addProperty(writer, "Est. Cardinality", n.getEstimatedNumRecords() == -1 ? "(unknown)"
+			: formatNumber(n.getEstimatedNumRecords()), false);
+
+		writer.print("\t\t]");
+
+		// output node cost
+		if (p.getNodeCosts() != null) {
+			writer.print(",\n\t\t\"costs\": [\n");
+
+			addProperty(writer, "Network", p.getNodeCosts().getNetworkCost() == -1 ? "(unknown)"
+				: formatNumber(p.getNodeCosts().getNetworkCost(), "B"), true);
+			addProperty(writer, "Disk I/O", p.getNodeCosts().getDiskCost() == -1 ? "(unknown)"
+				: formatNumber(p.getNodeCosts().getDiskCost(), "B"), false);
+			addProperty(writer, "CPU", p.getNodeCosts().getCpuCost() == -1 ? "(unknown)"
+				: formatNumber(p.getNodeCosts().getCpuCost(), ""), false);
+
+			addProperty(writer, "Cumulative Network",
+				p.getCumulativeCosts().getNetworkCost() == -1 ? "(unknown)" : formatNumber(p
+					.getCumulativeCosts().getNetworkCost(), "B"), false);
+			addProperty(writer, "Cumulative Disk I/O",
+				p.getCumulativeCosts().getDiskCost() == -1 ? "(unknown)" : formatNumber(p
+					.getCumulativeCosts().getDiskCost(), "B"), false);
+			addProperty(writer, "Cumulative CPU",
+				p.getCumulativeCosts().getCpuCost() == -1 ? "(unknown)" : formatNumber(p
+					.getCumulativeCosts().getCpuCost(), ""), false);
+
+			writer.print("\n\t\t]");
+		}
+
+		// output the node compiler hints
+		if (n.getOperator().getCompilerHints() != null) {
+			CompilerHints hints = n.getOperator().getCompilerHints();
+			CompilerHints defaults = new CompilerHints();
+
+			String size = hints.getOutputSize() == defaults.getOutputSize() ? "(none)" : String.valueOf(hints.getOutputSize());
+			String card = hints.getOutputCardinality() == defaults.getOutputCardinality() ? "(none)" : String.valueOf(hints.getOutputCardinality());
+			String width = hints.getAvgOutputRecordSize() == defaults.getAvgOutputRecordSize() ? "(none)" : String.valueOf(hints.getAvgOutputRecordSize());
+			String filter = hints.getFilterFactor() == defaults.getFilterFactor() ? "(none)" : String.valueOf(hints.getFilterFactor());
+			
+			writer.print(",\n\t\t\"compiler_hints\": [\n");
+
+			addProperty(writer, "Output Size (bytes)", size, true);
+			addProperty(writer, "Output Cardinality", card, false);
+			addProperty(writer, "Avg. Output Record Size (bytes)", width, false);
+			addProperty(writer, "Filter Factor", filter, false);
+
+			writer.print("\t\t]");
+		}
+
+		// finish node
+		writer.print("\n\t}");
+		return true;
+	}
+
+	private void addProperty(PrintWriter writer, String name, String value, boolean first) {
+		if (!first) {
+			writer.print(",\n");
+		}
+		writer.print("\t\t\t{ \"name\": \"");
+		writer.print(name);
+		writer.print("\", \"value\": \"");
+		writer.print(value);
+		writer.print("\" }");
+	}
+
+	public static final String formatNumber(double number) {
+		return formatNumber(number, "");
+	}
+
+	public static final String formatNumber(double number, String suffix) {
+		if (number <= 0.0) {
+			return String.valueOf(number);
+		}
+
+		int power = (int) Math.ceil(Math.log10(number));
+
+		int group = (power - 1) / 3;
+		if (group >= SIZE_SUFFIXES.length) {
+			group = SIZE_SUFFIXES.length - 1;
+		} else if (group < 0) {
+			group = 0;
+		}
+
+		// truncate fractional part
+		int beforeDecimal = power - group * 3;
+		if (power > beforeDecimal) {
+			for (int i = power - beforeDecimal; i > 0; i--) {
+				number /= 10;
+			}
+		}
+		
+		return group > 0 ? String.format(Locale.US, "%.2f %s", number, SIZE_SUFFIXES[group]) :
+			String.format(Locale.US, "%.2f", number);
+	}
+
+	private static final char[] SIZE_SUFFIXES = { 0, 'K', 'M', 'G', 'T' };
+}
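
For context, a minimal usage sketch of this generator. It only calls methods defined above; the
OptimizedPlan instance is assumed to come from the optimizer's compilation step.

    // Illustrative: render an optimized plan as JSON, HTML-escaped for embedding in a web page.
    PlanJSONDumpGenerator dumper = new PlanJSONDumpGenerator();
    dumper.setEncodeForHTML(true);
    String json = dumper.getOptimizerPlanAsJSON(optimizedPlan);

    // Or write it straight to a file (throws IOException):
    dumper.dumpOptimizerPlanAsJSON(optimizedPlan, new java.io.File("plan.json"));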