You are viewing a plain-text version of this content; the link to its canonical version was not preserved in this copy.
Posted to commits@hive.apache.org by nz...@apache.org on 2011/09/17 08:22:34 UTC

svn commit: r1171917 - in /hive/trunk: common/src/java/org/apache/hadoop/hive/conf/ conf/ ql/src/java/org/apache/hadoop/hive/ql/exec/ ql/src/java/org/apache/hadoop/hive/ql/history/ ql/src/java/org/apache/hadoop/hive/ql/stats/

Author: nzhang
Date: Sat Sep 17 06:22:33 2011
New Revision: 1171917

URL: http://svn.apache.org/viewvc?rev=1171917&view=rev
Log:
HIVE-2446. Introduction of client statistics publishers possibility (Robert Surówka via Ning Zhang)

Added:
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/stats/ClientStatsPublisher.java
Modified:
    hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
    hive/trunk/conf/hive-default.xml
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/HadoopJobExecHelper.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/history/HiveHistory.java

Modified: hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
URL: http://svn.apache.org/viewvc/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java?rev=1171917&r1=1171916&r2=1171917&view=diff
==============================================================================
--- hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (original)
+++ hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java Sat Sep 17 06:22:33 2011
@@ -123,6 +123,7 @@ public class HiveConf extends Configurat
     PREEXECHOOKS("hive.exec.pre.hooks", ""),
     POSTEXECHOOKS("hive.exec.post.hooks", ""),
     ONFAILUREHOOKS("hive.exec.failure.hooks", ""),
+    CLIENTSTATSPUBLISHERS("hive.client.stats.publishers", ""),
     EXECPARALLEL("hive.exec.parallel", false), // parallel query launching
     EXECPARALLETHREADNUMBER("hive.exec.parallel.thread.number", 8),
     HIVESPECULATIVEEXECREDUCERS("hive.mapred.reduce.tasks.speculative.execution", true),
@@ -410,7 +411,8 @@ public class HiveConf extends Configurat
         3000),  // # milliseconds to wait before the next retry
     HIVE_STATS_COLLECT_RAWDATASIZE("hive.stats.collect.rawdatasize", true),
     // should the raw data size be collected when analayzing tables
-
+    CLIENT_STATS_COUNTERS("hive.client.stats.counters", ""),
+    //Subset of counters that should be of interest for hive.client.stats.publishers (when one wants to limit their publishing). Non-display names should be used".
 
     // Concurrency
     HIVE_SUPPORT_CONCURRENCY("hive.support.concurrency", false),

Modified: hive/trunk/conf/hive-default.xml
URL: http://svn.apache.org/viewvc/hive/trunk/conf/hive-default.xml?rev=1171917&r1=1171916&r2=1171917&view=diff
==============================================================================
--- hive/trunk/conf/hive-default.xml (original)
+++ hive/trunk/conf/hive-default.xml Sat Sep 17 06:22:33 2011
@@ -551,6 +551,18 @@
 </property>
 
 <property>
+  <name>hive.client.stats.publishers</name>
+  <value></value>
+  <description>Comma-separated list of statistics publishers to be invoked with the counters of each job. A client stats publisher is specified as the fully qualified name of a Java class that implements the org.apache.hadoop.hive.ql.stats.ClientStatsPublisher interface.</description>
+</property>
+
+<property>
+  <name>hive.client.stats.counters</name>
+  <value></value>
+  <description>Subset of counters that should be of interest for hive.client.stats.publishers, for when one wants to limit what they publish. Counter names (not display names) should be used.</description>
+</property>
+
+<property>
   <name>hive.merge.mapfiles</name>
   <value>true</value>
   <description>Merge small files at the end of a map-only job</description>

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/HadoopJobExecHelper.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/HadoopJobExecHelper.java?rev=1171917&r1=1171916&r2=1171917&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/HadoopJobExecHelper.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/HadoopJobExecHelper.java Sat Sep 17 06:22:33 2011
@@ -21,6 +21,7 @@ package org.apache.hadoop.hive.ql.exec;
 import java.io.IOException;
 import java.io.Serializable;
 import java.text.SimpleDateFormat;
+import java.util.ArrayList;
 import java.util.Calendar;
 import java.util.Collections;
 import java.util.HashMap;
@@ -30,6 +31,9 @@ import java.util.Map;
 import java.util.Set;
 import java.util.Enumeration;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.common.JavaUtils;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.MapRedStats;
 import org.apache.hadoop.hive.ql.exec.Operator.ProgressCounter;
@@ -38,6 +42,7 @@ import org.apache.hadoop.hive.ql.exec.er
 import org.apache.hadoop.hive.ql.history.HiveHistory.Keys;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
+import org.apache.hadoop.hive.ql.stats.ClientStatsPublisher;
 import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.hadoop.mapred.Counters;
 import org.apache.hadoop.mapred.JobClient;
@@ -54,6 +59,8 @@ import org.apache.log4j.PropertyConfigur
 
 public class HadoopJobExecHelper {
 
+  static final private Log LOG = LogFactory.getLog(HadoopJobExecHelper.class.getName());
+
   protected transient JobConf job;
   protected Task<? extends Serializable> task;
 
@@ -225,6 +232,7 @@ public class HadoopJobExecHelper {
     long cpuMsec = -1;
     int numMap = -1;
     int numReduce = -1;
+    List<ClientStatsPublisher> clientStatPublishers = getClientStatPublishers();
 
     while (!rj.isComplete()) {
       try {
@@ -363,6 +371,14 @@ public class HadoopJobExecHelper {
       }
     }
 
+    //Prepare data for Client Stat Publishers (if any present) and execute them
+     if (clientStatPublishers.size() > 0){
+        Map<String, Double> exctractedCounters = extractAllCounterValues(ctrs);
+        for(ClientStatsPublisher clientStatPublisher : clientStatPublishers){
+          clientStatPublisher.run(exctractedCounters, rj.getID().toString());
+        }
+      }
+
     Counter counterCpuMsec = ctrs.findCounter("org.apache.hadoop.mapred.Task$Counter",
         "CPU_MILLISECONDS");
     if (counterCpuMsec != null) {
@@ -704,6 +720,7 @@ public class HadoopJobExecHelper {
     if (SessionState.get() != null) {
       SessionState.get().getLastMapRedStatsList().add(mapRedStats);
     }
+
     boolean success = mapRedStats.isSuccess();
 
     String statusMesg = getJobEndMsg(rj.getJobID());
@@ -728,4 +745,74 @@ public class HadoopJobExecHelper {

     return returnVal;
   }
+
+  /**
+   * Flattens every counter in {@code counters} into one map so that it can be handed to
+   * {@link ClientStatsPublisher#run(Map, String)}.
+   *
+   * <p>Keys have the form {@code <group name>::<counter name>}, built from the names returned by
+   * {@code getName()} (not the display names); values are the counter values widened to
+   * {@code double}.
+   *
+   * @param counters the job counters to flatten
+   * @return a new mutable map with one entry per counter
+   */
+  private Map<String, Double> extractAllCounterValues(Counters counters) {
+    Map<String, Double> extractedCounters = new HashMap<String, Double>();
+    for (Counters.Group group : counters) {
+      for (Counter counter : group) {
+        extractedCounters.put(group.getName() + "::" + counter.getName(),
+            Double.valueOf(counter.getCounter()));
+      }
+    }
+    return extractedCounters;
+  }
+
+  /**
+   * Instantiates the publishers listed in {@code hive.client.stats.publishers}
+   * ({@link HiveConf.ConfVars#CLIENTSTATSPUBLISHERS}).
+   *
+   * <p>The value is a comma-separated list of fully qualified class names; each is loaded with
+   * {@link JavaUtils#getClassLoader()} and instantiated through its no-arg constructor. Loading
+   * is best-effort: blank entries are skipped, and a class that cannot be loaded or
+   * instantiated, or that does not implement {@link ClientStatsPublisher}, is logged together
+   * with the cause and left out. Other unchecked exceptions still propagate to the caller.
+   *
+   * @return the publishers that were created, in configuration order; never {@code null}
+   */
+  private List<ClientStatsPublisher> getClientStatPublishers() {
+    List<ClientStatsPublisher> clientStatsPublishers = new ArrayList<ClientStatsPublisher>();
+    String confString = HiveConf.getVar(job, HiveConf.ConfVars.CLIENTSTATSPUBLISHERS).trim();
+    if (confString.isEmpty()) {
+      return clientStatsPublishers;
+    }
+
+    for (String entry : confString.split(",")) {
+      String className = entry.trim();
+      if (className.isEmpty()) {
+        // Tolerate stray commas ("a,,b") instead of trying to load a class named "".
+        continue;
+      }
+      try {
+        Class<?> publisherClass = Class.forName(className, true, JavaUtils.getClassLoader());
+        if (!ClientStatsPublisher.class.isAssignableFrom(publisherClass)) {
+          // Checked up front: a blind cast would throw ClassCastException, which the
+          // RuntimeException handler below rethrows, letting a misconfiguration escape.
+          LOG.warn("Class " + className + " does not implement "
+              + ClientStatsPublisher.class.getName()
+              + "; program will continue, but without this ClientStatsPublisher working");
+          continue;
+        }
+        clientStatsPublishers.add((ClientStatsPublisher) publisherClass.newInstance());
+      } catch (RuntimeException e) {
+        throw e;
+      } catch (Exception e) {
+        // Pass the exception itself so the stack trace is logged, not just its message.
+        LOG.warn(e.getClass().getName() + " occurred when trying to create class: " + className
+            + " implementing ClientStatsPublisher interface. "
+            + "Program will continue, but without this ClientStatsPublisher working", e);
+      }
+    }
+    return clientStatsPublishers;
+  }
 }

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/history/HiveHistory.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/history/HiveHistory.java?rev=1171917&r1=1171916&r2=1171917&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/history/HiveHistory.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/history/HiveHistory.java Sat Sep 17 06:22:33 2011
@@ -299,7 +299,7 @@ public class HiveHistory {
   }
 
   /**
-   * Called at the start of job Driver.run().
+   * Called at the start of job Driver.execute().
    */
   public void startQuery(String cmd, String id) {
     SessionState ss = SessionState.get();

Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/stats/ClientStatsPublisher.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/stats/ClientStatsPublisher.java?rev=1171917&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/stats/ClientStatsPublisher.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/stats/ClientStatsPublisher.java Sat Sep 17 06:22:33 2011
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.stats;
+
+import java.util.Map;
+
+/**
+ * Client-side hook that receives the counters of a Hadoop MapReduce job.
+ *
+ * <p>Implementations are listed, as fully qualified class names, in the comma-separated
+ * {@code hive.client.stats.publishers} property and are instantiated reflectively, so they need
+ * an accessible no-argument constructor.
+ */
+public interface ClientStatsPublisher {
+
+  /**
+   * Publishes the counter values of one job.
+   *
+   * <p>When called from {@code HadoopJobExecHelper}, the map holds every counter of the job,
+   * keyed as {@code <group name>::<counter name>} using the non-display names; it is not
+   * filtered by {@code hive.client.stats.counters}, so implementations that honor that setting
+   * must filter it themselves. The same map instance is passed to each configured publisher in
+   * turn, so implementations should not modify it.
+   *
+   * @param counterValues counter values widened to {@code double}, keyed as described above
+   * @param jobID string form of the Hadoop job ID the counters belong to
+   */
+  void run(Map<String, Double> counterValues, String jobID);
+
+}