You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by sz...@apache.org on 2015/07/11 00:35:32 UTC
[1/4] hive git commit: HIVE-10944 : Fix HS2 for Metrics (Szehon,
reviewed by Sergey Shelukhin and Lenni Kuff)
Repository: hive
Updated Branches:
refs/heads/branch-1 2d49e5ad2 -> ad803d794
HIVE-10944 : Fix HS2 for Metrics (Szehon, reviewed by Sergey Shelukhin and Lenni Kuff)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/5553fbdf
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/5553fbdf
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/5553fbdf
Branch: refs/heads/branch-1
Commit: 5553fbdff75427a4ee61ea010ff2a11ae356a5cf
Parents: bd84e87
Author: Szehon Ho <sz...@cloudera.com>
Authored: Fri Jun 12 16:32:26 2015 -0700
Committer: Szehon Ho <sz...@cloudera.com>
Committed: Fri Jul 10 15:22:56 2015 -0700
----------------------------------------------------------------------
.../hadoop/hive/common/JvmPauseMonitor.java | 12 +++--
.../hive/common/metrics/LegacyMetrics.java | 51 ++++----------------
.../hive/common/metrics/common/Metrics.java | 8 +--
.../common/metrics/common/MetricsFactory.java | 30 +++++++++---
.../metrics/metrics2/CodahaleMetrics.java | 41 +++++-----------
.../org/apache/hadoop/hive/conf/HiveConf.java | 2 +-
.../hive/common/metrics/TestLegacyMetrics.java | 6 +--
.../metrics/metrics2/TestCodahaleMetrics.java | 16 +++---
.../hadoop/hive/metastore/HiveMetaStore.java | 10 ++--
.../apache/hive/service/server/HiveServer2.java | 6 +--
10 files changed, 72 insertions(+), 110 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/5553fbdf/common/src/java/org/apache/hadoop/hive/common/JvmPauseMonitor.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/common/JvmPauseMonitor.java b/common/src/java/org/apache/hadoop/hive/common/JvmPauseMonitor.java
index c3949f2..ec5ac4a 100644
--- a/common/src/java/org/apache/hadoop/hive/common/JvmPauseMonitor.java
+++ b/common/src/java/org/apache/hadoop/hive/common/JvmPauseMonitor.java
@@ -26,6 +26,7 @@ import com.google.common.collect.Sets;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.common.metrics.common.Metrics;
import org.apache.hadoop.hive.common.metrics.common.MetricsFactory;
import org.apache.hadoop.util.Daemon;
@@ -199,10 +200,13 @@ public class JvmPauseMonitor {
}
private void incrementMetricsCounter(String name, long count) {
- try {
- MetricsFactory.getMetricsInstance().incrementCounter(name, count);
- } catch (Exception e) {
- LOG.warn("Error Reporting JvmPauseMonitor to Metrics system", e);
+ Metrics metrics = MetricsFactory.getInstance();
+ if (metrics != null) {
+ try {
+ metrics.incrementCounter(name, count);
+ } catch (Exception e) {
+ LOG.warn("Error Reporting JvmPauseMonitor to Metrics system", e);
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/5553fbdf/common/src/java/org/apache/hadoop/hive/common/metrics/LegacyMetrics.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/common/metrics/LegacyMetrics.java b/common/src/java/org/apache/hadoop/hive/common/metrics/LegacyMetrics.java
index 14f7afb..e811339 100644
--- a/common/src/java/org/apache/hadoop/hive/common/metrics/LegacyMetrics.java
+++ b/common/src/java/org/apache/hadoop/hive/common/metrics/LegacyMetrics.java
@@ -149,7 +149,6 @@ public class LegacyMetrics implements Metrics {
}
}
-
private static final ThreadLocal<HashMap<String, MetricsScope>> threadLocalScopes
= new ThreadLocal<HashMap<String,MetricsScope>>() {
@Override
@@ -158,31 +157,16 @@ public class LegacyMetrics implements Metrics {
}
};
- private boolean initialized = false;
-
- public void init(HiveConf conf) throws Exception {
- if (!initialized) {
- MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
- mbs.registerMBean(metrics, oname);
- initialized = true;
- }
- }
-
- public boolean isInitialized() {
- return initialized;
+ public LegacyMetrics(HiveConf conf) throws Exception {
+ MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+ mbs.registerMBean(metrics, oname);
}
public Long incrementCounter(String name) throws IOException{
- if (!initialized) {
- return null;
- }
return incrementCounter(name,Long.valueOf(1));
}
public Long incrementCounter(String name, long increment) throws IOException{
- if (!initialized) {
- return null;
- }
Long value;
synchronized(metrics) {
if (!metrics.hasKey(name)) {
@@ -197,23 +181,14 @@ public class LegacyMetrics implements Metrics {
}
public void set(String name, Object value) throws IOException{
- if (!initialized) {
- return;
- }
metrics.put(name,value);
}
public Object get(String name) throws IOException{
- if (!initialized) {
- return null;
- }
return metrics.get(name);
}
public void startScope(String name) throws IOException{
- if (!initialized) {
- return;
- }
if (threadLocalScopes.get().containsKey(name)) {
threadLocalScopes.get().get(name).open();
} else {
@@ -222,9 +197,6 @@ public class LegacyMetrics implements Metrics {
}
public MetricsScope getScope(String name) throws IOException {
- if (!initialized) {
- return null;
- }
if (threadLocalScopes.get().containsKey(name)) {
return threadLocalScopes.get().get(name);
} else {
@@ -233,9 +205,6 @@ public class LegacyMetrics implements Metrics {
}
public void endScope(String name) throws IOException{
- if (!initialized) {
- return;
- }
if (threadLocalScopes.get().containsKey(name)) {
threadLocalScopes.get().get(name).close();
}
@@ -247,16 +216,14 @@ public class LegacyMetrics implements Metrics {
*
* Note that threadLocalScopes ThreadLocal is *not* cleared in this call.
*/
- public void deInit() throws Exception {
+ public void close() throws Exception {
synchronized (metrics) {
- if (initialized) {
- MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
- if (mbs.isRegistered(oname)) {
- mbs.unregisterMBean(oname);
- }
- metrics.clear();
- initialized = false;
+ MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+ if (mbs.isRegistered(oname)) {
+ mbs.unregisterMBean(oname);
}
+ metrics.clear();
+ threadLocalScopes.remove();
}
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/5553fbdf/common/src/java/org/apache/hadoop/hive/common/metrics/common/Metrics.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/common/metrics/common/Metrics.java b/common/src/java/org/apache/hadoop/hive/common/metrics/common/Metrics.java
index 13a5336..27b69cc 100644
--- a/common/src/java/org/apache/hadoop/hive/common/metrics/common/Metrics.java
+++ b/common/src/java/org/apache/hadoop/hive/common/metrics/common/Metrics.java
@@ -28,16 +28,12 @@ import java.io.IOException;
*/
public interface Metrics {
- /**
- * Initialize Metrics system with given Hive configuration.
- * @param conf
- */
- public void init(HiveConf conf) throws Exception;
+ //Must declare CTOR taking in HiveConf.
/**
* Deinitializes the Metrics system.
*/
- public void deInit() throws Exception;
+ public void close() throws Exception;
/**
* @param name
http://git-wip-us.apache.org/repos/asf/hive/blob/5553fbdf/common/src/java/org/apache/hadoop/hive/common/metrics/common/MetricsFactory.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/common/metrics/common/MetricsFactory.java b/common/src/java/org/apache/hadoop/hive/common/metrics/common/MetricsFactory.java
index 12a309d..8769d68 100644
--- a/common/src/java/org/apache/hadoop/hive/common/metrics/common/MetricsFactory.java
+++ b/common/src/java/org/apache/hadoop/hive/common/metrics/common/MetricsFactory.java
@@ -20,29 +20,43 @@ package org.apache.hadoop.hive.common.metrics.common;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.util.ReflectionUtils;
+import java.lang.reflect.Constructor;
+
/**
* Class that manages a static Metric instance for this process.
*/
public class MetricsFactory {
- private static Metrics metrics;
- private static Object initLock = new Object();
+ //Volatile ensures that static access returns Metrics instance in fully-initialized state.
+ //Alternative is to synchronize static access, which has performance penalties.
+ private volatile static Metrics metrics;
+ /**
+ * Initializes static Metrics instance.
+ */
public synchronized static void init(HiveConf conf) throws Exception {
if (metrics == null) {
- metrics = (Metrics) ReflectionUtils.newInstance(conf.getClassByName(
- conf.getVar(HiveConf.ConfVars.HIVE_METRICS_CLASS)), conf);
+ Class metricsClass = conf.getClassByName(
+ conf.getVar(HiveConf.ConfVars.HIVE_METRICS_CLASS));
+ Constructor constructor = metricsClass.getConstructor(HiveConf.class);
+ metrics = (Metrics) constructor.newInstance(conf);
}
- metrics.init(conf);
}
- public synchronized static Metrics getMetricsInstance() {
+ /**
+ * Returns static Metrics instance, null if not initialized or closed.
+ */
+ public static Metrics getInstance() {
return metrics;
}
- public synchronized static void deInit() throws Exception {
+ /**
+ * Closes and removes static Metrics instance.
+ */
+ public synchronized static void close() throws Exception {
if (metrics != null) {
- metrics.deInit();
+ metrics.close();
+ metrics = null;
}
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/5553fbdf/common/src/java/org/apache/hadoop/hive/common/metrics/metrics2/CodahaleMetrics.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/common/metrics/metrics2/CodahaleMetrics.java b/common/src/java/org/apache/hadoop/hive/common/metrics/metrics2/CodahaleMetrics.java
index e59da99..ae353d0 100644
--- a/common/src/java/org/apache/hadoop/hive/common/metrics/metrics2/CodahaleMetrics.java
+++ b/common/src/java/org/apache/hadoop/hive/common/metrics/metrics2/CodahaleMetrics.java
@@ -77,7 +77,6 @@ public class CodahaleMetrics implements org.apache.hadoop.hive.common.metrics.co
private LoadingCache<String, Timer> timers;
private LoadingCache<String, Counter> counters;
- private boolean initialized = false;
private HiveConf conf;
private final Set<Closeable> reporters = new HashSet<Closeable>();
@@ -139,11 +138,7 @@ public class CodahaleMetrics implements org.apache.hadoop.hive.common.metrics.co
}
}
- public synchronized void init(HiveConf conf) throws Exception {
- if (initialized) {
- return;
- }
-
+ public CodahaleMetrics(HiveConf conf) throws Exception {
this.conf = conf;
//Codahale artifacts are lazily-created.
timers = CacheBuilder.newBuilder().build(
@@ -190,32 +185,23 @@ public class CodahaleMetrics implements org.apache.hadoop.hive.common.metrics.co
}
}
initReporting(finalReporterList);
- initialized = true;
}
- public synchronized void deInit() throws Exception {
- if (initialized) {
- if (reporters != null) {
- for (Closeable reporter : reporters) {
- reporter.close();
- }
+ public void close() throws Exception {
+ if (reporters != null) {
+ for (Closeable reporter : reporters) {
+ reporter.close();
}
- for (Map.Entry<String, Metric> metric : metricRegistry.getMetrics().entrySet()) {
- metricRegistry.remove(metric.getKey());
- }
- timers.invalidateAll();
- counters.invalidateAll();
- initialized = false;
}
+ for (Map.Entry<String, Metric> metric : metricRegistry.getMetrics().entrySet()) {
+ metricRegistry.remove(metric.getKey());
+ }
+ timers.invalidateAll();
+ counters.invalidateAll();
}
public void startScope(String name) throws IOException {
- synchronized (this) {
- if (!initialized) {
- return;
- }
- }
name = API_PREFIX + name;
if (threadLocalScopes.get().containsKey(name)) {
threadLocalScopes.get().get(name).open();
@@ -224,12 +210,7 @@ public class CodahaleMetrics implements org.apache.hadoop.hive.common.metrics.co
}
}
- public void endScope(String name) throws IOException{
- synchronized (this) {
- if (!initialized) {
- return;
- }
- }
+ public void endScope(String name) throws IOException {
name = API_PREFIX + name;
if (threadLocalScopes.get().containsKey(name)) {
threadLocalScopes.get().get(name).close();
http://git-wip-us.apache.org/repos/asf/hive/blob/5553fbdf/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index f40d159..75c2301 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -1725,7 +1725,7 @@ public class HiveConf extends Configuration {
"Hive metrics subsystem implementation class."),
HIVE_METRICS_REPORTER("hive.service.metrics.reporter", "JSON_FILE, JMX",
"Reporter type for metric class org.apache.hadoop.hive.common.metrics.metrics2.CodahaleMetrics, comma separated list of JMX, CONSOLE, JSON_FILE"),
- HIVE_METRICS_JSON_FILE_LOCATION("hive.service.metrics.file.location", "file:///tmp/my-logging.properties",
+ HIVE_METRICS_JSON_FILE_LOCATION("hive.service.metrics.file.location", "file:///tmp/report.json",
"For metric class org.apache.hadoop.hive.common.metrics.metrics2.CodahaleMetrics JSON_FILE reporter, the location of JSON metrics file. " +
"This file will get overwritten at every interval."),
HIVE_METRICS_JSON_FILE_INTERVAL("hive.service.metrics.file.frequency", "5s",
http://git-wip-us.apache.org/repos/asf/hive/blob/5553fbdf/common/src/test/org/apache/hadoop/hive/common/metrics/TestLegacyMetrics.java
----------------------------------------------------------------------
diff --git a/common/src/test/org/apache/hadoop/hive/common/metrics/TestLegacyMetrics.java b/common/src/test/org/apache/hadoop/hive/common/metrics/TestLegacyMetrics.java
index c14c7ee..c3e8282 100644
--- a/common/src/test/org/apache/hadoop/hive/common/metrics/TestLegacyMetrics.java
+++ b/common/src/test/org/apache/hadoop/hive/common/metrics/TestLegacyMetrics.java
@@ -47,16 +47,16 @@ public class TestLegacyMetrics {
@Before
public void before() throws Exception {
- MetricsFactory.deInit();
+ MetricsFactory.close();
HiveConf conf = new HiveConf();
conf.setVar(HiveConf.ConfVars.HIVE_METRICS_CLASS, LegacyMetrics.class.getCanonicalName());
MetricsFactory.init(conf);
- metrics = (LegacyMetrics) MetricsFactory.getMetricsInstance();
+ metrics = (LegacyMetrics) MetricsFactory.getInstance();
}
@After
public void after() throws Exception {
- MetricsFactory.deInit();
+ MetricsFactory.close();
}
@Test
http://git-wip-us.apache.org/repos/asf/hive/blob/5553fbdf/common/src/test/org/apache/hadoop/hive/common/metrics/metrics2/TestCodahaleMetrics.java
----------------------------------------------------------------------
diff --git a/common/src/test/org/apache/hadoop/hive/common/metrics/metrics2/TestCodahaleMetrics.java b/common/src/test/org/apache/hadoop/hive/common/metrics/metrics2/TestCodahaleMetrics.java
index 8749349..954b388 100644
--- a/common/src/test/org/apache/hadoop/hive/common/metrics/metrics2/TestCodahaleMetrics.java
+++ b/common/src/test/org/apache/hadoop/hive/common/metrics/metrics2/TestCodahaleMetrics.java
@@ -63,20 +63,20 @@ public class TestCodahaleMetrics {
conf.setVar(HiveConf.ConfVars.HIVE_METRICS_JSON_FILE_INTERVAL, "100ms");
MetricsFactory.init(conf);
- metricRegistry = ((CodahaleMetrics) MetricsFactory.getMetricsInstance()).getMetricRegistry();
+ metricRegistry = ((CodahaleMetrics) MetricsFactory.getInstance()).getMetricRegistry();
}
@After
public void after() throws Exception {
- MetricsFactory.deInit();
+ MetricsFactory.close();
}
@Test
public void testScope() throws Exception {
int runs = 5;
for (int i = 0; i < runs; i++) {
- MetricsFactory.getMetricsInstance().startScope("method1");
- MetricsFactory.getMetricsInstance().endScope("method1");
+ MetricsFactory.getInstance().startScope("method1");
+ MetricsFactory.getInstance().endScope("method1");
}
Timer timer = metricRegistry.getTimers().get("api_method1");
@@ -89,7 +89,7 @@ public class TestCodahaleMetrics {
public void testCount() throws Exception {
int runs = 5;
for (int i = 0; i < runs; i++) {
- MetricsFactory.getMetricsInstance().incrementCounter("count1");
+ MetricsFactory.getInstance().incrementCounter("count1");
}
Counter counter = metricRegistry.getCounters().get("count1");
Assert.assertEquals(5L, counter.getCount());
@@ -104,8 +104,8 @@ public class TestCodahaleMetrics {
executorService.submit(new Callable<Void>() {
@Override
public Void call() throws Exception {
- MetricsFactory.getMetricsInstance().startScope("method2");
- MetricsFactory.getMetricsInstance().endScope("method2");
+ MetricsFactory.getInstance().startScope("method2");
+ MetricsFactory.getInstance().endScope("method2");
return null;
}
});
@@ -121,7 +121,7 @@ public class TestCodahaleMetrics {
public void testFileReporting() throws Exception {
int runs = 5;
for (int i = 0; i < runs; i++) {
- MetricsFactory.getMetricsInstance().incrementCounter("count2");
+ MetricsFactory.getInstance().incrementCounter("count2");
Thread.sleep(100);
}
http://git-wip-us.apache.org/repos/asf/hive/blob/5553fbdf/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
index 828c585..2cbe3c0 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
@@ -748,9 +748,9 @@ public class HiveMetaStore extends ThriftHiveMetastore {
incrementCounter(function);
logInfo((getIpAddress() == null ? "" : "source:" + getIpAddress() + " ") +
function + extraLogInfo);
- if (hiveConf.getBoolVar(ConfVars.METASTORE_METRICS)) {
+ if (MetricsFactory.getInstance() != null) {
try {
- MetricsFactory.getMetricsInstance().startScope(function);
+ MetricsFactory.getInstance().startScope(function);
} catch (IOException e) {
LOG.debug("Exception when starting metrics scope"
+ e.getClass().getName() + " " + e.getMessage(), e);
@@ -792,9 +792,9 @@ public class HiveMetaStore extends ThriftHiveMetastore {
}
private void endFunction(String function, MetaStoreEndFunctionContext context) {
- if (hiveConf.getBoolVar(ConfVars.METASTORE_METRICS)) {
+ if (MetricsFactory.getInstance() != null) {
try {
- MetricsFactory.getMetricsInstance().endScope(function);
+ MetricsFactory.getInstance().endScope(function);
} catch (IOException e) {
LOG.debug("Exception when closing metrics scope" + e);
}
@@ -823,7 +823,7 @@ public class HiveMetaStore extends ThriftHiveMetastore {
}
if (hiveConf.getBoolVar(ConfVars.METASTORE_METRICS)) {
try {
- MetricsFactory.deInit();
+ MetricsFactory.close();
} catch (Exception e) {
LOG.error("error in Metrics deinit: " + e.getClass().getName() + " "
+ e.getMessage(), e);
http://git-wip-us.apache.org/repos/asf/hive/blob/5553fbdf/service/src/java/org/apache/hive/service/server/HiveServer2.java
----------------------------------------------------------------------
diff --git a/service/src/java/org/apache/hive/service/server/HiveServer2.java b/service/src/java/org/apache/hive/service/server/HiveServer2.java
index 4fecb3c..f0d9e6f 100644
--- a/service/src/java/org/apache/hive/service/server/HiveServer2.java
+++ b/service/src/java/org/apache/hive/service/server/HiveServer2.java
@@ -308,9 +308,9 @@ public class HiveServer2 extends CompositeService {
HiveConf hiveConf = this.getHiveConf();
super.stop();
// Shutdown Metrics
- if (hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_METRICS_ENABLED)) {
+ if (MetricsFactory.getInstance() != null) {
try {
- MetricsFactory.getMetricsInstance().deInit();
+ MetricsFactory.close();
} catch (Exception e) {
LOG.error("error in Metrics deinit: " + e.getClass().getName() + " "
+ e.getMessage(), e);
@@ -359,7 +359,7 @@ public class HiveServer2 extends CompositeService {
server.start();
if (hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_METRICS_ENABLED)) {
- MetricsFactory.getMetricsInstance().init(hiveConf);
+ MetricsFactory.init(hiveConf);
}
try {
JvmPauseMonitor pauseMonitor = new JvmPauseMonitor(hiveConf);
[3/4] hive git commit: HIVE-10761 : Create codahale-based metrics
system for Hive (Szehon, reviewed by Xuefu)
Posted by sz...@apache.org.
HIVE-10761 : Create codahale-based metrics system for Hive (Szehon, reviewed by Xuefu)
Conflicts:
pom.xml
service/src/java/org/apache/hive/service/server/HiveServer2.java
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/bd84e87c
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/bd84e87c
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/bd84e87c
Branch: refs/heads/branch-1
Commit: bd84e87c880ac389c9108916aada0da06d94bf9b
Parents: 2d49e5a
Author: Szehon Ho <sz...@cloudera.com>
Authored: Wed Jun 3 23:46:28 2015 -0700
Committer: Szehon Ho <sz...@cloudera.com>
Committed: Fri Jul 10 15:22:56 2015 -0700
----------------------------------------------------------------------
common/pom.xml | 20 +
.../hadoop/hive/common/JvmPauseMonitor.java | 225 ++++++++++++
.../hive/common/metrics/LegacyMetrics.java | 262 +++++++++++++
.../hadoop/hive/common/metrics/Metrics.java | 253 -------------
.../hive/common/metrics/common/Metrics.java | 68 ++++
.../common/metrics/common/MetricsFactory.java | 48 +++
.../metrics/metrics2/CodahaleMetrics.java | 366 +++++++++++++++++++
.../metrics/metrics2/MetricsReporting.java | 27 ++
.../org/apache/hadoop/hive/conf/HiveConf.java | 18 +-
.../hive/common/metrics/TestLegacyMetrics.java | 295 +++++++++++++++
.../hadoop/hive/common/metrics/TestMetrics.java | 286 ---------------
.../metrics/metrics2/TestCodahaleMetrics.java | 138 +++++++
.../hive/metastore/TestMetaStoreMetrics.java | 94 +++++
.../hadoop/hive/metastore/HiveMetaStore.java | 132 ++++---
pom.xml | 3 +
.../apache/hive/service/server/HiveServer2.java | 25 +-
.../hadoop/hive/shims/Hadoop20SShims.java | 5 -
.../apache/hadoop/hive/shims/Hadoop23Shims.java | 13 -
.../apache/hadoop/hive/shims/HadoopShims.java | 2 -
19 files changed, 1665 insertions(+), 615 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/bd84e87c/common/pom.xml
----------------------------------------------------------------------
diff --git a/common/pom.xml b/common/pom.xml
index a615c1e..8d4b1ea 100644
--- a/common/pom.xml
+++ b/common/pom.xml
@@ -98,6 +98,26 @@
<artifactId>json</artifactId>
<version>${json.version}</version>
</dependency>
+ <dependency>
+ <groupId>io.dropwizard.metrics</groupId>
+ <artifactId>metrics-core</artifactId>
+ <version>${dropwizard.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>io.dropwizard.metrics</groupId>
+ <artifactId>metrics-jvm</artifactId>
+ <version>${dropwizard.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>io.dropwizard.metrics</groupId>
+ <artifactId>metrics-json</artifactId>
+ <version>${dropwizard.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-databind</artifactId>
+ <version>${jackson.new.version}</version>
+ </dependency>
</dependencies>
<profiles>
http://git-wip-us.apache.org/repos/asf/hive/blob/bd84e87c/common/src/java/org/apache/hadoop/hive/common/JvmPauseMonitor.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/common/JvmPauseMonitor.java b/common/src/java/org/apache/hadoop/hive/common/JvmPauseMonitor.java
new file mode 100644
index 0000000..c3949f2
--- /dev/null
+++ b/common/src/java/org/apache/hadoop/hive/common/JvmPauseMonitor.java
@@ -0,0 +1,225 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.common;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.common.metrics.common.MetricsFactory;
+import org.apache.hadoop.util.Daemon;
+
+import java.lang.management.GarbageCollectorMXBean;
+import java.lang.management.ManagementFactory;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Based on the JvmPauseMonitor from Hadoop.
+ */
+public class JvmPauseMonitor {
+ private static final Log LOG = LogFactory.getLog(
+ JvmPauseMonitor.class);
+
+ /** The target sleep time */
+ private static final long SLEEP_INTERVAL_MS = 500;
+
+ /** log WARN if we detect a pause longer than this threshold */
+ private final long warnThresholdMs;
+ private static final String WARN_THRESHOLD_KEY =
+ "jvm.pause.warn-threshold.ms";
+ private static final long WARN_THRESHOLD_DEFAULT = 10000;
+
+ /** log INFO if we detect a pause longer than this threshold */
+ private final long infoThresholdMs;
+ private static final String INFO_THRESHOLD_KEY =
+ "jvm.pause.info-threshold.ms";
+ private static final long INFO_THRESHOLD_DEFAULT = 1000;
+
+ private long numGcWarnThresholdExceeded = 0;
+ private long numGcInfoThresholdExceeded = 0;
+ private long totalGcExtraSleepTime = 0;
+
+ private Thread monitorThread;
+ private volatile boolean shouldRun = true;
+
+ public JvmPauseMonitor(Configuration conf) {
+ this.warnThresholdMs = conf.getLong(WARN_THRESHOLD_KEY, WARN_THRESHOLD_DEFAULT);
+ this.infoThresholdMs = conf.getLong(INFO_THRESHOLD_KEY, INFO_THRESHOLD_DEFAULT);
+ }
+
+ public void start() {
+ Preconditions.checkState(monitorThread == null,
+ "JvmPauseMonitor thread is Already started");
+ monitorThread = new Daemon(new Monitor());
+ monitorThread.start();
+ }
+
+ public void stop() {
+ shouldRun = false;
+ if (isStarted()) {
+ monitorThread.interrupt();
+ try {
+ monitorThread.join();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
+
+ public boolean isStarted() {
+ return monitorThread != null;
+ }
+
+ public long getNumGcWarnThreadholdExceeded() {
+ return numGcWarnThresholdExceeded;
+ }
+
+ public long getNumGcInfoThresholdExceeded() {
+ return numGcInfoThresholdExceeded;
+ }
+
+ public long getTotalGcExtraSleepTime() {
+ return totalGcExtraSleepTime;
+ }
+
+ private String formatMessage(long extraSleepTime,
+ Map<String, GcTimes> gcTimesAfterSleep,
+ Map<String, GcTimes> gcTimesBeforeSleep) {
+
+ Set<String> gcBeanNames = Sets.intersection(
+ gcTimesAfterSleep.keySet(),
+ gcTimesBeforeSleep.keySet());
+ List<String> gcDiffs = Lists.newArrayList();
+ for (String name : gcBeanNames) {
+ GcTimes diff = gcTimesAfterSleep.get(name).subtract(
+ gcTimesBeforeSleep.get(name));
+ if (diff.gcCount != 0) {
+ gcDiffs.add("GC pool '" + name + "' had collection(s): " +
+ diff.toString());
+ }
+ }
+
+ String ret = "Detected pause in JVM or host machine (eg GC): " +
+ "pause of approximately " + extraSleepTime + "ms\n";
+ if (gcDiffs.isEmpty()) {
+ ret += "No GCs detected";
+ } else {
+ ret += Joiner.on("\n").join(gcDiffs);
+ }
+ return ret;
+ }
+
+ private Map<String, GcTimes> getGcTimes() {
+ Map<String, GcTimes> map = Maps.newHashMap();
+ List<GarbageCollectorMXBean> gcBeans =
+ ManagementFactory.getGarbageCollectorMXBeans();
+ for (GarbageCollectorMXBean gcBean : gcBeans) {
+ map.put(gcBean.getName(), new GcTimes(gcBean));
+ }
+ return map;
+ }
+
+ private static class GcTimes {
+ private GcTimes(GarbageCollectorMXBean gcBean) {
+ gcCount = gcBean.getCollectionCount();
+ gcTimeMillis = gcBean.getCollectionTime();
+ }
+
+ private GcTimes(long count, long time) {
+ this.gcCount = count;
+ this.gcTimeMillis = time;
+ }
+
+ private GcTimes subtract(GcTimes other) {
+ return new GcTimes(this.gcCount - other.gcCount,
+ this.gcTimeMillis - other.gcTimeMillis);
+ }
+
+ @Override
+ public String toString() {
+ return "count=" + gcCount + " time=" + gcTimeMillis + "ms";
+ }
+
+ private long gcCount;
+ private long gcTimeMillis;
+ }
+
+ private class Monitor implements Runnable {
+ @Override
+ public void run() {
+ Stopwatch sw = new Stopwatch();
+ Map<String, GcTimes> gcTimesBeforeSleep = getGcTimes();
+ while (shouldRun) {
+ sw.reset().start();
+ try {
+ Thread.sleep(SLEEP_INTERVAL_MS);
+ } catch (InterruptedException ie) {
+ return;
+ }
+ long extraSleepTime = sw.elapsedMillis() - SLEEP_INTERVAL_MS;
+ Map<String, GcTimes> gcTimesAfterSleep = getGcTimes();
+
+ if (extraSleepTime > warnThresholdMs) {
+ ++numGcWarnThresholdExceeded;
+ LOG.warn(formatMessage(
+ extraSleepTime, gcTimesAfterSleep, gcTimesBeforeSleep));
+ incrementMetricsCounter("jvm.pause.warn-threshold", 1);
+ } else if (extraSleepTime > infoThresholdMs) {
+ ++numGcInfoThresholdExceeded;
+ LOG.info(formatMessage(
+ extraSleepTime, gcTimesAfterSleep, gcTimesBeforeSleep));
+ incrementMetricsCounter("jvm.pause.info-threshold", 1);
+ }
+ incrementMetricsCounter("jvm.pause.extraSleepTime", extraSleepTime);
+ totalGcExtraSleepTime += extraSleepTime;
+ gcTimesBeforeSleep = gcTimesAfterSleep;
+ }
+ }
+
+ private void incrementMetricsCounter(String name, long count) {
+ try {
+ MetricsFactory.getMetricsInstance().incrementCounter(name, count);
+ } catch (Exception e) {
+ LOG.warn("Error Reporting JvmPauseMonitor to Metrics system", e);
+ }
+ }
+ }
+
+ /**
+ * Simple 'main' to facilitate manual testing of the pause monitor.
+ *
+ * This main function just leaks memory into a list. Running this class
+ * with a 1GB heap will very quickly go into "GC hell" and result in
+ * log messages about the GC pauses.
+ */
+ public static void main(String []args) throws Exception {
+ new JvmPauseMonitor(new Configuration()).start();
+ List<String> list = Lists.newArrayList();
+ int i = 0;
+ while (true) {
+ list.add(String.valueOf(i++));
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/bd84e87c/common/src/java/org/apache/hadoop/hive/common/metrics/LegacyMetrics.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/common/metrics/LegacyMetrics.java b/common/src/java/org/apache/hadoop/hive/common/metrics/LegacyMetrics.java
new file mode 100644
index 0000000..14f7afb
--- /dev/null
+++ b/common/src/java/org/apache/hadoop/hive/common/metrics/LegacyMetrics.java
@@ -0,0 +1,262 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.common.metrics;
+
+import org.apache.hadoop.hive.common.metrics.common.Metrics;
+import org.apache.hadoop.hive.conf.HiveConf;
+
+import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.util.HashMap;
+
+import javax.management.MBeanServer;
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectName;
+
+/**
+ * This class may eventually get superseded by org.apache.hadoop.hive.common.metrics2.Metrics.
+ *
+ * Metrics Subsystem - allows exposure of a number of named parameters/counters
+ * via jmx, intended to be used as a static subsystem
+ *
+ * Has a couple of primary ways it can be used:
+ * (i) Using the set and get methods to set and get named parameters
+ * (ii) Using the incrementCounter method to increment and set named
+ * parameters in one go, rather than having to make a get and then a set.
+ * (iii) Using the startScope and endScope methods to start and end
+ * named "scopes" that record the number of times they've been
+ * instantiated and amount of time(in milliseconds) spent inside
+ * the scopes.
+ */
+public class LegacyMetrics implements Metrics {
+
+ private LegacyMetrics() {
+ // Block direct instantiation; MetricsFactory creates this reflectively.
+ // NOTE(review): verify ReflectionUtils.newInstance can reach a private constructor.
+ }
+
+ /**
+ * MetricsScope : A class that encapsulates an idea of a metered scope.
+ * Instantiating a named scope and then closing it exposes two counters:
+ * (i) a "number of calls" counter ( <name>.n ), and
+ * (ii) a "number of msecs spent between scope open and close" counter. ( <name>.t)
+ */
+ public static class MetricsScope {
+
+ // Owning metrics instance; all counter reads/writes are delegated to it.
+ final LegacyMetrics metrics;
+
+ final String name;
+ final String numCounter;
+ final String timeCounter;
+ final String avgTimeCounter;
+
+ private boolean isOpen = false;
+ private Long startTime = null;
+
+ /**
+ * Instantiates a named scope - intended to only be called by Metrics, so locally scoped.
+ * Note the constructor opens the scope immediately.
+ * @param name - name of the variable
+ * @throws IOException
+ */
+ private MetricsScope(String name, LegacyMetrics metrics) throws IOException {
+ this.metrics = metrics;
+ this.name = name;
+ this.numCounter = name + ".n";
+ this.timeCounter = name + ".t";
+ this.avgTimeCounter = name + ".avg_t";
+ open();
+ }
+
+ public Long getNumCounter() throws IOException {
+ return (Long) metrics.get(numCounter);
+ }
+
+ public Long getTimeCounter() throws IOException {
+ return (Long) metrics.get(timeCounter);
+ }
+
+ /**
+ * Opens scope, and makes note of the time started, increments run counter
+ * @throws IOException if the scope is already open
+ *
+ */
+ public void open() throws IOException {
+ if (!isOpen) {
+ isOpen = true;
+ startTime = System.currentTimeMillis();
+ } else {
+ throw new IOException("Scope named " + name + " is not closed, cannot be opened.");
+ }
+ }
+
+ /**
+ * Closes scope, and records the time taken
+ * @throws IOException if the scope is not open
+ */
+ public void close() throws IOException {
+ if (isOpen) {
+ Long endTime = System.currentTimeMillis();
+ synchronized(metrics) {
+ // Update call count and cumulative time atomically, then derive the average.
+ Long num = metrics.incrementCounter(numCounter);
+ Long time = metrics.incrementCounter(timeCounter, endTime - startTime);
+ if (num != null && time != null) {
+ metrics.set(avgTimeCounter, Double.valueOf(time.doubleValue() / num.doubleValue()));
+ }
+ }
+ } else {
+ throw new IOException("Scope named " + name + " is not open, cannot be closed.");
+ }
+ isOpen = false;
+ }
+
+
+ /**
+ * Closes scope if open, and reopens it
+ * @throws IOException
+ */
+ public void reopen() throws IOException {
+ if(isOpen) {
+ close();
+ }
+ open();
+ }
+
+ }
+
+ // Shared JMX-backed store for all counters/parameters; static, so one per process.
+ private static final MetricsMBean metrics = new MetricsMBeanImpl();
+
+ private static final ObjectName oname;
+ static {
+ try {
+ oname = new ObjectName(
+ "org.apache.hadoop.hive.common.metrics:type=MetricsMBean");
+ } catch (MalformedObjectNameException mone) {
+ // An invalid hard-coded name is a programming error; fail class loading.
+ throw new RuntimeException(mone);
+ }
+ }
+
+
+ // Per-thread scope registry so concurrent threads can time the same scope name independently.
+ private static final ThreadLocal<HashMap<String, MetricsScope>> threadLocalScopes
+ = new ThreadLocal<HashMap<String,MetricsScope>>() {
+ @Override
+ protected HashMap<String,MetricsScope> initialValue() {
+ return new HashMap<String,MetricsScope>();
+ }
+ };
+
+ // NOTE(review): not volatile, and init() below is unsynchronized while deInit() is
+ // synchronized; concurrent init() calls could double-register the MBean -- confirm
+ // initialization is single-threaded (the old static Metrics.init() held a lock).
+ private boolean initialized = false;
+
+ public void init(HiveConf conf) throws Exception {
+ if (!initialized) {
+ MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+ mbs.registerMBean(metrics, oname);
+ initialized = true;
+ }
+ }
+
+ public boolean isInitialized() {
+ return initialized;
+ }
+
+ /**
+ * Increments the named counter by 1.
+ * @return the new counter value, or null if the metrics system is not initialized.
+ */
+ public Long incrementCounter(String name) throws IOException{
+ if (!initialized) {
+ return null;
+ }
+ return incrementCounter(name,Long.valueOf(1));
+ }
+
+ /**
+ * Increments the named counter by the given amount, creating it on first use.
+ * @return the new counter value, or null if the metrics system is not initialized.
+ */
+ public Long incrementCounter(String name, long increment) throws IOException{
+ if (!initialized) {
+ return null;
+ }
+ Long value;
+ synchronized(metrics) {
+ if (!metrics.hasKey(name)) {
+ value = Long.valueOf(increment);
+ set(name, value);
+ } else {
+ value = ((Long)get(name)) + increment;
+ set(name, value);
+ }
+ }
+ return value;
+ }
+
+ // Sets a named parameter; silently ignored when the system is uninitialized.
+ public void set(String name, Object value) throws IOException{
+ if (!initialized) {
+ return;
+ }
+ metrics.put(name,value);
+ }
+
+ public Object get(String name) throws IOException{
+ if (!initialized) {
+ return null;
+ }
+ return metrics.get(name);
+ }
+
+ // Opens (or re-opens) the calling thread's scope of the given name; no-op when uninitialized.
+ public void startScope(String name) throws IOException{
+ if (!initialized) {
+ return;
+ }
+ if (threadLocalScopes.get().containsKey(name)) {
+ threadLocalScopes.get().get(name).open();
+ } else {
+ threadLocalScopes.get().put(name, new MetricsScope(name, this));
+ }
+ }
+
+ public MetricsScope getScope(String name) throws IOException {
+ if (!initialized) {
+ return null;
+ }
+ if (threadLocalScopes.get().containsKey(name)) {
+ return threadLocalScopes.get().get(name);
+ } else {
+ throw new IOException("No metrics scope named " + name);
+ }
+ }
+
+ // Closes the calling thread's scope if one exists; unknown names are silently ignored.
+ public void endScope(String name) throws IOException{
+ if (!initialized) {
+ return;
+ }
+ if (threadLocalScopes.get().containsKey(name)) {
+ threadLocalScopes.get().get(name).close();
+ }
+ }
+
+ /**
+ * Resets the static context state to initial.
+ * Used primarily for testing purposes.
+ *
+ * Note that threadLocalScopes ThreadLocal is *not* cleared in this call.
+ */
+ public void deInit() throws Exception {
+ synchronized (metrics) {
+ if (initialized) {
+ MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+ if (mbs.isRegistered(oname)) {
+ mbs.unregisterMBean(oname);
+ }
+ metrics.clear();
+ initialized = false;
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/bd84e87c/common/src/java/org/apache/hadoop/hive/common/metrics/Metrics.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/common/metrics/Metrics.java b/common/src/java/org/apache/hadoop/hive/common/metrics/Metrics.java
deleted file mode 100644
index 01c9d1d..0000000
--- a/common/src/java/org/apache/hadoop/hive/common/metrics/Metrics.java
+++ /dev/null
@@ -1,253 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hive.common.metrics;
-
-import java.io.IOException;
-import java.lang.management.ManagementFactory;
-import java.util.HashMap;
-
-import javax.management.MBeanServer;
-import javax.management.MalformedObjectNameException;
-import javax.management.ObjectName;
-
-/**
- * Metrics Subsystem - allows exposure of a number of named parameters/counters
- * via jmx, intended to be used as a static subsystem
- *
- * Has a couple of primary ways it can be used:
- * (i) Using the set and get methods to set and get named parameters
- * (ii) Using the incrementCounter method to increment and set named
- * parameters in one go, rather than having to make a get and then a set.
- * (iii) Using the startScope and endScope methods to start and end
- * named "scopes" that record the number of times they've been
- * instantiated and amount of time(in milliseconds) spent inside
- * the scopes.
- */
-public class Metrics {
-
- private Metrics() {
- // block
- }
-
- /**
- * MetricsScope : A class that encapsulates an idea of a metered scope.
- * Instantiating a named scope and then closing it exposes two counters:
- * (i) a "number of calls" counter ( <name>.n ), and
- * (ii) a "number of msecs spent between scope open and close" counter. ( <name>.t)
- */
- public static class MetricsScope {
-
- final String name;
- final String numCounter;
- final String timeCounter;
- final String avgTimeCounter;
-
- private boolean isOpen = false;
- private Long startTime = null;
-
- /**
- * Instantiates a named scope - intended to only be called by Metrics, so locally scoped.
- * @param name - name of the variable
- * @throws IOException
- */
- private MetricsScope(String name) throws IOException {
- this.name = name;
- this.numCounter = name + ".n";
- this.timeCounter = name + ".t";
- this.avgTimeCounter = name + ".avg_t";
- open();
- }
-
- public Long getNumCounter() throws IOException {
- return (Long)Metrics.get(numCounter);
- }
-
- public Long getTimeCounter() throws IOException {
- return (Long)Metrics.get(timeCounter);
- }
-
- /**
- * Opens scope, and makes note of the time started, increments run counter
- * @throws IOException
- *
- */
- public void open() throws IOException {
- if (!isOpen) {
- isOpen = true;
- startTime = System.currentTimeMillis();
- } else {
- throw new IOException("Scope named " + name + " is not closed, cannot be opened.");
- }
- }
-
- /**
- * Closes scope, and records the time taken
- * @throws IOException
- */
- public void close() throws IOException {
- if (isOpen) {
- Long endTime = System.currentTimeMillis();
- synchronized(metrics) {
- Long num = Metrics.incrementCounter(numCounter);
- Long time = Metrics.incrementCounter(timeCounter, endTime - startTime);
- if (num != null && time != null) {
- Metrics.set(avgTimeCounter, Double.valueOf(time.doubleValue() / num.doubleValue()));
- }
- }
- } else {
- throw new IOException("Scope named " + name + " is not open, cannot be closed.");
- }
- isOpen = false;
- }
-
-
- /**
- * Closes scope if open, and reopens it
- * @throws IOException
- */
- public void reopen() throws IOException {
- if(isOpen) {
- close();
- }
- open();
- }
-
- }
-
- private static final MetricsMBean metrics = new MetricsMBeanImpl();
-
- private static final ObjectName oname;
- static {
- try {
- oname = new ObjectName(
- "org.apache.hadoop.hive.common.metrics:type=MetricsMBean");
- } catch (MalformedObjectNameException mone) {
- throw new RuntimeException(mone);
- }
- }
-
-
- private static final ThreadLocal<HashMap<String, MetricsScope>> threadLocalScopes
- = new ThreadLocal<HashMap<String,MetricsScope>>() {
- @Override
- protected HashMap<String,MetricsScope> initialValue() {
- return new HashMap<String,MetricsScope>();
- }
- };
-
- private static boolean initialized = false;
-
- public static void init() throws Exception {
- synchronized (metrics) {
- if (!initialized) {
- MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
- mbs.registerMBean(metrics, oname);
- initialized = true;
- }
- }
- }
-
- public static Long incrementCounter(String name) throws IOException{
- if (!initialized) {
- return null;
- }
- return incrementCounter(name,Long.valueOf(1));
- }
-
- public static Long incrementCounter(String name, long increment) throws IOException{
- if (!initialized) {
- return null;
- }
- Long value;
- synchronized(metrics) {
- if (!metrics.hasKey(name)) {
- value = Long.valueOf(increment);
- set(name, value);
- } else {
- value = ((Long)get(name)) + increment;
- set(name, value);
- }
- }
- return value;
- }
-
- public static void set(String name, Object value) throws IOException{
- if (!initialized) {
- return;
- }
- metrics.put(name,value);
- }
-
- public static Object get(String name) throws IOException{
- if (!initialized) {
- return null;
- }
- return metrics.get(name);
- }
-
- public static MetricsScope startScope(String name) throws IOException{
- if (!initialized) {
- return null;
- }
- if (threadLocalScopes.get().containsKey(name)) {
- threadLocalScopes.get().get(name).open();
- } else {
- threadLocalScopes.get().put(name, new MetricsScope(name));
- }
- return threadLocalScopes.get().get(name);
- }
-
- public static MetricsScope getScope(String name) throws IOException {
- if (!initialized) {
- return null;
- }
- if (threadLocalScopes.get().containsKey(name)) {
- return threadLocalScopes.get().get(name);
- } else {
- throw new IOException("No metrics scope named " + name);
- }
- }
-
- public static void endScope(String name) throws IOException{
- if (!initialized) {
- return;
- }
- if (threadLocalScopes.get().containsKey(name)) {
- threadLocalScopes.get().get(name).close();
- }
- }
-
- /**
- * Resets the static context state to initial.
- * Used primarily for testing purposes.
- *
- * Note that threadLocalScopes ThreadLocal is *not* cleared in this call.
- */
- static void uninit() throws Exception {
- synchronized (metrics) {
- if (initialized) {
- MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
- if (mbs.isRegistered(oname)) {
- mbs.unregisterMBean(oname);
- }
- metrics.clear();
- initialized = false;
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/hive/blob/bd84e87c/common/src/java/org/apache/hadoop/hive/common/metrics/common/Metrics.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/common/metrics/common/Metrics.java b/common/src/java/org/apache/hadoop/hive/common/metrics/common/Metrics.java
new file mode 100644
index 0000000..13a5336
--- /dev/null
+++ b/common/src/java/org/apache/hadoop/hive/common/metrics/common/Metrics.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.common.metrics.common;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+
+import java.io.IOException;
+
+/**
+ * Generic Metrics interface, decoupling metrics producers from the concrete
+ * implementation (e.g. LegacyMetrics or CodahaleMetrics) selected via
+ * MetricsFactory.
+ */
+public interface Metrics {
+
+ /**
+ * Initialize Metrics system with given Hive configuration.
+ * @param conf Hive configuration to read metrics settings from
+ */
+ public void init(HiveConf conf) throws Exception;
+
+ /**
+ * Deinitializes the Metrics system.
+ */
+ public void deInit() throws Exception;
+
+ /**
+ * Opens a named timing scope for the calling thread.
+ * @param name scope name
+ * @throws IOException
+ */
+ public void startScope(String name) throws IOException;
+
+ // Closes the named scope previously opened by startScope on this thread.
+ public void endScope(String name) throws IOException;
+
+ //Counter-related methods
+
+ /**
+ * Increments a counter of the given name by 1.
+ * @param name counter name
+ * @return the updated counter value (may be null if the implementation is uninitialized)
+ * @throws IOException
+ */
+ public Long incrementCounter(String name) throws IOException;
+
+ /**
+ * Increments a counter of the given name by "increment"
+ * @param name counter name
+ * @param increment amount to add
+ * @return the updated counter value (may be null if the implementation is uninitialized)
+ * @throws IOException
+ */
+ public Long incrementCounter(String name, long increment) throws IOException;
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/bd84e87c/common/src/java/org/apache/hadoop/hive/common/metrics/common/MetricsFactory.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/common/metrics/common/MetricsFactory.java b/common/src/java/org/apache/hadoop/hive/common/metrics/common/MetricsFactory.java
new file mode 100644
index 0000000..12a309d
--- /dev/null
+++ b/common/src/java/org/apache/hadoop/hive/common/metrics/common/MetricsFactory.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.common.metrics.common;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.util.ReflectionUtils;
+
+/**
+ * Class that manages a static Metric instance for this process.
+ * All access is serialized on the MetricsFactory class monitor via
+ * "synchronized static" methods.
+ */
+public class MetricsFactory {
+
+ private static Metrics metrics;
+ // NOTE(review): initLock is declared but never used -- locking actually happens on
+ // MetricsFactory.class via the synchronized static methods below; consider removing
+ // the field or locking on it consistently.
+ private static Object initLock = new Object();
+
+ /**
+ * Lazily creates the process-wide Metrics instance from the configured
+ * HIVE_METRICS_CLASS, then (re-)runs its init with the given conf.
+ */
+ public synchronized static void init(HiveConf conf) throws Exception {
+ if (metrics == null) {
+ metrics = (Metrics) ReflectionUtils.newInstance(conf.getClassByName(
+ conf.getVar(HiveConf.ConfVars.HIVE_METRICS_CLASS)), conf);
+ }
+ metrics.init(conf);
+ }
+
+ // Returns the shared instance; null until init() has been called.
+ public synchronized static Metrics getMetricsInstance() {
+ return metrics;
+ }
+
+ public synchronized static void deInit() throws Exception {
+ if (metrics != null) {
+ metrics.deInit();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/bd84e87c/common/src/java/org/apache/hadoop/hive/common/metrics/metrics2/CodahaleMetrics.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/common/metrics/metrics2/CodahaleMetrics.java b/common/src/java/org/apache/hadoop/hive/common/metrics/metrics2/CodahaleMetrics.java
new file mode 100644
index 0000000..e59da99
--- /dev/null
+++ b/common/src/java/org/apache/hadoop/hive/common/metrics/metrics2/CodahaleMetrics.java
@@ -0,0 +1,366 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.common.metrics.metrics2;
+
+import com.codahale.metrics.ConsoleReporter;
+import com.codahale.metrics.Counter;
+import com.codahale.metrics.ExponentiallyDecayingReservoir;
+import com.codahale.metrics.JmxReporter;
+import com.codahale.metrics.Metric;
+import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.MetricSet;
+import com.codahale.metrics.Timer;
+import com.codahale.metrics.json.MetricsModule;
+import com.codahale.metrics.jvm.BufferPoolMetricSet;
+import com.codahale.metrics.jvm.ClassLoadingGaugeSet;
+import com.codahale.metrics.jvm.GarbageCollectorMetricSet;
+import com.codahale.metrics.jvm.MemoryUsageGaugeSet;
+import com.codahale.metrics.jvm.ThreadStatesGaugeSet;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Splitter;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import com.google.common.collect.Lists;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hive.conf.HiveConf;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.lang.management.ManagementFactory;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TimerTask;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * Codahale-backed Metrics implementation. Counters and timers are created
+ * lazily in Guava LoadingCaches and registered in a single MetricRegistry;
+ * reporting is done via console, JMX and/or a JSON file depending on
+ * HIVE_METRICS_REPORTER.
+ */
+public class CodahaleMetrics implements org.apache.hadoop.hive.common.metrics.common.Metrics {
+ // Prefix applied to every scope name so API timings group together in the registry.
+ public static final String API_PREFIX = "api_";
+ public static final Log LOGGER = LogFactory.getLog(CodahaleMetrics.class);
+
+ public final MetricRegistry metricRegistry = new MetricRegistry();
+ // Guard lazy creation in the timers/counters caches (see getTimer/incrementCounter).
+ private final Lock timersLock = new ReentrantLock();
+ private final Lock countersLock = new ReentrantLock();
+
+ private LoadingCache<String, Timer> timers;
+ private LoadingCache<String, Counter> counters;
+
+ private boolean initialized = false;
+ private HiveConf conf;
+ private final Set<Closeable> reporters = new HashSet<Closeable>();
+
+ // Per-thread scope registry so concurrent threads can time the same scope name independently.
+ private final ThreadLocal<HashMap<String, MetricsScope>> threadLocalScopes
+ = new ThreadLocal<HashMap<String,MetricsScope>>() {
+ @Override
+ protected HashMap<String,MetricsScope> initialValue() {
+ return new HashMap<String,MetricsScope>();
+ }
+ };
+
+ /**
+ * A metered scope backed by a Codahale Timer: open() starts a timer context,
+ * close() records the elapsed duration into the timer.
+ */
+ public static class MetricsScope {
+
+ final String name;
+ final Timer timer;
+ Timer.Context timerContext;
+ CodahaleMetrics metrics;
+
+ private boolean isOpen = false;
+
+ /**
+ * Instantiates a named scope - intended to only be called by Metrics, so locally scoped.
+ * Note the constructor opens the scope immediately.
+ * @param name - name of the variable
+ * @throws IOException
+ */
+ private MetricsScope(String name, CodahaleMetrics metrics) throws IOException {
+ this.name = name;
+ this.metrics = metrics;
+ this.timer = metrics.getTimer(name);
+ open();
+ }
+
+ /**
+ * Opens scope, and makes note of the time started, increments run counter
+ * @throws IOException if the scope is already open
+ *
+ */
+ public void open() throws IOException {
+ if (!isOpen) {
+ isOpen = true;
+ this.timerContext = timer.time();
+ } else {
+ throw new IOException("Scope named " + name + " is not closed, cannot be opened.");
+ }
+ }
+
+ /**
+ * Closes scope, and records the time taken
+ * @throws IOException if the scope is not open
+ */
+ public void close() throws IOException {
+ if (isOpen) {
+ // Stops the Codahale timer context, recording the elapsed duration.
+ timerContext.close();
+
+ } else {
+ throw new IOException("Scope named " + name + " is not open, cannot be closed.");
+ }
+ isOpen = false;
+ }
+ }
+
+ public synchronized void init(HiveConf conf) throws Exception {
+ if (initialized) {
+ return;
+ }
+
+ this.conf = conf;
+ //Codahale artifacts are lazily-created.
+ timers = CacheBuilder.newBuilder().build(
+ new CacheLoader<String, com.codahale.metrics.Timer>() {
+ @Override
+ public com.codahale.metrics.Timer load(String key) throws Exception {
+ Timer timer = new Timer(new ExponentiallyDecayingReservoir());
+ metricRegistry.register(key, timer);
+ return timer;
+ }
+ }
+ );
+ counters = CacheBuilder.newBuilder().build(
+ new CacheLoader<String, Counter>() {
+ @Override
+ public Counter load(String key) throws Exception {
+ Counter counter = new Counter();
+ metricRegistry.register(key, counter);
+ return counter;
+ }
+ }
+ );
+
+ //register JVM metrics
+ registerAll("gc", new GarbageCollectorMetricSet());
+ registerAll("buffers", new BufferPoolMetricSet(ManagementFactory.getPlatformMBeanServer()));
+ registerAll("memory", new MemoryUsageGaugeSet());
+ registerAll("threads", new ThreadStatesGaugeSet());
+ registerAll("classLoading", new ClassLoadingGaugeSet());
+
+ //Metrics reporter: parse the comma-separated HIVE_METRICS_REPORTER value,
+ //skipping (with a warning) any name that is not a MetricsReporting constant.
+ Set<MetricsReporting> finalReporterList = new HashSet<MetricsReporting>();
+ List<String> metricsReporterNames = Lists.newArrayList(
+ Splitter.on(",").trimResults().omitEmptyStrings().split(conf.getVar(HiveConf.ConfVars.HIVE_METRICS_REPORTER)));
+
+ if(metricsReporterNames != null) {
+ for (String metricsReportingName : metricsReporterNames) {
+ try {
+ MetricsReporting reporter = MetricsReporting.valueOf(metricsReportingName.trim().toUpperCase());
+ finalReporterList.add(reporter);
+ } catch (IllegalArgumentException e) {
+ LOGGER.warn("Metrics reporter skipped due to invalid configured reporter: " + metricsReportingName);
+ }
+ }
+ }
+ initReporting(finalReporterList);
+ initialized = true;
+ }
+
+
+ // Shuts down reporters, clears the registry and invalidates the lazy caches.
+ public synchronized void deInit() throws Exception {
+ if (initialized) {
+ if (reporters != null) {
+ for (Closeable reporter : reporters) {
+ reporter.close();
+ }
+ }
+ for (Map.Entry<String, Metric> metric : metricRegistry.getMetrics().entrySet()) {
+ metricRegistry.remove(metric.getKey());
+ }
+ timers.invalidateAll();
+ counters.invalidateAll();
+ initialized = false;
+ }
+ }
+
+ public void startScope(String name) throws IOException {
+ // Only the initialized check is synchronized; scope manipulation itself is
+ // thread-confined via threadLocalScopes.
+ synchronized (this) {
+ if (!initialized) {
+ return;
+ }
+ }
+ name = API_PREFIX + name;
+ if (threadLocalScopes.get().containsKey(name)) {
+ threadLocalScopes.get().get(name).open();
+ } else {
+ threadLocalScopes.get().put(name, new MetricsScope(name, this));
+ }
+ }
+
+ public void endScope(String name) throws IOException{
+ synchronized (this) {
+ if (!initialized) {
+ return;
+ }
+ }
+ name = API_PREFIX + name;
+ if (threadLocalScopes.get().containsKey(name)) {
+ threadLocalScopes.get().get(name).close();
+ }
+ }
+
+ public Long incrementCounter(String name) throws IOException {
+ return incrementCounter(name, 1);
+ }
+
+ /**
+ * Increments the named counter by "increment", creating it on first use.
+ * NOTE(review): unlike startScope/endScope this does not check "initialized",
+ * so it works (and lazily creates counters) even before init -- confirm intended.
+ * @return the new counter value
+ */
+ public Long incrementCounter(String name, long increment) throws IOException {
+ String key = name;
+ try {
+ countersLock.lock();
+ counters.get(key).inc(increment);
+ return counters.get(key).getCount();
+ } catch(ExecutionException ee) {
+ throw new RuntimeException(ee);
+ } finally {
+ countersLock.unlock();
+ }
+ }
+
+ // This method is necessary to synchronize lazy-creation to the timers.
+ private Timer getTimer(String name) throws IOException {
+ String key = name;
+ try {
+ timersLock.lock();
+ Timer timer = timers.get(key);
+ return timer;
+ } catch (ExecutionException e) {
+ throw new IOException(e);
+ } finally {
+ timersLock.unlock();
+ }
+ }
+
+ // Recursively registers a MetricSet under dotted-prefix names.
+ private void registerAll(String prefix, MetricSet metricSet) {
+ for (Map.Entry<String, Metric> entry : metricSet.getMetrics().entrySet()) {
+ if (entry.getValue() instanceof MetricSet) {
+ registerAll(prefix + "." + entry.getKey(), (MetricSet) entry.getValue());
+ } else {
+ metricRegistry.register(prefix + "." + entry.getKey(), entry.getValue());
+ }
+ }
+ }
+
+ @VisibleForTesting
+ public MetricRegistry getMetricRegistry() {
+ return metricRegistry;
+ }
+
+ /**
+ * Should be only called once to initialize the reporters
+ */
+ private void initReporting(Set<MetricsReporting> reportingSet) throws Exception {
+ for (MetricsReporting reporting : reportingSet) {
+ switch(reporting) {
+ case CONSOLE:
+ final ConsoleReporter consoleReporter = ConsoleReporter.forRegistry(metricRegistry)
+ .convertRatesTo(TimeUnit.SECONDS)
+ .convertDurationsTo(TimeUnit.MILLISECONDS)
+ .build();
+ consoleReporter.start(1, TimeUnit.SECONDS);
+ reporters.add(consoleReporter);
+ break;
+ case JMX:
+ final JmxReporter jmxReporter = JmxReporter.forRegistry(metricRegistry)
+ .convertRatesTo(TimeUnit.SECONDS)
+ .convertDurationsTo(TimeUnit.MILLISECONDS)
+ .build();
+ jmxReporter.start();
+ reporters.add(jmxReporter);
+ break;
+ case JSON_FILE:
+ final JsonFileReporter jsonFileReporter = new JsonFileReporter();
+ jsonFileReporter.start();
+ reporters.add(jsonFileReporter);
+ break;
+ }
+ }
+ }
+
+ /**
+ * Periodically serializes the registry as pretty-printed JSON to
+ * HIVE_METRICS_JSON_FILE_LOCATION, writing to a ".tmp" file first and then
+ * renaming so readers never observe a partially-written file.
+ */
+ class JsonFileReporter implements Closeable {
+ private ObjectMapper jsonMapper = null;
+ // Daemon timer so the reporter does not keep the JVM alive.
+ private java.util.Timer timer = null;
+
+ public void start() {
+ this.jsonMapper = new ObjectMapper().registerModule(new MetricsModule(TimeUnit.MILLISECONDS, TimeUnit.MILLISECONDS, false));
+ this.timer = new java.util.Timer(true);
+
+ long time = conf.getTimeVar(HiveConf.ConfVars.HIVE_METRICS_JSON_FILE_INTERVAL, TimeUnit.MILLISECONDS);
+ final String pathString = conf.getVar(HiveConf.ConfVars.HIVE_METRICS_JSON_FILE_LOCATION);
+
+ timer.schedule(new TimerTask() {
+ @Override
+ public void run() {
+ BufferedWriter bw = null;
+ try {
+ String json = jsonMapper.writerWithDefaultPrettyPrinter().writeValueAsString(metricRegistry);
+ Path tmpPath = new Path(pathString + ".tmp");
+ FileSystem fs = FileSystem.get(conf);
+ fs.delete(tmpPath, true);
+ bw = new BufferedWriter(new OutputStreamWriter(fs.create(tmpPath, true)));
+ bw.write(json);
+ bw.close();
+
+ Path path = new Path(pathString);
+ fs.rename(tmpPath, path);
+ fs.setPermission(path, FsPermission.createImmutable((short) 0644));
+ } catch (Exception e) {
+ LOGGER.warn("Error writing JSON Metrics to file", e);
+ } finally {
+ try {
+ // bw may already be closed above; the second close is a harmless no-op.
+ if (bw != null) {
+ bw.close();
+ }
+ } catch (IOException e) {
+ //Ignore.
+ }
+ }
+
+
+ }
+ }, 0, time);
+ }
+
+ public void close() {
+ if (timer != null) {
+ this.timer.cancel();
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/bd84e87c/common/src/java/org/apache/hadoop/hive/common/metrics/metrics2/MetricsReporting.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/common/metrics/metrics2/MetricsReporting.java b/common/src/java/org/apache/hadoop/hive/common/metrics/metrics2/MetricsReporting.java
new file mode 100644
index 0000000..643246f
--- /dev/null
+++ b/common/src/java/org/apache/hadoop/hive/common/metrics/metrics2/MetricsReporting.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.common.metrics.metrics2;
+
+/**
+ * Reporting options for org.apache.hadoop.hive.common.metrics.metrics2.Metrics.
+ */
+public enum MetricsReporting {
+ // Expose metrics as JMX MBeans.
+ JMX,
+ // Periodically print metrics to the console.
+ CONSOLE,
+ // Periodically dump metrics as JSON to the file configured by
+ // HIVE_METRICS_JSON_FILE_LOCATION / HIVE_METRICS_JSON_FILE_INTERVAL.
+ JSON_FILE
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/bd84e87c/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 4fe2ae8..f40d159 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -645,6 +645,7 @@ public class HiveConf extends Configuration {
"Maximum cache full % after which the cache cleaner thread kicks in."),
METASTORE_AGGREGATE_STATS_CACHE_CLEAN_UNTIL("hive.metastore.aggregate.stats.cache.clean.until", (float) 0.8,
"The cleaner thread cleans until cache reaches this % full size."),
+ METASTORE_METRICS("hive.metastore.metrics.enabled", false, "Enable metrics on the metastore."),
// Parameters for exporting metadata on table drop (requires the use of the)
// org.apache.hadoop.hive.ql.parse.MetaDataExportListener preevent listener
@@ -1688,6 +1689,7 @@ public class HiveConf extends Configuration {
" EXECUTION: Log completion of tasks\n" +
" PERFORMANCE: Execution + Performance logs \n" +
" VERBOSE: All logs" ),
+ HIVE_SERVER2_METRICS_ENABLED("hive.server2.metrics.enabled", false, "Enable metrics on the HiveServer2."),
// logging configuration
HIVE_LOG4J_FILE("hive.log4j.file", "",
"Hive log4j configuration file.\n" +
@@ -1715,7 +1717,21 @@ public class HiveConf extends Configuration {
HIVE_AUTOGEN_COLUMNALIAS_PREFIX_INCLUDEFUNCNAME(
"hive.autogen.columnalias.prefix.includefuncname", false,
"Whether to include function name in the column alias auto generated by Hive."),
-
+ HIVE_METRICS_CLASS("hive.service.metrics.class",
+ "org.apache.hadoop.hive.common.metrics.metrics2.CodahaleMetrics",
+ new StringSet(
+ "org.apache.hadoop.hive.common.metrics.metrics2.CodahaleMetrics",
+ "org.apache.hadoop.hive.common.metrics.LegacyMetrics"),
+ "Hive metrics subsystem implementation class."),
+ HIVE_METRICS_REPORTER("hive.service.metrics.reporter", "JSON_FILE, JMX",
+ "Reporter type for metric class org.apache.hadoop.hive.common.metrics.metrics2.CodahaleMetrics, comma separated list of JMX, CONSOLE, JSON_FILE"),
+ HIVE_METRICS_JSON_FILE_LOCATION("hive.service.metrics.file.location", "file:///tmp/my-logging.properties",
+ "For metric class org.apache.hadoop.hive.common.metrics.metrics2.CodahaleMetrics JSON_FILE reporter, the location of JSON metrics file. " +
+ "This file will get overwritten at every interval."),
+ HIVE_METRICS_JSON_FILE_INTERVAL("hive.service.metrics.file.frequency", "5s",
+ new TimeValidator(TimeUnit.MILLISECONDS),
+ "For metric class org.apache.hadoop.hive.common.metrics.metrics2.CodahaleMetrics JSON_FILE reporter, " +
+ "the frequency of updating JSON metrics file."),
HIVE_PERF_LOGGER("hive.exec.perf.logger", "org.apache.hadoop.hive.ql.log.PerfLogger",
"The class responsible for logging client side performance metrics. \n" +
"Must be a subclass of org.apache.hadoop.hive.ql.log.PerfLogger"),
http://git-wip-us.apache.org/repos/asf/hive/blob/bd84e87c/common/src/test/org/apache/hadoop/hive/common/metrics/TestLegacyMetrics.java
----------------------------------------------------------------------
diff --git a/common/src/test/org/apache/hadoop/hive/common/metrics/TestLegacyMetrics.java b/common/src/test/org/apache/hadoop/hive/common/metrics/TestLegacyMetrics.java
new file mode 100644
index 0000000..c14c7ee
--- /dev/null
+++ b/common/src/test/org/apache/hadoop/hive/common/metrics/TestLegacyMetrics.java
@@ -0,0 +1,295 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.common.metrics;
+
+import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import javax.management.Attribute;
+import javax.management.MBeanAttributeInfo;
+import javax.management.MBeanInfo;
+import javax.management.MBeanOperationInfo;
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+
+import org.apache.hadoop.hive.common.metrics.common.MetricsFactory;
+import org.apache.hadoop.hive.common.metrics.LegacyMetrics.MetricsScope;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+/**
+ * Unit tests for the LegacyMetrics implementation obtained through
+ * MetricsFactory: MBean registration/reset behavior, and the open/close
+ * state machine of named metrics scopes (single-threaded and concurrent).
+ */
+public class TestLegacyMetrics {
+
+ private static final String scopeName = "foo";
+ // Minimum time each scope is held open, so time counters are measurably positive.
+ private static final long periodMs = 50L;
+ private static LegacyMetrics metrics;
+
+ @Before
+ public void before() throws Exception {
+ MetricsFactory.deInit();
+ HiveConf conf = new HiveConf();
+ conf.setVar(HiveConf.ConfVars.HIVE_METRICS_CLASS, LegacyMetrics.class.getCanonicalName());
+ MetricsFactory.init(conf);
+ metrics = (LegacyMetrics) MetricsFactory.getMetricsInstance();
+ }
+
+ @After
+ public void after() throws Exception {
+ MetricsFactory.deInit();
+ }
+
+ @Test
+ public void testMetricsMBean() throws Exception {
+ MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+ final ObjectName oname = new ObjectName(
+ "org.apache.hadoop.hive.common.metrics:type=MetricsMBean");
+ MBeanInfo mBeanInfo = mbs.getMBeanInfo(oname);
+ // check implementation class:
+ assertEquals(MetricsMBeanImpl.class.getName(), mBeanInfo.getClassName());
+
+ // check reset operation:
+ MBeanOperationInfo[] oops = mBeanInfo.getOperations();
+ boolean resetFound = false;
+ for (MBeanOperationInfo op : oops) {
+ if ("reset".equals(op.getName())) {
+ resetFound = true;
+ break;
+ }
+ }
+ assertTrue(resetFound);
+
+ // add metric with a non-null value:
+ Attribute attr = new Attribute("fooMetric", Long.valueOf(-77));
+ mbs.setAttribute(oname, attr);
+
+ mBeanInfo = mbs.getMBeanInfo(oname);
+ MBeanAttributeInfo[] attributeInfos = mBeanInfo.getAttributes();
+ assertEquals(1, attributeInfos.length);
+ boolean attrFound = false;
+ for (MBeanAttributeInfo info : attributeInfos) {
+ if ("fooMetric".equals(info.getName())) {
+ assertEquals("java.lang.Long", info.getType());
+ assertTrue(info.isReadable());
+ assertTrue(info.isWritable());
+ assertFalse(info.isIs());
+
+ attrFound = true;
+ break;
+ }
+ }
+ assertTrue(attrFound);
+
+ // check metric value:
+ Object v = mbs.getAttribute(oname, "fooMetric");
+ assertEquals(Long.valueOf(-77), v);
+
+ // reset the bean:
+ Object result = mbs.invoke(oname, "reset", new Object[0], new String[0]);
+ assertNull(result);
+
+ // the metric value must be zeroed:
+ v = mbs.getAttribute(oname, "fooMetric");
+ assertEquals(Long.valueOf(0), v);
+ }
+
+ // Asserts that the given callable throws IOException.
+ private <T> void expectIOE(Callable<T> c) throws Exception {
+ try {
+ T t = c.call();
+ fail("IOE expected but ["+t+"] was returned.");
+ } catch (IOException ioe) {
+ // ok, expected
+ }
+ }
+
+ @Test
+ public void testScopeSingleThread() throws Exception {
+ metrics.startScope(scopeName);
+ final MetricsScope fooScope = metrics.getScope(scopeName);
+ // the time and number counters become available only after the 1st
+ // scope close:
+ expectIOE(new Callable<Long>() {
+ @Override
+ public Long call() throws Exception {
+ Long num = fooScope.getNumCounter();
+ return num;
+ }
+ });
+ expectIOE(new Callable<Long>() {
+ @Override
+ public Long call() throws Exception {
+ Long time = fooScope.getTimeCounter();
+ return time;
+ }
+ });
+ // cannot open scope that is already open:
+ expectIOE(new Callable<Void>() {
+ @Override
+ public Void call() throws Exception {
+ fooScope.open();
+ return null;
+ }
+ });
+
+ assertSame(fooScope, metrics.getScope(scopeName));
+ Thread.sleep(periodMs + 1);
+ // 1st close:
+ // closing of open scope should be ok:
+ metrics.endScope(scopeName);
+ expectIOE(new Callable<Void>() {
+ @Override
+ public Void call() throws Exception {
+ metrics.endScope(scopeName); // closing of closed scope not allowed
+ return null;
+ }
+ });
+
+ assertEquals(Long.valueOf(1), fooScope.getNumCounter());
+ final long t1 = fooScope.getTimeCounter().longValue();
+ assertTrue(t1 > periodMs);
+
+ assertSame(fooScope, metrics.getScope(scopeName));
+
+ // opening allowed after closing:
+ metrics.startScope(scopeName);
+ // opening of already open scope not allowed:
+ expectIOE(new Callable<Void>() {
+ @Override
+ public Void call() throws Exception {
+ metrics.startScope(scopeName);
+ return null;
+ }
+ });
+
+ assertEquals(Long.valueOf(1), fooScope.getNumCounter());
+ assertEquals(t1, fooScope.getTimeCounter().longValue());
+
+ assertSame(fooScope, metrics.getScope(scopeName));
+ Thread.sleep(periodMs + 1);
+ // Reopening (close + open) allowed in opened state:
+ fooScope.reopen();
+
+ assertEquals(Long.valueOf(2), fooScope.getNumCounter());
+ assertTrue(fooScope.getTimeCounter().longValue() > 2 * periodMs);
+
+ Thread.sleep(periodMs + 1);
+ // 3rd close:
+ fooScope.close();
+
+ assertEquals(Long.valueOf(3), fooScope.getNumCounter());
+ assertTrue(fooScope.getTimeCounter().longValue() > 3 * periodMs);
+ Double avgT = (Double) metrics.get("foo.avg_t");
+ assertTrue(avgT.doubleValue() > periodMs);
+ }
+
+ @Test
+ public void testScopeConcurrency() throws Exception {
+ metrics.startScope(scopeName);
+ MetricsScope fooScope = metrics.getScope(scopeName);
+ final int threads = 10;
+ ExecutorService executorService = Executors.newFixedThreadPool(threads);
+ for (int i=0; i<threads; i++) {
+ final int n = i;
+ executorService.submit(new Callable<Void>() {
+ @Override
+ public Void call() throws Exception {
+ testScopeImpl(n);
+ return null;
+ }
+ });
+ }
+ executorService.shutdown();
+ assertTrue(executorService.awaitTermination(periodMs * 3 * threads, TimeUnit.MILLISECONDS));
+
+ fooScope = metrics.getScope(scopeName);
+ assertEquals(Long.valueOf(3 * threads), fooScope.getNumCounter());
+ assertTrue(fooScope.getTimeCounter().longValue() > 3 * periodMs * threads);
+ Double avgT = (Double) metrics.get("foo.avg_t");
+ assertTrue(avgT.doubleValue() > periodMs);
+ metrics.endScope(scopeName);
+ }
+
+ // Body executed by each worker thread of testScopeConcurrency; exercises the
+ // same open/close transitions as the single-threaded test, but with >= bounds
+ // because the counters are shared across threads.
+ void testScopeImpl(int n) throws Exception {
+ metrics.startScope(scopeName);
+ final MetricsScope fooScope = metrics.getScope(scopeName);
+ // cannot open scope that is already open:
+ expectIOE(new Callable<Void>() {
+ @Override
+ public Void call() throws Exception {
+ fooScope.open();
+ return null;
+ }
+ });
+
+ assertSame(fooScope, metrics.getScope(scopeName));
+ Thread.sleep(periodMs + 1);
+ // 1st close:
+ metrics.endScope(scopeName); // closing of open scope should be ok.
+
+ assertTrue(fooScope.getNumCounter().longValue() >= 1);
+ final long t1 = fooScope.getTimeCounter().longValue();
+ assertTrue(t1 > periodMs);
+
+ expectIOE(new Callable<Void>() {
+ @Override
+ public Void call() throws Exception {
+ metrics.endScope(scopeName); // closing of closed scope not allowed
+ return null;
+ }
+ });
+
+ assertSame(fooScope, metrics.getScope(scopeName));
+
+ // opening allowed after closing:
+ metrics.startScope(scopeName);
+
+ assertTrue(fooScope.getNumCounter().longValue() >= 1);
+ assertTrue(fooScope.getTimeCounter().longValue() >= t1);
+
+ // opening of already open scope not allowed:
+ expectIOE(new Callable<Void>() {
+ @Override
+ public Void call() throws Exception {
+ metrics.startScope(scopeName);
+ return null;
+ }
+ });
+
+ assertSame(fooScope, metrics.getScope(scopeName));
+ Thread.sleep(periodMs + 1);
+ // Reopening (close + open) allowed in opened state:
+ fooScope.reopen();
+
+ assertTrue(fooScope.getNumCounter().longValue() >= 2);
+ assertTrue(fooScope.getTimeCounter().longValue() > 2 * periodMs);
+
+ Thread.sleep(periodMs + 1);
+ // 3rd close:
+ fooScope.close();
+
+ assertTrue(fooScope.getNumCounter().longValue() >= 3);
+ assertTrue(fooScope.getTimeCounter().longValue() > 3 * periodMs);
+ Double avgT = (Double) metrics.get("foo.avg_t");
+ assertTrue(avgT.doubleValue() > periodMs);
+ }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/bd84e87c/common/src/test/org/apache/hadoop/hive/common/metrics/TestMetrics.java
----------------------------------------------------------------------
diff --git a/common/src/test/org/apache/hadoop/hive/common/metrics/TestMetrics.java b/common/src/test/org/apache/hadoop/hive/common/metrics/TestMetrics.java
deleted file mode 100644
index e85d3f8..0000000
--- a/common/src/test/org/apache/hadoop/hive/common/metrics/TestMetrics.java
+++ /dev/null
@@ -1,286 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hive.common.metrics;
-
-import java.io.IOException;
-import java.lang.management.ManagementFactory;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-
-import javax.management.Attribute;
-import javax.management.MBeanAttributeInfo;
-import javax.management.MBeanInfo;
-import javax.management.MBeanOperationInfo;
-import javax.management.MBeanServer;
-import javax.management.ObjectName;
-
-import org.apache.hadoop.hive.common.metrics.Metrics.MetricsScope;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import static org.junit.Assert.*;
-
-public class TestMetrics {
-
- private static final String scopeName = "foo";
- private static final long periodMs = 50L;
-
- @Before
- public void before() throws Exception {
- Metrics.uninit();
- Metrics.init();
- }
-
- @After
- public void after() throws Exception {
- Metrics.uninit();
- }
-
- @Test
- public void testMetricsMBean() throws Exception {
- MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
- final ObjectName oname = new ObjectName(
- "org.apache.hadoop.hive.common.metrics:type=MetricsMBean");
- MBeanInfo mBeanInfo = mbs.getMBeanInfo(oname);
- // check implementation class:
- assertEquals(MetricsMBeanImpl.class.getName(), mBeanInfo.getClassName());
-
- // check reset operation:
- MBeanOperationInfo[] oops = mBeanInfo.getOperations();
- boolean resetFound = false;
- for (MBeanOperationInfo op : oops) {
- if ("reset".equals(op.getName())) {
- resetFound = true;
- break;
- }
- }
- assertTrue(resetFound);
-
- // add metric with a non-null value:
- Attribute attr = new Attribute("fooMetric", Long.valueOf(-77));
- mbs.setAttribute(oname, attr);
-
- mBeanInfo = mbs.getMBeanInfo(oname);
- MBeanAttributeInfo[] attrinuteInfos = mBeanInfo.getAttributes();
- assertEquals(1, attrinuteInfos.length);
- boolean attrFound = false;
- for (MBeanAttributeInfo info : attrinuteInfos) {
- if ("fooMetric".equals(info.getName())) {
- assertEquals("java.lang.Long", info.getType());
- assertTrue(info.isReadable());
- assertTrue(info.isWritable());
- assertFalse(info.isIs());
-
- attrFound = true;
- break;
- }
- }
- assertTrue(attrFound);
-
- // check metric value:
- Object v = mbs.getAttribute(oname, "fooMetric");
- assertEquals(Long.valueOf(-77), v);
-
- // reset the bean:
- Object result = mbs.invoke(oname, "reset", new Object[0], new String[0]);
- assertNull(result);
-
- // the metric value must be zeroed:
- v = mbs.getAttribute(oname, "fooMetric");
- assertEquals(Long.valueOf(0), v);
- }
-
- private <T> void expectIOE(Callable<T> c) throws Exception {
- try {
- T t = c.call();
- fail("IOE expected but ["+t+"] was returned.");
- } catch (IOException ioe) {
- // ok, expected
- }
- }
-
- @Test
- public void testScopeSingleThread() throws Exception {
- final MetricsScope fooScope = Metrics.startScope(scopeName);
- // the time and number counters become available only after the 1st
- // scope close:
- expectIOE(new Callable<Long>() {
- @Override
- public Long call() throws Exception {
- Long num = fooScope.getNumCounter();
- return num;
- }
- });
- expectIOE(new Callable<Long>() {
- @Override
- public Long call() throws Exception {
- Long time = fooScope.getTimeCounter();
- return time;
- }
- });
- // cannot open scope that is already open:
- expectIOE(new Callable<Void>() {
- @Override
- public Void call() throws Exception {
- fooScope.open();
- return null;
- }
- });
-
- assertSame(fooScope, Metrics.getScope(scopeName));
- Thread.sleep(periodMs+1);
- // 1st close:
- // closing of open scope should be ok:
- Metrics.endScope(scopeName);
- expectIOE(new Callable<Void>() {
- @Override
- public Void call() throws Exception {
- Metrics.endScope(scopeName); // closing of closed scope not allowed
- return null;
- }
- });
-
- assertEquals(Long.valueOf(1), fooScope.getNumCounter());
- final long t1 = fooScope.getTimeCounter().longValue();
- assertTrue(t1 > periodMs);
-
- assertSame(fooScope, Metrics.getScope(scopeName));
-
- // opening allowed after closing:
- Metrics.startScope(scopeName);
- // opening of already open scope not allowed:
- expectIOE(new Callable<Void>() {
- @Override
- public Void call() throws Exception {
- Metrics.startScope(scopeName);
- return null;
- }
- });
-
- assertEquals(Long.valueOf(1), fooScope.getNumCounter());
- assertEquals(t1, fooScope.getTimeCounter().longValue());
-
- assertSame(fooScope, Metrics.getScope(scopeName));
- Thread.sleep(periodMs + 1);
- // Reopening (close + open) allowed in opened state:
- fooScope.reopen();
-
- assertEquals(Long.valueOf(2), fooScope.getNumCounter());
- assertTrue(fooScope.getTimeCounter().longValue() > 2 * periodMs);
-
- Thread.sleep(periodMs + 1);
- // 3rd close:
- fooScope.close();
-
- assertEquals(Long.valueOf(3), fooScope.getNumCounter());
- assertTrue(fooScope.getTimeCounter().longValue() > 3 * periodMs);
- Double avgT = (Double)Metrics.get("foo.avg_t");
- assertTrue(avgT.doubleValue() > periodMs);
- }
-
- @Test
- public void testScopeConcurrency() throws Exception {
- MetricsScope fooScope = Metrics.startScope(scopeName);
- final int threads = 10;
- ExecutorService executorService = Executors.newFixedThreadPool(threads);
- for (int i=0; i<threads; i++) {
- final int n = i;
- executorService.submit(new Callable<Void>() {
- @Override
- public Void call() throws Exception {
- testScopeImpl(n);
- return null;
- }
- });
- }
- executorService.shutdown();
- assertTrue(executorService.awaitTermination(periodMs * 3 * threads, TimeUnit.MILLISECONDS));
-
- fooScope = Metrics.getScope(scopeName);
- assertEquals(Long.valueOf(3 * threads), fooScope.getNumCounter());
- assertTrue(fooScope.getTimeCounter().longValue() > 3 * periodMs * threads);
- Double avgT = (Double)Metrics.get("foo.avg_t");
- assertTrue(avgT.doubleValue() > periodMs);
- Metrics.endScope(scopeName);
- }
-
- void testScopeImpl(int n) throws Exception {
- final MetricsScope fooScope = Metrics.startScope(scopeName);
- // cannot open scope that is already open:
- expectIOE(new Callable<Void>() {
- @Override
- public Void call() throws Exception {
- fooScope.open();
- return null;
- }
- });
-
- assertSame(fooScope, Metrics.getScope(scopeName));
- Thread.sleep(periodMs+1);
- // 1st close:
- Metrics.endScope(scopeName); // closing of open scope should be ok.
-
- assertTrue(fooScope.getNumCounter().longValue() >= 1);
- final long t1 = fooScope.getTimeCounter().longValue();
- assertTrue(t1 > periodMs);
-
- expectIOE(new Callable<Void>() {
- @Override
- public Void call() throws Exception {
- Metrics.endScope(scopeName); // closing of closed scope not allowed
- return null;
- }
- });
-
- assertSame(fooScope, Metrics.getScope(scopeName));
-
- // opening allowed after closing:
- Metrics.startScope(scopeName);
-
- assertTrue(fooScope.getNumCounter().longValue() >= 1);
- assertTrue(fooScope.getTimeCounter().longValue() >= t1);
-
- // opening of already open scope not allowed:
- expectIOE(new Callable<Void>() {
- @Override
- public Void call() throws Exception {
- Metrics.startScope(scopeName);
- return null;
- }
- });
-
- assertSame(fooScope, Metrics.getScope(scopeName));
- Thread.sleep(periodMs + 1);
- // Reopening (close + open) allowed in opened state:
- fooScope.reopen();
-
- assertTrue(fooScope.getNumCounter().longValue() >= 2);
- assertTrue(fooScope.getTimeCounter().longValue() > 2 * periodMs);
-
- Thread.sleep(periodMs + 1);
- // 3rd close:
- fooScope.close();
-
- assertTrue(fooScope.getNumCounter().longValue() >= 3);
- assertTrue(fooScope.getTimeCounter().longValue() > 3 * periodMs);
- Double avgT = (Double)Metrics.get("foo.avg_t");
- assertTrue(avgT.doubleValue() > periodMs);
- }
-}
http://git-wip-us.apache.org/repos/asf/hive/blob/bd84e87c/common/src/test/org/apache/hadoop/hive/common/metrics/metrics2/TestCodahaleMetrics.java
----------------------------------------------------------------------
diff --git a/common/src/test/org/apache/hadoop/hive/common/metrics/metrics2/TestCodahaleMetrics.java b/common/src/test/org/apache/hadoop/hive/common/metrics/metrics2/TestCodahaleMetrics.java
new file mode 100644
index 0000000..8749349
--- /dev/null
+++ b/common/src/test/org/apache/hadoop/hive/common/metrics/metrics2/TestCodahaleMetrics.java
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.common.metrics.metrics2;
+
+import com.codahale.metrics.Counter;
+import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.Timer;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.hadoop.hive.common.metrics.common.MetricsFactory;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.shims.ShimLoader;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Unit test for new Metrics subsystem.
+ */
+public class TestCodahaleMetrics {
+
+ private static File workDir = new File(System.getProperty("test.tmp.dir"));
+ private static File jsonReportFile;
+ public static MetricRegistry metricRegistry;
+
+ @Before
+ public void before() throws Exception {
+ HiveConf conf = new HiveConf();
+
+ jsonReportFile = new File(workDir, "json_reporting");
+ jsonReportFile.delete();
+ // Force the local filesystem so the JSON file reporter writes under workDir.
+ String defaultFsName = ShimLoader.getHadoopShims().getHadoopConfNames().get("HADOOPFS");
+ conf.set(defaultFsName, "local");
+ conf.setVar(HiveConf.ConfVars.HIVE_METRICS_CLASS, CodahaleMetrics.class.getCanonicalName());
+ conf.setVar(HiveConf.ConfVars.HIVE_METRICS_REPORTER, MetricsReporting.JSON_FILE.name() + "," + MetricsReporting.JMX.name());
+ conf.setVar(HiveConf.ConfVars.HIVE_METRICS_JSON_FILE_LOCATION, jsonReportFile.toString());
+ conf.setVar(HiveConf.ConfVars.HIVE_METRICS_JSON_FILE_INTERVAL, "100ms");
+
+ MetricsFactory.init(conf);
+ metricRegistry = ((CodahaleMetrics) MetricsFactory.getMetricsInstance()).getMetricRegistry();
+ }
+
+ @After
+ public void after() throws Exception {
+ MetricsFactory.deInit();
+ }
+
+ @Test
+ public void testScope() throws Exception {
+ // Each start/end pair should record one timed invocation under "api_method1".
+ int runs = 5;
+ for (int i = 0; i < runs; i++) {
+ MetricsFactory.getMetricsInstance().startScope("method1");
+ MetricsFactory.getMetricsInstance().endScope("method1");
+ }
+
+ Timer timer = metricRegistry.getTimers().get("api_method1");
+ Assert.assertEquals(runs, timer.getCount());
+ Assert.assertTrue(timer.getMeanRate() > 0);
+ }
+
+
+ @Test
+ public void testCount() throws Exception {
+ int runs = 5;
+ for (int i = 0; i < runs; i++) {
+ MetricsFactory.getMetricsInstance().incrementCounter("count1");
+ }
+ Counter counter = metricRegistry.getCounters().get("count1");
+ Assert.assertEquals(runs, counter.getCount());
+ }
+
+ @Test
+ public void testConcurrency() throws Exception {
+ // Open/close the same scope from multiple threads; the shared timer must
+ // record one invocation per thread.
+ int threads = 4;
+ ExecutorService executorService = Executors.newFixedThreadPool(threads);
+ for (int i = 0; i < threads; i++) {
+ executorService.submit(new Callable<Void>() {
+ @Override
+ public Void call() throws Exception {
+ MetricsFactory.getMetricsInstance().startScope("method2");
+ MetricsFactory.getMetricsInstance().endScope("method2");
+ return null;
+ }
+ });
+ }
+ executorService.shutdown();
+ assertTrue(executorService.awaitTermination(10000, TimeUnit.MILLISECONDS));
+ Timer timer = metricRegistry.getTimers().get("api_method2");
+ Assert.assertEquals(threads, timer.getCount());
+ Assert.assertTrue(timer.getMeanRate() > 0);
+ }
+
+ @Test
+ public void testFileReporting() throws Exception {
+ int runs = 5;
+ for (int i = 0; i < runs; i++) {
+ MetricsFactory.getMetricsInstance().incrementCounter("count2");
+ Thread.sleep(100);
+ }
+
+ // Give the JSON_FILE reporter (100ms interval) time to flush the final value.
+ Thread.sleep(2000);
+ byte[] jsonData = Files.readAllBytes(Paths.get(jsonReportFile.getAbsolutePath()));
+ ObjectMapper objectMapper = new ObjectMapper();
+
+ JsonNode rootNode = objectMapper.readTree(jsonData);
+ JsonNode countersNode = rootNode.path("counters");
+ JsonNode methodCounterNode = countersNode.path("count2");
+ JsonNode countNode = methodCounterNode.path("count");
+ // JUnit convention: expected value first, actual second.
+ Assert.assertEquals(runs, countNode.asInt());
+ }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/bd84e87c/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestMetaStoreMetrics.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestMetaStoreMetrics.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestMetaStoreMetrics.java
new file mode 100644
index 0000000..25f34d1
--- /dev/null
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestMetaStoreMetrics.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.metastore;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import junit.framework.TestCase;
+import org.apache.hadoop.hive.cli.CliSessionState;
+import org.apache.hadoop.hive.common.metrics.metrics2.MetricsReporting;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.Driver;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.hive.shims.ShimLoader;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+
+/**
+ * Tests Hive Metastore Metrics.
+ */
+public class TestMetaStoreMetrics {
+
+ private static File workDir = new File(System.getProperty("test.tmp.dir"));
+ private static File jsonReportFile;
+
+ // NOTE(review): static fields re-initialized in a per-test @Before, and a new
+ // metastore is started per test with no matching teardown -- presumably fine
+ // while the class has a single test; verify if more tests are added.
+ private static HiveConf hiveConf;
+ private static Driver driver;
+
+
+ @Before
+ public void before() throws Exception {
+
+ int port = MetaStoreUtils.findFreePort();
+
+ // Start from a clean metrics file so the assertion sees only this run's data.
+ jsonReportFile = new File(workDir, "json_reporting");
+ jsonReportFile.delete();
+
+ hiveConf = new HiveConf(TestMetaStoreMetrics.class);
+ hiveConf.setVar(HiveConf.ConfVars.METASTOREURIS, "thrift://localhost:" + port);
+ hiveConf.setIntVar(HiveConf.ConfVars.METASTORETHRIFTCONNECTIONRETRIES, 3);
+ hiveConf.setBoolVar(HiveConf.ConfVars.METASTORE_METRICS, true);
+ hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY, false);
+ hiveConf.setVar(HiveConf.ConfVars.HIVE_METRICS_REPORTER, MetricsReporting.JSON_FILE.name() + "," + MetricsReporting.JMX.name());
+ hiveConf.setVar(HiveConf.ConfVars.HIVE_METRICS_JSON_FILE_LOCATION, jsonReportFile.toString());
+ hiveConf.setVar(HiveConf.ConfVars.HIVE_METRICS_JSON_FILE_INTERVAL, "100ms");
+
+ MetaStoreUtils.startMetaStore(port, ShimLoader.getHadoopThriftAuthBridge(), hiveConf);
+
+ SessionState.start(new CliSessionState(hiveConf));
+ driver = new Driver(hiveConf);
+ }
+
+ // Verifies that a metastore call (get_all_databases, issued via "show databases")
+ // shows up as a timer in the JSON metrics file written by the JSON_FILE reporter.
+ @Test
+ public void testMetricsFile() throws Exception {
+ driver.run("show databases");
+
+ //give timer thread a chance to print the metrics
+ Thread.sleep(2000);
+
+ // NOTE(review): the file is rewritten every 100ms, so a read could race a
+ // write; no retry is actually performed here despite the original intent --
+ // consider retrying, or replacing this with CodahaleMetrics's JsonServlet
+ // reporter once it is exposed.
+ byte[] jsonData = Files.readAllBytes(Paths.get(jsonReportFile.getAbsolutePath()));
+ ObjectMapper objectMapper = new ObjectMapper();
+
+ JsonNode rootNode = objectMapper.readTree(jsonData);
+ JsonNode countersNode = rootNode.path("timers");
+ JsonNode methodCounterNode = countersNode.path("api_get_all_databases");
+ JsonNode countNode = methodCounterNode.path("count");
+ Assert.assertTrue(countNode.asInt() > 0);
+ }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/bd84e87c/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
index a3e5ed2..828c585 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
@@ -18,39 +18,14 @@
package org.apache.hadoop.hive.metastore;
-import static org.apache.commons.lang.StringUtils.join;
-import static org.apache.hadoop.hive.metastore.MetaStoreUtils.DEFAULT_DATABASE_COMMENT;
-import static org.apache.hadoop.hive.metastore.MetaStoreUtils.DEFAULT_DATABASE_NAME;
-import static org.apache.hadoop.hive.metastore.MetaStoreUtils.validateName;
-
-import java.io.IOException;
-import java.text.DateFormat;
-import java.text.SimpleDateFormat;
-import java.util.AbstractMap;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Formatter;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Properties;
-import java.util.Set;
-import java.util.Timer;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-import java.util.regex.Pattern;
-
-import javax.jdo.JDOException;
-
+import com.facebook.fb303.FacebookBase;
+import com.facebook.fb303.fb_status;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Splitter;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableListMultimap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Multimaps;
import org.apache.commons.cli.OptionBuilder;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -58,12 +33,13 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.FileUtils;
+import org.apache.hadoop.hive.common.JvmPauseMonitor;
import org.apache.hadoop.hive.common.LogUtils;
import org.apache.hadoop.hive.common.LogUtils.LogInitializationException;
import org.apache.hadoop.hive.common.classification.InterfaceAudience;
import org.apache.hadoop.hive.common.classification.InterfaceStability;
import org.apache.hadoop.hive.common.cli.CommonCliOptions;
-import org.apache.hadoop.hive.common.metrics.Metrics;
+import org.apache.hadoop.hive.common.metrics.common.MetricsFactory;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.metastore.api.AbortTxnRequest;
@@ -221,14 +197,35 @@ import org.apache.thrift.transport.TServerTransport;
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportFactory;
-import com.facebook.fb303.FacebookBase;
-import com.facebook.fb303.fb_status;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Splitter;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableListMultimap;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Multimaps;
+import javax.jdo.JDOException;
+import java.io.IOException;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.AbstractMap;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Formatter;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Properties;
+import java.util.Set;
+import java.util.Timer;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.regex.Pattern;
+
+import static org.apache.commons.lang.StringUtils.join;
+import static org.apache.hadoop.hive.metastore.MetaStoreUtils.*;
/**
* TODO:pc remove application logic to a separate interface.
@@ -464,9 +461,10 @@ public class HiveMetaStore extends ThriftHiveMetastore {
}
}
- if (hiveConf.getBoolean("hive.metastore.metrics.enabled", false)) {
+ //Start Metrics for Embedded mode
+ if (hiveConf.getBoolVar(ConfVars.METASTORE_METRICS)) {
try {
- Metrics.init();
+ MetricsFactory.init(hiveConf);
} catch (Exception e) {
// log exception, but ignore inability to start
LOG.error("error in Metrics init: " + e.getClass().getName() + " "
@@ -750,11 +748,13 @@ public class HiveMetaStore extends ThriftHiveMetastore {
incrementCounter(function);
logInfo((getIpAddress() == null ? "" : "source:" + getIpAddress() + " ") +
function + extraLogInfo);
- try {
- Metrics.startScope(function);
- } catch (IOException e) {
- LOG.debug("Exception when starting metrics scope"
+ if (hiveConf.getBoolVar(ConfVars.METASTORE_METRICS)) {
+ try {
+ MetricsFactory.getMetricsInstance().startScope(function);
+ } catch (IOException e) {
+ LOG.debug("Exception when starting metrics scope"
+ e.getClass().getName() + " " + e.getMessage(), e);
+ }
}
return function;
}
@@ -792,10 +792,12 @@ public class HiveMetaStore extends ThriftHiveMetastore {
}
private void endFunction(String function, MetaStoreEndFunctionContext context) {
- try {
- Metrics.endScope(function);
- } catch (IOException e) {
- LOG.debug("Exception when closing metrics scope" + e);
+ if (hiveConf.getBoolVar(ConfVars.METASTORE_METRICS)) {
+ try {
+ MetricsFactory.getMetricsInstance().endScope(function);
+ } catch (IOException e) {
+ LOG.debug("Exception when closing metrics scope" + e);
+ }
}
for (MetaStoreEndFunctionListener listener : endFunctionListeners) {
@@ -819,6 +821,14 @@ public class HiveMetaStore extends ThriftHiveMetastore {
threadLocalMS.remove();
}
}
+ if (hiveConf.getBoolVar(ConfVars.METASTORE_METRICS)) {
+ try {
+ MetricsFactory.deInit();
+ } catch (Exception e) {
+ LOG.error("error in Metrics deinit: " + e.getClass().getName() + " "
+ + e.getMessage(), e);
+ }
+ }
logInfo("Metastore shutdown complete.");
}
@@ -5914,6 +5924,16 @@ public class HiveMetaStore extends ThriftHiveMetastore {
}
});
+ //Start Metrics for Standalone (Remote) Mode
+ if (conf.getBoolVar(ConfVars.METASTORE_METRICS)) {
+ try {
+ MetricsFactory.init(conf);
+ } catch (Exception e) {
+ // log exception, but ignore inability to start
+ LOG.error("error in Metrics init: " + e.getClass().getName() + " "
+ + e.getMessage(), e);
+ }
+ }
Lock startLock = new ReentrantLock();
Condition startCondition = startLock.newCondition();
@@ -6104,7 +6124,13 @@ public class HiveMetaStore extends ThriftHiveMetastore {
// Wrap the start of the threads in a catch Throwable loop so that any failures
// don't doom the rest of the metastore.
startLock.lock();
- ShimLoader.getHadoopShims().startPauseMonitor(conf);
+ try {
+ JvmPauseMonitor pauseMonitor = new JvmPauseMonitor(conf);
+ pauseMonitor.start();
+ } catch (Throwable t) {
+ LOG.warn("Could not initiate the JvmPauseMonitor thread." + " GCs and Pauses may not be " +
+ "warned upon.", t);
+ }
try {
// Per the javadocs on Condition, do not depend on the condition alone as a start gate
[4/4] hive git commit: HIVE-10927 : Add number of HMS/HS2 connection
metrics (Szehon, reviewed by Jimmy Xiang)
Posted by sz...@apache.org.
HIVE-10927 : Add number of HMS/HS2 connection metrics (Szehon, reviewed by Jimmy Xiang)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/ad803d79
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/ad803d79
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/ad803d79
Branch: refs/heads/branch-1
Commit: ad803d794a3b242156f12b7c5f310cdc7009ba06
Parents: 5553fbd
Author: Szehon Ho <sz...@cloudera.com>
Authored: Wed Jul 8 11:38:41 2015 -0700
Committer: Szehon Ho <sz...@cloudera.com>
Committed: Fri Jul 10 15:22:57 2015 -0700
----------------------------------------------------------------------
.../hadoop/hive/common/JvmPauseMonitor.java | 7 ++-
.../hive/common/metrics/LegacyMetrics.java | 30 ++++++++-
.../hive/common/metrics/common/Metrics.java | 27 ++++++++
.../common/metrics/common/MetricsConstant.java | 35 +++++++++++
.../common/metrics/common/MetricsVariable.java | 26 ++++++++
.../metrics/metrics2/CodahaleMetrics.java | 58 ++++++++++++++++-
.../org/apache/hadoop/hive/conf/HiveConf.java | 4 +-
.../metrics/metrics2/TestCodahaleMetrics.java | 42 +++++++++++++
.../hive/metastore/TestMetaStoreMetrics.java | 66 +++++++++++++++++---
.../hadoop/hive/metastore/HiveMetaStore.java | 59 ++++++++++++++---
.../hadoop/hive/metastore/ObjectStore.java | 30 ++++++++-
.../service/cli/thrift/ThriftCLIService.java | 21 ++++++-
12 files changed, 378 insertions(+), 27 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/ad803d79/common/src/java/org/apache/hadoop/hive/common/JvmPauseMonitor.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/common/JvmPauseMonitor.java b/common/src/java/org/apache/hadoop/hive/common/JvmPauseMonitor.java
index ec5ac4a..6ffaf94 100644
--- a/common/src/java/org/apache/hadoop/hive/common/JvmPauseMonitor.java
+++ b/common/src/java/org/apache/hadoop/hive/common/JvmPauseMonitor.java
@@ -27,6 +27,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.common.metrics.common.Metrics;
+import org.apache.hadoop.hive.common.metrics.common.MetricsConstant;
import org.apache.hadoop.hive.common.metrics.common.MetricsFactory;
import org.apache.hadoop.util.Daemon;
@@ -186,14 +187,14 @@ public class JvmPauseMonitor {
++numGcWarnThresholdExceeded;
LOG.warn(formatMessage(
extraSleepTime, gcTimesAfterSleep, gcTimesBeforeSleep));
- incrementMetricsCounter("jvm.pause.warn-threshold", 1);
+ incrementMetricsCounter(MetricsConstant.JVM_PAUSE_WARN, 1);
} else if (extraSleepTime > infoThresholdMs) {
++numGcInfoThresholdExceeded;
LOG.info(formatMessage(
extraSleepTime, gcTimesAfterSleep, gcTimesBeforeSleep));
- incrementMetricsCounter("jvm.pause.info-threshold", 1);
+ incrementMetricsCounter(MetricsConstant.JVM_PAUSE_INFO, 1);
}
- incrementMetricsCounter("jvm.pause.extraSleepTime", extraSleepTime);
+ incrementMetricsCounter(MetricsConstant.JVM_EXTRA_SLEEP, extraSleepTime);
totalGcExtraSleepTime += extraSleepTime;
gcTimesBeforeSleep = gcTimesAfterSleep;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/ad803d79/common/src/java/org/apache/hadoop/hive/common/metrics/LegacyMetrics.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/common/metrics/LegacyMetrics.java b/common/src/java/org/apache/hadoop/hive/common/metrics/LegacyMetrics.java
index e811339..52d99e4 100644
--- a/common/src/java/org/apache/hadoop/hive/common/metrics/LegacyMetrics.java
+++ b/common/src/java/org/apache/hadoop/hive/common/metrics/LegacyMetrics.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.hive.common.metrics;
import org.apache.hadoop.hive.common.metrics.common.Metrics;
+import org.apache.hadoop.hive.common.metrics.common.MetricsVariable;
import org.apache.hadoop.hive.conf.HiveConf;
import java.io.IOException;
@@ -162,11 +163,11 @@ public class LegacyMetrics implements Metrics {
mbs.registerMBean(metrics, oname);
}
- public Long incrementCounter(String name) throws IOException{
+ public Long incrementCounter(String name) throws IOException {
return incrementCounter(name,Long.valueOf(1));
}
- public Long incrementCounter(String name, long increment) throws IOException{
+ public Long incrementCounter(String name, long increment) throws IOException {
Long value;
synchronized(metrics) {
if (!metrics.hasKey(name)) {
@@ -180,6 +181,29 @@ public class LegacyMetrics implements Metrics {
return value;
}
+ public Long decrementCounter(String name) throws IOException{
+ return decrementCounter(name, Long.valueOf(1));
+ }
+
+ public Long decrementCounter(String name, long decrement) throws IOException {
+ Long value;
+ synchronized(metrics) {
+ if (!metrics.hasKey(name)) {
+ value = Long.valueOf(decrement);
+ set(name, -value);
+ } else {
+ value = ((Long)get(name)) - decrement;
+ set(name, value);
+ }
+ }
+ return value;
+ }
+
+ @Override
+ public void addGauge(String name, MetricsVariable variable) {
+ //Not implemented.
+ }
+
public void set(String name, Object value) throws IOException{
metrics.put(name,value);
}
@@ -210,6 +234,8 @@ public class LegacyMetrics implements Metrics {
}
}
+
+
/**
* Resets the static context state to initial.
* Used primarily for testing purposes.
http://git-wip-us.apache.org/repos/asf/hive/blob/ad803d79/common/src/java/org/apache/hadoop/hive/common/metrics/common/Metrics.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/common/metrics/common/Metrics.java b/common/src/java/org/apache/hadoop/hive/common/metrics/common/Metrics.java
index 27b69cc..49b2b32 100644
--- a/common/src/java/org/apache/hadoop/hive/common/metrics/common/Metrics.java
+++ b/common/src/java/org/apache/hadoop/hive/common/metrics/common/Metrics.java
@@ -61,4 +61,31 @@ public interface Metrics {
* @throws IOException
*/
public Long incrementCounter(String name, long increment) throws IOException;
+
+
+ /**
+ * Decrements a counter of the given name by 1.
+ * @param name
+ * @return
+ * @throws IOException
+ */
+ public Long decrementCounter(String name) throws IOException;
+
+ /**
+ * Decrements a counter of the given name by "decrement"
+ * @param name
+ * @param decrement
+ * @return
+ * @throws IOException
+ */
+ public Long decrementCounter(String name, long decrement) throws IOException;
+
+
+ /**
+ * Adds a metrics-gauge to track variable. For example, number of open database connections.
+ * @param name name of gauge
+ * @param variable variable to track.
+ * @throws IOException
+ */
+ public void addGauge(String name, final MetricsVariable variable);
}
http://git-wip-us.apache.org/repos/asf/hive/blob/ad803d79/common/src/java/org/apache/hadoop/hive/common/metrics/common/MetricsConstant.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/common/metrics/common/MetricsConstant.java b/common/src/java/org/apache/hadoop/hive/common/metrics/common/MetricsConstant.java
new file mode 100644
index 0000000..d1ebe12
--- /dev/null
+++ b/common/src/java/org/apache/hadoop/hive/common/metrics/common/MetricsConstant.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.common.metrics.common;
+
+/**
+ * This class defines some metrics generated by Hive processes.
+ */
+public class MetricsConstant {
+
+ public static String JVM_PAUSE_INFO = "jvm.pause.info-threshold";
+ public static String JVM_PAUSE_WARN = "jvm.pause.warn-threshold";
+ public static String JVM_EXTRA_SLEEP = "jvm.pause.extraSleepTime";
+
+ public static String OPEN_CONNECTIONS = "open_connections";
+
+ public static String JDO_ACTIVE_TRANSACTIONS = "active_jdo_transactions";
+ public static String JDO_ROLLBACK_TRANSACTIONS = "rollbacked_jdo_transactions";
+ public static String JDO_COMMIT_TRANSACTIONS = "committed_jdo_transactions";
+ public static String JDO_OPEN_TRANSACTIONS = "opened_jdo_transactions";
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/ad803d79/common/src/java/org/apache/hadoop/hive/common/metrics/common/MetricsVariable.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/common/metrics/common/MetricsVariable.java b/common/src/java/org/apache/hadoop/hive/common/metrics/common/MetricsVariable.java
new file mode 100644
index 0000000..8cf6608
--- /dev/null
+++ b/common/src/java/org/apache/hadoop/hive/common/metrics/common/MetricsVariable.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.common.metrics.common;
+
+/**
+ * Interface for metrics variables. <p/> For example, the database service could expose the number
+ * of currently active connections.
+ */
+public interface MetricsVariable<T> {
+ public T getValue();
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hive/blob/ad803d79/common/src/java/org/apache/hadoop/hive/common/metrics/metrics2/CodahaleMetrics.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/common/metrics/metrics2/CodahaleMetrics.java b/common/src/java/org/apache/hadoop/hive/common/metrics/metrics2/CodahaleMetrics.java
index ae353d0..7756f43 100644
--- a/common/src/java/org/apache/hadoop/hive/common/metrics/metrics2/CodahaleMetrics.java
+++ b/common/src/java/org/apache/hadoop/hive/common/metrics/metrics2/CodahaleMetrics.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.hive.common.metrics.metrics2;
import com.codahale.metrics.ConsoleReporter;
import com.codahale.metrics.Counter;
import com.codahale.metrics.ExponentiallyDecayingReservoir;
+import com.codahale.metrics.Gauge;
import com.codahale.metrics.JmxReporter;
import com.codahale.metrics.Metric;
import com.codahale.metrics.MetricRegistry;
@@ -44,6 +45,7 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hive.common.metrics.common.MetricsVariable;
import org.apache.hadoop.hive.conf.HiveConf;
import java.io.BufferedReader;
@@ -52,12 +54,14 @@ import java.io.Closeable;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.lang.management.ManagementFactory;
+import java.net.URI;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TimerTask;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
@@ -73,9 +77,11 @@ public class CodahaleMetrics implements org.apache.hadoop.hive.common.metrics.co
public final MetricRegistry metricRegistry = new MetricRegistry();
private final Lock timersLock = new ReentrantLock();
private final Lock countersLock = new ReentrantLock();
+ private final Lock gaugesLock = new ReentrantLock();
private LoadingCache<String, Timer> timers;
private LoadingCache<String, Counter> counters;
+ private ConcurrentHashMap<String, Gauge> gauges;
private HiveConf conf;
private final Set<Closeable> reporters = new HashSet<Closeable>();
@@ -161,6 +167,7 @@ public class CodahaleMetrics implements org.apache.hadoop.hive.common.metrics.co
}
}
);
+ gauges = new ConcurrentHashMap<String, Gauge>();
//register JVM metrics
registerAll("gc", new GarbageCollectorMetricSet());
@@ -218,7 +225,7 @@ public class CodahaleMetrics implements org.apache.hadoop.hive.common.metrics.co
}
public Long incrementCounter(String name) throws IOException {
- return incrementCounter(name, 1);
+ return incrementCounter(name, 1L);
}
public Long incrementCounter(String name, long increment) throws IOException {
@@ -234,6 +241,45 @@ public class CodahaleMetrics implements org.apache.hadoop.hive.common.metrics.co
}
}
+ public Long decrementCounter(String name) throws IOException {
+ return decrementCounter(name, 1L);
+ }
+
+ public Long decrementCounter(String name, long decrement) throws IOException {
+ String key = name;
+ try {
+ countersLock.lock();
+ counters.get(key).dec(decrement);
+ return counters.get(key).getCount();
+ } catch(ExecutionException ee) {
+ throw new RuntimeException(ee);
+ } finally {
+ countersLock.unlock();
+ }
+ }
+
+ public void addGauge(String name, final MetricsVariable variable) {
+ Gauge gauge = new Gauge() {
+ @Override
+ public Object getValue() {
+ return variable.getValue();
+ }
+ };
+ try {
+ gaugesLock.lock();
+ gauges.put(name, gauge);
+ // Metrics throws an Exception if we don't do this when the key already exists
+ if (metricRegistry.getGauges().containsKey(name)) {
+ LOGGER.warn("A Gauge with name [" + name + "] already exists. "
+ + " The old gauge will be overwritten, but this is not recommended");
+ metricRegistry.remove(name);
+ }
+ metricRegistry.register(name, gauge);
+ } finally {
+ gaugesLock.unlock();
+ }
+ }
+
// This method is necessary to synchronize lazy-creation to the timers.
private Timer getTimer(String name) throws IOException {
String key = name;
@@ -312,11 +358,19 @@ public class CodahaleMetrics implements org.apache.hadoop.hive.common.metrics.co
try {
String json = jsonMapper.writerWithDefaultPrettyPrinter().writeValueAsString(metricRegistry);
Path tmpPath = new Path(pathString + ".tmp");
- FileSystem fs = FileSystem.get(conf);
+ URI tmpPathURI = tmpPath.toUri();
+ FileSystem fs = null;
+ if (tmpPathURI.getScheme() == null && tmpPathURI.getAuthority() == null) {
+ //default local
+ fs = FileSystem.getLocal(conf);
+ } else {
+ fs = FileSystem.get(tmpPathURI, conf);
+ }
fs.delete(tmpPath, true);
bw = new BufferedWriter(new OutputStreamWriter(fs.create(tmpPath, true)));
bw.write(json);
bw.close();
+ fs.setPermission(tmpPath, FsPermission.createImmutable((short) 0644));
Path path = new Path(pathString);
fs.rename(tmpPath, path);
http://git-wip-us.apache.org/repos/asf/hive/blob/ad803d79/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 75c2301..668c8f2 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -1725,8 +1725,8 @@ public class HiveConf extends Configuration {
"Hive metrics subsystem implementation class."),
HIVE_METRICS_REPORTER("hive.service.metrics.reporter", "JSON_FILE, JMX",
"Reporter type for metric class org.apache.hadoop.hive.common.metrics.metrics2.CodahaleMetrics, comma separated list of JMX, CONSOLE, JSON_FILE"),
- HIVE_METRICS_JSON_FILE_LOCATION("hive.service.metrics.file.location", "file:///tmp/report.json",
- "For metric class org.apache.hadoop.hive.common.metrics.metrics2.CodahaleMetrics JSON_FILE reporter, the location of JSON metrics file. " +
+ HIVE_METRICS_JSON_FILE_LOCATION("hive.service.metrics.file.location", "/tmp/report.json",
+ "For metric class org.apache.hadoop.hive.common.metrics.metrics2.CodahaleMetrics JSON_FILE reporter, the location of local JSON metrics file. " +
"This file will get overwritten at every interval."),
HIVE_METRICS_JSON_FILE_INTERVAL("hive.service.metrics.file.frequency", "5s",
new TimeValidator(TimeUnit.MILLISECONDS),
http://git-wip-us.apache.org/repos/asf/hive/blob/ad803d79/common/src/test/org/apache/hadoop/hive/common/metrics/metrics2/TestCodahaleMetrics.java
----------------------------------------------------------------------
diff --git a/common/src/test/org/apache/hadoop/hive/common/metrics/metrics2/TestCodahaleMetrics.java b/common/src/test/org/apache/hadoop/hive/common/metrics/metrics2/TestCodahaleMetrics.java
index 954b388..a3aa549 100644
--- a/common/src/test/org/apache/hadoop/hive/common/metrics/metrics2/TestCodahaleMetrics.java
+++ b/common/src/test/org/apache/hadoop/hive/common/metrics/metrics2/TestCodahaleMetrics.java
@@ -22,7 +22,9 @@ import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.hadoop.hive.common.metrics.common.MetricsConstant;
import org.apache.hadoop.hive.common.metrics.common.MetricsFactory;
+import org.apache.hadoop.hive.common.metrics.common.MetricsVariable;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.shims.ShimLoader;
import org.junit.After;
@@ -135,4 +137,44 @@ public class TestCodahaleMetrics {
JsonNode countNode = methodCounterNode.path("count");
Assert.assertEquals(countNode.asInt(), 5);
}
+
+ class TestMetricsVariable implements MetricsVariable {
+ private int gaugeVal;
+
+ @Override
+ public Object getValue() {
+ return gaugeVal;
+ }
+ public void setValue(int gaugeVal) {
+ this.gaugeVal = gaugeVal;
+ }
+ };
+
+ @Test
+ public void testGauge() throws Exception {
+ TestMetricsVariable testVar = new TestMetricsVariable();
+ testVar.setValue(20);
+
+ MetricsFactory.getInstance().addGauge("gauge1", testVar);
+ Thread.sleep(2000);
+ byte[] jsonData = Files.readAllBytes(Paths.get(jsonReportFile.getAbsolutePath()));
+ ObjectMapper objectMapper = new ObjectMapper();
+
+ JsonNode rootNode = objectMapper.readTree(jsonData);
+ JsonNode gaugesNode = rootNode.path("gauges");
+ JsonNode methodGaugeNode = gaugesNode.path("gauge1");
+ JsonNode countNode = methodGaugeNode.path("value");
+ Assert.assertEquals(countNode.asInt(), testVar.getValue());
+
+ testVar.setValue(40);
+ Thread.sleep(2000);
+
+ jsonData = Files.readAllBytes(Paths.get(jsonReportFile.getAbsolutePath()));
+
+ rootNode = objectMapper.readTree(jsonData);
+ gaugesNode = rootNode.path("gauges");
+ methodGaugeNode = gaugesNode.path("gauge1");
+ countNode = methodGaugeNode.path("value");
+ Assert.assertEquals(countNode.asInt(), testVar.getValue());
+ }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/ad803d79/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestMetaStoreMetrics.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestMetaStoreMetrics.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestMetaStoreMetrics.java
index 25f34d1..c9da95a 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestMetaStoreMetrics.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestMetaStoreMetrics.java
@@ -24,8 +24,10 @@ import org.apache.hadoop.hive.cli.CliSessionState;
import org.apache.hadoop.hive.common.metrics.metrics2.MetricsReporting;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.Driver;
+import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.hive.shims.ShimLoader;
+import org.apache.hive.service.server.HiveServer2;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
@@ -37,9 +39,11 @@ import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
+import java.util.Map;
/**
* Tests Hive Metastore Metrics.
+ *
*/
public class TestMetaStoreMetrics {
@@ -49,9 +53,8 @@ public class TestMetaStoreMetrics {
private static HiveConf hiveConf;
private static Driver driver;
-
- @Before
- public void before() throws Exception {
+ @BeforeClass
+ public static void before() throws Exception {
int port = MetaStoreUtils.findFreePort();
@@ -86,9 +89,58 @@ public class TestMetaStoreMetrics {
ObjectMapper objectMapper = new ObjectMapper();
JsonNode rootNode = objectMapper.readTree(jsonData);
- JsonNode countersNode = rootNode.path("timers");
- JsonNode methodCounterNode = countersNode.path("api_get_all_databases");
- JsonNode countNode = methodCounterNode.path("count");
- Assert.assertTrue(countNode.asInt() > 0);
+ JsonNode timersNode = rootNode.path("timers");
+ JsonNode methodCounterNode = timersNode.path("api_get_all_databases");
+ JsonNode methodCountNode = methodCounterNode.path("count");
+ Assert.assertTrue(methodCountNode.asInt() > 0);
+
+ JsonNode countersNode = rootNode.path("counters");
+ JsonNode committedJdoTxNode = countersNode.path("committed_jdo_transactions");
+ JsonNode committedCountNode = committedJdoTxNode.path("count");
+ Assert.assertTrue(committedCountNode.asInt() > 0);
+ }
+
+
+ @Test
+ public void testConnections() throws Exception {
+ byte[] jsonData = Files.readAllBytes(Paths.get(jsonReportFile.getAbsolutePath()));
+ ObjectMapper objectMapper = new ObjectMapper();
+ JsonNode rootNode = objectMapper.readTree(jsonData);
+ JsonNode countersNode = rootNode.path("counters");
+ JsonNode openCnxNode = countersNode.path("open_connections");
+ JsonNode openCnxCountNode = openCnxNode.path("count");
+ Assert.assertTrue(openCnxCountNode.asInt() == 1);
+
+ //create a second connection
+ HiveMetaStoreClient msc = new HiveMetaStoreClient(hiveConf);
+ HiveMetaStoreClient msc2 = new HiveMetaStoreClient(hiveConf);
+ Thread.sleep(2000);
+
+ jsonData = Files.readAllBytes(Paths.get(jsonReportFile.getAbsolutePath()));
+ rootNode = objectMapper.readTree(jsonData);
+ countersNode = rootNode.path("counters");
+ openCnxNode = countersNode.path("open_connections");
+ openCnxCountNode = openCnxNode.path("count");
+ Assert.assertTrue(openCnxCountNode.asInt() == 3);
+
+ msc.close();
+ Thread.sleep(2000);
+
+ jsonData = Files.readAllBytes(Paths.get(jsonReportFile.getAbsolutePath()));
+ rootNode = objectMapper.readTree(jsonData);
+ countersNode = rootNode.path("counters");
+ openCnxNode = countersNode.path("open_connections");
+ openCnxCountNode = openCnxNode.path("count");
+ Assert.assertTrue(openCnxCountNode.asInt() == 2);
+
+ msc2.close();
+ Thread.sleep(2000);
+
+ jsonData = Files.readAllBytes(Paths.get(jsonReportFile.getAbsolutePath()));
+ rootNode = objectMapper.readTree(jsonData);
+ countersNode = rootNode.path("counters");
+ openCnxNode = countersNode.path("open_connections");
+ openCnxCountNode = openCnxNode.path("count");
+ Assert.assertTrue(openCnxCountNode.asInt() == 1);
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/ad803d79/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
index 2cbe3c0..580d9fc 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
@@ -39,6 +39,8 @@ import org.apache.hadoop.hive.common.LogUtils.LogInitializationException;
import org.apache.hadoop.hive.common.classification.InterfaceAudience;
import org.apache.hadoop.hive.common.classification.InterfaceStability;
import org.apache.hadoop.hive.common.cli.CommonCliOptions;
+import org.apache.hadoop.hive.common.metrics.common.Metrics;
+import org.apache.hadoop.hive.common.metrics.common.MetricsConstant;
import org.apache.hadoop.hive.common.metrics.common.MetricsFactory;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
@@ -188,8 +190,11 @@ import org.apache.thrift.TException;
import org.apache.thrift.TProcessor;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TCompactProtocol;
+import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.protocol.TProtocolFactory;
+import org.apache.thrift.server.ServerContext;
import org.apache.thrift.server.TServer;
+import org.apache.thrift.server.TServerEventHandler;
import org.apache.thrift.server.TThreadPoolServer;
import org.apache.thrift.transport.TFramedTransport;
import org.apache.thrift.transport.TServerSocket;
@@ -821,14 +826,6 @@ public class HiveMetaStore extends ThriftHiveMetastore {
threadLocalMS.remove();
}
}
- if (hiveConf.getBoolVar(ConfVars.METASTORE_METRICS)) {
- try {
- MetricsFactory.close();
- } catch (Exception e) {
- LOG.error("error in Metrics deinit: " + e.getClass().getName() + " "
- + e.getMessage(), e);
- }
- }
logInfo("Metastore shutdown complete.");
}
@@ -5878,7 +5875,7 @@ public class HiveMetaStore extends ThriftHiveMetastore {
*/
public static void main(String[] args) throws Throwable {
HiveConf.setLoadMetastoreConfig(true);
- HiveConf conf = new HiveConf(HMSHandler.class);
+ final HiveConf conf = new HiveConf(HMSHandler.class);
HiveMetastoreCli cli = new HiveMetastoreCli(conf);
cli.parse(args);
@@ -5921,6 +5918,14 @@ public class HiveMetaStore extends ThriftHiveMetastore {
if (isCliVerbose) {
System.err.println(shutdownMsg);
}
+ if (conf.getBoolVar(ConfVars.METASTORE_METRICS)) {
+ try {
+ MetricsFactory.close();
+ } catch (Exception e) {
+ LOG.error("error in Metrics deinit: " + e.getClass().getName() + " "
+ + e.getMessage(), e);
+ }
+ }
}
});
@@ -6057,6 +6062,42 @@ public class HiveMetaStore extends ThriftHiveMetastore {
.maxWorkerThreads(maxWorkerThreads);
TServer tServer = new TThreadPoolServer(args);
+ TServerEventHandler tServerEventHandler = new TServerEventHandler() {
+ @Override
+ public void preServe() {
+ }
+
+ @Override
+ public ServerContext createContext(TProtocol tProtocol, TProtocol tProtocol1) {
+ try {
+ Metrics metrics = MetricsFactory.getInstance();
+ if (metrics != null) {
+ metrics.incrementCounter(MetricsConstant.OPEN_CONNECTIONS);
+ }
+ } catch (Exception e) {
+ LOG.warn("Error Reporting Metastore open connection to Metrics system", e);
+ }
+ return null;
+ }
+
+ @Override
+ public void deleteContext(ServerContext serverContext, TProtocol tProtocol, TProtocol tProtocol1) {
+ try {
+ Metrics metrics = MetricsFactory.getInstance();
+ if (metrics != null) {
+ metrics.decrementCounter(MetricsConstant.OPEN_CONNECTIONS);
+ }
+ } catch (Exception e) {
+ LOG.warn("Error Reporting Metastore close connection to Metrics system", e);
+ }
+ }
+
+ @Override
+ public void processContext(ServerContext serverContext, TTransport tTransport, TTransport tTransport1) {
+ }
+ };
+
+ tServer.setServerEventHandler(tServerEventHandler);
HMSHandler.LOG.info("Started the new metaserver on port [" + port
+ "]...");
HMSHandler.LOG.info("Options.minWorkerThreads = "
http://git-wip-us.apache.org/repos/asf/hive/blob/ad803d79/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java
index bce4511..39ab9e7 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java
@@ -62,6 +62,10 @@ import org.apache.hadoop.hive.common.FileUtils;
import org.apache.hadoop.hive.common.ObjectPair;
import org.apache.hadoop.hive.common.classification.InterfaceAudience;
import org.apache.hadoop.hive.common.classification.InterfaceStability;
+import org.apache.hadoop.hive.common.metrics.common.Metrics;
+import org.apache.hadoop.hive.common.metrics.common.MetricsConstant;
+import org.apache.hadoop.hive.common.metrics.common.MetricsFactory;
+import org.apache.hadoop.hive.common.metrics.common.MetricsVariable;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.metastore.api.AggrStats;
@@ -207,7 +211,7 @@ public class ObjectStore implements RawStore, Configurable {
private MetaStoreDirectSql directSql = null;
private PartitionExpressionProxy expressionProxy = null;
private Configuration hiveConf;
- int openTrasactionCalls = 0;
+ private volatile int openTrasactionCalls = 0;
private Transaction currentTransaction = null;
private TXN_STATUS transactionStatus = TXN_STATUS.NO_STATE;
@@ -280,6 +284,17 @@ public class ObjectStore implements RawStore, Configurable {
initialize(propsFromConf);
+ //Add metric for number of active JDO transactions.
+ Metrics metrics = MetricsFactory.getInstance();
+ if (metrics != null) {
+ metrics.addGauge(MetricsConstant.JDO_ACTIVE_TRANSACTIONS, new MetricsVariable() {
+ @Override
+ public Object getValue() {
+ return openTrasactionCalls;
+ }
+ });
+ }
+
String partitionValidationRegex =
hiveConf.get(HiveConf.ConfVars.METASTORE_PARTITION_NAME_WHITELIST_PATTERN.name());
if (partitionValidationRegex != null && partitionValidationRegex.equals("")) {
@@ -453,6 +468,7 @@ public class ObjectStore implements RawStore, Configurable {
boolean result = currentTransaction.isActive();
debugLog("Open transaction: count = " + openTrasactionCalls + ", isActive = " + result);
+ incrementMetricsCount(MetricsConstant.JDO_OPEN_TRANSACTIONS);
return result;
}
@@ -491,6 +507,7 @@ public class ObjectStore implements RawStore, Configurable {
currentTransaction.commit();
}
+ incrementMetricsCount(MetricsConstant.JDO_COMMIT_TRANSACTIONS);
return true;
}
@@ -528,6 +545,7 @@ public class ObjectStore implements RawStore, Configurable {
// from reattaching in future transactions
pm.evictAll();
}
+ incrementMetricsCount(MetricsConstant.JDO_ROLLBACK_TRANSACTIONS);
}
@Override
@@ -7033,6 +7051,16 @@ public class ObjectStore implements RawStore, Configurable {
}
}
+ private void incrementMetricsCount(String name) {
+ try {
+ Metrics metrics = MetricsFactory.getInstance();
+ if (metrics != null) {
+ metrics.incrementCounter(name);
+ }
+ } catch (Exception e) {
+ LOG.warn("Error Reporting JDO operation to Metrics system", e);
+ }
+ }
private void debugLog(String message) {
if (LOG.isDebugEnabled()) {
http://git-wip-us.apache.org/repos/asf/hive/blob/ad803d79/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java
----------------------------------------------------------------------
diff --git a/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java b/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java
index dfb7faa..67bc778 100644
--- a/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java
+++ b/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java
@@ -30,6 +30,9 @@ import javax.security.auth.login.LoginException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.common.metrics.common.Metrics;
+import org.apache.hadoop.hive.common.metrics.common.MetricsConstant;
+import org.apache.hadoop.hive.common.metrics.common.MetricsFactory;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hive.service.AbstractService;
@@ -108,13 +111,29 @@ public abstract class ThriftCLIService extends AbstractService implements TCLISe
@Override
public ServerContext createContext(
TProtocol input, TProtocol output) {
+ Metrics metrics = MetricsFactory.getInstance();
+ if (metrics != null) {
+ try {
+ metrics.incrementCounter(MetricsConstant.OPEN_CONNECTIONS);
+ } catch (Exception e) {
+ LOG.warn("Error Reporting JDO operation to Metrics system", e);
+ }
+ }
return new ThriftCLIServerContext();
}
@Override
public void deleteContext(ServerContext serverContext,
TProtocol input, TProtocol output) {
- ThriftCLIServerContext context = (ThriftCLIServerContext)serverContext;
+ Metrics metrics = MetricsFactory.getInstance();
+ if (metrics != null) {
+ try {
+ metrics.decrementCounter(MetricsConstant.OPEN_CONNECTIONS);
+ } catch (Exception e) {
+ LOG.warn("Error Reporting JDO operation to Metrics system", e);
+ }
+ }
+ ThriftCLIServerContext context = (ThriftCLIServerContext) serverContext;
SessionHandle sessionHandle = context.getSessionHandle();
if (sessionHandle != null) {
LOG.info("Session disconnected without closing properly, close it now");
[2/4] hive git commit: HIVE-10761 : Create codahale-based metrics
system for Hive (Szehon, reviewed by Xuefu)
Posted by sz...@apache.org.
http://git-wip-us.apache.org/repos/asf/hive/blob/bd84e87c/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index ef12325..6735765 100644
--- a/pom.xml
+++ b/pom.xml
@@ -117,6 +117,7 @@
<commons-pool.version>1.5.4</commons-pool.version>
<commons-dbcp.version>1.4</commons-dbcp.version>
<derby.version>10.10.2.0</derby.version>
+ <dropwizard.version>3.1.0</dropwizard.version>
<guava.version>14.0.1</guava.version>
<groovy.version>2.1.6</groovy.version>
<hadoop-20S.version>1.2.1</hadoop-20S.version>
@@ -129,6 +130,8 @@
<httpcomponents.core.version>4.4</httpcomponents.core.version>
<ivy.version>2.4.0</ivy.version>
<jackson.version>1.9.2</jackson.version>
+ <!-- jackson 1 and 2 lines can coexist without issue, as they have different artifactIds -->
+ <jackson.new.version>2.4.2</jackson.new.version>
<javaewah.version>0.3.2</javaewah.version>
<javolution.version>5.5.1</javolution.version>
<jdo-api.version>3.0.1</jdo-api.version>
http://git-wip-us.apache.org/repos/asf/hive/blob/bd84e87c/service/src/java/org/apache/hive/service/server/HiveServer2.java
----------------------------------------------------------------------
diff --git a/service/src/java/org/apache/hive/service/server/HiveServer2.java b/service/src/java/org/apache/hive/service/server/HiveServer2.java
index 54600e6..4fecb3c 100644
--- a/service/src/java/org/apache/hive/service/server/HiveServer2.java
+++ b/service/src/java/org/apache/hive/service/server/HiveServer2.java
@@ -42,15 +42,16 @@ import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.framework.api.CuratorEventType;
import org.apache.curator.framework.recipes.nodes.PersistentEphemeralNode;
import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.hadoop.hive.common.JvmPauseMonitor;
import org.apache.hadoop.hive.common.LogUtils;
import org.apache.hadoop.hive.common.LogUtils.LogInitializationException;
import org.apache.hadoop.hive.common.ServerUtils;
+import org.apache.hadoop.hive.common.metrics.common.MetricsFactory;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.ql.exec.spark.session.SparkSessionManagerImpl;
import org.apache.hadoop.hive.ql.exec.tez.TezSessionPoolManager;
import org.apache.hadoop.hive.ql.util.ZooKeeperHiveHelper;
-import org.apache.hadoop.hive.shims.ShimLoader;
import org.apache.hadoop.hive.shims.Utils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hive.common.util.HiveStringUtils;
@@ -306,6 +307,15 @@ public class HiveServer2 extends CompositeService {
LOG.info("Shutting down HiveServer2");
HiveConf hiveConf = this.getHiveConf();
super.stop();
+ // Shutdown Metrics
+ if (hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_METRICS_ENABLED)) {
+ try {
+ MetricsFactory.getMetricsInstance().deInit();
+ } catch (Exception e) {
+ LOG.error("error in Metrics deinit: " + e.getClass().getName() + " "
+ + e.getMessage(), e);
+ }
+ }
// Remove this server instance from ZooKeeper if dynamic service discovery is set
if (hiveConf != null && hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_SUPPORT_DYNAMIC_SERVICE_DISCOVERY)) {
try {
@@ -347,7 +357,18 @@ public class HiveServer2 extends CompositeService {
server = new HiveServer2();
server.init(hiveConf);
server.start();
- ShimLoader.getHadoopShims().startPauseMonitor(hiveConf);
+
+ if (hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_METRICS_ENABLED)) {
+ MetricsFactory.getMetricsInstance().init(hiveConf);
+ }
+ try {
+ JvmPauseMonitor pauseMonitor = new JvmPauseMonitor(hiveConf);
+ pauseMonitor.start();
+ } catch (Throwable t) {
+ LOG.warn("Could not initiate the JvmPauseMonitor thread." + " GCs and Pauses may not be " +
+ "warned upon.", t);
+ }
+
// If we're supporting dynamic service discovery, we'll add the service uri for this
// HiveServer2 instance to Zookeeper as a znode.
if (hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_SUPPORT_DYNAMIC_SERVICE_DISCOVERY)) {
http://git-wip-us.apache.org/repos/asf/hive/blob/bd84e87c/shims/0.20S/src/main/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java
----------------------------------------------------------------------
diff --git a/shims/0.20S/src/main/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java b/shims/0.20S/src/main/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java
index 6d8166c..ffffcb7 100644
--- a/shims/0.20S/src/main/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java
+++ b/shims/0.20S/src/main/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java
@@ -159,11 +159,6 @@ public class Hadoop20SShims extends HadoopShimsSecure {
}
@Override
- public void startPauseMonitor(Configuration conf) {
- /* no supported */
- }
-
- @Override
public boolean isLocalMode(Configuration conf) {
return "local".equals(getJobLauncherRpcAddress(conf));
}
http://git-wip-us.apache.org/repos/asf/hive/blob/bd84e87c/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
----------------------------------------------------------------------
diff --git a/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java b/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
index 19324b8..5ddab98 100644
--- a/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
+++ b/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
@@ -212,19 +212,6 @@ public class Hadoop23Shims extends HadoopShimsSecure {
}
@Override
- public void startPauseMonitor(Configuration conf) {
- try {
- Class.forName("org.apache.hadoop.util.JvmPauseMonitor");
- org.apache.hadoop.util.JvmPauseMonitor pauseMonitor = new org.apache.hadoop.util
- .JvmPauseMonitor(conf);
- pauseMonitor.start();
- } catch (Throwable t) {
- LOG.warn("Could not initiate the JvmPauseMonitor thread." + " GCs and Pauses may not be " +
- "warned upon.", t);
- }
- }
-
- @Override
public boolean isLocalMode(Configuration conf) {
return "local".equals(conf.get("mapreduce.framework.name"));
}
http://git-wip-us.apache.org/repos/asf/hive/blob/bd84e87c/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java
----------------------------------------------------------------------
diff --git a/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java b/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java
index b89b4c3..74785e5 100644
--- a/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java
+++ b/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java
@@ -146,8 +146,6 @@ public interface HadoopShims {
public JobContext newJobContext(Job job);
- public void startPauseMonitor(Configuration conf);
-
/**
* Check whether MR is configured to run in local-mode
* @param conf