You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by al...@apache.org on 2017/08/22 01:33:57 UTC

[1/2] beam git commit: Allow termination of DirectRunner execution with Ctrl-C

Repository: beam
Updated Branches:
  refs/heads/master d338b4412 -> 32bf7bc64


Allow termination of DirectRunner execution with Ctrl-C


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/a493f325
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/a493f325
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/a493f325

Branch: refs/heads/master
Commit: a493f325f9e90b378b38a3cff03f8103b28d282a
Parents: d338b44
Author: Charles Chen <cc...@google.com>
Authored: Mon Aug 21 14:18:45 2017 -0700
Committer: Ahmet Altay <al...@google.com>
Committed: Mon Aug 21 18:32:20 2017 -0700

----------------------------------------------------------------------
 .../apache_beam/runners/direct/direct_runner.py   | 10 ++++++++++
 .../python/apache_beam/runners/direct/executor.py | 18 ++++++++++++++----
 .../experimental/python_rpc_direct/server.py      |  2 +-
 .../typehints/native_type_compatibility_test.py   |  3 ++-
 .../apache_beam/typehints/typed_pipeline_test.py  |  3 ++-
 5 files changed, 29 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/a493f325/sdks/python/apache_beam/runners/direct/direct_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/direct_runner.py b/sdks/python/apache_beam/runners/direct/direct_runner.py
index 7a88d0e..2deb7da 100644
--- a/sdks/python/apache_beam/runners/direct/direct_runner.py
+++ b/sdks/python/apache_beam/runners/direct/direct_runner.py
@@ -283,6 +283,16 @@ class DirectPipelineResult(PipelineResult):
     self._executor = executor
     self._evaluation_context = evaluation_context
 
+  def __del__(self):
+    if self._state == PipelineState.RUNNING:
+      logging.warning(
+          'The DirectPipelineResult is being garbage-collected while the '
+          'DirectRunner is still running the corresponding pipeline. This may '
+          'lead to incomplete execution of the pipeline if the main thread '
+          'exits before pipeline completion. Consider using '
+          'result.wait_until_finish() to wait for completion of pipeline '
+          'execution.')
+
   def _is_in_terminal_state(self):
     return self._state is not PipelineState.RUNNING
 

http://git-wip-us.apache.org/repos/asf/beam/blob/a493f325/sdks/python/apache_beam/runners/direct/executor.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/executor.py b/sdks/python/apache_beam/runners/direct/executor.py
index 2e46978..d465068 100644
--- a/sdks/python/apache_beam/runners/direct/executor.py
+++ b/sdks/python/apache_beam/runners/direct/executor.py
@@ -58,6 +58,9 @@ class _ExecutorService(object):
       self._default_name = 'ExecutorServiceWorker-' + str(index)
       self._update_name()
       self.shutdown_requested = False
+
+      # Stop worker thread when main thread exits.
+      self.daemon = True
       self.start()
 
     def _update_name(self, task=None):
@@ -78,7 +81,6 @@ class _ExecutorService(object):
         return None
 
     def run(self):
-
       while not self.shutdown_requested:
         task = self._get_task_or_none()
         if task:
@@ -460,9 +462,17 @@ class _ExecutorServiceParallelExecutor(object):
         return None
 
     def take(self):
-      item = self._queue.get()
-      self._queue.task_done()
-      return item
+      # The implementation of Queue.Queue.get() does not propagate
+      # KeyboardInterrupts when a timeout is not used.  We therefore use a
+      # one-second timeout in the following loop to allow KeyboardInterrupts
+      # to be correctly propagated.
+      while True:
+        try:
+          item = self._queue.get(timeout=1)
+          self._queue.task_done()
+          return item
+        except Queue.Empty:
+          pass
 
     def offer(self, item):
       assert isinstance(item, self._item_type)

http://git-wip-us.apache.org/repos/asf/beam/blob/a493f325/sdks/python/apache_beam/runners/experimental/python_rpc_direct/server.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/experimental/python_rpc_direct/server.py b/sdks/python/apache_beam/runners/experimental/python_rpc_direct/server.py
index 3addf92..bae25a4 100644
--- a/sdks/python/apache_beam/runners/experimental/python_rpc_direct/server.py
+++ b/sdks/python/apache_beam/runners/experimental/python_rpc_direct/server.py
@@ -17,10 +17,10 @@
 
 """A runner implementation that submits a job for remote execution.
 """
-from concurrent import futures
 import time
 import uuid
 
+from concurrent import futures
 import grpc
 
 from apache_beam.portability.api import beam_job_api_pb2

http://git-wip-us.apache.org/repos/asf/beam/blob/a493f325/sdks/python/apache_beam/typehints/native_type_compatibility_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/typehints/native_type_compatibility_test.py b/sdks/python/apache_beam/typehints/native_type_compatibility_test.py
index d0cafe1..0ff2b3b 100644
--- a/sdks/python/apache_beam/typehints/native_type_compatibility_test.py
+++ b/sdks/python/apache_beam/typehints/native_type_compatibility_test.py
@@ -17,9 +17,10 @@
 
 """Test for Beam type compatibility library."""
 
-import typing
 import unittest
 
+import typing
+
 from apache_beam.typehints import typehints
 from apache_beam.typehints import native_type_compatibility
 

http://git-wip-us.apache.org/repos/asf/beam/blob/a493f325/sdks/python/apache_beam/typehints/typed_pipeline_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/typehints/typed_pipeline_test.py b/sdks/python/apache_beam/typehints/typed_pipeline_test.py
index 58274f3..59d1e1c 100644
--- a/sdks/python/apache_beam/typehints/typed_pipeline_test.py
+++ b/sdks/python/apache_beam/typehints/typed_pipeline_test.py
@@ -17,9 +17,10 @@
 
 """Unit tests for the type-hint objects and decorators."""
 import inspect
-import typing
 import unittest
 
+import typing
+
 import apache_beam as beam
 from apache_beam import pvalue
 from apache_beam import typehints

[2/2] beam git commit: This closes #3741

Posted by al...@apache.org.
This closes #3741


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/32bf7bc6
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/32bf7bc6
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/32bf7bc6

Branch: refs/heads/master
Commit: 32bf7bc64ac4670b8973bcb231381926e6f768a0
Parents: d338b44 a493f32
Author: Ahmet Altay <al...@google.com>
Authored: Mon Aug 21 18:33:26 2017 -0700
Committer: Ahmet Altay <al...@google.com>
Committed: Mon Aug 21 18:33:26 2017 -0700

----------------------------------------------------------------------
 .../apache_beam/runners/direct/direct_runner.py   | 10 ++++++++++
 .../python/apache_beam/runners/direct/executor.py | 18 ++++++++++++++----
 .../experimental/python_rpc_direct/server.py      |  2 +-
 .../typehints/native_type_compatibility_test.py   |  3 ++-
 .../apache_beam/typehints/typed_pipeline_test.py  |  3 ++-
 5 files changed, 29 insertions(+), 7 deletions(-)
----------------------------------------------------------------------