Posted to commits@beam.apache.org by ro...@apache.org on 2018/12/18 08:31:01 UTC

[beam] branch master updated: [BEAM-6067] Update BeamPython side-input support in the Dataflow runner for the unified worker. (#7269)

This is an automated email from the ASF dual-hosted git repository.

robertwb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new dc62028  [BEAM-6067] Update BeamPython side-input support in the Dataflow runner for the unified worker. (#7269)
dc62028 is described below

commit dc62028e393ee3b7c7fc1c6c51af8dc701b65101
Author: CraigChambersG <45...@users.noreply.github.com>
AuthorDate: Tue Dec 18 00:30:52 2018 -0800

    [BEAM-6067] Update BeamPython side-input support in the Dataflow runner for the unified worker. (#7269)
---
 sdks/python/apache_beam/runners/dataflow/dataflow_runner.py    | 10 +++++++---
 sdks/python/apache_beam/runners/dataflow/internal/apiclient.py |  8 ++++++++
 sdks/python/apache_beam/runners/worker/sdk_worker.py           |  7 ++++---
 3 files changed, 19 insertions(+), 6 deletions(-)

diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
index f53808d..e76f1bc 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
@@ -323,7 +323,8 @@ class DataflowRunner(PipelineRunner):
           'please install apache_beam[gcp]')
 
     # Convert all side inputs into a form acceptable to Dataflow.
-    if apiclient._use_fnapi(options):
+    if apiclient._use_fnapi(options) and (
+        not apiclient._use_unified_worker(options)):
       pipeline.visit(self.side_input_visitor())
 
     # Performing configured PTransform overrides.  Note that this is currently
@@ -674,8 +675,11 @@ class DataflowRunner(PipelineRunner):
     from apache_beam.runners.dataflow.internal import apiclient
     transform_proto = self.proto_context.transforms.get_proto(transform_node)
     transform_id = self.proto_context.transforms.get_id(transform_node)
-    if (apiclient._use_fnapi(options)
-        and transform_proto.spec.urn == common_urns.primitives.PAR_DO.urn):
+    # The data transmitted in SERIALIZED_FN is different depending on whether
+    # this is a fnapi pipeline or not.
+    if (apiclient._use_fnapi(options) and
+        (transform_proto.spec.urn == common_urns.primitives.PAR_DO.urn or
+         apiclient._use_unified_worker(options))):
       # Patch side input ids to be unique across a given pipeline.
       if (label_renames and
           transform_proto.spec.urn == common_urns.primitives.PAR_DO.urn):
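
(For readers unfamiliar with the visitor mechanism gated above: pipeline.visit(...)
walks the transform graph and calls back into a PipelineVisitor. Below is a minimal
standalone sketch of that mechanism; CountParDos is illustrative only, not the
runner's actual side_input_visitor().)

    import apache_beam as beam
    from apache_beam.pipeline import PipelineVisitor

    class CountParDos(PipelineVisitor):
      """Illustrative visitor: counts ParDo nodes while walking the graph."""
      def __init__(self):
        self.count = 0

      def visit_transform(self, transform_node):
        # transform_node is an AppliedPTransform; .transform is the PTransform.
        if isinstance(transform_node.transform, beam.ParDo):
          self.count += 1

    class Identity(beam.DoFn):
      def process(self, element):
        yield element

    p = beam.Pipeline()
    _ = p | beam.Create([1, 2, 3]) | beam.ParDo(Identity())

    visitor = CountParDos()
    p.visit(visitor)
    print(visitor.count)  # 1
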
diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
index 2420186..833aa6b 100644
--- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
+++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
@@ -849,6 +849,14 @@ def _use_fnapi(pipeline_options):
       debug_options.experiments and 'beam_fn_api' in debug_options.experiments)
 
 
+def _use_unified_worker(pipeline_options):
+  debug_options = pipeline_options.view_as(DebugOptions)
+
+  return _use_fnapi(pipeline_options) and (
+      debug_options.experiments and
+      'use_unified_worker' in debug_options.experiments)
+
+
 def get_default_container_image_for_current_sdk(job_type):
   """For internal use only; no backwards-compatibility guarantees.
 
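(The new predicate above returns True only when both experiments are set, since it
also requires _use_fnapi. A quick hedged sanity check follows — these helpers are
internal with no compatibility guarantees, importing apiclient requires
apache_beam[gcp], and each experiment is passed as its own flag so that argparse's
append action records one list entry per experiment:)

    from apache_beam.options.pipeline_options import PipelineOptions
    from apache_beam.runners.dataflow.internal import apiclient

    fnapi_only = PipelineOptions(['--experiments=beam_fn_api'])
    both = PipelineOptions(['--experiments=beam_fn_api',
                            '--experiments=use_unified_worker'])

    print(apiclient._use_fnapi(fnapi_only))           # True
    print(apiclient._use_unified_worker(fnapi_only))  # False
    print(apiclient._use_unified_worker(both))        # True
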
diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker.py b/sdks/python/apache_beam/runners/worker/sdk_worker.py
index e8b8ceb..00cdf85 100644
--- a/sdks/python/apache_beam/runners/worker/sdk_worker.py
+++ b/sdks/python/apache_beam/runners/worker/sdk_worker.py
@@ -56,10 +56,10 @@ class SdkHarness(object):
     self._worker_index = 0
     self._worker_id = worker_id
     if credentials is None:
-      logging.info('Creating insecure control channel.')
+      logging.info('Creating insecure control channel for %s.', control_address)
       self._control_channel = grpc.insecure_channel(control_address)
     else:
-      logging.info('Creating secure control channel.')
+      logging.info('Creating secure control channel for %s.', control_address)
       self._control_channel = grpc.secure_channel(control_address, credentials)
     grpc.channel_ready_future(self._control_channel).result(timeout=60)
     logging.info('Control channel established.')
@@ -351,7 +351,7 @@ class GrpcStateHandlerFactory(StateHandlerFactory):
     if url not in self._state_handler_cache:
       with self._lock:
         if url not in self._state_handler_cache:
-          logging.info('Creating channel for %s', url)
+          logging.info('Creating insecure state channel for %s', url)
           grpc_channel = grpc.insecure_channel(
               url,
               # Options to have no limits (-1) on the size of the messages
@@ -359,6 +359,7 @@ class GrpcStateHandlerFactory(StateHandlerFactory):
               # controlled in a layer above.
               options=[("grpc.max_receive_message_length", -1),
                        ("grpc.max_send_message_length", -1)])
+          logging.info('State channel established.')
           # Add workerId to the grpc channel
           grpc_channel = grpc.intercept_channel(grpc_channel,
                                                 WorkerIdInterceptor())
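
(For context on the channels being logged above, here is a minimal standalone
sketch of the same gRPC pattern — the endpoint is hypothetical, and
WorkerIdInterceptor is Beam's client interceptor that stamps the worker id onto
outgoing request metadata:)

    import grpc

    from apache_beam.runners.worker.worker_id_interceptor import WorkerIdInterceptor

    url = 'localhost:50051'  # hypothetical state/control endpoint
    channel = grpc.insecure_channel(
        url,
        # -1 removes gRPC's default message-size caps; as the comment in the
        # diff notes, sizes are bounded in a layer above instead.
        options=[('grpc.max_receive_message_length', -1),
                 ('grpc.max_send_message_length', -1)])
    # With credentials, the secure variant mirrors the control channel path:
    #   channel = grpc.secure_channel(url, grpc.ssl_channel_credentials())

    # Attach the worker id to every outgoing call as request metadata.
    channel = grpc.intercept_channel(channel, WorkerIdInterceptor())

    # Block until the channel is usable, as the control channel setup does.
    grpc.channel_ready_future(channel).result(timeout=60)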