You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by sw...@apache.org on 2016/04/29 01:21:11 UTC
ambari git commit: AMBARI-16143. Ambari metrics API call should allow for early failure. (swagle)
Repository: ambari
Updated Branches:
refs/heads/trunk a4af94292 -> 35b4ef6fe
AMBARI-16143. Ambari metrics API call should allow for early failure. (swagle)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/35b4ef6f
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/35b4ef6f
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/35b4ef6f
Branch: refs/heads/trunk
Commit: 35b4ef6fe18b6a88abdef9998e41010c9035f293
Parents: a4af942
Author: Siddharth Wagle <sw...@hortonworks.com>
Authored: Thu Apr 28 16:20:35 2016 -0700
Committer: Siddharth Wagle <sw...@hortonworks.com>
Committed: Thu Apr 28 16:20:41 2016 -0700
----------------------------------------------------------------------
.../metrics/MetricsPropertyProvider.java | 2 +-
.../metrics/timeline/AMSPropertyProvider.java | 44 ++++-------
.../timeline/AMSReportPropertyProvider.java | 19 +++--
.../metrics/timeline/MetricsRequestHelper.java | 2 +-
.../timeline/cache/TimelineMetricCache.java | 19 ++++-
.../cache/TimelineMetricCacheEntryFactory.java | 2 +
.../timeline/AMSPropertyProviderTest.java | 81 +++++++++++++++++++-
7 files changed, 130 insertions(+), 39 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ambari/blob/35b4ef6f/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/MetricsPropertyProvider.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/MetricsPropertyProvider.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/MetricsPropertyProvider.java
index a346051..61d7a17 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/MetricsPropertyProvider.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/MetricsPropertyProvider.java
@@ -132,7 +132,7 @@ public abstract class MetricsPropertyProvider extends AbstractPropertyProvider {
return resources;
}
- if(!checkAuthorizationForMetrics(resources, clusterNamePropertyId)) {
+ if (!checkAuthorizationForMetrics(resources, clusterNamePropertyId)) {
return resources;
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/35b4ef6f/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/AMSPropertyProvider.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/AMSPropertyProvider.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/AMSPropertyProvider.java
index 4bc9fd7..9a7454c 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/AMSPropertyProvider.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/AMSPropertyProvider.java
@@ -196,7 +196,7 @@ public abstract class AMSPropertyProvider extends MetricsPropertyProvider {
* @throws SystemException if unable to populate the resources
*/
@SuppressWarnings("unchecked")
- public Collection<Resource> populateResources() throws SystemException {
+ public Collection<Resource> populateResources() throws SystemException, IOException {
// No open ended query support.
if (temporalInfo != null && (temporalInfo.getStartTime() == null
|| temporalInfo.getEndTime() == null)) {
@@ -218,20 +218,10 @@ public abstract class AMSPropertyProvider extends MetricsPropertyProvider {
if (!hostComponentHostMetrics.isEmpty()) {
String hostComponentHostMetricParams = getSetString(processRegexps(hostComponentHostMetrics), -1);
setQueryParams(hostComponentHostMetricParams, hostnames, true, componentName);
- TimelineMetrics metricsResponse = null;
- try {
- metricsResponse = getTimelineMetricsFromCache(
+ TimelineMetrics metricsResponse = getTimelineMetricsFromCache(
getTimelineAppMetricCacheKey(hostComponentHostMetrics,
componentName, hostnames, uriBuilder.toString()), componentName);
- } catch (IOException e) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Caught exception fetching metric data.", e);
- }
- // Skip further queries to preempt long calls due to timeout
- if (e instanceof SocketTimeoutException) {
- return Collections.emptySet();
- }
- }
+
if (metricsResponse != null) {
timelineMetrics.getMetrics().addAll(metricsResponse.getMetrics());
}
@@ -240,20 +230,10 @@ public abstract class AMSPropertyProvider extends MetricsPropertyProvider {
if (!nonHostComponentMetrics.isEmpty()) {
String nonHostComponentHostMetricParams = getSetString(processRegexps(nonHostComponentMetrics), -1);
setQueryParams(nonHostComponentHostMetricParams, hostnames, false, componentName);
- TimelineMetrics metricsResponse = null;
- try {
- metricsResponse = getTimelineMetricsFromCache(
+ TimelineMetrics metricsResponse = getTimelineMetricsFromCache(
getTimelineAppMetricCacheKey(nonHostComponentMetrics,
componentName, hostnames, uriBuilder.toString()), componentName);
- } catch (IOException e) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Caught exception fetching metric data.", e);
- }
- // Skip further queries to preempt long calls due to timeout
- if (e instanceof SocketTimeoutException) {
- return Collections.emptySet();
- }
- }
+
if (metricsResponse != null) {
timelineMetrics.getMetrics().addAll(metricsResponse.getMetrics());
}
@@ -485,8 +465,18 @@ public abstract class AMSPropertyProvider extends MetricsPropertyProvider {
// For each cluster
for (Map.Entry<String, Map<TemporalInfo, MetricsRequest>> clusterEntry : requestMap.entrySet()) {
// For each request
- for (MetricsRequest metricsRequest : clusterEntry.getValue().values() ) {
- metricsRequest.populateResources();
+ for (MetricsRequest metricsRequest : clusterEntry.getValue().values()) {
+ try {
+ metricsRequest.populateResources();
+ } catch (IOException io) {
+ // Skip further queries to preempt long calls due to timeout
+ if (io instanceof SocketTimeoutException) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Skip populating resources on socket timeout.");
+ }
+ break;
+ }
+ }
}
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/35b4ef6f/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/AMSReportPropertyProvider.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/AMSReportPropertyProvider.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/AMSReportPropertyProvider.java
index 306390c..3688742 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/AMSReportPropertyProvider.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/AMSReportPropertyProvider.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
import org.apache.http.client.utils.URIBuilder;
import java.io.IOException;
+import java.net.SocketTimeoutException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
@@ -204,15 +205,21 @@ public class AMSReportPropertyProvider extends MetricsReportPropertyProvider {
// Self populating cache updates itself on every get with latest results
TimelineMetrics timelineMetrics;
- if (metricCache != null && metricCacheKey.getTemporalInfo() != null) {
- timelineMetrics = metricCache.getAppTimelineMetricsFromCache(metricCacheKey);
- } else {
- try {
+ try {
+ if (metricCache != null && metricCacheKey.getTemporalInfo() != null) {
+ timelineMetrics = metricCache.getAppTimelineMetricsFromCache(metricCacheKey);
+ } else {
timelineMetrics = requestHelper.fetchTimelineMetrics(uriBuilder,
temporalInfo.getStartTimeMillis(),
temporalInfo.getEndTimeMillis());
- } catch (IOException e) {
- timelineMetrics = null;
+ }
+ } catch (IOException io) {
+ timelineMetrics = null;
+ if (io instanceof SocketTimeoutException) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Skip populating metrics on socket timeout exception.");
+ }
+ break;
}
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/35b4ef6f/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/MetricsRequestHelper.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/MetricsRequestHelper.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/MetricsRequestHelper.java
index 94014f8..388de15 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/MetricsRequestHelper.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/MetricsRequestHelper.java
@@ -117,7 +117,7 @@ public class MetricsRequestHelper {
}
if (io instanceof SocketTimeoutException) {
- errorMsg += " Can not connect to collector, socket error.";
+ errorMsg += " Cannot connect to collector: SocketTimeoutException.";
LOG.error(errorMsg);
throw io;
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/35b4ef6f/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/cache/TimelineMetricCache.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/cache/TimelineMetricCache.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/cache/TimelineMetricCache.java
index 8b17a23..b5fe05e 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/cache/TimelineMetricCache.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/cache/TimelineMetricCache.java
@@ -30,6 +30,8 @@ import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.IOException;
+import java.net.SocketTimeoutException;
import java.util.ArrayList;
import java.util.concurrent.atomic.AtomicInteger;
@@ -54,7 +56,7 @@ public class TimelineMetricCache extends UpdatingSelfPopulatingCache {
* @param key @TimelineAppMetricCacheKey
* @return @org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics
*/
- public TimelineMetrics getAppTimelineMetricsFromCache(TimelineAppMetricCacheKey key) throws IllegalArgumentException {
+ public TimelineMetrics getAppTimelineMetricsFromCache(TimelineAppMetricCacheKey key) throws IllegalArgumentException, IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("Fetching metrics with key: " + key);
}
@@ -62,7 +64,20 @@ public class TimelineMetricCache extends UpdatingSelfPopulatingCache {
// Make sure key is valid
validateKey(key);
- Element element = get(key);
+ Element element = null;
+ try {
+ element = get(key);
+ } catch (LockTimeoutException le) {
+ // Ehcache masks the Socket Timeout to look as a LockTimeout
+ Throwable t = le.getCause();
+ if (t instanceof CacheException) {
+ t = t.getCause();
+ if (t instanceof SocketTimeoutException) {
+ throw new SocketTimeoutException(t.getMessage());
+ }
+ }
+ }
+
TimelineMetrics timelineMetrics = new TimelineMetrics();
if (element != null && element.getObjectValue() != null) {
TimelineMetricsCacheValue value = (TimelineMetricsCacheValue) element.getObjectValue();
http://git-wip-us.apache.org/repos/asf/ambari/blob/35b4ef6f/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/cache/TimelineMetricCacheEntryFactory.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/cache/TimelineMetricCacheEntryFactory.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/cache/TimelineMetricCacheEntryFactory.java
index ed0f878..4880d98 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/cache/TimelineMetricCacheEntryFactory.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/cache/TimelineMetricCacheEntryFactory.java
@@ -91,6 +91,7 @@ public class TimelineMetricCacheEntryFactory implements UpdatingCacheEntryFactor
metricCacheKey.getTemporalInfo().getEndTimeMillis());
} catch (IOException io) {
LOG.debug("Caught IOException on fetching metrics. " + io.getMessage());
+ throw io;
}
TimelineMetricsCacheValue value = null;
@@ -191,6 +192,7 @@ public class TimelineMetricCacheEntryFactory implements UpdatingCacheEntryFactor
if (LOG.isDebugEnabled()) {
LOG.debug("Exception retrieving metrics.", io);
}
+ throw io;
}
} else {
LOG.debug("Skip updating cache with new startTime = " +
http://git-wip-us.apache.org/repos/asf/ambari/blob/35b4ef6f/ambari-server/src/test/java/org/apache/ambari/server/controller/metrics/timeline/AMSPropertyProviderTest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/controller/metrics/timeline/AMSPropertyProviderTest.java b/ambari-server/src/test/java/org/apache/ambari/server/controller/metrics/timeline/AMSPropertyProviderTest.java
index 3adf9f7..c7dabf1 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/controller/metrics/timeline/AMSPropertyProviderTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/controller/metrics/timeline/AMSPropertyProviderTest.java
@@ -23,6 +23,7 @@ import org.apache.ambari.server.configuration.ComponentSSLConfiguration;
import org.apache.ambari.server.configuration.Configuration;
import org.apache.ambari.server.controller.AmbariManagementController;
import org.apache.ambari.server.controller.AmbariServer;
+import org.apache.ambari.server.controller.internal.AbstractPropertyProvider;
import org.apache.ambari.server.controller.internal.PropertyInfo;
import org.apache.ambari.server.controller.internal.ResourceImpl;
import org.apache.ambari.server.controller.internal.TemporalInfoImpl;
@@ -43,6 +44,7 @@ import org.apache.ambari.server.state.Cluster;
import org.apache.ambari.server.state.Clusters;
import org.apache.ambari.server.state.ComponentInfo;
import org.apache.ambari.server.state.StackId;
+import org.apache.commons.httpclient.HttpConnection;
import org.apache.http.client.utils.URIBuilder;
import org.easymock.EasyMock;
import org.junit.After;
@@ -50,16 +52,21 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
+import org.mockito.Matchers;
import org.powermock.api.easymock.PowerMock;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import org.springframework.security.core.context.SecurityContextHolder;
+import javax.ws.rs.HttpMethod;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.net.HttpURLConnection;
+import java.net.SocketTimeoutException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -70,9 +77,14 @@ import java.util.Set;
import static org.apache.ambari.server.controller.metrics.MetricsServiceProvider.MetricsService;
import static org.easymock.EasyMock.anyObject;
+import static org.easymock.EasyMock.createMockBuilder;
import static org.easymock.EasyMock.createNiceMock;
import static org.easymock.EasyMock.expect;
import static org.easymock.EasyMock.replay;
+import static org.easymock.EasyMock.verify;
+import static org.mockito.Matchers.anyCollection;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
@RunWith(PowerMockRunner.class)
@@ -934,6 +946,72 @@ public class AMSPropertyProviderTest {
}
}
+ @Test
+ public void testSocketTimeoutExceptionBehavior() throws Exception {
+ setUpCommonMocks();
+
+ SecurityContextHolder.getContext().setAuthentication(
+ TestAuthenticationFactory.createClusterAdministrator("ClusterAdmin", 2L));
+
+ URLStreamProvider streamProvider = createNiceMock(URLStreamProvider.class);
+ HttpURLConnection connection = createNiceMock(HttpURLConnection.class);
+
+ expect(streamProvider.processURL((String) anyObject(), (String) anyObject(),
+ (String) anyObject(), (Map<String, List<String>>) anyObject())).andReturn(connection);
+
+ expect(connection.getInputStream()).andThrow(
+ new SocketTimeoutException("Unit test raising Exception")).once();
+
+ replay(streamProvider, connection);
+
+ injectCacheEntryFactoryWithStreamProvider(streamProvider);
+ TestMetricHostProvider metricHostProvider = new TestMetricHostProvider();
+ ComponentSSLConfiguration sslConfiguration = mock(ComponentSSLConfiguration.class);
+
+ Map<String, Map<String, PropertyInfo>> propertyIds = PropertyHelper.getMetricPropertyIds(Resource.Type.Host);
+
+ AMSPropertyProvider propertyProvider = new AMSHostPropertyProvider(
+ propertyIds,
+ streamProvider,
+ sslConfiguration,
+ new TimelineMetricCacheProvider(new Configuration(), cacheEntryFactory),
+ metricHostProvider,
+ CLUSTER_NAME_PROPERTY_ID,
+ HOST_NAME_PROPERTY_ID
+ );
+
+ final Resource resource1 = new ResourceImpl(Resource.Type.Host);
+ resource1.setProperty(CLUSTER_NAME_PROPERTY_ID, "c1");
+ resource1.setProperty(HOST_NAME_PROPERTY_ID, "h1");
+ final Resource resource2 = new ResourceImpl(Resource.Type.Host);
+ resource2.setProperty(CLUSTER_NAME_PROPERTY_ID, "c1");
+ resource2.setProperty(HOST_NAME_PROPERTY_ID, "h2");
+
+ // Separating temporal info to ensure multiple requests made
+ Map<String, TemporalInfo> temporalInfoMap = new HashMap<String, TemporalInfo>();
+ temporalInfoMap.put(PROPERTY_ID1, new TemporalInfoImpl(1416445244801L, 1416448936464L, 1L));
+ temporalInfoMap.put(PROPERTY_ID2, new TemporalInfoImpl(1416445344901L, 1416448946564L, 1L));
+
+ Request request = PropertyHelper.getReadRequest(
+ new HashSet<String>() {{
+ add(PROPERTY_ID1);
+ add(PROPERTY_ID2);
+ }}, temporalInfoMap);
+
+ Set<Resource> resources =
+ propertyProvider.populateResources(
+ new HashSet<Resource>() {{ add(resource1); add(resource2); }}, request, null);
+
+ verify(streamProvider, connection);
+
+ Assert.assertEquals(2, resources.size());
+ Resource res = resources.iterator().next();
+ Map<String, Object> properties = PropertyHelper.getProperties(resources.iterator().next());
+ Assert.assertNotNull(properties);
+ Assert.assertNull(res.getPropertyValue(PROPERTY_ID1));
+ Assert.assertNull(res.getPropertyValue(PROPERTY_ID2));
+ }
+
public static class TestMetricHostProvider implements MetricHostProvider {
private String hostName;
@@ -992,7 +1070,7 @@ public class AMSPropertyProviderTest {
expect(ams.getAmbariMetaInfo()).andReturn(ambariMetaInfo).anyTimes();
expect(ambariMetaInfo.getComponentToService(anyObject(String.class),
anyObject(String.class), anyObject(String.class))).andReturn("HDFS").anyTimes();
- expect(ambariMetaInfo.getComponent(anyObject(String.class),anyObject(String.class),
+ expect(ambariMetaInfo.getComponent(anyObject(String.class), anyObject(String.class),
anyObject(String.class), anyObject(String.class)))
.andReturn(componentInfo).anyTimes();
expect(componentInfo.getTimelineAppid()).andReturn(null).anyTimes();
@@ -1007,5 +1085,4 @@ public class AMSPropertyProviderTest {
field.setAccessible(true);
field.set(cacheEntryFactory, new MetricsRequestHelper(streamProvider));
}
-
}