Posted to commits@pig.apache.org by ol...@apache.org on 2009/02/25 06:18:48 UTC

svn commit: r747660 [1/2] - in /hadoop/pig/branches/multiquery: ./ src/org/apache/pig/ src/org/apache/pig/backend/executionengine/ src/org/apache/pig/backend/hadoop/executionengine/ src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/ src/...

Author: olga
Date: Wed Feb 25 05:18:47 2009
New Revision: 747660

URL: http://svn.apache.org/viewvc?rev=747660&view=rev
Log:
 PIG-627: multiquery support M1 (hagleitn via olgan)

Added:
    hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/DotMRPrinter.java
    hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/DotPOPrinter.java
    hadoop/pig/branches/multiquery/src/org/apache/pig/impl/logicalLayer/DotLOPrinter.java
    hadoop/pig/branches/multiquery/src/org/apache/pig/impl/plan/DotPlanDumper.java
    hadoop/pig/branches/multiquery/src/org/apache/pig/impl/plan/PlanDumper.java
    hadoop/pig/branches/multiquery/test/org/apache/pig/test/TestMultiQuery.java
    hadoop/pig/branches/multiquery/test/org/apache/pig/test/TestMultiQueryLocal.java
    hadoop/pig/branches/multiquery/test/org/apache/pig/test/data/passwd
    hadoop/pig/branches/multiquery/test/org/apache/pig/test/data/passwd2
    hadoop/pig/branches/multiquery/test/org/apache/pig/test/data/test.ppf
    hadoop/pig/branches/multiquery/test/org/apache/pig/test/data/test_broken.ppf
    hadoop/pig/branches/multiquery/test/org/apache/pig/test/data/testsub.pig
    hadoop/pig/branches/multiquery/test/org/apache/pig/test/data/testsubnested_exec.pig
    hadoop/pig/branches/multiquery/test/org/apache/pig/test/data/testsubnested_run.pig
Modified:
    hadoop/pig/branches/multiquery/CHANGES.txt
    hadoop/pig/branches/multiquery/src/org/apache/pig/PigServer.java
    hadoop/pig/branches/multiquery/src/org/apache/pig/backend/executionengine/ExecutionEngine.java
    hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java
    hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/Launcher.java
    hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/LocalLauncher.java
    hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
    hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
    hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java
    hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/MRPrinter.java
    hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhysicalPlan.java
    hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PlanPrinter.java
    hadoop/pig/branches/multiquery/src/org/apache/pig/backend/local/executionengine/LocalExecutionEngine.java
    hadoop/pig/branches/multiquery/src/org/apache/pig/backend/local/executionengine/LocalPigLauncher.java
    hadoop/pig/branches/multiquery/src/org/apache/pig/impl/logicalLayer/LOPrinter.java
    hadoop/pig/branches/multiquery/src/org/apache/pig/impl/logicalLayer/LogicalPlan.java
    hadoop/pig/branches/multiquery/src/org/apache/pig/impl/plan/OperatorPlan.java
    hadoop/pig/branches/multiquery/src/org/apache/pig/tools/grunt/GruntParser.java
    hadoop/pig/branches/multiquery/src/org/apache/pig/tools/pigscript/parser/PigScriptParser.jj
    hadoop/pig/branches/multiquery/test/org/apache/pig/test/TestGrunt.java
    hadoop/pig/branches/multiquery/test/org/apache/pig/test/TestPigScriptParser.java
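
For context, the batch API this change adds to PigServer (setBatchOn/executeBatch/discardBatch) is meant to be driven roughly as below. This is only a sketch based on the methods in the diff; the input file, aliases and output paths are illustrative and not taken from the patch:

    import org.apache.pig.PigServer;

    public class MultiQueryBatchSketch {
        public static void main(String[] args) throws Exception {
            PigServer pig = new PigServer("local");
            pig.setBatchOn();                                           // push a fresh Graph in batch mode
            pig.registerQuery("A = LOAD 'passwd' USING PigStorage(':');", 1);
            pig.registerQuery("B = FOREACH A GENERATE $0;", 2);
            pig.registerQuery("STORE B INTO 'out_users';", 3);          // queued, not executed yet
            pig.registerQuery("C = FOREACH A GENERATE $0, $2;", 4);
            pig.registerQuery("STORE C INTO 'out_ids';", 5);            // queued as well
            pig.executeBatch();                                         // runs all queued STOREs together
        }
    }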

Modified: hadoop/pig/branches/multiquery/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/CHANGES.txt?rev=747660&r1=747659&r2=747660&view=diff
==============================================================================
--- hadoop/pig/branches/multiquery/CHANGES.txt (original)
+++ hadoop/pig/branches/multiquery/CHANGES.txt Wed Feb 25 05:18:47 2009
@@ -406,3 +406,5 @@
     storing strings > 65536 bytes (in UTF8 form) using BinStorage() (sms)
 
     PIG-642: Limit after FRJ causes problems (daijy)
+
+    PIG-627: multiquery support M1 (hagleitn via olgan)

Modified: hadoop/pig/branches/multiquery/src/org/apache/pig/PigServer.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/src/org/apache/pig/PigServer.java?rev=747660&r1=747659&r2=747660&view=diff
==============================================================================
--- hadoop/pig/branches/multiquery/src/org/apache/pig/PigServer.java (original)
+++ hadoop/pig/branches/multiquery/src/org/apache/pig/PigServer.java Wed Feb 25 05:18:47 2009
@@ -32,6 +32,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.Stack;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -51,7 +52,6 @@
 import org.apache.pig.impl.logicalLayer.LogicalOperator;
 import org.apache.pig.impl.logicalLayer.LogicalPlan;
 import org.apache.pig.impl.logicalLayer.LogicalPlanBuilder;
-import org.apache.pig.impl.logicalLayer.LOPrinter;
 import org.apache.pig.impl.logicalLayer.PlanSetter;
 import org.apache.pig.impl.logicalLayer.optimizer.LogicalOptimizer;
 import org.apache.pig.impl.logicalLayer.parser.ParseException;
@@ -95,23 +95,38 @@
         throw new PigException(msg, errCode, PigException.BUG);
     }
 
-
-    Map<LogicalOperator, LogicalPlan> aliases = new HashMap<LogicalOperator, LogicalPlan>();
-    Map<OperatorKey, LogicalOperator> opTable = new HashMap<OperatorKey, LogicalOperator>();
-    Map<String, LogicalOperator> aliasOp = new HashMap<String, LogicalOperator>();
-    PigContext pigContext;
+    /*
+     * The data structure to support grunt shell operations. 
+     * The grunt shell can only work on one graph at a time. 
+     * If a script is contained inside another script, the grunt
+     * shell first saves the current graph on the stack and works 
+     * on a new graph. After the nested script is done, the grunt 
+     * shell pops the saved graph and continues working on it.
+     */
+    private Stack<Graph> graphs = new Stack<Graph>();
+    
+    /*
+     * The current Graph the grunt shell is working on.
+     */
+    private Graph currDAG;
+ 
+    private PigContext pigContext;
     
+    private static int scopeCounter = 0;
     private String scope = constructScope();
-    private ArrayList<String> cachedScript = new ArrayList<String>();
-    
+
     private String constructScope() {
         // scope serves for now as a session id
-        // scope = user_id + "-" + time_stamp;
         
-        String user = System.getProperty("user.name", "DEFAULT_USER_ID");
-        String date = (new Date()).toString();
-       
-        return user + "-" + date;
+        // String user = System.getProperty("user.name", "DEFAULT_USER_ID");
+        // String date = (new Date()).toString();
+
+        // scope is not really used in the system right now. It will
+        // however make your explain statements look lengthy if set to
+        // username-date. For now let's simplify the scope; if a real
+        // scope is needed again, we might need to update all the
+        // operators to not include scope in their name().
+        return ""+(++scopeCounter);
     }
     
     public PigServer(String execTypeString) throws ExecException, IOException {
@@ -123,21 +138,15 @@
     }
 
     public PigServer(ExecType execType, Properties properties) throws ExecException {
-        this.pigContext = new PigContext(execType, properties);
-        if (this.pigContext.getProperties().getProperty(PigContext.JOB_NAME) == null) {
-            setJobName("DefaultJobName") ;
-        }
-        pigContext.connect();
+        this(new PigContext(execType, properties));
     }
-    
+  
     public PigServer(PigContext context) throws ExecException {
         this.pigContext = context;
-        if (this.pigContext.getProperties().getProperty(PigContext.JOB_NAME) == null) {
-            setJobName("DefaultJobName") ;
-        }
+        currDAG = new Graph(false);
         pigContext.connect();
     }
-
+    
     public PigContext getPigContext(){
         return pigContext;
     }
@@ -149,7 +158,61 @@
     public void debugOff() {
         pigContext.debug = false;
     }
-    
+ 
+    /**
+     * Starts batch execution mode.
+     */
+    public void setBatchOn() {
+        log.info("Create a new graph.");
+        
+        if (currDAG != null) {
+            graphs.push(currDAG);
+        }
+        currDAG = new Graph(true);
+    }
+
+    /**
+     * Retrieve the current execution mode.
+     * 
+     * @return true if the execution mode is batch; false otherwise.
+     */
+    public boolean isBatchOn() {
+        return currDAG.isBatchOn();
+    }
+
+    /**
+     * Submits a batch of Pig commands for execution. 
+     * 
+     * @throws FrontendException
+     * @throws ExecException
+     */
+    public void executeBatch() throws FrontendException, ExecException {
+        if (currDAG == null || !isBatchOn() || graphs.size() < 1) {
+            throw new IllegalStateException("setBatchOn() must be called first.");
+        }
+        
+        try {
+            currDAG.execute();
+        } finally {
+            log.info("Delete the current graph.");
+            currDAG = graphs.pop();
+        }
+    }
+
+    /**
+     * Discards a batch of Pig commands.
+     * 
+     * @throws FrontendException
+     * @throws ExecException
+     */
+    public void discardBatch() throws FrontendException {
+        if (currDAG == null || !isBatchOn() || graphs.size() < 1) {
+            throw new IllegalStateException("setBatchOn() must be called first.");
+        }
+        
+        currDAG = graphs.pop();
+    }
+       
     /**
      * Add a path to be skipped while automatically shipping binaries for 
      * streaming.
@@ -264,53 +327,10 @@
      *            line number of the query within the whole script
      * @throws IOException
      */    
-    public void registerQuery(String query, int startLine) throws IOException {
-            
-        LogicalPlan lp = parseQuery(query, startLine, aliases, opTable, aliasOp);
-        // store away the query for use in cloning later
-        cachedScript .add(query);
-        
-        if (lp.getLeaves().size() == 1)
-        {
-            LogicalOperator op = lp.getSingleLeafPlanOutputOp();
-            // No need to do anything about DEFINE 
-            if (op instanceof LODefine) {
-                return;
-            }
-        
-            // Check if we just processed a LOStore i.e. STORE
-            if (op instanceof LOStore) {
-                try{
-                    execute(null);
-                } catch (Exception e) {
-                    int errCode = 1002;
-                    String msg = "Unable to store alias " + op.getOperatorKey().getId();
-                    throw new FrontendException(msg, errCode, PigException.INPUT, e);
-                }
-            }
-        }
+    public void registerQuery(String query, int startLine) throws IOException {            
+    	currDAG.registerQuery(query, startLine);
     }
-    
-    private LogicalPlan parseQuery(String query, int startLine, Map<LogicalOperator, LogicalPlan> aliasesMap, 
-            Map<OperatorKey, LogicalOperator> opTableMap, Map<String, LogicalOperator> aliasOpMap) throws IOException {
-        if(query != null) {
-            query = query.trim();
-            if(query.length() == 0) return null;
-        }else {
-            return null;
-        }
-        try {
-            return new LogicalPlanBuilder(pigContext).parse(scope, query,
-                    aliasesMap, opTableMap, aliasOpMap, startLine);
-        } catch (ParseException e) {
-            //throw (IOException) new IOException(e.getMessage()).initCause(e);
-            PigException pe = Utils.getPigException(e);
-            int errCode = 1000;
-            String msg = "Error during parsing. " + (pe == null? e.getMessage() : pe.getMessage());
-            throw new FrontendException(msg, errCode, PigException.INPUT, false, null, e);
-        }
-    }
-
+ 
     public LogicalPlan clonePlan(String alias) throws IOException {
         // There are two choices on how we clone the logical plan
         // 1 - we really clone each operator and connect up the cloned operators
@@ -334,28 +354,39 @@
         // final logical plan is the clone that we want
         LogicalPlan lp = null;
         int lineNumber = 1;
-        // create data structures needed for parsing
-        Map<LogicalOperator, LogicalPlan> cloneAliases = new HashMap<LogicalOperator, LogicalPlan>();
-        Map<OperatorKey, LogicalOperator> cloneOpTable = new HashMap<OperatorKey, LogicalOperator>();
-        Map<String, LogicalOperator> cloneAliasOp = new HashMap<String, LogicalOperator>();
-        for (Iterator<String> it = cachedScript.iterator(); it.hasNext(); lineNumber++) {
-            lp = parseQuery(it.next(), lineNumber, cloneAliases, cloneOpTable, cloneAliasOp);
+        
+        // create data structures needed for parsing        
+        Graph graph = new Graph(true);
+        
+        for (Iterator<String> it = currDAG.getScriptCache().iterator(); it.hasNext(); lineNumber++) {
+            if (isBatchOn()) {
+                graph.registerQuery(it.next(), lineNumber);
+            } else {
+                lp = graph.parseQuery(it.next(), lineNumber);
+            }
         }
         
         if(alias == null) {
             // a store prompted the execution - so return
             // the entire logical plan
+            if (isBatchOn()) {
+                lp = new LogicalPlan();
+                for (LogicalPlan lpPart : graph.getStoreOpTable().values()) {
+                    lp.mergeSharedPlan(lpPart);
+                }
+            }
+
             return lp;
         } else {
             // return the logical plan corresponding to the 
             // alias supplied
-            LogicalOperator op = cloneAliasOp.get(alias);
+            LogicalOperator op = graph.getAliasOp().get(alias);
             if(op == null) {
                 int errCode = 1003;
                 String msg = "Unable to find an operator for alias " + alias;
                 throw new FrontendException(msg, errCode, PigException.INPUT);
             }
-            return cloneAliases.get(op);
+            return graph.getAliases().get(op);
         }
     }
     
@@ -396,7 +427,7 @@
     }
 
     public void setJobName(String name){
-        pigContext.getProperties().setProperty(PigContext.JOB_NAME, PigContext.JOB_NAME_PREFIX + ":" + name);
+        currDAG.setJobName(name);
     }
     
     /**
@@ -404,15 +435,30 @@
      * result
      */
     public Iterator<Tuple> openIterator(String id) throws IOException {
+        if (isBatchOn()) {
+            log.info("Skip DUMP command in batch mode.");
+            return new Iterator<Tuple>() {
+                public boolean hasNext() {
+                    return false;
+                }
+                public Tuple next() {
+                    return null;
+                }
+                public void remove() {
+                }
+            };
+        }
+
         try {
-            LogicalOperator op = aliasOp.get(id);
+            LogicalOperator op = currDAG.getAliasOp().get(id);
             if(null == op) {
                 int errCode = 1003;
                 String msg = "Unable to find an operator for alias " + id;
                 throw new FrontendException(msg, errCode, PigException.INPUT);
             }
-//            ExecJob job = execute(getPlanFromAlias(id, op.getClass().getName()));
+
             ExecJob job = store(id, FileLocalizer.getTemporaryPath(null, pigContext).toString(), BinStorage.class.getName() + "()");
+            
             // invocation of "execute" is synchronous!
             if (job.getStatus() == JOB_STATUS.COMPLETED) {
                     return job.getResults();
@@ -445,7 +491,7 @@
             String id,
             String filename,
             String func) throws IOException{
-        if (!aliasOp.containsKey(id))
+        if (!currDAG.getAliasOp().containsKey(id))
             throw new IOException("Invalid alias: " + id);
         
         try {
@@ -502,38 +548,65 @@
      */
     public void explain(String alias,
                         PrintStream stream) throws IOException {
+        explain(alias, "text", true, stream, stream, stream);
+    }
+
+    /**
+     * Provide information on how a pig query will be executed.
+     * @param alias Name of alias to explain.
+     * @param format Format in which the explain should be printed
+     * @param verbose Controls the amount of information printed
+     * @param dir Directory to print the different plans into
+     * @throws IOException if the requested alias cannot be found.
+     */
+    public void explain(String alias,
+                        String format,
+                        boolean verbose,
+                        String dir) throws IOException {
         try {
-            LogicalPlan lp = compileLp(alias);
+            PrintStream lps = new PrintStream(new File(dir,"logical_plan."+format));
+            PrintStream pps = new PrintStream(new File(dir,"physical_plan."+format));
+            PrintStream eps = new PrintStream(new File(dir,"exec_plan."+format));
+            explain(alias, format, verbose, lps, pps, eps);
+            lps.close();
+            pps.close();
+            eps.close();
             
-            // MRCompiler needs a store to be the leaf - hence
-            // add a store to the plan to explain
-            
-            // figure out the leaf to which the store needs to be added
-            List<LogicalOperator> leaves = lp.getLeaves();
-            LogicalOperator leaf = null;
-            if(leaves.size() == 1) {
-                leaf = leaves.get(0);
-            } else {
-                for (Iterator<LogicalOperator> it = leaves.iterator(); it.hasNext();) {
-                    LogicalOperator leafOp = it.next();
-                    if(leafOp.getAlias().equals(alias))
-                        leaf = leafOp;
-                }
-            }
-            
-            LogicalPlan storePlan = QueryParser.generateStorePlan(
-                scope, lp, "fakefile", PigStorage.class.getName(), leaf);
-            stream.println("Logical Plan:");
-            LOPrinter lv = new LOPrinter(stream, storePlan);
-            lv.visit();
-
-            PhysicalPlan pp = compilePp(storePlan);
-            stream.println("-----------------------------------------------");
-            stream.println("Physical Plan:");
+        } catch (Exception e) {
+            int errCode = 1067;
+            String msg = "Unable to explain alias " + alias;
+            throw new FrontendException(msg, errCode, PigException.INPUT, e);
+        }
+    }
 
-            stream.println("-----------------------------------------------");
-            pigContext.getExecutionEngine().explain(pp, stream);
-      
+    /**
+     * Provide information on how a pig query will be executed.
+     * @param alias Name of alias to explain.
+     * @param format Format in which the explain should be printed
+     * @param verbose Controls the amount of information printed
+     * @param lps Stream to print the logical tree
+     * @param pps Stream to print the physical tree
+     * @param eps Stream to print the execution tree
+     * @throws IOException if the requested alias cannot be found.
+     */
+    public void explain(String alias,
+                        String format,
+                        boolean verbose,
+                        PrintStream lps,
+                        PrintStream pps,
+                        PrintStream eps) throws IOException {
+        try {
+            LogicalPlan lp = getStorePlan(alias);
+            if (lp.size() == 0) {
+                lps.println("Logical plan is empty.");
+                pps.println("Physical plan is empty.");
+                eps.println("Execution plan is empty.");
+                return;
+            }
+            PhysicalPlan pp = compilePp(lp);
+            lp.explain(lps, format, verbose);
+            pp.explain(pps, format, verbose);
+            pigContext.getExecutionEngine().explain(pp, eps, format, verbose);
         } catch (Exception e) {
             int errCode = 1067;
             String msg = "Unable to explain alias " + alias;
@@ -633,10 +706,10 @@
   
     public Map<String, LogicalPlan> getAliases() {
         Map<String, LogicalPlan> aliasPlans = new HashMap<String, LogicalPlan>();
-        for(LogicalOperator op: this.aliases.keySet()) {
+        for(LogicalOperator op:  currDAG.getAliases().keySet()) {
             String alias = op.getAlias();
             if(null != alias) {
-                aliasPlans.put(alias, this.aliases.get(op));
+                aliasPlans.put(alias, currDAG.getAliases().get(op));
             }
         }
         return aliasPlans;
@@ -652,7 +725,6 @@
     }
 
     public Map<LogicalOperator, DataBag> getExamples(String alias) {
-        //LogicalPlan plan = aliases.get(aliasOp.get(alias));
         LogicalPlan plan = null;
         try {
             plan = clonePlan(alias);
@@ -665,15 +737,48 @@
         ExampleGenerator exgen = new ExampleGenerator(plan, pigContext);
         return exgen.getExamples();
     }
+
+    private LogicalPlan getStorePlan(String alias) throws IOException {
+        LogicalPlan lp = compileLp(alias);
+        
+        if (!isBatchOn() || alias != null) {
+            // MRCompiler needs a store to be the leaf - hence
+            // add a store to the plan to explain
+            
+            // figure out the leaves to which stores need to be added
+            List<LogicalOperator> leaves = lp.getLeaves();
+            LogicalOperator leaf = null;
+            if(leaves.size() == 1) {
+                leaf = leaves.get(0);
+            } else {
+                for (Iterator<LogicalOperator> it = leaves.iterator(); it.hasNext();) {
+                    LogicalOperator leafOp = it.next();
+                    if(leafOp.getAlias().equals(alias))
+                        leaf = leafOp;
+                }
+            }
+            
+            lp = QueryParser.generateStorePlan(scope, lp, "fakefile", 
+                                               PigStorage.class.getName(), leaf);
+        }
+
+        return lp;
+    }
     
     private ExecJob execute(String alias) throws FrontendException, ExecException {
-        ExecJob job = null;
-//        lp.explain(System.out, System.err);
         LogicalPlan typeCheckedLp = compileLp(alias);
-        
+
+        if (typeCheckedLp.size() == 0) {
+            return null;
+        }
+
+        LogicalOperator op = typeCheckedLp.getLeaves().get(0);
+        if (op instanceof LODefine) {
+            log.info("Skip execution of DEFINE only logical plan.");
+            return null;
+        }
+
         return executeCompiledLogicalPlan(typeCheckedLp);
-//        typeCheckedLp.explain(System.out, System.err);
-        
     }
     
     private ExecJob executeCompiledLogicalPlan(LogicalPlan compiledLp) throws ExecException {
@@ -698,14 +803,14 @@
         // create a clone of the logical plan and give it
         // to the operations below
         LogicalPlan lpClone;
+ 
         try {
-             lpClone = clonePlan(alias);
+            lpClone = clonePlan(alias);
         } catch (IOException e) {
             int errCode = 2001;
             String msg = "Unable to clone plan before compiling";
             throw new FrontendException(msg, errCode, PigException.BUG, e);
         }
-
         
         // Set the logical plan values correctly in all the operators
         PlanSetter ps = new PlanSetter(lpClone);
@@ -777,13 +882,13 @@
     private LogicalPlan getPlanFromAlias(
             String alias,
             String operation) throws FrontendException {
-        LogicalOperator lo = aliasOp.get(alias);
+        LogicalOperator lo = currDAG.getAliasOp().get(alias);
         if (lo == null) {
             int errCode = 1004;
             String msg = "No alias " + alias + " to " + operation;
             throw new FrontendException(msg, errCode, PigException.INPUT, false, null);
         }
-        LogicalPlan lp = aliases.get(lo);
+        LogicalPlan lp = currDAG.getAliases().get(lo);
         if (lp == null) {
             int errCode = 1005;
             String msg = "No plan for " + alias + " to " + operation;
@@ -792,5 +897,101 @@
         return lp;
     }
 
+    /*
+     * This class holds the internal states of a grunt shell session.
+     */
+    private class Graph {
+    	
+        private Map<LogicalOperator, LogicalPlan> aliases = new HashMap<LogicalOperator, LogicalPlan>();
+        
+        private Map<OperatorKey, LogicalOperator> opTable = new HashMap<OperatorKey, LogicalOperator>();
+        
+        private Map<String, LogicalOperator> aliasOp = new HashMap<String, LogicalOperator>();
+       
+        private ArrayList<String> scriptCache = new ArrayList<String>();	
+    
+        private Map<LogicalOperator, LogicalPlan> storeOpTable = new HashMap<LogicalOperator, LogicalPlan>();
+
+        private String jobName;
+        
+        private boolean batchMode;
+      
+        
+        Graph(boolean batchMode) { 
+            this.batchMode = batchMode;
+            this.jobName = "DefaultJobName";
+        };
+        
+        
+        Map<LogicalOperator, LogicalPlan> getAliases() { return aliases; }
+        
+        Map<OperatorKey, LogicalOperator> getOpTable() { return opTable; }
+        
+        Map<String, LogicalOperator> getAliasOp() { return aliasOp; }
+        
+        ArrayList<String> getScriptCache() { return scriptCache; }
+        
+        Map<LogicalOperator, LogicalPlan> getStoreOpTable() { return storeOpTable; }
+        
+        boolean isBatchOn() { return batchMode; };
+        
+        void execute() throws ExecException, FrontendException {
+            pigContext.getProperties().setProperty(PigContext.JOB_NAME, PigContext.JOB_NAME_PREFIX + ":" + jobName);
+            PigServer.this.execute(null);
+        }
+
+        void setJobName(String name) {
+            jobName = name;
+        }
 
+        void registerQuery(String query, int startLine) throws IOException {
+            
+            LogicalPlan lp = parseQuery(query, startLine);
+            
+            // store away the query for use in cloning later
+            scriptCache.add(query);
+            if (lp.getLeaves().size() == 1) {
+                LogicalOperator op = lp.getSingleLeafPlanOutputOp();
+
+                // Check if we just processed a LOStore i.e. STORE
+                if (op instanceof LOStore) {
+
+                    if (!batchMode) {
+                        try {
+                            execute();
+                        } catch (Exception e) {
+                            int errCode = 1002;
+                            String msg = "Unable to store alias "
+                                    + op.getOperatorKey().getId();
+                            throw new FrontendException(msg, errCode,
+                                    PigException.INPUT, e);
+                        }
+                    } else {
+                        storeOpTable.put(op, lp);
+                    }
+
+                }
+            }
+        }        
+    
+        LogicalPlan parseQuery(String query, int startLine) throws IOException {
+            if (query != null) {
+                query = query.trim();
+            }
+        
+            if (query == null || query.length() == 0) { 
+                throw new IllegalArgumentException();
+            }
+        
+            try {
+                return new LogicalPlanBuilder(PigServer.this.pigContext).parse(scope, query,
+                                              aliases, opTable, aliasOp, startLine);
+            } catch (ParseException e) {
+                PigException pe = Utils.getPigException(e);
+                int errCode = 1000;
+                String msg = "Error during parsing. " + (pe == null? e.getMessage() : pe.getMessage());
+                throw new FrontendException(msg, errCode, PigException.INPUT, false, null, e);
+            }
+        }
+    }
 }
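
The PigServer changes above also add a directory-based explain(alias, format, verbose, dir) that writes logical_plan.<format>, physical_plan.<format> and exec_plan.<format> into the given directory. A minimal, hedged sketch of a call (the alias, query text and target directory are assumptions for illustration; the directory is expected to exist):

    import org.apache.pig.PigServer;

    public class ExplainToDirSketch {
        public static void main(String[] args) throws Exception {
            PigServer pig = new PigServer("local");
            pig.registerQuery("A = LOAD 'passwd' USING PigStorage(':');", 1);
            pig.registerQuery("B = FOREACH A GENERATE $0;", 2);
            // writes logical_plan.dot, physical_plan.dot and exec_plan.dot into /tmp/plans
            pig.explain("B", "dot", true, "/tmp/plans");
        }
    }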

Modified: hadoop/pig/branches/multiquery/src/org/apache/pig/backend/executionengine/ExecutionEngine.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/src/org/apache/pig/backend/executionengine/ExecutionEngine.java?rev=747660&r1=747659&r2=747660&view=diff
==============================================================================
--- hadoop/pig/branches/multiquery/src/org/apache/pig/backend/executionengine/ExecutionEngine.java (original)
+++ hadoop/pig/branches/multiquery/src/org/apache/pig/backend/executionengine/ExecutionEngine.java Wed Feb 25 05:18:47 2009
@@ -123,8 +123,11 @@
      *
      * @param plan PhysicalPlan to explain
      * @param stream Stream to print output to
+     * @param format Format to print in
+     * @param verbose Amount of information to print
      */
-    public void explain(PhysicalPlan plan, PrintStream stream);
+    public void explain(PhysicalPlan plan, PrintStream stream, 
+                        String format, boolean verbose);
 
     /**
      * Return currently running jobs (can be useful for admin purposes)

Modified: hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java?rev=747660&r1=747659&r2=747660&view=diff
==============================================================================
--- hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java (original)
+++ hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java Wed Feb 25 05:18:47 2009
@@ -273,16 +273,13 @@
         throw new UnsupportedOperationException();
     }
 
-    public void explain(PhysicalPlan plan, PrintStream stream) {
+    public void explain(PhysicalPlan plan, PrintStream stream, String format, boolean verbose) {
         try {
-            PlanPrinter printer = new PlanPrinter(plan, stream);
-            printer.visit();
-            stream.println();
-
             ExecTools.checkLeafIsStore(plan, pigContext);
 
             MapReduceLauncher launcher = new MapReduceLauncher();
-            launcher.explain(plan, pigContext, stream);
+            launcher.explain(plan, pigContext, stream, format, verbose);
+
         } catch (Exception ve) {
             throw new RuntimeException(ve);
         }

Modified: hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/Launcher.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/Launcher.java?rev=747660&r1=747659&r2=747660&view=diff
==============================================================================
--- hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/Launcher.java (original)
+++ hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/Launcher.java Wed Feb 25 05:18:47 2009
@@ -88,13 +88,17 @@
      * @param pp PhysicalPlan to explain
      * @param pc PigContext to use for configuration
      * @param ps PrintStream to write output on.
+     * @param format Format to write in
+     * @param verbose Amount of information to print
      * @throws VisitorException
      * @throws IOException
      */
     public abstract void explain(
             PhysicalPlan pp,
             PigContext pc,
-            PrintStream ps) throws PlanException,
+            PrintStream ps,
+            String format,
+            boolean verbose) throws PlanException,
                                    VisitorException,
                                    IOException;
     

Modified: hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/LocalLauncher.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/LocalLauncher.java?rev=747660&r1=747659&r2=747660&view=diff
==============================================================================
--- hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/LocalLauncher.java (original)
+++ hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/LocalLauncher.java Wed Feb 25 05:18:47 2009
@@ -38,6 +38,7 @@
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRCompiler.LastInputStreamingOptimizer;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MRPrinter;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.DotMRPrinter;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MRStreamHandler;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.POPackageAnnotator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
@@ -117,13 +118,22 @@
     public void explain(
             PhysicalPlan php,
             PigContext pc,
-            PrintStream ps) throws PlanException, VisitorException,
-                                   IOException {
+            PrintStream ps,
+            String format,
+            boolean verbose) throws PlanException, VisitorException,
+                                    IOException {
         log.trace("Entering LocalLauncher.explain");
         MROperPlan mrp = compile(php, pc);
 
-        MRPrinter printer = new MRPrinter(ps, mrp);
-        printer.visit();
+        if (format.equals("text")) {
+            MRPrinter printer = new MRPrinter(ps, mrp);
+            printer.setVerbose(verbose);
+            printer.visit();
+        } else {
+            DotMRPrinter printer = new DotMRPrinter(mrp, ps);
+            printer.setVerbose(verbose);
+            printer.dump();
+        }
     }
  
     private MROperPlan compile(

Modified: hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java?rev=747660&r1=747659&r2=747660&view=diff
==============================================================================
--- hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java (original)
+++ hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java Wed Feb 25 05:18:47 2009
@@ -197,15 +197,21 @@
      */
     public MROperPlan compile() throws IOException, PlanException, VisitorException {
         List<PhysicalOperator> leaves = plan.getLeaves();
-        if(!(leaves.get(0) instanceof POStore)) {
-            int errCode = 2025;
-            String msg = "Expected leaf of reduce plan to " +
-                "always be POStore. Found " + leaves.get(0).getClass().getSimpleName();
-            throw new MRCompilerException(msg, errCode, PigException.BUG);
-        }
-        POStore store = (POStore)leaves.get(0);
-        FileLocalizer.registerDeleteOnFail(store.getSFile().getFileName(), pigContext);
-        compile(store);
+
+        for (PhysicalOperator op : leaves) {
+            if (!(op instanceof POStore)) {
+                int errCode = 2025;
+                String msg = "Expected leaf of reduce plan to " +
+                    "always be POStore. Found " + op.getClass().getSimpleName();
+                throw new MRCompilerException(msg, errCode, PigException.BUG);
+            }
+        }
+
+        for (PhysicalOperator op : leaves) {
+            POStore store = (POStore)op;
+            FileLocalizer.registerDeleteOnFail(store.getSFile().getFileName(), pigContext);
+            compile(store);
+        }
 
         // I'm quite certain this is not the best way to do this.  The issue
         // is that for jobs that take multiple map reduce passes, for

Modified: hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java?rev=747660&r1=747659&r2=747660&view=diff
==============================================================================
--- hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java (original)
+++ hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java Wed Feb 25 05:18:47 2009
@@ -35,6 +35,7 @@
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRCompiler.LastInputStreamingOptimizer;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MRPrinter;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.DotMRPrinter;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MRStreamHandler;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.POPackageAnnotator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
@@ -115,13 +116,26 @@
     public void explain(
             PhysicalPlan php,
             PigContext pc,
-            PrintStream ps) throws PlanException, VisitorException,
+            PrintStream ps,
+            String format,
+            boolean verbose) throws PlanException, VisitorException,
                                    IOException {
         log.trace("Entering MapReduceLauncher.explain");
         MROperPlan mrp = compile(php, pc);
 
-        MRPrinter printer = new MRPrinter(ps, mrp);
-        printer.visit();
+        if (format.equals("text")) {
+            MRPrinter printer = new MRPrinter(ps, mrp);
+            printer.setVerbose(verbose);
+            printer.visit();
+        } else {
+            ps.println("#--------------------------------------------------");
+            ps.println("# Map Reduce Plan                                  ");
+            ps.println("#--------------------------------------------------");
+            
+            DotMRPrinter printer = new DotMRPrinter(mrp, ps);
+            printer.setVerbose(verbose);
+            printer.dump();
+        }
     }
 
     private MROperPlan compile(

Modified: hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java?rev=747660&r1=747659&r2=747660&view=diff
==============================================================================
--- hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java (original)
+++ hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java Wed Feb 25 05:18:47 2009
@@ -323,4 +323,8 @@
     public void setReplFiles(FileSpec[] replFiles) {
         this.replFiles = replFiles;
     }
+
+    public int getRequestedParallelism() {
+        return requestedParallelism;
+    }
 }

Added: hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/DotMRPrinter.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/DotMRPrinter.java?rev=747660&view=auto
==============================================================================
--- hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/DotMRPrinter.java (added)
+++ hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/DotMRPrinter.java Wed Feb 25 05:18:47 2009
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans;
+
+import java.io.PrintStream;
+import java.util.List;
+import java.util.LinkedList;
+import java.util.Collection;
+import org.apache.pig.impl.util.MultiMap;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.pig.impl.plan.DotPlanDumper;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceOper;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.DotPOPrinter;
+import org.apache.pig.impl.plan.OperatorPlan;
+import org.apache.pig.impl.plan.Operator;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.PlanVisitor;
+import org.apache.pig.impl.plan.PlanException;
+
+/**
+ * This class can print an MR plan in the DOT format. It uses
+ * clusters to illustrate nesting. If "verbose" is off, it will skip
+ * any nesting in the associated physical plans.
+ */
+public class DotMRPrinter extends DotPlanDumper<MapReduceOper, MROperPlan, 
+                                  DotMRPrinter.InnerOperator, 
+                                  DotMRPrinter.InnerPlan> {
+
+    static int counter = 0;
+    boolean isVerboseNesting = true;
+
+    public DotMRPrinter(MROperPlan plan, PrintStream ps) {
+        this(plan, ps, false, new HashSet<Operator>(), new HashSet<Operator>());
+    }
+
+    private DotMRPrinter(MROperPlan plan, PrintStream ps, boolean isSubGraph,
+                         Set<Operator> subgraphs, 
+                         Set<Operator> multiInputSubgraphs) {
+        super(plan, ps, isSubGraph, subgraphs, multiInputSubgraphs);
+    }
+
+    @Override
+    public void setVerbose(boolean verbose) {
+        // leave the parents verbose set to true
+        isVerboseNesting = verbose;
+    }
+
+    @Override
+    protected DotPlanDumper makeDumper(InnerPlan plan, PrintStream ps) {
+        return new InnerPrinter(plan, ps, mSubgraphs, mMultiInputSubgraphs);
+    }
+
+    @Override
+    protected String getName(MapReduceOper op) {
+        String name = "Map";
+        if (!op.combinePlan.isEmpty()) {
+            name += " - Combine";
+        }
+        if (!op.reducePlan.isEmpty()) {
+            name += " - Reduce";
+        }
+        if (op.getRequestedParallelism()!=-1) {
+            name += " Parallelism: "+op.getRequestedParallelism();
+        }
+        name += ", Global Sort: "+op.isGlobalSort();
+        return name;
+    }
+
+    @Override
+    protected Collection<InnerPlan> getNestedPlans(MapReduceOper op) {
+        Collection<InnerPlan> plans = new LinkedList<InnerPlan>();
+        plans.add(new InnerPlan(op.mapPlan, op.combinePlan, op.reducePlan));
+        return plans;
+    }
+    
+    @Override
+    protected String[] getAttributes(MapReduceOper op) {
+        String[] attributes = new String[3];
+        attributes[0] = "label=\""+getName(op)+"\"";
+        attributes[1] = "style=\"filled\"";
+        attributes[2] = "fillcolor=\"#EEEEEE\"";
+        return attributes;
+    }
+
+
+    /**
+     * Helper class to represent the relationship of map, reduce and
+     * combine phases in an MR operator.
+     */
+    public class InnerOperator extends Operator<PlanVisitor> {
+        String name;
+        PhysicalPlan plan;
+        int code;
+        
+        public InnerOperator(PhysicalPlan plan, String name) {
+            super(new OperatorKey());
+            this.name = name;
+            this.plan = plan;
+            this.code = counter++;
+        }
+        
+        @Override public void visit(PlanVisitor v) {}
+        @Override public boolean supportsMultipleInputs() {return false;}
+        @Override public boolean supportsMultipleOutputs() {return false;}
+        @Override public String name() {return name;}
+        public PhysicalPlan getPlan() {return plan;}
+        @Override public int hashCode() {return code;}
+    }
+
+    /**
+     * Helper class to represent the relationship of map, reduce and
+     * combine phases in an MR operator. Each MR operator will have
+     * an inner plan of map -> (combine)? -> (reduce)? inner
+     * operators. The inner operators contain the physical plan of the
+     * execution phase.
+     */
+    public class InnerPlan extends OperatorPlan<InnerOperator> {
+        public InnerPlan(PhysicalPlan mapPlan, PhysicalPlan combinePlan, 
+                         PhysicalPlan reducePlan) {
+            try {
+                InnerOperator map = new InnerOperator(mapPlan, "Map");
+                
+                this.add(map);
+                if (!combinePlan.isEmpty()) {
+                    InnerOperator combine = 
+                        new InnerOperator(combinePlan, "Combine");
+                    InnerOperator reduce = 
+                        new InnerOperator(reducePlan, "Reduce");
+                    this.add(combine);
+                    this.connect(map, combine);
+                    this.add(reduce);
+                    this.connect(combine, reduce);
+                } 
+                else if (!reducePlan.isEmpty()){
+                    InnerOperator reduce = 
+                        new InnerOperator(reducePlan, "Reduce");
+                    this.add(reduce);
+                    this.connect(map, reduce);
+                }
+            } catch (PlanException e) {}
+        }
+    }
+
+    /**
+     * Helper class to represent the relationship of map, reduce and
+     * combine phases in an MR operator.
+     */    
+    private class InnerPrinter extends DotPlanDumper<InnerOperator, InnerPlan,
+                                       PhysicalOperator, PhysicalPlan> {
+
+        public InnerPrinter(InnerPlan plan, PrintStream ps,
+                            Set<Operator> subgraphs, 
+                            Set<Operator> multiInputSubgraphs) {
+            super(plan, ps, true, subgraphs, multiInputSubgraphs);
+        }
+
+        @Override
+        protected String[] getAttributes(InnerOperator op) {
+            String[] attributes = new String[3];
+            attributes[0] = "label=\""+getName(op)+"\"";
+            attributes[1] = "style=\"filled\"";
+            attributes[2] = "fillcolor=\"white\"";
+            return attributes;
+        }
+
+        @Override
+        protected Collection<PhysicalPlan> getNestedPlans(InnerOperator op) {
+            Collection<PhysicalPlan> l = new LinkedList<PhysicalPlan>();
+            l.add(op.getPlan());
+            return l;
+        }
+
+        @Override
+        protected DotPOPrinter makeDumper(PhysicalPlan plan, PrintStream ps) {
+            DotPOPrinter printer = new DotPOPrinter(plan, ps, true, 
+                                                    mSubgraphs, 
+                                                    mMultiInputSubgraphs);
+            printer.setVerbose(isVerboseNesting);
+            return printer;
+        }
+    }
+}

Modified: hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/MRPrinter.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/MRPrinter.java?rev=747660&r1=747659&r2=747660&view=diff
==============================================================================
--- hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/MRPrinter.java (original)
+++ hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/MRPrinter.java Wed Feb 25 05:18:47 2009
@@ -34,6 +34,7 @@
 
     private PrintStream mStream = null;
     private int mIndent = 0;
+    private boolean isVerbose = true;
 
     /**
      * @param ps PrintStream to output plan information to
@@ -47,24 +48,31 @@
         mStream.println("--------------------------------------------------");
     }
 
+    public void setVerbose(boolean verbose) {
+        isVerbose = verbose;
+    }
+
     @Override
     public void visitMROp(MapReduceOper mr) throws VisitorException {
         mStream.println("MapReduce node " + mr.getOperatorKey().toString());
         if (mr.mapPlan != null && mr.mapPlan.size() > 0) {
             mStream.println("Map Plan");
             PlanPrinter printer = new PlanPrinter(mr.mapPlan, mStream);
+            printer.setVerbose(isVerbose);
             printer.visit();
             mStream.println("--------");
         }
         if (mr.combinePlan != null && mr.combinePlan.size() > 0) {
             mStream.println("Combine Plan");
             PlanPrinter printer = new PlanPrinter(mr.combinePlan, mStream);
+            printer.setVerbose(isVerbose);
             printer.visit();
             mStream.println("--------");
         }
         if (mr.reducePlan != null && mr.reducePlan.size() > 0) {
             mStream.println("Reduce Plan");
             PlanPrinter printer = new PlanPrinter(mr.reducePlan, mStream);
+            printer.setVerbose(isVerbose);
             printer.visit();
             mStream.println("--------");
         }

Added: hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/DotPOPrinter.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/DotPOPrinter.java?rev=747660&view=auto
==============================================================================
--- hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/DotPOPrinter.java (added)
+++ hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/DotPOPrinter.java Wed Feb 25 05:18:47 2009
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans;
+
+import java.io.PrintStream;
+import java.util.List;
+import java.util.LinkedList;
+import java.util.Collection;
+import org.apache.pig.impl.util.MultiMap;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.pig.impl.plan.DotPlanDumper;
+import org.apache.pig.impl.plan.Operator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.*;
+
+/**
+ * This class can print a physical plan in the DOT format. It uses
+ * clusters to illustrate nesting. If "verbose" is off, it will skip
+ * any nesting.
+ */
+public class DotPOPrinter extends DotPlanDumper<PhysicalOperator, PhysicalPlan, 
+                                  PhysicalOperator, PhysicalPlan> {
+
+    public DotPOPrinter(PhysicalPlan plan, PrintStream ps) {
+        this(plan, ps, false, new HashSet<Operator>(), new HashSet<Operator>());
+    }
+
+    public DotPOPrinter(PhysicalPlan plan, PrintStream ps, boolean isSubGraph,
+                        Set<Operator> subgraphs, 
+                        Set<Operator> multiInputSubgraphs) {
+        super(plan, ps, isSubGraph, subgraphs, multiInputSubgraphs);
+    }
+
+    @Override
+    protected DotPlanDumper makeDumper(PhysicalPlan plan, PrintStream ps) {
+        return new DotPOPrinter(plan, ps, true, mSubgraphs, 
+                                mMultiInputSubgraphs);
+    }
+
+    @Override
+    protected String getName(PhysicalOperator op) {
+        return (op.name().split(" - "))[0];
+    }
+
+
+    @Override
+    protected String[] getAttributes(PhysicalOperator op) {
+        if (op instanceof POStore || op instanceof POLoad) {
+            String[] attributes = new String[3];
+            attributes[0] = "label=\""+getName(op).replace(":",",\\n")+"\"";
+            attributes[1] = "style=\"filled\"";
+            attributes[2] = "fillcolor=\"gray\"";
+            return attributes;
+        }
+        else {
+            return super.getAttributes(op);
+        }
+    }
+
+    @Override
+    protected Collection<PhysicalPlan> getNestedPlans(PhysicalOperator op) {
+        Collection<PhysicalPlan> plans = new LinkedList<PhysicalPlan>();
+
+        if(op instanceof POFilter){
+            plans.add(((POFilter)op).getPlan());
+        }
+        else if(op instanceof POForEach){
+            plans.addAll(((POForEach)op).getInputPlans());
+        }
+        else if(op instanceof POSort){
+            plans.addAll(((POSort)op).getSortPlans()); 
+        }
+        else if(op instanceof POLocalRearrange){
+            plans.addAll(((POLocalRearrange)op).getPlans());
+        }
+        else if(op instanceof POFRJoin) {
+            POFRJoin frj = (POFRJoin)op;
+            List<List<PhysicalPlan>> joinPlans = frj.getJoinPlans();
+            if(joinPlans!=null) {
+                for (List<PhysicalPlan> list : joinPlans) {
+                    plans.addAll(list);
+                }
+            }
+        }
+
+        return plans;
+    }
+}

Modified: hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhysicalPlan.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhysicalPlan.java?rev=747660&r1=747659&r2=747660&view=diff
==============================================================================
--- hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhysicalPlan.java (original)
+++ hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhysicalPlan.java Wed Feb 25 05:18:47 2009
@@ -20,6 +20,7 @@
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
+import java.io.PrintStream;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
@@ -67,9 +68,20 @@
      * into the given output stream
      * @param out : OutputStream to which the visual representation is written
      */
-    public void explain(OutputStream out){
+    public void explain(OutputStream out) {
+        explain(out, true);
+    }
+
+    /**
+     * Write a visual representation of the Physical Plan
+     * into the given output stream
+     * @param out : OutputStream to which the visual representation is written
+     * @param verbose : Whether to include nested plans in the output
+     */
+    public void explain(OutputStream out, boolean verbose){
         PlanPrinter<PhysicalOperator, PhysicalPlan> mpp = new PlanPrinter<PhysicalOperator, PhysicalPlan>(
                 this);
+        mpp.setVerbose(verbose);
 
         try {
             mpp.print(out);
@@ -81,6 +93,28 @@
             e.printStackTrace();
         }
     }
+
+    /**
+     * Write a visual representation of the Physical Plan
+     * into the given print stream
+     * @param ps : PrintStream to which the visual representation is written
+     * @param format : Output format, either "text" or "dot"
+     * @param verbose : Whether to include nested plans in the output
+     */
+    public void explain(PrintStream ps, String format, boolean verbose) {
+        ps.println("#-----------------------------------------------");
+        ps.println("# Physical Plan:");
+        ps.println("#-----------------------------------------------");
+
+        if (format.equals("text")) {
+            explain((OutputStream)ps, verbose);
+        } else if (format.equals("dot")) {
+            DotPOPrinter pp = new DotPOPrinter(this, ps);
+            pp.setVerbose(verbose);
+            pp.dump();
+        }
+        ps.println("");
+    }
     
     @Override
     public void connect(PhysicalOperator from, PhysicalOperator to)
@@ -162,7 +196,7 @@
             return "Empty Plan!";
         else{
             ByteArrayOutputStream baos = new ByteArrayOutputStream();
-            explain(baos);
+            explain(baos, true);
             return baos.toString();
         }
     }
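
A quick sketch of how the new overload can be driven (illustrative only;
plan stands for an existing PhysicalPlan instance):

    plan.explain(System.out, "text", true);   // verbose textual plan via PlanPrinter
    plan.explain(System.out, "dot", false);   // compact DOT graph via DotPOPrinter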

Modified: hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PlanPrinter.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PlanPrinter.java?rev=747660&r1=747659&r2=747660&view=diff
==============================================================================
--- hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PlanPrinter.java (original)
+++ hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PlanPrinter.java Wed Feb 25 05:18:47 2009
@@ -52,6 +52,8 @@
     
     PrintStream stream = System.out;
 
+    boolean isVerbose = true;
+
     public PlanPrinter(P plan) {
         super(plan, new DepthFirstWalker<O, P>(plan));
     }
@@ -61,6 +63,10 @@
         this.stream = stream;
     }
 
+    public void setVerbose(boolean verbose) {
+        isVerbose = verbose;
+    }
+
     @Override
     public void visit() throws VisitorException {
         try {
@@ -119,7 +125,7 @@
         StringBuilder sb = new StringBuilder();
         ByteArrayOutputStream baos = new ByteArrayOutputStream();
         if(pp!=null)
-            pp.explain(baos);
+            pp.explain(baos, isVerbose);
         else
             return "";
         sb.append(USep);
@@ -138,25 +144,27 @@
 
     private String depthFirst(O node) throws VisitorException {
         StringBuilder sb = new StringBuilder(node.name() + "\n");
-        if(node instanceof POFilter){
+        if (isVerbose) {
+          if(node instanceof POFilter){
             sb.append(planString(((POFilter)node).getPlan()));
-        }
-        else if(node instanceof POLocalRearrange){
+          }
+          else if(node instanceof POLocalRearrange){
             sb.append(planString(((POLocalRearrange)node).getPlans()));
-        }
-        else if(node instanceof POSort){
+          }
+          else if(node instanceof POSort){
             sb.append(planString(((POSort)node).getSortPlans())); 
-        }
-        else if(node instanceof POForEach){
+          }
+          else if(node instanceof POForEach){
             sb.append(planString(((POForEach)node).getInputPlans()));
-        }
-        else if(node instanceof POFRJoin){
+          }
+          else if(node instanceof POFRJoin){
             POFRJoin frj = (POFRJoin)node;
             List<List<PhysicalPlan>> joinPlans = frj.getJoinPlans();
             if(joinPlans!=null)
-                for (List<PhysicalPlan> list : joinPlans) {
-                    sb.append(planString(list));
-                }
+              for (List<PhysicalPlan> list : joinPlans) {
+                sb.append(planString(list));
+              }
+          }
         }
         
         List<O> originalPredecessors = mPlan.getPredecessors(node);

Modified: hadoop/pig/branches/multiquery/src/org/apache/pig/backend/local/executionengine/LocalExecutionEngine.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/src/org/apache/pig/backend/local/executionengine/LocalExecutionEngine.java?rev=747660&r1=747659&r2=747660&view=diff
==============================================================================
--- hadoop/pig/branches/multiquery/src/org/apache/pig/backend/local/executionengine/LocalExecutionEngine.java (original)
+++ hadoop/pig/branches/multiquery/src/org/apache/pig/backend/local/executionengine/LocalExecutionEngine.java Wed Feb 25 05:18:47 2009
@@ -184,17 +184,15 @@
         throw new UnsupportedOperationException();
     }
 
-    public void explain(PhysicalPlan plan, PrintStream stream) {
+    public void explain(PhysicalPlan plan, PrintStream stream, 
+                        String format, boolean isVerbose) {
         try {
-            PlanPrinter printer = new PlanPrinter(plan, stream);
-            printer.visit();
-            stream.println();
-
             ExecTools.checkLeafIsStore(plan, pigContext);
 
             // LocalLauncher launcher = new LocalLauncher();
             LocalPigLauncher launcher = new LocalPigLauncher();
-            launcher.explain(plan, pigContext, stream);
+            launcher.explain(plan, pigContext, stream, 
+                             format, isVerbose);
         } catch (Exception ve) {
             throw new RuntimeException(ve);
         }

Modified: hadoop/pig/branches/multiquery/src/org/apache/pig/backend/local/executionengine/LocalPigLauncher.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/src/org/apache/pig/backend/local/executionengine/LocalPigLauncher.java?rev=747660&r1=747659&r2=747660&view=diff
==============================================================================
--- hadoop/pig/branches/multiquery/src/org/apache/pig/backend/local/executionengine/LocalPigLauncher.java (original)
+++ hadoop/pig/branches/multiquery/src/org/apache/pig/backend/local/executionengine/LocalPigLauncher.java Wed Feb 25 05:18:47 2009
@@ -41,10 +41,10 @@
     Log log = LogFactory.getLog(getClass());
 
     @Override
-    public void explain(PhysicalPlan pp, PigContext pc, PrintStream ps)
-            throws PlanException, VisitorException, IOException {
-        // TODO Auto-generated method stub
-        pp.explain(ps);
+    public void explain(PhysicalPlan pp, PigContext pc, PrintStream ps,
+                        String format, boolean isVerbose)
+        throws PlanException, VisitorException, IOException {
+        pp.explain(ps, format, isVerbose);
         ps.append('\n');
     }
 

Added: hadoop/pig/branches/multiquery/src/org/apache/pig/impl/logicalLayer/DotLOPrinter.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/src/org/apache/pig/impl/logicalLayer/DotLOPrinter.java?rev=747660&view=auto
==============================================================================
--- hadoop/pig/branches/multiquery/src/org/apache/pig/impl/logicalLayer/DotLOPrinter.java (added)
+++ hadoop/pig/branches/multiquery/src/org/apache/pig/impl/logicalLayer/DotLOPrinter.java Wed Feb 25 05:18:47 2009
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.impl.logicalLayer;
+
+import java.io.PrintStream;
+import java.util.List;
+import java.util.LinkedList;
+import java.util.Collection;
+import org.apache.pig.impl.util.MultiMap;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.pig.impl.plan.DotPlanDumper;
+import org.apache.pig.impl.plan.Operator;
+
+/**
+ * This class can print a logical plan in the DOT format. It uses
+ * clusters to illustrate nesting. If "verbose" is off, it will skip
+ * any nesting.
+ */
+public class DotLOPrinter extends DotPlanDumper<LogicalOperator, LogicalPlan,
+                                  LogicalOperator, LogicalPlan> {
+
+    public DotLOPrinter(LogicalPlan plan, PrintStream ps) {
+        this(plan, ps, false, new HashSet<Operator>(), new HashSet<Operator>());
+    }
+
+    private DotLOPrinter(LogicalPlan plan, PrintStream ps, boolean isSubGraph,
+                         Set<Operator> subgraphs, 
+                         Set<Operator> multiInSubgraphs) {
+        super(plan, ps, isSubGraph, subgraphs, multiInSubgraphs);
+    }
+
+    @Override
+    protected DotPlanDumper makeDumper(LogicalPlan plan, PrintStream ps) {
+        return new DotLOPrinter(plan, ps, true, mSubgraphs, 
+                                mMultiInputSubgraphs);
+    }
+
+    @Override
+    protected String getName(LogicalOperator op) {
+        String info = (op.name().split("\\d+-\\d+"))[0];
+        if (op instanceof LOProject) {
+            LOProject pr = (LOProject)op;
+            info += pr.isStar()?" [*]": pr.getProjection();
+        }
+        return info;
+    }
+
+    @Override
+    protected String[] getAttributes(LogicalOperator op) {
+        if (op instanceof LOStore || op instanceof LOLoad) {
+            String[] attributes = new String[3];
+            attributes[0] = "label=\""+getName(op).replace(":",",\\n")+"\"";
+            attributes[1] = "style=\"filled\"";
+            attributes[2] = "fillcolor=\"gray\"";
+            return attributes;
+        }
+        else {
+            return super.getAttributes(op);
+        }
+    }
+
+    @Override
+    protected MultiMap<LogicalOperator, LogicalPlan> 
+        getMultiInputNestedPlans(LogicalOperator op) {
+        
+        if(op instanceof LOCogroup){
+            return  ((LOCogroup)op).getGroupByPlans();
+        }
+        else if(op instanceof LOFRJoin){
+            return ((LOFRJoin)op).getJoinColPlans();
+        }
+        return new MultiMap<LogicalOperator, LogicalPlan>();
+    }
+
+    @Override
+    protected Collection<LogicalPlan> getNestedPlans(LogicalOperator op) {
+        Collection<LogicalPlan> plans = new LinkedList<LogicalPlan>();
+
+        if(op instanceof LOFilter){
+            plans.add(((LOFilter)op).getComparisonPlan());
+        }
+        else if(op instanceof LOForEach){
+            plans.addAll(((LOForEach)op).getForEachPlans());
+        }
+        else if(op instanceof LOGenerate){
+            plans.addAll(((LOGenerate)op).getGeneratePlans());
+        }
+        else if(op instanceof LOSort){
+            plans.addAll(((LOSort)op).getSortColPlans()); 
+        }
+        else if(op instanceof LOSplitOutput){
+            plans.add(((LOSplitOutput)op).getConditionPlan());
+        }
+        
+        return plans;
+    }
+}
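
Usage mirrors the physical-plan printer and is exactly what the "dot"
branch of LogicalPlan.explain below does (sketch only; lp stands for a
built LogicalPlan):

    DotLOPrinter lpp = new DotLOPrinter(lp, System.out);
    lpp.setVerbose(true);   // render nested plans as clusters
    lpp.dump();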

Modified: hadoop/pig/branches/multiquery/src/org/apache/pig/impl/logicalLayer/LOPrinter.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/src/org/apache/pig/impl/logicalLayer/LOPrinter.java?rev=747660&r1=747659&r2=747660&view=diff
==============================================================================
--- hadoop/pig/branches/multiquery/src/org/apache/pig/impl/logicalLayer/LOPrinter.java (original)
+++ hadoop/pig/branches/multiquery/src/org/apache/pig/impl/logicalLayer/LOPrinter.java Wed Feb 25 05:18:47 2009
@@ -46,6 +46,7 @@
     private String USep = "|   |\n|   ";
     private int levelCntr = -1;
     private OutputStream printer;
+    private boolean isVerbose = true;
 
     /**
      * @param ps PrintStream to output plan information to
@@ -66,6 +67,10 @@
         }
     }
 
+    public void setVerbose(boolean verbose) {
+        isVerbose = verbose;
+    }
+
     public void print(OutputStream printer) throws VisitorException, IOException {
         this.printer = printer;
         printer.write(depthFirstLP().getBytes());
@@ -114,55 +119,58 @@
             try {
                 sb.append(((ExpressionOperator)node).getFieldSchema());
             } catch (Exception e) {
-                //sb.append("Caught Exception: " + e.getMessage());
+                sb.append("Caught Exception: " + e.getMessage());
             }
         } else {
             sb.append(" Schema: ");
             try {
                 sb.append(node.getSchema());
             } catch (Exception e) {
-                //sb.append("Caught exception: " + e.getMessage());
+                sb.append("Caught exception: " + e.getMessage());
             }
         }
         sb.append(" Type: " + DataType.findTypeName(node.getType()));
         sb.append("\n");
-        if(node instanceof LOFilter){
-            sb.append(planString(((LOFilter)node).getComparisonPlan()));
-        }
-        else if(node instanceof LOForEach){
-            sb.append(planString(((LOForEach)node).getForEachPlans()));        
-        }
-        else if(node instanceof LOGenerate){
-            sb.append(planString(((LOGenerate)node).getGeneratePlans())); 
-            
-        }
-        else if(node instanceof LOCogroup){
-            MultiMap<LogicalOperator, LogicalPlan> plans = ((LOCogroup)node).getGroupByPlans();
-            for (LogicalOperator lo : plans.keySet()) {
-                // Visit the associated plans
-                for (LogicalPlan plan : plans.get(lo)) {
-                    sb.append(planString(plan));
+
+        if (isVerbose) {
+            if(node instanceof LOFilter){
+                sb.append(planString(((LOFilter)node).getComparisonPlan()));
+            }
+            else if(node instanceof LOForEach){
+                sb.append(planString(((LOForEach)node).getForEachPlans()));        
+            }
+            else if(node instanceof LOGenerate){
+                sb.append(planString(((LOGenerate)node).getGeneratePlans())); 
+                
+            }
+            else if(node instanceof LOCogroup){
+                MultiMap<LogicalOperator, LogicalPlan> plans = ((LOCogroup)node).getGroupByPlans();
+                for (LogicalOperator lo : plans.keySet()) {
+                    // Visit the associated plans
+                    for (LogicalPlan plan : plans.get(lo)) {
+                        sb.append(planString(plan));
+                    }
                 }
             }
-        }
-        else if(node instanceof LOFRJoin){
-            MultiMap<LogicalOperator, LogicalPlan> plans = ((LOFRJoin)node).getJoinColPlans();
-            for (LogicalOperator lo : plans.keySet()) {
-                // Visit the associated plans
-                for (LogicalPlan plan : plans.get(lo)) {
-                    sb.append(planString(plan));
+            else if(node instanceof LOFRJoin){
+                MultiMap<LogicalOperator, LogicalPlan> plans = ((LOFRJoin)node).getJoinColPlans();
+                for (LogicalOperator lo : plans.keySet()) {
+                    // Visit the associated plans
+                    for (LogicalPlan plan : plans.get(lo)) {
+                        sb.append(planString(plan));
+                    }
                 }
             }
-        }
-        else if(node instanceof LOSort){
-            sb.append(planString(((LOSort)node).getSortColPlans())); 
-        }
-        else if(node instanceof LOSplitOutput){
-            sb.append(planString(((LOSplitOutput)node).getConditionPlan()));
-        }
-        else if (node instanceof LOProject) {
-            sb.append("Input: ");
-            sb.append(((LOProject)node).getExpression().name());
+            else if(node instanceof LOSort){
+                sb.append(planString(((LOSort)node).getSortColPlans())); 
+            }
+            else if(node instanceof LOSplitOutput){
+                sb.append(planString(((LOSplitOutput)node).getConditionPlan()));
+            }
+            else if (node instanceof LOProject) {
+                sb.append("Input: ");
+                sb.append(((LOProject)node).getExpression().name());
+            }
         }
         
         List<LogicalOperator> originalPredecessors =  mPlan.getPredecessors(node);

Modified: hadoop/pig/branches/multiquery/src/org/apache/pig/impl/logicalLayer/LogicalPlan.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/src/org/apache/pig/impl/logicalLayer/LogicalPlan.java?rev=747660&r1=747659&r2=747660&view=diff
==============================================================================
--- hadoop/pig/branches/multiquery/src/org/apache/pig/impl/logicalLayer/LogicalPlan.java (original)
+++ hadoop/pig/branches/multiquery/src/org/apache/pig/impl/logicalLayer/LogicalPlan.java Wed Feb 25 05:18:47 2009
@@ -58,6 +58,24 @@
 
         lpp.print(out);
     }
+
+    public void explain(PrintStream ps, String format, boolean verbose) 
+        throws VisitorException, IOException {
+        ps.println("#-----------------------------------------------");
+        ps.println("# Logical Plan:");
+        ps.println("#-----------------------------------------------");
+                
+        if (format.equals("text")) {
+            LOPrinter lpp = new LOPrinter(ps, this);
+            lpp.setVerbose(verbose);
+            lpp.visit();
+        } else if (format.equals("dot")) {
+            DotLOPrinter lpp = new DotLOPrinter(this, ps);
+            lpp.setVerbose(verbose);
+            lpp.dump();
+        }
+        ps.println("");
+    }
     
 //    public String toString() {
 //        if(mOps.size() == 0)
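
On the logical side the new explain dispatches the same way (sketch only;
lp stands for an already-built LogicalPlan):

    lp.explain(System.out, "text", true);   // banner plus verbose LOPrinter output
    lp.explain(System.out, "dot", false);   // banner plus DOT graph via DotLOPrinter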

Added: hadoop/pig/branches/multiquery/src/org/apache/pig/impl/plan/DotPlanDumper.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/src/org/apache/pig/impl/plan/DotPlanDumper.java?rev=747660&view=auto
==============================================================================
--- hadoop/pig/branches/multiquery/src/org/apache/pig/impl/plan/DotPlanDumper.java (added)
+++ hadoop/pig/branches/multiquery/src/org/apache/pig/impl/plan/DotPlanDumper.java Wed Feb 25 05:18:47 2009
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.impl.plan;
+
+import java.io.PrintStream;
+import java.util.LinkedList;
+import java.util.Collection;
+import java.util.Set;
+import java.util.HashSet;
+
+import org.apache.pig.impl.util.MultiMap;
+
+/**
+ * This class provides everything needed to dump a plan in a format
+ * readable by graphviz's dot tool. Out of the box it does not print
+ * any nested plans.
+ */
+public class DotPlanDumper<E extends Operator, P extends OperatorPlan<E>, 
+                           N extends Operator, S extends OperatorPlan<N>> 
+    extends PlanDumper<E, P, S> {
+
+    protected Set<Operator> mSubgraphs;
+    protected Set<Operator> mMultiInputSubgraphs;    
+    private boolean isSubGraph = false;
+  
+    public DotPlanDumper(P plan, PrintStream ps) {
+        this(plan, ps, false, new HashSet<Operator>(), new HashSet<Operator>());
+    }
+
+    protected DotPlanDumper(P plan, PrintStream ps, boolean isSubGraph, 
+                            Set<Operator> mSubgraphs, 
+                            Set<Operator> mMultiInputSubgraphs) {
+        super(plan, ps);
+        this.isSubGraph = isSubGraph;
+        this.mSubgraphs = mSubgraphs;
+        this.mMultiInputSubgraphs = mMultiInputSubgraphs;
+    }
+
+    @Override
+    public void dump() {
+        if (!isSubGraph) {
+            ps.println("digraph plan {");
+            ps.println("compound=true;");
+            ps.println("node [shape=rect];");
+        }
+        super.dump();
+        if (!isSubGraph) {
+            ps.println("}");
+        }
+    }
+
+    @Override
+    protected void dumpMultiInputNestedOperator(E op, MultiMap<E, S> plans) {
+        dumpInvisibleOutput(op);
+
+        ps.print("subgraph ");
+        ps.print(getClusterID(op));
+        ps.println(" {");
+        join("; ", getAttributes(op));
+        ps.println("labelloc=b;");
+        
+        mMultiInputSubgraphs.add(op);
+
+        for (E o: plans.keySet()) {
+            ps.print("subgraph ");
+            ps.print(getClusterID(op, o));
+            ps.println(" {");
+            ps.println("label=\"\";");
+            dumpInvisibleInput(op, o);
+            for (S plan : plans.get(o)) {
+                PlanDumper dumper = makeDumper(plan, ps);
+                dumper.dump();
+                connectInvisibleInput(op, o, plan);
+            }
+            ps.println("};");
+        }
+        ps.println("};");
+        
+        for (E o: plans.keySet()) {
+            for (S plan: plans.get(o)) {
+                connectInvisibleOutput(op, plan);
+            }
+        }
+    }
+
+    @Override
+    protected void dumpNestedOperator(E op, Collection<S> plans) {
+        dumpInvisibleOperators(op);
+        ps.print("subgraph ");
+        ps.print(getClusterID(op));
+        ps.println(" {");
+        join("; ", getAttributes(op));
+        ps.println("labelloc=b;");
+
+        mSubgraphs.add(op);
+        
+        for (S plan: plans) {
+            PlanDumper dumper = makeDumper(plan, ps);
+            dumper.dump();
+            connectInvisibleInput(op, plan);
+        }
+        ps.println("};");
+
+        for (S plan: plans) {
+            connectInvisibleOutput(op, plan);
+        }
+    }
+
+    @Override
+    protected void dumpOperator(E op) {
+        ps.print(getID(op));
+        ps.print(" [");
+        join(", ", getAttributes(op));
+        ps.println("];");
+    }
+
+    @Override
+    protected void dumpEdge(E op, E suc) {
+        String in = getID(op);
+        String out = getID(suc);
+        String attributes = "";
+
+        if (mMultiInputSubgraphs.contains(op) || mSubgraphs.contains(op)) {
+            in = getSubgraphID(op, false);
+        }
+
+        ps.print(in);
+
+        if (mMultiInputSubgraphs.contains(suc)) {
+            out = getSubgraphID(suc, op, true);
+            attributes = " [lhead="+getClusterID(suc,op)+"]";
+        }
+
+        if (mSubgraphs.contains(suc)) {
+            out = getSubgraphID(suc, true);
+            attributes = " [lhead="+getClusterID(suc)+"]";
+        }
+        
+        ps.print(" -> ");
+        ps.print(out);
+        ps.println(attributes);
+    }
+
+    @Override
+    protected PlanDumper makeDumper(S plan, PrintStream ps) {
+        return new DotPlanDumper(plan, ps, true, 
+                                 mSubgraphs, mMultiInputSubgraphs);
+    }
+
+    /**
+     * Used to generate the label for an operator.
+     * @param op operator to dump
+     */
+    protected String getName(E op) {
+        return op.name();
+    }
+    
+    /**
+     * Used to generate the attributes of a node.
+     * @param op operator whose attributes are generated
+     */
+    protected String[] getAttributes(E op) {
+        String[] attributes = new String[1];
+        attributes[0] =  "label=\""+getName(op)+"\"";
+        return attributes;
+    }
+
+
+    private void connectInvisibleInput(E op1, E op2, S plan) {
+        String in = getSubgraphID(op1, op2, true);
+        
+        for (N l: plan.getRoots()) {
+            dumpInvisibleEdge(in, getID(l));
+        }
+    }
+
+    private void connectInvisibleInput(E op, S plan) {
+        String in = getSubgraphID(op, true);
+
+        for (N l: plan.getRoots()) {
+            String out;
+            if (mSubgraphs.contains(l) || mMultiInputSubgraphs.contains(l)) {
+                out = getSubgraphID(l, true);
+            } else {
+                out = getID(l);
+            }
+
+            dumpInvisibleEdge(in, out);
+        }
+    }
+
+    private void connectInvisibleOutput(E op, S plan) {
+        String out = getSubgraphID(op, false);
+
+        for (N l: plan.getLeaves()) {
+            String in;
+            if (mSubgraphs.contains(l) || mMultiInputSubgraphs.contains(l)) {
+                in = getSubgraphID(l, false);
+            } else {
+                in = getID(l);
+            }
+
+            dumpInvisibleEdge(in, out);
+        }
+    }
+
+    private void connectInvisible(E op, S plan) {
+        connectInvisibleInput(op, plan);
+        connectInvisibleOutput(op, plan);
+    }        
+
+    private void dumpInvisibleInput(E op1, E op2) {
+        ps.print(getSubgraphID(op1, op2, true));
+        ps.print(" ");
+        ps.print(getInvisibleAttributes(op1));
+        ps.println(";");
+    }
+    
+    private void dumpInvisibleInput(E op) {
+        ps.print(getSubgraphID(op, true));
+        ps.print(" ");
+        ps.print(getInvisibleAttributes(op));
+        ps.println(";");
+    }
+
+    private void dumpInvisibleOutput(E op) {
+        ps.print(getSubgraphID(op, false));
+        ps.print(" ");
+        ps.print(getInvisibleAttributes(op));
+        ps.println(";");
+    }
+
+    protected void dumpInvisibleOperators(E op) {
+        dumpInvisibleInput(op);
+        dumpInvisibleOutput(op);
+    }
+
+    private String getClusterID(Operator op1, Operator op2) {
+        return getClusterID(op1)+"_"+getID(op2);
+    }
+
+    private String getClusterID(Operator op) {
+        return "cluster_"+getID(op);
+    }
+
+    private String getSubgraphID(Operator op1, Operator op2, boolean in) {
+        String id = "s"+getID(op1)+"_"+getID(op2);
+        if (in) {
+            id += "_in";
+        }
+        else {
+            id += "_out";
+        }
+        return id;
+    }
+
+    private String getSubgraphID(Operator op, boolean in) {
+        String id =  "s"+getID(op);
+        if (in) {
+            id += "_in";
+        }
+        else {
+            id += "_out";
+        }
+        return id;
+    }
+
+    private String getID(Operator op) {
+        return ""+Math.abs(op.hashCode());
+    }
+
+    private String getInvisibleAttributes(Operator op) {
+        return "[label=\"\", style=invis, height=0, width=0]";
+    }
+    
+    private void dumpInvisibleEdge(String op, String suc) {
+        ps.print(op);
+        ps.print(" -> ");
+        ps.print(suc);
+        ps.println(" [style=invis];");
+    }
+}
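
Putting the pieces together, the text that dump() emits for a trivial
two-operator plan would look roughly as follows (node ids and labels are
hypothetical; real ids come from Math.abs(op.hashCode()), and the exact
ordering of node and edge lines depends on the PlanDumper base class):

    digraph plan {
    compound=true;
    node [shape=rect];
    1129574818 [label="Load"];
    2005435486 [label="Store"];
    1129574818 -> 2005435486
    }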