You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2017/01/03 18:42:37 UTC

[1/2] beam git commit: Updates Python SDK examples to use Beam text source/sink.

Repository: beam
Updated Branches:
  refs/heads/python-sdk f22fb9ce5 -> a779ac15d


Updates Python SDK examples to use Beam text source/sink.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/87bed83b
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/87bed83b
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/87bed83b

Branch: refs/heads/python-sdk
Commit: 87bed83bc96e73003f05ed299e08b558ac5d1e37
Parents: f22fb9c
Author: Chamikara Jayalath <ch...@google.com>
Authored: Mon Jan 2 23:21:05 2017 -0800
Committer: Robert Bradshaw <ro...@google.com>
Committed: Tue Jan 3 10:42:02 2017 -0800

----------------------------------------------------------------------
 .../apache_beam/examples/complete/autocomplete.py   |  6 ++++--
 .../apache_beam/examples/complete/estimate_pi.py    |  4 ++--
 .../examples/complete/juliaset/juliaset/juliaset.py |  3 ++-
 sdks/python/apache_beam/examples/complete/tfidf.py  |  6 ++++--
 .../examples/complete/top_wikipedia_sessions.py     |  6 ++++--
 .../examples/cookbook/bigquery_side_input.py        |  3 ++-
 .../apache_beam/examples/cookbook/bigshuffle.py     | 16 ++++++++--------
 sdks/python/apache_beam/examples/cookbook/coders.py |  8 ++++----
 .../examples/cookbook/custom_ptransform.py          | 11 ++++++-----
 .../examples/cookbook/datastore_wordcount.py        |  3 ++-
 .../examples/cookbook/group_with_coder.py           |  6 ++++--
 .../apache_beam/examples/cookbook/mergecontacts.py  | 16 +++++++---------
 .../examples/cookbook/multiple_output_pardo.py      | 14 +++++++-------
 sdks/python/apache_beam/examples/wordcount.py       |  6 ++++--
 .../apache_beam/examples/wordcount_debugging.py     |  6 ++++--
 .../apache_beam/examples/wordcount_minimal.py       |  6 ++++--
 .../consumer_tracking_pipeline_visitor_test.py      |  8 ++++++--
 17 files changed, 74 insertions(+), 54 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/87bed83b/sdks/python/apache_beam/examples/complete/autocomplete.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/autocomplete.py b/sdks/python/apache_beam/examples/complete/autocomplete.py
index 87e6c0c..3f2a7ae 100644
--- a/sdks/python/apache_beam/examples/complete/autocomplete.py
+++ b/sdks/python/apache_beam/examples/complete/autocomplete.py
@@ -24,6 +24,8 @@ import logging
 import re
 
 import apache_beam as beam
+from apache_beam.io import ReadFromText
+from apache_beam.io import WriteToText
 from apache_beam.utils.pipeline_options import PipelineOptions
 from apache_beam.utils.pipeline_options import SetupOptions
 
@@ -45,12 +47,12 @@ def run(argv=None):
   p = beam.Pipeline(options=pipeline_options)
 
   (p  # pylint: disable=expression-not-assigned
-   | 'read' >> beam.io.Read(beam.io.TextFileSource(known_args.input))
+   | 'read' >> ReadFromText(known_args.input)
    | 'split' >> beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x))
    | 'TopPerPrefix' >> TopPerPrefix(5)
    | 'format' >> beam.Map(
        lambda (prefix, candidates): '%s: %s' % (prefix, candidates))
-   | 'write' >> beam.io.Write(beam.io.TextFileSink(known_args.output)))
+   | 'write' >> WriteToText(known_args.output))
   p.run()
 
 

http://git-wip-us.apache.org/repos/asf/beam/blob/87bed83b/sdks/python/apache_beam/examples/complete/estimate_pi.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/estimate_pi.py b/sdks/python/apache_beam/examples/complete/estimate_pi.py
index d0faefe..11081a6 100644
--- a/sdks/python/apache_beam/examples/complete/estimate_pi.py
+++ b/sdks/python/apache_beam/examples/complete/estimate_pi.py
@@ -33,6 +33,7 @@ import random
 
 
 import apache_beam as beam
+from apache_beam.io import WriteToText
 from apache_beam.typehints import Any
 from apache_beam.typehints import Iterable
 from apache_beam.typehints import Tuple
@@ -113,8 +114,7 @@ def run(argv=None):
 
   (p  # pylint: disable=expression-not-assigned
    | EstimatePiTransform()
-   | beam.io.Write(beam.io.TextFileSink(known_args.output,
-                                        coder=JsonCoder())))
+   | WriteToText(known_args.output, coder=JsonCoder()))
 
   # Actually run the pipeline (all operations above are deferred).
   p.run()

http://git-wip-us.apache.org/repos/asf/beam/blob/87bed83b/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset.py b/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset.py
index 1445fbe..45fc1fb 100644
--- a/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset.py
+++ b/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset.py
@@ -25,6 +25,7 @@ from __future__ import absolute_import
 import argparse
 
 import apache_beam as beam
+from apache_beam.io import WriteToText
 
 
 def from_pixel(x, y, n):
@@ -110,7 +111,7 @@ def run(argv=None):  # pylint: disable=missing-docstring
    | 'x coord' >> beam.GroupByKey()
    | 'format' >> beam.Map(
        lambda (k, coords): ' '.join('(%s, %s, %s)' % coord for coord in coords))
-   | beam.io.Write(beam.io.TextFileSink(known_args.coordinate_output)))
+   | WriteToText(known_args.coordinate_output))
   # pylint: enable=expression-not-assigned
   p.run()
 

http://git-wip-us.apache.org/repos/asf/beam/blob/87bed83b/sdks/python/apache_beam/examples/complete/tfidf.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/tfidf.py b/sdks/python/apache_beam/examples/complete/tfidf.py
index b4d5b45..59b9d6f 100644
--- a/sdks/python/apache_beam/examples/complete/tfidf.py
+++ b/sdks/python/apache_beam/examples/complete/tfidf.py
@@ -29,6 +29,8 @@ import math
 import re
 
 import apache_beam as beam
+from apache_beam.io import ReadFromText
+from apache_beam.io import WriteToText
 from apache_beam.pvalue import AsSingleton
 from apache_beam.utils.pipeline_options import PipelineOptions
 from apache_beam.utils.pipeline_options import SetupOptions
@@ -40,7 +42,7 @@ def read_documents(pipeline, uris):
   for uri in uris:
     pcolls.append(
         pipeline
-        | beam.io.Read('read: %s' % uri, beam.io.TextFileSource(uri))
+        | 'read: %s' % uri >> ReadFromText(uri)
         | beam.Map('withkey: %s' % uri, lambda v, uri: (uri, v), uri))
   return pcolls | 'flatten read pcolls' >> beam.Flatten()
 
@@ -197,7 +199,7 @@ def run(argv=None):
   output = pcoll | TfIdf()
   # Write the output using a "Write" transform that has side effects.
   # pylint: disable=expression-not-assigned
-  output | 'write' >> beam.io.Write(beam.io.TextFileSink(known_args.output))
+  output | 'write' >> WriteToText(known_args.output)
   p.run()
 
 

http://git-wip-us.apache.org/repos/asf/beam/blob/87bed83b/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions.py b/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions.py
index cbd305a..2dea642 100644
--- a/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions.py
+++ b/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions.py
@@ -46,6 +46,8 @@ import logging
 import apache_beam as beam
 from apache_beam import combiners
 from apache_beam import window
+from apache_beam.io import ReadFromText
+from apache_beam.io import WriteToText
 from apache_beam.utils.pipeline_options import PipelineOptions
 from apache_beam.utils.pipeline_options import SetupOptions
 
@@ -168,9 +170,9 @@ def run(argv=None):
   p = beam.Pipeline(options=pipeline_options)
 
   (p  # pylint: disable=expression-not-assigned
-   | beam.Read(beam.io.TextFileSource(known_args.input))
+   | ReadFromText(known_args.input)
    | ComputeTopSessions(known_args.sampling_threshold)
-   | beam.io.Write(beam.io.TextFileSink(known_args.output)))
+   | WriteToText(known_args.output))
 
   p.run()
 

http://git-wip-us.apache.org/repos/asf/beam/blob/87bed83b/sdks/python/apache_beam/examples/cookbook/bigquery_side_input.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/bigquery_side_input.py b/sdks/python/apache_beam/examples/cookbook/bigquery_side_input.py
index 25e2c3b..7c5784b 100644
--- a/sdks/python/apache_beam/examples/cookbook/bigquery_side_input.py
+++ b/sdks/python/apache_beam/examples/cookbook/bigquery_side_input.py
@@ -33,6 +33,7 @@ from random import randrange
 
 import apache_beam as beam
 
+from apache_beam.io import WriteToText
 from apache_beam.pvalue import AsList
 from apache_beam.pvalue import AsSingleton
 from apache_beam.utils.pipeline_options import PipelineOptions
@@ -113,7 +114,7 @@ def run(argv=None):
                                pcoll_ignore_corpus, pcoll_ignore_word)
 
   # pylint:disable=expression-not-assigned
-  pcoll_groups | beam.io.Write(beam.io.TextFileSink(known_args.output))
+  pcoll_groups | WriteToText(known_args.output)
   p.run()
 
 

http://git-wip-us.apache.org/repos/asf/beam/blob/87bed83b/sdks/python/apache_beam/examples/cookbook/bigshuffle.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/bigshuffle.py b/sdks/python/apache_beam/examples/cookbook/bigshuffle.py
index 83d3881..ceeefd6 100644
--- a/sdks/python/apache_beam/examples/cookbook/bigshuffle.py
+++ b/sdks/python/apache_beam/examples/cookbook/bigshuffle.py
@@ -25,6 +25,8 @@ import logging
 
 
 import apache_beam as beam
+from apache_beam.io import ReadFromText
+from apache_beam.io import WriteToText
 from apache_beam.utils.pipeline_options import PipelineOptions
 from apache_beam.utils.pipeline_options import SetupOptions
 
@@ -53,9 +55,7 @@ def run(argv=None):
   p = beam.Pipeline(options=pipeline_options)
 
   # Read the text file[pattern] into a PCollection.
-  lines = p | beam.io.Read(
-      beam.io.TextFileSource(known_args.input,
-                             coder=beam.coders.BytesCoder()))
+  lines = p | ReadFromText(known_args.input, coder=beam.coders.BytesCoder())
 
   # Count the occurrences of each word.
   output = (lines
@@ -68,7 +68,7 @@ def run(argv=None):
                 lambda (key, vals): ['%s%s' % (key, val) for val in vals]))
 
   # Write the output using a "Write" transform that has side effects.
-  output | beam.io.Write(beam.io.TextFileSink(known_args.output))
+  output | WriteToText(known_args.output)
 
   # Optionally write the input and output checksums.
   if known_args.checksum_output:
@@ -76,15 +76,15 @@ def run(argv=None):
                   | 'input-csum' >> beam.Map(crc32line)
                   | 'combine-input-csum' >> beam.CombineGlobally(sum)
                   | 'hex-format' >> beam.Map(lambda x: '%x' % x))
-    input_csum | 'write-input-csum' >> beam.io.Write(
-        beam.io.TextFileSink(known_args.checksum_output + '-input'))
+    input_csum | 'write-input-csum' >> WriteToText(
+        known_args.checksum_output + '-input')
 
     output_csum = (output
                    | 'output-csum' >> beam.Map(crc32line)
                    | 'combine-output-csum' >> beam.CombineGlobally(sum)
                    | 'hex-format-output' >> beam.Map(lambda x: '%x' % x))
-    output_csum | 'write-output-csum' >> beam.io.Write(
-        beam.io.TextFileSink(known_args.checksum_output + '-output'))
+    output_csum | 'write-output-csum' >> WriteToText(
+        known_args.checksum_output + '-output')
 
   # Actually run the pipeline (all operations above are deferred).
   p.run()

http://git-wip-us.apache.org/repos/asf/beam/blob/87bed83b/sdks/python/apache_beam/examples/cookbook/coders.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/coders.py b/sdks/python/apache_beam/examples/cookbook/coders.py
index 690ba66..ede9d70 100644
--- a/sdks/python/apache_beam/examples/cookbook/coders.py
+++ b/sdks/python/apache_beam/examples/cookbook/coders.py
@@ -35,6 +35,8 @@ import json
 import logging
 
 import apache_beam as beam
+from apache_beam.io import ReadFromText
+from apache_beam.io import WriteToText
 from apache_beam.utils.pipeline_options import PipelineOptions
 from apache_beam.utils.pipeline_options import SetupOptions
 
@@ -87,12 +89,10 @@ def run(argv=None):
 
   p = beam.Pipeline(argv=pipeline_args)
   (p  # pylint: disable=expression-not-assigned
-   | beam.io.Read('read',
-                  beam.io.TextFileSource(known_args.input, coder=JsonCoder()))
+   | 'read' >> ReadFromText(known_args.input, coder=JsonCoder())
    | 'points' >> beam.FlatMap(compute_points)
    | beam.CombinePerKey(sum)
-   | beam.io.Write('write',
-                   beam.io.TextFileSink(known_args.output, coder=JsonCoder())))
+   | 'write' >> WriteToText(known_args.output, coder=JsonCoder()))
   p.run()
 
 

http://git-wip-us.apache.org/repos/asf/beam/blob/87bed83b/sdks/python/apache_beam/examples/cookbook/custom_ptransform.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/custom_ptransform.py b/sdks/python/apache_beam/examples/cookbook/custom_ptransform.py
index 56259ed..ef6bc5a 100644
--- a/sdks/python/apache_beam/examples/cookbook/custom_ptransform.py
+++ b/sdks/python/apache_beam/examples/cookbook/custom_ptransform.py
@@ -26,7 +26,8 @@ import argparse
 import logging
 
 import apache_beam as beam
-
+from apache_beam.io import ReadFromText
+from apache_beam.io import WriteToText
 from apache_beam.utils.pipeline_options import PipelineOptions
 
 
@@ -66,9 +67,9 @@ def run_count2(known_args, options):
   """Runs the second example pipeline."""
   logging.info('Running second pipeline')
   p = beam.Pipeline(options=options)
-  (p | beam.io.Read(beam.io.TextFileSource(known_args.input))
+  (p | ReadFromText(known_args.input)
    | Count2()  # pylint: disable=no-value-for-parameter
-   | beam.io.Write(beam.io.TextFileSink(known_args.output)))
+   | WriteToText(known_args.output))
   p.run()
 
 
@@ -93,9 +94,9 @@ def run_count3(known_args, options):
   """Runs the third example pipeline."""
   logging.info('Running third pipeline')
   p = beam.Pipeline(options=options)
-  (p | beam.io.Read(beam.io.TextFileSource(known_args.input))
+  (p | ReadFromText(known_args.input)
    | Count3(2)  # pylint: disable=no-value-for-parameter
-   | beam.io.Write(beam.io.TextFileSink(known_args.output)))
+   | WriteToText(known_args.output))
   p.run()
 
 

http://git-wip-us.apache.org/repos/asf/beam/blob/87bed83b/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py b/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py
index dd34070..8f68fb4 100644
--- a/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py
+++ b/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py
@@ -71,6 +71,7 @@ from google.datastore.v1 import query_pb2
 from googledatastore import helper as datastore_helper, PropertyFilter
 
 import apache_beam as beam
+from apache_beam.io import ReadFromText
 from apache_beam.io.datastore.v1.datastoreio import ReadFromDatastore
 from apache_beam.io.datastore.v1.datastoreio import WriteToDatastore
 from apache_beam.utils.pipeline_options import GoogleCloudOptions
@@ -133,7 +134,7 @@ def write_to_datastore(project, user_options, pipeline_options):
 
   # pylint: disable=expression-not-assigned
   (p
-   | 'read' >> beam.io.Read(beam.io.TextFileSource(user_options.input))
+   | 'read' >> ReadFromText(user_options.input)
    | 'create entity' >> beam.Map(
        EntityWrapper(user_options.namespace, user_options.kind,
                      user_options.ancestor).make_entity)

http://git-wip-us.apache.org/repos/asf/beam/blob/87bed83b/sdks/python/apache_beam/examples/cookbook/group_with_coder.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/group_with_coder.py b/sdks/python/apache_beam/examples/cookbook/group_with_coder.py
index c4b8c59..78540d1 100644
--- a/sdks/python/apache_beam/examples/cookbook/group_with_coder.py
+++ b/sdks/python/apache_beam/examples/cookbook/group_with_coder.py
@@ -33,6 +33,8 @@ import sys
 
 import apache_beam as beam
 from apache_beam import coders
+from apache_beam.io import ReadFromText
+from apache_beam.io import WriteToText
 from apache_beam.typehints import typehints
 from apache_beam.typehints.decorators import with_output_types
 from apache_beam.utils.pipeline_options import PipelineOptions
@@ -98,7 +100,7 @@ def run(argv=sys.argv[1:]):
   coders.registry.register_coder(Player, PlayerCoder)
 
   (p  # pylint: disable=expression-not-assigned
-   | beam.io.Read(beam.io.TextFileSource(known_args.input))
+   | ReadFromText(known_args.input)
    # The get_players function is annotated with a type hint above, so the type
    # system knows the output type of the following operation is a key-value pair
    # of a Player and an int. Please see the documentation for details on
@@ -111,7 +113,7 @@ def run(argv=sys.argv[1:]):
    # encode Player objects as keys for this combine operation.
    | beam.CombinePerKey(sum)
    | beam.Map(lambda (k, v): '%s,%d' % (k.name, v))
-   | beam.io.Write(beam.io.TextFileSink(known_args.output)))
+   | WriteToText(known_args.output))
   p.run()
 
 

http://git-wip-us.apache.org/repos/asf/beam/blob/87bed83b/sdks/python/apache_beam/examples/cookbook/mergecontacts.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/mergecontacts.py b/sdks/python/apache_beam/examples/cookbook/mergecontacts.py
index 6602609..2475e02 100644
--- a/sdks/python/apache_beam/examples/cookbook/mergecontacts.py
+++ b/sdks/python/apache_beam/examples/cookbook/mergecontacts.py
@@ -36,6 +36,8 @@ import logging
 import re
 
 import apache_beam as beam
+from apache_beam.io import ReadFromText
+from apache_beam.io import WriteToText
 from apache_beam.utils.pipeline_options import PipelineOptions
 from apache_beam.utils.pipeline_options import SetupOptions
 
@@ -72,7 +74,7 @@ def run(argv=None, assert_results=None):
   # quotes/backslashes, and convert it a PCollection of (key, value) pairs.
   def read_kv_textfile(label, textfile):
     return (p
-            | beam.io.Read('read_%s' % label, textfile)
+            | 'read_%s' % label >> ReadFromText(textfile)
             | beam.Map('backslash_%s' % label,
                        lambda x: re.sub(r'\\', r'\\\\', x))
             | beam.Map('escape_quotes_%s' % label,
@@ -80,12 +82,9 @@ def run(argv=None, assert_results=None):
             | beam.Map('split_%s' % label, lambda x: re.split(r'\t+', x, 1)))
 
   # Read input databases.
-  email = read_kv_textfile('email',
-                           beam.io.TextFileSource(known_args.input_email))
-  phone = read_kv_textfile('phone',
-                           beam.io.TextFileSource(known_args.input_phone))
-  snailmail = read_kv_textfile('snailmail', beam.io.TextFileSource(
-      known_args.input_snailmail))
+  email = read_kv_textfile('email', known_args.input_email)
+  phone = read_kv_textfile('phone', known_args.input_phone)
+  snailmail = read_kv_textfile('snailmail', known_args.input_snailmail)
 
   # Group together all entries under the same name.
   grouped = (email, phone, snailmail) | 'group_by_name' >> beam.CoGroupByKey()
@@ -113,8 +112,7 @@ def run(argv=None, assert_results=None):
 
   # Write tab-delimited output.
   # pylint: disable=expression-not-assigned
-  tsv_lines | beam.io.Write('write_tsv',
-                            beam.io.TextFileSink(known_args.output_tsv))
+  tsv_lines | 'write_tsv' >> WriteToText(known_args.output_tsv)
 
   # TODO(silviuc): Move the assert_results logic to the unit test.
   if assert_results is not None:

http://git-wip-us.apache.org/repos/asf/beam/blob/87bed83b/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py b/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py
index dd91e74..3acebc6 100644
--- a/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py
+++ b/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py
@@ -54,6 +54,8 @@ import re
 
 import apache_beam as beam
 from apache_beam import pvalue
+from apache_beam.io import ReadFromText
+from apache_beam.io import WriteToText
 from apache_beam.utils.pipeline_options import PipelineOptions
 from apache_beam.utils.pipeline_options import SetupOptions
 
@@ -137,7 +139,7 @@ def run(argv=None):
   pipeline_options.view_as(SetupOptions).save_main_session = True
   p = beam.Pipeline(options=pipeline_options)
 
-  lines = p | beam.Read(beam.io.TextFileSource(known_args.input))
+  lines = p | ReadFromText(known_args.input)
 
   # with_outputs allows accessing the side outputs of a DoFn.
   split_lines_result = (lines
@@ -158,20 +160,18 @@ def run(argv=None):
    | 'pair_with_key' >> beam.Map(lambda x: ('chars_temp_key', x))
    | beam.GroupByKey()
    | 'count chars' >> beam.Map(lambda (_, counts): sum(counts))
-   | 'write chars' >> beam.Write(
-       beam.io.TextFileSink(known_args.output + '-chars')))
+   | 'write chars' >> WriteToText(known_args.output + '-chars'))
 
   # pylint: disable=expression-not-assigned
   (short_words
    | 'count short words' >> CountWords()
-   | 'write short words' >> beam.Write(
-       beam.io.TextFileSink(known_args.output + '-short-words')))
+   | 'write short words' >> WriteToText(
+       known_args.output + '-short-words'))
 
   # pylint: disable=expression-not-assigned
   (words
    | 'count words' >> CountWords()
-   | 'write words' >> beam.Write(
-       beam.io.TextFileSink(known_args.output + '-words')))
+   | 'write words' >> WriteToText(known_args.output + '-words'))
 
   p.run()
 

http://git-wip-us.apache.org/repos/asf/beam/blob/87bed83b/sdks/python/apache_beam/examples/wordcount.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/wordcount.py b/sdks/python/apache_beam/examples/wordcount.py
index 51fc2eb..211211d 100644
--- a/sdks/python/apache_beam/examples/wordcount.py
+++ b/sdks/python/apache_beam/examples/wordcount.py
@@ -24,6 +24,8 @@ import logging
 import re
 
 import apache_beam as beam
+from apache_beam.io import ReadFromText
+from apache_beam.io import WriteToText
 from apache_beam.utils.pipeline_options import PipelineOptions
 from apache_beam.utils.pipeline_options import SetupOptions
 
@@ -76,7 +78,7 @@ def run(argv=None):
   p = beam.Pipeline(options=pipeline_options)
 
   # Read the text file[pattern] into a PCollection.
-  lines = p | 'read' >> beam.io.Read(beam.io.TextFileSource(known_args.input))
+  lines = p | 'read' >> ReadFromText(known_args.input)
 
   # Count the occurrences of each word.
   counts = (lines
@@ -91,7 +93,7 @@ def run(argv=None):
 
   # Write the output using a "Write" transform that has side effects.
   # pylint: disable=expression-not-assigned
-  output | 'write' >> beam.io.Write(beam.io.TextFileSink(known_args.output))
+  output | 'write' >> WriteToText(known_args.output)
 
   # Actually run the pipeline (all operations above are deferred).
   result = p.run()

http://git-wip-us.apache.org/repos/asf/beam/blob/87bed83b/sdks/python/apache_beam/examples/wordcount_debugging.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/wordcount_debugging.py b/sdks/python/apache_beam/examples/wordcount_debugging.py
index bba09b4..14f379d 100644
--- a/sdks/python/apache_beam/examples/wordcount_debugging.py
+++ b/sdks/python/apache_beam/examples/wordcount_debugging.py
@@ -46,6 +46,8 @@ import logging
 import re
 
 import apache_beam as beam
+from apache_beam.io import ReadFromText
+from apache_beam.io import WriteToText
 from apache_beam.utils.pipeline_options import PipelineOptions
 from apache_beam.utils.pipeline_options import SetupOptions
 
@@ -126,7 +128,7 @@ def run(argv=None):
   # Read the text file[pattern] into a PCollection, count the occurrences of
   # each word and filter by a list of words.
   filtered_words = (
-      p | 'read' >> beam.io.Read(beam.io.TextFileSource(known_args.input))
+      p | 'read' >> ReadFromText(known_args.input)
       | CountWords()
       | 'FilterText' >> beam.ParDo(FilterTextFn('Flourish|stomach')))
 
@@ -146,7 +148,7 @@ def run(argv=None):
   # pylint: disable=unused-variable
   output = (filtered_words
             | 'format' >> beam.Map(lambda (word, c): '%s: %s' % (word, c))
-            | 'write' >> beam.io.Write(beam.io.TextFileSink(known_args.output)))
+            | 'write' >> WriteToText(known_args.output))
 
   # Actually run the pipeline (all operations above are deferred).
   p.run()

http://git-wip-us.apache.org/repos/asf/beam/blob/87bed83b/sdks/python/apache_beam/examples/wordcount_minimal.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/wordcount_minimal.py b/sdks/python/apache_beam/examples/wordcount_minimal.py
index c02ec16..18595d0 100644
--- a/sdks/python/apache_beam/examples/wordcount_minimal.py
+++ b/sdks/python/apache_beam/examples/wordcount_minimal.py
@@ -51,6 +51,8 @@ import logging
 import re
 
 import apache_beam as beam
+from apache_beam.io import ReadFromText
+from apache_beam.io import WriteToText
 from apache_beam.utils.pipeline_options import PipelineOptions
 from apache_beam.utils.pipeline_options import SetupOptions
 
@@ -93,7 +95,7 @@ def run(argv=None):
   p = beam.Pipeline(options=pipeline_options)
 
   # Read the text file[pattern] into a PCollection.
-  lines = p | 'read' >> beam.io.Read(beam.io.TextFileSource(known_args.input))
+  lines = p | 'read' >> ReadFromText(known_args.input)
 
   # Count the occurrences of each word.
   counts = (lines
@@ -108,7 +110,7 @@ def run(argv=None):
 
   # Write the output using a "Write" transform that has side effects.
   # pylint: disable=expression-not-assigned
-  output | 'write' >> beam.io.Write(beam.io.TextFileSink(known_args.output))
+  output | 'write' >> WriteToText(known_args.output)
 
   # Actually run the pipeline (all operations above are deferred).
   p.run()

http://git-wip-us.apache.org/repos/asf/beam/blob/87bed83b/sdks/python/apache_beam/runners/direct/consumer_tracking_pipeline_visitor_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/consumer_tracking_pipeline_visitor_test.py b/sdks/python/apache_beam/runners/direct/consumer_tracking_pipeline_visitor_test.py
index 6fc9d83..27cf1b6 100644
--- a/sdks/python/apache_beam/runners/direct/consumer_tracking_pipeline_visitor_test.py
+++ b/sdks/python/apache_beam/runners/direct/consumer_tracking_pipeline_visitor_test.py
@@ -21,8 +21,8 @@ import logging
 import unittest
 
 from apache_beam import pvalue
+from apache_beam.io import iobase
 from apache_beam.io import Read
-from apache_beam.io import TextFileSource
 from apache_beam.pipeline import Pipeline
 from apache_beam.pvalue import AsList
 from apache_beam.runners.direct import DirectRunner
@@ -47,7 +47,11 @@ class ConsumerTrackingPipelineVisitorTest(unittest.TestCase):
 
   def test_root_transforms(self):
     root_create = Create('create', [[1, 2, 3]])
-    root_read = Read('read', TextFileSource('/tmp/somefile'))
+
+    class DummySource(iobase.BoundedSource):
+      pass
+
+    root_read = Read('read', DummySource())
     root_flatten = Flatten('flatten', pipeline=self.pipeline)
 
     pbegin = pvalue.PBegin(self.pipeline)


[2/2] beam git commit: Closes #1728

Posted by ro...@apache.org.
Closes #1728


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/a779ac15
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/a779ac15
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/a779ac15

Branch: refs/heads/python-sdk
Commit: a779ac15d284413cb07f19ded7abcc2d6e72aa3d
Parents: f22fb9c 87bed83
Author: Robert Bradshaw <ro...@google.com>
Authored: Tue Jan 3 10:42:03 2017 -0800
Committer: Robert Bradshaw <ro...@google.com>
Committed: Tue Jan 3 10:42:03 2017 -0800

----------------------------------------------------------------------
 .../apache_beam/examples/complete/autocomplete.py   |  6 ++++--
 .../apache_beam/examples/complete/estimate_pi.py    |  4 ++--
 .../examples/complete/juliaset/juliaset/juliaset.py |  3 ++-
 sdks/python/apache_beam/examples/complete/tfidf.py  |  6 ++++--
 .../examples/complete/top_wikipedia_sessions.py     |  6 ++++--
 .../examples/cookbook/bigquery_side_input.py        |  3 ++-
 .../apache_beam/examples/cookbook/bigshuffle.py     | 16 ++++++++--------
 sdks/python/apache_beam/examples/cookbook/coders.py |  8 ++++----
 .../examples/cookbook/custom_ptransform.py          | 11 ++++++-----
 .../examples/cookbook/datastore_wordcount.py        |  3 ++-
 .../examples/cookbook/group_with_coder.py           |  6 ++++--
 .../apache_beam/examples/cookbook/mergecontacts.py  | 16 +++++++---------
 .../examples/cookbook/multiple_output_pardo.py      | 14 +++++++-------
 sdks/python/apache_beam/examples/wordcount.py       |  6 ++++--
 .../apache_beam/examples/wordcount_debugging.py     |  6 ++++--
 .../apache_beam/examples/wordcount_minimal.py       |  6 ++++--
 .../consumer_tracking_pipeline_visitor_test.py      |  8 ++++++--
 17 files changed, 74 insertions(+), 54 deletions(-)
----------------------------------------------------------------------