You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by al...@apache.org on 2017/08/25 22:07:47 UTC
[1/2] beam git commit: Actually wait for exector service to shutdown
Repository: beam
Updated Branches:
refs/heads/master 77a0a2afc -> 9071c5516
Actually wait for exector service to shutdown
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/2dea491c
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/2dea491c
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/2dea491c
Branch: refs/heads/master
Commit: 2dea491cab131b830e884bd408e82e97690259d9
Parents: 77a0a2a
Author: Sergiy Byelozyorov <se...@chromium.org>
Authored: Wed Aug 23 19:04:31 2017 +0200
Committer: Ahmet Altay <al...@google.com>
Committed: Fri Aug 25 15:07:33 2017 -0700
----------------------------------------------------------------------
.../runners/direct/direct_runner_test.py | 41 ++++++++++++++++++++
.../apache_beam/runners/direct/executor.py | 1 +
2 files changed, 42 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/2dea491c/sdks/python/apache_beam/runners/direct/direct_runner_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/direct_runner_test.py b/sdks/python/apache_beam/runners/direct/direct_runner_test.py
new file mode 100644
index 0000000..1c8b785
--- /dev/null
+++ b/sdks/python/apache_beam/runners/direct/direct_runner_test.py
@@ -0,0 +1,41 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import threading
+import unittest
+
+import apache_beam as beam
+from apache_beam.testing import test_pipeline
+
+
+class DirectPipelineResultTest(unittest.TestCase):
+
+ def test_waiting_on_result_stops_executor_threads(self):
+ pre_test_threads = set(t.ident for t in threading.enumerate())
+
+ pipeline = test_pipeline.TestPipeline()
+ _ = (pipeline | beam.Create([{'foo': 'bar'}]))
+ result = pipeline.run()
+ result.wait_until_finish()
+
+ post_test_threads = set(t.ident for t in threading.enumerate())
+ new_threads = post_test_threads - pre_test_threads
+ self.assertEqual(len(new_threads), 0)
+
+
+if __name__ == '__main__':
+ unittest.main()
http://git-wip-us.apache.org/repos/asf/beam/blob/2dea491c/sdks/python/apache_beam/runners/direct/executor.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/executor.py b/sdks/python/apache_beam/runners/direct/executor.py
index d465068..3e08b52 100644
--- a/sdks/python/apache_beam/runners/direct/executor.py
+++ b/sdks/python/apache_beam/runners/direct/executor.py
@@ -415,6 +415,7 @@ class _ExecutorServiceParallelExecutor(object):
raise t, v, tb
finally:
self.executor_service.shutdown()
+ self.executor_service.await_completion()
def schedule_consumers(self, committed_bundle):
if committed_bundle.pcollection in self.value_to_consumers:
[2/2] beam git commit: This closes #3751
Posted by al...@apache.org.
This closes #3751
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/9071c551
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/9071c551
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/9071c551
Branch: refs/heads/master
Commit: 9071c55165e88dd898175f88862d41b53bf4deca
Parents: 77a0a2a 2dea491
Author: Ahmet Altay <al...@google.com>
Authored: Fri Aug 25 15:07:37 2017 -0700
Committer: Ahmet Altay <al...@google.com>
Committed: Fri Aug 25 15:07:37 2017 -0700
----------------------------------------------------------------------
.../runners/direct/direct_runner_test.py | 41 ++++++++++++++++++++
.../apache_beam/runners/direct/executor.py | 1 +
2 files changed, 42 insertions(+)
----------------------------------------------------------------------