You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by av...@apache.org on 2018/05/16 22:25:48 UTC
[ambari] branch trunk updated: AMBARI-23863 : Fix issues in app aggregator in AMS memory cluster aggregation.
This is an automated email from the ASF dual-hosted git repository.
avijayan pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/ambari.git
The following commit(s) were added to refs/heads/trunk by this push:
new f429f27 AMBARI-23863 : Fix issues in app aggregator in AMS memory cluster aggregation.
f429f27 is described below
commit f429f27948c94035d0d7e375d00069829f32f19f
Author: Aravindan Vijayan <av...@hortonworks.com>
AuthorDate: Wed May 16 12:15:44 2018 -0700
AMBARI-23863 : Fix issues in app aggregator in AMS memory cluster aggregation.
---
.../core/timeline/HBaseTimelineMetricsService.java | 4 +-
.../timeline/TimelineMetricDistributedCache.java | 2 +-
.../core/timeline/TimelineMetricsIgniteCache.java | 99 +++++++++++++---------
.../timeline/TimelineMetricsIgniteCacheTest.java | 22 ++---
...ClusterAggregatorSecondWithCacheSourceTest.java | 9 +-
5 files changed, 77 insertions(+), 59 deletions(-)
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/HBaseTimelineMetricsService.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/HBaseTimelineMetricsService.java
index d09d4bb..2be9b01 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/HBaseTimelineMetricsService.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/HBaseTimelineMetricsService.java
@@ -102,7 +102,7 @@ public class HBaseTimelineMetricsService extends AbstractService implements Time
private TimelineMetricDistributedCache startCacheNode() throws MalformedURLException, URISyntaxException {
//TODO make configurable
- return new TimelineMetricsIgniteCache();
+ return new TimelineMetricsIgniteCache(metricMetadataManager);
}
@@ -408,7 +408,7 @@ public class HBaseTimelineMetricsService extends AbstractService implements Time
hBaseAccessor.insertMetricRecordsWithMetadata(metricMetadataManager, metrics, false);
if (configuration.isCollectorInMemoryAggregationEnabled()) {
- cache.putMetrics(metrics.getMetrics(), metricMetadataManager);
+ cache.putMetrics(metrics.getMetrics());
}
return response;
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/TimelineMetricDistributedCache.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/TimelineMetricDistributedCache.java
index 000b3bc..1ae6346 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/TimelineMetricDistributedCache.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/TimelineMetricDistributedCache.java
@@ -27,6 +27,6 @@ import org.apache.ambari.metrics.core.timeline.discovery.TimelineMetricMetadataM
public interface TimelineMetricDistributedCache {
Map<TimelineClusterMetric, MetricClusterAggregate> evictMetricAggregates(Long startTime, Long endTime);
- void putMetrics(Collection<TimelineMetric> elements, TimelineMetricMetadataManager metricMetadataManager);
+ void putMetrics(Collection<TimelineMetric> elements);
Map<String, Double> getPointInTimeCacheMetrics();
}
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/TimelineMetricsIgniteCache.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/TimelineMetricsIgniteCache.java
index e085fd8..b39c2a3 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/TimelineMetricsIgniteCache.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/TimelineMetricsIgniteCache.java
@@ -17,46 +17,18 @@
*/
package org.apache.ambari.metrics.core.timeline;
-import static java.util.concurrent.TimeUnit.SECONDS;
-import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_SECOND_SLEEP_INTERVAL;
-import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_TIMESLICE_INTERVAL;
-import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.HOST_APP_ID;
-import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_CLUSTER_AGGREGATOR_INTERPOLATION_ENABLED;
-import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_COLLECTOR_IGNITE_BACKUPS;
-import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_COLLECTOR_IGNITE_NODES;
-import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.TIMELINE_METRIC_AGGREGATION_SQL_FILTERS;
-import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.TIMELINE_SERVICE_HTTP_POLICY;
-import static org.apache.ambari.metrics.core.timeline.aggregators.AggregatorUtils.getRoundedCheckPointTimeMillis;
-import static org.apache.ambari.metrics.core.timeline.aggregators.AggregatorUtils.getTimeSlices;
-import static org.apache.ambari.metrics.core.timeline.aggregators.AggregatorUtils.sliceFromTimelineMetric;
-import static org.apache.hadoop.metrics2.sink.timeline.TimelineMetricUtils.getJavaMetricPatterns;
-
-import java.net.MalformedURLException;
-import java.net.URISyntaxException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.Lock;
-
-import javax.cache.Cache;
-import javax.cache.expiry.CreatedExpiryPolicy;
-import javax.cache.expiry.Duration;
-
+import org.apache.ambari.metrics.core.timeline.aggregators.TimelineClusterMetric;
+import org.apache.ambari.metrics.core.timeline.discovery.TimelineMetricHostMetadata;
+import org.apache.ambari.metrics.core.timeline.discovery.TimelineMetricMetadataKey;
+import org.apache.ambari.metrics.core.timeline.discovery.TimelineMetricMetadataManager;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.metrics2.sink.timeline.MetricClusterAggregate;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetricMetadata;
import org.apache.hadoop.metrics2.sink.timeline.availability.MetricCollectorHAHelper;
-import org.apache.ambari.metrics.core.timeline.aggregators.TimelineClusterMetric;
-import org.apache.ambari.metrics.core.timeline.discovery.TimelineMetricHostMetadata;
-import org.apache.ambari.metrics.core.timeline.discovery.TimelineMetricMetadataManager;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.Ignition;
@@ -72,6 +44,35 @@ import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
import org.apache.ignite.ssl.SslContextFactory;
+import javax.cache.Cache;
+import javax.cache.expiry.CreatedExpiryPolicy;
+import javax.cache.expiry.Duration;
+import java.net.MalformedURLException;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_SECOND_SLEEP_INTERVAL;
+import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_TIMESLICE_INTERVAL;
+import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.HOST_APP_ID;
+import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_CLUSTER_AGGREGATOR_INTERPOLATION_ENABLED;
+import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_COLLECTOR_IGNITE_BACKUPS;
+import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_COLLECTOR_IGNITE_NODES;
+import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.TIMELINE_METRIC_AGGREGATION_SQL_FILTERS;
+import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.TIMELINE_SERVICE_HTTP_POLICY;
+import static org.apache.ambari.metrics.core.timeline.aggregators.AggregatorUtils.getRoundedCheckPointTimeMillis;
+import static org.apache.ambari.metrics.core.timeline.aggregators.AggregatorUtils.getTimeSlices;
+import static org.apache.ambari.metrics.core.timeline.aggregators.AggregatorUtils.sliceFromTimelineMetric;
+import static org.apache.hadoop.metrics2.sink.timeline.TimelineMetricUtils.getJavaMetricPatterns;
+
public class TimelineMetricsIgniteCache implements TimelineMetricDistributedCache {
private static final Log LOG =
LogFactory.getLog(TimelineMetricsIgniteCache.class);
@@ -80,14 +81,16 @@ public class TimelineMetricsIgniteCache implements TimelineMetricDistributedCach
private boolean interpolationEnabled;
private List<String> skipAggrPatternStrings = new ArrayList<>();
private List<String> appIdsToAggregate;
+ private TimelineMetricMetadataManager metricMetadataManager;
- public TimelineMetricsIgniteCache() throws MalformedURLException, URISyntaxException {
+ public TimelineMetricsIgniteCache(TimelineMetricMetadataManager metricMetadataManager) throws MalformedURLException, URISyntaxException {
TimelineMetricConfiguration timelineMetricConfiguration = TimelineMetricConfiguration.getInstance();
Configuration metricConf = timelineMetricConfiguration.getMetricsConf();
Configuration sslConf = timelineMetricConfiguration.getMetricsSslConf();
IgniteConfiguration igniteConfiguration = new IgniteConfiguration();
+ this.metricMetadataManager = metricMetadataManager;
//TODO add config to disable logging
@@ -204,11 +207,10 @@ public class TimelineMetricsIgniteCache implements TimelineMetricDistributedCach
* calculates applications host metrics based on the metadata of hosted apps
* updates metadata of hosted apps if needed
* @param elements
- * @param metadataManager
*/
@Override
- public void putMetrics(Collection<TimelineMetric> elements, TimelineMetricMetadataManager metadataManager) {
- Map<String, TimelineMetricHostMetadata> hostMetadata = metadataManager.getHostedAppsCache();
+ public void putMetrics(Collection<TimelineMetric> elements) {
+ Map<String, TimelineMetricHostMetadata> hostMetadata = metricMetadataManager.getHostedAppsCache();
for (TimelineMetric metric : elements) {
if (shouldBeSkipped(metric.getMetricName())) {
if (LOG.isDebugEnabled()) {
@@ -260,8 +262,27 @@ public class TimelineMetricsIgniteCache implements TimelineMetricDistributedCach
private void updateAppAggregatesFromHostMetric(TimelineClusterMetric key, MetricClusterAggregate newMetricClusterAggregate, TimelineMetricHostMetadata timelineMetricHostMetadata) {
for (String appId : timelineMetricHostMetadata.getHostedApps().keySet()) {
- TimelineClusterMetric timelineClusterMetric = new TimelineClusterMetric(key.getMetricName(), appId, key.getInstanceId(), key.getTimestamp());
- putMetricIntoCache(timelineClusterMetric, newMetricClusterAggregate);
+ if (appIdsToAggregate.contains(appId)) {
+ TimelineClusterMetric timelineClusterMetric = new TimelineClusterMetric(key.getMetricName(), appId, key.getInstanceId(), key.getTimestamp());
+
+ TimelineMetricMetadataKey metadataKey = new TimelineMetricMetadataKey(timelineClusterMetric.getMetricName(), appId, timelineClusterMetric.getInstanceId());
+ TimelineMetricMetadata metricMetadata = metricMetadataManager.getMetadataCacheValue(metadataKey);
+
+ if (metricMetadata == null || metricMetadata.getUuid() == null) {
+ TimelineMetricMetadataKey metricMetadataKey = new TimelineMetricMetadataKey(timelineClusterMetric.getMetricName(), HOST_APP_ID, timelineClusterMetric.getInstanceId());
+ metricMetadata = metricMetadataManager.getMetadataCacheValue(metricMetadataKey);
+ if (metricMetadata != null) {
+ TimelineMetricMetadata timelineMetricMetadata = new TimelineMetricMetadata(timelineClusterMetric.getMetricName(),
+ appId, timelineClusterMetric.getInstanceId(), metricMetadata.getUnits(), metricMetadata.getType(), metricMetadata.getSeriesStartTime(),
+ metricMetadata.isSupportsAggregates(), TimelineMetricsFilter.acceptMetric(timelineClusterMetric.getMetricName(), appId));
+ byte[] uuid = metricMetadataManager.getUuid(timelineClusterMetric.getMetricName(), appId, timelineClusterMetric.getInstanceId(), StringUtils.EMPTY, true);
+ timelineMetricMetadata.setUuid(uuid);
+ metricMetadataManager.putIfModifiedTimelineMetricMetadata(timelineMetricMetadata);
+ }
+ }
+
+ putMetricIntoCache(timelineClusterMetric, newMetricClusterAggregate);
+ }
}
}
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/TimelineMetricsIgniteCacheTest.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/TimelineMetricsIgniteCacheTest.java
index 1d0f97a..239a1d5 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/TimelineMetricsIgniteCacheTest.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/TimelineMetricsIgniteCacheTest.java
@@ -67,17 +67,18 @@ public class TimelineMetricsIgniteCacheTest {
conf.getMetricsConf().set(TIMELINE_METRICS_COLLECTOR_IGNITE_NODES, "localhost");
replayAll();
- timelineMetricsIgniteCache = new TimelineMetricsIgniteCache();
+ TimelineMetricMetadataManager metricMetadataManagerMock = createNiceMock(TimelineMetricMetadataManager.class);
+ expect(metricMetadataManagerMock.getUuid(anyObject(TimelineClusterMetric.class), anyBoolean())).andReturn(new byte[16]).once();
+ expect(metricMetadataManagerMock.getHostedAppsCache()).andReturn(new HashMap<>()).anyTimes();
+ replay(metricMetadataManagerMock);
+
+ timelineMetricsIgniteCache = new TimelineMetricsIgniteCache(metricMetadataManagerMock);
}
@Test
public void putEvictMetricsFromCacheSlicesMerging() throws Exception {
long cacheSliceIntervalMillis = 30000L;
- TimelineMetricMetadataManager metricMetadataManagerMock = createNiceMock(TimelineMetricMetadataManager.class);
- expect(metricMetadataManagerMock.getUuid(anyObject(TimelineClusterMetric.class), anyBoolean())).andReturn(new byte[16]).once();
- replay(metricMetadataManagerMock);
-
long startTime = getRoundedCheckPointTimeMillis(System.currentTimeMillis(), cacheSliceIntervalMillis);
long seconds = 1000;
@@ -103,7 +104,7 @@ public class TimelineMetricsIgniteCacheTest {
Collection<TimelineMetric> timelineMetrics = new ArrayList<>();
timelineMetrics.add(timelineMetric);
- timelineMetricsIgniteCache.putMetrics(timelineMetrics, metricMetadataManagerMock);
+ timelineMetricsIgniteCache.putMetrics(timelineMetrics);
Map<TimelineClusterMetric, MetricClusterAggregate> aggregateMap = timelineMetricsIgniteCache.evictMetricAggregates(startTime, startTime + 120*seconds);
Assert.assertEquals(aggregateMap.size(), 2);
@@ -151,7 +152,7 @@ public class TimelineMetricsIgniteCacheTest {
timelineMetrics = new ArrayList<>();
timelineMetrics.add(timelineMetric);
timelineMetrics.add(timelineMetric2);
- timelineMetricsIgniteCache.putMetrics(timelineMetrics, metricMetadataManagerMock);
+ timelineMetricsIgniteCache.putMetrics(timelineMetrics);
aggregateMap = timelineMetricsIgniteCache.evictMetricAggregates(startTime, startTime + 120*seconds);
Assert.assertEquals(aggregateMap.size(), 2);
@@ -177,11 +178,6 @@ public class TimelineMetricsIgniteCacheTest {
long cacheSliceIntervalMillis = 30000L;
- TimelineMetricMetadataManager metricMetadataManagerMock = createNiceMock(TimelineMetricMetadataManager.class);
- expect(metricMetadataManagerMock.getUuid(anyObject(TimelineClusterMetric.class), anyBoolean())).andReturn(new byte[16]).once();
- expect(metricMetadataManagerMock.getHostedAppsCache()).andReturn(new HashMap<>()).anyTimes();
- replay(metricMetadataManagerMock);
-
long startTime = getRoundedCheckPointTimeMillis(System.currentTimeMillis(), cacheSliceIntervalMillis);
long seconds = 1000;
@@ -216,7 +212,7 @@ public class TimelineMetricsIgniteCacheTest {
timelineMetric.setMetricValues(metricValues);
timelineMetrics.add(timelineMetric);
- timelineMetricsIgniteCache.putMetrics(timelineMetrics, metricMetadataManagerMock);
+ timelineMetricsIgniteCache.putMetrics(timelineMetrics);
Map<TimelineClusterMetric, MetricClusterAggregate> aggregateMap = timelineMetricsIgniteCache.evictMetricAggregates(startTime, startTime + 120*seconds);
Assert.assertEquals(aggregateMap.size(), 6);
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/aggregators/TimelineMetricClusterAggregatorSecondWithCacheSourceTest.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/aggregators/TimelineMetricClusterAggregatorSecondWithCacheSourceTest.java
index 34d470c..288c4b5 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/aggregators/TimelineMetricClusterAggregatorSecondWithCacheSourceTest.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/aggregators/TimelineMetricClusterAggregatorSecondWithCacheSourceTest.java
@@ -52,6 +52,7 @@ import static org.powermock.api.easymock.PowerMock.replayAll;
public class TimelineMetricClusterAggregatorSecondWithCacheSourceTest {
private static TimelineMetricsIgniteCache timelineMetricsIgniteCache;
+ private static TimelineMetricMetadataManager metricMetadataManagerMock;
@BeforeClass
public static void setupConf() throws Exception {
TimelineMetricConfiguration conf = new TimelineMetricConfiguration(new
@@ -60,8 +61,11 @@ public class TimelineMetricClusterAggregatorSecondWithCacheSourceTest {
expect(TimelineMetricConfiguration.getInstance()).andReturn(conf).anyTimes();
conf.getMetricsConf().set(TIMELINE_METRICS_COLLECTOR_IGNITE_NODES, "localhost");
replayAll();
+ metricMetadataManagerMock = createNiceMock(TimelineMetricMetadataManager.class);
+ expect(metricMetadataManagerMock.getMetadataCacheValue((TimelineMetricMetadataKey) anyObject())).andReturn(null).anyTimes();
+ replay(metricMetadataManagerMock);
- timelineMetricsIgniteCache = new TimelineMetricsIgniteCache();
+ timelineMetricsIgniteCache = new TimelineMetricsIgniteCache(metricMetadataManagerMock);
}
@Test
@@ -71,9 +75,6 @@ public class TimelineMetricClusterAggregatorSecondWithCacheSourceTest {
Configuration configuration = new Configuration();
- TimelineMetricMetadataManager metricMetadataManagerMock = createNiceMock(TimelineMetricMetadataManager.class);
- expect(metricMetadataManagerMock.getMetadataCacheValue((TimelineMetricMetadataKey) anyObject())).andReturn(null).anyTimes();
- replay(metricMetadataManagerMock);
TimelineMetricClusterAggregatorSecondWithCacheSource secondAggregator = new TimelineMetricClusterAggregatorSecondWithCacheSource(
METRIC_AGGREGATE_SECOND, metricMetadataManagerMock, null, configuration, null,
--
To stop receiving notification emails like this one, please contact
avijayan@apache.org.