You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/09/10 17:46:43 UTC

[GitHub] [beam] KevinGG commented on a change in pull request #12704: [BEAM-10603] Implement the new Large Source Recording API.

KevinGG commented on a change in pull request #12704:
URL: https://github.com/apache/beam/pull/12704#discussion_r486514105



##########
File path: sdks/python/apache_beam/runners/interactive/interactive_beam.py
##########
@@ -342,121 +343,79 @@ def show(*pcolls, **configs):
     assert isinstance(pcoll, beam.pvalue.PCollection), (
         '{} is not an apache_beam.pvalue.PCollection.'.format(pcoll))
   user_pipeline = pcolls[0].pipeline
-  for pcoll in pcolls:
-    assert pcoll.pipeline is user_pipeline, (
-        '{} belongs to a different user-defined pipeline ({}) than that of'
-        ' other PCollections ({}).'.format(
-            pcoll, pcoll.pipeline, user_pipeline))
+
   # TODO(BEAM-8288): Remove below pops and assertion once Python 2 is
   # deprecated from Beam.
   include_window_info = configs.pop('include_window_info', False)
   visualize_data = configs.pop('visualize_data', False)
+  n = configs.pop('n', 'inf')
+  duration = configs.pop('duration', 'inf')
+
+  if n == 'inf':
+    n = float('inf')
+
+  if duration == 'inf':
+    duration = float('inf')
+
   # This assertion is to protect the backward compatibility for function
   # signature change after Python 2 deprecation.
   assert not configs, (
       'The only configs supported are include_window_info and '
       'visualize_data.')

Review comment:
       Include the new configs `n` and `duration`.

##########
File path: sdks/python/apache_beam/runners/interactive/interactive_beam.py
##########
@@ -273,6 +270,10 @@ def show(*pcolls, **configs):
   The given pcolls can be dictionary of PCollections (as values), or iterable
   of PCollections or plain PCollection values.
 
+  The user can specify either the max number of elements with `n` to read
+  or the maximum duration of elements to read with `duration`. When a limiter is
+  not supplied, it is assumed to be infinite.
+
   There are 2 boolean configurations:

Review comment:
       Nit: how about merging these configurations in the docstrings.
   Something like:
   ```
   There are 4 configurations: 
     #. include_window_info=<True/False>. If True, windowing information of the
          data will be visualized too. Default is false.
     #. visualize_data=<True/False>. By default, the visualization contains data
          tables rendering data from given pcolls separately as if they are
          converted into dataframes. If visualize_data is True, there will be a
          more dive-in widget and statistically overview widget of the data.
          Otherwise, those 2 data visualization widgets will not be displayed.
     #. n=<int>. Max number of elements to visualize. Default 'inf'.
     #. duration=<int>. Max duration of elements to read. Default 'inf'.
   ```
   

##########
File path: sdks/python/apache_beam/runners/interactive/interactive_beam.py
##########
@@ -342,121 +343,79 @@ def show(*pcolls, **configs):
     assert isinstance(pcoll, beam.pvalue.PCollection), (
         '{} is not an apache_beam.pvalue.PCollection.'.format(pcoll))
   user_pipeline = pcolls[0].pipeline
-  for pcoll in pcolls:
-    assert pcoll.pipeline is user_pipeline, (
-        '{} belongs to a different user-defined pipeline ({}) than that of'
-        ' other PCollections ({}).'.format(
-            pcoll, pcoll.pipeline, user_pipeline))
+
   # TODO(BEAM-8288): Remove below pops and assertion once Python 2 is
   # deprecated from Beam.
   include_window_info = configs.pop('include_window_info', False)
   visualize_data = configs.pop('visualize_data', False)
+  n = configs.pop('n', 'inf')
+  duration = configs.pop('duration', 'inf')
+
+  if n == 'inf':
+    n = float('inf')
+
+  if duration == 'inf':
+    duration = float('inf')
+
   # This assertion is to protect the backward compatibility for function
   # signature change after Python 2 deprecation.
   assert not configs, (
       'The only configs supported are include_window_info and '
       'visualize_data.')
-  runner = user_pipeline.runner
-  if isinstance(runner, ir.InteractiveRunner):
-    runner = runner._underlying_runner
-
-  # Make sure that sources without a user reference are still cached.
-  pi.watch_sources(user_pipeline)
-
-  # Make sure that all PCollections to be shown are watched. If a PCollection
-  # has not been watched, make up a variable name for that PCollection and watch
-  # it. No validation is needed here because the watch logic can handle
-  # arbitrary variables.
-  watched_pcollections = set()
-  for watching in ie.current_env().watching():
-    for _, val in watching:
-      if hasattr(val, '__class__') and isinstance(val, beam.pvalue.PCollection):
-        watched_pcollections.add(val)
-  for pcoll in pcolls:
-    if pcoll not in watched_pcollections:
-      watch({'anonymous_pcollection_{}'.format(id(pcoll)): pcoll})
-
-  if ie.current_env().is_in_ipython:
-    warnings.filterwarnings(
-        'ignore',
-        'options is deprecated since First stable release. References to '
-        '<pipeline>.options will not be supported',
-        category=DeprecationWarning)
-  # Attempt to run background caching job since we have the reference to the
-  # user-defined pipeline.
-  bcj.attempt_to_run_background_caching_job(
-      runner, user_pipeline, user_pipeline.options)
-
-  pcolls = set(pcolls)
-  computed_pcolls = set()
-  for pcoll in pcolls:
-    if pcoll in ie.current_env().computed_pcollections:
-      computed_pcolls.add(pcoll)
-  pcolls = pcolls.difference(computed_pcolls)
-  # If in notebook, static plotting computed pcolls as computation is done.
-  if ie.current_env().is_in_notebook:
-    for pcoll in computed_pcolls:
-      visualize(
-          pcoll,
-          include_window_info=include_window_info,
-          display_facets=visualize_data)
-  elif ie.current_env().is_in_ipython:
-    for pcoll in computed_pcolls:
-      visualize(pcoll, include_window_info=include_window_info)
-
-  if not pcolls:
-    return
-
-  # Build a pipeline fragment for the PCollections and run it.
-  result = pf.PipelineFragment(list(pcolls), user_pipeline.options).run()
-  ie.current_env().set_pipeline_result(user_pipeline, result)
-
-  # If in notebook, dynamic plotting as computation goes.
-  if ie.current_env().is_in_notebook:
-    for pcoll in pcolls:
-      visualize(
-          pcoll,
-          dynamic_plotting_interval=1,
-          include_window_info=include_window_info,
-          display_facets=visualize_data)
-
-  # Invoke wait_until_finish to ensure the blocking nature of this API without
-  # relying on the run to be blocking.
-  result.wait_until_finish()
-
-  # If just in ipython shell, plotting once when the computation is completed.
-  if ie.current_env().is_in_ipython and not ie.current_env().is_in_notebook:
-    for pcoll in pcolls:
-      visualize(pcoll, include_window_info=include_window_info)
-
-  # If the pipeline execution is successful at this stage, mark the computation
-  # completeness for the given PCollections so that when further `show`
-  # invocation occurs, Interactive Beam wouldn't need to re-compute them.
-  if result.state is beam.runners.runner.PipelineState.DONE:
-    ie.current_env().mark_pcollection_computed(pcolls)
-
-
-def collect(pcoll, include_window_info=False):
-  """Materializes all of the elements from a PCollection into a Dataframe.
 
-  For example::
+  recording_manager = RecordingManager(user_pipeline)
+  recording = recording_manager.record(
+      pcolls, max_n=n, max_duration_secs=duration)
+
+  # Catch a KeyboardInterrupt to gracefully cancel the recording and
+  # visualizations.
+  try:
+    # If in notebook, static plotting computed pcolls as computation is done.
+    if ie.current_env().is_in_notebook:
+      for stream in recording.computed().values():
+        visualize(
+            stream,
+            include_window_info=include_window_info,
+            display_facets=visualize_data)
+    elif ie.current_env().is_in_ipython:
+      for stream in recording.computed().values():
+        visualize(stream, include_window_info=include_window_info)

Review comment:
       I assume you've verified that, when the recording is not computed, this (static) and below (with dynamic_plotting_interval) `visualize` statements will update the same visualization. Is it because you are generating a consistent display_id from the recording manager?

##########
File path: sdks/python/apache_beam/runners/interactive/interactive_beam.py
##########
@@ -342,121 +343,79 @@ def show(*pcolls, **configs):
     assert isinstance(pcoll, beam.pvalue.PCollection), (
         '{} is not an apache_beam.pvalue.PCollection.'.format(pcoll))
   user_pipeline = pcolls[0].pipeline
-  for pcoll in pcolls:
-    assert pcoll.pipeline is user_pipeline, (
-        '{} belongs to a different user-defined pipeline ({}) than that of'
-        ' other PCollections ({}).'.format(
-            pcoll, pcoll.pipeline, user_pipeline))
+
   # TODO(BEAM-8288): Remove below pops and assertion once Python 2 is
   # deprecated from Beam.
   include_window_info = configs.pop('include_window_info', False)
   visualize_data = configs.pop('visualize_data', False)
+  n = configs.pop('n', 'inf')
+  duration = configs.pop('duration', 'inf')
+
+  if n == 'inf':
+    n = float('inf')
+
+  if duration == 'inf':
+    duration = float('inf')

Review comment:
       Do we need any assertion in the future? Like `n` needs to be a positive integer. And `duration` will be a string such as `1h2m3s`.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org