You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by da...@apache.org on 2014/04/02 00:46:08 UTC
svn commit: r1583817 - in
/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez:
PigProcessor.java PigTezLogger.java TezStatusReporter.java
Author: daijy
Date: Tue Apr 1 22:46:08 2014
New Revision: 1583817
URL: http://svn.apache.org/r1583817
Log:
PIG-3829: Make custom counter work
Added:
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/PigTezLogger.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezStatusReporter.java
Modified:
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/PigProcessor.java
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/PigProcessor.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/PigProcessor.java?rev=1583817&r1=1583816&r2=1583817&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/PigProcessor.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/PigProcessor.java Tue Apr 1 22:46:08 2014
@@ -101,6 +101,12 @@ public class PigProcessor implements Log
// for backwards compatibility with the existing code base.
PigMapReduce.sJobConfInternal.set(conf);
+ boolean aggregateWarning = "true".equalsIgnoreCase(pc.getProperties().getProperty("aggregate.warning"));
+
+ PigTezLogger pigTezLogger = new PigTezLogger(new TezStatusReporter(processorContext), aggregateWarning);
+
+ PhysicalOperator.setPigLogger(pigTezLogger);
+
LinkedList<TezTaskConfigurable> tezTCs = PlanHelper.getPhysicalOperators(execPlan, TezTaskConfigurable.class);
for (TezTaskConfigurable tezTC : tezTCs){
tezTC.initialize(processorContext);
Added: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/PigTezLogger.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/PigTezLogger.java?rev=1583817&view=auto
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/PigTezLogger.java (added)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/PigTezLogger.java Tue Apr 1 22:46:08 2014
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.tez;
+
+import java.util.Map;
+import java.util.WeakHashMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.EvalFunc;
+import org.apache.pig.LoadFunc;
+import org.apache.pig.StoreFunc;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PigLogger;
+
+public class PigTezLogger implements PigLogger {
+
+ private static Log log = LogFactory.getLog(PigTezLogger.class);
+
+ private TezStatusReporter reporter = null;
+
+ private boolean aggregate = false;
+
+ private Map<Object, String> msgMap = new WeakHashMap<Object, String>();
+
+ public PigTezLogger(TezStatusReporter tezStatusReporter, boolean aggregate) {
+ this.reporter = tezStatusReporter;
+ this.aggregate = aggregate;
+ }
+
+ @SuppressWarnings("rawtypes")
+ public void warn(Object o, String msg, Enum warningEnum) {
+ String className = o.getClass().getName();
+ String displayMessage = className + "(" + warningEnum + "): " + msg;
+
+ if (getAggregate()) {
+ if (reporter != null) {
+ // log atleast once
+ if (msgMap.get(o) == null || !msgMap.get(o).equals(displayMessage)) {
+ log.warn(displayMessage);
+ msgMap.put(o, displayMessage);
+ }
+ if (o instanceof EvalFunc || o instanceof LoadFunc || o instanceof StoreFunc) {
+ reporter.getCounter(className, warningEnum.name()).increment(1);
+ } else {
+ reporter.getCounter(warningEnum).increment(1);
+ }
+ } else {
+ //TODO:
+ //There is one issue in PigHadoopLogger in local mode since reporter is null.
+ //It is probably also true to PigTezLogger once tez local mode is done, need to
+ //check back. Please refer to PigHadoopLogger for detail
+ log.warn(displayMessage);
+ }
+ } else {
+ log.warn(displayMessage);
+ }
+ }
+
+ public synchronized boolean getAggregate() {
+ return aggregate;
+ }
+
+}
\ No newline at end of file
Added: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezStatusReporter.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezStatusReporter.java?rev=1583817&view=auto
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezStatusReporter.java (added)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezStatusReporter.java Tue Apr 1 22:46:08 2014
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.tez;
+
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.runtime.api.TezProcessorContext;
+
+public class TezStatusReporter {
+ private TezProcessorContext context;
+
+ public TezStatusReporter(TezProcessorContext context) {
+ this.context = context;
+ }
+
+ public TezCounter getCounter(Enum<?> name) {
+ if (context.getCounters().getGroup(name.getClass().getName())!=null) {
+ return context.getCounters().getGroup(name.getClass().getName()).findCounter(name.toString());
+ }
+ return null;
+ }
+
+ public TezCounter getCounter(String group, String name) {
+ if (context.getCounters().getGroup(group)!=null)
+ return context.getCounters().getGroup(group).findCounter(name);
+ return null;
+ }
+}
\ No newline at end of file