You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ch...@apache.org on 2014/05/02 03:08:37 UTC

svn commit: r1591802 - in /pig/trunk: ./ shims/src/hadoop20/org/apache/pig/backend/hadoop/executionengine/shims/ shims/src/hadoop23/org/apache/pig/backend/hadoop/executionengine/shims/ src/org/apache/pig/backend/hadoop/executionengine/ src/org/apache/p...

Author: cheolsoo
Date: Fri May  2 01:08:37 2014
New Revision: 1591802

URL: http://svn.apache.org/r1591802
Log:
PIG-3914: Change TaskContext to abstract class (cheolsoo)

Added:
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/TaskContext.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchTaskContext.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRTaskContext.java
Removed:
    pig/trunk/shims/src/hadoop20/org/apache/pig/backend/hadoop/executionengine/shims/TaskContext.java
    pig/trunk/shims/src/hadoop23/org/apache/pig/backend/hadoop/executionengine/shims/TaskContext.java
Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchLauncher.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReducePOStoreImpl.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapBase.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapReduce.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigRecordReader.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStore.java
    pig/trunk/src/org/apache/pig/tools/pigstats/PigStatusReporter.java

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1591802&r1=1591801&r2=1591802&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Fri May  2 01:08:37 2014
@@ -32,6 +32,8 @@ PIG-2207: Support custom counters for ag
 
 IMPROVEMENTS
 
+PIG-3914: Change TaskContext to abstract class (cheolsoo)
+
 PIG-3672: Pig should not check for hardcoded file system implementations (rohini)
 
 PIG-3860: Refactor PigStatusReporter and PigLogger for non-MR execution engine (cheolsoo)

Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/TaskContext.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/TaskContext.java?rev=1591802&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/TaskContext.java (added)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/TaskContext.java Fri May  2 01:08:37 2014
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine;
+
+import org.apache.hadoop.mapreduce.Counter;
+
+public abstract class TaskContext<T> { // Base class for task contexts; T is the backend context type.
+    public abstract T get(); // the underlying context of type T (subclasses here return the object they wrap)
+    /** Looks up a counter by enum key; the subclasses in this commit return null when they wrap no context. */
+    public abstract Counter getCounter(Enum<?> name);
+    /** Looks up a counter by group and name; same null convention as the enum-keyed lookup. */
+    public abstract Counter getCounter(String group, String name);
+    /** Adds delta to the enum-keyed counter; subclasses here return false with no context, true otherwise. */
+    public abstract boolean incrCounter(Enum<?> name, long delta);
+    /** Adds delta to the counter identified by group and name; same boolean convention as above. */
+    public abstract boolean incrCounter(String group, String name, long delta);
+    /** Progress hook; this default does nothing and is meant to be overridden where the backend supports it. */
+    public void progress() {
+    }
+    /** Returns the task progress; this default always returns 0. */
+    public float getProgress() {
+        return 0f;
+    }
+    /** Status-message hook; this default ignores the given status. */
+    public void setStatus(String status) {
+    }
+}

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchLauncher.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchLauncher.java?rev=1591802&r1=1591801&r2=1591802&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchLauncher.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchLauncher.java Fri May  2 01:08:37 2014
@@ -25,6 +25,7 @@ import org.apache.hadoop.mapreduce.TaskA
 import org.apache.pig.PigException;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
+import org.apache.pig.backend.hadoop.executionengine.TaskContext;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigHadoopLogger;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.UDFFinishVisitor;
@@ -36,7 +37,6 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStream;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
 import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
-import org.apache.pig.backend.hadoop.executionengine.shims.TaskContext;
 import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
 import org.apache.pig.data.SchemaTupleBackend;
 import org.apache.pig.impl.PigContext;
@@ -148,7 +148,7 @@ public class FetchLauncher {
 
         boolean aggregateWarning = "true".equalsIgnoreCase(conf.get("aggregate.warning"));
         PigStatusReporter pigStatusReporter = PigStatusReporter.getInstance();
-        pigStatusReporter.setContext(new TaskContext<FetchContext>(new FetchContext()));
+        pigStatusReporter.setContext(new FetchTaskContext(new FetchContext()));
         PigHadoopLogger pigHadoopLogger = PigHadoopLogger.getInstance();
         pigHadoopLogger.setReporter(pigStatusReporter);
         pigHadoopLogger.setAggregate(aggregateWarning);

Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchTaskContext.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchTaskContext.java?rev=1591802&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchTaskContext.java (added)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchTaskContext.java Fri May  2 01:08:37 2014
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.fetch;
+
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.pig.backend.hadoop.executionengine.TaskContext;
+
+/** {@link TaskContext} implementation that delegates counter operations to a {@link FetchContext}. */
+public class FetchTaskContext extends TaskContext<FetchContext> {
+    private final FetchContext context;
+
+    /** @param context fetch context to wrap; when null, lookups yield null and increments yield false */
+    public FetchTaskContext(FetchContext context) {
+        this.context = context;
+    }
+
+    /** Returns the wrapped context exactly as it was passed to the constructor. */
+    @Override
+    public FetchContext get() {
+        return context;
+    }
+
+    /** Returns the counter for the enum key, or null when no context is wrapped. */
+    @Override
+    public Counter getCounter(Enum<?> name) {
+        return context == null ? null : context.getCounter(name);
+    }
+
+    /** Returns the counter for the group/name pair, or null when no context is wrapped. */
+    @Override
+    public Counter getCounter(String group, String name) {
+        return context == null ? null : context.getCounter(group, name);
+    }
+
+    /** Adds delta to the enum-keyed counter; returns false (and does nothing) when no context is wrapped. */
+    @Override
+    public boolean incrCounter(Enum<?> name, long delta) {
+        if (context == null) {
+            return false;
+        }
+        context.getCounter(name).increment(delta);
+        return true;
+    }
+
+    /** Adds delta to the group/name counter; returns false (and does nothing) when no context is wrapped. */
+    @Override
+    public boolean incrCounter(String group, String name, long delta) {
+        if (context == null) {
+            return false;
+        }
+        context.getCounter(group, name).increment(delta);
+        return true;
+    }
+    // progress(), getProgress() and setStatus() are not overridden, so the TaskContext defaults apply.
+}

Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRTaskContext.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRTaskContext.java?rev=1591802&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRTaskContext.java (added)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRTaskContext.java Fri May  2 01:08:37 2014
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer;
+
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.TaskInputOutputContext;
+import org.apache.pig.backend.hadoop.executionengine.TaskContext;
+
+/** {@link TaskContext} backed by a Hadoop {@link TaskInputOutputContext}; a null context is tolerated. */
+public class MRTaskContext extends TaskContext<TaskInputOutputContext<?,?,?,?>> {
+    private final TaskInputOutputContext<?,?,?,?> context;
+
+    public MRTaskContext(TaskInputOutputContext<?,?,?,?> context) {
+        this.context = context;
+    }
+
+    /** Returns the wrapped context exactly as it was passed to the constructor. */
+    @Override
+    public TaskInputOutputContext<?,?,?,?> get() {
+        return context;
+    }
+
+    /** Returns the counter for the enum key, or null when no context is wrapped. */
+    @Override
+    public Counter getCounter(Enum<?> name) {
+        return context == null ? null : context.getCounter(name);
+    }
+
+    /** Returns the counter for the group/name pair, or null when no context is wrapped. */
+    @Override
+    public Counter getCounter(String group, String name) {
+        return context == null ? null : context.getCounter(group, name);
+    }
+
+    /** Adds delta to the enum-keyed counter; returns false (and does nothing) when no context is wrapped. */
+    @Override
+    public boolean incrCounter(Enum<?> name, long delta) {
+        if (context == null) {
+            return false;
+        }
+        context.getCounter(name).increment(delta);
+        return true;
+    }
+
+    /** Adds delta to the group/name counter; returns false (and does nothing) when no context is wrapped. */
+    @Override
+    public boolean incrCounter(String group, String name, long delta) {
+        if (context == null) {
+            return false;
+        }
+        context.getCounter(group, name).increment(delta);
+        return true;
+    }
+
+    /** Forwards to the wrapped context's progress(); does nothing when no context is wrapped. */
+    @Override
+    public void progress() {
+        if (context != null) {
+            context.progress();
+        }
+    }
+
+    /** Forwards the status string to the wrapped context; does nothing when no context is wrapped. */
+    @Override
+    public void setStatus(String status) {
+        if (context != null) {
+            context.setStatus(status);
+        }
+    }
+}

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReducePOStoreImpl.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReducePOStoreImpl.java?rev=1591802&r1=1591801&r2=1591802&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReducePOStoreImpl.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReducePOStoreImpl.java Fri May  2 01:08:37 2014
@@ -20,7 +20,6 @@ package org.apache.pig.backend.hadoop.ex
 import java.io.IOException;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.Counter;
 import org.apache.hadoop.mapreduce.OutputFormat;
 import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
@@ -29,8 +28,7 @@ import org.apache.pig.StoreFuncInterface
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStoreImpl;
 import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
-import org.apache.pig.backend.hadoop.executionengine.shims.TaskContext;
-import org.apache.pig.tools.pigstats.mapreduce.MRPigStatsUtil;
+import org.apache.pig.tools.pigstats.PigStatsUtil;
 import org.apache.pig.tools.pigstats.PigStatusReporter;
 /**
  * This class is used to have a POStore write to DFS via a output
@@ -51,7 +49,7 @@ public class MapReducePOStoreImpl extend
         // not affect the caller's copy
         Configuration outputConf = new Configuration(context.getConfiguration());
         reporter = PigStatusReporter.getInstance();
-        reporter.setContext(new TaskContext<TaskInputOutputContext<?,?,?,?>>(context));
+        reporter.setContext(new MRTaskContext(context));
 
         // make a copy of the Context to use here - since in the same
         // task (map or reduce) we could have multiple stores, we should
@@ -113,9 +111,7 @@ public class MapReducePOStoreImpl extend
         }
     }
 
-    public Counter createRecordCounter(POStore store) {
-        String name = MRPigStatsUtil.getMultiStoreCounterName(store);
-        return (name == null) ? null : reporter.getCounter(
-                MRPigStatsUtil.MULTI_STORE_COUNTER_GROUP, name);
+    public void incrRecordCounter(String name, long incr) { // adds incr to counter `name` via the status reporter
+        reporter.incrCounter(PigStatsUtil.MULTI_STORE_COUNTER_GROUP, name, incr); // group is fixed to MULTI_STORE_COUNTER_GROUP
     }
 }

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java?rev=1591802&r1=1591801&r2=1591802&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java Fri May  2 01:08:37 2014
@@ -27,7 +27,6 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.Reducer;
-import org.apache.hadoop.mapreduce.TaskInputOutputContext;
 import org.apache.log4j.PropertyConfigurator;
 import org.apache.pig.PigException;
 import org.apache.pig.backend.executionengine.ExecException;
@@ -38,7 +37,6 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.JoinPackager;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
-import org.apache.pig.backend.hadoop.executionengine.shims.TaskContext;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.io.NullableTuple;
@@ -142,7 +140,7 @@ public class PigCombiner {
 
                 boolean aggregateWarning = "true".equalsIgnoreCase(pigContext.getProperties().getProperty("aggregate.warning"));
                 PigStatusReporter pigStatusReporter = PigStatusReporter.getInstance();
-                pigStatusReporter.setContext(new TaskContext<TaskInputOutputContext<?,?,?,?>>(context));
+                pigStatusReporter.setContext(new MRTaskContext(context));
                 PigHadoopLogger pigHadoopLogger = PigHadoopLogger.getInstance();
                 pigHadoopLogger.setReporter(pigStatusReporter);
                 pigHadoopLogger.setAggregate(aggregateWarning);

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapBase.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapBase.java?rev=1591802&r1=1591801&r2=1591802&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapBase.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapBase.java Fri May  2 01:08:37 2014
@@ -31,7 +31,6 @@ import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.TaskInputOutputContext;
 import org.apache.log4j.PropertyConfigurator;
 import org.apache.pig.PigException;
 import org.apache.pig.backend.executionengine.ExecException;
@@ -42,7 +41,6 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
-import org.apache.pig.backend.hadoop.executionengine.shims.TaskContext;
 import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.SchemaTupleBackend;
@@ -159,7 +157,7 @@ public abstract class PigGenericMapBase 
      */
     @SuppressWarnings("unchecked")
     @Override
-    public void setup(Context context) throws IOException, InterruptedException {       	
+    public void setup(Context context) throws IOException, InterruptedException {
         super.setup(context);
 
         Configuration job = context.getConfiguration();
@@ -212,7 +210,7 @@ public abstract class PigGenericMapBase 
         }
 
         PigStatusReporter pigStatusReporter = PigStatusReporter.getInstance();
-        pigStatusReporter.setContext(new TaskContext<TaskInputOutputContext<?,?,?,?>>(context));
+        pigStatusReporter.setContext(new MRTaskContext(context));
 
         log.info("Aliases being processed per job phase (AliasName[line,offset]): " + job.get("pig.alias.location"));
 
@@ -244,7 +242,7 @@ public abstract class PigGenericMapBase 
 
             boolean aggregateWarning = "true".equalsIgnoreCase(pigContext.getProperties().getProperty("aggregate.warning"));
             PigStatusReporter pigStatusReporter = PigStatusReporter.getInstance();
-            pigStatusReporter.setContext(new TaskContext<TaskInputOutputContext<?,?,?,?>>(context));
+            pigStatusReporter.setContext(new MRTaskContext(context));
             PigHadoopLogger pigHadoopLogger = PigHadoopLogger.getInstance();
             pigHadoopLogger.setReporter(pigStatusReporter);
             pigHadoopLogger.setAggregate(aggregateWarning);

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapReduce.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapReduce.java?rev=1591802&r1=1591801&r2=1591802&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapReduce.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapReduce.java Fri May  2 01:08:37 2014
@@ -30,7 +30,6 @@ import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapred.jobcontrol.Job;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.Reducer;
-import org.apache.hadoop.mapreduce.TaskInputOutputContext;
 import org.apache.pig.PigException;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.HDataType;
@@ -43,7 +42,6 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
-import org.apache.pig.backend.hadoop.executionengine.shims.TaskContext;
 import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.DataType;
@@ -377,7 +375,7 @@ public class PigGenericMapReduce {
 
                 boolean aggregateWarning = "true".equalsIgnoreCase(pigContext.getProperties().getProperty("aggregate.warning"));
                 PigStatusReporter pigStatusReporter = PigStatusReporter.getInstance();
-                pigStatusReporter.setContext(new TaskContext<TaskInputOutputContext<?,?,?,?>>(context));
+                pigStatusReporter.setContext(new MRTaskContext(context));
                 PigHadoopLogger pigHadoopLogger = PigHadoopLogger.getInstance();
                 pigHadoopLogger.setReporter(pigStatusReporter);
                 pigHadoopLogger.setAggregate(aggregateWarning);
@@ -608,7 +606,7 @@ public class PigGenericMapReduce {
 
                 boolean aggregateWarning = "true".equalsIgnoreCase(pigContext.getProperties().getProperty("aggregate.warning"));
                 PigStatusReporter pigStatusReporter = PigStatusReporter.getInstance();
-                pigStatusReporter.setContext(new TaskContext<TaskInputOutputContext<?,?,?,?>>(context));
+                pigStatusReporter.setContext(new MRTaskContext(context));
                 PigHadoopLogger pigHadoopLogger = PigHadoopLogger.getInstance();
                 pigHadoopLogger.setReporter(pigStatusReporter);
                 pigHadoopLogger.setAggregate(aggregateWarning);

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigRecordReader.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigRecordReader.java?rev=1591802&r1=1591801&r2=1591802&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigRecordReader.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigRecordReader.java Fri May  2 01:08:37 2014
@@ -26,7 +26,6 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Counter;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.RecordReader;
@@ -36,17 +35,17 @@ import org.apache.pig.backend.hadoop.dat
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.io.FileSpec;
 import org.apache.pig.impl.util.ObjectSerializer;
-import org.apache.pig.tools.pigstats.mapreduce.MRPigStatsUtil;
+import org.apache.pig.tools.pigstats.PigStatsUtil;
 import org.apache.pig.tools.pigstats.PigStatusReporter;
 
 /**
  * A wrapper around the actual RecordReader and loadfunc - this is needed for
  * two reasons
  * 1) To intercept the initialize call from hadoop and initialize the underlying
- * actual RecordReader with the right Context object - this is achieved by 
- * looking up the Context corresponding to the input split this Reader is 
+ * actual RecordReader with the right Context object - this is achieved by
+ * looking up the Context corresponding to the input split this Reader is
  * supposed to process
- * 2) We need to give hadoop consistent key-value types - text and tuple 
+ * 2) We need to give hadoop consistent key-value types - text and tuple
  * respectively - so PigRecordReader will call underlying Loader's getNext() to
  * get the Tuple value - the key is null text since key is not used in input to
  * map() in Pig.
@@ -66,55 +65,54 @@ public class PigRecordReader extends Rec
      * {@link LoadFunc#getNext()}
      */
     Tuple curValue = null;
-    
+
     // the current wrapped RecordReader used by the loader
-    @SuppressWarnings("unchecked")
-    private RecordReader curReader;
-    
+    private RecordReader<?, ?> curReader;
+
     // the loader object
     private LoadFunc loadfunc;
-    
-    // the Hadoop counter for multi-input jobs 
-    transient private Counter inputRecordCounter = null;
-    
+
     // the Hadoop counter name
     transient private String counterName = null;
-    
+
     // the wrapped inputformat
-    private InputFormat inputformat;
-    
+    private InputFormat<?, ?> inputformat;
+
     // the wrapped splits
     private PigSplit pigSplit;
-    
+
     // the wrapped split index in use
     private int idx;
-    
+
     private long progress;
-    
+
     private TaskAttemptContext context;
-    
+
+    private PigStatusReporter reporter;
+
     private final long limit;
 
     private long recordCount = 0;
-    
+
     /**
      * the Configuration object with data specific to the input the underlying
-     * RecordReader will process (this is obtained after a 
-     * {@link LoadFunc#setLocation(String, org.apache.hadoop.mapreduce.Job)} 
+     * RecordReader will process (this is obtained after a
+     * {@link LoadFunc#setLocation(String, org.apache.hadoop.mapreduce.Job)}
      * call and hence can contain specific properties the underlying
      * {@link InputFormat} might have put in.
      */
     private Configuration inputSpecificConf;
     /**
-     * @param context 
-     * 
+     * @param context
+     *
      */
-    public PigRecordReader(InputFormat inputformat, PigSplit pigSplit, 
+    public PigRecordReader(InputFormat<?, ?> inputformat, PigSplit pigSplit,
             LoadFunc loadFunc, TaskAttemptContext context, long limit) throws IOException, InterruptedException {
         this.inputformat = inputformat;
-        this.pigSplit = pigSplit; 
+        this.pigSplit = pigSplit;
         this.loadfunc = loadFunc;
         this.context = context;
+        this.reporter = PigStatusReporter.getInstance();
         this.inputSpecificConf = context.getConfiguration();
         curReader = null;
         progress = 0;
@@ -124,7 +122,7 @@ public class PigRecordReader extends Rec
         counterGroup = loadFunc.toString();
         doTiming = context.getConfiguration().getBoolean(TIME_UDFS_PROP, false);
     }
-    
+
     @Override
     public void close() throws IOException {
         if (curReader != null) {
@@ -141,23 +139,11 @@ public class PigRecordReader extends Rec
     }
 
     @Override
-    public Tuple getCurrentValue() throws IOException, InterruptedException {    
-        if (inputRecordCounter == null && counterName != null) {
-            PigStatusReporter reporter = PigStatusReporter.getInstance();
-            if (reporter != null) {
-                inputRecordCounter = reporter.getCounter(
-                        MRPigStatsUtil.MULTI_INPUTS_COUNTER_GROUP,
-                        counterName);
-                LOG.info("Created input record counter: " + counterName);
-            } else {
-                LOG.warn("Get null reporter for " + counterName);
-            }
-        }
+    public Tuple getCurrentValue() throws IOException, InterruptedException {
         // Increment the multi-input record counter
-        if (inputRecordCounter != null && curValue != null) {
-            inputRecordCounter.increment(1);            
+        if (counterName != null && curValue != null) {
+            reporter.incrCounter(PigStatsUtil.MULTI_INPUTS_COUNTER_GROUP, counterName, 1);
         }
-       
         return curValue;
     }
 
@@ -174,8 +160,8 @@ public class PigRecordReader extends Rec
     @Override
     public void initialize(InputSplit split, TaskAttemptContext context)
             throws IOException, InterruptedException {
-        // initialize the underlying actual RecordReader with the right Context 
-        // object - this is achieved by merging the Context corresponding to 
+        // initialize the underlying actual RecordReader with the right Context
+        // object - this is achieved by merging the Context corresponding to
         // the input split this Reader is supposed to process with the context
         // passed in.
         this.pigSplit = (PigSplit)split;
@@ -184,7 +170,7 @@ public class PigRecordReader extends Rec
                 inputSpecificConf);
         // Pass loader signature to LoadFunc and to InputFormat through
         // the conf
-        PigInputFormat.passLoadSignature(loadfunc, pigSplit.getInputIndex(), 
+        PigInputFormat.passLoadSignature(loadfunc, pigSplit.getInputIndex(),
                 context.getConfiguration());
         // now invoke initialize() on underlying RecordReader with
         // the "adjusted" conf
@@ -192,9 +178,13 @@ public class PigRecordReader extends Rec
             curReader.initialize(pigSplit.getWrappedSplit(), context);
             loadfunc.prepareToRead(curReader, pigSplit);
         }
-                
-        if (pigSplit.isMultiInputs() && !pigSplit.disableCounter()) { 
+        if (pigSplit.isMultiInputs() && !pigSplit.disableCounter()) {
             counterName = getMultiInputsCounerName(pigSplit, inputSpecificConf);
+            if (counterName != null) {
+                // Create the counter. This is needed because incrCounter() may
+                // never be called in case of empty file.
+                reporter.incrCounter(PigStatsUtil.MULTI_INPUTS_COUNTER_GROUP, counterName, 0);
+            }
         }
     }
 
@@ -214,7 +204,7 @@ public class PigRecordReader extends Rec
             }
         }
         if (timeThis) {
-            PigStatusReporter.getInstance().incrCounter(counterGroup, TIMING_COUNTER,
+            reporter.incrCounter(counterGroup, TIMING_COUNTER,
                     Math.round((System.nanoTime() - startNanos) / 1000) * TIMING_FREQ);
         }
         recordCount++;
@@ -224,13 +214,13 @@ public class PigRecordReader extends Rec
     @SuppressWarnings("unchecked")
     private static String getMultiInputsCounerName(PigSplit pigSplit,
             Configuration conf) throws IOException {
-        ArrayList<FileSpec> inputs = 
+        ArrayList<FileSpec> inputs =
             (ArrayList<FileSpec>) ObjectSerializer.deserialize(
                     conf.get(PigInputFormat.PIG_INPUTS));
         String fname = inputs.get(pigSplit.getInputIndex()).getFileName();
-        return MRPigStatsUtil.getMultiInputsCounterName(fname, pigSplit.getInputIndex());
+        return PigStatsUtil.getMultiInputsCounterName(fname, pigSplit.getInputIndex());
     }
-    
+
     /**
      * Get the record reader for the next chunk in this CombineFileSplit.
      */
@@ -252,7 +242,7 @@ public class PigRecordReader extends Rec
 
         // get a record reader for the idx-th chunk
         try {
-          
+
             pigSplit.setCurrentIdx(idx);
             curReader =  inputformat.createRecordReader(pigSplit.getWrappedSplit(), context);
             LOG.info("Current split being processed "+pigSplit.getWrappedSplit());

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStore.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStore.java?rev=1591802&r1=1591801&r2=1591802&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStore.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStore.java Fri May  2 01:08:37 2014
@@ -19,9 +19,7 @@ package org.apache.pig.backend.hadoop.ex
 
 import java.io.IOException;
 import java.util.List;
-import java.util.LinkedList;
 
-import org.apache.hadoop.mapreduce.Counter;
 import org.apache.pig.PigException;
 import org.apache.pig.SortInfo;
 import org.apache.pig.StoreFuncInterface;
@@ -37,9 +35,9 @@ import org.apache.pig.impl.io.FileSpec;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.VisitorException;
-import org.apache.pig.impl.util.IdentityHashSet;
 import org.apache.pig.pen.util.ExampleTuple;
 import org.apache.pig.pen.util.LineageTracer;
+import org.apache.pig.tools.pigstats.PigStatsUtil;
 
 /**
  * The store operator which is used in two ways:
@@ -53,37 +51,36 @@ public class POStore extends PhysicalOpe
 
     private static final long serialVersionUID = 1L;
     private static Result empty = new Result(POStatus.STATUS_NULL, null);
-    transient private StoreFuncInterface storer;    
+    transient private StoreFuncInterface storer;
     transient private POStoreImpl impl;
+    transient private String counterName = null;
     private FileSpec sFile;
     private Schema schema;
-    
-    transient private Counter outputRecordCounter = null;
 
     // flag to distinguish user stores from MRCompiler stores.
     private boolean isTmpStore;
-    
+
     // flag to distinguish single store from multiquery store.
     private boolean isMultiStore;
-    
+
     // flag to indicate if the custom counter should be disabled.
     private boolean disableCounter = false;
-    
+
     // the index of multiquery store to track counters
     private int index;
-    
+
     // If we know how to reload the store, here's how. The lFile
     // FileSpec is set in PigServer.postProcess. It can be used to
     // reload this store, if the optimizer has the need.
     private FileSpec lFile;
-    
+
     // if the predecessor of store is Sort (order by)
-    // then sortInfo will have information of the sort 
+    // then sortInfo will have information of the sort
     // column names and the asc/dsc info
     private SortInfo sortInfo;
-    
+
     private String signature;
-    
+
     public POStore(OperatorKey k) {
         this(k, -1, null);
     }
@@ -91,11 +88,11 @@ public class POStore extends PhysicalOpe
     public POStore(OperatorKey k, int rp) {
         this(k, rp, null);
     }
-    
+
     public POStore(OperatorKey k, int rp, List<PhysicalOperator> inp) {
         super(k, rp, inp);
     }
-    
+
     /**
      * Set up the storer
      * @throws IOException
@@ -105,17 +102,22 @@ public class POStore extends PhysicalOpe
             try{
                 storer = impl.createStoreFunc(this);
                 if (!isTmpStore && !disableCounter && impl instanceof MapReducePOStoreImpl) {
-                    outputRecordCounter = 
-                        ((MapReducePOStoreImpl) impl).createRecordCounter(this);
+                    counterName = PigStatsUtil.getMultiStoreCounterName(this);
+                    if (counterName != null) {
+                        // Create the counter. This is needed because
+                        // incrCounter() may never be called in case of empty
+                        // file.
+                        ((MapReducePOStoreImpl) impl).incrRecordCounter(counterName, 0);
+                    }
                 }
             }catch (IOException ioe) {
                 int errCode = 2081;
-                String msg = "Unable to setup the store function.";            
+                String msg = "Unable to setup the store function.";
                 throw new ExecException(msg, errCode, PigException.BUG, ioe);
             }
         }
     }
-    
+
     /**
      * Called at the end of processing for clean up.
      * @throws IOException
@@ -125,7 +127,7 @@ public class POStore extends PhysicalOpe
             impl.tearDown();
         }
    }
-    
+
     /**
      * To perform cleanup when there is an error.
      * @throws IOException
@@ -135,7 +137,7 @@ public class POStore extends PhysicalOpe
             impl.cleanUp();
         }
     }
-    
+
     @Override
     public Result getNextTuple() throws ExecException {
         Result res = processInput();
@@ -148,8 +150,8 @@ public class POStore extends PhysicalOpe
                     illustratorMarkup(res.result, res.result, 0);
                 res = empty;
 
-                if (outputRecordCounter != null) {
-                    outputRecordCounter.increment(1);
+                if (counterName != null) {
+                    ((MapReducePOStoreImpl) impl).incrRecordCounter(counterName, 1);
                 }
                 break;
             case POStatus.STATUS_EOP:
@@ -206,11 +208,11 @@ public class POStore extends PhysicalOpe
     public FileSpec getInputSpec() {
         return lFile;
     }
-    
+
     public void setIsTmpStore(boolean tmp) {
         isTmpStore = tmp;
     }
-    
+
     public boolean isTmpStore() {
         return isTmpStore;
     }
@@ -222,12 +224,12 @@ public class POStore extends PhysicalOpe
     public void setSchema(Schema schema) {
         this.schema = schema;
     }
-    
+
     public Schema getSchema() {
         return schema;
     }
-    
-    
+
+
     public StoreFuncInterface getStoreFunc() {
         if(storer == null){
             storer = (StoreFuncInterface)PigContext.instantiateFuncFromSpec(sFile.getFuncSpec());
@@ -235,7 +237,7 @@ public class POStore extends PhysicalOpe
         }
         return storer;
     }
-    
+
     /**
      * @param sortInfo the sortInfo to set
      */
@@ -249,11 +251,11 @@ public class POStore extends PhysicalOpe
     public SortInfo getSortInfo() {
         return sortInfo;
     }
-    
+
     public String getSignature() {
         return signature;
     }
-    
+
     public void setSignature(String signature) {
         this.signature = signature;
     }
@@ -265,7 +267,7 @@ public class POStore extends PhysicalOpe
     public boolean isMultiStore() {
         return isMultiStore;
     }
-    
+
     @Override
     public Tuple illustratorMarkup(Object in, Object out, int eqClassIndex) {
         if(illustrator != null) {

Modified: pig/trunk/src/org/apache/pig/tools/pigstats/PigStatusReporter.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/tools/pigstats/PigStatusReporter.java?rev=1591802&r1=1591801&r2=1591802&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/tools/pigstats/PigStatusReporter.java (original)
+++ pig/trunk/src/org/apache/pig/tools/pigstats/PigStatusReporter.java Fri May  2 01:08:37 2014
@@ -21,7 +21,7 @@ package org.apache.pig.tools.pigstats;
 import org.apache.hadoop.mapreduce.Counter;
 import org.apache.hadoop.mapreduce.StatusReporter;
 import org.apache.hadoop.util.Progressable;
-import org.apache.pig.backend.hadoop.executionengine.shims.TaskContext;
+import org.apache.pig.backend.hadoop.executionengine.TaskContext;
 import org.apache.pig.classification.InterfaceAudience;
 import org.apache.pig.classification.InterfaceStability;