Posted to reviews@spark.apache.org by yhuai <gi...@git.apache.org> on 2015/10/14 23:15:25 UTC

[GitHub] spark pull request: [SPARK-9741][SQL] Approximate Count Distinct u...

Github user yhuai commented on a diff in the pull request:

    https://github.com/apache/spark/pull/8362#discussion_r42054243
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/functions.scala ---
    @@ -302,3 +307,397 @@ case class Sum(child: Expression) extends AlgebraicAggregate {
     
       override val evaluateExpression = Cast(currentSum, resultType)
     }
    +
    +// scalastyle:off
    +/**
    + * HyperLogLog++ (HLL++) is a state of the art cardinality estimation algorithm. This class
    + * implements the dense version of the HLL++ algorithm as an Aggregate Function.
    + *
    + * This implementation has been based on the following papers:
    + * HyperLogLog: the analysis of a near-optimal cardinality estimation algorithm
    + * http://algo.inria.fr/flajolet/Publications/FlFuGaMe07.pdf
    + *
    + * HyperLogLog in Practice: Algorithmic Engineering of a State of The Art Cardinality Estimation
    + * Algorithm
    + * http://static.googleusercontent.com/external_content/untrusted_dlcp/research.google.com/en/us/pubs/archive/40671.pdf
    + *
    + * Appendix to HyperLogLog in Practice: Algorithmic Engineering of a State of the Art Cardinality
    + * Estimation Algorithm
    + * https://docs.google.com/document/d/1gyjfMHy43U9OWBXxfaeG-3MjGzejW1dlpyMwEYAAWEI/view?fullscreen#
    + *
    + * @param child the expression to estimate the cardinality of.
    + * @param relativeSD the maximum relative estimation error allowed.
    + */
    +// scalastyle:on
    +case class HyperLogLogPlusPlus(child: Expression, relativeSD: Double = 0.05)
    +    extends AggregateFunction2 {
    +  import HyperLogLogPlusPlus._
    +
    +  /**
    +   * HLL++ uses 'p' bits for addressing. The more addressing bits we use, the more precise the
    +   * algorithm will be, and the more memory it will require. The 'p' value is based on the relative
    +   * error requested.
    +   *
    +   * HLL++ requires at least 4 bits of addressing space, which corresponds to a maximum
    +   * relative error of about 27%.
    +   *
    +   * The computation rounds 'p' up to the nearest integer, so the error is always equal to or
    +   * lower than the requested error. Use the <code>trueRsd</code> method to get the actual RSD
    +   * value.
    +   */
    +  private[this] val p = Math.ceil(2.0d * Math.log(1.106d / relativeSD) / Math.log(2.0d)).toInt
    +
    +  require(p >= 4, "HLL++ requires at least 4 bits for addressing. " +
    +    "Use a lower error, at most 27%.")
    +
    +  /**
    +   * Shift used to extract the index of the register from the hashed value.
    +   *
    +   * This assumes the use of 64-bit hashcodes.
    +   */
    +  private[this] val idxShift = JLong.SIZE - p
    +
    +  /**
    +   * Value to pad the 'w' value with before the number of leading zeros is determined.
    +   */
    +  private[this] val wPadding = 1L << (p - 1)
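    +
    +  // Since 1L << (p - 1) has exactly 64 - p leading zeros, padding 'w' with it bounds the
    +  // leading-zero count by idxShift, even when all remaining hash bits are zero.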
    +
    +  /**
    +   * The number of registers used.
    +   */
    +  private[this] val m = 1 << p
    +
    +  /**
    +   * The pre-calculated combination of: alpha * m * m
    +   *
    +   * 'alpha' corrects the raw cardinality estimate 'Z'. See the FlFuGaMe07 paper for its
    +   * derivation.
    +   */
    +  private[this] val alphaM2 = p match {
    +    case 4 => 0.673d * m * m
    +    case 5 => 0.697d * m * m
    +    case 6 => 0.709d * m * m
    +    case _ => (0.7213d / (1.0d + 1.079d / m)) * m * m
    +  }
    +
    +  /**
    +   * The number of words used to store the registers. We use Longs for storage because this is
    +   * the most compact form of storage; Spark aligns to 8-byte words or uses Long wrappers.
    +   *
    +   * We only store whole registers per word in order to prevent overly complex bitwise operations.
    +   * In practice this means we only use 60 out of 64 bits.
    +   */
    +  private[this] val numWords = m / REGISTERS_PER_WORD + 1
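    +
    +  // A register needs 6 bits to hold the maximum rank of a 64-bit hash, so REGISTERS_PER_WORD
    +  // is 10: hence the 60 (= 6 * 10) out of 64 bits mentioned above.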
    +
    +  def children: Seq[Expression] = Seq(child)
    +
    +  def nullable: Boolean = false
    +
    +  def dataType: DataType = LongType
    +
    +  def inputTypes: Seq[AbstractDataType] = Seq(AnyDataType)
    +
    +  def bufferSchema: StructType = StructType.fromAttributes(bufferAttributes)
    +
    +  def cloneBufferAttributes: Seq[Attribute] = bufferAttributes.map(_.newInstance())
    +
    +  /** Allocate enough words to store all registers. */
    +  val bufferAttributes: Seq[AttributeReference] = Seq.tabulate(numWords) { i =>
    +    AttributeReference(s"MS[$i]", LongType)()
    +  }
    +
    +  /** Fill all words with zeros. */
    +  def initialize(buffer: MutableRow): Unit = {
    +    var word = 0
    +    while (word < numWords) {
    +      buffer.setLong(mutableBufferOffset + word, 0)
    +      word += 1
    +    }
    +  }
    +
    +  /**
    +   * Update the HLL++ buffer.
    +   *
    +   * Variable names in the HLL++ paper match variable names in the code.
    +   */
    +  def update(buffer: MutableRow, input: InternalRow): Unit = {
    +    val v = child.eval(input)
    +    if (v != null) {
    +      // Create the hashed value 'x'.
    +      val x = MurmurHash.hash64(v)
    --- End diff --
    
    @hvanhovell Does HLL++ require using `hash64`? I took a look at its implementation. It looks like we will convert the value to a string in many cases (https://github.com/addthis/stream-lib/blob/master/src/main/java/com/clearspring/analytics/hash/MurmurHash.java#L135-L159). For our old function, the `offer` method of the `HyperLogLog` class uses `hash` internally, which has some specializations.
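    
    For reference, a minimal sketch (not part of the PR) of the two code paths being compared, assuming stream-lib's MurmurHash is on the classpath:
    
        import com.clearspring.analytics.hash.MurmurHash
    
        // child.eval(input) returns the value as a boxed Any/Object.
        val v: Any = 123456789L
    
        // Path taken in this patch: for types without a dedicated overload,
        // hash64(Object) falls back to hashing v.toString, allocating a String per row.
        val x64: Long = MurmurHash.hash64(v)
    
        // Path taken by the old HyperLogLog.offer: hash(Object) has specializations
        // for common types that avoid the String detour.
        val x32: Int = MurmurHash.hash(v)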

