You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/07/12 15:33:22 UTC
[1/2] incubator-beam git commit: Adds more code snippets.
Repository: incubator-beam
Updated Branches:
refs/heads/python-sdk b6bb97f57 -> 4006f7779
Adds more code snippets.
Adds code snippets for sources, sinks, and joining using side inputs.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/22ff002f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/22ff002f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/22ff002f
Branch: refs/heads/python-sdk
Commit: 22ff002f385141c9e7d98a04f2edb817f43623a0
Parents: b6bb97f
Author: Chamikara Jayalath <ch...@apache.org>
Authored: Sat Jun 25 01:53:12 2016 -0700
Committer: Chamikara Jayalath <ch...@google.com>
Committed: Mon Jul 11 22:49:11 2016 -0700
----------------------------------------------------------------------
.../apache_beam/examples/snippets/__init__.py | 16 ++
.../apache_beam/examples/snippets/snippets.py | 195 +++++++++++++++++++
.../examples/snippets/snippets_test.py | 73 +++++++
3 files changed, 284 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/22ff002f/sdks/python/apache_beam/examples/snippets/__init__.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/snippets/__init__.py b/sdks/python/apache_beam/examples/snippets/__init__.py
new file mode 100644
index 0000000..cce3aca
--- /dev/null
+++ b/sdks/python/apache_beam/examples/snippets/__init__.py
@@ -0,0 +1,16 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/22ff002f/sdks/python/apache_beam/examples/snippets/snippets.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/snippets/snippets.py b/sdks/python/apache_beam/examples/snippets/snippets.py
index d84deea..57ddbee 100644
--- a/sdks/python/apache_beam/examples/snippets/snippets.py
+++ b/sdks/python/apache_beam/examples/snippets/snippets.py
@@ -609,6 +609,162 @@ def examples_wordcount_debugging(renames):
p.run()
+def model_custom_source(count):
+  """Demonstrates creating a new custom source and using it in a pipeline.
+
+  Defines a new source 'CountingSource' that produces the integers
+  0, 1, ..., count - 1.
+
+  Uses the new source in an example pipeline.
+
+  Args:
+    count: the size of the counting source to be used in the pipeline
+      demonstrated in this method.
+  """
+
+  import apache_beam as beam
+  from apache_beam.io import iobase
+  from apache_beam.io.range_trackers import OffsetRangeTracker
+  from apache_beam.utils.options import PipelineOptions
+
+  # Defining a new source.
+  # [START model_custom_source_new_source]
+  class CountingSource(iobase.BoundedSource):
+
+    def __init__(self, count):
+      self._count = count
+
+    def estimate_size(self):
+      return self._count
+
+    def get_range_tracker(self, start_position, stop_position):
+      if start_position is None:
+        start_position = 0
+      if stop_position is None:
+        stop_position = self._count
+
+      return OffsetRangeTracker(start_position, stop_position)
+
+    def read(self, range_tracker):
+      # Read only the positions covered by this range tracker, so that each
+      # bundle produced by split() emits its own sub-range exactly once.
+      for i in range(range_tracker.start_position(),
+                     range_tracker.stop_position()):
+        if not range_tracker.try_claim(i):
+          return
+        yield i
+
+    def split(self, desired_bundle_size, start_position=None,
+              stop_position=None):
+      if start_position is None:
+        start_position = 0
+      if stop_position is None:
+        stop_position = self._count
+
+      bundle_start = start_position
+      while bundle_start < stop_position:
+        # Each bundle covers at most desired_bundle_size positions and never
+        # extends past the end of the requested range.
+        bundle_stop = min(stop_position, bundle_start + desired_bundle_size)
+        yield iobase.SourceBundle(weight=(bundle_stop - bundle_start),
+                                  source=self,
+                                  start_position=bundle_start,
+                                  stop_position=bundle_stop)
+        bundle_start = bundle_stop
+  # [END model_custom_source_new_source]
+
+  # Using the source in an example pipeline.
+  # [START model_custom_source_use_new_source]
+  p = beam.Pipeline(options=PipelineOptions())
+  numbers = p | beam.io.Read('ProduceNumbers', CountingSource(count))
+  # [END model_custom_source_use_new_source]
+
+  lines = numbers | beam.core.Map(lambda number: 'line %d' % number)
+  beam.assert_that(
+      lines, beam.equal_to(
+          ['line ' + str(number) for number in range(0, count)]))
+
+  p.run()
+
+
+def model_custom_sink(simplekv, KVs, final_table_name):
+  """Demonstrates creating a new custom sink and using it in a pipeline.
+
+  Defines a new sink 'SimpleKVSink' that demonstrates writing to a simple
+  key-value based storage system.
+
+  Uses the new sink in an example pipeline.
+
+  Args:
+    simplekv: an object that mocks the key-value storage. The API of the
+      key-value storage consists of the following methods.
+      simplekv.connect(url) -
+        connects to the storage and returns an access token
+        which can be used to perform further operations.
+      simplekv.open_table(access_token, table_name) -
+        creates a table named 'table_name'. Returns a table object.
+      simplekv.write_to_table(access_token, table, key, value) -
+        writes a key-value pair to the given table.
+      simplekv.rename_table(access_token, old_name, new_name) -
+        renames the table named 'old_name' to 'new_name'.
+    KVs: the set of key-value pairs to be written in the example pipeline.
+    final_table_name: the prefix of the final set of tables to be created by
+      the example pipeline. Each table produced by a writer is renamed to
+      this prefix followed by an index (0, 1, ...).
+  """
+
+  import apache_beam as beam
+  from apache_beam.io import iobase
+  from apache_beam.utils.options import PipelineOptions
+
+  # Defining the new sink.
+  # [START model_custom_sink_new_sink]
+  class SimpleKVSink(iobase.Sink):
+
+    def __init__(self, url, final_table_name):
+      self._url = url
+      self._final_table_name = final_table_name
+
+    def initialize_write(self):
+      access_token = simplekv.connect(self._url)
+      return access_token
+
+    def open_writer(self, access_token, uid):
+      table_name = 'table' + uid
+      return SimpleKVWriter(access_token, table_name)
+
+    def finalize_write(self, access_token, table_names):
+      for i, table_name in enumerate(table_names):
+        simplekv.rename_table(
+            access_token, table_name, self._final_table_name + str(i))
+  # [END model_custom_sink_new_sink]
+
+  # Defining a writer for the new sink.
+  # [START model_custom_sink_new_writer]
+  class SimpleKVWriter(iobase.Writer):
+
+    def __init__(self, access_token, table_name):
+      self._access_token = access_token
+      self._table_name = table_name
+      self._table = simplekv.open_table(access_token, table_name)
+
+    def write(self, record):
+      key, value = record
+
+      simplekv.write_to_table(self._access_token, self._table, key, value)
+
+    def close(self):
+      return self._table_name
+  # [END model_custom_sink_new_writer]
+
+  # Using the new sink in an example pipeline.
+  # [START model_custom_sink_use_new_sink]
+  p = beam.Pipeline(options=PipelineOptions())
+  kvs = p | beam.core.Create(
+      'CreateKVs', KVs)
+
+  kvs | beam.io.Write('WriteToSimpleKV',
+                      SimpleKVSink('http://url_to_simple_kv/',
+                                   final_table_name))
+  # [END model_custom_sink_use_new_sink]
+
+  p.run()
+
+
def model_textio(renames):
"""Using a Read and Write transform to read/write text files.
@@ -859,6 +1015,45 @@ def model_co_group_by_key_tuple(email_list, phone_list, output_path):
p.run()
+def model_join_using_side_inputs(
+    name_list, email_list, phone_list, output_path):
+  """Joining PCollections using side inputs.
+
+  Each name is joined against the (name, email) and (name, phone) pairs,
+  which are made available to the Map as iterable side inputs. The joined
+  'name; emails; phones' lines are written as text to output_path.
+  """
+
+  import apache_beam as beam
+  from apache_beam.pvalue import AsIter
+  from apache_beam.utils.options import PipelineOptions
+
+  p = beam.Pipeline(options=PipelineOptions())
+  # [START model_join_using_side_inputs]
+  # This code performs a join by receiving the set of names as an input and
+  # passing PCollections that contain emails and phone numbers as side inputs
+  # instead of using CoGroupByKey.
+  names = p | beam.Create('names', name_list)
+  emails = p | beam.Create('email', email_list)
+  phones = p | beam.Create('phone', phone_list)
+
+  def join_info(name, email_pairs, phone_pairs):
+    # Keep only the side-input entries whose key matches this name.
+    matching_emails = [email for key, email in email_pairs if key == name]
+    matching_phones = [phone for key, phone in phone_pairs if key == name]
+    return '; '.join(['%s' % name,
+                      '%s' % ','.join(matching_emails),
+                      '%s' % ','.join(matching_phones)])
+
+  contact_lines = names | beam.core.Map(
+      "CreateContacts", join_info, AsIter(emails), AsIter(phones))
+  # [END model_join_using_side_inputs]
+  contact_lines | beam.io.Write(beam.io.TextFileSink(output_path))
+  p.run()
+
+
# [START model_library_transforms_keys]
class Keys(beam.PTransform):
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/22ff002f/sdks/python/apache_beam/examples/snippets/snippets_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/snippets/snippets_test.py b/sdks/python/apache_beam/examples/snippets/snippets_test.py
index 6e1045f..eaf1b3a 100644
--- a/sdks/python/apache_beam/examples/snippets/snippets_test.py
+++ b/sdks/python/apache_beam/examples/snippets/snippets_test.py
@@ -17,7 +17,9 @@
"""Tests for all code snippets used in public docs."""
+import glob
import logging
+import os
import sys
import tempfile
import unittest
@@ -234,6 +236,8 @@ class TypeHintsTest(unittest.TestCase):
# [END type_hints_missing_define_numbers]
# Consider the following code.
+ # pylint: disable=expression-not-assigned
+ # pylint: disable=unused-variable
# [START type_hints_missing_apply]
evens = numbers | beam.Filter(lambda x: x % 2 == 0)
# [END type_hints_missing_apply]
@@ -279,11 +283,13 @@ class TypeHintsTest(unittest.TestCase):
words_with_lens = words | MyTransform()
# [END type_hints_transform]
+ # pylint: disable=expression-not-assigned
with self.assertRaises(typehints.TypeCheckError):
words_with_lens | beam.Map(lambda x: x).with_input_types(
beam.typehints.Tuple[int, int])
def test_runtime_checks_off(self):
+ # pylint: disable=expression-not-assigned
p = beam.Pipeline('DirectPipelineRunner', argv=sys.argv)
# [START type_hints_runtime_off]
p | beam.Create(['a']) | beam.Map(lambda x: 3).with_output_types(str)
@@ -291,6 +297,7 @@ class TypeHintsTest(unittest.TestCase):
# [END type_hints_runtime_off]
def test_runtime_checks_on(self):
+ # pylint: disable=expression-not-assigned
p = beam.Pipeline('DirectPipelineRunner', argv=sys.argv)
with self.assertRaises(typehints.TypeCheckError):
# [START type_hints_runtime_on]
@@ -379,6 +386,62 @@ class SnippetsTest(unittest.TestCase):
self.get_output(result_path),
['cba', 'fed', 'ihg', 'lkj', 'onm', 'rqp', 'uts', 'xwv', 'zy'])
+ def test_model_custom_source(self):
+   """Runs the custom source snippet, which checks its own output."""
+   snippets.model_custom_source(count=100)
+
+ def test_model_custom_sink(self):
+   """Runs the custom sink snippet against a file-backed fake KV store."""
+   tempdir_name = tempfile.mkdtemp()
+   # mkdtemp() directories are never removed automatically; clean up even
+   # if the test fails part-way through.
+   import shutil
+   self.addCleanup(shutil.rmtree, tempdir_name, ignore_errors=True)
+
+   class SimpleKV(object):
+     """Fake key-value store: each table is a file of 'key:value' lines."""
+
+     def __init__(self, tmp_dir):
+       self._dummy_token = 'dummy_token'
+       self._tmp_dir = tmp_dir
+
+     def _path(self, table_name):
+       return os.path.join(self._tmp_dir, table_name)
+
+     def connect(self, url):
+       return self._dummy_token
+
+     def open_table(self, access_token, table_name):
+       assert access_token == self._dummy_token
+       file_name = self._path(table_name)
+       assert not os.path.exists(file_name)
+       open(file_name, 'wb').close()
+       return table_name
+
+     def write_to_table(self, access_token, table_name, key, value):
+       assert access_token == self._dummy_token
+       file_name = self._path(table_name)
+       assert os.path.exists(file_name)
+       with open(file_name, 'ab') as f:
+         f.write(key + ':' + value + os.linesep)
+
+     def rename_table(self, access_token, old_name, new_name):
+       assert access_token == self._dummy_token
+       old_file_name = self._path(old_name)
+       new_file_name = self._path(new_name)
+       assert os.path.isfile(old_file_name)
+       assert not os.path.exists(new_file_name)
+
+       os.rename(old_file_name, new_file_name)
+
+   snippets.model_custom_sink(
+       SimpleKV(tempdir_name),
+       [('key' + str(i), 'value' + str(i)) for i in range(100)],
+       'final_table')
+
+   output_files = glob.glob(os.path.join(tempdir_name, 'final_table*'))
+   self.assertTrue(output_files)
+
+   received_output = []
+   for file_name in output_files:
+     with open(file_name) as f:
+       for line in f:
+         received_output.append(line.rstrip(os.linesep))
+   expected_output = [
+       'key' + str(i) + ':' + 'value' + str(i) for i in range(100)]
+
+   self.assertItemsEqual(expected_output, received_output)
+
def test_model_textio(self):
temp_path = self.create_temp_file('aa bb cc\n bb cc\n cc')
result_path = temp_path + '.result'
@@ -480,6 +543,16 @@ class SnippetsTest(unittest.TestCase):
expect = ['a; a@example.com; x4312', 'b; b@example.com; x8452']
self.assertEqual(expect, self.get_output(result_path))
+ def test_model_join_using_side_inputs(self):
+   """Checks that the side-input join pairs each name with its contacts."""
+   result_path = self.create_temp_file()
+   snippets.model_join_using_side_inputs(
+       ['a', 'b'],
+       [['a', 'a@example.com'], ['b', 'b@example.com']],
+       [['a', 'x4312'], ['b', 'x8452']],
+       result_path)
+   self.assertEqual(['a; a@example.com; x4312', 'b; b@example.com; x8452'],
+                    self.get_output(result_path))
+
class CombineTest(unittest.TestCase):
"""Tests for dataflow/model/combine."""
[2/2] incubator-beam git commit: This closes #565
Posted by ke...@apache.org.
This closes #565
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/4006f777
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/4006f777
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/4006f777
Branch: refs/heads/python-sdk
Commit: 4006f77792f55513d831f56c7a86ed6fecd58356
Parents: b6bb97f 22ff002
Author: Kenneth Knowles <kl...@google.com>
Authored: Tue Jul 12 08:32:54 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Tue Jul 12 08:32:54 2016 -0700
----------------------------------------------------------------------
.../apache_beam/examples/snippets/__init__.py | 16 ++
.../apache_beam/examples/snippets/snippets.py | 195 +++++++++++++++++++
.../examples/snippets/snippets_test.py | 73 +++++++
3 files changed, 284 insertions(+)
----------------------------------------------------------------------