You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@datasketches.apache.org by GitBox <gi...@apache.org> on 2021/08/09 20:47:23 UTC

[GitHub] [datasketches-cpp] jmalkin commented on a change in pull request #230: Added count sketch

jmalkin commented on a change in pull request #230:
URL: https://github.com/apache/datasketches-cpp/pull/230#discussion_r685509608



##########
File path: python/src/count_sketch.py
##########
@@ -0,0 +1,240 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import numpy as np
+from random import randint, seed
+from streaming_heap import StreamingHeap
+
+
+class CountSketch:
+    def __init__(self,  num_buckets: int, num_levels: int, phi, rng_seed=100):
+        """
+        # TODO: add string update functionality.
+        :param num_buckets:int -- number of columns in the sketch table
+        :param num_levels:int -- number of rows in the sketch table
+        :param phi:float -- The threshold for heavy hitters
+        :param rng_seed:int -- seed for the randomisation
+
+        Attributes:
+        - self.p  :: int - a large prime used to generate the random hashes.
+        - self.w :: int - the number of hash buckets for the table
+        - self.d :: int - the number of levels that are repeated
+        - self.table :: np.ndarray of type float that is the table that is updated on viewing data.
+
+
+        HELPFUL SOURCES:
+        http://web.stanford.edu/class/archive/cs/cs166/cs166.1166/lectures/11/Small11.pdf
+        http://web.stanford.edu/class/archive/cs/cs166/cs166.1146/lectures/12/Small12.pdf
+        https://people.cs.umass.edu/~mcgregor/711S12/sketches1.pdf
+        https://cs.au.dk/~gerth/advising/thesis/jonas-nicolai-hovmand_morten-houmoeller-nygaard.pdf
+
+        Denote F2 = ||f||_2 where f is the frequency vector underlying the data stream.
+        eg. if stream = (1, 1, 2, 1, 5) then f = (0, 3, 1, 0, 0, 1).
+        If the stream is weighted i.e. we receive (item, weight) pairs, then the same idea applies:
+        stream = [(1, 4), (2, 1) (1, -1), (5, 1)] also has f = (0, 3, 1, 0, 0, 1) -- nb using 0-based indexing for
+        consistency with python.
+
+        The a-heavy hitters problem is to identify all items i for which f[i] > a*F2
+        Solving this problem requires linear space in the worst-case so a relaxed version is solved: to return
+        b-heavy hitters for b = a - t and t > 0.
+        Hence, we permit some _false positives_ : items that are flagged as being b-heavy, but may not be a-heavy.
+
+        A CountSketch can achieve this aim in small space by maintaining a sketch that is used to estimate f[i].
+        Specifically, the CountSketch returns an estimate g[i] for which
+        f[i] - epsilon*F2 <= g[i] <= f[i] + epsilon*F2.
+        The value epsilon is between 0 and 1 and is the worst-case error for estimating the frequency f[i] using the
+        value g[i].
+        epsilon can be explicitly calculated using parameters of the sketch self.w and self.d as seen in the function
+        def get_epsilon(self).
+
+        We return a set i of heavy indices by using the sketch and finding the (relaxed) b-heavy hitters.
+        The CountSketch guarantee ensures f[i] - epsilon*F2 <= g[i] so we will opt to find all i such that:
+        g[i] >= (b - epsilon)*F2 or equivalently g[i] >= ( (a - t) - epsilon)*F2.
+
+        We *set* phi ( = b - t ) and do not use the (a - t) setup and epsilon is explicitly known.
+        Note that there are two sources of approximation:
+         - the ``t'' is the heavy hitter approximate relaxation parameter
+         - the ``epsilon'' is the frequency estimation parameter.
+        """
+        seed(rng_seed)
+        self.p = 2 ** 31 - 1
+        self.w = num_buckets
+        self.d = num_levels
+        self.table = np.zeros((self.d, self.w), dtype=float)
+        self._init_hashes()
+        self.phi = phi
+        assert(self.phi <= 1.0), f"Phi={self.phi:.5f} but cannot be larger than 1."
+        self.total_weight = 0.0  # Sum of all weights seen and is aka the L1 norm of the underlying frequency vector.
+        self.max_num_items = np.ceil(1./self.phi).astype(np.int64)
+        self.heavy_hitter_detector = StreamingHeap(self.max_num_items)
+        self.merged_sketches = []  # A list of all sketches that have been merged with self.
+
+    def _init_hashes(self):
+        """
+        Initialises the hash functions for bucket and sign selection by generating (a,b) pairs for the hash family.
+        A new (a,b) pair is necessary for each of the hash functions
+        """
+        self.bucket_hash_params = {i: self._get_ab_hash() for i in range(self.d)}
+        self.sign_hash_params = {i: self._get_ab_hash() for i in range(self.d)}
+
+    def _get_ab_hash(self):
+        """
+        # TODO - check the nonzero property on a is correct
+        We need 0 <= a <= self.p - 1 and 0 <= b <= self.p - 1 as discussed on page 18
+        of https://people.cs.umass.edu/~mcgregor/711S12/sketches1.pdf
+        :return: the (a,b) pair used to define the hash family
+        """
+        return randint(0, self.p - 1), randint(0, self.p - 1)  # random (a,b) pairs from ([p], [p])
+
+    def _bucket_hash(self, x: int, a: int, b: int, buckets: int):
+        """
+        Generic function for generating 2-wise independent hash functions.
+        We use this function for the bucket locations with buckets <- self.num_buckets.
+        It is also used to generate the random signs with buckets <- 2 -- see `` def _sign_hash ''
+
+        :param x: stream item
+        :param a:
+        :param b:
+        :param buckets:
+        :return: h:int the hash value (aka bucket index) for item x observed in the stream.
+        """
+        h = (a * x + b) % self.p
+        h = h % buckets
+        return h
+
+    def _sign_hash(self, x: int, a: int, b: int):
+        """
+        Returns the 2-wise independent sign hash.
+        This generates the signs when items are put into buckets.
+        :param x:stream item
+        :param a:
+        :param b:
+        :return:s -- int the sign +1 or -1 used for the hashing.
+        """
+        s = 2.*self._bucket_hash(x, a, b, 2) - 1.
+        return s
+
+    def _insert(self, item: int, weight=1.0):
+        """
+        Inserts the item into the sketch table
+        :param item:
+        :param weight:
+        """
+        if not (isinstance(item, np.integer) or isinstance(item, int)):
+            # this checks for ``int`` and ``np.int*` for any input item
+            raise TypeError("Input item must be an int.")
+        for ii in range(self.d):
+            a_bucket, b_bucket = self.bucket_hash_params[ii]  # Gets the (a,b) pair used for *bucket* hashes at level ii
+            a_sign, b_sign = self.sign_hash_params[ii]  # Gets the (a,b) pair used for *sign* hashes at level ii
+            bucket = self._bucket_hash(item, a_bucket, b_bucket, self.w)
+            sign = self._sign_hash(item, a_sign, b_sign)
+            self.table[ii, bucket] += sign*weight
+
+    def update(self, item: int, weight=1.0):
+        """
+        Updates the sketch by:
+         1. Inserting (item, weight) into self.table
+         2. Estimating the current frequency and pushing it into the heavy_hitter_detector
+         (nb. item only pushed into the detector if it has large enough frequency.
+         This logic is dealt with in the detector class itself.)
+        :param item:int
+        :param weight:float
+        """
+        self.total_weight += weight
+        self._insert(item, weight)
+        self.heavy_hitter_detector.push(item, self.get_estimate(item))
+
+    def get_epsilon(self):
+        """
+        :return: an estimate of the worst-case epsilon error achieved by the sketch for frequency estimation.
+        """
+        return np.sqrt(np.e / self.w)
+
+    def get_estimate(self, item: int):
+        """
+        :param item:int -- the identifier of the item whose frequency is being queried.
+        :return:np.median(_estimates) -- The count sketch frequency estimate.
+        """
+        _estimates = self._get_frequency_estimate(item)
+        for sk in self.merged_sketches:
+            _estimates += sk._get_frequency_estimate(item)
+        return np.median(_estimates)  # TODO - replace this with np.max(0.0, np.median(_estimate))?
+
+    def _get_frequency_estimate(self, item: int):
+        """
+        :param item:
+        :return: _estimates -- an array of the frequency estimate where each entry corresponds to a level of the sketch.
+        """
+        _buckets = {_: self._bucket_hash(item, self.bucket_hash_params[_][0], self.bucket_hash_params[_][1], self.w)
+                    for _ in range(self.d)}
+        _signs = {_: self._sign_hash(item, self.sign_hash_params[_][0], self.sign_hash_params[_][1])
+                  for _ in range(self.d)}
+        _estimates = np.array([self.table[_, _buckets[_]] * _signs[_] for _ in range(self.d)])
+        return _estimates
+
+    def get_frequent_items(self):
+        """
+        :return: a list of the heavy hitters and their frequencies.
+        """
+        for k in self.heavy_hitter_detector.heap.keys():
+            self.heavy_hitter_detector.heap[k] = self.get_estimate(k)
+        return list(self.heavy_hitter_detector.heap.items())
+
+    def get_total_weight(self):
+        """
+        Returns the total weight inserted on the stream.
+        Is equivalent to the mass inserted over the stream, or the ell_1 norm of the
+        underlying frequency vector.
+        """
+        return self.total_weight
+
+    def is_empty(self):
+        """
+        Returns True if the sketch self.table is empty and is False otherwise.

Review comment:
       There's a valid philosophical debate over whether adding and then subtracting an item means a sketch is truly empty. It might seem trivial with a single item, but imagine merging 2 sketches that were created with the same items but opposite weights -- if we treat it as empty, we lose the information that it has ingested data.
   
   Certainly open to the idea that we don't care about a distinction, but it's worth thinking about what we want to capture by "empty" 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@datasketches.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@datasketches.apache.org
For additional commands, e-mail: commits-help@datasketches.apache.org