Posted to commits@pig.apache.org by ch...@apache.org on 2013/08/30 22:04:31 UTC

svn commit: r1519062 [2/5] - in /pig/trunk: ./ src/META-INF/services/ src/org/apache/pig/ src/org/apache/pig/backend/executionengine/ src/org/apache/pig/backend/hadoop/executionengine/ src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/ s...

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java?rev=1519062&r1=1519061&r2=1519062&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java Fri Aug 30 20:04:29 2013
@@ -47,7 +47,6 @@ import org.apache.hadoop.io.WritableComp
 import org.apache.hadoop.mapred.Counters;
 import org.apache.hadoop.mapred.Counters.Counter;
 import org.apache.hadoop.mapred.Counters.Group;
-import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.JobPriority;
 import org.apache.hadoop.mapred.jobcontrol.Job;
@@ -106,6 +105,7 @@ import org.apache.pig.impl.util.Pair;
 import org.apache.pig.impl.util.UDFContext;
 import org.apache.pig.impl.util.Utils;
 import org.apache.pig.tools.pigstats.ScriptState;
+import org.apache.pig.tools.pigstats.mapreduce.MRScriptState;
 
 /**
  * This is compiler class that takes an MROperPlan and converts
@@ -167,7 +167,7 @@ public class JobControlCompiler{
     private Map<Job, MapReduceOper> jobMroMap;
     private int counterSize;
 
-    public JobControlCompiler(PigContext pigContext, Configuration conf) throws IOException {
+    public JobControlCompiler(PigContext pigContext, Configuration conf) {
         this.pigContext = pigContext;
         this.conf = conf;
         jobStoreMap = new HashMap<Job, Pair<List<POStore>, Path>>();
@@ -270,7 +270,7 @@ public class JobControlCompiler{
         this.plan = plan;
 
         int timeToSleep;
-        String defaultPigJobControlSleep = pigContext.getExecType() == ExecType.LOCAL ? "100" : "5000";
+        String defaultPigJobControlSleep = pigContext.getExecType().isLocal() ? "100" : "5000";
         String pigJobControlSleep = conf.get("pig.jobcontrol.sleep", defaultPigJobControlSleep);
         if (!pigJobControlSleep.equals(defaultPigJobControlSleep)) {
           log.info("overriding default JobControl sleep (" + defaultPigJobControlSleep + ") to " + pigJobControlSleep);
@@ -438,7 +438,7 @@ public class JobControlCompiler{
         // add settings for pig statistics
         String setScriptProp = conf.get(ScriptState.INSERT_ENABLED, "true");
         if (setScriptProp.equalsIgnoreCase("true")) {
-            ScriptState ss = ScriptState.get();
+            MRScriptState ss = MRScriptState.get();
             ss.addSettingsToConf(mro, conf);
         }
 
@@ -501,7 +501,7 @@ public class JobControlCompiler{
                 }
             }
 
-            if (!pigContext.inIllustrator && pigContext.getExecType() != ExecType.LOCAL)
+            if (!pigContext.inIllustrator && ! pigContext.getExecType().isLocal())
             {
 
                 // Setup the DistributedCache for this job
@@ -838,7 +838,7 @@ public class JobControlCompiler{
             }
             // Serialize the UDF specific context info.
             UDFContext.getUDFContext().serialize(conf);
-            Job cjob = new Job(new JobConf(nwJob.getConfiguration()), new ArrayList());
+            Job cjob = new Job(new JobConf(nwJob.getConfiguration()), new ArrayList<Job>());
             jobStoreMap.put(cjob,new Pair<List<POStore>, Path>(storeLocations, tmpLocation));
             return cjob;
 
@@ -1426,7 +1426,7 @@ public class JobControlCompiler{
 
         // XXX Hadoop currently doesn't support distributed cache in local mode.
         // This line will be removed after the support is added by Hadoop team.
-        if (pigContext.getExecType() != ExecType.LOCAL) {
+        if (!pigContext.getExecType().isLocal()) {
             symlink = prefix + "_"
                     + Integer.toString(System.identityHashCode(filename)) + "_"
                     + Long.toString(System.currentTimeMillis());
@@ -1477,6 +1477,7 @@ public class JobControlCompiler{
      * @return the path as seen on distributed cache
      * @throws IOException
      */
+    @SuppressWarnings("deprecation")
     private static void putJarOnClassPathThroughDistributedCache(
             PigContext pigContext,
             Configuration conf,
@@ -1542,7 +1543,7 @@ public class JobControlCompiler{
 
             // XXX Hadoop currently doesn't support distributed cache in local mode.
             // This line will be removed after the support is added
-            if (pigContext.getExecType() == ExecType.LOCAL) return;
+            if (pigContext.getExecType().isLocal()) return;
 
             // set up distributed cache for the replicated files
             FileSpec[] replFiles = join.getReplFiles();
@@ -1583,7 +1584,7 @@ public class JobControlCompiler{
 
             // XXX Hadoop currently doesn't support distributed cache in local mode.
             // This line will be removed after the support is added
-            if (pigContext.getExecType() == ExecType.LOCAL) return;
+            if (pigContext.getExecType().isLocal()) return;
 
             String indexFile = join.getIndexFile();
 
@@ -1607,7 +1608,7 @@ public class JobControlCompiler{
 
             // XXX Hadoop currently doesn't support distributed cache in local mode.
             // This line will be removed after the support is added
-            if (pigContext.getExecType() == ExecType.LOCAL) return;
+            if (pigContext.getExecType().isLocal()) return;
 
             String indexFile = mergeCoGrp.getIndexFileName();
 
@@ -1644,7 +1645,7 @@ public class JobControlCompiler{
 
             // XXX Hadoop currently doesn't support distributed cache in local mode.
             // This line will be removed after the support is added
-            if (pigContext.getExecType() == ExecType.LOCAL) return;
+            if (pigContext.getExecType().isLocal()) return;
 
             // set up distributed cache for files indicated by the UDF
             String[] files = func.getCacheFiles();
@@ -1688,8 +1689,6 @@ public class JobControlCompiler{
                 }
             }
         }
-
-        boolean isReplaced() { return replaced; }
     }
 
 }
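
The recurring change in this file replaces enum identity checks such as pigContext.getExecType() == ExecType.LOCAL with the new isLocal() method on the ExecType interface. A minimal sketch of why the interface call generalizes, using a hypothetical third-party exec type (the class name and "exectype" value below are invented for illustration; the engine wiring is elided):

    import java.util.Properties;

    import org.apache.pig.ExecType;
    import org.apache.pig.backend.executionengine.ExecutionEngine;
    import org.apache.pig.impl.PigContext;

    // Hypothetical exec type from a third-party engine (not part of this
    // commit). Because call sites now ask getExecType().isLocal() instead of
    // comparing against the ExecType.LOCAL constant, a type like this one is
    // treated as local everywhere without touching JobControlCompiler.
    public class ThirdPartyLocalExecType implements ExecType {

        private static final long serialVersionUID = 1L;

        @Override
        public boolean accepts(Properties properties) {
            return "THIRDPARTY_LOCAL".equalsIgnoreCase(
                    properties.getProperty("exectype", ""));
        }

        @Override
        public ExecutionEngine getExecutionEngine(PigContext pigContext) {
            return null; // engine construction elided in this sketch
        }

        @Override
        public Class<? extends ExecutionEngine> getExecutionEngineClass() {
            return ExecutionEngine.class;
        }

        @Override
        public boolean isLocal() {
            return true;
        }

        @Override
        public String name() {
            return "THIRDPARTY_LOCAL";
        }
    }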

Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/LocalExecType.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/LocalExecType.java?rev=1519062&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/LocalExecType.java (added)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/LocalExecType.java Fri Aug 30 20:04:29 2013
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer;
+
+import java.util.Properties;
+import org.apache.pig.ExecType;
+import org.apache.pig.backend.executionengine.ExecutionEngine;
+import org.apache.pig.backend.hadoop.executionengine.MRExecutionEngine;
+import org.apache.pig.impl.PigContext;
+
+/**
+ * LocalExecType is the ExecType for local mode in Hadoop MapReduce.
+ *
+ */
+
+public class LocalExecType implements ExecType {
+
+    private static final String[] modes = { "LOCAL", "MAPREDUCE_LOCAL",
+            "MAPRED_LOCAL" };
+
+    @Override
+    public boolean accepts(Properties properties) {
+        String execTypeSpecified = properties.getProperty("exectype", "")
+                .toUpperCase();
+        for (String mode : modes) {
+            if (execTypeSpecified.equals(mode)) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    @Override
+    public ExecutionEngine getExecutionEngine(PigContext pigContext) {
+        return new MRExecutionEngine(pigContext);
+    }
+
+    @Override
+    public Class<? extends ExecutionEngine> getExecutionEngineClass() {
+        return MRExecutionEngine.class;
+    }
+
+    @Override
+    public boolean isLocal() {
+        return true;
+    }
+
+    @Override
+    public String name() {
+        return "LOCAL";
+    }
+    
+    public String toString() {
+        return name();
+    }
+
+}
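
The commit also touches src/META-INF/services/ (see the affected-path list at the top), which suggests ExecType implementations like the one above are discovered through java.util.ServiceLoader. The following is a plausible sketch of that resolution step, reconstructed from the accepts() contract; it is not the actual ExecTypeProvider source, and ExecTypeResolver is an invented name:

    import java.util.Properties;
    import java.util.ServiceLoader;

    import org.apache.pig.ExecType;
    import org.apache.pig.PigException;

    public final class ExecTypeResolver {

        // Asks each registered ExecType whether it accepts the configured
        // "exectype" property; the first match wins.
        public static ExecType select(Properties properties) throws PigException {
            for (ExecType type : ServiceLoader.load(ExecType.class)) {
                if (type.accepts(properties)) {
                    return type;
                }
            }
            throw new PigException("Unknown exec type: "
                    + properties.getProperty("exectype"));
        }

        private ExecTypeResolver() {}
    }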

Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRExecType.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRExecType.java?rev=1519062&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRExecType.java (added)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRExecType.java Fri Aug 30 20:04:29 2013
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer;
+
+import java.util.Properties;
+
+import org.apache.pig.ExecType;
+import org.apache.pig.backend.executionengine.ExecutionEngine;
+import org.apache.pig.backend.hadoop.executionengine.MRExecutionEngine;
+import org.apache.pig.impl.PigContext;
+
+/**
+ * MRExecType is the ExecType for distributed mode in Hadoop MapReduce.
+ *
+ */
+public class MRExecType implements ExecType {
+
+    private static final long serialVersionUID = 1L;
+    private static final String[] modes = { "MAPREDUCE", "MAPRED" };
+
+    @Override
+    public boolean accepts(Properties properties) {
+        String execTypeSpecified = properties.getProperty("exectype", "")
+                .toUpperCase();
+        for (String mode : modes) {
+            if (execTypeSpecified.equals(mode)) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    @Override
+    public ExecutionEngine getExecutionEngine(PigContext pigContext) {
+        return new MRExecutionEngine(pigContext);
+    }
+
+    @Override
+    public Class<? extends ExecutionEngine> getExecutionEngineClass() {
+        return MRExecutionEngine.class;
+    }
+
+    @Override
+    public boolean isLocal() {
+        return false;
+    }
+
+    @Override
+    public String name() {
+        return "MAPREDUCE";
+    }
+    
+    public String toString() {
+        return name();
+    }
+}
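
As a quick sanity check of the accepts() contract, both MapReduce aliases select MRExecType while the local spellings select LocalExecType; a minimal sketch:

    import java.util.Properties;

    import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.LocalExecType;
    import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRExecType;

    public class ExecTypeAcceptsDemo {
        public static void main(String[] args) {
            Properties props = new Properties();

            props.setProperty("exectype", "mapred"); // alias accepted by MRExecType
            System.out.println(new MRExecType().accepts(props));    // true
            System.out.println(new LocalExecType().accepts(props)); // false

            props.setProperty("exectype", "mapreduce_local"); // local alias
            System.out.println(new LocalExecType().accepts(props)); // true
        }
    }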

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java?rev=1519062&r1=1519061&r2=1519062&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java Fri Aug 30 20:04:29 2013
@@ -43,9 +43,11 @@ import org.apache.pig.PigConfiguration;
 import org.apache.pig.PigException;
 import org.apache.pig.PigRunner.ReturnCode;
 import org.apache.pig.PigWarning;
+import org.apache.pig.backend.BackendException;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
-import org.apache.pig.backend.hadoop.executionengine.HExecutionEngine;
+import org.apache.pig.backend.hadoop.executionengine.Launcher;
+import org.apache.pig.backend.hadoop.executionengine.MRExecutionEngine;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRCompiler.LastInputStreamingOptimizer;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.DotMRPrinter;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.EndOfAllInputSetter;
@@ -67,9 +69,11 @@ import org.apache.pig.impl.plan.VisitorE
 import org.apache.pig.impl.util.ConfigurationValidator;
 import org.apache.pig.impl.util.LogUtils;
 import org.apache.pig.impl.util.UDFContext;
+import org.apache.pig.tools.pigstats.OutputStats;
 import org.apache.pig.tools.pigstats.PigStats;
 import org.apache.pig.tools.pigstats.PigStatsUtil;
-import org.apache.pig.tools.pigstats.ScriptState;
+import org.apache.pig.tools.pigstats.mapreduce.MRPigStatsUtil;
+import org.apache.pig.tools.pigstats.mapreduce.MRScriptState;
 
 
 /**
@@ -94,29 +98,39 @@ public class MapReduceLauncher extends L
     
     private JobControl jc=null;
     
-    private class HangingJobKiller extends Thread {
-        public HangingJobKiller() {
-        }
-        @Override
-        public void run() {
-            try {
-                log.debug("Receive kill signal");
-                if (jc!=null) {
-                    for (Job job : jc.getRunningJobs()) {
-                        RunningJob runningJob = job.getJobClient().getJob(job.getAssignedJobID());
-                        if (runningJob!=null)
-                            runningJob.killJob();
-                        log.info("Job " + job.getJobID() + " killed");
-                    }
+    public void kill() {
+        try {
+            log.debug("Receive kill signal");
+            if (jc!=null) {
+                for (Job job : jc.getRunningJobs()) {
+                    RunningJob runningJob = job.getJobClient().getJob(job.getAssignedJobID());
+                    if (runningJob!=null)
+                        runningJob.killJob();
+                    log.info("Job " + job.getJobID() + " killed");
                 }
-            } catch (Exception e) {
-                log.warn("Encounter exception on cleanup:" + e);
             }
+        } catch (Exception e) {
+            log.warn("Encounter exception on cleanup:" + e);
         }
     }
-
-    public MapReduceLauncher() {
-        Runtime.getRuntime().addShutdownHook(new HangingJobKiller());
+    
+    public void killJob(String jobID, JobConf jobConf) throws BackendException {
+        try {
+            if (jobConf != null) {
+                JobClient jc = new JobClient(jobConf);
+                JobID id = JobID.forName(jobID);
+                RunningJob job = jc.getJob(id);
+                if (job == null)
+                    System.out.println("Job with id " + jobID + " is not active");
+                else
+                {
+                    job.killJob();
+                    log.info("Kill " + id + " submitted.");
+                }
+            }        
+        } catch (IOException e) {
+            throw new BackendException(e);
+        }
     }
     
     /**
@@ -150,15 +164,15 @@ public class MapReduceLauncher extends L
         ConfigurationValidator.validatePigProperties(pc.getProperties());
         Configuration conf = ConfigurationUtil.toConfiguration(pc.getProperties());
         
-        HExecutionEngine exe = pc.getExecutionEngine();
+        MRExecutionEngine exe = (MRExecutionEngine) pc.getExecutionEngine();
         JobClient jobClient = new JobClient(exe.getJobConf());
         
         JobControlCompiler jcc = new JobControlCompiler(pc, conf);
 
-        ScriptState.get().addWorkflowAdjacenciesToConf(mrp, conf);
-        
+        MRScriptState.get().addWorkflowAdjacenciesToConf(mrp, conf);
+
         // start collecting statistics
-        PigStatsUtil.startCollection(pc, jobClient, jcc, mrp); 
+        MRPigStatsUtil.startCollection(pc, jobClient, jcc, mrp); 
         
         // Find all the intermediate data stores. The plan will be destroyed during compile/execution
         // so this needs to be done before.
@@ -193,7 +207,7 @@ public class MapReduceLauncher extends L
                     if(mro instanceof NativeMapReduceOper) {
                         NativeMapReduceOper natOp = (NativeMapReduceOper)mro;
                         try {
-                            ScriptState.get().emitJobsSubmittedNotification(1);
+                            MRScriptState.get().emitJobsSubmittedNotification(1);
                             natOp.runJob();
                             numMRJobsCompl++;
                         } catch (IOException e) {
@@ -232,17 +246,17 @@ public class MapReduceLauncher extends L
             List<Job> jobsWithoutIds = jc.getWaitingJobs();
             log.info(jobsWithoutIds.size() +" map-reduce job(s) waiting for submission.");
             //notify listeners about jobs submitted
-            ScriptState.get().emitJobsSubmittedNotification(jobsWithoutIds.size());
+            MRScriptState.get().emitJobsSubmittedNotification(jobsWithoutIds.size());
             
             // update Pig stats' job DAG with just compiled jobs
-            PigStatsUtil.updateJobMroMap(jcc.getJobMroMap());
+            MRPigStatsUtil.updateJobMroMap(jcc.getJobMroMap());
             
             // determine job tracker url 
             String jobTrackerLoc;
             JobConf jobConf = jobsWithoutIds.get(0).getJobConf();
             try {
                 String port = jobConf.get("mapred.job.tracker.http.address");
-                String jobTrackerAdd = jobConf.get(HExecutionEngine.JOB_TRACKER_LOCATION);
+                String jobTrackerAdd = jobConf.get(MRExecutionEngine.JOB_TRACKER_LOCATION);
                 
                 jobTrackerLoc = jobTrackerAdd.substring(0,jobTrackerAdd.indexOf(":")) 
                 + port.substring(port.indexOf(":"));
@@ -305,9 +319,9 @@ public class MapReduceLauncher extends L
                             // display the aliases being processed
                             MapReduceOper mro = jcc.getJobMroMap().get(job);
                             if (mro != null) {
-                                String alias = ScriptState.get().getAlias(mro);
+                                String alias = MRScriptState.get().getAlias(mro);
                                 log.info("Processing aliases " + alias);
-                                String aliasLocation = ScriptState.get().getAliasLocation(mro);
+                                String aliasLocation = MRScriptState.get().getAliasLocation(mro);
                                 log.info("detailed locations: " + aliasLocation);
                             }
     
@@ -318,8 +332,8 @@ public class MapReduceLauncher extends L
                 			}  
     
                             // update statistics for this job so jobId is set
-                            PigStatsUtil.addJobStats(job);
-                			ScriptState.get().emitJobStartedNotification(
+                            MRPigStatsUtil.addJobStats(job);
+                            MRScriptState.get().emitJobStartedNotification(
                                     job.getAssignedJobID().toString());                        
                 		}
                 		else{
@@ -334,7 +348,7 @@ public class MapReduceLauncher extends L
                     }
     
                 	// collect job stats by frequently polling of completed jobs (PIG-1829)
-                	PigStatsUtil.accumulateStats(jc);
+                    MRPigStatsUtil.accumulateStats(jc);
                 	
                     // if stop_on_failure is enabled, we need to stop immediately when any job has failed
                     checkStopOnFailure(stop_on_failure);
@@ -389,7 +403,7 @@ public class MapReduceLauncher extends L
                 succJobs.addAll(jobs);
                             
                 // collecting final statistics
-                PigStatsUtil.accumulateStats(jc);
+                MRPigStatsUtil.accumulateStats(jc);
 
         	}
         	catch (Exception e) {
@@ -400,7 +414,7 @@ public class MapReduceLauncher extends L
         	}
         }
 
-        ScriptState.get().emitProgressUpdatedNotification(100);
+        MRScriptState.get().emitProgressUpdatedNotification(100);
         
         log.info( "100% complete");
              
@@ -429,13 +443,13 @@ public class MapReduceLauncher extends L
                 for (POStore st: sts) {
                     failureMap.put(st.getSFile(), backendException);
                 }
-                PigStatsUtil.setBackendException(fj, backendException);
+                MRPigStatsUtil.setBackendException(fj, backendException);
             }
             failed = true;
         }
         
         // stats collection is done, log the results
-        PigStatsUtil.stopCollection(true); 
+        MRPigStatsUtil.stopCollection(true); 
         
         // PigStatsUtil.stopCollection also computes the return code based on
         // total jobs to run, jobs successful and jobs failed
@@ -488,7 +502,30 @@ public class MapReduceLauncher extends L
                 ? ReturnCode.PARTIAL_FAILURE
                 : ReturnCode.FAILURE)
                 : ReturnCode.SUCCESS; 
-        return PigStatsUtil.getPigStats(ret);
+        
+        PigStats pigStats = PigStatsUtil.getPigStats(ret);
+        // run cleanup for all of the stores
+        for (OutputStats output : pigStats.getOutputStats()) {
+            POStore store = output.getPOStore();
+            try {
+                if (!output.isSuccessful()) {
+                    store.getStoreFunc().cleanupOnFailure(
+                            store.getSFile().getFileName(),
+                            new org.apache.hadoop.mapreduce.Job(output.getConf()));
+                } else {
+                    store.getStoreFunc().cleanupOnSuccess(
+                            store.getSFile().getFileName(),
+                            new org.apache.hadoop.mapreduce.Job(output.getConf()));
+                }
+            } catch (IOException e) {
+                throw new ExecException(e);
+            } catch (AbstractMethodError nsme) {
+                // Just swallow it.  This means we're running against an
+                // older instance of a StoreFunc that doesn't implement
+                // this method.
+            }
+        }
+        return pigStats;
     }
 
     /**
@@ -534,7 +571,7 @@ public class MapReduceLauncher extends L
             int perCom = (int)(prog * 100);
             if(perCom!=100) {
                 log.info( perCom + "% complete");
-                ScriptState.get().emitProgressUpdatedNotification(perCom);
+                MRScriptState.get().emitProgressUpdatedNotification(perCom);
             }
             return true;
         }
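
The new block at the end of launchPig() calls cleanupOnFailure() or cleanupOnSuccess() on each store's StoreFunc and deliberately swallows AbstractMethodError so store functions compiled against an older interface keep working. A sketch of a store function that makes use of the failure hook; TidyStorage is a hypothetical example and its delete-on-failure policy is illustrative, not Pig's default behavior:

    import java.io.IOException;

    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.pig.builtin.PigStorage;

    public class TidyStorage extends PigStorage {

        @Override
        public void cleanupOnFailure(String location, Job job) throws IOException {
            // Remove partial output so a re-run starts from a clean slate.
            FileSystem fs = FileSystem.get(job.getConfiguration());
            fs.delete(new Path(location), true);
        }

        @Override
        public void cleanupOnSuccess(String location, Job job) throws IOException {
            // Nothing extra to do on success; defining the method means the
            // launcher's AbstractMethodError fallback is never triggered here.
        }
    }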

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReducePOStoreImpl.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReducePOStoreImpl.java?rev=1519062&r1=1519061&r2=1519062&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReducePOStoreImpl.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReducePOStoreImpl.java Fri Aug 30 20:04:29 2013
@@ -29,8 +29,7 @@ import org.apache.pig.StoreFuncInterface
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStoreImpl;
 import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
-import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
-import org.apache.pig.tools.pigstats.PigStatsUtil;
+import org.apache.pig.tools.pigstats.mapreduce.MRPigStatsUtil;
 import org.apache.pig.tools.pigstats.PigStatusReporter;
 /**
  * This class is used to have a POStore write to DFS via a output
@@ -117,8 +116,8 @@ public class MapReducePOStoreImpl extend
     }
     
     public Counter createRecordCounter(POStore store) {
-        String name = PigStatsUtil.getMultiStoreCounterName(store);
+        String name = MRPigStatsUtil.getMultiStoreCounterName(store);
         return (name == null) ? null : reporter.getCounter(
-                PigStatsUtil.MULTI_STORE_COUNTER_GROUP, name); 
+                MRPigStatsUtil.MULTI_STORE_COUNTER_GROUP, name); 
     }
 }

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/NativeMapReduceOper.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/NativeMapReduceOper.java?rev=1519062&r1=1519061&r2=1519062&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/NativeMapReduceOper.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/NativeMapReduceOper.java Fri Aug 30 20:04:29 2013
@@ -24,7 +24,7 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.VisitorException;
 import org.apache.pig.tools.pigstats.PigStats;
-import org.apache.pig.tools.pigstats.PigStatsUtil;
+import org.apache.pig.tools.pigstats.mapreduce.MRPigStatsUtil;
 
 public class NativeMapReduceOper extends MapReduceOper {
     
@@ -77,20 +77,20 @@ public class NativeMapReduceOper extends
         RunJarSecurityManager secMan = new RunJarSecurityManager();
         try {
             RunJar.main(getNativeMRParams());
-            PigStatsUtil.addNativeJobStats(PigStats.get(), this, true);
+            MRPigStatsUtil.addNativeJobStats(PigStats.get(), this, true);
         } catch (SecurityException se) {
             if(secMan.getExitInvoked()) {
                 if(secMan.getExitCode() != 0) {
                     throw new JobCreationException("Native job returned with non-zero return code");
                 }
                 else {
-                    PigStatsUtil.addNativeJobStats(PigStats.get(), this, true);
+                    MRPigStatsUtil.addNativeJobStats(PigStats.get(), this, true);
                 }
             }
         } catch (Throwable t) {
             JobCreationException e = new JobCreationException(
                     "Cannot run native mapreduce job "+ t.getMessage(), t);
-            PigStatsUtil.addNativeJobStats(PigStats.get(), this, false, e);
+            MRPigStatsUtil.addNativeJobStats(PigStats.get(), this, false, e);
             throw e;
         } finally {
             secMan.retire();

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java?rev=1519062&r1=1519061&r2=1519062&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java Fri Aug 30 20:04:29 2013
@@ -240,7 +240,7 @@ public class PigInputFormat extends Inpu
 
                 // if the execution is against Mapred DFS, set
                 // working dir to /user/<userid>
-                if(pigContext.getExecType() == ExecType.MAPREDUCE) {
+                if(!pigContext.getExecType().isLocal()) {
                     fs.setWorkingDirectory(jobcontext.getWorkingDirectory());
                 }
                 

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigRecordReader.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigRecordReader.java?rev=1519062&r1=1519061&r2=1519062&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigRecordReader.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigRecordReader.java Fri Aug 30 20:04:29 2013
@@ -36,7 +36,7 @@ import org.apache.pig.backend.hadoop.dat
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.io.FileSpec;
 import org.apache.pig.impl.util.ObjectSerializer;
-import org.apache.pig.tools.pigstats.PigStatsUtil;
+import org.apache.pig.tools.pigstats.mapreduce.MRPigStatsUtil;
 import org.apache.pig.tools.pigstats.PigStatusReporter;
 
 /**
@@ -146,7 +146,7 @@ public class PigRecordReader extends Rec
             PigStatusReporter reporter = PigStatusReporter.getInstance();
             if (reporter != null) {
                 inputRecordCounter = reporter.getCounter(
-                        PigStatsUtil.MULTI_INPUTS_COUNTER_GROUP,
+                        MRPigStatsUtil.MULTI_INPUTS_COUNTER_GROUP,
                         counterName);
                 LOG.info("Created input record counter: " + counterName);
             } else {
@@ -228,7 +228,7 @@ public class PigRecordReader extends Rec
             (ArrayList<FileSpec>) ObjectSerializer.deserialize(
                     conf.get(PigInputFormat.PIG_INPUTS));
         String fname = inputs.get(pigSplit.getInputIndex()).getFileName();
-        return PigStatsUtil.getMultiInputsCounterName(fname, pigSplit.getInputIndex());
+        return MRPigStatsUtil.getMultiInputsCounterName(fname, pigSplit.getInputIndex());
     }
     
     /**

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/WeightedRangePartitioner.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/WeightedRangePartitioner.java?rev=1519062&r1=1519061&r2=1519062&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/WeightedRangePartitioner.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/WeightedRangePartitioner.java Fri Aug 30 20:04:29 2013
@@ -111,7 +111,7 @@ public class WeightedRangePartitioner ex
         try{
             // use local file system to get the quantilesFile
             Configuration conf;
-            if (pigContext.getExecType()==ExecType.MAPREDUCE) {
+            if (!pigContext.getExecType().isLocal()) {
                 conf = new Configuration(true);
             } else {
                 conf = new Configuration(false);

Modified: pig/trunk/src/org/apache/pig/data/SchemaTupleBackend.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/data/SchemaTupleBackend.java?rev=1519062&r1=1519061&r2=1519062&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/data/SchemaTupleBackend.java (original)
+++ pig/trunk/src/org/apache/pig/data/SchemaTupleBackend.java Fri Aug 30 20:04:29 2013
@@ -265,7 +265,7 @@ public class SchemaTupleBackend {
     private static SchemaTupleBackend stb;
 
     public static void initialize(Configuration jConf, PigContext pigContext) throws IOException {
-        initialize(jConf, pigContext, pigContext.getExecType() == ExecType.LOCAL);
+        initialize(jConf, pigContext, pigContext.getExecType().isLocal());
     }
 
     public static void initialize(Configuration jConf, PigContext pigContext, boolean isLocal) throws IOException {

Modified: pig/trunk/src/org/apache/pig/data/SchemaTupleFrontend.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/data/SchemaTupleFrontend.java?rev=1519062&r1=1519061&r2=1519062&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/data/SchemaTupleFrontend.java (original)
+++ pig/trunk/src/org/apache/pig/data/SchemaTupleFrontend.java Fri Aug 30 20:04:29 2013
@@ -110,8 +110,8 @@ public class SchemaTupleFrontend {
          * @param conf
          */
         private void internalCopyAllGeneratedToDistributedCache() {
-            LOG.info("Starting process to move generated code to distributed cache");
-            if (pigContext.getExecType() == ExecType.LOCAL) {
+            LOG.info("Starting process to move generated code to distributed cacche");
+            if (pigContext.getExecType().isLocal()) {
                 String codePath = codeDir.getAbsolutePath();
                 LOG.info("Distributed cache not supported or needed in local mode. Setting key ["
                         + LOCAL_CODE_DIR + "] with code temp directory: " + codePath);

Modified: pig/trunk/src/org/apache/pig/impl/PigContext.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/PigContext.java?rev=1519062&r1=1519061&r2=1519062&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/PigContext.java (original)
+++ pig/trunk/src/org/apache/pig/impl/PigContext.java Fri Aug 30 20:04:29 2013
@@ -51,6 +51,7 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.log4j.Level;
 import org.apache.pig.ExecType;
+import org.apache.pig.ExecTypeProvider;
 import org.apache.pig.FuncSpec;
 import org.apache.pig.Main;
 import org.apache.pig.PigException;
@@ -58,10 +59,9 @@ import org.apache.pig.backend.datastorag
 import org.apache.pig.backend.datastorage.DataStorageException;
 import org.apache.pig.backend.datastorage.ElementDescriptor;
 import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.executionengine.ExecutionEngine;
 import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
 import org.apache.pig.backend.hadoop.datastorage.HDataStorage;
-import org.apache.pig.backend.hadoop.executionengine.HExecutionEngine;
-import org.apache.pig.backend.hadoop.streaming.HadoopExecutableManager;
 import org.apache.pig.impl.streaming.ExecutableManager;
 import org.apache.pig.impl.streaming.StreamingCommand;
 import org.apache.pig.impl.util.JarManager;
@@ -86,8 +86,8 @@ public class PigContext implements Seria
      * and also because some is not serializable e.g. the Configuration)
      */
 
-    //one of: local, mapreduce, pigbody
-    private ExecType execType;;
+    //one of: local, mapreduce, or a custom exec type for a different execution engine
+    private ExecType execType;
 
     //main file system that jobs and shell commands access
     transient private DataStorage dfs;
@@ -96,7 +96,7 @@ public class PigContext implements Seria
     transient private DataStorage lfs;
 
     // handle to the back-end
-    transient private HExecutionEngine executionEngine;
+    transient private ExecutionEngine executionEngine;
 
     private Properties properties;
 
@@ -232,6 +232,14 @@ public class PigContext implements Seria
     public PigContext() {
         this(ExecType.MAPREDUCE, new Properties());
     }
+
+    public PigContext(Configuration conf) throws PigException {
+        this(ConfigurationUtil.toProperties(conf));
+    }
+
+    public PigContext(Properties properties) throws PigException {
+        this(ExecTypeProvider.selectExecType(properties), properties);
+    }
     
     public PigContext(ExecType execType, Configuration conf) {
         this(execType, ConfigurationUtil.toProperties(conf));
@@ -250,7 +258,7 @@ public class PigContext implements Seria
                 skipJars.add(hadoopJar);
         }
 
-        executionEngine = null;
+        this.executionEngine = execType.getExecutionEngine(this);
 
         // Add the default paths to be skipped for auto-shipping of commands
         skippedShipPaths.add("/bin");
@@ -291,42 +299,28 @@ public class PigContext implements Seria
     }
 
     public void connect() throws ExecException {
-
-        switch (execType) {
-            case LOCAL:
-            case MAPREDUCE:
-            {
-                executionEngine = new HExecutionEngine (this);
-
                 executionEngine.init();
-
                 dfs = executionEngine.getDataStorage();
+                lfs = new HDataStorage(URI.create("file:///"), properties);
+                Runtime.getRuntime().addShutdownHook(new ExecutionEngineKiller());
 
-                lfs = new HDataStorage(URI.create("file:///"),
-                                        properties);
-            }
-            break;
+    }
 
-            default:
-            {
-                int errCode = 2040;
-                String msg = "Unkown exec type: " + execType;
-                throw new ExecException(msg, errCode, PigException.BUG);
+    class ExecutionEngineKiller extends Thread {
+        public ExecutionEngineKiller() {}
+
+        @Override
+        public void run() {
+            try {
+                executionEngine.kill();
+            } catch (Exception e) {
+                log.warn("Error in killing Execution Engine: " + e);
             }
         }
-
     }
 
     public void setJobtrackerLocation(String newLocation) {
-        Properties trackerLocation = new Properties();
-        trackerLocation.setProperty("mapred.job.tracker", newLocation);
-
-        try {
-            executionEngine.updateConfiguration(trackerLocation);
-        }
-        catch (ExecException e) {
-            log.error("Failed to set tracker at: " + newLocation);
-        }
+        executionEngine.setProperty("mapred.job.tracker", newLocation);
     }
 
     /**
@@ -519,7 +513,7 @@ public class PigContext implements Seria
         srcElement.copy(dstElement, this.properties, false);
     }
 
-    public HExecutionEngine getExecutionEngine() {
+    public ExecutionEngine getExecutionEngine() {
         return executionEngine;
     }
 
@@ -798,24 +792,10 @@ public class PigContext implements Seria
      * @throws ExecException
      */
     public ExecutableManager createExecutableManager() throws ExecException {
-        ExecutableManager executableManager = null;
-
-        switch (execType) {
-            case LOCAL:
-            case MAPREDUCE:
-            {
-                executableManager = new HadoopExecutableManager();
-            }
-            break;
-            default:
-            {
-                int errCode = 2040;
-                String msg = "Unkown exec type: " + execType;
-                throw new ExecException(msg, errCode, PigException.BUG);
-            }
+        if (executionEngine != null) {
+            return executionEngine.getExecutableManager();
         }
-
-        return executableManager;
+        return null;
     }
 
     public FuncSpec getFuncSpecFromAlias(String alias) {
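
With the constructors added above, a PigContext can now be created from bare properties, and the exec type (and hence the execution engine) is chosen by ExecTypeProvider instead of being hard-coded. A minimal usage sketch, assuming the accepts() aliases shown earlier in this commit:

    import java.util.Properties;

    import org.apache.pig.PigException;
    import org.apache.pig.impl.PigContext;

    public class PigContextFromProperties {
        public static void main(String[] args) throws PigException {
            Properties props = new Properties();
            props.setProperty("exectype", "local"); // resolved to LocalExecType

            PigContext ctx = new PigContext(props); // engine created in the constructor now
            ctx.connect();                          // init engine, set up dfs/lfs, register kill hook
            System.out.println(ctx.getExecType().isLocal()); // true
        }
    }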

Modified: pig/trunk/src/org/apache/pig/pen/ExampleGenerator.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/pen/ExampleGenerator.java?rev=1519062&r1=1519061&r2=1519062&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/pen/ExampleGenerator.java (original)
+++ pig/trunk/src/org/apache/pig/pen/ExampleGenerator.java Fri Aug 30 20:04:29 2013
@@ -30,10 +30,11 @@ import org.apache.pig.impl.util.Identity
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.MRExecutionEngine;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
-import org.apache.pig.backend.hadoop.executionengine.HExecutionEngine;
+
 
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.Tuple;
@@ -66,7 +67,7 @@ public class ExampleGenerator {
 
     PhysicalPlan physPlan;
     PhysicalPlanResetter physPlanReseter;
-    private HExecutionEngine execEngine;
+    private MRExecutionEngine execEngine;
     private LocalMapReduceSimulator localMRRunner;
 
     Log log = LogFactory.getLog(getClass());
@@ -98,7 +99,7 @@ public class ExampleGenerator {
                     + e.getLocalizedMessage());
 
         }
-        execEngine = new HExecutionEngine(pigContext);
+        execEngine = new MRExecutionEngine(pigContext);
         localMRRunner = new LocalMapReduceSimulator();
         poLoadToSchemaMap = new HashMap<POLoad, LogicalSchema>();
     }

Modified: pig/trunk/src/org/apache/pig/tools/ToolsPigServer.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/tools/ToolsPigServer.java?rev=1519062&r1=1519061&r2=1519062&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/tools/ToolsPigServer.java (original)
+++ pig/trunk/src/org/apache/pig/tools/ToolsPigServer.java Fri Aug 30 20:04:29 2013
@@ -29,8 +29,6 @@ import org.apache.pig.ExecType;
 import org.apache.pig.PigServer;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.executionengine.ExecJob;
-import org.apache.pig.backend.hadoop.executionengine.HExecutionEngine;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.classification.InterfaceAudience;
 import org.apache.pig.classification.InterfaceStability;
 import org.apache.pig.impl.PigContext;
@@ -151,10 +149,7 @@ public class ToolsPigServer extends PigS
      */
     public List<ExecJob> runPlan(LogicalPlan newPlan,
                                  String jobName) throws FrontendException, ExecException {
-
-        HExecutionEngine engine = new HExecutionEngine(pigContext);
-        PhysicalPlan pp = engine.compile(newPlan, null);
-        PigStats stats = launchPlan(pp, jobName);
+        PigStats stats = launchPlan(newPlan, jobName);
         return getJobs(stats);
     }
 

Modified: pig/trunk/src/org/apache/pig/tools/grunt/GruntParser.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/tools/grunt/GruntParser.java?rev=1519062&r1=1519061&r2=1519062&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/tools/grunt/GruntParser.java (original)
+++ pig/trunk/src/org/apache/pig/tools/grunt/GruntParser.java Fri Aug 30 20:04:29 2013
@@ -34,7 +34,6 @@ import java.util.AbstractList;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Date;
-import java.util.Enumeration;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
@@ -47,12 +46,7 @@ import jline.ConsoleReaderInputStream;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FsShell;
-import org.apache.hadoop.mapred.JobClient;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.JobID;
-import org.apache.hadoop.mapred.RunningJob;
 import org.apache.pig.LoadFunc;
 import org.apache.pig.PigServer;
 import org.apache.pig.backend.datastorage.ContainerDescriptor;
@@ -62,7 +56,6 @@ import org.apache.pig.backend.datastorag
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
 import org.apache.pig.backend.hadoop.datastorage.HDataStorage;
-import org.apache.pig.backend.hadoop.executionengine.HExecutionEngine;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.io.FileLocalizer;
@@ -228,15 +221,6 @@ public class GruntParser extends PigScri
         mLfs = mPigServer.getPigContext().getLfs();
         mConf = mPigServer.getPigContext().getProperties();
         shell = new FsShell(ConfigurationUtil.toConfiguration(mConf));
-
-        // TODO: this violates the abstraction layer decoupling between
-        // front end and back end and needs to be changed.
-        // Right now I am not clear on how the Job Id comes from to tell
-        // the back end to kill a given job (mJobClient is used only in
-        // processKill)
-        //
-        HExecutionEngine execEngine = mPigServer.getPigContext().getExecutionEngine();
-        mJobConf = execEngine.getJobConf();
     }
 
     public void setScriptIllustrate() {
@@ -398,7 +382,6 @@ public class GruntParser extends PigScri
 
     protected void explainCurrentBatch(boolean dontPrintOutput) throws IOException {
         PrintStream lp = (dontPrintOutput) ? new NullPrintStream("dummy") : System.out;
-        PrintStream pp = (dontPrintOutput) ? new NullPrintStream("dummy") : System.out;
         PrintStream ep = (dontPrintOutput) ? new NullPrintStream("dummy") : System.out;
 
         if (!(mExplain.mLast && mExplain.mCount == 0)) {
@@ -415,26 +398,24 @@ public class GruntParser extends PigScri
 
             if (file.isDirectory()) {
                 String sCount = (mExplain.mLast && mExplain.mCount == 1)?"":"_"+mExplain.mCount;
-                lp = new PrintStream(new File(file, "logical_plan-"+mExplain.mTime+sCount+"."+mExplain.mFormat));
-                pp = new PrintStream(new File(file, "physical_plan-"+mExplain.mTime+sCount+"."+mExplain.mFormat));
-                ep = new PrintStream(new File(file, "exec_plan-"+mExplain.mTime+sCount+"."+mExplain.mFormat));
+                String suffix = mExplain.mTime+sCount+"."+mExplain.mFormat;
+                lp = new PrintStream(new File(file, "logical_plan-"+suffix));
                 mPigServer.explain(mExplain.mAlias, mExplain.mFormat,
-                                   mExplain.mVerbose, markAsExecuted, lp, pp, ep);
+                                   mExplain.mVerbose, markAsExecuted, lp, null, file, suffix);
                 lp.close();
-                pp.close();
                 ep.close();
             }
             else {
                 boolean append = !(mExplain.mCount==1);
-                lp = pp = ep = new PrintStream(new FileOutputStream(mExplain.mTarget, append));
+                lp = ep = new PrintStream(new FileOutputStream(mExplain.mTarget, append));
                 mPigServer.explain(mExplain.mAlias, mExplain.mFormat,
-                                   mExplain.mVerbose, markAsExecuted, lp, pp, ep);
+                                   mExplain.mVerbose, markAsExecuted, lp, ep, null, null);
                 lp.close();
             }
         }
         else {
             mPigServer.explain(mExplain.mAlias, mExplain.mFormat,
-                               mExplain.mVerbose, markAsExecuted, lp, pp, ep);
+                               mExplain.mVerbose, markAsExecuted, lp, ep, null, null);
         }
     }
 
@@ -603,33 +584,7 @@ public class GruntParser extends PigScri
         }
         else
         {
-            //mPigServer.getPigContext().getProperties().setProperty(key, value);
-            // PIG-2508 properties need to be managed through JobConf
-            // since all other code depends on access to properties,
-            // we need to re-populate from updated JobConf
-            //java.util.HashSet<?> keysBefore = new java.util.HashSet<Object>(mPigServer.getPigContext().getProperties().keySet());
-            // set current properties on jobConf
-            Properties properties = mPigServer.getPigContext().getProperties();
-            Configuration jobConf = mPigServer.getPigContext().getExecutionEngine().getJobConf();
-            Enumeration<Object> propertiesIter = properties.keys();
-            while (propertiesIter.hasMoreElements()) {
-                String pkey = (String) propertiesIter.nextElement();
-                String val = properties.getProperty(pkey);
-                // We do not put user.name, See PIG-1419
-                if (!pkey.equals("user.name"))
-                   jobConf.set(pkey, val);
-            }
-            // set new value, JobConf will handle deprecation etc.
-            jobConf.set(key, value);
-            // re-initialize to reflect updated JobConf
-            properties.clear();
-            Iterator<Map.Entry<String, String>> iter = jobConf.iterator();
-            while (iter.hasNext()) {
-                Map.Entry<String, String> entry = iter.next();
-                properties.put(entry.getKey(), entry.getValue());
-            }
-            //keysBefore.removeAll(mPigServer.getPigContext().getProperties().keySet());
-            //log.info("PIG-2508: keys dropped from properties: " + keysBefore);
+           mPigServer.getPigContext().getExecutionEngine().setProperty(key, value);
         }
     }
 
@@ -812,18 +767,7 @@ public class GruntParser extends PigScri
     @Override
     protected void processKill(String jobid) throws IOException
     {
-        if (mJobConf != null) {
-            JobClient jc = new JobClient(mJobConf);
-            JobID id = JobID.forName(jobid);
-            RunningJob job = jc.getJob(id);
-            if (job == null)
-                System.out.println("Job with id " + jobid + " is not active");
-            else
-            {
-                job.killJob();
-                log.info("Kill " + id + " submitted.");
-            }
-        }
+        mPigServer.getPigContext().getExecutionEngine().killJob(jobid);
     }
 
     @Override
@@ -1292,7 +1236,6 @@ public class GruntParser extends PigScri
     private DataStorage mDfs;
     private DataStorage mLfs;
     private Properties mConf;
-    private JobConf mJobConf;
     private boolean mDone;
     private boolean mLoadOnly;
     private ExplainState mExplain;
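
Taken together, the call sites in this commit (PigContext.connect(), setJobtrackerLocation(), createExecutableManager(), and GruntParser's processSet() and processKill()) imply roughly the following shape for the backend-neutral engine contract. This is a reconstruction from usage only, not the actual org.apache.pig.backend.executionengine.ExecutionEngine source:

    import org.apache.pig.backend.BackendException;
    import org.apache.pig.backend.datastorage.DataStorage;
    import org.apache.pig.backend.executionengine.ExecException;
    import org.apache.pig.impl.streaming.ExecutableManager;

    // Assumed subset, reconstructed from the call sites in this diff.
    public interface AssumedExecutionEngine {
        void init() throws ExecException;                   // PigContext.connect()
        DataStorage getDataStorage();                       // PigContext.connect()
        void setProperty(String property, String value);    // GruntParser set / setJobtrackerLocation
        void kill() throws BackendException;                // ExecutionEngineKiller shutdown hook
        void killJob(String jobID) throws BackendException; // GruntParser.processKill()
        ExecutableManager getExecutableManager();           // PigContext.createExecutableManager()
    }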

Modified: pig/trunk/src/org/apache/pig/tools/pigstats/InputStats.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/tools/pigstats/InputStats.java?rev=1519062&r1=1519061&r2=1519062&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/tools/pigstats/InputStats.java (original)
+++ pig/trunk/src/org/apache/pig/tools/pigstats/InputStats.java Fri Aug 30 20:04:29 2013
@@ -43,7 +43,7 @@ public final class InputStats {
     
     private Configuration conf;
 
-    InputStats(String location, long bytes, long records, boolean success) {
+    public InputStats(String location, long bytes, long records, boolean success) {
         this.location = location;
         this.bytes = bytes;
         this.records = records;        
@@ -84,7 +84,7 @@ public final class InputStats {
         return type;
     }
     
-    String getDisplayString(boolean local) {
+    public String getDisplayString(boolean local) {
         StringBuilder sb = new StringBuilder();
         if (success) {            
             sb.append("Successfully ");
@@ -116,19 +116,19 @@ public final class InputStats {
         return sb.toString();
     }
     
-    void setConf(Configuration conf) {
+    public void setConf(Configuration conf) {
         this.conf = conf;
     }
     
-    void markSampleInput() {
+    public void markSampleInput() {
         type = INPUT_TYPE.sampler;
     }
     
-    void markIndexerInput() {
+    public void markIndexerInput() {
         type = INPUT_TYPE.indexer;
     }
     
-    void markSideFileInput() {
+    public void markSideFileInput() {
         type = INPUT_TYPE.side;
     }
 }
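
The widening of InputStats members from package-private to public lets the new org.apache.pig.tools.pigstats.mapreduce package assemble input statistics from outside the pigstats package. A minimal sketch of such a caller; the values are illustrative:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.pig.tools.pigstats.InputStats;

    public class InputStatsDemo {
        public static void main(String[] args) {
            InputStats input = new InputStats(
                    "hdfs://namenode/data/part-00000", // location
                    1024L,                             // bytes read
                    10L,                               // records read
                    true);                             // success
            input.setConf(new Configuration(false));
            input.markSampleInput(); // tag as sampler input, as the MR stats code can now do
            System.out.println(input.getDisplayString(false));
        }
    }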

Modified: pig/trunk/src/org/apache/pig/tools/pigstats/JobStats.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/tools/pigstats/JobStats.java?rev=1519062&r1=1519061&r2=1519062&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/tools/pigstats/JobStats.java (original)
+++ pig/trunk/src/org/apache/pig/tools/pigstats/JobStats.java Fri Aug 30 20:04:29 2013
@@ -18,41 +18,18 @@
 
 package org.apache.pig.tools.pigstats;
 
-import java.io.FileNotFoundException;
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
 import java.util.List;
-import java.util.Map;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapred.Counters;
-import org.apache.hadoop.mapred.JobClient;
-import org.apache.hadoop.mapred.JobID;
-import org.apache.hadoop.mapred.RunningJob;
-import org.apache.hadoop.mapred.TaskReport;
-import org.apache.hadoop.mapred.Counters.Counter;
-import org.apache.pig.PigCounters;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.FileBasedOutputSizeReader;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceOper;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigStatsOutputSizeReader;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
 import org.apache.pig.classification.InterfaceAudience;
 import org.apache.pig.classification.InterfaceStability;
 import org.apache.pig.newplan.Operator;
 import org.apache.pig.newplan.PlanVisitor;
-import org.apache.pig.impl.PigContext;
-import org.apache.pig.impl.io.FileSpec;
 import org.apache.pig.impl.logicalLayer.FrontendException;
-import org.apache.pig.impl.util.ObjectSerializer;
 import org.apache.pig.tools.pigstats.PigStats.JobGraph;
-import org.apache.pig.tools.pigstats.SimplePigStats.JobGraphPrinter;
 
 /**
  * This class encapsulates the runtime statistics of a MapReduce job. 
@@ -60,131 +37,47 @@ import org.apache.pig.tools.pigstats.Sim
  */
 @InterfaceAudience.Public
 @InterfaceStability.Evolving
-public final class JobStats extends Operator {
+public abstract class JobStats extends Operator {
         
     public static final String ALIAS = "JobStatistics:alias";
     public static final String ALIAS_LOCATION = "JobStatistics:alias_location";
     public static final String FEATURE = "JobStatistics:feature";
-    
-    public static final String SUCCESS_HEADER = "JobId\tMaps\tReduces\t" +
-    		"MaxMapTime\tMinMapTIme\tAvgMapTime\tMedianMapTime\tMaxReduceTime\t" +
-    		"MinReduceTime\tAvgReduceTime\tMedianReducetime\tAlias\tFeature\tOutputs";
-   
-    public static final String FAILURE_HEADER = "JobId\tAlias\tFeature\tMessage\tOutputs";
-    
-    // currently counters are not working in local mode - see PIG-1286
-    public static final String SUCCESS_HEADER_LOCAL = "JobId\tAlias\tFeature\tOutputs";
-    
+        
     private static final Log LOG = LogFactory.getLog(JobStats.class);
+    public static final String SUCCESS_HEADER = null;
+    public static final String FAILURE_HEADER = null;
     
     public static enum JobState { UNKNOWN, SUCCESS, FAILED; }
     
-    private JobState state = JobState.UNKNOWN;
+    protected JobState state = JobState.UNKNOWN;
         
-    private Configuration conf;
-    
-    private List<POStore> mapStores = null;
+    protected ArrayList<OutputStats> outputs;
     
-    private List<POStore> reduceStores = null;
-    
-    private List<FileSpec> loads = null;
-    
-    private ArrayList<OutputStats> outputs;
-    
-    private ArrayList<InputStats> inputs;
+    protected ArrayList<InputStats> inputs;
        
     private String errorMsg;
     
     private Exception exception = null;
-    
-    private Boolean disableCounter = false;
-            
-    @SuppressWarnings("deprecation")
-    private JobID jobId;
-    
-    private long maxMapTime = 0;
-    private long minMapTime = 0;
-    private long avgMapTime = 0;
-    private long medianMapTime = 0;
-    private long maxReduceTime = 0;
-    private long minReduceTime = 0;
-    private long avgReduceTime = 0;
-    private long medianReduceTime = 0;
-
-    private int numberMaps = 0;
-    private int numberReduces = 0;
-    
-    private long mapInputRecords = 0;
-    private long mapOutputRecords = 0;
-    private long reduceInputRecords = 0;
-    private long reduceOutputRecords = 0;
-    private long hdfsBytesWritten = 0;
-    private long hdfsBytesRead = 0;
-    private long spillCount = 0;
-    private long activeSpillCountObj = 0;
-    private long activeSpillCountRecs = 0;
-    
-    private HashMap<String, Long> multiStoreCounters 
-            = new HashMap<String, Long>();
-    
-    private HashMap<String, Long> multiInputCounters 
-            = new HashMap<String, Long>();
-        
-    @SuppressWarnings("deprecation")
-    private Counters counters = null;
-    
-    JobStats(String name, JobGraph plan) {
+               
+    protected JobStats(String name, JobGraph plan) {
         super(name, plan);
         outputs = new ArrayList<OutputStats>();
         inputs = new ArrayList<InputStats>();
     }
 
-    public String getJobId() { 
-        return (jobId == null) ? null : jobId.toString(); 
-    }
+    public abstract String getJobId();
     
     public JobState getState() { return state; }
     
     public boolean isSuccessful() { return (state == JobState.SUCCESS); }
-    
-    public String getErrorMessage() { return errorMsg; }
-    
-    public Exception getException() { return exception; }
-    
-    public int getNumberMaps() { return numberMaps; }
-    
-    public int getNumberReduces() { return numberReduces; }
-    
-    public long getMaxMapTime() { return maxMapTime; }
-    
-    public long getMinMapTime() { return minMapTime; }
-    
-    public long getAvgMapTime() { return avgMapTime; }
-    
-    public long getMaxReduceTime() { return maxReduceTime; }
-    
-    public long getMinReduceTime() { return minReduceTime; }
-    
-    public long getAvgREduceTime() { return avgReduceTime; }           
-        
-    public long getMapInputRecords() { return mapInputRecords; }
-
-    public long getMapOutputRecords() { return mapOutputRecords; }
-
-    public long getReduceOutputRecords() { return reduceOutputRecords; }
 
-    public long getReduceInputRecords() { return reduceInputRecords; }
+    public void setSuccessful(boolean isSuccessful) {
+        this.state = isSuccessful ? JobState.SUCCESS : JobState.FAILED;
+    }
 
-    public long getSMMSpillCount() { return spillCount; }
-    
-    public long getProactiveSpillCountObjects() { return activeSpillCountObj; }
-    
-    public long getProactiveSpillCountRecs() { return activeSpillCountRecs; }
-    
-    public long getHdfsBytesWritten() { return hdfsBytesWritten; }
+    public String getErrorMessage() { return errorMsg; }
     
-    @SuppressWarnings("deprecation")
-    public Counters getHadoopCounters() { return counters; }
+    public Exception getException() { return exception; }
     
     public List<OutputStats> getOutputs() {
         return Collections.unmodifiableList(outputs);
@@ -193,10 +86,6 @@ public final class JobStats extends Oper
     public List<InputStats> getInputs() {
         return Collections.unmodifiableList(inputs);
     }
-
-    public Map<String, Long> getMultiStoreCounters() {
-        return Collections.unmodifiableMap(multiStoreCounters);
-    }
        
     public String getAlias() {
         return (String)getAnnotation(ALIAS);
@@ -237,237 +126,32 @@ public final class JobStats extends Oper
     }
     
     @Override
-    public void accept(PlanVisitor v) throws FrontendException {
-        if (v instanceof JobGraphPrinter) {
-            JobGraphPrinter jpp = (JobGraphPrinter)v;
-            jpp.visit(this);
-        }
-    }
-
+    public abstract void accept(PlanVisitor v) throws FrontendException;
+    
+    
     @Override
     public boolean isEqual(Operator operator) {
         if (!(operator instanceof JobStats)) return false;
         return name.equalsIgnoreCase(operator.getName());
     }    
- 
 
-    @SuppressWarnings("deprecation")
-    void setId(JobID jobId) {
-        this.jobId = jobId;
-    }
-    
-    void setSuccessful(boolean isSuccessful) {
-        this.state = isSuccessful ? JobState.SUCCESS : JobState.FAILED;
-    }
-    
-    void setErrorMsg(String errorMsg) {
+    public void setErrorMsg(String errorMsg) {
         this.errorMsg = errorMsg;
     }
     
-    void setBackendException(Exception e) {
+    public void setBackendException(Exception e) {
         exception = e;
     }
-        
-    @SuppressWarnings("unchecked")
-    void setConf(Configuration conf) {        
-        if (conf == null) return;
-        this.conf = conf;
-        try {
-            this.mapStores = (List<POStore>) ObjectSerializer.deserialize(conf
-                    .get(JobControlCompiler.PIG_MAP_STORES));
-            this.reduceStores = (List<POStore>) ObjectSerializer.deserialize(conf
-                    .get(JobControlCompiler.PIG_REDUCE_STORES));           
-            this.loads = (ArrayList<FileSpec>) ObjectSerializer.deserialize(conf
-                    .get("pig.inputs"));
-            this.disableCounter = conf.getBoolean("pig.disable.counter", false);
-        } catch (IOException e) {
-            LOG.warn("Failed to deserialize the store list", e);
-        }                    
-    }
-    
-    void setMapStat(int size, long max, long min, long avg, long median) {
-        numberMaps = size;
-        maxMapTime = max;
-        minMapTime = min;
-        avgMapTime = avg;    
-        medianMapTime = median;
-    }
-    
-    void setReduceStat(int size, long max, long min, long avg, long median) {
-        numberReduces = size;
-        maxReduceTime = max;
-        minReduceTime = min;
-        avgReduceTime = avg;       
-        medianReduceTime = median;
-    }  
-    
-    String getDisplayString(boolean local) {
-        StringBuilder sb = new StringBuilder();
-        String id = (jobId == null) ? "N/A" : jobId.toString();
-        if (state == JobState.FAILED || local) {           
-            sb.append(id).append("\t")
-                .append(getAlias()).append("\t")
-                .append(getFeature()).append("\t");
-            if (state == JobState.FAILED) {
-                sb.append("Message: ").append(getErrorMessage()).append("\t");
-            }
-        } else if (state == JobState.SUCCESS) {
-            sb.append(id).append("\t")
-                .append(numberMaps).append("\t")
-                .append(numberReduces).append("\t");
-            if (numberMaps == 0) {
-                sb.append("n/a\t").append("n/a\t").append("n/a\t").append("n/a\t");
-            } else { 
-                sb.append(maxMapTime/1000).append("\t")
-                    .append(minMapTime/1000).append("\t")
-                    .append(avgMapTime/1000).append("\t")
-                    .append(medianMapTime/1000).append("\t");
-            }
-            if (numberReduces == 0) {
-                sb.append("n/a\t").append("n/a\t").append("n/a\t").append("n/a\t");
-            } else {
-                sb.append(maxReduceTime/1000).append("\t")
-                    .append(minReduceTime/1000).append("\t")
-                    .append(avgReduceTime/1000).append("\t")
-                    .append(medianReduceTime/1000).append("\t");
-            }
-            sb.append(getAlias()).append("\t")
-                .append(getFeature()).append("\t");
-        }
-        for (OutputStats os : outputs) {
-            sb.append(os.getLocation()).append(",");
-        }        
-        sb.append("\n");
-        return sb.toString();
-    }
-
-    @SuppressWarnings("deprecation")
-    void addCounters(RunningJob rjob) {
-        if (rjob != null) {
-            try {
-                counters = rjob.getCounters();
-            } catch (IOException e) {
-                LOG.warn("Unable to get job counters", e);
-            }
-        }
-        if (counters != null) {
-            Counters.Group taskgroup = counters
-                    .getGroup(PigStatsUtil.TASK_COUNTER_GROUP);
-            Counters.Group hdfsgroup = counters
-                    .getGroup(PigStatsUtil.FS_COUNTER_GROUP);
-            Counters.Group multistoregroup = counters
-                    .getGroup(PigStatsUtil.MULTI_STORE_COUNTER_GROUP);
-            Counters.Group multiloadgroup = counters
-                    .getGroup(PigStatsUtil.MULTI_INPUTS_COUNTER_GROUP);
-
-            mapInputRecords = taskgroup.getCounterForName(
-                    PigStatsUtil.MAP_INPUT_RECORDS).getCounter();
-            mapOutputRecords = taskgroup.getCounterForName(
-                    PigStatsUtil.MAP_OUTPUT_RECORDS).getCounter();
-            reduceInputRecords = taskgroup.getCounterForName(
-                    PigStatsUtil.REDUCE_INPUT_RECORDS).getCounter();
-            reduceOutputRecords = taskgroup.getCounterForName(
-                    PigStatsUtil.REDUCE_OUTPUT_RECORDS).getCounter();
-            hdfsBytesRead = hdfsgroup.getCounterForName(
-                    PigStatsUtil.HDFS_BYTES_READ).getCounter();      
-            hdfsBytesWritten = hdfsgroup.getCounterForName(
-                    PigStatsUtil.HDFS_BYTES_WRITTEN).getCounter();            
-            spillCount = counters.findCounter(
-                    PigCounters.SPILLABLE_MEMORY_MANAGER_SPILL_COUNT)
-                    .getCounter();
-            activeSpillCountObj = counters.findCounter(
-                    PigCounters.PROACTIVE_SPILL_COUNT_BAGS).getCounter();
-            activeSpillCountRecs = counters.findCounter(
-                    PigCounters.PROACTIVE_SPILL_COUNT_RECS).getCounter();
-
-            Iterator<Counter> iter = multistoregroup.iterator();
-            while (iter.hasNext()) {
-                Counter cter = iter.next();
-                multiStoreCounters.put(cter.getName(), cter.getValue());
-            }     
-            
-            Iterator<Counter> iter2 = multiloadgroup.iterator();
-            while (iter2.hasNext()) {
-                Counter cter = iter2.next();
-                multiInputCounters.put(cter.getName(), cter.getValue());
-            } 
-            
-        }              
-    }
-    
-    void addMapReduceStatistics(JobClient client, Configuration conf) {
-        TaskReport[] maps = null;
-        try {
-            maps = client.getMapTaskReports(jobId);
-        } catch (IOException e) {
-            LOG.warn("Failed to get map task report", e);            
-        }
-        if (maps != null && maps.length > 0) {
-            int size = maps.length;
-            long max = 0;
-            long min = Long.MAX_VALUE;
-            long median = 0;
-            long total = 0;
-            long durations[] = new long[size];
-            
-            for (int i = 0; i < maps.length; i++) {
-            	TaskReport rpt = maps[i];
-                long duration = rpt.getFinishTime() - rpt.getStartTime();
-                durations[i] = duration;
-                max = (duration > max) ? duration : max;
-                min = (duration < min) ? duration : min;
-                total += duration;
-            }
-            long avg = total / size;
-            
-            median = calculateMedianValue(durations);
-            setMapStat(size, max, min, avg, median);
-        } else {
-            int m = conf.getInt("mapred.map.tasks", 1);
-            if (m > 0) {
-                setMapStat(m, -1, -1, -1, -1);
-            }
-        }
-        
-        TaskReport[] reduces = null;
-        try {
-            reduces = client.getReduceTaskReports(jobId);
-        } catch (IOException e) {
-            LOG.warn("Failed to get reduce task report", e);
-        }
-        if (reduces != null && reduces.length > 0) {
-            int size = reduces.length;
-            long max = 0;
-            long min = Long.MAX_VALUE;
-            long median = 0;
-            long total = 0;
-            long durations[] = new long[size];
-            
-            for (int i = 0; i < reduces.length; i++) {
-            	TaskReport rpt = reduces[i];
-                long duration = rpt.getFinishTime() - rpt.getStartTime();
-                durations[i] = duration;
-                max = (duration > max) ? duration : max;
-                min = (duration < min) ? duration : min;
-                total += duration;
-            }
-            long avg = total / size;
-            median = calculateMedianValue(durations);
-            setReduceStat(size, max, min, avg, median);
-        } else {
-            int m = conf.getInt("mapred.reduce.tasks", 1);
-            if (m > 0) {
-                setReduceStat(m, -1, -1, -1, -1);
-            }
-        }
-    }
+       
+    public abstract String getDisplayString(boolean isLocal);
 
+    
     /**
      * Calculate the median value from the given array
      * @param durations
      * @return median value
      */
-	private long calculateMedianValue(long[] durations) {
+    protected long calculateMedianValue(long[] durations) {
 		long median;
 		// figure out the median
 		Arrays.sort(durations);
@@ -482,150 +166,11 @@ public final class JobStats extends Oper
 		return median;
 	}
     
-    void setAlias(MapReduceOper mro) {       
-        annotate(ALIAS, ScriptState.get().getAlias(mro));             
-        annotate(ALIAS_LOCATION, ScriptState.get().getAliasLocation(mro));
-        annotate(FEATURE, ScriptState.get().getPigFeature(mro));
-    }
-    
-    void addOutputStatistics() {
-        if (mapStores == null || reduceStores == null) {
-            LOG.warn("unable to get stores of the job");
-            return;
-        }
-        
-        if (mapStores.size() + reduceStores.size() == 1) {
-            POStore sto = (mapStores.size() > 0) ? mapStores.get(0)
-                    : reduceStores.get(0);
-            if (!sto.isTmpStore()) {
-                long records = (mapStores.size() > 0) ? mapOutputRecords
-                        : reduceOutputRecords;           
-                OutputStats ds = new OutputStats(sto.getSFile().getFileName(),
-                        hdfsBytesWritten, records, (state == JobState.SUCCESS));
-                ds.setPOStore(sto);
-                ds.setConf(conf);
-                outputs.add(ds);
-                
-                if (state == JobState.SUCCESS) {
-                    ScriptState.get().emitOutputCompletedNotification(ds);
-                }
-            }
-        } else {
-            for (POStore sto : mapStores) {
-                if (sto.isTmpStore()) continue;
-                addOneOutputStats(sto);
-            }
-            for (POStore sto : reduceStores) {
-                if (sto.isTmpStore()) continue;
-                addOneOutputStats(sto);
-            }     
-        }
-    }
-    
-    /**
-     * Looks up the output size reader from OUTPUT_SIZE_READER_KEY and invokes
-     * it to get the size of output. If OUTPUT_SIZE_READER_KEY is not set,
-     * defaults to FileBasedOutputSizeReader.
-     * @param sto POStore
-     * @param conf configuration
-     */
-    static long getOutputSize(POStore sto, Configuration conf) {
-        PigStatsOutputSizeReader reader = null;
-        String readerNames = conf.get(
-                PigStatsOutputSizeReader.OUTPUT_SIZE_READER_KEY,
-                FileBasedOutputSizeReader.class.getCanonicalName());
-
-        for (String className : readerNames.split(",")) {
-            reader = (PigStatsOutputSizeReader) PigContext.instantiateFuncFromSpec(className);
-            if (reader.supports(sto)) {
-                LOG.info("using output size reader: " + className);
-                try {
-                    return reader.getOutputSize(sto, conf);
-                } catch (FileNotFoundException e) {
-                    LOG.warn("unable to find the output file", e);
-                    return -1;
-                } catch (IOException e) {
-                    LOG.warn("unable to get byte written of the job", e);
-                    return -1;
-                }
-            }
-        }
-
-        LOG.warn("unable to find an output size reader");
-        return -1;
-    }
-
-    private void addOneOutputStats(POStore sto) {
-        long records = -1;
-        if (sto.isMultiStore()) {
-            Long n = multiStoreCounters.get(PigStatsUtil.getMultiStoreCounterName(sto));
-            if (n != null) records = n;
-        } else {
-            records = mapOutputRecords;
-        }
-
-        long bytes = getOutputSize(sto, conf);
-        String location = sto.getSFile().getFileName();
-        OutputStats ds = new OutputStats(location, bytes, records,
-                (state == JobState.SUCCESS));  
-        ds.setPOStore(sto);
-        ds.setConf(conf);
-        outputs.add(ds);
-        
-        if (state == JobState.SUCCESS) {
-            ScriptState.get().emitOutputCompletedNotification(ds);
-        }
-    }
-       
-    void addInputStatistics() {
-        if (loads == null)  {
-            LOG.warn("unable to get inputs of the job");
-            return;
-        }
-        
-        if (loads.size() == 1) {
-            FileSpec fsp = loads.get(0); 
-            if (!PigStatsUtil.isTempFile(fsp.getFileName())) {
-                long records = mapInputRecords;       
-                InputStats is = new InputStats(fsp.getFileName(),
-                        hdfsBytesRead, records, (state == JobState.SUCCESS));              
-                is.setConf(conf);
-                if (isSampler()) is.markSampleInput();
-                if (isIndexer()) is.markIndexerInput();
-                inputs.add(is);                
-            }
-        } else {
-            for (int i=0; i<loads.size(); i++) {
-                FileSpec fsp = loads.get(i);
-                if (PigStatsUtil.isTempFile(fsp.getFileName())) continue;
-                addOneInputStats(fsp.getFileName(), i);
-            }
-        }            
-    }
-    
-    private void addOneInputStats(String fileName, int index) {
-        long records = -1;
-        Long n = multiInputCounters.get(
-                PigStatsUtil.getMultiInputsCounterName(fileName, index));
-        if (n != null) {   
-            records = n;
-        } else {
-            // the file could be empty
-            if (!disableCounter) records = 0;
-            else {
-                LOG.warn("unable to get input counter for " + fileName);
-            }
-        }
-        InputStats is = new InputStats(fileName, -1, records, (state == JobState.SUCCESS));              
-        is.setConf(conf);
-        inputs.add(is);
-    }
-    
-    private boolean isSampler() {
+    public boolean isSampler() {
         return getFeature().contains(ScriptState.PIG_FEATURE.SAMPLER.name());
     }
     
-    private boolean isIndexer() {
+    public boolean isIndexer() {
         return getFeature().contains(ScriptState.PIG_FEATURE.INDEXER.name());
     }
     

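The JobStats.java hunks above are the heart of this part of the commit: the class goes from a final, MapReduce-bound implementation to an abstract base, and the Hadoop counter, task-report, and input/output bookkeeping it drops presumably lands in the new org.apache.pig.tools.pigstats.mapreduce package (note the mapreduce.SimplePigStats import in the PigStats.java diff below). Under the new contract a backend supplies getJobId(), accept(PlanVisitor), and getDisplayString(boolean). A minimal sketch of such a subclass follows; the package and class name are hypothetical, and any abstract members outside the hunks shown here are omitted:

    package org.apache.pig.tools.pigstats.example;  // hypothetical package

    import org.apache.pig.impl.logicalLayer.FrontendException;
    import org.apache.pig.newplan.PlanVisitor;
    import org.apache.pig.tools.pigstats.JobStats;
    import org.apache.pig.tools.pigstats.PigStats;
    import org.apache.pig.tools.pigstats.PigStats.JobGraph;

    public class ExampleJobStats extends JobStats {

        private final String jobId;  // backend-specific job identifier

        protected ExampleJobStats(String name, JobGraph plan, String jobId) {
            super(name, plan);  // base constructor sets up the inputs/outputs lists
            this.jobId = jobId;
        }

        @Override
        public String getJobId() {
            return jobId;
        }

        @Override
        public void accept(PlanVisitor v) throws FrontendException {
            // JobGraphPrinter now lives in PigStats (see the PigStats.java diff below)
            if (v instanceof PigStats.JobGraphPrinter) {
                ((PigStats.JobGraphPrinter) v).visit(this);
            }
        }

        @Override
        public String getDisplayString(boolean isLocal) {
            // bare-bones rendering; a real backend would reproduce the
            // timing and output columns removed in the hunks above
            return getJobId() + "\t" + getAlias() + "\t" + getFeature() + "\n";
        }
    }

Note also that setSuccessful, setErrorMsg and setBackendException widen from package-private to public in this diff, so implementations outside the package can drive the job state.
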
Modified: pig/trunk/src/org/apache/pig/tools/pigstats/OutputStats.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/tools/pigstats/OutputStats.java?rev=1519062&r1=1519061&r2=1519062&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/tools/pigstats/OutputStats.java (original)
+++ pig/trunk/src/org/apache/pig/tools/pigstats/OutputStats.java Fri Aug 30 20:04:29 2013
@@ -57,7 +57,7 @@ public final class OutputStats {
 
     private static final Log LOG = LogFactory.getLog(OutputStats.class);
     
-    OutputStats(String location, long bytes, long records, boolean success) {
+    public OutputStats(String location, long bytes, long records, boolean success) {
         this.location = location;
         this.bytes = bytes;
         this.records = records;        
@@ -107,7 +107,7 @@ public final class OutputStats {
         return conf;
     }
     
-    String getDisplayString(boolean local) {
+    public String getDisplayString(boolean local) {
         StringBuilder sb = new StringBuilder();
         if (success) {
             sb.append("Successfully stored ");
@@ -127,11 +127,11 @@ public final class OutputStats {
         return sb.toString();
     }
 
-    void setPOStore(POStore store) {
+    public void setPOStore(POStore store) {
         this.store = store;
     }
     
-    void setConf(Configuration conf) {
+    public void setConf(Configuration conf) {
         this.conf = conf;
     }
     

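OutputStats gets the matching visibility changes: the constructor, getDisplayString, setPOStore and setConf all widen from package-private to public, since the JobStats implementations that populate them now live in a subpackage. A small usage sketch, with a hypothetical class name and illustrative values:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.pig.tools.pigstats.OutputStats;

    public class OutputStatsExample {
        // builds an OutputStats record much as an engine-specific
        // JobStats implementation would, now that the members are public
        public static OutputStats recordStore(String location, long bytes,
                                              long records, boolean succeeded,
                                              Configuration conf) {
            OutputStats os = new OutputStats(location, bytes, records, succeeded);
            os.setConf(conf);  // callable from any package after this change
            return os;
        }
    }
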
Modified: pig/trunk/src/org/apache/pig/tools/pigstats/PigStats.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/tools/pigstats/PigStats.java?rev=1519062&r1=1519061&r2=1519062&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/tools/pigstats/PigStats.java (original)
+++ pig/trunk/src/org/apache/pig/tools/pigstats/PigStats.java Fri Aug 30 20:04:29 2013
@@ -38,8 +38,9 @@ import org.apache.pig.impl.util.Spillabl
 import org.apache.pig.newplan.BaseOperatorPlan;
 import org.apache.pig.newplan.Operator;
 import org.apache.pig.newplan.OperatorPlan;
+import org.apache.pig.newplan.PlanVisitor;
 import org.apache.pig.tools.pigstats.JobStats.JobState;
-import org.apache.pig.tools.pigstats.SimplePigStats.JobGraphPrinter;
+import org.apache.pig.tools.pigstats.mapreduce.SimplePigStats;
 
 /**
  * PigStats encapsulates the statistics collected from a running script. 
@@ -61,7 +62,10 @@ public abstract class PigStats {
     private int errorCode = -1;
     
     public static PigStats get() {
-        if (tps.get() == null) tps.set(new SimplePigStats());
+        if (tps.get() == null) {
+            LOG.info("PigStats has not been set. Defaulting to SimplePigStats");
+            tps.set(new SimplePigStats());
+        }
         return tps.get();
     }
     
@@ -69,8 +73,8 @@ public abstract class PigStats {
         tps.set(stats);
     }
         
-    static PigStats start() {
-        tps.set(new SimplePigStats());
+    public static PigStats start(PigStats stats) {
+        tps.set(stats);
         return tps.get();
     }
     
@@ -201,6 +205,41 @@ public abstract class PigStats {
     void setErrorCode(int errorCode) {
         this.errorCode = errorCode;
     } 
+
+    
+    /**
+     * This class prints a JobGraph
+     */
+    public static class JobGraphPrinter extends PlanVisitor {
+        
+        StringBuffer buf;
+
+        protected JobGraphPrinter(OperatorPlan plan) {
+            super(plan,
+                    new org.apache.pig.newplan.DependencyOrderWalker(
+                            plan));
+            buf = new StringBuffer();
+        }
+        
+        public void visit(JobStats op) throws FrontendException {
+            buf.append(op.getJobId());
+            List<Operator> succs = plan.getSuccessors(op);
+            if (succs != null) {
+                buf.append("\t->\t");
+                for (Operator p : succs) {                  
+                    buf.append(((JobStats)p).getJobId()).append(",");
+                }               
+            }
+            buf.append("\n");
+        }
+        
+        @Override
+        public String toString() {
+            buf.append("\n");
+            return buf.toString();
+        }        
+    }
+    
     /**
      * JobGraph is an {@link OperatorPlan} whose members are {@link JobStats}
      */