You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by jo...@apache.org on 2016/09/08 15:19:38 UTC

ambari git commit: AMBARI-18331 - JMX metric retrieval method may unnecessarily refresh metrics at a high rate (jonathanhurley)

Repository: ambari
Updated Branches:
  refs/heads/trunk 4e4f67ae8 -> 8efbde8fd


AMBARI-18331 - JMX metric retrieval method may unnecessarily refresh metrics at a high rate (jonathanhurley)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/8efbde8f
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/8efbde8f
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/8efbde8f

Branch: refs/heads/trunk
Commit: 8efbde8fd13be4278017345a4077bebb6f4f366e
Parents: 4e4f67a
Author: Jonathan Hurley <jh...@hortonworks.com>
Authored: Wed Sep 7 09:02:25 2016 -0400
Committer: Jonathan Hurley <jh...@hortonworks.com>
Committed: Thu Sep 8 10:16:33 2016 -0400

----------------------------------------------------------------------
 ambari-server/docs/configuration/index.md       |   3 +-
 .../server/configuration/Configuration.java     |  56 +++++-
 .../controller/jmx/JMXPropertyProvider.java     |   5 +-
 .../metrics/RestMetricsPropertyProvider.java    |   3 +-
 .../state/services/MetricsRetrievalService.java | 193 +++++++++++-------
 .../metrics/JMXPropertyProviderTest.java        |   8 +-
 .../RestMetricsPropertyProviderTest.java        |   8 +-
 .../services/MetricsRetrievalServiceTest.java   | 195 +++++++++++++++++++
 8 files changed, 397 insertions(+), 74 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/8efbde8f/ambari-server/docs/configuration/index.md
----------------------------------------------------------------------
diff --git a/ambari-server/docs/configuration/index.md b/ambari-server/docs/configuration/index.md
index 18234d0..3c3f3d0 100644
--- a/ambari-server/docs/configuration/index.md
+++ b/ambari-server/docs/configuration/index.md
@@ -96,7 +96,6 @@ The following are the properties which can be used to configure Ambari.
 | authentication.ldap.usernameAttribute | The attribute used for determining the user name, such as `uid`. |`uid` | 
 | authorization.ldap.adminGroupMappingRules | A comma-separate list of groups which would give a user administrative access to Ambari when syncing from LDAP. This is only used when `authorization.ldap.groupSearchFilter` is blank.<br/><br/>The following are examples of valid values:<ul><li>`administrators`<li>`Hadoop Admins,Hadoop Admins.*,DC Admins,.*Hadoop Operators`</ul> |`Ambari Administrators` | 
 | authorization.ldap.groupSearchFilter | The DN to use when searching for LDAP groups. | | 
-| blueprint.skip_install_tasks | Determines if the request generated by a blueprint should include the package installation tasks. |`false` | 
 | bootstrap.dir | The directory on the Ambari Server file system used for storing Ambari Agent bootstrap information such as request responses. |`/var/run/ambari-server/bootstrap` | 
 | bootstrap.master_host_name | The host name of the Ambari Server which will be used by the Ambari Agents for communication. | | 
 | bootstrap.script | The location and name of the Python script used to bootstrap new Ambari Agent hosts. |`/usr/lib/python2.6/site-packages/ambari_server/bootstrap.py` | 
@@ -130,6 +129,8 @@ The following are the properties which can be used to configure Ambari.
 | kerberos.keytab.cache.dir | The location on the Ambari Server where Kerberos keytabs are cached. |`/var/lib/ambari-server/data/cache` | 
 | metadata.path | The location on the Ambari Server where the stack resources exist.<br/><br/>The following are examples of valid values:<ul><li>`/var/lib/ambari-server/resources/stacks`</ul> | | 
 | metrics.retrieval-service.cache.timeout | The amount of time, in minutes, that JMX and REST metrics retrieved directly can remain in the cache. |`30` | 
+| metrics.retrieval-service.request.ttl | The number of seconds to wait between issuing JMX or REST metric requests to the same endpoint. This property is used to throttle requests to the same URL being made too close together<br/><br/> This property is related to `metrics.retrieval-service.request.ttl.enabled`. |`5` | 
+| metrics.retrieval-service.request.ttl.enabled | Enables throttling requests to the same endpoint within a fixed amount of time. This property will prevent Ambari from making new metric requests to update the cache for URLs which have been recently retrieved.<br/><br/> This property is related to `metrics.retrieval-service.request.ttl`. |`true` | 
 | mpacks.staging.path | The Ambari Management Pack staging directory on the Ambari Server.<br/><br/>The following are examples of valid values:<ul><li>`/var/lib/ambari-server/resources/mpacks`</ul> | | 
 | packages.pre.installed | Determines whether Ambari Agent instances have already have the necessary stack software installed |`false` | 
 | proxy.allowed.hostports | A comma-separated whitelist of host and port values which Ambari Server can use to determine if a proxy value is valid. |`*:*` | 

http://git-wip-us.apache.org/repos/asf/ambari/blob/8efbde8f/ambari-server/src/main/java/org/apache/ambari/server/configuration/Configuration.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/configuration/Configuration.java b/ambari-server/src/main/java/org/apache/ambari/server/configuration/Configuration.java
index db1b81e..0690ca8 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/configuration/Configuration.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/configuration/Configuration.java
@@ -85,9 +85,9 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.collect.Multimap;
+import com.google.common.collect.Sets;
 import com.google.gson.Gson;
 import com.google.gson.GsonBuilder;
-import com.google.common.collect.Sets;
 import com.google.gson.JsonElement;
 import com.google.gson.JsonObject;
 import com.google.gson.JsonParser;
@@ -2264,8 +2264,36 @@ public class Configuration {
       10 * METRIC_RETRIEVAL_SERVICE_THREADPOOL_MAX_SIZE.getDefaultValue());
 
   /**
-   * The number of tasks that can be queried from the database at once
-   * In the case of more tasks, multiple queries are issued
+   * {@code true} to enable a TTL per request made by the
+   * {@link MetricsRetrievalService}. Enabling this property will prevent
+   * requests to the same URL endpoint within a fixed amount of time allowing
+   * requests to be throttled.
+   */
+  @Markdown(
+      relatedTo = "metrics.retrieval-service.request.ttl",
+      description = "Enables throttling requests to the same endpoint within a fixed amount of time. "
+          + "This property will prevent Ambari from making new metric requests to update the cache for URLs which have been recently retrieved.")
+  public static final ConfigurationProperty<Boolean> METRIC_RETRIEVAL_SERVICE_REQUEST_TTL_ENABLED = new ConfigurationProperty<>(
+      "metrics.retrieval-service.request.ttl.enabled", Boolean.TRUE);
+
+  /**
+   * The amount of time, in {@link TimeUnit#SECONDS}, that requests to the same
+   * URL by the {@link MetricsRetrievalService} must be separated. Requests to
+   * the same URL which are too close together will not result in metrics
+   * retrieval. This property is used to throttle requests to the same URL being
+   * made too close together.
+   */
+  @Markdown(
+      relatedTo = "metrics.retrieval-service.request.ttl.enabled",
+      description = "The number of seconds to wait between issuing JMX or REST metric requests to the same endpoint. "
+          + "This property is used to throttle requests to the same URL being made too close together")
+  public static final ConfigurationProperty<Integer> METRIC_RETRIEVAL_SERVICE_REQUEST_TTL = new ConfigurationProperty<>(
+      "metrics.retrieval-service.request.ttl", 5);
+
+  /**
+   * The number of tasks that can be queried from the database at once In the
+   * case of more tasks, multiple queries are issued
+   *
    * @return
    */
   @Markdown(description = "The maximum number of tasks which can be queried by ID from the database.")
@@ -4766,8 +4794,30 @@ public class Configuration {
   }
 
   /**
+   * Gets the number of seconds during which repeated requests to the same URL are discarded in
+   * order to throttle retrieval from the same endpoint.
+   *
+   * @return the number of seconds that must elapse between requests to the same endpoint.
+   */
+  public int getMetricsServiceRequestTTL() {
+    return Integer.parseInt(getProperty(METRIC_RETRIEVAL_SERVICE_REQUEST_TTL));
+  }
+
+  /**
+   * Gets whether the TTL request cache in the {@link MetricsRetrievalService}
+   * is enabled. This evicting cache is used to prevent requests to the same URL
+   * within a specified amount of time.
+   *
+   * @return {@code true} if enabled, {@code false} otherwise.
+   */
+  public boolean isMetricsServiceRequestTTLCacheEnabled() {
+    return Boolean.parseBoolean(getProperty(METRIC_RETRIEVAL_SERVICE_REQUEST_TTL_ENABLED));
+  }
+
+  /**
    * Returns the number of tasks that can be queried from the database at once
    * In the case of more tasks, multiple queries are issued
+   *
    * @return
    */
   public int getTaskIdListLimit() {

http://git-wip-us.apache.org/repos/asf/ambari/blob/8efbde8f/ambari-server/src/main/java/org/apache/ambari/server/controller/jmx/JMXPropertyProvider.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/jmx/JMXPropertyProvider.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/jmx/JMXPropertyProvider.java
index 7665d7f..cbc15cb 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/controller/jmx/JMXPropertyProvider.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/jmx/JMXPropertyProvider.java
@@ -41,6 +41,7 @@ import org.apache.ambari.server.controller.spi.Resource;
 import org.apache.ambari.server.controller.spi.SystemException;
 import org.apache.ambari.server.controller.utilities.StreamProvider;
 import org.apache.ambari.server.state.services.MetricsRetrievalService;
+import org.apache.ambari.server.state.services.MetricsRetrievalService.MetricSourceType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -262,7 +263,7 @@ public class JMXPropertyProvider extends ThreadPoolEnabledPropertyProvider {
         String jmxUrl = getSpec(protocol, hostName, port, "/jmx");
 
         // always submit a request to cache the latest data
-        metricsRetrievalService.submitJMXRequest(streamProvider, jmxUrl);
+        metricsRetrievalService.submitRequest(MetricSourceType.JMX, streamProvider, jmxUrl);
 
         // check to see if there is a cached value and use it if there is
         JMXMetricHolder jmxMetricHolder = metricsRetrievalService.getCachedJMXMetric(jmxUrl);
@@ -286,7 +287,7 @@ public class JMXPropertyProvider extends ThreadPoolEnabledPropertyProvider {
               }
               if (queryURL != null) {
                 String adHocUrl = getSpec(protocol, hostName, port, queryURL);
-                metricsRetrievalService.submitJMXRequest(streamProvider, adHocUrl);
+                metricsRetrievalService.submitRequest(MetricSourceType.JMX, streamProvider, adHocUrl);
                 JMXMetricHolder adHocJMXMetricHolder = metricsRetrievalService.getCachedJMXMetric(adHocUrl);
 
                 // if the ticket becomes invalid (timeout) then bail out

http://git-wip-us.apache.org/repos/asf/ambari/blob/8efbde8f/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/RestMetricsPropertyProvider.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/RestMetricsPropertyProvider.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/RestMetricsPropertyProvider.java
index cbe827a..0b8dd62 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/RestMetricsPropertyProvider.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/RestMetricsPropertyProvider.java
@@ -40,6 +40,7 @@ import org.apache.ambari.server.controller.utilities.StreamProvider;
 import org.apache.ambari.server.state.Cluster;
 import org.apache.ambari.server.state.Clusters;
 import org.apache.ambari.server.state.services.MetricsRetrievalService;
+import org.apache.ambari.server.state.services.MetricsRetrievalService.MetricSourceType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -241,7 +242,7 @@ public class RestMetricsPropertyProvider extends ThreadPoolEnabledPropertyProvid
       String spec = getSpec(protocol, hostname, port, url);
 
       // always submit a request to cache the latest data
-      metricsRetrievalService.submitRESTRequest(streamProvider, spec);
+      metricsRetrievalService.submitRequest(MetricSourceType.REST, streamProvider, spec);
 
       // check to see if there is a cached value and use it if there is
       Map<String, String> jsonMap = metricsRetrievalService.getCachedRESTMetric(spec);

http://git-wip-us.apache.org/repos/asf/ambari/blob/8efbde8f/ambari-server/src/main/java/org/apache/ambari/server/state/services/MetricsRetrievalService.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/services/MetricsRetrievalService.java b/ambari-server/src/main/java/org/apache/ambari/server/state/services/MetricsRetrievalService.java
index fa36905..629f6ab 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/state/services/MetricsRetrievalService.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/state/services/MetricsRetrievalService.java
@@ -70,15 +70,34 @@ import com.google.inject.Inject;
  * in remote metric data. Otherwise, the cache will never be populated. On the
  * first usage of this service, the cache will always be empty. On every
  * subsequent request, the data from the prior invocation of
- * {@link #submit(JMXRunnable)} will be available.
+ * {@link #submitRequest(MetricSourceType, StreamProvider, String)} will be available.
  * <p/>
  * Metric data is cached temporarily and is controlled by
  * {@link Configuration#getMetricsServiceCacheTimeout()}.
+ * <p/>
+ * In order to control throttling requests to the same endpoint,
+ * {@link Configuration#isMetricsServiceRequestTTLCacheEnabled()} can be enabled
+ * to allow for a fixed interval of time to pass between requests.
  */
 @AmbariService
 public class MetricsRetrievalService extends AbstractService {
 
   /**
+   * The type of web service hosting the metrics.
+   */
+  public enum MetricSourceType {
+    /**
+     * JMX
+     */
+    JMX,
+
+    /**
+     * REST
+     */
+    REST
+  }
+
+  /**
    * Logger.
    */
   protected final static Logger LOG = LoggerFactory.getLogger(MetricsRetrievalService.class);
@@ -137,6 +156,18 @@ public class MetricsRetrievalService extends AbstractService {
    */
   private final Set<String> m_queuedUrls = Sets.newConcurrentHashSet();
 
+  /**
+   * An evicting cache which ensures that multiple requests for the same
+   * endpoint are not executed back-to-back. When enabled, a fixed period of
+   * time must pass before this service will make requests to a previously
+   * retrieved endpoint.
+   * <p/>
+   * If this cache is not enabled, then it will be {@code null}.
+   * <p/>
+   * For simplicity, this is a cache of URL to URL.
+   */
+  private Cache<String, String> m_ttlUrlCache;
+
 
   /**
    * The size of the worker queue (used for logged warnings about size).
@@ -166,6 +197,14 @@ public class MetricsRetrievalService extends AbstractService {
     m_restCache = CacheBuilder.newBuilder().expireAfterWrite(jmxCacheExpirationMinutes,
         TimeUnit.MINUTES).build();
 
+    // enable the TTL cache if configured; otherwise leave it as null
+    int ttlSeconds = m_configuration.getMetricsServiceRequestTTL();
+    boolean ttlCacheEnabled = m_configuration.isMetricsServiceRequestTTLCacheEnabled();
+    if (ttlCacheEnabled) {
+      m_ttlUrlCache = CacheBuilder.newBuilder().expireAfterWrite(ttlSeconds,
+          TimeUnit.SECONDS).build();
+    }
+
     // iniitalize the executor service
     int corePoolSize = m_configuration.getMetricsServiceThreadPoolCoreSize();
     int maxPoolSize = m_configuration.getMetricsServiceThreadPoolMaxSize();
@@ -187,6 +226,11 @@ public class MetricsRetrievalService extends AbstractService {
     LOG.info(
         "Initializing the Metrics Retrieval Service with core={}, max={}, workerQueue={}, threadPriority={}",
         corePoolSize, maxPoolSize, m_queueMaximumSize, threadPriority);
+
+    if (ttlCacheEnabled) {
+      LOG.info("Metrics Retrieval Service request TTL cache is enabled and set to {} seconds",
+          ttlSeconds);
+    }
   }
 
   /**
@@ -205,23 +249,34 @@ public class MetricsRetrievalService extends AbstractService {
   protected void doStop() {
     m_jmxCache.invalidateAll();
     m_restCache.invalidateAll();
+
+    if (null != m_ttlUrlCache) {
+      m_ttlUrlCache.invalidateAll();
+    }
+
     m_queuedUrls.clear();
     m_threadPoolExecutor.shutdownNow();
   }
 
   /**
-   * Submit a {@link JMXRunnable} for execution. This will run inside of an
-   * {@link ExecutorService} to retrieve JMX data from a URL endpoint and parse
-   * the result into a {@link JMXMetricHolder}.
+   * Submit a {@link Runnable} for execution which retrieves metric data from
+   * the supplied endpoint. This will run inside of an {@link ExecutorService}
+   * to retrieve metric data from a URL endpoint and parse the result into a
+   * cached value.
    * <p/>
-   * Once JMX data is retrieved it is cached. Data in the cache can be retrieved
-   * via {@link #getCachedJMXMetric(String)}.
+   * Once metric data is retrieved it is cached. Data in the cache can be
+   * retrieved via {@link #getCachedJMXMetric(String)} or
+   * {@link #getCachedRESTMetric(String)}, depending on the type of metric
+   * requested.
    * <p/>
    * Callers need not worry about invoking this mulitple times for the same URL
    * endpoint. A single endpoint will only be enqueued once regardless of how
    * many times this method is called until it has been fully retrieved and
-   * parsed.
+   * parsed. If the last endpoint request was too recent, then this method will
+   * opt to not make another call until the TTL period expires.
    *
+   * @param type
+   *          the type of service hosting the metric (not {@code null}).
    * @param streamProvider
    *          the {@link StreamProvider} to use to read from the remote
    *          endpoint.
@@ -230,62 +285,46 @@ public class MetricsRetrievalService extends AbstractService {
    *
    * @see #getCachedJMXMetric(String)
    */
-  public void submitJMXRequest(StreamProvider streamProvider, String jmxUrl) {
+  public void submitRequest(MetricSourceType type, StreamProvider streamProvider, String url) {
+    // check to ensure that the request isn't already queued
+    if (m_queuedUrls.contains(url)) {
+      return;
+    }
+
+    // check to ensure that the request wasn't made too recently
+    if (null != m_ttlUrlCache && null != m_ttlUrlCache.getIfPresent(url)) {
+      return;
+    }
+
     // log warnings if the queue size seems to be rather large
-    BlockingQueue<Runnable> queue =  m_threadPoolExecutor.getQueue();
+    BlockingQueue<Runnable> queue = m_threadPoolExecutor.getQueue();
     int queueSize = queue.size();
     if (queueSize > Math.floor(0.9f * m_queueMaximumSize)) {
       LOG.warn("The worker queue contains {} work items and is at {}% of capacity", queueSize,
           ((float) queueSize / m_queueMaximumSize) * 100);
     }
 
-    // don't enqueue another request for the same URL
-    if (m_queuedUrls.contains(jmxUrl)) {
-      return;
-    }
-
     // enqueue this URL
-    m_queuedUrls.add(jmxUrl);
-
-    JMXRunnable jmxRunnable = new JMXRunnable(m_jmxCache, m_queuedUrls, m_jmxObjectReader,
-        streamProvider, jmxUrl);
-
-    m_threadPoolExecutor.execute(jmxRunnable);
-  }
-
-  /**
-   * Submit a {@link RESTRunnable} for execution. This will run inside of an
-   * {@link ExecutorService} to retrieve JMX data from a URL endpoint and parse
-   * the result into a {@link Map} of {@link String}.
-   * <p/>
-   * Once REST data is retrieved it is cached. Data in the cache can be
-   * retrieved via {@link #getCachedRESTMetric(String)}.
-   * <p/>
-   * Callers need not worry about invoking this mulitple times for the same URL
-   * endpoint. A single endpoint will only be enqueued once regardless of how
-   * many times this method is called until it has been fully retrieved and
-   * parsed.
-   *
-   * @param streamProvider
-   *          the {@link StreamProvider} to use to read from the remote
-   *          endpoint.
-   * @param restUrl
-   *          the URL to read from
-   *
-   * @see #getCachedRESTMetric(String)
-   */
-  public void submitRESTRequest(StreamProvider streamProvider, String restUrl) {
-    if (m_queuedUrls.contains(restUrl)) {
-      return;
+    m_queuedUrls.add(url);
+
+    Runnable runnable = null;
+    switch (type) {
+      case JMX:
+        runnable = new JMXRunnable(m_jmxCache, m_queuedUrls, m_ttlUrlCache, m_jmxObjectReader,
+            streamProvider, url);
+        break;
+      case REST:
+        runnable = new RESTRunnable(m_restCache, m_queuedUrls, m_ttlUrlCache, m_gson,
+            streamProvider, url);
+        break;
+      default:
+        LOG.warn("Unable to retrieve metrics for the unknown type {}", type);
+        break;
     }
 
-    // enqueue this URL
-    m_queuedUrls.add(restUrl);
-
-    RESTRunnable restRunnable = new RESTRunnable(m_restCache, m_queuedUrls, m_gson,
-        streamProvider, restUrl);
-
-    m_threadPoolExecutor.execute(restRunnable);
+    if (null != runnable) {
+      m_threadPoolExecutor.execute(runnable);
+    }
   }
 
   /**
@@ -293,11 +332,13 @@ public class MetricsRetrievalService extends AbstractService {
    * is no metric data cached for the given URL, then {@code null} is returned.
    * <p/>
    * The onky way this cache is populated is by requesting the data to be loaded
-   * asynchronously via {@link #submit(JMXRunnable)}.
+   * asynchronously via
+   * {@link #submitRequest(MetricSourceType, StreamProvider, String)} with the
+   * {@link MetricSourceType#JMX} type.
    *
    * @param jmxUrl
    *          the URL to retrieve cached data for (not {@code null}).
-   * @return
+   * @return the metric, or {@code null} if none.
    */
   public JMXMetricHolder getCachedJMXMetric(String jmxUrl) {
     return m_jmxCache.getIfPresent(jmxUrl);
@@ -308,11 +349,13 @@ public class MetricsRetrievalService extends AbstractService {
    * metric data cached for the given URL, then {@code null} is returned.
    * <p/>
    * The onky way this cache is populated is by requesting the data to be loaded
-   * asynchronously via {@link #submit(JMXRunnable)}.
+   * asynchronously via
+   * {@link #submitRequest(MetricSourceType, StreamProvider, String)} with the
+   * {@link MetricSourceType#REST} type.
    *
    * @param restUrl
    *          the URL to retrieve cached data for (not {@code null}).
-   * @return
+   * @return the metric, or {@code null} if none.
    */
   public Map<String, String> getCachedRESTMetric(String restUrl) {
     return m_restCache.getIfPresent(restUrl);
@@ -339,6 +382,12 @@ public class MetricsRetrievalService extends AbstractService {
     private final Set<String> m_queuedUrls;
 
     /**
+     * An evicting cache used to control whether a request for a metric can be
+     * made or if it is too soon after the last request.
+     */
+    private final Cache<String, String> m_ttlUrlCache;
+
+    /**
      * Constructor.
      *
      * @param streamProvider
@@ -349,11 +398,17 @@ public class MetricsRetrievalService extends AbstractService {
      *          the URLs which are currently waiting to be processed. This
      *          method will remove the specified URL from this {@link Set} when
      *          it completes (successful or not).
+     * @param ttlUrlCache
+     *          an evicting cache which is used to determine if a request for a
+     *          metric is too soon after the last request, or {@code null} if
+     *          requests can be made sequentially without any separation.
      */
-    private MetricRunnable(StreamProvider streamProvider, String url, Set<String> queuedUrls) {
+    private MetricRunnable(StreamProvider streamProvider, String url, Set<String> queuedUrls,
+        Cache<String, String> ttlUrlCache) {
       m_streamProvider = streamProvider;
       m_url = url;
       m_queuedUrls = queuedUrls;
+      m_ttlUrlCache = ttlUrlCache;
     }
 
     /**
@@ -382,14 +437,19 @@ public class MetricsRetrievalService extends AbstractService {
         inputStream = m_streamProvider.readFrom(m_url);
         processInputStreamAndCacheResult(inputStream);
 
+        // cache the URL, but only after successful parsing of the response
+        if (null != m_ttlUrlCache) {
+          m_ttlUrlCache.put(m_url, m_url);
+        }
+
       } catch (Exception exception) {
         logException(exception, m_url);
       } finally {
+        IOUtils.closeQuietly(inputStream);
+
         // remove this URL from the list of queued URLs to ensure it will be
         // requested again
         m_queuedUrls.remove(m_url);
-
-        IOUtils.closeQuietly(inputStream);
       }
     }
 
@@ -454,19 +514,20 @@ public class MetricsRetrievalService extends AbstractService {
     private final ObjectReader m_jmxObjectReader;
     private final Cache<String, JMXMetricHolder> m_cache;
 
-
     /**
      * Constructor.
      *
      * @param cache
      * @param queuedUrls
+     * @param ttlUrlCache
      * @param jmxObjectReader
      * @param streamProvider
      * @param jmxUrl
      */
     private JMXRunnable(Cache<String, JMXMetricHolder> cache, Set<String> queuedUrls,
-        ObjectReader jmxObjectReader, StreamProvider streamProvider, String jmxUrl) {
-      super(streamProvider, jmxUrl, queuedUrls);
+        Cache<String, String> ttlUrlCache, ObjectReader jmxObjectReader,
+        StreamProvider streamProvider, String jmxUrl) {
+      super(streamProvider, jmxUrl, queuedUrls, ttlUrlCache);
       m_cache = cache;
       m_jmxObjectReader = jmxObjectReader;
     }
@@ -497,13 +558,15 @@ public class MetricsRetrievalService extends AbstractService {
      *
      * @param cache
      * @param queuedUrls
+     * @param ttlUrlCache
      * @param gson
      * @param streamProvider
      * @param restUrl
      */
     private RESTRunnable(Cache<String, Map<String, String>> cache, Set<String> queuedUrls,
-        Gson gson, StreamProvider streamProvider, String restUrl) {
-      super(streamProvider, restUrl, queuedUrls);
+        Cache<String, String> ttlUrlCache, Gson gson, StreamProvider streamProvider,
+        String restUrl) {
+      super(streamProvider, restUrl, queuedUrls, ttlUrlCache);
       m_cache = cache;
       m_gson = gson;
     }

http://git-wip-us.apache.org/repos/asf/ambari/blob/8efbde8f/ambari-server/src/test/java/org/apache/ambari/server/controller/metrics/JMXPropertyProviderTest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/controller/metrics/JMXPropertyProviderTest.java b/ambari-server/src/test/java/org/apache/ambari/server/controller/metrics/JMXPropertyProviderTest.java
index 80d7438..12a829f 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/controller/metrics/JMXPropertyProviderTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/controller/metrics/JMXPropertyProviderTest.java
@@ -89,7 +89,13 @@ public class JMXPropertyProviderTest {
   @BeforeClass
   public static void setupClass() {
     Injector injector = Guice.createInjector(new InMemoryDefaultTestModule());
-    JMXPropertyProvider.init(injector.getInstance(Configuration.class));
+
+    // disable request TTL for these tests
+    Configuration configuration = injector.getInstance(Configuration.class);
+    configuration.setProperty(Configuration.METRIC_RETRIEVAL_SERVICE_REQUEST_TTL_ENABLED.getKey(),
+        "false");
+
+    JMXPropertyProvider.init(configuration);
 
     metricPropertyProviderFactory = injector.getInstance(MetricPropertyProviderFactory.class);
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/8efbde8f/ambari-server/src/test/java/org/apache/ambari/server/controller/metrics/RestMetricsPropertyProviderTest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/controller/metrics/RestMetricsPropertyProviderTest.java b/ambari-server/src/test/java/org/apache/ambari/server/controller/metrics/RestMetricsPropertyProviderTest.java
index 304b42f..a9934a2 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/controller/metrics/RestMetricsPropertyProviderTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/controller/metrics/RestMetricsPropertyProviderTest.java
@@ -112,7 +112,13 @@ public class RestMetricsPropertyProviderTest {
     clusters = injector.getInstance(Clusters.class);
     clusters.addCluster("c1", new StackId("HDP-2.1.1"));
     c1 = clusters.getCluster("c1");
-    JMXPropertyProvider.init(injector.getInstance(Configuration.class));
+
+    // disable request TTL for these tests
+    Configuration configuration = injector.getInstance(Configuration.class);
+    configuration.setProperty(Configuration.METRIC_RETRIEVAL_SERVICE_REQUEST_TTL_ENABLED.getKey(),
+        "false");
+
+    JMXPropertyProvider.init(configuration);
 
     MetricsRetrievalService metricsRetrievalService = injector.getInstance(
         MetricsRetrievalService.class);

http://git-wip-us.apache.org/repos/asf/ambari/blob/8efbde8f/ambari-server/src/test/java/org/apache/ambari/server/state/services/MetricsRetrievalServiceTest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/state/services/MetricsRetrievalServiceTest.java b/ambari-server/src/test/java/org/apache/ambari/server/state/services/MetricsRetrievalServiceTest.java
new file mode 100644
index 0000000..784ba92
--- /dev/null
+++ b/ambari-server/src/test/java/org/apache/ambari/server/state/services/MetricsRetrievalServiceTest.java
@@ -0,0 +1,195 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.server.state.services;
+
+import java.io.InputStream;
+import java.util.Map;
+
+import javax.persistence.EntityManager;
+
+import org.apache.ambari.server.configuration.Configuration;
+import org.apache.ambari.server.controller.jmx.JMXMetricHolder;
+import org.apache.ambari.server.controller.utilities.StreamProvider;
+import org.apache.ambari.server.orm.DBAccessor;
+import org.apache.ambari.server.state.Cluster;
+import org.apache.ambari.server.state.Clusters;
+import org.apache.ambari.server.state.services.MetricsRetrievalService.MetricSourceType;
+import org.apache.ambari.server.state.stack.OsFamily;
+import org.apache.ambari.server.utils.SynchronousThreadPoolExecutor;
+import org.apache.commons.io.IOUtils;
+import org.easymock.EasyMock;
+import org.easymock.EasyMockSupport;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.inject.Binder;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+import com.google.inject.Module;
+
+import junit.framework.Assert;
+
+/**
+ * Tests the {@link MetricsRetrievalService}.
+ */
+public class MetricsRetrievalServiceTest extends EasyMockSupport {
+
+  private Injector m_injector;
+
+  private static final String JMX_URL = "http://jmx-endpoint";
+  private static final String REST_URL = "http://rest-endpoint";
+
+  MetricsRetrievalService m_service = new MetricsRetrievalService();
+
+  /**
+   * Creates a Guice injector from {@link MockModule} and injects the service before each test.
+   */
+  @Before
+  public void before() {
+    // build the injector from the mock bindings and inject the service's members
+    m_injector = Guice.createInjector(new MockModule());
+    m_injector.injectMembers(m_service);
+  }
+
+  /**
+   * Tests that cached lookups for URLs which were never requested return {@code null}.
+   */
+  @Test
+  public void testCachedValueRetrievalDoesNotRequest() throws Exception {
+    m_service.doStart();
+
+    JMXMetricHolder jmxMetricHolder = m_service.getCachedJMXMetric(JMX_URL);
+    Assert.assertNull(jmxMetricHolder);
+
+    Map<String, String> restMetrics = m_service.getCachedRESTMetric(REST_URL);
+    Assert.assertNull(restMetrics);
+  }
+
+  /**
+   * Tests that the JMX and REST caches are empty until a request is submitted for the URL.
+   */
+  @Test
+  public void testRetrievalOfMetrics() throws Exception {
+    InputStream jmxInputStream = IOUtils.toInputStream("{ \"beans\": [] }");
+    InputStream restInputStream = IOUtils.toInputStream("{}");
+
+    StreamProvider streamProvider = createNiceMock(StreamProvider.class);
+    EasyMock.expect(streamProvider.readFrom(JMX_URL)).andReturn(jmxInputStream).once();
+    EasyMock.expect(streamProvider.readFrom(REST_URL)).andReturn(restInputStream).once();
+
+    replayAll();
+
+    m_service.doStart();
+
+    // make the service synchronous so the cache can be asserted right after submitRequest
+    m_service.setThreadPoolExecutor(new SynchronousThreadPoolExecutor());
+
+    JMXMetricHolder jmxMetricHolder = m_service.getCachedJMXMetric(JMX_URL);
+    Assert.assertNull(jmxMetricHolder);
+
+    Map<String, String> restMetrics = m_service.getCachedRESTMetric(REST_URL);
+    Assert.assertNull(restMetrics);
+
+    m_service.submitRequest(MetricSourceType.JMX, streamProvider, JMX_URL);
+    jmxMetricHolder = m_service.getCachedJMXMetric(JMX_URL);
+    Assert.assertNotNull(jmxMetricHolder);
+
+    m_service.submitRequest(MetricSourceType.REST, streamProvider, REST_URL);
+    restMetrics = m_service.getCachedRESTMetric(REST_URL);
+    Assert.assertNotNull(restMetrics);
+
+    verifyAll();
+  }
+
+  /**
+   * Tests that 100 rapid requests to the same URL, with the request TTL
+   * configuration left untouched, invoke the stream provider only once.
+   */
+  @Test
+  public void testRequestTTL() throws Exception {
+    InputStream jmxInputStream = IOUtils.toInputStream("{ \"beans\": [] }");
+
+    // only allow a single call to the mock
+    StreamProvider streamProvider = createStrictMock(StreamProvider.class);
+    EasyMock.expect(streamProvider.readFrom(JMX_URL)).andReturn(jmxInputStream).once();
+
+    replayAll();
+
+    m_service.doStart();
+
+    // make the service synchronous so every submitRequest has run before verifyAll()
+    m_service.setThreadPoolExecutor(new SynchronousThreadPoolExecutor());
+
+    // make 100 requests in rapid succession to the same URL
+    for (int i = 0; i < 100; i++) {
+      m_service.submitRequest(MetricSourceType.JMX, streamProvider, JMX_URL);
+    }
+
+    verifyAll();
+  }
+
+  /**
+   * Tests that setting the request TTL enabled property to {@code "false"}
+   * lets every one of 100 requests for the same URL reach the stream provider.
+   */
+  @Test
+  public void testRequestTTLDisabled() throws Exception {
+    Configuration configuration = m_injector.getInstance(Configuration.class);
+    configuration.setProperty(
+        Configuration.METRIC_RETRIEVAL_SERVICE_REQUEST_TTL_ENABLED.getKey(), "false");
+
+    InputStream jmxInputStream = IOUtils.toInputStream("{ \"beans\": [] }");
+
+    // allow exactly 100 calls to the mock; each one returns the same stream instance
+    StreamProvider streamProvider = createStrictMock(StreamProvider.class);
+    EasyMock.expect(streamProvider.readFrom(JMX_URL)).andReturn(jmxInputStream).times(100);
+
+    replayAll();
+
+    m_service.doStart();
+
+    // make the service synchronous so every submitRequest has run before verifyAll()
+    m_service.setThreadPoolExecutor(new SynchronousThreadPoolExecutor());
+
+    // make 100 requests in rapid succession to the same URL
+    for (int i = 0; i < 100; i++) {
+      m_service.submitRequest(MetricSourceType.JMX, streamProvider, JMX_URL);
+    }
+
+    verifyAll();
+  }
+
+  /**
+   * Guice module supplying the mock bindings for the injector created in {@link #before()}.
+   */
+  private class MockModule implements Module {
+    /**
+     * Binds nice mocks; the Cluster mock is made via static EasyMock, the rest via this test.
+     */
+    @Override
+    public void configure(Binder binder) {
+      Cluster cluster = EasyMock.createNiceMock(Cluster.class);
+      binder.bind(Clusters.class).toInstance(createNiceMock(Clusters.class));
+      binder.bind(OsFamily.class).toInstance(createNiceMock(OsFamily.class));
+      binder.bind(DBAccessor.class).toInstance(createNiceMock(DBAccessor.class));
+      binder.bind(Cluster.class).toInstance(cluster);
+      binder.bind(EntityManager.class).toInstance(createNiceMock(EntityManager.class));
+    }
+  }
+
+}