You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by av...@apache.org on 2016/12/22 20:09:49 UTC
ambari git commit: AMBARI-19273 : Refine AmbariServer Metrics service
and enable JVM metrics source by default.
Repository: ambari
Updated Branches:
refs/heads/trunk c695e50a7 -> 6d196db0d
AMBARI-19273 : Refine AmbariServer Metrics service and enable JVM metrics source by default.
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/6d196db0
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/6d196db0
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/6d196db0
Branch: refs/heads/trunk
Commit: 6d196db0d80a145bc323a03aff74533938836392
Parents: c695e50
Author: Aravindan Vijayan <av...@hortonworks.com>
Authored: Thu Dec 22 12:09:27 2016 -0800
Committer: Aravindan Vijayan <av...@hortonworks.com>
Committed: Thu Dec 22 12:09:27 2016 -0800
----------------------------------------------------------------------
ambari-server/conf/unix/metrics.properties | 15 +-
ambari-server/conf/windows/metrics.properties | 28 +++
.../src/main/assemblies/server-windows.xml | 4 +
ambari-server/src/main/assemblies/server.xml | 4 +
.../server/configuration/Configuration.java | 8 +
.../ambari/server/controller/AmbariServer.java | 19 +-
.../server/metrics/system/AmbariMetricSink.java | 34 ---
.../server/metrics/system/MetricsService.java | 16 +-
.../server/metrics/system/MetricsSink.java | 43 ++++
.../server/metrics/system/MetricsSource.java | 5 +-
.../server/metrics/system/SingleMetric.java | 44 ++++
.../system/impl/AbstractMetricsSource.java | 15 +-
.../system/impl/AmbariMetricSinkImpl.java | 239 ++++++++++++++-----
.../metrics/system/impl/Configuration.java | 83 -------
.../metrics/system/impl/JvmMetricsSource.java | 54 +++--
.../system/impl/MetricsConfiguration.java | 89 +++++++
.../metrics/system/impl/MetricsServiceImpl.java | 154 +++++-------
.../system/impl/JvmMetricsSourceTest.java | 36 +++
.../metric/system/impl/MetricsServiceTest.java | 40 ++++
.../system/impl/TestAmbariMetricsSinkImpl.java | 79 ++++++
.../metric/system/impl/TestMetricsSource.java | 37 +++
.../src/test/resources/metrics.properties | 29 +++
22 files changed, 749 insertions(+), 326 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ambari/blob/6d196db0/ambari-server/conf/unix/metrics.properties
----------------------------------------------------------------------
diff --git a/ambari-server/conf/unix/metrics.properties b/ambari-server/conf/unix/metrics.properties
index 5f01e39..3ee22d6 100644
--- a/ambari-server/conf/unix/metrics.properties
+++ b/ambari-server/conf/unix/metrics.properties
@@ -17,15 +17,12 @@
# limitations under the License.
-# Metrics sources info
-metrics.sources=jvm
-
+#### Source Configs #####
# Source interval determines how often the metric is sent to sink. Its unit is in seconds
-source.jvm.interval=5
-source.jvm.class=org.apache.ambari.server.metrics.system.impl.JvmMetricsSource
+metric.sources=jvm
-#source.database.interval=10
-#source.database.class=org.apache.ambari.server.metrics.system.impl.DbMetricSource
+source.jvm.interval=10
+source.jvm.class=org.apache.ambari.server.metrics.system.impl.JvmMetricsSource
-# Sink frequency determines how often the sink publish the metrics from buffer to AMS.
-sink.frequency=10
\ No newline at end of file
+#Override Ambari Server hostname for metrics
+#ambariserver.hostname.override=
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ambari/blob/6d196db0/ambari-server/conf/windows/metrics.properties
----------------------------------------------------------------------
diff --git a/ambari-server/conf/windows/metrics.properties b/ambari-server/conf/windows/metrics.properties
new file mode 100644
index 0000000..3ee22d6
--- /dev/null
+++ b/ambari-server/conf/windows/metrics.properties
@@ -0,0 +1,28 @@
+# Copyright 2011 The Apache Software Foundation
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+#### Source Configs #####
+# Source interval determines how often the metric is sent to sink. Its unit is in seconds
+metric.sources=jvm
+
+source.jvm.interval=10
+source.jvm.class=org.apache.ambari.server.metrics.system.impl.JvmMetricsSource
+
+#Override Ambari Server hostname for metrics
+#ambariserver.hostname.override=
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ambari/blob/6d196db0/ambari-server/src/main/assemblies/server-windows.xml
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/assemblies/server-windows.xml b/ambari-server/src/main/assemblies/server-windows.xml
index 191fcfb..ce1e270 100644
--- a/ambari-server/src/main/assemblies/server-windows.xml
+++ b/ambari-server/src/main/assemblies/server-windows.xml
@@ -45,6 +45,10 @@
<outputDirectory>/ambari-server-${project.version}/conf</outputDirectory>
</file>
<file>
+ <source>${basedir}/conf/windows/metrics.properties</source>
+ <outputDirectory>/ambari-server-${project.version}/conf</outputDirectory>
+ </file>
+ <file>
<source>${basedir}/conf/windows/ca.config</source>
<outputDirectory>/ambari-server-${project.version}/keystore</outputDirectory>
</file>
http://git-wip-us.apache.org/repos/asf/ambari/blob/6d196db0/ambari-server/src/main/assemblies/server.xml
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/assemblies/server.xml b/ambari-server/src/main/assemblies/server.xml
index 3a61477..5055d46 100644
--- a/ambari-server/src/main/assemblies/server.xml
+++ b/ambari-server/src/main/assemblies/server.xml
@@ -219,6 +219,10 @@
<outputDirectory>/etc/ambari-server/conf</outputDirectory>
</file>
<file>
+ <source>conf/unix/metrics.properties</source>
+ <outputDirectory>/etc/ambari-server/conf</outputDirectory>
+ </file>
+ <file>
<source>conf/unix/krb5JAASLogin.conf</source>
<outputDirectory>/etc/ambari-server/conf</outputDirectory>
</file>
http://git-wip-us.apache.org/repos/asf/ambari/blob/6d196db0/ambari-server/src/main/java/org/apache/ambari/server/configuration/Configuration.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/configuration/Configuration.java b/ambari-server/src/main/java/org/apache/ambari/server/configuration/Configuration.java
index 03af391..d2f7934 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/configuration/Configuration.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/configuration/Configuration.java
@@ -2586,6 +2586,14 @@ public class Configuration {
public static final ConfigurationProperty<Integer> LOGSEARCH_PORTAL_READ_TIMEOUT = new ConfigurationProperty<>(
"logsearch.portal.read.timeout", 5000);
+
+ /**
+ * Global disable flag for AmbariServer Metrics.
+ */
+ @Markdown(description = "Global disable flag for AmbariServer Metrics.")
+ public static final ConfigurationProperty<Boolean> AMBARISERVER_METRICS_DISABLE = new ConfigurationProperty<>(
+ "ambariserver.metrics.disable", false);
+
private static final Logger LOG = LoggerFactory.getLogger(
Configuration.class);
http://git-wip-us.apache.org/repos/asf/ambari/blob/6d196db0/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariServer.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariServer.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariServer.java
index 65a4b1c..1704546 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariServer.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariServer.java
@@ -28,8 +28,6 @@ import java.net.URL;
import java.util.EnumSet;
import java.util.Enumeration;
import java.util.Map;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
import java.util.logging.LogManager;
import javax.crypto.BadPaddingException;
@@ -542,6 +540,9 @@ public class AmbariServer {
ExecutionScheduleManager executionScheduleManager = injector
.getInstance(ExecutionScheduleManager.class);
+ MetricsService metricsService = injector.getInstance(
+ MetricsService.class);
+
clusterController = controller;
StateRecoveryManager recoveryManager = injector.getInstance(
@@ -554,14 +555,6 @@ public class AmbariServer {
*/
server.start();
- // TODO, start every other tread.
- final ExecutorService executor = Executors.newSingleThreadExecutor();
- MetricsService metricsService = injector.getInstance(
- MetricsService.class);
- metricsService.init();
- executor.submit(metricsService);
- LOG.info("********* Started Ambari Metrics **********");
-
serverForAgent.start();
LOG.info("********* Started Server **********");
@@ -574,6 +567,12 @@ public class AmbariServer {
serviceManager.startAsync();
LOG.info("********* Started Services **********");
+ if (!Configuration.AMBARISERVER_METRICS_DISABLE.equals(true)) {
+ metricsService.start();
+ } else {
+ LOG.info("AmbariServer Metrics disabled.");
+ }
+
server.join();
LOG.info("Joined the Server");
} catch (BadPaddingException bpe) {
http://git-wip-us.apache.org/repos/asf/ambari/blob/6d196db0/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/AmbariMetricSink.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/AmbariMetricSink.java b/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/AmbariMetricSink.java
deleted file mode 100644
index 809176be..0000000
--- a/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/AmbariMetricSink.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-//package org.apache.hadoop.metrics2.sink.ambari;
-
-package org.apache.ambari.server.metrics.system;
-
-import java.util.Map;
-
-public interface AmbariMetricSink {
- /**
- * initialize Collector URI and sink frequency to publish the metrics to AMS
- **/
- void init(String protocol, String collectorUri, int frequency);
-
- /**
- * Publish metrics to Collector
- **/
- void publish(Map<String, Number> metricsMap);
-}
http://git-wip-us.apache.org/repos/asf/ambari/blob/6d196db0/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/MetricsService.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/MetricsService.java b/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/MetricsService.java
index 23845c9..29db6a8 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/MetricsService.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/MetricsService.java
@@ -15,12 +15,24 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.ambari.server.metrics.system;
-public interface MetricsService extends Runnable {
+import java.util.Collection;
+
+import org.apache.ambari.server.metrics.system.impl.AbstractMetricsSource;
+
+
+public interface MetricsService{
/**
* Set up configuration
**/
- void init();
+ void start();
+
+ /**
+ * Get Configured sources
+ * @return
+ */
+ Collection<AbstractMetricsSource> getSources();
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/6d196db0/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/MetricsSink.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/MetricsSink.java b/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/MetricsSink.java
new file mode 100644
index 0000000..803e6e0
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/MetricsSink.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.server.metrics.system;
+
+import java.util.List;
+
+import org.apache.ambari.server.metrics.system.impl.MetricsConfiguration;
+
+
+public interface MetricsSink {
+
+ /**
+ * initialize Sink passing in configuration
+ **/
+ void init(MetricsConfiguration configuration);
+
+ /**
+ * Publish metrics to Collector
+ **/
+ void publish(List<SingleMetric> metrics);
+
+
+ /**
+ * Returns if the sink is initialized.
+ */
+ boolean isInitialized();
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/6d196db0/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/MetricsSource.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/MetricsSource.java b/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/MetricsSource.java
index cf10408..b0ccb3c 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/MetricsSource.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/MetricsSource.java
@@ -15,11 +15,14 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.ambari.server.metrics.system;
+import org.apache.ambari.server.metrics.system.impl.MetricsConfiguration;
+
public interface MetricsSource extends Runnable{
/**
* initialize sink
**/
- void init(AmbariMetricSink sink);
+ void init(MetricsConfiguration configuration, MetricsSink sink);
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/6d196db0/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/SingleMetric.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/SingleMetric.java b/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/SingleMetric.java
new file mode 100644
index 0000000..6626c8ab
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/SingleMetric.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.server.metrics.system;
+
+public class SingleMetric {
+
+ String metricName;
+ double value;
+ long timestamp;
+
+ public SingleMetric (String metricName, double value, long timestamp) {
+ this.metricName = metricName;
+ this.value = value;
+ this.timestamp = timestamp;
+ }
+
+ public String getMetricName() {
+ return metricName;
+ }
+
+ public double getValue() {
+ return value;
+ }
+
+ public long getTimestamp() {
+ return timestamp;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/6d196db0/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/impl/AbstractMetricsSource.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/impl/AbstractMetricsSource.java b/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/impl/AbstractMetricsSource.java
index cb90a13..34ac872 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/impl/AbstractMetricsSource.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/impl/AbstractMetricsSource.java
@@ -17,25 +17,26 @@
*/
package org.apache.ambari.server.metrics.system.impl;
-import java.util.Map;
+import java.util.List;
-import org.apache.ambari.server.metrics.system.AmbariMetricSink;
+import org.apache.ambari.server.metrics.system.MetricsSink;
import org.apache.ambari.server.metrics.system.MetricsSource;
+import org.apache.ambari.server.metrics.system.SingleMetric;
public abstract class AbstractMetricsSource implements MetricsSource {
- protected AmbariMetricSink sink;
+ protected MetricsSink sink;
/**
* Pass metrics sink to metrics source
**/
@Override
- public void init(AmbariMetricSink sink) {
- this.sink = sink;
+ public void init(MetricsConfiguration configuration, MetricsSink sink) {
+ this.sink = sink;
}
/**
* Get metrics at the instance
* @return a map for metrics that maps metrics name to metrics value
**/
- abstract public Map<String, Number> getMetrics();
-}
+ abstract public List<SingleMetric> getMetrics();
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ambari/blob/6d196db0/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/impl/AmbariMetricSinkImpl.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/impl/AmbariMetricSinkImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/impl/AmbariMetricSinkImpl.java
index 391fe11..ba5f983 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/impl/AmbariMetricSinkImpl.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/impl/AmbariMetricSinkImpl.java
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -17,73 +17,177 @@
*/
package org.apache.ambari.server.metrics.system.impl;
-import java.io.IOException;
+
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
-import org.apache.ambari.server.metrics.system.AmbariMetricSink;
-import org.apache.commons.lang.ClassUtils;
+import org.apache.ambari.server.controller.AmbariManagementController;
+import org.apache.ambari.server.controller.internal.AbstractControllerResourceProvider;
+import org.apache.ambari.server.controller.internal.ServiceConfigVersionResourceProvider;
+import org.apache.ambari.server.controller.spi.Predicate;
+import org.apache.ambari.server.controller.spi.Request;
+import org.apache.ambari.server.controller.spi.Resource;
+import org.apache.ambari.server.controller.spi.ResourceProvider;
+import org.apache.ambari.server.controller.utilities.PredicateBuilder;
+import org.apache.ambari.server.controller.utilities.PropertyHelper;
+import org.apache.ambari.server.metrics.system.MetricsSink;
+import org.apache.ambari.server.metrics.system.SingleMetric;
+import org.apache.ambari.server.security.authorization.internal.InternalAuthenticationToken;
+import org.apache.ambari.server.state.Cluster;
+import org.apache.ambari.server.state.Clusters;
+import org.apache.ambari.server.state.Service;
+import org.apache.ambari.server.state.ServiceComponent;
+import org.apache.ambari.server.state.ServiceComponentHost;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.metrics2.sink.timeline.AbstractTimelineMetricsSink;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
-import jline.internal.Log;
+import org.apache.hadoop.metrics2.sink.timeline.cache.TimelineMetricsCache;
+import org.springframework.security.core.context.SecurityContextHolder;
+
+public class AmbariMetricSinkImpl extends AbstractTimelineMetricsSink implements MetricsSink {
+ private static final String AMBARI_SERVER_APP_ID = "ambari_server";
+ private Collection<String> collectorHosts;
-public class AmbariMetricSinkImpl extends AbstractTimelineMetricsSink implements AmbariMetricSink {
- private static final String APP_ID = "ambari_server";
- private int timeoutSeconds = 10;
- private String collectorProtocol;
private String collectorUri;
+ private String port;
+ private String protocol;
private String hostName;
- private int counter = 0;
- private int frequency;
- private List<TimelineMetric> buffer = new ArrayList<>();
+ private AmbariManagementController ambariManagementController;
+ private TimelineMetricsCache timelineMetricsCache;
+ private boolean isInitialized = false;
+
+ public AmbariMetricSinkImpl(AmbariManagementController amc) {
+ this.ambariManagementController = amc;
+ }
+
@Override
- public void init(String protocol, String collectorUri, int frequency) {
+ public void init(MetricsConfiguration configuration) {
+
+ if (ambariManagementController == null) {
+ return;
+ }
+
+ InternalAuthenticationToken authenticationToken = new InternalAuthenticationToken("admin");
+ authenticationToken.setAuthenticated(true);
+ SecurityContextHolder.getContext().setAuthentication(authenticationToken);
+ Clusters clusters = ambariManagementController.getClusters();
+ String ambariMetricsServiceName = "AMBARI_METRICS";
+ collectorHosts = new HashSet<>();
+
+ for (Map.Entry<String, Cluster> kv : clusters.getClusters().entrySet()) {
+ String clusterName = kv.getKey();
+ Cluster c = kv.getValue();
+ Resource.Type type = Resource.Type.ServiceConfigVersion;
+
+ Set<String> propertyIds = new HashSet<String>();
+ propertyIds.add(ServiceConfigVersionResourceProvider.SERVICE_CONFIG_VERSION_CONFIGURATIONS_PROPERTY_ID);
+
+ Predicate predicate = new PredicateBuilder().property(
+ ServiceConfigVersionResourceProvider.SERVICE_CONFIG_VERSION_CLUSTER_NAME_PROPERTY_ID).equals(clusterName).and().property(
+ ServiceConfigVersionResourceProvider.SERVICE_CONFIG_VERSION_SERVICE_NAME_PROPERTY_ID).equals(ambariMetricsServiceName).and().property(
+ ServiceConfigVersionResourceProvider.SERVICE_CONFIG_VERSION_IS_CURRENT_PROPERTY_ID).equals("true").toPredicate();
+
+ Request request = PropertyHelper.getReadRequest(propertyIds);
+
+ ResourceProvider provider = AbstractControllerResourceProvider.getResourceProvider(
+ type,
+ PropertyHelper.getPropertyIds(type),
+ PropertyHelper.getKeyPropertyIds(type),
+ ambariManagementController);
+
+ try {
+ //get collector host(s)
+ Service service = c.getService(ambariMetricsServiceName);
+ if (service != null) {
+ for (String component : service.getServiceComponents().keySet()) {
+ ServiceComponent sc = service.getServiceComponents().get(component);
+ for (ServiceComponentHost serviceComponentHost : sc.getServiceComponentHosts().values()) {
+ if (serviceComponentHost.getServiceComponentName().equals("METRICS_COLLECTOR")) {
+ collectorHosts.add(serviceComponentHost.getHostName());
+ }
+ }
+ }
+ }
+
+ // get collector port and protocol
+ Set<Resource> resources = provider.getResources(request, predicate);
+
+ for (Resource resource : resources) {
+ if (resource != null) {
+ ArrayList<LinkedHashMap<Object, Object>> configs = (ArrayList<LinkedHashMap<Object, Object>>)
+ resource.getPropertyValue(ServiceConfigVersionResourceProvider.SERVICE_CONFIG_VERSION_CONFIGURATIONS_PROPERTY_ID);
+ for (LinkedHashMap<Object, Object> config : configs) {
+ if (config != null && config.get("type").equals("ams-site")) {
+ TreeMap<Object, Object> properties = (TreeMap<Object, Object>) config.get("properties");
+ String timelineWebappAddress = (String) properties.get("timeline.metrics.service.webapp.address");
+ if (StringUtils.isNotEmpty(timelineWebappAddress) && timelineWebappAddress.contains(":")) {
+ port = timelineWebappAddress.split(":")[1];
+ }
+ String httpPolicy = (String) properties.get("timeline.metrics.service.http.policy");
+ protocol = httpPolicy.equals("HTTP_ONLY") ? "http" : "https";
+ break;
+ }
+ }
+ }
+ }
+ } catch (Exception e) {
+ LOG.info("Exception caught when retrieving Collector URI", e);
+ }
+ }
+
+ collectorUri = getCollectorUri(findPreferredCollectHost());
+ hostName = configuration.getProperty("ambariserver.hostname.override", getDefaultLocalHostName());
+
+ int maxRowCacheSize = Integer.parseInt(configuration.getProperty(MAX_METRIC_ROW_CACHE_SIZE,
+ String.valueOf(TimelineMetricsCache.MAX_RECS_PER_NAME_DEFAULT)));
+ int metricsSendInterval = Integer.parseInt(configuration.getProperty(METRICS_SEND_INTERVAL,
+ String.valueOf(TimelineMetricsCache.MAX_EVICTION_TIME_MILLIS)));
- /**
- * Protocol is either HTTP or HTTPS, and the collectorURI is the domain name of the collector
- * An example of the complete collector URI might be: http://c6403.ambari.org/ws/v1/timeline/metrics
- */
- this.frequency = frequency;
- this.collectorProtocol = protocol;
- this.collectorUri = getCollectorUri(collectorUri);
+ timelineMetricsCache = new TimelineMetricsCache(maxRowCacheSize, metricsSendInterval);
+ if (CollectionUtils.isNotEmpty(collectorHosts)) {
+ isInitialized = true;
+ }
+ }
+
+ private String getDefaultLocalHostName() {
try {
- hostName = InetAddress.getLocalHost().getHostName();
+ return InetAddress.getLocalHost().getCanonicalHostName();
} catch (UnknownHostException e) {
- Log.info("Error getting host address");
+ LOG.info("Error getting host address");
}
+ return null;
}
@Override
- public void publish(Map<String, Number> metricsMap) {
- List<TimelineMetric> metricsList = createMetricsList(metricsMap);
+ public void publish(List<SingleMetric> metrics) {
- if(counter > frequency) {
- TimelineMetrics timelineMetrics = new TimelineMetrics();
- timelineMetrics.setMetrics(buffer);
- String connectUrl = collectorUri;
- String jsonData = null;
- try {
- jsonData = mapper.writeValueAsString(timelineMetrics);
- } catch (IOException e) {
- LOG.error("Unable to parse metrics", e);
- }
- if (jsonData != null) {
- emitMetricsJson(connectUrl, jsonData);
+ //If Sink not yet initialized, drop the metrics on the floor.
+ if (isInitialized) {
+ List<TimelineMetric> metricList = getFilteredMetricList(metrics);
+ if (!metricList.isEmpty()) {
+ TimelineMetrics timelineMetrics = new TimelineMetrics();
+ timelineMetrics.setMetrics(metricList);
+ emitMetrics(timelineMetrics);
}
- counter = 0;
- } else {
- buffer.addAll(metricsList);
- counter++;
}
+ }
+ @Override
+ public boolean isInitialized() {
+ return isInitialized;
}
@@ -94,22 +198,22 @@ public class AmbariMetricSinkImpl extends AbstractTimelineMetricsSink implements
*/
@Override
protected String getCollectorUri(String host) {
- return getCollectorProtocol() + "://" + host + WS_V1_TIMELINE_METRICS;
+ return constructContainerMetricUri(protocol, host, port);
}
@Override
protected String getCollectorProtocol() {
- return collectorProtocol;
+ return protocol;
}
@Override
protected String getCollectorPort() {
- return null;
+ return port;
}
@Override
protected int getTimeoutSeconds() {
- return timeoutSeconds;
+ return 10;
}
/**
@@ -119,6 +223,7 @@ public class AmbariMetricSinkImpl extends AbstractTimelineMetricsSink implements
*/
@Override
protected String getZookeeperQuorum() {
+ //Ignoring Zk Fallback.
return null;
}
@@ -129,7 +234,7 @@ public class AmbariMetricSinkImpl extends AbstractTimelineMetricsSink implements
*/
@Override
protected Collection<String> getConfiguredCollectorHosts() {
- return null;
+ return collectorHosts;
}
/**
@@ -142,27 +247,33 @@ public class AmbariMetricSinkImpl extends AbstractTimelineMetricsSink implements
return hostName;
}
- private List<TimelineMetric> createMetricsList(Map<String, Number> metricsMap) {
- final List<TimelineMetric> metricsList = new ArrayList<>();
- for (Map.Entry<String, Number> entry : metricsMap.entrySet()) {
- final long currentTimeMillis = System.currentTimeMillis();
- String metricsName = entry.getKey();
- Number value = entry.getValue();
- TimelineMetric metric = createTimelineMetric(currentTimeMillis, APP_ID, metricsName, value);
- metricsList.add(metric);
+ private List<TimelineMetric> getFilteredMetricList(List<SingleMetric> metrics) {
+ final List<TimelineMetric> metricList = new ArrayList<>();
+ for (SingleMetric metric : metrics) {
+
+ String metricName = metric.getMetricName();
+ Double value = metric.getValue();
+
+ TimelineMetric timelineMetric = createTimelineMetric(metric.getTimestamp(), AMBARI_SERVER_APP_ID, metricName, value);
+ timelineMetricsCache.putTimelineMetric(timelineMetric, false);
+ TimelineMetric cachedMetric = timelineMetricsCache.getTimelineMetric(metricName);
+
+ if (cachedMetric != null) {
+ metricList.add(cachedMetric);
+ }
}
- return metricsList;
+ return metricList;
}
private TimelineMetric createTimelineMetric(long currentTimeMillis, String component, String attributeName,
Number attributeValue) {
- TimelineMetric timelineMetric = new TimelineMetric();
- timelineMetric.setMetricName(attributeName);
- timelineMetric.setHostName(hostName);
- timelineMetric.setAppId(component);
- timelineMetric.setStartTime(currentTimeMillis);
- timelineMetric.setType(ClassUtils.getShortCanonicalName(attributeValue, "Number"));
- timelineMetric.getMetricValues().put(currentTimeMillis, attributeValue.doubleValue());
- return timelineMetric;
+ TimelineMetric timelineMetric = new TimelineMetric();
+ timelineMetric.setMetricName(attributeName);
+ timelineMetric.setHostName(hostName);
+ timelineMetric.setAppId(component);
+ timelineMetric.setStartTime(currentTimeMillis);
+
+ timelineMetric.getMetricValues().put(currentTimeMillis, attributeValue.doubleValue());
+ return timelineMetric;
}
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/6d196db0/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/impl/Configuration.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/impl/Configuration.java b/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/impl/Configuration.java
deleted file mode 100644
index 44a3de0..0000000
--- a/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/impl/Configuration.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.ambari.server.metrics.system.impl;
-
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.Properties;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class Configuration {
- public static final String CONFIG_FILE = "metrics.properties";
-
- private static Logger LOG = LoggerFactory.getLogger(Configuration.class);
- private Properties properties;
-
- public Configuration() {
- this(readConfigFile());
- }
-
- public Configuration(Properties properties) {
- this.properties = properties;
- }
-
- private static Properties readConfigFile() {
- Properties properties = new Properties();
-
- //Get property file stream from classpath
- InputStream inputStream = Configuration.class.getClassLoader().getResourceAsStream(CONFIG_FILE);
-
- if (inputStream == null) {
- throw new RuntimeException(CONFIG_FILE + " not found in classpath");
- }
-
- // load the properties
- try {
- properties.load(inputStream);
- inputStream.close();
- } catch (FileNotFoundException fnf) {
- LOG.info("No configuration file " + CONFIG_FILE + " found in classpath.", fnf);
- } catch (IOException ie) {
- throw new IllegalArgumentException("Can't read configuration file " +
- CONFIG_FILE, ie);
- }
-
- return properties;
- }
-
- /**
- * Get the property value for the given key.
- *
- * @return the property value
- */
- public String getProperty(String key) {
- return properties.getProperty(key);
- }
-
- /**
- * Get the property value for the given key.
- *
- * @return the property value
- */
- public String getProperty(String key, String defaultValue) {
- return properties.getProperty(key, defaultValue);
- }
-}
http://git-wip-us.apache.org/repos/asf/ambari/blob/6d196db0/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/impl/JvmMetricsSource.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/impl/JvmMetricsSource.java b/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/impl/JvmMetricsSource.java
index ef07deb..2e4747b 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/impl/JvmMetricsSource.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/impl/JvmMetricsSource.java
@@ -18,17 +18,22 @@
package org.apache.ambari.server.metrics.system.impl;
import java.lang.management.ManagementFactory;
-import java.util.HashMap;
+import java.util.ArrayList;
+import java.util.List;
import java.util.Map;
-import org.apache.ambari.server.metrics.system.AmbariMetricSink;
+import org.apache.ambari.server.metrics.system.MetricsSink;
+import org.apache.ambari.server.metrics.system.SingleMetric;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.codahale.metrics.Gauge;
import com.codahale.metrics.Metric;
+import com.codahale.metrics.MetricFilter;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.MetricSet;
import com.codahale.metrics.jvm.BufferPoolMetricSet;
+import com.codahale.metrics.jvm.FileDescriptorRatioGauge;
import com.codahale.metrics.jvm.GarbageCollectorMetricSet;
import com.codahale.metrics.jvm.MemoryUsageGaugeSet;
import com.codahale.metrics.jvm.ThreadStatesGaugeSet;
@@ -38,21 +43,21 @@ public class JvmMetricsSource extends AbstractMetricsSource {
private static Logger LOG = LoggerFactory.getLogger(JvmMetricsSource.class);
@Override
- public void init(AmbariMetricSink sink) {
- super.init(sink);
- registerAll("gc", new GarbageCollectorMetricSet(), registry);
- registerAll("buffers", new BufferPoolMetricSet(ManagementFactory.getPlatformMBeanServer()), registry);
- registerAll("memory", new MemoryUsageGaugeSet(), registry);
- registerAll("threads", new ThreadStatesGaugeSet(), registry);
+ public void init(MetricsConfiguration configuration, MetricsSink sink) {
+ super.init(configuration, sink);
+ registerAll("jvm.gc", new GarbageCollectorMetricSet(), registry);
+ registerAll("jvm.buffers", new BufferPoolMetricSet(ManagementFactory.getPlatformMBeanServer()), registry);
+ registerAll("jvm.memory", new MemoryUsageGaugeSet(), registry);
+ registerAll("jvm.threads", new ThreadStatesGaugeSet(), registry);
+ registry.register("jvm.file.open.descriptor.ratio", new FileDescriptorRatioGauge());
}
@Override
public void run() {
- this.sink.publish(getMetrics());
- LOG.info("********* Published system metrics to sink **********");
+ sink.publish(getMetrics());
+ LOG.debug("********* Published system metrics to sink **********");
}
-
private void registerAll(String prefix, MetricSet metricSet, MetricRegistry registry) {
for (Map.Entry<String, Metric> entry : metricSet.getMetrics().entrySet()) {
if (entry.getValue() instanceof MetricSet) {
@@ -64,13 +69,26 @@ public class JvmMetricsSource extends AbstractMetricsSource {
}
@Override
- public Map<String, Number> getMetrics() {
- Map<String, Number> map = new HashMap<>();
- for (String metricName : registry.getGauges().keySet()) {
- if (metricName.equals("threads.deadlocks") ) continue;
- Number value = (Number)registry.getGauges().get(metricName).getValue();
- map.put(metricName, value);
+ public List<SingleMetric> getMetrics() {
+
+ List<SingleMetric> metrics = new ArrayList<>();
+ Map<String, Gauge> gaugeSet = registry.getGauges(new NonNumericMetricFilter());
+ for (String metricName : gaugeSet.keySet()) {
+ Number value = (Number) gaugeSet.get(metricName).getValue();
+ metrics.add(new SingleMetric(metricName, value.doubleValue(), System.currentTimeMillis()));
+ }
+
+ return metrics;
+ }
+
+ public class NonNumericMetricFilter implements MetricFilter {
+
+ @Override
+ public boolean matches(String name, Metric metric) {
+ if (name.equalsIgnoreCase("jvm.threads.deadlocks")) {
+ return false;
+ }
+ return true;
}
- return map;
}
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/6d196db0/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/impl/MetricsConfiguration.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/impl/MetricsConfiguration.java b/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/impl/MetricsConfiguration.java
new file mode 100644
index 0000000..ca83a53
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/impl/MetricsConfiguration.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.server.metrics.system.impl;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Properties;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class MetricsConfiguration {
+ public static final String CONFIG_FILE = "metrics.properties";
+
+ private static Logger LOG = LoggerFactory.getLogger(MetricsConfiguration.class);
+ private Properties properties;
+
+ public static MetricsConfiguration getMetricsConfiguration() {
+ Properties properties = readConfigFile();
+ if (properties == null || properties.isEmpty()) {
+ return null;
+ }
+ return new MetricsConfiguration(properties);
+ }
+
+ public MetricsConfiguration(Properties properties) {
+ this.properties = properties;
+ }
+
+ private static Properties readConfigFile() {
+ Properties properties = new Properties();
+
+ //Get property file stream from classpath
+ InputStream inputStream = MetricsConfiguration.class.getClassLoader().getResourceAsStream(CONFIG_FILE);
+
+ if (inputStream == null) {
+ LOG.info(CONFIG_FILE + " not found in classpath");
+ return null;
+ }
+
+ // load the properties
+ try {
+ properties.load(inputStream);
+ inputStream.close();
+ } catch (FileNotFoundException fnf) {
+ LOG.info("No configuration file " + CONFIG_FILE + " found in classpath.");
+ return null;
+ } catch (IOException ie) {
+ LOG.error("Can't read configuration file " + CONFIG_FILE, ie);
+ return null;
+ }
+
+ return properties;
+ }
+
+ /**
+ * Get the property value for the given key.
+ *
+ * @return the property value
+ */
+ public String getProperty(String key) {
+ return properties.getProperty(key);
+ }
+
+ /**
+ * Get the property value for the given key.
+ *
+ * @return the property value
+ */
+ public String getProperty(String key, String defaultValue) {
+ return properties.getProperty(key, defaultValue);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/6d196db0/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/impl/MetricsServiceImpl.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/impl/MetricsServiceImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/impl/MetricsServiceImpl.java
index 44b6204..96a51f3 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/impl/MetricsServiceImpl.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/impl/MetricsServiceImpl.java
@@ -17,35 +17,19 @@
*/
package org.apache.ambari.server.metrics.system.impl;
-
-import java.util.ArrayList;
+import java.util.Collection;
import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedHashMap;
import java.util.Map;
-import java.util.Set;
-import java.util.TreeMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.ambari.server.controller.AmbariManagementController;
-import org.apache.ambari.server.controller.internal.AbstractControllerResourceProvider;
-import org.apache.ambari.server.controller.internal.ServiceConfigVersionResourceProvider;
-import org.apache.ambari.server.controller.spi.Predicate;
-import org.apache.ambari.server.controller.spi.Request;
-import org.apache.ambari.server.controller.spi.Resource;
-import org.apache.ambari.server.controller.spi.ResourceProvider;
-import org.apache.ambari.server.controller.utilities.PredicateBuilder;
-import org.apache.ambari.server.controller.utilities.PropertyHelper;
-import org.apache.ambari.server.metrics.system.AmbariMetricSink;
import org.apache.ambari.server.metrics.system.MetricsService;
-import org.apache.ambari.server.security.authorization.internal.InternalAuthenticationToken;
-import org.apache.ambari.server.state.Cluster;
-import org.apache.ambari.server.state.Clusters;
+import org.apache.ambari.server.metrics.system.MetricsSink;
+import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.springframework.security.core.context.SecurityContextHolder;
import com.google.inject.Inject;
import com.google.inject.Singleton;
@@ -54,115 +38,89 @@ import com.google.inject.Singleton;
public class MetricsServiceImpl implements MetricsService {
private static Logger LOG = LoggerFactory.getLogger(MetricsServiceImpl.class);
private Map<String, AbstractMetricsSource> sources = new HashMap<>();
- private AmbariMetricSink sink = new AmbariMetricSinkImpl();
- private String collectorUri = "";
- private String collectorProtocol = "";
- private Configuration configuration;
+ private MetricsSink sink = null;
+ private MetricsConfiguration configuration = null;
@Inject
AmbariManagementController amc;
@Override
- public void init() {
+ public void start() {
+ LOG.info("********* Initializing AmbariServer Metrics Service **********");
try {
- configuration = new Configuration();
- if (collectorUri.isEmpty() || collectorProtocol.isEmpty()) {
- setCollectorUri();
+ configuration = MetricsConfiguration.getMetricsConfiguration();
+ if (configuration == null) {
+ return;
+ }
+ sink = new AmbariMetricSinkImpl(amc);
+ initializeMetricsSink();
+ initializeMetricSources();
+
+ if (!sink.isInitialized()) {
+ //If Sink is not initialized, Service will check for every 5 mins.
+ Executors.newScheduledThreadPool(1).scheduleWithFixedDelay(new Runnable() {
+ @Override
+ public void run() {
+ LOG.info("Checking for metrics sink initialization");
+ if (!sink.isInitialized()) {
+ initializeMetricsSink();
+ }
+ }
+ }, 5, 5, TimeUnit.MINUTES);
}
- configureSourceAndSink();
} catch (Exception e) {
- LOG.info("Error initializing MetricsService", e);
- }
-
- }
- @Override
- public void run() {
- final ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
- for (Map.Entry<String, AbstractMetricsSource> entry : sources.entrySet()) {
- publishMetrics(executor, entry);
+ LOG.info("Unable to initialize MetricsService : ", e.getMessage());
}
}
+ private void initializeMetricsSink() {
- private void setCollectorUri() {
- InternalAuthenticationToken authenticationToken = new InternalAuthenticationToken("admin");
- authenticationToken.setAuthenticated(true);
- SecurityContextHolder.getContext().setAuthentication(authenticationToken);
- Clusters clusters = amc.getClusters();
- for (Map.Entry<String, Cluster> kv : clusters.getClusters().entrySet()) {
- String clusterName = kv.getKey();
- Resource.Type type = Resource.Type.ServiceConfigVersion;
-
- Set<String> propertyIds = new HashSet<String>();
- propertyIds.add(ServiceConfigVersionResourceProvider.SERVICE_CONFIG_VERSION_CONFIGURATIONS_PROPERTY_ID);
-
- Predicate predicate = new PredicateBuilder().property(
- ServiceConfigVersionResourceProvider.SERVICE_CONFIG_VERSION_CLUSTER_NAME_PROPERTY_ID).equals(clusterName).and().property(
- ServiceConfigVersionResourceProvider.SERVICE_CONFIG_VERSION_SERVICE_NAME_PROPERTY_ID).equals("AMBARI_METRICS").and().property(
- ServiceConfigVersionResourceProvider.SERVICE_CONFIG_VERSION_IS_CURRENT_PROPERTY_ID).equals("true").toPredicate();
-
- Request request = PropertyHelper.getReadRequest(propertyIds);
-
- ResourceProvider provider = AbstractControllerResourceProvider.getResourceProvider(
- type,
- PropertyHelper.getPropertyIds(type),
- PropertyHelper.getKeyPropertyIds(type),
- amc);
-
- try {
- Set<Resource> resources = provider.getResources(request, predicate);
-
- // get collector uri
- for (Resource resource : resources) {
- if (resource != null) {
- ArrayList<LinkedHashMap<Object, Object>> configs = (ArrayList<LinkedHashMap<Object, Object>>)
- resource.getPropertyValue(ServiceConfigVersionResourceProvider.SERVICE_CONFIG_VERSION_CONFIGURATIONS_PROPERTY_ID);
- for (LinkedHashMap<Object, Object> config : configs) {
- if (config != null && config.get("type").equals("ams-site")) {
- TreeMap<Object, Object> properties = (TreeMap<Object, Object>) config.get("properties");
- collectorUri = (String) properties.get("timeline.metrics.service.webapp.address");
- String which_protocol = (String) properties.get("timeline.metrics.service.http.policy");
- collectorProtocol = which_protocol.equals("HTTP_ONLY") ? "http" : "https";
- break;
- }
- }
- }
- }
- } catch (Exception e) {
- LOG.info("Throwing exception when retrieving Collector URI", e);
- }
- }
+ LOG.info("********* Configuring Metric Sink **********");
+ sink.init(configuration);
}
- private void configureSourceAndSink() {
+ private void initializeMetricSources() {
try {
- LOG.info("********* Configuring Ambari Metrics Sink and Source**********");
- int frequency = Integer.parseInt(configuration.getProperty("sink.frequency", "10")); // default value 10
- sink.init(collectorProtocol, collectorUri, frequency);
- String[] sourceNames = configuration.getProperty("metrics.sources").split(",");
+
+ LOG.info("********* Configuring Metric Sources **********");
+ String commaSeparatedSources = configuration.getProperty("metric.sources");
+
+ if (StringUtils.isEmpty(commaSeparatedSources)) {
+ LOG.info("No sources configured.");
+ return;
+ }
+
+ String[] sourceNames = commaSeparatedSources.split(",");
for (String sourceName: sourceNames) {
String className = configuration.getProperty("source." + sourceName + ".class");
Class t = Class.forName(className);
AbstractMetricsSource src = (AbstractMetricsSource)t.newInstance();
- src.init(sink);
+ src.init(configuration, sink);
sources.put(sourceName, src);
}
- }
- catch (Exception e) {
- LOG.info("Throwing exception when registering metric sink and source", e);
+
+ final ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
+ for (Map.Entry<String, AbstractMetricsSource> entry : sources.entrySet()) {
+ startSource(executor, entry);
+ }
+ } catch (Exception e) {
+ LOG.error("Error when configuring metric sink and source", e);
}
}
- private void publishMetrics(ScheduledExecutorService executor, Map.Entry<String, AbstractMetricsSource> entry) {
+ private void startSource(ScheduledExecutorService executor, Map.Entry<String, AbstractMetricsSource> entry) {
String className = entry.getKey();
AbstractMetricsSource source = entry.getValue();
String interval = "source." + className + ".interval";
- int duration = Integer.parseInt(configuration.getProperty(interval, "5")); // default value 5
+ int duration = Integer.parseInt(configuration.getProperty(interval, "10")); // default value 10 seconds
try {
executor.scheduleWithFixedDelay(source, 0, duration, TimeUnit.SECONDS);
-
} catch (Exception e) {
- LOG.info("Throwing exception when failing scheduling source", e);
+ LOG.info("Throwing exception when starting metric source", e);
}
}
+
+ public Collection<AbstractMetricsSource> getSources() {
+ return sources.values();
+ }
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/6d196db0/ambari-server/src/test/java/org/apache/ambari/server/metric/system/impl/JvmMetricsSourceTest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/metric/system/impl/JvmMetricsSourceTest.java b/ambari-server/src/test/java/org/apache/ambari/server/metric/system/impl/JvmMetricsSourceTest.java
new file mode 100644
index 0000000..a6aa5d5
--- /dev/null
+++ b/ambari-server/src/test/java/org/apache/ambari/server/metric/system/impl/JvmMetricsSourceTest.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.server.metric.system.impl;
+
+import org.apache.ambari.server.metrics.system.MetricsSink;
+import org.apache.ambari.server.metrics.system.impl.JvmMetricsSource;
+import org.apache.ambari.server.metrics.system.impl.MetricsConfiguration;
+import org.junit.Test;
+
+public class JvmMetricsSourceTest {
+
+ @Test
+ public void testJvmSourceInit() {
+ JvmMetricsSource jvmMetricsSource = new JvmMetricsSource();
+ MetricsConfiguration configuration = MetricsConfiguration.getMetricsConfiguration();
+ MetricsSink sink = new TestAmbariMetricsSinkImpl();
+ jvmMetricsSource.init(configuration, sink);
+ org.junit.Assert.assertEquals(jvmMetricsSource.getMetrics().size(), 39);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/6d196db0/ambari-server/src/test/java/org/apache/ambari/server/metric/system/impl/MetricsServiceTest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/metric/system/impl/MetricsServiceTest.java b/ambari-server/src/test/java/org/apache/ambari/server/metric/system/impl/MetricsServiceTest.java
new file mode 100644
index 0000000..67d80ee
--- /dev/null
+++ b/ambari-server/src/test/java/org/apache/ambari/server/metric/system/impl/MetricsServiceTest.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.server.metric.system.impl;
+
+import org.apache.ambari.server.metrics.system.MetricsService;
+import org.apache.ambari.server.metrics.system.impl.AbstractMetricsSource;
+import org.apache.ambari.server.metrics.system.impl.JvmMetricsSource;
+import org.apache.ambari.server.metrics.system.impl.MetricsServiceImpl;
+import org.junit.Test;
+
+import junit.framework.Assert;
+
+public class MetricsServiceTest {
+
+ @Test
+ public void testMetricsServiceStart() {
+ MetricsService metricsService = new MetricsServiceImpl();
+ metricsService.start();
+ Assert.assertTrue(metricsService.getSources().size() == 2);
+ for (AbstractMetricsSource source : metricsService.getSources()) {
+ Assert.assertTrue ( source instanceof JvmMetricsSource || source instanceof TestMetricsSource);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/6d196db0/ambari-server/src/test/java/org/apache/ambari/server/metric/system/impl/TestAmbariMetricsSinkImpl.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/metric/system/impl/TestAmbariMetricsSinkImpl.java b/ambari-server/src/test/java/org/apache/ambari/server/metric/system/impl/TestAmbariMetricsSinkImpl.java
new file mode 100644
index 0000000..c4ba97e
--- /dev/null
+++ b/ambari-server/src/test/java/org/apache/ambari/server/metric/system/impl/TestAmbariMetricsSinkImpl.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.server.metric.system.impl;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.ambari.server.metrics.system.MetricsSink;
+import org.apache.ambari.server.metrics.system.SingleMetric;
+import org.apache.ambari.server.metrics.system.impl.MetricsConfiguration;
+import org.apache.hadoop.metrics2.sink.timeline.AbstractTimelineMetricsSink;
+
+public class TestAmbariMetricsSinkImpl extends AbstractTimelineMetricsSink implements MetricsSink {
+
+ @Override
+ public void publish(List<SingleMetric> metrics) {
+ }
+
+ @Override
+ public boolean isInitialized() {
+ return true;
+ }
+
+ @Override
+ protected String getCollectorUri(String host) {
+ return constructContainerMetricUri(getCollectorProtocol(), host, getCollectorPort());
+ }
+
+ @Override
+ protected String getCollectorProtocol() {
+ return "http";
+ }
+
+ @Override
+ protected String getCollectorPort() {
+ return "6188";
+ }
+
+ @Override
+ protected int getTimeoutSeconds() {
+ return 1000;
+ }
+
+ @Override
+ protected String getZookeeperQuorum() {
+ return null;
+ }
+
+ @Override
+ protected Collection<String> getConfiguredCollectorHosts() {
+ return Collections.singletonList("localhost");
+ }
+
+ @Override
+ protected String getHostname() {
+ return "localhost";
+ }
+
+ @Override
+ public void init(MetricsConfiguration configuration) {
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/6d196db0/ambari-server/src/test/java/org/apache/ambari/server/metric/system/impl/TestMetricsSource.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/metric/system/impl/TestMetricsSource.java b/ambari-server/src/test/java/org/apache/ambari/server/metric/system/impl/TestMetricsSource.java
new file mode 100644
index 0000000..c6ce90e
--- /dev/null
+++ b/ambari-server/src/test/java/org/apache/ambari/server/metric/system/impl/TestMetricsSource.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.server.metric.system.impl;
+
+import java.util.List;
+
+import org.apache.ambari.server.metrics.system.SingleMetric;
+import org.apache.ambari.server.metrics.system.impl.AbstractMetricsSource;
+
+public class TestMetricsSource extends AbstractMetricsSource {
+
+ @Override
+ public List<SingleMetric> getMetrics() {
+ return null;
+ }
+
+ @Override
+ public void run() {
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/6d196db0/ambari-server/src/test/resources/metrics.properties
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/resources/metrics.properties b/ambari-server/src/test/resources/metrics.properties
new file mode 100644
index 0000000..5eee064
--- /dev/null
+++ b/ambari-server/src/test/resources/metrics.properties
@@ -0,0 +1,29 @@
+# Copyright 2011 The Apache Software Foundation
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+#### Source Configs #####
+# Source interval determines how often the metric is sent to sink. Its unit is in seconds
+metric.sources=jvm,testsource
+
+source.jvm.interval=10
+source.jvm.class=org.apache.ambari.server.metrics.system.impl.JvmMetricsSource
+source.testsource.class=org.apache.ambari.server.metric.system.impl.TestMetricsSource
+
+#### Sink Configs #####
+# Sink frequency determines how often the sink publish the metrics from buffer to AMS.
\ No newline at end of file