You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by an...@apache.org on 2013/11/26 21:17:43 UTC

svn commit: r1545813 - in /pig/trunk: CHANGES.txt src/org/apache/pig/newplan/logical/optimizer/LogicalPlanPrinter.java

Author: aniket486
Date: Tue Nov 26 20:17:43 2013
New Revision: 1545813

URL: http://svn.apache.org/r1545813
Log:
PIG-3567: LogicalPlanPrinter throws OOM for large scripts (aniket486)

Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/src/org/apache/pig/newplan/logical/optimizer/LogicalPlanPrinter.java

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1545813&r1=1545812&r2=1545813&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Tue Nov 26 20:17:43 2013
@@ -119,6 +119,8 @@ PIG-3480: TFile-based tmpfile compressio
 
 BUG FIXES
 
+PIG-3567: LogicalPlanPrinter throws OOM for large scripts (aniket486)
+
 PIG-3579: pig.script's deserialized version does not maintain line numbers (jgzhang via aniket486)
 
 PIG-3570: Rollback PIG-3060 (daijy)

Modified: pig/trunk/src/org/apache/pig/newplan/logical/optimizer/LogicalPlanPrinter.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/optimizer/LogicalPlanPrinter.java?rev=1545813&r1=1545812&r2=1545813&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/optimizer/LogicalPlanPrinter.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/optimizer/LogicalPlanPrinter.java Tue Nov 26 20:17:43 2013
@@ -17,7 +17,6 @@
  */
 package org.apache.pig.newplan.logical.optimizer;
 
-import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.PrintStream;
 import java.util.ArrayList;
@@ -47,138 +46,121 @@ import org.apache.pig.newplan.logical.re
 public class LogicalPlanPrinter extends PlanVisitor {
 
     private PrintStream mStream = null;
-    private String TAB1 = "    ";
-    private String TABMore = "|   ";
-    private String LSep = "|\n|---";
-    private String USep = "|   |\n|   ";
+    private byte[] TAB1 = "    ".getBytes();
+    private byte[] TABMore = "|   ".getBytes();
+    private byte[] Bar = "|\n".getBytes();
+    private byte[] LSep = "|---".getBytes();
+    private byte[] USep = "|   |\n".getBytes();
     static public String SEPERATE = "\t";
 
+    protected ArrayList<byte[]> tabs;
+    protected boolean reverse = false;
+
     /**
      * @param ps PrintStream to output plan information to
      * @param plan Logical plan to print
      */
     public LogicalPlanPrinter(OperatorPlan plan, PrintStream ps) throws FrontendException {
+        this(plan, ps, new ArrayList<byte[]>());
+    }
+
+    private LogicalPlanPrinter(OperatorPlan plan, PrintStream ps, ArrayList<byte[]> tabs) throws FrontendException {
         super(plan, null);
         mStream = ps;
+        this.tabs = tabs;
+        if (plan instanceof LogicalPlan) {
+            reverse = false;
+        }
+        else {
+            reverse = true;
+        }
     }
 
     @Override
     public void visit() throws FrontendException {
         try {
-            if (plan instanceof LogicalPlan) {
-                mStream.write(depthFirstLP().getBytes());
-            }
-            else {
-                mStream.write(reverseDepthFirstLP().getBytes());
-            }
+            depthFirstLP();
         } catch (IOException e) {
             throw new FrontendException(e);
         }
     }
 
-    protected String depthFirstLP() throws FrontendException, IOException {
-        StringBuilder sb = new StringBuilder();
-        List<Operator> leaves = plan.getSinks();
+    protected void depthFirstLP() throws FrontendException, IOException {
+        List<Operator> leaves;
+        if(reverse) {
+            leaves = plan.getSources();
+        } else {
+            leaves = plan.getSinks();
+        }
         for (Operator leaf : leaves) {
-            sb.append(depthFirst(leaf));
-            sb.append("\n");
+            writeWithTabs((leaf.toString()+"\n").getBytes());
+            depthFirst(leaf);
         }
-        return sb.toString();
     }
-    
-    private String depthFirst(Operator node) throws FrontendException, IOException {
-        String nodeString = printNode(node);
-        
-        List<Operator> originalPredecessors =  plan.getPredecessors(node);
-        if (originalPredecessors == null)
-            return nodeString;
-        
-        StringBuffer sb = new StringBuffer(nodeString);
-        List<Operator> predecessors =  new ArrayList<Operator>(originalPredecessors);
-        
-        int i = 0;
-        for (Operator pred : predecessors) {
-            i++;
-            String DFStr = depthFirst(pred);
-            if (DFStr != null) {
-                sb.append(LSep);
-                if (i < predecessors.size())
-                    sb.append(shiftStringByTabs(DFStr, 2));
-                else
-                    sb.append(shiftStringByTabs(DFStr, 1));
-            }
+
+    private void writeWithTabs(byte[] data) throws IOException {
+        for(byte[] tab : tabs) {
+            mStream.write(tab);
         }
-        return sb.toString();
+        mStream.write(data);
     }
-    
-    protected String reverseDepthFirstLP() throws FrontendException, IOException {
-        StringBuilder sb = new StringBuilder();
-        List<Operator> roots = plan.getSources();
-        for (Operator root : roots) {
-            sb.append(reverseDepthFirst(root));
-            sb.append("\n");
-        }
-        return sb.toString();
-    }
-    
-    private String reverseDepthFirst(Operator node) throws FrontendException, IOException {
-        String nodeString = printNode(node);
-        
-        List<Operator> originalSuccessors =  plan.getSuccessors(node);
-        if (originalSuccessors == null)
-            return nodeString;
-        
-        StringBuffer sb = new StringBuffer(nodeString);
-        List<Operator> successors =  new ArrayList<Operator>(originalSuccessors);
-        
+
+    private void depthFirst(Operator node) throws FrontendException, IOException {
+        printNodePlan(node);
+        List<Operator> operators;
+
+        if(reverse) {
+            operators = plan.getSuccessors(node);
+        } else {
+            operators =  plan.getPredecessors(node);
+        }
+        if (operators == null)
+            return;
+
+        List<Operator> predecessors =  new ArrayList<Operator>(operators);
+
         int i = 0;
-        for (Operator succ : successors) {
+        for (Operator pred : predecessors) {
             i++;
-            String DFStr = reverseDepthFirst(succ);
-            if (DFStr != null) {
-                sb.append(LSep);
-                if (i < successors.size())
-                    sb.append(shiftStringByTabs(DFStr, 2));
-                else
-                    sb.append(shiftStringByTabs(DFStr, 1));
-            }
+            writeWithTabs(Bar);
+            writeWithTabs(LSep);
+            mStream.write((pred.toString()+"\n").getBytes());
+            if (i < predecessors.size()) {
+                tabs.add(TABMore);
+            } else {
+                tabs.add(TAB1);
+            }
+            depthFirst(pred);
+            tabs.remove(tabs.size() - 1);
         }
-        return sb.toString();
     }
-    
-    private String planString(OperatorPlan lp) throws VisitorException, IOException {
-        StringBuilder sb = new StringBuilder();
-        ByteArrayOutputStream baos = new ByteArrayOutputStream();
-        PrintStream ps = new PrintStream(baos);
+
+    private void printPlan(OperatorPlan lp) throws VisitorException, IOException {
+        writeWithTabs(USep);
+        tabs.add(TABMore);
         if(lp!=null) {
-            LogicalPlanPrinter printer = new LogicalPlanPrinter(lp, ps);
+            LogicalPlanPrinter printer = new LogicalPlanPrinter(lp, mStream, tabs);
             printer.visit();
         }
-        else
-            return "";
-        sb.append(USep);
-        sb.append(shiftStringByTabs(baos.toString(), 2));
-        return sb.toString();
-    }
-    
-    private String printNode(Operator node) throws FrontendException, IOException {
-        StringBuilder sb = new StringBuilder(node.toString()+"\n");
-        
+        tabs.remove(tabs.size() - 1);
+    }
+
+    private void printNodePlan(Operator node) throws FrontendException, IOException {
         if(node instanceof LOFilter){
-            sb.append(planString(((LOFilter)node).getFilterPlan()));
+            printPlan(((LOFilter)node).getFilterPlan());
         }
         else if(node instanceof LOLimit){
-            sb.append(planString(((LOLimit)node).getLimitPlan()));
+            printPlan(((LOLimit)node).getLimitPlan());
         }
         else if(node instanceof LOForEach){
-            sb.append(planString(((LOForEach)node).getInnerPlan()));        
+            printPlan(((LOForEach)node).getInnerPlan());        
         }
         else if(node instanceof LOCogroup){
             MultiMap<Integer, LogicalExpressionPlan> plans = ((LOCogroup)node).getExpressionPlans();
             for (int i : plans.keySet()) {
                 // Visit the associated plans
                 for (OperatorPlan plan : plans.get(i)) {
-                    sb.append(planString(plan));
+                    printPlan(plan);
                 }
             }
         }
@@ -187,44 +169,28 @@ public class LogicalPlanPrinter extends 
             for (int i: plans.keySet()) {
                 // Visit the associated plans
                 for (OperatorPlan plan : plans.get(i)) {
-                    sb.append(planString(plan));
+                    printPlan(plan);
                 }
             }
         }
         else if(node instanceof LORank){
             // Visit fields for rank
             for (OperatorPlan plan : ((LORank)node).getRankColPlans())
-                sb.append(planString(plan));
+                printPlan(plan);
         }
         else if(node instanceof LOSort){
             for (OperatorPlan plan : ((LOSort)node).getSortColPlans())
-                sb.append(planString(plan));
+                printPlan(plan);
         }
         else if(node instanceof LOSplitOutput){
-            sb.append(planString(((LOSplitOutput)node).getFilterPlan()));
+            printPlan(((LOSplitOutput)node).getFilterPlan());
         }
         else if(node instanceof LOGenerate){
             for (OperatorPlan plan : ((LOGenerate)node).getOutputPlans()) {
-                sb.append(planString(plan));
+                printPlan(plan);
             }
         }
-        return sb.toString();
-    }
-
-    private String shiftStringByTabs(String DFStr, int TabType) {
-        StringBuilder sb = new StringBuilder();
-        String[] spl = DFStr.split("\n");
-
-        String tab = (TabType == 1) ? TAB1 : TABMore;
-
-        sb.append(spl[0] + "\n");
-        for (int i = 1; i < spl.length; i++) {
-            sb.append(tab);
-            sb.append(spl[i]);
-            sb.append("\n");
-        }
-        return sb.toString();
     }
 }
 
-        
+