You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ch...@apache.org on 2018/03/27 22:43:27 UTC
[beam] branch master updated: [BEAM-3744] Expand Pubsub read API
for Python. (#4901)
This is an automated email from the ASF dual-hosted git repository.
chamikara pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 8d854d4 [BEAM-3744] Expand Pubsub read API for Python. (#4901)
8d854d4 is described below
commit 8d854d4ce0365a8e201b388618d7732f000c65b9
Author: Udi Meiri (Ehud) <ud...@users.noreply.github.com>
AuthorDate: Tue Mar 27 15:43:23 2018 -0700
[BEAM-3744] Expand Pubsub read API for Python. (#4901)
* Expand Pubsub read API for Python.
New features:
- Timestamp attributes (user attribute containing message timestamp)
- In either milliseconds from epoch or RFC 3339.
- Label IDs (user attribute used as a unique message ID)
- Raw payloads
- Source API can return either:
- Payloads (raw or decoded UTF-8)
- PubSubMessage type, encapsulating message payload and attributes
* Address review comments.
* Move to a single read PTransform: ReadFromPubSub
ReadStringsFromPubSub will be removed in the future.
---
.../apache_beam/examples/windowed_wordcount.py | 4 +-
sdks/python/apache_beam/io/gcp/pubsub.py | 202 +++++++--------------
sdks/python/apache_beam/io/gcp/pubsub_test.py | 181 +++++++++++++++---
.../runners/dataflow/dataflow_runner.py | 5 +-
.../apache_beam/runners/dataflow/internal/names.py | 5 +-
.../apache_beam/runners/direct/direct_runner.py | 2 +-
.../runners/direct/transform_evaluator.py | 37 +++-
sdks/python/apache_beam/utils/timestamp.py | 61 ++++++-
sdks/python/apache_beam/utils/timestamp_test.py | 35 +++-
sdks/python/setup.py | 1 +
10 files changed, 351 insertions(+), 182 deletions(-)
diff --git a/sdks/python/apache_beam/examples/windowed_wordcount.py b/sdks/python/apache_beam/examples/windowed_wordcount.py
index 3838408..987b660 100644
--- a/sdks/python/apache_beam/examples/windowed_wordcount.py
+++ b/sdks/python/apache_beam/examples/windowed_wordcount.py
@@ -67,10 +67,10 @@ def run(argv=None):
with beam.Pipeline(argv=pipeline_args) as p:
- # Read the text from PubSub messages
+ # Read the text from PubSub messages.
lines = p | beam.io.ReadStringsFromPubSub(known_args.input_topic)
- # Capitalize the characters in each line.
+ # Get the number of appearances of a word.
def count_ones(word_ones):
(word, ones) = word_ones
return (word, sum(ones))
diff --git a/sdks/python/apache_beam/io/gcp/pubsub.py b/sdks/python/apache_beam/io/gcp/pubsub.py
index d5afee9..f5ca17e 100644
--- a/sdks/python/apache_beam/io/gcp/pubsub.py
+++ b/sdks/python/apache_beam/io/gcp/pubsub.py
@@ -20,6 +20,14 @@ Cloud Pub/Sub sources and sinks are currently supported only in streaming
pipelines, during remote execution.
This API is currently under development and is subject to change.
+
+Description of common arguments used in this module:
+ topic: Cloud Pub/Sub topic in the form "projects/<project>/topics/<topic>".
+ If provided, subscription must be None.
+ subscription: Existing Cloud Pub/Sub subscription to use in the
+ form "projects/<project>/subscriptions/<subscription>". If not specified,
+ a temporary subscription will be created from the specified topic. If
+ provided, topic must be None.
"""
from __future__ import absolute_import
@@ -34,8 +42,6 @@ from apache_beam.io.iobase import Write
from apache_beam.runners.dataflow.native_io import iobase as dataflow_io
from apache_beam.transforms import Map
from apache_beam.transforms import PTransform
-from apache_beam.transforms import core
-from apache_beam.transforms import window
from apache_beam.transforms.display import DisplayDataItem
try:
@@ -44,7 +50,7 @@ except ImportError:
pubsub_pb2 = None
-__all__ = ['PubsubMessage', 'ReadMessagesFromPubSub', 'ReadStringsFromPubSub',
+__all__ = ['PubsubMessage', 'ReadFromPubSub', 'ReadStringsFromPubSub',
'WriteStringsToPubSub']
@@ -52,38 +58,27 @@ class PubsubMessage(object):
"""Represents a message from Cloud Pub/Sub.
This interface is experimental. No backwards compatibility guarantees.
+
+ Attributes:
+ payload: (str) Message payload, as a byte string.
+ attributes: (dict) Map of string to string.
"""
- def __init__(self, payload, message_id, attributes, publish_time):
+ def __init__(self, payload, attributes):
"""Constructs a message.
- This interface is experimental. No backwards compatibility guarantees.
-
- See also:
- https://cloud.google.com/pubsub/docs/reference/rest/v1/PubsubMessage
-
- Attributes:
- payload: (str) Message payload, as a byte string.
- message_id: (str) Server assigned message ID. Unique within a topic.
- attributes: (dict) Map of string to string.
- publish_time: (str) Server assigned timestamp of when the message was
- published.
+ Beam users should not directly construct ``PubsubMessage`` objects.
"""
self.payload = payload
- self.message_id = message_id
self.attributes = attributes
- self.publish_time = publish_time
def __eq__(self, other):
return isinstance(other, PubsubMessage) and (
self.payload == other.payload and
- self.message_id == other.message_id and
- self.attributes == other.attributes and
- self.publish_time == other.publish_time)
+ self.attributes == other.attributes)
def __repr__(self):
- return 'PubsubMessage(%s, %s, %s, %s)' % (
- self.payload, self.message_id, self.attributes, self.publish_time)
+ return 'PubsubMessage(%s, %s)' % (self.payload, self.attributes)
@staticmethod
def _from_proto(proto_msg):
@@ -95,8 +90,7 @@ class PubsubMessage(object):
msg.ParseFromString(proto_msg)
# Convert ScalarMapContainer to dict.
attributes = dict((key, msg.attributes[key]) for key in msg.attributes)
- return PubsubMessage(msg.data, msg.message_id,
- attributes, msg.publish_time)
+ return PubsubMessage(msg.data, attributes)
@staticmethod
def _from_message(msg):
@@ -106,130 +100,82 @@ class PubsubMessage(object):
"""
# Convert ScalarMapContainer to dict.
attributes = dict((key, msg.attributes[key]) for key in msg.attributes)
- return PubsubMessage(msg.data, msg.message_id,
- attributes, msg.service_timestamp)
-
+ return PubsubMessage(msg.data, attributes)
-class ReadMessagesFromPubSub(PTransform):
- """A ``PTransform`` for reading from Cloud Pub/Sub.
-
- This interface is experimental. No backwards compatibility guarantees.
- Outputs elements of type :class:`~PubsubMessage`.
- """
+class ReadFromPubSub(PTransform):
+ """A ``PTransform`` for reading from Cloud Pub/Sub."""
+ # Implementation note: This ``PTransform`` is overridden by the DirectRunner.
- def __init__(self, topic=None, subscription=None, id_label=None):
- """Initializes ``ReadMessagesFromPubSub``.
+ def __init__(self, topic=None, subscription=None, id_label=None,
+ with_attributes=False, timestamp_attribute=None):
+ """Initializes ``ReadFromPubSub``.
Args:
- topic: Cloud Pub/Sub topic in the form "projects/<project>/topics/
- <topic>". If provided, subscription must be None.
- subscription: Existing Cloud Pub/Sub subscription to use in the
- form "projects/<project>/subscriptions/<subscription>". If not
- specified, a temporary subscription will be created from the specified
- topic. If provided, topic must be None.
id_label: The attribute on incoming Pub/Sub messages to use as a unique
- record identifier. When specified, the value of this attribute (which
+ record identifier. When specified, the value of this attribute (which
can be any string that uniquely identifies the record) will be used for
- deduplication of messages. If not provided, we cannot guarantee
+ deduplication of messages. If not provided, we cannot guarantee
that no duplicate data will be delivered on the Pub/Sub stream. In this
case, deduplication of the stream will be strictly best effort.
+ with_attributes:
+ True - output elements will be :class:`~PubsubMessage` objects.
+ False - output elements will be of type ``str`` (message payload only).
+ timestamp_attribute: Name of the message attribute whose value is used as
+ the element timestamp. If None, the message publishing time is used.
+ Note that this argument doesn't require with_attributes=True.
+
+ Timestamp values should be in one of two formats:
+
+ - A numerical value representing the number of milliseconds since the
+ Unix epoch.
+ - A string in RFC 3339 format, UTC timezone. Example:
+ ``2015-10-29T23:41:41.123Z``. The sub-second component of the
+ timestamp is optional, and digits beyond the first three (i.e., time
+ units smaller than milliseconds) may be ignored.
"""
- super(ReadMessagesFromPubSub, self).__init__()
- self.topic = topic
- self.subscription = subscription
- self.id_label = id_label
-
- def get_windowing(self, unused_inputs):
- return core.Windowing(window.GlobalWindows())
+ super(ReadFromPubSub, self).__init__()
+ self.with_attributes = with_attributes
+ self._source = _PubSubSource(
+ topic=topic,
+ subscription=subscription,
+ id_label=id_label,
+ with_attributes=with_attributes,
+ timestamp_attribute=timestamp_attribute)
- def expand(self, pcoll):
- p = (pcoll.pipeline
- | _ReadFromPubSub(self.topic, self.subscription, self.id_label,
- with_attributes=True))
- return p
+ def expand(self, pvalue):
+ pcoll = pvalue.pipeline | Read(self._source)
+ if self.with_attributes:
+ pcoll = pcoll | Map(PubsubMessage._from_proto)
+ pcoll.element_type = PubsubMessage
+ else:
+ pcoll.element_type = bytes
+ return pcoll
class ReadStringsFromPubSub(PTransform):
"""A ``PTransform`` for reading utf-8 string payloads from Cloud Pub/Sub.
Outputs elements of type ``unicode``, decoded from UTF-8.
+
+ This class is deprecated.
"""
def __init__(self, topic=None, subscription=None, id_label=None):
- """Initializes ``ReadStringsFromPubSub``.
-
- Args:
- topic: Cloud Pub/Sub topic in the form "projects/<project>/topics/
- <topic>". If provided, subscription must be None.
- subscription: Existing Cloud Pub/Sub subscription to use in the
- form "projects/<project>/subscriptions/<subscription>". If not
- specified, a temporary subscription will be created from the specified
- topic. If provided, topic must be None.
- id_label: The attribute on incoming Pub/Sub messages to use as a unique
- record identifier. When specified, the value of this attribute (which
- can be any string that uniquely identifies the record) will be used for
- deduplication of messages. If not provided, we cannot guarantee
- that no duplicate data will be delivered on the Pub/Sub stream. In this
- case, deduplication of the stream will be strictly best effort.
- """
super(ReadStringsFromPubSub, self).__init__()
self.topic = topic
self.subscription = subscription
self.id_label = id_label
- def get_windowing(self, unused_inputs):
- return core.Windowing(window.GlobalWindows())
-
- def expand(self, pcoll):
- p = (pcoll.pipeline
- | _ReadFromPubSub(self.topic, self.subscription, self.id_label,
- with_attributes=False)
+ def expand(self, pvalue):
+ p = (pvalue.pipeline
+ | ReadFromPubSub(self.topic, self.subscription, self.id_label,
+ with_attributes=False)
| 'DecodeString' >> Map(lambda b: b.decode('utf-8')))
p.element_type = text_type
return p
-class _ReadFromPubSub(PTransform):
- """A ``PTransform`` for reading from Cloud Pub/Sub."""
-
- def __init__(self, topic, subscription, id_label, with_attributes):
- """Initializes ``_ReadFromPubSub``.
-
- Args:
- topic: Cloud Pub/Sub topic in the form "projects/<project>/topics/
- <topic>". If provided, subscription must be None.
- subscription: Existing Cloud Pub/Sub subscription to use in the
- form "projects/<project>/subscriptions/<subscription>". If not
- specified, a temporary subscription will be created from the specified
- topic. If provided, topic must be None.
- id_label: The attribute on incoming Pub/Sub messages to use as a unique
- record identifier. When specified, the value of this attribute (which
- can be any string that uniquely identifies the record) will be used for
- deduplication of messages. If None, we cannot guarantee
- that no duplicate data will be delivered on the Pub/Sub stream. In this
- case, deduplication of the stream will be strictly best effort.
- with_attributes: False - output elements will be raw payload bytes.
- True - output will be :class:`~PubsubMessage` objects.
- """
- super(_ReadFromPubSub, self).__init__()
- self.with_attributes = with_attributes
- self._source = _PubSubSource(
- topic,
- subscription=subscription,
- id_label=id_label,
- with_attributes=with_attributes)
-
- def expand(self, pvalue):
- pcoll = pvalue.pipeline | Read(self._source)
- if self.with_attributes:
- pcoll = pcoll | Map(PubsubMessage._from_proto)
- pcoll.element_type = PubsubMessage
- else:
- pcoll.element_type = bytes
- return pcoll
-
-
class WriteStringsToPubSub(PTransform):
"""A ``PTransform`` for writing utf-8 string payloads to Cloud Pub/Sub."""
@@ -280,25 +226,15 @@ def parse_subscription(full_subscription):
class _PubSubSource(dataflow_io.NativeSource):
"""Source for the payload of a message as bytes from a Cloud Pub/Sub topic.
+ This ``NativeSource`` is overridden by a native Pubsub implementation.
+
Attributes:
- topic: Cloud Pub/Sub topic in the form "projects/<project>/topics/<topic>".
- If provided, subscription must be None.
- subscription: Existing Cloud Pub/Sub subscription to use in the
- form "projects/<project>/subscriptions/<subscription>". If not specified,
- a temporary subscription will be created from the specified topic. If
- provided, topic must be None.
- id_label: The attribute on incoming Pub/Sub messages to use as a unique
- record identifier. When specified, the value of this attribute (which can
- be any string that uniquely identifies the record) will be used for
- deduplication of messages. If not provided, Dataflow cannot guarantee
- that no duplicate data will be delivered on the Pub/Sub stream. In this
- case, deduplication of the stream will be strictly best effort.
with_attributes: If False, will fetch just message payload. Otherwise,
fetches ``PubsubMessage`` protobufs.
"""
def __init__(self, topic=None, subscription=None, id_label=None,
- with_attributes=False):
+ with_attributes=False, timestamp_attribute=None):
# We are using this coder explicitly for portability reasons of PubsubIO
# across implementations in languages.
self.coder = coders.BytesCoder()
@@ -308,6 +244,7 @@ class _PubSubSource(dataflow_io.NativeSource):
self.subscription_name = None
self.id_label = id_label
self.with_attributes = with_attributes
+ self.timestamp_attribute = timestamp_attribute
# Perform some validation on the topic and subscription.
if not (topic or subscription):
@@ -337,8 +274,7 @@ class _PubSubSource(dataflow_io.NativeSource):
label='Pubsub Subscription').drop_if_none()}
def reader(self):
- raise NotImplementedError(
- 'PubSubPayloadSource is not supported in local execution.')
+ raise NotImplementedError
def is_bounded(self):
return False
diff --git a/sdks/python/apache_beam/io/gcp/pubsub_test.py b/sdks/python/apache_beam/io/gcp/pubsub_test.py
index b5afff8..f987947 100644
--- a/sdks/python/apache_beam/io/gcp/pubsub_test.py
+++ b/sdks/python/apache_beam/io/gcp/pubsub_test.py
@@ -27,22 +27,24 @@ import mock
import apache_beam as beam
from apache_beam.io.gcp.pubsub import PubsubMessage
-from apache_beam.io.gcp.pubsub import ReadMessagesFromPubSub
+from apache_beam.io.gcp.pubsub import ReadFromPubSub
from apache_beam.io.gcp.pubsub import ReadStringsFromPubSub
from apache_beam.io.gcp.pubsub import WriteStringsToPubSub
from apache_beam.io.gcp.pubsub import _PubSubPayloadSink
from apache_beam.io.gcp.pubsub import _PubSubSource
-from apache_beam.io.gcp.pubsub import _ReadFromPubSub
from apache_beam.options.pipeline_options import StandardOptions
from apache_beam.runners.direct import transform_evaluator
from apache_beam.runners.direct.direct_runner import _DirectReadFromPubSub
from apache_beam.runners.direct.direct_runner import _get_transform_overrides
from apache_beam.runners.direct.transform_evaluator import _PubSubReadEvaluator
from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.testing.util import TestWindowedValue
from apache_beam.testing.util import assert_that
from apache_beam.testing.util import equal_to
+from apache_beam.transforms import window
from apache_beam.transforms.display import DisplayData
from apache_beam.transforms.display_test import DisplayDataItemMatcher
+from apache_beam.utils import timestamp
# Protect against environments where the PubSub library is not available.
# pylint: disable=wrong-import-order, wrong-import-position
@@ -60,8 +62,9 @@ class TestReadFromPubSubOverride(unittest.TestCase):
p = TestPipeline()
p.options.view_as(StandardOptions).streaming = True
pcoll = (p
- | _ReadFromPubSub('projects/fakeprj/topics/a_topic',
- None, 'a_label', with_attributes=False)
+ | ReadFromPubSub('projects/fakeprj/topics/a_topic',
+ None, 'a_label', with_attributes=False,
+ timestamp_attribute=None)
| beam.Map(lambda x: x))
self.assertEqual(str, pcoll.element_type)
@@ -69,7 +72,7 @@ class TestReadFromPubSubOverride(unittest.TestCase):
overrides = _get_transform_overrides(p.options)
p.replace_all(overrides)
- # Note that the direct output of ReadMessagesFromPubSub will be replaced
+ # Note that the direct output of ReadFromPubSub will be replaced
# by a PTransformOverride, so we use a no-op Map.
read_transform = pcoll.producer.inputs[0].producer.transform
@@ -82,9 +85,9 @@ class TestReadFromPubSubOverride(unittest.TestCase):
p = TestPipeline()
p.options.view_as(StandardOptions).streaming = True
pcoll = (p
- | _ReadFromPubSub(
+ | ReadFromPubSub(
None, 'projects/fakeprj/subscriptions/a_subscription',
- 'a_label', with_attributes=False)
+ 'a_label', with_attributes=False, timestamp_attribute=None)
| beam.Map(lambda x: x))
self.assertEqual(str, pcoll.element_type)
@@ -92,7 +95,7 @@ class TestReadFromPubSubOverride(unittest.TestCase):
overrides = _get_transform_overrides(p.options)
p.replace_all(overrides)
- # Note that the direct output of ReadMessagesFromPubSub will be replaced
+ # Note that the direct output of ReadFromPubSub will be replaced
# by a PTransformOverride, so we use a no-op Map.
read_transform = pcoll.producer.inputs[0].producer.transform
@@ -104,20 +107,22 @@ class TestReadFromPubSubOverride(unittest.TestCase):
def test_expand_with_no_topic_or_subscription(self):
with self.assertRaisesRegexp(
ValueError, "Either a topic or subscription must be provided."):
- _ReadFromPubSub(None, None, 'a_label', with_attributes=False)
+ ReadFromPubSub(None, None, 'a_label', with_attributes=False,
+ timestamp_attribute=None)
def test_expand_with_both_topic_and_subscription(self):
with self.assertRaisesRegexp(
ValueError, "Only one of topic or subscription should be provided."):
- _ReadFromPubSub('a_topic', 'a_subscription', 'a_label',
- with_attributes=False)
+ ReadFromPubSub('a_topic', 'a_subscription', 'a_label',
+ with_attributes=False, timestamp_attribute=None)
- def test_expand_with_attributes(self):
+ def test_expand_with_other_options(self):
p = TestPipeline()
p.options.view_as(StandardOptions).streaming = True
pcoll = (p
- | _ReadFromPubSub('projects/fakeprj/topics/a_topic',
- None, 'a_label', with_attributes=True)
+ | ReadFromPubSub('projects/fakeprj/topics/a_topic',
+ None, 'a_label', with_attributes=True,
+ timestamp_attribute='time')
| beam.Map(lambda x: x))
self.assertEqual(PubsubMessage, pcoll.element_type)
@@ -125,14 +130,14 @@ class TestReadFromPubSubOverride(unittest.TestCase):
overrides = _get_transform_overrides(p.options)
p.replace_all(overrides)
- # Note that the direct output of ReadMessagesFromPubSub will be replaced
+ # Note that the direct output of ReadFromPubSub will be replaced
# by a PTransformOverride, so we use a no-op Map.
read_transform = pcoll.producer.inputs[0].producer.transform
# Ensure that the properties passed through correctly
source = read_transform._source
- self.assertEqual('a_topic', source.topic_name)
- self.assertEqual('a_label', source.id_label)
+ self.assertTrue(source.with_attributes)
+ self.assertEqual('time', source.timestamp_attribute)
@unittest.skipIf(pubsub is None, 'GCP dependencies are not installed')
@@ -276,6 +281,13 @@ class FakePubsubClient(object):
return FakePubsubTopic(name, self)
+def create_client_message(payload, message_id, attributes, publish_time):
+ """Returns a message as it would be returned from Cloud Pub/Sub client."""
+ msg = pubsub.message.Message(payload, message_id, attributes)
+ msg._service_timestamp = publish_time
+ return msg
+
+
@unittest.skipIf(pubsub is None, 'GCP dependencies are not installed')
class TestReadFromPubSub(unittest.TestCase):
@@ -283,9 +295,13 @@ class TestReadFromPubSub(unittest.TestCase):
def test_read_messages_success(self, mock_pubsub):
payload = 'payload'
message_id = 'message_id'
- attributes = {'attribute': 'value'}
- data = [pubsub.message.Message(payload, message_id, attributes)]
- expected_data = [PubsubMessage(payload, message_id, attributes, None)]
+ publish_time = '2018-03-12T13:37:01.234567Z'
+ attributes = {'key': 'value'}
+ data = [create_client_message(
+ payload, message_id, attributes, publish_time)]
+ expected_data = [TestWindowedValue(PubsubMessage(payload, attributes),
+ timestamp.Timestamp(1520861821.234567),
+ [window.GlobalWindow()])]
mock_pubsub.Client = functools.partial(FakePubsubClient, data)
mock_pubsub.subscription.AutoAck = FakeAutoAck
@@ -293,16 +309,18 @@ class TestReadFromPubSub(unittest.TestCase):
p = TestPipeline()
p.options.view_as(StandardOptions).streaming = True
pcoll = (p
- | ReadMessagesFromPubSub('projects/fakeprj/topics/a_topic',
- None, 'a_label'))
- assert_that(pcoll, equal_to(expected_data))
+ | ReadFromPubSub('projects/fakeprj/topics/a_topic',
+ None, 'a_label', with_attributes=True))
+ assert_that(pcoll, equal_to(expected_data), reify_windows=True)
p.run()
@mock.patch('google.cloud.pubsub')
def test_read_strings_success(self, mock_pubsub):
payload = u'🤷 ¯\\_(ツ)_/¯'
payload_encoded = payload.encode('utf-8')
- data = [pubsub.message.Message(payload_encoded, None, None)]
+ publish_time = '2018-03-12T13:37:01.234567Z'
+ data = [create_client_message(
+ payload_encoded, None, None, publish_time)]
expected_data = [payload]
mock_pubsub.Client = functools.partial(FakePubsubClient, data)
@@ -316,6 +334,121 @@ class TestReadFromPubSub(unittest.TestCase):
assert_that(pcoll, equal_to(expected_data))
p.run()
+ @mock.patch('google.cloud.pubsub')
+ def test_read_payload_success(self, mock_pubsub):
+ payload_encoded = u'🤷 ¯\\_(ツ)_/¯'.encode('utf-8')
+ publish_time = '2018-03-12T13:37:01.234567Z'
+ data = [create_client_message(
+ payload_encoded, None, None, publish_time)]
+ expected_data = [payload_encoded]
+
+ mock_pubsub.Client = functools.partial(FakePubsubClient, data)
+ mock_pubsub.subscription.AutoAck = FakeAutoAck
+
+ p = TestPipeline()
+ p.options.view_as(StandardOptions).streaming = True
+ pcoll = (p
+ | ReadFromPubSub('projects/fakeprj/topics/a_topic',
+ None, 'a_label'))
+ assert_that(pcoll, equal_to(expected_data))
+ p.run()
+
+ @mock.patch('google.cloud.pubsub')
+ def test_read_messages_timestamp_attribute_milli_success(self, mock_pubsub):
+ payload = 'payload'
+ message_id = 'message_id'
+ attributes = {'time': '1337'}
+ publish_time = '2018-03-12T13:37:01.234567Z'
+ data = [create_client_message(
+ payload, message_id, attributes, publish_time)]
+ expected_data = [
+ TestWindowedValue(
+ PubsubMessage(payload, attributes),
+ timestamp.Timestamp(micros=int(attributes['time']) * 1000),
+ [window.GlobalWindow()]),
+ ]
+
+ mock_pubsub.Client = functools.partial(FakePubsubClient, data)
+ mock_pubsub.subscription.AutoAck = FakeAutoAck
+
+ p = TestPipeline()
+ p.options.view_as(StandardOptions).streaming = True
+ pcoll = (p
+ | ReadFromPubSub(
+ 'projects/fakeprj/topics/a_topic', None, 'a_label',
+ with_attributes=True, timestamp_attribute='time'))
+ assert_that(pcoll, equal_to(expected_data), reify_windows=True)
+ p.run()
+
+ @mock.patch('google.cloud.pubsub')
+ def test_read_messages_timestamp_attribute_rfc3339_success(self, mock_pubsub):
+ payload = 'payload'
+ message_id = 'message_id'
+ attributes = {'time': '2018-03-12T13:37:01.234567Z'}
+ publish_time = '2018-03-12T13:37:01.234567Z'
+ data = [create_client_message(
+ payload, message_id, attributes, publish_time)]
+ expected_data = [
+ TestWindowedValue(
+ PubsubMessage(payload, attributes),
+ timestamp.Timestamp.from_rfc3339(attributes['time']),
+ [window.GlobalWindow()]),
+ ]
+
+ mock_pubsub.Client = functools.partial(FakePubsubClient, data)
+ mock_pubsub.subscription.AutoAck = FakeAutoAck
+
+ p = TestPipeline()
+ p.options.view_as(StandardOptions).streaming = True
+ pcoll = (p
+ | ReadFromPubSub(
+ 'projects/fakeprj/topics/a_topic', None, 'a_label',
+ with_attributes=True, timestamp_attribute='time'))
+ assert_that(pcoll, equal_to(expected_data), reify_windows=True)
+ p.run()
+
+ @mock.patch('google.cloud.pubsub')
+ def test_read_messages_timestamp_attribute_fail_missing(self, mock_pubsub):
+ payload = 'payload'
+ message_id = 'message_id'
+ attributes = {'time': '1337'}
+ publish_time = '2018-03-12T13:37:01.234567Z'
+ data = [create_client_message(
+ payload, message_id, attributes, publish_time)]
+
+ mock_pubsub.Client = functools.partial(FakePubsubClient, data)
+ mock_pubsub.subscription.AutoAck = FakeAutoAck
+
+ p = TestPipeline()
+ p.options.view_as(StandardOptions).streaming = True
+ _ = (p
+ | ReadFromPubSub(
+ 'projects/fakeprj/topics/a_topic', None, 'a_label',
+ with_attributes=True, timestamp_attribute='nonexistent'))
+ with self.assertRaisesRegexp(KeyError, r'Timestamp.*nonexistent'):
+ p.run()
+
+ @mock.patch('google.cloud.pubsub')
+ def test_read_messages_timestamp_attribute_fail_parse(self, mock_pubsub):
+ payload = 'payload'
+ message_id = 'message_id'
+ attributes = {'time': '1337 unparseable'}
+ publish_time = '2018-03-12T13:37:01.234567Z'
+ data = [create_client_message(
+ payload, message_id, attributes, publish_time)]
+
+ mock_pubsub.Client = functools.partial(FakePubsubClient, data)
+ mock_pubsub.subscription.AutoAck = FakeAutoAck
+
+ p = TestPipeline()
+ p.options.view_as(StandardOptions).streaming = True
+ _ = (p
+ | ReadFromPubSub(
+ 'projects/fakeprj/topics/a_topic', None, 'a_label',
+ with_attributes=True, timestamp_attribute='time'))
+ with self.assertRaisesRegexp(ValueError, r'parse'):
+ p.run()
+
if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
index e53a12e..d9d76f8 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
@@ -777,7 +777,7 @@ class DataflowRunner(PipelineRunner):
standard_options = (
transform_node.inputs[0].pipeline.options.view_as(StandardOptions))
if not standard_options.streaming:
- raise ValueError('PubSubPayloadSource is currently available for use '
+ raise ValueError('Cloud Pub/Sub is currently available for use '
'only in streaming pipelines.')
# Only one of topic or subscription should be set.
if transform.source.full_subscription:
@@ -793,6 +793,9 @@ class DataflowRunner(PipelineRunner):
# Setting this property signals Dataflow runner to return full
# PubsubMessages instead of just the payload.
step.add_property(PropertyNames.PUBSUB_SERIALIZED_ATTRIBUTES_FN, '')
+ if transform.source.timestamp_attribute is not None:
+ step.add_property(PropertyNames.PUBSUB_TIMESTAMP_ATTRIBUTE,
+ transform.source.timestamp_attribute)
else:
raise ValueError(
'Source %r has unexpected format %s.' % (
diff --git a/sdks/python/apache_beam/runners/dataflow/internal/names.py b/sdks/python/apache_beam/runners/dataflow/internal/names.py
index 8e48236..b5f8051 100644
--- a/sdks/python/apache_beam/runners/dataflow/internal/names.py
+++ b/sdks/python/apache_beam/runners/dataflow/internal/names.py
@@ -74,10 +74,11 @@ class PropertyNames(object):
OUTPUT_INFO = 'output_info'
OUTPUT_NAME = 'output_name'
PARALLEL_INPUT = 'parallel_input'
- PUBSUB_TOPIC = 'pubsub_topic'
- PUBSUB_SUBSCRIPTION = 'pubsub_subscription'
PUBSUB_ID_LABEL = 'pubsub_id_label'
PUBSUB_SERIALIZED_ATTRIBUTES_FN = 'pubsub_serialized_attributes_fn'
+ PUBSUB_SUBSCRIPTION = 'pubsub_subscription'
+ PUBSUB_TIMESTAMP_ATTRIBUTE = 'pubsub_timestamp_label'
+ PUBSUB_TOPIC = 'pubsub_topic'
SERIALIZED_FN = 'serialized_fn'
SHARD_NAME_TEMPLATE = 'shard_template'
SOURCE_STEP_INPUT = 'custom_source_step_input'
diff --git a/sdks/python/apache_beam/runners/direct/direct_runner.py b/sdks/python/apache_beam/runners/direct/direct_runner.py
index 062509f..510a4e6 100644
--- a/sdks/python/apache_beam/runners/direct/direct_runner.py
+++ b/sdks/python/apache_beam/runners/direct/direct_runner.py
@@ -268,7 +268,7 @@ def _get_pubsub_transform_overrides(pipeline_options):
class ReadFromPubSubOverride(PTransformOverride):
def matches(self, applied_ptransform):
return isinstance(applied_ptransform.transform,
- beam_pubsub._ReadFromPubSub)
+ beam_pubsub.ReadFromPubSub)
def get_replacement_transform(self, transform):
if not pipeline_options.view_as(StandardOptions).streaming:
diff --git a/sdks/python/apache_beam/runners/direct/transform_evaluator.py b/sdks/python/apache_beam/runners/direct/transform_evaluator.py
index ad29007..eb1ccd5 100644
--- a/sdks/python/apache_beam/runners/direct/transform_evaluator.py
+++ b/sdks/python/apache_beam/runners/direct/transform_evaluator.py
@@ -417,7 +417,7 @@ class _PubSubReadEvaluator(_TransformEvaluator):
def process_element(self, element):
pass
- def _read_from_pubsub(self):
+ def _read_from_pubsub(self, timestamp_attribute):
from apache_beam.io.gcp.pubsub import PubsubMessage
from google.cloud import pubsub
# Because of the AutoAck, we are not able to reread messages if this
@@ -428,24 +428,40 @@ class _PubSubReadEvaluator(_TransformEvaluator):
self._subscription, return_immediately=True,
max_messages=10) as results:
def _get_element(message):
- if self.source.with_attributes:
- return PubsubMessage._from_message(message)
+ parsed_message = PubsubMessage._from_message(message)
+ if timestamp_attribute:
+ try:
+ rfc3339_or_milli = parsed_message.attributes[timestamp_attribute]
+ except KeyError as e:
+ raise KeyError('Timestamp attribute not found: %s' % e)
+ try:
+ timestamp = Timestamp.from_rfc3339(rfc3339_or_milli)
+ except ValueError:
+ try:
+ timestamp = Timestamp(micros=int(rfc3339_or_milli) * 1000)
+ except ValueError as e:
+ raise ValueError('Bad timestamp value: %s' % e)
else:
- return message.data
+ timestamp = Timestamp.from_rfc3339(message.service_timestamp)
+
+ return timestamp, parsed_message
return [_get_element(message)
for unused_ack_id, message in results.items()]
def finish_bundle(self):
- data = self._read_from_pubsub()
+ data = self._read_from_pubsub(self.source.timestamp_attribute)
if data:
output_pcollection = list(self._outputs)[0]
bundle = self._evaluation_context.create_bundle(output_pcollection)
- # TODO(ccy): we currently do not use the PubSub message timestamp or
- # respect the PubSub source's id_label field.
- now = Timestamp.of(time.time())
- for message_data in data:
- bundle.output(GlobalWindows.windowed_value(message_data, timestamp=now))
+ # TODO(ccy): Respect the PubSub source's id_label field.
+ for timestamp, message in data:
+ if self.source.with_attributes:
+ element = message
+ else:
+ element = message.payload
+ bundle.output(
+ GlobalWindows.windowed_value(element, timestamp=timestamp))
bundles = [bundle]
else:
bundles = []
@@ -456,6 +472,7 @@ class _PubSubReadEvaluator(_TransformEvaluator):
unprocessed_bundle = self._evaluation_context.create_bundle(
input_pvalue)
+ # TODO(udim): Correct value for watermark hold.
return TransformResult(self, bundles, [unprocessed_bundle], None,
{None: Timestamp.of(time.time())})
diff --git a/sdks/python/apache_beam/utils/timestamp.py b/sdks/python/apache_beam/utils/timestamp.py
index c437d5a..7c41c30 100644
--- a/sdks/python/apache_beam/utils/timestamp.py
+++ b/sdks/python/apache_beam/utils/timestamp.py
@@ -24,6 +24,9 @@ from __future__ import absolute_import
from __future__ import division
import datetime
+import re
+
+import pytz
class Timestamp(object):
@@ -38,6 +41,12 @@ class Timestamp(object):
"""
def __init__(self, seconds=0, micros=0):
+ if not isinstance(seconds, (int, float, long)):
+ raise TypeError('Cannot interpret %s %s as seconds.' % (
+ seconds, type(seconds)))
+ if not isinstance(micros, (int, float, long)):
+ raise TypeError('Cannot interpret %s %s as micros.' % (
+ micros, type(micros)))
self.micros = int(seconds * 1000000) + int(micros)
@staticmethod
@@ -53,12 +62,52 @@ class Timestamp(object):
Corresponding Timestamp object.
"""
- if isinstance(seconds, Duration):
- raise TypeError('Can\'t interpret %s as Timestamp.' % seconds)
+ if not isinstance(seconds, (int, float, Timestamp)):
+ raise TypeError('Cannot interpret %s %s as Timestamp.' % (
+ seconds, type(seconds)))
if isinstance(seconds, Timestamp):
return seconds
return Timestamp(seconds)
+ RFC_3339_RE = re.compile(
+ r'^(\d{4})-(\d{2})-(\d{2})T(\d{2}):(\d{2}):(\d{2})(?:\.(\d+))?Z$')
+
+ @staticmethod
+ def _epoch_datetime_utc():
+ return datetime.datetime.fromtimestamp(0, pytz.utc)
+
+ @classmethod
+ def from_utc_datetime(cls, dt):
+ """Create a ``Timestamp`` instance from a ``datetime.datetime`` object.
+
+ Args:
+ dt: A ``datetime.datetime`` object in UTC (offset-aware).
+ """
+ if dt.tzinfo != pytz.utc:
+ raise ValueError('dt not in UTC: %s', dt)
+ duration = dt - cls._epoch_datetime_utc()
+ return Timestamp(duration.total_seconds())
+
+ @classmethod
+ def from_rfc3339(cls, rfc3339):
+ """Create a ``Timestamp`` instance from an RFC 3339 compliant string.
+
+ Args:
+ rfc3339: String in RFC 3339 form.
+ """
+ dt_args = []
+ match = cls.RFC_3339_RE.match(rfc3339)
+ if match is None:
+ raise ValueError('Could not parse RFC 3339 string: %s', rfc3339)
+ for s in match.groups():
+ if s is not None:
+ dt_args.append(int(s))
+ else:
+ dt_args.append(0)
+ dt_args += (pytz.utc, )
+ dt = datetime.datetime(*dt_args)
+ return cls.from_utc_datetime(dt)
+
def predecessor(self):
"""Returns the largest timestamp smaller than self."""
return Timestamp(micros=self.micros - 1)
@@ -76,12 +125,12 @@ class Timestamp(object):
return 'Timestamp(%s%d)' % (sign, int_part)
def to_utc_datetime(self):
- epoch = datetime.datetime.utcfromtimestamp(0)
# We can't easily construct a datetime object from microseconds, so we
# create one at the epoch and add an appropriate timedelta interval.
- return epoch + datetime.timedelta(microseconds=self.micros)
+ return self._epoch_datetime_utc().replace(tzinfo=None) + datetime.timedelta(
+ microseconds=self.micros)
- def isoformat(self):
+ def to_rfc3339(self):
# Append 'Z' for UTC timezone.
return self.to_utc_datetime().isoformat() + 'Z'
@@ -150,7 +199,7 @@ class Duration(object):
"""
if isinstance(seconds, Timestamp):
- raise TypeError('Can\'t interpret %s as Duration.' % seconds)
+ raise TypeError('Cannot interpret %s as Duration.' % seconds)
if isinstance(seconds, Duration):
return seconds
return Duration(seconds)
diff --git a/sdks/python/apache_beam/utils/timestamp_test.py b/sdks/python/apache_beam/utils/timestamp_test.py
index 3322936..8296dc6 100644
--- a/sdks/python/apache_beam/utils/timestamp_test.py
+++ b/sdks/python/apache_beam/utils/timestamp_test.py
@@ -19,8 +19,11 @@
from __future__ import absolute_import
+import datetime
import unittest
+import pytz
+
from apache_beam.utils.timestamp import Duration
from apache_beam.utils.timestamp import Timestamp
@@ -43,13 +46,39 @@ class TimestampTest(unittest.TestCase):
self.assertEqual(Timestamp(10000000) % Duration(0.000005), 0)
def test_utc_timestamp(self):
- self.assertEqual(Timestamp(10000000).isoformat(),
+ self.assertEqual(Timestamp(10000000).to_rfc3339(),
'1970-04-26T17:46:40Z')
- self.assertEqual(Timestamp(10000000.000001).isoformat(),
+ self.assertEqual(Timestamp(10000000.000001).to_rfc3339(),
'1970-04-26T17:46:40.000001Z')
- self.assertEqual(Timestamp(1458343379.123456).isoformat(),
+ self.assertEqual(Timestamp(1458343379.123456).to_rfc3339(),
'2016-03-18T23:22:59.123456Z')
+ def test_from_rfc3339(self):
+ test_cases = [
+ (10000000, '1970-04-26T17:46:40Z'),
+ (10000000.000001, '1970-04-26T17:46:40.000001Z'),
+ (1458343379.123456, '2016-03-18T23:22:59.123456Z'),
+ ]
+ for seconds_float, rfc3339_str in test_cases:
+ self.assertEqual(Timestamp(seconds_float),
+ Timestamp.from_rfc3339(rfc3339_str))
+ self.assertEqual(rfc3339_str,
+ Timestamp.from_rfc3339(rfc3339_str).to_rfc3339())
+
+ def test_from_rfc3339_failure(self):
+ with self.assertRaisesRegexp(ValueError, 'parse'):
+ Timestamp.from_rfc3339('not rfc3339')
+ with self.assertRaisesRegexp(ValueError, 'parse'):
+ Timestamp.from_rfc3339('2016-03-18T23:22:59.123456Z unparseable')
+
+ def test_from_utc_datetime(self):
+ self.assertEqual(
+ Timestamp.from_utc_datetime(datetime.datetime(1970, 1, 1,
+ tzinfo=pytz.utc)),
+ Timestamp(0))
+ with self.assertRaisesRegexp(ValueError, r'UTC'):
+ Timestamp.from_utc_datetime(datetime.datetime(1970, 1, 1))
+
def test_arithmetic(self):
# Supported operations.
self.assertEqual(Timestamp(123) + 456, 579)
diff --git a/sdks/python/setup.py b/sdks/python/setup.py
index ffc4df7..b7f400e 100644
--- a/sdks/python/setup.py
+++ b/sdks/python/setup.py
@@ -106,6 +106,7 @@ REQUIRED_PACKAGES = [
'oauth2client>=2.0.1,<5',
# grpcio 1.8.1 and above requires protobuf 3.5.0.post1.
'protobuf>=3.5.0.post1,<4',
+ 'pytz>=2018.3',
'pyyaml>=3.12,<4.0.0',
'pyvcf>=0.6.8,<0.7.0',
'six>=1.9,<1.12',
--
To stop receiving notification emails like this one, please contact
chamikara@apache.org.