Posted to commits@pig.apache.org by rd...@apache.org on 2011/01/07 23:16:23 UTC

svn commit: r1056536 [2/2] - in /pig/trunk: ./ src/org/apache/pig/ src/org/apache/pig/scripting/ src/org/apache/pig/scripting/jython/ src/org/apache/pig/tools/pigstats/ test/org/apache/pig/test/

Modified: pig/trunk/src/org/apache/pig/tools/pigstats/PigStats.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/tools/pigstats/PigStats.java?rev=1056536&r1=1056535&r2=1056536&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/tools/pigstats/PigStats.java (original)
+++ pig/trunk/src/org/apache/pig/tools/pigstats/PigStats.java Fri Jan  7 22:16:22 2011
@@ -17,12 +17,9 @@
  */
 package org.apache.pig.tools.pigstats;
 
-import java.text.SimpleDateFormat;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
-import java.util.Date;
-import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -30,30 +27,17 @@ import java.util.Properties;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.JobClient;
-import org.apache.hadoop.mapred.JobID;
-import org.apache.hadoop.mapred.jobcontrol.Job;
-import org.apache.pig.ExecType;
 import org.apache.pig.PigException;
 import org.apache.pig.PigRunner.ReturnCode;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceOper;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.NativeMapReduceOper;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROpPlanVisitor;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
 import org.apache.pig.classification.InterfaceAudience;
 import org.apache.pig.classification.InterfaceStability;
-import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.logicalLayer.FrontendException;
-import org.apache.pig.impl.plan.DependencyOrderWalker;
-import org.apache.pig.impl.plan.VisitorException;
 import org.apache.pig.impl.util.SpillableMemoryManager;
 import org.apache.pig.newplan.BaseOperatorPlan;
 import org.apache.pig.newplan.Operator;
 import org.apache.pig.newplan.OperatorPlan;
-import org.apache.pig.newplan.PlanVisitor;
 import org.apache.pig.tools.pigstats.JobStats.JobState;
+import org.apache.pig.tools.pigstats.SimplePigStats.JobGraphPrinter;
 
 /**
  * PigStats encapsulates the statistics collected from a running script. 
@@ -63,203 +47,45 @@ import org.apache.pig.tools.pigstats.Job
 
 @InterfaceAudience.Public
 @InterfaceStability.Evolving
-public final class PigStats {
+public abstract class PigStats {
     
     private static final Log LOG = LogFactory.getLog(PigStats.class);
     
-    private static final String DATE_FORMAT = "yyyy-MM-dd HH:mm:ss";  
-    
     private static ThreadLocal<PigStats> tps = new ThreadLocal<PigStats>();
     
-    private PigContext pigContext;
-    
-    private JobClient jobClient;
-    
-    private JobControlCompiler jcc;
-    
-    private JobGraph jobPlan;
-    
-    // map MR job id to MapReduceOper
-    private Map<String, MapReduceOper> jobMroMap;
-     
-    private Map<MapReduceOper, JobStats> mroJobMap;
-      
-    private long startTime = -1;
-    private long endTime = -1;
+    protected int returnCode = ReturnCode.UNKNOWN;
     
-    private String userId;
-    
-    private int returnCode = ReturnCode.UNKNOWN;
     private String errorMessage;
     private int errorCode = -1;
     
     public static PigStats get() {
-        if (tps.get() == null) tps.set(new PigStats());
-        return tps.get();
-    }
-        
-    static PigStats start() {
-        tps.set(new PigStats());
+        if (tps.get() == null) tps.set(new SimplePigStats());
         return tps.get();
     }
     
-    /**
-     * JobGraph is an {@link OperatorPlan} whose members are {@link JobStats}
-     */
-    public static class JobGraph extends BaseOperatorPlan implements Iterable<JobStats>{
-                
-        @Override
-        public String toString() {
-            JobGraphPrinter jp = new JobGraphPrinter(this);
-            try {
-                jp.visit();
-            } catch (FrontendException e) {
-                LOG.warn("unable to print job plan", e);
-            }
-            return jp.toString();
-        }
-        
-        public Iterator<JobStats> iterator() {
-            return new Iterator<JobStats>() {
-                private Iterator<Operator> iter = getOperators();                
-                @Override
-                public boolean hasNext() {                
-                    return iter.hasNext();
-                }
-                @Override
-                public JobStats next() {              
-                    return (JobStats)iter.next();
-                }
-                @Override
-                public void remove() {}
-            };
-        }
- 
-        boolean isConnected(Operator from, Operator to) {
-            List<Operator> succs = null;
-            succs = getSuccessors(from);
-            if (succs != null) {
-                for (Operator succ: succs) {
-                    if (succ.getName().equals(to.getName()) 
-                            || isConnected(succ, to)) {
-                        return true;
-                    }                    
-                }
-            }
-            return false;
-        }
-        
-        List<JobStats> getSuccessfulJobs() {
-            ArrayList<JobStats> lst = new ArrayList<JobStats>();
-            Iterator<JobStats> iter = iterator();
-            while (iter.hasNext()) {
-                JobStats js = iter.next();
-                if (js.getState() == JobState.SUCCESS) {
-                    lst.add(js);
-                }
-            }
-            Collections.sort(lst, new JobComparator());
-            return lst;
-        }
-        
-        List<JobStats> getFailedJobs() {
-            ArrayList<JobStats> lst = new ArrayList<JobStats>();
-            Iterator<JobStats> iter = iterator();
-            while (iter.hasNext()) {
-                JobStats js = iter.next();
-                if (js.getState() == JobState.FAILED) {
-                    lst.add(js);
-                }
-            }            
-            return lst;
-        }
-    }
-    
-    /**
-     * This class builds the job DAG from a MR plan
-     */
-    private class JobGraphBuilder extends MROpPlanVisitor {
-
-        public JobGraphBuilder(MROperPlan plan) {
-            super(plan, new DependencyOrderWalker<MapReduceOper, MROperPlan>(
-                    plan));
-            jobPlan = new JobGraph();
-            mroJobMap = new HashMap<MapReduceOper, JobStats>();        
-        }
-        
-        @Override
-        public void visitMROp(MapReduceOper mr) throws VisitorException {
-            JobStats js = new JobStats(
-                    mr.getOperatorKey().toString(), jobPlan);            
-            jobPlan.add(js);
-            List<MapReduceOper> preds = getPlan().getPredecessors(mr);
-            if (preds != null) {
-                for (MapReduceOper pred : preds) {
-                    JobStats jpred = mroJobMap.get(pred);
-                    if (!jobPlan.isConnected(jpred, js)) {
-                        jobPlan.connect(jpred, js);
-                    }
-                }
-            }
-            mroJobMap.put(mr, js);            
-        }        
+    static void set(PigStats stats) {
+        tps.set(stats);
     }
-    
-    /**
-     * This class prints a JobGraph
-     */
-    static class JobGraphPrinter extends PlanVisitor {
-        
-        StringBuffer buf;
-
-        protected JobGraphPrinter(OperatorPlan plan) {
-            super(plan,
-                    new org.apache.pig.newplan.DependencyOrderWalker(
-                            plan));
-            buf = new StringBuffer();
-        }
-        
-        public void visit(JobStats op) throws FrontendException {
-            buf.append(op.getJobId());
-            List<Operator> succs = plan.getSuccessors(op);
-            if (succs != null) {
-                buf.append("\t->\t");
-                for (Operator p : succs) {                  
-                    buf.append(((JobStats)p).getJobId()).append(",");
-                }               
-            }
-            buf.append("\n");
-        }
         
-        @Override
-        public String toString() {
-            buf.append("\n");
-            return buf.toString();
-        }        
-    }
-    
-    private static class JobComparator implements Comparator<JobStats> {
-        @Override
-        public int compare(JobStats o1, JobStats o2) {           
-            return o1.getJobId().compareTo(o2.getJobId());
-        }       
-    }
-    
-    public boolean isSuccessful() {
-        return (returnCode == ReturnCode.SUCCESS);
+    static PigStats start() {
+        tps.set(new SimplePigStats());
+        return tps.get();
     }
     
     /**
-     * Return codes are defined in {@link ReturnCode}
+     * Returns the return code. Return codes are defined in {@link ReturnCode}
      */
     public int getReturnCode() {
         return returnCode;
     }
     
+    /**
+     * Returns error message string
+     */
     public String getErrorMessage() {
         return errorMessage;
     }
-
+    
     /**
      * Returns the error code of {@link PigException}
      */
@@ -267,156 +93,77 @@ public final class PigStats {
         return errorCode;
     }
     
+    public abstract boolean isEmbedded();
+    
+    public abstract boolean isSuccessful();
+ 
+    public abstract Map<String, List<PigStats>> getAllStats();
+    
+    public abstract List<String> getAllErrorMessages();       
+        
     /**
      * Returns the properties associated with the script
      */
-    public Properties getPigProperties() {
-        if (pigContext == null) return null;
-        return pigContext.getProperties();
-    }
+    public abstract Properties getPigProperties();
     
     /**
      * Returns the DAG of the MR jobs spawned by the script
      */
-    public JobGraph getJobGraph() {
-        return jobPlan;
-    }
+    public abstract JobGraph getJobGraph();
     
     /**
      * Returns the list of output locations in the script
      */
-    public List<String> getOutputLocations() {
-        ArrayList<String> locations = new ArrayList<String>();
-        for (OutputStats output : getOutputStats()) {
-            locations.add(output.getLocation());
-        }
-        return Collections.unmodifiableList(locations);
-    }
+    public abstract List<String> getOutputLocations();
     
     /**
      * Returns the list of output names in the script
      */
-    public List<String> getOutputNames() {
-        ArrayList<String> names = new ArrayList<String>();
-        for (OutputStats output : getOutputStats()) {            
-            names.add(output.getName());
-        }
-        return Collections.unmodifiableList(names);
-    }
+    public abstract List<String> getOutputNames();
     
     /**
      * Returns the number of bytes for the given output location,
      * -1 for invalid location or name.
      */
-    public long getNumberBytes(String location) {
-        if (location == null) return -1;
-        String name = new Path(location).getName();
-        long count = -1;
-        for (OutputStats output : getOutputStats()) {
-            if (name.equals(output.getName())) {
-                count = output.getBytes();
-                break;
-            }
-        }
-        return count;
-    }
+    public abstract long getNumberBytes(String location);
     
     /**
      * Returns the number of records for the given output location,
      * -1 for invalid location or name.
      */
-    public long getNumberRecords(String location) {
-        if (location == null) return -1;
-        String name = new Path(location).getName();
-        long count = -1;
-        for (OutputStats output : getOutputStats()) {
-            if (name.equals(output.getName())) {
-                count = output.getNumberRecords();
-                break;
-            }
-        }
-        return count;
-    }
+    public abstract long getNumberRecords(String location);
         
     /**
      * Returns the alias associated with this output location
      */
-    public String getOutputAlias(String location) {
-        if (location == null) return null;
-        String name = new Path(location).getName();
-        String alias = null;
-        for (OutputStats output : getOutputStats()) {
-            if (name.equals(output.getName())) {
-                alias = output.getAlias();
-                break;
-            }
-        }
-        return alias;
-    }
+    public abstract String getOutputAlias(String location);
     
     /**
      * Returns the total spill counts from {@link SpillableMemoryManager}.
      */
-    public long getSMMSpillCount() {
-        Iterator<JobStats> it = jobPlan.iterator();
-        long ret = 0;
-        while (it.hasNext()) {
-            ret += it.next().getSMMSpillCount();
-        }
-        return ret;
-    }
+    public abstract long getSMMSpillCount();
     
     /**
      * Returns the total number of bags that spilled proactively
      */
-    public long getProactiveSpillCountObjects() {
-        Iterator<JobStats> it = jobPlan.iterator();
-        long ret = 0;
-        while (it.hasNext()) {            
-            ret += it.next().getProactiveSpillCountObjects();
-        }
-        return ret;
-    }
+    public abstract long getProactiveSpillCountObjects();
     
     /**
      * Returns the total number of records that spilled proactively
      */
-    public long getProactiveSpillCountRecords() {
-        Iterator<JobStats> it = jobPlan.iterator();
-        long ret = 0;
-        while (it.hasNext()) {            
-            ret += it.next().getProactiveSpillCountRecs();
-        }
-        return ret;
-    }
+    public abstract long getProactiveSpillCountRecords();
     
     /**
      * Returns the total bytes written to user specified HDFS
      * locations of this script.
      */
-    public long getBytesWritten() {
-        Iterator<JobStats> it = jobPlan.iterator();
-        long ret = 0;
-        while (it.hasNext()) {
-            long n = it.next().getBytesWritten();
-            if (n > 0) ret += n;
-        }
-        return ret;
-    }
+    public abstract long getBytesWritten();
     
     /**
      * Returns the total number of records in user specified output
      * locations of this script.
      */
-    public long getRecordWritten() {
-        Iterator<JobStats> it = jobPlan.iterator();
-        long ret = 0;
-        while (it.hasNext()) {
-            long n = it.next().getRecordWrittern();
-            if (n > 0) ret += n;
-        }
-        return ret;
-    }
+    public abstract long getRecordWritten();
 
     public String getHadoopVersion() {
         return ScriptState.get().getHadoopVersion();
@@ -426,291 +173,110 @@ public final class PigStats {
         return ScriptState.get().getPigVersion();
     }
    
-    public String getScriptId() {
-        return ScriptState.get().getId();
-    }
+    public abstract String getScriptId();
     
-    public String getFeatures() {
-        return ScriptState.get().getScriptFeatures();
-    }
+    public abstract String getFeatures();
     
-    public long getDuration() {
-        return (startTime > 0 && endTime > 0) ? (endTime - startTime) : -1;
-    }
+    public abstract long getDuration();
     
     /**
      * Returns the number of MR jobs for this script
      */
-    public int getNumberJobs() {
-        return jobPlan.size();
-    }
-        
-    public List<OutputStats> getOutputStats() {
-        List<OutputStats> outputs = new ArrayList<OutputStats>();
-        Iterator<JobStats> iter = jobPlan.iterator();
-        while (iter.hasNext()) {
-            for (OutputStats os : iter.next().getOutputs()) {
-                outputs.add(os);
-            }
-        }        
-        return Collections.unmodifiableList(outputs);       
-    }
-    
-    public List<InputStats> getInputStats() {
-        List<InputStats> inputs = new ArrayList<InputStats>();
-        Iterator<JobStats> iter = jobPlan.iterator();
-        while (iter.hasNext()) {
-            for (InputStats is : iter.next().getInputs()) {
-                inputs.add(is);
-            }
-        }        
-        return Collections.unmodifiableList(inputs);       
-    }
-    
-    private PigStats() {        
-        jobMroMap = new HashMap<String, MapReduceOper>(); 
-        jobPlan = new JobGraph();
-    }
-    
-    void start(PigContext pigContext, JobClient jobClient, 
-            JobControlCompiler jcc, MROperPlan mrPlan) {
-        
-        if (pigContext == null || jobClient == null || jcc == null) {
-            LOG.warn("invalid params: " + pigContext + jobClient + jcc);
-            return;
-        }
+    public abstract int getNumberJobs();
         
-        this.pigContext = pigContext;
-        this.jobClient = jobClient;
-        this.jcc = jcc;         
-        
-        // build job DAG with job ids assigned to null 
-        try {
-            new JobGraphBuilder(mrPlan).visit();
-        } catch (VisitorException e) {
-            LOG.warn("unable to build job plan", e);
-        }
-        
-        startTime = System.currentTimeMillis();
-        userId = System.getProperty("user.name");
-    }
+    public abstract List<OutputStats> getOutputStats();
     
-    void stop() {
-        endTime = System.currentTimeMillis();
-        int m = getNumberSuccessfulJobs();
-        int n = getNumberFailedJobs();
- 
-        if (n == 0 && m > 0 && m == jobPlan.size()) {
-            returnCode = ReturnCode.SUCCESS;
-        } else if (m > 0 && m < jobPlan.size()) {
-            returnCode = ReturnCode.PARTIAL_FAILURE;
-        } else {
-            returnCode = ReturnCode.FAILURE;
-        }
-    }
-    
-    boolean isInitialized() {
-        return startTime > 0;
-    }
+    public abstract OutputStats result(String alias);
     
-    JobClient getJobClient() {
-        return jobClient;
-    }
+    public abstract List<InputStats> getInputStats();    
     
-    JobControlCompiler getJobControlCompiler() {
-        return jcc;
+    void setErrorMessage(String errorMessage) {
+        this.errorMessage = errorMessage;
     }
     
-    void setReturnCode(int returnCode) {
-        this.returnCode = returnCode; 
-    }
-        
-    @SuppressWarnings("deprecation")
-    JobStats addJobStats(Job job) {
-        MapReduceOper mro = null;
-        JobID jobId = job.getAssignedJobID();
-        if (jobId != null) {
-            mro = jobMroMap.get(jobId.toString());
-        } else {
-            mro = jobMroMap.get(job.toString());
-        }
-        if (mro == null) {
-            LOG.warn("unable to get MR oper for job: "
-                    + ((jobId == null) ? job.toString() : jobId.toString()));
-            return null;
+    void setErrorCode(int errorCode) {
+        this.errorCode = errorCode;
+    } 
+    /**
+     * JobGraph is an {@link OperatorPlan} whose members are {@link JobStats}
+     */
+    public static class JobGraph extends BaseOperatorPlan implements Iterable<JobStats>{
+                
+        @Override
+        public String toString() {
+            JobGraphPrinter jp = new JobGraphPrinter(this);
+            try {
+                jp.visit();
+            } catch (FrontendException e) {
+                LOG.warn("unable to print job plan", e);
+            }
+            return jp.toString();
         }
-        JobStats js = mroJobMap.get(mro);
-        
-        js.setAlias(mro);
-        js.setConf(job.getJobConf());
-        return js;
-    }
-    
-    @SuppressWarnings("deprecation")
-    public JobStats addJobStatsForNative(NativeMapReduceOper mr) {
-        JobStats js = mroJobMap.get(mr);
-        js.setId(new JobID(mr.getJobId(), NativeMapReduceOper.getJobNumber())); 
-        js.setAlias(mr);
         
-        return js;
-    }
-            
-    void display() {
-        if (returnCode == ReturnCode.UNKNOWN) {
-            LOG.warn("unknown return code, can't display the results");
-            return;
-        }
-        if (pigContext == null) {
-            LOG.warn("unknown exec type, don't display the results");
-            return;
+        public Iterator<JobStats> iterator() {
+            return new Iterator<JobStats>() {
+                private Iterator<Operator> iter = getOperators();                
+                @Override
+                public boolean hasNext() {                
+                    return iter.hasNext();
+                }
+                @Override
+                public JobStats next() {              
+                    return (JobStats)iter.next();
+                }
+                @Override
+                public void remove() {}
+            };
         }
  
-        // currently counters are not working in local mode - see PIG-1286
-        ExecType execType = pigContext.getExecType();
-        if (execType == ExecType.LOCAL) {
-            LOG.info("Detected Local mode. Stats reported below may be incomplete");
-        }
-        
-        SimpleDateFormat sdf = new SimpleDateFormat(DATE_FORMAT);
-        StringBuilder sb = new StringBuilder();
-        sb.append("\nHadoopVersion\tPigVersion\tUserId\tStartedAt\tFinishedAt\tFeatures\n");
-        sb.append(getHadoopVersion()).append("\t").append(getPigVersion()).append("\t")
-            .append(userId).append("\t")
-            .append(sdf.format(new Date(startTime))).append("\t")
-            .append(sdf.format(new Date(endTime))).append("\t")
-            .append(getFeatures()).append("\n");
-        sb.append("\n");
-        if (returnCode == ReturnCode.SUCCESS) {
-            sb.append("Success!\n");
-        } else if (returnCode == ReturnCode.PARTIAL_FAILURE) {
-            sb.append("Some jobs have failed! Stop running all dependent jobs\n");
-        } else {
-            sb.append("Failed!\n");
-        }
-        sb.append("\n");
-                
-        if (returnCode == ReturnCode.SUCCESS 
-                || returnCode == ReturnCode.PARTIAL_FAILURE) {            
-            sb.append("Job Stats (time in seconds):\n");
-            if (execType == ExecType.LOCAL) {
-                sb.append(JobStats.SUCCESS_HEADER_LOCAL).append("\n");
-            } else {
-                sb.append(JobStats.SUCCESS_HEADER).append("\n");
-            }
-            List<JobStats> arr = jobPlan.getSuccessfulJobs();
-            for (JobStats js : arr) {                
-                sb.append(js.getDisplayString(execType == ExecType.LOCAL));
-            }
-            sb.append("\n");
-        }
-        if (returnCode == ReturnCode.FAILURE
-                || returnCode == ReturnCode.PARTIAL_FAILURE) {
-            sb.append("Failed Jobs:\n");
-            sb.append(JobStats.FAILURE_HEADER).append("\n");
-            List<JobStats> arr = jobPlan.getFailedJobs();
-            for (JobStats js : arr) {   
-                sb.append(js.getDisplayString(execType == ExecType.LOCAL));
+        boolean isConnected(Operator from, Operator to) {
+            List<Operator> succs = null;
+            succs = getSuccessors(from);
+            if (succs != null) {
+                for (Operator succ: succs) {
+                    if (succ.getName().equals(to.getName()) 
+                            || isConnected(succ, to)) {
+                        return true;
+                    }                    
+                }
             }
-            sb.append("\n");
-        }
-        sb.append("Input(s):\n");
-        for (InputStats is : getInputStats()) {
-            sb.append(is.getDisplayString(execType == ExecType.LOCAL));
-        }
-        sb.append("\n");
-        sb.append("Output(s):\n");
-        for (OutputStats ds : getOutputStats()) {
-            sb.append(ds.getDisplayString(execType == ExecType.LOCAL));
-        }
-        
-        if (execType != ExecType.LOCAL) {
-            sb.append("\nCounters:\n");
-            sb.append("Total records written : " + getRecordWritten()).append("\n");
-            sb.append("Total bytes written : " + getBytesWritten()).append("\n");
-            sb.append("Spillable Memory Manager spill count : "
-                    + getSMMSpillCount()).append("\n");
-            sb.append("Total bags proactively spilled: " 
-                    + getProactiveSpillCountObjects()).append("\n");
-            sb.append("Total records proactively spilled: " 
-                    + getProactiveSpillCountRecords()).append("\n");
+            return false;
         }
         
-        sb.append("\nJob DAG:\n").append(jobPlan.toString());
-        
-        LOG.info("Script Statistics: \n" + sb.toString());
-    }
-    
-    @SuppressWarnings("deprecation")
-    void mapMROperToJob(MapReduceOper mro, Job job) {
-        if (mro == null) {
-            LOG.warn("null MR operator");
-        } else {
-            JobStats js = mroJobMap.get(mro);
-            if (js == null) {
-                LOG.warn("null job stats for mro: " + mro.getOperatorKey());
-            } else {
-                JobID id = job.getAssignedJobID();
-                js.setId(id);    
-                if (id != null) {
-                    jobMroMap.put(id.toString(), mro);
-                } else {
-                    jobMroMap.put(job.toString(), mro);
+        List<JobStats> getSuccessfulJobs() {
+            ArrayList<JobStats> lst = new ArrayList<JobStats>();
+            Iterator<JobStats> iter = iterator();
+            while (iter.hasNext()) {
+                JobStats js = iter.next();
+                if (js.getState() == JobState.SUCCESS) {
+                    lst.add(js);
                 }
             }
-        }
-    }
-
-    void setErrorMessage(String errorMessage) {
-        this.errorMessage = errorMessage;
-    }
-
-    void setErrorCode(int errorCode) {
-        this.errorCode = errorCode;
-    }    
-    
-    void setBackendException(Job job, Exception e) {
-        if (e instanceof PigException) {
-            LOG.error("ERROR " + ((PigException)e).getErrorCode() + ": " 
-                    + e.getLocalizedMessage());
-        } else if (e != null) {
-            LOG.error("ERROR: " + e.getLocalizedMessage());
+            Collections.sort(lst, new JobComparator());
+            return lst;
         }
         
-        if (job.getAssignedJobID() == null || e == null) {
-            LOG.debug("unable to set backend exception");
-            return;
-        }
-        String id = job.getAssignedJobID().toString();
-        Iterator<JobStats> iter = jobPlan.iterator();
-        while (iter.hasNext()) {
-            JobStats js = iter.next();
-            if (id.equals(js.getJobId())) {
-                js.setBackendException(e);
-                break;
-            }
+        List<JobStats> getFailedJobs() {
+            ArrayList<JobStats> lst = new ArrayList<JobStats>();
+            Iterator<JobStats> iter = iterator();
+            while (iter.hasNext()) {
+                JobStats js = iter.next();
+                if (js.getState() == JobState.FAILED) {
+                    lst.add(js);
+                }
+            }            
+            return lst;
         }
-    }
-    
-    PigContext getPigContext() {
-        return pigContext;
-    }
+    }    
     
-    int getNumberSuccessfulJobs() {
-        Iterator<JobStats> iter = jobPlan.iterator();
-        int count = 0;
-        while (iter.hasNext()) {
-            if (iter.next().getState() == JobState.SUCCESS) count++; 
-        }
-        return count;
-    }
+    private static class JobComparator implements Comparator<JobStats> {
+        @Override
+        public int compare(JobStats o1, JobStats o2) {           
+            return o1.getJobId().compareTo(o2.getJobId());
+        }       
+    }    
     
-    int getNumberFailedJobs() {
-        Iterator<JobStats> iter = jobPlan.iterator();
-        int count = 0;
-        while (iter.hasNext()) {
-            if (iter.next().getState() == JobState.FAILED) count++; 
-        }
-        return count;
+    void setReturnCode(int returnCode) {
+        this.returnCode = returnCode; 
     }
-    
 }
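
With this change PigStats becomes an abstract base class: SimplePigStats keeps the existing MapReduce behaviour, the EmbeddedPigStats referenced below in PigStatsUtil aggregates embedded runs, and PigStats.get() hands back whichever concrete instance the current thread is using. The following is a minimal sketch, not part of this commit, of how a caller might consume the public accessors after running a script through PigRunner; the script path is a placeholder, PigRunner.run(String[], PigProgressNotificationListener) is assumed as used by the test driver further down, and JobStats.getJobId() and the OutputStats accessors are assumed to be publicly visible.

    import org.apache.pig.PigRunner;
    import org.apache.pig.tools.pigstats.JobStats;
    import org.apache.pig.tools.pigstats.OutputStats;
    import org.apache.pig.tools.pigstats.PigStats;

    public class StatsConsumer {
        public static void main(String[] args) throws Exception {
            // "script.pig" is a placeholder; PigRunner.run returns the PigStats
            // instance that the statistics framework populated for this run.
            PigStats stats = PigRunner.run(new String[] { "script.pig" }, null);

            if (!stats.isSuccessful()) {
                System.err.println("run failed: " + stats.getErrorMessage());
            }

            // Walk the DAG of MR jobs spawned by the script (JobGraph is Iterable).
            for (JobStats js : stats.getJobGraph()) {
                System.out.println("job: " + js.getJobId());
            }

            // Per-output statistics (location and record count) for the script.
            for (OutputStats os : stats.getOutputStats()) {
                System.out.println(os.getLocation() + ": "
                        + os.getNumberRecords() + " records");
            }
        }
    }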

Modified: pig/trunk/src/org/apache/pig/tools/pigstats/PigStatsUtil.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/tools/pigstats/PigStatsUtil.java?rev=1056536&r1=1056535&r2=1056536&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/tools/pigstats/PigStatsUtil.java (original)
+++ pig/trunk/src/org/apache/pig/tools/pigstats/PigStatsUtil.java Fri Jan  7 22:16:22 2011
@@ -19,15 +19,13 @@
 package org.apache.pig.tools.pigstats;
 
 import java.io.IOException;
-import java.net.URI;
-import java.net.URISyntaxException;
+import java.util.List;
 import java.util.Map;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.Counters;
 import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.RunningJob;
@@ -162,7 +160,7 @@ public abstract class PigStatsUtil {
      */
     public static void startCollection(PigContext pc, JobClient client, 
             JobControlCompiler jcc, MROperPlan plan) {
-        PigStats ps = PigStats.start();
+        SimplePigStats ps = (SimplePigStats)PigStats.start();
         ps.start(pc, client, jcc, plan);
         
         ScriptState.get().emitLaunchStartedNotification(plan.size());
@@ -175,7 +173,7 @@ public abstract class PigStatsUtil {
      *      file at INFO level 
      */
     public static void stopCollection(boolean display) {
-        PigStats ps = PigStats.get();
+        SimplePigStats ps = (SimplePigStats)PigStats.get();
         ps.stop();
         if (!ps.isSuccessful()) {
             LOG.error(ps.getNumberFailedJobs() + " map reduce job(s) failed!");
@@ -214,7 +212,7 @@ public abstract class PigStatsUtil {
      * Logs the statistics in the Pig log file at INFO level
      */
     public static void displayStatistics() {
-        PigStats.get().display();
+        ((SimplePigStats)PigStats.get()).display();
     }
     
     /**
@@ -226,7 +224,7 @@ public abstract class PigStatsUtil {
      * @param jobMroMap the map that maps {@link Job}s to {@link MapReduceOper}s
      */
     public static void updateJobMroMap(Map<Job, MapReduceOper> jobMroMap) {
-        PigStats ps = PigStats.get();
+        SimplePigStats ps = (SimplePigStats)PigStats.get();
         for (Map.Entry<Job, MapReduceOper> entry : jobMroMap.entrySet()) {
             MapReduceOper mro = entry.getValue();
             ps.mapMROperToJob(mro, entry.getKey());
@@ -239,7 +237,7 @@ public abstract class PigStatsUtil {
      * @param jc the job control
      */
     public static void accumulateStats(JobControl jc) {
-        PigStats ps = PigStats.get();
+        SimplePigStats ps = (SimplePigStats)PigStats.get();
         ScriptState ss = ScriptState.get();
         
         for (Job job : jc.getSuccessfulJobs()) {            
@@ -269,7 +267,7 @@ public abstract class PigStatsUtil {
     }
     
     public static void setBackendException(Job job, Exception e) {
-        PigStats.get().setBackendException(job, e);
+        ((SimplePigStats)PigStats.get()).setBackendException(job, e);
     }
     
     private static Pattern pattern = Pattern.compile("tmp(-)?[\\d]{1,10}$");
@@ -279,7 +277,7 @@ public abstract class PigStatsUtil {
         return result.find();
     }
     
-    private static JobStats addFailedJobStats(PigStats ps, Job job) {
+    private static JobStats addFailedJobStats(SimplePigStats ps, Job job) {
         JobStats js = ps.addJobStats(job);
         if (js == null) {
             LOG.warn("unable to add failed job stats");            
@@ -296,9 +294,17 @@ public abstract class PigStatsUtil {
         return addNativeJobStats(ps, mr, success, null);
     }
     
+    public static void setStatsMap(Map<String, List<PigStats>> statsMap) {
+        EmbeddedPigStats stats = new EmbeddedPigStats(statsMap);
+        PigStats.set(stats);
+    }
+    
     public static JobStats addNativeJobStats(PigStats ps, NativeMapReduceOper mr,
             boolean success, Exception e) {
-        JobStats js = ps.addJobStatsForNative(mr);
+        if (ps.isEmbedded()) {
+            throw new IllegalArgumentException();
+        }
+        JobStats js = ((SimplePigStats)ps).addJobStatsForNative(mr);
         if(js == null) {
             LOG.warn("unable to add native job stats");
         } else {
@@ -309,7 +315,7 @@ public abstract class PigStatsUtil {
         return js;
     }    
     
-    private static JobStats accumulateSuccessStatistics(PigStats ps, Job job) {
+    private static JobStats accumulateSuccessStatistics(SimplePigStats ps, Job job) {
         JobStats js = ps.addJobStats(job);
         if (js == null) {
             LOG.warn("unable to add job stats");
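
The new setStatsMap hook is how embedded executions publish their statistics: the caller hands over a map of PigStats lists, PigStatsUtil wraps it in an EmbeddedPigStats and installs it as the thread-local instance, so a later PigStats.get().getAllStats() returns the whole map. Below is a hedged sketch of a driver registering the stats it has accumulated; the assumption that the map key is the script id follows from getScriptId()/getAllStats() but is not spelled out in this patch.

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    import org.apache.pig.tools.pigstats.PigStats;
    import org.apache.pig.tools.pigstats.PigStatsUtil;

    public class EmbeddedStatsRegistration {
        /**
         * Registers the PigStats collected for one embedded script so that
         * PigStats.get() afterwards exposes them through getAllStats().
         */
        public static void register(String scriptId, List<PigStats> runs) {
            Map<String, List<PigStats>> statsMap =
                new HashMap<String, List<PigStats>>();
            statsMap.put(scriptId, new ArrayList<PigStats>(runs));

            // Installs an EmbeddedPigStats view as the thread-local PigStats.
            PigStatsUtil.setStatsMap(statsMap);
        }
    }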

Modified: pig/trunk/src/org/apache/pig/tools/pigstats/ScriptState.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/tools/pigstats/ScriptState.java?rev=1056536&r1=1056535&r2=1056536&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/tools/pigstats/ScriptState.java (original)
+++ pig/trunk/src/org/apache/pig/tools/pigstats/ScriptState.java Fri Jan  7 22:16:22 2011
@@ -65,6 +65,7 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStream;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POUnion;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
+import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.logicalLayer.LOCogroup;
 import org.apache.pig.impl.logicalLayer.LOCross;
 import org.apache.pig.impl.logicalLayer.LODistinct;
@@ -177,15 +178,18 @@ public class ScriptState {
     
     private long scriptFeatures;
     
+    private PigContext pigContext;
+    
     private Map<MapReduceOper, String> featureMap = null;
     private Map<MapReduceOper, String> aliasMap = null;
     
     private List<PigProgressNotificationListener> listeners
             = new ArrayList<PigProgressNotificationListener>();
     
-    public static ScriptState start(String commandLine) {
+    public static ScriptState start(String commandLine, PigContext pigContext) {
         ScriptState ss = new ScriptState(UUID.randomUUID().toString());
         ss.setCommandLine(commandLine);
+        ss.setPigContext(pigContext);
         tss.set(ss);
         return ss;
     }
@@ -197,7 +201,7 @@ public class ScriptState {
 
     public static ScriptState get() {
         if (tss.get() == null) {
-            ScriptState.start("");
+            ScriptState.start("", null);
         }
         return tss.get();
     }           
@@ -205,52 +209,56 @@ public class ScriptState {
     public void registerListener(PigProgressNotificationListener listener) {
         listeners.add(listener);
     }
+    
+    public List<PigProgressNotificationListener> getAllListeners() {
+        return listeners;
+    }
         
     public void emitLaunchStartedNotification(int numJobsToLaunch) {
         for (PigProgressNotificationListener listener: listeners) {
-            listener.launchStartedNotification(numJobsToLaunch);
+            listener.launchStartedNotification(id, numJobsToLaunch);
         }
     }
     
-    public void emitJobsSubmittedNotification(int numJobsSubmitted) {
+    public void emitJobsSubmittedNotification(int numJobsSubmitted) {        
         for (PigProgressNotificationListener listener: listeners) {
-            listener.jobsSubmittedNotification(numJobsSubmitted);
+            listener.jobsSubmittedNotification(id, numJobsSubmitted);
         }        
     }
     
     public void emitJobStartedNotification(String assignedJobId) {
         for (PigProgressNotificationListener listener: listeners) {
-            listener.jobStartedNotification(assignedJobId);
+            listener.jobStartedNotification(id, assignedJobId);
         }
     }
     
     public void emitjobFinishedNotification(JobStats jobStats) {
         for (PigProgressNotificationListener listener: listeners) {
-            listener.jobFinishedNotification(jobStats);
+            listener.jobFinishedNotification(id, jobStats);
         }
     }
     
     public void emitJobFailedNotification(JobStats jobStats) {
         for (PigProgressNotificationListener listener: listeners) {
-            listener.jobFailedNotification(jobStats);
+            listener.jobFailedNotification(id, jobStats);
         }
     }
     
     public void emitOutputCompletedNotification(OutputStats outputStats) {
         for (PigProgressNotificationListener listener: listeners) {
-            listener.outputCompletedNotification(outputStats);
+            listener.outputCompletedNotification(id, outputStats);
         }
     }
     
     public void emitProgressUpdatedNotification(int progress) {
         for (PigProgressNotificationListener listener: listeners) {
-            listener.progressUpdatedNotification(progress);
+            listener.progressUpdatedNotification(id, progress);
         }
     }
     
     public void emitLaunchCompletedNotification(int numJobsSucceeded) {
         for (PigProgressNotificationListener listener: listeners) {
-            listener.launchCompletedNotification(numJobsSucceeded);
+            listener.launchCompletedNotification(id, numJobsSucceeded);
         }
     }
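
Every emit* method now passes the script id as the first argument, so PigProgressNotificationListener implementations have to accept it; this is what lets a single listener distinguish concurrent scripts in an embedded run. The sketch below shows a minimal listener under the revised callback signatures; the parameter types are inferred from the call sites above, the listener package (org.apache.pig.tools.pigstats) is assumed rather than shown in this hunk, and it is assumed these eight callbacks make up the interface.

    import org.apache.pig.tools.pigstats.JobStats;
    import org.apache.pig.tools.pigstats.OutputStats;
    import org.apache.pig.tools.pigstats.PigProgressNotificationListener;

    public class LoggingListener implements PigProgressNotificationListener {
        public void launchStartedNotification(String scriptId, int numJobsToLaunch) {
            System.out.println(scriptId + ": launching " + numJobsToLaunch + " job(s)");
        }
        public void jobsSubmittedNotification(String scriptId, int numJobsSubmitted) {
            System.out.println(scriptId + ": submitted " + numJobsSubmitted + " job(s)");
        }
        public void jobStartedNotification(String scriptId, String assignedJobId) {
            System.out.println(scriptId + ": started " + assignedJobId);
        }
        public void jobFinishedNotification(String scriptId, JobStats jobStats) {
            System.out.println(scriptId + ": finished " + jobStats.getJobId());
        }
        public void jobFailedNotification(String scriptId, JobStats jobStats) {
            System.out.println(scriptId + ": failed " + jobStats.getJobId());
        }
        public void outputCompletedNotification(String scriptId, OutputStats outputStats) {
            System.out.println(scriptId + ": wrote " + outputStats.getLocation());
        }
        public void progressUpdatedNotification(String scriptId, int progress) {
            System.out.println(scriptId + ": " + progress + "% complete");
        }
        public void launchCompletedNotification(String scriptId, int numJobsSucceeded) {
            System.out.println(scriptId + ": " + numJobsSucceeded + " job(s) succeeded");
        }
    }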
     
@@ -536,6 +544,14 @@ public class ScriptState {
         return sb.toString();
     }
     
+    public void setPigContext(PigContext pigContext) {
+        this.pigContext = pigContext;
+    }
+
+    public PigContext getPigContext() {
+        return pigContext;
+    }
+
     private static class FeatureVisitor extends PhyPlanVisitor {
         private BitSet feature;
         
@@ -588,7 +604,7 @@ public class ScriptState {
         }        
     }    
     
-    public static class LogicalPlanFeatureVisitor extends LOVisitor {
+    static class LogicalPlanFeatureVisitor extends LOVisitor {
         
         private BitSet feature;
         

Added: pig/trunk/src/org/apache/pig/tools/pigstats/SimplePigStats.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/tools/pigstats/SimplePigStats.java?rev=1056536&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/tools/pigstats/SimplePigStats.java (added)
+++ pig/trunk/src/org/apache/pig/tools/pigstats/SimplePigStats.java Fri Jan  7 22:16:22 2011
@@ -0,0 +1,587 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.tools.pigstats;
+
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobID;
+import org.apache.hadoop.mapred.jobcontrol.Job;
+import org.apache.pig.ExecType;
+import org.apache.pig.PigException;
+import org.apache.pig.PigRunner.ReturnCode;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceOper;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.NativeMapReduceOper;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROpPlanVisitor;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.plan.DependencyOrderWalker;
+import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.newplan.Operator;
+import org.apache.pig.newplan.OperatorPlan;
+import org.apache.pig.newplan.PlanVisitor;
+import org.apache.pig.tools.pigstats.JobStats.JobState;
+
+/**
+ * SimplePigStats encapsulates the statistics collected from a running script. 
+ * It includes status of the execution, the DAG of its MR jobs, as well as 
+ * information about outputs and inputs of the script. 
+ */
+final class SimplePigStats extends PigStats {
+    
+    private static final Log LOG = LogFactory.getLog(SimplePigStats.class);
+    
+    private static final String DATE_FORMAT = "yyyy-MM-dd HH:mm:ss";  
+        
+    private PigContext pigContext;
+    
+    private JobClient jobClient;
+    
+    private JobControlCompiler jcc;
+    
+    private JobGraph jobPlan;
+    
+    // map MR job id to MapReduceOper
+    private Map<String, MapReduceOper> jobMroMap;
+     
+    private Map<MapReduceOper, JobStats> mroJobMap;
+    
+    private Map<String, OutputStats> aliasOuputMap;
+      
+    private long startTime = -1;
+    private long endTime = -1;
+    
+    private String userId;
+                 
+    /**
+     * This class builds the job DAG from a MR plan
+     */
+    private class JobGraphBuilder extends MROpPlanVisitor {
+
+        public JobGraphBuilder(MROperPlan plan) {
+            super(plan, new DependencyOrderWalker<MapReduceOper, MROperPlan>(
+                    plan));
+            jobPlan = new JobGraph();
+            mroJobMap = new HashMap<MapReduceOper, JobStats>();        
+        }
+        
+        @Override
+        public void visitMROp(MapReduceOper mr) throws VisitorException {
+            JobStats js = new JobStats(
+                    mr.getOperatorKey().toString(), jobPlan);            
+            jobPlan.add(js);
+            List<MapReduceOper> preds = getPlan().getPredecessors(mr);
+            if (preds != null) {
+                for (MapReduceOper pred : preds) {
+                    JobStats jpred = mroJobMap.get(pred);
+                    if (!jobPlan.isConnected(jpred, js)) {
+                        jobPlan.connect(jpred, js);
+                    }
+                }
+            }
+            mroJobMap.put(mr, js);            
+        }        
+    }
+    
+    /**
+     * This class prints a JobGraph
+     */
+    static class JobGraphPrinter extends PlanVisitor {
+        
+        StringBuffer buf;
+
+        protected JobGraphPrinter(OperatorPlan plan) {
+            super(plan,
+                    new org.apache.pig.newplan.DependencyOrderWalker(
+                            plan));
+            buf = new StringBuffer();
+        }
+        
+        public void visit(JobStats op) throws FrontendException {
+            buf.append(op.getJobId());
+            List<Operator> succs = plan.getSuccessors(op);
+            if (succs != null) {
+                buf.append("\t->\t");
+                for (Operator p : succs) {                  
+                    buf.append(((JobStats)p).getJobId()).append(",");
+                }               
+            }
+            buf.append("\n");
+        }
+        
+        @Override
+        public String toString() {
+            buf.append("\n");
+            return buf.toString();
+        }        
+    }
+    
+    @Override
+    public List<String> getAllErrorMessages() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public Map<String, List<PigStats>> getAllStats() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public boolean isEmbedded() {
+        return false;
+    }
+    
+    @Override
+    public boolean isSuccessful() {
+        return (returnCode == ReturnCode.SUCCESS);
+    }
+ 
+    @Override
+    public Properties getPigProperties() {
+        if (pigContext == null) return null;
+        return pigContext.getProperties();
+    }
+
+    @Override
+    public JobGraph getJobGraph() {
+        return jobPlan;
+    }
+ 
+    @Override
+    public List<String> getOutputLocations() {
+        ArrayList<String> locations = new ArrayList<String>();
+        for (OutputStats output : getOutputStats()) {
+            locations.add(output.getLocation());
+        }
+        return Collections.unmodifiableList(locations);
+    }
+ 
+    @Override
+    public List<String> getOutputNames() {
+        ArrayList<String> names = new ArrayList<String>();
+        for (OutputStats output : getOutputStats()) {            
+            names.add(output.getName());
+        }
+        return Collections.unmodifiableList(names);
+    }
+ 
+    @Override
+    public long getNumberBytes(String location) {
+        if (location == null) return -1;
+        String name = new Path(location).getName();
+        long count = -1;
+        for (OutputStats output : getOutputStats()) {
+            if (name.equals(output.getName())) {
+                count = output.getBytes();
+                break;
+            }
+        }
+        return count;
+    }
+
+    @Override
+    public long getNumberRecords(String location) {
+        if (location == null) return -1;
+        String name = new Path(location).getName();
+        long count = -1;
+        for (OutputStats output : getOutputStats()) {
+            if (name.equals(output.getName())) {
+                count = output.getNumberRecords();
+                break;
+            }
+        }
+        return count;
+    }
+ 
+    @Override
+    public String getOutputAlias(String location) {
+        if (location == null) return null;
+        String name = new Path(location).getName();
+        String alias = null;
+        for (OutputStats output : getOutputStats()) {
+            if (name.equals(output.getName())) {
+                alias = output.getAlias();
+                break;
+            }
+        }
+        return alias;
+    }
+
+    @Override
+    public long getSMMSpillCount() {
+        Iterator<JobStats> it = jobPlan.iterator();
+        long ret = 0;
+        while (it.hasNext()) {
+            ret += it.next().getSMMSpillCount();
+        }
+        return ret;
+    }
+
+    @Override
+    public long getProactiveSpillCountObjects() {
+        Iterator<JobStats> it = jobPlan.iterator();
+        long ret = 0;
+        while (it.hasNext()) {            
+            ret += it.next().getProactiveSpillCountObjects();
+        }
+        return ret;
+    }
+    
+    @Override
+    public long getProactiveSpillCountRecords() {
+        Iterator<JobStats> it = jobPlan.iterator();
+        long ret = 0;
+        while (it.hasNext()) {            
+            ret += it.next().getProactiveSpillCountRecs();
+        }
+        return ret;
+    }
+    
+    @Override
+    public long getBytesWritten() {
+        Iterator<JobStats> it = jobPlan.iterator();
+        long ret = 0;
+        while (it.hasNext()) {
+            long n = it.next().getBytesWritten();
+            if (n > 0) ret += n;
+        }
+        return ret;
+    }
+    
+    @Override
+    public long getRecordWritten() {
+        Iterator<JobStats> it = jobPlan.iterator();
+        long ret = 0;
+        while (it.hasNext()) {
+            long n = it.next().getRecordWrittern();
+            if (n > 0) ret += n;
+        }
+        return ret;
+    }
+   
+    @Override
+    public String getScriptId() {
+        return ScriptState.get().getId();
+    }
+    
+    @Override
+    public String getFeatures() {
+        return ScriptState.get().getScriptFeatures();
+    }
+    
+    @Override
+    public long getDuration() {
+        return (startTime > 0 && endTime > 0) ? (endTime - startTime) : -1;
+    }
+    
+    @Override
+    public int getNumberJobs() {
+        return jobPlan.size();
+    }
+        
+    @Override
+    public List<OutputStats> getOutputStats() {
+        List<OutputStats> outputs = new ArrayList<OutputStats>();
+        Iterator<JobStats> iter = jobPlan.iterator();
+        while (iter.hasNext()) {
+            for (OutputStats os : iter.next().getOutputs()) {
+                outputs.add(os);
+            }
+        }        
+        return Collections.unmodifiableList(outputs);       
+    }
+    
+    @Override
+    public OutputStats result(String alias) {
+        if (aliasOuputMap == null) {
+            aliasOuputMap = new HashMap<String, OutputStats>();
+            Iterator<JobStats> iter = jobPlan.iterator();
+            while (iter.hasNext()) {
+                for (OutputStats os : iter.next().getOutputs()) {
+                    String a = os.getAlias();
+                    if (a == null || a.length() == 0) {
+                        LOG.warn("Output alias isn't available for " + os.getLocation());
+                        continue;
+                    }
+                    aliasOuputMap.put(a, os);
+                }
+            }    
+        }
+        return aliasOuputMap.get(alias);
+    }
+    
+    @Override
+    public List<InputStats> getInputStats() {
+        List<InputStats> inputs = new ArrayList<InputStats>();
+        Iterator<JobStats> iter = jobPlan.iterator();
+        while (iter.hasNext()) {
+            for (InputStats is : iter.next().getInputs()) {
+                inputs.add(is);
+            }
+        }        
+        return Collections.unmodifiableList(inputs);       
+    }
+    
+    SimplePigStats() {        
+        jobMroMap = new HashMap<String, MapReduceOper>(); 
+        jobPlan = new JobGraph();
+    }
+    
+    void start(PigContext pigContext, JobClient jobClient, 
+            JobControlCompiler jcc, MROperPlan mrPlan) {
+        
+        if (pigContext == null || jobClient == null || jcc == null) {
+            LOG.warn("invalid params: " + pigContext + jobClient + jcc);
+            return;
+        }
+        
+        this.pigContext = pigContext;
+        this.jobClient = jobClient;
+        this.jcc = jcc;         
+        
+        // build job DAG with job ids assigned to null 
+        try {
+            new JobGraphBuilder(mrPlan).visit();
+        } catch (VisitorException e) {
+            LOG.warn("unable to build job plan", e);
+        }
+        
+        startTime = System.currentTimeMillis();
+        userId = System.getProperty("user.name");
+    }
+    
+    void stop() {
+        endTime = System.currentTimeMillis();
+        int m = getNumberSuccessfulJobs();
+        int n = getNumberFailedJobs();
+ 
+        if (n == 0 && m > 0 && m == jobPlan.size()) {
+            returnCode = ReturnCode.SUCCESS;
+        } else if (m > 0 && m < jobPlan.size()) {
+            returnCode = ReturnCode.PARTIAL_FAILURE;
+        } else {
+            returnCode = ReturnCode.FAILURE;
+        }
+    }
+    
+    boolean isInitialized() {
+        return startTime > 0;
+    }
+    
+    JobClient getJobClient() {
+        return jobClient;
+    }
+    
+    JobControlCompiler getJobControlCompiler() {
+        return jcc;
+    }
+        
+    @SuppressWarnings("deprecation")
+    JobStats addJobStats(Job job) {
+        MapReduceOper mro = null;
+        JobID jobId = job.getAssignedJobID();
+        if (jobId != null) {
+            mro = jobMroMap.get(jobId.toString());
+        } else {
+            mro = jobMroMap.get(job.toString());
+        }
+        if (mro == null) {
+            LOG.warn("unable to get MR oper for job: "
+                    + ((jobId == null) ? job.toString() : jobId.toString()));
+            return null;
+        }
+        JobStats js = mroJobMap.get(mro);
+        
+        js.setAlias(mro);
+        js.setConf(job.getJobConf());
+        return js;
+    }
+    
+    @SuppressWarnings("deprecation")
+    public JobStats addJobStatsForNative(NativeMapReduceOper mr) {
+        JobStats js = mroJobMap.get(mr);
+        js.setId(new JobID(mr.getJobId(), NativeMapReduceOper.getJobNumber())); 
+        js.setAlias(mr);
+        
+        return js;
+    }
+            
+    void display() {
+        if (returnCode == ReturnCode.UNKNOWN) {
+            LOG.warn("unknown return code, can't display the results");
+            return;
+        }
+        if (pigContext == null) {
+            LOG.warn("unknown exec type, don't display the results");
+            return;
+        }
+ 
+        // currently counters are not working in local mode - see PIG-1286
+        ExecType execType = pigContext.getExecType();
+        if (execType == ExecType.LOCAL) {
+            LOG.info("Detected Local mode. Stats reported below may be incomplete");
+        }
+        
+        SimpleDateFormat sdf = new SimpleDateFormat(DATE_FORMAT);
+        StringBuilder sb = new StringBuilder();
+        sb.append("\nHadoopVersion\tPigVersion\tUserId\tStartedAt\tFinishedAt\tFeatures\n");
+        sb.append(getHadoopVersion()).append("\t").append(getPigVersion()).append("\t")
+            .append(userId).append("\t")
+            .append(sdf.format(new Date(startTime))).append("\t")
+            .append(sdf.format(new Date(endTime))).append("\t")
+            .append(getFeatures()).append("\n");
+        sb.append("\n");
+        if (returnCode == ReturnCode.SUCCESS) {
+            sb.append("Success!\n");
+        } else if (returnCode == ReturnCode.PARTIAL_FAILURE) {
+            sb.append("Some jobs have failed! Stop running all dependent jobs\n");
+        } else {
+            sb.append("Failed!\n");
+        }
+        sb.append("\n");
+                
+        if (returnCode == ReturnCode.SUCCESS 
+                || returnCode == ReturnCode.PARTIAL_FAILURE) {            
+            sb.append("Job Stats (time in seconds):\n");
+            if (execType == ExecType.LOCAL) {
+                sb.append(JobStats.SUCCESS_HEADER_LOCAL).append("\n");
+            } else {
+                sb.append(JobStats.SUCCESS_HEADER).append("\n");
+            }
+            List<JobStats> arr = jobPlan.getSuccessfulJobs();
+            for (JobStats js : arr) {                
+                sb.append(js.getDisplayString(execType == ExecType.LOCAL));
+            }
+            sb.append("\n");
+        }
+        if (returnCode == ReturnCode.FAILURE
+                || returnCode == ReturnCode.PARTIAL_FAILURE) {
+            sb.append("Failed Jobs:\n");
+            sb.append(JobStats.FAILURE_HEADER).append("\n");
+            List<JobStats> arr = jobPlan.getFailedJobs();
+            for (JobStats js : arr) {   
+                sb.append(js.getDisplayString(execType == ExecType.LOCAL));
+            }
+            sb.append("\n");
+        }
+        sb.append("Input(s):\n");
+        for (InputStats is : getInputStats()) {
+            sb.append(is.getDisplayString(execType == ExecType.LOCAL));
+        }
+        sb.append("\n");
+        sb.append("Output(s):\n");
+        for (OutputStats ds : getOutputStats()) {
+            sb.append(ds.getDisplayString(execType == ExecType.LOCAL));
+        }
+        
+        if (execType != ExecType.LOCAL) {
+            sb.append("\nCounters:\n");
+            sb.append("Total records written : " + getRecordWritten()).append("\n");
+            sb.append("Total bytes written : " + getBytesWritten()).append("\n");
+            sb.append("Spillable Memory Manager spill count : "
+                    + getSMMSpillCount()).append("\n");
+            sb.append("Total bags proactively spilled: " 
+                    + getProactiveSpillCountObjects()).append("\n");
+            sb.append("Total records proactively spilled: " 
+                    + getProactiveSpillCountRecords()).append("\n");
+        }
+        
+        sb.append("\nJob DAG:\n").append(jobPlan.toString());
+        
+        LOG.info("Script Statistics: \n" + sb.toString());
+    }
+    
+    @SuppressWarnings("deprecation")
+    void mapMROperToJob(MapReduceOper mro, Job job) {
+        if (mro == null) {
+            LOG.warn("null MR operator");
+        } else {
+            JobStats js = mroJobMap.get(mro);
+            if (js == null) {
+                LOG.warn("null job stats for mro: " + mro.getOperatorKey());
+            } else {
+                JobID id = job.getAssignedJobID();
+                js.setId(id);    
+                if (id != null) {
+                    jobMroMap.put(id.toString(), mro);
+                } else {
+                    jobMroMap.put(job.toString(), mro);
+                }
+            }
+        }
+    }   
+    
+    /**
+     * Logs the backend error and attaches it to the JobStats of the job it
+     * came from, keyed by the assigned Hadoop job id.
+     */
+    void setBackendException(Job job, Exception e) {
+        if (e instanceof PigException) {
+            LOG.error("ERROR " + ((PigException)e).getErrorCode() + ": " 
+                    + e.getLocalizedMessage());
+        } else if (e != null) {
+            LOG.error("ERROR: " + e.getLocalizedMessage());
+        }
+        
+        if (job.getAssignedJobID() == null || e == null) {
+            LOG.debug("unable to set backend exception");
+            return;
+        }
+        String id = job.getAssignedJobID().toString();
+        Iterator<JobStats> iter = jobPlan.iterator();
+        while (iter.hasNext()) {
+            JobStats js = iter.next();
+            if (id.equals(js.getJobId())) {
+                js.setBackendException(e);
+                break;
+            }
+        }
+    }
+    
+    PigContext getPigContext() {
+        return pigContext;
+    }
+    
+    int getNumberSuccessfulJobs() {
+        Iterator<JobStats> iter = jobPlan.iterator();
+        int count = 0;
+        while (iter.hasNext()) {
+            if (iter.next().getState() == JobState.SUCCESS) count++; 
+        }
+        return count;
+    }
+    
+    int getNumberFailedJobs() {
+        Iterator<JobStats> iter = jobPlan.iterator();
+        int count = 0;
+        while (iter.hasNext()) {
+            if (iter.next().getState() == JobState.FAILED) count++; 
+        }
+        return count;
+    }
+    
+}
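
The counters printed at the end of display() come straight from the stats getters used above. As a small, hypothetical helper (assuming those getters are public, as the getters exercised by the tests further down are), the same summary can be produced from any PigStats instance handed back by a run:

    import org.apache.pig.tools.pigstats.PigStats;

    public class StatsSummary {
        // Prints the same counters display() appends in non-local mode.
        static void printCounters(PigStats stats) {
            System.out.println("Total records written : " + stats.getRecordWritten());
            System.out.println("Total bytes written : " + stats.getBytesWritten());
            System.out.println("Spillable Memory Manager spill count : " + stats.getSMMSpillCount());
            System.out.println("Total bags proactively spilled: " + stats.getProactiveSpillCountObjects());
            System.out.println("Total records proactively spilled: " + stats.getProactiveSpillCountRecords());
        }
    }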

Modified: pig/trunk/test/org/apache/pig/test/TestPigRunner.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestPigRunner.java?rev=1056536&r1=1056535&r2=1056536&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestPigRunner.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestPigRunner.java Fri Jan  7 22:16:22 2011
@@ -24,8 +24,10 @@ import static org.junit.Assert.assertTru
 import java.io.File;
 import java.io.FileWriter;
 import java.io.PrintWriter;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.Properties;
 
 import junit.framework.Assert;
@@ -507,60 +509,68 @@ public class TestPigRunner {
         }
     }
 
-    private static class TestNotificationListener implements PigProgressNotificationListener {
+    public static class TestNotificationListener implements PigProgressNotificationListener {
         
-        private int numJobsToLaunch = 0;
-        private int numJobsSubmitted = 0;
-        private int numJobStarted = 0;
-        private int numJobFinished = 0;
+        private Map<String, int[]> numMap = new HashMap<String, int[]>();
+        
+        private static final int JobsToLaunch = 0;
+        private static final int JobsSubmitted = 1;
+        private static final int JobStarted = 2;
+        private static final int JobFinished = 3;
         
         @Override
-        public void launchStartedNotification(int numJobsToLaunch) {
-            System.out.println("++++ numJobsToLaunch: " + numJobsToLaunch);  
-            this.numJobsToLaunch = numJobsToLaunch;
+        public void launchStartedNotification(String id, int numJobsToLaunch) {            
+            System.out.println("id: " + id + " numJobsToLaunch: " + numJobsToLaunch);  
+            int[] nums = new int[4];
+            numMap.put(id, nums);
+            nums[JobsToLaunch] = numJobsToLaunch;
         }
 
         @Override
-        public void jobFailedNotification(JobStats jobStats) {
-            System.out.println("++++ job failed: " + jobStats.getJobId());           
+        public void jobFailedNotification(String id, JobStats jobStats) {
+            System.out.println("id: " + id + " job failed: " + jobStats.getJobId());           
         }
 
         @Override
-        public void jobFinishedNotification(JobStats jobStats) {
-            System.out.println("++++ job finished: " + jobStats.getJobId());  
-            numJobFinished++;            
+        public void jobFinishedNotification(String id, JobStats jobStats) {
+            System.out.println("id: " + id + " job finished: " + jobStats.getJobId()); 
+            int[] nums = numMap.get(id);
+            nums[JobFinished]++;            
         }
 
         @Override
-        public void jobStartedNotification(String assignedJobId) {
-            System.out.println("++++ job started: " + assignedJobId);   
-            numJobStarted++;
+        public void jobStartedNotification(String id, String assignedJobId) {
+            System.out.println("id: " + id + " job started: " + assignedJobId);   
+            int[] nums = numMap.get(id);
+            nums[JobStarted]++;
         }
 
         @Override
-        public void jobsSubmittedNotification(int numJobsSubmitted) {
-            System.out.println("++++ jobs submitted: " + numJobsSubmitted);
-            this.numJobsSubmitted += numJobsSubmitted;
+        public void jobsSubmittedNotification(String id, int numJobsSubmitted) {
+            System.out.println("id: " + id + " jobs submitted: " + numJobsSubmitted);
+            int[] nums = numMap.get(id);
+            nums[JobsSubmitted] += numJobsSubmitted;
         }
 
         @Override
-        public void launchCompletedNotification(int numJobsSucceeded) {
-            System.out.println("++++ numJobsSucceeded: " + numJobsSucceeded);   
+        public void launchCompletedNotification(String id, int numJobsSucceeded) {
+            System.out.println("id: " + id + " numJobsSucceeded: " + numJobsSucceeded);   
             System.out.println("");
-            assertEquals(this.numJobsToLaunch, numJobsSucceeded);
-            assertEquals(this.numJobsSubmitted, numJobsSucceeded);
-            assertEquals(this.numJobStarted, numJobsSucceeded);
-            assertEquals(this.numJobFinished, numJobsSucceeded);
+            int[] nums = numMap.get(id);
+            assertEquals(nums[JobsToLaunch], numJobsSucceeded);
+            assertEquals(nums[JobsSubmitted], numJobsSucceeded);
+            assertEquals(nums[JobStarted], numJobsSucceeded);
+            assertEquals(nums[JobFinished], numJobsSucceeded);
         }
 
         @Override
-        public void outputCompletedNotification(OutputStats outputStats) {
-            System.out.println("++++ output done: " + outputStats.getLocation());
+        public void outputCompletedNotification(String id, OutputStats outputStats) {
+            System.out.println("id: " + id + " output done: " + outputStats.getLocation());
         }
 
         @Override
-        public void progressUpdatedNotification(int progress) {
-            System.out.println("++++ progress: " + progress + "%");           
+        public void progressUpdatedNotification(String id, int progress) {
+            System.out.println("id: " + id + " progress: " + progress + "%");           
         }
         
     }
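
A listener like the one above is wired in through PigRunner, which hands every callback the id of the script run it belongs to. A minimal, hypothetical driver (embedded.py is a placeholder for an embedded Jython control script; the TestScriptLanguage tests below do the equivalent under JUnit):

    import org.apache.pig.PigRunner;
    import org.apache.pig.test.TestPigRunner;
    import org.apache.pig.tools.pigstats.PigStats;

    public class ListenerDriver {
        public static void main(String[] args) throws Exception {
            // "-g jython" selects the Jython engine; embedded.py is a placeholder.
            String[] pigArgs = { "-g", "jython", "embedded.py" };
            PigStats stats = PigRunner.run(pigArgs,
                    new TestPigRunner.TestNotificationListener());
            System.out.println("embedded: " + stats.isEmbedded()
                    + ", successful: " + stats.isSuccessful());
        }
    }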

Added: pig/trunk/test/org/apache/pig/test/TestScriptLanguage.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestScriptLanguage.java?rev=1056536&view=auto
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestScriptLanguage.java (added)
+++ pig/trunk/test/org/apache/pig/test/TestScriptLanguage.java Fri Jan  7 22:16:22 2011
@@ -0,0 +1,340 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.test;
+
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.pig.ExecType;
+import org.apache.pig.PigRunner;
+import org.apache.pig.PigServer;
+import org.apache.pig.scripting.ScriptEngine;
+import org.apache.pig.tools.pigstats.PigStats;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestScriptLanguage {
+
+    static MiniCluster cluster = MiniCluster.buildCluster();
+    private PigServer pigServer;
+    
+    @BeforeClass
+    public static void setUpBeforeClass() throws Exception {
+        
+    }
+
+    @AfterClass
+    public static void tearDownAfterClass() throws Exception {
+        cluster.shutDown();
+    }
+
+    @Before
+    public void setUp() throws Exception {
+        pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        Util.deleteFile(cluster, "simple_out");
+    }
+    
+    @Test
+    public void firstTest() throws Exception {
+        String[] script = {
+                "#!/usr/bin/python",
+                "from org.apache.pig.scripting import *",
+                "Pig.fs(\"-rmr simple_out\")",
+                "input = 'simple_table'",
+                "output = 'simple_out'",
+                "P = Pig.compile(\"\"\"a = load '$input';store a into '$output';\"\"\")",
+                "Q = P.bind({'input':input, 'output':output})",
+                "stats = Q.runSingle()",
+                "if stats.isSuccessful():",
+                "\tprint 'success!'",
+                "else:",
+                "\traise 'failed'"
+        };
+        String[] input = {
+                "1\t3",
+                "2\t4",
+                "3\t5"
+        };
+        
+        Util.createInputFile(cluster, "simple_table", input);
+        Util.createLocalInputFile( "testScript.py", script);
+        
+        ScriptEngine scriptEngine = ScriptEngine.getInstance("jython");
+        Map<String, List<PigStats>> statsMap = scriptEngine.run(pigServer.getPigContext(), "testScript.py");
+        assertEquals(1, statsMap.size());        
+        Iterator<List<PigStats>> it = statsMap.values().iterator();      
+        PigStats stats = it.next().get(0);
+        assertTrue(stats.isSuccessful());
+        assertEquals(1, stats.getNumberJobs());
+        String name = stats.getOutputNames().get(0);
+        assertEquals("simple_out", name);
+        assertEquals(12, stats.getBytesWritten());
+        assertEquals(3, stats.getRecordWritten());     
+    }
+    
+    @Test
+    public void secondTest() throws Exception {
+        String[] script = {
+                "#!/usr/bin/python",
+                "from org.apache.pig.scripting import *",
+                "Pig.fs(\"-rmr simple_out\")",
+                "input = 'simple_table_6'",
+                "output = 'simple_out'",
+                "P = Pig.compileFromFile(\"\"\"testScript.pig\"\"\")",
+                "Q = P.bind({'input':input, 'output':output})",
+                "stats = Q.runSingle()",
+                "if stats.isSuccessful():",
+                "\tprint 'success!'",
+                "else:",
+                "\traise 'failed'"
+        };
+        String[] input = {
+                "1\t3",
+                "2\t4",
+                "3\t5"
+        };
+        
+        String[] pigLatin = {
+                "a = load '$input';",
+                "store a into '$output';"
+        };
+        
+        Util.createInputFile(cluster, "simple_table_6", input);
+        Util.createLocalInputFile( "testScript.py", script);
+        Util.createLocalInputFile( "testScript.pig", pigLatin);
+        
+        ScriptEngine scriptEngine = ScriptEngine.getInstance("jython");
+        Map<String, List<PigStats>> statsMap = scriptEngine.run(pigServer.getPigContext(), "testScript.py");
+        assertEquals(1, statsMap.size());        
+        Iterator<List<PigStats>> it = statsMap.values().iterator();      
+        PigStats stats = it.next().get(0);
+        assertTrue(stats.isSuccessful());
+        assertEquals(1, stats.getNumberJobs());
+        String name = stats.getOutputNames().get(0);
+        assertEquals("simple_out", name);
+        assertEquals(12, stats.getBytesWritten());
+        assertEquals(3, stats.getRecordWritten());     
+    }
+    
+    @Test
+    public void firstParallelTest() throws Exception {
+        String[] script = {
+                "#!/usr/bin/python",
+                "from org.apache.pig.scripting import *",
+                "Pig.fs(\"-rmr simple_out\")",
+                "Pig.fs(\"-rmr simple_out2\")",
+                "input = 'simple_table_1'",
+                "output1 = 'simple_out'",
+                "output2 = 'simple_out'",
+                "P = Pig.compile(\"mypipeline\", \"\"\"a = load '$input';store a into '$output';\"\"\")",
+                "Q = P.bind([{'input':input, 'output':output1}, {'input':input, 'output':output2}])",
+                "stats = Q.run()"
+        };
+        String[] input = {
+                "1\t3",
+                "2\t4",
+                "3\t5"
+        };
+        
+        Util.createInputFile(cluster, "simple_table_1", input);
+        Util.createLocalInputFile( "testScript.py", script);
+        
+        ScriptEngine scriptEngine = ScriptEngine.getInstance("jython");
+        Map<String, List<PigStats>> statsMap = scriptEngine.run(pigServer.getPigContext(), "testScript.py");
+        assertEquals(1, statsMap.size());
+        assertEquals("mypipeline", statsMap.keySet().iterator().next());
+        List<PigStats> lst = statsMap.get("mypipeline");
+        assertEquals(2, lst.size());
+        for (PigStats stats : lst) {
+            assertTrue(stats.isSuccessful());
+            assertEquals(1, stats.getNumberJobs());
+            assertEquals(12, stats.getBytesWritten());
+            assertEquals(3, stats.getRecordWritten());     
+        }
+    }
+    
+    @Test
+    public void pigRunnerTest() throws Exception {
+        String[] script = {
+                "from org.apache.pig.scripting import *",
+                "Pig.fs(\"-rmr simple_out\")",
+                "input = 'simple_table_2'",
+                "output = 'simple_out'",
+                "P = Pig.compile(\"\"\"a = load '$input';store a into '$output';\"\"\")",
+                "Q = P.bind({'input':input, 'output':output})",
+                "stats = Q.runSingle()",
+                "if stats.isSuccessful():",
+                "\tprint 'success!'",
+                "else:",
+                "\traise 'failed'"
+        };
+        String[] input = {
+                "1\t3",
+                "2\t4",
+                "3\t5"
+        };
+        
+        Util.createInputFile(cluster, "simple_table_2", input);
+        Util.createLocalInputFile( "testScript.py", script);
+          
+        String[] args = { "-g", "jython", "testScript.py" };
+        
+        PigStats mainStats = PigRunner.run(args, new TestPigRunner.TestNotificationListener());
+        assertTrue(mainStats.isEmbedded());
+        assertTrue(mainStats.isSuccessful());
+        Map<String, List<PigStats>> statsMap = mainStats.getAllStats();
+        assertEquals(1, statsMap.size());        
+        Iterator<List<PigStats>> it = statsMap.values().iterator();      
+        PigStats stats = it.next().get(0);
+        assertTrue(stats.isSuccessful());
+        assertEquals(1, stats.getNumberJobs());
+        String name = stats.getOutputNames().get(0);
+        assertEquals("simple_out", name);
+        assertEquals(12, stats.getBytesWritten());
+        assertEquals(3, stats.getRecordWritten());     
+    }
+    
+    @Test
+    public void runParallelTest() throws Exception {
+        String[] script = {
+                "#!/usr/bin/python",
+                "from org.apache.pig.scripting import *",
+                "input = 'simple_table_3'",
+                "Pig.fs(\"-rmr simple_out\")",
+                "Pig.fs(\"-rmr simple_out2\")",
+                "output1 = 'simple_out'",
+                "output2 = 'simple_out2'",
+                "P = Pig.compile(\"mypipeline\", \"\"\"a = load '$input';store a into '$output';\"\"\")",
+                "Q = P.bind([{'input':input, 'output':output1}, {'input':input, 'output':output2}])",
+                "stats = Q.run()"
+        };
+        String[] input = {
+                "1\t3",
+                "2\t4",
+                "3\t5"
+        };
+        
+        Util.createInputFile(cluster, "simple_table_3", input);
+        Util.createLocalInputFile( "testScript.py", script);
+        
+        String[] args = { "-g", "jython", "testScript.py" };
+        PigStats mainStats = PigRunner.run(args, new TestPigRunner.TestNotificationListener());
+        assertTrue(mainStats.isEmbedded());
+        assertTrue(mainStats.isSuccessful());
+        Map<String, List<PigStats>> statsMap = mainStats.getAllStats();
+        assertEquals(1, statsMap.size());
+        assertEquals("mypipeline", statsMap.keySet().iterator().next());
+        List<PigStats> lst = statsMap.get("mypipeline");
+        assertEquals(2, lst.size());
+        for (PigStats stats : lst) {
+            assertTrue(stats.isSuccessful());
+            assertEquals(1, stats.getNumberJobs());
+            assertEquals(12, stats.getBytesWritten());
+            assertEquals(3, stats.getRecordWritten());     
+        }
+    }
+    
+    @Test
+    public void runLoopTest() throws Exception {
+        String[] script = {
+                "#!/usr/bin/python",
+                "from org.apache.pig.scripting import *",
+                "Pig.fs(\"-rmr simple_out\")",
+                "Pig.fs(\"-rmr simple_out2\")",
+                "input = 'simple_table_4'",
+                "P = Pig.compile(\"mypipeline\", \"\"\"a = load '$input';store a into '$output';\"\"\")",
+                "for x in [\"simple_out\", \"simple_out2\"]:",
+                "\tQ = P.bind({'input':input, 'output':x}).run()"
+        };
+        String[] input = {
+                "1\t3",
+                "2\t4",
+                "3\t5"
+        };
+        
+        Util.createInputFile(cluster, "simple_table_4", input);
+        Util.createLocalInputFile( "testScript.py", script);
+        
+        String[] args = { "-g", "jython", "testScript.py" };
+        PigStats mainStats = PigRunner.run(args, new TestPigRunner.TestNotificationListener());
+        assertTrue(mainStats.isEmbedded());
+        assertTrue(mainStats.isSuccessful());
+        Map<String, List<PigStats>> statsMap = mainStats.getAllStats();
+        assertEquals(1, statsMap.size());
+        assertEquals("mypipeline", statsMap.keySet().iterator().next());
+        List<PigStats> lst = statsMap.get("mypipeline");
+        assertEquals(2, lst.size());
+        for (PigStats stats : lst) {
+            assertTrue(stats.isSuccessful());
+            assertEquals(1, stats.getNumberJobs());
+            assertEquals(12, stats.getBytesWritten());
+            assertEquals(3, stats.getRecordWritten());     
+        }
+    }
+    
+    @Test
+    public void bindLocalVariableTest() throws Exception {
+        String[] script = {
+                "#!/usr/bin/python",
+                "from org.apache.pig.scripting import *",
+                "Pig.fs(\"-rmr simple_out\")",
+                "input = 'simple_table_5'",
+                "output = 'simple_out'",
+                "P = Pig.compile(\"\"\"a = load '$input';store a into '$output';\"\"\")",
+                "Q = P.bind()",
+                "stats = Q.runSingle()",
+                "if stats.isSuccessful():",
+                "\tprint 'success!'",
+                "else:",
+                "\traise 'failed'"
+        };
+        String[] input = {
+                "1\t3",
+                "2\t4",
+                "3\t5"
+        };
+        
+        Util.createInputFile(cluster, "simple_table_5", input);
+        Util.createLocalInputFile( "testScript.py", script);
+        
+        ScriptEngine scriptEngine = ScriptEngine.getInstance("jython");
+        Map<String, List<PigStats>> statsMap = scriptEngine.run(pigServer.getPigContext(), "testScript.py");
+        assertEquals(1, statsMap.size());        
+        Iterator<List<PigStats>> it = statsMap.values().iterator();      
+        PigStats stats = it.next().get(0);
+        assertTrue(stats.isSuccessful());
+        assertEquals(1, stats.getNumberJobs());
+        String name = stats.getOutputNames().get(0);
+        assertEquals("simple_out", name);
+        assertEquals(12, stats.getBytesWritten());
+        assertEquals(3, stats.getRecordWritten());     
+    }
+
+}
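
The same embedded runs can be driven without JUnit by obtaining the script engine directly, as the tests above do. A rough sketch under stated assumptions (ExecType.LOCAL and the script name myscript.py are placeholders; the tests run against a MiniCluster in MAPREDUCE mode instead):

    import java.util.List;
    import java.util.Map;

    import org.apache.pig.ExecType;
    import org.apache.pig.PigServer;
    import org.apache.pig.scripting.ScriptEngine;
    import org.apache.pig.tools.pigstats.PigStats;

    public class EmbeddedScriptDriver {
        public static void main(String[] args) throws Exception {
            // Local mode is an assumption here, chosen to keep the sketch self-contained.
            PigServer pigServer = new PigServer(ExecType.LOCAL);
            ScriptEngine engine = ScriptEngine.getInstance("jython");
            // myscript.py is a placeholder for a control script like the ones inlined above.
            Map<String, List<PigStats>> statsMap =
                    engine.run(pigServer.getPigContext(), "myscript.py");
            for (List<PigStats> pipeline : statsMap.values()) {
                for (PigStats stats : pipeline) {
                    System.out.println("successful: " + stats.isSuccessful()
                            + ", jobs: " + stats.getNumberJobs());
                }
            }
        }
    }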