You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by da...@apache.org on 2014/04/02 00:46:08 UTC

svn commit: r1583817 - in /pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez: PigProcessor.java PigTezLogger.java TezStatusReporter.java

Author: daijy
Date: Tue Apr  1 22:46:08 2014
New Revision: 1583817

URL: http://svn.apache.org/r1583817
Log:
PIG-3829: Make custom counter work

Added:
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/PigTezLogger.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezStatusReporter.java
Modified:
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/PigProcessor.java

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/PigProcessor.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/PigProcessor.java?rev=1583817&r1=1583816&r2=1583817&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/PigProcessor.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/PigProcessor.java Tue Apr  1 22:46:08 2014
@@ -101,6 +101,12 @@ public class PigProcessor implements Log
         // for backwards compatibility with the existing code base.
         PigMapReduce.sJobConfInternal.set(conf);
 
+        boolean aggregateWarning = "true".equalsIgnoreCase(pc.getProperties().getProperty("aggregate.warning"));
+
+        PigTezLogger pigTezLogger = new PigTezLogger(new TezStatusReporter(processorContext), aggregateWarning);
+
+        PhysicalOperator.setPigLogger(pigTezLogger);
+
         LinkedList<TezTaskConfigurable> tezTCs = PlanHelper.getPhysicalOperators(execPlan, TezTaskConfigurable.class);
         for (TezTaskConfigurable tezTC : tezTCs){
             tezTC.initialize(processorContext);

Added: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/PigTezLogger.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/PigTezLogger.java?rev=1583817&view=auto
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/PigTezLogger.java (added)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/PigTezLogger.java Tue Apr  1 22:46:08 2014
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.tez;
+
+import java.util.Map;
+import java.util.WeakHashMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.EvalFunc;
+import org.apache.pig.LoadFunc;
+import org.apache.pig.StoreFunc;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PigLogger;
+
+public class PigTezLogger implements PigLogger {
+
+    private static Log log = LogFactory.getLog(PigTezLogger.class);
+
+    private TezStatusReporter reporter = null;
+
+    private boolean aggregate = false;
+
+    private Map<Object, String> msgMap = new WeakHashMap<Object, String>();
+
+    public PigTezLogger(TezStatusReporter tezStatusReporter, boolean aggregate) {
+        this.reporter = tezStatusReporter;
+        this.aggregate = aggregate;
+    }
+
+    @SuppressWarnings("rawtypes")
+    public void warn(Object o, String msg, Enum warningEnum) {
+        String className = o.getClass().getName();
+        String displayMessage = className + "(" + warningEnum + "): " + msg;
+
+        if (getAggregate()) {
+            if (reporter != null) {
+                // log atleast once
+                if (msgMap.get(o) == null || !msgMap.get(o).equals(displayMessage)) {
+                    log.warn(displayMessage);
+                    msgMap.put(o, displayMessage);
+                }
+                if (o instanceof EvalFunc || o instanceof LoadFunc || o instanceof StoreFunc) {
+                    reporter.getCounter(className, warningEnum.name()).increment(1);
+                } else {
+                    reporter.getCounter(warningEnum).increment(1);
+                }
+            } else {
+                //TODO:
+                //There is one issue in PigHadoopLogger in local mode since reporter is null.
+                //It is probably also true to PigTezLogger once tez local mode is done, need to
+                //check back. Please refer to PigHadoopLogger for detail
+                log.warn(displayMessage);
+            }
+        } else {
+            log.warn(displayMessage);
+        }
+    }
+
+    public synchronized boolean getAggregate() {
+        return aggregate;
+    }
+
+}
\ No newline at end of file

Added: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezStatusReporter.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezStatusReporter.java?rev=1583817&view=auto
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezStatusReporter.java (added)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezStatusReporter.java Tue Apr  1 22:46:08 2014
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.tez;
+
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.runtime.api.TezProcessorContext;
+
+/** Looks up Tez counters through the {@link TezProcessorContext} it was constructed with. */
+public class TezStatusReporter {
+    private final TezProcessorContext context;
+
+    public TezStatusReporter(TezProcessorContext context) {
+        this.context = context;
+    }
+
+    /** Returns the counter for {@code name}, grouped under its enum type; null if no group. */
+    public TezCounter getCounter(Enum<?> name) {
+        // getDeclaringClass(), not getClass(): constants with bodies are anonymous subclasses.
+        return getCounter(name.getDeclaringClass().getName(), name.toString());
+    }
+
+    /** Returns counter {@code name} in {@code group}, or null when the group lookup yields null. */
+    public TezCounter getCounter(String group, String name) {
+        return context.getCounters().getGroup(group) == null
+                ? null : context.getCounters().getGroup(group).findCounter(name);
+    }
+}
\ No newline at end of file