You are viewing a plain text version of this content. The canonical link for it is here (the hyperlink was not preserved in this text version).
Posted to commits@beam.apache.org by pa...@apache.org on 2019/03/08 20:47:11 UTC
[beam] branch master updated: [BEAM-3208] Add Distinct PTransform to mirror Java SDK
This is an automated email from the ASF dual-hosted git repository.
pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new d96e913 [BEAM-3208] Add Distinct PTransform to mirror Java SDK
new 3b839c8 Merge pull request #7918 from ttanay/beam-3208
d96e913 is described below
commit d96e9134c7213fadeefc1b27cc7116470560d887
Author: ttanay <tt...@gmail.com>
AuthorDate: Thu Feb 21 23:18:16 2019 +0530
[BEAM-3208] Add Distinct PTransform to mirror Java SDK
The Java SDK introduced a Distinct PTransform to replace the
RemoveDuplicates PTransform, but the Python SDK still had only
RemoveDuplicates. This commit adds a Distinct PTransform that does
the same thing as RemoveDuplicates. RemoveDuplicates is now
deprecated and simply delegates to Distinct. ptransform_fn now
applies functools.wraps, so decorated transforms keep the wrapped
function's name and docstring.
---
sdks/python/apache_beam/examples/complete/tfidf.py | 4 ++--
sdks/python/apache_beam/transforms/ptransform.py | 3 ++-
sdks/python/apache_beam/transforms/ptransform_test.py | 12 ++++++++++--
sdks/python/apache_beam/transforms/util.py | 15 ++++++++++++---
4 files changed, 26 insertions(+), 8 deletions(-)
diff --git a/sdks/python/apache_beam/examples/complete/tfidf.py b/sdks/python/apache_beam/examples/complete/tfidf.py
index 3eeb898..4d99b98 100644
--- a/sdks/python/apache_beam/examples/complete/tfidf.py
+++ b/sdks/python/apache_beam/examples/complete/tfidf.py
@@ -63,7 +63,7 @@ class TfIdf(beam.PTransform):
total_documents = (
uri_to_content
| 'GetUris 1' >> beam.Keys()
- | 'GetUniqueUris' >> beam.RemoveDuplicates()
+ | 'GetUniqueUris' >> beam.Distinct()
| 'CountUris' >> beam.combiners.Count.Globally())
# Create a collection of pairs mapping a URI to each of the words
@@ -81,7 +81,7 @@ class TfIdf(beam.PTransform):
# in which it appears.
word_to_doc_count = (
uri_to_words
- | 'GetUniqueWordsPerDoc' >> beam.RemoveDuplicates()
+ | 'GetUniqueWordsPerDoc' >> beam.Distinct()
| 'GetWords' >> beam.Values()
| 'CountDocsPerWord' >> beam.combiners.Count.PerElement())
diff --git a/sdks/python/apache_beam/transforms/ptransform.py b/sdks/python/apache_beam/transforms/ptransform.py
index 9eed963..bfa1c52 100644
--- a/sdks/python/apache_beam/transforms/ptransform.py
+++ b/sdks/python/apache_beam/transforms/ptransform.py
@@ -47,6 +47,7 @@ from builtins import hex
from builtins import object
from builtins import zip
from functools import reduce
+from functools import wraps
from google.protobuf import message
@@ -858,7 +859,7 @@ def ptransform_fn(fn):
(first argument if no label was specified and second argument otherwise).
"""
# TODO(robertwb): Consider removing staticmethod to allow for self parameter.
-
+ @wraps(fn)
def callable_ptransform_factory(*args, **kwargs):
return _PTransformFnPTransform(fn, *args, **kwargs)
return callable_ptransform_factory
diff --git a/sdks/python/apache_beam/transforms/ptransform_test.py b/sdks/python/apache_beam/transforms/ptransform_test.py
index 75a0102..7e9ebec 100644
--- a/sdks/python/apache_beam/transforms/ptransform_test.py
+++ b/sdks/python/apache_beam/transforms/ptransform_test.py
@@ -648,6 +648,14 @@ class PTransformTest(unittest.TestCase):
assert_that(result, equal_to([(1, 7), (2, 1), (2, 3), (2, 5), (3, 6)]))
pipeline.run()
+ def test_distinct(self):
+ pipeline = TestPipeline()
+ pcoll = pipeline | 'Start' >> beam.Create(
+ [6, 3, 1, 1, 9, 'pleat', 'pleat', 'kazoo', 'navel'])
+ result = pcoll.apply(beam.Distinct())
+ assert_that(result, equal_to([1, 3, 6, 9, 'pleat', 'kazoo', 'navel']))
+ pipeline.run()
+
def test_remove_duplicates(self):
pipeline = TestPipeline()
pcoll = pipeline | 'Start' >> beam.Create(
@@ -742,7 +750,7 @@ def SamplePTransform(pcoll):
"""Sample transform using the @ptransform_fn decorator."""
map_transform = 'ToPairs' >> beam.Map(lambda v: (v, None))
combine_transform = 'Group' >> beam.CombinePerKey(lambda vs: None)
- keys_transform = 'RemoveDuplicates' >> beam.Keys()
+ keys_transform = 'Distinct' >> beam.Keys()
return pcoll | map_transform | combine_transform | keys_transform
@@ -807,7 +815,7 @@ class PTransformLabelsTest(unittest.TestCase):
self.assertTrue('*Sample*' in pipeline.applied_labels)
self.assertTrue('*Sample*/ToPairs' in pipeline.applied_labels)
self.assertTrue('*Sample*/Group' in pipeline.applied_labels)
- self.assertTrue('*Sample*/RemoveDuplicates' in pipeline.applied_labels)
+ self.assertTrue('*Sample*/Distinct' in pipeline.applied_labels)
def test_combine_with_label(self):
vals = [1, 2, 3, 4, 5, 6, 7]
diff --git a/sdks/python/apache_beam/transforms/util.py b/sdks/python/apache_beam/transforms/util.py
index d34a734..64dce9d 100644
--- a/sdks/python/apache_beam/transforms/util.py
+++ b/sdks/python/apache_beam/transforms/util.py
@@ -51,10 +51,12 @@ from apache_beam.transforms.window import NonMergingWindowFn
from apache_beam.transforms.window import TimestampCombiner
from apache_beam.transforms.window import TimestampedValue
from apache_beam.utils import windowed_value
+from apache_beam.utils.annotations import deprecated
__all__ = [
'BatchElements',
'CoGroupByKey',
+ 'Distinct',
'Keys',
'KvSwap',
'RemoveDuplicates',
@@ -193,12 +195,19 @@ def KvSwap(label='KvSwap'): # pylint: disable=invalid-name
@ptransform_fn
-def RemoveDuplicates(pcoll): # pylint: disable=invalid-name
- """Produces a PCollection containing the unique elements of a PCollection."""
+def Distinct(pcoll): # pylint: disable=invalid-name
+ """Produces a PCollection containing distinct elements of a PCollection."""
return (pcoll
| 'ToPairs' >> Map(lambda v: (v, None))
| 'Group' >> CombinePerKey(lambda vs: None)
- | 'RemoveDuplicates' >> Keys())
+ | 'Distinct' >> Keys())
+
+
+@deprecated(since='2.12', current='Distinct')
+@ptransform_fn
+def RemoveDuplicates(pcoll):
+ """Produces a PCollection containing distinct elements of a PCollection."""
+ return pcoll | 'RemoveDuplicates' >> Distinct()
class _BatchSizeEstimator(object):