Posted to commits@hive.apache.org by vi...@apache.org on 2018/07/19 19:55:47 UTC

[48/51] [partial] hive git commit: HIVE-20188 : Split server-specific code outside of standalone metastore-common (Alexander Kolbasov reviewed by Vihang Karajgaonkar)

http://git-wip-us.apache.org/repos/asf/hive/blob/081fa368/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/common/ndv/hll/HLLDenseRegister.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/common/ndv/hll/HLLDenseRegister.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/common/ndv/hll/HLLDenseRegister.java
deleted file mode 100644
index 422bfbe..0000000
--- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/common/ndv/hll/HLLDenseRegister.java
+++ /dev/null
@@ -1,202 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.common.ndv.hll;
-
-import java.util.Arrays;
-
-public class HLLDenseRegister implements HLLRegister {
-
-  // 2^p number of bytes for register
-  private byte[] register;
-
-  // max value stored in the register is cached to determine the bit width for
-  // bit packing
-  private int maxRegisterValue;
-
-  // number of register bits
-  private int p;
-
-  // m = 2^p
-  private int m;
-
-  public HLLDenseRegister(int p) {
-    this(p, true);
-  }
-
-  public HLLDenseRegister(int p, boolean bitPack) {
-    this.p = p;
-    this.m = 1 << p;
-    this.register = new byte[m];
-    this.maxRegisterValue = 0;
-    if (!bitPack) {
-      this.maxRegisterValue = 0xff;
-    }
-  }
-
-  public boolean add(long hashcode) {
-
-    // LSB p bits
-    final int registerIdx = (int) (hashcode & (m - 1));
-
-    // MSB 64 - p bits
-    final long w = hashcode >>> p;
-
-    // longest run of trailing zeroes
-    final int lr = Long.numberOfTrailingZeros(w) + 1;
-    return set(registerIdx, (byte) lr);
-  }
-
-  // this is a lossy inverse of the function above: it produces a hashcode
-  // which collides with the current winner of the register (we lose all higher
-  // bits, but we keep all bits useful for smaller p-bit options)
-
-  // +-------------|-------------+
-  // |xxxx100000000|1000000000000|  (lr=9 + idx=1024)
-  // +-------------|-------------+
-  //                \
-  // +---------------|-----------+
-  // |xxxx10000000010|00000000000|  (lr=2 + idx=0)
-  // +---------------|-----------+
-
-  // This shows the relevant bits of the original hash value
-  // and how the conversion is moving bits from the index value
-  // over to the leading zero computation
-
-  public void extractLowBitsTo(HLLRegister dest) {
-    for (int idx = 0; idx < register.length; idx++) {
-      byte lr = register[idx]; // this can be a max of 65, never > 127
-      if (lr != 0) {
-        dest.add((long) ((1 << (p + lr - 1)) | idx));
-      }
-    }
-  }
-
-  public boolean set(int idx, byte value) {
-    boolean updated = false;
-    if (idx < register.length && value > register[idx]) {
-
-      // update max register value
-      if (value > maxRegisterValue) {
-        maxRegisterValue = value;
-      }
-
-      // set register value and compute inverse pow of 2 for register value
-      register[idx] = value;
-
-      updated = true;
-    }
-    return updated;
-  }
-
-  public int size() {
-    return register.length;
-  }
-
-  public int getNumZeroes() {
-    int numZeroes = 0;
-    for (byte b : register) {
-      if (b == 0) {
-        numZeroes++;
-      }
-    }
-    return numZeroes;
-  }
-
-  public void merge(HLLRegister hllRegister) {
-    if (hllRegister instanceof HLLDenseRegister) {
-      HLLDenseRegister hdr = (HLLDenseRegister) hllRegister;
-      byte[] inRegister = hdr.getRegister();
-
-      // merge only if the register length matches
-      if (register.length != inRegister.length) {
-        throw new IllegalArgumentException(
-            "The size of register sets of HyperLogLogs to be merged does not match.");
-      }
-
-      // compare register values and store the max register value
-      for (int i = 0; i < inRegister.length; i++) {
-        final byte cb = register[i];
-        final byte ob = inRegister[i];
-        register[i] = ob > cb ? ob : cb;
-      }
-
-      // update max register value
-      if (hdr.getMaxRegisterValue() > maxRegisterValue) {
-        maxRegisterValue = hdr.getMaxRegisterValue();
-      }
-    } else {
-      throw new IllegalArgumentException("Specified register is not instance of HLLDenseRegister");
-    }
-  }
-
-  public byte[] getRegister() {
-    return register;
-  }
-
-  public void setRegister(byte[] register) {
-    this.register = register;
-  }
-
-  public int getMaxRegisterValue() {
-    return maxRegisterValue;
-  }
-
-  public double getSumInversePow2() {
-    double sum = 0;
-    for (byte b : register) {
-      sum += HLLConstants.inversePow2Data[b];
-    }
-    return sum;
-  }
-
-  @Override
-  public String toString() {
-    StringBuilder sb = new StringBuilder();
-    sb.append("HLLDenseRegister - ");
-    sb.append("p: ");
-    sb.append(p);
-    sb.append(" numZeroes: ");
-    sb.append(getNumZeroes());
-    sb.append(" maxRegisterValue: ");
-    sb.append(maxRegisterValue);
-    return sb.toString();
-  }
-
-  public String toExtendedString() {
-    return toString() + " register: " + Arrays.toString(register);
-  }
-
-  @Override
-  public boolean equals(Object obj) {
-    if (!(obj instanceof HLLDenseRegister)) {
-      return false;
-    }
-    HLLDenseRegister other = (HLLDenseRegister) obj;
-    return maxRegisterValue == other.maxRegisterValue && Arrays.equals(register, other.register);
-  }
-
-  @Override
-  public int hashCode() {
-    int hashcode = 0;
-    hashcode += 31 * maxRegisterValue;
-    hashcode += Arrays.hashCode(register);
-    return hashcode;
-  }
-
-}
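
For reference, HLLDenseRegister.add() above splits a 64-bit hashcode into a p-bit register index (the LSB bits) and uses the length of the trailing-zero run of the remaining bits as the register value. A minimal standalone sketch of that indexing step (class and variable names here are illustrative, not part of the commit):

    // Sketch: how a 64-bit hashcode maps to (register index, rank) in the dense register.
    public final class DenseIndexSketch {
      public static void main(String[] args) {
        final int p = 14;                                      // number of register index bits
        final int m = 1 << p;                                  // number of registers
        final long hashcode = 0xDEADBEEFCAFEBABEL;

        final int registerIdx = (int) (hashcode & (m - 1));    // LSB p bits select the register
        final long w = hashcode >>> p;                         // remaining 64 - p bits
        final int lr = Long.numberOfTrailingZeros(w) + 1;      // trailing zero run + 1, at most 65

        System.out.println("register " + registerIdx + " <- " + lr);
      }
    }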

http://git-wip-us.apache.org/repos/asf/hive/blob/081fa368/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/common/ndv/hll/HLLRegister.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/common/ndv/hll/HLLRegister.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/common/ndv/hll/HLLRegister.java
deleted file mode 100644
index a90094d..0000000
--- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/common/ndv/hll/HLLRegister.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.common.ndv.hll;
-
-public interface HLLRegister {
-
-  /**
-   * Specify a hashcode to add to hyperloglog register.
-   * @param hashcode
-   *          - hashcode to add
-   * @return true if register value is updated else false
-   */
-  boolean add(long hashcode);
-
-  /**
-   * Instead of specifying a hashcode, this method can be used to directly
-   * specify the register index and register value. This is useful
-   * when reconstructing a hyperloglog from a serialized representation where it is
-   * not possible to regenerate the hashcode.
-   * @param idx
-   *          - register index
-   * @param value
-   *          - register value
-   * @return true if register value is updated else false
-   */
-  boolean set(int idx, byte value);
-
-  /**
-   * Merge hyperloglog registers of the same type (SPARSE or DENSE register)
-   * @param reg
-   *          - register to be merged
-   */
-  void merge(HLLRegister reg);
-}
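
The set() contract documented above is "retain the larger value for a given register index"; both register implementations in this commit follow it. A minimal sketch of that rule in isolation (hypothetical class, not part of the interface):

    // Sketch: keep the maximum value per register index, as HLLRegister.set() requires.
    import java.util.TreeMap;

    public final class MaxPerIndexSketch {
      private final TreeMap<Integer, Byte> registers = new TreeMap<>();

      // Returns true only if the stored value for idx actually changed.
      public boolean set(int idx, byte value) {
        Byte current = registers.get(idx);
        if (current == null || value > current) {
          registers.put(idx, value);
          return true;
        }
        return false;
      }

      public static void main(String[] args) {
        MaxPerIndexSketch r = new MaxPerIndexSketch();
        System.out.println(r.set(3, (byte) 5)); // true  - first value for index 3
        System.out.println(r.set(3, (byte) 2)); // false - smaller value is ignored
        System.out.println(r.set(3, (byte) 9)); // true  - larger value wins
      }
    }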

http://git-wip-us.apache.org/repos/asf/hive/blob/081fa368/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/common/ndv/hll/HLLSparseRegister.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/common/ndv/hll/HLLSparseRegister.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/common/ndv/hll/HLLSparseRegister.java
deleted file mode 100644
index d5ac54a..0000000
--- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/common/ndv/hll/HLLSparseRegister.java
+++ /dev/null
@@ -1,261 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.common.ndv.hll;
-
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.TreeMap;
-
-public class HLLSparseRegister implements HLLRegister {
-
-  private TreeMap<Integer,Byte> sparseMap;
-
-  // for better insertion performance, values are added to a temporary unsorted
-  // list which is merged into the sparse map once a threshold is reached
-  private int[] tempList;
-  private int tempListIdx;
-
-  // number of register bits
-  private final int p;
-
-  // new number of register bits for higher accuracy
-  private final int pPrime;
-
-  // number of bits to store the number of zero runs
-  private final int qPrime;
-
-  // masks for quicker extraction of p, pPrime, qPrime values
-  private final int mask;
-  private final int pPrimeMask;
-  private final int qPrimeMask;
-
-  public HLLSparseRegister(int p, int pp, int qp) {
-    this.p = p;
-    this.sparseMap = new TreeMap<>();
-    this.tempList = new int[HLLConstants.TEMP_LIST_DEFAULT_SIZE];
-    this.tempListIdx = 0;
-    this.pPrime = pp;
-    this.qPrime = qp;
-    this.mask = ((1 << pPrime) - 1) ^ ((1 << p) - 1);
-    this.pPrimeMask = ((1 << pPrime) - 1);
-    this.qPrimeMask = (1 << qPrime) - 1;
-  }
-
-  public boolean add(long hashcode) {
-    boolean updated = false;
-
-    // fill the temp list before merging to sparse map
-    if (tempListIdx < tempList.length) {
-      int encodedHash = encodeHash(hashcode);
-      tempList[tempListIdx++] = encodedHash;
-      updated = true;
-    } else {
-      updated = mergeTempListToSparseMap();
-    }
-
-    return updated;
-  }
-
-  /**
-   * Adds the temp list to the sparse map. The key for a sparse map entry is the register
-   * index determined by pPrime and the value is the number of trailing zeroes.
-   * @return true if the sparse map was updated
-   */
-  private boolean mergeTempListToSparseMap() {
-    boolean updated = false;
-    for (int i = 0; i < tempListIdx; i++) {
-      int encodedHash = tempList[i];
-      int key = encodedHash & pPrimeMask;
-      byte value = (byte) (encodedHash >>> pPrime);
-      byte nr = 0;
-      // if the MSB is set to 1 then the next qPrime MSB bits contain the
-      // number of zeroes.
-      // if the MSB is set to 0 then the number of zeroes is contained within the
-      // pPrime - p bits.
-      if (encodedHash < 0) {
-        nr = (byte) (value & qPrimeMask);
-      } else {
-        nr = (byte) (Integer.numberOfTrailingZeros(encodedHash >>> p) + 1);
-      }
-      updated = set(key, nr);
-    }
-
-    // reset temp list index
-    tempListIdx = 0;
-    return updated;
-  }
-
-  /**
-   * <pre>
-   * <b>Input:</b> 64 bit hashcode
-   * 
-   * |---------w-------------| |------------p'----------|
-   * 10101101.......1010101010 10101010101 01010101010101
-   *                                       |------p-----|
-   *                                       
-   * <b>Output:</b> 32 bit int
-   * 
-   * |b| |-q'-|  |------------p'----------|
-   *  1  010101  01010101010 10101010101010
-   *                         |------p-----|
-   *                    
-   * 
-   * The default values of p', q' and b are 25, 6, 1 (total 32 bits) respectively.
-   * This function will return an int encoded in the following format
-   * 
-   * p  - LSB p bits represent the register index
-   * p' - LSB p' bits are used for increased accuracy in estimation
-   * q' - q' bits after p' are left as such from the hashcode if b = 0, else the
-   *      q' bits encode the longest trailing zero run from the (w-p) input bits
-   * b  - 0 if the longest trailing zero run is contained within (p'-p) bits
-   *      1 if the longest trailing zero run is computed from the (w-p) input bits and
-   *      its value is stored in q' bits
-   * </pre>
-   * @param hashcode - 64 bit hashcode to encode
-   * @return 32 bit encoded hash
-   */
-  public int encodeHash(long hashcode) {
-    // x = p' - p
-    int x = (int) (hashcode & mask);
-    if (x == 0) {
-      // more bits should be considered for finding q (longest zero runs)
-      // set MSB to 1
-      int ntr = Long.numberOfTrailingZeros(hashcode >> p) + 1;
-      long newHashCode = hashcode & pPrimeMask;
-      newHashCode |= ntr << pPrime;
-      newHashCode |= 0x80000000;
-      return (int) newHashCode;
-    } else {
-      // q is contained within p' - p
-      // set MSB to 0
-      return (int) (hashcode & 0x7FFFFFFF);
-    }
-  }
-
-  public int getSize() {
-    return sparseMap.size() + tempListIdx;
-  }
-
-  public void merge(HLLRegister hllRegister) {
-    if (hllRegister instanceof HLLSparseRegister) {
-      HLLSparseRegister hsr = (HLLSparseRegister) hllRegister;
-
-      // retain only the largest value for a register index
-      for (Map.Entry<Integer, Byte> entry : hsr.getSparseMap().entrySet()) {
-        int key = entry.getKey();
-        byte value = entry.getValue();
-        set(key, value);
-      }
-    } else {
-      throw new IllegalArgumentException("Specified register not instance of HLLSparseRegister");
-    }
-  }
-
-  public boolean set(int key, byte value) {
-    // retain only the largest value for a register index
-    Byte containedValue = sparseMap.get(key);
-    if (containedValue == null || value > containedValue) {
-      sparseMap.put(key, value);
-      return true;
-    }
-    return false;
-  }
-
-  public TreeMap<Integer,Byte> getSparseMap() {
-    return getMergedSparseMap();
-  }
-
-  private TreeMap<Integer,Byte> getMergedSparseMap() {
-    if (tempListIdx != 0) {
-      mergeTempListToSparseMap();
-    }
-    return sparseMap;
-  }
-
-  // this is effectively the same as the dense register impl.
-  public void extractLowBitsTo(HLLRegister dest) {
-    for (Entry<Integer, Byte> entry : getSparseMap().entrySet()) {
-      int idx = entry.getKey();
-      byte lr = entry.getValue(); // this can be a max of 65, never > 127
-      if (lr != 0) {
-        // should be a no-op for sparse
-        dest.add((long) ((1 << (p + lr - 1)) | idx));
-      }
-    }
-  }
-
-  public int getP() {
-    return p;
-  }
-
-  public int getPPrime() {
-    return pPrime;
-  }
-
-  @Override
-  public String toString() {
-    StringBuilder sb = new StringBuilder();
-    sb.append("HLLSparseRegister - ");
-    sb.append("p: ");
-    sb.append(p);
-    sb.append(" pPrime: ");
-    sb.append(pPrime);
-    sb.append(" qPrime: ");
-    sb.append(qPrime);
-    return sb.toString();
-  }
-
-  public String toExtendedString() {
-    return toString() + " register: " + sparseMap.toString();
-  }
-
-  @Override
-  public boolean equals(Object obj) {
-    if (!(obj instanceof HLLSparseRegister)) {
-      return false;
-    }
-    HLLSparseRegister other = (HLLSparseRegister) obj;
-    boolean result = p == other.p && pPrime == other.pPrime && qPrime == other.qPrime
-        && tempListIdx == other.tempListIdx;
-    if (result) {
-      for (int i = 0; i < tempListIdx; i++) {
-        if (tempList[i] != other.tempList[i]) {
-          return false;
-        }
-      }
-
-      result = result && sparseMap.equals(other.sparseMap);
-    }
-    return result;
-  }
-
-  @Override
-  public int hashCode() {
-    int hashcode = 0;
-    hashcode += 31 * p;
-    hashcode += 31 * pPrime;
-    hashcode += 31 * qPrime;
-    for (int i = 0; i < tempListIdx; i++) {
-      hashcode += 31 * tempList[i];
-    }
-    hashcode += sparseMap.hashCode();
-    return hashcode;
-  }
-
-}
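
The encodeHash() javadoc above describes how a 64-bit hashcode is folded into a 32-bit value: p' LSB bits for the register index, q' bits for an explicit trailing-zero count, and an MSB flag b that says whether that count had to be computed from the full hash. A small sketch of the matching decode step, mirroring mergeTempListToSparseMap() and assuming the default p = 14, p' = 25, q' = 6 layout (values in the example are illustrative):

    // Sketch: decode a 32-bit encoded hash back into (sparse-map key, trailing-zero count).
    public final class SparseDecodeSketch {
      static final int P = 14;
      static final int P_PRIME = 25;
      static final int Q_PRIME = 6;
      static final int P_PRIME_MASK = (1 << P_PRIME) - 1;
      static final int Q_PRIME_MASK = (1 << Q_PRIME) - 1;

      public static void main(String[] args) {
        // b = 1, trailing-zero count = 5, low index bits = 12345 (illustrative values)
        int encodedHash = 0x80000000 | (5 << P_PRIME) | 12345;

        int key = encodedHash & P_PRIME_MASK;            // register index from the p' LSB bits
        byte nr;
        if (encodedHash < 0) {
          // MSB set: the count was stored explicitly in the q' bits after p'
          nr = (byte) ((encodedHash >>> P_PRIME) & Q_PRIME_MASK);
        } else {
          // MSB clear: the trailing-zero run is still visible in the (p' - p) bits
          nr = (byte) (Integer.numberOfTrailingZeros(encodedHash >>> P) + 1);
        }
        System.out.println("key=" + key + " value=" + nr);
      }
    }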

http://git-wip-us.apache.org/repos/asf/hive/blob/081fa368/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/common/ndv/hll/HyperLogLog.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/common/ndv/hll/HyperLogLog.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/common/ndv/hll/HyperLogLog.java
deleted file mode 100644
index 91a6865..0000000
--- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/common/ndv/hll/HyperLogLog.java
+++ /dev/null
@@ -1,664 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.common.ndv.hll;
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.charset.Charset;
-import java.util.Map;
-import java.util.TreeMap;
-
-import org.apache.hadoop.hive.common.ndv.NumDistinctValueEstimator;
-import org.apache.hadoop.hive.common.type.HiveDecimal;
-import org.apache.hadoop.hive.ql.util.JavaDataModel;
-import org.apache.hive.common.util.Murmur3;
-
-/**
- * <pre>
- * This is an implementation of the following variants of hyperloglog (HLL)
- * algorithm 
- * Original  - Original HLL algorithm from Flajolet et al. from
- *             http://algo.inria.fr/flajolet/Publications/FlFuGaMe07.pdf
- * HLLNoBias - Google's implementation of bias correction based on lookup table
- *             http://static.googleusercontent.com/media/research.google.com/en//pubs/archive/40671.pdf
- * HLL++     - Google's implementation of HLL++ algorithm that uses SPARSE registers
- *             http://static.googleusercontent.com/media/research.google.com/en//pubs/archive/40671.pdf
- * 
- * Following are the constructor parameters that determine which algorithm is
- * used
- * <b>numRegisterIndexBits</b> - number of LSB hashcode bits to be used as register index.
- *                        <i>Default is 14</i>. min = 4 and max = 16
- * <b>numHashBits</b> - number of bits for hashcode. <i>Default is 64</i>. min = 32 and max = 128
- * <b>encoding</b> - Type of encoding to use (SPARSE or DENSE). The algorithm automatically
- *            switches to DENSE beyond a threshold. <i>Default: SPARSE</i>
- * <b>enableBitPacking</b> - To enable bit packing or not. Bit packing improves compression
- *                    at the cost of more CPU cycles. <i>Default: true</i>
- * <b>noBias</b> - Use Google's bias table lookup for short range bias correction.
- *          Enabling this will greatly improve the estimation accuracy for short
- *          range values. <i>Default: true</i>
- *
- * </pre>
- */
-public class HyperLogLog implements NumDistinctValueEstimator {
-  private final static int DEFAULT_HASH_BITS = 64;
-  private final static long HASH64_ZERO = Murmur3.hash64(new byte[] {0});
-  private final static long HASH64_ONE = Murmur3.hash64(new byte[] {1});
-
-  public enum EncodingType {
-    SPARSE, DENSE
-  }
-
-  // number of bits to address registers
-  private final int p;
-
-  // number of registers - 2^p
-  private final int m;
-
-  // refer paper
-  private float alphaMM;
-
-  // enable/disable bias correction using table lookup
-  private final boolean noBias;
-
-  // enable/disable bitpacking
-  private final boolean bitPacking;
-
-  // Not making it configurable for perf reasons (avoid checks)
-  private final int chosenHashBits = DEFAULT_HASH_BITS;
-
-  private HLLDenseRegister denseRegister;
-  private HLLSparseRegister sparseRegister;
-
-  // counts are cached to avoid repeated complex computation. If register value
-  // is updated the count will be computed again.
-  private long cachedCount;
-  private boolean invalidateCount;
-
-  private EncodingType encoding;
-
-  // threshold to switch from SPARSE to DENSE encoding
-  private int encodingSwitchThreshold;
-
-  private HyperLogLog(HyperLogLogBuilder hllBuilder) {
-    if (hllBuilder.numRegisterIndexBits < HLLConstants.MIN_P_VALUE
-        || hllBuilder.numRegisterIndexBits > HLLConstants.MAX_P_VALUE) {
-      throw new IllegalArgumentException("p value should be between " + HLLConstants.MIN_P_VALUE
-          + " to " + HLLConstants.MAX_P_VALUE);
-    }
-    this.p = hllBuilder.numRegisterIndexBits;
-    this.m = 1 << p;
-    this.noBias = hllBuilder.noBias;
-    this.bitPacking = hllBuilder.bitPacking;
-
-    // the threshold should be less than 12K bytes for p = 14.
-    // The reason to divide by 5 is that, in sparse mode after serialization, the
-    // entries in the sparse map are compressed and delta encoded as varints. The
-    // worst case size of a varint is 5 bytes. Hence, 12K/5 ~= 2400 entries in the
-    // sparse map.
-    if (bitPacking) {
-      this.encodingSwitchThreshold = ((m * 6) / 8) / 5;
-    } else {
-      // if bitpacking is disabled, all register values take 8 bits and hence
-      // we can be more flexible with the threshold. For p=14, 16K/5 = 3200
-      // entries in sparse map can be allowed.
-      this.encodingSwitchThreshold = m / 3;
-    }
-
-    // initializeAlpha(DEFAULT_HASH_BITS);
-    // alphaMM value for 128 bits hash seems to perform better for default 64 hash bits
-    this.alphaMM = 0.7213f / (1 + 1.079f / m);
-    // For efficiency alpha is multiplied by m^2
-    this.alphaMM = this.alphaMM * m * m;
-
-    this.cachedCount = -1;
-    this.invalidateCount = false;
-    this.encoding = hllBuilder.encoding;
-    if (encoding.equals(EncodingType.SPARSE)) {
-      this.sparseRegister = new HLLSparseRegister(p, HLLConstants.P_PRIME_VALUE,
-          HLLConstants.Q_PRIME_VALUE);
-      this.denseRegister = null;
-    } else {
-      this.sparseRegister = null;
-      this.denseRegister = new HLLDenseRegister(p, bitPacking);
-    }
-  }
-
-  public static HyperLogLogBuilder builder() {
-    return new HyperLogLogBuilder();
-  }
-
-  public static class HyperLogLogBuilder {
-    private int numRegisterIndexBits = 14;
-    private EncodingType encoding = EncodingType.SPARSE;
-    private boolean bitPacking = true;
-    private boolean noBias = true;
-
-    public HyperLogLogBuilder() {
-    }
-
-    public HyperLogLogBuilder setNumRegisterIndexBits(int b) {
-      this.numRegisterIndexBits = b;
-      return this;
-    }
-
-    public HyperLogLogBuilder setSizeOptimized() {
-      // allowing this to be increased via config breaks the merge impl
-      // p=10 = ~1kb per vector or smaller
-      this.numRegisterIndexBits = 10;
-      return this;
-    }
-
-    public HyperLogLogBuilder setEncoding(EncodingType enc) {
-      this.encoding = enc;
-      return this;
-    }
-
-    public HyperLogLogBuilder enableBitPacking(boolean b) {
-      this.bitPacking = b;
-      return this;
-    }
-
-    public HyperLogLogBuilder enableNoBias(boolean nb) {
-      this.noBias = nb;
-      return this;
-    }
-
-    public HyperLogLog build() {
-      return new HyperLogLog(this);
-    }
-  }
-
-  // see paper for alpha initialization.
-  private void initializeAlpha(final int hashBits) {
-    if (hashBits <= 16) {
-      alphaMM = 0.673f;
-    } else if (hashBits <= 32) {
-      alphaMM = 0.697f;
-    } else if (hashBits <= 64) {
-      alphaMM = 0.709f;
-    } else {
-      alphaMM = 0.7213f / (float) (1 + 1.079f / m);
-    }
-
-    // For efficiency alpha is multiplied by m^2
-    alphaMM = alphaMM * m * m;
-  }
-
-  public void addBoolean(boolean val) {
-    add(val ? HASH64_ONE : HASH64_ZERO);
-  }
-
-  public void addByte(byte val) {
-    add(Murmur3.hash64(new byte[] {val}));
-  }
-
-  public void addBytes(byte[] val) {
-    add(Murmur3.hash64(val));
-  }
-
-  public void addShort(short val) {
-    add(Murmur3.hash64(val));
-  }
-
-  public void addInt(int val) {
-    add(Murmur3.hash64(val));
-  }
-
-  public void addLong(long val) {
-    add(Murmur3.hash64(val));
-  }
-
-  public void addFloat(float val) {
-    add(Murmur3.hash64(Float.floatToIntBits(val)));
-  }
-
-  public void addDouble(double val) {
-    add(Murmur3.hash64(Double.doubleToLongBits(val)));
-  }
-
-  public void addChar(char val) {
-    add(Murmur3.hash64((short)val));
-  }
-
-  /**
-   * Java's default charset will be used for strings.
-   * @param val
-   *          - input string
-   */
-  public void addString(String val) {
-    add(Murmur3.hash64(val.getBytes()));
-  }
-
-  public void addString(String val, Charset charset) {
-    add(Murmur3.hash64(val.getBytes(charset)));
-  }
-
-  public void add(long hashcode) {
-    if (encoding.equals(EncodingType.SPARSE)) {
-      if (sparseRegister.add(hashcode)) {
-        invalidateCount = true;
-      }
-
-      // if the size of the sparse map exceeds the threshold, convert the sparse map to
-      // a dense register and switch to DENSE encoding
-      if (sparseRegister.getSize() > encodingSwitchThreshold) {
-        encoding = EncodingType.DENSE;
-        denseRegister = sparseToDenseRegister(sparseRegister);
-        sparseRegister = null;
-        invalidateCount = true;
-      }
-    } else {
-      if (denseRegister.add(hashcode)) {
-        invalidateCount = true;
-      }
-    }
-  }
-
-  public long estimateNumDistinctValues() {
-    // FMSketch treats the ndv of all nulls as 1 but hll treats the ndv as 0.
-    // In order to avoid a divide-by-zero problem, we follow FMSketch
-    return count() > 0 ? count() : 1;
-  }
-
-  public long count() {
-    // compute count only if the register values are updated else return the
-    // cached count
-    if (invalidateCount || cachedCount < 0) {
-      if (encoding.equals(EncodingType.SPARSE)) {
-
-        // if encoding is still SPARSE use linear counting with increased
-        // accuracy (as we use pPrime bits for the register index)
-        int mPrime = 1 << sparseRegister.getPPrime();
-        cachedCount = linearCount(mPrime, mPrime - sparseRegister.getSparseMap().size());
-      } else {
-
-        // for DENSE encoding, use bias table lookup for HLLNoBias algorithm
-        // else fallback to HLLOriginal algorithm
-        double sum = denseRegister.getSumInversePow2();
-        long numZeros = denseRegister.getNumZeroes();
-
-        // cardinality estimate from normalized bias corrected harmonic mean on
-        // the registers
-        cachedCount = (long) (alphaMM * (1.0 / sum));
-        long pow = (long) Math.pow(2, chosenHashBits);
-
-        // when bias correction is enabled
-        if (noBias) {
-          cachedCount = cachedCount <= 5 * m ? (cachedCount - estimateBias(cachedCount))
-              : cachedCount;
-          long h = cachedCount;
-          if (numZeros != 0) {
-            h = linearCount(m, numZeros);
-          }
-
-          if (h < getThreshold()) {
-            cachedCount = h;
-          }
-        } else {
-          // HLL algorithm shows stronger bias for values in (2.5 * m) range.
-          // To compensate for this short range bias, linear counting is used
-          // for values before this short range. The original paper also says
-          // similar bias is seen for long range values due to hash collisions
-          // in range >1/30*(2^32). For the default case, we do not have to
-          // worry about this long range bias as the paper used 32-bit hashing
-          // and we use 64-bit hashing as default. 2^64 values are too high to
-          // observe long range bias (hash collisions).
-          if (cachedCount <= 2.5 * m) {
-
-            // for short range use linear counting
-            if (numZeros != 0) {
-              cachedCount = linearCount(m, numZeros);
-            }
-          } else if (chosenHashBits < 64 && cachedCount > (0.033333 * pow)) {
-
-            // long range bias for 32-bit hashcodes
-            if (cachedCount > pow / 30) {
-              cachedCount = (long) (-pow * Math.log(1.0 - (double) cachedCount / (double) pow));
-            }
-          }
-        }
-      }
-      invalidateCount = false;
-    }
-
-    return cachedCount;
-  }
-
-  private long getThreshold() {
-    return (long) (HLLConstants.thresholdData[p - 4] + 0.5);
-  }
-
-  /**
-   * Estimate bias from lookup table
-   * @param count
-   *          - cardinality before bias correction
-   * @return cardinality after bias correction
-   */
-  private long estimateBias(long count) {
-    double[] rawEstForP = HLLConstants.rawEstimateData[p - 4];
-
-    // compute distance and store it in sorted map
-    TreeMap<Double,Integer> estIndexMap = new TreeMap<>();
-    double distance = 0;
-    for (int i = 0; i < rawEstForP.length; i++) {
-      distance = Math.pow(count - rawEstForP[i], 2);
-      estIndexMap.put(distance, i);
-    }
-
-    // take top-k closest neighbors and compute the bias corrected cardinality
-    long result = 0;
-    double[] biasForP = HLLConstants.biasData[p - 4];
-    double biasSum = 0;
-    int kNeighbors = HLLConstants.K_NEAREST_NEIGHBOR;
-    for (Map.Entry<Double, Integer> entry : estIndexMap.entrySet()) {
-      biasSum += biasForP[entry.getValue()];
-      kNeighbors--;
-      if (kNeighbors <= 0) {
-        break;
-      }
-    }
-
-    // 0.5 added for rounding off
-    result = (long) ((biasSum / HLLConstants.K_NEAREST_NEIGHBOR) + 0.5);
-    return result;
-  }
-
-  public void setCount(long count) {
-    this.cachedCount = count;
-    this.invalidateCount = true;
-  }
-
-  private long linearCount(int mVal, long numZeros) {
-    return (long) (Math.round(mVal * Math.log(mVal / ((double) numZeros))));
-  }
-
-  // refer paper
-  public double getStandardError() {
-    return 1.04 / Math.sqrt(m);
-  }
-
-  public HLLDenseRegister getHLLDenseRegister() {
-    return denseRegister;
-  }
-
-  public HLLSparseRegister getHLLSparseRegister() {
-    return sparseRegister;
-  }
-
-  /**
-   * Reconstruct sparse map from serialized integer list
-   * @param reg
-   *          - uncompressed and delta decoded integer list
-   */
-  public void setHLLSparseRegister(int[] reg) {
-    for (int i : reg) {
-      int key = i >>> HLLConstants.Q_PRIME_VALUE;
-      byte value = (byte) (i & 0x3f);
-      sparseRegister.set(key, value);
-    }
-  }
-
-  /**
-   * Reconstruct dense registers from byte array
-   * @param reg
-   *          - unpacked byte array
-   */
-  public void setHLLDenseRegister(byte[] reg) {
-    int i = 0;
-    for (byte b : reg) {
-      denseRegister.set(i, b);
-      i++;
-    }
-  }
-
-  /**
-   * Merge the specified hyperloglog to the current one. Encoding switches
-   * automatically after merge if the encoding switch threshold is exceeded.
-   * @param hll
-   *          - hyperloglog to be merged
-   * @throws IllegalArgumentException
-   */
-  public void merge(HyperLogLog hll) {
-    if (chosenHashBits != hll.chosenHashBits) {
-      throw new IllegalArgumentException(
-          "HyperLogLog cannot be merged as either p or hashbits are different. Current: "
-              + toString() + " Provided: " + hll.toString());
-    }
-
-    if (p > hll.p) {
-      throw new IllegalArgumentException(
-          "HyperLogLog cannot merge a smaller p into a larger one : "
-              + toString() + " Provided: " + hll.toString());
-    }
-
-    if (p != hll.p) {
-      // invariant: p > hll.p
-      hll = hll.squash(p);
-    }
-
-    EncodingType otherEncoding = hll.getEncoding();
-
-    if (encoding.equals(EncodingType.SPARSE) && otherEncoding.equals(EncodingType.SPARSE)) {
-      sparseRegister.merge(hll.getHLLSparseRegister());
-      // if after merge the sparse switching threshold is exceeded then change
-      // to dense encoding
-      if (sparseRegister.getSize() > encodingSwitchThreshold) {
-        encoding = EncodingType.DENSE;
-        denseRegister = sparseToDenseRegister(sparseRegister);
-        sparseRegister = null;
-      }
-    } else if (encoding.equals(EncodingType.DENSE) && otherEncoding.equals(EncodingType.DENSE)) {
-      denseRegister.merge(hll.getHLLDenseRegister());
-    } else if (encoding.equals(EncodingType.SPARSE) && otherEncoding.equals(EncodingType.DENSE)) {
-      denseRegister = sparseToDenseRegister(sparseRegister);
-      denseRegister.merge(hll.getHLLDenseRegister());
-      sparseRegister = null;
-      encoding = EncodingType.DENSE;
-    } else if (encoding.equals(EncodingType.DENSE) && otherEncoding.equals(EncodingType.SPARSE)) {
-      HLLDenseRegister otherDenseRegister = sparseToDenseRegister(hll.getHLLSparseRegister());
-      denseRegister.merge(otherDenseRegister);
-    }
-
-    invalidateCount = true;
-  }
-
-  /**
-   * Reduces the accuracy of the HLL provided to a smaller size
-   * @param p0 
-   *         - new p size for the new HyperLogLog (smaller or no change)
-   * @return reduced (or same) HyperLogLog instance
-   */
-  public HyperLogLog squash(final int p0) {
-    if (p0 > p) {
-      throw new IllegalArgumentException(
-          "HyperLogLog cannot be be squashed to be bigger. Current: "
-              + toString() + " Provided: " + p0);
-    }
-
-    if (p0 == p) {
-      return this;
-    }
-
-    final HyperLogLog hll = new HyperLogLogBuilder()
-        .setNumRegisterIndexBits(p0).setEncoding(EncodingType.DENSE)
-        .enableNoBias(noBias).build();
-    final HLLDenseRegister result = hll.denseRegister;
-
-    if (encoding == EncodingType.SPARSE) {
-      sparseRegister.extractLowBitsTo(result);
-    } else if (encoding == EncodingType.DENSE) {
-      denseRegister.extractLowBitsTo(result);
-    }
-    return hll;
-  }
-
-  /**
-   * Converts sparse to dense hll register.
-   * @param sparseRegister
-   *          - sparse register to be converted
-   * @return converted dense register
-   */
-  private HLLDenseRegister sparseToDenseRegister(HLLSparseRegister sparseRegister) {
-    if (sparseRegister == null) {
-      return null;
-    }
-    int p = sparseRegister.getP();
-    int pMask = (1 << p) - 1;
-    HLLDenseRegister result = new HLLDenseRegister(p, bitPacking);
-    for (Map.Entry<Integer, Byte> entry : sparseRegister.getSparseMap().entrySet()) {
-      int key = entry.getKey();
-      int idx = key & pMask;
-      result.set(idx, entry.getValue());
-    }
-    return result;
-  }
-
-  @Override
-  public String toString() {
-    StringBuilder sb = new StringBuilder();
-    sb.append("Encoding: ");
-    sb.append(encoding);
-    sb.append(", p: ");
-    sb.append(p);
-    sb.append(", estimatedCardinality: ");
-    sb.append(estimateNumDistinctValues());
-    return sb.toString();
-  }
-
-  public String toStringExtended() {
-    if (encoding.equals(EncodingType.DENSE)) {
-      return toString() + ", " + denseRegister.toExtendedString();
-    } else if (encoding.equals(EncodingType.SPARSE)) {
-      return toString() + ", " + sparseRegister.toExtendedString();
-    }
-
-    return toString();
-  }
-
-  public int getNumRegisterIndexBits() {
-    return p;
-  }
-
-  public EncodingType getEncoding() {
-    return encoding;
-  }
-
-  public void setEncoding(EncodingType encoding) {
-    this.encoding = encoding;
-  }
-
-  @Override
-  public boolean equals(Object obj) {
-    if (!(obj instanceof HyperLogLog)) {
-      return false;
-    }
-
-    HyperLogLog other = (HyperLogLog) obj;
-    long count = estimateNumDistinctValues();
-    long otherCount = other.estimateNumDistinctValues();
-    boolean result = p == other.p && chosenHashBits == other.chosenHashBits
-        && encoding.equals(other.encoding) && count == otherCount;
-    if (encoding.equals(EncodingType.DENSE)) {
-      result = result && denseRegister.equals(other.getHLLDenseRegister());
-    }
-
-    if (encoding.equals(EncodingType.SPARSE)) {
-      result = result && sparseRegister.equals(other.getHLLSparseRegister());
-    }
-    return result;
-  }
-
-  @Override
-  public int hashCode() {
-    int hashcode = 0;
-    hashcode += 31 * p;
-    hashcode += 31 * chosenHashBits;
-    hashcode += encoding.hashCode();
-    hashcode += 31 * estimateNumDistinctValues();
-    if (encoding.equals(EncodingType.DENSE)) {
-      hashcode += 31 * denseRegister.hashCode();
-    }
-
-    if (encoding.equals(EncodingType.SPARSE)) {
-      hashcode += 31 * sparseRegister.hashCode();
-    }
-    return hashcode;
-  }
-
-  @Override
-  public void reset() {
-  }
-
-  @Override
-  public byte[] serialize() {
-    ByteArrayOutputStream bos = new ByteArrayOutputStream();
-    // write bytes to bos ...
-    try {
-      HyperLogLogUtils.serializeHLL(bos, this);
-      byte[] result = bos.toByteArray();
-      bos.close();
-      return result;
-    } catch (IOException e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  @Override
-  public NumDistinctValueEstimator deserialize(byte[] buf) {
-    return HyperLogLogUtils.deserializeHLL(buf);
-  }
-
-  @Override
-  public void addToEstimator(long v) {
-    addLong(v);
-  }
-
-  @Override
-  public void addToEstimator(String s) {
-    addString(s);
-  }
-
-  @Override
-  public void addToEstimator(double d) {
-    addDouble(d);
-  }
-
-  @Override
-  public void addToEstimator(HiveDecimal decimal) {
-    addDouble(decimal.doubleValue());
-  }
-
-  @Override
-  public void mergeEstimators(NumDistinctValueEstimator o) {
-    merge((HyperLogLog) o);
-  }
-
-  @Override
-  public int lengthFor(JavaDataModel model) {
-    // 5 is the head, 1<<p means the number of bytes for register
-    return (5 + (1 << p));
-  }
-
-  @Override
-  public boolean canMerge(NumDistinctValueEstimator o) {
-    return o instanceof HyperLogLog;
-  }
-
-}
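
The builder in HyperLogLog.java above fixes p, the encoding, bit packing and bias correction at construction time, and the estimator switches from SPARSE to DENSE automatically once the threshold is crossed. A short usage sketch built only from the methods visible in the deleted file (counts in the comments are approximate by nature of HLL):

    // Sketch: create two estimators, add overlapping value ranges, merge, and read the estimate.
    import org.apache.hadoop.hive.common.ndv.hll.HyperLogLog;

    public final class HllUsageSketch {
      public static void main(String[] args) {
        HyperLogLog left = HyperLogLog.builder().setNumRegisterIndexBits(14).build();
        HyperLogLog right = HyperLogLog.builder().setNumRegisterIndexBits(14).build();

        for (long i = 0; i < 100_000; i++) {
          left.addLong(i);            // values are hashed with Murmur3 internally
          right.addLong(i + 50_000);  // overlaps the upper half of the first range
        }

        left.merge(right);            // may switch SPARSE -> DENSE under the hood
        System.out.println(left.estimateNumDistinctValues()); // roughly 150,000
      }
    }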

http://git-wip-us.apache.org/repos/asf/hive/blob/081fa368/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/common/ndv/hll/HyperLogLogUtils.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/common/ndv/hll/HyperLogLogUtils.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/common/ndv/hll/HyperLogLogUtils.java
deleted file mode 100644
index aeba2e9..0000000
--- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/common/ndv/hll/HyperLogLogUtils.java
+++ /dev/null
@@ -1,409 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.common.ndv.hll;
-
-import java.io.ByteArrayInputStream;
-import java.io.EOFException;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.Arrays;
-import java.util.Map;
-import java.util.TreeMap;
-
-import org.apache.hadoop.hive.common.ndv.hll.HyperLogLog.EncodingType;
-
-/**
- * HyperLogLog serialization utilities.
- */
-public class HyperLogLogUtils {
-
-  public static final byte[] MAGIC = new byte[] { 'H', 'L', 'L' };
-
-  /**
-   * HyperLogLog is serialized using the following format
-   * 
-   * <pre>
-   * |-4 byte-|------varlong----|varint (optional)|----------|  
-   * ---------------------------------------------------------
-   * | header | estimated-count | register-length | register |
-   * ---------------------------------------------------------
-   * 
-   * <b>4 byte header</b> is encoded like below
-   * 3 bytes - HLL magic string to identify serialized stream
-   * 4 bits  - p (number of bits to be used as register index)
-   * 1       - spare bit (not used)
-   * 3 bits  - encoding (000 - sparse, 001..110 - n bit packing, 111 - no bit packing)
-   * 
-   * Followed by header are 3 fields that are required for reconstruction
-   * of hyperloglog
-   * Estimated count - variable length long to store last computed estimated count.
-   *                   This is just for quick lookup without deserializing registers
-   * Register length - number of entries in the register (required only
-   *                   for the sparse representation. For bit-packing, the register
-   *                   length can be found from p)
-   * </pre>
-   * @param out
-   *          - output stream to write to
-   * @param hll
-   *          - hyperloglog that needs to be serialized
-   * @throws IOException
-   */
-  public static void serializeHLL(OutputStream out, HyperLogLog hll) throws IOException {
-
-    // write header
-    out.write(MAGIC);
-    int fourthByte = 0;
-    int p = hll.getNumRegisterIndexBits();
-    fourthByte = (p & 0xff) << 4;
-
-    int bitWidth = 0;
-    EncodingType enc = hll.getEncoding();
-
-    // determine bit width for bitpacking and encode it in header
-    if (enc.equals(EncodingType.DENSE)) {
-      int lzr = hll.getHLLDenseRegister().getMaxRegisterValue();
-      bitWidth = getBitWidth(lzr);
-
-      // the max value of number of zeroes for 64 bit hash can be encoded using
-      // only 6 bits. So we will disable bit packing for any values >6
-      if (bitWidth > 6) {
-        fourthByte |= 7;
-        bitWidth = 8;
-      } else {
-        fourthByte |= (bitWidth & 7);
-      }
-    }
-
-    // write fourth byte of header
-    out.write(fourthByte);
-
-    // write estimated count
-    long estCount = hll.estimateNumDistinctValues();
-    writeVulong(out, estCount);
-
-    // serialize dense/sparse registers. Dense registers are bitpacked whereas
-    // sparse registers are delta and variable length encoded
-    if (enc.equals(EncodingType.DENSE)) {
-      byte[] register = hll.getHLLDenseRegister().getRegister();
-      bitpackHLLRegister(out, register, bitWidth);
-    } else if (enc.equals(EncodingType.SPARSE)) {
-      TreeMap<Integer, Byte> sparseMap = hll.getHLLSparseRegister().getSparseMap();
-
-      // write the number of elements in sparse map (required for
-      // reconstruction)
-      writeVulong(out, sparseMap.size());
-
-      // compute deltas and write the values as varints
-      int prev = 0;
-      for (Map.Entry<Integer, Byte> entry : sparseMap.entrySet()) {
-        if (prev == 0) {
-          prev = (entry.getKey() << HLLConstants.Q_PRIME_VALUE) | entry.getValue();
-          writeVulong(out, prev);
-        } else {
-          int curr = (entry.getKey() << HLLConstants.Q_PRIME_VALUE) | entry.getValue();
-          int delta = curr - prev;
-          writeVulong(out, delta);
-          prev = curr;
-        }
-      }
-    }
-  }
-
-  /**
-   * Refer to serializeHLL() for the serialization format. This function
-   * deserializes a serialized hyperloglog.
-   * @param in
-   *          - input stream
-   * @return deserialized hyperloglog
-   * @throws IOException
-   */
-  public static HyperLogLog deserializeHLL(InputStream in) throws IOException {
-    checkMagicString(in);
-    int fourthByte = in.read() & 0xff;
-    int p = fourthByte >>> 4;
-
-    // read type of encoding
-    int enc = fourthByte & 7;
-    EncodingType encoding = null;
-    int bitSize = 0;
-    if (enc == 0) {
-      encoding = EncodingType.SPARSE;
-    } else if (enc > 0 && enc < 7) {
-      bitSize = enc;
-      encoding = EncodingType.DENSE;
-    } else {
-      // bit packing disabled
-      bitSize = 8;
-      encoding = EncodingType.DENSE;
-    }
-
-    // estimated count
-    long estCount = readVulong(in);
-
-    HyperLogLog result = null;
-    if (encoding.equals(EncodingType.SPARSE)) {
-      result = HyperLogLog.builder().setNumRegisterIndexBits(p)
-          .setEncoding(EncodingType.SPARSE).build();
-      int numRegisterEntries = (int) readVulong(in);
-      int[] reg = new int[numRegisterEntries];
-      int prev = 0;
-
-      // reconstruct the sparse map from delta encoded and varint input stream
-      if (numRegisterEntries > 0) {
-        prev = (int) readVulong(in);
-        reg[0] = prev;
-      }
-      int delta = 0;
-      int curr = 0;
-      for (int i = 1; i < numRegisterEntries; i++) {
-        delta = (int) readVulong(in);
-        curr = prev + delta;
-        reg[i] = curr;
-        prev = curr;
-      }
-      result.setHLLSparseRegister(reg);
-    } else {
-
-      // explicitly disable bit packing
-      if (bitSize == 8) {
-        result = HyperLogLog.builder().setNumRegisterIndexBits(p)
-            .setEncoding(EncodingType.DENSE).enableBitPacking(false).build();
-      } else {
-        result = HyperLogLog.builder().setNumRegisterIndexBits(p)
-            .setEncoding(EncodingType.DENSE).enableBitPacking(true).build();
-      }
-      int m = 1 << p;
-      byte[] register = unpackHLLRegister(in, m, bitSize);
-      result.setHLLDenseRegister(register);
-    }
-
-    result.setCount(estCount);
-
-    return result;
-  }
-
-  /**
-   * This function deserializes a serialized hyperloglog from a byte array.
-   * @param buf - serialized bytes to deserialize
-   * @return HyperLogLog
-   */
-  public static HyperLogLog deserializeHLL(final byte[] buf) {
-    InputStream is = new ByteArrayInputStream(buf); // TODO: use faster non-sync inputstream
-    try {
-      HyperLogLog result = deserializeHLL(is);
-      is.close();
-      return result;
-    } catch (IOException e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  private static void bitpackHLLRegister(OutputStream out, byte[] register, int bitWidth)
-      throws IOException {
-    int bitsLeft = 8;
-    byte current = 0;
-
-    if (bitWidth == 8) {
-      fastPathWrite(out, register);
-      return;
-    }
-
-    // write the blob
-    for (byte value : register) {
-      int bitsToWrite = bitWidth;
-      while (bitsToWrite > bitsLeft) {
-        // add the bits to the bottom of the current word
-        current |= value >>> (bitsToWrite - bitsLeft);
-        // subtract out the bits we just added
-        bitsToWrite -= bitsLeft;
-        // zero out the bits above bitsToWrite
-        value &= (1 << bitsToWrite) - 1;
-        out.write(current);
-        current = 0;
-        bitsLeft = 8;
-      }
-      bitsLeft -= bitsToWrite;
-      current |= value << bitsLeft;
-      if (bitsLeft == 0) {
-        out.write(current);
-        current = 0;
-        bitsLeft = 8;
-      }
-    }
-
-    out.flush();
-  }
-
-  private static void fastPathWrite(OutputStream out, byte[] register) throws IOException {
-    for (byte b : register) {
-      out.write(b);
-    }
-  }
-
-  /**
-   * Unpack the bitpacked HyperLogLog register.
-   * @param in
-   *          - input stream
-   * @param length
-   *          - number of register entries (m) to unpack
-   * @return unpacked HLL register
-   * @throws IOException
-   */
-  private static byte[] unpackHLLRegister(InputStream in, int length, int bitSize)
-      throws IOException {
-    int mask = (1 << bitSize) - 1;
-    int bitsLeft = 8;
-
-    if (bitSize == 8) {
-      return fastPathRead(in, length);
-    }
-
-    byte current = (byte) (0xff & in.read());
-
-    byte[] output = new byte[length];
-    for (int i = 0; i < output.length; i++) {
-      byte result = 0;
-      int bitsLeftToRead = bitSize;
-      while (bitsLeftToRead > bitsLeft) {
-        result <<= bitsLeft;
-        result |= current & ((1 << bitsLeft) - 1);
-        bitsLeftToRead -= bitsLeft;
-        current = (byte) (0xff & in.read());
-        bitsLeft = 8;
-      }
-      if (bitsLeftToRead > 0) {
-        result <<= bitsLeftToRead;
-        bitsLeft -= bitsLeftToRead;
-        result |= (current >>> bitsLeft) & ((1 << bitsLeftToRead) - 1);
-      }
-      output[i] = (byte) (result & mask);
-    }
-    return output;
-  }
-
-  private static byte[] fastPathRead(InputStream in, int length) throws IOException {
-    byte[] result = new byte[length];
-    for (int i = 0; i < length; i++) {
-      result[i] = (byte) in.read();
-    }
-    return result;
-  }
-
-  /**
-   * Get estimated cardinality without deserializing HLL
-   * @param in
-   *          - serialized HLL
-   * @return - cardinality
-   * @throws IOException
-   */
-  public static long getEstimatedCountFromSerializedHLL(InputStream in) throws IOException {
-    checkMagicString(in);
-    in.read();
-    return readVulong(in);
-  }
-
-  /**
-   * Check if the specified input stream is actually a HLL stream
-   * @param in
-   *          - input stream
-   * @throws IOException
-   */
-  private static void checkMagicString(InputStream in) throws IOException {
-    byte[] magic = new byte[3];
-    magic[0] = (byte) in.read();
-    magic[1] = (byte) in.read();
-    magic[2] = (byte) in.read();
-
-    if (!Arrays.equals(magic, MAGIC)) {
-      throw new IllegalArgumentException("The input stream is not a HyperLogLog stream.");
-    }
-  }
-
-  /**
-   * Minimum bits required to encode the specified value
-   * @param val
-   *          - input value
-   * @return minimum number of bits needed to encode val
-   */
-  private static int getBitWidth(int val) {
-    int count = 0;
-    while (val != 0) {
-      count++;
-      val = (byte) (val >>> 1);
-    }
-    return count;
-  }
-
-  /**
-   * Return relative error between actual and estimated cardinality
-   * @param actualCount
-   *          - actual count
-   * @param estimatedCount
-   *          - estimated count
-   * @return relative error
-   */
-  public static float getRelativeError(long actualCount, long estimatedCount) {
-    float err = (1.0f - ((float) estimatedCount / (float) actualCount)) * 100.0f;
-    return err;
-  }
-
-  /**
-   * Write variable length encoded longs to output stream
-   * @param output
-   *          - out stream
-   * @param value
-   *          - long
-   * @throws IOException
-   */
-  private static void writeVulong(OutputStream output, long value) throws IOException {
-    while (true) {
-      if ((value & ~0x7f) == 0) {
-        output.write((byte) value);
-        return;
-      } else {
-        output.write((byte) (0x80 | (value & 0x7f)));
-        value >>>= 7;
-      }
-    }
-  }
-
-  /**
-   * Read variable length encoded longs from input stream
-   * @param in
-   *          - input stream
-   * @return decoded long value
-   * @throws IOException
-   */
-  private static long readVulong(InputStream in) throws IOException {
-    long result = 0;
-    long b;
-    int offset = 0;
-    do {
-      b = in.read();
-      if (b == -1) {
-        throw new EOFException("Reading Vulong past EOF");
-      }
-      result |= (0x7f & b) << offset;
-      offset += 7;
-    } while (b >= 0x80);
-    return result;
-  }
-
-}
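
writeVulong()/readVulong() above use the common varint scheme: seven payload bits per byte, with the high bit set on every byte except the last. A self-contained sketch of that encoding and a round trip (standalone class, not part of the commit):

    // Sketch: variable-length long encoding as used by writeVulong()/readVulong().
    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.EOFException;
    import java.io.IOException;
    import java.io.InputStream;
    import java.io.OutputStream;

    public final class VulongSketch {
      static void writeVulong(OutputStream out, long value) throws IOException {
        while ((value & ~0x7fL) != 0) {
          out.write((byte) (0x80 | (value & 0x7f))); // more bytes follow
          value >>>= 7;
        }
        out.write((byte) value);                     // final byte, high bit clear
      }

      static long readVulong(InputStream in) throws IOException {
        long result = 0;
        int offset = 0;
        long b;
        do {
          b = in.read();
          if (b == -1) {
            throw new EOFException("Reading Vulong past EOF");
          }
          result |= (0x7f & b) << offset;
          offset += 7;
        } while (b >= 0x80);
        return result;
      }

      public static void main(String[] args) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        writeVulong(bos, 300);    // encodes as two bytes: 0xAC 0x02
        long decoded = readVulong(new ByteArrayInputStream(bos.toByteArray()));
        System.out.println(decoded); // 300
      }
    }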

http://git-wip-us.apache.org/repos/asf/hive/blob/081fa368/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/AcidEventListener.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/AcidEventListener.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/AcidEventListener.java
deleted file mode 100644
index 5279247..0000000
--- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/AcidEventListener.java
+++ /dev/null
@@ -1,146 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.metastore;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.metastore.api.Database;
-import org.apache.hadoop.hive.metastore.api.HiveObjectType;
-import org.apache.hadoop.hive.metastore.api.MetaException;
-import org.apache.hadoop.hive.metastore.api.Partition;
-import org.apache.hadoop.hive.metastore.api.Table;
-import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
-import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars;
-import org.apache.hadoop.hive.metastore.events.AlterDatabaseEvent;
-import org.apache.hadoop.hive.metastore.events.AlterPartitionEvent;
-import org.apache.hadoop.hive.metastore.events.AlterTableEvent;
-import org.apache.hadoop.hive.metastore.events.DropDatabaseEvent;
-import org.apache.hadoop.hive.metastore.events.DropPartitionEvent;
-import org.apache.hadoop.hive.metastore.events.DropTableEvent;
-import org.apache.hadoop.hive.metastore.txn.TxnStore;
-import org.apache.hadoop.hive.metastore.txn.TxnUtils;
-
-
-/**
- * Handles cleanup of dropped partitions/tables/databases in ACID-related metastore tables.
- */
-public class AcidEventListener extends MetaStoreEventListener {
-
-  private TxnStore txnHandler;
-  private Configuration conf;
-
-  public AcidEventListener(Configuration configuration) {
-    super(configuration);
-    conf = configuration;
-  }
-
-  @Override
-  public void onDropDatabase (DropDatabaseEvent dbEvent) throws MetaException {
-    // We can loop through all the tables to check if they are ACID first and then perform cleanup,
-    // but it's more efficient to unconditionally perform cleanup for the database, especially
-    // when there are a lot of tables
-    txnHandler = getTxnHandler();
-    txnHandler.cleanupRecords(HiveObjectType.DATABASE, dbEvent.getDatabase(), null, null);
-  }
-
-  @Override
-  public void onDropTable(DropTableEvent tableEvent)  throws MetaException {
-    if (TxnUtils.isTransactionalTable(tableEvent.getTable())) {
-      txnHandler = getTxnHandler();
-      txnHandler.cleanupRecords(HiveObjectType.TABLE, null, tableEvent.getTable(), null);
-    }
-  }
-
-  @Override
-  public void onDropPartition(DropPartitionEvent partitionEvent)  throws MetaException {
-    if (TxnUtils.isTransactionalTable(partitionEvent.getTable())) {
-      txnHandler = getTxnHandler();
-      txnHandler.cleanupRecords(HiveObjectType.PARTITION, null, partitionEvent.getTable(),
-          partitionEvent.getPartitionIterator());
-    }
-  }
-
-  @Override
-  public void onAlterTable(AlterTableEvent tableEvent) throws MetaException {
-    if (!TxnUtils.isTransactionalTable(tableEvent.getNewTable())) {
-      return;
-    }
-    Table oldTable = tableEvent.getOldTable();
-    Table newTable = tableEvent.getNewTable();
-    if(!oldTable.getCatName().equalsIgnoreCase(newTable.getCatName()) ||
-        !oldTable.getDbName().equalsIgnoreCase(newTable.getDbName()) ||
-        !oldTable.getTableName().equalsIgnoreCase(newTable.getTableName())) {
-      txnHandler = getTxnHandler();
-      txnHandler.onRename(
-          oldTable.getCatName(), oldTable.getDbName(), oldTable.getTableName(), null,
-          newTable.getCatName(), newTable.getDbName(), newTable.getTableName(), null);
-    }
-  }
-  @Override
-  public void onAlterPartition(AlterPartitionEvent partitionEvent)  throws MetaException {
-    if (!TxnUtils.isTransactionalTable(partitionEvent.getTable())) {
-      return;
-    }
-    Partition oldPart = partitionEvent.getOldPartition();
-    Partition newPart = partitionEvent.getNewPartition();
-    Table t = partitionEvent.getTable();
-    String oldPartName = Warehouse.makePartName(t.getPartitionKeys(), oldPart.getValues());
-    String newPartName = Warehouse.makePartName(t.getPartitionKeys(), newPart.getValues());
-    if(!oldPartName.equals(newPartName)) {
-      txnHandler = getTxnHandler();
-      txnHandler.onRename(t.getCatName(), t.getDbName(), t.getTableName(), oldPartName,
-          t.getCatName(), t.getDbName(), t.getTableName(), newPartName);
-    }
-  }
-  @Override
-  public void onAlterDatabase(AlterDatabaseEvent dbEvent) throws MetaException {
-    Database oldDb = dbEvent.getOldDatabase();
-    Database newDb = dbEvent.getNewDatabase();
-    if(!oldDb.getCatalogName().equalsIgnoreCase(newDb.getCatalogName()) ||
-        !oldDb.getName().equalsIgnoreCase(newDb.getName())) {
-      txnHandler = getTxnHandler();
-      txnHandler.onRename(
-          oldDb.getCatalogName(), oldDb.getName(), null, null,
-          newDb.getCatalogName(), newDb.getName(), null, null);
-    }
-  }
-
-  private TxnStore getTxnHandler() {
-    boolean hackOn = MetastoreConf.getBoolVar(conf, ConfVars.HIVE_IN_TEST) ||
-        MetastoreConf.getBoolVar(conf, ConfVars.HIVE_IN_TEZ_TEST);
-    String origTxnMgr = null;
-    boolean origConcurrency = false;
-
-    // Since TxnUtils.getTxnStore calls TxnHandler.setConf -> checkQFileTestHack -> TxnDbUtil.setConfValues,
-    // which may change the values of the two entries below, we need to avoid polluting the original values
-    if (hackOn) {
-      origTxnMgr = MetastoreConf.getVar(conf, ConfVars.HIVE_TXN_MANAGER);
-      origConcurrency = MetastoreConf.getBoolVar(conf, ConfVars.HIVE_SUPPORT_CONCURRENCY);
-    }
-
-    txnHandler = TxnUtils.getTxnStore(conf);
-
-    // Set them back
-    if (hackOn) {
-      MetastoreConf.setVar(conf, ConfVars.HIVE_TXN_MANAGER, origTxnMgr);
-      MetastoreConf.setBoolVar(conf, ConfVars.HIVE_SUPPORT_CONCURRENCY, origConcurrency);
-    }
-
-    return txnHandler;
-  }
-}
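
The onAlterTable, onAlterPartition and onAlterDatabase callbacks removed above only notify the transaction store when an alter amounts to a rename, which they detect by comparing the old and new catalog, database and table names case-insensitively. The sketch below isolates that check; the QualifiedName value class and RenameCheck name are hypothetical stand-ins for the metastore Table API, not part of the deleted file.

// Hypothetical stand-in for the metastore Table object; only the fields the
// rename check needs.
final class QualifiedName {
  final String catalog;
  final String db;
  final String table;

  QualifiedName(String catalog, String db, String table) {
    this.catalog = catalog;
    this.db = db;
    this.table = table;
  }
}

final class RenameCheck {
  // A rename is any change to catalog, database, or table name, ignoring case,
  // mirroring the equalsIgnoreCase comparisons in onAlterTable above.
  static boolean isRename(QualifiedName oldName, QualifiedName newName) {
    return !oldName.catalog.equalsIgnoreCase(newName.catalog)
        || !oldName.db.equalsIgnoreCase(newName.db)
        || !oldName.table.equalsIgnoreCase(newName.table);
  }

  public static void main(String[] args) {
    QualifiedName before = new QualifiedName("hive", "sales", "orders");
    QualifiedName after = new QualifiedName("hive", "sales", "orders_2018");
    System.out.println(isRename(before, after));    // true: table name changed
    QualifiedName sameCase = new QualifiedName("HIVE", "SALES", "orders");
    System.out.println(isRename(before, sameCase)); // false: only case differs
  }
}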

http://git-wip-us.apache.org/repos/asf/hive/blob/081fa368/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/AggregateStatsCache.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/AggregateStatsCache.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/AggregateStatsCache.java
deleted file mode 100644
index 8e920bb..0000000
--- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/AggregateStatsCache.java
+++ /dev/null
@@ -1,571 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.hadoop.hive.metastore;
-
-import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
-import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
-import org.apache.hive.common.util.BloomFilter;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-public class AggregateStatsCache {
-
-  private static final Logger LOG = LoggerFactory.getLogger(AggregateStatsCache.class.getName());
-  private static AggregateStatsCache self = null;
-
-  // Backing store for this cache
-  private final ConcurrentHashMap<Key, AggrColStatsList> cacheStore;
-  // Cache size
-  private final int maxCacheNodes;
-  // Current nodes in the cache
-  private final AtomicInteger currentNodes = new AtomicInteger(0);
-  // Run the cleaner thread when the cache is maxFull% full
-  private final double maxFull;
-  // Run the cleaner thread until cache is cleanUntil% occupied
-  private final double cleanUntil;
-  // Nodes go stale after this
-  private final long timeToLiveMs;
-  // Max time when waiting for write locks on node list
-  private final long maxWriterWaitTime;
-  // Max time when waiting for read locks on node list
-  private final long maxReaderWaitTime;
-  // Maximum number of partitions aggregated per cache node
-  private final int maxPartsPerCacheNode;
-  // Bloom filter false positive probability
-  private final double falsePositiveProbability;
-  // Max tolerable variance for matches
-  private final double maxVariance;
-  // Used to determine if cleaner thread is already running
-  private boolean isCleaning = false;
-  private final AtomicLong cacheHits = new AtomicLong(0);
-  private final AtomicLong cacheMisses = new AtomicLong(0);
-  // To track cleaner metrics
-  int numRemovedTTL = 0, numRemovedLRU = 0;
-
-  private AggregateStatsCache(int maxCacheNodes, int maxPartsPerCacheNode, long timeToLiveMs,
-      double falsePositiveProbability, double maxVariance, long maxWriterWaitTime,
-      long maxReaderWaitTime, double maxFull, double cleanUntil) {
-    this.maxCacheNodes = maxCacheNodes;
-    this.maxPartsPerCacheNode = maxPartsPerCacheNode;
-    this.timeToLiveMs = timeToLiveMs;
-    this.falsePositiveProbability = falsePositiveProbability;
-    this.maxVariance = maxVariance;
-    this.maxWriterWaitTime = maxWriterWaitTime;
-    this.maxReaderWaitTime = maxReaderWaitTime;
-    this.maxFull = maxFull;
-    this.cleanUntil = cleanUntil;
-    this.cacheStore = new ConcurrentHashMap<>();
-  }
-
-  public static synchronized AggregateStatsCache getInstance(Configuration conf) {
-    if (self == null) {
-      int maxCacheNodes =
-          MetastoreConf.getIntVar(conf, ConfVars.AGGREGATE_STATS_CACHE_SIZE);
-      // The number of partitions aggregated per cache node
-      // If the number of partitions requested is > this value, we'll fetch directly from Metastore
-      int maxPartitionsPerCacheNode =
-          MetastoreConf.getIntVar(conf, ConfVars.AGGREGATE_STATS_CACHE_MAX_PARTITIONS);
-      long timeToLiveMs = MetastoreConf.getTimeVar(conf, ConfVars.AGGREGATE_STATS_CACHE_TTL,
-              TimeUnit.SECONDS)*1000;
-      // False positives probability we are ready to tolerate for the underlying bloom filter
-      double falsePositiveProbability =
-          MetastoreConf.getDoubleVar(conf, ConfVars.AGGREGATE_STATS_CACHE_FPP);
-      // Maximum tolerable variance in number of partitions between cached node and our request
-      double maxVariance =
-          MetastoreConf.getDoubleVar(conf, ConfVars.AGGREGATE_STATS_CACHE_MAX_VARIANCE);
-      long maxWriterWaitTime = MetastoreConf.getTimeVar(conf,
-              ConfVars.AGGREGATE_STATS_CACHE_MAX_WRITER_WAIT, TimeUnit.MILLISECONDS);
-      long maxReaderWaitTime = MetastoreConf.getTimeVar(conf,
-              ConfVars.AGGREGATE_STATS_CACHE_MAX_READER_WAIT, TimeUnit.MILLISECONDS);
-      double maxFull =
-          MetastoreConf.getDoubleVar(conf, ConfVars.AGGREGATE_STATS_CACHE_MAX_FULL);
-      double cleanUntil =
-          MetastoreConf.getDoubleVar(conf, ConfVars.AGGREGATE_STATS_CACHE_CLEAN_UNTIL);
-      self =
-          new AggregateStatsCache(maxCacheNodes, maxPartitionsPerCacheNode, timeToLiveMs,
-              falsePositiveProbability, maxVariance, maxWriterWaitTime, maxReaderWaitTime, maxFull,
-              cleanUntil);
-    }
-    return self;
-  }
-
-  public int getMaxCacheNodes() {
-    return maxCacheNodes;
-  }
-
-  public int getCurrentNodes() {
-    return currentNodes.intValue();
-  }
-
-  public float getFullPercent() {
-    return (currentNodes.intValue() / (float) maxCacheNodes) * 100;
-  }
-
-  public int getMaxPartsPerCacheNode() {
-    return maxPartsPerCacheNode;
-  }
-
-  public double getFalsePositiveProbability() {
-    return falsePositiveProbability;
-  }
-
-  public Float getHitRatio() {
-    if (cacheHits.longValue() + cacheMisses.longValue() > 0) {
-      return (float) (cacheHits.longValue()) / (cacheHits.longValue() + cacheMisses.longValue());
-    }
-    return null;
-  }
-
-  /**
-   * Return aggregate stats for a column from the cache or null.
-   * While reading from the nodelist for a key, we wait maxReaderWaitTime to acquire the lock,
-   * failing which we return a cache miss (i.e. null)
-   * @param catName catalog name
-   * @param dbName database name
-   * @param tblName table name
-   * @param colName column name
-   * @param partNames list of partition names
-   * @return aggregated col stats
-   */
-  public AggrColStats get(String catName, String dbName, String tblName, String colName, List<String> partNames) {
-    // Cache key
-    Key key = new Key(catName, dbName, tblName, colName);
-    AggrColStatsList candidateList = cacheStore.get(key);
-    // No key, or no nodes in candidate list
-    if ((candidateList == null) || (candidateList.nodes.size() == 0)) {
-      LOG.debug("No aggregate stats cached for " + key.toString());
-      return null;
-    }
-    // Find the value object
-    // Update the timestamp of the key,value if value matches the criteria
-    // Return the value
-    AggrColStats match = null;
-    boolean isLocked = false;
-    try {
-      // Try to readlock the candidateList; timeout after maxReaderWaitTime
-      isLocked = candidateList.readLock.tryLock(maxReaderWaitTime, TimeUnit.MILLISECONDS);
-      if (isLocked) {
-        match = findBestMatch(partNames, candidateList.nodes);
-      }
-      if (match != null) {
-        // Ok to not lock the list for this and use a volatile lastAccessTime instead
-        candidateList.updateLastAccessTime();
-        cacheHits.incrementAndGet();
-        LOG.info("Returning aggregate stats from the cache; total hits: " + cacheHits.longValue()
-            + ", total misses: " + cacheMisses.longValue() + ", hit ratio: " + getHitRatio());
-      }
-      else {
-        cacheMisses.incrementAndGet();
-      }
-    } catch (InterruptedException e) {
-      LOG.debug("Interrupted Exception ignored ",e);
-    } finally {
-      if (isLocked) {
-        candidateList.readLock.unlock();
-      }
-    }
-    return match;
-  }
-
-  /**
-   * Find the best match using the configurable error tolerance and time to live value
-   *
-   * @param partNames
-   * @param candidates
-   * @return best matched node or null
-   */
-  private AggrColStats findBestMatch(List<String> partNames, List<AggrColStats> candidates) {
-    // Hits, misses tracked for a candidate node
-    MatchStats matchStats;
-    // MatchStats for each candidate
-    Map<AggrColStats, MatchStats> candidateMatchStats = new HashMap<>();
-    // The final match we intend to return
-    AggrColStats bestMatch = null;
-    // To compare among potentially multiple matches
-    int bestMatchHits = 0;
-    int numPartsRequested = partNames.size();
-    // 1st pass at marking invalid candidates
-    // Checks based on variance and TTL
-    // Note: we're not creating a copy of the list, to save memory
-    for (AggrColStats candidate : candidates) {
-      // Variance check
-      // Use floating-point division so small differences aren't truncated to zero
-      if (Math.abs((candidate.getNumPartsCached() - numPartsRequested) / (double) numPartsRequested)
-          > maxVariance) {
-        continue;
-      }
-      // TTL check
-      if (isExpired(candidate)) {
-        continue;
-      } else {
-        candidateMatchStats.put(candidate, new MatchStats(0, 0));
-      }
-    }
-    // We'll count misses as we iterate; tolerate at most maxVariance * numPartsRequested misses
-    int maxMisses = (int) (maxVariance * numPartsRequested);
-    for (String partName : partNames) {
-      for (Iterator<Map.Entry<AggrColStats, MatchStats>> iterator = candidateMatchStats.entrySet().iterator(); iterator.hasNext();) {
-        Map.Entry<AggrColStats, MatchStats> entry = iterator.next();
-        AggrColStats candidate = entry.getKey();
-        matchStats = entry.getValue();
-        if (candidate.getBloomFilter().test(partName.getBytes())) {
-          ++matchStats.hits;
-        } else {
-          ++matchStats.misses;
-        }
-        // 2nd pass at removing invalid candidates
-        // If misses so far exceed max tolerable misses
-        if (matchStats.misses > maxMisses) {
-          iterator.remove();
-          continue;
-        }
-        // Check if this is the best match so far
-        if (matchStats.hits > bestMatchHits) {
-          bestMatch = candidate;
-          bestMatchHits = matchStats.hits;
-        }
-      }
-    }
-    if (bestMatch != null) {
-      // Update the last access time for this node
-      bestMatch.updateLastAccessTime();
-    }
-    return bestMatch;
-  }
-
-  /**
-   * Add a new node to the cache; may trigger the cleaner thread if the cache is near full capacity.
-   * We will, however, add the node even if we temporarily exceed maxCacheNodes, because the cleaner
-   * will eventually create space from expired nodes or by removing LRU nodes.
-   * @param catName catalog name
-   * @param dbName database name
-   * @param tblName table name
-   * @param colName column name
-   * @param numPartsCached
-   * @param colStats
-   * @param bloomFilter
-   */
-  // TODO: make add asynchronous: add shouldn't block the higher level calls
-  public void add(String catName, String dbName, String tblName, String colName, long numPartsCached,
-      ColumnStatisticsObj colStats, BloomFilter bloomFilter) {
-    // If the cache is more than maxFull fraction occupied, run the cleaner thread
-    if ((double) getCurrentNodes() / maxCacheNodes > maxFull) {
-      spawnCleaner();
-    }
-    // Cache key
-    Key key = new Key(catName, dbName, tblName, colName);
-    // Add new node to the cache
-    AggrColStats node = new AggrColStats(numPartsCached, bloomFilter, colStats);
-    AggrColStatsList nodeList;
-    AggrColStatsList newNodeList = new AggrColStatsList();
-    newNodeList.nodes = new ArrayList<>();
-    nodeList = cacheStore.putIfAbsent(key, newNodeList);
-    if (nodeList == null) {
-      nodeList = newNodeList;
-    }
-    boolean isLocked = false;
-    try {
-      isLocked = nodeList.writeLock.tryLock(maxWriterWaitTime, TimeUnit.MILLISECONDS);
-      if (isLocked) {
-        nodeList.nodes.add(node);
-        node.updateLastAccessTime();
-        nodeList.updateLastAccessTime();
-        currentNodes.getAndIncrement();
-      }
-    } catch (InterruptedException e) {
-      LOG.debug("Interrupted Exception ignored ", e);
-    } finally {
-      if (isLocked) {
-        nodeList.writeLock.unlock();
-      }
-    }
-  }
-
-  /**
-   * Cleans the expired nodes or removes LRU nodes of the cache,
-   * until the cache size reduces to cleanUntil% full.
-   */
-  private void spawnCleaner() {
-    // This spawns a separate thread to walk through the cache and removes expired nodes.
-    // Only one cleaner thread should be running at any point.
-    synchronized (this) {
-      if (isCleaning) {
-        return;
-      }
-      isCleaning = true;
-    }
-    Thread cleaner = new Thread("AggregateStatsCache-CleanerThread") {
-      @Override
-      public void run() {
-        numRemovedTTL = 0;
-        numRemovedLRU = 0;
-        long cleanerStartTime = System.currentTimeMillis();
-        LOG.info("AggregateStatsCache is " + getFullPercent() + "% full, with "
-            + getCurrentNodes() + " nodes; starting cleaner thread");
-        try {
-          Iterator<Map.Entry<Key, AggrColStatsList>> mapIterator = cacheStore.entrySet().iterator();
-          while (mapIterator.hasNext()) {
-            Map.Entry<Key, AggrColStatsList> pair =
-                mapIterator.next();
-            AggrColStats node;
-            AggrColStatsList candidateList = pair.getValue();
-            List<AggrColStats> nodes = candidateList.nodes;
-            if (nodes.size() == 0) {
-              mapIterator.remove();
-              continue;
-            }
-            boolean isLocked = false;
-            try {
-              isLocked = candidateList.writeLock.tryLock(maxWriterWaitTime, TimeUnit.MILLISECONDS);
-              if (isLocked) {
-                for (Iterator<AggrColStats> listIterator = nodes.iterator(); listIterator.hasNext();) {
-                  node = listIterator.next();
-                  // Remove the node if it has expired
-                  if (isExpired(node)) {
-                    listIterator.remove();
-                    numRemovedTTL++;
-                    currentNodes.getAndDecrement();
-                  }
-                }
-              }
-            } catch (InterruptedException e) {
-              LOG.debug("Interrupted Exception ignored ",e);
-            } finally {
-              if (isLocked) {
-                candidateList.writeLock.unlock();
-              }
-            }
-            // We want to make sure this runs at a low priority in the background
-            Thread.yield();
-          }
-          // If the expired nodes did not result in cache being cleanUntil% in size,
-          // start removing LRU nodes
-          while ((double) getCurrentNodes() / maxCacheNodes > cleanUntil) {
-            evictOneNode();
-          }
-        } finally {
-          isCleaning = false;
-          LOG.info("Stopping cleaner thread; AggregateStatsCache is now " + getFullPercent()
-              + "% full, with " + getCurrentNodes() + " nodes");
-          LOG.info("Number of expired nodes removed: " + numRemovedTTL);
-          LOG.info("Number of LRU nodes removed: " + numRemovedLRU);
-          LOG.info("Cleaner ran for: " + (System.currentTimeMillis() - cleanerStartTime) + "ms");
-        }
-      }
-    };
-    cleaner.setPriority(Thread.MIN_PRIORITY);
-    cleaner.setDaemon(true);
-    cleaner.start();
-  }
-
-  /**
-   * Evict an LRU node, or an expired node, whichever we find first
-   */
-  private void evictOneNode() {
-    // Get the LRU key, value
-    Key lruKey = null;
-    AggrColStatsList lruValue = null;
-    for (Map.Entry<Key, AggrColStatsList> entry : cacheStore.entrySet()) {
-      Key key = entry.getKey();
-      AggrColStatsList value = entry.getValue();
-      if (lruKey == null) {
-        lruKey = key;
-        lruValue = value;
-        continue;
-      }
-      if ((value.lastAccessTime < lruValue.lastAccessTime) && !(value.nodes.isEmpty())) {
-        lruKey = key;
-        lruValue = value;
-      }
-    }
-    // Now delete a node for this key's list
-    AggrColStatsList candidateList = cacheStore.get(lruKey);
-    boolean isLocked = false;
-    try {
-      isLocked = candidateList.writeLock.tryLock(maxWriterWaitTime, TimeUnit.MILLISECONDS);
-      if (isLocked) {
-        AggrColStats candidate;
-        AggrColStats lruNode = null;
-        int currentIndex = 0;
-        int deleteIndex = 0;
-        for (Iterator<AggrColStats> iterator = candidateList.nodes.iterator(); iterator.hasNext();) {
-          candidate = iterator.next();
-          // Since we have to create space for 1, if we find an expired node we will remove it &
-          // return
-          if (isExpired(candidate)) {
-            iterator.remove();
-            currentNodes.getAndDecrement();
-            numRemovedTTL++;
-            return;
-          }
-          // Track the least recently used node seen so far and remember its index
-          if (lruNode == null || candidate.lastAccessTime < lruNode.lastAccessTime) {
-            lruNode = candidate;
-            deleteIndex = currentIndex;
-          }
-          ++currentIndex;
-        }
-        candidateList.nodes.remove(deleteIndex);
-        currentNodes.getAndDecrement();
-        numRemovedLRU++;
-      }
-    } catch (InterruptedException e) {
-      LOG.debug("Interrupted Exception ignored ",e);
-    } finally {
-      if (isLocked) {
-        candidateList.writeLock.unlock();
-      }
-    }
-  }
-
-  private boolean isExpired(AggrColStats aggrColStats) {
-    return (System.currentTimeMillis() - aggrColStats.lastAccessTime) > timeToLiveMs;
-  }
-
-  /**
-   * Key object for the stats cache hashtable
-   */
-  static class Key {
-    private final String catName;
-    private final String dbName;
-    private final String tblName;
-    private final String colName;
-
-    Key(String cat, String db, String table, String col) {
-      // Don't construct an illegal cache key
-      if (cat == null || (db == null) || (table == null) || (col == null)) {
-        throw new IllegalArgumentException("catName, dbName, tblName, colName can't be null");
-      }
-      catName = cat;
-      dbName = db;
-      tblName = table;
-      colName = col;
-    }
-
-    @Override
-    public boolean equals(Object other) {
-      if ((other == null) || !(other instanceof Key)) {
-        return false;
-      }
-      Key that = (Key) other;
-      return catName.equals(that.catName) && dbName.equals(that.dbName) &&
-          tblName.equals(that.tblName) && colName.equals(that.colName);
-    }
-
-    @Override
-    public int hashCode() {
-      return catName.hashCode() * 31 + dbName.hashCode() * 31 + tblName.hashCode() * 31 +
-          colName.hashCode();
-    }
-
-    @Override
-    public String toString() {
-      return "catalog: " + catName + ", database:" + dbName + ", table:" + tblName + ", column:" +
-          colName;
-    }
-
-  }
-
-  static class AggrColStatsList {
-    // TODO: figure out a better data structure for node list(?)
-    private List<AggrColStats> nodes = new ArrayList<>();
-    private final ReadWriteLock lock = new ReentrantReadWriteLock();
-    // Read lock for get operation
-    private final Lock readLock = lock.readLock();
-    // Write lock for add, evict and clean operation
-    private final Lock writeLock = lock.writeLock();
-    // Using volatile instead of locking updates to this variable,
-    // since we can rely on approx lastAccessTime but don't want a performance hit
-    private volatile long lastAccessTime = 0;
-
-    List<AggrColStats> getNodes() {
-      return nodes;
-    }
-
-    void updateLastAccessTime() {
-      this.lastAccessTime = System.currentTimeMillis();
-    }
-  }
-
-  public static class AggrColStats {
-    private final long numPartsCached;
-    private final BloomFilter bloomFilter;
-    private final ColumnStatisticsObj colStats;
-    private volatile long lastAccessTime;
-
-    public AggrColStats(long numPartsCached, BloomFilter bloomFilter,
-        ColumnStatisticsObj colStats) {
-      this.numPartsCached = numPartsCached;
-      this.bloomFilter = bloomFilter;
-      this.colStats = colStats;
-      this.lastAccessTime = System.currentTimeMillis();
-    }
-
-    public long getNumPartsCached() {
-      return numPartsCached;
-    }
-
-    public ColumnStatisticsObj getColStats() {
-      return colStats;
-    }
-
-    public BloomFilter getBloomFilter() {
-      return bloomFilter;
-    }
-
-    void updateLastAccessTime() {
-      this.lastAccessTime = System.currentTimeMillis();
-    }
-  }
-
-  /**
-   * Intermediate object used to collect hits and misses for each cache node that is evaluated for
-   * an incoming request.
-   */
-  private static class MatchStats {
-    private int hits = 0;
-    private int misses = 0;
-
-    MatchStats(int hits, int misses) {
-      this.hits = hits;
-      this.misses = misses;
-    }
-  }
-}
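
The findBestMatch method removed above scores each cached node by testing every requested partition name against the node's bloom filter, drops a node once its misses exceed the tolerance derived from maxVariance, and keeps the node with the most hits. The sketch below shows that scoring loop in isolation; it uses a plain HashSet in place of the bloom filter (so there are no false positives) and hypothetical class and method names throughout.

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

final class BestMatchSketch {

  // Stand-in for a cached node: the set of partition names whose stats were
  // aggregated into it. The real cache keeps a bloom filter instead of a set.
  static final class CachedNode {
    final String label;
    final Set<String> partitions;
    CachedNode(String label, Set<String> partitions) {
      this.label = label;
      this.partitions = partitions;
    }
  }

  // Return the candidate with the most hits against the requested partitions,
  // dropping any candidate whose misses exceed maxVariance * request size.
  static CachedNode findBestMatch(List<String> requested, List<CachedNode> candidates,
      double maxVariance) {
    int maxMisses = (int) (maxVariance * requested.size());
    CachedNode best = null;
    int bestHits = -1;
    for (CachedNode candidate : candidates) {
      int hits = 0;
      int misses = 0;
      for (String part : requested) {
        if (candidate.partitions.contains(part)) {
          hits++;
        } else {
          misses++;
        }
        if (misses > maxMisses) {
          break;                  // too far from the request; stop scoring this node
        }
      }
      if (misses <= maxMisses && hits > bestHits) {
        best = candidate;
        bestHits = hits;
      }
    }
    return best;
  }

  public static void main(String[] args) {
    List<String> request = List.of("p=1", "p=2", "p=3", "p=4");
    CachedNode close = new CachedNode("close",
        new HashSet<>(List.of("p=1", "p=2", "p=3")));
    CachedNode far = new CachedNode("far",
        new HashSet<>(List.of("p=9", "p=10")));
    CachedNode winner = findBestMatch(request,
        new ArrayList<>(List.of(close, far)), 0.5);
    System.out.println(winner == null ? "no match" : winner.label);  // prints: close
  }
}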
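
The cleaner thread and evictOneNode removed above implement the cache's shrink policy: once occupancy crosses maxFull, expired nodes are dropped first, then least-recently-used nodes, until occupancy falls back to cleanUntil. The following compact sketch of that two-stage shrink uses hypothetical names and a plain list in place of the concurrent structures and locks.

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

final class TwoStageEvictionSketch {

  static final class Node {
    final long lastAccessTime;
    final boolean expired;
    Node(long lastAccessTime, boolean expired) {
      this.lastAccessTime = lastAccessTime;
      this.expired = expired;
    }
  }

  // Shrink the node list until occupancy / capacity <= cleanUntil:
  // stage 1 removes expired nodes, stage 2 removes LRU nodes.
  static void shrink(List<Node> nodes, int capacity, double cleanUntil) {
    nodes.removeIf(n -> n.expired);                          // stage 1: TTL
    while ((double) nodes.size() / capacity > cleanUntil) {  // stage 2: LRU
      Node lru = nodes.stream()
          .min(Comparator.comparingLong((Node n) -> n.lastAccessTime))
          .orElse(null);
      if (lru == null) {
        break;
      }
      nodes.remove(lru);
    }
  }

  public static void main(String[] args) {
    List<Node> nodes = new ArrayList<>();
    for (int i = 0; i < 10; i++) {
      nodes.add(new Node(i, i < 2));   // the two oldest nodes are already expired
    }
    shrink(nodes, 10, 0.5);
    System.out.println(nodes.size());  // prints: 5 (two expired + three LRU removed)
  }
}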