You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2016/12/16 00:55:43 UTC

[1/2] incubator-beam git commit: Rename PTransform.apply() to PTransform.expand()

Repository: incubator-beam
Updated Branches:
  refs/heads/python-sdk d3c887480 -> e26527873


Rename PTransform.apply() to PTransform.expand()

See https://issues.apache.org/jira/browse/BEAM-1125


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/e62249a1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/e62249a1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/e62249a1

Branch: refs/heads/python-sdk
Commit: e62249a1f0a170f7e16926a3f0e6bc25d1422c22
Parents: d3c8874
Author: Ahmet Altay <al...@google.com>
Authored: Thu Dec 15 14:27:08 2016 -0800
Committer: Robert Bradshaw <ro...@google.com>
Committed: Thu Dec 15 16:51:50 2016 -0800

----------------------------------------------------------------------
 sdks/python/README.md                           |  2 +-
 .../examples/complete/autocomplete.py           |  2 +-
 .../examples/complete/estimate_pi.py            |  2 +-
 .../apache_beam/examples/complete/tfidf.py      |  2 +-
 .../examples/complete/top_wikipedia_sessions.py |  6 ++---
 .../examples/cookbook/custom_ptransform.py      |  2 +-
 .../examples/cookbook/multiple_output_pardo.py  |  2 +-
 .../apache_beam/examples/snippets/snippets.py   | 16 ++++++-------
 .../examples/snippets/snippets_test.py          |  2 +-
 .../apache_beam/examples/wordcount_debugging.py |  2 +-
 sdks/python/apache_beam/io/avroio.py            |  4 ++--
 .../apache_beam/io/datastore/v1/datastoreio.py  |  4 ++--
 sdks/python/apache_beam/io/iobase.py            |  6 ++---
 sdks/python/apache_beam/io/textio.py            |  4 ++--
 sdks/python/apache_beam/pipeline_test.py        |  4 ++--
 .../runners/dataflow/native_io/iobase.py        |  2 +-
 .../apache_beam/runners/direct/direct_runner.py |  2 +-
 sdks/python/apache_beam/runners/runner.py       |  4 ++--
 sdks/python/apache_beam/transforms/combiners.py | 14 ++++++------
 .../apache_beam/transforms/combiners_test.py    |  2 +-
 sdks/python/apache_beam/transforms/core.py      | 24 ++++++++++----------
 .../python/apache_beam/transforms/ptransform.py | 10 ++++----
 .../apache_beam/transforms/ptransform_test.py   |  6 ++---
 .../python/apache_beam/transforms/sideinputs.py | 10 ++++----
 sdks/python/apache_beam/transforms/util.py      |  4 ++--
 .../transforms/write_ptransform_test.py         |  2 +-
 .../typehints/typed_pipeline_test.py            |  2 +-
 27 files changed, 71 insertions(+), 71 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e62249a1/sdks/python/README.md
----------------------------------------------------------------------
diff --git a/sdks/python/README.md b/sdks/python/README.md
index 820084d..5ea2a60 100644
--- a/sdks/python/README.md
+++ b/sdks/python/README.md
@@ -262,7 +262,7 @@ import re
 import apache_beam as beam
 p = beam.Pipeline('DirectPipelineRunner')
 class MyCountTransform(beam.PTransform):
-  def apply(self, pcoll):
+  def expand(self, pcoll):
     return (pcoll
             | 'one word' >> beam.Map(lambda word: (word, 1))
             # GroupByKey accepts a PCollection of (word, 1) elements and

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e62249a1/sdks/python/apache_beam/examples/complete/autocomplete.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/autocomplete.py b/sdks/python/apache_beam/examples/complete/autocomplete.py
index c3cd88f..eaa5ca2 100644
--- a/sdks/python/apache_beam/examples/complete/autocomplete.py
+++ b/sdks/python/apache_beam/examples/complete/autocomplete.py
@@ -60,7 +60,7 @@ class TopPerPrefix(beam.PTransform):
     super(TopPerPrefix, self).__init__()
     self._count = count
 
-  def apply(self, words):
+  def expand(self, words):
     """Compute the most common words for each possible prefixes.
 
     Args:

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e62249a1/sdks/python/apache_beam/examples/complete/estimate_pi.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/estimate_pi.py b/sdks/python/apache_beam/examples/complete/estimate_pi.py
index 37c1aad..682c6d2 100644
--- a/sdks/python/apache_beam/examples/complete/estimate_pi.py
+++ b/sdks/python/apache_beam/examples/complete/estimate_pi.py
@@ -90,7 +90,7 @@ class JsonCoder(object):
 class EstimatePiTransform(beam.PTransform):
   """Runs 10M trials, and combine the results to estimate pi."""
 
-  def apply(self, pcoll):
+  def expand(self, pcoll):
     # A hundred work items of a hundred thousand tries each.
     return (pcoll
             | 'Initialize' >> beam.Create([100000] * 100).with_output_types(int)

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e62249a1/sdks/python/apache_beam/examples/complete/tfidf.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/tfidf.py b/sdks/python/apache_beam/examples/complete/tfidf.py
index 043d5f6..59b2900 100644
--- a/sdks/python/apache_beam/examples/complete/tfidf.py
+++ b/sdks/python/apache_beam/examples/complete/tfidf.py
@@ -53,7 +53,7 @@ class TfIdf(beam.PTransform):
   The output is mapping from terms to scores for each document URI.
   """
 
-  def apply(self, uri_to_content):
+  def expand(self, uri_to_content):
 
     # Compute the total number of documents, and prepare a singleton
     # PCollection to use as side input.

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e62249a1/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions.py b/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions.py
index a48a383..2d66d7f 100644
--- a/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions.py
+++ b/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions.py
@@ -76,7 +76,7 @@ class ComputeSessions(beam.PTransform):
   def __init__(self):
     super(ComputeSessions, self).__init__()
 
-  def apply(self, pcoll):
+  def expand(self, pcoll):
     return (pcoll
             | beam.WindowInto('ComputeSessionsWindow',
                               window.Sessions(gap_size=ONE_HOUR_IN_SECONDS))
@@ -89,7 +89,7 @@ class TopPerMonth(beam.PTransform):
   def __init__(self):
     super(TopPerMonth, self).__init__()
 
-  def apply(self, pcoll):
+  def expand(self, pcoll):
     return (pcoll
             | beam.WindowInto('TopPerMonthWindow',
                               window.FixedWindows(
@@ -127,7 +127,7 @@ class ComputeTopSessions(beam.PTransform):
     super(ComputeTopSessions, self).__init__()
     self.sampling_threshold = sampling_threshold
 
-  def apply(self, pcoll):
+  def expand(self, pcoll):
     return (pcoll
             | beam.ParDo('ExtractUserAndTimestamp',
                          ExtractUserAndTimestampDoFn())

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e62249a1/sdks/python/apache_beam/examples/cookbook/custom_ptransform.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/custom_ptransform.py b/sdks/python/apache_beam/examples/cookbook/custom_ptransform.py
index ca13bbf..b9d64cf 100644
--- a/sdks/python/apache_beam/examples/cookbook/custom_ptransform.py
+++ b/sdks/python/apache_beam/examples/cookbook/custom_ptransform.py
@@ -36,7 +36,7 @@ from apache_beam.utils.options import PipelineOptions
 class Count1(beam.PTransform):
   """Count as a subclass of PTransform, with an apply method."""
 
-  def apply(self, pcoll):
+  def expand(self, pcoll):
     return (
         pcoll
         | 'ParWithOne' >> beam.Map(lambda v: (v, 1))

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e62249a1/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py b/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py
index d24170e..167e709 100644
--- a/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py
+++ b/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py
@@ -112,7 +112,7 @@ class CountWords(beam.PTransform):
   of "word: count" strings.
   """
 
-  def apply(self, pcoll):
+  def expand(self, pcoll):
     return (pcoll
             | 'pair_with_one' >> beam.Map(lambda x: (x, 1))
             | 'group' >> beam.GroupByKey()

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e62249a1/sdks/python/apache_beam/examples/snippets/snippets.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/snippets/snippets.py b/sdks/python/apache_beam/examples/snippets/snippets.py
index 6dcf05e..f78ecd8 100644
--- a/sdks/python/apache_beam/examples/snippets/snippets.py
+++ b/sdks/python/apache_beam/examples/snippets/snippets.py
@@ -80,7 +80,7 @@ def construct_pipeline(renames):
   class ReverseWords(beam.PTransform):
     """A PTransform that reverses individual elements in a PCollection."""
 
-    def apply(self, pcoll):
+    def expand(self, pcoll):
       return pcoll | beam.Map(lambda e: e[::-1])
 
   def filter_words(unused_x):
@@ -387,7 +387,7 @@ def pipeline_monitoring(renames):
   # The CountWords Composite Transform inside the WordCount pipeline.
   class CountWords(beam.PTransform):
 
-    def apply(self, pcoll):
+    def expand(self, pcoll):
       return (pcoll
               # Convert lines of text into individual words.
               | 'ExtractWords' >> beam.ParDo(ExtractWordsFn())
@@ -508,7 +508,7 @@ def examples_wordcount_wordcount(renames):
   # [START examples_wordcount_wordcount_composite]
   class CountWords(beam.PTransform):
 
-    def apply(self, pcoll):
+    def expand(self, pcoll):
       return (pcoll
               # Convert lines of text into individual words.
               | beam.FlatMap(
@@ -705,7 +705,7 @@ def model_custom_source(count):
       super(ReadFromCountingSource, self).__init__(**kwargs)
       self._count = count
 
-    def apply(self, pcoll):
+    def expand(self, pcoll):
       return pcoll | iobase.Read(_CountingSource(count))
   # [END model_custom_source_new_ptransform]
 
@@ -838,7 +838,7 @@ def model_custom_sink(simplekv, KVs, final_table_name_no_ptransform,
       self._url = url
       self._final_table_name = final_table_name
 
-    def apply(self, pcoll):
+    def expand(self, pcoll):
       return pcoll | iobase.Write(_SimpleKVSink(self._url,
                                                 self._final_table_name))
   # [END model_custom_sink_new_ptransform]
@@ -1001,7 +1001,7 @@ def model_composite_transform_example(contents, output_path):
   class CountWords(beam.PTransform):
     # [END composite_ptransform_declare]
 
-    def apply(self, pcoll):
+    def expand(self, pcoll):
       return (pcoll
               | beam.FlatMap(lambda x: re.findall(r'\w+', x))
               | beam.combiners.Count.PerElement()
@@ -1197,7 +1197,7 @@ def model_join_using_side_inputs(
 # [START model_library_transforms_keys]
 class Keys(beam.PTransform):
 
-  def apply(self, pcoll):
+  def expand(self, pcoll):
     return pcoll | 'Keys' >> beam.Map(lambda (k, v): k)
 # [END model_library_transforms_keys]
 # pylint: enable=invalid-name
@@ -1206,7 +1206,7 @@ class Keys(beam.PTransform):
 # [START model_library_transforms_count]
 class Count(beam.PTransform):
 
-  def apply(self, pcoll):
+  def expand(self, pcoll):
     return (
         pcoll
         | 'PairWithOne' >> beam.Map(lambda v: (v, 1))

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e62249a1/sdks/python/apache_beam/examples/snippets/snippets_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/snippets/snippets_test.py b/sdks/python/apache_beam/examples/snippets/snippets_test.py
index 09b4ba4..db2ea81 100644
--- a/sdks/python/apache_beam/examples/snippets/snippets_test.py
+++ b/sdks/python/apache_beam/examples/snippets/snippets_test.py
@@ -279,7 +279,7 @@ class TypeHintsTest(unittest.TestCase):
     @beam.typehints.with_input_types(T)
     @beam.typehints.with_output_types(beam.typehints.Tuple[int, T])
     class MyTransform(beam.PTransform):
-      def apply(self, pcoll):
+      def expand(self, pcoll):
         return pcoll | beam.Map(lambda x: (len(x), x))
 
     words_with_lens = words | MyTransform()

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e62249a1/sdks/python/apache_beam/examples/wordcount_debugging.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/wordcount_debugging.py b/sdks/python/apache_beam/examples/wordcount_debugging.py
index 473a486..cdf4e0c 100644
--- a/sdks/python/apache_beam/examples/wordcount_debugging.py
+++ b/sdks/python/apache_beam/examples/wordcount_debugging.py
@@ -95,7 +95,7 @@ class CountWords(beam.PTransform):
   def __init__(self):
     super(CountWords, self).__init__()
 
-  def apply(self, pcoll):
+  def expand(self, pcoll):
     return (pcoll
             | 'split' >> (beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x))
                           .with_output_types(unicode))

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e62249a1/sdks/python/apache_beam/io/avroio.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/avroio.py b/sdks/python/apache_beam/io/avroio.py
index 35d0e94..3663bdb 100644
--- a/sdks/python/apache_beam/io/avroio.py
+++ b/sdks/python/apache_beam/io/avroio.py
@@ -77,7 +77,7 @@ class ReadFromAvro(PTransform):
     super(ReadFromAvro, self).__init__()
     self._source = _AvroSource(file_pattern, min_bundle_size, validate=validate)
 
-  def apply(self, pvalue):
+  def expand(self, pvalue):
     return pvalue.pipeline | Read(self._source)
 
   def display_data(self):
@@ -294,7 +294,7 @@ class WriteToAvro(beam.transforms.PTransform):
     self._sink = _AvroSink(file_path_prefix, schema, codec, file_name_suffix,
                            num_shards, shard_name_template, mime_type)
 
-  def apply(self, pcoll):
+  def expand(self, pcoll):
     return pcoll | beam.io.iobase.Write(self._sink)
 
   def display_data(self):

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e62249a1/sdks/python/apache_beam/io/datastore/v1/datastoreio.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/datastore/v1/datastoreio.py b/sdks/python/apache_beam/io/datastore/v1/datastoreio.py
index a86bb0b..93c592d 100644
--- a/sdks/python/apache_beam/io/datastore/v1/datastoreio.py
+++ b/sdks/python/apache_beam/io/datastore/v1/datastoreio.py
@@ -102,7 +102,7 @@ class ReadFromDatastore(PTransform):
     self._query = query
     self._num_splits = num_splits
 
-  def apply(self, pcoll):
+  def expand(self, pcoll):
     # This is a composite transform involves the following:
     #   1. Create a singleton of the user provided `query` and apply a ``ParDo``
     #   that splits the query into `num_splits` and assign each split query a
@@ -312,7 +312,7 @@ class _Mutate(PTransform):
     self._mutation_fn = mutation_fn
     logging.warning('datastoreio write transform is experimental.')
 
-  def apply(self, pcoll):
+  def expand(self, pcoll):
     return (pcoll
             | 'Convert to Mutation' >> Map(self._mutation_fn)
             | 'Write Mutation to Datastore' >> ParDo(_Mutate.DatastoreWriteFn(

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e62249a1/sdks/python/apache_beam/io/iobase.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/iobase.py b/sdks/python/apache_beam/io/iobase.py
index fd6ae57..8fb5238 100644
--- a/sdks/python/apache_beam/io/iobase.py
+++ b/sdks/python/apache_beam/io/iobase.py
@@ -658,7 +658,7 @@ class Read(ptransform.PTransform):
     super(Read, self).__init__(label)
     self.source = source
 
-  def apply(self, pbegin):
+  def expand(self, pbegin):
     assert isinstance(pbegin, pvalue.PBegin)
     self.pipeline = pbegin.pipeline
     return pvalue.PCollection(self.pipeline)
@@ -723,7 +723,7 @@ class Write(ptransform.PTransform):
     return {'sink': self.sink.__class__,
             'sink_dd': self.sink}
 
-  def apply(self, pcoll):
+  def expand(self, pcoll):
     from apache_beam.runners.dataflow.native_io import iobase as dataflow_io
     if isinstance(self.sink, dataflow_io.NativeSink):
       # A native sink
@@ -746,7 +746,7 @@ class WriteImpl(ptransform.PTransform):
     super(WriteImpl, self).__init__()
     self.sink = sink
 
-  def apply(self, pcoll):
+  def expand(self, pcoll):
     do_once = pcoll.pipeline | 'DoOnce' >> core.Create([None])
     init_result_coll = do_once | core.Map(
         'initialize_write', lambda _, sink: sink.initialize_write(), self.sink)

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e62249a1/sdks/python/apache_beam/io/textio.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/textio.py b/sdks/python/apache_beam/io/textio.py
index ebadf69..09cf024 100644
--- a/sdks/python/apache_beam/io/textio.py
+++ b/sdks/python/apache_beam/io/textio.py
@@ -239,7 +239,7 @@ class ReadFromText(PTransform):
                                strip_trailing_newlines, coder,
                                validate=validate)
 
-  def apply(self, pvalue):
+  def expand(self, pvalue):
     return pvalue.pipeline | Read(self._source)
 
   def display_data(self):
@@ -297,7 +297,7 @@ class WriteToText(PTransform):
                            append_trailing_newlines, num_shards,
                            shard_name_template, coder, compression_type)
 
-  def apply(self, pcoll):
+  def expand(self, pcoll):
     return pcoll | Write(self._sink)
 
   def display_data(self):

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e62249a1/sdks/python/apache_beam/pipeline_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/pipeline_test.py b/sdks/python/apache_beam/pipeline_test.py
index c50f04d..5af4811 100644
--- a/sdks/python/apache_beam/pipeline_test.py
+++ b/sdks/python/apache_beam/pipeline_test.py
@@ -73,7 +73,7 @@ class PipelineTest(unittest.TestCase):
 
   class CustomTransform(PTransform):
 
-    def apply(self, pcoll):
+    def expand(self, pcoll):
       return pcoll | '+1' >> FlatMap(lambda x: [x + 1])
 
   class Visitor(PipelineVisitor):
@@ -174,7 +174,7 @@ class PipelineTest(unittest.TestCase):
         # No call to super(...).__init__
         self.suffix = suffix
 
-      def apply(self, pcoll):
+      def expand(self, pcoll):
         return pcoll | Map(lambda x: x + self.suffix)
 
     self.assertEqual(

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e62249a1/sdks/python/apache_beam/runners/dataflow/native_io/iobase.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow/native_io/iobase.py b/sdks/python/apache_beam/runners/dataflow/native_io/iobase.py
index 32da3a2..b6eb288 100644
--- a/sdks/python/apache_beam/runners/dataflow/native_io/iobase.py
+++ b/sdks/python/apache_beam/runners/dataflow/native_io/iobase.py
@@ -306,6 +306,6 @@ class _NativeWrite(ptransform.PTransform):
     super(_NativeWrite, self).__init__(label)
     self.sink = sink
 
-  def apply(self, pcoll):
+  def expand(self, pcoll):
     self._check_pcollection(pcoll)
     return pvalue.PDone(pcoll.pipeline)

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e62249a1/sdks/python/apache_beam/runners/direct/direct_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/direct_runner.py b/sdks/python/apache_beam/runners/direct/direct_runner.py
index 1afd486..fa78902 100644
--- a/sdks/python/apache_beam/runners/direct/direct_runner.py
+++ b/sdks/python/apache_beam/runners/direct/direct_runner.py
@@ -92,7 +92,7 @@ class DirectPipelineRunner(PipelineRunner):
 
   def apply(self, transform, input):  # pylint: disable=redefined-builtin
     """Runner callback for a pipeline.apply call."""
-    return transform.apply(input)
+    return transform.expand(input)
 
 
 class BufferingInMemoryCache(object):

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e62249a1/sdks/python/apache_beam/runners/runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/runner.py b/sdks/python/apache_beam/runners/runner.py
index 0f53d65..ec15bee 100644
--- a/sdks/python/apache_beam/runners/runner.py
+++ b/sdks/python/apache_beam/runners/runner.py
@@ -148,8 +148,8 @@ class PipelineRunner(object):
         'Execution of [%s] not implemented in runner %s.' % (transform, self))
 
   def apply_PTransform(self, transform, input):
-    # The base case of apply is to call the transform's apply.
-    return transform.apply(input)
+    # The base case of apply is to call the transform's expand.
+    return transform.expand(input)
 
   def run_transform(self, transform_node):
     """Runner callback for a pipeline.run call.

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e62249a1/sdks/python/apache_beam/transforms/combiners.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/combiners.py b/sdks/python/apache_beam/transforms/combiners.py
index 22d2b3e..96fcddd 100644
--- a/sdks/python/apache_beam/transforms/combiners.py
+++ b/sdks/python/apache_beam/transforms/combiners.py
@@ -58,13 +58,13 @@ class Mean(object):
   class Globally(ptransform.PTransform):
     """combiners.Mean.Globally computes the arithmetic mean of the elements."""
 
-    def apply(self, pcoll):
+    def expand(self, pcoll):
       return pcoll | core.CombineGlobally(MeanCombineFn())
 
   class PerKey(ptransform.PTransform):
     """combiners.Mean.PerKey finds the means of the values for each key."""
 
-    def apply(self, pcoll):
+    def expand(self, pcoll):
       return pcoll | core.CombinePerKey(MeanCombineFn())
 
 
@@ -105,19 +105,19 @@ class Count(object):
   class Globally(ptransform.PTransform):
     """combiners.Count.Globally counts the total number of elements."""
 
-    def apply(self, pcoll):
+    def expand(self, pcoll):
       return pcoll | core.CombineGlobally(CountCombineFn())
 
   class PerKey(ptransform.PTransform):
     """combiners.Count.PerKey counts how many elements each unique key has."""
 
-    def apply(self, pcoll):
+    def expand(self, pcoll):
       return pcoll | core.CombinePerKey(CountCombineFn())
 
   class PerElement(ptransform.PTransform):
     """combiners.Count.PerElement counts how many times each element occurs."""
 
-    def apply(self, pcoll):
+    def expand(self, pcoll):
       paired_with_void_type = KV[pcoll.element_type, Any]
       return (pcoll
               | (core.Map('%s:PairWithVoid' % self.label, lambda x: (x, None))
@@ -475,7 +475,7 @@ class ToList(ptransform.PTransform):
   def __init__(self, label='ToList'):
     super(ToList, self).__init__(label)
 
-  def apply(self, pcoll):
+  def expand(self, pcoll):
     return pcoll | core.CombineGlobally(self.label, ToListCombineFn())
 
 
@@ -509,7 +509,7 @@ class ToDict(ptransform.PTransform):
   def __init__(self, label='ToDict'):
     super(ToDict, self).__init__(label)
 
-  def apply(self, pcoll):
+  def expand(self, pcoll):
     return pcoll | core.CombineGlobally(self.label, ToDictCombineFn())
 
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e62249a1/sdks/python/apache_beam/transforms/combiners_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/combiners_test.py b/sdks/python/apache_beam/transforms/combiners_test.py
index 8dc274e..6113ea2 100644
--- a/sdks/python/apache_beam/transforms/combiners_test.py
+++ b/sdks/python/apache_beam/transforms/combiners_test.py
@@ -318,7 +318,7 @@ class CombineTest(unittest.TestCase):
 
   def test_combine_globally_with_default_side_input(self):
     class CombineWithSideInput(PTransform):
-      def apply(self, pcoll):
+      def expand(self, pcoll):
         side = pcoll | CombineGlobally(sum).as_singleton_view()
         main = pcoll.pipeline | Create([None])
         return main | Map(lambda _, s: s, side)

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e62249a1/sdks/python/apache_beam/transforms/core.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py
index 523c5a6..0ba1c62 100644
--- a/sdks/python/apache_beam/transforms/core.py
+++ b/sdks/python/apache_beam/transforms/core.py
@@ -598,7 +598,7 @@ class ParDo(PTransformWithSideInputs):
                                   label='Transform Function'),
             'fn_dd': self.fn}
 
-  def apply(self, pcoll):
+  def expand(self, pcoll):
     self.side_output_tags = set()
     # TODO(robertwb): Change all uses of the dofn attribute to use fn instead.
     self.dofn = self.fn
@@ -641,7 +641,7 @@ class _MultiParDo(PTransform):
     self._tags = tags
     self._main_tag = main_tag
 
-  def apply(self, pcoll):
+  def expand(self, pcoll):
     _ = pcoll | self._do_transform
     return pvalue.DoOutputsTuple(
         pcoll.pipeline, self._do_transform, self._tags, self._main_tag)
@@ -854,7 +854,7 @@ class CombineGlobally(PTransform):
   def as_singleton_view(self):
     return self.clone(as_view=True)
 
-  def apply(self, pcoll):
+  def expand(self, pcoll):
     def add_input_types(transform):
       type_hints = self.get_type_hints()
       if type_hints.input_types:
@@ -939,7 +939,7 @@ class CombinePerKey(PTransformWithSideInputs):
   def process_argspec_fn(self):
     return self.fn._fn  # pylint: disable=protected-access
 
-  def apply(self, pcoll):
+  def expand(self, pcoll):
     args, kwargs = util.insert_values_in_args(
         self.args, self.kwargs, self.side_inputs)
     return pcoll | GroupByKey() | CombineValues('Combine',
@@ -952,7 +952,7 @@ class CombineValues(PTransformWithSideInputs):
   def make_fn(self, fn):
     return fn if isinstance(fn, CombineFn) else CombineFn.from_callable(fn)
 
-  def apply(self, pcoll):
+  def expand(self, pcoll):
     args, kwargs = util.insert_values_in_args(
         self.args, self.kwargs, self.side_inputs)
 
@@ -1083,7 +1083,7 @@ class GroupByKey(PTransform):
               timer_window, name, time_domain, fire_time, state):
             yield wvalue.with_value((k, wvalue.value))
 
-  def apply(self, pcoll):
+  def expand(self, pcoll):
     # This code path is only used in the local direct runner.  For Dataflow
     # runner execution, the GroupByKey transform is expanded on the service.
     input_type = pcoll.element_type
@@ -1132,7 +1132,7 @@ class GroupByKeyOnly(PTransform):
     key_type, value_type = trivial_inference.key_value_types(input_type)
     return KV[key_type, Iterable[value_type]]
 
-  def apply(self, pcoll):
+  def expand(self, pcoll):
     self._check_pcollection(pcoll)
     return pvalue.PCollection(pcoll.pipeline)
 
@@ -1170,7 +1170,7 @@ class Partition(PTransformWithSideInputs):
   def make_fn(self, fn):
     return fn if isinstance(fn, PartitionFn) else CallableWrapperPartitionFn(fn)
 
-  def apply(self, pcoll):
+  def expand(self, pcoll):
     n = int(self.args[0])
     return pcoll | ParDo(
         self.ApplyPartitionFnFn(), self.fn, *self.args,
@@ -1261,14 +1261,14 @@ class WindowInto(ParDo):
   def infer_output_type(self, input_type):
     return input_type
 
-  def apply(self, pcoll):
+  def expand(self, pcoll):
     input_type = pcoll.element_type
 
     if input_type is not None:
       output_type = input_type
       self.with_input_types(input_type)
       self.with_output_types(output_type)
-    return super(WindowInto, self).apply(pcoll)
+    return super(WindowInto, self).expand(pcoll)
 
 
 # Python's pickling is broken for nested classes.
@@ -1305,7 +1305,7 @@ class Flatten(PTransform):
       raise ValueError('Input to Flatten must be an iterable.')
     return pvalueish, pvalueish
 
-  def apply(self, pcolls):
+  def expand(self, pcolls):
     for pcoll in pcolls:
       self._check_pcollection(pcoll)
     return pvalue.PCollection(self.pipeline)
@@ -1345,7 +1345,7 @@ class Create(PTransform):
     else:
       return Union[[trivial_inference.instance_to_type(v) for v in self.value]]
 
-  def apply(self, pbegin):
+  def expand(self, pbegin):
     assert isinstance(pbegin, pvalue.PBegin)
     self.pipeline = pbegin.pipeline
     return pvalue.PCollection(self.pipeline)

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e62249a1/sdks/python/apache_beam/transforms/ptransform.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/ptransform.py b/sdks/python/apache_beam/transforms/ptransform.py
index 2212d00..1bd7fb4 100644
--- a/sdks/python/apache_beam/transforms/ptransform.py
+++ b/sdks/python/apache_beam/transforms/ptransform.py
@@ -359,7 +359,7 @@ class PTransform(WithTypeHints, HasDisplayData):
     transform.label = new_label
     return transform
 
-  def apply(self, input_or_inputs):
+  def expand(self, input_or_inputs):
     raise NotImplementedError
 
   def __str__(self):
@@ -493,7 +493,7 @@ class ChainedPTransform(PTransform):
     else:
       return NotImplemented
 
-  def apply(self, pval):
+  def expand(self, pval):
     return reduce(operator.or_, self._parts, pval)
 
 
@@ -650,7 +650,7 @@ class CallablePTransform(PTransform):
     super(CallablePTransform, self).__init__(label=label)
     return self
 
-  def apply(self, pcoll):
+  def expand(self, pcoll):
     # Since the PTransform will be implemented entirely as a function
     # (once called), we need to pass through any type-hinting information that
     # may have been annotated via the .with_input_types() and
@@ -700,7 +700,7 @@ def ptransform_fn(fn):
         super(CustomMapper, self).__init__()
         self.mapfn = mapfn
 
-      def apply(self, pcoll):
+      def expand(self, pcoll):
         return pcoll | ParDo(self.mapfn)
 
   With either method the custom PTransform can be used in pipelines as if
@@ -738,5 +738,5 @@ class _NamedPTransform(PTransform):
   def __ror__(self, pvalueish):
     return self.transform.__ror__(pvalueish, self.label)
 
-  def apply(self, pvalue):
+  def expand(self, pvalue):
     raise RuntimeError("Should never be applied directly.")

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e62249a1/sdks/python/apache_beam/transforms/ptransform_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/ptransform_test.py b/sdks/python/apache_beam/transforms/ptransform_test.py
index e3b1026..9118fee 100644
--- a/sdks/python/apache_beam/transforms/ptransform_test.py
+++ b/sdks/python/apache_beam/transforms/ptransform_test.py
@@ -526,7 +526,7 @@ class PTransformTest(unittest.TestCase):
 
   def test_multi_input_ptransform(self):
     class DisjointUnion(PTransform):
-      def apply(self, pcollections):
+      def expand(self, pcollections):
         return (pcollections
                 | beam.Flatten()
                 | beam.Map(lambda x: (x, None))
@@ -545,7 +545,7 @@ class PTransformTest(unittest.TestCase):
         pvalueish = list(pvalueish)
         return pvalueish, sum([list(p.values()) for p in pvalueish], [])
 
-      def apply(self, pcoll_dicts):
+      def expand(self, pcoll_dicts):
         keys = reduce(operator.or_, [set(p.keys()) for p in pcoll_dicts])
         res = {}
         for k in keys:
@@ -575,7 +575,7 @@ class PTransformLabelsTest(unittest.TestCase):
 
     pardo = None
 
-    def apply(self, pcoll):
+    def expand(self, pcoll):
       self.pardo = beam.FlatMap('*do*', lambda x: [x + 1])
       return pcoll | self.pardo
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e62249a1/sdks/python/apache_beam/transforms/sideinputs.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/sideinputs.py b/sdks/python/apache_beam/transforms/sideinputs.py
index 05ba6ab..46731bf 100644
--- a/sdks/python/apache_beam/transforms/sideinputs.py
+++ b/sdks/python/apache_beam/transforms/sideinputs.py
@@ -50,7 +50,7 @@ class CreatePCollectionView(PTransform):
     # typehints.View[...].
     return input_type
 
-  def apply(self, pcoll):
+  def expand(self, pcoll):
     return self.view
 
 
@@ -68,7 +68,7 @@ class ViewAsSingleton(PTransform):
     self.has_default = has_default
     self.default_value = default_value
 
-  def apply(self, pcoll):
+  def expand(self, pcoll):
     self._check_pcollection(pcoll)
     input_type = pcoll.element_type
     output_type = input_type
@@ -93,7 +93,7 @@ class ViewAsIterable(PTransform):
       label = 'ViewAsIterable(%s)' % label
     super(ViewAsIterable, self).__init__(label=label)
 
-  def apply(self, pcoll):
+  def expand(self, pcoll):
     self._check_pcollection(pcoll)
     input_type = pcoll.element_type
     output_type = typehints.Iterable[input_type]
@@ -118,7 +118,7 @@ class ViewAsList(PTransform):
       label = 'ViewAsList(%s)' % label
     super(ViewAsList, self).__init__(label=label)
 
-  def apply(self, pcoll):
+  def expand(self, pcoll):
     self._check_pcollection(pcoll)
     input_type = pcoll.element_type
     output_type = typehints.List[input_type]
@@ -144,7 +144,7 @@ class ViewAsDict(PTransform):
       label = 'ViewAsDict(%s)' % label
     super(ViewAsDict, self).__init__(label=label)
 
-  def apply(self, pcoll):
+  def expand(self, pcoll):
     self._check_pcollection(pcoll)
     input_type = pcoll.element_type
     key_type, value_type = (

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e62249a1/sdks/python/apache_beam/transforms/util.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/util.py b/sdks/python/apache_beam/transforms/util.py
index ebe6ba9..9815996 100644
--- a/sdks/python/apache_beam/transforms/util.py
+++ b/sdks/python/apache_beam/transforms/util.py
@@ -99,7 +99,7 @@ class CoGroupByKey(PTransform):
       pcolls = tuple(pvalueish)
       return pcolls, pcolls
 
-  def apply(self, pcolls):
+  def expand(self, pcolls):
     """Performs CoGroupByKey on argument pcolls; see class docstring."""
     # For associating values in K-V pairs with the PCollections they came from.
     def _pair_tag_with_value((key, value), tag):
@@ -222,7 +222,7 @@ def assert_that(actual, matcher, label='assert_that'):
 
   class AssertThat(PTransform):
 
-    def apply(self, pipeline):
+    def expand(self, pipeline):
       return pipeline | 'singleton' >> Create([None]) | Map(
           match,
           AsList(actual | core.WindowInto(window.GlobalWindows())))

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e62249a1/sdks/python/apache_beam/transforms/write_ptransform_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/write_ptransform_test.py b/sdks/python/apache_beam/transforms/write_ptransform_test.py
index e7cdbd4..9a1a7de 100644
--- a/sdks/python/apache_beam/transforms/write_ptransform_test.py
+++ b/sdks/python/apache_beam/transforms/write_ptransform_test.py
@@ -84,7 +84,7 @@ class WriteToTestSink(PTransform):
     self.last_sink = None
     self.label = 'write_to_test_sink'
 
-  def apply(self, pcoll):
+  def expand(self, pcoll):
     self.last_sink = _TestSink(return_init_result=self.return_init_result,
                                return_write_results=self.return_write_results)
     return pcoll | beam.io.Write(self.last_sink)

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e62249a1/sdks/python/apache_beam/typehints/typed_pipeline_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/typehints/typed_pipeline_test.py b/sdks/python/apache_beam/typehints/typed_pipeline_test.py
index f2e8f12..329d657 100644
--- a/sdks/python/apache_beam/typehints/typed_pipeline_test.py
+++ b/sdks/python/apache_beam/typehints/typed_pipeline_test.py
@@ -202,7 +202,7 @@ class CustomTransformTest(unittest.TestCase):
     def _extract_input_pvalues(self, pvalueish):
       return pvalueish, (pvalueish['in0'], pvalueish['in1'])
 
-    def apply(self, pvalueish):
+    def expand(self, pvalueish):
       return {'out0': pvalueish['in0'], 'out1': pvalueish['in1']}
 
     # TODO(robertwb): (typecheck) Make these the default?


[2/2] incubator-beam git commit: Closes #1634

Posted by ro...@apache.org.
Closes #1634


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/e2652787
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/e2652787
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/e2652787

Branch: refs/heads/python-sdk
Commit: e2652787355d4c322138f55ae2c54494ec592e59
Parents: d3c8874 e62249a
Author: Robert Bradshaw <ro...@google.com>
Authored: Thu Dec 15 16:52:39 2016 -0800
Committer: Robert Bradshaw <ro...@google.com>
Committed: Thu Dec 15 16:52:39 2016 -0800

----------------------------------------------------------------------
 sdks/python/README.md                           |  2 +-
 .../examples/complete/autocomplete.py           |  2 +-
 .../examples/complete/estimate_pi.py            |  2 +-
 .../apache_beam/examples/complete/tfidf.py      |  2 +-
 .../examples/complete/top_wikipedia_sessions.py |  6 ++---
 .../examples/cookbook/custom_ptransform.py      |  2 +-
 .../examples/cookbook/multiple_output_pardo.py  |  2 +-
 .../apache_beam/examples/snippets/snippets.py   | 16 ++++++-------
 .../examples/snippets/snippets_test.py          |  2 +-
 .../apache_beam/examples/wordcount_debugging.py |  2 +-
 sdks/python/apache_beam/io/avroio.py            |  4 ++--
 .../apache_beam/io/datastore/v1/datastoreio.py  |  4 ++--
 sdks/python/apache_beam/io/iobase.py            |  6 ++---
 sdks/python/apache_beam/io/textio.py            |  4 ++--
 sdks/python/apache_beam/pipeline_test.py        |  4 ++--
 .../runners/dataflow/native_io/iobase.py        |  2 +-
 .../apache_beam/runners/direct/direct_runner.py |  2 +-
 sdks/python/apache_beam/runners/runner.py       |  4 ++--
 sdks/python/apache_beam/transforms/combiners.py | 14 ++++++------
 .../apache_beam/transforms/combiners_test.py    |  2 +-
 sdks/python/apache_beam/transforms/core.py      | 24 ++++++++++----------
 .../python/apache_beam/transforms/ptransform.py | 10 ++++----
 .../apache_beam/transforms/ptransform_test.py   |  6 ++---
 .../python/apache_beam/transforms/sideinputs.py | 10 ++++----
 sdks/python/apache_beam/transforms/util.py      |  4 ++--
 .../transforms/write_ptransform_test.py         |  2 +-
 .../typehints/typed_pipeline_test.py            |  2 +-
 27 files changed, 71 insertions(+), 71 deletions(-)
----------------------------------------------------------------------