You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ch...@apache.org on 2018/03/27 22:43:27 UTC

[beam] branch master updated: [BEAM-3744] Expand Pubsub read API for Python. (#4901)

This is an automated email from the ASF dual-hosted git repository.

chamikara pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 8d854d4  [BEAM-3744] Expand Pubsub read API for Python. (#4901)
8d854d4 is described below

commit 8d854d4ce0365a8e201b388618d7732f000c65b9
Author: Udi Meiri (Ehud) <ud...@users.noreply.github.com>
AuthorDate: Tue Mar 27 15:43:23 2018 -0700

    [BEAM-3744] Expand Pubsub read API for Python. (#4901)
    
    * Expand Pubsub read API for Python.
    
    New features:
    - Timestamp attributes (user attribute containing message timestamp)
      - In either milliseconds since the Unix epoch or RFC 3339 format.
    - Label IDs (user attribute used as a unique message ID)
    - Raw payloads
    - Source API can return either:
      - Payloads (raw or decoded UTF-8)
      - PubSubMessage type, encapsulating message payload and attributes
    
    * Address review comments.
    
    * Move to a single read PTransform: ReadFromPubSub
    
    ReadStringsFromPubSub will be removed in the future.
---
 .../apache_beam/examples/windowed_wordcount.py     |   4 +-
 sdks/python/apache_beam/io/gcp/pubsub.py           | 202 +++++++--------------
 sdks/python/apache_beam/io/gcp/pubsub_test.py      | 181 +++++++++++++++---
 .../runners/dataflow/dataflow_runner.py            |   5 +-
 .../apache_beam/runners/dataflow/internal/names.py |   5 +-
 .../apache_beam/runners/direct/direct_runner.py    |   2 +-
 .../runners/direct/transform_evaluator.py          |  37 +++-
 sdks/python/apache_beam/utils/timestamp.py         |  61 ++++++-
 sdks/python/apache_beam/utils/timestamp_test.py    |  35 +++-
 sdks/python/setup.py                               |   1 +
 10 files changed, 351 insertions(+), 182 deletions(-)

diff --git a/sdks/python/apache_beam/examples/windowed_wordcount.py b/sdks/python/apache_beam/examples/windowed_wordcount.py
index 3838408..987b660 100644
--- a/sdks/python/apache_beam/examples/windowed_wordcount.py
+++ b/sdks/python/apache_beam/examples/windowed_wordcount.py
@@ -67,10 +67,10 @@ def run(argv=None):
 
   with beam.Pipeline(argv=pipeline_args) as p:
 
-    # Read the text from PubSub messages
+    # Read the text from PubSub messages.
     lines = p | beam.io.ReadStringsFromPubSub(known_args.input_topic)
 
-    # Capitalize the characters in each line.
+    # Get the number of appearances of a word.
     def count_ones(word_ones):
       (word, ones) = word_ones
       return (word, sum(ones))
diff --git a/sdks/python/apache_beam/io/gcp/pubsub.py b/sdks/python/apache_beam/io/gcp/pubsub.py
index d5afee9..f5ca17e 100644
--- a/sdks/python/apache_beam/io/gcp/pubsub.py
+++ b/sdks/python/apache_beam/io/gcp/pubsub.py
@@ -20,6 +20,14 @@ Cloud Pub/Sub sources and sinks are currently supported only in streaming
 pipelines, during remote execution.
 
 This API is currently under development and is subject to change.
+
+Description of common arguments used in this module:
+  topic: Cloud Pub/Sub topic in the form "projects/<project>/topics/<topic>".
+    If provided, subscription must be None.
+  subscription: Existing Cloud Pub/Sub subscription to use in the
+    form "projects/<project>/subscriptions/<subscription>". If not specified,
+    a temporary subscription will be created from the specified topic. If
+    provided, topic must be None.
 """
 
 from __future__ import absolute_import
@@ -34,8 +42,6 @@ from apache_beam.io.iobase import Write
 from apache_beam.runners.dataflow.native_io import iobase as dataflow_io
 from apache_beam.transforms import Map
 from apache_beam.transforms import PTransform
-from apache_beam.transforms import core
-from apache_beam.transforms import window
 from apache_beam.transforms.display import DisplayDataItem
 
 try:
@@ -44,7 +50,7 @@ except ImportError:
   pubsub_pb2 = None
 
 
-__all__ = ['PubsubMessage', 'ReadMessagesFromPubSub', 'ReadStringsFromPubSub',
+__all__ = ['PubsubMessage', 'ReadFromPubSub', 'ReadStringsFromPubSub',
            'WriteStringsToPubSub']
 
 
@@ -52,38 +58,27 @@ class PubsubMessage(object):
   """Represents a message from Cloud Pub/Sub.
 
   This interface is experimental. No backwards compatibility guarantees.
+
+  Attributes:
+    payload: (str) Message payload, as a byte string.
+    attributes: (dict) Map of string to string.
   """
 
-  def __init__(self, payload, message_id, attributes, publish_time):
+  def __init__(self, payload, attributes):
     """Constructs a message.
 
-    This interface is experimental. No backwards compatibility guarantees.
-
-    See also:
-      https://cloud.google.com/pubsub/docs/reference/rest/v1/PubsubMessage
-
-    Attributes:
-      payload: (str) Message payload, as a byte string.
-      message_id: (str) Server assigned message ID. Unique within a topic.
-      attributes: (dict) Map of string to string.
-      publish_time: (str) Server assigned timestamp of when the message was
-        published.
+    Beam users should not directly construct ``PubsubMessages``.
     """
     self.payload = payload
-    self.message_id = message_id
     self.attributes = attributes
-    self.publish_time = publish_time
 
   def __eq__(self, other):
     return isinstance(other, PubsubMessage) and (
         self.payload == other.payload and
-        self.message_id == other.message_id and
-        self.attributes == other.attributes and
-        self.publish_time == other.publish_time)
+        self.attributes == other.attributes)
 
   def __repr__(self):
-    return 'PubsubMessage(%s, %s, %s, %s)' % (
-        self.payload, self.message_id, self.attributes, self.publish_time)
+    return 'PubsubMessage(%s, %s)' % (self.payload, self.attributes)
 
   @staticmethod
   def _from_proto(proto_msg):
@@ -95,8 +90,7 @@ class PubsubMessage(object):
     msg.ParseFromString(proto_msg)
     # Convert ScalarMapContainer to dict.
     attributes = dict((key, msg.attributes[key]) for key in msg.attributes)
-    return PubsubMessage(msg.data, msg.message_id,
-                         attributes, msg.publish_time)
+    return PubsubMessage(msg.data, attributes)
 
   @staticmethod
   def _from_message(msg):
@@ -106,130 +100,82 @@ class PubsubMessage(object):
     """
     # Convert ScalarMapContainer to dict.
     attributes = dict((key, msg.attributes[key]) for key in msg.attributes)
-    return PubsubMessage(msg.data, msg.message_id,
-                         attributes, msg.service_timestamp)
-
+    return PubsubMessage(msg.data, attributes)
 
-class ReadMessagesFromPubSub(PTransform):
-  """A ``PTransform`` for reading from Cloud Pub/Sub.
-
-  This interface is experimental. No backwards compatibility guarantees.
 
-  Outputs elements of type :class:`~PubsubMessage`.
-  """
+class ReadFromPubSub(PTransform):
+  """A ``PTransform`` for reading from Cloud Pub/Sub."""
+  # Implementation note: This ``PTransform`` is overridden by DirectRunner.
 
-  def __init__(self, topic=None, subscription=None, id_label=None):
-    """Initializes ``ReadMessagesFromPubSub``.
+  def __init__(self, topic=None, subscription=None, id_label=None,
+               with_attributes=False, timestamp_attribute=None):
+    """Initializes ``ReadFromPubSub``.
 
     Args:
-      topic: Cloud Pub/Sub topic in the form "projects/<project>/topics/
-        <topic>". If provided, subscription must be None.
-      subscription: Existing Cloud Pub/Sub subscription to use in the
-        form "projects/<project>/subscriptions/<subscription>". If not
-        specified, a temporary subscription will be created from the specified
-        topic. If provided, topic must be None.
       id_label: The attribute on incoming Pub/Sub messages to use as a unique
-        record identifier.  When specified, the value of this attribute (which
+        record identifier. When specified, the value of this attribute (which
         can be any string that uniquely identifies the record) will be used for
-        deduplication of messages.  If not provided, we cannot guarantee
+        deduplication of messages. If not provided, we cannot guarantee
         that no duplicate data will be delivered on the Pub/Sub stream. In this
         case, deduplication of the stream will be strictly best effort.
+      with_attributes:
+        True - output elements will be :class:`~PubsubMessage` objects.
+        False - output elements will be of type ``str`` (message payload only).
+      timestamp_attribute: Name of the message attribute to use as the element
+        timestamp. If None, uses message publishing time as the timestamp.
+        Note that this argument doesn't require with_attributes=True.
+
+        Timestamp values should be in one of two formats:
+
+        - A numerical value representing the number of milliseconds since the
+          Unix epoch.
+        - A string in RFC 3339 format, UTC timezone. Example:
+          ``2015-10-29T23:41:41.123Z``. The sub-second component of the
+          timestamp is optional, and digits beyond the first three (i.e., time
+          units smaller than milliseconds) may be ignored.
     """
-    super(ReadMessagesFromPubSub, self).__init__()
-    self.topic = topic
-    self.subscription = subscription
-    self.id_label = id_label
-
-  def get_windowing(self, unused_inputs):
-    return core.Windowing(window.GlobalWindows())
+    super(ReadFromPubSub, self).__init__()
+    self.with_attributes = with_attributes
+    self._source = _PubSubSource(
+        topic=topic,
+        subscription=subscription,
+        id_label=id_label,
+        with_attributes=with_attributes,
+        timestamp_attribute=timestamp_attribute)
 
-  def expand(self, pcoll):
-    p = (pcoll.pipeline
-         | _ReadFromPubSub(self.topic, self.subscription, self.id_label,
-                           with_attributes=True))
-    return p
+  def expand(self, pvalue):
+    pcoll = pvalue.pipeline | Read(self._source)
+    if self.with_attributes:
+      pcoll = pcoll | Map(PubsubMessage._from_proto)
+      pcoll.element_type = PubsubMessage
+    else:
+      pcoll.element_type = bytes
+    return pcoll
 
 
 class ReadStringsFromPubSub(PTransform):
   """A ``PTransform`` for reading utf-8 string payloads from Cloud Pub/Sub.
 
   Outputs elements of type ``unicode``, decoded from UTF-8.
+
+  This class is deprecated. Use ``ReadFromPubSub`` instead.
   """
 
   def __init__(self, topic=None, subscription=None, id_label=None):
-    """Initializes ``ReadStringsFromPubSub``.
-
-    Args:
-      topic: Cloud Pub/Sub topic in the form "projects/<project>/topics/
-        <topic>". If provided, subscription must be None.
-      subscription: Existing Cloud Pub/Sub subscription to use in the
-        form "projects/<project>/subscriptions/<subscription>". If not
-        specified, a temporary subscription will be created from the specified
-        topic. If provided, topic must be None.
-      id_label: The attribute on incoming Pub/Sub messages to use as a unique
-        record identifier.  When specified, the value of this attribute (which
-        can be any string that uniquely identifies the record) will be used for
-        deduplication of messages.  If not provided, we cannot guarantee
-        that no duplicate data will be delivered on the Pub/Sub stream. In this
-        case, deduplication of the stream will be strictly best effort.
-    """
     super(ReadStringsFromPubSub, self).__init__()
     self.topic = topic
     self.subscription = subscription
     self.id_label = id_label
 
-  def get_windowing(self, unused_inputs):
-    return core.Windowing(window.GlobalWindows())
-
-  def expand(self, pcoll):
-    p = (pcoll.pipeline
-         | _ReadFromPubSub(self.topic, self.subscription, self.id_label,
-                           with_attributes=False)
+  def expand(self, pvalue):
+    p = (pvalue.pipeline
+         | ReadFromPubSub(self.topic, self.subscription, self.id_label,
+                          with_attributes=False)
          | 'DecodeString' >> Map(lambda b: b.decode('utf-8')))
     p.element_type = text_type
     return p
 
 
-class _ReadFromPubSub(PTransform):
-  """A ``PTransform`` for reading from Cloud Pub/Sub."""
-
-  def __init__(self, topic, subscription, id_label, with_attributes):
-    """Initializes ``_ReadFromPubSub``.
-
-    Args:
-      topic: Cloud Pub/Sub topic in the form "projects/<project>/topics/
-        <topic>". If provided, subscription must be None.
-      subscription: Existing Cloud Pub/Sub subscription to use in the
-        form "projects/<project>/subscriptions/<subscription>". If not
-        specified, a temporary subscription will be created from the specified
-        topic. If provided, topic must be None.
-      id_label: The attribute on incoming Pub/Sub messages to use as a unique
-        record identifier.  When specified, the value of this attribute (which
-        can be any string that uniquely identifies the record) will be used for
-        deduplication of messages.  If None, we cannot guarantee
-        that no duplicate data will be delivered on the Pub/Sub stream. In this
-        case, deduplication of the stream will be strictly best effort.
-      with_attributes: False - output elements will be raw payload bytes.
-        True - output will be :class:`~PubsubMessage` objects.
-    """
-    super(_ReadFromPubSub, self).__init__()
-    self.with_attributes = with_attributes
-    self._source = _PubSubSource(
-        topic,
-        subscription=subscription,
-        id_label=id_label,
-        with_attributes=with_attributes)
-
-  def expand(self, pvalue):
-    pcoll = pvalue.pipeline | Read(self._source)
-    if self.with_attributes:
-      pcoll = pcoll | Map(PubsubMessage._from_proto)
-      pcoll.element_type = PubsubMessage
-    else:
-      pcoll.element_type = bytes
-    return pcoll
-
-
 class WriteStringsToPubSub(PTransform):
   """A ``PTransform`` for writing utf-8 string payloads to Cloud Pub/Sub."""
 
@@ -280,25 +226,15 @@ def parse_subscription(full_subscription):
 class _PubSubSource(dataflow_io.NativeSource):
   """Source for the payload of a message as bytes from a Cloud Pub/Sub topic.
 
+  This ``NativeSource`` is overridden by a native Pubsub implementation.
+
   Attributes:
-    topic: Cloud Pub/Sub topic in the form "projects/<project>/topics/<topic>".
-      If provided, subscription must be None.
-    subscription: Existing Cloud Pub/Sub subscription to use in the
-      form "projects/<project>/subscriptions/<subscription>". If not specified,
-      a temporary subscription will be created from the specified topic. If
-      provided, topic must be None.
-    id_label: The attribute on incoming Pub/Sub messages to use as a unique
-      record identifier.  When specified, the value of this attribute (which can
-      be any string that uniquely identifies the record) will be used for
-      deduplication of messages.  If not provided, Dataflow cannot guarantee
-      that no duplicate data will be delivered on the Pub/Sub stream. In this
-      case, deduplication of the stream will be strictly best effort.
     with_attributes: If False, will fetch just message payload. Otherwise,
       fetches ``PubsubMessage`` protobufs.
   """
 
   def __init__(self, topic=None, subscription=None, id_label=None,
-               with_attributes=False):
+               with_attributes=False, timestamp_attribute=None):
     # We are using this coder explicitly for portability reasons of PubsubIO
     # across implementations in languages.
     self.coder = coders.BytesCoder()
@@ -308,6 +244,7 @@ class _PubSubSource(dataflow_io.NativeSource):
     self.subscription_name = None
     self.id_label = id_label
     self.with_attributes = with_attributes
+    self.timestamp_attribute = timestamp_attribute
 
     # Perform some validation on the topic and subscription.
     if not (topic or subscription):
@@ -337,8 +274,7 @@ class _PubSubSource(dataflow_io.NativeSource):
                             label='Pubsub Subscription').drop_if_none()}
 
   def reader(self):
-    raise NotImplementedError(
-        'PubSubPayloadSource is not supported in local execution.')
+    raise NotImplementedError
 
   def is_bounded(self):
     return False
diff --git a/sdks/python/apache_beam/io/gcp/pubsub_test.py b/sdks/python/apache_beam/io/gcp/pubsub_test.py
index b5afff8..f987947 100644
--- a/sdks/python/apache_beam/io/gcp/pubsub_test.py
+++ b/sdks/python/apache_beam/io/gcp/pubsub_test.py
@@ -27,22 +27,24 @@ import mock
 
 import apache_beam as beam
 from apache_beam.io.gcp.pubsub import PubsubMessage
-from apache_beam.io.gcp.pubsub import ReadMessagesFromPubSub
+from apache_beam.io.gcp.pubsub import ReadFromPubSub
 from apache_beam.io.gcp.pubsub import ReadStringsFromPubSub
 from apache_beam.io.gcp.pubsub import WriteStringsToPubSub
 from apache_beam.io.gcp.pubsub import _PubSubPayloadSink
 from apache_beam.io.gcp.pubsub import _PubSubSource
-from apache_beam.io.gcp.pubsub import _ReadFromPubSub
 from apache_beam.options.pipeline_options import StandardOptions
 from apache_beam.runners.direct import transform_evaluator
 from apache_beam.runners.direct.direct_runner import _DirectReadFromPubSub
 from apache_beam.runners.direct.direct_runner import _get_transform_overrides
 from apache_beam.runners.direct.transform_evaluator import _PubSubReadEvaluator
 from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.testing.util import TestWindowedValue
 from apache_beam.testing.util import assert_that
 from apache_beam.testing.util import equal_to
+from apache_beam.transforms import window
 from apache_beam.transforms.display import DisplayData
 from apache_beam.transforms.display_test import DisplayDataItemMatcher
+from apache_beam.utils import timestamp
 
 # Protect against environments where the PubSub library is not available.
 # pylint: disable=wrong-import-order, wrong-import-position
@@ -60,8 +62,9 @@ class TestReadFromPubSubOverride(unittest.TestCase):
     p = TestPipeline()
     p.options.view_as(StandardOptions).streaming = True
     pcoll = (p
-             | _ReadFromPubSub('projects/fakeprj/topics/a_topic',
-                               None, 'a_label', with_attributes=False)
+             | ReadFromPubSub('projects/fakeprj/topics/a_topic',
+                              None, 'a_label', with_attributes=False,
+                              timestamp_attribute=None)
              | beam.Map(lambda x: x))
     self.assertEqual(str, pcoll.element_type)
 
@@ -69,7 +72,7 @@ class TestReadFromPubSubOverride(unittest.TestCase):
     overrides = _get_transform_overrides(p.options)
     p.replace_all(overrides)
 
-    # Note that the direct output of ReadMessagesFromPubSub will be replaced
+    # Note that the direct output of ReadFromPubSub will be replaced
     # by a PTransformOverride, so we use a no-op Map.
     read_transform = pcoll.producer.inputs[0].producer.transform
 
@@ -82,9 +85,9 @@ class TestReadFromPubSubOverride(unittest.TestCase):
     p = TestPipeline()
     p.options.view_as(StandardOptions).streaming = True
     pcoll = (p
-             | _ReadFromPubSub(
+             | ReadFromPubSub(
                  None, 'projects/fakeprj/subscriptions/a_subscription',
-                 'a_label', with_attributes=False)
+                 'a_label', with_attributes=False, timestamp_attribute=None)
              | beam.Map(lambda x: x))
     self.assertEqual(str, pcoll.element_type)
 
@@ -92,7 +95,7 @@ class TestReadFromPubSubOverride(unittest.TestCase):
     overrides = _get_transform_overrides(p.options)
     p.replace_all(overrides)
 
-    # Note that the direct output of ReadMessagesFromPubSub will be replaced
+    # Note that the direct output of ReadFromPubSub will be replaced
     # by a PTransformOverride, so we use a no-op Map.
     read_transform = pcoll.producer.inputs[0].producer.transform
 
@@ -104,20 +107,22 @@ class TestReadFromPubSubOverride(unittest.TestCase):
   def test_expand_with_no_topic_or_subscription(self):
     with self.assertRaisesRegexp(
         ValueError, "Either a topic or subscription must be provided."):
-      _ReadFromPubSub(None, None, 'a_label', with_attributes=False)
+      ReadFromPubSub(None, None, 'a_label', with_attributes=False,
+                     timestamp_attribute=None)
 
   def test_expand_with_both_topic_and_subscription(self):
     with self.assertRaisesRegexp(
         ValueError, "Only one of topic or subscription should be provided."):
-      _ReadFromPubSub('a_topic', 'a_subscription', 'a_label',
-                      with_attributes=False)
+      ReadFromPubSub('a_topic', 'a_subscription', 'a_label',
+                     with_attributes=False, timestamp_attribute=None)
 
-  def test_expand_with_attributes(self):
+  def test_expand_with_other_options(self):
     p = TestPipeline()
     p.options.view_as(StandardOptions).streaming = True
     pcoll = (p
-             | _ReadFromPubSub('projects/fakeprj/topics/a_topic',
-                               None, 'a_label', with_attributes=True)
+             | ReadFromPubSub('projects/fakeprj/topics/a_topic',
+                              None, 'a_label', with_attributes=True,
+                              timestamp_attribute='time')
              | beam.Map(lambda x: x))
     self.assertEqual(PubsubMessage, pcoll.element_type)
 
@@ -125,14 +130,14 @@ class TestReadFromPubSubOverride(unittest.TestCase):
     overrides = _get_transform_overrides(p.options)
     p.replace_all(overrides)
 
-    # Note that the direct output of ReadMessagesFromPubSub will be replaced
+    # Note that the direct output of ReadFromPubSub will be replaced
     # by a PTransformOverride, so we use a no-op Map.
     read_transform = pcoll.producer.inputs[0].producer.transform
 
     # Ensure that the properties passed through correctly
     source = read_transform._source
-    self.assertEqual('a_topic', source.topic_name)
-    self.assertEqual('a_label', source.id_label)
+    self.assertTrue(source.with_attributes)
+    self.assertEqual('time', source.timestamp_attribute)
 
 
 @unittest.skipIf(pubsub is None, 'GCP dependencies are not installed')
@@ -276,6 +281,13 @@ class FakePubsubClient(object):
     return FakePubsubTopic(name, self)
 
 
+def create_client_message(payload, message_id, attributes, publish_time):
+  """Returns a message as it would be returned from Cloud Pub/Sub client."""
+  msg = pubsub.message.Message(payload, message_id, attributes)
+  msg._service_timestamp = publish_time
+  return msg
+
+
 @unittest.skipIf(pubsub is None, 'GCP dependencies are not installed')
 class TestReadFromPubSub(unittest.TestCase):
 
@@ -283,9 +295,13 @@ class TestReadFromPubSub(unittest.TestCase):
   def test_read_messages_success(self, mock_pubsub):
     payload = 'payload'
     message_id = 'message_id'
-    attributes = {'attribute': 'value'}
-    data = [pubsub.message.Message(payload, message_id, attributes)]
-    expected_data = [PubsubMessage(payload, message_id, attributes, None)]
+    publish_time = '2018-03-12T13:37:01.234567Z'
+    attributes = {'key': 'value'}
+    data = [create_client_message(
+        payload, message_id, attributes, publish_time)]
+    expected_data = [TestWindowedValue(PubsubMessage(payload, attributes),
+                                       timestamp.Timestamp(1520861821.234567),
+                                       [window.GlobalWindow()])]
 
     mock_pubsub.Client = functools.partial(FakePubsubClient, data)
     mock_pubsub.subscription.AutoAck = FakeAutoAck
@@ -293,16 +309,18 @@ class TestReadFromPubSub(unittest.TestCase):
     p = TestPipeline()
     p.options.view_as(StandardOptions).streaming = True
     pcoll = (p
-             | ReadMessagesFromPubSub('projects/fakeprj/topics/a_topic',
-                                      None, 'a_label'))
-    assert_that(pcoll, equal_to(expected_data))
+             | ReadFromPubSub('projects/fakeprj/topics/a_topic',
+                              None, 'a_label', with_attributes=True))
+    assert_that(pcoll, equal_to(expected_data), reify_windows=True)
     p.run()
 
   @mock.patch('google.cloud.pubsub')
   def test_read_strings_success(self, mock_pubsub):
     payload = u'🤷 ¯\\_(ツ)_/¯'
     payload_encoded = payload.encode('utf-8')
-    data = [pubsub.message.Message(payload_encoded, None, None)]
+    publish_time = '2018-03-12T13:37:01.234567Z'
+    data = [create_client_message(
+        payload_encoded, None, None, publish_time)]
     expected_data = [payload]
 
     mock_pubsub.Client = functools.partial(FakePubsubClient, data)
@@ -316,6 +334,121 @@ class TestReadFromPubSub(unittest.TestCase):
     assert_that(pcoll, equal_to(expected_data))
     p.run()
 
+  @mock.patch('google.cloud.pubsub')
+  def test_read_payload_success(self, mock_pubsub):
+    payload_encoded = u'🤷 ¯\\_(ツ)_/¯'.encode('utf-8')
+    publish_time = '2018-03-12T13:37:01.234567Z'
+    data = [create_client_message(
+        payload_encoded, None, None, publish_time)]
+    expected_data = [payload_encoded]
+
+    mock_pubsub.Client = functools.partial(FakePubsubClient, data)
+    mock_pubsub.subscription.AutoAck = FakeAutoAck
+
+    p = TestPipeline()
+    p.options.view_as(StandardOptions).streaming = True
+    pcoll = (p
+             | ReadFromPubSub('projects/fakeprj/topics/a_topic',
+                              None, 'a_label'))
+    assert_that(pcoll, equal_to(expected_data))
+    p.run()
+
+  @mock.patch('google.cloud.pubsub')
+  def test_read_messages_timestamp_attribute_milli_success(self, mock_pubsub):
+    payload = 'payload'
+    message_id = 'message_id'
+    attributes = {'time': '1337'}
+    publish_time = '2018-03-12T13:37:01.234567Z'
+    data = [create_client_message(
+        payload, message_id, attributes, publish_time)]
+    expected_data = [
+        TestWindowedValue(
+            PubsubMessage(payload, attributes),
+            timestamp.Timestamp(micros=int(attributes['time']) * 1000),
+            [window.GlobalWindow()]),
+    ]
+
+    mock_pubsub.Client = functools.partial(FakePubsubClient, data)
+    mock_pubsub.subscription.AutoAck = FakeAutoAck
+
+    p = TestPipeline()
+    p.options.view_as(StandardOptions).streaming = True
+    pcoll = (p
+             | ReadFromPubSub(
+                 'projects/fakeprj/topics/a_topic', None, 'a_label',
+                 with_attributes=True, timestamp_attribute='time'))
+    assert_that(pcoll, equal_to(expected_data), reify_windows=True)
+    p.run()
+
+  @mock.patch('google.cloud.pubsub')
+  def test_read_messages_timestamp_attribute_rfc3339_success(self, mock_pubsub):
+    payload = 'payload'
+    message_id = 'message_id'
+    attributes = {'time': '2018-03-12T13:37:01.234567Z'}
+    publish_time = '2018-03-12T13:37:01.234567Z'
+    data = [create_client_message(
+        payload, message_id, attributes, publish_time)]
+    expected_data = [
+        TestWindowedValue(
+            PubsubMessage(payload, attributes),
+            timestamp.Timestamp.from_rfc3339(attributes['time']),
+            [window.GlobalWindow()]),
+    ]
+
+    mock_pubsub.Client = functools.partial(FakePubsubClient, data)
+    mock_pubsub.subscription.AutoAck = FakeAutoAck
+
+    p = TestPipeline()
+    p.options.view_as(StandardOptions).streaming = True
+    pcoll = (p
+             | ReadFromPubSub(
+                 'projects/fakeprj/topics/a_topic', None, 'a_label',
+                 with_attributes=True, timestamp_attribute='time'))
+    assert_that(pcoll, equal_to(expected_data), reify_windows=True)
+    p.run()
+
+  @mock.patch('google.cloud.pubsub')
+  def test_read_messages_timestamp_attribute_fail_missing(self, mock_pubsub):
+    payload = 'payload'
+    message_id = 'message_id'
+    attributes = {'time': '1337'}
+    publish_time = '2018-03-12T13:37:01.234567Z'
+    data = [create_client_message(
+        payload, message_id, attributes, publish_time)]
+
+    mock_pubsub.Client = functools.partial(FakePubsubClient, data)
+    mock_pubsub.subscription.AutoAck = FakeAutoAck
+
+    p = TestPipeline()
+    p.options.view_as(StandardOptions).streaming = True
+    _ = (p
+         | ReadFromPubSub(
+             'projects/fakeprj/topics/a_topic', None, 'a_label',
+             with_attributes=True, timestamp_attribute='nonexistent'))
+    with self.assertRaisesRegexp(KeyError, r'Timestamp.*nonexistent'):
+      p.run()
+
+  @mock.patch('google.cloud.pubsub')
+  def test_read_messages_timestamp_attribute_fail_parse(self, mock_pubsub):
+    payload = 'payload'
+    message_id = 'message_id'
+    attributes = {'time': '1337 unparseable'}
+    publish_time = '2018-03-12T13:37:01.234567Z'
+    data = [create_client_message(
+        payload, message_id, attributes, publish_time)]
+
+    mock_pubsub.Client = functools.partial(FakePubsubClient, data)
+    mock_pubsub.subscription.AutoAck = FakeAutoAck
+
+    p = TestPipeline()
+    p.options.view_as(StandardOptions).streaming = True
+    _ = (p
+         | ReadFromPubSub(
+             'projects/fakeprj/topics/a_topic', None, 'a_label',
+             with_attributes=True, timestamp_attribute='time'))
+    with self.assertRaisesRegexp(ValueError, r'parse'):
+      p.run()
+
 
 if __name__ == '__main__':
   logging.getLogger().setLevel(logging.INFO)
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
index e53a12e..d9d76f8 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
@@ -777,7 +777,7 @@ class DataflowRunner(PipelineRunner):
       standard_options = (
           transform_node.inputs[0].pipeline.options.view_as(StandardOptions))
       if not standard_options.streaming:
-        raise ValueError('PubSubPayloadSource is currently available for use '
+        raise ValueError('Cloud Pub/Sub is currently available for use '
                          'only in streaming pipelines.')
       # Only one of topic or subscription should be set.
       if transform.source.full_subscription:
@@ -793,6 +793,9 @@ class DataflowRunner(PipelineRunner):
         # Setting this property signals Dataflow runner to return full
         # PubsubMessages instead of just the payload.
         step.add_property(PropertyNames.PUBSUB_SERIALIZED_ATTRIBUTES_FN, '')
+      if transform.source.timestamp_attribute is not None:
+        step.add_property(PropertyNames.PUBSUB_TIMESTAMP_ATTRIBUTE,
+                          transform.source.timestamp_attribute)
     else:
       raise ValueError(
           'Source %r has unexpected format %s.' % (
diff --git a/sdks/python/apache_beam/runners/dataflow/internal/names.py b/sdks/python/apache_beam/runners/dataflow/internal/names.py
index 8e48236..b5f8051 100644
--- a/sdks/python/apache_beam/runners/dataflow/internal/names.py
+++ b/sdks/python/apache_beam/runners/dataflow/internal/names.py
@@ -74,10 +74,11 @@ class PropertyNames(object):
   OUTPUT_INFO = 'output_info'
   OUTPUT_NAME = 'output_name'
   PARALLEL_INPUT = 'parallel_input'
-  PUBSUB_TOPIC = 'pubsub_topic'
-  PUBSUB_SUBSCRIPTION = 'pubsub_subscription'
   PUBSUB_ID_LABEL = 'pubsub_id_label'
   PUBSUB_SERIALIZED_ATTRIBUTES_FN = 'pubsub_serialized_attributes_fn'
+  PUBSUB_SUBSCRIPTION = 'pubsub_subscription'
+  PUBSUB_TIMESTAMP_ATTRIBUTE = 'pubsub_timestamp_label'
+  PUBSUB_TOPIC = 'pubsub_topic'
   SERIALIZED_FN = 'serialized_fn'
   SHARD_NAME_TEMPLATE = 'shard_template'
   SOURCE_STEP_INPUT = 'custom_source_step_input'
diff --git a/sdks/python/apache_beam/runners/direct/direct_runner.py b/sdks/python/apache_beam/runners/direct/direct_runner.py
index 062509f..510a4e6 100644
--- a/sdks/python/apache_beam/runners/direct/direct_runner.py
+++ b/sdks/python/apache_beam/runners/direct/direct_runner.py
@@ -268,7 +268,7 @@ def _get_pubsub_transform_overrides(pipeline_options):
   class ReadFromPubSubOverride(PTransformOverride):
     def matches(self, applied_ptransform):
       return isinstance(applied_ptransform.transform,
-                        beam_pubsub._ReadFromPubSub)
+                        beam_pubsub.ReadFromPubSub)
 
     def get_replacement_transform(self, transform):
       if not pipeline_options.view_as(StandardOptions).streaming:
diff --git a/sdks/python/apache_beam/runners/direct/transform_evaluator.py b/sdks/python/apache_beam/runners/direct/transform_evaluator.py
index ad29007..eb1ccd5 100644
--- a/sdks/python/apache_beam/runners/direct/transform_evaluator.py
+++ b/sdks/python/apache_beam/runners/direct/transform_evaluator.py
@@ -417,7 +417,7 @@ class _PubSubReadEvaluator(_TransformEvaluator):
   def process_element(self, element):
     pass
 
-  def _read_from_pubsub(self):
+  def _read_from_pubsub(self, timestamp_attribute):
     from apache_beam.io.gcp.pubsub import PubsubMessage
     from google.cloud import pubsub
     # Because of the AutoAck, we are not able to reread messages if this
@@ -428,24 +428,40 @@ class _PubSubReadEvaluator(_TransformEvaluator):
         self._subscription, return_immediately=True,
         max_messages=10) as results:
       def _get_element(message):
-        if self.source.with_attributes:
-          return PubsubMessage._from_message(message)
+        parsed_message = PubsubMessage._from_message(message)
+        if timestamp_attribute:
+          try:
+            rfc3339_or_milli = parsed_message.attributes[timestamp_attribute]
+          except KeyError as e:
+            raise KeyError('Timestamp attribute not found: %s' % e)
+          try:
+            timestamp = Timestamp.from_rfc3339(rfc3339_or_milli)
+          except ValueError:
+            try:
+              timestamp = Timestamp(micros=int(rfc3339_or_milli) * 1000)
+            except ValueError as e:
+              raise ValueError('Bad timestamp value: %s' % e)
         else:
-          return message.data
+          timestamp = Timestamp.from_rfc3339(message.service_timestamp)
+
+        return timestamp, parsed_message
 
       return [_get_element(message)
               for unused_ack_id, message in results.items()]
 
   def finish_bundle(self):
-    data = self._read_from_pubsub()
+    data = self._read_from_pubsub(self.source.timestamp_attribute)
     if data:
       output_pcollection = list(self._outputs)[0]
       bundle = self._evaluation_context.create_bundle(output_pcollection)
-      # TODO(ccy): we currently do not use the PubSub message timestamp or
-      # respect the PubSub source's id_label field.
-      now = Timestamp.of(time.time())
-      for message_data in data:
-        bundle.output(GlobalWindows.windowed_value(message_data, timestamp=now))
+      # TODO(ccy): Respect the PubSub source's id_label field.
+      for timestamp, message in data:
+        if self.source.with_attributes:
+          element = message
+        else:
+          element = message.payload
+        bundle.output(
+            GlobalWindows.windowed_value(element, timestamp=timestamp))
       bundles = [bundle]
     else:
       bundles = []
@@ -456,6 +472,7 @@ class _PubSubReadEvaluator(_TransformEvaluator):
     unprocessed_bundle = self._evaluation_context.create_bundle(
         input_pvalue)
 
+    # TODO(udim): Correct value for watermark hold.
     return TransformResult(self, bundles, [unprocessed_bundle], None,
                            {None: Timestamp.of(time.time())})
 
diff --git a/sdks/python/apache_beam/utils/timestamp.py b/sdks/python/apache_beam/utils/timestamp.py
index c437d5a..7c41c30 100644
--- a/sdks/python/apache_beam/utils/timestamp.py
+++ b/sdks/python/apache_beam/utils/timestamp.py
@@ -24,6 +24,9 @@ from __future__ import absolute_import
 from __future__ import division
 
 import datetime
+import re
+
+import pytz
 
 
 class Timestamp(object):
@@ -38,6 +41,12 @@ class Timestamp(object):
   """
 
   def __init__(self, seconds=0, micros=0):
+    if not isinstance(seconds, (int, float, long)):
+      raise TypeError('Cannot interpret %s %s as seconds.' % (
+          seconds, type(seconds)))
+    if not isinstance(micros, (int, float, long)):
+      raise TypeError('Cannot interpret %s %s as micros.' % (
+          micros, type(micros)))
     self.micros = int(seconds * 1000000) + int(micros)
 
   @staticmethod
@@ -53,12 +62,52 @@ class Timestamp(object):
       Corresponding Timestamp object.
     """
 
-    if isinstance(seconds, Duration):
-      raise TypeError('Can\'t interpret %s as Timestamp.' % seconds)
+    if not isinstance(seconds, (int, float, Timestamp)):
+      raise TypeError('Cannot interpret %s %s as Timestamp.' % (
+          seconds, type(seconds)))
     if isinstance(seconds, Timestamp):
       return seconds
     return Timestamp(seconds)
 
+  RFC_3339_RE = re.compile(  # \Z (unlike $) rejects a trailing '\n'.
+      r'^(\d{4})-(\d{2})-(\d{2})T(\d{2}):(\d{2}):(\d{2})(?:\.(\d+))?Z\Z')
+
+  @staticmethod
+  def _epoch_datetime_utc():
+    return datetime.datetime(1970, 1, 1, tzinfo=pytz.utc)  # Aware Unix epoch.
+
+  @classmethod
+  def from_utc_datetime(cls, dt):
+    """Create a ``Timestamp`` instance from a ``datetime.datetime`` object.
+
+    Args:
+      dt: ``datetime`` with ``tzinfo=pytz.utc``; others raise ValueError.
+    """
+    if dt.tzinfo != pytz.utc:
+      raise ValueError('dt not in UTC: %s' % dt)
+    d = dt - cls._epoch_datetime_utc()  # Exact ints; total_seconds() is float.
+    return Timestamp(d.days * 86400 + d.seconds, d.microseconds)
+
+  @classmethod
+  def from_rfc3339(cls, rfc3339):
+    """Create a ``Timestamp`` instance from an RFC 3339 compliant string.
+
+    Args:
+      rfc3339: String in RFC 3339 form. Only the UTC designator ``Z`` is
+        supported (no numeric offsets). Fractional seconds beyond microsecond
+        precision are truncated. Raises ValueError if the string won't parse.
+    """
+    match = cls.RFC_3339_RE.match(rfc3339)
+    if match is None:
+      raise ValueError('Could not parse RFC 3339 string: %s' % rfc3339)
+    year, month, day, hour, minute, second, fraction = match.groups()
+    # ``fraction`` holds the digits after '.', so scale it to exactly six
+    # digits (microseconds): '.5' -> 500000 and '.123456789' -> 123456.
+    micros = int((fraction or '').ljust(6, '0')[:6])
+    dt = datetime.datetime(int(year), int(month), int(day), int(hour),
+                           int(minute), int(second), micros, pytz.utc)
+    return cls.from_utc_datetime(dt)
+
   def predecessor(self):
     """Returns the largest timestamp smaller than self."""
     return Timestamp(micros=self.micros - 1)
@@ -76,12 +125,12 @@ class Timestamp(object):
     return 'Timestamp(%s%d)' % (sign, int_part)
 
   def to_utc_datetime(self):
-    epoch = datetime.datetime.utcfromtimestamp(0)
     # We can't easily construct a datetime object from microseconds, so we
     # create one at the epoch and add an appropriate timedelta interval.
-    return epoch + datetime.timedelta(microseconds=self.micros)
+    return self._epoch_datetime_utc().replace(tzinfo=None) + datetime.timedelta(
+        microseconds=self.micros)
 
-  def isoformat(self):
+  def to_rfc3339(self):
     # Append 'Z' for UTC timezone.
     return self.to_utc_datetime().isoformat() + 'Z'
 
@@ -150,7 +199,7 @@ class Duration(object):
     """
 
     if isinstance(seconds, Timestamp):
-      raise TypeError('Can\'t interpret %s as Duration.' % seconds)
+      raise TypeError('Cannot interpret %s as Duration.' % seconds)
     if isinstance(seconds, Duration):
       return seconds
     return Duration(seconds)
diff --git a/sdks/python/apache_beam/utils/timestamp_test.py b/sdks/python/apache_beam/utils/timestamp_test.py
index 3322936..8296dc6 100644
--- a/sdks/python/apache_beam/utils/timestamp_test.py
+++ b/sdks/python/apache_beam/utils/timestamp_test.py
@@ -19,8 +19,11 @@
 
 from __future__ import absolute_import
 
+import datetime
 import unittest
 
+import pytz
+
 from apache_beam.utils.timestamp import Duration
 from apache_beam.utils.timestamp import Timestamp
 
@@ -43,13 +46,39 @@ class TimestampTest(unittest.TestCase):
     self.assertEqual(Timestamp(10000000) % Duration(0.000005), 0)
 
   def test_utc_timestamp(self):
-    self.assertEqual(Timestamp(10000000).isoformat(),
+    self.assertEqual(Timestamp(10000000).to_rfc3339(),
                      '1970-04-26T17:46:40Z')
-    self.assertEqual(Timestamp(10000000.000001).isoformat(),
+    self.assertEqual(Timestamp(10000000.000001).to_rfc3339(),
                      '1970-04-26T17:46:40.000001Z')
-    self.assertEqual(Timestamp(1458343379.123456).isoformat(),
+    self.assertEqual(Timestamp(1458343379.123456).to_rfc3339(),
                      '2016-03-18T23:22:59.123456Z')
 
+  def test_from_rfc3339(self):
+    cases = (
+        (10000000, '1970-04-26T17:46:40Z'),
+        (10000000.000001, '1970-04-26T17:46:40.000001Z'),
+        (1458343379.123456, '2016-03-18T23:22:59.123456Z'),
+    )
+    for seconds, text in cases:
+      parsed = Timestamp.from_rfc3339(text)
+      # Parsing must agree with the float constructor and round-trip.
+      self.assertEqual(Timestamp(seconds), parsed)
+      self.assertEqual(text, parsed.to_rfc3339())
+
+  def test_from_rfc3339_failure(self):
+    # Neither free text nor a valid timestamp with trailing junk may parse.
+    for bad in ('not rfc3339', '2016-03-18T23:22:59.123456Z unparseable'):
+      with self.assertRaisesRegexp(ValueError, 'parse'):
+        Timestamp.from_rfc3339(bad)
+
+  def test_from_utc_datetime(self):
+    epoch = datetime.datetime(1970, 1, 1, tzinfo=pytz.utc)
+    self.assertEqual(Timestamp.from_utc_datetime(epoch), Timestamp(0))
+    # A naive datetime (no tzinfo) must be rejected, not assumed to be UTC.
+    naive = epoch.replace(tzinfo=None)
+    with self.assertRaisesRegexp(ValueError, r'UTC'):
+      Timestamp.from_utc_datetime(naive)
+
   def test_arithmetic(self):
     # Supported operations.
     self.assertEqual(Timestamp(123) + 456, 579)
diff --git a/sdks/python/setup.py b/sdks/python/setup.py
index ffc4df7..b7f400e 100644
--- a/sdks/python/setup.py
+++ b/sdks/python/setup.py
@@ -106,6 +106,7 @@ REQUIRED_PACKAGES = [
     'oauth2client>=2.0.1,<5',
     # grpcio 1.8.1 and above requires protobuf 3.5.0.post1.
     'protobuf>=3.5.0.post1,<4',
+    'pytz>=2018.3',
     'pyyaml>=3.12,<4.0.0',
     'pyvcf>=0.6.8,<0.7.0',
     'six>=1.9,<1.12',

-- 
To stop receiving notification emails like this one, please contact
chamikara@apache.org.