You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2019/01/18 09:53:50 UTC
[beam] branch master updated: Changed BeamPython FnAPI state channel to use secure connection when credentials provided
This is an automated email from the ASF dual-hosted git repository.
robertwb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 679a37f Changed BeamPython FnAPI state channel to use secure connection when credentials provided
new 73674b8 Merge pull request #7549 Secure state channel
679a37f is described below
commit 679a37f088313f5eada9d8e2a95c8db9f894d0c6
Author: Craig Chambers <ch...@google.com>
AuthorDate: Thu Jan 17 09:20:45 2019 -0800
Changed BeamPython FnAPI state channel to use secure connection when credentials provided
---
.../apache_beam/runners/worker/sdk_worker.py | 26 +++++++++++++---------
1 file changed, 16 insertions(+), 10 deletions(-)
diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker.py b/sdks/python/apache_beam/runners/worker/sdk_worker.py
index b23cf68..4eb22a8 100644
--- a/sdks/python/apache_beam/runners/worker/sdk_worker.py
+++ b/sdks/python/apache_beam/runners/worker/sdk_worker.py
@@ -71,7 +71,7 @@ class SdkHarness(object):
self._control_channel, WorkerIdInterceptor(self._worker_id))
self._data_channel_factory = data_plane.GrpcClientDataChannelFactory(
credentials)
- self._state_handler_factory = GrpcStateHandlerFactory()
+ self._state_handler_factory = GrpcStateHandlerFactory(credentials)
self._profiler_factory = profiler_factory
self.workers = queue.Queue()
# one thread is enough for getting the progress report.
@@ -345,10 +345,11 @@ class GrpcStateHandlerFactory(StateHandlerFactory):
Caches the created channels by ``state descriptor url``.
"""
- def __init__(self):
+ def __init__(self, credentials=None):
self._state_handler_cache = {}
self._lock = threading.Lock()
self._throwing_state_handler = ThrowingStateHandler()
+ self._credentials = credentials
def create_state_handler(self, api_service_descriptor):
if not api_service_descriptor:
@@ -357,14 +358,19 @@ class GrpcStateHandlerFactory(StateHandlerFactory):
if url not in self._state_handler_cache:
with self._lock:
if url not in self._state_handler_cache:
- logging.info('Creating insecure state channel for %s', url)
- grpc_channel = GRPCChannelFactory.insecure_channel(
- url,
- # Options to have no limits (-1) on the size of the messages
- # received or sent over the data plane. The actual buffer size is
- # controlled in a layer above.
- options=[("grpc.max_receive_message_length", -1),
- ("grpc.max_send_message_length", -1)])
+ # Options to have no limits (-1) on the size of the messages
+ # received or sent over the data plane. The actual buffer size is
+ # controlled in a layer above.
+ options = [('grpc.max_receive_message_length', -1),
+ ('grpc.max_send_message_length', -1)]
+ if self._credentials is None:
+ logging.info('Creating insecure state channel for %s.', url)
+ grpc_channel = GRPCChannelFactory.insecure_channel(
+ url, options=options)
+ else:
+ logging.info('Creating secure state channel for %s.', url)
+ grpc_channel = GRPCChannelFactory.secure_channel(
+ url, self._credentials, options=options)
logging.info('State channel established.')
# Add workerId to the grpc channel
grpc_channel = grpc.intercept_channel(grpc_channel,