You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by tb...@apache.org on 2013/05/17 22:03:08 UTC
svn commit: r1483967 - in /incubator/ambari/trunk: ./
ambari-server/src/main/java/org/apache/ambari/server/controller/jmx/
ambari-server/src/test/java/org/apache/ambari/server/controller/jmx/
Author: tbeerbower
Date: Fri May 17 20:03:08 2013
New Revision: 1483967
URL: http://svn.apache.org/r1483967
Log:
AMBARI-2158 - Certain queries with metrics takes a long time (3 minutes on a 450-node cluster)
Modified:
incubator/ambari/trunk/CHANGES.txt
incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/controller/jmx/JMXPropertyProvider.java
incubator/ambari/trunk/ambari-server/src/test/java/org/apache/ambari/server/controller/jmx/JMXPropertyProviderTest.java
incubator/ambari/trunk/ambari-server/src/test/java/org/apache/ambari/server/controller/jmx/TestStreamProvider.java
Modified: incubator/ambari/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/CHANGES.txt?rev=1483967&r1=1483966&r2=1483967&view=diff
==============================================================================
--- incubator/ambari/trunk/CHANGES.txt (original)
+++ incubator/ambari/trunk/CHANGES.txt Fri May 17 20:03:08 2013
@@ -297,6 +297,9 @@ Trunk (unreleased changes):
IMPROVEMENTS
+ AMBARI-2158. Certain queries with metrics takes a long time (3 minutes on
+ a 450-node cluster). (tbeerbower)
+
AMBARI-2150. Reassign Master Wizard: start the service after reassigning the
master. (yusaku)
Modified: incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/controller/jmx/JMXPropertyProvider.java
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/controller/jmx/JMXPropertyProvider.java?rev=1483967&r1=1483966&r2=1483967&view=diff
==============================================================================
--- incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/controller/jmx/JMXPropertyProvider.java (original)
+++ incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/controller/jmx/JMXPropertyProvider.java Fri May 17 20:03:08 2013
@@ -36,6 +36,15 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
/**
* Property provider implementation for JMX sources.
@@ -45,24 +54,37 @@ public class JMXPropertyProvider extends
private static final String NAME_KEY = "name";
private static final String PORT_KEY = "tag.port";
- private final StreamProvider streamProvider;
+ private static final long DEFAULT_POPULATE_TIMEOUT_MILLIS = 10000L;
- private final JMXHostProvider jmxHostProvider;
+ public static final String TIMED_OUT_MSG = "Timed out waiting for JMX metrics.";
- private static final Map<String, String> DEFAULT_JMX_PORTS = new HashMap<String, String>();
-
- private final String clusterNamePropertyId;
+ /**
+ * Thread pool
+ */
+ private static final ExecutorService EXECUTOR_SERVICE;
+ private static final int THREAD_POOL_CORE_SIZE = 20;
+ private static final int THREAD_POOL_MAX_SIZE = 100;
+ private static final long THREAD_POOL_TIMEOUT_MILLIS = 30000L;
- private final String hostNamePropertyId;
+ static {
+ LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>(); // unbounded queue, so the pool never grows past THREAD_POOL_CORE_SIZE
- private final String componentNamePropertyId;
+ ThreadPoolExecutor threadPoolExecutor =
+ new ThreadPoolExecutor(
+ THREAD_POOL_CORE_SIZE,
+ THREAD_POOL_MAX_SIZE,
+ THREAD_POOL_TIMEOUT_MILLIS,
+ TimeUnit.MILLISECONDS,
+ queue);
- private final String statePropertyId;
+ threadPoolExecutor.allowCoreThreadTimeOut(true);
- private final Set<String> healthyStates;
+ EXECUTOR_SERVICE = threadPoolExecutor;
+ }
private final static ObjectReader objectReader;
+ private static final Map<String, String> DEFAULT_JMX_PORTS = new HashMap<String, String>();
static {
DEFAULT_JMX_PORTS.put("NAMENODE", "50070");
@@ -80,6 +102,27 @@ public class JMXPropertyProvider extends
protected final static Logger LOG =
LoggerFactory.getLogger(JMXPropertyProvider.class);
+ private final StreamProvider streamProvider;
+
+ private final JMXHostProvider jmxHostProvider;
+
+ private final String clusterNamePropertyId;
+
+ private final String hostNamePropertyId;
+
+ private final String componentNamePropertyId;
+
+ private final String statePropertyId;
+
+ private final Set<String> healthyStates;
+
+ /**
+ * The amount of time that this provider will wait for JMX metric values to be
+ * returned from the JMX sources. If no result arrives within this amount of
+ * time, an error is logged and only the resources populated so far are returned.
+ */
+ protected long populateTimeout = DEFAULT_POPULATE_TIMEOUT_MILLIS;
+
// ----- Constructors ------------------------------------------------------
@@ -122,12 +165,38 @@ public class JMXPropertyProvider extends
public Set<Resource> populateResources(Set<Resource> resources, Request request, Predicate predicate)
throws SystemException {
+ CompletionService<Resource> completionService =
+ new ExecutorCompletionService<Resource>(EXECUTOR_SERVICE);
- Set<Resource> keepers = new HashSet<Resource>();
+ // In a large cluster we could have thousands of resources to populate here.
+ // Distribute the work across multiple threads.
for (Resource resource : resources) {
- if (populateResource(resource, request, predicate)) {
- keepers.add(resource);
+ completionService.submit(getPopulateResourceCallable(resource, request, predicate));
+ }
+
+ Set<Resource> keepers = new HashSet<Resource>();
+ try {
+ for (int i = 0; i < resources.size(); ++ i) {
+ Future<Resource> resourceFuture =
+ completionService.poll(populateTimeout, TimeUnit.MILLISECONDS);
+
+ if (resourceFuture == null) {
+ // it's been more than the populateTimeout since the last callable completed ...
+ // don't wait any longer
+ LOG.error(TIMED_OUT_MSG);
+ break;
+ } else {
+ // future should already be completed... no need to wait on get
+ Resource resource = resourceFuture.get();
+ if (resource != null) {
+ keepers.add(resource);
+ }
+ }
}
+ } catch (InterruptedException e) {
+ logException(e);
+ } catch (ExecutionException e) {
+ rethrowSystemException(e.getCause());
}
return keepers;
}
@@ -136,6 +205,15 @@ public class JMXPropertyProvider extends
// ----- helper methods ----------------------------------------------------
/**
+ * Set the populate timeout value for this provider.
+ *
+ * @param populateTimeout the populate timeout value
+ */
+ protected void setPopulateTimeout(long populateTimeout) {
+ this.populateTimeout = populateTimeout;
+ }
+
+ /**
* Get the spec to locate the JMX stream from the given host and port
*
* @param hostName the host name
@@ -148,28 +226,46 @@ public class JMXPropertyProvider extends
}
/**
+ * Get a callable that can be used to populate the given resource.
+ *
+ * @param resource the resource to be populated
+ * @param request the request
+ * @param predicate the predicate
+ *
+ * @return a callable that can be used to populate the given resource
+ */
+ private Callable<Resource> getPopulateResourceCallable(
+ final Resource resource, final Request request, final Predicate predicate) {
+ return new Callable<Resource>() {
+ public Resource call() throws SystemException {
+ return populateResource(resource, request, predicate);
+ }
+ };
+ }
+
+ /**
* Populate a resource by obtaining the requested JMX properties.
*
* @param resource the resource to be populated
* @param request the request
* @param predicate the predicate
*
- * @return true if the resource should be part of the result set for the given predicate
+ * @return the populated resource; null if the resource should NOT be part of the result set for the given predicate
*/
- private boolean populateResource(Resource resource, Request request, Predicate predicate)
+ private Resource populateResource(Resource resource, Request request, Predicate predicate)
throws SystemException {
Set<String> ids = getRequestPropertyIds(request, predicate);
if (ids.isEmpty()) {
// no properties requested
- return true;
+ return resource;
}
// Don't attempt to get the JMX properties if the resource is in an unhealthy state
if (statePropertyId != null) {
String state = (String) resource.getPropertyValue(statePropertyId);
if (state != null && !healthyStates.contains(state)) {
- return true;
+ return resource;
}
}
@@ -177,106 +273,98 @@ public class JMXPropertyProvider extends
if (getComponentMetrics().get(componentName) == null) {
// If there are no metrics defined for the given component then there is nothing to do.
- return true;
+ return resource;
}
String clusterName = (String) resource.getPropertyValue(clusterNamePropertyId);
String port = getPort(clusterName, componentName);
if (port == null) {
- String error = "Unable to get JMX metrics. No port value for " + componentName;
- logError(error, null);
- throw new SystemException(error, null);
+ throw new SystemException(
+ "Unable to get JMX metrics. No port value for " + componentName, null);
}
String hostName = getHost(resource, clusterName, componentName);
if (hostName == null) {
- String error = "Unable to get JMX metrics. No host name for " + componentName;
- logError(error, null);
- throw new SystemException(error, null);
+ throw new SystemException(
+ "Unable to get JMX metrics. No host name for " + componentName, null);
}
- String spec = getSpec(hostName, port);
- InputStream in = null;
try {
- in = streamProvider.readFrom(spec);
- JMXMetricHolder metricHolder = objectReader.readValue(in);
+ InputStream in = streamProvider.readFrom(getSpec(hostName, port));
- Map<String, Map<String, Object>> categories = new HashMap<String, Map<String, Object>>();
+ try {
+ JMXMetricHolder metricHolder = objectReader.readValue(in);
- for (Map<String, Object> bean : metricHolder.getBeans()) {
- String category = getCategory(bean);
- if (category != null) {
- categories.put(category, bean);
+ Map<String, Map<String, Object>> categories = new HashMap<String, Map<String, Object>>();
+
+ for (Map<String, Object> bean : metricHolder.getBeans()) {
+ String category = getCategory(bean);
+ if (category != null) {
+ categories.put(category, bean);
+ }
}
- }
- for (String propertyId : ids) {
- Map<String, PropertyInfo> propertyInfoMap = getPropertyInfoMap(componentName, propertyId);
+ for (String propertyId : ids) {
+ Map<String, PropertyInfo> propertyInfoMap = getPropertyInfoMap(componentName, propertyId);
- for (Map.Entry<String, PropertyInfo> entry : propertyInfoMap.entrySet()) {
+ for (Map.Entry<String, PropertyInfo> entry : propertyInfoMap.entrySet()) {
- PropertyInfo propertyInfo = entry.getValue();
- propertyId = entry.getKey();
+ PropertyInfo propertyInfo = entry.getValue();
+ propertyId = entry.getKey();
- if (propertyInfo.isPointInTime()) {
-
- String property = propertyInfo.getPropertyId();
- String category = "";
-
- List<String> keyList = new LinkedList<String>();
- int keyStartIndex = property.indexOf('[', 0);
- int firstKeyIndex = keyStartIndex > -1 ? keyStartIndex : property.length();
- while (keyStartIndex > -1) {
- int keyEndIndex = property.indexOf(']', keyStartIndex);
- if (keyEndIndex > -1 & keyEndIndex > keyStartIndex) {
- keyList.add(property.substring(keyStartIndex + 1, keyEndIndex));
- keyStartIndex = property.indexOf('[', keyEndIndex);
- }
- else {
- keyStartIndex = -1;
+ if (propertyInfo.isPointInTime()) {
+
+ String property = propertyInfo.getPropertyId();
+ String category = "";
+
+ List<String> keyList = new LinkedList<String>();
+ int keyStartIndex = property.indexOf('[', 0);
+ int firstKeyIndex = keyStartIndex > -1 ? keyStartIndex : property.length();
+ while (keyStartIndex > -1) {
+ int keyEndIndex = property.indexOf(']', keyStartIndex);
+ if (keyEndIndex > -1 & keyEndIndex > keyStartIndex) {
+ keyList.add(property.substring(keyStartIndex + 1, keyEndIndex));
+ keyStartIndex = property.indexOf('[', keyEndIndex);
+ }
+ else {
+ keyStartIndex = -1;
+ }
}
- }
- int dotIndex = property.lastIndexOf('.', firstKeyIndex - 1);
- if (dotIndex != -1){
- category = property.substring(0, dotIndex);
- property = property.substring(dotIndex + 1, firstKeyIndex);
- }
+ int dotIndex = property.lastIndexOf('.', firstKeyIndex - 1);
+ if (dotIndex != -1){
+ category = property.substring(0, dotIndex);
+ property = property.substring(dotIndex + 1, firstKeyIndex);
+ }
- Map<String, Object> properties = categories.get(category);
- if (properties != null && properties.containsKey(property)) {
- Object value = properties.get(property);
- if (keyList.size() > 0 && value instanceof Map) {
- Map map = (Map) value;
- for (String key : keyList) {
- value = map.get(key);
- if (value instanceof Map) {
- map = (Map) value;
- }
- else {
- break;
+ Map<String, Object> properties = categories.get(category);
+ if (properties != null && properties.containsKey(property)) {
+ Object value = properties.get(property);
+ if (keyList.size() > 0 && value instanceof Map) {
+ Map map = (Map) value;
+ for (String key : keyList) {
+ value = map.get(key);
+ if (value instanceof Map) {
+ map = (Map) value;
+ }
+ else {
+ break;
+ }
}
}
+ resource.setProperty(propertyId, value);
}
- resource.setProperty(propertyId, value);
}
}
}
+ } finally {
+ in.close();
}
} catch (IOException e) {
- logError(spec, e);
- } finally {
- if (in != null) {
- try {
- in.close();
- } catch (IOException e) {
- logError("Unable to close http input steam : spec=" + spec, e);
- }
- }
+ logException(e);
}
-
- return true;
+ return resource;
}
private String getPort(String clusterName, String componentName) throws SystemException {
@@ -303,14 +391,35 @@ public class JMXPropertyProvider extends
return null;
}
- private static void logError(String error, IOException e) {
- if (LOG.isErrorEnabled()) {
- if (e == null) {
- LOG.error("Caught exception getting JMX metrics : spec=" + error);
- } else {
- LOG.error("Caught exception getting JMX metrics : spec=" + error);
- LOG.debug("" + e);
- }
+ /**
+ * Log an error for the given exception.
+ *
+ * @param throwable the caught exception
+ *
+ * @return the error message that was logged
+ */
+ private static String logException(Throwable throwable) {
+ String msg = "Caught exception getting JMX metrics : " + throwable.getLocalizedMessage();
+
+ LOG.error(msg);
+ LOG.debug(msg, throwable);
+
+ return msg;
+ }
+
+ /**
+ * Rethrow the given exception as a System exception and log the message.
+ *
+ * @param throwable the caught exception
+ *
+ * @throws SystemException always; the given exception itself if it is a SystemException, otherwise a new one wrapping it
+ */
+ private static void rethrowSystemException(Throwable throwable) throws SystemException {
+ String msg = logException(throwable);
+
+ if (throwable instanceof SystemException) {
+ throw (SystemException) throwable;
}
+ throw new SystemException (msg, throwable);
}
}
Modified: incubator/ambari/trunk/ambari-server/src/test/java/org/apache/ambari/server/controller/jmx/JMXPropertyProviderTest.java
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/ambari-server/src/test/java/org/apache/ambari/server/controller/jmx/JMXPropertyProviderTest.java?rev=1483967&r1=1483966&r2=1483967&view=diff
==============================================================================
--- incubator/ambari/trunk/ambari-server/src/test/java/org/apache/ambari/server/controller/jmx/JMXPropertyProviderTest.java (original)
+++ incubator/ambari/trunk/ambari-server/src/test/java/org/apache/ambari/server/controller/jmx/JMXPropertyProviderTest.java Fri May 17 20:03:08 2013
@@ -39,6 +39,8 @@ public class JMXPropertyProviderTest {
protected static final String HOST_COMPONENT_COMPONENT_NAME_PROPERTY_ID = PropertyHelper.getPropertyId("HostRoles", "component_name");
protected static final String HOST_COMPONENT_STATE_PROPERTY_ID = PropertyHelper.getPropertyId("HostRoles", "state");
+ public static final int NUMBER_OF_RESOURCES = 400;
+
@Test
public void testPopulateResources() throws Exception {
TestStreamProvider streamProvider = new TestStreamProvider();
@@ -294,6 +296,50 @@ public class JMXPropertyProviderTest {
Assert.assertNull(streamProvider.getLastSpec());
}
+ @Test
+ public void testPopulateResourcesMany() throws Exception {
+ // Set the provider to take 50 millis to return the JMX values
+ TestStreamProvider streamProvider = new TestStreamProvider(50L);
+ TestJMXHostProvider hostProvider = new TestJMXHostProvider(true);
+ Set<Resource> resources = new HashSet<Resource>();
+
+ JMXPropertyProvider propertyProvider = new JMXPropertyProvider(
+ PropertyHelper.getJMXPropertyIds(Resource.Type.HostComponent),
+ streamProvider,
+ hostProvider,
+ PropertyHelper.getPropertyId("HostRoles", "cluster_name"),
+ PropertyHelper.getPropertyId("HostRoles", "host_name"),
+ PropertyHelper.getPropertyId("HostRoles", "component_name"),
+ PropertyHelper.getPropertyId("HostRoles", "state"),
+ Collections.singleton("STARTED"));
+
+ for (int i = 0; i < NUMBER_OF_RESOURCES; ++i) {
+ // datanode
+ Resource resource = new ResourceImpl(Resource.Type.HostComponent);
+
+ resource.setProperty(HOST_COMPONENT_HOST_NAME_PROPERTY_ID, "domu-12-31-39-14-ee-b3.compute-1.internal");
+ resource.setProperty(HOST_COMPONENT_COMPONENT_NAME_PROPERTY_ID, "DATANODE");
+
+ resources.add(resource);
+ }
+
+ // request with an empty set should get all supported properties
+ Request request = PropertyHelper.getReadRequest(Collections.<String>emptySet());
+
+ Set<Resource> resourceSet = propertyProvider.populateResources(resources, request, null);
+
+ Assert.assertEquals(NUMBER_OF_RESOURCES, resourceSet.size());
+
+ for (Resource resource : resourceSet) {
+ // see test/resources/hdfs_datanode_jmx.json for values
+ Assert.assertEquals(856, resource.getPropertyValue(PropertyHelper.getPropertyId("metrics/rpc", "ReceivedBytes")));
+ Assert.assertEquals(954466304, resource.getPropertyValue(PropertyHelper.getPropertyId("metrics/jvm", "HeapMemoryMax")));
+ Assert.assertEquals(9772616, resource.getPropertyValue(PropertyHelper.getPropertyId("metrics/jvm", "HeapMemoryUsed")));
+ Assert.assertEquals(136314880, resource.getPropertyValue(PropertyHelper.getPropertyId("metrics/jvm", "NonHeapMemoryMax")));
+ Assert.assertEquals(21933376, resource.getPropertyValue(PropertyHelper.getPropertyId("metrics/jvm", "NonHeapMemoryUsed")));
+ }
+ }
+
private static class TestJMXHostProvider implements JMXHostProvider {
private final boolean unknownPort;
Modified: incubator/ambari/trunk/ambari-server/src/test/java/org/apache/ambari/server/controller/jmx/TestStreamProvider.java
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/ambari-server/src/test/java/org/apache/ambari/server/controller/jmx/TestStreamProvider.java?rev=1483967&r1=1483966&r2=1483967&view=diff
==============================================================================
--- incubator/ambari/trunk/ambari-server/src/test/java/org/apache/ambari/server/controller/jmx/TestStreamProvider.java (original)
+++ incubator/ambari/trunk/ambari-server/src/test/java/org/apache/ambari/server/controller/jmx/TestStreamProvider.java Fri May 17 20:03:08 2013
@@ -37,8 +37,21 @@ public class TestStreamProvider implemen
FILE_MAPPING.put("60010", "hbase_hbasemaster_jmx.json");
}
+ /**
+ * Delay to simulate response time.
+ */
+ protected final long delay;
+
private String lastSpec;
+ public TestStreamProvider() {
+ delay = 0;
+ }
+
+ public TestStreamProvider(long delay) {
+ this.delay = delay;
+ }
+
@Override
public InputStream readFrom(String spec) throws IOException {
lastSpec = spec;
@@ -46,6 +59,14 @@ public class TestStreamProvider implemen
if (filename == null) {
throw new IOException("Can't find JMX source for " + spec);
}
+ if (delay > 0) {
+ try {
+ Thread.sleep(delay);
+ } catch (InterruptedException e) {
+ // do nothing
+ }
+ }
+
return ClassLoader.getSystemResourceAsStream(filename);
}
@@ -57,6 +78,4 @@ public class TestStreamProvider implemen
int n = spec.indexOf(":", 5);
return spec.substring(n + 1, n + 6);
}
-
-
}