You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2019/01/18 09:53:50 UTC

[beam] branch master updated: Changed BeamPython FnAPI state channel to use secure connection when credentials provided

This is an automated email from the ASF dual-hosted git repository.

robertwb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 679a37f  Changed BeamPython FnAPI state channel to use secure connection when credentials provided
     new 73674b8  Merge pull request #7549 Secure state channel
679a37f is described below

commit 679a37f088313f5eada9d8e2a95c8db9f894d0c6
Author: Craig Chambers <ch...@google.com>
AuthorDate: Thu Jan 17 09:20:45 2019 -0800

    Changed BeamPython FnAPI state channel to use secure connection when credentials provided
---
 .../apache_beam/runners/worker/sdk_worker.py       | 26 +++++++++++++---------
 1 file changed, 16 insertions(+), 10 deletions(-)

diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker.py b/sdks/python/apache_beam/runners/worker/sdk_worker.py
index b23cf68..4eb22a8 100644
--- a/sdks/python/apache_beam/runners/worker/sdk_worker.py
+++ b/sdks/python/apache_beam/runners/worker/sdk_worker.py
@@ -71,7 +71,7 @@ class SdkHarness(object):
         self._control_channel, WorkerIdInterceptor(self._worker_id))
     self._data_channel_factory = data_plane.GrpcClientDataChannelFactory(
         credentials)
-    self._state_handler_factory = GrpcStateHandlerFactory()
+    self._state_handler_factory = GrpcStateHandlerFactory(credentials)
     self._profiler_factory = profiler_factory
     self.workers = queue.Queue()
     # one thread is enough for getting the progress report.
@@ -345,10 +345,11 @@ class GrpcStateHandlerFactory(StateHandlerFactory):
   Caches the created channels by ``state descriptor url``.
   """
 
-  def __init__(self):
+  def __init__(self, credentials=None):
     self._state_handler_cache = {}
     self._lock = threading.Lock()
     self._throwing_state_handler = ThrowingStateHandler()
+    self._credentials = credentials
 
   def create_state_handler(self, api_service_descriptor):
     if not api_service_descriptor:
@@ -357,14 +358,19 @@ class GrpcStateHandlerFactory(StateHandlerFactory):
     if url not in self._state_handler_cache:
       with self._lock:
         if url not in self._state_handler_cache:
-          logging.info('Creating insecure state channel for %s', url)
-          grpc_channel = GRPCChannelFactory.insecure_channel(
-              url,
-              # Options to have no limits (-1) on the size of the messages
-              # received or sent over the data plane. The actual buffer size is
-              # controlled in a layer above.
-              options=[("grpc.max_receive_message_length", -1),
-                       ("grpc.max_send_message_length", -1)])
+          # Options to have no limits (-1) on the size of the messages
+          # received or sent over the data plane. The actual buffer size is
+          # controlled in a layer above.
+          options = [('grpc.max_receive_message_length', -1),
+                     ('grpc.max_send_message_length', -1)]
+          if self._credentials is None:
+            logging.info('Creating insecure state channel for %s.', url)
+            grpc_channel = GRPCChannelFactory.insecure_channel(
+                url, options=options)
+          else:
+            logging.info('Creating secure state channel for %s.', url)
+            grpc_channel = GRPCChannelFactory.secure_channel(
+                url, self._credentials, options=options)
           logging.info('State channel established.')
           # Add workerId to the grpc channel
           grpc_channel = grpc.intercept_channel(grpc_channel,