You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by nc...@apache.org on 2016/02/24 15:24:21 UTC
[6/7] ambari git commit: AMBARI-15148 Implement AMS collector
certificate verification on monitor and service check sides (dsen)
AMBARI-15148 Implement AMS collector certificate verification on monitor and service check sides (dsen)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/6e002b25
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/6e002b25
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/6e002b25
Branch: refs/heads/branch-dev-patch-upgrade
Commit: 6e002b25c891c04d4d4308071ffab865c624c942
Parents: 1c56114
Author: Dmytro Sen <ds...@apache.org>
Authored: Wed Feb 24 15:54:21 2016 +0200
Committer: Dmytro Sen <ds...@apache.org>
Committed: Wed Feb 24 15:54:21 2016 +0200
----------------------------------------------------------------------
.../conf/unix/metric_monitor.ini | 7 +-
.../src/main/python/core/config_reader.py | 36 +++++--
.../src/main/python/core/controller.py | 1 -
.../src/main/python/core/emitter.py | 34 ++++---
.../src/main/python/core/security.py | 98 ++++++++++++++++++++
.../src/test/python/core/TestEmitter.py | 65 +++++++------
.../AMBARI_METRICS/0.1.0/package/scripts/ams.py | 40 +++++++-
.../0.1.0/package/scripts/functions.py | 4 -
.../package/scripts/metrics_grafana_util.py | 58 ++++++++----
.../0.1.0/package/scripts/network.py | 39 ++++++++
.../0.1.0/package/scripts/params.py | 4 +-
.../0.1.0/package/scripts/service_check.py | 24 +++--
.../package/templates/metric_monitor.ini.j2 | 5 +-
.../metrics_grafana_datasource.json.j2 | 2 +-
.../AMBARI_METRICS/test_metrics_collector.py | 10 ++
.../AMBARI_METRICS/test_metrics_grafana.py | 12 ++-
.../python/stacks/2.0.6/configs/default.json | 10 ++
17 files changed, 355 insertions(+), 94 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ambari/blob/6e002b25/ambari-metrics/ambari-metrics-host-monitoring/conf/unix/metric_monitor.ini
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-host-monitoring/conf/unix/metric_monitor.ini b/ambari-metrics/ambari-metrics-host-monitoring/conf/unix/metric_monitor.ini
index 5952982..3e5d861 100644
--- a/ambari-metrics/ambari-metrics-host-monitoring/conf/unix/metric_monitor.ini
+++ b/ambari-metrics/ambari-metrics-host-monitoring/conf/unix/metric_monitor.ini
@@ -18,11 +18,9 @@
[default]
debug_level = INFO
-metrics_server = localhost:{{ams_collector_port}}
-hostname = {{hostname}}
+hostname = localhost
enable_time_threshold = false
enable_value_threshold = false
-https_enabled = false
[emitter]
send_interval = 60
@@ -30,3 +28,6 @@ send_interval = 60
[collector]
collector_sleep_interval = 5
max_queue_size = 5000
+host = localhost
+port = 6188
+https_enabled = false
http://git-wip-us.apache.org/repos/asf/ambari/blob/6e002b25/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/config_reader.py
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/config_reader.py b/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/config_reader.py
index d533537..a053955 100644
--- a/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/config_reader.py
+++ b/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/config_reader.py
@@ -34,30 +34,38 @@ class ConfigDefaults(object):
pass
def get_metric_file_path(self):
pass
+ def get_ca_certs_file_path(self):
+ pass
@OsFamilyImpl(os_family=OSConst.WINSRV_FAMILY)
class ConfigDefaultsWindows(ConfigDefaults):
def __init__(self):
self._CONFIG_FILE_PATH = "conf\\metric_monitor.ini"
self._METRIC_FILE_PATH = "conf\\metric_groups.conf"
+ self._METRIC_FILE_PATH = "conf\\ca.pem"
pass
def get_config_file_path(self):
return self._CONFIG_FILE_PATH
def get_metric_file_path(self):
return self._METRIC_FILE_PATH
+ def get_ca_certs_file_path(self):
+ return self._CA_CERTS_FILE_PATH
@OsFamilyImpl(os_family=OsFamilyImpl.DEFAULT)
class ConfigDefaultsLinux(ConfigDefaults):
def __init__(self):
self._CONFIG_FILE_PATH = "/etc/ambari-metrics-monitor/conf/metric_monitor.ini"
self._METRIC_FILE_PATH = "/etc/ambari-metrics-monitor/conf/metric_groups.conf"
+ self._CA_CERTS_FILE_PATH = "/etc/ambari-metrics-monitor/conf/ca.pem"
pass
def get_config_file_path(self):
return self._CONFIG_FILE_PATH
def get_metric_file_path(self):
return self._METRIC_FILE_PATH
+ def get_ca_certs_file_path(self):
+ return self._CA_CERTS_FILE_PATH
configDefaults = ConfigDefaults()
@@ -65,6 +73,7 @@ config = ConfigParser.RawConfigParser()
CONFIG_FILE_PATH = configDefaults.get_config_file_path()
METRIC_FILE_PATH = configDefaults.get_metric_file_path()
+CA_CERTS_FILE_PATH = configDefaults.get_ca_certs_file_path()
OUT_DIR = os.path.join(os.sep, "var", "log", "ambari-metrics-host-monitoring")
SERVER_OUT_FILE = OUT_DIR + os.sep + "ambari-metrics-host-monitoring.out"
@@ -88,10 +97,8 @@ AMBARI_AGENT_CONF = '/etc/ambari-agent/conf/ambari-agent.ini'
config_content = """
[default]
debug_level = INFO
-metrics_server = host:port
enable_time_threshold = false
enable_value_threshold = false
-https_enabled = false
[emitter]
send_interval = 60
@@ -99,6 +106,9 @@ send_interval = 60
[collector]
collector_sleep_interval = 5
max_queue_size = 5000
+host = localhost
+port = 6188
+https_enabled = false
"""
metric_group_info = """
@@ -162,7 +172,7 @@ class Configuration:
'process_metric_groups': []
}
pass
-
+ self._ca_cert_file_path = CA_CERTS_FILE_PATH
self.hostname_script = None
ambari_agent_config = ConfigParser.RawConfigParser()
if os.path.exists(AMBARI_AGENT_CONF):
@@ -193,9 +203,6 @@ class Configuration:
def get_collector_sleep_interval(self):
return int(self.get("collector", "collector_sleep_interval", 5))
- def get_server_address(self):
- return self.get("default", "metrics_server")
-
def get_hostname_config(self):
return self.get("default", "hostname", None)
@@ -211,5 +218,18 @@ class Configuration:
def get_max_queue_size(self):
return int(self.get("collector", "max_queue_size", 5000))
- def get_server_https_enabled(self):
- return "true" == str(self.get("default", "https_enabled")).lower()
+ def is_server_https_enabled(self):
+ return "true" == str(self.get("collector", "https_enabled")).lower()
+
+ def get_server_host(self):
+ return self.get("collector", "host")
+
+ def get_server_port(self):
+ try:
+ return int(self.get("collector", "port"))
+ except:
+ return 6188
+
+ def get_ca_certs(self):
+ return self._ca_cert_file_path
+
http://git-wip-us.apache.org/repos/asf/ambari/blob/6e002b25/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/controller.py
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/controller.py b/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/controller.py
index 1713501..c04a61b 100644
--- a/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/controller.py
+++ b/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/controller.py
@@ -46,7 +46,6 @@ class Controller(threading.Thread):
hostinfo.get_ip_address())
self.event_queue = Queue(config.get_max_queue_size())
self.metric_collector = MetricsCollector(self.event_queue, self.application_metric_map, hostinfo)
- self.server_url = config.get_server_address()
self.sleep_interval = config.get_collector_sleep_interval()
self._stop_handler = stop_handler
self.initialize_events_cache()
http://git-wip-us.apache.org/repos/asf/ambari/blob/6e002b25/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/emitter.py
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/emitter.py b/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/emitter.py
index 4e39ab5..6997108 100644
--- a/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/emitter.py
+++ b/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/emitter.py
@@ -20,12 +20,13 @@ limitations under the License.
import logging
import threading
-import urllib2
+
+from security import CachedHTTPSConnection, CachedHTTPConnection
logger = logging.getLogger()
class Emitter(threading.Thread):
- COLLECTOR_URL = "{0}://{1}/ws/v1/timeline/metrics"
+ AMS_METRICS_POST_URL = "/ws/v1/timeline/metrics/"
RETRY_SLEEP_INTERVAL = 5
MAX_RETRY_COUNT = 3
"""
@@ -39,8 +40,16 @@ class Emitter(threading.Thread):
self._stop_handler = stop_handler
self.application_metric_map = application_metric_map
# TODO verify certificate
- protocol = 'https' if config.get_server_https_enabled() else 'http'
- self.collector_url = self.COLLECTOR_URL.format(protocol, config.get_server_address())
+ timeout = int(self.send_interval - 10)
+ if config.is_server_https_enabled():
+ self.connection = CachedHTTPSConnection(config.get_server_host(),
+ config.get_server_port(),
+ timeout=timeout,
+ ca_certs=config.get_ca_certs())
+ else:
+ self.connection = CachedHTTPConnection(config.get_server_host(),
+ config.get_server_port(),
+ timeout=timeout)
def run(self):
logger.info('Running Emitter thread: %s' % threading.currentThread().getName())
@@ -75,7 +84,7 @@ class Emitter(threading.Thread):
logger.warn('Error sending metrics to server. %s' % str(e))
pass
- if response and response.getcode() == 200:
+ if response and response.status == 200:
retry_count = self.MAX_RETRY_COUNT
else:
logger.warn("Retrying after {0} ...".format(self.RETRY_SLEEP_INTERVAL))
@@ -87,14 +96,15 @@ class Emitter(threading.Thread):
pass
# TODO verify certificate
def push_metrics(self, data):
- headers = {"Content-Type" : "application/json", "Accept" : "*/*"}
- logger.info("server: %s" % self.collector_url)
+ headers = {"Content-Type" : "application/json",
+ "Accept" : "*/*",
+ "Connection":" Keep-Alive"}
logger.debug("message to sent: %s" % data)
- req = urllib2.Request(self.collector_url, data, headers)
- response = urllib2.urlopen(req, timeout=int(self.send_interval - 10))
+ self.connection.request("POST", self.AMS_METRICS_POST_URL, data, headers)
+ response = self.connection.getresponse()
if response:
- logger.debug("POST response from server: retcode = {0}".format(response.getcode()))
+ logger.debug("POST response from server: retcode = {0}, reason = {1}"
+ .format(response.status, response.reason))
logger.debug(str(response.read()))
- pass
- return response
+ return response
http://git-wip-us.apache.org/repos/asf/ambari/blob/6e002b25/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/security.py
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/security.py b/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/security.py
new file mode 100644
index 0000000..e36e01d
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/security.py
@@ -0,0 +1,98 @@
+#!/usr/bin/env python
+
+'''
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+'''
+
+import logging
+import ssl
+import socket
+import httplib
+
+logger = logging.getLogger()
+
+# TODO merge this with security.py in ambari-agent and move to ambrari commons
+
+class VerifiedHTTPSConnection(httplib.HTTPSConnection):
+ """ Connecting using ssl wrapped sockets """
+
+ def __init__(self, host, port, timeout, ca_certs):
+ httplib.HTTPSConnection.__init__(self, host, port=port, timeout=timeout)
+ self.ca_certs = ca_certs
+
+ def connect(self):
+
+ try:
+ sock = self.create_connection()
+ self.sock = ssl.wrap_socket(sock, cert_reqs=ssl.CERT_REQUIRED,
+ ca_certs=self.ca_certs)
+ logger.info('SSL connection established.')
+ except (ssl.SSLError, AttributeError) as ex:
+ logger.info('Insecure connection to https://{0}:{1}/ failed'
+ .format(self.host, self.port))
+
+ def create_connection(self):
+ if self.sock:
+ self.sock.close()
+ logger.info("SSL Connect being called.. connecting to https://{0}:{1}/"
+ .format(self.host, self.port))
+ sock = socket.create_connection((self.host, self.port), timeout=self.timeout)
+ sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
+ if self._tunnel_host:
+ self.sock = sock
+ self._tunnel()
+
+ return sock
+
+class CachedHTTPConnection:
+ """ Caches a socket and uses a single http connection to the server. """
+
+ def __init__(self, host, port, timeout):
+ self.connected = False
+ self.host = host
+ self.port = port
+ self.timeout = timeout
+
+ def connect(self):
+ if not self.connected:
+ self.httpconn = self.create_connection()
+ self.httpconn.connect()
+ self.connected = True
+
+ def request(self, method, url, body=None, headers={}):
+ self.connect()
+ try:
+ return self.httpconn.request(method, url, body, headers)
+ except Exception as e:
+ self.connected = False
+ raise e
+
+ def getresponse(self):
+ return self.httpconn.getresponse()
+
+ def create_connection(self):
+ return httplib.HTTPConnection(self.host, self.port, self.timeout)
+
+class CachedHTTPSConnection(CachedHTTPConnection):
+ """ Caches an ssl socket and uses a single https connection to the server. """
+
+ def __init__(self, host, port, timeout, ca_certs):
+ self.ca_certs = ca_certs
+ CachedHTTPConnection.__init__(self, host, port, timeout)
+
+ def create_connection(self):
+ return VerifiedHTTPSConnection(self.host, self.port, self.timeout, self.ca_certs)
http://git-wip-us.apache.org/repos/asf/ambari/blob/6e002b25/ambari-metrics/ambari-metrics-host-monitoring/src/test/python/core/TestEmitter.py
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-host-monitoring/src/test/python/core/TestEmitter.py b/ambari-metrics/ambari-metrics-host-monitoring/src/test/python/core/TestEmitter.py
index 8f5236a..be0608f 100644
--- a/ambari-metrics/ambari-metrics-host-monitoring/src/test/python/core/TestEmitter.py
+++ b/ambari-metrics/ambari-metrics-host-monitoring/src/test/python/core/TestEmitter.py
@@ -19,34 +19,37 @@ limitations under the License.
'''
import json
-import urllib2
-
import logging
-from unittest import TestCase
-from only_for_platform import get_platform, os_distro_value, PLATFORM_WINDOWS
-
-from ambari_commons.os_check import OSCheck
+from unittest import TestCase
+from only_for_platform import get_platform, PLATFORM_WINDOWS
from mock.mock import patch, MagicMock
+from security import CachedHTTPConnection
+
+if get_platform() != PLATFORM_WINDOWS:
+ os_distro_value = ('Suse','11','Final')
+else:
+ os_distro_value = ('win2012serverr2','6.3','WindowsServer')
with patch("platform.linux_distribution", return_value = os_distro_value):
from ambari_commons import OSCheck
- from core.application_metric_map import ApplicationMetricMap
- from core.config_reader import Configuration
- from core.emitter import Emitter
- from core.stop_handler import bind_signal_handlers
+ from application_metric_map import ApplicationMetricMap
+ from config_reader import Configuration
+ from emitter import Emitter
+ from stop_handler import bind_signal_handlers
logger = logging.getLogger()
class TestEmitter(TestCase):
@patch.object(OSCheck, "os_distribution", new = MagicMock(return_value = os_distro_value))
- @patch("urllib2.urlopen")
- def testJavaHomeAvailableCheck(self, url_open_mock):
- url_open_mock.return_value = MagicMock()
- url_open_mock.return_value.getcode.return_value = 200
- self.assertEqual(urllib2.urlopen(None, None).getcode(), 200)
- url_open_mock.reset_mock()
+ @patch.object(CachedHTTPConnection, "create_connection", new = MagicMock())
+ @patch.object(CachedHTTPConnection, "request")
+ @patch.object(CachedHTTPConnection, "getresponse")
+ def test_submit_metrics(self, getresponse_mock, request_mock):
+ request_mock.return_value = MagicMock()
+ getresponse_mock.return_value = MagicMock()
+ getresponse_mock.return_value.status = 200
stop_handler = bind_signal_handlers()
@@ -56,16 +59,20 @@ class TestEmitter(TestCase):
application_metric_map.put_metric("APP1", {"metric1":1}, 1)
emitter = Emitter(config, application_metric_map, stop_handler)
emitter.submit_metrics()
-
- self.assertEqual(url_open_mock.call_count, 1)
- self.assertUrlData(url_open_mock)
+
+ self.assertEqual(request_mock.call_count, 1)
+ self.assertUrlData(request_mock)
@patch.object(OSCheck, "os_distribution", new = MagicMock(return_value = os_distro_value))
- @patch("urllib2.urlopen")
- def testRetryFetch(self, url_open_mock):
+ @patch.object(CachedHTTPConnection, "create_connection", new = MagicMock())
+ @patch.object(CachedHTTPConnection, "getresponse", new = MagicMock())
+ @patch.object(CachedHTTPConnection, "request")
+ def testRetryFetch(self, request_mock):
stop_handler = bind_signal_handlers()
+ request_mock.return_value = MagicMock()
+
config = Configuration()
application_metric_map = ApplicationMetricMap("host","10.10.10.10")
application_metric_map.clear()
@@ -73,15 +80,15 @@ class TestEmitter(TestCase):
emitter = Emitter(config, application_metric_map, stop_handler)
emitter.RETRY_SLEEP_INTERVAL = .001
emitter.submit_metrics()
-
- self.assertEqual(url_open_mock.call_count, 3)
- self.assertUrlData(url_open_mock)
-
- def assertUrlData(self, url_open_mock):
- self.assertEqual(len(url_open_mock.call_args), 2)
- data = url_open_mock.call_args[0][0].data
+
+ self.assertEqual(request_mock.call_count, 3)
+ self.assertUrlData(request_mock)
+
+ def assertUrlData(self, request_mock):
+ self.assertEqual(len(request_mock.call_args), 2)
+ data = request_mock.call_args[0][2]
self.assertTrue(data is not None)
-
+
metrics = json.loads(data)
self.assertEqual(len(metrics['metrics']), 1)
self.assertEqual(metrics['metrics'][0]['metricname'],'metric1')
http://git-wip-us.apache.org/repos/asf/ambari/blob/6e002b25/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/ams.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/ams.py b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/ams.py
index 417574b..c8c3b6d 100644
--- a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/ams.py
+++ b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/ams.py
@@ -333,6 +333,9 @@ def ams(name=None):
mode=0644
)
+ if params.metric_collector_https_enabled:
+ export_ca_certs(params.ams_collector_conf_dir)
+
pass
elif name == 'monitor':
@@ -384,7 +387,9 @@ def ams(name=None):
content=InlineTemplate(params.ams_env_sh_template)
)
- # TODO
+ if params.metric_collector_https_enabled:
+ export_ca_certs(params.ams_monitor_conf_dir)
+
pass
elif name == 'grafana':
@@ -415,6 +420,39 @@ def ams(name=None):
content=InlineTemplate(params.ams_grafana_ini_template)
)
+ if params.metric_collector_https_enabled:
+ export_ca_certs(params.ams_grafana_conf_dir)
+
pass
+def export_ca_certs(dir_path):
+ # export ca certificates on every restart to handle changed truststore content
+
+ import params
+ import tempfile
+
+ ca_certs_path = os.path.join(dir_path, params.metric_truststore_ca_certs)
+ truststore = params.metric_truststore_path
+
+ tmpdir = tempfile.mkdtemp()
+ truststore_p12 = os.path.join(tmpdir,'truststore.p12')
+
+ if (params.metric_truststore_type.lower() == 'jks'):
+ # Convert truststore from JKS to PKCS12
+ cmd = format("{sudo} {java64_home}/bin/keytool -importkeystore -srckeystore {metric_truststore_path} -destkeystore {truststore_p12} -deststoretype PKCS12 -srcstorepass {metric_truststore_password} -deststorepass {metric_truststore_password}")
+ Execute(cmd,
+ )
+ truststore = truststore_p12
+
+ # Export all CA certificates from the truststore to the conf directory
+ cmd = format("{sudo} openssl pkcs12 -in {truststore} -out {ca_certs_path} -cacerts -nokeys -passin pass:{metric_truststore_password}")
+ Execute(cmd,
+ )
+ Execute(('chown', params.ams_user, ca_certs_path),
+ sudo=True
+ )
+ Execute(format('{sudo} rm -rf {tmpdir}')
+ )
+
+
pass
http://git-wip-us.apache.org/repos/asf/ambari/blob/6e002b25/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/functions.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/functions.py b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/functions.py
index 140c24c..e3b3a48 100644
--- a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/functions.py
+++ b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/functions.py
@@ -18,12 +18,8 @@ limitations under the License.
"""
-import os
import re
import math
-import datetime
-
-from resource_management.core.shell import checked_call
def calc_xmn_from_xms(heapsize_str, xmn_percent, xmn_max):
"""
http://git-wip-us.apache.org/repos/asf/ambari/blob/6e002b25/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/metrics_grafana_util.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/metrics_grafana_util.py b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/metrics_grafana_util.py
index 02caa11..37d403d 100644
--- a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/metrics_grafana_util.py
+++ b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/metrics_grafana_util.py
@@ -17,14 +17,15 @@ See the License for the specific language governing permissions and
limitations under the License.
"""
+import httplib
from resource_management.core.logger import Logger
from resource_management.core.base import Fail
from resource_management import Template
-import httplib
import time
import socket
import json
+import network
def create_ams_datasource():
@@ -33,16 +34,21 @@ def create_ams_datasource():
GRAFANA_CONNECT_TIMEOUT = 15
GRAFANA_URL = "/api/datasources"
METRICS_GRAFANA_DATASOURCE_NAME = "AMBARI_METRICS"
-
+ grafana_https_enabled = params.ams_grafana_protocol.lower() == 'https'
headers = {"Content-type": "application/json"}
+ ams_datasource_json = Template('metrics_grafana_datasource.json.j2',
+ ams_datasource_name=METRICS_GRAFANA_DATASOURCE_NAME)\
+ .get_content()
+
Logger.info("Checking if AMS Grafana datasource already exists")
Logger.info("Connecting (GET) to %s:%s%s" % (params.hostname,
params.ams_grafana_port,
GRAFANA_URL))
-# TODO add https support
- conn = httplib.HTTPConnection(params.hostname,
- int(params.ams_grafana_port))
+
+ conn = network.get_http_connection(params.hostname,
+ int(params.ams_grafana_port),
+ grafana_https_enabled)
conn.request("GET", GRAFANA_URL)
response = conn.getresponse()
@@ -58,7 +64,7 @@ def create_ams_datasource():
Logger.info("Ambari Metrics Grafana datasource already present. Checking Metrics Collector URL")
datasource_url = datasources_json[i]["url"]
- if datasource_url == (params.ams_grafana_protocol + "://"
+ if datasource_url == (params.metric_collector_protocol + "://"
+ params.metric_collector_host + ":"
+ params.metric_collector_port):
Logger.info("Metrics Collector URL validation succeeded. Skipping datasource creation")
@@ -69,10 +75,31 @@ def create_ams_datasource():
Logger.info("Metrics Collector URL validation failed.")
datasource_id = datasources_json[i]["id"]
Logger.info("Deleting obselete Metrics datasource.")
- conn = httplib.HTTPConnection(params.hostname, int(params.ams_grafana_port))
- conn.request("DELETE", GRAFANA_URL + "/" + str(datasource_id))
+ conn = network.get_http_connection(params.hostname,
+ int(params.ams_grafana_port),
+ grafana_https_enabled)
+ conn.request("PUT", GRAFANA_URL + "/" + str(datasource_id), ams_datasource_json, headers)
response = conn.getresponse()
- Logger.info("Http response: %s %s" % (response.status, response.reason))
+ data = response.read()
+ Logger.info("Http data: %s" % data)
+ conn.close()
+
+ if response.status == 200:
+ Logger.info("Ambari Metrics Grafana data source updated.")
+ GRAFANA_CONNECT_TRIES = 0 # No need to create datasource again
+ elif response.status == 500:
+ Logger.info("Ambari Metrics Grafana data source update failed. Not retrying.")
+ raise Fail("Ambari Metrics Grafana data source update failed. PUT request status: %s %s \n%s" %
+ (response.status, response.reason, data))
+ else:
+ Logger.info("Ambari Metrics Grafana data source update failed.")
+ if i < GRAFANA_CONNECT_TRIES - 1:
+ time.sleep(GRAFANA_CONNECT_TIMEOUT)
+ Logger.info("Next retry in %s seconds."
+ % (GRAFANA_CONNECT_TIMEOUT))
+ else:
+ raise Fail("Ambari Metrics Grafana data source creation failed. POST request status: %s %s \n%s" %
+ (response.status, response.reason, data))
break
else:
@@ -83,19 +110,15 @@ def create_ams_datasource():
for i in xrange(0, GRAFANA_CONNECT_TRIES):
try:
- ams_datasource_json = Template('metrics_grafana_datasource.json.j2',
- ams_datasource_name=METRICS_GRAFANA_DATASOURCE_NAME,
- ams_grafana_protocol=params.ams_grafana_protocol,
- ams_collector_host=params.metric_collector_host,
- ams_collector_port=params.metric_collector_port).get_content()
Logger.info("Generated datasource:\n%s" % ams_datasource_json)
Logger.info("Connecting (POST) to %s:%s%s" % (params.hostname,
params.ams_grafana_port,
GRAFANA_URL))
- conn = httplib.HTTPConnection(params.hostname,
- int(params.ams_grafana_port))
+ conn = network.get_http_connection(params.hostname,
+ int(params.ams_grafana_port),
+ grafana_https_enabled)
conn.request("POST", GRAFANA_URL, ams_datasource_json, headers)
response = conn.getresponse()
@@ -129,5 +152,4 @@ def create_ams_datasource():
else:
raise Fail("Ambari Metrics Grafana data source creation failed. POST request status: %s %s \n%s" %
(response.status, response.reason, data))
-
-
+ pass
http://git-wip-us.apache.org/repos/asf/ambari/blob/6e002b25/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/network.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/network.py b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/network.py
new file mode 100644
index 0000000..672ee53
--- /dev/null
+++ b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/network.py
@@ -0,0 +1,39 @@
+#!/usr/bin/env python
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+"""
+
+import httplib
+import ssl
+
+from resource_management.core.exceptions import Fail
+
+def get_http_connection(host, port, https_enabled=False, ca_certs=None):
+ if https_enabled:
+ if ca_certs:
+ check_ssl_certificate(host, port, ca_certs)
+ return httplib.HTTPSConnection(host, port)
+ else:
+ return httplib.HTTPConnection(host, port)
+
+def check_ssl_certificate(host, port, ca_certs):
+ try:
+ ssl.get_server_certificate((host, port), ssl_version=ssl.PROTOCOL_SSLv23, ca_certs=ca_certs)
+ except (ssl.SSLError) as ssl_error:
+ raise Fail("Failed to verify the SSL certificate for AMS Collector https://{0}:{1} with CA certificate in {2}"
+ .format(host, port, ca_certs))
http://git-wip-us.apache.org/repos/asf/ambari/blob/6e002b25/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/params.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/params.py b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/params.py
index f1ff998..58ff71c 100644
--- a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/params.py
+++ b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/params.py
@@ -59,6 +59,7 @@ else:
metric_truststore_path= default("/configurations/ams-ssl-client/ssl.client.truststore.location", "")
metric_truststore_type= default("/configurations/ams-ssl-client/ssl.client.truststore.type", "")
metric_truststore_password= default("/configurations/ams-ssl-client/ssl.client.truststore.password", "")
+metric_truststore_ca_certs='ca.pem'
if 'cluster-env' in config['configurations'] and \
'metrics_collector_vip_host' in config['configurations']['cluster-env']:
@@ -76,9 +77,11 @@ else:
metric_collector_port = '6188'
ams_collector_log_dir = config['configurations']['ams-env']['metrics_collector_log_dir']
+ams_collector_conf_dir = "/etc/ambari-metrics-collector/conf"
ams_monitor_log_dir = config['configurations']['ams-env']['metrics_monitor_log_dir']
ams_monitor_dir = "/usr/lib/python2.6/site-packages/resource_monitoring"
+ams_monitor_conf_dir = "/etc/ambari-metrics-monitor/conf"
ams_monitor_pid_dir = status_params.ams_monitor_pid_dir
ams_monitor_script = "/usr/sbin/ambari-metrics-monitor"
@@ -177,7 +180,6 @@ else:
zookeeper_clientPort = '2181'
ams_checkpoint_dir = config['configurations']['ams-site']['timeline.metrics.aggregator.checkpoint.dir']
-hbase_pid_dir = status_params.hbase_pid_dir
_hbase_tmp_dir = config['configurations']['ams-hbase-site']['hbase.tmp.dir']
hbase_tmp_dir = substitute_vars(_hbase_tmp_dir, config['configurations']['ams-hbase-site'])
_zookeeper_data_dir = config['configurations']['ams-hbase-site']['hbase.zookeeper.property.dataDir']
http://git-wip-us.apache.org/repos/asf/ambari/blob/6e002b25/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/service_check.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/service_check.py b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/service_check.py
index 4346f0f..8f369f7 100644
--- a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/service_check.py
+++ b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/service_check.py
@@ -27,6 +27,7 @@ from ambari_commons import OSConst
from ambari_commons.os_family_impl import OsFamilyFuncImpl, OsFamilyImpl
import httplib
+import network
import urllib
import ambari_simplejson as json # simplejson is much faster comparing to Python 2.6 json module and has the same functions set.
import os
@@ -68,6 +69,8 @@ class AMSServiceCheck(Script):
random_value1 = random.random()
headers = {"Content-type": "application/json"}
+ ca_certs = os.path.join(params.ams_collector_conf_dir,
+ params.metric_truststore_ca_certs)
for i in xrange(0, self.AMS_CONNECT_TRIES):
try:
@@ -79,9 +82,10 @@ class AMSServiceCheck(Script):
Logger.info("Connecting (POST) to %s:%s%s" % (params.metric_collector_host,
params.metric_collector_port,
self.AMS_METRICS_POST_URL))
- conn = self.get_http_connection(params.metric_collector_host,
- int(params.metric_collector_port),
- params.metric_collector_https_enabled)
+ conn = network.get_http_connection(params.metric_collector_host,
+ int(params.metric_collector_port),
+ params.metric_collector_https_enabled,
+ ca_certs)
conn.request("POST", self.AMS_METRICS_POST_URL, metric_json, headers)
response = conn.getresponse()
@@ -128,9 +132,10 @@ class AMSServiceCheck(Script):
params.metric_collector_port,
self.AMS_METRICS_GET_URL % encoded_get_metrics_parameters))
- conn = self.get_http_connection(params.metric_collector_host,
- int(params.metric_collector_port),
- params.metric_collector_https_enabled)
+ conn = network.get_http_connection(params.metric_collector_host,
+ int(params.metric_collector_port),
+ params.metric_collector_https_enabled,
+ ca_certs)
conn.request("GET", self.AMS_METRICS_GET_URL % encoded_get_metrics_parameters)
response = conn.getresponse()
Logger.info("Http response: %s %s" % (response.status, response.reason))
@@ -163,13 +168,6 @@ class AMSServiceCheck(Script):
Logger.info("Ambari Metrics service check is finished.")
- def get_http_connection(self, host, port, https_enabled=False):
- if https_enabled:
- # TODO verify certificate
- return httplib.HTTPSConnection(host, port)
- else:
- return httplib.HTTPConnection(host, port)
-
if __name__ == "__main__":
AMSServiceCheck().execute()
http://git-wip-us.apache.org/repos/asf/ambari/blob/6e002b25/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/templates/metric_monitor.ini.j2
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/templates/metric_monitor.ini.j2 b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/templates/metric_monitor.ini.j2
index 4e2d0f5..0b0932a 100644
--- a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/templates/metric_monitor.ini.j2
+++ b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/templates/metric_monitor.ini.j2
@@ -18,11 +18,9 @@
[default]
debug_level = INFO
-metrics_server = {{metric_collector_host}}:{{metric_collector_port}}
hostname = {{hostname}}
enable_time_threshold = false
enable_value_threshold = false
-https_enabled = {{metric_collector_https_enabled}}
[emitter]
send_interval = {{metrics_report_interval}}
@@ -30,3 +28,6 @@ send_interval = {{metrics_report_interval}}
[collector]
collector_sleep_interval = 5
max_queue_size = 5000
+host = {{metric_collector_host}}
+port = {{metric_collector_port}}
+https_enabled = {{metric_collector_https_enabled}}
http://git-wip-us.apache.org/repos/asf/ambari/blob/6e002b25/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/templates/metrics_grafana_datasource.json.j2
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/templates/metrics_grafana_datasource.json.j2 b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/templates/metrics_grafana_datasource.json.j2
index da04668..678d769 100644
--- a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/templates/metrics_grafana_datasource.json.j2
+++ b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/templates/metrics_grafana_datasource.json.j2
@@ -20,7 +20,7 @@
"name": "{{ams_datasource_name}}",
"type": "ambarimetrics",
"access": "proxy",
- "url": "{{ams_grafana_protocol}}://{{ams_collector_host}}:{{ams_collector_port}}",
+ "url": "{{metric_collector_protocol}}://{{metric_collector_host}}:{{metric_collector_port}}",
"password": "",
"user": "",
"database": "",
http://git-wip-us.apache.org/repos/asf/ambari/blob/6e002b25/ambari-server/src/test/python/stacks/2.0.6/AMBARI_METRICS/test_metrics_collector.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/python/stacks/2.0.6/AMBARI_METRICS/test_metrics_collector.py b/ambari-server/src/test/python/stacks/2.0.6/AMBARI_METRICS/test_metrics_collector.py
index 64b16c6..50bf712 100644
--- a/ambari-server/src/test/python/stacks/2.0.6/AMBARI_METRICS/test_metrics_collector.py
+++ b/ambari-server/src/test/python/stacks/2.0.6/AMBARI_METRICS/test_metrics_collector.py
@@ -21,6 +21,7 @@ limitations under the License.
from mock.mock import MagicMock, patch
from stacks.utils.RMFTestCase import *
+@patch("tempfile.mkdtemp", new = MagicMock(return_value='/some_tmp_dir'))
@patch("os.path.exists", new = MagicMock(return_value=True))
@patch("platform.linux_distribution", new = MagicMock(return_value="Linux"))
class TestMetricsCollector(RMFTestCase):
@@ -39,6 +40,15 @@ class TestMetricsCollector(RMFTestCase):
self.assert_hbase_configure('master', distributed=True)
self.assert_hbase_configure('regionserver', distributed=True)
self.assert_ams('collector', distributed=True)
+ self.assertResourceCalled('Execute', 'ambari-sudo.sh /usr/jdk64/jdk1.7.0_45/bin/keytool -importkeystore -srckeystore /etc/security/clientKeys/all.jks -destkeystore /some_tmp_dir/truststore.p12 -deststoretype PKCS12 -srcstorepass bigdata -deststorepass bigdata',
+ )
+ self.assertResourceCalled('Execute', 'ambari-sudo.sh openssl pkcs12 -in /some_tmp_dir/truststore.p12 -out /etc/ambari-metrics-collector/conf/ca.pem -cacerts -nokeys -passin pass:bigdata',
+ )
+ self.assertResourceCalled('Execute', ('chown', u'ams', '/etc/ambari-metrics-collector/conf/ca.pem'),
+ sudo=True
+ )
+ self.assertResourceCalled('Execute', 'ambari-sudo.sh rm -rf /some_tmp_dir',
+ )
self.assertResourceCalled('Execute', '/usr/lib/ams-hbase/bin/hbase-daemon.sh --config /etc/ams-hbase/conf stop regionserver',
on_timeout = 'ls /var/run/ambari-metrics-collector//hbase-ams-regionserver.pid >/dev/null 2>&1 && ps `cat /var/run/ambari-metrics-collector//hbase-ams-regionserver.pid` >/dev/null 2>&1 && ambari-sudo.sh -H -E kill -9 `ambari-sudo.sh cat /var/run/ambari-metrics-collector//hbase-ams-regionserver.pid`',
timeout = 30,
http://git-wip-us.apache.org/repos/asf/ambari/blob/6e002b25/ambari-server/src/test/python/stacks/2.0.6/AMBARI_METRICS/test_metrics_grafana.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/python/stacks/2.0.6/AMBARI_METRICS/test_metrics_grafana.py b/ambari-server/src/test/python/stacks/2.0.6/AMBARI_METRICS/test_metrics_grafana.py
index 755bb4f..1c72aba 100644
--- a/ambari-server/src/test/python/stacks/2.0.6/AMBARI_METRICS/test_metrics_grafana.py
+++ b/ambari-server/src/test/python/stacks/2.0.6/AMBARI_METRICS/test_metrics_grafana.py
@@ -18,10 +18,11 @@ See the License for the specific language governing permissions and
limitations under the License.
'''
-from mock.mock import MagicMock, patch, call
+from mock.mock import MagicMock, patch
from stacks.utils.RMFTestCase import *
import os, sys
+@patch("tempfile.mkdtemp", new = MagicMock(return_value='/some_tmp_dir'))
@patch("os.path.exists", new = MagicMock(return_value=True))
@patch("platform.linux_distribution", new = MagicMock(return_value="Linux"))
class TestMetricsGrafana(RMFTestCase):
@@ -47,6 +48,15 @@ class TestMetricsGrafana(RMFTestCase):
)
self.maxDiff=None
self.assert_configure()
+ self.assertResourceCalled('Execute', 'ambari-sudo.sh /usr/jdk64/jdk1.7.0_45/bin/keytool -importkeystore -srckeystore /etc/security/clientKeys/all.jks -destkeystore /some_tmp_dir/truststore.p12 -deststoretype PKCS12 -srcstorepass bigdata -deststorepass bigdata',
+ )
+ self.assertResourceCalled('Execute', 'ambari-sudo.sh openssl pkcs12 -in /some_tmp_dir/truststore.p12 -out /etc/ambari-metrics-grafana/conf/ca.pem -cacerts -nokeys -passin pass:bigdata',
+ )
+ self.assertResourceCalled('Execute', ('chown', u'ams', '/etc/ambari-metrics-grafana/conf/ca.pem'),
+ sudo = True
+ )
+ self.assertResourceCalled('Execute', 'ambari-sudo.sh rm -rf /some_tmp_dir',
+ )
self.assertResourceCalled('Execute', '/usr/sbin/ambari-metrics-grafana stop',
user = 'ams'
)
http://git-wip-us.apache.org/repos/asf/ambari/blob/6e002b25/ambari-server/src/test/python/stacks/2.0.6/configs/default.json
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/python/stacks/2.0.6/configs/default.json b/ambari-server/src/test/python/stacks/2.0.6/configs/default.json
index adf12ad..4ff45e8 100644
--- a/ambari-server/src/test/python/stacks/2.0.6/configs/default.json
+++ b/ambari-server/src/test/python/stacks/2.0.6/configs/default.json
@@ -820,6 +820,7 @@
"content": "\n"
},
"ams-site": {
+ "timeline.metrics.service.http.policy": "HTTPS_ONLY",
"timeline.metrics.host.aggregator.minute.ttl": "604800",
"timeline.metrics.cluster.aggregator.daily.checkpointCutOffMultiplier": "1",
"timeline.metrics.cluster.aggregator.daily.ttl": "63072000",
@@ -868,6 +869,11 @@
"ams-ssl-server": {
"content": "\n"
},
+ "ams-ssl-client": {
+ "ssl.client.truststore.location": "/etc/security/clientKeys/all.jks",
+ "ssl.client.truststore.type": "jks",
+ "ssl.client.truststore.password": "bigdata"
+ },
"ams-grafana-ini": {
"content": "\n"
}
@@ -881,6 +887,7 @@
"ams-hbase-log4j": {},
"ams-site": {},
"ams-ssl-server": {},
+ "ams-ssl-client": {},
"sqoop-site": {},
"yarn-site": {
"final": {
@@ -966,6 +973,9 @@
"ams-ssl-server": {
"tag": "version1"
},
+ "ams-ssl-client": {
+ "tag": "version1"
+ },
"ams-hbase-policy": {
"tag": "version1"
},