You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink itself is not preserved in this plain-text copy).
Posted to commits@beam.apache.org by mx...@apache.org on 2019/03/11 11:06:16 UTC
[beam] branch master updated: [BEAM-6666] subprocess.Popen hangs after use of gRPC channel
This is an automated email from the ASF dual-hosted git repository.
mxm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 5fc404d [BEAM-6666] subprocess.Popen hangs after use of gRPC channel
new 99d5d91 Merge pull request #7845: [BEAM-6666] subprocess.Popen hangs after use of gRPC channel
5fc404d is described below
commit 5fc404d49a77fc9c67230286567a3537147a5303
Author: Heejong Lee <he...@gmail.com>
AuthorDate: Thu Feb 14 11:25:24 2019 -0800
[BEAM-6666] subprocess.Popen hangs after use of gRPC channel
---
.../python/apache_beam/examples/wordcount_xlang.py | 43 ++++++++++++--------
.../runners/portability/portable_runner.py | 16 ++++++--
sdks/python/build.gradle | 46 +++++++++++++++++-----
3 files changed, 74 insertions(+), 31 deletions(-)
diff --git a/sdks/python/apache_beam/examples/wordcount_xlang.py b/sdks/python/apache_beam/examples/wordcount_xlang.py
index d9256fe..f4071f8 100644
--- a/sdks/python/apache_beam/examples/wordcount_xlang.py
+++ b/sdks/python/apache_beam/examples/wordcount_xlang.py
@@ -31,8 +31,10 @@ from apache_beam.io import ReadFromText
from apache_beam.io import WriteToText
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
+from apache_beam.options.pipeline_options import StandardOptions
-EXPANSION_SERVICE_PORT = '8097'
+# avoid possible conflict with job-server embedded expansion service at 8097
+EXPANSION_SERVICE_PORT = '8096'
EXPANSION_SERVICE_ADDR = 'localhost:%s' % EXPANSION_SERVICE_PORT
@@ -60,14 +62,7 @@ class WordExtractingDoFn(beam.DoFn):
return words
-def run(pipeline_args, input_file, output_file):
-
- # We use the save_main_session option because one or more DoFn's in this
- # workflow rely on global context (e.g., a module imported at module level).
- pipeline_options = PipelineOptions(pipeline_args)
- pipeline_options.view_as(SetupOptions).save_main_session = True
- p = beam.Pipeline(options=pipeline_options)
-
+def run(p, input_file, output_file):
# Read the text file[pattern] into a PCollection.
lines = p | 'read' >> ReadFromText(input_file)
@@ -92,12 +87,7 @@ def run(pipeline_args, input_file, output_file):
result.wait_until_finish()
-def wait_for_ready():
- with grpc.insecure_channel(EXPANSION_SERVICE_ADDR) as channel:
- grpc.channel_ready_future(channel).result()
-
-
-if __name__ == '__main__':
+def main():
logging.getLogger().setLevel(logging.INFO)
parser = argparse.ArgumentParser()
@@ -115,13 +105,32 @@ if __name__ == '__main__':
help='Jar file for expansion service')
known_args, pipeline_args = parser.parse_known_args()
+
+ pipeline_options = PipelineOptions(pipeline_args)
+ assert (
+ pipeline_options.view_as(StandardOptions).runner.lower()
+ == "portablerunner"), "Only PortableRunner is supported."
+
+ # We use the save_main_session option because one or more DoFn's in this
+ # workflow rely on global context (e.g., a module imported at module level).
+ pipeline_options.view_as(SetupOptions).save_main_session = True
+
+ p = beam.Pipeline(options=pipeline_options)
+ p.runner.init_dockerized_job_server()
+
try:
server = subprocess.Popen([
'java', '-jar', known_args.expansion_service_jar,
EXPANSION_SERVICE_PORT])
- wait_for_ready()
- run(pipeline_args, known_args.input, known_args.output)
+ with grpc.insecure_channel(EXPANSION_SERVICE_ADDR) as channel:
+ grpc.channel_ready_future(channel).result()
+
+ run(p, known_args.input, known_args.output)
finally:
server.kill()
+
+
+if __name__ == '__main__':
+ main()
diff --git a/sdks/python/apache_beam/runners/portability/portable_runner.py b/sdks/python/apache_beam/runners/portability/portable_runner.py
index 97d9c01..c6f5591 100644
--- a/sdks/python/apache_beam/runners/portability/portable_runner.py
+++ b/sdks/python/apache_beam/runners/portability/portable_runner.py
@@ -77,6 +77,9 @@ class PortableRunner(runner.PipelineRunner):
This runner schedules the job on a job service. The responsibility of
running and managing the job lies with the job service used.
"""
+ def __init__(self):
+ self._job_endpoint = None
+
@staticmethod
def default_docker_image():
if 'USER' in os.environ:
@@ -137,6 +140,12 @@ class PortableRunner(runner.PipelineRunner):
payload=(portable_options.environment_config.encode('ascii')
if portable_options.environment_config else None))
+ def init_dockerized_job_server(self):
+ # TODO Provide a way to specify a container Docker URL
+ # https://issues.apache.org/jira/browse/BEAM-6328
+ docker = DockerizedJobServer()
+ self._job_endpoint = docker.start()
+
def run_pipeline(self, pipeline, options):
portable_options = options.view_as(PortableOptions)
job_endpoint = portable_options.job_endpoint
@@ -147,10 +156,9 @@ class PortableRunner(runner.PipelineRunner):
options.view_as(SetupOptions).sdk_location = 'container'
if not job_endpoint:
- # TODO Provide a way to specify a container Docker URL
- # https://issues.apache.org/jira/browse/BEAM-6328
- docker = DockerizedJobServer()
- job_endpoint = docker.start()
+ if not self._job_endpoint:
+ self.init_dockerized_job_server()
+ job_endpoint = self._job_endpoint
job_service = None
elif job_endpoint == 'embed':
job_service = local_job_service.LocalJobServicer()
diff --git a/sdks/python/build.gradle b/sdks/python/build.gradle
index 0ac62e9..827f7fc 100644
--- a/sdks/python/build.gradle
+++ b/sdks/python/build.gradle
@@ -125,8 +125,8 @@ task preCommit() {
task portablePreCommit() {
dependsOn ':beam-runners-flink_2.11-job-server-container:docker'
dependsOn ':beam-sdks-python-container:docker'
- dependsOn portableWordCountTask('portableWordCountBatch', false, false)
- dependsOn portableWordCountTask('portableWordCountStreaming', true, false)
+ dependsOn portableWordCountTask('portableWordCountBatch', false)
+ dependsOn portableWordCountTask('portableWordCountStreaming', true)
}
@@ -204,13 +204,13 @@ task directRunnerIT(dependsOn: 'installGcpTest') {
// ./gradlew :beam-sdks-python:portableWordCount -PjobEndpoint=localhost:8099
//
task portableWordCount {
- dependsOn portableWordCountTask('portableWordCountExample', project.hasProperty("streaming"), project.hasProperty("crossLanguage"))
+ dependsOn portableWordCountTask('portableWordCountExample', project.hasProperty("streaming"))
}
-def portableWordCountTask(name, streaming, crossLanguage) {
+def portableWordCountTask(name, streaming) {
tasks.create(name) {
dependsOn = ['installGcpTest']
- mustRunAfter = [':beam-runners-flink_2.11-job-server-container:docker', ':beam-sdks-python-container:docker', ':beam-sdks-java-container:docker', ':beam-runners-core-construction-java:buildTestExpansionServiceJar']
+ mustRunAfter = [':beam-runners-flink_2.11-job-server-container:docker', ':beam-sdks-python-container:docker']
doLast {
// TODO: Figure out GCS credentials and use real GCS input and output.
def options = [
@@ -226,14 +226,11 @@ def portableWordCountTask(name, streaming, crossLanguage) {
else
// workaround for local file output in docker container
options += ["--environment_cache_millis=10000"]
- if (crossLanguage)
- options += ["--expansion_service_jar=${project(":beam-runners-core-construction-java:").buildTestExpansionServiceJar.archivePath}"]
if (project.hasProperty("jobEndpoint"))
options += ["--job_endpoint=${project.property('jobEndpoint')}"]
- def wordcountMain = crossLanguage ? "apache_beam.examples.wordcount_xlang" : "apache_beam.examples.wordcount"
exec {
executable 'sh'
- args '-c', ". ${project.ext.envdir}/bin/activate && python -m ${wordcountMain} ${options.join(' ')}"
+ args '-c', ". ${project.ext.envdir}/bin/activate && python -m apache_beam.examples.wordcount ${options.join(' ')}"
// TODO: Check that the output file is generated and runs.
}
}
@@ -456,10 +453,39 @@ project.task('crossLanguagePythonJava') {
dependsOn 'setupVirtualenv'
dependsOn ':beam-sdks-java-container:docker'
dependsOn ':beam-runners-core-construction-java:buildTestExpansionServiceJar'
+
doLast {
+ def testServiceExpansionJar = project(":beam-runners-core-construction-java:").buildTestExpansionServiceJar.archivePath
+ exec {
+ executable 'sh'
+ args '-c', ". ${project.ext.envdir}/bin/activate && pip install -e .[test] && python -m apache_beam.transforms.external_test --expansion_service_jar=${testServiceExpansionJar}"
+ }
+ }
+}
+
+project.task('crossLanguagePortableWordCount') {
+ dependsOn 'setupVirtualenv'
+ dependsOn ':beam-runners-flink_2.11-job-server-container:docker'
+ dependsOn ':beam-sdks-python-container:docker'
+ dependsOn ':beam-sdks-java-container:docker'
+ dependsOn ':beam-runners-core-construction-java:buildTestExpansionServiceJar'
+
+ doLast {
+ def testServiceExpansionJar = project(":beam-runners-core-construction-java:").buildTestExpansionServiceJar.archivePath
+ def options = [
+ "--input=/etc/profile",
+ "--output=/tmp/py-wordcount-portable",
+ "--runner=PortableRunner",
+ "--experiments=worker_threads=100",
+ "--parallelism=2",
+ "--shutdown_sources_on_final_watermark",
+ "--environment_cache_millis=10000",
+ "--expansion_service_jar=${testServiceExpansionJar}",
+ ]
exec {
executable 'sh'
- args '-c', ". ${project.ext.envdir}/bin/activate && pip install -e .[test] && python -m apache_beam.transforms.external_test --expansion_service_jar=${project(":beam-runners-core-construction-java:").buildTestExpansionServiceJar.archivePath}"
+ args '-c', ". ${project.ext.envdir}/bin/activate && python -m apache_beam.examples.wordcount_xlang ${options.join(' ')}"
+ // TODO: Check that the output file is generated and runs.
}
}
}