Posted to commits@pig.apache.org by ch...@apache.org on 2013/11/11 22:50:50 UTC

svn commit: r1540855 - in /pig/trunk: ./ src/org/apache/pig/tools/pigstats/ src/org/apache/pig/tools/pigstats/mapreduce/ test/org/apache/pig/test/

Author: cheolsoo
Date: Mon Nov 11 21:50:50 2013
New Revision: 1540855

URL: http://svn.apache.org/r1540855
Log:
PIG-3561: Clean up PigStats and JobStats after PIG-3419 (cheolsoo)

Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/src/org/apache/pig/tools/pigstats/JobStats.java
    pig/trunk/src/org/apache/pig/tools/pigstats/PigStats.java
    pig/trunk/src/org/apache/pig/tools/pigstats/mapreduce/MRJobStats.java
    pig/trunk/src/org/apache/pig/tools/pigstats/mapreduce/MRPigStatsUtil.java
    pig/trunk/src/org/apache/pig/tools/pigstats/mapreduce/SimplePigStats.java
    pig/trunk/test/org/apache/pig/test/TestPigRunner.java

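For context, this change pulls most of the statistics accessors up from SimplePigStats into the PigStats base class, so client code depends only on the public PigStats API rather than on the MapReduce-specific subclass. The following is a minimal sketch of how a caller reads those statistics after a run; the script path, the null listener, and the printed fields are illustrative only and are not part of this commit:

    import java.util.List;

    import org.apache.pig.PigRunner;
    import org.apache.pig.tools.pigstats.OutputStats;
    import org.apache.pig.tools.pigstats.PigStats;

    public class StatsExample {
        public static void main(String[] args) {
            // Run a Pig script (path assumed for illustration); PigRunner
            // returns the PigStats for the run. The listener argument may
            // be null when no progress notifications are needed.
            PigStats stats = PigRunner.run(new String[] { "/tmp/example.pig" }, null);

            // isSuccessful(), getNumberJobs() and the output accessors are
            // now concrete methods on PigStats itself.
            System.out.println("succeeded: " + stats.isSuccessful());
            System.out.println("jobs run : " + stats.getNumberJobs());

            List<OutputStats> outputs = stats.getOutputStats();
            for (OutputStats out : outputs) {
                System.out.println(out.getLocation() + " -> "
                        + out.getNumberRecords() + " records, "
                        + out.getBytes() + " bytes");
            }
        }
    }
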
Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1540855&r1=1540854&r2=1540855&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Mon Nov 11 21:50:50 2013
@@ -46,6 +46,8 @@ OPTIMIZATIONS
  
 BUG FIXES
 
+PIG-3561: Clean up PigStats and JobStats after PIG-3419 (cheolsoo)
+
 PIG-3553: HadoopJobHistoryLoader fails to load job history on hadoop v 1.2 (lgiri via cheolsoo)
 
 PIG-3559: Trunk is broken by PIG-3522 (cheolsoo)

Modified: pig/trunk/src/org/apache/pig/tools/pigstats/JobStats.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/tools/pigstats/JobStats.java?rev=1540855&r1=1540854&r2=1540855&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/tools/pigstats/JobStats.java (original)
+++ pig/trunk/src/org/apache/pig/tools/pigstats/JobStats.java Mon Nov 11 21:50:50 2013
@@ -26,6 +26,7 @@ import java.util.Map;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapred.Counters;
 import org.apache.pig.classification.InterfaceAudience;
 import org.apache.pig.classification.InterfaceStability;
@@ -58,6 +59,8 @@ public abstract class JobStats extends O
 
     protected ArrayList<InputStats> inputs;
 
+    protected Configuration conf;
+
     private String errorMsg;
 
     private Exception exception = null;
@@ -70,6 +73,13 @@ public abstract class JobStats extends O
 
     public abstract String getJobId();
 
+    public void setConf(Configuration conf) {
+        if (conf == null) {
+            return;
+        }
+        this.conf = conf;
+    }
+
     public JobState getState() { return state; }
 
     public boolean isSuccessful() { return (state == JobState.SUCCESS); }
@@ -304,4 +314,3 @@ public abstract class JobStats extends O
     abstract public Map<String, Long> getMultiStoreCounters();
 
 }
-

Modified: pig/trunk/src/org/apache/pig/tools/pigstats/PigStats.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/tools/pigstats/PigStats.java?rev=1540855&r1=1540854&r2=1540855&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/tools/pigstats/PigStats.java (original)
+++ pig/trunk/src/org/apache/pig/tools/pigstats/PigStats.java Mon Nov 11 21:50:50 2013
@@ -28,11 +28,14 @@ import java.util.Properties;
 import org.apache.commons.collections.IteratorUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.JobClient;
 import org.apache.pig.PigException;
 import org.apache.pig.PigRunner.ReturnCode;
 import org.apache.pig.classification.InterfaceAudience;
+import org.apache.pig.classification.InterfaceAudience.Private;
 import org.apache.pig.classification.InterfaceStability;
+import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.logicalLayer.FrontendException;
 import org.apache.pig.impl.util.SpillableMemoryManager;
 import org.apache.pig.newplan.BaseOperatorPlan;
@@ -42,26 +45,34 @@ import org.apache.pig.newplan.PlanVisito
 import org.apache.pig.tools.pigstats.JobStats.JobState;
 import org.apache.pig.tools.pigstats.mapreduce.SimplePigStats;
 
+import com.google.common.collect.Maps;
+
 /**
- * PigStats encapsulates the statistics collected from a running script. 
- * It includes status of the execution, the DAG of its MR jobs, as well as 
- * information about outputs and inputs of the script. 
+ * PigStats encapsulates the statistics collected from a running script. It
+ * includes status of the execution, the DAG of its Hadoop jobs, as well as
+ * information about outputs and inputs of the script.
  */
 
 @InterfaceAudience.Public
 @InterfaceStability.Evolving
 public abstract class PigStats {
-    
     private static final Log LOG = LogFactory.getLog(PigStats.class);
-    
     private static ThreadLocal<PigStats> tps = new ThreadLocal<PigStats>();
-    
+
+    protected static final String DATE_FORMAT = "yyyy-MM-dd HH:mm:ss";
+    protected long startTime = -1;
+    protected long endTime = -1;
+
+    protected String userId;
+    protected JobGraph jobPlan;
+    protected PigContext pigContext;
+    protected Map<String, OutputStats> aliasOuputMap;
+
+    protected int errorCode = -1;
+    protected String errorMessage = null;
+    protected Throwable errorThrowable = null;
     protected int returnCode = ReturnCode.UNKNOWN;
-    
-    private String errorMessage;
-    private int errorCode = -1;
-    private Throwable errorThrowable = null;
-    
+
     public static PigStats get() {
         if (tps.get() == null) {
             LOG.info("PigStats has not been set. Defaulting to SimplePigStats");
@@ -69,37 +80,37 @@ public abstract class PigStats {
         }
         return tps.get();
     }
-    
+
     static void set(PigStats stats) {
         tps.set(stats);
     }
-        
+
     public static PigStats start(PigStats stats) {
         tps.set(stats);
         return tps.get();
     }
-    
+
     /**
      * Returns code are defined in {@link ReturnCode}
      */
     public int getReturnCode() {
         return returnCode;
     }
-    
+
     /**
      * Returns error message string
      */
     public String getErrorMessage() {
         return errorMessage;
     }
-    
+
     /**
      * Returns the error code of {@link PigException}
      */
     public int getErrorCode() {
         return errorCode;
     }
-    
+
     /**
      * Returns the error code of {@link PigException}
      */
@@ -110,152 +121,269 @@ public abstract class PigStats {
     public abstract JobClient getJobClient();
 
     public abstract boolean isEmbedded();
-    
-    public abstract boolean isSuccessful();
- 
+
+    public boolean isSuccessful() {
+        return (getNumberJobs() == 0 && returnCode == ReturnCode.UNKNOWN
+                || returnCode == ReturnCode.SUCCESS);
+    }
+
     public abstract Map<String, List<PigStats>> getAllStats();
-    
-    public abstract List<String> getAllErrorMessages();       
-        
+
+    public abstract List<String> getAllErrorMessages();
+
     /**
      * Returns the properties associated with the script
      */
-    public abstract Properties getPigProperties();
-    
+    public Properties getPigProperties() {
+        if (pigContext == null) {
+            return null;
+        }
+        return pigContext.getProperties();
+    }
+
     /**
-     * Returns the DAG of the MR jobs spawned by the script
+     * Returns the DAG of jobs spawned by the script
      */
-    public abstract JobGraph getJobGraph();
-    
+    public JobGraph getJobGraph() {
+        return jobPlan;
+    }
+
     /**
      * Returns the list of output locations in the script
      */
-    public abstract List<String> getOutputLocations();
-    
+    public List<String> getOutputLocations() {
+        ArrayList<String> locations = new ArrayList<String>();
+        for (OutputStats output : getOutputStats()) {
+            locations.add(output.getLocation());
+        }
+        return Collections.unmodifiableList(locations);
+    }
+
     /**
      * Returns the list of output names in the script
      */
-    public abstract List<String> getOutputNames();
-    
+    public List<String> getOutputNames() {
+        ArrayList<String> names = new ArrayList<String>();
+        for (OutputStats output : getOutputStats()) {
+            names.add(output.getName());
+        }
+        return Collections.unmodifiableList(names);
+    }
+
     /**
      * Returns the number of bytes for the given output location,
      * -1 for invalid location or name.
      */
-    public abstract long getNumberBytes(String location);
-    
+    public long getNumberBytes(String location) {
+        if (location == null) return -1;
+        String name = new Path(location).getName();
+        long count = -1;
+        for (OutputStats output : getOutputStats()) {
+            if (name.equals(output.getName())) {
+                count = output.getBytes();
+                break;
+            }
+        }
+        return count;
+    }
+
     /**
      * Returns the number of records for the given output location,
      * -1 for invalid location or name.
      */
-    public abstract long getNumberRecords(String location);
-        
+    public long getNumberRecords(String location) {
+        if (location == null) return -1;
+        String name = new Path(location).getName();
+        long count = -1;
+        for (OutputStats output : getOutputStats()) {
+            if (name.equals(output.getName())) {
+                count = output.getNumberRecords();
+                break;
+            }
+        }
+        return count;
+    }
+
     /**
      * Returns the alias associated with this output location
      */
-    public abstract String getOutputAlias(String location);
-    
+    public String getOutputAlias(String location) {
+        if (location == null) {
+            return null;
+        }
+        String name = new Path(location).getName();
+        String alias = null;
+        for (OutputStats output : getOutputStats()) {
+            if (name.equals(output.getName())) {
+                alias = output.getAlias();
+                break;
+            }
+        }
+        return alias;
+    }
+
     /**
      * Returns the total spill counts from {@link SpillableMemoryManager}.
      */
     public abstract long getSMMSpillCount();
-    
+
     /**
      * Returns the total number of bags that spilled proactively
      */
     public abstract long getProactiveSpillCountObjects();
-    
+
     /**
      * Returns the total number of records that spilled proactively
      */
     public abstract long getProactiveSpillCountRecords();
-    
+
     /**
      * Returns the total bytes written to user specified HDFS
      * locations of this script.
      */
-    public abstract long getBytesWritten();
-    
+    public long getBytesWritten() {
+        Iterator<JobStats> it = jobPlan.iterator();
+        long ret = 0;
+        while (it.hasNext()) {
+            long n = it.next().getBytesWritten();
+            if (n > 0) ret += n;
+        }
+        return ret;
+    }
+
     /**
      * Returns the total number of records in user specified output
      * locations of this script.
      */
-    public abstract long getRecordWritten();
+    public long getRecordWritten() {
+        Iterator<JobStats> it = jobPlan.iterator();
+        long ret = 0;
+        while (it.hasNext()) {
+            long n = it.next().getRecordWrittern();
+            if (n > 0) ret += n;
+        }
+        return ret;
+    }
 
     public String getHadoopVersion() {
         return ScriptState.get().getHadoopVersion();
     }
-    
+
     public String getPigVersion() {
         return ScriptState.get().getPigVersion();
     }
-   
-    public abstract String getScriptId();
-    
-    public abstract String getFeatures();
-    
-    public abstract long getDuration();
-    
-    /**
-     * Returns the number of MR jobs for this script
-     */
-    public abstract int getNumberJobs();
-        
-    public abstract List<OutputStats> getOutputStats();
-    
-    public abstract OutputStats result(String alias);
-    
-    public abstract List<InputStats> getInputStats();    
-    
-    void setErrorMessage(String errorMessage) {
+
+    public String getScriptId() {
+        return ScriptState.get().getId();
+    }
+
+    public String getFeatures() {
+        return ScriptState.get().getScriptFeatures();
+    }
+
+    public long getDuration() {
+        return (startTime > 0 && endTime > 0) ? (endTime - startTime) : -1;
+    }
+
+    /**
+     * Returns the number of jobs for this script
+     */
+    public int getNumberJobs() {
+        return jobPlan.size();
+    }
+
+    public List<OutputStats> getOutputStats() {
+        List<OutputStats> outputs = new ArrayList<OutputStats>();
+        Iterator<JobStats> iter = jobPlan.iterator();
+        while (iter.hasNext()) {
+            for (OutputStats os : iter.next().getOutputs()) {
+                outputs.add(os);
+            }
+        }
+        return Collections.unmodifiableList(outputs);
+    }
+
+    public OutputStats result(String alias) {
+        if (aliasOuputMap == null) {
+            aliasOuputMap = Maps.newHashMap();
+            Iterator<JobStats> iter = jobPlan.iterator();
+            while (iter.hasNext()) {
+                for (OutputStats os : iter.next().getOutputs()) {
+                    String a = os.getAlias();
+                    if (a == null || a.length() == 0) {
+                        LOG.warn("Output alias isn't avalable for " + os.getLocation());
+                        continue;
+                    }
+                    aliasOuputMap.put(a, os);
+                }
+            }
+        }
+        return aliasOuputMap.get(alias);
+    }
+
+    public List<InputStats> getInputStats() {
+        List<InputStats> inputs = new ArrayList<InputStats>();
+        Iterator<JobStats> iter = jobPlan.iterator();
+        while (iter.hasNext()) {
+            for (InputStats is : iter.next().getInputs()) {
+                inputs.add(is);
+            }
+        }
+        return Collections.unmodifiableList(inputs);
+    }
+
+    public void setErrorMessage(String errorMessage) {
         this.errorMessage = errorMessage;
     }
-    
-    void setErrorCode(int errorCode) {
+
+    public void setErrorCode(int errorCode) {
         this.errorCode = errorCode;
-    } 
-    
-    void setErrorThrowable(Throwable t) {
+    }
+
+    public void setErrorThrowable(Throwable t) {
         this.errorThrowable = t;
     }
-    
+
+    public void setReturnCode(int returnCode) {
+        this.returnCode = returnCode;
+    }
+
     /**
      * This class prints a JobGraph
      */
     public static class JobGraphPrinter extends PlanVisitor {
-        
+
         StringBuffer buf;
 
         protected JobGraphPrinter(OperatorPlan plan) {
-            super(plan,
-                    new org.apache.pig.newplan.DependencyOrderWalker(
-                            plan));
+            super(plan, new org.apache.pig.newplan.DependencyOrderWalker(plan));
             buf = new StringBuffer();
         }
-        
+
         public void visit(JobStats op) throws FrontendException {
             buf.append(op.getJobId());
             List<Operator> succs = plan.getSuccessors(op);
             if (succs != null) {
                 buf.append("\t->\t");
-                for (Operator p : succs) {                  
+                for (Operator p : succs) {
                     buf.append(((JobStats)p).getJobId()).append(",");
-                }               
+                }
             }
             buf.append("\n");
         }
-        
+
         @Override
         public String toString() {
             buf.append("\n");
             return buf.toString();
-        }        
+        }
     }
-    
+
     /**
      * JobGraph is an {@link OperatorPlan} whose members are {@link JobStats}
      */
     public static class JobGraph extends BaseOperatorPlan implements Iterable<JobStats>{
-                
+
         @Override
         public String toString() {
             JobGraphPrinter jp = new JobGraphPrinter(this);
@@ -266,11 +394,11 @@ public abstract class PigStats {
             }
             return jp.toString();
         }
-        
+
         /**
          * Returns a List representation of the Job graph. Returned list is an
          * ArrayList
-         * 
+         *
          * @return List<JobStats>
          */
         @SuppressWarnings("unchecked")
@@ -280,34 +408,34 @@ public abstract class PigStats {
 
         public Iterator<JobStats> iterator() {
             return new Iterator<JobStats>() {
-                private Iterator<Operator> iter = getOperators();                
+                private Iterator<Operator> iter = getOperators();
                 @Override
-                public boolean hasNext() {                
+                public boolean hasNext() {
                     return iter.hasNext();
                 }
                 @Override
-                public JobStats next() {              
+                public JobStats next() {
                     return (JobStats)iter.next();
                 }
                 @Override
                 public void remove() {}
             };
         }
- 
+
         public boolean isConnected(Operator from, Operator to) {
             List<Operator> succs = null;
             succs = getSuccessors(from);
             if (succs != null) {
                 for (Operator succ: succs) {
-                    if (succ.getName().equals(to.getName()) 
+                    if (succ.getName().equals(to.getName())
                             || isConnected(succ, to)) {
                         return true;
-                    }                    
+                    }
                 }
             }
             return false;
         }
-        
+
         public List<JobStats> getSuccessfulJobs() {
             ArrayList<JobStats> lst = new ArrayList<JobStats>();
             Iterator<JobStats> iter = iterator();
@@ -320,7 +448,7 @@ public abstract class PigStats {
             Collections.sort(lst, new JobComparator());
             return lst;
         }
-        
+
         public List<JobStats> getFailedJobs() {
             ArrayList<JobStats> lst = new ArrayList<JobStats>();
             Iterator<JobStats> iter = iterator();
@@ -329,19 +457,79 @@ public abstract class PigStats {
                 if (js.getState() == JobState.FAILED) {
                     lst.add(js);
                 }
-            }            
+            }
             return lst;
         }
-    }    
-    
+    }
+
     private static class JobComparator implements Comparator<JobStats> {
         @Override
-        public int compare(JobStats o1, JobStats o2) {           
+        public int compare(JobStats o1, JobStats o2) {
             return o1.getJobId().compareTo(o2.getJobId());
-        }       
-    }    
-    
-    void setReturnCode(int returnCode) {
-        this.returnCode = returnCode; 
+        }
+    }
+
+    @Private
+    public void setBackendException(String jobId, Exception e) {
+        if (e instanceof PigException) {
+            LOG.error("ERROR " + ((PigException)e).getErrorCode() + ": "
+                    + e.getLocalizedMessage());
+        } else if (e != null) {
+            LOG.error("ERROR: " + e.getLocalizedMessage());
+        }
+
+        if (jobId == null || e == null) {
+            LOG.debug("unable to set backend exception");
+            return;
+        }
+        Iterator<JobStats> iter = jobPlan.iterator();
+        while (iter.hasNext()) {
+            JobStats js = iter.next();
+            if (jobId.equals(js.getJobId())) {
+                js.setBackendException(e);
+                break;
+            }
+        }
+    }
+
+    @Private
+    public PigContext getPigContext() {
+        return pigContext;
+    }
+
+    public void start() {
+        startTime = System.currentTimeMillis();
+        userId = System.getProperty("user.name");
+    }
+
+    public void stop() {
+        endTime = System.currentTimeMillis();
+        int failed = getNumberFailedJobs();
+        int succeeded = getNumberSuccessfulJobs();
+        if (failed == 0 && succeeded > 0 && succeeded == jobPlan.size()) {
+            returnCode = ReturnCode.SUCCESS;
+        } else if (succeeded > 0 && succeeded < jobPlan.size()) {
+            returnCode = ReturnCode.PARTIAL_FAILURE;
+        } else {
+            returnCode = ReturnCode.FAILURE;
+        }
+    }
+
+    public int getNumberSuccessfulJobs() {
+        Iterator<JobStats> iter = jobPlan.iterator();
+        int count = 0;
+        while (iter.hasNext()) {
+            if (iter.next().getState() == JobState.SUCCESS) count++;
+        }
+        return count;
+    }
+
+    public int getNumberFailedJobs() {
+        Iterator<JobStats> iter = jobPlan.iterator();
+        int count = 0;
+        while (iter.hasNext()) {
+            if (iter.next().getState() == JobState.FAILED) count++;
+        }
+        return count;
     }
 }

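The PigStats diff above also widens the visibility of the status setters (setErrorMessage, setErrorCode, setErrorThrowable, setReturnCode) and adds a public setBackendException(String, Exception), so launcher code outside the pigstats package can record failures directly on the thread-local stats object. A minimal sketch follows; the job id, error code, and messages are invented for illustration and do not appear in this commit:

    import org.apache.pig.PigRunner.ReturnCode;
    import org.apache.pig.tools.pigstats.PigStats;
    import org.apache.pig.tools.pigstats.mapreduce.SimplePigStats;

    public class RecordFailureSketch {
        public static void main(String[] args) {
            // Register a stats object for this thread, as the MR launcher does.
            PigStats stats = PigStats.start(new SimplePigStats());

            // Record a backend failure; the job id and messages are made up here.
            stats.setBackendException("job_201311112200_0042",
                    new Exception("container killed"));
            stats.setErrorCode(2997);                     // hypothetical error code
            stats.setErrorMessage("backend job failed");  // hypothetical message
            stats.setReturnCode(ReturnCode.FAILURE);

            System.out.println("return code: " + stats.getReturnCode());
        }
    }
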
Modified: pig/trunk/src/org/apache/pig/tools/pigstats/mapreduce/MRJobStats.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/tools/pigstats/mapreduce/MRJobStats.java?rev=1540855&r1=1540854&r2=1540855&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/tools/pigstats/mapreduce/MRJobStats.java (original)
+++ pig/trunk/src/org/apache/pig/tools/pigstats/mapreduce/MRJobStats.java Mon Nov 11 21:50:50 2013
@@ -57,14 +57,14 @@ import org.apache.pig.tools.pigstats.Pig
 
 
 /**
- * This class encapsulates the runtime statistics of a MapReduce job. 
+ * This class encapsulates the runtime statistics of a MapReduce job.
  * Job statistics is collected when job is completed.
  */
 @InterfaceAudience.Public
 @InterfaceStability.Evolving
 public final class MRJobStats extends JobStats {
-        
-    
+
+
     MRJobStats(String name, JobGraph plan) {
         super(name, plan);
     }
@@ -72,27 +72,25 @@ public final class MRJobStats extends Jo
     public static final String SUCCESS_HEADER = "JobId\tMaps\tReduces\t" +
             "MaxMapTime\tMinMapTIme\tAvgMapTime\tMedianMapTime\tMaxReduceTime\t" +
             "MinReduceTime\tAvgReduceTime\tMedianReducetime\tAlias\tFeature\tOutputs";
-   
+
     public static final String FAILURE_HEADER = "JobId\tAlias\tFeature\tMessage\tOutputs";
-    
+
     // currently counters are not working in local mode - see PIG-1286
     public static final String SUCCESS_HEADER_LOCAL = "JobId\tAlias\tFeature\tOutputs";
-    
+
     private static final Log LOG = LogFactory.getLog(MRJobStats.class);
-        
-    private Configuration conf;
-    
+
     private List<POStore> mapStores = null;
-    
+
     private List<POStore> reduceStores = null;
-    
+
     private List<FileSpec> loads = null;
-        
+
     private Boolean disableCounter = false;
-            
+
     @SuppressWarnings("deprecation")
     private JobID jobId;
-    
+
     private long maxMapTime = 0;
     private long minMapTime = 0;
     private long avgMapTime = 0;
@@ -104,7 +102,7 @@ public final class MRJobStats extends Jo
 
     private int numberMaps = 0;
     private int numberReduces = 0;
-    
+
     private long mapInputRecords = 0;
     private long mapOutputRecords = 0;
     private long reduceInputRecords = 0;
@@ -114,37 +112,37 @@ public final class MRJobStats extends Jo
     private long spillCount = 0;
     private long activeSpillCountObj = 0;
     private long activeSpillCountRecs = 0;
-    
-    private HashMap<String, Long> multiStoreCounters 
+
+    private HashMap<String, Long> multiStoreCounters
             = new HashMap<String, Long>();
-    
-    private HashMap<String, Long> multiInputCounters 
+
+    private HashMap<String, Long> multiInputCounters
             = new HashMap<String, Long>();
-        
+
     @SuppressWarnings("deprecation")
     private Counters counters = null;
-    
 
-    public String getJobId() { 
-        return (jobId == null) ? null : jobId.toString(); 
+
+    public String getJobId() {
+        return (jobId == null) ? null : jobId.toString();
     }
-        
+
     public int getNumberMaps() { return numberMaps; }
-    
+
     public int getNumberReduces() { return numberReduces; }
-    
+
     public long getMaxMapTime() { return maxMapTime; }
-    
+
     public long getMinMapTime() { return minMapTime; }
-    
+
     public long getAvgMapTime() { return avgMapTime; }
-    
+
     public long getMaxReduceTime() { return maxReduceTime; }
-    
+
     public long getMinReduceTime() { return minReduceTime; }
-    
-    public long getAvgREduceTime() { return avgReduceTime; }           
-        
+
+    public long getAvgREduceTime() { return avgReduceTime; }
+
     public long getMapInputRecords() { return mapInputRecords; }
 
     public long getMapOutputRecords() { return mapOutputRecords; }
@@ -154,13 +152,13 @@ public final class MRJobStats extends Jo
     public long getReduceInputRecords() { return reduceInputRecords; }
 
     public long getSMMSpillCount() { return spillCount; }
-    
+
     public long getProactiveSpillCountObjects() { return activeSpillCountObj; }
-    
+
     public long getProactiveSpillCountRecs() { return activeSpillCountRecs; }
-    
+
     public long getHdfsBytesWritten() { return hdfsBytesWritten; }
-    
+
     @SuppressWarnings("deprecation")
     public Counters getHadoopCounters() { return counters; }
 
@@ -168,11 +166,11 @@ public final class MRJobStats extends Jo
     public Map<String, Long> getMultiStoreCounters() {
         return Collections.unmodifiableMap(multiStoreCounters);
     }
-       
+
     public String getAlias() {
         return (String)getAnnotation(ALIAS);
     }
-    
+
     public String getAliasLocation() {
         return (String)getAnnotation(ALIAS_LOCATION);
     }
@@ -180,7 +178,7 @@ public final class MRJobStats extends Jo
     public String getFeature() {
         return (String)getAnnotation(FEATURE);
     }
-        
+
     @Override
     public void accept(PlanVisitor v) throws FrontendException {
         if (v instanceof JobGraphPrinter) {
@@ -194,43 +192,43 @@ public final class MRJobStats extends Jo
         this.jobId = jobId;
     }
 
+    @Override
     @SuppressWarnings("unchecked")
-    void setConf(Configuration conf) {        
-        if (conf == null) return;
-        this.conf = conf;
+    public void setConf(Configuration conf) {
+        super.setConf(conf);
         try {
             this.mapStores = (List<POStore>) ObjectSerializer.deserialize(conf
                     .get(JobControlCompiler.PIG_MAP_STORES));
             this.reduceStores = (List<POStore>) ObjectSerializer.deserialize(conf
-                    .get(JobControlCompiler.PIG_REDUCE_STORES));           
+                    .get(JobControlCompiler.PIG_REDUCE_STORES));
             this.loads = (ArrayList<FileSpec>) ObjectSerializer.deserialize(conf
                     .get("pig.inputs"));
             this.disableCounter = conf.getBoolean("pig.disable.counter", false);
         } catch (IOException e) {
             LOG.warn("Failed to deserialize the store list", e);
-        }                    
+        }
     }
-    
+
     void setMapStat(int size, long max, long min, long avg, long median) {
         numberMaps = size;
         maxMapTime = max;
         minMapTime = min;
-        avgMapTime = avg;    
+        avgMapTime = avg;
         medianMapTime = median;
     }
-    
+
     void setReduceStat(int size, long max, long min, long avg, long median) {
         numberReduces = size;
         maxReduceTime = max;
         minReduceTime = min;
-        avgReduceTime = avg;       
+        avgReduceTime = avg;
         medianReduceTime = median;
-    }  
-    
+    }
+
     public String getDisplayString(boolean local) {
         StringBuilder sb = new StringBuilder();
         String id = (jobId == null) ? "N/A" : jobId.toString();
-        if (state == JobState.FAILED || local) {           
+        if (state == JobState.FAILED || local) {
             sb.append(id).append("\t")
                 .append(getAlias()).append("\t")
                 .append(getFeature()).append("\t");
@@ -243,7 +241,7 @@ public final class MRJobStats extends Jo
                 .append(numberReduces).append("\t");
             if (numberMaps == 0) {
                 sb.append("n/a\t").append("n/a\t").append("n/a\t").append("n/a\t");
-            } else { 
+            } else {
                 sb.append(maxMapTime/1000).append("\t")
                     .append(minMapTime/1000).append("\t")
                     .append(avgMapTime/1000).append("\t")
@@ -262,7 +260,7 @@ public final class MRJobStats extends Jo
         }
         for (OutputStats os : outputs) {
             sb.append(os.getLocation()).append(",");
-        }        
+        }
         sb.append("\n");
         return sb.toString();
     }
@@ -295,9 +293,9 @@ public final class MRJobStats extends Jo
             reduceOutputRecords = taskgroup.getCounterForName(
                     MRPigStatsUtil.REDUCE_OUTPUT_RECORDS).getCounter();
             hdfsBytesRead = hdfsgroup.getCounterForName(
-                    MRPigStatsUtil.HDFS_BYTES_READ).getCounter();      
+                    MRPigStatsUtil.HDFS_BYTES_READ).getCounter();
             hdfsBytesWritten = hdfsgroup.getCounterForName(
-                    MRPigStatsUtil.HDFS_BYTES_WRITTEN).getCounter();            
+                    MRPigStatsUtil.HDFS_BYTES_WRITTEN).getCounter();
             spillCount = counters.findCounter(
                     PigCounters.SPILLABLE_MEMORY_MANAGER_SPILL_COUNT)
                     .getCounter();
@@ -310,23 +308,23 @@ public final class MRJobStats extends Jo
             while (iter.hasNext()) {
                 Counter cter = iter.next();
                 multiStoreCounters.put(cter.getName(), cter.getValue());
-            }     
-            
+            }
+
             Iterator<Counter> iter2 = multiloadgroup.iterator();
             while (iter2.hasNext()) {
                 Counter cter = iter2.next();
                 multiInputCounters.put(cter.getName(), cter.getValue());
-            } 
-            
-        }              
+            }
+
+        }
     }
-    
+
     void addMapReduceStatistics(JobClient client, Configuration conf) {
         TaskReport[] maps = null;
         try {
             maps = client.getMapTaskReports(jobId);
         } catch (IOException e) {
-            LOG.warn("Failed to get map task report", e);            
+            LOG.warn("Failed to get map task report", e);
         }
         if (maps != null && maps.length > 0) {
             int size = maps.length;
@@ -335,7 +333,7 @@ public final class MRJobStats extends Jo
             long median = 0;
             long total = 0;
             long durations[] = new long[size];
-            
+
             for (int i = 0; i < maps.length; i++) {
                 TaskReport rpt = maps[i];
                 long duration = rpt.getFinishTime() - rpt.getStartTime();
@@ -345,7 +343,7 @@ public final class MRJobStats extends Jo
                 total += duration;
             }
             long avg = total / size;
-            
+
             median = calculateMedianValue(durations);
             setMapStat(size, max, min, avg, median);
         } else {
@@ -354,7 +352,7 @@ public final class MRJobStats extends Jo
                 setMapStat(m, -1, -1, -1, -1);
             }
         }
-        
+
         TaskReport[] reduces = null;
         try {
             reduces = client.getReduceTaskReports(jobId);
@@ -368,7 +366,7 @@ public final class MRJobStats extends Jo
             long median = 0;
             long total = 0;
             long durations[] = new long[size];
-            
+
             for (int i = 0; i < reduces.length; i++) {
                 TaskReport rpt = reduces[i];
                 long duration = rpt.getFinishTime() - rpt.getStartTime();
@@ -387,32 +385,32 @@ public final class MRJobStats extends Jo
             }
         }
     }
-    
-    void setAlias(MapReduceOper mro) {       
+
+    void setAlias(MapReduceOper mro) {
         MRScriptState ss = MRScriptState.get();
-        annotate(ALIAS, ss.getAlias(mro));             
+        annotate(ALIAS, ss.getAlias(mro));
         annotate(ALIAS_LOCATION, ss.getAliasLocation(mro));
         annotate(FEATURE, ss.getPigFeature(mro));
     }
-    
+
     void addOutputStatistics() {
         if (mapStores == null || reduceStores == null) {
             LOG.warn("unable to get stores of the job");
             return;
         }
-        
+
         if (mapStores.size() + reduceStores.size() == 1) {
             POStore sto = (mapStores.size() > 0) ? mapStores.get(0)
                     : reduceStores.get(0);
             if (!sto.isTmpStore()) {
                 long records = (mapStores.size() > 0) ? mapOutputRecords
-                        : reduceOutputRecords;           
+                        : reduceOutputRecords;
                 OutputStats ds = new OutputStats(sto.getSFile().getFileName(),
                         hdfsBytesWritten, records, (state == JobState.SUCCESS));
                 ds.setPOStore(sto);
                 ds.setConf(conf);
                 outputs.add(ds);
-                
+
                 if (state == JobState.SUCCESS) {
                      MRScriptState.get().emitOutputCompletedNotification(ds);
                 }
@@ -425,10 +423,10 @@ public final class MRJobStats extends Jo
             for (POStore sto : reduceStores) {
                 if (sto.isTmpStore()) continue;
                 addOneOutputStats(sto);
-            }     
+            }
         }
     }
-    
+
     /**
      * Looks up the output size reader from OUTPUT_SIZE_READER_KEY and invokes
      * it to get the size of output. If OUTPUT_SIZE_READER_KEY is not set,
@@ -474,32 +472,32 @@ public final class MRJobStats extends Jo
         long bytes = getOutputSize(sto, conf);
         String location = sto.getSFile().getFileName();
         OutputStats ds = new OutputStats(location, bytes, records,
-                (state == JobState.SUCCESS));  
+                (state == JobState.SUCCESS));
         ds.setPOStore(sto);
         ds.setConf(conf);
         outputs.add(ds);
-        
+
         if (state == JobState.SUCCESS) {
              MRScriptState.get().emitOutputCompletedNotification(ds);
         }
     }
-       
+
     void addInputStatistics() {
         if (loads == null)  {
             LOG.warn("unable to get inputs of the job");
             return;
         }
-        
+
         if (loads.size() == 1) {
-            FileSpec fsp = loads.get(0); 
+            FileSpec fsp = loads.get(0);
             if (!MRPigStatsUtil.isTempFile(fsp.getFileName())) {
-                long records = mapInputRecords;       
+                long records = mapInputRecords;
                 InputStats is = new InputStats(fsp.getFileName(),
-                        hdfsBytesRead, records, (state == JobState.SUCCESS));              
+                        hdfsBytesRead, records, (state == JobState.SUCCESS));
                 is.setConf(conf);
                 if (isSampler()) is.markSampleInput();
                 if (isIndexer()) is.markIndexerInput();
-                inputs.add(is);                
+                inputs.add(is);
             }
         } else {
             for (int i=0; i<loads.size(); i++) {
@@ -507,14 +505,14 @@ public final class MRJobStats extends Jo
                 if (MRPigStatsUtil.isTempFile(fsp.getFileName())) continue;
                 addOneInputStats(fsp.getFileName(), i);
             }
-        }            
+        }
     }
-    
+
     private void addOneInputStats(String fileName, int index) {
         long records = -1;
         Long n = multiInputCounters.get(
                 MRPigStatsUtil.getMultiInputsCounterName(fileName, index));
-        if (n != null) {   
+        if (n != null) {
             records = n;
         } else {
             // the file could be empty
@@ -523,9 +521,9 @@ public final class MRJobStats extends Jo
                 LOG.warn("unable to get input counter for " + fileName);
             }
         }
-        InputStats is = new InputStats(fileName, -1, records, (state == JobState.SUCCESS));              
+        InputStats is = new InputStats(fileName, -1, records, (state == JobState.SUCCESS));
         is.setConf(conf);
         inputs.add(is);
     }
-    
+
 }

Modified: pig/trunk/src/org/apache/pig/tools/pigstats/mapreduce/MRPigStatsUtil.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/tools/pigstats/mapreduce/MRPigStatsUtil.java?rev=1540855&r1=1540854&r2=1540855&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/tools/pigstats/mapreduce/MRPigStatsUtil.java (original)
+++ pig/trunk/src/org/apache/pig/tools/pigstats/mapreduce/MRPigStatsUtil.java Mon Nov 11 21:50:50 2013
@@ -27,12 +27,14 @@ import org.apache.hadoop.mapred.JobClien
 import org.apache.hadoop.mapred.RunningJob;
 import org.apache.hadoop.mapred.jobcontrol.Job;
 import org.apache.hadoop.mapred.jobcontrol.JobControl;
+import org.apache.hadoop.mapreduce.JobID;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceOper;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.NativeMapReduceOper;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
 import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
+import org.apache.pig.classification.InterfaceAudience.Private;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.tools.pigstats.PigStats.JobGraph;
 import org.apache.pig.tools.pigstats.mapreduce.SimplePigStats;
@@ -48,41 +50,41 @@ import org.apache.pig.tools.pigstats.Job
  */
 public class MRPigStatsUtil extends PigStatsUtil {
 
-    public static final String MULTI_STORE_RECORD_COUNTER 
+    public static final String MULTI_STORE_RECORD_COUNTER
             = "Output records in ";
-    public static final String MULTI_STORE_COUNTER_GROUP 
+    public static final String MULTI_STORE_COUNTER_GROUP
             = "MultiStoreCounters";
-    public static final String TASK_COUNTER_GROUP 
+    public static final String TASK_COUNTER_GROUP
             = "org.apache.hadoop.mapred.Task$Counter";
-    public static final String FS_COUNTER_GROUP 
+    public static final String FS_COUNTER_GROUP
             = HadoopShims.getFsCounterGroupName();
-    public static final String MAP_INPUT_RECORDS 
+    public static final String MAP_INPUT_RECORDS
             = "MAP_INPUT_RECORDS";
-    public static final String MAP_OUTPUT_RECORDS 
+    public static final String MAP_OUTPUT_RECORDS
             = "MAP_OUTPUT_RECORDS";
-    public static final String REDUCE_INPUT_RECORDS 
+    public static final String REDUCE_INPUT_RECORDS
             = "REDUCE_INPUT_RECORDS";
-    public static final String REDUCE_OUTPUT_RECORDS 
+    public static final String REDUCE_OUTPUT_RECORDS
             = "REDUCE_OUTPUT_RECORDS";
-    public static final String HDFS_BYTES_WRITTEN 
+    public static final String HDFS_BYTES_WRITTEN
             = "HDFS_BYTES_WRITTEN";
-    public static final String HDFS_BYTES_READ 
+    public static final String HDFS_BYTES_READ
             = "HDFS_BYTES_READ";
-    public static final String MULTI_INPUTS_RECORD_COUNTER 
+    public static final String MULTI_INPUTS_RECORD_COUNTER
             = "Input records from ";
-    public static final String MULTI_INPUTS_COUNTER_GROUP 
+    public static final String MULTI_INPUTS_COUNTER_GROUP
             = "MultiInputCounters";
-    
+
     private static final Log LOG = LogFactory.getLog(MRPigStatsUtil.class);
-    
+
     // Restrict total string size of a counter name to 64 characters.
     // Leave 24 characters for prefix string.
     private static final int COUNTER_NAME_LIMIT = 40;
-   
+
     /**
-     * Returns the count for the given counter name in the counter group 
+     * Returns the count for the given counter name in the counter group
      * 'MultiStoreCounters'
-     * 
+     *
      * @param job the MR job
      * @param jobClient the Hadoop job client
      * @param counterName the counter name
@@ -102,37 +104,37 @@ public class MRPigStatsUtil extends PigS
         } catch (IOException e) {
             LOG.warn("Failed to get the counter for " + counterName, e);
         }
-        return value;        
+        return value;
     }
-    
+
     /**
      * Returns the counter name for the given {@link POStore}
-     * 
+     *
      * @param store the POStore
-     * @return the counter name 
+     * @return the counter name
      */
     public static String getMultiStoreCounterName(POStore store) {
         String shortName = getShortName(store.getSFile().getFileName());
-        return (shortName == null) ? null 
+        return (shortName == null) ? null
                 : MULTI_STORE_RECORD_COUNTER + "_" + store.getIndex() + "_" + shortName;
     }
-    
+
     /**
      * Returns the counter name for the given input file name
-     * 
+     *
      * @param fname the input file name
      * @return the counter name
      */
     public static String getMultiInputsCounterName(String fname, int index) {
-        String shortName = getShortName(fname);            
-        return (shortName == null) ? null 
+        String shortName = getShortName(fname);
+        return (shortName == null) ? null
                 : MULTI_INPUTS_RECORD_COUNTER + "_" + index + "_" + shortName;
     }
-    
+
     private static final String SEPARATOR = "/";
     private static final String SEMICOLON = ";";
-    
-    private static String getShortName(String uri) {  
+
+    private static String getShortName(String uri) {
         int scolon = uri.indexOf(SEMICOLON);
         int slash;
         if (scolon!=-1) {
@@ -153,39 +155,39 @@ public class MRPigStatsUtil extends PigS
         }
         return shortName;
     }
-           
+
     /**
      * Starts collecting statistics for the given MR plan
-     * 
+     *
      * @param pc the Pig context
      * @param client the Hadoop job client
      * @param jcc the job compiler
      * @param plan the MR plan
      */
-    public static void startCollection(PigContext pc, JobClient client, 
+    public static void startCollection(PigContext pc, JobClient client,
             JobControlCompiler jcc, MROperPlan plan) {
         SimplePigStats ps = (SimplePigStats)PigStats.start(new SimplePigStats());
-        ps.start(pc, client, jcc, plan);
+        ps.initialize(pc, client, jcc, plan);
 
         MRScriptState.get().emitInitialPlanNotification(plan);
         MRScriptState.get().emitLaunchStartedNotification(plan.size());
     }
-     
+
     /**
      * Stops collecting statistics for a MR plan
-     * 
-     * @param display if true, log collected statistics in the Pig log 
-     *      file at INFO level 
+     *
+     * @param display if true, log collected statistics in the Pig log
+     *      file at INFO level
      */
     public static void stopCollection(boolean display) {
         SimplePigStats ps = (SimplePigStats)PigStats.get();
-        ps.stop();
+        ps.finish();
         if (!ps.isSuccessful()) {
             LOG.error(ps.getNumberFailedJobs() + " map reduce job(s) failed!");
             String errMsg = ps.getErrorMessage();
             if (errMsg != null) {
                 LOG.error("Error message: " + errMsg);
-            }            
+            }
         }
         MRScriptState.get().emitLaunchCompletedNotification(
                 ps.getNumberSuccessfulJobs());
@@ -209,13 +211,13 @@ public class MRPigStatsUtil extends PigS
     public static void displayStatistics() {
         ((SimplePigStats)PigStats.get()).display();
     }
-    
+
     /**
-     * Updates the {@link JobGraph} of the {@link PigStats}. The initial 
-     * {@link JobGraph} is created without job ids using {@link MROperPlan}, 
+     * Updates the {@link JobGraph} of the {@link PigStats}. The initial
+     * {@link JobGraph} is created without job ids using {@link MROperPlan},
      * before any job is submitted for execution. The {@link JobGraph} then
-     * is updated with job ids after jobs are executed. 
-     *  
+     * is updated with job ids after jobs are executed.
+     *
      * @param jobMroMap the map that maps {@link Job}s to {@link MapReduceOper}s
      */
     public static void updateJobMroMap(Map<Job, MapReduceOper> jobMroMap) {
@@ -223,58 +225,62 @@ public class MRPigStatsUtil extends PigS
         for (Map.Entry<Job, MapReduceOper> entry : jobMroMap.entrySet()) {
             MapReduceOper mro = entry.getValue();
             ps.mapMROperToJob(mro, entry.getKey());
-        }        
+        }
     }
-    
+
     /**
      * Updates the statistics after a patch of jobs is done
-     * 
+     *
      * @param jc the job control
      */
     public static void accumulateStats(JobControl jc) {
         SimplePigStats ps = (SimplePigStats)PigStats.get();
         MRScriptState ss = MRScriptState.get();
-        
-        for (Job job : jc.getSuccessfulJobs()) {            
+
+        for (Job job : jc.getSuccessfulJobs()) {
             MRJobStats js = addSuccessJobStats(ps, job);
             if (js != null) {
                 ss.emitjobFinishedNotification(js);
             }
         }
-        
-        for (Job job : jc.getFailedJobs()) {                      
+
+        for (Job job : jc.getFailedJobs()) {
             MRJobStats js = addFailedJobStats(ps, job);
             if (js != null) {
-                js.setErrorMsg(job.getMessage());    
+                js.setErrorMsg(job.getMessage());
                 ss.emitJobFailedNotification(js);
-            } 
+            }
         }
     }
-    
-    
+
+    @Private
     public static void setBackendException(Job job, Exception e) {
-        ((SimplePigStats)PigStats.get()).setBackendException(job, e);
+        JobID jobId = job.getAssignedJobID();
+        if (jobId == null) {
+            return;
+        }
+        PigStats.get().setBackendException(jobId.toString(), e);
     }
-    
+
     private static MRJobStats addFailedJobStats(SimplePigStats ps, Job job) {
         if (ps.isJobSeen(job)) return null;
-        
+
         MRJobStats js = ps.addMRJobStats(job);
         if (js == null) {
-            LOG.warn("unable to add failed job stats");            
-        } else {       
+            LOG.warn("unable to add failed job stats");
+        } else {
             js.setSuccessful(false);
             js.addOutputStatistics();
             js.addInputStatistics();
         }
         return js;
     }
-    
+
     public static MRJobStats addNativeJobStats(PigStats ps, NativeMapReduceOper mr,
             boolean success) {
         return addNativeJobStats(ps, mr, success, null);
     }
-    
+
     public static MRJobStats addNativeJobStats(PigStats ps, NativeMapReduceOper mr,
             boolean success, Exception e) {
         if (ps.isEmbedded()) {
@@ -289,19 +295,19 @@ public class MRPigStatsUtil extends PigS
                 js.setBackendException(e);
         }
         return js;
-    }    
-    
+    }
+
     private static MRJobStats addSuccessJobStats(SimplePigStats ps, Job job) {
         if (ps.isJobSeen(job)) return null;
 
         MRJobStats js = ps.addMRJobStats(job);
         if (js == null) {
             LOG.warn("unable to add job stats");
-        } else {                
+        } else {
             js.setSuccessful(true);
-                           
+
             js.addMapReduceStatistics(ps.getJobClient(), job.getJobConf());
-            
+
             JobClient client = ps.getJobClient();
             RunningJob rjob = null;
             try {
@@ -310,14 +316,14 @@ public class MRPigStatsUtil extends PigS
                 LOG.warn("Failed to get running job", e);
             }
             if (rjob == null) {
-                LOG.warn("Failed to get RunningJob for job " 
-                        + job.getAssignedJobID());           
-            } else {                        
-                js.addCounters(rjob); 
+                LOG.warn("Failed to get RunningJob for job "
+                        + job.getAssignedJobID());
+            } else {
+                js.addCounters(rjob);
             }
-            
+
             js.addOutputStatistics();
-            
+
             js.addInputStatistics();
         }
         return js;

Modified: pig/trunk/src/org/apache/pig/tools/pigstats/mapreduce/SimplePigStats.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/tools/pigstats/mapreduce/SimplePigStats.java?rev=1540855&r1=1540854&r2=1540855&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/tools/pigstats/mapreduce/SimplePigStats.java (original)
+++ pig/trunk/src/org/apache/pig/tools/pigstats/mapreduce/SimplePigStats.java Mon Nov 11 21:50:50 2013
@@ -18,25 +18,20 @@
 package org.apache.pig.tools.pigstats.mapreduce;
 
 import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.Collections;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.Properties;
 import java.util.Set;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.JobID;
 import org.apache.hadoop.mapred.jobcontrol.Job;
 import org.apache.pig.ExecType;
-import org.apache.pig.PigException;
 import org.apache.pig.PigRunner.ReturnCode;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceOper;
@@ -48,45 +43,26 @@ import org.apache.pig.impl.plan.Dependen
 import org.apache.pig.impl.plan.VisitorException;
 import org.apache.pig.tools.pigstats.InputStats;
 import org.apache.pig.tools.pigstats.mapreduce.MRJobStats;
-import org.apache.pig.tools.pigstats.JobStats.JobState;
 import org.apache.pig.tools.pigstats.OutputStats;
 import org.apache.pig.tools.pigstats.PigStats;
-import org.apache.pig.tools.pigstats.ScriptState;
 import org.apache.pig.tools.pigstats.JobStats;
 
 /**
- * SimplePigStats encapsulates the statistics collected from a running script. 
- * It includes status of the execution, the DAG of its MR jobs, as well as 
- * information about outputs and inputs of the script. 
+ * SimplePigStats encapsulates the statistics collected from a running script.
+ * It includes status of the execution, the DAG of its MR jobs, as well as
+ * information about outputs and inputs of the script.
  */
 public final class SimplePigStats extends PigStats {
-    
     private static final Log LOG = LogFactory.getLog(SimplePigStats.class);
-    
-    private static final String DATE_FORMAT = "yyyy-MM-dd HH:mm:ss";  
-        
-    private PigContext pigContext;
-    
+
     private JobClient jobClient;
-    
     private JobControlCompiler jcc;
-    
-    private JobGraph jobPlan;
-    
     private Map<Job, MapReduceOper> jobMroMap;
-     
     private Map<MapReduceOper, MRJobStats> mroJobMap;
-    
+
     // successful jobs so far
     private Set<Job> jobSeen = new HashSet<Job>();
-    
-    private Map<String, OutputStats> aliasOuputMap;
-      
-    private long startTime = -1;
-    private long endTime = -1;
-    
-    private String userId;
-                 
+
     /**
      * This class builds the job DAG from a MR plan
      */
@@ -96,13 +72,13 @@ public final class SimplePigStats extend
             super(plan, new DependencyOrderWalker<MapReduceOper, MROperPlan>(
                     plan));
             jobPlan = new JobGraph();
-            mroJobMap = new HashMap<MapReduceOper, MRJobStats>();        
+            mroJobMap = new HashMap<MapReduceOper, MRJobStats>();
         }
-        
+
         @Override
         public void visitMROp(MapReduceOper mr) throws VisitorException {
             MRJobStats js = new MRJobStats(
-                    mr.getOperatorKey().toString(), jobPlan);            
+                    mr.getOperatorKey().toString(), jobPlan);
             jobPlan.add(js);
             List<MapReduceOper> preds = getPlan().getPredecessors(mr);
             if (preds != null) {
@@ -113,10 +89,10 @@ public final class SimplePigStats extend
                     }
                 }
             }
-            mroJobMap.put(mr, js);            
-        }        
+            mroJobMap.put(mr, js);
+        }
     }
-    
+
     @Override
     public List<String> getAllErrorMessages() {
         throw new UnsupportedOperationException();
@@ -131,83 +107,6 @@ public final class SimplePigStats extend
     public boolean isEmbedded() {
         return false;
     }
-    
-    @Override
-    public boolean isSuccessful() {
-        return (getNumberJobs()==0 && returnCode==ReturnCode.UNKNOWN
-                || returnCode == ReturnCode.SUCCESS);
-    }
- 
-    @Override
-    public Properties getPigProperties() {
-        if (pigContext == null) return null;
-        return pigContext.getProperties();
-    }
-
-    @Override
-    public JobGraph getJobGraph() {
-        return jobPlan;
-    }
- 
-    @Override
-    public List<String> getOutputLocations() {
-        ArrayList<String> locations = new ArrayList<String>();
-        for (OutputStats output : getOutputStats()) {
-            locations.add(output.getLocation());
-        }
-        return Collections.unmodifiableList(locations);
-    }
- 
-    @Override
-    public List<String> getOutputNames() {
-        ArrayList<String> names = new ArrayList<String>();
-        for (OutputStats output : getOutputStats()) {            
-            names.add(output.getName());
-        }
-        return Collections.unmodifiableList(names);
-    }
- 
-    @Override
-    public long getNumberBytes(String location) {
-        if (location == null) return -1;
-        String name = new Path(location).getName();
-        long count = -1;
-        for (OutputStats output : getOutputStats()) {
-            if (name.equals(output.getName())) {
-                count = output.getBytes();
-                break;
-            }
-        }
-        return count;
-    }
-
-    @Override
-    public long getNumberRecords(String location) {
-        if (location == null) return -1;
-        String name = new Path(location).getName();
-        long count = -1;
-        for (OutputStats output : getOutputStats()) {
-            if (name.equals(output.getName())) {
-                count = output.getNumberRecords();
-                break;
-            }
-        }
-        return count;
-    }
- 
-    @Override
-    public String getOutputAlias(String location) {
-        if (location == null) return null;
-        String name = new Path(location).getName();
-        String alias = null;
-        for (OutputStats output : getOutputStats()) {
-            if (name.equals(output.getName())) {
-                alias = output.getAlias();
-                break;
-            }
-        }
-        return alias;
-    }
 
     @Override
     public long getSMMSpillCount() {
@@ -223,149 +122,52 @@ public final class SimplePigStats extend
     public long getProactiveSpillCountObjects() {
         Iterator<JobStats> it = jobPlan.iterator();
         long ret = 0;
-        while (it.hasNext()) {            
+        while (it.hasNext()) {
             ret += ((MRJobStats) it.next()).getProactiveSpillCountObjects();
         }
         return ret;
     }
-    
+
     @Override
     public long getProactiveSpillCountRecords() {
         Iterator<JobStats> it = jobPlan.iterator();
         long ret = 0;
-        while (it.hasNext()) {            
-            ret += ((MRJobStats) it.next()).getProactiveSpillCountRecs();
-        }
-        return ret;
-    }
-    
-    @Override
-    public long getBytesWritten() {
-        Iterator<JobStats> it = jobPlan.iterator();
-        long ret = 0;
-        while (it.hasNext()) {
-            long n = it.next().getBytesWritten();
-            if (n > 0) ret += n;
-        }
-        return ret;
-    }
-    
-    @Override
-    public long getRecordWritten() {
-        Iterator<JobStats> it = jobPlan.iterator();
-        long ret = 0;
         while (it.hasNext()) {
-            long n = it.next().getRecordWrittern();
-            if (n > 0) ret += n;
+            ret += ((MRJobStats) it.next()).getProactiveSpillCountRecs();
         }
         return ret;
     }
-   
-    @Override
-    public String getScriptId() {
-        return ScriptState.get().getId();
-    }
-    
-    @Override
-    public String getFeatures() {
-        return ScriptState.get().getScriptFeatures();
-    }
-    
-    @Override
-    public long getDuration() {
-        return (startTime > 0 && endTime > 0) ? (endTime - startTime) : -1;
-    }
-    
-    @Override
-    public int getNumberJobs() {
-        return jobPlan.size();
-    }
-        
-    @Override
-    public List<OutputStats> getOutputStats() {
-        List<OutputStats> outputs = new ArrayList<OutputStats>();
-        Iterator<JobStats> iter = jobPlan.iterator();
-        while (iter.hasNext()) {
-            for (OutputStats os : iter.next().getOutputs()) {
-                outputs.add(os);
-            }
-        }        
-        return Collections.unmodifiableList(outputs);       
-    }
-    
-    @Override
-    public OutputStats result(String alias) {
-        if (aliasOuputMap == null) {
-            aliasOuputMap = new HashMap<String, OutputStats>();
-            Iterator<JobStats> iter = jobPlan.iterator();
-            while (iter.hasNext()) {
-                for (OutputStats os : iter.next().getOutputs()) {
-                    String a = os.getAlias();
-                    if (a == null || a.length() == 0) {
-                        LOG.warn("Output alias isn't avalable for " + os.getLocation());
-                        continue;
-                    }
-                    aliasOuputMap.put(a, os);
-                }
-            }    
-        }
-        return aliasOuputMap.get(alias);
-    }
-    
-    @Override
-    public List<InputStats> getInputStats() {
-        List<InputStats> inputs = new ArrayList<InputStats>();
-        Iterator<JobStats> iter = jobPlan.iterator();
-        while (iter.hasNext()) {
-            for (InputStats is : iter.next().getInputs()) {
-                inputs.add(is);
-            }
-        }        
-        return Collections.unmodifiableList(inputs);       
-    }
-    
-    public SimplePigStats() {        
-        jobMroMap = new HashMap<Job, MapReduceOper>(); 
+
+    public SimplePigStats() {
+        jobMroMap = new HashMap<Job, MapReduceOper>();
         jobPlan = new JobGraph();
     }
-    
-    void start(PigContext pigContext, JobClient jobClient, 
+
+    void initialize(PigContext pigContext, JobClient jobClient,
             JobControlCompiler jcc, MROperPlan mrPlan) {
-        
+        super.start();
+
         if (pigContext == null || jobClient == null || jcc == null) {
             LOG.warn("invalid params: " + pigContext + jobClient + jcc);
             return;
         }
-        
+
         this.pigContext = pigContext;
         this.jobClient = jobClient;
-        this.jcc = jcc;         
-        
-        // build job DAG with job ids assigned to null 
+        this.jcc = jcc;
+
+        // build job DAG with job ids assigned to null
         try {
             new JobGraphBuilder(mrPlan).visit();
         } catch (VisitorException e) {
             LOG.warn("unable to build job plan", e);
         }
-        
-        startTime = System.currentTimeMillis();
-        userId = System.getProperty("user.name");
-    }
-    
-    void stop() {
-        endTime = System.currentTimeMillis();
-        int m = getNumberSuccessfulJobs();
-        int n = getNumberFailedJobs();
- 
-        if (n == 0 && m > 0 && m == jobPlan.size()) {
-            returnCode = ReturnCode.SUCCESS;
-        } else if (m > 0 && m < jobPlan.size()) {
-            returnCode = ReturnCode.PARTIAL_FAILURE;
-        } else {
-            returnCode = ReturnCode.FAILURE;
-        }
     }
-    
+
+    void finish() {
+        super.stop();
+    }
+
     boolean isInitialized() {
         return startTime > 0;
     }
@@ -374,37 +176,35 @@ public final class SimplePigStats extend
     public JobClient getJobClient() {
         return jobClient;
     }
-    
+
     JobControlCompiler getJobControlCompiler() {
         return jcc;
     }
-        
-    @SuppressWarnings("deprecation")
+
     MRJobStats addMRJobStats(Job job) {
         MapReduceOper mro = jobMroMap.get(job);
-         
+
         if (mro == null) {
             LOG.warn("unable to get MR oper for job: " + job.toString());
             return null;
         }
         MRJobStats js = mroJobMap.get(mro);
-        
+
         JobID jobId = job.getAssignedJobID();
         js.setId(jobId);
         js.setAlias(mro);
         js.setConf(job.getJobConf());
         return js;
     }
-    
-    @SuppressWarnings("deprecation")
-    public MRJobStats addMRJobStatsForNative(NativeMapReduceOper mr) {
+
+    MRJobStats addMRJobStatsForNative(NativeMapReduceOper mr) {
         MRJobStats js = mroJobMap.get(mr);
-        js.setId(new JobID(mr.getJobId(), NativeMapReduceOper.getJobNumber())); 
+        js.setId(new JobID(mr.getJobId(), NativeMapReduceOper.getJobNumber()));
         js.setAlias(mr);
-        
+
         return js;
     }
-            
+
     void display() {
         if (returnCode == ReturnCode.UNKNOWN) {
             LOG.warn("unknown return code, can't display the results");
@@ -414,13 +214,13 @@ public final class SimplePigStats extend
             LOG.warn("unknown exec type, don't display the results");
             return;
         }
- 
+
         // currently counters are not working in local mode - see PIG-1286
         ExecType execType = pigContext.getExecType();
         if (execType.isLocal()) {
             LOG.info("Detected Local mode. Stats reported below may be incomplete");
         }
-        
+
         SimpleDateFormat sdf = new SimpleDateFormat(DATE_FORMAT);
         StringBuilder sb = new StringBuilder();
         sb.append("\nHadoopVersion\tPigVersion\tUserId\tStartedAt\tFinishedAt\tFeatures\n");
@@ -438,9 +238,9 @@ public final class SimplePigStats extend
             sb.append("Failed!\n");
         }
         sb.append("\n");
-                
-        if (returnCode == ReturnCode.SUCCESS 
-                || returnCode == ReturnCode.PARTIAL_FAILURE) {            
+
+        if (returnCode == ReturnCode.SUCCESS
+                || returnCode == ReturnCode.PARTIAL_FAILURE) {
             sb.append("Job Stats (time in seconds):\n");
             if (execType.isLocal()) {
                 sb.append(MRJobStats.SUCCESS_HEADER_LOCAL).append("\n");
@@ -448,7 +248,7 @@ public final class SimplePigStats extend
                 sb.append(MRJobStats.SUCCESS_HEADER).append("\n");
             }
             List<JobStats> arr = jobPlan.getSuccessfulJobs();
-            for (JobStats js : arr) {                
+            for (JobStats js : arr) {
                 sb.append(js.getDisplayString(execType.isLocal()));
             }
             sb.append("\n");
@@ -458,7 +258,7 @@ public final class SimplePigStats extend
             sb.append("Failed Jobs:\n");
             sb.append(MRJobStats.FAILURE_HEADER).append("\n");
             List<JobStats> arr = jobPlan.getFailedJobs();
-            for (JobStats js : arr) {   
+            for (JobStats js : arr) {
                 sb.append(js.getDisplayString(execType.isLocal()));
             }
             sb.append("\n");
@@ -472,24 +272,24 @@ public final class SimplePigStats extend
         for (OutputStats ds : getOutputStats()) {
             sb.append(ds.getDisplayString(execType.isLocal()));
         }
-        
+
         if (!(execType.isLocal())) {
             sb.append("\nCounters:\n");
             sb.append("Total records written : " + getRecordWritten()).append("\n");
             sb.append("Total bytes written : " + getBytesWritten()).append("\n");
             sb.append("Spillable Memory Manager spill count : "
                     + getSMMSpillCount()).append("\n");
-            sb.append("Total bags proactively spilled: " 
+            sb.append("Total bags proactively spilled: "
                     + getProactiveSpillCountObjects()).append("\n");
-            sb.append("Total records proactively spilled: " 
+            sb.append("Total records proactively spilled: "
                     + getProactiveSpillCountRecords()).append("\n");
         }
-        
+
         sb.append("\nJob DAG:\n").append(jobPlan.toString());
-        
+
         LOG.info("Script Statistics: \n" + sb.toString());
     }
-    
+
     void mapMROperToJob(MapReduceOper mro, Job job) {
         if (mro == null) {
             LOG.warn("null MR operator");
@@ -501,55 +301,10 @@ public final class SimplePigStats extend
                 jobMroMap.put(job, mro);
             }
         }
-    }   
-    
-    void setBackendException(Job job, Exception e) {
-        if (e instanceof PigException) {
-            LOG.error("ERROR " + ((PigException)e).getErrorCode() + ": " 
-                    + e.getLocalizedMessage());
-        } else if (e != null) {
-            LOG.error("ERROR: " + e.getLocalizedMessage());
-        }
-        
-        if (job.getAssignedJobID() == null || e == null) {
-            LOG.debug("unable to set backend exception");
-            return;
-        }
-        String id = job.getAssignedJobID().toString();
-        Iterator<JobStats> iter = jobPlan.iterator();
-        while (iter.hasNext()) {
-            JobStats js = iter.next();
-            if (id.equals(js.getJobId())) {
-                js.setBackendException(e);
-                break;
-            }
-        }
-    }
-    
-    PigContext getPigContext() {
-        return pigContext;
-    }
-    
-    int getNumberSuccessfulJobs() {
-        Iterator<JobStats> iter = jobPlan.iterator();
-        int count = 0;
-        while (iter.hasNext()) {
-            if (iter.next().getState() == JobState.SUCCESS) count++; 
-        }
-        return count;
-    }
-    
-    int getNumberFailedJobs() {
-        Iterator<JobStats> iter = jobPlan.iterator();
-        int count = 0;
-        while (iter.hasNext()) {
-            if (iter.next().getState() == JobState.FAILED) count++; 
-        }
-        return count;
     }
-    
+
     boolean isJobSeen(Job job) {
-        return !jobSeen.add(job);    
+        return !jobSeen.add(job);
     }
-    
+
 }

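The SimplePigStats hunks above drop the class's own start()/stop()/getDuration() bookkeeping and have initialize()/finish() delegate to the superclass instead. A minimal, self-contained sketch of that template-method style is below; the classes are simplified stand-ins written for illustration only, not the actual org.apache.pig.tools.pigstats classes from this commit.

    // Stand-in classes illustrating the lifecycle delegation pattern above.
    // NOT the real Pig classes; names and fields are illustrative only.
    abstract class BaseStats {
        protected long startTime = -1;
        protected long endTime = -1;

        // The base class owns the shared timing bookkeeping.
        protected void start() {
            startTime = System.currentTimeMillis();
        }

        protected void stop() {
            endTime = System.currentTimeMillis();
        }

        public long getDuration() {
            return (startTime > 0 && endTime > 0) ? (endTime - startTime) : -1;
        }
    }

    final class SimpleStats extends BaseStats {
        // Mirrors initialize()/finish() delegating to super.start()/super.stop().
        void initialize() {
            super.start();
            // engine-specific setup (e.g. building the job DAG) would go here
        }

        void finish() {
            super.stop();
        }
    }

    public class StatsLifecycleSketch {
        public static void main(String[] args) throws InterruptedException {
            SimpleStats stats = new SimpleStats();
            stats.initialize();
            Thread.sleep(10);   // stand-in for running the jobs
            stats.finish();
            System.out.println("duration ms: " + stats.getDuration());
        }
    }

Pulling the timing and return-code handling up into the base class means stats subclasses no longer duplicate it, which is consistent with the overrides removed from SimplePigStats above.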
Modified: pig/trunk/test/org/apache/pig/test/TestPigRunner.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestPigRunner.java?rev=1540855&r1=1540854&r2=1540855&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestPigRunner.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestPigRunner.java Mon Nov 11 21:50:50 2013
@@ -582,16 +582,10 @@ public class TestPigRunner {
         String[] args = { "-Dpig.additional.jars=pig-withouthadoop.jar",
                 "-Dmapred.job.queue.name=default",
                 "-e", "A = load '" + INPUT_FILE + "';store A into '" + OUTPUT_FILE + "';\n" };
-        PigStats stats = PigRunner.run(args, new TestNotificationListener());        
+        PigStats stats = PigRunner.run(args, new TestNotificationListener());
 
         Util.deleteFile(cluster, OUTPUT_FILE);
-        
-        java.lang.reflect.Method getPigContext = stats.getClass()
-                .getDeclaredMethod("getPigContext");
-
-        getPigContext.setAccessible(true);
-
-        PigContext ctx = (PigContext) getPigContext.invoke(stats);
+        PigContext ctx = stats.getPigContext();
 
         Assert.assertNotNull(ctx);