You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by da...@apache.org on 2016/06/14 23:12:59 UTC
[24/50] [abbrv] incubator-beam git commit: Move all files to
apache_beam folder
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/google/cloud/dataflow/coders/stream_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/google/cloud/dataflow/coders/stream_test.py b/sdks/python/google/cloud/dataflow/coders/stream_test.py
deleted file mode 100644
index 3002116..0000000
--- a/sdks/python/google/cloud/dataflow/coders/stream_test.py
+++ /dev/null
@@ -1,168 +0,0 @@
-# Copyright 2016 Google Inc. All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""Tests for the stream implementations."""
-
-import logging
-import math
-import unittest
-
-
-from google.cloud.dataflow.coders import slow_stream
-
-
-class StreamTest(unittest.TestCase):
- # pylint: disable=invalid-name
- InputStream = slow_stream.InputStream
- OutputStream = slow_stream.OutputStream
- ByteCountingOutputStream = slow_stream.ByteCountingOutputStream
- # pylint: enable=invalid-name
-
- def test_read_write(self):
- out_s = self.OutputStream()
- out_s.write('abc')
- out_s.write('\0\t\n')
- out_s.write('xyz', True)
- out_s.write('', True)
- in_s = self.InputStream(out_s.get())
- self.assertEquals('abc\0\t\n', in_s.read(6))
- self.assertEquals('xyz', in_s.read_all(True))
- self.assertEquals('', in_s.read_all(True))
-
- def test_read_all(self):
- out_s = self.OutputStream()
- out_s.write('abc')
- in_s = self.InputStream(out_s.get())
- self.assertEquals('abc', in_s.read_all(False))
-
- def test_read_write_byte(self):
- out_s = self.OutputStream()
- out_s.write_byte(1)
- out_s.write_byte(0)
- out_s.write_byte(0xFF)
- in_s = self.InputStream(out_s.get())
- self.assertEquals(1, in_s.read_byte())
- self.assertEquals(0, in_s.read_byte())
- self.assertEquals(0xFF, in_s.read_byte())
-
- def test_read_write_large(self):
- values = range(4 * 1024)
- out_s = self.OutputStream()
- for v in values:
- out_s.write_bigendian_int64(v)
- in_s = self.InputStream(out_s.get())
- for v in values:
- self.assertEquals(v, in_s.read_bigendian_int64())
-
- def run_read_write_var_int64(self, values):
- out_s = self.OutputStream()
- for v in values:
- out_s.write_var_int64(v)
- in_s = self.InputStream(out_s.get())
- for v in values:
- self.assertEquals(v, in_s.read_var_int64())
-
- def test_small_var_int64(self):
- self.run_read_write_var_int64(range(-10, 30))
-
- def test_medium_var_int64(self):
- base = -1.7
- self.run_read_write_var_int64(
- [int(base**pow)
- for pow in range(1, int(63 * math.log(2) / math.log(-base)))])
-
- def test_large_var_int64(self):
- self.run_read_write_var_int64([0, 2**63 - 1, -2**63, 2**63 - 3])
-
- def test_read_write_double(self):
- values = 0, 1, -1, 1e100, 1.0/3, math.pi, float('inf')
- out_s = self.OutputStream()
- for v in values:
- out_s.write_bigendian_double(v)
- in_s = self.InputStream(out_s.get())
- for v in values:
- self.assertEquals(v, in_s.read_bigendian_double())
-
- def test_read_write_bigendian_int64(self):
- values = 0, 1, -1, 2**63-1, -2**63, int(2**61 * math.pi)
- out_s = self.OutputStream()
- for v in values:
- out_s.write_bigendian_int64(v)
- in_s = self.InputStream(out_s.get())
- for v in values:
- self.assertEquals(v, in_s.read_bigendian_int64())
-
- def test_read_write_bigendian_int32(self):
- values = 0, 1, -1, 2**31-1, -2**31, int(2**29 * math.pi)
- out_s = self.OutputStream()
- for v in values:
- out_s.write_bigendian_int32(v)
- in_s = self.InputStream(out_s.get())
- for v in values:
- self.assertEquals(v, in_s.read_bigendian_int32())
-
- def test_byte_counting(self):
- bc_s = self.ByteCountingOutputStream()
- self.assertEquals(0, bc_s.get_count())
- bc_s.write('def')
- self.assertEquals(3, bc_s.get_count())
- bc_s.write('')
- self.assertEquals(3, bc_s.get_count())
- bc_s.write_byte(10)
- self.assertEquals(4, bc_s.get_count())
- # "nested" also writes the length of the string, which should
- # cause 1 extra byte to be counted.
- bc_s.write('2345', nested=True)
- self.assertEquals(9, bc_s.get_count())
- bc_s.write_var_int64(63)
- self.assertEquals(10, bc_s.get_count())
- bc_s.write_bigendian_int64(42)
- self.assertEquals(18, bc_s.get_count())
- bc_s.write_bigendian_int32(36)
- self.assertEquals(22, bc_s.get_count())
- bc_s.write_bigendian_double(6.25)
- self.assertEquals(30, bc_s.get_count())
-
-
-try:
- # pylint: disable=g-import-not-at-top
- from google.cloud.dataflow.coders import stream
-
- class FastStreamTest(StreamTest):
- """Runs the test with the compiled stream classes."""
- InputStream = stream.InputStream
- OutputStream = stream.OutputStream
- ByteCountingOutputStream = stream.ByteCountingOutputStream
-
-
- class SlowFastStreamTest(StreamTest):
- """Runs the test with compiled and uncompiled stream classes."""
- InputStream = stream.InputStream
- OutputStream = slow_stream.OutputStream
- ByteCountingOutputStream = slow_stream.ByteCountingOutputStream
-
-
- class FastSlowStreamTest(StreamTest):
- """Runs the test with uncompiled and compiled stream classes."""
- InputStream = slow_stream.InputStream
- OutputStream = stream.OutputStream
- ByteCountingOutputStream = stream.ByteCountingOutputStream
-
-except ImportError:
- pass
-
-
-if __name__ == '__main__':
- logging.getLogger().setLevel(logging.INFO)
- unittest.main()
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/google/cloud/dataflow/coders/typecoders.py
----------------------------------------------------------------------
diff --git a/sdks/python/google/cloud/dataflow/coders/typecoders.py b/sdks/python/google/cloud/dataflow/coders/typecoders.py
deleted file mode 100644
index 98cf2b5..0000000
--- a/sdks/python/google/cloud/dataflow/coders/typecoders.py
+++ /dev/null
@@ -1,154 +0,0 @@
-# Copyright 2016 Google Inc. All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""Type coders registration.
-
-This module contains functionality to define and use coders for custom classes.
-Let's say we have a class Xyz and we are processing a PCollection with elements
-of type Xyz. If we do not register a coder for Xyz, a default pickle-based
-fallback coder will be used. This can be undesirable for two reasons. First, we
-may want a faster coder or a more space efficient one. Second, the pickle-based
-coder is not deterministic in the sense that objects like dictionaries or sets
-are not guaranteed to be encoded in the same way every time (elements are not
-really ordered).
-
-Two (sometimes three) steps are needed to define and use a custom coder:
- - define the coder class
- - associate the code with the class (a.k.a. coder registration)
- - typehint DoFns or transforms with the new class or composite types using
- the class.
-
-A coder class is defined by subclassing from CoderBase and defining the
-encode_to_bytes and decode_from_bytes methods. The framework uses duck-typing
-for coders so it is not strictly required to subclass from CoderBase as long as
-the encode/decode methods are defined.
-
-Registering a coder class is made with a register_coder() call::
-
- from google.cloud.dataflow import coders
- ...
- coders.registry.register_coder(Xyz, XyzCoder)
-
-Additionally, DoFns and PTransforms may need type hints. This is not always
-necessary since there is functionality to infer the return types of DoFns by
-analyzing the code. For instance, for the function below the return type of
-'Xyz' will be inferred::
-
- def MakeXyzs(v):
- return Xyz(v)
-
-If Xyz is inferred then its coder will be used whenever the framework needs to
-serialize data (e.g., writing to the shuffler subsystem responsible for group by
-key operations). If a typehint is needed it can be specified by decorating the
-DoFns or using with_input_types/with_output_types methods on PTransforms. For
-example, the above function can be decorated::
-
- @with_output_types(Xyz)
- def MakeXyzs(v):
- return complex_operation_returning_Xyz(v)
-
-See google.cloud.dataflow.typehints.decorators module for more details.
-"""
-
-import logging
-
-from google.cloud.dataflow.coders import coders
-from google.cloud.dataflow.typehints import typehints
-
-
-class CoderRegistry(object):
- """A coder registry for typehint/coder associations."""
-
- def __init__(self, fallback_coder=None):
- self._coders = {}
- self.custom_types = []
- self.register_standard_coders(fallback_coder)
-
- def register_standard_coders(self, fallback_coder):
- """Register coders for all basic and composite types."""
- self._register_coder_internal(int, coders.VarIntCoder)
- self._register_coder_internal(float, coders.FloatCoder)
- self._register_coder_internal(str, coders.BytesCoder)
- self._register_coder_internal(bytes, coders.BytesCoder)
- self._register_coder_internal(unicode, coders.StrUtf8Coder)
- self._register_coder_internal(typehints.TupleConstraint, coders.TupleCoder)
- self._register_coder_internal(typehints.AnyTypeConstraint,
- coders.PickleCoder)
- self._fallback_coder = fallback_coder or coders.PickleCoder
-
- def _register_coder_internal(self, typehint_type, typehint_coder_class):
- self._coders[typehint_type] = typehint_coder_class
-
- def register_coder(self, typehint_type, typehint_coder_class):
- if not isinstance(typehint_coder_class, type):
- raise TypeError('Coder registration requires a coder class object. '
- 'Received %r instead.' % typehint_coder_class)
- if typehint_type not in self.custom_types:
- self.custom_types.append(typehint_type)
- self._register_coder_internal(typehint_type, typehint_coder_class)
-
- def get_coder(self, typehint):
- coder = self._coders.get(
- typehint.__class__ if isinstance(typehint, typehints.TypeConstraint)
- else typehint, None)
- if isinstance(typehint, typehints.TypeConstraint) and coder is not None:
- return coder.from_type_hint(typehint, self)
- if coder is None:
- # We use the fallback coder when there is no coder registered for a
- # typehint. For example a user defined class with no coder specified.
- if not hasattr(self, '_fallback_coder'):
- raise RuntimeError(
- 'Coder registry has no fallback coder. This can happen if the '
- 'fast_coders module could not be imported.')
- if isinstance(typehint, typehints.IterableTypeConstraint):
- # In this case, we suppress the warning message for using the fallback
- # coder, since Iterable is hinted as the output of a GroupByKey
- # operation and that direct output will not be coded.
- # TODO(ccy): refine this behavior.
- pass
- elif typehint is None:
- # In some old code, None is used for Any.
- # TODO(robertwb): Clean this up.
- pass
- elif isinstance(typehint, typehints.TypeVariable):
- # TODO(robertwb): Clean this up when type inference is fully enabled.
- pass
- else:
- logging.warning('Using fallback coder for typehint: %r.', typehint)
- coder = self._fallback_coder
- return coder.from_type_hint(typehint, self)
-
- def get_custom_type_coder_tuples(self, types):
- """Returns type/coder tuples for all custom types passed in."""
- return [(t, self._coders[t]) for t in types if t in self.custom_types]
-
- def verify_deterministic(self, key_coder, op_name, silent=True):
- if not key_coder.is_deterministic():
- error_msg = ('The key coder "%s" for %s '
- 'is not deterministic. This may result in incorrect '
- 'pipeline output. This can be fixed by adding a type '
- 'hint to the operation preceding the GroupByKey step, '
- 'and for custom key classes, by writing a '
- 'deterministic custom Coder. Please see the '
- 'documentation for more details.' % (key_coder, op_name))
- if isinstance(key_coder, (coders.PickleCoder, self._fallback_coder)):
- if not silent:
- logging.warning(error_msg)
- return coders.DeterministicPickleCoder(key_coder, op_name)
- else:
- raise ValueError(error_msg)
- else:
- return key_coder
-
-registry = CoderRegistry()
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/google/cloud/dataflow/coders/typecoders_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/google/cloud/dataflow/coders/typecoders_test.py b/sdks/python/google/cloud/dataflow/coders/typecoders_test.py
deleted file mode 100644
index ed46ede..0000000
--- a/sdks/python/google/cloud/dataflow/coders/typecoders_test.py
+++ /dev/null
@@ -1,114 +0,0 @@
-# Copyright 2016 Google Inc. All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""Unit tests for the typecoders module."""
-
-import unittest
-
-from google.cloud.dataflow.coders import coders
-from google.cloud.dataflow.coders import typecoders
-from google.cloud.dataflow.internal import pickler
-from google.cloud.dataflow.typehints import typehints
-
-
-class CustomClass(object):
-
- def __init__(self, n):
- self.number = n
-
- def __eq__(self, other):
- return self.number == other.number
-
-
-class CustomCoder(coders.Coder):
-
- def encode(self, value):
- return str(value.number)
-
- def decode(self, encoded):
- return CustomClass(int(encoded))
-
- def is_deterministic(self):
- # This coder is deterministic. Though we don't use need this coder to be
- # deterministic for this test, we annotate this as such to follow best
- # practices.
- return True
-
-
-class TypeCodersTest(unittest.TestCase):
-
- def test_register_non_type_coder(self):
- coder = CustomCoder()
- with self.assertRaises(TypeError) as e:
- # When registering a coder the coder class must be specified.
- typecoders.registry.register_coder(CustomClass, coder)
- self.assertEqual(e.exception.message,
- 'Coder registration requires a coder class object. '
- 'Received %r instead.' % coder)
-
- def test_get_coder_with_custom_coder(self):
- typecoders.registry.register_coder(CustomClass, CustomCoder)
- self.assertEqual(CustomCoder,
- typecoders.registry.get_coder(CustomClass).__class__)
-
- def test_get_coder_with_composite_custom_coder(self):
- typecoders.registry.register_coder(CustomClass, CustomCoder)
- coder = typecoders.registry.get_coder(typehints.KV[CustomClass, str])
- revived_coder = pickler.loads(pickler.dumps(coder))
- self.assertEqual(
- (CustomClass(123), 'abc'),
- revived_coder.decode(revived_coder.encode((CustomClass(123), 'abc'))))
-
- def test_get_coder_with_standard_coder(self):
- self.assertEqual(coders.BytesCoder,
- typecoders.registry.get_coder(str).__class__)
-
- def test_fallbackcoder(self):
- coder = typecoders.registry.get_coder(typehints.Any)
- self.assertEqual(('abc', 123), coder.decode(coder.encode(('abc', 123))))
-
- def test_get_coder_can_be_pickled(self):
- coder = typecoders.registry.get_coder(typehints.Tuple[str, int])
- revived_coder = pickler.loads(pickler.dumps(coder))
- self.assertEqual(('abc', 123),
- revived_coder.decode(revived_coder.encode(('abc', 123))))
-
- def test_standard_int_coder(self):
- real_coder = typecoders.registry.get_coder(int)
- expected_coder = coders.VarIntCoder()
- self.assertEqual(
- real_coder.encode(0x0404), expected_coder.encode(0x0404))
- self.assertEqual(0x0404, real_coder.decode(real_coder.encode(0x0404)))
- self.assertEqual(
- real_coder.encode(0x040404040404),
- expected_coder.encode(0x040404040404))
- self.assertEqual(0x040404040404,
- real_coder.decode(real_coder.encode(0x040404040404)))
-
- def test_standard_str_coder(self):
- real_coder = typecoders.registry.get_coder(str)
- expected_coder = coders.BytesCoder()
- self.assertEqual(
- real_coder.encode('abc'), expected_coder.encode('abc'))
- self.assertEqual('abc', real_coder.decode(real_coder.encode('abc')))
-
- real_coder = typecoders.registry.get_coder(bytes)
- expected_coder = coders.BytesCoder()
- self.assertEqual(
- real_coder.encode('abc'), expected_coder.encode('abc'))
- self.assertEqual('abc', real_coder.decode(real_coder.encode('abc')))
-
-
-if __name__ == '__main__':
- unittest.main()
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/google/cloud/dataflow/dataflow_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/google/cloud/dataflow/dataflow_test.py b/sdks/python/google/cloud/dataflow/dataflow_test.py
deleted file mode 100644
index c40b88f..0000000
--- a/sdks/python/google/cloud/dataflow/dataflow_test.py
+++ /dev/null
@@ -1,405 +0,0 @@
-# Copyright 2016 Google Inc. All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""Integration tests for the dataflow package."""
-
-from __future__ import absolute_import
-
-import logging
-import re
-import unittest
-
-from google.cloud.dataflow.pipeline import Pipeline
-from google.cloud.dataflow.pvalue import AsDict
-from google.cloud.dataflow.pvalue import AsIter as AllOf
-from google.cloud.dataflow.pvalue import AsList
-from google.cloud.dataflow.pvalue import AsSingleton
-from google.cloud.dataflow.pvalue import EmptySideInput
-from google.cloud.dataflow.pvalue import SideOutputValue
-from google.cloud.dataflow.transforms import Create
-from google.cloud.dataflow.transforms import DoFn
-from google.cloud.dataflow.transforms import FlatMap
-from google.cloud.dataflow.transforms import GroupByKey
-from google.cloud.dataflow.transforms import Map
-from google.cloud.dataflow.transforms import ParDo
-from google.cloud.dataflow.transforms import WindowInto
-from google.cloud.dataflow.transforms.util import assert_that
-from google.cloud.dataflow.transforms.util import equal_to
-from google.cloud.dataflow.transforms.window import IntervalWindow
-from google.cloud.dataflow.transforms.window import WindowFn
-
-
-class DataflowTest(unittest.TestCase):
- """Dataflow integration tests."""
-
- SAMPLE_DATA = 'aa bb cc aa bb aa \n' * 10
- SAMPLE_RESULT = [('cc', 10), ('bb', 20), ('aa', 30)]
-
- # TODO(silviuc): Figure out a nice way to specify labels for stages so that
- # internal steps get prepended with surorunding stage names.
- @staticmethod
- def Count(pcoll): # pylint: disable=invalid-name
- """A Count transform: v, ... => (v, n), ..."""
- return (pcoll
- | Map('AddCount', lambda x: (x, 1))
- | GroupByKey('GroupCounts')
- | Map('AddCounts', lambda (x, ones): (x, sum(ones))))
-
- def test_word_count(self):
- pipeline = Pipeline('DirectPipelineRunner')
- lines = pipeline | Create('SomeWords', [DataflowTest.SAMPLE_DATA])
- result = (
- (lines | FlatMap('GetWords', lambda x: re.findall(r'\w+', x)))
- .apply('CountWords', DataflowTest.Count))
- assert_that(result, equal_to(DataflowTest.SAMPLE_RESULT))
- pipeline.run()
-
- def test_map(self):
- pipeline = Pipeline('DirectPipelineRunner')
- lines = pipeline | Create('input', ['a', 'b', 'c'])
- result = (lines
- | Map('upper', str.upper)
- | Map('prefix', lambda x, prefix: prefix + x, 'foo-'))
- assert_that(result, equal_to(['foo-A', 'foo-B', 'foo-C']))
- pipeline.run()
-
- def test_word_count_using_get(self):
- pipeline = Pipeline('DirectPipelineRunner')
- lines = pipeline | Create('SomeWords', [DataflowTest.SAMPLE_DATA])
- result = (
- (lines | FlatMap('GetWords', lambda x: re.findall(r'\w+', x)))
- .apply('CountWords', DataflowTest.Count))
- assert_that(result, equal_to(DataflowTest.SAMPLE_RESULT))
- pipeline.run()
-
- def test_par_do_with_side_input_as_arg(self):
- pipeline = Pipeline('DirectPipelineRunner')
- words_list = ['aa', 'bb', 'cc']
- words = pipeline | Create('SomeWords', words_list)
- prefix = pipeline | Create('SomeString', ['xyz']) # side in
- suffix = 'zyx'
- result = words | FlatMap(
- 'DecorateWords',
- lambda x, pfx, sfx: ['%s-%s-%s' % (pfx, x, sfx)],
- AsSingleton(prefix), suffix)
- assert_that(result, equal_to(['xyz-%s-zyx' % x for x in words_list]))
- pipeline.run()
-
- def test_par_do_with_side_input_as_keyword_arg(self):
- pipeline = Pipeline('DirectPipelineRunner')
- words_list = ['aa', 'bb', 'cc']
- words = pipeline | Create('SomeWords', words_list)
- prefix = 'zyx'
- suffix = pipeline | Create('SomeString', ['xyz']) # side in
- result = words | FlatMap(
- 'DecorateWords',
- lambda x, pfx, sfx: ['%s-%s-%s' % (pfx, x, sfx)],
- prefix, sfx=AsSingleton(suffix))
- assert_that(result, equal_to(['zyx-%s-xyz' % x for x in words_list]))
- pipeline.run()
-
- def test_par_do_with_do_fn_object(self):
- class SomeDoFn(DoFn):
- """A custom DoFn for a FlatMap transform."""
-
- def process(self, context, prefix, suffix):
- return ['%s-%s-%s' % (prefix, context.element, suffix)]
-
- pipeline = Pipeline('DirectPipelineRunner')
- words_list = ['aa', 'bb', 'cc']
- words = pipeline | Create('SomeWords', words_list)
- prefix = 'zyx'
- suffix = pipeline | Create('SomeString', ['xyz']) # side in
- result = words | ParDo('DecorateWordsDoFn', SomeDoFn(), prefix,
- suffix=AsSingleton(suffix))
- assert_that(result, equal_to(['zyx-%s-xyz' % x for x in words_list]))
- pipeline.run()
-
- def test_par_do_with_multiple_outputs_and_using_yield(self):
- class SomeDoFn(DoFn):
- """A custom DoFn using yield."""
-
- def process(self, context):
- yield context.element
- if context.element % 2 == 0:
- yield SideOutputValue('even', context.element)
- else:
- yield SideOutputValue('odd', context.element)
-
- pipeline = Pipeline('DirectPipelineRunner')
- nums = pipeline | Create('Some Numbers', [1, 2, 3, 4])
- results = nums | ParDo(
- 'ClassifyNumbers', SomeDoFn()).with_outputs('odd', 'even', main='main')
- assert_that(results.main, equal_to([1, 2, 3, 4]))
- assert_that(results.odd, equal_to([1, 3]), label='assert:odd')
- assert_that(results.even, equal_to([2, 4]), label='assert:even')
- pipeline.run()
-
- def test_par_do_with_multiple_outputs_and_using_return(self):
- def some_fn(v):
- if v % 2 == 0:
- return [v, SideOutputValue('even', v)]
- else:
- return [v, SideOutputValue('odd', v)]
-
- pipeline = Pipeline('DirectPipelineRunner')
- nums = pipeline | Create('Some Numbers', [1, 2, 3, 4])
- results = nums | FlatMap(
- 'ClassifyNumbers', some_fn).with_outputs('odd', 'even', main='main')
- assert_that(results.main, equal_to([1, 2, 3, 4]))
- assert_that(results.odd, equal_to([1, 3]), label='assert:odd')
- assert_that(results.even, equal_to([2, 4]), label='assert:even')
- pipeline.run()
-
- def test_empty_singleton_side_input(self):
- pipeline = Pipeline('DirectPipelineRunner')
- pcol = pipeline | Create('start', [1, 2])
- side = pipeline | Create('side', []) # Empty side input.
-
- def my_fn(k, s):
- v = ('empty' if isinstance(s, EmptySideInput) else 'full')
- return [(k, v)]
- result = pcol | FlatMap('compute', my_fn, AsSingleton(side))
- assert_that(result, equal_to([(1, 'empty'), (2, 'empty')]))
- pipeline.run()
-
- def test_multi_valued_singleton_side_input(self):
- pipeline = Pipeline('DirectPipelineRunner')
- pcol = pipeline | Create('start', [1, 2])
- side = pipeline | Create('side', [3, 4]) # 2 values in side input.
- pcol | FlatMap('compute', lambda x, s: [x * s], AsSingleton(side))
- with self.assertRaises(ValueError) as e:
- pipeline.run()
-
- def test_default_value_singleton_side_input(self):
- pipeline = Pipeline('DirectPipelineRunner')
- pcol = pipeline | Create('start', [1, 2])
- side = pipeline | Create('side', []) # 0 values in side input.
- result = (
- pcol | FlatMap('compute', lambda x, s: [x * s], AsSingleton(side, 10)))
- assert_that(result, equal_to([10, 20]))
- pipeline.run()
-
- def test_iterable_side_input(self):
- pipeline = Pipeline('DirectPipelineRunner')
- pcol = pipeline | Create('start', [1, 2])
- side = pipeline | Create('side', [3, 4]) # 2 values in side input.
- result = pcol | FlatMap('compute',
- lambda x, s: [x * y for y in s], AllOf(side))
- assert_that(result, equal_to([3, 4, 6, 8]))
- pipeline.run()
-
- def test_undeclared_side_outputs(self):
- pipeline = Pipeline('DirectPipelineRunner')
- nums = pipeline | Create('Some Numbers', [1, 2, 3, 4])
- results = nums | FlatMap(
- 'ClassifyNumbers',
- lambda x: [x, SideOutputValue('even' if x % 2 == 0 else 'odd', x)]
- ).with_outputs()
- # TODO(silviuc): Revisit this test to check for undeclared side outputs.
- # This should work with .with_outputs() without any tags declared and
- # the results[None] should work also.
- assert_that(results[None], equal_to([1, 2, 3, 4]))
- assert_that(results.odd, equal_to([1, 3]), label='assert:odd')
- assert_that(results.even, equal_to([2, 4]), label='assert:even')
- pipeline.run()
-
- def test_empty_side_outputs(self):
- pipeline = Pipeline('DirectPipelineRunner')
- nums = pipeline | Create('Some Numbers', [1, 3, 5])
- results = nums | FlatMap(
- 'ClassifyNumbers',
- lambda x: [x, SideOutputValue('even' if x % 2 == 0 else 'odd', x)]
- ).with_outputs()
- assert_that(results[None], equal_to([1, 3, 5]))
- assert_that(results.odd, equal_to([1, 3, 5]), label='assert:odd')
- assert_that(results.even, equal_to([]), label='assert:even')
- pipeline.run()
-
- def test_as_list_and_as_dict_side_inputs(self):
- a_list = [5, 1, 3, 2, 9]
- some_pairs = [('crouton', 17), ('supreme', None)]
- pipeline = Pipeline('DirectPipelineRunner')
- main_input = pipeline | Create('main input', [1])
- side_list = pipeline | Create('side list', a_list)
- side_pairs = pipeline | Create('side pairs', some_pairs)
- results = main_input | FlatMap(
- 'concatenate',
- lambda x, the_list, the_dict: [[x, the_list, the_dict]],
- AsList(side_list), AsDict(side_pairs))
-
- def matcher(expected_elem, expected_list, expected_pairs):
- def match(actual):
- [[actual_elem, actual_list, actual_dict]] = actual
- equal_to([expected_elem])([actual_elem])
- equal_to(expected_list)(actual_list)
- equal_to(expected_pairs)(actual_dict.iteritems())
- return match
-
- assert_that(results, matcher(1, a_list, some_pairs))
- pipeline.run()
-
- def test_as_singleton_without_unique_labels(self):
- # This should succeed as calling AsSingleton on the same PCollection twice
- # with the same defaults will return the same PCollectionView.
- a_list = [2]
- pipeline = Pipeline('DirectPipelineRunner')
- main_input = pipeline | Create('main input', [1])
- side_list = pipeline | Create('side list', a_list)
- results = main_input | FlatMap(
- 'test',
- lambda x, s1, s2: [[x, s1, s2]],
- AsSingleton(side_list), AsSingleton(side_list))
-
- def matcher(expected_elem, expected_singleton):
- def match(actual):
- [[actual_elem, actual_singleton1, actual_singleton2]] = actual
- equal_to([expected_elem])([actual_elem])
- equal_to([expected_singleton])([actual_singleton1])
- equal_to([expected_singleton])([actual_singleton2])
- return match
-
- assert_that(results, matcher(1, 2))
- pipeline.run()
-
- def test_as_singleton_with_different_defaults_without_unique_labels(self):
- # This should fail as AsSingleton with distinct default values should create
- # distinct PCollectionViews with the same full_label.
- a_list = [2]
- pipeline = Pipeline('DirectPipelineRunner')
- main_input = pipeline | Create('main input', [1])
- side_list = pipeline | Create('side list', a_list)
-
- with self.assertRaises(RuntimeError) as e:
- _ = main_input | FlatMap(
- 'test',
- lambda x, s1, s2: [[x, s1, s2]],
- AsSingleton(side_list), AsSingleton(side_list, default_value=3))
- self.assertTrue(
- e.exception.message.startswith(
- 'Transform "ViewAsSingleton(side list.None)" does not have a '
- 'stable unique label.'))
-
- def test_as_singleton_with_different_defaults_with_unique_labels(self):
- a_list = []
- pipeline = Pipeline('DirectPipelineRunner')
- main_input = pipeline | Create('main input', [1])
- side_list = pipeline | Create('side list', a_list)
- results = main_input | FlatMap(
- 'test',
- lambda x, s1, s2: [[x, s1, s2]],
- AsSingleton('si1', side_list, default_value=2),
- AsSingleton('si2', side_list, default_value=3))
-
- def matcher(expected_elem, expected_singleton1, expected_singleton2):
- def match(actual):
- [[actual_elem, actual_singleton1, actual_singleton2]] = actual
- equal_to([expected_elem])([actual_elem])
- equal_to([expected_singleton1])([actual_singleton1])
- equal_to([expected_singleton2])([actual_singleton2])
- return match
-
- assert_that(results, matcher(1, 2, 3))
- pipeline.run()
-
- def test_as_list_without_unique_labels(self):
- # This should succeed as calling AsList on the same PCollection twice will
- # return the same PCollectionView.
- a_list = [1, 2, 3]
- pipeline = Pipeline('DirectPipelineRunner')
- main_input = pipeline | Create('main input', [1])
- side_list = pipeline | Create('side list', a_list)
- results = main_input | FlatMap(
- 'test',
- lambda x, ls1, ls2: [[x, ls1, ls2]],
- AsList(side_list), AsList(side_list))
-
- def matcher(expected_elem, expected_list):
- def match(actual):
- [[actual_elem, actual_list1, actual_list2]] = actual
- equal_to([expected_elem])([actual_elem])
- equal_to(expected_list)(actual_list1)
- equal_to(expected_list)(actual_list2)
- return match
-
- assert_that(results, matcher(1, [1, 2, 3]))
- pipeline.run()
-
- def test_as_list_with_unique_labels(self):
- a_list = [1, 2, 3]
- pipeline = Pipeline('DirectPipelineRunner')
- main_input = pipeline | Create('main input', [1])
- side_list = pipeline | Create('side list', a_list)
- results = main_input | FlatMap(
- 'test',
- lambda x, ls1, ls2: [[x, ls1, ls2]],
- AsList(side_list), AsList(side_list, label='label'))
-
- def matcher(expected_elem, expected_list):
- def match(actual):
- [[actual_elem, actual_list1, actual_list2]] = actual
- equal_to([expected_elem])([actual_elem])
- equal_to(expected_list)(actual_list1)
- equal_to(expected_list)(actual_list2)
- return match
-
- assert_that(results, matcher(1, [1, 2, 3]))
- pipeline.run()
-
- def test_as_dict_with_unique_labels(self):
- some_kvs = [('a', 1), ('b', 2)]
- pipeline = Pipeline('DirectPipelineRunner')
- main_input = pipeline | Create('main input', [1])
- side_kvs = pipeline | Create('side kvs', some_kvs)
- results = main_input | FlatMap(
- 'test',
- lambda x, dct1, dct2: [[x, dct1, dct2]],
- AsDict(side_kvs), AsDict(side_kvs, label='label'))
-
- def matcher(expected_elem, expected_kvs):
- def match(actual):
- [[actual_elem, actual_dict1, actual_dict2]] = actual
- equal_to([expected_elem])([actual_elem])
- equal_to(expected_kvs)(actual_dict1.iteritems())
- equal_to(expected_kvs)(actual_dict2.iteritems())
- return match
-
- assert_that(results, matcher(1, some_kvs))
- pipeline.run()
-
- def test_window_transform(self):
- class TestWindowFn(WindowFn):
- """Windowing function adding two disjoint windows to each element."""
-
- def assign(self, assign_context):
- _ = assign_context
- return [IntervalWindow(10, 20), IntervalWindow(20, 30)]
-
- def merge(self, existing_windows):
- return existing_windows
-
- pipeline = Pipeline('DirectPipelineRunner')
- numbers = pipeline | Create('KVs', [(1, 10), (2, 20), (3, 30)])
- result = (numbers
- | WindowInto('W', windowfn=TestWindowFn())
- | GroupByKey('G'))
- assert_that(
- result, equal_to([(1, [10]), (1, [10]), (2, [20]),
- (2, [20]), (3, [30]), (3, [30])]))
- pipeline.run()
-
-
-if __name__ == '__main__':
- logging.getLogger().setLevel(logging.INFO)
- unittest.main()
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/google/cloud/dataflow/error.py
----------------------------------------------------------------------
diff --git a/sdks/python/google/cloud/dataflow/error.py b/sdks/python/google/cloud/dataflow/error.py
deleted file mode 100644
index 779c4d9..0000000
--- a/sdks/python/google/cloud/dataflow/error.py
+++ /dev/null
@@ -1,39 +0,0 @@
-# Copyright 2016 Google Inc. All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""Python Dataflow error classes."""
-
-
class DataflowError(Exception):
    """Root of the Dataflow error hierarchy; every class below derives from it."""


class PipelineError(DataflowError):
    """Raised for problems with a pipeline object (e.g. an unlinked PValue)."""


class PValueError(DataflowError):
    """Raised for problems with a PValue object (e.g. value not computed)."""


class RunnerError(DataflowError):
    """Raised for problems with a Runner object (e.g. no runner can be found)."""


class SideInputError(DataflowError):
    """Raised for problems with a side input to a parallel Do operation."""


class TransformError(DataflowError):
    """Raised for problems with a PTransform object."""
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/google/cloud/dataflow/examples/__init__.py
----------------------------------------------------------------------
diff --git a/sdks/python/google/cloud/dataflow/examples/__init__.py b/sdks/python/google/cloud/dataflow/examples/__init__.py
deleted file mode 100644
index e69de29..0000000
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/google/cloud/dataflow/examples/complete/autocomplete.py
----------------------------------------------------------------------
diff --git a/sdks/python/google/cloud/dataflow/examples/complete/autocomplete.py b/sdks/python/google/cloud/dataflow/examples/complete/autocomplete.py
deleted file mode 100644
index 400863d..0000000
--- a/sdks/python/google/cloud/dataflow/examples/complete/autocomplete.py
+++ /dev/null
@@ -1,79 +0,0 @@
-# Copyright 2016 Google Inc. All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""A workflow emitting the top k most common words for each prefix."""
-
-from __future__ import absolute_import
-
-import argparse
-import logging
-import re
-
-import google.cloud.dataflow as df
-
-
def run(argv=None):
    """Build and run the autocomplete pipeline.

    Reads text, splits it into words, computes the 5 most common words for
    every prefix and writes one ``prefix: candidates`` line per prefix.

    Args:
      argv: Command-line arguments. ``--input`` and ``--output`` (both
        required) are parsed here; the remaining arguments are passed on to
        ``df.Pipeline``.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('--input',
                        required=True,
                        help='Input file to process.')
    parser.add_argument('--output',
                        required=True,
                        help='Output file to write results to.')
    known_args, pipeline_args = parser.parse_known_args(argv)

    p = df.Pipeline(argv=pipeline_args)

    (p  # pylint: disable=expression-not-assigned
     | df.io.Read('read', df.io.TextFileSource(known_args.input))
     | df.FlatMap('split', lambda x: re.findall(r'[A-Za-z\']+', x))
     | TopPerPrefix('TopPerPrefix', 5)
     # Index into the (prefix, candidates) pair instead of using a tuple
     # parameter, which is a syntax error in Python 3 (PEP 3113).
     | df.Map('format', lambda kv: '%s: %s' % (kv[0], kv[1]))
     | df.io.Write('write', df.io.TextFileSink(known_args.output)))
    p.run()
-
-
class TopPerPrefix(df.PTransform):
    """Find the ``count`` most frequent words for every word prefix."""

    def __init__(self, label, count):
        super(TopPerPrefix, self).__init__(label)
        self._count = count

    def apply(self, words):
        """Compute the most common words for each possible prefix.

        Args:
          words: a PCollection of strings

        Returns:
          A PCollection of the most common words for each prefix, in the form
          (prefix, [(count, word), (count, word), ...])
        """
        word_counts = words | df.combiners.Count.PerElement()
        # Fan each (word, count) out to every prefix of the word.
        by_prefix = word_counts | df.FlatMap(extract_prefixes)
        return by_prefix | df.combiners.Top.LargestPerKey(self._count)
-
-
def extract_prefixes(word_count):
    """Yield ``(prefix, (count, word))`` for every non-empty prefix of a word.

    Args:
      word_count: A ``(word, count)`` pair, as produced by
        ``df.combiners.Count.PerElement``.

    The pair is unpacked in the body because tuple parameters are a syntax
    error in Python 3 (PEP 3113); callers still pass a single pair.
    """
    word, count = word_count
    for k in range(1, len(word) + 1):
        yield word[:k], (count, word)
-
-
if __name__ == '__main__':
    # Surface INFO-level pipeline logs when run as a script.
    root_logger = logging.getLogger()
    root_logger.setLevel(logging.INFO)
    run()
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/google/cloud/dataflow/examples/complete/autocomplete_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/google/cloud/dataflow/examples/complete/autocomplete_test.py b/sdks/python/google/cloud/dataflow/examples/complete/autocomplete_test.py
deleted file mode 100644
index 3c10483..0000000
--- a/sdks/python/google/cloud/dataflow/examples/complete/autocomplete_test.py
+++ /dev/null
@@ -1,78 +0,0 @@
-# Copyright 2016 Google Inc. All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""Test for the wordcount example."""
-
-import collections
-import logging
-import re
-import tempfile
-import unittest
-
-
-import google.cloud.dataflow as df
-from google.cloud.dataflow.examples.complete import autocomplete
-from google.cloud.dataflow.pvalue import AsIter
-from google.cloud.dataflow.utils import options
-
-# TODO(robertwb): Move to testing utilities.
-
-
def assert_that(pcoll, matcher):
    """Assert that the given PCollection satisfies the constraints of matcher.

    The check is itself a pipeline step, so it is runnable locally or on a
    remote service: ``pcoll`` is handed as an ``AsIter`` side input to a
    FlatMap over a one-element PCollection, and ``matcher`` is applied there.

    Args:
      pcoll: The PCollection to check.
      matcher: Callable taking an iterable of the PCollection's elements. It
        must return a truthy value on success; it may also raise on failure.
    """
    singleton = pcoll.pipeline | df.Create('create_singleton', [None])

    def check_matcher(_, side_value):
        # Deliberately not ``assert matcher(...)``: assert statements are
        # removed under ``python -O``, which would skip the matcher call and
        # silently disable the check.
        if not matcher(side_value):
            raise AssertionError('assert_that: matcher returned a falsy value')
        return []
    singleton | df.FlatMap(check_matcher, AsIter(pcoll))  # pylint: disable=expression-not-assigned
-
-
def contains_in_any_order(expected):
    """Build a matcher that compares a collection to ``expected`` as a multiset.

    The returned callable returns True when the element counts agree and
    otherwise raises ValueError describing the extra and missing elements.
    """
    def matcher(value):
        actual_counts = collections.Counter(value)
        expected_counts = collections.Counter(expected)
        if actual_counts == expected_counts:
            return True
        extra = actual_counts - expected_counts
        missing = expected_counts - actual_counts
        raise ValueError('extra: %s, missing: %s' % (extra, missing))
    return matcher
-
-
class WordCountTest(unittest.TestCase):
    """Tests for ``autocomplete.TopPerPrefix``.

    NOTE(review): the class name appears to be copied from the wordcount test;
    it is kept unchanged so existing test selectors keep working.
    """

    WORDS = ['this', 'this', 'that', 'to', 'to', 'to']

    def test_top_prefixes(self):
        p = df.Pipeline('DirectPipelineRunner')
        words = p | df.Create('create', self.WORDS)
        result = words | autocomplete.TopPerPrefix('test', 5)
        # values must be hashable for now
        # (index into the (key, values) pair: a tuple parameter would be a
        # syntax error in Python 3, see PEP 3113)
        result = result | df.Map(lambda kv: (kv[0], tuple(kv[1])))
        assert_that(result, contains_in_any_order([
            ('t', ((3, 'to'), (2, 'this'), (1, 'that'))),
            ('to', ((3, 'to'),)),
            ('th', ((2, 'this'), (1, 'that'))),
            ('thi', ((2, 'this'),)),
            ('this', ((2, 'this'),)),
            ('tha', ((1, 'that'),)),
            ('that', ((1, 'that'),)),
        ]))
        p.run()
-
if __name__ == '__main__':
    # Like the other example tests, surface INFO-level pipeline logs when the
    # test is run directly (this also puts the module's logging import to use).
    logging.getLogger().setLevel(logging.INFO)
    unittest.main()
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/google/cloud/dataflow/examples/complete/estimate_pi.py
----------------------------------------------------------------------
diff --git a/sdks/python/google/cloud/dataflow/examples/complete/estimate_pi.py b/sdks/python/google/cloud/dataflow/examples/complete/estimate_pi.py
deleted file mode 100644
index 0e52bad..0000000
--- a/sdks/python/google/cloud/dataflow/examples/complete/estimate_pi.py
+++ /dev/null
@@ -1,109 +0,0 @@
-# -*- coding: utf-8 -*-
-# Copyright 2016 Google Inc. All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""A workflow that uses a simple Monte Carlo method to estimate \u03c0.
-
-The algorithm computes the fraction of points drawn uniformly within the unit
-square that also fall in the quadrant of the unit circle that overlaps the
-square. A simple area calculation shows that this fraction should be \u03c0/4, so
-we multiply our counts ratio by four to estimate \u03c0.
-"""
-
-from __future__ import absolute_import
-
-import argparse
-import json
-import logging
-import random
-
-
-import google.cloud.dataflow as df
-from google.cloud.dataflow.typehints import Any
-from google.cloud.dataflow.typehints import Iterable
-from google.cloud.dataflow.typehints import Tuple
-
-
@df.typehints.with_output_types(Tuple[int, int, int])
@df.typehints.with_input_types(int)
def run_trials(runs):
    """Run trials and return a 3-tuple representing the results.

    Args:
      runs: Number of trial runs to be executed.

    Returns:
      A 3-tuple (total trials, inside trials, 0).

    The final zero is needed solely to make sure that the combine_results function
    has same type for inputs and outputs (a requirement for combiner functions).
    """
    import itertools  # pylint: disable=g-import-not-at-top
    inside_runs = 0
    # itertools.repeat is lazy on both Python 2 and 3. xrange no longer exists
    # in Python 3, and Python 2's range would build a list of `runs` ints.
    for _ in itertools.repeat(None, runs):
        x = random.uniform(0, 1)
        y = random.uniform(0, 1)
        # Count points that fall inside the quarter unit circle.
        if x * x + y * y <= 1.0:
            inside_runs += 1
    return runs, inside_runs, 0
-
-
@df.typehints.with_output_types(Tuple[int, int, float])
@df.typehints.with_input_types(Iterable[Tuple[int, int, Any]])
def combine_results(results):
    """Combiner function to sum up trials and compute the estimate.

    Args:
      results: An iterable of 3-tuples (total trials, inside trials, ignored).
        It is traversed exactly once, so a one-shot iterator is acceptable.

    Returns:
      A 3-tuple containing the sum of total trials, sum of inside trials, and
      the estimate 4 * inside / total. When total is 0 (no trials at all), the
      estimate is 0.0 rather than raising ZeroDivisionError.
    """
    total = 0
    inside = 0
    # Accumulate in a single pass. Summing with two separate generators would
    # re-iterate `results`, and a one-shot iterator would then contribute zero
    # inside trials on the second pass.
    for result in results:
        total += result[0]
        inside += result[1]
    estimate = 4 * float(inside) / total if total else 0.0
    return total, inside, estimate
-
-
class JsonCoder(object):
    """Encode-only coder that renders each output element as a JSON string."""

    def encode(self, x):
        serialized = json.dumps(x)
        return serialized
-
-
def run(argv=None):
    """Build and run the Monte Carlo estimation of pi.

    Args:
      argv: Command-line arguments. ``--output`` (required) is the output file
        prefix; any arguments not recognised here are passed on to
        ``df.Pipeline``.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('--output',
                        required=True,
                        help='Output file to write results to.')
    known_args, pipeline_args = parser.parse_known_args(argv)

    # One hundred work items of one hundred thousand tries each, i.e. ten
    # million trials in total.
    work_items = [100000] * 100

    p = df.Pipeline(argv=pipeline_args)
    (p  # pylint: disable=expression-not-assigned
     | df.Create('Initialize', work_items).with_output_types(int)
     | df.Map('Run trials', run_trials)
     | df.CombineGlobally('Sum', combine_results).without_defaults()
     | df.io.Write('Write',
                   df.io.TextFileSink(known_args.output,
                                      coder=JsonCoder())))

    # Actually run the pipeline (all operations above are deferred).
    p.run()
-
-
if __name__ == '__main__':
    # Surface INFO-level pipeline logs when run as a script.
    root_logger = logging.getLogger()
    root_logger.setLevel(logging.INFO)
    run()
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/google/cloud/dataflow/examples/complete/estimate_pi_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/google/cloud/dataflow/examples/complete/estimate_pi_test.py b/sdks/python/google/cloud/dataflow/examples/complete/estimate_pi_test.py
deleted file mode 100644
index 0c5be30..0000000
--- a/sdks/python/google/cloud/dataflow/examples/complete/estimate_pi_test.py
+++ /dev/null
@@ -1,46 +0,0 @@
-# Copyright 2016 Google Inc. All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""Test for the estimate_pi example."""
-
-import json
-import logging
-import tempfile
-import unittest
-
-from google.cloud.dataflow.examples.complete import estimate_pi
-
-
class EstimatePiTest(unittest.TestCase):
    """End-to-end test for the estimate_pi example pipeline."""

    def _cleanup_path(self, path):
        """Arrange for ``path`` to be deleted when the test finishes, if present."""
        import os  # pylint: disable=g-import-not-at-top

        def remove():
            if os.path.exists(path):
                os.remove(path)
        self.addCleanup(remove)

    def create_temp_file(self, contents):
        """Write ``contents`` to a fresh temp file and return its path.

        The file is removed automatically when the test finishes.
        """
        # Text mode so that a str can be written on Python 3 as well as 2.
        with tempfile.NamedTemporaryFile(mode='w', delete=False) as f:
            f.write(contents)
        self._cleanup_path(f.name)
        return f.name

    def test_basics(self):
        temp_path = self.create_temp_file('result')
        # The pipeline's output is read back from this shard name; register
        # its removal before running so it is cleaned up even on failure.
        result_path = temp_path + '-00000-of-00001'
        self._cleanup_path(result_path)
        estimate_pi.run([
            '--output=%s' % temp_path])
        # Parse result file and compare.
        with open(result_path) as result_file:
            estimated_pi = json.loads(result_file.readline())[2]
        # Note: Probabilistically speaking this test can fail with a probability
        # that is very small (VERY) given that we run at least 10 million trials.
        self.assertGreater(estimated_pi, 3.13)
        self.assertLess(estimated_pi, 3.15)
-
-
if __name__ == '__main__':
    # Surface INFO-level pipeline logs when the test is run directly.
    root_logger = logging.getLogger()
    root_logger.setLevel(logging.INFO)
    unittest.main()
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/google/cloud/dataflow/examples/complete/juliaset/juliaset/__init__.py
----------------------------------------------------------------------
diff --git a/sdks/python/google/cloud/dataflow/examples/complete/juliaset/juliaset/__init__.py b/sdks/python/google/cloud/dataflow/examples/complete/juliaset/juliaset/__init__.py
deleted file mode 100644
index e69de29..0000000
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/google/cloud/dataflow/examples/complete/juliaset/juliaset/juliaset.py
----------------------------------------------------------------------
diff --git a/sdks/python/google/cloud/dataflow/examples/complete/juliaset/juliaset/juliaset.py b/sdks/python/google/cloud/dataflow/examples/complete/juliaset/juliaset/juliaset.py
deleted file mode 100644
index 3546f03..0000000
--- a/sdks/python/google/cloud/dataflow/examples/complete/juliaset/juliaset/juliaset.py
+++ /dev/null
@@ -1,119 +0,0 @@
-# Copyright 2016 Google Inc. All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""A Julia set computing workflow: https://en.wikipedia.org/wiki/Julia_set.
-
-We use the quadratic polinomial f(z) = z*z + c, with c = -.62772 +.42193i
-"""
-
-from __future__ import absolute_import
-
-import argparse
-
-import google.cloud.dataflow as df
-
-
def from_pixel(x, y, n):
    """Map pixel (x, y) of an n-by-n grid onto the complex square [-1, 1)^2.

    Pixel 0 maps to -1.0 on each axis and each pixel step adds 2/n.
    """
    real = 2.0 * x / n - 1.0
    imag = 2.0 * y / n - 1.0
    return complex(real, imag)
-
-
def get_julia_set_point_color(point, c, n, max_iterations):
    """Compute the escape iteration of one pixel under f(z) = z*z + c.

    Args:
      point: An ``(x, y)`` pixel position on the n-by-n grid. It is unpacked in
        the body because tuple parameters are a Python 3 syntax error
        (PEP 3113); callers still pass a single pair.
      c: The complex constant of the quadratic map.
      n: Side length of the pixel grid (see ``from_pixel``).
      max_iterations: Maximum number of iterations of the map.

    Returns:
      ``(x, y, i)``. ``i`` is the iteration at which the orbit escaped
      (|z| > 2), or ``max_iterations - 1`` if it never escaped. It is 0 when
      ``max_iterations`` <= 0.
    """
    x, y = point
    z = from_pixel(x, y, n)
    # Initialised so that a non-positive max_iterations does not leave the
    # loop variable unbound.
    iteration = 0
    for iteration in range(max_iterations):
        # Standard bailout: once |z| > 2 (i.e. |z|^2 > 4) the orbit diverges.
        # The squared magnitude avoids a sqrt.
        if z.real * z.real + z.imag * z.imag > 4.0:
            break
        z = z * z + c
    return x, y, iteration
-
-
def generate_julia_set_colors(pipeline, c, n, max_iterations):
    """Build the pipeline step computing the escape iteration of every pixel.

    Returns:
      A PCollection of ``(x, y, iteration)`` triples, one per pixel of the
      n-by-n grid, as produced by ``get_julia_set_point_color``.
    """
    def point_set(size):
        # All grid positions, x-major then y.
        for px in range(size):
            for py in range(size):
                yield (px, py)

    return (pipeline
            | df.Create('add points', point_set(n))
            | df.Map(get_julia_set_point_color, c, n, max_iterations))
-
-
def generate_julia_set_visualization(data, n, max_iterations):
    """Generate the pixel matrix for rendering the julia set as an image.

    Args:
      data: Iterable of ``(x, y, iteration)`` triples with 0 <= x, y < n. Each
        ``iteration`` must lie in [0, max_iterations); a larger value would
        index past the end of the palette and raise IndexError.
      n: Side length of the square image.
      max_iterations: Iteration cap used when computing ``data``.

    Returns:
      An (n, n, 3) numpy uint8 array of RGB values; pixels absent from ``data``
      stay black.
    """
    import numpy as np  # pylint: disable=g-import-not-at-top
    # 16 levels per channel -> a 4096-entry palette ordered by (r, g, b).
    colors = [(r, g, b)
              for r in range(0, 256, 16)
              for g in range(0, 256, 16)
              for b in range(0, 256, 16)]

    xy = np.zeros((n, n, 3), dtype=np.uint8)
    for x, y, iteration in data:
        # Floor division keeps the palette index an int on Python 3, where
        # `/` would produce a float that cannot index a list.
        xy[x, y] = colors[iteration * len(colors) // max_iterations]

    return xy
-
-
def save_julia_set_visualization(out_file, image_array):
    """Write ``image_array`` to ``out_file`` as a PNG image via matplotlib."""
    from matplotlib import pyplot  # pylint: disable=g-import-not-at-top
    pyplot.imsave(out_file, image_array, format='png')
-
-
def run(argv=None):
    """Compute a Julia set on an NxN grid and write the per-pixel results.

    Uses c = -.62772 + .42193i and 100 iterations per pixel. Each output line
    holds all ``(x, y, iteration)`` triples that share one x coordinate.

    Args:
      argv: Command-line arguments. ``--grid_size``, ``--coordinate_output``
        (required) and ``--image_output`` are parsed here; the remaining
        arguments are passed on to ``df.Pipeline``.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('--grid_size',
                        dest='grid_size',
                        default=1000,
                        help='Size of the NxN matrix')
    parser.add_argument(
        '--coordinate_output',
        dest='coordinate_output',
        required=True,
        help='Output file to write the color coordinates of the image to.')
    parser.add_argument('--image_output',
                        dest='image_output',
                        default=None,
                        help='Output file to write the resulting image to.')
    known_args, pipeline_args = parser.parse_known_args(argv)

    p = df.Pipeline(argv=pipeline_args)
    n = int(known_args.grid_size)

    coordinates = generate_julia_set_colors(p, complex(-.62772, .42193), n, 100)

    def key_by_x(point):
        """Key an (x, y, iteration) triple by its x coordinate."""
        return point[0], tuple(point)

    def format_row(x_and_points):
        """Render one (x, [(x, y, iteration), ...]) group as a text line."""
        _, points = x_and_points
        return ' '.join('(%s, %s, %s)' % tuple(point) for point in points)

    # Group each coordinate triplet by its x value, then write the coordinates
    # to the output file with an x-coordinate grouping per line. Named helpers
    # are used instead of tuple-parameter lambdas, which are a syntax error in
    # Python 3 (PEP 3113).
    (coordinates  # pylint: disable=expression-not-assigned
     | df.Map('x coord key', key_by_x)
     | df.GroupByKey('x coord')
     | df.Map('format', format_row)
     | df.io.Write('write', df.io.TextFileSink(known_args.coordinate_output)))
    p.run()

    # Optionally render the image and save it to a file.
    # TODO(silviuc): Add this functionality.
    # if p.options.image_output is not None:
    #   julia_set_image = generate_julia_set_visualization(
    #       file_with_coordinates, n, 100)
    #   save_julia_set_visualization(p.options.image_output, julia_set_image)
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/google/cloud/dataflow/examples/complete/juliaset/juliaset/juliaset_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/google/cloud/dataflow/examples/complete/juliaset/juliaset/juliaset_test.py b/sdks/python/google/cloud/dataflow/examples/complete/juliaset/juliaset/juliaset_test.py
deleted file mode 100644
index 33c434a..0000000
--- a/sdks/python/google/cloud/dataflow/examples/complete/juliaset/juliaset/juliaset_test.py
+++ /dev/null
@@ -1,83 +0,0 @@
-# Copyright 2016 Google Inc. All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""Test for the juliaset example."""
-
-import logging
-import os
-import re
-import tempfile
-import unittest
-
-
-from google.cloud.dataflow.examples.complete.juliaset.juliaset import juliaset
-
-
class JuliaSetTest(unittest.TestCase):
    """End-to-end tests for the juliaset example."""

    def setUp(self):
        self.test_files = {}
        self.test_files['output_coord_file_name'] = self.generate_temp_file()
        self.test_files['output_image_file_name'] = self.generate_temp_file()

    def tearDown(self):
        import glob  # pylint: disable=g-import-not-at-top
        for test_file in self.test_files.values():
            # The pipeline writes sharded output next to the requested path
            # (e.g. '<path>-00000-of-00001', read in test_output_file_format),
            # so remove those shards as well as the path itself.
            for path in [test_file] + glob.glob(test_file + '-*-of-*'):
                if os.path.exists(path):
                    os.remove(path)

    def generate_temp_file(self):
        """Create an empty temp file that outlives its handle; return its path."""
        with tempfile.NamedTemporaryFile(delete=False) as temp_file:
            return temp_file.name

    def run_example(self, grid_size, image_file_name=None):
        """Run the juliaset pipeline writing coordinates to the temp file."""
        args = [
            '--coordinate_output=%s' % self.test_files['output_coord_file_name'],
            '--grid_size=%s' % grid_size,
        ]

        if image_file_name is not None:
            args.append('--image_output=%s' % image_file_name)

        juliaset.run(args)

    def test_output_file_format(self):
        grid_size = 5
        self.run_example(grid_size)

        # Parse the results from the file, and ensure it was written in the proper
        # format.
        with open(self.test_files['output_coord_file_name'] +
                  '-00000-of-00001') as result_file:
            output_lines = result_file.readlines()

            # Should have a line for each x-coordinate.
            self.assertEqual(grid_size, len(output_lines))
            for line in output_lines:
                coordinates = re.findall(r'(\(\d+, \d+, \d+\))', line)

                # Should have 5 coordinates on each line.
                self.assertTrue(coordinates)
                self.assertEqual(grid_size, len(coordinates))

    def test_generate_fractal_image(self):
        temp_image_file = self.test_files['output_image_file_name']
        self.run_example(10, image_file_name=temp_image_file)

        # Ensure that the image was saved properly.
        # TODO(silviuc): Reactivate the test when --image_output is supported.
        # self.assertTrue(os.stat(temp_image_file).st_size > 0)
-
if __name__ == '__main__':
    # Surface INFO-level pipeline logs when the test is run directly.
    root_logger = logging.getLogger()
    root_logger.setLevel(logging.INFO)
    unittest.main()
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/google/cloud/dataflow/examples/complete/juliaset/juliaset_main.py
----------------------------------------------------------------------
diff --git a/sdks/python/google/cloud/dataflow/examples/complete/juliaset/juliaset_main.py b/sdks/python/google/cloud/dataflow/examples/complete/juliaset/juliaset_main.py
deleted file mode 100644
index 39a58d6..0000000
--- a/sdks/python/google/cloud/dataflow/examples/complete/juliaset/juliaset_main.py
+++ /dev/null
@@ -1,55 +0,0 @@
-# Copyright 2016 Google Inc. All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""A Julia set computing workflow: https://en.wikipedia.org/wiki/Julia_set.
-
-This example has in the juliaset/ folder all the code needed to execute the
-workflow. It is organized in this way so that it can be packaged as a Python
-package and later installed in the VM workers executing the job. The root
-directory for the example contains just a "driver" script to launch the job
-and the setup.py file needed to create a package.
-
-The advantages for organizing the code is that large projects will naturally
-evolve beyond just one module and you will have to make sure the additional
-modules are present in the worker.
-
-In Python Dataflow, using the --setup_file option when submitting a job, will
-trigger creating a source distribution (as if running python setup.py sdist) and
-then staging the resulting tarball in the staging area. The workers, upon
-startup, will install the tarball.
-
-Below is a complete command line for running the juliaset workflow remotely as
-an example:
-
-python juliaset_main.py \
- --job_name juliaset-$USER \
- --project YOUR-PROJECT \
- --runner BlockingDataflowPipelineRunner \
- --setup_file ./setup.py \
- --staging_location gs://YOUR-BUCKET/juliaset/staging \
- --temp_location gs://YOUR-BUCKET/juliaset/temp \
- --coordinate_output gs://YOUR-BUCKET/juliaset/out \
- --grid_size 20 \
-
-"""
-
-import logging
-
-
-from juliaset import juliaset
-
-
if __name__ == '__main__':
    # Surface INFO-level pipeline logs when launching the job from here.
    root_logger = logging.getLogger()
    root_logger.setLevel(logging.INFO)
    juliaset.run()
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/google/cloud/dataflow/examples/complete/juliaset/setup.py
----------------------------------------------------------------------
diff --git a/sdks/python/google/cloud/dataflow/examples/complete/juliaset/setup.py b/sdks/python/google/cloud/dataflow/examples/complete/juliaset/setup.py
deleted file mode 100644
index 91d6588..0000000
--- a/sdks/python/google/cloud/dataflow/examples/complete/juliaset/setup.py
+++ /dev/null
@@ -1,115 +0,0 @@
-# Copyright 2016 Google Inc. All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""Setup.py module for the workflow's worker utilities.
-
-All the workflow related code is gathered in a package that will be built as a
-source distribution, staged in the staging area for the workflow being run and
-then installed in the workers when they start running.
-
-This behavior is triggered by specifying the --setup_file command line option
-when running the workflow for remote execution.
-"""
-
-import subprocess
-
-import setuptools
-from setuptools.command.bdist_egg import bdist_egg as _bdist_egg
-
-
class bdist_egg(_bdist_egg):  # pylint: disable=invalid-name
    """bdist_egg command that runs the CustomCommands step before building.

    The package built from this setup.py is staged and later installed on the
    worker with ``easy_install``. That install instantiates this command class,
    which makes it the hook for running the custom commands on the worker.
    """

    def run(self):
        # Custom (e.g. system package) commands first, then the normal egg build.
        self.run_command('CustomCommands')
        _bdist_egg.run(self)
-
-
# Custom commands to run during setup. They are not essential for this
# workflow; the one below only serves as an example. Each command is run in a
# child process. Typically, these commands install non-Python packages. For
# instance, to install the C++-based library libjpeg62, add the following two
# commands:
#
#     ['apt-get', 'update'],
#     ['apt-get', '--assume-yes', 'install', 'libjpeg62'],
#
# First, note that there is no need to use the sudo command because the setup
# script runs with appropriate access.
# Second, if the apt-get tool is used then the first command needs to be
# 'apt-get update' so the tool refreshes itself and initializes links to
# download repositories. Without this initial step the other apt-get install
# commands will fail with package not found errors. Note also the --assume-yes
# option, which shortcuts the interactive confirmation.
#
# The output of custom commands (including failures) will be logged in the
# worker-startup log.
CUSTOM_COMMANDS = [
    ['echo', 'Custom command worked!'],
]
-
-
class CustomCommands(setuptools.Command):
    """A setuptools Command class able to run arbitrary commands."""

    def initialize_options(self):
        pass

    def finalize_options(self):
        pass

    def RunCustomCommand(self, command_list):
        """Run one command, print its combined stdout/stderr, and check status.

        Args:
          command_list: The command and its arguments, e.g. ['echo', 'hi'].

        Raises:
          RuntimeError: if the command exits with a non-zero status.
        """
        # print(...) with a single argument behaves identically as the Python 2
        # print statement and the Python 3 print function; the bare print
        # statement used before is a syntax error on Python 3.
        print('Running command: %s' % command_list)
        p = subprocess.Popen(
            command_list,
            stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
        # Can use communicate(input='y\n'.encode()) if the command run requires
        # some confirmation.
        stdout_data, _ = p.communicate()
        print('Command output: %s' % stdout_data)
        if p.returncode != 0:
            raise RuntimeError(
                'Command %s failed: exit code: %s' % (command_list, p.returncode))

    def run(self):
        for command in CUSTOM_COMMANDS:
            self.RunCustomCommand(command)
-
-
# Configure the required packages and scripts to install.
# Note that the Python Dataflow containers come with numpy already installed
# so this dependency will not trigger anything to be installed unless a version
# restriction is specified.
REQUIRED_PACKAGES = ['numpy']


setuptools.setup(
    name='juliaset',
    version='0.0.1',
    description='Julia set workflow package.',
    install_requires=REQUIRED_PACKAGES,
    packages=setuptools.find_packages(),
    cmdclass={
        # Command class instantiated and run during easy_install scenarios.
        'bdist_egg': bdist_egg,
        'CustomCommands': CustomCommands,
    },
)
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/google/cloud/dataflow/examples/complete/tfidf.py
----------------------------------------------------------------------
diff --git a/sdks/python/google/cloud/dataflow/examples/complete/tfidf.py b/sdks/python/google/cloud/dataflow/examples/complete/tfidf.py
deleted file mode 100644
index fcdfac8..0000000
--- a/sdks/python/google/cloud/dataflow/examples/complete/tfidf.py
+++ /dev/null
@@ -1,196 +0,0 @@
-# Copyright 2016 Google Inc. All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""A TF-IDF workflow (term frequency - inverse document frequency).
-
-For an explanation of the TF-IDF algorithm see the following link:
-http://en.wikipedia.org/wiki/Tf-idf
-"""
-
-from __future__ import absolute_import
-
-import argparse
-import glob
-import math
-import re
-
-import google.cloud.dataflow as df
-from google.cloud.dataflow.pvalue import AsSingleton
-
-
def read_documents(pipeline, uris):
    """Reads each document in `uris` and returns one (uri, line) PCollection.

    Args:
      pipeline: the pipeline the read transforms are attached to.
      uris: iterable of paths; each is read as a text file.

    Returns:
      The flattened union of the per-document (uri, line) collections.
    """
    def _read_one(uri):
        lines = pipeline | df.io.Read('read: %s' % uri, df.io.TextFileSource(uri))
        # Map forwards the trailing `uri` argument to the lambda's second
        # parameter, pairing every line with its document's uri.
        return lines | df.Map('withkey: %s' % uri, lambda v, uri: (uri, v), uri)

    return [_read_one(uri) for uri in uris] | df.Flatten('flatten read pcolls')
-
-
class TfIdf(df.PTransform):
    """A transform containing a basic TF-IDF pipeline.

    The input consists of key-value pairs where the key is the document's URI
    and the value is a piece (line) of the document's content.
    The output is a mapping from each term to a (URI, TF-IDF score) pair for
    every document URI in which the term occurs.
    """

    def apply(self, uri_to_content):

        # Compute the total number of documents, and prepare a singleton
        # PCollection to use as side input.
        total_documents = (
            uri_to_content
            | df.Keys('get uris')
            | df.RemoveDuplicates('get unique uris')
            | df.combiners.Count.Globally(' count uris'))

        # Create a collection of pairs mapping a URI to each of the words
        # in the document associated with that URI.

        def split_into_words(uri_and_line):
            uri, line = uri_and_line
            return [(uri, w.lower()) for w in re.findall(r'[A-Za-z\']+', line)]

        uri_to_words = (
            uri_to_content
            | df.FlatMap('split words', split_into_words))

        # Compute a mapping from each word to the total number of documents
        # in which it appears.
        word_to_doc_count = (
            uri_to_words
            | df.RemoveDuplicates('get unique words per doc')
            | df.Values('get words')
            | df.combiners.Count.PerElement('count docs per word'))

        # Compute a mapping from each URI to the total number of words in the
        # document associated with that URI.
        uri_to_word_total = (
            uri_to_words
            | df.Keys(' get uris')
            | df.combiners.Count.PerElement('count words in doc'))

        # Count, for each (URI, word) pair, the number of occurrences of that
        # word in the document associated with the URI.
        uri_and_word_to_count = (
            uri_to_words
            | df.combiners.Count.PerElement('count word-doc pairs'))

        # Adjust the above collection to a mapping from (URI, word) pairs to
        # counts into an isomorphic mapping from URI to (word, count) pairs, to
        # prepare for a join by the URI key.

        def shift_keys(uri_word_and_count):
            (uri, word), count = uri_word_and_count
            return uri, (word, count)

        uri_to_word_and_count = (
            uri_and_word_to_count
            | df.Map('shift keys', shift_keys))

        # Perform a CoGroupByKey (a sort of pre-join) on the prepared
        # uri_to_word_total and uri_to_word_and_count tagged by 'word totals'
        # and 'word counts' strings. This yields a mapping from URI to a
        # dictionary that maps the above mentioned tag strings to an iterable
        # containing the word total for that URI and word and count
        # respectively.
        #
        # A diagram (in which '[]' just means 'iterable'):
        #
        #   URI: {'word totals': [count],  # Total words within this URI's document.
        #         'word counts': [(word, count),  # Counts of specific words
        #                         (word, count),  # within this URI's document.
        #                         ... ]}
        uri_to_word_and_count_and_total = (
            {'word totals': uri_to_word_total, 'word counts': uri_to_word_and_count}
            | df.CoGroupByKey('cogroup by uri'))

        # Compute a mapping from each word to a (URI, term frequency) pair for
        # each URI. A word's term frequency for a document is simply the number
        # of times that word occurs in the document divided by the total number
        # of words in the document.

        def compute_term_frequency(uri_and_count_and_total):
            uri, count_and_total = uri_and_count_and_total
            word_and_count = count_and_total['word counts']
            # We have an iterable for one element that we want extracted.
            [word_total] = count_and_total['word totals']
            for word, count in word_and_count:
                yield word, (uri, float(count) / word_total)

        word_to_uri_and_tf = (
            uri_to_word_and_count_and_total
            | df.FlatMap('compute term frequencies', compute_term_frequency))

        # Compute a mapping from each word to its document frequency.
        # A word's document frequency in a corpus is the number of
        # documents in which the word appears divided by the total
        # number of documents in the corpus.
        #
        # This calculation uses a side input, a Dataflow-computed auxiliary
        # value presented to each invocation of our MapFn lambda. The second
        # argument to the lambda (called total) receives the value we listed
        # after the lambda in Map(). Additional side inputs (and ordinary
        # Python values, too) can be provided to MapFns and DoFns in this way.
        word_to_df = (
            word_to_doc_count
            | df.Map('compute doc frequencies',
                     lambda word_and_count, total: (
                         word_and_count[0], float(word_and_count[1]) / total),
                     AsSingleton(total_documents)))

        # Join the term frequency and document frequency collections,
        # each keyed on the word.
        word_to_uri_and_tf_and_df = (
            {'tf': word_to_uri_and_tf, 'df': word_to_df}
            | df.CoGroupByKey('cogroup words by tf-df'))

        # Compute a mapping from each word to a (URI, TF-IDF) score for each
        # URI. There are a variety of definitions of TF-IDF
        # ("term frequency - inverse document frequency") score; here we use a
        # basic version: the term frequency multiplied by the log of the
        # inverse document frequency, i.e. tf * log(1 / df). A word that
        # appears in every document therefore scores 0.

        def compute_tf_idf(word_and_tf_and_df):
            word, tf_and_df = word_and_tf_and_df
            [docf] = tf_and_df['df']
            for uri, tf in tf_and_df['tf']:
                yield word, (uri, tf * math.log(1 / docf))

        word_to_uri_and_tfidf = (
            word_to_uri_and_tf_and_df
            | df.FlatMap('compute tf-idf', compute_tf_idf))

        return word_to_uri_and_tfidf
-
-
def run(argv=None):
    """Main entry point; defines and runs the tfidf pipeline.

    Args:
      argv: command-line arguments. --uris (a pattern expanded with
        glob.glob) and --output are consumed here; all remaining arguments
        are passed to the Pipeline as pipeline options.

    Raises:
      ValueError: if the --uris pattern does not match any file.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('--uris',
                        required=True,
                        help='URIs to process.')
    parser.add_argument('--output',
                        required=True,
                        help='Output file to write results to.')
    known_args, pipeline_args = parser.parse_known_args(argv)

    # glob.glob() silently returns [] when nothing matches (it only looks at
    # the local filesystem). Fail early with a clear message rather than
    # building a pipeline that has nothing to read or flatten.
    uris = glob.glob(known_args.uris)
    if not uris:
        raise ValueError('No files match --uris=%s' % known_args.uris)

    p = df.Pipeline(argv=pipeline_args)
    # Read documents specified by the uris command line option.
    pcoll = read_documents(p, uris)
    # Compute TF-IDF information for each word.
    output = pcoll | TfIdf()
    # Write the output using a "Write" transform that has side effects.
    # pylint: disable=expression-not-assigned
    output | df.io.Write('write', df.io.TextFileSink(known_args.output))
    p.run()
-
-
if __name__ == '__main__':
    # Script entry point: with argv=None, run() parses sys.argv[1:].
    run(argv=None)
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/google/cloud/dataflow/examples/complete/tfidf_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/google/cloud/dataflow/examples/complete/tfidf_test.py b/sdks/python/google/cloud/dataflow/examples/complete/tfidf_test.py
deleted file mode 100644
index 85b4964..0000000
--- a/sdks/python/google/cloud/dataflow/examples/complete/tfidf_test.py
+++ /dev/null
@@ -1,88 +0,0 @@
-# Copyright 2016 Google Inc. All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""Test for the TF-IDF example."""
-
-import logging
-import os
-import re
-import tempfile
-import unittest
-
-import google.cloud.dataflow as df
-from google.cloud.dataflow.examples.complete import tfidf
-
-
# (word, file name, TF-IDF score) triples expected for the sample documents
# '1.txt' = 'abc def ghi', '2.txt' = 'abc def' and '3.txt' = 'abc'.
EXPECTED_RESULTS = {
    ('ghi', '1.txt', 0.3662040962227032),
    ('abc', '1.txt', 0.0),
    ('abc', '3.txt', 0.0),
    ('abc', '2.txt', 0.0),
    ('def', '1.txt', 0.13515503603605478),
    ('def', '2.txt', 0.2027325540540822),
}


# Matches one line of the pipeline's text output such as
#   (u'abc', ('/some/dir/1.txt', 0.0))
# capturing the word, the bare file name and the score text.
EXPECTED_LINE_RE = r'\(u\'([a-z]*)\', \(\'.*([0-9]\.txt)\', (.*)\)\)'
-
-
class TfIdfTest(unittest.TestCase):
    """Tests the TfIdf transform directly and through tfidf.run()."""

    def create_file(self, path, contents):
        """Creates (or overwrites) the file at `path` holding `contents`."""
        logging.info('Creating temp file: %s', path)
        with open(path, 'w') as f:
            f.write(contents)

    def test_tfidf_transform(self):
        p = df.Pipeline('DirectPipelineRunner')
        uri_to_line = p | df.Create(
            'create sample',
            [('1.txt', 'abc def ghi'),
             ('2.txt', 'abc def'),
             ('3.txt', 'abc')])

        def flatten(word_and_uri_score):
            # (word, (uri, score)) -> (word, uri, score). The score is not
            # named `tfidf` so that it does not shadow the module under test.
            word, (uri, score) = word_and_uri_score
            return word, uri, score

        result = (
            uri_to_line
            | tfidf.TfIdf()
            | df.Map('flatten', flatten))
        df.assert_that(result, df.equal_to(EXPECTED_RESULTS))
        # Run the pipeline. Note that the assert_that above adds to the pipeline
        # a check that the result PCollection contains expected values. To
        # actually trigger the check the pipeline must be run.
        p.run()

    def test_basics(self):
        import shutil  # Only needed for the temp-folder cleanup below.

        # Setup the files with expected content.
        temp_folder = tempfile.mkdtemp()
        # mkdtemp() never deletes the folder; remove it (inputs and pipeline
        # output) when the test finishes, whether it passes or fails.
        self.addCleanup(shutil.rmtree, temp_folder, ignore_errors=True)
        self.create_file(os.path.join(temp_folder, '1.txt'), 'abc def ghi')
        self.create_file(os.path.join(temp_folder, '2.txt'), 'abc def')
        self.create_file(os.path.join(temp_folder, '3.txt'), 'abc')
        tfidf.run([
            '--uris=%s/*' % temp_folder,
            '--output', os.path.join(temp_folder, 'result')])
        # Parse result file and compare.
        results = []
        with open(os.path.join(temp_folder,
                               'result-00000-of-00001')) as result_file:
            for line in result_file:
                match = re.search(EXPECTED_LINE_RE, line)
                logging.info('Result line: %s', line)
                if match is not None:
                    results.append(
                        (match.group(1), match.group(2), float(match.group(3))))
        logging.info('Computed results: %s', set(results))
        self.assertEqual(set(results), EXPECTED_RESULTS)
-
-
if __name__ == '__main__':
    # Surface the INFO-level messages the tests log, then run them.
    logging.getLogger().setLevel(level=logging.INFO)
    unittest.main()
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/google/cloud/dataflow/examples/complete/top_wikipedia_sessions.py
----------------------------------------------------------------------
diff --git a/sdks/python/google/cloud/dataflow/examples/complete/top_wikipedia_sessions.py b/sdks/python/google/cloud/dataflow/examples/complete/top_wikipedia_sessions.py
deleted file mode 100644
index d0935fe..0000000
--- a/sdks/python/google/cloud/dataflow/examples/complete/top_wikipedia_sessions.py
+++ /dev/null
@@ -1,170 +0,0 @@
-# Copyright 2016 Google Inc. All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""An example that reads Wikipedia edit data and computes strings of edits.
-
-An example that reads Wikipedia edit data from Cloud Storage and computes the
-user with the longest string of edits separated by no more than an hour within
-each 30 day period.
-
-To execute this pipeline locally using the DirectPipelineRunner, specify an
-output prefix on GCS:
- --output gs://YOUR_OUTPUT_PREFIX
-
-To execute this pipeline using the Google Cloud Dataflow service, specify
-pipeline configuration in addition to the above:
- --job_name NAME_FOR_YOUR_JOB
- --project YOUR_PROJECT_ID
- --staging_location gs://YOUR_STAGING_DIRECTORY
- --temp_location gs://YOUR_TEMPORARY_DIRECTORY
- --runner BlockingDataflowPipelineRunner
-
-The default input is gs://dataflow-samples/wikipedia_edits/*.json and can be
-overridden with --input.
-"""
-
-from __future__ import absolute_import
-
-import argparse
-import json
-import logging
-import sys
-
-import google.cloud.dataflow as df
-from google.cloud.dataflow import combiners
-from google.cloud.dataflow import window
-
ONE_HOUR_IN_SECONDS = 60 * 60  # Session gap size.
THIRTY_DAYS_IN_SECONDS = ONE_HOUR_IN_SECONDS * 24 * 30  # Fixed window size.
-
-
class ExtractUserAndTimestampDoFn(df.DoFn):
    """Turns one JSON-encoded Wikipedia edit into a timestamped user name.

    Records lacking a 'contributor_username' field produce no output.
    """

    def process(self, context):
        row = json.loads(context.element)
        if 'contributor_username' not in row:
            return
        yield window.TimestampedValue(row['contributor_username'],
                                      row['timestamp'])
-
-
class ComputeSessions(df.PTransform):
    """Counts how many edits each user made in each session.

    Elements are assigned to session windows whose gap is one hour
    (ONE_HOUR_IN_SECONDS), then counted per element, giving one
    (user, count) pair per user session.
    """

    def __init__(self):
        super(ComputeSessions, self).__init__()

    def apply(self, pcoll):
        one_hour_sessions = window.Sessions(gap_size=ONE_HOUR_IN_SECONDS)
        windowed = pcoll | df.WindowInto('ComputeSessionsWindow',
                                         one_hour_sessions)
        return windowed | combiners.Count.PerElement()
-
-
class TopPerMonth(df.PTransform):
    """Computes the 10 longest sessions in each 30-day window.

    Input elements are (session key, edit count) pairs. They are assigned to
    fixed windows of THIRTY_DAYS_IN_SECONDS ("months"), and within each
    window the 10 pairs with the largest count are kept, emitted as a single
    list per window. `.without_defaults()` requests that no default value be
    produced for empty input.
    """

    def __init__(self):
        super(TopPerMonth, self).__init__()

    def apply(self, pcoll):
        return (pcoll
                | df.WindowInto('TopPerMonthWindow',
                                window.FixedWindows(
                                    size=THIRTY_DAYS_IN_SECONDS))
                | combiners.core.CombineGlobally(
                    'Top',
                    # Rank pairs by their count (second item).
                    combiners.TopCombineFn(
                        10, lambda first, second: first[1] < second[1]))
                .without_defaults())
-
-
class SessionsToStringsDoFn(df.DoFn):
    """Moves the element's window(s) into its key.

    (user, count) becomes ('user : <window>[, <window>...]', count).
    """

    def process(self, context):
        user, count = context.element[0], context.element[1]
        window_names = [str(w) for w in context.windows]
        yield user + ' : ' + ', '.join(window_names), count
-
-
class FormatOutputDoFn(df.DoFn):
    """Formats every (session, count) pair in the element as a text line.

    The element is an iterable of pairs; each produces the line
    'session : count : <window>[, <window>...]'.
    """

    def process(self, context):
        for pair in context.element:
            session, count = pair[0], pair[1]
            windows_text = ', '.join(str(w) for w in context.windows)
            line = session + ' : ' + str(count) + ' : ' + windows_text
            yield line
-
-
class ComputeTopSessions(df.PTransform):
    """Computes the top user sessions for each 30-day window.

    Args:
      sampling_threshold: fraction of users to keep for session tracking; a
        user is kept when abs(hash(user)) <= sys.maxint * sampling_threshold
        (1.0 keeps everyone).
    """

    def __init__(self, sampling_threshold):
        super(ComputeTopSessions, self).__init__()
        self.sampling_threshold = sampling_threshold

    def apply(self, pcoll):
        users = pcoll | df.ParDo('ExtractUserAndTimestamp',
                                 ExtractUserAndTimestampDoFn())
        # Hash-based sampling of users; presumably keeps roughly
        # `sampling_threshold` of them.
        sampled = users | df.Filter(
            lambda x: abs(hash(x)) <= sys.maxint * self.sampling_threshold)
        sessions = sampled | ComputeSessions()
        keyed = sessions | df.ParDo('SessionsToStrings', SessionsToStringsDoFn())
        top = keyed | TopPerMonth()
        return top | df.ParDo('FormatOutput', FormatOutputDoFn())
-
-
def run(argv=None):
    """Runs the Wikipedia top edits pipeline.

    Args:
      argv: Pipeline options as a list of arguments. --input, --output and
        --sampling_threshold are consumed here; the remaining arguments are
        passed to the Pipeline.
    """

    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--input',
        dest='input',
        default='gs://dataflow-samples/wikipedia_edits/*.json',
        help='Input specified as a GCS path containing a BigQuery table exported '
        'as json.')
    parser.add_argument('--output',
                        required=True,
                        help='Output file to write results to.')
    parser.add_argument('--sampling_threshold',
                        type=float,
                        default=0.1,
                        help='Fraction of entries used for session tracking')
    known_args, pipeline_args = parser.parse_known_args(argv)

    p = df.Pipeline(argv=pipeline_args)

    # Read through df.io.Read so the source side matches the df.io.Write /
    # df.io.TextFileSource / df.io.TextFileSink calls in this statement.
    (p  # pylint: disable=expression-not-assigned
     | df.io.Read('read', df.io.TextFileSource(known_args.input))
     | ComputeTopSessions(known_args.sampling_threshold)
     | df.io.Write('write', df.io.TextFileSink(known_args.output)))

    p.run()
-
-
if __name__ == '__main__':
    # Script entry point: log at INFO level, then parse sys.argv[1:] and run.
    logging.getLogger().setLevel(level=logging.INFO)
    run(argv=None)
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/google/cloud/dataflow/examples/complete/top_wikipedia_sessions_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/google/cloud/dataflow/examples/complete/top_wikipedia_sessions_test.py b/sdks/python/google/cloud/dataflow/examples/complete/top_wikipedia_sessions_test.py
deleted file mode 100644
index a4fdf8c..0000000
--- a/sdks/python/google/cloud/dataflow/examples/complete/top_wikipedia_sessions_test.py
+++ /dev/null
@@ -1,58 +0,0 @@
-# Copyright 2016 Google Inc. All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""Test for the top wikipedia sessions example."""
-
-import json
-import unittest
-
-
-import google.cloud.dataflow as df
-from google.cloud.dataflow.examples.complete import top_wikipedia_sessions
-
-
class ComputeTopSessionsTest(unittest.TestCase):
    """Runs ComputeTopSessions over a small edit log with no sampling."""

    # JSON-encoded edit records; a generator expression is used so no loop
    # variables leak into the class namespace.
    EDITS = list(
        json.dumps({'timestamp': timestamp, 'contributor_username': user})
        for timestamp, user in (
            (0.0, 'user1'),
            (0.001, 'user1'),
            (0.002, 'user1'),
            (0.0, 'user2'),
            (0.001, 'user2'),
            (3.601, 'user2'),
            (3.602, 'user2'),
            (2 * 3600.0, 'user2'),
            (35 * 24 * 3.600, 'user3'),
        ))

    # 'user : session window : edit count : 30-day window' lines.
    EXPECTED = [
        'user1 : [0.0, 3600.002) : 3 : [0.0, 2592000.0)',
        'user2 : [0.0, 3603.602) : 4 : [0.0, 2592000.0)',
        'user2 : [7200.0, 10800.0) : 1 : [0.0, 2592000.0)',
        'user3 : [3024.0, 6624.0) : 1 : [0.0, 2592000.0)',
    ]

    def test_compute_top_sessions(self):
        pipeline = df.Pipeline('DirectPipelineRunner')
        result = (pipeline
                  | df.Create('create', self.EDITS)
                  | top_wikipedia_sessions.ComputeTopSessions(1.0))
        df.assert_that(result, df.equal_to(self.EXPECTED))
        pipeline.run()
-
-
if __name__ == '__main__':
    # Run this module's tests when executed directly.
    unittest.main()