You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by av...@apache.org on 2018/04/01 19:14:20 UTC
[ambari] 24/39: AMBARI-22343. Add ability in AMS to tee metrics to
a set of configured Kafka brokers. (swagle)
This is an automated email from the ASF dual-hosted git repository.
avijayan pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/ambari.git
commit 64dcc02abc94859f9df316767b7f4589ea0b1502
Author: Siddharth Wagle <sw...@hortonworks.com>
AuthorDate: Tue Oct 31 11:34:45 2017 -0700
AMBARI-22343. Add ability in AMS to tee metrics to a set of configured Kafka brokers. (swagle)
---
.../libraries/functions/package_conditions.py | 4 +
.../pom.xml | 11 ++
.../src/main/resources/config.yml | 3 -
.../ambari-metrics-timelineservice/pom.xml | 127 ++++++++++++---------
.../metrics/timeline/PhoenixHBaseAccessor.java | 29 ++---
.../timeline/TimelineMetricConfiguration.java | 55 ++++++---
.../timeline/sink/ExternalSinkProvider.java | 12 +-
.../metrics/timeline/sink/HttpSinkProvider.java | 4 +-
.../metrics/timeline/sink/KafkaSinkProvider.java | 118 +++++++++++++++++++
.../DefaultInternalMetricsSourceProvider.java | 2 +-
.../metrics/timeline/source/RawMetricsSource.java | 17 +--
.../0.1.0/configuration/ams-admanager-config.xml | 60 ++++++++++
.../0.1.0/configuration/ams-admanager-env.xml | 105 +++++++++++++++++
.../0.1.0/configuration/ams-site.xml | 1 -
.../AMBARI_METRICS/0.1.0/metainfo.xml | 41 ++++++-
.../AMBARI_METRICS/0.1.0/package/scripts/ams.py | 35 ++++++
.../0.1.0/package/scripts/ams_admanager.py | 73 ++++++++++++
.../AMBARI_METRICS/0.1.0/package/scripts/params.py | 9 ++
.../AMBARI_METRICS/0.1.0/package/scripts/status.py | 2 +
.../0.1.0/package/scripts/status_params.py | 2 +
.../package/templates/admanager_config.yaml.j2 | 24 ++++
21 files changed, 619 insertions(+), 115 deletions(-)
diff --git a/ambari-common/src/main/python/resource_management/libraries/functions/package_conditions.py b/ambari-common/src/main/python/resource_management/libraries/functions/package_conditions.py
index ebc1aba..64cda98 100644
--- a/ambari-common/src/main/python/resource_management/libraries/functions/package_conditions.py
+++ b/ambari-common/src/main/python/resource_management/libraries/functions/package_conditions.py
@@ -50,6 +50,10 @@ def should_install_phoenix():
has_phoenix = len(phoenix_hosts) > 0
return phoenix_enabled or has_phoenix
+def should_install_ams_admanager():
+ config = Script.get_config()
+ return _has_applicable_local_component(config, ["AD_MANAGER"])
+
def should_install_ams_collector():
config = Script.get_config()
return _has_applicable_local_component(config, ["METRICS_COLLECTOR"])
diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/pom.xml b/ambari-metrics/ambari-metrics-anomaly-detection-service/pom.xml
index 554d026..e96e957 100644
--- a/ambari-metrics/ambari-metrics-anomaly-detection-service/pom.xml
+++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/pom.xml
@@ -296,6 +296,12 @@
<artifactId>spark-mllib_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
<scope>provided</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-databind</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
@@ -419,6 +425,11 @@
<version>${jackson.version}</version>
</dependency>
<dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-databind</artifactId>
+ <version>${jackson.version}</version>
+ </dependency>
+ <dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/resources/config.yml b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/resources/config.yml
index 9ca9e95..bd88d57 100644
--- a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/resources/config.yml
+++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/resources/config.yml
@@ -2,9 +2,6 @@ server:
applicationConnectors:
- type: http
port: 9999
- adminConnectors:
- - type: http
- port: 9990
requestLog:
type: external
diff --git a/ambari-metrics/ambari-metrics-timelineservice/pom.xml b/ambari-metrics/ambari-metrics-timelineservice/pom.xml
index 3d119f9..7794a11 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/pom.xml
+++ b/ambari-metrics/ambari-metrics-timelineservice/pom.xml
@@ -80,7 +80,8 @@
<outputDirectory>${project.build.directory}/lib</outputDirectory>
<includeScope>compile</includeScope>
<excludeScope>test</excludeScope>
- <excludeArtifactIds>jasper-runtime,jasper-compiler</excludeArtifactIds>
+ <excludeArtifactIds>jasper-runtime,jasper-compiler
+ </excludeArtifactIds>
</configuration>
</execution>
</executions>
@@ -125,11 +126,13 @@
<source>
<location>target/lib</location>
<excludes>
- <exclude>*tests.jar</exclude>
+ <exclude>*tests.jar</exclude>
</excludes>
</source>
<source>
- <location>${project.build.directory}/${project.artifactId}-${project.version}.jar</location>
+ <location>
+ ${project.build.directory}/${project.artifactId}-${project.version}.jar
+ </location>
</source>
</sources>
</mapping>
@@ -214,7 +217,9 @@
<location>conf/unix/amshbase_metrics_whitelist</location>
</source>
<source>
- <location>target/embedded/${hbase.folder}/conf/hbase-site.xml</location>
+ <location>
+ target/embedded/${hbase.folder}/conf/hbase-site.xml
+ </location>
</source>
</sources>
</mapping>
@@ -287,7 +292,8 @@
<skip>true</skip>
<attach>false</attach>
<submodules>false</submodules>
- <controlDir>${project.basedir}/../src/main/package/deb/control</controlDir>
+ <controlDir>${project.basedir}/../src/main/package/deb/control
+ </controlDir>
</configuration>
</plugin>
</plugins>
@@ -657,23 +663,29 @@
<scope>test</scope>
<classifier>tests</classifier>
</dependency>
- <dependency>
- <groupId>org.apache.hbase</groupId>
- <artifactId>hbase-testing-util</artifactId>
- <version>${hbase.version}</version>
- <scope>test</scope>
- <optional>true</optional>
- <exclusions>
- <exclusion>
- <groupId>org.jruby</groupId>
- <artifactId>jruby-complete</artifactId>
- </exclusion>
- <exclusion>
- <artifactId>zookeeper</artifactId>
- <groupId>org.apache.zookeeper</groupId>
- </exclusion>
- </exclusions>
- </dependency>
+ <dependency>
+ <groupId>org.apache.hbase</groupId>
+ <artifactId>hbase-testing-util</artifactId>
+ <version>${hbase.version}</version>
+ <scope>test</scope>
+ <optional>true</optional>
+ <exclusions>
+ <exclusion>
+ <groupId>org.jruby</groupId>
+ <artifactId>jruby-complete</artifactId>
+ </exclusion>
+ <exclusion>
+ <artifactId>zookeeper</artifactId>
+ <groupId>org.apache.zookeeper</groupId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka-clients</artifactId>
+ <version>0.11.0.1</version>
+ </dependency>
+
<dependency>
<groupId>org.powermock</groupId>
<artifactId>powermock-module-junit4</artifactId>
@@ -731,17 +743,17 @@
</goals>
<configuration>
<target name="Download HBase">
- <mkdir dir="${project.build.directory}/embedded" />
+ <mkdir dir="${project.build.directory}/embedded"/>
<get
- src="${hbase.tar}"
- dest="${project.build.directory}/embedded/hbase.tar.gz"
- usetimestamp="true"
- />
+ src="${hbase.tar}"
+ dest="${project.build.directory}/embedded/hbase.tar.gz"
+ usetimestamp="true"
+ />
<untar
- src="${project.build.directory}/embedded/hbase.tar.gz"
- dest="${project.build.directory}/embedded"
- compression="gzip"
- />
+ src="${project.build.directory}/embedded/hbase.tar.gz"
+ dest="${project.build.directory}/embedded"
+ compression="gzip"
+ />
</target>
</configuration>
</execution>
@@ -755,19 +767,19 @@
<target name="Download Phoenix">
<mkdir dir="${project.build.directory}/embedded"/>
<get
- src="${phoenix.tar}"
- dest="${project.build.directory}/embedded/phoenix.tar.gz"
- usetimestamp="true"
- />
+ src="${phoenix.tar}"
+ dest="${project.build.directory}/embedded/phoenix.tar.gz"
+ usetimestamp="true"
+ />
<untar
- src="${project.build.directory}/embedded/phoenix.tar.gz"
- dest="${project.build.directory}/embedded"
- compression="gzip"
- />
+ src="${project.build.directory}/embedded/phoenix.tar.gz"
+ dest="${project.build.directory}/embedded"
+ compression="gzip"
+ />
<move
- file="${project.build.directory}/embedded/${phoenix.folder}/phoenix-${phoenix.version}-server.jar"
- tofile="${project.build.directory}/embedded/${hbase.folder}/lib/phoenix-${phoenix.version}-server.jar"
- />
+ file="${project.build.directory}/embedded/${phoenix.folder}/phoenix-${phoenix.version}-server.jar"
+ tofile="${project.build.directory}/embedded/${hbase.folder}/lib/phoenix-${phoenix.version}-server.jar"
+ />
</target>
</configuration>
</execution>
@@ -798,24 +810,24 @@
</goals>
<configuration>
<target name="Download HBase">
- <mkdir dir="${project.build.directory}/embedded" />
+ <mkdir dir="${project.build.directory}/embedded"/>
<get
- src="${hbase.winpkg.zip}"
- dest="${project.build.directory}/embedded/hbase.zip"
- usetimestamp="true"
- />
+ src="${hbase.winpkg.zip}"
+ dest="${project.build.directory}/embedded/hbase.zip"
+ usetimestamp="true"
+ />
<unzip
- src="${project.build.directory}/embedded/hbase.zip"
- dest="${project.build.directory}/embedded/hbase.temp"
- />
+ src="${project.build.directory}/embedded/hbase.zip"
+ dest="${project.build.directory}/embedded/hbase.temp"
+ />
<unzip
- src="${project.build.directory}/embedded/hbase.temp/resources/${hbase.winpkg.folder}.zip"
- dest="${project.build.directory}/embedded"
- />
+ src="${project.build.directory}/embedded/hbase.temp/resources/${hbase.winpkg.folder}.zip"
+ dest="${project.build.directory}/embedded"
+ />
<copy
- file="${project.build.directory}/embedded/hbase.temp/resources/servicehost.exe"
- tofile="${project.build.directory}/embedded/${hbase.winpkg.folder}/bin/ams_hbase_master.exe"
- />
+ file="${project.build.directory}/embedded/hbase.temp/resources/servicehost.exe"
+ tofile="${project.build.directory}/embedded/${hbase.winpkg.folder}/bin/ams_hbase_master.exe"
+ />
</target>
</configuration>
</execution>
@@ -854,7 +866,8 @@
<!-- The configuration of the plugin -->
<configuration>
<!-- Configuration of the archiver -->
- <finalName>${project.artifactId}-simulator-${project.version}</finalName>
+ <finalName>${project.artifactId}-simulator-${project.version}
+ </finalName>
<archive>
<!-- Manifest specific configuration -->
<manifest>
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java
index f8d31f7..65b4614 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java
@@ -120,7 +120,6 @@ import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.hadoop.hbase.util.RetryCounter;
import org.apache.hadoop.hbase.util.RetryCounterFactory;
import org.apache.hadoop.metrics2.sink.timeline.ContainerMetric;
import org.apache.hadoop.metrics2.sink.timeline.MetricClusterAggregate;
@@ -211,7 +210,7 @@ public class PhoenixHBaseAccessor {
private HashMap<String, String> tableTTL = new HashMap<>();
private final TimelineMetricConfiguration configuration;
- private InternalMetricsSource rawMetricsSource;
+ private List<InternalMetricsSource> rawMetricsSources;
public PhoenixHBaseAccessor(PhoenixConnectionProvider dataSource) {
this(TimelineMetricConfiguration.getInstance(), dataSource);
@@ -278,15 +277,17 @@ public class PhoenixHBaseAccessor {
LOG.info("Initialized aggregator sink class " + metricSinkClass);
}
- ExternalSinkProvider externalSinkProvider = configuration.getExternalSinkProvider();
+ List<ExternalSinkProvider> externalSinkProviderList = configuration.getExternalSinkProviderList();
InternalSourceProvider internalSourceProvider = configuration.getInternalSourceProvider();
- if (externalSinkProvider != null) {
- ExternalMetricsSink rawMetricsSink = externalSinkProvider.getExternalMetricsSink(RAW_METRICS);
- int interval = configuration.getExternalSinkInterval(RAW_METRICS);
- if (interval == -1){
- interval = cacheCommitInterval;
+ if (!externalSinkProviderList.isEmpty()) {
+ for (ExternalSinkProvider externalSinkProvider : externalSinkProviderList) {
+ ExternalMetricsSink rawMetricsSink = externalSinkProvider.getExternalMetricsSink(RAW_METRICS);
+ int interval = configuration.getExternalSinkInterval(externalSinkProvider.getClass().getSimpleName(), RAW_METRICS);
+ if (interval == -1) {
+ interval = cacheCommitInterval;
+ }
+ rawMetricsSources.add(internalSourceProvider.getInternalMetricsSource(RAW_METRICS, interval, rawMetricsSink));
}
- rawMetricsSource = internalSourceProvider.getInternalMetricsSource(RAW_METRICS, interval, rawMetricsSink);
}
TIMELINE_METRIC_READ_HELPER = new TimelineMetricReadHelper(this.metadataManagerInstance);
}
@@ -303,8 +304,10 @@ public class PhoenixHBaseAccessor {
}
if (metricsList.size() > 0) {
commitMetrics(metricsList);
- if (rawMetricsSource != null) {
- rawMetricsSource.publishTimelineMetrics(metricsList);
+ if (!rawMetricsSources.isEmpty()) {
+ for (InternalMetricsSource rawMetricsSource : rawMetricsSources) {
+ rawMetricsSource.publishTimelineMetrics(metricsList);
+ }
}
}
}
@@ -316,10 +319,8 @@ public class PhoenixHBaseAccessor {
private void commitAnomalyMetric(Connection conn, TimelineMetric metric) {
PreparedStatement metricRecordStmt = null;
try {
-
Map<String, String> metricMetadata = metric.getMetadata();
-
-
+
byte[] uuid = metadataManagerInstance.getUuid(metric);
if (uuid == null) {
LOG.error("Error computing UUID for metric. Cannot write metrics : " + metric.toString());
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java
index 929fc8c..395ec7b 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java
@@ -26,6 +26,7 @@ import java.net.MalformedURLException;
import java.net.URISyntaxException;
import java.net.URL;
import java.net.UnknownHostException;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
@@ -51,8 +52,6 @@ import org.apache.log4j.Logger;
* Configuration class that reads properties from ams-site.xml. All values
* for time or intervals are given in seconds.
*/
-@InterfaceAudience.Public
-@InterfaceStability.Evolving
public class TimelineMetricConfiguration {
private static final Log LOG = LogFactory.getLog(TimelineMetricConfiguration.class);
@@ -343,14 +342,22 @@ public class TimelineMetricConfiguration {
public static final String TIMELINE_METRICS_COLLECTOR_IGNITE_BACKUPS = "timeline.metrics.collector.ignite.nodes.backups";
public static final String INTERNAL_CACHE_HEAP_PERCENT =
- "timeline.metrics.service.cache.%s.heap.percent";
+ "timeline.metrics.internal.cache.%s.heap.percent";
public static final String EXTERNAL_SINK_INTERVAL =
- "timeline.metrics.service.external.sink.%s.interval";
+ "timeline.metrics.external.sink.%s.%s.interval";
public static final String DEFAULT_EXTERNAL_SINK_DIR =
- "timeline.metrics.service.external.sink.dir";
-
+ "timeline.metrics.external.sink.dir";
+
+ public static final String KAFKA_SERVERS = "timeline.metrics.external.sink.kafka.bootstrap.servers";
+ public static final String KAFKA_ACKS = "timeline.metrics.external.sink.kafka.acks";
+ public static final String KAFKA_RETRIES = "timeline.metrics.external.sink.kafka.bootstrap.retries";
+ public static final String KAFKA_BATCH_SIZE = "timeline.metrics.external.sink.kafka.batch.size";
+ public static final String KAFKA_LINGER_MS = "timeline.metrics.external.sink.kafka.linger.ms";
+ public static final String KAFKA_BUFFER_MEM = "timeline.metrics.external.sink.kafka.buffer.memory";
+ public static final String KAFKA_SINK_TIMEOUT_SECONDS = "timeline.metrics.external.sink.kafka.timeout.seconds";
+
private Configuration hbaseConf;
private Configuration metricsConf;
private Configuration metricsSslConf;
@@ -601,8 +608,24 @@ public class TimelineMetricConfiguration {
return false;
}
- public int getExternalSinkInterval(SOURCE_NAME sourceName) {
- return Integer.parseInt(metricsConf.get(String.format(EXTERNAL_SINK_INTERVAL, sourceName), "-1"));
+ /**
+ * Get the sink interval for a metrics source.
+ * Determines how often the metrics will be written to the sink.
+ * This determines whether any caching is needed on the collector side;
+ * the default interval disables caching by writing data to the sink as
+ * soon as it is received.
+ *
+ * @param sinkProviderClassName Simple or fully-qualified class name of your implementation of {@link ExternalSinkProvider}
+ * @param sourceName {@link SOURCE_NAME}
+ * @return seconds
+ */
+ public int getExternalSinkInterval(String sinkProviderClassName,
+ SOURCE_NAME sourceName) {
+ String sinkProviderSimpleClassName = sinkProviderClassName.substring(
+ sinkProviderClassName.lastIndexOf(".") + 1);
+
+ return Integer.parseInt(metricsConf.get(
+ String.format(EXTERNAL_SINK_INTERVAL, sinkProviderSimpleClassName, sourceName), "-1"));
}
public InternalSourceProvider getInternalSourceProvider() {
@@ -612,12 +635,18 @@ public class TimelineMetricConfiguration {
return ReflectionUtils.newInstance(providerClass, metricsConf);
}
- public ExternalSinkProvider getExternalSinkProvider() {
- Class<?> providerClass = metricsConf.getClassByNameOrNull(TIMELINE_METRICS_SINK_PROVIDER_CLASS);
- if (providerClass != null) {
- return (ExternalSinkProvider) ReflectionUtils.newInstance(providerClass, metricsConf);
+ /**
+ * Instantiates the external sink providers configured as a comma-separated list of provider classes.
+ */
+ public List<ExternalSinkProvider> getExternalSinkProviderList() {
+ Class<?>[] providerClasses = metricsConf.getClasses(TIMELINE_METRICS_SINK_PROVIDER_CLASS);
+ List<ExternalSinkProvider> providerList = new ArrayList<>();
+ if (providerClasses != null) {
+ for (Class<?> providerClass : providerClasses) {
+ providerList.add((ExternalSinkProvider) ReflectionUtils.newInstance(providerClass, metricsConf));
+ }
}
- return null;
+ return providerList;
}
public String getInternalCacheHeapPercent(String instanceName) {
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/sink/ExternalSinkProvider.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/sink/ExternalSinkProvider.java
index 48887d9..7c7683b 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/sink/ExternalSinkProvider.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/sink/ExternalSinkProvider.java
@@ -1,8 +1,3 @@
-package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.sink;
-
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.source.InternalSourceProvider;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.source.InternalSourceProvider.SOURCE_NAME;
-
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
@@ -21,9 +16,14 @@ import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.
* limitations under the License.
*/
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.sink;
+
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.source.InternalSourceProvider.SOURCE_NAME;
+
+
/**
* Configurable provider for sink classes that match the metrics sources.
- * Provider can return same sink of different sinks for each source.
+ * Provider can return same sink or different sinks for each source.
*/
public interface ExternalSinkProvider {
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/sink/HttpSinkProvider.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/sink/HttpSinkProvider.java
index bb84c8a..9c2a93e 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/sink/HttpSinkProvider.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/sink/HttpSinkProvider.java
@@ -92,7 +92,7 @@ public class HttpSinkProvider implements ExternalSinkProvider {
@Override
public ExternalMetricsSink getExternalMetricsSink(InternalSourceProvider.SOURCE_NAME sourceName) {
- return null;
+ return new DefaultHttpMetricsSink();
}
protected HttpURLConnection getConnection(String spec) throws IOException {
@@ -147,7 +147,7 @@ public class HttpSinkProvider implements ExternalSinkProvider {
@Override
public int getSinkTimeOutSeconds() {
try {
- return conf.getMetricsConf().getInt("timeline.metrics.service.external.http.sink.timeout.seconds", 10);
+ return conf.getMetricsConf().getInt("timeline.metrics.external.sink.http.timeout.seconds", 10);
} catch (Exception e) {
return 10;
}
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/sink/KafkaSinkProvider.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/sink/KafkaSinkProvider.java
new file mode 100644
index 0000000..3b34b55
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/sink/KafkaSinkProvider.java
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.sink;
+
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.KAFKA_ACKS;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.KAFKA_BATCH_SIZE;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.KAFKA_BUFFER_MEM;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.KAFKA_LINGER_MS;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.KAFKA_RETRIES;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.KAFKA_SERVERS;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.KAFKA_SINK_TIMEOUT_SECONDS;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_CACHE_COMMIT_INTERVAL;
+
+import java.util.Collection;
+import java.util.Properties;
+import java.util.concurrent.Future;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.source.InternalSourceProvider.SOURCE_NAME;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+/*
+ This will be used by the single metrics committer thread. Hence it is
+ important that this export be non-blocking.
+ */
+public class KafkaSinkProvider implements ExternalSinkProvider {
+ private static String TOPIC_NAME = "ambari-metrics-topic";
+ private static final Log LOG = LogFactory.getLog(KafkaSinkProvider.class);
+
+ private Producer producer;
+ private int TIMEOUT_SECONDS = 10;
+ private int FLUSH_SECONDS = 3;
+
+ ObjectMapper objectMapper = new ObjectMapper();
+
+ public KafkaSinkProvider() {
+ TimelineMetricConfiguration configuration = TimelineMetricConfiguration.getInstance();
+
+ Properties configProperties = new Properties();
+ try {
+ configProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, configuration.getMetricsConf().getTrimmed(KAFKA_SERVERS));
+ configProperties.put(ProducerConfig.ACKS_CONFIG, configuration.getMetricsConf().getTrimmed(KAFKA_ACKS, "all"));
+ // Default to no retries to avoid duplicate records - the producer has no transactional semantics
+ configProperties.put(ProducerConfig.RETRIES_CONFIG, configuration.getMetricsConf().getInt(KAFKA_RETRIES, 0));
+ configProperties.put(ProducerConfig.BATCH_SIZE_CONFIG, configuration.getMetricsConf().getInt(KAFKA_BATCH_SIZE, 128));
+ configProperties.put(ProducerConfig.LINGER_MS_CONFIG, configuration.getMetricsConf().getInt(KAFKA_LINGER_MS, 1));
+ configProperties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, configuration.getMetricsConf().getLong(KAFKA_BUFFER_MEM, 33554432)); // 32 MB
+ FLUSH_SECONDS = configuration.getMetricsConf().getInt(TIMELINE_METRICS_CACHE_COMMIT_INTERVAL, 3);
+ TIMEOUT_SECONDS = configuration.getMetricsConf().getInt(KAFKA_SINK_TIMEOUT_SECONDS, 10);
+ } catch (Exception e) {
+ LOG.error("Configuration error!", e);
+ throw new ExceptionInInitializerError(e);
+ }
+ configProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.ByteArraySerializer");
+ configProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.connect.json.JsonSerializer");
+
+
+
+ producer = new KafkaProducer(configProperties);
+ }
+
+ @Override
+ public ExternalMetricsSink getExternalMetricsSink(SOURCE_NAME sourceName) {
+ switch (sourceName) {
+ case RAW_METRICS:
+ return new KafkaRawMetricsSink();
+ default:
+ throw new UnsupportedOperationException("Provider does not support " +
+ "the expected source " + sourceName);
+ }
+ }
+
+ class KafkaRawMetricsSink implements ExternalMetricsSink {
+
+ @Override
+ public int getSinkTimeOutSeconds() {
+ return TIMEOUT_SECONDS;
+ }
+
+ @Override
+ public int getFlushSeconds() {
+ return FLUSH_SECONDS;
+ }
+
+ @Override
+ public void sinkMetricData(Collection<TimelineMetrics> metrics) {
+ JsonNode jsonNode = objectMapper.valueToTree(metrics);
+ ProducerRecord<String, JsonNode> rec = new ProducerRecord<String, JsonNode>(TOPIC_NAME, jsonNode);
+ Future<RecordMetadata> f = producer.send(rec);
+ }
+ }
+
+}
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/source/DefaultInternalMetricsSourceProvider.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/source/DefaultInternalMetricsSourceProvider.java
index b97c39f..c6b071f 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/source/DefaultInternalMetricsSourceProvider.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/source/DefaultInternalMetricsSourceProvider.java
@@ -24,7 +24,7 @@ import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.
public class DefaultInternalMetricsSourceProvider implements InternalSourceProvider {
private static final Log LOG = LogFactory.getLog(DefaultInternalMetricsSourceProvider.class);
- // TODO: Implement read based sources for higher level data
+ // TODO: Implement read based sources for higher order data
@Override
public InternalMetricsSource getInternalMetricsSource(SOURCE_NAME sourceName, int sinkIntervalSeconds, ExternalMetricsSink sink) {
if (sink == null) {
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/source/RawMetricsSource.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/source/RawMetricsSource.java
index 967d819..879577a 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/source/RawMetricsSource.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/source/RawMetricsSource.java
@@ -63,21 +63,14 @@ public class RawMetricsSource implements InternalMetricsSource {
}
private void initializeFixedRateScheduler() {
- executorService.scheduleAtFixedRate(new Runnable() {
- @Override
- public void run() {
- rawMetricsSink.sinkMetricData(cache.evictAll());
- }
- }, rawMetricsSink.getFlushSeconds(), rawMetricsSink.getFlushSeconds(), TimeUnit.SECONDS);
+ executorService.scheduleAtFixedRate(() -> rawMetricsSink.sinkMetricData(cache.evictAll()),
+ rawMetricsSink.getFlushSeconds(), rawMetricsSink.getFlushSeconds(), TimeUnit.SECONDS);
}
private void submitDataWithTimeout(final Collection<TimelineMetrics> metrics) {
- Future f = executorService.submit(new Callable<Object>() {
- @Override
- public Object call() throws Exception {
- rawMetricsSink.sinkMetricData(metrics);
- return null;
- }
+ Future f = executorService.submit(() -> {
+ rawMetricsSink.sinkMetricData(metrics);
+ return null;
});
try {
f.get(rawMetricsSink.getSinkTimeOutSeconds(), TimeUnit.SECONDS);
diff --git a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/configuration/ams-admanager-config.xml b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/configuration/ams-admanager-config.xml
new file mode 100644
index 0000000..489850f
--- /dev/null
+++ b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/configuration/ams-admanager-config.xml
@@ -0,0 +1,60 @@
+<?xml version="1.0"?>
+<!--
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+-->
+<configuration>
+ <property>
+ <name>ambari.metrics.admanager.spark.operation.mode</name>
+ <value>stand-alone</value>
+ <display-name>Anomaly Detection Service operation mode</display-name>
+ <description>
+ Service Operation modes:
+ 1) stand-alone: Standalone Spark cluster for AD jobs
+ 2) spark-on-yarn: Spark running on YARN. (Recommended production setting)
+ </description>
+ <on-ambari-upgrade add="true"/>
+ <value-attributes>
+ <overridable>false</overridable>
+ <type>value-list</type>
+ <entries>
+ <entry>
+ <value>stand-alone</value>
+ <label>Stand Alone</label>
+ </entry>
+ <entry>
+ <value>spark-on-yarn</value>
+ <label>Spark on YARN</label>
+ </entry>
+ </entries>
+ <selection-cardinality>1</selection-cardinality>
+ </value-attributes>
+ </property>
+ <property>
+ <name>ambari.metrics.admanager.application.port</name>
+ <value>9090</value>
+ <display-name>AD Manager http port</display-name>
+ <description>AMS Anomaly Detection Manager application port</description>
+ <value-attributes>
+ <type>int</type>
+ <overridable>false</overridable>
+ </value-attributes>
+ <on-ambari-upgrade add="true"/>
+ </property>
+</configuration>
\ No newline at end of file
diff --git a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/configuration/ams-admanager-env.xml b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/configuration/ams-admanager-env.xml
new file mode 100644
index 0000000..99e93a6
--- /dev/null
+++ b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/configuration/ams-admanager-env.xml
@@ -0,0 +1,105 @@
+<?xml version="1.0"?>
+<!--
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+-->
+<configuration>
+ <property>
+ <name>ams_admanager_log_dir</name>
+ <value>/var/log/ambari-metrics-anomaly-detection</value>
+ <display-name>Anomaly Detection Manager log dir</display-name>
+ <description>AMS Anomaly Detection Manager log directory.</description>
+ <value-attributes>
+ <type>directory</type>
+ </value-attributes>
+ <on-ambari-upgrade add="true"/>
+ </property>
+ <property>
+ <name>ams_admanager_pid_dir</name>
+ <value>/var/run/ambari-metrics-anomaly-detection</value>
+ <display-name>Anomaly Detection Manager pid dir</display-name>
+ <description>AMS Anomaly Detection Manager pid directory.</description>
+ <value-attributes>
+ <type>directory</type>
+ </value-attributes>
+ <on-ambari-upgrade add="true"/>
+ </property>
+ <property>
+ <name>ams_admanager_data_dir</name>
+ <value>/var/lib/ambari-metrics-anomaly-detection</value>
+ <display-name>Anomaly Detection Manager data dir</display-name>
+ <description>AMS Anomaly Detection Manager data directory.</description>
+ <value-attributes>
+ <type>directory</type>
+ </value-attributes>
+ <on-ambari-upgrade add="true"/>
+ </property>
+ <property>
+ <name>ams_admanager_heapsize</name>
+ <value>1024</value>
+ <description>Anomaly Detection Manager Heap Size</description>
+ <display-name>Anomaly Detection Manager Heap Size</display-name>
+ <value-attributes>
+ <type>int</type>
+ <unit>MB</unit>
+ <minimum>512</minimum>
+ <maximum>16384</maximum>
+ <increment-step>256</increment-step>
+ </value-attributes>
+ <on-ambari-upgrade add="true"/>
+ </property>
+ <property>
+ <name>content</name>
+ <display-name>ams-ad-env template</display-name>
+ <value>
+ # Set environment variables here.
+
+ # The java implementation to use. Java 1.8 required.
+ export JAVA_HOME={{java64_home}}
+
+ # Anomaly Detection Manager Log directory for log4j
+ export AMS_AD_LOG_DIR={{ams_admanager_log_dir}}
+
+ # Anomaly Detection Manager pid directory
+ export AMS_AD_PID_DIR={{ams_admanager_pid_dir}}
+
+ # Anomaly Detection Manager heapsize
+ export AMS_AD_HEAPSIZE={{ams_admanager_heapsize}}
+
+ # Anomaly Detection Manager data dir
+ export AMS_AD_DATA_DIR={{ams_admanager_data_dir}}
+
+ # Anomaly Detection Manager options
+ export AMS_AD_OPTS=""
+ {% if security_enabled %}
+ export AMS_AD_OPTS="$AMS_AD_OPTS -Djava.security.auth.login.config={{ams_ad_jaas_config_file}}"
+ {% endif %}
+
+ # Anomaly Detection Manager GC options
+ export AMS_AD_GC_OPTS="-XX:+UseConcMarkSweepGC -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -Xloggc:{{ams_admanager_log_dir}}/admanager-gc.log-`date +'%Y%m%d%H%M'`"
+ export AMS_AD_OPTS="$AMS_AD_OPTS $AMS_AD_GC_OPTS"
+
+ </value>
+ <value-attributes>
+ <type>content</type>
+ </value-attributes>
+ <on-ambari-upgrade add="true"/>
+ </property>
+
+</configuration>
\ No newline at end of file
diff --git a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/configuration/ams-site.xml b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/configuration/ams-site.xml
index 49dfd95..6bd25d2 100644
--- a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/configuration/ams-site.xml
+++ b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/configuration/ams-site.xml
@@ -1,5 +1,4 @@
<?xml version="1.0"?>
-<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<!--
/**
*
diff --git a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/metainfo.xml b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/metainfo.xml
index 78014b0..0add0cd 100644
--- a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/metainfo.xml
+++ b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/metainfo.xml
@@ -114,13 +114,37 @@
<config-type>ams-grafana-env</config-type>
<config-type>ams-grafana-ini</config-type>
</configuration-dependencies>
+ <logs>
+ <log>
+ <logId>ams_grafana</logId>
+ <primary>true</primary>
+ </log>
+ </logs>
+ </component>
+
+ <component>
+ <name>AD_MANAGER</name>
+ <displayName>AD Manager</displayName>
+ <category>MASTER</category>
+ <cardinality>0-1</cardinality>
+ <versionAdvertised>false</versionAdvertised>
+ <commandScript>
+ <script>scripts/ams_admanager.py</script>
+ <scriptType>PYTHON</scriptType>
+ <timeout>1200</timeout>
+ </commandScript>
+ <configuration-dependencies>
+ <config-type>ams-hbase-site</config-type>
+ <config-type>ams-admanager-config</config-type>
+ <config-type>ams-admanager-env</config-type>
+ </configuration-dependencies>
+ <logs>
+ <log>
+ <logId>ams_anomaly_detection</logId>
+ <primary>true</primary>
+ </log>
+ </logs>
</component>
- <logs>
- <log>
- <logId>ams_grafana</logId>
- <primary>true</primary>
- </log>
- </logs>
</components>
<themes>
@@ -153,6 +177,11 @@
<condition>should_install_ams_grafana</condition>
</package>
<package>
+ <name>ambari-metrics-admanager</name>
+ <skipUpgrade>true</skipUpgrade>
+ <condition>should_install_ams_admanager</condition>
+ </package>
+ <package>
<name>gcc</name>
</package>
</packages>
diff --git a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/ams.py b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/ams.py
index 9b15fae..fe6b4ec 100644
--- a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/ams.py
+++ b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/ams.py
@@ -512,6 +512,41 @@ def ams(name=None, action=None):
pass
+ elif name == 'admanager':
+ ams_ad_directories = [
+ params.ams_ad_conf_dir,
+ params.ams_ad_log_dir,
+ params.ams_ad_data_dir,
+ params.ams_ad_pid_dir
+ ]
+
+ for ams_ad_dir in ams_ad_directories:
+ Directory(ams_ad_dir,
+ owner=params.ams_user,
+ group=params.user_group,
+ mode=0755,
+ create_parents=True,
+ recursive_ownership=True
+ )
+
+ File(format("{ams_ad_conf_dir}/ams-admanager-env.sh"),
+ owner=params.ams_user,
+ group=params.user_group,
+ content=InlineTemplate(params.ams_grafana_env_sh_template)
+ )
+
+ File(format("{ams_ad_conf_dir}/config.yaml"),
+ content=Template("admanager_config.yaml.j2"),
+ owner=params.ams_user,
+ group=params.user_group
+ )
+
+ if action != 'stop':
+ for dir in ams_ad_directories:
+ Execute(('chown', '-R', params.ams_user, dir),
+ sudo=True
+ )
+
def is_spnego_enabled(params):
if 'core-site' in params.config['configurations'] \
and 'hadoop.http.authentication.type' in params.config['configurations']['core-site'] \
diff --git a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/ams_admanager.py b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/ams_admanager.py
new file mode 100644
index 0000000..96c4454
--- /dev/null
+++ b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/ams_admanager.py
@@ -0,0 +1,73 @@
+#!/usr/bin/env python
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+"""
+from resource_management import Script, Execute
+from resource_management.libraries.functions import format
+from status import check_service_status
+from ams import ams
+from resource_management.core.logger import Logger
+from resource_management.core import sudo
+
+class AmsADManager(Script):
+ def install(self, env):
+ import params
+ env.set_params(params)
+ self.install_packages(env)
+ self.configure(env) # for security
+
+ def configure(self, env, action = None):
+ import params
+ env.set_params(params)
+ ams(name='admanager', action=action)
+
+ def start(self, env):
+ import params
+ env.set_params(params)
+ self.configure(env, action = 'start')
+
+ start_cmd = format("{ams_admanager_script} start")
+ Execute(start_cmd,
+ user=params.ams_user
+ )
+ pidfile = format("{ams_ad_pid_dir}/admanager.pid")
+ if not sudo.path_exists(pidfile):
+ Logger.warning("Pid file doesn't exist after starting of the component.")
+ else:
+ Logger.info("AD Manager Server has started with pid: {0}".format(sudo.read_file(pidfile).strip()))
+
+
+ def stop(self, env):
+ import params
+ env.set_params(params)
+ self.configure(env, action = 'stop')
+ Execute((format("{ams_admanager_script}"), 'stop'),
+ sudo=True
+ )
+
+ def status(self, env):
+ import status_params
+ env.set_params(status_params)
+ check_service_status(env, name='admanager')
+
+ def get_pid_files(self):
+ import status_params
+ return [status_params.ams_ad_pid_file]
+
+if __name__ == "__main__":
+ AmsADManager().execute()
diff --git a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/params.py b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/params.py
index 6389696..5d49939 100644
--- a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/params.py
+++ b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/params.py
@@ -187,6 +187,15 @@ ams_hbase_home_dir = "/usr/lib/ams-hbase/"
ams_hbase_init_check_enabled = default("/configurations/ams-site/timeline.metrics.hbase.init.check.enabled", True)
+# AD Manager settings
+ams_ad_conf_dir = '/etc/ambari-metrics-anomaly-detection/conf'
+ams_ad_log_dir = default("/configurations/ams-admanager-env/ams_admanager_log_dir", '/var/log/ambari-metrics-anomaly-detection')
+ams_ad_pid_dir = status_params.ams_admanager_pid_dir
+ams_ad_data_dir = default("/configurations/ams-admanager-env/ams_admanager_data_dir", '/var/lib/ambari-metrics-anomaly-detection')
+
+ams_admanager_script = "/usr/sbin/ambari-metrics-admanager"
+ams_admanager_port = config['configurations']['ams-admanager-config']['ambari.metrics.admanager.application.port']
+
#hadoop params
hbase_excluded_hosts = config['commandParams']['excluded_hosts']
diff --git a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/status.py b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/status.py
index 0b24ac0..e2af793 100644
--- a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/status.py
+++ b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/status.py
@@ -43,6 +43,8 @@ def check_service_status(env, name):
check_process_status(status_params.monitor_pid_file)
elif name == 'grafana':
check_process_status(status_params.grafana_pid_file)
+ elif name == 'admanager':
+ check_process_status(status_params.ams_ad_pid_file)
@OsFamilyFuncImpl(os_family=OSConst.WINSRV_FAMILY)
def check_service_status(name):
diff --git a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/status_params.py b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/status_params.py
index 27c6020..bc9b7e3 100644
--- a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/status_params.py
+++ b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/status_params.py
@@ -33,9 +33,11 @@ hbase_user = ams_user
ams_collector_pid_dir = config['configurations']['ams-env']['metrics_collector_pid_dir']
ams_monitor_pid_dir = config['configurations']['ams-env']['metrics_monitor_pid_dir']
ams_grafana_pid_dir = config['configurations']['ams-grafana-env']['metrics_grafana_pid_dir']
+ams_admanager_pid_dir = config['configurations']['ams-admanager-env']['ams_admanager_pid_dir']
monitor_pid_file = format("{ams_monitor_pid_dir}/ambari-metrics-monitor.pid")
grafana_pid_file = format("{ams_grafana_pid_dir}/grafana-server.pid")
+ams_ad_pid_file = format("{ams_admanager_pid_dir}/admanager.pid")
security_enabled = config['configurations']['cluster-env']['security_enabled']
ams_hbase_conf_dir = format("{hbase_conf_dir}")
diff --git a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/templates/admanager_config.yaml.j2 b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/templates/admanager_config.yaml.j2
new file mode 100644
index 0000000..787aa3e
--- /dev/null
+++ b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/templates/admanager_config.yaml.j2
@@ -0,0 +1,24 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+server:
+ applicationConnectors:
+ - type: http
+ port: {{ams_admanager_port}}
+ requestLog:
+ type: external
+
+logging:
+ type: external
--
To stop receiving notification emails like this one, please contact
avijayan@apache.org.