You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by kn...@apache.org on 2014/10/06 23:30:55 UTC

svn commit: r1629765 - in /pig/branches/branch-0.14: CHANGES.txt src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduceCounter.java

Author: knoguchi
Date: Mon Oct  6 21:30:54 2014
New Revision: 1629765

URL: http://svn.apache.org/r1629765
Log:
PIG-4220: MapReduce-based Rank failing with NPE due to missing Counters (knoguchi)

Modified:
    pig/branches/branch-0.14/CHANGES.txt
    pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduceCounter.java

Modified: pig/branches/branch-0.14/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.14/CHANGES.txt?rev=1629765&r1=1629764&r2=1629765&view=diff
==============================================================================
--- pig/branches/branch-0.14/CHANGES.txt (original)
+++ pig/branches/branch-0.14/CHANGES.txt Mon Oct  6 21:30:54 2014
@@ -84,6 +84,8 @@ OPTIMIZATIONS
  
 BUG FIXES
 
+PIG-4220: MapReduce-based Rank failing with NPE due to missing Counters (knoguchi)
+
 PIG-3985: Multiquery execution of RANK with RANK BY causes NPE (rohini)
 
 PIG-4218: Pig OrcStorage fail to load a map with null key (daijy)

Modified: pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduceCounter.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduceCounter.java?rev=1629765&r1=1629764&r2=1629765&view=diff
==============================================================================
--- pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduceCounter.java (original)
+++ pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduceCounter.java Mon Oct  6 21:30:54 2014
@@ -60,6 +60,13 @@ public class PigMapReduceCounter {
                     pOperator = mp.getPredecessors(pOperator).get(0);
                 }
             }
+
+            PigStatusReporter reporter = PigStatusReporter.getInstance();
+            if (reporter != null) {
+                reporter.incrCounter(
+                        JobControlCompiler.PIG_MAP_RANK_NAME
+                        + context.getJobID().toString(), taskID, 0);
+            }
         }
 
         /**
@@ -69,15 +76,11 @@ public class PigMapReduceCounter {
         public void collect(Context context, Tuple tuple)
         throws InterruptedException, IOException {
             context.write(null, tuple);
-            try {
-                PigStatusReporter reporter = PigStatusReporter.getInstance();
-                if (reporter != null) {
-                    reporter.incrCounter(
-                            JobControlCompiler.PIG_MAP_RANK_NAME
-                            + context.getJobID().toString(), taskID, 1);
-                }
-            } catch (Exception ex) {
-                log.error("Error on incrementer of PigMapCounter");
+            PigStatusReporter reporter = PigStatusReporter.getInstance();
+            if (reporter != null) {
+                reporter.incrCounter(
+                        JobControlCompiler.PIG_MAP_RANK_NAME
+                        + context.getJobID().toString(), taskID, 1);
             }
         }
     }
@@ -116,6 +119,7 @@ public class PigMapReduceCounter {
             }
 
             this.context = context;
+            incrementCounter(0L);
         }
 
         /**
@@ -127,21 +131,14 @@ public class PigMapReduceCounter {
          * @param increment is the value to add to the corresponding global counter.
          **/
         public static void incrementCounter(Long increment) {
-            try {
-                PigStatusReporter reporter = PigStatusReporter.getInstance();
-                if (reporter != null) {
-
-                    if(leaf instanceof POCounter){
-                        reporter.incrCounter(
-                                JobControlCompiler.PIG_MAP_RANK_NAME
-                                + context.getJobID().toString(), taskID, increment);
-                    }
-
+            PigStatusReporter reporter = PigStatusReporter.getInstance();
+            if (reporter != null) {
+                if(leaf instanceof POCounter){
+                    reporter.incrCounter(
+                            JobControlCompiler.PIG_MAP_RANK_NAME
+                            + context.getJobID().toString(), taskID, increment);
                 }
-            } catch (Exception ex) {
-                log.error("Error on incrementer of PigReduceCounter");
             }
-
         }
     }
 }