You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2020/10/13 18:25:47 UTC
[beam] branch release-2.25.0 updated: [BEAM-11056] Fix warning
message and rename old APIs (#13080)
This is an automated email from the ASF dual-hosted git repository.
robinyqiu pushed a commit to branch release-2.25.0
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/release-2.25.0 by this push:
new 967a79d [BEAM-11056] Fix warning message and rename old APIs (#13080)
new 378b662 Merge pull request #13088 from KevinGG/release-2.25.0
967a79d is described below
commit 967a79d44272081915a461e03a64864fd35bef79
Author: Ning Kang <ka...@gmail.com>
AuthorDate: Tue Oct 13 09:01:29 2020 -0700
[BEAM-11056] Fix warning message and rename old APIs (#13080)
[BEAM-11056] Fix warning message and rename old APIs (#13080)
---
.../runners/interactive/background_caching_job.py | 77 +++++------
.../runners/interactive/interactive_beam.py | 145 ++++++++++++---------
.../runners/interactive/interactive_beam_test.py | 2 +-
.../runners/interactive/interactive_runner.py | 2 +-
.../runners/interactive/interactive_runner_test.py | 2 +-
.../runners/interactive/options/capture_control.py | 4 +-
.../runners/interactive/pipeline_instrument.py | 18 +--
.../runners/interactive/recording_manager_test.py | 6 +-
8 files changed, 139 insertions(+), 117 deletions(-)
diff --git a/sdks/python/apache_beam/runners/interactive/background_caching_job.py b/sdks/python/apache_beam/runners/interactive/background_caching_job.py
index 79cef08..b1d2f28 100644
--- a/sdks/python/apache_beam/runners/interactive/background_caching_job.py
+++ b/sdks/python/apache_beam/runners/interactive/background_caching_job.py
@@ -15,25 +15,25 @@
# limitations under the License.
#
-"""Module to build and run background caching job.
+"""Module to build and run background source recording jobs.
For internal use only; no backwards-compatibility guarantees.
-A background caching job is a job that captures events for all capturable
-sources of a given pipeline. With Interactive Beam, one such job is started when
-a pipeline run happens (which produces a main job in contrast to the background
-caching job) and meets the following conditions:
+A background source recording job is a job that records events for all
+recordable sources of a given pipeline. With Interactive Beam, one such job is
+started when a pipeline run happens (which produces a main job in contrast to
+the background source recording job) and meets the following conditions:
- #. The pipeline contains capturable sources, configured through
- interactive_beam.options.capturable_sources.
+ #. The pipeline contains recordable sources, configured through
+ interactive_beam.options.recordable_sources.
#. No such background job is running.
#. No such background job has completed successfully and the cached events are
- still valid (invalidated when capturable sources change in the pipeline).
+ still valid (invalidated when recordable sources change in the pipeline).
-Once started, the background caching job runs asynchronously until it hits some
-capture limit configured in interactive_beam.options. Meanwhile, the main job
-and future main jobs from the pipeline will run using the deterministic
-replayable captured events until they are invalidated.
+Once started, the background source recording job runs asynchronously until it
+hits some recording limit configured in interactive_beam.options. Meanwhile,
+the main job and future main jobs from the pipeline will run using the
+deterministic replayable recorded events until they are invalidated.
"""
# pytype: skip-file
@@ -54,17 +54,17 @@ _LOGGER = logging.getLogger(__name__)
class BackgroundCachingJob(object):
"""A simple abstraction that controls necessary components of a timed and
- space limited background caching job.
+ space limited background source recording job.
- A background caching job successfully completes source data capture in 2
- conditions:
+ A background source recording job successfully completes source data
+ recording under 2 conditions:
#. The job is finite and runs into DONE state;
#. The job is infinite but hits an interactive_beam.options configured limit
and gets cancelled into CANCELLED/CANCELLING state.
- In both situations, the background caching job should be treated as done
- successfully.
+ In both situations, the background source recording job should be treated as
+ done successfully.
"""
def __init__(self, pipeline_result, limiters):
self._pipeline_result = pipeline_result
@@ -104,7 +104,7 @@ class BackgroundCachingJob(object):
return self._pipeline_result.state is PipelineState.RUNNING
def cancel(self):
- """Cancels this background caching job.
+ """Cancels this background source recording job.
"""
with self._result_lock:
if not PipelineState.is_terminal(self._pipeline_result.state):
@@ -122,7 +122,8 @@ class BackgroundCachingJob(object):
def attempt_to_run_background_caching_job(runner, user_pipeline, options=None):
- """Attempts to run a background caching job for a user-defined pipeline.
+ """Attempts to run a background source recording job for a user-defined
+ pipeline.
Returns True if a job was started, False otherwise.
@@ -134,7 +135,7 @@ def attempt_to_run_background_caching_job(runner, user_pipeline, options=None):
attempt_to_cancel_background_caching_job(user_pipeline)
# Cancel the gRPC server serving the test stream if there is one.
attempt_to_stop_test_stream_service(user_pipeline)
- # TODO(BEAM-8335): refactor background caching job logic from
+ # TODO(BEAM-8335): refactor background source recording job logic from
# pipeline_instrument module to this module and aggregate tests.
from apache_beam.runners.interactive import pipeline_instrument as instr
runner_pipeline = beam.pipeline.Pipeline.from_runner_api(
@@ -154,10 +155,10 @@ def attempt_to_run_background_caching_job(runner, user_pipeline, options=None):
def is_background_caching_job_needed(user_pipeline):
- """Determines if a background caching job needs to be started.
+ """Determines if a background source recording job needs to be started.
- It does several state checks and record state changes throughout the process.
- It is not idempotent to simplify the usage.
+ It does several state checks and records state changes throughout the
+ process. It is not idempotent to simplify the usage.
"""
job = ie.current_env().get_background_caching_job(user_pipeline)
# Checks if the pipeline contains any source that needs to be cached.
@@ -165,9 +166,9 @@ def is_background_caching_job_needed(user_pipeline):
# If this is True, we can invalidate a previous done/running job if there is
# one.
cache_changed = is_source_to_cache_changed(user_pipeline)
- # When capture replay is disabled, cache is always needed for capturable
+ # When recording replay is disabled, cache is always needed for recordable
# sources (if any).
- if need_cache and not ie.current_env().options.enable_capture_replay:
+ if need_cache and not ie.current_env().options.enable_recording_replay:
from apache_beam.runners.interactive.options import capture_control
capture_control.evict_captured_data()
return True
@@ -200,7 +201,7 @@ def is_cache_complete(pipeline_id):
user_pipeline, update_cached_source_signature=False)
# Stop reading from the cache if the background job is done or the underlying
- # cache signature changed that requires a new background caching job.
+ # cache signature changed that requires a new background source recording job.
return is_done or cache_changed
@@ -210,8 +211,9 @@ def has_source_to_cache(user_pipeline):
interactive environment into a streaming cache if this has not been done.
The wrapping doesn't invalidate existing cache in any way.
- This can help determining if a background caching job is needed to write cache
- for sources and if a test stream service is needed to serve the cache.
+ This can help determine if a background source recording job is needed to
+ write cache for sources and if a test stream service is needed to serve the
+ cache.
Throughout the check, if source-to-cache has changed from the last check, it
also cleans up the invalidated cache early on.
@@ -235,10 +237,11 @@ def has_source_to_cache(user_pipeline):
def attempt_to_cancel_background_caching_job(user_pipeline):
- """Attempts to cancel background caching job for a user-defined pipeline.
+ """Attempts to cancel the background source recording job for a
+ user-defined pipeline.
- If no background caching job needs to be cancelled, NOOP. Otherwise, cancel
- such job.
+ If no background source recording job needs to be cancelled, NOOP. Otherwise,
+ cancel such job.
"""
job = ie.current_env().get_background_caching_job(user_pipeline)
if job:
@@ -273,7 +276,7 @@ def is_source_to_cache_changed(
transforms for the user-defined pipeline if there is any change.
When it's True, there is addition/deletion/mutation of source transforms that
- requires a new background caching job.
+ requires a new background source recording job.
"""
# By default gets empty set if the user_pipeline is first time seen because
# we can treat it as adding transforms.
@@ -285,8 +288,8 @@ def is_source_to_cache_changed(
# change by default.
if is_changed and update_cached_source_signature:
options = ie.current_env().options
- # No info needed when capture replay is disabled.
- if options.enable_capture_replay:
+ # No info needed when recording replay is disabled.
+ if options.enable_recording_replay:
if not recorded_signature:
def sizeof_fmt(num, suffix='B'):
@@ -301,13 +304,13 @@ def is_source_to_cache_changed(
'In order to have a deterministic replay, a segment of data will '
'be recorded from all sources for %s seconds or until a total of '
'%s have been written to disk.',
- options.capture_duration.total_seconds(),
- sizeof_fmt(options.capture_size_limit))
+ options.recording_duration.total_seconds(),
+ sizeof_fmt(options.recording_size_limit))
else:
_LOGGER.info(
'Interactive Beam has detected a new streaming source was '
'added to the pipeline. In order for the cached streaming '
- 'data to start at the same time, all captured data has been '
+ 'data to start at the same time, all recorded data has been '
'cleared and a new segment of data will be recorded.')
ie.current_env().cleanup(user_pipeline)
diff --git a/sdks/python/apache_beam/runners/interactive/interactive_beam.py b/sdks/python/apache_beam/runners/interactive/interactive_beam.py
index 5fd5ba9..e6865dd 100644
--- a/sdks/python/apache_beam/runners/interactive/interactive_beam.py
+++ b/sdks/python/apache_beam/runners/interactive/interactive_beam.py
@@ -34,6 +34,7 @@ this module in your notebook or application code.
from __future__ import absolute_import
import logging
+from datetime import timedelta
import pandas as pd
@@ -52,98 +53,117 @@ _LOGGER = logging.getLogger(__name__)
class Options(interactive_options.InteractiveOptions):
"""Options that guide how Interactive Beam works."""
@property
- def enable_capture_replay(self):
- """Whether replayable source data capture should be replayed for multiple
- PCollection evaluations and pipeline runs as long as the data captured is
+ def enable_recording_replay(self):
+ """Whether recorded replayable source data should be replayed for multiple
+ PCollection evaluations and pipeline runs as long as the recorded data is
still valid."""
return self.capture_control._enable_capture_replay
- @enable_capture_replay.setter
- def enable_capture_replay(self, value):
- """Sets whether source data capture should be replayed. True - Enables
- capture of replayable source data so that following PCollection evaluations
- and pipeline runs always use the same data captured; False - Disables
- capture of replayable source data so that following PCollection evaluation
- and pipeline runs always use new data from sources."""
+ @enable_recording_replay.setter
+ def enable_recording_replay(self, value):
+ """Sets whether recorded source data should be replayed. True - Enables
+ recording of replayable source data so that following PCollection
+ evaluations and pipeline runs always use the same recorded data;
+ False - Disables recording of replayable source data so that following
+ PCollection evaluations and pipeline runs always use new data from sources.
+ """
# This makes sure the log handler is configured correctly in case the
# options are configured in an early stage.
_ = ie.current_env()
if value:
_LOGGER.info(
- 'Capture replay is enabled. When a PCollection is evaluated or the '
- 'pipeline is executed, existing data captured from previous '
+ 'Record replay is enabled. When a PCollection is evaluated or the '
+ 'pipeline is executed, existing data recorded from previous '
'computations will be replayed for consistent results. If no '
- 'captured data is available, new data from capturable sources will '
- 'be captured.')
+ 'recorded data is available, new data from recordable sources will '
+ 'be recorded.')
else:
_LOGGER.info(
- 'Capture replay is disabled. The next time a PCollection is '
+ 'Record replay is disabled. The next time a PCollection is '
'evaluated or the pipeline is executed, new data will always be '
'consumed from sources in the pipeline. You will not have '
'replayability until re-enabling this option.')
self.capture_control._enable_capture_replay = value
@property
- def capturable_sources(self):
- """Interactive Beam automatically captures data from sources in this set.
+ def recordable_sources(self):
+ """Interactive Beam automatically records data from sources in this set.
"""
return self.capture_control._capturable_sources
@property
- def capture_duration(self):
- """The data capture of sources ends as soon as the background caching job
- has run for this long."""
+ def recording_duration(self):
+ """The data recording of sources ends as soon as the background source
+ recording job has run for this long."""
return self.capture_control._capture_duration
- @capture_duration.setter
- def capture_duration(self, value):
- """Sets the capture duration as a timedelta.
+ @recording_duration.setter
+ def recording_duration(self, value):
+ """Sets the recording duration as a timedelta. The input can be a
+ datetime.timedelta, a positive integer as seconds or a string
+ representation that is parsable by pandas.to_timedelta.
Example::
- # Sets the capture duration limit to 10 seconds.
- interactive_beam.options.capture_duration = timedelta(seconds=10)
- # Evicts all captured data if there is any.
- interactive_beam.evict_captured_data()
- # The next PCollection evaluation will capture fresh data from sources,
- # and the data captured will be replayed until another eviction.
- interactive_beam.collect(some_pcoll)
+ # Sets the recording duration limit to 10 seconds.
+ ib.options.recording_duration = timedelta(seconds=10)
+ ib.options.recording_duration = 10
+ ib.options.recording_duration = '10s'
+ # Explicitly control the recordings.
+ ib.recordings.stop(p)
+ ib.recordings.clear(p)
+ ib.recordings.record(p)
+ # The next PCollection evaluation uses fresh data from sources,
+ # and the data recorded will be replayed until another clear.
+ ib.collect(some_pcoll)
"""
- assert value.total_seconds() > 0, 'Duration must be a positive value.'
+ duration = None
+ if isinstance(value, int):
+ assert value > 0, 'Duration must be a positive value.'
+ duration = timedelta(seconds=value)
+ elif isinstance(value, str):
+ duration = pd.to_timedelta(value)
+ else:
+ assert isinstance(value, timedelta), ('The input can only be a '
+ 'datetime.timedelta, a positive integer as seconds, or a string '
+ 'representation that is parsable by pandas.to_timedelta.')
+ duration = value
if self.capture_control._capture_duration.total_seconds(
- ) != value.total_seconds():
+ ) != duration.total_seconds():
_ = ie.current_env()
_LOGGER.info(
- 'You have changed capture duration from %s seconds to %s seconds. '
- 'To allow new data to be captured for the updated duration, the '
+ 'You have changed recording duration from %s seconds to %s seconds. '
+ 'To allow new data to be recorded for the updated duration the '
'next time a PCollection is evaluated or the pipeline is executed, '
- 'please invoke evict_captured_data().',
+ 'please invoke ib.recordings.stop, ib.recordings.clear and '
+ 'ib.recordings.record.',
self.capture_control._capture_duration.total_seconds(),
- value.total_seconds())
- self.capture_control._capture_duration = value
+ duration.total_seconds())
+ self.capture_control._capture_duration = duration
@property
- def capture_size_limit(self):
- """The data capture of sources ends as soon as the size (in bytes) of data
- captured from capturable sources reaches the limit."""
+ def recording_size_limit(self):
+ """The data recording of sources ends as soon as the size (in bytes) of data
+ recorded from recordable sources reaches the limit."""
return self.capture_control._capture_size_limit
- @capture_size_limit.setter
- def capture_size_limit(self, value):
- """Sets the capture size in bytes.
+ @recording_size_limit.setter
+ def recording_size_limit(self, value):
+ """Sets the recording size in bytes.
Example::
- # Sets the capture size limit to 1GB.
- interactive_beam.options.capture_size_limit = 1e9
+ # Sets the recording size limit to 1GB.
+ interactive_beam.options.recording_size_limit = 1e9
"""
if self.capture_control._capture_size_limit != value:
_ = ie.current_env()
_LOGGER.info(
- 'You have changed capture size limit from %s bytes to %s bytes. To '
- 'allow new data to be captured under the updated size limit, the '
- 'next time a PCollection is evaluated or the pipeline is executed, '
- 'please invoke evict_captured_data().',
+ 'You have changed recording size limit from %s bytes to %s bytes. To '
+ 'allow new data to be recorded under the updated size limit the '
+ 'next time a PCollection is evaluated or the pipeline is executed, '
+ 'please invoke ib.recordings.stop, ib.recordings.clear and '
+ 'ib.recordings.record.',
self.capture_control._capture_size_limit,
value)
self.capture_control._capture_size_limit = value
@@ -194,7 +214,7 @@ class Options(interactive_options.InteractiveOptions):
# You can also use dateutil.tz to get a timezone.
tz = dateutil.tz.gettz('US/Eastern')
- interactive_beam.options.capture_size = tz
+ interactive_beam.options.display_timezone = tz
"""
self._display_timezone = value
@@ -203,9 +223,9 @@ class Recordings():
"""An introspection interface for recordings for pipelines.
When a user materializes a PCollection onto disk (eg. ib.show) for a streaming
- pipeline, a background recording job is started. This job pulls data from all
- defined unbounded sources for that PCollection's pipeline. The following
- methods allow for introspection into that background recording job.
+ pipeline, a background source recording job is started. This job pulls data
+ from all defined unbounded sources for that PCollection's pipeline. The
+ following methods allow for introspection into that background recording job.
"""
def describe(self, pipeline=None):
# type: (Optional[beam.Pipeline]) -> dict[str, Any]
@@ -245,7 +265,7 @@ class Recordings():
def stop(self, pipeline):
# type: (beam.Pipeline) -> None
- """Stops the background recording of the given pipeline."""
+ """Stops the background source recording of the given pipeline."""
recording_manager = ie.current_env().get_recording_manager(
pipeline, create_if_absent=True)
@@ -254,8 +274,8 @@ class Recordings():
def record(self, pipeline):
# type: (beam.Pipeline) -> bool
- """Starts a background recording job for the given pipeline. Returns True if
- the recording job was started.
+ """Starts a background source recording job for the given pipeline. Returns
+ True if the recording job was started.
"""
description = self.describe(pipeline)
@@ -279,11 +299,10 @@ class Recordings():
# Users can set options to guide how Interactive Beam works.
# Examples:
-# from datetime import timedelta
# from apache_beam.runners.interactive import interactive_beam as ib
-# ib.options.enable_capture_replay = False/True
-# ib.options.capture_duration = timedelta(seconds=60)
-# ib.options.capturable_sources.add(SourceClass)
+# ib.options.enable_recording_replay = False/True
+# ib.options.recording_duration = '1m'
+# ib.options.recordable_sources.add(SourceClass)
# Check the docstrings for detailed usages.
options = Options()
@@ -578,11 +597,11 @@ def show_graph(pipeline):
pipeline_graph.PipelineGraph(pipeline).display_graph()
-def evict_captured_data(pipeline=None):
- """Forcefully evicts all captured replayable data for the given pipeline. If
+def evict_recorded_data(pipeline=None):
+ """Forcefully evicts all recorded replayable data for the given pipeline. If
no pipeline is specified, evicts for all user defined pipelines.
- Once invoked, Interactive Beam will capture new data based on the guidance of
+ Once invoked, Interactive Beam will record new data based on the guidance of
options the next time it evaluates/visualizes PCollections or runs pipelines.
"""
from apache_beam.runners.interactive.options import capture_control
diff --git a/sdks/python/apache_beam/runners/interactive/interactive_beam_test.py b/sdks/python/apache_beam/runners/interactive/interactive_beam_test.py
index 5bf76b0..b76e73d 100644
--- a/sdks/python/apache_beam/runners/interactive/interactive_beam_test.py
+++ b/sdks/python/apache_beam/runners/interactive/interactive_beam_test.py
@@ -222,7 +222,7 @@ class InteractiveBeamTest(unittest.TestCase):
"""Tests that recording pipeline succeeds."""
# Add the TestStream so that it can be cached.
- ib.options.capturable_sources.add(TestStream)
+ ib.options.recordable_sources.add(TestStream)
# Create a pipeline with an arbitrary amonunt of elements.
p = beam.Pipeline(
diff --git a/sdks/python/apache_beam/runners/interactive/interactive_runner.py b/sdks/python/apache_beam/runners/interactive/interactive_runner.py
index bca6743..99b4c7d 100644
--- a/sdks/python/apache_beam/runners/interactive/interactive_runner.py
+++ b/sdks/python/apache_beam/runners/interactive/interactive_runner.py
@@ -127,7 +127,7 @@ class InteractiveRunner(runners.PipelineRunner):
return self._underlying_runner.apply(transform, pvalueish, options)
def run_pipeline(self, pipeline, options):
- if not ie.current_env().options.enable_capture_replay:
+ if not ie.current_env().options.enable_recording_replay:
capture_control.evict_captured_data()
if self._force_compute:
ie.current_env().evict_computed_pcollections()
diff --git a/sdks/python/apache_beam/runners/interactive/interactive_runner_test.py b/sdks/python/apache_beam/runners/interactive/interactive_runner_test.py
index a1b3374..8c8d831 100644
--- a/sdks/python/apache_beam/runners/interactive/interactive_runner_test.py
+++ b/sdks/python/apache_beam/runners/interactive/interactive_runner_test.py
@@ -168,7 +168,7 @@ class InteractiveRunnerTest(unittest.TestCase):
return words
# Add the TestStream so that it can be cached.
- ib.options.capturable_sources.add(TestStream)
+ ib.options.recordable_sources.add(TestStream)
p = beam.Pipeline(
runner=interactive_runner.InteractiveRunner(),
diff --git a/sdks/python/apache_beam/runners/interactive/options/capture_control.py b/sdks/python/apache_beam/runners/interactive/options/capture_control.py
index ab877b5..837ff57 100644
--- a/sdks/python/apache_beam/runners/interactive/options/capture_control.py
+++ b/sdks/python/apache_beam/runners/interactive/options/capture_control.py
@@ -68,9 +68,9 @@ def evict_captured_data(pipeline=None):
In future PCollection evaluation/visualization and pipeline
runs, Interactive Beam will capture fresh data."""
- if ie.current_env().options.enable_capture_replay:
+ if ie.current_env().options.enable_recording_replay:
_LOGGER.info(
- 'You have requested Interactive Beam to evict all recorded'
+ 'You have requested Interactive Beam to evict all recorded '
'data that could be deterministically replayed among multiple '
'pipeline runs.')
ie.current_env().cleanup(pipeline)
diff --git a/sdks/python/apache_beam/runners/interactive/pipeline_instrument.py b/sdks/python/apache_beam/runners/interactive/pipeline_instrument.py
index cb80e37..8f603e1 100644
--- a/sdks/python/apache_beam/runners/interactive/pipeline_instrument.py
+++ b/sdks/python/apache_beam/runners/interactive/pipeline_instrument.py
@@ -392,7 +392,7 @@ class PipelineInstrument(object):
@property
def has_unbounded_sources(self):
- """Returns whether the pipeline has any capturable sources.
+ """Returns whether the pipeline has any recordable sources.
"""
return len(self._unbounded_sources) > 0
@@ -469,7 +469,7 @@ class PipelineInstrument(object):
def visit_transform(self, transform_node):
if isinstance(transform_node.transform,
- tuple(ie.current_env().options.capturable_sources)):
+ tuple(ie.current_env().options.recordable_sources)):
unbounded_source_pcolls.update(transform_node.outputs.values())
cacheable_inputs.update(self._pin._cacheable_inputs(transform_node))
ins, outs = self._pin._all_inputs_outputs(transform_node)
@@ -610,14 +610,14 @@ class PipelineInstrument(object):
if pcoll.pipeline is not pipeline:
return
- # Ignore the unbounded reads from capturable sources as these will be pruned
+ # Ignore the unbounded reads from recordable sources as these will be pruned
# out using the PipelineFragment later on.
if ignore_unbounded_reads:
ignore = False
producer = pcoll.producer
while producer:
if isinstance(producer.transform,
- tuple(ie.current_env().options.capturable_sources)):
+ tuple(ie.current_env().options.recordable_sources)):
ignore = True
break
producer = producer.parent
@@ -924,16 +924,16 @@ def cacheable_key(pcoll, pcolls_to_pcoll_id, pcoll_version_map=None):
def has_unbounded_sources(pipeline):
- """Checks if a given pipeline has capturable sources."""
+ """Checks if a given pipeline has recordable sources."""
return len(unbounded_sources(pipeline)) > 0
def unbounded_sources(pipeline):
- """Returns a pipeline's capturable sources."""
+ """Returns a pipeline's recordable sources."""
class CheckUnboundednessVisitor(PipelineVisitor):
"""Visitor checks if there are any unbounded read sources in the Pipeline.
- Visitor visits all nodes and checks if it is an instance of capturable
+ Visitor visits all nodes and checks if it is an instance of recordable
sources.
"""
def __init__(self):
@@ -944,7 +944,7 @@ def unbounded_sources(pipeline):
def visit_transform(self, transform_node):
if isinstance(transform_node.transform,
- tuple(ie.current_env().options.capturable_sources)):
+ tuple(ie.current_env().options.recordable_sources)):
self.unbounded_sources.append(transform_node)
v = CheckUnboundednessVisitor()
@@ -1006,7 +1006,7 @@ def watch_sources(pipeline):
def visit_transform(self, transform_node):
if isinstance(transform_node.transform,
- tuple(ie.current_env().options.capturable_sources)):
+ tuple(ie.current_env().options.recordable_sources)):
for pcoll in transform_node.outputs.values():
ie.current_env().watch({'synthetic_var_' + str(id(pcoll)): pcoll})
diff --git a/sdks/python/apache_beam/runners/interactive/recording_manager_test.py b/sdks/python/apache_beam/runners/interactive/recording_manager_test.py
index 29a9b4d..c8c30a3 100644
--- a/sdks/python/apache_beam/runners/interactive/recording_manager_test.py
+++ b/sdks/python/apache_beam/runners/interactive/recording_manager_test.py
@@ -367,7 +367,7 @@ class RecordingManagerTest(unittest.TestCase):
'This test requires at least Python 3.6 to work.')
def test_cancel_stops_recording(self):
# Add the TestStream so that it can be cached.
- ib.options.capturable_sources.add(TestStream)
+ ib.options.recordable_sources.add(TestStream)
p = beam.Pipeline(
InteractiveRunner(), options=PipelineOptions(streaming=True))
@@ -410,7 +410,7 @@ class RecordingManagerTest(unittest.TestCase):
cache ensures that correct results are computed every run.
"""
# Add the TestStream so that it can be cached.
- ib.options.capturable_sources.add(TestStream)
+ ib.options.recordable_sources.add(TestStream)
p = beam.Pipeline(
InteractiveRunner(), options=PipelineOptions(streaming=True))
elems = (
@@ -500,7 +500,7 @@ class RecordingManagerTest(unittest.TestCase):
'This test requires at least Python 3.6 to work.')
def test_record_pipeline(self):
# Add the TestStream so that it can be cached.
- ib.options.capturable_sources.add(TestStream)
+ ib.options.recordable_sources.add(TestStream)
p = beam.Pipeline(
InteractiveRunner(), options=PipelineOptions(streaming=True))
# pylint: disable=unused-variable