You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by bo...@apache.org on 2020/03/31 16:37:03 UTC
[beam] branch master updated: [BEAM-9454] Add Deduplication
PTransform
This is an automated email from the ASF dual-hosted git repository.
boyuanz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 203efbd [BEAM-9454] Add Deduplication PTransform
new cf07234 Merge pull request #11060 from boyuanzz/dedup
203efbd is described below
commit 203efbdaadbc3576e60bf66eb2b19a18ed886e3f
Author: Boyuan Zhang <bo...@google.com>
AuthorDate: Thu Mar 5 14:27:05 2020 -0800
[BEAM-9454] Add Deduplication PTransform
---
sdks/python/apache_beam/transforms/deduplicate.py | 134 +++++++++++++++++
.../apache_beam/transforms/deduplicate_test.py | 167 +++++++++++++++++++++
2 files changed, 301 insertions(+)
diff --git a/sdks/python/apache_beam/transforms/deduplicate.py b/sdks/python/apache_beam/transforms/deduplicate.py
new file mode 100644
index 0000000..743412e
--- /dev/null
+++ b/sdks/python/apache_beam/transforms/deduplicate.py
@@ -0,0 +1,134 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# pytype: skip-file
+
+"""a collection of ptransforms for deduplicating elements."""
+
+from __future__ import absolute_import
+from __future__ import division
+
+import typing
+
+from apache_beam import typehints
+from apache_beam.coders.coders import BooleanCoder
+from apache_beam.transforms import core
+from apache_beam.transforms import ptransform
+from apache_beam.transforms import userstate
+from apache_beam.transforms.timeutil import TimeDomain
+from apache_beam.utils import timestamp
+
+# Names exported by this module.
+__all__ = [
+ 'Deduplicate',
+ 'DeduplicatePerKey',
+]
+
+# Generic key and value types used in the type hints of DeduplicatePerKey.
+K = typing.TypeVar('K')
+V = typing.TypeVar('V')
+
+
+@typehints.with_input_types(typing.Tuple[K, V])
+@typehints.with_output_types(typing.Tuple[K, V])
+class DeduplicatePerKey(ptransform.PTransform):
+  """Deduplicates <key, value> pairs over a time domain and threshold.
+
+  Values in different windows will NOT be considered duplicates of each other.
+  Deduplication is guaranteed with respect to the time domain and duration.
+
+  Time durations are required so as to avoid unbounded memory and/or storage
+  requirements within a runner, and care might need to be used to ensure that
+  the deduplication time limit is long enough to remove duplicates but short
+  enough to not cause performance problems within a runner. Each runner may
+  provide an optimized implementation of their choice using the deduplication
+  time domain and threshold specified.
+
+  Does not preserve any order the input PCollection might have had.
+
+  Args:
+    processing_time_duration: if not None, the amount added to
+      ``Timestamp.now()`` to schedule the processing-time timer that forgets
+      a key after its first occurrence was emitted.
+    event_time_duration: if not None, the amount added to the timestamp of
+      the first emitted occurrence of a key to schedule the event-time timer
+      that forgets the key.
+
+  Raises:
+    ValueError: if both durations are None.
+  """
+  def __init__(self, processing_time_duration=None, event_time_duration=None):
+    if processing_time_duration is None and event_time_duration is None:
+      # Adjacent string literals are joined verbatim, so the separating space
+      # must be part of the text itself.
+      raise ValueError(
+          'DeduplicatePerKey requires at least one of '
+          'processing_time_duration or event_time_duration.')
+    self.processing_time_duration = processing_time_duration
+    self.event_time_duration = event_time_duration
+
+  def _create_deduplicate_fn(self):
+    """Builds the stateful DoFn that drops already-seen keys."""
+    processing_timer_spec = userstate.TimerSpec(
+        'processing_timer', TimeDomain.REAL_TIME)
+    event_timer_spec = userstate.TimerSpec('event_timer', TimeDomain.WATERMARK)
+    state_spec = userstate.BagStateSpec('seen', BooleanCoder())
+    # Copy the durations into locals so the nested DoFn closes over plain
+    # values instead of referring back to this transform.
+    processing_time_duration = self.processing_time_duration
+    event_time_duration = self.event_time_duration
+
+    class DeduplicationFn(core.DoFn):
+      def process(
+          self,
+          kv,
+          ts=core.DoFn.TimestampParam,
+          seen_state=core.DoFn.StateParam(state_spec),
+          processing_timer=core.DoFn.TimerParam(processing_timer_spec),
+          event_timer=core.DoFn.TimerParam(event_timer_spec)):
+        # A True marker in the state means this key was already emitted.
+        if True in seen_state.read():
+          return
+
+        # First occurrence: schedule the timer(s) that will forget the key.
+        # When both durations are given, whichever timer fires first clears
+        # the marker.
+        if processing_time_duration is not None:
+          processing_timer.set(
+              timestamp.Timestamp.now() + processing_time_duration)
+        if event_time_duration is not None:
+          event_timer.set(ts + event_time_duration)
+        seen_state.add(True)
+        yield kv
+
+      @userstate.on_timer(processing_timer_spec)
+      def process_processing_timer(
+          self, seen_state=core.DoFn.StateParam(state_spec)):
+        # Forget the key so that its next occurrence is emitted again.
+        seen_state.clear()
+
+      @userstate.on_timer(event_timer_spec)
+      def process_event_timer(
+          self, seen_state=core.DoFn.StateParam(state_spec)):
+        # Forget the key so that its next occurrence is emitted again.
+        seen_state.clear()
+
+    return DeduplicationFn()
+
+  def expand(self, pcoll):
+    """Applies the deduplicating DoFn to the keyed input PCollection."""
+    return (
+        pcoll
+        | 'DeduplicateFn' >> core.ParDo(self._create_deduplicate_fn()))
+
+
+class Deduplicate(ptransform.PTransform):
+  """Deduplicates arbitrary values within a given time duration.
+
+  Similar to DeduplicatePerKey, but takes any value as input and uses the
+  value itself as the key to deduplicate on.
+
+  Args:
+    processing_time_duration: passed through to DeduplicatePerKey.
+    event_time_duration: passed through to DeduplicatePerKey.
+
+  Raises:
+    ValueError: if both durations are None.
+  """
+  def __init__(self, processing_time_duration=None, event_time_duration=None):
+    if processing_time_duration is None and event_time_duration is None:
+      # Adjacent string literals are joined verbatim, so the separating space
+      # must be part of the text itself.
+      raise ValueError(
+          'Deduplicate requires at least one of '
+          'processing_time_duration or event_time_duration.')
+    self.processing_time_duration = processing_time_duration
+    self.event_time_duration = event_time_duration
+
+  def expand(self, pcoll):
+    """Keys each element by itself, deduplicates per key, then unwraps."""
+    return (
+        pcoll
+        # The element becomes the key; the value slot is unused.
+        | 'Use Value as Key' >> core.Map(lambda x: (x, None))
+        | 'DeduplicatePerKey' >> DeduplicatePerKey(
+            processing_time_duration=self.processing_time_duration,
+            event_time_duration=self.event_time_duration)
+        | 'Output Value' >> core.Map(lambda kv: kv[0]))
diff --git a/sdks/python/apache_beam/transforms/deduplicate_test.py b/sdks/python/apache_beam/transforms/deduplicate_test.py
new file mode 100644
index 0000000..5bea895
--- /dev/null
+++ b/sdks/python/apache_beam/transforms/deduplicate_test.py
@@ -0,0 +1,167 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# pytype: skip-file
+
+"""Unit tests for deduplicate transform by using TestStream."""
+
+from __future__ import absolute_import
+
+import unittest
+
+from nose.plugins.attrib import attr
+
+import apache_beam as beam
+from apache_beam.coders import coders
+from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.testing.test_stream import TestStream
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import equal_to
+from apache_beam.testing.util import equal_to_per_window
+from apache_beam.transforms import deduplicate
+from apache_beam.transforms import window
+from apache_beam.utils.timestamp import Duration
+from apache_beam.utils.timestamp import Timestamp
+
+
+# TestStream only works in streaming pipelines, and the Deduplicate transform
+# additionally needs Timer support. The suite stays sickbayed until the
+# dataflow runner supports both TestStream and user timers.
+@attr('ValidatesRunner', 'sickbay-batch', 'sickbay-streaming')
+class DeduplicateTest(unittest.TestCase):
+  """Runs deduplicate.Deduplicate against TestStream inputs."""
+  def __init__(self, *args, **kwargs):
+    self.runner = None
+    self.options = None
+    super(DeduplicateTest, self).__init__(*args, **kwargs)
+
+  def set_runner(self, runner):
+    self.runner = runner
+
+  def set_options(self, options):
+    self.options = options
+
+  def create_pipeline(self):
+    # Forward only the settings that are set (truthy); everything else is
+    # left to TestPipeline's own defaults.
+    pipeline_kwargs = {}
+    if self.runner:
+      pipeline_kwargs['runner'] = self.runner
+    if self.options:
+      pipeline_kwargs['options'] = self.options
+    return TestPipeline(**pipeline_kwargs)
+
+  @staticmethod
+  def _timestamped(pairs):
+    # Turns (value, timestamp) pairs into TimestampedValue elements.
+    return [window.TimestampedValue(value, ts) for value, ts in pairs]
+
+  def test_deduplication_in_different_windows(self):
+    # k1..k3 occur once in [0, 30) and once in [30, 60); k4..k6 in [60, 90).
+    inputs = [
+        ('k1', 0), ('k2', 10), ('k3', 20),
+        ('k1', 30), ('k2', 40), ('k3', 50),
+        ('k4', 60), ('k5', 70), ('k6', 80),
+    ]
+    with self.create_pipeline() as p:
+      stream = TestStream(coder=coders.StrUtf8Coder())
+      stream = stream.advance_watermark_to(0)
+      stream = stream.add_elements(self._timestamped(inputs))
+      stream = stream.advance_watermark_to_infinity()
+
+      res = (
+          p
+          | stream
+          | beam.WindowInto(window.FixedWindows(30))
+          | deduplicate.Deduplicate(processing_time_duration=10 * 60)
+          | beam.Map(lambda e, ts=beam.DoFn.TimestampParam: (e, ts)))
+
+      # Deduplication should happen per window, and no key repeats inside a
+      # single window, so every input element is expected in the output.
+      stamped = [(key, Timestamp(ts)) for key, ts in inputs]
+      expect_unique_keys_per_window = {
+          window.IntervalWindow(0, 30): stamped[0:3],
+          window.IntervalWindow(30, 60): stamped[3:6],
+          window.IntervalWindow(60, 90): stamped[6:9],
+      }
+      assert_that(
+          res,
+          equal_to_per_window(expect_unique_keys_per_window),
+          use_global_window=False,
+          label='assert per window')
+
+  @unittest.skip('TestStream not yet supported')
+  def test_deduplication_with_event_time(self):
+    deduplicate_duration = 60
+    first_batch = [('k1', 0), ('k2', 20), ('k3', 30)]
+    second_batch = [('k1', 40), ('k2', 50), ('k3', 60)]
+    third_batch = [('k1', 70)]
+    with self.create_pipeline() as p:
+      stream = TestStream(coder=coders.StrUtf8Coder()).with_output_types(str)
+      stream = stream.advance_watermark_to(0)
+      stream = stream.add_elements(self._timestamped(first_batch))
+      stream = stream.advance_watermark_to(30)
+      stream = stream.add_elements(self._timestamped(second_batch))
+      stream = stream.advance_watermark_to(deduplicate_duration)
+      stream = stream.add_elements(self._timestamped(third_batch))
+      stream = stream.advance_watermark_to_infinity()
+
+      res = (
+          p
+          | stream
+          | deduplicate.Deduplicate(
+              event_time_duration=Duration(deduplicate_duration))
+          | beam.Map(lambda e, ts=beam.DoFn.TimestampParam: (e, ts)))
+
+      expected = [
+          ('k1', Timestamp(0)),
+          ('k2', Timestamp(20)),
+          ('k3', Timestamp(30)),
+          ('k1', Timestamp(70)),
+      ]
+      assert_that(res, equal_to(expected))
+
+  @unittest.skip('TestStream not yet supported')
+  def test_deduplication_with_processing_time(self):
+    deduplicate_duration = 60
+    first_batch = [('k1', 0), ('k2', 20), ('k3', 30)]
+    second_batch = [('k1', 40), ('k2', 50), ('k3', 60)]
+    third_batch = [('k1', 70)]
+    with self.create_pipeline() as p:
+      stream = TestStream(coder=coders.StrUtf8Coder()).with_output_types(str)
+      stream = stream.advance_watermark_to(0)
+      stream = stream.add_elements(self._timestamped(first_batch))
+      stream = stream.advance_processing_time(30)
+      stream = stream.add_elements(self._timestamped(second_batch))
+      stream = stream.advance_processing_time(deduplicate_duration)
+      stream = stream.add_elements(self._timestamped(third_batch))
+      stream = stream.advance_watermark_to_infinity()
+
+      res = (
+          p
+          | stream
+          | deduplicate.Deduplicate(
+              processing_time_duration=Duration(deduplicate_duration))
+          | beam.Map(lambda e, ts=beam.DoFn.TimestampParam: (e, ts)))
+
+      expected = [
+          ('k1', Timestamp(0)),
+          ('k2', Timestamp(20)),
+          ('k3', Timestamp(30)),
+          ('k1', Timestamp(70)),
+      ]
+      assert_that(res, equal_to(expected))
+
+
+# Run the test suite when this file is executed directly.
+if __name__ == '__main__':
+ unittest.main()