Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2021/09/13 18:53:04 UTC

[GitHub] [beam] rohdesamuel commented on a change in pull request #15490: [BEAM-10708] Introspect beam_sql output

rohdesamuel commented on a change in pull request #15490:
URL: https://github.com/apache/beam/pull/15490#discussion_r707439393



##########
File path: sdks/python/apache_beam/runners/interactive/caching/cacheable.py
##########
@@ -26,24 +26,22 @@
 from dataclasses import dataclass
 
 import apache_beam as beam
-from apache_beam.runners.interactive.utils import obfuscate
 
 
 @dataclass
 class Cacheable:
-  pcoll_id: str
   var: str
   version: str
-  pcoll: beam.pvalue.PCollection
   producer_version: str
+  pcoll: beam.pvalue.PCollection

Review comment:
       Is there a reason this field was moved?
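Some context on why the move can matter. In a `@dataclass`, the order in which fields are declared sets the positional order of the generated `__init__`. Reordering fields therefore changes what positional arguments mean, while keyword construction is unaffected. The minimal sketch below uses hypothetical stand-in classes, with a plain `object` in place of `beam.pvalue.PCollection` so it runs without Beam:

```python
from dataclasses import dataclass, fields


@dataclass
class CacheableOldOrder:
  # Hypothetical stand-in: field order before the change (pcoll_id omitted).
  var: str
  version: str
  pcoll: object  # stand-in for beam.pvalue.PCollection
  producer_version: str


@dataclass
class CacheableNewOrder:
  # Hypothetical stand-in: field order after the change.
  var: str
  version: str
  producer_version: str
  pcoll: object  # stand-in for beam.pvalue.PCollection


# The same positional call binds 'PCOLL' to different fields.
old_order = CacheableOldOrder('squares', '1', 'PCOLL', '2')
new_order = CacheableNewOrder('squares', '1', 'PCOLL', '2')
print(old_order.pcoll, new_order.pcoll)  # PCOLL 2

# Keyword construction does not depend on declaration order.
kw = CacheableNewOrder(
    var='squares', version='1', pcoll='PCOLL', producer_version='2')
print(kw.pcoll)  # PCOLL
print([f.name for f in fields(CacheableNewOrder)])
```

Call sites that construct `Cacheable` positionally would need to be checked after the reordering. Keyword call sites would not.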

##########
File path: sdks/python/apache_beam/runners/interactive/display/pipeline_graph.py
##########
@@ -182,8 +182,8 @@ def _generate_graph_dicts(self):
       for pcoll_id in transform.outputs.values():
         pcoll_node = None
         if self._pipeline_instrument:
-          pcoll_node = self._pipeline_instrument.cacheable_var_by_pcoll_id(
-              pcoll_id)
+          cacheable = self._pipeline_instrument._cacheables.get(pcoll_id)

Review comment:
       It's a code smell when one class accesses the private members of another. Is there another way to do this?
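A common alternative is for the owning class to expose a small public lookup, so that callers never reach into `_cacheables` directly. The sketch below uses hypothetical stand-in classes. `cacheable_by_pcoll_id` is an illustrative name only and is not an existing `PipelineInstrument` method:

```python
from dataclasses import dataclass
from typing import Dict, Optional


@dataclass
class Cacheable:
  # Simplified, hypothetical stand-in for the Cacheable dataclass.
  var: str
  version: str


class PipelineInstrumentSketch:
  """Hypothetical stand-in for the pipeline instrument."""
  def __init__(self, cacheables: Dict[str, Cacheable]):
    # The mapping stays private to this class.
    self._cacheables = dict(cacheables)

  def cacheable_by_pcoll_id(self, pcoll_id: str) -> Optional[Cacheable]:
    """Hypothetical public lookup; returns None for unknown ids."""
    return self._cacheables.get(pcoll_id)


class GraphBuilderSketch:
  """Hypothetical stand-in for the graph code that needs the lookup."""
  def __init__(self, instrument: PipelineInstrumentSketch):
    self._instrument = instrument

  def node_label(self, pcoll_id: str) -> str:
    # Uses the public method instead of instrument._cacheables.
    cacheable = self._instrument.cacheable_by_pcoll_id(pcoll_id)
    return cacheable.var if cacheable else pcoll_id


instrument = PipelineInstrumentSketch(
    {'pcoll_1': Cacheable(var='squares', version='1')})
graph = GraphBuilderSketch(instrument)
print(graph.node_label('pcoll_1'))  # squares
print(graph.node_label('pcoll_2'))  # pcoll_2
```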

##########
File path: sdks/python/apache_beam/runners/interactive/recording_manager.py
##########
@@ -167,14 +166,14 @@ def __init__(
     self._result_lock = threading.Lock()
     self._pcolls = pcolls
 
-    pcoll_var = lambda pcoll: pipeline_instrument.cacheable_var_by_pcoll_id(
-        pipeline_instrument.pcolls_to_pcoll_id.get(str(pcoll), None))
+    name_by_pcoll = {v: k for k, v in utils.pcoll_by_name().items()}
+    pcoll_var = lambda pcoll: name_by_pcoll.get(pcoll, None)

Review comment:
       You can inline this. The lambda was originally written because the line was too long.
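One reading of this suggestion: the lambda only forwards to `dict.get` with a `None` default, and `None` is already the default for `dict.get`, so the bound method can be used directly. The sketch below substitutes a plain dict for `utils.pcoll_by_name()` and strings for PCollections. Both are assumptions made for illustration:

```python
# Hypothetical stand-in for utils.pcoll_by_name(): variable name -> PCollection.
pcoll_by_name = {'squares': 'PCOLL_A', 'cubes': 'PCOLL_B'}

# Invert once: PCollection -> variable name.
name_by_pcoll = {v: k for k, v in pcoll_by_name.items()}

# Lambda form from the diff.
pcoll_var_lambda = lambda pcoll: name_by_pcoll.get(pcoll, None)

# Inlined form: dict.get already returns None for missing keys.
pcoll_var = name_by_pcoll.get

for p in ('PCOLL_A', 'PCOLL_B', 'UNKNOWN'):
  assert pcoll_var(p) == pcoll_var_lambda(p)
print(pcoll_var('PCOLL_A'))  # squares
```

The inversion has a side effect worth noting. If two variable names refer to the same PCollection, the inverted dict keeps only the last name it sees.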

##########
File path: sdks/python/apache_beam/runners/interactive/interactive_runner.py
##########
@@ -246,7 +247,7 @@ def read(self, pcoll, include_window_info=False):
     If include_window_info is True, then returns the elements as
     WindowedValues. Otherwise, return the element as itself.
     """
-    key = self._pipeline_instrument.cache_key(pcoll)
+    key = self._pipeline_instrument.cacheable_key(pcoll)

Review comment:
       What's the difference between a `cache_key` and a `cacheable_key`?

##########
File path: sdks/python/apache_beam/runners/interactive/interactive_environment.py
##########
@@ -235,9 +239,16 @@ def is_in_notebook(self):
   @property
   def inspector(self):
     """Gets the singleton InteractiveEnvironmentInspector to retrieve
-    information consumable by other applications."""
+    information consumable by other applications such as a notebook
+    extension."""
     return self._inspector
 
+  @property
+  def inspector_with_synthetic(self):
+    """Gets the singleton InteractiveEnvironmentInspector with additional
+    synthetic variables generated by Interactive Beam. Internally used."""
+    return self._inspector_with_synthetic
+

Review comment:
       Why have a separate inspector? Specifically, why separate the notion of having an inspector with and without synthetic PCollections?

##########
File path: sdks/python/apache_beam/runners/interactive/messaging/interactive_environment_inspector.py
##########
@@ -36,16 +36,17 @@ class InteractiveEnvironmentInspector(object):
   list_inspectables first then communicates back to the kernel and get_val for
   usage on the kernel side.
   """
-  def __init__(self):
+  def __init__(self, ignore_synthetic=True):

Review comment:
       Same question, why do we need this boolean?
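To make the question concrete, consider an alternative to two singleton inspectors that differ only in a constructor flag: a single inspector whose listing method takes the filter as an argument. This is a hypothetical sketch. The class, the `list_variables` method, the `include_synthetic` parameter, and the underscore-prefix rule for synthetic names are all illustrative assumptions, not how Interactive Beam identifies synthetic variables:

```python
from typing import Dict


class InspectorSketch:
  """Hypothetical single inspector; synthetic filtering is chosen per call."""
  def __init__(self, variables: Dict[str, object]):
    self._variables = dict(variables)

  @staticmethod
  def _is_synthetic(name: str) -> bool:
    # Assumption for this sketch only: synthetic names start with '_'.
    return name.startswith('_')

  def list_variables(
      self, include_synthetic: bool = False) -> Dict[str, object]:
    return {
        name: value
        for name, value in self._variables.items()
        if include_synthetic or not self._is_synthetic(name)
    }


inspector = InspectorSketch({'squares': 1, '_sql_output_1': 2})
print(sorted(inspector.list_variables()))  # ['squares']
print(sorted(inspector.list_variables(include_synthetic=True)))
```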

##########
File path: sdks/python/apache_beam/runners/interactive/display/pcoll_visualization.py
##########
@@ -236,6 +239,30 @@ def continuous_update_display():  # pylint: disable=unused-variable
   return None
 
 
+def visualize_computed_pcoll(
+    pcoll_name: str, pcoll: beam.pvalue.PValue) -> None:
+  """A simple visualize alternative.
+
+  When the pcoll_name and pcoll pair identifies a watched and computed
+  PCollection in the current interactive environment without ambiguity, an
+  ElementStream can be built directly from cache.
+  """
+  try:
+    pipeline = ie.current_env().user_pipeline(pcoll.pipeline)
+    cache_manager = ie.current_env().get_cache_manager(pipeline)
+    cache_key = CacheKey.from_pcoll(pcoll_name, pcoll).to_str()
+    reader, _ = cache_manager.read('full', cache_key)
+    visualize(
+        ElementStream(pcoll, pcoll_name, cache_key, float('inf'), float('inf')),
+        element_type=pcoll.element_type)

Review comment:
       Please move this logic to the RecordingManager class as a `read(pcoll: PValue)` method. It makes the most sense to bundle as much of the logic for reading from cache and manipulating streams into that class as possible, so that this code isn't duplicated everywhere.
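Here is a rough shape for that consolidation. Callers pass only the PCollection, and the manager owns both key derivation and the cache lookup. Everything in the sketch is a hypothetical stand-in: an in-memory dict replaces the cache manager, and a `key_fn` callable replaces `CacheKey`. It illustrates the requested structure only and does not reflect Beam's `RecordingManager` API:

```python
from typing import Callable, Dict, Iterator, List


class InMemoryCacheSketch:
  """Hypothetical stand-in for a cache manager keyed by string."""
  def __init__(self):
    self._data: Dict[str, List[object]] = {}

  def write(self, key: str, elements: List[object]) -> None:
    self._data[key] = list(elements)

  def exists(self, key: str) -> bool:
    return key in self._data

  def read(self, key: str) -> Iterator[object]:
    return iter(self._data[key])


class RecordingManagerSketch:
  """Hypothetical: the one place that maps a PCollection to cached data."""
  def __init__(
      self, cache: InMemoryCacheSketch, key_fn: Callable[[object], str]):
    self._cache = cache
    self._key_fn = key_fn  # stand-in for CacheKey derivation

  def read(self, pcoll: object) -> Iterator[object]:
    key = self._key_fn(pcoll)
    if not self._cache.exists(key):
      raise KeyError('PCollection %r has not been computed' % (pcoll,))
    return self._cache.read(key)


cache = InMemoryCacheSketch()
cache.write('squares-key', [0, 1, 4, 9])
manager = RecordingManagerSketch(cache, key_fn=lambda pcoll: pcoll + '-key')
print(list(manager.read('squares')))  # [0, 1, 4, 9]
```

With this shape, a display helper would call `read` rather than rebuilding the cache key and reader itself.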




-- 
This is an automated message from the Apache Git Service.