You are viewing a plain-text version of this content. The link to the canonical (original) version was not preserved in this copy.
Posted to commits@beam.apache.org by mx...@apache.org on 2019/03/11 11:06:16 UTC

[beam] branch master updated: [BEAM-6666] subprocess.Popen hangs after use of gRPC channel

This is an automated email from the ASF dual-hosted git repository.

mxm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 5fc404d  [BEAM-6666] subprocess.Popen hangs after use of gRPC channel
     new 99d5d91  Merge pull request #7845: [BEAM-6666] subprocess.Popen hangs after use of gRPC channel
5fc404d is described below

commit 5fc404d49a77fc9c67230286567a3537147a5303
Author: Heejong Lee <he...@gmail.com>
AuthorDate: Thu Feb 14 11:25:24 2019 -0800

    [BEAM-6666] subprocess.Popen hangs after use of gRPC channel
---
 .../python/apache_beam/examples/wordcount_xlang.py | 43 ++++++++++++--------
 .../runners/portability/portable_runner.py         | 16 ++++++--
 sdks/python/build.gradle                           | 46 +++++++++++++++++-----
 3 files changed, 74 insertions(+), 31 deletions(-)

diff --git a/sdks/python/apache_beam/examples/wordcount_xlang.py b/sdks/python/apache_beam/examples/wordcount_xlang.py
index d9256fe..f4071f8 100644
--- a/sdks/python/apache_beam/examples/wordcount_xlang.py
+++ b/sdks/python/apache_beam/examples/wordcount_xlang.py
@@ -31,8 +31,10 @@ from apache_beam.io import ReadFromText
 from apache_beam.io import WriteToText
 from apache_beam.options.pipeline_options import PipelineOptions
 from apache_beam.options.pipeline_options import SetupOptions
+from apache_beam.options.pipeline_options import StandardOptions
 
-EXPANSION_SERVICE_PORT = '8097'
+# avoid possible conflict with job-server embedded expansion service at 8097
+EXPANSION_SERVICE_PORT = '8096'
 EXPANSION_SERVICE_ADDR = 'localhost:%s' % EXPANSION_SERVICE_PORT
 
 
@@ -60,14 +62,7 @@ class WordExtractingDoFn(beam.DoFn):
     return words
 
 
-def run(pipeline_args, input_file, output_file):
-
-  # We use the save_main_session option because one or more DoFn's in this
-  # workflow rely on global context (e.g., a module imported at module level).
-  pipeline_options = PipelineOptions(pipeline_args)
-  pipeline_options.view_as(SetupOptions).save_main_session = True
-  p = beam.Pipeline(options=pipeline_options)
-
+def run(p, input_file, output_file):
   # Read the text file[pattern] into a PCollection.
   lines = p | 'read' >> ReadFromText(input_file)
 
@@ -92,12 +87,7 @@ def run(pipeline_args, input_file, output_file):
   result.wait_until_finish()
 
 
-def wait_for_ready():
-  with grpc.insecure_channel(EXPANSION_SERVICE_ADDR) as channel:
-    grpc.channel_ready_future(channel).result()
-
-
-if __name__ == '__main__':
+def main():
   logging.getLogger().setLevel(logging.INFO)
 
   parser = argparse.ArgumentParser()
@@ -115,13 +105,32 @@ if __name__ == '__main__':
                       help='Jar file for expansion service')
 
   known_args, pipeline_args = parser.parse_known_args()
+
+  pipeline_options = PipelineOptions(pipeline_args)
+  assert (
+      pipeline_options.view_as(StandardOptions).runner.lower()
+      == "portablerunner"), "Only PortableRunner is supported."
+
+  # We use the save_main_session option because one or more DoFn's in this
+  # workflow rely on global context (e.g., a module imported at module level).
+  pipeline_options.view_as(SetupOptions).save_main_session = True
+
+  p = beam.Pipeline(options=pipeline_options)
+  p.runner.init_dockerized_job_server()
+
   try:
     server = subprocess.Popen([
         'java', '-jar', known_args.expansion_service_jar,
         EXPANSION_SERVICE_PORT])
-    wait_for_ready()
 
-    run(pipeline_args, known_args.input, known_args.output)
+    with grpc.insecure_channel(EXPANSION_SERVICE_ADDR) as channel:
+      grpc.channel_ready_future(channel).result()
+
+    run(p, known_args.input, known_args.output)
 
   finally:
     server.kill()
+
+
+if __name__ == '__main__':
+  main()
diff --git a/sdks/python/apache_beam/runners/portability/portable_runner.py b/sdks/python/apache_beam/runners/portability/portable_runner.py
index 97d9c01..c6f5591 100644
--- a/sdks/python/apache_beam/runners/portability/portable_runner.py
+++ b/sdks/python/apache_beam/runners/portability/portable_runner.py
@@ -77,6 +77,9 @@ class PortableRunner(runner.PipelineRunner):
     This runner schedules the job on a job service. The responsibility of
     running and managing the job lies with the job service used.
   """
+  def __init__(self):
+    self._job_endpoint = None
+
   @staticmethod
   def default_docker_image():
     if 'USER' in os.environ:
@@ -137,6 +140,12 @@ class PortableRunner(runner.PipelineRunner):
           payload=(portable_options.environment_config.encode('ascii')
                    if portable_options.environment_config else None))
 
+  def init_dockerized_job_server(self):
+    # TODO Provide a way to specify a container Docker URL
+    # https://issues.apache.org/jira/browse/BEAM-6328
+    docker = DockerizedJobServer()
+    self._job_endpoint = docker.start()
+
   def run_pipeline(self, pipeline, options):
     portable_options = options.view_as(PortableOptions)
     job_endpoint = portable_options.job_endpoint
@@ -147,10 +156,9 @@ class PortableRunner(runner.PipelineRunner):
       options.view_as(SetupOptions).sdk_location = 'container'
 
     if not job_endpoint:
-      # TODO Provide a way to specify a container Docker URL
-      # https://issues.apache.org/jira/browse/BEAM-6328
-      docker = DockerizedJobServer()
-      job_endpoint = docker.start()
+      if not self._job_endpoint:
+        self.init_dockerized_job_server()
+      job_endpoint = self._job_endpoint
       job_service = None
     elif job_endpoint == 'embed':
       job_service = local_job_service.LocalJobServicer()
diff --git a/sdks/python/build.gradle b/sdks/python/build.gradle
index 0ac62e9..827f7fc 100644
--- a/sdks/python/build.gradle
+++ b/sdks/python/build.gradle
@@ -125,8 +125,8 @@ task preCommit() {
 task portablePreCommit() {
   dependsOn ':beam-runners-flink_2.11-job-server-container:docker'
   dependsOn ':beam-sdks-python-container:docker'
-  dependsOn portableWordCountTask('portableWordCountBatch', false, false)
-  dependsOn portableWordCountTask('portableWordCountStreaming', true, false)
+  dependsOn portableWordCountTask('portableWordCountBatch', false)
+  dependsOn portableWordCountTask('portableWordCountStreaming', true)
 }
 
 
@@ -204,13 +204,13 @@ task directRunnerIT(dependsOn: 'installGcpTest') {
 //    ./gradlew :beam-sdks-python:portableWordCount -PjobEndpoint=localhost:8099
 //
 task portableWordCount {
-  dependsOn portableWordCountTask('portableWordCountExample', project.hasProperty("streaming"), project.hasProperty("crossLanguage"))
+  dependsOn portableWordCountTask('portableWordCountExample', project.hasProperty("streaming"))
 }
 
-def portableWordCountTask(name, streaming, crossLanguage) {
+def portableWordCountTask(name, streaming) {
   tasks.create(name) {
     dependsOn = ['installGcpTest']
-    mustRunAfter = [':beam-runners-flink_2.11-job-server-container:docker', ':beam-sdks-python-container:docker', ':beam-sdks-java-container:docker', ':beam-runners-core-construction-java:buildTestExpansionServiceJar']
+    mustRunAfter = [':beam-runners-flink_2.11-job-server-container:docker', ':beam-sdks-python-container:docker']
     doLast {
       // TODO: Figure out GCS credentials and use real GCS input and output.
       def options = [
@@ -226,14 +226,11 @@ def portableWordCountTask(name, streaming, crossLanguage) {
       else
         // workaround for local file output in docker container
         options += ["--environment_cache_millis=10000"]
-      if (crossLanguage)
-        options += ["--expansion_service_jar=${project(":beam-runners-core-construction-java:").buildTestExpansionServiceJar.archivePath}"]
       if (project.hasProperty("jobEndpoint"))
         options += ["--job_endpoint=${project.property('jobEndpoint')}"]
-      def wordcountMain = crossLanguage ? "apache_beam.examples.wordcount_xlang" : "apache_beam.examples.wordcount"
       exec {
         executable 'sh'
-        args '-c', ". ${project.ext.envdir}/bin/activate && python -m ${wordcountMain} ${options.join(' ')}"
+        args '-c', ". ${project.ext.envdir}/bin/activate && python -m apache_beam.examples.wordcount ${options.join(' ')}"
         // TODO: Check that the output file is generated and runs.
       }
     }
@@ -456,10 +453,39 @@ project.task('crossLanguagePythonJava') {
   dependsOn 'setupVirtualenv'
   dependsOn ':beam-sdks-java-container:docker'
   dependsOn ':beam-runners-core-construction-java:buildTestExpansionServiceJar'
+
   doLast {
+    def testServiceExpansionJar = project(":beam-runners-core-construction-java:").buildTestExpansionServiceJar.archivePath
+    exec {
+      executable 'sh'
+      args '-c', ". ${project.ext.envdir}/bin/activate && pip install -e .[test] && python -m apache_beam.transforms.external_test --expansion_service_jar=${testServiceExpansionJar}"
+    }
+  }
+}
+
+project.task('crossLanguagePortableWordCount') {
+  dependsOn 'setupVirtualenv'
+  dependsOn ':beam-runners-flink_2.11-job-server-container:docker'
+  dependsOn ':beam-sdks-python-container:docker'
+  dependsOn ':beam-sdks-java-container:docker'
+  dependsOn ':beam-runners-core-construction-java:buildTestExpansionServiceJar'
+
+  doLast {
+    def testServiceExpansionJar = project(":beam-runners-core-construction-java:").buildTestExpansionServiceJar.archivePath
+    def options = [
+            "--input=/etc/profile",
+            "--output=/tmp/py-wordcount-portable",
+            "--runner=PortableRunner",
+            "--experiments=worker_threads=100",
+            "--parallelism=2",
+            "--shutdown_sources_on_final_watermark",
+            "--environment_cache_millis=10000",
+            "--expansion_service_jar=${testServiceExpansionJar}",
+    ]
     exec {
       executable 'sh'
-      args '-c', ". ${project.ext.envdir}/bin/activate && pip install -e .[test] && python -m apache_beam.transforms.external_test --expansion_service_jar=${project(":beam-runners-core-construction-java:").buildTestExpansionServiceJar.archivePath}"
+      args '-c', ". ${project.ext.envdir}/bin/activate && python -m apache_beam.examples.wordcount_xlang ${options.join(' ')}"
+      // TODO: Check that the output file is generated and runs.
     }
   }
 }