Posted to commits@pig.apache.org by pr...@apache.org on 2009/04/23 18:57:20 UTC

svn commit: r767974 [1/4] - in /hadoop/pig/trunk: ./ src/org/apache/pig/ src/org/apache/pig/backend/executionengine/ src/org/apache/pig/backend/executionengine/util/ src/org/apache/pig/backend/hadoop/datastorage/ src/org/apache/pig/backend/hadoop/execu...

Author: pradeepkth
Date: Thu Apr 23 16:57:16 2009
New Revision: 767974

URL: http://svn.apache.org/viewvc?rev=767974&view=rev
Log:
PIG-627: multiquery support phase 1 and phase 2 (hagleitn and Richard Ding via pradeepkth)
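
For context, multiquery batches several STOREs so the optimizer can merge map-reduce jobs that share work. A minimal sketch of the new PigServer batch API introduced in this commit (script contents and paths are hypothetical):

    import java.util.Properties;
    import org.apache.pig.ExecType;
    import org.apache.pig.PigServer;

    public class MultiQueryExample {
        public static void main(String[] args) throws Exception {
            PigServer pigServer = new PigServer(ExecType.MAPREDUCE, new Properties());
            pigServer.setBatchOn();
            // Two stores sharing one load; with multiquery on, both can
            // be computed by a single map-reduce job.
            pigServer.registerQuery("A = load 'input' as (name:chararray, age:int);");
            pigServer.registerQuery("store A into 'out1';");
            pigServer.registerQuery("B = filter A by age > 18;");
            pigServer.registerQuery("store B into 'out2';");
            pigServer.executeBatch();
        }
    }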

Added:
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReducePOStoreImpl.java
      - copied unchanged from r767731, hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReducePOStoreImpl.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java
      - copied unchanged from r767731, hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/NoopFilterRemover.java
      - copied unchanged from r767731, hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/NoopFilterRemover.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/NoopStoreRemover.java
      - copied unchanged from r767731, hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/NoopStoreRemover.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PhyPlanSetter.java
      - copied unchanged from r767731, hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PhyPlanSetter.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/DotMRPrinter.java
      - copied unchanged from r767731, hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/DotMRPrinter.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/DotPOPrinter.java
      - copied unchanged from r767731, hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/DotPOPrinter.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStoreImpl.java
      - copied unchanged from r767731, hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStoreImpl.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/util/PlanHelper.java
      - copied unchanged from r767731, hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/util/PlanHelper.java
    hadoop/pig/trunk/src/org/apache/pig/backend/local/executionengine/LocalPOStoreImpl.java
      - copied unchanged from r767731, hadoop/pig/branches/multiquery/src/org/apache/pig/backend/local/executionengine/LocalPOStoreImpl.java
    hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/DotLOPrinter.java
      - copied unchanged from r767731, hadoop/pig/branches/multiquery/src/org/apache/pig/impl/logicalLayer/DotLOPrinter.java
    hadoop/pig/trunk/src/org/apache/pig/impl/plan/DotPlanDumper.java
      - copied unchanged from r767731, hadoop/pig/branches/multiquery/src/org/apache/pig/impl/plan/DotPlanDumper.java
    hadoop/pig/trunk/src/org/apache/pig/impl/plan/PlanDumper.java
      - copied unchanged from r767731, hadoop/pig/branches/multiquery/src/org/apache/pig/impl/plan/PlanDumper.java
    hadoop/pig/trunk/src/org/apache/pig/impl/plan/ReverseDependencyOrderWalker.java
      - copied unchanged from r767731, hadoop/pig/branches/multiquery/src/org/apache/pig/impl/plan/ReverseDependencyOrderWalker.java
    hadoop/pig/trunk/test/org/apache/pig/test/TestMultiQuery.java
      - copied unchanged from r767731, hadoop/pig/branches/multiquery/test/org/apache/pig/test/TestMultiQuery.java
    hadoop/pig/trunk/test/org/apache/pig/test/TestMultiQueryLocal.java
      - copied unchanged from r767731, hadoop/pig/branches/multiquery/test/org/apache/pig/test/TestMultiQueryLocal.java
    hadoop/pig/trunk/test/org/apache/pig/test/data/passwd
      - copied unchanged from r767731, hadoop/pig/branches/multiquery/test/org/apache/pig/test/data/passwd
    hadoop/pig/trunk/test/org/apache/pig/test/data/passwd2
      - copied unchanged from r767731, hadoop/pig/branches/multiquery/test/org/apache/pig/test/data/passwd2
    hadoop/pig/trunk/test/org/apache/pig/test/data/test.ppf
      - copied unchanged from r767731, hadoop/pig/branches/multiquery/test/org/apache/pig/test/data/test.ppf
    hadoop/pig/trunk/test/org/apache/pig/test/data/test_broken.ppf
      - copied unchanged from r767731, hadoop/pig/branches/multiquery/test/org/apache/pig/test/data/test_broken.ppf
    hadoop/pig/trunk/test/org/apache/pig/test/data/testsub.pig
      - copied unchanged from r767731, hadoop/pig/branches/multiquery/test/org/apache/pig/test/data/testsub.pig
    hadoop/pig/trunk/test/org/apache/pig/test/data/testsubnested_exec.pig
      - copied unchanged from r767731, hadoop/pig/branches/multiquery/test/org/apache/pig/test/data/testsubnested_exec.pig
    hadoop/pig/trunk/test/org/apache/pig/test/data/testsubnested_run.pig
      - copied unchanged from r767731, hadoop/pig/branches/multiquery/test/org/apache/pig/test/data/testsubnested_run.pig
Removed:
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/LocalLauncher.java
Modified:
    hadoop/pig/trunk/   (props changed)
    hadoop/pig/trunk/CHANGES.txt
    hadoop/pig/trunk/src/org/apache/pig/Main.java
    hadoop/pig/trunk/src/org/apache/pig/PigServer.java
    hadoop/pig/trunk/src/org/apache/pig/backend/executionengine/ExecutionEngine.java
    hadoop/pig/trunk/src/org/apache/pig/backend/executionengine/util/ExecTools.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/datastorage/HPath.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/Launcher.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/MRPrinter.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhysicalPlan.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PlanPrinter.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSplit.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStore.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java
    hadoop/pig/trunk/src/org/apache/pig/backend/local/executionengine/LocalExecutionEngine.java
    hadoop/pig/trunk/src/org/apache/pig/backend/local/executionengine/LocalPigLauncher.java
    hadoop/pig/trunk/src/org/apache/pig/builtin/BinStorage.java
    hadoop/pig/trunk/src/org/apache/pig/impl/PigContext.java
    hadoop/pig/trunk/src/org/apache/pig/impl/io/FileLocalizer.java
    hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOPrinter.java
    hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOStore.java
    hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LogicalPlan.java
    hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LogicalPlanBuilder.java
    hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/ImplicitSplitInserter.java
    hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt
    hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/validators/LogicalPlanValidationExecutor.java
    hadoop/pig/trunk/src/org/apache/pig/impl/plan/OperatorPlan.java
    hadoop/pig/trunk/src/org/apache/pig/tools/grunt/GruntParser.java
    hadoop/pig/trunk/src/org/apache/pig/tools/pigscript/parser/PigScriptParser.jj
    hadoop/pig/trunk/test/org/apache/pig/test/MiniCluster.java
    hadoop/pig/trunk/test/org/apache/pig/test/RangeSlicer.java
    hadoop/pig/trunk/test/org/apache/pig/test/TestCombiner.java
    hadoop/pig/trunk/test/org/apache/pig/test/TestCustomSlicer.java
    hadoop/pig/trunk/test/org/apache/pig/test/TestFilterOpString.java
    hadoop/pig/trunk/test/org/apache/pig/test/TestGrunt.java
    hadoop/pig/trunk/test/org/apache/pig/test/TestHBaseStorage.java
    hadoop/pig/trunk/test/org/apache/pig/test/TestJobSubmission.java
    hadoop/pig/trunk/test/org/apache/pig/test/TestLoad.java
    hadoop/pig/trunk/test/org/apache/pig/test/TestLocalJobSubmission.java
    hadoop/pig/trunk/test/org/apache/pig/test/TestLocalPOSplit.java
    hadoop/pig/trunk/test/org/apache/pig/test/TestLogToPhyCompiler.java
    hadoop/pig/trunk/test/org/apache/pig/test/TestLogicalPlanBuilder.java
    hadoop/pig/trunk/test/org/apache/pig/test/TestMRCompiler.java
    hadoop/pig/trunk/test/org/apache/pig/test/TestPigScriptParser.java
    hadoop/pig/trunk/test/org/apache/pig/test/TestStore.java
    hadoop/pig/trunk/test/org/apache/pig/test/TestUnion.java
    hadoop/pig/trunk/test/org/apache/pig/test/data/GoldenFiles/MRC1.gld
    hadoop/pig/trunk/test/org/apache/pig/test/data/GoldenFiles/MRC10.gld
    hadoop/pig/trunk/test/org/apache/pig/test/data/GoldenFiles/MRC11.gld
    hadoop/pig/trunk/test/org/apache/pig/test/data/GoldenFiles/MRC12.gld
    hadoop/pig/trunk/test/org/apache/pig/test/data/GoldenFiles/MRC13.gld
    hadoop/pig/trunk/test/org/apache/pig/test/data/GoldenFiles/MRC14.gld
    hadoop/pig/trunk/test/org/apache/pig/test/data/GoldenFiles/MRC15.gld
    hadoop/pig/trunk/test/org/apache/pig/test/data/GoldenFiles/MRC16.gld
    hadoop/pig/trunk/test/org/apache/pig/test/data/GoldenFiles/MRC17.gld
    hadoop/pig/trunk/test/org/apache/pig/test/data/GoldenFiles/MRC2.gld
    hadoop/pig/trunk/test/org/apache/pig/test/data/GoldenFiles/MRC3.gld
    hadoop/pig/trunk/test/org/apache/pig/test/data/GoldenFiles/MRC5.gld
    hadoop/pig/trunk/test/org/apache/pig/test/data/GoldenFiles/MRC6.gld
    hadoop/pig/trunk/test/org/apache/pig/test/data/GoldenFiles/MRC7.gld
    hadoop/pig/trunk/test/org/apache/pig/test/data/GoldenFiles/MRC8.gld
    hadoop/pig/trunk/test/org/apache/pig/test/data/GoldenFiles/MRC9.gld
    hadoop/pig/trunk/test/org/apache/pig/test/utils/GenPhyOp.java
    hadoop/pig/trunk/test/org/apache/pig/test/utils/LogicalPlanTester.java
    hadoop/pig/trunk/test/org/apache/pig/test/utils/dotGraph/DOTParser.jjt   (props changed)

Propchange: hadoop/pig/trunk/
------------------------------------------------------------------------------
    svn:mergeinfo = /hadoop/pig/branches/multiquery:741727-767731

Modified: hadoop/pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=767974&r1=767973&r2=767974&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Thu Apr 23 16:57:16 2009
@@ -595,3 +595,4 @@
 
     PIG-284: target for building source jar (oae via olgan)
 
+    PIG-627: multiquery support phase 1 and phase 2 (hagleitn and Richard Ding via pradeepkth)

Modified: hadoop/pig/trunk/src/org/apache/pig/Main.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/Main.java?rev=767974&r1=767973&r2=767974&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/Main.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/Main.java Thu Apr 23 16:57:16 2009
@@ -99,6 +99,7 @@
         opts.registerOpt('m', "param_file", CmdLineParser.ValueExpected.OPTIONAL);
         opts.registerOpt('o', "hod", CmdLineParser.ValueExpected.NOT_ACCEPTED);
         opts.registerOpt('p', "param", CmdLineParser.ValueExpected.OPTIONAL);
+        opts.registerOpt('M', "no_multiquery", CmdLineParser.ValueExpected.OPTIONAL);
         opts.registerOpt('r', "dryrun", CmdLineParser.ValueExpected.NOT_ACCEPTED);
         opts.registerOpt('t', "optimizer_off", CmdLineParser.ValueExpected.REQUIRED);
         opts.registerOpt('v', "verbose", CmdLineParser.ValueExpected.NOT_ACCEPTED);
@@ -121,6 +122,8 @@
         //by default warning aggregation is on
         properties.setProperty("aggregate.warning", ""+true);
 
+        properties.setProperty("opt.multiquery", ""+true);
+
         char opt;
         while ((opt = opts.getNextOpt()) != CmdLineParser.EndOfOpts) {
             switch (opt) {
@@ -191,6 +194,11 @@
             case 'm':
                 paramFiles.add(opts.getValStr());
                 break;
+
+            case 'M':
+                // turns off multiquery optimization
+                properties.setProperty("opt.multiquery",""+false);
+                break;
                             
             case 'o': 
                 // TODO sgroschupf using system properties is always a very bad idea
@@ -510,6 +518,7 @@
     System.out.println("    -w, -warning turn warning on; also turns warning aggregation off");
     System.out.println("    -x, -exectype local|mapreduce, mapreduce is default");
 
+    System.out.println("    -M, -no_multiquery turn multiquery optimization off; multiquery is on by default");
 }
 
 private static String validateLogFile(String logFileName, String scriptName) {
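
The new -M/-no_multiquery switch maps onto the opt.multiquery property, so embedded callers can toggle the optimization too. A small sketch (property name and default come from the diff above; the rest is hypothetical setup):

    import java.util.Properties;
    import org.apache.pig.ExecType;
    import org.apache.pig.PigServer;

    public class NoMultiQueryExample {
        public static void main(String[] args) throws Exception {
            // Equivalent to running "pig -M script.pig" on the command line:
            Properties props = new Properties();
            props.setProperty("opt.multiquery", "false");
            PigServer pigServer = new PigServer(ExecType.LOCAL, props);
            // Registered STOREs now execute one at a time, as before.
        }
    }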

Modified: hadoop/pig/trunk/src/org/apache/pig/PigServer.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/PigServer.java?rev=767974&r1=767973&r2=767974&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/PigServer.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/PigServer.java Thu Apr 23 16:57:16 2009
@@ -34,7 +34,8 @@
 import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
-
+import java.util.Stack;
+import org.apache.pig.impl.plan.PlanException;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.pig.backend.datastorage.ContainerDescriptor;
@@ -50,10 +51,12 @@
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.io.FileLocalizer;
 import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.logicalLayer.LOCogroup;
+import org.apache.pig.impl.logicalLayer.LOFRJoin;
+import org.apache.pig.impl.logicalLayer.LOLoad;
 import org.apache.pig.impl.logicalLayer.LogicalOperator;
 import org.apache.pig.impl.logicalLayer.LogicalPlan;
 import org.apache.pig.impl.logicalLayer.LogicalPlanBuilder;
-import org.apache.pig.impl.logicalLayer.LOPrinter;
 import org.apache.pig.impl.logicalLayer.PlanSetter;
 import org.apache.pig.impl.logicalLayer.optimizer.LogicalOptimizer;
 import org.apache.pig.impl.logicalLayer.parser.ParseException;
@@ -65,6 +68,7 @@
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.CompilationMessageCollector.MessageType;
 import org.apache.pig.impl.streaming.StreamingCommand;
+import org.apache.pig.impl.util.MultiMap;
 import org.apache.pig.impl.util.ObjectSerializer;
 import org.apache.pig.impl.util.PropertiesUtil;
 import org.apache.pig.impl.logicalLayer.LODefine;
@@ -99,24 +103,42 @@
         throw new PigException(msg, errCode, PigException.BUG);
     }
 
-
-    Map<LogicalOperator, LogicalPlan> aliases = new HashMap<LogicalOperator, LogicalPlan>();
-    Map<OperatorKey, LogicalOperator> opTable = new HashMap<OperatorKey, LogicalOperator>();
-    Map<String, LogicalOperator> aliasOp = new HashMap<String, LogicalOperator>();
-    PigContext pigContext;
+    /*
+     * The data structure to support grunt shell operations. 
+     * The grunt shell can only work on one graph at a time. 
+     * If a script is contained inside another script, the grunt
+     * shell first saves the current graph on the stack and works 
+     * on a new graph. After the nested script is done, the grunt 
+     * shell pops the saved graph and continues working on it.
+     */
+    private Stack<Graph> graphs = new Stack<Graph>();
     
+    /*
+     * The current Graph the grunt shell is working on.
+     */
+    private Graph currDAG;
+ 
+    private PigContext pigContext;
+    
+    private static int scopeCounter = 0;
     private String scope = constructScope();
+
     private ArrayList<String> cachedScript = new ArrayList<String>();
     private boolean aggregateWarning = true;
+    private boolean isMultiQuery = true;
     
     private String constructScope() {
         // scope serves for now as a session id
-        // scope = user_id + "-" + time_stamp;
         
-        String user = System.getProperty("user.name", "DEFAULT_USER_ID");
-        String date = (new Date()).toString();
-       
-        return user + "-" + date;
+        // String user = System.getProperty("user.name", "DEFAULT_USER_ID");
+        // String date = (new Date()).toString();
+
+        // scope is not really used in the system right now. It will
+        // however make your explain statements look lengthy if set to
+        // username-date. For now let's simplify the scope, if a real
+        // scope is needed again, we might need to update all the
+        // operators to not include scope in their name().
+        return ""+(++scopeCounter);
     }
     
     public PigServer(String execTypeString) throws ExecException, IOException {
@@ -128,7 +150,7 @@
     }
 
     public PigServer(ExecType execType, Properties properties) throws ExecException {
-        this(new PigContext(execType, properties), true);
+        this(new PigContext(execType, properties));
     }
   
     public PigServer(PigContext context) throws ExecException {
@@ -137,17 +159,16 @@
     
     public PigServer(PigContext context, boolean connect) throws ExecException {
         this.pigContext = context;
-        if (this.pigContext.getProperties().getProperty(PigContext.JOB_NAME) == null) {
-            setJobName("DefaultJobName") ;
-        }
+        currDAG = new Graph(false);
         
         aggregateWarning = "true".equalsIgnoreCase(pigContext.getProperties().getProperty("aggregate.warning"));
-        
+        isMultiQuery = "true".equalsIgnoreCase(pigContext.getProperties().getProperty("opt.multiquery","true"));
+
         if (connect) {
             pigContext.connect();
         }
     }
-
+    
     public PigContext getPigContext(){
         return pigContext;
     }
@@ -159,7 +180,84 @@
     public void debugOff() {
         pigContext.debug = false;
     }
-    
+ 
+    /**
+     * Starts batch execution mode.
+     */
+    public void setBatchOn() {
+        log.info("Create a new graph.");
+        
+        if (currDAG != null) {
+            graphs.push(currDAG);
+        }
+        currDAG = new Graph(isMultiQuery);
+    }
+
+    /**
+     * Retrieve the current execution mode.
+     * 
+     * @return true if the execution mode is batch; false otherwise.
+     */
+    public boolean isBatchOn() {
+        // Batch is on when there is at least one saved graph on the
+        // stack. That gives the right response even if multiquery was
+        // turned off.
+        return graphs.size() > 0;
+    }
+
+    /**
+     * Returns whether there is anything to process in the current batch.
+     * @throws FrontendException
+     * @return true if there are no stores to process in the current
+     * batch, false otherwise.
+     */
+    public boolean isBatchEmpty() throws FrontendException {
+        if (currDAG == null) {
+            int errCode = 1083;
+            String msg = "setBatchOn() must be called first.";
+            throw new FrontendException(msg, errCode, PigException.INPUT);
+        }
+
+        return currDAG.isBatchEmpty();
+    }
+
+    /**
+     * Submits a batch of Pig commands for execution. 
+     * 
+     * @throws FrontendException
+     * @throws ExecException
+     */
+    public void executeBatch() throws FrontendException, ExecException {
+        if (!isMultiQuery) {
+            // ignore if multiquery is off
+            return;
+        }
+
+        if (currDAG == null || !isBatchOn()) {
+            int errCode = 1083;
+            String msg = "setBatchOn() must be called first.";
+            throw new FrontendException(msg, errCode, PigException.INPUT);
+        }
+        
+        currDAG.execute();
+    }
+
+    /**
+     * Discards a batch of Pig commands.
+     * 
+     * @throws FrontendException
+     * @throws ExecException
+     */
+    public void discardBatch() throws FrontendException {
+        if (currDAG == null || !isBatchOn()) {
+            int errCode = 1083;
+            String msg = "setBatchOn() must be called first.";
+            throw new FrontendException(msg, errCode, PigException.INPUT);
+        }
+        
+        currDAG = graphs.pop();
+    }
+       
     /**
      * Add a path to be skipped while automatically shipping binaries for 
      * streaming.
@@ -274,99 +372,20 @@
      *            line number of the query within the whole script
      * @throws IOException
      */    
-    public void registerQuery(String query, int startLine) throws IOException {
-            
-        LogicalPlan lp = parseQuery(query, startLine, aliases, opTable, aliasOp);
-        // store away the query for use in cloning later
-        cachedScript .add(query);
-        
-        if (lp.getLeaves().size() == 1)
-        {
-            LogicalOperator op = lp.getSingleLeafPlanOutputOp();
-            // No need to do anything about DEFINE 
-            if (op instanceof LODefine) {
-                return;
-            }
-        
-            // Check if we just processed a LOStore i.e. STORE
-            if (op instanceof LOStore) {
-                try{
-                    execute(null);
-                } catch (Exception e) {
-                    int errCode = 1002;
-                    String msg = "Unable to store alias " + op.getOperatorKey().getId();
-                    throw new FrontendException(msg, errCode, PigException.INPUT, e);
-                }
-            }
-        }
+    public void registerQuery(String query, int startLine) throws IOException {
+        currDAG.registerQuery(query, startLine);
     }
-    
-    private LogicalPlan parseQuery(String query, int startLine, Map<LogicalOperator, LogicalPlan> aliasesMap, 
-            Map<OperatorKey, LogicalOperator> opTableMap, Map<String, LogicalOperator> aliasOpMap) throws IOException {
-        if(query != null) {
-            query = query.trim();
-            if(query.length() == 0) return null;
-        }else {
-            return null;
-        }
-        try {
-            return new LogicalPlanBuilder(pigContext).parse(scope, query,
-                    aliasesMap, opTableMap, aliasOpMap, startLine);
-        } catch (ParseException e) {
-            //throw (IOException) new IOException(e.getMessage()).initCause(e);
-            PigException pe = LogUtils.getPigException(e);
-            int errCode = 1000;
-            String msg = "Error during parsing. " + (pe == null? e.getMessage() : pe.getMessage());
-            throw new FrontendException(msg, errCode, PigException.INPUT, false, null, e);
-        }
-    }
-
+ 
     public LogicalPlan clonePlan(String alias) throws IOException {
-        // There are two choices on how we clone the logical plan
-        // 1 - we really clone each operator and connect up the cloned operators
-        // 2 - we cache away the script till the point we need to clone
-        // and then simply re-parse the script. 
-        // The latter approach is used here
-        // FIXME: There is one open issue with this now:
-        // Consider the following script:
-        // A = load 'file:/somefile';
-        // B = filter A by $0 > 10;
-        // store B into 'bla';
-        // rm 'file:/somefile';
-        // A = load 'file:/someotherfile'
-        // when we try to clone - we try to reparse
-        // from the beginning and currently the parser
-        // checks for file existence of files in the load
-        // in the case where the file is a local one -i.e. with file: prefix
-        // This will be a known issue now and we will need to revisit later
-        
-        // parse each line of the cached script and the
-        // final logical plan is the clone that we want
-        LogicalPlan lp = null;
-        int lineNumber = 1;
-        // create data structures needed for parsing
-        Map<LogicalOperator, LogicalPlan> cloneAliases = new HashMap<LogicalOperator, LogicalPlan>();
-        Map<OperatorKey, LogicalOperator> cloneOpTable = new HashMap<OperatorKey, LogicalOperator>();
-        Map<String, LogicalOperator> cloneAliasOp = new HashMap<String, LogicalOperator>();
-        for (Iterator<String> it = cachedScript.iterator(); it.hasNext(); lineNumber++) {
-            lp = parseQuery(it.next(), lineNumber, cloneAliases, cloneOpTable, cloneAliasOp);
-        }
-        
-        if(alias == null) {
-            // a store prompted the execution - so return
-            // the entire logical plan
-            return lp;
-        } else {
-            // return the logical plan corresponding to the 
-            // alias supplied
-            LogicalOperator op = cloneAliasOp.get(alias);
-            if(op == null) {
-                int errCode = 1003;
-                String msg = "Unable to find an operator for alias " + alias;
-                throw new FrontendException(msg, errCode, PigException.INPUT);
-            }
-            return cloneAliases.get(op);
+        Graph graph = currDAG.clone();
+
+        if (graph == null) {
+            int errCode = 2127;
+            String msg = "Cloning of plan failed.";
+            throw new FrontendException(msg, errCode, PigException.BUG);
         }
+
+        return graph.getPlan(alias);
     }
     
     public void registerQuery(String query) throws IOException {
@@ -378,7 +397,7 @@
             GruntParser grunt = new GruntParser(new FileReader(new File(fileName)));
             grunt.setInteractive(false);
             grunt.setParams(this);
-            grunt.parseStopOnError();
+            grunt.parseStopOnError(true);
         } catch (FileNotFoundException e) {
             // TODO Auto-generated catch block
             e.printStackTrace();
@@ -391,7 +410,7 @@
     }
 
     public void printAliases () throws FrontendException {
-        System.out.println("aliases: " + aliasOp.keySet());
+        System.out.println("aliases: " + currDAG.getAliasOp().keySet());
     }
 
     public Schema dumpSchema(String alias) throws IOException{
@@ -410,7 +429,7 @@
     }
 
     public void setJobName(String name){
-        pigContext.getProperties().setProperty(PigContext.JOB_NAME, PigContext.JOB_NAME_PREFIX + ":" + name);
+        currDAG.setJobName(name);
     }
     
     /**
@@ -419,14 +438,15 @@
      */
     public Iterator<Tuple> openIterator(String id) throws IOException {
         try {
-            LogicalOperator op = aliasOp.get(id);
+            LogicalOperator op = currDAG.getAliasOp().get(id);
             if(null == op) {
                 int errCode = 1003;
                 String msg = "Unable to find an operator for alias " + id;
                 throw new FrontendException(msg, errCode, PigException.INPUT);
             }
-//            ExecJob job = execute(getPlanFromAlias(id, op.getClass().getName()));
+
             ExecJob job = store(id, FileLocalizer.getTemporaryPath(null, pigContext).toString(), BinStorage.class.getName() + "()");
+            
             // invocation of "execute" is synchronous!
 
             if (job.getStatus() == JOB_STATUS.COMPLETED) {
@@ -460,7 +480,7 @@
             String id,
             String filename,
             String func) throws IOException{
-        if (!aliasOp.containsKey(id))
+        if (!currDAG.getAliasOp().containsKey(id))
             throw new IOException("Invalid alias: " + id);
         
         try {
@@ -517,42 +537,51 @@
      */
     public void explain(String alias,
                         PrintStream stream) throws IOException {
+        explain(alias, "text", true, false, stream, stream, stream);
+    }
+
+    /**
+     * Provide information on how a pig query will be executed.
+     * @param alias Name of alias to explain.
+     * @param format Format in which the explain should be printed
+     * @param verbose Controls the amount of information printed
+     * @param markAsExecute When set, the explain is treated like a
+     * call to execute in the respect that all the pending stores are
+     * marked as complete.
+     * @param lps Stream to print the logical tree
+     * @param pps Stream to print the physical tree
+     * @param eps Stream to print the execution tree
+     * @throws IOException if the requested alias cannot be found.
+     */
+    public void explain(String alias,
+                        String format,
+                        boolean verbose,
+                        boolean markAsExecute,
+                        PrintStream lps,
+                        PrintStream pps,
+                        PrintStream eps) throws IOException {
         try {
-            LogicalPlan lp = compileLp(alias);
-            
-            // MRCompiler needs a store to be the leaf - hence
-            // add a store to the plan to explain
-            
-            // figure out the leaf to which the store needs to be added
-            List<LogicalOperator> leaves = lp.getLeaves();
-            LogicalOperator leaf = null;
-            if(leaves.size() == 1) {
-                leaf = leaves.get(0);
-            } else {
-                for (Iterator<LogicalOperator> it = leaves.iterator(); it.hasNext();) {
-                    LogicalOperator leafOp = it.next();
-                    if(leafOp.getAlias().equals(alias))
-                        leaf = leafOp;
-                }
+            pigContext.inExplain = true;
+            LogicalPlan lp = getStorePlan(alias);
+            if (lp.size() == 0) {
+                lps.println("Logical plan is empty.");
+                pps.println("Physical plan is empty.");
+                eps.println("Execution plan is empty.");
+                return;
+            }
+            PhysicalPlan pp = compilePp(lp);
+            lp.explain(lps, format, verbose);
+            pp.explain(pps, format, verbose);
+            pigContext.getExecutionEngine().explain(pp, eps, format, verbose);
+            if (markAsExecute) {
+                currDAG.markAsExecuted();
             }
-            
-            LogicalPlan storePlan = QueryParser.generateStorePlan(
-                scope, lp, "fakefile", PigStorage.class.getName(), leaf);
-            stream.println("Logical Plan:");
-            LOPrinter lv = new LOPrinter(stream, storePlan);
-            lv.visit();
-
-            PhysicalPlan pp = compilePp(storePlan);
-            stream.println("-----------------------------------------------");
-            stream.println("Physical Plan:");
-
-            stream.println("-----------------------------------------------");
-            pigContext.getExecutionEngine().explain(pp, stream);
-      
         } catch (Exception e) {
             int errCode = 1067;
             String msg = "Unable to explain alias " + alias;
             throw new FrontendException(msg, errCode, PigException.INPUT, e);
+        } finally {
+            pigContext.inExplain = false;
         }
     }
 
@@ -648,10 +677,10 @@
   
     public Map<String, LogicalPlan> getAliases() {
         Map<String, LogicalPlan> aliasPlans = new HashMap<String, LogicalPlan>();
-        for(LogicalOperator op: this.aliases.keySet()) {
+        for(LogicalOperator op:  currDAG.getAliases().keySet()) {
             String alias = op.getAlias();
             if(null != alias) {
-                aliasPlans.put(alias, this.aliases.get(op));
+                aliasPlans.put(alias, currDAG.getAliases().get(op));
             }
         }
         return aliasPlans;
@@ -667,11 +696,10 @@
     }
 
     public Set<String> getAliasKeySet() {
-        return aliasOp.keySet();
+        return currDAG.getAliasOp().keySet();
     }
 
     public Map<LogicalOperator, DataBag> getExamples(String alias) {
-        //LogicalPlan plan = aliases.get(aliasOp.get(alias));
         LogicalPlan plan = null;
         try {
             plan = clonePlan(alias);
@@ -684,15 +712,48 @@
         ExampleGenerator exgen = new ExampleGenerator(plan, pigContext);
         return exgen.getExamples();
     }
+
+    private LogicalPlan getStorePlan(String alias) throws IOException {
+        LogicalPlan lp = compileLp(alias);
+        
+        if (!isBatchOn() || alias != null) {
+            // MRCompiler needs a store to be the leaf - hence
+            // add a store to the plan to explain
+            
+            // figure out the leaves to which stores need to be added
+            List<LogicalOperator> leaves = lp.getLeaves();
+            LogicalOperator leaf = null;
+            if(leaves.size() == 1) {
+                leaf = leaves.get(0);
+            } else {
+                for (Iterator<LogicalOperator> it = leaves.iterator(); it.hasNext();) {
+                    LogicalOperator leafOp = it.next();
+                    if(leafOp.getAlias().equals(alias))
+                        leaf = leafOp;
+                }
+            }
+            
+            lp = QueryParser.generateStorePlan(scope, lp, "fakefile", 
+                                               PigStorage.class.getName(), leaf);
+        }
+
+        return lp;
+    }
     
     private ExecJob execute(String alias) throws FrontendException, ExecException {
-        ExecJob job = null;
-//        lp.explain(System.out, System.err);
         LogicalPlan typeCheckedLp = compileLp(alias);
-        
+
+        if (typeCheckedLp.size() == 0) {
+            return null;
+        }
+
+        LogicalOperator op = typeCheckedLp.getLeaves().get(0);
+        if (op instanceof LODefine) {
+            log.info("Skip execution of DEFINE only logical plan.");
+            return null;
+        }
+
         return executeCompiledLogicalPlan(typeCheckedLp);
-//        typeCheckedLp.explain(System.out, System.err);
-        
     }
     
     private ExecJob executeCompiledLogicalPlan(LogicalPlan compiledLp) throws ExecException {
@@ -717,14 +778,14 @@
         // create a clone of the logical plan and give it
         // to the operations below
         LogicalPlan lpClone;
+ 
         try {
-             lpClone = clonePlan(alias);
+            lpClone = clonePlan(alias);
         } catch (IOException e) {
             int errCode = 2001;
             String msg = "Unable to clone plan before compiling";
             throw new FrontendException(msg, errCode, PigException.BUG, e);
         }
-
         
         // Set the logical plan values correctly in all the operators
         PlanSetter ps = new PlanSetter(lpClone);
@@ -788,13 +849,13 @@
     private LogicalPlan getPlanFromAlias(
             String alias,
             String operation) throws FrontendException {
-        LogicalOperator lo = aliasOp.get(alias);
+        LogicalOperator lo = currDAG.getAliasOp().get(alias);
         if (lo == null) {
             int errCode = 1004;
             String msg = "No alias " + alias + " to " + operation;
             throw new FrontendException(msg, errCode, PigException.INPUT, false, null);
         }
-        LogicalPlan lp = aliases.get(lo);
+        LogicalPlan lp = currDAG.getAliases().get(lo);
         if (lp == null) {
             int errCode = 1005;
             String msg = "No plan for " + alias + " to " + operation;
@@ -803,5 +864,266 @@
         return lp;
     }
 
+    /*
+     * This class holds the internal states of a grunt shell session.
+     */
+    private class Graph {
+    	
+        private Map<LogicalOperator, LogicalPlan> aliases = new HashMap<LogicalOperator, LogicalPlan>();
+        
+        private Map<OperatorKey, LogicalOperator> opTable = new HashMap<OperatorKey, LogicalOperator>();
+        
+        private Map<String, LogicalOperator> aliasOp = new HashMap<String, LogicalOperator>();
+       
+        private List<String> scriptCache = new ArrayList<String>();	
+
+        // the fileNameMap contains filename to canonical filename
+        // mappings. This is done so we can reparse the cached script
+        // and remember the translation (current directory might only
+        // be correct during the first parse
+        private Map<String, String> fileNameMap = new HashMap<String, String>();
+    
+        private Map<LOStore, LogicalPlan> storeOpTable = new HashMap<LOStore, LogicalPlan>();
+        
+        private Set<LOLoad> loadOps = new HashSet<LOLoad>();
+
+        private String jobName;
+        
+        private boolean batchMode;
 
+        private int processedStores;
+
+        private int ignoreNumStores;
+        
+        private LogicalPlan lp;
+        
+        Graph(boolean batchMode) { 
+            this.batchMode = batchMode;
+            this.processedStores = 0;
+            this.ignoreNumStores = 0;
+            this.jobName = pigContext.getProperties().getProperty(PigContext.JOB_NAME,
+                                                                  PigContext.JOB_NAME_PREFIX+":DefaultJobName");
+            this.lp = new LogicalPlan();
+        }
+        
+        Map<LogicalOperator, LogicalPlan> getAliases() { return aliases; }
+        
+        Map<OperatorKey, LogicalOperator> getOpTable() { return opTable; }
+        
+        Map<String, LogicalOperator> getAliasOp() { return aliasOp; }
+        
+        List<String> getScriptCache() { return scriptCache; }
+        
+        boolean isBatchOn() { return batchMode; }
+
+        boolean isBatchEmpty() { return processedStores == storeOpTable.keySet().size(); }
+        
+        void execute() throws ExecException, FrontendException {
+            pigContext.getProperties().setProperty(PigContext.JOB_NAME, jobName);
+            PigServer.this.execute(null);
+            processedStores = storeOpTable.keySet().size();
+        }
+
+        void markAsExecuted() {
+            processedStores = storeOpTable.keySet().size();
+        }
+
+        void setJobName(String name) {
+            jobName = PigContext.JOB_NAME_PREFIX+":"+name;
+        }
+
+        LogicalPlan getPlan(String alias) throws IOException {
+            LogicalPlan plan = lp;
+                
+            if (alias != null) {
+                LogicalOperator op = aliasOp.get(alias);
+                if(op == null) {
+                    int errCode = 1003;
+                    String msg = "Unable to find an operator for alias " + alias;
+                    throw new FrontendException(msg, errCode, PigException.INPUT);
+                }
+                plan = aliases.get(op);
+            }
+            return plan;
+        }
+
+        void registerQuery(String query, int startLine) throws IOException {
+            
+            LogicalPlan tmpLp = parseQuery(query, startLine);
+            
+            // store away the query for use in cloning later
+            scriptCache.add(query);
+            if (tmpLp.getLeaves().size() == 1) {
+                LogicalOperator op = tmpLp.getSingleLeafPlanOutputOp();
+                
+                // Check if we just processed a LOStore i.e. STORE
+                if (op instanceof LOStore) {
+
+                    if (!batchMode) {
+                        lp = tmpLp;
+                        try {
+                            execute();
+                        } catch (Exception e) {
+                            int errCode = 1002;
+                            String msg = "Unable to store alias "
+                                    + op.getOperatorKey().getId();
+                            throw new FrontendException(msg, errCode,
+                                    PigException.INPUT, e);
+                        }
+                    } else {
+                        if (0 == ignoreNumStores) {
+                            storeOpTable.put((LOStore)op, tmpLp);
+                            lp.mergeSharedPlan(tmpLp);
+                            List<LogicalOperator> roots = tmpLp.getRoots();
+                            for (LogicalOperator root : roots) {
+                                if (root instanceof LOLoad) {
+                                    loadOps.add((LOLoad)root);
+                                }
+                            }
+
+                        } else {
+                            --ignoreNumStores;
+                        }
+                    }
+                }
+            }
+        }        
+    
+        LogicalPlan parseQuery(String query, int startLine) throws IOException {        
+            if (query == null || query.length() == 0) { 
+                int errCode = 1084;
+                String msg = "Invalid Query: Query is null or of size 0";
+                throw new FrontendException(msg, errCode, PigException.INPUT);
+            }
+
+            query = query.trim();
+        
+            try {
+                return new LogicalPlanBuilder(PigServer.this.pigContext).parse(scope, query,
+                                              aliases, opTable, aliasOp, startLine, fileNameMap);
+            } catch (ParseException e) {
+                PigException pe = LogUtils.getPigException(e);
+                int errCode = 1000;
+                String msg = "Error during parsing. " + (pe == null? e.getMessage() : pe.getMessage());
+                throw new FrontendException(msg, errCode, PigException.INPUT, false, null, e);
+            }
+        }
+
+        protected Graph clone() {
+            // There are two choices on how we clone the logical plan
+            // 1 - we really clone each operator and connect up the cloned operators
+            // 2 - we cache away the script till the point we need to clone
+            // and then simply re-parse the script. 
+            // The latter approach is used here
+            // FIXME: There is one open issue with this now:
+            // Consider the following script:
+            // A = load 'file:/somefile';
+            // B = filter A by $0 > 10;
+            // store B into 'bla';
+            // rm 'file:/somefile';
+            // A = load 'file:/someotherfile'
+            // when we try to clone - we try to reparse
+            // from the beginning and currently the parser
+            // checks for file existence of files in the load
+            // in the case where the file is a local one -i.e. with file: prefix
+            // This will be a known issue now and we will need to revisit later
+            
+            // parse each line of the cached script
+            int lineNumber = 1;
+            
+            // create data structures needed for parsing        
+            Graph graph = new Graph(isBatchOn());
+            graph.ignoreNumStores = processedStores;
+            graph.processedStores = processedStores;
+            graph.fileNameMap = fileNameMap;
+            
+            try {
+                for (Iterator<String> it = getScriptCache().iterator(); it.hasNext(); lineNumber++) {
+                    if (isBatchOn()) {
+                        graph.registerQuery(it.next(), lineNumber);
+                    } else {
+                        graph.lp = graph.parseQuery(it.next(), lineNumber);
+                    }
+                }
+                graph.postProcess();
+            } catch (IOException ioe) {
+                graph = null;
+            }          
+            return graph;
+        }
+        
+        private void postProcess() throws IOException {
+            
+            // Set the logical plan values correctly in all the operators
+            PlanSetter ps = new PlanSetter(lp);
+            ps.visit();
+
+            // The following code deals with store/load combination of 
+            // intermediate files. In this case we will replace the load operator
+            // with a (implicit) split operator, iff the load/store
+            // func is reversible (because that's when we can safely
+            // skip the load and keep going with the split output). If
+            // the load/store func is not reversible (or they are
+            // different functions), we connect the store and the load
+            // to remember the dependency.
+            for (LOLoad load : loadOps) {
+                for (LOStore store : storeOpTable.keySet()) {
+                    String ifile = load.getInputFile().getFileName();
+                    String ofile = store.getOutputFile().getFileName();
+                    if (ofile.compareTo(ifile) == 0) {
+                        LoadFunc lFunc = (LoadFunc) pigContext.instantiateFuncFromSpec(load.getInputFile().getFuncSpec());
+                        StoreFunc sFunc = (StoreFunc) pigContext.instantiateFuncFromSpec(store.getOutputFile().getFuncSpec());
+                        if (lFunc.getClass() == sFunc.getClass() && lFunc instanceof ReversibleLoadStoreFunc) {
+
+                            LogicalOperator storePred = lp.getPredecessors(store).get(0);
+                            
+                            // In this case we remember the input file
+                            // spec in the store. We might have to use it
+                            // in the MR compiler to recreate the load, if
+                            // the store happens on a job boundary.
+                            store.setInputSpec(load.getInputFile());
+                            
+                            lp.disconnect(store, load);
+                            lp.replace(load, storePred);
+
+                            List<LogicalOperator> succs = lp.getSuccessors(storePred);
+                            
+                            for (LogicalOperator succ : succs) {
+                                MultiMap<LogicalOperator, LogicalPlan> innerPls = null;
+                                
+                                // fix inner plans for cogroup and frjoin operators
+                                if (succ instanceof LOCogroup) {
+                                    innerPls = ((LOCogroup)succ).getGroupByPlans();
+                                } else if (succ instanceof LOFRJoin) {
+                                    innerPls = ((LOFRJoin)succ).getJoinColPlans();
+                                }
+                                
+                                if (innerPls != null) {
+                                    if (innerPls.containsKey(load)) {
+                                        Collection<LogicalPlan> pls = innerPls.get(load);
+                                        innerPls.removeKey(load);
+                                        innerPls.put(storePred, pls);
+                                    }
+                                }
+                            }
+                        } else {
+                            try {
+                                store.getPlan().connect(store, load);
+                            } catch (PlanException ex) {
+                                int errCode = 2128;
+                                String msg = "Failed to connect store with dependent load.";
+                                throw new FrontendException(msg, errCode, ex);
+                            }
+                        }
+                    }
+                }
+            }
+        }
+    }
 }
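
The postProcess code above splices out a load that reads a file stored earlier in the same batch, provided the load/store function is reversible; otherwise it connects the store to the load to record the job dependency. A hedged illustration of the pattern it targets, assuming BinStorage is still a ReversibleLoadStoreFunc on trunk (file names are hypothetical):

    import java.util.Properties;
    import org.apache.pig.ExecType;
    import org.apache.pig.PigServer;

    public class StoreLoadSpliceExample {
        public static void main(String[] args) throws Exception {
            PigServer pigServer = new PigServer(ExecType.MAPREDUCE, new Properties());
            pigServer.setBatchOn();
            pigServer.registerQuery("A = load 'page_views';");
            pigServer.registerQuery("store A into 'tmp' using BinStorage();");
            // Same file, same reversible function: the load below can be
            // replaced by an implicit split from the store's predecessor.
            pigServer.registerQuery("B = load 'tmp' using BinStorage();");
            pigServer.registerQuery("C = group B by $0;");
            pigServer.registerQuery("store C into 'out';");
            pigServer.executeBatch();
        }
    }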

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/executionengine/ExecutionEngine.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/executionengine/ExecutionEngine.java?rev=767974&r1=767973&r2=767974&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/executionengine/ExecutionEngine.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/executionengine/ExecutionEngine.java Thu Apr 23 16:57:16 2009
@@ -123,8 +123,11 @@
      *
      * @param plan PhysicalPlan to explain
      * @param stream Stream to print output to
+     * @param format Format to print in
+     * @param verbose Amount of information to print
      */
-    public void explain(PhysicalPlan plan, PrintStream stream);
+    public void explain(PhysicalPlan plan, PrintStream stream, 
+                        String format, boolean verbose);
 
     /**
      * Return currently running jobs (can be useful for admin purposes)
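
The extended explain surfaces the format and verbose knobs end to end. A sketch of calling it through the new PigServer signature ("text" is what the one-argument explain passes; a "dot" value is an assumption suggested by the new Dot*Printer classes):

    import java.io.PrintStream;
    import java.util.Properties;
    import org.apache.pig.ExecType;
    import org.apache.pig.PigServer;

    public class ExplainFormatExample {
        public static void main(String[] args) throws Exception {
            PigServer pigServer = new PigServer(ExecType.LOCAL, new Properties());
            pigServer.registerQuery("A = load 'input';");
            PrintStream ps = new PrintStream("plans.txt");
            // One stream may serve the logical, physical and execution plans.
            pigServer.explain("A", "text", true, false, ps, ps, ps);
        }
    }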

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/executionengine/util/ExecTools.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/executionengine/util/ExecTools.java?rev=767974&r1=767973&r2=767974&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/executionengine/util/ExecTools.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/executionengine/util/ExecTools.java Thu Apr 23 16:57:16 2009
@@ -47,7 +47,6 @@
                 String scope = leaf.getOperatorKey().getScope();
                 POStore str = new POStore(new OperatorKey(scope,
                     NodeIdGenerator.getGenerator().getNextNodeId(scope)));
-                str.setPc(pigContext);
                 spec = new FileSpec(FileLocalizer.getTemporaryPath(null,
                     pigContext).toString(),
                     new FuncSpec(BinStorage.class.getName()));

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/datastorage/HPath.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/datastorage/HPath.java?rev=767974&r1=767973&r2=767974&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/datastorage/HPath.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/datastorage/HPath.java Thu Apr 23 16:57:16 2009
@@ -181,7 +181,7 @@
    
     @Override
     public String toString() {
-        return path.toString();
+        return path.makeQualified(getHFS()).toString();
     }
     
     @Override
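
HPath.toString() now prints fully qualified paths, which keeps file names unambiguous once a plan references more than one file system. A quick sketch of what makeQualified does (path and cluster address are illustrative):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class QualifiedPathExample {
        public static void main(String[] args) throws Exception {
            FileSystem fs = FileSystem.get(new Configuration());
            // Fills in scheme and authority from the file system, e.g.
            // "/user/alice/out" -> "hdfs://namenode:9000/user/alice/out"
            // (host and port depend on fs.default.name).
            System.out.println(new Path("/user/alice/out").makeQualified(fs));
        }
    }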

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java?rev=767974&r1=767973&r2=767974&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java Thu Apr 23 16:57:16 2009
@@ -282,16 +282,13 @@
         throw new UnsupportedOperationException();
     }
 
-    public void explain(PhysicalPlan plan, PrintStream stream) {
+    public void explain(PhysicalPlan plan, PrintStream stream, String format, boolean verbose) {
         try {
-            PlanPrinter printer = new PlanPrinter(plan, stream);
-            printer.visit();
-            stream.println();
-
             ExecTools.checkLeafIsStore(plan, pigContext);
 
             MapReduceLauncher launcher = new MapReduceLauncher();
-            launcher.explain(plan, pigContext, stream);
+            launcher.explain(plan, pigContext, stream, format, verbose);
+
         } catch (Exception ve) {
             throw new RuntimeException(ve);
         }
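
The JobControlCompiler diff below changes compile() from a one-shot recursive build to an incremental contract: each call returns only the jobs with no remaining dependencies and removes them from the plan. A sketch of the driver loop this implies (runAndWait is a hypothetical helper; imports and surrounding setup are omitted):

    void launch(MROperPlan mrPlan, PigContext pigContext, Configuration conf)
            throws Exception {
        JobControlCompiler jcc = new JobControlCompiler(pigContext, conf);
        JobControl jc;
        while ((jc = jcc.compile(mrPlan, "pig-multiquery")) != null) {
            runAndWait(jc); // hypothetical: start the JobControl and block
        }
        jcc.moveResults();  // relocate temp outputs of multi-store jobs
    }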

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java?rev=767974&r1=767973&r2=767974&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java Thu Apr 23 16:57:16 2009
@@ -22,12 +22,7 @@
 import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
-import java.util.ArrayList;
-import java.util.Hashtable;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
+import java.util.*;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -35,6 +30,7 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.filecache.DistributedCache;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.io.WritableComparator;
@@ -55,6 +51,7 @@
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
 import org.apache.pig.data.BagFactory;
 import org.apache.pig.data.DataType;
 import org.apache.pig.data.TupleFactory;
@@ -106,85 +103,121 @@
 
     public static final String LOG_DIR = "_logs";
 
+    private List<Path> tmpPath;
+    private Path curTmpPath;
+
+    public JobControlCompiler(PigContext pigContext, Configuration conf) throws IOException {
+        this.pigContext = pigContext;
+        this.conf = conf;
+        tmpPath = new LinkedList<Path>();
+    }
+
+    /**
+     * Moves all the results of a collection of MR jobs to the final
+     * output directory. Some of the results may have been put into a
+     * temp location to work around restrictions on multiple outputs
+     * from a single map-reduce job.
+     *
+     * This method should always be called after the job execution
+     * completes.
+     */
+    public void moveResults() throws IOException {
+        if (curTmpPath != null) {
+            tmpPath.add(curTmpPath);
+            curTmpPath = null;
+        }
+
+        for (Path tmp: tmpPath) {
+            Path abs = new Path(tmp, "abs");
+            Path rel = new Path(tmp, "rel");
+            FileSystem fs = tmp.getFileSystem(conf);
+
+            if (fs.exists(abs)) {
+                moveResults(abs, abs.toUri().getPath(), fs);
+            }
+            
+            if (fs.exists(rel)) {        
+                moveResults(rel, rel.toUri().getPath()+"/", fs);
+            }
+        }
+        tmpPath = new LinkedList<Path>();
+    }
+
+    /**
+     * Walks the temporary directory structure to move (rename) files
+     * to their final location.
+     */
+    private void moveResults(Path p, String rem, FileSystem fs) throws IOException {
+        for (FileStatus fstat: fs.listStatus(p)) {
+            Path src = fstat.getPath();
+            if (fstat.isDir()) {
+                fs.mkdirs(removePart(src, rem));
+                moveResults(fstat.getPath(), rem, fs);
+            } else {
+                Path dst = removePart(src, rem);
+                fs.rename(src,dst);
+            }
+        }
+    }
+
+    private Path removePart(Path src, String part) {
+        URI uri = src.toUri();
+        String pathStr = uri.getPath().replace(part, "");
+        return new Path(pathStr);
+    }
+
+    private void makeTmpPath() throws IOException {
+        if (curTmpPath != null) {
+            tmpPath.add(curTmpPath);
+        }
+
+        for (int tries = 0;;) {
+            try {
+                curTmpPath = 
+                    new Path(FileLocalizer
+                             .getTemporaryPath(null, pigContext).toString());
+                FileSystem fs = curTmpPath.getFileSystem(conf);
+                curTmpPath = curTmpPath.makeQualified(fs);
+                fs.mkdirs(curTmpPath);
+                break;
+            } catch (IOException ioe) {
+                if (++tries==100) {
+                    throw ioe;
+                }
+            }
+        }
+    }
+
     /**
      * The map between MapReduceOpers and their corresponding Jobs
      */
     Map<OperatorKey, Job> seen = new Hashtable<OperatorKey, Job>();
     
     /**
-     * Top level compile method that issues a call to the recursive
-     * compile method.
+     * Compiles all jobs that have no dependencies, removes them
+     * from the plan, and returns them. Should be called repeatedly
+     * with the same plan until it is exhausted.
      * @param plan - The MROperPlan to be compiled
      * @param grpName - The name given to the JobControl
-     * @param conf - The Configuration object having the various properties
-     * @param pigContext - PigContext passed on from the execution engine
-     * @return JobControl object
+     * @return JobControl object - null if no more jobs in plan
      * @throws JobCreationException
      */
-    public JobControl compile(MROperPlan plan, String grpName, Configuration conf, PigContext pigContext) throws JobCreationException{
+    public JobControl compile(MROperPlan plan, String grpName) throws JobCreationException{
         this.plan = plan;
-        this.conf = conf;
-        this.pigContext = pigContext;
-        JobControl jobCtrl = new JobControl(grpName);
-        
-        List<MapReduceOper> leaves ;
-        leaves = plan.getLeaves();
-        
-        for (MapReduceOper mro : leaves) {
-            jobCtrl.addJob(compile(mro,jobCtrl));
+
+        if (plan.size() == 0) {
+            return null;
         }
-        return jobCtrl;
-    }
-    
-    /**
-     * The recursive compilation method that works by doing a depth first 
-     * traversal of the MROperPlan. Compiles a Job for the input MapReduceOper
-     * with the dependencies maintained in jobCtrl
-     * @param mro - Input MapReduceOper for which a Job needs to be compiled
-     * @param jobCtrl - The running JobCtrl object to maintain dependencies b/w jobs
-     * @return Job corresponding to the input mro
-     * @throws JobCreationException
-     */
-    private Job compile(MapReduceOper mro, JobControl jobCtrl) throws JobCreationException {
-        List<MapReduceOper> pred = plan.getPredecessors(mro);
-        
-        JobConf currJC = null;
-        
-        try{
-            if(pred==null || pred.size()<=0){
-                //No dependencies! Create the JobConf
-                //Construct the Job object with it and return
-                Job ret = null;
-                if(seen.containsKey(mro.getOperatorKey()))
-                    ret = seen.get(mro.getOperatorKey());
-                else{
-                    currJC = getJobConf(mro, conf, pigContext);
-                    ret = new Job(currJC,null);
-                    seen.put(mro.getOperatorKey(), ret);
-                }
-                return ret;
-            }
-            
-            //Has dependencies. So compile all the inputs
-            List<Job> compiledInputs = new ArrayList<Job>(pred.size());
-            
-            for (MapReduceOper oper : pred) {
-                Job ret = null;
-                if(seen.containsKey(oper.getOperatorKey()))
-                    ret = seen.get(oper.getOperatorKey());
-                else{
-                    ret = compile(oper, jobCtrl);
-                    jobCtrl.addJob(ret);
-                    seen.put(oper.getOperatorKey(),ret);
-                }
-                compiledInputs.add(ret);
+
+        JobControl jobCtrl = new JobControl(grpName);
+
+        try {
+            List<MapReduceOper> roots = new LinkedList<MapReduceOper>();
+            roots.addAll(plan.getRoots());
+            for (MapReduceOper mro: roots) {
+                jobCtrl.addJob(new Job(getJobConf(mro, conf, pigContext)));
+                plan.remove(mro);
             }
-            //Get JobConf for the current MapReduceOper
-            currJC = getJobConf(mro, conf, pigContext);
-            
-            //Create a new Job with the obtained JobConf
-            //and the compiled inputs as dependent jobs
-            return new Job(currJC,(ArrayList<Job>)compiledInputs);
         } catch (JobCreationException jce) {
         	throw jce;
         } catch(Exception e) {
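
The moveResults()/removePart() pair above walks the per-compile temp
directory (with an "abs" subtree for stores that used absolute target
paths and a "rel" subtree for relative ones, as the code suggests) and
renames each file back to the location the user asked for. A minimal
sketch of the path arithmetic; the temp directory name below is made up:

    import org.apache.hadoop.fs.Path;

    public class RemovePartSketch {
        public static void main(String[] args) {
            Path src = new Path("/tmp/temp-42/abs/user/alice/out/part-00000");
            String rem = "/tmp/temp-42/abs";  // the prefix passed as 'rem'
            // Same computation as removePart(src, rem):
            Path dst = new Path(src.toUri().getPath().replace(rem, ""));
            System.out.println(dst);          // /user/alice/out/part-00000
        }
    }
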
@@ -192,8 +225,10 @@
             String msg = "Internal error creating job configuration.";
             throw new JobCreationException(msg, errCode, PigException.BUG, e);
         }
+
+        return jobCtrl;
     }
-    
+        
     /**
      * The method that creates the JobConf corresponding to a MapReduceOper.
      * The assumption is that
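
The compile() contract is now incremental: each call compiles only the
current roots of the MROperPlan, removes them from the plan, and
returns null once the plan is empty. A sketch of the intended driver
loop (it mirrors what MapReduceLauncher does later in this commit; the
group name and sleep interval are illustrative):

    JobControl jc;
    while ((jc = jcc.compile(mrp, "pig-jobs")) != null) {
        Thread t = new Thread(jc);
        t.start();
        while (!jc.allFinished()) {
            try { Thread.sleep(500); } catch (InterruptedException e) {}
        }
        jcc.moveResults();  // promote this round's temp outputs
        jc.stop();
    }
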
@@ -225,32 +260,33 @@
         //used as the working directory
         String user = System.getProperty("user.name");
         jobConf.setUser(user != null ? user : "Pigster");
-        
-        //Process the POLoads
-        List<PhysicalOperator> lds = getRoots(mro.mapPlan);
-        if(lds!=null && lds.size()>0){
-            for (PhysicalOperator operator : lds) {
-                POLoad ld = (POLoad)operator;
-                
-                Pair<FileSpec, Boolean> p = new Pair<FileSpec, Boolean>(ld.getLFile(), ld.isSplittable());
-                //Store the inp filespecs
-                inp.add(p);
-                
-                //Store the target operators for tuples read
-                //from this input
-                List<PhysicalOperator> ldSucs = mro.mapPlan.getSuccessors(ld);
-                List<OperatorKey> ldSucKeys = new ArrayList<OperatorKey>();
-                if(ldSucs!=null){
-                    for (PhysicalOperator operator2 : ldSucs) {
-                        ldSucKeys.add(operator2.getOperatorKey());
+
+        try{        
+            //Process the POLoads
+            List<POLoad> lds = PlanHelper.getLoads(mro.mapPlan);
+            
+            if(lds!=null && lds.size()>0){
+                for (POLoad ld : lds) {
+                    
+                    Pair<FileSpec, Boolean> p = new Pair<FileSpec, Boolean>(ld.getLFile(), ld.isSplittable());
+                    //Store the inp filespecs
+                    inp.add(p);
+                    
+                    //Store the target operators for tuples read
+                    //from this input
+                    List<PhysicalOperator> ldSucs = mro.mapPlan.getSuccessors(ld);
+                    List<OperatorKey> ldSucKeys = new ArrayList<OperatorKey>();
+                    if(ldSucs!=null){
+                        for (PhysicalOperator operator2 : ldSucs) {
+                            ldSucKeys.add(operator2.getOperatorKey());
+                        }
                     }
+                    inpTargets.add(ldSucKeys);
+                    //Remove the POLoad from the plan
+                    mro.mapPlan.remove(ld);
                 }
-                inpTargets.add(ldSucKeys);
-                //Remove the POLoad from the plan
-                mro.mapPlan.remove(ld);
             }
-        }
-        try{
+
             //Create the jar of all functions required
             File submitJarFile = File.createTempFile("Job", ".jar");
             // ensure the job jar is deleted on exit
@@ -277,27 +313,49 @@
             jobConf.setOutputFormat(PigOutputFormat.class);
             
             //Process POStore and remove it from the plan
-            POStore st = null;
-            if(mro.reducePlan.isEmpty()){
-                st = (POStore) mro.mapPlan.getLeaves().get(0);
-                mro.mapPlan.remove(st);
-            }
-            else{
-                st = (POStore) mro.reducePlan.getLeaves().get(0);
-                mro.reducePlan.remove(st);
-            }
-            //set out filespecs
-            String outputPath = st.getSFile().getFileName();
-            FuncSpec outputFuncSpec = st.getSFile().getFuncSpec();
-            FileOutputFormat.setOutputPath(jobConf, new Path(outputPath));
-            jobConf.set("pig.storeFunc", outputFuncSpec.toString());
-
-            // Setup the logs directory for streaming jobs
-            jobConf.set("pig.streaming.log.dir", 
-                        new Path(new Path(outputPath), LOG_DIR).toString());
-            jobConf.set("pig.streaming.task.output.dir", outputPath);
+            List<POStore> mapStores = PlanHelper.getStores(mro.mapPlan);
+            List<POStore> reduceStores = PlanHelper.getStores(mro.reducePlan);
+
+            if (mapStores.size() + reduceStores.size() == 1) { // single store case
+                log.info("Setting up single store job");
+                
+                POStore st;
+                if (reduceStores.isEmpty()) {
+                    st = mapStores.remove(0);
+                    mro.mapPlan.remove(st);
+                }
+                else {
+                    st = reduceStores.remove(0);
+                    mro.reducePlan.remove(st);
+                }
+                String outputPath = st.getSFile().getFileName();
+                FuncSpec outputFuncSpec = st.getSFile().getFuncSpec();
+                FileOutputFormat.setOutputPath(jobConf, new Path(outputPath));
+                jobConf.set("pig.storeFunc", outputFuncSpec.toString());
+                
+                jobConf.set("pig.streaming.log.dir", 
+                            new Path(outputPath, LOG_DIR).toString());
+                jobConf.set("pig.streaming.task.output.dir", outputPath);
+            } 
+            else { // multi store case
+                log.info("Setting up multi store job");
+
+                makeTmpPath();
+                FileSystem fs = curTmpPath.getFileSystem(conf);
+                for (POStore st: mapStores) {
+                    Path tmpOut = new Path(
+                        curTmpPath,
+                        PlanHelper.makeStoreTmpPath(st.getSFile().getFileName()));
+                    fs.mkdirs(tmpOut);
+                }
+
+                FileOutputFormat.setOutputPath(jobConf, curTmpPath);
+
+                jobConf.set("pig.streaming.log.dir", 
+                            new Path(curTmpPath, LOG_DIR).toString());
+                jobConf.set("pig.streaming.task.output.dir", curTmpPath.toString());
+            }
 
-            
             // store map key type
             // this is needed when the key is null to create
             // an appropriate NullableXXXWritable object
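
In the multi-store branch, each map-side store gets its own
subdirectory under the job-wide temp path (via
PlanHelper.makeStoreTmpPath) and the temp path itself becomes the
job's output directory; moveResults() later renames everything into
place. A sketch of the resulting paths, assuming makeStoreTmpPath
encodes the store's file name as a relative path under "abs" or "rel"
(an assumption here; the helper is added elsewhere in this commit):

    // Store file names and curTmpPath are hypothetical.
    Path curTmpPath = new Path("hdfs://nn/tmp/temp-42");
    Path tmpOutA = new Path(curTmpPath, "abs/out/a");  // for store '/out/a'
    Path tmpOutB = new Path(curTmpPath, "rel/out/b");  // for store 'out/b'
    // After the job, moveResults() strips the curTmpPath/abs (or /rel)
    // prefix, so part files land in /out/a and out/b respectively.
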
@@ -305,13 +363,9 @@
 
             // set parent plan in all operators in map and reduce plans
             // currently the parent plan is really used only when POStream is present in the plan
-            PhysicalPlan[] plans = new PhysicalPlan[] { mro.mapPlan, mro.reducePlan };
-            for (int i = 0; i < plans.length; i++) {
-                for (Iterator<PhysicalOperator> it = plans[i].iterator(); it.hasNext();) {
-                    PhysicalOperator op = it.next();
-                    op.setParentPlan(plans[i]);                
-                }    
-            }
+            new PhyPlanSetter(mro.mapPlan).visit();
+            new PhyPlanSetter(mro.reducePlan).visit();
+
             POPackage pack = null;
             if(mro.reducePlan.isEmpty()){
                 //MapOnly Job

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/Launcher.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/Launcher.java?rev=767974&r1=767973&r2=767974&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/Launcher.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/Launcher.java Thu Apr 23 16:57:16 2009
@@ -104,13 +104,17 @@
      * @param pp PhysicalPlan to explain
      * @param pc PigContext to use for configuration
      * @param ps PrintStream to write output on.
+     * @param format Format to write in
+     * @param verbose Amount of information to print
      * @throws VisitorException
      * @throws IOException
      */
     public abstract void explain(
             PhysicalPlan pp,
             PigContext pc,
-            PrintStream ps) throws PlanException,
+            PrintStream ps,
+            String format,
+            boolean verbose) throws PlanException,
                                    VisitorException,
                                    IOException;
     

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java?rev=767974&r1=767973&r2=767974&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java Thu Apr 23 16:57:16 2009
@@ -77,6 +77,7 @@
 import org.apache.pig.impl.plan.VisitorException;
 import org.apache.pig.impl.plan.CompilationMessageCollector.MessageType;
 import org.apache.pig.impl.util.Pair;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
 
 /**
  * The compiler that compiles a given physical plan
@@ -129,6 +130,9 @@
     
     //The output of compiling the inputs
     MapReduceOper[] compiledInputs = null;
+
+    //Mapping of which MapReduceOper a store belongs to.
+    Map<POStore, MapReduceOper> storeToMapReduceMap;
     
     //The split operators seen till now. If not
     //maintained they will haunt you.
@@ -175,6 +179,7 @@
         }
         scope = roots.get(0).getOperatorKey().getScope();
         messageCollector = new CompilationMessageCollector() ;
+        storeToMapReduceMap = new HashMap<POStore, MapReduceOper>();
     }
     
     public void randomizeFileLocalizer(){
@@ -212,15 +217,21 @@
      */
     public MROperPlan compile() throws IOException, PlanException, VisitorException {
         List<PhysicalOperator> leaves = plan.getLeaves();
-        if(!(leaves.get(0) instanceof POStore)) {
-            int errCode = 2025;
-            String msg = "Expected leaf of reduce plan to " +
-                "always be POStore. Found " + leaves.get(0).getClass().getSimpleName();
-            throw new MRCompilerException(msg, errCode, PigException.BUG);
-        }
-        POStore store = (POStore)leaves.get(0);
-        FileLocalizer.registerDeleteOnFail(store.getSFile().getFileName(), pigContext);
-        compile(store);
+
+        for (PhysicalOperator op : leaves) {
+            if (!(op instanceof POStore)) {
+                int errCode = 2025;
+                String msg = "Expected leaf of reduce plan to " +
+                    "always be POStore. Found " + op.getClass().getSimpleName();
+                throw new MRCompilerException(msg, errCode, PigException.BUG);
+            }
+        }
+
+        List<POStore> stores = PlanHelper.getStores(plan);
+        for (POStore store: stores) {
+            FileLocalizer.registerDeleteOnFail(store.getSFile().getFileName(), pigContext);
+            compile(store);
+        }
 
         // I'm quite certain this is not the best way to do this.  The issue
         // is that for jobs that take multiple map reduce passes, for
@@ -270,6 +281,41 @@
         //op.
         List<PhysicalOperator> predecessors = plan.getPredecessors(op);
         if (predecessors != null && predecessors.size() > 0) {
+            // When processing an entire script (multiquery), we can
+            // get into a situation where a load has
+            // predecessors. This means that it depends on some store
+            // earlier in the plan. We need to take that dependency
+            // and connect the respective MR operators, while at the
+            // same time removing the connection between the Physical
+            // operators. That way the jobs will run in the right
+            // order.
+            if (op instanceof POLoad) {
+
+                if (predecessors.size() != 1) {
+                    int errCode = 2125;
+                    String msg = "Expected at most one predecessor of load. Got "+predecessors.size();
+                    throw new PlanException(msg, errCode, PigException.BUG);
+                }
+
+                PhysicalOperator p = predecessors.get(0);
+                if (!(p instanceof POStore)) {
+                    int errCode = 2126;
+                    String msg = "Predecessor of load should be a store. Got "+p.getClass();
+                    throw new PlanException(msg, errCode, PigException.BUG);
+                }
+
+                // Need new operator
+                curMROp = getMROp();
+                curMROp.mapPlan.add(op);
+                MRPlan.add(curMROp);
+                
+                MapReduceOper oper = storeToMapReduceMap.get((POStore)p);
+
+                plan.disconnect(op, p);
+                MRPlan.connect(oper, curMROp);
+                return;
+            }
+            
             Collections.sort(predecessors);
             compiledInputs = new MapReduceOper[predecessors.size()];
             int i = -1;
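
The store-to-load handling above is what allows one part of a script
to load a file that an earlier part stores. A minimal driver that
exercises this path (a sketch, assuming the batch API that the
multiquery work adds to PigServer; all paths are made up):

    import org.apache.pig.ExecType;
    import org.apache.pig.PigServer;

    public class StoreThenLoad {
        public static void main(String[] args) throws Exception {
            PigServer pig = new PigServer(ExecType.MAPREDUCE);
            pig.setBatchOn();  // assumed batch-mode entry point
            pig.registerQuery("A = LOAD 'input';");
            pig.registerQuery("STORE A INTO 'inter';");
            pig.registerQuery("B = LOAD 'inter';");  // depends on the store
            pig.registerQuery("STORE B INTO 'output';");
            pig.executeBatch();  // jobs are run in dependency order
        }
    }
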
@@ -311,7 +357,9 @@
     
     private POStore getStore(){
         POStore st = new POStore(new OperatorKey(scope,nig.getNextNodeId(scope)));
-        st.setPc(pigContext);
+        // Mark the store as a tmp store. These can be removed by the
+        // optimizer, since the user did not request them.
+        st.setIsTmpStore(true);
         return st;
     }
     
@@ -501,7 +549,7 @@
         MRPlan.connect(old, ret);
         return ret;
     }
-    
+ 
     /**
      * Returns a temporary DFS Path
      * @return
@@ -586,14 +634,6 @@
         }
     }
 
-    /*private void addUDFs(PhysicalPlan plan) throws VisitorException{
-        if(plan!=null){
-            udfFinderForExpr.setPlan(plan);
-            udfFinderForExpr.visit();
-            curMROp.UDFs.addAll(udfFinderForExpr.getUDFs());
-        }
-    }*/
-    
     private void addUDFs(PhysicalPlan plan) throws VisitorException{
         if(plan!=null){
             udfFinder.setPlan(plan);
@@ -620,6 +660,7 @@
         try{
             FileSpec fSpec = op.getSplitStore();
             MapReduceOper mro = endSingleInputPlanWithStr(fSpec);
+            mro.setSplitter(true);
             splitsSeen.put(op.getOperatorKey(), mro);
             curMROp = startNew(fSpec, mro);
         }catch(Exception e){
@@ -641,6 +682,7 @@
     
     public void visitStore(POStore op) throws VisitorException{
         try{
+            storeToMapReduceMap.put(op, curMROp);
             nonBlocking(op);
         }catch(Exception e){
             int errCode = 2034;
@@ -1740,5 +1782,5 @@
             keyType = p.getResultType();
         }
     }
-    
+
 }

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java?rev=767974&r1=767973&r2=767974&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java Thu Apr 23 16:57:16 2009
@@ -23,6 +23,7 @@
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
+import java.util.LinkedList;
 import java.util.Map;
 
 import org.apache.commons.logging.Log;
@@ -44,6 +45,7 @@
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRCompiler.LastInputStreamingOptimizer;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MRPrinter;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.DotMRPrinter;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MRStreamHandler;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.POPackageAnnotator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
@@ -75,7 +77,7 @@
                                                    ExecException,
                                                    JobCreationException,
                                                    Exception {
-        long sleepTime = 5000;
+        long sleepTime = 500;
         aggregateWarning = "true".equalsIgnoreCase(pc.getProperties().getProperty("aggregate.warning"));
         MROperPlan mrp = compile(php, pc);
         
@@ -84,70 +86,79 @@
         Configuration conf = ConfigurationUtil.toConfiguration(exe.getConfiguration());
         JobClient jobClient = ((HExecutionEngine)exe).getJobClient();
 
-        JobControlCompiler jcc = new JobControlCompiler();
+        JobControlCompiler jcc = new JobControlCompiler(pc, conf);
         
-        JobControl jc = jcc.compile(mrp, grpName, conf, pc);
-        
-        int numMRJobs = jc.getWaitingJobs().size();
+        List<Job> failedJobs = new LinkedList<Job>();
+        List<Job> succJobs = new LinkedList<Job>();
+        JobControl jc;
+        int totalMRJobs = mrp.size();
+        int numMRJobsCompl = 0;
+        int numMRJobsCurrent = 0;
+        double lastProg = -1;
         
         //create the exception handler for the job control thread
         //and register the handler with the job control thread
         JobControlThreadExceptionHandler jctExceptionHandler = new JobControlThreadExceptionHandler();
-        Thread jcThread = new Thread(jc);
-        jcThread.setUncaughtExceptionHandler(jctExceptionHandler);
-        jcThread.start();
 
-        double lastProg = -1;
-        int perCom = 0;
-        while(!jc.allFinished()){
-            try {
-                Thread.sleep(sleepTime);
-            } catch (InterruptedException e) {}
-            double prog = calculateProgress(jc, jobClient)/numMRJobs;
-            if(prog>=(lastProg+0.01)){
-                perCom = (int)(prog * 100);
-                if(perCom!=100)
-                    log.info( perCom + "% complete");
+        while((jc = jcc.compile(mrp, grpName)) != null) {
+            numMRJobsCurrent = jc.getWaitingJobs().size();
+
+            Thread jcThread = new Thread(jc);
+            jcThread.setUncaughtExceptionHandler(jctExceptionHandler);
+            jcThread.start();
+
+            while(!jc.allFinished()){
+                try {
+                    Thread.sleep(sleepTime);
+                } catch (InterruptedException e) {}
+                double prog = (numMRJobsCompl+calculateProgress(jc, jobClient))/totalMRJobs;
+                if(prog>=(lastProg+0.01)){
+                    int perCom = (int)(prog * 100);
+                    if(perCom!=100)
+                        log.info( perCom + "% complete");
+                }
+                lastProg = prog;
             }
-            lastProg = prog;
-        }
-        
-        //check for the jobControlException first
-        //if the job controller fails before launching the jobs then there are
-        //no jobs to check for failure
-        if(jobControlException != null) {
+
+            //check for the jobControlException first
+            //if the job controller fails before launching the jobs then there are
+            //no jobs to check for failure
+            if(jobControlException != null) {
         	if(jobControlException instanceof PigException) {
-        		throw jobControlException;
+                    throw jobControlException;
         	} else {
-	        	int errCode = 2117;
-	        	String msg = "Unexpected error when launching map reduce job.";        	
-	    		throw new ExecException(msg, errCode, PigException.BUG, jobControlException);
+                    int errCode = 2117;
+                    String msg = "Unexpected error when launching map reduce job.";        	
+                    throw new ExecException(msg, errCode, PigException.BUG, jobControlException);
         	}
+            }
+
+            numMRJobsCompl += numMRJobsCurrent;
+            failedJobs.addAll(jc.getFailedJobs());
+            succJobs.addAll(jc.getSuccessfulJobs());
+            jcc.moveResults();
+            jc.stop(); 
         }
-        
+
         // Look to see if any jobs failed.  If so, we need to report that.
-        List<Job> failedJobs = jc.getFailedJobs();
         if (failedJobs != null && failedJobs.size() > 0) {
             log.error("Map reduce job failed");
             for (Job fj : failedJobs) {
                 getStats(fj, jobClient, true, pc);
             }
-            jc.stop(); 
             return false;
         }
 
         Map<Enum, Long> warningAggMap = new HashMap<Enum, Long>();
                 
-        List<Job> succJobs = jc.getSuccessfulJobs();
-        if(succJobs!=null)
+        if(succJobs!=null) {
             for(Job job : succJobs){
                 getStats(job,jobClient, false, pc);
                 if(aggregateWarning) {
                 	computeWarningAggregate(job, jobClient, warningAggMap);
                 }
             }
-
-        jc.stop();
+        }
         
         if(aggregateWarning) {
         	CompilationMessageCollector.logAggregate(warningAggMap, MessageType.Warning, log) ;
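
Progress is now computed against the total job count captured before
the first round, with completed rounds contributing whole jobs. A
worked example with hypothetical numbers (calculateProgress() sums
per-job completion across the running round, as in the old code where
it was divided by the round's job count):

    int    numMRJobsCompl = 2;    // jobs finished in earlier rounds
    double roundProgress  = 1.0;  // e.g. one of the round's two jobs done
    int    totalMRJobs    = 4;    // mrp.size() before the first round
    double prog = (numMRJobsCompl + roundProgress) / totalMRJobs;  // 0.75
    System.out.println((int)(prog * 100) + "% complete");  // 75% complete
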
@@ -162,13 +173,27 @@
     public void explain(
             PhysicalPlan php,
             PigContext pc,
-            PrintStream ps) throws PlanException, VisitorException,
+            PrintStream ps,
+            String format,
+            boolean verbose) throws PlanException, VisitorException,
                                    IOException {
         log.trace("Entering MapReduceLauncher.explain");
         MROperPlan mrp = compile(php, pc);
 
-        MRPrinter printer = new MRPrinter(ps, mrp);
-        printer.visit();
+        if (format.equals("text")) {
+            MRPrinter printer = new MRPrinter(ps, mrp);
+            printer.setVerbose(verbose);
+            printer.visit();
+        } else {
+            ps.println("#--------------------------------------------------");
+            ps.println("# Map Reduce Plan                                  ");
+            ps.println("#--------------------------------------------------");
+            
+            DotMRPrinter printer = new DotMRPrinter(mrp, ps);
+            printer.setVerbose(verbose);
+            printer.dump();
+            ps.println("");
+        }
     }
 
     private MROperPlan compile(
@@ -197,10 +222,6 @@
         POPackageAnnotator pkgAnnotator = new POPackageAnnotator(plan);
         pkgAnnotator.visit();
         
-        // check whether stream operator is present
-        MRStreamHandler checker = new MRStreamHandler(plan);
-        checker.visit();
-        
         // optimize joins
         LastInputStreamingOptimizer liso = 
             new MRCompiler.LastInputStreamingOptimizer(plan, lastInputChunkSize);
@@ -211,6 +232,27 @@
         // an appropriate NullableXXXWritable object
         KeyTypeDiscoveryVisitor kdv = new KeyTypeDiscoveryVisitor(plan);
         kdv.visit();
+
+        // removes the filter(constant(true)) operators introduced by
+        // splits.
+        NoopFilterRemover fRem = new NoopFilterRemover(plan);
+        fRem.visit();
+        
+        MultiQueryOptimizer mqOptimizer = new MultiQueryOptimizer(plan);
+        mqOptimizer.visit();
+
+        // removes unnecessary stores (as can happen with splits in
+        // some cases). This has to run after the MultiQueryOptimizer
+        // and the NoopFilterRemover.
+        NoopStoreRemover sRem = new NoopStoreRemover(plan);
+        sRem.visit();
+
+        // check whether stream operator is present
+        // after MultiQueryOptimizer because it can shift streams from
+        // map to reduce, etc.
+        MRStreamHandler checker = new MRStreamHandler(plan);
+        checker.visit();
+        
         return plan;
     }
     

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java?rev=767974&r1=767973&r2=767974&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java Thu Apr 23 16:57:16 2009
@@ -109,6 +109,10 @@
     // to add additional map reduce operator with 1 reducer after this
     long limit = -1;
 
+    // Indicates that this MROper is a splitter MROper. 
+    // That is, this MROper ends due to a POSplit operator.
+    private boolean splitter = false;
+
     public MapReduceOper(OperatorKey k) {
         super(k);
         mapPlan = new PhysicalPlan();
@@ -323,4 +327,16 @@
     public void setReplFiles(FileSpec[] replFiles) {
         this.replFiles = replFiles;
     }
+
+    public int getRequestedParallelism() {
+        return requestedParallelism;
+    }
+
+    public void setSplitter(boolean spl) {
+        splitter = spl;
+    }
+
+    public boolean isSplitter() {
+        return splitter;
+    }
 }

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java?rev=767974&r1=767973&r2=767974&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java Thu Apr 23 16:57:16 2009
@@ -57,7 +57,7 @@
 import org.apache.pig.impl.util.Pair;
 
 public class PigInputFormat implements InputFormat<Text, Tuple>,
-        JobConfigurable {
+       JobConfigurable {
 
     public static final Log LOG = LogFactory
             .getLog(PigInputFormat.class);
@@ -200,11 +200,25 @@
         for (int i = 0; i < inputs.size(); i++) {
             try {
 				Path path = new Path(inputs.get(i).first.getFileName());
-				FileSystem fs = path.getFileSystem(job);
+                                
+                                FileSystem fs;
+                                
+                                try {
+                                    fs = path.getFileSystem(job);
+                                } catch (Exception e) {
+                                    // If an application-specific scheme
+                                    // was used (e.g. "hbase://table"),
+                                    // getting the file system will fail.
+                                    // That's OK; we just fall back to
+                                    // the default DFS in that case.
+                                    fs = new Path("/").getFileSystem(job);
+                                }
+
 				// if the execution is against Mapred DFS, set
 				// working dir to /user/<userid>
-				if(pigContext.getExecType() == ExecType.MAPREDUCE)
+				if(pigContext.getExecType() == ExecType.MAPREDUCE) {
 				    fs.setWorkingDirectory(new Path("/user", job.getUser()));
+                                }
 				
 				DataStorage store = new HDataStorage(ConfigurationUtil.toProperties(job));
 				ValidatingInputFileSpec spec;