Posted to github@beam.apache.org by "robertwb (via GitHub)" <gi...@apache.org> on 2023/05/03 17:31:22 UTC

[GitHub] [beam] robertwb opened a new pull request, #26526: Add timeout parameter to with_exception_handling.

robertwb opened a new pull request, #26526:
URL: https://github.com/apache/beam/pull/26526

   
   ------------------------
   
   Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
   
    - [ ] Mention the appropriate issue in your description (for example: `addresses #123`), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, comment `fixes #<ISSUE NUMBER>` instead.
    - [ ] Update `CHANGES.md` with noteworthy changes.
    - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   See the [Contributor Guide](https://beam.apache.org/contribute) for more tips on [how to make review process smoother](https://beam.apache.org/contribute/get-started-contributing/#make-the-reviewers-job-easier).
   
   To check the build health, please visit [https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md](https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md)
   
   GitHub Actions Tests Status (on master branch)
   ------------------------------------------------------------------------------------------------
   [![Build python source distribution and wheels](https://github.com/apache/beam/workflows/Build%20python%20source%20distribution%20and%20wheels/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Build+python+source+distribution+and+wheels%22+branch%3Amaster+event%3Aschedule)
   [![Python tests](https://github.com/apache/beam/workflows/Python%20tests/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Python+Tests%22+branch%3Amaster+event%3Aschedule)
   [![Java tests](https://github.com/apache/beam/workflows/Java%20Tests/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Java+Tests%22+branch%3Amaster+event%3Aschedule)
   [![Go tests](https://github.com/apache/beam/workflows/Go%20tests/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Go+tests%22+branch%3Amaster+event%3Aschedule)
   
   See [CI.md](https://github.com/apache/beam/blob/master/CI.md) for more information about GitHub Actions CI.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] riteshghorse merged pull request #26526: Add timeout parameter to with_exception_handling.

Posted by "riteshghorse (via GitHub)" <gi...@apache.org>.
riteshghorse merged PR #26526:
URL: https://github.com/apache/beam/pull/26526




[GitHub] [beam] github-actions[bot] commented on pull request #26526: Add timeout parameter to with_exception_handling.

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on PR #26526:
URL: https://github.com/apache/beam/pull/26526#issuecomment-1533438561

   Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control




[GitHub] [beam] cozos commented on a diff in pull request #26526: Add timeout parameter to with_exception_handling.

Posted by "cozos (via GitHub)" <gi...@apache.org>.
cozos commented on code in PR #26526:
URL: https://github.com/apache/beam/pull/26526#discussion_r1185933893


##########
sdks/python/apache_beam/transforms/core.py:
##########
@@ -2268,19 +2279,35 @@ def finish_bundle(self):
 
   def teardown(self):
     self._call_remote(self._remote_teardown)
-    self._pool.shutdown()
-    self._pool = None
+    self._terminate_pool()
 
   def _call_remote(self, method, *args, **kwargs):
     if self._pool is None:
       self._pool = concurrent.futures.ProcessPoolExecutor(1)
       self._pool.submit(self._remote_init, self._serialized_fn).result()
     try:
-      return self._pool.submit(method, *args, **kwargs).result()
-    except concurrent.futures.process.BrokenProcessPool:
-      self._pool = None
+      return self._pool.submit(method, *args, **kwargs).result(
+          self._timeout if method == self._remote_process else None)
+    except (concurrent.futures.process.BrokenProcessPool,
+            TimeoutError,
+            concurrent.futures._base.TimeoutError):
+      self._terminate_pool()
       raise
 
+  def _terminate_pool(self):
+    """Forcibly terminate the pool, not leaving any live subprocesses."""
+    pool = self._pool
+    self._pool = None
+    processes = list(pool._processes.values())
+    pool.shutdown(wait=False)
+    for p in processes:
+      if p.is_alive():
+        p.kill()
+    time.sleep(1)
+    for p in processes:
+      if p.is_alive():
+        p.terminate()

Review Comment:
   My understanding is that `SIGTERM` can be intercepted by a program, which lets it die gracefully, whereas `SIGKILL` cannot be intercepted and results in abrupt termination. With that in mind, wouldn't it make sense to terminate first, then kill?
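A sketch of the ordering the comment proposes (SIGTERM first, SIGKILL only for processes still alive after a grace period) follows. It is a hedged illustration, not the merged Beam code: `terminate_then_kill` and `grace_period` are hypothetical names, and it assumes the items are `multiprocessing.Process`-like objects with `is_alive()`, `terminate()`, `join()` and `kill()`, which is what `pool._processes.values()` holds in the diff. On POSIX, `Process.terminate()` sends SIGTERM and `Process.kill()` sends SIGKILL.

```python
import time


def terminate_then_kill(processes, grace_period=1.0):
    """Ask live processes to exit with SIGTERM, then SIGKILL any stragglers.

    `processes` is an iterable of already-started multiprocessing.Process-like
    objects (hypothetical helper, not part of Beam).
    """
    processes = list(processes)
    # Step 1: the polite request. SIGTERM can be caught, so a child may run
    # cleanup code before it exits.
    for p in processes:
        if p.is_alive():
            p.terminate()
    # Step 2: wait up to one shared deadline for all of them to exit.
    deadline = time.monotonic() + grace_period
    for p in processes:
        p.join(max(0.0, deadline - time.monotonic()))
    # Step 3: SIGKILL cannot be caught or ignored; use it only on processes
    # that are still alive, then reap them.
    for p in processes:
        if p.is_alive():
            p.kill()
            p.join()
```

Joining against one shared deadline bounds the total wait at roughly `grace_period`, however many processes there are, rather than always sleeping a fixed second as the diff does.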





[GitHub] [beam] riteshghorse commented on pull request #26526: Add timeout parameter to with_exception_handling.

Posted by "riteshghorse (via GitHub)" <gi...@apache.org>.
riteshghorse commented on PR #26526:
URL: https://github.com/apache/beam/pull/26526#issuecomment-1533703745

   Checks passed but the status wasn't updated here. Merging!




[GitHub] [beam] cozos commented on a diff in pull request #26526: Add timeout parameter to with_exception_handling.

Posted by "cozos (via GitHub)" <gi...@apache.org>.
cozos commented on code in PR #26526:
URL: https://github.com/apache/beam/pull/26526#discussion_r1185957495


##########
sdks/python/apache_beam/transforms/core.py:
##########
@@ -2321,6 +2348,43 @@ def _remote_teardown(cls):
     cls._fn = None
 
 
+class _TimeoutDoFn(DoFn):
+  """Process method run in a separate thread allowing timeouts.
+  """
+  def __init__(self, fn, timeout=None):
+    self._fn = fn
+    self._timeout = timeout
+    self._pool = None
+
+  def __getattribute__(self, name):
+    if (name.startswith('__') or name in self.__dict__ or
+        name in type(self).__dict__):
+      return object.__getattribute__(self, name)
+    else:
+      return getattr(self._fn, name)
+
+  def process(self, *args, **kwargs):
+    if self._pool is None:
+      self._pool = concurrent.futures.ThreadPoolExecutor(10)
+    # Ensure we iterate over the entire output list in the given amount of time.
+    try:
+      return self._pool.submit(
+          lambda: list(self._fn.process(*args, **kwargs))).result(
+              self._timeout)
+    except TimeoutError:
+      self._pool.shutdown(wait=False)
+      self._pool = None
+      raise
+
+  def teardown(self):
+    try:
+      self._fn.teardown()
+    finally:
+      if self._pool is not None:
+        self._pool.shutdown(wait=False)

Review Comment:
   @robertwb @riteshghorse I think this comes with a lot of problems. `shutdown(wait=False)` is non-blocking, but actively running threads will NOT be cancelled. That means the timed-out task threads in the pool will become zombies: they keep running, consuming memory and CPU, until their execution finishes (try putting an infinite loop in the TimeoutDoFn). Users of this feature will think they have a memory leak.
   
   From https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.Executor.shutdown
   
   > If wait is False then this method will return immediately and the resources associated with the executor will be freed when all pending futures are done executing. Regardless of the value of wait, the entire Python program will not exit until all pending futures are done executing.
   >
   > If cancel_futures is True, this method will cancel all pending futures that the executor has not started running. **Any futures that are completed or running won’t be cancelled, regardless of the value of cancel_futures**.
   
   (emphasis mine)
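The quoted behaviour is easy to reproduce with the standard library alone. In this hedged sketch, `demonstrate_zombie_thread` and its `stop` event are hypothetical: the event stands in for cooperative cancellation and exists only so the demo can end, because a truly endless task would also keep the interpreter from exiting. It catches `concurrent.futures.TimeoutError`, which became an alias of the builtin `TimeoutError` only in Python 3.11; on older versions a bare `except TimeoutError` does not catch it.

```python
import concurrent.futures
import threading


def demonstrate_zombie_thread(timeout=0.2):
    """Time out a never-ending task, shut the pool down, then inspect it."""
    stop = threading.Event()

    def never_ending_task():
        # Stands in for a DoFn.process that never returns on its own; it
        # only exits because the demo eventually sets `stop`.
        while not stop.is_set():
            stop.wait(0.01)
        return "finished"

    pool = concurrent.futures.ThreadPoolExecutor(max_workers=1)
    future = pool.submit(never_ending_task)
    try:
        future.result(timeout=timeout)
        timed_out = False
    except concurrent.futures.TimeoutError:
        timed_out = True
    cancelled = future.cancel()       # False: a running future can't be cancelled
    pool.shutdown(wait=False)         # returns immediately; the thread keeps going
    still_running = future.running()  # True: the worker thread is still busy
    stop.set()                        # only the task itself can end its loop
    return timed_out, cancelled, still_running, future.result(timeout=5)


print(demonstrate_zombie_thread())  # (True, False, True, 'finished')
```

Since the diff replaces the pool after each timeout, every timed-out call can leave one such thread behind.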
   
   
   





[GitHub] [beam] codecov[bot] commented on pull request #26526: Add timeout parameter to with_exception_handling.

Posted by "codecov[bot] (via GitHub)" <gi...@apache.org>.
codecov[bot] commented on PR #26526:
URL: https://github.com/apache/beam/pull/26526#issuecomment-1533509041

   ## [Codecov](https://app.codecov.io/gh/apache/beam/pull/26526?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#26526](https://app.codecov.io/gh/apache/beam/pull/26526?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (329b60b) into [master](https://app.codecov.io/gh/apache/beam/commit/83aa2b6cc22f4b64e8ad742bea6c940c534b2e59?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (83aa2b6) will **decrease** coverage by `9.19%`.
   > The diff coverage is `73.78%`.
   
   ```diff
   @@            Coverage Diff             @@
   ##           master   #26526      +/-   ##
   ==========================================
   - Coverage   81.10%   71.92%   -9.19%     
   ==========================================
     Files         469      752     +283     
     Lines       67304   101771   +34467     
   ==========================================
   + Hits        54587    73195   +18608     
   - Misses      12717    27080   +14363     
   - Partials        0     1496    +1496     
   ```
   
   | Flag | Coverage Δ | |
   |---|---|---|
   | python | `81.11% <88.40%> (+<0.01%)` | :arrow_up: |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   | [Impacted Files](https://app.codecov.io/gh/apache/beam/pull/26526?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [sdks/go/pkg/beam/core/graph/edge.go](https://app.codecov.io/gh/apache/beam/pull/26526?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9nby9wa2cvYmVhbS9jb3JlL2dyYXBoL2VkZ2UuZ28=) | `3.35% <ø> (ø)` | |
   | [sdks/go/pkg/beam/core/runtime/exec/translate.go](https://app.codecov.io/gh/apache/beam/pull/26526?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9nby9wa2cvYmVhbS9jb3JlL3J1bnRpbWUvZXhlYy90cmFuc2xhdGUuZ28=) | `23.74% <0.00%> (ø)` | |
   | [sdks/go/pkg/beam/core/runtime/graphx/serialize.go](https://app.codecov.io/gh/apache/beam/pull/26526?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9nby9wa2cvYmVhbS9jb3JlL3J1bnRpbWUvZ3JhcGh4L3NlcmlhbGl6ZS5nbw==) | `27.56% <0.00%> (ø)` | |
   | [sdks/go/pkg/beam/core/runtime/harness/datamgr.go](https://app.codecov.io/gh/apache/beam/pull/26526?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9nby9wa2cvYmVhbS9jb3JlL3J1bnRpbWUvaGFybmVzcy9kYXRhbWdyLmdv) | `56.61% <ø> (ø)` | |
   | [sdks/go/pkg/beam/core/runtime/harness/harness.go](https://app.codecov.io/gh/apache/beam/pull/26526?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9nby9wa2cvYmVhbS9jb3JlL3J1bnRpbWUvaGFybmVzcy9oYXJuZXNzLmdv) | `11.11% <0.00%> (ø)` | |
   | [sdks/python/apache\_beam/transforms/combiners.py](https://app.codecov.io/gh/apache/beam/pull/26526?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdHJhbnNmb3Jtcy9jb21iaW5lcnMucHk=) | `93.43% <ø> (ø)` | |
   | [sdks/python/apache\_beam/yaml/yaml\_provider.py](https://app.codecov.io/gh/apache/beam/pull/26526?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0veWFtbC95YW1sX3Byb3ZpZGVyLnB5) | `66.39% <0.00%> (+4.00%)` | :arrow_up: |
   | [sdks/python/setup.py](https://app.codecov.io/gh/apache/beam/pull/26526?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vc2V0dXAucHk=) | `0.00% <ø> (ø)` | |
   | [sdks/go/pkg/beam/core/runtime/graphx/translate.go](https://app.codecov.io/gh/apache/beam/pull/26526?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9nby9wa2cvYmVhbS9jb3JlL3J1bnRpbWUvZ3JhcGh4L3RyYW5zbGF0ZS5nbw==) | `38.00% <12.50%> (ø)` | |
   | [sdks/go/pkg/beam/core/runtime/exec/datasource.go](https://app.codecov.io/gh/apache/beam/pull/26526?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9nby9wa2cvYmVhbS9jb3JlL3J1bnRpbWUvZXhlYy9kYXRhc291cmNlLmdv) | `67.31% <42.85%> (ø)` | |
   | ... and [12 more](https://app.codecov.io/gh/apache/beam/pull/26526?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
   
   ... and [280 files with indirect coverage changes](https://app.codecov.io/gh/apache/beam/pull/26526/indirect-changes?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   




[GitHub] [beam] robertwb commented on pull request #26526: Add timeout parameter to with_exception_handling.

Posted by "robertwb (via GitHub)" <gi...@apache.org>.
robertwb commented on PR #26526:
URL: https://github.com/apache/beam/pull/26526#issuecomment-1533437257

   R: @riteshghorse


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] cozos commented on a diff in pull request #26526: Add timeout parameter to with_exception_handling.

Posted by "cozos (via GitHub)" <gi...@apache.org>.
cozos commented on code in PR #26526:
URL: https://github.com/apache/beam/pull/26526#discussion_r1185941858


##########
sdks/python/apache_beam/transforms/core.py:
##########
@@ -2268,19 +2279,35 @@ def finish_bundle(self):
 
   def teardown(self):
     self._call_remote(self._remote_teardown)
-    self._pool.shutdown()
-    self._pool = None
+    self._terminate_pool()
 
   def _call_remote(self, method, *args, **kwargs):
     if self._pool is None:
       self._pool = concurrent.futures.ProcessPoolExecutor(1)

Review Comment:
   FYI, the default process start method on Linux (including COS on Dataflow) is `fork`, which is known to cause problems with Python multiprocessing: https://pythonspeed.com/articles/python-multiprocessing/
   
   I've personally run into problems with `fork` copying threads and the gRPC client (i.e. network connections). In fact, I believe the Fn Logging handler doesn't work in the subprocess. I changed my pool to `ProcessPoolExecutor(mp_context=get_context("spawn"))`, which fixed some issues. This brings other issues, though, such as needing to re-establish environment variables, the Fn logging connection, and the lull logging thread.
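For reference, a pool built the way the comment describes could look like the following. This is a sketch, not Beam code: `make_spawn_pool` is a hypothetical helper, and passing `initializer`/`initargs` to `ProcessPoolExecutor` (available since Python 3.7) is one standard-library hook for rebuilding per-process state, such as logging, in each fresh worker.

```python
import concurrent.futures
import multiprocessing
import os


def make_spawn_pool(max_workers=1, initializer=None, initargs=()):
    """Return a ProcessPoolExecutor whose workers start in a fresh interpreter.

    The workers do not inherit the parent's in-memory state (threads, open
    gRPC channels, logging handlers); rebuild it in `initializer` if needed.
    """
    ctx = multiprocessing.get_context("spawn")
    return concurrent.futures.ProcessPoolExecutor(
        max_workers=max_workers,
        mp_context=ctx,
        initializer=initializer,
        initargs=initargs)


if __name__ == "__main__":
    # Under "spawn" each worker re-imports the main module; this guard keeps
    # the worker from trying to start a pool of its own.
    with make_spawn_pool() as pool:
        worker_pid = pool.submit(os.getpid).result(timeout=60)
    print(worker_pid != os.getpid())  # True: the task ran in a separate process
```

The trade-off is startup cost: each spawned worker boots a new interpreter and re-imports modules, which is slower than `fork`, and every submitted callable and argument must be picklable.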




