Posted to commits@ambari.apache.org by av...@apache.org on 2017/11/20 19:09:47 UTC

[2/2] ambari git commit: AMBARI-22470 : Refine Metric Definition Service and AD Query service. (avijayan)

AMBARI-22470 : Refine Metric Definition Service and AD Query service. (avijayan)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/ba9be802
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/ba9be802
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/ba9be802

Branch: refs/heads/branch-3.0-ams
Commit: ba9be8028a7608521e1ea769a239276c3cc223bc
Parents: 0fcca47
Author: Aravindan Vijayan <av...@hortonworks.com>
Authored: Mon Nov 20 10:46:13 2017 -0800
Committer: Aravindan Vijayan <av...@hortonworks.com>
Committed: Mon Nov 20 10:46:13 2017 -0800

----------------------------------------------------------------------
 .../pom.xml                                     |  63 ++--
 .../src/main/resources/config.yml               |   2 +-
 .../src/main/resources/hbase-site.xml           | 286 +++++++++++++++++++
 .../adservice/app/ADServiceScalaModule.scala    |  50 ++++
 .../adservice/app/AnomalyDetectionApp.scala     |  10 +-
 .../app/AnomalyDetectionAppConfig.scala         |   4 +-
 .../app/AnomalyDetectionAppModule.scala         |   5 +-
 .../configuration/HBaseConfiguration.scala      |   3 +
 .../adservice/db/AdAnomalyStoreAccessor.scala   |  36 +++
 .../db/AdMetadataStoreAccessorImpl.scala        |  96 +++++++
 .../adservice/db/ConnectionProvider.scala       |  45 +++
 .../adservice/db/DefaultPhoenixDataSource.scala |  79 +++++
 .../adservice/db/LevelDbStoreAccessor.scala     |  56 ----
 .../adservice/db/MetadataDatasource.scala       |   6 +
 .../db/PhoenixAnomalyStoreAccessor.scala        |  75 +++--
 .../db/PhoenixConnectionProvider.scala          |  66 +++++
 .../adservice/db/PhoenixQueryConstants.scala    |  12 +-
 .../adservice/leveldb/LevelDBDatasource.scala   |  17 +-
 .../adservice/metadata/ADMetadataProvider.scala |  86 ++++--
 .../metadata/InputMetricDefinitionParser.scala  |  24 +-
 .../adservice/metadata/MetricDefinition.scala   |   2 +
 .../metadata/MetricDefinitionService.scala      |  16 +-
 .../metadata/MetricDefinitionServiceImpl.scala  |  73 +++--
 .../metrics/adservice/metadata/MetricKey.scala  |   3 +
 .../metadata/MetricMetadataProvider.scala       |   2 +-
 .../adservice/model/MetricAnomalyInstance.scala |  32 +++
 .../model/SingleMetricAnomalyInstance.scala     |  29 --
 .../adservice/resource/AnomalyResource.scala    |  55 +++-
 .../resource/MetricDefinitionResource.scala     |  77 ++++-
 .../adservice/resource/RootResource.scala       |   5 +-
 .../adservice/service/ADQueryService.scala      |   6 +-
 .../adservice/service/ADQueryServiceImpl.scala  |  25 +-
 .../adservice/service/AbstractADService.scala   |  44 +++
 .../PointInTimeAnomalyInstance.scala            |   4 +-
 .../subsystem/trend/TrendAnomalyInstance.scala  |   4 +-
 .../app/DefaultADResourceSpecTest.scala         |   5 +-
 .../metadata/AMSMetadataProviderTest.scala      |  16 +-
 .../metadata/MetricSourceDefinitionTest.scala   |  16 +-
 ambari-metrics/ambari-metrics-common/pom.xml    |  45 ---
 .../sink/timeline/TimelineMetricKey.java        |  59 ----
 .../sink/timeline/query/ConnectionProvider.java |  32 ---
 .../query/DefaultPhoenixDataSource.java         | 108 -------
 .../query/PhoenixConnectionProvider.java        |  31 --
 .../timeline/HBaseTimelineMetricsService.java   |  36 ++-
 .../metrics/timeline/PhoenixHBaseAccessor.java  |  26 +-
 .../metrics/timeline/TimelineMetricStore.java   |   3 +-
 .../timeline/query/ConnectionProvider.java      |  31 ++
 .../query/DefaultPhoenixDataSource.java         |  92 ++++++
 .../query/PhoenixConnectionProvider.java        |  31 ++
 .../webapp/TimelineWebServices.java             |  12 +-
 .../TestApplicationHistoryServer.java           |   2 +-
 .../timeline/AbstractMiniHBaseClusterTest.java  |  13 +-
 .../timeline/PhoenixHBaseAccessorTest.java      |  11 +-
 .../timeline/TestTimelineMetricStore.java       |   3 +-
 54 files changed, 1424 insertions(+), 546 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/ba9be802/ambari-metrics/ambari-metrics-anomaly-detection-service/pom.xml
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/pom.xml b/ambari-metrics/ambari-metrics-anomaly-detection-service/pom.xml
index 142f02f..c6927dd 100644
--- a/ambari-metrics/ambari-metrics-anomaly-detection-service/pom.xml
+++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/pom.xml
@@ -34,10 +34,12 @@
   <properties>
     <scala.version>2.12.3</scala.version>
     <scala.binary.version>2.11</scala.binary.version>
-    <hadoop.version>2.7.3.2.6.0.3-8</hadoop.version>
     <jackson.version>2.9.1</jackson.version>
     <dropwizard.version>1.2.0</dropwizard.version>
     <spark.version>2.2.0</spark.version>
+    <hadoop.version>2.7.3.2.6.0.3-8</hadoop.version>
+    <hbase.version>1.1.2.2.6.0.3-8</hbase.version>
+    <phoenix.version>4.7.0.2.6.0.3-8</phoenix.version>
   </properties>
   
   <repositories>
@@ -64,6 +66,7 @@
         <directory>src/main/resources</directory>
         <includes>
           <include>**/*.yml</include>
+          <include>**/*.xml</include>
           <include>**/*.txt</include>
         </includes>
       </resource>
@@ -145,6 +148,28 @@
                 <exclude>META-INF/*.RSA</exclude>
               </excludes>
             </filter>
+            <filter>
+              <artifact>org.apache.phoenix:phoenix-core</artifact>
+              <excludes>
+                <exclude>org/joda/time/**</exclude>
+                <exclude>com/codahale/metrics/**</exclude>
+                <exclude>com/google/common/collect/**</exclude>
+              </excludes>
+            </filter>
+            <filter>
+              <artifact>org.apache.phoenix:phoenix-core</artifact>
+              <excludes>
+                <exclude>org/joda/time/**</exclude>
+                <exclude>com/codahale/metrics/**</exclude>
+                <exclude>com/google/common/collect/**</exclude>
+              </excludes>
+            </filter>
+            <filter>
+              <artifact>*:*</artifact>
+              <excludes>
+                <exclude>com/sun/jersey/**</exclude>
+              </excludes>
+            </filter>
           </filters>
         </configuration>
         <executions>
@@ -245,33 +270,25 @@
     </dependency>
     <dependency>
       <groupId>org.apache.phoenix</groupId>
-      <artifactId>phoenix-spark</artifactId>
-      <version>4.10.0-HBase-1.1</version>
+      <artifactId>phoenix-core</artifactId>
+      <version>${phoenix.version}</version>
       <exclusions>
         <exclusion>
-          <artifactId>jersey-server</artifactId>
-          <groupId>com.sun.jersey</groupId>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-common</artifactId>
         </exclusion>
         <exclusion>
-          <artifactId>jersey-core</artifactId>
-          <groupId>com.sun.jersey</groupId>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-annotations</artifactId>
         </exclusion>
         <exclusion>
-          <artifactId>jersey-client</artifactId>
+          <artifactId>jersey-core</artifactId>
           <groupId>com.sun.jersey</groupId>
         </exclusion>
         <exclusion>
-          <artifactId>jersey-guice</artifactId>
-          <groupId>com.sun.jersey.contribs</groupId>
-        </exclusion>
-        <exclusion>
-          <artifactId>jersey-json</artifactId>
+          <artifactId>jersey-server</artifactId>
           <groupId>com.sun.jersey</groupId>
         </exclusion>
-        <exclusion>
-          <groupId>com.fasterxml.jackson.core</groupId>
-          <artifactId>jackson-databind</artifactId>
-        </exclusion>
       </exclusions>
     </dependency>
     <dependency>
@@ -379,6 +396,10 @@
           <groupId>org.slf4j</groupId>
           <artifactId>log4j-over-slf4j</artifactId>
         </exclusion>
+        <exclusion>
+          <artifactId>jersey-server</artifactId>
+          <groupId>org.glassfish.jersey.core</groupId>
+        </exclusion>
       </exclusions>
     </dependency>
     <dependency>
@@ -444,6 +465,12 @@
       <artifactId>leveldb</artifactId>
       <version>0.9</version>
     </dependency>
+    <!-- https://mvnrepository.com/artifact/org.scalaj/scalaj-http -->
+    <dependency>
+      <groupId>org.scalaj</groupId>
+      <artifactId>scalaj-http_2.12</artifactId>
+      <version>2.3.0</version>
+    </dependency>
 
     <dependency>
       <groupId>junit</groupId>
@@ -454,7 +481,7 @@
     <dependency>
       <groupId>com.google.guava</groupId>
       <artifactId>guava</artifactId>
-      <version>21.0</version>
+      <version>18.0</version>
     </dependency>
     <dependency>
       <groupId>io.dropwizard.metrics</groupId>

http://git-wip-us.apache.org/repos/asf/ambari/blob/ba9be802/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/resources/config.yml
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/resources/config.yml b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/resources/config.yml
index 9402f6e..7de06b4 100644
--- a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/resources/config.yml
+++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/resources/config.yml
@@ -27,7 +27,7 @@ metricsCollector:
   hosts: host1,host2
   port: 6188
   protocol: http
-  metadataEndpoint: /v1/timeline/metrics/metadata/keys
+  metadataEndpoint: /ws/v1/timeline/metrics/metadata/key
 
 adQueryService:
   anomalyDataTtl: 604800
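
The updated metadataEndpoint is the collector API from which the AD service pulls metric key metadata. A minimal sketch of such a call using the scalaj-http dependency added in the pom above (the host, timeouts, and response handling are illustrative and not taken from this commit):

  import scalaj.http.{Http, HttpResponse}

  object MetadataEndpointExample extends App {
    // URL assembled from the config.yml fields: protocol, host, port, metadataEndpoint.
    val url = "http://host1:6188/ws/v1/timeline/metrics/metadata/key"

    // Issue the GET; the JSON body would then be parsed into metric keys by the metadata provider.
    val response: HttpResponse[String] = Http(url)
      .timeout(connTimeoutMs = 10000, readTimeoutMs = 30000)
      .asString

    if (response.isSuccess) println(response.body)
    else println(s"Metadata request failed with HTTP ${response.code}")
  }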

http://git-wip-us.apache.org/repos/asf/ambari/blob/ba9be802/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/resources/hbase-site.xml
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/resources/hbase-site.xml b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/resources/hbase-site.xml
new file mode 100644
index 0000000..66f0454
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/resources/hbase-site.xml
@@ -0,0 +1,286 @@
+<?xml version="1.0"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+<!--
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+-->
+<configuration>
+    
+    <property>
+      <name>dfs.client.read.shortcircuit</name>
+      <value>true</value>
+    </property>
+    
+    <property>
+      <name>hbase.client.scanner.caching</name>
+      <value>10000</value>
+    </property>
+    
+    <property>
+      <name>hbase.client.scanner.timeout.period</name>
+      <value>300000</value>
+    </property>
+    
+    <property>
+      <name>hbase.cluster.distributed</name>
+      <value>false</value>
+    </property>
+    
+    <property>
+      <name>hbase.hregion.majorcompaction</name>
+      <value>0</value>
+    </property>
+    
+    <property>
+      <name>hbase.hregion.max.filesize</name>
+      <value>4294967296</value>
+    </property>
+    
+    <property>
+      <name>hbase.hregion.memstore.block.multiplier</name>
+      <value>4</value>
+    </property>
+    
+    <property>
+      <name>hbase.hregion.memstore.flush.size</name>
+      <value>134217728</value>
+    </property>
+    
+    <property>
+      <name>hbase.hstore.blockingStoreFiles</name>
+      <value>200</value>
+    </property>
+    
+    <property>
+      <name>hbase.hstore.flusher.count</name>
+      <value>2</value>
+    </property>
+    
+    <property>
+      <name>hbase.local.dir</name>
+      <value>${hbase.tmp.dir}/local</value>
+    </property>
+    
+    <property>
+      <name>hbase.master.info.bindAddress</name>
+      <value>0.0.0.0</value>
+    </property>
+    
+    <property>
+      <name>hbase.master.info.port</name>
+      <value>61310</value>
+    </property>
+    
+    <property>
+      <name>hbase.master.normalizer.class</name>
+      <value>org.apache.hadoop.hbase.master.normalizer.SimpleRegionNormalizer</value>
+    </property>
+    
+    <property>
+      <name>hbase.master.port</name>
+      <value>61300</value>
+    </property>
+    
+    <property>
+      <name>hbase.master.wait.on.regionservers.mintostart</name>
+      <value>1</value>
+    </property>
+    
+    <property>
+      <name>hbase.normalizer.enabled</name>
+      <value>false</value>
+    </property>
+    
+    <property>
+      <name>hbase.normalizer.period</name>
+      <value>600000</value>
+    </property>
+    
+    <property>
+      <name>hbase.regionserver.global.memstore.lowerLimit</name>
+      <value>0.3</value>
+    </property>
+    
+    <property>
+      <name>hbase.regionserver.global.memstore.upperLimit</name>
+      <value>0.35</value>
+    </property>
+    
+    <property>
+      <name>hbase.regionserver.info.port</name>
+      <value>61330</value>
+    </property>
+    
+    <property>
+      <name>hbase.regionserver.port</name>
+      <value>61320</value>
+    </property>
+    
+    <property>
+      <name>hbase.regionserver.thread.compaction.large</name>
+      <value>2</value>
+    </property>
+    
+    <property>
+      <name>hbase.regionserver.thread.compaction.small</name>
+      <value>3</value>
+    </property>
+    
+    <property>
+      <name>hbase.replication</name>
+      <value>false</value>
+    </property>
+    
+    <property>
+      <name>hbase.rootdir</name>
+      <value>file:///var/lib/ambari-metrics-collector/hbase</value>
+    </property>
+    
+    <property>
+      <name>hbase.rpc.timeout</name>
+      <value>300000</value>
+    </property>
+    
+    <property>
+      <name>hbase.snapshot.enabled</name>
+      <value>false</value>
+    </property>
+    
+    <property>
+      <name>hbase.superuser</name>
+      <value>activity_explorer,activity_analyzer</value>
+    </property>
+    
+    <property>
+      <name>hbase.tmp.dir</name>
+      <value>/var/lib/ambari-metrics-collector/hbase-tmp</value>
+    </property>
+    
+    <property>
+      <name>hbase.zookeeper.leaderport</name>
+      <value>61388</value>
+    </property>
+    
+    <property>
+      <name>hbase.zookeeper.peerport</name>
+      <value>61288</value>
+    </property>
+    
+    <property>
+      <name>hbase.zookeeper.property.clientPort</name>
+      <value>61181</value>
+    </property>
+    
+    <property>
+      <name>hbase.zookeeper.property.dataDir</name>
+      <value>${hbase.tmp.dir}/zookeeper</value>
+    </property>
+    
+    <property>
+      <name>hbase.zookeeper.property.tickTime</name>
+      <value>6000</value>
+    </property>
+    
+    <property>
+      <name>hbase.zookeeper.quorum</name>
+      <value>c6401.ambari.apache.org</value>
+      <final>true</final>
+    </property>
+    
+    <property>
+      <name>hfile.block.cache.size</name>
+      <value>0.3</value>
+    </property>
+    
+    <property>
+      <name>phoenix.coprocessor.maxMetaDataCacheSize</name>
+      <value>20480000</value>
+    </property>
+    
+    <property>
+      <name>phoenix.coprocessor.maxServerCacheTimeToLiveMs</name>
+      <value>60000</value>
+    </property>
+    
+    <property>
+      <name>phoenix.groupby.maxCacheSize</name>
+      <value>307200000</value>
+    </property>
+    
+    <property>
+      <name>phoenix.mutate.batchSize</name>
+      <value>10000</value>
+    </property>
+    
+    <property>
+      <name>phoenix.query.keepAliveMs</name>
+      <value>300000</value>
+    </property>
+    
+    <property>
+      <name>phoenix.query.maxGlobalMemoryPercentage</name>
+      <value>15</value>
+    </property>
+    
+    <property>
+      <name>phoenix.query.rowKeyOrderSaltedTable</name>
+      <value>true</value>
+    </property>
+    
+    <property>
+      <name>phoenix.query.spoolThresholdBytes</name>
+      <value>20971520</value>
+    </property>
+    
+    <property>
+      <name>phoenix.query.timeoutMs</name>
+      <value>300000</value>
+    </property>
+    
+    <property>
+      <name>phoenix.sequence.saltBuckets</name>
+      <value>2</value>
+    </property>
+    
+    <property>
+      <name>phoenix.spool.directory</name>
+      <value>${hbase.tmp.dir}/phoenix-spool</value>
+    </property>
+    
+    <property>
+      <name>zookeeper.session.timeout</name>
+      <value>120000</value>
+    </property>
+    
+    <property>
+      <name>zookeeper.session.timeout.localHBaseCluster</name>
+      <value>120000</value>
+    </property>
+    
+    <property>
+      <name>zookeeper.znode.parent</name>
+      <value>/ams-hbase-unsecure</value>
+    </property>
+
+    <property>
+      <name>hbase.use.dynamic.jars</name>
+      <value>false</value>
+    </property>
+
+  </configuration>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ambari/blob/ba9be802/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/app/ADServiceScalaModule.scala
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/app/ADServiceScalaModule.scala b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/app/ADServiceScalaModule.scala
new file mode 100644
index 0000000..8578a80
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/app/ADServiceScalaModule.scala
@@ -0,0 +1,50 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  * http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package org.apache.ambari.metrics.adservice.app
+
+import com.fasterxml.jackson.module.scala._
+import com.fasterxml.jackson.module.scala.deser.{ScalaNumberDeserializersModule, UntypedObjectDeserializerModule}
+import com.fasterxml.jackson.module.scala.introspect.{ScalaAnnotationIntrospector, ScalaAnnotationIntrospectorModule}
+
+/**
+  * Extended Jackson Module that fixes the Scala-Jackson BytecodeReadingParanamer issue.
+  */
+class ADServiceScalaModule extends JacksonModule
+  with IteratorModule
+  with EnumerationModule
+  with OptionModule
+  with SeqModule
+  with IterableModule
+  with TupleModule
+  with MapModule
+  with SetModule
+  with FixedScalaAnnotationIntrospectorModule
+  with UntypedObjectDeserializerModule
+  with EitherModule {
+
+  override def getModuleName = "ADServiceScalaModule"
+
+  object ADServiceScalaModule extends ADServiceScalaModule
+
+}
+
+
+trait FixedScalaAnnotationIntrospectorModule extends JacksonModule {
+  this += { _.appendAnnotationIntrospector(ScalaAnnotationIntrospector) }
+}
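
The module is registered on the Jackson ObjectMapper used by the Jersey layer (see the AnomalyDetectionApp change below). A standalone usage sketch, with a hypothetical case class purely for illustration:

  import com.fasterxml.jackson.databind.ObjectMapper

  object ADServiceScalaModuleExample extends App {
    // Hypothetical payload type; any Scala case class is handled the same way.
    case class Sample(name: String, values: Seq[Double])

    val mapper = new ObjectMapper()
    mapper.registerModule(new ADServiceScalaModule)   // used in place of DefaultScalaModule

    // Serialize through the Scala-aware module.
    val json = mapper.writeValueAsString(Sample("regionserver.Server.readRequestCount", Seq(1.0, 2.5)))
    println(json)
  }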

http://git-wip-us.apache.org/repos/asf/ambari/blob/ba9be802/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/app/AnomalyDetectionApp.scala
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/app/AnomalyDetectionApp.scala b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/app/AnomalyDetectionApp.scala
index 8b3a829..2d0dbdf 100644
--- a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/app/AnomalyDetectionApp.scala
+++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/app/AnomalyDetectionApp.scala
@@ -21,6 +21,9 @@ import javax.ws.rs.Path
 import javax.ws.rs.container.{ContainerRequestFilter, ContainerResponseFilter}
 
 import org.apache.ambari.metrics.adservice.app.GuiceInjector.{withInjector, wrap}
+import org.apache.ambari.metrics.adservice.db.{AdAnomalyStoreAccessor, MetadataDatasource}
+import org.apache.ambari.metrics.adservice.metadata.MetricDefinitionService
+import org.apache.ambari.metrics.adservice.service.ADQueryService
 import org.glassfish.jersey.filter.LoggingFilter
 
 import com.codahale.metrics.health.HealthCheck
@@ -45,6 +48,11 @@ class AnomalyDetectionApp extends Application[AnomalyDetectionAppConfig] {
       injector.instancesOfType(classOf[HealthCheck]).foreach { h => env.healthChecks.register(h.getClass.getName, h) }
       injector.instancesOfType(classOf[ContainerRequestFilter]).foreach { f => env.jersey().register(f) }
       injector.instancesOfType(classOf[ContainerResponseFilter]).foreach { f => env.jersey().register(f) }
+
+      //Initialize Services
+      injector.getInstance(classOf[MetadataDatasource]).initialize
+      injector.getInstance(classOf[MetricDefinitionService]).initialize
+      injector.getInstance(classOf[ADQueryService]).initialize
     }
     env.jersey.register(jacksonJaxbJsonProvider)
     env.jersey.register(new LoggingFilter)
@@ -53,7 +61,7 @@ class AnomalyDetectionApp extends Application[AnomalyDetectionAppConfig] {
   private def jacksonJaxbJsonProvider: JacksonJaxbJsonProvider = {
     val provider = new JacksonJaxbJsonProvider()
     val objectMapper = new ObjectMapper()
-    objectMapper.registerModule(DefaultScalaModule)
+    objectMapper.registerModule(new ADServiceScalaModule)
     objectMapper.registerModule(new JodaModule)
     objectMapper.configure(SerializationFeature.WRAP_ROOT_VALUE, false)
     objectMapper.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false)

http://git-wip-us.apache.org/repos/asf/ambari/blob/ba9be802/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/app/AnomalyDetectionAppConfig.scala
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/app/AnomalyDetectionAppConfig.scala b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/app/AnomalyDetectionAppConfig.scala
index 93f6b28..f9ed4b2 100644
--- a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/app/AnomalyDetectionAppConfig.scala
+++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/app/AnomalyDetectionAppConfig.scala
@@ -42,7 +42,7 @@ class AnomalyDetectionAppConfig extends Configuration {
   private val metricCollectorConfiguration = new MetricCollectorConfiguration
 
   /*
-   Anomaly Service configuration
+   Anomaly Query Service configuration
     */
   @Valid
   private val adServiceConfiguration = new AdServiceConfiguration
@@ -54,7 +54,7 @@ class AnomalyDetectionAppConfig extends Configuration {
   private val metricDefinitionDBConfiguration = new MetricDefinitionDBConfiguration
 
   /*
-   HBase Conf
+   AMS HBase Conf
     */
   @JsonIgnore
   def getHBaseConf : org.apache.hadoop.conf.Configuration = {

http://git-wip-us.apache.org/repos/asf/ambari/blob/ba9be802/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/app/AnomalyDetectionAppModule.scala
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/app/AnomalyDetectionAppModule.scala b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/app/AnomalyDetectionAppModule.scala
index a896563..68e9df9 100644
--- a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/app/AnomalyDetectionAppModule.scala
+++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/app/AnomalyDetectionAppModule.scala
@@ -17,7 +17,7 @@
   */
 package org.apache.ambari.metrics.adservice.app
 
-import org.apache.ambari.metrics.adservice.db.{AdMetadataStoreAccessor, LevelDbStoreAccessor, MetadataDatasource}
+import org.apache.ambari.metrics.adservice.db._
 import org.apache.ambari.metrics.adservice.leveldb.LevelDBDataSource
 import org.apache.ambari.metrics.adservice.metadata.{MetricDefinitionService, MetricDefinitionServiceImpl}
 import org.apache.ambari.metrics.adservice.resource.{AnomalyResource, MetricDefinitionResource, RootResource}
@@ -38,9 +38,10 @@ class AnomalyDetectionAppModule(config: AnomalyDetectionAppConfig, env: Environm
     bind(classOf[AnomalyResource])
     bind(classOf[MetricDefinitionResource])
     bind(classOf[RootResource])
-    bind(classOf[AdMetadataStoreAccessor]).to(classOf[LevelDbStoreAccessor])
+    bind(classOf[AdMetadataStoreAccessor]).to(classOf[AdMetadataStoreAccessorImpl])
     bind(classOf[ADQueryService]).to(classOf[ADQueryServiceImpl])
     bind(classOf[MetricDefinitionService]).to(classOf[MetricDefinitionServiceImpl])
     bind(classOf[MetadataDatasource]).to(classOf[LevelDBDataSource])
+    bind(classOf[AdAnomalyStoreAccessor]).to(classOf[PhoenixAnomalyStoreAccessor])
   }
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/ba9be802/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/configuration/HBaseConfiguration.scala
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/configuration/HBaseConfiguration.scala b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/configuration/HBaseConfiguration.scala
index a51a959..a95ff15 100644
--- a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/configuration/HBaseConfiguration.scala
+++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/configuration/HBaseConfiguration.scala
@@ -28,6 +28,9 @@ object HBaseConfiguration {
   var isInitialized: Boolean = false
   val LOG : Logger = LoggerFactory.getLogger("HBaseConfiguration")
 
+  /**
+    * Initialize the hbase conf from hbase-site present in classpath.
+    */
   def initConfigs(): Unit = {
     if (!isInitialized) {
       var classLoader: ClassLoader = Thread.currentThread.getContextClassLoader
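
A minimal sketch of the classpath-based loading described in the new doc comment, assuming the hbase-site.xml bundled above is on the classpath (the remainder of the actual initConfigs body is truncated by the diff context):

  import org.apache.hadoop.conf.Configuration

  def loadHBaseSiteFromClasspath(): Configuration = {
    val conf = new Configuration(false)
    val classLoader = Thread.currentThread.getContextClassLoader
    val resource = classLoader.getResource("hbase-site.xml")
    if (resource != null) {
      conf.addResource(resource)   // picks up hbase.zookeeper.quorum, clientPort, znode parent, etc.
    }
    conf
  }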

http://git-wip-us.apache.org/repos/asf/ambari/blob/ba9be802/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/db/AdAnomalyStoreAccessor.scala
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/db/AdAnomalyStoreAccessor.scala b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/db/AdAnomalyStoreAccessor.scala
new file mode 100644
index 0000000..676b09a
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/db/AdAnomalyStoreAccessor.scala
@@ -0,0 +1,36 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  * http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package org.apache.ambari.metrics.adservice.db
+
+import org.apache.ambari.metrics.adservice.model.AnomalyType.AnomalyType
+import org.apache.ambari.metrics.adservice.model.MetricAnomalyInstance
+
+/**
+  * Trait for anomaly store accessor. (Phoenix)
+  */
+trait AdAnomalyStoreAccessor {
+
+  def initialize(): Unit
+
+  def getMetricAnomalies(anomalyType: AnomalyType,
+                         startTime: Long,
+                         endTime: Long,
+                         limit: Int) : List[MetricAnomalyInstance]
+
+  }

http://git-wip-us.apache.org/repos/asf/ambari/blob/ba9be802/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/db/AdMetadataStoreAccessorImpl.scala
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/db/AdMetadataStoreAccessorImpl.scala b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/db/AdMetadataStoreAccessorImpl.scala
new file mode 100644
index 0000000..7405459
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/db/AdMetadataStoreAccessorImpl.scala
@@ -0,0 +1,96 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  * http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package org.apache.ambari.metrics.adservice.db
+
+import org.apache.ambari.metrics.adservice.metadata.MetricSourceDefinition
+import org.apache.commons.lang.SerializationUtils
+
+import com.google.inject.Inject
+
+/**
+  * Implementation of the AdMetadataStoreAccessor.
+  * Serves as the adaptor between metric definition service and LevelDB worlds.
+  */
+class AdMetadataStoreAccessorImpl extends AdMetadataStoreAccessor {
+
+  @Inject
+  var metadataDataSource: MetadataDatasource = _
+
+  @Inject
+  def this(metadataDataSource: MetadataDatasource) = {
+    this
+    this.metadataDataSource = metadataDataSource
+  }
+
+  /**
+    * Return all saved component definitions from DB.
+    *
+    * @return
+    */
+  override def getSavedInputDefinitions: List[MetricSourceDefinition] = {
+    val valuesFromStore : List[MetadataDatasource#Value] = metadataDataSource.getAll
+    val definitions = scala.collection.mutable.MutableList.empty[MetricSourceDefinition]
+
+    for (value : Array[Byte] <- valuesFromStore) {
+      val definition : MetricSourceDefinition = SerializationUtils.deserialize(value).asInstanceOf[MetricSourceDefinition]
+      if (definition != null) {
+        definitions.+=(definition)
+      }
+    }
+    definitions.toList
+  }
+
+  /**
+    * Save a set of component definitions
+    *
+    * @param metricSourceDefinitions Set of component definitions
+    * @return Success / Failure
+    */
+  override def saveInputDefinitions(metricSourceDefinitions: List[MetricSourceDefinition]): Boolean = {
+    for (definition <- metricSourceDefinitions) {
+      saveInputDefinition(definition)
+    }
+    true
+  }
+
+  /**
+    * Save a component definition
+    *
+    * @param metricSourceDefinition component definition
+    * @return Success / Failure
+    */
+  override def saveInputDefinition(metricSourceDefinition: MetricSourceDefinition): Boolean = {
+    val storeValue : MetadataDatasource#Value = SerializationUtils.serialize(metricSourceDefinition)
+    val storeKey : MetadataDatasource#Key = metricSourceDefinition.definitionName.getBytes()
+    metadataDataSource.put(storeKey, storeValue)
+    true
+  }
+
+  /**
+    * Delete a component definition
+    *
+    * @param definitionName component definition
+    * @return
+    */
+  override def removeInputDefinition(definitionName: String): Boolean = {
+    val storeKey : MetadataDatasource#Key = definitionName.getBytes()
+    metadataDataSource.delete(storeKey)
+    true
+  }
+}
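
An illustrative round-trip through the accessor, assuming an already-initialized LevelDB-backed MetadataDatasource named datasource and a MetricSourceDefinition instance named definition (its constructor is not shown in this diff):

  // definition: MetricSourceDefinition with a definitionName; datasource: an initialized MetadataDatasource.
  val accessor = new AdMetadataStoreAccessorImpl(datasource)

  accessor.saveInputDefinition(definition)                        // serialize + put(definitionName bytes -> bytes)
  val saved: List[MetricSourceDefinition] = accessor.getSavedInputDefinitions
  accessor.removeInputDefinition(definition.definitionName)       // delete by key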

http://git-wip-us.apache.org/repos/asf/ambari/blob/ba9be802/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/db/ConnectionProvider.scala
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/db/ConnectionProvider.scala b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/db/ConnectionProvider.scala
new file mode 100644
index 0000000..cc02ed4
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/db/ConnectionProvider.scala
@@ -0,0 +1,45 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  * http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  *//**
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  * http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+package org.apache.ambari.metrics.adservice.db
+
+import java.sql.Connection
+import java.sql.SQLException
+
+/**
+  * Provides a connection to the anomaly store.
+  */
+trait ConnectionProvider {
+  @throws[SQLException]
+  def getConnection: Connection
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ambari/blob/ba9be802/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/db/DefaultPhoenixDataSource.scala
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/db/DefaultPhoenixDataSource.scala b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/db/DefaultPhoenixDataSource.scala
new file mode 100644
index 0000000..d9396de
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/db/DefaultPhoenixDataSource.scala
@@ -0,0 +1,79 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  * http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+package org.apache.ambari.metrics.adservice.db
+
+import org.apache.commons.logging.LogFactory
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.hbase.client.ConnectionFactory
+import org.apache.hadoop.hbase.client.HBaseAdmin
+import java.io.IOException
+import java.sql.Connection
+import java.sql.DriverManager
+import java.sql.SQLException
+
+object DefaultPhoenixDataSource {
+  private[db] val LOG = LogFactory.getLog(classOf[DefaultPhoenixDataSource])
+  private val ZOOKEEPER_CLIENT_PORT = "hbase.zookeeper.property.clientPort"
+  private val ZOOKEEPER_QUORUM = "hbase.zookeeper.quorum"
+  private val ZNODE_PARENT = "zookeeper.znode.parent"
+  private val connectionUrl = "jdbc:phoenix:%s:%s:%s"
+}
+
+class DefaultPhoenixDataSource(var hbaseConf: Configuration) extends PhoenixConnectionProvider {
+
+  val zookeeperClientPort: String = hbaseConf.getTrimmed(DefaultPhoenixDataSource.ZOOKEEPER_CLIENT_PORT, "2181")
+  val zookeeperQuorum: String = hbaseConf.getTrimmed(DefaultPhoenixDataSource.ZOOKEEPER_QUORUM)
+  val znodeParent: String = hbaseConf.getTrimmed(DefaultPhoenixDataSource.ZNODE_PARENT, "/ams-hbase-unsecure")
+  final private var url : String = _
+
+  if (zookeeperQuorum == null || zookeeperQuorum.isEmpty) {
+    throw new IllegalStateException("Unable to find Zookeeper quorum to access HBase store using Phoenix.")
+  }
+  url = String.format(DefaultPhoenixDataSource.connectionUrl, zookeeperQuorum, zookeeperClientPort, znodeParent)
+
+
+  /**
+    * Get HBaseAdmin for table ops.
+    *
+    * @return @HBaseAdmin
+    * @throws IOException
+    */
+  @throws[IOException]
+  override def getHBaseAdmin: HBaseAdmin = ConnectionFactory.createConnection(hbaseConf).getAdmin.asInstanceOf[HBaseAdmin]
+
+  /**
+    * Get JDBC connection to HBase store. Assumption is that the hbase
+    * configuration is present on the classpath and loaded by the caller into
+    * the Configuration object.
+    * Phoenix already caches the HConnection between the client and HBase
+    * cluster.
+    *
+    * @return @java.sql.Connection
+    */
+  @throws[SQLException]
+  override def getConnection: Connection = {
+    DefaultPhoenixDataSource.LOG.debug("Metric store connection url: " + url)
+    try DriverManager.getConnection(url)
+    catch {
+      case e: SQLException =>
+        DefaultPhoenixDataSource.LOG.warn("Unable to connect to HBase store using Phoenix.", e)
+        throw e
+    }
+  }
+
+}
\ No newline at end of file
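
For reference, with the defaults from the bundled hbase-site.xml the generated URL is jdbc:phoenix:c6401.ambari.apache.org:61181:/ams-hbase-unsecure. A connection sketch (assumes phoenix-core is on the classpath so the Phoenix JDBC driver self-registers):

  import java.sql.{Connection, DriverManager}

  object PhoenixConnectionExample extends App {
    val url = "jdbc:phoenix:c6401.ambari.apache.org:61181:/ams-hbase-unsecure"
    val conn: Connection = DriverManager.getConnection(url)
    try {
      // Sanity check only; real queries go through PhoenixAnomalyStoreAccessor.
      println("Connected to: " + conn.getMetaData.getURL)
    } finally {
      conn.close()
    }
  }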

http://git-wip-us.apache.org/repos/asf/ambari/blob/ba9be802/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/db/LevelDbStoreAccessor.scala
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/db/LevelDbStoreAccessor.scala b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/db/LevelDbStoreAccessor.scala
deleted file mode 100644
index baad57d..0000000
--- a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/db/LevelDbStoreAccessor.scala
+++ /dev/null
@@ -1,56 +0,0 @@
-package org.apache.ambari.metrics.adservice.db
-
-import org.apache.ambari.metrics.adservice.metadata.MetricSourceDefinition
-
-import com.google.inject.Inject
-
-class LevelDbStoreAccessor extends AdMetadataStoreAccessor{
-
-  @Inject
-  var levelDbDataSource : MetadataDatasource = _
-
-  @Inject
-  def this(levelDbDataSource: MetadataDatasource) = {
-    this
-    this.levelDbDataSource = levelDbDataSource
-  }
-
-  /**
-    * Return all saved component definitions from DB.
-    *
-    * @return
-    */
-  override def getSavedInputDefinitions: List[MetricSourceDefinition] = {
-    List.empty[MetricSourceDefinition]
-  }
-
-  /**
-    * Save a set of component definitions
-    *
-    * @param metricSourceDefinitions Set of component definitions
-    * @return Success / Failure
-    */
-override def saveInputDefinitions(metricSourceDefinitions: List[MetricSourceDefinition]): Boolean = {
-  true
-}
-
-  /**
-    * Save a component definition
-    *
-    * @param metricSourceDefinition component definition
-    * @return Success / Failure
-    */
-  override def saveInputDefinition(metricSourceDefinition: MetricSourceDefinition): Boolean = {
-    true
-  }
-
-  /**
-    * Delete a component definition
-    *
-    * @param definitionName component definition
-    * @return
-    */
-  override def removeInputDefinition(definitionName: String): Boolean = {
-    true
-  }
-}

http://git-wip-us.apache.org/repos/asf/ambari/blob/ba9be802/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/db/MetadataDatasource.scala
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/db/MetadataDatasource.scala b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/db/MetadataDatasource.scala
index aa6694a..7b223a2 100644
--- a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/db/MetadataDatasource.scala
+++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/db/MetadataDatasource.scala
@@ -44,6 +44,12 @@ trait MetadataDatasource {
     */
   def get(key: Key): Option[Value]
 
+  /**
+    * This function obtains all the values
+    *
+    * @return the list of values
+    */
+  def getAll: List[Value]
 
   /**
     * This function associates a key to a value, overwriting if necessary
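
A possible implementation of the new getAll contract over an org.iq80.leveldb handle (a sketch only, assuming the iq80 leveldb artifact referenced in the pom; the corresponding LevelDBDataSource change is only partially shown in this diff):

  import org.iq80.leveldb.DB
  import scala.collection.mutable.ListBuffer

  def getAllValues(db: DB): List[Array[Byte]] = {
    val values = ListBuffer.empty[Array[Byte]]
    val iterator = db.iterator()
    try {
      iterator.seekToFirst()
      while (iterator.hasNext) {
        values += iterator.next().getValue   // entry value = serialized MetricSourceDefinition
      }
    } finally {
      iterator.close()
    }
    values.toList
  }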

http://git-wip-us.apache.org/repos/asf/ambari/blob/ba9be802/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/db/PhoenixAnomalyStoreAccessor.scala
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/db/PhoenixAnomalyStoreAccessor.scala b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/db/PhoenixAnomalyStoreAccessor.scala
index 36aea21..147d1f7 100644
--- a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/db/PhoenixAnomalyStoreAccessor.scala
+++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/db/PhoenixAnomalyStoreAccessor.scala
@@ -23,48 +23,60 @@ import java.util.concurrent.TimeUnit.SECONDS
 import org.apache.ambari.metrics.adservice.app.AnomalyDetectionAppConfig
 import org.apache.ambari.metrics.adservice.common._
 import org.apache.ambari.metrics.adservice.configuration.HBaseConfiguration
-import org.apache.ambari.metrics.adservice.metadata.MetricKey
+import org.apache.ambari.metrics.adservice.metadata.{MetricDefinitionService, MetricKey}
 import org.apache.ambari.metrics.adservice.model.AnomalyDetectionMethod.AnomalyDetectionMethod
 import org.apache.ambari.metrics.adservice.model.AnomalyType.AnomalyType
-import org.apache.ambari.metrics.adservice.model.{AnomalyDetectionMethod, AnomalyType, SingleMetricAnomalyInstance}
+import org.apache.ambari.metrics.adservice.model.{AnomalyDetectionMethod, AnomalyType, MetricAnomalyInstance}
 import org.apache.ambari.metrics.adservice.subsystem.pointintime.PointInTimeAnomalyInstance
 import org.apache.ambari.metrics.adservice.subsystem.trend.TrendAnomalyInstance
 import org.apache.hadoop.hbase.util.RetryCounterFactory
-import org.apache.hadoop.metrics2.sink.timeline.query.{DefaultPhoenixDataSource, PhoenixConnectionProvider}
+import org.slf4j.{Logger, LoggerFactory}
 
 import com.google.inject.Inject
 
-object PhoenixAnomalyStoreAccessor  {
+/**
+  * Phoenix query handler class.
+  */
+class PhoenixAnomalyStoreAccessor extends AdAnomalyStoreAccessor {
 
   @Inject
   var configuration: AnomalyDetectionAppConfig = _
 
+  @Inject
+  var metricDefinitionService: MetricDefinitionService = _
+
   var datasource: PhoenixConnectionProvider = _
+  val LOG : Logger = LoggerFactory.getLogger(classOf[PhoenixAnomalyStoreAccessor])
 
-  def initAnomalyMetricSchema(): Unit = {
+  @Override
+  def initialize(): Unit = {
 
-    val datasource: PhoenixConnectionProvider = new DefaultPhoenixDataSource(HBaseConfiguration.getHBaseConf)
+    datasource = new DefaultPhoenixDataSource(HBaseConfiguration.getHBaseConf)
     val retryCounterFactory = new RetryCounterFactory(10, SECONDS.toMillis(3).toInt)
 
     val ttl = configuration.getAdServiceConfiguration.getAnomalyDataTtl
     try {
-      var conn = datasource.getConnectionRetryingOnException(retryCounterFactory)
+      var conn : Connection = getConnectionRetryingOnException(retryCounterFactory)
       var stmt = conn.createStatement
 
+      //Create Method parameters table.
       val methodParametersSql = String.format(PhoenixQueryConstants.CREATE_METHOD_PARAMETERS_TABLE,
         PhoenixQueryConstants.METHOD_PARAMETERS_TABLE_NAME)
       stmt.executeUpdate(methodParametersSql)
 
+      //Create Point in Time anomaly table
       val pointInTimeAnomalySql = String.format(PhoenixQueryConstants.CREATE_PIT_ANOMALY_METRICS_TABLE_SQL,
         PhoenixQueryConstants.PIT_ANOMALY_METRICS_TABLE_NAME,
         ttl.asInstanceOf[Object])
       stmt.executeUpdate(pointInTimeAnomalySql)
 
+      //Create Trend Anomaly table
       val trendAnomalySql = String.format(PhoenixQueryConstants.CREATE_TREND_ANOMALY_METRICS_TABLE_SQL,
         PhoenixQueryConstants.TREND_ANOMALY_METRICS_TABLE_NAME,
         ttl.asInstanceOf[Object])
       stmt.executeUpdate(trendAnomalySql)
 
+      //Create model snapshot table.
       val snapshotSql = String.format(PhoenixQueryConstants.CREATE_MODEL_SNAPSHOT_TABLE,
         PhoenixQueryConstants.MODEL_SNAPSHOT)
       stmt.executeUpdate(snapshotSql)
@@ -75,11 +87,9 @@ object PhoenixAnomalyStoreAccessor  {
     }
   }
 
-  @throws[SQLException]
-  def getConnection: Connection = datasource.getConnection
-
-  def getSingleMetricAnomalies(anomalyType: AnomalyType, startTime: Long, endTime: Long, limit: Int) : scala.collection.mutable.MutableList[SingleMetricAnomalyInstance] = {
-    val anomalies = scala.collection.mutable.MutableList.empty[SingleMetricAnomalyInstance]
+  @Override
+  def getMetricAnomalies(anomalyType: AnomalyType, startTime: Long, endTime: Long, limit: Int) : List[MetricAnomalyInstance] = {
+    val anomalies = scala.collection.mutable.MutableList.empty[MetricAnomalyInstance]
     val conn : Connection = getConnection
     var stmt : PreparedStatement = null
     var rs : ResultSet = null
@@ -98,8 +108,8 @@ object PhoenixAnomalyStoreAccessor  {
           val anomalyScore: Double = rs.getDouble("ANOMALY_SCORE")
           val modelSnapshot: String = rs.getString("MODEL_PARAMETERS")
 
-          val metricKey: MetricKey = null //MetricManager.getMetricKeyFromUuid(uuid) //TODO
-          val anomalyInstance: SingleMetricAnomalyInstance = new PointInTimeAnomalyInstance(metricKey, timestamp,
+          val metricKey: MetricKey = metricDefinitionService.getMetricKeyFromUuid(uuid)
+          val anomalyInstance: MetricAnomalyInstance = new PointInTimeAnomalyInstance(metricKey, timestamp,
             metricValue, methodType, anomalyScore, season, modelSnapshot)
           anomalies.+=(anomalyInstance)
         }
@@ -115,8 +125,8 @@ object PhoenixAnomalyStoreAccessor  {
           val anomalyScore: Double = rs.getDouble("ANOMALY_SCORE")
           val modelSnapshot: String = rs.getString("MODEL_PARAMETERS")
 
-          val metricKey: MetricKey = null //MetricManager.getMetricKeyFromUuid(uuid) //TODO
-          val anomalyInstance: SingleMetricAnomalyInstance = TrendAnomalyInstance(metricKey,
+          val metricKey: MetricKey = metricDefinitionService.getMetricKeyFromUuid(uuid)
+          val anomalyInstance: MetricAnomalyInstance = TrendAnomalyInstance(metricKey,
             TimeRange(anomalyStart, anomalyEnd),
             TimeRange(referenceStart, referenceEnd),
             methodType, anomalyScore, season, modelSnapshot)
@@ -127,11 +137,11 @@ object PhoenixAnomalyStoreAccessor  {
       case e: SQLException => throw e
     }
 
-    anomalies
+    anomalies.toList
   }
 
   @throws[SQLException]
-  def prepareAnomalyMetricsGetSqlStatement(connection: Connection, anomalyType: AnomalyType, startTime: Long, endTime: Long, limit: Int): PreparedStatement = {
+  private def prepareAnomalyMetricsGetSqlStatement(connection: Connection, anomalyType: AnomalyType, startTime: Long, endTime: Long, limit: Int): PreparedStatement = {
 
     val sb = new StringBuilder
 
@@ -145,11 +155,11 @@ object PhoenixAnomalyStoreAccessor  {
     var stmt: java.sql.PreparedStatement = null
     try {
       stmt = connection.prepareStatement(sb.toString)
-      var pos = 1
 
-      pos += 1
+      var pos = 1
       stmt.setLong(pos, startTime)
 
+      pos += 1
       stmt.setLong(pos, endTime)
 
       stmt.setFetchSize(limit)
@@ -157,9 +167,32 @@ object PhoenixAnomalyStoreAccessor  {
     } catch {
       case e: SQLException =>
         if (stmt != null)
-          stmt
+          return stmt
         throw e
     }
     stmt
   }
+
+  @throws[SQLException]
+  private def getConnection: Connection = datasource.getConnection
+
+  @throws[SQLException]
+  @throws[InterruptedException]
+  private def getConnectionRetryingOnException (retryCounterFactory : RetryCounterFactory) : Connection = {
+    val retryCounter = retryCounterFactory.create
+    while(true) {
+      try
+        return getConnection
+      catch {
+        case e: SQLException =>
+          if (!retryCounter.shouldRetry) {
+            LOG.error("HBaseAccessor getConnection failed after " + retryCounter.getMaxAttempts + " attempts")
+            throw e
+          }
+      }
+      retryCounter.sleepUntilNextRetry()
+    }
+    null
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/ba9be802/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/db/PhoenixConnectionProvider.scala
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/db/PhoenixConnectionProvider.scala b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/db/PhoenixConnectionProvider.scala
new file mode 100644
index 0000000..1faf1ba
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/db/PhoenixConnectionProvider.scala
@@ -0,0 +1,66 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  * http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  *//**
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  * http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package org.apache.ambari.metrics.adservice.db
+
+import org.apache.hadoop.hbase.client.HBaseAdmin
+import java.io.IOException
+
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  * <p/>
+  * http://www.apache.org/licenses/LICENSE-2.0
+  * <p/>
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+trait PhoenixConnectionProvider extends ConnectionProvider {
+  /**
+    * Get HBaseAdmin for the Phoenix connection
+    *
+    * @return
+    * @throws IOException
+    */
+    @throws[IOException]
+    def getHBaseAdmin: HBaseAdmin
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ambari/blob/ba9be802/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/db/PhoenixQueryConstants.scala
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/db/PhoenixQueryConstants.scala b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/db/PhoenixQueryConstants.scala
index 5379c91..d9774e0 100644
--- a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/db/PhoenixQueryConstants.scala
+++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/db/PhoenixQueryConstants.scala
@@ -54,25 +54,25 @@ object PhoenixQueryConstants {
 
   val CREATE_TREND_ANOMALY_METRICS_TABLE_SQL: String = "CREATE TABLE IF NOT EXISTS %s (" +
     "METRIC_UUID BINARY(20) NOT NULL, " +
+    "METHOD_NAME VARCHAR, " +
     "ANOMALY_PERIOD_START UNSIGNED_LONG NOT NULL, " +
     "ANOMALY_PERIOD_END UNSIGNED_LONG NOT NULL, " +
     "TEST_PERIOD_START UNSIGNED_LONG NOT NULL, " +
     "TEST_PERIOD_END UNSIGNED_LONG NOT NULL, " +
-    "METHOD_NAME VARCHAR, " +
     "SEASONAL_INFO VARCHAR, " +
     "ANOMALY_SCORE DOUBLE, " +
     "MODEL_PARAMETERS VARCHAR, " +
     "DETECTION_TIME UNSIGNED_LONG " +
     "CONSTRAINT pk PRIMARY KEY (METRIC_UUID, METHOD_NAME, ANOMALY_PERIOD_START, ANOMALY_PERIOD_END, TEST_PERIOD_START, TEST_PERIOD_END)) " +
-    "DATA_BLOCK_ENCODING='FAST_DIFF' IMMUTABLE_ROWS=true, TTL=%s, COMPRESSION='SNAPPY'"
+    "DATA_BLOCK_ENCODING='FAST_DIFF', IMMUTABLE_ROWS=true, TTL=%s, COMPRESSION='SNAPPY'"
 
   val CREATE_MODEL_SNAPSHOT_TABLE: String = "CREATE TABLE IF NOT EXISTS %s (" +
-    "METRIC_UUID BINARY(20), " +
+    "METRIC_UUID BINARY(20) NOT NULL, " +
     "METHOD_NAME VARCHAR, " +
     "METHOD_TYPE VARCHAR, " +
-    "PARAMETERS VARCHAR " +
-    "SNAPSHOT_TIME UNSIGNED LONG NOT NULL " +
-    "CONSTRAINT pk PRIMARY KEY (METRIC_UUID, METHOD_NAME)) " +
+    "PARAMETERS VARCHAR, " +
+    "SNAPSHOT_TIME UNSIGNED_LONG NOT NULL " +
+    "CONSTRAINT pk PRIMARY KEY (METRIC_UUID, METHOD_NAME, SNAPSHOT_TIME)) " +
     "DATA_BLOCK_ENCODING='FAST_DIFF', IMMUTABLE_ROWS=true, COMPRESSION='SNAPPY'"
 
   //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

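Note: both DDL templates above keep %s placeholders for the physical table name (and, for the
trend anomaly table, the TTL). A formatting sketch follows; the table name and TTL value are
placeholders and may differ from what the store accessor actually uses.

    // Sketch only: table name and TTL below are placeholders.
    val trendAnomalyTableSql: String = String.format(
      PhoenixQueryConstants.CREATE_TREND_ANOMALY_METRICS_TABLE_SQL,
      "TREND_ANOMALY_METRICS",   // assumed physical table name
      "2592000")                 // assumed TTL of 30 days, in seconds

    // The statement can then be executed over a connection obtained from a
    // PhoenixConnectionProvider, e.g.:
    //   connection.createStatement().executeUpdate(trendAnomalyTableSql)
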
http://git-wip-us.apache.org/repos/asf/ambari/blob/ba9be802/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/leveldb/LevelDBDatasource.scala
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/leveldb/LevelDBDatasource.scala b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/leveldb/LevelDBDatasource.scala
index a34a60a..49ef272 100644
--- a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/leveldb/LevelDBDatasource.scala
+++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/leveldb/LevelDBDatasource.scala
@@ -42,7 +42,6 @@ class LevelDBDataSource() extends MetadataDatasource {
   def this(appConfig: AnomalyDetectionAppConfig) = {
     this
     this.appConfig = appConfig
-    initialize()
   }
 
   override def initialize(): Unit = {
@@ -83,6 +82,22 @@ class LevelDBDataSource() extends MetadataDatasource {
   override def get(key: Key): Option[Value] = Option(db.get(key))
 
   /**
+    * This function obtains all the values
+    *
+    * @return the list of values
+    */
+  def getAll: List[Value] = {
+    val values = scala.collection.mutable.MutableList.empty[Value]
+    val iterator = db.iterator()
+    iterator.seekToFirst()
+    while (iterator.hasNext) {
+      val entry: java.util.Map.Entry[Key, Value] = iterator.next()
+      values.+=(entry.getValue)
+    }
+    values.toList
+  }
+
+  /**
     * This function updates the DataSource by deleting, updating and inserting new (key-value) pairs.
     *
     * @param toRemove which includes all the keys to be removed from the DataSource.

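Note: getAll above walks the LevelDB iterator from the first entry and collects every value. The
iterator handle is Closeable in the iq80 LevelDB API this class appears to use, so a variant that
releases it deterministically could look like the sketch below (illustrative, not part of this patch).

    // Sketch only: same traversal as getAll, but closing the iterator when done.
    import org.iq80.leveldb.DB

    def getAllValues(db: DB): List[Array[Byte]] = {
      val iterator = db.iterator()
      try {
        val values = scala.collection.mutable.ListBuffer.empty[Array[Byte]]
        iterator.seekToFirst()
        while (iterator.hasNext) {
          values += iterator.next().getValue
        }
        values.toList
      } finally {
        iterator.close()
      }
    }
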
http://git-wip-us.apache.org/repos/asf/ambari/blob/ba9be802/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/ADMetadataProvider.scala
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/ADMetadataProvider.scala b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/ADMetadataProvider.scala
index 95b1b63..c277221 100644
--- a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/ADMetadataProvider.scala
+++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/ADMetadataProvider.scala
@@ -17,15 +17,17 @@
 
 package org.apache.ambari.metrics.adservice.metadata
 
-import java.net.{HttpURLConnection, URL}
+import javax.ws.rs.core.Response
 
 import org.apache.ambari.metrics.adservice.configuration.MetricCollectorConfiguration
 import org.apache.commons.lang.StringUtils
-import org.apache.hadoop.metrics2.sink.timeline.TimelineMetricKey
+import org.slf4j.{Logger, LoggerFactory}
 
 import com.fasterxml.jackson.databind.ObjectMapper
 import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper
 
+import scalaj.http.{Http, HttpRequest, HttpResponse}
+
 /**
   * Class to invoke Metrics Collector metadata API.
   * TODO : Instantiate a sync thread that regularly updates the internal maps by reading off AMS metadata.
@@ -36,6 +38,7 @@ class ADMetadataProvider extends MetricMetadataProvider {
   var metricCollectorPort: String = _
   var metricCollectorProtocol: String = _
   var metricMetadataPath: String = "/v1/timeline/metrics/metadata/keys"
+  val LOG : Logger = LoggerFactory.getLogger(classOf[ADMetadataProvider])
 
   val connectTimeout: Int = 10000
   val readTimeout: Int = 10000
@@ -52,10 +55,8 @@ class ADMetadataProvider extends MetricMetadataProvider {
     metricMetadataPath = configuration.getMetadataEndpoint
   }
 
-  override def getMetricKeysForDefinitions(metricSourceDefinition: MetricSourceDefinition): (Map[MetricDefinition,
-    Set[MetricKey]], Set[MetricKey]) = {
+  override def getMetricKeysForDefinitions(metricSourceDefinition: MetricSourceDefinition): Set[MetricKey] = {
 
-    val keysMap = scala.collection.mutable.Map[MetricDefinition, Set[MetricKey]]()
     val numDefinitions: Int = metricSourceDefinition.metricDefinitions.size
     val metricKeySet: scala.collection.mutable.Set[MetricKey] = scala.collection.mutable.Set.empty[MetricKey]
 
@@ -64,52 +65,79 @@ class ADMetadataProvider extends MetricMetadataProvider {
         for (host <- metricCollectorHosts) {
           val metricKeys: Set[MetricKey] = getKeysFromMetricsCollector(metricCollectorProtocol, host, metricCollectorPort, metricMetadataPath, metricDef)
           if (metricKeys != null) {
-            keysMap += (metricDef -> metricKeys)
-            metricKeySet.++(metricKeys)
+            metricKeySet.++=(metricKeys)
           }
         }
       }
     }
-    (keysMap.toMap, metricKeySet.toSet)
+    metricKeySet.toSet
   }
 
   /**
-    * Make Metrics Collector REST API call to fetch keys.
     *
-    * @param url
+    * @param protocol
+    * @param host
+    * @param port
+    * @param path
     * @param metricDefinition
     * @return
     */
   def getKeysFromMetricsCollector(protocol: String, host: String, port: String, path: String, metricDefinition: MetricDefinition): Set[MetricKey] = {
 
-    val url: String = protocol + "://" + host + port + "/" + path
+    val url: String = protocol + "://" + host + ":" + port + path
     val mapper = new ObjectMapper() with ScalaObjectMapper
+
+    if (metricDefinition.hosts == null || metricDefinition.hosts.isEmpty) {
+      val request: HttpRequest = Http(url)
+        .param("metricName", metricDefinition.metricName)
+        .param("appId", metricDefinition.appId)
+      makeHttpGetCall(request, mapper)
+    } else {
+      val metricKeySet: scala.collection.mutable.Set[MetricKey] = scala.collection.mutable.Set.empty[MetricKey]
+
+      for (h <- metricDefinition.hosts) {
+        val request: HttpRequest = Http(url)
+          .param("metricName", metricDefinition.metricName)
+          .param("appId", metricDefinition.appId)
+          .param("hostname", h)
+
+        val metricKeys = makeHttpGetCall(request, mapper)
+        metricKeySet.++=(metricKeys)
+      }
+      metricKeySet.toSet
+    }
+  }
+
+  private def makeHttpGetCall(request: HttpRequest, mapper: ObjectMapper): Set[MetricKey] = {
+
     try {
-      val connection = new URL(url).openConnection.asInstanceOf[HttpURLConnection]
-      connection.setConnectTimeout(connectTimeout)
-      connection.setReadTimeout(readTimeout)
-      connection.setRequestMethod("GET")
-      val inputStream = connection.getInputStream
-      val content = scala.io.Source.fromInputStream(inputStream).mkString
-      if (inputStream != null) inputStream.close()
-      val metricKeySet: Set[MetricKey] = fromTimelineMetricKey(mapper.readValue[java.util.Set[TimelineMetricKey]](content))
-      return metricKeySet
+      val result: HttpResponse[String] = request.asString
+      if (result.code == Response.Status.OK.getStatusCode) {
+        LOG.info("Successfully fetched metric keys from metrics collector")
+        val metricKeySet: java.util.Set[java.util.Map[String, String]] = mapper.readValue(result.body,
+          classOf[java.util.Set[java.util.Map[String, String]]])
+        return getMetricKeys(metricKeySet)
+      } else {
+        LOG.error("Got an error when trying to fetch metric key from metrics collector. Code = " + result.code + ", Message = " + result.body)
+      }
     } catch {
-      case _: java.io.IOException | _: java.net.SocketTimeoutException => // handle this
+      case _: java.io.IOException | _: java.net.SocketTimeoutException => LOG.error("Unable to fetch metric keys from Metrics collector for : " + request.toString)
     }
-    null
+    Set.empty[MetricKey]
   }
 
-  def fromTimelineMetricKey(timelineMetricKeys: java.util.Set[TimelineMetricKey]): Set[MetricKey] = {
+
+  def getMetricKeys(timelineMetricKeys: java.util.Set[java.util.Map[String, String]]): Set[MetricKey] = {
     val metricKeySet: scala.collection.mutable.Set[MetricKey] = scala.collection.mutable.Set.empty[MetricKey]
     val iter = timelineMetricKeys.iterator()
     while (iter.hasNext) {
-      val timelineMetricKey: TimelineMetricKey = iter.next()
-      val metricKey: MetricKey = MetricKey(timelineMetricKey.metricName,
-        timelineMetricKey.appId,
-        timelineMetricKey.instanceId,
-        timelineMetricKey.hostName,
-        timelineMetricKey.uuid)
+      val timelineMetricKey: java.util.Map[String, String] = iter.next()
+      val metricKey: MetricKey = MetricKey(
+        timelineMetricKey.get("metricName"),
+        timelineMetricKey.get("appId"),
+        timelineMetricKey.get("instanceId"),
+        timelineMetricKey.get("hostname"),
+        timelineMetricKey.get("uuid").getBytes())
 
       metricKeySet.add(metricKey)
     }

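Note: the provider now issues the metadata request through scalaj-http and deserializes the
response into a set of maps carrying metricName, appId, instanceId, hostname and uuid (see
getMetricKeys above). A standalone sketch of the same call follows; the host, port, metric name
and appId are placeholders for illustration only.

    // Sketch only: host, port, metric name and appId are placeholders.
    import scalaj.http.{Http, HttpResponse}

    val response: HttpResponse[String] =
      Http("http://ams-collector.example.com:6188/v1/timeline/metrics/metadata/keys")
        .param("metricName", "mem_free")
        .param("appId", "HOST")
        .timeout(connTimeoutMs = 10000, readTimeoutMs = 10000)
        .asString

    if (response.code == 200) {
      println(response.body)   // JSON collection of {metricName, appId, instanceId, hostname, uuid} maps
    }
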
http://git-wip-us.apache.org/repos/asf/ambari/blob/ba9be802/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/InputMetricDefinitionParser.scala
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/InputMetricDefinitionParser.scala b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/InputMetricDefinitionParser.scala
index cc66c90..3c8ea84 100644
--- a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/InputMetricDefinitionParser.scala
+++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/InputMetricDefinitionParser.scala
@@ -19,6 +19,8 @@ package org.apache.ambari.metrics.adservice.metadata
 
 import java.io.File
 
+import org.apache.ambari.metrics.adservice.app.ADServiceScalaModule
+
 import com.fasterxml.jackson.databind.ObjectMapper
 import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper
 
@@ -30,15 +32,19 @@ object InputMetricDefinitionParser {
       return List.empty[MetricSourceDefinition]
     }
     val mapper = new ObjectMapper() with ScalaObjectMapper
-
-    def metricSourceDefinitions: List[MetricSourceDefinition] =
-      for {
-        file <- getFilesInDirectory(directory)
-        definition: MetricSourceDefinition = mapper.readValue[MetricSourceDefinition](file)
-        if definition != null
-      } yield definition
-
-    metricSourceDefinitions
+    mapper.registerModule(new ADServiceScalaModule)
+    val metricSourceDefinitions: scala.collection.mutable.MutableList[MetricSourceDefinition] =
+      scala.collection.mutable.MutableList.empty[MetricSourceDefinition]
+
+    for (file <- getFilesInDirectory(directory)) {
+      val source = scala.io.Source.fromFile(file)
+      val lines = try source.mkString finally source.close()
+      val definition: MetricSourceDefinition = mapper.readValue[MetricSourceDefinition](lines)
+      if (definition != null) {
+        metricSourceDefinitions.+=(definition)
+      }
+    }
+    metricSourceDefinitions.toList
   }
 
   private def getFilesInDirectory(directory: String): List[File] = {

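Note: the parser now reads each definition file into a string and deserializes it with the
ADServiceScalaModule registered, instead of handing the File directly to Jackson. A usage sketch
follows; the entry-point name and the directory path are assumptions, not taken from this hunk.

    // Sketch only: method name and path are assumptions for illustration.
    val definitions: List[MetricSourceDefinition] =
      InputMetricDefinitionParser.parseInputDefinitionsFromDirectory("/etc/ambari-metrics-anomaly-detection/conf")
    definitions.foreach(d => println("Loaded definition: " + d.definitionName))
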
http://git-wip-us.apache.org/repos/asf/ambari/blob/ba9be802/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/MetricDefinition.scala
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/MetricDefinition.scala b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/MetricDefinition.scala
index 036867b..c668dfa 100644
--- a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/MetricDefinition.scala
+++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/MetricDefinition.scala
@@ -19,6 +19,8 @@
 package org.apache.ambari.metrics.adservice.metadata
 
 import org.apache.commons.lang3.StringUtils
+
+import com.fasterxml.jackson.annotation.JsonIgnore
 /*
    {
        "metric-name": "mem_free",

http://git-wip-us.apache.org/repos/asf/ambari/blob/ba9be802/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/MetricDefinitionService.scala
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/MetricDefinitionService.scala b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/MetricDefinitionService.scala
index 635dc60..52ce39e 100644
--- a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/MetricDefinitionService.scala
+++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/MetricDefinitionService.scala
@@ -17,7 +17,9 @@
 
 package org.apache.ambari.metrics.adservice.metadata
 
-trait MetricDefinitionService {
+import org.apache.ambari.metrics.adservice.service.AbstractADService
+
+trait MetricDefinitionService extends AbstractADService {
 
   /**
     * Given a 'UUID', return the metric key associated with it.
@@ -27,6 +29,12 @@ trait MetricDefinitionService {
   def getMetricKeyFromUuid(uuid: Array[Byte]) : MetricKey
 
   /**
+    * Return all the definitions being tracked.
+    * @return List of Metric Source Definitions being tracked.
+    */
+  def getDefinitions: List[MetricSourceDefinition]
+
+  /**
     * Given a component definition name, return the definition associated with it.
     * @param name component definition name
     * @return
@@ -61,4 +69,10 @@ trait MetricDefinitionService {
     */
   def getDefinitionByAppId(appId: String) : List[MetricSourceDefinition]
 
+  /**
+    * Return the mapping from definition name to its set of metric keys.
+    * @return Map of Metric Source Definition name to the set of metric keys associated with it.
+    */
+  def getMetricKeys: Map[String, Set[MetricKey]]
+
 }

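Note: with getDefinitions and getMetricKeys added, callers can enumerate everything the service
tracks without knowing definition names up front. A caller-side sketch (illustrative only):

    // Sketch only: enumerating tracked definitions and their resolved metric keys.
    def logTrackedKeys(service: MetricDefinitionService): Unit = {
      val keysByDefinition: Map[String, Set[MetricKey]] = service.getMetricKeys
      for (definition <- service.getDefinitions) {
        val keys = keysByDefinition.getOrElse(definition.definitionName, Set.empty[MetricKey])
        println(definition.definitionName + " -> " + keys.size + " metric keys")
      }
    }
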
http://git-wip-us.apache.org/repos/asf/ambari/blob/ba9be802/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/MetricDefinitionServiceImpl.scala
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/MetricDefinitionServiceImpl.scala b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/MetricDefinitionServiceImpl.scala
index c34d2dd..b9b4a7c 100644
--- a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/MetricDefinitionServiceImpl.scala
+++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/MetricDefinitionServiceImpl.scala
@@ -32,31 +32,24 @@ class MetricDefinitionServiceImpl extends MetricDefinitionService {
   var configuration: AnomalyDetectionAppConfig = _
   var metricMetadataProvider: MetricMetadataProvider = _
 
-  var metricSourceDefinitionMap: Map[String, MetricSourceDefinition] = Map()
-  var metricKeys: Set[MetricKey] = Set.empty[MetricKey]
-  var metricDefinitionMetricKeyMap: Map[MetricDefinition, Set[MetricKey]] = Map()
+  val metricSourceDefinitionMap: scala.collection.mutable.Map[String, MetricSourceDefinition] = scala.collection.mutable.Map()
+  val metricDefinitionMetricKeyMap: scala.collection.mutable.Map[MetricSourceDefinition, Set[MetricKey]] = scala.collection.mutable.Map()
+  val metricKeys: scala.collection.mutable.Set[MetricKey] = scala.collection.mutable.Set.empty[MetricKey]
 
   @Inject
   def this (anomalyDetectionAppConfig: AnomalyDetectionAppConfig, metadataStoreAccessor: AdMetadataStoreAccessor) = {
     this ()
     adMetadataStoreAccessor = metadataStoreAccessor
     configuration = anomalyDetectionAppConfig
-    initializeService()
   }
 
-  def initializeService() : Unit = {
-
-    //Create AD Metadata Schema
-    //TODO Make sure AD Metadata DB is initialized here.
+  @Override
+  def initialize() : Unit = {
+    LOG.info("Initializing Metric Definition Service...")
 
     //Initialize Metric Metadata Provider
     metricMetadataProvider = new ADMetadataProvider(configuration.getMetricCollectorConfiguration)
 
-    loadMetricSourceDefinitions()
-  }
-
-  def loadMetricSourceDefinitions() : Unit = {
-
     //Load definitions from metadata store
     val definitionsFromStore: List[MetricSourceDefinition] = adMetadataStoreAccessor.getSavedInputDefinitions
     for (definition <- definitionsFromStore) {
@@ -71,14 +64,16 @@ class MetricDefinitionServiceImpl extends MetricDefinitionService {
 
     //Union the 2 sources, with DB taking precedence.
     //Save new definition list to DB.
-    metricSourceDefinitionMap = metricSourceDefinitionMap.++(combineDefinitionSources(definitionsFromConfig, definitionsFromStore))
+    metricSourceDefinitionMap.++=(combineDefinitionSources(definitionsFromConfig, definitionsFromStore))
 
-    //Reach out to AMS Metadata and get Metric Keys. Pass in List<CD> and get back (Map<MD,Set<MK>>, Set<MK>)
+    //Reach out to AMS Metadata and get Metric Keys. Pass in MSD and get back Set<MK>
     for (definition <- metricSourceDefinitionMap.values) {
-      val (definitionKeyMap: Map[MetricDefinition, Set[MetricKey]], keys: Set[MetricKey])= metricMetadataProvider.getMetricKeysForDefinitions(definition)
-      metricDefinitionMetricKeyMap = metricDefinitionMetricKeyMap.++(definitionKeyMap)
-      metricKeys = metricKeys.++(keys)
+      val keys: Set[MetricKey] = metricMetadataProvider.getMetricKeysForDefinitions(definition)
+      metricDefinitionMetricKeyMap(definition) = keys
+      metricKeys.++=(keys)
     }
+
+    LOG.info("Successfully initialized Metric Definition Service.")
   }
 
   def getMetricKeyFromUuid(uuid: Array[Byte]): MetricKey = {
@@ -92,16 +87,24 @@ class MetricDefinitionServiceImpl extends MetricDefinitionService {
   }
 
   @Override
+  def getDefinitions: List[MetricSourceDefinition] = {
+    metricSourceDefinitionMap.values.toList
+  }
+
+  @Override
   def getDefinitionByName(name: String): MetricSourceDefinition = {
     if (!metricSourceDefinitionMap.contains(name)) {
       LOG.warn("Metric Source Definition with name " + name + " not found")
+      null
+    } else {
+      metricSourceDefinitionMap.apply(name)
     }
-    metricSourceDefinitionMap.apply(name)
   }
 
   @Override
   def addDefinition(definition: MetricSourceDefinition): Boolean = {
     if (metricSourceDefinitionMap.contains(definition.definitionName)) {
+      LOG.info("Definition with name " + definition.definitionName + " already present.")
       return false
     }
     definition.definitionSource = MetricSourceDefinitionType.API
@@ -109,6 +112,10 @@ class MetricDefinitionServiceImpl extends MetricDefinitionService {
     val success: Boolean = adMetadataStoreAccessor.saveInputDefinition(definition)
     if (success) {
       metricSourceDefinitionMap += definition.definitionName -> definition
+      val keys: Set[MetricKey] = metricMetadataProvider.getMetricKeysForDefinitions(definition)
+      metricDefinitionMetricKeyMap(definition) = keys
+      metricKeys.++=(keys)
+      LOG.info("Successfully created metric source definition : " + definition.definitionName)
     }
     success
   }
@@ -116,16 +123,22 @@ class MetricDefinitionServiceImpl extends MetricDefinitionService {
   @Override
   def updateDefinition(definition: MetricSourceDefinition): Boolean = {
     if (!metricSourceDefinitionMap.contains(definition.definitionName)) {
+      LOG.warn("Metric Source Definition with name " + definition.definitionName + " not found")
       return false
     }
 
     if (metricSourceDefinitionMap.apply(definition.definitionName).definitionSource != MetricSourceDefinitionType.API) {
       return false
     }
+    definition.definitionSource = MetricSourceDefinitionType.API
 
     val success: Boolean = adMetadataStoreAccessor.saveInputDefinition(definition)
     if (success) {
       metricSourceDefinitionMap += definition.definitionName -> definition
+      val keys: Set[MetricKey] = metricMetadataProvider.getMetricKeysForDefinitions(definition)
+      metricDefinitionMetricKeyMap(definition) = keys
+      metricKeys.++=(keys)
+      LOG.info("Successfully updated metric source definition : " + definition.definitionName)
     }
     success
   }
@@ -133,17 +146,22 @@ class MetricDefinitionServiceImpl extends MetricDefinitionService {
   @Override
   def deleteDefinitionByName(name: String): Boolean = {
     if (!metricSourceDefinitionMap.contains(name)) {
+      LOG.warn("Metric Source Definition with name " + name + " not found")
       return false
     }
 
     val definition : MetricSourceDefinition = metricSourceDefinitionMap.apply(name)
     if (definition.definitionSource != MetricSourceDefinitionType.API) {
+      LOG.warn("Cannot delete metric source definition which was not created through API.")
       return false
     }
 
     val success: Boolean = adMetadataStoreAccessor.removeInputDefinition(name)
     if (success) {
-      metricSourceDefinitionMap += definition.definitionName -> definition
+      metricSourceDefinitionMap -= definition.definitionName
+      metricKeys.--=(metricDefinitionMetricKeyMap.apply(definition))
+      metricDefinitionMetricKeyMap -= definition
+      LOG.info("Successfully deleted metric source definition : " + name)
     }
     success
   }
@@ -183,7 +201,6 @@ class MetricDefinitionServiceImpl extends MetricDefinitionService {
     this.adMetadataStoreAccessor = adMetadataStoreAccessor
   }
 
-
   /**
     * Look into the Metric Definitions inside a Metric Source definition, and push down source level appId &
     * hosts to Metric definition if they do not have an override.
@@ -202,7 +219,7 @@ class MetricDefinitionServiceImpl extends MetricDefinitionService {
         }
       }
 
-      if (metricDef.isValid && metricDef.hosts.isEmpty) {
+      if (metricDef.isValid && (metricDef.hosts == null || metricDef.hosts.isEmpty)) {
         if (sourceLevelHostList != null && sourceLevelHostList.nonEmpty) {
           metricDef.hosts = sourceLevelHostList
         }
@@ -210,4 +227,16 @@ class MetricDefinitionServiceImpl extends MetricDefinitionService {
     }
   }
 
+  /**
+    * Return the mapping from definition name to its set of metric keys.
+    *
+    * @return Map of Metric Source Definition name to the set of metric keys associated with it.
+    */
+  override def getMetricKeys: Map[String, Set[MetricKey]] = {
+    val metricKeyMap: scala.collection.mutable.Map[String, Set[MetricKey]] = scala.collection.mutable.Map()
+    for (definition <- metricSourceDefinitionMap.values) {
+      metricKeyMap(definition.definitionName) = metricDefinitionMetricKeyMap.apply(definition)
+    }
+    metricKeyMap.toMap
+  }
 }

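Note: initialization moved out of the constructor into initialize(), inherited from
AbstractADService, so definitions are loaded and metric keys fetched only when the service is
started. A wiring sketch for a test or local run follows (illustrative; normally Guice injects
the constructor arguments).

    // Sketch only: appConfig and metadataStoreAccessor are placeholders supplied by the caller.
    val service = new MetricDefinitionServiceImpl(appConfig, metadataStoreAccessor)
    service.initialize()   // loads store + config definitions and resolves metric keys from AMS
    println(service.getDefinitions.size + " metric source definitions tracked")
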
http://git-wip-us.apache.org/repos/asf/ambari/blob/ba9be802/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/MetricKey.scala
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/MetricKey.scala b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/MetricKey.scala
index afad617..65c496e 100644
--- a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/MetricKey.scala
+++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/MetricKey.scala
@@ -18,6 +18,9 @@
 
 package org.apache.ambari.metrics.adservice.metadata
 
+import javax.xml.bind.annotation.XmlRootElement
+
+@XmlRootElement
 case class MetricKey (metricName: String, appId: String, instanceId: String, hostname: String, uuid: Array[Byte]) {
 
   @Override

http://git-wip-us.apache.org/repos/asf/ambari/blob/ba9be802/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/MetricMetadataProvider.scala
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/MetricMetadataProvider.scala b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/MetricMetadataProvider.scala
index 5f9c0a0..b5ba15e 100644
--- a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/MetricMetadataProvider.scala
+++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/MetricMetadataProvider.scala
@@ -27,5 +27,5 @@ trait MetricMetadataProvider {
     * @param metricSourceDefinition component definition
     * @return
     */
-  def getMetricKeysForDefinitions(metricSourceDefinition: MetricSourceDefinition): (Map[MetricDefinition, Set[MetricKey]], Set[MetricKey])
+  def getMetricKeysForDefinitions(metricSourceDefinition: MetricSourceDefinition): Set[MetricKey]
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/ba9be802/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/model/MetricAnomalyInstance.scala
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/model/MetricAnomalyInstance.scala b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/model/MetricAnomalyInstance.scala
new file mode 100644
index 0000000..248a380
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/model/MetricAnomalyInstance.scala
@@ -0,0 +1,32 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  * http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package org.apache.ambari.metrics.adservice.model
+
+import javax.xml.bind.annotation.XmlRootElement
+
+import org.apache.ambari.metrics.adservice.metadata.MetricKey
+import org.apache.ambari.metrics.adservice.model.AnomalyType.AnomalyType
+
+@XmlRootElement
+abstract class MetricAnomalyInstance {
+
+  val metricKey: MetricKey
+  val anomalyType: AnomalyType
+
+}
\ No newline at end of file

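Note: MetricAnomalyInstance replaces SingleMetricAnomalyInstance (removed below) as the common
base returned by the anomaly resources; the point-in-time and trend subsystem classes in this
patch extend it. A dispatch sketch follows (illustrative only; the AnomalyType value names are
assumed).

    // Sketch only: branching on the anomaly type of a returned instance.
    def describe(anomaly: MetricAnomalyInstance): String = anomaly.anomalyType match {
      case AnomalyType.POINT_IN_TIME => "point-in-time anomaly on " + anomaly.metricKey.metricName
      case AnomalyType.TREND         => "trend anomaly on " + anomaly.metricKey.metricName
      case _                         => "unrecognized anomaly type"
    }
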
http://git-wip-us.apache.org/repos/asf/ambari/blob/ba9be802/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/model/SingleMetricAnomalyInstance.scala
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/model/SingleMetricAnomalyInstance.scala b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/model/SingleMetricAnomalyInstance.scala
deleted file mode 100644
index 981a893..0000000
--- a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/model/SingleMetricAnomalyInstance.scala
+++ /dev/null
@@ -1,29 +0,0 @@
-/**
-  * Licensed to the Apache Software Foundation (ASF) under one
-  * or more contributor license agreements.  See the NOTICE file
-  * distributed with this work for additional information
-  * regarding copyright ownership.  The ASF licenses this file
-  * to you under the Apache License, Version 2.0 (the
-  * "License"); you may not use this file except in compliance
-  * with the License.  You may obtain a copy of the License at
-  *
-  * http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
-
-package org.apache.ambari.metrics.adservice.model
-
-import org.apache.ambari.metrics.adservice.metadata.MetricKey
-import org.apache.ambari.metrics.adservice.model.AnomalyType.AnomalyType
-
-abstract class SingleMetricAnomalyInstance {
-
-  val metricKey: MetricKey
-  val anomalyType: AnomalyType
-
-}