Posted to commits@pig.apache.org by ch...@apache.org on 2014/04/28 23:59:43 UTC

svn commit: r1590819 - in /pig/trunk: ./ shims/src/hadoop20/org/apache/pig/backend/hadoop/executionengine/shims/ shims/src/hadoop23/org/apache/pig/backend/hadoop/executionengine/shims/ src/org/apache/pig/ src/org/apache/pig/backend/hadoop/executionengi...

Author: cheolsoo
Date: Mon Apr 28 21:59:43 2014
New Revision: 1590819

URL: http://svn.apache.org/r1590819
Log:
PIG-3860: Refactor PigStatusReporter and PigLogger for non-MR execution engine (cheolsoo)

Added:
    pig/trunk/shims/src/hadoop20/org/apache/pig/backend/hadoop/executionengine/shims/TaskContext.java
    pig/trunk/shims/src/hadoop23/org/apache/pig/backend/hadoop/executionengine/shims/TaskContext.java
Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/src/org/apache/pig/TypedOutputEvalFunc.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchLauncher.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReducePOStoreImpl.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapBase.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapReduce.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigHadoopLogger.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduceCounter.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigRecordReader.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PigLogger.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/util/MonitoredUDFExecutor.java
    pig/trunk/src/org/apache/pig/tools/counters/PigCounterHelper.java
    pig/trunk/src/org/apache/pig/tools/pigstats/PigStatusReporter.java
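
The change replaces the static PigStatusReporter.setContext(TaskInputOutputContext) call with an instance-level setContext(TaskContext<T>), so that non-MR engines (fetch mode here, and Tez via the hadoop23 shim) can route counters through the same reporter. The wiring pattern repeated across the launchers below looks roughly like this sketch, assuming an MR task context (see the individual hunks for the exact call sites):

    PigStatusReporter pigStatusReporter = PigStatusReporter.getInstance();
    pigStatusReporter.setContext(new TaskContext<TaskInputOutputContext<?,?,?,?>>(context));
    PigHadoopLogger pigHadoopLogger = PigHadoopLogger.getInstance();
    pigHadoopLogger.setReporter(pigStatusReporter);
    pigHadoopLogger.setAggregate(aggregateWarning);
    PhysicalOperator.setPigLogger(pigHadoopLogger);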

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1590819&r1=1590818&r2=1590819&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Mon Apr 28 21:59:43 2014
@@ -32,6 +32,8 @@ PIG-2207: Support custom counters for ag
 
 IMPROVEMENTS
 
+PIG-3860: Refactor PigStatusReporter and PigLogger for non-MR execution engine (cheolsoo)
+
 PIG-3865: Remodel the XMLLoader to work to be faster and more maintainable (aseldawy via daijy)
 
 PIG-3737: Bundle dependent jars in distribution in %PIG_HOME%/lib folder (daijy)

Added: pig/trunk/shims/src/hadoop20/org/apache/pig/backend/hadoop/executionengine/shims/TaskContext.java
URL: http://svn.apache.org/viewvc/pig/trunk/shims/src/hadoop20/org/apache/pig/backend/hadoop/executionengine/shims/TaskContext.java?rev=1590819&view=auto
==============================================================================
--- pig/trunk/shims/src/hadoop20/org/apache/pig/backend/hadoop/executionengine/shims/TaskContext.java (added)
+++ pig/trunk/shims/src/hadoop20/org/apache/pig/backend/hadoop/executionengine/shims/TaskContext.java Mon Apr 28 21:59:43 2014
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.shims;
+
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.TaskInputOutputContext;
+import org.apache.pig.backend.hadoop.executionengine.fetch.FetchContext;
+
+public class TaskContext<T> {
+    private T context;
+
+    public TaskContext(T context) {
+        this.context = context;
+    }
+
+    public T get() {
+        return context;
+    }
+
+    public Counter getCounter(Enum<?> name) {
+        Counter counter = null;
+        if (context == null) {
+            return counter;
+        }
+        if (context instanceof TaskInputOutputContext) {
+            TaskInputOutputContext<?,?,?,?> taskContext = (TaskInputOutputContext<?,?,?,?>) context;
+            counter = taskContext.getCounter(name);
+        } else if (context instanceof FetchContext) {
+            FetchContext fetchContext = (FetchContext) context;
+            counter = fetchContext.getCounter(name);
+        }
+        return counter;
+    }
+
+    public Counter getCounter(String group, String name) {
+        Counter counter = null;
+        if (context == null) {
+            return counter;
+        }
+        if (context instanceof TaskInputOutputContext) {
+            TaskInputOutputContext<?,?,?,?> taskContext = (TaskInputOutputContext<?,?,?,?>) context;
+            counter = taskContext.getCounter(group, name);
+        } else if (context instanceof FetchContext) {
+            FetchContext fetchContext = (FetchContext) context;
+            counter = fetchContext.getCounter(group, name);
+        }
+        return counter;
+    }
+
+    public boolean incrCounter(Enum<?> name, long delta) {
+        if (context == null) {
+            return false;
+        }
+        if (context instanceof TaskInputOutputContext) {
+            TaskInputOutputContext<?,?,?,?> taskContext = (TaskInputOutputContext<?,?,?,?>) context;
+            Counter counter = taskContext.getCounter(name);
+            counter.increment(delta);
+            return true;
+        } else if (context instanceof FetchContext) {
+            FetchContext fetchContext = (FetchContext) context;
+            Counter counter = fetchContext.getCounter(name);
+            counter.increment(delta);
+            return true;
+        }
+        return false;
+    }
+
+    public boolean incrCounter(String group, String name, long delta) {
+        if (context == null) {
+            return false;
+        }
+        if (context instanceof TaskInputOutputContext) {
+            TaskInputOutputContext<?,?,?,?> taskContext = (TaskInputOutputContext<?,?,?,?>) context;
+            Counter counter = taskContext.getCounter(group, name);
+            counter.increment(delta);
+            return true;
+        } else if (context instanceof FetchContext) {
+            FetchContext fetchContext = (FetchContext) context;
+            Counter counter = fetchContext.getCounter(group, name);
+            counter.increment(delta);
+            return true;
+        }
+        return false;
+    }
+
+    public void progress() {
+        if (context == null) {
+            return;
+        }
+        if (context instanceof TaskInputOutputContext) {
+            TaskInputOutputContext<?,?,?,?> taskContext = (TaskInputOutputContext<?,?,?,?>) context;
+            taskContext.progress();
+        }
+    }
+
+    public float getProgress() {
+        return 0f;
+    }
+
+    public void setStatus(String status) {
+        if (context == null) {
+            return;
+        }
+        if (context instanceof TaskInputOutputContext) {
+            TaskInputOutputContext<?,?,?,?> taskContext = (TaskInputOutputContext<?,?,?,?>) context;
+            taskContext.setStatus(status);
+        }
+    }
+}
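
A minimal usage sketch for this hadoop20 shim (counter names are hypothetical; the FetchLauncher hunk below performs this wrapping for fetch mode):

    // Wrap a FetchContext so counter updates work without a Hadoop task.
    TaskContext<FetchContext> ctx = new TaskContext<FetchContext>(new FetchContext());
    ctx.incrCounter("MyGroup", "recordsSeen", 1L); // false only when no context is set
    ctx.progress();        // no-op: only a TaskInputOutputContext forwards progress
    ctx.setStatus("done"); // likewise ignored for a FetchContext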

Added: pig/trunk/shims/src/hadoop23/org/apache/pig/backend/hadoop/executionengine/shims/TaskContext.java
URL: http://svn.apache.org/viewvc/pig/trunk/shims/src/hadoop23/org/apache/pig/backend/hadoop/executionengine/shims/TaskContext.java?rev=1590819&view=auto
==============================================================================
--- pig/trunk/shims/src/hadoop23/org/apache/pig/backend/hadoop/executionengine/shims/TaskContext.java (added)
+++ pig/trunk/shims/src/hadoop23/org/apache/pig/backend/hadoop/executionengine/shims/TaskContext.java Mon Apr 28 21:59:43 2014
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.shims;
+
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.TaskInputOutputContext;
+import org.apache.pig.backend.hadoop.executionengine.fetch.FetchContext;
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.runtime.api.TezProcessorContext;
+
+public class TaskContext<T> {
+    private T context;
+
+    public TaskContext(T context) {
+        this.context = context;
+    }
+
+    public T get() {
+        return context;
+    }
+
+    public Counter getCounter(Enum<?> name) {
+        Counter counter = null;
+        if (context == null) {
+            return counter;
+        }
+        if (context instanceof TaskInputOutputContext) {
+            TaskInputOutputContext<?,?,?,?> taskContext = (TaskInputOutputContext<?,?,?,?>) context;
+            counter = taskContext.getCounter(name);
+        } else if (context instanceof FetchContext) {
+            FetchContext fetchContext = (FetchContext) context;
+            counter = fetchContext.getCounter(name);
+        }
+        return counter;
+    }
+
+    public Counter getCounter(String group, String name) {
+        Counter counter = null;
+        if (context == null) {
+            return counter;
+        }
+        if (context instanceof TaskInputOutputContext) {
+            TaskInputOutputContext<?,?,?,?> taskContext = (TaskInputOutputContext<?,?,?,?>) context;
+            counter = taskContext.getCounter(group, name);
+        } else if (context instanceof FetchContext) {
+            FetchContext fetchContext = (FetchContext) context;
+            counter = fetchContext.getCounter(group, name);
+        }
+        return counter;
+    }
+
+    public boolean incrCounter(Enum<?> name, long delta) {
+        if (context == null) {
+            return false;
+        }
+        if (context instanceof TaskInputOutputContext) {
+            TaskInputOutputContext<?,?,?,?> taskContext = (TaskInputOutputContext<?,?,?,?>) context;
+            Counter counter = taskContext.getCounter(name);
+            counter.increment(delta);
+            return true;
+        } else if (context instanceof FetchContext) {
+            FetchContext fetchContext = (FetchContext) context;
+            Counter counter = fetchContext.getCounter(name);
+            counter.increment(delta);
+            return true;
+        } else if (context instanceof TezProcessorContext) {
+            TezProcessorContext tezContext = (TezProcessorContext) context;
+            TezCounter counter = tezContext.getCounters().findCounter(name);
+            counter.increment(delta);
+            return true;
+        }
+        return false;
+    }
+
+    public boolean incrCounter(String group, String name, long delta) {
+        if (context == null) {
+            return false;
+        }
+        if (context instanceof TaskInputOutputContext) {
+            TaskInputOutputContext<?,?,?,?> taskContext = (TaskInputOutputContext<?,?,?,?>) context;
+            Counter counter = taskContext.getCounter(group, name);
+            counter.increment(delta);
+            return true;
+        } else if (context instanceof FetchContext) {
+            FetchContext fetchContext = (FetchContext) context;
+            Counter counter = fetchContext.getCounter(group, name);
+            counter.increment(delta);
+            return true;
+        } else if (context instanceof TezProcessorContext) {
+            TezProcessorContext tezContext = (TezProcessorContext) context;
+            TezCounter counter = tezContext.getCounters().getGroup(group).findCounter(name);
+            counter.increment(delta);
+            return true;
+        }
+        return false;
+    }
+
+    public void progress() {
+        if (context == null) {
+            return;
+        }
+        if (context instanceof TaskInputOutputContext) {
+            TaskInputOutputContext<?,?,?,?> taskContext = (TaskInputOutputContext<?,?,?,?>) context;
+            taskContext.progress();
+        }
+    }
+
+    public float getProgress() {
+        if (context == null) {
+            return 0f;
+        }
+        if (context instanceof TaskInputOutputContext) {
+            TaskInputOutputContext<?,?,?,?> taskContext = (TaskInputOutputContext<?,?,?,?>) context;
+            return taskContext.getProgress();
+        }
+        return 0f;
+    }
+
+    public void setStatus(String status) {
+        if (context == null) {
+            return;
+        }
+        if (context instanceof TaskInputOutputContext) {
+            TaskInputOutputContext<?,?,?,?> taskContext = (TaskInputOutputContext<?,?,?,?>) context;
+            taskContext.setStatus(status);
+        }
+    }
+}
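
The hadoop23 variant adds a TezProcessorContext branch, but only in the two incrCounter() methods; getCounter() has no Tez branch and still returns null for a Tez context, so Tez-side callers must go through incrCounter(). A sketch (tezContext is assumed to be supplied by the Tez runtime):

    TaskContext<TezProcessorContext> ctx =
            new TaskContext<TezProcessorContext>(tezContext);
    ctx.incrCounter("MyGroup", "recordsSeen", 1L);        // via getCounters().getGroup(group).findCounter(name)
    Counter c = ctx.getCounter("MyGroup", "recordsSeen"); // null under Tez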

Modified: pig/trunk/src/org/apache/pig/TypedOutputEvalFunc.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/TypedOutputEvalFunc.java?rev=1590819&r1=1590818&r2=1590819&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/TypedOutputEvalFunc.java (original)
+++ pig/trunk/src/org/apache/pig/TypedOutputEvalFunc.java Mon Apr 28 21:59:43 2014
@@ -18,7 +18,6 @@
 package org.apache.pig;
 
 import com.google.common.collect.Maps;
-import org.apache.hadoop.mapreduce.Counter;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.tools.pigstats.PigStatusReporter;
 
@@ -161,9 +160,6 @@ public abstract class TypedOutputEvalFun
     }
 
     protected static void safeIncrCounter(String group, String name, Long increment) {
-        Counter counter = PigStatusReporter.getInstance().getCounter(group, name);
-        if (counter != null) {
-            counter.increment(increment);
-        }
+        PigStatusReporter.getInstance().incrCounter(group, name, increment);
     }
 }
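
With this hunk, the null check on the returned Counter lives inside PigStatusReporter, so UDF helpers shrink to one line. A hypothetical call site (counter group and name are illustrative only):

    // Fire-and-forget counter update; the reporter handles the case where
    // no task context or counter is available.
    PigStatusReporter.getInstance().incrCounter("MyUDF", "malformedRecords", 1L);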

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchLauncher.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchLauncher.java?rev=1590819&r1=1590818&r2=1590819&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchLauncher.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchLauncher.java Mon Apr 28 21:59:43 2014
@@ -36,6 +36,7 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStream;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
 import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
+import org.apache.pig.backend.hadoop.executionengine.shims.TaskContext;
 import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
 import org.apache.pig.data.SchemaTupleBackend;
 import org.apache.pig.impl.PigContext;
@@ -116,7 +117,6 @@ public class FetchLauncher {
     }
 
     private void init(PhysicalPlan pp, POStore poStore) throws IOException {
-
         poStore.setStoreImpl(new FetchPOStoreImpl(pigContext));
         poStore.setUp();
 
@@ -147,10 +147,11 @@ public class FetchLauncher {
         }
 
         boolean aggregateWarning = "true".equalsIgnoreCase(conf.get("aggregate.warning"));
+        PigStatusReporter pigStatusReporter = PigStatusReporter.getInstance();
+        pigStatusReporter.setContext(new TaskContext<FetchContext>(new FetchContext()));
         PigHadoopLogger pigHadoopLogger = PigHadoopLogger.getInstance();
+        pigHadoopLogger.setReporter(pigStatusReporter);
         pigHadoopLogger.setAggregate(aggregateWarning);
-        PigStatusReporter.getInstance().setFetchContext(new FetchContext());
-        pigHadoopLogger.setReporter(PigStatusReporter.getInstance());
         PhysicalOperator.setPigLogger(pigHadoopLogger);
     }
 

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReducePOStoreImpl.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReducePOStoreImpl.java?rev=1590819&r1=1590818&r2=1590819&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReducePOStoreImpl.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReducePOStoreImpl.java Mon Apr 28 21:59:43 2014
@@ -29,6 +29,7 @@ import org.apache.pig.StoreFuncInterface
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStoreImpl;
 import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
+import org.apache.pig.backend.hadoop.executionengine.shims.TaskContext;
 import org.apache.pig.tools.pigstats.mapreduce.MRPigStatsUtil;
 import org.apache.pig.tools.pigstats.PigStatusReporter;
 /**
@@ -36,48 +37,45 @@ import org.apache.pig.tools.pigstats.Pig
  * collector/record writer. It sets up a modified job configuration to
  * force a write to a specific subdirectory of the main output
  * directory. This is done so that multiple output directories can be
- * used in the same job. 
+ * used in the same job.
  */
-@SuppressWarnings("unchecked")
 public class MapReducePOStoreImpl extends POStoreImpl {
-            
+
     private TaskAttemptContext context;
-    
     private PigStatusReporter reporter;
+    private RecordWriter<?,?> writer;
 
-    private RecordWriter writer;
-           
-    public MapReducePOStoreImpl(TaskInputOutputContext context) {
+    public MapReducePOStoreImpl(TaskInputOutputContext<?,?,?,?> context) {
         // get a copy of the Configuration so that changes to the
         // configuration below (like setting the output location) do
         // not affect the caller's copy
         Configuration outputConf = new Configuration(context.getConfiguration());
-        PigStatusReporter.setContext(context);
         reporter = PigStatusReporter.getInstance();
-       
+        reporter.setContext(new TaskContext<TaskInputOutputContext<?,?,?,?>>(context));
+
         // make a copy of the Context to use here - since in the same
         // task (map or reduce) we could have multiple stores, we should
         // make this copy so that the same context does not get over-written
         // by the different stores.
-        
-        this.context = HadoopShims.createTaskAttemptContext(outputConf, 
+
+        this.context = HadoopShims.createTaskAttemptContext(outputConf,
                 context.getTaskAttemptID());
     }
-    
+
     @Override
-    public StoreFuncInterface createStoreFunc(POStore store) 
+    public StoreFuncInterface createStoreFunc(POStore store)
             throws IOException {
- 
+
         StoreFuncInterface storeFunc = store.getStoreFunc();
 
         // call the setStoreLocation on the storeFunc giving it the
         // Job. Typically this will result in the OutputFormat of the
         // storeFunc storing the output location in the Configuration
-        // in the Job. The PigOutFormat.setLocation() method will merge 
+        // in the Job. The PigOutFormat.setLocation() method will merge
         // this modified Configuration into the configuration of the
         // Context we have
         PigOutputFormat.setLocation(context, store);
-        OutputFormat outputFormat = storeFunc.getOutputFormat();
+        OutputFormat<?,?> outputFormat = storeFunc.getOutputFormat();
 
         // create a new record writer
         try {
@@ -85,9 +83,9 @@ public class MapReducePOStoreImpl extend
         } catch (InterruptedException e) {
             throw new IOException(e);
         }
- 
+
         storeFunc.prepareToWrite(writer);
-        
+
         return storeFunc;
     }
 
@@ -114,10 +112,10 @@ public class MapReducePOStoreImpl extend
             writer = null;
         }
     }
-    
+
     public Counter createRecordCounter(POStore store) {
         String name = MRPigStatsUtil.getMultiStoreCounterName(store);
         return (name == null) ? null : reporter.getCounter(
-                MRPigStatsUtil.MULTI_STORE_COUNTER_GROUP, name); 
+                MRPigStatsUtil.MULTI_STORE_COUNTER_GROUP, name);
     }
 }

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java?rev=1590819&r1=1590818&r2=1590819&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java Mon Apr 28 21:59:43 2014
@@ -27,6 +27,7 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.TaskInputOutputContext;
 import org.apache.log4j.PropertyConfigurator;
 import org.apache.pig.PigException;
 import org.apache.pig.backend.executionengine.ExecException;
@@ -37,6 +38,7 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.JoinPackager;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
+import org.apache.pig.backend.hadoop.executionengine.shims.TaskContext;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.io.NullableTuple;
@@ -139,12 +141,11 @@ public class PigCombiner {
                 PhysicalOperator.setReporter(pigReporter);
 
                 boolean aggregateWarning = "true".equalsIgnoreCase(pigContext.getProperties().getProperty("aggregate.warning"));
-
+                PigStatusReporter pigStatusReporter = PigStatusReporter.getInstance();
+                pigStatusReporter.setContext(new TaskContext<TaskInputOutputContext<?,?,?,?>>(context));
                 PigHadoopLogger pigHadoopLogger = PigHadoopLogger.getInstance();
+                pigHadoopLogger.setReporter(pigStatusReporter);
                 pigHadoopLogger.setAggregate(aggregateWarning);
-                PigStatusReporter.setContext(context);
-                pigHadoopLogger.setReporter(PigStatusReporter.getInstance());
-
                 PhysicalOperator.setPigLogger(pigHadoopLogger);
             }
 

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapBase.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapBase.java?rev=1590819&r1=1590818&r2=1590819&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapBase.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapBase.java Mon Apr 28 21:59:43 2014
@@ -31,6 +31,7 @@ import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.TaskInputOutputContext;
 import org.apache.log4j.PropertyConfigurator;
 import org.apache.pig.PigException;
 import org.apache.pig.backend.executionengine.ExecException;
@@ -41,6 +42,7 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
+import org.apache.pig.backend.hadoop.executionengine.shims.TaskContext;
 import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.SchemaTupleBackend;
@@ -64,9 +66,9 @@ import org.apache.pig.tools.pigstats.Pig
 public abstract class PigGenericMapBase extends Mapper<Text, Tuple, PigNullableWritable, Writable> {
 
     private final Log log = LogFactory.getLog(getClass());
-    
+
     protected byte keyType;
-        
+
     //Map Plan
     protected PhysicalPlan mp = null;
 
@@ -74,24 +76,24 @@ public abstract class PigGenericMapBase 
     protected List<POStore> stores;
 
     protected TupleFactory tf = TupleFactory.getInstance();
-    
+
     boolean inIllustrator = false;
-    
+
     Context outputCollector;
-    
+
     // Reporter that will be used by operators
     // to transmit heartbeat
     ProgressableReporter pigReporter;
 
     protected boolean errorInMap = false;
-    
+
     PhysicalOperator[] roots;
 
     private PhysicalOperator leaf;
 
     PigContext pigContext = null;
     private volatile boolean initialized = false;
-    
+
     /**
      * for local map/reduce simulation
      * @param plan the map plan
@@ -99,7 +101,7 @@ public abstract class PigGenericMapBase 
     public void setMapPlan(PhysicalPlan plan) {
         mp = plan;
     }
-    
+
     /**
      * Will be called when all the tuples in the input
      * are done. So reporter thread should be closed.
@@ -111,9 +113,9 @@ public abstract class PigGenericMapBase 
             //error in map - returning
             return;
         }
-            
+
         if(PigMapReduce.sJobConfInternal.get().get(JobControlCompiler.END_OF_INP_IN_MAP, "false").equals("true")) {
-            // If there is a stream in the pipeline or if this map job belongs to merge-join we could 
+            // If there is a stream in the pipeline or if this map job belongs to merge-join we could
             // potentially have more to process - so lets
             // set the flag stating that all map input has been sent
             // already and then lets run the pipeline one more time
@@ -126,7 +128,7 @@ public abstract class PigGenericMapBase 
         if (!inIllustrator) {
             for (POStore store: stores) {
                 if (!initialized) {
-                    MapReducePOStoreImpl impl 
+                    MapReducePOStoreImpl impl
                         = new MapReducePOStoreImpl(context);
                     store.setStoreImpl(impl);
                     store.setUp();
@@ -134,7 +136,7 @@ public abstract class PigGenericMapBase 
                 store.tearDown();
             }
         }
-        
+
         //Calling EvalFunc.finish()
         UDFFinishVisitor finisher = new UDFFinishVisitor(mp, new DependencyOrderWalker<PhysicalOperator, PhysicalPlan>(mp));
         try {
@@ -144,7 +146,7 @@ public abstract class PigGenericMapBase 
             String msg = "Error while calling finish method on UDFs.";
             throw new VisitorException(msg, errCode, PigException.BUG, e);
         }
-        
+
         mp = null;
 
         PhysicalOperator.setReporter(null);
@@ -159,14 +161,14 @@ public abstract class PigGenericMapBase 
     @Override
     public void setup(Context context) throws IOException, InterruptedException {       	
         super.setup(context);
-        
+
         Configuration job = context.getConfiguration();
         SpillableMemoryManager.configure(ConfigurationUtil.toProperties(job));
         PigMapReduce.sJobContext = context;
         PigMapReduce.sJobConfInternal.set(context.getConfiguration());
         PigMapReduce.sJobConf = context.getConfiguration();
         inIllustrator = inIllustrator(context);
-        
+
         PigContext.setPackageImportList((ArrayList<String>)ObjectSerializer.deserialize(job.get("udf.import.list")));
         pigContext = (PigContext)ObjectSerializer.deserialize(job.get("pig.pigContext"));
 
@@ -175,12 +177,12 @@ public abstract class PigGenericMapBase 
 
         if (pigContext.getLog4jProperties()!=null)
             PropertyConfigurator.configure(pigContext.getLog4jProperties());
-        
+
         if (mp == null)
             mp = (PhysicalPlan) ObjectSerializer.deserialize(
                 job.get("pig.mapPlan"));
         stores = PlanHelper.getPhysicalOperators(mp, POStore.class);
-        
+
         // To be removed
         if(mp.isEmpty())
             log.debug("Map Plan empty!");
@@ -191,7 +193,7 @@ public abstract class PigGenericMapBase 
         }
         keyType = ((byte[])ObjectSerializer.deserialize(job.get("pig.map.keytype")))[0];
         // till here
-        
+
         pigReporter = new ProgressableReporter();
         // Get the UDF specific context
         MapRedUtil.setupUDFContext(job);
@@ -200,26 +202,27 @@ public abstract class PigGenericMapBase 
 
             PigSplit split = (PigSplit)context.getInputSplit();
             List<OperatorKey> targetOpKeys = split.getTargetOps();
-            
+
             ArrayList<PhysicalOperator> targetOpsAsList = new ArrayList<PhysicalOperator>();
-            for (OperatorKey targetKey : targetOpKeys) {                    
+            for (OperatorKey targetKey : targetOpKeys) {
                 targetOpsAsList.add(mp.getOperator(targetKey));
             }
             roots = targetOpsAsList.toArray(new PhysicalOperator[1]);
-            leaf = mp.getLeaves().get(0);               
+            leaf = mp.getLeaves().get(0);
         }
-        
-        PigStatusReporter.setContext(context);
- 
+
+        PigStatusReporter pigStatusReporter = PigStatusReporter.getInstance();
+        pigStatusReporter.setContext(new TaskContext<TaskInputOutputContext<?,?,?,?>>(context));
+
         log.info("Aliases being processed per job phase (AliasName[line,offset]): " + job.get("pig.alias.location"));
-        
+
         String dtzStr = PigMapReduce.sJobConfInternal.get().get("pig.datetime.default.tz");
         if (dtzStr != null && dtzStr.length() > 0) {
             // ensure that the internal timezone is uniformly in UTC offset style
             DateTimeZone.setDefault(DateTimeZone.forOffsetMillis(DateTimeZone.forID(dtzStr).getOffset(null)));
         }
     }
-    
+
     /**
      * The map function that attaches the inpTuple appropriately
      * and executes the map plan if its not empty. Collects the
@@ -228,9 +231,9 @@ public abstract class PigGenericMapBase 
      * map-only or map-reduce job to implement. Map-only collects
      * the tuple as-is whereas map-reduce collects it after extracting
      * the key and indexed tuple.
-     */   
+     */
     @Override
-    protected void map(Text key, Tuple inpTuple, Context context) throws IOException, InterruptedException {     
+    protected void map(Text key, Tuple inpTuple, Context context) throws IOException, InterruptedException {
         if(!initialized) {
             initialized  = true;
             // cache the collector for use in runPipeline() which
@@ -238,32 +241,31 @@ public abstract class PigGenericMapBase 
             this.outputCollector = context;
             pigReporter.setRep(context);
             PhysicalOperator.setReporter(pigReporter);
-           
+
+            boolean aggregateWarning = "true".equalsIgnoreCase(pigContext.getProperties().getProperty("aggregate.warning"));
+            PigStatusReporter pigStatusReporter = PigStatusReporter.getInstance();
+            pigStatusReporter.setContext(new TaskContext<TaskInputOutputContext<?,?,?,?>>(context));
+            PigHadoopLogger pigHadoopLogger = PigHadoopLogger.getInstance();
+            pigHadoopLogger.setReporter(pigStatusReporter);
+            pigHadoopLogger.setAggregate(aggregateWarning);
+            PhysicalOperator.setPigLogger(pigHadoopLogger);
+
             if (!inIllustrator) {
                 for (POStore store: stores) {
-                    MapReducePOStoreImpl impl 
+                    MapReducePOStoreImpl impl
                         = new MapReducePOStoreImpl(context);
                     store.setStoreImpl(impl);
                     if (!pigContext.inIllustrator)
                         store.setUp();
                 }
             }
-            
-            boolean aggregateWarning = "true".equalsIgnoreCase(pigContext.getProperties().getProperty("aggregate.warning"));
-
-            PigHadoopLogger pigHadoopLogger = PigHadoopLogger.getInstance();
-            pigHadoopLogger.setAggregate(aggregateWarning);           
-            pigHadoopLogger.setReporter(PigStatusReporter.getInstance());
-
-            PhysicalOperator.setPigLogger(pigHadoopLogger);
-
         }
-        
+
         if (mp.isEmpty()) {
             collect(context,inpTuple);
             return;
         }
-        
+
         for (PhysicalOperator root : roots) {
             if (inIllustrator) {
                 if (root != null) {
@@ -273,7 +275,7 @@ public abstract class PigGenericMapBase 
                 root.attachInput(tf.newTupleNoCopy(inpTuple.getAll()));
             }
         }
-            
+
         runPipeline(leaf);
     }
 
@@ -284,16 +286,16 @@ public abstract class PigGenericMapBase 
                 collect(outputCollector,(Tuple)res.result);
                 continue;
             }
-            
+
             if(res.returnStatus==POStatus.STATUS_EOP) {
                 return;
             }
-            
+
             if(res.returnStatus==POStatus.STATUS_NULL)
                 continue;
-            
+
             if(res.returnStatus==POStatus.STATUS_ERR){
-                // remember that we had an issue so that in 
+                // remember that we had an issue so that in
                 // close() we can do the right thing
                 errorInMap  = true;
                 // if there is an errmessage use it
@@ -305,19 +307,19 @@ public abstract class PigGenericMapBase 
                     errMsg = "Received Error while " +
                     "processing the map plan.";
                 }
-                    
+
                 int errCode = 2055;
                 ExecException ee = new ExecException(errMsg, errCode, PigException.BUG);
                 throw ee;
             }
         }
-        
+
     }
 
     abstract public void collect(Context oc, Tuple tuple) throws InterruptedException, IOException;
 
     abstract public boolean inIllustrator(Context context);
-    
+
     /**
      * @return the keyType
      */
@@ -331,7 +333,7 @@ public abstract class PigGenericMapBase 
     public void setKeyType(byte keyType) {
         this.keyType = keyType;
     }
-    
+
     abstract public Context getIllustratorContext(Configuration conf, DataBag input,
             List<Pair<PigNullableWritable, Writable>> output, InputSplit split)
             throws IOException, InterruptedException;

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapReduce.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapReduce.java?rev=1590819&r1=1590818&r2=1590819&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapReduce.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapReduce.java Mon Apr 28 21:59:43 2014
@@ -30,6 +30,7 @@ import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapred.jobcontrol.Job;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.TaskInputOutputContext;
 import org.apache.pig.PigException;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.HDataType;
@@ -42,6 +43,7 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
+import org.apache.pig.backend.hadoop.executionengine.shims.TaskContext;
 import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.DataType;
@@ -63,20 +65,20 @@ import org.joda.time.DateTimeZone;
 /**
  * This class is the static Mapper &amp; Reducer classes that
  * are used by Pig to execute Pig Map Reduce jobs. Since
- * there is a reduce phase, the leaf is bound to be a 
+ * there is a reduce phase, the leaf is bound to be a
  * POLocalRearrange. So the map phase has to separate the
  * key and tuple and collect it into the output
  * collector.
- * 
+ *
  * The shuffle and sort phase sorts these keys &amp; tuples
  * and creates key, List&lt;Tuple&gt; and passes the key and
  * iterator to the list. The deserialized POPackage operator
- * is used to package the key, List&lt;Tuple&gt; into pigKey, 
+ * is used to package the key, List&lt;Tuple&gt; into pigKey,
  * Bag&lt;Tuple&gt; where pigKey is of the appropriate pig type and
  * then the result of the package is attached to the reduce
- * plan which is executed if its not empty. Either the result 
+ * plan which is executed if its not empty. Either the result
  * of the reduce plan or the package res is collected into
- * the output collector. 
+ * the output collector.
  *
  * The index of the tuple (that is, which bag it should be placed in by the
  * package) is packed into the key.  This is done so that hadoop sorts the
@@ -89,28 +91,28 @@ import org.joda.time.DateTimeZone;
 public class PigGenericMapReduce {
 
     public static JobContext sJobContext = null;
-    
+
     /**
-     * @deprecated Use {@link UDFContext} instead in the following way to get 
+     * @deprecated Use {@link UDFContext} instead in the following way to get
      * the job's {@link Configuration}:
      * <pre>UdfContext.getUdfContext().getJobConf()</pre>
      */
     @Deprecated
     public static Configuration sJobConf = null;
-    
+
     public static final ThreadLocal<Configuration> sJobConfInternal = new ThreadLocal<Configuration>();
-    
+
     public static class Map extends PigMapBase {
 
         @Override
-        public void collect(Context oc, Tuple tuple) 
+        public void collect(Context oc, Tuple tuple)
                 throws InterruptedException, IOException {
-            
+
             Byte index = (Byte)tuple.get(0);
             PigNullableWritable key =
                 HDataType.getWritableComparableTypes(tuple.get(1), keyType);
             NullableTuple val = new NullableTuple((Tuple)tuple.get(2));
-            
+
             // Both the key and the value need the index.  The key needs it so
             // that it can be sorted on the index in addition to the key
             // value.  The value needs it so that POPackage can properly
@@ -121,7 +123,7 @@ public class PigGenericMapReduce {
             oc.write(key, val);
         }
     }
-    
+
     /**
      * This "specialized" map class is ONLY to be used in pig queries with
      * order by a udf. A UDF used for comparison in the order by expects
@@ -131,9 +133,9 @@ public class PigGenericMapReduce {
     public static class MapWithComparator extends PigMapBase {
 
         @Override
-        public void collect(Context oc, Tuple tuple) 
+        public void collect(Context oc, Tuple tuple)
                 throws InterruptedException, IOException {
-            
+
             Object keyTuple = null;
             if(keyType != DataType.TUPLE) {
                 Object k = tuple.get(1);
@@ -141,13 +143,13 @@ public class PigGenericMapReduce {
             } else {
                 keyTuple = tuple.get(1);
             }
-            
+
 
             Byte index = (Byte)tuple.get(0);
             PigNullableWritable key =
                 HDataType.getWritableComparableTypes(keyTuple, DataType.TUPLE);
             NullableTuple val = new NullableTuple((Tuple)tuple.get(2));
-            
+
             // Both the key and the value need the index.  The key needs it so
             // that it can be sorted on the index in addition to the key
             // value.  The value needs it so that POPackage can properly
@@ -165,9 +167,9 @@ public class PigGenericMapReduce {
     public static class MapWithPartitionIndex extends Map {
 
         @Override
-        public void collect(Context oc, Tuple tuple) 
+        public void collect(Context oc, Tuple tuple)
                 throws InterruptedException, IOException {
-            
+
             Byte tupleKeyIdx = 2;
             Byte tupleValIdx = 3;
 
@@ -189,13 +191,13 @@ public class PigGenericMapReduce {
             NullablePartitionWritable wrappedKey = new NullablePartitionWritable(key);
 
             NullableTuple val = new NullableTuple((Tuple)tuple.get(tupleValIdx));
-            
+
             // Both the key and the value need the index.  The key needs it so
             // that it can be sorted on the index in addition to the key
             // value.  The value needs it so that POPackage can properly
             // assign the tuple to its slot in the projection.
             wrappedKey.setIndex(index);
-            
+
             // set the partition
             wrappedKey.setPartition(partitionIndex);
             val.setIndex(index);
@@ -203,14 +205,14 @@ public class PigGenericMapReduce {
         }
 
         @Override
-        protected void runPipeline(PhysicalOperator leaf) 
+        protected void runPipeline(PhysicalOperator leaf)
                 throws IOException, InterruptedException {
-            
+
             while(true){
                 Result res = leaf.getNextTuple();
-                
+
                 if(res.returnStatus==POStatus.STATUS_OK){
-                    // For POPartitionRearrange, the result is a bag. 
+                    // For POPartitionRearrange, the result is a bag.
                     // This operator is used for skewed join
                     if (res.result instanceof DataBag) {
                         Iterator<Tuple> its = ((DataBag)res.result).iterator();
@@ -222,7 +224,7 @@ public class PigGenericMapReduce {
                     }
                     continue;
                 }
-                
+
                 if(res.returnStatus==POStatus.STATUS_EOP) {
                     return;
                 }
@@ -232,7 +234,7 @@ public class PigGenericMapReduce {
                 }
 
                 if(res.returnStatus==POStatus.STATUS_ERR){
-                    // remember that we had an issue so that in 
+                    // remember that we had an issue so that in
                     // close() we can do the right thing
                     errorInMap  = true;
                     // if there is an errmessage use it
@@ -252,39 +254,39 @@ public class PigGenericMapReduce {
         }
     }
 
-    abstract public static class Reduce 
+    abstract public static class Reduce
             extends Reducer <PigNullableWritable, NullableTuple, PigNullableWritable, Writable> {
-        
+
         protected final Log log = LogFactory.getLog(getClass());
-        
+
         //The reduce plan
         protected PhysicalPlan rp = null;
 
         // Store operators
         protected List<POStore> stores;
-        
+
         //The POPackage operator which is the
         //root of every Map Reduce plan is
         //obtained through the job conf. The portion
         //remaining after its removal is the reduce
         //plan
         protected POPackage pack;
-        
+
         ProgressableReporter pigReporter;
 
         protected Context outputCollector;
 
         protected boolean errorInReduce = false;
-        
+
         PhysicalOperator[] roots;
 
         private PhysicalOperator leaf;
-        
+
         PigContext pigContext = null;
         protected volatile boolean initialized = false;
-        
+
         private boolean inIllustrator = false;
-        
+
         /**
          * Set the reduce plan: to be used by local runner for illustrator
          * @param plan Reduce plan
@@ -312,7 +314,7 @@ public class PigGenericMapReduce {
             try {
                 PigContext.setPackageImportList((ArrayList<String>)ObjectSerializer.deserialize(jConf.get("udf.import.list")));
                 pigContext = (PigContext)ObjectSerializer.deserialize(jConf.get("pig.pigContext"));
-                
+
                 // This attempts to fetch all of the generated code from the distributed cache, and resolve it
                 SchemaTupleBackend.initialize(jConf, pigContext);
 
@@ -336,36 +338,37 @@ public class PigGenericMapReduce {
                     roots = rp.getRoots().toArray(new PhysicalOperator[1]);
                     leaf = rp.getLeaves().get(0);
                 }
-                
+
                 // Get the UDF specific context
             	MapRedUtil.setupUDFContext(jConf);
-            
+
             } catch (IOException ioe) {
                 String msg = "Problem while configuring reduce plan.";
                 throw new RuntimeException(msg, ioe);
             }
+
             log.info("Aliases being processed per job phase (AliasName[line,offset]): " + jConf.get("pig.alias.location"));
-            
+
             String dtzStr = PigMapReduce.sJobConfInternal.get().get("pig.datetime.default.tz");
             if (dtzStr != null && dtzStr.length() > 0) {
                 // ensure that the internal timezone is uniformly in UTC offset style
                 DateTimeZone.setDefault(DateTimeZone.forOffsetMillis(DateTimeZone.forID(dtzStr).getOffset(null)));
             }
         }
-        
+
         /**
          * The reduce function which packages the key and List&lt;Tuple&gt;
          * into key, Bag&lt;Tuple&gt; after converting Hadoop type key into Pig type.
          * The package result is either collected as is, if the reduce plan is
          * empty or after passing through the reduce plan.
-         */       
+         */
         @Override
-        protected void reduce(PigNullableWritable key, Iterable<NullableTuple> tupIter, Context context) 
-                throws IOException, InterruptedException {            
-            
+        protected void reduce(PigNullableWritable key, Iterable<NullableTuple> tupIter, Context context)
+                throws IOException, InterruptedException {
+
             if (!initialized) {
                 initialized = true;
-                
+
                 // cache the collector for use in runPipeline()
                 // which could additionally be called from close()
                 this.outputCollector = context;
@@ -373,27 +376,26 @@ public class PigGenericMapReduce {
                 PhysicalOperator.setReporter(pigReporter);
 
                 boolean aggregateWarning = "true".equalsIgnoreCase(pigContext.getProperties().getProperty("aggregate.warning"));
-
+                PigStatusReporter pigStatusReporter = PigStatusReporter.getInstance();
+                pigStatusReporter.setContext(new TaskContext<TaskInputOutputContext<?,?,?,?>>(context));
                 PigHadoopLogger pigHadoopLogger = PigHadoopLogger.getInstance();
+                pigHadoopLogger.setReporter(pigStatusReporter);
                 pigHadoopLogger.setAggregate(aggregateWarning);
-                PigStatusReporter.setContext(context);
-                pigHadoopLogger.setReporter(PigStatusReporter.getInstance());
-                
                 PhysicalOperator.setPigLogger(pigHadoopLogger);
 
                 if (!inIllustrator)
                     for (POStore store: stores) {
-                        MapReducePOStoreImpl impl 
+                        MapReducePOStoreImpl impl
                             = new MapReducePOStoreImpl(context);
                         store.setStoreImpl(impl);
                         store.setUp();
                     }
             }
-          
+
             // In the case we optimize the join, we combine
             // POPackage and POForeach - so we could get many
             // tuples out of the getnext() call of POJoinPackage
-            // In this case, we process till we see EOP from 
+            // In this case, we process till we see EOP from
             // POJoinPacakage.getNext()
             if (pack.getPkgr() instanceof JoinPackager)
             {
@@ -409,18 +411,18 @@ public class PigGenericMapReduce {
                 // give only one tuple out for the key
                 pack.attachInput(key, tupIter.iterator());
                 processOnePackageOutput(context);
-            } 
+            }
         }
-        
+
         // return: false-more output
         //         true- end of processing
-        public boolean processOnePackageOutput(Context oc) 
+        public boolean processOnePackageOutput(Context oc)
                 throws IOException, InterruptedException {
 
             Result res = pack.getNextTuple();
             if(res.returnStatus==POStatus.STATUS_OK){
                 Tuple packRes = (Tuple)res.result;
-                
+
                 if(rp.isEmpty()){
                     oc.write(null, packRes);
                     return false;
@@ -429,35 +431,35 @@ public class PigGenericMapReduce {
                     roots[i].attachInput(packRes);
                 }
                 runPipeline(leaf);
-                
+
             }
-            
+
             if(res.returnStatus==POStatus.STATUS_NULL) {
                 return false;
             }
-            
+
             if(res.returnStatus==POStatus.STATUS_ERR){
                 int errCode = 2093;
                 String msg = "Encountered error in package operator while processing group.";
                 throw new ExecException(msg, errCode, PigException.BUG);
             }
-            
+
             if(res.returnStatus==POStatus.STATUS_EOP) {
                 return true;
             }
-                
+
             return false;
-            
+
         }
-        
+
         /**
          * @param leaf
          * @throws InterruptedException
-         * @throws IOException 
+         * @throws IOException
          */
-        protected void runPipeline(PhysicalOperator leaf) 
+        protected void runPipeline(PhysicalOperator leaf)
                 throws InterruptedException, IOException {
-            
+
             while(true)
             {
                 Result redRes = leaf.getNextTuple();
@@ -469,17 +471,17 @@ public class PigGenericMapReduce {
                     }
                     continue;
                 }
-                
+
                 if(redRes.returnStatus==POStatus.STATUS_EOP) {
                     return;
                 }
-                
+
                 if(redRes.returnStatus==POStatus.STATUS_NULL) {
                     continue;
                 }
-                
+
                 if(redRes.returnStatus==POStatus.STATUS_ERR){
-                    // remember that we had an issue so that in 
+                    // remember that we had an issue so that in
                     // close() we can do the right thing
                     errorInReduce   = true;
                     // if there is an errmessage use it
@@ -496,22 +498,22 @@ public class PigGenericMapReduce {
                 }
             }
         }
-        
+
         /**
          * Will be called once all the intermediate keys and values are
          * processed. So right place to stop the reporter thread.
          */
-        @Override 
+        @Override
         protected void cleanup(Context context) throws IOException, InterruptedException {
             super.cleanup(context);
-            
+
             if(errorInReduce) {
                 // there was an error in reduce - just return
                 return;
             }
-            
+
             if(PigMapReduce.sJobConfInternal.get().get("pig.stream.in.reduce", "false").equals("true")) {
-                // If there is a stream in the pipeline we could 
+                // If there is a stream in the pipeline we could
                 // potentially have more to process - so lets
                 // set the flag stating that all map input has been sent
                 // already and then lets run the pipeline one more time
@@ -524,7 +526,7 @@ public class PigGenericMapReduce {
             if (!inIllustrator) {
                 for (POStore store: stores) {
                     if (!initialized) {
-                        MapReducePOStoreImpl impl 
+                        MapReducePOStoreImpl impl
                             = new MapReducePOStoreImpl(context);
                         store.setStoreImpl(impl);
                         store.setUp();
@@ -532,7 +534,7 @@ public class PigGenericMapReduce {
                     store.tearDown();
                 }
             }
-                        
+
             //Calling EvalFunc.finish()
             UDFFinishVisitor finisher = new UDFFinishVisitor(rp, new DependencyOrderWalker<PhysicalOperator, PhysicalPlan>(rp));
             try {
@@ -540,14 +542,14 @@ public class PigGenericMapReduce {
             } catch (VisitorException e) {
                 throw new IOException("Error trying to finish UDFs",e);
             }
-            
+
             PhysicalOperator.setReporter(null);
             initialized = false;
         }
-        
+
         /**
          * Get reducer's illustrator context
-         * 
+         *
          * @param input Input buffer as output by maps
          * @param pkg package
          * @return reducer's illustrator context
@@ -556,25 +558,25 @@ public class PigGenericMapReduce {
          */
         abstract public Context getIllustratorContext(Job job,
                List<Pair<PigNullableWritable, Writable>> input, POPackage pkg) throws IOException, InterruptedException;
-        
+
         abstract public boolean inIllustrator(Context context);
-        
+
         abstract public POPackage getPack(Context context);
     }
-    
+
     /**
      * This "specialized" reduce class is ONLY to be used in pig queries with
      * order by a udf. A UDF used for comparison in the order by expects
      * to be handed tuples. Hence a specialized map class (PigMapReduce.MapWithComparator)
-     * ensures that the "key" used in the order by is wrapped into a tuple (if it 
+     * ensures that the "key" used in the order by is wrapped into a tuple (if it
      * isn't already a tuple). This reduce class unwraps this tuple in the case where
      * the map had wrapped into a tuple and handes the "unwrapped" key to the POPackage
      * for processing
      */
     public static class ReduceWithComparator extends PigMapReduce.Reduce {
-        
+
         private byte keyType;
-        
+
         /**
          * Configures the Reduce plan, the POPackage operator
          * and the reporter thread
@@ -592,12 +594,12 @@ public class PigGenericMapReduce {
          * empty or after passing through the reduce plan.
          */
         @Override
-        protected void reduce(PigNullableWritable key, Iterable<NullableTuple> tupIter, Context context) 
+        protected void reduce(PigNullableWritable key, Iterable<NullableTuple> tupIter, Context context)
                 throws IOException, InterruptedException {
-            
+
             if (!initialized) {
                 initialized = true;
-                
+
                 // cache the collector for use in runPipeline()
                 // which could additionally be called from close()
                 this.outputCollector = context;
@@ -605,24 +607,23 @@ public class PigGenericMapReduce {
                 PhysicalOperator.setReporter(pigReporter);
 
                 boolean aggregateWarning = "true".equalsIgnoreCase(pigContext.getProperties().getProperty("aggregate.warning"));
-                
+                PigStatusReporter pigStatusReporter = PigStatusReporter.getInstance();
+                pigStatusReporter.setContext(new TaskContext<TaskInputOutputContext<?,?,?,?>>(context));
                 PigHadoopLogger pigHadoopLogger = PigHadoopLogger.getInstance();
+                pigHadoopLogger.setReporter(pigStatusReporter);
                 pigHadoopLogger.setAggregate(aggregateWarning);
-                PigStatusReporter.setContext(context);
-                pigHadoopLogger.setReporter(PigStatusReporter.getInstance());
-
                 PhysicalOperator.setPigLogger(pigHadoopLogger);
-                
+
                 for (POStore store: stores) {
-                    MapReducePOStoreImpl impl 
+                    MapReducePOStoreImpl impl
                         = new MapReducePOStoreImpl(context);
                     store.setStoreImpl(impl);
                     store.setUp();
                 }
             }
-            
+
             // If the keyType is not a tuple, the MapWithComparator.collect()
-            // would have wrapped the key into a tuple so that the 
+            // would have wrapped the key into a tuple so that the
             // comparison UDF used in the order by can process it.
             // We need to unwrap the key out of the tuple and hand it
             // to the POPackage for processing
@@ -634,31 +635,31 @@ public class PigGenericMapReduce {
                     throw e;
                 }
             }
-            
+
             pack.attachInput(key, tupIter.iterator());
-            
+
             Result res = pack.getNextTuple();
             if(res.returnStatus==POStatus.STATUS_OK){
                 Tuple packRes = (Tuple)res.result;
-                
+
                 if(rp.isEmpty()){
                     context.write(null, packRes);
                     return;
                 }
-                
+
                 rp.attachInput(packRes);
 
                 List<PhysicalOperator> leaves = rp.getLeaves();
-                
+
                 PhysicalOperator leaf = leaves.get(0);
                 runPipeline(leaf);
-                
+
             }
-            
+
             if(res.returnStatus==POStatus.STATUS_NULL) {
                 return;
             }
-            
+
             if(res.returnStatus==POStatus.STATUS_ERR){
                 int errCode = 2093;
                 String msg = "Encountered error in package operator while processing group.";
@@ -668,5 +669,5 @@ public class PigGenericMapReduce {
         }
 
     }
-   
+
 }

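For readers tracking the refactor, the reduce-side hunks above all reduce to one wiring pattern: wrap the raw Hadoop context in the new TaskContext shim, hand it to the PigStatusReporter singleton, and point PigHadoopLogger at that reporter. A minimal sketch of the pattern, assuming the surrounding task class; context and aggregateWarning are the variables from the hunks, everything else is elided:

    // Runs once, in the task's first-record initialization block.
    // TaskContext wraps the raw Hadoop context so the same reporter code
    // works against both the hadoop20 and hadoop23 shims.
    PigStatusReporter pigStatusReporter = PigStatusReporter.getInstance();
    pigStatusReporter.setContext(
            new TaskContext<TaskInputOutputContext<?,?,?,?>>(context));

    PigHadoopLogger pigHadoopLogger = PigHadoopLogger.getInstance();
    pigHadoopLogger.setReporter(pigStatusReporter);
    pigHadoopLogger.setAggregate(aggregateWarning);
    PhysicalOperator.setPigLogger(pigHadoopLogger);
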
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigHadoopLogger.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigHadoopLogger.java?rev=1590819&r1=1590818&r2=1590819&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigHadoopLogger.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigHadoopLogger.java Mon Apr 28 21:59:43 2014
@@ -35,25 +35,38 @@ import org.apache.pig.tools.pigstats.Pig
  * warning messages
  */
 public final class PigHadoopLogger implements PigLogger {
-    private static class PigHadoopLoggerHelper {
-        private static PigHadoopLogger instance = new PigHadoopLogger();
-    }
-
-    public static PigHadoopLogger getInstance() {
-        return PigHadoopLoggerHelper.instance;
-    }
 
     private static Log log = LogFactory.getLog(PigHadoopLogger.class);
+    private static PigHadoopLogger logger = null;
 
     private PigStatusReporter reporter = null;
-
     private boolean aggregate = false;
-
     private Map<Object, String> msgMap = new WeakHashMap<Object, String>();
 
     private PigHadoopLogger() {
     }
 
+    /**
+     * Get the singleton instance of the logger
+     */
+    public static PigHadoopLogger getInstance() {
+        if (logger == null) {
+            logger = new PigHadoopLogger();
+        }
+        return logger;
+    }
+
+    public void destroy() {
+        if (reporter != null) {
+            reporter.destroy();
+        }
+        reporter = null;
+    }
+
+    public synchronized void setReporter(PigStatusReporter reporter) {
+        this.reporter = reporter;
+    }
+
     @SuppressWarnings("rawtypes")
     public void warn(Object o, String msg, Enum warningEnum) {
         String className = o.getClass().getName();
@@ -61,15 +74,15 @@ public final class PigHadoopLogger imple
 
         if (getAggregate()) {
             if (reporter != null) {
-                // log atleast once
+                // log at least once
                 if (msgMap.get(o) == null || !msgMap.get(o).equals(displayMessage)) {
                     log.warn(displayMessage);
                     msgMap.put(o, displayMessage);
                 }
                 if (o instanceof EvalFunc || o instanceof LoadFunc || o instanceof StoreFunc) {
-                    reporter.getCounter(className, warningEnum.name()).increment(1);
+                    reporter.incrCounter(className, warningEnum.name(), 1);
                 } else {
-                    reporter.getCounter(warningEnum).increment(1);
+                    reporter.incrCounter(warningEnum, 1);
                 }
             } else {
                 //TODO:
@@ -87,10 +100,6 @@ public final class PigHadoopLogger imple
         }
     }
 
-    public synchronized void setReporter(PigStatusReporter rep) {
-        this.reporter = rep;
-    }
-
     public synchronized boolean getAggregate() {
         return aggregate;
     }
@@ -98,5 +107,4 @@ public final class PigHadoopLogger imple
     public synchronized void setAggregate(boolean aggregate) {
         this.aggregate = aggregate;
     }
-
 }

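With the reporter injected through setReporter(), a UDF warning now reaches a counter via incrCounter() rather than the old getCounter().increment() pair. A hypothetical EvalFunc illustrating the call path; the class and its logic are invented for illustration, while warn() and PigWarning are existing Pig API:

    import java.io.IOException;
    import org.apache.pig.EvalFunc;
    import org.apache.pig.PigWarning;
    import org.apache.pig.data.Tuple;

    public class UpperCase extends EvalFunc<String> {
        @Override
        public String exec(Tuple input) throws IOException {
            if (input == null || input.size() == 0 || input.get(0) == null) {
                // EvalFunc.warn() forwards to PigHadoopLogger.warn(); with
                // aggregation enabled that becomes
                // reporter.incrCounter(className, warningEnum.name(), 1).
                warn("null input, skipping", PigWarning.UDF_WARNING_1);
                return null;
            }
            return ((String) input.get(0)).toUpperCase();
        }
    }
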
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduceCounter.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduceCounter.java?rev=1590819&r1=1590818&r2=1590819&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduceCounter.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduceCounter.java Mon Apr 28 21:59:43 2014
@@ -75,10 +75,9 @@ public class PigMapReduceCounter {
             try {
                 PigStatusReporter reporter = PigStatusReporter.getInstance();
                 if (reporter != null) {
-                    reporter.getCounter(
+                    reporter.incrCounter(
                             JobControlCompiler.PIG_MAP_RANK_NAME
-                            + context.getJobID().toString(), taskID)
-                            .increment(1);
+                            + context.getJobID().toString(), taskID, 1);
                 }
             } catch (Exception ex) {
                 log.error("Error on incrementer of PigMapCounter");
@@ -135,10 +134,9 @@ public class PigMapReduceCounter {
                 if (reporter != null) {
 
                     if(leaf instanceof POCounter){
-                        reporter.getCounter(
+                        reporter.incrCounter(
                                 JobControlCompiler.PIG_MAP_RANK_NAME
-                                + context.getJobID().toString(), taskID).increment(increment);
-
+                                + context.getJobID().toString(), taskID, increment);
                     }
 
                 }

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigRecordReader.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigRecordReader.java?rev=1590819&r1=1590818&r2=1590819&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigRecordReader.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigRecordReader.java Mon Apr 28 21:59:43 2014
@@ -56,7 +56,7 @@ public class PigRecordReader extends Rec
     private static final Log LOG = LogFactory.getLog(PigRecordReader.class);
 
     private final static String TIMING_COUNTER = "approx_microsecs";
-    private final static int TIMING_FREQ = 100;
+    private final static long TIMING_FREQ = 100;
 
     transient private String counterGroup = "";
     private boolean doTiming = false;
@@ -214,8 +214,8 @@ public class PigRecordReader extends Rec
             }
         }
         if (timeThis) {
-            PigStatusReporter.getInstance().getCounter(counterGroup, TIMING_COUNTER).increment(
-                    ( Math.round((System.nanoTime() - startNanos) / 1000)) * TIMING_FREQ);
+            PigStatusReporter.getInstance().incrCounter(counterGroup, TIMING_COUNTER,
+                    Math.round((System.nanoTime() - startNanos) / 1000) * TIMING_FREQ);
         }
         recordCount++;
         return true;

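The timing counter above is sampled: only one record in TIMING_FREQ is timed, and the sampled duration is scaled back up, so a 50-microsecond sample with TIMING_FREQ = 100 contributes 5000 approx_microsecs. Widening TIMING_FREQ from int to long keeps that multiplication in 64-bit arithmetic. POUserFunc below applies the same pattern to UDF calls. A standalone sketch of the pattern (hypothetical class; only incrCounter() and the counter name come from the patch):

    import org.apache.pig.tools.pigstats.PigStatusReporter;

    class SampledTimer {
        private static final long TIMING_FREQ = 100;  // time 1 call in 100
        private long invocations = 0;

        void timedStep(Runnable work) {
            boolean timeThis = (invocations++ % TIMING_FREQ == 0);
            long startNanos = timeThis ? System.nanoTime() : 0L;
            work.run();
            if (timeThis) {
                long micros = (System.nanoTime() - startNanos) / 1000;
                // Scale the sample back up to approximate total time.
                PigStatusReporter.getInstance().incrCounter(
                        "sampled.timing.group", "approx_microsecs",
                        micros * TIMING_FREQ);
            }
        }
    }
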
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PigLogger.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PigLogger.java?rev=1590819&r1=1590818&r2=1590819&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PigLogger.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PigLogger.java Mon Apr 28 21:59:43 2014
@@ -27,9 +27,8 @@ import org.apache.pig.classification.Int
 @InterfaceAudience.Private
 @InterfaceStability.Evolving
 public interface PigLogger {
-    
-	/**
-	 * If you have warning messages that need aggregation 
-	 */
-    public void warn(Object o, String msg, Enum warningEnum);
+    /**
+     * Log a warning message; implementations may aggregate repeated warnings
+     */
+    public void warn(Object o, String msg, Enum<?> warningEnum);
 }

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java?rev=1590819&r1=1590818&r2=1590819&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java Mon Apr 28 21:59:43 2014
@@ -61,7 +61,7 @@ public class POUserFunc extends Expressi
     private static final Log LOG = LogFactory.getLog(POUserFunc.class);
     private final static String TIMING_COUNTER = "approx_microsecs";
     private final static String INVOCATION_COUNTER = "approx_invocations";
-    private final static int TIMING_FREQ = 100;
+    private final static long TIMING_FREQ = 100;
     private final static TupleFactory tf = TupleFactory.getInstance();
 
     private transient String counterGroup;
@@ -229,8 +229,8 @@ public class POUserFunc extends Expressi
                             if (knownSize) {
                                 rslt.set(knownIndex++, trslt.get(i));
                             } else {
-                            rslt.append(trslt.get(i));
-                        }
+                                rslt.append(trslt.get(i));
+                            }
                         }
                         continue;
                     }
@@ -238,8 +238,8 @@ public class POUserFunc extends Expressi
                 if (knownSize) {
                     ((Tuple)res.result).set(knownIndex++, temp.result);
                 } else {
-                ((Tuple)res.result).append(temp.result);
-            }
+                    ((Tuple)res.result).append(temp.result);
+                }
             }
             res.returnStatus = temp.returnStatus;
 
@@ -273,8 +273,7 @@ public class POUserFunc extends Expressi
         boolean timeThis = doTiming && (numInvocations++ % TIMING_FREQ == 0);
         if (timeThis) {
             startNanos = System.nanoTime();
-            PigStatusReporter.getInstance().getCounter(counterGroup, INVOCATION_COUNTER).increment(TIMING_FREQ);
-
+            PigStatusReporter.getInstance().incrCounter(counterGroup, INVOCATION_COUNTER, TIMING_FREQ);
         }
         try {
             if(result.returnStatus == POStatus.STATUS_OK) {
@@ -355,8 +354,8 @@ public class POUserFunc extends Expressi
                 }
             }
             if (timeThis) {
-                PigStatusReporter.getInstance().getCounter(counterGroup, TIMING_COUNTER).increment(
-                        ( Math.round((System.nanoTime() - startNanos) / 1000)) * TIMING_FREQ);
+                PigStatusReporter.getInstance().incrCounter(counterGroup, TIMING_COUNTER,
+                        Math.round((System.nanoTime() - startNanos) / 1000) * TIMING_FREQ);
             }
             return result;
         } catch (ExecException ee) {
@@ -404,13 +403,11 @@ public class POUserFunc extends Expressi
 
     @Override
     public Result getNextBoolean() throws ExecException {
-
         return getNext();
     }
 
     @Override
     public Result getNextDataByteArray() throws ExecException {
-
         return getNext();
     }
 

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/util/MonitoredUDFExecutor.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/util/MonitoredUDFExecutor.java?rev=1590819&r1=1590818&r2=1590819&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/util/MonitoredUDFExecutor.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/util/MonitoredUDFExecutor.java Mon Apr 28 21:59:43 2014
@@ -28,7 +28,6 @@ import java.util.concurrent.ScheduledThr
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
-import org.apache.hadoop.mapreduce.StatusReporter;
 import org.apache.pig.EvalFunc;
 import org.apache.pig.builtin.MonitoredUDF;
 import org.apache.pig.data.Tuple;
@@ -136,20 +135,18 @@ public class MonitoredUDFExecutor implem
         @SuppressWarnings("unchecked")
         public static void handleError(EvalFunc evalFunc, Exception e) {
             evalFunc.getLogger().error(e);
-            StatusReporter reporter = PigStatusReporter.getInstance();
-            if (reporter != null &&
-                    reporter.getCounter(evalFunc.getClass().getName(), e.toString()) != null) {
-                reporter.getCounter(evalFunc.getClass().getName(), e.toString()).increment(1L);
+            PigStatusReporter reporter = PigStatusReporter.getInstance();
+            if (reporter != null) {
+                reporter.incrCounter(evalFunc.getClass().getName(), e.toString(), 1);
             }
         }
 
         @SuppressWarnings("unchecked")
         public static void handleTimeout(EvalFunc evalFunc, Exception e) {
             evalFunc.getLogger().error(e);
-            StatusReporter reporter = PigStatusReporter.getInstance();
-            if (reporter != null &&
-                    reporter.getCounter(evalFunc.getClass().getName(), "MonitoredUDF Timeout") != null) {
-                reporter.getCounter(evalFunc.getClass().getName(), "MonitoredUDF Timeout").increment(1L);
+            PigStatusReporter reporter = PigStatusReporter.getInstance();
+            if (reporter != null) {
+                reporter.incrCounter(evalFunc.getClass().getName(), "MonitoredUDF Timeout", 1);
             }
         }
     }

Modified: pig/trunk/src/org/apache/pig/tools/counters/PigCounterHelper.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/tools/counters/PigCounterHelper.java?rev=1590819&r1=1590818&r2=1590819&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/tools/counters/PigCounterHelper.java (original)
+++ pig/trunk/src/org/apache/pig/tools/counters/PigCounterHelper.java Mon Apr 28 21:59:43 2014
@@ -20,7 +20,6 @@ package org.apache.pig.tools.counters;
 
 import java.util.Map;
 
-import org.apache.hadoop.mapreduce.Counter;
 import org.apache.pig.impl.util.Pair;
 import org.apache.pig.tools.pigstats.PigStatusReporter;
 
@@ -33,53 +32,51 @@ import com.google.common.collect.Maps;
  * stored counters each time it does.
  */
 public class PigCounterHelper {
-  private final Map<Pair<String, String>, Long> counterStringMap_ = Maps.newHashMap();
-  private final Map<Enum<?>, Long> counterEnumMap_ = Maps.newHashMap();
+    private final Map<Pair<String, String>, Long> counterStringMap_ = Maps.newHashMap();
+    private final Map<Enum<?>, Long> counterEnumMap_ = Maps.newHashMap();
 
-  /**
-   * Mocks the Reporter.incrCounter, but adds buffering.
-   * See org.apache.hadoop.mapred.Reporter's incrCounter.
-   */
-  public void incrCounter(String group, String counterName, long incr) {
-    PigStatusReporter reporter = PigStatusReporter.getInstance();
-    if (reporter != null) { // common case
-      Counter counter = reporter.getCounter(group, counterName);
-      if (counter != null) {
-        counter.increment(incr);
-
-        if (counterStringMap_.size() > 0) {
-          for (Map.Entry<Pair<String, String>, Long> entry : counterStringMap_.entrySet()) {
-            reporter.getCounter(entry.getKey().first, entry.getKey().second).increment(entry.getValue());
-          }
-          counterStringMap_.clear();
+    /**
+     * Mocks the Reporter.incrCounter, but adds buffering.
+     * See org.apache.hadoop.mapred.Reporter's incrCounter.
+     */
+    public void incrCounter(String group, String counterName, long incr) {
+        PigStatusReporter reporter = PigStatusReporter.getInstance();
+        if (reporter != null && reporter.incrCounter(group, counterName, incr)) { // common case
+            if (counterStringMap_.size() > 0) {
+                for (Map.Entry<Pair<String, String>, Long> entry : counterStringMap_.entrySet()) {
+                    reporter.incrCounter(entry.getKey().first, entry.getKey().second, entry.getValue());
+                }
+                counterStringMap_.clear();
+            }
+            return;
         }
-        return;
-      }
+
+        // If the reporter is unavailable or the increment cannot be applied
+        // yet, buffer the value in the local map.
+        Pair<String, String> key = new Pair<String, String>(group, counterName);
+        Long currentValue = counterStringMap_.get(key);
+        counterStringMap_.put(key, (currentValue == null ? 0 : currentValue) + incr);
     }
-    // In the case when reporter is not available, or we can't get the Counter,
-    // store in the local map.
-    Pair<String, String> key = new Pair<String, String>(group, counterName);
-    Long currentValue = counterStringMap_.get(key);
-    counterStringMap_.put(key, (currentValue == null ? 0 : currentValue) + incr);
-  }
-
-  /**
-   * Mocks the Reporter.incrCounter, but adds buffering.
-   * See org.apache.hadoop.mapred.Reporter's incrCounter.
-   */
-  public void incrCounter(Enum<?> key, long incr) {
-    PigStatusReporter reporter = PigStatusReporter.getInstance();
-    if (reporter != null && reporter.getCounter(key) != null) {
-      reporter.getCounter(key).increment(incr);
-      if (counterEnumMap_.size() > 0) {
-        for (Map.Entry<Enum<?>, Long> entry : counterEnumMap_.entrySet()) {
-          reporter.getCounter(entry.getKey()).increment(entry.getValue());
+
+    /**
+     * Mocks the Reporter.incrCounter, but adds buffering.
+     * See org.apache.hadoop.mapred.Reporter's incrCounter.
+     */
+    public void incrCounter(Enum<?> key, long incr) {
+        PigStatusReporter reporter = PigStatusReporter.getInstance();
+        if (reporter != null && reporter.incrCounter(key, incr)) { // common case
+            if (counterEnumMap_.size() > 0) {
+                for (Map.Entry<Enum<?>, Long> entry : counterEnumMap_.entrySet()) {
+                    reporter.incrCounter(entry.getKey(), entry.getValue());
+                }
+                counterEnumMap_.clear();
+            }
+            return;
         }
-        counterEnumMap_.clear();
-      }
-    } else { // buffer the increments
-      Long currentValue = counterEnumMap_.get(key);
-      counterEnumMap_.put(key, (currentValue == null ? 0 : currentValue) + incr);
+
+        // If the reporter is unavailable or the increment cannot be applied
+        // yet, buffer the value in the local map.
+        Long currentValue = counterEnumMap_.get(key);
+        counterEnumMap_.put(key, (currentValue == null ? 0 : currentValue) + incr);
     }
-  }
 }

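The helper's contract is unchanged by the reindentation: increments made while no TaskContext is attached are buffered in the local maps and flushed by the first incrCounter() call that succeeds. A usage sketch, with placeholder group and counter names:

    PigCounterHelper helper = new PigCounterHelper();

    // No TaskContext attached yet (e.g. on the front end): the reporter's
    // incrCounter() returns false, so the value is buffered locally.
    helper.incrCounter("myGroup", "records_seen", 1);

    // After the task runner calls PigStatusReporter.setContext(...), the
    // next call increments the live counter and flushes the buffered value.
    helper.incrCounter("myGroup", "records_seen", 1);
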
Modified: pig/trunk/src/org/apache/pig/tools/pigstats/PigStatusReporter.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/tools/pigstats/PigStatusReporter.java?rev=1590819&r1=1590818&r2=1590819&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/tools/pigstats/PigStatusReporter.java (original)
+++ pig/trunk/src/org/apache/pig/tools/pigstats/PigStatusReporter.java Mon Apr 28 21:59:43 2014
@@ -20,80 +20,85 @@ package org.apache.pig.tools.pigstats;
 
 import org.apache.hadoop.mapreduce.Counter;
 import org.apache.hadoop.mapreduce.StatusReporter;
-import org.apache.hadoop.mapreduce.TaskInputOutputContext;
 import org.apache.hadoop.util.Progressable;
-import org.apache.pig.backend.hadoop.executionengine.fetch.FetchContext;
+import org.apache.pig.backend.hadoop.executionengine.shims.TaskContext;
 import org.apache.pig.classification.InterfaceAudience;
 import org.apache.pig.classification.InterfaceStability;
 
-@SuppressWarnings("unchecked")
 @InterfaceAudience.Public
 @InterfaceStability.Evolving
 public class PigStatusReporter extends StatusReporter implements Progressable {
 
-    private TaskInputOutputContext context;
-    private FetchContext fetchContext;
-
     private static PigStatusReporter reporter = null;
 
+    private TaskContext<?> context = null;
+
+    private PigStatusReporter() {
+    }
+
     /**
      * Get the singleton instance of the reporter
      */
     public static PigStatusReporter getInstance() {
         if (reporter == null) {
-            reporter = new PigStatusReporter(null);
+            reporter = new PigStatusReporter();
         }
         return reporter;
     }
 
-    public static void setContext(TaskInputOutputContext context) {
-        reporter = new PigStatusReporter(context);
+    public void destroy() {
+        context = null;
     }
 
-    private PigStatusReporter(TaskInputOutputContext context) {
+    public void setContext(TaskContext<?> context) {
         this.context = context;
     }
 
+    /**
+     * @deprecated use {@link org.apache.pig.tools.pigstats.PigStatusReporter#incrCounter} instead.
+     * This method returns an MR counter, which is not compatible with Tez mode.
+     * Use incrCounter(), which is compatible with both MR and Tez modes.
+     */
     @Override
+    @Deprecated
     public Counter getCounter(Enum<?> name) {
-        if (fetchContext != null) {
-            return fetchContext.getCounter(name);  
-        }
-        return (context == null) ? null : context.getCounter(name); 
+        return (context == null) ? null : context.getCounter(name);
     }
 
+    /**
+     * @deprecated use {@link org.apache.pig.tools.pigstats.PigStatusReporter#incrCounter} instead.
+     * This method returns an MR counter, which is not compatible with Tez mode.
+     * Use incrCounter(), which is compatible with both MR and Tez modes.
+     */
     @Override
+    @Deprecated
     public Counter getCounter(String group, String name) {
-        if (fetchContext != null) {
-            return fetchContext.getCounter(group, name);
-        }
-        return (context == null) ? null : context.getCounter(group, name);
+        return context == null ? null : context.getCounter(group, name);
+    }
+
+    public boolean incrCounter(Enum<?> name, long incr) {
+        return context == null ? false : context.incrCounter(name, incr);
+    }
+
+    public boolean incrCounter(String group, String name, long incr) {
+        return context == null ? false : context.incrCounter(group, name, incr);
     }
 
     @Override
     public void progress() {
-        if (fetchContext == null && context != null) {
+        if (context != null) {
             context.progress();
         }
     }
 
     @Override
     public void setStatus(String status) {
-        if (fetchContext == null && context != null) {
+        if (context != null) {
             context.setStatus(status);
         }
     }
 
     public float getProgress() {
-        return 0;
+        return context == null ? 0f : context.getProgress();
     }
-
-    /**
-     * Sets a dummy counter handler for fetch tasks
-     * @param fetchContext
-     */
-    public void setFetchContext(FetchContext fetchContext) {
-        this.fetchContext = fetchContext;
-    }
-
 }
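
The net effect for counter producers: call incrCounter() and branch on its boolean result instead of null-checking a Counter, which also keeps call sites working under non-MR engines such as Tez. A minimal caller, mirroring the MonitoredUDFExecutor hunk; the group name and fallback comment are illustrative:

    PigStatusReporter reporter = PigStatusReporter.getInstance();
    // incrCounter() returns false when no TaskContext is attached (e.g. on
    // the front end), so callers no longer need a null Counter check.
    if (!reporter.incrCounter("MyUDF", "bad_records", 1)) {
        // e.g. buffer the increment locally, as PigCounterHelper does.
    }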