You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/10/29 15:45:52 UTC

[GitHub] [beam] iindyk commented on a change in pull request #13175: Adding performance improvements to ApproximateQuantiles.

iindyk commented on a change in pull request #13175:
URL: https://github.com/apache/beam/pull/13175#discussion_r514363995



##########
File path: sdks/python/apache_beam/transforms/stats.py
##########
@@ -61,30 +58,34 @@
 K = typing.TypeVar('K')
 V = typing.TypeVar('V')
 
+try:
+  import mmh3  # pylint: disable=import-error
 
-def _get_default_hash_fn():
-  """Returns either murmurhash or md5 based on installation."""
-  try:
-    import mmh3  # pylint: disable=import-error
+  def _mmh3_hash(value):
+    # mmh3.hash64 returns two 64-bit unsigned integers
+    return mmh3.hash64(value, seed=0, signed=False)[0]
+
+  _default_hash_fn = _mmh3_hash
+  _default_hash_fn_type = 'mmh3'
+except ImportError:
 
-    def _mmh3_hash(value):
-      # mmh3.hash64 returns two 64-bit unsigned integers
-      return mmh3.hash64(value, seed=0, signed=False)[0]
+  def _md5_hash(value):
+    # md5 is a 128-bit hash, so we truncate the hexdigest (string of 32
+    # hexadecimal digits) to 16 digits and convert to int to get the 64-bit
+    # integer fingerprint.
+    return int(hashlib.md5(value).hexdigest()[:16], 16)
 
-    return _mmh3_hash
+  _default_hash_fn = _md5_hash
+  _default_hash_fn_type = 'md5'
 
-  except ImportError:
+
+def _get_default_hash_fn():
+  """Returns either murmurhash or md5 based on installation."""
+  if _default_hash_fn_type == 'md5':
     logging.warning(
         'Couldn\'t find murmurhash. Install mmh3 for a faster implementation of'

Review comment:
       I'm not sure it's still maintained; the last release was 3 years ago. I'm not aware of any other downsides. What do you think?
   
    https://github.com/hajimes/mmh3

##########
File path: sdks/python/apache_beam/transforms/stats.py
##########
@@ -368,82 +383,129 @@ class PerKey(PTransform):
       weighted: (optional) if set to True, the transform returns weighted
         quantiles. The input PCollection is then expected to contain tuples of
         input values with the corresponding weight.
+      batch_input: (optional) if set to True, the transform expects each element
+        of input PCollection to be a batch. Provides a way to accumulate
+        multiple elements at a time more efficiently.
     """
-    def __init__(self, num_quantiles, key=None, reverse=False, weighted=False):
+    def __init__(
+        self,
+        num_quantiles,
+        key=None,
+        reverse=False,
+        weighted=False,
+        batch_input=False):
       self._num_quantiles = num_quantiles
       self._key = key
       self._reverse = reverse
       self._weighted = weighted
+      self._batch_input = batch_input
 
     def expand(self, pcoll):
       return pcoll | CombinePerKey(
           ApproximateQuantilesCombineFn.create(
               num_quantiles=self._num_quantiles,
               key=self._key,
               reverse=self._reverse,
-              weighted=self._weighted))
+              weighted=self._weighted,
+              batch_input=self._batch_input))
 
     def display_data(self):
       return ApproximateQuantiles._display_data(
           num_quantiles=self._num_quantiles,
           key=self._key,
           reverse=self._reverse,
-          weighted=self._weighted)
+          weighted=self._weighted,
+          batch_input=self._batch_input)
+
+
+class _QuantileSpec(object):
+  """Quantiles computation specifications."""
+  def __init__(self, buffer_size, num_buffers, weighted, key, reverse):
+    # type: (int, int, bool, Any, bool) -> None
+    self.buffer_size = buffer_size
+    self.num_buffers = num_buffers
+    self.weighted = weighted
+    self.key = key
+    self.reverse = reverse
+
+    # Used to sort tuples of values and weights.
+    self.weighted_key = None if key is None else (lambda x: key(x[0]))
+
+    # Used to compare values.
+    if key is None and not reverse:

Review comment:
       Done.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org