You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/07/12 15:33:22 UTC

[1/2] incubator-beam git commit: Adds more code snippets.

Repository: incubator-beam
Updated Branches:
  refs/heads/python-sdk b6bb97f57 -> 4006f7779


Adds more code snippets.

Adds code snippets for sources, sinks, and joining using side inputs.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/22ff002f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/22ff002f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/22ff002f

Branch: refs/heads/python-sdk
Commit: 22ff002f385141c9e7d98a04f2edb817f43623a0
Parents: b6bb97f
Author: Chamikara Jayalath <ch...@apache.org>
Authored: Sat Jun 25 01:53:12 2016 -0700
Committer: Chamikara Jayalath <ch...@google.com>
Committed: Mon Jul 11 22:49:11 2016 -0700

----------------------------------------------------------------------
 .../apache_beam/examples/snippets/__init__.py   |  16 ++
 .../apache_beam/examples/snippets/snippets.py   | 195 +++++++++++++++++++
 .../examples/snippets/snippets_test.py          |  73 +++++++
 3 files changed, 284 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/22ff002f/sdks/python/apache_beam/examples/snippets/__init__.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/snippets/__init__.py b/sdks/python/apache_beam/examples/snippets/__init__.py
new file mode 100644
index 0000000..cce3aca
--- /dev/null
+++ b/sdks/python/apache_beam/examples/snippets/__init__.py
@@ -0,0 +1,16 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/22ff002f/sdks/python/apache_beam/examples/snippets/snippets.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/snippets/snippets.py b/sdks/python/apache_beam/examples/snippets/snippets.py
index d84deea..57ddbee 100644
--- a/sdks/python/apache_beam/examples/snippets/snippets.py
+++ b/sdks/python/apache_beam/examples/snippets/snippets.py
@@ -609,6 +609,162 @@ def examples_wordcount_debugging(renames):
   p.run()
 
 
+def model_custom_source(count):
+  """Demonstrates creating a new custom source and using it in a pipeline.
+
+  Defines a new source 'CountingSource' that produces integers starting from 0
+  up to a given size.
+
+  Uses the new source in an example pipeline.
+
+  Args:
+    count: the size of the counting source to be used in the pipeline
+           demonstrated in this method.
+  """
+
+  import apache_beam as beam
+  from apache_beam.io import iobase
+  from apache_beam.io.range_trackers import OffsetRangeTracker
+  from apache_beam.utils.options import PipelineOptions
+
+  # Defining a new source.
+  # [START model_custom_source_new_source]
+  class CountingSource(iobase.BoundedSource):
+
+    def __init__(self, count):
+      self._count = count
+
+    def estimate_size(self):
+      return self._count
+
+    def get_range_tracker(self, start_position, stop_position):
+      if start_position is None:
+        start_position = 0
+      if stop_position is None:
+        stop_position = self._count
+
+      return OffsetRangeTracker(start_position, stop_position)
+
+    def read(self, range_tracker):
+      for i in range(self._count):
+        if not range_tracker.try_claim(i):
+          return
+        yield i
+
+    def split(self, desired_bundle_size, start_position=None,
+              stop_position=None):
+      if start_position is None:
+        start_position = 0
+      if stop_position is None:
+        stop_position = self._count
+
+      bundle_start = start_position
+      while bundle_start < self._count:
+        bundle_stop = max(self._count, bundle_start + desired_bundle_size)
+        yield iobase.SourceBundle(weight=(bundle_stop - bundle_start),
+                                  source=self,
+                                  start_position=bundle_start,
+                                  stop_position=bundle_stop)
+        bundle_start = bundle_stop
+  # [END model_custom_source_new_source]
+
+  # Using the source in an example pipeline.
+  # [START model_custom_source_use_new_source]
+  p = beam.Pipeline(options=PipelineOptions())
+  numbers = p | beam.io.Read('ProduceNumbers', CountingSource(count))
+  # [END model_custom_source_use_new_source]
+
+  lines = numbers | beam.core.Map(lambda number: 'line %d' % number)
+  beam.assert_that(
+      lines, beam.equal_to(
+          ['line ' + str(number) for number in range(0, count)]))
+
+  p.run()
+
+
+def model_custom_sink(simplekv, KVs, final_table_name):
+  """Demonstrates creating a new custom sink and using it in a pipeline.
+
+  Defines a new sink 'SimpleKVSink' that demonstrates writing to a simple
+  key-value based storage system.
+
+  Uses the new sink in an example pipeline.
+
+  Args:
+    simplekv: an object that mocks the key-value storage. The API of the
+              key-value storage consists of the following methods.
+              simplekv.connect(url) -
+                  connects to the storage and returns an access token
+                  which can be used to perform further operations
+              simplekv.open_table(access_token, table_name) -
+                  creates a table named 'table_name'. Returns a table object.
+              simplekv.write_to_table(access_token, table, key, value) -
+                  writes a key-value pair to the given table.
+              simplekv.rename_table(access_token, old_name, new_name) -
+                  renames the table named 'old_name' to 'new_name'.
+    KVs: the set of key-value pairs to be written in the example pipeline.
+    final_table_name: the prefix of the final set of tables to be created by
+                      the example pipeline.
+  """
+
+  import apache_beam as beam
+  from apache_beam.io import iobase
+  from apache_beam.utils.options import PipelineOptions
+
+  # Defining the new sink.
+  # [START model_custom_sink_new_sink]
+  class SimpleKVSink(iobase.Sink):
+
+    def __init__(self, url, final_table_name):
+      self._url = url
+      self._final_table_name = final_table_name
+
+    def initialize_write(self):
+      access_token = simplekv.connect(self._url)
+      return access_token
+
+    def open_writer(self, access_token, uid):
+      table_name = 'table' + uid
+      return SimpleKVWriter(access_token, table_name)
+
+    def finalize_write(self, access_token, table_names):
+      for i, table_name in enumerate(table_names):
+        simplekv.rename_table(
+            access_token, table_name, self._final_table_name + str(i))
+  # [END model_custom_sink_new_sink]
+
+  # Defining a writer for the new sink.
+  # [START model_custom_sink_new_writer]
+  class SimpleKVWriter(iobase.Writer):
+
+    def __init__(self, access_token, table_name):
+      self._access_token = access_token
+      self._table_name = table_name
+      self._table = simplekv.open_table(access_token, table_name)
+
+    def write(self, record):
+      key, value = record
+
+      simplekv.write_to_table(self._access_token, self._table, key, value)
+
+    def close(self):
+      return self._table_name
+  # [END model_custom_sink_new_writer]
+
+  # Using the new sink in an example pipeline.
+  # [START model_custom_sink_use_new_sink]
+  p = beam.Pipeline(options=PipelineOptions())
+  kvs = p | beam.core.Create(
+      'CreateKVs', KVs)
+
+  kvs | beam.io.Write('WriteToSimpleKV',
+                      SimpleKVSink('http://url_to_simple_kv/',
+                                   final_table_name))
+  # [END model_custom_sink_use_new_sink]
+
+  p.run()
+
+
 def model_textio(renames):
   """Using a Read and Write transform to read/write text files.
 
@@ -859,6 +1015,45 @@ def model_co_group_by_key_tuple(email_list, phone_list, output_path):
   p.run()
 
 
+def model_join_using_side_inputs(
+    name_list, email_list, phone_list, output_path):
+  """Joins names with emails and phone numbers passed as side inputs."""
+
+  import apache_beam as beam
+  from apache_beam.pvalue import AsIter
+  from apache_beam.utils.options import PipelineOptions
+
+  p = beam.Pipeline(options=PipelineOptions())
+  # [START model_join_using_side_inputs]
+  # This code performs a join by receiving the set of names as an input and
+  # passing PCollections that contain emails and phone numbers as side inputs
+  # instead of using CoGroupByKey.
+  names = p | beam.Create('names', name_list)
+  emails = p | beam.Create('email', email_list)
+  phones = p | beam.Create('phone', phone_list)
+
+  def join_info(name, emails, phone_numbers):
+    filtered_emails = []
+    for name_in_list, email in emails:
+      if name_in_list == name:
+        filtered_emails.append(email)
+
+    filtered_phone_numbers = []
+    for name_in_list, phone_number in phone_numbers:
+      if name_in_list == name:
+        filtered_phone_numbers.append(phone_number)
+
+    return '; '.join(['%s' % name,
+                      '%s' % ','.join(filtered_emails),
+                      '%s' % ','.join(filtered_phone_numbers)])
+
+  contact_lines = names | beam.core.Map(
+      "CreateContacts", join_info, AsIter(emails), AsIter(phones))
+  # [END model_join_using_side_inputs]
+  contact_lines | beam.io.Write(beam.io.TextFileSink(output_path))
+  p.run()
+
+
 # [START model_library_transforms_keys]
 class Keys(beam.PTransform):
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/22ff002f/sdks/python/apache_beam/examples/snippets/snippets_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/snippets/snippets_test.py b/sdks/python/apache_beam/examples/snippets/snippets_test.py
index 6e1045f..eaf1b3a 100644
--- a/sdks/python/apache_beam/examples/snippets/snippets_test.py
+++ b/sdks/python/apache_beam/examples/snippets/snippets_test.py
@@ -17,7 +17,9 @@
 
 """Tests for all code snippets used in public docs."""
 
+import glob
 import logging
+import os
 import sys
 import tempfile
 import unittest
@@ -234,6 +236,8 @@ class TypeHintsTest(unittest.TestCase):
     # [END type_hints_missing_define_numbers]
 
     # Consider the following code.
+    # pylint: disable=expression-not-assigned
+    # pylint: disable=unused-variable
     # [START type_hints_missing_apply]
     evens = numbers | beam.Filter(lambda x: x % 2 == 0)
     # [END type_hints_missing_apply]
@@ -279,11 +283,13 @@ class TypeHintsTest(unittest.TestCase):
     words_with_lens = words | MyTransform()
     # [END type_hints_transform]
 
+    # pylint: disable=expression-not-assigned
     with self.assertRaises(typehints.TypeCheckError):
       words_with_lens | beam.Map(lambda x: x).with_input_types(
           beam.typehints.Tuple[int, int])
 
   def test_runtime_checks_off(self):
+    # pylint: disable=expression-not-assigned
     p = beam.Pipeline('DirectPipelineRunner', argv=sys.argv)
     # [START type_hints_runtime_off]
     p | beam.Create(['a']) | beam.Map(lambda x: 3).with_output_types(str)
@@ -291,6 +297,7 @@ class TypeHintsTest(unittest.TestCase):
     # [END type_hints_runtime_off]
 
   def test_runtime_checks_on(self):
+    # pylint: disable=expression-not-assigned
     p = beam.Pipeline('DirectPipelineRunner', argv=sys.argv)
     with self.assertRaises(typehints.TypeCheckError):
       # [START type_hints_runtime_on]
@@ -379,6 +386,62 @@ class SnippetsTest(unittest.TestCase):
         self.get_output(result_path),
         ['cba', 'fed', 'ihg', 'lkj', 'onm', 'rqp', 'uts', 'xwv', 'zy'])
 
+  def test_model_custom_source(self):
+    snippets.model_custom_source(100)  # assertions run inside the snippet
+
+  def test_model_custom_sink(self):
+    tempdir_name = tempfile.mkdtemp()  # NOTE(review): not removed in this test
+
+    class SimpleKV(object):  # mock store: one file per table under tmp_dir
+      def __init__(self, tmp_dir):
+        self._dummy_token = 'dummy_token'
+        self._tmp_dir = tmp_dir
+
+      def connect(self, url):
+        return self._dummy_token
+
+      def open_table(self, access_token, table_name):
+        assert access_token == self._dummy_token
+        file_name = self._tmp_dir + os.sep + table_name
+        assert not os.path.exists(file_name)
+        open(file_name, 'wb').close()
+        return table_name  # the table name doubles as the table object
+
+      def write_to_table(self, access_token, table_name, key, value):
+        assert access_token == self._dummy_token
+        file_name = self._tmp_dir + os.sep + table_name
+        assert os.path.exists(file_name)
+        with open(file_name, 'ab') as f:
+          f.write(key + ':' + value + os.linesep)
+
+      def rename_table(self, access_token, old_name, new_name):
+        assert access_token == self._dummy_token
+        old_file_name = self._tmp_dir + os.sep + old_name
+        new_file_name = self._tmp_dir + os.sep + new_name
+        assert os.path.isfile(old_file_name)
+        assert not os.path.exists(new_file_name)
+
+        os.rename(old_file_name, new_file_name)
+
+    snippets.model_custom_sink(
+        SimpleKV(tempdir_name),
+        [('key' + str(i), 'value' + str(i)) for i in range(100)],
+        'final_table')
+
+    glob_pattern = tempdir_name + os.sep + 'final_table*'
+    output_files = glob.glob(glob_pattern)
+    assert len(output_files) > 0
+
+    received_output = []
+    for file_name in output_files:
+      with open(file_name) as f:
+        for line in f:
+          received_output.append(line.rstrip(os.linesep))
+    expected_output = [
+        'key' + str(i) + ':' + 'value' + str(i) for i in range(100)]
+
+    self.assertItemsEqual(expected_output, received_output)
+
   def test_model_textio(self):
     temp_path = self.create_temp_file('aa bb cc\n bb cc\n cc')
     result_path = temp_path + '.result'
@@ -480,6 +543,16 @@ class SnippetsTest(unittest.TestCase):
     expect = ['a; a@example.com; x4312', 'b; b@example.com; x8452']
     self.assertEqual(expect, self.get_output(result_path))
 
+  def test_model_join_using_side_inputs(self):
+    name_list = ['a', 'b']
+    email_list = [['a', 'a@example.com'], ['b', 'b@example.com']]
+    phone_list = [['a', 'x4312'], ['b', 'x8452']]
+    result_path = self.create_temp_file()
+    snippets.model_join_using_side_inputs(
+        name_list, email_list, phone_list, result_path)
+    expect = ['a; a@example.com; x4312', 'b; b@example.com; x8452']
+    # NOTE(review): order-sensitive; assumes get_output keeps this order.
+    self.assertEqual(expect, self.get_output(result_path))
+
 
 class CombineTest(unittest.TestCase):
   """Tests for dataflow/model/combine."""


[2/2] incubator-beam git commit: This closes #565

Posted by ke...@apache.org.
This closes #565


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/4006f777
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/4006f777
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/4006f777

Branch: refs/heads/python-sdk
Commit: 4006f77792f55513d831f56c7a86ed6fecd58356
Parents: b6bb97f 22ff002
Author: Kenneth Knowles <kl...@google.com>
Authored: Tue Jul 12 08:32:54 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Tue Jul 12 08:32:54 2016 -0700

----------------------------------------------------------------------
 .../apache_beam/examples/snippets/__init__.py   |  16 ++
 .../apache_beam/examples/snippets/snippets.py   | 195 +++++++++++++++++++
 .../examples/snippets/snippets_test.py          |  73 +++++++
 3 files changed, 284 insertions(+)
----------------------------------------------------------------------