You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by an...@apache.org on 2014/01/23 06:33:48 UTC

svn commit: r1560584 - in /pig/trunk: CHANGES.txt src/org/apache/pig/LoadFunc.java src/org/apache/pig/StoreFunc.java src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigHadoopLogger.java

Author: aniket486
Date: Thu Jan 23 05:33:48 2014
New Revision: 1560584

URL: http://svn.apache.org/r1560584
Log:
PIG-2207: Support custom counters for aggregating warnings from different udfs (aniket486)

Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/src/org/apache/pig/LoadFunc.java
    pig/trunk/src/org/apache/pig/StoreFunc.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigHadoopLogger.java

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1560584&r1=1560583&r2=1560584&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Thu Jan 23 05:33:48 2014
@@ -28,6 +28,8 @@ PIG-3419: Pluggable Execution Engine (ac
 
 IMPROVEMENTS
 
+PIG-2207: Support custom counters for aggregating warnings from different udfs (aniket486)
+
 PIG-3654: Add class cache to PigContext (tmwoodruff via daijy)
 
 PIG-3463: Pig should use hadoop local mode for small jobs (aniket486)

Modified: pig/trunk/src/org/apache/pig/LoadFunc.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/LoadFunc.java?rev=1560584&r1=1560583&r2=1560584&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/LoadFunc.java (original)
+++ pig/trunk/src/org/apache/pig/LoadFunc.java Thu Jan 23 05:33:48 2014
@@ -33,6 +33,7 @@ import org.apache.hadoop.mapreduce.Recor
 import org.apache.pig.classification.InterfaceAudience;
 import org.apache.pig.classification.InterfaceStability;
 import org.apache.pig.LoadPushDown.RequiredFieldList;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigHadoopLogger;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
 import org.apache.pig.builtin.Utf8StorageConverter;
 import org.apache.pig.data.Tuple;
@@ -302,8 +303,6 @@ public abstract class LoadFunc {
      * @param warningEnum type of warning
      */
     public final void warn(String msg, Enum warningEnum) {
-        Counter counter = PigStatusReporter.getInstance().getCounter(warningEnum);
-        if (counter!=null)
-            counter.increment(1);
+        PigHadoopLogger.getInstance().warn(this, msg, warningEnum);
     }
 }

Modified: pig/trunk/src/org/apache/pig/StoreFunc.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/StoreFunc.java?rev=1560584&r1=1560583&r2=1560584&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/StoreFunc.java (original)
+++ pig/trunk/src/org/apache/pig/StoreFunc.java Thu Jan 23 05:33:48 2014
@@ -26,6 +26,7 @@ import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.OutputFormat;
 import org.apache.hadoop.mapreduce.RecordWriter;
 
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigHadoopLogger;
 import org.apache.pig.classification.InterfaceAudience;
 import org.apache.pig.classification.InterfaceStability;
 import org.apache.pig.data.Tuple;
@@ -202,7 +203,6 @@ public abstract class StoreFunc implemen
      * @param warningEnum type of warning
      */
     public final void warn(String msg, Enum warningEnum) {
-        Counter counter = PigStatusReporter.getInstance().getCounter(warningEnum);
-        counter.increment(1);
+        PigHadoopLogger.getInstance().warn(this, msg, warningEnum);
     }
 }

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigHadoopLogger.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigHadoopLogger.java?rev=1560584&r1=1560583&r2=1560584&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigHadoopLogger.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigHadoopLogger.java Thu Jan 23 05:33:48 2014
@@ -17,26 +17,31 @@
  */
 package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer;
 
+import java.util.Map;
+import java.util.WeakHashMap;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.pig.EvalFunc;
+import org.apache.pig.LoadFunc;
+import org.apache.pig.StoreFunc;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PigLogger;
 import org.apache.pig.tools.pigstats.PigStatusReporter;
 
 /**
- * 
+ *
  * A singleton class that implements the PigLogger interface
  * for use in map reduce context. Provides ability to aggregate
  * warning messages
  */
 public final class PigHadoopLogger implements PigLogger {
-    private static PigHadoopLogger instance = new PigHadoopLogger();  
-    
-    public static synchronized PigHadoopLogger getInstance() {
-        if (instance == null) {
-            instance = new PigHadoopLogger();
-        }
-        return instance;
-    } 
+    private static class PigHadoopLoggerHelper {
+        private static PigHadoopLogger instance = new PigHadoopLogger();
+    }
+
+    public static PigHadoopLogger getInstance() {
+        return PigHadoopLoggerHelper.instance;
+    }
 
     private static Log log = LogFactory.getLog(PigHadoopLogger.class);
 
@@ -44,21 +49,33 @@ public final class PigHadoopLogger imple
 
     private boolean aggregate = false;
 
+    private Map<Object, String> msgMap = new WeakHashMap<Object, String>();
+
     private PigHadoopLogger() {
-    }    
+    }
 
-    @SuppressWarnings("unchecked")
+    @SuppressWarnings("rawtypes")
     public void warn(Object o, String msg, Enum warningEnum) {
-        String displayMessage = o.getClass().getName() + ": " + msg;
-        
+        String className = o.getClass().getName();
+        String displayMessage = className + "(" + warningEnum + "): " + msg;
+
         if (getAggregate()) {
             if (reporter != null) {
-                reporter.getCounter(warningEnum).increment(1);
+                // log at least once
+                if (!msgMap.get(o).equals(displayMessage)) {
+                    log.warn(displayMessage);
+                    msgMap.put(o, displayMessage);
+                }
+                if (o instanceof EvalFunc || o instanceof LoadFunc || o instanceof StoreFunc) {
+                    reporter.getCounter(className, warningEnum.name()).increment(1);
+                } else {
+                    reporter.getCounter(warningEnum).increment(1);
+                }
             } else {
                 //TODO:
                 //in local mode of execution if the PigHadoopLogger is used initially,
-                //then aggregation cannot be performed as the reporter will be null. 
-                //The reference to a reporter is given by Hadoop at run time. 
+                //then aggregation cannot be performed as the reporter will be null.
+                //The reference to a reporter is given by Hadoop at run time.
                 //In local mode, due to the absence of Hadoop there will be no reporter
                 //Just print the warning message as is.
                 //If a warning message is printed in map reduce mode when aggregation
@@ -68,16 +85,16 @@ public final class PigHadoopLogger imple
         } else {
             log.warn(displayMessage);
         }
-    }    
+    }
 
     public synchronized void setReporter(PigStatusReporter rep) {
         this.reporter = rep;
     }
-    
+
     public synchronized boolean getAggregate() {
         return aggregate;
     }
-    
+
     public synchronized void setAggregate(boolean aggregate) {
         this.aggregate = aggregate;
     }