You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@heron.apache.org by ob...@apache.org on 2020/08/15 02:29:09 UTC
[incubator-heron] 01/02: Clean heron-ui backend
This is an automated email from the ASF dual-hosted git repository.
obristow pushed a commit to branch obristow/clean-ui-backend
in repository https://gitbox.apache.org/repos/asf/incubator-heron.git
commit 067c259c7d902a6b5e7827b6d23e055ea9582754
Author: Oliver Bristow <ev...@gmail.com>
AuthorDate: Fri Jul 31 00:15:06 2020 +0100
Clean heron-ui backend
* replace tornado server with FastAPI+Jinja2+Uvicorn
* factor out tornado from heron.tools.common
* add type annotations
* expose port 8889 of Vagrant VM for heron-ui
* pin to latest 4.x version of Tornado
* visual changes to container pages
This should only be an internal refactor.
Async querying of the tracker was dropped, but could be reintroduced
using [aiohttp](https://docs.aiohttp.org/en/stable/) if speed becomes
an issue for requests that aggregate tracker data.
---
heron/common/src/python/utils/__init__.py | 3 +-
heron/instance/src/python/utils/__init__.py | 3 +-
heron/shell/src/python/BUILD | 2 +-
heron/tools/cli/src/python/activate.py | 25 +-
heron/tools/common/src/python/BUILD | 4 +-
heron/tools/common/src/python/access/fetch.py | 79 --
heron/tools/common/src/python/access/heron_api.py | 912 --------------------
heron/tools/common/src/python/access/query.py | 62 --
.../common/src/python/access/tracker_access.py | 134 ---
.../src/python/{access => clients}/__init__.py | 7 +-
heron/tools/common/src/python/clients/tracker.py | 922 +++++++++++++++++++++
heron/tools/explorer/src/python/BUILD | 3 +-
heron/tools/explorer/src/python/logicalplan.py | 13 +-
heron/tools/explorer/src/python/main.py | 31 +-
heron/tools/explorer/src/python/physicalplan.py | 60 +-
heron/tools/explorer/src/python/topologies.py | 14 +-
.../explorer/tests/python/explorer_unittest.py | 4 +-
heron/tools/tracker/src/python/BUILD | 2 +-
.../ui/resources/static/js/stat-trendlines.js | 2 +-
heron/tools/ui/resources/static/js/topologies.js | 32 +-
.../tools/ui/resources/templates/application.html | 34 +-
heron/tools/ui/resources/templates/browse.html | 47 +-
heron/tools/ui/resources/templates/config.html | 6 +-
heron/tools/ui/resources/templates/error.html | 8 +-
heron/tools/ui/resources/templates/exception.html | 6 +-
heron/tools/ui/resources/templates/file.html | 8 +-
heron/tools/ui/resources/templates/nav.html | 8 +-
heron/tools/ui/resources/templates/shell.snip.html | 8 +
heron/tools/ui/resources/templates/topologies.html | 8 +-
heron/tools/ui/resources/templates/topology.html | 28 +-
heron/tools/ui/src/python/BUILD | 6 +-
heron/tools/ui/src/python/args.py | 134 ---
heron/tools/ui/src/python/consts.py | 35 -
heron/tools/ui/src/python/handlers/__init__.py | 36 -
heron/tools/ui/src/python/handlers/api/__init__.py | 36 -
heron/tools/ui/src/python/handlers/api/metrics.py | 99 ---
heron/tools/ui/src/python/handlers/api/topology.py | 358 --------
heron/tools/ui/src/python/handlers/base.py | 45 -
.../ui/src/python/handlers/common/__init__.py | 21 -
.../tools/ui/src/python/handlers/common/consts.py | 22 -
heron/tools/ui/src/python/handlers/common/utils.py | 33 -
heron/tools/ui/src/python/handlers/mainhandler.py | 32 -
heron/tools/ui/src/python/handlers/notfound.py | 36 -
heron/tools/ui/src/python/handlers/ranges.py | 55 --
heron/tools/ui/src/python/handlers/topology.py | 316 -------
heron/tools/ui/src/python/main.py | 779 +++++++++++++----
integration_test/src/python/http_server/BUILD | 2 +-
vagrant/Vagrantfile | 2 +
48 files changed, 1731 insertions(+), 2791 deletions(-)
diff --git a/heron/common/src/python/utils/__init__.py b/heron/common/src/python/utils/__init__.py
index cf16dbf..1e7010c 100644
--- a/heron/common/src/python/utils/__init__.py
+++ b/heron/common/src/python/utils/__init__.py
@@ -15,5 +15,4 @@
# specific language governing permissions and limitations
# under the License.
'''common utility modules'''
-__all__ = ['metrics', 'misc', 'topology', 'config', 'tracker_access',
- 'tuple', 'proc', 'log']
+__all__ = ['proc', 'log']
diff --git a/heron/instance/src/python/utils/__init__.py b/heron/instance/src/python/utils/__init__.py
index 223a4dd..e297bac 100644
--- a/heron/instance/src/python/utils/__init__.py
+++ b/heron/instance/src/python/utils/__init__.py
@@ -15,5 +15,4 @@
# specific language governing permissions and limitations
# under the License.
'''common utility modules'''
-__all__ = ['metrics', 'misc', 'topology', 'config', 'tracker_access',
- 'system_constants', 'system_config', 'tuple', 'proc', 'log']
+__all__ = ['system_constants', 'system_config', 'tuple']
diff --git a/heron/shell/src/python/BUILD b/heron/shell/src/python/BUILD
index 0ff887c..52cf78d 100644
--- a/heron/shell/src/python/BUILD
+++ b/heron/shell/src/python/BUILD
@@ -7,7 +7,7 @@ pex_library(
),
reqs = [
"requests==2.12.3",
- "tornado==4.0.2",
+ "tornado==4.5.3",
],
deps = [
"//heron/common/src/python:common-py",
diff --git a/heron/tools/cli/src/python/activate.py b/heron/tools/cli/src/python/activate.py
index 3567484..47a848c 100644
--- a/heron/tools/cli/src/python/activate.py
+++ b/heron/tools/cli/src/python/activate.py
@@ -20,7 +20,11 @@
''' activate '''
from heron.common.src.python.utils.log import Log
-import heron.tools.cli.src.python.cli_helper as cli_helper
+from heron.tools.cli.src.python import cli_helper
+from heron.tools.common.src.python.clients import tracker
+
+import requests
+
def create_parser(subparsers):
'''
@@ -32,12 +36,13 @@ def create_parser(subparsers):
# pylint: disable=unused-argument
def run(command, parser, cl_args, unknown_args):
- '''
- :param command:
- :param parser:
- :param cl_args:
- :param unknown_args:
- :return:
- '''
- Log.debug("Activate Args: %s", cl_args)
- return cli_helper.run(command, cl_args, "activate topology")
+ """ run command """
+ try:
+ clusters = tracker.get_clusters()
+ except requests.ConnectTimeout as e:
+ Log.error(f"Fail to connect to tracker: {e}")
+ return False
+ print('Available clusters:')
+ for cluster in clusters:
+ print(' %s' % cluster)
+ return True
diff --git a/heron/tools/common/src/python/BUILD b/heron/tools/common/src/python/BUILD
index 848e59c..8ce984a 100644
--- a/heron/tools/common/src/python/BUILD
+++ b/heron/tools/common/src/python/BUILD
@@ -2,14 +2,14 @@ package(default_visibility = ["//visibility:public"])
pex_library(
name = "tracker-py",
- srcs = glob(["access/*.py"]),
+ srcs = glob(["clients/*.py"]),
)
pex_library(
name = "common-py",
srcs = glob(
["**/*.py"],
- exclude = ["access"],
+ exclude = ["clients"],
exclude_directories = 1,
),
reqs = ["PyYAML==3.13"],
diff --git a/heron/tools/common/src/python/access/fetch.py b/heron/tools/common/src/python/access/fetch.py
deleted file mode 100644
index bc8b8a3..0000000
--- a/heron/tools/common/src/python/access/fetch.py
+++ /dev/null
@@ -1,79 +0,0 @@
-#!/usr/bin/env python3
-# -*- encoding: utf-8 -*-
-
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-''' fetch.py '''
-import json
-import time
-
-import tornado.httpclient
-import tornado.gen
-
-from heron.common.src.python.utils import log
-
-Log = log.Log
-
-################################################################################
-
-################################################################################
-@tornado.gen.coroutine
-def fetch_url_as_json(fetch_url, default_value=None):
- '''
- Fetch the given url and convert the response to json.
- :param fetch_url: URL to fetch
- :param default_value: value to return in case of failure
- :return:
- '''
- # assign empty dict for optional param
- if default_value is None:
- default_value = dict()
-
- Log.debug("fetching url %s", fetch_url)
- ret = default_value
-
- # time the duration of the fetch
- start = time.time()
-
- # fetch the URL asynchronously
- http_response = yield tornado.httpclient.AsyncHTTPClient().fetch(fetch_url)
-
- # handle http errors, and return if any
- if http_response.error:
- Log.error("Unable to get response from %s. Error %s", fetch_url, http_response.error)
- raise tornado.gen.Return(ret)
-
- # load response and handle return errors, if any
- response = json.loads(http_response.body)
- if not 'result' in response:
- Log.error("Empty response from %s", fetch_url)
- raise tornado.gen.Return(ret)
-
- # get the response and execution time on server side
- ret = response['result']
- execution = 1000 * response['executiontime']
-
- # calculate the time
- end = time.time()
- duration = 1000 * (end - start)
-
- Log.debug("TIME: url fetch took %.2f ms server time %s", execution, fetch_url)
- Log.debug("TIME: url fetch took %.2f ms round trip %s", duration, fetch_url)
-
- # convert future to value
- raise tornado.gen.Return(ret)
diff --git a/heron/tools/common/src/python/access/heron_api.py b/heron/tools/common/src/python/access/heron_api.py
deleted file mode 100644
index 6e77e51..0000000
--- a/heron/tools/common/src/python/access/heron_api.py
+++ /dev/null
@@ -1,912 +0,0 @@
-#!/usr/bin/env python3
-# -*- encoding: utf-8 -*-
-
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-''' heron.py '''
-
-import logging
-
-import tornado.httpclient
-import tornado.gen
-from tornado.options import options
-from .fetch import fetch_url_as_json
-from .query import QueryHandler
-
-# pylint: disable=bad-whitespace
-CLUSTER_URL_FMT = "%s/clusters"
-TOPOLOGIES_URL_FMT = "%s/topologies"
-TOPOLOGIES_STATS_URL_FMT = "%s/states" % TOPOLOGIES_URL_FMT
-EXECUTION_STATE_URL_FMT = "%s/executionstate" % TOPOLOGIES_URL_FMT
-LOGICALPLAN_URL_FMT = "%s/logicalplan" % TOPOLOGIES_URL_FMT
-PHYSICALPLAN_URL_FMT = "%s/physicalplan" % TOPOLOGIES_URL_FMT
-PACKINGPLAN_URL_FMT = "%s/packingplan" % TOPOLOGIES_URL_FMT
-SCHEDULER_LOCATION_URL_FMT = "%s/schedulerlocation" % TOPOLOGIES_URL_FMT
-
-METRICS_URL_FMT = "%s/metrics" % TOPOLOGIES_URL_FMT
-METRICS_QUERY_URL_FMT = "%s/metricsquery" % TOPOLOGIES_URL_FMT
-METRICS_TIMELINE_URL_FMT = "%s/metricstimeline" % TOPOLOGIES_URL_FMT
-
-EXCEPTIONS_URL_FMT = "%s/exceptions" % TOPOLOGIES_URL_FMT
-EXCEPTION_SUMMARY_URL_FMT = "%s/exceptionsummary" % TOPOLOGIES_URL_FMT
-
-INFO_URL_FMT = "%s/info" % TOPOLOGIES_URL_FMT
-PID_URL_FMT = "%s/pid" % TOPOLOGIES_URL_FMT
-JSTACK_URL_FMT = "%s/jstack" % TOPOLOGIES_URL_FMT
-JMAP_URL_FMT = "%s/jmap" % TOPOLOGIES_URL_FMT
-HISTOGRAM_URL_FMT = "%s/histo" % TOPOLOGIES_URL_FMT
-
-FILE_DATA_URL_FMT = "%s/containerfiledata" % TOPOLOGIES_URL_FMT
-FILE_DOWNLOAD_URL_FMT = "%s/containerfiledownload" % TOPOLOGIES_URL_FMT
-FILESTATS_URL_FMT = "%s/containerfilestats" % TOPOLOGIES_URL_FMT
-
-capacity = "DIVIDE(" \
- " DEFAULT(0," \
- " MULTIPLY(" \
- " TS({0},{1},__execute-count/default)," \
- " TS({0},{1},__execute-latency/default)" \
- " )" \
- " )," \
- " 60000000000" \
- ")"
-
-failures = "DEFAULT(0," \
- " DIVIDE(" \
- " TS({0},{1},__fail-count/default)," \
- " SUM(" \
- " DEFAULT(1, TS({0},{1},__execute-count/default))," \
- " DEFAULT(0, TS({0},{1},__fail-count/default))" \
- " )" \
- " )" \
- ")"
-
-cpu = "DEFAULT(0, TS({0},{1},__jvm-process-cpu-load))"
-
-memory = "DIVIDE(" \
- " DEFAULT(0, TS({0},{1},__jvm-memory-used-mb))," \
- " DEFAULT(1, TS({0},{1},__jvm-memory-mb-total))" \
- ")"
-
-gc = "RATE(TS({0},{1},__jvm-gc-collection-time-ms))"
-
-backpressure = "DEFAULT(0, TS(__stmgr__,*,__time_spent_back_pressure_by_compid/{0}))"
-
-queries = dict(
- cpu=cpu,
- capacity=capacity,
- failures=failures,
- memory=memory,
- gc=gc,
- backpressure=backpressure
-)
-
-
-def get_tracker_endpoint():
- '''
- Get the endpoint for heron tracker
- :return:
- '''
- return options.tracker_url
-
-
-def create_url(fmt):
- '''
- Given an URL format, substitute with tracker service endpoint
- :param fmt:
- :return:
- '''
- return fmt % get_tracker_endpoint()
-
-
-@tornado.gen.coroutine
-def get_clusters():
- '''
- :return:
- '''
- request_url = create_url(CLUSTER_URL_FMT)
- raise tornado.gen.Return((yield fetch_url_as_json(request_url)))
-
-
-################################################################################
-@tornado.gen.coroutine
-def get_topologies():
- '''
- Get the list of topologies given a data center from heron tracker
- :return:
- '''
- request_url = create_url(TOPOLOGIES_URL_FMT)
- raise tornado.gen.Return((yield fetch_url_as_json(request_url)))
-
-
-################################################################################
-@tornado.gen.coroutine
-def get_topologies_states():
- '''
- Get the list of topologies and their states
- :return:
- '''
- request_url = create_url(TOPOLOGIES_STATS_URL_FMT)
- raise tornado.gen.Return((yield fetch_url_as_json(request_url)))
-
-
-@tornado.gen.coroutine
-def _get_topologies(cluster, role=None, env=None):
- endpoint = create_url(TOPOLOGIES_URL_FMT)
- params = dict(cluster=cluster)
- if role is not None:
- params['role'] = role
- if env is not None:
- params['environ'] = env
- request_url = tornado.httputil.url_concat(endpoint, params)
- raise tornado.gen.Return((yield fetch_url_as_json(request_url)))
-
-
-################################################################################
-def get_cluster_topologies(cluster):
- '''
- Get the list of topologies given a cluster
- :param cluster:
- :return:
- '''
- return _get_topologies(cluster)
-
-
-################################################################################
-def get_cluster_role_topologies(cluster, role):
- '''
- Get the list of topologies given a cluster submitted by a given role
- :param cluster:
- :param role:
- :return:
- '''
- return _get_topologies(cluster, role=role)
-
-
-################################################################################
-def get_cluster_role_env_topologies(cluster, role, env):
- '''
- Get the list of topologies given a cluster submitted by a given role under a given environment
- :param cluster:
- :param role:
- :param env:
- :return:
- '''
- return _get_topologies(cluster, role=role, env=env)
-
-
-################################################################################
-@tornado.gen.coroutine
-def get_execution_state(cluster, environ, topology, role=None):
- '''
- Get the execution state of a topology in a cluster
- :param cluster:
- :param environ:
- :param topology:
- :param role:
- :return:
- '''
- params = dict(cluster=cluster, environ=environ, topology=topology)
- if role is not None:
- params['role'] = role
- request_url = tornado.httputil.url_concat(create_url(EXECUTION_STATE_URL_FMT), params)
- raise tornado.gen.Return((yield fetch_url_as_json(request_url)))
-
-
-################################################################################
-@tornado.gen.coroutine
-def get_logical_plan(cluster, environ, topology, role=None):
- '''
- Get the logical plan state of a topology in a cluster
- :param cluster:
- :param environ:
- :param topology:
- :param role:
- :return:
- '''
- params = dict(cluster=cluster, environ=environ, topology=topology)
- if role is not None:
- params['role'] = role
- request_url = tornado.httputil.url_concat(
- create_url(LOGICALPLAN_URL_FMT), params)
- raise tornado.gen.Return((yield fetch_url_as_json(request_url)))
-
-
-################################################################################
-@tornado.gen.coroutine
-def get_comps(cluster, environ, topology, role=None):
- '''
- Get the list of component names for the topology from Heron Nest
- :param cluster:
- :param environ:
- :param topology:
- :param role:
- :return:
- '''
- params = dict(cluster=cluster, environ=environ, topology=topology)
- if role is not None:
- params['role'] = role
- request_url = tornado.httputil.url_concat(
- create_url(LOGICALPLAN_URL_FMT), params)
- lplan = yield fetch_url_as_json(request_url)
- comps = list(lplan['spouts'].keys()) + list(lplan['bolts'].keys())
- raise tornado.gen.Return(comps)
-
-################################################################################
-@tornado.gen.coroutine
-def get_instances(cluster, environ, topology, role=None):
- '''
- Get the list of instances for the topology from Heron Nest
- :param cluster:
- :param environ:
- :param topology:
- :param role:
- :return:
- '''
- params = dict(cluster=cluster, environ=environ, topology=topology)
- if role is not None:
- params['role'] = role
- request_url = tornado.httputil.url_concat(
- create_url(PHYSICALPLAN_URL_FMT), params)
- pplan = yield fetch_url_as_json(request_url)
- instances = list(pplan['instances'].keys())
- raise tornado.gen.Return(instances)
-
-
-################################################################################
-@tornado.gen.coroutine
-def get_packing_plan(cluster, environ, topology, role=None):
- '''
- Get the packing plan state of a topology in a cluster from tracker
- :param cluster:
- :param environ:
- :param topology:
- :param role:
- :return:
- '''
- params = dict(cluster=cluster, environ=environ, topology=topology)
- if role is not None:
- params['role'] = role
- request_url = tornado.httputil.url_concat(
- create_url(PACKINGPLAN_URL_FMT), params)
- raise tornado.gen.Return((yield fetch_url_as_json(request_url)))
-
-
-################################################################################
-@tornado.gen.coroutine
-def get_physical_plan(cluster, environ, topology, role=None):
- '''
- Get the physical plan state of a topology in a cluster from tracker
- :param cluster:
- :param environ:
- :param topology:
- :param role:
- :return:
- '''
- params = dict(cluster=cluster, environ=environ, topology=topology)
- if role is not None:
- params['role'] = role
- request_url = tornado.httputil.url_concat(
- create_url(PHYSICALPLAN_URL_FMT), params)
- raise tornado.gen.Return((yield fetch_url_as_json(request_url)))
-
-
-################################################################################
-@tornado.gen.coroutine
-def get_scheduler_location(cluster, environ, topology, role=None):
- '''
- Get the scheduler location of a topology in a cluster from tracker
- :param cluster:
- :param environ:
- :param topology:
- :param role:
- :return:
- '''
- params = dict(cluster=cluster, environ=environ, topology=topology)
- if role is not None:
- params['role'] = role
- request_url = tornado.httputil.url_concat(
- create_url(SCHEDULER_LOCATION_URL_FMT), params)
- raise tornado.gen.Return((yield fetch_url_as_json(request_url)))
-
-
-################################################################################
-@tornado.gen.coroutine
-def get_component_exceptionsummary(cluster, environ, topology, component, role=None):
- '''
- Get summary of exception for a component
- :param cluster:
- :param environ:
- :param topology:
- :param component:
- :param role:
- :return:
- '''
- params = dict(
- cluster=cluster,
- environ=environ,
- topology=topology,
- component=component)
- if role is not None:
- params['role'] = role
- request_url = tornado.httputil.url_concat(
- create_url(EXCEPTION_SUMMARY_URL_FMT), params)
- raise tornado.gen.Return((yield fetch_url_as_json(request_url)))
-
-
-################################################################################
-@tornado.gen.coroutine
-def get_component_exceptions(cluster, environ, topology, component, role=None):
- '''
- Get exceptions for 'component' for 'topology'
- :param cluster:
- :param environ:
- :param topology:
- :param component:
- :param role:
- :return:
- '''
- params = dict(
- cluster=cluster,
- environ=environ,
- topology=topology,
- component=component)
- if role is not None:
- params['role'] = role
- request_url = tornado.httputil.url_concat(
- create_url(EXCEPTIONS_URL_FMT), params)
- raise tornado.gen.Return((yield fetch_url_as_json(request_url)))
-
-
-################################################################################
-@tornado.gen.coroutine
-def get_comp_instance_metrics(cluster, environ, topology, component,
- metrics, instances, time_range, role=None):
- '''
- Get the metrics for some instances of a topology from tracker
- :param cluster:
- :param environ:
- :param topology:
- :param component:
- :param metrics: dict of display name to cuckoo name
- :param instances:
- :param time_range: 2-tuple consisting of start and end of range
- :param role:
- :return:
- '''
- params = dict(
- cluster=cluster,
- environ=environ,
- topology=topology,
- component=component)
- if role is not None:
- params['role'] = role
-
- # form the fetch url
- request_url = tornado.httputil.url_concat(
- create_url(METRICS_URL_FMT), params)
-
- # convert a single instance to a list, if needed
- all_instances = instances if isinstance(instances, list) else [instances]
-
- # append each metric to the url
- for _, metric_name in list(metrics.items()):
- request_url = tornado.httputil.url_concat(request_url, dict(metricname=metric_name[0]))
-
- # append each instance to the url
- for i in all_instances:
- request_url = tornado.httputil.url_concat(request_url, dict(instance=i))
-
- # append the time interval to the url
- request_url = tornado.httputil.url_concat(request_url, dict(interval=time_range[1]))
-
- raise tornado.gen.Return((yield fetch_url_as_json(request_url)))
-
-
-################################################################################
-@tornado.gen.coroutine
-def get_comp_metrics(cluster, environ, topology, component,
- instances, metricnames, time_range, role=None):
- '''
- Get the metrics for all the instances of a topology from Heron Nest
- :param cluster:
- :param environ:
- :param topology:
- :param component:
- :param instances:
- :param metricnames: dict of display name to cuckoo name
- :param time_range: 2-tuple consisting of start and end of range
- :param role:
- :return:
- '''
- params = dict(
- cluster=cluster,
- environ=environ,
- topology=topology,
- component=component)
- if role is not None:
- params['role'] = role
-
- # form the url
- request_url = tornado.httputil.url_concat(
- create_url(METRICS_URL_FMT), params)
-
- # append each metric to the url
- for metric_name in metricnames:
- request_url = tornado.httputil.url_concat(request_url, dict(metricname=metric_name))
-
- # append each instance to the url
- for instance in instances:
- request_url = tornado.httputil.url_concat(request_url, dict(instance=instance))
-
- # append the time interval to the url
- request_url = tornado.httputil.url_concat(request_url, dict(interval=time_range[1]))
-
- raise tornado.gen.Return((yield fetch_url_as_json(request_url)))
-
-
-################################################################################
-@tornado.gen.coroutine
-def get_metrics(cluster, environment, topology, timerange, query, role=None):
- '''
- Get the metrics for a topology from tracker
- :param cluster:
- :param environment:
- :param topology:
- :param timerange:
- :param query:
- :param role:
- :return:
- '''
- params = dict(
- cluster=cluster,
- environ=environment,
- topology=topology,
- starttime=timerange[0],
- endtime=timerange[1],
- query=query)
-
- if role is not None:
- params['role'] = role
-
- request_url = tornado.httputil.url_concat(
- create_url(METRICS_QUERY_URL_FMT), params
- )
-
- logging.info("get_metrics %s", request_url)
- raise tornado.gen.Return((yield fetch_url_as_json(request_url)))
-
-
-################################################################################
-@tornado.gen.coroutine
-def get_comp_metrics_timeline(cluster, environ, topology, component,
- instances, metricnames, time_range, role=None):
- '''
- Get the minute-by-minute metrics for all instances of a topology from tracker
- :param cluster:
- :param environ:
- :param topology:
- :param component:
- :param instances:
- :param metricnames: dict of display name to cuckoo name
- :param time_range: 2-tuple consisting of start and end of range
- :param role:
- :return:
- '''
- params = dict(
- cluster=cluster,
- environ=environ,
- topology=topology,
- component=component)
-
- if role is not None:
- params['role'] = role
-
- # form the url
- request_url = tornado.httputil.url_concat(create_url(METRICS_TIMELINE_URL_FMT), params)
-
- if role is not None:
- request_url = tornado.httputil.url_concat(request_url, dict(role=role))
-
- # append each metric to the url
- for metric_name in metricnames:
- request_url = tornado.httputil.url_concat(request_url, dict(metricname=metric_name))
-
- # append each instance to the url
- for instance in instances:
- request_url = tornado.httputil.url_concat(request_url, dict(instance=instance))
-
- # append the time interval to the url
- request_url = tornado.httputil.url_concat(
- request_url, dict(starttime=time_range[0], endtime=time_range[1]))
-
- raise tornado.gen.Return((yield fetch_url_as_json(request_url)))
-
-
-@tornado.gen.coroutine
-def get_topology_info(cluster, environ, topology, role=None):
- '''
- :param cluster:
- :param environ:
- :param topology:
- :param role:
- :return:
- '''
- params = dict(
- cluster=cluster,
- environ=environ,
- topology=topology)
-
- if role is not None:
- params['role'] = role
-
- request_url = tornado.httputil.url_concat(create_url(INFO_URL_FMT), params)
-
- raise tornado.gen.Return((yield fetch_url_as_json(request_url)))
-
-
-# Get pid of the instance
-@tornado.gen.coroutine
-def get_instance_pid(cluster, environ, topology, instance, role=None):
- '''
- :param cluster:
- :param environ:
- :param topology:
- :param instance:
- :param role:
- :return:
- '''
- params = dict(
- cluster=cluster,
- environ=environ,
- topology=topology,
- instance=instance)
-
- if role is not None:
- params['role'] = role
-
- request_url = tornado.httputil.url_concat(create_url(PID_URL_FMT), params)
-
- raise tornado.gen.Return((yield fetch_url_as_json(request_url)))
-
-
-# Get jstack of instance
-@tornado.gen.coroutine
-def get_instance_jstack(cluster, environ, topology, instance, role=None):
- '''
- :param cluster:
- :param environ:
- :param topology:
- :param instance:
- :param role:
- :return:
- '''
- params = dict(
- cluster=cluster,
- environ=environ,
- topology=topology,
- instance=instance)
-
- if role is not None:
- params['role'] = role
-
- request_url = tornado.httputil.url_concat(
- create_url(JSTACK_URL_FMT), params)
-
- raise tornado.gen.Return((yield fetch_url_as_json(request_url)))
-
-
-# Get histogram of active memory objects.
-@tornado.gen.coroutine
-def get_instance_mem_histogram(cluster, environ, topology, instance, role=None):
- '''
- :param cluster:
- :param environ:
- :param topology:
- :param instance:
- :param role:
- :return:
- '''
- params = dict(
- cluster=cluster,
- environ=environ,
- topology=topology,
- instance=instance)
-
- if role is not None:
- params['role'] = role
-
- request_url = tornado.httputil.url_concat(
- create_url(HISTOGRAM_URL_FMT), params)
-
- raise tornado.gen.Return((yield fetch_url_as_json(request_url)))
-
-
-# Call heap dump for an instance and save it at /tmp/heap.bin
-@tornado.gen.coroutine
-def run_instance_jmap(cluster, environ, topology, instance, role=None):
- '''
- :param cluster:
- :param environ:
- :param topology:
- :param instance:
- :param role:
- :return:
- '''
- params = dict(
- cluster=cluster,
- environ=environ,
- topology=topology,
- instance=instance)
-
- if role is not None:
- params['role'] = role
-
- request_url = tornado.httputil.url_concat(
- create_url(JMAP_URL_FMT), params)
-
- if role is not None:
- request_url = tornado.httputil.url_concat(request_url, dict(role=role))
-
- raise tornado.gen.Return((yield fetch_url_as_json(request_url)))
-
-# Get a url to download a file from the container
-def get_container_file_download_url(cluster, environ, topology, container,
- path, role=None):
- '''
- :param cluster:
- :param environ:
- :param topology:
- :param container:
- :param path:
- :param role:
- :return:
- '''
- params = dict(
- cluster=cluster,
- environ=environ,
- topology=topology,
- container=container,
- path=path)
- if role is not None:
- params['role'] = role
-
- request_url = tornado.httputil.url_concat(
- create_url(FILE_DOWNLOAD_URL_FMT), params)
-
- if role is not None:
- request_url = tornado.httputil.url_concat(request_url, dict(role=role))
- return request_url
-
-# Get file data from the container
-@tornado.gen.coroutine
-def get_container_file_data(cluster, environ, topology, container,
- path, offset, length, role=None):
- '''
- :param cluster:
- :param environ:
- :param topology:
- :param container:
- :param path:
- :param offset:
- :param length:
- :param role:
- :return:
- '''
- params = dict(
- cluster=cluster,
- environ=environ,
- topology=topology,
- container=container,
- path=path,
- offset=offset,
- length=length)
-
- if role is not None:
- params['role'] = role
-
- request_url = tornado.httputil.url_concat(
- create_url(FILE_DATA_URL_FMT), params)
-
- if role is not None:
- request_url = tornado.httputil.url_concat(request_url, dict(role=role))
-
- raise tornado.gen.Return((yield fetch_url_as_json(request_url)))
-
-
-# Get filestats
-@tornado.gen.coroutine
-def get_filestats(cluster, environ, topology, container, path, role=None):
- '''
- :param cluster:
- :param environ:
- :param topology:
- :param container:
- :param path:
- :param role:
- :return:
- '''
- params = dict(
- cluster=cluster,
- environ=environ,
- topology=topology,
- container=container,
- path=path)
-
- if role is not None:
- params['role'] = role
-
- request_url = tornado.httputil.url_concat(create_url(FILESTATS_URL_FMT), params)
-
- raise tornado.gen.Return((yield fetch_url_as_json(request_url)))
-
-
-class HeronQueryHandler(QueryHandler):
- ''' HeronQueryHandler '''
-
- @tornado.gen.coroutine
- def fetch(self, cluster, metric, topology, component, instance, timerange, environ=None):
- '''
- :param cluster:
- :param metric:
- :param topology:
- :param component:
- :param instance:
- :param timerange:
- :param environ:
- :return:
- '''
- components = [component] if component != "*" else (yield get_comps(cluster, environ, topology))
-
- futures = []
- for comp in components:
- query = self.get_query(metric, comp, instance)
- future = get_metrics(cluster, environ, topology, timerange, query)
- futures.append(future)
-
- results = yield futures
-
- timelines = []
- for result in results:
- timelines.extend(result["timeline"])
-
- result = self.get_metric_response(timerange, timelines, False)
-
- raise tornado.gen.Return(result)
-
- @tornado.gen.coroutine
- def fetch_max(self, cluster, metric, topology, component, instance, timerange, environ=None):
- '''
- :param cluster:
- :param metric:
- :param topology:
- :param component:
- :param instance:
- :param timerange:
- :param environ:
- :return:
- '''
- components = [component] if component != "*" else (yield get_comps(cluster, environ, topology))
-
- result = {}
- futures = []
- for comp in components:
- query = self.get_query(metric, comp, instance)
- max_query = "MAX(%s)" % query
- future = get_metrics(cluster, environ, topology, timerange, max_query)
- futures.append(future)
-
- results = yield futures
-
- data = self.compute_max(results)
-
- result = self.get_metric_response(timerange, data, True)
-
- raise tornado.gen.Return(result)
-
- # pylint: disable=unused-argument
- @tornado.gen.coroutine
- def fetch_backpressure(self, cluster, metric, topology, component, instance, \
- timerange, is_max, environ=None):
- '''
- :param cluster:
- :param metric:
- :param topology:
- :param component:
- :param instance:
- :param timerange:
- :param isMax:
- :param environ:
- :return:
- '''
- instances = yield get_instances(cluster, environ, topology)
- if component != "*":
- filtered_inst = [instance for instance in instances if instance.split("_")[2] == component]
- else:
- filtered_inst = instances
-
- futures_dict = {}
- for inst in filtered_inst:
- query = queries.get(metric).format(inst)
- futures_dict[inst] = get_metrics(cluster, environ, topology, timerange, query)
-
- res = yield futures_dict
-
- if not is_max:
- timelines = []
- for key in res:
- result = res[key]
- # Replacing stream manager instance name with component instance name
- if len(result["timeline"]) > 0:
- result["timeline"][0]["instance"] = key
- timelines.extend(result["timeline"])
- result = self.get_metric_response(timerange, timelines, is_max)
- else:
- data = self.compute_max(list(res.values()))
- result = self.get_metric_response(timerange, data, is_max)
-
- raise tornado.gen.Return(result)
-
- # pylint: disable=no-self-use
- def compute_max(self, multi_ts):
- '''
- :param multi_ts:
- :return:
- '''
- # Some components don't have specific metrics such as capacity hence the
- # key set is empty. These components are filtered out first.
- filtered_ts = [ts for ts in multi_ts if len(ts["timeline"][0]["data"]) > 0]
- if len(filtered_ts) > 0 and len(filtered_ts[0]["timeline"]) > 0:
- keys = list(filtered_ts[0]["timeline"][0]["data"].keys())
- timelines = ([res["timeline"][0]["data"][key] for key in keys] for res in filtered_ts)
- values = (max(v) for v in zip(*timelines))
- return dict(list(zip(keys, values)))
- return {}
-
- # pylint: disable=no-self-use
- def get_metric_response(self, timerange, data, isMax):
- '''
- :param timerange:
- :param data:
- :param isMax:
- :return:
- '''
- if isMax:
- return dict(
- status="success",
- starttime=timerange[0],
- endtime=timerange[1],
- result=dict(timeline=[dict(data=data)])
- )
-
- return dict(
- status="success",
- starttime=timerange[0],
- endtime=timerange[1],
- result=dict(timeline=data)
- )
-
- # pylint: disable=no-self-use
- def get_query(self, metric, component, instance):
- '''
- :param metric:
- :param component:
- :param instance:
- :return:
- '''
- q = queries.get(metric)
- return q.format(component, instance)
diff --git a/heron/tools/common/src/python/access/query.py b/heron/tools/common/src/python/access/query.py
deleted file mode 100644
index ba874bc..0000000
--- a/heron/tools/common/src/python/access/query.py
+++ /dev/null
@@ -1,62 +0,0 @@
-#!/usr/bin/env python3
-# -*- encoding: utf-8 -*-
-
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-''' query.py '''
-
-class QueryHandler:
- ''' QueryHandler '''
-
- def fetch(self, cluster, metric, topology, component, instance, timerange, environ=None):
- '''
- :param cluster:
- :param metric:
- :param topology:
- :param component:
- :param instance:
- :param timerange:
- :param environ:
- :return:
- '''
-
- def fetch_max(self, cluster, metric, topology, component, instance, timerange, environ=None):
- '''
- :param cluster:
- :param metric:
- :param topology:
- :param component:
- :param instance:
- :param timerange:
- :param environ:
- :return:
- '''
-
- def fetch_backpressure(self, cluster, metric, topology, component, instance, \
- timerange, is_max, environ=None):
- '''
- :param cluster:
- :param metric:
- :param topology:
- :param component:
- :param instance:
- :param timerange:
- :param is_max:
- :param environ:
- :return:
- '''
diff --git a/heron/tools/common/src/python/access/tracker_access.py b/heron/tools/common/src/python/access/tracker_access.py
deleted file mode 100644
index c0700f5..0000000
--- a/heron/tools/common/src/python/access/tracker_access.py
+++ /dev/null
@@ -1,134 +0,0 @@
-#!/usr/bin/env python3
-# -*- encoding: utf-8 -*-
-
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-'''tracker_access.py: util functions for heron explorer and tracker'''
-import traceback
-import tornado.gen
-import tornado.ioloop
-
-from heron.tools.common.src.python.access import heron_api as API
-from heron.common.src.python.utils.log import Log
-
-def _all_metric_queries():
- queries_normal = ['complete-latency',
- 'execute-latency',
- 'process-latency',
- 'jvm-uptime-secs',
- 'jvm-process-cpu-load',
- 'jvm-memory-used-mb']
- queries = ['__%s' % m for m in queries_normal]
- count_queries_normal = ['emit-count', 'execute-count', 'ack-count', 'fail-count']
- count_queries = ['__%s/default' % m for m in count_queries_normal]
- return queries, queries_normal, count_queries, count_queries_normal
-
-
-def metric_queries():
- """all metric queries"""
- qs = _all_metric_queries()
- return qs[0] + qs[2]
-
-
-def queries_map():
- """map from query parameter to query name"""
- qs = _all_metric_queries()
- return dict(list(zip(qs[0], qs[1])) + list(zip(qs[2], qs[3])))
-
-
-def get_clusters():
- """Synced API call to get all cluster names"""
- instance = tornado.ioloop.IOLoop.instance()
- # pylint: disable=unnecessary-lambda
- try:
- return instance.run_sync(lambda: API.get_clusters())
- except Exception:
- Log.debug(traceback.format_exc())
- raise
-
-
-def get_logical_plan(cluster, env, topology, role):
- """Synced API call to get logical plans"""
- instance = tornado.ioloop.IOLoop.instance()
- try:
- return instance.run_sync(lambda: API.get_logical_plan(cluster, env, topology, role))
- except Exception:
- Log.debug(traceback.format_exc())
- raise
-
-
-def get_topology_info(*args):
- """Synced API call to get topology information"""
- instance = tornado.ioloop.IOLoop.instance()
- try:
- return instance.run_sync(lambda: API.get_topology_info(*args))
- except Exception:
- Log.debug(traceback.format_exc())
- raise
-
-
-def get_topology_metrics(*args):
- """Synced API call to get topology metrics"""
- instance = tornado.ioloop.IOLoop.instance()
- try:
- return instance.run_sync(lambda: API.get_comp_metrics(*args))
- except Exception:
- Log.debug(traceback.format_exc())
- raise
-
-
-def get_component_metrics(component, cluster, env, topology, role):
- """Synced API call to get component metrics"""
- all_queries = metric_queries()
- try:
- result = get_topology_metrics(cluster, env, topology, component, [],
- all_queries, [0, -1], role)
- return result["metrics"]
- except Exception:
- Log.debug(traceback.format_exc())
- raise
-
-
-def get_cluster_topologies(cluster):
- """Synced API call to get topologies under a cluster"""
- instance = tornado.ioloop.IOLoop.instance()
- try:
- return instance.run_sync(lambda: API.get_cluster_topologies(cluster))
- except Exception:
- Log.debug(traceback.format_exc())
- raise
-
-
-def get_cluster_role_topologies(cluster, role):
- """Synced API call to get topologies under a cluster submitted by a role"""
- instance = tornado.ioloop.IOLoop.instance()
- try:
- return instance.run_sync(lambda: API.get_cluster_role_topologies(cluster, role))
- except Exception:
- Log.debug(traceback.format_exc())
- raise
-
-
-def get_cluster_role_env_topologies(cluster, role, env):
- """Synced API call to get topologies under a cluster submitted by a role under env"""
- instance = tornado.ioloop.IOLoop.instance()
- try:
- return instance.run_sync(lambda: API.get_cluster_role_env_topologies(cluster, role, env))
- except Exception:
- Log.debug(traceback.format_exc())
- raise
diff --git a/heron/tools/common/src/python/access/__init__.py b/heron/tools/common/src/python/clients/__init__.py
similarity index 89%
rename from heron/tools/common/src/python/access/__init__.py
rename to heron/tools/common/src/python/clients/__init__.py
index 9639b9f..508a4cc 100644
--- a/heron/tools/common/src/python/access/__init__.py
+++ b/heron/tools/common/src/python/clients/__init__.py
@@ -14,8 +14,7 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
-''' access module '''
-__all__ = ['access']
+''' clients module '''
+__all__ = ['tracker']
-from .heron_api import *
-from .query import *
+from . import tracker
diff --git a/heron/tools/common/src/python/clients/tracker.py b/heron/tools/common/src/python/clients/tracker.py
new file mode 100644
index 0000000..ab914d3
--- /dev/null
+++ b/heron/tools/common/src/python/clients/tracker.py
@@ -0,0 +1,922 @@
+#!/usr/bin/env python3
+# -*- encoding: utf-8 -*-
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+'''
+This module provides a synchronous client library for a tracker instance.
+
+To use this client, you must provide the tracker_url, which is done with:
+ heron.tools.common.src.python.clients.tracker.tracker_url = tracker_url
+This is a remnant of the old tornado implementation and should be factored
+out in some way.
+
+This module isn't just a thin client for a tracker, it also includes
+methods common to heron-explorer and heron-ui.
+
+'''
+
+import re
+import time
+
+from typing import Any, Iterable, List, Optional, Tuple
+from urllib.parse import urlencode
+
+from heron.common.src.python.utils.log import Log
+
+import requests
+
+# Base URL of the tracker instance; reassign it if the tracker is not at this default address.
+tracker_url = "http://127.0.0.1:8888"
+
+# pylint: disable=bad-whitespace
+CLUSTER_URL_FMT = "%s/clusters"
+TOPOLOGIES_URL_FMT = "%s/topologies"
+TOPOLOGIES_STATS_URL_FMT = "%s/states" % TOPOLOGIES_URL_FMT
+EXECUTION_STATE_URL_FMT = "%s/executionstate" % TOPOLOGIES_URL_FMT
+LOGICALPLAN_URL_FMT = "%s/logicalplan" % TOPOLOGIES_URL_FMT
+PHYSICALPLAN_URL_FMT = "%s/physicalplan" % TOPOLOGIES_URL_FMT
+PACKINGPLAN_URL_FMT = "%s/packingplan" % TOPOLOGIES_URL_FMT
+SCHEDULER_LOCATION_URL_FMT = "%s/schedulerlocation" % TOPOLOGIES_URL_FMT
+
+METRICS_URL_FMT = "%s/metrics" % TOPOLOGIES_URL_FMT
+METRICS_QUERY_URL_FMT = "%s/metricsquery" % TOPOLOGIES_URL_FMT
+METRICS_TIMELINE_URL_FMT = "%s/metricstimeline" % TOPOLOGIES_URL_FMT
+
+EXCEPTIONS_URL_FMT = "%s/exceptions" % TOPOLOGIES_URL_FMT
+EXCEPTION_SUMMARY_URL_FMT = "%s/exceptionsummary" % TOPOLOGIES_URL_FMT
+
+INFO_URL_FMT = "%s/info" % TOPOLOGIES_URL_FMT
+PID_URL_FMT = "%s/pid" % TOPOLOGIES_URL_FMT
+JSTACK_URL_FMT = "%s/jstack" % TOPOLOGIES_URL_FMT
+JMAP_URL_FMT = "%s/jmap" % TOPOLOGIES_URL_FMT
+HISTOGRAM_URL_FMT = "%s/histo" % TOPOLOGIES_URL_FMT
+
+FILE_DATA_URL_FMT = "%s/containerfiledata" % TOPOLOGIES_URL_FMT
+FILE_DOWNLOAD_URL_FMT = "%s/containerfiledownload" % TOPOLOGIES_URL_FMT
+FILESTATS_URL_FMT = "%s/containerfilestats" % TOPOLOGIES_URL_FMT
+
+
+def strip_whitespace(s):
+ return re.sub(r'\s','', s)
+
+capacity = strip_whitespace("""
+DIVIDE(
+ DEFAULT(0,
+ MULTIPLY(
+ TS({0},{1},__execute-count/default),
+ TS({0},{1},__execute-latency/default)
+ )
+ ),
+ 60000000000
+)
+""")
+
+failures = strip_whitespace("""
+DEFAULT(0,
+ DIVIDE(
+ TS({0},{1},__fail-count/default),
+ SUM(
+ DEFAULT(1, TS({0},{1},__execute-count/default)),
+ DEFAULT(0, TS({0},{1},__fail-count/default))
+ )
+ )
+)
+""")
+
+cpu = strip_whitespace("DEFAULT(0, TS({0},{1},__jvm-process-cpu-load))")
+
+memory = strip_whitespace("""
+DIVIDE(
+ DEFAULT(0, TS({0},{1},__jvm-memory-used-mb)),
+ DEFAULT(1, TS({0},{1},__jvm-memory-mb-total))
+)
+""")
+
+gc = "RATE(TS({0},{1},__jvm-gc-collection-time-ms))"
+
+backpressure = strip_whitespace("""
+DEFAULT(0, TS(__stmgr__,*,__time_spent_back_pressure_by_compid/{0}))
+""")
+
+queries = dict(
+ cpu=cpu,
+ capacity=capacity,
+ failures=failures,
+ memory=memory,
+ gc=gc,
+ backpressure=backpressure
+)
+
+def api_get(url: str, params=None) -> Any:
+  """GET a tracker URL and return its JSON "result" field, or None on any failure."""
+  start = time.time()
+  try:
+    response = requests.get(url, params)
+    response.raise_for_status()
+    end = time.time()
+    data = response.json()  # a non-JSON body is logged like any other failed fetch
+  except Exception as e:
+    Log.error(f"Unable to get response from {url}: {e}")
+    return None
+  if "result" not in data:
+    Log.error(f"Empty response from {url}")
+    return None
+
+  # the timing field is informational only, so tolerate a response without it
+  execution = data.get("executiontime", 0) * 1000
+  duration = (end - start) * 1000
+  Log.debug(f"URL fetch took {execution:.2f}ms server time for {url}")
+  Log.debug(f"URL fetch took {duration:.2f}ms round trip time for {url}")
+  return data["result"]
+
+
+def create_url(fmt: str) -> str:
+ '''
+ Given an URL format, substitute with tracker service endpoint
+ :param fmt:
+ :return:
+ '''
+ return fmt % tracker_url
+
+
+def get_clusters() -> List[str]:
+ '''
+ :return:
+ '''
+ request_url = create_url(CLUSTER_URL_FMT)
+ return api_get(request_url)
+
+
+def get_topologies() -> Any:
+ '''
+ Get the list of topologies given a data center from heron tracker
+ :return:
+ '''
+ request_url = create_url(TOPOLOGIES_URL_FMT)
+ return api_get(request_url)
+
+
+def get_topologies_states() -> Any:
+ '''
+ Get the list of topologies and their states
+ :return:
+ '''
+ request_url = create_url(TOPOLOGIES_STATS_URL_FMT)
+ return api_get(request_url)
+
+
+def _get_topologies(cluster: str, role: Optional[str]=None, env: Optional[str]=None) -> Any:
+ base_url = create_url(TOPOLOGIES_URL_FMT)
+ params = {"cluster": cluster, "environ": env, "role": role}
+ return api_get(base_url, params)
+
+
+def get_cluster_topologies(cluster: str) -> Any:
+ '''
+ Get the list of topologies given a cluster
+ :param cluster:
+ :return:
+ '''
+ return _get_topologies(cluster)
+
+
+def get_cluster_role_topologies(cluster: str, role: str) -> Any:
+ '''
+ Get the list of topologies given a cluster submitted by a given role
+ :param cluster:
+ :param role:
+ :return:
+ '''
+ return _get_topologies(cluster, role=role)
+
+
+def get_cluster_role_env_topologies(cluster: str, role: str, env: str) -> Any:
+ '''
+ Get the list of topologies given a cluster submitted by a given role under a given environment
+ :param cluster:
+ :param role:
+ :param env:
+ :return:
+ '''
+ return _get_topologies(cluster, role=role, env=env)
+
+
+def get_execution_state(cluster: str, environ: str, topology: str, role: Optional[str]=None) -> Any:
+ '''
+ Get the execution state of a topology in a cluster
+ :param cluster:
+ :param environ:
+ :param topology:
+ :param role:
+ :return:
+ '''
+ base_url = create_url(EXECUTION_STATE_URL_FMT)
+ params = {"cluster": cluster, "environ": environ, "topology": topology, "role": role}
+ return api_get(base_url, params)
+
+
+def get_logical_plan(cluster: str, environ: str, topology: str, role: Optional[str]=None) -> Any:
+ '''
+ Get the logical plan state of a topology in a cluster
+ :param cluster:
+ :param environ:
+ :param topology:
+ :param role:
+ :return:
+ '''
+ base_url = create_url(LOGICALPLAN_URL_FMT)
+ params = {"cluster": cluster, "environ": environ, "topology": topology, "role": role}
+ return api_get(base_url, params)
+
+
+def get_comps(cluster: str, environ: str, topology: str, role: Optional[str]=None) -> Any:
+ '''
+ Get the list of component names for the topology from Heron Nest
+ :param cluster:
+ :param environ:
+ :param topology:
+ :param role:
+ :return:
+ '''
+ base_url = create_url(LOGICALPLAN_URL_FMT)
+ params = {"cluster": cluster, "environ": environ, "topology": topology, "role": role}
+ lplan = api_get(base_url, params)
+ return sorted(lplan['components'])
+
+def get_instances(cluster: str, environ: str, topology: str, role: Optional[str]=None):
+ '''
+ Get the list of instances for the topology from Heron Nest
+ :param cluster:
+ :param environ:
+ :param topology:
+ :param role:
+ :return:
+ '''
+ base_url = create_url(PHYSICALPLAN_URL_FMT)
+ params = {"cluster": cluster, "environ": environ, "topology": topology, "role": role}
+ pplan = api_get(base_url, params)
+ return sorted(pplan['instances'])
+
+
+def get_packing_plan(cluster: str, environ: str, topology: str, role: Optional[str]=None) -> Any:
+ '''
+ Get the packing plan state of a topology in a cluster from tracker
+ :param cluster:
+ :param environ:
+ :param topology:
+ :param role:
+ :return:
+ '''
+ base_url = create_url(PACKINGPLAN_URL_FMT)
+ params = {"cluster": cluster, "environ": environ, "topology": topology, "role": role}
+ return api_get(base_url, params)
+
+
+def get_physical_plan(cluster: str, environ: str, topology: str, role: Optional[str]=None) -> Any:
+ '''
+ Get the physical plan state of a topology in a cluster from tracker
+ :param cluster:
+ :param environ:
+ :param topology:
+ :param role:
+ :return:
+ '''
+ base_url = create_url(PHYSICALPLAN_URL_FMT)
+ params = {"cluster": cluster, "environ": environ, "topology": topology, "role": role}
+ return api_get(base_url, params)
+
+
+def get_scheduler_location(
+ cluster: str,
+ environ: str,
+ topology: str,
+ role: Optional[str]=None,
+) -> Any:
+ '''
+ Get the scheduler location of a topology in a cluster from tracker
+ :param cluster:
+ :param environ:
+ :param topology:
+ :param role:
+ :return:
+ '''
+ base_url = create_url(SCHEDULER_LOCATION_URL_FMT)
+ params = {"cluster": cluster, "environ": environ, "topology": topology, "role": role}
+ return api_get(base_url, params)
+
+
+def get_component_exceptionsummary(
+ cluster: str,
+ environ: str,
+ topology: str,
+ component: str,
+ role: Optional[str]=None,
+) -> Any:
+ '''
+ Get summary of exception for a component
+ :param cluster:
+ :param environ:
+ :param topology:
+ :param component:
+ :param role:
+ :return:
+ '''
+ base_url = create_url(EXCEPTION_SUMMARY_URL_FMT)
+ params = {
+ "cluster": cluster,
+ "environ": environ,
+ "topology": topology,
+ "role": role,
+ "component": component,
+ }
+ return api_get(base_url, params)
+
+
+def get_component_exceptions(
+ cluster: str,
+ environ: str,
+ topology: str,
+ component: str,
+ role: Optional[str]=None,
+) -> Any:
+ '''
+ Get exceptions for 'component' for 'topology'
+ :param cluster:
+ :param environ:
+ :param topology:
+ :param component:
+ :param role:
+ :return:
+ '''
+ base_url = create_url(EXCEPTIONS_URL_FMT)
+ params = {
+ "cluster": cluster,
+ "environ": environ,
+ "topology": topology,
+ "role": role,
+ "component": component,
+ }
+ return api_get(base_url, params)
+
+
+def get_comp_instance_metrics(
+ cluster: str,
+ environ: str,
+ topology: str,
+ component: str,
+ metrics: dict,
+ instances: List[str],
+ time_range: Tuple[int, int],
+ role: Optional[str]=None,
+) -> Any:
+ '''
+ Get the metrics for some instances of a topology from tracker
+ :param cluster: sent as the "cluster" query parameter
+ :param environ: sent as the "environ" query parameter
+ :param topology: sent as the "topology" query parameter
+ :param component: sent as the "component" query parameter
+ :param metrics: dict keyed by display name; element 0 of each value is sent as a "metricname"
+ :param instances: list of instance names; a non-list value is wrapped into a one-element list
+ :param time_range: 2-tuple of start and end of range; only index 1 is sent, as "interval"
+ :param role: sent as the "role" query parameter, may be None
+ :return: the "result" payload from the tracker, or None if the request failed
+ '''
+ base_url = create_url(METRICS_URL_FMT)
+ params = {
+ "cluster": cluster,
+ "environ": environ,
+ "topology": topology,
+ "role": role,
+ "component": component,
+ "interval": time_range[1],
+ "metricname": [m[0] for m in metrics.values()],
+ "instance": instances if isinstance(instances, list) else [instances],
+ }
+ return api_get(base_url, params)
+
+
+def get_comp_metrics(
+ cluster: str,
+ environ: str,
+ topology: str,
+ component: str,
+ instances: List[str],
+ metrics: List[str],
+ time_range: Tuple[int, int],
+ role: Optional[str]=None,
+) -> Any:
+ '''
+ Get the metrics for all the instances of a topology from Heron Nest
+ :param cluster:
+ :param environ:
+ :param topology:
+ :param component:
+ :param instances:
+ :param metrics: list of names, or ["*"] for all
+ :param time_range: 2-tuple consisting of start and end of range
+ :param role:
+ :return:
+ '''
+ base_url = create_url(METRICS_URL_FMT)
+ params = {
+ "cluster": cluster,
+ "environ": environ,
+ "topology": topology,
+ "role": role,
+ "component": component,
+ "metricname": metrics,
+ "instance": instances,
+ "interval": time_range[1],
+ }
+ return api_get(base_url, params)
+
+
+def get_metrics(
+ cluster: str,
+ environment: str,
+ topology: str,
+ timerange: Tuple[int, int],
+ query: str,
+ role: Optional[str]=None,
+) -> Any:
+ '''
+ Get the metrics for a topology from tracker
+ :param cluster: sent as the "cluster" query parameter
+ :param environment: sent as the "environ" query parameter
+ :param topology: sent as the "topology" query parameter
+ :param timerange: 2-tuple; index 0 is sent as "starttime" and index 1 as "endtime"
+ :param query: metrics query expression, sent as the "query" parameter
+ :param role: sent as the "role" query parameter, may be None
+ :return: the "result" payload from the tracker, or None if the request failed
+ '''
+ base_url = create_url(METRICS_QUERY_URL_FMT)
+ params = {
+ "cluster": cluster,
+ "environ": environment,
+ "role": role,
+ "topology": topology,
+ "starttime": timerange[0],
+ "endtime": timerange[1],
+ "query": query,
+ }
+ return api_get(base_url, params)
+
+
+def get_comp_metrics_timeline(
+ cluster: str,
+ environ: str,
+ topology: str,
+ component: str,
+ instances: List[str],
+ metricnames: List[str],
+ time_range: Tuple[int, int],
+ role: Optional[str]=None,
+) -> Any:
+ '''
+ Get the minute-by-minute metrics for all instances of a topology from tracker
+ :param cluster:
+ :param environ:
+ :param topology:
+ :param component:
+ :param instances:
+ :param metricnames: dict of display name to cuckoo name
+ :param time_range: 2-tuple consisting of start and end of range
+ :param role:
+ :return:
+ '''
+ base_url = create_url(METRICS_TIMELINE_URL_FMT)
+ params = {
+ "cluster": cluster,
+ "environ": environ,
+ "role": role,
+ "topology": topology,
+ "component": component,
+ "instance": instances,
+ "metricname": metricnames,
+ "starttime": time_range[0],
+ "endtime": time_range[1],
+ }
+ return api_get(base_url, params)
+
+
+def get_topology_info(cluster: str, environ: str, topology: str, role: Optional[str]=None) -> Any:
+ '''
+ :param cluster:
+ :param environ:
+ :param topology:
+ :param role:
+ :return:
+ '''
+ base_url = create_url(INFO_URL_FMT)
+ params = {
+ "cluster": cluster,
+ "environ": environ,
+ "topology": topology,
+ "role": role,
+ }
+ return api_get(base_url, params)
+
+
+def get_instance_pid(
+ cluster: str,
+ environ: str,
+ topology: str,
+ instance: str,
+ role: Optional[str]=None,
+) -> Any:
+ '''Query the tracker's topologies/pid endpoint for one instance of a topology
+ :param cluster: sent as the "cluster" query parameter
+ :param environ: sent as the "environ" query parameter
+ :param topology: sent as the "topology" query parameter
+ :param instance: sent as the "instance" query parameter
+ :param role: sent as the "role" query parameter, may be None
+ :return: the "result" payload from the tracker, or None if the request failed
+ '''
+ base_url = create_url(PID_URL_FMT)
+ params = {
+ "cluster": cluster,
+ "environ": environ,
+ "topology": topology,
+ "role": role,
+ "instance": instance,
+ }
+ return api_get(base_url, params)
+
+
+def get_instance_jstack(
+ cluster: str,
+ environ: str,
+ topology: str,
+ instance: str,
+ role: Optional[str]=None,
+) -> Any:
+ '''Query the tracker's topologies/jstack endpoint for one instance of a topology
+ :param cluster: sent as the "cluster" query parameter
+ :param environ: sent as the "environ" query parameter
+ :param topology: sent as the "topology" query parameter
+ :param instance: sent as the "instance" query parameter
+ :param role: sent as the "role" query parameter, may be None
+ :return: the "result" payload from the tracker, or None if the request failed
+ '''
+ base_url = create_url(JSTACK_URL_FMT)
+ params = {
+ "cluster": cluster,
+ "environ": environ,
+ "topology": topology,
+ "role": role,
+ "instance": instance,
+ }
+ return api_get(base_url, params)
+
+
+def get_instance_mem_histogram(
+ cluster: str,
+ environ: str,
+ topology: str,
+ instance: str,
+ role: Optional[str]=None,
+) -> Any:
+ '''
+ Get histogram of active memory objects.
+ :param cluster:
+ :param environ:
+ :param topology:
+ :param instance:
+ :param role:
+ :return:
+ '''
+ base_url = create_url(HISTOGRAM_URL_FMT)
+ params = {
+ "cluster": cluster,
+ "environ": environ,
+ "topology": topology,
+ "role": role,
+ "instance": instance,
+ }
+ return api_get(base_url, params)
+
+
+def run_instance_jmap(cluster: str, environ: str, topology: str, instance: str, role=None) -> Any:
+ '''
+ Call heap dump for an instance and save it at /tmp/heap.bin
+ :param cluster:
+ :param environ:
+ :param topology:
+ :param instance:
+ :param role:
+ :return:
+ '''
+ base_url = create_url(JMAP_URL_FMT)
+ params = {
+ "cluster": cluster,
+ "environ": environ,
+ "topology": topology,
+ "role": role,
+ "instance": instance,
+ }
+ return api_get(base_url, params)
+
+
+def get_container_file_download_url(
+ cluster: str,
+ environ: str,
+ topology: str,
+ container: str,
+ path: str,
+ role: Optional[str]=None,
+) -> Any:
+ '''
+ :param cluster:
+ :param environ:
+ :param topology:
+ :param container:
+ :param path:
+ :param role:
+ :return:
+ '''
+ base_url = create_url(FILE_DOWNLOAD_URL_FMT)
+ params = dict(
+ cluster=cluster,
+ environ=environ,
+ topology=topology,
+ container=container,
+ path=path)
+ if role is not None:
+ params['role'] = role
+
+ query = urlencode(params)
+ return f"{base_url}?{query}"
+
+
+def get_container_file_data(
+ cluster: str,
+ environ: str,
+ topology: str,
+ container: str,
+ path: str,
+ offset: int,
+ length: int,
+ role: Optional[str]=None,
+) -> Any:
+ '''
+ :param cluster:
+ :param environ:
+ :param topology:
+ :param container:
+ :param path:
+ :param offset:
+ :param length:
+ :param role:
+ :return:
+ '''
+ base_url = create_url(FILE_DATA_URL_FMT)
+ params = {
+ "cluster": cluster,
+ "environ": environ,
+ "topology": topology,
+ "role": role,
+ "container": container,
+ "path": path,
+ "offset": offset,
+ "length": length,
+ }
+ return api_get(base_url, params)
+
+
+def get_filestats(
+ cluster: str,
+ environ: str,
+ topology: str,
+ container: str,
+ path: str,
+ role: Optional[str]=None,
+) -> Any:
+ '''
+ :param cluster:
+ :param environ:
+ :param topology:
+ :param container:
+ :param path:
+ :param role:
+ :return:
+ '''
+ base_url = create_url(FILESTATS_URL_FMT)
+ params = {
+ "cluster": cluster,
+ "environ": environ,
+ "topology": topology,
+ "role": role,
+ "container": container,
+ "path": path,
+ }
+ return api_get(base_url, params)
+
+
+class HeronQueryHandler:
+ ''' HeronQueryHandler '''
+
+
+ def fetch(
+ self,
+ cluster: str,
+ metric: str,
+ topology: str,
+ component: str,
+ instance:str,
+ timerange: Tuple[int, int],
+ environ: Optional[str]=None,
+ ) -> Any:
+ '''
+ :param cluster:
+ :param metric:
+ :param topology:
+ :param component:
+ :param instance:
+ :param timerange:
+ :param environ:
+ :return:
+ '''
+ components = [component] if component != "*" else get_comps(cluster, environ, topology)
+
+ results = []
+ for comp in components:
+ query = self.get_query(metric, comp, instance)
+ results.append(get_metrics(cluster, environ, topology, timerange, query))
+
+ timelines = []
+ for result in results:
+ timelines.extend(result["timeline"])
+
+ return self.get_metric_response(timerange, timelines, False)
+
+ def fetch_max(
+ self,
+ cluster: str,
+ metric: str,
+ topology: str,
+ component: str,
+ instance: str,
+ timerange: Tuple[int, int],
+ environ: Optional[str]=None,
+ ) -> Any:
+ '''
+ :param cluster:
+ :param metric:
+ :param topology:
+ :param component:
+ :param instance:
+ :param timerange:
+ :param environ:
+ :return:
+ '''
+ components = [component] if component != "*" else get_comps(cluster, environ, topology)
+
+ comp_metrics = []
+ for comp in components:
+ query = self.get_query(metric, comp, instance)
+ max_query = "MAX(%s)" % query
+ comp_metrics.append(get_metrics(cluster, environ, topology, timerange, max_query))
+
+ data = self.compute_max(comp_metrics)
+
+ return self.get_metric_response(timerange, data, True)
+
+ # pylint: disable=unused-argument
+ def fetch_backpressure(
+ self,
+ cluster: str,
+ metric: str,
+ topology: str,
+ component: str,
+ instance: str,
+ timerange: Tuple[int, int],
+ is_max: bool,
+ environ: Optional[str]=None,
+ ) -> Any:
+ '''
+ :param cluster: passed through to get_instances and get_metrics
+ :param metric: key into the module-level queries dict; its template is formatted with an instance name
+ :param topology: passed through to get_instances and get_metrics
+ :param component: "*" for every instance, else keep instances whose third "_"-separated field matches
+ :param instance: not read here; the name is only rebound inside the list comprehension below
+ :param timerange: 2-tuple passed to get_metrics and echoed as starttime/endtime in the response
+ :param is_max: if true return the per-key maximum over instances, else the concatenated timelines
+ :param environ: passed through to get_instances and get_metrics
+ :return: the dict built by get_metric_response
+ '''
+ instances = get_instances(cluster, environ, topology)
+ if component != "*":
+ filtered_inst = [instance for instance in instances if instance.split("_")[2] == component]
+ else:
+ filtered_inst = instances
+
+ inst_metrics = {}
+ for inst in filtered_inst:
+ query = queries[metric].format(inst)
+ inst_metrics[inst] = get_metrics(cluster, environ, topology, timerange, query)
+
+
+ if is_max:
+ data = self.compute_max(inst_metrics.values())
+ return self.get_metric_response(timerange, data, is_max)
+
+ timelines = []
+ for i, metrics in inst_metrics.items():
+ # Replacing stream manager instance name with component instance name
+ if len(metrics["timeline"]) > 0:
+ metrics["timeline"][0]["instance"] = i
+ timelines.extend(metrics["timeline"])
+ return self.get_metric_response(timerange, timelines, is_max)
+
+ # pylint: disable=no-self-use
+  def compute_max(self, multi_ts: Iterable[dict]) -> dict:
+    '''Return the per-key maximum over the first timeline entry of every result.
+    :param multi_ts: results shaped like {"timeline": [{"data": {key: value}}, ...]}
+    :return: {key: max value}, keyed like the first usable result; {} if none is usable
+    '''
+    # Some components don't have specific metrics such as capacity hence the
+    # timeline or its key set is empty. These components are filtered out first.
+    filtered_ts = [ts for ts in multi_ts if ts["timeline"] and ts["timeline"][0]["data"]]
+    if filtered_ts:
+      keys = list(filtered_ts[0]["timeline"][0]["data"].keys())
+      timelines = ([res["timeline"][0]["data"][key] for key in keys] for res in filtered_ts)
+      values = (max(v) for v in zip(*timelines))
+      return dict(zip(keys, values))
+    return {}
+
+ # pylint: disable=no-self-use
+ def get_metric_response(self, timerange: Tuple[int, int], data: dict, is_max: bool) -> dict:
+ '''
+ :param timerange:
+ :param data:
+ :param is_max:
+ :return:
+ '''
+ if is_max:
+ return dict(
+ status="success",
+ starttime=timerange[0],
+ endtime=timerange[1],
+ result=dict(timeline=[dict(data=data)])
+ )
+
+ return dict(
+ status="success",
+ starttime=timerange[0],
+ endtime=timerange[1],
+ result=dict(timeline=data)
+ )
+
+ # pylint: disable=no-self-use
+ def get_query(self, metric: str, component: str, instance: str) -> str:
+ '''
+ :param metric:
+ :param component:
+ :param instance:
+ :return:
+ '''
+ return queries[metric].format(component, instance)
+
+
+def _all_metric_queries() -> Tuple[List[str], List[str], List[str], List[str]]:
+ # the count queries look suspicious, as if they should be parameterised with the environment
+ normal_query_labels = [
+ 'complete-latency',
+ 'execute-latency',
+ 'process-latency',
+ 'jvm-uptime-secs',
+ 'jvm-process-cpu-load',
+ 'jvm-memory-used-mb']
+ normal_queries = [f'__{m}' for m in normal_query_labels]
+ count_query_labels = ['emit-count', 'execute-count', 'ack-count', 'fail-count']
+ count_queries = [f'__{m}/default' for m in count_query_labels]
+ return normal_queries, normal_query_labels, count_queries, count_query_labels
+
+
+def metric_queries() -> List[str]:
+ """all metric queries"""
+ normal_queries, _, count_queries, _ = _all_metric_queries()
+ return normal_queries + count_queries
+
+
+def queries_map() -> dict:
+  """map from query parameter (e.g. '__emit-count/default') to query name (e.g. 'emit-count')"""
+  normal_queries, normal_query_labels, count_queries, count_query_labels = _all_metric_queries()
+  result = dict(zip(normal_queries, normal_query_labels))
+  result.update(zip(count_queries, count_query_labels))
+  return result
diff --git a/heron/tools/explorer/src/python/BUILD b/heron/tools/explorer/src/python/BUILD
index 6b10054..5964c23 100644
--- a/heron/tools/explorer/src/python/BUILD
+++ b/heron/tools/explorer/src/python/BUILD
@@ -4,9 +4,10 @@ pex_library(
name = "explorer-py",
srcs = glob(["**/*.py"]),
reqs = [
- "tornado==4.0.2",
+ "tornado==4.5.3",
"tabulate==0.7.4",
"click==7.1.2",
+ "requests==2.12.3",
],
deps = [
"//heron/common/src/python:common-py",
diff --git a/heron/tools/explorer/src/python/logicalplan.py b/heron/tools/explorer/src/python/logicalplan.py
index e467dba..2acf144 100644
--- a/heron/tools/explorer/src/python/logicalplan.py
+++ b/heron/tools/explorer/src/python/logicalplan.py
@@ -24,11 +24,12 @@ import sys
from collections import defaultdict
from heron.common.src.python.utils.log import Log
-
-import heron.tools.common.src.python.access.tracker_access as tracker_access
+from heron.tools.common.src.python.clients import tracker
from tabulate import tabulate
+import requests
+
def to_table(components, topo_info, component_filter):
""" normalize raw logical plan info to table """
@@ -67,10 +68,10 @@ def to_table(components, topo_info, component_filter):
def run(component_type: str, cluster: str, role: str, environment: str, topology: str):
""" run command """
try:
- components = tracker_access.get_logical_plan(cluster, environment, topology, role)
- topo_info = tracker_access.get_topology_info(cluster, environment, topology, role)
- except:
- Log.error("Fail to connect to tracker")
+ components = tracker.get_logical_plan(cluster, environment, topology, role)
+ topo_info = tracker.get_topology_info(cluster, environment, topology, role)
+ except requests.ConnectionError as e:
+ Log.error(f"Fail to connect to tracker: {e}")
sys.exit(1)
table, header = to_table(components, topo_info, component_type)
print(tabulate(table, headers=header))
diff --git a/heron/tools/explorer/src/python/main.py b/heron/tools/explorer/src/python/main.py
index be35780..e4a48cc 100644
--- a/heron/tools/explorer/src/python/main.py
+++ b/heron/tools/explorer/src/python/main.py
@@ -23,16 +23,15 @@ import logging
import os
import sys
-import heron.common.src.python.utils.log as log
-import heron.tools.common.src.python.access.tracker_access as tracker_access
-import heron.tools.common.src.python.utils.config as config
-import heron.tools.explorer.src.python.logicalplan as logicalplan
-import heron.tools.explorer.src.python.physicalplan as physicalplan
-import heron.tools.explorer.src.python.topologies as topologies
+from heron.common.src.python.utils import log
+from heron.tools.common.src.python.clients import tracker
+from heron.tools.common.src.python.utils import config
+from heron.tools.explorer.src.python import logicalplan
+from heron.tools.explorer.src.python import physicalplan
+from heron.tools.explorer.src.python import topologies
import click
-
-from tornado.options import define
+import requests
Log = log.Log
@@ -85,11 +84,11 @@ def cli(verbose: int):
@cli.command("clusters")
@tracker_url_option()
def cli_clusters(tracker_url: str):
- define("tracker_url", tracker_url)
+ tracker.tracker_url = tracker_url
try:
- clusters = tracker_access.get_clusters()
- except:
- Log.error("Fail to connect to tracker")
+ clusters = tracker.get_clusters()
+ except requests.ConnectionError as e:
+ Log.error(f"Fail to connect to tracker: {e}")
sys.exit(1)
print("Available clusters:")
for cluster in clusters:
@@ -100,7 +99,7 @@ def cli_clusters(tracker_url: str):
@click.argument("cre", metavar="CLUSTER[/ROLE[/ENV]]")
def cli_topologies(tracker_url: str, cre: str):
"""Show the topologies under the given CLUSTER[/ROLE[/ENV]]."""
- define("tracker_url", tracker_url)
+ tracker.tracker_url = tracker_url
topologies.run(
cre=cre,
)
@@ -124,7 +123,7 @@ def logical_plan(
tracker_url: str,
) -> None:
"""Show logical plan information for the given topology."""
- define("tracker_url", tracker_url)
+ tracker.tracker_url = tracker_url
cluster = config.get_heron_cluster(cre)
cluster_config_path = config.get_heron_cluster_conf_dir(cluster, config_path)
cluster, role, environment = config.parse_cluster_role_env(cre, cluster_config_path)
@@ -154,7 +153,7 @@ def metrics(
topology: str,
component: str,
) -> None:
- define("tracker_url", tracker_url)
+ tracker.tracker_url = tracker_url
cluster = config.get_heron_cluster(cre)
cluster_config_path = config.get_heron_cluster_conf_dir(cluster, config_path)
cluster, role, environment = config.parse_cluster_role_env(cre, cluster_config_path)
@@ -187,7 +186,7 @@ def containers(
topology: str,
container_id: int,
) -> None:
- define("tracker_url", tracker_url)
+ tracker.tracker_url = tracker_url
cluster = config.get_heron_cluster(cre)
cluster_config_path = config.get_heron_cluster_conf_dir(cluster, config_path)
cluster, role, environment = config.parse_cluster_role_env(cre, cluster_config_path)
diff --git a/heron/tools/explorer/src/python/physicalplan.py b/heron/tools/explorer/src/python/physicalplan.py
index e647ca4..a7d1332 100644
--- a/heron/tools/explorer/src/python/physicalplan.py
+++ b/heron/tools/explorer/src/python/physicalplan.py
@@ -20,16 +20,22 @@
''' physicalplan.py '''
import sys
+
+from typing import Optional
+
from heron.common.src.python.utils.log import Log
-import heron.tools.common.src.python.access.tracker_access as tracker_access
+from heron.tools.common.src.python.clients import tracker
+
from tabulate import tabulate
+import requests
+
def to_table(metrics):
""" normalize raw metrics API result to table """
- all_queries = tracker_access.metric_queries()
- m = tracker_access.queries_map()
- header = ['container id'] + [m[k] for k in all_queries if k in list(metrics.keys())]
+ all_queries = tracker.metric_queries()
+ m = tracker.queries_map()
+ header = ['container id'] + [m[k] for k in all_queries if k in metrics.keys()]
stats = []
if not metrics:
return stats, header
@@ -50,36 +56,38 @@ def run_metrics(
role: str,
environment: str,
topology: str,
- component: str,
+ component: Optional[str],
) -> None:
"""Render a table of metrics."""
try:
- result = tracker_access.get_topology_info(cluster, environment, topology, role)
- except Exception:
- Log.error("Fail to connect to tracker")
+ result = tracker.get_topology_info(cluster, environment, topology, role)
+ except requests.ConnectionError as e:
+ Log.error(f"Fail to connect to tracker: {e}")
sys.exit(1)
- spouts = list(result['physical_plan']['spouts'].keys())
- bolts = list(result['physical_plan']['bolts'].keys())
- components = spouts + bolts
+
+ all_components = sorted(result['physical_plan']['components'].keys())
if component:
- if component in components:
- components = [component]
- else:
+ if component not in all_components:
Log.error(f"Unknown component: {component!r}")
sys.exit(1)
- cresult = []
- for comp in components:
+ components = [component]
+ else:
+ components = all_components
+ all_queries = tracker.metric_queries()
+
+ for i, comp in enumerate(components):
+ print(f"{i} {comp} of {len(components)}")
try:
- metrics = tracker_access.get_component_metrics(comp, cluster, environment, topology, role)
- except:
- Log.error("Fail to connect to tracker")
+ result = tracker.get_comp_metrics(
+ cluster, environment, topology, comp, [], all_queries, [0, -1], role,
+ )
+ except requests.ConnectionError as e:
+ Log.error(f"Fail to connect to tracker: {e}")
sys.exit(1)
- stat, header = to_table(metrics)
- cresult.append((comp, stat, header))
- for i, (c, stat, header) in enumerate(cresult):
+ stat, header = to_table(result["metrics"])
if i != 0:
print('')
- print(f"{c!r} metrics:")
+ print(f"{comp!r} metrics:")
print(tabulate(stat, headers=header))
def run_containers(
@@ -91,9 +99,9 @@ def run_containers(
) -> None:
"""Render a table of container information."""
try:
- result = tracker_access.get_topology_info(cluster, environment, topology, role)
- except:
- Log.error("Fail to connect to tracker")
+ result = tracker.get_topology_info(cluster, environment, topology, role)
+ except requests.ConnectionError as e:
+ Log.error(f"Fail to connect to tracker: {e}")
sys.exit(1)
containers = result['physical_plan']['stmgrs']
all_bolts, all_spouts = set(), set()
diff --git a/heron/tools/explorer/src/python/topologies.py b/heron/tools/explorer/src/python/topologies.py
index d341ec6..d373195 100644
--- a/heron/tools/explorer/src/python/topologies.py
+++ b/heron/tools/explorer/src/python/topologies.py
@@ -22,10 +22,12 @@
import sys
from heron.common.src.python.utils.log import Log
-import heron.tools.common.src.python.access.tracker_access as tracker_access
+from heron.tools.common.src.python.clients import tracker
from tabulate import tabulate
+import requests
+
def to_table(result):
table = []
@@ -41,18 +43,18 @@ def run(cre: str) -> None:
"""Print all topologies under the given CRE."""
cluster, *role_env = cre.split('/')
if not role_env:
- get_topologies = tracker_access.get_cluster_topologies
+ get_topologies = tracker.get_cluster_topologies
elif len(role_env) == 1:
- get_topologies = tracker_access.get_cluster_role_topologies
+ get_topologies = tracker.get_cluster_role_topologies
elif len(role_env) == 2:
- get_topologies = tracker_access.get_cluster_role_env_topologies
+ get_topologies = tracker.get_cluster_role_env_topologies
else:
Log.error("Invalid topologies selection")
sys.exit(1)
try:
result = get_topologies(cluster, *role_env)
- except Exception:
- Log.error("Fail to connect to tracker")
+ except requests.ConnectionError as e:
+ Log.error(f"Fail to connect to tracker: {e}")
sys.exit(1)
topologies = result[cluster]
table, header = to_table(topologies)
diff --git a/heron/tools/explorer/tests/python/explorer_unittest.py b/heron/tools/explorer/tests/python/explorer_unittest.py
index 830f773..93bb421 100644
--- a/heron/tools/explorer/tests/python/explorer_unittest.py
+++ b/heron/tools/explorer/tests/python/explorer_unittest.py
@@ -24,8 +24,8 @@ import os
import unittest
from unittest.mock import Mock
-import heron.tools.explorer.src.python.topologies as topologies
-from heron.tools.explorer.src.python.main import cli
+from heron.tools.common.src.python.clients import tracker
+from heron.tools.explorer.src.python import topologies
# pylint: disable=missing-docstring, no-self-use
diff --git a/heron/tools/tracker/src/python/BUILD b/heron/tools/tracker/src/python/BUILD
index 7b06a78..c04756d 100644
--- a/heron/tools/tracker/src/python/BUILD
+++ b/heron/tools/tracker/src/python/BUILD
@@ -8,7 +8,7 @@ pex_library(
),
reqs = [
"protobuf==3.8.0",
- "tornado==4.0.2",
+ "tornado==4.5.3",
"javaobj-py3==0.4.1",
"networkx==2.4",
],
diff --git a/heron/tools/ui/resources/static/js/stat-trendlines.js b/heron/tools/ui/resources/static/js/stat-trendlines.js
index 2f32229..8b845f8 100644
--- a/heron/tools/ui/resources/static/js/stat-trendlines.js
+++ b/heron/tools/ui/resources/static/js/stat-trendlines.js
@@ -216,7 +216,7 @@ function StatTrendlines(baseUrl, cluster, environ, toponame, physicalPlan, logic
.attr('class', 'text-center')
.html([
'<a class="btn btn-primary btn-xs" target="_blank" href="' + baseUrl + '/topologies/' + cluster + '/' + environ + '/' + toponame + '/' + container + '/file?path=./log-files/' + instance + '.log.0">logs</a>',
- '<a class="btn btn-primary btn-xs" target="_blank" href="' + baseUrl + '/topologies/filestats/' + cluster + '/' + environ + '/' + toponame + '/' + container + '">job</a>',
+ '<a class="btn btn-primary btn-xs" target="_blank" href="' + baseUrl + '/topologies/filestats/' + cluster + '/' + environ + '/' + toponame + '/' + container + '/file">files</a>',
'<a class="btn btn-primary btn-xs" target="_blank" href="' + baseUrl + '/topologies/' + cluster + '/' + environ + '/' + toponame + '/' + name + '/' + instance + '/exceptions">exceptions</a>',
'<br>',
].join(' '));
diff --git a/heron/tools/ui/resources/static/js/topologies.js b/heron/tools/ui/resources/static/js/topologies.js
index ff8dbbe..a9a104a 100644
--- a/heron/tools/ui/resources/static/js/topologies.js
+++ b/heron/tools/ui/resources/static/js/topologies.js
@@ -1482,29 +1482,21 @@ var InstanceCounters = React.createClass({
if (instanceInfo) {
var stmgrId = instanceInfo.stmgrId;
var container = stmgrId.split("-")[1]
- var logfileUrl = this.props.info.baseUrl + '/topologies/' + this.props.info.cluster
- + '/' + this.props.info.environ + '/' + this.props.info.topology
+ var topologyParams = this.props.info.cluster + '/' + this.props.info.environ
+ + '/' + this.props.info.topology
+ var instanceParams = topologyParams + '/' + instanceInfo.id
+ var logfileUrl = this.props.info.baseUrl + '/topologies/' + topologyParams
+ '/' + container + '/file?path=./log-files/' + instanceInfo.id + '.log.0'
- var jobUrl = this.props.info.baseUrl + '/topologies/filestats/' + this.props.info.cluster
- + '/' + this.props.info.environ + '/' + this.props.info.topology
- + '/' + container;
- var exceptionsUrl = this.props.info.baseUrl + '/topologies/' + this.props.info.cluster
- + '/' + this.props.info.environ + '/' + this.props.info.topology
+ var jobUrl = this.props.info.baseUrl + '/topologies/filestats/' + topologyParams
+ + '/' + container + '/file';
+ var exceptionsUrl = this.props.info.baseUrl + '/topologies/' + topologyParams
+ '/' + this.props.info.comp_name + '/' + instance + '/exceptions';
- var pidUrl = this.props.info.baseUrl + '/topologies/' + this.props.info.cluster
- + '/' + this.props.info.environ + '/' + this.props.info.topology
- + '/' + instanceInfo.id + '/pid'
- var jstackUrl = this.props.info.baseUrl + '/topologies/' + this.props.info.cluster
- + '/' + this.props.info.environ + '/' + this.props.info.topology
- + '/' + instanceInfo.id + '/jstack'
- var jmapUrl = this.props.info.baseUrl + '/topologies/' + this.props.info.cluster
- + '/' + this.props.info.environ + '/' + this.props.info.topology
- + '/' + instanceInfo.id + '/jmap'
- var histoUrl = this.props.info.baseUrl + '/topologies/' + this.props.info.cluster
- + '/' + this.props.info.environ + '/' + this.props.info.topology
- + '/' + instanceInfo.id + '/histo'
+ var pidUrl = this.props.info.baseUrl + '/topologies/' + instanceParams + '/pid'
+ var jstackUrl = this.props.info.baseUrl + '/topologies/' + instanceParams + '/jstack'
+ var jmapUrl = this.props.info.baseUrl + '/topologies/' + instanceParams + '/jmap'
+ var histoUrl = this.props.info.baseUrl + '/topologies/' + instanceParams + '/histo'
var links = [['Logs', logfileUrl, "_blank"],
- ['Job', jobUrl, "_blank"],
+ ['Files', jobUrl, "_blank"],
['Exceptions', exceptionsUrl, "_self"],
['Pid', pidUrl, "_self"],
['Stack', jstackUrl, "_self"],
diff --git a/heron/tools/ui/resources/templates/application.html b/heron/tools/ui/resources/templates/application.html
index 2ae474b..c86a3cc 100644
--- a/heron/tools/ui/resources/templates/application.html
+++ b/heron/tools/ui/resources/templates/application.html
@@ -23,30 +23,30 @@ under the License.
<meta charset="utf-8">
<title>Heron Admin</title>
<meta content="width=device-width, initial-scale=1.0, user-scalable=no" name="viewport">
- <link rel="shortcut icon" href="{{ static_url('img/logo54x54.png') }}" type="image/x-icon" >
- <script src="{{ static_url('js/html5-trunk.js') }}"></script>
+ <link rel="shortcut icon" href="{{ url_for('static', path='/img/logo54x54.png') }}" type="image/x-icon" >
+ <script src="{{ url_for('static', path='/js/html5-trunk.js') }}"></script>
<!-- bootstrap css -->
- <link href="{{ static_url('css/bootstrap.css') }}" rel="stylesheet">
+ <link href="{{ url_for('static', path='/css/bootstrap.css') }}" rel="stylesheet">
<!-- custom css -->
- <link href="{{ static_url('css/main.css') }}" rel="stylesheet">
+ <link href="{{ url_for('static', path='/css/main.css') }}" rel="stylesheet">
- <link href="{{ static_url('css/visstyle.css') }}" rel="stylesheet">
+ <link href="{{ url_for('static', path='/css/visstyle.css') }}" rel="stylesheet">
- <script src="{{ static_url('js/react.0.10.0.js') }}"></script>
- <script src="{{ static_url('js/JSXTransformer.0.10.0.js') }}"></script>
- <script src="{{ static_url('js/moment.min.2.8.3.js') }}"></script>
- <script src="{{ static_url('js/underscore-min.1.6.0.js') }}"></script>
+ <script src="{{ url_for('static', path='/js/react.0.10.0.js') }}"></script>
+ <script src="{{ url_for('static', path='/js/JSXTransformer.0.10.0.js') }}"></script>
+ <script src="{{ url_for('static', path='/js/moment.min.2.8.3.js') }}"></script>
+ <script src="{{ url_for('static', path='/js/underscore-min.1.6.0.js') }}"></script>
<!-- Scripts -->
- <script src="{{ static_url('js/jquery.min.js') }}"></script>
- <script src="{{ static_url('js/bootstrap.js') }}"></script>
- <script src="{{ static_url('js/jquery-ui-1.8.23.custom.min.js') }}"></script>
+ <script src="{{ url_for('static', path='/js/jquery.min.js') }}"></script>
+ <script src="{{ url_for('static', path='/js/bootstrap.js') }}"></script>
+ <script src="{{ url_for('static', path='/js/jquery-ui-1.8.23.custom.min.js') }}"></script>
- <script src="{{ static_url('js/list.min.1.1.1.js') }}"></script>
+ <script src="{{ url_for('static', path='/js/list.min.1.1.1.js') }}"></script>
- <script type="text/jsx" src="{{ static_url('js/common.js') }}" ></script>
+ <script type="text/jsx" src="{{ url_for('static', path='/js/common.js') }}" ></script>
<style type="text/css">
.logo-img{
@@ -81,10 +81,10 @@ under the License.
<!-- Main navigation start -->
<nav id="global-nav">
<div class="logo-img">
- <img src="{{ static_url('img/logo54x54.png') }}" alt="Heron">
+ <img src="{{ url_for('static', path='/img/logo54x54.png') }}" alt="Heron">
</div>
<!-- <div class="logo">H<span>eron</span></div> -->
- {% module Template("nav.html", active=active, className=function) %}
+ {% include "nav.html" %}
</nav>
<!-- Main navigation end -->
@@ -99,7 +99,7 @@ under the License.
<!-- Header end -->
<!-- Dashboard wrapper start -->
<div class="dashboard-wrapper">
- {% block content %}{% end %}
+ {% block content %}{% endblock %}
</div>
<!-- Dashboard wrapper end -->
</div>
diff --git a/heron/tools/ui/resources/templates/browse.html b/heron/tools/ui/resources/templates/browse.html
index 91b27e5..39772d1 100644
--- a/heron/tools/ui/resources/templates/browse.html
+++ b/heron/tools/ui/resources/templates/browse.html
@@ -14,33 +14,52 @@
<html>
<head>
-<link rel="shortcut icon" href="{{ static_url('img/logo54x54.png') }}" type="image/x-icon" >
-<link href="{{ static_url('css/bootstrap.css') }}" rel="stylesheet">
+<link rel="shortcut icon" href="{{ url_for("static", path="img/logo54x54.png") }}" type="image/x-icon" >
+<link href="{{ url_for("static", path="css/bootstrap.css") }}" rel="stylesheet">
<style type="text/css">
div.tight
{
height:85%;
overflow:auto;
}
+ul.files li
+{
+ list-style: none;
+ line-height: normal;
+ font-family: monospace;
+ white-space: pre;
+}
</style>
-<title>Path browser for {{path}}</title>
+<title>Path browser for {{ path }}</title>
</head>
<body>
<div class="container">
- <div class="span6">
- <strong> Path </strong> {{path}}
- </div>
+ <div class="span6">
+ <strong> Path </strong> {{ path }}
+ </div>
- <div class="span12 tight">
- <pre>
-{% for fn in sorted(filestats.keys()) %} {% if not filestats[fn]["is_dir"] %}
- {{ filestats[fn]["formatted_stat"] }} <a href='{{baseUrl}}/topologies/{{cluster}}/{{environ}}/{{topology}}/{{container}}/file?path={{filestats[fn]["path"]}}'>{{ fn }}</a> <a href='/topologies/{{cluster}}/{{environ}}/{{topology}}/{{container}}/filedownload?path={{filestats[fn]["path"]}}'><font size=1>dl</font></a> {% else %}
- {{ filestats[fn]["formatted_stat"] }} <a href='{{baseUrl}}/topologies/filestats/{{cluster}}/{{environ}}/{{topology}}/{{container}}?path={{filestats[fn]["path"]}}'>{{ fn }}</a> {% end %} {% end %}
- </pre>
- </div>
+ <div class="span12 tight">
+
+ <ul class="files">
+ {%- for fpath, finfo in filestats | dictsort -%}
+ <li {%- if finfo.is_dir%} class="directory" {%- endif %}>
+ {#- All rows have the preformatted filestats -#}
+ <span class="file-stat">{{- finfo.formatted_stat -}}</span>
+ {%- if fpath == ".." -%}
+ <a href="{{ url_for("file_stats_page", cluster=cluster, environment=environ, topology=topology, container=container) }}?path={{ path.rpartition("/")[0] | urlencode }}" title="explore"> ../</a>
+ {%- elif finfo.is_dir -%}
+ <a href="{{ url_for("file_stats_page", cluster=cluster, environment=environ, topology=topology, container=container) }}?path={{ "{}/{}".format(path, fpath) | urlencode }}" title="explore"> {{ fpath }}/</a>
+ {%- else -%}
+ <a href="{{ url_for("file_page", cluster=cluster, environment=environ, topology=topology, container=container) }}?path={{ "{}/{}".format(path, fpath) | urlencode }}" title="preview"> {{ fpath }}</a>
+ {#- -#}
+ <a href="{{ url_for("file_download", cluster=cluster, environment=environ, topology=topology, container=container) }}?path={{ "{}/{}".format(path, fpath) | urlencode }}" title="download"><span class="glyphicon glyphicon-download"></span></a>
+ {%- endif -%}
+ </li>
+ {%- endfor -%}
+ </ul>
+ </div>
</div>
</body>
-
</html>
diff --git a/heron/tools/ui/resources/templates/config.html b/heron/tools/ui/resources/templates/config.html
index 9d25fa5..e9d0706 100644
--- a/heron/tools/ui/resources/templates/config.html
+++ b/heron/tools/ui/resources/templates/config.html
@@ -31,8 +31,8 @@ under the License.
<!-- Row end -->
</div>
-<script src="{{ static_url('js/d3.min.3.4.11.js') }}"></script>
-<script type="text/jsx" src="{{ static_url('js/config.js') }}"></script>
+<script src="{{ url_for('static', path='js/d3.min.3.4.11.js') }}"></script>
+<script type="text/jsx" src="{{ url_for('static', path='js/config.js') }}"></script>
<script type="application/javascript">
window.onload = function() {
@@ -49,4 +49,4 @@ under the License.
document.getElementById("display-config")
);
</script>
-{% end %}
+{% endblock %}
diff --git a/heron/tools/ui/resources/templates/error.html b/heron/tools/ui/resources/templates/error.html
index 0b549f4..a22469b 100644
--- a/heron/tools/ui/resources/templates/error.html
+++ b/heron/tools/ui/resources/templates/error.html
@@ -41,14 +41,14 @@ under the License.
<meta charset="utf-8">
<title>Heron Admin</title>
<meta content="width=device-width, initial-scale=1.0, user-scalable=no" name="viewport">
- <link rel="shortcut icon" href="{{ static_url('img/logo54x54.png') }}" type="image/x-icon" >
- <script src="{{ static_url('js/html5-trunk.js') }}"></script>
+ <link rel="shortcut icon" href="{{ url_for('static', path='img/logo54x54.png') }}" type="image/x-icon" >
+ <script src="{{ url_for('static', path='js/html5-trunk.js') }}"></script>
<!-- bootstrap css -->
- <link href="{{ static_url('css/bootstrap.css') }}" rel="stylesheet">
+ <link href="{{ url_for('static', path='css/bootstrap.css') }}" rel="stylesheet">
<!-- custom css -->
- <link href="{{ static_url('css/main.css') }}" rel="stylesheet">
+ <link href="{{ url_for('static', path='css/main.css') }}" rel="stylesheet">
</head>
diff --git a/heron/tools/ui/resources/templates/exception.html b/heron/tools/ui/resources/templates/exception.html
index 567322a..5531f31 100644
--- a/heron/tools/ui/resources/templates/exception.html
+++ b/heron/tools/ui/resources/templates/exception.html
@@ -31,8 +31,8 @@ under the License.
<!-- Row end -->
</div>
-<script src="{{ static_url('js/d3.min.3.4.11.js') }}"></script>
-<script type="text/jsx" src="{{ static_url('js/exceptions.js') }}"></script>
+<script src="{{ url_for('static', path='js/d3.min.3.4.11.js') }}"></script>
+<script type="text/jsx" src="{{ url_for('static', path='js/exceptions.js') }}"></script>
<script type="application/javascript">
window.onload = function() {
@@ -49,4 +49,4 @@ under the License.
document.getElementById("display-exceptions")
);
</script>
-{% end %}
+{% endblock %}
diff --git a/heron/tools/ui/resources/templates/file.html b/heron/tools/ui/resources/templates/file.html
index 75a48a4..38b678e 100644
--- a/heron/tools/ui/resources/templates/file.html
+++ b/heron/tools/ui/resources/templates/file.html
@@ -17,8 +17,8 @@
<head>
<meta charset="utf-8">
<title></title>
- <link rel="shortcut icon" href="{{ static_url('img/logo54x54.png') }}" type="image/x-icon" >
- <link href="{{ static_url('css/bootstrap.css') }}" rel="stylesheet">
+ <link rel="shortcut icon" href="{{ url_for('static', path='/img/logo54x54.png') }}" type="image/x-icon" >
+ <link href="{{ url_for('static', path='/css/bootstrap.css') }}" rel="stylesheet">
<style type="text/css">
.log {
font-family: "Inconsolata", "Monaco", "Courier New", "Courier";
@@ -32,8 +32,8 @@
background: #000000;
}
</style>
- <script src="{{ static_url('js/jquery.min.js') }}"></script>
- <script src="{{ static_url('js/jquery.pailer.js') }}"></script>
+ <script src="{{ url_for('static', path='/js/jquery.min.js') }}"></script>
+ <script src="{{ url_for('static', path='/js/jquery.pailer.js') }}"></script>
</head>
<style type="text/css">
diff --git a/heron/tools/ui/resources/templates/nav.html b/heron/tools/ui/resources/templates/nav.html
index eee4a55..00c3ccf 100644
--- a/heron/tools/ui/resources/templates/nav.html
+++ b/heron/tools/ui/resources/templates/nav.html
@@ -19,11 +19,11 @@ under the License.
<div id="mainnav">
<ul style="margin-left:5px;">
- <li class="{{ className(active, 'topologies') }}">
- {% if className(active, 'topologies') %}
+ <li {% if active == 'topologies' %}class="active"{% endif %}>
+ {% if active == 'topologies' %}
<span class="current-arrow"> </span>
- {% end %}
- <a href="{{ reverse_url('topologies') }}">
+ {% endif %}
+ <a href="{{ url_for('topologies_page') }}">
<div class="icon">
<span class="fs1" aria-hidden="true" data-icon=""></span>
</div>
diff --git a/heron/tools/ui/resources/templates/shell.snip.html b/heron/tools/ui/resources/templates/shell.snip.html
new file mode 100644
index 0000000..652780f
--- /dev/null
+++ b/heron/tools/ui/resources/templates/shell.snip.html
@@ -0,0 +1,8 @@
+{{ info | safe }}
+<pre>
+<br/>
+${{ host }}>: {{ command }}
+<br/>
+<br/>
+{{ output }}
+</pre>
diff --git a/heron/tools/ui/resources/templates/topologies.html b/heron/tools/ui/resources/templates/topologies.html
index 856a95c..926c384 100644
--- a/heron/tools/ui/resources/templates/topologies.html
+++ b/heron/tools/ui/resources/templates/topologies.html
@@ -32,8 +32,8 @@ under the License.
<!-- Row end -->
</div>
-<script src="{{ static_url('js/d3.min.3.4.11.js') }}"></script>
-<script type="text/jsx" src="{{ static_url('js/alltopologies.js') }}" ></script>
+<script src="{{ url_for('static', path='/js/d3.min.3.4.11.js') }}"></script>
+<script type="text/jsx" src="{{ url_for('static', path='/js/alltopologies.js') }}" ></script>
<script type="application/javascript">
window.onload = function() {
d3.select("#page-title")
@@ -48,8 +48,8 @@ under the License.
/** @jsx React.DOM */
React.renderComponent(
- <FilterableTopologyTable clusters={ {% raw clusters %} } />,
+ <FilterableTopologyTable clusters={ {{ clusters | safe }} } />,
document.getElementById('topotable-container')
);
</script>
-{% end %}
+{% endblock %}
diff --git a/heron/tools/ui/resources/templates/topology.html b/heron/tools/ui/resources/templates/topology.html
index c109625..f6e8f53 100644
--- a/heron/tools/ui/resources/templates/topology.html
+++ b/heron/tools/ui/resources/templates/topology.html
@@ -75,10 +75,10 @@ under the License.
<td>{{launched}}</td>
<td>{{execution_state['submission_user']}}</td>
<td>
- <a id="jobPageLink" class="btn btn-primary btn-xs" href={{job_page_link}} target="_blank">Job</a>
+ <a id="jobPageLink" class="btn btn-primary btn-xs" href={{job_page_link}} target="_blank">Files</a>
{% for extra_link in execution_state['extra_links'] %}
<a id="{{extra_link['name']}}" class="btn btn-primary btn-xs" href="{{extra_link['url']}}" target="_blank">{{extra_link['name']}}</a>
- {% end %}
+ {% endfor %}
</td>
</tr>
</tbody>
@@ -234,17 +234,17 @@ under the License.
</div>
</div>
-<script src="{{ static_url('js/d3.min.3.4.11.js') }}"></script>
-<script src="{{ static_url('js/d3-tip.min.0.6.3.js') }}"></script>
-<script src="{{ static_url('js/stat-trendlines.js') }}"></script>
-<script src="{{ static_url('js/plan-controller.js') }}"></script>
-<script src="{{ static_url('js/physical-plan.js') }}"></script>
-<script src="{{ static_url('js/logical-plan.js') }}"></script>
-<script src="{{ static_url('js/plan-stats.js') }}"></script>
-<script src="{{ static_url('js/colors.js') }}"></script>
-<script src="{{ static_url('js/topology-details.js') }}"></script>
-<script type="text/jsx" src="{{ static_url('js/topologies.js') }}" ></script>
-<script type="text/jsx" src="{{ static_url('js/config.js') }}"></script>
+<script src="{{ url_for('static', path='js/d3.min.3.4.11.js') }}"></script>
+<script src="{{ url_for('static', path='js/d3-tip.min.0.6.3.js') }}"></script>
+<script src="{{ url_for('static', path='js/stat-trendlines.js') }}"></script>
+<script src="{{ url_for('static', path='js/plan-controller.js') }}"></script>
+<script src="{{ url_for('static', path='js/physical-plan.js') }}"></script>
+<script src="{{ url_for('static', path='js/logical-plan.js') }}"></script>
+<script src="{{ url_for('static', path='js/plan-stats.js') }}"></script>
+<script src="{{ url_for('static', path='js/colors.js') }}"></script>
+<script src="{{ url_for('static', path='js/topology-details.js') }}"></script>
+<script type="text/jsx" src="{{ url_for('static', path='js/topologies.js') }}" ></script>
+<script type="text/jsx" src="{{ url_for('static', path='js/config.js') }}"></script>
<script type="application/javascript">
function calculateParallelism() {
@@ -386,4 +386,4 @@ under the License.
);
</script>
-{% end %}
+{% endblock %}
diff --git a/heron/tools/ui/src/python/BUILD b/heron/tools/ui/src/python/BUILD
index 100d9a5..187eea5 100644
--- a/heron/tools/ui/src/python/BUILD
+++ b/heron/tools/ui/src/python/BUILD
@@ -8,7 +8,11 @@ pex_library(
),
reqs = [
"requests==2.12.3",
- "tornado==4.0.2",
+ "click==7.1.2",
+ "fastapi==0.60.1",
+ "jinja2==2.11.2",
+ "aiofiles==0.5.0",
+ "uvicorn==0.11.7",
],
deps = [
"//heron/common/src/python:common-py",
diff --git a/heron/tools/ui/src/python/args.py b/heron/tools/ui/src/python/args.py
deleted file mode 100644
index 54022f4..0000000
--- a/heron/tools/ui/src/python/args.py
+++ /dev/null
@@ -1,134 +0,0 @@
-#!/usr/bin/env python3
-# -*- encoding: utf-8 -*-
-
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-''' args.py '''
-import argparse
-
-import heron.tools.ui.src.python.consts as consts
-
-
-# pylint: disable=protected-access,superfluous-parens
-class _HelpAction(argparse._HelpAction):
- def __call__(self, parser, namespace, values, option_string=None):
- parser.print_help()
-
- # retrieve subparsers from parser
- subparsers_actions = [
- action for action in parser._actions
- if isinstance(action, argparse._SubParsersAction)
- ]
-
- # there will probably only be one subparser_action,
- # but better save than sorry
- for subparsers_action in subparsers_actions:
- # get all subparsers and print help
- for choice, subparser in list(subparsers_action.choices.items()):
- print("Subparser '{}'".format(choice))
- print(subparser.format_help())
-
- parser.exit()
-
-
-class SubcommandHelpFormatter(argparse.RawDescriptionHelpFormatter):
- ''' SubcommandHelpFormatter '''
-
- def _format_action(self, action):
- # pylint: disable=bad-super-call
- parts = super(argparse.RawDescriptionHelpFormatter, self)._format_action(action)
- if action.nargs == argparse.PARSER:
- parts = "\n".join(parts.split("\n")[1:])
- return parts
-
-
-def add_titles(parser):
- '''
- :param parser:
- :return:
- '''
- parser._positionals.title = "Required arguments"
- parser._optionals.title = "Optional arguments"
- return parser
-
-
-def add_arguments(parser):
- '''
- :param parser:
- :return:
- '''
- parser.add_argument(
- '--tracker_url',
- metavar='(a url; path to tracker; default: "' + consts.DEFAULT_TRACKER_URL + '")',
- default=consts.DEFAULT_TRACKER_URL)
-
- parser.add_argument(
- '--address',
- metavar='(an string; address to listen; default: "' + consts.DEFAULT_ADDRESS + '")',
- default=consts.DEFAULT_ADDRESS)
-
- parser.add_argument(
- '--port',
- metavar='(an integer; port to listen; default: ' + str(consts.DEFAULT_PORT) + ')',
- type=int,
- default=consts.DEFAULT_PORT)
-
- parser.add_argument(
- '--base_url',
- metavar='(a string; the base url path if operating behind proxy; default: '
- + str(consts.DEFAULT_BASE_URL) + ')',
- default=consts.DEFAULT_BASE_URL)
-
- return parser
-
-
-def create_parsers():
- '''
- :return:
- '''
- parser = argparse.ArgumentParser(
- epilog='For detailed documentation, go to https://heron.incubator.apache.org',
- usage="%(prog)s [options] [help]",
- add_help=False)
-
- parser = add_titles(parser)
- parser = add_arguments(parser)
-
- # create the child parser for subcommand
- child_parser = argparse.ArgumentParser(
- parents=[parser],
- formatter_class=SubcommandHelpFormatter,
- add_help=False)
-
- # subparser for each command
- subparsers = child_parser.add_subparsers(
- title="Available commands")
-
- help_parser = subparsers.add_parser(
- 'help',
- help='Prints help',
- add_help=False)
-
- version_parser = subparsers.add_parser(
- 'version',
- help='Prints version',
- add_help=True)
-
- help_parser.set_defaults(help=True)
- version_parser.set_defaults(version=True)
- return (parser, child_parser)
diff --git a/heron/tools/ui/src/python/consts.py b/heron/tools/ui/src/python/consts.py
deleted file mode 100644
index 7116a53..0000000
--- a/heron/tools/ui/src/python/consts.py
+++ /dev/null
@@ -1,35 +0,0 @@
-#!/usr/bin/env python3
-# -*- encoding: utf-8 -*-
-
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-''' consts.py '''
-import heron.tools.common.src.python.utils.config as common_config
-
-# default parameter - address for the web to ui to listen on
-DEFAULT_ADDRESS = "0.0.0.0"
-
-# default parameter - port for the web to ui to listen on
-DEFAULT_PORT = 8889
-
-# default parameter - url to connect to heron tracker
-DEFAULT_TRACKER_URL = "http://127.0.0.1:8888"
-
-DEFAULT_BASE_URL = ""
-
-VERSION = common_config.get_version_number()
diff --git a/heron/tools/ui/src/python/handlers/__init__.py b/heron/tools/ui/src/python/handlers/__init__.py
deleted file mode 100644
index 32aa1dc..0000000
--- a/heron/tools/ui/src/python/handlers/__init__.py
+++ /dev/null
@@ -1,36 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-''' handler module '''
-__all__ = ['handlers']
-
-from heron.tools.ui.src.python.handlers import api
-
-from heron.tools.ui.src.python.handlers.base import BaseHandler
-from heron.tools.ui.src.python.handlers.mainhandler import MainHandler
-from heron.tools.ui.src.python.handlers.notfound import NotFoundHandler
-
-################################################################################
-# Handlers for topology related requests
-################################################################################
-from heron.tools.ui.src.python.handlers.topology import ContainerFileDataHandler
-from heron.tools.ui.src.python.handlers.topology import ContainerFileDownloadHandler
-from heron.tools.ui.src.python.handlers.topology import ContainerFileHandler
-from heron.tools.ui.src.python.handlers.topology import ContainerFileStatsHandler
-from heron.tools.ui.src.python.handlers.topology import ListTopologiesHandler
-from heron.tools.ui.src.python.handlers.topology import TopologyPlanHandler
-from heron.tools.ui.src.python.handlers.topology import TopologyConfigHandler
-from heron.tools.ui.src.python.handlers.topology import TopologyExceptionsPageHandler
diff --git a/heron/tools/ui/src/python/handlers/api/__init__.py b/heron/tools/ui/src/python/handlers/api/__init__.py
deleted file mode 100644
index e36109d..0000000
--- a/heron/tools/ui/src/python/handlers/api/__init__.py
+++ /dev/null
@@ -1,36 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-''' api module '''
-from .metrics import (
- MetricsHandler,
- MetricsTimelineHandler
-)
-
-from .topology import (
- TopologyExceptionSummaryHandler,
- ListTopologiesJsonHandler,
- TopologyLogicalPlanJsonHandler,
- TopologyPackingPlanJsonHandler,
- TopologyPhysicalPlanJsonHandler,
- TopologySchedulerLocationJsonHandler,
- TopologyExecutionStateJsonHandler,
- TopologyExceptionsJsonHandler,
- PidHandler,
- JstackHandler,
- MemoryHistogramHandler,
- JmapHandler
-)
diff --git a/heron/tools/ui/src/python/handlers/api/metrics.py b/heron/tools/ui/src/python/handlers/api/metrics.py
deleted file mode 100644
index d70b1fd..0000000
--- a/heron/tools/ui/src/python/handlers/api/metrics.py
+++ /dev/null
@@ -1,99 +0,0 @@
-#!/usr/bin/env python3
-# -*- encoding: utf-8 -*-
-
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-''' metrics.py '''
-from heron.tools.ui.src.python.handlers import base
-import heron.tools.common.src.python.access as access
-
-import tornado.gen
-
-ALL_INSTANCES = '*'
-
-# pylint: disable=invalid-name
-query_handler = access.HeronQueryHandler()
-
-
-class MetricsHandler(base.BaseHandler):
- ''' MetricsHandler '''
-
- @tornado.gen.coroutine
- def get(self):
- '''
- :return:
- '''
- cluster = self.get_argument("cluster")
- environ = self.get_argument("environ")
- topology = self.get_argument("topology")
- component = self.get_argument("component", default=None)
- metricnames = self.get_arguments("metricname")
- instances = self.get_arguments("instance")
- interval = self.get_argument("interval", default=-1)
- time_range = (0, interval)
- compnames = [component] if component else (yield access.get_comps(cluster, environ, topology))
-
- # fetch the metrics
- futures = {}
- for comp in compnames:
- future = access.get_comp_metrics(
- cluster, environ, topology, comp, instances,
- metricnames, time_range)
- futures[comp] = future
-
- results = yield futures
-
- self.write(results[component] if component else results)
-
-
-class MetricsTimelineHandler(base.BaseHandler):
- ''' MetricsTimelineHandler '''
-
- @tornado.gen.coroutine
- def get(self):
- '''
- :return:
- '''
- cluster = self.get_argument("cluster")
- environ = self.get_argument("environ")
- topology = self.get_argument("topology")
- component = self.get_argument("component", default=None)
- metric = self.get_argument("metric")
- instances = self.get_argument("instance")
- start = self.get_argument("starttime")
- end = self.get_argument("endtime")
- maxquery = self.get_argument("max", default=False)
- timerange = (start, end)
- compnames = [component]
-
- # fetch the metrics
- futures = {}
- if metric == "backpressure":
- for comp in compnames:
- future = query_handler.fetch_backpressure(cluster, metric, topology, component,
- instances, timerange, maxquery, environ)
- futures[comp] = future
- else:
- fetch = query_handler.fetch_max if maxquery else query_handler.fetch
- for comp in compnames:
- future = fetch(cluster, metric, topology, component,
- instances, timerange, environ)
- futures[comp] = future
-
- results = yield futures
- self.write(results[component] if component else results)
diff --git a/heron/tools/ui/src/python/handlers/api/topology.py b/heron/tools/ui/src/python/handlers/api/topology.py
deleted file mode 100644
index c519bc5..0000000
--- a/heron/tools/ui/src/python/handlers/api/topology.py
+++ /dev/null
@@ -1,358 +0,0 @@
-#!/usr/bin/env python3
-# -*- encoding: utf-8 -*-
-
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-''' topology.py '''
-import json
-import time
-import tornado.escape
-import tornado.web
-import tornado.gen
-
-from heron.tools.ui.src.python.handlers import base
-import heron.tools.common.src.python.access as access
-from heron.tools.ui.src.python.handlers import common
-
-
-class TopologyExceptionSummaryHandler(base.BaseHandler):
- ''' TopologyExceptionSummaryHandler '''
-
- @tornado.gen.coroutine
- def get(self, cluster, environ, topology, comp_name):
- '''
- :param cluster:
- :param environ:
- :param topology:
- :param comp_name:
- :return:
- '''
- start_time = time.time()
- comp_names = []
- if comp_name == "All":
- lplan = yield access.get_logical_plan(cluster, environ, topology)
- if not lplan:
- self.write(dict())
- return
-
- if not 'spouts' in lplan or not 'bolts' in lplan:
- self.write(dict())
- return
- comp_names = list(lplan['spouts'].keys())
- comp_names.extend(list(lplan['bolts'].keys()))
- else:
- comp_names = [comp_name]
- exception_infos = dict()
- for comp_name_ in comp_names:
- exception_infos[comp_name_] = yield access.get_component_exceptionsummary(
- cluster, environ, topology, comp_name_)
-
- # Combine exceptions from multiple component
- aggregate_exceptions = dict()
- for comp_name_, exception_logs in list(exception_infos.items()):
- for exception_log in exception_logs:
- class_name = exception_log['class_name']
- if class_name != '':
- if not class_name in aggregate_exceptions:
- aggregate_exceptions[class_name] = 0
- aggregate_exceptions[class_name] += int(exception_log['count'])
- # Put the exception value in a table
- aggregate_exceptions_table = []
- for key in aggregate_exceptions:
- aggregate_exceptions_table.append([key, str(aggregate_exceptions[key])])
- result = dict(
- status="success",
- executiontime=time.time() - start_time,
- result=aggregate_exceptions_table)
- self.write(result)
-
-
-class ListTopologiesJsonHandler(base.BaseHandler):
- ''' ListTopologiesJsonHandler '''
-
- @tornado.gen.coroutine
- def get(self):
- '''
- :return:
- '''
- # get all the topologies from heron nest
- topologies = yield access.get_topologies_states()
-
- result = dict()
-
- # now convert some of the fields to be displayable
- for cluster, cluster_value in list(topologies.items()):
- result[cluster] = dict()
- for environ, environ_value in list(cluster_value.items()):
- result[cluster][environ] = dict()
- for topology, topology_value in list(environ_value.items()):
- if "jobname" not in topology_value or topology_value["jobname"] is None:
- continue
-
- if "submission_time" in topology_value:
- topology_value["submission_time"] = topology_value["submission_time"]
- else:
- topology_value["submission_time"] = '-'
-
- result[cluster][environ][topology] = topology_value
-
- self.write(result)
-
-
-class TopologyLogicalPlanJsonHandler(base.BaseHandler):
- ''' TopologyLogicalPlanJsonHandler '''
-
- @tornado.gen.coroutine
- def get(self, cluster, environ, topology):
- '''
- :param cluster:
- :param environ:
- :param topology:
- :return:
- '''
-
- start_time = time.time()
- lplan = yield access.get_logical_plan(cluster, environ, topology)
-
- # construct the result
- result = dict(
- status="success",
- message="",
- version=common.VERSION,
- executiontime=time.time() - start_time,
- result=lplan
- )
-
- self.write(result)
-
-
-class TopologyPackingPlanJsonHandler(base.BaseHandler):
- ''' TopologyPackingPlanJsonHandler '''
-
- @tornado.gen.coroutine
- def get(self, cluster, environ, topology):
- '''
- :param cluster:
- :param environ:
- :param topology:
- :return:
- '''
-
- start_time = time.time()
- packing_plan = yield access.get_packing_plan(cluster, environ, topology)
-
- result_map = dict(
- status="success",
- message="",
- version=common.VERSION,
- executiontime=time.time() - start_time,
- result=packing_plan
- )
-
- self.write(result_map)
-
-
-class TopologyPhysicalPlanJsonHandler(base.BaseHandler):
- ''' TopologyPhysicalPlanJsonHandler '''
-
- @tornado.gen.coroutine
- def get(self, cluster, environ, topology):
- '''
- :param cluster:
- :param environ:
- :param topology:
- :return:
- '''
-
- start_time = time.time()
- pplan = yield access.get_physical_plan(cluster, environ, topology)
-
- result_map = dict(
- status="success",
- message="",
- version=common.VERSION,
- executiontime=time.time() - start_time,
- result=pplan
- )
-
- self.write(result_map)
-
-
-class TopologyExecutionStateJsonHandler(base.BaseHandler):
- ''' TopologyExecutionStateJsonHandler '''
-
- @tornado.gen.coroutine
- def get(self, cluster, environ, topology):
- '''
- :param cluster:
- :param environ:
- :param topology:
- :return:
- '''
- start_time = time.time()
- estate = yield access.get_execution_state(cluster, environ, topology)
-
- result_map = dict(
- status="success",
- message="",
- version=common.VERSION,
- executiontime=time.time() - start_time,
- result=estate
- )
-
- self.write(result_map)
-
-
-class TopologySchedulerLocationJsonHandler(base.BaseHandler):
- ''' TopologySchedulerLocationJsonHandler '''
-
- # pylint: disable=unused-argument
- @tornado.gen.coroutine
- def get(self, cluster, environ, topology):
- '''
- :param cluster:
- :param environ:
- :param topology:
- :return:
- '''
- start_time = time.time()
- estate = yield access.get_execution_state(cluster, environ, topology)
-
- result_map = dict(
- status="success",
- message="",
- version=common.VERSION,
- executiontime=time.time() - start_time,
- result=estate
- )
-
- self.write(result_map)
-
-
-class TopologyExceptionsJsonHandler(base.BaseHandler):
- ''' Handler for getting exceptions '''
-
- @tornado.gen.coroutine
- def get(self, cluster, environ, topology, component):
- '''
- :param cluster:
- :param environ:
- :param topology:
- :param component:
- :return:
- '''
- start_time = time.time()
- futures = yield access.get_component_exceptions(cluster, environ, topology, component)
- result_map = dict(
- status='success',
- executiontime=time.time() - start_time,
- result=futures)
- self.write(json.dumps(result_map))
-
-
-class PidHandler(base.BaseHandler):
- ''' PidHandler '''
-
- @tornado.gen.coroutine
- def get(self, cluster, environ, topology, instance):
- '''
- :param cluster:
- :param environ:
- :param topology:
- :param instance:
- :return:
- '''
- pplan = yield access.get_physical_plan(cluster, environ, topology)
- host = pplan['stmgrs'][pplan['instances'][instance]['stmgrId']]['host']
- result = json.loads((yield access.get_instance_pid(
- cluster, environ, topology, instance)))
- self.write('<pre><br/>$%s>: %s<br/><br/>%s</pre>' % (
- host,
- tornado.escape.xhtml_escape(result['command']),
- tornado.escape.xhtml_escape(result['stdout'])))
-
-
-class JstackHandler(base.BaseHandler):
- ''' JstackHandler '''
-
- @tornado.gen.coroutine
- def get(self, cluster, environ, topology, instance):
- '''
- :param cluster:
- :param environ:
- :param topology:
- :param instance:
- :return:
- '''
- pplan = yield access.get_physical_plan(cluster, environ, topology)
- host = pplan['stmgrs'][pplan['instances'][instance]['stmgrId']]['host']
- result = json.loads((yield access.get_instance_jstack(
- cluster, environ, topology, instance)))
- self.write('<pre><br/>$%s>: %s<br/><br/>%s</pre>' % (
- host,
- tornado.escape.xhtml_escape(result['command']),
- tornado.escape.xhtml_escape(result['stdout'])))
-
-
-class MemoryHistogramHandler(base.BaseHandler):
- ''' MemoryHistogramHandler '''
-
- @tornado.gen.coroutine
- def get(self, cluster, environ, topology, instance):
- '''
- :param cluster:
- :param environ:
- :param topology:
- :param instance:
- :return:
- '''
- pplan = yield access.get_physical_plan(cluster, environ, topology)
- host = pplan['stmgrs'][pplan['instances'][instance]['stmgrId']]['host']
- result = json.loads((yield access.get_instance_mem_histogram(
- cluster, environ, topology, instance)))
- self.write('<pre><br/>$%s>: %s<br/><br/>%s</pre>' % (
- host,
- tornado.escape.xhtml_escape(result['command']),
- tornado.escape.xhtml_escape(result['stdout'])))
-
-
-class JmapHandler(base.BaseHandler):
- ''' JmapHandler '''
-
- @tornado.gen.coroutine
- def get(self, cluster, environ, topology, instance):
- '''
- :param cluster:
- :param environ:
- :param topology:
- :param instance:
- :return:
- '''
- pplan = yield access.get_physical_plan(cluster, environ, topology)
- host = pplan['stmgrs'][pplan['instances'][instance]['stmgrId']]['host']
- result = json.loads((yield access.run_instance_jmap(
- cluster, environ, topology, instance)))
- notes = "<br/>\n".join([
- "* May Take longer than usual (1-2 min) please be patient."
- "* Use scp to copy heap dump files from host. (scp %s:/tmp/heap.bin /tmp/)" % host
- ])
- self.write('<pre>%s<br/>$%s>: %s<br/><br/>%s</pre>' % (
- notes,
- host,
- tornado.escape.xhtml_escape(result['command']),
- tornado.escape.xhtml_escape(result['stdout'])))
diff --git a/heron/tools/ui/src/python/handlers/base.py b/heron/tools/ui/src/python/handlers/base.py
deleted file mode 100644
index 059f3f6..0000000
--- a/heron/tools/ui/src/python/handlers/base.py
+++ /dev/null
@@ -1,45 +0,0 @@
-#!/usr/bin/env python3
-# -*- encoding: utf-8 -*-
-
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-''' base.py '''
-import tornado.ioloop
-import tornado.web
-import tornado.httpserver
-
-
-# pylint: disable=abstract-method
-class BaseHandler(tornado.web.RequestHandler):
- ''' BaseHandler '''
-
- def write_error(self, status_code, **kwargs):
- '''
- :param status_code:
- :param kwargs:
- :return:
- '''
- if "exc_info" in kwargs:
- exc_info = kwargs["exc_info"]
- error = exc_info[1]
-
- errormessage = "%s: %s" % (status_code, error)
- self.render("error.html", errormessage=errormessage)
- else:
- errormessage = "%s" % (status_code)
- self.render("error.html", errormessage=errormessage)
diff --git a/heron/tools/ui/src/python/handlers/common/__init__.py b/heron/tools/ui/src/python/handlers/common/__init__.py
deleted file mode 100644
index 4ace12b..0000000
--- a/heron/tools/ui/src/python/handlers/common/__init__.py
+++ /dev/null
@@ -1,21 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-''' common module '''
-__all__ = ['common']
-
-from heron.tools.ui.src.python.handlers.common.consts import *
-from heron.tools.ui.src.python.handlers.common.utils import *
diff --git a/heron/tools/ui/src/python/handlers/common/consts.py b/heron/tools/ui/src/python/handlers/common/consts.py
deleted file mode 100644
index d033153..0000000
--- a/heron/tools/ui/src/python/handlers/common/consts.py
+++ /dev/null
@@ -1,22 +0,0 @@
-#!/usr/bin/env python3
-# -*- encoding: utf-8 -*-
-
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-''' consts.py '''
-VERSION = "1.0.0"
diff --git a/heron/tools/ui/src/python/handlers/common/utils.py b/heron/tools/ui/src/python/handlers/common/utils.py
deleted file mode 100644
index ccb7b85..0000000
--- a/heron/tools/ui/src/python/handlers/common/utils.py
+++ /dev/null
@@ -1,33 +0,0 @@
-#!/usr/bin/env python3
-# -*- encoding: utf-8 -*-
-
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-''' utils.py '''
-
-
-# pylint: disable=invalid-name
-def className(selected, item):
- '''
- :param selected:
- :param item:
- :return:
- '''
- if selected == item:
- return "active"
- return ''
diff --git a/heron/tools/ui/src/python/handlers/mainhandler.py b/heron/tools/ui/src/python/handlers/mainhandler.py
deleted file mode 100644
index 59b5b5d..0000000
--- a/heron/tools/ui/src/python/handlers/mainhandler.py
+++ /dev/null
@@ -1,32 +0,0 @@
-#!/usr/bin/env python3
-# -*- encoding: utf-8 -*-
-
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-''' mainhandler.py '''
-from heron.tools.ui.src.python.handlers import base
-
-
-class MainHandler(base.BaseHandler):
- ''' MainHandler'''
-
- def get(self):
- '''
- :return:
- '''
- self.redirect("/topologies")
diff --git a/heron/tools/ui/src/python/handlers/notfound.py b/heron/tools/ui/src/python/handlers/notfound.py
deleted file mode 100644
index 57c62a0..0000000
--- a/heron/tools/ui/src/python/handlers/notfound.py
+++ /dev/null
@@ -1,36 +0,0 @@
-#!/usr/bin/env python3
-# -*- encoding: utf-8 -*-
-
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-''' notfound.py '''
-from heron.tools.ui.src.python.handlers import base
-
-
-class NotFoundHandler(base.BaseHandler):
- ''' NotFoundHandler '''
-
- # pylint: disable=unused-argument
- def get(self, *args, **kwargs):
- '''
- :param args:
- :param kwargs:
- :return:
- '''
- errormessage = "Sorry, we could not find this page"
- self.render("error.html", errormessage=errormessage)
diff --git a/heron/tools/ui/src/python/handlers/ranges.py b/heron/tools/ui/src/python/handlers/ranges.py
deleted file mode 100644
index eca3d42..0000000
--- a/heron/tools/ui/src/python/handlers/ranges.py
+++ /dev/null
@@ -1,55 +0,0 @@
-#!/usr/bin/env python3
-# -*- encoding: utf-8 -*-
-
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-''' ranges.py '''
-import time
-
-TIME_RANGES_3 = {
- "tenMinMetric": (0, 10 * 60, 'm'),
- "threeHourMetric": (0, 3 * 60 * 60, 'h'),
- "oneDayMetric": (0, 24 * 60 * 60, 'h'),
-}
-
-TIME_RANGES_6 = {
- "tenMinMetric": (0, 10 * 60, 'm'),
- "threeHourMetric": (0, 3 * 60 * 60, 'h'),
- "oneDayMetric": (0, 24 * 60 * 60, 'h'),
-
- "prevTenMinMetric": (10 * 60, 20 * 60, 'm'),
- "prevThreeHourMetric": (3 * 60 * 60, 6 * 60 * 60, 'h'),
- "prevOneDayMetric": (24 * 60 * 60, 48 * 60 * 60, 'h')
-}
-
-
-def get_time_ranges(ranges):
- '''
- :param ranges:
- :return:
- '''
- # get the current time
- now = int(time.time())
-
- # form the new
- time_slots = dict()
-
- for key, value in list(ranges.items()):
- time_slots[key] = (now - value[0], now - value[1], value[2])
-
- return (now, time_slots)
diff --git a/heron/tools/ui/src/python/handlers/topology.py b/heron/tools/ui/src/python/handlers/topology.py
deleted file mode 100644
index 98f502e..0000000
--- a/heron/tools/ui/src/python/handlers/topology.py
+++ /dev/null
@@ -1,316 +0,0 @@
-#!/usr/bin/env python3
-# -*- encoding: utf-8 -*-
-
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-''' topology.py '''
-import random
-from datetime import datetime
-import tornado.escape
-import tornado.web
-import tornado.gen
-
-import heron.tools.common.src.python.access as access
-import heron.common.src.python.utils.log as log
-
-from . import base
-from . import common
-
-Log = log.Log
-
-################################################################################
-# pylint: disable=abstract-method
-# pylint: disable=arguments-differ
-class TopologyConfigHandler(base.BaseHandler):
- ''' Handler for displaying the config for a topology '''
-
- def initialize(self, baseUrl):
- self.baseUrl = baseUrl
-
- def get(self, cluster, environ, topology):
- '''
- :param cluster:
- :param environ:
- :param topology:
- :return:
- '''
- # pylint: disable=no-member
- options = dict(
- cluster=cluster,
- environ=environ,
- topology=topology,
- active="topologies",
- function=common.className,
- baseUrl=self.baseUrl)
- self.render("config.html", **options)
-
-
-################################################################################
-class TopologyExceptionsPageHandler(base.BaseHandler):
- ''' Handler for displaying all the exceptions of a topology '''
-
- def initialize(self, baseUrl):
- self.baseUrl = baseUrl
-
- def get(self, cluster, environ, topology, comp_name, instance):
- '''
- :param cluster:
- :param environ:
- :param topology:
- :param comp_name:
- :param instance:
- :return:
- '''
- # pylint: disable=no-member
- options = dict(
- cluster=cluster,
- environ=environ,
- topology=topology,
- comp_name=comp_name,
- instance=instance,
- active="topologies",
- function=common.className,
- baseUrl=self.baseUrl)
- # send the exception
- self.render("exception.html", **options)
-
-
-class ListTopologiesHandler(base.BaseHandler):
- ''' Handler for displaying all the topologies - defaults to 'local'''
-
- def initialize(self, baseUrl):
- self.baseUrl = baseUrl
-
- @tornado.gen.coroutine
- def get(self):
- '''
- :return:
- '''
- clusters = yield access.get_clusters()
-
- # pylint: disable=no-member
- options = dict(
- topologies=[], # no topologies
- clusters=[str(cluster) for cluster in clusters],
- active="topologies", # active icon the nav bar
- function=common.className,
- baseUrl=self.baseUrl
- )
-
- # send the all topologies page
- self.render("topologies.html", **options)
-
-
-################################################################################
-class TopologyPlanHandler(base.BaseHandler):
- ''' Handler for displaying the logical plan of a topology '''
-
- def initialize(self, baseUrl):
- self.baseUrl = baseUrl
-
- @tornado.gen.coroutine
- def get(self, cluster, environ, topology):
- '''
- :param cluster:
- :param environ:
- :param topology:
- :return:
- '''
-
- # fetch the execution of the topology asynchronously
- execution_state = yield access.get_execution_state(cluster, environ, topology)
-
- # fetch scheduler location of the topology
- scheduler_location = yield access.get_scheduler_location(cluster, environ, topology)
-
- job_page_link = scheduler_location["job_page_link"]
-
- # convert the topology launch time to display format
- launched_at = datetime.utcfromtimestamp(execution_state['submission_time'])
- launched_time = launched_at.strftime('%Y-%m-%d %H:%M:%S UTC')
-
- # pylint: disable=no-member
- options = dict(
- cluster=cluster,
- environ=environ,
- topology=topology,
- execution_state=execution_state,
- launched=launched_time,
- status="running" if random.randint(0, 1) else "errors",
- active="topologies",
- job_page_link=job_page_link,
- function=common.className,
- baseUrl=self.baseUrl
- )
-
- # send the single topology page
- self.render("topology.html", **options)
-
-
-################################################################################
-# Handler for displaying the log file for an instance
-class ContainerFileHandler(base.BaseHandler):
- """
- Responsible for creating the web page for files. The html
- will in turn call another endpoint to get the file data.
- """
-
- def initialize(self, baseUrl):
- self.baseUrl = baseUrl
-
- @tornado.gen.coroutine
- def get(self, cluster, environ, topology, container):
- '''
- :param cluster:
- :param environ:
- :param topology:
- :param container:
- :return:
- '''
- path = self.get_argument("path")
-
- options = dict(
- cluster=cluster,
- environ=environ,
- topology=topology,
- container=container,
- path=path,
- baseUrl=self.baseUrl
- )
-
- self.render("file.html", **options)
-
-
-################################################################################
-# Handler for getting the data for a file in a container of a topology
-class ContainerFileDataHandler(base.BaseHandler):
- """
- Responsible for getting the data for a file in a container of a topology.
- """
-
- def initialize(self, baseUrl):
- self.baseUrl = baseUrl
-
- @tornado.gen.coroutine
- def get(self, cluster, environ, topology, container):
- '''
- :param cluster:
- :param environ:
- :param topology:
- :param container:
- :return:
- '''
- offset = self.get_argument("offset")
- length = self.get_argument("length")
- path = self.get_argument("path")
-
- data = yield access.get_container_file_data(cluster, environ, topology, container, path,
- offset, length)
-
- self.write(data)
- self.finish()
-
-
-################################################################################
-# Handler for getting the file stats for a container
-class ContainerFileStatsHandler(base.BaseHandler):
- """
- Responsible for getting the file stats for a container.
- """
-
- def initialize(self, baseUrl):
- self.baseUrl = baseUrl
-
- @tornado.gen.coroutine
- def get(self, cluster, environ, topology, container):
- '''
- :param cluster:
- :param environ:
- :param topology:
- :param container:
- :return:
- '''
- path = self.get_argument("path", default=".")
- data = yield access.get_filestats(cluster, environ, topology, container, path)
-
- options = dict(
- cluster=cluster,
- environ=environ,
- topology=topology,
- container=container,
- path=path,
- filestats=data,
- baseUrl=self.baseUrl)
- self.render("browse.html", **options)
-
-
-################################################################################
-# Handler for downloading the file from a container
-class ContainerFileDownloadHandler(base.BaseHandler):
- """
- Responsible for downloading the file from a container.
- """
- def initialize(self, baseUrl):
- self.baseUrl = baseUrl
-
- @tornado.gen.coroutine
- def get(self, cluster, environ, topology, container):
- '''
- :param cluster:
- :param environ:
- :param topology:
- :param container:
- :return:
- '''
- # If the file is large, we want to abandon downloading
- # if user cancels the requests.
- # pylint: disable=attribute-defined-outside-init
- self.connection_closed = False
-
- path = self.get_argument("path")
- filename = path.split("/")[-1]
- self.set_header("Content-Disposition", "attachment; filename=%s" % filename)
-
- # Download the files in chunks. We are downloading from Tracker,
- # which in turns downloads from heron-shell. This much indirection
- # means that if we use static file downloading, the whole files would
- # be cached in memory before it can be sent downstream. Hence, we reuse
- # the file data API to read in chunks until the EOF, or until the download
- # is cancelled by user.
-
- # 4 MB gives good enough chunk size giving good speed for small files.
- # If files are large, a single threaded download may not be enough.
- file_download_url = access.get_container_file_download_url(cluster, environ,
- topology, container, path)
-
- Log.debug("file download url: %s", str(file_download_url))
- def streaming_callback(chunk):
- self.write(chunk)
- self.flush()
-
- http_client = tornado.httpclient.AsyncHTTPClient()
- yield http_client.fetch(file_download_url, streaming_callback=streaming_callback)
- self.finish()
-
-
- def on_connection_close(self):
- '''
- :return:
- '''
- # pylint: disable=attribute-defined-outside-init
- self.connection_closed = True
diff --git a/heron/tools/ui/src/python/main.py b/heron/tools/ui/src/python/main.py
index 98f1e8e..bc7ddad 100644
--- a/heron/tools/ui/src/python/main.py
+++ b/heron/tools/ui/src/python/main.py
@@ -18,185 +18,610 @@
# specific language governing permissions and limitations
# under the License.
-''' main.py '''
-import logging
-import os
-import signal
+"""
+heron-ui provides a web interface for exploring the state of a
+cluster as reported by a tracker.
+"""
+import logging
+import os.path
import sys
-import tornado.ioloop
-import tornado.options
-import tornado.web
-import tornado.log
-import tornado.template
-from tornado.httpclient import AsyncHTTPClient
-from tornado.options import define
-from tornado.web import url
-
-import heron.common.src.python.utils.log as log
-import heron.tools.common.src.python.utils.config as common_config
-from heron.tools.ui.src.python import handlers
-from heron.tools.ui.src.python import args
-
-Log = log.Log
-
-class Application(tornado.web.Application):
- ''' Application '''
-
- def __init__(self, base_url):
-
- # pylint: disable=fixme
- # TODO: hacky solution
- # sys.path[0] should be the path to the extracted files for heron-ui, as it is added
- # when bootstrapping the pex file
- static_prefix = '/static/'
- if base_url != "":
- static_prefix = os.path.join(base_url, 'static/')
-
- AsyncHTTPClient.configure(None, defaults=dict(request_timeout=120.0))
- Log.info("Using base url: %s", base_url)
- settings = dict(
- template_path=os.path.join(sys.path[0], "heron/tools/ui/resources/templates"),
- static_path=os.path.join(sys.path[0], "heron/tools/ui/resources/static"),
- static_url_prefix=static_prefix,
- gzip=True,
- debug=True,
- default_handler_class=handlers.NotFoundHandler,
- )
- Log.info(os.path.join(base_url, 'static/'))
-
- # Change these to query string parameters, since
- # current format can lead to pattern matching issues.
- callbacks = [
- (r"/", handlers.MainHandler),
-
- url(r"/topologies", handlers.ListTopologiesHandler, dict(baseUrl=base_url),
- name='topologies'),
- url(r"/topologies/filestats/([^\/]+)/([^\/]+)/([^\/]+)/([^\/]+)",
- handlers.ContainerFileStatsHandler, dict(baseUrl=base_url)),
- url(r"/topologies/([^\/]+)/([^\/]+)/([^\/]+)/config",
- handlers.TopologyConfigHandler, dict(baseUrl=base_url)),
- url(r"/topologies/([^\/]+)/([^\/]+)/([^\/]+)/([^\/]+)/([^\/]+)/exceptions",
- handlers.TopologyExceptionsPageHandler, dict(baseUrl=base_url)),
- url(r"/topologies/([^\/]+)/([^\/]+)/([^\/]+)",
- handlers.TopologyPlanHandler, dict(baseUrl=base_url)),
-
- # topology metric apis
- (r"/topologies/metrics",
- handlers.api.MetricsHandler),
- (r"/topologies/metrics/timeline",
- handlers.api.MetricsTimelineHandler),
-
- url(r"/topologies/([^\/]+)/([^\/]+)/([^\/]+)/([^\/]+)/file",
- handlers.ContainerFileHandler, dict(baseUrl=base_url)),
- url(r"/topologies/([^\/]+)/([^\/]+)/([^\/]+)/([^\/]+)/filedata",
- handlers.ContainerFileDataHandler, dict(baseUrl=base_url)),
- url(r"/topologies/([^\/]+)/([^\/]+)/([^\/]+)/([^\/]+)/filedownload",
- handlers.ContainerFileDownloadHandler, dict(baseUrl=base_url)),
-
- # Topology list and plan handlers
- (r"/topologies/list.json",
- handlers.api.ListTopologiesJsonHandler),
- (r"/topologies/([^\/]+)/([^\/]+)/([^\/]+)/logicalplan.json",
- handlers.api.TopologyLogicalPlanJsonHandler),
- (r"/topologies/([^\/]+)/([^\/]+)/([^\/]+)/packingplan.json",
- handlers.api.TopologyPackingPlanJsonHandler),
- (r"/topologies/([^\/]+)/([^\/]+)/([^\/]+)/physicalplan.json",
- handlers.api.TopologyPhysicalPlanJsonHandler),
- (r"/topologies/([^\/]+)/([^\/]+)/([^\/]+)/executionstate.json",
- handlers.api.TopologyExecutionStateJsonHandler),
- (r"/topologies/([^\/]+)/([^\/]+)/([^\/]+)/schedulerlocation.json",
- handlers.api.TopologySchedulerLocationJsonHandler),
-
- # Counter Handlers
- (r"/topologies/([^\/]+)/([^\/]+)/([^\/]+)/([^\/]+)/exceptions.json",
- handlers.api.TopologyExceptionsJsonHandler),
- (r"/topologies/([^\/]+)/([^\/]+)/([^\/]+)/([^\/]+)/exceptionsummary.json",
- handlers.api.TopologyExceptionSummaryHandler),
-
- # Heron shell Handlers
- (r"/topologies/([^\/]+)/([^\/]+)/([^\/]+)/([^\/]+)/pid",
- handlers.api.PidHandler),
- (r"/topologies/([^\/]+)/([^\/]+)/([^\/]+)/([^\/]+)/jstack",
- handlers.api.JstackHandler),
- (r"/topologies/([^\/]+)/([^\/]+)/([^\/]+)/([^\/]+)/jmap",
- handlers.api.JmapHandler),
- (r"/topologies/([^\/]+)/([^\/]+)/([^\/]+)/([^\/]+)/histo",
- handlers.api.MemoryHistogramHandler),
-
- ## Static files
- (r"/static/(.*)", tornado.web.StaticFileHandler,
- dict(path=settings['static_path']))
-
- ]
-
- tornado.web.Application.__init__(self, callbacks, **settings)
-
-
-def define_options(address, port, tracker_url, base_url):
- '''
- :param address:
- :param port:
- :param tracker_url:
- :return:
- '''
- define("address", default=address)
- define("port", default=port)
- define("tracker_url", default=tracker_url)
- define("base_url", default=base_url)
-
-
-def main():
- '''
- :param argv:
- :return:
- '''
- log.configure(logging.DEBUG)
- tornado.log.enable_pretty_logging()
-
- # create the parser and parse the arguments
- (parser, child_parser) = args.create_parsers()
- (parsed_args, remaining) = parser.parse_known_args()
-
- if remaining:
- r = child_parser.parse_args(args=remaining, namespace=parsed_args)
- namespace = vars(r)
- if 'version' in namespace:
- common_config.print_build_info()
- else:
- parser.print_help()
- parser.exit()
-
- # log additional information
- command_line_args = vars(parsed_args)
-
- Log.info("Listening at http://%s:%d%s", command_line_args['address'],
- command_line_args['port'], command_line_args['base_url'])
- Log.info("Using tracker url: %s", command_line_args['tracker_url'])
-
- # pass the options to tornado and start the ui server
- define_options(command_line_args['address'],
- command_line_args['port'],
- command_line_args['tracker_url'],
- command_line_args['base_url'])
- http_server = tornado.httpserver.HTTPServer(Application(command_line_args['base_url']))
- http_server.listen(command_line_args['port'], address=command_line_args['address'])
-
- # pylint: disable=unused-argument
- # stop Tornado IO loop
- def signal_handler(signum, frame):
- # start a new line after ^C character because this looks nice
- print('\n')
- Log.debug('SIGINT received. Stopping UI')
- tornado.ioloop.IOLoop.instance().stop()
-
- # associate SIGINT and SIGTERM with a handler
- signal.signal(signal.SIGINT, signal_handler)
- signal.signal(signal.SIGTERM, signal_handler)
-
- # start Tornado IO loop
- tornado.ioloop.IOLoop.instance().start()
+import time
+
+from collections import Counter
+from datetime import datetime
+from typing import Any, Callable, List, Optional
+
+from heron.tools.common.src.python.utils import config
+from heron.tools.common.src.python.clients import tracker
+from heron.common.src.python.utils import log
+
+import click
+import pydantic
+import requests
+import uvicorn
+
+from fastapi import APIRouter, FastAPI, Query, Request
+from fastapi.responses import HTMLResponse
+from fastapi.staticfiles import StaticFiles
+from fastapi.templating import Jinja2Templates
+from starlette.responses import RedirectResponse, Response
+from starlette.exceptions import HTTPException as StarletteHTTPException
+
+
+# Version string used for the FastAPI app metadata and the JSON API envelope.
+VERSION = config.get_version_number()
+# Defaults for the command line options declared on `cli`.
+DEFAULT_ADDRESS = "0.0.0.0"
+DEFAULT_PORT = 8889
+DEFAULT_TRACKER_URL = "http://127.0.0.1:8888"
+DEFAULT_BASE_URL = ""
+
+# Module-level settings; `base_url` is passed to every rendered template as "baseUrl"
+# and is overwritten by `cli` at start-up.
+base_url = DEFAULT_BASE_URL
+tracker_url = DEFAULT_TRACKER_URL
+
+app = FastAPI(title="Heron UI", version=VERSION)
+
+# Templates are resolved relative to sys.path[0]
+# NOTE(review): presumably the root of the extracted pex, as in the Tornado version - confirm
+templates = Jinja2Templates(
+  directory=os.path.join(sys.path[0], "heron/tools/ui/resources/templates")
+)
+# Routes added to this router are served under the "/topologies" prefix.
+topologies_router = APIRouter()
+
+
+@app.get("/")
+def home():
+  """Send visitors of the site root to the topologies listing page."""
+  listing_url = app.url_path_for("topologies_page")
+  return RedirectResponse(url=listing_url)
+
+
+@topologies_router.get("")
+def topologies_page(request: Request) -> Response:
+  """Render the topologies listing page with the clusters reported by the tracker."""
+  cluster_names = [str(cluster) for cluster in tracker.get_clusters()]
+  context = {
+      "topologies": [],
+      "clusters": cluster_names,
+      "active": "topologies",
+      "baseUrl": base_url,
+      "request": request,
+  }
+  return templates.TemplateResponse("topologies.html", context)
+
+
+@topologies_router.get("/{cluster}/{environment}/{topology}/config")
+def config_page(
+    cluster: str,
+    environment: str,
+    topology: str,
+    request: Request,
+) -> Response:
+  """Render the HTML page showing the config of a topology."""
+  context = {
+      "cluster": cluster,
+      "environ": environment,
+      "topology": topology,
+      "active": "topologies",
+      "baseUrl": base_url,
+      "request": request,
+  }
+  return templates.TemplateResponse("config.html", context)
+
+
+# The component name comes before the instance id in the path: this is the
+# order the Tornado handler received them in, so existing links keep working.
+@topologies_router.get("/{cluster}/{environment}/{topology}/{component}/{instance}/exceptions")
+def exceptions_page(
+    cluster: str, environment: str, topology: str, component: str, instance: str,
+    request: Request
+) -> Response:
+  """
+  Render a HTML page of exceptions for an instance of a component.
+
+  The path parameters are handed to the "exception.html" template unchanged;
+  no tracker request is made while rendering.
+  """
+  context = {
+      "cluster": cluster,
+      "environ": environment,
+      "topology": topology,
+      "comp_name": component,
+      "instance": instance,
+      "active": "topologies",
+      "baseUrl": base_url,
+      "request": request,
+  }
+  return templates.TemplateResponse("exception.html", context)
+
+
+@topologies_router.get("/{cluster}/{environment}/{topology}")
+def planner_page(
+    cluster: str,
+    environment: str,
+    topology: str,
+    request: Request,
+) -> Response:
+  """Render the HTML overview page of a single topology."""
+  execution_state = tracker.get_execution_state(cluster, environment, topology)
+  scheduler_location = tracker.get_scheduler_location(cluster, environment, topology)
+  # is the tracker really making links for the UI!?
+  job_page_link = scheduler_location["job_page_link"]
+  # the submission time is shown as a UTC timestamp
+  launched_at = datetime.utcfromtimestamp(execution_state["submission_time"])
+  launched_time = f"{launched_at.isoformat(' ')}Z"
+
+  context = {
+      "cluster": cluster,
+      "environ": environment,
+      "topology": topology,
+      "execution_state": execution_state,
+      "launched": launched_time,
+      # supposed to be "running" or "errors", but not implemented
+      "status": "unknown",
+      "active": "topologies",
+      "job_page_link": job_page_link,
+      "baseUrl": base_url,
+      "request": request,
+  }
+  return templates.TemplateResponse("topology.html", context)
+
+
+@topologies_router.get("/metrics")
+def metrics(
+    cluster: str,
+    environ: str,
+    topology: str,
+    metric_names: List[str] = Query([], alias="metricname"),
+    instances: List[str] = Query([], alias="instance"),
+    component: Optional[str] = None,
+    interval: int = -1,
+) -> dict:
+  """
+  Return metrics from the tracker for one component or for all of them.
+
+  When `component` is given its metrics are returned directly, otherwise the
+  result maps each component name of the topology to its metrics.
+  """
+  time_range = (0, interval)
+  if component:
+    component_names = [component]
+  else:
+    component_names = tracker.get_comps(cluster, environ, topology)
+
+  # could make this async
+  # need to port over everything from access to tracker
+  result = {}
+  for name in component_names:
+    result[name] = tracker.get_comp_metrics(
+        cluster, environ, topology, name, instances, metric_names, time_range)
+
+  # switching the payload shape is bad, so this should be factored out in the future
+  return result[component] if component else result
+
+# should factor out the tornado based access module
+query_handler = tracker.HeronQueryHandler()
+@topologies_router.get("/metrics/timeline")
+def timeline(
+    cluster: str,
+    environ: str,
+    topology: str,
+    metric: str,
+    instance: str,
+    starttime: str,
+    endtime: str,
+    component: Optional[str] = None,
+    max: bool = False, # pylint: disable=redefined-builtin
+) -> dict:
+  """
+  Return a metric timeline between `starttime` and `endtime`.
+
+  When `component` is given its timeline is returned directly, otherwise the
+  result maps each component name of the topology to its timeline.
+  """
+  timerange = (starttime, endtime)
+  if component:
+    component_names = [component]
+  else:
+    component_names = tracker.get_comps(cluster, environ, topology)
+
+  result = {}
+  if metric == "backpressure":
+    for name in component_names:
+      result[name] = query_handler.fetch_backpressure(
+          cluster, metric, topology, name,
+          instance, timerange, max, environ,
+      )
+  else:
+    fetch = query_handler.fetch_max if max else query_handler.fetch
+    for name in component_names:
+      result[name] = fetch(cluster, metric, topology, name, instance, timerange, environ)
+
+  # switching the payload shape is bad, so this should be factored out in the future
+  return result[component] if component else result
+
+
+@topologies_router.get("/filestats/{cluster}/{environment}/{topology}/{container}/file")
+# The Tornado route had no trailing "/file"; it is kept so that existing
+# links and bookmarks continue to resolve.
+@topologies_router.get(
+    "/filestats/{cluster}/{environment}/{topology}/{container}",
+    include_in_schema=False,
+)
+def file_stats_page(
+    cluster: str, environment: str, topology: str,
+    container: str, request: Request, path: str = ".",
+) -> Response:
+  """
+  Render a HTML page for exploring a container's files.
+
+  `path` is the location to list and defaults to "."; the file stats for it
+  are fetched from the tracker and passed to the "browse.html" template.
+  """
+  filestats = tracker.get_filestats(cluster, environment, topology, container, path)
+  context = {
+      "cluster": cluster,
+      "environ": environment,
+      "topology": topology,
+      "container": container,
+      "path": path,
+      "filestats": filestats,
+      "baseUrl": base_url,
+      "request": request,
+  }
+  return templates.TemplateResponse("browse.html", context)
+
+
+@topologies_router.get("/{cluster}/{environment}/{topology}/{container}/file")
+def file_page(
+    cluster: str, environment: str, topology: str, container: str, path: str,
+    request: Request,
+) -> Response:
+  """Render the HTML page used to view a file of a container."""
+  context = {
+      "cluster": cluster,
+      "environ": environment,
+      "topology": topology,
+      "container": container,
+      "path": path,
+      "baseUrl": base_url,
+      "request": request,
+  }
+  return templates.TemplateResponse("file.html", context)
+
+
+@topologies_router.get("/{cluster}/{environment}/{topology}/{container}/filedata")
+def file_data(
+    cluster: str,
+    environment: str,
+    topology: str,
+    container: str,
+    offset: int,
+    length: int,
+    path: str,
+) -> Response:
+  """Return `length` bytes starting at `offset` of a container's file, as given by the tracker."""
+  # this should just use the byte-range header in a file download method
+  return tracker.get_container_file_data(
+      cluster, environment, topology, container, path, offset, length
+  )
+
+
+@topologies_router.get("/{cluster}/{environment}/{topology}/{container}/filedownload")
+def file_download(
+    cluster: str, environment: str, topology: str, container: str, path: str
+) -> Response:
+  """
+  Stream a file from a container to the client as an attachment.
+
+  The file is fetched from the download URL supplied by the tracker and is
+  relayed in chunks, so large files are not held in memory as a whole.
+
+  Raises an HTTP exception carrying the upstream status code when the
+  download request is not successful.
+  """
+  # StreamingResponse is not among the module level imports
+  from starlette.responses import StreamingResponse  # pylint: disable=import-outside-toplevel
+
+  # quotes are dropped so the name can be sent as a quoted header parameter
+  filename = os.path.basename(path).replace('"', "")
+  download_url = tracker.get_container_file_download_url(
+      cluster, environment, topology, container, path
+  )
+  upstream = requests.get(download_url, stream=True)
+  if not upstream.ok:
+    status_code = upstream.status_code
+    upstream.close()
+    # do not hand an upstream error body to the user as if it were the file
+    raise StarletteHTTPException(status_code=status_code, detail="file download failed")
+
+  def chunks():
+    """Yield the upstream body and release the connection afterwards."""
+    try:
+      yield from upstream.iter_content(chunk_size=1024 * 1024)
+    finally:
+      upstream.close()
+
+  return StreamingResponse(
+      chunks(),
+      # the registered media type for arbitrary binary data
+      media_type="application/octet-stream",
+      headers={"Content-Disposition": f'attachment; filename="{filename}"'},
+  )
+
+
+# topology list and plan handlers
+class ApiEnvelope(pydantic.BaseModel):
+  """
+  Envelope for heron-ui JSON API.
+
+  `result` carries the payload of the wrapped call and `executiontime` the
+  time spent producing it, in seconds.
+  """
+  status: str
+  message: str
+  version: str = VERSION
+  # a float: durations come from time.time() differences and would mostly
+  # collapse to 0 if coerced to an integer number of seconds
+  executiontime: float
+  # not every payload is a mapping, e.g. the exception summary is a table (list of rows)
+  result: Any
+
+
+def api_topology_json(method: Callable[[], dict]) -> ApiEnvelope:
+  """Call `method` and wrap its result, with the time the call took, in a success envelope."""
+  started = time.time()
+  payload = method()
+  elapsed = time.time() - started
+  return ApiEnvelope(status="success", message="", executiontime=elapsed, result=payload)
+
+@topologies_router.get("/list.json")
+def topologies_json() -> dict:
+  """
+  Return the topology states grouped by cluster and environment.
+
+  Topologies without a "jobname" are left out, and "submission_time" is set
+  to "-" on entries which do not have one.
+  """
+  listing = {}
+  for cluster_name, environments in tracker.get_topologies_states().items():
+    cluster_entry = listing[cluster_name] = {}
+    for environment_name, topologies in environments.items():
+      environment_entry = cluster_entry[environment_name] = {}
+      for topology_name, state in topologies.items():
+        if state.get("jobname") is not None:
+          # transforming payloads is usually an indicator of a bad shape
+          state.setdefault("submission_time", "-")
+          environment_entry[topology_name] = state
+  return listing
+
+
+@topologies_router.get(
+    "/{cluster}/{environment}/{topology}/logicalplan.json", response_model=ApiEnvelope
+)
+def logical_plan_json(cluster: str, environment: str, topology: str) -> ApiEnvelope:
+  """Return the tracker's logical plan of a topology inside an API envelope."""
+  def fetch() -> dict:
+    return tracker.get_logical_plan(cluster, environment, topology, None)
+
+  return api_topology_json(fetch)
+
+
+@topologies_router.get(
+    "/{cluster}/{environment}/{topology}/packingplan.json", response_model=ApiEnvelope
+)
+def packing_plan_json(cluster: str, environment: str, topology: str) -> ApiEnvelope:
+  """Return the tracker's packing plan of a topology inside an API envelope."""
+  def fetch() -> dict:
+    return tracker.get_packing_plan(cluster, environment, topology, None)
+
+  return api_topology_json(fetch)
+
+
+@topologies_router.get(
+    "/{cluster}/{environment}/{topology}/physicalplan.json", response_model=ApiEnvelope
+)
+def physical_plan_json(cluster: str, environment: str, topology: str) -> ApiEnvelope:
+  """Return the tracker's physical plan of a topology inside an API envelope."""
+  def fetch() -> dict:
+    return tracker.get_physical_plan(cluster, environment, topology, None)
+
+  return api_topology_json(fetch)
+
+
+@topologies_router.get(
+    "/{cluster}/{environment}/{topology}/executionstate.json", response_model=ApiEnvelope
+)
+def execution_state_json(cluster: str, environment: str, topology: str) -> ApiEnvelope:
+  """Return the tracker's execution state of a topology inside an API envelope."""
+  def fetch() -> dict:
+    return tracker.get_execution_state(cluster, environment, topology)
+
+  return api_topology_json(fetch)
+
+
+@topologies_router.get(
+    "/{cluster}/{environment}/{topology}/schedulerlocation.json",
+    response_model=ApiEnvelope,
+)
+def scheduler_location_json(cluster: str, environment: str, topology: str) -> ApiEnvelope:
+  """
+  Return the scheduler location object for a topology.
+
+  This used to answer with the execution state; it now asks the tracker for
+  the scheduler location, the same call the topology page uses to obtain
+  its job page link.
+  """
+  return api_topology_json(lambda: tracker.get_scheduler_location(
+      cluster, environment, topology,
+  ))
+
+
+@topologies_router.get(
+    "/{cluster}/{environment}/{topology}/{component}/exceptions.json",
+    response_model=ApiEnvelope,
+)
+def exceptions_json(cluster: str, environment: str, topology: str, component: str) -> ApiEnvelope:
+  """Return the tracker's exceptions of a component inside an API envelope."""
+  def fetch():
+    return tracker.get_component_exceptions(cluster, environment, topology, component)
+
+  return api_topology_json(fetch)
+
+
+@topologies_router.get(
+    "/{cluster}/{environment}/{topology}/{component}/exceptionsummary.json",
+    response_model=ApiEnvelope,
+)
+def exception_summary_json(
+    cluster: str, environment: str, topology: str, component: str
+) -> ApiEnvelope:
+  """
+  Return a table of exception classes to totals.
+
+  `component` is either a component name or "all" (case insensitive), in
+  which case the counts of every spout and bolt of the logical plan are
+  summed. Each row of the result is `[class_name, count]` with the count
+  rendered as a string.
+  """
+  started = time.time()
+  if component.lower() == "all":
+    logical_plan = tracker.get_logical_plan(cluster, environment, topology)
+    if not logical_plan or not {"bolts", "spouts"} <= logical_plan.keys():
+      # No usable plan: answer with an empty table. A bare `{}` cannot be
+      # returned here as it does not validate against the response model.
+      component_names = []
+    else:
+      # looks like topologies can have spouts but no bolts, so they're assumed to be empty -
+      # should the above key check be removed and replaced with bolts defaulting to an empty
+      # list?
+      component_names = [*logical_plan["spouts"], *logical_plan["bolts"]]
+  else:
+    component_names = [component]
+
+  class_counts = Counter()
+  for name in component_names:
+    summary = tracker.get_component_exceptionsummary(cluster, environment, topology, name)
+    for exception_log in summary:
+      class_counts[exception_log["class_name"]] += int(exception_log["count"])
+
+  aggregate_exceptions_table = [
+      [class_name, str(count)]
+      for class_name, count in class_counts.items()
+  ]
+
+  return ApiEnvelope(
+      status="success",
+      message="",
+      executiontime=time.time() - started,
+      result=aggregate_exceptions_table,
+  )
+
+
+@topologies_router.get(
+    "/{cluster}/{environment}/{topology}/{instance}/pid"
+)
+def pid_snippet(
+    request: Request,
+    cluster: str,
+    environment: str,
+    topology: str,
+    instance: str,
+) -> Response:
+  """Render a HTML snippet with the command and output of the pid lookup for an instance."""
+  physical_plan = tracker.get_physical_plan(cluster, environment, topology)
+  # the host is that of the stream manager the instance is attached to
+  stmgr_id = physical_plan["instances"][instance]["stmgrId"]
+  host = physical_plan["stmgrs"][stmgr_id]["host"]
+  result = tracker.get_instance_pid(cluster, environment, topology, instance)
+  context = {
+      "request": request,
+      "host": host,
+      "command": result["command"],
+      "output": result["stdout"],
+  }
+  return templates.TemplateResponse("shell.snip.html", context)
+
+
+
+@topologies_router.get(
+    "/{cluster}/{environment}/{topology}/{instance}/jstack"
+)
+def jstack_snippet(
+    request: Request,
+    cluster: str,
+    environment: str,
+    topology: str,
+    instance: str,
+) -> HTMLResponse:
+  """Render a HTML snippet with the command and output of jstack for an instance."""
+  physical_plan = tracker.get_physical_plan(cluster, environment, topology)
+  # the host is that of the stream manager the instance is attached to
+  stmgr_id = physical_plan["instances"][instance]["stmgrId"]
+  host = physical_plan["stmgrs"][stmgr_id]["host"]
+  result = tracker.get_instance_jstack(cluster, environment, topology, instance)
+  context = {
+      "request": request,
+      "host": host,
+      "command": result["command"],
+      "output": result["stdout"],
+  }
+  return templates.TemplateResponse("shell.snip.html", context)
+
+
+@topologies_router.get(
+    "/{cluster}/{environment}/{topology}/{instance}/jmap"
+)
+def jmap_snippet(
+    request: Request,
+    cluster: str,
+    environment: str,
+    topology: str,
+    instance: str,
+) -> HTMLResponse:
+  """
+  Render a HTML snippet containing jmap output of container.
+
+  Besides the command and its output, the snippet carries a hint on how to
+  copy the heap dump from the host the instance runs on.
+  """
+  physical_plan = tracker.get_physical_plan(cluster, environment, topology)
+  # the host is that of the stream manager the instance is attached to
+  stmgr_id = physical_plan["instances"][instance]["stmgrId"]
+  host = physical_plan["stmgrs"][stmgr_id]["host"]
+  result = tracker.run_instance_jmap(cluster, environment, topology, instance)
+  # an f-string, so that the host name is filled in rather than a literal "{host}"
+  info = f"""
+  <ul>
+  <li>May take longer than usual (1-2 minutes) please be patient.</li>
+  <li>Use SCP to copy heap dump files from host. (SCP {host}:/tmp/heap.bin /tmp/)</li>
+  </ul>
+  """
+  return templates.TemplateResponse(
+      "shell.snip.html",
+      {
+          "request": request,
+          "host": host,
+          "command": result["command"],
+          "output": result["stdout"],
+          "info": info,
+      },
+  )
+
+
+@topologies_router.get(
+    "/{cluster}/{environment}/{topology}/{instance}/histo"
+)
+def histogram_snippet(
+    request: Request,
+    cluster: str,
+    environment: str,
+    topology: str,
+    instance: str,
+) -> HTMLResponse:
+  """Render a HTML snippet with the command and output of the memory histogram for an instance."""
+  # use a function to DRY up these container API methods
+  physical_plan = tracker.get_physical_plan(cluster, environment, topology)
+  # the host is that of the stream manager the instance is attached to
+  stmgr_id = physical_plan["instances"][instance]["stmgrId"]
+  host = physical_plan["stmgrs"][stmgr_id]["host"]
+  result = tracker.get_instance_mem_histogram(cluster, environment, topology, instance)
+  context = {
+      "request": request,
+      "host": host,
+      "command": result["command"],
+      "output": result["stdout"],
+  }
+  return templates.TemplateResponse("shell.snip.html", context)
+
+
+# Serve every route registered on `topologies_router` under "/topologies".
+app.include_router(topologies_router, prefix="/topologies")
+# Static assets are located relative to sys.path[0], like the templates.
+# NOTE(review): the mount point is always "/static" and does not take `base_url`
+# into account, unlike the Tornado static_url_prefix - confirm this is intended
+app.mount(
+  "/static",
+  StaticFiles(directory=os.path.join(sys.path[0], "heron/tools/ui/resources/static")),
+  name="static",
+)
+
+@app.exception_handler(StarletteHTTPException)
+async def unicorn_exception_handler(request: Request, exc: StarletteHTTPException) -> Response:
+  """
+  Render HTTP errors with the "error.html" template.
+
+  The response keeps the status code of the exception, so that a missing
+  page is still reported to the client as a 404 and not as a success.
+  """
+  if exc.status_code == 404:
+    message = "URL not found"
+  else:
+    # `detail` holds the message of the exception; str(exc) may be empty
+    message = str(exc.detail)
+  return templates.TemplateResponse(
+      "error.html",
+      {"errormessage": message, "request": request},
+      status_code=exc.status_code,
+  )
+
+def show_version(_, __, value):
+  """Option callback for --version: print the build info and exit when the flag is set."""
+  if not value:
+    return
+  config.print_build_info()
+  sys.exit(0)
+
+@click.command()
+@click.option("--tracker-url", "tracker_url_option", default=DEFAULT_TRACKER_URL)
+@click.option("--base-url", "base_url_option", default=DEFAULT_BASE_URL)
+@click.option("--host", default=DEFAULT_ADDRESS)
+@click.option("--port", type=int, default=DEFAULT_PORT)
+@click.option("--verbose", is_flag=True)
+@click.option(
+    "--version",
+    is_flag=True,
+    is_eager=True,
+    expose_value=False,
+    callback=show_version,
+)
+def cli(
+    host: str, port: int, base_url_option: str, tracker_url_option: str, verbose: bool
+) -> None:
+  """Start a web UI for heron which renders information from the tracker."""
+  global base_url, tracker_url
+  base_url = base_url_option
+  # keep the module level setting in step with what the tracker client is given
+  tracker_url = tracker_url_option
+  log.configure(level=logging.DEBUG if verbose else logging.INFO)
+  tracker.tracker_url = tracker_url_option
+
+  # log_config=None: logging stays as set up by log.configure above
+  uvicorn.run(app, host=host, port=port, log_config=None)
if __name__ == "__main__":
- main()
+ cli() # pylint: disable=no-value-for-parameter
diff --git a/integration_test/src/python/http_server/BUILD b/integration_test/src/python/http_server/BUILD
index 750ad68..80f22be 100644
--- a/integration_test/src/python/http_server/BUILD
+++ b/integration_test/src/python/http_server/BUILD
@@ -7,7 +7,7 @@ pex_binary(
],
main = "main.py",
reqs = [
- "tornado==4.0.2",
+ "tornado==4.5.3",
],
deps = [
"//heron/common/src/python:common-py",
diff --git a/vagrant/Vagrantfile b/vagrant/Vagrantfile
index ad39ee4..f3e155b 100644
--- a/vagrant/Vagrantfile
+++ b/vagrant/Vagrantfile
@@ -33,6 +33,8 @@ Vagrant.configure(2) do |config|
config.vm.box = "bento/ubuntu-20.04"
config.vm.synced_folder "../", "/vagrant"
config.vm.boot_timeout = 600
+ # heron-ui
+ config.vm.network "forwarded_port", guest: 8889, host: 8889
config.vm.define "master" do |master|
master.vm.provider "virtualbox" do |v|