You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2017/01/03 18:42:37 UTC
[1/2] beam git commit: Updates Python SDK examples to use Beam text source/sink.
Repository: beam
Updated Branches:
refs/heads/python-sdk f22fb9ce5 -> a779ac15d
Updates Python SDK examples to use Beam text source/sink.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/87bed83b
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/87bed83b
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/87bed83b
Branch: refs/heads/python-sdk
Commit: 87bed83bc96e73003f05ed299e08b558ac5d1e37
Parents: f22fb9c
Author: Chamikara Jayalath <ch...@google.com>
Authored: Mon Jan 2 23:21:05 2017 -0800
Committer: Robert Bradshaw <ro...@google.com>
Committed: Tue Jan 3 10:42:02 2017 -0800
----------------------------------------------------------------------
.../apache_beam/examples/complete/autocomplete.py | 6 ++++--
.../apache_beam/examples/complete/estimate_pi.py | 4 ++--
.../examples/complete/juliaset/juliaset/juliaset.py | 3 ++-
sdks/python/apache_beam/examples/complete/tfidf.py | 6 ++++--
.../examples/complete/top_wikipedia_sessions.py | 6 ++++--
.../examples/cookbook/bigquery_side_input.py | 3 ++-
.../apache_beam/examples/cookbook/bigshuffle.py | 16 ++++++++--------
sdks/python/apache_beam/examples/cookbook/coders.py | 8 ++++----
.../examples/cookbook/custom_ptransform.py | 11 ++++++-----
.../examples/cookbook/datastore_wordcount.py | 3 ++-
.../examples/cookbook/group_with_coder.py | 6 ++++--
.../apache_beam/examples/cookbook/mergecontacts.py | 16 +++++++---------
.../examples/cookbook/multiple_output_pardo.py | 14 +++++++-------
sdks/python/apache_beam/examples/wordcount.py | 6 ++++--
.../apache_beam/examples/wordcount_debugging.py | 6 ++++--
.../apache_beam/examples/wordcount_minimal.py | 6 ++++--
.../consumer_tracking_pipeline_visitor_test.py | 8 ++++++--
17 files changed, 74 insertions(+), 54 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/87bed83b/sdks/python/apache_beam/examples/complete/autocomplete.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/autocomplete.py b/sdks/python/apache_beam/examples/complete/autocomplete.py
index 87e6c0c..3f2a7ae 100644
--- a/sdks/python/apache_beam/examples/complete/autocomplete.py
+++ b/sdks/python/apache_beam/examples/complete/autocomplete.py
@@ -24,6 +24,8 @@ import logging
import re
import apache_beam as beam
+from apache_beam.io import ReadFromText
+from apache_beam.io import WriteToText
from apache_beam.utils.pipeline_options import PipelineOptions
from apache_beam.utils.pipeline_options import SetupOptions
@@ -45,12 +47,12 @@ def run(argv=None):
p = beam.Pipeline(options=pipeline_options)
(p # pylint: disable=expression-not-assigned
- | 'read' >> beam.io.Read(beam.io.TextFileSource(known_args.input))
+ | 'read' >> ReadFromText(known_args.input)
| 'split' >> beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x))
| 'TopPerPrefix' >> TopPerPrefix(5)
| 'format' >> beam.Map(
lambda (prefix, candidates): '%s: %s' % (prefix, candidates))
- | 'write' >> beam.io.Write(beam.io.TextFileSink(known_args.output)))
+ | 'write' >> WriteToText(known_args.output))
p.run()
http://git-wip-us.apache.org/repos/asf/beam/blob/87bed83b/sdks/python/apache_beam/examples/complete/estimate_pi.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/estimate_pi.py b/sdks/python/apache_beam/examples/complete/estimate_pi.py
index d0faefe..11081a6 100644
--- a/sdks/python/apache_beam/examples/complete/estimate_pi.py
+++ b/sdks/python/apache_beam/examples/complete/estimate_pi.py
@@ -33,6 +33,7 @@ import random
import apache_beam as beam
+from apache_beam.io import WriteToText
from apache_beam.typehints import Any
from apache_beam.typehints import Iterable
from apache_beam.typehints import Tuple
@@ -113,8 +114,7 @@ def run(argv=None):
(p # pylint: disable=expression-not-assigned
| EstimatePiTransform()
- | beam.io.Write(beam.io.TextFileSink(known_args.output,
- coder=JsonCoder())))
+ | WriteToText(known_args.output, coder=JsonCoder()))
# Actually run the pipeline (all operations above are deferred).
p.run()
http://git-wip-us.apache.org/repos/asf/beam/blob/87bed83b/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset.py b/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset.py
index 1445fbe..45fc1fb 100644
--- a/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset.py
+++ b/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset.py
@@ -25,6 +25,7 @@ from __future__ import absolute_import
import argparse
import apache_beam as beam
+from apache_beam.io import WriteToText
def from_pixel(x, y, n):
@@ -110,7 +111,7 @@ def run(argv=None): # pylint: disable=missing-docstring
| 'x coord' >> beam.GroupByKey()
| 'format' >> beam.Map(
lambda (k, coords): ' '.join('(%s, %s, %s)' % coord for coord in coords))
- | beam.io.Write(beam.io.TextFileSink(known_args.coordinate_output)))
+ | WriteToText(known_args.coordinate_output))
# pylint: enable=expression-not-assigned
p.run()
http://git-wip-us.apache.org/repos/asf/beam/blob/87bed83b/sdks/python/apache_beam/examples/complete/tfidf.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/tfidf.py b/sdks/python/apache_beam/examples/complete/tfidf.py
index b4d5b45..59b9d6f 100644
--- a/sdks/python/apache_beam/examples/complete/tfidf.py
+++ b/sdks/python/apache_beam/examples/complete/tfidf.py
@@ -29,6 +29,8 @@ import math
import re
import apache_beam as beam
+from apache_beam.io import ReadFromText
+from apache_beam.io import WriteToText
from apache_beam.pvalue import AsSingleton
from apache_beam.utils.pipeline_options import PipelineOptions
from apache_beam.utils.pipeline_options import SetupOptions
@@ -40,7 +42,7 @@ def read_documents(pipeline, uris):
for uri in uris:
pcolls.append(
pipeline
- | beam.io.Read('read: %s' % uri, beam.io.TextFileSource(uri))
+ | 'read: %s' % uri >> ReadFromText(uri)
| beam.Map('withkey: %s' % uri, lambda v, uri: (uri, v), uri))
return pcolls | 'flatten read pcolls' >> beam.Flatten()
@@ -197,7 +199,7 @@ def run(argv=None):
output = pcoll | TfIdf()
# Write the output using a "Write" transform that has side effects.
# pylint: disable=expression-not-assigned
- output | 'write' >> beam.io.Write(beam.io.TextFileSink(known_args.output))
+ output | 'write' >> WriteToText(known_args.output)
p.run()
http://git-wip-us.apache.org/repos/asf/beam/blob/87bed83b/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions.py b/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions.py
index cbd305a..2dea642 100644
--- a/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions.py
+++ b/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions.py
@@ -46,6 +46,8 @@ import logging
import apache_beam as beam
from apache_beam import combiners
from apache_beam import window
+from apache_beam.io import ReadFromText
+from apache_beam.io import WriteToText
from apache_beam.utils.pipeline_options import PipelineOptions
from apache_beam.utils.pipeline_options import SetupOptions
@@ -168,9 +170,9 @@ def run(argv=None):
p = beam.Pipeline(options=pipeline_options)
(p # pylint: disable=expression-not-assigned
- | beam.Read(beam.io.TextFileSource(known_args.input))
+ | ReadFromText(known_args.input)
| ComputeTopSessions(known_args.sampling_threshold)
- | beam.io.Write(beam.io.TextFileSink(known_args.output)))
+ | WriteToText(known_args.output))
p.run()
http://git-wip-us.apache.org/repos/asf/beam/blob/87bed83b/sdks/python/apache_beam/examples/cookbook/bigquery_side_input.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/bigquery_side_input.py b/sdks/python/apache_beam/examples/cookbook/bigquery_side_input.py
index 25e2c3b..7c5784b 100644
--- a/sdks/python/apache_beam/examples/cookbook/bigquery_side_input.py
+++ b/sdks/python/apache_beam/examples/cookbook/bigquery_side_input.py
@@ -33,6 +33,7 @@ from random import randrange
import apache_beam as beam
+from apache_beam.io import WriteToText
from apache_beam.pvalue import AsList
from apache_beam.pvalue import AsSingleton
from apache_beam.utils.pipeline_options import PipelineOptions
@@ -113,7 +114,7 @@ def run(argv=None):
pcoll_ignore_corpus, pcoll_ignore_word)
# pylint:disable=expression-not-assigned
- pcoll_groups | beam.io.Write(beam.io.TextFileSink(known_args.output))
+ pcoll_groups | WriteToText(known_args.output)
p.run()
http://git-wip-us.apache.org/repos/asf/beam/blob/87bed83b/sdks/python/apache_beam/examples/cookbook/bigshuffle.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/bigshuffle.py b/sdks/python/apache_beam/examples/cookbook/bigshuffle.py
index 83d3881..ceeefd6 100644
--- a/sdks/python/apache_beam/examples/cookbook/bigshuffle.py
+++ b/sdks/python/apache_beam/examples/cookbook/bigshuffle.py
@@ -25,6 +25,8 @@ import logging
import apache_beam as beam
+from apache_beam.io import ReadFromText
+from apache_beam.io import WriteToText
from apache_beam.utils.pipeline_options import PipelineOptions
from apache_beam.utils.pipeline_options import SetupOptions
@@ -53,9 +55,7 @@ def run(argv=None):
p = beam.Pipeline(options=pipeline_options)
# Read the text file[pattern] into a PCollection.
- lines = p | beam.io.Read(
- beam.io.TextFileSource(known_args.input,
- coder=beam.coders.BytesCoder()))
+ lines = p | ReadFromText(known_args.input, coder=beam.coders.BytesCoder())
# Count the occurrences of each word.
output = (lines
@@ -68,7 +68,7 @@ def run(argv=None):
lambda (key, vals): ['%s%s' % (key, val) for val in vals]))
# Write the output using a "Write" transform that has side effects.
- output | beam.io.Write(beam.io.TextFileSink(known_args.output))
+ output | WriteToText(known_args.output)
# Optionally write the input and output checksums.
if known_args.checksum_output:
@@ -76,15 +76,15 @@ def run(argv=None):
| 'input-csum' >> beam.Map(crc32line)
| 'combine-input-csum' >> beam.CombineGlobally(sum)
| 'hex-format' >> beam.Map(lambda x: '%x' % x))
- input_csum | 'write-input-csum' >> beam.io.Write(
- beam.io.TextFileSink(known_args.checksum_output + '-input'))
+ input_csum | 'write-input-csum' >> WriteToText(
+ known_args.checksum_output + '-input')
output_csum = (output
| 'output-csum' >> beam.Map(crc32line)
| 'combine-output-csum' >> beam.CombineGlobally(sum)
| 'hex-format-output' >> beam.Map(lambda x: '%x' % x))
- output_csum | 'write-output-csum' >> beam.io.Write(
- beam.io.TextFileSink(known_args.checksum_output + '-output'))
+ output_csum | 'write-output-csum' >> WriteToText(
+ known_args.checksum_output + '-output')
# Actually run the pipeline (all operations above are deferred).
p.run()
http://git-wip-us.apache.org/repos/asf/beam/blob/87bed83b/sdks/python/apache_beam/examples/cookbook/coders.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/coders.py b/sdks/python/apache_beam/examples/cookbook/coders.py
index 690ba66..ede9d70 100644
--- a/sdks/python/apache_beam/examples/cookbook/coders.py
+++ b/sdks/python/apache_beam/examples/cookbook/coders.py
@@ -35,6 +35,8 @@ import json
import logging
import apache_beam as beam
+from apache_beam.io import ReadFromText
+from apache_beam.io import WriteToText
from apache_beam.utils.pipeline_options import PipelineOptions
from apache_beam.utils.pipeline_options import SetupOptions
@@ -87,12 +89,10 @@ def run(argv=None):
p = beam.Pipeline(argv=pipeline_args)
(p # pylint: disable=expression-not-assigned
- | beam.io.Read('read',
- beam.io.TextFileSource(known_args.input, coder=JsonCoder()))
+ | 'read' >> ReadFromText(known_args.input, coder=JsonCoder())
| 'points' >> beam.FlatMap(compute_points)
| beam.CombinePerKey(sum)
- | beam.io.Write('write',
- beam.io.TextFileSink(known_args.output, coder=JsonCoder())))
+ | 'write' >> WriteToText(known_args.output, coder=JsonCoder()))
p.run()
http://git-wip-us.apache.org/repos/asf/beam/blob/87bed83b/sdks/python/apache_beam/examples/cookbook/custom_ptransform.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/custom_ptransform.py b/sdks/python/apache_beam/examples/cookbook/custom_ptransform.py
index 56259ed..ef6bc5a 100644
--- a/sdks/python/apache_beam/examples/cookbook/custom_ptransform.py
+++ b/sdks/python/apache_beam/examples/cookbook/custom_ptransform.py
@@ -26,7 +26,8 @@ import argparse
import logging
import apache_beam as beam
-
+from apache_beam.io import ReadFromText
+from apache_beam.io import WriteToText
from apache_beam.utils.pipeline_options import PipelineOptions
@@ -66,9 +67,9 @@ def run_count2(known_args, options):
"""Runs the second example pipeline."""
logging.info('Running second pipeline')
p = beam.Pipeline(options=options)
- (p | beam.io.Read(beam.io.TextFileSource(known_args.input))
+ (p | ReadFromText(known_args.input)
| Count2() # pylint: disable=no-value-for-parameter
- | beam.io.Write(beam.io.TextFileSink(known_args.output)))
+ | WriteToText(known_args.output))
p.run()
@@ -93,9 +94,9 @@ def run_count3(known_args, options):
"""Runs the third example pipeline."""
logging.info('Running third pipeline')
p = beam.Pipeline(options=options)
- (p | beam.io.Read(beam.io.TextFileSource(known_args.input))
+ (p | ReadFromText(known_args.input)
| Count3(2) # pylint: disable=no-value-for-parameter
- | beam.io.Write(beam.io.TextFileSink(known_args.output)))
+ | WriteToText(known_args.output))
p.run()
http://git-wip-us.apache.org/repos/asf/beam/blob/87bed83b/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py b/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py
index dd34070..8f68fb4 100644
--- a/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py
+++ b/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py
@@ -71,6 +71,7 @@ from google.datastore.v1 import query_pb2
from googledatastore import helper as datastore_helper, PropertyFilter
import apache_beam as beam
+from apache_beam.io import ReadFromText
from apache_beam.io.datastore.v1.datastoreio import ReadFromDatastore
from apache_beam.io.datastore.v1.datastoreio import WriteToDatastore
from apache_beam.utils.pipeline_options import GoogleCloudOptions
@@ -133,7 +134,7 @@ def write_to_datastore(project, user_options, pipeline_options):
# pylint: disable=expression-not-assigned
(p
- | 'read' >> beam.io.Read(beam.io.TextFileSource(user_options.input))
+ | 'read' >> ReadFromText(user_options.input)
| 'create entity' >> beam.Map(
EntityWrapper(user_options.namespace, user_options.kind,
user_options.ancestor).make_entity)
http://git-wip-us.apache.org/repos/asf/beam/blob/87bed83b/sdks/python/apache_beam/examples/cookbook/group_with_coder.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/group_with_coder.py b/sdks/python/apache_beam/examples/cookbook/group_with_coder.py
index c4b8c59..78540d1 100644
--- a/sdks/python/apache_beam/examples/cookbook/group_with_coder.py
+++ b/sdks/python/apache_beam/examples/cookbook/group_with_coder.py
@@ -33,6 +33,8 @@ import sys
import apache_beam as beam
from apache_beam import coders
+from apache_beam.io import ReadFromText
+from apache_beam.io import WriteToText
from apache_beam.typehints import typehints
from apache_beam.typehints.decorators import with_output_types
from apache_beam.utils.pipeline_options import PipelineOptions
@@ -98,7 +100,7 @@ def run(argv=sys.argv[1:]):
coders.registry.register_coder(Player, PlayerCoder)
(p # pylint: disable=expression-not-assigned
- | beam.io.Read(beam.io.TextFileSource(known_args.input))
+ | ReadFromText(known_args.input)
# The get_players function is annotated with a type hint above, so the type
# system knows the output type of the following operation is a key-value pair
# of a Player and an int. Please see the documentation for details on
@@ -111,7 +113,7 @@ def run(argv=sys.argv[1:]):
# encode Player objects as keys for this combine operation.
| beam.CombinePerKey(sum)
| beam.Map(lambda (k, v): '%s,%d' % (k.name, v))
- | beam.io.Write(beam.io.TextFileSink(known_args.output)))
+ | WriteToText(known_args.output))
p.run()
http://git-wip-us.apache.org/repos/asf/beam/blob/87bed83b/sdks/python/apache_beam/examples/cookbook/mergecontacts.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/mergecontacts.py b/sdks/python/apache_beam/examples/cookbook/mergecontacts.py
index 6602609..2475e02 100644
--- a/sdks/python/apache_beam/examples/cookbook/mergecontacts.py
+++ b/sdks/python/apache_beam/examples/cookbook/mergecontacts.py
@@ -36,6 +36,8 @@ import logging
import re
import apache_beam as beam
+from apache_beam.io import ReadFromText
+from apache_beam.io import WriteToText
from apache_beam.utils.pipeline_options import PipelineOptions
from apache_beam.utils.pipeline_options import SetupOptions
@@ -72,7 +74,7 @@ def run(argv=None, assert_results=None):
# quotes/backslashes, and convert it a PCollection of (key, value) pairs.
def read_kv_textfile(label, textfile):
return (p
- | beam.io.Read('read_%s' % label, textfile)
+ | 'read_%s' % label >> ReadFromText(textfile)
| beam.Map('backslash_%s' % label,
lambda x: re.sub(r'\\', r'\\\\', x))
| beam.Map('escape_quotes_%s' % label,
@@ -80,12 +82,9 @@ def run(argv=None, assert_results=None):
| beam.Map('split_%s' % label, lambda x: re.split(r'\t+', x, 1)))
# Read input databases.
- email = read_kv_textfile('email',
- beam.io.TextFileSource(known_args.input_email))
- phone = read_kv_textfile('phone',
- beam.io.TextFileSource(known_args.input_phone))
- snailmail = read_kv_textfile('snailmail', beam.io.TextFileSource(
- known_args.input_snailmail))
+ email = read_kv_textfile('email', known_args.input_email)
+ phone = read_kv_textfile('phone', known_args.input_phone)
+ snailmail = read_kv_textfile('snailmail', known_args.input_snailmail)
# Group together all entries under the same name.
grouped = (email, phone, snailmail) | 'group_by_name' >> beam.CoGroupByKey()
@@ -113,8 +112,7 @@ def run(argv=None, assert_results=None):
# Write tab-delimited output.
# pylint: disable=expression-not-assigned
- tsv_lines | beam.io.Write('write_tsv',
- beam.io.TextFileSink(known_args.output_tsv))
+ tsv_lines | 'write_tsv' >> WriteToText(known_args.output_tsv)
# TODO(silviuc): Move the assert_results logic to the unit test.
if assert_results is not None:
http://git-wip-us.apache.org/repos/asf/beam/blob/87bed83b/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py b/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py
index dd91e74..3acebc6 100644
--- a/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py
+++ b/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py
@@ -54,6 +54,8 @@ import re
import apache_beam as beam
from apache_beam import pvalue
+from apache_beam.io import ReadFromText
+from apache_beam.io import WriteToText
from apache_beam.utils.pipeline_options import PipelineOptions
from apache_beam.utils.pipeline_options import SetupOptions
@@ -137,7 +139,7 @@ def run(argv=None):
pipeline_options.view_as(SetupOptions).save_main_session = True
p = beam.Pipeline(options=pipeline_options)
- lines = p | beam.Read(beam.io.TextFileSource(known_args.input))
+ lines = p | ReadFromText(known_args.input)
# with_outputs allows accessing the side outputs of a DoFn.
split_lines_result = (lines
@@ -158,20 +160,18 @@ def run(argv=None):
| 'pair_with_key' >> beam.Map(lambda x: ('chars_temp_key', x))
| beam.GroupByKey()
| 'count chars' >> beam.Map(lambda (_, counts): sum(counts))
- | 'write chars' >> beam.Write(
- beam.io.TextFileSink(known_args.output + '-chars')))
+ | 'write chars' >> WriteToText(known_args.output + '-chars'))
# pylint: disable=expression-not-assigned
(short_words
| 'count short words' >> CountWords()
- | 'write short words' >> beam.Write(
- beam.io.TextFileSink(known_args.output + '-short-words')))
+ | 'write short words' >> WriteToText(
+ known_args.output + '-short-words'))
# pylint: disable=expression-not-assigned
(words
| 'count words' >> CountWords()
- | 'write words' >> beam.Write(
- beam.io.TextFileSink(known_args.output + '-words')))
+ | 'write words' >> WriteToText(known_args.output + '-words'))
p.run()
http://git-wip-us.apache.org/repos/asf/beam/blob/87bed83b/sdks/python/apache_beam/examples/wordcount.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/wordcount.py b/sdks/python/apache_beam/examples/wordcount.py
index 51fc2eb..211211d 100644
--- a/sdks/python/apache_beam/examples/wordcount.py
+++ b/sdks/python/apache_beam/examples/wordcount.py
@@ -24,6 +24,8 @@ import logging
import re
import apache_beam as beam
+from apache_beam.io import ReadFromText
+from apache_beam.io import WriteToText
from apache_beam.utils.pipeline_options import PipelineOptions
from apache_beam.utils.pipeline_options import SetupOptions
@@ -76,7 +78,7 @@ def run(argv=None):
p = beam.Pipeline(options=pipeline_options)
# Read the text file[pattern] into a PCollection.
- lines = p | 'read' >> beam.io.Read(beam.io.TextFileSource(known_args.input))
+ lines = p | 'read' >> ReadFromText(known_args.input)
# Count the occurrences of each word.
counts = (lines
@@ -91,7 +93,7 @@ def run(argv=None):
# Write the output using a "Write" transform that has side effects.
# pylint: disable=expression-not-assigned
- output | 'write' >> beam.io.Write(beam.io.TextFileSink(known_args.output))
+ output | 'write' >> WriteToText(known_args.output)
# Actually run the pipeline (all operations above are deferred).
result = p.run()
http://git-wip-us.apache.org/repos/asf/beam/blob/87bed83b/sdks/python/apache_beam/examples/wordcount_debugging.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/wordcount_debugging.py b/sdks/python/apache_beam/examples/wordcount_debugging.py
index bba09b4..14f379d 100644
--- a/sdks/python/apache_beam/examples/wordcount_debugging.py
+++ b/sdks/python/apache_beam/examples/wordcount_debugging.py
@@ -46,6 +46,8 @@ import logging
import re
import apache_beam as beam
+from apache_beam.io import ReadFromText
+from apache_beam.io import WriteToText
from apache_beam.utils.pipeline_options import PipelineOptions
from apache_beam.utils.pipeline_options import SetupOptions
@@ -126,7 +128,7 @@ def run(argv=None):
# Read the text file[pattern] into a PCollection, count the occurrences of
# each word and filter by a list of words.
filtered_words = (
- p | 'read' >> beam.io.Read(beam.io.TextFileSource(known_args.input))
+ p | 'read' >> ReadFromText(known_args.input)
| CountWords()
| 'FilterText' >> beam.ParDo(FilterTextFn('Flourish|stomach')))
@@ -146,7 +148,7 @@ def run(argv=None):
# pylint: disable=unused-variable
output = (filtered_words
| 'format' >> beam.Map(lambda (word, c): '%s: %s' % (word, c))
- | 'write' >> beam.io.Write(beam.io.TextFileSink(known_args.output)))
+ | 'write' >> WriteToText(known_args.output))
# Actually run the pipeline (all operations above are deferred).
p.run()
http://git-wip-us.apache.org/repos/asf/beam/blob/87bed83b/sdks/python/apache_beam/examples/wordcount_minimal.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/wordcount_minimal.py b/sdks/python/apache_beam/examples/wordcount_minimal.py
index c02ec16..18595d0 100644
--- a/sdks/python/apache_beam/examples/wordcount_minimal.py
+++ b/sdks/python/apache_beam/examples/wordcount_minimal.py
@@ -51,6 +51,8 @@ import logging
import re
import apache_beam as beam
+from apache_beam.io import ReadFromText
+from apache_beam.io import WriteToText
from apache_beam.utils.pipeline_options import PipelineOptions
from apache_beam.utils.pipeline_options import SetupOptions
@@ -93,7 +95,7 @@ def run(argv=None):
p = beam.Pipeline(options=pipeline_options)
# Read the text file[pattern] into a PCollection.
- lines = p | 'read' >> beam.io.Read(beam.io.TextFileSource(known_args.input))
+ lines = p | 'read' >> ReadFromText(known_args.input)
# Count the occurrences of each word.
counts = (lines
@@ -108,7 +110,7 @@ def run(argv=None):
# Write the output using a "Write" transform that has side effects.
# pylint: disable=expression-not-assigned
- output | 'write' >> beam.io.Write(beam.io.TextFileSink(known_args.output))
+ output | 'write' >> WriteToText(known_args.output)
# Actually run the pipeline (all operations above are deferred).
p.run()
http://git-wip-us.apache.org/repos/asf/beam/blob/87bed83b/sdks/python/apache_beam/runners/direct/consumer_tracking_pipeline_visitor_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/consumer_tracking_pipeline_visitor_test.py b/sdks/python/apache_beam/runners/direct/consumer_tracking_pipeline_visitor_test.py
index 6fc9d83..27cf1b6 100644
--- a/sdks/python/apache_beam/runners/direct/consumer_tracking_pipeline_visitor_test.py
+++ b/sdks/python/apache_beam/runners/direct/consumer_tracking_pipeline_visitor_test.py
@@ -21,8 +21,8 @@ import logging
import unittest
from apache_beam import pvalue
+from apache_beam.io import iobase
from apache_beam.io import Read
-from apache_beam.io import TextFileSource
from apache_beam.pipeline import Pipeline
from apache_beam.pvalue import AsList
from apache_beam.runners.direct import DirectRunner
@@ -47,7 +47,11 @@ class ConsumerTrackingPipelineVisitorTest(unittest.TestCase):
def test_root_transforms(self):
root_create = Create('create', [[1, 2, 3]])
- root_read = Read('read', TextFileSource('/tmp/somefile'))
+
+ class DummySource(iobase.BoundedSource):
+ pass
+
+ root_read = Read('read', DummySource())
root_flatten = Flatten('flatten', pipeline=self.pipeline)
pbegin = pvalue.PBegin(self.pipeline)
[2/2] beam git commit: Closes #1728
Posted by ro...@apache.org.
Closes #1728
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/a779ac15
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/a779ac15
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/a779ac15
Branch: refs/heads/python-sdk
Commit: a779ac15d284413cb07f19ded7abcc2d6e72aa3d
Parents: f22fb9c 87bed83
Author: Robert Bradshaw <ro...@google.com>
Authored: Tue Jan 3 10:42:03 2017 -0800
Committer: Robert Bradshaw <ro...@google.com>
Committed: Tue Jan 3 10:42:03 2017 -0800
----------------------------------------------------------------------
.../apache_beam/examples/complete/autocomplete.py | 6 ++++--
.../apache_beam/examples/complete/estimate_pi.py | 4 ++--
.../examples/complete/juliaset/juliaset/juliaset.py | 3 ++-
sdks/python/apache_beam/examples/complete/tfidf.py | 6 ++++--
.../examples/complete/top_wikipedia_sessions.py | 6 ++++--
.../examples/cookbook/bigquery_side_input.py | 3 ++-
.../apache_beam/examples/cookbook/bigshuffle.py | 16 ++++++++--------
sdks/python/apache_beam/examples/cookbook/coders.py | 8 ++++----
.../examples/cookbook/custom_ptransform.py | 11 ++++++-----
.../examples/cookbook/datastore_wordcount.py | 3 ++-
.../examples/cookbook/group_with_coder.py | 6 ++++--
.../apache_beam/examples/cookbook/mergecontacts.py | 16 +++++++---------
.../examples/cookbook/multiple_output_pardo.py | 14 +++++++-------
sdks/python/apache_beam/examples/wordcount.py | 6 ++++--
.../apache_beam/examples/wordcount_debugging.py | 6 ++++--
.../apache_beam/examples/wordcount_minimal.py | 6 ++++--
.../consumer_tracking_pipeline_visitor_test.py | 8 ++++++--
17 files changed, 74 insertions(+), 54 deletions(-)
----------------------------------------------------------------------