You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by pa...@apache.org on 2020/03/13 23:53:31 UTC

[beam] branch master updated: [BEAM-8335] Final PR to merge the InteractiveBeam feature branch

This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new c14491d  [BEAM-8335] Final PR to merge the InteractiveBeam feature branch
     new e95d066  Merge pull request #11109 from [BEAM-8335] Final PR to merge the InteractiveBeam feature branch
c14491d is described below

commit c14491dd66cbb64a8f670e58434e0d916ab26a81
Author: Sam Rohde <ro...@gmail.com>
AuthorDate: Thu Mar 12 10:23:59 2020 -0700

    [BEAM-8335] Final PR to merge the InteractiveBeam feature branch
    
    Change-Id: Icc5564bb5bc1d771394f3e9272984366f8242dfa
---
 .../direct/consumer_tracking_pipeline_visitor.py   |   7 +-
 .../consumer_tracking_pipeline_visitor_test.py     |  37 ++++++
 .../apache_beam/runners/direct/test_stream_impl.py |   3 +-
 .../runners/interactive/background_caching_job.py  |   2 +-
 .../interactive/background_caching_job_test.py     |  34 ++++++
 .../runners/interactive/caching/streaming_cache.py |  10 +-
 .../runners/interactive/display/pipeline_graph.py  |   1 +
 .../runners/interactive/interactive_beam.py        | 127 +++++++++++++++++++++
 .../runners/interactive/interactive_runner.py      |  34 +++++-
 .../runners/interactive/interactive_runner_test.py |  61 +++++++---
 .../runners/interactive/options/capture_control.py |   2 +-
 .../runners/interactive/pipeline_fragment.py       |   3 +
 .../runners/interactive/pipeline_instrument.py     |   3 +-
 .../interactive/pipeline_instrument_test.py        |  22 ++--
 .../interactive/testing/pipeline_assertion.py      |   4 +-
 .../apache_beam/runners/interactive/utils.py       |   8 +-
 16 files changed, 315 insertions(+), 43 deletions(-)

diff --git a/sdks/python/apache_beam/runners/direct/consumer_tracking_pipeline_visitor.py b/sdks/python/apache_beam/runners/direct/consumer_tracking_pipeline_visitor.py
index acf48d8..4290b55 100644
--- a/sdks/python/apache_beam/runners/direct/consumer_tracking_pipeline_visitor.py
+++ b/sdks/python/apache_beam/runners/direct/consumer_tracking_pipeline_visitor.py
@@ -23,7 +23,6 @@ from __future__ import absolute_import
 
 from typing import TYPE_CHECKING
 from typing import Dict
-from typing import List
 from typing import Set
 
 from apache_beam import pvalue
@@ -44,7 +43,7 @@ class ConsumerTrackingPipelineVisitor(PipelineVisitor):
   """
   def __init__(self):
     self.value_to_consumers = {
-    }  # type: Dict[pvalue.PValue, List[AppliedPTransform]]
+    }  # type: Dict[pvalue.PValue, Set[AppliedPTransform]]
     self.root_transforms = set()  # type: Set[AppliedPTransform]
     self.step_names = {}  # type: Dict[AppliedPTransform, str]
 
@@ -68,8 +67,8 @@ class ConsumerTrackingPipelineVisitor(PipelineVisitor):
         if isinstance(input_value, pvalue.PBegin):
           self.root_transforms.add(applied_ptransform)
         if input_value not in self.value_to_consumers:
-          self.value_to_consumers[input_value] = []
-        self.value_to_consumers[input_value].append(applied_ptransform)
+          self.value_to_consumers[input_value] = set()
+        self.value_to_consumers[input_value].add(applied_ptransform)
     else:
       self.root_transforms.add(applied_ptransform)
     self.step_names[applied_ptransform] = 's%d' % (self._num_transforms)
diff --git a/sdks/python/apache_beam/runners/direct/consumer_tracking_pipeline_visitor_test.py b/sdks/python/apache_beam/runners/direct/consumer_tracking_pipeline_visitor_test.py
index aebf452..ec9dd81 100644
--- a/sdks/python/apache_beam/runners/direct/consumer_tracking_pipeline_visitor_test.py
+++ b/sdks/python/apache_beam/runners/direct/consumer_tracking_pipeline_visitor_test.py
@@ -126,6 +126,43 @@ class ConsumerTrackingPipelineVisitorTest(unittest.TestCase):
         len(self.visitor.step_names), 3)  # 2 creates + expanded CoGBK
     self.assertEqual(len(self.visitor.views), 0)
 
+  def test_visitor_not_sorted(self):
+    p = Pipeline()
+    # pylint: disable=expression-not-assigned
+    from apache_beam.testing.test_stream import TestStream
+    p | TestStream().add_elements(['']) | beam.Map(lambda _: _)
+
+    original_graph = p.to_runner_api(return_context=False)
+    out_of_order_graph = p.to_runner_api(return_context=False)
+
+    root_id = out_of_order_graph.root_transform_ids[0]
+    root = out_of_order_graph.components.transforms[root_id]
+    tmp = root.subtransforms[0]
+    root.subtransforms[0] = root.subtransforms[1]
+    root.subtransforms[1] = tmp
+
+    p = beam.Pipeline().from_runner_api(
+        out_of_order_graph, runner='BundleBasedDirectRunner', options=None)
+    v_out_of_order = ConsumerTrackingPipelineVisitor()
+    p.visit(v_out_of_order)
+
+    p = beam.Pipeline().from_runner_api(
+        original_graph, runner='BundleBasedDirectRunner', options=None)
+    v_original = ConsumerTrackingPipelineVisitor()
+    p.visit(v_original)
+
+    # Convert to string to assert they are equal.
+    out_of_order_labels = {
+        str(k): [str(t) for t in v_out_of_order.value_to_consumers[k]]
+        for k in v_out_of_order.value_to_consumers
+    }
+
+    original_labels = {
+        str(k): [str(t) for t in v_original.value_to_consumers[k]]
+        for k in v_original.value_to_consumers
+    }
+    self.assertDictEqual(out_of_order_labels, original_labels)
+
 
 if __name__ == '__main__':
   logging.getLogger().setLevel(logging.DEBUG)
diff --git a/sdks/python/apache_beam/runners/direct/test_stream_impl.py b/sdks/python/apache_beam/runners/direct/test_stream_impl.py
index e5ddc0a..149f575 100644
--- a/sdks/python/apache_beam/runners/direct/test_stream_impl.py
+++ b/sdks/python/apache_beam/runners/direct/test_stream_impl.py
@@ -106,7 +106,8 @@ class _ExpandableTestStream(PTransform):
         | _TestStream(
             self.test_stream.output_tags,
             events=self.test_stream._events,
-            coder=self.test_stream.coder)
+            coder=self.test_stream.coder,
+            endpoint=self.test_stream._endpoint)
         | 'TestStream Multiplexer' >> ParDo(mux).with_outputs())
 
     # Apply a way to control the watermark per output. It is necessary to
diff --git a/sdks/python/apache_beam/runners/interactive/background_caching_job.py b/sdks/python/apache_beam/runners/interactive/background_caching_job.py
index e4e565b..e002809 100644
--- a/sdks/python/apache_beam/runners/interactive/background_caching_job.py
+++ b/sdks/python/apache_beam/runners/interactive/background_caching_job.py
@@ -56,7 +56,7 @@ class BackgroundCachingJob(object):
   """A simple abstraction that controls necessary components of a timed and
   space limited background caching job.
 
-  A background caching job successfully complete source data capture in 2
+  A background caching job successfully completes source data capture in 2
   conditions:
 
     #. The job is finite and runs into DONE state;
diff --git a/sdks/python/apache_beam/runners/interactive/background_caching_job_test.py b/sdks/python/apache_beam/runners/interactive/background_caching_job_test.py
index e5b6b52..a3bce37 100644
--- a/sdks/python/apache_beam/runners/interactive/background_caching_job_test.py
+++ b/sdks/python/apache_beam/runners/interactive/background_caching_job_test.py
@@ -30,7 +30,9 @@ from apache_beam.runners.interactive import background_caching_job as bcj
 from apache_beam.runners.interactive import interactive_beam as ib
 from apache_beam.runners.interactive import interactive_environment as ie
 from apache_beam.runners.interactive import interactive_runner
+from apache_beam.runners.interactive.caching.streaming_cache import StreamingCache
 from apache_beam.runners.interactive.testing.mock_ipython import mock_get_ipython
+from apache_beam.runners.interactive.testing.test_cache_manager import FileRecordsBuilder
 from apache_beam.testing.test_stream import TestStream
 from apache_beam.testing.test_stream_service import TestStreamServiceController
 from apache_beam.transforms.window import TimestampedValue
@@ -44,6 +46,7 @@ except ImportError:
 
 _FOO_PUBSUB_SUB = 'projects/test-project/subscriptions/foo'
 _BAR_PUBSUB_SUB = 'projects/test-project/subscriptions/bar'
+_TEST_CACHE_KEY = 'test'
 
 
 def _build_a_test_stream_pipeline():
@@ -68,6 +71,19 @@ def _build_an_empty_stream_pipeline():
   return p
 
 
+def _setup_test_streaming_cache():
+  cache_manager = StreamingCache(cache_dir=None)
+  ie.current_env().set_cache_manager(cache_manager)
+  builder = FileRecordsBuilder(tag=_TEST_CACHE_KEY)
+  (builder
+      .advance_watermark(watermark_secs=0)
+      .advance_processing_time(5)
+      .add_element(element='a', event_time_secs=1)
+      .advance_watermark(watermark_secs=100)
+      .advance_processing_time(10)) # yapf: disable
+  cache_manager.write(builder.build(), _TEST_CACHE_KEY)
+
+
 @unittest.skipIf(
     not ie.current_env().is_interactive_ready,
     '[interactive] dependency is not installed.')
@@ -85,8 +101,14 @@ class BackgroundCachingJobTest(unittest.TestCase):
       'apache_beam.runners.interactive.background_caching_job'
       '.has_source_to_cache',
       lambda x: True)
+  # Disable the cleanup so that the test streaming cache is kept.
+  @patch(
+      'apache_beam.runners.interactive.interactive_environment'
+      '.InteractiveEnvironment.cleanup',
+      lambda x: None)
   def test_background_caching_job_starts_when_none_such_job_exists(self):
     p = _build_a_test_stream_pipeline()
+    _setup_test_streaming_cache()
     p.run()
     self.assertIsNotNone(ie.current_env().get_background_caching_job(p))
     expected_cached_source_signature = bcj.extract_source_to_cache_signature(p)
@@ -109,8 +131,14 @@ class BackgroundCachingJobTest(unittest.TestCase):
       'apache_beam.runners.interactive.background_caching_job'
       '.has_source_to_cache',
       lambda x: True)
+  # Disable the cleanup so that the test streaming cache is kept.
+  @patch(
+      'apache_beam.runners.interactive.interactive_environment'
+      '.InteractiveEnvironment.cleanup',
+      lambda x: None)
   def test_background_caching_job_not_start_when_such_job_exists(self):
     p = _build_a_test_stream_pipeline()
+    _setup_test_streaming_cache()
     a_running_background_caching_job = bcj.BackgroundCachingJob(
         runner.PipelineResult(runner.PipelineState.RUNNING))
     ie.current_env().set_background_caching_job(
@@ -127,8 +155,14 @@ class BackgroundCachingJobTest(unittest.TestCase):
       'apache_beam.runners.interactive.background_caching_job'
       '.has_source_to_cache',
       lambda x: True)
+  # Disable the cleanup so that the test streaming cache is kept.
+  @patch(
+      'apache_beam.runners.interactive.interactive_environment'
+      '.InteractiveEnvironment.cleanup',
+      lambda x: None)
   def test_background_caching_job_not_start_when_such_job_is_done(self):
     p = _build_a_test_stream_pipeline()
+    _setup_test_streaming_cache()
     a_done_background_caching_job = bcj.BackgroundCachingJob(
         runner.PipelineResult(runner.PipelineState.DONE))
     ie.current_env().set_background_caching_job(
diff --git a/sdks/python/apache_beam/runners/interactive/caching/streaming_cache.py b/sdks/python/apache_beam/runners/interactive/caching/streaming_cache.py
index 1dea45a..17d8a5f 100644
--- a/sdks/python/apache_beam/runners/interactive/caching/streaming_cache.py
+++ b/sdks/python/apache_beam/runners/interactive/caching/streaming_cache.py
@@ -84,7 +84,7 @@ class StreamingCacheSink(beam.PTransform):
     """Returns the space usage in bytes of the sink."""
     try:
       return os.stat(self._path).st_size
-    except Exception:
+    except OSError:
       _LOGGER.debug(
           'Failed to calculate cache size for file %s, the file might have not '
           'been created yet. Return 0. %s',
@@ -104,7 +104,7 @@ class StreamingCacheSink(beam.PTransform):
         self._coder = coder
 
         # Try and make the given path.
-        Path(os.path.dirname(full_path)).mkdir(exist_ok=True)
+        Path(os.path.dirname(full_path)).mkdir(parents=True, exist_ok=True)
 
       def start_bundle(self):
         # Open the file for 'append-mode' and writing 'bytes'.
@@ -303,7 +303,11 @@ class StreamingCache(CacheManager):
       os.makedirs(directory)
     with open(filepath, 'ab') as f:
       for v in values:
-        f.write(self._default_pcoder.encode(v.SerializeToString()) + b'\n')
+        if isinstance(v, (TestStreamFileHeader, TestStreamFileRecord)):
+          val = v.SerializeToString()
+        else:
+          val = v
+        f.write(self._default_pcoder.encode(val) + b'\n')
 
   def source(self, *labels):
     """Returns the StreamingCacheManager source.
diff --git a/sdks/python/apache_beam/runners/interactive/display/pipeline_graph.py b/sdks/python/apache_beam/runners/interactive/display/pipeline_graph.py
index cf627d5..d061845 100644
--- a/sdks/python/apache_beam/runners/interactive/display/pipeline_graph.py
+++ b/sdks/python/apache_beam/runners/interactive/display/pipeline_graph.py
@@ -121,6 +121,7 @@ class PipelineGraph(object):
     return self._get_graph().to_string()
 
   def display_graph(self):
+    """Displays the graph generated."""
     rendered_graph = self._renderer.render_pipeline_graph(self)
     if ie.current_env().is_in_notebook:
       try:
diff --git a/sdks/python/apache_beam/runners/interactive/interactive_beam.py b/sdks/python/apache_beam/runners/interactive/interactive_beam.py
index 7832a7e..38e320b 100644
--- a/sdks/python/apache_beam/runners/interactive/interactive_beam.py
+++ b/sdks/python/apache_beam/runners/interactive/interactive_beam.py
@@ -40,9 +40,12 @@ from apache_beam.runners.interactive import background_caching_job as bcj
 from apache_beam.runners.interactive import interactive_environment as ie
 from apache_beam.runners.interactive import interactive_runner as ir
 from apache_beam.runners.interactive import pipeline_fragment as pf
+from apache_beam.runners.interactive import pipeline_instrument as pi
 from apache_beam.runners.interactive.display import pipeline_graph
 from apache_beam.runners.interactive.display.pcoll_visualization import visualize
 from apache_beam.runners.interactive.options import interactive_options
+from apache_beam.runners.interactive.utils import elements_to_df
+from apache_beam.runners.interactive.utils import to_element_list
 
 
 class Options(interactive_options.InteractiveOptions):
@@ -86,6 +89,7 @@ class Options(interactive_options.InteractiveOptions):
       interactive_beam.evict_captured_data()
       # The next PCollection evaluation will capture fresh data from sources,
       # and the data captured will be replayed until another eviction.
+      interactive_beam.collect(some_pcoll)
     """
     assert value.total_seconds() > 0, 'Duration must be a positive value.'
     self.capture_control._capture_duration = value
@@ -233,6 +237,12 @@ def show(*pcolls, **configs):
        more dive-in widget and statistically overview widget of the data.
        Otherwise, those 2 data visualization widgets will not be displayed.
 
+  By default, the visualization renders the data of each given pcoll as a
+  separate data table, as if it had been converted into a dataframe. If
+  visualize_data is True, a dive-in widget and a statistical overview widget
+  of the data are also displayed. Otherwise, those 2 data visualization
+  widgets are not displayed.
+
   Ad hoc builds a pipeline fragment including only transforms that are
   necessary to produce data for given PCollections pcolls, runs the pipeline
   fragment to compute data for those pcolls and then visualizes the data.
@@ -290,6 +300,9 @@ def show(*pcolls, **configs):
   if isinstance(runner, ir.InteractiveRunner):
     runner = runner._underlying_runner
 
+  # Make sure that sources without a user reference are still cached.
+  pi.watch_sources(user_pipeline)
+
   # Make sure that all PCollections to be shown are watched. If a PCollection
   # has not been watched, make up a variable name for that PCollection and watch
   # it. No validation is needed here because the watch logic can handle
@@ -314,6 +327,26 @@ def show(*pcolls, **configs):
   bcj.attempt_to_run_background_caching_job(
       runner, user_pipeline, user_pipeline.options)
 
+  pcolls = set(pcolls)
+  computed_pcolls = set()
+  for pcoll in pcolls:
+    if pcoll in ie.current_env().computed_pcollections:
+      computed_pcolls.add(pcoll)
+  pcolls = pcolls.difference(computed_pcolls)
+  # In a notebook or IPython, statically plot the already-computed pcolls.
+  if ie.current_env().is_in_notebook:
+    for pcoll in computed_pcolls:
+      visualize(
+          pcoll,
+          include_window_info=include_window_info,
+          display_facets=visualize_data)
+  elif ie.current_env().is_in_ipython:
+    for pcoll in computed_pcolls:
+      visualize(pcoll, include_window_info=include_window_info)
+
+  if not pcolls:
+    return
+
   # Build a pipeline fragment for the PCollections and run it.
   result = pf.PipelineFragment(list(pcolls), user_pipeline.options).run()
   ie.current_env().set_pipeline_result(user_pipeline, result)
@@ -343,6 +376,100 @@ def show(*pcolls, **configs):
     ie.current_env().mark_pcollection_computed(pcolls)
 
 
+def collect(pcoll, include_window_info=False):
+  """Materializes all of the elements from a PCollection into a Dataframe.
+
+  For example::
+
+    p = beam.Pipeline(InteractiveRunner())
+    init = p | 'Init' >> beam.Create(range(10))
+    square = init | 'Square' >> beam.Map(lambda x: x * x)
+
+    # Run the pipeline and bring the PCollection into memory as a Dataframe.
+    in_memory_square = collect(square)
+  """
+  return head(pcoll, n=-1, include_window_info=include_window_info)
+
+
+def head(pcoll, n=5, include_window_info=False):
+  """Materializes the first n elements from a PCollection into a Dataframe.
+
+  This reads each element from file and reads only the amount that it needs
+  into memory.
+  For example::
+
+    p = beam.Pipeline(InteractiveRunner())
+    init = p | 'Init' >> beam.Create(range(10))
+    square = init | 'Square' >> beam.Map(lambda x: x * x)
+
+    # Run the pipeline and bring the PCollection into memory as a Dataframe.
+    in_memory_square = head(square, n=5)
+  """
+  assert isinstance(pcoll, beam.pvalue.PCollection), (
+      '{} is not an apache_beam.pvalue.PCollection.'.format(pcoll))
+
+  user_pipeline = pcoll.pipeline
+  runner = user_pipeline.runner
+  if isinstance(runner, ir.InteractiveRunner):
+    runner = runner._underlying_runner
+
+  # Make sure that sources without a user reference are still cached.
+  pi.watch_sources(user_pipeline)
+
+  # Make sure that the PCollection to be materialized is watched. If it has
+  # not been watched, make up a variable name for that PCollection and watch
+  # it. No validation is needed here because the watch logic can handle
+  # arbitrary variables.
+  watched_pcollections = set()
+  for watching in ie.current_env().watching():
+    for _, val in watching:
+      if hasattr(val, '__class__') and isinstance(val, beam.pvalue.PCollection):
+        watched_pcollections.add(val)
+  if pcoll not in watched_pcollections:
+    watch({'anonymous_pcollection_{}'.format(id(pcoll)): pcoll})
+
+  warnings.filterwarnings('ignore', category=DeprecationWarning)
+  # Attempt to run background caching job since we have the reference to the
+  # user-defined pipeline.
+  bcj.attempt_to_run_background_caching_job(
+      runner, user_pipeline, user_pipeline.options)
+
+  if pcoll in ie.current_env().computed_pcollections:
+    # Read from pcoll cache, then convert to DF
+    pipeline_instrument = pi.PipelineInstrument(pcoll.pipeline)
+    key = pipeline_instrument.cache_key(pcoll)
+    cache_manager = ie.current_env().cache_manager()
+
+    coder = cache_manager.load_pcoder('full', key)
+    reader, _ = cache_manager.read('full', key)
+    elements = to_element_list(reader, coder, include_window_info=True)
+  else:
+
+    # Build a pipeline fragment for the PCollections and run it.
+    result = pf.PipelineFragment([pcoll], user_pipeline.options).run()
+    ie.current_env().set_pipeline_result(user_pipeline, result)
+
+    # Invoke wait_until_finish to ensure the blocking nature of this API without
+    # relying on the run to be blocking.
+    result.wait_until_finish()
+
+    # If the pipeline execution is successful at this stage, mark the given
+    # PCollection as computed so that subsequent `show`, `head` or `collect`
+    # invocations don't need to re-compute it.
+    if result.state is beam.runners.runner.PipelineState.DONE:
+      ie.current_env().mark_pcollection_computed([pcoll])
+
+    elements = result.read(pcoll, include_window_info=True)
+
+  results = []
+  for e in elements:
+    results.append(e)
+    if len(results) >= n and n > 0:
+      break
+
+  return elements_to_df(results, include_window_info=include_window_info)
+
+
 def show_graph(pipeline):
   """Shows the current pipeline shape of a given Beam pipeline as a DAG.
   """
diff --git a/sdks/python/apache_beam/runners/interactive/interactive_runner.py b/sdks/python/apache_beam/runners/interactive/interactive_runner.py
index 77e0efc..1281373 100644
--- a/sdks/python/apache_beam/runners/interactive/interactive_runner.py
+++ b/sdks/python/apache_beam/runners/interactive/interactive_runner.py
@@ -31,6 +31,7 @@ import logging
 import apache_beam as beam
 from apache_beam import runners
 from apache_beam.options.pipeline_options import DebugOptions
+from apache_beam.pipeline import PipelineVisitor
 from apache_beam.runners.direct import direct_runner
 from apache_beam.runners.interactive import cache_manager as cache
 from apache_beam.runners.interactive import interactive_environment as ie
@@ -39,6 +40,7 @@ from apache_beam.runners.interactive import background_caching_job
 from apache_beam.runners.interactive.display import pipeline_graph
 from apache_beam.runners.interactive.options import capture_control
 from apache_beam.runners.interactive.utils import to_element_list
+from apache_beam.testing.test_stream_service import TestStreamServiceController
 
 # size of PCollection samples cached.
 SAMPLE_SIZE = 8
@@ -148,6 +150,10 @@ class InteractiveRunner(runners.PipelineRunner):
     if self._force_compute:
       ie.current_env().evict_computed_pcollections()
 
+    # Make sure that sources without a user reference are still cached.
+    inst.watch_sources(pipeline)
+
+    user_pipeline = inst.user_pipeline(pipeline)
     pipeline_instrument = inst.build_pipeline_instrument(pipeline, options)
 
     # The user_pipeline analyzed might be None if the pipeline given has nothing
@@ -155,17 +161,41 @@ class InteractiveRunner(runners.PipelineRunner):
     # When it's None, there is no need to cache including the background
     # caching job and no result to track since no background caching job is
     # started at all.
-    user_pipeline = pipeline_instrument.user_pipeline
     if user_pipeline:
       # Should use the underlying runner and run asynchronously.
       background_caching_job.attempt_to_run_background_caching_job(
           self._underlying_runner, user_pipeline, options)
+      if (background_caching_job.has_source_to_cache(user_pipeline) and
+          not background_caching_job.is_a_test_stream_service_running(
+              user_pipeline)):
+        streaming_cache_manager = ie.current_env().cache_manager()
+        if streaming_cache_manager:
+          test_stream_service = TestStreamServiceController(
+              streaming_cache_manager)
+          test_stream_service.start()
+          ie.current_env().set_test_stream_service_controller(
+              user_pipeline, test_stream_service)
 
     pipeline_to_execute = beam.pipeline.Pipeline.from_runner_api(
         pipeline_instrument.instrumented_pipeline_proto(),
         self._underlying_runner,
         options)
 
+    if ie.current_env().get_test_stream_service_controller(user_pipeline):
+      endpoint = ie.current_env().get_test_stream_service_controller(
+          user_pipeline).endpoint
+
+      # TODO: have the StreamingCacheManager and TestStreamServiceController be
+      # constructed when the InteractiveEnvironment is imported.
+      class TestStreamVisitor(PipelineVisitor):
+        def visit_transform(self, transform_node):
+          from apache_beam.testing.test_stream import TestStream
+          if (isinstance(transform_node.transform, TestStream) and
+              not transform_node.transform._events):
+            transform_node.transform._endpoint = endpoint
+
+      pipeline_to_execute.visit(TestStreamVisitor())
+
     if not self._skip_display:
       a_pipeline_graph = pipeline_graph.PipelineGraph(
           pipeline_instrument.original_pipeline,
@@ -216,6 +246,7 @@ class PipelineResult(beam.runners.runner.PipelineResult):
 
   def get(self, pcoll, include_window_info=False):
     """Materializes the PCollection into a list.
+
     If include_window_info is True, then returns the elements as
     WindowedValues. Otherwise, return the element as itself.
     """
@@ -223,6 +254,7 @@ class PipelineResult(beam.runners.runner.PipelineResult):
 
   def read(self, pcoll, include_window_info=False):
     """Reads the PCollection one element at a time from cache.
+
     If include_window_info is True, then returns the elements as
     WindowedValues. Otherwise, return the element as itself.
     """
diff --git a/sdks/python/apache_beam/runners/interactive/interactive_runner_test.py b/sdks/python/apache_beam/runners/interactive/interactive_runner_test.py
index 3e1ad80..5b7090e 100644
--- a/sdks/python/apache_beam/runners/interactive/interactive_runner_test.py
+++ b/sdks/python/apache_beam/runners/interactive/interactive_runner_test.py
@@ -28,12 +28,19 @@ from __future__ import print_function
 
 import unittest
 
+import pandas as pd
+
 import apache_beam as beam
 from apache_beam.runners.direct import direct_runner
 from apache_beam.runners.interactive import interactive_beam as ib
 from apache_beam.runners.interactive import interactive_environment as ie
 from apache_beam.runners.interactive import interactive_runner
 from apache_beam.runners.interactive.testing.mock_ipython import mock_get_ipython
+from apache_beam.transforms.window import GlobalWindow
+from apache_beam.utils.timestamp import Timestamp
+from apache_beam.utils.windowed_value import PaneInfo
+from apache_beam.utils.windowed_value import PaneInfoTiming
+from apache_beam.utils.windowed_value import WindowedValue
 
 # TODO(BEAM-8288): clean up the work-around of nose tests using Python2 without
 # unittest.mock module.
@@ -98,19 +105,47 @@ class InteractiveRunnerTest(unittest.TestCase):
     result = p.run()
     result.wait_until_finish()
 
-    actual = dict(result.get(counts))
-    self.assertDictEqual(
-        actual,
-        {
-            'to': 2,
-            'be': 2,
-            'or': 1,
-            'not': 1,
-            'that': 1,
-            'is': 1,
-            'the': 1,
-            'question': 1
-        })
+    actual = list(result.get(counts))
+    self.assertSetEqual(
+        set(actual),
+        set([
+            ('or', 1),
+            ('that', 1),
+            ('be', 2),
+            ('is', 1),
+            ('question', 1),
+            ('to', 2),
+            ('the', 1),
+            ('not', 1),
+        ]))
+
+    # Truncate the precision to millis because the window coder encodes
+    # timestamps in millis, which then get upcast to micros.
+    end_of_window = (GlobalWindow().max_timestamp().micros // 1000) * 1000
+    df_counts = ib.collect(counts, include_window_info=True)
+    df_expected = pd.DataFrame({
+        0: [e[0] for e in actual],
+        1: [e[1] for e in actual],
+        'event_time': [end_of_window for _ in actual],
+        'windows': [[GlobalWindow()] for _ in actual],
+        'pane_info': [
+            PaneInfo(True, True, PaneInfoTiming.ON_TIME, 0, 0) for _ in actual
+        ]
+    },
+                               columns=[
+                                   0, 1, 'event_time', 'windows', 'pane_info'
+                               ])
+
+    pd.testing.assert_frame_equal(df_expected, df_counts)
+
+    actual_reified = result.get(counts, include_window_info=True)
+    expected_reified = [
+        WindowedValue(
+            e,
+            Timestamp(micros=end_of_window), [GlobalWindow()],
+            PaneInfo(True, True, PaneInfoTiming.ON_TIME, 0, 0)) for e in actual
+    ]
+    self.assertEqual(actual_reified, expected_reified)
 
   def test_session(self):
     class MockPipelineRunner(object):
diff --git a/sdks/python/apache_beam/runners/interactive/options/capture_control.py b/sdks/python/apache_beam/runners/interactive/options/capture_control.py
index 8b484e2..14bb742 100644
--- a/sdks/python/apache_beam/runners/interactive/options/capture_control.py
+++ b/sdks/python/apache_beam/runners/interactive/options/capture_control.py
@@ -42,7 +42,7 @@ class CaptureControl(object):
     self._enable_capture_replay = True
     self._capturable_sources = {
         ReadFromPubSub,
-    }
+    }  # yapf: disable
     self._capture_duration = timedelta(seconds=5)
     self._capture_size_limit = 1e9
 
diff --git a/sdks/python/apache_beam/runners/interactive/pipeline_fragment.py b/sdks/python/apache_beam/runners/interactive/pipeline_fragment.py
index 54316f3..2831af4 100644
--- a/sdks/python/apache_beam/runners/interactive/pipeline_fragment.py
+++ b/sdks/python/apache_beam/runners/interactive/pipeline_fragment.py
@@ -204,6 +204,9 @@ class PipelineFragment(object):
       def enter_composite_transform(self, transform_node):
         if isinstance(transform_node.transform, TestStream):
           return
+        if isinstance(transform_node.transform, TestStream):
+          return
+
         pruned_parts = list(transform_node.parts)
         for part in transform_node.parts:
           if part not in necessary_transforms:
diff --git a/sdks/python/apache_beam/runners/interactive/pipeline_instrument.py b/sdks/python/apache_beam/runners/interactive/pipeline_instrument.py
index e803627..b39fea4 100644
--- a/sdks/python/apache_beam/runners/interactive/pipeline_instrument.py
+++ b/sdks/python/apache_beam/runners/interactive/pipeline_instrument.py
@@ -229,7 +229,7 @@ class PipelineInstrument(object):
     # Create the pipeline_proto to read all the components from. It will later
     # create a new pipeline proto from the cut out components.
     pipeline_proto, context = pipeline.to_runner_api(
-      return_context=True, use_fake_coders=False)
+        return_context=True, use_fake_coders=False)
 
     # Get all the root transforms. The caching transforms will be subtransforms
     # of one of these roots.
@@ -433,6 +433,7 @@ class PipelineInstrument(object):
           cacheable_input in unbounded_source_pcolls)
     # Replace/wire inputs w/ cached PCollections from ReadCache transforms.
     self._replace_with_cached_inputs(self._pipeline)
+
     # Write cache for all cacheables.
     for _, cacheable in self.cacheables.items():
       self._write_cache(
diff --git a/sdks/python/apache_beam/runners/interactive/pipeline_instrument_test.py b/sdks/python/apache_beam/runners/interactive/pipeline_instrument_test.py
index 56792ef..8fa9724 100644
--- a/sdks/python/apache_beam/runners/interactive/pipeline_instrument_test.py
+++ b/sdks/python/apache_beam/runners/interactive/pipeline_instrument_test.py
@@ -26,7 +26,6 @@ import unittest
 import apache_beam as beam
 from apache_beam import coders
 from apache_beam.pipeline import PipelineVisitor
-from apache_beam.portability.api.beam_runner_api_pb2 import TestStreamPayload
 from apache_beam.runners.interactive import cache_manager as cache
 from apache_beam.runners.interactive import interactive_beam as ib
 from apache_beam.runners.interactive import interactive_environment as ie
@@ -307,7 +306,7 @@ class PipelineInstrumentTest(unittest.TestCase):
 
   def test_instrument_example_unbounded_pipeline_to_read_cache(self):
     """Tests that the instrumenter works for a single unbounded source.
-        """
+    """
     # Create a new interactive environment to make the test idempotent.
     ie.new_env(cache_manager=streaming_cache.StreamingCache(cache_dir=None))
 
@@ -328,7 +327,7 @@ class PipelineInstrumentTest(unittest.TestCase):
       if not isinstance(pcoll, beam.pvalue.PCollection):
         continue
       cache_key = cache_key_of(name, pcoll)
-      self._mock_write_cache([TestStreamPayload()], cache_key)
+      self._mock_write_cache([b''], cache_key)
 
     # Instrument the original pipeline to create the pipeline the user will see.
     instrumenter = instr.build_pipeline_instrument(p_original)
@@ -467,9 +466,9 @@ class PipelineInstrumentTest(unittest.TestCase):
   def test_instrument_mixed_streaming_batch(self):
     """Tests caching for both batch and streaming sources in the same pipeline.
 
-        This ensures that cached bounded and unbounded sources are read from the
-        TestStream.
-        """
+    This ensures that cached bounded and unbounded sources are read from the
+    TestStream.
+    """
     # Create a new interactive environment to make the test idempotent.
     ie.new_env(cache_manager=streaming_cache.StreamingCache(cache_dir=None))
 
@@ -492,8 +491,7 @@ class PipelineInstrumentTest(unittest.TestCase):
     def cache_key_of(name, pcoll):
       return name + '_' + str(id(pcoll)) + '_' + str(id(pcoll.producer))
 
-    self._mock_write_cache([TestStreamPayload()],
-                           cache_key_of('source_2', source_2))
+    self._mock_write_cache([b''], cache_key_of('source_2', source_2))
     ie.current_env().mark_pcollection_computed([source_2])
 
     # Instrument the original pipeline to create the pipeline the user will see.
@@ -552,7 +550,7 @@ class PipelineInstrumentTest(unittest.TestCase):
 
   def test_instrument_example_unbounded_pipeline_direct_from_source(self):
     """Tests that the it caches PCollections from a source.
-        """
+    """
     # Create a new interactive environment to make the test idempotent.
     ie.new_env(cache_manager=streaming_cache.StreamingCache(cache_dir=None))
 
@@ -618,7 +616,7 @@ class PipelineInstrumentTest(unittest.TestCase):
 
   def test_instrument_example_unbounded_pipeline_to_read_cache_not_cached(self):
     """Tests that the instrumenter works when the PCollection is not cached.
-        """
+    """
     # Create a new interactive environment to make the test idempotent.
     ie.new_env(cache_manager=streaming_cache.StreamingCache(cache_dir=None))
 
@@ -689,7 +687,7 @@ class PipelineInstrumentTest(unittest.TestCase):
 
   def test_instrument_example_unbounded_pipeline_to_multiple_read_cache(self):
     """Tests that the instrumenter works for multiple unbounded sources.
-        """
+    """
     # Create a new interactive environment to make the test idempotent.
     ie.new_env(cache_manager=streaming_cache.StreamingCache(cache_dir=None))
 
@@ -714,7 +712,7 @@ class PipelineInstrumentTest(unittest.TestCase):
       if not isinstance(pcoll, beam.pvalue.PCollection):
         continue
       cache_key = cache_key_of(name, pcoll)
-      self._mock_write_cache([TestStreamPayload()], cache_key)
+      self._mock_write_cache([b''], cache_key)
 
     # Instrument the original pipeline to create the pipeline the user will see.
     instrumenter = instr.build_pipeline_instrument(p_original)
diff --git a/sdks/python/apache_beam/runners/interactive/testing/pipeline_assertion.py b/sdks/python/apache_beam/runners/interactive/testing/pipeline_assertion.py
index 55e07db..9f4bdbc 100644
--- a/sdks/python/apache_beam/runners/interactive/testing/pipeline_assertion.py
+++ b/sdks/python/apache_beam/runners/interactive/testing/pipeline_assertion.py
@@ -69,7 +69,7 @@ def assert_pipeline_proto_equal(
 def assert_pipeline_proto_contain_top_level_transform(
     test_case, pipeline_proto, transform_label):
   """Asserts the top level transforms contain a transform with the given
-     transform label."""
+   transform label."""
   _assert_pipeline_proto_contains_top_level_transform(
       test_case, pipeline_proto, transform_label, True)
 
@@ -77,7 +77,7 @@ def assert_pipeline_proto_contain_top_level_transform(
 def assert_pipeline_proto_not_contain_top_level_transform(
     test_case, pipeline_proto, transform_label):
   """Asserts the top level transforms do not contain a transform with the given
-     transform label."""
+   transform label."""
   _assert_pipeline_proto_contains_top_level_transform(
       test_case, pipeline_proto, transform_label, False)
 
diff --git a/sdks/python/apache_beam/runners/interactive/utils.py b/sdks/python/apache_beam/runners/interactive/utils.py
index d78bb9c..046cf6e 100644
--- a/sdks/python/apache_beam/runners/interactive/utils.py
+++ b/sdks/python/apache_beam/runners/interactive/utils.py
@@ -26,10 +26,10 @@ from apache_beam.portability.api.beam_runner_api_pb2 import TestStreamPayload
 
 
 def to_element_list(
-        reader, # type: Generator[Union[TestStreamPayload.Event, WindowedValueHolder]]
-        coder, # type: Coder
-        include_window_info # type: bool
-):
+    reader, # type: Generator[Union[TestStreamPayload.Event, WindowedValueHolder]]
+    coder, # type: Coder
+    include_window_info # type: bool
+    ):
   # type: (...) -> List[WindowedValue]
 
   """Returns an iterator that properly decodes the elements from the reader.