You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ha...@apache.org on 2013/07/09 05:37:15 UTC
svn commit: r1501052 - in /hive/trunk/common/src:
java/org/apache/hadoop/hive/common/metrics/
test/org/apache/hadoop/hive/common/
test/org/apache/hadoop/hive/common/metrics/
Author: hashutosh
Date: Tue Jul 9 03:37:15 2013
New Revision: 1501052
URL: http://svn.apache.org/r1501052
Log:
HIVE-4796 : Increase coverage of package org.apache.hadoop.hive.common.metrics (Ivan Veselovsky)
Added:
hive/trunk/common/src/test/org/apache/hadoop/hive/common/
hive/trunk/common/src/test/org/apache/hadoop/hive/common/metrics/
hive/trunk/common/src/test/org/apache/hadoop/hive/common/metrics/TestMetrics.java
Modified:
hive/trunk/common/src/java/org/apache/hadoop/hive/common/metrics/Metrics.java
hive/trunk/common/src/java/org/apache/hadoop/hive/common/metrics/MetricsMBean.java
hive/trunk/common/src/java/org/apache/hadoop/hive/common/metrics/MetricsMBeanImpl.java
Modified: hive/trunk/common/src/java/org/apache/hadoop/hive/common/metrics/Metrics.java
URL: http://svn.apache.org/viewvc/hive/trunk/common/src/java/org/apache/hadoop/hive/common/metrics/Metrics.java?rev=1501052&r1=1501051&r2=1501052&view=diff
==============================================================================
--- hive/trunk/common/src/java/org/apache/hadoop/hive/common/metrics/Metrics.java (original)
+++ hive/trunk/common/src/java/org/apache/hadoop/hive/common/metrics/Metrics.java Tue Jul 9 03:37:15 2013
@@ -22,6 +22,7 @@ import java.lang.management.ManagementFa
import java.util.HashMap;
import javax.management.MBeanServer;
+import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
/**
@@ -39,32 +40,32 @@ import javax.management.ObjectName;
*/
public class Metrics {
+ /** Private so that the class cannot be instantiated from outside. */
+ private Metrics() {
+ // intentionally empty: exists only to block instantiation
+ }
+
/**
* MetricsScope : A class that encapsulates an idea of a metered scope.
* Instantiating a named scope and then closing it exposes two counters:
* (i) a "number of calls" counter ( <name>.n ), and
* (ii) a "number of msecs spent between scope open and close" counter. ( <name>.t)
*/
- public class MetricsScope {
+ public static class MetricsScope {
- String name = null;
- boolean isOpen = false;
- Long startTime = null;
- String numCounter = null;
- String timeCounter = null;
- String avgTimeCounter = null;
-
- //disable default ctor - so that it can't be created without a name
- @SuppressWarnings("unused")
- private MetricsScope() {
- }
+ final String name;
+ final String numCounter;
+ final String timeCounter;
+ final String avgTimeCounter;
+
+ private boolean isOpen = false;
+ private Long startTime = null;
/**
* Instantiates a named scope - intended to only be called by Metrics, so locally scoped.
* @param name - name of the variable
* @throws IOException
*/
- MetricsScope(String name) throws IOException {
+ private MetricsScope(String name) throws IOException {
this.name = name;
this.numCounter = name + ".n";
this.timeCounter = name + ".t";
@@ -128,28 +129,36 @@ public class Metrics {
}
+ private static final MetricsMBean metrics = new MetricsMBeanImpl();
- static MetricsMBean metrics = new MetricsMBeanImpl();
-
- static ThreadLocal<HashMap<String, MetricsScope>> threadLocalScopes
+ // JMX object name used to register and unregister the metrics MBean; created once
+ // in the static initializer below.
+ private static final ObjectName oname;
+ static {
+ try {
+ oname = new ObjectName(
+ "org.apache.hadoop.hive.common.metrics:type=MetricsMBean");
+ } catch (MalformedObjectNameException mone) {
+ // the name is a fixed literal; rethrow unchecked so the failure surfaces at class load
+ throw new RuntimeException(mone);
+ }
+ }
+
+
+ private static final ThreadLocal<HashMap<String, MetricsScope>> threadLocalScopes
= new ThreadLocal<HashMap<String,MetricsScope>>() {
@Override
- protected synchronized HashMap<String,MetricsScope> initialValue() {
+ protected HashMap<String,MetricsScope> initialValue() {
return new HashMap<String,MetricsScope>();
}
};
- static boolean initialized = false;
-
- static Metrics m = new Metrics();
+ // volatile: the flag is written while holding the "metrics" lock (init/uninit) but is
+ // also read by callers that do not take that lock, so the write must be visible to them.
+ private static volatile boolean initialized = false;
public static void init() throws Exception {
- if (!initialized) {
- MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
- ObjectName oname = new ObjectName(
- "org.apache.hadoop.hive.common.metrics:type=MetricsMBean");
- mbs.registerMBean(metrics, oname);
- initialized = true;
+ synchronized (metrics) {
+ if (!initialized) {
+ MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+ mbs.registerMBean(metrics, oname);
+ initialized = true;
+ }
}
}
@@ -181,9 +190,7 @@ public class Metrics {
if (!initialized) {
return;
}
- synchronized(metrics) {
- metrics.put(name,value);
- }
+ metrics.put(name,value);
}
public static Object get(String name) throws IOException{
@@ -200,7 +207,7 @@ public class Metrics {
if (threadLocalScopes.get().containsKey(name)) {
threadLocalScopes.get().get(name).open();
} else {
- threadLocalScopes.get().put(name, m.new MetricsScope(name));
+ threadLocalScopes.get().put(name, new MetricsScope(name));
}
return threadLocalScopes.get().get(name);
}
@@ -225,4 +232,22 @@ public class Metrics {
}
}
+ /**
+  * Returns the static state of this class to its pre-{@link #init()} condition:
+  * unregisters the metrics MBean if it is currently registered, removes all
+  * stored metrics and lowers the initialized flag. Does nothing when the class
+  * is not initialized. Used primarily for testing purposes.
+  *
+  * Note that the threadLocalScopes ThreadLocal is *not* cleared by this call.
+  */
+ static void uninit() throws Exception {
+   synchronized (metrics) {
+     if (!initialized) {
+       return;
+     }
+     final MBeanServer platformServer = ManagementFactory.getPlatformMBeanServer();
+     if (platformServer.isRegistered(oname)) {
+       platformServer.unregisterMBean(oname);
+     }
+     metrics.clear();
+     initialized = false;
+   }
+ }
}
Modified: hive/trunk/common/src/java/org/apache/hadoop/hive/common/metrics/MetricsMBean.java
URL: http://svn.apache.org/viewvc/hive/trunk/common/src/java/org/apache/hadoop/hive/common/metrics/MetricsMBean.java?rev=1501052&r1=1501051&r2=1501052&view=diff
==============================================================================
--- hive/trunk/common/src/java/org/apache/hadoop/hive/common/metrics/MetricsMBean.java (original)
+++ hive/trunk/common/src/java/org/apache/hadoop/hive/common/metrics/MetricsMBean.java Tue Jul 9 03:37:15 2013
@@ -47,5 +47,10 @@ public interface MetricsMBean extends Dy
* @throws Exception
*/
public abstract Object get(String name) throws IOException;
+
+ /**
+ * Removes all the keys and values from this MetricsMBean.
+ */
+ void clear();
}
Modified: hive/trunk/common/src/java/org/apache/hadoop/hive/common/metrics/MetricsMBeanImpl.java
URL: http://svn.apache.org/viewvc/hive/trunk/common/src/java/org/apache/hadoop/hive/common/metrics/MetricsMBeanImpl.java?rev=1501052&r1=1501051&r2=1501052&view=diff
==============================================================================
--- hive/trunk/common/src/java/org/apache/hadoop/hive/common/metrics/MetricsMBeanImpl.java (original)
+++ hive/trunk/common/src/java/org/apache/hadoop/hive/common/metrics/MetricsMBeanImpl.java Tue Jul 9 03:37:15 2013
@@ -36,15 +36,15 @@ import javax.management.ReflectionExcept
public class MetricsMBeanImpl implements MetricsMBean {
- Map<String,Object> metricsMap = new HashMap<String,Object>();
+ private final Map<String,Object> metricsMap = new HashMap<String,Object>();
- MBeanAttributeInfo[] attributeInfos;
- boolean dirtyAttributeInfoCache = true;
+ private MBeanAttributeInfo[] attributeInfos;
+ private boolean dirtyAttributeInfoCache = true;
- MBeanConstructorInfo[] ctors = null;
- MBeanOperationInfo[] ops = {new MBeanOperationInfo("reset",
+ private static final MBeanConstructorInfo[] ctors = null;
+ private static final MBeanOperationInfo[] ops = {new MBeanOperationInfo("reset",
"Sets the values of all Attributes to 0", null, "void", MBeanOperationInfo.ACTION)};
- MBeanNotificationInfo[] notifs = null;
+ private static final MBeanNotificationInfo[] notifs = null;
@Override
public Object getAttribute(String arg0) throws AttributeNotFoundException,
@@ -77,7 +77,7 @@ public class MetricsMBeanImpl implements
int i = 0;
for (String key : metricsMap.keySet()) {
attributeInfos[i] = new MBeanAttributeInfo(
- key, metricsMap.get(key).getClass().getName(), key, true, false, false);
+ key, metricsMap.get(key).getClass().getName(), key, true, true/*writable*/, false);
i++;
}
dirtyAttributeInfoCache = false;
@@ -129,12 +129,14 @@ public class MetricsMBeanImpl implements
return attributesSet;
}
+ @Override
public boolean hasKey(String name) {
synchronized(metricsMap) {
return metricsMap.containsKey(name);
}
}
+ @Override
public void put(String name, Object value) throws IOException {
synchronized(metricsMap) {
if (!metricsMap.containsKey(name)) {
@@ -144,6 +146,7 @@ public class MetricsMBeanImpl implements
}
}
+ @Override
public Object get(String name) throws IOException {
try {
return getAttribute(name);
@@ -163,4 +166,13 @@ public class MetricsMBeanImpl implements
}
}
}
+
+ /**
+  * Removes every stored metric and marks the cached attribute infos as stale.
+  * All three updates happen while holding the metricsMap lock.
+  */
+ @Override
+ public void clear() {
+   synchronized (metricsMap) {
+     metricsMap.clear();
+     // drop the cached MBeanAttributeInfo array and flag it for rebuilding
+     dirtyAttributeInfoCache = true;
+     attributeInfos = null;
+   }
+ }
}
Added: hive/trunk/common/src/test/org/apache/hadoop/hive/common/metrics/TestMetrics.java
URL: http://svn.apache.org/viewvc/hive/trunk/common/src/test/org/apache/hadoop/hive/common/metrics/TestMetrics.java?rev=1501052&view=auto
==============================================================================
--- hive/trunk/common/src/test/org/apache/hadoop/hive/common/metrics/TestMetrics.java (added)
+++ hive/trunk/common/src/test/org/apache/hadoop/hive/common/metrics/TestMetrics.java Tue Jul 9 03:37:15 2013
@@ -0,0 +1,284 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.common.metrics;
+
+import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import javax.management.Attribute;
+import javax.management.MBeanAttributeInfo;
+import javax.management.MBeanInfo;
+import javax.management.MBeanOperationInfo;
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+
+import org.apache.hadoop.hive.common.metrics.Metrics.MetricsScope;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+public class TestMetrics {
+
+ private static final String scopeName = "foo";
+ private static final long periodMs = 50L;
+
+ /**
+ * Puts Metrics into a freshly initialized state before each test: any state left
+ * over from an earlier test is torn down first, then Metrics is initialized again.
+ */
+ @Before
+ public void before() throws Exception {
+ Metrics.uninit();
+ Metrics.init();
+ }
+
+ /** Tears the Metrics static state down again after each test. */
+ @After
+ public void after() throws Exception {
+ Metrics.uninit();
+ }
+
+ /**
+  * Talks to the metrics MBean through the platform MBeanServer and checks:
+  * the implementation class, the presence of the "reset" operation, that a
+  * metric set via JMX shows up as a readable and writable Long attribute, and
+  * that "reset" turns its value into 0.
+  */
+ @Test
+ public void testMetricsMBean() throws Exception {
+   final MBeanServer server = ManagementFactory.getPlatformMBeanServer();
+   final ObjectName beanName = new ObjectName(
+       "org.apache.hadoop.hive.common.metrics:type=MetricsMBean");
+
+   // the bean must report MetricsMBeanImpl as its implementation class:
+   final MBeanInfo initialInfo = server.getMBeanInfo(beanName);
+   assertEquals(MetricsMBeanImpl.class.getName(), initialInfo.getClassName());
+
+   // a "reset" operation must be advertised:
+   final MBeanOperationInfo[] operations = initialInfo.getOperations();
+   boolean hasReset = false;
+   for (int i = 0; i < operations.length && !hasReset; i++) {
+     hasReset = "reset".equals(operations[i].getName());
+   }
+   assertTrue(hasReset);
+
+   // publish a metric with a non-null value via JMX:
+   server.setAttribute(beanName, new Attribute("fooMetric", Long.valueOf(-77)));
+
+   // it must now be the only attribute described by the bean:
+   final MBeanAttributeInfo[] attributeInfos = server.getMBeanInfo(beanName).getAttributes();
+   assertEquals(1, attributeInfos.length);
+   MBeanAttributeInfo fooInfo = null;
+   for (MBeanAttributeInfo candidate : attributeInfos) {
+     if ("fooMetric".equals(candidate.getName())) {
+       fooInfo = candidate;
+       break;
+     }
+   }
+   assertNotNull(fooInfo);
+   assertEquals("java.lang.Long", fooInfo.getType());
+   assertTrue(fooInfo.isReadable());
+   assertTrue(fooInfo.isWritable());
+   assertFalse(fooInfo.isIs());
+
+   // the stored value is readable back:
+   assertEquals(Long.valueOf(-77), server.getAttribute(beanName, "fooMetric"));
+
+   // "reset" returns nothing ...
+   assertNull(server.invoke(beanName, "reset", new Object[0], new String[0]));
+
+   // ... and zeroes the metric:
+   assertEquals(Long.valueOf(0), server.getAttribute(beanName, "fooMetric"));
+ }
+
+ /**
+  * Runs the given callable and requires it to end with an IOException.
+  * If it returns normally the test is failed, quoting the returned value;
+  * any other exception type propagates to the caller unchanged.
+  */
+ private <T> void expectIOE(Callable<T> c) throws Exception {
+   final T returned;
+   try {
+     returned = c.call();
+   } catch (IOException expected) {
+     // this is the outcome the caller asked for
+     return;
+   }
+   fail("IOE expected but ["+returned+"] was returned.");
+ }
+
+ /**
+ * Drives one named scope through open, close, re-open, reopen() and close on a
+ * single thread. After each close it checks the call counter exactly and checks
+ * that the accumulated time exceeds the time slept while the scope was open.
+ * Illegal transitions (open while open, close while closed) must raise IOException.
+ */
+ @Test
+ public void testScopeSingleThread() throws Exception {
+ final MetricsScope fooScope = Metrics.startScope(scopeName);
+ // the time and number counters become available only after the 1st
+ // scope close:
+ expectIOE(new Callable<Long>() {
+ @Override
+ public Long call() throws Exception {
+ Long num = fooScope.getNumCounter();
+ return num;
+ }
+ });
+ expectIOE(new Callable<Long>() {
+ @Override
+ public Long call() throws Exception {
+ Long time = fooScope.getTimeCounter();
+ return time;
+ }
+ });
+ // cannot open scope that is already open:
+ expectIOE(new Callable<Void>() {
+ @Override
+ public Void call() throws Exception {
+ fooScope.open();
+ return null;
+ }
+ });
+
+ assertSame(fooScope, Metrics.getScope(scopeName));
+ // keep the scope open for more than periodMs so the time assertions below can hold
+ Thread.sleep(periodMs+1);
+ // 1st close:
+ // closing of open scope should be ok:
+ Metrics.endScope(scopeName);
+ expectIOE(new Callable<Void>() {
+ @Override
+ public Void call() throws Exception {
+ Metrics.endScope(scopeName); // closing of closed scope not allowed
+ return null;
+ }
+ });
+
+ assertEquals(Long.valueOf(1), fooScope.getNumCounter());
+ final long t1 = fooScope.getTimeCounter().longValue();
+ assertTrue(t1 > periodMs);
+
+ assertSame(fooScope, Metrics.getScope(scopeName));
+
+ // opening allowed after closing:
+ Metrics.startScope(scopeName);
+ // opening of already open scope not allowed:
+ expectIOE(new Callable<Void>() {
+ @Override
+ public Void call() throws Exception {
+ Metrics.startScope(scopeName);
+ return null;
+ }
+ });
+
+ // merely opening the scope must not change the counters:
+ assertEquals(Long.valueOf(1), fooScope.getNumCounter());
+ assertEquals(t1, fooScope.getTimeCounter().longValue());
+
+ assertSame(fooScope, Metrics.getScope(scopeName));
+ Thread.sleep(periodMs + 1);
+ // 2nd close, performed as part of reopen():
+ // Reopening (close + open) allowed in opened state:
+ fooScope.reopen();
+
+ assertEquals(Long.valueOf(2), fooScope.getNumCounter());
+ assertTrue(fooScope.getTimeCounter().longValue() > 2 * periodMs);
+
+ Thread.sleep(periodMs + 1);
+ // 3rd close:
+ fooScope.close();
+
+ assertEquals(Long.valueOf(3), fooScope.getNumCounter());
+ assertTrue(fooScope.getTimeCounter().longValue() > 3 * periodMs);
+ // the average-time metric is read by its full name from Metrics
+ Double avgT = (Double)Metrics.get("foo.avg_t");
+ assertTrue(avgT.doubleValue() > periodMs);
+ }
+
+ /**
+  * Runs {@link #testScopeImpl(int)} on several pool threads at once and then
+  * checks the counters accumulated by all of them: 3 closes per thread, more
+  * than 3 * periodMs of scope time per thread, and an average above periodMs.
+  *
+  * The Future of every submitted task is kept and queried, because an
+  * assertion failure or exception raised inside a pool thread is only stored
+  * in the task's Future and would otherwise never fail this test.
+  */
+ @Test
+ public void testScopeConcurrency() throws Exception {
+   final int threads = 10;
+   ExecutorService executorService = Executors.newFixedThreadPool(threads);
+   List<Future<Void>> futures = new ArrayList<Future<Void>>(threads);
+   for (int i=0; i<threads; i++) {
+     final int n = i;
+     futures.add(executorService.submit(new Callable<Void>() {
+       @Override
+       public Void call() throws Exception {
+         testScopeImpl(n);
+         return null;
+       }
+     }));
+   }
+   executorService.shutdown();
+   assertTrue(executorService.awaitTermination(periodMs * 3 * threads, TimeUnit.MILLISECONDS));
+   // All tasks are finished at this point, so get() does not block; it rethrows a
+   // worker's failure wrapped in an ExecutionException, which fails the test.
+   for (Future<Void> future : futures) {
+     future.get();
+   }
+
+   // NOTE(review): this lookup happens on the test thread, which never opens the scope
+   // in this method, and uninit() does not clear the thread-local scopes; it may depend
+   // on an earlier test having opened "foo" on the same thread - confirm.
+   final MetricsScope fooScope = Metrics.getScope(scopeName);
+   assertEquals(Long.valueOf(3 * threads), fooScope.getNumCounter());
+   assertTrue(fooScope.getTimeCounter().longValue() > 3 * periodMs * threads);
+   Double avgT = (Double)Metrics.get("foo.avg_t");
+   assertTrue(avgT.doubleValue() > periodMs);
+ }
+
+ /**
+ * Scope life-cycle scenario that testScopeConcurrency submits to each pool thread:
+ * open, close, re-open, reopen() and close, with a sleep of more than periodMs
+ * before every close. Counter values are checked with lower bounds (>=, >) rather
+ * than exact values, unlike in testScopeSingleThread.
+ *
+ * @param n index of the submitting loop iteration; not used by the body
+ */
+ void testScopeImpl(int n) throws Exception {
+ final MetricsScope fooScope = Metrics.startScope(scopeName);
+ // cannot open scope that is already open:
+ expectIOE(new Callable<Void>() {
+ @Override
+ public Void call() throws Exception {
+ fooScope.open();
+ return null;
+ }
+ });
+
+ assertSame(fooScope, Metrics.getScope(scopeName));
+ Thread.sleep(periodMs+1);
+ // 1st close:
+ Metrics.endScope(scopeName); // closing of open scope should be ok.
+
+ assertTrue(fooScope.getNumCounter().longValue() >= 1);
+ final long t1 = fooScope.getTimeCounter().longValue();
+ assertTrue(t1 > periodMs);
+
+ expectIOE(new Callable<Void>() {
+ @Override
+ public Void call() throws Exception {
+ Metrics.endScope(scopeName); // closing of closed scope not allowed
+ return null;
+ }
+ });
+
+ assertSame(fooScope, Metrics.getScope(scopeName));
+
+ // opening allowed after closing:
+ Metrics.startScope(scopeName);
+
+ assertTrue(fooScope.getNumCounter().longValue() >= 1);
+ assertTrue(fooScope.getTimeCounter().longValue() >= t1);
+
+ // opening of already open scope not allowed:
+ expectIOE(new Callable<Void>() {
+ @Override
+ public Void call() throws Exception {
+ Metrics.startScope(scopeName);
+ return null;
+ }
+ });
+
+ assertSame(fooScope, Metrics.getScope(scopeName));
+ Thread.sleep(periodMs + 1);
+ // 2nd close, performed as part of reopen():
+ // Reopening (close + open) allowed in opened state:
+ fooScope.reopen();
+
+ assertTrue(fooScope.getNumCounter().longValue() >= 2);
+ assertTrue(fooScope.getTimeCounter().longValue() > 2 * periodMs);
+
+ Thread.sleep(periodMs + 1);
+ // 3rd close:
+ fooScope.close();
+
+ assertTrue(fooScope.getNumCounter().longValue() >= 3);
+ assertTrue(fooScope.getTimeCounter().longValue() > 3 * periodMs);
+ Double avgT = (Double)Metrics.get("foo.avg_t");
+ assertTrue(avgT.doubleValue() > periodMs);
+ }
+}