You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by av...@apache.org on 2016/12/06 20:45:54 UTC
ambari git commit: AMBARI-19101 : Merge AMS service check support for
HA from trunk. (dsen, avijayan via avijayan)
Repository: ambari
Updated Branches:
refs/heads/branch-2.5 9d4a91a6b -> ee5651dcf
AMBARI-19101 : Merge AMS service check support for HA from trunk. (dsen, avijayan via avijayan)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/ee5651dc
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/ee5651dc
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/ee5651dc
Branch: refs/heads/branch-2.5
Commit: ee5651dcfbba9be46b58b06dd44c667da29255fe
Parents: 9d4a91a
Author: Aravindan Vijayan <av...@hortonworks.com>
Authored: Tue Dec 6 12:45:27 2016 -0800
Committer: Aravindan Vijayan <av...@hortonworks.com>
Committed: Tue Dec 6 12:45:27 2016 -0800
----------------------------------------------------------------------
.../ambari_commons/ambari_metrics_helper.py | 11 +-
.../ambari_commons/parallel_processing.py | 95 ++++++++
.../package/scripts/metrics_grafana_util.py | 51 ++++-
.../0.1.0/package/scripts/params.py | 6 +-
.../0.1.0/package/scripts/service_check.py | 226 ++++++++++---------
.../metrics_grafana_datasource.json.j2 | 4 +-
.../2.1/hooks/before-START/scripts/params.py | 2 +-
7 files changed, 272 insertions(+), 123 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ambari/blob/ee5651dc/ambari-common/src/main/python/ambari_commons/ambari_metrics_helper.py
----------------------------------------------------------------------
diff --git a/ambari-common/src/main/python/ambari_commons/ambari_metrics_helper.py b/ambari-common/src/main/python/ambari_commons/ambari_metrics_helper.py
index 10eb660..bfc786c 100644
--- a/ambari-common/src/main/python/ambari_commons/ambari_metrics_helper.py
+++ b/ambari-common/src/main/python/ambari_commons/ambari_metrics_helper.py
@@ -29,13 +29,16 @@ def select_metric_collector_for_sink(sink_name):
# TODO check '*' sink_name
all_collectors_string = get_metric_collectors_from_properties_file(sink_name)
- if all_collectors_string:
- all_collectors_list = all_collectors_string.split(',')
- return select_metric_collector_hosts_from_hostnames(all_collectors_list)
+ return select_metric_collector_hosts_from_hostnames(all_collectors_string)
+
def select_metric_collector_hosts_from_hostnames(comma_separated_hosts):
  """Pick one metric collector host out of a comma-separated host string.

  Returns 'localhost' when the string is empty or None.
  """
  # Guard clause: no configured collectors falls back to localhost.
  if not comma_separated_hosts:
    return 'localhost'
  return get_random_host(comma_separated_hosts.split(','))
-def select_metric_collector_hosts_from_hostnames(hosts):
def get_random_host(hosts):
  """Return a uniformly random element of *hosts* (a non-empty list)."""
  return random.choice(hosts)
def get_metric_collectors_from_properties_file(sink_name):
http://git-wip-us.apache.org/repos/asf/ambari/blob/ee5651dc/ambari-common/src/main/python/ambari_commons/parallel_processing.py
----------------------------------------------------------------------
diff --git a/ambari-common/src/main/python/ambari_commons/parallel_processing.py b/ambari-common/src/main/python/ambari_commons/parallel_processing.py
new file mode 100644
index 0000000..c5a95de
--- /dev/null
+++ b/ambari-common/src/main/python/ambari_commons/parallel_processing.py
@@ -0,0 +1,95 @@
+#!/usr/bin/env python
+
+'''
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+'''
+
+import logging
+from multiprocessing import Process, Queue
+
+logger = logging.getLogger()
+
+SUCCESS = "SUCCESS"
+FAILED = "FAILED"
+
class PrallelProcessResult(object):
  """Outcome envelope for one element processed by a ParallelProcess.

  NOTE(review): the 'Prallel' misspelling in the class name is kept —
  other modules in this change import the class under this exact name.
  """

  def __init__(self, element, status, result):
    # element: the input item; status: SUCCESS/FAILED marker string;
    # result: the function's return value, or an error description on failure.
    self.element = element
    self.status = status
    self.result = result
+
class ParallelProcess(Process):
  """A worker process that applies one function to one element and reports
  the outcome back through a shared queue as a PrallelProcessResult."""

  def __init__(self, function, element, params, queue):
    self.function = function
    self.element = element
    self.params = params
    self.queue = queue
    super(ParallelProcess, self).__init__()

  def return_name(self):
    # Named return_name because self.name already belongs to
    # multiprocessing.Process.
    return "Process running function '%s' for element '%s'" % (
        self.function, self.element)

  def run(self):
    # Always enqueue exactly one result so the parent can count replies.
    try:
      outcome = self.function(self.element, self.params)
      self.queue.put(PrallelProcessResult(self.element, SUCCESS, outcome))
    except Exception as e:
      message = "Exception while running function '%s' for '%s'. Reason : %s" % (
          self.function, self.element, str(e))
      self.queue.put(PrallelProcessResult(self.element, FAILED, message))
    return
+
def execute_in_parallel(function, array, params, wait_for_all = False):
  """Run function(element, params) for every element of *array*, each in
  its own process, and collect the outcomes.

  Args:
    function: callable taking (element, params).
    array: iterable of elements to process.
    params: opaque second argument forwarded to *function*.
    wait_for_all: when False (default), stop collecting at the first
      SUCCESS result; remaining workers are terminated and their results
      are not recorded.

  Returns:
    dict mapping element -> PrallelProcessResult. Note that duplicate
    elements in *array* overwrite each other's entries.
  """
  logger.info("Started running %s for %s" % (function, array))
  processes = []
  queue = Queue()
  remaining = len(array)
  results = {}

  for element in array:
    process = ParallelProcess(function, element, params, queue)
    process.start()
    processes.append(process)

  # Each worker puts exactly one result, so counting replies terminates.
  while remaining > 0:
    item = queue.get()
    remaining -= 1
    results[item.element] = item
    if item.status == SUCCESS and not wait_for_all:
      # An early success is all the caller asked for; stop collecting.
      remaining = 0

  for process in processes:
    process.terminate()
    # BUG FIX: the original never join()ed terminated children, leaving
    # zombie processes behind; reap them here.
    process.join()

  logger.info("Finished running %s for %s" % (function, array))

  return results
+
def func(elem, params):
  """Demo worker for the __main__ smoke test: succeeds only for 'S'."""
  if elem != 'S':
    raise Exception('Exception')
  return "lalala"
+
+if __name__ == "__main__":
+ results = execute_in_parallel(func, ['F', 'BF', 'S'], None)
+ for result in results:
+ print results[result].element
+ print results[result].status
+ print results[result].result
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ambari/blob/ee5651dc/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/metrics_grafana_util.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/metrics_grafana_util.py b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/metrics_grafana_util.py
index aff95eb..65487b7 100644
--- a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/metrics_grafana_util.py
+++ b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/metrics_grafana_util.py
@@ -18,16 +18,21 @@ limitations under the License.
"""
import httplib
+
+from ambari_commons.parallel_processing import PrallelProcessResult, execute_in_parallel, SUCCESS
+from service_check import post_metrics_to_collector
from resource_management.core.logger import Logger
from resource_management.core.base import Fail
from resource_management import Template
from collections import namedtuple
from urlparse import urlparse
from base64 import b64encode
+import random
import time
import socket
import ambari_simplejson as json
import network
+import os
GRAFANA_CONNECT_TRIES = 15
GRAFANA_CONNECT_TIMEOUT = 20
@@ -171,20 +176,32 @@ def perform_grafana_delete_call(url, server):
return response
-def is_unchanged_datasource_url(datasource_url):
+def is_unchanged_datasource_url(grafana_datasource_url, new_datasource_host):
import params
- parsed_url = urlparse(datasource_url)
+ parsed_url = urlparse(grafana_datasource_url)
Logger.debug("parsed url: scheme = %s, host = %s, port = %s" % (
parsed_url.scheme, parsed_url.hostname, parsed_url.port))
Logger.debug("collector: scheme = %s, host = %s, port = %s" %
- (params.metric_collector_protocol, params.metric_collector_host,
+ (params.metric_collector_protocol, new_datasource_host,
params.metric_collector_port))
return parsed_url.scheme.strip() == params.metric_collector_protocol.strip() and \
- parsed_url.hostname.strip() == params.metric_collector_host.strip() and \
+ parsed_url.hostname.strip() == new_datasource_host.strip() and \
str(parsed_url.port) == params.metric_collector_port
def do_ams_collector_post(metric_collector_host, params):
  """POST one generated smoke-test metric payload to the given collector host.

  Failures propagate from post_metrics_to_collector (raises Fail), which
  lets execute_in_parallel mark this host as FAILED.
  """
  ams_metrics_post_url = "/ws/v1/timeline/metrics/"
  headers = {"Content-type": "application/json"}
  ca_certs = os.path.join(params.ams_collector_conf_dir,
                          params.metric_truststore_ca_certs)

  # Render the smoke-test payload with a random value and a ms timestamp.
  random_value1 = random.random()
  current_time = int(time.time()) * 1000
  metric_json = Template('smoketest_metrics.json.j2',
                         hostname=params.hostname,
                         random1=random_value1,
                         current_time=current_time).get_content()

  post_metrics_to_collector(ams_metrics_post_url, metric_collector_host,
                            params.metric_collector_port,
                            params.metric_collector_https_enabled,
                            metric_json, headers, ca_certs)
def create_ams_datasource():
import params
server = Server(protocol = params.ams_grafana_protocol.strip(),
@@ -196,11 +213,28 @@ def create_ams_datasource():
"""
Create AMS datasource in Grafana, if exsists make sure the collector url is accurate
"""
- ams_datasource_json = Template('metrics_grafana_datasource.json.j2',
- ams_datasource_name=METRICS_GRAFANA_DATASOURCE_NAME).get_content()
+ Logger.info("Trying to find working metric collector")
+ results = execute_in_parallel(do_ams_collector_post, params.ams_collector_hosts.split(','), params)
+ new_datasource_host = ""
+
+ for host in params.ams_collector_hosts:
+ if host in results:
+ if results[host].status == SUCCESS:
+ new_datasource_host = host
+ Logger.info("Found working collector on host %s" % new_datasource_host)
+ break
+ else:
+ Logger.warning(results[host].result)
- Logger.info("Checking if AMS Grafana datasource already exists")
+ if new_datasource_host == "":
+ Logger.warning("All metric collectors are unavailable. Will use random collector as datasource host.")
+ new_datasource_host = params.random_metric_collector_host
+ Logger.info("New datasource host will be %s" % new_datasource_host)
+
+ ams_datasource_json = Template('metrics_grafana_datasource.json.j2',
+ ams_datasource_name=METRICS_GRAFANA_DATASOURCE_NAME, ams_datasource_host=new_datasource_host).get_content()
+ Logger.info("Checking if AMS Grafana datasource already exists")
response = perform_grafana_get_call(GRAFANA_DATASOURCE_URL, server)
create_datasource = True
@@ -215,7 +249,7 @@ def create_ams_datasource():
Logger.info("Ambari Metrics Grafana datasource already present. Checking Metrics Collector URL")
datasource_url = datasources_json[i]["url"]
- if is_unchanged_datasource_url(datasource_url):
+ if is_unchanged_datasource_url(datasource_url, new_datasource_host):
Logger.info("Metrics Collector URL validation succeeded.")
return
else: # Metrics datasource present, but collector host is wrong.
@@ -359,4 +393,3 @@ def create_ams_dashboards():
pass
-
http://git-wip-us.apache.org/repos/asf/ambari/blob/ee5651dc/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/params.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/params.py b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/params.py
index 684c312..ad66ffe 100644
--- a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/params.py
+++ b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/params.py
@@ -123,7 +123,10 @@ if 'cluster-env' in config['configurations'] and \
'metrics_collector_vip_host' in config['configurations']['cluster-env']:
metric_collector_host = config['configurations']['cluster-env']['metrics_collector_vip_host']
else:
- metric_collector_host = select_metric_collector_hosts_from_hostnames(ams_collector_hosts.split(","))
+ metric_collector_host = select_metric_collector_hosts_from_hostnames(ams_collector_hosts)
+
+random_metric_collector_host = select_metric_collector_hosts_from_hostnames(ams_collector_hosts)
+
if 'cluster-env' in config['configurations'] and \
'metrics_collector_vip_port' in config['configurations']['cluster-env']:
metric_collector_port = config['configurations']['cluster-env']['metrics_collector_vip_port']
@@ -345,4 +348,3 @@ HdfsResource = functools.partial(
)
-
http://git-wip-us.apache.org/repos/asf/ambari/blob/ee5651dc/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/service_check.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/service_check.py b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/service_check.py
index ddd3e42..1417f4a 100644
--- a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/service_check.py
+++ b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/service_check.py
@@ -25,6 +25,7 @@ from resource_management import Template
from ambari_commons import OSConst
from ambari_commons.os_family_impl import OsFamilyFuncImpl, OsFamilyImpl
+from ambari_commons.parallel_processing import PrallelProcessResult, execute_in_parallel, SUCCESS
import httplib
import network
@@ -39,10 +40,10 @@ import socket
class AMSServiceCheck(Script):
AMS_METRICS_POST_URL = "/ws/v1/timeline/metrics/"
AMS_METRICS_GET_URL = "/ws/v1/timeline/metrics?%s"
- AMS_CONNECT_TRIES = 30
- AMS_CONNECT_TIMEOUT = 15
- AMS_READ_TRIES = 10
- AMS_READ_TIMEOUT = 5
+ AMS_CONNECT_TRIES = 10
+ AMS_CONNECT_TIMEOUT = 10
+ AMS_READ_TRIES = 5
+ AMS_READ_TIMEOUT = 10
@OsFamilyFuncImpl(os_family=OSConst.WINSRV_FAMILY)
def service_check(self, env):
@@ -62,124 +63,139 @@ class AMSServiceCheck(Script):
if not check_windows_service_exists(params.ams_collector_win_service_name):
raise Fail("Metrics Collector service was not properly installed. Check the logs and retry the installation.")
- @OsFamilyFuncImpl(os_family=OsFamilyImpl.DEFAULT)
- def service_check(self, env):
- import params
-
- Logger.info("Ambari Metrics service check was started.")
- env.set_params(params)
-
+ def service_check_for_single_host(self, metric_collector_host, params):
random_value1 = random.random()
headers = {"Content-type": "application/json"}
ca_certs = os.path.join(params.ams_collector_conf_dir,
params.metric_truststore_ca_certs)
- for i in xrange(0, self.AMS_CONNECT_TRIES):
- try:
- current_time = int(time.time()) * 1000
- metric_json = Template('smoketest_metrics.json.j2', hostname=params.hostname, random1=random_value1,
+ current_time = int(time.time()) * 1000
+ metric_json = Template('smoketest_metrics.json.j2', hostname=params.hostname, random1=random_value1,
current_time=current_time).get_content()
- Logger.info("Generated metrics:\n%s" % metric_json)
-
- Logger.info("Connecting (POST) to %s:%s%s" % (params.metric_collector_host,
- params.metric_collector_port,
- self.AMS_METRICS_POST_URL))
- conn = network.get_http_connection(params.metric_collector_host,
+ try:
+ post_metrics_to_collector(self.AMS_METRICS_POST_URL, metric_collector_host, params.metric_collector_port, params.metric_collector_https_enabled,
+ metric_json, headers, ca_certs, self.AMS_CONNECT_TRIES, self.AMS_CONNECT_TIMEOUT)
+
+ get_metrics_parameters = {
+ "metricNames": "AMBARI_METRICS.SmokeTest.FakeMetric",
+ "appId": "amssmoketestfake",
+ "hostname": params.hostname,
+ "startTime": current_time - 60000,
+ "endTime": current_time + 61000,
+ "precision": "seconds",
+ "grouped": "false",
+ }
+ encoded_get_metrics_parameters = urllib.urlencode(get_metrics_parameters)
+
+ Logger.info("Connecting (GET) to %s:%s%s" % (metric_collector_host,
+ params.metric_collector_port,
+ self.AMS_METRICS_GET_URL % encoded_get_metrics_parameters))
+ for i in xrange(0, self.AMS_READ_TRIES):
+ conn = network.get_http_connection(metric_collector_host,
int(params.metric_collector_port),
params.metric_collector_https_enabled,
ca_certs)
- conn.request("POST", self.AMS_METRICS_POST_URL, metric_json, headers)
-
+ conn.request("GET", self.AMS_METRICS_GET_URL % encoded_get_metrics_parameters)
response = conn.getresponse()
- Logger.info("Http response: %s %s" % (response.status, response.reason))
- except (httplib.HTTPException, socket.error) as ex:
- if i < self.AMS_CONNECT_TRIES - 1: #range/xrange returns items from start to end-1
- time.sleep(self.AMS_CONNECT_TIMEOUT)
- Logger.info("Connection failed. Next retry in %s seconds."
- % (self.AMS_CONNECT_TIMEOUT))
- continue
- else:
- raise Fail("Metrics were not saved. Service check has failed. "
- "\nConnection failed.")
+ Logger.info("Http response for host %s : %s %s" % (metric_collector_host, response.status, response.reason))
- data = response.read()
- Logger.info("Http data: %s" % data)
- conn.close()
+ data = response.read()
+ Logger.info("Http data: %s" % data)
+ conn.close()
- if response.status == 200:
- Logger.info("Metrics were saved.")
- break
- else:
- Logger.info("Metrics were not saved. Service check has failed.")
- if i < self.AMS_CONNECT_TRIES - 1: #range/xrange returns items from start to end-1
- time.sleep(self.AMS_CONNECT_TIMEOUT)
- Logger.info("Next retry in %s seconds."
- % (self.AMS_CONNECT_TIMEOUT))
+ if response.status == 200:
+ Logger.info("Metrics were retrieved from host %s" % metric_collector_host)
+ else:
+ raise Fail("Metrics were not retrieved from host %s. GET request status: %s %s \n%s" %
+ (metric_collector_host, response.status, response.reason, data))
+ data_json = json.loads(data)
+
+ def floats_eq(f1, f2, delta):
+ return abs(f1-f2) < delta
+
+ values_are_present = False
+ for metrics_data in data_json["metrics"]:
+ if (str(current_time) in metrics_data["metrics"] and str(current_time + 1000) in metrics_data["metrics"]
+ and floats_eq(metrics_data["metrics"][str(current_time)], random_value1, 0.0000001)
+ and floats_eq(metrics_data["metrics"][str(current_time + 1000)], current_time, 1)):
+ Logger.info("Values %s and %s were found in the response from host %s." % (metric_collector_host, random_value1, current_time))
+ values_are_present = True
+ break
+ pass
+
+ if not values_are_present:
+ if i < self.AMS_READ_TRIES - 1: #range/xrange returns items from start to end-1
+ Logger.info("Values weren't stored yet. Retrying in %s seconds."
+ % (self.AMS_READ_TIMEOUT))
+ time.sleep(self.AMS_READ_TIMEOUT)
+ else:
+ raise Fail("Values %s and %s were not found in the response." % (random_value1, current_time))
else:
- raise Fail("Metrics were not saved. Service check has failed. POST request status: %s %s \n%s" %
- (response.status, response.reason, data))
-
- get_metrics_parameters = {
- "metricNames": "AMBARI_METRICS.SmokeTest.FakeMetric",
- "appId": "amssmoketestfake",
- "hostname": params.hostname,
- "startTime": current_time - 60000,
- "endTime": current_time + 61000,
- "precision": "seconds",
- "grouped": "false",
- }
- encoded_get_metrics_parameters = urllib.urlencode(get_metrics_parameters)
-
- Logger.info("Connecting (GET) to %s:%s%s" % (params.metric_collector_host,
- params.metric_collector_port,
- self.AMS_METRICS_GET_URL % encoded_get_metrics_parameters))
- for i in xrange(0, self.AMS_READ_TRIES):
- conn = network.get_http_connection(params.metric_collector_host,
- int(params.metric_collector_port),
- params.metric_collector_https_enabled,
- ca_certs)
- conn.request("GET", self.AMS_METRICS_GET_URL % encoded_get_metrics_parameters)
- response = conn.getresponse()
- Logger.info("Http response: %s %s" % (response.status, response.reason))
-
- data = response.read()
- Logger.info("Http data: %s" % data)
- conn.close()
-
- if response.status == 200:
- Logger.info("Metrics were retrieved.")
- else:
- Logger.info("Metrics were not retrieved. Service check has failed.")
- raise Fail("Metrics were not retrieved. Service check has failed. GET request status: %s %s \n%s" %
- (response.status, response.reason, data))
- data_json = json.loads(data)
-
- def floats_eq(f1, f2, delta):
- return abs(f1-f2) < delta
-
- values_are_present = False
- for metrics_data in data_json["metrics"]:
- if (str(current_time) in metrics_data["metrics"] and str(current_time + 1000) in metrics_data["metrics"]
- and floats_eq(metrics_data["metrics"][str(current_time)], random_value1, 0.0000001)
- and floats_eq(metrics_data["metrics"][str(current_time + 1000)], current_time, 1)):
- Logger.info("Values %s and %s were found in the response." % (random_value1, current_time))
- values_are_present = True
break
pass
+ except Fail as ex:
+ Logger.warning("Ambari Metrics service check failed on collector host %s. Reason : %s" % (metric_collector_host, str(ex)))
+ raise Fail("Ambari Metrics service check failed on collector host %s. Reason : %s" % (metric_collector_host, str(ex)))
+
+ @OsFamilyFuncImpl(os_family=OsFamilyImpl.DEFAULT)
+ def service_check(self, env):
+ import params
+
+ Logger.info("Ambari Metrics service check was started.")
+ env.set_params(params)
- if not values_are_present:
- if i < self.AMS_READ_TRIES - 1: #range/xrange returns items from start to end-1
- Logger.info("Values weren't stored yet. Retrying in %s seconds."
- % (self.AMS_READ_TIMEOUT))
- time.sleep(self.AMS_READ_TIMEOUT)
+ results = execute_in_parallel(self.service_check_for_single_host, params.ams_collector_hosts.split(','), params)
+
+ for host in str(params.ams_collector_hosts).split(","):
+ if host in results:
+ if results[host].status == SUCCESS:
+ Logger.info("Ambari Metrics service check passed on host " + host)
+ return
else:
- Logger.info("Values %s and %s were not found in the response." % (random_value1, current_time))
- raise Fail("Values %s and %s were not found in the response." % (random_value1, current_time))
- else:
- break
- pass
- Logger.info("Ambari Metrics service check is finished.")
+ Logger.warning(results[host].result)
+ raise Fail("All metrics collectors are unavailable.")
+
def post_metrics_to_collector(ams_metrics_post_url, metric_collector_host, metric_collector_port, metric_collector_https_enabled,
                              metric_json, headers, ca_certs, tries = 1, connect_timeout = 10):
  """POST *metric_json* to a metrics collector, retrying up to *tries* times.

  Args:
    ams_metrics_post_url: path portion of the POST URL.
    metric_collector_host: target collector hostname.
    metric_collector_port: target collector port.
    metric_collector_https_enabled: whether to connect over HTTPS.
    metric_json: serialized metrics payload to send.
    headers: dict of HTTP headers for the request.
    ca_certs: path to the CA certificate bundle for HTTPS connections.
    tries: number of attempts before giving up.
    connect_timeout: seconds to sleep between attempts.

  Raises:
    Fail: when the metrics could not be saved after all attempts.
  """
  for i in xrange(0, tries):
    try:
      Logger.info("Generated metrics for host %s :\n%s" % (metric_collector_host, metric_json))

      Logger.info("Connecting (POST) to %s:%s%s" % (metric_collector_host,
                                                    metric_collector_port,
                                                    ams_metrics_post_url))
      conn = network.get_http_connection(metric_collector_host,
                                         int(metric_collector_port),
                                         metric_collector_https_enabled,
                                         ca_certs)
      conn.request("POST", ams_metrics_post_url, metric_json, headers)
      response = conn.getresponse()
      Logger.info("Http response for host %s: %s %s" % (metric_collector_host, response.status, response.reason))
    except (httplib.HTTPException, socket.error) as ex:
      if i < tries - 1:  # range/xrange returns items from start to end-1
        time.sleep(connect_timeout)
        Logger.info("Connection failed for host %s. Next retry in %s seconds."
                    % (metric_collector_host, connect_timeout))
        continue
      else:
        raise Fail("Metrics were not saved. Connection failed.")

    data = response.read()
    Logger.info("Http data: %s" % data)
    conn.close()

    if response.status == 200:
      Logger.info("Metrics were saved.")
      break
    else:
      Logger.info("Metrics were not saved.")
      if i < tries - 1:  # range/xrange returns items from start to end-1
        # BUG FIX: the original slept for `tries` seconds and logged `tries`
        # as the delay; the retry delay is `connect_timeout`.
        time.sleep(connect_timeout)
        Logger.info("Next retry in %s seconds."
                    % (connect_timeout))
      else:
        raise Fail("Metrics were not saved. POST request status: %s %s \n%s" %
                   (response.status, response.reason, data))
if __name__ == "__main__":
AMSServiceCheck().execute()
-
http://git-wip-us.apache.org/repos/asf/ambari/blob/ee5651dc/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/templates/metrics_grafana_datasource.json.j2
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/templates/metrics_grafana_datasource.json.j2 b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/templates/metrics_grafana_datasource.json.j2
index 678d769..05d1ae5 100644
--- a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/templates/metrics_grafana_datasource.json.j2
+++ b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/templates/metrics_grafana_datasource.json.j2
@@ -20,7 +20,7 @@
"name": "{{ams_datasource_name}}",
"type": "ambarimetrics",
"access": "proxy",
- "url": "{{metric_collector_protocol}}://{{metric_collector_host}}:{{metric_collector_port}}",
+ "url": "{{metric_collector_protocol}}://{{ams_datasource_host}}:{{metric_collector_port}}",
"password": "",
"user": "",
"database": "",
@@ -30,4 +30,4 @@
"withCredentials": false,
"isDefault": true,
"jsonData": {}
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ambari/blob/ee5651dc/ambari-server/src/main/resources/stacks/HDPWIN/2.1/hooks/before-START/scripts/params.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/stacks/HDPWIN/2.1/hooks/before-START/scripts/params.py b/ambari-server/src/main/resources/stacks/HDPWIN/2.1/hooks/before-START/scripts/params.py
index a22eb90..1058c75 100644
--- a/ambari-server/src/main/resources/stacks/HDPWIN/2.1/hooks/before-START/scripts/params.py
+++ b/ambari-server/src/main/resources/stacks/HDPWIN/2.1/hooks/before-START/scripts/params.py
@@ -29,7 +29,7 @@ if has_metric_collector:
'metrics_collector_vip_host' in config['configurations']['cluster-env']:
metric_collector_host = config['configurations']['cluster-env']['metrics_collector_vip_host']
else:
- metric_collector_host = select_metric_collector_hosts_from_hostnames(ams_collector_hosts.split(","))
+ metric_collector_host = select_metric_collector_hosts_from_hostnames(ams_collector_hosts)
if 'cluster-env' in config['configurations'] and \
'metrics_collector_vip_port' in config['configurations']['cluster-env']:
metric_collector_port = config['configurations']['cluster-env']['metrics_collector_vip_port']