You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by ds...@apache.org on 2017/10/12 09:49:31 UTC
ambari git commit: AMBARI-22215 Refine cluster second aggregator by
aligning sink publish times to 1 minute boundaries. (dsen)
Repository: ambari
Updated Branches:
refs/heads/branch-3.0-ams a9c6054fe -> e196358ca
AMBARI-22215 Refine cluster second aggregator by aligning sink publish times to 1 minute boundaries. (dsen)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/e196358c
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/e196358c
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/e196358c
Branch: refs/heads/branch-3.0-ams
Commit: e196358caa8b4c3387645238b8c48bb009cee311
Parents: a9c6054
Author: Dmytro Sen <ds...@apache.org>
Authored: Thu Oct 12 12:49:22 2017 +0300
Committer: Dmytro Sen <ds...@apache.org>
Committed: Thu Oct 12 12:49:22 2017 +0300
----------------------------------------------------------------------
.../timeline/AbstractTimelineMetricsSink.java | 95 +++++++-
.../metrics2/sink/timeline/TimelineMetric.java | 3 +
.../AbstractTimelineMetricSinkTest.java | 240 +++++++++++++++++++
.../AbstractTimelineMetricSinkTest.java | 113 ---------
.../timeline/HadoopTimelineMetricsSink.java | 2 +-
.../timeline/HadoopTimelineMetricsSinkTest.java | 4 +-
.../main/python/core/application_metric_map.py | 52 +++-
.../python/core/TestApplicationMetricMap.py | 38 ++-
.../timeline/TimelineMetricConfiguration.java | 3 -
.../timeline/TimelineMetricsIgniteCache.java | 14 +-
.../timeline/aggregators/AggregatorUtils.java | 2 +-
.../TimelineMetricAggregatorFactory.java | 7 +-
...cClusterAggregatorSecondWithCacheSource.java | 38 +--
.../TimelineMetricsIgniteCacheTest.java | 56 -----
...sterAggregatorSecondWithCacheSourceTest.java | 65 +----
15 files changed, 437 insertions(+), 295 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ambari/blob/e196358c/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java
index 3c06032..739e9dc 100644
--- a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java
+++ b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java
@@ -19,6 +19,8 @@ package org.apache.hadoop.metrics2.sink.timeline;
import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
import com.google.common.reflect.TypeToken;
import com.google.gson.Gson;
import com.google.gson.JsonSyntaxException;
@@ -58,6 +60,7 @@ import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.SortedSet;
+import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@@ -130,6 +133,13 @@ public abstract class AbstractTimelineMetricsSink {
private static final int COLLECTOR_HOST_CACHE_MAX_EXPIRATION_MINUTES = 75;
private static final int COLLECTOR_HOST_CACHE_MIN_EXPIRATION_MINUTES = 60;
+ //10 seconds
+ protected int collectionPeriodMillis = 10000;
+
+ private int cacheExpireTimeMinutesDefault = 10;
+
+ private volatile Cache<String, TimelineMetric> metricsPostCache = CacheBuilder.newBuilder().expireAfterAccess(cacheExpireTimeMinutesDefault, TimeUnit.MINUTES).build();
+
static {
mapper = new ObjectMapper();
AnnotationIntrospector introspector = new JaxbAnnotationIntrospector();
@@ -289,7 +299,21 @@ public abstract class AbstractTimelineMetricsSink {
return collectorHost;
}
+ /**
+ * @param metrics metrics to post, metric values will be aligned by minute mark,
+ * the last incomplete minute will be cached and posted in a future iteration
+ */
protected boolean emitMetrics(TimelineMetrics metrics) {
+ return emitMetrics(metrics, false);
+ }
+
+ /**
+ * @param metrics metrics to post, if postAllCachedMetrics is false metric values will be aligned by minute mark,
+ * the last incomplete minute will be cached and posted in a future iteration
+ * @param postAllCachedMetrics if set to true all cached metrics will be posted, ignoring the minute aligning
+ * @return
+ */
+ protected boolean emitMetrics(TimelineMetrics metrics, boolean postAllCachedMetrics) {
String connectUrl;
boolean validCollectorHost = true;
@@ -307,11 +331,20 @@ public abstract class AbstractTimelineMetricsSink {
connectUrl = getCollectorUri(collectorHost);
}
+ TimelineMetrics metricsToEmit = alignMetricsByMinuteMark(metrics);
+
+ if (postAllCachedMetrics) {
+ for (TimelineMetric timelineMetric : metricsPostCache.asMap().values()) {
+ metricsToEmit.addOrMergeTimelineMetric(timelineMetric);
+ }
+ metricsPostCache.invalidateAll();
+ }
+
if (validCollectorHost) {
String jsonData = null;
LOG.debug("EmitMetrics connectUrl = " + connectUrl);
try {
- jsonData = mapper.writeValueAsString(metrics);
+ jsonData = mapper.writeValueAsString(metricsToEmit);
} catch (IOException e) {
LOG.error("Unable to parse metrics", e);
}
@@ -335,6 +368,61 @@ public abstract class AbstractTimelineMetricsSink {
}
/**
+ * Align metrics by the minute mark so that only complete minutes are sent.
+ * Data points from the incomplete last minute will be cached and posted once that minute completes.
+ * Cached metrics are merged with currently posting metrics
+ * e.g:
+ * first iteration if metrics from 00m15s to 01m15s are processed,
+ * then metrics from 00m15s to 00m59s will be posted
+ * and from 01m00s to 01m15s will be cached
+ * second iteration metrics from 01m25s to 02m55s are processed,
+ * cached metrics from previous call will be merged with current,
+ * metrics from 01m00s to 02m55s will be posted, cache will be empty
+ * @param metrics
+ * @return
+ */
+ protected TimelineMetrics alignMetricsByMinuteMark(TimelineMetrics metrics) {
+ TimelineMetrics allMetricsToPost = new TimelineMetrics();
+
+ for (TimelineMetric metric : metrics.getMetrics()) {
+ TimelineMetric cachedMetric = metricsPostCache.getIfPresent(metric.getMetricName());
+ if (cachedMetric != null) {
+ metric.addMetricValues(cachedMetric.getMetricValues());
+ metricsPostCache.invalidate(metric.getMetricName());
+ }
+ }
+
+ for (TimelineMetric metric : metrics.getMetrics()) {
+ TreeMap<Long, Double> valuesToCache = new TreeMap<>();
+ TreeMap<Long, Double> valuesToPost = metric.getMetricValues();
+
+ // in case there can't be any more datapoints in last minute just post the metrics,
+ // otherwise need to cut off and cache the last uncompleted minute
+ if (!(valuesToPost.lastKey() % 60000 > 60000 - collectionPeriodMillis)) {
+ Long lastMinute = valuesToPost.lastKey() / 60000;
+ while (!valuesToPost.isEmpty() && valuesToPost.lastKey() / 60000 == lastMinute) {
+ valuesToCache.put(valuesToPost.lastKey(), valuesToPost.get(valuesToPost.lastKey()));
+ valuesToPost.remove(valuesToPost.lastKey());
+ }
+ }
+
+ if (!valuesToCache.isEmpty()) {
+ TimelineMetric metricToCache = new TimelineMetric(metric);
+ metricToCache.setMetricValues(valuesToCache);
+ metricsPostCache.put(metricToCache.getMetricName(), metricToCache);
+ }
+
+ if (!valuesToPost.isEmpty()) {
+ TimelineMetric metricToPost = new TimelineMetric(metric);
+ metricToPost.setMetricValues(valuesToPost);
+ allMetricsToPost.addOrMergeTimelineMetric(metricToPost);
+ }
+ }
+
+ return allMetricsToPost;
+ }
+
+ /**
* Cleans up and closes an input stream
* see http://docs.oracle.com/javase/6/docs/technotes/guides/net/http-keepalive.html
* @param is the InputStream to clean up
@@ -609,6 +697,11 @@ public abstract class AbstractTimelineMetricsSink {
rand.nextInt(zookeeperMaxBackoffTimeMins - zookeeperMinBackoffTimeMins + 1)) * 60*1000l;
}
+ //for now it's used only for testing
+ protected Cache<String, TimelineMetric> getMetricsPostCache() {
+ return metricsPostCache;
+ }
+
/**
* Get a pre-formatted URI for the collector
*/
http://git-wip-us.apache.org/repos/asf/ambari/blob/e196358c/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetric.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetric.java b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetric.java
index 3dfcf4e..b376048 100644
--- a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetric.java
+++ b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetric.java
@@ -146,6 +146,9 @@ public class TimelineMetric implements Comparable<TimelineMetric>, Serializable
public void addMetricValues(Map<Long, Double> metricValues) {
this.metricValues.putAll(metricValues);
+ if (!this.metricValues.isEmpty()) {
+ this.setStartTime(this.metricValues.firstKey());
+ }
}
@XmlElement(name = "metadata")
http://git-wip-us.apache.org/repos/asf/ambari/blob/e196358c/ambari-metrics/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricSinkTest.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricSinkTest.java b/ambari-metrics/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricSinkTest.java
new file mode 100644
index 0000000..634d18c
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricSinkTest.java
@@ -0,0 +1,240 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.metrics2.sink.timeline;
+
+import junit.framework.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.easymock.PowerMock;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.io.OutputStream;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.TreeMap;
+
+import static org.easymock.EasyMock.anyString;
+import static org.easymock.EasyMock.expect;
+import static org.powermock.api.easymock.PowerMock.expectNew;
+import static org.powermock.api.easymock.PowerMock.replayAll;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({AbstractTimelineMetricsSink.class, HttpURLConnection.class})
+public class AbstractTimelineMetricSinkTest {
+
+ @Test
+ public void testParseHostsStringIntoCollection() {
+ AbstractTimelineMetricsSink sink = new TestTimelineMetricsSink();
+ Collection<String> hosts;
+
+ hosts = sink.parseHostsStringIntoCollection("");
+ Assert.assertTrue(hosts.isEmpty());
+
+ hosts = sink.parseHostsStringIntoCollection("test1.123.abc.def.local");
+ Assert.assertTrue(hosts.size() == 1);
+ Assert.assertTrue(hosts.contains("test1.123.abc.def.local"));
+
+ hosts = sink.parseHostsStringIntoCollection("test1.123.abc.def.local ");
+ Assert.assertTrue(hosts.size() == 1);
+ Assert.assertTrue(hosts.contains("test1.123.abc.def.local"));
+
+ hosts = sink.parseHostsStringIntoCollection("test1.123.abc.def.local,test1.456.abc.def.local");
+ Assert.assertTrue(hosts.size() == 2);
+
+ hosts = sink.parseHostsStringIntoCollection("test1.123.abc.def.local, test1.456.abc.def.local");
+ Assert.assertTrue(hosts.size() == 2);
+ Assert.assertTrue(hosts.contains("test1.123.abc.def.local"));
+ Assert.assertTrue(hosts.contains("test1.456.abc.def.local"));
+ }
+
+ @Test
+ @PrepareForTest({URL.class, OutputStream.class, AbstractTimelineMetricsSink.class, HttpURLConnection.class, TimelineMetric.class})
+ public void testEmitMetrics() throws Exception {
+ HttpURLConnection connection = PowerMock.createNiceMock(HttpURLConnection.class);
+ URL url = PowerMock.createNiceMock(URL.class);
+ expectNew(URL.class, anyString()).andReturn(url).anyTimes();
+ expect(url.openConnection()).andReturn(connection).anyTimes();
+ expect(connection.getResponseCode()).andReturn(200).anyTimes();
+ OutputStream os = PowerMock.createNiceMock(OutputStream.class);
+ expect(connection.getOutputStream()).andReturn(os).anyTimes();
+
+
+ TestTimelineMetricsSink sink = new TestTimelineMetricsSink();
+ TimelineMetrics timelineMetrics = new TimelineMetrics();
+ long startTime = System.currentTimeMillis() / 60000 * 60000;
+
+ long seconds = 1000;
+ TreeMap<Long, Double> metricValues = new TreeMap<>();
+ /*
+
+ 0 +30s +60s
+ | | |
+ (1)(2)(3) (4)(5) (6) m1
+
+ */
+ // (6) should be cached, the rest - posted
+
+ metricValues.put(startTime + 4*seconds, 1.0);
+ metricValues.put(startTime + 14*seconds, 2.0);
+ metricValues.put(startTime + 24*seconds, 3.0);
+ metricValues.put(startTime + 34*seconds, 4.0);
+ metricValues.put(startTime + 44*seconds, 5.0);
+ metricValues.put(startTime + 64*seconds, 6.0);
+
+ TimelineMetric timelineMetric = new TimelineMetric("metric1", "host1", "app1", "instance1");
+ timelineMetric.setStartTime(metricValues.firstKey());
+ timelineMetric.addMetricValues(metricValues);
+
+ timelineMetrics.addOrMergeTimelineMetric(timelineMetric);
+
+ replayAll();
+ sink.emitMetrics(timelineMetrics);
+ Assert.assertEquals(1, sink.getMetricsPostCache().size());
+ metricValues = new TreeMap<>();
+ metricValues.put(startTime + 64*seconds, 6.0);
+ Assert.assertEquals(metricValues, sink.getMetricsPostCache().getIfPresent("metric1").getMetricValues());
+
+ timelineMetrics = new TimelineMetrics();
+ metricValues = new TreeMap<>();
+ /*
+
+ +60 +90s +120s +150s +180s
+ | | | | |
+ (7) (8) (9) (10) (11) m1
+
+ */
+ // (6) from previous post should be merged with current data
+ // (6),(7),(8),(9),(10) - should be posted, (11) - cached
+ metricValues.put(startTime + 74*seconds, 7.0);
+ metricValues.put(startTime + 94*seconds, 8.0);
+ metricValues.put(startTime + 124*seconds, 9.0);
+ metricValues.put(startTime + 154*seconds, 10.0);
+ metricValues.put(startTime + 184*seconds, 11.0);
+
+ timelineMetric = new TimelineMetric("metric1", "host1", "app1", "instance1");
+ timelineMetric.setStartTime(metricValues.firstKey());
+ timelineMetric.addMetricValues(metricValues);
+
+ timelineMetrics.addOrMergeTimelineMetric(timelineMetric);
+ sink.emitMetrics(timelineMetrics);
+
+ Assert.assertEquals(1, sink.getMetricsPostCache().size());
+ metricValues = new TreeMap<>();
+ metricValues.put(startTime + 184*seconds, 11.0);
+ Assert.assertEquals(metricValues, sink.getMetricsPostCache().getIfPresent("metric1").getMetricValues());timelineMetrics = new TimelineMetrics();
+
+ metricValues = new TreeMap<>();
+ /*
+
+ +180s +210s +240s
+ | | |
+ (12) (13)
+
+ */
+ // (11) from previous post should be merged with current data
+ // (11),(12),(13) - should be posted, cache should be empty
+ metricValues.put(startTime + 194*seconds, 12.0);
+ metricValues.put(startTime + 239*seconds, 13.0);
+
+ timelineMetric = new TimelineMetric("metric1", "host1", "app1", "instance1");
+ timelineMetric.setStartTime(metricValues.firstKey());
+ timelineMetric.addMetricValues(metricValues);
+
+ timelineMetrics.addOrMergeTimelineMetric(timelineMetric);
+ sink.emitMetrics(timelineMetrics);
+
+ Assert.assertEquals(0, sink.getMetricsPostCache().size());
+
+ metricValues = new TreeMap<>();
+ /*
+
+ +240s +270s +300s +330s
+ | | | |
+ (14) (15) (16)
+
+ */
+ // since postAllCachedMetrics in emitMetrics call is true (14),(15),(16) - should be posted, cache should be empty
+ metricValues.put(startTime + 245*seconds, 14.0);
+ metricValues.put(startTime + 294*seconds, 15.0);
+ metricValues.put(startTime + 315*seconds, 16.0);
+
+ timelineMetric = new TimelineMetric("metric1", "host1", "app1", "instance1");
+ timelineMetric.setStartTime(metricValues.firstKey());
+ timelineMetric.addMetricValues(metricValues);
+
+ timelineMetrics.addOrMergeTimelineMetric(timelineMetric);
+ sink.emitMetrics(timelineMetrics, true);
+
+ Assert.assertEquals(0, sink.getMetricsPostCache().size());
+ }
+
+ private class TestTimelineMetricsSink extends AbstractTimelineMetricsSink {
+ @Override
+ protected String getCollectorUri(String host) {
+ return "";
+ }
+
+ @Override
+ protected String getCollectorProtocol() {
+ return "http";
+ }
+
+ @Override
+ protected String getCollectorPort() {
+ return "2181";
+ }
+
+ @Override
+ protected int getTimeoutSeconds() {
+ return 10;
+ }
+
+ @Override
+ protected String getZookeeperQuorum() {
+ return "localhost:2181";
+ }
+
+ @Override
+ protected Collection<String> getConfiguredCollectorHosts() {
+ return Arrays.asList("localhost");
+ }
+
+ @Override
+ protected String getHostname() {
+ return "h1";
+ }
+
+ @Override
+ protected boolean isHostInMemoryAggregationEnabled() {
+ return true;
+ }
+
+ @Override
+ protected int getHostInMemoryAggregationPort() {
+ return 61888;
+ }
+
+ @Override
+ protected String getHostInMemoryAggregationProtocol() {
+ return "http";
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/e196358c/ambari-metrics/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/availability/AbstractTimelineMetricSinkTest.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/availability/AbstractTimelineMetricSinkTest.java b/ambari-metrics/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/availability/AbstractTimelineMetricSinkTest.java
deleted file mode 100644
index 396d08d..0000000
--- a/ambari-metrics/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/availability/AbstractTimelineMetricSinkTest.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.metrics2.sink.timeline.availability;
-
-import junit.framework.Assert;
-import org.apache.hadoop.metrics2.sink.timeline.AbstractTimelineMetricsSink;
-import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
-import org.junit.Test;
-
-import java.util.Arrays;
-import java.util.Collection;
-
-public class AbstractTimelineMetricSinkTest {
-
- @Test
- public void testParseHostsStringIntoCollection() {
- AbstractTimelineMetricsSink sink = new TestTimelineMetricsSink();
- Collection<String> hosts;
-
- hosts = sink.parseHostsStringIntoCollection("");
- Assert.assertTrue(hosts.isEmpty());
-
- hosts = sink.parseHostsStringIntoCollection("test1.123.abc.def.local");
- Assert.assertTrue(hosts.size() == 1);
- Assert.assertTrue(hosts.contains("test1.123.abc.def.local"));
-
- hosts = sink.parseHostsStringIntoCollection("test1.123.abc.def.local ");
- Assert.assertTrue(hosts.size() == 1);
- Assert.assertTrue(hosts.contains("test1.123.abc.def.local"));
-
- hosts = sink.parseHostsStringIntoCollection("test1.123.abc.def.local,test1.456.abc.def.local");
- Assert.assertTrue(hosts.size() == 2);
-
- hosts = sink.parseHostsStringIntoCollection("test1.123.abc.def.local, test1.456.abc.def.local");
- Assert.assertTrue(hosts.size() == 2);
- Assert.assertTrue(hosts.contains("test1.123.abc.def.local"));
- Assert.assertTrue(hosts.contains("test1.456.abc.def.local"));
-
- }
-
- private class TestTimelineMetricsSink extends AbstractTimelineMetricsSink {
- @Override
- protected String getCollectorUri(String host) {
- return "";
- }
-
- @Override
- protected String getCollectorProtocol() {
- return "http";
- }
-
- @Override
- protected String getCollectorPort() {
- return "2181";
- }
-
- @Override
- protected int getTimeoutSeconds() {
- return 10;
- }
-
- @Override
- protected String getZookeeperQuorum() {
- return "localhost:2181";
- }
-
- @Override
- protected Collection<String> getConfiguredCollectorHosts() {
- return Arrays.asList("localhost");
- }
-
- @Override
- protected String getHostname() {
- return "h1";
- }
-
- @Override
- protected boolean isHostInMemoryAggregationEnabled() {
- return true;
- }
-
- @Override
- protected int getHostInMemoryAggregationPort() {
- return 61888;
- }
-
- @Override
- protected String getHostInMemoryAggregationProtocol() {
- return "http";
- }
-
- @Override
- public boolean emitMetrics(TimelineMetrics metrics) {
- super.init();
- return super.emitMetrics(metrics);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/ambari/blob/e196358c/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSink.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSink.java b/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSink.java
index f37c2be..f70d8ec 100644
--- a/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSink.java
+++ b/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSink.java
@@ -508,7 +508,7 @@ public class HadoopTimelineMetricsSink extends AbstractTimelineMetricsSink imple
LOG.debug("Closing HadoopTimelineMetricSink. Flushing metrics to collector...");
TimelineMetrics metrics = metricsCache.getAllMetrics();
if (metrics != null) {
- emitMetrics(metrics);
+ emitMetrics(metrics, true);
}
}
});
http://git-wip-us.apache.org/repos/asf/ambari/blob/e196358c/ambari-metrics/ambari-metrics-hadoop-sink/src/test/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSinkTest.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-sink/src/test/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSinkTest.java b/ambari-metrics/ambari-metrics-hadoop-sink/src/test/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSinkTest.java
index 8fde394..b194924 100644
--- a/ambari-metrics/ambari-metrics-hadoop-sink/src/test/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSinkTest.java
+++ b/ambari-metrics/ambari-metrics-hadoop-sink/src/test/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSinkTest.java
@@ -179,7 +179,7 @@ public class HadoopTimelineMetricsSinkTest {
createMockBuilder(HadoopTimelineMetricsSink.class)
.withConstructor().addMockedMethod("appendPrefix")
.addMockedMethod("findLiveCollectorHostsFromKnownCollector")
- .addMockedMethod("emitMetrics").createNiceMock();
+ .addMockedMethod("emitMetrics", TimelineMetrics.class).createNiceMock();
SubsetConfiguration conf = createNiceMock(SubsetConfiguration.class);
expect(conf.getString("slave.host.name")).andReturn("localhost").anyTimes();
@@ -310,7 +310,7 @@ public class HadoopTimelineMetricsSinkTest {
createMockBuilder(HadoopTimelineMetricsSink.class)
.withConstructor().addMockedMethod("appendPrefix")
.addMockedMethod("findLiveCollectorHostsFromKnownCollector")
- .addMockedMethod("emitMetrics").createNiceMock();
+ .addMockedMethod("emitMetrics", TimelineMetrics.class).createNiceMock();
SubsetConfiguration conf = createNiceMock(SubsetConfiguration.class);
expect(conf.getString("slave.host.name")).andReturn("localhost").anyTimes();
http://git-wip-us.apache.org/repos/asf/ambari/blob/e196358c/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/application_metric_map.py
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/application_metric_map.py b/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/application_metric_map.py
index 34a6787..bd957a0 100644
--- a/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/application_metric_map.py
+++ b/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/application_metric_map.py
@@ -41,6 +41,7 @@ class ApplicationMetricMap:
self.ip_address = ip_address
self.lock = RLock()
self.app_metric_map = {}
+ self.cached_metric_map = {}
pass
def put_metric(self, application_id, metric_id_to_value_map, timestamp):
@@ -98,7 +99,7 @@ class ApplicationMetricMap:
"appid" : "HOST",
"instanceid" : result_instanceid,
"starttime" : self.get_start_time(appId, metricId),
- "metrics" : metricData
+ "metrics" : self.align_values_by_minute_mark(appId, metricId, metricData) if clear_once_flattened else metricData
}
timeline_metrics[ "metrics" ].append( timeline_metric )
pass
@@ -114,6 +115,10 @@ class ApplicationMetricMap:
def get_start_time(self, app_id, metric_id):
with self.lock:
+ if self.cached_metric_map.has_key(app_id):
+ if self.cached_metric_map.get(app_id).has_key(metric_id):
+ metrics = self.cached_metric_map.get(app_id).get(metric_id)
+ return min(metrics.iterkeys())
if self.app_metric_map.has_key(app_id):
if self.app_metric_map.get(app_id).has_key(metric_id):
metrics = self.app_metric_map.get(app_id).get(metric_id)
@@ -137,3 +142,48 @@ class ApplicationMetricMap:
with self.lock:
self.app_metric_map.clear()
pass
+
+ # Align metrics by the minute mark so that only complete minutes are sent.
+ # Data points from the incomplete last minute will be cached and posted once that minute completes.
+ # Cached metrics are merged with currently posting metrics
+ # e.g:
+ # first iteration if metrics from 00m15s to 01m15s are processed,
+ # then metrics from 00m15s to 00m59s will be posted
+ # and from 01m00s to 01m15s will be cached
+ # second iteration metrics from 01m25s to 02m55s are processed,
+ # cached metrics from previous call will be merged with current,
+ # metrics from 01m00s to 02m55s will be posted, cache will be empty
+ def align_values_by_minute_mark(self, appId, metricId, metricData):
+ with self.lock:
+ # append with cached values
+ if self.cached_metric_map.get(appId) and self.cached_metric_map.get(appId).get(metricId):
+ metricData.update(self.cached_metric_map[appId][metricId])
+ self.cached_metric_map[appId].pop(metricId)
+
+ # check if needs to be cached
+ # in case there can't be any more datapoints in last minute just post the metrics,
+ # otherwise need to cut off and cache the last uncompleted minute
+ max_time = max(metricData.iterkeys())
+ if max_time % 60000 <= 60000 - 10000:
+ max_minute = max_time / 60000
+ metric_data_copy = metricData.copy()
+ for time,value in metric_data_copy.iteritems():
+ if time / 60000 == max_minute:
+ cached_metric_map = self.cached_metric_map.get(appId)
+ if not cached_metric_map:
+ cached_metric_map = { metricId : { time : value } }
+ self.cached_metric_map[ appId ] = cached_metric_map
+ else:
+ cached_metric_id_map = cached_metric_map.get(metricId)
+ if not cached_metric_id_map:
+ cached_metric_id_map = { time : value }
+ cached_metric_map[ metricId ] = cached_metric_id_map
+ else:
+ cached_metric_map[ metricId ].update( { time : value } )
+ pass
+ pass
+ metricData.pop(time)
+ pass
+ pass
+
+ return metricData
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ambari/blob/e196358c/ambari-metrics/ambari-metrics-host-monitoring/src/test/python/core/TestApplicationMetricMap.py
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-host-monitoring/src/test/python/core/TestApplicationMetricMap.py b/ambari-metrics/ambari-metrics-host-monitoring/src/test/python/core/TestApplicationMetricMap.py
index a956a78..d9ea55d 100644
--- a/ambari-metrics/ambari-metrics-host-monitoring/src/test/python/core/TestApplicationMetricMap.py
+++ b/ambari-metrics/ambari-metrics-host-monitoring/src/test/python/core/TestApplicationMetricMap.py
@@ -50,7 +50,7 @@ class TestApplicationMetricMap(TestCase):
self.assertEqual(p['metrics'][0]['metrics'][str(timestamp)], 'bv')
self.assertEqual(application_metric_map.get_start_time(application_id, "b"), timestamp)
-
+
metrics = {}
metrics.update({"b" : 'bv'})
metrics.update({"a" : 'av'})
@@ -71,4 +71,38 @@ class TestApplicationMetricMap(TestCase):
json_data = json.loads(application_metric_map.flatten('A1', True))
self.assertEqual(len(json_data['metrics']), 1)
self.assertTrue(json_data['metrics'][0]['metricname'] == 'a')
- self.assertFalse(application_metric_map.app_metric_map)
\ No newline at end of file
+ self.assertFalse(application_metric_map.app_metric_map)
+
+ def test_flatten_and_align_values_by_minute_mark(self):
+ application_metric_map = ApplicationMetricMap("host", "10.10.10.10")
+ second = 1000
+ timestamp = int(round(1415390640.3806491 * second))
+ application_id = application_metric_map.format_app_id("A","1")
+ metrics = {}
+ metrics.update({"b" : 'bv'})
+
+ # 0s 60s 120s
+ # (0) (1) (2) (3)
+ # (3) should be cut off and cached
+ application_metric_map.put_metric(application_id, metrics, timestamp)
+ application_metric_map.put_metric(application_id, metrics, timestamp + second*24)
+ application_metric_map.put_metric(application_id, metrics, timestamp + second*84)
+ application_metric_map.put_metric(application_id, metrics, timestamp + second*124)
+
+ json_data = json.loads(application_metric_map.flatten(application_id, True))
+ self.assertEqual(len(json_data['metrics'][0]['metrics']), 3)
+ self.assertEqual(len(application_metric_map.cached_metric_map.get(application_id).get("b")), 1)
+ self.assertEqual(application_metric_map.cached_metric_map.get(application_id).get("b"), {timestamp + second*124 : 'bv'})
+
+ # 120s 180s
+ # (3) (4)
+ # cached (3) should be added to the post;
+ # (4) should be posted as well because there can't be more data points in the minute
+ application_metric_map.put_metric(application_id, metrics, timestamp + second * 176)
+
+ json_data = json.loads(application_metric_map.flatten(application_id, True))
+ self.assertEqual(len(json_data['metrics'][0]['metrics']), 2)
+
+ # starttime should be set to (3)
+ self.assertEqual(json_data['metrics'][0]['starttime'], timestamp + second*124)
+ self.assertEqual(len(application_metric_map.cached_metric_map.get(application_id)), 0)
http://git-wip-us.apache.org/repos/asf/ambari/blob/e196358c/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java
index d0e385b..026eaf5 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java
@@ -121,9 +121,6 @@ public class TimelineMetricConfiguration {
public static final String CLUSTER_AGGREGATOR_TIMESLICE_INTERVAL =
"timeline.metrics.cluster.aggregator.second.timeslice.interval";
- public static final String CLUSTER_CACHE_AGGREGATOR_TIMESLICE_INTERVAL =
- "timeline.metrics.cluster.cache.aggregator.second.timeslice.interval";
-
public static final String AGGREGATOR_CHECKPOINT_DELAY =
"timeline.metrics.service.checkpointDelay";
http://git-wip-us.apache.org/repos/asf/ambari/blob/e196358c/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricsIgniteCache.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricsIgniteCache.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricsIgniteCache.java
index aeaa4ba..6441c9c 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricsIgniteCache.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricsIgniteCache.java
@@ -50,7 +50,6 @@ import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
-import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -60,12 +59,11 @@ import java.util.concurrent.locks.Lock;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_SECOND_SLEEP_INTERVAL;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_CACHE_AGGREGATOR_TIMESLICE_INTERVAL;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_TIMESLICE_INTERVAL;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_APP_ID;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_CLUSTER_AGGREGATOR_INTERPOLATION_ENABLED;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_COLLECTOR_IGNITE_BACKUPS;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_COLLECTOR_IGNITE_NODES;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_SINK_COLLECTION_PERIOD;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRIC_AGGREGATION_SQL_FILTERS;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_SERVICE_HTTP_POLICY;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.AggregatorUtils.getRoundedCheckPointTimeMillis;
@@ -77,7 +75,6 @@ public class TimelineMetricsIgniteCache implements TimelineMetricDistributedCach
LogFactory.getLog(TimelineMetricsIgniteCache.class);
private IgniteCache<TimelineClusterMetric, MetricClusterAggregate> igniteCache;
private long cacheSliceIntervalMillis;
- private int collectionPeriodMillis;
private boolean interpolationEnabled;
private List<String> skipAggrPatternStrings = new ArrayList<>();
private List<String> appIdsToAggregate;
@@ -110,8 +107,7 @@ public class TimelineMetricsIgniteCache implements TimelineMetricDistributedCach
//aggregation parameters
appIdsToAggregate = timelineMetricConfiguration.getAppIdsForHostAggregation();
interpolationEnabled = Boolean.parseBoolean(metricConf.get(TIMELINE_METRICS_CLUSTER_AGGREGATOR_INTERPOLATION_ENABLED, "true"));
- collectionPeriodMillis = (int) SECONDS.toMillis(metricConf.getInt(TIMELINE_METRICS_SINK_COLLECTION_PERIOD, 10));
- cacheSliceIntervalMillis = SECONDS.toMillis(metricConf.getInt(CLUSTER_CACHE_AGGREGATOR_TIMESLICE_INTERVAL, 30));
+ cacheSliceIntervalMillis = SECONDS.toMillis(metricConf.getInt(CLUSTER_AGGREGATOR_TIMESLICE_INTERVAL, 30));
Long aggregationInterval = metricConf.getLong(CLUSTER_AGGREGATOR_SECOND_SLEEP_INTERVAL, 120L);
String filteredMetricPatterns = metricConf.get(TIMELINE_METRIC_AGGREGATION_SQL_FILTERS);
@@ -215,12 +211,6 @@ public class TimelineMetricsIgniteCache implements TimelineMetricDistributedCach
if (slicedClusterMetrics != null) {
for (Map.Entry<TimelineClusterMetric, Double> metricDoubleEntry : slicedClusterMetrics.entrySet()) {
- if (metricDoubleEntry.getKey().getTimestamp() == timeSlices.get(timeSlices.size()-1)[1] && metricDoubleEntry.getKey().getTimestamp() - metric.getMetricValues().lastKey() > collectionPeriodMillis) {
- if(LOG.isDebugEnabled()) {
- LOG.debug("Last skipped timestamp @ " + new Date(metric.getMetricValues().lastKey()) + " slice timestamp @ " + new Date(metricDoubleEntry.getKey().getTimestamp()));
- }
- continue;
- }
MetricClusterAggregate newMetricClusterAggregate = new MetricClusterAggregate(
metricDoubleEntry.getValue(), 1, null, metricDoubleEntry.getValue(), metricDoubleEntry.getValue());
//put app metric into cache
http://git-wip-us.apache.org/repos/asf/ambari/blob/e196358c/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AggregatorUtils.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AggregatorUtils.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AggregatorUtils.java
index b12cb86..b8338fb 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AggregatorUtils.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AggregatorUtils.java
@@ -223,7 +223,7 @@ public class AggregatorUtils {
*/
public static Long getSliceTimeForMetric(List<Long[]> timeSlices, Long timestamp) {
for (Long[] timeSlice : timeSlices) {
- if (timestamp > timeSlice[0] && timestamp <= timeSlice[1]) {
+ if (timestamp >= timeSlice[0] && timestamp < timeSlice[1]) {
return timeSlice[1];
}
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/e196358c/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAggregatorFactory.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAggregatorFactory.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAggregatorFactory.java
index c27d712..9e493ea 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAggregatorFactory.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAggregatorFactory.java
@@ -41,7 +41,6 @@ import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.ti
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_SECOND_DISABLED;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_SECOND_SLEEP_INTERVAL;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_TIMESLICE_INTERVAL;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_CACHE_AGGREGATOR_TIMESLICE_INTERVAL;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.DEFAULT_CHECKPOINT_LOCATION;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_AGGREGATOR_DAILY_CHECKPOINT_CUTOFF_MULTIPLIER;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_AGGREGATOR_DAILY_DISABLED;
@@ -273,9 +272,6 @@ public class TimelineMetricAggregatorFactory {
long timeSliceIntervalMillis = SECONDS.toMillis(metricsConf.getInt
(CLUSTER_AGGREGATOR_TIMESLICE_INTERVAL, 30));
- long cacheTimeSliceIntervalMillis = SECONDS.toMillis(metricsConf.getInt
- (CLUSTER_CACHE_AGGREGATOR_TIMESLICE_INTERVAL, 30));
-
int checkpointCutOffMultiplier =
metricsConf.getInt(CLUSTER_AGGREGATOR_SECOND_CHECKPOINT_CUTOFF_MULTIPLIER, 2);
@@ -297,8 +293,7 @@ public class TimelineMetricAggregatorFactory {
120000l,
timeSliceIntervalMillis,
haController,
- distributedCache,
- cacheTimeSliceIntervalMillis
+ distributedCache
);
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/e196358c/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecondWithCacheSource.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecondWithCacheSource.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecondWithCacheSource.java
index 0c030b6..888044a 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecondWithCacheSource.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecondWithCacheSource.java
@@ -31,19 +31,16 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.AggregatorUtils.getSliceTimeForMetric;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.AggregatorUtils.getTimeSlices;
public class TimelineMetricClusterAggregatorSecondWithCacheSource extends TimelineMetricClusterAggregatorSecond {
private TimelineMetricDistributedCache distributedCache;
- private Long cacheTimeSliceIntervalMillis;
public TimelineMetricClusterAggregatorSecondWithCacheSource(AggregationTaskRunner.AGGREGATOR_NAME metricAggregateSecond, TimelineMetricMetadataManager metricMetadataManager, PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf, String checkpointLocation, long sleepIntervalMillis, int checkpointCutOffMultiplier, String aggregatorDisabledParam, String inputTableName, String outputTableName,
Long nativeTimeRangeDelay,
Long timeSliceInterval,
- MetricCollectorHAController haController, TimelineMetricDistributedCache distributedCache, Long cacheTimeSliceIntervalMillis) {
+ MetricCollectorHAController haController, TimelineMetricDistributedCache distributedCache) {
super(metricAggregateSecond, metricMetadataManager, hBaseAccessor, metricsConf, checkpointLocation, sleepIntervalMillis, checkpointCutOffMultiplier, aggregatorDisabledParam, inputTableName, outputTableName, nativeTimeRangeDelay, timeSliceInterval, haController);
this.distributedCache = distributedCache;
- this.cacheTimeSliceIntervalMillis = cacheTimeSliceIntervalMillis;
}
@Override
@@ -81,36 +78,11 @@ public class TimelineMetricClusterAggregatorSecondWithCacheSource extends Timeli
//Slices in cache could be different from aggregate slices, so need to recalculate. Counts hosted apps
Map<TimelineClusterMetric, MetricClusterAggregate> aggregateMetricsFromMetricClusterAggregates(Map<TimelineClusterMetric, MetricClusterAggregate> metricsFromCache, List<Long[]> timeSlices) {
- Map<TimelineClusterMetric, MetricClusterAggregate> result = new HashMap<>();
-
- //normalize if slices in cache are different from the aggregation slices
- //TODO add basic interpolation, current implementation assumes that cacheTimeSliceIntervalMillis <= timeSliceIntervalMillis
- if (cacheTimeSliceIntervalMillis.equals(timeSliceIntervalMillis)) {
- result = metricsFromCache;
- } else {
- for (Map.Entry<TimelineClusterMetric, MetricClusterAggregate> clusterMetricAggregateEntry : metricsFromCache.entrySet()) {
- Long timestamp = getSliceTimeForMetric(timeSlices, clusterMetricAggregateEntry.getKey().getTimestamp());
- if (timestamp <= 0) {
- LOG.warn("Entry doesn't match any slice. Slices : " + timeSlices + " metric timestamp : " + clusterMetricAggregateEntry.getKey().getTimestamp());
- continue;
- }
- TimelineClusterMetric timelineClusterMetric = new TimelineClusterMetric(clusterMetricAggregateEntry.getKey().getMetricName(), clusterMetricAggregateEntry.getKey().getAppId(), clusterMetricAggregateEntry.getKey().getInstanceId(), timestamp);
- if (result.containsKey(timelineClusterMetric)) {
- MetricClusterAggregate metricClusterAggregate = result.get(timelineClusterMetric);
- metricClusterAggregate.updateMax(clusterMetricAggregateEntry.getValue().getMax());
- metricClusterAggregate.updateMin(clusterMetricAggregateEntry.getValue().getMin());
- metricClusterAggregate.setSum((metricClusterAggregate.getSum() + clusterMetricAggregateEntry.getValue().getSum()) / 2D);
- metricClusterAggregate.setNumberOfHosts(Math.max(metricClusterAggregate.getNumberOfHosts(), clusterMetricAggregateEntry.getValue().getNumberOfHosts()));
- } else {
- result.put(timelineClusterMetric, clusterMetricAggregateEntry.getValue());
- }
- }
- }
-
+ //TODO add basic interpolation
//TODO investigate if needed, maybe add config to disable/enable
//count hosted apps
Map<String, MutableInt> hostedAppCounter = new HashMap<>();
- for (Map.Entry<TimelineClusterMetric, MetricClusterAggregate> clusterMetricAggregateEntry : result.entrySet()) {
+ for (Map.Entry<TimelineClusterMetric, MetricClusterAggregate> clusterMetricAggregateEntry : metricsFromCache.entrySet()) {
int numHosts = clusterMetricAggregateEntry.getValue().getNumberOfHosts();
String appId = clusterMetricAggregateEntry.getKey().getAppId();
if (!hostedAppCounter.containsKey(appId)) {
@@ -124,9 +96,9 @@ public class TimelineMetricClusterAggregatorSecondWithCacheSource extends Timeli
}
// Add liveHosts per AppId metrics.
- processLiveAppCountMetrics(result, hostedAppCounter, timeSlices.get(timeSlices.size() - 1)[1]);
+ processLiveAppCountMetrics(metricsFromCache, hostedAppCounter, timeSlices.get(timeSlices.size() - 1)[1]);
- return result;
+ return metricsFromCache;
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ambari/blob/e196358c/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricsIgniteCacheTest.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricsIgniteCacheTest.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricsIgniteCacheTest.java
index d3c6061..2cb66ba 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricsIgniteCacheTest.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricsIgniteCacheTest.java
@@ -167,62 +167,6 @@ public class TimelineMetricsIgniteCacheTest {
metricValues.clear();
timelineMetrics.clear();
- /*
-
- 0 +30s +60s +90s
- | | | |
- (1) (2) h1
- (3) (4) h2
- (5) (6) h1
-
- */
- // Case 3 : merging host data points, ignore (2) for h1 as it will conflict with (5), two hosts.
- metricValues = new TreeMap<>();
- metricValues.put(startTime + 15*seconds, 1.0);
- metricValues.put(startTime + 45*seconds, 2.0);
- timelineMetric = new TimelineMetric("metric1", "host1", "app1", "instance1");
- timelineMetric.setMetricValues(metricValues);
- timelineMetrics.add(timelineMetric);
-
- metricValues = new TreeMap<>();
- metricValues.put(startTime + 45*seconds, 3.0);
- metricValues.put(startTime + 85*seconds, 4.0);
- timelineMetric = new TimelineMetric("metric1", "host2", "app1", "instance1");
- timelineMetric.setMetricValues(metricValues);
- timelineMetrics.add(timelineMetric);
-
- metricValues = new TreeMap<>();
- metricValues.put(startTime + 55*seconds, 5.0);
- metricValues.put(startTime + 85*seconds, 6.0);
- timelineMetric = new TimelineMetric("metric1", "host1", "app1", "instance1");
- timelineMetric.setMetricValues(metricValues);
- timelineMetrics.add(timelineMetric);
-
- timelineMetricsIgniteCache.putMetrics(timelineMetrics, metricMetadataManagerMock);
-
- aggregateMap = timelineMetricsIgniteCache.evictMetricAggregates(startTime, startTime + 120*seconds);
-
- Assert.assertEquals(aggregateMap.size(), 3);
- timelineClusterMetric = new TimelineClusterMetric(timelineMetric.getMetricName(),
- timelineMetric.getAppId(), timelineMetric.getInstanceId(), startTime + 30*seconds);
-
- Assert.assertTrue(aggregateMap.containsKey(timelineClusterMetric));
- Assert.assertEquals(1.0, aggregateMap.get(timelineClusterMetric).getSum());
- Assert.assertEquals(1, aggregateMap.get(timelineClusterMetric).getNumberOfHosts());
-
- timelineClusterMetric.setTimestamp(startTime + 2*30*seconds);
- Assert.assertTrue(aggregateMap.containsKey(timelineClusterMetric));
- Assert.assertEquals(8.0, aggregateMap.get(timelineClusterMetric).getSum());
- Assert.assertEquals(2, aggregateMap.get(timelineClusterMetric).getNumberOfHosts());
-
- timelineClusterMetric.setTimestamp(startTime + 3*30*seconds);
- Assert.assertTrue(aggregateMap.containsKey(timelineClusterMetric));
- Assert.assertEquals(10.0, aggregateMap.get(timelineClusterMetric).getSum());
- Assert.assertEquals(2, aggregateMap.get(timelineClusterMetric).getNumberOfHosts());
-
- metricValues.clear();
- timelineMetrics.clear();
-
Assert.assertEquals(0d, timelineMetricsIgniteCache.getPointInTimeCacheMetrics().get("Cluster_KeySize"));
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/e196358c/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecondWithCacheSourceTest.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecondWithCacheSourceTest.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecondWithCacheSourceTest.java
index 7cddb00..e8a9dc2 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecondWithCacheSourceTest.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecondWithCacheSourceTest.java
@@ -79,7 +79,7 @@ public class TimelineMetricClusterAggregatorSecondWithCacheSourceTest {
TimelineMetricClusterAggregatorSecondWithCacheSource secondAggregator = new TimelineMetricClusterAggregatorSecondWithCacheSource(
METRIC_AGGREGATE_SECOND, metricMetadataManagerMock, null, configuration, null,
aggregatorInterval, 2, "false", "", "", aggregatorInterval,
- sliceInterval, null, timelineMetricsIgniteCache, 30L);
+ sliceInterval, null, timelineMetricsIgniteCache);
long now = System.currentTimeMillis();
long startTime = now - 120000;
@@ -112,67 +112,4 @@ public class TimelineMetricClusterAggregatorSecondWithCacheSourceTest {
Assert.assertEquals(2d, a1.getSum());
Assert.assertEquals(5d, a2.getSum());
}
-
- @Test
- public void testSlicesRecalculation() throws Exception {
- long aggregatorInterval = 120000;
- long sliceInterval = 30000;
-
- Configuration configuration = new Configuration();
-
- TimelineMetricMetadataManager metricMetadataManagerMock = createNiceMock(TimelineMetricMetadataManager.class);
- expect(metricMetadataManagerMock.getMetadataCacheValue((TimelineMetricMetadataKey) anyObject())).andReturn(null).anyTimes();
- replay(metricMetadataManagerMock);
-
- TimelineMetricClusterAggregatorSecondWithCacheSource secondAggregator = new TimelineMetricClusterAggregatorSecondWithCacheSource(
- METRIC_AGGREGATE_SECOND, metricMetadataManagerMock, null, configuration, null,
- aggregatorInterval, 2, "false", "", "", aggregatorInterval,
- sliceInterval, null, timelineMetricsIgniteCache, 30L);
-
- long seconds = 1000;
- long now = getRoundedCheckPointTimeMillis(System.currentTimeMillis(), 120*seconds);
- long startTime = now - 120*seconds;
-
- Map<TimelineClusterMetric, MetricClusterAggregate> metricsFromCache = new HashMap<>();
- metricsFromCache.put(new TimelineClusterMetric("m1", "a1", "i1",startTime + 5 * seconds),
- new MetricClusterAggregate(1.0, 2, 1.0, 1.0, 1.0));
- metricsFromCache.put(new TimelineClusterMetric("m1", "a1", "i1",startTime + 25 * seconds),
- new MetricClusterAggregate(2.0, 2, 1.0, 2.0, 2.0));
- metricsFromCache.put(new TimelineClusterMetric("m1", "a1", "i1",startTime + 45 * seconds),
- new MetricClusterAggregate(3.0, 2, 1.0, 1.0, 1.0));
- metricsFromCache.put(new TimelineClusterMetric("m1", "a1", "i1",startTime + 65 * seconds),
- new MetricClusterAggregate(4.0, 2, 1.0, 4.0, 4.0));
- metricsFromCache.put(new TimelineClusterMetric("m1", "a1", "i1",startTime + 85 * seconds),
- new MetricClusterAggregate(5.0, 2, 1.0, 5.0, 5.0));
-
- List<Long[]> timeslices = getTimeSlices(startTime, startTime + 120*seconds, 30*seconds);
-
- Map<TimelineClusterMetric, MetricClusterAggregate> aggregates = secondAggregator.aggregateMetricsFromMetricClusterAggregates(metricsFromCache, timeslices);
-
- Assert.assertNotNull(aggregates);
- Assert.assertEquals(4, aggregates.size());
-
- TimelineClusterMetric timelineClusterMetric = new TimelineClusterMetric("m1", "a1", "i1", startTime + 30*seconds);
- MetricClusterAggregate metricClusterAggregate = aggregates.get(timelineClusterMetric);
- Assert.assertNotNull(metricClusterAggregate);
- Assert.assertEquals(1.5, metricClusterAggregate.getSum());
- Assert.assertEquals(1d, metricClusterAggregate.getMin());
- Assert.assertEquals(2d, metricClusterAggregate.getMax());
- Assert.assertEquals(2, metricClusterAggregate.getNumberOfHosts());
-
- timelineClusterMetric.setTimestamp(startTime + 60*seconds);
- metricClusterAggregate = aggregates.get(timelineClusterMetric);
- Assert.assertNotNull(metricClusterAggregate);
- Assert.assertEquals(3d, metricClusterAggregate.getSum());
-
- timelineClusterMetric.setTimestamp(startTime + 90*seconds);
- metricClusterAggregate = aggregates.get(timelineClusterMetric);
- Assert.assertNotNull(metricClusterAggregate);
- Assert.assertEquals(4.5d, metricClusterAggregate.getSum());
-
- timelineClusterMetric = new TimelineClusterMetric("live_hosts", "a1", null, startTime + 120*seconds);
- metricClusterAggregate = aggregates.get(timelineClusterMetric);
- Assert.assertNotNull(metricClusterAggregate);
- Assert.assertEquals(2d, metricClusterAggregate.getSum());
- }
}