Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2021/09/13 18:53:04 UTC
[GitHub] [beam] rohdesamuel commented on a change in pull request #15490: [BEAM-10708] Introspect beam_sql output
rohdesamuel commented on a change in pull request #15490:
URL: https://github.com/apache/beam/pull/15490#discussion_r707439393
##########
File path: sdks/python/apache_beam/runners/interactive/caching/cacheable.py
##########
@@ -26,24 +26,22 @@
from dataclasses import dataclass
import apache_beam as beam
-from apache_beam.runners.interactive.utils import obfuscate
@dataclass
class Cacheable:
- pcoll_id: str
var: str
version: str
- pcoll: beam.pvalue.PCollection
producer_version: str
+ pcoll: beam.pvalue.PCollection
Review comment:
Is there a reason this field was moved?
##########
File path: sdks/python/apache_beam/runners/interactive/display/pipeline_graph.py
##########
@@ -182,8 +182,8 @@ def _generate_graph_dicts(self):
for pcoll_id in transform.outputs.values():
pcoll_node = None
if self._pipeline_instrument:
- pcoll_node = self._pipeline_instrument.cacheable_var_by_pcoll_id(
- pcoll_id)
+ cacheable = self._pipeline_instrument._cacheables.get(pcoll_id)
Review comment:
It's a code smell when one class accesses the private members of another. Is there another way to do this?
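One possible shape, as a sketch only: give the owning class a small public lookup and have the graph code call that instead of reaching into `_cacheables`. `PipelineInstrumentSketch` and `cacheable_for` are hypothetical names used for illustration, not existing Beam APIs, and `Cacheable` is trimmed down to the two fields the example needs.

```python
from dataclasses import dataclass
from typing import Dict, Optional


@dataclass
class Cacheable:
    # Trimmed-down stand-in for the real dataclass (assumption).
    var: str
    version: str


class PipelineInstrumentSketch:
    """Hypothetical stand-in for the class that owns `_cacheables`."""

    def __init__(self, cacheables: Dict[str, Cacheable]) -> None:
        self._cacheables = dict(cacheables)

    def cacheable_for(self, pcoll_id: str) -> Optional[Cacheable]:
        """Public read-only lookup so callers never touch `_cacheables`."""
        return self._cacheables.get(pcoll_id)


# Caller side: only the public method is used.
instrument = PipelineInstrumentSketch({'pcoll_1': Cacheable('words', 'v1')})
cacheable = instrument.cacheable_for('pcoll_1')
pcoll_node = cacheable.var if cacheable else None
```

With an accessor like this, the internal dict can change shape later without touching the display code.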
##########
File path: sdks/python/apache_beam/runners/interactive/recording_manager.py
##########
@@ -167,14 +166,14 @@ def __init__(
self._result_lock = threading.Lock()
self._pcolls = pcolls
- pcoll_var = lambda pcoll: pipeline_instrument.cacheable_var_by_pcoll_id(
- pipeline_instrument.pcolls_to_pcoll_id.get(str(pcoll), None))
+ name_by_pcoll = {v: k for k, v in utils.pcoll_by_name().items()}
+ pcoll_var = lambda pcoll: name_by_pcoll.get(pcoll, None)
Review comment:
You can inline this. The lambda was originally written because the line was too long.
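To illustrate the suggestion, here is a minimal sketch with stand-in data: plain `object()` instances play the role of PCollections, the dict stands in for the result of `utils.pcoll_by_name()`, and the call site is made up for the example.

```python
# Stand-in for utils.pcoll_by_name(); plain objects act as PCollections.
pcoll_a, pcoll_b = object(), object()
pcoll_by_name = {'words': pcoll_a, 'counts': pcoll_b}

# Invert the mapping once, as in the diff.
name_by_pcoll = {v: k for k, v in pcoll_by_name.items()}

# Inlined lookup: call .get() where the name is needed, with no lambda.
# dict.get already defaults to None, so the explicit None can be dropped.
names = [name_by_pcoll.get(pcoll) for pcoll in (pcoll_a, pcoll_b, object())]
```

An unknown PCollection still maps to `None`, which matches the behavior of the lambda.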
##########
File path: sdks/python/apache_beam/runners/interactive/interactive_runner.py
##########
@@ -246,7 +247,7 @@ def read(self, pcoll, include_window_info=False):
If include_window_info is True, then returns the elements as
WindowedValues. Otherwise, return the element as itself.
"""
- key = self._pipeline_instrument.cache_key(pcoll)
+ key = self._pipeline_instrument.cacheable_key(pcoll)
Review comment:
What's the difference between a `cache_key` and a `cacheable_key`?
##########
File path: sdks/python/apache_beam/runners/interactive/interactive_environment.py
##########
@@ -235,9 +239,16 @@ def is_in_notebook(self):
@property
def inspector(self):
"""Gets the singleton InteractiveEnvironmentInspector to retrieve
- information consumable by other applications."""
+ information consumable by other applications such as a notebook
+ extension."""
return self._inspector
+ @property
+ def inspector_with_synthetic(self):
+ """Gets the singleton InteractiveEnvironmentInspector with additional
+ synthetic variables generated by Interactive Beam. Internally used."""
+ return self._inspector_with_synthetic
+
Review comment:
Why have a separate inspector? Specifically, why separate the notion of having an inspector with and without synthetic PCollections?
##########
File path: sdks/python/apache_beam/runners/interactive/messaging/interactive_environment_inspector.py
##########
@@ -36,16 +36,17 @@ class InteractiveEnvironmentInspector(object):
list_inspectables first then communicates back to the kernel and get_val for
usage on the kernel side.
"""
- def __init__(self):
+ def __init__(self, ignore_synthetic=True):
Review comment:
Same question: why do we need this boolean?
##########
File path: sdks/python/apache_beam/runners/interactive/display/pcoll_visualization.py
##########
@@ -236,6 +239,30 @@ def continuous_update_display(): # pylint: disable=unused-variable
return None
+def visualize_computed_pcoll(
+ pcoll_name: str, pcoll: beam.pvalue.PValue) -> None:
+ """A simple visualize alternative.
+
+ When the pcoll_name and pcoll pair identifies a watched and computed
+ PCollection in the current interactive environment without ambiguity, an
+ ElementStream can be built directly from cache.
+ """
+ try:
+ pipeline = ie.current_env().user_pipeline(pcoll.pipeline)
+ cache_manager = ie.current_env().get_cache_manager(pipeline)
+ cache_key = CacheKey.from_pcoll(pcoll_name, pcoll).to_str()
+ reader, _ = cache_manager.read('full', cache_key)
+ visualize(
+ ElementStream(pcoll, pcoll_name, cache_key, float('inf'), float('inf')),
+ element_type=pcoll.element_type)
Review comment:
Please move this logic to the RecordingManager class as a `read(pcoll: PValue)` method. It makes the most sense to bundle as much of the cache-reading and stream-manipulation logic as possible into that class, so that this code isn't duplicated everywhere.
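Roughly the kind of consolidation meant here, as a sketch under loud assumptions: `FakeCacheManager` and `RecordingManagerSketch` are hypothetical stand-ins, not the real Beam classes, and the real method would presumably build an ElementStream rather than return a list.

```python
from typing import Any, Dict, List, Optional


class FakeCacheManager:
    """Hypothetical in-memory cache; not Beam's cache manager."""

    def __init__(self, data: Dict[str, List[Any]]) -> None:
        self._data = data

    def read(self, key: str) -> Optional[List[Any]]:
        return self._data.get(key)


class RecordingManagerSketch:
    """Hypothetical stand-in: the single place that knows how to read cache."""

    def __init__(
        self, cache: FakeCacheManager, key_by_pcoll: Dict[Any, str]) -> None:
        self._cache = cache
        self._key_by_pcoll = key_by_pcoll

    def read(self, pcoll: Any) -> Optional[List[Any]]:
        """Returns cached elements for a known pcoll, or None otherwise."""
        key = self._key_by_pcoll.get(pcoll)
        if key is None:
            return None
        return self._cache.read(key)


# Callers such as the visualization code would only need manager.read(pcoll).
pcoll = object()
manager = RecordingManagerSketch(
    FakeCacheManager({'words-key': ['a', 'b']}), {pcoll: 'words-key'})
elements = manager.read(pcoll)
```

Key construction and cache access then live behind one method, so the display module does not need to know about cache keys at all.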