You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by ds...@apache.org on 2016/02/24 14:54:55 UTC

ambari git commit: AMBARI-15148 Implement AMS collector certificate verification on monitor and service check sides (dsen)

Repository: ambari
Updated Branches:
  refs/heads/trunk 1c5611407 -> 6e002b25c


AMBARI-15148 Implement AMS collector certificate verification on monitor and service check sides (dsen)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/6e002b25
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/6e002b25
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/6e002b25

Branch: refs/heads/trunk
Commit: 6e002b25c891c04d4d4308071ffab865c624c942
Parents: 1c56114
Author: Dmytro Sen <ds...@apache.org>
Authored: Wed Feb 24 15:54:21 2016 +0200
Committer: Dmytro Sen <ds...@apache.org>
Committed: Wed Feb 24 15:54:21 2016 +0200

----------------------------------------------------------------------
 .../conf/unix/metric_monitor.ini                |  7 +-
 .../src/main/python/core/config_reader.py       | 36 +++++--
 .../src/main/python/core/controller.py          |  1 -
 .../src/main/python/core/emitter.py             | 34 ++++---
 .../src/main/python/core/security.py            | 98 ++++++++++++++++++++
 .../src/test/python/core/TestEmitter.py         | 65 +++++++------
 .../AMBARI_METRICS/0.1.0/package/scripts/ams.py | 40 +++++++-
 .../0.1.0/package/scripts/functions.py          |  4 -
 .../package/scripts/metrics_grafana_util.py     | 58 ++++++++----
 .../0.1.0/package/scripts/network.py            | 39 ++++++++
 .../0.1.0/package/scripts/params.py             |  4 +-
 .../0.1.0/package/scripts/service_check.py      | 24 +++--
 .../package/templates/metric_monitor.ini.j2     |  5 +-
 .../metrics_grafana_datasource.json.j2          |  2 +-
 .../AMBARI_METRICS/test_metrics_collector.py    | 10 ++
 .../AMBARI_METRICS/test_metrics_grafana.py      | 12 ++-
 .../python/stacks/2.0.6/configs/default.json    | 10 ++
 17 files changed, 355 insertions(+), 94 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/6e002b25/ambari-metrics/ambari-metrics-host-monitoring/conf/unix/metric_monitor.ini
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-host-monitoring/conf/unix/metric_monitor.ini b/ambari-metrics/ambari-metrics-host-monitoring/conf/unix/metric_monitor.ini
index 5952982..3e5d861 100644
--- a/ambari-metrics/ambari-metrics-host-monitoring/conf/unix/metric_monitor.ini
+++ b/ambari-metrics/ambari-metrics-host-monitoring/conf/unix/metric_monitor.ini
@@ -18,11 +18,9 @@
 
 [default]
 debug_level = INFO
-metrics_server = localhost:{{ams_collector_port}}
-hostname = {{hostname}}
+hostname = localhost
 enable_time_threshold = false
 enable_value_threshold = false
-https_enabled = false
 
 [emitter]
 send_interval = 60
@@ -30,3 +28,6 @@ send_interval = 60
 [collector]
 collector_sleep_interval = 5
 max_queue_size = 5000
+host = localhost
+port = 6188
+https_enabled = false

http://git-wip-us.apache.org/repos/asf/ambari/blob/6e002b25/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/config_reader.py
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/config_reader.py b/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/config_reader.py
index d533537..a053955 100644
--- a/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/config_reader.py
+++ b/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/config_reader.py
@@ -34,30 +34,38 @@ class ConfigDefaults(object):
     pass
   def get_metric_file_path(self):
     pass
+  def get_ca_certs_file_path(self):
+    pass
 
 @OsFamilyImpl(os_family=OSConst.WINSRV_FAMILY)
 class ConfigDefaultsWindows(ConfigDefaults):
   def __init__(self):
     self._CONFIG_FILE_PATH = "conf\\metric_monitor.ini"
     self._METRIC_FILE_PATH = "conf\\metric_groups.conf"
+    self._METRIC_FILE_PATH = "conf\\ca.pem"
     pass
 
   def get_config_file_path(self):
     return self._CONFIG_FILE_PATH
   def get_metric_file_path(self):
     return self._METRIC_FILE_PATH
+  def get_ca_certs_file_path(self):
+    return self._CA_CERTS_FILE_PATH
 
 @OsFamilyImpl(os_family=OsFamilyImpl.DEFAULT)
 class ConfigDefaultsLinux(ConfigDefaults):
   def __init__(self):
     self._CONFIG_FILE_PATH = "/etc/ambari-metrics-monitor/conf/metric_monitor.ini"
     self._METRIC_FILE_PATH = "/etc/ambari-metrics-monitor/conf/metric_groups.conf"
+    self._CA_CERTS_FILE_PATH = "/etc/ambari-metrics-monitor/conf/ca.pem"
     pass
 
   def get_config_file_path(self):
     return self._CONFIG_FILE_PATH
   def get_metric_file_path(self):
     return self._METRIC_FILE_PATH
+  def get_ca_certs_file_path(self):
+    return self._CA_CERTS_FILE_PATH
 
 configDefaults = ConfigDefaults()
 
@@ -65,6 +73,7 @@ config = ConfigParser.RawConfigParser()
 
 CONFIG_FILE_PATH = configDefaults.get_config_file_path()
 METRIC_FILE_PATH = configDefaults.get_metric_file_path()
+CA_CERTS_FILE_PATH = configDefaults.get_ca_certs_file_path()
 
 OUT_DIR = os.path.join(os.sep, "var", "log", "ambari-metrics-host-monitoring")
 SERVER_OUT_FILE = OUT_DIR + os.sep + "ambari-metrics-host-monitoring.out"
@@ -88,10 +97,8 @@ AMBARI_AGENT_CONF = '/etc/ambari-agent/conf/ambari-agent.ini'
 config_content = """
 [default]
 debug_level = INFO
-metrics_server = host:port
 enable_time_threshold = false
 enable_value_threshold = false
-https_enabled = false
 
 [emitter]
 send_interval = 60
@@ -99,6 +106,9 @@ send_interval = 60
 [collector]
 collector_sleep_interval = 5
 max_queue_size = 5000
+host = localhost
+port = 6188
+https_enabled = false
 """
 
 metric_group_info = """
@@ -162,7 +172,7 @@ class Configuration:
         'process_metric_groups': []
       }
     pass
-
+    self._ca_cert_file_path = CA_CERTS_FILE_PATH
     self.hostname_script = None
     ambari_agent_config = ConfigParser.RawConfigParser()
     if os.path.exists(AMBARI_AGENT_CONF):
@@ -193,9 +203,6 @@ class Configuration:
   def get_collector_sleep_interval(self):
     return int(self.get("collector", "collector_sleep_interval", 5))
 
-  def get_server_address(self):
-    return self.get("default", "metrics_server")
-
   def get_hostname_config(self):
     return self.get("default", "hostname", None)
 
@@ -211,5 +218,18 @@ class Configuration:
   def get_max_queue_size(self):
     return int(self.get("collector", "max_queue_size", 5000))
 
-  def get_server_https_enabled(self):
-    return "true" == str(self.get("default", "https_enabled")).lower()
+  def is_server_https_enabled(self):
+    return "true" == str(self.get("collector", "https_enabled")).lower()
+
+  def get_server_host(self):
+    return self.get("collector", "host")
+
+  def get_server_port(self):
+    try:
+      return int(self.get("collector", "port"))
+    except:
+      return 6188
+
+  def get_ca_certs(self):
+    return self._ca_cert_file_path
+

http://git-wip-us.apache.org/repos/asf/ambari/blob/6e002b25/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/controller.py
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/controller.py b/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/controller.py
index 1713501..c04a61b 100644
--- a/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/controller.py
+++ b/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/controller.py
@@ -46,7 +46,6 @@ class Controller(threading.Thread):
                                                        hostinfo.get_ip_address())
     self.event_queue = Queue(config.get_max_queue_size())
     self.metric_collector = MetricsCollector(self.event_queue, self.application_metric_map, hostinfo)
-    self.server_url = config.get_server_address()
     self.sleep_interval = config.get_collector_sleep_interval()
     self._stop_handler = stop_handler
     self.initialize_events_cache()

http://git-wip-us.apache.org/repos/asf/ambari/blob/6e002b25/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/emitter.py
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/emitter.py b/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/emitter.py
index 4e39ab5..6997108 100644
--- a/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/emitter.py
+++ b/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/emitter.py
@@ -20,12 +20,13 @@ limitations under the License.
 
 import logging
 import threading
-import urllib2
+
+from security import CachedHTTPSConnection, CachedHTTPConnection
 
 logger = logging.getLogger()
 
 class Emitter(threading.Thread):
-  COLLECTOR_URL = "{0}://{1}/ws/v1/timeline/metrics"
+  AMS_METRICS_POST_URL = "/ws/v1/timeline/metrics/"
   RETRY_SLEEP_INTERVAL = 5
   MAX_RETRY_COUNT = 3
   """
@@ -39,8 +40,16 @@ class Emitter(threading.Thread):
     self._stop_handler = stop_handler
     self.application_metric_map = application_metric_map
     # TODO verify certificate
-    protocol = 'https' if config.get_server_https_enabled() else 'http'
-    self.collector_url = self.COLLECTOR_URL.format(protocol, config.get_server_address())
+    timeout = int(self.send_interval - 10)
+    if config.is_server_https_enabled():
+      self.connection = CachedHTTPSConnection(config.get_server_host(),
+                                              config.get_server_port(),
+                                              timeout=timeout,
+                                              ca_certs=config.get_ca_certs())
+    else:
+      self.connection = CachedHTTPConnection(config.get_server_host(),
+                                             config.get_server_port(),
+                                             timeout=timeout)
 
   def run(self):
     logger.info('Running Emitter thread: %s' % threading.currentThread().getName())
@@ -75,7 +84,7 @@ class Emitter(threading.Thread):
         logger.warn('Error sending metrics to server. %s' % str(e))
       pass
 
-      if response and response.getcode() == 200:
+      if response and response.status == 200:
         retry_count = self.MAX_RETRY_COUNT
       else:
         logger.warn("Retrying after {0} ...".format(self.RETRY_SLEEP_INTERVAL))
@@ -87,14 +96,15 @@ class Emitter(threading.Thread):
     pass
     # TODO verify certificate
   def push_metrics(self, data):
-    headers = {"Content-Type" : "application/json", "Accept" : "*/*"}
-    logger.info("server: %s" % self.collector_url)
+    headers = {"Content-Type" : "application/json",
+               "Accept" : "*/*",
+               "Connection":" Keep-Alive"}
     logger.debug("message to sent: %s" % data)
-    req = urllib2.Request(self.collector_url, data, headers)
-    response = urllib2.urlopen(req, timeout=int(self.send_interval - 10))
+    self.connection.request("POST", self.AMS_METRICS_POST_URL, data, headers)
+    response = self.connection.getresponse()
     if response:
-      logger.debug("POST response from server: retcode = {0}".format(response.getcode()))
+      logger.debug("POST response from server: retcode = {0}, reason = {1}"
+                   .format(response.status, response.reason))
       logger.debug(str(response.read()))
-    pass
-    return response
 
+    return response

http://git-wip-us.apache.org/repos/asf/ambari/blob/6e002b25/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/security.py
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/security.py b/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/security.py
new file mode 100644
index 0000000..e36e01d
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/security.py
@@ -0,0 +1,98 @@
+#!/usr/bin/env python
+
+'''
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+'''
+
+import logging
+import ssl
+import socket
+import httplib
+
+logger = logging.getLogger()
+
+# TODO merge this with security.py in ambari-agent and move to ambrari commons
+
+class VerifiedHTTPSConnection(httplib.HTTPSConnection):
+  """ Connecting using ssl wrapped sockets """
+
+  def __init__(self, host, port, timeout, ca_certs):
+    httplib.HTTPSConnection.__init__(self, host, port=port, timeout=timeout)
+    self.ca_certs = ca_certs
+
+  def connect(self):
+
+    try:
+      sock = self.create_connection()
+      self.sock = ssl.wrap_socket(sock, cert_reqs=ssl.CERT_REQUIRED,
+                                  ca_certs=self.ca_certs)
+      logger.info('SSL connection established.')
+    except (ssl.SSLError, AttributeError) as ex:
+      logger.info('Insecure connection to https://{0}:{1}/ failed'
+                  .format(self.host, self.port))
+
+  def create_connection(self):
+    if self.sock:
+      self.sock.close()
+    logger.info("SSL Connect being called.. connecting to https://{0}:{1}/"
+                .format(self.host, self.port))
+    sock = socket.create_connection((self.host, self.port), timeout=self.timeout)
+    sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
+    if self._tunnel_host:
+      self.sock = sock
+      self._tunnel()
+
+    return sock
+
+class CachedHTTPConnection:
+  """ Caches a socket and uses a single http connection to the server. """
+
+  def __init__(self, host, port, timeout):
+    self.connected = False
+    self.host = host
+    self.port = port
+    self.timeout = timeout
+
+  def connect(self):
+    if not self.connected:
+      self.httpconn = self.create_connection()
+      self.httpconn.connect()
+      self.connected = True
+
+  def request(self, method, url, body=None, headers={}):
+    self.connect()
+    try:
+      return self.httpconn.request(method, url, body, headers)
+    except Exception as e:
+      self.connected = False
+      raise e
+
+  def getresponse(self):
+    return self.httpconn.getresponse()
+
+  def create_connection(self):
+    return httplib.HTTPConnection(self.host, self.port, self.timeout)
+
+class CachedHTTPSConnection(CachedHTTPConnection):
+  """ Caches an ssl socket and uses a single https connection to the server. """
+
+  def __init__(self, host, port, timeout, ca_certs):
+    self.ca_certs = ca_certs
+    CachedHTTPConnection.__init__(self, host, port, timeout)
+
+  def create_connection(self):
+    return VerifiedHTTPSConnection(self.host, self.port, self.timeout, self.ca_certs)

http://git-wip-us.apache.org/repos/asf/ambari/blob/6e002b25/ambari-metrics/ambari-metrics-host-monitoring/src/test/python/core/TestEmitter.py
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-host-monitoring/src/test/python/core/TestEmitter.py b/ambari-metrics/ambari-metrics-host-monitoring/src/test/python/core/TestEmitter.py
index 8f5236a..be0608f 100644
--- a/ambari-metrics/ambari-metrics-host-monitoring/src/test/python/core/TestEmitter.py
+++ b/ambari-metrics/ambari-metrics-host-monitoring/src/test/python/core/TestEmitter.py
@@ -19,34 +19,37 @@ limitations under the License.
 '''
 
 import json
-import urllib2
-
 import logging
-from unittest import TestCase
-from only_for_platform import get_platform, os_distro_value, PLATFORM_WINDOWS
-
-from ambari_commons.os_check import OSCheck
 
+from unittest import TestCase
+from only_for_platform import get_platform, PLATFORM_WINDOWS
 from mock.mock import patch, MagicMock
+from security import CachedHTTPConnection
+
+if get_platform() != PLATFORM_WINDOWS:
+  os_distro_value = ('Suse','11','Final')
+else:
+  os_distro_value = ('win2012serverr2','6.3','WindowsServer')
 
 with patch("platform.linux_distribution", return_value = os_distro_value):
   from ambari_commons import OSCheck
-  from core.application_metric_map import ApplicationMetricMap
-  from core.config_reader import Configuration
-  from core.emitter import Emitter
-  from core.stop_handler import bind_signal_handlers
+  from application_metric_map import ApplicationMetricMap
+  from config_reader import Configuration
+  from emitter import Emitter
+  from stop_handler import bind_signal_handlers
 
 logger = logging.getLogger()
 
 class TestEmitter(TestCase):
 
   @patch.object(OSCheck, "os_distribution", new = MagicMock(return_value = os_distro_value))
-  @patch("urllib2.urlopen")
-  def testJavaHomeAvailableCheck(self, url_open_mock):
-    url_open_mock.return_value = MagicMock()
-    url_open_mock.return_value.getcode.return_value = 200
-    self.assertEqual(urllib2.urlopen(None, None).getcode(), 200)
-    url_open_mock.reset_mock()
+  @patch.object(CachedHTTPConnection, "create_connection", new = MagicMock())
+  @patch.object(CachedHTTPConnection, "request")
+  @patch.object(CachedHTTPConnection, "getresponse")
+  def test_submit_metrics(self, getresponse_mock, request_mock):
+    request_mock.return_value = MagicMock()
+    getresponse_mock.return_value = MagicMock()
+    getresponse_mock.return_value.status = 200
 
     stop_handler = bind_signal_handlers()
 
@@ -56,16 +59,20 @@ class TestEmitter(TestCase):
     application_metric_map.put_metric("APP1", {"metric1":1}, 1)
     emitter = Emitter(config, application_metric_map, stop_handler)
     emitter.submit_metrics()
-    
-    self.assertEqual(url_open_mock.call_count, 1)
-    self.assertUrlData(url_open_mock)
+
+    self.assertEqual(request_mock.call_count, 1)
+    self.assertUrlData(request_mock)
 
 
   @patch.object(OSCheck, "os_distribution", new = MagicMock(return_value = os_distro_value))
-  @patch("urllib2.urlopen")
-  def testRetryFetch(self, url_open_mock):
+  @patch.object(CachedHTTPConnection, "create_connection", new = MagicMock())
+  @patch.object(CachedHTTPConnection, "getresponse", new = MagicMock())
+  @patch.object(CachedHTTPConnection, "request")
+  def testRetryFetch(self, request_mock):
     stop_handler = bind_signal_handlers()
 
+    request_mock.return_value = MagicMock()
+
     config = Configuration()
     application_metric_map = ApplicationMetricMap("host","10.10.10.10")
     application_metric_map.clear()
@@ -73,15 +80,15 @@ class TestEmitter(TestCase):
     emitter = Emitter(config, application_metric_map, stop_handler)
     emitter.RETRY_SLEEP_INTERVAL = .001
     emitter.submit_metrics()
-    
-    self.assertEqual(url_open_mock.call_count, 3)
-    self.assertUrlData(url_open_mock)
-    
-  def assertUrlData(self, url_open_mock):
-    self.assertEqual(len(url_open_mock.call_args), 2)
-    data = url_open_mock.call_args[0][0].data
+
+    self.assertEqual(request_mock.call_count, 3)
+    self.assertUrlData(request_mock)
+
+  def assertUrlData(self, request_mock):
+    self.assertEqual(len(request_mock.call_args), 2)
+    data = request_mock.call_args[0][2]
     self.assertTrue(data is not None)
-    
+
     metrics = json.loads(data)
     self.assertEqual(len(metrics['metrics']), 1)
     self.assertEqual(metrics['metrics'][0]['metricname'],'metric1')

http://git-wip-us.apache.org/repos/asf/ambari/blob/6e002b25/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/ams.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/ams.py b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/ams.py
index 417574b..c8c3b6d 100644
--- a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/ams.py
+++ b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/ams.py
@@ -333,6 +333,9 @@ def ams(name=None):
                 mode=0644
       )
 
+    if params.metric_collector_https_enabled:
+      export_ca_certs(params.ams_collector_conf_dir)
+
     pass
 
   elif name == 'monitor':
@@ -384,7 +387,9 @@ def ams(name=None):
          content=InlineTemplate(params.ams_env_sh_template)
     )
 
-    # TODO
+    if params.metric_collector_https_enabled:
+      export_ca_certs(params.ams_monitor_conf_dir)
+
     pass
   elif name == 'grafana':
 
@@ -415,6 +420,39 @@ def ams(name=None):
          content=InlineTemplate(params.ams_grafana_ini_template)
          )
 
+    if params.metric_collector_https_enabled:
+      export_ca_certs(params.ams_grafana_conf_dir)
+
     pass
 
+def export_ca_certs(dir_path):
+  # export ca certificates on every restart to handle changed truststore content
+
+  import params
+  import tempfile
+
+  ca_certs_path = os.path.join(dir_path, params.metric_truststore_ca_certs)
+  truststore = params.metric_truststore_path
+
+  tmpdir = tempfile.mkdtemp()
+  truststore_p12 = os.path.join(tmpdir,'truststore.p12')
+
+  if (params.metric_truststore_type.lower() == 'jks'):
+    # Convert truststore from JKS to PKCS12
+    cmd = format("{sudo} {java64_home}/bin/keytool -importkeystore -srckeystore {metric_truststore_path} -destkeystore {truststore_p12} -deststoretype PKCS12 -srcstorepass {metric_truststore_password} -deststorepass {metric_truststore_password}")
+    Execute(cmd,
+    )
+    truststore = truststore_p12
+
+  # Export all CA certificates from the truststore to the conf directory
+  cmd = format("{sudo} openssl pkcs12 -in {truststore} -out {ca_certs_path} -cacerts -nokeys -passin pass:{metric_truststore_password}")
+  Execute(cmd,
+  )
+  Execute(('chown', params.ams_user, ca_certs_path),
+          sudo=True
+  )
+  Execute(format('{sudo} rm -rf {tmpdir}')
+  )
+
+
   pass

http://git-wip-us.apache.org/repos/asf/ambari/blob/6e002b25/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/functions.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/functions.py b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/functions.py
index 140c24c..e3b3a48 100644
--- a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/functions.py
+++ b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/functions.py
@@ -18,12 +18,8 @@ limitations under the License.
 
 """
 
-import os
 import re
 import math
-import datetime
-
-from resource_management.core.shell import checked_call
 
 def calc_xmn_from_xms(heapsize_str, xmn_percent, xmn_max):
   """

http://git-wip-us.apache.org/repos/asf/ambari/blob/6e002b25/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/metrics_grafana_util.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/metrics_grafana_util.py b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/metrics_grafana_util.py
index 02caa11..37d403d 100644
--- a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/metrics_grafana_util.py
+++ b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/metrics_grafana_util.py
@@ -17,14 +17,15 @@ See the License for the specific language governing permissions and
 limitations under the License.
 
 """
+import httplib
 from resource_management.core.logger import Logger
 from resource_management.core.base import Fail
 from resource_management import Template
 
-import httplib
 import time
 import socket
 import json
+import network
 
 def create_ams_datasource():
 
@@ -33,16 +34,21 @@ def create_ams_datasource():
   GRAFANA_CONNECT_TIMEOUT = 15
   GRAFANA_URL = "/api/datasources"
   METRICS_GRAFANA_DATASOURCE_NAME = "AMBARI_METRICS"
-
+  grafana_https_enabled = params.ams_grafana_protocol.lower() == 'https'
   headers = {"Content-type": "application/json"}
 
+  ams_datasource_json = Template('metrics_grafana_datasource.json.j2',
+                                 ams_datasource_name=METRICS_GRAFANA_DATASOURCE_NAME)\
+                                 .get_content()
+
   Logger.info("Checking if AMS Grafana datasource already exists")
   Logger.info("Connecting (GET) to %s:%s%s" % (params.hostname,
                                                params.ams_grafana_port,
                                                GRAFANA_URL))
-# TODO add https support
-  conn = httplib.HTTPConnection(params.hostname,
-                                int(params.ams_grafana_port))
+
+  conn = network.get_http_connection(params.hostname,
+                                     int(params.ams_grafana_port),
+                                     grafana_https_enabled)
 
   conn.request("GET", GRAFANA_URL)
   response = conn.getresponse()
@@ -58,7 +64,7 @@ def create_ams_datasource():
         Logger.info("Ambari Metrics Grafana datasource already present. Checking Metrics Collector URL")
         datasource_url = datasources_json[i]["url"]
 
-        if datasource_url == (params.ams_grafana_protocol + "://"
+        if datasource_url == (params.metric_collector_protocol + "://"
                                 + params.metric_collector_host + ":"
                                 + params.metric_collector_port):
           Logger.info("Metrics Collector URL validation succeeded. Skipping datasource creation")
@@ -69,10 +75,31 @@ def create_ams_datasource():
           Logger.info("Metrics Collector URL validation failed.")
           datasource_id = datasources_json[i]["id"]
           Logger.info("Deleting obselete Metrics datasource.")
-          conn = httplib.HTTPConnection(params.hostname, int(params.ams_grafana_port))
-          conn.request("DELETE", GRAFANA_URL + "/" + str(datasource_id))
+          conn = network.get_http_connection(params.hostname,
+                                             int(params.ams_grafana_port),
+                                             grafana_https_enabled)
+          conn.request("PUT", GRAFANA_URL + "/" + str(datasource_id), ams_datasource_json, headers)
           response = conn.getresponse()
-          Logger.info("Http response: %s %s" % (response.status, response.reason))
+          data = response.read()
+          Logger.info("Http data: %s" % data)
+          conn.close()
+
+          if response.status == 200:
+            Logger.info("Ambari Metrics Grafana data source updated.")
+            GRAFANA_CONNECT_TRIES = 0 # No need to create datasource again
+          elif response.status == 500:
+            Logger.info("Ambari Metrics Grafana data source update failed. Not retrying.")
+            raise Fail("Ambari Metrics Grafana data source update failed. PUT request status: %s %s \n%s" %
+                       (response.status, response.reason, data))
+          else:
+            Logger.info("Ambari Metrics Grafana data source update failed.")
+            if i < GRAFANA_CONNECT_TRIES - 1:
+              time.sleep(GRAFANA_CONNECT_TIMEOUT)
+              Logger.info("Next retry in %s seconds."
+                          % (GRAFANA_CONNECT_TIMEOUT))
+            else:
+              raise Fail("Ambari Metrics Grafana data source creation failed. POST request status: %s %s \n%s" %
+                         (response.status, response.reason, data))
 
         break
   else:
@@ -83,19 +110,15 @@ def create_ams_datasource():
 
   for i in xrange(0, GRAFANA_CONNECT_TRIES):
     try:
-      ams_datasource_json = Template('metrics_grafana_datasource.json.j2',
-                             ams_datasource_name=METRICS_GRAFANA_DATASOURCE_NAME,
-                             ams_grafana_protocol=params.ams_grafana_protocol,
-                             ams_collector_host=params.metric_collector_host,
-                             ams_collector_port=params.metric_collector_port).get_content()
 
       Logger.info("Generated datasource:\n%s" % ams_datasource_json)
 
       Logger.info("Connecting (POST) to %s:%s%s" % (params.hostname,
                                                     params.ams_grafana_port,
                                                     GRAFANA_URL))
-      conn = httplib.HTTPConnection(params.hostname,
-                                    int(params.ams_grafana_port))
+      conn = network.get_http_connection(params.hostname,
+                                         int(params.ams_grafana_port),
+                                         grafana_https_enabled)
       conn.request("POST", GRAFANA_URL, ams_datasource_json, headers)
 
       response = conn.getresponse()
@@ -129,5 +152,4 @@ def create_ams_datasource():
       else:
         raise Fail("Ambari Metrics Grafana data source creation failed. POST request status: %s %s \n%s" %
                  (response.status, response.reason, data))
-
-
+  pass

http://git-wip-us.apache.org/repos/asf/ambari/blob/6e002b25/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/network.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/network.py b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/network.py
new file mode 100644
index 0000000..672ee53
--- /dev/null
+++ b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/network.py
@@ -0,0 +1,39 @@
+#!/usr/bin/env python
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+"""
+
+import httplib
+import ssl
+
+from resource_management.core.exceptions import Fail
+
+def get_http_connection(host, port, https_enabled=False, ca_certs=None):
+  if https_enabled:
+    if ca_certs:
+      check_ssl_certificate(host, port, ca_certs)
+    return httplib.HTTPSConnection(host, port)
+  else:
+    return httplib.HTTPConnection(host, port)
+
+def check_ssl_certificate(host, port, ca_certs):
+  try:
+    ssl.get_server_certificate((host, port), ssl_version=ssl.PROTOCOL_SSLv23, ca_certs=ca_certs)
+  except (ssl.SSLError) as ssl_error:
+    raise Fail("Failed to verify the SSL certificate for AMS Collector https://{0}:{1} with CA certificate in {2}"
+               .format(host, port, ca_certs))

http://git-wip-us.apache.org/repos/asf/ambari/blob/6e002b25/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/params.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/params.py b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/params.py
index f1ff998..58ff71c 100644
--- a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/params.py
+++ b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/params.py
@@ -59,6 +59,7 @@ else:
 metric_truststore_path= default("/configurations/ams-ssl-client/ssl.client.truststore.location", "")
 metric_truststore_type= default("/configurations/ams-ssl-client/ssl.client.truststore.type", "")
 metric_truststore_password= default("/configurations/ams-ssl-client/ssl.client.truststore.password", "")
+metric_truststore_ca_certs='ca.pem'
 
 if 'cluster-env' in config['configurations'] and \
     'metrics_collector_vip_host' in config['configurations']['cluster-env']:
@@ -76,9 +77,11 @@ else:
     metric_collector_port = '6188'
 
 ams_collector_log_dir = config['configurations']['ams-env']['metrics_collector_log_dir']
+ams_collector_conf_dir = "/etc/ambari-metrics-collector/conf"
 ams_monitor_log_dir = config['configurations']['ams-env']['metrics_monitor_log_dir']
 
 ams_monitor_dir = "/usr/lib/python2.6/site-packages/resource_monitoring"
+ams_monitor_conf_dir = "/etc/ambari-metrics-monitor/conf"
 ams_monitor_pid_dir = status_params.ams_monitor_pid_dir
 ams_monitor_script = "/usr/sbin/ambari-metrics-monitor"
 
@@ -177,7 +180,6 @@ else:
     zookeeper_clientPort = '2181'
 
 ams_checkpoint_dir = config['configurations']['ams-site']['timeline.metrics.aggregator.checkpoint.dir']
-hbase_pid_dir = status_params.hbase_pid_dir
 _hbase_tmp_dir = config['configurations']['ams-hbase-site']['hbase.tmp.dir']
 hbase_tmp_dir = substitute_vars(_hbase_tmp_dir, config['configurations']['ams-hbase-site'])
 _zookeeper_data_dir = config['configurations']['ams-hbase-site']['hbase.zookeeper.property.dataDir']

http://git-wip-us.apache.org/repos/asf/ambari/blob/6e002b25/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/service_check.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/service_check.py b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/service_check.py
index 4346f0f..8f369f7 100644
--- a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/service_check.py
+++ b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/service_check.py
@@ -27,6 +27,7 @@ from ambari_commons import OSConst
 from ambari_commons.os_family_impl import OsFamilyFuncImpl, OsFamilyImpl
 
 import httplib
+import network
 import urllib
 import ambari_simplejson as json # simplejson is much faster comparing to Python 2.6 json module and has the same functions set.
 import os
@@ -68,6 +69,8 @@ class AMSServiceCheck(Script):
 
     random_value1 = random.random()
     headers = {"Content-type": "application/json"}
+    ca_certs = os.path.join(params.ams_collector_conf_dir,
+                            params.metric_truststore_ca_certs)
 
     for i in xrange(0, self.AMS_CONNECT_TRIES):
       try:
@@ -79,9 +82,10 @@ class AMSServiceCheck(Script):
         Logger.info("Connecting (POST) to %s:%s%s" % (params.metric_collector_host,
                                                       params.metric_collector_port,
                                                       self.AMS_METRICS_POST_URL))
-        conn = self.get_http_connection(params.metric_collector_host,
-                                        int(params.metric_collector_port),
-                                        params.metric_collector_https_enabled)
+        conn = network.get_http_connection(params.metric_collector_host,
+                                           int(params.metric_collector_port),
+                                           params.metric_collector_https_enabled,
+                                           ca_certs)
         conn.request("POST", self.AMS_METRICS_POST_URL, metric_json, headers)
 
         response = conn.getresponse()
@@ -128,9 +132,10 @@ class AMSServiceCheck(Script):
                                                  params.metric_collector_port,
                                               self.AMS_METRICS_GET_URL % encoded_get_metrics_parameters))
 
-    conn = self.get_http_connection(params.metric_collector_host,
-                                    int(params.metric_collector_port),
-                                    params.metric_collector_https_enabled)
+    conn = network.get_http_connection(params.metric_collector_host,
+                                       int(params.metric_collector_port),
+                                       params.metric_collector_https_enabled,
+                                       ca_certs)
     conn.request("GET", self.AMS_METRICS_GET_URL % encoded_get_metrics_parameters)
     response = conn.getresponse()
     Logger.info("Http response: %s %s" % (response.status, response.reason))
@@ -163,13 +168,6 @@ class AMSServiceCheck(Script):
 
     Logger.info("Ambari Metrics service check is finished.")
 
-  def get_http_connection(self, host, port, https_enabled=False):
-    if https_enabled:
-      # TODO verify certificate
-      return httplib.HTTPSConnection(host, port)
-    else:
-      return httplib.HTTPConnection(host, port)
-
 if __name__ == "__main__":
   AMSServiceCheck().execute()
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/6e002b25/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/templates/metric_monitor.ini.j2
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/templates/metric_monitor.ini.j2 b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/templates/metric_monitor.ini.j2
index 4e2d0f5..0b0932a 100644
--- a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/templates/metric_monitor.ini.j2
+++ b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/templates/metric_monitor.ini.j2
@@ -18,11 +18,9 @@
 
 [default]
 debug_level = INFO
-metrics_server = {{metric_collector_host}}:{{metric_collector_port}}
 hostname = {{hostname}}
 enable_time_threshold = false
 enable_value_threshold = false
-https_enabled = {{metric_collector_https_enabled}}
 
 [emitter]
 send_interval = {{metrics_report_interval}}
@@ -30,3 +28,6 @@ send_interval = {{metrics_report_interval}}
 [collector]
 collector_sleep_interval = 5
 max_queue_size = 5000
+host = {{metric_collector_host}}
+port = {{metric_collector_port}}
+https_enabled = {{metric_collector_https_enabled}}

http://git-wip-us.apache.org/repos/asf/ambari/blob/6e002b25/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/templates/metrics_grafana_datasource.json.j2
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/templates/metrics_grafana_datasource.json.j2 b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/templates/metrics_grafana_datasource.json.j2
index da04668..678d769 100644
--- a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/templates/metrics_grafana_datasource.json.j2
+++ b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/templates/metrics_grafana_datasource.json.j2
@@ -20,7 +20,7 @@
   "name": "{{ams_datasource_name}}",
   "type": "ambarimetrics",
   "access": "proxy",
-  "url": "{{ams_grafana_protocol}}://{{ams_collector_host}}:{{ams_collector_port}}",
+  "url": "{{metric_collector_protocol}}://{{metric_collector_host}}:{{metric_collector_port}}",
   "password": "",
   "user": "",
   "database": "",

http://git-wip-us.apache.org/repos/asf/ambari/blob/6e002b25/ambari-server/src/test/python/stacks/2.0.6/AMBARI_METRICS/test_metrics_collector.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/python/stacks/2.0.6/AMBARI_METRICS/test_metrics_collector.py b/ambari-server/src/test/python/stacks/2.0.6/AMBARI_METRICS/test_metrics_collector.py
index 64b16c6..50bf712 100644
--- a/ambari-server/src/test/python/stacks/2.0.6/AMBARI_METRICS/test_metrics_collector.py
+++ b/ambari-server/src/test/python/stacks/2.0.6/AMBARI_METRICS/test_metrics_collector.py
@@ -21,6 +21,7 @@ limitations under the License.
 from mock.mock import MagicMock, patch
 from stacks.utils.RMFTestCase import *
 
+@patch("tempfile.mkdtemp", new = MagicMock(return_value='/some_tmp_dir'))
 @patch("os.path.exists", new = MagicMock(return_value=True))
 @patch("platform.linux_distribution", new = MagicMock(return_value="Linux"))
 class TestMetricsCollector(RMFTestCase):
@@ -39,6 +40,15 @@ class TestMetricsCollector(RMFTestCase):
     self.assert_hbase_configure('master', distributed=True)
     self.assert_hbase_configure('regionserver', distributed=True)
     self.assert_ams('collector', distributed=True)
+    self.assertResourceCalled('Execute', 'ambari-sudo.sh /usr/jdk64/jdk1.7.0_45/bin/keytool -importkeystore -srckeystore /etc/security/clientKeys/all.jks -destkeystore /some_tmp_dir/truststore.p12 -deststoretype PKCS12 -srcstorepass bigdata -deststorepass bigdata',
+                              )
+    self.assertResourceCalled('Execute', 'ambari-sudo.sh openssl pkcs12 -in /some_tmp_dir/truststore.p12 -out /etc/ambari-metrics-collector/conf/ca.pem -cacerts -nokeys -passin pass:bigdata',
+                              )
+    self.assertResourceCalled('Execute', ('chown', u'ams', '/etc/ambari-metrics-collector/conf/ca.pem'),
+                              sudo=True
+                              )
+    self.assertResourceCalled('Execute', 'ambari-sudo.sh rm -rf /some_tmp_dir',
+                              )
     self.assertResourceCalled('Execute', '/usr/lib/ams-hbase/bin/hbase-daemon.sh --config /etc/ams-hbase/conf stop regionserver',
                               on_timeout = 'ls /var/run/ambari-metrics-collector//hbase-ams-regionserver.pid >/dev/null 2>&1 && ps `cat /var/run/ambari-metrics-collector//hbase-ams-regionserver.pid` >/dev/null 2>&1 && ambari-sudo.sh -H -E kill -9 `ambari-sudo.sh cat /var/run/ambari-metrics-collector//hbase-ams-regionserver.pid`',
                               timeout = 30,

http://git-wip-us.apache.org/repos/asf/ambari/blob/6e002b25/ambari-server/src/test/python/stacks/2.0.6/AMBARI_METRICS/test_metrics_grafana.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/python/stacks/2.0.6/AMBARI_METRICS/test_metrics_grafana.py b/ambari-server/src/test/python/stacks/2.0.6/AMBARI_METRICS/test_metrics_grafana.py
index 755bb4f..1c72aba 100644
--- a/ambari-server/src/test/python/stacks/2.0.6/AMBARI_METRICS/test_metrics_grafana.py
+++ b/ambari-server/src/test/python/stacks/2.0.6/AMBARI_METRICS/test_metrics_grafana.py
@@ -18,10 +18,11 @@ See the License for the specific language governing permissions and
 limitations under the License.
 '''
 
-from mock.mock import MagicMock, patch, call
+from mock.mock import MagicMock, patch
 from stacks.utils.RMFTestCase import *
 import os, sys
 
+@patch("tempfile.mkdtemp", new = MagicMock(return_value='/some_tmp_dir'))
 @patch("os.path.exists", new = MagicMock(return_value=True))
 @patch("platform.linux_distribution", new = MagicMock(return_value="Linux"))
 class TestMetricsGrafana(RMFTestCase):
@@ -47,6 +48,15 @@ class TestMetricsGrafana(RMFTestCase):
                        )
     self.maxDiff=None
     self.assert_configure()
+    self.assertResourceCalled('Execute', 'ambari-sudo.sh /usr/jdk64/jdk1.7.0_45/bin/keytool -importkeystore -srckeystore /etc/security/clientKeys/all.jks -destkeystore /some_tmp_dir/truststore.p12 -deststoretype PKCS12 -srcstorepass bigdata -deststorepass bigdata',
+                              )
+    self.assertResourceCalled('Execute', 'ambari-sudo.sh openssl pkcs12 -in /some_tmp_dir/truststore.p12 -out /etc/ambari-metrics-grafana/conf/ca.pem -cacerts -nokeys -passin pass:bigdata',
+    )
+    self.assertResourceCalled('Execute', ('chown', u'ams', '/etc/ambari-metrics-grafana/conf/ca.pem'),
+                              sudo = True
+    )
+    self.assertResourceCalled('Execute', 'ambari-sudo.sh rm -rf /some_tmp_dir',
+                              )
     self.assertResourceCalled('Execute', '/usr/sbin/ambari-metrics-grafana stop',
                               user = 'ams'
                               )

http://git-wip-us.apache.org/repos/asf/ambari/blob/6e002b25/ambari-server/src/test/python/stacks/2.0.6/configs/default.json
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/python/stacks/2.0.6/configs/default.json b/ambari-server/src/test/python/stacks/2.0.6/configs/default.json
index adf12ad..4ff45e8 100644
--- a/ambari-server/src/test/python/stacks/2.0.6/configs/default.json
+++ b/ambari-server/src/test/python/stacks/2.0.6/configs/default.json
@@ -820,6 +820,7 @@
             "content": "\n"
         },
         "ams-site": {
+            "timeline.metrics.service.http.policy": "HTTPS_ONLY",
             "timeline.metrics.host.aggregator.minute.ttl": "604800",
             "timeline.metrics.cluster.aggregator.daily.checkpointCutOffMultiplier": "1",
             "timeline.metrics.cluster.aggregator.daily.ttl": "63072000",
@@ -868,6 +869,11 @@
         "ams-ssl-server": {
             "content": "\n"
         },
+        "ams-ssl-client": {
+            "ssl.client.truststore.location": "/etc/security/clientKeys/all.jks",
+            "ssl.client.truststore.type": "jks",
+            "ssl.client.truststore.password": "bigdata"
+        },
         "ams-grafana-ini": {
             "content": "\n"
         }
@@ -881,6 +887,7 @@
         "ams-hbase-log4j": {},
         "ams-site": {},
         "ams-ssl-server": {},
+        "ams-ssl-client": {},
         "sqoop-site": {},
         "yarn-site": {
         "final": {
@@ -966,6 +973,9 @@
         "ams-ssl-server": {
             "tag": "version1"
         },
+        "ams-ssl-client": {
+            "tag": "version1"
+        },
         "ams-hbase-policy": {
             "tag": "version1"
         },