You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by pa...@apache.org on 2019/05/24 21:19:41 UTC
[beam] branch master updated: [BEAM-6693] ApproximateUnique
transforms for Python SDK (#8535)
This is an automated email from the ASF dual-hosted git repository.
pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 7a5252e [BEAM-6693] ApproximateUnique transforms for Python SDK (#8535)
7a5252e is described below
commit 7a5252e90faab1f6f7299f8f26b2461e50b2742b
Author: Hannah Jiang <Ha...@users.noreply.github.com>
AuthorDate: Fri May 24 14:19:19 2019 -0700
[BEAM-6693] ApproximateUnique transforms for Python SDK (#8535)
* [BEAM-6693]ApproximateUnique transforms for Python SDK
---
sdks/python/apache_beam/transforms/__init__.py | 1 +
sdks/python/apache_beam/transforms/stats.py | 237 ++++++++++++++
sdks/python/apache_beam/transforms/stats_test.py | 376 +++++++++++++++++++++++
sdks/python/setup.py | 1 +
4 files changed, 615 insertions(+)
diff --git a/sdks/python/apache_beam/transforms/__init__.py b/sdks/python/apache_beam/transforms/__init__.py
index 41cfcf6..ab35331 100644
--- a/sdks/python/apache_beam/transforms/__init__.py
+++ b/sdks/python/apache_beam/transforms/__init__.py
@@ -24,6 +24,7 @@ from apache_beam.transforms import combiners
from apache_beam.transforms.core import *
from apache_beam.transforms.external import *
from apache_beam.transforms.ptransform import *
+from apache_beam.transforms.stats import *
from apache_beam.transforms.timeutil import TimeDomain
from apache_beam.transforms.util import *
diff --git a/sdks/python/apache_beam/transforms/stats.py b/sdks/python/apache_beam/transforms/stats.py
new file mode 100644
index 0000000..59c1df5
--- /dev/null
+++ b/sdks/python/apache_beam/transforms/stats.py
@@ -0,0 +1,237 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""This module contains all statistics-related transforms."""
+
+from __future__ import absolute_import
+from __future__ import division
+
+import heapq
+import math
+import sys
+from builtins import round
+
+import mmh3
+
+from apache_beam import coders
+from apache_beam import typehints
+from apache_beam.transforms.core import *
+from apache_beam.transforms.ptransform import PTransform
+
+__all__ = [
+ 'ApproximateUnique',
+]
+
+# Type variables
+T = typehints.TypeVariable('T')
+K = typehints.TypeVariable('K')
+V = typehints.TypeVariable('V')
+
+
+class ApproximateUnique(object):
+ """
+ Hashes input elements and uses those to extrapolate the size of the entire
+ set of hash values by assuming the rest of the hash values are as densely
+ distributed as the sample space.
+ """
+
+ _NO_VALUE_ERR_MSG = 'Either size or error should be set. Received {}.'
+ _MULTI_VALUE_ERR_MSG = 'Either size or error should be set. ' \
+ 'Received {size = %s, error = %s}.'
+ _INPUT_SIZE_ERR_MSG = 'ApproximateUnique needs a size >= 16 for an error ' \
+ '<= 0.50. In general, the estimation error is about ' \
+ '2 / sqrt(sample_size). Received {size = %s}.'
+ _INPUT_ERROR_ERR_MSG = 'ApproximateUnique needs an estimation error ' \
+ 'between 0.01 and 0.50. Received {error = %s}.'
+
+ @staticmethod
+ def parse_input_params(size=None, error=None):
+ """
+ Check if input params are valid and return sample size.
+
+ :param size: an int not smaller than 16, which we would use to estimate
+ number of unique values.
+ :param error: max estimation error, which is a float between 0.01 and 0.50.
+ If error is given, sample size will be calculated from error with
+ _get_sample_size_from_est_error function.
+ :return: sample size
+ :raises:
+ ValueError: If both size and error are given, or neither is given, or
+ values are out of range.
+ """
+
+ if None not in (size, error):
+ raise ValueError(ApproximateUnique._MULTI_VALUE_ERR_MSG % (size, error))
+ elif size is None and error is None:
+ raise ValueError(ApproximateUnique._NO_VALUE_ERR_MSG)
+ elif size is not None:
+ if not isinstance(size, int) or size < 16:
+ raise ValueError(ApproximateUnique._INPUT_SIZE_ERR_MSG % (size))
+ else:
+ return size
+ else:
+ if error < 0.01 or error > 0.5:
+ raise ValueError(ApproximateUnique._INPUT_ERROR_ERR_MSG % (error))
+ else:
+ return ApproximateUnique._get_sample_size_from_est_error(error)
+
+ @staticmethod
+ def _get_sample_size_from_est_error(est_err):
+ """
+ :return: sample size
+
+ Calculate sample size from estimation error
+ """
+ # math.ceil returns a float in Python 2.7 but an int in Python 3.
+ return int(math.ceil(4.0 / math.pow(est_err, 2.0)))
+
+ @typehints.with_input_types(T)
+ @typehints.with_output_types(int)
+ class Globally(PTransform):
+ """ ApproximateUnique.Globally estimates the number of unique values."""
+
+ def __init__(self, size=None, error=None):
+ self._sample_size = ApproximateUnique.parse_input_params(size, error)
+
+ def expand(self, pcoll):
+ coder = coders.registry.get_coder(pcoll)
+ return pcoll \
+ | 'CountGlobalUniqueValues' \
+ >> (CombineGlobally(ApproximateUniqueCombineFn(self._sample_size,
+ coder)))
+
+ @typehints.with_input_types(typehints.KV[K, V])
+ @typehints.with_output_types(typehints.KV[K, int])
+ class PerKey(PTransform):
+ """ ApproximateUnique.PerKey estimates the unique-value count per key."""
+
+ def __init__(self, size=None, error=None):
+ self._sample_size = ApproximateUnique.parse_input_params(size, error)
+
+ def expand(self, pcoll):
+ coder = coders.registry.get_coder(pcoll)
+ return pcoll \
+ | 'CountPerKeyUniqueValues' \
+ >> (CombinePerKey(ApproximateUniqueCombineFn(self._sample_size,
+ coder)))
+
+
+class _LargestUnique(object):
+ """
+ An object to keep samples and calculate sample hash space. It is an
+ accumulator of a combine function.
+ """
+ _HASH_SPACE_SIZE = 2.0 * sys.maxsize
+
+ def __init__(self, sample_size):
+ self._sample_size = sample_size
+ self._min_hash = sys.maxsize
+ self._sample_heap = []
+ self._sample_set = set()
+
+ def add(self, element):
+ """
+ :param element: the hash value of an element from pcoll.
+ :return: boolean type whether the value is in the heap
+
+ Adds a value to the heap, returning whether the value is (large enough to
+ be) in the heap.
+ """
+ if len(self._sample_heap) >= self._sample_size and element < self._min_hash:
+ return False
+
+ if element not in self._sample_set:
+ self._sample_set.add(element)
+ heapq.heappush(self._sample_heap, element)
+
+ if len(self._sample_heap) > self._sample_size:
+ temp = heapq.heappop(self._sample_heap)
+ self._sample_set.remove(temp)
+ self._min_hash = self._sample_heap[0]
+ elif element < self._min_hash:
+ self._min_hash = element
+
+ return True
+
+ def get_estimate(self):
+ """
+ :return: estimation count of unique values
+
+ If heap size is smaller than sample size, just return heap size.
+ Otherwise, takes into account the possibility of hash collisions,
+ which become more likely than not for 2^32 distinct elements.
+ Note that log(1+x) ~ x for small x, so for sampleSize << maxHash
+ log(1 - sample_size/sample_space) / log(1 - 1/sample_space) ~ sample_size
+ and hence estimate ~ sample_size * hash_space / sample_space
+ as one would expect.
+
+ Given sample_size / sample_space = est / hash_space
+ est = sample_size * hash_space / sample_space
+
+ Given above sample_size approximate,
+ est = log1p(-sample_size/sample_space) / log1p(-1/sample_space)
+ * hash_space / sample_space
+ """
+
+ if len(self._sample_heap) < self._sample_size:
+ return len(self._sample_heap)
+ else:
+ sample_space_size = sys.maxsize - 1.0 * self._min_hash
+ est = (math.log1p(-self._sample_size / sample_space_size)
+ / math.log1p(-1 / sample_space_size)
+ * self._HASH_SPACE_SIZE
+ / sample_space_size)
+
+ return round(est)
+
+
+class ApproximateUniqueCombineFn(CombineFn):
+ """
+ ApproximateUniqueCombineFn computes an estimate of the number of
+ unique values that were combined.
+ """
+
+ def __init__(self, sample_size, coder):
+ self._sample_size = sample_size
+ self._coder = coder
+
+ def create_accumulator(self, *args, **kwargs):
+ return _LargestUnique(self._sample_size)
+
+ def add_input(self, accumulator, element, *args, **kwargs):
+ try: # hash the coder-encoded element; item [1] of hash64's result is kept
+ accumulator.add(mmh3.hash64(self._coder.encode(element))[1])
+ return accumulator
+ except Exception as e: # surface the cause in the message text
+ raise RuntimeError("Runtime exception: %s" % e)
+
+ # Created an issue https://issues.apache.org/jira/browse/BEAM-7285 to speed up
+ # the merge process.
+ def merge_accumulators(self, accumulators, *args, **kwargs):
+ merged_accumulator = self.create_accumulator()
+ for accumulator in accumulators:
+ for i in accumulator._sample_heap:
+ merged_accumulator.add(i)
+
+ return merged_accumulator
+
+ @staticmethod
+ def extract_output(accumulator):
+ return accumulator.get_estimate()
+
+ def display_data(self):
+ return {'sample_size': self._sample_size}
diff --git a/sdks/python/apache_beam/transforms/stats_test.py b/sdks/python/apache_beam/transforms/stats_test.py
new file mode 100644
index 0000000..0a6003a
--- /dev/null
+++ b/sdks/python/apache_beam/transforms/stats_test.py
@@ -0,0 +1,376 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from __future__ import absolute_import
+from __future__ import division
+
+import math
+import random
+import unittest
+from collections import defaultdict
+
+import numpy as np
+
+import apache_beam as beam
+from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import equal_to
+
+
+class ApproximateUniqueTest(unittest.TestCase):
+ """Unit tests for ApproximateUnique.Globally and ApproximateUnique.PerKey."""
+
+ def test_approximate_unique_global_by_invalid_size(self):
+ # test if the transformation throws an error as expected with an invalid
+ # small input size (< 16).
+ sample_size = 10
+ test_input = [random.randint(0, 1000) for _ in range(100)]
+
+ with self.assertRaises(ValueError) as e:
+ pipeline = TestPipeline()
+ _ = (pipeline
+ | 'create'
+ >> beam.Create(test_input)
+ | 'get_estimate'
+ >> beam.ApproximateUnique.Globally(size=sample_size))
+ pipeline.run()
+
+ expected_msg = beam.ApproximateUnique._INPUT_SIZE_ERR_MSG % (sample_size)
+
+ assert e.exception.args[0] == expected_msg
+
+ def test_approximate_unique_global_by_invalid_type_size(self):
+ # test if the transformation throws an error as expected with an invalid
+ # type of input size (not int).
+ sample_size = 100.0
+ test_input = [random.randint(0, 1000) for _ in range(100)]
+
+ with self.assertRaises(ValueError) as e:
+ pipeline = TestPipeline()
+ _ = (pipeline
+ | 'create' >> beam.Create(test_input)
+ | 'get_estimate'
+ >> beam.ApproximateUnique.Globally(size=sample_size))
+ pipeline.run()
+
+ expected_msg = beam.ApproximateUnique._INPUT_SIZE_ERR_MSG % (sample_size)
+
+ assert e.exception.args[0] == expected_msg
+
+ def test_approximate_unique_global_by_invalid_small_error(self):
+ # test if the transformation throws an error as expected with an invalid
+ # small input error (< 0.01).
+ est_err = 0.0
+ test_input = [random.randint(0, 1000) for _ in range(100)]
+
+ with self.assertRaises(ValueError) as e:
+ pipeline = TestPipeline()
+ _ = (pipeline
+ | 'create' >> beam.Create(test_input)
+ | 'get_estimate'
+ >> beam.ApproximateUnique.Globally(error=est_err))
+ pipeline.run()
+
+ expected_msg = beam.ApproximateUnique._INPUT_ERROR_ERR_MSG % (est_err)
+
+ assert e.exception.args[0] == expected_msg
+
+ def test_approximate_unique_global_by_invalid_big_error(self):
+ # test if the transformation throws an error as expected with an invalid
+ # big input error (> 0.50).
+ est_err = 0.6
+ test_input = [random.randint(0, 1000) for _ in range(100)]
+
+ with self.assertRaises(ValueError) as e:
+ pipeline = TestPipeline()
+ _ = (pipeline
+ | 'create' >> beam.Create(test_input)
+ | 'get_estimate'
+ >> beam.ApproximateUnique.Globally(error=est_err))
+ pipeline.run()
+
+ expected_msg = beam.ApproximateUnique._INPUT_ERROR_ERR_MSG % (est_err)
+
+ assert e.exception.args[0] == expected_msg
+
+ def test_approximate_unique_global_by_invalid_no_input(self):
+ # test if the transformation throws an error as expected with no input.
+ test_input = [random.randint(0, 1000) for _ in range(100)]
+
+ with self.assertRaises(ValueError) as e:
+ pipeline = TestPipeline()
+ _ = (pipeline
+ | 'create' >> beam.Create(test_input)
+ | 'get_estimate'
+ >> beam.ApproximateUnique.Globally())
+ pipeline.run()
+
+ expected_msg = beam.ApproximateUnique._NO_VALUE_ERR_MSG
+ assert e.exception.args[0] == expected_msg
+
+ def test_approximate_unique_global_by_invalid_both_input(self):
+ # test if the transformation throws an error as expected with multi input.
+ test_input = [random.randint(0, 1000) for _ in range(100)]
+ est_err = 0.2
+ sample_size = 30
+
+ with self.assertRaises(ValueError) as e:
+ pipeline = TestPipeline()
+ _ = (pipeline
+ | 'create' >> beam.Create(test_input)
+ | 'get_estimate'
+ >> beam.ApproximateUnique.Globally(size=sample_size, error=est_err))
+ pipeline.run()
+
+ expected_msg = beam.ApproximateUnique._MULTI_VALUE_ERR_MSG % (
+ sample_size, est_err)
+
+ assert e.exception.args[0] == expected_msg
+
+ def test_get_sample_size_from_est_error(self):
+ # test if get correct sample size from input error.
+ assert beam.ApproximateUnique._get_sample_size_from_est_error(0.5) == 16
+ assert beam.ApproximateUnique._get_sample_size_from_est_error(0.4) == 25
+ assert beam.ApproximateUnique._get_sample_size_from_est_error(0.2) == 100
+ assert beam.ApproximateUnique._get_sample_size_from_est_error(0.1) == 400
+ assert beam.ApproximateUnique._get_sample_size_from_est_error(0.05) == 1600
+ assert beam.ApproximateUnique._get_sample_size_from_est_error(0.01) == 40000
+
+ def test_approximate_unique_global_by_sample_size(self):
+ # test if estimation error with a given sample size is not greater than
+ # expected max error (sample size = 50% of population).
+ sample_size = 50
+ max_err = 2 / math.sqrt(sample_size)
+ test_input = [random.randint(0, 1000) for _ in range(100)]
+ actual_count = len(set(test_input))
+
+ pipeline = TestPipeline()
+ result = (pipeline
+ | 'create' >> beam.Create(test_input)
+ | 'get_estimate'
+ >> beam.ApproximateUnique.Globally(size=sample_size)
+ | 'compare'
+ >> beam.FlatMap(lambda x: [abs(x - actual_count) * 1.0
+ / actual_count <= max_err]))
+
+ assert_that(result, equal_to([True]),
+ label='assert:global_by_size')
+ pipeline.run()
+
+ def test_approximate_unique_global_by_sample_size_with_duplicates(self):
+ # test if estimation error with a given sample size is not greater than
+ # expected max error with duplicated input.
+ sample_size = 30
+ max_err = 2 / math.sqrt(sample_size)
+ test_input = [10] * 50 + [20] * 50
+ actual_count = len(set(test_input))
+
+ pipeline = TestPipeline()
+ result = (pipeline
+ | 'create' >> beam.Create(test_input)
+ | 'get_estimate'
+ >> beam.ApproximateUnique.Globally(size=sample_size)
+ | 'compare'
+ >> beam.FlatMap(lambda x: [abs(x - actual_count) * 1.0
+ / actual_count <= max_err]))
+
+ assert_that(result, equal_to([True]),
+ label='assert:global_by_size_with_duplicates')
+ pipeline.run()
+
+ def test_approximate_unique_global_by_sample_size_with_small_population(self):
+ # test if the estimate exactly equals the actual value when sample size is
+ # not smaller than population size (sample size > 100% of population).
+ sample_size = 31
+ test_input = [random.randint(0, 1000) for _ in range(30)]
+ actual_count = len(set(test_input))
+
+ pipeline = TestPipeline()
+ result = (pipeline
+ | 'create' >> beam.Create(test_input)
+ | 'get_estimate'
+ >> beam.ApproximateUnique.Globally(size=sample_size))
+
+ assert_that(result, equal_to([actual_count]),
+ label='assert:global_by_sample_size_with_small_population')
+ pipeline.run()
+
+ def test_approximate_unique_global_by_sample_size_with_big_population(self):
+ # test if estimation error is smaller than expected max error with a small
+ # sample and a big population (sample size = 1% of population).
+ sample_size = 100
+ max_err = 2 / math.sqrt(sample_size)
+ test_input = [random.randint(0, 1000) for _ in range(10000)]
+ actual_count = len(set(test_input))
+
+ pipeline = TestPipeline()
+ result = (pipeline
+ | 'create' >> beam.Create(test_input)
+ | 'get_estimate'
+ >> beam.ApproximateUnique.Globally(size=sample_size)
+ | 'compare'
+ >> beam.FlatMap(lambda x: [abs(x - actual_count) * 1.0
+ / actual_count <= max_err]))
+
+ assert_that(result, equal_to([True]),
+ label='assert:global_by_sample_size_with_big_population')
+ pipeline.run()
+
+ def test_approximate_unique_global_by_error(self):
+ # test if estimation error from input error is not greater than input error.
+ est_err = 0.3
+ test_input = [random.randint(0, 1000) for _ in range(100)]
+ actual_count = len(set(test_input))
+
+ pipeline = TestPipeline()
+ result = (pipeline
+ | 'create' >> beam.Create(test_input)
+ | 'get_estimate'
+ >> beam.ApproximateUnique.Globally(error=est_err)
+ | 'compare'
+ >> beam.FlatMap(lambda x: [abs(x - actual_count) * 1.0
+ / actual_count <= est_err]))
+
+ assert_that(result, equal_to([True]),
+ label='assert:global_by_error')
+ pipeline.run()
+
+ def test_approximate_unique_global_by_error_with_samll_population(self):
+ # test if estimation error from input error of a small dataset is not
+ # greater than input error. Sample size is always not smaller than 16, so
+ # when population size is smaller than 16, estimation should be exactly
+ # the same as the actual value.
+ est_err = 0.01
+ test_input = [random.randint(0, 1000) for _ in range(15)]
+ actual_count = len(set(test_input))
+
+ pipeline = TestPipeline()
+ result = (pipeline
+ | 'create' >> beam.Create(test_input)
+ | 'get_estimate'
+ >> beam.ApproximateUnique.Globally(error=est_err))
+
+ assert_that(result, equal_to([actual_count]),
+ label='assert:global_by_error_with_samll_population')
+ pipeline.run()
+
+ def test_approximate_unique_global_by_error_with_big_population(self):
+ # test if estimation error from input error is within expected range with
+ # a big population.
+ est_err = 0.2
+ test_input = [random.randint(0, 1000) for _ in range(10000)]
+ actual_count = len(set(test_input))
+
+ pipeline = TestPipeline()
+ result = (pipeline
+ | 'create' >> beam.Create(test_input)
+ | 'get_estimate'
+ >> beam.ApproximateUnique.Globally(error=est_err)
+ | 'compare'
+ >> beam.FlatMap(lambda x: [abs(x - actual_count) * 1.0
+ / actual_count <= est_err]))
+
+ assert_that(result, equal_to([True]),
+ label='assert:global_by_error_with_big_population')
+ pipeline.run()
+
+ def test_approximate_unique_perkey_by_size(self):
+ # test if est error per key from sample size is in an expected range.
+ sample_size = 50
+ max_err = 2 / math.sqrt(sample_size)
+ number_of_keys = 10
+ test_input = [(random.randint(1, number_of_keys), random.randint(0, 1000))
+ for _ in range(100)]
+ actual_count_dict = defaultdict(set)
+ for (x, y) in test_input:
+ actual_count_dict[x].add(y)
+
+ pipeline = TestPipeline()
+ result = (pipeline
+ | 'create' >> beam.Create(test_input)
+ | 'get_estimate'
+ >> beam.ApproximateUnique.PerKey(size=sample_size)
+ | 'compare'
+ >> beam.FlatMap(lambda x: [abs(x[1]
+ - len(actual_count_dict[x[0]]))
+ * 1.0 / len(actual_count_dict[x[0]])
+ <= max_err]))
+
+ assert_that(result, equal_to([True] * number_of_keys),
+ label='assert:perkey_by_size')
+ pipeline.run()
+
+ def test_approximate_unique_perkey_by_error(self):
+ # test if estimation error per key from input err is in the expected range.
+ est_err = 0.01
+ number_of_keys = 10
+ test_input = [(random.randint(1, number_of_keys), random.randint(0, 1000))
+ for _ in range(100)]
+ actual_count_dict = defaultdict(set)
+ for (x, y) in test_input:
+ actual_count_dict[x].add(y)
+
+ pipeline = TestPipeline()
+ result = (pipeline
+ | 'create' >> beam.Create(test_input)
+ | 'get_estimate'
+ >> beam.ApproximateUnique.PerKey(error=est_err)
+ | 'compare'
+ >> beam.FlatMap(lambda x: [abs(x[1]
+ - len(actual_count_dict[x[0]]))
+ * 1.0 / len(actual_count_dict[x[0]])
+ <= est_err]))
+
+ assert_that(result, equal_to([True] * number_of_keys),
+ label='assert:perkey_by_error')
+ pipeline.run()
+
+ def test_approximate_unique_globally_by_error_with_skewed_data(self):
+ # test if estimation error is within the expected range with skewed data.
+ est_err = 0.01
+
+ # generate skewed dataset
+ values = [i for i in range(200)]
+ probs = [1.0 / 200] * 200
+
+ for idx, _ in enumerate(probs):
+ if idx > 3 and idx < 20:
+ probs[idx] = probs[idx] * (1 + math.log(idx + 1))
+ if idx > 20 and idx < 40:
+ probs[idx] = probs[idx] * (1 + math.log((40 - idx) + 1))
+
+ probs = [p / sum(probs) for p in probs]
+ test_input = np.random.choice(values, 1000, p=probs)
+ actual_count = len(set(test_input))
+
+ pipeline = TestPipeline()
+ result = (pipeline
+ | 'create' >> beam.Create(test_input)
+ | 'get_estimate'
+ >> beam.ApproximateUnique.Globally(error=est_err)
+ | 'compare'
+ >> beam.FlatMap(lambda x: [abs(x - actual_count) * 1.0
+ / actual_count <= est_err]))
+
+ assert_that(result, equal_to([True]),
+ label='assert:globally_by_error_with_skewed_data')
+ pipeline.run()
+
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/sdks/python/setup.py b/sdks/python/setup.py
index 6ef9b7f..e1d18f6 100644
--- a/sdks/python/setup.py
+++ b/sdks/python/setup.py
@@ -125,6 +125,7 @@ REQUIRED_PACKAGES = [
'pyvcf>=0.6.8,<0.7.0; python_version < "3.0"',
'pyyaml>=3.12,<4.0.0',
'typing>=3.6.0,<3.7.0; python_version < "3.5.0"',
+ 'mmh3>=2.5.1,<2.5.2',
]
REQUIRED_TEST_PACKAGES = [