You are viewing a plain text version of this content. The canonical link for it was a hyperlink in the original page and is not reproduced in this plain-text version.
Posted to commits@beam.apache.org by al...@apache.org on 2019/05/29 16:58:39 UTC

[beam] branch master updated: Close grpc channel after pulling messages from pubsub

This is an automated email from the ASF dual-hosted git repository.

altay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new a9d7887  Close grpc channel after pulling messages from pubsub
     new d5d7a7b  Merge pull request #8708 from ostrokach/close_grcp_channel_pubsub
a9d7887 is described below

commit a9d788756bb1bbdbdb08fca2d10608031d7bd8f0
Author: Alexey Strokach <os...@gmail.com>
AuthorDate: Tue May 28 21:18:58 2019 +0000

    Close grpc channel after pulling messages from pubsub
---
 sdks/python/apache_beam/io/gcp/pubsub_test.py      | 21 ++++++++++++++++++
 .../runners/direct/transform_evaluator.py          | 25 ++++++++++++----------
 2 files changed, 35 insertions(+), 11 deletions(-)

diff --git a/sdks/python/apache_beam/io/gcp/pubsub_test.py b/sdks/python/apache_beam/io/gcp/pubsub_test.py
index bcc0158..2de41a5 100644
--- a/sdks/python/apache_beam/io/gcp/pubsub_test.py
+++ b/sdks/python/apache_beam/io/gcp/pubsub_test.py
@@ -373,6 +373,9 @@ class TestReadFromPubSub(unittest.TestCase):
     mock_pubsub.return_value.acknowledge.assert_has_calls([
         mock.call(mock.ANY, [ack_id])])
 
+    mock_pubsub.return_value.api.transport.channel.close.assert_has_calls([
+        mock.call()])
+
   def test_read_strings_success(self, mock_pubsub):
     data = u'🤷 ¯\\_(ツ)_/¯'
     data_encoded = data.encode('utf-8')
@@ -394,6 +397,9 @@ class TestReadFromPubSub(unittest.TestCase):
     mock_pubsub.return_value.acknowledge.assert_has_calls([
         mock.call(mock.ANY, [ack_id])])
 
+    mock_pubsub.return_value.api.transport.channel.close.assert_has_calls([
+        mock.call()])
+
   def test_read_data_success(self, mock_pubsub):
     data_encoded = u'🤷 ¯\\_(ツ)_/¯'.encode('utf-8')
     ack_id = 'ack_id'
@@ -412,6 +418,9 @@ class TestReadFromPubSub(unittest.TestCase):
     mock_pubsub.return_value.acknowledge.assert_has_calls([
         mock.call(mock.ANY, [ack_id])])
 
+    mock_pubsub.return_value.api.transport.channel.close.assert_has_calls([
+        mock.call()])
+
   def test_read_messages_timestamp_attribute_milli_success(self, mock_pubsub):
     data = b'data'
     attributes = {'time': '1337'}
@@ -442,6 +451,9 @@ class TestReadFromPubSub(unittest.TestCase):
     mock_pubsub.return_value.acknowledge.assert_has_calls([
         mock.call(mock.ANY, [ack_id])])
 
+    mock_pubsub.return_value.api.transport.channel.close.assert_has_calls([
+        mock.call()])
+
   def test_read_messages_timestamp_attribute_rfc3339_success(self, mock_pubsub):
     data = b'data'
     attributes = {'time': '2018-03-12T13:37:01.234567Z'}
@@ -472,6 +484,9 @@ class TestReadFromPubSub(unittest.TestCase):
     mock_pubsub.return_value.acknowledge.assert_has_calls([
         mock.call(mock.ANY, [ack_id])])
 
+    mock_pubsub.return_value.api.transport.channel.close.assert_has_calls([
+        mock.call()])
+
   def test_read_messages_timestamp_attribute_missing(self, mock_pubsub):
     data = b'data'
     attributes = {}
@@ -503,6 +518,9 @@ class TestReadFromPubSub(unittest.TestCase):
     mock_pubsub.return_value.acknowledge.assert_has_calls([
         mock.call(mock.ANY, [ack_id])])
 
+    mock_pubsub.return_value.api.transport.channel.close.assert_has_calls([
+        mock.call()])
+
   def test_read_messages_timestamp_attribute_fail_parse(self, mock_pubsub):
     data = b'data'
     attributes = {'time': '1337 unparseable'}
@@ -526,6 +544,9 @@ class TestReadFromPubSub(unittest.TestCase):
       p.run()
     mock_pubsub.return_value.acknowledge.assert_not_called()
 
+    mock_pubsub.return_value.api.transport.channel.close.assert_has_calls([
+        mock.call()])
+
   def test_read_message_id_label_unsupported(self, unused_mock_pubsub):
     # id_label is unsupported in DirectRunner.
     options = PipelineOptions([])
diff --git a/sdks/python/apache_beam/runners/direct/transform_evaluator.py b/sdks/python/apache_beam/runners/direct/transform_evaluator.py
index fad0704..7989162 100644
--- a/sdks/python/apache_beam/runners/direct/transform_evaluator.py
+++ b/sdks/python/apache_beam/runners/direct/transform_evaluator.py
@@ -448,13 +448,6 @@ class _PubSubReadEvaluator(_TransformEvaluator):
   def _read_from_pubsub(self, timestamp_attribute):
     from apache_beam.io.gcp.pubsub import PubsubMessage
     from google.cloud import pubsub
-    # Because of the AutoAck, we are not able to reread messages if this
-    # evaluator fails with an exception before emitting a bundle. However,
-    # the DirectRunner currently doesn't retry work items anyway, so the
-    # pipeline would enter an inconsistent state on any error.
-    sub_client = pubsub.SubscriberClient()
-    response = sub_client.pull(self._sub_name, max_messages=10,
-                               return_immediately=True)
 
     def _get_element(message):
       parsed_message = PubsubMessage._from_message(message)
@@ -474,10 +467,20 @@ class _PubSubReadEvaluator(_TransformEvaluator):
 
       return timestamp, parsed_message
 
-    results = [_get_element(rm.message) for rm in response.received_messages]
-    ack_ids = [rm.ack_id for rm in response.received_messages]
-    if ack_ids:
-      sub_client.acknowledge(self._sub_name, ack_ids)
+    # Because of the AutoAck, we are not able to reread messages if this
+    # evaluator fails with an exception before emitting a bundle. However,
+    # the DirectRunner currently doesn't retry work items anyway, so the
+    # pipeline would enter an inconsistent state on any error.
+    sub_client = pubsub.SubscriberClient()
+    try:
+      response = sub_client.pull(self._sub_name, max_messages=10,
+                                 return_immediately=True)
+      results = [_get_element(rm.message) for rm in response.received_messages]
+      ack_ids = [rm.ack_id for rm in response.received_messages]
+      if ack_ids:
+        sub_client.acknowledge(self._sub_name, ack_ids)
+    finally:
+      sub_client.api.transport.channel.close()
 
     return results