You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by av...@apache.org on 2017/11/20 19:09:47 UTC
[2/2] ambari git commit: AMBARI-22470 : Refine Metric Definition
Service and AD Query service. (avijayan)
AMBARI-22470 : Refine Metric Definition Service and AD Query service. (avijayan)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/ba9be802
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/ba9be802
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/ba9be802
Branch: refs/heads/branch-3.0-ams
Commit: ba9be8028a7608521e1ea769a239276c3cc223bc
Parents: 0fcca47
Author: Aravindan Vijayan <av...@hortonworks.com>
Authored: Mon Nov 20 10:46:13 2017 -0800
Committer: Aravindan Vijayan <av...@hortonworks.com>
Committed: Mon Nov 20 10:46:13 2017 -0800
----------------------------------------------------------------------
.../pom.xml | 63 ++--
.../src/main/resources/config.yml | 2 +-
.../src/main/resources/hbase-site.xml | 286 +++++++++++++++++++
.../adservice/app/ADServiceScalaModule.scala | 50 ++++
.../adservice/app/AnomalyDetectionApp.scala | 10 +-
.../app/AnomalyDetectionAppConfig.scala | 4 +-
.../app/AnomalyDetectionAppModule.scala | 5 +-
.../configuration/HBaseConfiguration.scala | 3 +
.../adservice/db/AdAnomalyStoreAccessor.scala | 36 +++
.../db/AdMetadataStoreAccessorImpl.scala | 96 +++++++
.../adservice/db/ConnectionProvider.scala | 45 +++
.../adservice/db/DefaultPhoenixDataSource.scala | 79 +++++
.../adservice/db/LevelDbStoreAccessor.scala | 56 ----
.../adservice/db/MetadataDatasource.scala | 6 +
.../db/PhoenixAnomalyStoreAccessor.scala | 75 +++--
.../db/PhoenixConnectionProvider.scala | 66 +++++
.../adservice/db/PhoenixQueryConstants.scala | 12 +-
.../adservice/leveldb/LevelDBDatasource.scala | 17 +-
.../adservice/metadata/ADMetadataProvider.scala | 86 ++++--
.../metadata/InputMetricDefinitionParser.scala | 24 +-
.../adservice/metadata/MetricDefinition.scala | 2 +
.../metadata/MetricDefinitionService.scala | 16 +-
.../metadata/MetricDefinitionServiceImpl.scala | 73 +++--
.../metrics/adservice/metadata/MetricKey.scala | 3 +
.../metadata/MetricMetadataProvider.scala | 2 +-
.../adservice/model/MetricAnomalyInstance.scala | 32 +++
.../model/SingleMetricAnomalyInstance.scala | 29 --
.../adservice/resource/AnomalyResource.scala | 55 +++-
.../resource/MetricDefinitionResource.scala | 77 ++++-
.../adservice/resource/RootResource.scala | 5 +-
.../adservice/service/ADQueryService.scala | 6 +-
.../adservice/service/ADQueryServiceImpl.scala | 25 +-
.../adservice/service/AbstractADService.scala | 44 +++
.../PointInTimeAnomalyInstance.scala | 4 +-
.../subsystem/trend/TrendAnomalyInstance.scala | 4 +-
.../app/DefaultADResourceSpecTest.scala | 5 +-
.../metadata/AMSMetadataProviderTest.scala | 16 +-
.../metadata/MetricSourceDefinitionTest.scala | 16 +-
ambari-metrics/ambari-metrics-common/pom.xml | 45 ---
.../sink/timeline/TimelineMetricKey.java | 59 ----
.../sink/timeline/query/ConnectionProvider.java | 32 ---
.../query/DefaultPhoenixDataSource.java | 108 -------
.../query/PhoenixConnectionProvider.java | 31 --
.../timeline/HBaseTimelineMetricsService.java | 36 ++-
.../metrics/timeline/PhoenixHBaseAccessor.java | 26 +-
.../metrics/timeline/TimelineMetricStore.java | 3 +-
.../timeline/query/ConnectionProvider.java | 31 ++
.../query/DefaultPhoenixDataSource.java | 92 ++++++
.../query/PhoenixConnectionProvider.java | 31 ++
.../webapp/TimelineWebServices.java | 12 +-
.../TestApplicationHistoryServer.java | 2 +-
.../timeline/AbstractMiniHBaseClusterTest.java | 13 +-
.../timeline/PhoenixHBaseAccessorTest.java | 11 +-
.../timeline/TestTimelineMetricStore.java | 3 +-
54 files changed, 1424 insertions(+), 546 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ambari/blob/ba9be802/ambari-metrics/ambari-metrics-anomaly-detection-service/pom.xml
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/pom.xml b/ambari-metrics/ambari-metrics-anomaly-detection-service/pom.xml
index 142f02f..c6927dd 100644
--- a/ambari-metrics/ambari-metrics-anomaly-detection-service/pom.xml
+++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/pom.xml
@@ -34,10 +34,12 @@
<properties>
<scala.version>2.12.3</scala.version>
<scala.binary.version>2.11</scala.binary.version>
- <hadoop.version>2.7.3.2.6.0.3-8</hadoop.version>
<jackson.version>2.9.1</jackson.version>
<dropwizard.version>1.2.0</dropwizard.version>
<spark.version>2.2.0</spark.version>
+ <hadoop.version>2.7.3.2.6.0.3-8</hadoop.version>
+ <hbase.version>1.1.2.2.6.0.3-8</hbase.version>
+ <phoenix.version>4.7.0.2.6.0.3-8</phoenix.version>
</properties>
<repositories>
@@ -64,6 +66,7 @@
<directory>src/main/resources</directory>
<includes>
<include>**/*.yml</include>
+ <include>**/*.xml</include>
<include>**/*.txt</include>
</includes>
</resource>
@@ -145,6 +148,28 @@
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
+ <filter>
+ <artifact>org.apache.phoenix:phoenix-core</artifact>
+ <excludes>
+ <exclude>org/joda/time/**</exclude>
+ <exclude>com/codahale/metrics/**</exclude>
+ <exclude>com/google/common/collect/**</exclude>
+ </excludes>
+ </filter>
+ <filter>
+ <artifact>org.apache.phoenix:phoenix-core</artifact>
+ <excludes>
+ <exclude>org/joda/time/**</exclude>
+ <exclude>com/codahale/metrics/**</exclude>
+ <exclude>com/google/common/collect/**</exclude>
+ </excludes>
+ </filter>
+ <filter>
+ <artifact>*:*</artifact>
+ <excludes>
+ <exclude>com/sun/jersey/**</exclude>
+ </excludes>
+ </filter>
</filters>
</configuration>
<executions>
@@ -245,33 +270,25 @@
</dependency>
<dependency>
<groupId>org.apache.phoenix</groupId>
- <artifactId>phoenix-spark</artifactId>
- <version>4.10.0-HBase-1.1</version>
+ <artifactId>phoenix-core</artifactId>
+ <version>${phoenix.version}</version>
<exclusions>
<exclusion>
- <artifactId>jersey-server</artifactId>
- <groupId>com.sun.jersey</groupId>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
</exclusion>
<exclusion>
- <artifactId>jersey-core</artifactId>
- <groupId>com.sun.jersey</groupId>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-annotations</artifactId>
</exclusion>
<exclusion>
- <artifactId>jersey-client</artifactId>
+ <artifactId>jersey-core</artifactId>
<groupId>com.sun.jersey</groupId>
</exclusion>
<exclusion>
- <artifactId>jersey-guice</artifactId>
- <groupId>com.sun.jersey.contribs</groupId>
- </exclusion>
- <exclusion>
- <artifactId>jersey-json</artifactId>
+ <artifactId>jersey-server</artifactId>
<groupId>com.sun.jersey</groupId>
</exclusion>
- <exclusion>
- <groupId>com.fasterxml.jackson.core</groupId>
- <artifactId>jackson-databind</artifactId>
- </exclusion>
</exclusions>
</dependency>
<dependency>
@@ -379,6 +396,10 @@
<groupId>org.slf4j</groupId>
<artifactId>log4j-over-slf4j</artifactId>
</exclusion>
+ <exclusion>
+ <artifactId>jersey-server</artifactId>
+ <groupId>org.glassfish.jersey.core</groupId>
+ </exclusion>
</exclusions>
</dependency>
<dependency>
@@ -444,6 +465,12 @@
<artifactId>leveldb</artifactId>
<version>0.9</version>
</dependency>
+ <!-- https://mvnrepository.com/artifact/org.scalaj/scalaj-http -->
+ <dependency>
+ <groupId>org.scalaj</groupId>
+ <artifactId>scalaj-http_2.12</artifactId>
+ <version>2.3.0</version>
+ </dependency>
<dependency>
<groupId>junit</groupId>
@@ -454,7 +481,7 @@
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
- <version>21.0</version>
+ <version>18.0</version>
</dependency>
<dependency>
<groupId>io.dropwizard.metrics</groupId>
http://git-wip-us.apache.org/repos/asf/ambari/blob/ba9be802/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/resources/config.yml
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/resources/config.yml b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/resources/config.yml
index 9402f6e..7de06b4 100644
--- a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/resources/config.yml
+++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/resources/config.yml
@@ -27,7 +27,7 @@ metricsCollector:
hosts: host1,host2
port: 6188
protocol: http
- metadataEndpoint: /v1/timeline/metrics/metadata/keys
+ metadataEndpoint: /ws/v1/timeline/metrics/metadata/key
adQueryService:
anomalyDataTtl: 604800
http://git-wip-us.apache.org/repos/asf/ambari/blob/ba9be802/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/resources/hbase-site.xml
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/resources/hbase-site.xml b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/resources/hbase-site.xml
new file mode 100644
index 0000000..66f0454
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/resources/hbase-site.xml
@@ -0,0 +1,286 @@
+<?xml version="1.0"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+<!--
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+-->
+<configuration>
+
+ <property>
+ <name>dfs.client.read.shortcircuit</name>
+ <value>true</value>
+ </property>
+
+ <property>
+ <name>hbase.client.scanner.caching</name>
+ <value>10000</value>
+ </property>
+
+ <property>
+ <name>hbase.client.scanner.timeout.period</name>
+ <value>300000</value>
+ </property>
+
+ <property>
+ <name>hbase.cluster.distributed</name>
+ <value>false</value>
+ </property>
+
+ <property>
+ <name>hbase.hregion.majorcompaction</name>
+ <value>0</value>
+ </property>
+
+ <property>
+ <name>hbase.hregion.max.filesize</name>
+ <value>4294967296</value>
+ </property>
+
+ <property>
+ <name>hbase.hregion.memstore.block.multiplier</name>
+ <value>4</value>
+ </property>
+
+ <property>
+ <name>hbase.hregion.memstore.flush.size</name>
+ <value>134217728</value>
+ </property>
+
+ <property>
+ <name>hbase.hstore.blockingStoreFiles</name>
+ <value>200</value>
+ </property>
+
+ <property>
+ <name>hbase.hstore.flusher.count</name>
+ <value>2</value>
+ </property>
+
+ <property>
+ <name>hbase.local.dir</name>
+ <value>${hbase.tmp.dir}/local</value>
+ </property>
+
+ <property>
+ <name>hbase.master.info.bindAddress</name>
+ <value>0.0.0.0</value>
+ </property>
+
+ <property>
+ <name>hbase.master.info.port</name>
+ <value>61310</value>
+ </property>
+
+ <property>
+ <name>hbase.master.normalizer.class</name>
+ <value>org.apache.hadoop.hbase.master.normalizer.SimpleRegionNormalizer</value>
+ </property>
+
+ <property>
+ <name>hbase.master.port</name>
+ <value>61300</value>
+ </property>
+
+ <property>
+ <name>hbase.master.wait.on.regionservers.mintostart</name>
+ <value>1</value>
+ </property>
+
+ <property>
+ <name>hbase.normalizer.enabled</name>
+ <value>false</value>
+ </property>
+
+ <property>
+ <name>hbase.normalizer.period</name>
+ <value>600000</value>
+ </property>
+
+ <property>
+ <name>hbase.regionserver.global.memstore.lowerLimit</name>
+ <value>0.3</value>
+ </property>
+
+ <property>
+ <name>hbase.regionserver.global.memstore.upperLimit</name>
+ <value>0.35</value>
+ </property>
+
+ <property>
+ <name>hbase.regionserver.info.port</name>
+ <value>61330</value>
+ </property>
+
+ <property>
+ <name>hbase.regionserver.port</name>
+ <value>61320</value>
+ </property>
+
+ <property>
+ <name>hbase.regionserver.thread.compaction.large</name>
+ <value>2</value>
+ </property>
+
+ <property>
+ <name>hbase.regionserver.thread.compaction.small</name>
+ <value>3</value>
+ </property>
+
+ <property>
+ <name>hbase.replication</name>
+ <value>false</value>
+ </property>
+
+ <property>
+ <name>hbase.rootdir</name>
+ <value>file:///var/lib/ambari-metrics-collector/hbase</value>
+ </property>
+
+ <property>
+ <name>hbase.rpc.timeout</name>
+ <value>300000</value>
+ </property>
+
+ <property>
+ <name>hbase.snapshot.enabled</name>
+ <value>false</value>
+ </property>
+
+ <property>
+ <name>hbase.superuser</name>
+ <value>activity_explorer,activity_analyzer</value>
+ </property>
+
+ <property>
+ <name>hbase.tmp.dir</name>
+ <value>/var/lib/ambari-metrics-collector/hbase-tmp</value>
+ </property>
+
+ <property>
+ <name>hbase.zookeeper.leaderport</name>
+ <value>61388</value>
+ </property>
+
+ <property>
+ <name>hbase.zookeeper.peerport</name>
+ <value>61288</value>
+ </property>
+
+ <property>
+ <name>hbase.zookeeper.property.clientPort</name>
+ <value>61181</value>
+ </property>
+
+ <property>
+ <name>hbase.zookeeper.property.dataDir</name>
+ <value>${hbase.tmp.dir}/zookeeper</value>
+ </property>
+
+ <property>
+ <name>hbase.zookeeper.property.tickTime</name>
+ <value>6000</value>
+ </property>
+
+ <property>
+ <name>hbase.zookeeper.quorum</name>
+ <value>c6401.ambari.apache.org</value>
+ <final>true</final>
+ </property>
+
+ <property>
+ <name>hfile.block.cache.size</name>
+ <value>0.3</value>
+ </property>
+
+ <property>
+ <name>phoenix.coprocessor.maxMetaDataCacheSize</name>
+ <value>20480000</value>
+ </property>
+
+ <property>
+ <name>phoenix.coprocessor.maxServerCacheTimeToLiveMs</name>
+ <value>60000</value>
+ </property>
+
+ <property>
+ <name>phoenix.groupby.maxCacheSize</name>
+ <value>307200000</value>
+ </property>
+
+ <property>
+ <name>phoenix.mutate.batchSize</name>
+ <value>10000</value>
+ </property>
+
+ <property>
+ <name>phoenix.query.keepAliveMs</name>
+ <value>300000</value>
+ </property>
+
+ <property>
+ <name>phoenix.query.maxGlobalMemoryPercentage</name>
+ <value>15</value>
+ </property>
+
+ <property>
+ <name>phoenix.query.rowKeyOrderSaltedTable</name>
+ <value>true</value>
+ </property>
+
+ <property>
+ <name>phoenix.query.spoolThresholdBytes</name>
+ <value>20971520</value>
+ </property>
+
+ <property>
+ <name>phoenix.query.timeoutMs</name>
+ <value>300000</value>
+ </property>
+
+ <property>
+ <name>phoenix.sequence.saltBuckets</name>
+ <value>2</value>
+ </property>
+
+ <property>
+ <name>phoenix.spool.directory</name>
+ <value>${hbase.tmp.dir}/phoenix-spool</value>
+ </property>
+
+ <property>
+ <name>zookeeper.session.timeout</name>
+ <value>120000</value>
+ </property>
+
+ <property>
+ <name>zookeeper.session.timeout.localHBaseCluster</name>
+ <value>120000</value>
+ </property>
+
+ <property>
+ <name>zookeeper.znode.parent</name>
+ <value>/ams-hbase-unsecure</value>
+ </property>
+
+ <property>
+ <name>hbase.use.dynamic.jars</name>
+ <value>false</value>
+ </property>
+
+ </configuration>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ambari/blob/ba9be802/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/app/ADServiceScalaModule.scala
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/app/ADServiceScalaModule.scala b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/app/ADServiceScalaModule.scala
new file mode 100644
index 0000000..8578a80
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/app/ADServiceScalaModule.scala
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.metrics.adservice.app
+
+import com.fasterxml.jackson.module.scala._
+import com.fasterxml.jackson.module.scala.deser.{ScalaNumberDeserializersModule, UntypedObjectDeserializerModule}
+import com.fasterxml.jackson.module.scala.introspect.{ScalaAnnotationIntrospector, ScalaAnnotationIntrospectorModule}
+
+/**
+ * Extended Jackson Module that fixes the Scala-Jackson BytecodeReadingParanamer issue.
+ */
+class ADServiceScalaModule extends JacksonModule
+ with IteratorModule
+ with EnumerationModule
+ with OptionModule
+ with SeqModule
+ with IterableModule
+ with TupleModule
+ with MapModule
+ with SetModule
+ with FixedScalaAnnotationIntrospectorModule
+ with UntypedObjectDeserializerModule
+ with EitherModule {
+
+ override def getModuleName = "ADServiceScalaModule"
+
+ object ADServiceScalaModule extends ADServiceScalaModule
+
+}
+
+
+trait FixedScalaAnnotationIntrospectorModule extends JacksonModule {
+ this += { _.appendAnnotationIntrospector(ScalaAnnotationIntrospector) }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/ba9be802/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/app/AnomalyDetectionApp.scala
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/app/AnomalyDetectionApp.scala b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/app/AnomalyDetectionApp.scala
index 8b3a829..2d0dbdf 100644
--- a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/app/AnomalyDetectionApp.scala
+++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/app/AnomalyDetectionApp.scala
@@ -21,6 +21,9 @@ import javax.ws.rs.Path
import javax.ws.rs.container.{ContainerRequestFilter, ContainerResponseFilter}
import org.apache.ambari.metrics.adservice.app.GuiceInjector.{withInjector, wrap}
+import org.apache.ambari.metrics.adservice.db.{AdAnomalyStoreAccessor, MetadataDatasource}
+import org.apache.ambari.metrics.adservice.metadata.MetricDefinitionService
+import org.apache.ambari.metrics.adservice.service.ADQueryService
import org.glassfish.jersey.filter.LoggingFilter
import com.codahale.metrics.health.HealthCheck
@@ -45,6 +48,11 @@ class AnomalyDetectionApp extends Application[AnomalyDetectionAppConfig] {
injector.instancesOfType(classOf[HealthCheck]).foreach { h => env.healthChecks.register(h.getClass.getName, h) }
injector.instancesOfType(classOf[ContainerRequestFilter]).foreach { f => env.jersey().register(f) }
injector.instancesOfType(classOf[ContainerResponseFilter]).foreach { f => env.jersey().register(f) }
+
+ //Initialize Services
+ injector.getInstance(classOf[MetadataDatasource]).initialize
+ injector.getInstance(classOf[MetricDefinitionService]).initialize
+ injector.getInstance(classOf[ADQueryService]).initialize
}
env.jersey.register(jacksonJaxbJsonProvider)
env.jersey.register(new LoggingFilter)
@@ -53,7 +61,7 @@ class AnomalyDetectionApp extends Application[AnomalyDetectionAppConfig] {
private def jacksonJaxbJsonProvider: JacksonJaxbJsonProvider = {
val provider = new JacksonJaxbJsonProvider()
val objectMapper = new ObjectMapper()
- objectMapper.registerModule(DefaultScalaModule)
+ objectMapper.registerModule(new ADServiceScalaModule)
objectMapper.registerModule(new JodaModule)
objectMapper.configure(SerializationFeature.WRAP_ROOT_VALUE, false)
objectMapper.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false)
http://git-wip-us.apache.org/repos/asf/ambari/blob/ba9be802/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/app/AnomalyDetectionAppConfig.scala
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/app/AnomalyDetectionAppConfig.scala b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/app/AnomalyDetectionAppConfig.scala
index 93f6b28..f9ed4b2 100644
--- a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/app/AnomalyDetectionAppConfig.scala
+++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/app/AnomalyDetectionAppConfig.scala
@@ -42,7 +42,7 @@ class AnomalyDetectionAppConfig extends Configuration {
private val metricCollectorConfiguration = new MetricCollectorConfiguration
/*
- Anomaly Service configuration
+ Anomaly Query Service configuration
*/
@Valid
private val adServiceConfiguration = new AdServiceConfiguration
@@ -54,7 +54,7 @@ class AnomalyDetectionAppConfig extends Configuration {
private val metricDefinitionDBConfiguration = new MetricDefinitionDBConfiguration
/*
- HBase Conf
+ AMS HBase Conf
*/
@JsonIgnore
def getHBaseConf : org.apache.hadoop.conf.Configuration = {
http://git-wip-us.apache.org/repos/asf/ambari/blob/ba9be802/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/app/AnomalyDetectionAppModule.scala
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/app/AnomalyDetectionAppModule.scala b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/app/AnomalyDetectionAppModule.scala
index a896563..68e9df9 100644
--- a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/app/AnomalyDetectionAppModule.scala
+++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/app/AnomalyDetectionAppModule.scala
@@ -17,7 +17,7 @@
*/
package org.apache.ambari.metrics.adservice.app
-import org.apache.ambari.metrics.adservice.db.{AdMetadataStoreAccessor, LevelDbStoreAccessor, MetadataDatasource}
+import org.apache.ambari.metrics.adservice.db._
import org.apache.ambari.metrics.adservice.leveldb.LevelDBDataSource
import org.apache.ambari.metrics.adservice.metadata.{MetricDefinitionService, MetricDefinitionServiceImpl}
import org.apache.ambari.metrics.adservice.resource.{AnomalyResource, MetricDefinitionResource, RootResource}
@@ -38,9 +38,10 @@ class AnomalyDetectionAppModule(config: AnomalyDetectionAppConfig, env: Environm
bind(classOf[AnomalyResource])
bind(classOf[MetricDefinitionResource])
bind(classOf[RootResource])
- bind(classOf[AdMetadataStoreAccessor]).to(classOf[LevelDbStoreAccessor])
+ bind(classOf[AdMetadataStoreAccessor]).to(classOf[AdMetadataStoreAccessorImpl])
bind(classOf[ADQueryService]).to(classOf[ADQueryServiceImpl])
bind(classOf[MetricDefinitionService]).to(classOf[MetricDefinitionServiceImpl])
bind(classOf[MetadataDatasource]).to(classOf[LevelDBDataSource])
+ bind(classOf[AdAnomalyStoreAccessor]).to(classOf[PhoenixAnomalyStoreAccessor])
}
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/ba9be802/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/configuration/HBaseConfiguration.scala
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/configuration/HBaseConfiguration.scala b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/configuration/HBaseConfiguration.scala
index a51a959..a95ff15 100644
--- a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/configuration/HBaseConfiguration.scala
+++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/configuration/HBaseConfiguration.scala
@@ -28,6 +28,9 @@ object HBaseConfiguration {
var isInitialized: Boolean = false
val LOG : Logger = LoggerFactory.getLogger("HBaseConfiguration")
+ /**
+ * Initialize the hbase conf from hbase-site present in classpath.
+ */
def initConfigs(): Unit = {
if (!isInitialized) {
var classLoader: ClassLoader = Thread.currentThread.getContextClassLoader
http://git-wip-us.apache.org/repos/asf/ambari/blob/ba9be802/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/db/AdAnomalyStoreAccessor.scala
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/db/AdAnomalyStoreAccessor.scala b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/db/AdAnomalyStoreAccessor.scala
new file mode 100644
index 0000000..676b09a
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/db/AdAnomalyStoreAccessor.scala
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.metrics.adservice.db
+
+import org.apache.ambari.metrics.adservice.model.AnomalyType.AnomalyType
+import org.apache.ambari.metrics.adservice.model.MetricAnomalyInstance
+
+/**
+ * Trait for anomaly store accessor. (Phoenix)
+ */
+trait AdAnomalyStoreAccessor {
+
+ def initialize(): Unit
+
+ def getMetricAnomalies(anomalyType: AnomalyType,
+ startTime: Long,
+ endTime: Long,
+ limit: Int) : List[MetricAnomalyInstance]
+
+ }
http://git-wip-us.apache.org/repos/asf/ambari/blob/ba9be802/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/db/AdMetadataStoreAccessorImpl.scala
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/db/AdMetadataStoreAccessorImpl.scala b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/db/AdMetadataStoreAccessorImpl.scala
new file mode 100644
index 0000000..7405459
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/db/AdMetadataStoreAccessorImpl.scala
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.metrics.adservice.db
+
+import org.apache.ambari.metrics.adservice.metadata.MetricSourceDefinition
+import org.apache.commons.lang.SerializationUtils
+
+import com.google.inject.Inject
+
+/**
+ * Implementation of the AdMetadataStoreAccessor.
+ * Serves as the adaptor between metric definition service and LevelDB worlds.
+ */
+class AdMetadataStoreAccessorImpl extends AdMetadataStoreAccessor {
+
+ @Inject
+ var metadataDataSource: MetadataDatasource = _
+
+ @Inject
+ def this(metadataDataSource: MetadataDatasource) = {
+ this
+ this.metadataDataSource = metadataDataSource
+ }
+
+ /**
+ * Return all saved component definitions from DB.
+ *
+ * @return
+ */
+ override def getSavedInputDefinitions: List[MetricSourceDefinition] = {
+ val valuesFromStore : List[MetadataDatasource#Value] = metadataDataSource.getAll
+ val definitions = scala.collection.mutable.MutableList.empty[MetricSourceDefinition]
+
+ for (value : Array[Byte] <- valuesFromStore) {
+ val definition : MetricSourceDefinition = SerializationUtils.deserialize(value).asInstanceOf[MetricSourceDefinition]
+ if (definition != null) {
+ definitions.+=(definition)
+ }
+ }
+ definitions.toList
+ }
+
+ /**
+ * Save a set of component definitions
+ *
+ * @param metricSourceDefinitions Set of component definitions
+ * @return Success / Failure
+ */
+ override def saveInputDefinitions(metricSourceDefinitions: List[MetricSourceDefinition]): Boolean = {
+ for (definition <- metricSourceDefinitions) {
+ saveInputDefinition(definition)
+ }
+ true
+ }
+
+ /**
+ * Save a component definition
+ *
+ * @param metricSourceDefinition component definition
+ * @return Success / Failure
+ */
+ override def saveInputDefinition(metricSourceDefinition: MetricSourceDefinition): Boolean = {
+ val storeValue : MetadataDatasource#Value = SerializationUtils.serialize(metricSourceDefinition)
+ val storeKey : MetadataDatasource#Key = metricSourceDefinition.definitionName.getBytes()
+ metadataDataSource.put(storeKey, storeValue)
+ true
+ }
+
+ /**
+ * Delete a component definition
+ *
+ * @param definitionName component definition
+ * @return
+ */
+ override def removeInputDefinition(definitionName: String): Boolean = {
+ val storeKey : MetadataDatasource#Key = definitionName.getBytes()
+ metadataDataSource.delete(storeKey)
+ true
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/ba9be802/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/db/ConnectionProvider.scala
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/db/ConnectionProvider.scala b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/db/ConnectionProvider.scala
new file mode 100644
index 0000000..cc02ed4
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/db/ConnectionProvider.scala
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *//**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.adservice.db
+
+import java.sql.Connection
+import java.sql.SQLException
+
+/**
+ * Provides a connection to the anomaly store.
+ */
+trait ConnectionProvider {
+ @throws[SQLException]
+ def getConnection: Connection
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ambari/blob/ba9be802/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/db/DefaultPhoenixDataSource.scala
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/db/DefaultPhoenixDataSource.scala b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/db/DefaultPhoenixDataSource.scala
new file mode 100644
index 0000000..d9396de
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/db/DefaultPhoenixDataSource.scala
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.adservice.db
+
+import org.apache.commons.logging.LogFactory
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.hbase.client.ConnectionFactory
+import org.apache.hadoop.hbase.client.HBaseAdmin
+import java.io.IOException
+import java.sql.Connection
+import java.sql.DriverManager
+import java.sql.SQLException
+
+object DefaultPhoenixDataSource {
+ private[db] val LOG = LogFactory.getLog(classOf[DefaultPhoenixDataSource])
+ private val ZOOKEEPER_CLIENT_PORT = "hbase.zookeeper.property.clientPort"
+ private val ZOOKEEPER_QUORUM = "hbase.zookeeper.quorum"
+ private val ZNODE_PARENT = "zookeeper.znode.parent"
+ private val connectionUrl = "jdbc:phoenix:%s:%s:%s"
+}
+
+class DefaultPhoenixDataSource(var hbaseConf: Configuration) extends PhoenixConnectionProvider {
+
+ val zookeeperClientPort: String = hbaseConf.getTrimmed(DefaultPhoenixDataSource.ZOOKEEPER_CLIENT_PORT, "2181")
+ val zookeeperQuorum: String = hbaseConf.getTrimmed(DefaultPhoenixDataSource.ZOOKEEPER_QUORUM)
+ val znodeParent: String = hbaseConf.getTrimmed(DefaultPhoenixDataSource.ZNODE_PARENT, "/ams-hbase-unsecure")
+ final private var url : String = _
+
+ if (zookeeperQuorum == null || zookeeperQuorum.isEmpty) {
+ throw new IllegalStateException("Unable to find Zookeeper quorum to access HBase store using Phoenix.")
+ }
+ url = String.format(DefaultPhoenixDataSource.connectionUrl, zookeeperQuorum, zookeeperClientPort, znodeParent)
+
+
+ /**
+ * Get HBaseAdmin for table ops.
+ *
+ * @return @HBaseAdmin
+ * @throws IOException
+ */
+ @throws[IOException]
+ override def getHBaseAdmin: HBaseAdmin = ConnectionFactory.createConnection(hbaseConf).getAdmin.asInstanceOf[HBaseAdmin]
+
+ /**
+ * Get JDBC connection to HBase store. Assumption is that the hbase
+ * configuration is present on the classpath and loaded by the caller into
+ * the Configuration object.
+ * Phoenix already caches the HConnection between the client and HBase
+ * cluster.
+ *
+ * @return @java.sql.Connection
+ */
+ @throws[SQLException]
+ override def getConnection: Connection = {
+ DefaultPhoenixDataSource.LOG.debug("Metric store connection url: " + url)
+ try DriverManager.getConnection(url)
+ catch {
+ case e: SQLException =>
+ DefaultPhoenixDataSource.LOG.warn("Unable to connect to HBase store using Phoenix.", e)
+ throw e
+ }
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ambari/blob/ba9be802/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/db/LevelDbStoreAccessor.scala
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/db/LevelDbStoreAccessor.scala b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/db/LevelDbStoreAccessor.scala
deleted file mode 100644
index baad57d..0000000
--- a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/db/LevelDbStoreAccessor.scala
+++ /dev/null
@@ -1,56 +0,0 @@
-package org.apache.ambari.metrics.adservice.db
-
-import org.apache.ambari.metrics.adservice.metadata.MetricSourceDefinition
-
-import com.google.inject.Inject
-
-class LevelDbStoreAccessor extends AdMetadataStoreAccessor{
-
- @Inject
- var levelDbDataSource : MetadataDatasource = _
-
- @Inject
- def this(levelDbDataSource: MetadataDatasource) = {
- this
- this.levelDbDataSource = levelDbDataSource
- }
-
- /**
- * Return all saved component definitions from DB.
- *
- * @return
- */
- override def getSavedInputDefinitions: List[MetricSourceDefinition] = {
- List.empty[MetricSourceDefinition]
- }
-
- /**
- * Save a set of component definitions
- *
- * @param metricSourceDefinitions Set of component definitions
- * @return Success / Failure
- */
-override def saveInputDefinitions(metricSourceDefinitions: List[MetricSourceDefinition]): Boolean = {
- true
-}
-
- /**
- * Save a component definition
- *
- * @param metricSourceDefinition component definition
- * @return Success / Failure
- */
- override def saveInputDefinition(metricSourceDefinition: MetricSourceDefinition): Boolean = {
- true
- }
-
- /**
- * Delete a component definition
- *
- * @param definitionName component definition
- * @return
- */
- override def removeInputDefinition(definitionName: String): Boolean = {
- true
- }
-}
http://git-wip-us.apache.org/repos/asf/ambari/blob/ba9be802/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/db/MetadataDatasource.scala
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/db/MetadataDatasource.scala b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/db/MetadataDatasource.scala
index aa6694a..7b223a2 100644
--- a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/db/MetadataDatasource.scala
+++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/db/MetadataDatasource.scala
@@ -44,6 +44,12 @@ trait MetadataDatasource {
*/
def get(key: Key): Option[Value]
+ /**
+ * This function obtains all the values
+ *
+ * @return the list of values
+ */
+ def getAll: List[Value]
/**
* This function associates a key to a value, overwriting if necessary
http://git-wip-us.apache.org/repos/asf/ambari/blob/ba9be802/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/db/PhoenixAnomalyStoreAccessor.scala
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/db/PhoenixAnomalyStoreAccessor.scala b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/db/PhoenixAnomalyStoreAccessor.scala
index 36aea21..147d1f7 100644
--- a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/db/PhoenixAnomalyStoreAccessor.scala
+++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/db/PhoenixAnomalyStoreAccessor.scala
@@ -23,48 +23,60 @@ import java.util.concurrent.TimeUnit.SECONDS
import org.apache.ambari.metrics.adservice.app.AnomalyDetectionAppConfig
import org.apache.ambari.metrics.adservice.common._
import org.apache.ambari.metrics.adservice.configuration.HBaseConfiguration
-import org.apache.ambari.metrics.adservice.metadata.MetricKey
+import org.apache.ambari.metrics.adservice.metadata.{MetricDefinitionService, MetricKey}
import org.apache.ambari.metrics.adservice.model.AnomalyDetectionMethod.AnomalyDetectionMethod
import org.apache.ambari.metrics.adservice.model.AnomalyType.AnomalyType
-import org.apache.ambari.metrics.adservice.model.{AnomalyDetectionMethod, AnomalyType, SingleMetricAnomalyInstance}
+import org.apache.ambari.metrics.adservice.model.{AnomalyDetectionMethod, AnomalyType, MetricAnomalyInstance}
import org.apache.ambari.metrics.adservice.subsystem.pointintime.PointInTimeAnomalyInstance
import org.apache.ambari.metrics.adservice.subsystem.trend.TrendAnomalyInstance
import org.apache.hadoop.hbase.util.RetryCounterFactory
-import org.apache.hadoop.metrics2.sink.timeline.query.{DefaultPhoenixDataSource, PhoenixConnectionProvider}
+import org.slf4j.{Logger, LoggerFactory}
import com.google.inject.Inject
-object PhoenixAnomalyStoreAccessor {
+/**
+ * Phoenix query handler class.
+ */
+class PhoenixAnomalyStoreAccessor extends AdAnomalyStoreAccessor {
@Inject
var configuration: AnomalyDetectionAppConfig = _
+ @Inject
+ var metricDefinitionService: MetricDefinitionService = _
+
var datasource: PhoenixConnectionProvider = _
+ val LOG : Logger = LoggerFactory.getLogger(classOf[PhoenixAnomalyStoreAccessor])
- def initAnomalyMetricSchema(): Unit = {
+ @Override
+ def initialize(): Unit = {
- val datasource: PhoenixConnectionProvider = new DefaultPhoenixDataSource(HBaseConfiguration.getHBaseConf)
+ datasource = new DefaultPhoenixDataSource(HBaseConfiguration.getHBaseConf)
val retryCounterFactory = new RetryCounterFactory(10, SECONDS.toMillis(3).toInt)
val ttl = configuration.getAdServiceConfiguration.getAnomalyDataTtl
try {
- var conn = datasource.getConnectionRetryingOnException(retryCounterFactory)
+ var conn : Connection = getConnectionRetryingOnException(retryCounterFactory)
var stmt = conn.createStatement
+ //Create Method parameters table.
val methodParametersSql = String.format(PhoenixQueryConstants.CREATE_METHOD_PARAMETERS_TABLE,
PhoenixQueryConstants.METHOD_PARAMETERS_TABLE_NAME)
stmt.executeUpdate(methodParametersSql)
+ //Create Point in Time anomaly table
val pointInTimeAnomalySql = String.format(PhoenixQueryConstants.CREATE_PIT_ANOMALY_METRICS_TABLE_SQL,
PhoenixQueryConstants.PIT_ANOMALY_METRICS_TABLE_NAME,
ttl.asInstanceOf[Object])
stmt.executeUpdate(pointInTimeAnomalySql)
+ //Create Trend Anomaly table
val trendAnomalySql = String.format(PhoenixQueryConstants.CREATE_TREND_ANOMALY_METRICS_TABLE_SQL,
PhoenixQueryConstants.TREND_ANOMALY_METRICS_TABLE_NAME,
ttl.asInstanceOf[Object])
stmt.executeUpdate(trendAnomalySql)
+ //Create model snapshot table.
val snapshotSql = String.format(PhoenixQueryConstants.CREATE_MODEL_SNAPSHOT_TABLE,
PhoenixQueryConstants.MODEL_SNAPSHOT)
stmt.executeUpdate(snapshotSql)
@@ -75,11 +87,9 @@ object PhoenixAnomalyStoreAccessor {
}
}
- @throws[SQLException]
- def getConnection: Connection = datasource.getConnection
-
- def getSingleMetricAnomalies(anomalyType: AnomalyType, startTime: Long, endTime: Long, limit: Int) : scala.collection.mutable.MutableList[SingleMetricAnomalyInstance] = {
- val anomalies = scala.collection.mutable.MutableList.empty[SingleMetricAnomalyInstance]
+ @Override
+ def getMetricAnomalies(anomalyType: AnomalyType, startTime: Long, endTime: Long, limit: Int) : List[MetricAnomalyInstance] = {
+ val anomalies = scala.collection.mutable.MutableList.empty[MetricAnomalyInstance]
val conn : Connection = getConnection
var stmt : PreparedStatement = null
var rs : ResultSet = null
@@ -98,8 +108,8 @@ object PhoenixAnomalyStoreAccessor {
val anomalyScore: Double = rs.getDouble("ANOMALY_SCORE")
val modelSnapshot: String = rs.getString("MODEL_PARAMETERS")
- val metricKey: MetricKey = null //MetricManager.getMetricKeyFromUuid(uuid) //TODO
- val anomalyInstance: SingleMetricAnomalyInstance = new PointInTimeAnomalyInstance(metricKey, timestamp,
+ val metricKey: MetricKey = metricDefinitionService.getMetricKeyFromUuid(uuid)
+ val anomalyInstance: MetricAnomalyInstance = new PointInTimeAnomalyInstance(metricKey, timestamp,
metricValue, methodType, anomalyScore, season, modelSnapshot)
anomalies.+=(anomalyInstance)
}
@@ -115,8 +125,8 @@ object PhoenixAnomalyStoreAccessor {
val anomalyScore: Double = rs.getDouble("ANOMALY_SCORE")
val modelSnapshot: String = rs.getString("MODEL_PARAMETERS")
- val metricKey: MetricKey = null //MetricManager.getMetricKeyFromUuid(uuid) //TODO
- val anomalyInstance: SingleMetricAnomalyInstance = TrendAnomalyInstance(metricKey,
+ val metricKey: MetricKey = metricDefinitionService.getMetricKeyFromUuid(uuid)
+ val anomalyInstance: MetricAnomalyInstance = TrendAnomalyInstance(metricKey,
TimeRange(anomalyStart, anomalyEnd),
TimeRange(referenceStart, referenceEnd),
methodType, anomalyScore, season, modelSnapshot)
@@ -127,11 +137,11 @@ object PhoenixAnomalyStoreAccessor {
case e: SQLException => throw e
}
- anomalies
+ anomalies.toList
}
@throws[SQLException]
- def prepareAnomalyMetricsGetSqlStatement(connection: Connection, anomalyType: AnomalyType, startTime: Long, endTime: Long, limit: Int): PreparedStatement = {
+ private def prepareAnomalyMetricsGetSqlStatement(connection: Connection, anomalyType: AnomalyType, startTime: Long, endTime: Long, limit: Int): PreparedStatement = {
val sb = new StringBuilder
@@ -145,11 +155,11 @@ object PhoenixAnomalyStoreAccessor {
var stmt: java.sql.PreparedStatement = null
try {
stmt = connection.prepareStatement(sb.toString)
- var pos = 1
- pos += 1
+ var pos = 1
stmt.setLong(pos, startTime)
+ pos += 1
stmt.setLong(pos, endTime)
stmt.setFetchSize(limit)
@@ -157,9 +167,32 @@ object PhoenixAnomalyStoreAccessor {
} catch {
case e: SQLException =>
if (stmt != null)
- stmt
+ return stmt
throw e
}
stmt
}
+
+ @throws[SQLException]
+ private def getConnection: Connection = datasource.getConnection
+
+ @throws[SQLException]
+ @throws[InterruptedException]
+ private def getConnectionRetryingOnException (retryCounterFactory : RetryCounterFactory) : Connection = {
+ val retryCounter = retryCounterFactory.create
+ while(true) {
+ try
+ return getConnection
+ catch {
+ case e: SQLException =>
+ if (!retryCounter.shouldRetry) {
+ LOG.error("HBaseAccessor getConnection failed after " + retryCounter.getMaxAttempts + " attempts")
+ throw e
+ }
+ }
+ retryCounter.sleepUntilNextRetry()
+ }
+ null
+ }
+
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/ba9be802/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/db/PhoenixConnectionProvider.scala
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/db/PhoenixConnectionProvider.scala b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/db/PhoenixConnectionProvider.scala
new file mode 100644
index 0000000..1faf1ba
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/db/PhoenixConnectionProvider.scala
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *//**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.metrics.adservice.db
+
+import org.apache.hadoop.hbase.client.HBaseAdmin
+import java.io.IOException
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+trait PhoenixConnectionProvider extends ConnectionProvider {
+ /**
+ * Get HBaseAdmin for the Phoenix connection
+ *
+ * @return
+ * @throws IOException
+ */
+ @throws[IOException]
+ def getHBaseAdmin: HBaseAdmin
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ambari/blob/ba9be802/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/db/PhoenixQueryConstants.scala
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/db/PhoenixQueryConstants.scala b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/db/PhoenixQueryConstants.scala
index 5379c91..d9774e0 100644
--- a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/db/PhoenixQueryConstants.scala
+++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/db/PhoenixQueryConstants.scala
@@ -54,25 +54,25 @@ object PhoenixQueryConstants {
val CREATE_TREND_ANOMALY_METRICS_TABLE_SQL: String = "CREATE TABLE IF NOT EXISTS %s (" +
"METRIC_UUID BINARY(20) NOT NULL, " +
+ "METHOD_NAME VARCHAR, " +
"ANOMALY_PERIOD_START UNSIGNED_LONG NOT NULL, " +
"ANOMALY_PERIOD_END UNSIGNED_LONG NOT NULL, " +
"TEST_PERIOD_START UNSIGNED_LONG NOT NULL, " +
"TEST_PERIOD_END UNSIGNED_LONG NOT NULL, " +
- "METHOD_NAME VARCHAR, " +
"SEASONAL_INFO VARCHAR, " +
"ANOMALY_SCORE DOUBLE, " +
"MODEL_PARAMETERS VARCHAR, " +
"DETECTION_TIME UNSIGNED_LONG " +
"CONSTRAINT pk PRIMARY KEY (METRIC_UUID, METHOD_NAME, ANOMALY_PERIOD_START, ANOMALY_PERIOD_END, TEST_PERIOD_START, TEST_PERIOD_END)) " +
- "DATA_BLOCK_ENCODING='FAST_DIFF' IMMUTABLE_ROWS=true, TTL=%s, COMPRESSION='SNAPPY'"
+ "DATA_BLOCK_ENCODING='FAST_DIFF', IMMUTABLE_ROWS=true, TTL=%s, COMPRESSION='SNAPPY'"
val CREATE_MODEL_SNAPSHOT_TABLE: String = "CREATE TABLE IF NOT EXISTS %s (" +
- "METRIC_UUID BINARY(20), " +
+ "METRIC_UUID BINARY(20) NOT NULL, " +
"METHOD_NAME VARCHAR, " +
"METHOD_TYPE VARCHAR, " +
- "PARAMETERS VARCHAR " +
- "SNAPSHOT_TIME UNSIGNED LONG NOT NULL " +
- "CONSTRAINT pk PRIMARY KEY (METRIC_UUID, METHOD_NAME)) " +
+ "PARAMETERS VARCHAR, " +
+ "SNAPSHOT_TIME UNSIGNED_LONG NOT NULL " +
+ "CONSTRAINT pk PRIMARY KEY (METRIC_UUID, METHOD_NAME, SNAPSHOT_TIME)) " +
"DATA_BLOCK_ENCODING='FAST_DIFF', IMMUTABLE_ROWS=true, COMPRESSION='SNAPPY'"
//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
http://git-wip-us.apache.org/repos/asf/ambari/blob/ba9be802/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/leveldb/LevelDBDatasource.scala
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/leveldb/LevelDBDatasource.scala b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/leveldb/LevelDBDatasource.scala
index a34a60a..49ef272 100644
--- a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/leveldb/LevelDBDatasource.scala
+++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/leveldb/LevelDBDatasource.scala
@@ -42,7 +42,6 @@ class LevelDBDataSource() extends MetadataDatasource {
def this(appConfig: AnomalyDetectionAppConfig) = {
this
this.appConfig = appConfig
- initialize()
}
override def initialize(): Unit = {
@@ -83,6 +82,22 @@ class LevelDBDataSource() extends MetadataDatasource {
override def get(key: Key): Option[Value] = Option(db.get(key))
/**
+ * This function obtains all the values
+ *
+ * @return the list of values
+ */
+ def getAll: List[Value] = {
+ val values = scala.collection.mutable.MutableList.empty[Value]
+ val iterator = db.iterator()
+ iterator.seekToFirst()
+ while (iterator.hasNext) {
+ val entry: java.util.Map.Entry[Key, Value] = iterator.next()
+ values.+=(entry.getValue)
+ }
+ values.toList
+ }
+
+ /**
* This function updates the DataSource by deleting, updating and inserting new (key-value) pairs.
*
* @param toRemove which includes all the keys to be removed from the DataSource.
http://git-wip-us.apache.org/repos/asf/ambari/blob/ba9be802/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/ADMetadataProvider.scala
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/ADMetadataProvider.scala b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/ADMetadataProvider.scala
index 95b1b63..c277221 100644
--- a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/ADMetadataProvider.scala
+++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/ADMetadataProvider.scala
@@ -17,15 +17,17 @@
package org.apache.ambari.metrics.adservice.metadata
-import java.net.{HttpURLConnection, URL}
+import javax.ws.rs.core.Response
import org.apache.ambari.metrics.adservice.configuration.MetricCollectorConfiguration
import org.apache.commons.lang.StringUtils
-import org.apache.hadoop.metrics2.sink.timeline.TimelineMetricKey
+import org.slf4j.{Logger, LoggerFactory}
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper
+import scalaj.http.{Http, HttpRequest, HttpResponse}
+
/**
* Class to invoke Metrics Collector metadata API.
* TODO : Instantiate a sync thread that regularly updates the internal maps by reading off AMS metadata.
@@ -36,6 +38,7 @@ class ADMetadataProvider extends MetricMetadataProvider {
var metricCollectorPort: String = _
var metricCollectorProtocol: String = _
var metricMetadataPath: String = "/v1/timeline/metrics/metadata/keys"
+ val LOG : Logger = LoggerFactory.getLogger(classOf[ADMetadataProvider])
val connectTimeout: Int = 10000
val readTimeout: Int = 10000
@@ -52,10 +55,8 @@ class ADMetadataProvider extends MetricMetadataProvider {
metricMetadataPath = configuration.getMetadataEndpoint
}
- override def getMetricKeysForDefinitions(metricSourceDefinition: MetricSourceDefinition): (Map[MetricDefinition,
- Set[MetricKey]], Set[MetricKey]) = {
+ override def getMetricKeysForDefinitions(metricSourceDefinition: MetricSourceDefinition): Set[MetricKey] = {
- val keysMap = scala.collection.mutable.Map[MetricDefinition, Set[MetricKey]]()
val numDefinitions: Int = metricSourceDefinition.metricDefinitions.size
val metricKeySet: scala.collection.mutable.Set[MetricKey] = scala.collection.mutable.Set.empty[MetricKey]
@@ -64,52 +65,79 @@ class ADMetadataProvider extends MetricMetadataProvider {
for (host <- metricCollectorHosts) {
val metricKeys: Set[MetricKey] = getKeysFromMetricsCollector(metricCollectorProtocol, host, metricCollectorPort, metricMetadataPath, metricDef)
if (metricKeys != null) {
- keysMap += (metricDef -> metricKeys)
- metricKeySet.++(metricKeys)
+ metricKeySet.++=(metricKeys)
}
}
}
}
- (keysMap.toMap, metricKeySet.toSet)
+ metricKeySet.toSet
}
/**
- * Make Metrics Collector REST API call to fetch keys.
*
- * @param url
+ * @param protocol
+ * @param host
+ * @param port
+ * @param path
* @param metricDefinition
* @return
*/
def getKeysFromMetricsCollector(protocol: String, host: String, port: String, path: String, metricDefinition: MetricDefinition): Set[MetricKey] = {
- val url: String = protocol + "://" + host + port + "/" + path
+ val url: String = protocol + "://" + host + ":" + port + path
val mapper = new ObjectMapper() with ScalaObjectMapper
+
+ if (metricDefinition.hosts == null || metricDefinition.hosts.isEmpty) {
+ val request: HttpRequest = Http(url)
+ .param("metricName", metricDefinition.metricName)
+ .param("appId", metricDefinition.appId)
+ makeHttpGetCall(request, mapper)
+ } else {
+ val metricKeySet: scala.collection.mutable.Set[MetricKey] = scala.collection.mutable.Set.empty[MetricKey]
+
+ for (h <- metricDefinition.hosts) {
+ val request: HttpRequest = Http(url)
+ .param("metricName", metricDefinition.metricName)
+ .param("appId", metricDefinition.appId)
+ .param("hostname", h)
+
+ val metricKeys = makeHttpGetCall(request, mapper)
+ metricKeySet.++=(metricKeys)
+ }
+ metricKeySet.toSet
+ }
+ }
+
+ private def makeHttpGetCall(request: HttpRequest, mapper: ObjectMapper): Set[MetricKey] = {
+
try {
- val connection = new URL(url).openConnection.asInstanceOf[HttpURLConnection]
- connection.setConnectTimeout(connectTimeout)
- connection.setReadTimeout(readTimeout)
- connection.setRequestMethod("GET")
- val inputStream = connection.getInputStream
- val content = scala.io.Source.fromInputStream(inputStream).mkString
- if (inputStream != null) inputStream.close()
- val metricKeySet: Set[MetricKey] = fromTimelineMetricKey(mapper.readValue[java.util.Set[TimelineMetricKey]](content))
- return metricKeySet
+ val result: HttpResponse[String] = request.asString
+ if (result.code == Response.Status.OK.getStatusCode) {
+ LOG.info("Successfully fetched metric keys from metrics collector")
+ val metricKeySet: java.util.Set[java.util.Map[String, String]] = mapper.readValue(result.body,
+ classOf[java.util.Set[java.util.Map[String, String]]])
+ getMetricKeys(metricKeySet)
+ } else {
+ LOG.error("Got an error when trying to fetch metric key from metrics collector. Code = " + result.code + ", Message = " + result.body)
+ }
} catch {
- case _: java.io.IOException | _: java.net.SocketTimeoutException => // handle this
+ case _: java.io.IOException | _: java.net.SocketTimeoutException => LOG.error("Unable to fetch metric keys from Metrics collector for : " + request.toString)
}
- null
+ Set.empty[MetricKey]
}
- def fromTimelineMetricKey(timelineMetricKeys: java.util.Set[TimelineMetricKey]): Set[MetricKey] = {
+
+ def getMetricKeys(timelineMetricKeys: java.util.Set[java.util.Map[String, String]]): Set[MetricKey] = {
val metricKeySet: scala.collection.mutable.Set[MetricKey] = scala.collection.mutable.Set.empty[MetricKey]
val iter = timelineMetricKeys.iterator()
while (iter.hasNext) {
- val timelineMetricKey: TimelineMetricKey = iter.next()
- val metricKey: MetricKey = MetricKey(timelineMetricKey.metricName,
- timelineMetricKey.appId,
- timelineMetricKey.instanceId,
- timelineMetricKey.hostName,
- timelineMetricKey.uuid)
+ val timelineMetricKey: java.util.Map[String, String] = iter.next()
+ val metricKey: MetricKey = MetricKey(
+ timelineMetricKey.get("metricName"),
+ timelineMetricKey.get("appId"),
+ timelineMetricKey.get("instanceId"),
+ timelineMetricKey.get("hostname"),
+ timelineMetricKey.get("uuid").getBytes())
metricKeySet.add(metricKey)
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/ba9be802/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/InputMetricDefinitionParser.scala
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/InputMetricDefinitionParser.scala b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/InputMetricDefinitionParser.scala
index cc66c90..3c8ea84 100644
--- a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/InputMetricDefinitionParser.scala
+++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/InputMetricDefinitionParser.scala
@@ -19,6 +19,8 @@ package org.apache.ambari.metrics.adservice.metadata
import java.io.File
+import org.apache.ambari.metrics.adservice.app.ADServiceScalaModule
+
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper
@@ -30,15 +32,19 @@ object InputMetricDefinitionParser {
return List.empty[MetricSourceDefinition]
}
val mapper = new ObjectMapper() with ScalaObjectMapper
-
- def metricSourceDefinitions: List[MetricSourceDefinition] =
- for {
- file <- getFilesInDirectory(directory)
- definition: MetricSourceDefinition = mapper.readValue[MetricSourceDefinition](file)
- if definition != null
- } yield definition
-
- metricSourceDefinitions
+ mapper.registerModule(new ADServiceScalaModule)
+ val metricSourceDefinitions: scala.collection.mutable.MutableList[MetricSourceDefinition] =
+ scala.collection.mutable.MutableList.empty[MetricSourceDefinition]
+
+ for (file <- getFilesInDirectory(directory)) {
+ val source = scala.io.Source.fromFile(file)
+ val lines = try source.mkString finally source.close()
+ val definition: MetricSourceDefinition = mapper.readValue[MetricSourceDefinition](lines)
+ if (definition != null) {
+ metricSourceDefinitions.+=(definition)
+ }
+ }
+ metricSourceDefinitions.toList
}
private def getFilesInDirectory(directory: String): List[File] = {
http://git-wip-us.apache.org/repos/asf/ambari/blob/ba9be802/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/MetricDefinition.scala
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/MetricDefinition.scala b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/MetricDefinition.scala
index 036867b..c668dfa 100644
--- a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/MetricDefinition.scala
+++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/MetricDefinition.scala
@@ -19,6 +19,8 @@
package org.apache.ambari.metrics.adservice.metadata
import org.apache.commons.lang3.StringUtils
+
+import com.fasterxml.jackson.annotation.JsonIgnore
/*
{
"metric-name": "mem_free",
http://git-wip-us.apache.org/repos/asf/ambari/blob/ba9be802/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/MetricDefinitionService.scala
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/MetricDefinitionService.scala b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/MetricDefinitionService.scala
index 635dc60..52ce39e 100644
--- a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/MetricDefinitionService.scala
+++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/MetricDefinitionService.scala
@@ -17,7 +17,9 @@
package org.apache.ambari.metrics.adservice.metadata
-trait MetricDefinitionService {
+import org.apache.ambari.metrics.adservice.service.AbstractADService
+
+trait MetricDefinitionService extends AbstractADService{
/**
* Given a 'UUID', return the metric key associated with it.
@@ -27,6 +29,12 @@ trait MetricDefinitionService {
def getMetricKeyFromUuid(uuid: Array[Byte]) : MetricKey
/**
+ * Return all the definitions being tracked.
+ * @return Map of Metric Source Definition name to Metric Source Definition.
+ */
+ def getDefinitions: List[MetricSourceDefinition]
+
+ /**
* Given a component definition name, return the definition associated with it.
* @param name component definition name
* @return
@@ -61,4 +69,10 @@ trait MetricDefinitionService {
*/
def getDefinitionByAppId(appId: String) : List[MetricSourceDefinition]
+ /**
+ * Return the mapping between definition name to set of metric keys.
+ * @return Map of Metric Source Definition to set of metric keys associated with it.
+ */
+ def getMetricKeys: Map[String, Set[MetricKey]]
+
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/ba9be802/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/MetricDefinitionServiceImpl.scala
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/MetricDefinitionServiceImpl.scala b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/MetricDefinitionServiceImpl.scala
index c34d2dd..b9b4a7c 100644
--- a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/MetricDefinitionServiceImpl.scala
+++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/MetricDefinitionServiceImpl.scala
@@ -32,31 +32,24 @@ class MetricDefinitionServiceImpl extends MetricDefinitionService {
var configuration: AnomalyDetectionAppConfig = _
var metricMetadataProvider: MetricMetadataProvider = _
- var metricSourceDefinitionMap: Map[String, MetricSourceDefinition] = Map()
- var metricKeys: Set[MetricKey] = Set.empty[MetricKey]
- var metricDefinitionMetricKeyMap: Map[MetricDefinition, Set[MetricKey]] = Map()
+ val metricSourceDefinitionMap: scala.collection.mutable.Map[String, MetricSourceDefinition] = scala.collection.mutable.Map()
+ val metricDefinitionMetricKeyMap: scala.collection.mutable.Map[MetricSourceDefinition, Set[MetricKey]] = scala.collection.mutable.Map()
+ val metricKeys: scala.collection.mutable.Set[MetricKey] = scala.collection.mutable.Set.empty[MetricKey]
@Inject
def this (anomalyDetectionAppConfig: AnomalyDetectionAppConfig, metadataStoreAccessor: AdMetadataStoreAccessor) = {
this ()
adMetadataStoreAccessor = metadataStoreAccessor
configuration = anomalyDetectionAppConfig
- initializeService()
}
- def initializeService() : Unit = {
-
- //Create AD Metadata Schema
- //TODO Make sure AD Metadata DB is initialized here.
+ @Override
+ def initialize() : Unit = {
+ LOG.info("Initializing Metric Definition Service...")
//Initialize Metric Metadata Provider
metricMetadataProvider = new ADMetadataProvider(configuration.getMetricCollectorConfiguration)
- loadMetricSourceDefinitions()
- }
-
- def loadMetricSourceDefinitions() : Unit = {
-
//Load definitions from metadata store
val definitionsFromStore: List[MetricSourceDefinition] = adMetadataStoreAccessor.getSavedInputDefinitions
for (definition <- definitionsFromStore) {
@@ -71,14 +64,16 @@ class MetricDefinitionServiceImpl extends MetricDefinitionService {
//Union the 2 sources, with DB taking precedence.
//Save new definition list to DB.
- metricSourceDefinitionMap = metricSourceDefinitionMap.++(combineDefinitionSources(definitionsFromConfig, definitionsFromStore))
+ metricSourceDefinitionMap.++=(combineDefinitionSources(definitionsFromConfig, definitionsFromStore))
- //Reach out to AMS Metadata and get Metric Keys. Pass in List<CD> and get back (Map<MD,Set<MK>>, Set<MK>)
+ //Reach out to AMS Metadata and get Metric Keys. Pass in MSD and get back Set<MK>
for (definition <- metricSourceDefinitionMap.values) {
- val (definitionKeyMap: Map[MetricDefinition, Set[MetricKey]], keys: Set[MetricKey])= metricMetadataProvider.getMetricKeysForDefinitions(definition)
- metricDefinitionMetricKeyMap = metricDefinitionMetricKeyMap.++(definitionKeyMap)
- metricKeys = metricKeys.++(keys)
+ val keys: Set[MetricKey] = metricMetadataProvider.getMetricKeysForDefinitions(definition)
+ metricDefinitionMetricKeyMap(definition) = keys
+ metricKeys.++=(keys)
}
+
+ LOG.info("Successfully initialized Metric Definition Service.")
}
def getMetricKeyFromUuid(uuid: Array[Byte]): MetricKey = {
@@ -92,16 +87,24 @@ class MetricDefinitionServiceImpl extends MetricDefinitionService {
}
@Override
+ def getDefinitions: List[MetricSourceDefinition] = {
+ metricSourceDefinitionMap.values.toList
+ }
+
+ @Override
def getDefinitionByName(name: String): MetricSourceDefinition = {
if (!metricSourceDefinitionMap.contains(name)) {
LOG.warn("Metric Source Definition with name " + name + " not found")
+ null
+ } else {
+ metricSourceDefinitionMap.apply(name)
}
- metricSourceDefinitionMap.apply(name)
}
@Override
def addDefinition(definition: MetricSourceDefinition): Boolean = {
if (metricSourceDefinitionMap.contains(definition.definitionName)) {
+ LOG.info("Definition with name " + definition.definitionName + " already present.")
return false
}
definition.definitionSource = MetricSourceDefinitionType.API
@@ -109,6 +112,10 @@ class MetricDefinitionServiceImpl extends MetricDefinitionService {
val success: Boolean = adMetadataStoreAccessor.saveInputDefinition(definition)
if (success) {
metricSourceDefinitionMap += definition.definitionName -> definition
+ val keys: Set[MetricKey] = metricMetadataProvider.getMetricKeysForDefinitions(definition)
+ metricDefinitionMetricKeyMap(definition) = keys
+ metricKeys.++=(keys)
+ LOG.info("Successfully created metric source definition : " + definition.definitionName)
}
success
}
@@ -116,16 +123,22 @@ class MetricDefinitionServiceImpl extends MetricDefinitionService {
@Override
def updateDefinition(definition: MetricSourceDefinition): Boolean = {
if (!metricSourceDefinitionMap.contains(definition.definitionName)) {
+ LOG.warn("Metric Source Definition with name " + definition.definitionName + " not found")
return false
}
if (metricSourceDefinitionMap.apply(definition.definitionName).definitionSource != MetricSourceDefinitionType.API) {
return false
}
+ definition.definitionSource = MetricSourceDefinitionType.API
val success: Boolean = adMetadataStoreAccessor.saveInputDefinition(definition)
if (success) {
metricSourceDefinitionMap += definition.definitionName -> definition
+ val keys: Set[MetricKey] = metricMetadataProvider.getMetricKeysForDefinitions(definition)
+ metricDefinitionMetricKeyMap(definition) = keys
+ metricKeys.++=(keys)
+ LOG.info("Successfully updated metric source definition : " + definition.definitionName)
}
success
}
@@ -133,17 +146,22 @@ class MetricDefinitionServiceImpl extends MetricDefinitionService {
@Override
def deleteDefinitionByName(name: String): Boolean = {
if (!metricSourceDefinitionMap.contains(name)) {
+ LOG.warn("Metric Source Definition with name " + name + " not found")
return false
}
val definition : MetricSourceDefinition = metricSourceDefinitionMap.apply(name)
if (definition.definitionSource != MetricSourceDefinitionType.API) {
+ LOG.warn("Cannot delete metric source definition which was not created through API.")
return false
}
val success: Boolean = adMetadataStoreAccessor.removeInputDefinition(name)
if (success) {
- metricSourceDefinitionMap += definition.definitionName -> definition
+ metricSourceDefinitionMap -= definition.definitionName
+ metricKeys.--=(metricDefinitionMetricKeyMap.apply(definition))
+ metricDefinitionMetricKeyMap -= definition
+ LOG.info("Successfully deleted metric source definition : " + name)
}
success
}
@@ -183,7 +201,6 @@ class MetricDefinitionServiceImpl extends MetricDefinitionService {
this.adMetadataStoreAccessor = adMetadataStoreAccessor
}
-
/**
* Look into the Metric Definitions inside a Metric Source definition, and push down source level appId &
* hosts to Metric definition if they do not have an override.
@@ -202,7 +219,7 @@ class MetricDefinitionServiceImpl extends MetricDefinitionService {
}
}
- if (metricDef.isValid && metricDef.hosts.isEmpty) {
+ if (metricDef.isValid && (metricDef.hosts == null || metricDef.hosts.isEmpty)) {
if (sourceLevelHostList != null && sourceLevelHostList.nonEmpty) {
metricDef.hosts = sourceLevelHostList
}
@@ -210,4 +227,16 @@ class MetricDefinitionServiceImpl extends MetricDefinitionService {
}
}
+ /**
+ * Return the mapping between definition name to set of metric keys.
+ *
+ * @return Map of Metric Source Definition to set of metric keys associated with it.
+ */
+ override def getMetricKeys: Map[String, Set[MetricKey]] = {
+ val metricKeyMap: scala.collection.mutable.Map[String, Set[MetricKey]] = scala.collection.mutable.Map()
+ for (definition <- metricSourceDefinitionMap.values) {
+ metricKeyMap(definition.definitionName) = metricDefinitionMetricKeyMap.apply(definition)
+ }
+ metricKeyMap.toMap
+ }
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/ba9be802/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/MetricKey.scala
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/MetricKey.scala b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/MetricKey.scala
index afad617..65c496e 100644
--- a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/MetricKey.scala
+++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/MetricKey.scala
@@ -18,6 +18,9 @@
package org.apache.ambari.metrics.adservice.metadata
+import javax.xml.bind.annotation.XmlRootElement
+
+@XmlRootElement
case class MetricKey (metricName: String, appId: String, instanceId: String, hostname: String, uuid: Array[Byte]) {
@Override
http://git-wip-us.apache.org/repos/asf/ambari/blob/ba9be802/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/MetricMetadataProvider.scala
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/MetricMetadataProvider.scala b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/MetricMetadataProvider.scala
index 5f9c0a0..b5ba15e 100644
--- a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/MetricMetadataProvider.scala
+++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/MetricMetadataProvider.scala
@@ -27,5 +27,5 @@ trait MetricMetadataProvider {
* @param metricSourceDefinition component definition
* @return
*/
- def getMetricKeysForDefinitions(metricSourceDefinition: MetricSourceDefinition): (Map[MetricDefinition, Set[MetricKey]], Set[MetricKey])
+ def getMetricKeysForDefinitions(metricSourceDefinition: MetricSourceDefinition): Set[MetricKey]
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/ba9be802/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/model/MetricAnomalyInstance.scala
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/model/MetricAnomalyInstance.scala b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/model/MetricAnomalyInstance.scala
new file mode 100644
index 0000000..248a380
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/model/MetricAnomalyInstance.scala
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.metrics.adservice.model
+
+import javax.xml.bind.annotation.XmlRootElement
+
+import org.apache.ambari.metrics.adservice.metadata.MetricKey
+import org.apache.ambari.metrics.adservice.model.AnomalyType.AnomalyType
+
+@XmlRootElement
+abstract class MetricAnomalyInstance {
+
+ val metricKey: MetricKey
+ val anomalyType: AnomalyType
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ambari/blob/ba9be802/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/model/SingleMetricAnomalyInstance.scala
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/model/SingleMetricAnomalyInstance.scala b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/model/SingleMetricAnomalyInstance.scala
deleted file mode 100644
index 981a893..0000000
--- a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/model/SingleMetricAnomalyInstance.scala
+++ /dev/null
@@ -1,29 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ambari.metrics.adservice.model
-
-import org.apache.ambari.metrics.adservice.metadata.MetricKey
-import org.apache.ambari.metrics.adservice.model.AnomalyType.AnomalyType
-
-abstract class SingleMetricAnomalyInstance {
-
- val metricKey: MetricKey
- val anomalyType: AnomalyType
-
-}