You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by pa...@apache.org on 2019/05/24 21:19:41 UTC

[beam] branch master updated: [BEAM-6693] ApproximateUnique transforms for Python SDK (#8535)

This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 7a5252e  [BEAM-6693] ApproximateUnique transforms for Python SDK (#8535)
7a5252e is described below

commit 7a5252e90faab1f6f7299f8f26b2461e50b2742b
Author: Hannah Jiang <Ha...@users.noreply.github.com>
AuthorDate: Fri May 24 14:19:19 2019 -0700

    [BEAM-6693] ApproximateUnique transforms for Python SDK (#8535)
    
    * [BEAM-6693]ApproximateUnique transforms for Python SDK
---
 sdks/python/apache_beam/transforms/__init__.py   |   1 +
 sdks/python/apache_beam/transforms/stats.py      | 237 ++++++++++++++
 sdks/python/apache_beam/transforms/stats_test.py | 376 +++++++++++++++++++++++
 sdks/python/setup.py                             |   1 +
 4 files changed, 615 insertions(+)

diff --git a/sdks/python/apache_beam/transforms/__init__.py b/sdks/python/apache_beam/transforms/__init__.py
index 41cfcf6..ab35331 100644
--- a/sdks/python/apache_beam/transforms/__init__.py
+++ b/sdks/python/apache_beam/transforms/__init__.py
@@ -24,6 +24,7 @@ from apache_beam.transforms import combiners
 from apache_beam.transforms.core import *
 from apache_beam.transforms.external import *
 from apache_beam.transforms.ptransform import *
+from apache_beam.transforms.stats import *
 from apache_beam.transforms.timeutil import TimeDomain
 from apache_beam.transforms.util import *
 
diff --git a/sdks/python/apache_beam/transforms/stats.py b/sdks/python/apache_beam/transforms/stats.py
new file mode 100644
index 0000000..59c1df5
--- /dev/null
+++ b/sdks/python/apache_beam/transforms/stats.py
@@ -0,0 +1,237 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""This module has all statistic related transforms."""
+
+from __future__ import absolute_import
+from __future__ import division
+
+import heapq
+import math
+import sys
+from builtins import round
+
+import mmh3
+
+from apache_beam import coders
+from apache_beam import typehints
+from apache_beam.transforms.core import *
+from apache_beam.transforms.ptransform import PTransform
+
+__all__ = [
+    'ApproximateUnique',
+]
+
+# Type variables
+T = typehints.TypeVariable('T')
+K = typehints.TypeVariable('K')
+V = typehints.TypeVariable('V')
+
+
+class ApproximateUnique(object):
+  """
+  Hashes input elements and uses those to extrapolate the size of the entire
+  set of hash values by assuming the rest of the hash values are as densely
+  distributed as the sample space.
+  """
+
+  _NO_VALUE_ERR_MSG = 'Either size or error should be set. Received {}.'
+  _MULTI_VALUE_ERR_MSG = 'Either size or error should be set. ' \
+                         'Received {size = %s, error = %s}.'
+  _INPUT_SIZE_ERR_MSG = 'ApproximateUnique needs a size >= 16 for an error ' \
+                        '<= 0.50. In general, the estimation error is about ' \
+                        '2 / sqrt(sample_size). Received {size = %s}.'
+  _INPUT_ERROR_ERR_MSG = 'ApproximateUnique needs an estimation error ' \
+                         'between 0.01 and 0.50. Received {error = %s}.'
+
+  @staticmethod
+  def parse_input_params(size=None, error=None):
+    """
+    Check if input params are valid and return sample size.
+
+    :param size: an int not smaller than 16, which we would use to estimate
+      number of unique values.
+    :param error: max estimation error, which is a float between 0.01 and 0.50.
+      If error is given, sample size will be calculated from error with
+      _get_sample_size_from_est_error function.
+    :return: sample size
+    :raises:
+      ValueError: If both size and error are given, or neither is given, or
+      values are out of range.
+    """
+
+    if None not in (size, error):
+      raise ValueError(ApproximateUnique._MULTI_VALUE_ERR_MSG % (size, error))
+    elif size is None and error is None:
+      raise ValueError(ApproximateUnique._NO_VALUE_ERR_MSG)
+    elif size is not None:
+      if not isinstance(size, int) or size < 16:
+        raise ValueError(ApproximateUnique._INPUT_SIZE_ERR_MSG % (size))
+      else:
+        return size
+    else:
+      if error < 0.01 or error > 0.5:
+        raise ValueError(ApproximateUnique._INPUT_ERROR_ERR_MSG % (error))
+      else:
+        return ApproximateUnique._get_sample_size_from_est_error(error)
+
+  @staticmethod
+  def _get_sample_size_from_est_error(est_err):
+    """
+    :return: sample size
+
+    Calculate sample size from estimation error
+    """
+    #math.ceil in python2.7 returns a float, while it returns an int in python3.
+    return int(math.ceil(4.0 / math.pow(est_err, 2.0)))
+
+  @typehints.with_input_types(T)
+  @typehints.with_output_types(int)
+  class Globally(PTransform):
+    """ Approximate.Globally approximate number of unique values"""
+
+    def __init__(self, size=None, error=None):
+      self._sample_size = ApproximateUnique.parse_input_params(size, error)
+
+    def expand(self, pcoll):
+      coder = coders.registry.get_coder(pcoll)
+      return pcoll \
+             | 'CountGlobalUniqueValues' \
+             >> (CombineGlobally(ApproximateUniqueCombineFn(self._sample_size,
+                                                            coder)))
+
+  @typehints.with_input_types(typehints.KV[K, V])
+  @typehints.with_output_types(typehints.KV[K, int])
+  class PerKey(PTransform):
+    """ Approximate.PerKey approximate number of unique values per key"""
+
+    def __init__(self, size=None, error=None):
+      self._sample_size = ApproximateUnique.parse_input_params(size, error)
+
+    def expand(self, pcoll):
+      coder = coders.registry.get_coder(pcoll)
+      return pcoll \
+             | 'CountPerKeyUniqueValues' \
+             >> (CombinePerKey(ApproximateUniqueCombineFn(self._sample_size,
+                                                          coder)))
+
+
+class _LargestUnique(object):
+  """
+  An object to keep samples and calculate sample hash space. It is an
+  accumulator of a combine function.
+  """
+  _HASH_SPACE_SIZE = 2.0 * sys.maxsize
+
+  def __init__(self, sample_size):
+    self._sample_size = sample_size
+    self._min_hash = sys.maxsize
+    self._sample_heap = []
+    self._sample_set = set()
+
+  def add(self, element):
+    """
+    :param an element from pcoll.
+    :return: boolean type whether the value is in the heap
+
+    Adds a value to the heap, returning whether the value is (large enough to
+    be) in the heap.
+    """
+    if len(self._sample_heap) >= self._sample_size and element < self._min_hash:
+      return False
+
+    if element not in self._sample_set:
+      self._sample_set.add(element)
+      heapq.heappush(self._sample_heap, element)
+
+      if len(self._sample_heap) > self._sample_size:
+        temp = heapq.heappop(self._sample_heap)
+        self._sample_set.remove(temp)
+        self._min_hash = self._sample_heap[0]
+      elif element < self._min_hash:
+        self._min_hash = element
+
+    return True
+
+  def get_estimate(self):
+    """
+    :return: estimation count of unique values
+
+    If heap size is smaller than sample size, just return heap size.
+    Otherwise, takes into account the possibility of hash collisions,
+    which become more likely than not for 2^32 distinct elements.
+    Note that log(1+x) ~ x for small x, so for sampleSize << maxHash
+    log(1 - sample_size/sample_space) / log(1 - 1/sample_space) ~ sample_size
+    and hence estimate ~ sample_size * hash_space / sample_space
+    as one would expect.
+
+    Given sample_size / sample_space = est / hash_space
+    est = sample_size * hash_space / sample_space
+
+    Given above sample_size approximate,
+    est = log1p(-sample_size/sample_space) / log1p(-1/sample_space)
+      * hash_space / sample_space
+    """
+
+    if len(self._sample_heap) < self._sample_size:
+      return len(self._sample_heap)
+    else:
+      sample_space_size = sys.maxsize - 1.0 * self._min_hash
+      est = (math.log1p(-self._sample_size / sample_space_size)
+             / math.log1p(-1 / sample_space_size)
+             * self._HASH_SPACE_SIZE
+             / sample_space_size)
+
+      return round(est)
+
+
+class ApproximateUniqueCombineFn(CombineFn):
+  """
+  ApproximateUniqueCombineFn computes an estimate of the number of
+  unique values that were combined.
+  """
+
+  def __init__(self, sample_size, coder):
+    self._sample_size = sample_size
+    self._coder = coder
+
+  def create_accumulator(self, *args, **kwargs):
+    return _LargestUnique(self._sample_size)
+
+  def add_input(self, accumulator, element, *args, **kwargs):
+    try:
+      accumulator.add(mmh3.hash64(self._coder.encode(element))[1])
+      return accumulator
+    except Exception as e:
+      raise RuntimeError("Runtime exception: %s", e)
+
+  # created an issue https://issues.apache.org/jira/browse/BEAM-7285 to speep up
+  # merge process.
+  def merge_accumulators(self, accumulators, *args, **kwargs):
+    merged_accumulator = self.create_accumulator()
+    for accumulator in accumulators:
+      for i in accumulator._sample_heap:
+        merged_accumulator.add(i)
+
+    return merged_accumulator
+
+  @staticmethod
+  def extract_output(accumulator):
+    return accumulator.get_estimate()
+
+  def display_data(self):
+    return {'sample_size': self._sample_size}
diff --git a/sdks/python/apache_beam/transforms/stats_test.py b/sdks/python/apache_beam/transforms/stats_test.py
new file mode 100644
index 0000000..0a6003a
--- /dev/null
+++ b/sdks/python/apache_beam/transforms/stats_test.py
@@ -0,0 +1,376 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from __future__ import absolute_import
+from __future__ import division
+
+import math
+import random
+import unittest
+from collections import defaultdict
+
+import numpy as np
+
+import apache_beam as beam
+from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import equal_to
+
+
+class ApproximateUniqueTest(unittest.TestCase):
+  """Unit tests for ApproximateUnique.Globally and ApproximateUnique.PerKey."""
+
+  def test_approximate_unique_global_by_invalid_size(self):
+    # test if the transformation throws an error as expected with an invalid
+    # small input size (< 16).
+    sample_size = 10
+    test_input = [random.randint(0, 1000) for _ in range(100)]
+
+    with self.assertRaises(ValueError) as e:
+      pipeline = TestPipeline()
+      _ = (pipeline
+           | 'create'
+           >> beam.Create(test_input)
+           | 'get_estimate'
+           >> beam.ApproximateUnique.Globally(size=sample_size))
+      pipeline.run()
+
+    expected_msg = beam.ApproximateUnique._INPUT_SIZE_ERR_MSG % (sample_size)
+
+    assert e.exception.args[0] == expected_msg
+
+  def test_approximate_unique_global_by_invalid_type_size(self):
+    # test if the transformation throws an error as expected with an invalid
+    # type of input size (not int).
+    sample_size = 100.0
+    test_input = [random.randint(0, 1000) for _ in range(100)]
+
+    with self.assertRaises(ValueError) as e:
+      pipeline = TestPipeline()
+      _ = (pipeline
+           | 'create' >> beam.Create(test_input)
+           | 'get_estimate'
+           >> beam.ApproximateUnique.Globally(size=sample_size))
+      pipeline.run()
+
+    expected_msg = beam.ApproximateUnique._INPUT_SIZE_ERR_MSG % (sample_size)
+
+    assert e.exception.args[0] == expected_msg
+
+  def test_approximate_unique_global_by_invalid_small_error(self):
+    # test if the transformation throws an error as expected with an invalid
+    # small input error (< 0.01).
+    est_err = 0.0
+    test_input = [random.randint(0, 1000) for _ in range(100)]
+
+    with self.assertRaises(ValueError) as e:
+      pipeline = TestPipeline()
+      _ = (pipeline
+           | 'create' >> beam.Create(test_input)
+           | 'get_estimate'
+           >> beam.ApproximateUnique.Globally(error=est_err))
+      pipeline.run()
+
+    expected_msg = beam.ApproximateUnique._INPUT_ERROR_ERR_MSG % (est_err)
+
+    assert e.exception.args[0] == expected_msg
+
+  def test_approximate_unique_global_by_invalid_big_error(self):
+    # test if the transformation throws an error as expected with an invalid
+    # big input error (> 0.50).
+    est_err = 0.6
+    test_input = [random.randint(0, 1000) for _ in range(100)]
+
+    with self.assertRaises(ValueError) as e:
+      pipeline = TestPipeline()
+      _ = (pipeline
+           | 'create' >> beam.Create(test_input)
+           | 'get_estimate'
+           >> beam.ApproximateUnique.Globally(error=est_err))
+      pipeline.run()
+
+    expected_msg = beam.ApproximateUnique._INPUT_ERROR_ERR_MSG % (est_err)
+
+    assert e.exception.args[0] == expected_msg
+
+  def test_approximate_unique_global_by_invalid_no_input(self):
+    # test if the transformation throws an error as expected with no input.
+    test_input = [random.randint(0, 1000) for _ in range(100)]
+
+    with self.assertRaises(ValueError) as e:
+      pipeline = TestPipeline()
+      _ = (pipeline
+           | 'create' >> beam.Create(test_input)
+           | 'get_estimate'
+           >> beam.ApproximateUnique.Globally())
+      pipeline.run()
+
+    expected_msg = beam.ApproximateUnique._NO_VALUE_ERR_MSG
+    assert e.exception.args[0] == expected_msg
+
+  def test_approximate_unique_global_by_invalid_both_input(self):
+    # test if the transformation throws an error as expected with multi input.
+    test_input = [random.randint(0, 1000) for _ in range(100)]
+    est_err = 0.2
+    sample_size = 30
+
+    with self.assertRaises(ValueError) as e:
+      pipeline = TestPipeline()
+      _ = (pipeline
+           | 'create' >> beam.Create(test_input)
+           | 'get_estimate'
+           >> beam.ApproximateUnique.Globally(size=sample_size, error=est_err))
+      pipeline.run()
+
+    expected_msg = beam.ApproximateUnique._MULTI_VALUE_ERR_MSG % (
+        sample_size, est_err)
+
+    assert e.exception.args[0] == expected_msg
+
+  def test_get_sample_size_from_est_error(self):
+    # test if get correct sample size from input error.
+    assert beam.ApproximateUnique._get_sample_size_from_est_error(0.5) == 16
+    assert beam.ApproximateUnique._get_sample_size_from_est_error(0.4) == 25
+    assert beam.ApproximateUnique._get_sample_size_from_est_error(0.2) == 100
+    assert beam.ApproximateUnique._get_sample_size_from_est_error(0.1) == 400
+    assert beam.ApproximateUnique._get_sample_size_from_est_error(0.05) == 1600
+    assert beam.ApproximateUnique._get_sample_size_from_est_error(0.01) == 40000
+
+  def test_approximate_unique_global_by_sample_size(self):
+    # test if estimation error with a given sample size is not greater than
+    # expected max error (sample size = 50% of population).
+    sample_size = 50
+    max_err = 2 / math.sqrt(sample_size)
+    test_input = [random.randint(0, 1000) for _ in range(100)]
+    actual_count = len(set(test_input))
+
+    pipeline = TestPipeline()
+    result = (pipeline
+              | 'create' >> beam.Create(test_input)
+              | 'get_estimate'
+              >> beam.ApproximateUnique.Globally(size=sample_size)
+              | 'compare'
+              >> beam.FlatMap(lambda x: [abs(x - actual_count) * 1.0
+                                         / actual_count <= max_err]))
+
+    assert_that(result, equal_to([True]),
+                label='assert:global_by_size')
+    pipeline.run()
+
+  def test_approximate_unique_global_by_sample_size_with_duplicates(self):
+    # test if estimation error with a given sample size is not greater than
+    # expected max error with duplicated input.
+    sample_size = 30
+    max_err = 2 / math.sqrt(sample_size)
+    test_input = [10] * 50 + [20] * 50
+    actual_count = len(set(test_input))
+
+    pipeline = TestPipeline()
+    result = (pipeline
+              | 'create' >> beam.Create(test_input)
+              | 'get_estimate'
+              >> beam.ApproximateUnique.Globally(size=sample_size)
+              | 'compare'
+              >> beam.FlatMap(lambda x: [abs(x - actual_count) * 1.0
+                                         / actual_count <= max_err]))
+
+    assert_that(result, equal_to([True]),
+                label='assert:global_by_size_with_duplicates')
+    pipeline.run()
+
+  def test_approximate_unique_global_by_sample_size_with_small_population(self):
+    # test if estimation is exactly same to actual value when sample size is
+    # not smaller than population size (sample size > 100% of population).
+    sample_size = 31
+    test_input = [random.randint(0, 1000) for _ in range(30)]
+    actual_count = len(set(test_input))
+
+    pipeline = TestPipeline()
+    result = (pipeline
+              | 'create' >> beam.Create(test_input)
+              | 'get_estimate'
+              >> beam.ApproximateUnique.Globally(size=sample_size))
+
+    assert_that(result, equal_to([actual_count]),
+                label='assert:global_by_sample_size_with_small_population')
+    pipeline.run()
+
+  def test_approximate_unique_global_by_sample_size_with_big_population(self):
+    # test if estimation error is smaller than expected max error with a small
+    # sample and a big population (sample size = 1% of population).
+    sample_size = 100
+    max_err = 2 / math.sqrt(sample_size)
+    test_input = [random.randint(0, 1000) for _ in range(10000)]
+    actual_count = len(set(test_input))
+
+    pipeline = TestPipeline()
+    result = (pipeline
+              | 'create' >> beam.Create(test_input)
+              | 'get_estimate'
+              >> beam.ApproximateUnique.Globally(size=sample_size)
+              | 'compare'
+              >> beam.FlatMap(lambda x: [abs(x - actual_count) * 1.0
+                                         / actual_count <= max_err]))
+
+    assert_that(result, equal_to([True]),
+                label='assert:global_by_sample_size_with_big_population')
+    pipeline.run()
+
+  def test_approximate_unique_global_by_error(self):
+    # test if estimation error from input error is not greater than input error.
+    est_err = 0.3
+    test_input = [random.randint(0, 1000) for _ in range(100)]
+    actual_count = len(set(test_input))
+
+    pipeline = TestPipeline()
+    result = (pipeline
+              | 'create' >> beam.Create(test_input)
+              | 'get_estimate'
+              >> beam.ApproximateUnique.Globally(error=est_err)
+              | 'compare'
+              >> beam.FlatMap(lambda x: [abs(x - actual_count) * 1.0
+                                         / actual_count <= est_err]))
+
+    assert_that(result, equal_to([True]),
+                label='assert:global_by_error')
+    pipeline.run()
+
+  def test_approximate_unique_global_by_error_with_samll_population(self):
+    # test if estimation error from input error of a small dataset is not
+    # greater than input error. Sample size is always not smaller than 16, so
+    # when population size is smaller than 16, estimation should be exactly
+    # same to actual value.
+    est_err = 0.01
+    test_input = [random.randint(0, 1000) for _ in range(15)]
+    actual_count = len(set(test_input))
+
+    pipeline = TestPipeline()
+    result = (pipeline
+              | 'create' >> beam.Create(test_input)
+              | 'get_estimate'
+              >> beam.ApproximateUnique.Globally(error=est_err))
+
+    assert_that(result, equal_to([actual_count]),
+                label='assert:global_by_error_with_samll_population')
+    pipeline.run()
+
+  def test_approximate_unique_global_by_error_with_big_population(self):
+    # test if estimation error from input error is with in expected range with
+    # a big population.
+    est_err = 0.2
+    test_input = [random.randint(0, 1000) for _ in range(10000)]
+    actual_count = len(set(test_input))
+
+    pipeline = TestPipeline()
+    result = (pipeline
+              | 'create' >> beam.Create(test_input)
+              | 'get_estimate'
+              >> beam.ApproximateUnique.Globally(error=est_err)
+              | 'compare'
+              >> beam.FlatMap(lambda x: [abs(x - actual_count) * 1.0
+                                         / actual_count <= est_err]))
+
+    assert_that(result, equal_to([True]),
+                label='assert:global_by_error_with_big_population')
+    pipeline.run()
+
+  def test_approximate_unique_perkey_by_size(self):
+    # test if est error per key from sample size is in a expected range.
+    sample_size = 50
+    max_err = 2 / math.sqrt(sample_size)
+    number_of_keys = 10
+    test_input = [(random.randint(1, number_of_keys), random.randint(0, 1000))
+                  for _ in range(100)]
+    actual_count_dict = defaultdict(set)
+    for (x, y) in test_input:
+      actual_count_dict[x].add(y)
+
+    pipeline = TestPipeline()
+    result = (pipeline
+              | 'create' >> beam.Create(test_input)
+              | 'get_estimate'
+              >> beam.ApproximateUnique.PerKey(size=sample_size)
+              | 'compare'
+              >> beam.FlatMap(lambda x: [abs(x[1]
+                                             - len(actual_count_dict[x[0]]))
+                                         * 1.0 / len(actual_count_dict[x[0]])
+                                         <= max_err]))
+
+    assert_that(result, equal_to([True] * number_of_keys),
+                label='assert:perkey_by_size')
+    pipeline.run()
+
+  def test_approximate_unique_perkey_by_error(self):
+    # test if estimation error per key from input err is in the expected range.
+    est_err = 0.01
+    number_of_keys = 10
+    test_input = [(random.randint(1, number_of_keys), random.randint(0, 1000))
+                  for _ in range(100)]
+    actual_count_dict = defaultdict(set)
+    for (x, y) in test_input:
+      actual_count_dict[x].add(y)
+
+    pipeline = TestPipeline()
+    result = (pipeline
+              | 'create' >> beam.Create(test_input)
+              | 'get_estimate'
+              >> beam.ApproximateUnique.PerKey(error=est_err)
+              | 'compare'
+              >> beam.FlatMap(lambda x: [abs(x[1]
+                                             - len(actual_count_dict[x[0]]))
+                                         * 1.0 / len(actual_count_dict[x[0]])
+                                         <= est_err]))
+
+    assert_that(result, equal_to([True] * number_of_keys),
+                label='assert:perkey_by_error')
+    pipeline.run()
+
+  def test_approximate_unique_globally_by_error_with_skewed_data(self):
+    # test if estimation error is within the expected range with skewed data.
+    est_err = 0.01
+
+    # generate skewed dataset
+    values = [i for i in range(200)]
+    probs = [1.0 / 200] * 200
+
+    for idx, _ in enumerate(probs):
+      if idx > 3 and idx < 20:
+        probs[idx] = probs[idx] * (1 + math.log(idx + 1))
+      if idx > 20 and idx < 40:
+        probs[idx] = probs[idx] * (1 + math.log((40 - idx) + 1))
+
+    probs = [p / sum(probs) for p in probs]
+    test_input = np.random.choice(values, 1000, p=probs)
+    actual_count = len(set(test_input))
+
+    pipeline = TestPipeline()
+    result = (pipeline
+              | 'create' >> beam.Create(test_input)
+              | 'get_estimate'
+              >> beam.ApproximateUnique.Globally(error=est_err)
+              | 'compare'
+              >> beam.FlatMap(lambda x: [abs(x - actual_count) * 1.0
+                                         / actual_count <= est_err]))
+
+    assert_that(result, equal_to([True]),
+                label='assert:globally_by_error_with_skewed_data')
+    pipeline.run()
+
+
+if __name__ == '__main__':
+  unittest.main()
diff --git a/sdks/python/setup.py b/sdks/python/setup.py
index 6ef9b7f..e1d18f6 100644
--- a/sdks/python/setup.py
+++ b/sdks/python/setup.py
@@ -125,6 +125,7 @@ REQUIRED_PACKAGES = [
     'pyvcf>=0.6.8,<0.7.0; python_version < "3.0"',
     'pyyaml>=3.12,<4.0.0',
     'typing>=3.6.0,<3.7.0; python_version < "3.5.0"',
+    'mmh3>=2.5.1,<2.5.2',
     ]
 
 REQUIRED_TEST_PACKAGES = [