You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by tb...@apache.org on 2013/05/17 22:03:08 UTC

svn commit: r1483967 - in /incubator/ambari/trunk: ./ ambari-server/src/main/java/org/apache/ambari/server/controller/jmx/ ambari-server/src/test/java/org/apache/ambari/server/controller/jmx/

Author: tbeerbower
Date: Fri May 17 20:03:08 2013
New Revision: 1483967

URL: http://svn.apache.org/r1483967
Log:
AMBARI-2158 - Certain queries with metrics takes a long time (3 minutes on a 450-node cluster)

Modified:
    incubator/ambari/trunk/CHANGES.txt
    incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/controller/jmx/JMXPropertyProvider.java
    incubator/ambari/trunk/ambari-server/src/test/java/org/apache/ambari/server/controller/jmx/JMXPropertyProviderTest.java
    incubator/ambari/trunk/ambari-server/src/test/java/org/apache/ambari/server/controller/jmx/TestStreamProvider.java

Modified: incubator/ambari/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/CHANGES.txt?rev=1483967&r1=1483966&r2=1483967&view=diff
==============================================================================
--- incubator/ambari/trunk/CHANGES.txt (original)
+++ incubator/ambari/trunk/CHANGES.txt Fri May 17 20:03:08 2013
@@ -297,6 +297,9 @@ Trunk (unreleased changes):
 
  IMPROVEMENTS
 
+ AMBARI-2158. Certain queries with metrics takes a long time (3 minutes on
+ a 450-node cluster). (tbeerbower)
+
  AMBARI-2150. Reassign Master Wizard: start the service after reassigning the
  master. (yusaku)
 

Modified: incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/controller/jmx/JMXPropertyProvider.java
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/controller/jmx/JMXPropertyProvider.java?rev=1483967&r1=1483966&r2=1483967&view=diff
==============================================================================
--- incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/controller/jmx/JMXPropertyProvider.java (original)
+++ incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/controller/jmx/JMXPropertyProvider.java Fri May 17 20:03:08 2013
@@ -36,6 +36,15 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 
 /**
  * Property provider implementation for JMX sources.
@@ -45,24 +54,37 @@ public class JMXPropertyProvider extends
   private static final String NAME_KEY = "name";
   private static final String PORT_KEY = "tag.port";
 
-  private final StreamProvider streamProvider;
+  private static final long DEFAULT_POPULATE_TIMEOUT_MILLIS = 10000L;
 
-  private final JMXHostProvider jmxHostProvider;
+  public static final String TIMED_OUT_MSG = "Timed out waiting for JMX metrics.";
 
-  private static final Map<String, String> DEFAULT_JMX_PORTS = new HashMap<String, String>();
-
-  private final String clusterNamePropertyId;
+  /**
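+   * Thread pool shared by all JMX property providers.  Because its work queue is unbounded, no more than THREAD_POOL_CORE_SIZE threads are ever created.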
+   */
+  private static final ExecutorService EXECUTOR_SERVICE;
+  private static final int THREAD_POOL_CORE_SIZE = 20;
+  private static final int THREAD_POOL_MAX_SIZE = 100;
+  private static final long THREAD_POOL_TIMEOUT_MILLIS = 30000L;
 
-  private final String hostNamePropertyId;
+  static {
+    LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>(); // unlimited Queue
 
-  private final String componentNamePropertyId;
+    ThreadPoolExecutor threadPoolExecutor =
+        new ThreadPoolExecutor(
+            THREAD_POOL_CORE_SIZE,
+            THREAD_POOL_MAX_SIZE,
+            THREAD_POOL_TIMEOUT_MILLIS,
+            TimeUnit.MILLISECONDS,
+            queue);
 
-  private final String statePropertyId;
+    threadPoolExecutor.allowCoreThreadTimeOut(true);
 
-  private final Set<String> healthyStates;
+    EXECUTOR_SERVICE = threadPoolExecutor;
+  }
 
   private final static ObjectReader objectReader;
 
+  private static final Map<String, String> DEFAULT_JMX_PORTS = new HashMap<String, String>();
 
   static {
     DEFAULT_JMX_PORTS.put("NAMENODE",           "50070");
@@ -80,6 +102,27 @@ public class JMXPropertyProvider extends
   protected final static Logger LOG =
       LoggerFactory.getLogger(JMXPropertyProvider.class);
 
+  private final StreamProvider streamProvider;
+
+  private final JMXHostProvider jmxHostProvider;
+
+  private final String clusterNamePropertyId;
+
+  private final String hostNamePropertyId;
+
+  private final String componentNamePropertyId;
+
+  private final String statePropertyId;
+
+  private final Set<String> healthyStates;
+
+  /**
+   * The amount of time, in milliseconds, that this provider will wait for the next
+   * JMX metric result to be returned from the JMX sources.  If no result arrives within
+   * this time, an error is logged and only the resources populated so far are returned.
+   */
+  protected long populateTimeout = DEFAULT_POPULATE_TIMEOUT_MILLIS;
+
 
   // ----- Constructors ------------------------------------------------------
 
@@ -122,12 +165,38 @@ public class JMXPropertyProvider extends
   public Set<Resource> populateResources(Set<Resource> resources, Request request, Predicate predicate)
       throws SystemException {
 
+    CompletionService<Resource> completionService =
+        new ExecutorCompletionService<Resource>(EXECUTOR_SERVICE);
 
-    Set<Resource> keepers = new HashSet<Resource>();
+    // In a large cluster we could have thousands of resources to populate here.
+    // Distribute the work across multiple threads.
     for (Resource resource : resources) {
-      if (populateResource(resource, request, predicate)) {
-        keepers.add(resource);
+      completionService.submit(getPopulateResourceCallable(resource, request, predicate));
+    }
+
+    Set<Resource> keepers = new HashSet<Resource>();
+    try {
+      for (int i = 0; i < resources.size(); ++ i) {
+        Future<Resource> resourceFuture =
+            completionService.poll(populateTimeout, TimeUnit.MILLISECONDS);
+
+        if (resourceFuture == null) {
+          // it's been more than the populateTimeout since the last callable completed ...
+          // don't wait any longer
+          LOG.error(TIMED_OUT_MSG);
+          break;
+        } else {
+          // future should already be completed... no need to wait on get
+          Resource resource = resourceFuture.get();
+          if (resource != null) {
+            keepers.add(resource);
+          }
+        }
       }
+    } catch (InterruptedException e) {
+      logException(e);
+    } catch (ExecutionException e) {
+      rethrowSystemException(e.getCause());
     }
     return keepers;
   }
@@ -136,6 +205,15 @@ public class JMXPropertyProvider extends
   // ----- helper methods ----------------------------------------------------
 
   /**
+   * Set the populate timeout value, in milliseconds, for this provider.
+   *
+   * @param populateTimeout  the populate timeout value in milliseconds
+   */
+  protected void setPopulateTimeout(long populateTimeout) {
+    this.populateTimeout = populateTimeout;
+  }
+
+  /**
    * Get the spec to locate the JMX stream from the given host and port
    *
    * @param hostName  the host name
@@ -148,28 +226,46 @@ public class JMXPropertyProvider extends
   }
 
   /**
+   * Get a callable that can be used to populate the given resource.
+   *
+   * @param resource  the resource to be populated
+   * @param request   the request
+   * @param predicate the predicate
+   *
+   * @return a callable that can be used to populate the given resource
+   */
+  private Callable<Resource> getPopulateResourceCallable(
+      final Resource resource, final Request request, final Predicate predicate) {
+    return new Callable<Resource>() {
+      public Resource call() throws SystemException {
+        return populateResource(resource, request, predicate);
+      }
+    };
+  }
+
+  /**
    * Populate a resource by obtaining the requested JMX properties.
    *
    * @param resource  the resource to be populated
    * @param request   the request
    * @param predicate the predicate
    *
-   * @return true if the resource should be part of the result set for the given predicate
+   * @return the populated resource; null if the resource should NOT be part of the result set for the given predicate
    */
-  private boolean populateResource(Resource resource, Request request, Predicate predicate)
+  private Resource populateResource(Resource resource, Request request, Predicate predicate)
       throws SystemException {
 
     Set<String> ids = getRequestPropertyIds(request, predicate);
     if (ids.isEmpty()) {
       // no properties requested
-      return true;
+      return resource;
     }
 
     // Don't attempt to get the JMX properties if the resource is in an unhealthy state
     if (statePropertyId != null) {
       String state = (String) resource.getPropertyValue(statePropertyId);
       if (state != null && !healthyStates.contains(state)) {
-        return true;
+        return resource;
       }
     }
 
@@ -177,106 +273,98 @@ public class JMXPropertyProvider extends
 
     if (getComponentMetrics().get(componentName) == null) {
       // If there are no metrics defined for the given component then there is nothing to do.
-      return true;
+      return resource;
     }
 
     String clusterName = (String) resource.getPropertyValue(clusterNamePropertyId);
 
     String port = getPort(clusterName, componentName);
     if (port == null) {
-      String error = "Unable to get JMX metrics.  No port value for " + componentName;
-      logError(error, null);
-      throw new SystemException(error, null);
+      throw new SystemException(
+          "Unable to get JMX metrics.  No port value for " + componentName, null);
     }
 
     String hostName = getHost(resource, clusterName, componentName);
     if (hostName == null) {
-      String error = "Unable to get JMX metrics.  No host name for " + componentName;
-      logError(error, null);
-      throw new SystemException(error, null);
+      throw new SystemException(
+          "Unable to get JMX metrics.  No host name for " + componentName, null);
     }
 
-    String      spec = getSpec(hostName, port);
-    InputStream in   = null;
     try {
-      in = streamProvider.readFrom(spec);
-      JMXMetricHolder metricHolder = objectReader.readValue(in);
+      InputStream in = streamProvider.readFrom(getSpec(hostName, port));
 
-      Map<String, Map<String, Object>> categories = new HashMap<String, Map<String, Object>>();
+      try {
+        JMXMetricHolder metricHolder = objectReader.readValue(in);
 
-      for (Map<String, Object> bean : metricHolder.getBeans()) {
-        String category = getCategory(bean);
-        if (category != null) {
-          categories.put(category, bean);
+        Map<String, Map<String, Object>> categories = new HashMap<String, Map<String, Object>>();
+
+        for (Map<String, Object> bean : metricHolder.getBeans()) {
+          String category = getCategory(bean);
+          if (category != null) {
+            categories.put(category, bean);
+          }
         }
-      }
 
-      for (String propertyId : ids) {
-        Map<String, PropertyInfo> propertyInfoMap = getPropertyInfoMap(componentName, propertyId);
+        for (String propertyId : ids) {
+          Map<String, PropertyInfo> propertyInfoMap = getPropertyInfoMap(componentName, propertyId);
 
-        for (Map.Entry<String, PropertyInfo> entry : propertyInfoMap.entrySet()) {
+          for (Map.Entry<String, PropertyInfo> entry : propertyInfoMap.entrySet()) {
 
-          PropertyInfo propertyInfo = entry.getValue();
-          propertyId = entry.getKey();
+            PropertyInfo propertyInfo = entry.getValue();
+            propertyId = entry.getKey();
 
-          if (propertyInfo.isPointInTime()) {
-
-            String property = propertyInfo.getPropertyId();
-            String category = "";
-
-            List<String> keyList = new LinkedList<String>();
-            int keyStartIndex = property.indexOf('[', 0);
-            int firstKeyIndex = keyStartIndex > -1 ? keyStartIndex : property.length();
-            while (keyStartIndex > -1) {
-              int keyEndIndex = property.indexOf(']', keyStartIndex);
-              if (keyEndIndex > -1 & keyEndIndex > keyStartIndex) {
-                keyList.add(property.substring(keyStartIndex + 1, keyEndIndex));
-                keyStartIndex = property.indexOf('[', keyEndIndex);
-              }
-              else {
-                keyStartIndex = -1;
+            if (propertyInfo.isPointInTime()) {
+
+              String property = propertyInfo.getPropertyId();
+              String category = "";
+
+              List<String> keyList = new LinkedList<String>();
+              int keyStartIndex = property.indexOf('[', 0);
+              int firstKeyIndex = keyStartIndex > -1 ? keyStartIndex : property.length();
+              while (keyStartIndex > -1) {
+                int keyEndIndex = property.indexOf(']', keyStartIndex);
+                if (keyEndIndex > -1 & keyEndIndex > keyStartIndex) {
+                  keyList.add(property.substring(keyStartIndex + 1, keyEndIndex));
+                  keyStartIndex = property.indexOf('[', keyEndIndex);
+                }
+                else {
+                  keyStartIndex = -1;
+                }
               }
-            }
 
-            int dotIndex = property.lastIndexOf('.', firstKeyIndex - 1);
-            if (dotIndex != -1){
-              category = property.substring(0, dotIndex);
-              property = property.substring(dotIndex + 1, firstKeyIndex);
-            }
+              int dotIndex = property.lastIndexOf('.', firstKeyIndex - 1);
+              if (dotIndex != -1){
+                category = property.substring(0, dotIndex);
+                property = property.substring(dotIndex + 1, firstKeyIndex);
+              }
 
-            Map<String, Object> properties = categories.get(category);
-            if (properties != null && properties.containsKey(property)) {
-              Object value = properties.get(property);
-              if (keyList.size() > 0 && value instanceof Map) {
-                Map map = (Map) value;
-                for (String key : keyList) {
-                  value = map.get(key);
-                  if (value instanceof Map) {
-                    map = (Map) value;
-                  }
-                  else {
-                    break;
+              Map<String, Object> properties = categories.get(category);
+              if (properties != null && properties.containsKey(property)) {
+                Object value = properties.get(property);
+                if (keyList.size() > 0 && value instanceof Map) {
+                  Map map = (Map) value;
+                  for (String key : keyList) {
+                    value = map.get(key);
+                    if (value instanceof Map) {
+                      map = (Map) value;
+                    }
+                    else {
+                      break;
+                    }
                   }
                 }
+                resource.setProperty(propertyId, value);
               }
-              resource.setProperty(propertyId, value);
             }
           }
         }
+      } finally {
+        in.close();
       }
     } catch (IOException e) {
-      logError(spec, e);
-    } finally {
-      if (in != null) {
-        try {
-          in.close();
-        } catch (IOException e) {
-            logError("Unable to close http input steam : spec=" + spec, e);
-        }
-      }
+      logException(e);
     }
-
-    return true;
+    return resource;
   }
 
   private String getPort(String clusterName, String componentName) throws SystemException {
@@ -303,14 +391,35 @@ public class JMXPropertyProvider extends
     return null;
   }
 
-  private static void logError(String error, IOException e) {
-    if (LOG.isErrorEnabled()) {
-      if (e == null) {
-        LOG.error("Caught exception getting JMX metrics : spec=" + error);
-      } else {
-        LOG.error("Caught exception getting JMX metrics : spec=" + error);
-        LOG.debug("" + e);
-      }
+  /**
+   * Log an error for the given exception.
+   *
+   * @param throwable  the caught exception
+   *
+   * @return the error message that was logged
+   */
+  private static String logException(Throwable throwable) {
+    String msg = "Caught exception getting JMX metrics : " + throwable.getLocalizedMessage();
+
+    LOG.error(msg);
+    LOG.debug(msg, throwable);
+
+    return msg;
+  }
+
+  /**
+   * Rethrow the given exception as a System exception and log the message.
+   *
+   * @param throwable  the caught exception
+   *
+   * @throws SystemException always; the given exception itself if it is already a SystemException, otherwise a new one wrapping it
+   */
+  private static void rethrowSystemException(Throwable throwable) throws SystemException {
+    String msg = logException(throwable);
+
+    if (throwable instanceof SystemException) {
+      throw (SystemException) throwable;
     }
+    throw new SystemException (msg, throwable);
   }
 }

Modified: incubator/ambari/trunk/ambari-server/src/test/java/org/apache/ambari/server/controller/jmx/JMXPropertyProviderTest.java
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/ambari-server/src/test/java/org/apache/ambari/server/controller/jmx/JMXPropertyProviderTest.java?rev=1483967&r1=1483966&r2=1483967&view=diff
==============================================================================
--- incubator/ambari/trunk/ambari-server/src/test/java/org/apache/ambari/server/controller/jmx/JMXPropertyProviderTest.java (original)
+++ incubator/ambari/trunk/ambari-server/src/test/java/org/apache/ambari/server/controller/jmx/JMXPropertyProviderTest.java Fri May 17 20:03:08 2013
@@ -39,6 +39,8 @@ public class JMXPropertyProviderTest {
   protected static final String HOST_COMPONENT_COMPONENT_NAME_PROPERTY_ID = PropertyHelper.getPropertyId("HostRoles", "component_name");
   protected static final String HOST_COMPONENT_STATE_PROPERTY_ID = PropertyHelper.getPropertyId("HostRoles", "state");
 
+  public static final int NUMBER_OF_RESOURCES = 400;
+
   @Test
   public void testPopulateResources() throws Exception {
     TestStreamProvider  streamProvider = new TestStreamProvider();
@@ -294,6 +296,50 @@ public class JMXPropertyProviderTest {
     Assert.assertNull(streamProvider.getLastSpec());
   }
 
+  @Test
+  public void testPopulateResourcesMany() throws Exception {
+    // Set the provider to take 50 millis to return the JMX values
+    TestStreamProvider  streamProvider = new TestStreamProvider(50L);
+    TestJMXHostProvider hostProvider = new TestJMXHostProvider(true);
+    Set<Resource> resources = new HashSet<Resource>();
+
+    JMXPropertyProvider propertyProvider = new JMXPropertyProvider(
+        PropertyHelper.getJMXPropertyIds(Resource.Type.HostComponent),
+        streamProvider,
+        hostProvider,
+        PropertyHelper.getPropertyId("HostRoles", "cluster_name"),
+        PropertyHelper.getPropertyId("HostRoles", "host_name"),
+        PropertyHelper.getPropertyId("HostRoles", "component_name"),
+        PropertyHelper.getPropertyId("HostRoles", "state"),
+        Collections.singleton("STARTED"));
+
+    for (int i = 0; i < NUMBER_OF_RESOURCES; ++i) {
+      // datanode
+      Resource resource = new ResourceImpl(Resource.Type.HostComponent);
+
+      resource.setProperty(HOST_COMPONENT_HOST_NAME_PROPERTY_ID, "domu-12-31-39-14-ee-b3.compute-1.internal");
+      resource.setProperty(HOST_COMPONENT_COMPONENT_NAME_PROPERTY_ID, "DATANODE");
+
+      resources.add(resource);
+    }
+
+    // request with an empty set should get all supported properties
+    Request request = PropertyHelper.getReadRequest(Collections.<String>emptySet());
+
+    Set<Resource> resourceSet = propertyProvider.populateResources(resources, request, null);
+
+    Assert.assertEquals(NUMBER_OF_RESOURCES, resourceSet.size());
+
+    for (Resource resource : resourceSet) {
+      // see test/resources/hdfs_datanode_jmx.json for values
+      Assert.assertEquals(856,  resource.getPropertyValue(PropertyHelper.getPropertyId("metrics/rpc", "ReceivedBytes")));
+      Assert.assertEquals(954466304, resource.getPropertyValue(PropertyHelper.getPropertyId("metrics/jvm", "HeapMemoryMax")));
+      Assert.assertEquals(9772616, resource.getPropertyValue(PropertyHelper.getPropertyId("metrics/jvm", "HeapMemoryUsed")));
+      Assert.assertEquals(136314880, resource.getPropertyValue(PropertyHelper.getPropertyId("metrics/jvm", "NonHeapMemoryMax")));
+      Assert.assertEquals(21933376, resource.getPropertyValue(PropertyHelper.getPropertyId("metrics/jvm", "NonHeapMemoryUsed")));
+    }
+  }
+
   private static class TestJMXHostProvider implements JMXHostProvider {
     private final boolean unknownPort;
 

Modified: incubator/ambari/trunk/ambari-server/src/test/java/org/apache/ambari/server/controller/jmx/TestStreamProvider.java
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/ambari-server/src/test/java/org/apache/ambari/server/controller/jmx/TestStreamProvider.java?rev=1483967&r1=1483966&r2=1483967&view=diff
==============================================================================
--- incubator/ambari/trunk/ambari-server/src/test/java/org/apache/ambari/server/controller/jmx/TestStreamProvider.java (original)
+++ incubator/ambari/trunk/ambari-server/src/test/java/org/apache/ambari/server/controller/jmx/TestStreamProvider.java Fri May 17 20:03:08 2013
@@ -37,8 +37,21 @@ public class TestStreamProvider implemen
     FILE_MAPPING.put("60010", "hbase_hbasemaster_jmx.json");
   }
 
+  /**
+   * Delay, in milliseconds, applied to each read to simulate response time.
+   */
+  protected final long delay;
+
   private String lastSpec;
 
+  public TestStreamProvider() {
+    delay = 0;
+  }
+
+  public TestStreamProvider(long delay) {
+    this.delay = delay;
+  }
+
   @Override
   public InputStream readFrom(String spec) throws IOException {
     lastSpec = spec;
@@ -46,6 +59,14 @@ public class TestStreamProvider implemen
     if (filename == null) {
       throw new IOException("Can't find JMX source for " + spec);
     }
+    if (delay > 0) {
+      try {
+        Thread.sleep(delay);
+      } catch (InterruptedException e) {
+        // do nothing
+      }
+    }
+
     return ClassLoader.getSystemResourceAsStream(filename);
   }
 
@@ -57,6 +78,4 @@ public class TestStreamProvider implemen
     int n = spec.indexOf(":", 5);
     return spec.substring(n + 1, n + 6);
   }
-
-
 }