Posted to commits@pig.apache.org by pr...@apache.org on 2009/03/05 02:04:07 UTC

svn commit: r750264 - in /hadoop/pig/branches/multiquery: ./ src/org/apache/pig/ src/org/apache/pig/backend/executionengine/util/ src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/ src/org/apache/pig/backend/hadoop/executionengine/physic...

Author: pradeepkth
Date: Thu Mar  5 01:04:06 2009
New Revision: 750264

URL: http://svn.apache.org/viewvc?rev=750264&view=rev
Log:
PIG-627: multiquery support M2 (hagleitn via pradeepkth)

Modified:
    hadoop/pig/branches/multiquery/CHANGES.txt
    hadoop/pig/branches/multiquery/src/org/apache/pig/PigServer.java
    hadoop/pig/branches/multiquery/src/org/apache/pig/backend/executionengine/util/ExecTools.java
    hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
    hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/LocalLauncher.java
    hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
    hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
    hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java
    hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java
    hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java
    hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStore.java
    hadoop/pig/branches/multiquery/src/org/apache/pig/backend/local/executionengine/LocalExecutionEngine.java
    hadoop/pig/branches/multiquery/src/org/apache/pig/backend/local/executionengine/LocalPigLauncher.java
    hadoop/pig/branches/multiquery/test/org/apache/pig/test/TestJobSubmission.java
    hadoop/pig/branches/multiquery/test/org/apache/pig/test/TestLocalJobSubmission.java
    hadoop/pig/branches/multiquery/test/org/apache/pig/test/TestMultiQuery.java
    hadoop/pig/branches/multiquery/test/org/apache/pig/test/TestStore.java
    hadoop/pig/branches/multiquery/test/org/apache/pig/test/utils/GenPhyOp.java

Modified: hadoop/pig/branches/multiquery/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/CHANGES.txt?rev=750264&r1=750263&r2=750264&view=diff
==============================================================================
--- hadoop/pig/branches/multiquery/CHANGES.txt (original)
+++ hadoop/pig/branches/multiquery/CHANGES.txt Thu Mar  5 01:04:06 2009
@@ -408,3 +408,5 @@
     PIG-642: Limit after FRJ causes problems (daijy)
 
     PIG-627: multiquery support M1 (hagleitn via olgan)
+
+    PIG-627: multiquery support M2 (hagleitn via pradeepkth)

Modified: hadoop/pig/branches/multiquery/src/org/apache/pig/PigServer.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/src/org/apache/pig/PigServer.java?rev=750264&r1=750263&r2=750264&view=diff
==============================================================================
--- hadoop/pig/branches/multiquery/src/org/apache/pig/PigServer.java (original)
+++ hadoop/pig/branches/multiquery/src/org/apache/pig/PigServer.java Thu Mar  5 01:04:06 2009
@@ -751,11 +751,9 @@
             if(leaves.size() == 1) {
                 leaf = leaves.get(0);
             } else {
-                for (Iterator<LogicalOperator> it = leaves.iterator(); it.hasNext();) {
-                    LogicalOperator leafOp = it.next();
-                    if(leafOp.getAlias().equals(alias))
-                        leaf = leafOp;
-                }
+                // should have exactly one leaf since we asked for a
+                // specific alias.
+                throw new AssertionError("Ceci n'est pas un bug.");
             }
             
             lp = QueryParser.generateStorePlan(scope, lp, "fakefile", 

Modified: hadoop/pig/branches/multiquery/src/org/apache/pig/backend/executionengine/util/ExecTools.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/src/org/apache/pig/backend/executionengine/util/ExecTools.java?rev=750264&r1=750263&r2=750264&view=diff
==============================================================================
--- hadoop/pig/branches/multiquery/src/org/apache/pig/backend/executionengine/util/ExecTools.java (original)
+++ hadoop/pig/branches/multiquery/src/org/apache/pig/backend/executionengine/util/ExecTools.java Thu Mar  5 01:04:06 2009
@@ -47,7 +47,6 @@
                 String scope = leaf.getOperatorKey().getScope();
                 POStore str = new POStore(new OperatorKey(scope,
                     NodeIdGenerator.getGenerator().getNextNodeId(scope)));
-                str.setPc(pigContext);
                 spec = new FileSpec(FileLocalizer.getTemporaryPath(null,
                     pigContext).toString(),
                     new FuncSpec(BinStorage.class.getName()));

Modified: hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java?rev=750264&r1=750263&r2=750264&view=diff
==============================================================================
--- hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java (original)
+++ hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java Thu Mar  5 01:04:06 2009
@@ -22,12 +22,7 @@
 import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
-import java.util.ArrayList;
-import java.util.Hashtable;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
+import java.util.*;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -35,6 +30,7 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.filecache.DistributedCache;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.io.WritableComparator;
@@ -53,6 +49,7 @@
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
 import org.apache.pig.data.BagFactory;
 import org.apache.pig.data.DataType;
 import org.apache.pig.data.TupleFactory;
@@ -104,85 +101,121 @@
 
     public static final String LOG_DIR = "_logs";
 
+    private List<Path> tmpPath;
+    private Path curTmpPath;
+
+    public JobControlCompiler(PigContext pigContext, Configuration conf) throws IOException {
+        this.pigContext = pigContext;
+        this.conf = conf;
+        tmpPath = new LinkedList<Path>();
+    }
+
+    /**
+     * Moves all the results of a collection of MR jobs to the final
+     * output directory. Some of the results may have been put into a
+ * temp location to work around restrictions on multiple outputs
+ * from a single map-reduce job.
+     *
+     * This method should always be called after the job execution
+     * completes.
+     */
+    public void moveResults() throws IOException {
+        if (curTmpPath != null) {
+            tmpPath.add(curTmpPath);
+            curTmpPath = null;
+        }
+
+        for (Path tmp: tmpPath) {
+            Path abs = new Path(tmp, "abs");
+            Path rel = new Path(tmp, "rel");
+            FileSystem fs = tmp.getFileSystem(conf);
+
+            if (fs.exists(abs)) {
+                moveResults(abs, abs.toUri().getPath(), fs);
+            }
+            
+            if (fs.exists(rel)) {        
+                moveResults(rel, rel.toUri().getPath()+"/", fs);
+            }
+        }
+        tmpPath = new LinkedList<Path>();
+    }
+
+    /**
+     * Walks the temporary directory structure to move (rename) files
+     * to their final location.
+     */
+    private void moveResults(Path p, String rem, FileSystem fs) throws IOException {
+        for (FileStatus fstat: fs.listStatus(p)) {
+            Path src = fstat.getPath();
+            if (fstat.isDir()) {
+                fs.mkdirs(removePart(src, rem));
+                moveResults(fstat.getPath(), rem, fs);
+            } else {
+                Path dst = removePart(src, rem);
+                fs.rename(src,dst);
+            }
+        }
+    }
+
+    private Path removePart(Path src, String part) {
+        URI uri = src.toUri();
+        String pathStr = uri.getPath().replace(part, "");
+        return new Path(pathStr);
+    }
+
+    private void makeTmpPath() throws IOException {
+        if (curTmpPath != null) {
+            tmpPath.add(curTmpPath);
+        }
+
+        for (int tries = 0;;) {
+            try {
+                curTmpPath = 
+                    new Path(FileLocalizer
+                             .getTemporaryPath(null, pigContext).toString());
+                FileSystem fs = curTmpPath.getFileSystem(conf);
+                curTmpPath = curTmpPath.makeQualified(fs);
+                fs.mkdirs(curTmpPath);
+                break;
+            } catch (IOException ioe) {
+                if (++tries==100) {
+                    throw ioe;
+                }
+            }
+        }
+    }
+
     /**
      * The map between MapReduceOpers and their corresponding Jobs
      */
     Map<OperatorKey, Job> seen = new Hashtable<OperatorKey, Job>();
     
     /**
-     * Top level compile method that issues a call to the recursive
-     * compile method.
+     * Compiles all jobs that have no dependencies, removes them from
+     * the plan, and returns. Should be called with the same plan until
+     * it is exhausted.
      * @param plan - The MROperPlan to be compiled
      * @param grpName - The name given to the JobControl
-     * @param conf - The Configuration object having the various properties
-     * @param pigContext - PigContext passed on from the execution engine
-     * @return JobControl object
+     * @return JobControl object - null if no more jobs in plan
      * @throws JobCreationException
      */
-    public JobControl compile(MROperPlan plan, String grpName, Configuration conf, PigContext pigContext) throws JobCreationException{
+    public JobControl compile(MROperPlan plan, String grpName) throws JobCreationException{
         this.plan = plan;
-        this.conf = conf;
-        this.pigContext = pigContext;
-        JobControl jobCtrl = new JobControl(grpName);
-        
-        List<MapReduceOper> leaves ;
-        leaves = plan.getLeaves();
-        
-        for (MapReduceOper mro : leaves) {
-            jobCtrl.addJob(compile(mro,jobCtrl));
+
+        if (plan.size() == 0) {
+            return null;
         }
-        return jobCtrl;
-    }
-    
-    /**
-     * The recursive compilation method that works by doing a depth first 
-     * traversal of the MROperPlan. Compiles a Job for the input MapReduceOper
-     * with the dependencies maintained in jobCtrl
-     * @param mro - Input MapReduceOper for which a Job needs to be compiled
-     * @param jobCtrl - The running JobCtrl object to maintain dependencies b/w jobs
-     * @return Job corresponding to the input mro
-     * @throws JobCreationException
-     */
-    private Job compile(MapReduceOper mro, JobControl jobCtrl) throws JobCreationException {
-        List<MapReduceOper> pred = plan.getPredecessors(mro);
-        
-        JobConf currJC = null;
-        
-        try{
-            if(pred==null || pred.size()<=0){
-                //No dependencies! Create the JobConf
-                //Construct the Job object with it and return
-                Job ret = null;
-                if(seen.containsKey(mro.getOperatorKey()))
-                    ret = seen.get(mro.getOperatorKey());
-                else{
-                    currJC = getJobConf(mro, conf, pigContext);
-                    ret = new Job(currJC,null);
-                    seen.put(mro.getOperatorKey(), ret);
-                }
-                return ret;
-            }
-            
-            //Has dependencies. So compile all the inputs
-            List<Job> compiledInputs = new ArrayList<Job>(pred.size());
-            
-            for (MapReduceOper oper : pred) {
-                Job ret = null;
-                if(seen.containsKey(oper.getOperatorKey()))
-                    ret = seen.get(oper.getOperatorKey());
-                else{
-                    ret = compile(oper, jobCtrl);
-                    jobCtrl.addJob(ret);
-                    seen.put(oper.getOperatorKey(),ret);
-                }
-                compiledInputs.add(ret);
+
+        JobControl jobCtrl = new JobControl(grpName);
+
+        try {
+            List<MapReduceOper> roots = new LinkedList<MapReduceOper>();
+            roots.addAll(plan.getRoots());
+            for (MapReduceOper mro: roots) {
+                jobCtrl.addJob(new Job(getJobConf(mro, conf, pigContext)));
+                plan.remove(mro);
             }
-            //Get JobConf for the current MapReduceOper
-            currJC = getJobConf(mro, conf, pigContext);
-            
-            //Create a new Job with the obtained JobConf
-            //and the compiled inputs as dependent jobs
-            return new Job(currJC,(ArrayList<Job>)compiledInputs);
         } catch (JobCreationException jce) {
         	throw jce;
         } catch(Exception e) {
@@ -190,8 +223,10 @@
             String msg = "Internal error creating job configuration.";
             throw new JobCreationException(msg, errCode, PigException.BUG, e);
         }
+
+        return jobCtrl;
     }
-    
+        
     /**
      * The method that creates the JobConf corresponding to a MapReduceOper.
      * The assumption is that
@@ -225,10 +260,10 @@
         jobConf.setUser(user != null ? user : "Pigster");
         
         //Process the POLoads
-        List<PhysicalOperator> lds = getRoots(mro.mapPlan);
+        List<POLoad> lds = PlanHelper.getLoads(mro.mapPlan);
+
         if(lds!=null && lds.size()>0){
-            for (PhysicalOperator operator : lds) {
-                POLoad ld = (POLoad)operator;
+            for (POLoad ld : lds) {
                 
                 Pair<FileSpec, Boolean> p = new Pair<FileSpec, Boolean>(ld.getLFile(), ld.isSplittable());
                 //Store the inp filespecs
@@ -275,27 +310,49 @@
             jobConf.setOutputFormat(PigOutputFormat.class);
             
             //Process POStore and remove it from the plan
-            POStore st = null;
-            if(mro.reducePlan.isEmpty()){
-                st = (POStore) mro.mapPlan.getLeaves().get(0);
-                mro.mapPlan.remove(st);
-            }
-            else{
-                st = (POStore) mro.reducePlan.getLeaves().get(0);
-                mro.reducePlan.remove(st);
-            }
-            //set out filespecs
-            String outputPath = st.getSFile().getFileName();
-            FuncSpec outputFuncSpec = st.getSFile().getFuncSpec();
-            FileOutputFormat.setOutputPath(jobConf, new Path(outputPath));
-            jobConf.set("pig.storeFunc", outputFuncSpec.toString());
-
-            // Setup the logs directory for streaming jobs
-            jobConf.set("pig.streaming.log.dir", 
-                        new Path(new Path(outputPath), LOG_DIR).toString());
-            jobConf.set("pig.streaming.task.output.dir", outputPath);
+            List<POStore> mapStores = PlanHelper.getStores(mro.mapPlan);
+            List<POStore> reduceStores = PlanHelper.getStores(mro.reducePlan);
+
+            if (mapStores.size() + reduceStores.size() == 1) { // single store case
+                log.info("Setting up single store job");
+                
+                POStore st;
+                if (reduceStores.isEmpty()) {
+                    st = mapStores.remove(0);
+                    mro.mapPlan.remove(st);
+                }
+                else {
+                    st = reduceStores.remove(0);
+                    mro.reducePlan.remove(st);
+                }
+                String outputPath = st.getSFile().getFileName();
+                FuncSpec outputFuncSpec = st.getSFile().getFuncSpec();
+                FileOutputFormat.setOutputPath(jobConf, new Path(outputPath));
+                jobConf.set("pig.storeFunc", outputFuncSpec.toString());
+                
+                jobConf.set("pig.streaming.log.dir", 
+                            new Path(outputPath, LOG_DIR).toString());
+                jobConf.set("pig.streaming.task.output.dir", outputPath);
+            } 
+           else { // multi store case
+                log.info("Setting up multi store job");
+
+                makeTmpPath();
+                FileSystem fs = curTmpPath.getFileSystem(conf);
+                for (POStore st: mapStores) {
+                    Path tmpOut = new Path(
+                        curTmpPath,
+                        PlanHelper.makeStoreTmpPath(st.getSFile().getFileName()));
+                    fs.mkdirs(tmpOut);
+                }
+
+                FileOutputFormat.setOutputPath(jobConf, curTmpPath);
+
+                jobConf.set("pig.streaming.log.dir", 
+                            new Path(curTmpPath, LOG_DIR).toString());
+                jobConf.set("pig.streaming.task.output.dir", curTmpPath.toString());
+           }
 
-            
             // store map key type
             // this is needed when the key is null to create
             // an appropriate NullableXXXWritable object

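The compile()/moveResults() pair above implies a new calling convention: compile() is invoked repeatedly on the same shrinking plan, one wave of dependency-free jobs at a time, and moveResults() runs after each wave so that multi-store output parked under the temp directory is renamed into place. A minimal driver sketch of that convention, mirroring the MapReduceLauncher change below (the group name and polling interval are illustrative):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapred.jobcontrol.JobControl;
    import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler;
    import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
    import org.apache.pig.impl.PigContext;

    public class CompileLoopSketch {
        // Runs the plan wave by wave: each compile() call returns only the
        // jobs whose dependencies have already run, and null once the plan
        // is exhausted.
        public static void run(MROperPlan mrp, PigContext pc, Configuration conf)
                throws Exception {
            JobControlCompiler jcc = new JobControlCompiler(pc, conf);
            JobControl jc;
            while ((jc = jcc.compile(mrp, "example-group")) != null) {
                new Thread(jc).start();
                while (!jc.allFinished()) {
                    Thread.sleep(500);     // poll, as the launchers do
                }
                jcc.moveResults();         // rename temp multi-store output
                jc.stop();
            }
        }
    }
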
Modified: hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/LocalLauncher.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/LocalLauncher.java?rev=750264&r1=750263&r2=750264&view=diff
==============================================================================
--- hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/LocalLauncher.java (original)
+++ hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/LocalLauncher.java Thu Mar  5 01:04:06 2009
@@ -1,238 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer;
-
-import java.io.IOException;
-import java.io.PrintStream;
-import java.util.List;
-import java.util.Properties;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapred.JobClient;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.jobcontrol.Job;
-import org.apache.hadoop.mapred.jobcontrol.JobControl;
-
-import org.apache.pig.backend.executionengine.ExecException;
-import org.apache.pig.backend.executionengine.ExecutionEngine;
-import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
-import org.apache.pig.impl.PigContext;
-import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRCompiler.LastInputStreamingOptimizer;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MRPrinter;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.DotMRPrinter;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MRStreamHandler;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.POPackageAnnotator;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POJoinPackage;
-import org.apache.pig.impl.plan.PlanException;
-import org.apache.pig.impl.plan.VisitorException;
-import org.apache.pig.impl.util.ConfigurationValidator;
-
-
-public class LocalLauncher extends Launcher{
-    private static final Log log = LogFactory.getLog(LocalLauncher.class);
-    
-    @Override
-    public boolean launchPig(
-            PhysicalPlan php,
-            String grpName,
-            PigContext pc) throws PlanException, VisitorException,
-                                  IOException, ExecException,
-                                  JobCreationException {
-        long sleepTime = 500;
-        MROperPlan mrp = compile(php, pc);
-        
-        ExecutionEngine exe = pc.getExecutionEngine();
-        Properties validatedProperties = ConfigurationValidator.getValidatedProperties(exe.getConfiguration());
-        Configuration conf = ConfigurationUtil.toConfiguration(validatedProperties);
-        conf.set("mapred.job.tracker", "local");
-        JobClient jobClient = new JobClient(new JobConf(conf));
-
-        JobControlCompiler jcc = new JobControlCompiler();
-        
-        JobControl jc = jcc.compile(mrp, grpName, conf, pc);
-        
-        
-        int numMRJobs = jc.getWaitingJobs().size();
-        
-        new Thread(jc).start();
-
-        double lastProg = -1;
-        int perCom = 0;
-        while(!jc.allFinished()){
-            try {
-                Thread.sleep(sleepTime);
-            } catch (InterruptedException e) {}
-            double prog = calculateProgress(jc, jobClient)/numMRJobs;
-            if(prog>=(lastProg+0.01)){
-                perCom = (int)(prog * 100);
-                if(perCom!=100)
-                    log.info( perCom + "% complete");
-            }
-            lastProg = prog;
-        }
-        // Look to see if any jobs failed.  If so, we need to report that.
-        List<Job> failedJobs = jc.getFailedJobs();
-        if (failedJobs != null && failedJobs.size() > 0) {
-            log.error("Map reduce job failed");
-            for (Job fj : failedJobs) {
-                log.error(fj.getMessage());
-                getStats(fj, jobClient, true);
-            }
-            jc.stop(); 
-            return false;
-        }
-
-        List<Job> succJobs = jc.getSuccessfulJobs();
-        if(succJobs!=null)
-            for(Job job : succJobs){
-                getStats(job,jobClient, false);
-            }
-
-        jc.stop(); 
-        log.info( "100% complete");
-        log.info("Success!");
-        return true;
-    }
-
-    @Override
-    public void explain(
-            PhysicalPlan php,
-            PigContext pc,
-            PrintStream ps,
-            String format,
-            boolean verbose) throws PlanException, VisitorException,
-                                    IOException {
-        log.trace("Entering LocalLauncher.explain");
-        MROperPlan mrp = compile(php, pc);
-
-        if (format.equals("text")) {
-            MRPrinter printer = new MRPrinter(ps, mrp);
-            printer.setVerbose(verbose);
-            printer.visit();
-        } else {
-            DotMRPrinter printer =new DotMRPrinter(mrp, ps);
-            printer.setVerbose(verbose);
-            printer.dump();
-        }
-    }
- 
-    private MROperPlan compile(
-            PhysicalPlan php,
-            PigContext pc) throws PlanException, IOException, VisitorException {
-        MRCompiler comp = new MRCompiler(php, pc);
-        comp.randomizeFileLocalizer();
-        comp.compile();
-        MROperPlan plan = comp.getMRPlan();
-        String lastInputChunkSize = 
-            pc.getProperties().getProperty(
-                    "last.input.chunksize", POJoinPackage.DEFAULT_CHUNK_SIZE);
-        String prop = System.getProperty("pig.exec.nocombiner");
-        if (!("true".equals(prop)))  {
-            CombinerOptimizer co = new CombinerOptimizer(plan, lastInputChunkSize);
-            co.visit();
-        }
-        
-        // optimize key - value handling in package
-        POPackageAnnotator pkgAnnotator = new POPackageAnnotator(plan);
-        pkgAnnotator.visit();
-        
-        // check whether stream operator is present
-        MRStreamHandler checker = new MRStreamHandler(plan);
-        checker.visit();
-
-        // optimize joins
-        LastInputStreamingOptimizer liso = 
-            new MRCompiler.LastInputStreamingOptimizer(plan, lastInputChunkSize);
-        liso.visit();
-
-        // figure out the type of the key for the map plan
-        // this is needed when the key is null to create
-        // an appropriate NullableXXXWritable object
-        KeyTypeDiscoveryVisitor kdv = new KeyTypeDiscoveryVisitor(plan);
-        kdv.visit();
-        return plan;
-    }
-
-    //A purely testing method. Not to be used elsewhere
-    public boolean launchPigWithCombinePlan(PhysicalPlan php,
-            String grpName, PigContext pc, PhysicalPlan combinePlan) throws PlanException,
-            VisitorException, IOException, ExecException, JobCreationException {
-        long sleepTime = 500;
-        MRCompiler comp = new MRCompiler(php, pc);
-        comp.compile();
-
-        Configuration conf = new Configuration();
-        conf.set("mapred.job.tracker", "local");
-        JobClient jobClient = new JobClient(new JobConf(conf));
-
-        MROperPlan mrp = comp.getMRPlan();
-        if(mrp.getLeaves().get(0)!=mrp.getRoots().get(0))
-            throw new PlanException("Unsupported configuration to test combine plan");
-        
-        MapReduceOper mro = mrp.getLeaves().get(0);
-        mro.combinePlan = combinePlan;
-        
-        JobControlCompiler jcc = new JobControlCompiler();
-
-        JobControl jc = jcc.compile(mrp, grpName, conf, pc);
-
-        int numMRJobs = jc.getWaitingJobs().size();
-
-        new Thread(jc).start();
-
-        double lastProg = -1;
-        while (!jc.allFinished()) {
-            try {
-                Thread.sleep(sleepTime);
-            } catch (InterruptedException e) {
-            }
-            double prog = calculateProgress(jc, jobClient) / numMRJobs;
-            if (prog > lastProg)
-                log.info((int)(prog * 100) + "% complete");
-            lastProg = prog;
-        }
-        lastProg = calculateProgress(jc, jobClient) / numMRJobs;
-        if (isComplete(lastProg))
-            log.info("Completed Successfully");
-        else {
-            log.info("Unsuccessful attempt. Completed " + lastProg * 100
-                    + "% of the job");
-            List<Job> failedJobs = jc.getFailedJobs();
-            if (failedJobs == null)
-                throw new ExecException(
-                        "Something terribly wrong with Job Control.");
-            for (Job job : failedJobs) {
-                getStats(job, jobClient, true);
-            }
-        }
-        List<Job> succJobs = jc.getSuccessfulJobs();
-        if (succJobs != null)
-            for (Job job : succJobs) {
-                getStats(job, jobClient, false);
-            }
-
-        jc.stop();
-
-        return isComplete(lastProg);
-    }
-}

Modified: hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java?rev=750264&r1=750263&r2=750264&view=diff
==============================================================================
--- hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java (original)
+++ hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java Thu Mar  5 01:04:06 2009
@@ -302,7 +302,6 @@
     
     private POStore getStore(){
         POStore st = new POStore(new OperatorKey(scope,nig.getNextNodeId(scope)));
-        st.setPc(pigContext);
         return st;
     }
     
@@ -577,14 +576,6 @@
         }
     }
 
-    /*private void addUDFs(PhysicalPlan plan) throws VisitorException{
-        if(plan!=null){
-            udfFinderForExpr.setPlan(plan);
-            udfFinderForExpr.visit();
-            curMROp.UDFs.addAll(udfFinderForExpr.getUDFs());
-        }
-    }*/
-    
     private void addUDFs(PhysicalPlan plan) throws VisitorException{
         if(plan!=null){
             udfFinder.setPlan(plan);

Modified: hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java?rev=750264&r1=750263&r2=750264&view=diff
==============================================================================
--- hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java (original)
+++ hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java Thu Mar  5 01:04:06 2009
@@ -20,6 +20,7 @@
 import java.io.IOException;
 import java.io.PrintStream;
 import java.util.List;
+import java.util.LinkedList;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -58,7 +59,7 @@
                                                    IOException,
                                                    ExecException,
                                                    JobCreationException {
-        long sleepTime = 5000;
+        long sleepTime = 500;
         MROperPlan mrp = compile(php, pc);
         
         ExecutionEngine exe = pc.getExecutionEngine();
@@ -66,30 +67,38 @@
         Configuration conf = ConfigurationUtil.toConfiguration(exe.getConfiguration());
         JobClient jobClient = ((HExecutionEngine)exe).getJobClient();
 
-        JobControlCompiler jcc = new JobControlCompiler();
+        JobControlCompiler jcc = new JobControlCompiler(pc, conf);
         
-        JobControl jc = jcc.compile(mrp, grpName, conf, pc);
-        
-        int numMRJobs = jc.getWaitingJobs().size();
-        
-        new Thread(jc).start();
-
+        List<Job> failedJobs = new LinkedList<Job>();
+        List<Job> succJobs = new LinkedList<Job>();
+        JobControl jc;
+        int numMRJobs = mrp.size();
         double lastProg = -1;
-        int perCom = 0;
-        while(!jc.allFinished()){
-            try {
-                Thread.sleep(sleepTime);
-            } catch (InterruptedException e) {}
-            double prog = calculateProgress(jc, jobClient)/numMRJobs;
-            if(prog>=(lastProg+0.01)){
-                perCom = (int)(prog * 100);
-                if(perCom!=100)
-                    log.info( perCom + "% complete");
+
+        while((jc = jcc.compile(mrp, grpName)) != null) {
+            numMRJobs += jc.getWaitingJobs().size();
+
+            new Thread(jc).start();
+            
+            while(!jc.allFinished()){
+                try {
+                    Thread.sleep(sleepTime);
+                } catch (InterruptedException e) {}
+                double prog = calculateProgress(jc, jobClient)/numMRJobs;
+                if(prog>=(lastProg+0.01)){
+                    int perCom = (int)(prog * 100);
+                    if(perCom!=100)
+                        log.info( perCom + "% complete");
+                }
+                lastProg = prog;
             }
-            lastProg = prog;
+            failedJobs.addAll(jc.getFailedJobs());
+            succJobs.addAll(jc.getSuccessfulJobs());
+            jcc.moveResults();
+            jc.stop(); 
         }
+
         // Look to see if any jobs failed.  If so, we need to report that.
-        List<Job> failedJobs = jc.getFailedJobs();
         if (failedJobs != null && failedJobs.size() > 0) {
             log.error("Map reduce job failed");
             for (Job fj : failedJobs) {
@@ -100,13 +109,12 @@
             return false;
         }
 
-        List<Job> succJobs = jc.getSuccessfulJobs();
-        if(succJobs!=null)
+        if(succJobs!=null) {
             for(Job job : succJobs){
                 getStats(job,jobClient, false);
             }
+        }
 
-        jc.stop(); 
         log.info( "100% complete");
         log.info("Success!");
         return true;

Modified: hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java?rev=750264&r1=750263&r2=750264&view=diff
==============================================================================
--- hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java (original)
+++ hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java Thu Mar  5 01:04:06 2009
@@ -39,8 +39,11 @@
 import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStoreImpl;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
 import org.apache.pig.impl.util.ObjectSerializer;
 import org.apache.pig.impl.util.SpillableMemoryManager;
 
@@ -54,6 +57,10 @@
     
     //Map Plan
     protected PhysicalPlan mp;
+
+    // Store operators
+    protected List<POStore> stores;
+
     protected TupleFactory tf = TupleFactory.getInstance();
     
     OutputCollector<PigNullableWritable, Writable> outputCollector;
@@ -77,7 +84,7 @@
     @Override
     public void close() throws IOException {
         super.close();
-        PhysicalOperator.setReporter(null);
+
         if(errorInMap) {
             //error in map - returning
             return;
@@ -100,7 +107,19 @@
             }
         }
         mp = null;
-        
+
+        for (POStore store: stores) {
+            if (!initialized) {
+                MapReducePOStoreImpl impl 
+                    = new MapReducePOStoreImpl(PigMapReduce.sJobConf);
+                store.setStoreImpl(impl);
+                store.setUp();
+            }
+            store.tearDown();
+        }
+
+        PhysicalOperator.setReporter(null);
+        initialized = false;
     }
 
     /**
@@ -113,8 +132,9 @@
         SpillableMemoryManager.configure(ConfigurationUtil.toProperties(job));
         PigMapReduce.sJobConf = job;
         try {
-            mp = (PhysicalPlan) ObjectSerializer.deserialize(job
-                    .get("pig.mapPlan"));
+            mp = (PhysicalPlan) ObjectSerializer.deserialize(
+                job.get("pig.mapPlan"));
+            stores = PlanHelper.getStores(mp);
             
             // To be removed
             if(mp.isEmpty())
@@ -166,6 +186,13 @@
             this.outputCollector = oc;
             pigReporter.setRep(reporter);
             PhysicalOperator.setReporter(pigReporter);
+            for (POStore store: stores) {
+                MapReducePOStoreImpl impl 
+                    = new MapReducePOStoreImpl(PigMapReduce.sJobConf);
+                impl.setReporter(reporter);
+                store.setStoreImpl(impl);
+                store.setUp();
+            }
         }
         
         if(mp.isEmpty()){

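The close() and collect-time changes above (and the matching reduce-side changes in PigMapReduce.java below) follow one protocol: set each POStore up on the first record, when the Hadoop Reporter is available, and tear it down in close(); a task that saw no input still sets the store up inside close() so tearDown() can finalize an empty output. A condensed sketch of that protocol, with Store standing in for POStore plus its MapReducePOStoreImpl:

    import java.io.IOException;
    import java.util.List;

    abstract class StoreLifecycleSketch {
        interface Store {
            void setUp() throws IOException;
            void tearDown() throws IOException;
        }

        protected List<Store> stores;          // from PlanHelper.getStores(plan)
        protected boolean initialized = false;

        void onFirstRecord() throws IOException {
            if (!initialized) {
                initialized = true;
                for (Store s : stores) s.setUp();   // open outputs lazily
            }
        }

        void close() throws IOException {
            for (Store s : stores) {
                if (!initialized) s.setUp();        // empty task: open first
                s.tearDown();                       // flush and release output
            }
            initialized = false;
        }
    }
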
Modified: hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java?rev=750264&r1=750263&r2=750264&view=diff
==============================================================================
--- hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java (original)
+++ hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java Thu Mar  5 01:04:06 2009
@@ -42,6 +42,8 @@
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POJoinPackage;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
 import org.apache.pig.data.DataType;
 import org.apache.pig.data.TargetedTuple;
 import org.apache.pig.data.Tuple;
@@ -140,6 +142,9 @@
         
         //The reduce plan
         protected PhysicalPlan rp;
+
+        // Store operators
+        protected List<POStore> stores;
         
         //The POPackage operator which is the
         //root of every Map Reduce plan is
@@ -155,7 +160,11 @@
         protected boolean errorInReduce = false;
         
         PhysicalOperator[] roots;
+
         private PhysicalOperator leaf;
+
+        protected boolean initialized = false;
+
         /**
          * Configures the Reduce plan, the POPackage operator
          * and the reporter thread
@@ -168,6 +177,8 @@
             try {
                 rp = (PhysicalPlan) ObjectSerializer.deserialize(jConf
                         .get("pig.reducePlan"));
+                stores = PlanHelper.getStores(rp);
+
                 pack = (POPackage)ObjectSerializer.deserialize(jConf.get("pig.reduce.package"));
                 // To be removed
                 if(rp.isEmpty())
@@ -203,11 +214,23 @@
                 OutputCollector<PigNullableWritable, Writable> oc,
                 Reporter reporter) throws IOException {
             
-            // cache the collector for use in runPipeline()
-            // which could additionally be called from close()
-            this.outputCollector = oc;
-            pigReporter.setRep(reporter);
-            PhysicalOperator.setReporter(pigReporter);
+            if (!initialized) {
+                initialized = true;
+                
+                // cache the collector for use in runPipeline()
+                // which could additionally be called from close()
+                this.outputCollector = oc;
+                pigReporter.setRep(reporter);
+                PhysicalOperator.setReporter(pigReporter);
+
+                for (POStore store: stores) {
+                    MapReducePOStoreImpl impl 
+                        = new MapReducePOStoreImpl(PigMapReduce.sJobConf);
+                    impl.setReporter(reporter);
+                    store.setStoreImpl(impl);
+                    store.setUp();
+                }
+            }
 
             // In the case we optimize the join, we combine
             // POPackage and POForeach - so we could get many
@@ -322,9 +345,6 @@
         @Override
         public void close() throws IOException {
             super.close();
-            /*if(runnableReporter!=null)
-                runnableReporter.setDone(true);*/
-            PhysicalOperator.setReporter(null);
             
             if(errorInReduce) {
                 // there was an error in reduce - just return
@@ -347,6 +367,19 @@
                      throw ioe;
                 }
             }
+
+            for (POStore store: stores) {
+                if (!initialized) {
+                    MapReducePOStoreImpl impl 
+                        = new MapReducePOStoreImpl(PigMapReduce.sJobConf);
+                    store.setStoreImpl(impl);
+                    store.setUp();
+                }
+                store.tearDown();
+            }
+            
+            PhysicalOperator.setReporter(null);
+            initialized = false;
         }
     }
     
@@ -384,11 +417,23 @@
                 OutputCollector<PigNullableWritable, Writable> oc,
                 Reporter reporter) throws IOException {
             
-            // cache the collector for use in runPipeline()
-            // which could additionally be called from close()
-            this.outputCollector = oc;
-            pigReporter.setRep(reporter);
-            PhysicalOperator.setReporter(pigReporter);
+            if (!initialized) {
+                initialized = true;
+                
+                // cache the collector for use in runPipeline()
+                // which could additionally be called from close()
+                this.outputCollector = oc;
+                pigReporter.setRep(reporter);
+                PhysicalOperator.setReporter(pigReporter);
+
+                for (POStore store: stores) {
+                    MapReducePOStoreImpl impl 
+                        = new MapReducePOStoreImpl(PigMapReduce.sJobConf);
+                    impl.setReporter(reporter);
+                    store.setStoreImpl(impl);
+                    store.setUp();
+                }
+            }
             
             // If the keyType is not a tuple, the MapWithComparator.collect()
             // would have wrapped the key into a tuple so that the 

Modified: hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java?rev=750264&r1=750263&r2=750264&view=diff
==============================================================================
--- hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java (original)
+++ hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java Thu Mar  5 01:04:06 2009
@@ -1169,7 +1169,6 @@
         POStore store = new POStore(new OperatorKey(scope, nodeGen
                 .getNextNodeId(scope)));
         store.setSFile(loStore.getOutputFile());
-        store.setPc(pc);
         currentPlan.add(store);
         
         List<LogicalOperator> op = loStore.getPlan().getPredecessors(loStore); 

Modified: hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStore.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStore.java?rev=750264&r1=750263&r2=750264&view=diff
==============================================================================
--- hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStore.java (original)
+++ hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStore.java Thu Mar  5 01:04:06 2009
@@ -45,20 +45,13 @@
  *
  */
 public class POStore extends PhysicalOperator {
-    /**
-     * 
-     */
+
     private static final long serialVersionUID = 1L;
-    // The user defined load function or a default load function
-    private StoreFunc storer;
-    // The filespec on which the operator is based
-    FileSpec sFile;
-    // The stream used to bind to by the loader
-    OutputStream os;
-    // PigContext passed to us by the operator creator
-    PigContext pc;
-    
+    private static Result empty = new Result(POStatus.STATUS_NULL, null);
+    private StoreFunc storer;    
     private final Log log = LogFactory.getLog(getClass());
+    private POStoreImpl impl;
+    private FileSpec sFile;
     
     public POStore(OperatorKey k) {
         this(k, -1, null);
@@ -73,87 +66,56 @@
     }
     
     /**
-     * Set up the storer by 
-     * 1) Instantiating the store func
-     * 2) Opening an output stream to the specified file and
-     * 3) Binding to the output stream
+     * Set up the storer
      * @throws IOException
      */
-    private void setUp() throws IOException{
-        storer = (StoreFunc)PigContext.instantiateFuncFromSpec(sFile.getFuncSpec());
-        os = FileLocalizer.create(sFile.getFileName(), pc);
-        storer.bindTo(os);
+    public void setUp() throws IOException{
+        if (impl != null) {
+            storer = impl.createStoreFunc(sFile);
+        }
     }
     
     /**
-     * At the end of processing, the outputstream is closed
-     * using this method
+     * Called at the end of processing for clean up.
      * @throws IOException
      */
-    private void tearDown() throws IOException{
-        os.close();
-    }
+    public void tearDown() throws IOException{
+        if (impl != null) {
+            impl.tearDown();
+        }
+   }
     
     /**
      * To perform cleanup when there is an error.
-     * Uses the FileLocalizer method which only 
-     * deletes the file but not the dirs created
-     * with it.
      * @throws IOException
      */
-    private void cleanUp() throws IOException{
-        String fName = sFile.getFileName();
-        os.flush();
-        if(FileLocalizer.fileExists(fName,pc))
-            FileLocalizer.delete(fName,pc);
+    public void cleanUp() throws IOException{
+        if (impl != null) {
+            impl.cleanUp();
+        }
     }
     
-    /**
-     * The main method used by the local execution engine
-     * to store tuples into the specified file using the
-     * specified store function. One call to this method
-     * retrieves all tuples from its predecessor operator
-     * and stores it into the file till it recieves an EOP.
-     * 
-     * If there is an error, the cleanUp routine is called
-     * and then the tearDown is called to close the OutputStream
-     * 
-     * @return Whatever the predecessor returns
-     *          A null from the predecessor is ignored
-     *          and processing of further tuples continued
-     */
-    public Result store() throws ExecException{
-        try{
-            setUp();
-        }catch (IOException e) {
-            ExecException ee = new ExecException("Unable to setup the storer because of the exception: " + e.getMessage());
-            ee.initCause(e);
-            throw ee;
-        }
-        try{
-            Result res;
-            Tuple inpValue = null;
-            while(true){
-                res = processInput();
-                if(res.returnStatus==POStatus.STATUS_OK)
-                    storer.putNext((Tuple)res.result);
-                else if(res.returnStatus==POStatus.STATUS_NULL)
-                    continue;
-                else
-                    break;
-            }
-            if(res.returnStatus==POStatus.STATUS_EOP){
-                storer.finish();
-            }
-            else{
-                cleanUp();
+    @Override
+    public Result getNext(Tuple t) throws ExecException {
+        Result res = processInput();
+        try {
+            switch (res.returnStatus) {
+            case POStatus.STATUS_OK:
+                storer.putNext((Tuple)res.result);
+                res = empty;
+                break;
+            case POStatus.STATUS_EOP:
+                break;
+            case POStatus.STATUS_ERR:
+            case POStatus.STATUS_NULL:
+            default:
+                break;
             }
-            tearDown();
-            return res;
-        }catch(IOException e){
-            log.error("Received error from storer function: " + e);
-            return new Result();
+        } catch (IOException ioe) {
+            log.error("Received error from storer function: " + ioe);
+            throw new ExecException(ioe);
         }
+        return res;
     }
 
     @Override
@@ -174,12 +136,6 @@
         return false;
     }
 
-    public StoreFunc getStorer() {
-        return storer;
-    }
-
-    
-
     @Override
     public void visit(PhyPlanVisitor v) throws VisitorException {
         v.visitStore(this);
@@ -189,16 +145,11 @@
         return sFile;
     }
 
-    public void setSFile(FileSpec file) {
-        sFile = file;
+    public void setSFile(FileSpec sFile) {
+        this.sFile = sFile;
     }
 
-    public PigContext getPc() {
-        return pc;
+    public void setStoreImpl(POStoreImpl impl) {
+        this.impl = impl;
     }
-
-    public void setPc(PigContext pc) {
-        this.pc = pc;
-    }
-
 }

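POStore is now a regular pipeline operator that delegates all I/O to a pluggable POStoreImpl; the surface visible in this diff is createStoreFunc(FileSpec), tearDown() and cleanUp(). A rough sketch of a local back end, reconstructed from the lines removed above (an assumption for illustration; the actual LocalPOStoreImpl and MapReducePOStoreImpl may differ):

    import java.io.IOException;
    import java.io.OutputStream;
    import org.apache.pig.StoreFunc;
    import org.apache.pig.impl.PigContext;
    import org.apache.pig.impl.io.FileLocalizer;
    import org.apache.pig.impl.io.FileSpec;

    class LocalStoreImplSketch /* extends POStoreImpl */ {
        private final PigContext pc;
        private OutputStream os;
        private FileSpec sFile;

        LocalStoreImplSketch(PigContext pc) { this.pc = pc; }

        // Instantiate the user's store func and bind it to an output
        // stream, as POStore.setUp() used to do inline.
        public StoreFunc createStoreFunc(FileSpec sFile) throws IOException {
            this.sFile = sFile;
            StoreFunc storer =
                (StoreFunc) PigContext.instantiateFuncFromSpec(sFile.getFuncSpec());
            os = FileLocalizer.create(sFile.getFileName(), pc);
            storer.bindTo(os);
            return storer;
        }

        public void tearDown() throws IOException {
            os.close();                    // normal end of processing
        }

        public void cleanUp() throws IOException {
            os.flush();                    // on error: drop the partial file
            if (FileLocalizer.fileExists(sFile.getFileName(), pc))
                FileLocalizer.delete(sFile.getFileName(), pc);
        }
    }
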
Modified: hadoop/pig/branches/multiquery/src/org/apache/pig/backend/local/executionengine/LocalExecutionEngine.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/src/org/apache/pig/backend/local/executionengine/LocalExecutionEngine.java?rev=750264&r1=750263&r2=750264&view=diff
==============================================================================
--- hadoop/pig/branches/multiquery/src/org/apache/pig/backend/local/executionengine/LocalExecutionEngine.java (original)
+++ hadoop/pig/branches/multiquery/src/org/apache/pig/backend/local/executionengine/LocalExecutionEngine.java Thu Mar  5 01:04:06 2009
@@ -49,7 +49,6 @@
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.LogToPhyTranslationVisitor;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.NodeIdGenerator;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.LocalLauncher;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PlanPrinter;
@@ -151,7 +150,6 @@
                 String scope = leaf.getOperatorKey().getScope();
                 POStore str = new POStore(new OperatorKey(scope,
                         NodeIdGenerator.getGenerator().getNextNodeId(scope)));
-                str.setPc(pigContext);
                 spec = new FileSpec(FileLocalizer.getTemporaryPath(null,
                         pigContext).toString(), new FuncSpec(BinStorage.class
                         .getName()));
@@ -161,7 +159,6 @@
                 spec = ((POStore) leaf).getSFile();
             }
 
-            // LocalLauncher launcher = new LocalLauncher();
             LocalPigLauncher launcher = new LocalPigLauncher();
             boolean success = launcher.launchPig(plan, jobName, pigContext);
             if (success)
@@ -189,7 +186,6 @@
         try {
             ExecTools.checkLeafIsStore(plan, pigContext);
 
-            // LocalLauncher launcher = new LocalLauncher();
             LocalPigLauncher launcher = new LocalPigLauncher();
             launcher.explain(plan, pigContext, stream, 
                              format, isVerbose);

Modified: hadoop/pig/branches/multiquery/src/org/apache/pig/backend/local/executionengine/LocalPigLauncher.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/src/org/apache/pig/backend/local/executionengine/LocalPigLauncher.java?rev=750264&r1=750263&r2=750264&view=diff
==============================================================================
--- hadoop/pig/branches/multiquery/src/org/apache/pig/backend/local/executionengine/LocalPigLauncher.java (original)
+++ hadoop/pig/branches/multiquery/src/org/apache/pig/backend/local/executionengine/LocalPigLauncher.java Thu Mar  5 01:04:06 2009
@@ -21,6 +21,7 @@
 import java.io.IOException;
 import java.io.PrintStream;
 import java.util.List;
+import java.util.BitSet;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -32,13 +33,16 @@
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
+import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.plan.PlanException;
 import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
 
 public class LocalPigLauncher extends Launcher {
-
-    Log log = LogFactory.getLog(getClass());
+    private static final Tuple DUMMYTUPLE = null;
+    private Log log = LogFactory.getLog(getClass());
+    List<POStore> stores;
 
     @Override
     public void explain(PhysicalPlan pp, PigContext pc, PrintStream ps,
@@ -53,17 +57,17 @@
             throws PlanException, VisitorException, IOException, ExecException,
             JobCreationException {
         // TODO Auto-generated method stub
-        List<PhysicalOperator> stores = php.getLeaves();
+        stores = PlanHelper.getStores(php);
         int noJobs = stores.size();
         int failedJobs = 0;
 
-        for (PhysicalOperator op : stores) {
-            POStore store = (POStore) op;
-            Result res = store.store();
-            if (res.returnStatus != POStatus.STATUS_EOP)
-                failedJobs++;
+        for (POStore op : stores) {
+            op.setStoreImpl(new LocalPOStoreImpl(pc));
+            op.setUp();
         }
 
+        failedJobs = runPipeline(stores.toArray(new POStore[0]));
+
         if (failedJobs == 0) {
             log.info("100% complete!");
             log.info("Success!!");
@@ -76,4 +80,36 @@
 
     }
 
+    private int runPipeline(POStore[] leaves) throws IOException, ExecException {
+        BitSet bs = new BitSet(leaves.length);
+        int failed = 0;
+        while(true) {
+            if (bs.cardinality() == leaves.length) {
+                break;
+            }
+            for(int i=bs.nextClearBit(0); i<leaves.length; i=bs.nextClearBit(i+1)) {
+                Result res = leaves[i].getNext(DUMMYTUPLE);
+                switch(res.returnStatus) {
+                case POStatus.STATUS_NULL:
+                    // a good null from the store means keep at it.
+                    continue;
+                case POStatus.STATUS_OK:
+                    // OK shouldn't happen; the store should have consumed it.
+                    // fallthrough
+                case POStatus.STATUS_ERR:
+                    leaves[i].cleanUp();
+                    leaves[i].tearDown();
+                    failed++;
+                    // fallthrough
+                case POStatus.STATUS_EOP:
+                    leaves[i].tearDown();
+                    // fallthrough
+                default:
+                    bs.set(i);
+                    break;
+                }
+            }
+        }
+        return failed;
+    }
 }

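runPipeline() above drains several store pipelines in one pass, using a BitSet to track which leaves have reached EOP so each pipeline makes progress in round-robin order. The same idiom in isolation (Leaf and pump() are illustrative stand-ins for POStore and getNext()):

    import java.util.BitSet;

    public class RoundRobinDrainSketch {
        interface Leaf {
            boolean pump();    // true while the pipeline has more output
        }

        // Pulls from every unfinished leaf per round; a bit is set once the
        // corresponding leaf is exhausted, and the loop ends when all are.
        static void drain(Leaf[] leaves) {
            BitSet done = new BitSet(leaves.length);
            while (done.cardinality() < leaves.length) {
                for (int i = done.nextClearBit(0);
                     i < leaves.length;
                     i = done.nextClearBit(i + 1)) {
                    if (!leaves[i].pump()) {
                        done.set(i);
                    }
                }
            }
        }
    }
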
Modified: hadoop/pig/branches/multiquery/test/org/apache/pig/test/TestJobSubmission.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/test/org/apache/pig/test/TestJobSubmission.java?rev=750264&r1=750263&r2=750264&view=diff
==============================================================================
--- hadoop/pig/branches/multiquery/test/org/apache/pig/test/TestJobSubmission.java (original)
+++ hadoop/pig/branches/multiquery/test/org/apache/pig/test/TestJobSubmission.java Thu Mar  5 01:04:06 2009
@@ -468,11 +468,11 @@
         ExecutionEngine exe = pc.getExecutionEngine();
         ConfigurationValidator.validatePigProperties(exe.getConfiguration());
         Configuration conf = ConfigurationUtil.toConfiguration(exe.getConfiguration());
-        JobControlCompiler jcc = new JobControlCompiler();
+        JobControlCompiler jcc = new JobControlCompiler(pc, conf);
         try {
-        	jcc.compile(mrPlan, "Test", conf, pc);
+            jcc.compile(mrPlan, "Test");
         } catch (JobCreationException jce) {
-        	assertTrue(jce.getErrorCode() == 1068);
+            assertTrue(jce.getErrorCode() == 1068);
         }
     }
     

Modified: hadoop/pig/branches/multiquery/test/org/apache/pig/test/TestLocalJobSubmission.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/test/org/apache/pig/test/TestLocalJobSubmission.java?rev=750264&r1=750263&r2=750264&view=diff
==============================================================================
--- hadoop/pig/branches/multiquery/test/org/apache/pig/test/TestLocalJobSubmission.java (original)
+++ hadoop/pig/branches/multiquery/test/org/apache/pig/test/TestLocalJobSubmission.java Thu Mar  5 01:04:06 2009
@@ -32,7 +32,9 @@
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.io.FileLocalizer;
 import org.apache.pig.impl.io.FileSpec;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.LocalLauncher;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher;
+import org.apache.pig.backend.local.executionengine.LocalPigLauncher;
+import org.apache.pig.backend.local.executionengine.LocalPOStoreImpl;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
@@ -130,7 +132,7 @@
         rmrf(outDir);
     }
     
-    private void generateInput(int numTuples) throws ExecException{
+    private void generateInput(int numTuples) throws Exception{
         
         DataBag inpDb = GenRandomData.genRandSmallTupDataBag(r, numTuples, 1000);
         
@@ -146,13 +148,18 @@
         inps.add(proj);
         
         POStore str = new POStore(new OperatorKey("", r.nextLong()));
-        str.setInputs(inps);
         
         FileSpec fSpec = new FileSpec(ldFile, new FuncSpec(PigStorage.class.getName()));
         
         str.setSFile(fSpec);
-        str.setPc(pc);
-        str.store();
+        str.setStoreImpl(new LocalPOStoreImpl(pc));
+
+        PhysicalPlan pp = new PhysicalPlan();
+        pp.add(proj);
+        pp.add(str);
+        pp.connect(proj, str);
+        
+        new LocalPigLauncher().launchPig(pp, "TestLocalJobSubmission", pc);
     }
     
     /*private void setUp1(boolean gen) throws Exception {
@@ -389,7 +396,6 @@
         POStore st = new POStore(new OperatorKey("", r.nextLong()));
         ld.setPc(pc);
         ld.setLFile(LFSpec);
-        st.setPc(pc);
         st.setSFile(SFSpec);
         
         Tuple sample = new DefaultTuple();
@@ -449,7 +455,7 @@
     
     private void submit() throws Exception{
         assertEquals(true, FileLocalizer.fileExists(ldFile, pc));
-        LocalLauncher ll = new LocalLauncher();
+        MapReduceLauncher ll = new MapReduceLauncher();
         ll.launchPig(php, grpName, pc);  
     }
 }

Modified: hadoop/pig/branches/multiquery/test/org/apache/pig/test/TestMultiQuery.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/test/org/apache/pig/test/TestMultiQuery.java?rev=750264&r1=750263&r2=750264&view=diff
==============================================================================
--- hadoop/pig/branches/multiquery/test/org/apache/pig/test/TestMultiQuery.java (original)
+++ hadoop/pig/branches/multiquery/test/org/apache/pig/test/TestMultiQuery.java Thu Mar  5 01:04:06 2009
@@ -72,7 +72,7 @@
 
             myPig.registerQuery("a = load 'file:test/org/apache/pig/test/data/passwd' " +
                                 "using PigStorage(':') as (uname:chararray, passwd:chararray, uid:int,gid:int);");
-            myPig.registerQuery("b = filter a by uid > 500;");
+            myPig.registerQuery("b = filter a by uid > 5;");
             myPig.registerQuery("store b into '/tmp/output1';");
             myPig.registerQuery("c = group b by gid;");
             myPig.registerQuery("store c into '/tmp/output2';");
@@ -103,7 +103,7 @@
 
             myPig.registerQuery("a = load 'file:test/org/apache/pig/test/data/passwd' " +
                                 "using PigStorage(':') as (uname:chararray, passwd:chararray, uid:int,gid:int);");
-            myPig.registerQuery("b = filter a by uid > 500;");
+            myPig.registerQuery("b = filter a by uid > 5;");
             myPig.registerQuery("store b into '/tmp/output1';");
             myPig.registerQuery("c = group b by gid;");
             myPig.registerQuery("store c into '/tmp/output2';");
@@ -128,11 +128,11 @@
 
             myPig.registerQuery("a = load 'file:test/org/apache/pig/test/data/passwd' " +
                                 "using PigStorage(':') as (uname:chararray, passwd:chararray, uid:int,gid:int);");
-            myPig.registerQuery("b = filter a by uid > 500;");
+            myPig.registerQuery("b = filter a by uid > 5;");
             myPig.registerQuery("store b into '/tmp/output1';");
-            myPig.registerQuery("c = filter b by uid > 1000;");
+            myPig.registerQuery("c = filter b by uid > 10;");
             myPig.registerQuery("store c into '/tmp/output2';");
-            myPig.registerQuery("d = filter c by uid > 1500;");
+            myPig.registerQuery("d = filter c by uid > 15;");
             myPig.registerQuery("store d into '/tmp/output3';");
 
             LogicalPlan lp = checkLogicalPlan(1, 3, 14);
@@ -161,11 +161,11 @@
 
             myPig.registerQuery("a = load 'file:test/org/apache/pig/test/data/passwd' " +
                                 "using PigStorage(':') as (uname:chararray, passwd:chararray, uid:int,gid:int);");
-            myPig.registerQuery("b = filter a by uid > 500;");
+            myPig.registerQuery("b = filter a by uid > 5;");
             myPig.registerQuery("store b into '/tmp/output1';");
-            myPig.registerQuery("c = filter b by uid > 1000;");
+            myPig.registerQuery("c = filter b by uid > 10;");
             myPig.registerQuery("store c into '/tmp/output2';");
-            myPig.registerQuery("d = filter c by uid > 1500;");
+            myPig.registerQuery("d = filter c by uid > 15;");
             myPig.registerQuery("store d into '/tmp/output3';");
 
             myPig.executeBatch();
@@ -190,8 +190,8 @@
                                 "using PigStorage(':') as (uname:chararray, passwd:chararray, uid:int,gid:int);");
             myPig.registerQuery("b = load 'file:test/org/apache/pig/test/data/passwd2' " +
                                 "using PigStorage(':') as (uname:chararray, passwd:chararray, uid:int,gid:int);");
-            myPig.registerQuery("c = filter a by uid > 500;");
-            myPig.registerQuery("d = filter b by uid > 1000;");
+            myPig.registerQuery("c = filter a by uid > 5;");
+            myPig.registerQuery("d = filter b by uid > 10;");
             myPig.registerQuery("store c into '/tmp/output1';");
             myPig.registerQuery("store d into '/tmp/output2';");
             myPig.registerQuery("e = cogroup c by uid, d by uid;");
@@ -225,8 +225,8 @@
                                 "using PigStorage(':') as (uname:chararray, passwd:chararray, uid:int,gid:int);");
             myPig.registerQuery("b = load 'file:test/org/apache/pig/test/data/passwd2' " +
                                 "using PigStorage(':') as (uname:chararray, passwd:chararray, uid:int,gid:int);");
-            myPig.registerQuery("c = filter a by uid > 500;");
-            myPig.registerQuery("d = filter b by uid > 1000;");
+            myPig.registerQuery("c = filter a by uid > 5;");
+            myPig.registerQuery("d = filter b by uid > 10;");
             myPig.registerQuery("store c into '/tmp/output1';");
             myPig.registerQuery("store d into '/tmp/output2';");
             myPig.registerQuery("e = cogroup c by uid, d by uid;");
@@ -252,7 +252,7 @@
 
             myPig.registerQuery("a = load 'file:test/org/apache/pig/test/data/passwd' " +
                                 "using PigStorage(':') as (uname:chararray, passwd:chararray, uid:int,gid:int);");
-            myPig.registerQuery("b = filter a by uid > 500;");
+            myPig.registerQuery("b = filter a by uid > 5;");
             myPig.registerQuery("group b by gid;");
 
             LogicalPlan lp = checkLogicalPlan(0, 0, 0);
@@ -279,7 +279,7 @@
 
             myPig.registerQuery("a = load 'file:test/org/apache/pig/test/data/passwd' " +
                                 "using PigStorage(':') as (uname:chararray, passwd:chararray, uid:int,gid:int);");
-            myPig.registerQuery("b = filter a by uid > 500;");
+            myPig.registerQuery("b = filter a by uid > 5;");
             myPig.registerQuery("group b by gid;");
 
             myPig.executeBatch();
@@ -299,7 +299,7 @@
         try {
             String script = "a = load 'file:test/org/apache/pig/test/data/passwd' "
                           + "using PigStorage(':') as (uname:chararray, passwd:chararray, uid:int,gid:int);"
-                          + "b = filter a by uid > 500;"
+                          + "b = filter a by uid > 5;"
                           + "explain b;"
                           + "store b into '/tmp/output1';\n";
             
@@ -324,7 +324,7 @@
         try {
             String script = "a = load 'file:test/org/apache/pig/test/data/passwd' "
                           + "using PigStorage(':') as (uname:chararray, passwd:chararray, uid:int,gid:int);"
-                          + "b = filter a by uid > 500;"
+                          + "b = filter a by uid > 5;"
                           + "dump b;"
                           + "store b into '/tmp/output1';\n";
             
@@ -349,7 +349,7 @@
         try {
             String script = "a = load 'file:test/org/apache/pig/test/data/passwd' "
                           + "using PigStorage(':') as (uname:chararray, passwd:chararray, uid:int,gid:int);"
-                          + "b = filter a by uid > 500;"
+                          + "b = filter a by uid > 5;"
                           + "describe b;"
                           + "store b into '/tmp/output1';\n";
             
@@ -374,7 +374,7 @@
         try {
             String script = "a = load 'file:test/org/apache/pig/test/data/passwd' "
                           + "using PigStorage(':') as (uname:chararray, passwd:chararray, uid:int,gid:int);"
-                          + "b = filter a by uid > 500;"
+                          + "b = filter a by uid > 5;"
                           + "illustrate b;"
                           + "store b into '/tmp/output1';\n";
             

Modified: hadoop/pig/branches/multiquery/test/org/apache/pig/test/TestStore.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/test/org/apache/pig/test/TestStore.java?rev=750264&r1=750263&r2=750264&view=diff
==============================================================================
--- hadoop/pig/branches/multiquery/test/org/apache/pig/test/TestStore.java (original)
+++ hadoop/pig/branches/multiquery/test/org/apache/pig/test/TestStore.java Thu Mar  5 01:04:06 2009
@@ -39,6 +39,9 @@
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.io.FileLocalizer;
 import org.apache.pig.impl.io.FileSpec;
+import org.apache.pig.backend.local.executionengine.LocalPigLauncher;
+import org.apache.pig.backend.local.executionengine.LocalPOStoreImpl;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
@@ -67,30 +70,34 @@
         st.setSFile(fSpec);
         pc = new PigContext();
         pc.connect();
-        st.setPc(pc);
+        st.setStoreImpl(new LocalPOStoreImpl(pc));
         
         proj = GenPhyOp.exprProject();
         proj.setColumn(0);
         proj.setResultType(DataType.TUPLE);
         proj.setOverloaded(true);
-        List<PhysicalOperator> inps = new ArrayList<PhysicalOperator>();
-        inps.add(proj);
-        st.setInputs(inps);
-        
     }
 
     @After
     public void tearDown() throws Exception {
     }
 
+    private boolean store() throws Exception {
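+        // build a tiny plan (proj -> st) and run it through the local launcher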
+        PhysicalPlan pp = new PhysicalPlan();
+        pp.add(proj);
+        pp.add(st);
+        pp.connect(proj, st);
+        return new LocalPigLauncher().launchPig(pp, "TestStore", pc);
+    }
+
     @Test
-    public void testStore() throws ExecException, IOException {
+    public void testStore() throws Exception {
         inpDB = GenRandomData.genRandSmallTupDataBag(new Random(), 10, 100);
         Tuple t = new DefaultTuple();
         t.append(inpDB);
         proj.attachInput(t);
-        Result res = st.store();
-        assertEquals(POStatus.STATUS_EOP, res.returnStatus);
+        assertTrue(store());
         
         int size = 0;
         BufferedReader br = new BufferedReader(new FileReader("/tmp/storeTest.txt"));
@@ -112,13 +119,12 @@
     }
 
     @Test
-    public void testStoreComplexData() throws ExecException, IOException {
+    public void testStoreComplexData() throws Exception {
         inpDB = GenRandomData.genRandFullTupTextDataBag(new Random(), 10, 100);
         Tuple t = new DefaultTuple();
         t.append(inpDB);
         proj.attachInput(t);
-        Result res = st.store();
-        assertEquals(POStatus.STATUS_EOP, res.returnStatus);
+        assertTrue(store());
         PigStorage ps = new PigStorage(":");
         
         int size = 0;
@@ -144,15 +150,14 @@
     }
 
     @Test
-    public void testStoreComplexDataWithNull() throws ExecException, IOException {
+    public void testStoreComplexDataWithNull() throws Exception {
         Tuple inputTuple = GenRandomData.genRandSmallBagTextTupleWithNulls(new Random(), 10, 100);
         inpDB = DefaultBagFactory.getInstance().newDefaultBag();
         inpDB.add(inputTuple);
         Tuple t = new DefaultTuple();
         t.append(inpDB);
         proj.attachInput(t);
-        Result res = st.store();
-        assertEquals(POStatus.STATUS_EOP, res.returnStatus);
+        assertTrue(store());
         PigStorage ps = new PigStorage(":");
         
         int size = 0;

Modified: hadoop/pig/branches/multiquery/test/org/apache/pig/test/utils/GenPhyOp.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/test/org/apache/pig/test/utils/GenPhyOp.java?rev=750264&r1=750263&r2=750264&view=diff
==============================================================================
--- hadoop/pig/branches/multiquery/test/org/apache/pig/test/utils/GenPhyOp.java (original)
+++ hadoop/pig/branches/multiquery/test/org/apache/pig/test/utils/GenPhyOp.java Thu Mar  5 01:04:06 2009
@@ -749,7 +749,6 @@
 
     public static POStore topStoreOp() {
         POStore ret = new POStore(new OperatorKey("", r.nextLong()));
-        ret.setPc(pc);
         ret.setSFile(new FileSpec("DummyFil", new FuncSpec("DummyLdr")));
         return ret;
     }