You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by pa...@apache.org on 2019/03/08 20:47:11 UTC

[beam] branch master updated: [BEAM-3208] Add Distinct PTransform to mirror Java SDK

This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new d96e913  [BEAM-3208] Add Distinct PTransform to mirror Java SDK
     new 3b839c8  Merge pull request #7918 from ttanay/beam-3208
d96e913 is described below

commit d96e9134c7213fadeefc1b27cc7116470560d887
Author: ttanay <tt...@gmail.com>
AuthorDate: Thu Feb 21 23:18:16 2019 +0530

    [BEAM-3208] Add Distinct PTransform to mirror Java SDK
    
    The Java SDK introduced a Distinct PTransform to replace the
    RemoveDuplicates PTransform, but the Python SDK still had only
    RemoveDuplicates. This commit adds a Distinct PTransform to the
    Python SDK that does the same thing as RemoveDuplicates.
    RemoveDuplicates is now a deprecated alias for Distinct.
---
 sdks/python/apache_beam/examples/complete/tfidf.py    |  4 ++--
 sdks/python/apache_beam/transforms/ptransform.py      |  3 ++-
 sdks/python/apache_beam/transforms/ptransform_test.py | 12 ++++++++++--
 sdks/python/apache_beam/transforms/util.py            | 15 ++++++++++++---
 4 files changed, 26 insertions(+), 8 deletions(-)

diff --git a/sdks/python/apache_beam/examples/complete/tfidf.py b/sdks/python/apache_beam/examples/complete/tfidf.py
index 3eeb898..4d99b98 100644
--- a/sdks/python/apache_beam/examples/complete/tfidf.py
+++ b/sdks/python/apache_beam/examples/complete/tfidf.py
@@ -63,7 +63,7 @@ class TfIdf(beam.PTransform):
     total_documents = (
         uri_to_content
         | 'GetUris 1' >> beam.Keys()
-        | 'GetUniqueUris' >> beam.RemoveDuplicates()
+        | 'GetUniqueUris' >> beam.Distinct()
         | 'CountUris' >> beam.combiners.Count.Globally())
 
     # Create a collection of pairs mapping a URI to each of the words
@@ -81,7 +81,7 @@ class TfIdf(beam.PTransform):
     # in which it appears.
     word_to_doc_count = (
         uri_to_words
-        | 'GetUniqueWordsPerDoc' >> beam.RemoveDuplicates()
+        | 'GetUniqueWordsPerDoc' >> beam.Distinct()
         | 'GetWords' >> beam.Values()
         | 'CountDocsPerWord' >> beam.combiners.Count.PerElement())
 
diff --git a/sdks/python/apache_beam/transforms/ptransform.py b/sdks/python/apache_beam/transforms/ptransform.py
index 9eed963..bfa1c52 100644
--- a/sdks/python/apache_beam/transforms/ptransform.py
+++ b/sdks/python/apache_beam/transforms/ptransform.py
@@ -47,6 +47,7 @@ from builtins import hex
 from builtins import object
 from builtins import zip
 from functools import reduce
+from functools import wraps
 
 from google.protobuf import message
 
@@ -858,7 +859,7 @@ def ptransform_fn(fn):
   (first argument if no label was specified and second argument otherwise).
   """
   # TODO(robertwb): Consider removing staticmethod to allow for self parameter.
-
+  @wraps(fn)
   def callable_ptransform_factory(*args, **kwargs):
     return _PTransformFnPTransform(fn, *args, **kwargs)
   return callable_ptransform_factory
diff --git a/sdks/python/apache_beam/transforms/ptransform_test.py b/sdks/python/apache_beam/transforms/ptransform_test.py
index 75a0102..7e9ebec 100644
--- a/sdks/python/apache_beam/transforms/ptransform_test.py
+++ b/sdks/python/apache_beam/transforms/ptransform_test.py
@@ -648,6 +648,14 @@ class PTransformTest(unittest.TestCase):
     assert_that(result, equal_to([(1, 7), (2, 1), (2, 3), (2, 5), (3, 6)]))
     pipeline.run()
 
+  def test_distinct(self):
+    pipeline = TestPipeline()
+    pcoll = pipeline | 'Start' >> beam.Create(
+        [6, 3, 1, 1, 9, 'pleat', 'pleat', 'kazoo', 'navel'])
+    result = pcoll.apply(beam.Distinct())
+    assert_that(result, equal_to([1, 3, 6, 9, 'pleat', 'kazoo', 'navel']))
+    pipeline.run()
+
   def test_remove_duplicates(self):
     pipeline = TestPipeline()
     pcoll = pipeline | 'Start' >> beam.Create(
@@ -742,7 +750,7 @@ def SamplePTransform(pcoll):
   """Sample transform using the @ptransform_fn decorator."""
   map_transform = 'ToPairs' >> beam.Map(lambda v: (v, None))
   combine_transform = 'Group' >> beam.CombinePerKey(lambda vs: None)
-  keys_transform = 'RemoveDuplicates' >> beam.Keys()
+  keys_transform = 'Distinct' >> beam.Keys()
   return pcoll | map_transform | combine_transform | keys_transform
 
 
@@ -807,7 +815,7 @@ class PTransformLabelsTest(unittest.TestCase):
     self.assertTrue('*Sample*' in pipeline.applied_labels)
     self.assertTrue('*Sample*/ToPairs' in pipeline.applied_labels)
     self.assertTrue('*Sample*/Group' in pipeline.applied_labels)
-    self.assertTrue('*Sample*/RemoveDuplicates' in pipeline.applied_labels)
+    self.assertTrue('*Sample*/Distinct' in pipeline.applied_labels)
 
   def test_combine_with_label(self):
     vals = [1, 2, 3, 4, 5, 6, 7]
diff --git a/sdks/python/apache_beam/transforms/util.py b/sdks/python/apache_beam/transforms/util.py
index d34a734..64dce9d 100644
--- a/sdks/python/apache_beam/transforms/util.py
+++ b/sdks/python/apache_beam/transforms/util.py
@@ -51,10 +51,12 @@ from apache_beam.transforms.window import NonMergingWindowFn
 from apache_beam.transforms.window import TimestampCombiner
 from apache_beam.transforms.window import TimestampedValue
 from apache_beam.utils import windowed_value
+from apache_beam.utils.annotations import deprecated
 
 __all__ = [
     'BatchElements',
     'CoGroupByKey',
+    'Distinct',
     'Keys',
     'KvSwap',
     'RemoveDuplicates',
@@ -193,12 +195,19 @@ def KvSwap(label='KvSwap'):  # pylint: disable=invalid-name
 
 
 @ptransform_fn
-def RemoveDuplicates(pcoll):  # pylint: disable=invalid-name
-  """Produces a PCollection containing the unique elements of a PCollection."""
+def Distinct(pcoll):  # pylint: disable=invalid-name
+  """Produces a PCollection containing distinct elements of a PCollection."""
   return (pcoll
           | 'ToPairs' >> Map(lambda v: (v, None))
           | 'Group' >> CombinePerKey(lambda vs: None)
-          | 'RemoveDuplicates' >> Keys())
+          | 'Distinct' >> Keys())
+
+
+@deprecated(since='2.12', current='Distinct')
+@ptransform_fn
+def RemoveDuplicates(pcoll):
+  """Produces a PCollection containing distinct elements of a PCollection."""
+  return pcoll | 'RemoveDuplicates' >> Distinct()
 
 
 class _BatchSizeEstimator(object):