You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by sw...@apache.org on 2016/04/29 01:21:11 UTC

ambari git commit: AMBARI-16143. Ambari metrics API call should allow for early failure. (swagle)

Repository: ambari
Updated Branches:
  refs/heads/trunk a4af94292 -> 35b4ef6fe


AMBARI-16143. Ambari metrics API call should allow for early failure. (swagle)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/35b4ef6f
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/35b4ef6f
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/35b4ef6f

Branch: refs/heads/trunk
Commit: 35b4ef6fe18b6a88abdef9998e41010c9035f293
Parents: a4af942
Author: Siddharth Wagle <sw...@hortonworks.com>
Authored: Thu Apr 28 16:20:35 2016 -0700
Committer: Siddharth Wagle <sw...@hortonworks.com>
Committed: Thu Apr 28 16:20:41 2016 -0700

----------------------------------------------------------------------
 .../metrics/MetricsPropertyProvider.java        |  2 +-
 .../metrics/timeline/AMSPropertyProvider.java   | 44 ++++-------
 .../timeline/AMSReportPropertyProvider.java     | 19 +++--
 .../metrics/timeline/MetricsRequestHelper.java  |  2 +-
 .../timeline/cache/TimelineMetricCache.java     | 19 ++++-
 .../cache/TimelineMetricCacheEntryFactory.java  |  2 +
 .../timeline/AMSPropertyProviderTest.java       | 81 +++++++++++++++++++-
 7 files changed, 130 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/35b4ef6f/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/MetricsPropertyProvider.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/MetricsPropertyProvider.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/MetricsPropertyProvider.java
index a346051..61d7a17 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/MetricsPropertyProvider.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/MetricsPropertyProvider.java
@@ -132,7 +132,7 @@ public abstract class MetricsPropertyProvider extends AbstractPropertyProvider {
       return resources;
     }
 
-    if(!checkAuthorizationForMetrics(resources, clusterNamePropertyId)) {
+    if (!checkAuthorizationForMetrics(resources, clusterNamePropertyId)) {
       return resources;
     }
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/35b4ef6f/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/AMSPropertyProvider.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/AMSPropertyProvider.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/AMSPropertyProvider.java
index 4bc9fd7..9a7454c 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/AMSPropertyProvider.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/AMSPropertyProvider.java
@@ -196,7 +196,7 @@ public abstract class AMSPropertyProvider extends MetricsPropertyProvider {
      * @throws SystemException if unable to populate the resources
      */
     @SuppressWarnings("unchecked")
-    public Collection<Resource> populateResources() throws SystemException {
+    public Collection<Resource> populateResources() throws SystemException, IOException {
       // No open ended query support.
       if (temporalInfo != null && (temporalInfo.getStartTime() == null
           || temporalInfo.getEndTime() == null)) {
@@ -218,20 +218,10 @@ public abstract class AMSPropertyProvider extends MetricsPropertyProvider {
         if (!hostComponentHostMetrics.isEmpty()) {
           String hostComponentHostMetricParams = getSetString(processRegexps(hostComponentHostMetrics), -1);
           setQueryParams(hostComponentHostMetricParams, hostnames, true, componentName);
-          TimelineMetrics metricsResponse = null;
-          try {
-            metricsResponse = getTimelineMetricsFromCache(
+          TimelineMetrics metricsResponse = getTimelineMetricsFromCache(
               getTimelineAppMetricCacheKey(hostComponentHostMetrics,
                 componentName, hostnames, uriBuilder.toString()), componentName);
-          } catch (IOException e) {
-            if (LOG.isDebugEnabled()) {
-              LOG.debug("Caught exception fetching metric data.", e);
-            }
-            // Skip further queries to preempt long calls due to timeout
-            if (e instanceof SocketTimeoutException) {
-              return Collections.emptySet();
-            }
-          }
+
           if (metricsResponse != null) {
             timelineMetrics.getMetrics().addAll(metricsResponse.getMetrics());
           }
@@ -240,20 +230,10 @@ public abstract class AMSPropertyProvider extends MetricsPropertyProvider {
         if (!nonHostComponentMetrics.isEmpty()) {
           String nonHostComponentHostMetricParams = getSetString(processRegexps(nonHostComponentMetrics), -1);
           setQueryParams(nonHostComponentHostMetricParams, hostnames, false, componentName);
-          TimelineMetrics metricsResponse = null;
-          try {
-            metricsResponse = getTimelineMetricsFromCache(
+          TimelineMetrics metricsResponse = getTimelineMetricsFromCache(
               getTimelineAppMetricCacheKey(nonHostComponentMetrics,
                 componentName, hostnames, uriBuilder.toString()), componentName);
-          } catch (IOException e) {
-            if (LOG.isDebugEnabled()) {
-              LOG.debug("Caught exception fetching metric data.", e);
-            }
-            // Skip further queries to preempt long calls due to timeout
-            if (e instanceof SocketTimeoutException) {
-              return Collections.emptySet();
-            }
-          }
+
           if (metricsResponse != null) {
             timelineMetrics.getMetrics().addAll(metricsResponse.getMetrics());
           }
@@ -485,8 +465,18 @@ public abstract class AMSPropertyProvider extends MetricsPropertyProvider {
     // For each cluster
     for (Map.Entry<String, Map<TemporalInfo, MetricsRequest>> clusterEntry : requestMap.entrySet()) {
       // For each request
-      for (MetricsRequest metricsRequest : clusterEntry.getValue().values() ) {
-        metricsRequest.populateResources();
+      for (MetricsRequest metricsRequest : clusterEntry.getValue().values()) {
+        try {
+          metricsRequest.populateResources();
+        } catch (IOException io) {
+          // Skip further queries to preempt long calls due to timeout
+          if (io instanceof SocketTimeoutException) {
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("Skip populating resources on socket timeout.");
+            }
+            break;
+          }
+        }
       }
     }
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/35b4ef6f/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/AMSReportPropertyProvider.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/AMSReportPropertyProvider.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/AMSReportPropertyProvider.java
index 306390c..3688742 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/AMSReportPropertyProvider.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/AMSReportPropertyProvider.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
 import org.apache.http.client.utils.URIBuilder;
 
 import java.io.IOException;
+import java.net.SocketTimeoutException;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
@@ -204,15 +205,21 @@ public class AMSReportPropertyProvider extends MetricsReportPropertyProvider {
 
       // Self populating cache updates itself on every get with latest results
       TimelineMetrics timelineMetrics;
-      if (metricCache != null && metricCacheKey.getTemporalInfo() != null) {
-        timelineMetrics = metricCache.getAppTimelineMetricsFromCache(metricCacheKey);
-      } else {
-        try {
+      try {
+        if (metricCache != null && metricCacheKey.getTemporalInfo() != null) {
+          timelineMetrics = metricCache.getAppTimelineMetricsFromCache(metricCacheKey);
+        } else {
           timelineMetrics = requestHelper.fetchTimelineMetrics(uriBuilder,
             temporalInfo.getStartTimeMillis(),
             temporalInfo.getEndTimeMillis());
-        } catch (IOException e) {
-          timelineMetrics = null;
+        }
+      } catch (IOException io) {
+        timelineMetrics = null;
+        if (io instanceof SocketTimeoutException) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Skip populating metrics on socket timeout exception.");
+          }
+          break;
         }
       }
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/35b4ef6f/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/MetricsRequestHelper.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/MetricsRequestHelper.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/MetricsRequestHelper.java
index 94014f8..388de15 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/MetricsRequestHelper.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/MetricsRequestHelper.java
@@ -117,7 +117,7 @@ public class MetricsRequestHelper {
       }
 
       if (io instanceof SocketTimeoutException) {
-        errorMsg += " Can not connect to collector, socket error.";
+        errorMsg += " Cannot connect to collector: SocketTimeoutException.";
         LOG.error(errorMsg);
         throw io;
       }

http://git-wip-us.apache.org/repos/asf/ambari/blob/35b4ef6f/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/cache/TimelineMetricCache.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/cache/TimelineMetricCache.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/cache/TimelineMetricCache.java
index 8b17a23..b5fe05e 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/cache/TimelineMetricCache.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/cache/TimelineMetricCache.java
@@ -30,6 +30,8 @@ import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
+import java.net.SocketTimeoutException;
 import java.util.ArrayList;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -54,7 +56,7 @@ public class TimelineMetricCache extends UpdatingSelfPopulatingCache {
    * @param key @TimelineAppMetricCacheKey
    * @return @org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics
    */
-  public TimelineMetrics getAppTimelineMetricsFromCache(TimelineAppMetricCacheKey key) throws IllegalArgumentException {
+  public TimelineMetrics getAppTimelineMetricsFromCache(TimelineAppMetricCacheKey key) throws IllegalArgumentException, IOException {
     if (LOG.isDebugEnabled()) {
       LOG.debug("Fetching metrics with key: " + key);
     }
@@ -62,7 +64,20 @@ public class TimelineMetricCache extends UpdatingSelfPopulatingCache {
     // Make sure key is valid
     validateKey(key);
 
-    Element element = get(key);
+    Element element = null;
+    try {
+      element = get(key);
+    } catch (LockTimeoutException le) {
+      // Ehcache masks the SocketTimeoutException so that it looks like a LockTimeoutException
+      Throwable t = le.getCause();
+      if (t instanceof CacheException) {
+        t = t.getCause();
+        if (t instanceof SocketTimeoutException) {
+          throw new SocketTimeoutException(t.getMessage());
+        }
+      }
+    }
+
     TimelineMetrics timelineMetrics = new TimelineMetrics();
     if (element != null && element.getObjectValue() != null) {
       TimelineMetricsCacheValue value = (TimelineMetricsCacheValue) element.getObjectValue();

http://git-wip-us.apache.org/repos/asf/ambari/blob/35b4ef6f/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/cache/TimelineMetricCacheEntryFactory.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/cache/TimelineMetricCacheEntryFactory.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/cache/TimelineMetricCacheEntryFactory.java
index ed0f878..4880d98 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/cache/TimelineMetricCacheEntryFactory.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/cache/TimelineMetricCacheEntryFactory.java
@@ -91,6 +91,7 @@ public class TimelineMetricCacheEntryFactory implements UpdatingCacheEntryFactor
         metricCacheKey.getTemporalInfo().getEndTimeMillis());
     } catch (IOException io) {
       LOG.debug("Caught IOException on fetching metrics. " + io.getMessage());
+      throw io;
     }
 
     TimelineMetricsCacheValue value = null;
@@ -191,6 +192,7 @@ public class TimelineMetricCacheEntryFactory implements UpdatingCacheEntryFactor
         if (LOG.isDebugEnabled()) {
           LOG.debug("Exception retrieving metrics.", io);
         }
+        throw io;
       }
     } else {
       LOG.debug("Skip updating cache with new startTime = " +

http://git-wip-us.apache.org/repos/asf/ambari/blob/35b4ef6f/ambari-server/src/test/java/org/apache/ambari/server/controller/metrics/timeline/AMSPropertyProviderTest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/controller/metrics/timeline/AMSPropertyProviderTest.java b/ambari-server/src/test/java/org/apache/ambari/server/controller/metrics/timeline/AMSPropertyProviderTest.java
index 3adf9f7..c7dabf1 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/controller/metrics/timeline/AMSPropertyProviderTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/controller/metrics/timeline/AMSPropertyProviderTest.java
@@ -23,6 +23,7 @@ import org.apache.ambari.server.configuration.ComponentSSLConfiguration;
 import org.apache.ambari.server.configuration.Configuration;
 import org.apache.ambari.server.controller.AmbariManagementController;
 import org.apache.ambari.server.controller.AmbariServer;
+import org.apache.ambari.server.controller.internal.AbstractPropertyProvider;
 import org.apache.ambari.server.controller.internal.PropertyInfo;
 import org.apache.ambari.server.controller.internal.ResourceImpl;
 import org.apache.ambari.server.controller.internal.TemporalInfoImpl;
@@ -43,6 +44,7 @@ import org.apache.ambari.server.state.Cluster;
 import org.apache.ambari.server.state.Clusters;
 import org.apache.ambari.server.state.ComponentInfo;
 import org.apache.ambari.server.state.StackId;
+import org.apache.commons.httpclient.HttpConnection;
 import org.apache.http.client.utils.URIBuilder;
 import org.easymock.EasyMock;
 import org.junit.After;
@@ -50,16 +52,21 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
+import org.mockito.Matchers;
 import org.powermock.api.easymock.PowerMock;
 import org.powermock.core.classloader.annotations.PowerMockIgnore;
 import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
 import org.springframework.security.core.context.SecurityContextHolder;
 
+import javax.ws.rs.HttpMethod;
 import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
 import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.net.HttpURLConnection;
+import java.net.SocketTimeoutException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -70,9 +77,14 @@ import java.util.Set;
 
 import static org.apache.ambari.server.controller.metrics.MetricsServiceProvider.MetricsService;
 import static org.easymock.EasyMock.anyObject;
+import static org.easymock.EasyMock.createMockBuilder;
 import static org.easymock.EasyMock.createNiceMock;
 import static org.easymock.EasyMock.expect;
 import static org.easymock.EasyMock.replay;
+import static org.easymock.EasyMock.verify;
+import static org.mockito.Matchers.anyCollection;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 
 @RunWith(PowerMockRunner.class)
@@ -934,6 +946,72 @@ public class AMSPropertyProviderTest {
     }
   }
 
+  @Test
+  public void testSocketTimeoutExceptionBehavior() throws Exception {
+    setUpCommonMocks();
+
+    SecurityContextHolder.getContext().setAuthentication(
+      TestAuthenticationFactory.createClusterAdministrator("ClusterAdmin", 2L));
+
+    URLStreamProvider streamProvider = createNiceMock(URLStreamProvider.class);
+    HttpURLConnection connection = createNiceMock(HttpURLConnection.class);
+
+    expect(streamProvider.processURL((String) anyObject(), (String) anyObject(),
+      (String) anyObject(),  (Map<String, List<String>>) anyObject())).andReturn(connection);
+
+    expect(connection.getInputStream()).andThrow(
+      new SocketTimeoutException("Unit test raising Exception")).once();
+
+    replay(streamProvider, connection);
+
+    injectCacheEntryFactoryWithStreamProvider(streamProvider);
+    TestMetricHostProvider metricHostProvider = new TestMetricHostProvider();
+    ComponentSSLConfiguration sslConfiguration = mock(ComponentSSLConfiguration.class);
+
+    Map<String, Map<String, PropertyInfo>> propertyIds = PropertyHelper.getMetricPropertyIds(Resource.Type.Host);
+
+    AMSPropertyProvider propertyProvider = new AMSHostPropertyProvider(
+      propertyIds,
+      streamProvider,
+      sslConfiguration,
+      new TimelineMetricCacheProvider(new Configuration(), cacheEntryFactory),
+      metricHostProvider,
+      CLUSTER_NAME_PROPERTY_ID,
+      HOST_NAME_PROPERTY_ID
+    );
+
+    final Resource resource1 = new ResourceImpl(Resource.Type.Host);
+    resource1.setProperty(CLUSTER_NAME_PROPERTY_ID, "c1");
+    resource1.setProperty(HOST_NAME_PROPERTY_ID, "h1");
+    final Resource resource2 = new ResourceImpl(Resource.Type.Host);
+    resource2.setProperty(CLUSTER_NAME_PROPERTY_ID, "c1");
+    resource2.setProperty(HOST_NAME_PROPERTY_ID, "h2");
+
+    // Use separate temporal info per property to ensure multiple requests are made
+    Map<String, TemporalInfo> temporalInfoMap = new HashMap<String, TemporalInfo>();
+    temporalInfoMap.put(PROPERTY_ID1, new TemporalInfoImpl(1416445244801L, 1416448936464L, 1L));
+    temporalInfoMap.put(PROPERTY_ID2, new TemporalInfoImpl(1416445344901L, 1416448946564L, 1L));
+
+    Request request = PropertyHelper.getReadRequest(
+      new HashSet<String>() {{
+        add(PROPERTY_ID1);
+        add(PROPERTY_ID2);
+      }}, temporalInfoMap);
+
+    Set<Resource> resources =
+      propertyProvider.populateResources(
+        new HashSet<Resource>() {{ add(resource1); add(resource2); }}, request, null);
+
+    verify(streamProvider, connection);
+
+    Assert.assertEquals(2, resources.size());
+    Resource res = resources.iterator().next();
+    Map<String, Object> properties = PropertyHelper.getProperties(resources.iterator().next());
+    Assert.assertNotNull(properties);
+    Assert.assertNull(res.getPropertyValue(PROPERTY_ID1));
+    Assert.assertNull(res.getPropertyValue(PROPERTY_ID2));
+  }
+
   public static class TestMetricHostProvider implements MetricHostProvider {
 
     private String hostName;
@@ -992,7 +1070,7 @@ public class AMSPropertyProviderTest {
     expect(ams.getAmbariMetaInfo()).andReturn(ambariMetaInfo).anyTimes();
     expect(ambariMetaInfo.getComponentToService(anyObject(String.class),
       anyObject(String.class), anyObject(String.class))).andReturn("HDFS").anyTimes();
-    expect(ambariMetaInfo.getComponent(anyObject(String.class),anyObject(String.class),
+    expect(ambariMetaInfo.getComponent(anyObject(String.class), anyObject(String.class),
       anyObject(String.class), anyObject(String.class)))
       .andReturn(componentInfo).anyTimes();
     expect(componentInfo.getTimelineAppid()).andReturn(null).anyTimes();
@@ -1007,5 +1085,4 @@ public class AMSPropertyProviderTest {
     field.setAccessible(true);
     field.set(cacheEntryFactory, new MetricsRequestHelper(streamProvider));
   }
-
 }