You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by pa...@apache.org on 2020/03/13 23:53:31 UTC
[beam] branch master updated: [BEAM-8335] Final PR to merge the
InteractiveBeam feature branch
This is an automated email from the ASF dual-hosted git repository.
pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new c14491d [BEAM-8335] Final PR to merge the InteractiveBeam feature branch
new e95d066 Merge pull request #11109 from [BEAM-8335] Final PR to merge the InteractiveBeam feature branch
c14491d is described below
commit c14491dd66cbb64a8f670e58434e0d916ab26a81
Author: Sam Rohde <ro...@gmail.com>
AuthorDate: Thu Mar 12 10:23:59 2020 -0700
[BEAM-8335] Final PR to merge the InteractiveBeam feature branch
Change-Id: Icc5564bb5bc1d771394f3e9272984366f8242dfa
---
.../direct/consumer_tracking_pipeline_visitor.py | 7 +-
.../consumer_tracking_pipeline_visitor_test.py | 37 ++++++
.../apache_beam/runners/direct/test_stream_impl.py | 3 +-
.../runners/interactive/background_caching_job.py | 2 +-
.../interactive/background_caching_job_test.py | 34 ++++++
.../runners/interactive/caching/streaming_cache.py | 10 +-
.../runners/interactive/display/pipeline_graph.py | 1 +
.../runners/interactive/interactive_beam.py | 127 +++++++++++++++++++++
.../runners/interactive/interactive_runner.py | 34 +++++-
.../runners/interactive/interactive_runner_test.py | 61 +++++++---
.../runners/interactive/options/capture_control.py | 2 +-
.../runners/interactive/pipeline_fragment.py | 3 +
.../runners/interactive/pipeline_instrument.py | 3 +-
.../interactive/pipeline_instrument_test.py | 22 ++--
.../interactive/testing/pipeline_assertion.py | 4 +-
.../apache_beam/runners/interactive/utils.py | 8 +-
16 files changed, 315 insertions(+), 43 deletions(-)
diff --git a/sdks/python/apache_beam/runners/direct/consumer_tracking_pipeline_visitor.py b/sdks/python/apache_beam/runners/direct/consumer_tracking_pipeline_visitor.py
index acf48d8..4290b55 100644
--- a/sdks/python/apache_beam/runners/direct/consumer_tracking_pipeline_visitor.py
+++ b/sdks/python/apache_beam/runners/direct/consumer_tracking_pipeline_visitor.py
@@ -23,7 +23,6 @@ from __future__ import absolute_import
from typing import TYPE_CHECKING
from typing import Dict
-from typing import List
from typing import Set
from apache_beam import pvalue
@@ -44,7 +43,7 @@ class ConsumerTrackingPipelineVisitor(PipelineVisitor):
"""
def __init__(self):
self.value_to_consumers = {
- } # type: Dict[pvalue.PValue, List[AppliedPTransform]]
+ } # type: Dict[pvalue.PValue, Set[AppliedPTransform]]
self.root_transforms = set() # type: Set[AppliedPTransform]
self.step_names = {} # type: Dict[AppliedPTransform, str]
@@ -68,8 +67,8 @@ class ConsumerTrackingPipelineVisitor(PipelineVisitor):
if isinstance(input_value, pvalue.PBegin):
self.root_transforms.add(applied_ptransform)
if input_value not in self.value_to_consumers:
- self.value_to_consumers[input_value] = []
- self.value_to_consumers[input_value].append(applied_ptransform)
+ self.value_to_consumers[input_value] = set()
+ self.value_to_consumers[input_value].add(applied_ptransform)
else:
self.root_transforms.add(applied_ptransform)
self.step_names[applied_ptransform] = 's%d' % (self._num_transforms)
diff --git a/sdks/python/apache_beam/runners/direct/consumer_tracking_pipeline_visitor_test.py b/sdks/python/apache_beam/runners/direct/consumer_tracking_pipeline_visitor_test.py
index aebf452..ec9dd81 100644
--- a/sdks/python/apache_beam/runners/direct/consumer_tracking_pipeline_visitor_test.py
+++ b/sdks/python/apache_beam/runners/direct/consumer_tracking_pipeline_visitor_test.py
@@ -126,6 +126,43 @@ class ConsumerTrackingPipelineVisitorTest(unittest.TestCase):
len(self.visitor.step_names), 3) # 2 creates + expanded CoGBK
self.assertEqual(len(self.visitor.views), 0)
+ def test_visitor_not_sorted(self):
+ p = Pipeline()
+ # pylint: disable=expression-not-assigned
+ from apache_beam.testing.test_stream import TestStream
+ p | TestStream().add_elements(['']) | beam.Map(lambda _: _)
+
+ original_graph = p.to_runner_api(return_context=False)
+ out_of_order_graph = p.to_runner_api(return_context=False)
+
+ root_id = out_of_order_graph.root_transform_ids[0]
+ root = out_of_order_graph.components.transforms[root_id]
+ tmp = root.subtransforms[0]
+ root.subtransforms[0] = root.subtransforms[1]
+ root.subtransforms[1] = tmp
+
+ p = beam.Pipeline().from_runner_api(
+ out_of_order_graph, runner='BundleBasedDirectRunner', options=None)
+ v_out_of_order = ConsumerTrackingPipelineVisitor()
+ p.visit(v_out_of_order)
+
+ p = beam.Pipeline().from_runner_api(
+ original_graph, runner='BundleBasedDirectRunner', options=None)
+ v_original = ConsumerTrackingPipelineVisitor()
+ p.visit(v_original)
+
+ # Convert to string to assert they are equal.
+ out_of_order_labels = {
+ str(k): [str(t) for t in v_out_of_order.value_to_consumers[k]]
+ for k in v_out_of_order.value_to_consumers
+ }
+
+ original_labels = {
+ str(k): [str(t) for t in v_original.value_to_consumers[k]]
+ for k in v_original.value_to_consumers
+ }
+ self.assertDictEqual(out_of_order_labels, original_labels)
+
if __name__ == '__main__':
logging.getLogger().setLevel(logging.DEBUG)
diff --git a/sdks/python/apache_beam/runners/direct/test_stream_impl.py b/sdks/python/apache_beam/runners/direct/test_stream_impl.py
index e5ddc0a..149f575 100644
--- a/sdks/python/apache_beam/runners/direct/test_stream_impl.py
+++ b/sdks/python/apache_beam/runners/direct/test_stream_impl.py
@@ -106,7 +106,8 @@ class _ExpandableTestStream(PTransform):
| _TestStream(
self.test_stream.output_tags,
events=self.test_stream._events,
- coder=self.test_stream.coder)
+ coder=self.test_stream.coder,
+ endpoint=self.test_stream._endpoint)
| 'TestStream Multiplexer' >> ParDo(mux).with_outputs())
# Apply a way to control the watermark per output. It is necessary to
diff --git a/sdks/python/apache_beam/runners/interactive/background_caching_job.py b/sdks/python/apache_beam/runners/interactive/background_caching_job.py
index e4e565b..e002809 100644
--- a/sdks/python/apache_beam/runners/interactive/background_caching_job.py
+++ b/sdks/python/apache_beam/runners/interactive/background_caching_job.py
@@ -56,7 +56,7 @@ class BackgroundCachingJob(object):
"""A simple abstraction that controls necessary components of a timed and
space limited background caching job.
- A background caching job successfully complete source data capture in 2
+ A background caching job successfully completes source data capture in 2
conditions:
#. The job is finite and runs into DONE state;
diff --git a/sdks/python/apache_beam/runners/interactive/background_caching_job_test.py b/sdks/python/apache_beam/runners/interactive/background_caching_job_test.py
index e5b6b52..a3bce37 100644
--- a/sdks/python/apache_beam/runners/interactive/background_caching_job_test.py
+++ b/sdks/python/apache_beam/runners/interactive/background_caching_job_test.py
@@ -30,7 +30,9 @@ from apache_beam.runners.interactive import background_caching_job as bcj
from apache_beam.runners.interactive import interactive_beam as ib
from apache_beam.runners.interactive import interactive_environment as ie
from apache_beam.runners.interactive import interactive_runner
+from apache_beam.runners.interactive.caching.streaming_cache import StreamingCache
from apache_beam.runners.interactive.testing.mock_ipython import mock_get_ipython
+from apache_beam.runners.interactive.testing.test_cache_manager import FileRecordsBuilder
from apache_beam.testing.test_stream import TestStream
from apache_beam.testing.test_stream_service import TestStreamServiceController
from apache_beam.transforms.window import TimestampedValue
@@ -44,6 +46,7 @@ except ImportError:
_FOO_PUBSUB_SUB = 'projects/test-project/subscriptions/foo'
_BAR_PUBSUB_SUB = 'projects/test-project/subscriptions/bar'
+_TEST_CACHE_KEY = 'test'
def _build_a_test_stream_pipeline():
@@ -68,6 +71,19 @@ def _build_an_empty_stream_pipeline():
return p
+def _setup_test_streaming_cache():
+ cache_manager = StreamingCache(cache_dir=None)
+ ie.current_env().set_cache_manager(cache_manager)
+ builder = FileRecordsBuilder(tag=_TEST_CACHE_KEY)
+ (builder
+ .advance_watermark(watermark_secs=0)
+ .advance_processing_time(5)
+ .add_element(element='a', event_time_secs=1)
+ .advance_watermark(watermark_secs=100)
+ .advance_processing_time(10)) # yapf: disable
+ cache_manager.write(builder.build(), _TEST_CACHE_KEY)
+
+
@unittest.skipIf(
not ie.current_env().is_interactive_ready,
'[interactive] dependency is not installed.')
@@ -85,8 +101,14 @@ class BackgroundCachingJobTest(unittest.TestCase):
'apache_beam.runners.interactive.background_caching_job'
'.has_source_to_cache',
lambda x: True)
+ # Disable the clean up so that we can keep the test streaming cache.
+ @patch(
+ 'apache_beam.runners.interactive.interactive_environment'
+ '.InteractiveEnvironment.cleanup',
+ lambda x: None)
def test_background_caching_job_starts_when_none_such_job_exists(self):
p = _build_a_test_stream_pipeline()
+ _setup_test_streaming_cache()
p.run()
self.assertIsNotNone(ie.current_env().get_background_caching_job(p))
expected_cached_source_signature = bcj.extract_source_to_cache_signature(p)
@@ -109,8 +131,14 @@ class BackgroundCachingJobTest(unittest.TestCase):
'apache_beam.runners.interactive.background_caching_job'
'.has_source_to_cache',
lambda x: True)
+ # Disable the clean up so that we can keep the test streaming cache.
+ @patch(
+ 'apache_beam.runners.interactive.interactive_environment'
+ '.InteractiveEnvironment.cleanup',
+ lambda x: None)
def test_background_caching_job_not_start_when_such_job_exists(self):
p = _build_a_test_stream_pipeline()
+ _setup_test_streaming_cache()
a_running_background_caching_job = bcj.BackgroundCachingJob(
runner.PipelineResult(runner.PipelineState.RUNNING))
ie.current_env().set_background_caching_job(
@@ -127,8 +155,14 @@ class BackgroundCachingJobTest(unittest.TestCase):
'apache_beam.runners.interactive.background_caching_job'
'.has_source_to_cache',
lambda x: True)
+ # Disable the clean up so that we can keep the test streaming cache.
+ @patch(
+ 'apache_beam.runners.interactive.interactive_environment'
+ '.InteractiveEnvironment.cleanup',
+ lambda x: None)
def test_background_caching_job_not_start_when_such_job_is_done(self):
p = _build_a_test_stream_pipeline()
+ _setup_test_streaming_cache()
a_done_background_caching_job = bcj.BackgroundCachingJob(
runner.PipelineResult(runner.PipelineState.DONE))
ie.current_env().set_background_caching_job(
diff --git a/sdks/python/apache_beam/runners/interactive/caching/streaming_cache.py b/sdks/python/apache_beam/runners/interactive/caching/streaming_cache.py
index 1dea45a..17d8a5f 100644
--- a/sdks/python/apache_beam/runners/interactive/caching/streaming_cache.py
+++ b/sdks/python/apache_beam/runners/interactive/caching/streaming_cache.py
@@ -84,7 +84,7 @@ class StreamingCacheSink(beam.PTransform):
"""Returns the space usage in bytes of the sink."""
try:
return os.stat(self._path).st_size
- except Exception:
+ except OSError:
_LOGGER.debug(
'Failed to calculate cache size for file %s, the file might have not '
'been created yet. Return 0. %s',
@@ -104,7 +104,7 @@ class StreamingCacheSink(beam.PTransform):
self._coder = coder
# Try and make the given path.
- Path(os.path.dirname(full_path)).mkdir(exist_ok=True)
+ Path(os.path.dirname(full_path)).mkdir(parents=True, exist_ok=True)
def start_bundle(self):
# Open the file for 'append-mode' and writing 'bytes'.
@@ -303,7 +303,11 @@ class StreamingCache(CacheManager):
os.makedirs(directory)
with open(filepath, 'ab') as f:
for v in values:
- f.write(self._default_pcoder.encode(v.SerializeToString()) + b'\n')
+ if isinstance(v, (TestStreamFileHeader, TestStreamFileRecord)):
+ val = v.SerializeToString()
+ else:
+ val = v
+ f.write(self._default_pcoder.encode(val) + b'\n')
def source(self, *labels):
"""Returns the StreamingCacheManager source.
diff --git a/sdks/python/apache_beam/runners/interactive/display/pipeline_graph.py b/sdks/python/apache_beam/runners/interactive/display/pipeline_graph.py
index cf627d5..d061845 100644
--- a/sdks/python/apache_beam/runners/interactive/display/pipeline_graph.py
+++ b/sdks/python/apache_beam/runners/interactive/display/pipeline_graph.py
@@ -121,6 +121,7 @@ class PipelineGraph(object):
return self._get_graph().to_string()
def display_graph(self):
+ """Displays the graph generated."""
rendered_graph = self._renderer.render_pipeline_graph(self)
if ie.current_env().is_in_notebook:
try:
diff --git a/sdks/python/apache_beam/runners/interactive/interactive_beam.py b/sdks/python/apache_beam/runners/interactive/interactive_beam.py
index 7832a7e..38e320b 100644
--- a/sdks/python/apache_beam/runners/interactive/interactive_beam.py
+++ b/sdks/python/apache_beam/runners/interactive/interactive_beam.py
@@ -40,9 +40,12 @@ from apache_beam.runners.interactive import background_caching_job as bcj
from apache_beam.runners.interactive import interactive_environment as ie
from apache_beam.runners.interactive import interactive_runner as ir
from apache_beam.runners.interactive import pipeline_fragment as pf
+from apache_beam.runners.interactive import pipeline_instrument as pi
from apache_beam.runners.interactive.display import pipeline_graph
from apache_beam.runners.interactive.display.pcoll_visualization import visualize
from apache_beam.runners.interactive.options import interactive_options
+from apache_beam.runners.interactive.utils import elements_to_df
+from apache_beam.runners.interactive.utils import to_element_list
class Options(interactive_options.InteractiveOptions):
@@ -86,6 +89,7 @@ class Options(interactive_options.InteractiveOptions):
interactive_beam.evict_captured_data()
# The next PCollection evaluation will capture fresh data from sources,
# and the data captured will be replayed until another eviction.
+ interactive_beam.collect(some_pcoll)
"""
assert value.total_seconds() > 0, 'Duration must be a positive value.'
self.capture_control._capture_duration = value
@@ -233,6 +237,12 @@ def show(*pcolls, **configs):
more dive-in widget and statistically overview widget of the data.
Otherwise, those 2 data visualization widgets will not be displayed.
+  By default, the visualization contains data tables rendering data from given
+  pcolls separately as if they are converted into dataframes. The dive-in and
+  statistical overview widgets mentioned above are displayed in addition to
+  these tables only when visualize_data is True; when it is False, only the
+  data tables are shown.
+
Ad hoc builds a pipeline fragment including only transforms that are
necessary to produce data for given PCollections pcolls, runs the pipeline
fragment to compute data for those pcolls and then visualizes the data.
@@ -290,6 +300,9 @@ def show(*pcolls, **configs):
if isinstance(runner, ir.InteractiveRunner):
runner = runner._underlying_runner
+ # Make sure that sources without a user reference are still cached.
+ pi.watch_sources(user_pipeline)
+
# Make sure that all PCollections to be shown are watched. If a PCollection
# has not been watched, make up a variable name for that PCollection and watch
# it. No validation is needed here because the watch logic can handle
@@ -314,6 +327,26 @@ def show(*pcolls, **configs):
bcj.attempt_to_run_background_caching_job(
runner, user_pipeline, user_pipeline.options)
+ pcolls = set(pcolls)
+ computed_pcolls = set()
+ for pcoll in pcolls:
+ if pcoll in ie.current_env().computed_pcollections:
+ computed_pcolls.add(pcoll)
+ pcolls = pcolls.difference(computed_pcolls)
+ # If in notebook, static plotting computed pcolls as computation is done.
+ if ie.current_env().is_in_notebook:
+ for pcoll in computed_pcolls:
+ visualize(
+ pcoll,
+ include_window_info=include_window_info,
+ display_facets=visualize_data)
+ elif ie.current_env().is_in_ipython:
+ for pcoll in computed_pcolls:
+ visualize(pcoll, include_window_info=include_window_info)
+
+ if not pcolls:
+ return
+
# Build a pipeline fragment for the PCollections and run it.
result = pf.PipelineFragment(list(pcolls), user_pipeline.options).run()
ie.current_env().set_pipeline_result(user_pipeline, result)
@@ -343,6 +376,100 @@ def show(*pcolls, **configs):
ie.current_env().mark_pcollection_computed(pcolls)
+def collect(pcoll, include_window_info=False):
+ """Materializes all of the elements from a PCollection into a Dataframe.
+
+ For example::
+
+ p = beam.Pipeline(InteractiveRunner())
+ init = p | 'Init' >> beam.Create(range(10))
+ square = init | 'Square' >> beam.Map(lambda x: x * x)
+
+ # Run the pipeline and bring the PCollection into memory as a Dataframe.
+ in_memory_square = collect(square)
+ """
+ return head(pcoll, n=-1, include_window_info=include_window_info)
+
+
+def head(pcoll, n=5, include_window_info=False):
+ """Materializes the first n elements from a PCollection into a Dataframe.
+
+ This reads each element from file and reads only the amount that it needs
+ into memory.
+ For example::
+
+ p = beam.Pipeline(InteractiveRunner())
+ init = p | 'Init' >> beam.Create(range(10))
+ square = init | 'Square' >> beam.Map(lambda x: x * x)
+
+ # Run the pipeline and bring the PCollection into memory as a Dataframe.
+ in_memory_square = head(square, n=5)
+ """
+ assert isinstance(pcoll, beam.pvalue.PCollection), (
+ '{} is not an apache_beam.pvalue.PCollection.'.format(pcoll))
+
+ user_pipeline = pcoll.pipeline
+ runner = user_pipeline.runner
+ if isinstance(runner, ir.InteractiveRunner):
+ runner = runner._underlying_runner
+
+ # Make sure that sources without a user reference are still cached.
+ pi.watch_sources(user_pipeline)
+
+ # Make sure that all PCollections to be shown are watched. If a PCollection
+ # has not been watched, make up a variable name for that PCollection and watch
+ # it. No validation is needed here because the watch logic can handle
+ # arbitrary variables.
+ watched_pcollections = set()
+ for watching in ie.current_env().watching():
+ for _, val in watching:
+ if hasattr(val, '__class__') and isinstance(val, beam.pvalue.PCollection):
+ watched_pcollections.add(val)
+ if pcoll not in watched_pcollections:
+ watch({'anonymous_pcollection_{}'.format(id(pcoll)): pcoll})
+
+ warnings.filterwarnings('ignore', category=DeprecationWarning)
+ # Attempt to run background caching job since we have the reference to the
+ # user-defined pipeline.
+ bcj.attempt_to_run_background_caching_job(
+ runner, user_pipeline, user_pipeline.options)
+
+ if pcoll in ie.current_env().computed_pcollections:
+ # Read from pcoll cache, then convert to DF
+ pipeline_instrument = pi.PipelineInstrument(pcoll.pipeline)
+ key = pipeline_instrument.cache_key(pcoll)
+ cache_manager = ie.current_env().cache_manager()
+
+ coder = cache_manager.load_pcoder('full', key)
+ reader, _ = cache_manager.read('full', key)
+ elements = to_element_list(reader, coder, include_window_info=True)
+ else:
+
+ # Build a pipeline fragment for the PCollections and run it.
+ result = pf.PipelineFragment([pcoll], user_pipeline.options).run()
+ ie.current_env().set_pipeline_result(user_pipeline, result)
+
+ # Invoke wait_until_finish to ensure the blocking nature of this API without
+ # relying on the run to be blocking.
+ result.wait_until_finish()
+
+ # If the pipeline execution is successful at this stage, mark the
+ # computation completeness for the given PCollections so that when further
+ # `show` invocation occurs, Interactive Beam wouldn't need to re-compute.
+ if result.state is beam.runners.runner.PipelineState.DONE:
+ ie.current_env().mark_pcollection_computed([pcoll])
+
+ elements = result.read(pcoll, include_window_info=True)
+
+ results = []
+ for e in elements:
+ results.append(e)
+ if len(results) >= n and n > 0:
+ break
+
+ return elements_to_df(results, include_window_info=include_window_info)
+
+
def show_graph(pipeline):
"""Shows the current pipeline shape of a given Beam pipeline as a DAG.
"""
diff --git a/sdks/python/apache_beam/runners/interactive/interactive_runner.py b/sdks/python/apache_beam/runners/interactive/interactive_runner.py
index 77e0efc..1281373 100644
--- a/sdks/python/apache_beam/runners/interactive/interactive_runner.py
+++ b/sdks/python/apache_beam/runners/interactive/interactive_runner.py
@@ -31,6 +31,7 @@ import logging
import apache_beam as beam
from apache_beam import runners
from apache_beam.options.pipeline_options import DebugOptions
+from apache_beam.pipeline import PipelineVisitor
from apache_beam.runners.direct import direct_runner
from apache_beam.runners.interactive import cache_manager as cache
from apache_beam.runners.interactive import interactive_environment as ie
@@ -39,6 +40,7 @@ from apache_beam.runners.interactive import background_caching_job
from apache_beam.runners.interactive.display import pipeline_graph
from apache_beam.runners.interactive.options import capture_control
from apache_beam.runners.interactive.utils import to_element_list
+from apache_beam.testing.test_stream_service import TestStreamServiceController
# size of PCollection samples cached.
SAMPLE_SIZE = 8
@@ -148,6 +150,10 @@ class InteractiveRunner(runners.PipelineRunner):
if self._force_compute:
ie.current_env().evict_computed_pcollections()
+ # Make sure that sources without a user reference are still cached.
+ inst.watch_sources(pipeline)
+
+ user_pipeline = inst.user_pipeline(pipeline)
pipeline_instrument = inst.build_pipeline_instrument(pipeline, options)
# The user_pipeline analyzed might be None if the pipeline given has nothing
@@ -155,17 +161,41 @@ class InteractiveRunner(runners.PipelineRunner):
# When it's None, there is no need to cache including the background
# caching job and no result to track since no background caching job is
# started at all.
- user_pipeline = pipeline_instrument.user_pipeline
if user_pipeline:
# Should use the underlying runner and run asynchronously.
background_caching_job.attempt_to_run_background_caching_job(
self._underlying_runner, user_pipeline, options)
+ if (background_caching_job.has_source_to_cache(user_pipeline) and
+ not background_caching_job.is_a_test_stream_service_running(
+ user_pipeline)):
+ streaming_cache_manager = ie.current_env().cache_manager()
+ if streaming_cache_manager:
+ test_stream_service = TestStreamServiceController(
+ streaming_cache_manager)
+ test_stream_service.start()
+ ie.current_env().set_test_stream_service_controller(
+ user_pipeline, test_stream_service)
pipeline_to_execute = beam.pipeline.Pipeline.from_runner_api(
pipeline_instrument.instrumented_pipeline_proto(),
self._underlying_runner,
options)
+ if ie.current_env().get_test_stream_service_controller(user_pipeline):
+ endpoint = ie.current_env().get_test_stream_service_controller(
+ user_pipeline).endpoint
+
+ # TODO: make the StreamingCacheManager and TestStreamServiceController
+ # constructed when the InteractiveEnvironment is imported.
+ class TestStreamVisitor(PipelineVisitor):
+ def visit_transform(self, transform_node):
+ from apache_beam.testing.test_stream import TestStream
+ if (isinstance(transform_node.transform, TestStream) and
+ not transform_node.transform._events):
+ transform_node.transform._endpoint = endpoint
+
+ pipeline_to_execute.visit(TestStreamVisitor())
+
if not self._skip_display:
a_pipeline_graph = pipeline_graph.PipelineGraph(
pipeline_instrument.original_pipeline,
@@ -216,6 +246,7 @@ class PipelineResult(beam.runners.runner.PipelineResult):
def get(self, pcoll, include_window_info=False):
"""Materializes the PCollection into a list.
+
If include_window_info is True, then returns the elements as
WindowedValues. Otherwise, return the element as itself.
"""
@@ -223,6 +254,7 @@ class PipelineResult(beam.runners.runner.PipelineResult):
def read(self, pcoll, include_window_info=False):
"""Reads the PCollection one element at a time from cache.
+
If include_window_info is True, then returns the elements as
WindowedValues. Otherwise, return the element as itself.
"""
diff --git a/sdks/python/apache_beam/runners/interactive/interactive_runner_test.py b/sdks/python/apache_beam/runners/interactive/interactive_runner_test.py
index 3e1ad80..5b7090e 100644
--- a/sdks/python/apache_beam/runners/interactive/interactive_runner_test.py
+++ b/sdks/python/apache_beam/runners/interactive/interactive_runner_test.py
@@ -28,12 +28,19 @@ from __future__ import print_function
import unittest
+import pandas as pd
+
import apache_beam as beam
from apache_beam.runners.direct import direct_runner
from apache_beam.runners.interactive import interactive_beam as ib
from apache_beam.runners.interactive import interactive_environment as ie
from apache_beam.runners.interactive import interactive_runner
from apache_beam.runners.interactive.testing.mock_ipython import mock_get_ipython
+from apache_beam.transforms.window import GlobalWindow
+from apache_beam.utils.timestamp import Timestamp
+from apache_beam.utils.windowed_value import PaneInfo
+from apache_beam.utils.windowed_value import PaneInfoTiming
+from apache_beam.utils.windowed_value import WindowedValue
# TODO(BEAM-8288): clean up the work-around of nose tests using Python2 without
# unittest.mock module.
@@ -98,19 +105,47 @@ class InteractiveRunnerTest(unittest.TestCase):
result = p.run()
result.wait_until_finish()
- actual = dict(result.get(counts))
- self.assertDictEqual(
- actual,
- {
- 'to': 2,
- 'be': 2,
- 'or': 1,
- 'not': 1,
- 'that': 1,
- 'is': 1,
- 'the': 1,
- 'question': 1
- })
+ actual = list(result.get(counts))
+ self.assertSetEqual(
+ set(actual),
+ set([
+ ('or', 1),
+ ('that', 1),
+ ('be', 2),
+ ('is', 1),
+ ('question', 1),
+ ('to', 2),
+ ('the', 1),
+ ('not', 1),
+ ]))
+
+ # Truncate the precision to millis because the window coder uses millis
+ # as units then gets upcast to micros.
+ end_of_window = (GlobalWindow().max_timestamp().micros // 1000) * 1000
+ df_counts = ib.collect(counts, include_window_info=True)
+ df_expected = pd.DataFrame({
+ 0: [e[0] for e in actual],
+ 1: [e[1] for e in actual],
+ 'event_time': [end_of_window for _ in actual],
+ 'windows': [[GlobalWindow()] for _ in actual],
+ 'pane_info': [
+ PaneInfo(True, True, PaneInfoTiming.ON_TIME, 0, 0) for _ in actual
+ ]
+ },
+ columns=[
+ 0, 1, 'event_time', 'windows', 'pane_info'
+ ])
+
+ pd.testing.assert_frame_equal(df_expected, df_counts)
+
+ actual_reified = result.get(counts, include_window_info=True)
+ expected_reified = [
+ WindowedValue(
+ e,
+ Timestamp(micros=end_of_window), [GlobalWindow()],
+ PaneInfo(True, True, PaneInfoTiming.ON_TIME, 0, 0)) for e in actual
+ ]
+ self.assertEqual(actual_reified, expected_reified)
def test_session(self):
class MockPipelineRunner(object):
diff --git a/sdks/python/apache_beam/runners/interactive/options/capture_control.py b/sdks/python/apache_beam/runners/interactive/options/capture_control.py
index 8b484e2..14bb742 100644
--- a/sdks/python/apache_beam/runners/interactive/options/capture_control.py
+++ b/sdks/python/apache_beam/runners/interactive/options/capture_control.py
@@ -42,7 +42,7 @@ class CaptureControl(object):
self._enable_capture_replay = True
self._capturable_sources = {
ReadFromPubSub,
- }
+ } # yapf: disable
self._capture_duration = timedelta(seconds=5)
self._capture_size_limit = 1e9
diff --git a/sdks/python/apache_beam/runners/interactive/pipeline_fragment.py b/sdks/python/apache_beam/runners/interactive/pipeline_fragment.py
index 54316f3..2831af4 100644
--- a/sdks/python/apache_beam/runners/interactive/pipeline_fragment.py
+++ b/sdks/python/apache_beam/runners/interactive/pipeline_fragment.py
@@ -204,6 +204,9 @@ class PipelineFragment(object):
def enter_composite_transform(self, transform_node):
if isinstance(transform_node.transform, TestStream):
return
+ if isinstance(transform_node.transform, TestStream):
+ return
+
pruned_parts = list(transform_node.parts)
for part in transform_node.parts:
if part not in necessary_transforms:
diff --git a/sdks/python/apache_beam/runners/interactive/pipeline_instrument.py b/sdks/python/apache_beam/runners/interactive/pipeline_instrument.py
index e803627..b39fea4 100644
--- a/sdks/python/apache_beam/runners/interactive/pipeline_instrument.py
+++ b/sdks/python/apache_beam/runners/interactive/pipeline_instrument.py
@@ -229,7 +229,7 @@ class PipelineInstrument(object):
# Create the pipeline_proto to read all the components from. It will later
# create a new pipeline proto from the cut out components.
pipeline_proto, context = pipeline.to_runner_api(
- return_context=True, use_fake_coders=False)
+ return_context=True, use_fake_coders=False)
# Get all the root transforms. The caching transforms will be subtransforms
# of one of these roots.
@@ -433,6 +433,7 @@ class PipelineInstrument(object):
cacheable_input in unbounded_source_pcolls)
# Replace/wire inputs w/ cached PCollections from ReadCache transforms.
self._replace_with_cached_inputs(self._pipeline)
+
# Write cache for all cacheables.
for _, cacheable in self.cacheables.items():
self._write_cache(
diff --git a/sdks/python/apache_beam/runners/interactive/pipeline_instrument_test.py b/sdks/python/apache_beam/runners/interactive/pipeline_instrument_test.py
index 56792ef..8fa9724 100644
--- a/sdks/python/apache_beam/runners/interactive/pipeline_instrument_test.py
+++ b/sdks/python/apache_beam/runners/interactive/pipeline_instrument_test.py
@@ -26,7 +26,6 @@ import unittest
import apache_beam as beam
from apache_beam import coders
from apache_beam.pipeline import PipelineVisitor
-from apache_beam.portability.api.beam_runner_api_pb2 import TestStreamPayload
from apache_beam.runners.interactive import cache_manager as cache
from apache_beam.runners.interactive import interactive_beam as ib
from apache_beam.runners.interactive import interactive_environment as ie
@@ -307,7 +306,7 @@ class PipelineInstrumentTest(unittest.TestCase):
def test_instrument_example_unbounded_pipeline_to_read_cache(self):
"""Tests that the instrumenter works for a single unbounded source.
- """
+ """
# Create a new interactive environment to make the test idempotent.
ie.new_env(cache_manager=streaming_cache.StreamingCache(cache_dir=None))
@@ -328,7 +327,7 @@ class PipelineInstrumentTest(unittest.TestCase):
if not isinstance(pcoll, beam.pvalue.PCollection):
continue
cache_key = cache_key_of(name, pcoll)
- self._mock_write_cache([TestStreamPayload()], cache_key)
+ self._mock_write_cache([b''], cache_key)
# Instrument the original pipeline to create the pipeline the user will see.
instrumenter = instr.build_pipeline_instrument(p_original)
@@ -467,9 +466,9 @@ class PipelineInstrumentTest(unittest.TestCase):
def test_instrument_mixed_streaming_batch(self):
"""Tests caching for both batch and streaming sources in the same pipeline.
- This ensures that cached bounded and unbounded sources are read from the
- TestStream.
- """
+ This ensures that cached bounded and unbounded sources are read from the
+ TestStream.
+ """
# Create a new interactive environment to make the test idempotent.
ie.new_env(cache_manager=streaming_cache.StreamingCache(cache_dir=None))
@@ -492,8 +491,7 @@ class PipelineInstrumentTest(unittest.TestCase):
def cache_key_of(name, pcoll):
return name + '_' + str(id(pcoll)) + '_' + str(id(pcoll.producer))
- self._mock_write_cache([TestStreamPayload()],
- cache_key_of('source_2', source_2))
+ self._mock_write_cache([b''], cache_key_of('source_2', source_2))
ie.current_env().mark_pcollection_computed([source_2])
# Instrument the original pipeline to create the pipeline the user will see.
@@ -552,7 +550,7 @@ class PipelineInstrumentTest(unittest.TestCase):
def test_instrument_example_unbounded_pipeline_direct_from_source(self):
"""Tests that the it caches PCollections from a source.
- """
+ """
# Create a new interactive environment to make the test idempotent.
ie.new_env(cache_manager=streaming_cache.StreamingCache(cache_dir=None))
@@ -618,7 +616,7 @@ class PipelineInstrumentTest(unittest.TestCase):
def test_instrument_example_unbounded_pipeline_to_read_cache_not_cached(self):
"""Tests that the instrumenter works when the PCollection is not cached.
- """
+ """
# Create a new interactive environment to make the test idempotent.
ie.new_env(cache_manager=streaming_cache.StreamingCache(cache_dir=None))
@@ -689,7 +687,7 @@ class PipelineInstrumentTest(unittest.TestCase):
def test_instrument_example_unbounded_pipeline_to_multiple_read_cache(self):
"""Tests that the instrumenter works for multiple unbounded sources.
- """
+ """
# Create a new interactive environment to make the test idempotent.
ie.new_env(cache_manager=streaming_cache.StreamingCache(cache_dir=None))
@@ -714,7 +712,7 @@ class PipelineInstrumentTest(unittest.TestCase):
if not isinstance(pcoll, beam.pvalue.PCollection):
continue
cache_key = cache_key_of(name, pcoll)
- self._mock_write_cache([TestStreamPayload()], cache_key)
+ self._mock_write_cache([b''], cache_key)
# Instrument the original pipeline to create the pipeline the user will see.
instrumenter = instr.build_pipeline_instrument(p_original)
diff --git a/sdks/python/apache_beam/runners/interactive/testing/pipeline_assertion.py b/sdks/python/apache_beam/runners/interactive/testing/pipeline_assertion.py
index 55e07db..9f4bdbc 100644
--- a/sdks/python/apache_beam/runners/interactive/testing/pipeline_assertion.py
+++ b/sdks/python/apache_beam/runners/interactive/testing/pipeline_assertion.py
@@ -69,7 +69,7 @@ def assert_pipeline_proto_equal(
def assert_pipeline_proto_contain_top_level_transform(
test_case, pipeline_proto, transform_label):
"""Asserts the top level transforms contain a transform with the given
- transform label."""
+ transform label."""
_assert_pipeline_proto_contains_top_level_transform(
test_case, pipeline_proto, transform_label, True)
@@ -77,7 +77,7 @@ def assert_pipeline_proto_contain_top_level_transform(
def assert_pipeline_proto_not_contain_top_level_transform(
test_case, pipeline_proto, transform_label):
"""Asserts the top level transforms do not contain a transform with the given
- transform label."""
+ transform label."""
_assert_pipeline_proto_contains_top_level_transform(
test_case, pipeline_proto, transform_label, False)
diff --git a/sdks/python/apache_beam/runners/interactive/utils.py b/sdks/python/apache_beam/runners/interactive/utils.py
index d78bb9c..046cf6e 100644
--- a/sdks/python/apache_beam/runners/interactive/utils.py
+++ b/sdks/python/apache_beam/runners/interactive/utils.py
@@ -26,10 +26,10 @@ from apache_beam.portability.api.beam_runner_api_pb2 import TestStreamPayload
def to_element_list(
- reader, # type: Generator[Union[TestStreamPayload.Event, WindowedValueHolder]]
- coder, # type: Coder
- include_window_info # type: bool
-):
+ reader, # type: Generator[Union[TestStreamPayload.Event, WindowedValueHolder]]
+ coder, # type: Coder
+ include_window_info # type: bool
+ ):
# type: (...) -> List[WindowedValue]
"""Returns an iterator that properly decodes the elements from the reader.