You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by an...@apache.org on 2013/11/26 21:17:43 UTC
svn commit: r1545813 - in /pig/trunk: CHANGES.txt
src/org/apache/pig/newplan/logical/optimizer/LogicalPlanPrinter.java
Author: aniket486
Date: Tue Nov 26 20:17:43 2013
New Revision: 1545813
URL: http://svn.apache.org/r1545813
Log:
PIG-3567: LogicalPlanPrinter throws OOM for large scripts (aniket486)
Modified:
pig/trunk/CHANGES.txt
pig/trunk/src/org/apache/pig/newplan/logical/optimizer/LogicalPlanPrinter.java
Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1545813&r1=1545812&r2=1545813&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Tue Nov 26 20:17:43 2013
@@ -119,6 +119,8 @@ PIG-3480: TFile-based tmpfile compressio
BUG FIXES
+PIG-3567: LogicalPlanPrinter throws OOM for large scripts (aniket486)
+
PIG-3579: pig.script's deserialized version does not maintain line numbers (jgzhang via aniket486)
PIG-3570: Rollback PIG-3060 (daijy)
Modified: pig/trunk/src/org/apache/pig/newplan/logical/optimizer/LogicalPlanPrinter.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/optimizer/LogicalPlanPrinter.java?rev=1545813&r1=1545812&r2=1545813&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/optimizer/LogicalPlanPrinter.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/optimizer/LogicalPlanPrinter.java Tue Nov 26 20:17:43 2013
@@ -17,7 +17,6 @@
*/
package org.apache.pig.newplan.logical.optimizer;
-import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.PrintStream;
import java.util.ArrayList;
@@ -47,138 +46,121 @@ import org.apache.pig.newplan.logical.re
public class LogicalPlanPrinter extends PlanVisitor {
private PrintStream mStream = null;
- private String TAB1 = " ";
- private String TABMore = "| ";
- private String LSep = "|\n|---";
- private String USep = "| |\n| ";
+ private byte[] TAB1 = " ".getBytes();
+ private byte[] TABMore = "| ".getBytes();
+ private byte[] Bar = "|\n".getBytes();
+ private byte[] LSep = "|---".getBytes();
+ private byte[] USep = "| |\n".getBytes();
static public String SEPERATE = "\t";
+ protected ArrayList<byte[]> tabs;
+ protected boolean reverse = false;
+
/**
* @param ps PrintStream to output plan information to
* @param plan Logical plan to print
*/
public LogicalPlanPrinter(OperatorPlan plan, PrintStream ps) throws FrontendException {
+ this(plan, ps, new ArrayList<byte[]>());
+ }
+
+ private LogicalPlanPrinter(OperatorPlan plan, PrintStream ps, ArrayList<byte[]> tabs) throws FrontendException {
super(plan, null);
mStream = ps;
+ this.tabs = tabs;
+ if (plan instanceof LogicalPlan) {
+ reverse = false;
+ }
+ else {
+ reverse = true;
+ }
}
@Override
public void visit() throws FrontendException {
try {
- if (plan instanceof LogicalPlan) {
- mStream.write(depthFirstLP().getBytes());
- }
- else {
- mStream.write(reverseDepthFirstLP().getBytes());
- }
+ depthFirstLP();
} catch (IOException e) {
throw new FrontendException(e);
}
}
- protected String depthFirstLP() throws FrontendException, IOException {
- StringBuilder sb = new StringBuilder();
- List<Operator> leaves = plan.getSinks();
+ protected void depthFirstLP() throws FrontendException, IOException {
+ List<Operator> leaves;
+ if(reverse) {
+ leaves = plan.getSources();
+ } else {
+ leaves = plan.getSinks();
+ }
for (Operator leaf : leaves) {
- sb.append(depthFirst(leaf));
- sb.append("\n");
+ writeWithTabs((leaf.toString()+"\n").getBytes());
+ depthFirst(leaf);
}
- return sb.toString();
}
-
- private String depthFirst(Operator node) throws FrontendException, IOException {
- String nodeString = printNode(node);
-
- List<Operator> originalPredecessors = plan.getPredecessors(node);
- if (originalPredecessors == null)
- return nodeString;
-
- StringBuffer sb = new StringBuffer(nodeString);
- List<Operator> predecessors = new ArrayList<Operator>(originalPredecessors);
-
- int i = 0;
- for (Operator pred : predecessors) {
- i++;
- String DFStr = depthFirst(pred);
- if (DFStr != null) {
- sb.append(LSep);
- if (i < predecessors.size())
- sb.append(shiftStringByTabs(DFStr, 2));
- else
- sb.append(shiftStringByTabs(DFStr, 1));
- }
+
+ private void writeWithTabs(byte[] data) throws IOException {
+ for(byte[] tab : tabs) {
+ mStream.write(tab);
}
- return sb.toString();
+ mStream.write(data);
}
-
- protected String reverseDepthFirstLP() throws FrontendException, IOException {
- StringBuilder sb = new StringBuilder();
- List<Operator> roots = plan.getSources();
- for (Operator root : roots) {
- sb.append(reverseDepthFirst(root));
- sb.append("\n");
- }
- return sb.toString();
- }
-
- private String reverseDepthFirst(Operator node) throws FrontendException, IOException {
- String nodeString = printNode(node);
-
- List<Operator> originalSuccessors = plan.getSuccessors(node);
- if (originalSuccessors == null)
- return nodeString;
-
- StringBuffer sb = new StringBuffer(nodeString);
- List<Operator> successors = new ArrayList<Operator>(originalSuccessors);
-
+
+ private void depthFirst(Operator node) throws FrontendException, IOException {
+ printNodePlan(node);
+ List<Operator> operators;
+
+ if(reverse) {
+ operators = plan.getSuccessors(node);
+ } else {
+ operators = plan.getPredecessors(node);
+ }
+ if (operators == null)
+ return;
+
+ List<Operator> predecessors = new ArrayList<Operator>(operators);
+
int i = 0;
- for (Operator succ : successors) {
+ for (Operator pred : predecessors) {
i++;
- String DFStr = reverseDepthFirst(succ);
- if (DFStr != null) {
- sb.append(LSep);
- if (i < successors.size())
- sb.append(shiftStringByTabs(DFStr, 2));
- else
- sb.append(shiftStringByTabs(DFStr, 1));
- }
+ writeWithTabs(Bar);
+ writeWithTabs(LSep);
+ mStream.write((pred.toString()+"\n").getBytes());
+ if (i < predecessors.size()) {
+ tabs.add(TABMore);
+ } else {
+ tabs.add(TAB1);
+ }
+ depthFirst(pred);
+ tabs.remove(tabs.size() - 1);
}
- return sb.toString();
}
-
- private String planString(OperatorPlan lp) throws VisitorException, IOException {
- StringBuilder sb = new StringBuilder();
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- PrintStream ps = new PrintStream(baos);
+
+ private void printPlan(OperatorPlan lp) throws VisitorException, IOException {
+ writeWithTabs(USep);
+ tabs.add(TABMore);
if(lp!=null) {
- LogicalPlanPrinter printer = new LogicalPlanPrinter(lp, ps);
+ LogicalPlanPrinter printer = new LogicalPlanPrinter(lp, mStream, tabs);
printer.visit();
}
- else
- return "";
- sb.append(USep);
- sb.append(shiftStringByTabs(baos.toString(), 2));
- return sb.toString();
- }
-
- private String printNode(Operator node) throws FrontendException, IOException {
- StringBuilder sb = new StringBuilder(node.toString()+"\n");
-
+ tabs.remove(tabs.size() - 1);
+ }
+
+ private void printNodePlan(Operator node) throws FrontendException, IOException {
if(node instanceof LOFilter){
- sb.append(planString(((LOFilter)node).getFilterPlan()));
+ printPlan(((LOFilter)node).getFilterPlan());
}
else if(node instanceof LOLimit){
- sb.append(planString(((LOLimit)node).getLimitPlan()));
+ printPlan(((LOLimit)node).getLimitPlan());
}
else if(node instanceof LOForEach){
- sb.append(planString(((LOForEach)node).getInnerPlan()));
+ printPlan(((LOForEach)node).getInnerPlan());
}
else if(node instanceof LOCogroup){
MultiMap<Integer, LogicalExpressionPlan> plans = ((LOCogroup)node).getExpressionPlans();
for (int i : plans.keySet()) {
// Visit the associated plans
for (OperatorPlan plan : plans.get(i)) {
- sb.append(planString(plan));
+ printPlan(plan);
}
}
}
@@ -187,44 +169,28 @@ public class LogicalPlanPrinter extends
for (int i: plans.keySet()) {
// Visit the associated plans
for (OperatorPlan plan : plans.get(i)) {
- sb.append(planString(plan));
+ printPlan(plan);
}
}
}
else if(node instanceof LORank){
// Visit fields for rank
for (OperatorPlan plan : ((LORank)node).getRankColPlans())
- sb.append(planString(plan));
+ printPlan(plan);
}
else if(node instanceof LOSort){
for (OperatorPlan plan : ((LOSort)node).getSortColPlans())
- sb.append(planString(plan));
+ printPlan(plan);
}
else if(node instanceof LOSplitOutput){
- sb.append(planString(((LOSplitOutput)node).getFilterPlan()));
+ printPlan(((LOSplitOutput)node).getFilterPlan());
}
else if(node instanceof LOGenerate){
for (OperatorPlan plan : ((LOGenerate)node).getOutputPlans()) {
- sb.append(planString(plan));
+ printPlan(plan);
}
}
- return sb.toString();
- }
-
- private String shiftStringByTabs(String DFStr, int TabType) {
- StringBuilder sb = new StringBuilder();
- String[] spl = DFStr.split("\n");
-
- String tab = (TabType == 1) ? TAB1 : TABMore;
-
- sb.append(spl[0] + "\n");
- for (int i = 1; i < spl.length; i++) {
- sb.append(tab);
- sb.append(spl[i]);
- sb.append("\n");
- }
- return sb.toString();
}
}
-
+