You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by ao...@apache.org on 2017/04/11 11:14:27 UTC

[05/22] ambari git commit: AMBARI-20591 In case of HA-enabled cluster on shutting down Active Master, Ambari rest api call and HBase quick links show two Active Masters (dsen)

AMBARI-20591 In case of HA-enabled cluster on shutting down Active Master, Ambari rest api call and HBase quick links show two Active Masters (dsen)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/b2bcc38e
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/b2bcc38e
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/b2bcc38e

Branch: refs/heads/branch-3.0-perf
Commit: b2bcc38ed917f3b43fa21133f64cdb4642e3c2e1
Parents: 8de3961
Author: Dmytro Sen <ds...@apache.org>
Authored: Thu Apr 6 15:13:52 2017 +0300
Committer: Andrew Onishuk <ao...@hortonworks.com>
Committed: Tue Apr 11 14:14:12 2017 +0300

----------------------------------------------------------------------
 .../state/services/MetricsRetrievalService.java | 29 ++++++++-
 .../services/MetricsRetrievalServiceTest.java   | 63 ++++++++++++++++++++
 2 files changed, 91 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/b2bcc38e/ambari-server/src/main/java/org/apache/ambari/server/state/services/MetricsRetrievalService.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/services/MetricsRetrievalService.java b/ambari-server/src/main/java/org/apache/ambari/server/state/services/MetricsRetrievalService.java
index 79e0e25..59ec15b 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/state/services/MetricsRetrievalService.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/state/services/MetricsRetrievalService.java
@@ -18,6 +18,7 @@
 package org.apache.ambari.server.state.services;
 
 import java.io.BufferedReader;
+import java.io.IOException;
 import java.io.InputStream;
 import java.io.InputStreamReader;
 import java.lang.Thread.UncaughtExceptionHandler;
@@ -441,7 +442,12 @@ public class MetricsRetrievalService extends AbstractService {
         if (null != m_ttlUrlCache) {
           m_ttlUrlCache.put(m_url, m_url);
         }
-
+      } catch (IOException exception)
+      {
+        LOG.debug("Removing cached values for url {}", m_url);
+        // Ensure old values are removed, because they may no longer be valid if the state has changed.
+        removeCachedMetricsForCurrentURL();
+        logException(exception, m_url);
       } catch (Exception exception) {
         logException(exception, m_url);
       } finally {
@@ -454,6 +460,11 @@ public class MetricsRetrievalService extends AbstractService {
     }
 
     /**
+     * Removes the cached metric values for the current URL from the cache.
+     */
+    protected abstract void removeCachedMetricsForCurrentURL();
+
+    /**
      * Reads data from the specified {@link InputStream} and processes that into
      * a cachable value. The value will then be cached by this method.
      *
@@ -536,6 +547,14 @@ public class MetricsRetrievalService extends AbstractService {
      * {@inheritDoc}
      */
     @Override
+    protected void removeCachedMetricsForCurrentURL() {
+      m_cache.invalidate(m_url);
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
     protected void processInputStreamAndCacheResult(InputStream inputStream) throws Exception {
       JMXMetricHolder jmxMetricHolder = m_jmxObjectReader.readValue(inputStream);
       m_cache.put(m_url, jmxMetricHolder);
@@ -575,6 +594,14 @@ public class MetricsRetrievalService extends AbstractService {
      * {@inheritDoc}
      */
     @Override
+    protected void removeCachedMetricsForCurrentURL() {
+      m_cache.invalidate(m_url);
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
     protected void processInputStreamAndCacheResult(InputStream inputStream) throws Exception {
       Type type = new TypeToken<Map<Object, Object>>() {}.getType();
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/b2bcc38e/ambari-server/src/test/java/org/apache/ambari/server/state/services/MetricsRetrievalServiceTest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/state/services/MetricsRetrievalServiceTest.java b/ambari-server/src/test/java/org/apache/ambari/server/state/services/MetricsRetrievalServiceTest.java
index 784ba92..ea204aa 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/state/services/MetricsRetrievalServiceTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/state/services/MetricsRetrievalServiceTest.java
@@ -17,6 +17,7 @@
  */
 package org.apache.ambari.server.state.services;
 
+import java.io.IOException;
 import java.io.InputStream;
 import java.util.Map;
 
@@ -117,6 +118,68 @@ public class MetricsRetrievalServiceTest extends EasyMockSupport {
   }
 
   /**
+   * Tests that cached values are removed when a request fails with an IOException.
+   */
+  @Test
+  public void testRemovingValuesFromCacheOnFail() throws Exception {
+
+    Configuration configuration = m_injector.getInstance(Configuration.class);
+    configuration.setProperty(
+            Configuration.METRIC_RETRIEVAL_SERVICE_REQUEST_TTL.getKey(), "1");
+
+    InputStream jmxInputStream = IOUtils.toInputStream("{ \"beans\": [] }");
+    InputStream restInputStream = IOUtils.toInputStream("{}");
+
+    StreamProvider streamProvider = createNiceMock(StreamProvider.class);
+
+    EasyMock.expect(streamProvider.readFrom(JMX_URL)).andReturn(jmxInputStream).once();
+    EasyMock.expect(streamProvider.readFrom(REST_URL)).andReturn(restInputStream).once();
+
+    EasyMock.expect(streamProvider.readFrom(JMX_URL)).andThrow(new IOException()).once();
+    EasyMock.expect(streamProvider.readFrom(REST_URL)).andThrow(new IOException()).once();
+
+    replayAll();
+
+    m_service.doStart();
+
+    // make the service synchronous
+    m_service.setThreadPoolExecutor(new SynchronousThreadPoolExecutor());
+
+    JMXMetricHolder jmxMetricHolder = m_service.getCachedJMXMetric(JMX_URL);
+    Assert.assertNull(jmxMetricHolder);
+
+    Map<String, String> restMetrics = m_service.getCachedRESTMetric(REST_URL);
+    Assert.assertNull(restMetrics);
+
+    m_service.submitRequest(MetricSourceType.JMX, streamProvider, JMX_URL);
+    jmxMetricHolder = m_service.getCachedJMXMetric(JMX_URL);
+    Assert.assertNotNull(jmxMetricHolder);
+
+    m_service.submitRequest(MetricSourceType.REST, streamProvider, REST_URL);
+    restMetrics = m_service.getCachedRESTMetric(REST_URL);
+    Assert.assertNotNull(restMetrics);
+
+
+    jmxMetricHolder = m_service.getCachedJMXMetric(JMX_URL);
+    Assert.assertNotNull(jmxMetricHolder);
+
+    restMetrics = m_service.getCachedRESTMetric(REST_URL);
+    Assert.assertNotNull(restMetrics);
+
+    Thread.sleep(1000);
+
+    m_service.submitRequest(MetricSourceType.JMX, streamProvider, JMX_URL);
+    jmxMetricHolder = m_service.getCachedJMXMetric(JMX_URL);
+    Assert.assertNull(jmxMetricHolder);
+
+    m_service.submitRequest(MetricSourceType.REST, streamProvider, REST_URL);
+    restMetrics = m_service.getCachedRESTMetric(REST_URL);
+    Assert.assertNull(restMetrics);
+
+    verifyAll();
+  }
+
+  /**
    * Tests that many requests to the same URL do not invoke the stream provider
    * more than once.
    */