You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@heron.apache.org by hu...@apache.org on 2018/07/10 22:24:17 UTC

[incubator-heron] branch master updated: Added endpoint to tracker to fetch packing plan. (#2959)

This is an automated email from the ASF dual-hosted git repository.

huijun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-heron.git


The following commit(s) were added to refs/heads/master by this push:
     new 3fea43b  Added endpoint to tracker to fetch packing plan. (#2959)
3fea43b is described below

commit 3fea43bbb96f5bdcf52d0821adc3641545a3e785
Author: Faria Kalim <fa...@gmail.com>
AuthorDate: Tue Jul 10 15:24:14 2018 -0700

    Added endpoint to tracker to fetch packing plan. (#2959)
    
    * Added endpoint to tracker to fetch packing plan.
    
    * checked scheduled resources as part of test
---
 .../tools/tracker/src/python/handlers/__init__.py  |  1 +
 .../src/python/handlers/packingplanhandler.py      | 61 +++++++++++++++++++++
 heron/tools/tracker/src/python/main.py             |  1 +
 heron/tools/tracker/src/python/topology.py         | 11 ++++
 heron/tools/tracker/src/python/tracker.py          | 64 ++++++++++++++++++++++
 heron/tools/tracker/tests/python/mock_proto.py     | 47 ++++++++++++++++
 .../tracker/tests/python/topology_unittest.py      | 19 +++++++
 7 files changed, 204 insertions(+)

diff --git a/heron/tools/tracker/src/python/handlers/__init__.py b/heron/tools/tracker/src/python/handlers/__init__.py
index a715d1b..e74e983 100644
--- a/heron/tools/tracker/src/python/handlers/__init__.py
+++ b/heron/tools/tracker/src/python/handlers/__init__.py
@@ -19,6 +19,7 @@ from metricshandler import MetricsHandler
 from metricsqueryhandler import MetricsQueryHandler
 from metricstimelinehandler import MetricsTimelineHandler
 from physicalplanhandler import PhysicalPlanHandler
+from packingplanhandler import PackingPlanHandler
 from pidhandler import PidHandler
 from runtimestatehandler import RuntimeStateHandler
 from schedulerlocationhandler import SchedulerLocationHandler
diff --git a/heron/tools/tracker/src/python/handlers/packingplanhandler.py b/heron/tools/tracker/src/python/handlers/packingplanhandler.py
new file mode 100644
index 0000000..ced6eb1
--- /dev/null
+++ b/heron/tools/tracker/src/python/handlers/packingplanhandler.py
@@ -0,0 +1,61 @@
+#!/usr/bin/env python
+# -*- encoding: utf-8 -*-
+
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing,
+#  software distributed under the License is distributed on an
+#  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+#  KIND, either express or implied.  See the License for the
+#  specific language governing permissions and limitations
+#  under the License.
+
+''' packingplanhandler.py '''
+import traceback
+import tornado.gen
+import tornado.web
+
+from heron.common.src.python.utils.log import Log
+from heron.tools.tracker.src.python.handlers import BaseHandler
+
+
+class PackingPlanHandler(BaseHandler):
+  """
+  URL - /topologies/packingplan
+  Parameters:
+   - cluster (required)
+   - role - (optional) Role used to submit the topology.
+   - environ (required)
+   - topology (required) name of the requested topology
+
+  The response JSON is a dictionary with all the
+  information of packing plan of the topology.
+  """
+
+  # pylint: disable=attribute-defined-outside-init
+  def initialize(self, tracker):
+    """initialize"""
+    self.tracker = tracker
+
+  @tornado.gen.coroutine
+  def get(self):
+    """get method"""
+    try:
+      cluster = self.get_argument_cluster()
+      role = self.get_argument_role()
+      environ = self.get_argument_environ()
+      topology_name = self.get_argument_topology()
+      topology_info = self.tracker.getTopologyInfo(topology_name, cluster, role, environ)
+      packing_plan = topology_info["packing_plan"]
+      self.write_success_response(packing_plan)
+    except Exception as e:
+      Log.debug(traceback.format_exc())
+      self.write_error_response(e)
diff --git a/heron/tools/tracker/src/python/main.py b/heron/tools/tracker/src/python/main.py
index 1a3518f..d9f60e9 100644
--- a/heron/tools/tracker/src/python/main.py
+++ b/heron/tools/tracker/src/python/main.py
@@ -62,6 +62,7 @@ class Application(tornado.web.Application):
         (r"/topologies/containerfilestats",
          handlers.ContainerFileStatsHandler, {"tracker":self.tracker}),
         (r"/topologies/physicalplan", handlers.PhysicalPlanHandler, {"tracker":self.tracker}),
+        (r"/topologies/packingplan", handlers.PackingPlanHandler, {"tracker":self.tracker}),
         # Deprecated. See https://github.com/apache/incubator-heron/issues/1754
         (r"/topologies/executionstate", handlers.ExecutionStateHandler, {"tracker":self.tracker}),
         (r"/topologies/schedulerlocation", handlers.SchedulerLocationHandler,
diff --git a/heron/tools/tracker/src/python/topology.py b/heron/tools/tracker/src/python/topology.py
index c7e1122..5198492 100644
--- a/heron/tools/tracker/src/python/topology.py
+++ b/heron/tools/tracker/src/python/topology.py
@@ -46,6 +46,7 @@ class Topology(object):
     self.name = name
     self.state_manager_name = state_manager_name
     self.physical_plan = None
+    self.packing_plan = None
     self.execution_state = None
     self.id = None
     self.cluster = None
@@ -128,6 +129,16 @@ class Topology(object):
       self.id = physical_plan.topology.id
     self.trigger_watches()
 
+  def set_packing_plan(self, packing_plan):
+    """ set packing plan """
+    if not packing_plan:
+      self.packing_plan = None
+      self.id = None
+    else:
+      self.packing_plan = packing_plan
+      self.id = packing_plan.id
+    self.trigger_watches()
+
   # pylint: disable=no-self-use
   def get_execution_state_dc_environ(self, execution_state):
     """
diff --git a/heron/tools/tracker/src/python/tracker.py b/heron/tools/tracker/src/python/tracker.py
index 4fb6e87..8c0649a 100644
--- a/heron/tools/tracker/src/python/tracker.py
+++ b/heron/tools/tracker/src/python/tracker.py
@@ -205,6 +205,13 @@ class Tracker(object):
       if not data:
         Log.debug("No data to be set")
 
+    def on_topology_packing_plan(data):
+      """watch packing plan"""
+      Log.info("Watch triggered for topology packing plan: " + topologyName)
+      topology.set_packing_plan(data)
+      if not data:
+        Log.debug("No data to be set")
+
     def on_topology_execution_state(data):
       """watch execution state"""
       Log.info("Watch triggered for topology execution state: " + topologyName)
@@ -228,6 +235,7 @@ class Tracker(object):
 
     # Set watches on the pplan, execution_state, tmaster and scheduler_location.
     state_manager.get_pplan(topologyName, on_topology_pplan)
+    state_manager.get_packing_plan(topologyName, on_topology_packing_plan)
     state_manager.get_execution_state(topologyName, on_topology_execution_state)
     state_manager.get_tmaster(topologyName, on_topology_tmaster)
     state_manager.get_scheduler_location(topologyName, on_topology_scheduler_location)
@@ -301,6 +309,8 @@ class Tracker(object):
     runtime_state = {}
     runtime_state["has_physical_plan"] = \
       True if topology.physical_plan else False
+    runtime_state["has_packing_plan"] = \
+      True if topology.packing_plan else False
     runtime_state["has_tmaster_location"] = \
       True if topology.tmaster else False
     runtime_state["has_scheduler_location"] = \
@@ -514,6 +524,52 @@ class Tracker(object):
 
     return physicalPlan
 
+  # pylint: disable=too-many-locals
+  def extract_packing_plan(self, topology):
+    """
+    Returns the representation of packing plan that will
+    be returned from Tracker.
+    """
+    packingPlan = {
+        "id": "",
+        "container_plans": []
+    }
+
+    if not topology.packing_plan:
+      return packingPlan
+
+    container_plans = topology.packing_plan.container_plans
+
+    containers = []
+    for container_plan in container_plans:
+      instances = []
+      for instance_plan in container_plan.instance_plans:
+        instance_resources = {"cpu": instance_plan.resource.cpu,
+                              "ram": instance_plan.resource.ram,
+                              "disk": instance_plan.resource.disk}
+        instance = {"component_name" : instance_plan.component_name,
+                    "task_id" : instance_plan.task_id,
+                    "component_index": instance_plan.component_index,
+                    "instance_resources": instance_resources}
+        instances.append(instance)
+      required_resource = {"cpu": container_plan.requiredResource.cpu,
+                           "ram": container_plan.requiredResource.ram,
+                           "disk": container_plan.requiredResource.disk}
+      scheduled_resource = {}
+      if container_plan.scheduledResource:
+        scheduled_resource = {"cpu": container_plan.scheduledResource.cpu,
+                              "ram": container_plan.scheduledResource.ram,
+                              "disk": container_plan.scheduledResource.disk}
+      container = {"id": container_plan.id,
+                   "instances": instances,
+                   "required_resources": required_resource,
+                   "scheduled_resources": scheduled_resource}
+      containers.append(container)
+
+    packingPlan["id"] = topology.packing_plan.id
+    packingPlan["container_plans"] = containers
+    return json.dumps(packingPlan)
+
   def setTopologyInfo(self, topology):
     """
     Extracts info from the stored proto states and
@@ -536,6 +592,11 @@ class Tracker(object):
     if not topology.physical_plan:
       has_physical_plan = False
 
+    Log.info("Setting topology info for topology: " + topology.name)
+    has_packing_plan = True
+    if not topology.packing_plan:
+      has_packing_plan = False
+
     has_tmaster_location = True
     if not topology.tmaster:
       has_tmaster_location = False
@@ -549,6 +610,7 @@ class Tracker(object):
         "id": topology.id,
         "logical_plan": None,
         "physical_plan": None,
+        "packing_plan": None,
         "execution_state": None,
         "tmaster_location": None,
         "scheduler_location": None,
@@ -556,6 +618,7 @@ class Tracker(object):
 
     executionState = self.extract_execution_state(topology)
     executionState["has_physical_plan"] = has_physical_plan
+    executionState["has_packing_plan"] = has_packing_plan
     executionState["has_tmaster_location"] = has_tmaster_location
     executionState["has_scheduler_location"] = has_scheduler_location
     executionState["status"] = topology.get_status()
@@ -566,6 +629,7 @@ class Tracker(object):
     topologyInfo["execution_state"] = executionState
     topologyInfo["logical_plan"] = self.extract_logical_plan(topology)
     topologyInfo["physical_plan"] = self.extract_physical_plan(topology)
+    topologyInfo["packing_plan"] = self.extract_packing_plan(topology)
     topologyInfo["tmaster_location"] = self.extract_tmaster(topology)
     topologyInfo["scheduler_location"] = self.extract_scheduler_location(topology)
 
diff --git a/heron/tools/tracker/tests/python/mock_proto.py b/heron/tools/tracker/tests/python/mock_proto.py
index d8fa9df..f3d4014 100644
--- a/heron/tools/tracker/tests/python/mock_proto.py
+++ b/heron/tools/tracker/tests/python/mock_proto.py
@@ -2,6 +2,7 @@
 from heronpy.api import api_constants
 import heron.proto.execution_state_pb2 as protoEState
 import heron.proto.physical_plan_pb2 as protoPPlan
+import heron.proto.packing_plan_pb2 as protoPackingPlan
 import heron.proto.tmaster_pb2 as protoTmaster
 import heron.proto.topology_pb2 as protoTopology
 
@@ -27,6 +28,37 @@ class MockProto(object):
       spout.outputs.add().stream.CopyFrom(stream)
     return spout
 
+  def create_mock_resource(self):
+    resource = protoPackingPlan.Resource()
+    resource.cpu = 1.0
+    resource.ram = 1024
+    resource.disk = 1024 * 2
+    return resource
+
+  def create_mock_instance_plan(self):
+    instancePlan = protoPackingPlan.InstancePlan()
+    instancePlan.component_name  = "word"
+    instancePlan.task_id = 1
+    instancePlan.component_index = 1
+    instancePlan.resource.CopyFrom(self.create_mock_resource())
+    return instancePlan
+
+  def create_mock_simple_container_plan(self):
+    containerPlan = protoPackingPlan.ContainerPlan()
+    containerPlan.id = 1
+    containerPlan.instance_plans.extend([self.create_mock_instance_plan()])
+    containerPlan.requiredResource.CopyFrom(self.create_mock_resource())
+
+    return containerPlan
+
+  def create_mock_simple_container_plan2(self):
+    containerPlan = protoPackingPlan.ContainerPlan()
+    containerPlan.id = 1
+    containerPlan.instance_plans.extend([self.create_mock_instance_plan()])
+    containerPlan.requiredResource.CopyFrom(self.create_mock_resource())
+    containerPlan.scheduledResource.CopyFrom(self.create_mock_resource())
+    return containerPlan
+
   def create_mock_bolt(self,
                        bolt_name,
                        input_streams,
@@ -138,6 +170,21 @@ class MockProto(object):
         bolt_parallelism))
     return pplan
 
+  def create_mock_simple_packing_plan(
+    self):
+    packingPlan = protoPackingPlan.PackingPlan()
+    packingPlan.id = "ExclamationTopology"
+    packingPlan.container_plans.extend([self.create_mock_simple_container_plan()])
+    return packingPlan
+
+  def create_mock_simple_packing_plan2(
+    self):
+    packingPlan = protoPackingPlan.PackingPlan()
+    packingPlan.id = "ExclamationTopology"
+    packingPlan.container_plans.extend([self.create_mock_simple_container_plan2()])
+    packingPlan.container_plans.extend([self.create_mock_simple_container_plan()])
+    return packingPlan
+
   def create_mock_medium_physical_plan(
       self,
       spout_parallelism=1,
diff --git a/heron/tools/tracker/tests/python/topology_unittest.py b/heron/tools/tracker/tests/python/topology_unittest.py
index fdbfb67..952e3a4 100644
--- a/heron/tools/tracker/tests/python/topology_unittest.py
+++ b/heron/tools/tracker/tests/python/topology_unittest.py
@@ -22,6 +22,25 @@ class TopologyTest(unittest.TestCase):
     self.assertEqual(MockProto.topology_id, self.topology.id)
     self.assertEqual(physical_plan, self.topology.physical_plan)
 
+  def test_set_packing_plan(self):
+    # Set it to None
+    self.topology.set_packing_plan(None)
+    self.assertIsNone(self.topology.id)
+    self.assertIsNone(self.topology.packing_plan)
+
+    packing_plan = MockProto().create_mock_simple_packing_plan()
+    self.topology.set_packing_plan(packing_plan)
+    self.assertEqual(packing_plan, self.topology.packing_plan)
+
+    # testing with a packing plan with scheduled resources
+    self.topology.set_packing_plan(None)
+    self.assertIsNone(self.topology.id)
+    self.assertIsNone(self.topology.packing_plan)
+
+    packing_plan = MockProto().create_mock_simple_packing_plan2()
+    self.topology.set_packing_plan(packing_plan)
+    self.assertEqual(packing_plan, self.topology.packing_plan)
+
   def test_set_execution_state(self):
     # Set it to None
     self.topology.set_execution_state(None)