Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/04/10 21:10:28 UTC

[GitHub] [beam] rohdesamuel opened a new pull request #11389: Refactor the BCJ and capture controls to be more testable

rohdesamuel opened a new pull request #11389: Refactor the BCJ and capture controls to be more testable
URL: https://github.com/apache/beam/pull/11389
 
 
   Change-Id: I51c5869a30ab4c82d727486604a77e3fc300f5be
   
   This pulls the capture control logic out of the background caching job (BCJ) into a "Limiter" class. Limiters are very simple classes whose "is_triggered" method returns True when capturing should stop.
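
A minimal sketch of the limiter idea described above, not the code added by this PR. The `Limiter` name and `is_triggered` method come from the description; `ElapsedTimeLimiter`, its injectable `clock` argument, and the `any_triggered` helper are hypothetical names used only for illustration.

```python
import time
from datetime import timedelta


class Limiter:
    """Base interface: subclasses report whether capturing should stop."""
    def is_triggered(self):
        # type: () -> bool
        raise NotImplementedError


class ElapsedTimeLimiter(Limiter):
    """Hypothetical limiter that triggers once `max_duration` has elapsed."""
    def __init__(self, max_duration, clock=time.monotonic):
        # `clock` is an assumption of this sketch; injecting it lets a test
        # control time instead of sleeping.
        self._max_seconds = max_duration.total_seconds()
        self._clock = clock
        self._start = clock()

    def is_triggered(self):
        return self._clock() - self._start >= self._max_seconds


def any_triggered(limiters):
    """Returns True if at least one limiter has triggered."""
    return any(limiter.is_triggered() for limiter in limiters)
```

Because each limiter only answers a yes/no question, a test can substitute a fake limiter or a fake clock and never has to wait on a real timer.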
   
   ------------------------
   
   Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
   
    - [ ] [**Choose reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and mention them in a comment (`R: @username`).
    - [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.
    - [ ] Update `CHANGES.md` with noteworthy changes.
    - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   See the [Contributor Guide](https://beam.apache.org/contribute) for more tips on [how to make review process smoother](https://beam.apache.org/contribute/#make-reviewers-job-easier).
   
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [beam] pabloem commented on issue #11389: Refactor the BCJ and capture controls to be more testable

Posted by GitBox <gi...@apache.org>.
pabloem commented on issue #11389: Refactor the BCJ and capture controls to be more testable
URL: https://github.com/apache/beam/pull/11389#issuecomment-613144229
 
 
   retest this please


[GitHub] [beam] pabloem merged pull request #11389: Refactor the BCJ and capture controls to be more testable

Posted by GitBox <gi...@apache.org>.
pabloem merged pull request #11389: Refactor the BCJ and capture controls to be more testable
URL: https://github.com/apache/beam/pull/11389
 
 
   


[GitHub] [beam] pabloem commented on issue #11389: Refactor the BCJ and capture controls to be more testable

Posted by GitBox <gi...@apache.org>.
pabloem commented on issue #11389: Refactor the BCJ and capture controls to be more testable
URL: https://github.com/apache/beam/pull/11389#issuecomment-613120572
 
 
   retest this please


[GitHub] [beam] pabloem commented on a change in pull request #11389: Refactor the BCJ and capture controls to be more testable

Posted by GitBox <gi...@apache.org>.
pabloem commented on a change in pull request #11389: Refactor the BCJ and capture controls to be more testable
URL: https://github.com/apache/beam/pull/11389#discussion_r407689487
 
 

 ##########
 File path: sdks/python/apache_beam/runners/interactive/interactive_runner_test.py
 ##########
 @@ -196,6 +197,28 @@ def process(self, element):
     # applied but needs an IPython environment. So we manually run this here.
     ie.current_env().track_user_pipelines()
 
+    # Create a fake limiter that cancels the BCJ once the main job receives the
+    # expected amount of results.
+    class FakeLimiter:
+      def __init__(self, p, pcoll):
+        self.p = p
+        self.pcoll = pcoll
+
+      def is_triggered(self):
+        result = ie.current_env().pipeline_result(self.p)
+        if result:
+          try:
+            results = result.get(self.pcoll)
+          except ValueError:
+            return False
+          return len(results) >= 10
+        return False
+
+    # This sets the limiters to stop reading when the test receives 10 elements
+    # or after 5 seconds have elapsed (to eliminate the possibility of hanging).
+    ie.current_env().options.capture_control.set_limiters_for_test(
+        [FakeLimiter(p, data), DurationLimiter(timedelta(seconds=5))])
 
 Review comment:
   IIUC, the test will run for at most 5 seconds (and at least 0.5 seconds, given the sleep time of the BCJ checker, right?), but will pass as soon as `>= 10` elements are available, right?


[GitHub] [beam] pabloem commented on a change in pull request #11389: Refactor the BCJ and capture controls to be more testable

Posted by GitBox <gi...@apache.org>.
pabloem commented on a change in pull request #11389: Refactor the BCJ and capture controls to be more testable
URL: https://github.com/apache/beam/pull/11389#discussion_r407693399
 
 

 ##########
 File path: sdks/python/apache_beam/runners/interactive/background_caching_job.py
 ##########
 @@ -66,61 +66,40 @@ class BackgroundCachingJob(object):
   In both situations, the background caching job should be treated as done
   successfully.
   """
-  def __init__(self, pipeline_result, start_limit_checkers=True):
+  def __init__(self, pipeline_result, limiters):
     self._pipeline_result = pipeline_result
-    self._timer = threading.Timer(
-        ie.current_env().options.capture_duration.total_seconds(), self._cancel)
-    self._timer.daemon = True
     self._condition_checker = threading.Thread(
         target=self._background_caching_job_condition_checker, daemon=True)
-    if start_limit_checkers:
-      self._timer.start()
-      self._condition_checker.start()
-    self._timer_triggered = False
-    self._condition_checker_triggered = False
+
+    # Limiters are checks s.t. if any are triggered then the background caching
+    # job gets cancelled.
+    self._limiters = limiters
+    self._condition_checker.start()
 
   def _background_caching_job_condition_checker(self):
     while not PipelineState.is_terminal(self._pipeline_result.state):
       if self._should_end_condition_checker():
+        self.cancel()
         break
-      time.sleep(5)
+      time.sleep(0.5)
 
 Review comment:
   Maybe the wait time should be parameterizable? Is it pretty cheap to check the limiters? Up to you.


[GitHub] [beam] pabloem commented on a change in pull request #11389: Refactor the BCJ and capture controls to be more testable

Posted by GitBox <gi...@apache.org>.
pabloem commented on a change in pull request #11389: Refactor the BCJ and capture controls to be more testable
URL: https://github.com/apache/beam/pull/11389#discussion_r407684719
 
 

 ##########
 File path: sdks/python/apache_beam/runners/interactive/options/capture_limiters.py
 ##########
 @@ -0,0 +1,70 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Module to condition how Interactive Beam stops capturing data.
+
+For internal use only; no backwards-compatibility guarantees.
+"""
+
+from __future__ import absolute_import
+
+import threading
+
+from apache_beam.runners.interactive import interactive_environment as ie
+
+
+class Limiter:
+  """Limits an aspect of the caching layer."""
+  def is_triggered(self):
+    # type: () -> bool
+
+    """Returns True if the limiter has triggered."""
 
 Review comment:
   ```suggestion
       """Returns True if the limiter has triggered, and caching should stop."""
   ```


[GitHub] [beam] rohdesamuel commented on issue #11389: Refactor the BCJ and capture controls to be more testable

Posted by GitBox <gi...@apache.org>.
rohdesamuel commented on issue #11389: Refactor the BCJ and capture controls to be more testable
URL: https://github.com/apache/beam/pull/11389#issuecomment-612219324
 
 
   R: @pabloem 


[GitHub] [beam] rohdesamuel commented on issue #11389: Refactor the BCJ and capture controls to be more testable

Posted by GitBox <gi...@apache.org>.
rohdesamuel commented on issue #11389: Refactor the BCJ and capture controls to be more testable
URL: https://github.com/apache/beam/pull/11389#issuecomment-613118141
 
 
   Thanks Pablo! Let me also get Ning's LGTM.
   
   R: @KevinGG 


[GitHub] [beam] rohdesamuel commented on a change in pull request #11389: Refactor the BCJ and capture controls to be more testable

Posted by GitBox <gi...@apache.org>.
rohdesamuel commented on a change in pull request #11389: Refactor the BCJ and capture controls to be more testable
URL: https://github.com/apache/beam/pull/11389#discussion_r407745428
 
 

 ##########
 File path: sdks/python/apache_beam/runners/interactive/background_caching_job.py
 ##########
 @@ -66,61 +66,40 @@ class BackgroundCachingJob(object):
   In both situations, the background caching job should be treated as done
   successfully.
   """
-  def __init__(self, pipeline_result, start_limit_checkers=True):
+  def __init__(self, pipeline_result, limiters):
     self._pipeline_result = pipeline_result
-    self._timer = threading.Timer(
-        ie.current_env().options.capture_duration.total_seconds(), self._cancel)
-    self._timer.daemon = True
     self._condition_checker = threading.Thread(
         target=self._background_caching_job_condition_checker, daemon=True)
-    if start_limit_checkers:
-      self._timer.start()
-      self._condition_checker.start()
-    self._timer_triggered = False
-    self._condition_checker_triggered = False
+
+    # Limiters are checks s.t. if any are triggered then the background caching
+    # job gets cancelled.
+    self._limiters = limiters
+    self._condition_checker.start()
 
   def _background_caching_job_condition_checker(self):
     while not PipelineState.is_terminal(self._pipeline_result.state):
       if self._should_end_condition_checker():
+        self.cancel()
         break
-      time.sleep(5)
+      time.sleep(0.5)
 
 Review comment:
   It's pretty cheap to check the limiters, so I don't think the wait needs to be parameterizable. This sleep is only here so that this loop doesn't chew up the CPU.
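
For reference, a rough sketch of what a parameterizable wait could look like if it were ever needed. This is not the code in `background_caching_job.py`; `poll_until` and all of its parameter names are hypothetical, and the default of 0.5 seconds only mirrors the value in the diff above.

```python
import time


def poll_until(should_stop, on_stop, is_done, poll_interval=0.5):
    """Polls `should_stop` until it returns True or `is_done` reports completion.

    Returns True if `on_stop` was invoked, False if the work finished first.
    """
    while not is_done():
        if should_stop():
            on_stop()
            return True
        # The sleep only keeps the loop from spinning on the CPU.
        time.sleep(poll_interval)
    return False
```

With cheap checks, the interval mostly trades CPU wake-ups against how quickly a triggered limiter is noticed, which is consistent with leaving it as a constant here.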


[GitHub] [beam] pabloem commented on issue #11389: Refactor the BCJ and capture controls to be more testable

Posted by GitBox <gi...@apache.org>.
pabloem commented on issue #11389: Refactor the BCJ and capture controls to be more testable
URL: https://github.com/apache/beam/pull/11389#issuecomment-612221061
 
 
   retest this please


[GitHub] [beam] rohdesamuel commented on a change in pull request #11389: Refactor the BCJ and capture controls to be more testable

Posted by GitBox <gi...@apache.org>.
rohdesamuel commented on a change in pull request #11389: Refactor the BCJ and capture controls to be more testable
URL: https://github.com/apache/beam/pull/11389#discussion_r407745006
 
 

 ##########
 File path: sdks/python/apache_beam/runners/interactive/interactive_runner_test.py
 ##########
 @@ -196,6 +197,28 @@ def process(self, element):
     # applied but needs an IPython environment. So we manually run this here.
     ie.current_env().track_user_pipelines()
 
+    # Create a fake limiter that cancels the BCJ once the main job receives the
+    # expected amount of results.
+    class FakeLimiter:
+      def __init__(self, p, pcoll):
+        self.p = p
+        self.pcoll = pcoll
+
+      def is_triggered(self):
+        result = ie.current_env().pipeline_result(self.p)
+        if result:
+          try:
+            results = result.get(self.pcoll)
+          except ValueError:
+            return False
+          return len(results) >= 10
+        return False
+
+    # This sets the limiters to stop reading when the test receives 10 elements
+    # or after 5 seconds have elapsed (to eliminate the possibility of hanging).
+    ie.current_env().options.capture_control.set_limiters_for_test(
+        [FakeLimiter(p, data), DurationLimiter(timedelta(seconds=5))])
 
 Review comment:
   Yep! 
