You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2018/12/18 08:31:01 UTC
[beam] branch master updated: [BEAM-6067] Update BeamPython side-input support in the Dataflow runner for the unified worker. (#7269)
This is an automated email from the ASF dual-hosted git repository.
robertwb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new dc62028 [BEAM-6067] Update BeamPython side-input support in the Dataflow runner for the unified worker. (#7269)
dc62028 is described below
commit dc62028e393ee3b7c7fc1c6c51af8dc701b65101
Author: CraigChambersG <45...@users.noreply.github.com>
AuthorDate: Tue Dec 18 00:30:52 2018 -0800
[BEAM-6067] Update BeamPython side-input support in the Dataflow runner for the unified worker. (#7269)
---
sdks/python/apache_beam/runners/dataflow/dataflow_runner.py | 10 +++++++---
sdks/python/apache_beam/runners/dataflow/internal/apiclient.py | 8 ++++++++
sdks/python/apache_beam/runners/worker/sdk_worker.py | 7 ++++---
3 files changed, 19 insertions(+), 6 deletions(-)
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
index f53808d..e76f1bc 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
@@ -323,7 +323,8 @@ class DataflowRunner(PipelineRunner):
'please install apache_beam[gcp]')
# Convert all side inputs into a form acceptable to Dataflow.
- if apiclient._use_fnapi(options):
+ if apiclient._use_fnapi(options) and (
+ not apiclient._use_unified_worker(options)):
pipeline.visit(self.side_input_visitor())
# Performing configured PTransform overrides. Note that this is currently
@@ -674,8 +675,11 @@ class DataflowRunner(PipelineRunner):
from apache_beam.runners.dataflow.internal import apiclient
transform_proto = self.proto_context.transforms.get_proto(transform_node)
transform_id = self.proto_context.transforms.get_id(transform_node)
- if (apiclient._use_fnapi(options)
- and transform_proto.spec.urn == common_urns.primitives.PAR_DO.urn):
+ # The data transmitted in SERIALIZED_FN is different depending on whether
+ # this is a fnapi pipeline or not.
+ if (apiclient._use_fnapi(options) and
+ (transform_proto.spec.urn == common_urns.primitives.PAR_DO.urn or
+ apiclient._use_unified_worker(options))):
# Patch side input ids to be unique across a given pipeline.
if (label_renames and
transform_proto.spec.urn == common_urns.primitives.PAR_DO.urn):
diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
index 2420186..833aa6b 100644
--- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
+++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
@@ -849,6 +849,14 @@ def _use_fnapi(pipeline_options):
debug_options.experiments and 'beam_fn_api' in debug_options.experiments)
+def _use_unified_worker(pipeline_options):
+ debug_options = pipeline_options.view_as(DebugOptions)
+
+ return _use_fnapi(pipeline_options) and (
+ debug_options.experiments and
+ 'use_unified_worker' in debug_options.experiments)
+
+
def get_default_container_image_for_current_sdk(job_type):
"""For internal use only; no backwards-compatibility guarantees.
diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker.py b/sdks/python/apache_beam/runners/worker/sdk_worker.py
index e8b8ceb..00cdf85 100644
--- a/sdks/python/apache_beam/runners/worker/sdk_worker.py
+++ b/sdks/python/apache_beam/runners/worker/sdk_worker.py
@@ -56,10 +56,10 @@ class SdkHarness(object):
self._worker_index = 0
self._worker_id = worker_id
if credentials is None:
- logging.info('Creating insecure control channel.')
+ logging.info('Creating insecure control channel for %s.', control_address)
self._control_channel = grpc.insecure_channel(control_address)
else:
- logging.info('Creating secure control channel.')
+ logging.info('Creating secure control channel for %s.', control_address)
self._control_channel = grpc.secure_channel(control_address, credentials)
grpc.channel_ready_future(self._control_channel).result(timeout=60)
logging.info('Control channel established.')
@@ -351,7 +351,7 @@ class GrpcStateHandlerFactory(StateHandlerFactory):
if url not in self._state_handler_cache:
with self._lock:
if url not in self._state_handler_cache:
- logging.info('Creating channel for %s', url)
+ logging.info('Creating insecure state channel for %s', url)
grpc_channel = grpc.insecure_channel(
url,
# Options to have no limits (-1) on the size of the messages
@@ -359,6 +359,7 @@ class GrpcStateHandlerFactory(StateHandlerFactory):
# controlled in a layer above.
options=[("grpc.max_receive_message_length", -1),
("grpc.max_send_message_length", -1)])
+ logging.info('State channel established.')
# Add workerId to the grpc channel
grpc_channel = grpc.intercept_channel(grpc_channel,
WorkerIdInterceptor())