You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/06/12 16:55:28 UTC
[13/50] [abbrv] beam git commit: Add coder info to pubsub io
Add coder info to pubsub io
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/b5852d21
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/b5852d21
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/b5852d21
Branch: refs/heads/gearpump-runner
Commit: b5852d212cab060321c43a5800f8585aa3649aec
Parents: 0a0a1bc
Author: Vikas Kedigehalli <vi...@google.com>
Authored: Wed Jun 7 16:28:18 2017 -0700
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Wed Jun 7 22:55:00 2017 -0700
----------------------------------------------------------------------
sdks/python/apache_beam/io/gcp/pubsub.py | 32 +++++++++++++++-----
sdks/python/apache_beam/io/gcp/pubsub_test.py | 28 +++++++++++++++--
.../runners/dataflow/dataflow_runner.py | 23 ++++++++++----
3 files changed, 67 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/b5852d21/sdks/python/apache_beam/io/gcp/pubsub.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/gcp/pubsub.py b/sdks/python/apache_beam/io/gcp/pubsub.py
index 1ba8ac0..40326e1 100644
--- a/sdks/python/apache_beam/io/gcp/pubsub.py
+++ b/sdks/python/apache_beam/io/gcp/pubsub.py
@@ -40,13 +40,15 @@ __all__ = ['ReadStringsFromPubSub', 'WriteStringsToPubSub',
class ReadStringsFromPubSub(PTransform):
"""A ``PTransform`` for reading utf-8 string payloads from Cloud Pub/Sub."""
- def __init__(self, topic, subscription=None, id_label=None):
+ def __init__(self, topic=None, subscription=None, id_label=None):
"""Initializes ``ReadStringsFromPubSub``.
Attributes:
- topic: Cloud Pub/Sub topic in the form "/topics/<project>/<topic>".
- subscription: Optional existing Cloud Pub/Sub subscription to use in the
- form "projects/<project>/subscriptions/<subscription>".
+ topic: Cloud Pub/Sub topic in the form "/topics/<project>/<topic>". If
+ provided then subscription must be None.
+ subscription: Existing Cloud Pub/Sub subscription to use in the
+ form "projects/<project>/subscriptions/<subscription>". If provided then
+ topic must be None.
id_label: The attribute on incoming Pub/Sub messages to use as a unique
record identifier. When specified, the value of this attribute (which
can be any string that uniquely identifies the record) will be used for
@@ -55,6 +57,12 @@ class ReadStringsFromPubSub(PTransform):
case, deduplication of the stream will be strictly best effort.
"""
super(ReadStringsFromPubSub, self).__init__()
+ if topic and subscription:
+ raise ValueError("Only one of topic or subscription should be provided.")
+
+ if not (topic or subscription):
+ raise ValueError("Either a topic or subscription must be provided.")
+
self._source = _PubSubPayloadSource(
topic,
subscription=subscription,
@@ -90,9 +98,11 @@ class _PubSubPayloadSource(dataflow_io.NativeSource):
"""Source for the payload of a message as bytes from a Cloud Pub/Sub topic.
Attributes:
- topic: Cloud Pub/Sub topic in the form "/topics/<project>/<topic>".
- subscription: Optional existing Cloud Pub/Sub subscription to use in the
- form "projects/<project>/subscriptions/<subscription>".
+ topic: Cloud Pub/Sub topic in the form "/topics/<project>/<topic>". If
+ provided then subscription must be None.
+ subscription: Existing Cloud Pub/Sub subscription to use in the
+ form "projects/<project>/subscriptions/<subscription>". If provided then
+ topic must be None.
id_label: The attribute on incoming Pub/Sub messages to use as a unique
record identifier. When specified, the value of this attribute (which can
be any string that uniquely identifies the record) will be used for
@@ -101,7 +111,10 @@ class _PubSubPayloadSource(dataflow_io.NativeSource):
case, deduplication of the stream will be strictly best effort.
"""
- def __init__(self, topic, subscription=None, id_label=None):
+ def __init__(self, topic=None, subscription=None, id_label=None):
+ # We set this coder explicitly so that PubsubIO stays portable across
+ # the SDK implementations in different languages.
+ self.coder = coders.BytesCoder()
self.topic = topic
self.subscription = subscription
self.id_label = id_label
@@ -131,6 +144,9 @@ class _PubSubPayloadSink(dataflow_io.NativeSink):
"""Sink for the payload of a message as bytes to a Cloud Pub/Sub topic."""
def __init__(self, topic):
+ # We set this coder explicitly so that PubsubIO stays portable across
+ # the SDK implementations in different languages.
+ self.coder = coders.BytesCoder()
self.topic = topic
@property
http://git-wip-us.apache.org/repos/asf/beam/blob/b5852d21/sdks/python/apache_beam/io/gcp/pubsub_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/gcp/pubsub_test.py b/sdks/python/apache_beam/io/gcp/pubsub_test.py
index 322d08a..cf14e8c 100644
--- a/sdks/python/apache_beam/io/gcp/pubsub_test.py
+++ b/sdks/python/apache_beam/io/gcp/pubsub_test.py
@@ -34,9 +34,9 @@ from apache_beam.transforms.display_test import DisplayDataItemMatcher
class TestReadStringsFromPubSub(unittest.TestCase):
- def test_expand(self):
+ def test_expand_with_topic(self):
p = TestPipeline()
- pcoll = p | ReadStringsFromPubSub('a_topic', 'a_subscription', 'a_label')
+ pcoll = p | ReadStringsFromPubSub('a_topic', None, 'a_label')
# Ensure that the output type is str
self.assertEqual(unicode, pcoll.element_type)
@@ -47,9 +47,33 @@ class TestReadStringsFromPubSub(unittest.TestCase):
# Ensure that the properties passed through correctly
source = read_pcoll.producer.transform.source
self.assertEqual('a_topic', source.topic)
+ self.assertEqual('a_label', source.id_label)
+
+ def test_expand_with_subscription(self):
+ p = TestPipeline()
+ pcoll = p | ReadStringsFromPubSub(None, 'a_subscription', 'a_label')
+ # Ensure that the output type is str
+ self.assertEqual(unicode, pcoll.element_type)
+
+ # Ensure that the type on the intermediate read output PCollection is bytes
+ read_pcoll = pcoll.producer.inputs[0]
+ self.assertEqual(bytes, read_pcoll.element_type)
+
+ # Ensure that the properties passed through correctly
+ source = read_pcoll.producer.transform.source
self.assertEqual('a_subscription', source.subscription)
self.assertEqual('a_label', source.id_label)
+ def test_expand_with_both_topic_and_subscription(self):
+ with self.assertRaisesRegexp(
+ ValueError, "Only one of topic or subscription should be provided."):
+ ReadStringsFromPubSub('a_topic', 'a_subscription', 'a_label')
+
+ def test_expand_with_no_topic_or_subscription(self):
+ with self.assertRaisesRegexp(
+ ValueError, "Either a topic or subscription must be provided."):
+ ReadStringsFromPubSub(None, None, 'a_label')
+
class TestWriteStringsToPubSub(unittest.TestCase):
def test_expand(self):
http://git-wip-us.apache.org/repos/asf/beam/blob/b5852d21/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
index 3fc8983..d9aa1bf 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
@@ -618,10 +618,12 @@ class DataflowRunner(PipelineRunner):
if not standard_options.streaming:
raise ValueError('PubSubPayloadSource is currently available for use '
'only in streaming pipelines.')
- step.add_property(PropertyNames.PUBSUB_TOPIC, transform.source.topic)
- if transform.source.subscription:
+ # Only one of topic or subscription should be set.
+ if transform.source.topic:
+ step.add_property(PropertyNames.PUBSUB_TOPIC, transform.source.topic)
+ elif transform.source.subscription:
step.add_property(PropertyNames.PUBSUB_SUBSCRIPTION,
- transform.source.topic)
+ transform.source.subscription)
if transform.source.id_label:
step.add_property(PropertyNames.PUBSUB_ID_LABEL,
transform.source.id_label)
@@ -639,7 +641,12 @@ class DataflowRunner(PipelineRunner):
# step should be the type of value outputted by each step. Read steps
# automatically wrap output values in a WindowedValue wrapper, if necessary.
# This is also necessary for proper encoding for size estimation.
- coder = coders.WindowedValueCoder(transform._infer_output_coder()) # pylint: disable=protected-access
+ # Using a GlobalWindowCoder as a placeholder instead of the default
+ # PickleCoder because GlobalWindowCoder is a known coder.
+ # TODO(robertwb): Query the collection for the windowfn to extract the
+ # correct coder.
+ coder = coders.WindowedValueCoder(transform._infer_output_coder(),
+ coders.coders.GlobalWindowCoder()) # pylint: disable=protected-access
step.encoding = self._get_cloud_encoding(coder)
step.add_property(
@@ -708,8 +715,12 @@ class DataflowRunner(PipelineRunner):
step.add_property(PropertyNames.FORMAT, transform.sink.format)
# Wrap coder in WindowedValueCoder: this is necessary for proper encoding
- # for size estimation.
- coder = coders.WindowedValueCoder(transform.sink.coder)
+ # for size estimation. Using a GlobalWindowCoder as a placeholder instead
+ # of the default PickleCoder because GlobalWindowCoder is a known coder.
+ # TODO(robertwb): Query the collection for the windowfn to extract the
+ # correct coder.
+ coder = coders.WindowedValueCoder(transform.sink.coder,
+ coders.coders.GlobalWindowCoder())
step.encoding = self._get_cloud_encoding(coder)
step.add_property(PropertyNames.ENCODING, step.encoding)
step.add_property(