You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by bo...@apache.org on 2020/03/31 16:37:03 UTC

[beam] branch master updated: [BEAM-9454] Add Deduplication PTransform

This is an automated email from the ASF dual-hosted git repository.

boyuanz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 203efbd  [BEAM-9454] Add Deduplication PTransform
     new cf07234  Merge pull request #11060 from boyuanzz/dedup
203efbd is described below

commit 203efbdaadbc3576e60bf66eb2b19a18ed886e3f
Author: Boyuan Zhang <bo...@google.com>
AuthorDate: Thu Mar 5 14:27:05 2020 -0800

    [BEAM-9454] Add Deduplication PTransform
---
 sdks/python/apache_beam/transforms/deduplicate.py  | 134 +++++++++++++++++
 .../apache_beam/transforms/deduplicate_test.py     | 167 +++++++++++++++++++++
 2 files changed, 301 insertions(+)

diff --git a/sdks/python/apache_beam/transforms/deduplicate.py b/sdks/python/apache_beam/transforms/deduplicate.py
new file mode 100644
index 0000000..743412e
--- /dev/null
+++ b/sdks/python/apache_beam/transforms/deduplicate.py
@@ -0,0 +1,134 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# pytype: skip-file
+
+"""a collection of ptransforms for deduplicating elements."""
+
+from __future__ import absolute_import
+from __future__ import division
+
+import typing
+
+from apache_beam import typehints
+from apache_beam.coders.coders import BooleanCoder
+from apache_beam.transforms import core
+from apache_beam.transforms import ptransform
+from apache_beam.transforms import userstate
+from apache_beam.transforms.timeutil import TimeDomain
+from apache_beam.utils import timestamp
+
+__all__ = [
+    'Deduplicate',
+    'DeduplicatePerKey',
+]
+
+K = typing.TypeVar('K')
+V = typing.TypeVar('V')
+
+
+@typehints.with_input_types(typing.Tuple[K, V])
+@typehints.with_output_types(typing.Tuple[K, V])
+class DeduplicatePerKey(ptransform.PTransform):
+  """ A PTransform which deduplicates <key, value> pair over a time domain and
+  threshold. Values in different windows will NOT be considered duplicates of
+  each other. Deduplication is guaranteed with respect of time domain and
+  duration.
+
+  Time durations are required so as to avoid unbounded memory and/or storage
+  requirements within a runner and care might need to be used to ensure that the
+  deduplication time limit is long enough to remove duplicates but short enough
+  to not cause performance problems within a runner. Each runner may provide an
+  optimized implementation of their choice using the deduplication time domain
+  and threshold specified.
+
+  Does not preserve any order the input PCollection might have had.
+  """
+  def __init__(self, processing_time_duration=None, event_time_duration=None):
+    if processing_time_duration is None and event_time_duration is None:
+      raise ValueError(
+          'DeduplicatePerKey requires at lease provide either'
+          'processing_time_duration or event_time_duration.')
+    self.processing_time_duration = processing_time_duration
+    self.event_time_duration = event_time_duration
+
+  def _create_deduplicate_fn(self):
+    processing_timer_spec = userstate.TimerSpec(
+        'processing_timer', TimeDomain.REAL_TIME)
+    event_timer_spec = userstate.TimerSpec('event_timer', TimeDomain.WATERMARK)
+    state_spec = userstate.BagStateSpec('seen', BooleanCoder())
+    processing_time_duration = self.processing_time_duration
+    event_time_duration = self.event_time_duration
+
+    class DeduplicationFn(core.DoFn):
+      def process(
+          self,
+          kv,
+          ts=core.DoFn.TimestampParam,
+          seen_state=core.DoFn.StateParam(state_spec),
+          processing_timer=core.DoFn.TimerParam(processing_timer_spec),
+          event_timer=core.DoFn.TimerParam(event_timer_spec)):
+        if True in seen_state.read():
+          return
+
+        if processing_time_duration is not None:
+          processing_timer.set(
+              timestamp.Timestamp.now() + processing_time_duration)
+        if event_time_duration is not None:
+          event_timer.set(ts + event_time_duration)
+        seen_state.add(True)
+        yield kv
+
+      @userstate.on_timer(processing_timer_spec)
+      def process_processing_timer(
+          self, seen_state=core.DoFn.StateParam(state_spec)):
+        seen_state.clear()
+
+      @userstate.on_timer(event_timer_spec)
+      def process_event_timer(
+          self, seen_state=core.DoFn.StateParam(state_spec)):
+        seen_state.clear()
+
+    return DeduplicationFn()
+
+  def expand(self, pcoll):
+    return (
+        pcoll
+        | 'DeduplicateFn' >> core.ParDo(self._create_deduplicate_fn()))
+
+
+class Deduplicate(ptransform.PTransform):
+  """Similar to DeduplicatePerKey, the Deduplicate transform takes any arbitray
+  value as input and uses value as key to deduplicate among certain amount of
+  time duration.
+  """
+  def __init__(self, processing_time_duration=None, event_time_duration=None):
+    if processing_time_duration is None and event_time_duration is None:
+      raise ValueError(
+          'Deduplicate requires at lease provide either'
+          'processing_time_duration or event_time_duration.')
+    self.processing_time_duration = processing_time_duration
+    self.event_time_duration = event_time_duration
+
+  def expand(self, pcoll):
+    return (
+        pcoll
+        | 'Use Value as Key' >> core.Map(lambda x: (x, None))
+        | 'DeduplicatePerKey' >> DeduplicatePerKey(
+            processing_time_duration=self.processing_time_duration,
+            event_time_duration=self.event_time_duration)
+        | 'Output Value' >> core.Map(lambda kv: kv[0]))
diff --git a/sdks/python/apache_beam/transforms/deduplicate_test.py b/sdks/python/apache_beam/transforms/deduplicate_test.py
new file mode 100644
index 0000000..5bea895
--- /dev/null
+++ b/sdks/python/apache_beam/transforms/deduplicate_test.py
@@ -0,0 +1,167 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# pytype: skip-file
+
+"""Unit tests for deduplicate transform by using TestStream."""
+
+from __future__ import absolute_import
+
+import unittest
+
+from nose.plugins.attrib import attr
+
+import apache_beam as beam
+from apache_beam.coders import coders
+from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.testing.test_stream import TestStream
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import equal_to
+from apache_beam.testing.util import equal_to_per_window
+from apache_beam.transforms import deduplicate
+from apache_beam.transforms import window
+from apache_beam.utils.timestamp import Duration
+from apache_beam.utils.timestamp import Timestamp
+
+
+# TestStream is only supported in streaming pipelines, and the Deduplicate
+# transform also requires timer support. This test suite is sickbayed until the
+# Dataflow runner supports both TestStream and user timers.
+@attr('ValidatesRunner', 'sickbay-batch', 'sickbay-streaming')
+class DeduplicateTest(unittest.TestCase):
+  def __init__(self, *args, **kwargs):
+    self.runner = None  # Optional runner override; see set_runner.
+    self.options = None  # Optional pipeline options; see set_options.
+    super(DeduplicateTest, self).__init__(*args, **kwargs)
+
+  def set_runner(self, runner):
+    self.runner = runner
+
+  def set_options(self, options):
+    self.options = options
+
+  def create_pipeline(self):  # Forwards only the runner/options that are set.
+    if self.runner and self.options:
+      return TestPipeline(runner=self.runner, options=self.options)
+    elif self.runner:
+      return TestPipeline(runner=self.runner)
+    elif self.options:
+      return TestPipeline(options=self.options)
+    else:
+      return TestPipeline()
+
+  def test_deduplication_in_different_windows(self):
+    with self.create_pipeline() as p:
+      test_stream = (
+          TestStream(
+              coder=coders.StrUtf8Coder()).advance_watermark_to(0).add_elements(
+                  [
+                      window.TimestampedValue('k1', 0),
+                      window.TimestampedValue('k2', 10),
+                      window.TimestampedValue('k3', 20),
+                      window.TimestampedValue('k1', 30),
+                      window.TimestampedValue('k2', 40),
+                      window.TimestampedValue('k3', 50),
+                      window.TimestampedValue('k4', 60),
+                      window.TimestampedValue('k5', 70),
+                      window.TimestampedValue('k6', 80)
+                  ]).advance_watermark_to_infinity())
+
+      res = (
+          p
+          | test_stream
+          | beam.WindowInto(window.FixedWindows(30))  # windows of size 30
+          | deduplicate.Deduplicate(processing_time_duration=10 * 60)
+          | beam.Map(lambda e, ts=beam.DoFn.TimestampParam: (e, ts)))
+      # Deduplication happens per window, so k1..k3 are expected again at 30-50.
+      expect_unique_keys_per_window = {
+          window.IntervalWindow(0, 30): [('k1', Timestamp(0)),
+                                         ('k2', Timestamp(10)),
+                                         ('k3', Timestamp(20))],
+          window.IntervalWindow(30, 60): [('k1', Timestamp(30)),
+                                          ('k2', Timestamp(40)),
+                                          ('k3', Timestamp(50))],
+          window.IntervalWindow(60, 90): [('k4', Timestamp(60)),
+                                          ('k5', Timestamp(70)),
+                                          ('k6', Timestamp(80))],
+      }
+      assert_that(
+          res,
+          equal_to_per_window(expect_unique_keys_per_window),
+          use_global_window=False,
+          label='assert per window')
+
+  @unittest.skip('TestStream not yet supported')
+  def test_deduplication_with_event_time(self):
+    deduplicate_duration = 60
+    with self.create_pipeline() as p:
+      test_stream = (
+          TestStream(coder=coders.StrUtf8Coder()).with_output_types(
+              str).advance_watermark_to(0).add_elements([
+                  window.TimestampedValue('k1', 0),
+                  window.TimestampedValue('k2', 20),
+                  window.TimestampedValue('k3', 30)
+              ]).advance_watermark_to(30).add_elements([
+                  window.TimestampedValue('k1', 40),
+                  window.TimestampedValue('k2', 50),
+                  window.TimestampedValue('k3', 60)
+              ]).advance_watermark_to(deduplicate_duration).add_elements([
+                  window.TimestampedValue('k1', 70)  # expected in output again
+              ]).advance_watermark_to_infinity())
+      res = (
+          p
+          | test_stream
+          | deduplicate.Deduplicate(
+              event_time_duration=Duration(deduplicate_duration))
+          | beam.Map(lambda e, ts=beam.DoFn.TimestampParam: (e, ts)))
+
+      assert_that(
+          res,
+          equal_to([('k1', Timestamp(0)), ('k2', Timestamp(20)),
+                    ('k3', Timestamp(30)), ('k1', Timestamp(70))]))
+
+  @unittest.skip('TestStream not yet supported')
+  def test_deduplication_with_processing_time(self):
+    deduplicate_duration = 60
+    with self.create_pipeline() as p:
+      test_stream = (
+          TestStream(coder=coders.StrUtf8Coder()).with_output_types(
+              str).advance_watermark_to(0).add_elements([
+                  window.TimestampedValue('k1', 0),
+                  window.TimestampedValue('k2', 20),
+                  window.TimestampedValue('k3', 30)
+              ]).advance_processing_time(30).add_elements([
+                  window.TimestampedValue('k1', 40),
+                  window.TimestampedValue('k2', 50),
+                  window.TimestampedValue('k3', 60)
+              ]).advance_processing_time(deduplicate_duration).add_elements([
+                  window.TimestampedValue('k1', 70)  # expected in output again
+              ]).advance_watermark_to_infinity())
+      res = (
+          p
+          | test_stream
+          | deduplicate.Deduplicate(
+              processing_time_duration=Duration(deduplicate_duration))
+          | beam.Map(lambda e, ts=beam.DoFn.TimestampParam: (e, ts)))
+      assert_that(
+          res,
+          equal_to([('k1', Timestamp(0)), ('k2', Timestamp(20)),
+                    ('k3', Timestamp(30)), ('k1', Timestamp(70))]))
+
+
+if __name__ == '__main__':
+  unittest.main()