You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by ao...@apache.org on 2016/04/05 19:02:44 UTC
ambari git commit: AMBARI-15710. Create script to export AMS metrics
and re-import into AMS to visualize using Grafana (aonishuk)
Repository: ambari
Updated Branches:
refs/heads/trunk 32b2f739d -> 68fe82f5d
AMBARI-15710. Create script to export AMS metrics and re-import into AMS to visualize using Grafana (aonishuk)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/68fe82f5
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/68fe82f5
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/68fe82f5
Branch: refs/heads/trunk
Commit: 68fe82f5d437a54432189bb1c061102375e8b843
Parents: 32b2f73
Author: Andrew Onishuk <ao...@hortonworks.com>
Authored: Tue Apr 5 20:02:37 2016 +0300
Committer: Andrew Onishuk <ao...@hortonworks.com>
Committed: Tue Apr 5 20:02:37 2016 +0300
----------------------------------------------------------------------
.../resources/scripts/export_ams_metrics.py | 367 +++++++++++++++++++
1 file changed, 367 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ambari/blob/68fe82f5/ambari-server/src/main/resources/scripts/export_ams_metrics.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/scripts/export_ams_metrics.py b/ambari-server/src/main/resources/scripts/export_ams_metrics.py
new file mode 100644
index 0000000..9344841
--- /dev/null
+++ b/ambari-server/src/main/resources/scripts/export_ams_metrics.py
@@ -0,0 +1,367 @@
+#!/usr/bin/env python
+'''
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+'''
+
+import optparse
+import sys
+import os
+import logging
+import urllib2
+import json
+import datetime
+import time
+import re
+import copy
+from flask import Flask, Response, jsonify, request, abort
+from flask.ext.cors import CORS
+from flask_restful import Resource, Api, reqparse
+
+
+
+class Params:
+
+ AMS_HOSTNAME = 'localhost'
+ AMS_PORT = '6188'
+ AMS_APP_ID = None
+ AMS_APP_ID_FORMATTED = None
+ HOSTS_FILE = None
+ METRICS_FILE = None
+ OUT_DIR = None
+ PRECISION = 'minutes'
+ START_TIME = None
+ END_TIME = None
+ METRICS = []
+ HOSTS = []
+ METRICS_METADATA = {}
+ FLASK_SERVER_NAME = None
+ METRICS_FOR_HOSTS = {}
+ HOSTS_WITH_COMPONENTS = {}
+
+ @staticmethod
+ def get_collector_uri(metricNames, hostname=None):
+ if hostname:
+ return 'http://{0}:{1}/ws/v1/timeline/metrics?metricNames={2}&hostname={3}&appId={4}&startTime={5}&endTime={6}&precision={7}' \
+ .format(Params.AMS_HOSTNAME, Params.AMS_PORT, metricNames, hostname, Params.AMS_APP_ID,
+ Params.START_TIME, Params.END_TIME, Params.PRECISION)
+ else:
+ return 'http://{0}:{1}/ws/v1/timeline/metrics?metricNames={2}&appId={3}&startTime={4}&endTime={5}&precision={6}' \
+ .format(Params.AMS_HOSTNAME, Params.AMS_PORT, metricNames, Params.AMS_APP_ID, Params.START_TIME,
+ Params.END_TIME, Params.PRECISION)
+
+ @staticmethod
+ def get_collector_metadata_uri():
+ return 'http://{0}:{1}/ws/v1/timeline/metrics/metadata'.format(Params.AMS_HOSTNAME, Params.AMS_PORT)
+ @staticmethod
+ def get_hosts_components_uri():
+ return 'http://{0}:{1}/ws/v1/timeline/metrics/hosts'.format(Params.AMS_HOSTNAME, Params.AMS_PORT)
+
+
+class Utils:
+
+ @staticmethod
+ def setup_logger(verbose, log_file):
+
+ global logger
+ logger = logging.getLogger('AmbariMetricsExport')
+ formatter = logging.Formatter('%(asctime)s %(levelname)s %(message)s')
+ if log_file:
+ filehandler = logging.FileHandler(log_file)
+ consolehandler = logging.StreamHandler()
+ filehandler.setFormatter(formatter)
+ consolehandler.setFormatter(formatter)
+ logger.addHandler(filehandler)
+ logger.addHandler(consolehandler)
+
+ # set verbose
+ if verbose:
+ # logging.basicConfig(level=logging.DEBUG)
+ logger.setLevel(logging.DEBUG)
+ else:
+ # logging.basicConfig(level=logging.INFO)
+ logger.setLevel(logging.INFO)
+
+ @staticmethod
+ def get_data_from_url(collector_uri):
+ req = urllib2.Request(collector_uri)
+ connection = None
+ try:
+ connection = urllib2.urlopen(req)
+ except Exception as e:
+ logger.error('Error on metrics GET request: %s' % collector_uri)
+ logger.error(str(e))
+ # Validate json before dumping
+ response_data = None
+ if connection:
+ try:
+ response_data = json.loads(connection.read())
+ except Exception as e:
+ logger.warn('Error parsing json data returned from URI: %s' % collector_uri)
+ logger.debug(str(e))
+
+ return response_data
+
+ @staticmethod
+ def get_epoch(input):
+ if (len(input) == 13):
+ return int(input)
+ elif (len(input) == 20):
+ return int(time.mktime(datetime.datetime.strptime(input, '%Y-%m-%dT%H:%M:%SZ').timetuple()) * 1000)
+ else:
+ return -1
+
+
+class AmsMetricsProcessor:
+
+ @staticmethod
+ def get_metrics_for_host(metrics, host=None):
+ metrics_result = {}
+ for metric in metrics:
+ uri = Params.get_collector_uri(metric, host)
+ logger.debug('Request URI: %s' % str(uri))
+ metrics_dict = Utils.get_data_from_url(uri)
+ if metrics_dict and "metrics" in metrics_dict and metrics_dict["metrics"]:
+ metrics_result[metric] = metrics_dict
+ else:
+ logger.debug("No found metric {0} on host {1}".format(metric, host))
+
+ return metrics_result
+
+ @staticmethod
+ def get_metrics_metadata():
+ url = Params.get_collector_metadata_uri()
+ all_metrics_metadata = Utils.get_data_from_url(url)
+
+ app_id = Params.AMS_APP_ID
+ if Params.AMS_APP_ID != "HOST":
+ app_id = Params.AMS_APP_ID.lower()
+
+ app_metrics_metadata = []
+ if app_id in all_metrics_metadata:
+ for app_metric in all_metrics_metadata[app_id]:
+ for metric in Params.METRICS:
+ if metric == app_metric["metricname"]:
+ app_metrics_metadata.append(app_metric)
+ logger.debug("Adding {0} to metadata".format(app_metric["metricname"]))
+ return {app_id.lower() : app_metrics_metadata}
+ else:
+ logger.error("AMS App_id={0} not found in AMS metadata".format(app_id))
+ sys.exit(1)
+
+ @staticmethod
+ def get_hosts_with_components():
+ url = Params.get_hosts_components_uri()
+ all_hosts_with_components = Utils.get_data_from_url(url)
+ hosts_with_components = {}
+ for host in Params.HOSTS:
+ if host in all_hosts_with_components:
+ hosts_with_components[host] = all_hosts_with_components[host]
+
+ return hosts_with_components
+
+ @staticmethod
+ def export_ams_metrics():
+
+ if not os.path.exists(Params.METRICS_FILE):
+ logger.error('Metrics file is required.')
+ sys.exit(1)
+ logger.info('Reading metrics file.')
+ with open(Params.METRICS_FILE, 'r') as file:
+ for line in file:
+ Params.METRICS.append(line.strip())
+ logger.info('Reading hosts file.')
+
+ if Params.HOSTS_FILE and os.path.exists(Params.HOSTS_FILE):
+ with open(Params.HOSTS_FILE, 'r') as file:
+ for line in file:
+ Params.HOSTS.append(line.strip())
+ else:
+ logger.info('No hosts file found, aggregate metrics will be exported.')
+ hosts_metrics = {}
+ if Params.HOSTS:
+ for host in Params.HOSTS:
+ hosts_metrics[host] = AmsMetricsProcessor.get_metrics_for_host(Params.METRICS, host)
+ return hosts_metrics
+ else:
+ return AmsMetricsProcessor.get_metrics_for_host(Params.METRICS, None)
+
+ def process(self):
+ self.metrics_for_hosts = self.export_ams_metrics()
+ self.metrics_metadata = self.get_metrics_metadata()
+ self.hosts_with_components = self.get_hosts_with_components()
+
+
+class FlaskServer():
+ def __init__ (self, ams_metrics_processor):
+ self.ams_metrics_processor = ams_metrics_processor
+ app = Flask(__name__)
+ api = Api(app)
+ cors = CORS(app)
+
+ api.add_resource(HostsResource, '/ws/v1/timeline/metrics/hosts', resource_class_kwargs={'ams_metrics_processor': self.ams_metrics_processor})
+ api.add_resource(MetadataResource, '/ws/v1/timeline/metrics/metadata', resource_class_kwargs={'ams_metrics_processor': self.ams_metrics_processor})
+ api.add_resource(MetricsResource, '/ws/v1/timeline/metrics', resource_class_kwargs={'ams_metrics_processor': self.ams_metrics_processor})
+
+ logger.info("Start Flask server. Server URL = " + Params.FLASK_SERVER_NAME + ":5000")
+
+ app.run(debug=True,
+ host=Params.FLASK_SERVER_NAME,
+ port=5000)
+
+class MetadataResource(Resource):
+ def __init__ (self, ams_metrics_processor):
+ self.ams_metrics_processor = ams_metrics_processor
+
+ def get(self):
+ if self.ams_metrics_processor.metrics_metadata:
+ return jsonify(self.ams_metrics_processor.metrics_metadata)
+ else:
+ abort(404)
+
+class HostsResource(Resource):
+ def __init__ (self, ams_metrics_processor):
+ self.ams_metrics_processor = ams_metrics_processor
+
+ def get(self):
+ if self.ams_metrics_processor.hosts_with_components:
+ return jsonify(self.ams_metrics_processor.hosts_with_components)
+ else:
+ abort(404)
+
+class MetricsResource(Resource):
+ def __init__ (self, ams_metrics_processor):
+ self.ams_metrics_processor = ams_metrics_processor
+
+ def get(self):
+ args = request.args
+ separator = "._"
+ if not "metricNames" in args:
+ logger.error("Bad request")
+ abort(404)
+ metric_name, operation = args["metricNames"].split(separator, 1)
+ metric_dict = {"metrics" : []}
+
+ if "hostname" in args and args["hostname"] != "":
+ host_names = args["hostname"].split(",")
+ metric_dict = {"metrics" : []}
+ for host_name in host_names:
+ if metric_name in self.ams_metrics_processor.metrics_for_hosts[host_name]:
+ metric_dict["metrics"].append(self.ams_metrics_processor.metrics_for_hosts[host_name][metric_name]["metrics"][0])
+ else:
+ continue
+ else:
+ for host in self.ams_metrics_processor.metrics_for_hosts:
+ for metric in self.ams_metrics_processor.metrics_for_hosts[host]:
+ if metric_name == metric:
+ metric_dict = self.ams_metrics_processor.metrics_for_hosts[host][metric_name]
+ break
+
+ if metric_dict:
+ metrics_json_new = copy.copy(metric_dict)
+ for i in range (0, len(metrics_json_new["metrics"])):
+ metrics_json_new["metrics"][i]["metricname"] += separator + operation
+ return jsonify(metrics_json_new)
+ else :
+ abort(404)
+ return
+
+#
+# Main.
+#
+def main():
+ parser = optparse.OptionParser(usage="usage: %prog [options]")
+ parser.set_description('This python program is a Ambari thin client and '
+ 'supports export of ambari metric data for an app '
+ 'from Ambari Metrics Service to a output dir. '
+ 'The metrics will be exported to a file with name of '
+ 'the metric and in a directory with the name as the '
+ 'hostname under the output dir.')
+
+ d = datetime.datetime.now()
+ time_suffix = '{0}-{1}-{2}-{3}-{4}-{5}'.format(d.year, d.month, d.day,
+ d.hour, d.minute, d.second)
+ print 'Time: %s' % time_suffix
+ logfile = os.path.join('/tmp', 'ambari_metrics_export.out')
+
+ parser.add_option("-v", "--verbose", dest="verbose", action="store_false",
+ default=False, help="output verbosity.")
+ parser.add_option("-s", "--host", dest="server_hostname",
+ help="AMS host name.")
+ parser.add_option("-p", "--port", dest="server_port",
+ default="6188", help="AMS port. [default: 6188]")
+ parser.add_option("-a", "--app-id", dest="app_id",
+ help="AMS app id.")
+ parser.add_option("-m", "--metrics-file", dest="metrics_file",
+ help="Metrics file with metric names to query. New line separated.")
+ parser.add_option("-f", "--host-file", dest="hosts_file",
+ help="Host file with hostnames to query. New line separated.")
+ parser.add_option("-l", "--logfile", dest="log_file", default=logfile,
+ metavar='FILE', help="Log file. [default: %s]" % logfile)
+ parser.add_option("-r", "--precision", dest="precision",
+ default='minutes', help="AMS API precision, default = minutes.")
+ parser.add_option("-b", "--start_time", dest="start_time",
+ help="Start time in milliseconds since epoch or UTC timestamp in YYYY-MM-DDTHH:mm:ssZ format.")
+ parser.add_option("-e", "--end_time", dest="end_time",
+ help="End time in milliseconds since epoch or UTC timestamp in YYYY-MM-DDTHH:mm:ssZ format.")
+ parser.add_option("-n", "--flask-server_name", dest="server_name",
+ help="Flask server name, default = 127.0.0.1", default="127.0.0.1")
+
+ (options, args) = parser.parse_args()
+
+ Params.AMS_HOSTNAME = options.server_hostname
+
+ Params.AMS_PORT = options.server_port
+
+ Params.AMS_APP_ID = options.app_id
+
+ Params.METRICS_FILE = options.metrics_file
+
+ Params.HOSTS_FILE = options.hosts_file
+
+ Params.PRECISION = options.precision
+
+ Params.FLASK_SERVER_NAME = options.server_name
+
+ Utils.setup_logger(options.verbose, options.log_file)
+
+ Params.START_TIME = Utils.get_epoch(options.start_time)
+
+ if Params.START_TIME == -1:
+ logger.warn('No start time provided, or it is in the wrong format. Please '
+ 'provide milliseconds since epoch or a value in YYYY-MM-DDTHH:mm:ssZ format')
+ logger.info('Aborting...')
+ sys.exit(1)
+
+ Params.END_TIME = Utils.get_epoch(options.end_time)
+
+ if Params.END_TIME == -1:
+ logger.warn('No end time provided, or it is in the wrong format. Please '
+ 'provide milliseconds since epoch or a value in YYYY-MM-DDTHH:mm:ssZ format')
+ logger.info('Aborting...')
+ sys.exit(1)
+
+ ams_metrics_processor = AmsMetricsProcessor()
+ ams_metrics_processor.process()
+ FlaskServer(ams_metrics_processor)
+
+
+if __name__ == "__main__":
+ try:
+ main()
+ except (KeyboardInterrupt, EOFError):
+ print("\nAborting ... Keyboard Interrupt.")
+ sys.exit(1)