You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by nz...@apache.org on 2011/09/17 08:22:34 UTC
svn commit: r1171917 - in /hive/trunk:
common/src/java/org/apache/hadoop/hive/conf/ conf/
ql/src/java/org/apache/hadoop/hive/ql/exec/
ql/src/java/org/apache/hadoop/hive/ql/history/
ql/src/java/org/apache/hadoop/hive/ql/stats/
Author: nzhang
Date: Sat Sep 17 06:22:33 2011
New Revision: 1171917
URL: http://svn.apache.org/viewvc?rev=1171917&view=rev
Log:
HIVE-2446. Introduction of client statistics publishers possibility (Robert Surówka via Ning Zhang)
Added:
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/stats/ClientStatsPublisher.java
Modified:
hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
hive/trunk/conf/hive-default.xml
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/HadoopJobExecHelper.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/history/HiveHistory.java
Modified: hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
URL: http://svn.apache.org/viewvc/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java?rev=1171917&r1=1171916&r2=1171917&view=diff
==============================================================================
--- hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (original)
+++ hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java Sat Sep 17 06:22:33 2011
@@ -123,6 +123,7 @@ public class HiveConf extends Configurat
PREEXECHOOKS("hive.exec.pre.hooks", ""),
POSTEXECHOOKS("hive.exec.post.hooks", ""),
ONFAILUREHOOKS("hive.exec.failure.hooks", ""),
+ CLIENTSTATSPUBLISHERS("hive.client.stats.publishers", ""),
EXECPARALLEL("hive.exec.parallel", false), // parallel query launching
EXECPARALLETHREADNUMBER("hive.exec.parallel.thread.number", 8),
HIVESPECULATIVEEXECREDUCERS("hive.mapred.reduce.tasks.speculative.execution", true),
@@ -410,7 +411,8 @@ public class HiveConf extends Configurat
3000), // # milliseconds to wait before the next retry
HIVE_STATS_COLLECT_RAWDATASIZE("hive.stats.collect.rawdatasize", true),
// should the raw data size be collected when analyzing tables
-
+ CLIENT_STATS_COUNTERS("hive.client.stats.counters", ""),
+ // Subset of counters that should be of interest for hive.client.stats.publishers (when one wants to limit their publishing). Non-display names should be used.
// Concurrency
HIVE_SUPPORT_CONCURRENCY("hive.support.concurrency", false),
Modified: hive/trunk/conf/hive-default.xml
URL: http://svn.apache.org/viewvc/hive/trunk/conf/hive-default.xml?rev=1171917&r1=1171916&r2=1171917&view=diff
==============================================================================
--- hive/trunk/conf/hive-default.xml (original)
+++ hive/trunk/conf/hive-default.xml Sat Sep 17 06:22:33 2011
@@ -551,6 +551,18 @@
</property>
<property>
+ <name>hive.client.stats.publishers</name>
+ <value></value>
+ <description>Comma-separated list of statistics publishers to be invoked on counters on each job. A client stats publisher is specified as the name of a Java class which implements the org.apache.hadoop.hive.ql.stats.ClientStatsPublisher interface.</description>
+</property>
+
+<property>
+ <name>hive.client.stats.counters</name>
+ <value></value>
+ <description>Subset of counters that should be of interest for hive.client.stats.publishers (when one wants to limit their publishing). Non-display names should be used.</description>
+</property>
+
+<property>
<name>hive.merge.mapfiles</name>
<value>true</value>
<description>Merge small files at the end of a map-only job</description>
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/HadoopJobExecHelper.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/HadoopJobExecHelper.java?rev=1171917&r1=1171916&r2=1171917&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/HadoopJobExecHelper.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/HadoopJobExecHelper.java Sat Sep 17 06:22:33 2011
@@ -21,6 +21,7 @@ package org.apache.hadoop.hive.ql.exec;
import java.io.IOException;
import java.io.Serializable;
import java.text.SimpleDateFormat;
+import java.util.ArrayList;
import java.util.Calendar;
import java.util.Collections;
import java.util.HashMap;
@@ -30,6 +31,9 @@ import java.util.Map;
import java.util.Set;
import java.util.Enumeration;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.common.JavaUtils;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.MapRedStats;
import org.apache.hadoop.hive.ql.exec.Operator.ProgressCounter;
@@ -38,6 +42,7 @@ import org.apache.hadoop.hive.ql.exec.er
import org.apache.hadoop.hive.ql.history.HiveHistory.Keys;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
+import org.apache.hadoop.hive.ql.stats.ClientStatsPublisher;
import org.apache.hadoop.hive.shims.ShimLoader;
import org.apache.hadoop.mapred.Counters;
import org.apache.hadoop.mapred.JobClient;
@@ -54,6 +59,8 @@ import org.apache.log4j.PropertyConfigur
public class HadoopJobExecHelper {
+ static final private Log LOG = LogFactory.getLog(HadoopJobExecHelper.class.getName());
+
protected transient JobConf job;
protected Task<? extends Serializable> task;
@@ -225,6 +232,7 @@ public class HadoopJobExecHelper {
long cpuMsec = -1;
int numMap = -1;
int numReduce = -1;
+ List<ClientStatsPublisher> clientStatPublishers = getClientStatPublishers();
while (!rj.isComplete()) {
try {
@@ -363,6 +371,14 @@ public class HadoopJobExecHelper {
}
}
+ //Prepare data for Client Stat Publishers (if any present) and execute them
+ if (clientStatPublishers.size() > 0){
+ Map<String, Double> exctractedCounters = extractAllCounterValues(ctrs);
+ for(ClientStatsPublisher clientStatPublisher : clientStatPublishers){
+ clientStatPublisher.run(exctractedCounters, rj.getID().toString());
+ }
+ }
+
Counter counterCpuMsec = ctrs.findCounter("org.apache.hadoop.mapred.Task$Counter",
"CPU_MILLISECONDS");
if (counterCpuMsec != null) {
@@ -704,6 +720,7 @@ public class HadoopJobExecHelper {
if (SessionState.get() != null) {
SessionState.get().getLastMapRedStatsList().add(mapRedStats);
}
+
boolean success = mapRedStats.isSuccess();
String statusMesg = getJobEndMsg(rj.getJobID());
@@ -728,4 +745,40 @@ public class HadoopJobExecHelper {
return returnVal;
}
+
+ /**
+  * Flattens every counter of every counter group into a single map, which is
+  * what gets handed to the configured ClientStatsPublisher instances.
+  *
+  * @param counters the Hadoop job counters to read
+  * @return a new map keyed by "&lt;group name&gt;::&lt;counter name&gt;" whose values are
+  *         the counter values converted to Double
+  */
+ private Map<String, Double> extractAllCounterValues(Counters counters) {
+   Map<String, Double> extractedCounters = new HashMap<String, Double>();
+   for (Counters.Group group : counters) {
+     String keyPrefix = group.getName() + "::";
+     for (Counter counter : group) {
+       // Double.valueOf replaces the boxing constructor; the long counter value
+       // is widened to double exactly as before.
+       extractedCounters.put(keyPrefix + counter.getName(),
+           Double.valueOf(counter.getCounter()));
+     }
+   }
+   return extractedCounters;
+ }
+
+ /**
+  * Instantiates the client statistics publishers named by
+  * HiveConf.ConfVars.CLIENTSTATSPUBLISHERS (hive.client.stats.publishers).
+  *
+  * The setting is read as a comma-separated list of class names. Each name is
+  * trimmed, loaded through JavaUtils.getClassLoader() and instantiated via
+  * Class.newInstance(). Blank entries (for example from "a,,b") are skipped.
+  *
+  * A checked failure for one class is logged as a warning and that publisher
+  * is left out of the result. Runtime exceptions (for example a
+  * ClassCastException when the class does not implement ClientStatsPublisher)
+  * are rethrown to the caller.
+  *
+  * @return the publishers that could be created; possibly empty, never null
+  */
+ private List<ClientStatsPublisher> getClientStatPublishers() {
+   List<ClientStatsPublisher> clientStatsPublishers = new ArrayList<ClientStatsPublisher>();
+   String confString = HiveConf.getVar(job, HiveConf.ConfVars.CLIENTSTATSPUBLISHERS).trim();
+   if (confString.equals("")) {
+     return clientStatsPublishers;
+   }
+
+   for (String configuredName : confString.split(",")) {
+     String className = configuredName.trim();
+     if (className.equals("")) {
+       // A stray separator would otherwise be reported as a
+       // ClassNotFoundException for the empty class name.
+       continue;
+     }
+     try {
+       clientStatsPublishers.add((ClientStatsPublisher) Class.forName(
+           className, true, JavaUtils.getClassLoader()).newInstance());
+     } catch (RuntimeException e) {
+       throw e;
+     } catch (Exception e) {
+       LOG.warn(e.getClass().getName() + " occured when trying to create class: "
+           + className + " implementing ClientStatsPublisher interface");
+       // Pass the exception itself so the stack trace and cause chain are logged too.
+       LOG.warn("The exception message is: " + e.getMessage(), e);
+       LOG.warn("Program will continue, but without this ClientStatsPublisher working");
+     }
+   }
+   return clientStatsPublishers;
+ }
}
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/history/HiveHistory.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/history/HiveHistory.java?rev=1171917&r1=1171916&r2=1171917&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/history/HiveHistory.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/history/HiveHistory.java Sat Sep 17 06:22:33 2011
@@ -299,7 +299,7 @@ public class HiveHistory {
}
/**
- * Called at the start of job Driver.run().
+ * Called at the start of job Driver.execute().
*/
public void startQuery(String cmd, String id) {
SessionState ss = SessionState.get();
Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/stats/ClientStatsPublisher.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/stats/ClientStatsPublisher.java?rev=1171917&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/stats/ClientStatsPublisher.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/stats/ClientStatsPublisher.java Sat Sep 17 06:22:33 2011
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.stats;
+
+import java.util.Map;
+
+/**
+ * Client-side hook that is handed the counter values of a Hadoop job.
+ *
+ * Implementations are named in the hive.client.stats.publishers setting and
+ * are created reflectively by HadoopJobExecHelper via Class.newInstance().
+ */
+public interface ClientStatsPublisher {
+
+  /**
+   * Receives the current counter values of a job.
+   *
+   * @param counterValues counter values keyed by
+   *          "&lt;group name&gt;::&lt;counter name&gt;", as built by HadoopJobExecHelper
+   * @param jobID string form of the running job's ID
+   */
+  void run(Map<String, Double> counterValues, String jobID);
+
+}