You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by cc...@apache.org on 2018/10/15 20:29:27 UTC

[beam] branch revert-6674-rev1 created (now cda0cab)

This is an automated email from the ASF dual-hosted git repository.

ccy pushed a change to branch revert-6674-rev1
in repository https://gitbox.apache.org/repos/asf/beam.git.


      at cda0cab  Revert "[BEAM-5706] Revert 324f0b3e3c (pull request #6564 from udim/pubsub-0-35-4)"

This branch includes the following new commits:

     new cda0cab  Revert "[BEAM-5706] Revert 324f0b3e3c (pull request #6564 from udim/pubsub-0-35-4)"

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[beam] 01/01: Revert "[BEAM-5706] Revert 324f0b3e3c (pull request #6564 from udim/pubsub-0-35-4)"

Posted by cc...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ccy pushed a commit to branch revert-6674-rev1
in repository https://gitbox.apache.org/repos/asf/beam.git

commit cda0cabf6935c41290a1ab0265e0ef814c59df1b
Author: Charles Chen <ch...@users.noreply.github.com>
AuthorDate: Mon Oct 15 13:29:16 2018 -0700

    Revert "[BEAM-5706] Revert 324f0b3e3c (pull request #6564 from udim/pubsub-0-35-4)"
---
 .../examples/complete/game/game_stats_it_test.py   |  31 ++-
 .../examples/complete/game/leader_board_it_test.py |  32 +--
 .../examples/streaming_wordcount_it_test.py        |  45 ++--
 sdks/python/apache_beam/io/gcp/pubsub.py           |  13 +-
 .../apache_beam/io/gcp/pubsub_integration_test.py  |  39 +--
 sdks/python/apache_beam/io/gcp/pubsub_test.py      | 270 ++++++---------------
 .../apache_beam/io/gcp/tests/pubsub_matcher.py     |  44 ++--
 .../io/gcp/tests/pubsub_matcher_test.py            |  75 +++---
 .../apache_beam/runners/direct/direct_runner.py    |  37 +--
 .../runners/direct/test_direct_runner.py           |   1 +
 .../runners/direct/transform_evaluator.py          |  98 +++++---
 sdks/python/apache_beam/testing/test_utils.py      |  83 ++++---
 sdks/python/apache_beam/testing/test_utils_test.py |  57 +----
 sdks/python/container/base_image_requirements.txt  |   3 +-
 sdks/python/setup.py                               |   3 +-
 15 files changed, 350 insertions(+), 481 deletions(-)

diff --git a/sdks/python/apache_beam/examples/complete/game/game_stats_it_test.py b/sdks/python/apache_beam/examples/complete/game/game_stats_it_test.py
index 6dc60d0..2fc19da 100644
--- a/sdks/python/apache_beam/examples/complete/game/game_stats_it_test.py
+++ b/sdks/python/apache_beam/examples/complete/game/game_stats_it_test.py
@@ -72,15 +72,15 @@ class GameStatsIT(unittest.TestCase):
 
     # Set up PubSub environment.
     from google.cloud import pubsub
-    self.pubsub_client = pubsub.Client(project=self.project)
-    unique_topic_name = self.INPUT_TOPIC + _unique_id
-    unique_subscrition_name = self.INPUT_SUB + _unique_id
-    self.input_topic = self.pubsub_client.topic(unique_topic_name)
-    self.input_sub = self.input_topic.subscription(unique_subscrition_name)
+    self.pub_client = pubsub.PublisherClient()
+    self.input_topic = self.pub_client.create_topic(
+        self.pub_client.topic_path(self.project, self.INPUT_TOPIC + _unique_id))
 
-    self.input_topic.create()
-    test_utils.wait_for_topics_created([self.input_topic])
-    self.input_sub.create()
+    self.sub_client = pubsub.SubscriberClient()
+    self.input_sub = self.sub_client.create_subscription(
+        self.sub_client.subscription_path(self.project,
+                                          self.INPUT_SUB + _unique_id),
+        self.input_topic.name)
 
     # Set up BigQuery environment
     from google.cloud import bigquery
@@ -95,14 +95,15 @@ class GameStatsIT(unittest.TestCase):
     """Inject game events as test data to PubSub."""
 
     logging.debug('Injecting %d game events to topic %s',
-                  message_count, topic.full_name)
+                  message_count, topic.name)
 
     for _ in range(message_count):
-      topic.publish(self.INPUT_EVENT % self._test_timestamp)
+      self.pub_client.publish(topic.name,
+                              self.INPUT_EVENT % self._test_timestamp)
 
   def _cleanup_pubsub(self):
-    test_utils.cleanup_subscriptions([self.input_sub])
-    test_utils.cleanup_topics([self.input_topic])
+    test_utils.cleanup_subscriptions(self.sub_client, [self.input_sub])
+    test_utils.cleanup_topics(self.pub_client, [self.input_topic])
 
   def _cleanup_dataset(self):
     self.dataset.delete()
@@ -123,9 +124,9 @@ class GameStatsIT(unittest.TestCase):
 
     # TODO(mariagh): Add teams table verifier once game_stats.py is fixed.
 
-    extra_opts = {'subscription': self.input_sub.full_name,
+    extra_opts = {'subscription': self.input_sub.name,
                   'dataset': self.dataset.name,
-                  'topic': self.input_topic.full_name,
+                  'topic': self.input_topic.name,
                   'fixed_window_duration': 1,
                   'user_activity_window_duration': 1,
                   'wait_until_finish_duration':
@@ -143,8 +144,6 @@ class GameStatsIT(unittest.TestCase):
                     self.dataset.name, self.OUTPUT_TABLE_TEAMS)
 
     # Generate input data and inject to PubSub.
-    test_utils.wait_for_subscriptions_created([self.input_topic,
-                                               self.input_sub])
     self._inject_pubsub_game_events(self.input_topic, self.DEFAULT_INPUT_COUNT)
 
     # Get pipeline options from command argument: --test-pipeline-options,
diff --git a/sdks/python/apache_beam/examples/complete/game/leader_board_it_test.py b/sdks/python/apache_beam/examples/complete/game/leader_board_it_test.py
index ab10942..e0e309b 100644
--- a/sdks/python/apache_beam/examples/complete/game/leader_board_it_test.py
+++ b/sdks/python/apache_beam/examples/complete/game/leader_board_it_test.py
@@ -73,15 +73,16 @@ class LeaderBoardIT(unittest.TestCase):
 
     # Set up PubSub environment.
     from google.cloud import pubsub
-    self.pubsub_client = pubsub.Client(project=self.project)
-    unique_topic_name = self.INPUT_TOPIC + _unique_id
-    unique_subscrition_name = self.INPUT_SUB + _unique_id
-    self.input_topic = self.pubsub_client.topic(unique_topic_name)
-    self.input_sub = self.input_topic.subscription(unique_subscrition_name)
 
-    self.input_topic.create()
-    test_utils.wait_for_topics_created([self.input_topic])
-    self.input_sub.create()
+    self.pub_client = pubsub.PublisherClient()
+    self.input_topic = self.pub_client.create_topic(
+        self.pub_client.topic_path(self.project, self.INPUT_TOPIC + _unique_id))
+
+    self.sub_client = pubsub.SubscriberClient()
+    self.input_sub = self.sub_client.create_subscription(
+        self.sub_client.subscription_path(self.project,
+                                          self.INPUT_SUB + _unique_id),
+        self.input_topic.name)
 
     # Set up BigQuery environment
     from google.cloud import bigquery
@@ -96,14 +97,15 @@ class LeaderBoardIT(unittest.TestCase):
     """Inject game events as test data to PubSub."""
 
     logging.debug('Injecting %d game events to topic %s',
-                  message_count, topic.full_name)
+                  message_count, topic.name)
 
     for _ in range(message_count):
-      topic.publish(self.INPUT_EVENT % self._test_timestamp)
+      self.pub_client.publish(topic.name,
+                              self.INPUT_EVENT % self._test_timestamp)
 
   def _cleanup_pubsub(self):
-    test_utils.cleanup_subscriptions([self.input_sub])
-    test_utils.cleanup_topics([self.input_topic])
+    test_utils.cleanup_subscriptions(self.sub_client, [self.input_sub])
+    test_utils.cleanup_topics(self.pub_client, [self.input_topic])
 
   def _cleanup_dataset(self):
     self.dataset.delete()
@@ -131,9 +133,9 @@ class LeaderBoardIT(unittest.TestCase):
                                         teams_query,
                                         self.DEFAULT_EXPECTED_CHECKSUM)
 
-    extra_opts = {'subscription': self.input_sub.full_name,
+    extra_opts = {'subscription': self.input_sub.name,
                   'dataset': self.dataset.name,
-                  'topic': self.input_topic.full_name,
+                  'topic': self.input_topic.name,
                   'team_window_duration': 1,
                   'wait_until_finish_duration':
                       self.WAIT_UNTIL_FINISH_DURATION,
@@ -151,8 +153,6 @@ class LeaderBoardIT(unittest.TestCase):
                     self.dataset.name, self.OUTPUT_TABLE_TEAMS)
 
     # Generate input data and inject to PubSub.
-    test_utils.wait_for_subscriptions_created([self.input_topic,
-                                               self.input_sub])
     self._inject_pubsub_game_events(self.input_topic, self.DEFAULT_INPUT_COUNT)
 
     # Get pipeline options from command argument: --test-pipeline-options,
diff --git a/sdks/python/apache_beam/examples/streaming_wordcount_it_test.py b/sdks/python/apache_beam/examples/streaming_wordcount_it_test.py
index 3c0cfa9..78e89a1 100644
--- a/sdks/python/apache_beam/examples/streaming_wordcount_it_test.py
+++ b/sdks/python/apache_beam/examples/streaming_wordcount_it_test.py
@@ -52,31 +52,31 @@ class StreamingWordCountIT(unittest.TestCase):
 
     # Set up PubSub environment.
     from google.cloud import pubsub
-    self.pubsub_client = pubsub.Client(project=self.project)
-    self.input_topic = self.pubsub_client.topic(INPUT_TOPIC + self.uuid)
-    self.output_topic = self.pubsub_client.topic(OUTPUT_TOPIC + self.uuid)
-    self.input_sub = self.input_topic.subscription(INPUT_SUB + self.uuid)
-    self.output_sub = self.output_topic.subscription(OUTPUT_SUB + self.uuid)
-
-    self.input_topic.create()
-    self.output_topic.create()
-    test_utils.wait_for_topics_created([self.input_topic, self.output_topic])
-    self.input_sub.create()
-    self.output_sub.create()
+    self.pub_client = pubsub.PublisherClient()
+    self.input_topic = self.pub_client.create_topic(
+        self.pub_client.topic_path(self.project, INPUT_TOPIC + self.uuid))
+    self.output_topic = self.pub_client.create_topic(
+        self.pub_client.topic_path(self.project, OUTPUT_TOPIC + self.uuid))
+
+    self.sub_client = pubsub.SubscriberClient()
+    self.input_sub = self.sub_client.create_subscription(
+        self.sub_client.subscription_path(self.project, INPUT_SUB + self.uuid),
+        self.input_topic.name)
+    self.output_sub = self.sub_client.create_subscription(
+        self.sub_client.subscription_path(self.project, OUTPUT_SUB + self.uuid),
+        self.output_topic.name)
 
   def _inject_numbers(self, topic, num_messages):
     """Inject numbers as test data to PubSub."""
-    logging.debug('Injecting %d numbers to topic %s',
-                  num_messages, topic.full_name)
+    logging.debug('Injecting %d numbers to topic %s', num_messages, topic.name)
     for n in range(num_messages):
-      topic.publish(str(n))
-
-  def _cleanup_pubsub(self):
-    test_utils.cleanup_subscriptions([self.input_sub, self.output_sub])
-    test_utils.cleanup_topics([self.input_topic, self.output_topic])
+      self.pub_client.publish(self.input_topic.name, str(n))
 
   def tearDown(self):
-    self._cleanup_pubsub()
+    test_utils.cleanup_subscriptions(self.sub_client,
+                                     [self.input_sub, self.output_sub])
+    test_utils.cleanup_topics(self.pub_client,
+                              [self.input_topic, self.output_topic])
 
   @attr('IT')
   def test_streaming_wordcount_it(self):
@@ -86,17 +86,16 @@ class StreamingWordCountIT(unittest.TestCase):
     # Set extra options to the pipeline for test purpose
     state_verifier = PipelineStateMatcher(PipelineState.RUNNING)
     pubsub_msg_verifier = PubSubMessageMatcher(self.project,
-                                               OUTPUT_SUB + self.uuid,
+                                               self.output_sub.name,
                                                expected_msg,
                                                timeout=400)
-    extra_opts = {'input_subscription': self.input_sub.full_name,
-                  'output_topic': self.output_topic.full_name,
+    extra_opts = {'input_subscription': self.input_sub.name,
+                  'output_topic': self.output_topic.name,
                   'wait_until_finish_duration': WAIT_UNTIL_FINISH_DURATION,
                   'on_success_matcher': all_of(state_verifier,
                                                pubsub_msg_verifier)}
 
     # Generate input data and inject to PubSub.
-    test_utils.wait_for_subscriptions_created([self.input_sub])
     self._inject_numbers(self.input_topic, DEFAULT_INPUT_NUMBERS)
 
     # Get pipeline options from command argument: --test-pipeline-options,
diff --git a/sdks/python/apache_beam/io/gcp/pubsub.py b/sdks/python/apache_beam/io/gcp/pubsub.py
index 2414194..a1644ab 100644
--- a/sdks/python/apache_beam/io/gcp/pubsub.py
+++ b/sdks/python/apache_beam/io/gcp/pubsub.py
@@ -38,11 +38,10 @@ from apache_beam.transforms import PTransform
 from apache_beam.transforms.display import DisplayDataItem
 from apache_beam.utils.annotations import deprecated
 
-# The protobuf library is only used for running on Dataflow.
 try:
-  from google.cloud.proto.pubsub.v1 import pubsub_pb2
+  from google.cloud import pubsub
 except ImportError:
-  pubsub_pb2 = None
+  pubsub = None
 
 __all__ = ['PubsubMessage', 'ReadFromPubSub', 'ReadStringsFromPubSub',
            'WriteStringsToPubSub', 'WriteToPubSub']
@@ -92,7 +91,7 @@ class PubsubMessage(object):
     Returns:
       A new PubsubMessage object.
     """
-    msg = pubsub_pb2.PubsubMessage()
+    msg = pubsub.types.pubsub_pb2.PubsubMessage()
     msg.ParseFromString(proto_msg)
     # Convert ScalarMapContainer to dict.
     attributes = dict((key, msg.attributes[key]) for key in msg.attributes)
@@ -109,7 +108,7 @@ class PubsubMessage(object):
       https://cloud.google.com/pubsub/docs/reference/rpc/google.pubsub.v1#google.pubsub.v1.PubsubMessage
       containing the payload of this object.
     """
-    msg = pubsub_pb2.PubsubMessage()
+    msg = pubsub.types.pubsub_pb2.PubsubMessage()
     msg.data = self.data
     for key, value in self.attributes.iteritems():
       msg.attributes[key] = value
@@ -117,9 +116,9 @@ class PubsubMessage(object):
 
   @staticmethod
   def _from_message(msg):
-    """Construct from ``google.cloud.pubsub.message.Message``.
+    """Construct from ``google.cloud.pubsub_v1.subscriber.message.Message``.
 
-    https://google-cloud-python.readthedocs.io/en/latest/pubsub/subscriber/api/message.html
+    https://googleapis.github.io/google-cloud-python/latest/pubsub/subscriber/api/message.html
     """
     # Convert ScalarMapContainer to dict.
     attributes = dict((key, msg.attributes[key]) for key in msg.attributes)
diff --git a/sdks/python/apache_beam/io/gcp/pubsub_integration_test.py b/sdks/python/apache_beam/io/gcp/pubsub_integration_test.py
index 9bb81fc..5b060e5 100644
--- a/sdks/python/apache_beam/io/gcp/pubsub_integration_test.py
+++ b/sdks/python/apache_beam/io/gcp/pubsub_integration_test.py
@@ -100,21 +100,25 @@ class PubSubIntegrationTest(unittest.TestCase):
 
     # Set up PubSub environment.
     from google.cloud import pubsub
-    self.pubsub_client = pubsub.Client(project=self.project)
-    self.input_topic = self.pubsub_client.topic(INPUT_TOPIC + self.uuid)
-    self.output_topic = self.pubsub_client.topic(OUTPUT_TOPIC + self.uuid)
-    self.input_sub = self.input_topic.subscription(INPUT_SUB + self.uuid)
-    self.output_sub = self.output_topic.subscription(OUTPUT_SUB + self.uuid)
-
-    self.input_topic.create()
-    self.output_topic.create()
-    test_utils.wait_for_topics_created([self.input_topic, self.output_topic])
-    self.input_sub.create()
-    self.output_sub.create()
+    self.pub_client = pubsub.PublisherClient()
+    self.input_topic = self.pub_client.create_topic(
+        self.pub_client.topic_path(self.project, INPUT_TOPIC + self.uuid))
+    self.output_topic = self.pub_client.create_topic(
+        self.pub_client.topic_path(self.project, OUTPUT_TOPIC + self.uuid))
+
+    self.sub_client = pubsub.SubscriberClient()
+    self.input_sub = self.sub_client.create_subscription(
+        self.sub_client.subscription_path(self.project, INPUT_SUB + self.uuid),
+        self.input_topic.name)
+    self.output_sub = self.sub_client.create_subscription(
+        self.sub_client.subscription_path(self.project, OUTPUT_SUB + self.uuid),
+        self.output_topic.name)
 
   def tearDown(self):
-    test_utils.cleanup_subscriptions([self.input_sub, self.output_sub])
-    test_utils.cleanup_topics([self.input_topic, self.output_topic])
+    test_utils.cleanup_subscriptions(self.sub_client,
+                                     [self.input_sub, self.output_sub])
+    test_utils.cleanup_topics(self.pub_client,
+                              [self.input_topic, self.output_topic])
 
   def _test_streaming(self, with_attributes):
     """Runs IT pipeline with message verifier.
@@ -139,21 +143,20 @@ class PubSubIntegrationTest(unittest.TestCase):
       strip_attributes = [self.ID_LABEL, self.TIMESTAMP_ATTRIBUTE]
     pubsub_msg_verifier = PubSubMessageMatcher(
         self.project,
-        OUTPUT_SUB + self.uuid,
+        self.output_sub.name,
         expected_messages,
         timeout=MESSAGE_MATCHER_TIMEOUT_S,
         with_attributes=with_attributes,
         strip_attributes=strip_attributes)
-    extra_opts = {'input_subscription': self.input_sub.full_name,
-                  'output_topic': self.output_topic.full_name,
+    extra_opts = {'input_subscription': self.input_sub.name,
+                  'output_topic': self.output_topic.name,
                   'wait_until_finish_duration': TEST_PIPELINE_DURATION_MS,
                   'on_success_matcher': all_of(state_verifier,
                                                pubsub_msg_verifier)}
 
     # Generate input data and inject to PubSub.
-    test_utils.wait_for_subscriptions_created([self.input_sub])
     for msg in self.INPUT_MESSAGES[self.runner_name]:
-      self.input_topic.publish(msg.data, **msg.attributes)
+      self.pub_client.publish(self.input_topic.name, msg.data, **msg.attributes)
 
     # Get pipeline options from command argument: --test-pipeline-options,
     # and start pipeline job by calling pipeline main function.
diff --git a/sdks/python/apache_beam/io/gcp/pubsub_test.py b/sdks/python/apache_beam/io/gcp/pubsub_test.py
index 6e19950..a95ffc6 100644
--- a/sdks/python/apache_beam/io/gcp/pubsub_test.py
+++ b/sdks/python/apache_beam/io/gcp/pubsub_test.py
@@ -20,12 +20,9 @@
 
 from __future__ import absolute_import
 
-import functools
 import logging
 import unittest
 from builtins import object
-from builtins import range
-from builtins import zip
 
 import hamcrest as hc
 import mock
@@ -43,6 +40,7 @@ from apache_beam.runners.direct import transform_evaluator
 from apache_beam.runners.direct.direct_runner import _DirectReadFromPubSub
 from apache_beam.runners.direct.direct_runner import _get_transform_overrides
 from apache_beam.runners.direct.transform_evaluator import _PubSubReadEvaluator
+from apache_beam.testing import test_utils
 from apache_beam.testing.test_pipeline import TestPipeline
 from apache_beam.testing.util import TestWindowedValue
 from apache_beam.testing.util import assert_that
@@ -54,18 +52,10 @@ from apache_beam.transforms.display_test import DisplayDataItemMatcher
 from apache_beam.utils import timestamp
 
 # Protect against environments where the PubSub library is not available.
-# pylint: disable=wrong-import-order, wrong-import-position
 try:
   from google.cloud import pubsub
 except ImportError:
   pubsub = None
-# pylint: enable=wrong-import-order, wrong-import-position
-
-# The protobuf library is only used for running on Dataflow.
-try:
-  from google.cloud.proto.pubsub.v1 import pubsub_pb2
-except ImportError:
-  pubsub_pb2 = None
 
 
 class TestPubsubMessage(unittest.TestCase):
@@ -81,8 +71,7 @@ class TestPubsubMessage(unittest.TestCase):
     with self.assertRaisesRegexp(ValueError, r'data.*attributes.*must be set'):
       _ = PubsubMessage(None, {})
 
-  @unittest.skipIf(pubsub_pb2 is None,
-                   'PubSub proto dependencies are not installed')
+  @unittest.skipIf(pubsub is None, 'GCP dependencies are not installed')
   def test_proto_conversion(self):
     data = 'data'
     attributes = {'k1': 'v1', 'k2': 'v2'}
@@ -220,7 +209,7 @@ class TestWriteStringsToPubSubOverride(unittest.TestCase):
     write_transform = pcoll.producer.inputs[0].producer.transform
 
     # Ensure that the properties passed through correctly
-    self.assertEqual('a_topic', write_transform.dofn.topic_name)
+    self.assertEqual('a_topic', write_transform.dofn.short_topic_name)
 
   def test_expand(self):
     p = TestPipeline()
@@ -240,7 +229,7 @@ class TestWriteStringsToPubSubOverride(unittest.TestCase):
     write_transform = pcoll.producer.inputs[0].producer.transform
 
     # Ensure that the properties passed through correctly
-    self.assertEqual('a_topic', write_transform.dofn.topic_name)
+    self.assertEqual('a_topic', write_transform.dofn.short_topic_name)
     self.assertEqual(True, write_transform.dofn.with_attributes)
     # TODO(BEAM-4275): These properties aren't supported yet in direct runner.
     self.assertEqual(None, write_transform.dofn.id_label)
@@ -333,118 +322,25 @@ transform_evaluator.TransformEvaluatorRegistry._test_evaluators_overrides = {
 }
 
 
-class FakePubsubTopic(object):
-
-  def __init__(self, name, client):
-    self.name = name
-    self.client = client
-
-  def subscription(self, name):
-    return FakePubsubSubscription(name, self.name, self.client)
-
-  def batch(self):
-    if self.client.batch is None:
-      self.client.batch = FakeBatch(self.client)
-    return self.client.batch
-
-
-class FakePubsubSubscription(object):
-
-  def __init__(self, name, topic, client):
-    self.name = name
-    self.topic = topic
-    self.client = client
-
-  def create(self):
-    pass
-
-
-class FakeAutoAck(object):
-
-  def __init__(self, sub, **unused_kwargs):
-    self.sub = sub
-
-  def __enter__(self):
-    messages = self.sub.client.messages_read
-    self.ack_id_to_msg = dict(zip(range(len(messages)), messages))
-    return self.ack_id_to_msg
-
-  def __exit__(self, exc_type, exc_val, exc_tb):
-    pass
-
-
-class FakeBatch(object):
-  """Context manager that accept Pubsub client writes via publish().
-
-  Verifies writes on exit.
-  """
-
-  def __init__(self, client):
-    self.client = client
-    self.published = []
-
-  def __enter__(self):
-    return self
-
-  def __exit__(self, exc_type, exc_val, exc_tb):
-    if exc_type is not None:
-      return  # Exception will be raised.
-    hc.assert_that(self.published,
-                   hc.only_contains(*self.client.messages_write))
-
-  def publish(self, message, **attrs):
-    self.published.append([message, attrs])
-
-
-class FakePubsubClient(object):
-
-  def __init__(self, messages_read=None, messages_write=None, project=None,
-               **unused_kwargs):
-    """Creates a Pubsub client fake.
-
-    Args:
-      messages_read: List of PubsubMessage objects to return.
-      messages_write: List of [data, attributes] pairs, corresponding to
-        messages expected to be written to the client.
-      project: Name of GCP project.
-    """
-    self.messages_read = messages_read
-    self.messages_write = messages_write
-    self.project = project
-    self.batch = None
-
-  def topic(self, name):
-    return FakePubsubTopic(name, self)
-
-
-def create_client_message(data, message_id, attributes, publish_time):
-  """Returns a message as it would be returned from Cloud Pub/Sub client.
-
-  This is what the reader sees.
-  """
-  msg = pubsub.message.Message(data, message_id, attributes)
-  msg._service_timestamp = publish_time
-  return msg
-
-
 @unittest.skipIf(pubsub is None, 'GCP dependencies are not installed')
+@mock.patch('google.cloud.pubsub.SubscriberClient')
 class TestReadFromPubSub(unittest.TestCase):
 
-  @mock.patch('google.cloud.pubsub')
   def test_read_messages_success(self, mock_pubsub):
     data = 'data'
-    message_id = 'message_id'
-    publish_time = '2018-03-12T13:37:01.234567Z'
+    publish_time_secs = 1520861821
+    publish_time_nanos = 234567000
     attributes = {'key': 'value'}
-    payloads = [create_client_message(
-        data, message_id, attributes, publish_time)]
+    ack_id = 'ack_id'
+    pull_response = test_utils.create_pull_response([
+        test_utils.PullResponseMessage(
+            data, attributes, publish_time_secs, publish_time_nanos, ack_id)
+    ])
     expected_elements = [
         TestWindowedValue(PubsubMessage(data, attributes),
                           timestamp.Timestamp(1520861821.234567),
                           [window.GlobalWindow()])]
-
-    mock_pubsub.Client = functools.partial(FakePubsubClient, payloads)
-    mock_pubsub.subscription.AutoAck = FakeAutoAck
+    mock_pubsub.return_value.pull.return_value = pull_response
 
     p = TestPipeline()
     p.options.view_as(StandardOptions).streaming = True
@@ -453,17 +349,18 @@ class TestReadFromPubSub(unittest.TestCase):
                               None, None, with_attributes=True))
     assert_that(pcoll, equal_to(expected_elements), reify_windows=True)
     p.run()
+    mock_pubsub.return_value.acknowledge.assert_has_calls([
+        mock.call(mock.ANY, [ack_id])])
 
-  @mock.patch('google.cloud.pubsub')
   def test_read_strings_success(self, mock_pubsub):
     data = u'🤷 ¯\\_(ツ)_/¯'
     data_encoded = data.encode('utf-8')
-    publish_time = '2018-03-12T13:37:01.234567Z'
-    payloads = [create_client_message(data_encoded, None, None, publish_time)]
+    ack_id = 'ack_id'
+    pull_response = test_utils.create_pull_response([
+        test_utils.PullResponseMessage(data_encoded, ack_id=ack_id)
+    ])
     expected_elements = [data]
-
-    mock_pubsub.Client = functools.partial(FakePubsubClient, payloads)
-    mock_pubsub.subscription.AutoAck = FakeAutoAck
+    mock_pubsub.return_value.pull.return_value = pull_response
 
     p = TestPipeline()
     p.options.view_as(StandardOptions).streaming = True
@@ -472,16 +369,16 @@ class TestReadFromPubSub(unittest.TestCase):
                                      None, None))
     assert_that(pcoll, equal_to(expected_elements))
     p.run()
+    mock_pubsub.return_value.acknowledge.assert_has_calls([
+        mock.call(mock.ANY, [ack_id])])
 
-  @mock.patch('google.cloud.pubsub')
   def test_read_data_success(self, mock_pubsub):
     data_encoded = u'🤷 ¯\\_(ツ)_/¯'.encode('utf-8')
-    publish_time = '2018-03-12T13:37:01.234567Z'
-    payloads = [create_client_message(data_encoded, None, None, publish_time)]
+    ack_id = 'ack_id'
+    pull_response = test_utils.create_pull_response([
+        test_utils.PullResponseMessage(data_encoded, ack_id=ack_id)])
     expected_elements = [data_encoded]
-
-    mock_pubsub.Client = functools.partial(FakePubsubClient, payloads)
-    mock_pubsub.subscription.AutoAck = FakeAutoAck
+    mock_pubsub.return_value.pull.return_value = pull_response
 
     p = TestPipeline()
     p.options.view_as(StandardOptions).streaming = True
@@ -489,24 +386,26 @@ class TestReadFromPubSub(unittest.TestCase):
              | ReadFromPubSub('projects/fakeprj/topics/a_topic', None, None))
     assert_that(pcoll, equal_to(expected_elements))
     p.run()
+    mock_pubsub.return_value.acknowledge.assert_has_calls([
+        mock.call(mock.ANY, [ack_id])])
 
-  @mock.patch('google.cloud.pubsub')
   def test_read_messages_timestamp_attribute_milli_success(self, mock_pubsub):
     data = 'data'
-    message_id = 'message_id'
     attributes = {'time': '1337'}
-    publish_time = '2018-03-12T13:37:01.234567Z'
-    payloads = [
-        create_client_message(data, message_id, attributes, publish_time)]
+    publish_time_secs = 1520861821
+    publish_time_nanos = 234567000
+    ack_id = 'ack_id'
+    pull_response = test_utils.create_pull_response([
+        test_utils.PullResponseMessage(
+            data, attributes, publish_time_secs, publish_time_nanos, ack_id)
+    ])
     expected_elements = [
         TestWindowedValue(
             PubsubMessage(data, attributes),
             timestamp.Timestamp(micros=int(attributes['time']) * 1000),
             [window.GlobalWindow()]),
     ]
-
-    mock_pubsub.Client = functools.partial(FakePubsubClient, payloads)
-    mock_pubsub.subscription.AutoAck = FakeAutoAck
+    mock_pubsub.return_value.pull.return_value = pull_response
 
     p = TestPipeline()
     p.options.view_as(StandardOptions).streaming = True
@@ -516,24 +415,26 @@ class TestReadFromPubSub(unittest.TestCase):
                  with_attributes=True, timestamp_attribute='time'))
     assert_that(pcoll, equal_to(expected_elements), reify_windows=True)
     p.run()
+    mock_pubsub.return_value.acknowledge.assert_has_calls([
+        mock.call(mock.ANY, [ack_id])])
 
-  @mock.patch('google.cloud.pubsub')
   def test_read_messages_timestamp_attribute_rfc3339_success(self, mock_pubsub):
     data = 'data'
-    message_id = 'message_id'
     attributes = {'time': '2018-03-12T13:37:01.234567Z'}
-    publish_time = '2018-03-12T13:37:01.234567Z'
-    payloads = [
-        create_client_message(data, message_id, attributes, publish_time)]
+    publish_time_secs = 1337000000
+    publish_time_nanos = 133700000
+    ack_id = 'ack_id'
+    pull_response = test_utils.create_pull_response([
+        test_utils.PullResponseMessage(
+            data, attributes, publish_time_secs, publish_time_nanos, ack_id)
+    ])
     expected_elements = [
         TestWindowedValue(
             PubsubMessage(data, attributes),
             timestamp.Timestamp.from_rfc3339(attributes['time']),
             [window.GlobalWindow()]),
     ]
-
-    mock_pubsub.Client = functools.partial(FakePubsubClient, payloads)
-    mock_pubsub.subscription.AutoAck = FakeAutoAck
+    mock_pubsub.return_value.pull.return_value = pull_response
 
     p = TestPipeline()
     p.options.view_as(StandardOptions).streaming = True
@@ -543,24 +444,27 @@ class TestReadFromPubSub(unittest.TestCase):
                  with_attributes=True, timestamp_attribute='time'))
     assert_that(pcoll, equal_to(expected_elements), reify_windows=True)
     p.run()
+    mock_pubsub.return_value.acknowledge.assert_has_calls([
+        mock.call(mock.ANY, [ack_id])])
 
-  @mock.patch('google.cloud.pubsub')
   def test_read_messages_timestamp_attribute_missing(self, mock_pubsub):
     data = 'data'
-    message_id = 'message_id'
     attributes = {}
+    publish_time_secs = 1520861821
+    publish_time_nanos = 234567000
     publish_time = '2018-03-12T13:37:01.234567Z'
-    payloads = [
-        create_client_message(data, message_id, attributes, publish_time)]
+    ack_id = 'ack_id'
+    pull_response = test_utils.create_pull_response([
+        test_utils.PullResponseMessage(
+            data, attributes, publish_time_secs, publish_time_nanos, ack_id)
+    ])
     expected_elements = [
         TestWindowedValue(
             PubsubMessage(data, attributes),
             timestamp.Timestamp.from_rfc3339(publish_time),
             [window.GlobalWindow()]),
     ]
-
-    mock_pubsub.Client = functools.partial(FakePubsubClient, payloads)
-    mock_pubsub.subscription.AutoAck = FakeAutoAck
+    mock_pubsub.return_value.pull.return_value = pull_response
 
     p = TestPipeline()
     p.options.view_as(StandardOptions).streaming = True
@@ -570,18 +474,20 @@ class TestReadFromPubSub(unittest.TestCase):
                  with_attributes=True, timestamp_attribute='nonexistent'))
     assert_that(pcoll, equal_to(expected_elements), reify_windows=True)
     p.run()
+    mock_pubsub.return_value.acknowledge.assert_has_calls([
+        mock.call(mock.ANY, [ack_id])])
 
-  @mock.patch('google.cloud.pubsub')
   def test_read_messages_timestamp_attribute_fail_parse(self, mock_pubsub):
     data = 'data'
-    message_id = 'message_id'
     attributes = {'time': '1337 unparseable'}
-    publish_time = '2018-03-12T13:37:01.234567Z'
-    payloads = [
-        create_client_message(data, message_id, attributes, publish_time)]
-
-    mock_pubsub.Client = functools.partial(FakePubsubClient, payloads)
-    mock_pubsub.subscription.AutoAck = FakeAutoAck
+    publish_time_secs = 1520861821
+    publish_time_nanos = 234567000
+    ack_id = 'ack_id'
+    pull_response = test_utils.create_pull_response([
+        test_utils.PullResponseMessage(
+            data, attributes, publish_time_secs, publish_time_nanos, ack_id)
+    ])
+    mock_pubsub.return_value.pull.return_value = pull_response
 
     p = TestPipeline()
     p.options.view_as(StandardOptions).streaming = True
@@ -591,20 +497,10 @@ class TestReadFromPubSub(unittest.TestCase):
              with_attributes=True, timestamp_attribute='time'))
     with self.assertRaisesRegexp(ValueError, r'parse'):
       p.run()
+    mock_pubsub.return_value.acknowledge.assert_not_called()
 
-  @mock.patch('google.cloud.pubsub')
-  def test_read_message_id_label_unsupported(self, mock_pubsub):
+  def test_read_message_id_label_unsupported(self, unused_mock_pubsub):
     # id_label is unsupported in DirectRunner.
-    data = 'data'
-    message_id = 'message_id'
-    attributes = {'time': '1337 unparseable'}
-    publish_time = '2018-03-12T13:37:01.234567Z'
-    payloads = [
-        create_client_message(data, message_id, attributes, publish_time)]
-
-    mock_pubsub.Client = functools.partial(FakePubsubClient, payloads)
-    mock_pubsub.subscription.AutoAck = FakeAutoAck
-
     p = TestPipeline()
     p.options.view_as(StandardOptions).streaming = True
     _ = (p | ReadFromPubSub('projects/fakeprj/topics/a_topic', None, 'a_label'))
@@ -614,16 +510,12 @@ class TestReadFromPubSub(unittest.TestCase):
 
 
 @unittest.skipIf(pubsub is None, 'GCP dependencies are not installed')
+@mock.patch('google.cloud.pubsub.PublisherClient')
 class TestWriteToPubSub(unittest.TestCase):
 
-  @mock.patch('google.cloud.pubsub')
   def test_write_messages_success(self, mock_pubsub):
     data = 'data'
     payloads = [data]
-    expected_payloads = [[data, {}]]
-
-    mock_pubsub.Client = functools.partial(FakePubsubClient,
-                                           messages_write=expected_payloads)
 
     p = TestPipeline()
     p.options.view_as(StandardOptions).streaming = True
@@ -632,15 +524,12 @@ class TestWriteToPubSub(unittest.TestCase):
          | WriteToPubSub('projects/fakeprj/topics/a_topic',
                          with_attributes=False))
     p.run()
+    mock_pubsub.return_value.publish.assert_has_calls([
+        mock.call(mock.ANY, data)])
 
-  @mock.patch('google.cloud.pubsub')
   def test_write_messages_deprecated(self, mock_pubsub):
     data = 'data'
     payloads = [data]
-    expected_payloads = [[data, {}]]
-
-    mock_pubsub.Client = functools.partial(FakePubsubClient,
-                                           messages_write=expected_payloads)
 
     p = TestPipeline()
     p.options.view_as(StandardOptions).streaming = True
@@ -648,16 +537,13 @@ class TestWriteToPubSub(unittest.TestCase):
          | Create(payloads)
          | WriteStringsToPubSub('projects/fakeprj/topics/a_topic'))
     p.run()
+    mock_pubsub.return_value.publish.assert_has_calls([
+        mock.call(mock.ANY, data)])
 
-  @mock.patch('google.cloud.pubsub')
   def test_write_messages_with_attributes_success(self, mock_pubsub):
     data = 'data'
     attributes = {'key': 'value'}
     payloads = [PubsubMessage(data, attributes)]
-    expected_payloads = [[data, attributes]]
-
-    mock_pubsub.Client = functools.partial(FakePubsubClient,
-                                           messages_write=expected_payloads)
 
     p = TestPipeline()
     p.options.view_as(StandardOptions).streaming = True
@@ -666,15 +552,14 @@ class TestWriteToPubSub(unittest.TestCase):
          | WriteToPubSub('projects/fakeprj/topics/a_topic',
                          with_attributes=True))
     p.run()
+    mock_pubsub.return_value.publish.assert_has_calls([
+        mock.call(mock.ANY, data, **attributes)])
 
-  @mock.patch('google.cloud.pubsub')
   def test_write_messages_with_attributes_error(self, mock_pubsub):
     data = 'data'
     # Sending raw data when WriteToPubSub expects a PubsubMessage object.
     payloads = [data]
 
-    mock_pubsub.Client = functools.partial(FakePubsubClient)
-
     p = TestPipeline()
     p.options.view_as(StandardOptions).streaming = True
     _ = (p
@@ -685,15 +570,10 @@ class TestWriteToPubSub(unittest.TestCase):
                                  r'str.*has no attribute.*data'):
       p.run()
 
-  @mock.patch('google.cloud.pubsub')
   def test_write_messages_unsupported_features(self, mock_pubsub):
     data = 'data'
     attributes = {'key': 'value'}
     payloads = [PubsubMessage(data, attributes)]
-    expected_payloads = [[data, attributes]]
-
-    mock_pubsub.Client = functools.partial(FakePubsubClient,
-                                           messages_write=expected_payloads)
 
     p = TestPipeline()
     p.options.view_as(StandardOptions).streaming = True
diff --git a/sdks/python/apache_beam/io/gcp/tests/pubsub_matcher.py b/sdks/python/apache_beam/io/gcp/tests/pubsub_matcher.py
index 6217faf..7a0b5c8 100644
--- a/sdks/python/apache_beam/io/gcp/tests/pubsub_matcher.py
+++ b/sdks/python/apache_beam/io/gcp/tests/pubsub_matcher.py
@@ -31,12 +31,10 @@ __all__ = ['PubSubMessageMatcher']
 
 
 # Protect against environments where pubsub library is not available.
-# pylint: disable=wrong-import-order, wrong-import-position
 try:
   from google.cloud import pubsub
 except ImportError:
   pubsub = None
-# pylint: enable=wrong-import-order, wrong-import-position
 
 DEFAULT_TIMEOUT = 5 * 60
 MAX_MESSAGES_IN_ONE_PULL = 50
@@ -49,8 +47,9 @@ class PubSubMessageMatcher(BaseMatcher):
   subscription until all expected messages are shown or timeout.
   """
 
-  def __init__(self, project, sub_name, expected_msg, timeout=DEFAULT_TIMEOUT,
-               with_attributes=False, strip_attributes=None):
+  def __init__(self, project, sub_name, expected_msg,
+               timeout=DEFAULT_TIMEOUT, with_attributes=False,
+               strip_attributes=None):
     """Initialize PubSubMessageMatcher object.
 
     Args:
@@ -59,8 +58,9 @@ class PubSubMessageMatcher(BaseMatcher):
       expected_msg: A string list that contains expected message data pulled
         from the subscription. See also: with_attributes.
       timeout: Timeout in seconds to wait for all expected messages appears.
-      with_attributes: Whether expected_msg is a list of
-        ``PubsubMessage`` objects.
+      with_attributes: If True, will match against both message data and
+        attributes. If True, expected_msg should be a list of ``PubsubMessage``
+        objects. Otherwise, it should be a list of ``bytes``.
       strip_attributes: List of strings. If with_attributes==True, strip the
         attributes keyed by these values from incoming messages.
         If a key is missing, will add an attribute with an error message as
@@ -86,28 +86,26 @@ class PubSubMessageMatcher(BaseMatcher):
 
   def _matches(self, _):
     if self.messages is None:
-      self.messages = self._wait_for_messages(self._get_subscription(),
-                                              len(self.expected_msg),
+      self.messages = self._wait_for_messages(len(self.expected_msg),
                                               self.timeout)
     return Counter(self.messages) == Counter(self.expected_msg)
 
-  def _get_subscription(self):
-    return pubsub.Client(project=self.project).subscription(self.sub_name)
-
-  def _wait_for_messages(self, subscription, expected_num, timeout):
+  def _wait_for_messages(self, expected_num, timeout):
     """Wait for messages from given subscription."""
-    logging.debug('Start pulling messages from %s', subscription.full_name)
     total_messages = []
+
+    sub_client = pubsub.SubscriberClient()
     start_time = time.time()
     while time.time() - start_time <= timeout:
-      pulled = subscription.pull(max_messages=MAX_MESSAGES_IN_ONE_PULL)
-      for ack_id, message in pulled:
-        subscription.acknowledge([ack_id])
+      response = sub_client.pull(self.sub_name,
+                                 max_messages=MAX_MESSAGES_IN_ONE_PULL,
+                                 return_immediately=True)
+      for rm in response.received_messages:
+        msg = PubsubMessage._from_message(rm.message)
         if not self.with_attributes:
-          total_messages.append(message.data)
+          total_messages.append(msg.data)
           continue
 
-        msg = PubsubMessage._from_message(message)
         if self.strip_attributes:
           for attr in self.strip_attributes:
             try:
@@ -117,12 +115,16 @@ class PubSubMessageMatcher(BaseMatcher):
                                       'expected attribute not found.')
         total_messages.append(msg)
 
+      ack_ids = [rm.ack_id for rm in response.received_messages]
+      if ack_ids:
+        sub_client.acknowledge(self.sub_name, ack_ids)
       if len(total_messages) >= expected_num:
-        return total_messages
+        break
       time.sleep(1)
 
-    logging.error('Timeout after %d sec. Received %d messages from %s.',
-                  timeout, len(total_messages), subscription.full_name)
+    if time.time() - start_time > timeout:
+      logging.error('Timeout after %d sec. Received %d messages from %s.',
+                    timeout, len(total_messages), self.sub_name)
     return total_messages
 
   def describe_to(self, description):
diff --git a/sdks/python/apache_beam/io/gcp/tests/pubsub_matcher_test.py b/sdks/python/apache_beam/io/gcp/tests/pubsub_matcher_test.py
index 9047763..1261aa1 100644
--- a/sdks/python/apache_beam/io/gcp/tests/pubsub_matcher_test.py
+++ b/sdks/python/apache_beam/io/gcp/tests/pubsub_matcher_test.py
@@ -28,17 +28,19 @@ from hamcrest import assert_that as hc_assert_that
 
 from apache_beam.io.gcp.pubsub import PubsubMessage
 from apache_beam.io.gcp.tests.pubsub_matcher import PubSubMessageMatcher
+from apache_beam.testing.test_utils import PullResponseMessage
+from apache_beam.testing.test_utils import create_pull_response
 
 # Protect against environments where pubsub library is not available.
-# pylint: disable=wrong-import-order, wrong-import-position
 try:
   from google.cloud import pubsub
 except ImportError:
   pubsub = None
-# pylint: enable=wrong-import-order, wrong-import-position
 
 
 @unittest.skipIf(pubsub is None, 'PubSub dependencies are not installed.')
+@mock.patch('time.sleep', return_value=None)
+@mock.patch('google.cloud.pubsub.SubscriberClient')
 class PubSubMatcherTest(unittest.TestCase):
 
   @classmethod
@@ -55,90 +57,75 @@ class PubSubMatcherTest(unittest.TestCase):
         'mock_project', 'mock_sub_name', ['mock_expected_msg'],
         with_attributes=with_attributes, strip_attributes=strip_attributes)
 
-  @mock.patch('time.sleep', return_value=None)
-  @mock.patch('apache_beam.io.gcp.tests.pubsub_matcher.'
-              'PubSubMessageMatcher._get_subscription')
   def test_message_matcher_success(self, mock_get_sub, unsued_mock):
     self.init_matcher()
     self.pubsub_matcher.expected_msg = ['a', 'b']
     mock_sub = mock_get_sub.return_value
     mock_sub.pull.side_effect = [
-        [(1, pubsub.message.Message(b'a', 'unused_id'))],
-        [(2, pubsub.message.Message(b'b', 'unused_id'))],
+        create_pull_response([PullResponseMessage(b'a', {})]),
+        create_pull_response([PullResponseMessage(b'b', {})]),
     ]
     hc_assert_that(self.mock_presult, self.pubsub_matcher)
     self.assertEqual(mock_sub.pull.call_count, 2)
+    self.assertEqual(mock_sub.acknowledge.call_count, 2)
 
-  @mock.patch('time.sleep', return_value=None)
-  @mock.patch('apache_beam.io.gcp.tests.pubsub_matcher.'
-              'PubSubMessageMatcher._get_subscription')
   def test_message_matcher_attributes_success(self, mock_get_sub, unsued_mock):
     self.init_matcher(with_attributes=True)
     self.pubsub_matcher.expected_msg = [PubsubMessage('a', {'k': 'v'})]
     mock_sub = mock_get_sub.return_value
-    msg_a = pubsub.message.Message(b'a', 'unused_id')
-    msg_a.attributes['k'] = 'v'
-    mock_sub.pull.side_effect = [[(1, msg_a)]]
+    mock_sub.pull.side_effect = [
+        create_pull_response([PullResponseMessage(b'a', {'k': 'v'})])
+    ]
     hc_assert_that(self.mock_presult, self.pubsub_matcher)
     self.assertEqual(mock_sub.pull.call_count, 1)
+    self.assertEqual(mock_sub.acknowledge.call_count, 1)
 
-  @mock.patch('time.sleep', return_value=None)
-  @mock.patch('apache_beam.io.gcp.tests.pubsub_matcher.'
-              'PubSubMessageMatcher._get_subscription')
   def test_message_matcher_attributes_fail(self, mock_get_sub, unsued_mock):
     self.init_matcher(with_attributes=True)
     self.pubsub_matcher.expected_msg = [PubsubMessage('a', {})]
     mock_sub = mock_get_sub.return_value
-    msg_a = pubsub.message.Message(b'a', 'unused_id')
-    msg_a.attributes['k'] = 'v'  # Unexpected.
-    mock_sub.pull.side_effect = [[(1, msg_a)]]
+    # Unexpected attribute 'k'.
+    mock_sub.pull.side_effect = [
+        create_pull_response([PullResponseMessage(b'a', {'k': 'v'})])
+    ]
     with self.assertRaisesRegexp(AssertionError, r'Unexpected'):
       hc_assert_that(self.mock_presult, self.pubsub_matcher)
     self.assertEqual(mock_sub.pull.call_count, 1)
+    self.assertEqual(mock_sub.acknowledge.call_count, 1)
 
-  @mock.patch('time.sleep', return_value=None)
-  @mock.patch('apache_beam.io.gcp.tests.pubsub_matcher.'
-              'PubSubMessageMatcher._get_subscription')
   def test_message_matcher_strip_success(self, mock_get_sub, unsued_mock):
     self.init_matcher(with_attributes=True,
                       strip_attributes=['id', 'timestamp'])
     self.pubsub_matcher.expected_msg = [PubsubMessage('a', {'k': 'v'})]
     mock_sub = mock_get_sub.return_value
-    msg_a = pubsub.message.Message(b'a', 'unused_id')
-    msg_a.attributes['id'] = 'foo'
-    msg_a.attributes['timestamp'] = 'bar'
-    msg_a.attributes['k'] = 'v'
-    mock_sub.pull.side_effect = [[(1, msg_a)]]
+    mock_sub.pull.side_effect = [create_pull_response([
+        PullResponseMessage(b'a', {'id': 'foo', 'timestamp': 'bar', 'k': 'v'})
+    ])]
     hc_assert_that(self.mock_presult, self.pubsub_matcher)
     self.assertEqual(mock_sub.pull.call_count, 1)
+    self.assertEqual(mock_sub.acknowledge.call_count, 1)
 
-  @mock.patch('time.sleep', return_value=None)
-  @mock.patch('apache_beam.io.gcp.tests.pubsub_matcher.'
-              'PubSubMessageMatcher._get_subscription')
   def test_message_matcher_strip_fail(self, mock_get_sub, unsued_mock):
     self.init_matcher(with_attributes=True,
                       strip_attributes=['id', 'timestamp'])
     self.pubsub_matcher.expected_msg = [PubsubMessage('a', {'k': 'v'})]
     mock_sub = mock_get_sub.return_value
-    # msg_a is missing attribute 'timestamp'.
-    msg_a = pubsub.message.Message(b'a', 'unused_id')
-    msg_a.attributes['id'] = 'foo'
-    msg_a.attributes['k'] = 'v'
-    mock_sub.pull.side_effect = [[(1, msg_a)]]
+    # Message is missing attribute 'timestamp'.
+    mock_sub.pull.side_effect = [create_pull_response([
+        PullResponseMessage(b'a', {'id': 'foo', 'k': 'v'})
+    ])]
     with self.assertRaisesRegexp(AssertionError, r'Stripped attributes'):
       hc_assert_that(self.mock_presult, self.pubsub_matcher)
     self.assertEqual(mock_sub.pull.call_count, 1)
+    self.assertEqual(mock_sub.acknowledge.call_count, 1)
 
-  @mock.patch('time.sleep', return_value=None)
-  @mock.patch('apache_beam.io.gcp.tests.pubsub_matcher.'
-              'PubSubMessageMatcher._get_subscription')
   def test_message_matcher_mismatch(self, mock_get_sub, unused_mock):
     self.init_matcher()
     self.pubsub_matcher.expected_msg = ['a']
     mock_sub = mock_get_sub.return_value
-    mock_sub.pull.return_value = [
-        (1, pubsub.message.Message(b'c', 'unused_id')),
-        (1, pubsub.message.Message(b'd', 'unused_id')),
+    mock_sub.pull.side_effect = [
+        create_pull_response([PullResponseMessage(b'c', {}),
+                              PullResponseMessage(b'd', {})]),
     ]
     with self.assertRaises(AssertionError) as error:
       hc_assert_that(self.mock_presult, self.pubsub_matcher)
@@ -147,10 +134,9 @@ class PubSubMatcherTest(unittest.TestCase):
     self.assertTrue(
         '\nExpected: Expected 1 messages.\n     but: Got 2 messages.'
         in str(error.exception.args[0]))
+    self.assertEqual(mock_sub.pull.call_count, 1)
+    self.assertEqual(mock_sub.acknowledge.call_count, 1)
 
-  @mock.patch('time.sleep', return_value=None)
-  @mock.patch('apache_beam.io.gcp.tests.pubsub_matcher.'
-              'PubSubMessageMatcher._get_subscription')
   def test_message_matcher_timeout(self, mock_get_sub, unused_mock):
     self.init_matcher()
     mock_sub = mock_get_sub.return_value
@@ -159,6 +145,7 @@ class PubSubMatcherTest(unittest.TestCase):
     with self.assertRaisesRegexp(AssertionError, r'Expected 1.*\n.*Got 0'):
       hc_assert_that(self.mock_presult, self.pubsub_matcher)
     self.assertTrue(mock_sub.pull.called)
+    self.assertEqual(mock_sub.acknowledge.call_count, 0)
 
 
 if __name__ == '__main__':
diff --git a/sdks/python/apache_beam/runners/direct/direct_runner.py b/sdks/python/apache_beam/runners/direct/direct_runner.py
index 00e37f3..d410992 100644
--- a/sdks/python/apache_beam/runners/direct/direct_runner.py
+++ b/sdks/python/apache_beam/runners/direct/direct_runner.py
@@ -25,6 +25,7 @@ from __future__ import absolute_import
 
 import itertools
 import logging
+import time
 
 from google.protobuf import wrappers_pb2
 
@@ -264,11 +265,12 @@ class _DirectReadFromPubSub(PTransform):
 
 
 class _DirectWriteToPubSubFn(DoFn):
-  _topic = None
+  BUFFER_SIZE_ELEMENTS = 100
+  FLUSH_TIMEOUT_SECS = BUFFER_SIZE_ELEMENTS * 0.5
 
   def __init__(self, sink):
     self.project = sink.project
-    self.topic_name = sink.topic_name
+    self.short_topic_name = sink.topic_name
     self.id_label = sink.id_label
     self.timestamp_attribute = sink.timestamp_attribute
     self.with_attributes = sink.with_attributes
@@ -282,30 +284,33 @@ class _DirectWriteToPubSubFn(DoFn):
                                 'supported for PubSub writes')
 
   def start_bundle(self):
-    from google.cloud import pubsub
-
-    if self._topic is None:
-      self._topic = pubsub.Client(project=self.project).topic(
-          self.topic_name)
     self._buffer = []
 
   def process(self, elem):
     self._buffer.append(elem)
-    if len(self._buffer) >= 100:
+    if len(self._buffer) >= self.BUFFER_SIZE_ELEMENTS:
       self._flush()
 
   def finish_bundle(self):
     self._flush()
 
   def _flush(self):
-    if self._buffer:
-      with self._topic.batch() as batch:
-        for elem in self._buffer:
-          if self.with_attributes:
-            batch.publish(elem.data, **elem.attributes)
-          else:
-            batch.publish(elem)
-      self._buffer = []
+    from google.cloud import pubsub
+    pub_client = pubsub.PublisherClient()
+    topic = pub_client.topic_path(self.project, self.short_topic_name)
+
+    if self.with_attributes:
+      futures = [pub_client.publish(topic, elem.data, **elem.attributes)
+                 for elem in self._buffer]
+    else:
+      futures = [pub_client.publish(topic, elem)
+                 for elem in self._buffer]
+
+    timer_start = time.time()
+    for future in futures:
+      remaining = self.FLUSH_TIMEOUT_SECS - (time.time() - timer_start)
+      future.result(remaining)
+    self._buffer = []
 
 
 def _get_pubsub_transform_overrides(pipeline_options):
diff --git a/sdks/python/apache_beam/runners/direct/test_direct_runner.py b/sdks/python/apache_beam/runners/direct/test_direct_runner.py
index 8facca8..23dfeab 100644
--- a/sdks/python/apache_beam/runners/direct/test_direct_runner.py
+++ b/sdks/python/apache_beam/runners/direct/test_direct_runner.py
@@ -52,5 +52,6 @@ class TestDirectRunner(DirectRunner):
     finally:
       if not PipelineState.is_terminal(self.result.state):
         self.result.cancel()
+        self.result.wait_until_finish()
 
     return self.result
diff --git a/sdks/python/apache_beam/runners/direct/transform_evaluator.py b/sdks/python/apache_beam/runners/direct/transform_evaluator.py
index ef12e2c..fad0704 100644
--- a/sdks/python/apache_beam/runners/direct/transform_evaluator.py
+++ b/sdks/python/apache_beam/runners/direct/transform_evaluator.py
@@ -377,20 +377,44 @@ class _TestStreamEvaluator(_TransformEvaluator):
 
 
 class _PubSubSubscriptionWrapper(object):
-  """Wrapper for garbage-collecting temporary PubSub subscriptions."""
+  """Wrapper for managing temporary PubSub subscriptions."""
 
-  def __init__(self, subscription, should_cleanup):
-    self.subscription = subscription
-    self.should_cleanup = should_cleanup
+  def __init__(self, project, short_topic_name, short_sub_name):
+    """Initialize subscription wrapper.
+
+    If short_sub_name is None, creates a temporary subscription to the topic.
+
+    Args:
+      project: GCP project name for topic and subscription. May be None.
+        Required if short_sub_name is None.
+      short_topic_name: Valid topic name without
+        'projects/{project}/topics/' prefix. May be None.
+        Required if short_sub_name is None.
+      short_sub_name: Valid subscription name without
+        'projects/{project}/subscriptions/' prefix. May be None.
+    """
+    from google.cloud import pubsub
+    self.sub_client = pubsub.SubscriberClient()
+
+    if short_sub_name is None:
+      self.sub_name = self.sub_client.subscription_path(
+          project, 'beam_%d_%x' % (int(time.time()), random.randrange(1 << 32)))
+      topic_name = self.sub_client.topic_path(project, short_topic_name)
+      self.sub_client.create_subscription(self.sub_name, topic_name)
+      self._should_cleanup = True
+    else:
+      self.sub_name = self.sub_client.subscription_path(project, short_sub_name)
+      self._should_cleanup = False
 
   def __del__(self):
-    if self.should_cleanup:
-      self.subscription.delete()
+    if self._should_cleanup:
+      self.sub_client.delete_subscription(self.sub_name)
 
 
 class _PubSubReadEvaluator(_TransformEvaluator):
   """TransformEvaluator for PubSub read."""
 
+  # A mapping of transform to _PubSubSubscriptionWrapper.
   _subscription_cache = {}
 
   def __init__(self, evaluation_context, applied_ptransform,
@@ -404,26 +428,16 @@ class _PubSubReadEvaluator(_TransformEvaluator):
     if self.source.id_label:
       raise NotImplementedError(
           'DirectRunner: id_label is not supported for PubSub reads')
-    self._subscription = _PubSubReadEvaluator.get_subscription(
+    self._sub_name = _PubSubReadEvaluator.get_subscription(
         self._applied_ptransform, self.source.project, self.source.topic_name,
         self.source.subscription_name)
 
   @classmethod
-  def get_subscription(cls, transform, project, topic, subscription_name):
+  def get_subscription(cls, transform, project, topic, short_sub_name):
     if transform not in cls._subscription_cache:
-      from google.cloud import pubsub
-      should_create = not subscription_name
-      if should_create:
-        subscription_name = 'beam_%d_%x' % (
-            int(time.time()), random.randrange(1 << 32))
-      wrapper = _PubSubSubscriptionWrapper(
-          pubsub.Client(project=project).topic(topic).subscription(
-              subscription_name),
-          should_create)
-      if should_create:
-        wrapper.subscription.create()
+      wrapper = _PubSubSubscriptionWrapper(project, topic, short_sub_name)
       cls._subscription_cache[transform] = wrapper
-    return cls._subscription_cache[transform].subscription
+    return cls._subscription_cache[transform].sub_name
 
   def start_bundle(self):
     pass
@@ -438,28 +452,34 @@ class _PubSubReadEvaluator(_TransformEvaluator):
     # evaluator fails with an exception before emitting a bundle. However,
     # the DirectRunner currently doesn't retry work items anyway, so the
     # pipeline would enter an inconsistent state on any error.
-    with pubsub.subscription.AutoAck(
-        self._subscription, return_immediately=True,
-        max_messages=10) as results:
-      def _get_element(message):
-        parsed_message = PubsubMessage._from_message(message)
-        if (timestamp_attribute and
-            timestamp_attribute in parsed_message.attributes):
-          rfc3339_or_milli = parsed_message.attributes[timestamp_attribute]
+    sub_client = pubsub.SubscriberClient()
+    response = sub_client.pull(self._sub_name, max_messages=10,
+                               return_immediately=True)
+
+    def _get_element(message):
+      parsed_message = PubsubMessage._from_message(message)
+      if (timestamp_attribute and
+          timestamp_attribute in parsed_message.attributes):
+        rfc3339_or_milli = parsed_message.attributes[timestamp_attribute]
+        try:
+          timestamp = Timestamp.from_rfc3339(rfc3339_or_milli)
+        except ValueError:
           try:
-            timestamp = Timestamp.from_rfc3339(rfc3339_or_milli)
-          except ValueError:
-            try:
-              timestamp = Timestamp(micros=int(rfc3339_or_milli) * 1000)
-            except ValueError as e:
-              raise ValueError('Bad timestamp value: %s' % e)
-        else:
-          timestamp = Timestamp.from_rfc3339(message.service_timestamp)
+            timestamp = Timestamp(micros=int(rfc3339_or_milli) * 1000)
+          except ValueError as e:
+            raise ValueError('Bad timestamp value: %s' % e)
+      else:
+        timestamp = Timestamp(message.publish_time.seconds,
+                              message.publish_time.nanos // 1000)
+
+      return timestamp, parsed_message
 
-        return timestamp, parsed_message
+    results = [_get_element(rm.message) for rm in response.received_messages]
+    ack_ids = [rm.ack_id for rm in response.received_messages]
+    if ack_ids:
+      sub_client.acknowledge(self._sub_name, ack_ids)
 
-      return [_get_element(message)
-              for unused_ack_id, message in iteritems(results)]
+    return results
 
   def finish_bundle(self):
     data = self._read_from_pubsub(self.source.timestamp_attribute)
diff --git a/sdks/python/apache_beam/testing/test_utils.py b/sdks/python/apache_beam/testing/test_utils.py
index 490d079..1f0e99e 100644
--- a/sdks/python/apache_beam/testing/test_utils.py
+++ b/sdks/python/apache_beam/testing/test_utils.py
@@ -24,11 +24,9 @@ from __future__ import absolute_import
 
 import hashlib
 import imp
-import logging
 import os
 import shutil
 import tempfile
-import time
 from builtins import object
 
 from mock import Mock
@@ -136,46 +134,61 @@ def delete_files(file_paths):
   FileSystems.delete(file_paths)
 
 
-def wait_for_subscriptions_created(subs, timeout=60):
-  """Wait for all PubSub subscriptions are created."""
-  return _wait_until_all_exist(subs, timeout)
-
+def cleanup_subscriptions(sub_client, subs):
+  """Delete the given PubSub subscriptions using sub_client."""
+  for sub in subs:
+    sub_client.delete_subscription(sub.name)
 
-def wait_for_topics_created(topics, timeout=60):
-  """Wait for all PubSub topics are created."""
-  return _wait_until_all_exist(topics, timeout)
 
+def cleanup_topics(pub_client, topics):
+  """Delete the given PubSub topics using pub_client."""
+  for topic in topics:
+    pub_client.delete_topic(topic.name)
 
-def _wait_until_all_exist(components, timeout):
-  unchecked_components = set(components)
-  start_time = time.time()
-  while time.time() - start_time <= timeout:
-    unchecked_components = set(
-        [c for c in unchecked_components if not c.exists()])
-    if len(unchecked_components) == 0:
-      return True
-    time.sleep(2)
 
-  raise RuntimeError(
-      'Timeout after %d seconds. %d of %d topics/subscriptions not exist. '
-      'They are %s.' % (timeout, len(unchecked_components),
-                        len(components), list(unchecked_components)))
+class PullResponseMessage(object):
+  """Data representing a pull request response.
 
+  Utility class for ``create_pull_response``.
+  """
+  def __init__(self, data, attributes=None,
+               publish_time_secs=None, publish_time_nanos=None, ack_id=None):
+    self.data = data
+    self.attributes = attributes
+    self.publish_time_secs = publish_time_secs
+    self.publish_time_nanos = publish_time_nanos
+    self.ack_id = ack_id
 
-def cleanup_subscriptions(subs):
-  """Cleanup PubSub subscriptions if exist."""
-  _cleanup_pubsub(subs)
 
+def create_pull_response(responses):
+  """Create an instance of ``google.cloud.pubsub.types.PullResponse``.
 
-def cleanup_topics(topics):
-  """Cleanup PubSub topics if exist."""
-  _cleanup_pubsub(topics)
+  Used to simulate the response from pubsub.SubscriberClient().pull().
 
+  Args:
+    responses: list of ``PullResponseMessage``
 
-def _cleanup_pubsub(components):
-  for c in components:
-    if c.exists():
-      c.delete()
-    else:
-      logging.debug('Cannot delete topic/subscription. %s does not exist.',
-                    c.full_name)
+  Returns:
+    An instance of ``google.cloud.pubsub.types.PullResponse`` populated with
+    responses.
+  """
+  from google.cloud import pubsub
+
+  res = pubsub.types.PullResponse()
+  for response in responses:
+    received_message = res.received_messages.add()
+
+    message = received_message.message
+    message.data = response.data
+    if response.attributes is not None:
+      for k, v in response.attributes.items():
+        message.attributes[k] = v
+    if response.publish_time_secs is not None:
+      message.publish_time.seconds = response.publish_time_secs
+    if response.publish_time_nanos is not None:
+      message.publish_time.nanos = response.publish_time_nanos
+
+    if response.ack_id is not None:
+      received_message.ack_id = response.ack_id
+
+  return res
diff --git a/sdks/python/apache_beam/testing/test_utils_test.py b/sdks/python/apache_beam/testing/test_utils_test.py
index bef4078..2b16c30 100644
--- a/sdks/python/apache_beam/testing/test_utils_test.py
+++ b/sdks/python/apache_beam/testing/test_utils_test.py
@@ -82,56 +82,19 @@ class TestUtilsTest(unittest.TestCase):
         self.assertEqual(f.readline(), b'line2\n')
         self.assertEqual(f.readline(), b'line3\n')
 
-  @mock.patch('time.sleep', return_value=None)
-  def test_wait_for_subscriptions_created_fails(self, patched_time_sleep):
-    sub1 = mock.MagicMock()
-    sub1.exists.return_value = True
-    sub2 = mock.MagicMock()
-    sub2.exists.return_value = False
-    with self.assertRaises(RuntimeError) as error:
-      utils.wait_for_subscriptions_created([sub1, sub2], timeout=0.1)
-    self.assertTrue(sub1.exists.called)
-    self.assertTrue(sub2.exists.called)
-    self.assertTrue(error.exception.args[0].startswith('Timeout after'))
-
-  @mock.patch('time.sleep', return_value=None)
-  def test_wait_for_topics_created_fails(self, patched_time_sleep):
-    topic1 = mock.MagicMock()
-    topic1.exists.return_value = True
-    topic2 = mock.MagicMock()
-    topic2.exists.return_value = False
-    with self.assertRaises(RuntimeError) as error:
-      utils.wait_for_subscriptions_created([topic1, topic2], timeout=0.1)
-    self.assertTrue(topic1.exists.called)
-    self.assertTrue(topic2.exists.called)
-    self.assertTrue(error.exception.args[0].startswith('Timeout after'))
-
-  @mock.patch('time.sleep', return_value=None)
-  def test_wait_for_subscriptions_created_succeeds(self, patched_time_sleep):
-    sub1 = mock.MagicMock()
-    sub1.exists.return_value = True
-    self.assertTrue(
-        utils.wait_for_subscriptions_created([sub1], timeout=0.1))
-
-  @mock.patch('time.sleep', return_value=None)
-  def test_wait_for_topics_created_succeeds(self, patched_time_sleep):
-    topic1 = mock.MagicMock()
-    topic1.exists.return_value = True
-    self.assertTrue(
-        utils.wait_for_subscriptions_created([topic1], timeout=0.1))
-    self.assertTrue(topic1.exists.called)
-
   def test_cleanup_subscriptions(self):
-    mock_sub = mock.MagicMock()
-    mock_sub.exist.return_value = True
-    utils.cleanup_subscriptions([mock_sub])
-    self.assertTrue(mock_sub.delete.called)
+    sub_client = mock.Mock()
+    sub = mock.Mock()
+    sub.name = 'test_sub'
+    utils.cleanup_subscriptions(sub_client, [sub])
+    sub_client.delete_subscription.assert_called_with(sub.name)
 
   def test_cleanup_topics(self):
-    mock_topics = mock.MagicMock()
-    mock_topics.exist.return_value = True
-    utils.cleanup_subscriptions([mock_topics])
-    self.assertTrue(mock_topics.delete.called)
+    pub_client = mock.Mock()
+    topic = mock.Mock()
+    topic.name = 'test_topic'
+    utils.cleanup_topics(pub_client, [topic])
+    pub_client.delete_topic.assert_called_with(topic.name)
 
 
 if __name__ == '__main__':
diff --git a/sdks/python/container/base_image_requirements.txt b/sdks/python/container/base_image_requirements.txt
index a32bc44..8e352a6 100644
--- a/sdks/python/container/base_image_requirements.txt
+++ b/sdks/python/container/base_image_requirements.txt
@@ -47,10 +47,9 @@ nose==1.3.7
 # GCP extra features
 google-apitools==0.5.20
 googledatastore==7.0.1
-google-cloud-pubsub==0.26.0
+google-cloud-pubsub==0.35.4
 google-cloud-bigquery==0.25.0
 proto-google-cloud-datastore-v1==0.90.4
-proto-google-cloud-pubsub-v1==0.15.4
 
 # Optional packages
 cython==0.28.1
diff --git a/sdks/python/setup.py b/sdks/python/setup.py
index 1047167..a3db790 100644
--- a/sdks/python/setup.py
+++ b/sdks/python/setup.py
@@ -141,8 +141,7 @@ GCP_REQUIREMENTS = [
     'google-apitools>=0.5.18,<=0.5.20',
     'proto-google-cloud-datastore-v1>=0.90.0,<=0.90.4',
     'googledatastore==7.0.1; python_version < "3.0"',
-    'google-cloud-pubsub==0.26.0',
-    'proto-google-cloud-pubsub-v1==0.15.4',
+    'google-cloud-pubsub==0.35.4',
     # GCP packages required by tests
     'google-cloud-bigquery==0.25.0',
 ]