You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by da...@apache.org on 2016/06/14 23:13:23 UTC
[48/50] [abbrv] incubator-beam git commit: Move all files to
apache_beam folder
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/apache_beam/coders/typecoders.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/coders/typecoders.py b/sdks/python/apache_beam/coders/typecoders.py
new file mode 100644
index 0000000..98cf2b5
--- /dev/null
+++ b/sdks/python/apache_beam/coders/typecoders.py
@@ -0,0 +1,154 @@
+# Copyright 2016 Google Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Type coders registration.
+
+This module contains functionality to define and use coders for custom classes.
+Let's say we have a class Xyz and we are processing a PCollection with elements
+of type Xyz. If we do not register a coder for Xyz, a default pickle-based
+fallback coder will be used. This can be undesirable for two reasons. First, we
+may want a faster coder or a more space efficient one. Second, the pickle-based
+coder is not deterministic in the sense that objects like dictionaries or sets
+are not guaranteed to be encoded in the same way every time (elements are not
+really ordered).
+
+Two (sometimes three) steps are needed to define and use a custom coder:
+ - define the coder class
+ - associate the coder with the class (a.k.a. coder registration)
+ - typehint DoFns or transforms with the new class or composite types using
+ the class.
+
+A coder class is defined by subclassing from coders.Coder and defining the
+encode and decode methods. The framework uses duck-typing for coders so it is
+not strictly required to subclass from coders.Coder as long as the
+encode/decode methods are defined.
+
+Registering a coder class is made with a register_coder() call::
+
+ from google.cloud.dataflow import coders
+ ...
+ coders.registry.register_coder(Xyz, XyzCoder)
+
+Additionally, DoFns and PTransforms may need type hints. This is not always
+necessary since there is functionality to infer the return types of DoFns by
+analyzing the code. For instance, for the function below the return type of
+'Xyz' will be inferred::
+
+ def MakeXyzs(v):
+ return Xyz(v)
+
+If Xyz is inferred then its coder will be used whenever the framework needs to
+serialize data (e.g., writing to the shuffler subsystem responsible for group by
+key operations). If a typehint is needed it can be specified by decorating the
+DoFns or using with_input_types/with_output_types methods on PTransforms. For
+example, the above function can be decorated::
+
+ @with_output_types(Xyz)
+ def MakeXyzs(v):
+ return complex_operation_returning_Xyz(v)
+
+See google.cloud.dataflow.typehints.decorators module for more details.
+"""
+
+import logging
+
+from google.cloud.dataflow.coders import coders
+from google.cloud.dataflow.typehints import typehints
+
+
+class CoderRegistry(object):
+ """A coder registry for typehint/coder associations."""
+
+ def __init__(self, fallback_coder=None):
+ self._coders = {}
+ self.custom_types = []
+ self.register_standard_coders(fallback_coder)
+
+ def register_standard_coders(self, fallback_coder):
+ """Register coders for all basic and composite types."""
+ self._register_coder_internal(int, coders.VarIntCoder)
+ self._register_coder_internal(float, coders.FloatCoder)
+ self._register_coder_internal(str, coders.BytesCoder)
+ self._register_coder_internal(bytes, coders.BytesCoder)
+ self._register_coder_internal(unicode, coders.StrUtf8Coder)
+ self._register_coder_internal(typehints.TupleConstraint, coders.TupleCoder)
+ self._register_coder_internal(typehints.AnyTypeConstraint,
+ coders.PickleCoder)
+ self._fallback_coder = fallback_coder or coders.PickleCoder
+
+ def _register_coder_internal(self, typehint_type, typehint_coder_class):
+ self._coders[typehint_type] = typehint_coder_class
+
+ def register_coder(self, typehint_type, typehint_coder_class):
+ if not isinstance(typehint_coder_class, type):
+ raise TypeError('Coder registration requires a coder class object. '
+ 'Received %r instead.' % typehint_coder_class)
+ if typehint_type not in self.custom_types:
+ self.custom_types.append(typehint_type)
+ self._register_coder_internal(typehint_type, typehint_coder_class)
+
+ def get_coder(self, typehint):
+ coder = self._coders.get(
+ typehint.__class__ if isinstance(typehint, typehints.TypeConstraint)
+ else typehint, None)
+ if isinstance(typehint, typehints.TypeConstraint) and coder is not None:
+ return coder.from_type_hint(typehint, self)
+ if coder is None:
+ # We use the fallback coder when there is no coder registered for a
+ # typehint. For example a user defined class with no coder specified.
+ if not hasattr(self, '_fallback_coder'):
+ raise RuntimeError(
+ 'Coder registry has no fallback coder. This can happen if the '
+ 'fast_coders module could not be imported.')
+ if isinstance(typehint, typehints.IterableTypeConstraint):
+ # In this case, we suppress the warning message for using the fallback
+ # coder, since Iterable is hinted as the output of a GroupByKey
+ # operation and that direct output will not be coded.
+ # TODO(ccy): refine this behavior.
+ pass
+ elif typehint is None:
+ # In some old code, None is used for Any.
+ # TODO(robertwb): Clean this up.
+ pass
+ elif isinstance(typehint, typehints.TypeVariable):
+ # TODO(robertwb): Clean this up when type inference is fully enabled.
+ pass
+ else:
+ logging.warning('Using fallback coder for typehint: %r.', typehint)
+ coder = self._fallback_coder
+ return coder.from_type_hint(typehint, self)
+
+ def get_custom_type_coder_tuples(self, types):
+ """Returns type/coder tuples for all custom types passed in."""
+ return [(t, self._coders[t]) for t in types if t in self.custom_types]
+
+ def verify_deterministic(self, key_coder, op_name, silent=True):
+ if not key_coder.is_deterministic():
+ error_msg = ('The key coder "%s" for %s '
+ 'is not deterministic. This may result in incorrect '
+ 'pipeline output. This can be fixed by adding a type '
+ 'hint to the operation preceding the GroupByKey step, '
+ 'and for custom key classes, by writing a '
+ 'deterministic custom Coder. Please see the '
+ 'documentation for more details.' % (key_coder, op_name))
+ if isinstance(key_coder, (coders.PickleCoder, self._fallback_coder)):
+ if not silent:
+ logging.warning(error_msg)
+ return coders.DeterministicPickleCoder(key_coder, op_name)
+ else:
+ raise ValueError(error_msg)
+ else:
+ return key_coder
+
+registry = CoderRegistry()
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/apache_beam/coders/typecoders_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/coders/typecoders_test.py b/sdks/python/apache_beam/coders/typecoders_test.py
new file mode 100644
index 0000000..ed46ede
--- /dev/null
+++ b/sdks/python/apache_beam/coders/typecoders_test.py
@@ -0,0 +1,114 @@
+# Copyright 2016 Google Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Unit tests for the typecoders module."""
+
+import unittest
+
+from google.cloud.dataflow.coders import coders
+from google.cloud.dataflow.coders import typecoders
+from google.cloud.dataflow.internal import pickler
+from google.cloud.dataflow.typehints import typehints
+
+
+class CustomClass(object):
+
+ def __init__(self, n):
+ self.number = n
+
+ def __eq__(self, other):
+ return self.number == other.number
+
+
+class CustomCoder(coders.Coder):
+
+ def encode(self, value):
+ return str(value.number)
+
+ def decode(self, encoded):
+ return CustomClass(int(encoded))
+
+ def is_deterministic(self):
+    # This coder is deterministic. Though we don't need this coder to be
+    # deterministic for this test, we annotate this as such to follow best
+    # practices.
+ return True
+
+
+class TypeCodersTest(unittest.TestCase):
+
+ def test_register_non_type_coder(self):
+ coder = CustomCoder()
+ with self.assertRaises(TypeError) as e:
+ # When registering a coder the coder class must be specified.
+ typecoders.registry.register_coder(CustomClass, coder)
+ self.assertEqual(e.exception.message,
+ 'Coder registration requires a coder class object. '
+ 'Received %r instead.' % coder)
+
+ def test_get_coder_with_custom_coder(self):
+ typecoders.registry.register_coder(CustomClass, CustomCoder)
+ self.assertEqual(CustomCoder,
+ typecoders.registry.get_coder(CustomClass).__class__)
+
+ def test_get_coder_with_composite_custom_coder(self):
+ typecoders.registry.register_coder(CustomClass, CustomCoder)
+ coder = typecoders.registry.get_coder(typehints.KV[CustomClass, str])
+ revived_coder = pickler.loads(pickler.dumps(coder))
+ self.assertEqual(
+ (CustomClass(123), 'abc'),
+ revived_coder.decode(revived_coder.encode((CustomClass(123), 'abc'))))
+
+ def test_get_coder_with_standard_coder(self):
+ self.assertEqual(coders.BytesCoder,
+ typecoders.registry.get_coder(str).__class__)
+
+ def test_fallbackcoder(self):
+ coder = typecoders.registry.get_coder(typehints.Any)
+ self.assertEqual(('abc', 123), coder.decode(coder.encode(('abc', 123))))
+
+ def test_get_coder_can_be_pickled(self):
+ coder = typecoders.registry.get_coder(typehints.Tuple[str, int])
+ revived_coder = pickler.loads(pickler.dumps(coder))
+ self.assertEqual(('abc', 123),
+ revived_coder.decode(revived_coder.encode(('abc', 123))))
+
+ def test_standard_int_coder(self):
+ real_coder = typecoders.registry.get_coder(int)
+ expected_coder = coders.VarIntCoder()
+ self.assertEqual(
+ real_coder.encode(0x0404), expected_coder.encode(0x0404))
+ self.assertEqual(0x0404, real_coder.decode(real_coder.encode(0x0404)))
+ self.assertEqual(
+ real_coder.encode(0x040404040404),
+ expected_coder.encode(0x040404040404))
+ self.assertEqual(0x040404040404,
+ real_coder.decode(real_coder.encode(0x040404040404)))
+
+ def test_standard_str_coder(self):
+ real_coder = typecoders.registry.get_coder(str)
+ expected_coder = coders.BytesCoder()
+ self.assertEqual(
+ real_coder.encode('abc'), expected_coder.encode('abc'))
+ self.assertEqual('abc', real_coder.decode(real_coder.encode('abc')))
+
+ real_coder = typecoders.registry.get_coder(bytes)
+ expected_coder = coders.BytesCoder()
+ self.assertEqual(
+ real_coder.encode('abc'), expected_coder.encode('abc'))
+ self.assertEqual('abc', real_coder.decode(real_coder.encode('abc')))
+
+
+if __name__ == '__main__':
+ unittest.main()
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/apache_beam/dataflow_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/dataflow_test.py b/sdks/python/apache_beam/dataflow_test.py
new file mode 100644
index 0000000..c40b88f
--- /dev/null
+++ b/sdks/python/apache_beam/dataflow_test.py
@@ -0,0 +1,405 @@
+# Copyright 2016 Google Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Integration tests for the dataflow package."""
+
+from __future__ import absolute_import
+
+import logging
+import re
+import unittest
+
+from google.cloud.dataflow.pipeline import Pipeline
+from google.cloud.dataflow.pvalue import AsDict
+from google.cloud.dataflow.pvalue import AsIter as AllOf
+from google.cloud.dataflow.pvalue import AsList
+from google.cloud.dataflow.pvalue import AsSingleton
+from google.cloud.dataflow.pvalue import EmptySideInput
+from google.cloud.dataflow.pvalue import SideOutputValue
+from google.cloud.dataflow.transforms import Create
+from google.cloud.dataflow.transforms import DoFn
+from google.cloud.dataflow.transforms import FlatMap
+from google.cloud.dataflow.transforms import GroupByKey
+from google.cloud.dataflow.transforms import Map
+from google.cloud.dataflow.transforms import ParDo
+from google.cloud.dataflow.transforms import WindowInto
+from google.cloud.dataflow.transforms.util import assert_that
+from google.cloud.dataflow.transforms.util import equal_to
+from google.cloud.dataflow.transforms.window import IntervalWindow
+from google.cloud.dataflow.transforms.window import WindowFn
+
+
+class DataflowTest(unittest.TestCase):
+ """Dataflow integration tests."""
+
+ SAMPLE_DATA = 'aa bb cc aa bb aa \n' * 10
+ SAMPLE_RESULT = [('cc', 10), ('bb', 20), ('aa', 30)]
+
+  # TODO(silviuc): Figure out a nice way to specify labels for stages so that
+  # internal steps get prepended with surrounding stage names.
+ @staticmethod
+ def Count(pcoll): # pylint: disable=invalid-name
+ """A Count transform: v, ... => (v, n), ..."""
+ return (pcoll
+ | Map('AddCount', lambda x: (x, 1))
+ | GroupByKey('GroupCounts')
+ | Map('AddCounts', lambda (x, ones): (x, sum(ones))))
+
+ def test_word_count(self):
+ pipeline = Pipeline('DirectPipelineRunner')
+ lines = pipeline | Create('SomeWords', [DataflowTest.SAMPLE_DATA])
+ result = (
+ (lines | FlatMap('GetWords', lambda x: re.findall(r'\w+', x)))
+ .apply('CountWords', DataflowTest.Count))
+ assert_that(result, equal_to(DataflowTest.SAMPLE_RESULT))
+ pipeline.run()
+
+ def test_map(self):
+ pipeline = Pipeline('DirectPipelineRunner')
+ lines = pipeline | Create('input', ['a', 'b', 'c'])
+ result = (lines
+ | Map('upper', str.upper)
+ | Map('prefix', lambda x, prefix: prefix + x, 'foo-'))
+ assert_that(result, equal_to(['foo-A', 'foo-B', 'foo-C']))
+ pipeline.run()
+
+ def test_word_count_using_get(self):
+ pipeline = Pipeline('DirectPipelineRunner')
+ lines = pipeline | Create('SomeWords', [DataflowTest.SAMPLE_DATA])
+ result = (
+ (lines | FlatMap('GetWords', lambda x: re.findall(r'\w+', x)))
+ .apply('CountWords', DataflowTest.Count))
+ assert_that(result, equal_to(DataflowTest.SAMPLE_RESULT))
+ pipeline.run()
+
+ def test_par_do_with_side_input_as_arg(self):
+ pipeline = Pipeline('DirectPipelineRunner')
+ words_list = ['aa', 'bb', 'cc']
+ words = pipeline | Create('SomeWords', words_list)
+ prefix = pipeline | Create('SomeString', ['xyz']) # side in
+ suffix = 'zyx'
+ result = words | FlatMap(
+ 'DecorateWords',
+ lambda x, pfx, sfx: ['%s-%s-%s' % (pfx, x, sfx)],
+ AsSingleton(prefix), suffix)
+ assert_that(result, equal_to(['xyz-%s-zyx' % x for x in words_list]))
+ pipeline.run()
+
+ def test_par_do_with_side_input_as_keyword_arg(self):
+ pipeline = Pipeline('DirectPipelineRunner')
+ words_list = ['aa', 'bb', 'cc']
+ words = pipeline | Create('SomeWords', words_list)
+ prefix = 'zyx'
+ suffix = pipeline | Create('SomeString', ['xyz']) # side in
+ result = words | FlatMap(
+ 'DecorateWords',
+ lambda x, pfx, sfx: ['%s-%s-%s' % (pfx, x, sfx)],
+ prefix, sfx=AsSingleton(suffix))
+ assert_that(result, equal_to(['zyx-%s-xyz' % x for x in words_list]))
+ pipeline.run()
+
+ def test_par_do_with_do_fn_object(self):
+ class SomeDoFn(DoFn):
+ """A custom DoFn for a FlatMap transform."""
+
+ def process(self, context, prefix, suffix):
+ return ['%s-%s-%s' % (prefix, context.element, suffix)]
+
+ pipeline = Pipeline('DirectPipelineRunner')
+ words_list = ['aa', 'bb', 'cc']
+ words = pipeline | Create('SomeWords', words_list)
+ prefix = 'zyx'
+ suffix = pipeline | Create('SomeString', ['xyz']) # side in
+ result = words | ParDo('DecorateWordsDoFn', SomeDoFn(), prefix,
+ suffix=AsSingleton(suffix))
+ assert_that(result, equal_to(['zyx-%s-xyz' % x for x in words_list]))
+ pipeline.run()
+
+ def test_par_do_with_multiple_outputs_and_using_yield(self):
+ class SomeDoFn(DoFn):
+ """A custom DoFn using yield."""
+
+ def process(self, context):
+ yield context.element
+ if context.element % 2 == 0:
+ yield SideOutputValue('even', context.element)
+ else:
+ yield SideOutputValue('odd', context.element)
+
+ pipeline = Pipeline('DirectPipelineRunner')
+ nums = pipeline | Create('Some Numbers', [1, 2, 3, 4])
+ results = nums | ParDo(
+ 'ClassifyNumbers', SomeDoFn()).with_outputs('odd', 'even', main='main')
+ assert_that(results.main, equal_to([1, 2, 3, 4]))
+ assert_that(results.odd, equal_to([1, 3]), label='assert:odd')
+ assert_that(results.even, equal_to([2, 4]), label='assert:even')
+ pipeline.run()
+
+ def test_par_do_with_multiple_outputs_and_using_return(self):
+ def some_fn(v):
+ if v % 2 == 0:
+ return [v, SideOutputValue('even', v)]
+ else:
+ return [v, SideOutputValue('odd', v)]
+
+ pipeline = Pipeline('DirectPipelineRunner')
+ nums = pipeline | Create('Some Numbers', [1, 2, 3, 4])
+ results = nums | FlatMap(
+ 'ClassifyNumbers', some_fn).with_outputs('odd', 'even', main='main')
+ assert_that(results.main, equal_to([1, 2, 3, 4]))
+ assert_that(results.odd, equal_to([1, 3]), label='assert:odd')
+ assert_that(results.even, equal_to([2, 4]), label='assert:even')
+ pipeline.run()
+
+ def test_empty_singleton_side_input(self):
+ pipeline = Pipeline('DirectPipelineRunner')
+ pcol = pipeline | Create('start', [1, 2])
+ side = pipeline | Create('side', []) # Empty side input.
+
+ def my_fn(k, s):
+ v = ('empty' if isinstance(s, EmptySideInput) else 'full')
+ return [(k, v)]
+ result = pcol | FlatMap('compute', my_fn, AsSingleton(side))
+ assert_that(result, equal_to([(1, 'empty'), (2, 'empty')]))
+ pipeline.run()
+
+ def test_multi_valued_singleton_side_input(self):
+ pipeline = Pipeline('DirectPipelineRunner')
+ pcol = pipeline | Create('start', [1, 2])
+ side = pipeline | Create('side', [3, 4]) # 2 values in side input.
+ pcol | FlatMap('compute', lambda x, s: [x * s], AsSingleton(side))
+ with self.assertRaises(ValueError) as e:
+ pipeline.run()
+
+ def test_default_value_singleton_side_input(self):
+ pipeline = Pipeline('DirectPipelineRunner')
+ pcol = pipeline | Create('start', [1, 2])
+ side = pipeline | Create('side', []) # 0 values in side input.
+ result = (
+ pcol | FlatMap('compute', lambda x, s: [x * s], AsSingleton(side, 10)))
+ assert_that(result, equal_to([10, 20]))
+ pipeline.run()
+
+ def test_iterable_side_input(self):
+ pipeline = Pipeline('DirectPipelineRunner')
+ pcol = pipeline | Create('start', [1, 2])
+ side = pipeline | Create('side', [3, 4]) # 2 values in side input.
+ result = pcol | FlatMap('compute',
+ lambda x, s: [x * y for y in s], AllOf(side))
+ assert_that(result, equal_to([3, 4, 6, 8]))
+ pipeline.run()
+
+ def test_undeclared_side_outputs(self):
+ pipeline = Pipeline('DirectPipelineRunner')
+ nums = pipeline | Create('Some Numbers', [1, 2, 3, 4])
+ results = nums | FlatMap(
+ 'ClassifyNumbers',
+ lambda x: [x, SideOutputValue('even' if x % 2 == 0 else 'odd', x)]
+ ).with_outputs()
+ # TODO(silviuc): Revisit this test to check for undeclared side outputs.
+ # This should work with .with_outputs() without any tags declared and
+ # the results[None] should work also.
+ assert_that(results[None], equal_to([1, 2, 3, 4]))
+ assert_that(results.odd, equal_to([1, 3]), label='assert:odd')
+ assert_that(results.even, equal_to([2, 4]), label='assert:even')
+ pipeline.run()
+
+ def test_empty_side_outputs(self):
+ pipeline = Pipeline('DirectPipelineRunner')
+ nums = pipeline | Create('Some Numbers', [1, 3, 5])
+ results = nums | FlatMap(
+ 'ClassifyNumbers',
+ lambda x: [x, SideOutputValue('even' if x % 2 == 0 else 'odd', x)]
+ ).with_outputs()
+ assert_that(results[None], equal_to([1, 3, 5]))
+ assert_that(results.odd, equal_to([1, 3, 5]), label='assert:odd')
+ assert_that(results.even, equal_to([]), label='assert:even')
+ pipeline.run()
+
+ def test_as_list_and_as_dict_side_inputs(self):
+ a_list = [5, 1, 3, 2, 9]
+ some_pairs = [('crouton', 17), ('supreme', None)]
+ pipeline = Pipeline('DirectPipelineRunner')
+ main_input = pipeline | Create('main input', [1])
+ side_list = pipeline | Create('side list', a_list)
+ side_pairs = pipeline | Create('side pairs', some_pairs)
+ results = main_input | FlatMap(
+ 'concatenate',
+ lambda x, the_list, the_dict: [[x, the_list, the_dict]],
+ AsList(side_list), AsDict(side_pairs))
+
+ def matcher(expected_elem, expected_list, expected_pairs):
+ def match(actual):
+ [[actual_elem, actual_list, actual_dict]] = actual
+ equal_to([expected_elem])([actual_elem])
+ equal_to(expected_list)(actual_list)
+ equal_to(expected_pairs)(actual_dict.iteritems())
+ return match
+
+ assert_that(results, matcher(1, a_list, some_pairs))
+ pipeline.run()
+
+ def test_as_singleton_without_unique_labels(self):
+ # This should succeed as calling AsSingleton on the same PCollection twice
+ # with the same defaults will return the same PCollectionView.
+ a_list = [2]
+ pipeline = Pipeline('DirectPipelineRunner')
+ main_input = pipeline | Create('main input', [1])
+ side_list = pipeline | Create('side list', a_list)
+ results = main_input | FlatMap(
+ 'test',
+ lambda x, s1, s2: [[x, s1, s2]],
+ AsSingleton(side_list), AsSingleton(side_list))
+
+ def matcher(expected_elem, expected_singleton):
+ def match(actual):
+ [[actual_elem, actual_singleton1, actual_singleton2]] = actual
+ equal_to([expected_elem])([actual_elem])
+ equal_to([expected_singleton])([actual_singleton1])
+ equal_to([expected_singleton])([actual_singleton2])
+ return match
+
+ assert_that(results, matcher(1, 2))
+ pipeline.run()
+
+ def test_as_singleton_with_different_defaults_without_unique_labels(self):
+ # This should fail as AsSingleton with distinct default values should create
+ # distinct PCollectionViews with the same full_label.
+ a_list = [2]
+ pipeline = Pipeline('DirectPipelineRunner')
+ main_input = pipeline | Create('main input', [1])
+ side_list = pipeline | Create('side list', a_list)
+
+ with self.assertRaises(RuntimeError) as e:
+ _ = main_input | FlatMap(
+ 'test',
+ lambda x, s1, s2: [[x, s1, s2]],
+ AsSingleton(side_list), AsSingleton(side_list, default_value=3))
+ self.assertTrue(
+ e.exception.message.startswith(
+ 'Transform "ViewAsSingleton(side list.None)" does not have a '
+ 'stable unique label.'))
+
+ def test_as_singleton_with_different_defaults_with_unique_labels(self):
+ a_list = []
+ pipeline = Pipeline('DirectPipelineRunner')
+ main_input = pipeline | Create('main input', [1])
+ side_list = pipeline | Create('side list', a_list)
+ results = main_input | FlatMap(
+ 'test',
+ lambda x, s1, s2: [[x, s1, s2]],
+ AsSingleton('si1', side_list, default_value=2),
+ AsSingleton('si2', side_list, default_value=3))
+
+ def matcher(expected_elem, expected_singleton1, expected_singleton2):
+ def match(actual):
+ [[actual_elem, actual_singleton1, actual_singleton2]] = actual
+ equal_to([expected_elem])([actual_elem])
+ equal_to([expected_singleton1])([actual_singleton1])
+ equal_to([expected_singleton2])([actual_singleton2])
+ return match
+
+ assert_that(results, matcher(1, 2, 3))
+ pipeline.run()
+
+ def test_as_list_without_unique_labels(self):
+ # This should succeed as calling AsList on the same PCollection twice will
+ # return the same PCollectionView.
+ a_list = [1, 2, 3]
+ pipeline = Pipeline('DirectPipelineRunner')
+ main_input = pipeline | Create('main input', [1])
+ side_list = pipeline | Create('side list', a_list)
+ results = main_input | FlatMap(
+ 'test',
+ lambda x, ls1, ls2: [[x, ls1, ls2]],
+ AsList(side_list), AsList(side_list))
+
+ def matcher(expected_elem, expected_list):
+ def match(actual):
+ [[actual_elem, actual_list1, actual_list2]] = actual
+ equal_to([expected_elem])([actual_elem])
+ equal_to(expected_list)(actual_list1)
+ equal_to(expected_list)(actual_list2)
+ return match
+
+ assert_that(results, matcher(1, [1, 2, 3]))
+ pipeline.run()
+
+ def test_as_list_with_unique_labels(self):
+ a_list = [1, 2, 3]
+ pipeline = Pipeline('DirectPipelineRunner')
+ main_input = pipeline | Create('main input', [1])
+ side_list = pipeline | Create('side list', a_list)
+ results = main_input | FlatMap(
+ 'test',
+ lambda x, ls1, ls2: [[x, ls1, ls2]],
+ AsList(side_list), AsList(side_list, label='label'))
+
+ def matcher(expected_elem, expected_list):
+ def match(actual):
+ [[actual_elem, actual_list1, actual_list2]] = actual
+ equal_to([expected_elem])([actual_elem])
+ equal_to(expected_list)(actual_list1)
+ equal_to(expected_list)(actual_list2)
+ return match
+
+ assert_that(results, matcher(1, [1, 2, 3]))
+ pipeline.run()
+
+ def test_as_dict_with_unique_labels(self):
+ some_kvs = [('a', 1), ('b', 2)]
+ pipeline = Pipeline('DirectPipelineRunner')
+ main_input = pipeline | Create('main input', [1])
+ side_kvs = pipeline | Create('side kvs', some_kvs)
+ results = main_input | FlatMap(
+ 'test',
+ lambda x, dct1, dct2: [[x, dct1, dct2]],
+ AsDict(side_kvs), AsDict(side_kvs, label='label'))
+
+ def matcher(expected_elem, expected_kvs):
+ def match(actual):
+ [[actual_elem, actual_dict1, actual_dict2]] = actual
+ equal_to([expected_elem])([actual_elem])
+ equal_to(expected_kvs)(actual_dict1.iteritems())
+ equal_to(expected_kvs)(actual_dict2.iteritems())
+ return match
+
+ assert_that(results, matcher(1, some_kvs))
+ pipeline.run()
+
+ def test_window_transform(self):
+ class TestWindowFn(WindowFn):
+ """Windowing function adding two disjoint windows to each element."""
+
+ def assign(self, assign_context):
+ _ = assign_context
+ return [IntervalWindow(10, 20), IntervalWindow(20, 30)]
+
+ def merge(self, existing_windows):
+ return existing_windows
+
+ pipeline = Pipeline('DirectPipelineRunner')
+ numbers = pipeline | Create('KVs', [(1, 10), (2, 20), (3, 30)])
+ result = (numbers
+ | WindowInto('W', windowfn=TestWindowFn())
+ | GroupByKey('G'))
+ assert_that(
+ result, equal_to([(1, [10]), (1, [10]), (2, [20]),
+ (2, [20]), (3, [30]), (3, [30])]))
+ pipeline.run()
+
+
+if __name__ == '__main__':
+ logging.getLogger().setLevel(logging.INFO)
+ unittest.main()
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/apache_beam/error.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/error.py b/sdks/python/apache_beam/error.py
new file mode 100644
index 0000000..779c4d9
--- /dev/null
+++ b/sdks/python/apache_beam/error.py
@@ -0,0 +1,39 @@
+# Copyright 2016 Google Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Python Dataflow error classes."""
+
+
+class DataflowError(Exception):
+ """Base class for all Dataflow errors."""
+
+
+class PipelineError(DataflowError):
+ """An error in the pipeline object (e.g. a PValue not linked to it)."""
+
+
+class PValueError(DataflowError):
+ """An error related to a PValue object (e.g. value is not computed)."""
+
+
+class RunnerError(DataflowError):
+ """An error related to a Runner object (e.g. cannot find a runner to run)."""
+
+
+class SideInputError(DataflowError):
+ """An error related to a side input to a parallel Do operation."""
+
+
+class TransformError(DataflowError):
+ """An error related to a PTransform object."""
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/apache_beam/examples/__init__.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/__init__.py b/sdks/python/apache_beam/examples/__init__.py
new file mode 100644
index 0000000..e69de29
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/apache_beam/examples/complete/autocomplete.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/autocomplete.py b/sdks/python/apache_beam/examples/complete/autocomplete.py
new file mode 100644
index 0000000..400863d
--- /dev/null
+++ b/sdks/python/apache_beam/examples/complete/autocomplete.py
@@ -0,0 +1,79 @@
+# Copyright 2016 Google Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""A workflow emitting the top k most common words for each prefix."""
+
+from __future__ import absolute_import
+
+import argparse
+import logging
+import re
+
+import google.cloud.dataflow as df
+
+
+def run(argv=None):
+
+ parser = argparse.ArgumentParser()
+ parser.add_argument('--input',
+ required=True,
+ help='Input file to process.')
+ parser.add_argument('--output',
+ required=True,
+ help='Output file to write results to.')
+ known_args, pipeline_args = parser.parse_known_args(argv)
+
+ p = df.Pipeline(argv=pipeline_args)
+
+ (p # pylint: disable=expression-not-assigned
+ | df.io.Read('read', df.io.TextFileSource(known_args.input))
+ | df.FlatMap('split', lambda x: re.findall(r'[A-Za-z\']+', x))
+ | TopPerPrefix('TopPerPrefix', 5)
+ | df.Map('format',
+ lambda (prefix, candidates): '%s: %s' % (prefix, candidates))
+ | df.io.Write('write', df.io.TextFileSink(known_args.output)))
+ p.run()
+
+
+class TopPerPrefix(df.PTransform):
+
+ def __init__(self, label, count):
+ super(TopPerPrefix, self).__init__(label)
+ self._count = count
+
+ def apply(self, words):
+ """Compute the most common words for each possible prefixes.
+
+ Args:
+ words: a PCollection of strings
+
+ Returns:
+ A PCollection of most common words with each prefix, in the form
+ (prefix, [(count, word), (count, word), ...])
+ """
+ return (words
+ | df.combiners.Count.PerElement()
+ | df.FlatMap(extract_prefixes)
+ | df.combiners.Top.LargestPerKey(self._count))
+
+
+def extract_prefixes((word, count)):
+ for k in range(1, len(word) + 1):
+ prefix = word[:k]
+ yield prefix, (count, word)
+
+
+if __name__ == '__main__':
+ logging.getLogger().setLevel(logging.INFO)
+ run()
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/apache_beam/examples/complete/autocomplete_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/autocomplete_test.py b/sdks/python/apache_beam/examples/complete/autocomplete_test.py
new file mode 100644
index 0000000..3c10483
--- /dev/null
+++ b/sdks/python/apache_beam/examples/complete/autocomplete_test.py
@@ -0,0 +1,78 @@
+# Copyright 2016 Google Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Test for the autocomplete example."""
+
+import collections
+import logging
+import re
+import tempfile
+import unittest
+
+
+import google.cloud.dataflow as df
+from google.cloud.dataflow.examples.complete import autocomplete
+from google.cloud.dataflow.pvalue import AsIter
+from google.cloud.dataflow.utils import options
+
+# TODO(robertwb): Move to testing utilities.
+
+
def assert_that(pcoll, matcher):
  """Asserts that the given PCollection satisfies the matcher's constraints.

  The check is executed as part of the pipeline itself (the PCollection is
  consumed as a side input), so it works both locally and on a remote
  service.
  """
  pipeline = pcoll.pipeline
  singleton = pipeline | df.Create('create_singleton', [None])

  def check_matcher(_, side_value):
    assert matcher(side_value)
    return []

  singleton | df.FlatMap(check_matcher, AsIter(pcoll))
+
+
def contains_in_any_order(expected):
  """Returns a matcher checking a value is a permutation of expected.

  The matcher raises ValueError (listing the extra and missing elements)
  on mismatch, and returns True otherwise.
  """
  def matcher(actual):
    actual_counts = collections.Counter(actual)
    expected_counts = collections.Counter(expected)
    if actual_counts != expected_counts:
      raise ValueError(
          'extra: %s, missing: %s' % (actual_counts - expected_counts,
                                      expected_counts - actual_counts))
    return True
  return matcher
+
+
class WordCountTest(unittest.TestCase):
  """Exercises TopPerPrefix end-to-end on the direct (local) runner."""

  WORDS = ['this', 'this', 'that', 'to', 'to', 'to']

  def test_top_prefixes(self):
    pipeline = df.Pipeline('DirectPipelineRunner')
    words = pipeline | df.Create('create', self.WORDS)
    result = words | autocomplete.TopPerPrefix('test', 5)
    # Values must be hashable for now, so freeze each candidate list.
    result = result | df.Map(lambda kv: (kv[0], tuple(kv[1])))
    assert_that(result, contains_in_any_order([
        ('t', ((3, 'to'), (2, 'this'), (1, 'that'))),
        ('to', ((3, 'to'),)),
        ('th', ((2, 'this'), (1, 'that'))),
        ('thi', ((2, 'this'),)),
        ('this', ((2, 'this'),)),
        ('tha', ((1, 'that'),)),
        ('that', ((1, 'that'),)),
    ]))
    pipeline.run()
+
# Run the unit tests when executed as a script.
if __name__ == '__main__':
  unittest.main()
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/apache_beam/examples/complete/estimate_pi.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/estimate_pi.py b/sdks/python/apache_beam/examples/complete/estimate_pi.py
new file mode 100644
index 0000000..0e52bad
--- /dev/null
+++ b/sdks/python/apache_beam/examples/complete/estimate_pi.py
@@ -0,0 +1,109 @@
+# -*- coding: utf-8 -*-
+# Copyright 2016 Google Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""A workflow that uses a simple Monte Carlo method to estimate \u03c0.
+
+The algorithm computes the fraction of points drawn uniformly within the unit
+square that also fall in the quadrant of the unit circle that overlaps the
+square. A simple area calculation shows that this fraction should be \u03c0/4, so
+we multiply our counts ratio by four to estimate \u03c0.
+"""
+
+from __future__ import absolute_import
+
+import argparse
+import json
+import logging
+import random
+
+
+import google.cloud.dataflow as df
+from google.cloud.dataflow.typehints import Any
+from google.cloud.dataflow.typehints import Iterable
+from google.cloud.dataflow.typehints import Tuple
+
+
@df.typehints.with_output_types(Tuple[int, int, int])
@df.typehints.with_input_types(int)
def run_trials(runs):
  """Runs Monte Carlo trials and returns a 3-tuple of the results.

  Args:
    runs: Number of trial runs to be executed.

  Returns:
    A 3-tuple (total trials, inside trials, 0). The trailing zero exists
    solely so that combine_results sees the same element type for its
    inputs and outputs (a requirement for combiner functions).
  """
  inside = 0
  for _ in xrange(runs):
    x = random.uniform(0, 1)
    y = random.uniform(0, 1)
    # Count the draw when the point falls inside the unit quarter-circle.
    if x * x + y * y <= 1.0:
      inside += 1
  return runs, inside, 0
+
+
@df.typehints.with_output_types(Tuple[int, int, float])
@df.typehints.with_input_types(Iterable[Tuple[int, int, Any]])
def combine_results(results):
  """Combiner function to sum up trials and compute the estimate.

  Args:
    results: An iterable of 3-tuples (total trials, inside trials, ignored).

  Returns:
    A 3-tuple containing the sum of total trials, sum of inside trials, and
    the probability computed from the two numbers.
  """
  # Accumulate in a single pass: the previous implementation iterated the
  # argument twice (one sum() per field), which silently drops data if the
  # runner hands the combiner a one-shot iterator -- exactly the concern
  # raised by the original TODO about repeated iteration.
  total = 0
  inside = 0
  for trials, inside_trials, _ in results:
    total += trials
    inside += inside_trials
  return total, inside, 4 * float(inside) / total
+
+
class JsonCoder(object):
  """A coder that serializes values to JSON text for the final output."""

  def encode(self, x):
    """Returns the JSON encoding of x as a string."""
    return json.dumps(x)
+
+
def run(argv=None):
  """Parses options, then builds and runs the pi-estimation pipeline."""
  parser = argparse.ArgumentParser()
  parser.add_argument('--output',
                      required=True,
                      help='Output file to write results to.')
  known_args, pipeline_args = parser.parse_known_args(argv)

  pipeline = df.Pipeline(argv=pipeline_args)

  # One hundred work items of a hundred thousand tries each.
  work_items = pipeline | df.Create(
      'Initialize', [100000] * 100).with_output_types(int)
  trial_results = work_items | df.Map('Run trials', run_trials)
  estimate = trial_results | df.CombineGlobally(
      'Sum', combine_results).without_defaults()
  estimate | df.io.Write(  # pylint: disable=expression-not-assigned
      'Write', df.io.TextFileSink(known_args.output, coder=JsonCoder()))

  # Actually run the pipeline (all operations above are deferred).
  pipeline.run()
+
+
if __name__ == '__main__':
  # Surface INFO-level pipeline progress when run as a script.
  logging.getLogger().setLevel(logging.INFO)
  run()
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/apache_beam/examples/complete/estimate_pi_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/estimate_pi_test.py b/sdks/python/apache_beam/examples/complete/estimate_pi_test.py
new file mode 100644
index 0000000..0c5be30
--- /dev/null
+++ b/sdks/python/apache_beam/examples/complete/estimate_pi_test.py
@@ -0,0 +1,46 @@
+# Copyright 2016 Google Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Test for the estimate_pi example."""
+
+import json
+import logging
+import tempfile
+import unittest
+
+from google.cloud.dataflow.examples.complete import estimate_pi
+
+
class EstimatePiTest(unittest.TestCase):
  """End-to-end test for the estimate_pi workflow."""

  def create_temp_file(self, contents):
    """Writes contents to a fresh named temp file and returns its path."""
    with tempfile.NamedTemporaryFile(delete=False) as f:
      f.write(contents)
      return f.name

  def test_basics(self):
    temp_path = self.create_temp_file('result')
    estimate_pi.run(['--output=%s' % temp_path])
    # Parse the result file and compare.
    with open(temp_path + '-00000-of-00001') as result_file:
      estimated_pi = json.loads(result_file.readline())[2]
    # Note: Probabilistically speaking this test can fail with a probability
    # that is very small (VERY) given that we run at least 10 million trials.
    self.assertTrue(3.13 < estimated_pi < 3.15)
+
+
if __name__ == '__main__':
  # Surface INFO-level logging, then run the unit tests.
  logging.getLogger().setLevel(logging.INFO)
  unittest.main()
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/apache_beam/examples/complete/juliaset/juliaset/__init__.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/juliaset/juliaset/__init__.py b/sdks/python/apache_beam/examples/complete/juliaset/juliaset/__init__.py
new file mode 100644
index 0000000..e69de29
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset.py b/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset.py
new file mode 100644
index 0000000..3546f03
--- /dev/null
+++ b/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset.py
@@ -0,0 +1,119 @@
+# Copyright 2016 Google Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""A Julia set computing workflow: https://en.wikipedia.org/wiki/Julia_set.
+
+We use the quadratic polinomial f(z) = z*z + c, with c = -.62772 +.42193i
+"""
+
+from __future__ import absolute_import
+
+import argparse
+
+import google.cloud.dataflow as df
+
+
def from_pixel(x, y, n):
  """Maps an NxN pixel position onto the (-1..1, -1..1) complex square."""
  real_part = 2.0 * x / n - 1.0
  imag_part = 2.0 * y / n - 1.0
  return complex(real_part, imag_part)
+
+
def get_julia_set_point_color(point, c, n, max_iterations):
  """Converts one pixel into a point of our julia set.

  Args:
    point: a single (x, y) pixel pair, matching the original
      tuple-argument signature.
    c: the complex constant of the iteration f(z) = z*z + c.
    n: the grid size (pixels map onto an NxN square).
    max_iterations: upper bound on iterations per point.

  Returns:
    (x, y, i) where i is the index of the first iteration whose orbit
    escaped, or the last iteration index tried.
  """
  x, y = point
  z = from_pixel(x, y, n)
  for i in xrange(max_iterations):
    if z.real * z.real + z.imag * z.imag > 2.0:
      break
    z = z * z + c
  return x, y, i  # pylint: disable=undefined-loop-variable
+
+
def generate_julia_set_colors(pipeline, c, n, max_iterations):
  """Computes julia set coordinates for every point of an n x n grid."""

  def point_set(size):
    # Enumerate every (x, y) pixel position of a size x size grid.
    for x in range(size):
      for y in range(size):
        yield (x, y)

  return (pipeline
          | df.Create('add points', point_set(n))
          | df.Map(get_julia_set_point_color, c, n, max_iterations))
+
+
def generate_julia_set_visualization(data, n, max_iterations):
  """Generates the n x n x RGB pixel matrix for rendering the julia set.

  Args:
    data: iterable of (x, y, iteration) triples.
    n: grid size; output image is n x n.
    max_iterations: the iteration bound used when computing data.

  Returns:
    An n x n x 3 uint8 numpy array of RGB values.
  """
  import numpy as np  # pylint: disable=g-import-not-at-top
  # A fixed palette of 16**3 colors, sampling each channel every 16 levels.
  palette = [(r, g, b)
             for r in range(0, 256, 16)
             for g in range(0, 256, 16)
             for b in range(0, 256, 16)]

  image = np.zeros((n, n, 3), dtype=np.uint8)
  for x, y, iteration in data:
    # Floor division: identical to '/' for these non-negative ints.
    image[x, y] = palette[iteration * len(palette) // max_iterations]
  return image
+
+
def save_julia_set_visualization(out_file, image_array):
  """Save the fractal image of our julia set as a png.

  Args:
    out_file: path (or file object) the png is written to.
    image_array: an n x n x RGB matrix as produced by
      generate_julia_set_visualization.
  """
  # NOTE(review): imported inside the function, presumably so matplotlib
  # stays an optional dependency for workers -- confirm before moving.
  from matplotlib import pyplot as plt  # pylint: disable=g-import-not-at-top
  plt.imsave(out_file, image_array, format='png')
+
+
def run(argv=None):
  """Parses options, then builds and runs the julia set pipeline."""
  parser = argparse.ArgumentParser()
  parser.add_argument('--grid_size',
                      dest='grid_size',
                      default=1000,
                      help='Size of the NxN matrix')
  parser.add_argument(
      '--coordinate_output',
      dest='coordinate_output',
      required=True,
      help='Output file to write the color coordinates of the image to.')
  parser.add_argument('--image_output',
                      dest='image_output',
                      default=None,
                      help='Output file to write the resulting image to.')
  known_args, pipeline_args = parser.parse_known_args(argv)

  pipeline = df.Pipeline(argv=pipeline_args)
  grid_size = int(known_args.grid_size)

  coordinates = generate_julia_set_colors(
      pipeline, complex(-.62772, .42193), grid_size, 100)

  def key_by_x(triplet):
    # Key each (x, y, i) triplet by its x coordinate.
    return triplet[0], triplet

  def format_group(keyed):
    # keyed is (x, [(x, y, i), ...]); render one line per x coordinate.
    return ' '.join('(%s, %s, %s)' % coord for coord in keyed[1])

  # Group each coordinate triplet by its x value, then write the coordinates
  # to the output file with an x-coordinate grouping per line.
  # pylint: disable=expression-not-assigned
  (coordinates
   | df.Map('x coord key', key_by_x)
   | df.GroupByKey('x coord')
   | df.Map('format', format_group)
   | df.io.Write('write', df.io.TextFileSink(known_args.coordinate_output)))
  # pylint: enable=expression-not-assigned
  pipeline.run()

  # Optionally render the image and save it to a file.
  # TODO(silviuc): Add this functionality.
  # if p.options.image_output is not None:
  #   julia_set_image = generate_julia_set_visualization(
  #       file_with_coordinates, n, 100)
  #   save_julia_set_visualization(p.options.image_output, julia_set_image)
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset_test.py b/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset_test.py
new file mode 100644
index 0000000..33c434a
--- /dev/null
+++ b/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset_test.py
@@ -0,0 +1,83 @@
+# Copyright 2016 Google Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Test for the juliaset example."""
+
+import logging
+import os
+import re
+import tempfile
+import unittest
+
+
+from google.cloud.dataflow.examples.complete.juliaset.juliaset import juliaset
+
+
class JuliaSetTest(unittest.TestCase):
  """Runs the juliaset workflow on tiny grids and validates the output."""

  def setUp(self):
    self.test_files = {
        'output_coord_file_name': self.generate_temp_file(),
        'output_image_file_name': self.generate_temp_file(),
    }

  def tearDown(self):
    # Remove every temp file the test created.
    for path in self.test_files.values():
      if os.path.exists(path):
        os.remove(path)

  def generate_temp_file(self):
    """Creates an empty named temp file and returns its path."""
    with tempfile.NamedTemporaryFile(delete=False) as temp_file:
      return temp_file.name

  def run_example(self, grid_size, image_file_name=None):
    """Runs the workflow for a grid_size x grid_size julia set."""
    args = [
        '--coordinate_output=%s' % self.test_files['output_coord_file_name'],
        '--grid_size=%s' % grid_size,
    ]
    if image_file_name is not None:
      args.append('--image_output=%s' % image_file_name)
    juliaset.run(args)

  def test_output_file_format(self):
    grid_size = 5
    self.run_example(grid_size)

    # Parse the results from the file, and ensure it was written in the
    # proper format.
    result_path = (self.test_files['output_coord_file_name'] +
                   '-00000-of-00001')
    with open(result_path) as result_file:
      output_lines = result_file.readlines()

    # Should have a line for each x-coordinate.
    self.assertEqual(grid_size, len(output_lines))
    for line in output_lines:
      coordinates = re.findall(r'(\(\d+, \d+, \d+\))', line)
      # Should have grid_size coordinates on each line.
      self.assertTrue(coordinates)
      self.assertEqual(grid_size, len(coordinates))

  def test_generate_fractal_image(self):
    temp_image_file = self.test_files['output_image_file_name']
    self.run_example(10, image_file_name=temp_image_file)
    # Ensure that the image was saved properly.
    # TODO(silviuc): Reactivate the test when --image_output is supported.
    # self.assertTrue(os.stat(temp_image_file).st_size > 0)
+
if __name__ == '__main__':
  # Surface INFO-level logging, then run the unit tests.
  logging.getLogger().setLevel(logging.INFO)
  unittest.main()
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/apache_beam/examples/complete/juliaset/juliaset_main.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/juliaset/juliaset_main.py b/sdks/python/apache_beam/examples/complete/juliaset/juliaset_main.py
new file mode 100644
index 0000000..39a58d6
--- /dev/null
+++ b/sdks/python/apache_beam/examples/complete/juliaset/juliaset_main.py
@@ -0,0 +1,55 @@
+# Copyright 2016 Google Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""A Julia set computing workflow: https://en.wikipedia.org/wiki/Julia_set.
+
+This example has in the juliaset/ folder all the code needed to execute the
+workflow. It is organized in this way so that it can be packaged as a Python
+package and later installed in the VM workers executing the job. The root
+directory for the example contains just a "driver" script to launch the job
+and the setup.py file needed to create a package.
+
+The advantages for organizing the code is that large projects will naturally
+evolve beyond just one module and you will have to make sure the additional
+modules are present in the worker.
+
+In Python Dataflow, using the --setup_file option when submitting a job, will
+trigger creating a source distribution (as if running python setup.py sdist) and
+then staging the resulting tarball in the staging area. The workers, upon
+startup, will install the tarball.
+
+Below is a complete command line for running the juliaset workflow remotely as
+an example:
+
+python juliaset_main.py \
+ --job_name juliaset-$USER \
+ --project YOUR-PROJECT \
+ --runner BlockingDataflowPipelineRunner \
+ --setup_file ./setup.py \
+ --staging_location gs://YOUR-BUCKET/juliaset/staging \
+ --temp_location gs://YOUR-BUCKET/juliaset/temp \
+ --coordinate_output gs://YOUR-BUCKET/juliaset/out \
+ --grid_size 20 \
+
+"""
+
+import logging
+
+
+from juliaset import juliaset
+
+
if __name__ == '__main__':
  # Driver entry point: delegate to the packaged workflow module.
  logging.getLogger().setLevel(logging.INFO)
  juliaset.run()
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/apache_beam/examples/complete/juliaset/setup.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/juliaset/setup.py b/sdks/python/apache_beam/examples/complete/juliaset/setup.py
new file mode 100644
index 0000000..91d6588
--- /dev/null
+++ b/sdks/python/apache_beam/examples/complete/juliaset/setup.py
@@ -0,0 +1,115 @@
+# Copyright 2016 Google Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Setup.py module for the workflow's worker utilities.
+
+All the workflow related code is gathered in a package that will be built as a
+source distribution, staged in the staging area for the workflow being run and
+then installed in the workers when they start running.
+
+This behavior is triggered by specifying the --setup_file command line option
+when running the workflow for remote execution.
+"""
+
+import subprocess
+
+import setuptools
+from setuptools.command.bdist_egg import bdist_egg as _bdist_egg
+
+
class bdist_egg(_bdist_egg):  # pylint: disable=invalid-name
  """A bdist_egg command class that will be invoked during package install.

  The package built using the current setup.py will be staged and later
  installed in the worker using `easy_install package'. This class will be
  instantiated during install for this specific scenario and will trigger
  running the custom commands specified.
  """

  def run(self):
    # Run the extra setup commands first, then the standard egg build.
    self.run_command('CustomCommands')
    _bdist_egg.run(self)
+
+
+# Some custom command to run during setup. The command is not essential for this
+# workflow. It is used here as an example. Each command will spawn a child
+# process. Typically, these commands will include steps to install non-Python
+# packages. For instance, to install a C++-based library libjpeg62 the following
+# two commands will have to be added:
+#
+# ['apt-get', 'update'],
+# ['apt-get', '--assume-yes', 'install', 'libjpeg62'],
+#
+# First, note that there is no need to use the sudo command because the setup
+# script runs with appropriate access.
+# Second, if apt-get tool is used then the first command needs to be 'apt-get
+# update' so the tool refreshes itself and initializes links to download
+# repositories. Without this initial step the other apt-get install commands
+# will fail with package not found errors. Note also --assume-yes option which
+# shortcuts the interactive confirmation.
+#
+# The output of custom commands (including failures) will be logged in the
+# worker-startup log.
# Each entry is an argv list that CustomCommands hands to subprocess.Popen.
CUSTOM_COMMANDS = [
    ['echo', 'Custom command worked!']]
+
+
class CustomCommands(setuptools.Command):
  """A setuptools Command class able to run arbitrary commands."""

  def initialize_options(self):
    # No options to declare; required by the setuptools Command interface.
    pass

  def finalize_options(self):
    # No options to finalize; required by the setuptools Command interface.
    pass

  def RunCustomCommand(self, command_list):
    """Runs one command list, raising RuntimeError on a non-zero exit."""
    print 'Running command: %s' % command_list
    # stderr is merged into stdout so the combined output lands in the
    # worker-startup log.
    p = subprocess.Popen(
        command_list,
        stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
    # Can use communicate(input='y\n'.encode()) if the command run requires
    # some confirmation.
    stdout_data, _ = p.communicate()
    print 'Command output: %s' % stdout_data
    if p.returncode != 0:
      raise RuntimeError(
          'Command %s failed: exit code: %s' % (command_list, p.returncode))

  def run(self):
    # Execute every configured command; the first failure aborts the build.
    for command in CUSTOM_COMMANDS:
      self.RunCustomCommand(command)
+
+
+# Configure the required packages and scripts to install.
+# Note that the Python Dataflow containers come with numpy already installed
+# so this dependency will not trigger anything to be installed unless a version
+# restriction is specified.
# Pip-installable dependencies declared for the generated source package.
REQUIRED_PACKAGES = [
    'numpy',
    ]


setuptools.setup(
    name='juliaset',
    version='0.0.1',
    description='Julia set workflow package.',
    install_requires=REQUIRED_PACKAGES,
    packages=setuptools.find_packages(),
    cmdclass={
        # Command class instantiated and run during easy_install scenarios.
        'bdist_egg': bdist_egg,
        'CustomCommands': CustomCommands,
        }
    )
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/apache_beam/examples/complete/tfidf.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/tfidf.py b/sdks/python/apache_beam/examples/complete/tfidf.py
new file mode 100644
index 0000000..fcdfac8
--- /dev/null
+++ b/sdks/python/apache_beam/examples/complete/tfidf.py
@@ -0,0 +1,196 @@
+# Copyright 2016 Google Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""A TF-IDF workflow (term frequency - inverse document frequency).
+
+For an explanation of the TF-IDF algorithm see the following link:
+http://en.wikipedia.org/wiki/Tf-idf
+"""
+
+from __future__ import absolute_import
+
+import argparse
+import glob
+import math
+import re
+
+import google.cloud.dataflow as df
+from google.cloud.dataflow.pvalue import AsSingleton
+
+
def read_documents(pipeline, uris):
  """Reads the documents at the provided uris; returns (uri, line) pairs."""
  # uri is passed to Map as an explicit extra argument so each lambda
  # receives the current value rather than a late-bound closure variable.
  per_uri = [
      pipeline
      | df.io.Read('read: %s' % uri, df.io.TextFileSource(uri))
      | df.Map('withkey: %s' % uri, lambda v, uri: (uri, v), uri)
      for uri in uris]
  return per_uri | df.Flatten('flatten read pcolls')
+
+
class TfIdf(df.PTransform):
  """A transform containing a basic TF-IDF pipeline.

  The input consists of KV objects where the key is the document's URI and
  the value is a piece of the document's content.
  The output is mapping from terms to scores for each document URI.
  """

  def apply(self, uri_to_content):
    """Builds the TF-IDF computation over a (uri, content line) PCollection."""

    # Compute the total number of documents, and prepare a singleton
    # PCollection to use as side input.
    # NOTE(review): the leading space in the ' count uris' label (and in
    # ' get uris' below) looks deliberate, presumably to keep labels unique
    # across the pipeline -- confirm before normalizing.
    total_documents = (
        uri_to_content
        | df.Keys('get uris')
        | df.RemoveDuplicates('get unique uris')
        | df.combiners.Count.Globally(' count uris'))

    # Create a collection of pairs mapping a URI to each of the words
    # in the document associated with that URI.

    def split_into_words((uri, line)):
      # Lowercased alphabetic tokens of one content line, each keyed by uri.
      return [(uri, w.lower()) for w in re.findall(r'[A-Za-z\']+', line)]

    uri_to_words = (
        uri_to_content
        | df.FlatMap('split words', split_into_words))

    # Compute a mapping from each word to the total number of documents
    # in which it appears.
    word_to_doc_count = (
        uri_to_words
        | df.RemoveDuplicates('get unique words per doc')
        | df.Values('get words')
        | df.combiners.Count.PerElement('count docs per word'))

    # Compute a mapping from each URI to the total number of words in the
    # document associated with that URI.
    uri_to_word_total = (
        uri_to_words
        | df.Keys(' get uris')
        | df.combiners.Count.PerElement('count words in doc'))

    # Count, for each (URI, word) pair, the number of occurrences of that word
    # in the document associated with the URI.
    uri_and_word_to_count = (
        uri_to_words
        | df.combiners.Count.PerElement('count word-doc pairs'))

    # Adjust the above collection to a mapping from (URI, word) pairs to counts
    # into an isomorphic mapping from URI to (word, count) pairs, to prepare
    # for a join by the URI key.
    uri_to_word_and_count = (
        uri_and_word_to_count
        | df.Map('shift keys',
                 lambda ((uri, word), count): (uri, (word, count))))

    # Perform a CoGroupByKey (a sort of pre-join) on the prepared
    # uri_to_word_total and uri_to_word_and_count tagged by 'word totals' and
    # 'word counts' strings. This yields a mapping from URI to a dictionary
    # that maps the above mentioned tag strings to an iterable containing the
    # word total for that URI and word and count respectively.
    #
    # A diagram (in which '[]' just means 'iterable'):
    #
    #   URI: {'word totals': [count],  # Total words within this URI's document.
    #         'word counts': [(word, count),  # Counts of specific words
    #                         (word, count),  # within this URI's document.
    #                         ... ]}
    uri_to_word_and_count_and_total = (
        {'word totals': uri_to_word_total, 'word counts': uri_to_word_and_count}
        | df.CoGroupByKey('cogroup by uri'))

    # Compute a mapping from each word to a (URI, term frequency) pair for each
    # URI. A word's term frequency for a document is simply the number of times
    # that word occurs in the document divided by the total number of words in
    # the document.

    def compute_term_frequency((uri, count_and_total)):
      # Yields (word, (uri, tf)) for every word counted under this uri.
      word_and_count = count_and_total['word counts']
      # We have an iterable for one element that we want extracted.
      [word_total] = count_and_total['word totals']
      for word, count in word_and_count:
        yield word, (uri, float(count) / word_total)

    word_to_uri_and_tf = (
        uri_to_word_and_count_and_total
        | df.FlatMap('compute term frequencies', compute_term_frequency))

    # Compute a mapping from each word to its document frequency.
    # A word's document frequency in a corpus is the number of
    # documents in which the word appears divided by the total
    # number of documents in the corpus.
    #
    # This calculation uses a side input, a Dataflow-computed auxiliary value
    # presented to each invocation of our MapFn lambda. The second argument to
    # the lambda (called total---note that we are unpacking the first argument)
    # receives the value we listed after the lambda in Map(). Additional side
    # inputs (and ordinary Python values, too) can be provided to MapFns and
    # DoFns in this way.
    word_to_df = (
        word_to_doc_count
        | df.Map('compute doc frequencies',
                 lambda (word, count), total: (word, float(count) / total),
                 AsSingleton(total_documents)))

    # Join the term frequency and document frequency collections,
    # each keyed on the word.
    word_to_uri_and_tf_and_df = (
        {'tf': word_to_uri_and_tf, 'df': word_to_df}
        | df.CoGroupByKey('cogroup words by tf-df'))

    # Compute a mapping from each word to a (URI, TF-IDF) score for each URI.
    # There are a variety of definitions of TF-IDF
    # ("term frequency - inverse document frequency") score; here we use a
    # basic version that multiplies the term frequency by the log of the
    # inverse document frequency.

    def compute_tf_idf((word, tf_and_df)):
      # tf_and_df['df'] holds exactly one value; extract it by unpacking.
      [docf] = tf_and_df['df']
      for uri, tf in tf_and_df['tf']:
        yield word, (uri, tf * math.log(1 / docf))

    word_to_uri_and_tfidf = (
        word_to_uri_and_tf_and_df
        | df.FlatMap('compute tf-idf', compute_tf_idf))

    return word_to_uri_and_tfidf
+
+
def run(argv=None):
  """Main entry point; defines and runs the tfidf pipeline."""
  parser = argparse.ArgumentParser()
  parser.add_argument('--uris',
                      required=True,
                      help='URIs to process.')
  parser.add_argument('--output',
                      required=True,
                      help='Output file to write results to.')
  known_args, pipeline_args = parser.parse_known_args(argv)

  pipeline = df.Pipeline(argv=pipeline_args)
  # Read documents specified by the uris command line option.
  documents = read_documents(pipeline, glob.glob(known_args.uris))
  # Compute TF-IDF information for each word.
  scores = documents | TfIdf()
  # Write the output using a "Write" transform that has side effects.
  scores | df.io.Write(  # pylint: disable=expression-not-assigned
      'write', df.io.TextFileSink(known_args.output))
  pipeline.run()
+
+
# Script entry point.
if __name__ == '__main__':
  run()
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/apache_beam/examples/complete/tfidf_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/tfidf_test.py b/sdks/python/apache_beam/examples/complete/tfidf_test.py
new file mode 100644
index 0000000..85b4964
--- /dev/null
+++ b/sdks/python/apache_beam/examples/complete/tfidf_test.py
@@ -0,0 +1,88 @@
+# Copyright 2016 Google Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Test for the TF-IDF example."""
+
+import logging
+import os
+import re
+import tempfile
+import unittest
+
+import google.cloud.dataflow as df
+from google.cloud.dataflow.examples.complete import tfidf
+
+
+# Expected (word, uri, tf-idf) triples for the three-document sample used by
+# both tests below. A word that occurs in every document ('abc') has a
+# document frequency of 1, so its score tf * log(1/1) is 0.
+EXPECTED_RESULTS = set([
+    ('ghi', '1.txt', 0.3662040962227032),
+    ('abc', '1.txt', 0.0),
+    ('abc', '3.txt', 0.0),
+    ('abc', '2.txt', 0.0),
+    ('def', '1.txt', 0.13515503603605478),
+    ('def', '2.txt', 0.2027325540540822)])
+
+
+# Matches one textual output line of the form (u'word', ('<path>/N.txt', score))
+# capturing the word, the file basename, and the score string.
+EXPECTED_LINE_RE = r'\(u\'([a-z]*)\', \(\'.*([0-9]\.txt)\', (.*)\)\)'
+
+
+class TfIdfTest(unittest.TestCase):
+  """Unit and end-to-end tests for the TF-IDF example pipeline."""
+
+  def create_file(self, path, contents):
+    """Writes contents to a new file at path (test-fixture helper)."""
+    logging.info('Creating temp file: %s', path)
+    with open(path, 'w') as f:
+      f.write(contents)
+
+  def test_tfidf_transform(self):
+    """Runs the TfIdf transform in-process and checks the scored output."""
+    p = df.Pipeline('DirectPipelineRunner')
+    uri_to_line = p | df.Create(
+        'create sample',
+        [('1.txt', 'abc def ghi'),
+         ('2.txt', 'abc def'),
+         ('3.txt', 'abc')])
+    result = (
+        uri_to_line
+        | tfidf.TfIdf()
+        # Python 2 tuple-parameter lambda: unpacks (word, (uri, tfidf)).
+        | df.Map('flatten', lambda (word, (uri, tfidf)): (word, uri, tfidf)))
+    df.assert_that(result, df.equal_to(EXPECTED_RESULTS))
+    # Run the pipeline. Note that the assert_that above adds to the pipeline
+    # a check that the result PCollection contains expected values. To actually
+    # trigger the check the pipeline must be run.
+    p.run()
+
+  def test_basics(self):
+    """End-to-end test: runs tfidf.run() on temp files and parses its output."""
+    # Setup the files with expected content.
+    temp_folder = tempfile.mkdtemp()
+    self.create_file(os.path.join(temp_folder, '1.txt'), 'abc def ghi')
+    self.create_file(os.path.join(temp_folder, '2.txt'), 'abc def')
+    self.create_file(os.path.join(temp_folder, '3.txt'), 'abc')
+    tfidf.run([
+        '--uris=%s/*' % temp_folder,
+        '--output', os.path.join(temp_folder, 'result')])
+    # Parse result file and compare. The shard name assumes a single output
+    # shard, which holds for the direct runner on this small input.
+    results = []
+    with open(os.path.join(temp_folder,
+                           'result-00000-of-00001')) as result_file:
+      for line in result_file:
+        match = re.search(EXPECTED_LINE_RE, line)
+        logging.info('Result line: %s', line)
+        if match is not None:
+          results.append(
+              (match.group(1), match.group(2), float(match.group(3))))
+    logging.info('Computed results: %s', set(results))
+    self.assertEqual(set(results), EXPECTED_RESULTS)
+
+
+if __name__ == '__main__':
+  logging.getLogger().setLevel(logging.INFO)
+  unittest.main()
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions.py b/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions.py
new file mode 100644
index 0000000..d0935fe
--- /dev/null
+++ b/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions.py
@@ -0,0 +1,170 @@
+# Copyright 2016 Google Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""An example that reads Wikipedia edit data and computes strings of edits.
+
+An example that reads Wikipedia edit data from Cloud Storage and computes the
+user with the longest string of edits separated by no more than an hour within
+each 30 day period.
+
+To execute this pipeline locally using the DirectPipelineRunner, specify an
+output prefix on GCS:
+ --output gs://YOUR_OUTPUT_PREFIX
+
+To execute this pipeline using the Google Cloud Dataflow service, specify
+pipeline configuration in addition to the above:
+ --job_name NAME_FOR_YOUR_JOB
+ --project YOUR_PROJECT_ID
+ --staging_location gs://YOUR_STAGING_DIRECTORY
+ --temp_location gs://YOUR_TEMPORARY_DIRECTORY
+ --runner BlockingDataflowPipelineRunner
+
+The default input is gs://dataflow-samples/wikipedia_edits/*.json and can be
+overridden with --input.
+"""
+
+from __future__ import absolute_import
+
+import argparse
+import json
+import logging
+import sys
+
+import google.cloud.dataflow as df
+from google.cloud.dataflow import combiners
+from google.cloud.dataflow import window
+
+ONE_HOUR_IN_SECONDS = 3600
+THIRTY_DAYS_IN_SECONDS = 30 * 24 * ONE_HOUR_IN_SECONDS
+
+
+class ExtractUserAndTimestampDoFn(df.DoFn):
+  """Extracts user and timestamp representing a Wikipedia edit."""
+
+  def process(self, context):
+    # Each element is one JSON-encoded BigQuery row describing an edit.
+    table_row = json.loads(context.element)
+    # Rows without contributor_username (e.g. anonymous edits) are dropped.
+    if 'contributor_username' in table_row:
+      user_name = table_row['contributor_username']
+      timestamp = table_row['timestamp']
+      # Stamp the user name with the edit time so downstream event-time
+      # windowing (Sessions / FixedWindows) can group edits correctly.
+      yield window.TimestampedValue(user_name, timestamp)
+
+
+class ComputeSessions(df.PTransform):
+  """Computes the number of edits in each user session.
+
+  A session is defined as a string of edits where each is separated from the
+  next by less than an hour.
+  """
+
+  def __init__(self):
+    super(ComputeSessions, self).__init__()
+
+  def apply(self, pcoll):
+    # Window the timestamped user names into per-user sessions with a
+    # one-hour gap, then count the edits in each (user, session) pair.
+    return (pcoll
+            | df.WindowInto('ComputeSessionsWindow',
+                            window.Sessions(gap_size=ONE_HOUR_IN_SECONDS))
+            | combiners.Count.PerElement())
+
+
+class TopPerMonth(df.PTransform):
+  """Computes the longest session ending in each month."""
+
+  def __init__(self):
+    super(TopPerMonth, self).__init__()
+
+  def apply(self, pcoll):
+    # Re-window the (session-key, count) pairs into 30-day fixed windows and
+    # keep the 10 entries with the highest edit counts per window. The
+    # comparator orders elements by their count (second tuple field).
+    # .without_defaults() suppresses output for windows with no input.
+    return (pcoll
+            | df.WindowInto('TopPerMonthWindow',
+                            window.FixedWindows(
+                                size=THIRTY_DAYS_IN_SECONDS))
+            | combiners.core.CombineGlobally(
+                'Top',
+                combiners.TopCombineFn(
+                    10, lambda first, second: first[1] < second[1]))
+            .without_defaults())
+
+
+class SessionsToStringsDoFn(df.DoFn):
+  """Adds the session information to be part of the key."""
+
+  def process(self, context):
+    # context.element is a (user, count) pair. Folding the string form of
+    # the session window(s) into the key makes distinct sessions of the same
+    # user distinct elements for the later per-month ranking.
+    yield (context.element[0] + ' : ' +
+           ', '.join([str(w) for w in context.windows]), context.element[1])
+
+
+class FormatOutputDoFn(df.DoFn):
+  """Formats a string containing the user, count, and session."""
+
+  def process(self, context):
+    # context.element is the list emitted by TopPerMonth: the top
+    # (session-key, count) pairs for the current 30-day window.
+    for kv in context.element:
+      session = kv[0]
+      count = kv[1]
+      # Append the 30-day window(s) this result belongs to.
+      yield (session + ' : ' + str(count) + ' : '
+             + ', '.join([str(w) for w in context.windows]))
+
+
+class ComputeTopSessions(df.PTransform):
+  """Computes the top user sessions for each month."""
+
+  def __init__(self, sampling_threshold):
+    """Initializes the transform.
+
+    Args:
+      sampling_threshold: Fraction in (0.0, 1.0] of input records to keep;
+        records are chosen deterministically by hashing the raw JSON line.
+    """
+    super(ComputeTopSessions, self).__init__()
+    self.sampling_threshold = sampling_threshold
+
+  def apply(self, pcoll):
+    return (pcoll
+            | df.ParDo('ExtractUserAndTimestamp', ExtractUserAndTimestampDoFn())
+            # Deterministic sampling: keep an element iff its hash lands in
+            # the first sampling_threshold fraction of the hash range.
+            | df.Filter(
+                lambda x: abs(hash(x)) <= sys.maxint * self.sampling_threshold)
+            | ComputeSessions()
+            | df.ParDo('SessionsToStrings', SessionsToStringsDoFn())
+            | TopPerMonth()
+            | df.ParDo('FormatOutput', FormatOutputDoFn()))
+
+
+def run(argv=None):
+  """Runs the Wikipedia top edits pipeline.
+
+  Args:
+    argv: Pipeline options as a list of arguments. --input, --output and
+      --sampling_threshold are consumed here; unknown flags (e.g. --runner,
+      --project) are forwarded to the Pipeline.
+  """
+
+  parser = argparse.ArgumentParser()
+  parser.add_argument(
+      '--input',
+      dest='input',
+      default='gs://dataflow-samples/wikipedia_edits/*.json',
+      help='Input specified as a GCS path containing a BigQuery table exported '
+      'as json.')
+  parser.add_argument('--output',
+                      required=True,
+                      help='Output file to write results to.')
+  parser.add_argument('--sampling_threshold',
+                      type=float,
+                      default=0.1,
+                      help='Fraction of entries used for session tracking')
+  known_args, pipeline_args = parser.parse_known_args(argv)
+
+  p = df.Pipeline(argv=pipeline_args)
+
+  # Read the edit log, compute the top sessions per month, and write the
+  # formatted result strings. Everything is deferred until p.run().
+  (p  # pylint: disable=expression-not-assigned
+   | df.Read('read', df.io.TextFileSource(known_args.input))
+   | ComputeTopSessions(known_args.sampling_threshold)
+   | df.io.Write('write', df.io.TextFileSink(known_args.output)))
+
+  p.run()
+
+
+if __name__ == '__main__':
+  logging.getLogger().setLevel(logging.INFO)
+  run()
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions_test.py b/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions_test.py
new file mode 100644
index 0000000..a4fdf8c
--- /dev/null
+++ b/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions_test.py
@@ -0,0 +1,58 @@
+# Copyright 2016 Google Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Test for the top wikipedia sessions example."""
+
+import json
+import unittest
+
+
+import google.cloud.dataflow as df
+from google.cloud.dataflow.examples.complete import top_wikipedia_sessions
+
+
+class ComputeTopSessionsTest(unittest.TestCase):
+  """Tests ComputeTopSessions end-to-end on a small synthetic edit log."""
+
+  # JSON edit records mimicking the exported BigQuery rows the production
+  # pipeline reads. Timestamps are in seconds and are chosen to exercise the
+  # one-hour session gap and the 30-day ranking window.
+  EDITS = [
+      json.dumps({'timestamp': 0.0, 'contributor_username': 'user1'}),
+      json.dumps({'timestamp': 0.001, 'contributor_username': 'user1'}),
+      json.dumps({'timestamp': 0.002, 'contributor_username': 'user1'}),
+      json.dumps({'timestamp': 0.0, 'contributor_username': 'user2'}),
+      json.dumps({'timestamp': 0.001, 'contributor_username': 'user2'}),
+      json.dumps({'timestamp': 3.601, 'contributor_username': 'user2'}),
+      json.dumps({'timestamp': 3.602, 'contributor_username': 'user2'}),
+      json.dumps(
+          {'timestamp': 2 * 3600.0, 'contributor_username': 'user2'}),
+      json.dumps(
+          {'timestamp': 35 * 24 * 3.600, 'contributor_username': 'user3'})
+  ]
+
+  # Expected formatted output lines, of the form
+  # 'user : [session window) : edit count : [30-day window)'.
+  EXPECTED = [
+      'user1 : [0.0, 3600.002) : 3 : [0.0, 2592000.0)',
+      'user2 : [0.0, 3603.602) : 4 : [0.0, 2592000.0)',
+      'user2 : [7200.0, 10800.0) : 1 : [0.0, 2592000.0)',
+      'user3 : [3024.0, 6624.0) : 1 : [0.0, 2592000.0)',
+  ]
+
+  def test_compute_top_sessions(self):
+    """Runs the transform with sampling disabled (threshold 1.0 keeps all)."""
+    p = df.Pipeline('DirectPipelineRunner')
+    edits = p | df.Create('create', self.EDITS)
+    result = edits | top_wikipedia_sessions.ComputeTopSessions(1.0)
+
+    df.assert_that(result, df.equal_to(self.EXPECTED))
+    p.run()
+
+
+if __name__ == '__main__':
+  unittest.main()
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/apache_beam/examples/cookbook/bigquery_schema.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/bigquery_schema.py b/sdks/python/apache_beam/examples/cookbook/bigquery_schema.py
new file mode 100644
index 0000000..67616ec
--- /dev/null
+++ b/sdks/python/apache_beam/examples/cookbook/bigquery_schema.py
@@ -0,0 +1,127 @@
+# Copyright 2016 Google Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""A workflow that writes to a BigQuery table with nested and repeated fields.
+
+Demonstrates how to build a bigquery.TableSchema object with nested and repeated
+fields. Also, shows how to generate data to be written to a BigQuery table with
+nested and repeated fields.
+"""
+
+from __future__ import absolute_import
+
+import argparse
+import logging
+
+import google.cloud.dataflow as df
+
+
+def run(argv=None):
+  """Run the workflow.
+
+  Builds a BigQuery schema with standard, nested (record) and repeated
+  fields, generates five matching rows, and writes them to the table named
+  by --output, creating the table if needed and truncating any existing
+  contents.
+
+  Args:
+    argv: Optional list of command-line arguments. --output is consumed
+      here; remaining arguments are forwarded to the Pipeline.
+  """
+  parser = argparse.ArgumentParser()
+
+  parser.add_argument(
+      '--output',
+      required=True,
+      help=
+      ('Output BigQuery table for results specified as: PROJECT:DATASET.TABLE '
+       'or DATASET.TABLE.'))
+  known_args, pipeline_args = parser.parse_known_args(argv)
+
+  p = df.Pipeline(argv=pipeline_args)
+
+  from google.cloud.dataflow.internal.clients import bigquery  # pylint: disable=g-import-not-at-top
+
+  table_schema = bigquery.TableSchema()
+
+  # Fields that use standard types.
+  kind_schema = bigquery.TableFieldSchema()
+  kind_schema.name = 'kind'
+  kind_schema.type = 'string'
+  kind_schema.mode = 'nullable'
+  table_schema.fields.append(kind_schema)
+
+  full_name_schema = bigquery.TableFieldSchema()
+  full_name_schema.name = 'fullName'
+  full_name_schema.type = 'string'
+  full_name_schema.mode = 'required'
+  table_schema.fields.append(full_name_schema)
+
+  age_schema = bigquery.TableFieldSchema()
+  age_schema.name = 'age'
+  age_schema.type = 'integer'
+  age_schema.mode = 'nullable'
+  table_schema.fields.append(age_schema)
+
+  gender_schema = bigquery.TableFieldSchema()
+  gender_schema.name = 'gender'
+  gender_schema.type = 'string'
+  gender_schema.mode = 'nullable'
+  table_schema.fields.append(gender_schema)
+
+  # A nested field: 'record' type with its own sub-fields appended to
+  # phone_number_schema.fields.
+  phone_number_schema = bigquery.TableFieldSchema()
+  phone_number_schema.name = 'phoneNumber'
+  phone_number_schema.type = 'record'
+  phone_number_schema.mode = 'nullable'
+
+  area_code = bigquery.TableFieldSchema()
+  area_code.name = 'areaCode'
+  area_code.type = 'integer'
+  area_code.mode = 'nullable'
+  phone_number_schema.fields.append(area_code)
+
+  number = bigquery.TableFieldSchema()
+  number.name = 'number'
+  number.type = 'integer'
+  number.mode = 'nullable'
+  phone_number_schema.fields.append(number)
+  table_schema.fields.append(phone_number_schema)
+
+  # A repeated field.
+  children_schema = bigquery.TableFieldSchema()
+  children_schema.name = 'children'
+  children_schema.type = 'string'
+  children_schema.mode = 'repeated'
+  table_schema.fields.append(children_schema)
+
+  def create_random_record(record_id):
+    """Builds one row dict matching table_schema for the given string id."""
+    return {'kind': 'kind' + record_id, 'fullName': 'fullName'+record_id,
+            'age': int(record_id) * 10, 'gender': 'male',
+            'phoneNumber': {
+                'areaCode': int(record_id) * 100,
+                'number': int(record_id) * 100000},
+            'children': ['child' + record_id + '1',
+                         'child' + record_id + '2',
+                         'child' + record_id + '3']
+           }
+
+  # pylint: disable=expression-not-assigned
+  record_ids = p | df.Create('CreateIDs', ['1', '2', '3', '4', '5'])
+  records = record_ids | df.Map('CreateRecords', create_random_record)
+  records | df.io.Write(
+      'write',
+      df.io.BigQuerySink(
+          known_args.output,
+          schema=table_schema,
+          create_disposition=df.io.BigQueryDisposition.CREATE_IF_NEEDED,
+          write_disposition=df.io.BigQueryDisposition.WRITE_TRUNCATE))
+
+  # Run the pipeline (all operations are deferred until run() is called).
+  p.run()
+
+
+if __name__ == '__main__':
+  logging.getLogger().setLevel(logging.INFO)
+  run()
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/apache_beam/examples/cookbook/bigquery_side_input.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/bigquery_side_input.py b/sdks/python/apache_beam/examples/cookbook/bigquery_side_input.py
new file mode 100644
index 0000000..20ef8d9
--- /dev/null
+++ b/sdks/python/apache_beam/examples/cookbook/bigquery_side_input.py
@@ -0,0 +1,114 @@
+# Copyright 2016 Google Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""A Dataflow job that uses BigQuery sources as side inputs.
+
+Illustrates how to insert side inputs into transforms in three different forms:
+as a singleton, as an iterator, and as a list.
+
+This workflow generates a set of tuples of the form (groupId, corpus, word)
+where groupId is a generated identifier for the group, and corpus and word are
+randomly selected from corresponding rows in BQ dataset
+'publicdata:samples.shakespeare'. Users should specify the number of groups to
+form and optionally a corpus and/or a word that should be ignored when forming
+groups.
+"""
+
+import argparse
+import logging
+from random import randrange
+
+import google.cloud.dataflow as df
+
+from google.cloud.dataflow.pvalue import AsIter
+from google.cloud.dataflow.pvalue import AsList
+from google.cloud.dataflow.pvalue import AsSingleton
+
+
+def create_groups(group_ids, corpus, word, ignore_corpus, ignore_word):
+ """Generate groups given the input PCollections."""
+
+ def attach_corpus_fn(group, corpus, ignore):
+ selected = None
+ len_corpus = len(corpus)
+ while not selected:
+ c = corpus[randrange(0, len_corpus - 1)].values()[0]
+ if c != ignore:
+ selected = c
+
+ yield (group, selected)
+
+ def attach_word_fn(group, words, ignore):
+ selected = None
+ len_words = len(words)
+ while not selected:
+ c = words[randrange(0, len_words - 1)].values()[0]
+ if c != ignore:
+ selected = c
+
+ yield group + (selected,)
+
+ return (group_ids
+ | df.FlatMap(
+ 'attach corpus',
+ attach_corpus_fn,
+ AsList(corpus),
+ AsSingleton(ignore_corpus))
+ | df.FlatMap(
+ 'attach word',
+ attach_word_fn,
+ AsIter(word),
+ AsSingleton(ignore_word)))
+
+
+def run(argv=None):
+  """Run the workflow.
+
+  Builds the corpus/word side-input PCollections from BigQuery queries,
+  forms --num_groups groups via create_groups, and writes the resulting
+  tuples as text to --output.
+
+  Args:
+    argv: Optional list of command-line arguments; unknown flags are
+      forwarded to the Pipeline.
+  """
+
+  parser = argparse.ArgumentParser()
+  parser.add_argument('--output')
+  parser.add_argument('--ignore_corpus', default='')
+  parser.add_argument('--ignore_word', default='')
+  parser.add_argument('--num_groups')
+
+  known_args, pipeline_args = parser.parse_known_args(argv)
+  p = df.Pipeline(argv=pipeline_args)
+
+  # Generate one synthetic identifier per requested group: 'id0', 'id1', ...
+  group_ids = []
+  for i in xrange(0, int(known_args.num_groups)):
+    group_ids.append('id' + str(i))
+
+  query_corpus = 'select UNIQUE(corpus) from publicdata:samples.shakespeare'
+  query_word = 'select UNIQUE(word) from publicdata:samples.shakespeare'
+  ignore_corpus = known_args.ignore_corpus
+  ignore_word = known_args.ignore_word
+
+  # BigQuery query results and the ignore flags become PCollections so they
+  # can be attached to create_groups' transforms as side inputs.
+  pcoll_corpus = p | df.Read('read corpus',
+                             df.io.BigQuerySource(query=query_corpus))
+  pcoll_word = p | df.Read('read words',
+                           df.io.BigQuerySource(query=query_word))
+  pcoll_ignore_corpus = p | df.Create('create_ignore_corpus', [ignore_corpus])
+  pcoll_ignore_word = p | df.Create('create_ignore_word', [ignore_word])
+  pcoll_group_ids = p | df.Create('create groups', group_ids)
+
+  pcoll_groups = create_groups(pcoll_group_ids, pcoll_corpus, pcoll_word,
+                               pcoll_ignore_corpus, pcoll_ignore_word)
+
+  # pylint:disable=expression-not-assigned
+  pcoll_groups | df.io.Write('WriteToText',
+                             df.io.TextFileSink(known_args.output))
+  p.run()
+
+
+if __name__ == '__main__':
+  logging.getLogger().setLevel(logging.INFO)
+  run()