You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by an...@apache.org on 2014/01/23 06:33:48 UTC
svn commit: r1560584 - in /pig/trunk: CHANGES.txt
src/org/apache/pig/LoadFunc.java src/org/apache/pig/StoreFunc.java
src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigHadoopLogger.java
Author: aniket486
Date: Thu Jan 23 05:33:48 2014
New Revision: 1560584
URL: http://svn.apache.org/r1560584
Log:
PIG-2207: Support custom counters for aggregating warnings from different udfs (aniket486)
Modified:
pig/trunk/CHANGES.txt
pig/trunk/src/org/apache/pig/LoadFunc.java
pig/trunk/src/org/apache/pig/StoreFunc.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigHadoopLogger.java
Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1560584&r1=1560583&r2=1560584&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Thu Jan 23 05:33:48 2014
@@ -28,6 +28,8 @@ PIG-3419: Pluggable Execution Engine (ac
IMPROVEMENTS
+PIG-2207: Support custom counters for aggregating warnings from different udfs (aniket486)
+
PIG-3654: Add class cache to PigContext (tmwoodruff via daijy)
PIG-3463: Pig should use hadoop local mode for small jobs (aniket486)
Modified: pig/trunk/src/org/apache/pig/LoadFunc.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/LoadFunc.java?rev=1560584&r1=1560583&r2=1560584&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/LoadFunc.java (original)
+++ pig/trunk/src/org/apache/pig/LoadFunc.java Thu Jan 23 05:33:48 2014
@@ -33,6 +33,7 @@ import org.apache.hadoop.mapreduce.Recor
import org.apache.pig.classification.InterfaceAudience;
import org.apache.pig.classification.InterfaceStability;
import org.apache.pig.LoadPushDown.RequiredFieldList;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigHadoopLogger;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
import org.apache.pig.builtin.Utf8StorageConverter;
import org.apache.pig.data.Tuple;
@@ -302,8 +303,6 @@ public abstract class LoadFunc {
* @param warningEnum type of warning
*/
public final void warn(String msg, Enum warningEnum) {
- Counter counter = PigStatusReporter.getInstance().getCounter(warningEnum);
- if (counter!=null)
- counter.increment(1);
+ PigHadoopLogger.getInstance().warn(this, msg, warningEnum);
}
}
Modified: pig/trunk/src/org/apache/pig/StoreFunc.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/StoreFunc.java?rev=1560584&r1=1560583&r2=1560584&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/StoreFunc.java (original)
+++ pig/trunk/src/org/apache/pig/StoreFunc.java Thu Jan 23 05:33:48 2014
@@ -26,6 +26,7 @@ import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigHadoopLogger;
import org.apache.pig.classification.InterfaceAudience;
import org.apache.pig.classification.InterfaceStability;
import org.apache.pig.data.Tuple;
@@ -202,7 +203,6 @@ public abstract class StoreFunc implemen
* @param warningEnum type of warning
*/
public final void warn(String msg, Enum warningEnum) {
- Counter counter = PigStatusReporter.getInstance().getCounter(warningEnum);
- counter.increment(1);
+ PigHadoopLogger.getInstance().warn(this, msg, warningEnum);
}
}
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigHadoopLogger.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigHadoopLogger.java?rev=1560584&r1=1560583&r2=1560584&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigHadoopLogger.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigHadoopLogger.java Thu Jan 23 05:33:48 2014
@@ -17,26 +17,31 @@
*/
package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer;
+import java.util.Map;
+import java.util.WeakHashMap;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.pig.EvalFunc;
+import org.apache.pig.LoadFunc;
+import org.apache.pig.StoreFunc;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PigLogger;
import org.apache.pig.tools.pigstats.PigStatusReporter;
/**
- *
+ *
* A singleton class that implements the PigLogger interface
* for use in map reduce context. Provides ability to aggregate
* warning messages
*/
public final class PigHadoopLogger implements PigLogger {
- private static PigHadoopLogger instance = new PigHadoopLogger();
-
- public static synchronized PigHadoopLogger getInstance() {
- if (instance == null) {
- instance = new PigHadoopLogger();
- }
- return instance;
- }
+ private static class PigHadoopLoggerHelper {
+ private static PigHadoopLogger instance = new PigHadoopLogger();
+ }
+
+ public static PigHadoopLogger getInstance() {
+ return PigHadoopLoggerHelper.instance;
+ }
private static Log log = LogFactory.getLog(PigHadoopLogger.class);
@@ -44,21 +49,33 @@ public final class PigHadoopLogger imple
private boolean aggregate = false;
+ private Map<Object, String> msgMap = new WeakHashMap<Object, String>();
+
private PigHadoopLogger() {
- }
+ }
- @SuppressWarnings("unchecked")
+ @SuppressWarnings("rawtypes")
public void warn(Object o, String msg, Enum warningEnum) {
- String displayMessage = o.getClass().getName() + ": " + msg;
-
+ String className = o.getClass().getName();
+ String displayMessage = className + "(" + warningEnum + "): " + msg;
+
if (getAggregate()) {
if (reporter != null) {
- reporter.getCounter(warningEnum).increment(1);
+ // log at least once
+ if (!msgMap.get(o).equals(displayMessage)) {
+ log.warn(displayMessage);
+ msgMap.put(o, displayMessage);
+ }
+ if (o instanceof EvalFunc || o instanceof LoadFunc || o instanceof StoreFunc) {
+ reporter.getCounter(className, warningEnum.name()).increment(1);
+ } else {
+ reporter.getCounter(warningEnum).increment(1);
+ }
} else {
//TODO:
//in local mode of execution if the PigHadoopLogger is used initially,
- //then aggregation cannot be performed as the reporter will be null.
- //The reference to a reporter is given by Hadoop at run time.
+ //then aggregation cannot be performed as the reporter will be null.
+ //The reference to a reporter is given by Hadoop at run time.
//In local mode, due to the absence of Hadoop there will be no reporter
//Just print the warning message as is.
//If a warning message is printed in map reduce mode when aggregation
@@ -68,16 +85,16 @@ public final class PigHadoopLogger imple
} else {
log.warn(displayMessage);
}
- }
+ }
public synchronized void setReporter(PigStatusReporter rep) {
this.reporter = rep;
}
-
+
public synchronized boolean getAggregate() {
return aggregate;
}
-
+
public synchronized void setAggregate(boolean aggregate) {
this.aggregate = aggregate;
}