Posted to commits@beam.apache.org by ro...@apache.org on 2020/10/13 18:25:47 UTC

[beam] branch release-2.25.0 updated: [BEAM-11056] Fix warning message and rename old APIs (#13080)

This is an automated email from the ASF dual-hosted git repository.

robinyqiu pushed a commit to branch release-2.25.0
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/release-2.25.0 by this push:
     new 967a79d  [BEAM-11056] Fix warning message and rename old APIs (#13080)
     new 378b662  Merge pull request #13088 from KevinGG/release-2.25.0
967a79d is described below

commit 967a79d44272081915a461e03a64864fd35bef79
Author: Ning Kang <ka...@gmail.com>
AuthorDate: Tue Oct 13 09:01:29 2020 -0700

    [BEAM-11056] Fix warning message and rename old APIs (#13080)
    
    [BEAM-11056] Fix warning message and rename old APIs (#13080)
---
 .../runners/interactive/background_caching_job.py  |  77 +++++------
 .../runners/interactive/interactive_beam.py        | 145 ++++++++++++---------
 .../runners/interactive/interactive_beam_test.py   |   2 +-
 .../runners/interactive/interactive_runner.py      |   2 +-
 .../runners/interactive/interactive_runner_test.py |   2 +-
 .../runners/interactive/options/capture_control.py |   4 +-
 .../runners/interactive/pipeline_instrument.py     |  18 +--
 .../runners/interactive/recording_manager_test.py  |   6 +-
 8 files changed, 139 insertions(+), 117 deletions(-)

diff --git a/sdks/python/apache_beam/runners/interactive/background_caching_job.py b/sdks/python/apache_beam/runners/interactive/background_caching_job.py
index 79cef08..b1d2f28 100644
--- a/sdks/python/apache_beam/runners/interactive/background_caching_job.py
+++ b/sdks/python/apache_beam/runners/interactive/background_caching_job.py
@@ -15,25 +15,25 @@
 # limitations under the License.
 #
 
-"""Module to build and run background caching job.
+"""Module to build and run background source recording jobs.
 
 For internal use only; no backwards-compatibility guarantees.
 
-A background caching job is a job that captures events for all capturable
-sources of a given pipeline. With Interactive Beam, one such job is started when
-a pipeline run happens (which produces a main job in contrast to the background
-caching job) and meets the following conditions:
+A background source recording job is a job that records events for all
+recordable sources of a given pipeline. With Interactive Beam, one such job is
+started when a pipeline run happens (which produces a main job in contrast to
+the background source recording job) and meets the following conditions:
 
-  #. The pipeline contains capturable sources, configured through
-     interactive_beam.options.capturable_sources.
+  #. The pipeline contains recordable sources, configured through
+     interactive_beam.options.recordable_sources.
   #. No such background job is running.
   #. No such background job has completed successfully and the cached events are
-     still valid (invalidated when capturable sources change in the pipeline).
+     still valid (invalidated when recordable sources change in the pipeline).
 
-Once started, the background caching job runs asynchronously until it hits some
-capture limit configured in interactive_beam.options. Meanwhile, the main job
-and future main jobs from the pipeline will run using the deterministic
-replayable captured events until they are invalidated.
+Once started, the background source recording job runs asynchronously until it
+hits some recording limit configured in interactive_beam.options. Meanwhile,
+the main job and future main jobs from the pipeline will run using the
+deterministic replayable recorded events until they are invalidated.
 """
 
 # pytype: skip-file
@@ -54,17 +54,17 @@ _LOGGER = logging.getLogger(__name__)
 
 class BackgroundCachingJob(object):
   """A simple abstraction that controls necessary components of a timed and
-  space limited background caching job.
+  space limited background source recording job.
 
-  A background caching job successfully completes source data capture in 2
-  conditions:
+  A background source recording job successfully completes source data
+  recording under either of 2 conditions:
 
     #. The job is finite and runs into DONE state;
     #. The job is infinite but hits an interactive_beam.options configured limit
        and gets cancelled into CANCELLED/CANCELLING state.
 
-  In both situations, the background caching job should be treated as done
-  successfully.
+  In both situations, the background source recording job should be treated as
+  done successfully.
   """
   def __init__(self, pipeline_result, limiters):
     self._pipeline_result = pipeline_result
@@ -104,7 +104,7 @@ class BackgroundCachingJob(object):
       return self._pipeline_result.state is PipelineState.RUNNING
 
   def cancel(self):
-    """Cancels this background caching job.
+    """Cancels this background source recording job.
     """
     with self._result_lock:
       if not PipelineState.is_terminal(self._pipeline_result.state):
@@ -122,7 +122,8 @@ class BackgroundCachingJob(object):
 
 
 def attempt_to_run_background_caching_job(runner, user_pipeline, options=None):
-  """Attempts to run a background caching job for a user-defined pipeline.
+  """Attempts to run a background source recording job for a user-defined
+  pipeline.
 
   Returns True if a job was started, False otherwise.
 
@@ -134,7 +135,7 @@ def attempt_to_run_background_caching_job(runner, user_pipeline, options=None):
     attempt_to_cancel_background_caching_job(user_pipeline)
     # Cancel the gRPC server serving the test stream if there is one.
     attempt_to_stop_test_stream_service(user_pipeline)
-    # TODO(BEAM-8335): refactor background caching job logic from
+    # TODO(BEAM-8335): refactor background source recording job logic from
     # pipeline_instrument module to this module and aggregate tests.
     from apache_beam.runners.interactive import pipeline_instrument as instr
     runner_pipeline = beam.pipeline.Pipeline.from_runner_api(
@@ -154,10 +155,10 @@ def attempt_to_run_background_caching_job(runner, user_pipeline, options=None):
 
 
 def is_background_caching_job_needed(user_pipeline):
-  """Determines if a background caching job needs to be started.
+  """Determines if a background source recording job needs to be started.
 
-  It does several state checks and record state changes throughout the process.
-  It is not idempotent to simplify the usage.
+  It does several state checks and records state changes throughout the
+  process. To simplify usage, it is not idempotent.
   """
   job = ie.current_env().get_background_caching_job(user_pipeline)
   # Checks if the pipeline contains any source that needs to be cached.
@@ -165,9 +166,9 @@ def is_background_caching_job_needed(user_pipeline):
   # If this is True, we can invalidate a previous done/running job if there is
   # one.
   cache_changed = is_source_to_cache_changed(user_pipeline)
-  # When capture replay is disabled, cache is always needed for capturable
+  # When recording replay is disabled, cache is always needed for recordable
   # sources (if any).
-  if need_cache and not ie.current_env().options.enable_capture_replay:
+  if need_cache and not ie.current_env().options.enable_recording_replay:
     from apache_beam.runners.interactive.options import capture_control
     capture_control.evict_captured_data()
     return True
@@ -200,7 +201,7 @@ def is_cache_complete(pipeline_id):
       user_pipeline, update_cached_source_signature=False)
 
   # Stop reading from the cache if the background job is done or the underlying
-  # cache signature changed that requires a new background caching job.
+  # cache signature changed, which requires a new background source recording job.
   return is_done or cache_changed
 
 
@@ -210,8 +211,9 @@ def has_source_to_cache(user_pipeline):
   interactive environment into a streaming cache if this has not been done.
   The wrapping doesn't invalidate existing cache in any way.
 
-  This can help determining if a background caching job is needed to write cache
-  for sources and if a test stream service is needed to serve the cache.
+  This can help determine if a background source recording job is needed to
+  write cache for sources and if a test stream service is needed to serve the
+  cache.
 
   Throughout the check, if source-to-cache has changed from the last check, it
   also cleans up the invalidated cache early on.
@@ -235,10 +237,11 @@ def has_source_to_cache(user_pipeline):
 
 
 def attempt_to_cancel_background_caching_job(user_pipeline):
-  """Attempts to cancel background caching job for a user-defined pipeline.
+  """Attempts to cancel background source recording job for a user-defined
+  pipeline.
 
-  If no background caching job needs to be cancelled, NOOP. Otherwise, cancel
-  such job.
+  If no background source recording job needs to be cancelled, this is a NOOP.
+  Otherwise, cancels the job.
   """
   job = ie.current_env().get_background_caching_job(user_pipeline)
   if job:
@@ -273,7 +276,7 @@ def is_source_to_cache_changed(
   transforms for the user-defined pipeline if there is any change.
 
   When it's True, there is addition/deletion/mutation of source transforms that
-  requires a new background caching job.
+  requires a new background source recording job.
   """
   # By default gets empty set if the user_pipeline is first time seen because
   # we can treat it as adding transforms.
@@ -285,8 +288,8 @@ def is_source_to_cache_changed(
   # change by default.
   if is_changed and update_cached_source_signature:
     options = ie.current_env().options
-    # No info needed when capture replay is disabled.
-    if options.enable_capture_replay:
+    # No info needed when recording replay is disabled.
+    if options.enable_recording_replay:
       if not recorded_signature:
 
         def sizeof_fmt(num, suffix='B'):
@@ -301,13 +304,13 @@ def is_source_to_cache_changed(
             'In order to have a deterministic replay, a segment of data will '
             'be recorded from all sources for %s seconds or until a total of '
             '%s have been written to disk.',
-            options.capture_duration.total_seconds(),
-            sizeof_fmt(options.capture_size_limit))
+            options.recording_duration.total_seconds(),
+            sizeof_fmt(options.recording_size_limit))
       else:
         _LOGGER.info(
             'Interactive Beam has detected a new streaming source was '
             'added to the pipeline. In order for the cached streaming '
-            'data to start at the same time, all captured data has been '
+            'data to start at the same time, all recorded data has been '
             'cleared and a new segment of data will be recorded.')
 
     ie.current_env().cleanup(user_pipeline)
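
For illustration, a minimal sketch of how the renamed options drive the
background source recording job from user code (assuming a notebook-style
environment with Interactive Beam available; TestStream stands in for any
recordable source):

    from apache_beam.runners.interactive import interactive_beam as ib
    from apache_beam.testing.test_stream import TestStream

    # Register TestStream as a recordable source so the next pipeline run
    # starts a background source recording job for its events.
    ib.options.recordable_sources.add(TestStream)
    # Keep replay enabled so repeated evaluations reuse the recorded data.
    ib.options.enable_recording_replay = True
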
diff --git a/sdks/python/apache_beam/runners/interactive/interactive_beam.py b/sdks/python/apache_beam/runners/interactive/interactive_beam.py
index 5fd5ba9..e6865dd 100644
--- a/sdks/python/apache_beam/runners/interactive/interactive_beam.py
+++ b/sdks/python/apache_beam/runners/interactive/interactive_beam.py
@@ -34,6 +34,7 @@ this module in your notebook or application code.
 from __future__ import absolute_import
 
 import logging
+from datetime import timedelta
 
 import pandas as pd
 
@@ -52,98 +53,117 @@ _LOGGER = logging.getLogger(__name__)
 class Options(interactive_options.InteractiveOptions):
   """Options that guide how Interactive Beam works."""
   @property
-  def enable_capture_replay(self):
-    """Whether replayable source data capture should be replayed for multiple
-    PCollection evaluations and pipeline runs as long as the data captured is
+  def enable_recording_replay(self):
+    """Whether replayable source data recorded should be replayed for multiple
+    PCollection evaluations and pipeline runs as long as the data recorded is
     still valid."""
     return self.capture_control._enable_capture_replay
 
-  @enable_capture_replay.setter
-  def enable_capture_replay(self, value):
-    """Sets whether source data capture should be replayed. True - Enables
-    capture of replayable source data so that following PCollection evaluations
-    and pipeline runs always use the same data captured; False - Disables
-    capture of replayable source data so that following PCollection evaluation
-    and pipeline runs always use new data from sources."""
+  @enable_recording_replay.setter
+  def enable_recording_replay(self, value):
+    """Sets whether source data recorded should be replayed. True - Enables
+    recording of replayable source data so that following PCollection
+    evaluations and pipeline runs always use the same data recorded;
+    False - Disables recording of replayable source data so that following
+    PCollection evaluation and pipeline runs always use new data from sources.
+    """
     # This makes sure the log handler is configured correctly in case the
     # options are configured in an early stage.
     _ = ie.current_env()
     if value:
       _LOGGER.info(
-          'Capture replay is enabled. When a PCollection is evaluated or the '
-          'pipeline is executed, existing data captured from previous '
+          'Recording replay is enabled. When a PCollection is evaluated or the '
+          'pipeline is executed, existing data recorded from previous '
           'computations will be replayed for consistent results. If no '
-          'captured data is available, new data from capturable sources will '
-          'be captured.')
+          'recorded data is available, new data from recordable sources will '
+          'be recorded.')
     else:
       _LOGGER.info(
-          'Capture replay is disabled. The next time a PCollection is '
+          'Recording replay is disabled. The next time a PCollection is '
           'evaluated or the pipeline is executed, new data will always be '
           'consumed from sources in the pipeline. You will not have '
           'replayability until re-enabling this option.')
     self.capture_control._enable_capture_replay = value
 
   @property
-  def capturable_sources(self):
-    """Interactive Beam automatically captures data from sources in this set.
+  def recordable_sources(self):
+    """Interactive Beam automatically records data from sources in this set.
     """
     return self.capture_control._capturable_sources
 
   @property
-  def capture_duration(self):
-    """The data capture of sources ends as soon as the background caching job
-    has run for this long."""
+  def recording_duration(self):
+    """The data recording of sources ends as soon as the background source
+    recording job has run for this long."""
     return self.capture_control._capture_duration
 
-  @capture_duration.setter
-  def capture_duration(self, value):
-    """Sets the capture duration as a timedelta.
+  @recording_duration.setter
+  def recording_duration(self, value):
+    """Sets the recording duration as a timedelta. The input can be a
+    datetime.timedelta, a possitive integer as seconds or a string
+    representation that is parsable by pandas.to_timedelta.
 
     Example::
 
-      # Sets the capture duration limit to 10 seconds.
-      interactive_beam.options.capture_duration = timedelta(seconds=10)
-      # Evicts all captured data if there is any.
-      interactive_beam.evict_captured_data()
-      # The next PCollection evaluation will capture fresh data from sources,
-      # and the data captured will be replayed until another eviction.
-      interactive_beam.collect(some_pcoll)
+      # Sets the recording duration limit to 10 seconds.
+      ib.options.recording_duration = timedelta(seconds=10)
+      ib.options.recording_duration = 10
+      ib.options.recording_duration = '10s'
+      # Explicitly control the recordings.
+      ib.recordings.stop(p)
+      ib.recordings.clear(p)
+      ib.recordings.record(p)
+      # The next PCollection evaluation uses fresh data from sources,
+      # and the data recorded will be replayed until another clear.
+      ib.collect(some_pcoll)
     """
-    assert value.total_seconds() > 0, 'Duration must be a positive value.'
+    duration = None
+    if isinstance(value, int):
+      assert value > 0, 'Duration must be a positive value.'
+      duration = timedelta(seconds=value)
+    elif isinstance(value, str):
+      duration = pd.to_timedelta(value)
+    else:
+      assert isinstance(value, timedelta), ('The input can only be a '
+        'datetime.timedelta, a positive integer in seconds, or a string '
+        'representation that is parsable by pandas.to_timedelta.')
+      duration = value
     if self.capture_control._capture_duration.total_seconds(
-    ) != value.total_seconds():
+    ) != duration.total_seconds():
       _ = ie.current_env()
       _LOGGER.info(
-          'You have changed capture duration from %s seconds to %s seconds. '
-          'To allow new data to be captured for the updated duration, the '
+          'You have changed recording duration from %s seconds to %s seconds. '
+          'To allow new data to be recorded for the updated duration the '
           'next time a PCollection is evaluated or the pipeline is executed, '
-          'please invoke evict_captured_data().',
+          'please invoke ib.recordings.stop, ib.recordings.clear and '
+          'ib.recordings.record.',
           self.capture_control._capture_duration.total_seconds(),
-          value.total_seconds())
-      self.capture_control._capture_duration = value
+          duration.total_seconds())
+      self.capture_control._capture_duration = duration
 
   @property
-  def capture_size_limit(self):
-    """The data capture of sources ends as soon as the size (in bytes) of data
-    captured from capturable sources reaches the limit."""
+  def recording_size_limit(self):
+    """The data recording of sources ends as soon as the size (in bytes) of data
+    recorded from recordable sources reaches the limit."""
     return self.capture_control._capture_size_limit
 
-  @capture_size_limit.setter
-  def capture_size_limit(self, value):
-    """Sets the capture size in bytes.
+  @recording_size_limit.setter
+  def recording_size_limit(self, value):
+    """Sets the recording size in bytes.
 
     Example::
 
-      # Sets the capture size limit to 1GB.
-      interactive_beam.options.capture_size_limit = 1e9
+      # Sets the recording size limit to 1GB.
+      interactive_beam.options.recording_size_limit = 1e9
     """
     if self.capture_control._capture_size_limit != value:
       _ = ie.current_env()
       _LOGGER.info(
-          'You have changed capture size limit from %s bytes to %s bytes. To '
-          'allow new data to be captured under the updated size limit, the '
-          'next time a PCollection is evaluated or the pipeline is executed, '
-          'please invoke evict_captured_data().',
+          'You have changed recording size limit from %s bytes to %s bytes. To '
+          'allow new data to be recorded under the updated size limit the '
+          'next time a PCollection is evaluated or the pipeline is executed, '
+          'please invoke ib.recordings.stop, ib.recordings.clear and '
+          'ib.recordings.record.',
           self.capture_control._capture_size_limit,
           value)
       self.capture_control._capture_size_limit = value
@@ -194,7 +214,7 @@ class Options(interactive_options.InteractiveOptions):
       # You can also use dateutil.tz to get a timezone.
       tz = dateutil.tz.gettz('US/Eastern')
 
-      interactive_beam.options.capture_size = tz
+      interactive_beam.options.display_timezone = tz
     """
     self._display_timezone = value
 
@@ -203,9 +223,9 @@ class Recordings():
   """An introspection interface for recordings for pipelines.
 
   When a user materializes a PCollection onto disk (e.g. ib.show) for a streaming
-  pipeline, a background recording job is started. This job pulls data from all
-  defined unbounded sources for that PCollection's pipeline. The following
-  methods allow for introspection into that background recording job.
+  pipeline, a background source recording job is started. This job pulls data
+  from all defined unbounded sources for that PCollection's pipeline. The
+  following methods allow for introspection into that background recording job.
   """
   def describe(self, pipeline=None):
     # type: (Optional[beam.Pipeline]) -> dict[str, Any]
@@ -245,7 +265,7 @@ class Recordings():
   def stop(self, pipeline):
     # type: (beam.Pipeline) -> None
 
-    """Stops the background recording of the given pipeline."""
+    """Stops the background source recording of the given pipeline."""
 
     recording_manager = ie.current_env().get_recording_manager(
         pipeline, create_if_absent=True)
@@ -254,8 +274,8 @@ class Recordings():
   def record(self, pipeline):
     # type: (beam.Pipeline) -> bool
 
-    """Starts a background recording job for the given pipeline. Returns True if
-    the recording job was started.
+    """Starts a background source recording job for the given pipeline. Returns
+    True if the recording job was started.
     """
 
     description = self.describe(pipeline)
@@ -279,11 +299,10 @@ class Recordings():
 
 # Users can set options to guide how Interactive Beam works.
 # Examples:
-# from datetime import timedelta
 # from apache_beam.runners.interactive import interactive_beam as ib
-# ib.options.enable_capture_replay = False/True
-# ib.options.capture_duration = timedelta(seconds=60)
-# ib.options.capturable_sources.add(SourceClass)
+# ib.options.enable_recording_replay = False/True
+# ib.options.recording_duration = '1m'
+# ib.options.recordable_sources.add(SourceClass)
 # Check the docstrings for detailed usages.
 options = Options()
 
@@ -578,11 +597,11 @@ def show_graph(pipeline):
   pipeline_graph.PipelineGraph(pipeline).display_graph()
 
 
-def evict_captured_data(pipeline=None):
-  """Forcefully evicts all captured replayable data for the given pipeline. If
+def evict_recorded_data(pipeline=None):
+  """Forcefully evicts all recorded replayable data for the given pipeline. If
   no pipeline is specified, evicts for all user defined pipelines.
 
-  Once invoked, Interactive Beam will capture new data based on the guidance of
+  Once invoked, Interactive Beam will record new data based on the guidance of
   options the next time it evaluates/visualizes PCollections or runs pipelines.
   """
   from apache_beam.runners.interactive.options import capture_control
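
As the updated docstrings above describe, recording_duration now accepts a
datetime.timedelta, a positive integer in seconds, or a string parsable by
pandas.to_timedelta. A hedged sketch of equivalent settings and of the
stop/clear/record cycle (p is assumed to be an existing beam.Pipeline created
with the InteractiveRunner):

    from datetime import timedelta

    from apache_beam.runners.interactive import interactive_beam as ib

    # Three equivalent ways to set a 60-second recording duration.
    ib.options.recording_duration = timedelta(seconds=60)
    ib.options.recording_duration = 60       # positive integer, in seconds
    ib.options.recording_duration = '60s'    # parsed by pandas.to_timedelta

    # Cap the total data recorded from recordable sources at 1GB.
    ib.options.recording_size_limit = 1e9

    # To record fresh data under the new limits, cycle the recording.
    ib.recordings.stop(p)
    ib.recordings.clear(p)
    ib.recordings.record(p)
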
diff --git a/sdks/python/apache_beam/runners/interactive/interactive_beam_test.py b/sdks/python/apache_beam/runners/interactive/interactive_beam_test.py
index 5bf76b0..b76e73d 100644
--- a/sdks/python/apache_beam/runners/interactive/interactive_beam_test.py
+++ b/sdks/python/apache_beam/runners/interactive/interactive_beam_test.py
@@ -222,7 +222,7 @@ class InteractiveBeamTest(unittest.TestCase):
     """Tests that recording pipeline succeeds."""
 
     # Add the TestStream so that it can be cached.
-    ib.options.capturable_sources.add(TestStream)
+    ib.options.recordable_sources.add(TestStream)
 
     # Create a pipeline with an arbitrary amount of elements.
     p = beam.Pipeline(
diff --git a/sdks/python/apache_beam/runners/interactive/interactive_runner.py b/sdks/python/apache_beam/runners/interactive/interactive_runner.py
index bca6743..99b4c7d 100644
--- a/sdks/python/apache_beam/runners/interactive/interactive_runner.py
+++ b/sdks/python/apache_beam/runners/interactive/interactive_runner.py
@@ -127,7 +127,7 @@ class InteractiveRunner(runners.PipelineRunner):
     return self._underlying_runner.apply(transform, pvalueish, options)
 
   def run_pipeline(self, pipeline, options):
-    if not ie.current_env().options.enable_capture_replay:
+    if not ie.current_env().options.enable_recording_replay:
       capture_control.evict_captured_data()
     if self._force_compute:
       ie.current_env().evict_computed_pcollections()
diff --git a/sdks/python/apache_beam/runners/interactive/interactive_runner_test.py b/sdks/python/apache_beam/runners/interactive/interactive_runner_test.py
index a1b3374..8c8d831 100644
--- a/sdks/python/apache_beam/runners/interactive/interactive_runner_test.py
+++ b/sdks/python/apache_beam/runners/interactive/interactive_runner_test.py
@@ -168,7 +168,7 @@ class InteractiveRunnerTest(unittest.TestCase):
         return words
 
     # Add the TestStream so that it can be cached.
-    ib.options.capturable_sources.add(TestStream)
+    ib.options.recordable_sources.add(TestStream)
 
     p = beam.Pipeline(
         runner=interactive_runner.InteractiveRunner(),
diff --git a/sdks/python/apache_beam/runners/interactive/options/capture_control.py b/sdks/python/apache_beam/runners/interactive/options/capture_control.py
index ab877b5..837ff57 100644
--- a/sdks/python/apache_beam/runners/interactive/options/capture_control.py
+++ b/sdks/python/apache_beam/runners/interactive/options/capture_control.py
@@ -68,9 +68,9 @@ def evict_captured_data(pipeline=None):
 
   In future PCollection evaluation/visualization and pipeline
   runs, Interactive Beam will capture fresh data."""
-  if ie.current_env().options.enable_capture_replay:
+  if ie.current_env().options.enable_recording_replay:
     _LOGGER.info(
-        'You have requested Interactive Beam to evict all recorded'
+        'You have requested Interactive Beam to evict all recorded '
         'data that could be deterministically replayed among multiple '
         'pipeline runs.')
   ie.current_env().cleanup(pipeline)
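
Note that the log message above is only emitted while replay is enabled; when
replay is disabled, the interactive runner evicts recorded data before every
run so sources are always re-read. A small sketch of the toggle:

    from apache_beam.runners.interactive import interactive_beam as ib

    # Disable replay: the next evaluation or pipeline run always consumes
    # new data from sources (recorded data is evicted automatically).
    ib.options.enable_recording_replay = False

    # Re-enable replay for deterministic, repeatable results.
    ib.options.enable_recording_replay = True
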
diff --git a/sdks/python/apache_beam/runners/interactive/pipeline_instrument.py b/sdks/python/apache_beam/runners/interactive/pipeline_instrument.py
index cb80e37..8f603e1 100644
--- a/sdks/python/apache_beam/runners/interactive/pipeline_instrument.py
+++ b/sdks/python/apache_beam/runners/interactive/pipeline_instrument.py
@@ -392,7 +392,7 @@ class PipelineInstrument(object):
 
   @property
   def has_unbounded_sources(self):
-    """Returns whether the pipeline has any capturable sources.
+    """Returns whether the pipeline has any recordable sources.
     """
     return len(self._unbounded_sources) > 0
 
@@ -469,7 +469,7 @@ class PipelineInstrument(object):
 
       def visit_transform(self, transform_node):
         if isinstance(transform_node.transform,
-                      tuple(ie.current_env().options.capturable_sources)):
+                      tuple(ie.current_env().options.recordable_sources)):
           unbounded_source_pcolls.update(transform_node.outputs.values())
         cacheable_inputs.update(self._pin._cacheable_inputs(transform_node))
         ins, outs = self._pin._all_inputs_outputs(transform_node)
@@ -610,14 +610,14 @@ class PipelineInstrument(object):
     if pcoll.pipeline is not pipeline:
       return
 
-    # Ignore the unbounded reads from capturable sources as these will be pruned
+    # Ignore the unbounded reads from recordable sources as these will be pruned
     # out using the PipelineFragment later on.
     if ignore_unbounded_reads:
       ignore = False
       producer = pcoll.producer
       while producer:
         if isinstance(producer.transform,
-                      tuple(ie.current_env().options.capturable_sources)):
+                      tuple(ie.current_env().options.recordable_sources)):
           ignore = True
           break
         producer = producer.parent
@@ -924,16 +924,16 @@ def cacheable_key(pcoll, pcolls_to_pcoll_id, pcoll_version_map=None):
 
 
 def has_unbounded_sources(pipeline):
-  """Checks if a given pipeline has capturable sources."""
+  """Checks if a given pipeline has recordable sources."""
   return len(unbounded_sources(pipeline)) > 0
 
 
 def unbounded_sources(pipeline):
-  """Returns a pipeline's capturable sources."""
+  """Returns a pipeline's recordable sources."""
   class CheckUnboundednessVisitor(PipelineVisitor):
     """Visitor checks if there are any unbounded read sources in the Pipeline.
 
-    Visitor visits all nodes and checks if it is an instance of capturable
+    Visitor visits all nodes and checks whether each is an instance of recordable
     sources.
     """
     def __init__(self):
@@ -944,7 +944,7 @@ def unbounded_sources(pipeline):
 
     def visit_transform(self, transform_node):
       if isinstance(transform_node.transform,
-                    tuple(ie.current_env().options.capturable_sources)):
+                    tuple(ie.current_env().options.recordable_sources)):
         self.unbounded_sources.append(transform_node)
 
   v = CheckUnboundednessVisitor()
@@ -1006,7 +1006,7 @@ def watch_sources(pipeline):
 
     def visit_transform(self, transform_node):
       if isinstance(transform_node.transform,
-                    tuple(ie.current_env().options.capturable_sources)):
+                    tuple(ie.current_env().options.recordable_sources)):
         for pcoll in transform_node.outputs.values():
           ie.current_env().watch({'synthetic_var_' + str(id(pcoll)): pcoll})
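
Each visitor above uses the same membership test: isinstance against a tuple
built from options.recordable_sources. A standalone sketch with hypothetical
stand-in classes (the real set holds Beam source transform classes):

    class FakeSource:
      """Hypothetical stand-in for a recordable source transform."""

    class FakeSubSource(FakeSource):
      """Subclasses match too, since isinstance respects inheritance."""

    recordable_sources = {FakeSource}  # normally a set of transform classes

    # isinstance accepts a tuple of classes, so the set is converted first.
    assert isinstance(FakeSubSource(), tuple(recordable_sources))
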
 
diff --git a/sdks/python/apache_beam/runners/interactive/recording_manager_test.py b/sdks/python/apache_beam/runners/interactive/recording_manager_test.py
index 29a9b4d..c8c30a3 100644
--- a/sdks/python/apache_beam/runners/interactive/recording_manager_test.py
+++ b/sdks/python/apache_beam/runners/interactive/recording_manager_test.py
@@ -367,7 +367,7 @@ class RecordingManagerTest(unittest.TestCase):
       'This test requires at least Python 3.6 to work.')
   def test_cancel_stops_recording(self):
     # Add the TestStream so that it can be cached.
-    ib.options.capturable_sources.add(TestStream)
+    ib.options.recordable_sources.add(TestStream)
 
     p = beam.Pipeline(
         InteractiveRunner(), options=PipelineOptions(streaming=True))
@@ -410,7 +410,7 @@ class RecordingManagerTest(unittest.TestCase):
     cache ensures that correct results are computed every run.
     """
     # Add the TestStream so that it can be cached.
-    ib.options.capturable_sources.add(TestStream)
+    ib.options.recordable_sources.add(TestStream)
     p = beam.Pipeline(
         InteractiveRunner(), options=PipelineOptions(streaming=True))
     elems = (
@@ -500,7 +500,7 @@ class RecordingManagerTest(unittest.TestCase):
       'This test requires at least Python 3.6 to work.')
   def test_record_pipeline(self):
     # Add the TestStream so that it can be cached.
-    ib.options.capturable_sources.add(TestStream)
+    ib.options.recordable_sources.add(TestStream)
     p = beam.Pipeline(
         InteractiveRunner(), options=PipelineOptions(streaming=True))
     # pylint: disable=unused-variable
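
Putting the pieces together, a hedged end-to-end sketch modeled on the tests
in this commit (the TestStream contents and the ib.watch call are
illustrative):

    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions
    from apache_beam.runners.interactive import interactive_beam as ib
    from apache_beam.runners.interactive.interactive_runner import (
        InteractiveRunner)
    from apache_beam.testing.test_stream import TestStream

    ib.options.recordable_sources.add(TestStream)
    p = beam.Pipeline(
        InteractiveRunner(), options=PipelineOptions(streaming=True))
    elems = p | TestStream().advance_watermark_to(0).add_elements(
        list(range(10)))
    ib.watch(locals())

    ib.recordings.record(p)           # start the background recording job
    print(ib.recordings.describe(p))  # introspect its state and sizes
    ib.recordings.stop(p)             # stop it, then clear recorded data
    ib.recordings.clear(p)
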