You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ha...@apache.org on 2013/07/09 05:37:15 UTC

svn commit: r1501052 - in /hive/trunk/common/src: java/org/apache/hadoop/hive/common/metrics/ test/org/apache/hadoop/hive/common/ test/org/apache/hadoop/hive/common/metrics/

Author: hashutosh
Date: Tue Jul  9 03:37:15 2013
New Revision: 1501052

URL: http://svn.apache.org/r1501052
Log:
HIVE-4796 : Increase coverage of package org.apache.hadoop.hive.common.metrics (Ivan Veselovsky)

Added:
    hive/trunk/common/src/test/org/apache/hadoop/hive/common/
    hive/trunk/common/src/test/org/apache/hadoop/hive/common/metrics/
    hive/trunk/common/src/test/org/apache/hadoop/hive/common/metrics/TestMetrics.java
Modified:
    hive/trunk/common/src/java/org/apache/hadoop/hive/common/metrics/Metrics.java
    hive/trunk/common/src/java/org/apache/hadoop/hive/common/metrics/MetricsMBean.java
    hive/trunk/common/src/java/org/apache/hadoop/hive/common/metrics/MetricsMBeanImpl.java

Modified: hive/trunk/common/src/java/org/apache/hadoop/hive/common/metrics/Metrics.java
URL: http://svn.apache.org/viewvc/hive/trunk/common/src/java/org/apache/hadoop/hive/common/metrics/Metrics.java?rev=1501052&r1=1501051&r2=1501052&view=diff
==============================================================================
--- hive/trunk/common/src/java/org/apache/hadoop/hive/common/metrics/Metrics.java (original)
+++ hive/trunk/common/src/java/org/apache/hadoop/hive/common/metrics/Metrics.java Tue Jul  9 03:37:15 2013
@@ -22,6 +22,7 @@ import java.lang.management.ManagementFa
 import java.util.HashMap;
 
 import javax.management.MBeanServer;
+import javax.management.MalformedObjectNameException;
 import javax.management.ObjectName;
 
 /**
@@ -39,32 +40,32 @@ import javax.management.ObjectName;
  */
 public class Metrics {
 
+  private Metrics() {
+    // prevent instantiation: this class is used only via its static members
+  }
+  
   /**
    * MetricsScope : A class that encapsulates an idea of a metered scope.
    * Instantiating a named scope and then closing it exposes two counters:
    *   (i) a "number of calls" counter ( <name>.n ), and
    *  (ii) a "number of msecs spent between scope open and close" counter. ( <name>.t)
    */
-  public class MetricsScope {
+  public static class MetricsScope {
 
-    String name = null;
-    boolean isOpen = false;
-    Long startTime = null;
-    String numCounter = null;
-    String timeCounter = null;
-    String avgTimeCounter = null;
-
-    //disable default ctor - so that it can't be created without a name
-    @SuppressWarnings("unused")
-    private MetricsScope() {
-    }
+    final String name;
+    final String numCounter;
+    final String timeCounter;
+    final String avgTimeCounter;
+    
+    private boolean isOpen = false;
+    private Long startTime = null;
 
     /**
      * Instantiates a named scope - intended to only be called by Metrics, so locally scoped.
      * @param name - name of the variable
      * @throws IOException
      */
-    MetricsScope(String name) throws IOException {
+    private MetricsScope(String name) throws IOException {
       this.name = name;
       this.numCounter = name + ".n";
       this.timeCounter = name + ".t";
@@ -128,28 +129,36 @@ public class Metrics {
 
   }
 
+  private static final MetricsMBean metrics = new MetricsMBeanImpl();
 
-  static MetricsMBean metrics = new MetricsMBeanImpl();
-
-  static ThreadLocal<HashMap<String, MetricsScope>> threadLocalScopes
+  private static final ObjectName oname;
+  static {
+    try {
+      oname = new ObjectName(
+          "org.apache.hadoop.hive.common.metrics:type=MetricsMBean");      
+    } catch (MalformedObjectNameException mone) {
+      throw new RuntimeException(mone);
+    }
+  }
+  
+  
+  private static final ThreadLocal<HashMap<String, MetricsScope>> threadLocalScopes
     = new ThreadLocal<HashMap<String,MetricsScope>>() {
     @Override
-    protected synchronized HashMap<String,MetricsScope> initialValue() {
+    protected HashMap<String,MetricsScope> initialValue() {
       return new HashMap<String,MetricsScope>();
     }
   };
 
-  static boolean initialized = false;
-
-  static Metrics m = new Metrics();
+  private static boolean initialized = false;
 
   public static void init() throws Exception {
-    if (!initialized) {
-      MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
-      ObjectName oname = new ObjectName(
-        "org.apache.hadoop.hive.common.metrics:type=MetricsMBean");
-      mbs.registerMBean(metrics, oname);
-      initialized = true;
+    synchronized (metrics) {
+      if (!initialized) {
+        MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+        mbs.registerMBean(metrics, oname);
+        initialized = true;
+      }
     }
   }
 
@@ -181,9 +190,7 @@ public class Metrics {
     if (!initialized) {
       return;
     }
-    synchronized(metrics) {
-      metrics.put(name,value);
-    }
+    metrics.put(name,value);
   }
 
   public static Object get(String name) throws IOException{
@@ -200,7 +207,7 @@ public class Metrics {
     if (threadLocalScopes.get().containsKey(name)) {
       threadLocalScopes.get().get(name).open();
     } else {
-      threadLocalScopes.get().put(name, m.new MetricsScope(name));
+      threadLocalScopes.get().put(name, new MetricsScope(name));
     }
     return threadLocalScopes.get().get(name);
   }
@@ -225,4 +232,22 @@ public class Metrics {
     }
   }
 
+  /**
+   * Resets the static state to its initial (uninitialized) condition.
+   * Used primarily for testing purposes.
+   * 
+   * Note that the threadLocalScopes ThreadLocal is *not* cleared by this call.
+   */
+  static void uninit() throws Exception {
+    synchronized (metrics) {
+      if (initialized) {
+        MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+        if (mbs.isRegistered(oname)) {
+          mbs.unregisterMBean(oname);
+        }
+        metrics.clear();
+        initialized = false;
+      }
+    }
+  }
 }

Modified: hive/trunk/common/src/java/org/apache/hadoop/hive/common/metrics/MetricsMBean.java
URL: http://svn.apache.org/viewvc/hive/trunk/common/src/java/org/apache/hadoop/hive/common/metrics/MetricsMBean.java?rev=1501052&r1=1501051&r2=1501052&view=diff
==============================================================================
--- hive/trunk/common/src/java/org/apache/hadoop/hive/common/metrics/MetricsMBean.java (original)
+++ hive/trunk/common/src/java/org/apache/hadoop/hive/common/metrics/MetricsMBean.java Tue Jul  9 03:37:15 2013
@@ -47,5 +47,10 @@ public interface MetricsMBean extends Dy
      * @throws Exception
      */
     public abstract Object get(String name) throws IOException;
+    
 
+    /**
+     * Removes all the keys and values from this MetricsMBean. 
+     */
+    void clear();
 }

Modified: hive/trunk/common/src/java/org/apache/hadoop/hive/common/metrics/MetricsMBeanImpl.java
URL: http://svn.apache.org/viewvc/hive/trunk/common/src/java/org/apache/hadoop/hive/common/metrics/MetricsMBeanImpl.java?rev=1501052&r1=1501051&r2=1501052&view=diff
==============================================================================
--- hive/trunk/common/src/java/org/apache/hadoop/hive/common/metrics/MetricsMBeanImpl.java (original)
+++ hive/trunk/common/src/java/org/apache/hadoop/hive/common/metrics/MetricsMBeanImpl.java Tue Jul  9 03:37:15 2013
@@ -36,15 +36,15 @@ import javax.management.ReflectionExcept
 
 public class MetricsMBeanImpl implements  MetricsMBean {
 
-    Map<String,Object> metricsMap = new HashMap<String,Object>();
+    private final Map<String,Object> metricsMap = new HashMap<String,Object>();
 
-    MBeanAttributeInfo[] attributeInfos;
-    boolean dirtyAttributeInfoCache = true;
+    private MBeanAttributeInfo[] attributeInfos;
+    private boolean dirtyAttributeInfoCache = true;
 
-    MBeanConstructorInfo[] ctors = null;
-    MBeanOperationInfo[] ops = {new MBeanOperationInfo("reset",
+    private static final MBeanConstructorInfo[] ctors = null;
+    private static final MBeanOperationInfo[] ops = {new MBeanOperationInfo("reset",
         "Sets the values of all Attributes to 0", null, "void", MBeanOperationInfo.ACTION)};
-    MBeanNotificationInfo[] notifs = null;
+    private static final MBeanNotificationInfo[] notifs = null;
 
     @Override
     public Object getAttribute(String arg0) throws AttributeNotFoundException,
@@ -77,7 +77,7 @@ public class MetricsMBeanImpl implements
             int i = 0;
             for (String key : metricsMap.keySet()) {
               attributeInfos[i] = new MBeanAttributeInfo(
-                  key, metricsMap.get(key).getClass().getName(), key, true, false, false);
+                  key, metricsMap.get(key).getClass().getName(), key, true, true/*writable*/, false);
               i++;
             }
             dirtyAttributeInfoCache = false;
@@ -129,12 +129,14 @@ public class MetricsMBeanImpl implements
         return attributesSet;
     }
 
+    @Override
     public boolean hasKey(String name) {
       synchronized(metricsMap) {
         return metricsMap.containsKey(name);
       }
     }
 
+    @Override
     public void put(String name, Object value) throws IOException {
       synchronized(metricsMap) {
         if (!metricsMap.containsKey(name)) {
@@ -144,6 +146,7 @@ public class MetricsMBeanImpl implements
       }
     }
 
+    @Override
     public Object get(String name) throws IOException {
         try {
           return getAttribute(name);
@@ -163,4 +166,13 @@ public class MetricsMBeanImpl implements
         }
       }
     }
+    
+    @Override
+    public void clear() {
+      synchronized(metricsMap) {
+        attributeInfos = null;
+        dirtyAttributeInfoCache = true;
+        metricsMap.clear();
+      }
+    }
 }

Added: hive/trunk/common/src/test/org/apache/hadoop/hive/common/metrics/TestMetrics.java
URL: http://svn.apache.org/viewvc/hive/trunk/common/src/test/org/apache/hadoop/hive/common/metrics/TestMetrics.java?rev=1501052&view=auto
==============================================================================
--- hive/trunk/common/src/test/org/apache/hadoop/hive/common/metrics/TestMetrics.java (added)
+++ hive/trunk/common/src/test/org/apache/hadoop/hive/common/metrics/TestMetrics.java Tue Jul  9 03:37:15 2013
@@ -0,0 +1,284 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.common.metrics;
+
+import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import javax.management.Attribute;
+import javax.management.MBeanAttributeInfo;
+import javax.management.MBeanInfo;
+import javax.management.MBeanOperationInfo;
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+
+import org.apache.hadoop.hive.common.metrics.Metrics.MetricsScope;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+public class TestMetrics {
+
+  private static final String scopeName = "foo";
+  private static final long periodMs = 50L;
+
+  @Before
+  public void before() throws Exception {
+    Metrics.uninit();
+    Metrics.init();
+  }
+  
+  @After
+  public void after() throws Exception {
+    Metrics.uninit();
+  }
+  
+  @Test
+  public void testMetricsMBean() throws Exception {
+    MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+    final ObjectName oname = new ObjectName(
+        "org.apache.hadoop.hive.common.metrics:type=MetricsMBean");
+    MBeanInfo mBeanInfo = mbs.getMBeanInfo(oname);
+    // check implementation class:
+    assertEquals(MetricsMBeanImpl.class.getName(), mBeanInfo.getClassName());
+
+    // check reset operation:
+    MBeanOperationInfo[] oops = mBeanInfo.getOperations();
+    boolean resetFound = false;
+    for (MBeanOperationInfo op : oops) {
+      if ("reset".equals(op.getName())) {
+        resetFound = true;
+        break;
+      }
+    }
+    assertTrue(resetFound);
+
+    // add metric with a non-null value:
+    Attribute attr = new Attribute("fooMetric", Long.valueOf(-77));
+    mbs.setAttribute(oname, attr);
+
+    mBeanInfo = mbs.getMBeanInfo(oname);
+    MBeanAttributeInfo[] attrinuteInfos = mBeanInfo.getAttributes();
+    assertEquals(1, attrinuteInfos.length);
+    boolean attrFound = false;
+    for (MBeanAttributeInfo info : attrinuteInfos) {
+      if ("fooMetric".equals(info.getName())) {
+        assertEquals("java.lang.Long", info.getType());
+        assertTrue(info.isReadable());
+        assertTrue(info.isWritable());
+        assertFalse(info.isIs());
+
+        attrFound = true;
+        break;
+      }
+    }
+    assertTrue(attrFound);
+
+    // check metric value:
+    Object v = mbs.getAttribute(oname, "fooMetric");
+    assertEquals(Long.valueOf(-77), v);
+
+    // reset the bean:
+    Object result = mbs.invoke(oname, "reset", new Object[0], new String[0]);
+    assertNull(result);
+
+    // the metric value must be zeroed:
+    v = mbs.getAttribute(oname, "fooMetric");
+    assertEquals(Long.valueOf(0), v);
+  }
+  
+  private <T> void expectIOE(Callable<T> c) throws Exception {
+    try {
+      T t = c.call();
+      fail("IOE expected but ["+t+"] was returned.");
+    } catch (IOException ioe) {
+      // ok, expected
+    } 
+  }
+
+  @Test
+  public void testScopeSingleThread() throws Exception {
+    final MetricsScope fooScope = Metrics.startScope(scopeName);
+    // the time and number counters become available only after the 1st 
+    // scope close:
+    expectIOE(new Callable<Long>() {
+      @Override
+      public Long call() throws Exception {
+        Long num = fooScope.getNumCounter();
+        return num;
+      }
+    });
+    expectIOE(new Callable<Long>() {
+      @Override
+      public Long call() throws Exception {
+        Long time = fooScope.getTimeCounter();
+        return time;
+      }
+    });
+    // cannot open scope that is already open:
+    expectIOE(new Callable<Void>() {
+      @Override
+      public Void call() throws Exception {
+        fooScope.open();
+        return null;
+      }
+    });
+    
+    assertSame(fooScope, Metrics.getScope(scopeName));
+    Thread.sleep(periodMs+1);
+    // 1st close:
+    // closing of open scope should be ok:
+    Metrics.endScope(scopeName); 
+    expectIOE(new Callable<Void>() {
+      @Override
+      public Void call() throws Exception {
+        Metrics.endScope(scopeName); // closing of closed scope not allowed
+        return null;
+      }
+    });
+    
+    assertEquals(Long.valueOf(1), fooScope.getNumCounter());
+    final long t1 = fooScope.getTimeCounter().longValue(); 
+    assertTrue(t1 > periodMs);
+    
+    assertSame(fooScope, Metrics.getScope(scopeName));
+    
+   // opening allowed after closing:
+    Metrics.startScope(scopeName);
+    // opening of already open scope not allowed:
+    expectIOE(new Callable<Void>() {
+      @Override
+      public Void call() throws Exception {
+        Metrics.startScope(scopeName); 
+        return null;
+      }
+    });
+    
+    assertEquals(Long.valueOf(1), fooScope.getNumCounter());
+    assertEquals(t1, fooScope.getTimeCounter().longValue());
+    
+    assertSame(fooScope, Metrics.getScope(scopeName));
+    Thread.sleep(periodMs + 1);
+    // Reopening (close + open) allowed in opened state: 
+    fooScope.reopen();
+
+    assertEquals(Long.valueOf(2), fooScope.getNumCounter());
+    assertTrue(fooScope.getTimeCounter().longValue() > 2 * periodMs);
+    
+    Thread.sleep(periodMs + 1);
+    // 3rd close:
+    fooScope.close();
+    
+    assertEquals(Long.valueOf(3), fooScope.getNumCounter());
+    assertTrue(fooScope.getTimeCounter().longValue() > 3 * periodMs);
+    Double avgT = (Double)Metrics.get("foo.avg_t");
+    assertTrue(avgT.doubleValue() > periodMs);
+  }
+  
+  @Test
+  public void testScopeConcurrency() throws Exception {
+    final int threads = 10;
+    ExecutorService executorService = Executors.newFixedThreadPool(threads);
+    for (int i=0; i<threads; i++) {
+      final int n = i;
+      executorService.submit(new Callable<Void>() {
+        @Override
+        public Void call() throws Exception {
+          testScopeImpl(n);
+          return null; 
+        }
+      });
+    }
+    executorService.shutdown();
+    assertTrue(executorService.awaitTermination(periodMs * 3 * threads, TimeUnit.MILLISECONDS));
+    
+    final MetricsScope fooScope = Metrics.getScope(scopeName);
+    assertEquals(Long.valueOf(3 * threads), fooScope.getNumCounter());
+    assertTrue(fooScope.getTimeCounter().longValue() > 3 * periodMs * threads);
+    Double avgT = (Double)Metrics.get("foo.avg_t");
+    assertTrue(avgT.doubleValue() > periodMs);
+  }
+  
+  void testScopeImpl(int n) throws Exception {
+    final MetricsScope fooScope = Metrics.startScope(scopeName);
+    // cannot open scope that is already open:
+    expectIOE(new Callable<Void>() {
+      @Override
+      public Void call() throws Exception {
+        fooScope.open();
+        return null;
+      }
+    });
+    
+    assertSame(fooScope, Metrics.getScope(scopeName));
+    Thread.sleep(periodMs+1);
+    // 1st close:
+    Metrics.endScope(scopeName); // closing of open scope should be ok.
+    
+    assertTrue(fooScope.getNumCounter().longValue() >= 1);
+    final long t1 = fooScope.getTimeCounter().longValue(); 
+    assertTrue(t1 > periodMs);
+    
+    expectIOE(new Callable<Void>() {
+      @Override
+      public Void call() throws Exception {
+        Metrics.endScope(scopeName); // closing of closed scope not allowed
+        return null;
+      }
+    });
+    
+    assertSame(fooScope, Metrics.getScope(scopeName));
+    
+   // opening allowed after closing:
+    Metrics.startScope(scopeName);
+    
+    assertTrue(fooScope.getNumCounter().longValue() >= 1);
+    assertTrue(fooScope.getTimeCounter().longValue() >= t1);
+    
+   // opening of already open scope not allowed:
+    expectIOE(new Callable<Void>() {
+      @Override
+      public Void call() throws Exception {
+        Metrics.startScope(scopeName); 
+        return null;
+      }
+    });
+    
+    assertSame(fooScope, Metrics.getScope(scopeName));
+    Thread.sleep(periodMs + 1);
+    // Reopening (close + open) allowed in opened state: 
+    fooScope.reopen();
+
+    assertTrue(fooScope.getNumCounter().longValue() >= 2);
+    assertTrue(fooScope.getTimeCounter().longValue() > 2 * periodMs);
+    
+    Thread.sleep(periodMs + 1);
+    // 3rd close:
+    fooScope.close();
+    
+    assertTrue(fooScope.getNumCounter().longValue() >= 3);
+    assertTrue(fooScope.getTimeCounter().longValue() > 3 * periodMs);
+    Double avgT = (Double)Metrics.get("foo.avg_t");
+    assertTrue(avgT.doubleValue() > periodMs);
+  }
+}