You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by jo...@apache.org on 2016/06/01 20:33:08 UTC
ambari git commit: Revert "AMBARI-16821 Improve TimelineMetricsCache eviction/flush logic using a cache library (dsen)"
Repository: ambari
Updated Branches:
refs/heads/trunk 3324fd645 -> 8d30f1a4d
Revert "AMBARI-16821 Improve TimelineMetricsCache eviction/flush logic using a cache library (dsen)"
This reverts commit bef695d3fc44528d94a0fa60a3f1b6c5db51f02d.
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/8d30f1a4
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/8d30f1a4
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/8d30f1a4
Branch: refs/heads/trunk
Commit: 8d30f1a4d1da10225f66ecc02f757b4db904c5c0
Parents: 3324fd6
Author: Jonathan Hurley <jh...@hortonworks.com>
Authored: Wed Jun 1 16:31:26 2016 -0400
Committer: Jonathan Hurley <jh...@hortonworks.com>
Committed: Wed Jun 1 16:31:26 2016 -0400
----------------------------------------------------------------------
ambari-metrics/ambari-metrics-common/pom.xml | 28 +---
.../timeline/cache/TimelineMetricsCache.java | 161 +++++++++++++------
.../cache/TimelineMetricsCacheTest.java | 20 +--
.../timeline/HadoopTimelineMetricsSinkTest.java | 11 +-
ambari-project/pom.xml | 5 +
ambari-server/pom.xml | 5 -
6 files changed, 127 insertions(+), 103 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ambari/blob/8d30f1a4/ambari-metrics/ambari-metrics-common/pom.xml
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-common/pom.xml b/ambari-metrics/ambari-metrics-common/pom.xml
index feaee22..41ba62e 100644
--- a/ambari-metrics/ambari-metrics-common/pom.xml
+++ b/ambari-metrics/ambari-metrics-common/pom.xml
@@ -70,32 +70,6 @@
</execution>
</executions>
</plugin>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-shade-plugin</artifactId>
- <version>2.3</version>
- <executions>
- <!-- Run shade goal on package phase -->
- <execution>
- <phase>package</phase>
- <goals>
- <goal>shade</goal>
- </goals>
- <configuration>
- <artifactSet>
- <includes>
- <include>commons-io:*</include>
- <include>com.google.code.gson:*</include>
- <include>com.google.guava:*</include>
- <include>org.apache.curator:*</include>
- </includes>
- </artifactSet>
- <minimizeJar>true</minimizeJar>
- <createDependencyReducedPom>false</createDependencyReducedPom>
- </configuration>
- </execution>
- </executions>
- </plugin>
</plugins>
</build>
@@ -105,6 +79,7 @@
<artifactId>commons-logging</artifactId>
<version>1.1.1</version>
</dependency>
+ <!-- TODO: Need to add these as shaded dependencies -->
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
@@ -125,6 +100,7 @@
<artifactId>curator-framework</artifactId>
<version>2.7.1</version>
</dependency>
+ <!-- END TODO -->
<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-xc</artifactId>
http://git-wip-us.apache.org/repos/asf/ambari/blob/8d30f1a4/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/cache/TimelineMetricsCache.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/cache/TimelineMetricsCache.java b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/cache/TimelineMetricsCache.java
index 57f1437..0bed7d0 100644
--- a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/cache/TimelineMetricsCache.java
+++ b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/cache/TimelineMetricsCache.java
@@ -17,8 +17,6 @@
*/
package org.apache.hadoop.metrics2.sink.timeline.cache;
-import com.google.common.cache.Cache;
-import com.google.common.cache.CacheBuilder;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
@@ -27,18 +25,18 @@ import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
import java.util.ArrayList;
-import java.util.Collection;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.ConcurrentSkipListMap;
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class TimelineMetricsCache {
- private final Cache<String, TimelineMetricWrapper> timelineMetricCache;
+ private final TimelineMetricHolder timelineMetricCache = new TimelineMetricHolder();
private static final Log LOG = LogFactory.getLog(TimelineMetric.class);
public static final int MAX_RECS_PER_NAME_DEFAULT = 10000;
public static final int MAX_EVICTION_TIME_MILLIS = 59000; // ~ 1 min
@@ -56,81 +54,144 @@ public class TimelineMetricsCache {
this.maxRecsPerName = maxRecsPerName;
this.maxEvictionTimeInMillis = maxEvictionTimeInMillis;
this.skipCounterTransform = skipCounterTransform;
- this.timelineMetricCache = CacheBuilder.newBuilder().expireAfterWrite(maxEvictionTimeInMillis * 2, TimeUnit.MILLISECONDS).build();
}
class TimelineMetricWrapper {
- private Cache<Long, Double> dataPointsCache;
+ private long timeDiff = -1;
+ private long oldestTimestamp = -1;
private TimelineMetric timelineMetric;
- private Long oldestTimeStamp;
- private Long newestTimeStamp;
TimelineMetricWrapper(TimelineMetric timelineMetric) {
this.timelineMetric = timelineMetric;
- dataPointsCache = CacheBuilder.newBuilder().
- maximumSize(maxRecsPerName).expireAfterWrite(maxEvictionTimeInMillis * 2, TimeUnit.MILLISECONDS).build();
+ this.oldestTimestamp = timelineMetric.getStartTime();
+ }
- putMetric(timelineMetric);
+ private void updateTimeDiff(long timestamp) {
+ if (oldestTimestamp != -1 && timestamp > oldestTimestamp) {
+ timeDiff = timestamp - oldestTimestamp;
+ } else {
+ oldestTimestamp = timestamp;
+ }
}
public synchronized void putMetric(TimelineMetric metric) {
- if (dataPointsCache.size() == 0) {
- oldestTimeStamp = metric.getStartTime();
- newestTimeStamp = metric.getStartTime();
+ TreeMap<Long, Double> metricValues = this.timelineMetric.getMetricValues();
+ if (metricValues.size() > maxRecsPerName) {
+ // remove values for eldest maxEvictionTimeInMillis
+ long newEldestTimestamp = oldestTimestamp + maxEvictionTimeInMillis;
+ TreeMap<Long, Double> metricsSubSet =
+ new TreeMap<>(metricValues.tailMap(newEldestTimestamp));
+ if (metricsSubSet.isEmpty()) {
+ oldestTimestamp = metric.getStartTime();
+ this.timelineMetric.setStartTime(metric.getStartTime());
+ } else {
+ Long newStartTime = metricsSubSet.firstKey();
+ oldestTimestamp = newStartTime;
+ this.timelineMetric.setStartTime(newStartTime);
+ }
+ this.timelineMetric.setMetricValues(metricsSubSet);
+ LOG.warn("Metrics cache overflow. Values for metric " +
+ metric.getMetricName() + " older than " + newEldestTimestamp +
+ " were removed to clean up the cache.");
}
- TreeMap<Long, Double> metricValues = metric.getMetricValues();
- for (Map.Entry<Long, Double> entry : metricValues.entrySet()) {
- Long key = entry.getKey();
- dataPointsCache.put(key, entry.getValue());
- }
- oldestTimeStamp = Math.min(oldestTimeStamp, metric.getStartTime());
- newestTimeStamp = Math.max(newestTimeStamp, metric.getStartTime());
+ this.timelineMetric.addMetricValues(metric.getMetricValues());
+ updateTimeDiff(metric.getStartTime());
+ }
+
+ public synchronized long getTimeDiff() {
+ return timeDiff;
}
public synchronized TimelineMetric getTimelineMetric() {
- TreeMap<Long, Double> metricValues = new TreeMap<>(dataPointsCache.asMap());
- if (metricValues.isEmpty() || newestTimeStamp - oldestTimeStamp < maxEvictionTimeInMillis) {
+ return timelineMetric;
+ }
+ }
+
+ // TODO: Add weighted eviction
+ class TimelineMetricHolder extends ConcurrentSkipListMap<String, TimelineMetricWrapper> {
+ private static final long serialVersionUID = 2L;
+ // To avoid duplication at the end of the buffer and beginning of the next
+ // segment of values
+ private Map<String, Long> endOfBufferTimestamps = new HashMap<String, Long>();
+
+ public TimelineMetric evict(String metricName) {
+ TimelineMetricWrapper metricWrapper = this.get(metricName);
+
+ if (metricWrapper == null
+ || metricWrapper.getTimeDiff() < getMaxEvictionTimeInMillis()) {
return null;
}
- dataPointsCache.invalidateAll();
- timelineMetric.setStartTime(metricValues.firstKey());
- timelineMetric.setMetricValues(metricValues);
- return new TimelineMetric(timelineMetric);
+
+ TimelineMetric timelineMetric = metricWrapper.getTimelineMetric();
+ this.remove(metricName);
+
+ return timelineMetric;
+ }
+
+ public TimelineMetrics evictAll() {
+ List<TimelineMetric> metricList = new ArrayList<TimelineMetric>();
+
+ for (Iterator<Map.Entry<String, TimelineMetricWrapper>> it = this.entrySet().iterator(); it.hasNext();) {
+ Map.Entry<String, TimelineMetricWrapper> cacheEntry = it.next();
+ TimelineMetricWrapper metricWrapper = cacheEntry.getValue();
+ if (metricWrapper != null) {
+ TimelineMetric timelineMetric = cacheEntry.getValue().getTimelineMetric();
+ metricList.add(timelineMetric);
+ }
+ it.remove();
+ }
+ TimelineMetrics timelineMetrics = new TimelineMetrics();
+ timelineMetrics.setMetrics(metricList);
+ return timelineMetrics;
+ }
+
+ public void put(String metricName, TimelineMetric timelineMetric) {
+ if (isDuplicate(timelineMetric)) {
+ return;
+ }
+ TimelineMetricWrapper metric = this.get(metricName);
+ if (metric == null) {
+ this.put(metricName, new TimelineMetricWrapper(timelineMetric));
+ } else {
+ metric.putMetric(timelineMetric);
+ }
+ // Buffer last ts value
+ endOfBufferTimestamps.put(metricName, timelineMetric.getStartTime());
+ }
+
+ /**
+ * Test whether last buffered timestamp is same as the newly received.
+ * @param timelineMetric @TimelineMetric
+ * @return true/false
+ */
+ private boolean isDuplicate(TimelineMetric timelineMetric) {
+ return endOfBufferTimestamps.containsKey(timelineMetric.getMetricName())
+ && endOfBufferTimestamps.get(timelineMetric.getMetricName()).equals(timelineMetric.getStartTime());
}
}
public TimelineMetric getTimelineMetric(String metricName) {
- TimelineMetricWrapper timelineMetricWrapper = timelineMetricCache.getIfPresent(metricName);
- if (timelineMetricWrapper != null) {
- return timelineMetricWrapper.getTimelineMetric();
+ if (timelineMetricCache.containsKey(metricName)) {
+ return timelineMetricCache.evict(metricName);
}
+
return null;
}
public TimelineMetrics getAllMetrics() {
- TimelineMetrics timelineMetrics = new TimelineMetrics();
- Collection<TimelineMetricWrapper> timelineMetricWrapperCollection = timelineMetricCache.asMap().values();
- List<TimelineMetric> timelineMetricList =
- new ArrayList<>(timelineMetricWrapperCollection.size());
-
- for (TimelineMetricWrapper timelineMetricWrapper : timelineMetricWrapperCollection) {
- timelineMetricList.add(timelineMetricWrapper.getTimelineMetric());
- }
-
- timelineMetrics.setMetrics(timelineMetricList);
- return timelineMetrics;
+ return timelineMetricCache.evictAll();
}
+ /**
+ * Getter method to help testing eviction
+ * @return @int
+ */
+ public int getMaxEvictionTimeInMillis() {
+ return maxEvictionTimeInMillis;
+ }
public void putTimelineMetric(TimelineMetric timelineMetric) {
- String metricName = timelineMetric.getMetricName();
- TimelineMetricWrapper timelineMetricWrapper = timelineMetricCache.getIfPresent(metricName);
-
- if (timelineMetricWrapper != null) {
- timelineMetricWrapper.putMetric(timelineMetric);
- } else {
- timelineMetricCache.put(metricName, new TimelineMetricWrapper(timelineMetric));
- }
+ timelineMetricCache.put(timelineMetric.getMetricName(), timelineMetric);
}
private void transformMetricValuesToDerivative(TimelineMetric timelineMetric) {
http://git-wip-us.apache.org/repos/asf/ambari/blob/8d30f1a4/ambari-metrics/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/cache/TimelineMetricsCacheTest.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/cache/TimelineMetricsCacheTest.java b/ambari-metrics/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/cache/TimelineMetricsCacheTest.java
index 87c848b..18d973c 100644
--- a/ambari-metrics/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/cache/TimelineMetricsCacheTest.java
+++ b/ambari-metrics/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/cache/TimelineMetricsCacheTest.java
@@ -26,7 +26,6 @@ import java.util.TreeMap;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
public class TimelineMetricsCacheTest {
@@ -76,8 +75,8 @@ public class TimelineMetricsCacheTest {
}
@Test
- public void testMaxRecsPerNameForTimelineMetricWrapperCache() throws Exception {
- int maxRecsPerName = 3;
+ public void testMaxRecsPerName() throws Exception {
+ int maxRecsPerName = 2;
int maxEvictionTime = TimelineMetricsCache.MAX_EVICTION_TIME_MILLIS ;
TimelineMetricsCache timelineMetricsCache =
new TimelineMetricsCache(maxRecsPerName, maxEvictionTime);
@@ -125,21 +124,6 @@ public class TimelineMetricsCacheTest {
assertEquals(DEFAULT_START_TIME + maxEvictionTime * 2, cachedMetric.getStartTime());
}
- @Test
- public void testEvictionTimeForTimelineMetricWrapperCache() {
- int maxEvictionTime = 10;
- TimelineMetricsCache timelineMetricsCache =
- new TimelineMetricsCache(TimelineMetricsCache.MAX_RECS_PER_NAME_DEFAULT, maxEvictionTime);
- int numberOfMetricsInserted = 1000;
- for (int i = 0; i < numberOfMetricsInserted; i++) {
- timelineMetricsCache.putTimelineMetric(
- createTimelineMetricSingleValue(DEFAULT_START_TIME + maxEvictionTime * i));
- }
- TimelineMetric cachedMetric = timelineMetricsCache.getTimelineMetric(METRIC_NAME);
- assertNotNull(cachedMetric);
- assertTrue("Some metric values should have been removed", cachedMetric.getMetricValues().size() < numberOfMetricsInserted);
- }
-
private TimelineMetric createTimelineMetricSingleValue(final long startTime) {
TreeMap<Long, Double> values = new TreeMap<Long, Double>();
values.put(startTime, 0.0);
http://git-wip-us.apache.org/repos/asf/ambari/blob/8d30f1a4/ambari-metrics/ambari-metrics-hadoop-sink/src/test/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSinkTest.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-sink/src/test/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSinkTest.java b/ambari-metrics/ambari-metrics-hadoop-sink/src/test/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSinkTest.java
index 1d7a0fb..ea7f72d 100644
--- a/ambari-metrics/ambari-metrics-hadoop-sink/src/test/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSinkTest.java
+++ b/ambari-metrics/ambari-metrics-hadoop-sink/src/test/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSinkTest.java
@@ -179,7 +179,7 @@ public class HadoopTimelineMetricsSinkTest {
expect(conf.getInt(eq(MAX_METRIC_ROW_CACHE_SIZE), anyInt())).andReturn(10).anyTimes();
// Return eviction time smaller than time diff for first 3 entries
// Third entry will result in eviction
- expect(conf.getInt(eq(METRICS_SEND_INTERVAL), anyInt())).andReturn(90).anyTimes();
+ expect(conf.getInt(eq(METRICS_SEND_INTERVAL), anyInt())).andReturn(10).anyTimes();
expect(sink.findLiveCollectorHostsFromKnownCollector("localhost", "6188"))
.andReturn(Collections.singletonList("localhost")).anyTimes();
@@ -211,6 +211,7 @@ public class HadoopTimelineMetricsSinkTest {
expect(metric.value()).andReturn(3.0).once();
expect(metric.value()).andReturn(4.0).once();
expect(metric.value()).andReturn(5.0).once();
+ expect(metric.value()).andReturn(6.0).once();
MetricsRecord record = createNiceMock(MetricsRecord.class);
expect(record.name()).andReturn("testName").anyTimes();
@@ -227,7 +228,7 @@ public class HadoopTimelineMetricsSinkTest {
final Long now = System.currentTimeMillis();
// TODO: Current implementation of cache needs > 1 elements to evict any
expect(record.timestamp()).andReturn(now).times(2);
- expect(record.timestamp()).andReturn(now + 100l).once();
+ expect(record.timestamp()).andReturn(now + 100l).times(2);
expect(record.timestamp()).andReturn(now + 200l).once();
expect(record.timestamp()).andReturn(now + 300l).once();
@@ -258,6 +259,8 @@ public class HadoopTimelineMetricsSinkTest {
sink.putMetrics(record);
// time = t3
sink.putMetrics(record);
+ // time = t4
+ sink.putMetrics(record);
verify(conf, sink, record, metric);
@@ -271,7 +274,7 @@ public class HadoopTimelineMetricsSinkTest {
Assert.assertEquals(now, timestamps.next());
Assert.assertEquals(new Long(now + 100l), timestamps.next());
Iterator<Double> values = timelineMetric1.getMetricValues().values().iterator();
- Assert.assertEquals(new Double(2.0), values.next());
+ Assert.assertEquals(new Double(1.0), values.next());
Assert.assertEquals(new Double(3.0), values.next());
// t3, t4
TimelineMetric timelineMetric2 = metricsIterator.next().getMetrics().get(0);
@@ -280,8 +283,8 @@ public class HadoopTimelineMetricsSinkTest {
Assert.assertEquals(new Long(now + 200l), timestamps.next());
Assert.assertEquals(new Long(now + 300l), timestamps.next());
values = timelineMetric2.getMetricValues().values().iterator();
- Assert.assertEquals(new Double(4.0), values.next());
Assert.assertEquals(new Double(5.0), values.next());
+ Assert.assertEquals(new Double(6.0), values.next());
}
@Test
http://git-wip-us.apache.org/repos/asf/ambari/blob/8d30f1a4/ambari-project/pom.xml
----------------------------------------------------------------------
diff --git a/ambari-project/pom.xml b/ambari-project/pom.xml
index 85c9619..2fbb1e1 100644
--- a/ambari-project/pom.xml
+++ b/ambari-project/pom.xml
@@ -218,6 +218,11 @@
<version>9.3-1101-jdbc4</version>
</dependency>
<dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ <version>16.0</version>
+ </dependency>
+ <dependency>
<groupId>com.google.code.findbugs</groupId>
<artifactId>jsr305</artifactId>
<version>1.3.9</version>
http://git-wip-us.apache.org/repos/asf/ambari/blob/8d30f1a4/ambari-server/pom.xml
----------------------------------------------------------------------
diff --git a/ambari-server/pom.xml b/ambari-server/pom.xml
index 30069c4..20d3fab 100644
--- a/ambari-server/pom.xml
+++ b/ambari-server/pom.xml
@@ -1102,11 +1102,6 @@
<artifactId>jetty-server</artifactId>
</dependency>
<dependency>
- <groupId>com.google.guava</groupId>
- <artifactId>guava</artifactId>
- <version>16.0</version>
- </dependency>
- <dependency>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
</dependency>