You are viewing a plain text version of this content. The canonical link for it is here (the hyperlink itself was not preserved in this plain-text copy).
Posted to commits@beam.apache.org by al...@apache.org on 2017/06/29 16:46:19 UTC
[1/2] beam git commit: Add PubSub I/O support to Python DirectRunner
Repository: beam
Updated Branches:
refs/heads/master 4d41e25d8 -> 2dd1907c6
Add PubSub I/O support to Python DirectRunner
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/fb7ec28c
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/fb7ec28c
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/fb7ec28c
Branch: refs/heads/master
Commit: fb7ec28cfb1291b04e0eac738054eefe0bb9a103
Parents: 4d41e25
Author: Charles Chen <cc...@google.com>
Authored: Mon Jun 26 18:03:53 2017 -0700
Committer: Ahmet Altay <al...@google.com>
Committed: Thu Jun 29 09:46:03 2017 -0700
----------------------------------------------------------------------
.../apache_beam/examples/streaming_wordcount.py | 12 ++-
sdks/python/apache_beam/io/gcp/pubsub.py | 91 +++++++++++++++-----
sdks/python/apache_beam/io/gcp/pubsub_test.py | 89 +++++++++++--------
.../runners/dataflow/dataflow_runner.py | 11 +--
.../apache_beam/runners/direct/direct_runner.py | 54 ++++++++++++
.../runners/direct/transform_evaluator.py | 89 +++++++++++++++++++
6 files changed, 281 insertions(+), 65 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/fb7ec28c/sdks/python/apache_beam/examples/streaming_wordcount.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/streaming_wordcount.py b/sdks/python/apache_beam/examples/streaming_wordcount.py
index 4c29f2b..7696d77 100644
--- a/sdks/python/apache_beam/examples/streaming_wordcount.py
+++ b/sdks/python/apache_beam/examples/streaming_wordcount.py
@@ -28,6 +28,8 @@ import logging
import apache_beam as beam
+from apache_beam.options.pipeline_options import PipelineOptions
+from apache_beam.options.pipeline_options import StandardOptions
import apache_beam.transforms.window as window
@@ -41,13 +43,17 @@ def run(argv=None):
parser = argparse.ArgumentParser()
parser.add_argument(
'--input_topic', required=True,
- help='Input PubSub topic of the form "/topics/<PROJECT>/<TOPIC>".')
+ help=('Input PubSub topic of the form '
+ '"projects/<PROJECT>/topics/<TOPIC>".'))
parser.add_argument(
'--output_topic', required=True,
- help='Output PubSub topic of the form "/topics/<PROJECT>/<TOPIC>".')
+ help=('Output PubSub topic of the form '
+ '"projects/<PROJECT>/topic/<TOPIC>".'))
known_args, pipeline_args = parser.parse_known_args(argv)
+ options = PipelineOptions(pipeline_args)
+ options.view_as(StandardOptions).streaming = True
- with beam.Pipeline(argv=pipeline_args) as p:
+ with beam.Pipeline(options=options) as p:
# Read from PubSub into a PCollection.
lines = p | beam.io.ReadStringsFromPubSub(known_args.input_topic)
http://git-wip-us.apache.org/repos/asf/beam/blob/fb7ec28c/sdks/python/apache_beam/io/gcp/pubsub.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/gcp/pubsub.py b/sdks/python/apache_beam/io/gcp/pubsub.py
index fabe296..32d388a 100644
--- a/sdks/python/apache_beam/io/gcp/pubsub.py
+++ b/sdks/python/apache_beam/io/gcp/pubsub.py
@@ -24,12 +24,16 @@ This API is currently under development and is subject to change.
from __future__ import absolute_import
+import re
+
from apache_beam import coders
from apache_beam.io.iobase import Read
from apache_beam.io.iobase import Write
from apache_beam.runners.dataflow.native_io import iobase as dataflow_io
+from apache_beam.transforms import core
from apache_beam.transforms import PTransform
from apache_beam.transforms import Map
+from apache_beam.transforms import window
from apache_beam.transforms.display import DisplayDataItem
@@ -43,11 +47,12 @@ class ReadStringsFromPubSub(PTransform):
"""Initializes ``ReadStringsFromPubSub``.
Attributes:
- topic: Cloud Pub/Sub topic in the form "/topics/<project>/<topic>". If
- provided then subscription must be None.
+ topic: Cloud Pub/Sub topic in the form "projects/<project>/topics/
+ <topic>". If provided, subscription must be None.
subscription: Existing Cloud Pub/Sub subscription to use in the
- form "projects/<project>/subscriptions/<subscription>". If provided then
- topic must be None.
+ form "projects/<project>/subscriptions/<subscription>". If not
+ specified, a temporary subscription will be created from the specified
+ topic. If provided, topic must be None.
id_label: The attribute on incoming Pub/Sub messages to use as a unique
record identifier. When specified, the value of this attribute (which
can be any string that uniquely identifies the record) will be used for
@@ -56,17 +61,14 @@ class ReadStringsFromPubSub(PTransform):
case, deduplication of the stream will be strictly best effort.
"""
super(ReadStringsFromPubSub, self).__init__()
- if topic and subscription:
- raise ValueError("Only one of topic or subscription should be provided.")
-
- if not (topic or subscription):
- raise ValueError("Either a topic or subscription must be provided.")
-
self._source = _PubSubPayloadSource(
topic,
subscription=subscription,
id_label=id_label)
+ def get_windowing(self, unused_inputs):
+ return core.Windowing(window.GlobalWindows())
+
def expand(self, pvalue):
pcoll = pvalue.pipeline | Read(self._source)
pcoll.element_type = bytes
@@ -93,15 +95,45 @@ class WriteStringsToPubSub(PTransform):
return pcoll | Write(self._sink)
+PROJECT_ID_REGEXP = '[a-z][-a-z0-9:.]{4,61}[a-z0-9]'
+SUBSCRIPTION_REGEXP = 'projects/([^/]+)/subscriptions/(.+)'
+TOPIC_REGEXP = 'projects/([^/]+)/topics/(.+)'
+
+
+def parse_topic(full_topic):
+ match = re.match(TOPIC_REGEXP, full_topic)
+ if not match:
+ raise ValueError(
+ 'PubSub topic must be in the form "projects/<project>/topics'
+ '/<topic>" (got %r).' % full_topic)
+ project, topic_name = match.group(1), match.group(2)
+ if not re.match(PROJECT_ID_REGEXP, project):
+ raise ValueError('Invalid PubSub project name: %r.' % project)
+ return project, topic_name
+
+
+def parse_subscription(full_subscription):
+ match = re.match(SUBSCRIPTION_REGEXP, full_subscription)
+ if not match:
+ raise ValueError(
+ 'PubSub subscription must be in the form "projects/<project>'
+ '/subscriptions/<subscription>" (got %r).' % full_subscription)
+ project, subscription_name = match.group(1), match.group(2)
+ if not re.match(PROJECT_ID_REGEXP, project):
+ raise ValueError('Invalid PubSub project name: %r.' % project)
+ return project, subscription_name
+
+
class _PubSubPayloadSource(dataflow_io.NativeSource):
"""Source for the payload of a message as bytes from a Cloud Pub/Sub topic.
Attributes:
- topic: Cloud Pub/Sub topic in the form "/topics/<project>/<topic>". If
- provided then topic must be None.
+ topic: Cloud Pub/Sub topic in the form "projects/<project>/topics/<topic>".
+ If provided, subscription must be None.
subscription: Existing Cloud Pub/Sub subscription to use in the
- form "projects/<project>/subscriptions/<subscription>". If provided then
- subscription must be None.
+ form "projects/<project>/subscriptions/<subscription>". If not specified,
+ a temporary subscription will be created from the specified topic. If
+ provided, topic must be None.
id_label: The attribute on incoming Pub/Sub messages to use as a unique
record identifier. When specified, the value of this attribute (which can
be any string that uniquely identifies the record) will be used for
@@ -111,13 +143,26 @@ class _PubSubPayloadSource(dataflow_io.NativeSource):
"""
def __init__(self, topic=None, subscription=None, id_label=None):
- # we are using this coder explicitly for portability reasons of PubsubIO
+ # We are using this coder explicitly for portability reasons of PubsubIO
# across implementations in languages.
self.coder = coders.BytesCoder()
- self.topic = topic
- self.subscription = subscription
+ self.full_topic = topic
+ self.full_subscription = subscription
+ self.topic_name = None
+ self.subscription_name = None
self.id_label = id_label
+ # Perform some validation on the topic and subscription.
+ if not (topic or subscription):
+ raise ValueError('Either a topic or subscription must be provided.')
+ if topic and subscription:
+ raise ValueError('Only one of topic or subscription should be provided.')
+
+ if topic:
+ self.project, self.topic_name = parse_topic(topic)
+ if subscription:
+ self.project, self.subscription_name = parse_subscription(subscription)
+
@property
def format(self):
"""Source format name required for remote execution."""
@@ -128,10 +173,10 @@ class _PubSubPayloadSource(dataflow_io.NativeSource):
DisplayDataItem(self.id_label,
label='ID Label Attribute').drop_if_none(),
'topic':
- DisplayDataItem(self.topic,
- label='Pubsub Topic'),
+ DisplayDataItem(self.full_topic,
+ label='Pubsub Topic').drop_if_none(),
'subscription':
- DisplayDataItem(self.subscription,
+ DisplayDataItem(self.full_subscription,
label='Pubsub Subscription').drop_if_none()}
def reader(self):
@@ -146,7 +191,9 @@ class _PubSubPayloadSink(dataflow_io.NativeSink):
# we are using this coder explicitly for portability reasons of PubsubIO
# across implementations in languages.
self.coder = coders.BytesCoder()
- self.topic = topic
+ self.full_topic = topic
+
+ self.project, self.topic_name = parse_topic(topic)
@property
def format(self):
@@ -154,7 +201,7 @@ class _PubSubPayloadSink(dataflow_io.NativeSink):
return 'pubsub'
def display_data(self):
- return {'topic': DisplayDataItem(self.topic, label='Pubsub Topic')}
+ return {'topic': DisplayDataItem(self.full_topic, label='Pubsub Topic')}
def writer(self):
raise NotImplementedError(
http://git-wip-us.apache.org/repos/asf/beam/blob/fb7ec28c/sdks/python/apache_beam/io/gcp/pubsub_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/gcp/pubsub_test.py b/sdks/python/apache_beam/io/gcp/pubsub_test.py
index 5d3e985..0dcc3c3 100644
--- a/sdks/python/apache_beam/io/gcp/pubsub_test.py
+++ b/sdks/python/apache_beam/io/gcp/pubsub_test.py
@@ -31,89 +31,108 @@ from apache_beam.transforms.display import DisplayData
from apache_beam.transforms.display_test import DisplayDataItemMatcher
+# Protect against environments where the PubSub library is not available.
+# pylint: disable=wrong-import-order, wrong-import-position
+try:
+ from google.cloud import pubsub
+except ImportError:
+ pubsub = None
+# pylint: enable=wrong-import-order, wrong-import-position
+
+
+@unittest.skipIf(pubsub is None, 'GCP dependencies are not installed')
class TestReadStringsFromPubSub(unittest.TestCase):
def test_expand_with_topic(self):
p = TestPipeline()
- pcoll = p | ReadStringsFromPubSub('a_topic', None, 'a_label')
+ pcoll = p | ReadStringsFromPubSub('projects/fakeprj/topics/a_topic',
+ None, 'a_label')
# Ensure that the output type is str
self.assertEqual(unicode, pcoll.element_type)
- # Ensure that the type on the intermediate read output PCollection is bytes
- read_pcoll = pcoll.producer.inputs[0]
- self.assertEqual(bytes, read_pcoll.element_type)
-
# Ensure that the properties passed through correctly
- source = read_pcoll.producer.transform.source
- self.assertEqual('a_topic', source.topic)
+ source = pcoll.producer.transform._source
+ self.assertEqual('a_topic', source.topic_name)
self.assertEqual('a_label', source.id_label)
def test_expand_with_subscription(self):
p = TestPipeline()
- pcoll = p | ReadStringsFromPubSub(None, 'a_subscription', 'a_label')
+ pcoll = p | ReadStringsFromPubSub(
+ None, 'projects/fakeprj/subscriptions/a_subscription', 'a_label')
# Ensure that the output type is str
self.assertEqual(unicode, pcoll.element_type)
- # Ensure that the type on the intermediate read output PCollection is bytes
- read_pcoll = pcoll.producer.inputs[0]
- self.assertEqual(bytes, read_pcoll.element_type)
-
# Ensure that the properties passed through correctly
- source = read_pcoll.producer.transform.source
- self.assertEqual('a_subscription', source.subscription)
+ source = pcoll.producer.transform._source
+ self.assertEqual('a_subscription', source.subscription_name)
self.assertEqual('a_label', source.id_label)
- def test_expand_with_both_topic_and_subscription(self):
- with self.assertRaisesRegexp(
- ValueError, "Only one of topic or subscription should be provided."):
- ReadStringsFromPubSub('a_topic', 'a_subscription', 'a_label')
-
def test_expand_with_no_topic_or_subscription(self):
with self.assertRaisesRegexp(
ValueError, "Either a topic or subscription must be provided."):
ReadStringsFromPubSub(None, None, 'a_label')
+ def test_expand_with_both_topic_and_subscription(self):
+ with self.assertRaisesRegexp(
+ ValueError, "Only one of topic or subscription should be provided."):
+ ReadStringsFromPubSub('a_topic', 'a_subscription', 'a_label')
+
+@unittest.skipIf(pubsub is None, 'GCP dependencies are not installed')
class TestWriteStringsToPubSub(unittest.TestCase):
def test_expand(self):
p = TestPipeline()
- pdone = p | ReadStringsFromPubSub('baz') | WriteStringsToPubSub('a_topic')
+ pdone = (p
+ | ReadStringsFromPubSub('projects/fakeprj/topics/baz')
+ | WriteStringsToPubSub('projects/fakeprj/topics/a_topic'))
# Ensure that the properties passed through correctly
- sink = pdone.producer.transform.sink
- self.assertEqual('a_topic', sink.topic)
-
- # Ensure that the type on the intermediate payload transformer output
- # PCollection is bytes
- write_pcoll = pdone.producer.inputs[0]
- self.assertEqual(bytes, write_pcoll.element_type)
+ self.assertEqual('a_topic', pdone.producer.transform.dofn.topic_name)
+@unittest.skipIf(pubsub is None, 'GCP dependencies are not installed')
class TestPubSubSource(unittest.TestCase):
- def test_display_data(self):
- source = _PubSubPayloadSource('a_topic', 'a_subscription', 'a_label')
+ def test_display_data_topic(self):
+ source = _PubSubPayloadSource(
+ 'projects/fakeprj/topics/a_topic',
+ None,
+ 'a_label')
+ dd = DisplayData.create_from(source)
+ expected_items = [
+ DisplayDataItemMatcher(
+ 'topic', 'projects/fakeprj/topics/a_topic'),
+ DisplayDataItemMatcher('id_label', 'a_label')]
+
+ hc.assert_that(dd.items, hc.contains_inanyorder(*expected_items))
+
+ def test_display_data_subscription(self):
+ source = _PubSubPayloadSource(
+ None,
+ 'projects/fakeprj/subscriptions/a_subscription',
+ 'a_label')
dd = DisplayData.create_from(source)
expected_items = [
- DisplayDataItemMatcher('topic', 'a_topic'),
- DisplayDataItemMatcher('subscription', 'a_subscription'),
+ DisplayDataItemMatcher(
+ 'subscription', 'projects/fakeprj/subscriptions/a_subscription'),
DisplayDataItemMatcher('id_label', 'a_label')]
hc.assert_that(dd.items, hc.contains_inanyorder(*expected_items))
def test_display_data_no_subscription(self):
- source = _PubSubPayloadSource('a_topic')
+ source = _PubSubPayloadSource('projects/fakeprj/topics/a_topic')
dd = DisplayData.create_from(source)
expected_items = [
- DisplayDataItemMatcher('topic', 'a_topic')]
+ DisplayDataItemMatcher('topic', 'projects/fakeprj/topics/a_topic')]
hc.assert_that(dd.items, hc.contains_inanyorder(*expected_items))
+@unittest.skipIf(pubsub is None, 'GCP dependencies are not installed')
class TestPubSubSink(unittest.TestCase):
def test_display_data(self):
- sink = _PubSubPayloadSink('a_topic')
+ sink = _PubSubPayloadSink('projects/fakeprj/topics/a_topic')
dd = DisplayData.create_from(sink)
expected_items = [
- DisplayDataItemMatcher('topic', 'a_topic')]
+ DisplayDataItemMatcher('topic', 'projects/fakeprj/topics/a_topic')]
hc.assert_that(dd.items, hc.contains_inanyorder(*expected_items))
http://git-wip-us.apache.org/repos/asf/beam/blob/fb7ec28c/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
index f213b3b..57bcc5e 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
@@ -668,11 +668,12 @@ class DataflowRunner(PipelineRunner):
raise ValueError('PubSubPayloadSource is currently available for use '
'only in streaming pipelines.')
# Only one of topic or subscription should be set.
- if transform.source.topic:
- step.add_property(PropertyNames.PUBSUB_TOPIC, transform.source.topic)
- elif transform.source.subscription:
+ if transform.source.full_subscription:
step.add_property(PropertyNames.PUBSUB_SUBSCRIPTION,
- transform.source.subscription)
+ transform.source.full_subscription)
+ elif transform.source.full_topic:
+ step.add_property(PropertyNames.PUBSUB_TOPIC,
+ transform.source.full_topic)
if transform.source.id_label:
step.add_property(PropertyNames.PUBSUB_ID_LABEL,
transform.source.id_label)
@@ -756,7 +757,7 @@ class DataflowRunner(PipelineRunner):
if not standard_options.streaming:
raise ValueError('PubSubPayloadSink is currently available for use '
'only in streaming pipelines.')
- step.add_property(PropertyNames.PUBSUB_TOPIC, transform.sink.topic)
+ step.add_property(PropertyNames.PUBSUB_TOPIC, transform.sink.full_topic)
else:
raise ValueError(
'Sink %r has unexpected format %s.' % (
http://git-wip-us.apache.org/repos/asf/beam/blob/fb7ec28c/sdks/python/apache_beam/runners/direct/direct_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/direct_runner.py b/sdks/python/apache_beam/runners/direct/direct_runner.py
index 2a75977..1a94b3d 100644
--- a/sdks/python/apache_beam/runners/direct/direct_runner.py
+++ b/sdks/python/apache_beam/runners/direct/direct_runner.py
@@ -26,8 +26,10 @@ from __future__ import absolute_import
import collections
import logging
+import apache_beam as beam
from apache_beam import typehints
from apache_beam.metrics.execution import MetricsEnvironment
+from apache_beam.pvalue import PCollection
from apache_beam.runners.direct.bundle_factory import BundleFactory
from apache_beam.runners.runner import PipelineResult
from apache_beam.runners.runner import PipelineRunner
@@ -107,6 +109,58 @@ class DirectRunner(PipelineRunner):
.with_output_types(*type_hints.output_types[0]))
return transform.expand(pcoll)
+ def apply_ReadStringsFromPubSub(self, transform, pcoll):
+ try:
+ from google.cloud import pubsub as unused_pubsub
+ except ImportError:
+ raise ImportError('Google Cloud PubSub not available, please install '
+ 'apache_beam[gcp]')
+ # Execute this as a native transform.
+ output = PCollection(pcoll.pipeline)
+ output.element_type = unicode
+ return output
+
+ def apply_WriteStringsToPubSub(self, transform, pcoll):
+ try:
+ from google.cloud import pubsub
+ except ImportError:
+ raise ImportError('Google Cloud PubSub not available, please install '
+ 'apache_beam[gcp]')
+ project = transform._sink.project
+ topic_name = transform._sink.topic_name
+
+ class DirectWriteToPubSub(beam.DoFn):
+ _topic = None
+
+ def __init__(self, project, topic_name):
+ self.project = project
+ self.topic_name = topic_name
+
+ def start_bundle(self):
+ if self._topic is None:
+ self._topic = pubsub.Client(project=self.project).topic(
+ self.topic_name)
+ self._buffer = []
+
+ def process(self, elem):
+ self._buffer.append(elem.encode('utf-8'))
+ if len(self._buffer) >= 100:
+ self._flush()
+
+ def finish_bundle(self):
+ self._flush()
+
+ def _flush(self):
+ if self._buffer:
+ with self._topic.batch() as batch:
+ for datum in self._buffer:
+ batch.publish(datum)
+ self._buffer = []
+
+ output = pcoll | beam.ParDo(DirectWriteToPubSub(project, topic_name))
+ output.element_type = unicode
+ return output
+
def run(self, pipeline):
"""Execute the entire pipeline and returns an DirectPipelineResult."""
http://git-wip-us.apache.org/repos/asf/beam/blob/fb7ec28c/sdks/python/apache_beam/runners/direct/transform_evaluator.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/transform_evaluator.py b/sdks/python/apache_beam/runners/direct/transform_evaluator.py
index 67b2492..641291d 100644
--- a/sdks/python/apache_beam/runners/direct/transform_evaluator.py
+++ b/sdks/python/apache_beam/runners/direct/transform_evaluator.py
@@ -20,6 +20,8 @@
from __future__ import absolute_import
import collections
+import random
+import time
from apache_beam import coders
from apache_beam import pvalue
@@ -48,6 +50,7 @@ from apache_beam.typehints.typecheck import OutputCheckWrapperDoFn
from apache_beam.typehints.typecheck import TypeCheckError
from apache_beam.typehints.typecheck import TypeCheckWrapperDoFn
from apache_beam.utils import counters
+from apache_beam.utils.timestamp import Timestamp
from apache_beam.utils.timestamp import MIN_TIMESTAMP
from apache_beam.options.pipeline_options import TypeOptions
@@ -63,6 +66,7 @@ class TransformEvaluatorRegistry(object):
self._evaluation_context = evaluation_context
self._evaluators = {
io.Read: _BoundedReadEvaluator,
+ io.ReadStringsFromPubSub: _PubSubReadEvaluator,
core.Flatten: _FlattenEvaluator,
core.ParDo: _ParDoEvaluator,
core._GroupByKeyOnly: _GroupByKeyOnlyEvaluator,
@@ -357,6 +361,91 @@ class _TestStreamEvaluator(_TransformEvaluator):
{None: hold})
+class _PubSubSubscriptionWrapper(object):
+ """Wrapper for garbage-collecting temporary PubSub subscriptions."""
+
+ def __init__(self, subscription, should_cleanup):
+ self.subscription = subscription
+ self.should_cleanup = should_cleanup
+
+ def __del__(self):
+ if self.should_cleanup:
+ self.subscription.delete()
+
+
+class _PubSubReadEvaluator(_TransformEvaluator):
+ """TransformEvaluator for PubSub read."""
+
+ _subscription_cache = {}
+
+ def __init__(self, evaluation_context, applied_ptransform,
+ input_committed_bundle, side_inputs, scoped_metrics_container):
+ assert not side_inputs
+ super(_PubSubReadEvaluator, self).__init__(
+ evaluation_context, applied_ptransform, input_committed_bundle,
+ side_inputs, scoped_metrics_container)
+
+ source = self._applied_ptransform.transform._source
+ self._subscription = _PubSubReadEvaluator.get_subscription(
+ self._applied_ptransform, source.project, source.topic_name,
+ source.subscription_name)
+
+ @classmethod
+ def get_subscription(cls, transform, project, topic, subscription_name):
+ if transform not in cls._subscription_cache:
+ from google.cloud import pubsub
+ should_create = not subscription_name
+ if should_create:
+ subscription_name = 'beam_%d_%x' % (
+ int(time.time()), random.randrange(1 << 32))
+ cls._subscription_cache[transform] = _PubSubSubscriptionWrapper(
+ pubsub.Client(project=project).topic(topic).subscription(
+ subscription_name),
+ should_create)
+ if should_create:
+ cls._subscription_cache[transform].subscription.create()
+ return cls._subscription_cache[transform].subscription
+
+ def start_bundle(self):
+ pass
+
+ def process_element(self, element):
+ pass
+
+ def _read_from_pubsub(self):
+ from google.cloud import pubsub
+ # Because of the AutoAck, we are not able to reread messages if this
+ # evaluator fails with an exception before emitting a bundle. However,
+ # the DirectRunner currently doesn't retry work items anyway, so the
+ # pipeline would enter an inconsistent state on any error.
+ with pubsub.subscription.AutoAck(
+ self._subscription, return_immediately=True,
+ max_messages=10) as results:
+ return [message.data for unused_ack_id, message in results.items()]
+
+ def finish_bundle(self):
+ data = self._read_from_pubsub()
+ if data:
+ output_pcollection = list(self._outputs)[0]
+ bundle = self._evaluation_context.create_bundle(output_pcollection)
+ # TODO(ccy): we currently do not use the PubSub message timestamp or
+ # respect the PubSub source's id_label field.
+ now = Timestamp.of(time.time())
+ for message_data in data:
+ bundle.output(GlobalWindows.windowed_value(message_data, timestamp=now))
+ bundles = [bundle]
+ else:
+ bundles = []
+ input_pvalue = self._applied_ptransform.inputs
+ if not input_pvalue:
+ input_pvalue = pvalue.PBegin(self._applied_ptransform.transform.pipeline)
+ unprocessed_bundle = self._evaluation_context.create_bundle(
+ input_pvalue)
+ return TransformResult(
+ self._applied_ptransform, bundles,
+ [unprocessed_bundle], None, {None: Timestamp.of(time.time())})
+
+
class _FlattenEvaluator(_TransformEvaluator):
"""TransformEvaluator for Flatten transform."""
[2/2] beam git commit: This closes #3454
Posted by al...@apache.org.
This closes #3454
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/2dd1907c
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/2dd1907c
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/2dd1907c
Branch: refs/heads/master
Commit: 2dd1907c65cdd04f2a54f2ef1368ee39f72c19fe
Parents: 4d41e25 fb7ec28
Author: Ahmet Altay <al...@google.com>
Authored: Thu Jun 29 09:46:07 2017 -0700
Committer: Ahmet Altay <al...@google.com>
Committed: Thu Jun 29 09:46:07 2017 -0700
----------------------------------------------------------------------
.../apache_beam/examples/streaming_wordcount.py | 12 ++-
sdks/python/apache_beam/io/gcp/pubsub.py | 91 +++++++++++++++-----
sdks/python/apache_beam/io/gcp/pubsub_test.py | 89 +++++++++++--------
.../runners/dataflow/dataflow_runner.py | 11 +--
.../apache_beam/runners/direct/direct_runner.py | 54 ++++++++++++
.../runners/direct/transform_evaluator.py | 89 +++++++++++++++++++
6 files changed, 281 insertions(+), 65 deletions(-)
----------------------------------------------------------------------