You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by sw...@apache.org on 2015/08/18 01:56:51 UTC
[1/2] ambari git commit: AMBARI-12654. Create a Caching layer that
provides sliding window behavior for metric requests to Ambari. (swagle)
Repository: ambari
Updated Branches:
refs/heads/branch-2.1 c9a703009 -> b4ee1918d
http://git-wip-us.apache.org/repos/asf/ambari/blob/b4ee1918/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/cache/TimelineAppMetricCacheKey.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/cache/TimelineAppMetricCacheKey.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/cache/TimelineAppMetricCacheKey.java
new file mode 100644
index 0000000..76bc73b
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/cache/TimelineAppMetricCacheKey.java
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.server.controller.metrics.timeline.cache;
+
+import org.apache.ambari.server.controller.spi.TemporalInfo;
+import org.apache.http.client.utils.URIBuilder;
+
+import java.util.Set;
+
+/**
+ * Cache key that identifies the metrics of one App / Cluster.
+ * The design assumes that a client repeats the same request over and over,
+ * which lets a single http request carry multiple metrics and leaves the
+ * parallelization to the metrics backend.
+ * <p>
+ * Only the metric names and the app id take part in {@link #equals(Object)}
+ * and {@link #hashCode()}; the temporal info and the request spec are mutable
+ * payload that travels with the key.
+ */
+public class TimelineAppMetricCacheKey {
+  private final Set<String> metricNames;
+  private final String appId;
+  private String spec;
+  private TemporalInfo temporalInfo;
+
+  public TimelineAppMetricCacheKey(Set<String> metricNames, String appId,
+                                   TemporalInfo temporalInfo) {
+    this.metricNames = metricNames;
+    this.appId = appId;
+    this.temporalInfo = temporalInfo;
+  }
+
+  /**
+   * @return names of the metrics covered by this key
+   */
+  public Set<String> getMetricNames() {
+    return metricNames;
+  }
+
+  /**
+   * The temporal info drives the calculation of the next query window and
+   * is not part of the key identity.
+   * @return the currently requested time window
+   */
+  public TemporalInfo getTemporalInfo() {
+    return temporalInfo;
+  }
+
+  /**
+   * Replace the query window; expected to be called for every new request.
+   * @param temporalInfo the newly requested time window
+   */
+  public void setTemporalInfo(TemporalInfo temporalInfo) {
+    this.temporalInfo = temporalInfo;
+  }
+
+  /**
+   * @return id of the application the metrics belong to
+   */
+  public String getAppId() {
+    return appId;
+  }
+
+  /**
+   * The http request Uri. It is not part of the key identity and exists only
+   * so that the request and the cache can exchange the call specification.
+   * @return Request Uri
+   */
+  public String getSpec() {
+    return spec;
+  }
+
+  /**
+   * Set spec string.
+   * @param spec Request Uri
+   */
+  public void setSpec(String spec) {
+    this.spec = spec;
+  }
+
+  /**
+   * Two keys are equal when they are of the same class and agree on app id
+   * and metric names.
+   */
+  @Override
+  public boolean equals(Object o) {
+    if (o == this) {
+      return true;
+    }
+    if (o == null || o.getClass() != getClass()) {
+      return false;
+    }
+
+    TimelineAppMetricCacheKey other = (TimelineAppMetricCacheKey) o;
+    return sameOrBothNull(appId, other.appId)
+      && sameOrBothNull(metricNames, other.metricNames);
+  }
+
+  // Null tolerant equality check used by equals()
+  private static boolean sameOrBothNull(Object mine, Object theirs) {
+    return mine == null ? theirs == null : mine.equals(theirs);
+  }
+
+  /**
+   * Hash derived from the same two fields that equals() compares.
+   */
+  @Override
+  public int hashCode() {
+    int namesHash = (metricNames == null) ? 0 : metricNames.hashCode();
+    int appHash = (appId == null) ? 0 : appId.hashCode();
+    return 31 * namesHash + appHash;
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder text = new StringBuilder("TimelineAppMetricCacheKey {");
+    text.append("metricNames = ").append(metricNames);
+    text.append(", appId = '").append(appId).append('\'');
+    text.append(", temporalInfo = ").append(temporalInfo);
+    text.append(", uriInfo = ").append(spec);
+    text.append('}');
+    return text.toString();
+  }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/b4ee1918/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/cache/TimelineMetricCache.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/cache/TimelineMetricCache.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/cache/TimelineMetricCache.java
new file mode 100644
index 0000000..ca141d4
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/cache/TimelineMetricCache.java
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.server.controller.metrics.timeline.cache;
+
+import net.sf.ehcache.CacheException;
+import net.sf.ehcache.Ehcache;
+import net.sf.ehcache.Element;
+import net.sf.ehcache.constructs.blocking.LockTimeoutException;
+import net.sf.ehcache.constructs.blocking.UpdatingCacheEntryFactory;
+import net.sf.ehcache.constructs.blocking.UpdatingSelfPopulatingCache;
+import org.apache.ambari.server.AmbariException;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Updating, self populating cache of timeline metrics keyed by
+ * {@link TimelineAppMetricCacheKey}. Every lookup moves the query window of
+ * the cached key to the one of the incoming request before the regular
+ * get / update logic of the parent class runs.
+ */
+public class TimelineMetricCache extends UpdatingSelfPopulatingCache {
+
+  private final static Logger LOG = LoggerFactory.getLogger(TimelineMetricCache.class);
+  // Cache statistics are printed once per this many lookups (debug only)
+  private static final int PRINT_CACHE_STATS_INTERVAL = 100;
+  private static final AtomicInteger printCacheStatsCounter = new AtomicInteger(0);
+
+  /**
+   * Creates a SelfPopulatingCache.
+   *
+   * @param cache @Cache
+   * @param factory @CacheEntryFactory
+   */
+  public TimelineMetricCache(Ehcache cache, UpdatingCacheEntryFactory factory) throws CacheException {
+    super(cache, factory);
+  }
+
+  /**
+   * Get metrics for an app grouped by the requested @TemporalInfo which is a
+   * part of the @TimelineAppMetricCacheKey
+   * @param key @TimelineAppMetricCacheKey
+   * @return @org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics, empty
+   * when nothing is cached for the key
+   * @throws IllegalArgumentException if the key is null or carries no
+   * temporal info
+   */
+  public TimelineMetrics getAppTimelineMetricsFromCache(TimelineAppMetricCacheKey key) throws IllegalArgumentException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Fetching metrics with key: " + key);
+    }
+
+    // Make sure key is valid
+    validateKey(key);
+
+    Element element = get(key);
+    TimelineMetrics timelineMetrics = new TimelineMetrics();
+    // The entry factory may produce a null value when the backend has no
+    // data, in which case an element without a value is stored in the cache.
+    if (element != null && element.getObjectValue() != null) {
+      TimelineMetricsCacheValue value = (TimelineMetricsCacheValue) element.getObjectValue();
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Returning value from cache: " + value);
+      }
+      timelineMetrics.setMetrics(new ArrayList<TimelineMetric>(value.getTimelineMetrics().values()));
+    }
+
+    if (LOG.isDebugEnabled()) {
+      // Print stats every 100 calls - Note: Supported in debug mode only.
+      // A modulo on the running counter cannot miss its trigger the way a
+      // compareAndSet based reset can when several threads increment at once.
+      if (printCacheStatsCounter.getAndIncrement() % PRINT_CACHE_STATS_INTERVAL == 0) {
+        LOG.debug("Metrics cache stats => \n" + this.getStatistics());
+      }
+    }
+
+    return timelineMetrics;
+  }
+
+  /**
+   * Set new time bounds on the cache key so that update can use the new
+   * query window. We do this quietly which means regular get/update logic is
+   * not invoked.
+   */
+  @Override
+  public Element get(Object key) throws LockTimeoutException {
+    Element element = this.getQuiet(key);
+    if (element != null) {
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("key : " + element.getObjectKey());
+        LOG.trace("value : " + element.getObjectValue());
+      }
+
+      // Set new time boundaries on the key
+      TimelineAppMetricCacheKey existingKey = (TimelineAppMetricCacheKey) element.getObjectKey();
+
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Existing temporal info: " + existingKey.getTemporalInfo() +
+          " for : " + existingKey.getMetricNames());
+      }
+
+      TimelineAppMetricCacheKey newKey = (TimelineAppMetricCacheKey) key;
+      existingKey.setTemporalInfo(newKey.getTemporalInfo());
+
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("New temporal info: " + newKey.getTemporalInfo() +
+          " for : " + existingKey.getMetricNames());
+      }
+    }
+
+    return super.get(key);
+  }
+
+  /**
+   * Reject keys that cannot be used for a lookup. A missing temporal info is
+   * fatal; a missing call spec is only reported, because lookups that are
+   * answered without contacting the backend do not need it.
+   */
+  private void validateKey(TimelineAppMetricCacheKey key) throws IllegalArgumentException {
+    if (key == null) {
+      throw new IllegalArgumentException("Invalid metric key requested. No key provided.");
+    }
+
+    if (key.getTemporalInfo() == null) {
+      throw new IllegalArgumentException(
+        "Invalid metric key requested. No temporal info provided.");
+    }
+
+    if (key.getSpec() == null) {
+      LOG.warn("Missing call spec for metric request. key = " + key);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/b4ee1918/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/cache/TimelineMetricCacheEntryFactory.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/cache/TimelineMetricCacheEntryFactory.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/cache/TimelineMetricCacheEntryFactory.java
new file mode 100644
index 0000000..597f037
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/cache/TimelineMetricCacheEntryFactory.java
@@ -0,0 +1,299 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.server.controller.metrics.timeline.cache;
+
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import net.sf.ehcache.constructs.blocking.UpdatingCacheEntryFactory;
+import org.apache.ambari.server.configuration.ComponentSSLConfiguration;
+import org.apache.ambari.server.configuration.Configuration;
+import org.apache.ambari.server.controller.internal.URLStreamProvider;
+import org.apache.ambari.server.controller.metrics.timeline.MetricsRequestHelper;
+import org.apache.ambari.server.controller.spi.TemporalInfo;
+import org.apache.ambari.server.controller.utilities.StreamProvider;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
+import org.apache.http.client.utils.URIBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+
+/**
+ * Entry factory of the timeline metrics cache. Creates cache values on a
+ * cache miss and, for values that already exist, fetches only the part of
+ * the requested time window that is not covered by the cached time series.
+ */
+@Singleton
+public class TimelineMetricCacheEntryFactory implements UpdatingCacheEntryFactory {
+  private final static Logger LOG = LoggerFactory.getLogger(TimelineMetricCacheEntryFactory.class);
+  // Timestamps below this threshold are treated as epoch seconds, all others
+  // as milliseconds
+  private static final long MAX_EPOCH_SECONDS = 9999999999L;
+  // Not declared final to ease unit test code and allow streamProvider
+  // injection
+  private MetricsRequestHelper requestHelperForGets;
+  private MetricsRequestHelper requestHelperForUpdates;
+  private final Long BUFFER_TIME_DIFF_CATCHUP_INTERVAL;
+
+  @Inject
+  public TimelineMetricCacheEntryFactory(Configuration configuration) {
+    // Longer timeout for first cache miss
+    requestHelperForGets = new MetricsRequestHelper(new URLStreamProvider(
+      configuration.getMetricsRequestConnectTimeoutMillis(),
+      configuration.getMetricsRequestReadTimeoutMillis(),
+      ComponentSSLConfiguration.instance()));
+
+    // Timeout setting different from first request timeout
+    // Allows stale data to be returned at the behest of performance.
+    requestHelperForUpdates = new MetricsRequestHelper(new URLStreamProvider(
+      configuration.getMetricsRequestConnectTimeoutMillis(),
+      configuration.getMetricsRequestIntervalReadTimeoutMillis(),
+      ComponentSSLConfiguration.instance()));
+
+    BUFFER_TIME_DIFF_CATCHUP_INTERVAL = configuration.getMetricRequestBufferTimeCatchupInterval();
+  }
+
+  /**
+   * This method is called on a get element from cache call when key is not
+   * found in cache, returns a value for the key to be cached.
+   *
+   * @param key @org.apache.ambari.server.controller.metrics.timeline.cache.TimelineAppMetricCacheKey
+   * @return @TimelineMetricsCacheValue holding the fetched metrics, or null
+   * when the backend returned no metrics
+   * @throws Exception
+   */
+  @Override
+  public Object createEntry(Object key) throws Exception {
+    LOG.debug("Creating cache entry since none exists, key = " + key);
+    TimelineAppMetricCacheKey metricCacheKey = (TimelineAppMetricCacheKey) key;
+
+    TimelineMetrics timelineMetrics =
+      requestHelperForGets.fetchTimelineMetrics(metricCacheKey.getSpec());
+
+    TimelineMetricsCacheValue value = null;
+
+    if (timelineMetrics != null && !timelineMetrics.getMetrics().isEmpty()) {
+      Map<String, TimelineMetric> cacheValue =
+        new HashMap<String, TimelineMetric>(timelineMetrics.getMetrics().size());
+      for (TimelineMetric metric : timelineMetrics.getMetrics()) {
+        cacheValue.put(metric.getMetricName(), metric);
+      }
+
+      value = new TimelineMetricsCacheValue(
+        metricCacheKey.getTemporalInfo().getStartTime(),
+        metricCacheKey.getTemporalInfo().getEndTime(),
+        cacheValue // Null or empty should prompt a refresh
+      );
+
+      LOG.debug("Created cache entry: " + value);
+    }
+
+    return value;
+  }
+
+  /**
+   * Called on a get call for existing values in the cache,
+   * the necessary locking code is present in the get call and this call
+   * should update the value of the cache entry before returning.
+   * An IOException raised by the backend call is logged and swallowed so
+   * that the stale cached data is returned instead.
+   *
+   * @param key @org.apache.ambari.server.controller.metrics.timeline.cache.TimelineAppMetricCacheKey
+   * @param value @TimelineMetricsCacheValue to update in place, may be null
+   * when createEntry produced no value for the key
+   * @throws Exception
+   */
+  @Override
+  public void updateEntryValue(Object key, Object value) throws Exception {
+    TimelineAppMetricCacheKey metricCacheKey = (TimelineAppMetricCacheKey) key;
+    TimelineMetricsCacheValue existingMetrics = (TimelineMetricsCacheValue) value;
+
+    // createEntry returns null when the backend had no data, so the cache
+    // can hold an entry without a value. There is nothing to merge into.
+    if (existingMetrics == null) {
+      LOG.debug("Skip updating cache entry without a value, key: " + key);
+      return;
+    }
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Updating cache entry, key: " + key + ", with value = " + value);
+    }
+
+    Long existingSeriesStartTime = existingMetrics.getStartTime();
+    Long existingSeriesEndTime = existingMetrics.getEndTime();
+
+    TemporalInfo newTemporalInfo = metricCacheKey.getTemporalInfo();
+    Long requestedStartTime = newTemporalInfo.getStartTime();
+    Long requestedEndTime = newTemporalInfo.getEndTime();
+
+    // Calculate new start and end times
+    URIBuilder uriBuilder = new URIBuilder(metricCacheKey.getSpec());
+    Long newStartTime = getRefreshRequestStartTime(existingSeriesStartTime,
+      existingSeriesEndTime, requestedStartTime);
+    Long newEndTime = getRefreshRequestEndTime(existingSeriesStartTime,
+      existingSeriesEndTime, requestedEndTime);
+
+    // Cover complete overlap scenario
+    // time axis: |-------- exSt ----- reqSt ------ exEnd ----- extEnd ---------|
+    if (newEndTime > newStartTime &&
+        !(newStartTime.equals(existingSeriesStartTime) &&
+          newEndTime.equals(existingSeriesEndTime))) {
+
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Existing cached timeseries startTime = " +
+          new Date(getMillisecondsTime(existingSeriesStartTime)) + ", endTime = " +
+          new Date(getMillisecondsTime(existingSeriesEndTime)));
+
+        LOG.debug("Requested timeseries startTime = " +
+          new Date(getMillisecondsTime(newStartTime)) + ", endTime = " +
+          new Date(getMillisecondsTime(newEndTime)));
+      }
+
+      // Update spec with new start and end time
+      uriBuilder.setParameter("startTime", String.valueOf(newStartTime));
+      uriBuilder.setParameter("endTime", String.valueOf(newEndTime));
+
+      try {
+        TimelineMetrics newTimeSeries = requestHelperForUpdates.fetchTimelineMetrics(uriBuilder.toString());
+
+        // Update existing time series with new values
+        updateTimelineMetricsInCache(newTimeSeries, existingMetrics,
+          getMillisecondsTime(requestedStartTime),
+          getMillisecondsTime(requestedEndTime));
+
+        // Replace old boundary values
+        existingMetrics.setStartTime(requestedStartTime);
+        existingMetrics.setEndTime(requestedEndTime);
+
+      } catch (IOException io) {
+        // Deliberately not rethrown: stale data is preferred over a failure
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Exception retrieving metrics.", io);
+        }
+      }
+    } else {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Skip updating cache with new startTime = " +
+          new Date(getMillisecondsTime(newStartTime)) +
+          ", new endTime = " + new Date(getMillisecondsTime(newEndTime)));
+      }
+    }
+  }
+
+  /**
+   * Update cache with new timeseries data.
+   * Every cached series is first trimmed to the requested window, including
+   * series for which the backend returned no new data, then the new values
+   * are merged in. Series that are not cached yet are added as returned.
+   *
+   * @param newMetrics newly fetched metrics, null is treated as "no new data"
+   * @param timelineMetricsCacheValue cached value that is updated in place
+   * @param requestedStartTime start of the requested window in milliseconds
+   * @param requestedEndTime end of the requested window in milliseconds
+   */
+  protected void updateTimelineMetricsInCache(TimelineMetrics newMetrics,
+      TimelineMetricsCacheValue timelineMetricsCacheValue,
+      Long requestedStartTime, Long requestedEndTime) {
+
+    Map<String, TimelineMetric> existingTimelineMetricMap = timelineMetricsCacheValue.getTimelineMetrics();
+
+    // Remove old values
+    // Assumption: All return value are millis
+    for (TimelineMetric cachedMetric : existingTimelineMetricMap.values()) {
+      removeValuesOutsideWindow(cachedMetric.getMetricValues(),
+        requestedStartTime, requestedEndTime);
+    }
+
+    if (newMetrics == null || newMetrics.getMetrics() == null) {
+      return;
+    }
+
+    // NOTE: Metrics names so far are unique, the Map optimization avoids
+    // multiple iterations of the List
+    for (TimelineMetric timelineMetric : newMetrics.getMetrics()) {
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("New metric: " + timelineMetric.getMetricName() +
+          " # " + timelineMetric.getMetricValues().size() +
+          describeTimeBounds(timelineMetric.getMetricValues()));
+      }
+
+      TimelineMetric existingMetric = existingTimelineMetricMap.get(timelineMetric.getMetricName());
+
+      if (existingMetric != null) {
+        Map<Long, Double> existingMetricValues = existingMetric.getMetricValues();
+        if (LOG.isTraceEnabled()) {
+          LOG.trace("Existing metric: " + timelineMetric.getMetricName() +
+            " # " + existingMetricValues.size());
+        }
+
+        // Add new ones
+        existingMetricValues.putAll(timelineMetric.getMetricValues());
+
+        if (LOG.isTraceEnabled()) {
+          LOG.trace("Merged metric: " + timelineMetric.getMetricName() + ", " +
+            "Final size: " + existingMetricValues.size() +
+            describeTimeBounds(existingMetricValues));
+        }
+      } else {
+        existingTimelineMetricMap.put(timelineMetric.getMetricName(), timelineMetric);
+      }
+    }
+  }
+
+  // Drop all data points whose timestamp lies outside [windowStart, windowEnd]
+  private static void removeValuesOutsideWindow(Map<Long, Double> metricValues,
+      long windowStart, long windowEnd) {
+    if (metricValues == null) {
+      return;
+    }
+    Iterator<Map.Entry<Long, Double>> valueIterator = metricValues.entrySet().iterator();
+    while (valueIterator.hasNext()) {
+      long timestamp = valueIterator.next().getKey();
+      if (timestamp < windowStart || timestamp > windowEnd) {
+        valueIterator.remove();
+      }
+    }
+  }
+
+  // Trace helper: first and last timestamp of a series, empty text for an
+  // empty series since TreeMap.firstKey() throws on an empty map
+  private static String describeTimeBounds(Map<Long, Double> metricValues) {
+    if (metricValues == null || metricValues.isEmpty()) {
+      return "";
+    }
+    TreeMap<Long, Double> sortedMetrics = new TreeMap<Long, Double>(metricValues);
+    return ", startTime = " + sortedMetrics.firstKey() +
+      ", endTime = " + sortedMetrics.lastKey();
+  }
+
+  // Scenario: Regular graph updates
+  // time axis: |-------- exSt ----- reqSt ------ exEnd ----- reqEnd ---------|
+  // Scenario: Selective graph updates
+  // time axis: |-------- exSt ----- exEnd ------ reqSt ----- reqEnd ---------|
+  // Scenario: Extended time window
+  // time axis: |-------- reSt ----- exSt ------- extEnd ---- reqEnd ---------|
+  /**
+   * Start time to use for the refresh request. When the requested start lies
+   * inside the cached series only the trailing part is requested, shifted
+   * back by the catchup interval; otherwise the requested start is used.
+   */
+  protected Long getRefreshRequestStartTime(Long existingSeriesStartTime,
+      Long existingSeriesEndTime, Long requestedStartTime) {
+    Long diff = requestedStartTime - existingSeriesEndTime;
+    Long startTime = requestedStartTime;
+
+    if (diff < 0 && requestedStartTime > existingSeriesStartTime) {
+      // Regular graph updates
+      // Overlapping timeseries data refresh only new part
+      // Account for missing data on the trailing edge due to buffering
+      startTime = getTimeShiftedStartTime(existingSeriesEndTime);
+    }
+
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("Requesting timeseries data with new startTime = " +
+        new Date(getMillisecondsTime(startTime)));
+    }
+
+    return startTime;
+  }
+
+  // Scenario: Regular graph updates
+  // time axis: |-------- exSt ----- reqSt ------ exEnd ----- reqEnd ---------|
+  // Scenario: Old data request /w overlap
+  // time axis: |-------- reqSt ----- exSt ------ reqEnd ----- extEnd --------|
+  // Scenario: Very Old data request /wo overlap
+  // time axis: |-------- reqSt ----- reqEnd ------ exSt ----- extEnd --------|
+  /**
+   * End time to use for the refresh request. When the requested end lies
+   * inside the cached series the request stops at the cached start time;
+   * otherwise the requested end is used.
+   */
+  protected Long getRefreshRequestEndTime(Long existingSeriesStartTime,
+      Long existingSeriesEndTime, Long requestedEndTime) {
+    Long endTime = requestedEndTime;
+    Long diff = requestedEndTime - existingSeriesEndTime;
+    if (diff < 0 && requestedEndTime > existingSeriesStartTime) {
+      // End time overlaps existing timeseries
+      // Get only older data that might not be in the cache
+      endTime = existingSeriesStartTime;
+    }
+
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("Requesting timeseries data with new endTime = " +
+        new Date(getMillisecondsTime(endTime)));
+    }
+    return endTime;
+  }
+
+  /**
+   * Time shift by a constant taking into account Epoch vs millis
+   */
+  private long getTimeShiftedStartTime(long startTime) {
+    if (startTime < MAX_EPOCH_SECONDS) {
+      // Epoch time
+      return startTime - (BUFFER_TIME_DIFF_CATCHUP_INTERVAL / 1000);
+    } else {
+      return startTime - BUFFER_TIME_DIFF_CATCHUP_INTERVAL;
+    }
+  }
+
+  /**
+   * Convert a timestamp to milliseconds; values below the epoch threshold
+   * are taken to be seconds.
+   */
+  private long getMillisecondsTime(long time) {
+    if (time < MAX_EPOCH_SECONDS) {
+      return time * 1000;
+    } else {
+      return time;
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/b4ee1918/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/cache/TimelineMetricCacheProvider.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/cache/TimelineMetricCacheProvider.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/cache/TimelineMetricCacheProvider.java
new file mode 100644
index 0000000..8df957e
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/cache/TimelineMetricCacheProvider.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.server.controller.metrics.timeline.cache;
+
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import net.sf.ehcache.Cache;
+import net.sf.ehcache.CacheManager;
+import net.sf.ehcache.config.CacheConfiguration;
+import net.sf.ehcache.store.MemoryStoreEvictionPolicy;
+import org.apache.ambari.server.configuration.Configuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Provider of the timeline metrics cache. The cache allows incremental reads
+ * from the Metrics backend and thereby reduces the number of calls between
+ * Ambari and the Metrics backend. The cache is created on first use.
+ */
+@Singleton
+public class TimelineMetricCacheProvider {
+  private TimelineMetricCache timelineMetricsCache;
+  private volatile boolean isCacheInitialized = false;
+  public static final String TIMELINE_METRIC_CACHE_INSTANCE_NAME = "timelineMetricCache";
+
+  Configuration configuration;
+  TimelineMetricCacheEntryFactory cacheEntryFactory;
+
+  private final static Logger LOG = LoggerFactory.getLogger(TimelineMetricCacheProvider.class);
+
+  @Inject
+  public TimelineMetricCacheProvider(Configuration configuration,
+                                     TimelineMetricCacheEntryFactory cacheEntryFactory) {
+    this.configuration = configuration;
+    this.cacheEntryFactory = cacheEntryFactory;
+  }
+
+  /**
+   * Create the cache and register it with the singleton CacheManager.
+   * Does nothing when the cache has been initialized already.
+   */
+  private synchronized void initializeCache() {
+    // Re-check under the lock, a second registration of the same cache name
+    // would end in an ObjectExistsException
+    if (!isCacheInitialized) {
+      // Singleton CacheManager with default settings
+      System.setProperty("net.sf.ehcache.skipUpdateCheck", "true");
+      CacheManager cacheManager = CacheManager.getInstance();
+
+      LOG.info("Creating Metrics Cache with timeouts => ttl = " +
+        configuration.getMetricCacheTTLSeconds() + ", idle = " +
+        configuration.getMetricCacheIdleSeconds());
+
+      Cache backingCache = new Cache(buildCacheConfiguration());
+      timelineMetricsCache = new TimelineMetricCache(backingCache, cacheEntryFactory);
+
+      LOG.info("Registering metrics cache with provider: name = " +
+        backingCache.getName() + ", guid: " + backingCache.getGuid());
+
+      cacheManager.addCache(timelineMetricsCache);
+
+      isCacheInitialized = true;
+    }
+  }
+
+  /**
+   * Assemble the configuration of the backing cache: in-memory only, LRU
+   * eviction, size and timeouts taken from the Ambari configuration,
+   * statistics enabled only when debug or trace logging is on.
+   */
+  private CacheConfiguration buildCacheConfiguration() {
+    CacheConfiguration cacheConfiguration = new CacheConfiguration(
+      TIMELINE_METRIC_CACHE_INSTANCE_NAME, configuration.getMetricCacheMaxEntries());
+
+    return cacheConfiguration
+      .timeToLiveSeconds(configuration.getMetricCacheTTLSeconds()) // configured ttl
+      .timeToIdleSeconds(configuration.getMetricCacheIdleSeconds()) // configured idle time
+      .memoryStoreEvictionPolicy(MemoryStoreEvictionPolicy.LRU)
+      .eternal(false)
+      .diskPersistent(false)
+      .overflowToDisk(false)
+      .statistics(LOG.isDebugEnabled() || LOG.isTraceEnabled());
+  }
+
+  /**
+   * Return the metrics cache, creating it on the first call.
+   * @return @TimelineMetricCache or null if caching is disabled through config.
+   */
+  public TimelineMetricCache getTimelineMetricsCache() {
+    if (configuration.isMetricsCacheDisabled()) {
+      return null;
+    }
+
+    if (isCacheInitialized) {
+      return timelineMetricsCache;
+    }
+
+    initializeCache();
+    return timelineMetricsCache;
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/b4ee1918/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/cache/TimelineMetricsCacheValue.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/cache/TimelineMetricsCacheValue.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/cache/TimelineMetricsCacheValue.java
new file mode 100644
index 0000000..f9f1f54
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/cache/TimelineMetricsCacheValue.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.server.controller.metrics.timeline.cache;
+
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+
+import java.util.Date;
+import java.util.Map;
+
+/**
+ * Wrapper object for metrics returned from AMS that includes the query time
+ * window.
+ */
+public class TimelineMetricsCacheValue {
+  // Timestamps below this threshold are treated as epoch seconds, all others
+  // as milliseconds
+  private static final long MAX_EPOCH_SECONDS = 9999999999L;
+
+  private Long startTime;
+  private Long endTime;
+  private Map<String, TimelineMetric> timelineMetrics;
+
+  public TimelineMetricsCacheValue(Long startTime, Long endTime, Map<String, TimelineMetric> timelineMetrics) {
+    this.startTime = startTime;
+    this.endTime = endTime;
+    this.timelineMetrics = timelineMetrics;
+  }
+
+  /**
+   * @return map of metricName to metric values
+   */
+  public Map<String, TimelineMetric> getTimelineMetrics() {
+    return timelineMetrics;
+  }
+
+  /**
+   * Map of metricName to metric values. Works on the assumption that metric
+   * name is unique
+   */
+  public void setTimelineMetrics(Map<String, TimelineMetric> timelineMetrics) {
+    this.timelineMetrics = timelineMetrics;
+  }
+
+  /**
+   * @return start of the query time window
+   */
+  public Long getStartTime() {
+    return startTime;
+  }
+
+  public void setStartTime(Long startTime) {
+    this.startTime = startTime;
+  }
+
+  /**
+   * @return end of the query time window
+   */
+  public Long getEndTime() {
+    return endTime;
+  }
+
+  public void setEndTime(Long endTime) {
+    this.endTime = endTime;
+  }
+
+  /**
+   * Convert a timestamp to milliseconds; values below the epoch threshold
+   * are taken to be seconds.
+   */
+  private long getMillisecondsTime(long time) {
+    if (time < MAX_EPOCH_SECONDS) {
+      return time * 1000;
+    } else {
+      return time;
+    }
+  }
+
+  // Null tolerant conversion used by toString(), an unset boundary prints
+  // as "null" instead of failing on unboxing
+  private Date toDate(Long time) {
+    return time == null ? null : new Date(getMillisecondsTime(time));
+  }
+
+  /**
+   * Summary of the cached value: metric names, time window and the number of
+   * data points per metric. Never throws for unset fields, since the text is
+   * used in log statements.
+   */
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder("TimelineMetricsCacheValue {" +
+      "metricNames = " + (timelineMetrics != null ? timelineMetrics.keySet() : null) +
+      ", startTime = " + toDate(startTime) +
+      ", endTime = " + toDate(endTime) +
+      ", timelineMetrics =");
+
+    if (timelineMetrics != null) {
+      for (TimelineMetric metric : timelineMetrics.values()) {
+        if (metric == null) {
+          continue;
+        }
+        sb.append(" { ");
+        sb.append(metric.getMetricName());
+        sb.append(" # ");
+        sb.append(metric.getMetricValues() != null ? metric.getMetricValues().size() : 0);
+        sb.append(" }");
+      }
+    }
+    sb.append("}");
+    return sb.toString();
+  }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/b4ee1918/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/StackDefinedPropertyProviderTest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/StackDefinedPropertyProviderTest.java b/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/StackDefinedPropertyProviderTest.java
index b8e0596..3ba79ca 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/StackDefinedPropertyProviderTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/StackDefinedPropertyProviderTest.java
@@ -19,6 +19,7 @@ package org.apache.ambari.server.controller.internal;
import java.io.IOException;
import java.io.InputStream;
+import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -27,11 +28,18 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
+import com.google.inject.Binder;
+import com.google.inject.Module;
+import com.google.inject.util.Modules;
+import org.apache.ambari.server.configuration.Configuration;
import org.apache.ambari.server.controller.jmx.TestStreamProvider;
import org.apache.ambari.server.controller.metrics.JMXPropertyProviderTest;
import org.apache.ambari.server.controller.metrics.MetricsServiceProvider;
import org.apache.ambari.server.controller.metrics.ganglia.GangliaPropertyProviderTest.TestGangliaHostProvider;
import org.apache.ambari.server.controller.metrics.ganglia.GangliaPropertyProviderTest.TestGangliaServiceProvider;
+import org.apache.ambari.server.controller.metrics.timeline.MetricsRequestHelper;
+import org.apache.ambari.server.controller.metrics.timeline.cache.TimelineMetricCacheEntryFactory;
+import org.apache.ambari.server.controller.metrics.timeline.cache.TimelineMetricCacheProvider;
import org.apache.ambari.server.controller.spi.Predicate;
import org.apache.ambari.server.controller.spi.PropertyProvider;
import org.apache.ambari.server.controller.spi.Request;
@@ -52,6 +60,7 @@ import org.apache.ambari.server.state.stack.Metric;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
+import org.junit.BeforeClass;
import org.junit.Test;
import com.google.inject.Guice;
@@ -70,11 +79,29 @@ public class StackDefinedPropertyProviderTest {
private Injector injector = null;
private OrmTestHelper helper = null;
+ private static TimelineMetricCacheEntryFactory cacheEntryFactory;
+ private static TimelineMetricCacheProvider cacheProvider;
+
+ @BeforeClass
+ public static void setupCache() {
+ cacheEntryFactory = new TimelineMetricCacheEntryFactory(new Configuration());
+ cacheProvider = new TimelineMetricCacheProvider(new Configuration(), cacheEntryFactory);
+ }
+
+ /**
+  * Guice module that binds {@link TimelineMetricCacheProvider} to the shared
+  * static {@code cacheProvider} instance.
+  * <p>
+  * Declared {@code static} because it only reads a static field and therefore
+  * needs no reference to the enclosing test instance.
+  */
+ public static class TestModuleWithCacheProvider implements Module {
+   @Override
+   public void configure(Binder binder) {
+     binder.bind(TimelineMetricCacheProvider.class).toInstance(cacheProvider);
+   }
+ }
+
@Before
public void setup() throws Exception {
InMemoryDefaultTestModule module = new InMemoryDefaultTestModule();
-
- injector = Guice.createInjector(module);
+ // Use the same cache provider to ensure there is only one instance of
+ // Cache available. The @net.sf.ehcache.CacheManager is a singleton and
+ // does not allow multiple instances with the same cache name to be registered.
+ injector = Guice.createInjector(Modules.override(module).with(new TestModuleWithCacheProvider()));
injector.getInstance(GuiceJpaInitializer.class);
StackDefinedPropertyProvider.init(injector);
@@ -404,10 +431,10 @@ public class StackDefinedPropertyProviderTest {
Assert.assertEquals(12, resource.getPropertyValue(PropertyHelper.getPropertyId("metrics/yarn/Queue/root/default", "AggregateContainersReleased")));
Assert.assertEquals(8192, resource.getPropertyValue(PropertyHelper.getPropertyId("metrics/yarn/Queue/root/default", "AvailableMB")));
Assert.assertEquals(1, resource.getPropertyValue(PropertyHelper.getPropertyId("metrics/yarn/Queue/root/default", "AvailableVCores")));
- Assert.assertEquals(47, resource.getPropertyValue(PropertyHelper.getPropertyId("metrics/yarn/Queue/root/default", "AppsSubmitted")));
+ Assert.assertEquals(47, resource.getPropertyValue(PropertyHelper.getPropertyId("metrics/yarn/Queue/root/default", "AppsSubmitted")));
- Assert.assertEquals(4, resource.getPropertyValue(PropertyHelper.getPropertyId("metrics/yarn/Queue/root/second_queue", "AggregateContainersAllocated")));
- Assert.assertEquals(4, resource.getPropertyValue(PropertyHelper.getPropertyId("metrics/yarn/Queue/root/second_queue", "AggregateContainersReleased")));
+ Assert.assertEquals(4, resource.getPropertyValue(PropertyHelper.getPropertyId("metrics/yarn/Queue/root/second_queue", "AggregateContainersAllocated")));
+ Assert.assertEquals(4, resource.getPropertyValue(PropertyHelper.getPropertyId("metrics/yarn/Queue/root/second_queue", "AggregateContainersReleased")));
Assert.assertEquals(6048, resource.getPropertyValue(PropertyHelper.getPropertyId("metrics/yarn/Queue/root/second_queue", "AvailableMB")));
Assert.assertEquals(1, resource.getPropertyValue(PropertyHelper.getPropertyId("metrics/yarn/Queue/root/second_queue", "AvailableVCores")));
Assert.assertEquals(1, resource.getPropertyValue(PropertyHelper.getPropertyId("metrics/yarn/Queue/root/second_queue", "AppsSubmitted")));
@@ -688,8 +715,6 @@ public class StackDefinedPropertyProviderTest {
Assert.assertEquals(8444, resource.getPropertyValue(PropertyHelper.getPropertyId("metrics/dfs/journalnode/cluster/mycluster", "lastWrittenTxId")));
}
-
-
@Test
public void testPopulateResources_jmx_Storm() throws Exception {
// Adjust stack version for cluster
@@ -818,7 +843,6 @@ public class StackDefinedPropertyProviderTest {
Assert.assertTrue(map.get("metrics/hbase/master").containsKey("IsActiveMaster"));
}
-
@Test
public void testPopulateResources_params_category5() throws Exception {
org.apache.ambari.server.controller.metrics.ganglia.TestStreamProvider streamProvider =
@@ -1042,7 +1066,7 @@ public class StackDefinedPropertyProviderTest {
org.apache.ambari.server.controller.metrics.ganglia.TestStreamProvider streamProvider =
new org.apache.ambari.server.controller.metrics.ganglia.TestStreamProvider("ams/aggregate_component_metric.json");
-
+ injectCacheEntryFactoryWithStreamProvider(streamProvider);
JMXPropertyProviderTest.TestJMXHostProvider jmxHostProvider = new JMXPropertyProviderTest.TestJMXHostProvider(true);
TestGangliaHostProvider hostProvider = new TestGangliaHostProvider();
MetricsServiceProvider serviceProvider = new MetricsServiceProvider() {
@@ -1085,4 +1109,10 @@ public class StackDefinedPropertyProviderTest {
Assert.assertEquals(32, metricsArray.length);
}
+ /**
+  * Test-only hack: stream providers are not injected, so the declared field
+  * {@code requestHelperForGets} of the shared cache entry factory is
+  * overwritten via reflection with a {@link MetricsRequestHelper} built on
+  * the given stream provider.
+  *
+  * @param streamProvider stream provider the new request helper is built on
+  * @throws Exception if the field cannot be looked up or assigned
+  */
+ private void injectCacheEntryFactoryWithStreamProvider(StreamProvider streamProvider) throws Exception {
+   Field requestHelperField =
+     TimelineMetricCacheEntryFactory.class.getDeclaredField("requestHelperForGets");
+   requestHelperField.setAccessible(true);
+   MetricsRequestHelper requestHelper = new MetricsRequestHelper(streamProvider);
+   requestHelperField.set(cacheEntryFactory, requestHelper);
+ }
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/b4ee1918/ambari-server/src/test/java/org/apache/ambari/server/controller/metrics/timeline/AMSPropertyProviderTest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/controller/metrics/timeline/AMSPropertyProviderTest.java b/ambari-server/src/test/java/org/apache/ambari/server/controller/metrics/timeline/AMSPropertyProviderTest.java
index c8007c8..71febc9 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/controller/metrics/timeline/AMSPropertyProviderTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/controller/metrics/timeline/AMSPropertyProviderTest.java
@@ -20,6 +20,7 @@ package org.apache.ambari.server.controller.metrics.timeline;
import org.apache.ambari.server.AmbariException;
import org.apache.ambari.server.api.services.AmbariMetaInfo;
import org.apache.ambari.server.configuration.ComponentSSLConfiguration;
+import org.apache.ambari.server.configuration.Configuration;
import org.apache.ambari.server.controller.AmbariManagementController;
import org.apache.ambari.server.controller.AmbariServer;
import org.apache.ambari.server.controller.internal.PropertyInfo;
@@ -27,29 +28,34 @@ import org.apache.ambari.server.controller.internal.ResourceImpl;
import org.apache.ambari.server.controller.internal.TemporalInfoImpl;
import org.apache.ambari.server.controller.metrics.MetricHostProvider;
import org.apache.ambari.server.controller.metrics.ganglia.TestStreamProvider;
+import org.apache.ambari.server.controller.metrics.timeline.cache.TimelineMetricCacheEntryFactory;
+import org.apache.ambari.server.controller.metrics.timeline.cache.TimelineMetricCacheProvider;
import org.apache.ambari.server.controller.spi.Request;
import org.apache.ambari.server.controller.spi.Resource;
import org.apache.ambari.server.controller.spi.SystemException;
import org.apache.ambari.server.controller.spi.TemporalInfo;
import org.apache.ambari.server.controller.utilities.PropertyHelper;
+import org.apache.ambari.server.controller.utilities.StreamProvider;
import org.apache.ambari.server.state.Cluster;
import org.apache.ambari.server.state.Clusters;
import org.apache.ambari.server.state.ComponentInfo;
import org.apache.ambari.server.state.StackId;
import org.apache.http.client.utils.URIBuilder;
import org.junit.Assert;
-import org.junit.Ignore;
+import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.api.easymock.PowerMock;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
+import java.lang.reflect.Field;
+import java.lang.reflect.Modifier;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -66,6 +72,7 @@ import static org.mockito.Mockito.mock;
@RunWith(PowerMockRunner.class)
@PrepareForTest({AMSPropertyProvider.class, AmbariServer.class})
+@PowerMockIgnore({"javax.xml.parsers.*", "org.xml.sax.*", "net.sf.ehcache.*", "org.apache.log4j.*"})
public class AMSPropertyProviderTest {
private static final String PROPERTY_ID1 = PropertyHelper.getPropertyId("metrics/cpu", "cpu_user");
private static final String PROPERTY_ID2 = PropertyHelper.getPropertyId("metrics/memory", "mem_free");
@@ -82,10 +89,20 @@ public class AMSPropertyProviderTest {
private static final String EMBEDDED_METRICS_FILE_PATH = FILE_PATH_PREFIX + "embedded_host_metric.json";
private static final String AGGREGATE_METRICS_FILE_PATH = FILE_PATH_PREFIX + "aggregate_component_metric.json";
+ private static TimelineMetricCacheEntryFactory cacheEntryFactory;
+ private static TimelineMetricCacheProvider cacheProvider;
+
+ @BeforeClass
+ public static void setupCache() {
+ cacheEntryFactory = new TimelineMetricCacheEntryFactory(new Configuration());
+ cacheProvider = new TimelineMetricCacheProvider(new Configuration(), cacheEntryFactory);
+ }
+
@Test
public void testPopulateResourcesForSingleHostMetric() throws Exception {
setUpCommonMocks();
TestStreamProvider streamProvider = new TestStreamProvider(SINGLE_HOST_METRICS_FILE_PATH);
+ injectCacheEntryFactoryWithStreamProvider(streamProvider);
TestMetricHostProvider metricHostProvider = new TestMetricHostProvider();
ComponentSSLConfiguration sslConfiguration = mock(ComponentSSLConfiguration.class);
@@ -94,6 +111,7 @@ public class AMSPropertyProviderTest {
propertyIds,
streamProvider,
sslConfiguration,
+ cacheProvider,
metricHostProvider,
CLUSTER_NAME_PROPERTY_ID,
HOST_NAME_PROPERTY_ID
@@ -129,6 +147,7 @@ public class AMSPropertyProviderTest {
// given
TestStreamProvider streamProvider = new TestStreamProvider(SINGLE_HOST_METRICS_FILE_PATH);
+ injectCacheEntryFactoryWithStreamProvider(streamProvider);
TestMetricHostProvider metricHostProvider = new TestMetricHostProvider();
ComponentSSLConfiguration sslConfiguration = mock(ComponentSSLConfiguration.class);
Map<String, Map<String, PropertyInfo>> propertyIds = PropertyHelper.getMetricPropertyIds(Resource.Type.Host);
@@ -136,6 +155,7 @@ public class AMSPropertyProviderTest {
propertyIds,
streamProvider,
sslConfiguration,
+ cacheProvider,
metricHostProvider,
CLUSTER_NAME_PROPERTY_ID,
HOST_NAME_PROPERTY_ID
@@ -169,6 +189,7 @@ public class AMSPropertyProviderTest {
public void testPopulateResourcesForMultipleHostMetricscPointInTime() throws Exception {
setUpCommonMocks();
TestStreamProvider streamProvider = new TestStreamProvider(MULTIPLE_HOST_METRICS_FILE_PATH);
+ injectCacheEntryFactoryWithStreamProvider(streamProvider);
TestMetricHostProvider metricHostProvider = new TestMetricHostProvider();
ComponentSSLConfiguration sslConfiguration = mock(ComponentSSLConfiguration.class);
@@ -177,6 +198,7 @@ public class AMSPropertyProviderTest {
propertyIds,
streamProvider,
sslConfiguration,
+ cacheProvider,
metricHostProvider,
CLUSTER_NAME_PROPERTY_ID,
HOST_NAME_PROPERTY_ID
@@ -217,6 +239,7 @@ public class AMSPropertyProviderTest {
public void testPopulateResourcesForMultipleHostMetrics() throws Exception {
setUpCommonMocks();
TestStreamProvider streamProvider = new TestStreamProvider(MULTIPLE_HOST_METRICS_FILE_PATH);
+ injectCacheEntryFactoryWithStreamProvider(streamProvider);
TestMetricHostProvider metricHostProvider = new TestMetricHostProvider();
ComponentSSLConfiguration sslConfiguration = mock(ComponentSSLConfiguration.class);
@@ -225,6 +248,7 @@ public class AMSPropertyProviderTest {
propertyIds,
streamProvider,
sslConfiguration,
+ cacheProvider,
metricHostProvider,
CLUSTER_NAME_PROPERTY_ID,
HOST_NAME_PROPERTY_ID
@@ -276,6 +300,7 @@ public class AMSPropertyProviderTest {
public void testPopulateResourcesForRegexpMetrics() throws Exception {
setUpCommonMocks();
TestStreamProvider streamProvider = new TestStreamProvider(MULTIPLE_COMPONENT_REGEXP_METRICS_FILE_PATH);
+ injectCacheEntryFactoryWithStreamProvider(streamProvider);
TestMetricHostProvider metricHostProvider = new TestMetricHostProvider();
ComponentSSLConfiguration sslConfiguration = mock(ComponentSSLConfiguration.class);
@@ -291,6 +316,7 @@ public class AMSPropertyProviderTest {
propertyIds,
streamProvider,
sslConfiguration,
+ cacheProvider,
metricHostProvider,
CLUSTER_NAME_PROPERTY_ID,
COMPONENT_NAME_PROPERTY_ID
@@ -327,6 +353,7 @@ public class AMSPropertyProviderTest {
public void testPopulateResourcesForSingleComponentMetric() throws Exception {
setUpCommonMocks();
TestStreamProvider streamProvider = new TestStreamProvider(SINGLE_COMPONENT_METRICS_FILE_PATH);
+ injectCacheEntryFactoryWithStreamProvider(streamProvider);
TestMetricHostProvider metricHostProvider = new TestMetricHostProvider();
ComponentSSLConfiguration sslConfiguration = mock(ComponentSSLConfiguration.class);
@@ -337,6 +364,7 @@ public class AMSPropertyProviderTest {
propertyIds,
streamProvider,
sslConfiguration,
+ cacheProvider,
metricHostProvider,
CLUSTER_NAME_PROPERTY_ID,
COMPONENT_NAME_PROPERTY_ID
@@ -394,6 +422,7 @@ public class AMSPropertyProviderTest {
PowerMock.replayAll();
TestStreamProvider streamProvider = new TestStreamProvider(EMBEDDED_METRICS_FILE_PATH);
+ injectCacheEntryFactoryWithStreamProvider(streamProvider);
TestMetricHostProvider metricHostProvider = new TestMetricHostProvider();
ComponentSSLConfiguration sslConfiguration = mock(ComponentSSLConfiguration.class);
@@ -404,6 +433,7 @@ public class AMSPropertyProviderTest {
propertyIds,
streamProvider,
sslConfiguration,
+ cacheProvider,
metricHostProvider,
CLUSTER_NAME_PROPERTY_ID,
COMPONENT_NAME_PROPERTY_ID
@@ -460,6 +490,7 @@ public class AMSPropertyProviderTest {
PowerMock.replayAll();
TestStreamProvider streamProvider = new TestStreamProvider(AGGREGATE_METRICS_FILE_PATH);
+ injectCacheEntryFactoryWithStreamProvider(streamProvider);
TestMetricHostProvider metricHostProvider = new TestMetricHostProvider();
ComponentSSLConfiguration sslConfiguration = mock(ComponentSSLConfiguration.class);
@@ -471,6 +502,7 @@ public class AMSPropertyProviderTest {
propertyIds,
streamProvider,
sslConfiguration,
+ cacheProvider,
metricHostProvider,
CLUSTER_NAME_PROPERTY_ID,
COMPONENT_NAME_PROPERTY_ID
@@ -504,6 +536,7 @@ public class AMSPropertyProviderTest {
public void testFilterOutOfBandMetricData() throws Exception {
setUpCommonMocks();
TestStreamProvider streamProvider = new TestStreamProvider(SINGLE_HOST_METRICS_FILE_PATH);
+ injectCacheEntryFactoryWithStreamProvider(streamProvider);
TestMetricHostProvider metricHostProvider = new TestMetricHostProvider();
ComponentSSLConfiguration sslConfiguration = mock(ComponentSSLConfiguration.class);
@@ -512,6 +545,7 @@ public class AMSPropertyProviderTest {
propertyIds,
streamProvider,
sslConfiguration,
+ cacheProvider,
metricHostProvider,
CLUSTER_NAME_PROPERTY_ID,
HOST_NAME_PROPERTY_ID
@@ -571,6 +605,7 @@ public class AMSPropertyProviderTest {
setUpCommonMocks();
TestStreamProviderForHostComponentHostMetricsTest streamProvider =
new TestStreamProviderForHostComponentHostMetricsTest(null);
+ injectCacheEntryFactoryWithStreamProvider(streamProvider);
TestMetricHostProvider metricHostProvider = new TestMetricHostProvider();
ComponentSSLConfiguration sslConfiguration = mock(ComponentSSLConfiguration.class);
@@ -579,6 +614,7 @@ public class AMSPropertyProviderTest {
propertyIds,
streamProvider,
sslConfiguration,
+ cacheProvider,
metricHostProvider,
CLUSTER_NAME_PROPERTY_ID,
HOST_NAME_PROPERTY_ID,
@@ -696,7 +732,16 @@ public class AMSPropertyProviderTest {
expect(ambariMetaInfo.getComponent(anyObject(String.class),anyObject(String.class),
anyObject(String.class), anyObject(String.class)))
.andReturn(componentInfo).anyTimes();
+
replay(ams, clusters, cluster, ambariMetaInfo);
PowerMock.replayAll();
}
+
+ /**
+  * Test-only hack: because stream providers are not injected, the declared
+  * field {@code requestHelperForGets} of the shared cache entry factory is
+  * replaced via reflection with a {@link MetricsRequestHelper} that wraps
+  * the given stream provider.
+  *
+  * @param streamProvider stream provider the new request helper is built on
+  * @throws Exception if the field cannot be looked up or assigned
+  */
+ private void injectCacheEntryFactoryWithStreamProvider(StreamProvider streamProvider) throws Exception {
+   Field helperField =
+     TimelineMetricCacheEntryFactory.class.getDeclaredField("requestHelperForGets");
+   helperField.setAccessible(true);
+   MetricsRequestHelper helper = new MetricsRequestHelper(streamProvider);
+   helperField.set(cacheEntryFactory, helper);
+ }
+
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/b4ee1918/ambari-server/src/test/java/org/apache/ambari/server/controller/metrics/timeline/AMSReportPropertyProviderTest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/controller/metrics/timeline/AMSReportPropertyProviderTest.java b/ambari-server/src/test/java/org/apache/ambari/server/controller/metrics/timeline/AMSReportPropertyProviderTest.java
index 3ee64fa..99a2102 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/controller/metrics/timeline/AMSReportPropertyProviderTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/controller/metrics/timeline/AMSReportPropertyProviderTest.java
@@ -18,18 +18,24 @@
package org.apache.ambari.server.controller.metrics.timeline;
import org.apache.ambari.server.configuration.ComponentSSLConfiguration;
+import org.apache.ambari.server.configuration.Configuration;
import org.apache.ambari.server.controller.internal.PropertyInfo;
import org.apache.ambari.server.controller.internal.ResourceImpl;
import org.apache.ambari.server.controller.internal.TemporalInfoImpl;
import org.apache.ambari.server.controller.metrics.ganglia.TestStreamProvider;
+import org.apache.ambari.server.controller.metrics.timeline.cache.TimelineMetricCacheEntryFactory;
+import org.apache.ambari.server.controller.metrics.timeline.cache.TimelineMetricCacheProvider;
import org.apache.ambari.server.controller.spi.Request;
import org.apache.ambari.server.controller.spi.Resource;
import org.apache.ambari.server.controller.spi.TemporalInfo;
import org.apache.ambari.server.controller.utilities.PropertyHelper;
+import org.apache.ambari.server.controller.utilities.StreamProvider;
import org.apache.http.client.utils.URIBuilder;
import org.junit.Assert;
+import org.junit.BeforeClass;
import org.junit.Test;
import java.io.File;
+import java.lang.reflect.Field;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
@@ -44,9 +50,19 @@ public class AMSReportPropertyProviderTest {
private static final String SINGLE_HOST_METRICS_FILE_PATH = FILE_PATH_PREFIX + "single_host_metric.json";
private static final String AGGREGATE_CLUSTER_METRICS_FILE_PATH = FILE_PATH_PREFIX + "aggregate_cluster_metrics.json";
+ private static TimelineMetricCacheEntryFactory cacheEntryFactory;
+ private static TimelineMetricCacheProvider cacheProvider;
+
+ @BeforeClass
+ public static void setupCache() {
+ cacheEntryFactory = new TimelineMetricCacheEntryFactory(new Configuration());
+ cacheProvider = new TimelineMetricCacheProvider(new Configuration(), cacheEntryFactory);
+ }
+
@Test
public void testPopulateResources() throws Exception {
TestStreamProvider streamProvider = new TestStreamProvider(SINGLE_HOST_METRICS_FILE_PATH);
+ injectCacheEntryFactoryWithStreamProvider(streamProvider);
TestMetricHostProvider metricHostProvider = new TestMetricHostProvider();
ComponentSSLConfiguration sslConfiguration = mock(ComponentSSLConfiguration.class);
@@ -58,6 +74,7 @@ public class AMSReportPropertyProviderTest {
propertyIds,
streamProvider,
sslConfiguration,
+ cacheProvider,
metricHostProvider,
CLUSTER_NAME_PROPERTY_ID
);
@@ -88,6 +105,7 @@ public class AMSReportPropertyProviderTest {
@Test
public void testPopulateResourceWithAggregateFunction() throws Exception {
TestStreamProvider streamProvider = new TestStreamProvider(AGGREGATE_CLUSTER_METRICS_FILE_PATH);
+ injectCacheEntryFactoryWithStreamProvider(streamProvider);
TestMetricHostProvider metricHostProvider = new TestMetricHostProvider();
ComponentSSLConfiguration sslConfiguration = mock(ComponentSSLConfiguration.class);
@@ -99,6 +117,7 @@ public class AMSReportPropertyProviderTest {
propertyIds,
streamProvider,
sslConfiguration,
+ cacheProvider,
metricHostProvider,
CLUSTER_NAME_PROPERTY_ID
);
@@ -125,4 +144,11 @@ public class AMSReportPropertyProviderTest {
Number[][] val = (Number[][]) res.getPropertyValue("metrics/cpu/User._sum");
Assert.assertEquals(90, val.length);
}
+
+ /**
+  * Test-only hack: stream providers are not injected, so reflection is used
+  * to set the declared field {@code requestHelperForGets} of the shared cache
+  * entry factory to a {@link MetricsRequestHelper} created from the given
+  * stream provider.
+  *
+  * @param streamProvider stream provider the new request helper is built on
+  * @throws Exception if the field cannot be looked up or assigned
+  */
+ private void injectCacheEntryFactoryWithStreamProvider(StreamProvider streamProvider) throws Exception {
+   Field target =
+     TimelineMetricCacheEntryFactory.class.getDeclaredField("requestHelperForGets");
+   target.setAccessible(true);
+   MetricsRequestHelper replacement = new MetricsRequestHelper(streamProvider);
+   target.set(cacheEntryFactory, replacement);
+ }
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/b4ee1918/ambari-server/src/test/java/org/apache/ambari/server/controller/metrics/timeline/cache/TimelineMetricCacheTest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/controller/metrics/timeline/cache/TimelineMetricCacheTest.java b/ambari-server/src/test/java/org/apache/ambari/server/controller/metrics/timeline/cache/TimelineMetricCacheTest.java
new file mode 100644
index 0000000..b16024b
--- /dev/null
+++ b/ambari-server/src/test/java/org/apache/ambari/server/controller/metrics/timeline/cache/TimelineMetricCacheTest.java
@@ -0,0 +1,365 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.server.controller.metrics.timeline.cache;
+
+import com.google.inject.Binder;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+import com.google.inject.Module;
+import junit.framework.Assert;
+import net.sf.ehcache.Cache;
+import net.sf.ehcache.CacheManager;
+import net.sf.ehcache.constructs.blocking.UpdatingCacheEntryFactory;
+import net.sf.ehcache.constructs.blocking.UpdatingSelfPopulatingCache;
+import org.apache.ambari.server.configuration.Configuration;
+import org.apache.ambari.server.controller.internal.TemporalInfoImpl;
+import org.apache.ambari.server.controller.spi.TemporalInfo;
+import org.apache.ambari.server.state.stack.OsFamily;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
+import org.easymock.IAnswer;
+import org.junit.After;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+import static org.apache.ambari.server.controller.metrics.timeline.cache.TimelineMetricCacheProvider.TIMELINE_METRIC_CACHE_INSTANCE_NAME;
+import static org.easymock.EasyMock.anyObject;
+import static org.easymock.EasyMock.createMock;
+import static org.easymock.EasyMock.createMockBuilder;
+import static org.easymock.EasyMock.createNiceMock;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.expectLastCall;
+import static org.easymock.EasyMock.getCurrentArguments;
+import static org.easymock.EasyMock.replay;
+import static org.easymock.EasyMock.verify;
+
+public class TimelineMetricCacheTest {
+
+ /**
+  * Obtains a {@link TimelineMetricCacheProvider} from a throwaway Guice
+  * injector whose bindings are the given configuration, the given cache
+  * entry factory and a nice mock of {@link OsFamily}.
+  *
+  * @param configuration     configuration instance to bind
+  * @param cacheEntryFactory cache entry factory instance to bind
+  * @return the provider instance created by the injector
+  */
+ private TimelineMetricCacheProvider getMetricCacheProvider(
+     final Configuration configuration,
+     final TimelineMetricCacheEntryFactory cacheEntryFactory) {
+
+   Module testBindings = new Module() {
+     @Override
+     public void configure(Binder binder) {
+       binder.bind(OsFamily.class).toInstance(createNiceMock(OsFamily.class));
+       binder.bind(Configuration.class).toInstance(configuration);
+       binder.bind(TimelineMetricCacheEntryFactory.class).toInstance(cacheEntryFactory);
+     }
+   };
+   return Guice.createInjector(testBindings).getInstance(TimelineMetricCacheProvider.class);
+ }
+
+ /**
+  * Removes the timeline metric cache from the {@link CacheManager} after
+  * each test. Every provider adds a new cache, so leaving the old one
+  * registered would cause an "object exists" exception in later tests.
+  */
+ @After
+ public void removeCacheInstance() {
+   CacheManager.getInstance().removeCache(TIMELINE_METRIC_CACHE_INSTANCE_NAME);
+ }
+
+ /**
+  * General cache behavior demonstration: the first {@code get} on an
+  * {@link UpdatingSelfPopulatingCache} creates the entry through the factory,
+  * and a second {@code get} for the same key calls {@code updateEntryValue}
+  * on the already cached value.
+  */
+ @Test
+ public void testSelfPopulatingCacheUpdates() throws Exception {
+   UpdatingCacheEntryFactory cacheEntryFactory = createMock(UpdatingCacheEntryFactory.class);
+
+   StringBuilder value = new StringBuilder("b");
+
+   expect(cacheEntryFactory.createEntry("a")).andReturn(value);
+   cacheEntryFactory.updateEntryValue("a", value);
+   expectLastCall().andAnswer(new IAnswer<Object>() {
+     @Override
+     public Object answer() throws Throwable {
+       String key = (String) getCurrentArguments()[0];
+       StringBuilder value = (StringBuilder) getCurrentArguments()[1];
+       System.out.println("key = " + key + ", value = " + value);
+       // Mutate the cached value in place so the second get observes "bc".
+       value.append("c");
+       return null;
+     }
+   });
+
+   replay(cacheEntryFactory);
+
+   CacheManager manager = CacheManager.getInstance();
+   Cache cache = new Cache("test", 10, false, false, 10000, 10000);
+   UpdatingSelfPopulatingCache testCache = new UpdatingSelfPopulatingCache(cache, cacheEntryFactory);
+   manager.addCache(testCache);
+
+   try {
+     Assert.assertEquals("b", testCache.get("a").getObjectValue().toString());
+     Assert.assertEquals("bc", testCache.get("a").getObjectValue().toString());
+
+     verify(cacheEntryFactory);
+   } finally {
+     // The CacheManager is a shared singleton: unregister the "test" cache so
+     // it does not stay registered for other tests running in the same JVM.
+     manager.removeCache("test");
+   }
+ }
+
+ /**
+  * Verifies that a first lookup populates the cache through
+  * {@code createEntry}, and that a second lookup using a key with a later
+  * end time triggers exactly one {@code updateEntryValue} call. Both lookups
+  * are expected to return the cached "cpu_user" metric of "app1".
+  */
+ @Test
+ public void testTimlineMetricCacheProviderGets() throws Exception {
+   Configuration configuration = createNiceMock(Configuration.class);
+   expect(configuration.getMetricCacheMaxEntries()).andReturn(1000);
+   expect(configuration.getMetricCacheTTLSeconds()).andReturn(3600);
+   expect(configuration.getMetricCacheIdleSeconds()).andReturn(100);
+
+   final long now = System.currentTimeMillis();
+   Map<String, TimelineMetric> valueMap = new HashMap<String, TimelineMetric>();
+   TimelineMetric timelineMetric = new TimelineMetric();
+   timelineMetric.setMetricName("cpu_user");
+   timelineMetric.setAppId("app1");
+   Map<Long, Double> metricValues = new HashMap<Long, Double>();
+   metricValues.put(now + 100, 1.0);
+   metricValues.put(now + 200, 2.0);
+   metricValues.put(now + 300, 3.0);
+   timelineMetric.setMetricValues(metricValues);
+   valueMap.put("cpu_user", timelineMetric);
+
+   TimelineMetricCacheEntryFactory cacheEntryFactory = createMock(TimelineMetricCacheEntryFactory.class);
+
+   TimelineAppMetricCacheKey queryKey = new TimelineAppMetricCacheKey(
+     Collections.singleton("cpu_user"),
+     "app1",
+     new TemporalInfoImpl(now, now + 1000, 1)
+   );
+   TimelineMetricsCacheValue value = new TimelineMetricsCacheValue(now, now + 1000, valueMap);
+   TimelineAppMetricCacheKey testKey = new TimelineAppMetricCacheKey(
+     Collections.singleton("cpu_user"),
+     "app1",
+     new TemporalInfoImpl(now, now + 2000, 1)
+   );
+
+   expect(cacheEntryFactory.createEntry(anyObject())).andReturn(value);
+   cacheEntryFactory.updateEntryValue(testKey, value);
+   expectLastCall().once();
+
+   replay(configuration, cacheEntryFactory);
+
+   TimelineMetricCacheProvider cacheProvider = getMetricCacheProvider(configuration, cacheEntryFactory);
+   TimelineMetricCache cache = cacheProvider.getTimelineMetricsCache();
+
+   // call to get
+   TimelineMetrics metrics = cache.getAppTimelineMetricsFromCache(queryKey);
+   List<TimelineMetric> metricsList = metrics.getMetrics();
+   Assert.assertEquals(1, metricsList.size());
+   TimelineMetric metric = metricsList.iterator().next();
+   Assert.assertEquals("cpu_user", metric.getMetricName());
+   Assert.assertEquals("app1", metric.getAppId());
+   Assert.assertSame(metricValues, metric.getMetricValues());
+
+   // call to update with new key
+   metrics = cache.getAppTimelineMetricsFromCache(testKey);
+   metricsList = metrics.getMetrics();
+   Assert.assertEquals(1, metricsList.size());
+   // Re-read the metric from the second result; otherwise the assertions
+   // below would only re-check the object returned by the first lookup.
+   metric = metricsList.iterator().next();
+   Assert.assertEquals("cpu_user", metric.getMetricName());
+   Assert.assertEquals("app1", metric.getAppId());
+   Assert.assertSame(metricValues, metric.getMetricValues());
+
+   verify(configuration, cacheEntryFactory);
+ }
+
+ /**
+  * Exercises {@code getRefreshRequestStartTime} and
+  * {@code getRefreshRequestEndTime} of the cache entry factory for several
+  * positions of the requested time window relative to an existing one-hour
+  * series, with the catch-up buffer interval configured to zero.
+  */
+ @Test
+ @SuppressWarnings("all")
+ public void testCacheUpdateBoundsOnVariousRequestScenarios() throws Exception {
+   Configuration configuration = createNiceMock(Configuration.class);
+   expect(configuration.getMetricsRequestConnectTimeoutMillis()).andReturn(10000);
+   expect(configuration.getMetricsRequestReadTimeoutMillis()).andReturn(10000);
+   expect(configuration.getMetricsRequestIntervalReadTimeoutMillis()).andReturn(10000);
+   // Disable buffer fudge factor
+   expect(configuration.getMetricRequestBufferTimeCatchupInterval()).andReturn(0L);
+
+   replay(configuration);
+
+   TimelineMetricCacheEntryFactory factory =
+     createMockBuilder(TimelineMetricCacheEntryFactory.class)
+       .withConstructor(configuration).createMock();
+
+   replay(factory);
+
+   long now = System.currentTimeMillis();
+   final long existingSeriesStartTime = now - (3600 * 1000); // now - 1 hour
+   final long existingSeriesEndTime = now;
+
+   // Regular timeseries overlap
+   long requestedStartTime = existingSeriesStartTime + 60000; // + 1 min
+   long requestedEndTime = existingSeriesEndTime + 60000; // + 1 min
+
+   long newStartTime = factory.getRefreshRequestStartTime(existingSeriesStartTime,
+     existingSeriesEndTime, requestedStartTime);
+
+   long newEndTime = factory.getRefreshRequestEndTime(existingSeriesStartTime,
+     existingSeriesEndTime, requestedEndTime);
+
+   Assert.assertEquals(existingSeriesEndTime, newStartTime);
+   Assert.assertEquals(requestedEndTime, newEndTime);
+
+   // Disconnected timeseries graph
+   requestedStartTime = existingSeriesEndTime + 60000; // end + 1 min
+   requestedEndTime = existingSeriesEndTime + 60000 + 3600000; // + 1 min + 1 hour
+
+   newStartTime = factory.getRefreshRequestStartTime(existingSeriesStartTime,
+     existingSeriesEndTime, requestedStartTime);
+
+   newEndTime = factory.getRefreshRequestEndTime(existingSeriesStartTime,
+     existingSeriesEndTime, requestedEndTime);
+
+   Assert.assertEquals(requestedStartTime, newStartTime);
+   Assert.assertEquals(requestedEndTime, newEndTime);
+
+   // Complete overlap
+   requestedStartTime = existingSeriesStartTime - 60000; // - 1 min
+   requestedEndTime = existingSeriesEndTime + 60000; // + 1 min
+
+   newStartTime = factory.getRefreshRequestStartTime(existingSeriesStartTime,
+     existingSeriesEndTime, requestedStartTime);
+
+   newEndTime = factory.getRefreshRequestEndTime(existingSeriesStartTime,
+     existingSeriesEndTime, requestedEndTime);
+
+   Assert.assertEquals(requestedStartTime, newStartTime);
+   Assert.assertEquals(requestedEndTime, newEndTime);
+
+   // Timeseries in the past
+   requestedStartTime = existingSeriesStartTime - 3600000 - 60000; // - 1 hour - 1 min
+   requestedEndTime = existingSeriesStartTime - 60000; // start - 1 min
+
+   newStartTime = factory.getRefreshRequestStartTime(existingSeriesStartTime,
+     existingSeriesEndTime, requestedStartTime);
+
+   newEndTime = factory.getRefreshRequestEndTime(existingSeriesStartTime,
+     existingSeriesEndTime, requestedEndTime);
+
+   Assert.assertEquals(requestedStartTime, newStartTime);
+   Assert.assertEquals(requestedEndTime, newEndTime);
+
+   // Timeseries overlap - no new request needed
+   requestedStartTime = existingSeriesStartTime + 60000; // + 1 min
+   requestedEndTime = existingSeriesEndTime - 60000; // - 1 min
+
+   newStartTime = factory.getRefreshRequestStartTime(existingSeriesStartTime,
+     existingSeriesEndTime, requestedStartTime);
+
+   newEndTime = factory.getRefreshRequestEndTime(existingSeriesStartTime,
+     existingSeriesEndTime, requestedEndTime);
+
+   // Expected value goes first, as in the assertions above: the refresh start
+   // is the existing series end and the refresh end is the existing series
+   // start when the request lies fully inside the cached window.
+   Assert.assertEquals(existingSeriesEndTime, newStartTime);
+   Assert.assertEquals(existingSeriesStartTime, newEndTime);
+
+   verify(configuration, factory);
+
+ }
+
+  /**
+   * Verifies how {@code updateTimelineMetricsInCache} folds freshly fetched
+   * datapoints into an existing cache value.
+   *
+   * The existing value holds "cpu_user" (three datapoints, all timestamped
+   * before {@code now}) and "cpu_nice" (three datapoints after {@code now}).
+   * After an update for the window [now, now + 2000] that only carries new
+   * "cpu_user" data, the test expects "cpu_user" to contain exactly the three
+   * new datapoints and "cpu_nice" to still contain its three datapoints.
+   */
+  @Test
+  public void testTimelineMetricCacheTimeseriesUpdates() throws Exception {
+    Configuration configuration = createNiceMock(Configuration.class);
+    expect(configuration.getMetricsRequestConnectTimeoutMillis()).andReturn(10000);
+    expect(configuration.getMetricsRequestReadTimeoutMillis()).andReturn(10000);
+    expect(configuration.getMetricsRequestIntervalReadTimeoutMillis()).andReturn(10000);
+    // Disable buffer fudge factor (upper-case L suffix so the literal cannot be misread as 01)
+    expect(configuration.getMetricRequestBufferTimeCatchupInterval()).andReturn(0L);
+
+    replay(configuration);
+
+    TimelineMetricCacheEntryFactory factory =
+      createMockBuilder(TimelineMetricCacheEntryFactory.class)
+        .withConstructor(configuration).createMock();
+
+    replay(factory);
+
+    long now = System.currentTimeMillis();
+
+    // Existing values
+
+    TimelineMetric timelineMetric1 = new TimelineMetric();
+    timelineMetric1.setMetricName("cpu_user");
+    timelineMetric1.setAppId("app1");
+    Map<Long, Double> metricValues = new TreeMap<Long, Double>();
+    metricValues.put(now - 100, 1.0);
+    metricValues.put(now - 200, 2.0);
+    metricValues.put(now - 300, 3.0);
+    timelineMetric1.setMetricValues(metricValues);
+    TimelineMetric timelineMetric2 = new TimelineMetric();
+    timelineMetric2.setMetricName("cpu_nice");
+    timelineMetric2.setAppId("app1");
+    metricValues = new TreeMap<Long, Double>();
+    metricValues.put(now + 400, 1.0);
+    metricValues.put(now + 500, 2.0);
+    metricValues.put(now + 600, 3.0);
+    timelineMetric2.setMetricValues(metricValues);
+
+    // Plain HashMap instead of double-brace initialization: no anonymous
+    // subclass and no hidden reference to the enclosing test instance.
+    HashMap<String, TimelineMetric> existingMetrics = new HashMap<String, TimelineMetric>();
+    existingMetrics.put("cpu_user", timelineMetric1);
+    existingMetrics.put("cpu_nice", timelineMetric2);
+
+    TimelineMetricsCacheValue existingMetricValue = new TimelineMetricsCacheValue(
+      now - 1000, now + 1000, existingMetrics);
+
+    // New values
+    TimelineMetrics newMetrics = new TimelineMetrics();
+    TimelineMetric timelineMetric3 = new TimelineMetric();
+    timelineMetric3.setMetricName("cpu_user");
+    timelineMetric3.setAppId("app1");
+    metricValues = new TreeMap<Long, Double>();
+    metricValues.put(now + 1400, 1.0);
+    metricValues.put(now + 1500, 2.0);
+    metricValues.put(now + 1600, 3.0);
+    timelineMetric3.setMetricValues(metricValues);
+    newMetrics.getMetrics().add(timelineMetric3);
+
+    factory.updateTimelineMetricsInCache(newMetrics, existingMetricValue,
+      now, now + 2000);
+
+    Assert.assertEquals(2, existingMetricValue.getTimelineMetrics().size());
+    Assert.assertEquals(3, existingMetricValue.getTimelineMetrics().get("cpu_user").getMetricValues().size());
+    Assert.assertEquals(3, existingMetricValue.getTimelineMetrics().get("cpu_nice").getMetricValues().size());
+    // The iteration order of the cpu_user keys is expected to be ascending by timestamp.
+    Map<Long, Double> newMetricsMap = existingMetricValue.getTimelineMetrics().get("cpu_user").getMetricValues();
+    Iterator<Long> metricKeyIterator = newMetricsMap.keySet().iterator();
+    Assert.assertEquals(now + 1400, metricKeyIterator.next().longValue());
+    Assert.assertEquals(now + 1500, metricKeyIterator.next().longValue());
+    Assert.assertEquals(now + 1600, metricKeyIterator.next().longValue());
+
+    verify(configuration, factory);
+  }
+
+  /**
+   * Verifies that {@code TimelineAppMetricCacheKey.equals} takes the set of
+   * metric names into account: two keys with the same app id and temporal
+   * info are unequal (in both directions) while their metric name sets
+   * differ, and become equal once the missing name is added to the second key.
+   */
+  @Test
+  public void testEqualsOnKeys() {
+    long now = System.currentTimeMillis();
+    TemporalInfo temporalInfo = new TemporalInfoImpl(now - 1000, now, 1);
+
+    // Plain HashSet instances instead of double-brace initialization, which
+    // would create anonymous subclasses holding a reference to this test.
+    HashSet<String> bothMetricNames = new HashSet<String>();
+    bothMetricNames.add("cpu_num._avg");
+    bothMetricNames.add("proc_run._avg");
+    TimelineAppMetricCacheKey key1 = new TimelineAppMetricCacheKey(
+      bothMetricNames, "HOST", temporalInfo);
+
+    HashSet<String> singleMetricName = new HashSet<String>();
+    singleMetricName.add("cpu_num._avg");
+    TimelineAppMetricCacheKey key2 = new TimelineAppMetricCacheKey(
+      singleMetricName, "HOST", temporalInfo);
+
+    // Different metric name sets: unequal in both directions.
+    Assert.assertFalse(key1.equals(key2));
+    Assert.assertFalse(key2.equals(key1));
+
+    // Complete the second key's metric names through its accessor.
+    key2.getMetricNames().add("proc_run._avg");
+
+    Assert.assertTrue(key1.equals(key2));
+  }
+}
[2/2] ambari git commit: AMBARI-12654. Create a Caching layer that
provides sliding window behavior for metric requests to Ambari. (swagle)
Posted by sw...@apache.org.
AMBARI-12654. Create a Caching layer that provides sliding window behavior for metric requests to Ambari. (swagle)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/b4ee1918
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/b4ee1918
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/b4ee1918
Branch: refs/heads/branch-2.1
Commit: b4ee1918d14af3602a01a1befe6107baa5b1221b
Parents: c9a7030
Author: Siddharth Wagle <sw...@hortonworks.com>
Authored: Mon Aug 17 16:55:31 2015 -0700
Committer: Siddharth Wagle <sw...@hortonworks.com>
Committed: Mon Aug 17 16:55:31 2015 -0700
----------------------------------------------------------------------
.../metrics2/sink/timeline/TimelineMetric.java | 17 +
.../TimelineMetricClusterAggregatorMinute.java | 7 -
ambari-server/pom.xml | 5 +
.../server/configuration/Configuration.java | 117 +++++-
.../controller/AmbariManagementController.java | 4 +
.../AmbariManagementControllerImpl.java | 47 +--
.../ambari/server/controller/AmbariServer.java | 3 +-
.../server/controller/ControllerModule.java | 4 +
.../internal/AbstractProviderModule.java | 20 +-
.../internal/StackDefinedPropertyProvider.java | 8 +-
.../controller/internal/TemporalInfoImpl.java | 12 +-
.../metrics/MetricsPaddingMethod.java | 12 +-
.../metrics/MetricsPropertyProvider.java | 3 +
.../metrics/MetricsPropertyProviderProxy.java | 7 +
.../metrics/MetricsReportPropertyProvider.java | 3 +
.../MetricsReportPropertyProviderProxy.java | 5 +
.../timeline/AMSComponentPropertyProvider.java | 6 +-
.../AMSHostComponentPropertyProvider.java | 7 +-
.../timeline/AMSHostPropertyProvider.java | 6 +-
.../metrics/timeline/AMSPropertyProvider.java | 248 +++++++------
.../timeline/AMSReportPropertyProvider.java | 115 +++---
.../metrics/timeline/MetricsRequestHelper.java | 108 ++++++
.../cache/TimelineAppMetricCacheKey.java | 119 ++++++
.../timeline/cache/TimelineMetricCache.java | 133 +++++++
.../cache/TimelineMetricCacheEntryFactory.java | 299 +++++++++++++++
.../cache/TimelineMetricCacheProvider.java | 104 ++++++
.../cache/TimelineMetricsCacheValue.java | 94 +++++
.../StackDefinedPropertyProviderTest.java | 48 ++-
.../timeline/AMSPropertyProviderTest.java | 49 ++-
.../timeline/AMSReportPropertyProviderTest.java | 26 ++
.../timeline/cache/TimelineMetricCacheTest.java | 365 +++++++++++++++++++
31 files changed, 1760 insertions(+), 241 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ambari/blob/b4ee1918/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetric.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetric.java b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetric.java
index f482e54..8b8df06 100644
--- a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetric.java
+++ b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetric.java
@@ -43,6 +43,23 @@ public class TimelineMetric implements Comparable<TimelineMetric> {
private String type;
private Map<Long, Double> metricValues = new TreeMap<Long, Double>();
+ // No-arg constructor; declared explicitly because the copy constructor below suppresses the implicit default one.
+ public TimelineMetric() {
+
+ }
+
+ // Copy constructor: copies each property of metric through the setters and stores a new TreeMap copy of its metric values.
+ public TimelineMetric(TimelineMetric metric) {
+ setMetricName(metric.getMetricName());
+ setType(metric.getType());
+ setTimestamp(metric.getTimestamp());
+ setAppId(metric.getAppId());
+ setInstanceId(metric.getInstanceId());
+ setHostName(metric.getHostName());
+ setStartTime(metric.getStartTime());
+ setMetricValues(new TreeMap<Long, Double>(metric.getMetricValues()));
+ }
+
@XmlElement(name = "metricname")
public String getMetricName() {
return metricName;
http://git-wip-us.apache.org/repos/asf/ambari/blob/b4ee1918/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorMinute.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorMinute.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorMinute.java
index fdcd720..85bdbbc 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorMinute.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorMinute.java
@@ -227,13 +227,6 @@ public class TimelineMetricClusterAggregatorMinute extends AbstractTimelineAggre
sum = oldValue + metric.getValue();
}
timelineClusterMetricMap.put(clusterMetric, (sum / count));
- } else {
- if (timelineMetric.getMetricName().equals("tserver.general.entries")) {
- LOG.info("--- Fallen off: serverTs = " + timelineMetric.getTimestamp() +
- ", timeShift: " + timeShift +
- ", timestamp: " + Long.parseLong(metric.getKey().toString()) +
- ", host = " + timelineMetric.getHostName());
- }
}
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/b4ee1918/ambari-server/pom.xml
----------------------------------------------------------------------
diff --git a/ambari-server/pom.xml b/ambari-server/pom.xml
index 5725446..068b841 100644
--- a/ambari-server/pom.xml
+++ b/ambari-server/pom.xml
@@ -1958,6 +1958,11 @@
<artifactId>jackson-annotations</artifactId>
<version>2.1.4</version>
</dependency>
+ <dependency>
+ <groupId>net.sf.ehcache</groupId>
+ <artifactId>ehcache</artifactId>
+ <version>2.10.0</version>
+ </dependency>
</dependencies>
<pluginRepositories>
http://git-wip-us.apache.org/repos/asf/ambari/blob/b4ee1918/ambari-server/src/main/java/org/apache/ambari/server/configuration/Configuration.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/configuration/Configuration.java b/ambari-server/src/main/java/org/apache/ambari/server/configuration/Configuration.java
index 371d5d2..23291bc 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/configuration/Configuration.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/configuration/Configuration.java
@@ -407,6 +407,23 @@ public class Configuration {
private static final String DEFAULT_JDBC_POOL_ACQUISITION_RETRY_ATTEMPTS = "30";
private static final String DEFAULT_JDBC_POOL_ACQUISITION_RETRY_DELAY = "1000";
+ // Timeline Metrics Cache settings
+ private static final String TIMELINE_METRICS_CACHE_DISABLE = "server.timeline.metrics.cache.disabled";
+ private static final String TIMELINE_METRICS_CACHE_MAX_ENTRIES = "server.timeline.metrics.cache.max.entries";
+ private static final String DEFAULT_TIMELINE_METRICS_CACHE_MAX_ENTRIES = "50";
+ private static final String TIMELINE_METRICS_CACHE_TTL = "server.timeline.metrics.cache.entry.ttl.seconds";
+ private static final String DEFAULT_TIMELINE_METRICS_CACHE_TTL = "3600";
+ private static final String TIMELINE_METRICS_CACHE_IDLE_TIME = "server.timeline.metrics.cache.entry.idle.seconds";
+ private static final String DEFAULT_TIMELINE_METRICS_CACHE_IDLE_TIME = "300";
+ private static final String TIMELINE_METRICS_REQUEST_READ_TIMEOUT = "server.timeline.metrics.cache.read.timeout.millis";
+ private static final String DEFAULT_TIMELINE_METRICS_REQUEST_READ_TIMEOUT = "10000";
+ private static final String TIMELINE_METRICS_REQUEST_INTERVAL_READ_TIMEOUT = "server.timeline.metrics.cache.interval.read.timeout.millis";
+ private static final String DEFAULT_TIMELINE_METRICS_REQUEST_INTERVAL_READ_TIMEOUT = "5000";
+ private static final String TIMELINE_METRICS_REQUEST_CONNECT_TIMEOUT = "server.timeline.metrics.cache.connect.timeout.millis";
+ private static final String DEFAULT_TIMELINE_METRICS_REQUEST_CONNECT_TIMEOUT = "5000";
+ private static final String TIMELINE_METRICS_REQUEST_CATCHUP_INTERVAL = "server.timeline.metrics.cache.catchup.interval";
+ private static final String DEFAULT_TIMELINE_METRICS_REQUEST_CATCHUP_INTERVAL = "300000";
+
/**
* The full path to the XML file that describes the different alert templates.
*/
@@ -679,6 +696,15 @@ public class Configuration {
}
/**
+ * Get the property value for the given key, or the supplied default.
+ * Delegates to {@code properties.getProperty(key, defaultValue)}.
+ *
+ * @param key the name of the property to look up
+ * @param defaultValue the fallback handed to the lookup; presumably returned
+ * when the key has no value (confirm against the type of {@code properties})
+ * @return the property value
+ */
+ public String getProperty(String key, String defaultValue) {
+ return properties.getProperty(key, defaultValue);
+ }
+
+ /**
* Gets a copy of all of the configuration properties that back this
* {@link Configuration} instance.
*
@@ -1001,7 +1027,7 @@ public class Configuration {
*/
public String getApiGzipMinSize() {
return properties.getProperty(API_GZIP_MIN_COMPRESSION_SIZE_KEY,
- API_GZIP_MIN_COMPRESSION_SIZE_DEFAULT);
+ API_GZIP_MIN_COMPRESSION_SIZE_DEFAULT);
}
/**
@@ -1244,7 +1270,7 @@ public class Configuration {
public int getConnectionMaxIdleTime() {
return Integer.parseInt(properties.getProperty
- (SERVER_CONNECTION_MAX_IDLE_TIME, String.valueOf("900000")));
+ (SERVER_CONNECTION_MAX_IDLE_TIME, String.valueOf("900000")));
}
/**
@@ -1283,7 +1309,7 @@ public class Configuration {
public int getOneWayAuthPort() {
return Integer.parseInt(properties.getProperty(SRVR_ONE_WAY_SSL_PORT_KEY,
- String.valueOf(SRVR_ONE_WAY_SSL_PORT_DEFAULT)));
+ String.valueOf(SRVR_ONE_WAY_SSL_PORT_DEFAULT)));
}
public int getTwoWayAuthPort() {
@@ -1376,7 +1402,7 @@ public class Configuration {
public Integer getRequestReadTimeout() {
return Integer.parseInt(properties.getProperty(REQUEST_READ_TIMEOUT,
- REQUEST_READ_TIMEOUT_DEFAULT));
+ REQUEST_READ_TIMEOUT_DEFAULT));
}
public Integer getRequestConnectTimeout() {
@@ -1386,7 +1412,7 @@ public class Configuration {
public String getExecutionSchedulerConnections() {
return properties.getProperty(EXECUTION_SCHEDULER_CONNECTIONS,
- DEFAULT_SCHEDULER_MAX_CONNECTIONS);
+ DEFAULT_SCHEDULER_MAX_CONNECTIONS);
}
public Long getExecutionSchedulerMisfireToleration() {
@@ -1412,7 +1438,7 @@ public class Configuration {
public String getCustomActionDefinitionPath() {
return properties.getProperty(CUSTOM_ACTION_DEFINITION_KEY,
- CUSTOM_ACTION_DEFINITION_DEF_VALUE);
+ CUSTOM_ACTION_DEFINITION_DEF_VALUE);
}
public int getAgentPackageParallelCommandsLimit() {
@@ -1461,7 +1487,7 @@ public class Configuration {
*/
public int getClientThreadPoolSize() {
return Integer.parseInt(properties.getProperty(
- CLIENT_THREADPOOL_SIZE_KEY, String.valueOf(CLIENT_THREADPOOL_SIZE_DEFAULT)));
+ CLIENT_THREADPOOL_SIZE_KEY, String.valueOf(CLIENT_THREADPOOL_SIZE_DEFAULT)));
}
/**
@@ -1499,7 +1525,7 @@ public class Configuration {
*/
public long getViewExtractionThreadPoolTimeout() {
return Long.parseLong(properties.getProperty(
- VIEW_EXTRACTION_THREADPOOL_TIMEOUT_KEY, String.valueOf(VIEW_EXTRACTION_THREADPOOL_TIMEOUT_DEFAULT)));
+ VIEW_EXTRACTION_THREADPOOL_TIMEOUT_KEY, String.valueOf(VIEW_EXTRACTION_THREADPOOL_TIMEOUT_DEFAULT)));
}
/**
@@ -1512,7 +1538,7 @@ public class Configuration {
*/
public int getHttpSessionInactiveTimeout() {
return Integer.parseInt(properties.getProperty(
- SERVER_HTTP_SESSION_INACTIVE_TIMEOUT,
+ SERVER_HTTP_SESSION_INACTIVE_TIMEOUT,
"1800"));
}
@@ -1531,7 +1557,7 @@ public class Configuration {
*/
public int getAlertEventPublisherPoolSize() {
return Integer.parseInt(properties.getProperty(
- ALERTS_EXECUTION_SCHEDULER_THREADS_KEY, ALERTS_EXECUTION_SCHEDULER_THREADS_DEFAULT));
+ ALERTS_EXECUTION_SCHEDULER_THREADS_KEY, ALERTS_EXECUTION_SCHEDULER_THREADS_DEFAULT));
}
/**
@@ -1594,7 +1620,7 @@ public class Configuration {
*/
public int getKdcConnectionCheckTimeout() {
return Integer.parseInt(properties.getProperty(
- KDC_CONNECTION_CHECK_TIMEOUT_KEY, KDC_CONNECTION_CHECK_TIMEOUT_DEFAULT));
+ KDC_CONNECTION_CHECK_TIMEOUT_KEY, KDC_CONNECTION_CHECK_TIMEOUT_DEFAULT));
}
/**
@@ -1779,4 +1805,73 @@ public class Configuration {
}
}
+ /**
+ * Max allowed entries in metrics cache.
+ */
+ public int getMetricCacheMaxEntries() {
+ return Integer.parseInt(properties.getProperty(TIMELINE_METRICS_CACHE_MAX_ENTRIES,
+ DEFAULT_TIMELINE_METRICS_CACHE_MAX_ENTRIES));
+ }
+
+ /**
+ * Eviction time for entries in metrics cache.
+ */
+ public int getMetricCacheTTLSeconds() {
+ return Integer.parseInt(properties.getProperty(TIMELINE_METRICS_CACHE_TTL,
+ DEFAULT_TIMELINE_METRICS_CACHE_TTL));
+ }
+
+ /**
+ * Max time to idle for entries in the cache.
+ */
+ public int getMetricCacheIdleSeconds() {
+ return Integer.parseInt(properties.getProperty(TIMELINE_METRICS_CACHE_IDLE_TIME,
+ DEFAULT_TIMELINE_METRICS_CACHE_IDLE_TIME));
+ }
+
+ /**
+ * Separate timeout settings for metrics cache.
+ * @return milliseconds
+ */
+ public int getMetricsRequestReadTimeoutMillis() {
+ return Integer.parseInt(properties.getProperty(TIMELINE_METRICS_REQUEST_READ_TIMEOUT,
+ DEFAULT_TIMELINE_METRICS_REQUEST_READ_TIMEOUT));
+ }
+
+ /**
+ * Separate timeout settings for metrics cache.
+ * Timeout on reads for update requests made for smaller time intervals.
+ *
+ * @return milliseconds
+ */
+ public int getMetricsRequestIntervalReadTimeoutMillis() {
+ return Integer.parseInt(properties.getProperty(TIMELINE_METRICS_REQUEST_INTERVAL_READ_TIMEOUT,
+ DEFAULT_TIMELINE_METRICS_REQUEST_INTERVAL_READ_TIMEOUT));
+ }
+
+ /**
+ * Separate timeout settings for metrics cache.
+ * @return milliseconds
+ */
+ public int getMetricsRequestConnectTimeoutMillis() {
+ return Integer.parseInt(properties.getProperty(TIMELINE_METRICS_REQUEST_CONNECT_TIMEOUT,
+ DEFAULT_TIMELINE_METRICS_REQUEST_CONNECT_TIMEOUT));
+ }
+
+ /**
+ * Diable metrics caching.
+ * @return true / false
+ */
+ public boolean isMetricsCacheDisabled() {
+ return Boolean.parseBoolean(properties.getProperty(TIMELINE_METRICS_CACHE_DISABLE, "false"));
+ }
+
+ /**
+ * Constant fudge factor subtracted from the cache update requests to
+ * account for unavailability of data on the trailing edge due to buffering.
+ */
+ public Long getMetricRequestBufferTimeCatchupInterval() {
+ return Long.parseLong(properties.getProperty(TIMELINE_METRICS_REQUEST_CATCHUP_INTERVAL,
+ DEFAULT_TIMELINE_METRICS_REQUEST_CATCHUP_INTERVAL));
+ }
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/b4ee1918/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementController.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementController.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementController.java
index 17b6d4a..0eef06c 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementController.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementController.java
@@ -23,7 +23,9 @@ import org.apache.ambari.server.RoleCommand;
import org.apache.ambari.server.actionmanager.ActionManager;
import org.apache.ambari.server.agent.ExecutionCommand;
import org.apache.ambari.server.api.services.AmbariMetaInfo;
+import org.apache.ambari.server.configuration.Configuration;
import org.apache.ambari.server.controller.internal.RequestStageContainer;
+import org.apache.ambari.server.controller.metrics.timeline.cache.TimelineMetricCacheProvider;
import org.apache.ambari.server.metadata.RoleCommandOrder;
import org.apache.ambari.server.scheduler.ExecutionScheduleManager;
import org.apache.ambari.server.security.ldap.LdapBatchDto;
@@ -782,5 +784,7 @@ public interface AmbariManagementController {
* @return
*/
Set<StackConfigurationDependencyResponse> getStackConfigurationDependencies(Set<StackConfigurationDependencyRequest> requests) throws AmbariException;
+
+ TimelineMetricCacheProvider getTimelineMetricCacheProvider();
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/b4ee1918/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementControllerImpl.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementControllerImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementControllerImpl.java
index a7f206a..ef6fc58 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementControllerImpl.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementControllerImpl.java
@@ -89,6 +89,7 @@ import org.apache.ambari.server.controller.internal.RequestStageContainer;
import org.apache.ambari.server.controller.internal.URLStreamProvider;
import org.apache.ambari.server.controller.internal.WidgetLayoutResourceProvider;
import org.apache.ambari.server.controller.internal.WidgetResourceProvider;
+import org.apache.ambari.server.controller.metrics.timeline.cache.TimelineMetricCacheProvider;
import org.apache.ambari.server.controller.spi.Resource;
import org.apache.ambari.server.customactions.ActionDefinition;
import org.apache.ambari.server.metadata.ActionMetadata;
@@ -734,7 +735,7 @@ public class AmbariManagementControllerImpl implements AmbariManagementControlle
handleGlobalsBackwardsCompability(request, propertiesAttributes);
Config config = createConfig(cluster, request.getType(), request.getProperties(),
- request.getVersionTag(), propertiesAttributes);
+ request.getVersionTag(), propertiesAttributes);
return new ConfigurationResponse(cluster.getClusterName(), config);
}
@@ -782,7 +783,7 @@ public class AmbariManagementControllerImpl implements AmbariManagementControlle
private Config createConfig(Cluster cluster, String type, Map<String, String> properties,
String versionTag, Map<String, Map<String, String>> propertiesAttributes) {
Config config = configFactory.createNew(cluster, type,
- properties, propertiesAttributes);
+ properties, propertiesAttributes);
if (!StringUtils.isEmpty(versionTag)) {
config.setTag(versionTag);
@@ -889,10 +890,10 @@ public class AmbariManagementControllerImpl implements AmbariManagementControlle
String logDir = BASE_LOG_DIR + File.pathSeparator + requestId;
Stage stage =
stageFactory.createNew(requestId, logDir,
- null == cluster ? null : cluster.getClusterName(),
- null == cluster ? -1L : cluster.getClusterId(),
- requestContext, clusterHostInfo, commandParamsStage,
- hostParamsStage);
+ null == cluster ? null : cluster.getClusterName(),
+ null == cluster ? -1L : cluster.getClusterId(),
+ requestContext, clusterHostInfo, commandParamsStage,
+ hostParamsStage);
stage.setStageId(id);
return stage;
}
@@ -904,9 +905,9 @@ public class AmbariManagementControllerImpl implements AmbariManagementControlle
if (LOG.isDebugEnabled()) {
LOG.debug("Received a getClusters request"
- + ", clusterName=" + request.getClusterName()
- + ", clusterId=" + request.getClusterId()
- + ", stackInfo=" + request.getStackVersion());
+ + ", clusterName=" + request.getClusterName()
+ + ", clusterId=" + request.getClusterId()
+ + ", stackInfo=" + request.getStackVersion());
}
Cluster singleCluster = null;
@@ -2474,7 +2475,7 @@ public class AmbariManagementControllerImpl implements AmbariManagementControlle
// Add attributes
Map<String, Map<String, Map<String, String>>> configAttributes =
configHelper.getEffectiveConfigAttributes(cluster,
- ec.getConfigurationTags());
+ ec.getConfigurationTags());
for (Map.Entry<String, Map<String, Map<String, String>>> attributesOccurrence : configAttributes.entrySet()) {
String type = attributesOccurrence.getKey();
@@ -2606,7 +2607,7 @@ public class AmbariManagementControllerImpl implements AmbariManagementControlle
boolean runSmokeTest, boolean reconfigureClients) throws AmbariException {
RequestStageContainer request = addStages(null, cluster, requestProperties, requestParameters, changedServices,
- changedComponents, changedHosts, ignoredHosts, runSmokeTest, reconfigureClients);
+ changedComponents, changedHosts, ignoredHosts, runSmokeTest, reconfigureClients);
request.persist();
return request.getRequestStatusResponse();
@@ -3227,15 +3228,15 @@ public class AmbariManagementControllerImpl implements AmbariManagementControlle
final Resource.Type level)
throws AmbariException {
Set<String> ignoredHosts = maintenanceStateHelper.filterHostsInMaintenanceState(
- candidateHosts, new MaintenanceStateHelper.HostPredicate() {
- @Override
- public boolean shouldHostBeRemoved(final String hostname)
- throws AmbariException {
- Host host = clusters.getHost(hostname);
- return !maintenanceStateHelper.isOperationAllowed(
- host, cluster.getClusterId(), level);
- }
- }
+ candidateHosts, new MaintenanceStateHelper.HostPredicate() {
+ @Override
+ public boolean shouldHostBeRemoved(final String hostname)
+ throws AmbariException {
+ Host host = clusters.getHost(hostname);
+ return !maintenanceStateHelper.isOperationAllowed(
+ host, cluster.getClusterId(), level);
+ }
+ }
);
LOG.debug("Ignoring hosts when selecting available hosts for action" +
" due to maintenance state." +
@@ -4188,7 +4189,7 @@ public class AmbariManagementControllerImpl implements AmbariManagementControlle
String user, long createTime) {
List<WidgetEntity> createdEntities =
widgetDAO.findByName(clusterEntity.getClusterId(), layoutInfo.getWidgetName(),
- user, layoutInfo.getDefaultSectionName());
+ user, layoutInfo.getDefaultSectionName());
if (createdEntities == null || createdEntities.isEmpty()) {
WidgetEntity widgetEntity = new WidgetEntity();
@@ -4329,4 +4330,8 @@ public class AmbariManagementControllerImpl implements AmbariManagementControlle
}
}
+ @Override
+ public TimelineMetricCacheProvider getTimelineMetricCacheProvider() {
+ return injector.getInstance(TimelineMetricCacheProvider.class);
+ }
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/b4ee1918/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariServer.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariServer.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariServer.java
index 5644ca5..3c598db 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariServer.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariServer.java
@@ -63,6 +63,7 @@ import org.apache.ambari.server.controller.internal.StackDefinedPropertyProvider
import org.apache.ambari.server.controller.internal.StackDependencyResourceProvider;
import org.apache.ambari.server.controller.internal.UserPrivilegeResourceProvider;
import org.apache.ambari.server.controller.internal.ViewPermissionResourceProvider;
+import org.apache.ambari.server.controller.metrics.timeline.cache.TimelineMetricCacheProvider;
import org.apache.ambari.server.controller.utilities.DatabaseChecker;
import org.apache.ambari.server.orm.GuiceJpaInitializer;
import org.apache.ambari.server.orm.PersistenceType;
@@ -649,7 +650,7 @@ public class AmbariServer {
StageUtils.setGson(injector.getInstance(Gson.class));
StageUtils.setTopologyManager(injector.getInstance(TopologyManager.class));
WorkflowJsonService.setDBProperties(
- injector.getInstance(Configuration.class));
+ injector.getInstance(Configuration.class));
SecurityFilter.init(injector.getInstance(Configuration.class));
StackDefinedPropertyProvider.init(injector);
AbstractControllerResourceProvider.init(injector.getInstance(ResourceProviderFactory.class));
http://git-wip-us.apache.org/repos/asf/ambari/blob/b4ee1918/ambari-server/src/main/java/org/apache/ambari/server/controller/ControllerModule.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/ControllerModule.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/ControllerModule.java
index e048ec5..a40fae6 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/controller/ControllerModule.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/ControllerModule.java
@@ -68,6 +68,8 @@ import org.apache.ambari.server.controller.internal.HostResourceProvider;
import org.apache.ambari.server.controller.internal.MemberResourceProvider;
import org.apache.ambari.server.controller.internal.RepositoryVersionResourceProvider;
import org.apache.ambari.server.controller.internal.ServiceResourceProvider;
+import org.apache.ambari.server.controller.metrics.timeline.cache.TimelineMetricCacheEntryFactory;
+import org.apache.ambari.server.controller.metrics.timeline.cache.TimelineMetricCacheProvider;
import org.apache.ambari.server.controller.spi.ResourceProvider;
import org.apache.ambari.server.controller.utilities.DatabaseChecker;
import org.apache.ambari.server.notifications.DispatchFactory;
@@ -329,6 +331,8 @@ public class ControllerModule extends AbstractModule {
bind(ExecutionScheduler.class).to(ExecutionSchedulerImpl.class);
bind(DBAccessor.class).to(DBAccessorImpl.class);
bind(ViewInstanceHandlerList.class).to(AmbariHandlerList.class);
+ bind(TimelineMetricCacheProvider.class);
+ bind(TimelineMetricCacheEntryFactory.class);
requestStaticInjection(ExecutionCommandWrapper.class);
requestStaticInjection(DatabaseChecker.class);
http://git-wip-us.apache.org/repos/asf/ambari/blob/b4ee1918/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/AbstractProviderModule.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/AbstractProviderModule.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/AbstractProviderModule.java
index 380a0fe..6e3dcd5 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/AbstractProviderModule.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/AbstractProviderModule.java
@@ -34,6 +34,8 @@ import org.apache.ambari.server.controller.metrics.MetricHostProvider;
import org.apache.ambari.server.controller.metrics.MetricsPropertyProvider;
import org.apache.ambari.server.controller.metrics.MetricsReportPropertyProvider;
import org.apache.ambari.server.controller.metrics.MetricsServiceProvider;
+import org.apache.ambari.server.controller.metrics.timeline.cache.TimelineMetricCacheEntryFactory;
+import org.apache.ambari.server.controller.metrics.timeline.cache.TimelineMetricCacheProvider;
import org.apache.ambari.server.controller.spi.NoSuchParentResourceException;
import org.apache.ambari.server.controller.spi.NoSuchResourceException;
import org.apache.ambari.server.controller.spi.Predicate;
@@ -164,6 +166,9 @@ public abstract class AbstractProviderModule implements ProviderModule,
@Inject
AmbariManagementController managementController;
+ @Inject
+ TimelineMetricCacheProvider metricCacheProvider;
+
/**
* The map of host components.
*/
@@ -200,6 +205,9 @@ public abstract class AbstractProviderModule implements ProviderModule,
if (managementController == null) {
managementController = AmbariServer.getController();
}
+ if (metricCacheProvider == null) {
+ metricCacheProvider = managementController.getTimelineMetricCacheProvider();
+ }
}
@@ -884,7 +892,8 @@ public abstract class AbstractProviderModule implements ProviderModule,
return MetricsReportPropertyProvider.createInstance(
PropertyHelper.getMetricPropertyIds(type), streamProvider,
- configuration, hostProvider, serviceProvider, clusterNamePropertyId);
+ configuration, metricCacheProvider, hostProvider, serviceProvider,
+ clusterNamePropertyId);
}
/**
@@ -899,7 +908,7 @@ public abstract class AbstractProviderModule implements ProviderModule,
String hostNamePropertyId) {
return MetricsPropertyProvider.createInstance(type,
PropertyHelper.getMetricPropertyIds(type), streamProvider, configuration,
- hostProvider, serviceProvider, clusterNamePropertyId,
+ metricCacheProvider, hostProvider, serviceProvider, clusterNamePropertyId,
hostNamePropertyId, null);
}
@@ -915,7 +924,8 @@ public abstract class AbstractProviderModule implements ProviderModule,
String componentNamePropertyId) {
return MetricsPropertyProvider.createInstance(type,
PropertyHelper.getMetricPropertyIds(type), streamProvider, configuration,
- hostProvider, serviceProvider, clusterNamePropertyId, null,
+ metricCacheProvider, hostProvider, serviceProvider,
+ clusterNamePropertyId, null,
componentNamePropertyId);
}
@@ -934,8 +944,8 @@ public abstract class AbstractProviderModule implements ProviderModule,
return MetricsPropertyProvider.createInstance(type,
PropertyHelper.getMetricPropertyIds(type), streamProvider, configuration,
- hostProvider, serviceProvider, clusterNamePropertyId, hostNamePropertyId,
- componentNamePropertyId);
+ metricCacheProvider, hostProvider, serviceProvider, clusterNamePropertyId,
+ hostNamePropertyId, componentNamePropertyId);
}
@Override
http://git-wip-us.apache.org/repos/asf/ambari/blob/b4ee1918/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/StackDefinedPropertyProvider.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/StackDefinedPropertyProvider.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/StackDefinedPropertyProvider.java
index 3a6d30b..d0d597e 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/StackDefinedPropertyProvider.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/StackDefinedPropertyProvider.java
@@ -26,6 +26,7 @@ import org.apache.ambari.server.controller.jmx.JMXPropertyProvider;
import org.apache.ambari.server.controller.metrics.MetricHostProvider;
import org.apache.ambari.server.controller.metrics.MetricsPropertyProvider;
import org.apache.ambari.server.controller.metrics.MetricsServiceProvider;
+import org.apache.ambari.server.controller.metrics.timeline.cache.TimelineMetricCacheProvider;
import org.apache.ambari.server.controller.spi.Predicate;
import org.apache.ambari.server.controller.spi.PropertyProvider;
import org.apache.ambari.server.controller.spi.Request;
@@ -81,6 +82,7 @@ public class StackDefinedPropertyProvider implements PropertyProvider {
private final MetricHostProvider metricHostProvider;
private final MetricsServiceProvider metricsServiceProvider;
private MetricsService metricsService = MetricsService.GANGLIA;
+ private TimelineMetricCacheProvider cacheProvider;
/**
* PropertyHelper/AbstractPropertyProvider expect map of maps,
@@ -126,6 +128,7 @@ public class StackDefinedPropertyProvider implements PropertyProvider {
this.streamProvider = streamProvider;
defaultJmx = defaultJmxPropertyProvider;
defaultGanglia = defaultGangliaPropertyProvider;
+ cacheProvider = injector.getInstance(TimelineMetricCacheProvider.class);
}
@@ -147,6 +150,7 @@ public class StackDefinedPropertyProvider implements PropertyProvider {
jmxStatePropertyId, defaultJmxPropertyProvider, defaultGangliaPropertyProvider);
this.metricsService = metricsService;
+ cacheProvider = injector.getInstance(TimelineMetricCacheProvider.class);
}
@@ -201,7 +205,9 @@ public class StackDefinedPropertyProvider implements PropertyProvider {
if (gangliaMap.size() > 0) {
PropertyProvider propertyProvider =
MetricsPropertyProvider.createInstance(type, gangliaMap,
- streamProvider, sslConfig, metricHostProvider,
+ streamProvider, sslConfig,
+ cacheProvider,
+ metricHostProvider,
metricsServiceProvider, clusterNamePropertyId,
hostNamePropertyId, componentNamePropertyId);
http://git-wip-us.apache.org/repos/asf/ambari/blob/b4ee1918/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/TemporalInfoImpl.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/TemporalInfoImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/TemporalInfoImpl.java
index 2ffe984..4083152 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/TemporalInfoImpl.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/TemporalInfoImpl.java
@@ -20,6 +20,8 @@ package org.apache.ambari.server.controller.internal;
import org.apache.ambari.server.controller.spi.TemporalInfo;
+import java.util.Date;
+
/**
* Temporal query data.
*/
@@ -74,6 +76,15 @@ public class TemporalInfoImpl implements TemporalInfo {
}
@Override
+ public String toString() { // human-readable summary of the start, end and step of this temporal query
+ return "TemporalInfoImpl{" +
+ "m_startTime = " + new Date(getStartTimeMillis()) + // java.util.Date reads the value as epoch milliseconds
+ ", m_endTime = " + new Date(getEndTimeMillis()) + // Date.toString() renders in the JVM default time zone
+ ", m_step = " + m_step + // the step is appended as-is, with no unit conversion
+ '}';
+ }
+
+ @Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
@@ -82,7 +93,6 @@ public class TemporalInfoImpl implements TemporalInfo {
return m_endTime == that.m_endTime &&
m_startTime == that.m_startTime &&
m_step == that.m_step;
-
}
@Override
http://git-wip-us.apache.org/repos/asf/ambari/blob/b4ee1918/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/MetricsPaddingMethod.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/MetricsPaddingMethod.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/MetricsPaddingMethod.java
index 32d214d..522c0bd 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/MetricsPaddingMethod.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/MetricsPaddingMethod.java
@@ -20,6 +20,7 @@ package org.apache.ambari.server.controller.metrics;
import org.apache.ambari.server.controller.spi.TemporalInfo;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
import java.util.Iterator;
+import java.util.Map;
import java.util.TreeMap;
public class MetricsPaddingMethod {
@@ -51,8 +52,15 @@ public class MetricsPaddingMethod {
return;
}
- // TODO: JSON dser returns LinkedHashMap that is not Navigable
- TreeMap<Long, Double> values = new TreeMap<Long, Double>(metric.getMetricValues());
+ TreeMap<Long, Double> values;
+ Map<Long, Double> metricValuesMap = metric.getMetricValues();
+ if (metricValuesMap instanceof TreeMap) {
+ values = (TreeMap<Long, Double>) metricValuesMap;
+ }
+ else {
+ // JSON deserialization returns a LinkedHashMap, which is not a NavigableMap, so copy it into a TreeMap
+ values = new TreeMap<Long, Double>(metricValuesMap);
+ }
long dataInterval = getTimelineMetricInterval(values);
http://git-wip-us.apache.org/repos/asf/ambari/blob/b4ee1918/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/MetricsPropertyProvider.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/MetricsPropertyProvider.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/MetricsPropertyProvider.java
index 9fa9ca4..ad35444 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/MetricsPropertyProvider.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/MetricsPropertyProvider.java
@@ -20,6 +20,7 @@ package org.apache.ambari.server.controller.metrics;
import org.apache.ambari.server.configuration.ComponentSSLConfiguration;
import org.apache.ambari.server.controller.internal.AbstractPropertyProvider;
import org.apache.ambari.server.controller.internal.PropertyInfo;
+import org.apache.ambari.server.controller.metrics.timeline.cache.TimelineMetricCacheProvider;
import org.apache.ambari.server.controller.spi.Predicate;
import org.apache.ambari.server.controller.spi.Request;
import org.apache.ambari.server.controller.spi.Resource;
@@ -81,6 +82,7 @@ public abstract class MetricsPropertyProvider extends AbstractPropertyProvider {
Map<String, Map<String, PropertyInfo>> componentPropertyInfoMap,
StreamProvider streamProvider,
ComponentSSLConfiguration configuration,
+ TimelineMetricCacheProvider cacheProvider,
MetricHostProvider hostProvider,
MetricsServiceProvider serviceProvider,
String clusterNamePropertyId,
@@ -93,6 +95,7 @@ public abstract class MetricsPropertyProvider extends AbstractPropertyProvider {
componentPropertyInfoMap,
streamProvider,
configuration,
+ cacheProvider,
hostProvider,
serviceProvider,
clusterNamePropertyId,
http://git-wip-us.apache.org/repos/asf/ambari/blob/b4ee1918/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/MetricsPropertyProviderProxy.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/MetricsPropertyProviderProxy.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/MetricsPropertyProviderProxy.java
index 57a8e7d..fb74fab 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/MetricsPropertyProviderProxy.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/MetricsPropertyProviderProxy.java
@@ -28,6 +28,7 @@ import org.apache.ambari.server.controller.metrics.timeline.AMSComponentProperty
import org.apache.ambari.server.controller.metrics.timeline.AMSHostComponentPropertyProvider;
import org.apache.ambari.server.controller.metrics.timeline.AMSHostPropertyProvider;
import org.apache.ambari.server.controller.metrics.timeline.AMSPropertyProvider;
+import org.apache.ambari.server.controller.metrics.timeline.cache.TimelineMetricCacheProvider;
import org.apache.ambari.server.controller.spi.Predicate;
import org.apache.ambari.server.controller.spi.Request;
import org.apache.ambari.server.controller.spi.Resource;
@@ -46,12 +47,14 @@ public class MetricsPropertyProviderProxy extends AbstractPropertyProvider {
private final MetricsServiceProvider metricsServiceProvider;
private AMSPropertyProvider amsPropertyProvider;
private GangliaPropertyProvider gangliaPropertyProvider;
+ private TimelineMetricCacheProvider cacheProvider;
public MetricsPropertyProviderProxy(
InternalType type,
Map<String, Map<String, PropertyInfo>> componentPropertyInfoMap,
StreamProvider streamProvider,
ComponentSSLConfiguration configuration,
+ TimelineMetricCacheProvider cacheProvider,
MetricHostProvider hostProvider,
MetricsServiceProvider serviceProvider,
String clusterNamePropertyId,
@@ -60,6 +63,7 @@ public class MetricsPropertyProviderProxy extends AbstractPropertyProvider {
super(componentPropertyInfoMap);
this.metricsServiceProvider = serviceProvider;
+ this.cacheProvider = cacheProvider;
switch (type) {
case Host:
@@ -108,6 +112,7 @@ public class MetricsPropertyProviderProxy extends AbstractPropertyProvider {
this.amsPropertyProvider = new AMSHostPropertyProvider(componentPropertyInfoMap,
streamProvider,
configuration,
+ cacheProvider,
hostProvider,
clusterNamePropertyId,
hostNamePropertyId);
@@ -132,6 +137,7 @@ public class MetricsPropertyProviderProxy extends AbstractPropertyProvider {
componentPropertyInfoMap,
streamProvider,
configuration,
+ cacheProvider,
hostProvider,
clusterNamePropertyId,
hostNamePropertyId,
@@ -158,6 +164,7 @@ public class MetricsPropertyProviderProxy extends AbstractPropertyProvider {
componentPropertyInfoMap,
streamProvider,
configuration,
+ cacheProvider,
hostProvider,
clusterNamePropertyId,
componentNamePropertyId);
http://git-wip-us.apache.org/repos/asf/ambari/blob/b4ee1918/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/MetricsReportPropertyProvider.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/MetricsReportPropertyProvider.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/MetricsReportPropertyProvider.java
index 5399436..54857cc 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/MetricsReportPropertyProvider.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/MetricsReportPropertyProvider.java
@@ -20,6 +20,7 @@ package org.apache.ambari.server.controller.metrics;
import org.apache.ambari.server.configuration.ComponentSSLConfiguration;
import org.apache.ambari.server.controller.internal.AbstractPropertyProvider;
import org.apache.ambari.server.controller.internal.PropertyInfo;
+import org.apache.ambari.server.controller.metrics.timeline.cache.TimelineMetricCacheProvider;
import org.apache.ambari.server.controller.utilities.StreamProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -63,6 +64,7 @@ public abstract class MetricsReportPropertyProvider extends AbstractPropertyProv
Map<String, Map<String, PropertyInfo>> componentPropertyInfoMap,
StreamProvider streamProvider,
ComponentSSLConfiguration configuration,
+ TimelineMetricCacheProvider cacheProvider,
MetricHostProvider hostProvider,
MetricsServiceProvider serviceProvider,
String clusterNamePropertyId) {
@@ -70,6 +72,7 @@ public abstract class MetricsReportPropertyProvider extends AbstractPropertyProv
return new MetricsReportPropertyProviderProxy(componentPropertyInfoMap,
streamProvider,
configuration,
+ cacheProvider,
hostProvider,
serviceProvider,
clusterNamePropertyId);
http://git-wip-us.apache.org/repos/asf/ambari/blob/b4ee1918/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/MetricsReportPropertyProviderProxy.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/MetricsReportPropertyProviderProxy.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/MetricsReportPropertyProviderProxy.java
index a92cb37..9b23686 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/MetricsReportPropertyProviderProxy.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/MetricsReportPropertyProviderProxy.java
@@ -22,6 +22,7 @@ import org.apache.ambari.server.controller.internal.AbstractPropertyProvider;
import org.apache.ambari.server.controller.internal.PropertyInfo;
import org.apache.ambari.server.controller.metrics.ganglia.GangliaReportPropertyProvider;
import org.apache.ambari.server.controller.metrics.timeline.AMSReportPropertyProvider;
+import org.apache.ambari.server.controller.metrics.timeline.cache.TimelineMetricCacheProvider;
import org.apache.ambari.server.controller.spi.Predicate;
import org.apache.ambari.server.controller.spi.Request;
import org.apache.ambari.server.controller.spi.Resource;
@@ -39,11 +40,13 @@ public class MetricsReportPropertyProviderProxy extends AbstractPropertyProvider
private MetricsReportPropertyProvider amsMetricsReportProvider;
private MetricsReportPropertyProvider gangliaMetricsReportProvider;
private final MetricsServiceProvider metricsServiceProvider;
+ private TimelineMetricCacheProvider cacheProvider;
public MetricsReportPropertyProviderProxy(
Map<String, Map<String, PropertyInfo>> componentPropertyInfoMap,
StreamProvider streamProvider,
ComponentSSLConfiguration configuration,
+ TimelineMetricCacheProvider cacheProvider,
MetricHostProvider hostProvider,
MetricsServiceProvider serviceProvider,
String clusterNamePropertyId) {
@@ -51,6 +54,7 @@ public class MetricsReportPropertyProviderProxy extends AbstractPropertyProvider
super(componentPropertyInfoMap);
this.metricsServiceProvider = serviceProvider;
+ this.cacheProvider = cacheProvider;
createReportPropertyProviders(componentPropertyInfoMap,
streamProvider,
@@ -69,6 +73,7 @@ public class MetricsReportPropertyProviderProxy extends AbstractPropertyProvider
componentPropertyInfoMap,
streamProvider,
configuration,
+ cacheProvider,
hostProvider,
clusterNamePropertyId);
http://git-wip-us.apache.org/repos/asf/ambari/blob/b4ee1918/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/AMSComponentPropertyProvider.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/AMSComponentPropertyProvider.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/AMSComponentPropertyProvider.java
index d5f415a..cc0219c 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/AMSComponentPropertyProvider.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/AMSComponentPropertyProvider.java
@@ -20,6 +20,7 @@ package org.apache.ambari.server.controller.metrics.timeline;
import org.apache.ambari.server.configuration.ComponentSSLConfiguration;
import org.apache.ambari.server.controller.internal.PropertyInfo;
import org.apache.ambari.server.controller.metrics.MetricHostProvider;
+import org.apache.ambari.server.controller.metrics.timeline.cache.TimelineMetricCacheProvider;
import org.apache.ambari.server.controller.spi.Resource;
import org.apache.ambari.server.controller.utilities.StreamProvider;
@@ -30,12 +31,13 @@ public class AMSComponentPropertyProvider extends AMSPropertyProvider {
public AMSComponentPropertyProvider(Map<String, Map<String, PropertyInfo>> componentPropertyInfoMap,
StreamProvider streamProvider,
ComponentSSLConfiguration configuration,
+ TimelineMetricCacheProvider cacheProvider, // forwarded unchanged to the AMSPropertyProvider super constructor
MetricHostProvider hostProvider,
String clusterNamePropertyId,
String componentNamePropertyId) {
- super(componentPropertyInfoMap, streamProvider, configuration, hostProvider,
- clusterNamePropertyId, null, componentNamePropertyId);
+ super(componentPropertyInfoMap, streamProvider, configuration,
+ cacheProvider, hostProvider, clusterNamePropertyId, null, componentNamePropertyId); // null sits in the hostNamePropertyId position: no host-name property id is supplied here
}
@Override
http://git-wip-us.apache.org/repos/asf/ambari/blob/b4ee1918/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/AMSHostComponentPropertyProvider.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/AMSHostComponentPropertyProvider.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/AMSHostComponentPropertyProvider.java
index 8ee2acb..0e71049 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/AMSHostComponentPropertyProvider.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/AMSHostComponentPropertyProvider.java
@@ -20,6 +20,7 @@ package org.apache.ambari.server.controller.metrics.timeline;
import org.apache.ambari.server.configuration.ComponentSSLConfiguration;
import org.apache.ambari.server.controller.internal.PropertyInfo;
import org.apache.ambari.server.controller.metrics.MetricHostProvider;
+import org.apache.ambari.server.controller.metrics.timeline.cache.TimelineMetricCacheProvider;
import org.apache.ambari.server.controller.spi.Resource;
import org.apache.ambari.server.controller.utilities.StreamProvider;
import java.util.Map;
@@ -29,13 +30,15 @@ public class AMSHostComponentPropertyProvider extends AMSPropertyProvider {
public AMSHostComponentPropertyProvider(Map<String, Map<String, PropertyInfo>> componentPropertyInfoMap,
StreamProvider streamProvider,
ComponentSSLConfiguration configuration,
+ TimelineMetricCacheProvider cacheProvider, // forwarded unchanged to the AMSPropertyProvider super constructor
MetricHostProvider hostProvider,
String clusterNamePropertyId,
String hostNamePropertyId,
String componentNamePropertyId) {
- super(componentPropertyInfoMap, streamProvider, configuration, hostProvider,
- clusterNamePropertyId, hostNamePropertyId, componentNamePropertyId);
+ super(componentPropertyInfoMap, streamProvider, configuration,
+ cacheProvider, hostProvider, clusterNamePropertyId, hostNamePropertyId, // cacheProvider is inserted directly before hostProvider
+ componentNamePropertyId); // both the host-name and component-name property ids are passed through
}
@Override
http://git-wip-us.apache.org/repos/asf/ambari/blob/b4ee1918/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/AMSHostPropertyProvider.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/AMSHostPropertyProvider.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/AMSHostPropertyProvider.java
index ca9d685..85e016f 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/AMSHostPropertyProvider.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/AMSHostPropertyProvider.java
@@ -20,6 +20,7 @@ package org.apache.ambari.server.controller.metrics.timeline;
import org.apache.ambari.server.configuration.ComponentSSLConfiguration;
import org.apache.ambari.server.controller.internal.PropertyInfo;
import org.apache.ambari.server.controller.metrics.MetricHostProvider;
+import org.apache.ambari.server.controller.metrics.timeline.cache.TimelineMetricCacheProvider;
import org.apache.ambari.server.controller.spi.Resource;
import org.apache.ambari.server.controller.utilities.StreamProvider;
@@ -30,12 +31,13 @@ public class AMSHostPropertyProvider extends AMSPropertyProvider {
public AMSHostPropertyProvider(Map<String, Map<String, PropertyInfo>> componentPropertyInfoMap,
StreamProvider streamProvider,
ComponentSSLConfiguration configuration,
+ TimelineMetricCacheProvider cacheProvider, // forwarded unchanged to the AMSPropertyProvider super constructor
MetricHostProvider hostProvider,
String clusterNamePropertyId,
String hostNamePropertyId) {
- super(componentPropertyInfoMap, streamProvider, configuration, hostProvider,
- clusterNamePropertyId, hostNamePropertyId, null);
+ super(componentPropertyInfoMap, streamProvider, configuration,
+ cacheProvider, hostProvider, clusterNamePropertyId, hostNamePropertyId, null); // trailing null: presumably the componentNamePropertyId slot, which this provider leaves unset -- confirm against AMSPropertyProvider
}
@Override
http://git-wip-us.apache.org/repos/asf/ambari/blob/b4ee1918/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/AMSPropertyProvider.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/AMSPropertyProvider.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/AMSPropertyProvider.java
index 6667134..55b7349 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/AMSPropertyProvider.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/AMSPropertyProvider.java
@@ -17,6 +17,7 @@
*/
package org.apache.ambari.server.controller.metrics.timeline;
+import com.google.common.collect.Sets;
import org.apache.ambari.server.api.services.AmbariMetaInfo;
import org.apache.ambari.server.configuration.ComponentSSLConfiguration;
import org.apache.ambari.server.controller.AmbariManagementController;
@@ -24,6 +25,9 @@ import org.apache.ambari.server.controller.AmbariServer;
import org.apache.ambari.server.controller.internal.PropertyInfo;
import org.apache.ambari.server.controller.metrics.MetricHostProvider;
import org.apache.ambari.server.controller.metrics.MetricsPropertyProvider;
+import org.apache.ambari.server.controller.metrics.timeline.cache.TimelineAppMetricCacheKey;
+import org.apache.ambari.server.controller.metrics.timeline.cache.TimelineMetricCache;
+import org.apache.ambari.server.controller.metrics.timeline.cache.TimelineMetricCacheProvider;
import org.apache.ambari.server.controller.spi.Request;
import org.apache.ambari.server.controller.spi.Resource;
import org.apache.ambari.server.controller.spi.SystemException;
@@ -35,13 +39,8 @@ import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
import org.apache.http.client.utils.URIBuilder;
-import org.codehaus.jackson.map.AnnotationIntrospector;
-import org.codehaus.jackson.map.ObjectMapper;
-import org.codehaus.jackson.map.ObjectReader;
-import org.codehaus.jackson.xc.JaxbAnnotationIntrospector;
-import java.io.BufferedReader;
+
import java.io.IOException;
-import java.io.InputStreamReader;
import java.net.SocketTimeoutException;
import java.util.Collection;
import java.util.Collections;
@@ -51,33 +50,28 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
+
import static org.apache.ambari.server.Role.HBASE_MASTER;
import static org.apache.ambari.server.Role.HBASE_REGIONSERVER;
import static org.apache.ambari.server.Role.METRICS_COLLECTOR;
import static org.apache.ambari.server.controller.metrics.MetricsPaddingMethod.ZERO_PADDING_PARAM;
import static org.apache.ambari.server.controller.metrics.MetricsServiceProvider.MetricsService.TIMELINE_METRICS;
-import static org.codehaus.jackson.map.annotate.JsonSerialize.Inclusion;
public abstract class AMSPropertyProvider extends MetricsPropertyProvider {
- private static ObjectMapper mapper;
- private final static ObjectReader timelineObjectReader;
private static final String METRIC_REGEXP_PATTERN = "\\([^)]*\\)";
private static final int COLLECTOR_DEFAULT_PORT = 6188;
-
- static {
- mapper = new ObjectMapper();
- AnnotationIntrospector introspector = new JaxbAnnotationIntrospector();
- mapper.setAnnotationIntrospector(introspector);
- //noinspection deprecation
- mapper.getSerializationConfig().setSerializationInclusion(Inclusion.NON_NULL);
- timelineObjectReader = mapper.reader(TimelineMetrics.class);
- }
+ private final TimelineMetricCache metricCache;
+ private static AtomicInteger printSkipPopulateMsgHostCounter = new AtomicInteger(0);
+ private static AtomicInteger printSkipPopulateMsgHostCompCounter = new AtomicInteger(0);
public AMSPropertyProvider(Map<String, Map<String, PropertyInfo>> componentPropertyInfoMap,
StreamProvider streamProvider,
ComponentSSLConfiguration configuration,
+ TimelineMetricCacheProvider cacheProvider,
MetricHostProvider hostProvider,
String clusterNamePropertyId,
String hostNamePropertyId,
@@ -86,6 +80,8 @@ public abstract class AMSPropertyProvider extends MetricsPropertyProvider {
super(componentPropertyInfoMap, streamProvider, configuration,
hostProvider, clusterNamePropertyId, hostNamePropertyId,
componentNamePropertyId);
+
+ this.metricCache = cacheProvider.getTimelineMetricsCache();
}
protected String getOverridenComponentName(Resource resource) {
@@ -121,6 +117,9 @@ public abstract class AMSPropertyProvider extends MetricsPropertyProvider {
private final Map<String, Set<Resource>> resources = new HashMap<String, Set<Resource>>();
private final Map<String, Set<String>> metrics = new HashMap<String, Set<String>>();
private final URIBuilder uriBuilder;
+ Set<String> resolvedMetricsParams;
+ MetricsRequestHelper requestHelper = new MetricsRequestHelper(streamProvider);
+
// Metrics with amsHostMetric = true
// Basically a host metric to be returned for a hostcomponent
private final Set<String> hostComponentHostMetrics = new HashSet<String>();
@@ -162,45 +161,22 @@ public abstract class AMSPropertyProvider extends MetricsPropertyProvider {
}
}
- private TimelineMetrics getTimelineMetricsForSpec(String spec) {
- TimelineMetrics timelineMetrics = null;
-
- LOG.debug("Metrics request url = " + spec);
- BufferedReader reader = null;
- try {
- reader = new BufferedReader(new InputStreamReader(streamProvider.readFrom(spec)));
- timelineMetrics = timelineObjectReader.readValue(reader);
- LOG.debug("Timeline metrics response => " + timelineMetrics);
-
- } catch (IOException io) {
- String errorMsg = "Error getting timeline metrics.";
- if (LOG.isDebugEnabled()) {
- LOG.error(errorMsg, io);
- } else {
- if (io instanceof SocketTimeoutException) {
- errorMsg += " Can not connect to collector, socket error.";
- }
- LOG.error(errorMsg);
- }
- } finally {
- if (reader != null) {
- try {
- reader.close();
- } catch (IOException e) {
- if (LOG.isWarnEnabled()) {
- if (LOG.isDebugEnabled()) {
- LOG.warn("Unable to close http input stream : spec=" + spec, e);
- } else {
- LOG.warn("Unable to close http input stream : spec=" + spec);
- }
- }
- }
- }
+ private TimelineMetrics getTimelineMetricsFromCache(TimelineAppMetricCacheKey metricCacheKey,
+ String componentName) throws IOException { // returns metrics for the key: via the cache when eligible, otherwise fetched directly
+ // Only component-level metrics are cached; an empty or "HOST" component name bypasses the cache
+ // Point-in-time requests (cache key without temporal info) are never cached
+ if (metricCache != null // with no cache available every request falls through to the direct fetch below
+ && !StringUtils.isEmpty(componentName)
+ && !componentName.equalsIgnoreCase("HOST") // the HOST check ignores case
+ && metricCacheKey.getTemporalInfo() != null) {
+ return metricCache.getAppTimelineMetricsFromCache(metricCacheKey);
}
- return timelineMetrics;
+ return requestHelper.fetchTimelineMetrics(metricCacheKey.getSpec()); // direct fetch; the spec is presumably the request URI built by the caller -- confirm
}
+
+
/**
* Populate the associated resources by making a call to the Metrics
* service.
@@ -208,6 +184,7 @@ public abstract class AMSPropertyProvider extends MetricsPropertyProvider {
* @return a collection of populated resources
* @throws SystemException if unable to populate the resources
*/
+ @SuppressWarnings("unchecked")
public Collection<Resource> populateResources() throws SystemException {
// No open ended query support.
if (temporalInfo != null && (temporalInfo.getStartTime() == null
@@ -219,27 +196,62 @@ public abstract class AMSPropertyProvider extends MetricsPropertyProvider {
String componentName = resourceEntry.getKey();
Set<Resource> resourceSet = resourceEntry.getValue();
- TimelineMetrics timelineMetrics;
- // Allow for multiple requests since host metrics for a
- // hostcomponent need the HOST appId
- if (hostComponentHostMetrics.isEmpty()) { //HOST
- String spec = getSpec(componentName);
- timelineMetrics = getTimelineMetricsForSpec(spec);
- } else {
- Set<String> specs = getSpecsForHostComponentMetrics(componentName);
- timelineMetrics = new TimelineMetrics();
- for (String spec : specs) {
- if (!StringUtils.isEmpty(spec)) {
- TimelineMetrics metrics = getTimelineMetricsForSpec(spec);
- if (metrics != null) {
- timelineMetrics.getMetrics().addAll(metrics.getMetrics());
- }
- }
+ TimelineMetrics timelineMetrics = new TimelineMetrics();
+
+ Set<String> nonHostComponentMetrics = new HashSet<String>(metrics.keySet());
+ nonHostComponentMetrics.removeAll(hostComponentHostMetrics);
+ String hostnames = getHostnames(resources.get(componentName));
+
+ // Allow for multiple requests since host metrics for a
+ // hostcomponent need the HOST appId
+ if (!hostComponentHostMetrics.isEmpty()) {
+ String hostComponentHostMetricParams = getSetString(processRegexps(hostComponentHostMetrics), -1);
+ setQueryParams(hostComponentHostMetricParams, hostnames, true, componentName);
+ TimelineMetrics metricsResponse = null;
+ try {
+ metricsResponse = getTimelineMetricsFromCache(
+ getTimelineAppMetricCacheKey(hostComponentHostMetrics,
+ componentName, uriBuilder.toString()), componentName);
+ } catch (IOException e) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Caught exception fetching metric data.", e);
}
+ // Skip further queries to preempt long calls due to timeout
+ if (e instanceof SocketTimeoutException) {
+ return Collections.emptySet();
+ }
+ }
+ if (metricsResponse != null) {
+ timelineMetrics.getMetrics().addAll(metricsResponse.getMetrics());
}
+ }
+
+ if (!nonHostComponentMetrics.isEmpty()) {
+ String nonHostComponentHostMetricParams = getSetString(processRegexps(nonHostComponentMetrics), -1);
+ setQueryParams(nonHostComponentHostMetricParams, hostnames, false, componentName);
+ TimelineMetrics metricsResponse = null;
+ try {
+ metricsResponse = getTimelineMetricsFromCache(
+ getTimelineAppMetricCacheKey(nonHostComponentMetrics,
+ componentName, uriBuilder.toString()), componentName);
+ } catch (IOException e) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Caught exception fetching metric data.", e);
+ }
+ // Skip further queries to preempt long calls due to timeout
+ if (e instanceof SocketTimeoutException) {
+ return Collections.emptySet();
+ }
+ }
+ if (metricsResponse != null) {
+ timelineMetrics.getMetrics().addAll(metricsResponse.getMetrics());
+ }
+ }
+
Map<String, Set<TimelineMetric>> metricsMap = new HashMap<String, Set<TimelineMetric>>();
Set<String> patterns = createPatterns(metrics.keySet());
- if (timelineMetrics != null) {
+
+ if (!timelineMetrics.getMetrics().isEmpty()) {
for (TimelineMetric metric : timelineMetrics.getMetrics()) {
if (metric.getMetricName() != null
&& metric.getMetricValues() != null
@@ -255,9 +267,11 @@ public abstract class AMSPropertyProvider extends MetricsPropertyProvider {
String hostname = getHostName(resource);
if (metricsMap.containsKey(hostname)) {
for (TimelineMetric metric : metricsMap.get(hostname)) {
- // Pad zeros or nulls if needed
- metricsPaddingMethod.applyPaddingStrategy(metric, temporalInfo);
- populateResource(resource, metric, temporalInfo);
+ // Pad zeros or nulls if needed to a clone so we do not cache
+ // padded values
+ TimelineMetric timelineMetricClone = new TimelineMetric(metric);
+ metricsPaddingMethod.applyPaddingStrategy(timelineMetricClone, temporalInfo);
+ populateResource(resource, timelineMetricClone, temporalInfo);
}
}
}
@@ -267,38 +281,14 @@ public abstract class AMSPropertyProvider extends MetricsPropertyProvider {
return Collections.emptySet();
}
- /**
- * Return separate specs for : host component metrics and host component
- * host metrics.
- * @return @Set Urls
- */
- private Set<String> getSpecsForHostComponentMetrics(String componentName) {
- Set<String> nonHostComponentMetrics = new HashSet<String>(metrics.keySet());
- nonHostComponentMetrics.removeAll(hostComponentHostMetrics);
-
- Set<String> specs = new HashSet<String>();
- String hostnames = getHostnames(resources.get(componentName));
- if (!hostComponentHostMetrics.isEmpty()) {
- String hostComponentHostMetricParams = getSetString(processRegexps(hostComponentHostMetrics), -1);
- setQueryParams(hostComponentHostMetricParams, hostnames, true, componentName);
- specs.add(uriBuilder.toString());
- }
-
- if (!nonHostComponentMetrics.isEmpty()) {
- String nonHostComponentHostMetricParams = getSetString(processRegexps(nonHostComponentMetrics), -1);
- setQueryParams(nonHostComponentHostMetricParams, hostnames, false, componentName);
- specs.add(uriBuilder.toString());
- }
- return specs;
- }
-
- private void setQueryParams(String metricsParam,
- String hostname, boolean isHostMetric, String componentName) {
+ private void setQueryParams(String metricsParam, String hostname,
+ boolean isHostMetric, String componentName) {
// Reuse uriBuilder
uriBuilder.removeQuery();
if (metricsParam.length() > 0) {
uriBuilder.setParameter("metricNames", metricsParam);
+ resolvedMetricsParams = Sets.newHashSet(metricsParam.split(","));
}
if (hostname != null && !hostname.isEmpty()) {
@@ -309,7 +299,7 @@ public abstract class AMSPropertyProvider extends MetricsPropertyProvider {
uriBuilder.setParameter("appId", "HOST");
} else {
if (componentName != null && !componentName.isEmpty()
- && !componentName.equalsIgnoreCase("HOST")) {
+ && !componentName.equalsIgnoreCase("HOST")) {
StackId stackId;
try {
AmbariManagementController managementController = AmbariServer.getController();
@@ -344,14 +334,6 @@ public abstract class AMSPropertyProvider extends MetricsPropertyProvider {
}
}
- private String getSpec(String componentName) {
- String metricsParam = getSetString(processRegexps(metrics.keySet()), -1);
- String hostnames = getHostnames(resources.get(componentName));
- setQueryParams(metricsParam, hostnames, false, componentName);
-
- return uriBuilder.toString();
- }
-
private Set<String> createPatterns(Set<String> rawNames) {
Pattern pattern = Pattern.compile(METRIC_REGEXP_PATTERN);
Set<String> result = new HashSet<String>();
@@ -432,6 +414,22 @@ public abstract class AMSPropertyProvider extends MetricsPropertyProvider {
}
}
}
+
+    /**
+     * Build the cache key for one metrics request.
+     *
+     * @param metrics metric names the request covers
+     * @param componentName component the metrics are requested for
+     * @param spec fully built request Uri, as a string
+     * @return a key created from the metric names, the component name and
+     * this request's temporal info, with the spec set on it
+     */
+    private TimelineAppMetricCacheKey getTimelineAppMetricCacheKey(Set<String> metrics,
+        String componentName, String spec) {
+
+      TimelineAppMetricCacheKey cacheKey =
+        new TimelineAppMetricCacheKey(metrics, componentName, temporalInfo);
+      // The Uri travels on the key as a plain string so that a cache update
+      // only has to tweak its params. Handing over the UriBuilder reference
+      // is unsafe because the builder is reused, and the Uri can only be
+      // constructed with a resource request which ties it to the cluster.
+      cacheKey.setSpec(spec);
+      return cacheKey;
+    }
}
private String getHostnames(Set<Resource> resources) {
@@ -496,6 +494,14 @@ public abstract class AMSPropertyProvider extends MetricsPropertyProvider {
return super.getComponentMetrics();
}
+ /**
+ * Return the {@link MetricsRequest} objects to issue, grouped by cluster
+ * name and then by temporal info.
+ *
+ * @param resources Set of resources asked to populate
+ * @param request Original Request object used to check properties
+ * @param ids Property ids to populate on the resource
+ * @throws SystemException
+ */
private Map<String, Map<TemporalInfo, MetricsRequest>> getMetricsRequests(
Set<Resource> resources, Request request, Set<String> ids) throws SystemException {
@@ -515,17 +521,31 @@ public abstract class AMSPropertyProvider extends MetricsPropertyProvider {
// Check liveliness of host
if (!hostProvider.isCollectorHostLive(clusterName, TIMELINE_METRICS)) {
- LOG.info("METRICS_COLLECTOR host is not live. Skip populating " +
- "resources with metrics.");
+ if (printSkipPopulateMsgHostCounter.getAndIncrement() == 0) {
+ LOG.info("METRICS_COLLECTOR host is not live. Skip populating " +
+ "resources with metrics, next message will be logged after 1000 " +
+ "attempts.");
+ } else {
+ printSkipPopulateMsgHostCounter.compareAndSet(1000, 0);
+ }
continue;
}
+ // reset
+ printSkipPopulateMsgHostCounter.set(0);
// Check liveliness of Collector
if (!hostProvider.isCollectorComponentLive(clusterName, TIMELINE_METRICS)) {
- LOG.info("METRICS_COLLECTOR is not live. Skip populating resources" +
- " with metrics.");
+ if (printSkipPopulateMsgHostCompCounter.getAndIncrement() == 0) {
+ LOG.info("METRICS_COLLECTOR is not live. Skip populating resources " +
+ "with metrics., next message will be logged after 1000 " +
+ "attempts.");
+ } else {
+ printSkipPopulateMsgHostCompCounter.compareAndSet(1000, 0);
+ }
continue;
}
+ // reset
+ printSkipPopulateMsgHostCompCounter.set(0);
Map<TemporalInfo, MetricsRequest> requests = requestMap.get(clusterName);
if (requests == null) {
@@ -568,7 +588,7 @@ public abstract class AMSPropertyProvider extends MetricsPropertyProvider {
metricsRequest = new MetricsRequest(temporalInfo,
getAMSUriBuilder(collectorHostName,
collectorPort != null ? Integer.parseInt(collectorPort) : COLLECTOR_DEFAULT_PORT),
- (String) resource.getPropertyValue(clusterNamePropertyId));
+ (String) resource.getPropertyValue(clusterNamePropertyId));
requests.put(temporalInfo, metricsRequest);
}
metricsRequest.putResource(getComponentName(resource), resource);
http://git-wip-us.apache.org/repos/asf/ambari/blob/b4ee1918/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/AMSReportPropertyProvider.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/AMSReportPropertyProvider.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/AMSReportPropertyProvider.java
index a095206..0605123 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/AMSReportPropertyProvider.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/AMSReportPropertyProvider.java
@@ -23,6 +23,9 @@ import org.apache.ambari.server.controller.metrics.MetricHostProvider;
import org.apache.ambari.server.controller.metrics.MetricsPaddingMethod;
import org.apache.ambari.server.controller.metrics.MetricsPropertyProvider;
import org.apache.ambari.server.controller.metrics.MetricsReportPropertyProvider;
+import org.apache.ambari.server.controller.metrics.timeline.cache.TimelineAppMetricCacheKey;
+import org.apache.ambari.server.controller.metrics.timeline.cache.TimelineMetricCache;
+import org.apache.ambari.server.controller.metrics.timeline.cache.TimelineMetricCacheProvider;
import org.apache.ambari.server.controller.spi.Predicate;
import org.apache.ambari.server.controller.spi.Request;
import org.apache.ambari.server.controller.spi.Resource;
@@ -33,49 +36,37 @@ import org.apache.ambari.server.controller.utilities.StreamProvider;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
import org.apache.http.client.utils.URIBuilder;
-import org.codehaus.jackson.map.AnnotationIntrospector;
-import org.codehaus.jackson.map.ObjectMapper;
-import org.codehaus.jackson.map.ObjectReader;
-import org.codehaus.jackson.map.annotate.JsonSerialize;
-import org.codehaus.jackson.xc.JaxbAnnotationIntrospector;
-import java.io.BufferedReader;
import java.io.IOException;
-import java.io.InputStreamReader;
-import java.net.SocketTimeoutException;
-import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
-import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.atomic.AtomicInteger;
import static org.apache.ambari.server.controller.metrics.MetricsPaddingMethod.ZERO_PADDING_PARAM;
import static org.apache.ambari.server.controller.metrics.MetricsServiceProvider.MetricsService.TIMELINE_METRICS;
-import static org.apache.ambari.server.controller.utilities.PropertyHelper.updateMetricsWithAggregateFunctionSupport;
public class AMSReportPropertyProvider extends MetricsReportPropertyProvider {
- private static ObjectMapper mapper;
- private final static ObjectReader timelineObjectReader;
private MetricsPaddingMethod metricsPaddingMethod;
-
- static {
- mapper = new ObjectMapper();
- AnnotationIntrospector introspector = new JaxbAnnotationIntrospector();
- mapper.setAnnotationIntrospector(introspector);
- //noinspection deprecation
- mapper.getSerializationConfig().setSerializationInclusion(JsonSerialize.Inclusion.NON_NULL);
- timelineObjectReader = mapper.reader(TimelineMetrics.class);
- }
+ private final TimelineMetricCache metricCache;
+ MetricsRequestHelper requestHelper;
+ private static AtomicInteger printSkipPopulateMsgHostCounter = new AtomicInteger(0);
+ private static AtomicInteger printSkipPopulateMsgHostCompCounter = new AtomicInteger(0);
public AMSReportPropertyProvider(Map<String, Map<String, PropertyInfo>> componentPropertyInfoMap,
StreamProvider streamProvider,
ComponentSSLConfiguration configuration,
+ TimelineMetricCacheProvider cacheProvider,
MetricHostProvider hostProvider,
String clusterNamePropertyId) {
super(componentPropertyInfoMap, streamProvider, configuration,
hostProvider, clusterNamePropertyId);
+
+ // The cache obtained here is null-checked before use when metrics are
+ // looked up, so a provider without a cache is presumably supported.
+ this.metricCache = cacheProvider.getTimelineMetricsCache();
+ // Helper used for requests that do not go through the cache.
+ this.requestHelper = new MetricsRequestHelper(streamProvider);
}
/**
@@ -144,17 +135,33 @@ public class AMSReportPropertyProvider extends MetricsReportPropertyProvider {
// Check liveliness of host
if (!hostProvider.isCollectorHostLive(clusterName, TIMELINE_METRICS)) {
- LOG.info("METRICS_COLLECTOR host is not live. Skip populating " +
- "resources with metrics.");
+ // Throttle: log on the first skip only, the counter wraps back to 0
+ // once it reaches 1000 so the message is repeated after 1000 attempts.
+ if (printSkipPopulateMsgHostCounter.getAndIncrement() == 0) {
+ LOG.info("METRICS_COLLECTOR host is not live. Skip populating " +
+ "resources with metrics, next message will be logged after 1000 " +
+ "attempts.");
+ } else {
+ printSkipPopulateMsgHostCounter.compareAndSet(1000, 0);
+ }
+
return true;
}
+ // Host is live: reset the host throttle counter (not the component one)
+ // so the next host outage is logged immediately.
+ printSkipPopulateMsgHostCounter.set(0);
// Check liveliness of Collector
if (!hostProvider.isCollectorComponentLive(clusterName, TIMELINE_METRICS)) {
- LOG.info("METRICS_COLLECTOR is not live. Skip populating resources" +
- " with metrics.");
+ if (printSkipPopulateMsgHostCompCounter.getAndIncrement() == 0) {
+ LOG.info("METRICS_COLLECTOR is not live. Skip populating resources" +
+ " with metrics, next message will be logged after 1000 " +
+ "attempts.");
+ } else {
+ printSkipPopulateMsgHostCompCounter.compareAndSet(1000, 0);
+ }
+
return true;
}
+ // Collector component is live: reset the component throttle counter.
+ printSkipPopulateMsgHostCompCounter.set(0);
setProperties(resource, clusterName, request, getRequestPropertyIds(request, predicate));
@@ -191,48 +198,34 @@ public class AMSReportPropertyProvider extends MetricsReportPropertyProvider {
uriBuilder.setParameter("endTime", String.valueOf(endTime));
}
- BufferedReader reader = null;
- String spec = uriBuilder.toString();
- try {
- LOG.debug("Metrics request url =" + spec);
- reader = new BufferedReader(new InputStreamReader(streamProvider.readFrom(spec)));
+ TimelineAppMetricCacheKey metricCacheKey =
+ new TimelineAppMetricCacheKey(propertyIdMap.keySet(), "HOST", temporalInfo);
+
+ metricCacheKey.setSpec(uriBuilder.toString());
- TimelineMetrics timelineMetrics = timelineObjectReader.readValue(reader);
- LOG.debug("Timeline metrics response => " + timelineMetrics);
+ // Self populating cache updates itself on every get with latest results
+ TimelineMetrics timelineMetrics;
+ if (metricCache != null && metricCacheKey.getTemporalInfo() != null) {
+ timelineMetrics = metricCache.getAppTimelineMetricsFromCache(metricCacheKey);
+ } else {
+ try {
+ timelineMetrics = requestHelper.fetchTimelineMetrics(uriBuilder.toString());
+ } catch (IOException e) {
+ timelineMetrics = null;
+ }
+ }
+ if (timelineMetrics != null) {
for (TimelineMetric metric : timelineMetrics.getMetrics()) {
if (metric.getMetricName() != null && metric.getMetricValues() != null) {
- // Pad zeros or nulls if needed
- metricsPaddingMethod.applyPaddingStrategy(metric, temporalInfo);
+ // Pad zeros or nulls if needed to a clone so we do not cache
+ // padded values
+ TimelineMetric timelineMetricClone = new TimelineMetric(metric);
+ metricsPaddingMethod.applyPaddingStrategy(timelineMetricClone, temporalInfo);
String propertyId = propertyIdMap.get(metric.getMetricName());
if (propertyId != null) {
- resource.setProperty(propertyId, getValue(metric, temporalInfo));
- }
- }
- }
-
- } catch (IOException io) {
- String errorMsg = "Error getting timeline metrics.";
- if (LOG.isDebugEnabled()) {
- LOG.error(errorMsg, io);
- } else {
- if (io instanceof SocketTimeoutException) {
- errorMsg += " Can not connect to collector, socket error.";
- }
- LOG.error(errorMsg);
- }
- } finally {
- if (reader != null) {
- try {
- reader.close();
- } catch (IOException e) {
- if (LOG.isWarnEnabled()) {
- if (LOG.isDebugEnabled()) {
- LOG.warn("Unable to close http input steam : spec=" + spec, e);
- } else {
- LOG.warn("Unable to close http input steam : spec=" + spec);
- }
+ resource.setProperty(propertyId, getValue(timelineMetricClone, temporalInfo));
}
}
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/b4ee1918/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/MetricsRequestHelper.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/MetricsRequestHelper.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/MetricsRequestHelper.java
new file mode 100644
index 0000000..ca20e54
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/MetricsRequestHelper.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.server.controller.metrics.timeline;
+
+import org.apache.ambari.server.controller.utilities.StreamProvider;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
+import org.codehaus.jackson.map.AnnotationIntrospector;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.map.ObjectReader;
+import org.codehaus.jackson.map.annotate.JsonSerialize;
+import org.codehaus.jackson.xc.JaxbAnnotationIntrospector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.net.SocketTimeoutException;
+import java.util.Date;
+import java.util.Map;
+
+/**
+ * Helper class to call AMS backend that is utilized by @AMSPropertyProvider
+ * and @AMSReportPropertyProvider as well as @TimelineMetricCacheEntryFactory
+ */
+public class MetricsRequestHelper {
+ private final static Logger LOG = LoggerFactory.getLogger(MetricsRequestHelper.class);
+ private final static ObjectMapper mapper;
+ private final static ObjectReader timelineObjectReader;
+ private final StreamProvider streamProvider;
+
+ static {
+ mapper = new ObjectMapper();
+ AnnotationIntrospector introspector = new JaxbAnnotationIntrospector();
+ mapper.setAnnotationIntrospector(introspector);
+ //noinspection deprecation
+ mapper.getSerializationConfig().setSerializationInclusion(JsonSerialize.Inclusion.NON_NULL);
+ timelineObjectReader = mapper.reader(TimelineMetrics.class);
+ }
+
+ public MetricsRequestHelper(StreamProvider streamProvider) {
+ this.streamProvider = streamProvider;
+ }
+
+ public TimelineMetrics fetchTimelineMetrics(String spec) throws IOException {
+ LOG.debug("Metrics request url = " + spec);
+ BufferedReader reader = null;
+ TimelineMetrics timelineMetrics = null;
+ try {
+ reader = new BufferedReader(new InputStreamReader(streamProvider.readFrom(spec)));
+ timelineMetrics = timelineObjectReader.readValue(reader);
+ if (LOG.isTraceEnabled()) {
+ for (TimelineMetric metric : timelineMetrics.getMetrics()) {
+ LOG.trace("metric: " + metric.getMetricName() +
+ ", size = " + metric.getMetricValues().size() +
+ ", host = " + metric.getHostName() +
+ ", app = " + metric.getAppId() +
+ ", instance = " + metric.getInstanceId() +
+ ", time = " + metric.getTimestamp() +
+ ", startTime = " + new Date(metric.getStartTime()));
+ }
+ }
+ } catch (IOException io) {
+ String errorMsg = "Error getting timeline metrics.";
+ if (LOG.isDebugEnabled()) {
+ LOG.error(errorMsg, io);
+ }
+
+ if (io instanceof SocketTimeoutException) {
+ errorMsg += " Can not connect to collector, socket error.";
+ LOG.error(errorMsg);
+ throw io;
+ }
+
+ } finally {
+ if (reader != null) {
+ try {
+ reader.close();
+ } catch (IOException e) {
+ if (LOG.isWarnEnabled()) {
+ if (LOG.isDebugEnabled()) {
+ LOG.warn("Unable to close http input stream : spec=" + spec, e);
+ } else {
+ LOG.warn("Unable to close http input stream : spec=" + spec);
+ }
+ }
+ }
+ }
+ }
+ return timelineMetrics;
+ }
+}