Posted to commits@heron.apache.org by GitBox <gi...@apache.org> on 2022/05/12 13:31:32 UTC

[GitHub] [incubator-heron] thinker0 opened a new pull request, #3830: Update scheduler_location of topology

thinker0 opened a new pull request, #3830:
URL: https://github.com/apache/incubator-heron/pull/3830

   
   Update scheduler_location of topology 
   #3820 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@heron.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-heron] thinker0 commented on a diff in pull request #3830: Fix Update of topology

Posted by GitBox <gi...@apache.org>.
thinker0 commented on code in PR #3830:
URL: https://github.com/apache/incubator-heron/pull/3830#discussion_r887773191


##########
heron/tools/tracker/src/python/tracker.py:
##########
@@ -64,17 +66,18 @@ def sync_topologies(self) -> None:
     def on_topologies_watch(state_manager: StateManager, topologies: List[str]) -> None:
       """watch topologies"""
       topologies = set(topologies)
-      Log.info("State watch triggered for topologies.")
-      Log.debug("Topologies: %s", topologies)
-      cached_names = {t.name for t in self.get_stmgr_topologies(state_manager.name)}
-      Log.debug("Existing topologies: %s", cached_names)
-      for name in cached_names - topologies:
-        Log.info("Removing topology: %s in rootpath: %s",
-                 name, state_manager.rootpath)
-        self.remove_topology(name, state_manager.name)
-
-      for name in topologies - cached_names:
-        self.add_new_topology(state_manager, name)
+      Log.info(f"State watch triggered for topologies of {state_manager.name}.")
+      Log.debug(f"Received topologies: {state_manager.name}, {topologies}")

Review Comment:
   Updated



##########
heron/tools/tracker/src/python/tracker.py:
##########
@@ -64,17 +66,18 @@ def sync_topologies(self) -> None:
     def on_topologies_watch(state_manager: StateManager, topologies: List[str]) -> None:
       """watch topologies"""
       topologies = set(topologies)
-      Log.info("State watch triggered for topologies.")
-      Log.debug("Topologies: %s", topologies)
-      cached_names = {t.name for t in self.get_stmgr_topologies(state_manager.name)}
-      Log.debug("Existing topologies: %s", cached_names)
-      for name in cached_names - topologies:
-        Log.info("Removing topology: %s in rootpath: %s",
-                 name, state_manager.rootpath)
-        self.remove_topology(name, state_manager.name)
-
-      for name in topologies - cached_names:
-        self.add_new_topology(state_manager, name)
+      Log.info(f"State watch triggered for topologies of {state_manager.name}.")
+      Log.debug(f"Received topologies: {state_manager.name}, {topologies}")
+      cached_names = [t.name for t in self.get_stmgr_topologies(state_manager.name)]

Review Comment:
   Updated





[GitHub] [incubator-heron] thinker0 commented on a diff in pull request #3830: Fix Update of topology

Posted by GitBox <gi...@apache.org>.
thinker0 commented on code in PR #3830:
URL: https://github.com/apache/incubator-heron/pull/3830#discussion_r887735790


##########
heron/tools/tracker/src/python/topology.py:
##########
@@ -241,8 +244,16 @@ def __init__(self, name: str, state_manager_name: str, tracker_config: Config) -
     self.id: Optional[int] = None
     self.tracker_config: Config = tracker_config
     # this maps pb2 structs to structures returned via API endpoints
-    # it is repopulated every time one of the pb2 roperties is updated
+    # it is repopulated every time one of the pb2 properties is updated
     self.info: Optional[TopologyInfo] = None
+    self.lock = threading.RLock()
+
+  def __eq__(self, o):
+    return isinstance(o, Topology) \
+           and o.name == self.name \
+           and o.state_manager_name == self.state_manager_name \
+           and o.cluster == self.cluster \
+           and o.environ == self.environ

Review Comment:
   ```
   if name not in topologies:
   ```
   
   A `@dataclass`-generated `__eq__` compares every field, so two objects are equal only when all of their values match. Here we want topologies to be treated as the same when only these identifying fields match.





[GitHub] [incubator-heron] thinker0 commented on a diff in pull request #3830: Fix Update of topology

Posted by GitBox <gi...@apache.org>.
thinker0 commented on code in PR #3830:
URL: https://github.com/apache/incubator-heron/pull/3830#discussion_r887772727


##########
heron/tools/tracker/src/python/tracker.py:
##########
@@ -114,37 +121,37 @@ def get_stmgr_topologies(self, name: str) -> List[Any]:
     """
     return [t for t in self.topologies if t.state_manager_name == name]
 
-  def add_new_topology(self, state_manager, topology_name: str) -> None:
+  def add_new_topology(self, state_manager: StateManager, topology_name: str) -> None:
     """
     Adds a topology in the local cache, and sets a watch
     on any changes on the topology.
     """
     topology = Topology(topology_name, state_manager.name, self.config)
-    Log.info("Adding new topology: %s, state_manager: %s",
-             topology_name, state_manager.name)
-    # populate the cache before making it addressable in the topologies to
-    # avoid races due to concurrent execution
-    self.topologies.append(topology)
-
-    # Set watches on the pplan, execution_state, tmanager and scheduler_location.
-    state_manager.get_pplan(topology_name, topology.set_physical_plan)
-    state_manager.get_packing_plan(topology_name, topology.set_packing_plan)
-    state_manager.get_execution_state(topology_name, topology.set_execution_state)
-    state_manager.get_tmanager(topology_name, topology.set_tmanager)
-    state_manager.get_scheduler_location(topology_name, topology.set_scheduler_location)
+    with self.lock:
+      if topology not in self.topologies:
+        Log.info(f"Adding new topology: {topology_name}, state_manager: {state_manager.name}")
+        self.topologies.append(topology)
+
+      # Set watches on the pplan, execution_state, tmanager and scheduler_location.
+      state_manager.get_pplan(topology_name, topology.set_physical_plan)
+      state_manager.get_packing_plan(topology_name, topology.set_packing_plan)
+      state_manager.get_execution_state(topology_name, topology.set_execution_state)
+      state_manager.get_tmanager(topology_name, topology.set_tmanager)
+      state_manager.get_scheduler_location(topology_name, topology.set_scheduler_location)

Review Comment:
   Updated





[GitHub] [incubator-heron] Code0x58 commented on a diff in pull request #3830: Fix Update of topology

Posted by GitBox <gi...@apache.org>.
Code0x58 commented on code in PR #3830:
URL: https://github.com/apache/incubator-heron/pull/3830#discussion_r895150253


##########
heron/tools/tracker/src/python/tracker.py:
##########
@@ -64,17 +66,18 @@ def sync_topologies(self) -> None:
     def on_topologies_watch(state_manager: StateManager, topologies: List[str]) -> None:
       """watch topologies"""
       topologies = set(topologies)
-      Log.info("State watch triggered for topologies.")
-      Log.debug("Topologies: %s", topologies)
-      cached_names = {t.name for t in self.get_stmgr_topologies(state_manager.name)}
-      Log.debug("Existing topologies: %s", cached_names)
-      for name in cached_names - topologies:
-        Log.info("Removing topology: %s in rootpath: %s",
-                 name, state_manager.rootpath)
-        self.remove_topology(name, state_manager.name)
-
-      for name in topologies - cached_names:
-        self.add_new_topology(state_manager, name)
+      Log.info(f"State watch triggered for topologies of {state_manager.name}.")
+      Log.debug(f"Received topologies: {state_manager.name}, {topologies}")

Review Comment:
   Is that the case for things like `Log.debug("Existing topologies: %s", cached_names)`? That is different from `"Existing topologies: %s" % cached_names`, which is where I'd expect to see that linter warning.





[GitHub] [incubator-heron] thinker0 commented on a diff in pull request #3830: Fix Update of topology

Posted by GitBox <gi...@apache.org>.
thinker0 commented on code in PR #3830:
URL: https://github.com/apache/incubator-heron/pull/3830#discussion_r887840888


##########
heron/tools/tracker/src/python/topology.py:
##########
@@ -589,30 +601,31 @@ def _update(
       scheduler_location=...,
     ) -> None:
     """Atomically update this instance to avoid inconsistent reads/writes from other threads."""

Review Comment:
   ```
   heron update lad/www/beta heron-system-access-prometheus --component-parallelism parser:1
   heron update lad/www/beta heron-system-access-prometheus --component-parallelism parser:2
   ```
   A lock is required because `_update` is executed concurrently when instance updates like the ones above occur.
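
A minimal sketch of the locking pattern described here, assuming the setters and `_update` share one reentrant lock. `TopologySketch`, its fields, and `snapshot` are hypothetical stand-ins for illustration, not the tracker's actual `Topology` class.

```python
import threading
from typing import Optional, Tuple


class TopologySketch:
  """Hypothetical stand-in for the tracker's Topology; not the real class."""

  def __init__(self, name: str) -> None:
    self.name = name
    self.physical_plan: Optional[str] = None
    self.info: Optional[str] = None
    # RLock is reentrant: a setter that already holds it can still call _update.
    self.lock = threading.RLock()

  def set_physical_plan(self, plan: str) -> None:
    with self.lock:
      self._update(physical_plan=plan)

  def _update(self, physical_plan: str) -> None:
    # Both fields change inside one critical section, so a reader that takes
    # the lock never observes a plan paired with stale derived info.
    with self.lock:
      self.physical_plan = physical_plan
      self.info = f"{self.name}:{physical_plan}"

  def snapshot(self) -> Tuple[Optional[str], Optional[str]]:
    with self.lock:
      return self.physical_plan, self.info


topology = TopologySketch("demo")
threads = [
    threading.Thread(target=topology.set_physical_plan, args=(f"parser:{i}",))
    for i in range(8)
]
for thread in threads:
  thread.start()
for thread in threads:
  thread.join()
plan, info = topology.snapshot()
```

Whichever update lands last, `info` is derived from the same `plan` that `snapshot` returns. A plain `threading.Lock` would deadlock in `set_physical_plan`, which is presumably why an `RLock` is used.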





[GitHub] [incubator-heron] thinker0 commented on a diff in pull request #3830: Fix Update of topology

Posted by GitBox <gi...@apache.org>.
thinker0 commented on code in PR #3830:
URL: https://github.com/apache/incubator-heron/pull/3830#discussion_r887802073


##########
heron/tools/tracker/src/python/topology.py:
##########
@@ -589,30 +601,31 @@ def _update(
       scheduler_location=...,
     ) -> None:
     """Atomically update this instance to avoid inconsistent reads/writes from other threads."""

Review Comment:
   ???





[GitHub] [incubator-heron] thinker0 commented on a diff in pull request #3830: Fix Update of topology

Posted by GitBox <gi...@apache.org>.
thinker0 commented on code in PR #3830:
URL: https://github.com/apache/incubator-heron/pull/3830#discussion_r888477101


##########
heron/tools/tracker/src/python/tracker.py:
##########
@@ -64,17 +66,18 @@ def sync_topologies(self) -> None:
     def on_topologies_watch(state_manager: StateManager, topologies: List[str]) -> None:
       """watch topologies"""
       topologies = set(topologies)
-      Log.info("State watch triggered for topologies.")
-      Log.debug("Topologies: %s", topologies)
-      cached_names = {t.name for t in self.get_stmgr_topologies(state_manager.name)}
-      Log.debug("Existing topologies: %s", cached_names)
-      for name in cached_names - topologies:
-        Log.info("Removing topology: %s in rootpath: %s",
-                 name, state_manager.rootpath)
-        self.remove_topology(name, state_manager.name)
-
-      for name in topologies - cached_names:
-        self.add_new_topology(state_manager, name)
+      Log.info(f"State watch triggered for topologies of {state_manager.name}.")
+      Log.debug(f"Received topologies: {state_manager.name}, {topologies}")

Review Comment:
   ```
   (format_str, *args)
   ```
   If you use this, stylecheck reports the following problem.
   ```
   bazel test --config=stylecheck heron/tools/tracker/...
   ```
   ```
   C0209: Formatting a regular string which could be a f-string (consider-using-f-string)
   ```
   





[GitHub] [incubator-heron] thinker0 commented on a diff in pull request #3830: Fix Update of topology

Posted by GitBox <gi...@apache.org>.
thinker0 commented on code in PR #3830:
URL: https://github.com/apache/incubator-heron/pull/3830#discussion_r884033927


##########
heron/tools/tracker/src/python/metricstimeline.py:
##########
@@ -84,7 +84,7 @@ async def get_metrics_timeline(
 
   # Form and send the http request.
   url = f"http://{tmanager.host}:{tmanager.stats_port}/stats"
-  Log.debug(f"Making HTTP call to fetch metrics: {url}")
+  # Log.debug(f"Making HTTP call to fetch metrics: {url}")

Review Comment:
    Reduce debug log https://github.com/apache/incubator-heron/pull/3836





[GitHub] [incubator-heron] thinker0 commented on a diff in pull request #3830: Fix Update of topology

Posted by GitBox <gi...@apache.org>.
thinker0 commented on code in PR #3830:
URL: https://github.com/apache/incubator-heron/pull/3830#discussion_r887790060


##########
heron/tools/tracker/src/python/tracker.py:
##########
@@ -100,11 +103,15 @@ def get_topology(
                   and t.cluster == cluster
                   and (not role or t.execution_state.role == role)
                   and t.environ == environ]
-    if len(topologies) != 1:
+    if len(topologies) == 0:

Review Comment:
   okay. I'll erase it.
   
   





[GitHub] [incubator-heron] thinker0 commented on a diff in pull request #3830: Fix Update of topology

Posted by GitBox <gi...@apache.org>.
thinker0 commented on code in PR #3830:
URL: https://github.com/apache/incubator-heron/pull/3830#discussion_r888477101


##########
heron/tools/tracker/src/python/tracker.py:
##########
@@ -64,17 +66,18 @@ def sync_topologies(self) -> None:
     def on_topologies_watch(state_manager: StateManager, topologies: List[str]) -> None:
       """watch topologies"""
       topologies = set(topologies)
-      Log.info("State watch triggered for topologies.")
-      Log.debug("Topologies: %s", topologies)
-      cached_names = {t.name for t in self.get_stmgr_topologies(state_manager.name)}
-      Log.debug("Existing topologies: %s", cached_names)
-      for name in cached_names - topologies:
-        Log.info("Removing topology: %s in rootpath: %s",
-                 name, state_manager.rootpath)
-        self.remove_topology(name, state_manager.name)
-
-      for name in topologies - cached_names:
-        self.add_new_topology(state_manager, name)
+      Log.info(f"State watch triggered for topologies of {state_manager.name}.")
+      Log.debug(f"Received topologies: {state_manager.name}, {topologies}")

Review Comment:
   ```
   (format_str, *args)
   ```
   If you use this, stylecheck reports the following problem.
   ```
   bazel test --config=stylecheck heron/tools/tracker/...
   ```
   ```
   C0209: Formatting a regular string which could be a f-string (consider-using-f-string)
   ```
   Reverted





[GitHub] [incubator-heron] thinker0 commented on pull request #3830: Fix Update of topology

Posted by GitBox <gi...@apache.org>.
thinker0 commented on PR #3830:
URL: https://github.com/apache/incubator-heron/pull/3830#issuecomment-1143287488

   On redeploy, the Topology object information in the tracker conflicts, the normal Topology update is not performed, and the Tracker/UI ends up in an abnormal state.




[GitHub] [incubator-heron] thinker0 commented on pull request #3830: Fix Update of topology

Posted by GitBox <gi...@apache.org>.
thinker0 commented on PR #3830:
URL: https://github.com/apache/incubator-heron/pull/3830#issuecomment-1143585416

   1. When ZooKeeper is used, a race condition occurs because the linked part [Link](https://github.com/apache/incubator-heron/pull/3830/files#diff-29e7366069fde10ea907fccddae5506e19a2d8dcaa5f515ce417fec4957e10ffR604-R631) is executed concurrently.
   2. There are cases where a Topology is registered as a duplicate. A lock was added to remove the section where the race condition occurred. [Link](https://github.com/apache/incubator-heron/pull/3830/files#diff-08d60d80f8e06cd2706c6b1ac3e791e2a57e67178f420d89d69780969bc8eda9R131-R146)
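
The duplicate-registration case in point 2 can be sketched as a check-then-append that runs under one lock. `TrackerSketch` and `register_concurrently` are hypothetical names for illustration; the real tracker stores `Topology` objects and also sets state-manager watches, which this sketch leaves out.

```python
import threading
from typing import List


class TrackerSketch:
  """Hypothetical stand-in for the tracker; only registration is modelled."""

  def __init__(self) -> None:
    self.topologies: List[str] = []
    self.lock = threading.RLock()

  def add_new_topology(self, name: str) -> bool:
    # The membership test and the append form one critical section, so two
    # concurrent watch callbacks cannot both pass the check and both append.
    with self.lock:
      if name in self.topologies:
        return False
      self.topologies.append(name)
      return True


def register_concurrently(tracker: TrackerSketch, name: str, workers: int = 16) -> List[bool]:
  """Call add_new_topology from several threads released at the same moment."""
  barrier = threading.Barrier(workers)
  results: List[bool] = []

  def worker() -> None:
    barrier.wait()
    results.append(tracker.add_new_topology(name))

  threads = [threading.Thread(target=worker) for _ in range(workers)]
  for thread in threads:
    thread.start()
  for thread in threads:
    thread.join()
  return results


tracker = TrackerSketch()
results = register_concurrently(tracker, "demo")
```

Exactly one caller registers the topology and the rest see it as already present. Without the lock, the check and the append are separate steps, so two threads can interleave between them.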




[GitHub] [incubator-heron] thinker0 commented on a diff in pull request #3830: Fix Update of topology

Posted by GitBox <gi...@apache.org>.
thinker0 commented on code in PR #3830:
URL: https://github.com/apache/incubator-heron/pull/3830#discussion_r884028622


##########
heron/tools/tracker/src/python/metricstimeline.py:
##########
@@ -84,7 +84,7 @@ async def get_metrics_timeline(
 
   # Form and send the http request.
   url = f"http://{tmanager.host}:{tmanager.stats_port}/stats"
-  Log.debug(f"Making HTTP call to fetch metrics: {url}")
+  # Log.debug(f"Making HTTP call to fetch metrics: {url}")

Review Comment:
   DEBUG produces too many log lines, so it would be better to delete this one.
   
   I'll probably have to do something else later to reduce the DEBUG logging.





[GitHub] [incubator-heron] thinker0 commented on a diff in pull request #3830: Fix Update of topology

Posted by GitBox <gi...@apache.org>.
thinker0 commented on code in PR #3830:
URL: https://github.com/apache/incubator-heron/pull/3830#discussion_r887812596


##########
heron/tools/tracker/src/python/topology.py:
##########
@@ -589,30 +601,31 @@ def _update(
       scheduler_location=...,
     ) -> None:
     """Atomically update this instance to avoid inconsistent reads/writes from other threads."""

Review Comment:
   Regarding the code below:
   
   When ZooKeeper watches are used, a race condition occurs because the linked part [Link](https://github.com/apache/incubator-heron/pull/3830/files#diff-29e7366069fde10ea907fccddae5506e19a2d8dcaa5f515ce417fec4957e10ffR604-R631) is executed concurrently.
   





[GitHub] [incubator-heron] thinker0 commented on a diff in pull request #3830: Fix Update of topology

Posted by GitBox <gi...@apache.org>.
thinker0 commented on code in PR #3830:
URL: https://github.com/apache/incubator-heron/pull/3830#discussion_r888477101


##########
heron/tools/tracker/src/python/tracker.py:
##########
@@ -64,17 +66,18 @@ def sync_topologies(self) -> None:
     def on_topologies_watch(state_manager: StateManager, topologies: List[str]) -> None:
       """watch topologies"""
       topologies = set(topologies)
-      Log.info("State watch triggered for topologies.")
-      Log.debug("Topologies: %s", topologies)
-      cached_names = {t.name for t in self.get_stmgr_topologies(state_manager.name)}
-      Log.debug("Existing topologies: %s", cached_names)
-      for name in cached_names - topologies:
-        Log.info("Removing topology: %s in rootpath: %s",
-                 name, state_manager.rootpath)
-        self.remove_topology(name, state_manager.name)
-
-      for name in topologies - cached_names:
-        self.add_new_topology(state_manager, name)
+      Log.info(f"State watch triggered for topologies of {state_manager.name}.")
+      Log.debug(f"Received topologies: {state_manager.name}, {topologies}")

Review Comment:
   ```
   bazel test --config=stylecheck heron/tools/tracker/...
   ```
   ```
   C0209: Formatting a regular string which could be a f-string (consider-using-f-string)
   ```
   Reverted





[GitHub] [incubator-heron] thinker0 commented on pull request #3830: Fix Update of topology

Posted by GitBox <gi...@apache.org>.
thinker0 commented on PR #3830:
URL: https://github.com/apache/incubator-heron/pull/3830#issuecomment-1150000396

   any other problems?




[GitHub] [incubator-heron] nicknezis merged pull request #3830: Fix Update of topology

Posted by GitBox <gi...@apache.org>.
nicknezis merged PR #3830:
URL: https://github.com/apache/incubator-heron/pull/3830




[GitHub] [incubator-heron] thinker0 commented on pull request #3830: Fix Update of topology

Posted by GitBox <gi...@apache.org>.
thinker0 commented on PR #3830:
URL: https://github.com/apache/incubator-heron/pull/3830#issuecomment-1141586615

   Review @windhamwong @Code0x58 




[GitHub] [incubator-heron] thinker0 commented on a diff in pull request #3830: Fix Update of topology

Posted by GitBox <gi...@apache.org>.
thinker0 commented on code in PR #3830:
URL: https://github.com/apache/incubator-heron/pull/3830#discussion_r887735790


##########
heron/tools/tracker/src/python/topology.py:
##########
@@ -241,8 +244,16 @@ def __init__(self, name: str, state_manager_name: str, tracker_config: Config) -
     self.id: Optional[int] = None
     self.tracker_config: Config = tracker_config
     # this maps pb2 structs to structures returned via API endpoints
-    # it is repopulated every time one of the pb2 roperties is updated
+    # it is repopulated every time one of the pb2 properties is updated
     self.info: Optional[TopologyInfo] = None
+    self.lock = threading.RLock()
+
+  def __eq__(self, o):
+    return isinstance(o, Topology) \
+           and o.name == self.name \
+           and o.state_manager_name == self.state_manager_name \
+           and o.cluster == self.cluster \
+           and o.environ == self.environ

Review Comment:
   ```
   if topology not in self.topologies:
   ```
   
   A `@dataclass`-generated `__eq__` compares every field, so two objects are equal only when all of their values match. Here we want topologies to be treated as the same when only these identifying fields match.
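
A rough sketch of why the custom `__eq__` matters for the membership check: `in` on a list falls back to `==`, so equality over a few identifying fields lets a freshly built object match a cached one even when their mutable state differs. `TopologyKey` is a hypothetical, reduced class, not the tracker's `Topology`.

```python
from typing import Any, List, Optional


class TopologyKey:
  """Hypothetical, reduced Topology: identifying fields plus mutable state."""

  def __init__(self, name: str, state_manager_name: str,
               cluster: Optional[str] = None, environ: Optional[str] = None) -> None:
    self.name = name
    self.state_manager_name = state_manager_name
    self.cluster = cluster
    self.environ = environ
    self.physical_plan: Any = None  # mutable state, deliberately ignored by __eq__

  def __eq__(self, other: object) -> bool:
    return (isinstance(other, TopologyKey)
            and other.name == self.name
            and other.state_manager_name == self.state_manager_name
            and other.cluster == self.cluster
            and other.environ == self.environ)


cached: List[TopologyKey] = [TopologyKey("wordcount", "zk")]

fresh = TopologyKey("wordcount", "zk")
fresh.physical_plan = {"parser": 2}   # differs from the cached object's state
already_tracked = fresh in cached     # list membership uses __eq__

other = TopologyKey("wordcount", "local")
other_tracked = other in cached
```

One side effect worth knowing: a class that defines `__eq__` without `__hash__` becomes unhashable, so such objects can live in a list but not in a `set` or as `dict` keys.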





[GitHub] [incubator-heron] thinker0 commented on a diff in pull request #3830: Fix Update of topology

Posted by GitBox <gi...@apache.org>.
thinker0 commented on code in PR #3830:
URL: https://github.com/apache/incubator-heron/pull/3830#discussion_r887778658


##########
heron/tools/tracker/src/python/tracker.py:
##########
@@ -114,37 +121,37 @@ def get_stmgr_topologies(self, name: str) -> List[Any]:
     """
     return [t for t in self.topologies if t.state_manager_name == name]
 
-  def add_new_topology(self, state_manager, topology_name: str) -> None:
+  def add_new_topology(self, state_manager: StateManager, topology_name: str) -> None:
     """
     Adds a topology in the local cache, and sets a watch
     on any changes on the topology.
     """
     topology = Topology(topology_name, state_manager.name, self.config)
-    Log.info("Adding new topology: %s, state_manager: %s",
-             topology_name, state_manager.name)
-    # populate the cache before making it addressable in the topologies to
-    # avoid races due to concurrent execution
-    self.topologies.append(topology)
-
-    # Set watches on the pplan, execution_state, tmanager and scheduler_location.
-    state_manager.get_pplan(topology_name, topology.set_physical_plan)
-    state_manager.get_packing_plan(topology_name, topology.set_packing_plan)
-    state_manager.get_execution_state(topology_name, topology.set_execution_state)
-    state_manager.get_tmanager(topology_name, topology.set_tmanager)
-    state_manager.get_scheduler_location(topology_name, topology.set_scheduler_location)
+    with self.lock:

Review Comment:
   ???





[GitHub] [incubator-heron] nicknezis commented on a diff in pull request #3830: Fix Update of topology

Posted by GitBox <gi...@apache.org>.
nicknezis commented on code in PR #3830:
URL: https://github.com/apache/incubator-heron/pull/3830#discussion_r895221023


##########
heron/tools/tracker/src/python/tracker.py:
##########
@@ -64,17 +66,18 @@ def sync_topologies(self) -> None:
     def on_topologies_watch(state_manager: StateManager, topologies: List[str]) -> None:
       """watch topologies"""
       topologies = set(topologies)
-      Log.info("State watch triggered for topologies.")
-      Log.debug("Topologies: %s", topologies)
-      cached_names = {t.name for t in self.get_stmgr_topologies(state_manager.name)}
-      Log.debug("Existing topologies: %s", cached_names)
-      for name in cached_names - topologies:
-        Log.info("Removing topology: %s in rootpath: %s",
-                 name, state_manager.rootpath)
-        self.remove_topology(name, state_manager.name)
-
-      for name in topologies - cached_names:
-        self.add_new_topology(state_manager, name)
+      Log.info(f"State watch triggered for topologies of {state_manager.name}.")
+      Log.debug(f"Received topologies: {state_manager.name}, {topologies}")

Review Comment:
   Correct. I don't think this was a style check issue. When I did the conversion to f-string during the Python upgrade, I left some of these Log statements in the (format_str, *args) format because it is more efficient. Doing the f-string first and then passing into a log statement that might not render is wasted effort. The Log statement will only do the string interpolation if the log statement needs to be rendered. Because of this, I felt it was better to keep it in the original form.
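
The efficiency point can be illustrated with the standard `logging` module, assuming `Log` behaves like a regular `logging.Logger`. `CountingRepr` and the logger name `tracker_sketch` are hypothetical helpers that only exist to count how often the argument is rendered.

```python
import logging


class CountingRepr:
  """Hypothetical helper that counts how often it is rendered to a string."""

  def __init__(self) -> None:
    self.renders = 0

  def __str__(self) -> str:
    self.renders += 1
    return "topologies"


log = logging.getLogger("tracker_sketch")
log.setLevel(logging.INFO)             # DEBUG records are discarded
log.addHandler(logging.NullHandler())
log.propagate = False

lazy = CountingRepr()
# Format string and arguments are passed separately; since DEBUG is disabled,
# the logger returns before the arguments are ever interpolated.
log.debug("Received topologies: %s", lazy)

eager = CountingRepr()
# The f-string is evaluated before debug() is called, so the argument is
# rendered even though the record is then thrown away.
log.debug(f"Received topologies: {eager}")
```

After this runs, `lazy` has never been rendered while `eager` has been rendered once, which is the wasted effort described above.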





[GitHub] [incubator-heron] nicknezis commented on a diff in pull request #3830: Fix Update of topology

Posted by GitBox <gi...@apache.org>.
nicknezis commented on code in PR #3830:
URL: https://github.com/apache/incubator-heron/pull/3830#discussion_r895332675


##########
heron/tools/tracker/src/python/tracker.py:
##########
@@ -64,17 +66,18 @@ def sync_topologies(self) -> None:
     def on_topologies_watch(state_manager: StateManager, topologies: List[str]) -> None:
       """watch topologies"""
       topologies = set(topologies)
-      Log.info("State watch triggered for topologies.")
-      Log.debug("Topologies: %s", topologies)
+      Log.info(f"State watch triggered for topologies of {state_manager.name}.")
+      Log.debug(f"Received topologies: {state_manager.name}, {topologies}")

Review Comment:
   ```suggestion
         Log.debug("Received topologies: %s, %s", state_manager.name, topologies)
   ```





[GitHub] [incubator-heron] Code0x58 commented on a diff in pull request #3830: Fix Update of topology

Posted by GitBox <gi...@apache.org>.
Code0x58 commented on code in PR #3830:
URL: https://github.com/apache/incubator-heron/pull/3830#discussion_r895150253


##########
heron/tools/tracker/src/python/tracker.py:
##########
@@ -64,17 +66,18 @@ def sync_topologies(self) -> None:
     def on_topologies_watch(state_manager: StateManager, topologies: List[str]) -> None:
       """watch topologies"""
       topologies = set(topologies)
-      Log.info("State watch triggered for topologies.")
-      Log.debug("Topologies: %s", topologies)
-      cached_names = {t.name for t in self.get_stmgr_topologies(state_manager.name)}
-      Log.debug("Existing topologies: %s", cached_names)
-      for name in cached_names - topologies:
-        Log.info("Removing topology: %s in rootpath: %s",
-                 name, state_manager.rootpath)
-        self.remove_topology(name, state_manager.name)
-
-      for name in topologies - cached_names:
-        self.add_new_topology(state_manager, name)
+      Log.info(f"State watch triggered for topologies of {state_manager.name}.")
+      Log.debug(f"Received topologies: {state_manager.name}, {topologies}")

Review Comment:
   is that the case for things like `Log.debug("Existing topologies: %s", cached_names)`? That is different from `Log.debug("Existing topologies: %s" % cached_names)`, which is where I'd expect to see that linter warning (and a warning about using a pre-formatted string in a log method, but maybe my styling there is out of date)





[GitHub] [incubator-heron] windhamwong commented on pull request #3830: Fix Update of topology

Posted by GitBox <gi...@apache.org>.
windhamwong commented on PR #3830:
URL: https://github.com/apache/incubator-heron/pull/3830#issuecomment-1143168146

   The code looks good to me, but I wonder why the issue occurred before this PR. I believe the lock works well here, but finding the root cause might be the key to fixing the underlying issue.




[GitHub] [incubator-heron] thinker0 commented on a diff in pull request #3830: Fix Update of topology

Posted by GitBox <gi...@apache.org>.
thinker0 commented on code in PR #3830:
URL: https://github.com/apache/incubator-heron/pull/3830#discussion_r887743287


##########
heron/tools/tracker/src/python/topology.py:
##########
@@ -241,8 +244,16 @@ def __init__(self, name: str, state_manager_name: str, tracker_config: Config) -
     self.id: Optional[int] = None
     self.tracker_config: Config = tracker_config
     # this maps pb2 structs to structures returned via API endpoints
-    # it is repopulated every time one of the pb2 roperties is updated
+    # it is repopulated every time one of the pb2 properties is updated
     self.info: Optional[TopologyInfo] = None
+    self.lock = threading.RLock()
+
+  def __eq__(self, o):
+    return isinstance(o, Topology) \
+           and o.name == self.name \
+           and o.state_manager_name == self.state_manager_name \
+           and o.cluster == self.cluster \
+           and o.environ == self.environ

Review Comment:
   If this part is missing, topologies are added as duplicates.
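
A small sketch of why a missing `__eq__` leads to duplicates, using hypothetical stand-in classes rather than Heron's actual `Topology`. Without `__eq__`, the `in` check falls back to object identity, so a freshly constructed object is never found in the list.

```python
class PlainTopology:
    """Hypothetical stand-in with no __eq__: compared by identity only."""

    def __init__(self, name, cluster):
        self.name = name
        self.cluster = cluster


class KeyedTopology(PlainTopology):
    """Hypothetical stand-in that compares by its identifying fields."""

    def __eq__(self, other):
        return (isinstance(other, KeyedTopology)
                and other.name == self.name
                and other.cluster == self.cluster)


plain_cache = [PlainTopology("wordcount", "local")]
keyed_cache = [KeyedTopology("wordcount", "local")]

# A second watch event builds a new object for the same topology.
plain_found = PlainTopology("wordcount", "local") in plain_cache  # identity comparison
keyed_found = KeyedTopology("wordcount", "local") in keyed_cache  # uses __eq__

print(plain_found, keyed_found)
```

Note that defining `__eq__` without `__hash__` makes instances unhashable in Python, so this supports list membership checks but not use in a `set` or as a `dict` key.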





[GitHub] [incubator-heron] Code0x58 commented on a diff in pull request #3830: Fix Update of topology

Posted by GitBox <gi...@apache.org>.
Code0x58 commented on code in PR #3830:
URL: https://github.com/apache/incubator-heron/pull/3830#discussion_r887689080


##########
heron/tools/tracker/src/python/topology.py:
##########
@@ -241,8 +244,16 @@ def __init__(self, name: str, state_manager_name: str, tracker_config: Config) -
     self.id: Optional[int] = None
     self.tracker_config: Config = tracker_config
     # this maps pb2 structs to structures returned via API endpoints
-    # it is repopulated every time one of the pb2 roperties is updated
+    # it is repopulated every time one of the pb2 properties is updated
     self.info: Optional[TopologyInfo] = None
+    self.lock = threading.RLock()
+
+  def __eq__(self, o):
+    return isinstance(o, Topology) \
+           and o.name == self.name \
+           and o.state_manager_name == self.state_manager_name \
+           and o.cluster == self.cluster \
+           and o.environ == self.environ

Review Comment:
   to be pedantic, you might want some sort of lock that simultaneously locks both `self` and `o`, e.g. some global lock on state to ensure a consistent read for comparison, but that is likely over-the-top for this.
   
   was this introduced for set operations? Otherwise I'm not spotting where it is being used.



##########
heron/tools/tracker/src/python/tracker.py:
##########
@@ -64,17 +66,18 @@ def sync_topologies(self) -> None:
     def on_topologies_watch(state_manager: StateManager, topologies: List[str]) -> None:
       """watch topologies"""
       topologies = set(topologies)
-      Log.info("State watch triggered for topologies.")
-      Log.debug("Topologies: %s", topologies)
-      cached_names = {t.name for t in self.get_stmgr_topologies(state_manager.name)}
-      Log.debug("Existing topologies: %s", cached_names)
-      for name in cached_names - topologies:
-        Log.info("Removing topology: %s in rootpath: %s",
-                 name, state_manager.rootpath)
-        self.remove_topology(name, state_manager.name)
-
-      for name in topologies - cached_names:
-        self.add_new_topology(state_manager, name)
+      Log.info(f"State watch triggered for topologies of {state_manager.name}.")
+      Log.debug(f"Received topologies: {state_manager.name}, {topologies}")

Review Comment:
   it's generally recommended to leave the logging in the form `(format_str, *args)`, as it avoids unnecessary stringification and interpolation (i.e. when the message level is lower than the configured logging level), so it would be good to change back to that format



##########
heron/tools/tracker/src/python/topology.py:
##########
@@ -589,30 +601,31 @@ def _update(
       scheduler_location=...,
     ) -> None:
     """Atomically update this instance to avoid inconsistent reads/writes from other threads."""

Review Comment:
   If there was one topology per thread, then I don't think the lock would have an impact, but that's a big _if_ given I can't recall the threading structure. If that was the case, you'd probably want the lock for a consistent read, e.g. while doing a comparison/`__eq__`.



##########
heron/tools/tracker/src/python/tracker.py:
##########
@@ -100,11 +103,15 @@ def get_topology(
                   and t.cluster == cluster
                   and (not role or t.execution_state.role == role)
                   and t.environ == environ]
-    if len(topologies) != 1:
+    if len(topologies) == 0:

Review Comment:
   as this is a behaviour change, is it known to rectify anything? now it warns if there is more than one topology, but now I wonder if that is a bug. I don't recall if this would cause issues (without reading around a lot), so I'm not sure it should be included unless it is a known improvement



##########
heron/tools/tracker/src/python/tracker.py:
##########
@@ -64,17 +66,18 @@ def sync_topologies(self) -> None:
     def on_topologies_watch(state_manager: StateManager, topologies: List[str]) -> None:
       """watch topologies"""
       topologies = set(topologies)
-      Log.info("State watch triggered for topologies.")
-      Log.debug("Topologies: %s", topologies)
-      cached_names = {t.name for t in self.get_stmgr_topologies(state_manager.name)}
-      Log.debug("Existing topologies: %s", cached_names)
-      for name in cached_names - topologies:
-        Log.info("Removing topology: %s in rootpath: %s",
-                 name, state_manager.rootpath)
-        self.remove_topology(name, state_manager.name)
-
-      for name in topologies - cached_names:
-        self.add_new_topology(state_manager, name)
+      Log.info(f"State watch triggered for topologies of {state_manager.name}.")
+      Log.debug(f"Received topologies: {state_manager.name}, {topologies}")
+      cached_names = [t.name for t in self.get_stmgr_topologies(state_manager.name)]

Review Comment:
   was there a reason to convert this from a set to a list? the code below still ends up doing what are essentially set operations, just implicitly and slower
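
For reference, a minimal sketch of the set-based form that the removed lines used, with made-up topology names. Set difference gives both the names to remove and the names to add without scanning a list for each lookup.

```python
# Names reported by the state manager watch (made-up values).
watched = {"wordcount", "exclamation", "sentence"}
# Names currently held in the tracker cache (made-up values).
cached_names = {"exclamation", "sentence", "stale-topology"}

to_remove = cached_names - watched   # cached but no longer reported
to_add = watched - cached_names      # reported but not yet cached

print(sorted(to_remove), sorted(to_add))
```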



##########
heron/tools/tracker/src/python/tracker.py:
##########
@@ -114,37 +121,37 @@ def get_stmgr_topologies(self, name: str) -> List[Any]:
     """
     return [t for t in self.topologies if t.state_manager_name == name]
 
-  def add_new_topology(self, state_manager, topology_name: str) -> None:
+  def add_new_topology(self, state_manager: StateManager, topology_name: str) -> None:
     """
     Adds a topology in the local cache, and sets a watch
     on any changes on the topology.
     """
     topology = Topology(topology_name, state_manager.name, self.config)
-    Log.info("Adding new topology: %s, state_manager: %s",
-             topology_name, state_manager.name)
-    # populate the cache before making it addressable in the topologies to
-    # avoid races due to concurrent execution
-    self.topologies.append(topology)
-
-    # Set watches on the pplan, execution_state, tmanager and scheduler_location.
-    state_manager.get_pplan(topology_name, topology.set_physical_plan)
-    state_manager.get_packing_plan(topology_name, topology.set_packing_plan)
-    state_manager.get_execution_state(topology_name, topology.set_execution_state)
-    state_manager.get_tmanager(topology_name, topology.set_tmanager)
-    state_manager.get_scheduler_location(topology_name, topology.set_scheduler_location)
+    with self.lock:

Review Comment:
   I guess `state_manager.name` can't change and `self.config` can't change



##########
heron/tools/tracker/src/python/tracker.py:
##########
@@ -114,37 +121,37 @@ def get_stmgr_topologies(self, name: str) -> List[Any]:
     """
     return [t for t in self.topologies if t.state_manager_name == name]
 
-  def add_new_topology(self, state_manager, topology_name: str) -> None:
+  def add_new_topology(self, state_manager: StateManager, topology_name: str) -> None:
     """
     Adds a topology in the local cache, and sets a watch
     on any changes on the topology.
     """
     topology = Topology(topology_name, state_manager.name, self.config)
-    Log.info("Adding new topology: %s, state_manager: %s",
-             topology_name, state_manager.name)
-    # populate the cache before making it addressable in the topologies to
-    # avoid races due to concurrent execution
-    self.topologies.append(topology)
-
-    # Set watches on the pplan, execution_state, tmanager and scheduler_location.
-    state_manager.get_pplan(topology_name, topology.set_physical_plan)
-    state_manager.get_packing_plan(topology_name, topology.set_packing_plan)
-    state_manager.get_execution_state(topology_name, topology.set_execution_state)
-    state_manager.get_tmanager(topology_name, topology.set_tmanager)
-    state_manager.get_scheduler_location(topology_name, topology.set_scheduler_location)
+    with self.lock:
+      if topology not in self.topologies:
+        Log.info(f"Adding new topology: {topology_name}, state_manager: {state_manager.name}")
+        self.topologies.append(topology)
+
+      # Set watches on the pplan, execution_state, tmanager and scheduler_location.
+      state_manager.get_pplan(topology_name, topology.set_physical_plan)
+      state_manager.get_packing_plan(topology_name, topology.set_packing_plan)
+      state_manager.get_execution_state(topology_name, topology.set_execution_state)
+      state_manager.get_tmanager(topology_name, topology.set_tmanager)
+      state_manager.get_scheduler_location(topology_name, topology.set_scheduler_location)

Review Comment:
   I suppose here you might want a lock on the `state_manager` for a consistent read, unless there's a lock or immutability somewhere else to keep things consistent





[GitHub] [incubator-heron] thinker0 commented on a diff in pull request #3830: Fix Update of topology

Posted by GitBox <gi...@apache.org>.
thinker0 commented on code in PR #3830:
URL: https://github.com/apache/incubator-heron/pull/3830#discussion_r887812596


##########
heron/tools/tracker/src/python/topology.py:
##########
@@ -589,30 +601,31 @@ def _update(
       scheduler_location=...,
     ) -> None:
     """Atomically update this instance to avoid inconsistent reads/writes from other threads."""

Review Comment:
   Regarding the code below:
   
   When ZooKeeper watches are used, a race condition occurs because the [linked part](https://github.com/apache/incubator-heron/blob/d1cc8dd69223fcce41fccdbc2472864e8c1a5222/heron/tools/tracker/src/python/topology.py#L629-L647) is executed by several callbacks at the same time.
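
A minimal sketch of the kind of protection being discussed, not Heron's actual `_update`. The `State` class and its fields are hypothetical. An `RLock` guards a multi-field update so that concurrent callbacks cannot interleave, and readers taking the same lock never see a half-applied update.

```python
import threading


class State:
    """Hypothetical object whose two fields must always change together."""

    def __init__(self):
        self.lock = threading.RLock()
        self.a = 0
        self.b = 0

    def update(self, value):
        # Both assignments happen under the lock, so they appear atomic
        # to any reader that also takes the lock.
        with self.lock:
            self.a = value
            self.b = value

    def read(self):
        with self.lock:
            return self.a, self.b


state = State()
mismatches = 0


def writer(n):
    for i in range(2000):
        state.update(n * 10000 + i)


threads = [threading.Thread(target=writer, args=(n,)) for n in range(4)]
for t in threads:
    t.start()

# Read concurrently while the writers are running.
while any(t.is_alive() for t in threads):
    a, b = state.read()
    if a != b:
        mismatches += 1

for t in threads:
    t.join()

print(mismatches)
```

A reentrant lock (rather than a plain `Lock`) lets a method that already holds the lock call another locked method on the same object without deadlocking.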
   





[GitHub] [incubator-heron] thinker0 commented on pull request #3830: Fix Update of topology

Posted by GitBox <gi...@apache.org>.
thinker0 commented on PR #3830:
URL: https://github.com/apache/incubator-heron/pull/3830#issuecomment-1143589057

   When watching ZooKeeper, the [linked part](https://github.com/apache/incubator-heron/blob/72f0bb01d0ba7adac4f0086489b0ddf130e0b378/heron/tools/tracker/src/python/topology.py#L633-L651) is executed by several callbacks at the same time.




[GitHub] [incubator-heron] thinker0 commented on a diff in pull request #3830: Fix Update of topology

Posted by GitBox <gi...@apache.org>.
thinker0 commented on code in PR #3830:
URL: https://github.com/apache/incubator-heron/pull/3830#discussion_r888477101


##########
heron/tools/tracker/src/python/tracker.py:
##########
@@ -64,17 +66,18 @@ def sync_topologies(self) -> None:
     def on_topologies_watch(state_manager: StateManager, topologies: List[str]) -> None:
       """watch topologies"""
       topologies = set(topologies)
-      Log.info("State watch triggered for topologies.")
-      Log.debug("Topologies: %s", topologies)
-      cached_names = {t.name for t in self.get_stmgr_topologies(state_manager.name)}
-      Log.debug("Existing topologies: %s", cached_names)
-      for name in cached_names - topologies:
-        Log.info("Removing topology: %s in rootpath: %s",
-                 name, state_manager.rootpath)
-        self.remove_topology(name, state_manager.name)
-
-      for name in topologies - cached_names:
-        self.add_new_topology(state_manager, name)
+      Log.info(f"State watch triggered for topologies of {state_manager.name}.")
+      Log.debug(f"Received topologies: {state_manager.name}, {topologies}")

Review Comment:
   ```
   (format_str, *args)
   ```
   If you use this, the style check reports the following problem:
   ```
   bazel test --config=stylecheck heron/tools/tracker/...
   ```
   ```
   C0209: Formatting a regular string which could be a f-string (consider-using-f-string)
   ```
   Reverted





[GitHub] [incubator-heron] nicknezis commented on a diff in pull request #3830: Fix Update of topology

Posted by GitBox <gi...@apache.org>.
nicknezis commented on code in PR #3830:
URL: https://github.com/apache/incubator-heron/pull/3830#discussion_r883644429


##########
heron/tools/tracker/src/python/metricstimeline.py:
##########
@@ -84,7 +84,7 @@ async def get_metrics_timeline(
 
   # Form and send the http request.
   url = f"http://{tmanager.host}:{tmanager.stats_port}/stats"
-  Log.debug(f"Making HTTP call to fetch metrics: {url}")
+  # Log.debug(f"Making HTTP call to fetch metrics: {url}")

Review Comment:
   Let's either remove or put back the log statement. I assume this change was committed by accident?


