You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by sz...@apache.org on 2015/07/11 00:35:34 UTC
[3/4] hive git commit: HIVE-10761 : Create codahale-based metrics
system for Hive (Szehon, reviewed by Xuefu)
HIVE-10761 : Create codahale-based metrics system for Hive (Szehon, reviewed by Xuefu)
Conflicts:
pom.xml
service/src/java/org/apache/hive/service/server/HiveServer2.java
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/bd84e87c
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/bd84e87c
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/bd84e87c
Branch: refs/heads/branch-1
Commit: bd84e87c880ac389c9108916aada0da06d94bf9b
Parents: 2d49e5a
Author: Szehon Ho <sz...@cloudera.com>
Authored: Wed Jun 3 23:46:28 2015 -0700
Committer: Szehon Ho <sz...@cloudera.com>
Committed: Fri Jul 10 15:22:56 2015 -0700
----------------------------------------------------------------------
common/pom.xml | 20 +
.../hadoop/hive/common/JvmPauseMonitor.java | 225 ++++++++++++
.../hive/common/metrics/LegacyMetrics.java | 262 +++++++++++++
.../hadoop/hive/common/metrics/Metrics.java | 253 -------------
.../hive/common/metrics/common/Metrics.java | 68 ++++
.../common/metrics/common/MetricsFactory.java | 48 +++
.../metrics/metrics2/CodahaleMetrics.java | 366 +++++++++++++++++++
.../metrics/metrics2/MetricsReporting.java | 27 ++
.../org/apache/hadoop/hive/conf/HiveConf.java | 18 +-
.../hive/common/metrics/TestLegacyMetrics.java | 295 +++++++++++++++
.../hadoop/hive/common/metrics/TestMetrics.java | 286 ---------------
.../metrics/metrics2/TestCodahaleMetrics.java | 138 +++++++
.../hive/metastore/TestMetaStoreMetrics.java | 94 +++++
.../hadoop/hive/metastore/HiveMetaStore.java | 132 ++++---
pom.xml | 3 +
.../apache/hive/service/server/HiveServer2.java | 25 +-
.../hadoop/hive/shims/Hadoop20SShims.java | 5 -
.../apache/hadoop/hive/shims/Hadoop23Shims.java | 13 -
.../apache/hadoop/hive/shims/HadoopShims.java | 2 -
19 files changed, 1665 insertions(+), 615 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/bd84e87c/common/pom.xml
----------------------------------------------------------------------
diff --git a/common/pom.xml b/common/pom.xml
index a615c1e..8d4b1ea 100644
--- a/common/pom.xml
+++ b/common/pom.xml
@@ -98,6 +98,26 @@
<artifactId>json</artifactId>
<version>${json.version}</version>
</dependency>
+ <dependency>
+ <groupId>io.dropwizard.metrics</groupId>
+ <artifactId>metrics-core</artifactId>
+ <version>${dropwizard.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>io.dropwizard.metrics</groupId>
+ <artifactId>metrics-jvm</artifactId>
+ <version>${dropwizard.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>io.dropwizard.metrics</groupId>
+ <artifactId>metrics-json</artifactId>
+ <version>${dropwizard.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-databind</artifactId>
+ <version>${jackson.new.version}</version>
+ </dependency>
</dependencies>
<profiles>
http://git-wip-us.apache.org/repos/asf/hive/blob/bd84e87c/common/src/java/org/apache/hadoop/hive/common/JvmPauseMonitor.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/common/JvmPauseMonitor.java b/common/src/java/org/apache/hadoop/hive/common/JvmPauseMonitor.java
new file mode 100644
index 0000000..c3949f2
--- /dev/null
+++ b/common/src/java/org/apache/hadoop/hive/common/JvmPauseMonitor.java
@@ -0,0 +1,225 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.common;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.common.metrics.common.MetricsFactory;
+import org.apache.hadoop.util.Daemon;
+
+import java.lang.management.GarbageCollectorMXBean;
+import java.lang.management.ManagementFactory;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Detects long JVM or host pauses (for example stop-the-world GCs) by
+ * sleeping for a fixed interval on a background thread and measuring how
+ * much longer than requested each sleep actually took.
+ *
+ * Based on the JvmPauseMonitor from Hadoop.
+ */
+public class JvmPauseMonitor {
+  private static final Log LOG = LogFactory.getLog(
+      JvmPauseMonitor.class);
+
+  /** The target sleep time */
+  private static final long SLEEP_INTERVAL_MS = 500;
+
+  /** log WARN if we detect a pause longer than this threshold */
+  private final long warnThresholdMs;
+  private static final String WARN_THRESHOLD_KEY =
+      "jvm.pause.warn-threshold.ms";
+  private static final long WARN_THRESHOLD_DEFAULT = 10000;
+
+  /** log INFO if we detect a pause longer than this threshold */
+  private final long infoThresholdMs;
+  private static final String INFO_THRESHOLD_KEY =
+      "jvm.pause.info-threshold.ms";
+  private static final long INFO_THRESHOLD_DEFAULT = 1000;
+
+  // These counters are written only by the monitor thread but read through
+  // the public getters from arbitrary threads. volatile makes the single
+  // writer's updates visible and rules out torn reads of the long values.
+  private volatile long numGcWarnThresholdExceeded = 0;
+  private volatile long numGcInfoThresholdExceeded = 0;
+  private volatile long totalGcExtraSleepTime = 0;
+
+  private Thread monitorThread;
+  private volatile boolean shouldRun = true;
+
+  /**
+   * Reads the WARN and INFO pause thresholds (in milliseconds) from the given
+   * configuration, falling back to 10000 ms and 1000 ms respectively.
+   * @param conf configuration to read the thresholds from
+   */
+  public JvmPauseMonitor(Configuration conf) {
+    this.warnThresholdMs = conf.getLong(WARN_THRESHOLD_KEY, WARN_THRESHOLD_DEFAULT);
+    this.infoThresholdMs = conf.getLong(INFO_THRESHOLD_KEY, INFO_THRESHOLD_DEFAULT);
+  }
+
+  /**
+   * Creates and starts the monitor thread.
+   * @throws IllegalStateException if the monitor thread was already created
+   */
+  public void start() {
+    Preconditions.checkState(monitorThread == null,
+        "JvmPauseMonitor thread is Already started");
+    monitorThread = new Daemon(new Monitor());
+    monitorThread.start();
+  }
+
+  /**
+   * Asks the monitor loop to exit and, if a thread was started, interrupts it
+   * and waits for it to finish. The thread reference is not cleared, so
+   * {@link #isStarted()} keeps returning true afterwards.
+   */
+  public void stop() {
+    shouldRun = false;
+    if (isStarted()) {
+      monitorThread.interrupt();
+      try {
+        monitorThread.join();
+      } catch (InterruptedException e) {
+        // Preserve the caller's interrupt status instead of swallowing it.
+        Thread.currentThread().interrupt();
+      }
+    }
+  }
+
+  /**
+   * @return true once {@link #start()} has created the monitor thread
+   */
+  public boolean isStarted() {
+    return monitorThread != null;
+  }
+
+  /**
+   * @return number of detected pauses that were longer than the WARN threshold
+   */
+  public long getNumGcWarnThresholdExceeded() {
+    return numGcWarnThresholdExceeded;
+  }
+
+  /**
+   * @return number of detected pauses that were longer than the WARN threshold
+   * @deprecated misspelled name kept for backward compatibility; use
+   *             {@link #getNumGcWarnThresholdExceeded()} instead
+   */
+  @Deprecated
+  public long getNumGcWarnThreadholdExceeded() {
+    return getNumGcWarnThresholdExceeded();
+  }
+
+  /**
+   * @return number of detected pauses that were longer than the INFO
+   *         threshold but not longer than the WARN threshold
+   */
+  public long getNumGcInfoThresholdExceeded() {
+    return numGcInfoThresholdExceeded;
+  }
+
+  /**
+   * @return sum, in milliseconds, of the extra sleep time measured over all
+   *         monitor iterations (not only those that crossed a threshold)
+   */
+  public long getTotalGcExtraSleepTime() {
+    return totalGcExtraSleepTime;
+  }
+
+  /**
+   * Builds the log message for a detected pause, listing every GC pool whose
+   * collection count changed between the two snapshots.
+   * @param extraSleepTime pause length in milliseconds
+   * @param gcTimesAfterSleep GC statistics sampled after the sleep
+   * @param gcTimesBeforeSleep GC statistics sampled before the sleep
+   * @return the multi-line message
+   */
+  private String formatMessage(long extraSleepTime,
+      Map<String, GcTimes> gcTimesAfterSleep,
+      Map<String, GcTimes> gcTimesBeforeSleep) {
+
+    // Only pools present in both snapshots can be diffed.
+    Set<String> gcBeanNames = Sets.intersection(
+        gcTimesAfterSleep.keySet(),
+        gcTimesBeforeSleep.keySet());
+    List<String> gcDiffs = Lists.newArrayList();
+    for (String name : gcBeanNames) {
+      GcTimes diff = gcTimesAfterSleep.get(name).subtract(
+          gcTimesBeforeSleep.get(name));
+      if (diff.gcCount != 0) {
+        gcDiffs.add("GC pool '" + name + "' had collection(s): " +
+            diff.toString());
+      }
+    }
+
+    String ret = "Detected pause in JVM or host machine (eg GC): " +
+        "pause of approximately " + extraSleepTime + "ms\n";
+    if (gcDiffs.isEmpty()) {
+      ret += "No GCs detected";
+    } else {
+      ret += Joiner.on("\n").join(gcDiffs);
+    }
+    return ret;
+  }
+
+  /**
+   * @return a snapshot of collection count and time for every garbage
+   *         collector MXBean, keyed by the bean's name
+   */
+  private Map<String, GcTimes> getGcTimes() {
+    Map<String, GcTimes> map = Maps.newHashMap();
+    List<GarbageCollectorMXBean> gcBeans =
+        ManagementFactory.getGarbageCollectorMXBeans();
+    for (GarbageCollectorMXBean gcBean : gcBeans) {
+      map.put(gcBean.getName(), new GcTimes(gcBean));
+    }
+    return map;
+  }
+
+  /** Immutable pair of GC collection count and accumulated collection time. */
+  private static class GcTimes {
+    private GcTimes(GarbageCollectorMXBean gcBean) {
+      gcCount = gcBean.getCollectionCount();
+      gcTimeMillis = gcBean.getCollectionTime();
+    }
+
+    private GcTimes(long count, long time) {
+      this.gcCount = count;
+      this.gcTimeMillis = time;
+    }
+
+    /** @return the field-wise difference {@code this - other} */
+    private GcTimes subtract(GcTimes other) {
+      return new GcTimes(this.gcCount - other.gcCount,
+          this.gcTimeMillis - other.gcTimeMillis);
+    }
+
+    @Override
+    public String toString() {
+      return "count=" + gcCount + " time=" + gcTimeMillis + "ms";
+    }
+
+    private final long gcCount;
+    private final long gcTimeMillis;
+  }
+
+  /**
+   * Loop run on the monitor thread: sleep, measure the overshoot, log and
+   * count it when it crosses a threshold, and report it to the metrics system.
+   */
+  private class Monitor implements Runnable {
+    @Override
+    public void run() {
+      Stopwatch sw = new Stopwatch();
+      Map<String, GcTimes> gcTimesBeforeSleep = getGcTimes();
+      while (shouldRun) {
+        sw.reset().start();
+        try {
+          Thread.sleep(SLEEP_INTERVAL_MS);
+        } catch (InterruptedException ie) {
+          // stop() interrupts this thread so that it exits promptly.
+          return;
+        }
+        // The elapsed time is truncated to whole milliseconds and may come
+        // out below the requested interval; clamp so the totals never shrink.
+        long extraSleepTime = Math.max(0L, sw.elapsedMillis() - SLEEP_INTERVAL_MS);
+        Map<String, GcTimes> gcTimesAfterSleep = getGcTimes();
+
+        if (extraSleepTime > warnThresholdMs) {
+          ++numGcWarnThresholdExceeded;
+          LOG.warn(formatMessage(
+              extraSleepTime, gcTimesAfterSleep, gcTimesBeforeSleep));
+          incrementMetricsCounter("jvm.pause.warn-threshold", 1);
+        } else if (extraSleepTime > infoThresholdMs) {
+          ++numGcInfoThresholdExceeded;
+          LOG.info(formatMessage(
+              extraSleepTime, gcTimesAfterSleep, gcTimesBeforeSleep));
+          incrementMetricsCounter("jvm.pause.info-threshold", 1);
+        }
+        incrementMetricsCounter("jvm.pause.extraSleepTime", extraSleepTime);
+        totalGcExtraSleepTime += extraSleepTime;
+        gcTimesBeforeSleep = gcTimesAfterSleep;
+      }
+    }
+
+    /**
+     * Adds {@code count} to the named counter of the process-wide metrics
+     * instance. Does nothing when no metrics instance has been created, and
+     * only logs when the metrics system itself fails.
+     */
+    private void incrementMetricsCounter(String name, long count) {
+      org.apache.hadoop.hive.common.metrics.common.Metrics metrics =
+          MetricsFactory.getMetricsInstance();
+      if (metrics == null) {
+        // MetricsFactory.init() has not run. Without this check every loop
+        // iteration would log a NullPointerException at WARN level.
+        return;
+      }
+      try {
+        metrics.incrementCounter(name, count);
+      } catch (Exception e) {
+        LOG.warn("Error Reporting JvmPauseMonitor to Metrics system", e);
+      }
+    }
+  }
+
+  /**
+   * Simple 'main' to facilitate manual testing of the pause monitor.
+   *
+   * This main function just leaks memory into a list. Running this class
+   * with a 1GB heap will very quickly go into "GC hell" and result in
+   * log messages about the GC pauses.
+   */
+  public static void main(String []args) throws Exception {
+    new JvmPauseMonitor(new Configuration()).start();
+    List<String> list = Lists.newArrayList();
+    int i = 0;
+    while (true) {
+      list.add(String.valueOf(i++));
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/bd84e87c/common/src/java/org/apache/hadoop/hive/common/metrics/LegacyMetrics.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/common/metrics/LegacyMetrics.java b/common/src/java/org/apache/hadoop/hive/common/metrics/LegacyMetrics.java
new file mode 100644
index 0000000..14f7afb
--- /dev/null
+++ b/common/src/java/org/apache/hadoop/hive/common/metrics/LegacyMetrics.java
@@ -0,0 +1,262 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.common.metrics;
+
+import org.apache.hadoop.hive.common.metrics.common.Metrics;
+import org.apache.hadoop.hive.conf.HiveConf;
+
+import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.util.HashMap;
+
+import javax.management.MBeanServer;
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectName;
+
+/**
+ * This class may eventually get superseded by
+ * org.apache.hadoop.hive.common.metrics.metrics2.CodahaleMetrics.
+ *
+ * Metrics Subsystem - allows exposure of a number of named parameters/counters
+ * via jmx, intended to be used as a static subsystem
+ *
+ * Has a couple of primary ways it can be used:
+ * (i) Using the set and get methods to set and get named parameters
+ * (ii) Using the incrementCounter method to increment and set named
+ *      parameters in one go, rather than having to make a get and then a set.
+ * (iii) Using the startScope and endScope methods to start and end
+ *       named "scopes" that record the number of times they've been
+ *       instantiated and amount of time(in milliseconds) spent inside
+ *       the scopes.
+ *
+ * All methods other than init and deInit are no-ops (returning null where a
+ * value is expected) until init has been called.
+ */
+public class LegacyMetrics implements Metrics {
+
+  private LegacyMetrics() {
+    // block
+  }
+
+  /**
+   * MetricsScope : A class that encapsulates an idea of a metered scope.
+   * Instantiating a named scope and then closing it exposes three counters:
+   *   (i) a "number of calls" counter ( &lt;name&gt;.n ),
+   *   (ii) a "number of msecs spent between scope open and close" counter
+   *        ( &lt;name&gt;.t ), and
+   *   (iii) the average time per call ( &lt;name&gt;.avg_t ).
+   */
+  public static class MetricsScope {
+
+    final LegacyMetrics metrics;
+
+    final String name;
+    final String numCounter;
+    final String timeCounter;
+    final String avgTimeCounter;
+
+    private boolean isOpen = false;
+    private Long startTime = null;
+
+    /**
+     * Instantiates a named scope and opens it - intended to only be called by
+     * LegacyMetrics, so private.
+     * @param name - name of the variable
+     * @param metrics - the metrics instance the counters are recorded in
+     * @throws IOException
+     */
+    private MetricsScope(String name, LegacyMetrics metrics) throws IOException {
+      this.metrics = metrics;
+      this.name = name;
+      this.numCounter = name + ".n";
+      this.timeCounter = name + ".t";
+      this.avgTimeCounter = name + ".avg_t";
+      open();
+    }
+
+    /**
+     * @return current value of the "number of calls" counter, or null if it
+     *         is not available
+     * @throws IOException
+     */
+    public Long getNumCounter() throws IOException {
+      return (Long) metrics.get(numCounter);
+    }
+
+    /**
+     * @return current value of the accumulated-time counter, or null if it
+     *         is not available
+     * @throws IOException
+     */
+    public Long getTimeCounter() throws IOException {
+      return (Long) metrics.get(timeCounter);
+    }
+
+    /**
+     * Opens scope, and makes note of the time started.
+     * @throws IOException if the scope is already open
+     */
+    public void open() throws IOException {
+      if (!isOpen) {
+        isOpen = true;
+        startTime = System.currentTimeMillis();
+      } else {
+        throw new IOException("Scope named " + name + " is not closed, cannot be opened.");
+      }
+    }
+
+    /**
+     * Closes scope, and records the time taken
+     * @throws IOException if the scope is not open
+     */
+    public void close() throws IOException {
+      if (isOpen) {
+        long endTime = System.currentTimeMillis();
+        // Lock the shared MBean explicitly: the field named "metrics" in this
+        // class shadows it, and incrementCounter()/deInit() synchronize on the
+        // MBean, so the three updates below need that same monitor.
+        synchronized (LegacyMetrics.metrics) {
+          Long num = metrics.incrementCounter(numCounter);
+          Long time = metrics.incrementCounter(timeCounter, endTime - startTime);
+          // Both are null when the metrics system is not initialized.
+          if (num != null && time != null) {
+            metrics.set(avgTimeCounter, Double.valueOf(time.doubleValue() / num.doubleValue()));
+          }
+        }
+      } else {
+        throw new IOException("Scope named " + name + " is not open, cannot be closed.");
+      }
+      isOpen = false;
+    }
+
+
+    /**
+     * Closes scope if open, and reopens it
+     * @throws IOException
+     */
+    public void reopen() throws IOException {
+      if (isOpen) {
+        close();
+      }
+      open();
+    }
+
+  }
+
+  private static final MetricsMBean metrics = new MetricsMBeanImpl();
+
+  private static final ObjectName oname;
+  static {
+    try {
+      oname = new ObjectName(
+          "org.apache.hadoop.hive.common.metrics:type=MetricsMBean");
+    } catch (MalformedObjectNameException mone) {
+      throw new RuntimeException(mone);
+    }
+  }
+
+
+  private static final ThreadLocal<HashMap<String, MetricsScope>> threadLocalScopes
+      = new ThreadLocal<HashMap<String,MetricsScope>>() {
+    @Override
+    protected HashMap<String,MetricsScope> initialValue() {
+      return new HashMap<String,MetricsScope>();
+    }
+  };
+
+  // Written under the MBean lock in init()/deInit() and read without a lock
+  // by every other method, hence volatile.
+  private volatile boolean initialized = false;
+
+  /**
+   * Registers the metrics MBean with the platform MBean server. Calling it
+   * again while initialized is a no-op.
+   * @param conf not used by this implementation
+   * @throws Exception if the MBean cannot be registered
+   */
+  @Override
+  public void init(HiveConf conf) throws Exception {
+    // Same monitor as deInit(), so concurrent init/deInit calls cannot
+    // interleave the check and the (un)registration.
+    synchronized (metrics) {
+      if (!initialized) {
+        MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+        mbs.registerMBean(metrics, oname);
+        initialized = true;
+      }
+    }
+  }
+
+  /**
+   * @return true between a successful init and the next deInit
+   */
+  public boolean isInitialized() {
+    return initialized;
+  }
+
+  /**
+   * Increments the named counter by 1.
+   * @param name counter name
+   * @return the new counter value, or null if not initialized
+   * @throws IOException
+   */
+  @Override
+  public Long incrementCounter(String name) throws IOException{
+    if (!initialized) {
+      return null;
+    }
+    return incrementCounter(name, 1L);
+  }
+
+  /**
+   * Increments the named counter by "increment", creating it with that value
+   * if it does not exist yet.
+   * @param name counter name
+   * @param increment amount to add
+   * @return the new counter value, or null if not initialized
+   * @throws IOException
+   */
+  @Override
+  public Long incrementCounter(String name, long increment) throws IOException{
+    if (!initialized) {
+      return null;
+    }
+    Long value;
+    // Read-modify-write must be atomic with respect to other incrementers.
+    synchronized(metrics) {
+      if (!metrics.hasKey(name)) {
+        value = Long.valueOf(increment);
+      } else {
+        value = ((Long)get(name)) + increment;
+      }
+      set(name, value);
+    }
+    return value;
+  }
+
+  /**
+   * Stores a named value in the MBean; a no-op if not initialized.
+   * @param name parameter name
+   * @param value value to store
+   * @throws IOException
+   */
+  public void set(String name, Object value) throws IOException{
+    if (!initialized) {
+      return;
+    }
+    metrics.put(name,value);
+  }
+
+  /**
+   * @param name parameter name
+   * @return the value the MBean holds for the name, or null if not initialized
+   * @throws IOException
+   */
+  public Object get(String name) throws IOException{
+    if (!initialized) {
+      return null;
+    }
+    return metrics.get(name);
+  }
+
+  /**
+   * Opens the calling thread's scope of the given name, creating it on first
+   * use; a no-op if not initialized.
+   * @param name scope name
+   * @throws IOException if the thread's scope of that name is already open
+   */
+  @Override
+  public void startScope(String name) throws IOException{
+    if (!initialized) {
+      return;
+    }
+    HashMap<String, MetricsScope> scopes = threadLocalScopes.get();
+    MetricsScope scope = scopes.get(name);
+    if (scope != null) {
+      scope.open();
+    } else {
+      // The constructor opens the new scope.
+      scopes.put(name, new MetricsScope(name, this));
+    }
+  }
+
+  /**
+   * @param name scope name
+   * @return the calling thread's scope of that name, or null if not initialized
+   * @throws IOException if the calling thread never started such a scope
+   */
+  public MetricsScope getScope(String name) throws IOException {
+    if (!initialized) {
+      return null;
+    }
+    MetricsScope scope = threadLocalScopes.get().get(name);
+    if (scope != null) {
+      return scope;
+    } else {
+      throw new IOException("No metrics scope named " + name);
+    }
+  }
+
+  /**
+   * Closes the calling thread's scope of the given name; a no-op if not
+   * initialized or if the thread has no such scope.
+   * @param name scope name
+   * @throws IOException if the scope exists but is not open
+   */
+  @Override
+  public void endScope(String name) throws IOException{
+    if (!initialized) {
+      return;
+    }
+    MetricsScope scope = threadLocalScopes.get().get(name);
+    if (scope != null) {
+      scope.close();
+    }
+  }
+
+  /**
+   * Resets the static context state to initial.
+   * Used primarily for testing purposes.
+   *
+   * Note that threadLocalScopes ThreadLocal is *not* cleared in this call.
+   */
+  @Override
+  public void deInit() throws Exception {
+    synchronized (metrics) {
+      if (initialized) {
+        MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+        if (mbs.isRegistered(oname)) {
+          mbs.unregisterMBean(oname);
+        }
+        metrics.clear();
+        initialized = false;
+      }
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/bd84e87c/common/src/java/org/apache/hadoop/hive/common/metrics/Metrics.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/common/metrics/Metrics.java b/common/src/java/org/apache/hadoop/hive/common/metrics/Metrics.java
deleted file mode 100644
index 01c9d1d..0000000
--- a/common/src/java/org/apache/hadoop/hive/common/metrics/Metrics.java
+++ /dev/null
@@ -1,253 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hive.common.metrics;
-
-import java.io.IOException;
-import java.lang.management.ManagementFactory;
-import java.util.HashMap;
-
-import javax.management.MBeanServer;
-import javax.management.MalformedObjectNameException;
-import javax.management.ObjectName;
-
-/**
- * Metrics Subsystem - allows exposure of a number of named parameters/counters
- * via jmx, intended to be used as a static subsystem
- *
- * Has a couple of primary ways it can be used:
- * (i) Using the set and get methods to set and get named parameters
- * (ii) Using the incrementCounter method to increment and set named
- * parameters in one go, rather than having to make a get and then a set.
- * (iii) Using the startScope and endScope methods to start and end
- * named "scopes" that record the number of times they've been
- * instantiated and amount of time(in milliseconds) spent inside
- * the scopes.
- */
-public class Metrics {
-
- private Metrics() {
- // block
- }
-
- /**
- * MetricsScope : A class that encapsulates an idea of a metered scope.
- * Instantiating a named scope and then closing it exposes two counters:
- * (i) a "number of calls" counter ( <name>.n ), and
- * (ii) a "number of msecs spent between scope open and close" counter. ( <name>.t)
- */
- public static class MetricsScope {
-
- final String name;
- final String numCounter;
- final String timeCounter;
- final String avgTimeCounter;
-
- private boolean isOpen = false;
- private Long startTime = null;
-
- /**
- * Instantiates a named scope - intended to only be called by Metrics, so locally scoped.
- * @param name - name of the variable
- * @throws IOException
- */
- private MetricsScope(String name) throws IOException {
- this.name = name;
- this.numCounter = name + ".n";
- this.timeCounter = name + ".t";
- this.avgTimeCounter = name + ".avg_t";
- open();
- }
-
- public Long getNumCounter() throws IOException {
- return (Long)Metrics.get(numCounter);
- }
-
- public Long getTimeCounter() throws IOException {
- return (Long)Metrics.get(timeCounter);
- }
-
- /**
- * Opens scope, and makes note of the time started, increments run counter
- * @throws IOException
- *
- */
- public void open() throws IOException {
- if (!isOpen) {
- isOpen = true;
- startTime = System.currentTimeMillis();
- } else {
- throw new IOException("Scope named " + name + " is not closed, cannot be opened.");
- }
- }
-
- /**
- * Closes scope, and records the time taken
- * @throws IOException
- */
- public void close() throws IOException {
- if (isOpen) {
- Long endTime = System.currentTimeMillis();
- synchronized(metrics) {
- Long num = Metrics.incrementCounter(numCounter);
- Long time = Metrics.incrementCounter(timeCounter, endTime - startTime);
- if (num != null && time != null) {
- Metrics.set(avgTimeCounter, Double.valueOf(time.doubleValue() / num.doubleValue()));
- }
- }
- } else {
- throw new IOException("Scope named " + name + " is not open, cannot be closed.");
- }
- isOpen = false;
- }
-
-
- /**
- * Closes scope if open, and reopens it
- * @throws IOException
- */
- public void reopen() throws IOException {
- if(isOpen) {
- close();
- }
- open();
- }
-
- }
-
- private static final MetricsMBean metrics = new MetricsMBeanImpl();
-
- private static final ObjectName oname;
- static {
- try {
- oname = new ObjectName(
- "org.apache.hadoop.hive.common.metrics:type=MetricsMBean");
- } catch (MalformedObjectNameException mone) {
- throw new RuntimeException(mone);
- }
- }
-
-
- private static final ThreadLocal<HashMap<String, MetricsScope>> threadLocalScopes
- = new ThreadLocal<HashMap<String,MetricsScope>>() {
- @Override
- protected HashMap<String,MetricsScope> initialValue() {
- return new HashMap<String,MetricsScope>();
- }
- };
-
- private static boolean initialized = false;
-
- public static void init() throws Exception {
- synchronized (metrics) {
- if (!initialized) {
- MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
- mbs.registerMBean(metrics, oname);
- initialized = true;
- }
- }
- }
-
- public static Long incrementCounter(String name) throws IOException{
- if (!initialized) {
- return null;
- }
- return incrementCounter(name,Long.valueOf(1));
- }
-
- public static Long incrementCounter(String name, long increment) throws IOException{
- if (!initialized) {
- return null;
- }
- Long value;
- synchronized(metrics) {
- if (!metrics.hasKey(name)) {
- value = Long.valueOf(increment);
- set(name, value);
- } else {
- value = ((Long)get(name)) + increment;
- set(name, value);
- }
- }
- return value;
- }
-
- public static void set(String name, Object value) throws IOException{
- if (!initialized) {
- return;
- }
- metrics.put(name,value);
- }
-
- public static Object get(String name) throws IOException{
- if (!initialized) {
- return null;
- }
- return metrics.get(name);
- }
-
- public static MetricsScope startScope(String name) throws IOException{
- if (!initialized) {
- return null;
- }
- if (threadLocalScopes.get().containsKey(name)) {
- threadLocalScopes.get().get(name).open();
- } else {
- threadLocalScopes.get().put(name, new MetricsScope(name));
- }
- return threadLocalScopes.get().get(name);
- }
-
- public static MetricsScope getScope(String name) throws IOException {
- if (!initialized) {
- return null;
- }
- if (threadLocalScopes.get().containsKey(name)) {
- return threadLocalScopes.get().get(name);
- } else {
- throw new IOException("No metrics scope named " + name);
- }
- }
-
- public static void endScope(String name) throws IOException{
- if (!initialized) {
- return;
- }
- if (threadLocalScopes.get().containsKey(name)) {
- threadLocalScopes.get().get(name).close();
- }
- }
-
- /**
- * Resets the static context state to initial.
- * Used primarily for testing purposes.
- *
- * Note that threadLocalScopes ThreadLocal is *not* cleared in this call.
- */
- static void uninit() throws Exception {
- synchronized (metrics) {
- if (initialized) {
- MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
- if (mbs.isRegistered(oname)) {
- mbs.unregisterMBean(oname);
- }
- metrics.clear();
- initialized = false;
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/hive/blob/bd84e87c/common/src/java/org/apache/hadoop/hive/common/metrics/common/Metrics.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/common/metrics/common/Metrics.java b/common/src/java/org/apache/hadoop/hive/common/metrics/common/Metrics.java
new file mode 100644
index 0000000..13a5336
--- /dev/null
+++ b/common/src/java/org/apache/hadoop/hive/common/metrics/common/Metrics.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.common.metrics.common;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+
+import java.io.IOException;
+
+/**
+ * Generic Metrics interface: the operations every metrics implementation
+ * used by Hive has to provide.
+ */
+public interface Metrics {
+
+  /**
+   * Initializes the Metrics system with the given Hive configuration.
+   * @param conf Hive configuration handed to the implementation
+   * @throws Exception if the implementation fails to initialize
+   */
+  void init(HiveConf conf) throws Exception;
+
+  /**
+   * Deinitializes the Metrics system.
+   * @throws Exception if the implementation fails to shut down
+   */
+  void deInit() throws Exception;
+
+  //Scope-related methods
+
+  /**
+   * Starts the scope of the given name.
+   * @param name name of the scope
+   * @throws IOException
+   */
+  void startScope(String name) throws IOException;
+
+  /**
+   * Ends the scope of the given name.
+   * @param name name of the scope
+   * @throws IOException
+   */
+  void endScope(String name) throws IOException;
+
+  //Counter-related methods
+
+  /**
+   * Increments a counter of the given name by 1.
+   * @param name name of the counter
+   * @return implementation-defined; LegacyMetrics returns the updated value,
+   *         or null when it has not been initialized
+   * @throws IOException
+   */
+  Long incrementCounter(String name) throws IOException;
+
+  /**
+   * Increments a counter of the given name by "increment"
+   * @param name name of the counter
+   * @param increment amount to add to the counter
+   * @return implementation-defined; LegacyMetrics returns the updated value,
+   *         or null when it has not been initialized
+   * @throws IOException
+   */
+  Long incrementCounter(String name, long increment) throws IOException;
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/bd84e87c/common/src/java/org/apache/hadoop/hive/common/metrics/common/MetricsFactory.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/common/metrics/common/MetricsFactory.java b/common/src/java/org/apache/hadoop/hive/common/metrics/common/MetricsFactory.java
new file mode 100644
index 0000000..12a309d
--- /dev/null
+++ b/common/src/java/org/apache/hadoop/hive/common/metrics/common/MetricsFactory.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.common.metrics.common;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.util.ReflectionUtils;
+
+/**
+ * Class that manages a static Metrics instance for this process.
+ *
+ * Every method is static synchronized, so access to the shared instance is
+ * serialized on the MetricsFactory class monitor.
+ */
+public class MetricsFactory {
+
+  // Created on the first init() call and kept for the life of the process;
+  // deInit() deinitializes it but does not drop the reference.
+  private static Metrics metrics;
+
+  /**
+   * Creates the Metrics implementation named by
+   * {@link HiveConf.ConfVars#HIVE_METRICS_CLASS} if none exists yet, then
+   * initializes it. A later call reuses the existing instance and calls its
+   * init again.
+   * @param conf Hive configuration naming the implementation class
+   * @throws Exception if the class cannot be loaded or its init fails
+   */
+  public static synchronized void init(HiveConf conf) throws Exception {
+    if (metrics == null) {
+      metrics = (Metrics) ReflectionUtils.newInstance(conf.getClassByName(
+          conf.getVar(HiveConf.ConfVars.HIVE_METRICS_CLASS)), conf);
+    }
+    metrics.init(conf);
+  }
+
+  /**
+   * @return the process-wide Metrics instance, or null if init has never
+   *         created one; callers must handle null
+   */
+  public static synchronized Metrics getMetricsInstance() {
+    return metrics;
+  }
+
+  /**
+   * Deinitializes the Metrics instance if one exists. The instance itself is
+   * kept, so getMetricsInstance still returns it afterwards.
+   * @throws Exception if the implementation's deInit fails
+   */
+  public static synchronized void deInit() throws Exception {
+    if (metrics != null) {
+      metrics.deInit();
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/bd84e87c/common/src/java/org/apache/hadoop/hive/common/metrics/metrics2/CodahaleMetrics.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/common/metrics/metrics2/CodahaleMetrics.java b/common/src/java/org/apache/hadoop/hive/common/metrics/metrics2/CodahaleMetrics.java
new file mode 100644
index 0000000..e59da99
--- /dev/null
+++ b/common/src/java/org/apache/hadoop/hive/common/metrics/metrics2/CodahaleMetrics.java
@@ -0,0 +1,366 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.common.metrics.metrics2;
+
+import com.codahale.metrics.ConsoleReporter;
+import com.codahale.metrics.Counter;
+import com.codahale.metrics.ExponentiallyDecayingReservoir;
+import com.codahale.metrics.JmxReporter;
+import com.codahale.metrics.Metric;
+import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.MetricSet;
+import com.codahale.metrics.Timer;
+import com.codahale.metrics.json.MetricsModule;
+import com.codahale.metrics.jvm.BufferPoolMetricSet;
+import com.codahale.metrics.jvm.ClassLoadingGaugeSet;
+import com.codahale.metrics.jvm.GarbageCollectorMetricSet;
+import com.codahale.metrics.jvm.MemoryUsageGaugeSet;
+import com.codahale.metrics.jvm.ThreadStatesGaugeSet;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Splitter;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import com.google.common.collect.Lists;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hive.conf.HiveConf;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.lang.management.ManagementFactory;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TimerTask;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * Codahale-backed Metrics implementation.
+ *
+ * Timers and counters are created lazily on first use and registered in a single
+ * MetricRegistry. JVM metric sets and the reporters listed in
+ * HiveConf.ConfVars.HIVE_METRICS_REPORTER are set up by init(HiveConf) and torn
+ * down by deInit().
+ */
+public class CodahaleMetrics implements org.apache.hadoop.hive.common.metrics.common.Metrics {
+  public static final String API_PREFIX = "api_";
+  public static final Log LOGGER = LogFactory.getLog(CodahaleMetrics.class);
+
+  public final MetricRegistry metricRegistry = new MetricRegistry();
+  private final Lock timersLock = new ReentrantLock();
+  private final Lock countersLock = new ReentrantLock();
+
+  // Both caches are assigned in init(); they are null until then.
+  private LoadingCache<String, Timer> timers;
+  private LoadingCache<String, Counter> counters;
+
+  private boolean initialized = false;
+  private HiveConf conf;
+  private final Set<Closeable> reporters = new HashSet<Closeable>();
+
+  // Scopes are kept per thread, keyed by the API_PREFIX-ed scope name.
+  private final ThreadLocal<HashMap<String, MetricsScope>> threadLocalScopes
+      = new ThreadLocal<HashMap<String,MetricsScope>>() {
+    @Override
+    protected HashMap<String,MetricsScope> initialValue() {
+      return new HashMap<String,MetricsScope>();
+    }
+  };
+
+  /**
+   * A named, re-openable timing scope backed by a Codahale Timer.
+   */
+  public static class MetricsScope {
+
+    final String name;
+    final Timer timer;
+    Timer.Context timerContext;
+    CodahaleMetrics metrics;
+
+    private boolean isOpen = false;
+
+    /**
+     * Instantiates a named scope - intended to only be called by Metrics, so locally scoped.
+     * The scope is opened immediately.
+     * @param name - name of the variable
+     * @param metrics - owning metrics instance, used to look up the timer for this name
+     * @throws IOException if the timer cannot be obtained
+     */
+    private MetricsScope(String name, CodahaleMetrics metrics) throws IOException {
+      this.name = name;
+      this.metrics = metrics;
+      this.timer = metrics.getTimer(name);
+      open();
+    }
+
+    /**
+     * Opens scope, and makes note of the time started.
+     * @throws IOException if the scope is already open
+     */
+    public void open() throws IOException {
+      if (!isOpen) {
+        isOpen = true;
+        this.timerContext = timer.time();
+      } else {
+        throw new IOException("Scope named " + name + " is not closed, cannot be opened.");
+      }
+    }
+
+    /**
+     * Closes scope, and records the time taken.
+     * @throws IOException if the scope is not open
+     */
+    public void close() throws IOException {
+      if (isOpen) {
+        timerContext.close();
+
+      } else {
+        throw new IOException("Scope named " + name + " is not open, cannot be closed.");
+      }
+      isOpen = false;
+    }
+  }
+
+  /**
+   * Builds the lazy timer/counter caches, registers the JVM metric sets and starts
+   * the configured reporters. A call on an already-initialized instance is a no-op.
+   *
+   * @param conf configuration; read for the reporter list and, by the JSON file
+   *             reporter, for file location and interval
+   * @throws Exception if a reporter cannot be started
+   */
+  public synchronized void init(HiveConf conf) throws Exception {
+    if (initialized) {
+      return;
+    }
+
+    this.conf = conf;
+    //Codahale artifacts are lazily-created.
+    timers = CacheBuilder.newBuilder().build(
+      new CacheLoader<String, com.codahale.metrics.Timer>() {
+        @Override
+        public com.codahale.metrics.Timer load(String key) throws Exception {
+          Timer timer = new Timer(new ExponentiallyDecayingReservoir());
+          metricRegistry.register(key, timer);
+          return timer;
+        }
+      }
+    );
+    counters = CacheBuilder.newBuilder().build(
+      new CacheLoader<String, Counter>() {
+        @Override
+        public Counter load(String key) throws Exception {
+          Counter counter = new Counter();
+          metricRegistry.register(key, counter);
+          return counter;
+        }
+      }
+    );
+
+    //register JVM metrics
+    registerAll("gc", new GarbageCollectorMetricSet());
+    registerAll("buffers", new BufferPoolMetricSet(ManagementFactory.getPlatformMBeanServer()));
+    registerAll("memory", new MemoryUsageGaugeSet());
+    registerAll("threads", new ThreadStatesGaugeSet());
+    registerAll("classLoading", new ClassLoadingGaugeSet());
+
+    //Metrics reporter
+    Set<MetricsReporting> finalReporterList = new HashSet<MetricsReporting>();
+    List<String> metricsReporterNames = Lists.newArrayList(
+      Splitter.on(",").trimResults().omitEmptyStrings().split(conf.getVar(HiveConf.ConfVars.HIVE_METRICS_REPORTER)));
+
+    for (String metricsReportingName : metricsReporterNames) {
+      try {
+        // Locale.ROOT keeps the enum lookup independent of the JVM default locale
+        // (e.g. a Turkish locale would upper-case 'i' to a dotted capital I).
+        MetricsReporting reporter = MetricsReporting.valueOf(
+            metricsReportingName.trim().toUpperCase(java.util.Locale.ROOT));
+        finalReporterList.add(reporter);
+      } catch (IllegalArgumentException e) {
+        LOGGER.warn("Metrics reporter skipped due to invalid configured reporter: " + metricsReportingName);
+      }
+    }
+    initReporting(finalReporterList);
+    initialized = true;
+  }
+
+
+  /**
+   * Closes all reporters, removes every metric from the registry and empties the
+   * timer/counter caches. Does nothing if the instance is not initialized.
+   *
+   * @throws Exception if a reporter fails to close
+   */
+  public synchronized void deInit() throws Exception {
+    if (initialized) {
+      for (Closeable reporter : reporters) {
+        reporter.close();
+      }
+      // Drop the closed reporters so a later init()/deInit() cycle neither
+      // accumulates them nor closes them a second time.
+      reporters.clear();
+      for (Map.Entry<String, Metric> metric : metricRegistry.getMetrics().entrySet()) {
+        metricRegistry.remove(metric.getKey());
+      }
+      timers.invalidateAll();
+      counters.invalidateAll();
+      initialized = false;
+    }
+  }
+
+  /**
+   * Starts (or re-opens) the calling thread's scope with the given name.
+   * Silently does nothing when the instance is not initialized.
+   *
+   * @param name scope name; API_PREFIX is prepended internally
+   * @throws IOException if the scope is already open on this thread
+   */
+  public void startScope(String name) throws IOException {
+    synchronized (this) {
+      if (!initialized) {
+        return;
+      }
+    }
+    name = API_PREFIX + name;
+    Map<String, MetricsScope> scopes = threadLocalScopes.get();
+    MetricsScope scope = scopes.get(name);
+    if (scope != null) {
+      scope.open();
+    } else {
+      // The MetricsScope constructor opens the scope.
+      scopes.put(name, new MetricsScope(name, this));
+    }
+  }
+
+  /**
+   * Closes the calling thread's scope with the given name, if it exists.
+   * Silently does nothing when the instance is not initialized or the scope
+   * was never started on this thread.
+   *
+   * @param name scope name; API_PREFIX is prepended internally
+   * @throws IOException if the scope exists but is not open
+   */
+  public void endScope(String name) throws IOException{
+    synchronized (this) {
+      if (!initialized) {
+        return;
+      }
+    }
+    name = API_PREFIX + name;
+    MetricsScope scope = threadLocalScopes.get().get(name);
+    if (scope != null) {
+      scope.close();
+    }
+  }
+
+  /**
+   * Increments the named counter by one.
+   *
+   * @param name counter name
+   * @return the counter value after the increment
+   */
+  public Long incrementCounter(String name) throws IOException {
+    return incrementCounter(name, 1);
+  }
+
+  /**
+   * Increments the named counter, creating and registering it on first use.
+   * The counter cache is only assigned by init(), so this must not be called before it.
+   *
+   * @param name counter name
+   * @param increment amount to add
+   * @return the counter value after the increment
+   */
+  public Long incrementCounter(String name, long increment) throws IOException {
+    // Acquire outside the try block: if lock() itself failed, the finally
+    // clause must not attempt to unlock a lock this thread does not hold.
+    countersLock.lock();
+    try {
+      Counter counter = counters.get(name);
+      counter.inc(increment);
+      return counter.getCount();
+    } catch(ExecutionException ee) {
+      throw new RuntimeException(ee);
+    } finally {
+      countersLock.unlock();
+    }
+  }
+
+  // This method is necessary to synchronize lazy-creation to the timers.
+  private Timer getTimer(String name) throws IOException {
+    // Acquire outside the try block so unlock() only runs when the lock is held.
+    timersLock.lock();
+    try {
+      return timers.get(name);
+    } catch (ExecutionException e) {
+      throw new IOException(e);
+    } finally {
+      timersLock.unlock();
+    }
+  }
+
+  // Registers every metric of the set under "prefix.name", recursing into nested sets.
+  private void registerAll(String prefix, MetricSet metricSet) {
+    for (Map.Entry<String, Metric> entry : metricSet.getMetrics().entrySet()) {
+      if (entry.getValue() instanceof MetricSet) {
+        registerAll(prefix + "." + entry.getKey(), (MetricSet) entry.getValue());
+      } else {
+        metricRegistry.register(prefix + "." + entry.getKey(), entry.getValue());
+      }
+    }
+  }
+
+  @VisibleForTesting
+  public MetricRegistry getMetricRegistry() {
+    return metricRegistry;
+  }
+
+  /**
+   * Should be only called once to initialize the reporters.
+   * Every started reporter is remembered in {@code reporters} so deInit() can close it.
+   */
+  private void initReporting(Set<MetricsReporting> reportingSet) throws Exception {
+    for (MetricsReporting reporting : reportingSet) {
+      switch(reporting) {
+        case CONSOLE:
+          final ConsoleReporter consoleReporter = ConsoleReporter.forRegistry(metricRegistry)
+            .convertRatesTo(TimeUnit.SECONDS)
+            .convertDurationsTo(TimeUnit.MILLISECONDS)
+            .build();
+          consoleReporter.start(1, TimeUnit.SECONDS);
+          reporters.add(consoleReporter);
+          break;
+        case JMX:
+          final JmxReporter jmxReporter = JmxReporter.forRegistry(metricRegistry)
+            .convertRatesTo(TimeUnit.SECONDS)
+            .convertDurationsTo(TimeUnit.MILLISECONDS)
+            .build();
+          jmxReporter.start();
+          reporters.add(jmxReporter);
+          break;
+        case JSON_FILE:
+          final JsonFileReporter jsonFileReporter = new JsonFileReporter();
+          jsonFileReporter.start();
+          reporters.add(jsonFileReporter);
+          break;
+      }
+    }
+  }
+
+  /**
+   * Reporter that periodically dumps the whole registry as pretty-printed JSON to
+   * the file named by HIVE_METRICS_JSON_FILE_LOCATION. Each dump is written to
+   * "location.tmp" first and then renamed over the target.
+   */
+  class JsonFileReporter implements Closeable {
+    private ObjectMapper jsonMapper = null;
+    private java.util.Timer timer = null;
+
+    /**
+     * Schedules the periodic dump on a daemon timer, starting immediately and
+     * repeating every HIVE_METRICS_JSON_FILE_INTERVAL.
+     */
+    public void start() {
+      this.jsonMapper = new ObjectMapper().registerModule(new MetricsModule(TimeUnit.MILLISECONDS, TimeUnit.MILLISECONDS, false));
+      this.timer = new java.util.Timer(true);
+
+      long time = conf.getTimeVar(HiveConf.ConfVars.HIVE_METRICS_JSON_FILE_INTERVAL, TimeUnit.MILLISECONDS);
+      final String pathString = conf.getVar(HiveConf.ConfVars.HIVE_METRICS_JSON_FILE_LOCATION);
+
+      timer.schedule(new TimerTask() {
+        @Override
+        public void run() {
+          BufferedWriter bw = null;
+          try {
+            String json = jsonMapper.writerWithDefaultPrettyPrinter().writeValueAsString(metricRegistry);
+            Path tmpPath = new Path(pathString + ".tmp");
+            // Resolve the file system from the path itself: the configured location
+            // may carry its own scheme (e.g. file://) that differs from the default FS.
+            FileSystem fs = tmpPath.getFileSystem(conf);
+            fs.delete(tmpPath, true);
+            // JSON is written as UTF-8 regardless of the platform default charset.
+            bw = new BufferedWriter(new OutputStreamWriter(fs.create(tmpPath, true), "UTF-8"));
+            bw.write(json);
+            bw.close();
+
+            Path path = new Path(pathString);
+            if (fs.rename(tmpPath, path)) {
+              fs.setPermission(path, FsPermission.createImmutable((short) 0644));
+            } else {
+              LOGGER.warn("Unable to rename temporary JSON Metrics file " + tmpPath + " to " + path);
+            }
+          } catch (Exception e) {
+            LOGGER.warn("Error writing JSON Metrics to file", e);
+          } finally {
+            try {
+              if (bw != null) {
+                bw.close();
+              }
+            } catch (IOException e) {
+              //Ignore.
+            }
+          }
+
+
+        }
+      }, 0, time);
+    }
+
+    /**
+     * Cancels the periodic dump, if it was started.
+     */
+    public void close() {
+      if (timer != null) {
+        this.timer.cancel();
+      }
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/bd84e87c/common/src/java/org/apache/hadoop/hive/common/metrics/metrics2/MetricsReporting.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/common/metrics/metrics2/MetricsReporting.java b/common/src/java/org/apache/hadoop/hive/common/metrics/metrics2/MetricsReporting.java
new file mode 100644
index 0000000..643246f
--- /dev/null
+++ b/common/src/java/org/apache/hadoop/hive/common/metrics/metrics2/MetricsReporting.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.common.metrics.metrics2;
+
+/**
+ * Reporting options for org.apache.hadoop.hive.common.metrics.metrics2.CodahaleMetrics.
+ * The constant names are the values accepted (case-insensitively) in the
+ * comma separated hive.service.metrics.reporter setting.
+ */
+public enum MetricsReporting {
+ JMX,
+ CONSOLE,
+ JSON_FILE
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/bd84e87c/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 4fe2ae8..f40d159 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -645,6 +645,7 @@ public class HiveConf extends Configuration {
"Maximum cache full % after which the cache cleaner thread kicks in."),
METASTORE_AGGREGATE_STATS_CACHE_CLEAN_UNTIL("hive.metastore.aggregate.stats.cache.clean.until", (float) 0.8,
"The cleaner thread cleans until cache reaches this % full size."),
+ METASTORE_METRICS("hive.metastore.metrics.enabled", false, "Enable metrics on the metastore."),
// Parameters for exporting metadata on table drop (requires the use of the)
// org.apache.hadoop.hive.ql.parse.MetaDataExportListener preevent listener
@@ -1688,6 +1689,7 @@ public class HiveConf extends Configuration {
" EXECUTION: Log completion of tasks\n" +
" PERFORMANCE: Execution + Performance logs \n" +
" VERBOSE: All logs" ),
+ HIVE_SERVER2_METRICS_ENABLED("hive.server2.metrics.enabled", false, "Enable metrics on the HiveServer2."),
// logging configuration
HIVE_LOG4J_FILE("hive.log4j.file", "",
"Hive log4j configuration file.\n" +
@@ -1715,7 +1717,21 @@ public class HiveConf extends Configuration {
HIVE_AUTOGEN_COLUMNALIAS_PREFIX_INCLUDEFUNCNAME(
"hive.autogen.columnalias.prefix.includefuncname", false,
"Whether to include function name in the column alias auto generated by Hive."),
-
+ HIVE_METRICS_CLASS("hive.service.metrics.class",
+ "org.apache.hadoop.hive.common.metrics.metrics2.CodahaleMetrics",
+ new StringSet(
+ "org.apache.hadoop.hive.common.metrics.metrics2.CodahaleMetrics",
+ "org.apache.hadoop.hive.common.metrics.LegacyMetrics"),
+ "Hive metrics subsystem implementation class."),
+ HIVE_METRICS_REPORTER("hive.service.metrics.reporter", "JSON_FILE, JMX",
+ "Reporter type for metric class org.apache.hadoop.hive.common.metrics.metrics2.CodahaleMetrics, comma separated list of JMX, CONSOLE, JSON_FILE"),
+ HIVE_METRICS_JSON_FILE_LOCATION("hive.service.metrics.file.location", "file:///tmp/my-logging.properties",
+ "For metric class org.apache.hadoop.hive.common.metrics.metrics2.CodahaleMetrics JSON_FILE reporter, the location of JSON metrics file. " +
+ "This file will get overwritten at every interval."),
+ HIVE_METRICS_JSON_FILE_INTERVAL("hive.service.metrics.file.frequency", "5s",
+ new TimeValidator(TimeUnit.MILLISECONDS),
+ "For metric class org.apache.hadoop.hive.common.metrics.metrics2.CodahaleMetrics JSON_FILE reporter, " +
+ "the frequency of updating JSON metrics file."),
HIVE_PERF_LOGGER("hive.exec.perf.logger", "org.apache.hadoop.hive.ql.log.PerfLogger",
"The class responsible for logging client side performance metrics. \n" +
"Must be a subclass of org.apache.hadoop.hive.ql.log.PerfLogger"),
http://git-wip-us.apache.org/repos/asf/hive/blob/bd84e87c/common/src/test/org/apache/hadoop/hive/common/metrics/TestLegacyMetrics.java
----------------------------------------------------------------------
diff --git a/common/src/test/org/apache/hadoop/hive/common/metrics/TestLegacyMetrics.java b/common/src/test/org/apache/hadoop/hive/common/metrics/TestLegacyMetrics.java
new file mode 100644
index 0000000..c14c7ee
--- /dev/null
+++ b/common/src/test/org/apache/hadoop/hive/common/metrics/TestLegacyMetrics.java
@@ -0,0 +1,295 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.common.metrics;
+
+import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import javax.management.Attribute;
+import javax.management.MBeanAttributeInfo;
+import javax.management.MBeanInfo;
+import javax.management.MBeanOperationInfo;
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+
+import org.apache.hadoop.hive.common.metrics.common.MetricsFactory;
+import org.apache.hadoop.hive.common.metrics.LegacyMetrics.MetricsScope;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+/**
+ * Tests for LegacyMetrics obtained through MetricsFactory: the JMX MBean it
+ * exposes and the open/close/reopen life cycle of its scopes, single-threaded
+ * and from several threads at once.
+ */
+public class TestLegacyMetrics {
+
+  private static final String scopeName = "foo";
+  private static final long periodMs = 50L;
+  private static LegacyMetrics metrics;
+
+  @Before
+  public void before() throws Exception {
+    MetricsFactory.deInit();
+    HiveConf conf = new HiveConf();
+    conf.setVar(HiveConf.ConfVars.HIVE_METRICS_CLASS, LegacyMetrics.class.getCanonicalName());
+    MetricsFactory.init(conf);
+    metrics = (LegacyMetrics) MetricsFactory.getMetricsInstance();
+  }
+
+  @After
+  public void after() throws Exception {
+    MetricsFactory.deInit();
+  }
+
+  @Test
+  public void testMetricsMBean() throws Exception {
+    MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+    final ObjectName oname = new ObjectName(
+        "org.apache.hadoop.hive.common.metrics:type=MetricsMBean");
+    MBeanInfo mBeanInfo = mbs.getMBeanInfo(oname);
+    // check implementation class:
+    assertEquals(MetricsMBeanImpl.class.getName(), mBeanInfo.getClassName());
+
+    // check reset operation:
+    MBeanOperationInfo[] oops = mBeanInfo.getOperations();
+    boolean resetFound = false;
+    for (MBeanOperationInfo op : oops) {
+      if ("reset".equals(op.getName())) {
+        resetFound = true;
+        break;
+      }
+    }
+    assertTrue(resetFound);
+
+    // add metric with a non-null value:
+    Attribute attr = new Attribute("fooMetric", Long.valueOf(-77));
+    mbs.setAttribute(oname, attr);
+
+    mBeanInfo = mbs.getMBeanInfo(oname);
+    MBeanAttributeInfo[] attributeInfos = mBeanInfo.getAttributes();
+    assertEquals(1, attributeInfos.length);
+    boolean attrFound = false;
+    for (MBeanAttributeInfo info : attributeInfos) {
+      if ("fooMetric".equals(info.getName())) {
+        assertEquals("java.lang.Long", info.getType());
+        assertTrue(info.isReadable());
+        assertTrue(info.isWritable());
+        assertFalse(info.isIs());
+
+        attrFound = true;
+        break;
+      }
+    }
+    assertTrue(attrFound);
+
+    // check metric value:
+    Object v = mbs.getAttribute(oname, "fooMetric");
+    assertEquals(Long.valueOf(-77), v);
+
+    // reset the bean:
+    Object result = mbs.invoke(oname, "reset", new Object[0], new String[0]);
+    assertNull(result);
+
+    // the metric value must be zeroed:
+    v = mbs.getAttribute(oname, "fooMetric");
+    assertEquals(Long.valueOf(0), v);
+  }
+
+  // Runs the callable and fails unless it throws an IOException.
+  private <T> void expectIOE(Callable<T> c) throws Exception {
+    try {
+      T t = c.call();
+      fail("IOE expected but ["+t+"] was returned.");
+    } catch (IOException ioe) {
+      // ok, expected
+    }
+  }
+
+  @Test
+  public void testScopeSingleThread() throws Exception {
+    metrics.startScope(scopeName);
+    final MetricsScope fooScope = metrics.getScope(scopeName);
+    // the time and number counters become available only after the 1st
+    // scope close:
+    expectIOE(new Callable<Long>() {
+      @Override
+      public Long call() throws Exception {
+        Long num = fooScope.getNumCounter();
+        return num;
+      }
+    });
+    expectIOE(new Callable<Long>() {
+      @Override
+      public Long call() throws Exception {
+        Long time = fooScope.getTimeCounter();
+        return time;
+      }
+    });
+    // cannot open scope that is already open:
+    expectIOE(new Callable<Void>() {
+      @Override
+      public Void call() throws Exception {
+        fooScope.open();
+        return null;
+      }
+    });
+
+    assertSame(fooScope, metrics.getScope(scopeName));
+    Thread.sleep(periodMs + 1);
+    // 1st close:
+    // closing of open scope should be ok:
+    metrics.endScope(scopeName);
+    expectIOE(new Callable<Void>() {
+      @Override
+      public Void call() throws Exception {
+        metrics.endScope(scopeName); // closing of closed scope not allowed
+        return null;
+      }
+    });
+
+    assertEquals(Long.valueOf(1), fooScope.getNumCounter());
+    final long t1 = fooScope.getTimeCounter().longValue();
+    assertTrue(t1 > periodMs);
+
+    assertSame(fooScope, metrics.getScope(scopeName));
+
+    // opening allowed after closing:
+    metrics.startScope(scopeName);
+    // opening of already open scope not allowed:
+    expectIOE(new Callable<Void>() {
+      @Override
+      public Void call() throws Exception {
+        metrics.startScope(scopeName);
+        return null;
+      }
+    });
+
+    assertEquals(Long.valueOf(1), fooScope.getNumCounter());
+    assertEquals(t1, fooScope.getTimeCounter().longValue());
+
+    assertSame(fooScope, metrics.getScope(scopeName));
+    Thread.sleep(periodMs + 1);
+    // Reopening (close + open) allowed in opened state:
+    fooScope.reopen();
+
+    assertEquals(Long.valueOf(2), fooScope.getNumCounter());
+    assertTrue(fooScope.getTimeCounter().longValue() > 2 * periodMs);
+
+    Thread.sleep(periodMs + 1);
+    // 3rd close:
+    fooScope.close();
+
+    assertEquals(Long.valueOf(3), fooScope.getNumCounter());
+    assertTrue(fooScope.getTimeCounter().longValue() > 3 * periodMs);
+    Double avgT = (Double) metrics.get("foo.avg_t");
+    assertTrue(avgT.doubleValue() > periodMs);
+  }
+
+  @Test
+  public void testScopeConcurrency() throws Exception {
+    metrics.startScope(scopeName);
+    MetricsScope fooScope = metrics.getScope(scopeName);
+    final int threads = 10;
+    ExecutorService executorService = Executors.newFixedThreadPool(threads);
+    // Keep the futures: an assertion failure inside a worker is only visible
+    // through Future.get(), otherwise it is silently dropped by the executor.
+    java.util.List<java.util.concurrent.Future<Void>> results =
+        new java.util.ArrayList<java.util.concurrent.Future<Void>>(threads);
+    for (int i=0; i<threads; i++) {
+      final int n = i;
+      results.add(executorService.submit(new Callable<Void>() {
+        @Override
+        public Void call() throws Exception {
+          testScopeImpl(n);
+          return null;
+        }
+      }));
+    }
+    executorService.shutdown();
+    assertTrue(executorService.awaitTermination(periodMs * 3 * threads, TimeUnit.MILLISECONDS));
+    // All workers have terminated; rethrow whatever any of them failed with.
+    for (java.util.concurrent.Future<Void> result : results) {
+      result.get();
+    }
+
+    fooScope = metrics.getScope(scopeName);
+    assertEquals(Long.valueOf(3 * threads), fooScope.getNumCounter());
+    assertTrue(fooScope.getTimeCounter().longValue() > 3 * periodMs * threads);
+    Double avgT = (Double) metrics.get("foo.avg_t");
+    assertTrue(avgT.doubleValue() > periodMs);
+    metrics.endScope(scopeName);
+  }
+
+  // Per-thread body of testScopeConcurrency. Counter checks use >= because the
+  // counters are shared with the other worker threads.
+  void testScopeImpl(int n) throws Exception {
+    metrics.startScope(scopeName);
+    final MetricsScope fooScope = metrics.getScope(scopeName);
+    // cannot open scope that is already open:
+    expectIOE(new Callable<Void>() {
+      @Override
+      public Void call() throws Exception {
+        fooScope.open();
+        return null;
+      }
+    });
+
+    assertSame(fooScope, metrics.getScope(scopeName));
+    Thread.sleep(periodMs + 1);
+    // 1st close:
+    metrics.endScope(scopeName); // closing of open scope should be ok.
+
+    assertTrue(fooScope.getNumCounter().longValue() >= 1);
+    final long t1 = fooScope.getTimeCounter().longValue();
+    assertTrue(t1 > periodMs);
+
+    expectIOE(new Callable<Void>() {
+      @Override
+      public Void call() throws Exception {
+        metrics.endScope(scopeName); // closing of closed scope not allowed
+        return null;
+      }
+    });
+
+    assertSame(fooScope, metrics.getScope(scopeName));
+
+    // opening allowed after closing:
+    metrics.startScope(scopeName);
+
+    assertTrue(fooScope.getNumCounter().longValue() >= 1);
+    assertTrue(fooScope.getTimeCounter().longValue() >= t1);
+
+    // opening of already open scope not allowed:
+    expectIOE(new Callable<Void>() {
+      @Override
+      public Void call() throws Exception {
+        metrics.startScope(scopeName);
+        return null;
+      }
+    });
+
+    assertSame(fooScope, metrics.getScope(scopeName));
+    Thread.sleep(periodMs + 1);
+    // Reopening (close + open) allowed in opened state:
+    fooScope.reopen();
+
+    assertTrue(fooScope.getNumCounter().longValue() >= 2);
+    assertTrue(fooScope.getTimeCounter().longValue() > 2 * periodMs);
+
+    Thread.sleep(periodMs + 1);
+    // 3rd close:
+    fooScope.close();
+
+    assertTrue(fooScope.getNumCounter().longValue() >= 3);
+    assertTrue(fooScope.getTimeCounter().longValue() > 3 * periodMs);
+    Double avgT = (Double) metrics.get("foo.avg_t");
+    assertTrue(avgT.doubleValue() > periodMs);
+  }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/bd84e87c/common/src/test/org/apache/hadoop/hive/common/metrics/TestMetrics.java
----------------------------------------------------------------------
diff --git a/common/src/test/org/apache/hadoop/hive/common/metrics/TestMetrics.java b/common/src/test/org/apache/hadoop/hive/common/metrics/TestMetrics.java
deleted file mode 100644
index e85d3f8..0000000
--- a/common/src/test/org/apache/hadoop/hive/common/metrics/TestMetrics.java
+++ /dev/null
@@ -1,286 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hive.common.metrics;
-
-import java.io.IOException;
-import java.lang.management.ManagementFactory;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-
-import javax.management.Attribute;
-import javax.management.MBeanAttributeInfo;
-import javax.management.MBeanInfo;
-import javax.management.MBeanOperationInfo;
-import javax.management.MBeanServer;
-import javax.management.ObjectName;
-
-import org.apache.hadoop.hive.common.metrics.Metrics.MetricsScope;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import static org.junit.Assert.*;
-
-public class TestMetrics {
-
- private static final String scopeName = "foo";
- private static final long periodMs = 50L;
-
- @Before
- public void before() throws Exception {
- Metrics.uninit();
- Metrics.init();
- }
-
- @After
- public void after() throws Exception {
- Metrics.uninit();
- }
-
- @Test
- public void testMetricsMBean() throws Exception {
- MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
- final ObjectName oname = new ObjectName(
- "org.apache.hadoop.hive.common.metrics:type=MetricsMBean");
- MBeanInfo mBeanInfo = mbs.getMBeanInfo(oname);
- // check implementation class:
- assertEquals(MetricsMBeanImpl.class.getName(), mBeanInfo.getClassName());
-
- // check reset operation:
- MBeanOperationInfo[] oops = mBeanInfo.getOperations();
- boolean resetFound = false;
- for (MBeanOperationInfo op : oops) {
- if ("reset".equals(op.getName())) {
- resetFound = true;
- break;
- }
- }
- assertTrue(resetFound);
-
- // add metric with a non-null value:
- Attribute attr = new Attribute("fooMetric", Long.valueOf(-77));
- mbs.setAttribute(oname, attr);
-
- mBeanInfo = mbs.getMBeanInfo(oname);
- MBeanAttributeInfo[] attrinuteInfos = mBeanInfo.getAttributes();
- assertEquals(1, attrinuteInfos.length);
- boolean attrFound = false;
- for (MBeanAttributeInfo info : attrinuteInfos) {
- if ("fooMetric".equals(info.getName())) {
- assertEquals("java.lang.Long", info.getType());
- assertTrue(info.isReadable());
- assertTrue(info.isWritable());
- assertFalse(info.isIs());
-
- attrFound = true;
- break;
- }
- }
- assertTrue(attrFound);
-
- // check metric value:
- Object v = mbs.getAttribute(oname, "fooMetric");
- assertEquals(Long.valueOf(-77), v);
-
- // reset the bean:
- Object result = mbs.invoke(oname, "reset", new Object[0], new String[0]);
- assertNull(result);
-
- // the metric value must be zeroed:
- v = mbs.getAttribute(oname, "fooMetric");
- assertEquals(Long.valueOf(0), v);
- }
-
- private <T> void expectIOE(Callable<T> c) throws Exception {
- try {
- T t = c.call();
- fail("IOE expected but ["+t+"] was returned.");
- } catch (IOException ioe) {
- // ok, expected
- }
- }
-
- @Test
- public void testScopeSingleThread() throws Exception {
- final MetricsScope fooScope = Metrics.startScope(scopeName);
- // the time and number counters become available only after the 1st
- // scope close:
- expectIOE(new Callable<Long>() {
- @Override
- public Long call() throws Exception {
- Long num = fooScope.getNumCounter();
- return num;
- }
- });
- expectIOE(new Callable<Long>() {
- @Override
- public Long call() throws Exception {
- Long time = fooScope.getTimeCounter();
- return time;
- }
- });
- // cannot open scope that is already open:
- expectIOE(new Callable<Void>() {
- @Override
- public Void call() throws Exception {
- fooScope.open();
- return null;
- }
- });
-
- assertSame(fooScope, Metrics.getScope(scopeName));
- Thread.sleep(periodMs+1);
- // 1st close:
- // closing of open scope should be ok:
- Metrics.endScope(scopeName);
- expectIOE(new Callable<Void>() {
- @Override
- public Void call() throws Exception {
- Metrics.endScope(scopeName); // closing of closed scope not allowed
- return null;
- }
- });
-
- assertEquals(Long.valueOf(1), fooScope.getNumCounter());
- final long t1 = fooScope.getTimeCounter().longValue();
- assertTrue(t1 > periodMs);
-
- assertSame(fooScope, Metrics.getScope(scopeName));
-
- // opening allowed after closing:
- Metrics.startScope(scopeName);
- // opening of already open scope not allowed:
- expectIOE(new Callable<Void>() {
- @Override
- public Void call() throws Exception {
- Metrics.startScope(scopeName);
- return null;
- }
- });
-
- assertEquals(Long.valueOf(1), fooScope.getNumCounter());
- assertEquals(t1, fooScope.getTimeCounter().longValue());
-
- assertSame(fooScope, Metrics.getScope(scopeName));
- Thread.sleep(periodMs + 1);
- // Reopening (close + open) allowed in opened state:
- fooScope.reopen();
-
- assertEquals(Long.valueOf(2), fooScope.getNumCounter());
- assertTrue(fooScope.getTimeCounter().longValue() > 2 * periodMs);
-
- Thread.sleep(periodMs + 1);
- // 3rd close:
- fooScope.close();
-
- assertEquals(Long.valueOf(3), fooScope.getNumCounter());
- assertTrue(fooScope.getTimeCounter().longValue() > 3 * periodMs);
- Double avgT = (Double)Metrics.get("foo.avg_t");
- assertTrue(avgT.doubleValue() > periodMs);
- }
-
- @Test
- public void testScopeConcurrency() throws Exception {
- MetricsScope fooScope = Metrics.startScope(scopeName);
- final int threads = 10;
- ExecutorService executorService = Executors.newFixedThreadPool(threads);
- for (int i=0; i<threads; i++) {
- final int n = i;
- executorService.submit(new Callable<Void>() {
- @Override
- public Void call() throws Exception {
- testScopeImpl(n);
- return null;
- }
- });
- }
- executorService.shutdown();
- assertTrue(executorService.awaitTermination(periodMs * 3 * threads, TimeUnit.MILLISECONDS));
-
- fooScope = Metrics.getScope(scopeName);
- assertEquals(Long.valueOf(3 * threads), fooScope.getNumCounter());
- assertTrue(fooScope.getTimeCounter().longValue() > 3 * periodMs * threads);
- Double avgT = (Double)Metrics.get("foo.avg_t");
- assertTrue(avgT.doubleValue() > periodMs);
- Metrics.endScope(scopeName);
- }
-
- void testScopeImpl(int n) throws Exception {
- final MetricsScope fooScope = Metrics.startScope(scopeName);
- // cannot open scope that is already open:
- expectIOE(new Callable<Void>() {
- @Override
- public Void call() throws Exception {
- fooScope.open();
- return null;
- }
- });
-
- assertSame(fooScope, Metrics.getScope(scopeName));
- Thread.sleep(periodMs+1);
- // 1st close:
- Metrics.endScope(scopeName); // closing of open scope should be ok.
-
- assertTrue(fooScope.getNumCounter().longValue() >= 1);
- final long t1 = fooScope.getTimeCounter().longValue();
- assertTrue(t1 > periodMs);
-
- expectIOE(new Callable<Void>() {
- @Override
- public Void call() throws Exception {
- Metrics.endScope(scopeName); // closing of closed scope not allowed
- return null;
- }
- });
-
- assertSame(fooScope, Metrics.getScope(scopeName));
-
- // opening allowed after closing:
- Metrics.startScope(scopeName);
-
- assertTrue(fooScope.getNumCounter().longValue() >= 1);
- assertTrue(fooScope.getTimeCounter().longValue() >= t1);
-
- // opening of already open scope not allowed:
- expectIOE(new Callable<Void>() {
- @Override
- public Void call() throws Exception {
- Metrics.startScope(scopeName);
- return null;
- }
- });
-
- assertSame(fooScope, Metrics.getScope(scopeName));
- Thread.sleep(periodMs + 1);
- // Reopening (close + open) allowed in opened state:
- fooScope.reopen();
-
- assertTrue(fooScope.getNumCounter().longValue() >= 2);
- assertTrue(fooScope.getTimeCounter().longValue() > 2 * periodMs);
-
- Thread.sleep(periodMs + 1);
- // 3rd close:
- fooScope.close();
-
- assertTrue(fooScope.getNumCounter().longValue() >= 3);
- assertTrue(fooScope.getTimeCounter().longValue() > 3 * periodMs);
- Double avgT = (Double)Metrics.get("foo.avg_t");
- assertTrue(avgT.doubleValue() > periodMs);
- }
-}
http://git-wip-us.apache.org/repos/asf/hive/blob/bd84e87c/common/src/test/org/apache/hadoop/hive/common/metrics/metrics2/TestCodahaleMetrics.java
----------------------------------------------------------------------
diff --git a/common/src/test/org/apache/hadoop/hive/common/metrics/metrics2/TestCodahaleMetrics.java b/common/src/test/org/apache/hadoop/hive/common/metrics/metrics2/TestCodahaleMetrics.java
new file mode 100644
index 0000000..8749349
--- /dev/null
+++ b/common/src/test/org/apache/hadoop/hive/common/metrics/metrics2/TestCodahaleMetrics.java
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.common.metrics.metrics2;
+
+import com.codahale.metrics.Counter;
+import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.Timer;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.hadoop.hive.common.metrics.common.MetricsFactory;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.shims.ShimLoader;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Unit test for new Metrics subsystem.
+ */
+public class TestCodahaleMetrics {
+
+  private static File workDir = new File(System.getProperty("test.tmp.dir"));
+  private static File jsonReportFile;
+  public static MetricRegistry metricRegistry;
+
+  /** Configures Codahale metrics with JSON-file and JMX reporters and initializes the factory. */
+  @Before
+  public void before() throws Exception {
+    HiveConf conf = new HiveConf();
+
+    jsonReportFile = new File(workDir, "json_reporting");
+    jsonReportFile.delete();
+    String defaultFsName = ShimLoader.getHadoopShims().getHadoopConfNames().get("HADOOPFS");
+    conf.set(defaultFsName, "local");
+    conf.setVar(HiveConf.ConfVars.HIVE_METRICS_CLASS, CodahaleMetrics.class.getCanonicalName());
+    conf.setVar(HiveConf.ConfVars.HIVE_METRICS_REPORTER, MetricsReporting.JSON_FILE.name() + "," + MetricsReporting.JMX.name());
+    conf.setVar(HiveConf.ConfVars.HIVE_METRICS_JSON_FILE_LOCATION, jsonReportFile.toString());
+    conf.setVar(HiveConf.ConfVars.HIVE_METRICS_JSON_FILE_INTERVAL, "100ms");
+
+    MetricsFactory.init(conf);
+    metricRegistry = ((CodahaleMetrics) MetricsFactory.getMetricsInstance()).getMetricRegistry();
+  }
+
+  /** Tears down the metrics system that before() initialized. */
+  @After
+  public void after() throws Exception {
+    MetricsFactory.deInit();
+  }
+
+  /** Each startScope/endScope pair must be recorded once by the "api_method1" timer. */
+  @Test
+  public void testScope() throws Exception {
+    int runs = 5;
+    for (int i = 0; i < runs; i++) {
+      MetricsFactory.getMetricsInstance().startScope("method1");
+      MetricsFactory.getMetricsInstance().endScope("method1");
+    }
+
+    Timer timer = metricRegistry.getTimers().get("api_method1");
+    Assert.assertEquals(runs, timer.getCount());
+    Assert.assertTrue(timer.getMeanRate() > 0);
+  }
+
+  /** Each incrementCounter call must add one to the "count1" counter. */
+  @Test
+  public void testCount() throws Exception {
+    int runs = 5;
+    for (int i = 0; i < runs; i++) {
+      MetricsFactory.getMetricsInstance().incrementCounter("count1");
+    }
+    Counter counter = metricRegistry.getCounters().get("count1");
+    Assert.assertEquals(runs, counter.getCount());
+  }
+
+  /** Scopes run on separate pool threads must each be counted by the "api_method2" timer. */
+  @Test
+  public void testConcurrency() throws Exception {
+    int threads = 4;
+    ExecutorService executorService = Executors.newFixedThreadPool(threads);
+    for (int i = 0; i < threads; i++) {
+      executorService.submit(new Callable<Void>() {
+        @Override
+        public Void call() throws Exception {
+          MetricsFactory.getMetricsInstance().startScope("method2");
+          MetricsFactory.getMetricsInstance().endScope("method2");
+          return null;
+        }
+      });
+    }
+    executorService.shutdown();
+    assertTrue(executorService.awaitTermination(10000, TimeUnit.MILLISECONDS));
+    Timer timer = metricRegistry.getTimers().get("api_method2");
+    Assert.assertEquals(threads, timer.getCount());
+    Assert.assertTrue(timer.getMeanRate() > 0);
+  }
+
+  /** The JSON report file must contain the final value of the "count2" counter. */
+  @Test
+  public void testFileReporting() throws Exception {
+    int runs = 5;
+    for (int i = 0; i < runs; i++) {
+      MetricsFactory.getMetricsInstance().incrementCounter("count2");
+      Thread.sleep(100);
+    }
+    // Give the reporter (100ms interval) time to write the final counter value.
+    Thread.sleep(2000);
+    byte[] jsonData = Files.readAllBytes(Paths.get(jsonReportFile.getAbsolutePath()));
+    JsonNode rootNode = new ObjectMapper().readTree(jsonData);
+    JsonNode countNode = rootNode.path("counters").path("count2").path("count");
+    Assert.assertEquals(runs, countNode.asInt());
+  }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/bd84e87c/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestMetaStoreMetrics.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestMetaStoreMetrics.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestMetaStoreMetrics.java
new file mode 100644
index 0000000..25f34d1
--- /dev/null
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestMetaStoreMetrics.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.metastore;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import junit.framework.TestCase;
+import org.apache.hadoop.hive.cli.CliSessionState;
+import org.apache.hadoop.hive.common.metrics.metrics2.MetricsReporting;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.Driver;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.hive.shims.ShimLoader;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+
+/**
+ * Tests Hive Metastore Metrics.
+ */
+public class TestMetaStoreMetrics {
+
+  private static File workDir = new File(System.getProperty("test.tmp.dir"));
+  private static File jsonReportFile;
+
+  private static HiveConf hiveConf;
+  private static Driver driver;
+
+  /**
+   * Starts a metastore on a free port with metrics enabled (JSON file and JMX reporters),
+   * then starts a CLI session and creates the Driver used by the test.
+   */
+  @Before
+  public void before() throws Exception {
+    jsonReportFile = new File(workDir, "json_reporting");
+    jsonReportFile.delete();
+
+    int port = MetaStoreUtils.findFreePort();
+    String reporters = MetricsReporting.JSON_FILE.name() + "," + MetricsReporting.JMX.name();
+
+    hiveConf = new HiveConf(TestMetaStoreMetrics.class);
+    hiveConf.setVar(HiveConf.ConfVars.METASTOREURIS, "thrift://localhost:" + port);
+    hiveConf.setIntVar(HiveConf.ConfVars.METASTORETHRIFTCONNECTIONRETRIES, 3);
+    hiveConf.setBoolVar(HiveConf.ConfVars.METASTORE_METRICS, true);
+    hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY, false);
+    hiveConf.setVar(HiveConf.ConfVars.HIVE_METRICS_REPORTER, reporters);
+    hiveConf.setVar(HiveConf.ConfVars.HIVE_METRICS_JSON_FILE_LOCATION, jsonReportFile.toString());
+    hiveConf.setVar(HiveConf.ConfVars.HIVE_METRICS_JSON_FILE_INTERVAL, "100ms");
+
+    MetaStoreUtils.startMetaStore(port, ShimLoader.getHadoopThriftAuthBridge(), hiveConf);
+
+    SessionState.start(new CliSessionState(hiveConf));
+    driver = new Driver(hiveConf);
+  }
+
+  /** Runs a query and expects a positive "api_get_all_databases" timer count in the JSON report. */
+  @Test
+  public void testMetricsFile() throws Exception {
+    driver.run("show databases");
+
+    // Give the reporter thread (100ms interval) a chance to write the metrics.
+    Thread.sleep(2000);
+
+    // The file is read exactly once; the sleep above is the only guard against a partial write.
+    // This can be replaced by CodahaleMetrics's JsonServlet reporter once it is exposed.
+    byte[] reportBytes = Files.readAllBytes(Paths.get(jsonReportFile.getAbsolutePath()));
+    JsonNode report = new ObjectMapper().readTree(reportBytes);
+    JsonNode timerCount = report.path("timers").path("api_get_all_databases").path("count");
+    Assert.assertTrue(timerCount.asInt() > 0);
+  }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/bd84e87c/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
index a3e5ed2..828c585 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
@@ -18,39 +18,14 @@
package org.apache.hadoop.hive.metastore;
-import static org.apache.commons.lang.StringUtils.join;
-import static org.apache.hadoop.hive.metastore.MetaStoreUtils.DEFAULT_DATABASE_COMMENT;
-import static org.apache.hadoop.hive.metastore.MetaStoreUtils.DEFAULT_DATABASE_NAME;
-import static org.apache.hadoop.hive.metastore.MetaStoreUtils.validateName;
-
-import java.io.IOException;
-import java.text.DateFormat;
-import java.text.SimpleDateFormat;
-import java.util.AbstractMap;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Formatter;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Properties;
-import java.util.Set;
-import java.util.Timer;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-import java.util.regex.Pattern;
-
-import javax.jdo.JDOException;
-
+import com.facebook.fb303.FacebookBase;
+import com.facebook.fb303.fb_status;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Splitter;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableListMultimap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Multimaps;
import org.apache.commons.cli.OptionBuilder;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -58,12 +33,13 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.FileUtils;
+import org.apache.hadoop.hive.common.JvmPauseMonitor;
import org.apache.hadoop.hive.common.LogUtils;
import org.apache.hadoop.hive.common.LogUtils.LogInitializationException;
import org.apache.hadoop.hive.common.classification.InterfaceAudience;
import org.apache.hadoop.hive.common.classification.InterfaceStability;
import org.apache.hadoop.hive.common.cli.CommonCliOptions;
-import org.apache.hadoop.hive.common.metrics.Metrics;
+import org.apache.hadoop.hive.common.metrics.common.MetricsFactory;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.metastore.api.AbortTxnRequest;
@@ -221,14 +197,35 @@ import org.apache.thrift.transport.TServerTransport;
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportFactory;
-import com.facebook.fb303.FacebookBase;
-import com.facebook.fb303.fb_status;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Splitter;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableListMultimap;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Multimaps;
+import javax.jdo.JDOException;
+import java.io.IOException;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.AbstractMap;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Formatter;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Properties;
+import java.util.Set;
+import java.util.Timer;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.regex.Pattern;
+
+import static org.apache.commons.lang.StringUtils.join;
+import static org.apache.hadoop.hive.metastore.MetaStoreUtils.*;
/**
* TODO:pc remove application logic to a separate interface.
@@ -464,9 +461,10 @@ public class HiveMetaStore extends ThriftHiveMetastore {
}
}
- if (hiveConf.getBoolean("hive.metastore.metrics.enabled", false)) {
+ //Start Metrics for Embedded mode
+ if (hiveConf.getBoolVar(ConfVars.METASTORE_METRICS)) {
try {
- Metrics.init();
+ MetricsFactory.init(hiveConf);
} catch (Exception e) {
// log exception, but ignore inability to start
LOG.error("error in Metrics init: " + e.getClass().getName() + " "
@@ -750,11 +748,13 @@ public class HiveMetaStore extends ThriftHiveMetastore {
incrementCounter(function);
logInfo((getIpAddress() == null ? "" : "source:" + getIpAddress() + " ") +
function + extraLogInfo);
- try {
- Metrics.startScope(function);
- } catch (IOException e) {
- LOG.debug("Exception when starting metrics scope"
+ if (hiveConf.getBoolVar(ConfVars.METASTORE_METRICS)) {
+ try {
+ MetricsFactory.getMetricsInstance().startScope(function);
+ } catch (IOException e) {
+ LOG.debug("Exception when starting metrics scope"
+ e.getClass().getName() + " " + e.getMessage(), e);
+ }
}
return function;
}
@@ -792,10 +792,12 @@ public class HiveMetaStore extends ThriftHiveMetastore {
}
private void endFunction(String function, MetaStoreEndFunctionContext context) {
- try {
- Metrics.endScope(function);
- } catch (IOException e) {
- LOG.debug("Exception when closing metrics scope" + e);
+ if (hiveConf.getBoolVar(ConfVars.METASTORE_METRICS)) {
+ try {
+ MetricsFactory.getMetricsInstance().endScope(function);
+ } catch (IOException e) {
+ LOG.debug("Exception when closing metrics scope" + e);
+ }
}
for (MetaStoreEndFunctionListener listener : endFunctionListeners) {
@@ -819,6 +821,14 @@ public class HiveMetaStore extends ThriftHiveMetastore {
threadLocalMS.remove();
}
}
+ if (hiveConf.getBoolVar(ConfVars.METASTORE_METRICS)) {
+ try {
+ MetricsFactory.deInit();
+ } catch (Exception e) {
+ LOG.error("error in Metrics deinit: " + e.getClass().getName() + " "
+ + e.getMessage(), e);
+ }
+ }
logInfo("Metastore shutdown complete.");
}
@@ -5914,6 +5924,16 @@ public class HiveMetaStore extends ThriftHiveMetastore {
}
});
+ //Start Metrics for Standalone (Remote) Mode
+ if (conf.getBoolVar(ConfVars.METASTORE_METRICS)) {
+ try {
+ MetricsFactory.init(conf);
+ } catch (Exception e) {
+ // log exception, but ignore inability to start
+ LOG.error("error in Metrics init: " + e.getClass().getName() + " "
+ + e.getMessage(), e);
+ }
+ }
Lock startLock = new ReentrantLock();
Condition startCondition = startLock.newCondition();
@@ -6104,7 +6124,13 @@ public class HiveMetaStore extends ThriftHiveMetastore {
// Wrap the start of the threads in a catch Throwable loop so that any failures
// don't doom the rest of the metastore.
startLock.lock();
- ShimLoader.getHadoopShims().startPauseMonitor(conf);
+ try {
+ JvmPauseMonitor pauseMonitor = new JvmPauseMonitor(conf);
+ pauseMonitor.start();
+ } catch (Throwable t) {
+ LOG.warn("Could not initiate the JvmPauseMonitor thread." + " GCs and Pauses may not be " +
+ "warned upon.", t);
+ }
try {
// Per the javadocs on Condition, do not depend on the condition alone as a start gate