You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/06/12 16:55:28 UTC

[13/50] [abbrv] beam git commit: Add coder info to pubsub io

Add coder info to pubsub io


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/b5852d21
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/b5852d21
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/b5852d21

Branch: refs/heads/gearpump-runner
Commit: b5852d212cab060321c43a5800f8585aa3649aec
Parents: 0a0a1bc
Author: Vikas Kedigehalli <vi...@google.com>
Authored: Wed Jun 7 16:28:18 2017 -0700
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Wed Jun 7 22:55:00 2017 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/io/gcp/pubsub.py        | 32 +++++++++++++++-----
 sdks/python/apache_beam/io/gcp/pubsub_test.py   | 28 +++++++++++++++--
 .../runners/dataflow/dataflow_runner.py         | 23 ++++++++++----
 3 files changed, 67 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/b5852d21/sdks/python/apache_beam/io/gcp/pubsub.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/gcp/pubsub.py b/sdks/python/apache_beam/io/gcp/pubsub.py
index 1ba8ac0..40326e1 100644
--- a/sdks/python/apache_beam/io/gcp/pubsub.py
+++ b/sdks/python/apache_beam/io/gcp/pubsub.py
@@ -40,13 +40,15 @@ __all__ = ['ReadStringsFromPubSub', 'WriteStringsToPubSub',
 class ReadStringsFromPubSub(PTransform):
   """A ``PTransform`` for reading utf-8 string payloads from Cloud Pub/Sub."""
 
-  def __init__(self, topic, subscription=None, id_label=None):
+  def __init__(self, topic=None, subscription=None, id_label=None):
     """Initializes ``ReadStringsFromPubSub``.
 
     Attributes:
-      topic: Cloud Pub/Sub topic in the form "/topics/<project>/<topic>".
-      subscription: Optional existing Cloud Pub/Sub subscription to use in the
-        form "projects/<project>/subscriptions/<subscription>".
+      topic: Cloud Pub/Sub topic in the form "/topics/<project>/<topic>". If
+        provided then subscription must be None.
+      subscription: Existing Cloud Pub/Sub subscription to use in the
+        form "projects/<project>/subscriptions/<subscription>". If provided then
+        topic must be None.
       id_label: The attribute on incoming Pub/Sub messages to use as a unique
         record identifier.  When specified, the value of this attribute (which
         can be any string that uniquely identifies the record) will be used for
@@ -55,6 +57,12 @@ class ReadStringsFromPubSub(PTransform):
         case, deduplication of the stream will be strictly best effort.
     """
     super(ReadStringsFromPubSub, self).__init__()
+    if topic and subscription:
+      raise ValueError("Only one of topic or subscription should be provided.")
+
+    if not (topic or subscription):
+      raise ValueError("Either a topic or subscription must be provided.")
+
     self._source = _PubSubPayloadSource(
         topic,
         subscription=subscription,
@@ -90,9 +98,11 @@ class _PubSubPayloadSource(dataflow_io.NativeSource):
   """Source for the payload of a message as bytes from a Cloud Pub/Sub topic.
 
   Attributes:
-    topic: Cloud Pub/Sub topic in the form "/topics/<project>/<topic>".
-    subscription: Optional existing Cloud Pub/Sub subscription to use in the
-      form "projects/<project>/subscriptions/<subscription>".
+    topic: Cloud Pub/Sub topic in the form "/topics/<project>/<topic>". If
+      provided then subscription must be None.
+    subscription: Existing Cloud Pub/Sub subscription to use in the
+      form "projects/<project>/subscriptions/<subscription>". If provided then
+      topic must be None.
     id_label: The attribute on incoming Pub/Sub messages to use as a unique
       record identifier.  When specified, the value of this attribute (which can
       be any string that uniquely identifies the record) will be used for
@@ -101,7 +111,10 @@ class _PubSubPayloadSource(dataflow_io.NativeSource):
       case, deduplication of the stream will be strictly best effort.
   """
 
-  def __init__(self, topic, subscription=None, id_label=None):
+  def __init__(self, topic=None, subscription=None, id_label=None):
+    # Use this coder explicitly so that PubsubIO stays portable across
+    # SDK implementations in different languages.
+    self.coder = coders.BytesCoder()
     self.topic = topic
     self.subscription = subscription
     self.id_label = id_label
@@ -131,6 +144,9 @@ class _PubSubPayloadSink(dataflow_io.NativeSink):
   """Sink for the payload of a message as bytes to a Cloud Pub/Sub topic."""
 
   def __init__(self, topic):
+    # Use this coder explicitly so that PubsubIO stays portable across
+    # SDK implementations in different languages.
+    self.coder = coders.BytesCoder()
     self.topic = topic
 
   @property

http://git-wip-us.apache.org/repos/asf/beam/blob/b5852d21/sdks/python/apache_beam/io/gcp/pubsub_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/gcp/pubsub_test.py b/sdks/python/apache_beam/io/gcp/pubsub_test.py
index 322d08a..cf14e8c 100644
--- a/sdks/python/apache_beam/io/gcp/pubsub_test.py
+++ b/sdks/python/apache_beam/io/gcp/pubsub_test.py
@@ -34,9 +34,9 @@ from apache_beam.transforms.display_test import DisplayDataItemMatcher
 
 
 class TestReadStringsFromPubSub(unittest.TestCase):
-  def test_expand(self):
+  def test_expand_with_topic(self):
     p = TestPipeline()
-    pcoll = p | ReadStringsFromPubSub('a_topic', 'a_subscription', 'a_label')
+    pcoll = p | ReadStringsFromPubSub('a_topic', None, 'a_label')
     # Ensure that the output type is str
     self.assertEqual(unicode, pcoll.element_type)
 
@@ -47,9 +47,33 @@ class TestReadStringsFromPubSub(unittest.TestCase):
     # Ensure that the properties passed through correctly
     source = read_pcoll.producer.transform.source
     self.assertEqual('a_topic', source.topic)
+    self.assertEqual('a_label', source.id_label)
+
+  def test_expand_with_subscription(self):
+    p = TestPipeline()
+    pcoll = p | ReadStringsFromPubSub(None, 'a_subscription', 'a_label')
+    # Ensure that the output type is str
+    self.assertEqual(unicode, pcoll.element_type)
+
+    # Ensure that the type on the intermediate read output PCollection is bytes
+    read_pcoll = pcoll.producer.inputs[0]
+    self.assertEqual(bytes, read_pcoll.element_type)
+
+    # Ensure that the properties passed through correctly
+    source = read_pcoll.producer.transform.source
     self.assertEqual('a_subscription', source.subscription)
     self.assertEqual('a_label', source.id_label)
 
+  def test_expand_with_both_topic_and_subscription(self):
+    with self.assertRaisesRegexp(
+        ValueError, "Only one of topic or subscription should be provided."):
+      ReadStringsFromPubSub('a_topic', 'a_subscription', 'a_label')
+
+  def test_expand_with_no_topic_or_subscription(self):
+    with self.assertRaisesRegexp(
+        ValueError, "Either a topic or subscription must be provided."):
+      ReadStringsFromPubSub(None, None, 'a_label')
+
 
 class TestWriteStringsToPubSub(unittest.TestCase):
   def test_expand(self):

http://git-wip-us.apache.org/repos/asf/beam/blob/b5852d21/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
index 3fc8983..d9aa1bf 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
@@ -618,10 +618,12 @@ class DataflowRunner(PipelineRunner):
       if not standard_options.streaming:
         raise ValueError('PubSubPayloadSource is currently available for use '
                          'only in streaming pipelines.')
-      step.add_property(PropertyNames.PUBSUB_TOPIC, transform.source.topic)
-      if transform.source.subscription:
+      # Only one of topic or subscription should be set.
+      if transform.source.topic:
+        step.add_property(PropertyNames.PUBSUB_TOPIC, transform.source.topic)
+      elif transform.source.subscription:
         step.add_property(PropertyNames.PUBSUB_SUBSCRIPTION,
-                          transform.source.topic)
+                          transform.source.subscription)
       if transform.source.id_label:
         step.add_property(PropertyNames.PUBSUB_ID_LABEL,
                           transform.source.id_label)
@@ -639,7 +641,12 @@ class DataflowRunner(PipelineRunner):
     # step should be the type of value outputted by each step.  Read steps
     # automatically wrap output values in a WindowedValue wrapper, if necessary.
     # This is also necessary for proper encoding for size estimation.
-    coder = coders.WindowedValueCoder(transform._infer_output_coder())  # pylint: disable=protected-access
+    # Using a GlobalWindowCoder as a placeholder instead of the default
+    # PickleCoder because GlobalWindowCoder is a known coder.
+    # TODO(robertwb): Query the collection for the windowfn to extract the
+    # correct coder.
+    coder = coders.WindowedValueCoder(transform._infer_output_coder(),
+                                      coders.coders.GlobalWindowCoder())  # pylint: disable=protected-access
 
     step.encoding = self._get_cloud_encoding(coder)
     step.add_property(
@@ -708,8 +715,12 @@ class DataflowRunner(PipelineRunner):
     step.add_property(PropertyNames.FORMAT, transform.sink.format)
 
     # Wrap coder in WindowedValueCoder: this is necessary for proper encoding
-    # for size estimation.
-    coder = coders.WindowedValueCoder(transform.sink.coder)
+    # for size estimation. Using a GlobalWindowCoder as a placeholder instead
+    # of the default PickleCoder because GlobalWindowCoder is a known coder.
+    # TODO(robertwb): Query the collection for the windowfn to extract the
+    # correct coder.
+    coder = coders.WindowedValueCoder(transform.sink.coder,
+                                      coders.coders.GlobalWindowCoder())
     step.encoding = self._get_cloud_encoding(coder)
     step.add_property(PropertyNames.ENCODING, step.encoding)
     step.add_property(