You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by al...@apache.org on 2017/08/22 01:33:57 UTC
[1/2] beam git commit: Allow termination of DirectRunner execution with Ctrl-C
Repository: beam
Updated Branches:
refs/heads/master d338b4412 -> 32bf7bc64
Allow termination of DirectRunner execution with Ctrl-C
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/a493f325
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/a493f325
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/a493f325
Branch: refs/heads/master
Commit: a493f325f9e90b378b38a3cff03f8103b28d282a
Parents: d338b44
Author: Charles Chen <cc...@google.com>
Authored: Mon Aug 21 14:18:45 2017 -0700
Committer: Ahmet Altay <al...@google.com>
Committed: Mon Aug 21 18:32:20 2017 -0700
----------------------------------------------------------------------
.../apache_beam/runners/direct/direct_runner.py | 10 ++++++++++
.../python/apache_beam/runners/direct/executor.py | 18 ++++++++++++++----
.../experimental/python_rpc_direct/server.py | 2 +-
.../typehints/native_type_compatibility_test.py | 3 ++-
.../apache_beam/typehints/typed_pipeline_test.py | 3 ++-
5 files changed, 29 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/a493f325/sdks/python/apache_beam/runners/direct/direct_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/direct_runner.py b/sdks/python/apache_beam/runners/direct/direct_runner.py
index 7a88d0e..2deb7da 100644
--- a/sdks/python/apache_beam/runners/direct/direct_runner.py
+++ b/sdks/python/apache_beam/runners/direct/direct_runner.py
@@ -283,6 +283,16 @@ class DirectPipelineResult(PipelineResult):
self._executor = executor
self._evaluation_context = evaluation_context
+ def __del__(self):
+ if self._state == PipelineState.RUNNING:
+ logging.warning(
+ 'The DirectPipelineResult is being garbage-collected while the '
+ 'DirectRunner is still running the corresponding pipeline. This may '
+ 'lead to incomplete execution of the pipeline if the main thread '
+ 'exits before pipeline completion. Consider using '
+ 'result.wait_until_finish() to wait for completion of pipeline '
+ 'execution.')
+
def _is_in_terminal_state(self):
return self._state is not PipelineState.RUNNING
http://git-wip-us.apache.org/repos/asf/beam/blob/a493f325/sdks/python/apache_beam/runners/direct/executor.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/executor.py b/sdks/python/apache_beam/runners/direct/executor.py
index 2e46978..d465068 100644
--- a/sdks/python/apache_beam/runners/direct/executor.py
+++ b/sdks/python/apache_beam/runners/direct/executor.py
@@ -58,6 +58,9 @@ class _ExecutorService(object):
self._default_name = 'ExecutorServiceWorker-' + str(index)
self._update_name()
self.shutdown_requested = False
+
+ # Stop worker thread when main thread exits.
+ self.daemon = True
self.start()
def _update_name(self, task=None):
@@ -78,7 +81,6 @@ class _ExecutorService(object):
return None
def run(self):
-
while not self.shutdown_requested:
task = self._get_task_or_none()
if task:
@@ -460,9 +462,17 @@ class _ExecutorServiceParallelExecutor(object):
return None
def take(self):
- item = self._queue.get()
- self._queue.task_done()
- return item
+ # The implementation of Queue.Queue.get() does not propagate
+ # KeyboardInterrupts when a timeout is not used. We therefore use a
+ # one-second timeout in the following loop to allow KeyboardInterrupts
+ # to be correctly propagated.
+ while True:
+ try:
+ item = self._queue.get(timeout=1)
+ self._queue.task_done()
+ return item
+ except Queue.Empty:
+ pass
def offer(self, item):
assert isinstance(item, self._item_type)
http://git-wip-us.apache.org/repos/asf/beam/blob/a493f325/sdks/python/apache_beam/runners/experimental/python_rpc_direct/server.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/experimental/python_rpc_direct/server.py b/sdks/python/apache_beam/runners/experimental/python_rpc_direct/server.py
index 3addf92..bae25a4 100644
--- a/sdks/python/apache_beam/runners/experimental/python_rpc_direct/server.py
+++ b/sdks/python/apache_beam/runners/experimental/python_rpc_direct/server.py
@@ -17,10 +17,10 @@
"""A runner implementation that submits a job for remote execution.
"""
-from concurrent import futures
import time
import uuid
+from concurrent import futures
import grpc
from apache_beam.portability.api import beam_job_api_pb2
http://git-wip-us.apache.org/repos/asf/beam/blob/a493f325/sdks/python/apache_beam/typehints/native_type_compatibility_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/typehints/native_type_compatibility_test.py b/sdks/python/apache_beam/typehints/native_type_compatibility_test.py
index d0cafe1..0ff2b3b 100644
--- a/sdks/python/apache_beam/typehints/native_type_compatibility_test.py
+++ b/sdks/python/apache_beam/typehints/native_type_compatibility_test.py
@@ -17,9 +17,10 @@
"""Test for Beam type compatibility library."""
-import typing
import unittest
+import typing
+
from apache_beam.typehints import typehints
from apache_beam.typehints import native_type_compatibility
http://git-wip-us.apache.org/repos/asf/beam/blob/a493f325/sdks/python/apache_beam/typehints/typed_pipeline_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/typehints/typed_pipeline_test.py b/sdks/python/apache_beam/typehints/typed_pipeline_test.py
index 58274f3..59d1e1c 100644
--- a/sdks/python/apache_beam/typehints/typed_pipeline_test.py
+++ b/sdks/python/apache_beam/typehints/typed_pipeline_test.py
@@ -17,9 +17,10 @@
"""Unit tests for the type-hint objects and decorators."""
import inspect
-import typing
import unittest
+import typing
+
import apache_beam as beam
from apache_beam import pvalue
from apache_beam import typehints
[2/2] beam git commit: This closes #3741
Posted by al...@apache.org.
This closes #3741
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/32bf7bc6
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/32bf7bc6
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/32bf7bc6
Branch: refs/heads/master
Commit: 32bf7bc64ac4670b8973bcb231381926e6f768a0
Parents: d338b44 a493f32
Author: Ahmet Altay <al...@google.com>
Authored: Mon Aug 21 18:33:26 2017 -0700
Committer: Ahmet Altay <al...@google.com>
Committed: Mon Aug 21 18:33:26 2017 -0700
----------------------------------------------------------------------
.../apache_beam/runners/direct/direct_runner.py | 10 ++++++++++
.../python/apache_beam/runners/direct/executor.py | 18 ++++++++++++++----
.../experimental/python_rpc_direct/server.py | 2 +-
.../typehints/native_type_compatibility_test.py | 3 ++-
.../apache_beam/typehints/typed_pipeline_test.py | 3 ++-
5 files changed, 29 insertions(+), 7 deletions(-)
----------------------------------------------------------------------