Posted to commits@ambari.apache.org by av...@apache.org on 2018/04/01 19:14:20 UTC

[ambari] 24/39: AMBARI-22343. Add ability in AMS to tee metrics to a set of configured Kafka brokers. (swagle)

This is an automated email from the ASF dual-hosted git repository.

avijayan pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/ambari.git

commit 64dcc02abc94859f9df316767b7f4589ea0b1502
Author: Siddharth Wagle <sw...@hortonworks.com>
AuthorDate: Tue Oct 31 11:34:45 2017 -0700

    AMBARI-22343. Add ability in AMS to tee metrics to a set of configured Kafka brokers. (swagle)
---
 .../libraries/functions/package_conditions.py      |   4 +
 .../pom.xml                                        |  11 ++
 .../src/main/resources/config.yml                  |   3 -
 .../ambari-metrics-timelineservice/pom.xml         | 127 ++++++++++++---------
 .../metrics/timeline/PhoenixHBaseAccessor.java     |  29 ++---
 .../timeline/TimelineMetricConfiguration.java      |  55 ++++++---
 .../timeline/sink/ExternalSinkProvider.java        |  12 +-
 .../metrics/timeline/sink/HttpSinkProvider.java    |   4 +-
 .../metrics/timeline/sink/KafkaSinkProvider.java   | 118 +++++++++++++++++++
 .../DefaultInternalMetricsSourceProvider.java      |   2 +-
 .../metrics/timeline/source/RawMetricsSource.java  |  17 +--
 .../0.1.0/configuration/ams-admanager-config.xml   |  60 ++++++++++
 .../0.1.0/configuration/ams-admanager-env.xml      | 105 +++++++++++++++++
 .../0.1.0/configuration/ams-site.xml               |   1 -
 .../AMBARI_METRICS/0.1.0/metainfo.xml              |  41 ++++++-
 .../AMBARI_METRICS/0.1.0/package/scripts/ams.py    |  35 ++++++
 .../0.1.0/package/scripts/ams_admanager.py         |  73 ++++++++++++
 .../AMBARI_METRICS/0.1.0/package/scripts/params.py |   9 ++
 .../AMBARI_METRICS/0.1.0/package/scripts/status.py |   2 +
 .../0.1.0/package/scripts/status_params.py         |   2 +
 .../package/templates/admanager_config.yaml.j2     |  24 ++++
 21 files changed, 619 insertions(+), 115 deletions(-)
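
In short, the patch turns the collector's single external sink into a list of providers and adds a KafkaSinkProvider whose RAW_METRICS sink serializes each committed batch to JSON and sends it to a configured Kafka topic. A minimal sketch of the new sink contract being exercised (not part of the patch; the class name is illustrative, and it assumes the timeline.metrics.external.sink.kafka.* properties introduced below are already set in ams-site):

    import java.util.Collections;

    import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
    import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.sink.ExternalMetricsSink;
    import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.sink.KafkaSinkProvider;
    import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.source.InternalSourceProvider.SOURCE_NAME;

    public class KafkaTeeSketch {
      public static void main(String[] args) {
        // KafkaSinkProvider reads its producer settings from the metrics configuration
        // (see the timeline.metrics.external.sink.kafka.* keys added in this patch).
        ExternalMetricsSink rawSink =
            new KafkaSinkProvider().getExternalMetricsSink(SOURCE_NAME.RAW_METRICS);

        TimelineMetrics batch = new TimelineMetrics(); // empty batch, only to show the call shape
        rawSink.sinkMetricData(Collections.singletonList(batch));
      }
    }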

diff --git a/ambari-common/src/main/python/resource_management/libraries/functions/package_conditions.py b/ambari-common/src/main/python/resource_management/libraries/functions/package_conditions.py
index ebc1aba..64cda98 100644
--- a/ambari-common/src/main/python/resource_management/libraries/functions/package_conditions.py
+++ b/ambari-common/src/main/python/resource_management/libraries/functions/package_conditions.py
@@ -50,6 +50,10 @@ def should_install_phoenix():
   has_phoenix = len(phoenix_hosts) > 0
   return phoenix_enabled or has_phoenix
 
+def should_install_ams_admanager():
+  config = Script.get_config()
+  return _has_applicable_local_component(config, ["AD_MANAGER"])
+
 def should_install_ams_collector():
   config = Script.get_config()
   return _has_applicable_local_component(config, ["METRICS_COLLECTOR"])
diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/pom.xml b/ambari-metrics/ambari-metrics-anomaly-detection-service/pom.xml
index 554d026..e96e957 100644
--- a/ambari-metrics/ambari-metrics-anomaly-detection-service/pom.xml
+++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/pom.xml
@@ -296,6 +296,12 @@
       <artifactId>spark-mllib_${scala.binary.version}</artifactId>
       <version>${spark.version}</version>
       <scope>provided</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>com.fasterxml.jackson.core</groupId>
+          <artifactId>jackson-databind</artifactId>
+        </exclusion>
+      </exclusions>
     </dependency>
     <dependency>
       <groupId>org.apache.hadoop</groupId>
@@ -419,6 +425,11 @@
       <version>${jackson.version}</version>
     </dependency>
     <dependency>
+      <groupId>com.fasterxml.jackson.core</groupId>
+      <artifactId>jackson-databind</artifactId>
+      <version>${jackson.version}</version>
+    </dependency>
+    <dependency>
       <groupId>junit</groupId>
       <artifactId>junit</artifactId>
       <version>4.12</version>
diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/resources/config.yml b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/resources/config.yml
index 9ca9e95..bd88d57 100644
--- a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/resources/config.yml
+++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/resources/config.yml
@@ -2,9 +2,6 @@ server:
   applicationConnectors:
    - type: http
      port: 9999
-  adminConnectors:
-    - type: http
-      port: 9990
   requestLog:
     type: external
 
diff --git a/ambari-metrics/ambari-metrics-timelineservice/pom.xml b/ambari-metrics/ambari-metrics-timelineservice/pom.xml
index 3d119f9..7794a11 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/pom.xml
+++ b/ambari-metrics/ambari-metrics-timelineservice/pom.xml
@@ -80,7 +80,8 @@
               <outputDirectory>${project.build.directory}/lib</outputDirectory>
               <includeScope>compile</includeScope>
               <excludeScope>test</excludeScope>
-              <excludeArtifactIds>jasper-runtime,jasper-compiler</excludeArtifactIds>
+              <excludeArtifactIds>jasper-runtime,jasper-compiler
+              </excludeArtifactIds>
             </configuration>
           </execution>
         </executions>
@@ -125,11 +126,13 @@
                 <source>
                   <location>target/lib</location>
                   <excludes>
-                  <exclude>*tests.jar</exclude>
+                    <exclude>*tests.jar</exclude>
                   </excludes>
                 </source>
                 <source>
-                  <location>${project.build.directory}/${project.artifactId}-${project.version}.jar</location>
+                  <location>
+                    ${project.build.directory}/${project.artifactId}-${project.version}.jar
+                  </location>
                 </source>
               </sources>
             </mapping>
@@ -214,7 +217,9 @@
                   <location>conf/unix/amshbase_metrics_whitelist</location>
                 </source>
                 <source>
-                  <location>target/embedded/${hbase.folder}/conf/hbase-site.xml</location>
+                  <location>
+                    target/embedded/${hbase.folder}/conf/hbase-site.xml
+                  </location>
                 </source>
               </sources>
             </mapping>
@@ -287,7 +292,8 @@
           <skip>true</skip>
           <attach>false</attach>
           <submodules>false</submodules>
-          <controlDir>${project.basedir}/../src/main/package/deb/control</controlDir>
+          <controlDir>${project.basedir}/../src/main/package/deb/control
+          </controlDir>
         </configuration>
       </plugin>
     </plugins>
@@ -657,23 +663,29 @@
       <scope>test</scope>
       <classifier>tests</classifier>
     </dependency>
-      <dependency>
-        <groupId>org.apache.hbase</groupId>
-        <artifactId>hbase-testing-util</artifactId>
-        <version>${hbase.version}</version>
-        <scope>test</scope>
-        <optional>true</optional>
-        <exclusions>
-          <exclusion>
-            <groupId>org.jruby</groupId>
-            <artifactId>jruby-complete</artifactId>
-          </exclusion>
-          <exclusion>
-            <artifactId>zookeeper</artifactId>
-            <groupId>org.apache.zookeeper</groupId>
-          </exclusion>
-        </exclusions>
-      </dependency>
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-testing-util</artifactId>
+      <version>${hbase.version}</version>
+      <scope>test</scope>
+      <optional>true</optional>
+      <exclusions>
+        <exclusion>
+          <groupId>org.jruby</groupId>
+          <artifactId>jruby-complete</artifactId>
+        </exclusion>
+        <exclusion>
+          <artifactId>zookeeper</artifactId>
+          <groupId>org.apache.zookeeper</groupId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.kafka</groupId>
+      <artifactId>kafka-clients</artifactId>
+      <version>0.11.0.1</version>
+    </dependency>
+
     <dependency>
       <groupId>org.powermock</groupId>
       <artifactId>powermock-module-junit4</artifactId>
@@ -731,17 +743,17 @@
                 </goals>
                 <configuration>
                   <target name="Download HBase">
-                    <mkdir dir="${project.build.directory}/embedded" />
+                    <mkdir dir="${project.build.directory}/embedded"/>
                     <get
-                      src="${hbase.tar}"
-                      dest="${project.build.directory}/embedded/hbase.tar.gz"
-                      usetimestamp="true"
-                      />
+                        src="${hbase.tar}"
+                        dest="${project.build.directory}/embedded/hbase.tar.gz"
+                        usetimestamp="true"
+                    />
                     <untar
-                      src="${project.build.directory}/embedded/hbase.tar.gz"
-                      dest="${project.build.directory}/embedded"
-                      compression="gzip"
-                      />
+                        src="${project.build.directory}/embedded/hbase.tar.gz"
+                        dest="${project.build.directory}/embedded"
+                        compression="gzip"
+                    />
                   </target>
                 </configuration>
               </execution>
@@ -755,19 +767,19 @@
                   <target name="Download Phoenix">
                     <mkdir dir="${project.build.directory}/embedded"/>
                     <get
-                      src="${phoenix.tar}"
-                      dest="${project.build.directory}/embedded/phoenix.tar.gz"
-                      usetimestamp="true"
-                      />
+                        src="${phoenix.tar}"
+                        dest="${project.build.directory}/embedded/phoenix.tar.gz"
+                        usetimestamp="true"
+                    />
                     <untar
-                      src="${project.build.directory}/embedded/phoenix.tar.gz"
-                      dest="${project.build.directory}/embedded"
-                      compression="gzip"
-                      />
+                        src="${project.build.directory}/embedded/phoenix.tar.gz"
+                        dest="${project.build.directory}/embedded"
+                        compression="gzip"
+                    />
                     <move
-                      file="${project.build.directory}/embedded/${phoenix.folder}/phoenix-${phoenix.version}-server.jar"
-                      tofile="${project.build.directory}/embedded/${hbase.folder}/lib/phoenix-${phoenix.version}-server.jar"
-                      />
+                        file="${project.build.directory}/embedded/${phoenix.folder}/phoenix-${phoenix.version}-server.jar"
+                        tofile="${project.build.directory}/embedded/${hbase.folder}/lib/phoenix-${phoenix.version}-server.jar"
+                    />
                   </target>
                 </configuration>
               </execution>
@@ -798,24 +810,24 @@
                 </goals>
                 <configuration>
                   <target name="Download HBase">
-                    <mkdir dir="${project.build.directory}/embedded" />
+                    <mkdir dir="${project.build.directory}/embedded"/>
                     <get
-                      src="${hbase.winpkg.zip}"
-                      dest="${project.build.directory}/embedded/hbase.zip"
-                      usetimestamp="true"
-                      />
+                        src="${hbase.winpkg.zip}"
+                        dest="${project.build.directory}/embedded/hbase.zip"
+                        usetimestamp="true"
+                    />
                     <unzip
-                      src="${project.build.directory}/embedded/hbase.zip"
-                      dest="${project.build.directory}/embedded/hbase.temp"
-                      />
+                        src="${project.build.directory}/embedded/hbase.zip"
+                        dest="${project.build.directory}/embedded/hbase.temp"
+                    />
                     <unzip
-                      src="${project.build.directory}/embedded/hbase.temp/resources/${hbase.winpkg.folder}.zip"
-                      dest="${project.build.directory}/embedded"
-                      />
+                        src="${project.build.directory}/embedded/hbase.temp/resources/${hbase.winpkg.folder}.zip"
+                        dest="${project.build.directory}/embedded"
+                    />
                     <copy
-                      file="${project.build.directory}/embedded/hbase.temp/resources/servicehost.exe"
-                      tofile="${project.build.directory}/embedded/${hbase.winpkg.folder}/bin/ams_hbase_master.exe"
-                      />
+                        file="${project.build.directory}/embedded/hbase.temp/resources/servicehost.exe"
+                        tofile="${project.build.directory}/embedded/${hbase.winpkg.folder}/bin/ams_hbase_master.exe"
+                    />
                   </target>
                 </configuration>
               </execution>
@@ -854,7 +866,8 @@
             <!-- The configuration of the plugin -->
             <configuration>
               <!-- Configuration of the archiver -->
-              <finalName>${project.artifactId}-simulator-${project.version}</finalName>
+              <finalName>${project.artifactId}-simulator-${project.version}
+              </finalName>
               <archive>
                 <!-- Manifest specific configuration -->
                 <manifest>
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java
index f8d31f7..65b4614 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java
@@ -120,7 +120,6 @@ import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.client.Durability;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.hadoop.hbase.util.RetryCounter;
 import org.apache.hadoop.hbase.util.RetryCounterFactory;
 import org.apache.hadoop.metrics2.sink.timeline.ContainerMetric;
 import org.apache.hadoop.metrics2.sink.timeline.MetricClusterAggregate;
@@ -211,7 +210,7 @@ public class PhoenixHBaseAccessor {
   private HashMap<String, String> tableTTL = new HashMap<>();
 
   private final TimelineMetricConfiguration configuration;
-  private InternalMetricsSource rawMetricsSource;
+  private List<InternalMetricsSource> rawMetricsSources;
 
   public PhoenixHBaseAccessor(PhoenixConnectionProvider dataSource) {
     this(TimelineMetricConfiguration.getInstance(), dataSource);
@@ -278,15 +277,17 @@ public class PhoenixHBaseAccessor {
       LOG.info("Initialized aggregator sink class " + metricSinkClass);
     }
 
-    ExternalSinkProvider externalSinkProvider = configuration.getExternalSinkProvider();
+    List<ExternalSinkProvider> externalSinkProviderList = configuration.getExternalSinkProviderList();
     InternalSourceProvider internalSourceProvider = configuration.getInternalSourceProvider();
-    if (externalSinkProvider != null) {
-      ExternalMetricsSink rawMetricsSink = externalSinkProvider.getExternalMetricsSink(RAW_METRICS);
-      int interval = configuration.getExternalSinkInterval(RAW_METRICS);
-      if (interval == -1){
-        interval = cacheCommitInterval;
+    if (!externalSinkProviderList.isEmpty()) {
+      for (ExternalSinkProvider externalSinkProvider : externalSinkProviderList) {
+        ExternalMetricsSink rawMetricsSink = externalSinkProvider.getExternalMetricsSink(RAW_METRICS);
+        int interval = configuration.getExternalSinkInterval(externalSinkProvider.getClass().getSimpleName(), RAW_METRICS);
+        if (interval == -1) {
+          interval = cacheCommitInterval;
+        }
+        rawMetricsSources.add(internalSourceProvider.getInternalMetricsSource(RAW_METRICS, interval, rawMetricsSink));
       }
-      rawMetricsSource = internalSourceProvider.getInternalMetricsSource(RAW_METRICS, interval, rawMetricsSink);
     }
     TIMELINE_METRIC_READ_HELPER = new TimelineMetricReadHelper(this.metadataManagerInstance);
   }
@@ -303,8 +304,10 @@ public class PhoenixHBaseAccessor {
     }
     if (metricsList.size() > 0) {
       commitMetrics(metricsList);
-      if (rawMetricsSource != null) {
-        rawMetricsSource.publishTimelineMetrics(metricsList);
+      if (!rawMetricsSources.isEmpty()) {
+        for (InternalMetricsSource rawMetricsSource : rawMetricsSources) {
+          rawMetricsSource.publishTimelineMetrics(metricsList);
+        }
       }
     }
   }
@@ -316,10 +319,8 @@ public class PhoenixHBaseAccessor {
   private void commitAnomalyMetric(Connection conn, TimelineMetric metric) {
     PreparedStatement metricRecordStmt = null;
     try {
-
       Map<String, String> metricMetadata = metric.getMetadata();
-
-
+      
       byte[] uuid = metadataManagerInstance.getUuid(metric);
       if (uuid == null) {
         LOG.error("Error computing UUID for metric. Cannot write metrics : " + metric.toString());
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java
index 929fc8c..395ec7b 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java
@@ -26,6 +26,7 @@ import java.net.MalformedURLException;
 import java.net.URISyntaxException;
 import java.net.URL;
 import java.net.UnknownHostException;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashSet;
@@ -51,8 +52,6 @@ import org.apache.log4j.Logger;
  * Configuration class that reads properties from ams-site.xml. All values
  * for time or intervals are given in seconds.
  */
-@InterfaceAudience.Public
-@InterfaceStability.Evolving
 public class TimelineMetricConfiguration {
   private static final Log LOG = LogFactory.getLog(TimelineMetricConfiguration.class);
 
@@ -343,14 +342,22 @@ public class TimelineMetricConfiguration {
   public static final String TIMELINE_METRICS_COLLECTOR_IGNITE_BACKUPS = "timeline.metrics.collector.ignite.nodes.backups";
 
   public static final String INTERNAL_CACHE_HEAP_PERCENT =
-    "timeline.metrics.service.cache.%s.heap.percent";
+    "timeline.metrics.internal.cache.%s.heap.percent";
 
   public static final String EXTERNAL_SINK_INTERVAL =
-    "timeline.metrics.service.external.sink.%s.interval";
+    "timeline.metrics.external.sink.%s.%s.interval";
 
   public static final String DEFAULT_EXTERNAL_SINK_DIR =
-    "timeline.metrics.service.external.sink.dir";
-
+    "timeline.metrics.external.sink.dir";
+
+  public static final String KAFKA_SERVERS = "timeline.metrics.external.sink.kafka.bootstrap.servers";
+  public static final String KAFKA_ACKS = "timeline.metrics.external.sink.kafka.acks";
+  public static final String KAFKA_RETRIES = "timeline.metrics.external.sink.kafka.bootstrap.retries";
+  public static final String KAFKA_BATCH_SIZE = "timeline.metrics.external.sink.kafka.batch.size";
+  public static final String KAFKA_LINGER_MS = "timeline.metrics.external.sink.kafka.linger.ms";
+  public static final String KAFKA_BUFFER_MEM = "timeline.metrics.external.sink.kafka.buffer.memory";
+  public static final String KAFKA_SINK_TIMEOUT_SECONDS = "timeline.metrics.external.sink.kafka.timeout.seconds";
+  
   private Configuration hbaseConf;
   private Configuration metricsConf;
   private Configuration metricsSslConf;
@@ -601,8 +608,24 @@ public class TimelineMetricConfiguration {
     return false;
   }
 
-  public int getExternalSinkInterval(SOURCE_NAME sourceName) {
-    return Integer.parseInt(metricsConf.get(String.format(EXTERNAL_SINK_INTERVAL, sourceName), "-1"));
+  /**
+   * Get the sink interval for a metrics source.
+   * Determines how often the metrics will be written to the sink.
+   * This also determines whether any caching is needed on the collector
+   * side; the default interval disables caching by writing data out as
+   * soon as it is received.
+   *
+   * @param sinkProviderClassName Simple name of your implementation of {@link ExternalSinkProvider}
+   * @param sourceName {@link SOURCE_NAME}
+   * @return seconds
+   */
+  public int getExternalSinkInterval(String sinkProviderClassName,
+                                     SOURCE_NAME sourceName) {
+    String sinkProviderSimpleClassName = sinkProviderClassName.substring(
+      sinkProviderClassName.lastIndexOf(".") + 1);
+
+    return Integer.parseInt(metricsConf.get(
+      String.format(EXTERNAL_SINK_INTERVAL, sinkProviderSimpleClassName, sourceName), "-1"));
   }
 
   public InternalSourceProvider getInternalSourceProvider() {
@@ -612,12 +635,18 @@ public class TimelineMetricConfiguration {
     return ReflectionUtils.newInstance(providerClass, metricsConf);
   }
 
-  public ExternalSinkProvider getExternalSinkProvider() {
-    Class<?> providerClass = metricsConf.getClassByNameOrNull(TIMELINE_METRICS_SINK_PROVIDER_CLASS);
-    if (providerClass != null) {
-      return (ExternalSinkProvider) ReflectionUtils.newInstance(providerClass, metricsConf);
+  /**
+   * List of external sink provider classes. Comma-separated.
+   */
+  public List<ExternalSinkProvider> getExternalSinkProviderList() {
+    Class<?>[] providerClasses = metricsConf.getClasses(TIMELINE_METRICS_SINK_PROVIDER_CLASS);
+    List<ExternalSinkProvider> providerList = new ArrayList<>();
+    if (providerClasses != null) {
+      for (Class<?> providerClass : providerClasses) {
+        providerList.add((ExternalSinkProvider) ReflectionUtils.newInstance(providerClass, metricsConf));
+      }
     }
-    return null;
+    return providerList;
   }
 
   public String getInternalCacheHeapPercent(String instanceName) {
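
Taken together, the renamed and newly added keys above mean the Kafka tee is driven entirely by the metrics configuration. A hedged illustration of the relevant settings (values are examples only, the class name is illustrative, and the KafkaSinkProvider class itself still has to be listed under the property behind TIMELINE_METRICS_SINK_PROVIDER_CLASS, which this patch does not show):

    import org.apache.hadoop.conf.Configuration;

    public class KafkaSinkSettingsSketch {
      public static Configuration example() {
        Configuration metricsConf = new Configuration();
        // Keys are the KAFKA_* constants introduced above; values are illustrative.
        metricsConf.set("timeline.metrics.external.sink.kafka.bootstrap.servers", "broker1:6667,broker2:6667");
        metricsConf.set("timeline.metrics.external.sink.kafka.acks", "all");
        metricsConf.setInt("timeline.metrics.external.sink.kafka.bootstrap.retries", 0);
        metricsConf.setInt("timeline.metrics.external.sink.kafka.batch.size", 128);
        metricsConf.setInt("timeline.metrics.external.sink.kafka.linger.ms", 1);
        metricsConf.setLong("timeline.metrics.external.sink.kafka.buffer.memory", 33554432L);
        metricsConf.setInt("timeline.metrics.external.sink.kafka.timeout.seconds", 10);
        // EXTERNAL_SINK_INTERVAL is now keyed by provider simple name and source name,
        // so the flush interval of the Kafka raw-metrics sink resolves to:
        metricsConf.setInt("timeline.metrics.external.sink.KafkaSinkProvider.RAW_METRICS.interval", 10);
        return metricsConf;
      }
    }
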
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/sink/ExternalSinkProvider.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/sink/ExternalSinkProvider.java
index 48887d9..7c7683b 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/sink/ExternalSinkProvider.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/sink/ExternalSinkProvider.java
@@ -1,8 +1,3 @@
-package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.sink;
-
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.source.InternalSourceProvider;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.source.InternalSourceProvider.SOURCE_NAME;
-
 /**
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
@@ -21,9 +16,14 @@ import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.
  * limitations under the License.
  */
 
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.sink;
+
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.source.InternalSourceProvider.SOURCE_NAME;
+
+
 /**
  * Configurable provider for sink classes that match the metrics sources.
- * Provider can return same sink of different sinks for each source.
+ * Provider can return same sink or different sinks for each source.
  */
 public interface ExternalSinkProvider {
 
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/sink/HttpSinkProvider.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/sink/HttpSinkProvider.java
index bb84c8a..9c2a93e 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/sink/HttpSinkProvider.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/sink/HttpSinkProvider.java
@@ -92,7 +92,7 @@ public class HttpSinkProvider implements ExternalSinkProvider {
 
   @Override
   public ExternalMetricsSink getExternalMetricsSink(InternalSourceProvider.SOURCE_NAME sourceName) {
-    return null;
+    return new DefaultHttpMetricsSink();
   }
 
   protected HttpURLConnection getConnection(String spec) throws IOException {
@@ -147,7 +147,7 @@ public class HttpSinkProvider implements ExternalSinkProvider {
     @Override
     public int getSinkTimeOutSeconds() {
       try {
-        return conf.getMetricsConf().getInt("timeline.metrics.service.external.http.sink.timeout.seconds", 10);
+        return conf.getMetricsConf().getInt("timeline.metrics.external.sink.http.timeout.seconds", 10);
       } catch (Exception e) {
         return 10;
       }
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/sink/KafkaSinkProvider.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/sink/KafkaSinkProvider.java
new file mode 100644
index 0000000..3b34b55
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/sink/KafkaSinkProvider.java
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.sink;
+
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.KAFKA_ACKS;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.KAFKA_BATCH_SIZE;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.KAFKA_BUFFER_MEM;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.KAFKA_LINGER_MS;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.KAFKA_RETRIES;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.KAFKA_SERVERS;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.KAFKA_SINK_TIMEOUT_SECONDS;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_CACHE_COMMIT_INTERVAL;
+
+import java.util.Collection;
+import java.util.Properties;
+import java.util.concurrent.Future;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.source.InternalSourceProvider.SOURCE_NAME;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+/*
+  This will be used by the single Metrics committer thread. Hence it is
+  important to make this export non-blocking.
+ */
+public class KafkaSinkProvider implements ExternalSinkProvider {
+  private static String TOPIC_NAME = "ambari-metrics-topic";
+  private static final Log LOG = LogFactory.getLog(KafkaSinkProvider.class);
+
+  private Producer producer;
+  private int TIMEOUT_SECONDS = 10;
+  private int FLUSH_SECONDS = 3;
+
+  ObjectMapper objectMapper = new ObjectMapper();
+
+  public KafkaSinkProvider() {
+    TimelineMetricConfiguration configuration = TimelineMetricConfiguration.getInstance();
+
+    Properties configProperties = new Properties();
+    try {
+      configProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, configuration.getMetricsConf().getTrimmed(KAFKA_SERVERS));
+      configProperties.put(ProducerConfig.ACKS_CONFIG, configuration.getMetricsConf().getTrimmed(KAFKA_ACKS, "all"));
+      // Avoid duplicates - No transactional semantics
+      configProperties.put(ProducerConfig.RETRIES_CONFIG, configuration.getMetricsConf().getInt(KAFKA_RETRIES, 0));
+      configProperties.put(ProducerConfig.BATCH_SIZE_CONFIG, configuration.getMetricsConf().getInt(KAFKA_BATCH_SIZE, 128));
+      configProperties.put(ProducerConfig.LINGER_MS_CONFIG, configuration.getMetricsConf().getInt(KAFKA_LINGER_MS, 1));
+      configProperties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, configuration.getMetricsConf().getLong(KAFKA_BUFFER_MEM, 33554432)); // 32 MB
+      FLUSH_SECONDS = configuration.getMetricsConf().getInt(TIMELINE_METRICS_CACHE_COMMIT_INTERVAL, 3);
+      TIMEOUT_SECONDS = configuration.getMetricsConf().getInt(KAFKA_SINK_TIMEOUT_SECONDS, 10);
+    } catch (Exception e) {
+      LOG.error("Configuration error!", e);
+      throw new ExceptionInInitializerError(e);
+    }
+    configProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.ByteArraySerializer");
+    configProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.connect.json.JsonSerializer");
+
+    
+
+    producer = new KafkaProducer(configProperties);
+  }
+
+  @Override
+  public ExternalMetricsSink getExternalMetricsSink(SOURCE_NAME sourceName) {
+    switch (sourceName) {
+      case RAW_METRICS:
+        return new KafkaRawMetricsSink();
+      default:
+        throw new UnsupportedOperationException("Provider does not support " +
+          "the expected source " + sourceName);
+    }
+  }
+
+  class KafkaRawMetricsSink implements ExternalMetricsSink {
+
+    @Override
+    public int getSinkTimeOutSeconds() {
+      return TIMEOUT_SECONDS;
+    }
+
+    @Override
+    public int getFlushSeconds() {
+      return FLUSH_SECONDS;
+    }
+
+    @Override
+    public void sinkMetricData(Collection<TimelineMetrics> metrics) {
+      JsonNode jsonNode = objectMapper.valueToTree(metrics);
+      ProducerRecord<String, JsonNode> rec = new ProducerRecord<String, JsonNode>(TOPIC_NAME, jsonNode);
+      Future<RecordMetadata> f = producer.send(rec);
+    }
+  }
+
+}
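
Downstream, the teed batches can be read back with a plain Kafka consumer. A minimal sketch of tailing the topic used above (not part of the patch; the class name, broker address, and group id are placeholders, and each record value is the JSON form of a Collection<TimelineMetrics> batch):

    import java.util.Collections;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public class MetricsTopicTailer {
      public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:6667");   // placeholder broker
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "ams-metrics-tailer");      // hypothetical group id
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<byte[], String> consumer = new KafkaConsumer<>(props)) {
          consumer.subscribe(Collections.singletonList("ambari-metrics-topic"));
          while (true) {
            ConsumerRecords<byte[], String> records = consumer.poll(1000);
            for (ConsumerRecord<byte[], String> record : records) {
              System.out.println(record.value()); // JSON-serialized TimelineMetrics batch
            }
          }
        }
      }
    }
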
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/source/DefaultInternalMetricsSourceProvider.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/source/DefaultInternalMetricsSourceProvider.java
index b97c39f..c6b071f 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/source/DefaultInternalMetricsSourceProvider.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/source/DefaultInternalMetricsSourceProvider.java
@@ -24,7 +24,7 @@ import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.
 public class DefaultInternalMetricsSourceProvider implements InternalSourceProvider {
   private static final Log LOG = LogFactory.getLog(DefaultInternalMetricsSourceProvider.class);
 
-  // TODO: Implement read based sources for higher level data
+  // TODO: Implement read based sources for higher order data
   @Override
   public InternalMetricsSource getInternalMetricsSource(SOURCE_NAME sourceName, int sinkIntervalSeconds, ExternalMetricsSink sink) {
     if (sink == null) {
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/source/RawMetricsSource.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/source/RawMetricsSource.java
index 967d819..879577a 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/source/RawMetricsSource.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/source/RawMetricsSource.java
@@ -63,21 +63,14 @@ public class RawMetricsSource implements InternalMetricsSource {
   }
 
   private void initializeFixedRateScheduler() {
-    executorService.scheduleAtFixedRate(new Runnable() {
-      @Override
-      public void run() {
-        rawMetricsSink.sinkMetricData(cache.evictAll());
-      }
-    }, rawMetricsSink.getFlushSeconds(), rawMetricsSink.getFlushSeconds(), TimeUnit.SECONDS);
+    executorService.scheduleAtFixedRate(() -> rawMetricsSink.sinkMetricData(cache.evictAll()),
+      rawMetricsSink.getFlushSeconds(), rawMetricsSink.getFlushSeconds(), TimeUnit.SECONDS);
   }
 
   private void submitDataWithTimeout(final Collection<TimelineMetrics> metrics) {
-    Future f = executorService.submit(new Callable<Object>() {
-      @Override
-      public Object call() throws Exception {
-        rawMetricsSink.sinkMetricData(metrics);
-        return null;
-      }
+    Future f = executorService.submit(() -> {
+      rawMetricsSink.sinkMetricData(metrics);
+      return null;
     });
     try {
       f.get(rawMetricsSink.getSinkTimeOutSeconds(), TimeUnit.SECONDS);
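
The lambda refactor above preserves the bounded-wait contract that matters for an external sink like Kafka: the committer hands the batch to a single-threaded executor and waits at most getSinkTimeOutSeconds() for it, so a slow or unreachable broker cannot stall metric commits indefinitely. A generic sketch of that pattern (not the file's actual code; it assumes a timed-out write is simply cancelled and the batch dropped):

    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.TimeoutException;

    public class BoundedSinkWrite {
      private final ExecutorService executor = Executors.newSingleThreadExecutor();

      public void writeWithTimeout(Runnable sinkWrite, int timeoutSeconds) {
        Future<?> f = executor.submit(sinkWrite);
        try {
          // Block the caller for at most timeoutSeconds.
          f.get(timeoutSeconds, TimeUnit.SECONDS);
        } catch (TimeoutException e) {
          f.cancel(true); // give up on this batch rather than stall the committer thread
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        } catch (ExecutionException e) {
          // the sink call itself failed; drop the batch and keep going
        }
      }
    }
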
diff --git a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/configuration/ams-admanager-config.xml b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/configuration/ams-admanager-config.xml
new file mode 100644
index 0000000..489850f
--- /dev/null
+++ b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/configuration/ams-admanager-config.xml
@@ -0,0 +1,60 @@
+<?xml version="1.0"?>
+<!--
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+-->
+<configuration>
+  <property>
+    <name>ambari.metrics.admanager.spark.operation.mode</name>
+    <value>stand-alone</value>
+    <display-name>Anomaly Detection Service operation mode</display-name>
+    <description>
+      Service Operation modes:
+      1) stand-alone: Standalone Spark cluster for AD jobs
+      2) spark-on-yarn: Spark running on YARN. (Recommended production setting)
+    </description>
+    <on-ambari-upgrade add="true"/>
+    <value-attributes>
+      <overridable>false</overridable>
+      <type>value-list</type>
+      <entries>
+        <entry>
+          <value>stand-alone</value>
+          <label>Stand Alone</label>
+        </entry>
+        <entry>
+          <value>spark-on-yarn</value>
+          <label>Spark on YARN</label>
+        </entry>
+      </entries>
+      <selection-cardinality>1</selection-cardinality>
+    </value-attributes>
+  </property>
+  <property>
+    <name>ambari.metrics.admanager.application.port</name>
+    <value>9090</value>
+    <display-name>AD Manager http port</display-name>
+    <description>AMS Anomaly Detection Manager application port</description>
+    <value-attributes>
+      <type>int</type>
+      <overridable>false</overridable>
+    </value-attributes>
+    <on-ambari-upgrade add="true"/>
+  </property>
+</configuration>
\ No newline at end of file
diff --git a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/configuration/ams-admanager-env.xml b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/configuration/ams-admanager-env.xml
new file mode 100644
index 0000000..99e93a6
--- /dev/null
+++ b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/configuration/ams-admanager-env.xml
@@ -0,0 +1,105 @@
+<?xml version="1.0"?>
+<!--
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+-->
+<configuration>
+  <property>
+    <name>ams_admanager_log_dir</name>
+    <value>/var/log/ambari-metrics-anomaly-detection</value>
+    <display-name>Anomaly Detection Manager log dir</display-name>
+    <description>AMS Anomaly Detection Manager log directory.</description>
+    <value-attributes>
+      <type>directory</type>
+    </value-attributes>
+    <on-ambari-upgrade add="true"/>
+  </property>
+  <property>
+    <name>ams_admanager_pid_dir</name>
+    <value>/var/run/ambari-metrics-anomaly-detection</value>
+    <display-name>Anomaly Detection Manager pid dir</display-name>
+    <description>AMS Anomaly Detection Manager pid directory.</description>
+    <value-attributes>
+      <type>directory</type>
+    </value-attributes>
+    <on-ambari-upgrade add="true"/>
+  </property>
+  <property>
+    <name>ams_admanager_data_dir</name>
+    <value>/var/lib/ambari-metrics-anomaly-detection</value>
+    <display-name>Anomaly Detection Manager data dir</display-name>
+    <description>AMS Anomaly Detection Manager data directory.</description>
+    <value-attributes>
+      <type>directory</type>
+    </value-attributes>
+    <on-ambari-upgrade add="true"/>
+  </property>
+  <property>
+    <name>ams_admanager_heapsize</name>
+    <value>1024</value>
+    <description>Anomaly Detection Manager Heap Size</description>
+    <display-name>Anomaly Detection Manager Heap Size</display-name>
+    <value-attributes>
+      <type>int</type>
+      <unit>MB</unit>
+      <minimum>512</minimum>
+      <maximum>16384</maximum>
+      <increment-step>256</increment-step>
+    </value-attributes>
+    <on-ambari-upgrade add="true"/>
+  </property>
+  <property>
+    <name>content</name>
+    <display-name>ams-ad-env template</display-name>
+    <value>
+      # Set environment variables here.
+
+      # The java implementation to use. Java 1.8 required.
+      export JAVA_HOME={{java64_home}}
+
+      #  Anomaly Detection Manager Log directory for log4j
+      export AMS_AD_LOG_DIR={{ams_admanager_log_dir}}
+
+      # Anomaly Detection Manager pid directory
+      export AMS_AD_PID_DIR={{ams_admanager_pid_dir}}
+
+      # Anomaly Detection Manager heapsize
+      export AMS_AD_HEAPSIZE={{ams_admanager_heapsize}}
+
+      # Anomaly Detection Manager data dir
+      export AMS_AD_DATA_DIR={{ams_admanager_data_dir}}
+
+      # Anomaly Detection Manager options
+      export AMS_AD_OPTS="
+      {% if security_enabled %}
+      export AMS_AD_OPTS="$AMS_AD_OPTS -Djava.security.auth.login.config={{ams_ad_jaas_config_file}}"
+      {% endif %}
+
+      # Anomaly Detection Manager GC options
+      export AMS_AD_GC_OPTS="-XX:+UseConcMarkSweepGC -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -Xloggc:{{ams_admanager_log_dir}}/admanager-gc.log-`date +'%Y%m%d%H%M'`"
+      export AMS_AD_OPTS="$AMS_AD_OPTS $AMS_AD_GC_OPTS"
+
+    </value>
+    <value-attributes>
+      <type>content</type>
+    </value-attributes>
+    <on-ambari-upgrade add="true"/>
+  </property>
+
+</configuration>
\ No newline at end of file
diff --git a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/configuration/ams-site.xml b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/configuration/ams-site.xml
index 49dfd95..6bd25d2 100644
--- a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/configuration/ams-site.xml
+++ b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/configuration/ams-site.xml
@@ -1,5 +1,4 @@
 <?xml version="1.0"?>
-<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
 <!--
 /**
  *
diff --git a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/metainfo.xml b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/metainfo.xml
index 78014b0..0add0cd 100644
--- a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/metainfo.xml
+++ b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/metainfo.xml
@@ -114,13 +114,37 @@
             <config-type>ams-grafana-env</config-type>
             <config-type>ams-grafana-ini</config-type>
           </configuration-dependencies>
+          <logs>
+            <log>
+              <logId>ams_grafana</logId>
+              <primary>true</primary>
+            </log>
+          </logs>
+        </component>
+
+        <component>
+          <name>AD_MANAGER</name>
+          <displayName>AD Manager</displayName>
+          <category>MASTER</category>
+          <cardinality>0-1</cardinality>
+          <versionAdvertised>false</versionAdvertised>
+          <commandScript>
+            <script>scripts/ams_admanager.py</script>
+            <scriptType>PYTHON</scriptType>
+            <timeout>1200</timeout>
+          </commandScript>
+          <configuration-dependencies>
+            <config-type>ams-hbase-site</config-type>
+            <config-type>ams-admanager-config</config-type>
+            <config-type>ams-admanager-env</config-type>
+          </configuration-dependencies>
+          <logs>
+            <log>
+              <logId>ams_anomaly_detection</logId>
+              <primary>true</primary>
+            </log>
+          </logs>
         </component>
-        <logs>
-          <log>
-            <logId>ams_grafana</logId>
-            <primary>true</primary>
-          </log>
-        </logs>
       </components>
 
       <themes>
@@ -153,6 +177,11 @@
               <condition>should_install_ams_grafana</condition>
             </package>
             <package>
+              <name>ambari-metrics-admanager</name>
+              <skipUpgrade>true</skipUpgrade>
+              <condition>should_install_ams_admanager</condition>
+            </package>
+            <package>
               <name>gcc</name>
             </package>
           </packages>
diff --git a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/ams.py b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/ams.py
index 9b15fae..fe6b4ec 100644
--- a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/ams.py
+++ b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/ams.py
@@ -512,6 +512,41 @@ def ams(name=None, action=None):
 
     pass
 
+  elif name == 'admanager':
+    ams_ad_directories = [
+      params.ams_ad_conf_dir,
+      params.ams_ad_log_dir,
+      params.ams_ad_data_dir,
+      params.ams_ad_pid_dir
+    ]
+
+    for ams_ad_dir in ams_ad_directories:
+      Directory(ams_ad_dir,
+                owner=params.ams_user,
+                group=params.user_group,
+                mode=0755,
+                create_parents=True,
+                recursive_ownership=True
+                )
+
+    File(format("{ams_ad_conf_dir}/ams-admanager-env.sh"),
+         owner=params.ams_user,
+         group=params.user_group,
+         content=InlineTemplate(params.ams_grafana_env_sh_template)
+         )
+
+    File(format("{conf_dir}/config.yaml"),
+         content=Template("config.yaml.j2"),
+         owner=params.ams_user,
+         group=params.user_group
+         )
+
+    if action != 'stop':
+      for dir in ams_ad_directories:
+        Execute(('chown', '-R', params.ams_user, dir),
+                sudo=True
+                )
+
 def is_spnego_enabled(params):
   if 'core-site' in params.config['configurations'] \
       and 'hadoop.http.authentication.type' in params.config['configurations']['core-site'] \
diff --git a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/ams_admanager.py b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/ams_admanager.py
new file mode 100644
index 0000000..96c4454
--- /dev/null
+++ b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/ams_admanager.py
@@ -0,0 +1,73 @@
+#!/usr/bin/env python
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+"""
+from resource_management import Script, Execute
+from resource_management.libraries.functions import format
+from status import check_service_status
+from ams import ams
+from resource_management.core.logger import Logger
+from resource_management.core import sudo
+
+class AmsADManager(Script):
+  def install(self, env):
+    import params
+    env.set_params(params)
+    self.install_packages(env)
+    self.configure(env) # for security
+
+  def configure(self, env, action = None):
+    import params
+    env.set_params(params)
+    ams(name='admanager', action=action)
+
+  def start(self, env):
+    import params
+    env.set_params(params)
+    self.configure(env, action = 'start')
+
+    start_cmd = format("{ams_admanager_script} start")
+    Execute(start_cmd,
+            user=params.ams_user
+            )
+    pidfile = format("{ams_ad_pid_dir}/admanager.pid")
+    if not sudo.path_exists(pidfile):
+      Logger.warning("Pid file doesn't exist after starting of the component.")
+    else:
+      Logger.info("AD Manager Server has started with pid: {0}".format(sudo.read_file(pidfile).strip()))
+
+
+  def stop(self, env):
+    import params
+    env.set_params(params)
+    self.configure(env, action = 'stop')
+    Execute((format("{ams_admanager_script}"), 'stop'),
+            sudo=True
+            )
+
+  def status(self, env):
+    import status_params
+    env.set_params(status_params)
+    check_service_status(env, name='admanager')
+
+  def get_pid_files(self):
+    import status_params
+    return [status_params.ams_ad_pid_file]
+
+if __name__ == "__main__":
+  AmsADManager().execute()
diff --git a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/params.py b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/params.py
index 6389696..5d49939 100644
--- a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/params.py
+++ b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/params.py
@@ -187,6 +187,15 @@ ams_hbase_home_dir = "/usr/lib/ams-hbase/"
 
 ams_hbase_init_check_enabled = default("/configurations/ams-site/timeline.metrics.hbase.init.check.enabled", True)
 
+# AD Manager settings
+ams_ad_conf_dir = '/etc/ambari-metrics-anomaly-detection/conf'
+ams_ad_log_dir = default("/configurations/ams-ad-env/ams_admanager_log_dir", '/var/log/ambari-metrics-anomaly-detection')
+ams_ad_pid_dir = status_params.ams_admanager_pid_dir
+ams_ad_data_dir = default("/configurations/ams-ad-env/ams_admanager_data_dir", '/var/lib/ambari-metrics-anomaly-detection')
+
+ams_admanager_script = "/usr/sbin/ambari-metrics-admanager"
+ams_admanager_port = config['configurations']['ams-admanager-config']['ambari.metrics.admanager.application.port']
+
 #hadoop params
 
 hbase_excluded_hosts = config['commandParams']['excluded_hosts']
diff --git a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/status.py b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/status.py
index 0b24ac0..e2af793 100644
--- a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/status.py
+++ b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/status.py
@@ -43,6 +43,8 @@ def check_service_status(env, name):
     check_process_status(status_params.monitor_pid_file)
   elif name == 'grafana':
     check_process_status(status_params.grafana_pid_file)
+  elif name == 'admanager':
+    check_process_status(status_params.ams_ad_pid_file)
 
 @OsFamilyFuncImpl(os_family=OSConst.WINSRV_FAMILY)
 def check_service_status(name):
diff --git a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/status_params.py b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/status_params.py
index 27c6020..bc9b7e3 100644
--- a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/status_params.py
+++ b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/status_params.py
@@ -33,9 +33,11 @@ hbase_user = ams_user
 ams_collector_pid_dir = config['configurations']['ams-env']['metrics_collector_pid_dir']
 ams_monitor_pid_dir = config['configurations']['ams-env']['metrics_monitor_pid_dir']
 ams_grafana_pid_dir = config['configurations']['ams-grafana-env']['metrics_grafana_pid_dir']
+ams_admanager_pid_dir = config['configurations']['ams-ad-env']['ams_admanager_pid_dir']
 
 monitor_pid_file = format("{ams_monitor_pid_dir}/ambari-metrics-monitor.pid")
 grafana_pid_file = format("{ams_grafana_pid_dir}/grafana-server.pid")
+ams_ad_pid_file = format("{ams_ad_pid_dir}/admanager.pid")
 
 security_enabled = config['configurations']['cluster-env']['security_enabled']
 ams_hbase_conf_dir = format("{hbase_conf_dir}")
diff --git a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/templates/admanager_config.yaml.j2 b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/templates/admanager_config.yaml.j2
new file mode 100644
index 0000000..787aa3e
--- /dev/null
+++ b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/templates/admanager_config.yaml.j2
@@ -0,0 +1,24 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+server:
+  applicationConnectors:
+   - type: http
+     port: {{ams_admanager_port}}
+  requestLog:
+    type: external
+
+logging:
+  type: external
