You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by al...@apache.org on 2017/08/25 22:07:47 UTC

[1/2] beam git commit: Actually wait for executor service to shut down

Repository: beam
Updated Branches:
  refs/heads/master 77a0a2afc -> 9071c5516


Actually wait for executor service to shut down


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/2dea491c
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/2dea491c
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/2dea491c

Branch: refs/heads/master
Commit: 2dea491cab131b830e884bd408e82e97690259d9
Parents: 77a0a2a
Author: Sergiy Byelozyorov <se...@chromium.org>
Authored: Wed Aug 23 19:04:31 2017 +0200
Committer: Ahmet Altay <al...@google.com>
Committed: Fri Aug 25 15:07:33 2017 -0700

----------------------------------------------------------------------
 .../runners/direct/direct_runner_test.py        | 41 ++++++++++++++++++++
 .../apache_beam/runners/direct/executor.py      |  1 +
 2 files changed, 42 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/2dea491c/sdks/python/apache_beam/runners/direct/direct_runner_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/direct_runner_test.py b/sdks/python/apache_beam/runners/direct/direct_runner_test.py
new file mode 100644
index 0000000..1c8b785
--- /dev/null
+++ b/sdks/python/apache_beam/runners/direct/direct_runner_test.py
@@ -0,0 +1,41 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import threading
+import unittest
+
+import apache_beam as beam
+from apache_beam.testing import test_pipeline
+
+
+class DirectPipelineResultTest(unittest.TestCase):
+
+  def test_waiting_on_result_stops_executor_threads(self):
+    pre_test_threads = set(t.ident for t in threading.enumerate())
+
+    pipeline = test_pipeline.TestPipeline()
+    _ = (pipeline | beam.Create([{'foo': 'bar'}]))
+    result = pipeline.run()
+    result.wait_until_finish()
+
+    post_test_threads = set(t.ident for t in threading.enumerate())
+    new_threads = post_test_threads - pre_test_threads
+    self.assertEqual(len(new_threads), 0)
+
+
+if __name__ == '__main__':
+  unittest.main()

http://git-wip-us.apache.org/repos/asf/beam/blob/2dea491c/sdks/python/apache_beam/runners/direct/executor.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/executor.py b/sdks/python/apache_beam/runners/direct/executor.py
index d465068..3e08b52 100644
--- a/sdks/python/apache_beam/runners/direct/executor.py
+++ b/sdks/python/apache_beam/runners/direct/executor.py
@@ -415,6 +415,7 @@ class _ExecutorServiceParallelExecutor(object):
         raise t, v, tb
     finally:
       self.executor_service.shutdown()
+      self.executor_service.await_completion()
 
   def schedule_consumers(self, committed_bundle):
     if committed_bundle.pcollection in self.value_to_consumers:


[2/2] beam git commit: This closes #3751

Posted by al...@apache.org.
This closes #3751


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/9071c551
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/9071c551
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/9071c551

Branch: refs/heads/master
Commit: 9071c55165e88dd898175f88862d41b53bf4deca
Parents: 77a0a2a 2dea491
Author: Ahmet Altay <al...@google.com>
Authored: Fri Aug 25 15:07:37 2017 -0700
Committer: Ahmet Altay <al...@google.com>
Committed: Fri Aug 25 15:07:37 2017 -0700

----------------------------------------------------------------------
 .../runners/direct/direct_runner_test.py        | 41 ++++++++++++++++++++
 .../apache_beam/runners/direct/executor.py      |  1 +
 2 files changed, 42 insertions(+)
----------------------------------------------------------------------