Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/07/30 07:28:11 UTC

[GitHub] [beam] rohdesamuel opened a new pull request #12415: Add the RecordingManager and associated classes.

rohdesamuel opened a new pull request #12415:
URL: https://github.com/apache/beam/pull/12415


   The RecordingManager abstracts away the details of how elements are read from the cache.
   
   Change-Id: I9e3c57d91ff56aba4b54d8c2e627b658c0667089
   
   PR 6/7
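To illustrate the kind of abstraction described above, here is a minimal sketch under stated assumptions. `ToyCache` and `ToyRecordingManager` are hypothetical names invented for this example; they are not the classes added in this PR. The only idea borrowed from the PR is that callers ask for a variable's elements, while the cache key and the `'full'` label stay hidden behind the manager.

```python
import itertools
from typing import Dict, Iterator, List, Tuple


class ToyCache:
  """Hypothetical in-memory stand-in for an interactive cache manager.

  Values are stored per (label, key) pair, loosely mirroring the
  ('full', cache_key) addressing used in this PR.
  """
  def __init__(self) -> None:
    self._store: Dict[Tuple[str, str], List[object]] = {}

  def write(self, values: List[object], label: str, key: str) -> None:
    self._store.setdefault((label, key), []).extend(values)

  def read(self, label: str, key: str) -> Iterator[object]:
    return iter(self._store.get((label, key), []))


class ToyRecordingManager:
  """Hypothetical facade: callers name a variable, not a cache key."""
  def __init__(self, cache: ToyCache) -> None:
    self._cache = cache
    self._keys: Dict[str, str] = {}

  def register(self, var: str, cache_key: str) -> None:
    self._keys[var] = cache_key

  def read(self, var: str, max_n: int) -> Iterator[object]:
    # Resolve the user-facing name to its cache key, then cap the read.
    key = self._keys[var]
    return itertools.islice(self._cache.read('full', key), max_n)
```

For example, after `cache.write([1, 2, 3], 'full', 'pcoll-key')` and `manager.register('pcoll', 'pcoll-key')`, calling `list(manager.read('pcoll', 2))` yields `[1, 2]`.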
   
   ------------------------
   
   Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
   
    - [ ] [**Choose reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and mention them in a comment (`R: @username`).
    - [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.
    - [ ] Update `CHANGES.md` with noteworthy changes.
    - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   See the [Contributor Guide](https://beam.apache.org/contribute) for more tips on [how to make review process smoother](https://beam.apache.org/contribute/#make-reviewers-job-easier).
   
   
   See [.test-infra/jenkins/README](https://github.com/apache/beam/blob/master/.test-infra/jenkins/README.md) for trigger phrase, status and link of all Jenkins jobs.
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] rohdesamuel commented on pull request #12415: [BEAM-10603] Add the RecordingManager and associated classes.

Posted by GitBox <gi...@apache.org>.
rohdesamuel commented on pull request #12415:
URL: https://github.com/apache/beam/pull/12415#issuecomment-675760649


   Run PythonDocker PreCommit





[GitHub] [beam] pabloem commented on a change in pull request #12415: [BEAM-10603] Add the RecordingManager and associated classes.

Posted by GitBox <gi...@apache.org>.
pabloem commented on a change in pull request #12415:
URL: https://github.com/apache/beam/pull/12415#discussion_r475799361



##########
File path: sdks/python/apache_beam/runners/interactive/recording_manager_test.py
##########
@@ -0,0 +1,301 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#  http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from __future__ import absolute_import
+
+import sys
+import unittest
+
+import apache_beam as beam
+from apache_beam import coders
+from apache_beam.portability.api.beam_interactive_api_pb2 import TestStreamFileRecord
+from apache_beam.runners.interactive import background_caching_job as bcj
+from apache_beam.runners.interactive import interactive_beam as ib
+from apache_beam.runners.interactive import interactive_environment as ie
+from apache_beam.runners.interactive import pipeline_instrument as pi
+from apache_beam.runners.interactive.interactive_runner import InteractiveRunner
+from apache_beam.runners.interactive.recording_manager import ElementStream
+from apache_beam.runners.interactive.recording_manager import Recording
+from apache_beam.runners.interactive.recording_manager import RecordingManager
+from apache_beam.runners.interactive.testing.test_cache_manager import FileRecordsBuilder
+from apache_beam.runners.interactive.testing.test_cache_manager import InMemoryCache
+from apache_beam.transforms.window import GlobalWindow
+from apache_beam.utils.timestamp import MIN_TIMESTAMP
+from apache_beam.utils.windowed_value import WindowedValue
+
+PipelineState = beam.runners.runner.PipelineState
+
+
+class MockPipelineResult(beam.runners.runner.PipelineResult):
+  """Mock class for controlling a PipelineResult."""
+  def __init__(self):
+    self._state = PipelineState.RUNNING
+
+  def wait_until_finish(self):
+    pass
+
+  def set_state(self, state):
+    self._state = state
+
+  @property
+  def state(self):
+    return self._state
+
+  def cancel(self):
+    self._state = PipelineState.CANCELLED
+
+
+class ElementStreamTest(unittest.TestCase):
+  def setUp(self):
+    ie.new_env()
+
+    self.cache = InMemoryCache()
+    self.p = beam.Pipeline()
+    self.pcoll = self.p | beam.Create([])
+    self.cache_key = str(pi.CacheKey('pcoll', '', '', ''))
+
+    # Create a MockPipelineResult to control the state of a fake run of the
+    # pipeline.
+    self.mock_result = MockPipelineResult()
+    ie.current_env().track_user_pipelines()
+    ie.current_env().set_pipeline_result(self.p, self.mock_result)
+    ie.current_env().set_cache_manager(self.cache, self.p)
+
+  def test_read(self):
+    """Tests reading, and that a done stream yields no more elements."""
+
+    self.mock_result.set_state(PipelineState.DONE)
+    self.cache.write(['expected'], 'full', self.cache_key)
+    self.cache.save_pcoder(None, 'full', self.cache_key)
+
+    stream = ElementStream(
+        self.pcoll, '', self.cache_key, max_n=1, max_duration_secs=1)
+
+    self.assertFalse(stream.is_done())
+    self.assertEqual(list(stream.read())[0], 'expected')
+    self.assertTrue(stream.is_done())
+
+  def test_done_if_terminated(self):
+    """Test that terminating the job sets the stream as done."""
+
+    self.cache.write(['expected'], 'full', self.cache_key)
+    self.cache.save_pcoder(None, 'full', self.cache_key)
+
+    stream = ElementStream(
+        self.pcoll, '', self.cache_key, max_n=100, max_duration_secs=10)
+
+    self.assertFalse(stream.is_done())
+    self.assertEqual(list(stream.read(tail=False))[0], 'expected')
+
+    # The limiters were not reached, so the stream is not done yet.
+    self.assertFalse(stream.is_done())
+
+    self.mock_result.set_state(PipelineState.DONE)
+    self.assertEqual(list(stream.read(tail=False))[0], 'expected')
+
+    # The underlying pipeline is terminated, so the stream won't yield new
+    # elements.
+    self.assertTrue(stream.is_done())
+
+  def test_read_n(self):
+    """Test that the stream only reads 'n' elements."""
+
+    self.mock_result.set_state(PipelineState.DONE)
+    self.cache.write(list(range(5)), 'full', self.cache_key)
+    self.cache.save_pcoder(None, 'full', self.cache_key)
+
+    stream = ElementStream(
+        self.pcoll, '', self.cache_key, max_n=1, max_duration_secs=1)
+    self.assertEqual(list(stream.read()), [0])
+    self.assertTrue(stream.is_done())
+
+    stream = ElementStream(
+        self.pcoll, '', self.cache_key, max_n=2, max_duration_secs=1)
+    self.assertEqual(list(stream.read()), [0, 1])
+    self.assertTrue(stream.is_done())
+
+    stream = ElementStream(
+        self.pcoll, '', self.cache_key, max_n=5, max_duration_secs=1)
+    self.assertEqual(list(stream.read()), list(range(5)))
+    self.assertTrue(stream.is_done())
+
+    # Test that asking for more elements than are cached still returns them.
+    stream = ElementStream(
+        self.pcoll, '', self.cache_key, max_n=10, max_duration_secs=1)
+    self.assertEqual(list(stream.read()), list(range(5)))
+    self.assertTrue(stream.is_done())
+
+  def test_read_duration(self):
+    """Test that the stream only reads a 'duration' of elements."""
+
+    values = (FileRecordsBuilder(tag=self.cache_key)
+              .advance_processing_time(1)
+              .add_element(element=0, event_time_secs=0)
+              .advance_processing_time(1)
+              .add_element(element=1, event_time_secs=1)
+              .advance_processing_time(1)
+              .add_element(element=2, event_time_secs=3)
+              .advance_processing_time(1)
+              .add_element(element=3, event_time_secs=4)
+              .advance_processing_time(1)
+              .add_element(element=4, event_time_secs=5)
+              .build()) # yapf: disable
+
+    self.mock_result.set_state(PipelineState.DONE)
+    self.cache.write(values, 'full', self.cache_key)
+    self.cache.save_pcoder(None, 'full', self.cache_key)
+
+    # The elements read from the cache are TestStreamFileRecord instances and
+    # have the underlying elements encoded. This method decodes the elements
+    # from the TestStreamFileRecord.
+    def get_elements(events):
+      coder = coders.FastPrimitivesCoder()
+      elements = []
+      for e in events:
+        if not isinstance(e, TestStreamFileRecord):
+          continue
+
+        if e.recorded_event.element_event:
+          elements += ([
+              coder.decode(el.encoded_element)
+              for el in e.recorded_event.element_event.elements
+          ])
+      return elements
+
+    # The following tests a progression of reading different durations from the
+    # cache.
+    stream = ElementStream(
+        self.pcoll, '', self.cache_key, max_n=100, max_duration_secs=1)
+    self.assertSequenceEqual(get_elements(stream.read()), [0])
+
+    stream = ElementStream(
+        self.pcoll, '', self.cache_key, max_n=100, max_duration_secs=2)
+    self.assertSequenceEqual(get_elements(stream.read()), [0, 1])
+
+    stream = ElementStream(
+        self.pcoll, '', self.cache_key, max_n=100, max_duration_secs=10)
+    self.assertSequenceEqual(get_elements(stream.read()), [0, 1, 2, 3, 4])
+
+
+class RecordingTest(unittest.TestCase):
+  def setUp(self):
+    ie.new_env()
+
+  @unittest.skipIf(
+      sys.version_info < (3, 6, 0),
+      'This test requires at least Python 3.6 to work.')
+  def test_computed(self):
+    """Tests that a PCollection is marked as computed only in a complete state.
+
+    Because the background caching job is now long-lived, repeated runs of a
+    PipelineFragment may yield different results for the same PCollection.
+    """
+
+    p = beam.Pipeline(InteractiveRunner())
+    elems = p | beam.Create([0, 1, 2])
+
+    ib.watch(locals())
+
+    # Create a MockPipelineResult to control the state of a fake run of the
+    # pipeline.
+    mock_result = MockPipelineResult()
+    ie.current_env().track_user_pipelines()
+    ie.current_env().set_pipeline_result(p, mock_result)
+
+    # Create a mock BackgroundCachingJob that will control whether to set the
+    # PCollections as computed or not.
+    bcj_mock_result = MockPipelineResult()
+    background_caching_job = bcj.BackgroundCachingJob(bcj_mock_result, [])
+
+    # Create a recording.
+    recording = Recording(
+        p, [elems],
+        mock_result,
+        pi.PipelineInstrument(p),
+        max_n=10,
+        max_duration_secs=60)
+
+    # Neither the background caching job nor the recording is done yet, so
+    # there may be more elements to record.
+    self.assertFalse(recording.is_computed())
+    self.assertFalse(recording.computed())
+    self.assertTrue(recording.uncomputed())
+
+    # The recording is finished but the background caching job is not. There
+    # may still be more elements to record, or the intermediate PCollection may
+    # have stopped caching in an incomplete state, e.g. before a window could
+    # fire.
+    mock_result.set_state(PipelineState.DONE)
+    recording.wait_until_finish()
+
+    self.assertFalse(recording.is_computed())
+    self.assertFalse(recording.computed())
+    self.assertTrue(recording.uncomputed())
+
+    # The background caching job finished before we started a recording,
+    # which is a sure signal that there will be no more elements.
+    bcj_mock_result.set_state(PipelineState.DONE)
+    ie.current_env().set_background_caching_job(p, background_caching_job)
+    recording = Recording(
+        p, [elems],
+        mock_result,
+        pi.PipelineInstrument(p),
+        max_n=10,
+        max_duration_secs=60)
+    recording.wait_until_finish()
+
+    # There are no more elements and the recording finished, meaning that the
+    # intermediate PCollections are in a complete state. They can now be marked
+    # as computed.
+    self.assertTrue(recording.is_computed())
+    self.assertTrue(recording.computed())
+    self.assertFalse(recording.uncomputed())
+
+
+class RecordingManagerTest(unittest.TestCase):
+  def setUp(self):
+    ie.new_env()
+
+  @unittest.skipIf(
+      sys.version_info < (3, 6, 0),
+      'This test requires at least Python 3.6 to work.')
+  def test_basic_wordcount(self):

Review comment:
       Makes sense. Thanks, Sam!
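The `test_read_n` and `test_read_duration` cases above expect a read to stop at either a count limit or a processing-time limit. The toy model below reproduces those expectations. It is not Beam's `CountLimiter` or `ProcessingTimeLimiter`. The `('advance', secs)` / `('element', value)` tuples are a hypothetical stand-in for the `TestStreamFileRecord`s that `FileRecordsBuilder` produces.

```python
from typing import Iterable, Iterator, Tuple, Union

# Hypothetical event encoding: ('advance', secs) advances processing time,
# ('element', value) carries one recorded element.
Event = Tuple[str, Union[int, float]]


def read_limited(events: Iterable[Event],
                 max_n: int,
                 max_duration_secs: float) -> Iterator[Union[int, float]]:
  """Yields elements until either limit would be exceeded.

  The count limit stops after max_n elements. The duration limit stops
  once the accumulated processing time exceeds max_duration_secs.
  """
  count = 0
  elapsed_secs = 0.0
  for kind, value in events:
    if kind == 'advance':
      elapsed_secs += value
      if elapsed_secs > max_duration_secs:
        return
    elif kind == 'element':
      if count >= max_n:
        return
      count += 1
      yield value
```

With one element per second of processing time, durations of 1, 2 and 10 seconds yield `[0]`, `[0, 1]` and all five elements, which matches the progression asserted in `test_read_duration`.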







[GitHub] [beam] rohdesamuel commented on a change in pull request #12415: [BEAM-10603] Add the RecordingManager and associated classes.

Posted by GitBox <gi...@apache.org>.
rohdesamuel commented on a change in pull request #12415:
URL: https://github.com/apache/beam/pull/12415#discussion_r472561720



##########
File path: sdks/python/apache_beam/runners/interactive/recording_manager.py
##########
@@ -0,0 +1,330 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from __future__ import absolute_import
+
+import logging
+import threading
+import time
+import warnings
+
+import apache_beam as beam
+from apache_beam.runners.interactive import background_caching_job as bcj
+from apache_beam.runners.interactive import interactive_environment as ie
+from apache_beam.runners.interactive import interactive_runner as ir
+from apache_beam.runners.interactive import pipeline_fragment as pf
+from apache_beam.runners.interactive import pipeline_instrument as pi
+from apache_beam.runners.interactive import utils
+from apache_beam.runners.interactive.options.capture_limiters import CountLimiter
+from apache_beam.runners.interactive.options.capture_limiters import ProcessingTimeLimiter
+
+_LOGGER = logging.getLogger(__name__)
+
+PipelineState = beam.runners.runner.PipelineState
+
+
+class ElementStream:
+  """A stream of elements from a given PCollection."""
+  def __init__(
+      self,
+      pcoll,  # type: beam.pvalue.PCollection
+      var,  # type: str
+      cache_key,  # type: str
+      max_n,  # type: int
+      max_duration_secs  # type: float
+      ):
+    self._pcoll = pcoll
+    self._cache_key = cache_key
+    self._pipeline = pcoll.pipeline
+    self._var = var
+    self._n = max_n
+    self._duration_secs = max_duration_secs
+
+    # A small state variable that when True, indicates that no more new elements
+    # will be yielded if read() is called again.
+    self._done = False
+
+  def var(self):
+    # type: () -> str
+
+    """Returns the variable name that defined this PCollection."""
+    return self._var
+
+  def display_id(self, suffix):
+    # type: (str) -> str
+
+    """Returns a unique id able to be displayed in a web browser."""
+    return utils.obfuscate(self._cache_key, suffix)
+
+  def is_computed(self):
+    # type: () -> bool
+
+    """Returns True if no more elements will be recorded."""
+    return self._pcoll in ie.current_env().computed_pcollections
+
+  def is_done(self):
+    # type: () -> boolean
+
+    """Returns True if no more new elements will be yielded."""
+    return self._done
+
+  def read(self, tail=True):
+    # type: (bool) -> Any
+
+    """Reads the elements currently recorded."""
+
+    # Get the cache manager and wait until the file exists.
+    cache_manager = ie.current_env().get_cache_manager(self._pipeline)
+    while not cache_manager.exists('full', self._cache_key):
+      pass

Review comment:
       Ack, added a sleep.
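The original loop (`while not cache_manager.exists(...): pass`) spins the CPU while it waits for the cache file. The follow-up change isn't shown in this thread. A minimal sketch of a sleep-based poll could look like the one below. The `wait_until` helper and its optional timeout are assumptions, not part of the PR.

```python
import time
from typing import Callable, Optional


def wait_until(predicate: Callable[[], bool],
               poll_interval_secs: float = 0.1,
               timeout_secs: Optional[float] = None) -> bool:
  """Polls predicate() until it returns True, sleeping between checks.

  Returns True once the predicate holds, or False if timeout_secs elapses
  first. A timeout of None waits indefinitely.
  """
  deadline = None if timeout_secs is None else time.monotonic() + timeout_secs
  while not predicate():
    if deadline is not None and time.monotonic() >= deadline:
      return False
    # Yield the CPU instead of spinning in a tight `pass` loop.
    time.sleep(poll_interval_secs)
  return True
```

Inside `ElementStream.read()`, this might be called as `wait_until(lambda: cache_manager.exists('full', self._cache_key))`.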

##########
File path: sdks/python/apache_beam/runners/interactive/recording_manager.py
##########
@@ -0,0 +1,330 @@
+
+    # Retrieve the coder for the particular PCollection which will be used to
+    # decode elements read from cache.
+    coder = cache_manager.load_pcoder('full', self._cache_key)
+
+    # Read the elements from the cache.
+    limiters = [
+        CountLimiter(self._n), ProcessingTimeLimiter(self._duration_secs)
+    ]
+    if hasattr(cache_manager, 'read_multiple'):

Review comment:
       Actually, I forgot to remove this prototype. I added `tail` as an optional argument to read() in a previous PR, so I can remove this.
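The sketch below shows the general semantics of a `tail` flag, under assumptions. `read_elements` is a hypothetical function that reads from an in-memory list, not the cache manager's `read()`. With `tail=False` it returns what has been recorded so far. With `tail=True` it keeps polling for new elements until the writer reports that it is done.

```python
import time
from typing import Callable, Iterator, List


def read_elements(buffer: List[int],
                  is_done: Callable[[], bool],
                  tail: bool = False,
                  poll_interval_secs: float = 0.01) -> Iterator[int]:
  """Hypothetical reader illustrating a `tail` flag.

  tail=False: yield whatever is in `buffer` right now, then stop.
  tail=True: also yield elements appended later, until is_done() is True.
  """
  position = 0
  while True:
    # Snapshot completion *before* draining, so that elements appended just
    # before the writer finished are still read on this pass.
    finished = tail and is_done()
    while position < len(buffer):
      yield buffer[position]
      position += 1
    if not tail or finished:
      return
    time.sleep(poll_interval_secs)
```

Completion is checked before draining rather than after. If it were checked after, an element appended between the drain and the check could be dropped.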

##########
File path: sdks/python/apache_beam/runners/interactive/recording_manager.py
##########
@@ -0,0 +1,330 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from __future__ import absolute_import
+
+import logging
+import threading
+import time
+import warnings
+
+import apache_beam as beam
+from apache_beam.runners.interactive import background_caching_job as bcj
+from apache_beam.runners.interactive import interactive_environment as ie
+from apache_beam.runners.interactive import interactive_runner as ir
+from apache_beam.runners.interactive import pipeline_fragment as pf
+from apache_beam.runners.interactive import pipeline_instrument as pi
+from apache_beam.runners.interactive import utils
+from apache_beam.runners.interactive.options.capture_limiters import CountLimiter
+from apache_beam.runners.interactive.options.capture_limiters import ProcessingTimeLimiter
+
+_LOGGER = logging.getLogger(__name__)
+
+PipelineState = beam.runners.runner.PipelineState
+
+
+class ElementStream:
+  """A stream of elements from a given PCollection."""
+  def __init__(
+      self,
+      pcoll,  # type: beam.pvalue.PCollection
+      var,  # type: str
+      cache_key,  # type: str
+      max_n,  # type: int
+      max_duration_secs  # type: float
+      ):
+    self._pcoll = pcoll
+    self._cache_key = cache_key
+    self._pipeline = pcoll.pipeline
+    self._var = var
+    self._n = max_n
+    self._duration_secs = max_duration_secs
+
+    # A small state variable that when True, indicates that no more new elements
+    # will be yielded if read() is called again.
+    self._done = False
+
+  def var(self):
+    # type: () -> str
+
+    """Returns the variable named that defined this PCollection."""
+    return self._var
+
+  def display_id(self, suffix):
+    #Any type: (str) -> str
+
+    """Returns a unique id able to be displayed in a web browser."""
+    return utils.obfuscate(self._cache_key, suffix)
+
+  def is_computed(self):
+    # type: () -> bool
+
+    """Returns True if no more elements will be recorded."""
+    return self._pcoll in ie.current_env().computed_pcollections
+
+  def is_done(self):
+    # type: () -> bool
+
+    """Returns True if no more new elements will be yielded."""
+    return self._done
+
+  def read(self, tail=True):
+    # type: (bool) -> Any
+
+    """Reads the elements currently recorded."""
+
+    # Get the cache manager and wait until the file exists.
+    cache_manager = ie.current_env().get_cache_manager(self._pipeline)
+    while not cache_manager.exists('full', self._cache_key):
+      time.sleep(0.5)
+
+    # Retrieve the coder for the particular PCollection which will be used to
+    # decode elements read from cache.
+    coder = cache_manager.load_pcoder('full', self._cache_key)
+
+    # Read the elements from the cache.
+    limiters = [
+        CountLimiter(self._n), ProcessingTimeLimiter(self._duration_secs)
+    ]
+    if hasattr(cache_manager, 'read_multiple'):
+      reader = cache_manager.read_multiple([('full', self._cache_key)],
+                                           limiters=limiters,
+                                           tail=tail)
+    else:
+      reader, _ = cache_manager.read('full', self._cache_key, limiters=limiters)
+
+    # Because a single TestStreamFileRecord can yield multiple elements, we
+    # limit the count again here in the to_element_list call.
+    for e in utils.to_element_list(reader,
+                                   coder,
+                                   include_window_info=True,
+                                   n=self._n):
+      yield e
+
+    # A limiter being triggered means that we have fulfilled the user's request.
+    # This implies that reading from the cache again won't yield any new
+    # elements. The same holds if the user pipeline has terminated.
+    if any(l.is_triggered()
+           for l in limiters) or ie.current_env().is_terminated(self._pipeline):
+      self._done = True
+
+
+class Recording:
+  """A group of PCollections from a given pipeline run."""
+  def __init__(
+      self,
+      user_pipeline,  # type: beam.Pipeline
+      pcolls,  # type: List[beam.pvalue.PCollection]
+      result,  # type: beam.runners.runner.PipelineResult
+      pipeline_instrument,  # type: pi.PipelineInstrument
+      max_n,  # type: int
+      max_duration_secs  # type: float
+      ):
+
+    self._user_pipeline = user_pipeline
+    self._result = result
+    self._pcolls = pcolls
+
+    pcoll_var = lambda pcoll: pipeline_instrument.cacheable_var_by_pcoll_id(
+        pipeline_instrument.pcolls_to_pcoll_id.get(str(pcoll), None))
+
+    self._streams = {
+        pcoll: ElementStream(
+            pcoll,
+            pcoll_var(pcoll),
+            pipeline_instrument.cache_key(pcoll),
+            max_n,
+            max_duration_secs)
+        for pcoll in pcolls
+    }
+    self._start = time.time()
+    self._duration_secs = max_duration_secs
+    self._set_computed = bcj.is_cache_complete(str(id(user_pipeline)))
+
+    # Run a separate thread that marks the PCollections as computed, because
+    # the pipeline run may be asynchronous.
+    self._mark_computed = threading.Thread(target=self._mark_all_computed)
+    self._mark_computed.daemon = True
+    self._mark_computed.start()
+
+  def _mark_all_computed(self):
+    # type: () -> None
+
+    """Marks all the PCollections upon a successful pipeline run."""
+    if not self._result:
+      return
+
+    while not PipelineState.is_terminal(self._result.state):

Review comment:
       Ack, added a sleep.
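For context, a bare `while not ...: pass` loop re-checks its condition as fast as it can and keeps a CPU core busy for as long as the pipeline runs. A minimal sketch of the sleep-between-polls pattern as a standalone helper; `wait_until_terminal`, its parameters, and the string states below are hypothetical stand-ins, not Beam APIs:

```python
import time


def wait_until_terminal(get_state, is_terminal, poll_interval_secs=0.5,
                        timeout_secs=None):
  """Polls get_state() until is_terminal(state) holds.

  Sleeping between polls yields the CPU instead of spinning. Returns the
  terminal state, or None if timeout_secs elapses first.
  """
  start = time.monotonic()
  while True:
    state = get_state()
    if is_terminal(state):
      return state
    if timeout_secs is not None and time.monotonic() - start >= timeout_secs:
      return None
    time.sleep(poll_interval_secs)


# Simulated pipeline that reports RUNNING twice before finishing.
states = iter(['RUNNING', 'RUNNING', 'DONE'])
final_state = wait_until_terminal(
    lambda: next(states), lambda s: s == 'DONE', poll_interval_secs=0.01)
```

The trade-off is latency: a terminal state is noticed up to one `poll_interval_secs` late.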




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] davidyan74 commented on a change in pull request #12415: [BEAM-10603] Add the RecordingManager and associated classes.

Posted by GitBox <gi...@apache.org>.
davidyan74 commented on a change in pull request #12415:
URL: https://github.com/apache/beam/pull/12415#discussion_r473288427



##########
File path: sdks/python/apache_beam/runners/interactive/recording_manager.py
##########
@@ -0,0 +1,329 @@
+from __future__ import absolute_import
+
+import logging
+import threading
+import time
+import warnings
+
+import apache_beam as beam
+from apache_beam.runners.interactive import background_caching_job as bcj
+from apache_beam.runners.interactive import interactive_environment as ie
+from apache_beam.runners.interactive import interactive_runner as ir
+from apache_beam.runners.interactive import pipeline_fragment as pf
+from apache_beam.runners.interactive import pipeline_instrument as pi
+from apache_beam.runners.interactive import utils
+from apache_beam.runners.interactive.options.capture_limiters import CountLimiter
+from apache_beam.runners.interactive.options.capture_limiters import ProcessingTimeLimiter
+
+_LOGGER = logging.getLogger(__name__)
+
+PipelineState = beam.runners.runner.PipelineState
+
+
+class ElementStream:
+  """A stream of elements from a given PCollection."""
+  def __init__(
+      self,
+      pcoll,  # type: beam.pvalue.PCollection
+      var,  # type: str
+      cache_key,  # type: str
+      max_n,  # type: int
+      max_duration_secs  # type: float
+      ):
+    self._pcoll = pcoll
+    self._cache_key = cache_key
+    self._pipeline = pcoll.pipeline
+    self._var = var
+    self._n = max_n
+    self._duration_secs = max_duration_secs
+
+    # A small state variable that, when True, indicates that no more new elements
+    # will be yielded if read() is called again.
+    self._done = False
+
+  def var(self):
+    # type: () -> str
+
+    """Returns the name of the variable that defined this PCollection."""
+    return self._var
+
+  def display_id(self, suffix):
+    # type: (str) -> str
+
+    """Returns a unique id able to be displayed in a web browser."""
+    return utils.obfuscate(self._cache_key, suffix)
+
+  def is_computed(self):
+    # type: () -> bool
+
+    """Returns True if no more elements will be recorded."""
+    return self._pcoll in ie.current_env().computed_pcollections
+
+  def is_done(self):
+    # type: () -> bool
+
+    """Returns True if no more new elements will be yielded."""
+    return self._done
+
+  def read(self, tail=True):
+    # type: (bool) -> Any
+
+    """Reads the elements currently recorded."""
+
+    # Get the cache manager and wait until the file exists.
+    cache_manager = ie.current_env().get_cache_manager(self._pipeline)
+    while not cache_manager.exists('full', self._cache_key):
+      time.sleep(0.5)
+
+    # Retrieve the coder for the particular PCollection which will be used to
+    # decode elements read from cache.
+    coder = cache_manager.load_pcoder('full', self._cache_key)
+
+    # Read the elements from the cache.
+    limiters = [
+        CountLimiter(self._n), ProcessingTimeLimiter(self._duration_secs)
+    ]
+    reader, _ = cache_manager.read('full', self._cache_key,
+                                   limiters=limiters,
+                                   tail=tail)
+
+    # Because a single TestStreamFileRecord can yield multiple elements, we
+    # limit the count again here in the to_element_list call.
+    for e in utils.to_element_list(reader,
+                                   coder,
+                                   include_window_info=True,
+                                   n=self._n):
+      yield e
+
+    # A limiter being triggered means that we have fulfilled the user's request.
+    # This implies that reading from the cache again won't yield any new
+    # elements. The same holds if the user pipeline has terminated.
+    if any(l.is_triggered()
+           for l in limiters) or ie.current_env().is_terminated(self._pipeline):
+      self._done = True

Review comment:
       Trying to understand what's going on. If we get here, the for loop above would have exited. What would it mean if this condition were not met, so that self._done was not set to True?
   







[GitHub] [beam] davidyan74 commented on a change in pull request #12415: [BEAM-10603] Add the RecordingManager and associated classes.

Posted by GitBox <gi...@apache.org>.
davidyan74 commented on a change in pull request #12415:
URL: https://github.com/apache/beam/pull/12415#discussion_r473380684



##########
File path: sdks/python/apache_beam/runners/interactive/recording_manager.py
##########
@@ -0,0 +1,334 @@
+from __future__ import absolute_import
+
+import logging
+import threading
+import time
+import warnings
+
+import apache_beam as beam
+from apache_beam.runners.interactive import background_caching_job as bcj
+from apache_beam.runners.interactive import interactive_environment as ie
+from apache_beam.runners.interactive import interactive_runner as ir
+from apache_beam.runners.interactive import pipeline_fragment as pf
+from apache_beam.runners.interactive import pipeline_instrument as pi
+from apache_beam.runners.interactive import utils
+from apache_beam.runners.interactive.options.capture_limiters import CountLimiter
+from apache_beam.runners.interactive.options.capture_limiters import ProcessingTimeLimiter
+
+_LOGGER = logging.getLogger(__name__)
+
+PipelineState = beam.runners.runner.PipelineState
+
+
+class ElementStream:
+  """A stream of elements from a given PCollection."""
+  def __init__(
+      self,
+      pcoll,  # type: beam.pvalue.PCollection
+      var,  # type: str
+      cache_key,  # type: str
+      max_n,  # type: int
+      max_duration_secs  # type: float
+      ):
+    self._pcoll = pcoll
+    self._cache_key = cache_key
+    self._pipeline = pcoll.pipeline
+    self._var = var
+    self._n = max_n
+    self._duration_secs = max_duration_secs
+
+    # A small state variable that, when True, indicates that no more new elements
+    # will be yielded if read() is called again.
+    self._done = False
+
+  def var(self):
+    # type: () -> str
+
+    """Returns the name of the variable that defined this PCollection."""
+    return self._var
+
+  def display_id(self, suffix):
+    # type: (str) -> str
+
+    """Returns a unique id able to be displayed in a web browser."""
+    return utils.obfuscate(self._cache_key, suffix)
+
+  def is_computed(self):
+    # type: () -> bool
+
+    """Returns True if no more elements will be recorded."""
+    return self._pcoll in ie.current_env().computed_pcollections
+
+  def is_done(self):
+    # type: () -> bool
+
+    """Returns True if no more new elements will be yielded."""
+    return self._done
+
+  def read(self, tail=True):
+    # type: (bool) -> Any
+
+    """Reads the elements currently recorded."""
+
+    # Get the cache manager and wait until the file exists.
+    cache_manager = ie.current_env().get_cache_manager(self._pipeline)
+    while not cache_manager.exists('full', self._cache_key):
+      time.sleep(0.5)
+
+    # Retrieve the coder for the particular PCollection which will be used to
+    # decode elements read from cache.
+    coder = cache_manager.load_pcoder('full', self._cache_key)
+
+    # Read the elements from the cache.
+    limiters = [
+        CountLimiter(self._n), ProcessingTimeLimiter(self._duration_secs)
+    ]
+    reader, _ = cache_manager.read('full', self._cache_key,
+                                   limiters=limiters,
+                                   tail=tail)
+
+    # Because a single TestStreamFileRecord can yield multiple elements, we
+    # limit the count again here in the to_element_list call.
+    #
+    # There are two ways of exiting this loop: either a limiter was triggered
+    # or all elements from the cache were read. In the latter case, the
+    # pipeline may still be running, so another invocation of `read` may
+    # yield new elements.
+    for e in utils.to_element_list(reader,
+                                   coder,
+                                   include_window_info=True,
+                                   n=self._n):
+      yield e
+
+    # A limiter being triggered means that we have fulfilled the user's request.
+    # This implies that reading from the cache again won't yield any new
+    # elements. The same holds if the user pipeline has terminated.
+    if any(l.is_triggered()
+           for l in limiters) or ie.current_env().is_terminated(self._pipeline):
+      self._done = True
+
+
+class Recording:
+  """A group of PCollections from a given pipeline run."""
+  def __init__(
+      self,
+      user_pipeline,  # type: beam.Pipeline
+      pcolls,  # type: List[beam.pvalue.PCollection]
+      result,  # type: beam.runners.runner.PipelineResult
+      pipeline_instrument,  # type: pi.PipelineInstrument
+      max_n,  # type: int
+      max_duration_secs  # type: float
+      ):
+
+    self._user_pipeline = user_pipeline
+    self._result = result
+    self._pcolls = pcolls
+
+    pcoll_var = lambda pcoll: pipeline_instrument.cacheable_var_by_pcoll_id(
+        pipeline_instrument.pcolls_to_pcoll_id.get(str(pcoll), None))
+
+    self._streams = {
+        pcoll: ElementStream(
+            pcoll,
+            pcoll_var(pcoll),
+            pipeline_instrument.cache_key(pcoll),
+            max_n,
+            max_duration_secs)
+        for pcoll in pcolls
+    }
+    self._start = time.time()
+    self._duration_secs = max_duration_secs
+    self._set_computed = bcj.is_cache_complete(str(id(user_pipeline)))
+
+    # Run a separate thread that marks the PCollections as computed, because
+    # the pipeline run may be asynchronous.
+    self._mark_computed = threading.Thread(target=self._mark_all_computed)
+    self._mark_computed.daemon = True
+    self._mark_computed.start()
+
+  def _mark_all_computed(self):
+    # type: () -> None
+
+    """Marks all the PCollections upon a successful pipeline run."""
+    if not self._result:
+      return
+
+    while not PipelineState.is_terminal(self._result.state):
+      if time.time() - self._start >= self._duration_secs:
+        self._result.cancel()
+        self._result.wait_until_finish()
+
+      if all(s.is_done() for s in self._streams.values()):

Review comment:
       Should this be an elif, since you shouldn't need to cancel twice?
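A sketch of the loop with the suggested `elif`, assuming (as the question implies) that the second branch also cancels the run. `FakeResult`, `monitor`, and the string states are hypothetical stand-ins for `PipelineResult` and `PipelineState`:

```python
import time

TERMINAL_STATES = {'DONE', 'FAILED', 'CANCELLED'}


class FakeResult(object):
  """Hypothetical stand-in for a PipelineResult that counts cancel() calls."""

  def __init__(self):
    self.state = 'RUNNING'
    self.cancel_calls = 0

  def cancel(self):
    self.cancel_calls += 1
    self.state = 'CANCELLED'

  def wait_until_finish(self):
    return self.state


def monitor(result, start, duration_secs, all_streams_done, poll_secs=0.01):
  while result.state not in TERMINAL_STATES:
    if time.time() - start >= duration_secs:
      # Time budget exhausted.
      result.cancel()
      result.wait_until_finish()
    elif all_streams_done():
      # Every stream already has what it needs. With a plain `if`, this
      # branch could also fire in the same iteration and cancel a second time.
      result.cancel()
      result.wait_until_finish()
    time.sleep(poll_secs)


# Both conditions hold at once, yet cancel() runs only once.
result = FakeResult()
monitor(result, start=time.time(), duration_secs=0.0,
        all_streams_done=lambda: True)
```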







[GitHub] [beam] rohdesamuel commented on a change in pull request #12415: [BEAM-10603] Add the RecordingManager and associated classes.

Posted by GitBox <gi...@apache.org>.
rohdesamuel commented on a change in pull request #12415:
URL: https://github.com/apache/beam/pull/12415#discussion_r473315920



##########
File path: sdks/python/apache_beam/runners/interactive/recording_manager.py
##########
@@ -0,0 +1,329 @@
+from __future__ import absolute_import
+
+import logging
+import threading
+import time
+import warnings
+
+import apache_beam as beam
+from apache_beam.runners.interactive import background_caching_job as bcj
+from apache_beam.runners.interactive import interactive_environment as ie
+from apache_beam.runners.interactive import interactive_runner as ir
+from apache_beam.runners.interactive import pipeline_fragment as pf
+from apache_beam.runners.interactive import pipeline_instrument as pi
+from apache_beam.runners.interactive import utils
+from apache_beam.runners.interactive.options.capture_limiters import CountLimiter
+from apache_beam.runners.interactive.options.capture_limiters import ProcessingTimeLimiter
+
+_LOGGER = logging.getLogger(__name__)
+
+PipelineState = beam.runners.runner.PipelineState
+
+
+class ElementStream:
+  """A stream of elements from a given PCollection."""
+  def __init__(
+      self,
+      pcoll,  # type: beam.pvalue.PCollection
+      var,  # type: str
+      cache_key,  # type: str
+      max_n,  # type: int
+      max_duration_secs  # type: float
+      ):
+    self._pcoll = pcoll
+    self._cache_key = cache_key
+    self._pipeline = pcoll.pipeline
+    self._var = var
+    self._n = max_n
+    self._duration_secs = max_duration_secs
+
+    # A small state variable that, when True, indicates that no more new elements
+    # will be yielded if read() is called again.
+    self._done = False
+
+  def var(self):
+    # type: () -> str
+
+    """Returns the name of the variable that defined this PCollection."""
+    return self._var
+
+  def display_id(self, suffix):
+    # type: (str) -> str
+
+    """Returns a unique id able to be displayed in a web browser."""
+    return utils.obfuscate(self._cache_key, suffix)
+
+  def is_computed(self):
+    # type: () -> bool
+
+    """Returns True if no more elements will be recorded."""
+    return self._pcoll in ie.current_env().computed_pcollections
+
+  def is_done(self):
+    # type: () -> bool
+
+    """Returns True if no more new elements will be yielded."""
+    return self._done
+
+  def read(self, tail=True):
+    # type: (bool) -> Any
+
+    """Reads the elements currently recorded."""
+
+    # Get the cache manager and wait until the file exists.
+    cache_manager = ie.current_env().get_cache_manager(self._pipeline)
+    while not cache_manager.exists('full', self._cache_key):
+      time.sleep(0.5)
+
+    # Retrieve the coder for the particular PCollection which will be used to
+    # decode elements read from cache.
+    coder = cache_manager.load_pcoder('full', self._cache_key)
+
+    # Read the elements from the cache.
+    limiters = [
+        CountLimiter(self._n), ProcessingTimeLimiter(self._duration_secs)
+    ]
+    reader, _ = cache_manager.read('full', self._cache_key,
+                                   limiters=limiters,
+                                   tail=tail)
+
+    # Because a single TestStreamFileRecord can yield multiple elements, we
+    # limit the count again here in the to_element_list call.
+    for e in utils.to_element_list(reader,
+                                   coder,
+                                   include_window_info=True,
+                                   n=self._n):
+      yield e
+
+    # A limiter being triggered means that we have fulfilled the user's request.
+    # This implies that reading from the cache again won't yield any new
+    # elements. The same holds if the user pipeline has terminated.
+    if any(l.is_triggered()
+           for l in limiters) or ie.current_env().is_terminated(self._pipeline):
+      self._done = True

Review comment:
       Done, added a comment to the loop for reading from cache.
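To make the two exit paths concrete, here is a simplified, hypothetical model of the `read()`/`is_done()` contract. `ToyElementStream` is not Beam code: a plain list stands in for the cache, each call re-reads everything recorded so far, and a boolean flag stands in for `ie.current_env().is_terminated(...)`.

```python
class ToyElementStream(object):
  """Hypothetical model of ElementStream; `cache` may still be growing."""

  def __init__(self, cache, max_n):
    self._cache = cache
    self._max_n = max_n
    self.pipeline_terminated = False  # flipped by the simulated pipeline
    self._done = False

  def is_done(self):
    return self._done

  def read(self):
    n = 0
    # Exits either because the count limit was reached or because every
    # element recorded so far has been yielded.
    for e in list(self._cache):
      if n >= self._max_n:
        break
      n += 1
      yield e
    # Only runs once the caller has exhausted the generator.
    if n >= self._max_n or self.pipeline_terminated:
      self._done = True


cache = [1, 2]
stream = ToyElementStream(cache, max_n=5)
first = list(stream.read())           # [1, 2]
done_after_first = stream.is_done()   # False: the pipeline may write more

cache.extend([3, 4])
stream.pipeline_terminated = True
second = list(stream.read())          # [1, 2, 3, 4], and now done

capped = ToyElementStream([1, 2, 3], max_n=2)
third = list(capped.read())           # [1, 2]; count limit reached, so done
```

Because `read` is a generator, the `_done` update after the loop only runs once the caller exhausts it; a caller that stops iterating early never marks the stream done. That also holds for the generator in the PR.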







[GitHub] [beam] pabloem merged pull request #12415: [BEAM-10603] Add the RecordingManager and associated classes.

Posted by GitBox <gi...@apache.org>.
pabloem merged pull request #12415:
URL: https://github.com/apache/beam/pull/12415


   





[GitHub] [beam] codecov[bot] commented on pull request #12415: [BEAM-10603] Add the RecordingManager and associated classes.

Posted by GitBox <gi...@apache.org>.
codecov[bot] commented on pull request #12415:
URL: https://github.com/apache/beam/pull/12415#issuecomment-679302193


   # [Codecov](https://codecov.io/gh/apache/beam/pull/12415?src=pr&el=h1) Report
   > Merging [#12415](https://codecov.io/gh/apache/beam/pull/12415?src=pr&el=desc) into [master](https://codecov.io/gh/apache/beam/commit/e164d170eb6b5ec1dddd99f09e79dfb0147b84ae?el=desc) will **decrease** coverage by `0.08%`.
   > The diff coverage is `n/a`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/beam/pull/12415/graphs/tree.svg?width=650&height=150&src=pr&token=qcbbAh8Fj1)](https://codecov.io/gh/apache/beam/pull/12415?src=pr&el=tree)
   
   ```diff
   @@            Coverage Diff             @@
   ##           master   #12415      +/-   ##
   ==========================================
   - Coverage   34.47%   34.38%   -0.09%     
   ==========================================
     Files         684      695      +11     
     Lines       81483    84498    +3015     
     Branches     9180     9891     +711     
   ==========================================
   + Hits        28090    29058     +968     
   - Misses      52972    54960    +1988     
   - Partials      421      480      +59     
   ```
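The headline (`0.08%`) and table (`-0.09%`) figures can both be reproduced from the hit/miss/partial counts above, assuming Codecov computes coverage as hits / (hits + misses + partials) and rounds down to two decimal places (its default `round: down`, `precision: 2` settings). This is a reconstruction from the numbers, not Codecov's implementation:

```python
import math
from fractions import Fraction


def coverage_hundredths(hits, misses, partials):
  """Coverage in hundredths of a percent, rounded down.

  Assumes coverage = hits / (hits + misses + partials), with partially
  covered lines counted as not covered.
  """
  lines = hits + misses + partials
  return (hits * 10000) // lines


base = coverage_hundredths(28090, 52972, 421)  # 3447, i.e. 34.47%
head = coverage_hundredths(29058, 54960, 480)  # 3438, i.e. 34.38%

# Table column: difference of the two already-rounded figures.
table_delta = head - base  # -9, i.e. -0.09%

# Headline: exact difference, then truncated toward zero.
exact = Fraction(29058, 84498) - Fraction(28090, 81483)
headline_delta = math.trunc(exact * 10000)  # -8, i.e. -0.08%
```

Under these assumptions the two figures differ only because one is rounded before subtracting and the other after.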
   
   
   | [Impacted Files](https://codecov.io/gh/apache/beam/pull/12415?src=pr&el=tree) | Coverage Δ | |
   |---|---|---|
   | [typehints/typecheck\_test\_py3.py](https://codecov.io/gh/apache/beam/pull/12415/diff?src=pr&el=tree#diff-dHlwZWhpbnRzL3R5cGVjaGVja190ZXN0X3B5My5weQ==) | `33.72% <0.00%> (-13.83%)` | :arrow_down: |
   | [typehints/typecheck.py](https://codecov.io/gh/apache/beam/pull/12415/diff?src=pr&el=tree#diff-dHlwZWhpbnRzL3R5cGVjaGVjay5weQ==) | `31.38% <0.00%> (-4.24%)` | :arrow_down: |
   | [io/jdbc.py](https://codecov.io/gh/apache/beam/pull/12415/diff?src=pr&el=tree#diff-aW8vamRiYy5weQ==) | `82.35% <0.00%> (-2.65%)` | :arrow_down: |
   | [utils/interactive\_utils.py](https://codecov.io/gh/apache/beam/pull/12415/diff?src=pr&el=tree#diff-dXRpbHMvaW50ZXJhY3RpdmVfdXRpbHMucHk=) | `30.95% <0.00%> (-2.39%)` | :arrow_down: |
   | [io/external/xlang\_jdbcio\_it\_test.py](https://codecov.io/gh/apache/beam/pull/12415/diff?src=pr&el=tree#diff-aW8vZXh0ZXJuYWwveGxhbmdfamRiY2lvX2l0X3Rlc3QucHk=) | `40.24% <0.00%> (-1.65%)` | :arrow_down: |
   | [io/mongodbio\_test.py](https://codecov.io/gh/apache/beam/pull/12415/diff?src=pr&el=tree#diff-aW8vbW9uZ29kYmlvX3Rlc3QucHk=) | `34.95% <0.00%> (ø)` | |
   | [runners/direct/direct\_runner.py](https://codecov.io/gh/apache/beam/pull/12415/diff?src=pr&el=tree#diff-cnVubmVycy9kaXJlY3QvZGlyZWN0X3J1bm5lci5weQ==) | `32.88% <0.00%> (ø)` | |
   | [io/gcp/experimental/spannerio.py](https://codecov.io/gh/apache/beam/pull/12415/diff?src=pr&el=tree#diff-aW8vZ2NwL2V4cGVyaW1lbnRhbC9zcGFubmVyaW8ucHk=) | `34.49% <0.00%> (ø)` | |
   | [runners/dataflow/dataflow\_runner\_test.py](https://codecov.io/gh/apache/beam/pull/12415/diff?src=pr&el=tree#diff-cnVubmVycy9kYXRhZmxvdy9kYXRhZmxvd19ydW5uZXJfdGVzdC5weQ==) | `25.49% <0.00%> (ø)` | |
   | [runners/dataflow/ptransform\_overrides.py](https://codecov.io/gh/apache/beam/pull/12415/diff?src=pr&el=tree#diff-cnVubmVycy9kYXRhZmxvdy9wdHJhbnNmb3JtX292ZXJyaWRlcy5weQ==) | `18.11% <0.00%> (ø)` | |
   | ... and [31 more](https://codecov.io/gh/apache/beam/pull/12415/diff?src=pr&el=tree-more) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/beam/pull/12415?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/beam/pull/12415?src=pr&el=footer). Last update [919ade7...42c5537](https://codecov.io/gh/apache/beam/pull/12415?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   





[GitHub] [beam] rohdesamuel commented on a change in pull request #12415: [BEAM-10603] Add the RecordingManager and associated classes.

Posted by GitBox <gi...@apache.org>.
rohdesamuel commented on a change in pull request #12415:
URL: https://github.com/apache/beam/pull/12415#discussion_r472558013



##########
File path: sdks/python/apache_beam/runners/interactive/interactive_runner.py
##########
@@ -152,7 +152,11 @@ def run_pipeline(self, pipeline, options):
               user_pipeline)):
         streaming_cache_manager = ie.current_env().get_cache_manager(
             user_pipeline)
-        if streaming_cache_manager:
+
+        # Only create the server if it doesn't exist already.
+        if (streaming_cache_manager and
+            not ie.current_env().get_test_stream_service_controller(

Review comment:
       No, the server is single-use. Once it stops, it can't be started again, and this should be set to None.
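A minimal sketch of the "create only if absent, clear on stop" pattern for a single-use server. `OneShotServer`, `ControllerRegistry`, `get_or_create`, and `evict` are hypothetical names; the real code goes through `ie.current_env().get_test_stream_service_controller(...)`, whose surrounding API is not shown here:

```python
class OneShotServer(object):
  """Hypothetical server that cannot be restarted once stopped."""

  def __init__(self):
    self.running = False
    self.stopped = False

  def start(self):
    if self.stopped:
      raise RuntimeError('single-use server was already stopped')
    self.running = True

  def stop(self):
    self.running = False
    self.stopped = True


class ControllerRegistry(object):
  """Hypothetical per-pipeline registry of servers."""

  def __init__(self):
    self._servers = {}

  def get_or_create(self, key):
    # Only create (and start) a server if none is registered for `key`.
    server = self._servers.get(key)
    if server is None:
      server = OneShotServer()
      server.start()
      self._servers[key] = server
    return server

  def evict(self, key):
    # A stopped server is unusable, so drop the reference (the equivalent of
    # setting it to None) so the next get_or_create() builds a fresh one.
    server = self._servers.pop(key, None)
    if server is not None:
      server.stop()


registry = ControllerRegistry()
server_a = registry.get_or_create('pipeline-1')
server_b = registry.get_or_create('pipeline-1')  # reuses the running server
registry.evict('pipeline-1')
server_c = registry.get_or_create('pipeline-1')  # fresh server after eviction
```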







[GitHub] [beam] rohdesamuel commented on pull request #12415: [BEAM-10603] Add the RecordingManager and associated classes.

Posted by GitBox <gi...@apache.org>.
rohdesamuel commented on pull request #12415:
URL: https://github.com/apache/beam/pull/12415#issuecomment-675766552


   Run Portable_Python PreCommit





[GitHub] [beam] rohdesamuel commented on a change in pull request #12415: [BEAM-10603] Add the RecordingManager and associated classes.

Posted by GitBox <gi...@apache.org>.
rohdesamuel commented on a change in pull request #12415:
URL: https://github.com/apache/beam/pull/12415#discussion_r474847078



##########
File path: sdks/python/apache_beam/runners/interactive/recording_manager.py
##########
@@ -0,0 +1,334 @@
+from __future__ import absolute_import
+
+import logging
+import threading
+import time
+import warnings
+
+import apache_beam as beam
+from apache_beam.runners.interactive import background_caching_job as bcj
+from apache_beam.runners.interactive import interactive_environment as ie
+from apache_beam.runners.interactive import interactive_runner as ir
+from apache_beam.runners.interactive import pipeline_fragment as pf
+from apache_beam.runners.interactive import pipeline_instrument as pi
+from apache_beam.runners.interactive import utils
+from apache_beam.runners.interactive.options.capture_limiters import CountLimiter
+from apache_beam.runners.interactive.options.capture_limiters import ProcessingTimeLimiter
+
+_LOGGER = logging.getLogger(__name__)
+
+PipelineState = beam.runners.runner.PipelineState
+
+
+class ElementStream:
+  """A stream of elements from a given PCollection."""
+  def __init__(
+      self,
+      pcoll,  # type: beam.pvalue.PCollection
+      var,  # type: str
+      cache_key,  # type: str
+      max_n,  # type: int
+      max_duration_secs  # type: float
+      ):
+    self._pcoll = pcoll
+    self._cache_key = cache_key
+    self._pipeline = pcoll.pipeline
+    self._var = var
+    self._n = max_n
+    self._duration_secs = max_duration_secs
+
+    # A small state variable that, when True, indicates that no more new elements
+    # will be yielded if read() is called again.
+    self._done = False
+
+  def var(self):
+    # type: () -> str
+
+    """Returns the name of the variable that defined this PCollection."""
+    return self._var
+
+  def display_id(self, suffix):
+    # type: (str) -> str
+
+    """Returns a unique id able to be displayed in a web browser."""
+    return utils.obfuscate(self._cache_key, suffix)
+
+  def is_computed(self):
+    # type: () -> bool
+
+    """Returns True if no more elements will be recorded."""
+    return self._pcoll in ie.current_env().computed_pcollections
+
+  def is_done(self):
+    # type: () -> boolean
+
+    """Returns True if no more new elements will be yielded."""
+    return self._done
+
+  def read(self, tail=True):
+    # type: (boolean) -> Any
+
+    """Reads the elements currently recorded."""
+
+    # Get the cache manager and wait until the file exists.
+    cache_manager = ie.current_env().get_cache_manager(self._pipeline)
+    while not cache_manager.exists('full', self._cache_key):
+      time.sleep(0.5)

Review comment:
       I looked into the code more deeply, and the underlying cache waits for the file to exist. Also, before pushing the PR I ran all of the tests in 100 different processes simultaneously. That's usually the best way to simulate the Jenkins environment.
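The revised loop quoted above replaces the bare `pass` with `time.sleep(0.5)`. That stops it from spinning the CPU, but it can still block forever if the cache file never appears. Below is a minimal sketch of a bounded wait. It assumes a hypothetical helper, `wait_until`, that takes a zero-argument predicate such as `lambda: cache_manager.exists('full', cache_key)`. The helper name, its parameters, and the choice to raise `TimeoutError` are illustrative assumptions, not Beam APIs. The sketch assumes Python 3.

```python
import time


def wait_until(predicate, timeout_secs=30.0, poll_interval_secs=0.5):
  """Polls `predicate` until it returns True or `timeout_secs` elapses.

  Hypothetical helper (not part of Beam): sleeping between checks keeps the
  loop from hogging the CPU, and the deadline keeps it from blocking forever.
  """
  deadline = time.monotonic() + timeout_secs
  while not predicate():
    remaining = deadline - time.monotonic()
    if remaining <= 0:
      raise TimeoutError(
          'Condition not met within {} seconds'.format(timeout_secs))
    # Never sleep past the deadline.
    time.sleep(min(poll_interval_secs, remaining))
```

In `read()` this would be called as, for example, `wait_until(lambda: cache_manager.exists('full', self._cache_key), timeout_secs=60)`. Raising instead of returning makes a missing cache fail loudly rather than hang a notebook cell.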




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] KevinGG commented on a change in pull request #12415: [BEAM-10603] Add the RecordingManager and associated classes.

Posted by GitBox <gi...@apache.org>.
KevinGG commented on a change in pull request #12415:
URL: https://github.com/apache/beam/pull/12415#discussion_r473417825



##########
File path: sdks/python/apache_beam/runners/interactive/recording_manager.py
##########
@@ -0,0 +1,334 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from __future__ import absolute_import
+
+import logging
+import threading
+import time
+import warnings
+
+import apache_beam as beam
+from apache_beam.runners.interactive import background_caching_job as bcj
+from apache_beam.runners.interactive import interactive_environment as ie
+from apache_beam.runners.interactive import interactive_runner as ir
+from apache_beam.runners.interactive import pipeline_fragment as pf
+from apache_beam.runners.interactive import pipeline_instrument as pi
+from apache_beam.runners.interactive import utils
+from apache_beam.runners.interactive.options.capture_limiters import CountLimiter
+from apache_beam.runners.interactive.options.capture_limiters import ProcessingTimeLimiter
+
+_LOGGER = logging.getLogger(__name__)
+
+PipelineState = beam.runners.runner.PipelineState
+
+
+class ElementStream:
+  """A stream of elements from a given PCollection."""
+  def __init__(
+      self,
+      pcoll,  # type: beam.pvalue.PCollection
+      var,  # type: str
+      cache_key,  # type: str
+      max_n,  # type: int
+      max_duration_secs  # type: float
+      ):
+    self._pcoll = pcoll
+    self._cache_key = cache_key
+    self._pipeline = pcoll.pipeline
+    self._var = var
+    self._n = max_n
+    self._duration_secs = max_duration_secs
+
+    # A small state variable that when True, indicates that no more new elements
+    # will be yielded if read() is called again.
+    self._done = False
+
+  def var(self):
+    # type: () -> str
+
+    """Returns the variable named that defined this PCollection."""
+    return self._var
+
+  def display_id(self, suffix):
+    #Any type: (str) -> str

Review comment:
       Is the `Any` some special type hint or a typo?
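For reference, a PEP 484 type comment must begin with `# type:`. A comment such as `#Any type: (str) -> str` is therefore not treated as a type comment by checkers like mypy; it is just an ordinary comment. The quoted code has a related problem: `boolean` is not a builtin name, so `# type: () -> boolean` would be reported as an undefined name. The builtin type is `bool`.

Below is a minimal sketch of the conventional form. `ElementStreamSketch` and its method bodies are placeholders, not the PR's code. Names such as `Any` or `Iterator` that appear inside type comments still need to be imported from `typing` for a checker to resolve them.

```python
from typing import Any, Iterator


class ElementStreamSketch(object):
  """Hypothetical placeholder class showing PEP 484 type-comment syntax."""

  def __init__(self, cache_key):
    # type: (str) -> None
    self._cache_key = cache_key
    self._done = False

  def display_id(self, suffix):
    # type: (str) -> str
    """The type comment goes on the line right after `def`, with no prefix."""
    # Placeholder body; the PR uses utils.obfuscate here.
    return '{}-{}'.format(self._cache_key, suffix)

  def is_done(self):
    # type: () -> bool
    """`bool` is the builtin type name; `boolean` is undefined."""
    return self._done

  def read(self, tail=True):
    # type: (bool) -> Iterator[Any]
    """A generator can be annotated as an Iterator of its element type."""
    yield self._cache_key
```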







[GitHub] [beam] rohdesamuel commented on a change in pull request #12415: [BEAM-10603] Add the RecordingManager and associated classes.

Posted by GitBox <gi...@apache.org>.
rohdesamuel commented on a change in pull request #12415:
URL: https://github.com/apache/beam/pull/12415#discussion_r474821997



##########
File path: sdks/python/apache_beam/runners/interactive/recording_manager.py
##########
@@ -0,0 +1,334 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from __future__ import absolute_import
+
+import logging
+import threading
+import time
+import warnings
+
+import apache_beam as beam
+from apache_beam.runners.interactive import background_caching_job as bcj
+from apache_beam.runners.interactive import interactive_environment as ie
+from apache_beam.runners.interactive import interactive_runner as ir
+from apache_beam.runners.interactive import pipeline_fragment as pf
+from apache_beam.runners.interactive import pipeline_instrument as pi
+from apache_beam.runners.interactive import utils
+from apache_beam.runners.interactive.options.capture_limiters import CountLimiter
+from apache_beam.runners.interactive.options.capture_limiters import ProcessingTimeLimiter
+
+_LOGGER = logging.getLogger(__name__)
+
+PipelineState = beam.runners.runner.PipelineState

Review comment:
       I was just being inadvertently silly here; changed it to an import.







[GitHub] [beam] rohdesamuel commented on pull request #12415: [BEAM-10603] Add the RecordingManager and associated classes.

Posted by GitBox <gi...@apache.org>.
rohdesamuel commented on pull request #12415:
URL: https://github.com/apache/beam/pull/12415#issuecomment-676599665


   Run Python PreCommit





[GitHub] [beam] davidyan74 commented on a change in pull request #12415: [BEAM-10603] Add the RecordingManager and associated classes.

Posted by GitBox <gi...@apache.org>.
davidyan74 commented on a change in pull request #12415:
URL: https://github.com/apache/beam/pull/12415#discussion_r472548446



##########
File path: sdks/python/apache_beam/runners/interactive/interactive_runner.py
##########
@@ -152,7 +152,11 @@ def run_pipeline(self, pipeline, options):
               user_pipeline)):
         streaming_cache_manager = ie.current_env().get_cache_manager(
             user_pipeline)
-        if streaming_cache_manager:
+
+        # Only make the server if it doens't exist already.
+        if (streaming_cache_manager and
+            not ie.current_env().get_test_stream_service_controller(

Review comment:
       Just wondering: what if the service controller has stopped? Is that possible?
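One way to handle this case is to treat a stopped controller the same as a missing one and recreate it. Below is a minimal get-or-create sketch. It assumes a hypothetical controller object with `start()`, `stop()`, and an `is_running` flag. `FakeController` and `ControllerRegistry` are illustrative names, not the actual interactive environment API.

```python
import threading


class FakeController(object):
  """Hypothetical stand-in for a test stream service controller."""

  def __init__(self):
    self.is_running = False

  def start(self):
    self.is_running = True

  def stop(self):
    self.is_running = False


class ControllerRegistry(object):
  """Hypothetical registry keeping at most one running controller per key."""

  def __init__(self, factory):
    self._factory = factory
    self._controllers = {}
    self._lock = threading.Lock()

  def get_or_create(self, key):
    # The lock prevents two threads from both creating a controller.
    with self._lock:
      controller = self._controllers.get(key)
      # Recreate the controller if it was never made or has since stopped.
      if controller is None or not controller.is_running:
        controller = self._factory()
        controller.start()
        self._controllers[key] = controller
      return controller
```

With this shape, the `if` in `run_pipeline` would check the controller's liveness rather than only its existence.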

##########
File path: sdks/python/apache_beam/runners/interactive/recording_manager.py
##########
@@ -0,0 +1,330 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from __future__ import absolute_import
+
+import logging
+import threading
+import time
+import warnings
+
+import apache_beam as beam
+from apache_beam.runners.interactive import background_caching_job as bcj
+from apache_beam.runners.interactive import interactive_environment as ie
+from apache_beam.runners.interactive import interactive_runner as ir
+from apache_beam.runners.interactive import pipeline_fragment as pf
+from apache_beam.runners.interactive import pipeline_instrument as pi
+from apache_beam.runners.interactive import utils
+from apache_beam.runners.interactive.options.capture_limiters import CountLimiter
+from apache_beam.runners.interactive.options.capture_limiters import ProcessingTimeLimiter
+
+_LOGGER = logging.getLogger(__name__)
+
+PipelineState = beam.runners.runner.PipelineState
+
+
+class ElementStream:
+  """A stream of elements from a given PCollection."""
+  def __init__(
+      self,
+      pcoll,  # type: beam.pvalue.PCollection
+      var,  # type: str
+      cache_key,  # type: str
+      max_n,  # type: int
+      max_duration_secs  # type: float
+      ):
+    self._pcoll = pcoll
+    self._cache_key = cache_key
+    self._pipeline = pcoll.pipeline
+    self._var = var
+    self._n = max_n
+    self._duration_secs = max_duration_secs
+
+    # A small state variable that when True, indicates that no more new elements
+    # will be yielded if read() is called again.
+    self._done = False
+
+  def var(self):
+    # type: () -> str
+
+    """Returns the variable named that defined this PCollection."""
+    return self._var
+
+  def display_id(self, suffix):
+    #Any type: (str) -> str
+
+    """Returns a unique id able to be displayed in a web browser."""
+    return utils.obfuscate(self._cache_key, suffix)
+
+  def is_computed(self):
+    # type: () -> boolean
+
+    """Returns True if no more elements will be recorded."""
+    return self._pcoll in ie.current_env().computed_pcollections
+
+  def is_done(self):
+    # type: () -> boolean
+
+    """Returns True if no more new elements will be yielded."""
+    return self._done
+
+  def read(self, tail=True):
+    # type: (boolean) -> Any
+
+    """Reads the elements currently recorded."""
+
+    # Get the cache manager and wait until the file exists.
+    cache_manager = ie.current_env().get_cache_manager(self._pipeline)
+    while not cache_manager.exists('full', self._cache_key):
+      pass
+
+    # Retrieve the coder for the particular PCollection which will be used to
+    # decode elements read from cache.
+    coder = cache_manager.load_pcoder('full', self._cache_key)
+
+    # Read the elements from the cache.
+    limiters = [
+        CountLimiter(self._n), ProcessingTimeLimiter(self._duration_secs)
+    ]
+    if hasattr(cache_manager, 'read_multiple'):
+      reader = cache_manager.read_multiple([('full', self._cache_key)],
+                                           limiters=limiters,
+                                           tail=tail)
+    else:
+      reader, _ = cache_manager.read('full', self._cache_key, limiters=limiters)
+
+    # Because a single TestStreamFileRecord can yield multiple elements, we
+    # limit the count again here in the to_element_list call.
+    for e in utils.to_element_list(reader,
+                                   coder,
+                                   include_window_info=True,
+                                   n=self._n):
+      yield e
+
+    # A limiter being triggered means that we have fulfilled the user's request.
+    # This implies that reading from the cache again won't yield any new
+    # elements. WLOG, this applies to the user pipeline being terminated.
+    if any(l.is_triggered()
+           for l in limiters) or ie.current_env().is_terminated(self._pipeline):
+      self._done = True
+
+
+class Recording:
+  """A group of PCollections from a given pipeline run."""
+  def __init__(
+      self,
+      user_pipeline,  # type: beam.Pipeline
+      pcolls,  # type: List[beam.pvalue.PCollection]
+      result,  # type: beam.runner.PipelineResult
+      pipeline_instrument,  # type: beam.runners.interactive.PipelineInstrument
+      max_n,  # type: int
+      max_duration_secs  # type: float
+      ):
+
+    self._user_pipeline = user_pipeline
+    self._result = result
+    self._pcolls = pcolls
+
+    pcoll_var = lambda pcoll: pipeline_instrument.cacheable_var_by_pcoll_id(
+        pipeline_instrument.pcolls_to_pcoll_id.get(str(pcoll), None))
+
+    self._streams = {
+        pcoll: ElementStream(
+            pcoll,
+            pcoll_var(pcoll),
+            pipeline_instrument.cache_key(pcoll),
+            max_n,
+            max_duration_secs)
+        for pcoll in pcolls
+    }
+    self._start = time.time()
+    self._duration_secs = max_duration_secs
+    self._set_computed = bcj.is_cache_complete(str(id(user_pipeline)))
+
+    # Run a separate thread for marking the PCollections done. This is because
+    # the pipeline run may be asynchronous.
+    self._mark_computed = threading.Thread(target=self._mark_all_computed)
+    self._mark_computed.daemon = True
+    self._mark_computed.start()
+
+  def _mark_all_computed(self):
+    # type: () -> None
+
+    """Marks all the PCollections upon a successful pipeline run."""
+    if not self._result:
+      return
+
+    while not PipelineState.is_terminal(self._result.state):

Review comment:
       This also looks like busy waiting.
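Polling `self._result.state` in a tight loop spins the CPU for the entire run. Beam's `PipelineResult` exposes a blocking `wait_until_finish()`, and calling it from the background thread may avoid polling altogether. Where polling is unavoidable, exponential backoff keeps it cheap.

Below is a minimal sketch. It assumes hypothetical `get_state` and `is_terminal` callables, for example `lambda: result.state` and `PipelineState.is_terminal`. The function name and default delays are illustrative.

```python
import time


def wait_for_terminal_state(
    get_state,
    is_terminal,
    initial_delay_secs=0.1,
    max_delay_secs=5.0,
    timeout_secs=None):
  """Hypothetical helper: polls get_state() with exponential backoff.

  Returns the first state for which is_terminal(state) is True, or None if
  timeout_secs elapses first (timeout_secs=None waits indefinitely).
  """
  deadline = None if timeout_secs is None else time.monotonic() + timeout_secs
  delay = initial_delay_secs
  while True:
    state = get_state()
    if is_terminal(state):
      return state
    if deadline is not None and time.monotonic() >= deadline:
      return None
    time.sleep(delay)
    # Double the wait each time, capped so that a finished pipeline is
    # noticed within roughly max_delay_secs.
    delay = min(delay * 2, max_delay_secs)
```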

##########
File path: sdks/python/apache_beam/runners/interactive/interactive_runner.py
##########
@@ -152,7 +152,11 @@ def run_pipeline(self, pipeline, options):
               user_pipeline)):
         streaming_cache_manager = ie.current_env().get_cache_manager(
             user_pipeline)
-        if streaming_cache_manager:
+
+        # Only make the server if it doens't exist already.

Review comment:
       typo: doesn't

##########
File path: sdks/python/apache_beam/runners/interactive/recording_manager.py
##########
@@ -0,0 +1,330 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from __future__ import absolute_import
+
+import logging
+import threading
+import time
+import warnings
+
+import apache_beam as beam
+from apache_beam.runners.interactive import background_caching_job as bcj
+from apache_beam.runners.interactive import interactive_environment as ie
+from apache_beam.runners.interactive import interactive_runner as ir
+from apache_beam.runners.interactive import pipeline_fragment as pf
+from apache_beam.runners.interactive import pipeline_instrument as pi
+from apache_beam.runners.interactive import utils
+from apache_beam.runners.interactive.options.capture_limiters import CountLimiter
+from apache_beam.runners.interactive.options.capture_limiters import ProcessingTimeLimiter
+
+_LOGGER = logging.getLogger(__name__)
+
+PipelineState = beam.runners.runner.PipelineState
+
+
+class ElementStream:
+  """A stream of elements from a given PCollection."""
+  def __init__(
+      self,
+      pcoll,  # type: beam.pvalue.PCollection
+      var,  # type: str
+      cache_key,  # type: str
+      max_n,  # type: int
+      max_duration_secs  # type: float
+      ):
+    self._pcoll = pcoll
+    self._cache_key = cache_key
+    self._pipeline = pcoll.pipeline
+    self._var = var
+    self._n = max_n
+    self._duration_secs = max_duration_secs
+
+    # A small state variable that when True, indicates that no more new elements
+    # will be yielded if read() is called again.
+    self._done = False
+
+  def var(self):
+    # type: () -> str
+
+    """Returns the variable named that defined this PCollection."""
+    return self._var
+
+  def display_id(self, suffix):
+    #Any type: (str) -> str
+
+    """Returns a unique id able to be displayed in a web browser."""
+    return utils.obfuscate(self._cache_key, suffix)
+
+  def is_computed(self):
+    # type: () -> boolean
+
+    """Returns True if no more elements will be recorded."""
+    return self._pcoll in ie.current_env().computed_pcollections
+
+  def is_done(self):
+    # type: () -> boolean
+
+    """Returns True if no more new elements will be yielded."""
+    return self._done
+
+  def read(self, tail=True):
+    # type: (boolean) -> Any
+
+    """Reads the elements currently recorded."""
+
+    # Get the cache manager and wait until the file exists.
+    cache_manager = ie.current_env().get_cache_manager(self._pipeline)
+    while not cache_manager.exists('full', self._cache_key):
+      pass
+
+    # Retrieve the coder for the particular PCollection which will be used to
+    # decode elements read from cache.
+    coder = cache_manager.load_pcoder('full', self._cache_key)
+
+    # Read the elements from the cache.
+    limiters = [
+        CountLimiter(self._n), ProcessingTimeLimiter(self._duration_secs)
+    ]
+    if hasattr(cache_manager, 'read_multiple'):

Review comment:
       This looks like a hack. Is it possible to add `read_multiple` to `CacheManager` itself, and to `FileBasedCacheManager` as well?
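Below is a minimal sketch of the suggestion. The base class declares `read_multiple` with a default implementation that falls back to reading each label in turn, so callers no longer need a `hasattr` check. The class names, the `(reader, version)` return shape of `read`, and the in-memory storage are hypothetical simplifications modeled on the quoted calls. The real `CacheManager` methods also take `limiters` and `tail`, and their signatures may differ.

```python
import itertools


class CacheManagerSketch(object):
  """Hypothetical base class in which read_multiple has a default."""

  def read(self, *labels):
    """Returns a (reader, version) pair. Subclasses must implement this."""
    raise NotImplementedError

  def read_multiple(self, labels_list):
    """Default: chains the single-label readers one after another.

    A subclass that can interleave several caches (for example, by
    timestamp) can override this with a smarter implementation.
    """
    readers = [self.read(*labels)[0] for labels in labels_list]
    return itertools.chain.from_iterable(readers)


class InMemoryCacheSketch(CacheManagerSketch):
  """Hypothetical file-free cache that relies on the inherited default."""

  def __init__(self):
    self._data = {}

  def write(self, values, *labels):
    self._data.setdefault(labels, []).extend(values)

  def read(self, *labels):
    values = self._data.get(labels, [])
    return iter(values), len(values)
```

`ElementStream.read` could then always call `cache_manager.read_multiple([('full', self._cache_key)], ...)`, regardless of which cache manager is installed.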

##########
File path: sdks/python/apache_beam/runners/interactive/recording_manager.py
##########
@@ -0,0 +1,330 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from __future__ import absolute_import
+
+import logging
+import threading
+import time
+import warnings
+
+import apache_beam as beam
+from apache_beam.runners.interactive import background_caching_job as bcj
+from apache_beam.runners.interactive import interactive_environment as ie
+from apache_beam.runners.interactive import interactive_runner as ir
+from apache_beam.runners.interactive import pipeline_fragment as pf
+from apache_beam.runners.interactive import pipeline_instrument as pi
+from apache_beam.runners.interactive import utils
+from apache_beam.runners.interactive.options.capture_limiters import CountLimiter
+from apache_beam.runners.interactive.options.capture_limiters import ProcessingTimeLimiter
+
+_LOGGER = logging.getLogger(__name__)
+
+PipelineState = beam.runners.runner.PipelineState
+
+
+class ElementStream:
+  """A stream of elements from a given PCollection."""
+  def __init__(
+      self,
+      pcoll,  # type: beam.pvalue.PCollection
+      var,  # type: str
+      cache_key,  # type: str
+      max_n,  # type: int
+      max_duration_secs  # type: float
+      ):
+    self._pcoll = pcoll
+    self._cache_key = cache_key
+    self._pipeline = pcoll.pipeline
+    self._var = var
+    self._n = max_n
+    self._duration_secs = max_duration_secs
+
+    # A small state variable that when True, indicates that no more new elements
+    # will be yielded if read() is called again.
+    self._done = False
+
+  def var(self):
+    # type: () -> str
+
+    """Returns the variable named that defined this PCollection."""
+    return self._var
+
+  def display_id(self, suffix):
+    #Any type: (str) -> str
+
+    """Returns a unique id able to be displayed in a web browser."""
+    return utils.obfuscate(self._cache_key, suffix)
+
+  def is_computed(self):
+    # type: () -> boolean
+
+    """Returns True if no more elements will be recorded."""
+    return self._pcoll in ie.current_env().computed_pcollections
+
+  def is_done(self):
+    # type: () -> boolean
+
+    """Returns True if no more new elements will be yielded."""
+    return self._done
+
+  def read(self, tail=True):
+    # type: (boolean) -> Any
+
+    """Reads the elements currently recorded."""
+
+    # Get the cache manager and wait until the file exists.
+    cache_manager = ie.current_env().get_cache_manager(self._pipeline)
+    while not cache_manager.exists('full', self._cache_key):
+      pass

Review comment:
       This looks like busy waiting, which can hog the CPU. Can we do it another way? Or at least have a sleep in the while loop?
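As an alternative to polling, the cache writer can notify readers when a key becomes available. Below is a minimal sketch using `threading.Condition.wait_for`, which blocks without consuming CPU and supports a timeout. The `CacheReadySignal` class, and the assumption that the cache writer calls `mark_ready`, are hypothetical. This approach only works when the writer and the reader share a process. If the cache file is written by a separate worker process, the reader still needs to poll, ideally with a sleep and a timeout.

```python
import threading


class CacheReadySignal(object):
  """Hypothetical notifier: writers mark keys ready and readers block on them."""

  def __init__(self):
    self._ready = set()
    self._cond = threading.Condition()

  def mark_ready(self, cache_key):
    with self._cond:
      self._ready.add(cache_key)
      # Wake every waiting reader so each one can re-check its own key.
      self._cond.notify_all()

  def wait_ready(self, cache_key, timeout_secs=None):
    """Blocks until cache_key is marked ready; returns False on timeout."""
    with self._cond:
      # wait_for returns the predicate's last value: True once ready,
      # False if the timeout expired first.
      return self._cond.wait_for(
          lambda: cache_key in self._ready, timeout=timeout_secs)
```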







[GitHub] [beam] pabloem commented on pull request #12415: [BEAM-10603] Add the RecordingManager and associated classes.

Posted by GitBox <gi...@apache.org>.
pabloem commented on pull request #12415:
URL: https://github.com/apache/beam/pull/12415#issuecomment-679286776


   Run Python PreCommit





[GitHub] [beam] rohdesamuel commented on a change in pull request #12415: [BEAM-10603] Add the RecordingManager and associated classes.

Posted by GitBox <gi...@apache.org>.
rohdesamuel commented on a change in pull request #12415:
URL: https://github.com/apache/beam/pull/12415#discussion_r472561829



##########
File path: sdks/python/apache_beam/runners/interactive/interactive_runner.py
##########
@@ -152,7 +152,11 @@ def run_pipeline(self, pipeline, options):
               user_pipeline)):
         streaming_cache_manager = ie.current_env().get_cache_manager(
             user_pipeline)
-        if streaming_cache_manager:
+
+        # Only make the server if it doens't exist already.

Review comment:
       Done







[GitHub] [beam] rohdesamuel commented on a change in pull request #12415: [BEAM-10603] Add the RecordingManager and associated classes.

Posted by GitBox <gi...@apache.org>.
rohdesamuel commented on a change in pull request #12415:
URL: https://github.com/apache/beam/pull/12415#discussion_r474823799



##########
File path: sdks/python/apache_beam/runners/interactive/recording_manager_test.py
##########
@@ -0,0 +1,301 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#  http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from __future__ import absolute_import
+
+import sys
+import unittest
+
+import apache_beam as beam
+from apache_beam import coders
+from apache_beam.portability.api.beam_interactive_api_pb2 import TestStreamFileRecord
+from apache_beam.runners.interactive import background_caching_job as bcj
+from apache_beam.runners.interactive import interactive_beam as ib
+from apache_beam.runners.interactive import interactive_environment as ie
+from apache_beam.runners.interactive import pipeline_instrument as pi
+from apache_beam.runners.interactive.interactive_runner import InteractiveRunner
+from apache_beam.runners.interactive.recording_manager import ElementStream
+from apache_beam.runners.interactive.recording_manager import Recording
+from apache_beam.runners.interactive.recording_manager import RecordingManager
+from apache_beam.runners.interactive.testing.test_cache_manager import FileRecordsBuilder
+from apache_beam.runners.interactive.testing.test_cache_manager import InMemoryCache
+from apache_beam.transforms.window import GlobalWindow
+from apache_beam.utils.timestamp import MIN_TIMESTAMP
+from apache_beam.utils.windowed_value import WindowedValue
+
+PipelineState = beam.runners.runner.PipelineState
+
+
+class MockPipelineResult(beam.runners.runner.PipelineResult):
+  """Mock class for controlling a PipelineResult."""
+  def __init__(self):
+    self._state = PipelineState.RUNNING
+
+  def wait_until_finish(self):
+    pass
+
+  def set_state(self, state):
+    self._state = state
+
+  @property
+  def state(self):
+    return self._state
+
+  def cancel(self):
+    self._state = PipelineState.CANCELLED
+
+
+class ElementStreamTest(unittest.TestCase):
+  def setUp(self):
+    ie.new_env()
+
+    self.cache = InMemoryCache()
+    self.p = beam.Pipeline()
+    self.pcoll = self.p | beam.Create([])
+    self.cache_key = str(pi.CacheKey('pcoll', '', '', ''))
+
+    # Create a MockPipelineResult to control the state of a fake run of the
+    # pipeline.
+    self.mock_result = MockPipelineResult()
+    ie.current_env().track_user_pipelines()
+    ie.current_env().set_pipeline_result(self.p, self.mock_result)
+    ie.current_env().set_cache_manager(self.cache, self.p)
+
+  def test_read(self):
+    """Test reading and if a stream is done no more elements are returned."""
+
+    self.mock_result.set_state(PipelineState.DONE)
+    self.cache.write(['expected'], 'full', self.cache_key)
+    self.cache.save_pcoder(None, 'full', self.cache_key)
+
+    stream = ElementStream(
+        self.pcoll, '', self.cache_key, max_n=1, max_duration_secs=1)
+
+    self.assertFalse(stream.is_done())
+    self.assertEqual(list(stream.read())[0], 'expected')
+    self.assertTrue(stream.is_done())
+
+  def test_done_if_terminated(self):
+    """Test that terminating the job sets the stream as done."""
+
+    self.cache.write(['expected'], 'full', self.cache_key)
+    self.cache.save_pcoder(None, 'full', self.cache_key)
+
+    stream = ElementStream(
+        self.pcoll, '', self.cache_key, max_n=100, max_duration_secs=10)
+
+    self.assertFalse(stream.is_done())
+    self.assertEqual(list(stream.read(tail=False))[0], 'expected')
+
+    # The limiters were not reached, so the stream is not done yet.
+    self.assertFalse(stream.is_done())
+
+    self.mock_result.set_state(PipelineState.DONE)
+    self.assertEqual(list(stream.read(tail=False))[0], 'expected')
+
+    # The underlying pipeline is terminated, so the stream won't yield new
+    # elements.
+    self.assertTrue(stream.is_done())
+
+  def test_read_n(self):
+    """Test that the stream only reads 'n' elements."""
+
+    self.mock_result.set_state(PipelineState.DONE)
+    self.cache.write(list(range(5)), 'full', self.cache_key)
+    self.cache.save_pcoder(None, 'full', self.cache_key)
+
+    stream = ElementStream(
+        self.pcoll, '', self.cache_key, max_n=1, max_duration_secs=1)
+    self.assertEqual(list(stream.read()), [0])
+    self.assertTrue(stream.is_done())
+
+    stream = ElementStream(
+        self.pcoll, '', self.cache_key, max_n=2, max_duration_secs=1)
+    self.assertEqual(list(stream.read()), [0, 1])
+    self.assertTrue(stream.is_done())
+
+    stream = ElementStream(
+        self.pcoll, '', self.cache_key, max_n=5, max_duration_secs=1)
+    self.assertEqual(list(stream.read()), list(range(5)))
+    self.assertTrue(stream.is_done())
+
+    # Test that if the user asks for more than in the cache it still returns.
+    stream = ElementStream(
+        self.pcoll, '', self.cache_key, max_n=10, max_duration_secs=1)
+    self.assertEqual(list(stream.read()), list(range(5)))
+    self.assertTrue(stream.is_done())
+
+  def test_read_duration(self):
+    """Test that the stream only reads a 'duration' of elements."""
+
+    values = (FileRecordsBuilder(tag=self.cache_key)
+              .advance_processing_time(1)
+              .add_element(element=0, event_time_secs=0)
+              .advance_processing_time(1)
+              .add_element(element=1, event_time_secs=1)
+              .advance_processing_time(1)
+              .add_element(element=2, event_time_secs=3)
+              .advance_processing_time(1)
+              .add_element(element=3, event_time_secs=4)
+              .advance_processing_time(1)
+              .add_element(element=4, event_time_secs=5)
+              .build()) # yapf: disable
+
+    self.mock_result.set_state(PipelineState.DONE)
+    self.cache.write(values, 'full', self.cache_key)
+    self.cache.save_pcoder(None, 'full', self.cache_key)
+
+    # The elements read from the cache are TestStreamFileRecord instances and
+    # have the underlying elements encoded. This method decodes the elements
+    # from the TestStreamFileRecord.
+    def get_elements(events):
+      coder = coders.FastPrimitivesCoder()
+      elements = []
+      for e in events:
+        if not isinstance(e, TestStreamFileRecord):
+          continue
+
+        if e.recorded_event.element_event:
+          elements += ([
+              coder.decode(el.encoded_element)
+              for el in e.recorded_event.element_event.elements
+          ])
+      return elements
+
+    # The following tests a progression of reading different durations from the
+    # cache.
+    stream = ElementStream(
+        self.pcoll, '', self.cache_key, max_n=100, max_duration_secs=1)
+    self.assertSequenceEqual(get_elements(stream.read()), [0])
+
+    stream = ElementStream(
+        self.pcoll, '', self.cache_key, max_n=100, max_duration_secs=2)
+    self.assertSequenceEqual(get_elements(stream.read()), [0, 1])
+
+    stream = ElementStream(
+        self.pcoll, '', self.cache_key, max_n=100, max_duration_secs=10)
+    self.assertSequenceEqual(get_elements(stream.read()), [0, 1, 2, 3, 4])
+
+
+class RecordingTest(unittest.TestCase):
+  def setUp(self):
+    ie.new_env()
+
+  @unittest.skipIf(
+      sys.version_info < (3, 6, 0),
+      'This test requires at least Python 3.6 to work.')
+  def test_computed(self):
+    """Tests that a PCollection is marked as computed only in a complete state.
+
+    Because the background caching job is now long-lived, repeated runs of a
+    PipelineFragment may yield different results for the same PCollection.
+    """
+
+    p = beam.Pipeline(InteractiveRunner())
+    elems = p | beam.Create([0, 1, 2])
+
+    ib.watch(locals())
+
+    # Create a MockPipelineResult to control the state of a fake run of the
+    # pipeline.
+    mock_result = MockPipelineResult()
+    ie.current_env().track_user_pipelines()
+    ie.current_env().set_pipeline_result(p, mock_result)
+
+    # Create a mock BackgroundCachingJob that will control whether to set the
+    # PCollections as computed or not.
+    bcj_mock_result = MockPipelineResult()
+    background_caching_job = bcj.BackgroundCachingJob(bcj_mock_result, [])
+
+    # Create a recording.
+    recording = Recording(
+        p, [elems],
+        mock_result,
+        pi.PipelineInstrument(p),
+        max_n=10,
+        max_duration_secs=60)
+
+    # Neither the background caching job nor the recording is done yet, so
+    # there may be more elements to be recorded.
+    self.assertFalse(recording.is_computed())
+    self.assertFalse(recording.computed())
+    self.assertTrue(recording.uncomputed())
+
+    # The recording is finished but the background caching job is not. There
+    # may still be more elements to record, or the intermediate PCollection may
+    # have stopped caching in an incomplete state, e.g. before a window could
+    # fire.
+    mock_result.set_state(PipelineState.DONE)
+    recording.wait_until_finish()
+
+    self.assertFalse(recording.is_computed())
+    self.assertFalse(recording.computed())
+    self.assertTrue(recording.uncomputed())
+
+    # The background caching job finished before we started a recording,
+    # which is a sure signal that there will be no more elements.
+    bcj_mock_result.set_state(PipelineState.DONE)
+    ie.current_env().set_background_caching_job(p, background_caching_job)
+    recording = Recording(
+        p, [elems],
+        mock_result,
+        pi.PipelineInstrument(p),
+        max_n=10,
+        max_duration_secs=60)
+    recording.wait_until_finish()
+
+    # There are no more elements and the recording finished, meaning that the
+    # intermediate PCollections are in a complete state. They can now be marked
+    # as computed.
+    self.assertTrue(recording.is_computed())
+    self.assertTrue(recording.computed())
+    self.assertFalse(recording.uncomputed())
+
+
+class RecordingManagerTest(unittest.TestCase):
+  def setUp(self):
+    ie.new_env()
+
+  @unittest.skipIf(
+      sys.version_info < (3, 6, 0),
+      'This test requires at least Python 3.6 to work.')
+  def test_basic_wordcount(self):

Review comment:
       The RecordingManager.record method starts a PipelineFragment from the given pipeline. I added comments to make this clearer.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] rohdesamuel commented on a change in pull request #12415: [BEAM-10603] Add the RecordingManager and associated classes.

Posted by GitBox <gi...@apache.org>.
rohdesamuel commented on a change in pull request #12415:
URL: https://github.com/apache/beam/pull/12415#discussion_r473392538



##########
File path: sdks/python/apache_beam/runners/interactive/recording_manager.py
##########
@@ -0,0 +1,334 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from __future__ import absolute_import
+
+import logging
+import threading
+import time
+import warnings
+
+import apache_beam as beam
+from apache_beam.runners.interactive import background_caching_job as bcj
+from apache_beam.runners.interactive import interactive_environment as ie
+from apache_beam.runners.interactive import interactive_runner as ir
+from apache_beam.runners.interactive import pipeline_fragment as pf
+from apache_beam.runners.interactive import pipeline_instrument as pi
+from apache_beam.runners.interactive import utils
+from apache_beam.runners.interactive.options.capture_limiters import CountLimiter
+from apache_beam.runners.interactive.options.capture_limiters import ProcessingTimeLimiter
+
+_LOGGER = logging.getLogger(__name__)
+
+PipelineState = beam.runners.runner.PipelineState
+
+
+class ElementStream:
+  """A stream of elements from a given PCollection."""
+  def __init__(
+      self,
+      pcoll,  # type: beam.pvalue.PCollection
+      var,  # type: str
+      cache_key,  # type: str
+      max_n,  # type: int
+      max_duration_secs  # type: float
+      ):
+    self._pcoll = pcoll
+    self._cache_key = cache_key
+    self._pipeline = pcoll.pipeline
+    self._var = var
+    self._n = max_n
+    self._duration_secs = max_duration_secs
+
+    # A small state variable that, when True, indicates that no more new
+    # elements will be yielded if read() is called again.
+    self._done = False
+
+  def var(self):
+    # type: () -> str
+
+    """Returns the name of the variable that defined this PCollection."""
+    return self._var
+
+  def display_id(self, suffix):
+    # type: (str) -> str
+
+    """Returns a unique id able to be displayed in a web browser."""
+    return utils.obfuscate(self._cache_key, suffix)
+
+  def is_computed(self):
+    # type: () -> bool
+
+    """Returns True if no more elements will be recorded."""
+    return self._pcoll in ie.current_env().computed_pcollections
+
+  def is_done(self):
+    # type: () -> bool
+
+    """Returns True if no more new elements will be yielded."""
+    return self._done
+
+  def read(self, tail=True):
+    # type: (bool) -> Any
+
+    """Reads the elements currently recorded."""
+
+    # Get the cache manager and wait until the file exists.
+    cache_manager = ie.current_env().get_cache_manager(self._pipeline)
+    while not cache_manager.exists('full', self._cache_key):
+      time.sleep(0.5)
+
+    # Retrieve the coder for the particular PCollection which will be used to
+    # decode elements read from cache.
+    coder = cache_manager.load_pcoder('full', self._cache_key)
+
+    # Read the elements from the cache.
+    limiters = [
+        CountLimiter(self._n), ProcessingTimeLimiter(self._duration_secs)
+    ]
+    reader, _ = cache_manager.read('full', self._cache_key,
+                                   limiters=limiters,
+                                   tail=tail)
+
+    # Because a single TestStreamFileRecord can yield multiple elements, we
+    # limit the count again here in the to_element_list call.
+    #
+    # There are two ways of exiting this loop: either a limiter was triggered
+    # or all elements from the cache were read. In the latter case, the
+    # pipeline may still be running, so another invocation of `read` may yield
+    # new elements.
+    for e in utils.to_element_list(reader,
+                                   coder,
+                                   include_window_info=True,
+                                   n=self._n):
+      yield e
+
+    # A limiter being triggered means that we have fulfilled the user's request.
+    # This implies that reading from the cache again won't yield any new
+    # elements. The same holds if the user pipeline has terminated.
+    if any(l.is_triggered()
+           for l in limiters) or ie.current_env().is_terminated(self._pipeline):
+      self._done = True
+
+
+class Recording:
+  """A group of PCollections from a given pipeline run."""
+  def __init__(
+      self,
+      user_pipeline,  # type: beam.Pipeline
+      pcolls,  # type: List[beam.pvalue.PCollection]
+      result,  # type: beam.runners.runner.PipelineResult
+      pipeline_instrument,  # type: pi.PipelineInstrument
+      max_n,  # type: int
+      max_duration_secs  # type: float
+      ):
+
+    self._user_pipeline = user_pipeline
+    self._result = result
+    self._pcolls = pcolls
+
+    pcoll_var = lambda pcoll: pipeline_instrument.cacheable_var_by_pcoll_id(
+        pipeline_instrument.pcolls_to_pcoll_id.get(str(pcoll), None))
+
+    self._streams = {
+        pcoll: ElementStream(
+            pcoll,
+            pcoll_var(pcoll),
+            pipeline_instrument.cache_key(pcoll),
+            max_n,
+            max_duration_secs)
+        for pcoll in pcolls
+    }
+    self._start = time.time()
+    self._duration_secs = max_duration_secs
+    self._set_computed = bcj.is_cache_complete(str(id(user_pipeline)))
+
+    # Run a separate thread for marking the PCollections done. This is because
+    # the pipeline run may be asynchronous.
+    self._mark_computed = threading.Thread(target=self._mark_all_computed)
+    self._mark_computed.daemon = True
+    self._mark_computed.start()
+
+  def _mark_all_computed(self):
+    # type: () -> None
+
+    """Marks all the PCollections upon a successful pipeline run."""
+    if not self._result:
+      return
+
+    while not PipelineState.is_terminal(self._result.state):
+      if time.time() - self._start >= self._duration_secs:
+        self._result.cancel()
+        self._result.wait_until_finish()
+
+      if all(s.is_done() for s in self._streams.values()):

Review comment:
       Cancel is an idempotent operation, so I didn't give it much thought. I can change it to an elif, though.
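The following is a minimal sketch of the loop rewritten with `elif`. It assumes, as the reply implies, that the all-streams-done branch also cancels the run.

`FakeResult`, `TERMINAL_STATES`, `mark_all_computed` and the `streams_done` callable are hypothetical stand-ins for:

- `PipelineResult`
- `PipelineState.is_terminal`
- `Recording._mark_all_computed`
- the `all(s.is_done() ...)` check

The short sleep between polls is also an added assumption, not part of the PR.

```python
import time

# Hypothetical stand-in for PipelineState.is_terminal().
TERMINAL_STATES = {'DONE', 'CANCELLED', 'FAILED'}


class FakeResult:
  """Hypothetical stand-in for a PipelineResult."""
  def __init__(self):
    self.state = 'RUNNING'
    self.cancel_calls = 0

  def cancel(self):
    # Idempotent: repeated calls leave the state CANCELLED.
    self.cancel_calls += 1
    self.state = 'CANCELLED'

  def wait_until_finish(self):
    pass


def mark_all_computed(result, start, duration_secs, streams_done,
                      poll_secs=0.01):
  """Polls until the run is terminal; cancels on timeout or when done."""
  while result.state not in TERMINAL_STATES:
    if time.time() - start >= duration_secs:
      # The recording ran out of time.
      result.cancel()
      result.wait_until_finish()
    elif streams_done():
      # Every stream has all it needs; stop the run.
      result.cancel()
      result.wait_until_finish()
    else:
      # Avoid spinning while waiting for the next state change.
      time.sleep(poll_secs)


# Both conditions hold, yet cancel() runs only once per loop iteration.
result = FakeResult()
mark_all_computed(result, start=time.time() - 120, duration_secs=60,
                  streams_done=lambda: True)
print(result.state, result.cancel_calls)  # CANCELLED 1
```

With two independent `if` statements, a timed-out run whose streams are also done would call `cancel()` twice in the same iteration. With `elif`, it is called once, so correctness no longer relies on cancel being idempotent.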







[GitHub] [beam] rohdesamuel commented on pull request #12415: [BEAM-10603] Add the RecordingManager and associated classes.

Posted by GitBox <gi...@apache.org>.
rohdesamuel commented on pull request #12415:
URL: https://github.com/apache/beam/pull/12415#issuecomment-678439815









[GitHub] [beam] pabloem commented on a change in pull request #12415: [BEAM-10603] Add the RecordingManager and associated classes.

Posted by GitBox <gi...@apache.org>.
pabloem commented on a change in pull request #12415:
URL: https://github.com/apache/beam/pull/12415#discussion_r474338799



##########
File path: sdks/python/apache_beam/runners/interactive/recording_manager.py
##########
@@ -0,0 +1,334 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from __future__ import absolute_import
+
+import logging
+import threading
+import time
+import warnings
+
+import apache_beam as beam
+from apache_beam.runners.interactive import background_caching_job as bcj
+from apache_beam.runners.interactive import interactive_environment as ie
+from apache_beam.runners.interactive import interactive_runner as ir
+from apache_beam.runners.interactive import pipeline_fragment as pf
+from apache_beam.runners.interactive import pipeline_instrument as pi
+from apache_beam.runners.interactive import utils
+from apache_beam.runners.interactive.options.capture_limiters import CountLimiter
+from apache_beam.runners.interactive.options.capture_limiters import ProcessingTimeLimiter
+
+_LOGGER = logging.getLogger(__name__)
+
+PipelineState = beam.runners.runner.PipelineState

Review comment:
       Why aren't you importing this class like a normal one?

##########
File path: sdks/python/apache_beam/runners/interactive/recording_manager.py
##########
@@ -0,0 +1,334 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from __future__ import absolute_import
+
+import logging
+import threading
+import time
+import warnings
+
+import apache_beam as beam
+from apache_beam.runners.interactive import background_caching_job as bcj
+from apache_beam.runners.interactive import interactive_environment as ie
+from apache_beam.runners.interactive import interactive_runner as ir
+from apache_beam.runners.interactive import pipeline_fragment as pf
+from apache_beam.runners.interactive import pipeline_instrument as pi
+from apache_beam.runners.interactive import utils
+from apache_beam.runners.interactive.options.capture_limiters import CountLimiter
+from apache_beam.runners.interactive.options.capture_limiters import ProcessingTimeLimiter
+
+_LOGGER = logging.getLogger(__name__)
+
+PipelineState = beam.runners.runner.PipelineState
+
+
+class ElementStream:
+  """A stream of elements from a given PCollection."""
+  def __init__(
+      self,
+      pcoll,  # type: beam.pvalue.PCollection
+      var,  # type: str
+      cache_key,  # type: str
+      max_n,  # type: int
+      max_duration_secs  # type: float
+      ):
+    self._pcoll = pcoll
+    self._cache_key = cache_key
+    self._pipeline = pcoll.pipeline
+    self._var = var
+    self._n = max_n
+    self._duration_secs = max_duration_secs
+
+    # A small state variable that, when True, indicates that no more new
+    # elements will be yielded if read() is called again.
+    self._done = False
+
+  def var(self):
+    # type: () -> str
+
+    """Returns the name of the variable that defined this PCollection."""
+    return self._var
+
+  def display_id(self, suffix):
+    # type: (str) -> str
+
+    """Returns a unique id able to be displayed in a web browser."""
+    return utils.obfuscate(self._cache_key, suffix)
+
+  def is_computed(self):
+    # type: () -> bool
+
+    """Returns True if no more elements will be recorded."""
+    return self._pcoll in ie.current_env().computed_pcollections
+
+  def is_done(self):
+    # type: () -> bool
+
+    """Returns True if no more new elements will be yielded."""
+    return self._done
+
+  def read(self, tail=True):
+    # type: (bool) -> Any
+
+    """Reads the elements currently recorded."""
+
+    # Get the cache manager and wait until the file exists.
+    cache_manager = ie.current_env().get_cache_manager(self._pipeline)
+    while not cache_manager.exists('full', self._cache_key):
+      time.sleep(0.5)

Review comment:
       I remember sleeping gave us trouble earlier. Does it make sense to write a method in cache_manager that waits without sleeping (maybe on a lock or something similar)?
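Here is one possible shape for such a method. It assumes the writer and the reader live in the same process: a `threading.Condition` lets `write` wake any reader blocked in `wait_until_exists`, with no sleep loop.

`NotifyingCache` and `wait_until_exists` are hypothetical names, not part of Beam's cache manager API.

```python
import threading


class NotifyingCache:
  """Hypothetical in-memory cache that signals waiters when a key is written."""
  def __init__(self):
    self._data = {}
    self._cond = threading.Condition()

  def write(self, key, values):
    with self._cond:
      self._data.setdefault(key, []).extend(values)
      # Wake every thread blocked in wait_until_exists().
      self._cond.notify_all()

  def exists(self, key):
    with self._cond:
      return key in self._data

  def wait_until_exists(self, key, timeout=None):
    """Blocks until `key` is written; returns False if `timeout` expires."""
    with self._cond:
      # wait_for() re-checks the predicate after every notification.
      return self._cond.wait_for(lambda: key in self._data, timeout=timeout)


cache = NotifyingCache()
writer = threading.Thread(target=cache.write, args=('full', [0, 1, 2]))
writer.start()
print(cache.wait_until_exists('full', timeout=5))  # True
writer.join()
```

This only helps when the writer runs in the same process. A cache backed by files that another process writes would not see these notifications, so it would still need polling or some file-watching mechanism.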
   

##########
File path: sdks/python/apache_beam/runners/interactive/interactive_runner.py
##########
@@ -152,7 +152,11 @@ def run_pipeline(self, pipeline, options):
               user_pipeline)):
         streaming_cache_manager = ie.current_env().get_cache_manager(
             user_pipeline)
-        if streaming_cache_manager:
+
+        # Only make the server if it doesn't exist already.
+        if (streaming_cache_manager and
+            not ie.current_env().get_test_stream_service_controller(

Review comment:
       IIUC this change is fixing an unrelated issue, right?

##########
File path: sdks/python/apache_beam/runners/interactive/recording_manager_test.py
##########
@@ -0,0 +1,301 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#  http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from __future__ import absolute_import
+
+import sys
+import unittest
+
+import apache_beam as beam
+from apache_beam import coders
+from apache_beam.portability.api.beam_interactive_api_pb2 import TestStreamFileRecord
+from apache_beam.runners.interactive import background_caching_job as bcj
+from apache_beam.runners.interactive import interactive_beam as ib
+from apache_beam.runners.interactive import interactive_environment as ie
+from apache_beam.runners.interactive import pipeline_instrument as pi
+from apache_beam.runners.interactive.interactive_runner import InteractiveRunner
+from apache_beam.runners.interactive.recording_manager import ElementStream
+from apache_beam.runners.interactive.recording_manager import Recording
+from apache_beam.runners.interactive.recording_manager import RecordingManager
+from apache_beam.runners.interactive.testing.test_cache_manager import FileRecordsBuilder
+from apache_beam.runners.interactive.testing.test_cache_manager import InMemoryCache
+from apache_beam.transforms.window import GlobalWindow
+from apache_beam.utils.timestamp import MIN_TIMESTAMP
+from apache_beam.utils.windowed_value import WindowedValue
+
+PipelineState = beam.runners.runner.PipelineState
+
+
+class MockPipelineResult(beam.runners.runner.PipelineResult):
+  """Mock class for controlling a PipelineResult."""
+  def __init__(self):
+    self._state = PipelineState.RUNNING
+
+  def wait_until_finish(self):
+    pass
+
+  def set_state(self, state):
+    self._state = state
+
+  @property
+  def state(self):
+    return self._state
+
+  def cancel(self):
+    self._state = PipelineState.CANCELLED
+
+
+class ElementStreamTest(unittest.TestCase):
+  def setUp(self):
+    ie.new_env()
+
+    self.cache = InMemoryCache()
+    self.p = beam.Pipeline()
+    self.pcoll = self.p | beam.Create([])
+    self.cache_key = str(pi.CacheKey('pcoll', '', '', ''))
+
+    # Create a MockPipelineResult to control the state of a fake run of the
+    # pipeline.
+    self.mock_result = MockPipelineResult()
+    ie.current_env().track_user_pipelines()
+    ie.current_env().set_pipeline_result(self.p, self.mock_result)
+    ie.current_env().set_cache_manager(self.cache, self.p)
+
+  def test_read(self):
+    """Test reading, and that a done stream returns no more elements."""
+
+    self.mock_result.set_state(PipelineState.DONE)
+    self.cache.write(['expected'], 'full', self.cache_key)
+    self.cache.save_pcoder(None, 'full', self.cache_key)
+
+    stream = ElementStream(
+        self.pcoll, '', self.cache_key, max_n=1, max_duration_secs=1)
+
+    self.assertFalse(stream.is_done())
+    self.assertEqual(list(stream.read())[0], 'expected')
+    self.assertTrue(stream.is_done())
+
+  def test_done_if_terminated(self):
+    """Test that terminating the job sets the stream as done."""
+
+    self.cache.write(['expected'], 'full', self.cache_key)
+    self.cache.save_pcoder(None, 'full', self.cache_key)
+
+    stream = ElementStream(
+        self.pcoll, '', self.cache_key, max_n=100, max_duration_secs=10)
+
+    self.assertFalse(stream.is_done())
+    self.assertEqual(list(stream.read(tail=False))[0], 'expected')
+
+    # The limiters were not reached, so the stream is not done yet.
+    self.assertFalse(stream.is_done())
+
+    self.mock_result.set_state(PipelineState.DONE)
+    self.assertEqual(list(stream.read(tail=False))[0], 'expected')
+
+    # The underlying pipeline is terminated, so the stream won't yield new
+    # elements.
+    self.assertTrue(stream.is_done())
+
+  def test_read_n(self):
+    """Test that the stream only reads 'n' elements."""
+
+    self.mock_result.set_state(PipelineState.DONE)
+    self.cache.write(list(range(5)), 'full', self.cache_key)
+    self.cache.save_pcoder(None, 'full', self.cache_key)
+
+    stream = ElementStream(
+        self.pcoll, '', self.cache_key, max_n=1, max_duration_secs=1)
+    self.assertEqual(list(stream.read()), [0])
+    self.assertTrue(stream.is_done())
+
+    stream = ElementStream(
+        self.pcoll, '', self.cache_key, max_n=2, max_duration_secs=1)
+    self.assertEqual(list(stream.read()), [0, 1])
+    self.assertTrue(stream.is_done())
+
+    stream = ElementStream(
+        self.pcoll, '', self.cache_key, max_n=5, max_duration_secs=1)
+    self.assertEqual(list(stream.read()), list(range(5)))
+    self.assertTrue(stream.is_done())
+
+    # Test that asking for more elements than the cache holds still returns.
+    stream = ElementStream(
+        self.pcoll, '', self.cache_key, max_n=10, max_duration_secs=1)
+    self.assertEqual(list(stream.read()), list(range(5)))
+    self.assertTrue(stream.is_done())
+
+  def test_read_duration(self):
+    """Test that the stream only reads a 'duration' of elements."""
+
+    values = (FileRecordsBuilder(tag=self.cache_key)
+              .advance_processing_time(1)
+              .add_element(element=0, event_time_secs=0)
+              .advance_processing_time(1)
+              .add_element(element=1, event_time_secs=1)
+              .advance_processing_time(1)
+              .add_element(element=2, event_time_secs=3)
+              .advance_processing_time(1)
+              .add_element(element=3, event_time_secs=4)
+              .advance_processing_time(1)
+              .add_element(element=4, event_time_secs=5)
+              .build()) # yapf: disable
+
+    self.mock_result.set_state(PipelineState.DONE)
+    self.cache.write(values, 'full', self.cache_key)
+    self.cache.save_pcoder(None, 'full', self.cache_key)
+
+    # The elements read from the cache are TestStreamFileRecord instances and
+    # have the underlying elements encoded. This method decodes the elements
+    # from the TestStreamFileRecord.
+    def get_elements(events):
+      coder = coders.FastPrimitivesCoder()
+      elements = []
+      for e in events:
+        if not isinstance(e, TestStreamFileRecord):
+          continue
+
+        if e.recorded_event.element_event:
+          elements += ([
+              coder.decode(el.encoded_element)
+              for el in e.recorded_event.element_event.elements
+          ])
+      return elements
+
+    # The following tests a progression of reading different durations from the
+    # cache.
+    stream = ElementStream(
+        self.pcoll, '', self.cache_key, max_n=100, max_duration_secs=1)
+    self.assertSequenceEqual(get_elements(stream.read()), [0])
+
+    stream = ElementStream(
+        self.pcoll, '', self.cache_key, max_n=100, max_duration_secs=2)
+    self.assertSequenceEqual(get_elements(stream.read()), [0, 1])
+
+    stream = ElementStream(
+        self.pcoll, '', self.cache_key, max_n=100, max_duration_secs=10)
+    self.assertSequenceEqual(get_elements(stream.read()), [0, 1, 2, 3, 4])
+
+
+class RecordingTest(unittest.TestCase):
+  def setUp(self):
+    ie.new_env()
+
+  @unittest.skipIf(
+      sys.version_info < (3, 6, 0),
+      'This test requires at least Python 3.6 to work.')
+  def test_computed(self):
+    """Tests that a PCollection is marked as computed only in a complete state.
+
+    Because the background caching job is now long-lived, repeated runs of a
+    PipelineFragment may yield different results for the same PCollection.
+    """
+
+    p = beam.Pipeline(InteractiveRunner())
+    elems = p | beam.Create([0, 1, 2])
+
+    ib.watch(locals())
+
+    # Create a MockPipelineResult to control the state of a fake run of the
+    # pipeline.
+    mock_result = MockPipelineResult()
+    ie.current_env().track_user_pipelines()
+    ie.current_env().set_pipeline_result(p, mock_result)
+
+    # Create a mock BackgroundCachingJob that will control whether to set the
+    # PCollections as computed or not.
+    bcj_mock_result = MockPipelineResult()
+    background_caching_job = bcj.BackgroundCachingJob(bcj_mock_result, [])
+
+    # Create a recording.
+    recording = Recording(
+        p, [elems],
+        mock_result,
+        pi.PipelineInstrument(p),
+        max_n=10,
+        max_duration_secs=60)
+
+    # Neither the background caching job nor the recording is done yet, so
+    # there may be more elements to be recorded.
+    self.assertFalse(recording.is_computed())
+    self.assertFalse(recording.computed())
+    self.assertTrue(recording.uncomputed())
+
+    # The recording is finished but the background caching job is not. There
+    # may still be more elements to record, or the intermediate PCollection may
+    # have stopped caching in an incomplete state, e.g. before a window could
+    # fire.
+    mock_result.set_state(PipelineState.DONE)
+    recording.wait_until_finish()
+
+    self.assertFalse(recording.is_computed())
+    self.assertFalse(recording.computed())
+    self.assertTrue(recording.uncomputed())
+
+    # The background caching job finished before we started a recording,
+    # which is a sure signal that there will be no more elements.
+    bcj_mock_result.set_state(PipelineState.DONE)
+    ie.current_env().set_background_caching_job(p, background_caching_job)
+    recording = Recording(
+        p, [elems],
+        mock_result,
+        pi.PipelineInstrument(p),
+        max_n=10,
+        max_duration_secs=60)
+    recording.wait_until_finish()
+
+    # There are no more elements and the recording finished, meaning that the
+    # intermediate PCollections are in a complete state. They can now be marked
+    # as computed.
+    self.assertTrue(recording.is_computed())
+    self.assertTrue(recording.computed())
+    self.assertFalse(recording.uncomputed())
+
+
+class RecordingManagerTest(unittest.TestCase):
+  def setUp(self):
+    ie.new_env()
+
+  @unittest.skipIf(
+      sys.version_info < (3, 6, 0),
+      'This test requires at least Python 3.6 to work.')
+  def test_basic_wordcount(self):

Review comment:
       I'm a little confused about this test. Where does the pipeline run here?







[GitHub] [beam] rohdesamuel commented on pull request #12415: [BEAM-10603] Add the RecordingManager and associated classes.

Posted by GitBox <gi...@apache.org>.
rohdesamuel commented on pull request #12415:
URL: https://github.com/apache/beam/pull/12415#issuecomment-679255172


   R: @pabloem 





[GitHub] [beam] codecov[bot] edited a comment on pull request #12415: [BEAM-10603] Add the RecordingManager and associated classes.

Posted by GitBox <gi...@apache.org>.
codecov[bot] edited a comment on pull request #12415:
URL: https://github.com/apache/beam/pull/12415#issuecomment-679302193


   # [Codecov](https://codecov.io/gh/apache/beam/pull/12415?src=pr&el=h1) Report
   > Merging [#12415](https://codecov.io/gh/apache/beam/pull/12415?src=pr&el=desc) into [master](https://codecov.io/gh/apache/beam/commit/e164d170eb6b5ec1dddd99f09e79dfb0147b84ae?el=desc) will **decrease** coverage by `0.08%`.
   > The diff coverage is `n/a`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/beam/pull/12415/graphs/tree.svg?width=650&height=150&src=pr&token=qcbbAh8Fj1)](https://codecov.io/gh/apache/beam/pull/12415?src=pr&el=tree)
   
   ```diff
   @@            Coverage Diff             @@
   ##           master   #12415      +/-   ##
   ==========================================
   - Coverage   34.47%   34.38%   -0.09%     
   ==========================================
     Files         684      695      +11     
     Lines       81483    84498    +3015     
     Branches     9180     9891     +711     
   ==========================================
   + Hits        28090    29058     +968     
   - Misses      52972    54960    +1988     
   - Partials      421      480      +59     
   ```
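The figures in this report can be reproduced by hand. The sketch below rests on two assumptions:

- Coverage is hits divided by all tracked lines (hits + misses + partials).
- Codecov truncates percentages to two decimals instead of rounding them.

Under those assumptions it reproduces both coverage figures. It would also account for the header reporting a `0.08%` decrease while the table shows `-0.09%`.

```python
import math


def truncate(value, digits=2):
  """Drops (rather than rounds) decimal places beyond `digits`."""
  factor = 10 ** digits
  return math.floor(value * factor) / factor


def coverage_pct(hits, misses, partials):
  """Coverage as a percentage of all tracked lines (assumed definition)."""
  lines = hits + misses + partials
  return 100.0 * hits / lines


master = coverage_pct(hits=28090, misses=52972, partials=421)
pr = coverage_pct(hits=29058, misses=54960, partials=480)

print(truncate(master))                           # 34.47
print(truncate(pr))                               # 34.38
print(truncate(abs(pr - master)))                 # 0.08 (header)
print(round(truncate(pr) - truncate(master), 2))  # -0.09 (table)
```

The header's figure would then come from the exact difference (about 0.0845 points). The table's figure is the difference of the two already-truncated values.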
   
   
   | [Impacted Files](https://codecov.io/gh/apache/beam/pull/12415?src=pr&el=tree) | Coverage Δ | |
   |---|---|---|
   | [typehints/typecheck\_test\_py3.py](https://codecov.io/gh/apache/beam/pull/12415/diff?src=pr&el=tree#diff-dHlwZWhpbnRzL3R5cGVjaGVja190ZXN0X3B5My5weQ==) | `33.72% <0.00%> (-13.83%)` | :arrow_down: |
   | [typehints/typecheck.py](https://codecov.io/gh/apache/beam/pull/12415/diff?src=pr&el=tree#diff-dHlwZWhpbnRzL3R5cGVjaGVjay5weQ==) | `31.38% <0.00%> (-4.24%)` | :arrow_down: |
   | [io/jdbc.py](https://codecov.io/gh/apache/beam/pull/12415/diff?src=pr&el=tree#diff-aW8vamRiYy5weQ==) | `82.35% <0.00%> (-2.65%)` | :arrow_down: |
   | [utils/interactive\_utils.py](https://codecov.io/gh/apache/beam/pull/12415/diff?src=pr&el=tree#diff-dXRpbHMvaW50ZXJhY3RpdmVfdXRpbHMucHk=) | `30.95% <0.00%> (-2.39%)` | :arrow_down: |
   | [io/external/xlang\_jdbcio\_it\_test.py](https://codecov.io/gh/apache/beam/pull/12415/diff?src=pr&el=tree#diff-aW8vZXh0ZXJuYWwveGxhbmdfamRiY2lvX2l0X3Rlc3QucHk=) | `40.24% <0.00%> (-1.65%)` | :arrow_down: |
   | [io/mongodbio\_test.py](https://codecov.io/gh/apache/beam/pull/12415/diff?src=pr&el=tree#diff-aW8vbW9uZ29kYmlvX3Rlc3QucHk=) | `34.95% <0.00%> (ø)` | |
   | [runners/direct/direct\_runner.py](https://codecov.io/gh/apache/beam/pull/12415/diff?src=pr&el=tree#diff-cnVubmVycy9kaXJlY3QvZGlyZWN0X3J1bm5lci5weQ==) | `32.88% <0.00%> (ø)` | |
   | [io/gcp/experimental/spannerio.py](https://codecov.io/gh/apache/beam/pull/12415/diff?src=pr&el=tree#diff-aW8vZ2NwL2V4cGVyaW1lbnRhbC9zcGFubmVyaW8ucHk=) | `34.49% <0.00%> (ø)` | |
   | [runners/dataflow/dataflow\_runner\_test.py](https://codecov.io/gh/apache/beam/pull/12415/diff?src=pr&el=tree#diff-cnVubmVycy9kYXRhZmxvdy9kYXRhZmxvd19ydW5uZXJfdGVzdC5weQ==) | `25.49% <0.00%> (ø)` | |
   | [runners/dataflow/ptransform\_overrides.py](https://codecov.io/gh/apache/beam/pull/12415/diff?src=pr&el=tree#diff-cnVubmVycy9kYXRhZmxvdy9wdHJhbnNmb3JtX292ZXJyaWRlcy5weQ==) | `18.11% <0.00%> (ø)` | |
   | ... and [31 more](https://codecov.io/gh/apache/beam/pull/12415/diff?src=pr&el=tree-more) | |
   
   ------
   
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/beam/pull/12415?src=pr&el=footer). Last update [919ade7...42c5537](https://codecov.io/gh/apache/beam/pull/12415?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] rohdesamuel commented on a change in pull request #12415: [BEAM-10603] Add the RecordingManager and associated classes.

Posted by GitBox <gi...@apache.org>.
rohdesamuel commented on a change in pull request #12415:
URL: https://github.com/apache/beam/pull/12415#discussion_r473308012



##########
File path: sdks/python/apache_beam/runners/interactive/recording_manager.py
##########
@@ -0,0 +1,329 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from __future__ import absolute_import
+
+import logging
+import threading
+import time
+import warnings
+
+import apache_beam as beam
+from apache_beam.runners.interactive import background_caching_job as bcj
+from apache_beam.runners.interactive import interactive_environment as ie
+from apache_beam.runners.interactive import interactive_runner as ir
+from apache_beam.runners.interactive import pipeline_fragment as pf
+from apache_beam.runners.interactive import pipeline_instrument as pi
+from apache_beam.runners.interactive import utils
+from apache_beam.runners.interactive.options.capture_limiters import CountLimiter
+from apache_beam.runners.interactive.options.capture_limiters import ProcessingTimeLimiter
+
+_LOGGER = logging.getLogger(__name__)
+
+PipelineState = beam.runners.runner.PipelineState
+
+
+class ElementStream:
+  """A stream of elements from a given PCollection."""
+  def __init__(
+      self,
+      pcoll,  # type: beam.pvalue.PCollection
+      var,  # type: str
+      cache_key,  # type: str
+      max_n,  # type: int
+      max_duration_secs  # type: float
+      ):
+    self._pcoll = pcoll
+    self._cache_key = cache_key
+    self._pipeline = pcoll.pipeline
+    self._var = var
+    self._n = max_n
+    self._duration_secs = max_duration_secs
+
+    # A small state variable that when True, indicates that no more new elements
+    # will be yielded if read() is called again.
+    self._done = False
+
+  def var(self):
+    # type: () -> str
+
+    """Returns the variable named that defined this PCollection."""
+    return self._var
+
+  def display_id(self, suffix):
+    #Any type: (str) -> str
+
+    """Returns a unique id able to be displayed in a web browser."""
+    return utils.obfuscate(self._cache_key, suffix)
+
+  def is_computed(self):
+    # type: () -> boolean
+
+    """Returns True if no more elements will be recorded."""
+    return self._pcoll in ie.current_env().computed_pcollections
+
+  def is_done(self):
+    # type: () -> boolean
+
+    """Returns True if no more new elements will be yielded."""
+    return self._done
+
+  def read(self, tail=True):
+    # type: (boolean) -> Any
+
+    """Reads the elements currently recorded."""
+
+    # Get the cache manager and wait until the file exists.
+    cache_manager = ie.current_env().get_cache_manager(self._pipeline)
+    while not cache_manager.exists('full', self._cache_key):
+      time.sleep(0.5)
+
+    # Retrieve the coder for the particular PCollection which will be used to
+    # decode elements read from cache.
+    coder = cache_manager.load_pcoder('full', self._cache_key)
+
+    # Read the elements from the cache.
+    limiters = [
+        CountLimiter(self._n), ProcessingTimeLimiter(self._duration_secs)
+    ]
+    reader, _ = cache_manager.read('full', self._cache_key,
+                                   limiters=limiters,
+                                   tail=tail)
+
+    # Because a single TestStreamFileRecord can yield multiple elements, we
+    # limit the count again here in the to_element_list call.
+    for e in utils.to_element_list(reader,
+                                   coder,
+                                   include_window_info=True,
+                                   n=self._n):
+      yield e
+
+    # A limiter being triggered means that we have fulfilled the user's request.
+    # This implies that reading from the cache again won't yield any new
+    # elements. WLOG, this applies to the user pipeline being terminated.
+    if any(l.is_triggered()
+           for l in limiters) or ie.current_env().is_terminated(self._pipeline):
+      self._done = True

Review comment:
       In that case, there may still be more elements to read from the cache.
   
   If the pipeline isn't finished (and no limiter has triggered), reading the cache yields an incomplete set of data. So if a user of the ElementStream reads from the cache again, the cache may yield more results.
   
   There are two ways a read from the cache can end: a limiter triggers, or every element currently in the cache has been read. Having read every element in the cache implies neither that the pipeline is done nor that a limiter has triggered.
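To make that distinction concrete, here is a minimal, hypothetical model of the `done` flag. `GrowingCache` and `StreamSketch` are illustrative stand-ins, not Beam classes. The sketch assumes each `read()` starts from the beginning of the cache and that only a count limit applies.

```python
class GrowingCache:
    """Hypothetical stand-in for a PCollection cache.

    A background job appends records; `pipeline_terminated` becomes True
    once the job can no longer write anything.
    """

    def __init__(self):
        self.records = []
        self.pipeline_terminated = False


class StreamSketch:
    """Hypothetical, simplified model of ElementStream's `done` flag."""

    def __init__(self, cache, max_n):
        self._cache = cache
        self._max_n = max_n
        self._done = False

    def is_done(self):
        return self._done

    def read(self):
        # Each call reads from the start of whatever is cached right now.
        emitted = self._cache.records[:self._max_n]
        for element in emitted:
            yield element
        # Draining the cache is not enough: a running pipeline may append
        # more. Only a hit limit or a terminated pipeline guarantees that a
        # later read() cannot yield anything new.
        limit_hit = len(emitted) >= self._max_n
        if limit_hit or self._cache.pipeline_terminated:
            self._done = True


cache = GrowingCache()
stream = StreamSketch(cache, max_n=3)

cache.records.extend(["a", "b"])
print(list(stream.read()), stream.is_done())  # ['a', 'b'] False

cache.records.append("c")
print(list(stream.read()), stream.is_done())  # ['a', 'b', 'c'] True

cache.pipeline_terminated = True
unbounded = StreamSketch(cache, max_n=10)
print(list(unbounded.read()), unbounded.is_done())  # ['a', 'b', 'c'] True
```

As in the quoted `read()`, the flag is only updated once the generator is fully consumed, so a caller that stops iterating early still sees `is_done()` return False.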





[GitHub] [beam] rohdesamuel commented on pull request #12415: [BEAM-10603] Add the RecordingManager and associated classes.

Posted by GitBox <gi...@apache.org>.
rohdesamuel commented on pull request #12415:
URL: https://github.com/apache/beam/pull/12415#issuecomment-675760369


   Run Portable_Python PreCommit



[GitHub] [beam] rohdesamuel commented on a change in pull request #12415: [BEAM-10603] Add the RecordingManager and associated classes.

Posted by GitBox <gi...@apache.org>.
rohdesamuel commented on a change in pull request #12415:
URL: https://github.com/apache/beam/pull/12415#discussion_r474158141



##########
File path: sdks/python/apache_beam/runners/interactive/recording_manager.py
##########
@@ -0,0 +1,334 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from __future__ import absolute_import
+
+import logging
+import threading
+import time
+import warnings
+
+import apache_beam as beam
+from apache_beam.runners.interactive import background_caching_job as bcj
+from apache_beam.runners.interactive import interactive_environment as ie
+from apache_beam.runners.interactive import interactive_runner as ir
+from apache_beam.runners.interactive import pipeline_fragment as pf
+from apache_beam.runners.interactive import pipeline_instrument as pi
+from apache_beam.runners.interactive import utils
+from apache_beam.runners.interactive.options.capture_limiters import CountLimiter
+from apache_beam.runners.interactive.options.capture_limiters import ProcessingTimeLimiter
+
+_LOGGER = logging.getLogger(__name__)
+
+PipelineState = beam.runners.runner.PipelineState
+
+
+class ElementStream:
+  """A stream of elements from a given PCollection."""
+  def __init__(
+      self,
+      pcoll,  # type: beam.pvalue.PCollection
+      var,  # type: str
+      cache_key,  # type: str
+      max_n,  # type: int
+      max_duration_secs  # type: float
+      ):
+    self._pcoll = pcoll
+    self._cache_key = cache_key
+    self._pipeline = pcoll.pipeline
+    self._var = var
+    self._n = max_n
+    self._duration_secs = max_duration_secs
+
+    # A small state variable that when True, indicates that no more new elements
+    # will be yielded if read() is called again.
+    self._done = False
+
+  def var(self):
+    # type: () -> str
+
+    """Returns the variable named that defined this PCollection."""
+    return self._var
+
+  def display_id(self, suffix):
+    #Any type: (str) -> str

Review comment:
       Typo, good catch!





[GitHub] [beam] pabloem commented on a change in pull request #12415: [BEAM-10603] Add the RecordingManager and associated classes.

Posted by GitBox <gi...@apache.org>.
pabloem commented on a change in pull request #12415:
URL: https://github.com/apache/beam/pull/12415#discussion_r475799128



##########
File path: sdks/python/apache_beam/runners/interactive/recording_manager.py
##########
@@ -0,0 +1,334 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from __future__ import absolute_import
+
+import logging
+import threading
+import time
+import warnings
+
+import apache_beam as beam
+from apache_beam.runners.interactive import background_caching_job as bcj
+from apache_beam.runners.interactive import interactive_environment as ie
+from apache_beam.runners.interactive import interactive_runner as ir
+from apache_beam.runners.interactive import pipeline_fragment as pf
+from apache_beam.runners.interactive import pipeline_instrument as pi
+from apache_beam.runners.interactive import utils
+from apache_beam.runners.interactive.options.capture_limiters import CountLimiter
+from apache_beam.runners.interactive.options.capture_limiters import ProcessingTimeLimiter
+
+_LOGGER = logging.getLogger(__name__)
+
+PipelineState = beam.runners.runner.PipelineState
+
+
+class ElementStream:
+  """A stream of elements from a given PCollection."""
+  def __init__(
+      self,
+      pcoll,  # type: beam.pvalue.PCollection
+      var,  # type: str
+      cache_key,  # type: str
+      max_n,  # type: int
+      max_duration_secs  # type: float
+      ):
+    self._pcoll = pcoll
+    self._cache_key = cache_key
+    self._pipeline = pcoll.pipeline
+    self._var = var
+    self._n = max_n
+    self._duration_secs = max_duration_secs
+
+    # A small state variable that when True, indicates that no more new elements
+    # will be yielded if read() is called again.
+    self._done = False
+
+  def var(self):
+    # type: () -> str
+
+    """Returns the variable named that defined this PCollection."""
+    return self._var
+
+  def display_id(self, suffix):
+    # type: (str) -> str
+
+    """Returns a unique id able to be displayed in a web browser."""
+    return utils.obfuscate(self._cache_key, suffix)
+
+  def is_computed(self):
+    # type: () -> boolean
+
+    """Returns True if no more elements will be recorded."""
+    return self._pcoll in ie.current_env().computed_pcollections
+
+  def is_done(self):
+    # type: () -> boolean
+
+    """Returns True if no more new elements will be yielded."""
+    return self._done
+
+  def read(self, tail=True):
+    # type: (boolean) -> Any
+
+    """Reads the elements currently recorded."""
+
+    # Get the cache manager and wait until the file exists.
+    cache_manager = ie.current_env().get_cache_manager(self._pipeline)
+    while not cache_manager.exists('full', self._cache_key):
+      time.sleep(0.5)

Review comment:
       sounds good
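The loop in the hunk above polls `cache_manager.exists('full', self._cache_key)` every 0.5 seconds with no upper bound. Below is a minimal sketch of the same polling pattern with a deadline added; `wait_until` and its parameters are hypothetical and not part of Beam's interactive API.

```python
import time


def wait_until(predicate, timeout_secs, poll_interval_secs=0.5):
    """Polls `predicate` every `poll_interval_secs` until it returns True.

    Returns True as soon as the predicate holds, or False once
    `timeout_secs` have elapsed without it holding.
    """
    # time.monotonic() is unaffected by wall-clock changes, so the deadline
    # cannot jump forward or backward while we wait.
    deadline = time.monotonic() + timeout_secs
    while not predicate():
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            return False
        # Sleep for one poll interval, but never past the deadline.
        time.sleep(min(poll_interval_secs, remaining))
    return True


# Hypothetical use inside ElementStream.read():
#   if not wait_until(lambda: cache_manager.exists('full', self._cache_key),
#                     timeout_secs=60):
#     raise TimeoutError('cache for this PCollection never appeared')

ready_at = time.monotonic() + 0.2
print(wait_until(lambda: time.monotonic() >= ready_at, 5, 0.05))  # True
print(wait_until(lambda: False, 0.1, 0.05))  # False
```

Returning a boolean rather than raising lets the caller decide whether a missing cache is an error or just "nothing recorded yet".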





[GitHub] [beam] davidyan74 commented on a change in pull request #12415: [BEAM-10603] Add the RecordingManager and associated classes.

Posted by GitBox <gi...@apache.org>.
davidyan74 commented on a change in pull request #12415:
URL: https://github.com/apache/beam/pull/12415#discussion_r473311578



##########
File path: sdks/python/apache_beam/runners/interactive/recording_manager.py
##########
@@ -0,0 +1,329 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from __future__ import absolute_import
+
+import logging
+import threading
+import time
+import warnings
+
+import apache_beam as beam
+from apache_beam.runners.interactive import background_caching_job as bcj
+from apache_beam.runners.interactive import interactive_environment as ie
+from apache_beam.runners.interactive import interactive_runner as ir
+from apache_beam.runners.interactive import pipeline_fragment as pf
+from apache_beam.runners.interactive import pipeline_instrument as pi
+from apache_beam.runners.interactive import utils
+from apache_beam.runners.interactive.options.capture_limiters import CountLimiter
+from apache_beam.runners.interactive.options.capture_limiters import ProcessingTimeLimiter
+
+_LOGGER = logging.getLogger(__name__)
+
+PipelineState = beam.runners.runner.PipelineState
+
+
+class ElementStream:
+  """A stream of elements from a given PCollection."""
+  def __init__(
+      self,
+      pcoll,  # type: beam.pvalue.PCollection
+      var,  # type: str
+      cache_key,  # type: str
+      max_n,  # type: int
+      max_duration_secs  # type: float
+      ):
+    self._pcoll = pcoll
+    self._cache_key = cache_key
+    self._pipeline = pcoll.pipeline
+    self._var = var
+    self._n = max_n
+    self._duration_secs = max_duration_secs
+
+    # A small state variable that when True, indicates that no more new elements
+    # will be yielded if read() is called again.
+    self._done = False
+
+  def var(self):
+    # type: () -> str
+
+    """Returns the variable named that defined this PCollection."""
+    return self._var
+
+  def display_id(self, suffix):
+    #Any type: (str) -> str
+
+    """Returns a unique id able to be displayed in a web browser."""
+    return utils.obfuscate(self._cache_key, suffix)
+
+  def is_computed(self):
+    # type: () -> boolean
+
+    """Returns True if no more elements will be recorded."""
+    return self._pcoll in ie.current_env().computed_pcollections
+
+  def is_done(self):
+    # type: () -> boolean
+
+    """Returns True if no more new elements will be yielded."""
+    return self._done
+
+  def read(self, tail=True):
+    # type: (boolean) -> Any
+
+    """Reads the elements currently recorded."""
+
+    # Get the cache manager and wait until the file exists.
+    cache_manager = ie.current_env().get_cache_manager(self._pipeline)
+    while not cache_manager.exists('full', self._cache_key):
+      time.sleep(0.5)
+
+    # Retrieve the coder for the particular PCollection which will be used to
+    # decode elements read from cache.
+    coder = cache_manager.load_pcoder('full', self._cache_key)
+
+    # Read the elements from the cache.
+    limiters = [
+        CountLimiter(self._n), ProcessingTimeLimiter(self._duration_secs)
+    ]
+    reader, _ = cache_manager.read('full', self._cache_key,
+                                   limiters=limiters,
+                                   tail=tail)
+
+    # Because a single TestStreamFileRecord can yield multiple elements, we
+    # limit the count again here in the to_element_list call.
+    for e in utils.to_element_list(reader,
+                                   coder,
+                                   include_window_info=True,
+                                   n=self._n):
+      yield e
+
+    # A limiter being triggered means that we have fulfilled the user's request.
+    # This implies that reading from the cache again won't yield any new
+    # elements. WLOG, this applies to the user pipeline being terminated.
+    if any(l.is_triggered()
+           for l in limiters) or ie.current_env().is_terminated(self._pipeline):
+      self._done = True

Review comment:
       Got it. Can you also add a comment in the code to explain that condition?
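One way to document that condition is sketched below with hypothetical limiter classes. These are not the `CountLimiter` / `ProcessingTimeLimiter` imported in the hunk, whose internals may differ. The sketch assumes limiters are checked between records while counting elements, which is why the flattened element stream is capped at `max_n` a second time, mirroring the `n=self._n` argument to `utils.to_element_list`.

```python
import itertools
import time


class ElementCountLimiter:
    """Hypothetical limiter: checked per record, but counts elements."""

    def __init__(self, max_elements):
        self._max = max_elements
        self._seen = 0

    def update(self, n_elements):
        self._seen += n_elements

    def is_triggered(self):
        return self._seen >= self._max


class DeadlineLimiter:
    """Hypothetical limiter: triggers after `max_secs` of processing time."""

    def __init__(self, max_secs):
        self._deadline = time.monotonic() + max_secs

    def is_triggered(self):
        return time.monotonic() >= self._deadline


def read_once(records, max_n, max_secs, pipeline_terminated):
    """One pass over `records` (lists of elements); returns (elements, done)."""
    count_limiter = ElementCountLimiter(max_n)
    limiters = [count_limiter, DeadlineLimiter(max_secs)]

    def admitted_records():
        for record in records:
            # Stop admitting records once any limiter has fired.
            if any(lim.is_triggered() for lim in limiters):
                return
            count_limiter.update(len(record))
            yield record

    # Limiters can only stop *between* records, so the last admitted record
    # may overshoot max_n; cap the flattened element stream at max_n again.
    flat = itertools.chain.from_iterable(admitted_records())
    elements = list(itertools.islice(flat, max_n))

    # done means another pass cannot yield new elements:
    #  - a limiter fired: the user's request (n elements or duration) is met;
    #  - the pipeline terminated: the cache will not grow any further.
    # Having drained everything cached so far does not qualify on its own.
    done = any(lim.is_triggered() for lim in limiters) or pipeline_terminated
    return elements, done


print(read_once([[1, 2], [3, 4], [5]], 3, 60, False))  # ([1, 2, 3], True)
print(read_once([[1, 2]], 3, 60, False))  # ([1, 2], False)
```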





[GitHub] [beam] rohdesamuel commented on pull request #12415: [BEAM-10603] Add the RecordingManager and associated classes.

Posted by GitBox <gi...@apache.org>.
rohdesamuel commented on pull request #12415:
URL: https://github.com/apache/beam/pull/12415#issuecomment-675710457







[GitHub] [beam] rohdesamuel commented on pull request #12415: [BEAM-10603] Add the RecordingManager and associated classes.

Posted by GitBox <gi...@apache.org>.
rohdesamuel commented on pull request #12415:
URL: https://github.com/apache/beam/pull/12415#issuecomment-675165677


   Retest this please

