You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@heron.apache.org by hu...@apache.org on 2018/07/10 22:24:17 UTC
[incubator-heron] branch master updated: Added endpoint to tracker
to fetch packing plan. (#2959)
This is an automated email from the ASF dual-hosted git repository.
huijun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-heron.git
The following commit(s) were added to refs/heads/master by this push:
new 3fea43b Added endpoint to tracker to fetch packing plan. (#2959)
3fea43b is described below
commit 3fea43bbb96f5bdcf52d0821adc3641545a3e785
Author: Faria Kalim <fa...@gmail.com>
AuthorDate: Tue Jul 10 15:24:14 2018 -0700
Added endpoint to tracker to fetch packing plan. (#2959)
* Added endpoint to tracker to fetch packing plan.
* checked scheduled resources as part of test
---
.../tools/tracker/src/python/handlers/__init__.py | 1 +
.../src/python/handlers/packingplanhandler.py | 61 +++++++++++++++++++++
heron/tools/tracker/src/python/main.py | 1 +
heron/tools/tracker/src/python/topology.py | 11 ++++
heron/tools/tracker/src/python/tracker.py | 64 ++++++++++++++++++++++
heron/tools/tracker/tests/python/mock_proto.py | 47 ++++++++++++++++
.../tracker/tests/python/topology_unittest.py | 19 +++++++
7 files changed, 204 insertions(+)
diff --git a/heron/tools/tracker/src/python/handlers/__init__.py b/heron/tools/tracker/src/python/handlers/__init__.py
index a715d1b..e74e983 100644
--- a/heron/tools/tracker/src/python/handlers/__init__.py
+++ b/heron/tools/tracker/src/python/handlers/__init__.py
@@ -19,6 +19,7 @@ from metricshandler import MetricsHandler
from metricsqueryhandler import MetricsQueryHandler
from metricstimelinehandler import MetricsTimelineHandler
from physicalplanhandler import PhysicalPlanHandler
+from packingplanhandler import PackingPlanHandler
from pidhandler import PidHandler
from runtimestatehandler import RuntimeStateHandler
from schedulerlocationhandler import SchedulerLocationHandler
diff --git a/heron/tools/tracker/src/python/handlers/packingplanhandler.py b/heron/tools/tracker/src/python/handlers/packingplanhandler.py
new file mode 100644
index 0000000..ced6eb1
--- /dev/null
+++ b/heron/tools/tracker/src/python/handlers/packingplanhandler.py
@@ -0,0 +1,61 @@
+#!/usr/bin/env python
+# -*- encoding: utf-8 -*-
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+''' packingplanhandler.py '''
+import traceback
+import tornado.gen
+import tornado.web
+
+from heron.common.src.python.utils.log import Log
+from heron.tools.tracker.src.python.handlers import BaseHandler
+
+
+class PackingPlanHandler(BaseHandler):
+ """
+ URL - /topologies/packingplan
+ Parameters:
+ - cluster (required)
+ - role - (optional) Role used to submit the topology.
+ - environ (required)
+ - topology (required) name of the requested topology
+
+ The response JSON is a dictionary with all the
+ information of packing plan of the topology.
+ """
+
+ # pylint: disable=attribute-defined-outside-init
+ def initialize(self, tracker):
+ """initialize"""
+ self.tracker = tracker
+
+ @tornado.gen.coroutine
+ def get(self):
+ """get method"""
+ try:
+ cluster = self.get_argument_cluster()
+ role = self.get_argument_role()
+ environ = self.get_argument_environ()
+ topology_name = self.get_argument_topology()
+ topology_info = self.tracker.getTopologyInfo(topology_name, cluster, role, environ)
+ packing_plan = topology_info["packing_plan"]
+ self.write_success_response(packing_plan)
+ except Exception as e:
+ Log.debug(traceback.format_exc())
+ self.write_error_response(e)
diff --git a/heron/tools/tracker/src/python/main.py b/heron/tools/tracker/src/python/main.py
index 1a3518f..d9f60e9 100644
--- a/heron/tools/tracker/src/python/main.py
+++ b/heron/tools/tracker/src/python/main.py
@@ -62,6 +62,7 @@ class Application(tornado.web.Application):
(r"/topologies/containerfilestats",
handlers.ContainerFileStatsHandler, {"tracker":self.tracker}),
(r"/topologies/physicalplan", handlers.PhysicalPlanHandler, {"tracker":self.tracker}),
+ (r"/topologies/packingplan", handlers.PackingPlanHandler, {"tracker":self.tracker}),
# Deprecated. See https://github.com/apache/incubator-heron/issues/1754
(r"/topologies/executionstate", handlers.ExecutionStateHandler, {"tracker":self.tracker}),
(r"/topologies/schedulerlocation", handlers.SchedulerLocationHandler,
diff --git a/heron/tools/tracker/src/python/topology.py b/heron/tools/tracker/src/python/topology.py
index c7e1122..5198492 100644
--- a/heron/tools/tracker/src/python/topology.py
+++ b/heron/tools/tracker/src/python/topology.py
@@ -46,6 +46,7 @@ class Topology(object):
self.name = name
self.state_manager_name = state_manager_name
self.physical_plan = None
+ self.packing_plan = None
self.execution_state = None
self.id = None
self.cluster = None
@@ -128,6 +129,16 @@ class Topology(object):
self.id = physical_plan.topology.id
self.trigger_watches()
+ def set_packing_plan(self, packing_plan):
+ """ set packing plan """
+ if not packing_plan:
+ self.packing_plan = None
+ self.id = None
+ else:
+ self.packing_plan = packing_plan
+ self.id = packing_plan.id
+ self.trigger_watches()
+
# pylint: disable=no-self-use
def get_execution_state_dc_environ(self, execution_state):
"""
diff --git a/heron/tools/tracker/src/python/tracker.py b/heron/tools/tracker/src/python/tracker.py
index 4fb6e87..8c0649a 100644
--- a/heron/tools/tracker/src/python/tracker.py
+++ b/heron/tools/tracker/src/python/tracker.py
@@ -205,6 +205,13 @@ class Tracker(object):
if not data:
Log.debug("No data to be set")
+ def on_topology_packing_plan(data):
+ """watch packing plan"""
+ Log.info("Watch triggered for topology packing plan: " + topologyName)
+ topology.set_packing_plan(data)
+ if not data:
+ Log.debug("No data to be set")
+
def on_topology_execution_state(data):
"""watch execution state"""
Log.info("Watch triggered for topology execution state: " + topologyName)
@@ -228,6 +235,7 @@ class Tracker(object):
# Set watches on the pplan, execution_state, tmaster and scheduler_location.
state_manager.get_pplan(topologyName, on_topology_pplan)
+ state_manager.get_packing_plan(topologyName, on_topology_packing_plan)
state_manager.get_execution_state(topologyName, on_topology_execution_state)
state_manager.get_tmaster(topologyName, on_topology_tmaster)
state_manager.get_scheduler_location(topologyName, on_topology_scheduler_location)
@@ -301,6 +309,8 @@ class Tracker(object):
runtime_state = {}
runtime_state["has_physical_plan"] = \
True if topology.physical_plan else False
+ runtime_state["has_packing_plan"] = \
+ True if topology.packing_plan else False
runtime_state["has_tmaster_location"] = \
True if topology.tmaster else False
runtime_state["has_scheduler_location"] = \
@@ -514,6 +524,52 @@ class Tracker(object):
return physicalPlan
+ # pylint: disable=too-many-locals
+ def extract_packing_plan(self, topology):
+ """
+ Returns the representation of packing plan that will
+ be returned from Tracker.
+ """
+ packingPlan = {
+ "id": "",
+ "container_plans": []
+ }
+
+ if not topology.packing_plan:
+ return packingPlan
+
+ container_plans = topology.packing_plan.container_plans
+
+ containers = []
+ for container_plan in container_plans:
+ instances = []
+ for instance_plan in container_plan.instance_plans:
+ instance_resources = {"cpu": instance_plan.resource.cpu,
+ "ram": instance_plan.resource.ram,
+ "disk": instance_plan.resource.disk}
+ instance = {"component_name" : instance_plan.component_name,
+ "task_id" : instance_plan.task_id,
+ "component_index": instance_plan.component_index,
+ "instance_resources": instance_resources}
+ instances.append(instance)
+ required_resource = {"cpu": container_plan.requiredResource.cpu,
+ "ram": container_plan.requiredResource.ram,
+ "disk": container_plan.requiredResource.disk}
+ scheduled_resource = {}
+ if container_plan.scheduledResource:
+ scheduled_resource = {"cpu": container_plan.scheduledResource.cpu,
+ "ram": container_plan.scheduledResource.ram,
+ "disk": container_plan.scheduledResource.disk}
+ container = {"id": container_plan.id,
+ "instances": instances,
+ "required_resources": required_resource,
+ "scheduled_resources": scheduled_resource}
+ containers.append(container)
+
+ packingPlan["id"] = topology.packing_plan.id
+ packingPlan["container_plans"] = containers
+ return json.dumps(packingPlan)
+
def setTopologyInfo(self, topology):
"""
Extracts info from the stored proto states and
@@ -536,6 +592,11 @@ class Tracker(object):
if not topology.physical_plan:
has_physical_plan = False
+ Log.info("Setting topology info for topology: " + topology.name)
+ has_packing_plan = True
+ if not topology.packing_plan:
+ has_packing_plan = False
+
has_tmaster_location = True
if not topology.tmaster:
has_tmaster_location = False
@@ -549,6 +610,7 @@ class Tracker(object):
"id": topology.id,
"logical_plan": None,
"physical_plan": None,
+ "packing_plan": None,
"execution_state": None,
"tmaster_location": None,
"scheduler_location": None,
@@ -556,6 +618,7 @@ class Tracker(object):
executionState = self.extract_execution_state(topology)
executionState["has_physical_plan"] = has_physical_plan
+ executionState["has_packing_plan"] = has_packing_plan
executionState["has_tmaster_location"] = has_tmaster_location
executionState["has_scheduler_location"] = has_scheduler_location
executionState["status"] = topology.get_status()
@@ -566,6 +629,7 @@ class Tracker(object):
topologyInfo["execution_state"] = executionState
topologyInfo["logical_plan"] = self.extract_logical_plan(topology)
topologyInfo["physical_plan"] = self.extract_physical_plan(topology)
+ topologyInfo["packing_plan"] = self.extract_packing_plan(topology)
topologyInfo["tmaster_location"] = self.extract_tmaster(topology)
topologyInfo["scheduler_location"] = self.extract_scheduler_location(topology)
diff --git a/heron/tools/tracker/tests/python/mock_proto.py b/heron/tools/tracker/tests/python/mock_proto.py
index d8fa9df..f3d4014 100644
--- a/heron/tools/tracker/tests/python/mock_proto.py
+++ b/heron/tools/tracker/tests/python/mock_proto.py
@@ -2,6 +2,7 @@
from heronpy.api import api_constants
import heron.proto.execution_state_pb2 as protoEState
import heron.proto.physical_plan_pb2 as protoPPlan
+import heron.proto.packing_plan_pb2 as protoPackingPlan
import heron.proto.tmaster_pb2 as protoTmaster
import heron.proto.topology_pb2 as protoTopology
@@ -27,6 +28,37 @@ class MockProto(object):
spout.outputs.add().stream.CopyFrom(stream)
return spout
+ def create_mock_resource(self):
+ resource = protoPackingPlan.Resource()
+ resource.cpu = 1.0
+ resource.ram = 1024
+ resource.disk = 1024 * 2
+ return resource
+
+ def create_mock_instance_plan(self):
+ instancePlan = protoPackingPlan.InstancePlan()
+ instancePlan.component_name = "word"
+ instancePlan.task_id = 1
+ instancePlan.component_index = 1
+ instancePlan.resource.CopyFrom(self.create_mock_resource())
+ return instancePlan
+
+ def create_mock_simple_container_plan(self):
+ containerPlan = protoPackingPlan.ContainerPlan()
+ containerPlan.id = 1
+ containerPlan.instance_plans.extend([self.create_mock_instance_plan()])
+ containerPlan.requiredResource.CopyFrom(self.create_mock_resource())
+
+ return containerPlan
+
+ def create_mock_simple_container_plan2(self):
+ containerPlan = protoPackingPlan.ContainerPlan()
+ containerPlan.id = 1
+ containerPlan.instance_plans.extend([self.create_mock_instance_plan()])
+ containerPlan.requiredResource.CopyFrom(self.create_mock_resource())
+ containerPlan.scheduledResource.CopyFrom(self.create_mock_resource())
+ return containerPlan
+
def create_mock_bolt(self,
bolt_name,
input_streams,
@@ -138,6 +170,21 @@ class MockProto(object):
bolt_parallelism))
return pplan
+ def create_mock_simple_packing_plan(
+ self):
+ packingPlan = protoPackingPlan.PackingPlan()
+ packingPlan.id = "ExclamationTopology"
+ packingPlan.container_plans.extend([self.create_mock_simple_container_plan()])
+ return packingPlan
+
+ def create_mock_simple_packing_plan2(
+ self):
+ packingPlan = protoPackingPlan.PackingPlan()
+ packingPlan.id = "ExclamationTopology"
+ packingPlan.container_plans.extend([self.create_mock_simple_container_plan2()])
+ packingPlan.container_plans.extend([self.create_mock_simple_container_plan()])
+ return packingPlan
+
def create_mock_medium_physical_plan(
self,
spout_parallelism=1,
diff --git a/heron/tools/tracker/tests/python/topology_unittest.py b/heron/tools/tracker/tests/python/topology_unittest.py
index fdbfb67..952e3a4 100644
--- a/heron/tools/tracker/tests/python/topology_unittest.py
+++ b/heron/tools/tracker/tests/python/topology_unittest.py
@@ -22,6 +22,25 @@ class TopologyTest(unittest.TestCase):
self.assertEqual(MockProto.topology_id, self.topology.id)
self.assertEqual(physical_plan, self.topology.physical_plan)
+ def test_set_packing_plan(self):
+ # Set it to None
+ self.topology.set_packing_plan(None)
+ self.assertIsNone(self.topology.id)
+ self.assertIsNone(self.topology.packing_plan)
+
+ packing_plan = MockProto().create_mock_simple_packing_plan()
+ self.topology.set_packing_plan(packing_plan)
+ self.assertEqual(packing_plan, self.topology.packing_plan)
+
+ # testing with a packing plan with scheduled resources
+ self.topology.set_packing_plan(None)
+ self.assertIsNone(self.topology.id)
+ self.assertIsNone(self.topology.packing_plan)
+
+ packing_plan = MockProto().create_mock_simple_packing_plan2()
+ self.topology.set_packing_plan(packing_plan)
+ self.assertEqual(packing_plan, self.topology.packing_plan)
+
def test_set_execution_state(self):
# Set it to None
self.topology.set_execution_state(None)