You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by sw...@apache.org on 2017/05/24 22:43:08 UTC
[1/3] ambari git commit: AMBARI-21079. Add ability to sink Raw
metrics to external system via Http. (swagle)
Repository: ambari
Updated Branches:
refs/heads/branch-3.0-ams cd769e2e7 -> c32eebf89
http://git-wip-us.apache.org/repos/asf/ambari/blob/c32eebf8/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/cache/TimelineMetricsCacheSizeOfEngine.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/cache/TimelineMetricsCacheSizeOfEngine.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/cache/TimelineMetricsCacheSizeOfEngine.java
index 19e0e60..fccf190 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/cache/TimelineMetricsCacheSizeOfEngine.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/cache/TimelineMetricsCacheSizeOfEngine.java
@@ -17,57 +17,27 @@
*/
package org.apache.ambari.server.controller.metrics.timeline.cache;
-import java.util.Map;
-import java.util.TreeMap;
-
-import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
-import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
+import org.apache.hadoop.metrics2.sink.timeline.cache.TimelineMetricsEhCacheSizeOfEngine;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import net.sf.ehcache.pool.Size;
import net.sf.ehcache.pool.SizeOfEngine;
-import net.sf.ehcache.pool.impl.DefaultSizeOfEngine;
-import net.sf.ehcache.pool.sizeof.ReflectionSizeOf;
-import net.sf.ehcache.pool.sizeof.SizeOf;
/**
* Cache sizing engine that reduces reflective calls over the Object graph to
* find total Heap usage.
*/
-public class TimelineMetricsCacheSizeOfEngine implements SizeOfEngine {
+public class TimelineMetricsCacheSizeOfEngine extends TimelineMetricsEhCacheSizeOfEngine {
private final static Logger LOG = LoggerFactory.getLogger(TimelineMetricsCacheSizeOfEngine.class);
- public static int DEFAULT_MAX_DEPTH = 1000;
- public static boolean DEFAULT_ABORT_WHEN_MAX_DEPTH_EXCEEDED = false;
-
- private SizeOfEngine underlying = null;
- SizeOf reflectionSizeOf = new ReflectionSizeOf();
-
- // Optimizations
- private volatile long timelineMetricPrimitivesApproximation = 0;
-
- private long sizeOfMapEntry;
- private long sizeOfMapEntryOverhead;
private TimelineMetricsCacheSizeOfEngine(SizeOfEngine underlying) {
- this.underlying = underlying;
+ super(underlying);
}
public TimelineMetricsCacheSizeOfEngine() {
- this(new DefaultSizeOfEngine(DEFAULT_MAX_DEPTH, DEFAULT_ABORT_WHEN_MAX_DEPTH_EXCEEDED));
-
- this.sizeOfMapEntry = reflectionSizeOf.sizeOf(new Long(1)) +
- reflectionSizeOf.sizeOf(new Double(2.0));
-
- //SizeOfMapEntryOverhead = SizeOfMapWithOneEntry - (SizeOfEmptyMap + SizeOfOneEntry)
- TreeMap<Long, Double> map = new TreeMap<>();
- long emptyMapSize = reflectionSizeOf.sizeOf(map);
- map.put(new Long(1), new Double(2.0));
- long sizeOfMapOneEntry = reflectionSizeOf.deepSizeOf(DEFAULT_MAX_DEPTH, DEFAULT_ABORT_WHEN_MAX_DEPTH_EXCEEDED, map).getCalculated();
- this.sizeOfMapEntryOverhead = sizeOfMapOneEntry - (emptyMapSize + this.sizeOfMapEntry);
-
- LOG.info("Creating custom sizeof engine for TimelineMetrics.");
+ // Invoke default constructor in base class
}
@Override
@@ -108,36 +78,10 @@ public class TimelineMetricsCacheSizeOfEngine implements SizeOfEngine {
private long getTimelineMetricCacheValueSize(TimelineMetricsCacheValue value) {
long size = 16; // startTime + endTime
- TimelineMetrics metrics = value.getTimelineMetrics();
+
size += 8; // Object reference
- if (metrics != null) {
- for (TimelineMetric metric : metrics.getMetrics()) {
-
- if (timelineMetricPrimitivesApproximation == 0) {
- timelineMetricPrimitivesApproximation += reflectionSizeOf.sizeOf(metric.getMetricName());
- timelineMetricPrimitivesApproximation += reflectionSizeOf.sizeOf(metric.getAppId());
- timelineMetricPrimitivesApproximation += reflectionSizeOf.sizeOf(metric.getHostName());
- timelineMetricPrimitivesApproximation += reflectionSizeOf.sizeOf(metric.getInstanceId());
- timelineMetricPrimitivesApproximation += reflectionSizeOf.sizeOf(metric.getTimestamp());
- timelineMetricPrimitivesApproximation += reflectionSizeOf.sizeOf(metric.getStartTime());
- timelineMetricPrimitivesApproximation += reflectionSizeOf.sizeOf(metric.getType());
- timelineMetricPrimitivesApproximation += 8; // Object overhead
-
- LOG.debug("timelineMetricPrimitivesApproximation bytes = " + timelineMetricPrimitivesApproximation);
- }
- size += timelineMetricPrimitivesApproximation;
-
- Map<Long, Double> metricValues = metric.getMetricValues();
- if (metricValues != null && !metricValues.isEmpty()) {
- // Numeric wrapper: 12 bytes + 8 bytes Data type + 4 bytes alignment = 48 (Long, Double)
- // Tree Map: 12 bytes for header + 20 bytes for 5 object fields : pointers + 1 byte for flag = 40
- LOG.debug("Size of metric value: " + (sizeOfMapEntry + sizeOfMapEntryOverhead) * metricValues.size());
- size += (sizeOfMapEntry + sizeOfMapEntryOverhead) * metricValues.size(); // Treemap size is O(1)
- }
- }
- LOG.debug("Total Size of metric values in cache: " + size);
- }
+ size += getTimelineMetricsSize(value.getTimelineMetrics()); // TreeMap
return size;
}
@@ -147,7 +91,6 @@ public class TimelineMetricsCacheSizeOfEngine implements SizeOfEngine {
LOG.debug("Copying tracing sizeof engine, maxdepth: {}, abort: {}",
maxDepth, abortWhenMaxDepthExceeded);
- return new TimelineMetricsCacheSizeOfEngine(
- underlying.copyWith(maxDepth, abortWhenMaxDepthExceeded));
+ return new TimelineMetricsCacheSizeOfEngine(underlying.copyWith(maxDepth, abortWhenMaxDepthExceeded));
}
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/c32eebf8/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index e0d0373..ce70a62 100644
--- a/pom.xml
+++ b/pom.xml
@@ -394,6 +394,7 @@
<exclude>ambari-metrics/target/rpm/ambari-metrics/SPECS/ambari-metrics.spec</exclude>
<exclude>ambari-metrics/ambari-metrics-timelineservice/src/test/resources/lib/org/apache/phoenix/phoenix-core-tests/4.2.0/phoenix-core-tests-4.2.0.pom</exclude>
<exclude>ambari-metrics/ambari-metrics-timelineservice/src/test/resources/lib/org/apache/phoenix/phoenix-core-tests/maven-metadata-local.xml</exclude>
+ <exclude>ambari-metrics/ambari-metrics-alertservice/*.iml</exclude>
<exclude>ambari-metrics/*/target/**</exclude>
<!-- ignore .settings and .project -->
<exclude>ambari-metrics/**/.*/**</exclude>
[3/3] ambari git commit: AMBARI-21079. Add ability to sink Raw
metrics to external system via Http. (swagle)
Posted by sw...@apache.org.
AMBARI-21079. Add ability to sink Raw metrics to external system via Http. (swagle)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/c32eebf8
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/c32eebf8
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/c32eebf8
Branch: refs/heads/branch-3.0-ams
Commit: c32eebf8935acab9d1a510bf05e8f8aeb8873a6f
Parents: cd769e2
Author: Siddharth Wagle <sw...@hortonworks.com>
Authored: Tue May 23 14:01:14 2017 -0700
Committer: Siddharth Wagle <sw...@hortonworks.com>
Committed: Tue May 23 14:01:14 2017 -0700
----------------------------------------------------------------------
ambari-metrics/ambari-metrics-common/pom.xml | 29 ++-
.../TimelineMetricsEhCacheSizeOfEngine.java | 115 +++++++++
.../ApplicationHistoryServer.java | 13 +-
.../timeline/HBaseTimelineMetricStore.java | 30 ++-
.../metrics/timeline/PhoenixHBaseAccessor.java | 230 ++++++++++--------
.../timeline/TimelineMetricConfiguration.java | 176 +++++++++-----
.../TimelineMetricClusterAggregator.java | 2 +-
.../TimelineMetricMetadataManager.java | 15 +-
.../timeline/sink/DefaultFSSinkProvider.java | 153 ++++++++++++
.../timeline/sink/ExternalMetricsSink.java | 48 ++++
.../timeline/sink/ExternalSinkProvider.java | 35 +++
.../metrics/timeline/sink/HttpSinkProvider.java | 231 +++++++++++++++++++
.../DefaultInternalMetricsSourceProvider.java | 42 ++++
.../timeline/source/InternalMetricsSource.java | 30 +++
.../timeline/source/InternalSourceProvider.java | 39 ++++
.../timeline/source/RawMetricsSource.java | 93 ++++++++
.../source/cache/InternalMetricCacheKey.java | 109 +++++++++
.../source/cache/InternalMetricCacheValue.java | 37 +++
.../source/cache/InternalMetricsCache.java | 231 +++++++++++++++++++
.../cache/InternalMetricsCacheProvider.java | 48 ++++
.../cache/InternalMetricsCacheSizeOfEngine.java | 66 ++++++
.../TestApplicationHistoryServer.java | 4 +-
.../timeline/AbstractMiniHBaseClusterTest.java | 49 ++--
.../timeline/HBaseTimelineMetricStoreTest.java | 8 +-
.../timeline/ITPhoenixHBaseAccessor.java | 110 ++++-----
.../timeline/PhoenixHBaseAccessorTest.java | 167 ++++++--------
.../TimelineMetricStoreWatcherTest.java | 4 +-
.../aggregators/ITClusterAggregator.java | 72 +++---
.../timeline/discovery/TestMetadataManager.java | 2 +-
.../timeline/discovery/TestMetadataSync.java | 6 +-
.../timeline/source/RawMetricsSourceTest.java | 142 ++++++++++++
.../cache/TimelineMetricsCacheSizeOfEngine.java | 71 +-----
pom.xml | 1 +
33 files changed, 1933 insertions(+), 475 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ambari/blob/c32eebf8/ambari-metrics/ambari-metrics-common/pom.xml
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-common/pom.xml b/ambari-metrics/ambari-metrics-common/pom.xml
index 62ae75f..dc2ab5e 100644
--- a/ambari-metrics/ambari-metrics-common/pom.xml
+++ b/ambari-metrics/ambari-metrics-common/pom.xml
@@ -70,43 +70,47 @@
<relocations>
<relocation>
<pattern>com.google</pattern>
- <shadedPattern>org.apache.hadoop.metrics2.sink.relocated.google</shadedPattern>
+ <shadedPattern>org.apache.ambari.metrics.sink.relocated.google</shadedPattern>
</relocation>
<relocation>
<pattern>org.apache.commons.io</pattern>
- <shadedPattern>org.apache.hadoop.metrics2.sink.relocated.commons.io</shadedPattern>StormTimelineMetricsReporter
+ <shadedPattern>org.apache.ambari.metrics.sink.relocated.commons.io</shadedPattern>StormTimelineMetricsReporter
</relocation>
<relocation>
<pattern>org.apache.commons.lang</pattern>
- <shadedPattern>org.apache.hadoop.metrics2.sink.relocated.commons.lang</shadedPattern>
+ <shadedPattern>org.apache.ambari.metrics.relocated.commons.lang</shadedPattern>
</relocation>
<relocation>
<pattern>org.apache.curator</pattern>
- <shadedPattern>org.apache.hadoop.metrics2.sink.relocated.curator</shadedPattern>
+ <shadedPattern>org.apache.ambari.metrics.sink.relocated.curator</shadedPattern>
</relocation>
<relocation>
<pattern>org.apache.jute</pattern>
- <shadedPattern>org.apache.hadoop.metrics2.sink.relocated.jute</shadedPattern>
+ <shadedPattern>org.apache.ambari.metrics.sink.relocated.jute</shadedPattern>
</relocation>
<relocation>
<pattern>org.apache.zookeeper</pattern>
- <shadedPattern>org.apache.hadoop.metrics2.sink.relocated.zookeeper</shadedPattern>
+ <shadedPattern>org.apache.ambari.metrics.sink.relocated.zookeeper</shadedPattern>
</relocation>
<relocation>
<pattern>org.slf4j</pattern>
- <shadedPattern>org.apache.hadoop.metrics2.sink.relocated.slf4j</shadedPattern>
+ <shadedPattern>org.apache.ambari.metrics.sink.relocated.slf4j</shadedPattern>
</relocation>
<relocation>
<pattern>org.apache.log4j</pattern>
- <shadedPattern>org.apache.hadoop.metrics2.sink.relocated.log4j</shadedPattern>
+ <shadedPattern>org.apache.ambari.metrics.sink.relocated.log4j</shadedPattern>
</relocation>
<relocation>
<pattern>jline</pattern>
- <shadedPattern>org.apache.hadoop.metrics2.sink.relocated.jline</shadedPattern>
+ <shadedPattern>org.apache.ambari.metrics.sink.relocated.jline</shadedPattern>
</relocation>
<relocation>
<pattern>org.jboss</pattern>
- <shadedPattern>org.apache.hadoop.metrics2.sink.relocated.jboss</shadedPattern>
+ <shadedPattern>org.apache.ambari.metrics.sink.relocated.jboss</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>net.sf.ehcache</pattern>
+ <shadedPattern>org.apache.ambari.metrics.sink.relocated.ehcache</shadedPattern>
</relocation>
</relocations>
</configuration>
@@ -118,6 +122,11 @@
<dependencies>
<dependency>
+ <groupId>net.sf.ehcache</groupId>
+ <artifactId>ehcache</artifactId>
+ <version>2.10.0</version>
+ </dependency>
+ <dependency>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
<version>1.1.1</version>
http://git-wip-us.apache.org/repos/asf/ambari/blob/c32eebf8/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/cache/TimelineMetricsEhCacheSizeOfEngine.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/cache/TimelineMetricsEhCacheSizeOfEngine.java b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/cache/TimelineMetricsEhCacheSizeOfEngine.java
new file mode 100644
index 0000000..ea694b7
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/cache/TimelineMetricsEhCacheSizeOfEngine.java
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.metrics2.sink.timeline.cache;
+
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import net.sf.ehcache.pool.SizeOfEngine;
+import net.sf.ehcache.pool.impl.DefaultSizeOfEngine;
+import net.sf.ehcache.pool.sizeof.ReflectionSizeOf;
+import net.sf.ehcache.pool.sizeof.SizeOf;
+
+/**
+ * Cache sizing engine that reduces reflective calls over the Object graph to
+ * find total Heap usage. Used for ehcache based on available memory.
+ */
+public abstract class TimelineMetricsEhCacheSizeOfEngine implements SizeOfEngine {
+ private final static Logger LOG = LoggerFactory.getLogger(TimelineMetricsEhCacheSizeOfEngine.class);
+
+ private static int DEFAULT_MAX_DEPTH = 1000;
+ private static boolean DEFAULT_ABORT_WHEN_MAX_DEPTH_EXCEEDED = false;
+
+ // Base Engine
+ protected SizeOfEngine underlying = null;
+
+ // Counter
+ protected SizeOf reflectionSizeOf = new ReflectionSizeOf();
+
+ // Optimizations
+ private volatile long timelineMetricPrimitivesApproximation = 0;
+
+ // Map entry sizing
+ private long sizeOfMapEntry;
+ private long sizeOfMapEntryOverhead;
+
+ protected TimelineMetricsEhCacheSizeOfEngine(SizeOfEngine underlying) {
+ this.underlying = underlying;
+ }
+
+ public TimelineMetricsEhCacheSizeOfEngine() {
+ this(new DefaultSizeOfEngine(DEFAULT_MAX_DEPTH, DEFAULT_ABORT_WHEN_MAX_DEPTH_EXCEEDED));
+
+ this.sizeOfMapEntry = reflectionSizeOf.sizeOf(new Long(1)) +
+ reflectionSizeOf.sizeOf(new Double(2.0));
+
+ //SizeOfMapEntryOverhead = SizeOfMapWithOneEntry - (SizeOfEmptyMap + SizeOfOneEntry)
+ TreeMap<Long, Double> map = new TreeMap<>();
+ long emptyMapSize = reflectionSizeOf.sizeOf(map);
+ map.put(new Long(1), new Double(2.0));
+ long sizeOfMapOneEntry = reflectionSizeOf.deepSizeOf(DEFAULT_MAX_DEPTH, DEFAULT_ABORT_WHEN_MAX_DEPTH_EXCEEDED, map).getCalculated();
+ this.sizeOfMapEntryOverhead = sizeOfMapOneEntry - (emptyMapSize + this.sizeOfMapEntry);
+
+ LOG.info("Creating custom sizeof engine for TimelineMetrics.");
+ }
+
+ /**
+ * Return size of the metrics TreeMap in an optimized way.
+ *
+ */
+ protected long getTimelineMetricsSize(TimelineMetrics metrics) {
+ long size = 8; // Object reference
+
+ if (metrics != null) {
+ for (TimelineMetric metric : metrics.getMetrics()) {
+
+ if (timelineMetricPrimitivesApproximation == 0) {
+ timelineMetricPrimitivesApproximation += reflectionSizeOf.sizeOf(metric.getMetricName());
+ timelineMetricPrimitivesApproximation += reflectionSizeOf.sizeOf(metric.getAppId());
+ timelineMetricPrimitivesApproximation += reflectionSizeOf.sizeOf(metric.getHostName());
+ timelineMetricPrimitivesApproximation += reflectionSizeOf.sizeOf(metric.getInstanceId());
+ timelineMetricPrimitivesApproximation += reflectionSizeOf.sizeOf(metric.getTimestamp());
+ timelineMetricPrimitivesApproximation += reflectionSizeOf.sizeOf(metric.getStartTime());
+ timelineMetricPrimitivesApproximation += reflectionSizeOf.sizeOf(metric.getType());
+ timelineMetricPrimitivesApproximation += 8; // Object overhead
+
+ LOG.debug("timelineMetricPrimitivesApproximation bytes = " + timelineMetricPrimitivesApproximation);
+ }
+ size += timelineMetricPrimitivesApproximation;
+ size += getValueMapSize(metric.getMetricValues());
+ }
+ LOG.debug("Total Size of metric values in cache: " + size);
+ }
+ return size;
+ }
+
+ protected long getValueMapSize(Map<Long, Double> metricValues) {
+ long size = 0;
+ if (metricValues != null && !metricValues.isEmpty()) {
+ // Numeric wrapper: 12 bytes + 8 bytes Data type + 4 bytes alignment = 48 (Long, Double)
+ // Tree Map: 12 bytes for header + 20 bytes for 5 object fields : pointers + 1 byte for flag = 40
+ LOG.debug("Size of metric value: " + (sizeOfMapEntry + sizeOfMapEntryOverhead) * metricValues.size());
+ size += (sizeOfMapEntry + sizeOfMapEntryOverhead) * metricValues.size(); // Treemap size is O(1)
+ }
+ return size;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/c32eebf8/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryServer.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryServer.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryServer.java
index 1ca9c33..331670d 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryServer.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryServer.java
@@ -34,7 +34,7 @@ import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.HBaseTimelineMetricStore;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.HBaseTimelineMetricsService;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricStore;
import org.apache.hadoop.yarn.server.applicationhistoryservice.timeline.MemoryTimelineStore;
@@ -71,7 +71,7 @@ public class ApplicationHistoryServer extends CompositeService {
@Override
protected void serviceInit(Configuration conf) throws Exception {
- metricConfiguration = new TimelineMetricConfiguration();
+ metricConfiguration = TimelineMetricConfiguration.getInstance();
metricConfiguration.initialize();
historyManager = createApplicationHistory();
ahsClientService = createApplicationHistoryClientService(historyManager);
@@ -164,11 +164,16 @@ public class ApplicationHistoryServer extends CompositeService {
protected TimelineMetricStore createTimelineMetricStore(Configuration conf) {
LOG.info("Creating metrics store.");
- return new HBaseTimelineMetricStore(metricConfiguration);
+ return new HBaseTimelineMetricsService(metricConfiguration);
}
protected void startWebApp() {
- String bindAddress = metricConfiguration.getWebappAddress();
+ String bindAddress = null;
+ try {
+ bindAddress = metricConfiguration.getWebappAddress();
+ } catch (Exception e) {
+ throw new ExceptionInInitializerError("Cannot find bind address");
+ }
LOG.info("Instantiating AHSWebApp at " + bindAddress);
try {
Configuration conf = metricConfiguration.getMetricsConf();
http://git-wip-us.apache.org/repos/asf/ambari/blob/c32eebf8/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java
index f984253..c8eb65f 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java
@@ -51,6 +51,8 @@ import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.function.TimelineMetricsSeriesAggregateFunctionFactory;
import java.io.IOException;
+import java.net.MalformedURLException;
+import java.net.URISyntaxException;
import java.net.UnknownHostException;
import java.sql.SQLException;
import java.util.ArrayList;
@@ -71,9 +73,9 @@ import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.ti
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.DEFAULT_TOPN_HOSTS_LIMIT;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.ACTUAL_AGGREGATOR_NAMES;
-public class HBaseTimelineMetricStore extends AbstractService implements TimelineMetricStore {
+public class HBaseTimelineMetricsService extends AbstractService implements TimelineMetricStore {
- static final Log LOG = LogFactory.getLog(HBaseTimelineMetricStore.class);
+ static final Log LOG = LogFactory.getLog(HBaseTimelineMetricsService.class);
private final TimelineMetricConfiguration configuration;
private PhoenixHBaseAccessor hBaseAccessor;
private static volatile boolean isInitialized = false;
@@ -87,25 +89,28 @@ public class HBaseTimelineMetricStore extends AbstractService implements Timelin
* Construct the service.
*
*/
- public HBaseTimelineMetricStore(TimelineMetricConfiguration configuration) {
- super(HBaseTimelineMetricStore.class.getName());
+ public HBaseTimelineMetricsService(TimelineMetricConfiguration configuration) {
+ super(HBaseTimelineMetricsService.class.getName());
this.configuration = configuration;
}
@Override
protected void serviceInit(Configuration conf) throws Exception {
super.serviceInit(conf);
- initializeSubsystem(configuration.getHbaseConf(), configuration.getMetricsConf());
+ initializeSubsystem();
}
- private synchronized void initializeSubsystem(Configuration hbaseConf,
- Configuration metricsConf) {
+ private synchronized void initializeSubsystem() {
if (!isInitialized) {
- hBaseAccessor = new PhoenixHBaseAccessor(hbaseConf, metricsConf);
+ hBaseAccessor = new PhoenixHBaseAccessor(null);
// Initialize schema
hBaseAccessor.initMetricSchema();
// Initialize metadata from store
- metricMetadataManager = new TimelineMetricMetadataManager(hBaseAccessor, metricsConf);
+ try {
+ metricMetadataManager = new TimelineMetricMetadataManager(hBaseAccessor);
+ } catch (MalformedURLException | URISyntaxException e) {
+ throw new ExceptionInInitializerError("Unable to initialize metadata manager");
+ }
metricMetadataManager.initializeMetadata();
// Initialize policies before TTL update
hBaseAccessor.initPoliciesAndTTL();
@@ -127,6 +132,13 @@ public class HBaseTimelineMetricStore extends AbstractService implements Timelin
//Initialize whitelisting & blacklisting if needed
TimelineMetricsFilter.initializeMetricFilter(configuration);
+ Configuration metricsConf = null;
+ try {
+ metricsConf = configuration.getMetricsConf();
+ } catch (Exception e) {
+ throw new ExceptionInInitializerError("Cannot initialize configuration.");
+ }
+
defaultTopNHostsLimit = Integer.parseInt(metricsConf.get(DEFAULT_TOPN_HOSTS_LIMIT, "20"));
if (Boolean.parseBoolean(metricsConf.get(USE_GROUPBY_AGGREGATOR_QUERIES, "true"))) {
LOG.info("Using group by aggregators for aggregating host and cluster metrics.");
http://git-wip-us.apache.org/repos/asf/ambari/blob/c32eebf8/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java
index 3b2a119..15b0bb8 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java
@@ -17,88 +17,18 @@
*/
package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Multimap;
-import org.apache.commons.lang.StringUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.DoNotRetryIOException;
-import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.client.Durability;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.hadoop.hbase.util.RetryCounter;
-import org.apache.hadoop.hbase.util.RetryCounterFactory;
-import org.apache.hadoop.metrics2.sink.timeline.ContainerMetric;
-import org.apache.hadoop.metrics2.sink.timeline.MetricClusterAggregate;
-import org.apache.hadoop.metrics2.sink.timeline.MetricHostAggregate;
-import org.apache.hadoop.metrics2.sink.timeline.Precision;
-import org.apache.hadoop.metrics2.sink.timeline.SingleValuedTimelineMetric;
-import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
-import org.apache.hadoop.metrics2.sink.timeline.TimelineMetricMetadata;
-import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
-import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.AggregatorUtils;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.Function;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineClusterMetric;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricReadHelper;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataKey;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataManager;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.DefaultPhoenixDataSource;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixConnectionProvider;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.SplitByMetricNamesCondition;
-import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
-import org.apache.phoenix.exception.PhoenixIOException;
-import org.codehaus.jackson.map.ObjectMapper;
-import org.codehaus.jackson.type.TypeReference;
-
-import java.io.IOException;
-import java.sql.Connection;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.sql.Statement;
-import java.sql.Timestamp;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.TreeMap;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.AGGREGATE_TABLE_SPLIT_POINTS;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_PRECISION_TABLE_HBASE_BLOCKING_STORE_FILES;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_AGGREGATE_TABLE_HBASE_BLOCKING_STORE_FILES;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_HBASE_AGGREGATE_TABLE_COMPACTION_POLICY_CLASS;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_HBASE_AGGREGATE_TABLE_COMPACTION_POLICY_KEY;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_HBASE_PRECISION_TABLE_COMPACTION_POLICY_CLASS;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_HBASE_PRECISION_TABLE_COMPACTION_POLICY_KEY;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_PRECISION_TABLE_DURABILITY;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_AGGREGATE_TABLES_DURABILITY;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HBASE_BLOCKING_STORE_FILES;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.AGGREGATORS_SKIP_BLOCK_CACHE;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_DAILY_TABLE_TTL;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_HOUR_TABLE_TTL;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_MINUTE_TABLE_TTL;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_SECOND_TABLE_TTL;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CONTAINER_METRICS_TTL;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.GLOBAL_MAX_RETRIES;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.GLOBAL_RESULT_LIMIT;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.GLOBAL_RETRY_INTERVAL;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HBASE_BLOCKING_STORE_FILES;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HBASE_COMPRESSION_SCHEME;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HBASE_ENCODING_SCHEME;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_DAILY_TABLE_TTL;
@@ -107,11 +37,19 @@ import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.ti
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.OUT_OFF_BAND_DATA_TIME_ALLOWANCE;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.PRECISION_TABLE_SPLIT_POINTS;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.PRECISION_TABLE_TTL;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CONTAINER_METRICS_TTL;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_CACHE_SIZE;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_AGGREGATE_TABLES_DURABILITY;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_AGGREGATE_TABLE_HBASE_BLOCKING_STORE_FILES;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_CACHE_COMMIT_INTERVAL;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_CACHE_ENABLED;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_CACHE_SIZE;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_HBASE_AGGREGATE_TABLE_COMPACTION_POLICY_CLASS;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_HBASE_AGGREGATE_TABLE_COMPACTION_POLICY_KEY;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_HBASE_PRECISION_TABLE_COMPACTION_POLICY_CLASS;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_HBASE_PRECISION_TABLE_COMPACTION_POLICY_KEY;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_PRECISION_TABLE_DURABILITY;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_PRECISION_TABLE_HBASE_BLOCKING_STORE_FILES;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRIC_AGGREGATOR_SINK_CLASS;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.ALTER_METRICS_METADATA_TABLE;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.CONTAINER_METRICS_TABLE_NAME;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.CREATE_CONTAINER_METRICS_TABLE_SQL;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.CREATE_HOSTED_APPS_METADATA_TABLE_SQL;
@@ -120,7 +58,6 @@ import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.ti
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.CREATE_METRICS_CLUSTER_AGGREGATE_GROUPED_TABLE_SQL;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.CREATE_METRICS_CLUSTER_AGGREGATE_TABLE_SQL;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.CREATE_METRICS_METADATA_TABLE_SQL;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.ALTER_METRICS_METADATA_TABLE;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.CREATE_METRICS_TABLE_SQL;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.DEFAULT_ENCODING;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.DEFAULT_TABLE_COMPRESSION;
@@ -139,11 +76,80 @@ import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.ti
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.UPSERT_AGGREGATE_RECORD_SQL;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.UPSERT_CLUSTER_AGGREGATE_SQL;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.UPSERT_CLUSTER_AGGREGATE_TIME_SQL;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.UPSERT_CONTAINER_METRICS_SQL;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.UPSERT_HOSTED_APPS_METADATA_SQL;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.UPSERT_INSTANCE_HOST_METADATA_SQL;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.UPSERT_METADATA_SQL;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.UPSERT_METRICS_SQL;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.UPSERT_CONTAINER_METRICS_SQL;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.source.InternalSourceProvider.SOURCE_NAME.RAW_METRICS;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.client.Durability;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.util.RetryCounter;
+import org.apache.hadoop.hbase.util.RetryCounterFactory;
+import org.apache.hadoop.metrics2.sink.timeline.ContainerMetric;
+import org.apache.hadoop.metrics2.sink.timeline.MetricClusterAggregate;
+import org.apache.hadoop.metrics2.sink.timeline.MetricHostAggregate;
+import org.apache.hadoop.metrics2.sink.timeline.Precision;
+import org.apache.hadoop.metrics2.sink.timeline.SingleValuedTimelineMetric;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetricMetadata;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.AggregatorUtils;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.Function;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineClusterMetric;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricReadHelper;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataKey;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataManager;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.DefaultPhoenixDataSource;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixConnectionProvider;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.SplitByMetricNamesCondition;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.sink.ExternalMetricsSink;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.sink.ExternalSinkProvider;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.source.InternalMetricsSource;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.source.InternalSourceProvider;
+import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
+import org.apache.phoenix.exception.PhoenixIOException;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.type.TypeReference;
+
+import com.google.common.collect.Maps;
+import com.google.common.collect.Multimap;
/**
@@ -198,16 +204,29 @@ public class PhoenixHBaseAccessor {
private HashMap<String, String> tableTTL = new HashMap<>();
- public PhoenixHBaseAccessor(Configuration hbaseConf,
- Configuration metricsConf){
- this(hbaseConf, metricsConf, new DefaultPhoenixDataSource(hbaseConf));
+ private final TimelineMetricConfiguration configuration;
+ private InternalMetricsSource rawMetricsSource;
+
+ public PhoenixHBaseAccessor(PhoenixConnectionProvider dataSource) {
+ this(TimelineMetricConfiguration.getInstance(), dataSource);
}
- PhoenixHBaseAccessor(Configuration hbaseConf,
- Configuration metricsConf,
+ // Test friendly construction since mock instrumentation is difficult to get
+ // working with hadoop mini cluster
+ PhoenixHBaseAccessor(TimelineMetricConfiguration configuration,
PhoenixConnectionProvider dataSource) {
- this.hbaseConf = hbaseConf;
- this.metricsConf = metricsConf;
+ this.configuration = TimelineMetricConfiguration.getInstance();
+ try {
+ this.hbaseConf = configuration.getHbaseConf();
+ this.metricsConf = configuration.getMetricsConf();
+ } catch (Exception e) {
+ throw new ExceptionInInitializerError("Cannot initialize configuration.");
+ }
+ if (dataSource == null) {
+ dataSource = new DefaultPhoenixDataSource(hbaseConf);
+ }
+ this.dataSource = dataSource;
+
RESULTSET_LIMIT = metricsConf.getInt(GLOBAL_RESULT_LIMIT, RESULTSET_LIMIT);
try {
Class.forName("org.apache.phoenix.jdbc.PhoenixDriver");
@@ -215,7 +234,7 @@ public class PhoenixHBaseAccessor {
LOG.error("Phoenix client jar not found in the classpath.", e);
throw new IllegalStateException(e);
}
- this.dataSource = dataSource;
+
this.retryCounterFactory = new RetryCounterFactory(metricsConf.getInt(GLOBAL_MAX_RETRIES, 10),
(int) SECONDS.toMillis(metricsConf.getInt(GLOBAL_RETRY_INTERVAL, 3)));
this.outOfBandTimeAllowance = metricsConf.getLong(OUT_OFF_BAND_DATA_TIME_ALLOWANCE,
@@ -249,10 +268,20 @@ public class PhoenixHBaseAccessor {
metricsConf.getClass(TIMELINE_METRIC_AGGREGATOR_SINK_CLASS, null,
TimelineMetricsAggregatorSink.class);
if (metricSinkClass != null) {
- aggregatorSink =
- ReflectionUtils.newInstance(metricSinkClass, metricsConf);
+ aggregatorSink = ReflectionUtils.newInstance(metricSinkClass, metricsConf);
LOG.info("Initialized aggregator sink class " + metricSinkClass);
}
+
+ ExternalSinkProvider externalSinkProvider = configuration.getExternalSinkProvider();
+ InternalSourceProvider internalSourceProvider = configuration.getInternalSourceProvider();
+ if (externalSinkProvider != null) {
+ ExternalMetricsSink rawMetricsSink = externalSinkProvider.getExternalMetricsSink(RAW_METRICS);
+ int interval = configuration.getExternalSinkInterval(RAW_METRICS);
+ if (interval == -1){
+ interval = cacheCommitInterval;
+ }
+ rawMetricsSource = internalSourceProvider.getInternalMetricsSource(RAW_METRICS, interval, rawMetricsSink);
+ }
}
public boolean isInsertCacheEmpty() {
@@ -261,12 +290,15 @@ public class PhoenixHBaseAccessor {
public void commitMetricsFromCache() {
LOG.debug("Clearing metrics cache");
- List<TimelineMetrics> metricsArray = new ArrayList<TimelineMetrics>(insertCache.size());
- while (!insertCache.isEmpty()) {
- metricsArray.add(insertCache.poll());
+ List<TimelineMetrics> metricsList = new ArrayList<TimelineMetrics>(insertCache.size());
+ if (!insertCache.isEmpty()) {
+ insertCache.drainTo(metricsList); // More performant than poll
}
- if (metricsArray.size() > 0) {
- commitMetrics(metricsArray);
+ if (metricsList.size() > 0) {
+ commitMetrics(metricsList);
+ if (rawMetricsSource != null) {
+ rawMetricsSource.publishTimelineMetrics(metricsList);
+ }
}
}
@@ -367,7 +399,7 @@ public class PhoenixHBaseAccessor {
}
@SuppressWarnings("unchecked")
- public static TreeMap<Long, Double> readMetricFromJSON(String json) throws IOException {
+ public static TreeMap<Long, Double> readMetricFromJSON(String json) throws IOException {
return mapper.readValue(json, metricValuesTypeRef);
}
@@ -701,6 +733,9 @@ public class PhoenixHBaseAccessor {
return "";
}
+ /**
+ * Insert precision YARN container data.
+ */
public void insertContainerMetrics(List<ContainerMetric> metrics)
throws SQLException, IOException {
Connection conn = getConnection();
@@ -766,6 +801,9 @@ public class PhoenixHBaseAccessor {
}
}
+ /**
+ * Insert precision data.
+ */
public void insertMetricRecordsWithMetadata(TimelineMetricMetadataManager metadataManager,
TimelineMetrics metrics, boolean skipCache) throws SQLException, IOException {
List<TimelineMetric> timelineMetrics = metrics.getMetrics();
@@ -1385,9 +1423,7 @@ public class PhoenixHBaseAccessor {
try {
aggregatorSink.saveClusterAggregateRecords(records);
} catch (Exception e) {
- LOG.warn(
- "Error writing cluster aggregate records metrics to external sink. "
- + e);
+ LOG.warn("Error writing cluster aggregate records metrics to external sink. ", e);
}
}
}
@@ -1398,8 +1434,8 @@ public class PhoenixHBaseAccessor {
*
* @throws SQLException
*/
- public void saveClusterTimeAggregateRecords(Map<TimelineClusterMetric, MetricHostAggregate> records,
- String tableName) throws SQLException {
+ public void saveClusterAggregateRecordsSecond(Map<TimelineClusterMetric, MetricHostAggregate> records,
+ String tableName) throws SQLException {
if (records == null || records.isEmpty()) {
LOG.debug("Empty aggregate records.");
return;
http://git-wip-us.apache.org/repos/asf/ambari/blob/c32eebf8/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java
index 023465b..de33bd1 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java
@@ -17,15 +17,8 @@
*/
package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
-import org.apache.commons.lang.StringUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.conf.Configuration;
-
import java.io.BufferedReader;
-import java.io.IOException;
+import java.io.File;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.InetAddress;
@@ -37,6 +30,21 @@ import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.sink.ExternalSinkProvider;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.source.DefaultInternalMetricsSourceProvider;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.source.InternalSourceProvider;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.source.InternalSourceProvider.SOURCE_NAME;
+import org.apache.log4j.Appender;
+import org.apache.log4j.FileAppender;
+import org.apache.log4j.Logger;
+
/**
* Configuration class that reads properties from ams-site.xml. All values
* for time or intervals are given in seconds.
@@ -56,6 +64,12 @@ public class TimelineMetricConfiguration {
public static final String TIMELINE_METRIC_AGGREGATOR_SINK_CLASS =
"timeline.metrics.service.aggregator.sink.class";
+ public static final String TIMELINE_METRICS_SOURCE_PROVIDER_CLASS =
+ "timeline.metrics.service.source.provider.class";
+
+ public static final String TIMELINE_METRICS_SINK_PROVIDER_CLASS =
+ "timeline.metrics.service.sink.provider.class";
+
public static final String TIMELINE_METRICS_CACHE_SIZE =
"timeline.metrics.cache.size";
@@ -297,38 +311,63 @@ public class TimelineMetricConfiguration {
public static final String AMSHBASE_METRICS_WHITESLIST_FILE = "amshbase_metrics_whitelist";
public static final String TIMELINE_METRICS_HOST_INMEMORY_AGGREGATION = "timeline.metrics.host.inmemory.aggregation";
+ public static final String INTERNAL_CACHE_HEAP_PERCENT =
+ "timeline.metrics.service.cache.%s.heap.percent";
+
+ public static final String EXTERNAL_SINK_INTERVAL =
+ "timeline.metrics.service.external.sink.%s.interval";
+
+ public static final String DEFAULT_EXTERNAL_SINK_DIR =
+ "timeline.metrics.service.external.sink.dir";
private Configuration hbaseConf;
private Configuration metricsConf;
private Configuration amsEnvConf;
private volatile boolean isInitialized = false;
+ private static TimelineMetricConfiguration instance = new TimelineMetricConfiguration();
+
+ private TimelineMetricConfiguration() {}
+
+ public static TimelineMetricConfiguration getInstance() {
+ return instance;
+ }
+
+ // Tests
+ public TimelineMetricConfiguration(Configuration hbaseConf, Configuration metricsConf) {
+ this.hbaseConf = hbaseConf;
+ this.metricsConf = metricsConf;
+ this.isInitialized = true;
+ }
+
public void initialize() throws URISyntaxException, MalformedURLException {
- ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
- if (classLoader == null) {
- classLoader = getClass().getClassLoader();
- }
- URL hbaseResUrl = classLoader.getResource(HBASE_SITE_CONFIGURATION_FILE);
- URL amsResUrl = classLoader.getResource(METRICS_SITE_CONFIGURATION_FILE);
- LOG.info("Found hbase site configuration: " + hbaseResUrl);
- LOG.info("Found metric service configuration: " + amsResUrl);
-
- if (hbaseResUrl == null) {
- throw new IllegalStateException("Unable to initialize the metrics " +
- "subsystem. No hbase-site present in the classpath.");
- }
+ if (!isInitialized) {
+ ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
+ if (classLoader == null) {
+ classLoader = getClass().getClassLoader();
+ }
+ URL hbaseResUrl = classLoader.getResource(HBASE_SITE_CONFIGURATION_FILE);
+ URL amsResUrl = classLoader.getResource(METRICS_SITE_CONFIGURATION_FILE);
+ LOG.info("Found hbase site configuration: " + hbaseResUrl);
+ LOG.info("Found metric service configuration: " + amsResUrl);
+
+ if (hbaseResUrl == null) {
+ throw new IllegalStateException("Unable to initialize the metrics " +
+ "subsystem. No hbase-site present in the classpath.");
+ }
- if (amsResUrl == null) {
- throw new IllegalStateException("Unable to initialize the metrics " +
- "subsystem. No ams-site present in the classpath.");
- }
+ if (amsResUrl == null) {
+ throw new IllegalStateException("Unable to initialize the metrics " +
+ "subsystem. No ams-site present in the classpath.");
+ }
- hbaseConf = new Configuration(true);
- hbaseConf.addResource(hbaseResUrl.toURI().toURL());
- metricsConf = new Configuration(true);
- metricsConf.addResource(amsResUrl.toURI().toURL());
+ hbaseConf = new Configuration(true);
+ hbaseConf.addResource(hbaseResUrl.toURI().toURL());
+ metricsConf = new Configuration(true);
+ metricsConf.addResource(amsResUrl.toURI().toURL());
- isInitialized = true;
+ isInitialized = true;
+ }
}
public Configuration getHbaseConf() throws URISyntaxException, MalformedURLException {
@@ -346,31 +385,19 @@ public class TimelineMetricConfiguration {
}
public String getZKClientPort() throws MalformedURLException, URISyntaxException {
- if (!isInitialized) {
- initialize();
- }
- return hbaseConf.getTrimmed("hbase.zookeeper.property.clientPort", "2181");
+ return getHbaseConf().getTrimmed("hbase.zookeeper.property.clientPort", "2181");
}
public String getZKQuorum() throws MalformedURLException, URISyntaxException {
- if (!isInitialized) {
- initialize();
- }
- return hbaseConf.getTrimmed("hbase.zookeeper.quorum");
+ return getHbaseConf().getTrimmed("hbase.zookeeper.quorum");
}
public String getClusterZKClientPort() throws MalformedURLException, URISyntaxException {
- if (!isInitialized) {
- initialize();
- }
- return metricsConf.getTrimmed("cluster.zookeeper.property.clientPort", "2181");
+ return getMetricsConf().getTrimmed("cluster.zookeeper.property.clientPort", "2181");
}
public String getClusterZKQuorum() throws MalformedURLException, URISyntaxException {
- if (!isInitialized) {
- initialize();
- }
- return metricsConf.getTrimmed("cluster.zookeeper.quorum");
+ return getMetricsConf().getTrimmed("cluster.zookeeper.quorum");
}
public String getInstanceHostnameFromEnv() throws UnknownHostException {
@@ -390,12 +417,9 @@ public class TimelineMetricConfiguration {
return DEFAULT_INSTANCE_PORT;
}
- public String getWebappAddress() {
+ public String getWebappAddress() throws MalformedURLException, URISyntaxException {
String defaultHttpAddress = "0.0.0.0:6188";
- if (metricsConf != null) {
- return metricsConf.get(WEBAPP_HTTP_ADDRESS, defaultHttpAddress);
- }
- return defaultHttpAddress;
+ return getMetricsConf().get(WEBAPP_HTTP_ADDRESS, defaultHttpAddress);
}
public int getTimelineMetricsServiceHandlerThreadCount() {
@@ -450,8 +474,8 @@ public class TimelineMetricConfiguration {
public boolean isDistributedCollectorModeDisabled() {
try {
- if (metricsConf != null) {
- return Boolean.parseBoolean(metricsConf.get("timeline.metrics.service.distributed.collector.mode.disabled", "false"));
+ if (getMetricsConf() != null) {
+ return Boolean.parseBoolean(getMetricsConf().get("timeline.metrics.service.distributed.collector.mode.disabled", "false"));
}
return false;
} catch (Exception e) {
@@ -497,4 +521,50 @@ public class TimelineMetricConfiguration {
return whitelist;
}
+
+ public int getExternalSinkInterval(SOURCE_NAME sourceName) {
+ return Integer.parseInt(metricsConf.get(String.format(EXTERNAL_SINK_INTERVAL, sourceName), "-1"));
+ }
+
+ public InternalSourceProvider getInternalSourceProvider() {
+ Class<? extends InternalSourceProvider> providerClass =
+ metricsConf.getClass(TIMELINE_METRICS_SOURCE_PROVIDER_CLASS,
+ DefaultInternalMetricsSourceProvider.class, InternalSourceProvider.class);
+ return ReflectionUtils.newInstance(providerClass, metricsConf);
+ }
+
+ public ExternalSinkProvider getExternalSinkProvider() {
+ Class<?> providerClass = metricsConf.getClassByNameOrNull(TIMELINE_METRICS_SINK_PROVIDER_CLASS);
+ if (providerClass != null) {
+ return (ExternalSinkProvider) ReflectionUtils.newInstance(providerClass, metricsConf);
+ }
+ return null;
+ }
+
+ public String getInternalCacheHeapPercent(String instanceName) {
+ String heapPercent = metricsConf.get(String.format(INTERNAL_CACHE_HEAP_PERCENT, instanceName));
+ if (StringUtils.isEmpty(heapPercent)) {
+ return "5%";
+ } else {
+ return heapPercent.endsWith("%") ? heapPercent : heapPercent + "%";
+ }
+ }
+
+ public String getDefaultMetricsSinkDir() {
+ String dirPath = metricsConf.get(DEFAULT_EXTERNAL_SINK_DIR);
+ if (dirPath == null) {
+ // Only one logger at the time of writing
+ Appender appender = (Appender) Logger.getRootLogger().getAllAppenders().nextElement();
+ if (appender instanceof FileAppender) {
+ File f = new File(((FileAppender) appender).getFile());
+ if (f.exists()) {
+ dirPath = f.getParent();
+ } else {
+ dirPath = "/tmp";
+ }
+ }
+ }
+
+ return dirPath;
+ }
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/c32eebf8/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregator.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregator.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregator.java
index ba16b43..74d4013 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregator.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregator.java
@@ -83,7 +83,7 @@ public class TimelineMetricClusterAggregator extends AbstractTimelineAggregator
Map<TimelineClusterMetric, MetricHostAggregate> hostAggregateMap = aggregateMetricsFromResultSet(rs, endTime);
LOG.info("Saving " + hostAggregateMap.size() + " metric aggregates.");
- hBaseAccessor.saveClusterTimeAggregateRecords(hostAggregateMap, outputTableName);
+ hBaseAccessor.saveClusterAggregateRecordsSecond(hostAggregateMap, outputTableName);
}
private Map<TimelineClusterMetric, MetricHostAggregate> aggregateMetricsFromResultSet(ResultSet rs, long endTime)
http://git-wip-us.apache.org/repos/asf/ambari/blob/c32eebf8/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/discovery/TimelineMetricMetadataManager.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/discovery/TimelineMetricMetadataManager.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/discovery/TimelineMetricMetadataManager.java
index f904ebe..8a71756 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/discovery/TimelineMetricMetadataManager.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/discovery/TimelineMetricMetadataManager.java
@@ -27,8 +27,11 @@ import org.apache.hadoop.metrics2.sink.timeline.MetadataException;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetricMetadata;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineClusterMetric;
+import java.net.MalformedURLException;
+import java.net.URISyntaxException;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
@@ -69,17 +72,21 @@ public class TimelineMetricMetadataManager {
// Filter metrics names matching given patterns, from metadata
final List<String> metricNameFilters = new ArrayList<>();
- public TimelineMetricMetadataManager(PhoenixHBaseAccessor hBaseAccessor,
- Configuration metricsConf) {
- this.hBaseAccessor = hBaseAccessor;
+ // Test friendly construction since mock instrumentation is difficult to get
+ // working with hadoop mini cluster
+ public TimelineMetricMetadataManager(Configuration metricsConf, PhoenixHBaseAccessor hBaseAccessor) {
this.metricsConf = metricsConf;
-
+ this.hBaseAccessor = hBaseAccessor;
String patternStrings = metricsConf.get(TIMELINE_METRIC_METADATA_FILTERS);
if (!StringUtils.isEmpty(patternStrings)) {
metricNameFilters.addAll(Arrays.asList(patternStrings.split(",")));
}
}
+ public TimelineMetricMetadataManager(PhoenixHBaseAccessor hBaseAccessor) throws MalformedURLException, URISyntaxException {
+ this(TimelineMetricConfiguration.getInstance().getMetricsConf(), hBaseAccessor);
+ }
+
/**
* Initialize Metadata from the store
*/
http://git-wip-us.apache.org/repos/asf/ambari/blob/c32eebf8/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/sink/DefaultFSSinkProvider.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/sink/DefaultFSSinkProvider.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/sink/DefaultFSSinkProvider.java
new file mode 100644
index 0000000..6ec6cf9
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/sink/DefaultFSSinkProvider.java
@@ -0,0 +1,153 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.sink;
+
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_CACHE_COMMIT_INTERVAL;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Date;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.source.InternalSourceProvider;
+
+public class DefaultFSSinkProvider implements ExternalSinkProvider {
+ private static final Log LOG = LogFactory.getLog(DefaultFSSinkProvider.class);
+ TimelineMetricConfiguration conf = TimelineMetricConfiguration.getInstance();
+ private final DefaultExternalMetricsSink sink = new DefaultExternalMetricsSink();
+ private long FIXED_FILE_SIZE;
+ private final String SINK_FILE_NAME = "external-metrics-sink.dat";
+ private final String SEPARATOR = ", ";
+ private final String LINE_SEP = System.lineSeparator();
+ private final String HEADERS = "METRIC, APP_ID, INSTANCE_ID, HOSTNAME, START_TIME, DATA";
+
+ public DefaultFSSinkProvider() {
+ try {
+ FIXED_FILE_SIZE = conf.getMetricsConf().getLong("timeline.metrics.service.external.fs.sink.filesize", FileUtils.ONE_MB * 100);
+ } catch (Exception ignored) {
+ FIXED_FILE_SIZE = FileUtils.ONE_MB * 100;
+ }
+ }
+
+ @Override
+ public ExternalMetricsSink getExternalMetricsSink(InternalSourceProvider.SOURCE_NAME sourceName) {
+ return sink;
+ }
+
+ class DefaultExternalMetricsSink implements ExternalMetricsSink {
+
+ @Override
+ public int getSinkTimeOutSeconds() {
+ return 10;
+ }
+
+ @Override
+ public int getFlushSeconds() {
+ try {
+ return conf.getMetricsConf().getInt(TIMELINE_METRICS_CACHE_COMMIT_INTERVAL, 3);
+ } catch (Exception e) {
+ LOG.warn("Cannot read cache commit interval.");
+ }
+ return 3;
+ }
+
+ private boolean createFile(File f) {
+ boolean created = false;
+ if (!f.exists()) {
+ try {
+ created = f.createNewFile();
+ FileUtils.writeStringToFile(f, HEADERS);
+ } catch (IOException e) {
+ LOG.error("Cannot create " + SINK_FILE_NAME + " at " + f.getPath());
+ return false;
+ }
+ }
+
+ return created;
+ }
+
+ private boolean shouldReCreate(File f) {
+ if (!f.exists()) {
+ return true;
+ }
+ if (FileUtils.sizeOf(f) > FIXED_FILE_SIZE) {
+ return true;
+ }
+ return false;
+ }
+
+ @Override
+ public void sinkMetricData(Collection<TimelineMetrics> metrics) {
+ String dirPath = TimelineMetricConfiguration.getInstance().getDefaultMetricsSinkDir();
+ File dir = new File(dirPath);
+ if (!dir.exists()) {
+ LOG.error("Cannot sink data to file system, incorrect dir path " + dirPath);
+ return;
+ }
+
+ File f = FileUtils.getFile(dirPath, SINK_FILE_NAME);
+ if (shouldReCreate(f)) {
+ if (!f.delete()) {
+ LOG.warn("Unable to delete external sink file.");
+ return;
+ }
+ createFile(f);
+ }
+
+ if (metrics != null) {
+ for (TimelineMetrics timelineMetrics : metrics) {
+ for (TimelineMetric metric : timelineMetrics.getMetrics()) {
+ StringBuilder sb = new StringBuilder();
+ sb.append(metric.getMetricName());
+ sb.append(SEPARATOR);
+ sb.append(metric.getAppId());
+ sb.append(SEPARATOR);
+ if (StringUtils.isEmpty(metric.getInstanceId())) {
+ sb.append(SEPARATOR);
+ } else {
+ sb.append(metric.getInstanceId());
+ sb.append(SEPARATOR);
+ }
+ if (StringUtils.isEmpty(metric.getHostName())) {
+ sb.append(SEPARATOR);
+ } else {
+ sb.append(metric.getHostName());
+ sb.append(SEPARATOR);
+ }
+ sb.append(new Date(metric.getStartTime()));
+ sb.append(SEPARATOR);
+ sb.append(metric.getMetricValues().toString());
+ sb.append(LINE_SEP);
+ try {
+ FileUtils.writeStringToFile(f, sb.toString());
+ } catch (IOException e) {
+ LOG.warn("Unable to sink data to file " + f.getPath());
+ }
+ }
+ }
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/c32eebf8/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/sink/ExternalMetricsSink.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/sink/ExternalMetricsSink.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/sink/ExternalMetricsSink.java
new file mode 100644
index 0000000..ff06307
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/sink/ExternalMetricsSink.java
@@ -0,0 +1,48 @@
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.sink;
+
+import java.util.Collection;
+
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+public interface ExternalMetricsSink {
+ /**
+ * How many seconds to wait on sink before dropping metrics.
+ * Note: Care should be taken that this timeout does not bottleneck the
+ * sink thread.
+ */
+ int getSinkTimeOutSeconds();
+
+ /**
+ * How frequently to flush data to external system.
+ * Default would be between 60 - 120 seconds, coherent with default sink
+ * interval of AMS.
+ */
+ int getFlushSeconds();
+
+ /**
+ * Raw data stream to process / store on external system.
+ * The data will be held in an in-memory cache and flushed at flush seconds
+ * or when the cache size limit is exceeded we will flush the cache and
+ * drop data if write fails.
+ *
+ * @param metrics {@link Collection<TimelineMetrics>}
+ */
+ void sinkMetricData(Collection<TimelineMetrics> metrics);
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/c32eebf8/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/sink/ExternalSinkProvider.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/sink/ExternalSinkProvider.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/sink/ExternalSinkProvider.java
new file mode 100644
index 0000000..48887d9
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/sink/ExternalSinkProvider.java
@@ -0,0 +1,35 @@
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.sink;
+
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.source.InternalSourceProvider;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.source.InternalSourceProvider.SOURCE_NAME;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Configurable provider for sink classes that match the metrics sources.
+ * Provider can return same sink of different sinks for each source.
+ */
+public interface ExternalSinkProvider {
+
+ /**
+ * Return an instance of the metrics sink for the give source
+ * @return {@link ExternalMetricsSink}
+ */
+ ExternalMetricsSink getExternalMetricsSink(SOURCE_NAME sourceName);
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/c32eebf8/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/sink/HttpSinkProvider.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/sink/HttpSinkProvider.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/sink/HttpSinkProvider.java
new file mode 100644
index 0000000..bb84c8a
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/sink/HttpSinkProvider.java
@@ -0,0 +1,231 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.sink;
+
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_CACHE_COMMIT_INTERVAL;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.security.KeyStore;
+import java.util.Collection;
+
+import javax.net.ssl.HttpsURLConnection;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLSocketFactory;
+import javax.net.ssl.TrustManagerFactory;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.source.InternalSourceProvider;
+import org.apache.http.client.utils.URIBuilder;
+import org.codehaus.jackson.map.AnnotationIntrospector;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.map.annotate.JsonSerialize;
+import org.codehaus.jackson.xc.JaxbAnnotationIntrospector;
+
+public class HttpSinkProvider implements ExternalSinkProvider {
+ private static final Log LOG = LogFactory.getLog(HttpSinkProvider.class);
+ TimelineMetricConfiguration conf = TimelineMetricConfiguration.getInstance();
+
+ private String connectUrl;
+ private SSLSocketFactory sslSocketFactory;
+ protected static ObjectMapper mapper;
+
+ static {
+ mapper = new ObjectMapper();
+ AnnotationIntrospector introspector = new JaxbAnnotationIntrospector();
+ mapper.setAnnotationIntrospector(introspector);
+ mapper.getSerializationConfig()
+ .withSerializationInclusion(JsonSerialize.Inclusion.NON_NULL);
+ }
+
+ public HttpSinkProvider() {
+ Configuration config;
+ try {
+ config = conf.getMetricsConf();
+ } catch (Exception e) {
+ throw new ExceptionInInitializerError("Unable to read configuration for sink.");
+ }
+ String protocol = config.get("timeline.metrics.service.external.http.sink.protocol", "http");
+ String host = config.get("timeline.metrics.service.external.http.sink.host", "localhost");
+ String port = config.get("timeline.metrics.service.external.http.sink.port", "6189");
+
+ if (protocol.contains("https")) {
+ loadTruststore(
+ config.getTrimmed("timeline.metrics.service.external.http.sink.truststore.path"),
+ config.getTrimmed("timeline.metrics.service.external.http.sink.truststore.type"),
+ config.getTrimmed("timeline.metrics.service.external.http.sink.truststore.password")
+ );
+ }
+
+ URIBuilder uriBuilder = new URIBuilder();
+ uriBuilder.setScheme(protocol);
+ uriBuilder.setHost(host);
+ uriBuilder.setPort(Integer.parseInt(port));
+ connectUrl = uriBuilder.toString();
+ }
+
+ @Override
+ public ExternalMetricsSink getExternalMetricsSink(InternalSourceProvider.SOURCE_NAME sourceName) {
+ return null;
+ }
+
+ protected HttpURLConnection getConnection(String spec) throws IOException {
+ return (HttpURLConnection) new URL(spec).openConnection();
+ }
+
+ // Get an ssl connection
+ protected HttpsURLConnection getSSLConnection(String spec)
+ throws IOException, IllegalStateException {
+
+ HttpsURLConnection connection = (HttpsURLConnection) (new URL(spec).openConnection());
+ connection.setSSLSocketFactory(sslSocketFactory);
+ return connection;
+ }
+
+ protected void loadTruststore(String trustStorePath, String trustStoreType,
+ String trustStorePassword) {
+ if (sslSocketFactory == null) {
+ if (trustStorePath == null || trustStorePassword == null) {
+ String msg = "Can't load TrustStore. Truststore path or password is not set.";
+ LOG.error(msg);
+ throw new IllegalStateException(msg);
+ }
+ FileInputStream in = null;
+ try {
+ in = new FileInputStream(new File(trustStorePath));
+ KeyStore store = KeyStore.getInstance(trustStoreType == null ?
+ KeyStore.getDefaultType() : trustStoreType);
+ store.load(in, trustStorePassword.toCharArray());
+ TrustManagerFactory tmf = TrustManagerFactory
+ .getInstance(TrustManagerFactory.getDefaultAlgorithm());
+ tmf.init(store);
+ SSLContext context = SSLContext.getInstance("TLS");
+ context.init(null, tmf.getTrustManagers(), null);
+ sslSocketFactory = context.getSocketFactory();
+ } catch (Exception e) {
+ LOG.error("Unable to load TrustStore", e);
+ } finally {
+ if (in != null) {
+ try {
+ in.close();
+ } catch (IOException e) {
+ LOG.error("Unable to load TrustStore", e);
+ }
+ }
+ }
+ }
+ }
+
+ class DefaultHttpMetricsSink implements ExternalMetricsSink {
+
+ @Override
+ public int getSinkTimeOutSeconds() {
+ try {
+ return conf.getMetricsConf().getInt("timeline.metrics.service.external.http.sink.timeout.seconds", 10);
+ } catch (Exception e) {
+ return 10;
+ }
+ }
+
+ @Override
+ public int getFlushSeconds() {
+ try {
+ return conf.getMetricsConf().getInt(TIMELINE_METRICS_CACHE_COMMIT_INTERVAL, 3);
+ } catch (Exception e) {
+ LOG.warn("Cannot read cache commit interval.");
+ }
+ return 3;
+ }
+
+ /**
+ * Cleans up and closes an input stream
+ * see http://docs.oracle.com/javase/6/docs/technotes/guides/net/http-keepalive.html
+ * @param is the InputStream to clean up
+ * @return string read from the InputStream
+ * @throws IOException
+ */
+ protected String cleanupInputStream(InputStream is) throws IOException {
+ StringBuilder sb = new StringBuilder();
+ if (is != null) {
+ try (
+ InputStreamReader isr = new InputStreamReader(is);
+ BufferedReader br = new BufferedReader(isr)
+ ) {
+ // read the response body
+ String line;
+ while ((line = br.readLine()) != null) {
+ if (LOG.isDebugEnabled()) {
+ sb.append(line);
+ }
+ }
+ } finally {
+ is.close();
+ }
+ }
+ return sb.toString();
+ }
+
+ @Override
+ public void sinkMetricData(Collection<TimelineMetrics> metrics) {
+ HttpURLConnection connection = null;
+ try {
+ connection = connectUrl.startsWith("https") ? getSSLConnection(connectUrl) : getConnection(connectUrl);
+
+ connection.setRequestMethod("POST");
+ connection.setRequestProperty("Content-Type", "application/json");
+ connection.setRequestProperty("Connection", "Keep-Alive");
+ connection.setConnectTimeout(getSinkTimeOutSeconds());
+ connection.setReadTimeout(getSinkTimeOutSeconds());
+ connection.setDoOutput(true);
+
+ if (metrics != null) {
+ String jsonData = mapper.writeValueAsString(metrics);
+ try (OutputStream os = connection.getOutputStream()) {
+ os.write(jsonData.getBytes("UTF-8"));
+ }
+ }
+
+ int statusCode = connection.getResponseCode();
+
+ if (statusCode != 200) {
+ LOG.info("Unable to POST metrics to external sink, " + connectUrl +
+ ", statusCode = " + statusCode);
+ } else {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Metrics posted to external sink " + connectUrl);
+ }
+ }
+ cleanupInputStream(connection.getInputStream());
+
+ } catch (IOException io) {
+ LOG.warn("Unable to sink data to external system.", io);
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/c32eebf8/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/source/DefaultInternalMetricsSourceProvider.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/source/DefaultInternalMetricsSourceProvider.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/source/DefaultInternalMetricsSourceProvider.java
new file mode 100644
index 0000000..b97c39f
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/source/DefaultInternalMetricsSourceProvider.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.source;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.sink.ExternalMetricsSink;
+
+public class DefaultInternalMetricsSourceProvider implements InternalSourceProvider {
+ private static final Log LOG = LogFactory.getLog(DefaultInternalMetricsSourceProvider.class);
+
+ // TODO: Implement read based sources for higher level data
+ @Override
+ public InternalMetricsSource getInternalMetricsSource(SOURCE_NAME sourceName, int sinkIntervalSeconds, ExternalMetricsSink sink) {
+ if (sink == null) {
+ LOG.warn("No external sink configured for source " + sourceName);
+ return null;
+ }
+
+ switch (sourceName) {
+ case RAW_METRICS:
+ return new RawMetricsSource(sinkIntervalSeconds, sink);
+ default:
+ throw new UnsupportedOperationException("Unimplemented source type " + sourceName);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/c32eebf8/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/source/InternalMetricsSource.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/source/InternalMetricsSource.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/source/InternalMetricsSource.java
new file mode 100644
index 0000000..a6e1092
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/source/InternalMetricsSource.java
@@ -0,0 +1,30 @@
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.source;
+
+import java.util.Collection;
+
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+public interface InternalMetricsSource {
+ /**
+ * Write metrics to external sink.
+ * Allows pre-processing and caching capabilities to the consumer.
+ */
+ void publishTimelineMetrics(Collection<TimelineMetrics> metrics);
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/c32eebf8/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/source/InternalSourceProvider.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/source/InternalSourceProvider.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/source/InternalSourceProvider.java
new file mode 100644
index 0000000..9d8ca36
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/source/InternalSourceProvider.java
@@ -0,0 +1,39 @@
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.source;
+
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.sink.ExternalMetricsSink;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+public interface InternalSourceProvider {
+
+ enum SOURCE_NAME {
+ RAW_METRICS,
+ MINUTE_HOST_AGGREAGATE_METRICS,
+ HOURLY_HOST_AGGREAGATE_METRICS,
+ DAILY_HOST_AGGREAGATE_METRICS,
+ MINUTE_CLUSTER_AGGREAGATE_METRICS,
+ HOURLY_CLUSTER_AGGREAGATE_METRICS,
+ DAILY_CLUSTER_AGGREAGATE_METRICS,
+ }
+
+ /**
+ * Provide Source for metrics data.
+ * @return {@link InternalMetricsSource}
+ */
+ InternalMetricsSource getInternalMetricsSource(SOURCE_NAME sourceName, int sinkIntervalSeconds, ExternalMetricsSink sink);
+}
[2/3] ambari git commit: AMBARI-21079. Add ability to sink Raw
metrics to external system via Http. (swagle)
Posted by sw...@apache.org.
http://git-wip-us.apache.org/repos/asf/ambari/blob/c32eebf8/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/source/RawMetricsSource.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/source/RawMetricsSource.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/source/RawMetricsSource.java
new file mode 100644
index 0000000..967d819
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/source/RawMetricsSource.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.source;
+
+import java.util.Collection;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.sink.ExternalMetricsSink;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.source.cache.InternalMetricsCache;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.source.cache.InternalMetricsCacheProvider;
+
+public class RawMetricsSource implements InternalMetricsSource {
+ private static final Log LOG = LogFactory.getLog(RawMetricsSource.class);
+ private final int internalCacheInterval;
+ private final ExternalMetricsSink rawMetricsSink;
+ private final ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
+ private final InternalMetricsCache cache;
+ static final String RAW_METRICS_CACHE = "RAW_METRICS_CACHE_INSTANCE";
+
+ public RawMetricsSource(int internalCacheInterval, ExternalMetricsSink rawMetricsSink) {
+ this.internalCacheInterval = internalCacheInterval;
+ this.rawMetricsSink = rawMetricsSink;
+ this.cache = InternalMetricsCacheProvider.getInstance().getCacheInstance(RAW_METRICS_CACHE);
+ if (rawMetricsSink.getFlushSeconds() > internalCacheInterval) {
+ initializeFixedRateScheduler();
+ }
+ }
+
+ @Override
+ public void publishTimelineMetrics(Collection<TimelineMetrics> metrics) {
+ // TODO: Adjust default flush to reasonable defaults > 3 seconds
+ if (rawMetricsSink.getFlushSeconds() > internalCacheInterval) {
+ // Need to cache only if external sink cannot keep up and thereby has
+ // different flush interval as compared to HBase flush
+ cache.putAll(metrics); // Scheduler initialized already for flush
+ } else {
+ submitDataWithTimeout(metrics);
+ }
+ }
+
+ private void initializeFixedRateScheduler() {
+ executorService.scheduleAtFixedRate(new Runnable() {
+ @Override
+ public void run() {
+ rawMetricsSink.sinkMetricData(cache.evictAll());
+ }
+ }, rawMetricsSink.getFlushSeconds(), rawMetricsSink.getFlushSeconds(), TimeUnit.SECONDS);
+ }
+
+ private void submitDataWithTimeout(final Collection<TimelineMetrics> metrics) {
+ Future f = executorService.submit(new Callable<Object>() {
+ @Override
+ public Object call() throws Exception {
+ rawMetricsSink.sinkMetricData(metrics);
+ return null;
+ }
+ });
+ try {
+ f.get(rawMetricsSink.getSinkTimeOutSeconds(), TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ LOG.warn("Raw metrics sink interrupted.");
+ } catch (ExecutionException e) {
+ LOG.warn("Exception on sinking metrics", e);
+ } catch (TimeoutException e) {
+ LOG.warn("Timeout exception on sinking metrics", e);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/c32eebf8/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/source/cache/InternalMetricCacheKey.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/source/cache/InternalMetricCacheKey.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/source/cache/InternalMetricCacheKey.java
new file mode 100644
index 0000000..28d457d
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/source/cache/InternalMetricCacheKey.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.source.cache;
+
+public class InternalMetricCacheKey {
+ private String metricName;
+ private String appId;
+ private String instanceId;
+ private String hostname;
+ private long startTime; // Useful for debugging
+
+ public InternalMetricCacheKey(String metricName, String appId, String instanceId, String hostname, long startTime) {
+ this.metricName = metricName;
+ this.appId = appId;
+ this.instanceId = instanceId;
+ this.hostname = hostname;
+ this.startTime = startTime;
+ }
+
+ public String getMetricName() {
+ return metricName;
+ }
+
+ public void setMetricName(String metricName) {
+ this.metricName = metricName;
+ }
+
+ public String getAppId() {
+ return appId;
+ }
+
+ public void setAppId(String appId) {
+ this.appId = appId;
+ }
+
+ public String getInstanceId() {
+ return instanceId;
+ }
+
+ public void setInstanceId(String instanceId) {
+ this.instanceId = instanceId;
+ }
+
+ public String getHostname() {
+ return hostname;
+ }
+
+ public void setHostname(String hostname) {
+ this.hostname = hostname;
+ }
+
+ public long getStartTime() {
+ return startTime;
+ }
+
+ public void setStartTime(long startTime) {
+ this.startTime = startTime;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+
+ InternalMetricCacheKey that = (InternalMetricCacheKey) o;
+
+ if (!getMetricName().equals(that.getMetricName())) return false;
+ if (!getAppId().equals(that.getAppId())) return false;
+ if (getInstanceId() != null ? !getInstanceId().equals(that.getInstanceId()) : that.getInstanceId() != null)
+ return false;
+ return getHostname() != null ? getHostname().equals(that.getHostname()) : that.getHostname() == null;
+
+ }
+
+ @Override
+ public int hashCode() {
+ int result = getMetricName().hashCode();
+ result = 31 * result + getAppId().hashCode();
+ result = 31 * result + (getInstanceId() != null ? getInstanceId().hashCode() : 0);
+ result = 31 * result + (getHostname() != null ? getHostname().hashCode() : 0);
+ return result;
+ }
+
+ @Override
+ public String toString() {
+ return "InternalMetricCacheKey{" +
+ "metricName='" + metricName + '\'' +
+ ", appId='" + appId + '\'' +
+ ", instanceId='" + instanceId + '\'' +
+ ", hostname='" + hostname + '\'' +
+ ", startTime=" + startTime +
+ '}';
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/c32eebf8/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/source/cache/InternalMetricCacheValue.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/source/cache/InternalMetricCacheValue.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/source/cache/InternalMetricCacheValue.java
new file mode 100644
index 0000000..a4dabe7
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/source/cache/InternalMetricCacheValue.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.source.cache;
+
+import java.util.Map;
+import java.util.TreeMap;
+
+public class InternalMetricCacheValue {
+ private TreeMap<Long, Double> metricValues = new TreeMap<Long, Double>();
+
+ public TreeMap<Long, Double> getMetricValues() {
+ return metricValues;
+ }
+
+ public void setMetricValues(TreeMap<Long, Double> metricValues) {
+ this.metricValues = metricValues;
+ }
+
+ public void addMetricValues(Map<Long, Double> metricValues) {
+ this.metricValues.putAll(metricValues);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/c32eebf8/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/source/cache/InternalMetricsCache.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/source/cache/InternalMetricsCache.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/source/cache/InternalMetricsCache.java
new file mode 100644
index 0000000..a4ed9bc
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/source/cache/InternalMetricsCache.java
@@ -0,0 +1,231 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.source.cache;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Date;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
+
+import net.sf.ehcache.Cache;
+import net.sf.ehcache.CacheException;
+import net.sf.ehcache.CacheManager;
+import net.sf.ehcache.Ehcache;
+import net.sf.ehcache.Element;
+import net.sf.ehcache.config.CacheConfiguration;
+import net.sf.ehcache.config.PersistenceConfiguration;
+import net.sf.ehcache.config.SizeOfPolicyConfiguration;
+import net.sf.ehcache.event.CacheEventListener;
+import net.sf.ehcache.store.MemoryStoreEvictionPolicy;
+
+public class InternalMetricsCache {
+ private static final Log LOG = LogFactory.getLog(InternalMetricsCache.class);
+ private final String instanceName;
+ private final String maxHeapPercent;
+ private volatile boolean isCacheInitialized = false;
+ private Cache cache;
+ static final String TIMELINE_METRIC_CACHE_MANAGER_NAME = "internalMetricsCacheManager";
+ private final Lock lock = new ReentrantLock();
+ private static final int LOCK_TIMEOUT_SECONDS = 2;
+
+ public InternalMetricsCache(String instanceName, String maxHeapPercent) {
+ this.instanceName = instanceName;
+ this.maxHeapPercent = maxHeapPercent;
+ initialize();
+ }
+
+ private void initialize() {
+ // Check in case of contention to avoid ObjectExistsException
+ if (isCacheInitialized) {
+ throw new RuntimeException("Cannot initialize internal cache twice");
+ }
+
+ System.setProperty("net.sf.ehcache.skipUpdateCheck", "true");
+ System.setProperty("net.sf.ehcache.sizeofengine." + TIMELINE_METRIC_CACHE_MANAGER_NAME,
+ "org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.source.cache.InternalMetricsCacheSizeOfEngine");
+
+ net.sf.ehcache.config.Configuration managerConfig =
+ new net.sf.ehcache.config.Configuration();
+ managerConfig.setName(TIMELINE_METRIC_CACHE_MANAGER_NAME);
+
+ // Set max heap available to the cache manager
+ managerConfig.setMaxBytesLocalHeap(maxHeapPercent);
+
+ //Create a singleton CacheManager using defaults
+ CacheManager manager = CacheManager.create(managerConfig);
+
+ LOG.info("Creating Metrics Cache with maxHeapPercent => " + maxHeapPercent);
+
+ // Create a Cache specifying its configuration.
+ CacheConfiguration cacheConfiguration = new CacheConfiguration()
+ .name(instanceName)
+ .memoryStoreEvictionPolicy(MemoryStoreEvictionPolicy.LRU)
+ .sizeOfPolicy(new SizeOfPolicyConfiguration() // Set sizeOf policy to continue on max depth reached - avoid OOM
+ .maxDepth(10000)
+ .maxDepthExceededBehavior(SizeOfPolicyConfiguration.MaxDepthExceededBehavior.CONTINUE))
+ .eternal(true) // infinite time until eviction
+ .persistence(new PersistenceConfiguration()
+ .strategy(PersistenceConfiguration.Strategy.NONE.name()));
+
+ cache = new Cache(cacheConfiguration);
+ cache.getCacheEventNotificationService().registerListener(new InternalCacheEvictionListener());
+
+ LOG.info("Registering internal metrics cache with provider: name = " +
+ cache.getName() + ", guid: " + cache.getGuid());
+
+ manager.addCache(cache);
+
+ isCacheInitialized = true;
+ }
+
+ public InternalMetricCacheValue getInternalMetricCacheValue(InternalMetricCacheKey key) {
+ Element ele = cache.get(key);
+ if (ele != null) {
+ return (InternalMetricCacheValue) ele.getObjectValue();
+ }
+ return null;
+ }
+
+ public Collection<TimelineMetrics> evictAll() {
+ TimelineMetrics metrics = new TimelineMetrics();
+ try {
+ if (lock.tryLock(LOCK_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
+ try{
+ List keys = cache.getKeys();
+ for (Object obj : keys) {
+ TimelineMetric metric = new TimelineMetric();
+ InternalMetricCacheKey key = (InternalMetricCacheKey) obj;
+ metric.setMetricName(key.getMetricName());
+ metric.setAppId(key.getAppId());
+ metric.setInstanceId(key.getInstanceId());
+ metric.setHostName(key.getHostname());
+ metric.setStartTime(key.getStartTime());
+ metric.setTimestamp(key.getStartTime());
+ Element ele = cache.get(key);
+ metric.setMetricValues(((InternalMetricCacheValue) ele.getObjectValue()).getMetricValues());
+ metrics.getMetrics().add(metric);
+ }
+ cache.removeAll();
+ } finally {
+ lock.unlock();
+ }
+ } else {
+ LOG.warn("evictAll: Unable to acquire lock on the cache instance. " +
+ "Giving up after " + LOCK_TIMEOUT_SECONDS + " seconds.");
+ }
+ } catch (InterruptedException e) {
+ LOG.warn("Interrupted while waiting to acquire lock");
+ }
+
+ return Collections.singletonList(metrics);
+ }
+
+ public void putAll(Collection<TimelineMetrics> metrics) {
+ try {
+ if (lock.tryLock(LOCK_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
+ try {
+ if (metrics != null) {
+ for (TimelineMetrics timelineMetrics : metrics) {
+ for (TimelineMetric timelineMetric : timelineMetrics.getMetrics()) {
+ InternalMetricCacheKey key = new InternalMetricCacheKey(
+ timelineMetric.getMetricName(),
+ timelineMetric.getAppId(),
+ timelineMetric.getInstanceId(),
+ timelineMetric.getHostName(),
+ timelineMetric.getStartTime()
+ );
+
+ Element ele = cache.get(key);
+ if (ele != null) {
+ InternalMetricCacheValue value = (InternalMetricCacheValue) ele.getObjectValue();
+ value.addMetricValues(timelineMetric.getMetricValues());
+ } else {
+ InternalMetricCacheValue value = new InternalMetricCacheValue();
+ value.setMetricValues(timelineMetric.getMetricValues());
+ cache.put(new Element(key, value));
+ }
+ }
+ }
+ }
+ } finally {
+ lock.unlock();
+ }
+ } else {
+ LOG.warn("putAll: Unable to acquire lock on the cache instance. " +
+ "Giving up after " + LOCK_TIMEOUT_SECONDS + " seconds.");
+ }
+ } catch (InterruptedException e) {
+ LOG.warn("Interrupted while waiting to acquire lock");
+ }
+ }
+
+ class InternalCacheEvictionListener implements CacheEventListener {
+
+ @Override
+ public void notifyElementRemoved(Ehcache cache, Element element) throws CacheException {
+ // expected
+ }
+
+ @Override
+ public void notifyElementPut(Ehcache cache, Element element) throws CacheException {
+ // do nothing
+ }
+
+ @Override
+ public void notifyElementUpdated(Ehcache cache, Element element) throws CacheException {
+ // do nothing
+ }
+
+ @Override
+ public void notifyElementExpired(Ehcache cache, Element element) {
+ // do nothing
+ }
+
+ @Override
+ public void notifyElementEvicted(Ehcache cache, Element element) {
+ // Bad - Remote endpoint cannot keep up resulting in flooding
+ InternalMetricCacheKey key = (InternalMetricCacheKey) element.getObjectKey();
+ LOG.warn("Evicting element from internal metrics cache, metric => " + key
+ .getMetricName() + ", startTime = " + new Date(key.getStartTime()));
+ }
+
+ @Override
+ public void notifyRemoveAll(Ehcache cache) {
+ // expected
+ }
+
+ @Override
+ public Object clone() throws CloneNotSupportedException {
+ return null;
+ }
+
+ @Override
+ public void dispose() {
+ // do nothing
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/c32eebf8/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/source/cache/InternalMetricsCacheProvider.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/source/cache/InternalMetricsCacheProvider.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/source/cache/InternalMetricsCacheProvider.java
new file mode 100644
index 0000000..3e0dc1b
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/source/cache/InternalMetricsCacheProvider.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.source.cache;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration;
+
+public class InternalMetricsCacheProvider {
+ private Map<String, InternalMetricsCache> metricsCacheMap = new ConcurrentHashMap<>();
+ private static final InternalMetricsCacheProvider instance = new InternalMetricsCacheProvider();
+
+ private InternalMetricsCacheProvider() {
+ }
+
+ public static InternalMetricsCacheProvider getInstance() {
+ return instance;
+ }
+
+ public InternalMetricsCache getCacheInstance(String instanceName) {
+ if (metricsCacheMap.containsKey(instanceName)) {
+ return metricsCacheMap.get(instanceName);
+ } else {
+ TimelineMetricConfiguration conf = TimelineMetricConfiguration.getInstance();
+ InternalMetricsCache cache = new InternalMetricsCache(instanceName,
+ conf.getInternalCacheHeapPercent(instanceName));
+
+ metricsCacheMap.put(instanceName, cache);
+ return cache;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/c32eebf8/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/source/cache/InternalMetricsCacheSizeOfEngine.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/source/cache/InternalMetricsCacheSizeOfEngine.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/source/cache/InternalMetricsCacheSizeOfEngine.java
new file mode 100644
index 0000000..d1a1a89
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/source/cache/InternalMetricsCacheSizeOfEngine.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.source.cache;
+
+import org.apache.hadoop.metrics2.sink.timeline.cache.TimelineMetricsEhCacheSizeOfEngine;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import net.sf.ehcache.pool.Size;
+import net.sf.ehcache.pool.SizeOfEngine;
+
+public class InternalMetricsCacheSizeOfEngine extends TimelineMetricsEhCacheSizeOfEngine {
+ private final static Logger LOG = LoggerFactory.getLogger(InternalMetricsCacheSizeOfEngine.class);
+
+ private InternalMetricsCacheSizeOfEngine(SizeOfEngine underlying) {
+ super(underlying);
+ }
+
+ public InternalMetricsCacheSizeOfEngine() {
+ // Invoke default constructor in base class
+ }
+
+ @Override
+ public Size sizeOf(Object key, Object value, Object container) {
+ try {
+ LOG.debug("BEGIN - Sizeof, key: {}, value: {}", key, value);
+ long size = 0;
+ if (key instanceof InternalMetricCacheKey) {
+ InternalMetricCacheKey metricCacheKey = (InternalMetricCacheKey) key;
+ size += reflectionSizeOf.sizeOf(metricCacheKey.getMetricName());
+ size += reflectionSizeOf.sizeOf(metricCacheKey.getAppId());
+ size += reflectionSizeOf.sizeOf(metricCacheKey.getInstanceId()); // null safe
+ size += reflectionSizeOf.sizeOf(metricCacheKey.getHostname());
+ }
+ if (value instanceof InternalMetricCacheValue) {
+ size += getValueMapSize(((InternalMetricCacheValue) value).getMetricValues());
+ }
+ // Mark size as not being exact
+ return new Size(size, false);
+ } finally {
+ LOG.debug("END - Sizeof, key: {}", key);
+ }
+ }
+
+ @Override
+ public SizeOfEngine copyWith(int maxDepth, boolean abortWhenMaxDepthExceeded) {
+ LOG.debug("Copying tracing sizeof engine, maxdepth: {}, abort: {}",
+ maxDepth, abortWhenMaxDepthExceeded);
+
+ return new InternalMetricsCacheSizeOfEngine(underlying.copyWith(maxDepth, abortWhenMaxDepthExceeded));
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/c32eebf8/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/TestApplicationHistoryServer.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/TestApplicationHistoryServer.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/TestApplicationHistoryServer.java
index 3688630..41ddef5 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/TestApplicationHistoryServer.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/TestApplicationHistoryServer.java
@@ -25,7 +25,7 @@ import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.service.Service.STATE;
import org.apache.hadoop.util.ExitUtil;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.HBaseTimelineMetricStore;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.HBaseTimelineMetricsService;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.MetricCollectorHAController;
@@ -73,7 +73,7 @@ import static org.powermock.api.support.membermodification.MemberMatcher.method;
import static org.powermock.api.support.membermodification.MemberModifier.suppress;
@RunWith(PowerMockRunner.class)
-@PrepareForTest({ PhoenixHBaseAccessor.class, HBaseTimelineMetricStore.class, UserGroupInformation.class,
+@PrepareForTest({ PhoenixHBaseAccessor.class, HBaseTimelineMetricsService.class, UserGroupInformation.class,
ClientCnxn.class, DefaultPhoenixDataSource.class, ConnectionFactory.class,
TimelineMetricConfiguration.class, ApplicationHistoryServer.class })
@PowerMockIgnore( {"javax.management.*"})
http://git-wip-us.apache.org/repos/asf/ambari/blob/c32eebf8/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractMiniHBaseClusterTest.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractMiniHBaseClusterTest.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractMiniHBaseClusterTest.java
index 611d82e..fbf7b09 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractMiniHBaseClusterTest.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractMiniHBaseClusterTest.java
@@ -17,10 +17,32 @@
*/
package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.OUT_OFF_BAND_DATA_TIME_ALLOWANCE;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_RECORD_TABLE_NAME;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.UPSERT_METRICS_SQL;
+import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.replay;
+import static org.powermock.api.easymock.PowerMock.mockStatic;
+import static org.powermock.api.easymock.PowerMock.replayAll;
+
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.IntegrationTestingUtility;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
@@ -41,24 +63,6 @@ import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
-import java.io.IOException;
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.sql.Statement;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.OUT_OFF_BAND_DATA_TIME_ALLOWANCE;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_RECORD_TABLE_NAME;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.UPSERT_METRICS_SQL;
-import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
-import static org.assertj.core.api.Assertions.assertThat;
-
public abstract class AbstractMiniHBaseClusterTest extends BaseTest {
protected static final long BATCH_SIZE = 3;
@@ -200,11 +204,8 @@ public abstract class AbstractMiniHBaseClusterTest extends BaseTest {
metricsConf.setLong(OUT_OFF_BAND_DATA_TIME_ALLOWANCE, 600000);
return
- new PhoenixHBaseAccessor(
- new Configuration(),
- metricsConf,
+ new PhoenixHBaseAccessor(new TimelineMetricConfiguration(new Configuration(), metricsConf),
new PhoenixConnectionProvider() {
-
@Override
public HBaseAdmin getHBaseAdmin() throws IOException {
try {
@@ -229,7 +230,7 @@ public abstract class AbstractMiniHBaseClusterTest extends BaseTest {
}
protected void insertMetricRecords(Connection conn, TimelineMetrics metrics, long currentTime)
- throws SQLException, IOException {
+ throws SQLException, IOException {
List<TimelineMetric> timelineMetrics = metrics.getMetrics();
if (timelineMetrics == null || timelineMetrics.isEmpty()) {
http://git-wip-us.apache.org/repos/asf/ambari/blob/c32eebf8/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStoreTest.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStoreTest.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStoreTest.java
index aae1d4b..f035678 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStoreTest.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStoreTest.java
@@ -33,7 +33,7 @@ import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.ti
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.Function.PostProcessingFunction.RATE;
import static org.assertj.core.api.Assertions.assertThat;
-public class HBaseTimelineMetricStoreTest {
+public class HBaseTimelineMetricsServiceTest {
public static final String MEM_METRIC = "mem";
public static final String BYTES_IN_METRIC = "bytes_in";
@@ -51,7 +51,7 @@ public class HBaseTimelineMetricStoreTest {
//when
Multimap<String, List<Function>> multimap =
- HBaseTimelineMetricStore.parseMetricNamesToAggregationFunctions(metricNames);
+ HBaseTimelineMetricsService.parseMetricNamesToAggregationFunctions(metricNames);
//then
Assert.assertEquals(multimap.keySet().size(), 3);
@@ -104,7 +104,7 @@ public class HBaseTimelineMetricStoreTest {
metricValues.put(1454016728371L, 1011.25);
// Calculate rate
- Map<Long, Double> rates = HBaseTimelineMetricStore.updateValuesAsRate(new TreeMap<>(metricValues), false);
+ Map<Long, Double> rates = HBaseTimelineMetricsService.updateValuesAsRate(new TreeMap<>(metricValues), false);
// Make sure rate is zero
for (Map.Entry<Long, Double> rateEntry : rates.entrySet()) {
@@ -122,7 +122,7 @@ public class HBaseTimelineMetricStoreTest {
metricValues.put(1454016548371L, 1010.25);
metricValues.put(1454016608371L, 1010.25);
- Map<Long, Double> rates = HBaseTimelineMetricStore.updateValuesAsRate(new TreeMap<>(metricValues), true);
+ Map<Long, Double> rates = HBaseTimelineMetricsService.updateValuesAsRate(new TreeMap<>(metricValues), true);
Assert.assertTrue(rates.size()==4);
Assert.assertTrue(rates.containsValue(-1.0));
http://git-wip-us.apache.org/repos/asf/ambari/blob/c32eebf8/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITPhoenixHBaseAccessor.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITPhoenixHBaseAccessor.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITPhoenixHBaseAccessor.java
index d5baaef..f6d69f6 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITPhoenixHBaseAccessor.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITPhoenixHBaseAccessor.java
@@ -17,9 +17,33 @@
*/
package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
-import com.google.common.collect.ArrayListMultimap;
-import com.google.common.collect.Multimap;
-import junit.framework.Assert;
+import static junit.framework.Assert.assertEquals;
+import static junit.framework.Assert.assertTrue;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.MetricTestHelper.createEmptyTimelineClusterMetric;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.MetricTestHelper.createEmptyTimelineMetric;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.MetricTestHelper.createMetricHostAggregate;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.MetricTestHelper.prepareSingleTimelineMetric;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor.DATE_TIERED_COMPACTION_POLICY;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor.FIFO_COMPACTION_POLICY_CLASS;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor.HSTORE_COMPACTION_CLASS_KEY;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor.HSTORE_ENGINE_CLASS;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_AGGREGATE_MINUTE_TABLE_NAME;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_RECORD_TABLE_NAME;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.PHOENIX_TABLES;
+
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
@@ -40,35 +64,10 @@ import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.DefaultCondition;
import org.junit.Test;
-import java.io.IOException;
-import java.lang.reflect.Array;
-import java.lang.reflect.Field;
-import java.sql.SQLException;
-import java.sql.Connection;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.sql.Statement;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.Multimap;
-import static junit.framework.Assert.assertEquals;
-import static junit.framework.Assert.assertTrue;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.MetricTestHelper.createEmptyTimelineClusterMetric;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.MetricTestHelper.createEmptyTimelineMetric;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.MetricTestHelper.createMetricHostAggregate;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.MetricTestHelper.prepareSingleTimelineMetric;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor.DATE_TIERED_COMPACTION_POLICY;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor.FIFO_COMPACTION_POLICY_CLASS;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor.HSTORE_COMPACTION_CLASS_KEY;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor.HSTORE_ENGINE_CLASS;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_AGGREGATE_MINUTE_TABLE_NAME;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_RECORD_TABLE_NAME;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.PHOENIX_TABLES;
+import junit.framework.Assert;
@@ -93,7 +92,8 @@ public class ITPhoenixHBaseAccessor extends AbstractMiniHBaseClusterTest {
// WHEN
long endTime = ctime + minute;
Condition condition = new DefaultCondition(
- Collections.singletonList("disk_free"), Collections.singletonList("local1"),
+ new ArrayList<String>() {{ add("disk_free"); }},
+ Collections.singletonList("local1"),
null, null, startTime, endTime, Precision.SECONDS, null, true);
TimelineMetrics timelineMetrics = hdb.getMetricRecords(condition,
singletonValueFunctionMap("disk_free"));
@@ -117,18 +117,19 @@ public class ITPhoenixHBaseAccessor extends AbstractMiniHBaseClusterTest {
long ctime = startTime;
long minute = 60 * 1000;
hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime, "local1",
- "disk_free", 1));
+ "disk_free", 1));
hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime + minute, "local1",
- "disk_free", 2));
+ "disk_free", 2));
hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime, "local2",
- "disk_free", 2));
+ "disk_free", 2));
long endTime = ctime + minute;
boolean success = aggregatorMinute.doWork(startTime, endTime);
assertTrue(success);
// WHEN
Condition condition = new DefaultCondition(
- Collections.singletonList("disk_free"), Collections.singletonList("local1"),
+ new ArrayList<String>() {{ add("disk_free"); }},
+ Collections.singletonList("local1"),
null, null, startTime, endTime, Precision.MINUTES, null, false);
TimelineMetrics timelineMetrics = hdb.getMetricRecords(condition,
singletonValueFunctionMap("disk_free"));
@@ -151,10 +152,10 @@ public class ITPhoenixHBaseAccessor extends AbstractMiniHBaseClusterTest {
TimelineMetricAggregatorFactory.createTimelineMetricAggregatorHourly(hdb, new Configuration(), null);
MetricHostAggregate expectedAggregate =
- createMetricHostAggregate(2.0, 0.0, 20, 15.0);
+ createMetricHostAggregate(2.0, 0.0, 20, 15.0);
Map<TimelineMetric, MetricHostAggregate>
- aggMap = new HashMap<TimelineMetric,
- MetricHostAggregate>();
+ aggMap = new HashMap<TimelineMetric,
+ MetricHostAggregate>();
long startTime = System.currentTimeMillis();
int min_5 = 5 * 60 * 1000;
@@ -179,7 +180,8 @@ public class ITPhoenixHBaseAccessor extends AbstractMiniHBaseClusterTest {
// WHEN
Condition condition = new DefaultCondition(
- Collections.singletonList("disk_used"), Collections.singletonList("test_host"),
+ new ArrayList<String>() {{ add("disk_free"); }},
+ Collections.singletonList("test_host"),
"test_app", null, startTime, endTime, Precision.HOURS, null, true);
TimelineMetrics timelineMetrics = hdb.getMetricRecords(condition,
singletonValueFunctionMap("disk_used"));
@@ -200,20 +202,20 @@ public class ITPhoenixHBaseAccessor extends AbstractMiniHBaseClusterTest {
// GIVEN
TimelineMetricAggregator agg =
TimelineMetricAggregatorFactory.createTimelineClusterAggregatorSecond(
- hdb, new Configuration(), new TimelineMetricMetadataManager(hdb, new Configuration()), null);
+ hdb, new Configuration(), new TimelineMetricMetadataManager(new Configuration(), hdb), null);
long startTime = System.currentTimeMillis();
long ctime = startTime + 1;
long minute = 60 * 1000;
hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime, "local1",
- "disk_free", 1));
+ "disk_free", 1));
hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime, "local2",
- "disk_free", 2));
+ "disk_free", 2));
ctime += minute;
hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime, "local1",
- "disk_free", 2));
+ "disk_free", 2));
hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime, "local2",
- "disk_free", 1));
+ "disk_free", 1));
long endTime = ctime + minute + 1;
boolean success = agg.doWork(startTime, endTime);
@@ -221,8 +223,8 @@ public class ITPhoenixHBaseAccessor extends AbstractMiniHBaseClusterTest {
// WHEN
Condition condition = new DefaultCondition(
- Collections.singletonList("disk_free"), null, null, null,
- startTime, endTime, Precision.SECONDS, null, true);
+ new ArrayList<String>() {{ add("disk_free"); }},
+ null, null, null, startTime, endTime, Precision.SECONDS, null, true);
TimelineMetrics timelineMetrics = hdb.getAggregateMetricRecords(condition,
singletonValueFunctionMap("disk_free"));
@@ -240,7 +242,7 @@ public class ITPhoenixHBaseAccessor extends AbstractMiniHBaseClusterTest {
// GIVEN
TimelineMetricAggregator agg =
TimelineMetricAggregatorFactory.createTimelineClusterAggregatorSecond(hdb,
- new Configuration(), new TimelineMetricMetadataManager(hdb, new Configuration()), null);
+ new Configuration(), new TimelineMetricMetadataManager(new Configuration(), hdb), null);
long startTime = System.currentTimeMillis();
long ctime = startTime + 1;
@@ -261,8 +263,8 @@ public class ITPhoenixHBaseAccessor extends AbstractMiniHBaseClusterTest {
// WHEN
Condition condition = new DefaultCondition(
- Collections.singletonList("disk_free"), null, null, null,
- null, null, Precision.SECONDS, null, true);
+ new ArrayList<String>() {{ add("disk_free"); }},
+ null, null, null, null, null, Precision.SECONDS, null, true);
Multimap<String, List<Function>> mmap = ArrayListMultimap.create();
mmap.put("disk_free", Collections.singletonList(new Function(Function.ReadFunction.SUM, null)));
@@ -288,14 +290,14 @@ public class ITPhoenixHBaseAccessor extends AbstractMiniHBaseClusterTest {
long minute = 60 * 1000;
Map<TimelineClusterMetric, MetricClusterAggregate> records =
- new HashMap<TimelineClusterMetric, MetricClusterAggregate>();
+ new HashMap<TimelineClusterMetric, MetricClusterAggregate>();
records.put(createEmptyTimelineClusterMetric(ctime),
new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0));
records.put(createEmptyTimelineClusterMetric(ctime += minute),
new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0));
records.put(createEmptyTimelineClusterMetric(ctime += minute),
- new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0));
+ new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0));
records.put(createEmptyTimelineClusterMetric(ctime += minute),
new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0));
@@ -305,8 +307,8 @@ public class ITPhoenixHBaseAccessor extends AbstractMiniHBaseClusterTest {
// WHEN
Condition condition = new DefaultCondition(
- Collections.singletonList("disk_used"), null, null, null,
- startTime, ctime + minute, Precision.HOURS, null, true);
+ new ArrayList<String>() {{ add("disk_free"); }},
+ null, null, null, startTime, ctime + minute, Precision.HOURS, null, true);
TimelineMetrics timelineMetrics = hdb.getAggregateMetricRecords(condition,
singletonValueFunctionMap("disk_used"));
http://git-wip-us.apache.org/repos/asf/ambari/blob/c32eebf8/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessorTest.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessorTest.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessorTest.java
index d668178..bf9246d 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessorTest.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessorTest.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL;
import org.apache.phoenix.exception.PhoenixIOException;
import org.easymock.EasyMock;
+import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.api.easymock.PowerMock;
@@ -42,6 +43,7 @@ import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import java.io.IOException;
+import java.lang.reflect.Field;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
@@ -53,22 +55,36 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import static org.easymock.EasyMock.expect;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
+import static org.powermock.api.easymock.PowerMock.*;
@RunWith(PowerMockRunner.class)
-@PrepareForTest(PhoenixTransactSQL.class)
+@PrepareForTest({PhoenixTransactSQL.class, TimelineMetricConfiguration.class})
public class PhoenixHBaseAccessorTest {
private static final String ZOOKEEPER_QUORUM = "hbase.zookeeper.quorum";
- @Test
- public void testGetMetricRecords() throws SQLException, IOException {
+ PhoenixConnectionProvider connectionProvider;
+ PhoenixHBaseAccessor accessor;
+ @Before
+ public void setupConf() throws Exception {
Configuration hbaseConf = new Configuration();
hbaseConf.setStrings(ZOOKEEPER_QUORUM, "quorum");
Configuration metricsConf = new Configuration();
+ metricsConf.setStrings(TimelineMetricConfiguration.TIMELINE_METRICS_CACHE_SIZE, "1");
+ metricsConf.setStrings(TimelineMetricConfiguration.TIMELINE_METRICS_CACHE_COMMIT_INTERVAL, "100");
+ metricsConf.setStrings(
+ TimelineMetricConfiguration.TIMELINE_METRIC_AGGREGATOR_SINK_CLASS,
+ "org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricsAggregatorMemorySink");
+
+ TimelineMetricConfiguration conf = new TimelineMetricConfiguration(hbaseConf, metricsConf);
+ mockStatic(TimelineMetricConfiguration.class);
+ expect(TimelineMetricConfiguration.getInstance()).andReturn(conf).anyTimes();
+ replayAll();
- PhoenixConnectionProvider connectionProvider = new PhoenixConnectionProvider() {
+ connectionProvider = new PhoenixConnectionProvider() {
@Override
public HBaseAdmin getHBaseAdmin() throws IOException {
return null;
@@ -80,21 +96,24 @@ public class PhoenixHBaseAccessorTest {
}
};
- PhoenixHBaseAccessor accessor = new PhoenixHBaseAccessor(hbaseConf, metricsConf, connectionProvider);
+ accessor = new PhoenixHBaseAccessor(connectionProvider);
+ }
+ @Test
+ public void testGetMetricRecords() throws SQLException, IOException {
List<String> metricNames = new LinkedList<>();
List<String> hostnames = new LinkedList<>();
Multimap<String, List<Function>> metricFunctions = ArrayListMultimap.create();
- PowerMock.mockStatic(PhoenixTransactSQL.class);
+ mockStatic(PhoenixTransactSQL.class);
PreparedStatement preparedStatementMock = EasyMock.createNiceMock(PreparedStatement.class);
Condition condition = new DefaultCondition(metricNames, hostnames, "appid", "instanceid", 123L, 234L, Precision.SECONDS, 10, true);
- EasyMock.expect(PhoenixTransactSQL.prepareGetMetricsSqlStmt(null, condition)).andReturn(preparedStatementMock).once();
+ expect(PhoenixTransactSQL.prepareGetMetricsSqlStmt(null, condition)).andReturn(preparedStatementMock).once();
ResultSet rsMock = EasyMock.createNiceMock(ResultSet.class);
- EasyMock.expect(preparedStatementMock.executeQuery()).andReturn(rsMock);
+ expect(preparedStatementMock.executeQuery()).andReturn(rsMock);
- PowerMock.replayAll();
+ replayAll();
EasyMock.replay(preparedStatementMock, rsMock);
// Check when startTime < endTime
@@ -105,104 +124,64 @@ public class PhoenixHBaseAccessorTest {
TimelineMetrics tml2 = accessor.getMetricRecords(condition2, metricFunctions);
assertEquals(0, tml2.getMetrics().size());
- PowerMock.verifyAll();
+ verifyAll();
EasyMock.verify(preparedStatementMock, rsMock);
}
@Test
- public void testGetMetricRecordsIOException()
- throws SQLException, IOException {
-
- Configuration hbaseConf = new Configuration();
- hbaseConf.setStrings(ZOOKEEPER_QUORUM, "quorum");
- Configuration metricsConf = new Configuration();
-
- PhoenixConnectionProvider connectionProvider = new PhoenixConnectionProvider() {
- @Override
- public HBaseAdmin getHBaseAdmin() throws IOException {
- return null;
- }
-
- @Override
- public Connection getConnection() throws SQLException {
- return null;
- }
- };
-
- PhoenixHBaseAccessor accessor = new PhoenixHBaseAccessor(hbaseConf, metricsConf, connectionProvider);
-
+ public void testGetMetricRecordsIOException() throws SQLException, IOException {
List<String> metricNames = new LinkedList<>();
List<String> hostnames = new LinkedList<>();
Multimap<String, List<Function>> metricFunctions = ArrayListMultimap.create();
- PowerMock.mockStatic(PhoenixTransactSQL.class);
+ mockStatic(PhoenixTransactSQL.class);
PreparedStatement preparedStatementMock = EasyMock.createNiceMock(PreparedStatement.class);
Condition condition = new DefaultCondition(metricNames, hostnames, "appid", "instanceid", 123L, 234L, Precision.SECONDS, 10, true);
- EasyMock.expect(PhoenixTransactSQL.prepareGetMetricsSqlStmt(null, condition)).andReturn(preparedStatementMock).once();
+ expect(PhoenixTransactSQL.prepareGetMetricsSqlStmt(null, condition)).andReturn(preparedStatementMock).once();
ResultSet rsMock = EasyMock.createNiceMock(ResultSet.class);
RuntimeException runtimeException = EasyMock.createNiceMock(RuntimeException.class);
IOException io = EasyMock.createNiceMock(IOException.class);
- EasyMock.expect(preparedStatementMock.executeQuery()).andThrow(runtimeException);
- EasyMock.expect(runtimeException.getCause()).andReturn(io).atLeastOnce();
+ expect(preparedStatementMock.executeQuery()).andThrow(runtimeException);
+ expect(runtimeException.getCause()).andReturn(io).atLeastOnce();
StackTraceElement stackTrace[] = new StackTraceElement[]{new StackTraceElement("TimeRange","method","file",1)};
- EasyMock.expect(io.getStackTrace()).andReturn(stackTrace).atLeastOnce();
+ expect(io.getStackTrace()).andReturn(stackTrace).atLeastOnce();
- PowerMock.replayAll();
+ replayAll();
EasyMock.replay(preparedStatementMock, rsMock, io, runtimeException);
TimelineMetrics tml = accessor.getMetricRecords(condition, metricFunctions);
assertEquals(0, tml.getMetrics().size());
- PowerMock.verifyAll();
+ verifyAll();
EasyMock.verify(preparedStatementMock, rsMock, io, runtimeException);
}
@Test
- public void testGetMetricRecordsPhoenixIOExceptionDoNotRetryException()
- throws SQLException, IOException {
-
- Configuration hbaseConf = new Configuration();
- hbaseConf.setStrings(ZOOKEEPER_QUORUM, "quorum");
- Configuration metricsConf = new Configuration();
-
- PhoenixConnectionProvider connectionProvider = new PhoenixConnectionProvider() {
- @Override
- public HBaseAdmin getHBaseAdmin() throws IOException {
- return null;
- }
-
- @Override
- public Connection getConnection() throws SQLException {
- return null;
- }
- };
-
- PhoenixHBaseAccessor accessor = new PhoenixHBaseAccessor(hbaseConf, metricsConf, connectionProvider);
-
+ public void testGetMetricRecordsPhoenixIOExceptionDoNotRetryException() throws SQLException, IOException {
List<String> metricNames = new LinkedList<>();
List<String> hostnames = new LinkedList<>();
Multimap<String, List<Function>> metricFunctions = ArrayListMultimap.create();
- PowerMock.mockStatic(PhoenixTransactSQL.class);
+ mockStatic(PhoenixTransactSQL.class);
PreparedStatement preparedStatementMock = EasyMock.createNiceMock(PreparedStatement.class);
Condition condition = new DefaultCondition(metricNames, hostnames, "appid", "instanceid", null, null, Precision.SECONDS, 10, true);
- EasyMock.expect(PhoenixTransactSQL.prepareGetLatestMetricSqlStmt(null, condition)).andReturn(preparedStatementMock).once();
+ expect(PhoenixTransactSQL.prepareGetLatestMetricSqlStmt(null, condition)).andReturn(preparedStatementMock).once();
PhoenixTransactSQL.setSortMergeJoinEnabled(true);
EasyMock.expectLastCall();
ResultSet rsMock = EasyMock.createNiceMock(ResultSet.class);
PhoenixIOException pioe1 = EasyMock.createNiceMock(PhoenixIOException.class);
PhoenixIOException pioe2 = EasyMock.createNiceMock(PhoenixIOException.class);
DoNotRetryIOException dnrioe = EasyMock.createNiceMock(DoNotRetryIOException.class);
- EasyMock.expect(preparedStatementMock.executeQuery()).andThrow(pioe1);
- EasyMock.expect(pioe1.getCause()).andReturn(pioe2).atLeastOnce();
- EasyMock.expect(pioe2.getCause()).andReturn(dnrioe).atLeastOnce();
+ expect(preparedStatementMock.executeQuery()).andThrow(pioe1);
+ expect(pioe1.getCause()).andReturn(pioe2).atLeastOnce();
+ expect(pioe2.getCause()).andReturn(dnrioe).atLeastOnce();
StackTraceElement stackTrace[] = new StackTraceElement[]{new StackTraceElement("HashJoinRegionScanner","method","file",1)};
- EasyMock.expect(dnrioe.getStackTrace()).andReturn(stackTrace).atLeastOnce();
+ expect(dnrioe.getStackTrace()).andReturn(stackTrace).atLeastOnce();
- PowerMock.replayAll();
+ replayAll();
EasyMock.replay(preparedStatementMock, rsMock, pioe1, pioe2, dnrioe);
try {
accessor.getMetricRecords(condition, metricFunctions);
@@ -210,20 +189,17 @@ public class PhoenixHBaseAccessorTest {
} catch (Exception e) {
//NOP
}
- PowerMock.verifyAll();
+ verifyAll();
}
@Test
public void testMetricsCacheCommittingWhenFull() throws IOException, SQLException {
Configuration hbaseConf = new Configuration();
hbaseConf.setStrings(ZOOKEEPER_QUORUM, "quorum");
- Configuration metricsConf = new Configuration();
- metricsConf.setStrings(TimelineMetricConfiguration.TIMELINE_METRICS_CACHE_SIZE, "1");
- metricsConf.setStrings(TimelineMetricConfiguration.TIMELINE_METRICS_CACHE_COMMIT_INTERVAL, "100");
- final Connection connection = EasyMock.createNiceMock(Connection.class);
+ final Connection connection = EasyMock.createNiceMock(Connection.class);
- PhoenixHBaseAccessor accessor = new PhoenixHBaseAccessor(hbaseConf, metricsConf) {
+ accessor = new PhoenixHBaseAccessor(connectionProvider) {
@Override
public void commitMetrics(Collection<TimelineMetrics> timelineMetricsCollection) {
try {
@@ -235,7 +211,7 @@ public class PhoenixHBaseAccessorTest {
};
TimelineMetrics timelineMetrics = EasyMock.createNiceMock(TimelineMetrics.class);
- EasyMock.expect(timelineMetrics.getMetrics()).andReturn(Collections.singletonList(new TimelineMetric())).anyTimes();
+ expect(timelineMetrics.getMetrics()).andReturn(Collections.singletonList(new TimelineMetric())).anyTimes();
connection.commit();
EasyMock.expectLastCall().once();
@@ -250,44 +226,33 @@ public class PhoenixHBaseAccessorTest {
@Test
public void testMetricsAggregatorSink() throws IOException, SQLException {
- Configuration hbaseConf = new Configuration();
- hbaseConf.setStrings(ZOOKEEPER_QUORUM, "quorum");
- Configuration metricsConf = new Configuration();
Map<TimelineClusterMetric, MetricClusterAggregate> clusterAggregateMap =
new HashMap<>();
Map<TimelineClusterMetric, MetricHostAggregate> clusterTimeAggregateMap =
new HashMap<>();
Map<TimelineMetric, MetricHostAggregate> hostAggregateMap = new HashMap<>();
- metricsConf.setStrings(
- TimelineMetricConfiguration.TIMELINE_METRICS_CACHE_SIZE, "1");
- metricsConf.setStrings(
- TimelineMetricConfiguration.TIMELINE_METRICS_CACHE_COMMIT_INTERVAL,
- "100");
- metricsConf.setStrings(
- TimelineMetricConfiguration.TIMELINE_METRIC_AGGREGATOR_SINK_CLASS,
- "org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricsAggregatorMemorySink");
final Connection connection = EasyMock.createNiceMock(Connection.class);
- final PreparedStatement statement =
- EasyMock.createNiceMock(PreparedStatement.class);
- EasyMock.expect(connection.prepareStatement(EasyMock.anyString()))
- .andReturn(statement).anyTimes();
+ final PreparedStatement statement = EasyMock.createNiceMock(PreparedStatement.class);
+ expect(connection.prepareStatement(EasyMock.anyString())).andReturn(statement).anyTimes();
EasyMock.replay(statement);
EasyMock.replay(connection);
- PhoenixConnectionProvider connectionProvider =
- new PhoenixConnectionProvider() {
- @Override
- public HBaseAdmin getHBaseAdmin() throws IOException {
- return null;
- }
+ connectionProvider = new PhoenixConnectionProvider() {
+
+ @Override
+ public HBaseAdmin getHBaseAdmin() throws IOException {
+ return null;
+ }
+
+ @Override
+ public Connection getConnection() throws SQLException {
+ return connection;
+ }
+ };
- @Override
- public Connection getConnection() throws SQLException {
- return connection;
- }
- };
+ accessor = new PhoenixHBaseAccessor(connectionProvider);
TimelineClusterMetric clusterMetric =
new TimelineClusterMetric("metricName", "appId", "instanceId",
@@ -303,12 +268,10 @@ public class PhoenixHBaseAccessorTest {
clusterTimeAggregateMap.put(clusterMetric, new MetricHostAggregate());
hostAggregateMap.put(timelineMetric, new MetricHostAggregate());
- PhoenixHBaseAccessor accessor =
- new PhoenixHBaseAccessor(hbaseConf, metricsConf, connectionProvider);
accessor.saveClusterAggregateRecords(clusterAggregateMap);
accessor.saveHostAggregateRecords(hostAggregateMap,
PhoenixTransactSQL.METRICS_AGGREGATE_MINUTE_TABLE_NAME);
- accessor.saveClusterTimeAggregateRecords(clusterTimeAggregateMap,
+ accessor.saveClusterAggregateRecordsSecond(clusterTimeAggregateMap,
PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME);
TimelineMetricsAggregatorMemorySink memorySink =
http://git-wip-us.apache.org/repos/asf/ambari/blob/c32eebf8/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricStoreWatcherTest.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricStoreWatcherTest.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricStoreWatcherTest.java
index 54b8442..dd0378d 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricStoreWatcherTest.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricStoreWatcherTest.java
@@ -66,7 +66,7 @@ public class TimelineMetricStoreWatcherTest {
replay(metricStore);
TimelineMetricStoreWatcher timelineMetricStoreWatcher =
- new TimelineMetricStoreWatcher(metricStore, new TimelineMetricConfiguration());
+ new TimelineMetricStoreWatcher(metricStore, TimelineMetricConfiguration.getInstance());
timelineMetricStoreWatcher.run();
timelineMetricStoreWatcher.run();
timelineMetricStoreWatcher.run();
@@ -97,7 +97,7 @@ public class TimelineMetricStoreWatcherTest {
replayAll();
TimelineMetricStoreWatcher timelineMetricStoreWatcher =
- new TimelineMetricStoreWatcher(metricStore, new TimelineMetricConfiguration());
+ new TimelineMetricStoreWatcher(metricStore, TimelineMetricConfiguration.getInstance());
timelineMetricStoreWatcher.run();
timelineMetricStoreWatcher.run();
timelineMetricStoreWatcher.run();
http://git-wip-us.apache.org/repos/asf/ambari/blob/c32eebf8/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/ITClusterAggregator.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/ITClusterAggregator.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/ITClusterAggregator.java
index 07fd85d..86c9b40 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/ITClusterAggregator.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/ITClusterAggregator.java
@@ -18,7 +18,29 @@
package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators;
-import junit.framework.Assert;
+import static junit.framework.Assert.assertEquals;
+import static junit.framework.Assert.assertNotNull;
+import static junit.framework.Assert.assertTrue;
+import static junit.framework.Assert.fail;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.MetricTestHelper.createEmptyTimelineClusterMetric;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.MetricTestHelper.prepareSingleTimelineMetric;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_APP_IDS;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.GET_CLUSTER_AGGREGATE_SQL;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_TABLE_NAME;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.NATIVE_TIME_RANGE_DELTA;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.TreeMap;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.metrics2.sink.timeline.MetricClusterAggregate;
import org.apache.hadoop.metrics2.sink.timeline.MetricHostAggregate;
@@ -26,42 +48,13 @@ import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.AbstractMiniHBaseClusterTest;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.MetricTestHelper;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineClusterMetric;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricAggregator;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricAggregatorFactory;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricReadHelper;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataManager;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.DefaultCondition;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL;
-import org.apache.log4j.Level;
-import org.apache.log4j.Logger;
-import org.junit.After;
-import org.junit.Before;
import org.junit.Test;
-import java.sql.Connection;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.sql.Statement;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.TreeMap;
-
-import static junit.framework.Assert.assertEquals;
-import static junit.framework.Assert.assertNotNull;
-import static junit.framework.Assert.assertTrue;
-import static junit.framework.Assert.fail;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.MetricTestHelper.createEmptyTimelineClusterMetric;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.MetricTestHelper.prepareSingleTimelineMetric;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_APP_IDS;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.GET_CLUSTER_AGGREGATE_SQL;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_TABLE_NAME;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.NATIVE_TIME_RANGE_DELTA;
+import junit.framework.Assert;
public class ITClusterAggregator extends AbstractMiniHBaseClusterTest {
private final TimelineMetricReadHelper metricReader = new TimelineMetricReadHelper(false);
@@ -77,7 +70,7 @@ public class ITClusterAggregator extends AbstractMiniHBaseClusterTest {
// GIVEN
TimelineMetricAggregator agg =
TimelineMetricAggregatorFactory.createTimelineClusterAggregatorSecond(hdb,
- getConfigurationForTest(false), new TimelineMetricMetadataManager(hdb, new Configuration()), null);
+ getConfigurationForTest(false), new TimelineMetricMetadataManager(new Configuration(), hdb), null);
TimelineMetricReadHelper readHelper = new TimelineMetricReadHelper(false);
long startTime = System.currentTimeMillis();
@@ -130,7 +123,7 @@ public class ITClusterAggregator extends AbstractMiniHBaseClusterTest {
// GIVEN
TimelineMetricAggregator agg =
TimelineMetricAggregatorFactory.createTimelineClusterAggregatorSecond(hdb,
- getConfigurationForTest(false), new TimelineMetricMetadataManager(hdb, new Configuration()), null);
+ getConfigurationForTest(false), new TimelineMetricMetadataManager(new Configuration(), hdb), null);
TimelineMetricReadHelper readHelper = new TimelineMetricReadHelper(false);
long startTime = System.currentTimeMillis();
@@ -206,7 +199,7 @@ public class ITClusterAggregator extends AbstractMiniHBaseClusterTest {
// GIVEN
TimelineMetricAggregator agg =
TimelineMetricAggregatorFactory.createTimelineClusterAggregatorSecond(hdb,
- getConfigurationForTest(false), new TimelineMetricMetadataManager(hdb, new Configuration()), null);
+ getConfigurationForTest(false), new TimelineMetricMetadataManager(new Configuration(), hdb), null);
TimelineMetricReadHelper readHelper = new TimelineMetricReadHelper(false);
// here we put some metrics tha will be aggregated
@@ -290,7 +283,7 @@ public class ITClusterAggregator extends AbstractMiniHBaseClusterTest {
MetricTestHelper.createMetricHostAggregate(4.0, 0.0, 2, 4.0));
- hdb.saveClusterTimeAggregateRecords(records, METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME);
+ hdb.saveClusterAggregateRecordsSecond(records, METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME);
// WHEN
agg.doWork(startTime, ctime + hour + 1000);
@@ -490,7 +483,7 @@ public class ITClusterAggregator extends AbstractMiniHBaseClusterTest {
conf.set(CLUSTER_AGGREGATOR_APP_IDS, "app1");
TimelineMetricAggregator agg =
TimelineMetricAggregatorFactory.createTimelineClusterAggregatorSecond(hdb,
- conf, new TimelineMetricMetadataManager(hdb, new Configuration()), null);
+ conf, new TimelineMetricMetadataManager(new Configuration(), hdb), null);
TimelineMetricReadHelper readHelper = new TimelineMetricReadHelper(false);
long startTime = System.currentTimeMillis();
@@ -511,14 +504,13 @@ public class ITClusterAggregator extends AbstractMiniHBaseClusterTest {
//THEN
Condition condition = new DefaultCondition(
- Collections.singletonList("cpu_user"), null, "app1", null,
+ new ArrayList<String>() {{ add("cpu_user"); }}, null, "app1", null,
startTime, endTime, null, null, true);
condition.setStatement(String.format(GET_CLUSTER_AGGREGATE_SQL,
PhoenixTransactSQL.getNaiveTimeRangeHint(startTime, NATIVE_TIME_RANGE_DELTA),
METRICS_CLUSTER_AGGREGATE_TABLE_NAME));
- PreparedStatement pstmt = PhoenixTransactSQL.prepareGetMetricsSqlStmt
- (conn, condition);
+ PreparedStatement pstmt = PhoenixTransactSQL.prepareGetMetricsSqlStmt(conn, condition);
ResultSet rs = pstmt.executeQuery();
int recordCount = 0;
@@ -542,7 +534,7 @@ public class ITClusterAggregator extends AbstractMiniHBaseClusterTest {
public void testClusterAggregateMetricNormalization() throws Exception {
TimelineMetricAggregator agg =
TimelineMetricAggregatorFactory.createTimelineClusterAggregatorSecond(hdb,
- getConfigurationForTest(false), new TimelineMetricMetadataManager(hdb, new Configuration()), null);
+ getConfigurationForTest(false), new TimelineMetricMetadataManager(new Configuration(), hdb), null);
TimelineMetricReadHelper readHelper = new TimelineMetricReadHelper(false);
// Sample data
http://git-wip-us.apache.org/repos/asf/ambari/blob/c32eebf8/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/discovery/TestMetadataManager.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/discovery/TestMetadataManager.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/discovery/TestMetadataManager.java
index c62fd34..3adf770 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/discovery/TestMetadataManager.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/discovery/TestMetadataManager.java
@@ -45,7 +45,7 @@ public class TestMetadataManager extends AbstractMiniHBaseClusterTest {
@Before
public void insertDummyRecords() throws IOException, SQLException, URISyntaxException {
// Initialize new manager
- metadataManager = new TimelineMetricMetadataManager(hdb, new Configuration());
+ metadataManager = new TimelineMetricMetadataManager(new Configuration(), hdb);
final long now = System.currentTimeMillis();
TimelineMetrics timelineMetrics = new TimelineMetrics();
http://git-wip-us.apache.org/repos/asf/ambari/blob/c32eebf8/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/discovery/TestMetadataSync.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/discovery/TestMetadataSync.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/discovery/TestMetadataSync.java
index 181abca..a524b13 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/discovery/TestMetadataSync.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/discovery/TestMetadataSync.java
@@ -68,8 +68,7 @@ public class TestMetadataSync {
replay(configuration, hBaseAccessor);
- TimelineMetricMetadataManager metadataManager = new
- TimelineMetricMetadataManager(hBaseAccessor, configuration);
+ TimelineMetricMetadataManager metadataManager = new TimelineMetricMetadataManager(new Configuration(), hBaseAccessor);
metadataManager.metricMetadataSync = new TimelineMetricMetadataSync(metadataManager);
@@ -110,8 +109,7 @@ public class TestMetadataSync {
replay(configuration, hBaseAccessor);
- TimelineMetricMetadataManager metadataManager = new
- TimelineMetricMetadataManager(hBaseAccessor, configuration);
+ TimelineMetricMetadataManager metadataManager = new TimelineMetricMetadataManager(configuration, hBaseAccessor);
metadataManager.putIfModifiedTimelineMetricMetadata(metadata1);
metadataManager.putIfModifiedTimelineMetricMetadata(metadata2);
http://git-wip-us.apache.org/repos/asf/ambari/blob/c32eebf8/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/source/RawMetricsSourceTest.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/source/RawMetricsSourceTest.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/source/RawMetricsSourceTest.java
new file mode 100644
index 0000000..5d3aacb
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/source/RawMetricsSourceTest.java
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.source;
+
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.source.InternalSourceProvider.SOURCE_NAME.RAW_METRICS;
+import static org.easymock.EasyMock.capture;
+import static org.easymock.EasyMock.createNiceMock;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.expectLastCall;
+import static org.easymock.EasyMock.replay;
+import static org.easymock.EasyMock.verify;
+import static org.powermock.api.easymock.PowerMock.mockStatic;
+import static org.powermock.api.easymock.PowerMock.replayAll;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.TreeMap;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.sink.ExternalMetricsSink;
+import org.easymock.Capture;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import junit.framework.Assert;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(TimelineMetricConfiguration.class)
+public class RawMetricsSourceTest {
+
+ @Before
+ public void setupConf() throws Exception {
+ TimelineMetricConfiguration conf = new TimelineMetricConfiguration(new
+ Configuration(), new Configuration());
+ mockStatic(TimelineMetricConfiguration.class);
+ expect(TimelineMetricConfiguration.getInstance()).andReturn(conf).anyTimes();
+ replayAll();
+ }
+
+ @Test
+ public void testRawMetricsSourcedAtFlushInterval() throws Exception {
+ InternalSourceProvider internalSourceProvider = new DefaultInternalMetricsSourceProvider();
+ ExternalMetricsSink rawMetricsSink = createNiceMock(ExternalMetricsSink.class);
+ expect(rawMetricsSink.getFlushSeconds()).andReturn(1);
+ expect(rawMetricsSink.getSinkTimeOutSeconds()).andReturn(1);
+ Capture<Collection<TimelineMetrics>> metricsCapture = new Capture<>();
+ rawMetricsSink.sinkMetricData(capture(metricsCapture));
+ expectLastCall();
+ replay(rawMetricsSink);
+
+ InternalMetricsSource rawMetricsSource = internalSourceProvider.getInternalMetricsSource(RAW_METRICS, 1, rawMetricsSink);
+ TimelineMetrics timelineMetrics = new TimelineMetrics();
+
+ final long now = System.currentTimeMillis();
+ TimelineMetric metric1 = new TimelineMetric();
+ metric1.setMetricName("m1");
+ metric1.setAppId("a1");
+ metric1.setInstanceId("i1");
+ metric1.setHostName("h1");
+ metric1.setStartTime(now - 200);
+ metric1.setMetricValues(new TreeMap<Long, Double>() {{
+ put(now - 100, 1.0);
+ put(now - 200, 2.0);
+ }});
+ timelineMetrics.getMetrics().add(metric1);
+
+ rawMetricsSource.publishTimelineMetrics(Collections.singletonList(timelineMetrics));
+
+ verify(rawMetricsSink);
+ }
+
+ @Test(timeout = 10000)
+ public void testRawMetricsCachedAndSourced() throws Exception {
+ ExternalMetricsSink rawMetricsSink = createNiceMock(ExternalMetricsSink.class);
+ expect(rawMetricsSink.getFlushSeconds()).andReturn(2).anyTimes();
+ expect(rawMetricsSink.getSinkTimeOutSeconds()).andReturn(1).anyTimes();
+
+ class CaptureOnce<T> extends Capture<T> {
+ @Override
+ public void setValue(T value) {
+ if (!hasCaptured()) {
+ super.setValue(value);
+ }
+ }
+ }
+ Capture<Collection<TimelineMetrics>> metricsCapture = new CaptureOnce<>();
+
+ rawMetricsSink.sinkMetricData(capture(metricsCapture));
+ expectLastCall();
+ replay(rawMetricsSink);
+
+ InternalSourceProvider internalSourceProvider = new DefaultInternalMetricsSourceProvider();
+ InternalMetricsSource rawMetricsSource = internalSourceProvider.getInternalMetricsSource(RAW_METRICS, 1, rawMetricsSink);
+ TimelineMetrics timelineMetrics = new TimelineMetrics();
+
+ final long now = System.currentTimeMillis();
+ TimelineMetric metric1 = new TimelineMetric();
+ metric1.setMetricName("m1");
+ metric1.setAppId("a1");
+ metric1.setInstanceId("i1");
+ metric1.setHostName("h1");
+ metric1.setStartTime(now - 200);
+ metric1.setTimestamp(now - 200);
+ metric1.setMetricValues(new TreeMap<Long, Double>() {{
+ put(now - 100, 1.0);
+ put(now - 200, 2.0);
+ }});
+ timelineMetrics.getMetrics().add(metric1);
+
+ rawMetricsSource.publishTimelineMetrics(Collections.singletonList(timelineMetrics));
+
+ // Wait on eviction
+ Thread.sleep(5000);
+
+ verify(rawMetricsSink);
+
+ Assert.assertTrue(metricsCapture.hasCaptured());
+ Assert.assertTrue(metricsCapture.getValue().iterator().next().getMetrics().iterator().next().equals(metric1));
+ }
+
+}