Posted to commits@hive.apache.org by vi...@apache.org on 2018/07/19 19:55:47 UTC
[48/51] [partial] hive git commit: HIVE-20188 : Split server-specific code outside of standalone metastore-common (Alexander Kolbasov reviewed by Vihang Karajgaonkar)
http://git-wip-us.apache.org/repos/asf/hive/blob/081fa368/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/common/ndv/hll/HLLDenseRegister.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/common/ndv/hll/HLLDenseRegister.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/common/ndv/hll/HLLDenseRegister.java
deleted file mode 100644
index 422bfbe..0000000
--- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/common/ndv/hll/HLLDenseRegister.java
+++ /dev/null
@@ -1,202 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.common.ndv.hll;
-
-import java.util.Arrays;
-
-public class HLLDenseRegister implements HLLRegister {
-
- // 2^p number of bytes for register
- private byte[] register;
-
- // max value stored in the register is cached to determine the bit width
- // for bit packing
- private int maxRegisterValue;
-
- // number of register bits
- private int p;
-
- // m = 2^p
- private int m;
-
- public HLLDenseRegister(int p) {
- this(p, true);
- }
-
- public HLLDenseRegister(int p, boolean bitPack) {
- this.p = p;
- this.m = 1 << p;
- this.register = new byte[m];
- this.maxRegisterValue = 0;
- if (!bitPack) {
- this.maxRegisterValue = 0xff;
- }
- }
-
- public boolean add(long hashcode) {
-
- // LSB p bits
- final int registerIdx = (int) (hashcode & (m - 1));
-
- // MSB 64 - p bits
- final long w = hashcode >>> p;
-
- // number of trailing zeroes plus one (the rank)
- final int lr = Long.numberOfTrailingZeros(w) + 1;
- return set(registerIdx, (byte) lr);
- }
-
- // this is a lossy invert of the function above, which produces a hashcode
- // which collides with the current winner of the register (we lose all higher
- // bits, but we get all bits useful for lesser p-bit options)
-
- // +-------------|-------------+
- // |xxxx100000000|1000000000000| (lr=9 + idx=1024)
- // +-------------|-------------+
- // \
- // +---------------|-----------+
- // |xxxx10000000010|00000000000| (lr=2 + idx=0)
- // +---------------|-----------+
-
- // This shows the relevant bits of the original hash value
- // and how the conversion is moving bits from the index value
- // over to the leading zero computation
-
- public void extractLowBitsTo(HLLRegister dest) {
- for (int idx = 0; idx < register.length; idx++) {
- byte lr = register[idx]; // this can be a max of 65, never > 127
- if (lr != 0) {
- dest.add((1L << (p + lr - 1)) | idx);
- }
- }
- }
-
- public boolean set(int idx, byte value) {
- boolean updated = false;
- if (idx < register.length && value > register[idx]) {
-
- // update max register value
- if (value > maxRegisterValue) {
- maxRegisterValue = value;
- }
-
- // set register value and compute inverse pow of 2 for register value
- register[idx] = value;
-
- updated = true;
- }
- return updated;
- }
-
- public int size() {
- return register.length;
- }
-
- public int getNumZeroes() {
- int numZeroes = 0;
- for (byte b : register) {
- if (b == 0) {
- numZeroes++;
- }
- }
- return numZeroes;
- }
-
- public void merge(HLLRegister hllRegister) {
- if (hllRegister instanceof HLLDenseRegister) {
- HLLDenseRegister hdr = (HLLDenseRegister) hllRegister;
- byte[] inRegister = hdr.getRegister();
-
- // merge only if the register length matches
- if (register.length != inRegister.length) {
- throw new IllegalArgumentException(
- "The size of register sets of HyperLogLogs to be merged does not match.");
- }
-
- // compare register values and store the max register value
- for (int i = 0; i < inRegister.length; i++) {
- final byte cb = register[i];
- final byte ob = inRegister[i];
- register[i] = ob > cb ? ob : cb;
- }
-
- // update max register value
- if (hdr.getMaxRegisterValue() > maxRegisterValue) {
- maxRegisterValue = hdr.getMaxRegisterValue();
- }
- } else {
- throw new IllegalArgumentException("Specified register is not instance of HLLDenseRegister");
- }
- }
-
- public byte[] getRegister() {
- return register;
- }
-
- public void setRegister(byte[] register) {
- this.register = register;
- }
-
- public int getMaxRegisterValue() {
- return maxRegisterValue;
- }
-
- public double getSumInversePow2() {
- double sum = 0;
- for (byte b : register) {
- sum += HLLConstants.inversePow2Data[b];
- }
- return sum;
- }
-
- @Override
- public String toString() {
- StringBuilder sb = new StringBuilder();
- sb.append("HLLDenseRegister - ");
- sb.append("p: ");
- sb.append(p);
- sb.append(" numZeroes: ");
- sb.append(getNumZeroes());
- sb.append(" maxRegisterValue: ");
- sb.append(maxRegisterValue);
- return sb.toString();
- }
-
- public String toExtendedString() {
- return toString() + " register: " + Arrays.toString(register);
- }
-
- @Override
- public boolean equals(Object obj) {
- if (!(obj instanceof HLLDenseRegister)) {
- return false;
- }
- HLLDenseRegister other = (HLLDenseRegister) obj;
- return maxRegisterValue == other.maxRegisterValue && Arrays.equals(register, other.register);
- }
-
- @Override
- public int hashCode() {
- int hashcode = 0;
- hashcode += 31 * maxRegisterValue;
- hashcode += Arrays.hashCode(register);
- return hashcode;
- }
-
-}
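For readers skimming the diff: the dense register keeps, per index, the maximum "rank" seen, where the rank is the number of trailing zeroes of the upper hash bits plus one. A minimal standalone sketch of that update rule (the class name DenseRuleSketch and the sample hash values are illustrative; it assumes only the JDK, not the deleted class above):

import java.util.Arrays;

public class DenseRuleSketch {
  public static void main(String[] args) {
    final int p = 4;                  // 2^4 = 16 registers
    final int m = 1 << p;
    byte[] register = new byte[m];

    long[] hashes = {0xF0L, 0x100L, 0x10L};
    for (long hashcode : hashes) {
      int idx = (int) (hashcode & (m - 1));   // LSB p bits -> register index
      long w = hashcode >>> p;                // remaining MSB 64 - p bits
      byte rank = (byte) (Long.numberOfTrailingZeros(w) + 1);
      if (rank > register[idx]) {             // keep only the max rank per index
        register[idx] = rank;
      }
    }
    System.out.println(Arrays.toString(register));  // register[0] ends up as 5
  }
}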
http://git-wip-us.apache.org/repos/asf/hive/blob/081fa368/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/common/ndv/hll/HLLRegister.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/common/ndv/hll/HLLRegister.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/common/ndv/hll/HLLRegister.java
deleted file mode 100644
index a90094d..0000000
--- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/common/ndv/hll/HLLRegister.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.common.ndv.hll;
-
-public interface HLLRegister {
-
- /**
- * Specify a hashcode to add to hyperloglog register.
- * @param hashcode
- * - hashcode to add
- * @return true if register value is updated else false
- */
- boolean add(long hashcode);
-
- /**
- * Instead of specifying hashcode, this interface can be used to directly
- * specify the register index and register value. This interface is useful
- * when reconstructing hyperloglog from a serialized representation where it's
- * not possible to regenerate the hashcode.
- * @param idx
- * - register index
- * @param value
- * - register value
- * @return true if register value is updated else false
- */
- boolean set(int idx, byte value);
-
- /**
- * Merge hyperloglog registers of the same type (SPARSE or DENSE register)
- * @param reg
- * - register to be merged
- */
- void merge(HLLRegister reg);
-}
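The interface separates the two feeding paths: live hashcodes go through add(), while (index, value) pairs recovered from a serialized form go through set(). A method-level sketch of a caller written purely against the interface (the helper name feedRegister and its arguments are illustrative):

// Feeds a register via both paths defined by the HLLRegister contract.
static void feedRegister(HLLRegister reg, long[] liveHashes,
    int[] savedIdx, byte[] savedVal) {
  for (long h : liveHashes) {
    reg.add(h);                          // ingestion path: hashcode in
  }
  for (int i = 0; i < savedIdx.length; i++) {
    reg.set(savedIdx[i], savedVal[i]);   // reconstruction path: explicit pair
  }
}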
http://git-wip-us.apache.org/repos/asf/hive/blob/081fa368/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/common/ndv/hll/HLLSparseRegister.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/common/ndv/hll/HLLSparseRegister.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/common/ndv/hll/HLLSparseRegister.java
deleted file mode 100644
index d5ac54a..0000000
--- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/common/ndv/hll/HLLSparseRegister.java
+++ /dev/null
@@ -1,261 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.common.ndv.hll;
-
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.TreeMap;
-
-public class HLLSparseRegister implements HLLRegister {
-
- private TreeMap<Integer,Byte> sparseMap;
-
- // for better insertion performance, values are added to a temporary unsorted
- // list, which is merged into the sparse map after a threshold is reached
- private int[] tempList;
- private int tempListIdx;
-
- // number of register bits
- private final int p;
-
- // new number of register bits for higher accuracy
- private final int pPrime;
-
- // number of bits to store the number of zero runs
- private final int qPrime;
-
- // masks for quicker extraction of p, pPrime, qPrime values
- private final int mask;
- private final int pPrimeMask;
- private final int qPrimeMask;
-
- public HLLSparseRegister(int p, int pp, int qp) {
- this.p = p;
- this.sparseMap = new TreeMap<>();
- this.tempList = new int[HLLConstants.TEMP_LIST_DEFAULT_SIZE];
- this.tempListIdx = 0;
- this.pPrime = pp;
- this.qPrime = qp;
- this.mask = ((1 << pPrime) - 1) ^ ((1 << p) - 1);
- this.pPrimeMask = ((1 << pPrime) - 1);
- this.qPrimeMask = (1 << qPrime) - 1;
- }
-
- public boolean add(long hashcode) {
- boolean updated = false;
-
- // fill the temp list before merging to sparse map
- if (tempListIdx < tempList.length) {
- int encodedHash = encodeHash(hashcode);
- tempList[tempListIdx++] = encodedHash;
- updated = true;
- } else {
- updated = mergeTempListToSparseMap();
- }
-
- return updated;
- }
-
- /**
- * Merges the temp list into the sparse map. The key of a sparse map entry
- * is the register index determined by pPrime, and the value is the number
- * of trailing zeroes.
- * @return true if any register value was updated
- */
- private boolean mergeTempListToSparseMap() {
- boolean updated = false;
- for (int i = 0; i < tempListIdx; i++) {
- int encodedHash = tempList[i];
- int key = encodedHash & pPrimeMask;
- byte value = (byte) (encodedHash >>> pPrime);
- byte nr = 0;
- // if the MSB is set to 1, the next qPrime bits contain the number of
- // trailing zeroes.
- // if the MSB is set to 0, the number of zeroes is contained within the
- // pPrime - p bits.
- if (encodedHash < 0) {
- nr = (byte) (value & qPrimeMask);
- } else {
- nr = (byte) (Integer.numberOfTrailingZeros(encodedHash >>> p) + 1);
- }
- updated = set(key, nr);
- }
-
- // reset temp list index
- tempListIdx = 0;
- return updated;
- }
-
- /**
- * <pre>
- * <b>Input:</b> 64 bit hashcode
- *
- * |---------w-------------| |------------p'----------|
- * 10101101.......1010101010 10101010101 01010101010101
- * |------p-----|
- *
- * <b>Output:</b> 32 bit int
- *
- * |b| |-q'-| |------------p'----------|
- * 1 010101 01010101010 10101010101010
- * |------p-----|
- *
- *
- * The default values of p', q' and b are 25, 6, 1 (total 32 bits) respectively.
- * This function will return an int encoded in the following format
- *
- * p - LSB p bits represent the register index
- * p' - LSB p' bits are used for increased accuracy in estimation
- * q' - q' bits after p' are left as-is from the hashcode if b = 0, else the
- * q' bits encode the longest trailing zero run from the (w-p) input bits
- * b - 0 if the longest trailing zero run is contained within the (p'-p) bits
- * 1 if the longest trailing zero run is computed from the (w-p) input bits
- * and its value is stored in the q' bits
- * </pre>
- * @param hashcode
- * - 64 bit hashcode to encode
- * @return 32 bit encoded hash in the format described above
- */
- public int encodeHash(long hashcode) {
- // x = p' - p
- int x = (int) (hashcode & mask);
- if (x == 0) {
- // more bits should be considered for finding q (longest zero runs)
- // set MSB to 1
- int ntr = Long.numberOfTrailingZeros(hashcode >> p) + 1;
- long newHashCode = hashcode & pPrimeMask;
- newHashCode |= ntr << pPrime;
- newHashCode |= 0x80000000;
- return (int) newHashCode;
- } else {
- // q is contained within p' - p
- // set MSB to 0
- return (int) (hashcode & 0x7FFFFFFF);
- }
- }
-
- public int getSize() {
- return sparseMap.size() + tempListIdx;
- }
-
- public void merge(HLLRegister hllRegister) {
- if (hllRegister instanceof HLLSparseRegister) {
- HLLSparseRegister hsr = (HLLSparseRegister) hllRegister;
-
- // retain only the largest value for a register index
- for (Map.Entry<Integer, Byte> entry : hsr.getSparseMap().entrySet()) {
- int key = entry.getKey();
- byte value = entry.getValue();
- set(key, value);
- }
- } else {
- throw new IllegalArgumentException("Specified register not instance of HLLSparseRegister");
- }
- }
-
- public boolean set(int key, byte value) {
- // retain only the largest value for a register index
- Byte containedValue = sparseMap.get(key);
- if (containedValue == null || value > containedValue) {
- sparseMap.put(key, value);
- return true;
- }
- return false;
- }
-
- public TreeMap<Integer,Byte> getSparseMap() {
- return getMergedSparseMap();
- }
-
- private TreeMap<Integer,Byte> getMergedSparseMap() {
- if (tempListIdx != 0) {
- mergeTempListToSparseMap();
- }
- return sparseMap;
- }
-
- // this is effectively the same as the dense register impl.
- public void extractLowBitsTo(HLLRegister dest) {
- for (Entry<Integer, Byte> entry : getSparseMap().entrySet()) {
- int idx = entry.getKey();
- byte lr = entry.getValue(); // this can be a max of 65, never > 127
- if (lr != 0) {
- // should be a no-op for sparse
- dest.add((1L << (p + lr - 1)) | idx);
- }
- }
- }
-
- public int getP() {
- return p;
- }
-
- public int getPPrime() {
- return pPrime;
- }
-
- @Override
- public String toString() {
- StringBuilder sb = new StringBuilder();
- sb.append("HLLSparseRegister - ");
- sb.append("p: ");
- sb.append(p);
- sb.append(" pPrime: ");
- sb.append(pPrime);
- sb.append(" qPrime: ");
- sb.append(qPrime);
- return sb.toString();
- }
-
- public String toExtendedString() {
- return toString() + " register: " + sparseMap.toString();
- }
-
- @Override
- public boolean equals(Object obj) {
- if (!(obj instanceof HLLSparseRegister)) {
- return false;
- }
- HLLSparseRegister other = (HLLSparseRegister) obj;
- boolean result = p == other.p && pPrime == other.pPrime && qPrime == other.qPrime
- && tempListIdx == other.tempListIdx;
- if (result) {
- for (int i = 0; i < tempListIdx; i++) {
- if (tempList[i] != other.tempList[i]) {
- return false;
- }
- }
-
- result = result && sparseMap.equals(other.sparseMap);
- }
- return result;
- }
-
- @Override
- public int hashCode() {
- int hashcode = 0;
- hashcode += 31 * p;
- hashcode += 31 * pPrime;
- hashcode += 31 * qPrime;
- for (int i = 0; i < tempListIdx; i++) {
- hashcode += 31 * tempList[i];
- }
- hashcode += sparseMap.hashCode();
- return hashcode;
- }
-
-}
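To make the encodeHash() layout concrete: the sign bit of the encoded int tells the reader where the trailing-zero count lives, which is exactly the branch mergeTempListToSparseMap() takes. A decode sketch mirroring that branch under the documented defaults p=14, p'=25, q'=6 (the method name decode is hypothetical):

// Splits an encoded int back into (register index, trailing-zero count).
static int[] decode(int encodedHash, int p, int pPrime, int qPrime) {
  int pPrimeMask = (1 << pPrime) - 1;
  int qPrimeMask = (1 << qPrime) - 1;
  int key = encodedHash & pPrimeMask;      // LSB p' bits: register index
  byte nr;
  if (encodedHash < 0) {                   // MSB set: count lives in the q' bits
    nr = (byte) ((encodedHash >>> pPrime) & qPrimeMask);
  } else {                                 // MSB clear: count derived from p'-p bits
    nr = (byte) (Integer.numberOfTrailingZeros(encodedHash >>> p) + 1);
  }
  return new int[] { key, nr };
}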
http://git-wip-us.apache.org/repos/asf/hive/blob/081fa368/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/common/ndv/hll/HyperLogLog.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/common/ndv/hll/HyperLogLog.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/common/ndv/hll/HyperLogLog.java
deleted file mode 100644
index 91a6865..0000000
--- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/common/ndv/hll/HyperLogLog.java
+++ /dev/null
@@ -1,664 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.common.ndv.hll;
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.charset.Charset;
-import java.util.Map;
-import java.util.TreeMap;
-
-import org.apache.hadoop.hive.common.ndv.NumDistinctValueEstimator;
-import org.apache.hadoop.hive.common.type.HiveDecimal;
-import org.apache.hadoop.hive.ql.util.JavaDataModel;
-import org.apache.hive.common.util.Murmur3;
-
-/**
- * <pre>
- * This is an implementation of the following variants of hyperloglog (HLL)
- * algorithm
- * Original - Original HLL algorithm from Flajolet et al. from
- * http://algo.inria.fr/flajolet/Publications/FlFuGaMe07.pdf
- * HLLNoBias - Google's implementation of bias correction based on lookup table
- * http://static.googleusercontent.com/media/research.google.com/en//pubs/archive/40671.pdf
- * HLL++ - Google's implementation of HLL++ algorithm that uses SPARSE registers
- * http://static.googleusercontent.com/media/research.google.com/en//pubs/archive/40671.pdf
- *
- * Following are the constructor parameters that determine which algorithm is
- * used
- * <b>numRegisterIndexBits</b> - number of LSB hashcode bits to be used as register index.
- * <i>Default is 14</i>. min = 4 and max = 16
- * <b>numHashBits</b> - number of bits for hashcode. <i>Default is 64</i>. min = 32 and max = 128
- * <b>encoding</b> - Type of encoding to use (SPARSE or DENSE). The algorithm automatically
- * switches to DENSE beyond a threshold. <i>Default: SPARSE</i>
- * <b>enableBitPacking</b> - To enable bit packing or not. Bit packing improves compression
- * at the cost of more CPU cycles. <i>Default: true</i>
- * <b>noBias</b> - Use Google's bias table lookup for short range bias correction.
- * Enabling this greatly improves the estimation accuracy for short
- * range values. <i>Default: true</i>
- *
- * </pre>
- */
-public class HyperLogLog implements NumDistinctValueEstimator {
- private final static int DEFAULT_HASH_BITS = 64;
- private final static long HASH64_ZERO = Murmur3.hash64(new byte[] {0});
- private final static long HASH64_ONE = Murmur3.hash64(new byte[] {1});
-
- public enum EncodingType {
- SPARSE, DENSE
- }
-
- // number of bits to address registers
- private final int p;
-
- // number of registers - 2^p
- private final int m;
-
- // refer paper
- private float alphaMM;
-
- // enable/disable bias correction using table lookup
- private final boolean noBias;
-
- // enable/disable bitpacking
- private final boolean bitPacking;
-
- // Not making it configurable for perf reasons (avoid checks)
- private final int chosenHashBits = DEFAULT_HASH_BITS;
-
- private HLLDenseRegister denseRegister;
- private HLLSparseRegister sparseRegister;
-
- // counts are cached to avoid repeated complex computation. If register value
- // is updated the count will be computed again.
- private long cachedCount;
- private boolean invalidateCount;
-
- private EncodingType encoding;
-
- // threshold to switch from SPARSE to DENSE encoding
- private int encodingSwitchThreshold;
-
- private HyperLogLog(HyperLogLogBuilder hllBuilder) {
- if (hllBuilder.numRegisterIndexBits < HLLConstants.MIN_P_VALUE
- || hllBuilder.numRegisterIndexBits > HLLConstants.MAX_P_VALUE) {
- throw new IllegalArgumentException("p value should be between " + HLLConstants.MIN_P_VALUE
- + " to " + HLLConstants.MAX_P_VALUE);
- }
- this.p = hllBuilder.numRegisterIndexBits;
- this.m = 1 << p;
- this.noBias = hllBuilder.noBias;
- this.bitPacking = hllBuilder.bitPacking;
-
- // the threshold should be less than 12K bytes for p = 14.
- // The reason to divide by 5 is that in sparse mode, after serialization, the
- // entries in the sparse map are compressed and delta encoded as varints. The
- // worst case size of a varint is 5 bytes. Hence, 12K/5 ~= 2400 entries in
- // the sparse map.
- if (bitPacking) {
- this.encodingSwitchThreshold = ((m * 6) / 8) / 5;
- } else {
- // if bitpacking is disabled, all register values take 8 bits and hence
- // we can be more flexible with the threshold. For p=14, 16K/5 = 3200
- // entries in the sparse map can be allowed.
- this.encodingSwitchThreshold = m / 3;
- }
-
- // initializeAlpha(DEFAULT_HASH_BITS);
- // alphaMM value for 128 bits hash seems to perform better for default 64 hash bits
- this.alphaMM = 0.7213f / (1 + 1.079f / m);
- // For efficiency alpha is multiplied by m^2
- this.alphaMM = this.alphaMM * m * m;
-
- this.cachedCount = -1;
- this.invalidateCount = false;
- this.encoding = hllBuilder.encoding;
- if (encoding.equals(EncodingType.SPARSE)) {
- this.sparseRegister = new HLLSparseRegister(p, HLLConstants.P_PRIME_VALUE,
- HLLConstants.Q_PRIME_VALUE);
- this.denseRegister = null;
- } else {
- this.sparseRegister = null;
- this.denseRegister = new HLLDenseRegister(p, bitPacking);
- }
- }
-
- public static HyperLogLogBuilder builder() {
- return new HyperLogLogBuilder();
- }
-
- public static class HyperLogLogBuilder {
- private int numRegisterIndexBits = 14;
- private EncodingType encoding = EncodingType.SPARSE;
- private boolean bitPacking = true;
- private boolean noBias = true;
-
- public HyperLogLogBuilder() {
- }
-
- public HyperLogLogBuilder setNumRegisterIndexBits(int b) {
- this.numRegisterIndexBits = b;
- return this;
- }
-
- public HyperLogLogBuilder setSizeOptimized() {
- // allowing this to be increased via config breaks the merge impl
- // p=10 = ~1kb per vector or smaller
- this.numRegisterIndexBits = 10;
- return this;
- }
-
- public HyperLogLogBuilder setEncoding(EncodingType enc) {
- this.encoding = enc;
- return this;
- }
-
- public HyperLogLogBuilder enableBitPacking(boolean b) {
- this.bitPacking = b;
- return this;
- }
-
- public HyperLogLogBuilder enableNoBias(boolean nb) {
- this.noBias = nb;
- return this;
- }
-
- public HyperLogLog build() {
- return new HyperLogLog(this);
- }
- }
-
- // see paper for alpha initialization.
- private void initializeAlpha(final int hashBits) {
- if (hashBits <= 16) {
- alphaMM = 0.673f;
- } else if (hashBits <= 32) {
- alphaMM = 0.697f;
- } else if (hashBits <= 64) {
- alphaMM = 0.709f;
- } else {
- alphaMM = 0.7213f / (float) (1 + 1.079f / m);
- }
-
- // For efficiency alpha is multiplied by m^2
- alphaMM = alphaMM * m * m;
- }
-
- public void addBoolean(boolean val) {
- add(val ? HASH64_ONE : HASH64_ZERO);
- }
-
- public void addByte(byte val) {
- add(Murmur3.hash64(new byte[] {val}));
- }
-
- public void addBytes(byte[] val) {
- add(Murmur3.hash64(val));
- }
-
- public void addShort(short val) {
- add(Murmur3.hash64(val));
- }
-
- public void addInt(int val) {
- add(Murmur3.hash64(val));
- }
-
- public void addLong(long val) {
- add(Murmur3.hash64(val));
- }
-
- public void addFloat(float val) {
- add(Murmur3.hash64(Float.floatToIntBits(val)));
- }
-
- public void addDouble(double val) {
- add(Murmur3.hash64(Double.doubleToLongBits(val)));
- }
-
- public void addChar(char val) {
- add(Murmur3.hash64((short)val));
- }
-
- /**
- * Java's default charset will be used for strings.
- * @param val
- * - input string
- */
- public void addString(String val) {
- add(Murmur3.hash64(val.getBytes()));
- }
-
- public void addString(String val, Charset charset) {
- add(Murmur3.hash64(val.getBytes(charset)));
- }
-
- public void add(long hashcode) {
- if (encoding.equals(EncodingType.SPARSE)) {
- if (sparseRegister.add(hashcode)) {
- invalidateCount = true;
- }
-
- // if the size of the sparse map exceeds the threshold, convert the sparse
- // map to a dense register and switch to DENSE encoding
- if (sparseRegister.getSize() > encodingSwitchThreshold) {
- encoding = EncodingType.DENSE;
- denseRegister = sparseToDenseRegister(sparseRegister);
- sparseRegister = null;
- invalidateCount = true;
- }
- } else {
- if (denseRegister.add(hashcode)) {
- invalidateCount = true;
- }
- }
- }
-
- public long estimateNumDistinctValues() {
- // FMSketch treats the ndv of all nulls as 1 but HLL treats the ndv as 0.
- // To avoid a divide-by-zero problem, we follow FMSketch
- return count() > 0 ? count() : 1;
- }
-
- public long count() {
- // compute count only if the register values are updated else return the
- // cached count
- if (invalidateCount || cachedCount < 0) {
- if (encoding.equals(EncodingType.SPARSE)) {
-
- // if encoding is still SPARSE use linear counting with increased
- // accuracy (as we use pPrime bits for register index)
- int mPrime = 1 << sparseRegister.getPPrime();
- cachedCount = linearCount(mPrime, mPrime - sparseRegister.getSparseMap().size());
- } else {
-
- // for DENSE encoding, use bias table lookup for HLLNoBias algorithm
- // else fallback to HLLOriginal algorithm
- double sum = denseRegister.getSumInversePow2();
- long numZeros = denseRegister.getNumZeroes();
-
- // cardinality estimate from normalized bias corrected harmonic mean on
- // the registers
- cachedCount = (long) (alphaMM * (1.0 / sum));
- long pow = (long) Math.pow(2, chosenHashBits);
-
- // when bias correction is enabled
- if (noBias) {
- cachedCount = cachedCount <= 5 * m ? (cachedCount - estimateBias(cachedCount))
- : cachedCount;
- long h = cachedCount;
- if (numZeros != 0) {
- h = linearCount(m, numZeros);
- }
-
- if (h < getThreshold()) {
- cachedCount = h;
- }
- } else {
- // HLL algorithm shows stronger bias for values in (2.5 * m) range.
- // To compensate for this short range bias, linear counting is used
- // for values before this short range. The original paper also says
- // similar bias is seen for long range values due to hash collisions
- // in range >1/30*(2^32). For the default case, we do not have to
- // worry about this long range bias as the paper used 32-bit hashing
- // and we use 64-bit hashing as default. 2^64 values are too high to
- // observe long range bias (hash collisions).
- if (cachedCount <= 2.5 * m) {
-
- // for short range use linear counting
- if (numZeros != 0) {
- cachedCount = linearCount(m, numZeros);
- }
- } else if (chosenHashBits < 64 && cachedCount > (0.033333 * pow)) {
-
- // long range bias for 32-bit hashcodes
- if (cachedCount > pow / 30) {
- cachedCount = (long) (-pow * Math.log(1.0 - (double) cachedCount / (double) pow));
- }
- }
- }
- }
- invalidateCount = false;
- }
-
- return cachedCount;
- }
-
- private long getThreshold() {
- return (long) (HLLConstants.thresholdData[p - 4] + 0.5);
- }
-
- /**
- * Estimate bias from lookup table
- * @param count
- * - cardinality before bias correction
- * @return cardinality after bias correction
- */
- private long estimateBias(long count) {
- double[] rawEstForP = HLLConstants.rawEstimateData[p - 4];
-
- // compute distance and store it in sorted map
- TreeMap<Double,Integer> estIndexMap = new TreeMap<>();
- double distance = 0;
- for (int i = 0; i < rawEstForP.length; i++) {
- distance = Math.pow(count - rawEstForP[i], 2);
- estIndexMap.put(distance, i);
- }
-
- // take top-k closest neighbors and compute the bias corrected cardinality
- long result = 0;
- double[] biasForP = HLLConstants.biasData[p - 4];
- double biasSum = 0;
- int kNeighbors = HLLConstants.K_NEAREST_NEIGHBOR;
- for (Map.Entry<Double, Integer> entry : estIndexMap.entrySet()) {
- biasSum += biasForP[entry.getValue()];
- kNeighbors--;
- if (kNeighbors <= 0) {
- break;
- }
- }
-
- // 0.5 added for rounding off
- result = (long) ((biasSum / HLLConstants.K_NEAREST_NEIGHBOR) + 0.5);
- return result;
- }
-
- public void setCount(long count) {
- this.cachedCount = count;
- this.invalidateCount = true;
- }
-
- private long linearCount(int mVal, long numZeros) {
- return (long) (Math.round(mVal * Math.log(mVal / ((double) numZeros))));
- }
-
- // refer paper
- public double getStandardError() {
- return 1.04 / Math.sqrt(m);
- }
-
- public HLLDenseRegister getHLLDenseRegister() {
- return denseRegister;
- }
-
- public HLLSparseRegister getHLLSparseRegister() {
- return sparseRegister;
- }
-
- /**
- * Reconstruct sparse map from serialized integer list
- * @param reg
- * - uncompressed and delta decoded integer list
- */
- public void setHLLSparseRegister(int[] reg) {
- for (int i : reg) {
- int key = i >>> HLLConstants.Q_PRIME_VALUE;
- byte value = (byte) (i & 0x3f);
- sparseRegister.set(key, value);
- }
- }
-
- /**
- * Reconstruct dense registers from byte array
- * @param reg
- * - unpacked byte array
- */
- public void setHLLDenseRegister(byte[] reg) {
- int i = 0;
- for (byte b : reg) {
- denseRegister.set(i, b);
- i++;
- }
- }
-
- /**
- * Merge the specified hyperloglog to the current one. Encoding switches
- * automatically after merge if the encoding switch threshold is exceeded.
- * @param hll
- * - hyperloglog to be merged
- * @throws IllegalArgumentException
- */
- public void merge(HyperLogLog hll) {
- if (chosenHashBits != hll.chosenHashBits) {
- throw new IllegalArgumentException(
- "HyperLogLog cannot be merged as either p or hashbits are different. Current: "
- + toString() + " Provided: " + hll.toString());
- }
-
- if (p > hll.p) {
- throw new IllegalArgumentException(
- "HyperLogLog cannot merge a smaller p into a larger one : "
- + toString() + " Provided: " + hll.toString());
- }
-
- if (p != hll.p) {
- // invariant: p > hll.p
- hll = hll.squash(p);
- }
-
- EncodingType otherEncoding = hll.getEncoding();
-
- if (encoding.equals(EncodingType.SPARSE) && otherEncoding.equals(EncodingType.SPARSE)) {
- sparseRegister.merge(hll.getHLLSparseRegister());
- // if after merge the sparse switching threshold is exceeded then change
- // to dense encoding
- if (sparseRegister.getSize() > encodingSwitchThreshold) {
- encoding = EncodingType.DENSE;
- denseRegister = sparseToDenseRegister(sparseRegister);
- sparseRegister = null;
- }
- } else if (encoding.equals(EncodingType.DENSE) && otherEncoding.equals(EncodingType.DENSE)) {
- denseRegister.merge(hll.getHLLDenseRegister());
- } else if (encoding.equals(EncodingType.SPARSE) && otherEncoding.equals(EncodingType.DENSE)) {
- denseRegister = sparseToDenseRegister(sparseRegister);
- denseRegister.merge(hll.getHLLDenseRegister());
- sparseRegister = null;
- encoding = EncodingType.DENSE;
- } else if (encoding.equals(EncodingType.DENSE) && otherEncoding.equals(EncodingType.SPARSE)) {
- HLLDenseRegister otherDenseRegister = sparseToDenseRegister(hll.getHLLSparseRegister());
- denseRegister.merge(otherDenseRegister);
- }
-
- invalidateCount = true;
- }
-
- /**
- * Reduces the accuracy of the HLL provided to a smaller size
- * @param p0
- * - new p size for the new HyperLogLog (smaller or no change)
- * @return reduced (or same) HyperLogLog instance
- */
- public HyperLogLog squash(final int p0) {
- if (p0 > p) {
- throw new IllegalArgumentException(
- "HyperLogLog cannot be be squashed to be bigger. Current: "
- + toString() + " Provided: " + p0);
- }
-
- if (p0 == p) {
- return this;
- }
-
- final HyperLogLog hll = new HyperLogLogBuilder()
- .setNumRegisterIndexBits(p0).setEncoding(EncodingType.DENSE)
- .enableNoBias(noBias).build();
- final HLLDenseRegister result = hll.denseRegister;
-
- if (encoding == EncodingType.SPARSE) {
- sparseRegister.extractLowBitsTo(result);
- } else if (encoding == EncodingType.DENSE) {
- denseRegister.extractLowBitsTo(result);
- }
- return hll;
- }
-
- /**
- * Converts sparse to dense hll register.
- * @param sparseRegister
- * - sparse register to be converted
- * @return converted dense register
- */
- private HLLDenseRegister sparseToDenseRegister(HLLSparseRegister sparseRegister) {
- if (sparseRegister == null) {
- return null;
- }
- int p = sparseRegister.getP();
- int pMask = (1 << p) - 1;
- HLLDenseRegister result = new HLLDenseRegister(p, bitPacking);
- for (Map.Entry<Integer, Byte> entry : sparseRegister.getSparseMap().entrySet()) {
- int key = entry.getKey();
- int idx = key & pMask;
- result.set(idx, entry.getValue());
- }
- return result;
- }
-
- @Override
- public String toString() {
- StringBuilder sb = new StringBuilder();
- sb.append("Encoding: ");
- sb.append(encoding);
- sb.append(", p: ");
- sb.append(p);
- sb.append(", estimatedCardinality: ");
- sb.append(estimateNumDistinctValues());
- return sb.toString();
- }
-
- public String toStringExtended() {
- if (encoding.equals(EncodingType.DENSE)) {
- return toString() + ", " + denseRegister.toExtendedString();
- } else if (encoding.equals(EncodingType.SPARSE)) {
- return toString() + ", " + sparseRegister.toExtendedString();
- }
-
- return toString();
- }
-
- public int getNumRegisterIndexBits() {
- return p;
- }
-
- public EncodingType getEncoding() {
- return encoding;
- }
-
- public void setEncoding(EncodingType encoding) {
- this.encoding = encoding;
- }
-
- @Override
- public boolean equals(Object obj) {
- if (!(obj instanceof HyperLogLog)) {
- return false;
- }
-
- HyperLogLog other = (HyperLogLog) obj;
- long count = estimateNumDistinctValues();
- long otherCount = other.estimateNumDistinctValues();
- boolean result = p == other.p && chosenHashBits == other.chosenHashBits
- && encoding.equals(other.encoding) && count == otherCount;
- if (encoding.equals(EncodingType.DENSE)) {
- result = result && denseRegister.equals(other.getHLLDenseRegister());
- }
-
- if (encoding.equals(EncodingType.SPARSE)) {
- result = result && sparseRegister.equals(other.getHLLSparseRegister());
- }
- return result;
- }
-
- @Override
- public int hashCode() {
- int hashcode = 0;
- hashcode += 31 * p;
- hashcode += 31 * chosenHashBits;
- hashcode += encoding.hashCode();
- hashcode += 31 * estimateNumDistinctValues();
- if (encoding.equals(EncodingType.DENSE)) {
- hashcode += 31 * denseRegister.hashCode();
- }
-
- if (encoding.equals(EncodingType.SPARSE)) {
- hashcode += 31 * sparseRegister.hashCode();
- }
- return hashcode;
- }
-
- @Override
- public void reset() {
- }
-
- @Override
- public byte[] serialize() {
- ByteArrayOutputStream bos = new ByteArrayOutputStream();
- // write bytes to bos ...
- try {
- HyperLogLogUtils.serializeHLL(bos, this);
- byte[] result = bos.toByteArray();
- bos.close();
- return result;
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
-
- @Override
- public NumDistinctValueEstimator deserialize(byte[] buf) {
- return HyperLogLogUtils.deserializeHLL(buf);
- }
-
- @Override
- public void addToEstimator(long v) {
- addLong(v);
- }
-
- @Override
- public void addToEstimator(String s) {
- addString(s);
- }
-
- @Override
- public void addToEstimator(double d) {
- addDouble(d);
- }
-
- @Override
- public void addToEstimator(HiveDecimal decimal) {
- addDouble(decimal.doubleValue());
- }
-
- @Override
- public void mergeEstimators(NumDistinctValueEstimator o) {
- merge((HyperLogLog) o);
- }
-
- @Override
- public int lengthFor(JavaDataModel model) {
- // 5 bytes for the header; 1 << p is the number of bytes for the register
- return (5 + (1 << p));
- }
-
- @Override
- public boolean canMerge(NumDistinctValueEstimator o) {
- return o instanceof HyperLogLog;
- }
-
-}
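As a usage sketch of the builder API above (the class name HllUsageSketch is illustrative; it assumes this package and its Murmur3 dependency are on the classpath):

import org.apache.hadoop.hive.common.ndv.hll.HyperLogLog;

public class HllUsageSketch {
  public static void main(String[] args) {
    HyperLogLog hll = HyperLogLog.builder()
        .setNumRegisterIndexBits(14)   // p = 14, the default
        .build();                      // starts SPARSE, switches to DENSE by itself

    for (int i = 0; i < 100_000; i++) {
      hll.addLong(i % 50_000);         // 50k distinct values, each seen twice
    }
    System.out.println(hll.count());   // estimate should be close to 50000

    HyperLogLog other = HyperLogLog.builder().setNumRegisterIndexBits(14).build();
    other.addString("hello");
    hll.merge(other);                  // requires compatible p and hash bits
  }
}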
http://git-wip-us.apache.org/repos/asf/hive/blob/081fa368/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/common/ndv/hll/HyperLogLogUtils.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/common/ndv/hll/HyperLogLogUtils.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/common/ndv/hll/HyperLogLogUtils.java
deleted file mode 100644
index aeba2e9..0000000
--- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/common/ndv/hll/HyperLogLogUtils.java
+++ /dev/null
@@ -1,409 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.common.ndv.hll;
-
-import java.io.ByteArrayInputStream;
-import java.io.EOFException;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.Arrays;
-import java.util.Map;
-import java.util.TreeMap;
-
-import org.apache.hadoop.hive.common.ndv.hll.HyperLogLog.EncodingType;
-
-/**
- * HyperLogLog serialization utilities.
- */
-public class HyperLogLogUtils {
-
- public static final byte[] MAGIC = new byte[] { 'H', 'L', 'L' };
-
- /**
- * HyperLogLog is serialized using the following format
- *
- * <pre>
- * |-4 byte-|------varlong----|varint (optional)|----------|
- * ---------------------------------------------------------
- * | header | estimated-count | register-length | register |
- * ---------------------------------------------------------
- *
- * <b>4 byte header</b> is encoded like below
- * 3 bytes - HLL magic string to identify serialized stream
- * 4 bits - p (number of bits to be used as register index)
- * 1 bit - spare (not used)
- * 3 bits - encoding (000 - sparse, 001..110 - n bit packing, 111 - no bit packing)
- *
- * Followed by header are 3 fields that are required for reconstruction
- * of hyperloglog
- * Estimated count - variable length long to store last computed estimated count.
- * This is just for quick lookup without deserializing registers
- * Register length - number of entries in the register (required only
- * for sparse representation. For bit-packing, the register
- * length can be derived from p)
- * </pre>
- * @param out
- * - output stream to write to
- * @param hll
- * - hyperloglog that needs to be serialized
- * @throws IOException
- */
- public static void serializeHLL(OutputStream out, HyperLogLog hll) throws IOException {
-
- // write header
- out.write(MAGIC);
- int fourthByte = 0;
- int p = hll.getNumRegisterIndexBits();
- fourthByte = (p & 0xff) << 4;
-
- int bitWidth = 0;
- EncodingType enc = hll.getEncoding();
-
- // determine bit width for bitpacking and encode it in header
- if (enc.equals(EncodingType.DENSE)) {
- int lzr = hll.getHLLDenseRegister().getMaxRegisterValue();
- bitWidth = getBitWidth(lzr);
-
- // the max value of number of zeroes for 64 bit hash can be encoded using
- // only 6 bits. So we will disable bit packing for any values >6
- if (bitWidth > 6) {
- fourthByte |= 7;
- bitWidth = 8;
- } else {
- fourthByte |= (bitWidth & 7);
- }
- }
-
- // write fourth byte of header
- out.write(fourthByte);
-
- // write estimated count
- long estCount = hll.estimateNumDistinctValues();
- writeVulong(out, estCount);
-
- // serialize dense/sparse registers. Dense registers are bitpacked whereas
- // sparse registers are delta and variable length encoded
- if (enc.equals(EncodingType.DENSE)) {
- byte[] register = hll.getHLLDenseRegister().getRegister();
- bitpackHLLRegister(out, register, bitWidth);
- } else if (enc.equals(EncodingType.SPARSE)) {
- TreeMap<Integer, Byte> sparseMap = hll.getHLLSparseRegister().getSparseMap();
-
- // write the number of elements in sparse map (required for
- // reconstruction)
- writeVulong(out, sparseMap.size());
-
- // compute deltas and write the values as varints
- int prev = 0;
- for (Map.Entry<Integer, Byte> entry : sparseMap.entrySet()) {
- if (prev == 0) {
- prev = (entry.getKey() << HLLConstants.Q_PRIME_VALUE) | entry.getValue();
- writeVulong(out, prev);
- } else {
- int curr = (entry.getKey() << HLLConstants.Q_PRIME_VALUE) | entry.getValue();
- int delta = curr - prev;
- writeVulong(out, delta);
- prev = curr;
- }
- }
- }
- }
-
- /**
- * Refer to serializeHLL() for the serialization format. This function
- * deserializes a serialized hyperloglog.
- * @param in
- * - input stream
- * @return deserialized hyperloglog
- * @throws IOException
- */
- public static HyperLogLog deserializeHLL(InputStream in) throws IOException {
- checkMagicString(in);
- int fourthByte = in.read() & 0xff;
- int p = fourthByte >>> 4;
-
- // read type of encoding
- int enc = fourthByte & 7;
- EncodingType encoding = null;
- int bitSize = 0;
- if (enc == 0) {
- encoding = EncodingType.SPARSE;
- } else if (enc > 0 && enc < 7) {
- bitSize = enc;
- encoding = EncodingType.DENSE;
- } else {
- // bit packing disabled
- bitSize = 8;
- encoding = EncodingType.DENSE;
- }
-
- // estimated count
- long estCount = readVulong(in);
-
- HyperLogLog result = null;
- if (encoding.equals(EncodingType.SPARSE)) {
- result = HyperLogLog.builder().setNumRegisterIndexBits(p)
- .setEncoding(EncodingType.SPARSE).build();
- int numRegisterEntries = (int) readVulong(in);
- int[] reg = new int[numRegisterEntries];
- int prev = 0;
-
- // reconstruct the sparse map from delta encoded and varint input stream
- if (numRegisterEntries > 0) {
- prev = (int) readVulong(in);
- reg[0] = prev;
- }
- int delta = 0;
- int curr = 0;
- for (int i = 1; i < numRegisterEntries; i++) {
- delta = (int) readVulong(in);
- curr = prev + delta;
- reg[i] = curr;
- prev = curr;
- }
- result.setHLLSparseRegister(reg);
- } else {
-
- // explicitly disable bit packing
- if (bitSize == 8) {
- result = HyperLogLog.builder().setNumRegisterIndexBits(p)
- .setEncoding(EncodingType.DENSE).enableBitPacking(false).build();
- } else {
- result = HyperLogLog.builder().setNumRegisterIndexBits(p)
- .setEncoding(EncodingType.DENSE).enableBitPacking(true).build();
- }
- int m = 1 << p;
- byte[] register = unpackHLLRegister(in, m, bitSize);
- result.setHLLDenseRegister(register);
- }
-
- result.setCount(estCount);
-
- return result;
- }
-
- /**
- * This function deserializes the serialized hyperloglogs from a byte array.
- * @param buf - to deserialize
- * @return HyperLogLog
- */
- public static HyperLogLog deserializeHLL(final byte[] buf) {
- InputStream is = new ByteArrayInputStream(buf); // TODO: use faster non-sync inputstream
- try {
- HyperLogLog result = deserializeHLL(is);
- is.close();
- return result;
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
-
- private static void bitpackHLLRegister(OutputStream out, byte[] register, int bitWidth)
- throws IOException {
- int bitsLeft = 8;
- byte current = 0;
-
- if (bitWidth == 8) {
- fastPathWrite(out, register);
- return;
- }
-
- // write the blob
- for (byte value : register) {
- int bitsToWrite = bitWidth;
- while (bitsToWrite > bitsLeft) {
- // add the bits to the bottom of the current word
- current |= value >>> (bitsToWrite - bitsLeft);
- // subtract out the bits we just added
- bitsToWrite -= bitsLeft;
- // zero out the bits above bitsToWrite
- value &= (1 << bitsToWrite) - 1;
- out.write(current);
- current = 0;
- bitsLeft = 8;
- }
- bitsLeft -= bitsToWrite;
- current |= value << bitsLeft;
- if (bitsLeft == 0) {
- out.write(current);
- current = 0;
- bitsLeft = 8;
- }
- }
-
- out.flush();
- }
-
- private static void fastPathWrite(OutputStream out, byte[] register) throws IOException {
- for (byte b : register) {
- out.write(b);
- }
- }
-
- /**
- * Unpack the bitpacked HyperLogLog register.
- * @param in
- * - input stream
- * @param length
- * - number of register entries to read
- * @param bitSize
- * - bit width used for packing
- * @return unpacked HLL register
- * @throws IOException
- */
- private static byte[] unpackHLLRegister(InputStream in, int length, int bitSize)
- throws IOException {
- int mask = (1 << bitSize) - 1;
- int bitsLeft = 8;
-
- if (bitSize == 8) {
- return fastPathRead(in, length);
- }
-
- byte current = (byte) (0xff & in.read());
-
- byte[] output = new byte[length];
- for (int i = 0; i < output.length; i++) {
- byte result = 0;
- int bitsLeftToRead = bitSize;
- while (bitsLeftToRead > bitsLeft) {
- result <<= bitsLeft;
- result |= current & ((1 << bitsLeft) - 1);
- bitsLeftToRead -= bitsLeft;
- current = (byte) (0xff & in.read());
- bitsLeft = 8;
- }
- if (bitsLeftToRead > 0) {
- result <<= bitsLeftToRead;
- bitsLeft -= bitsLeftToRead;
- result |= (current >>> bitsLeft) & ((1 << bitsLeftToRead) - 1);
- }
- output[i] = (byte) (result & mask);
- }
- return output;
- }
-
- private static byte[] fastPathRead(InputStream in, int length) throws IOException {
- byte[] result = new byte[length];
- for (int i = 0; i < length; i++) {
- result[i] = (byte) in.read();
- }
- return result;
- }
-
- /**
- * Get estimated cardinality without deserializing HLL
- * @param in
- * - serialized HLL
- * @return - cardinality
- * @throws IOException
- */
- public static long getEstimatedCountFromSerializedHLL(InputStream in) throws IOException {
- checkMagicString(in);
- in.read();
- return readVulong(in);
- }
-
- /**
- * Check if the specified input stream is actually an HLL stream
- * @param in
- * - input stream
- * @throws IOException
- */
- private static void checkMagicString(InputStream in) throws IOException {
- byte[] magic = new byte[3];
- magic[0] = (byte) in.read();
- magic[1] = (byte) in.read();
- magic[2] = (byte) in.read();
-
- if (!Arrays.equals(magic, MAGIC)) {
- throw new IllegalArgumentException("The input stream is not a HyperLogLog stream.");
- }
- }
-
- /**
- * Minimum bits required to encode the specified value
- * @param val
- * - input value
- * @return minimum number of bits required to encode val
- */
- private static int getBitWidth(int val) {
- int count = 0;
- while (val != 0) {
- count++;
- val >>>= 1;
- }
- return count;
- }
-
- /**
- * Return relative error between actual and estimated cardinality
- * @param actualCount
- * - actual count
- * @param estimatedCount
- * - estimated count
- * @return relative error
- */
- public static float getRelativeError(long actualCount, long estimatedCount) {
- float err = (1.0f - ((float) estimatedCount / (float) actualCount)) * 100.0f;
- return err;
- }
-
- /**
- * Write variable length encoded longs to output stream
- * @param output
- * - out stream
- * @param value
- * - long
- * @throws IOException
- */
- private static void writeVulong(OutputStream output, long value) throws IOException {
- while (true) {
- if ((value & ~0x7f) == 0) {
- output.write((byte) value);
- return;
- } else {
- output.write((byte) (0x80 | (value & 0x7f)));
- value >>>= 7;
- }
- }
- }
-
- /**
- * Read variable length encoded longs from input stream
- * @param in
- * - input stream
- * @return decoded long value
- * @throws IOException
- */
- private static long readVulong(InputStream in) throws IOException {
- long result = 0;
- long b;
- int offset = 0;
- do {
- b = in.read();
- if (b == -1) {
- throw new EOFException("Reading Vulong past EOF");
- }
- result |= (0x7f & b) << offset;
- offset += 7;
- } while (b >= 0x80);
- return result;
- }
-
-}
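The writeVulong()/readVulong() pair above is a standard LEB128-style varint: seven payload bits per byte, with the MSB as a continuation flag. A standalone round-trip sketch of the same scheme (the class name VulongSketch is illustrative; it reimplements the private helpers rather than calling them):

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

public class VulongSketch {
  static void writeVulong(OutputStream out, long value) throws IOException {
    while ((value & ~0x7fL) != 0) {                 // more than 7 bits remain
      out.write((byte) (0x80 | (value & 0x7f)));    // continuation bit set
      value >>>= 7;
    }
    out.write((byte) value);                        // final byte, flag clear
  }

  static long readVulong(InputStream in) throws IOException {
    long result = 0;
    int offset = 0;
    long b;
    do {
      b = in.read();
      if (b == -1) {
        throw new EOFException("Reading Vulong past EOF");
      }
      result |= (0x7f & b) << offset;
      offset += 7;
    } while (b >= 0x80);
    return result;
  }

  public static void main(String[] args) throws IOException {
    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    writeVulong(bos, 300);                          // encodes as 0xAC 0x02
    long back = readVulong(new ByteArrayInputStream(bos.toByteArray()));
    System.out.println(back);                       // prints 300
  }
}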
http://git-wip-us.apache.org/repos/asf/hive/blob/081fa368/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/AcidEventListener.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/AcidEventListener.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/AcidEventListener.java
deleted file mode 100644
index 5279247..0000000
--- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/AcidEventListener.java
+++ /dev/null
@@ -1,146 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.metastore;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.metastore.api.Database;
-import org.apache.hadoop.hive.metastore.api.HiveObjectType;
-import org.apache.hadoop.hive.metastore.api.MetaException;
-import org.apache.hadoop.hive.metastore.api.Partition;
-import org.apache.hadoop.hive.metastore.api.Table;
-import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
-import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars;
-import org.apache.hadoop.hive.metastore.events.AlterDatabaseEvent;
-import org.apache.hadoop.hive.metastore.events.AlterPartitionEvent;
-import org.apache.hadoop.hive.metastore.events.AlterTableEvent;
-import org.apache.hadoop.hive.metastore.events.DropDatabaseEvent;
-import org.apache.hadoop.hive.metastore.events.DropPartitionEvent;
-import org.apache.hadoop.hive.metastore.events.DropTableEvent;
-import org.apache.hadoop.hive.metastore.txn.TxnStore;
-import org.apache.hadoop.hive.metastore.txn.TxnUtils;
-
-
-/**
- * Handles cleanup of dropped partition/table/database entries in ACID-related metastore tables.
- */
-public class AcidEventListener extends MetaStoreEventListener {
-
- private TxnStore txnHandler;
- private Configuration conf;
-
- public AcidEventListener(Configuration configuration) {
- super(configuration);
- conf = configuration;
- }
-
- @Override
- public void onDropDatabase (DropDatabaseEvent dbEvent) throws MetaException {
- // We could loop through all the tables to check if they are ACID first and then perform cleanup,
- // but it's more efficient to unconditionally perform cleanup for the database, especially
- // when there are a lot of tables
- txnHandler = getTxnHandler();
- txnHandler.cleanupRecords(HiveObjectType.DATABASE, dbEvent.getDatabase(), null, null);
- }
-
- @Override
- public void onDropTable(DropTableEvent tableEvent) throws MetaException {
- if (TxnUtils.isTransactionalTable(tableEvent.getTable())) {
- txnHandler = getTxnHandler();
- txnHandler.cleanupRecords(HiveObjectType.TABLE, null, tableEvent.getTable(), null);
- }
- }
-
- @Override
- public void onDropPartition(DropPartitionEvent partitionEvent) throws MetaException {
- if (TxnUtils.isTransactionalTable(partitionEvent.getTable())) {
- txnHandler = getTxnHandler();
- txnHandler.cleanupRecords(HiveObjectType.PARTITION, null, partitionEvent.getTable(),
- partitionEvent.getPartitionIterator());
- }
- }
-
- @Override
- public void onAlterTable(AlterTableEvent tableEvent) throws MetaException {
- if (!TxnUtils.isTransactionalTable(tableEvent.getNewTable())) {
- return;
- }
- Table oldTable = tableEvent.getOldTable();
- Table newTable = tableEvent.getNewTable();
- if(!oldTable.getCatName().equalsIgnoreCase(newTable.getCatName()) ||
- !oldTable.getDbName().equalsIgnoreCase(newTable.getDbName()) ||
- !oldTable.getTableName().equalsIgnoreCase(newTable.getTableName())) {
- txnHandler = getTxnHandler();
- txnHandler.onRename(
- oldTable.getCatName(), oldTable.getDbName(), oldTable.getTableName(), null,
- newTable.getCatName(), newTable.getDbName(), newTable.getTableName(), null);
- }
- }
- @Override
- public void onAlterPartition(AlterPartitionEvent partitionEvent) throws MetaException {
- if (!TxnUtils.isTransactionalTable(partitionEvent.getTable())) {
- return;
- }
- Partition oldPart = partitionEvent.getOldPartition();
- Partition newPart = partitionEvent.getNewPartition();
- Table t = partitionEvent.getTable();
- String oldPartName = Warehouse.makePartName(t.getPartitionKeys(), oldPart.getValues());
- String newPartName = Warehouse.makePartName(t.getPartitionKeys(), newPart.getValues());
- if(!oldPartName.equals(newPartName)) {
- txnHandler = getTxnHandler();
- txnHandler.onRename(t.getCatName(), t.getDbName(), t.getTableName(), oldPartName,
- t.getCatName(), t.getDbName(), t.getTableName(), newPartName);
- }
- }
- @Override
- public void onAlterDatabase(AlterDatabaseEvent dbEvent) throws MetaException {
- Database oldDb = dbEvent.getOldDatabase();
- Database newDb = dbEvent.getNewDatabase();
- if(!oldDb.getCatalogName().equalsIgnoreCase(newDb.getCatalogName()) ||
- !oldDb.getName().equalsIgnoreCase(newDb.getName())) {
- txnHandler = getTxnHandler();
- txnHandler.onRename(
- oldDb.getCatalogName(), oldDb.getName(), null, null,
- newDb.getCatalogName(), newDb.getName(), null, null);
- }
- }
-
- private TxnStore getTxnHandler() {
- boolean hackOn = MetastoreConf.getBoolVar(conf, ConfVars.HIVE_IN_TEST) ||
- MetastoreConf.getBoolVar(conf, ConfVars.HIVE_IN_TEZ_TEST);
- String origTxnMgr = null;
- boolean origConcurrency = false;
-
- // Since TxnUtils.getTxnStore calls TxnHandler.setConf -> checkQFileTestHack -> TxnDbUtil.setConfValues,
- // which may change the values of the two entries below, we need to avoid polluting the original values
- if (hackOn) {
- origTxnMgr = MetastoreConf.getVar(conf, ConfVars.HIVE_TXN_MANAGER);
- origConcurrency = MetastoreConf.getBoolVar(conf, ConfVars.HIVE_SUPPORT_CONCURRENCY);
- }
-
- txnHandler = TxnUtils.getTxnStore(conf);
-
- // Set them back
- if (hackOn) {
- MetastoreConf.setVar(conf, ConfVars.HIVE_TXN_MANAGER, origTxnMgr);
- MetastoreConf.setBoolVar(conf, ConfVars.HIVE_SUPPORT_CONCURRENCY, origConcurrency);
- }
-
- return txnHandler;
- }
-}
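For context, listeners like AcidEventListener are instantiated by the metastore from the configured listener classes (the metastore.event.listeners / hive.metastore.event.listeners property). A hedged sketch of a minimal custom listener following the same pattern (the class name and output line are illustrative):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.metastore.MetaStoreEventListener;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.events.DropTableEvent;

public class LoggingDropListener extends MetaStoreEventListener {

  public LoggingDropListener(Configuration config) {
    super(config);
  }

  @Override
  public void onDropTable(DropTableEvent tableEvent) throws MetaException {
    // Illustrative only: react to table drops, as AcidEventListener does above.
    System.out.println("table dropped: " + tableEvent.getTable().getTableName());
  }
}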
http://git-wip-us.apache.org/repos/asf/hive/blob/081fa368/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/AggregateStatsCache.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/AggregateStatsCache.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/AggregateStatsCache.java
deleted file mode 100644
index 8e920bb..0000000
--- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/AggregateStatsCache.java
+++ /dev/null
@@ -1,571 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.hadoop.hive.metastore;
-
-import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
-import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
-import org.apache.hive.common.util.BloomFilter;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-public class AggregateStatsCache {
-
- private static final Logger LOG = LoggerFactory.getLogger(AggregateStatsCache.class.getName());
- private static AggregateStatsCache self = null;
-
- // Backing store for this cache
- private final ConcurrentHashMap<Key, AggrColStatsList> cacheStore;
- // Cache size
- private final int maxCacheNodes;
- // Current nodes in the cache
- private final AtomicInteger currentNodes = new AtomicInteger(0);
- // Run the cleaner thread when the cache is maxFull% full
- private final double maxFull;
- // Run the cleaner thread until cache is cleanUntil% occupied
- private final double cleanUntil;
- // Nodes go stale after this
- private final long timeToLiveMs;
- // Max time when waiting for write locks on node list
- private final long maxWriterWaitTime;
- // Max time when waiting for read locks on node list
- private final long maxReaderWaitTime;
- // Maximum number of partitions aggregated per cache node
- private final int maxPartsPerCacheNode;
- // Bloom filter false positive probability
- private final double falsePositiveProbability;
- // Max tolerable variance for matches
- private final double maxVariance;
- // Used to determine if cleaner thread is already running
- private boolean isCleaning = false;
- private final AtomicLong cacheHits = new AtomicLong(0);
- private final AtomicLong cacheMisses = new AtomicLong(0);
- // To track cleaner metrics
- int numRemovedTTL = 0, numRemovedLRU = 0;
-
- private AggregateStatsCache(int maxCacheNodes, int maxPartsPerCacheNode, long timeToLiveMs,
- double falsePositiveProbability, double maxVariance, long maxWriterWaitTime,
- long maxReaderWaitTime, double maxFull, double cleanUntil) {
- this.maxCacheNodes = maxCacheNodes;
- this.maxPartsPerCacheNode = maxPartsPerCacheNode;
- this.timeToLiveMs = timeToLiveMs;
- this.falsePositiveProbability = falsePositiveProbability;
- this.maxVariance = maxVariance;
- this.maxWriterWaitTime = maxWriterWaitTime;
- this.maxReaderWaitTime = maxReaderWaitTime;
- this.maxFull = maxFull;
- this.cleanUntil = cleanUntil;
- this.cacheStore = new ConcurrentHashMap<>();
- }
-
- public static synchronized AggregateStatsCache getInstance(Configuration conf) {
- if (self == null) {
- int maxCacheNodes =
- MetastoreConf.getIntVar(conf, ConfVars.AGGREGATE_STATS_CACHE_SIZE);
- // The number of partitions aggregated per cache node
- // If the number of partitions requested is > this value, we'll fetch directly from Metastore
- int maxPartitionsPerCacheNode =
- MetastoreConf.getIntVar(conf, ConfVars.AGGREGATE_STATS_CACHE_MAX_PARTITIONS);
- long timeToLiveMs = MetastoreConf.getTimeVar(conf, ConfVars.AGGREGATE_STATS_CACHE_TTL,
- TimeUnit.SECONDS) * 1000;
- // False positive probability we are willing to tolerate for the underlying bloom filter
- double falsePositiveProbability =
- MetastoreConf.getDoubleVar(conf, ConfVars.AGGREGATE_STATS_CACHE_FPP);
- // Maximum tolerable variance in number of partitions between cached node and our request
- double maxVariance =
- MetastoreConf.getDoubleVar(conf, ConfVars.AGGREGATE_STATS_CACHE_MAX_VARIANCE);
- long maxWriterWaitTime = MetastoreConf.getTimeVar(conf,
- ConfVars.AGGREGATE_STATS_CACHE_MAX_WRITER_WAIT, TimeUnit.MILLISECONDS);
- long maxReaderWaitTime = MetastoreConf.getTimeVar(conf,
- ConfVars.AGGREGATE_STATS_CACHE_MAX_READER_WAIT, TimeUnit.MILLISECONDS);
- double maxFull =
- MetastoreConf.getDoubleVar(conf, ConfVars.AGGREGATE_STATS_CACHE_MAX_FULL);
- double cleanUntil =
- MetastoreConf.getDoubleVar(conf, ConfVars.AGGREGATE_STATS_CACHE_CLEAN_UNTIL);
- self =
- new AggregateStatsCache(maxCacheNodes, maxPartitionsPerCacheNode, timeToLiveMs,
- falsePositiveProbability, maxVariance, maxWriterWaitTime, maxReaderWaitTime, maxFull,
- cleanUntil);
- }
- return self;
- }
-
- public int getMaxCacheNodes() {
- return maxCacheNodes;
- }
-
- public int getCurrentNodes() {
- return currentNodes.intValue();
- }
-
- public float getFullPercent() {
- return (currentNodes.intValue() / (float) maxCacheNodes) * 100;
- }
-
- public int getMaxPartsPerCacheNode() {
- return maxPartsPerCacheNode;
- }
-
- public double getFalsePositiveProbability() {
- return falsePositiveProbability;
- }
-
- public Float getHitRatio() {
- if (cacheHits.longValue() + cacheMisses.longValue() > 0) {
- return (float) (cacheHits.longValue()) / (cacheHits.longValue() + cacheMisses.longValue());
- }
- return null;
- }
-
- /**
- * Return aggregate stats for a column from the cache or null.
- * While reading from the nodelist for a key, we wait maxReaderWaitTime to acquire the lock,
- * failing which we return a cache miss (i.e. null)
- * @param catName catalog name
- * @param dbName database name
- * @param tblName table name
- * @param colName column name
- * @param partNames list of partition names
- * @return aggregated col stats
- */
- public AggrColStats get(String catName, String dbName, String tblName, String colName, List<String> partNames) {
- // Cache key
- Key key = new Key(catName, dbName, tblName, colName);
- AggrColStatsList candidateList = cacheStore.get(key);
- // No key, or no nodes in candidate list
- if ((candidateList == null) || (candidateList.nodes.size() == 0)) {
- LOG.debug("No aggregate stats cached for " + key.toString());
- return null;
- }
- // Find the value object
- // Update the timestamp of the key,value if value matches the criteria
- // Return the value
- AggrColStats match = null;
- boolean isLocked = false;
- try {
- // Try to readlock the candidateList; timeout after maxReaderWaitTime
- isLocked = candidateList.readLock.tryLock(maxReaderWaitTime, TimeUnit.MILLISECONDS);
- if (isLocked) {
- match = findBestMatch(partNames, candidateList.nodes);
- }
- if (match != null) {
- // Ok to not lock the list for this and use a volatile lastAccessTime instead
- candidateList.updateLastAccessTime();
- cacheHits.incrementAndGet();
- LOG.info("Returning aggregate stats from the cache; total hits: " + cacheHits.longValue()
- + ", total misses: " + cacheMisses.longValue() + ", hit ratio: " + getHitRatio());
- } else {
- cacheMisses.incrementAndGet();
- }
- } catch (InterruptedException e) {
- LOG.debug("Interrupted Exception ignored ",e);
- } finally {
- if (isLocked) {
- candidateList.readLock.unlock();
- }
- }
- return match;
- }
-
- /**
- * Find the best match using the configurable error tolerance and time to live value
- *
- * @param partNames partition names in the incoming request
- * @param candidates cached nodes for the same (catalog, db, table, column) key
- * @return best matched node or null
- */
- private AggrColStats findBestMatch(List<String> partNames, List<AggrColStats> candidates) {
- // Hits, misses tracked for a candidate node
- MatchStats matchStats;
- // MatchStats for each candidate
- Map<AggrColStats, MatchStats> candidateMatchStats = new HashMap<>();
- // The final match we intend to return
- AggrColStats bestMatch = null;
- // To compare among potentially multiple matches
- int bestMatchHits = 0;
- int numPartsRequested = partNames.size();
- // 1st pass: filter out invalid candidates based on variance and TTL
- // (only candidates that pass both checks get a MatchStats entry)
- // Note: we're not creating a copy of the list, to save memory
- for (AggrColStats candidate : candidates) {
- // Variance check (divide as double; integer division would truncate
- // any variance below 100% to zero and make this check a no-op)
- if (Math.abs((candidate.getNumPartsCached() - numPartsRequested) / (double) numPartsRequested)
- > maxVariance) {
- continue;
- }
- // TTL check
- if (isExpired(candidate)) {
- continue;
- } else {
- candidateMatchStats.put(candidate, new MatchStats(0, 0));
- }
- }
- // We'll count misses as we iterate
- int maxMisses = (int) (maxVariance * numPartsRequested); // parenthesize the product; "(int) maxVariance" binds first and truncates to 0
- for (String partName : partNames) {
- for (Iterator<Map.Entry<AggrColStats, MatchStats>> iterator = candidateMatchStats.entrySet().iterator(); iterator.hasNext();) {
- Map.Entry<AggrColStats, MatchStats> entry = iterator.next();
- AggrColStats candidate = entry.getKey();
- matchStats = entry.getValue();
- if (candidate.getBloomFilter().test(partName.getBytes())) {
- ++matchStats.hits;
- } else {
- ++matchStats.misses;
- }
- // 2nd pass at removing invalid candidates
- // If misses so far exceed max tolerable misses
- if (matchStats.misses > maxMisses) {
- iterator.remove();
- continue;
- }
- // Check if this is the best match so far
- if (matchStats.hits > bestMatchHits) {
- bestMatch = candidate;
- bestMatchHits = matchStats.hits; // keep in sync so later candidates must beat the current best
- }
- }
- }
- if (bestMatch != null) {
- // Update the last access time for this node
- bestMatch.updateLastAccessTime();
- }
- return bestMatch;
- }
-
- /**
- * Add a new node to the cache; may trigger the cleaner thread if the cache is near full capacity.
- * We will, however, add the node even if we temporarily exceed maxCacheNodes, because the cleaner
- * will eventually create space from expired nodes or by removing LRU nodes.
- * @param catName catalog name
- * @param dbName database name
- * @param tblName table name
- * @param colName column name
- * @param numPartsCached number of partitions aggregated into this node
- * @param colStats the aggregated column statistics to cache
- * @param bloomFilter bloom filter over the partition names covered by this node
- */
- // TODO: make add asynchronous: add shouldn't block the higher level calls
- public void add(String catName, String dbName, String tblName, String colName, long numPartsCached,
- ColumnStatisticsObj colStats, BloomFilter bloomFilter) {
- // If the cache is too full, run the cleaner thread (cast to double;
- // integer division would stay 0 until the cache is completely full)
- if ((double) getCurrentNodes() / maxCacheNodes > maxFull) {
- spawnCleaner();
- }
- // Cache key
- Key key = new Key(catName, dbName, tblName, colName);
- // Add new node to the cache
- AggrColStats node = new AggrColStats(numPartsCached, bloomFilter, colStats);
- AggrColStatsList nodeList;
- AggrColStatsList newNodeList = new AggrColStatsList();
- newNodeList.nodes = new ArrayList<>();
- nodeList = cacheStore.putIfAbsent(key, newNodeList);
- if (nodeList == null) {
- nodeList = newNodeList;
- }
- boolean isLocked = false;
- try {
- isLocked = nodeList.writeLock.tryLock(maxWriterWaitTime, TimeUnit.MILLISECONDS);
- if (isLocked) {
- nodeList.nodes.add(node);
- node.updateLastAccessTime();
- nodeList.updateLastAccessTime();
- currentNodes.getAndIncrement();
- }
- } catch (InterruptedException e) {
- LOG.debug("Interrupted Exception ignored ", e);
- } finally {
- if (isLocked) {
- nodeList.writeLock.unlock();
- }
- }
- }
-
- /**
- * Removes expired nodes from the cache and, if that is not enough,
- * evicts LRU nodes until the cache is at most cleanUntil% full.
- */
- private void spawnCleaner() {
- // This spawns a separate thread to walk through the cache and remove expired nodes.
- // Only one cleaner thread should be running at any point.
- synchronized (this) {
- if (isCleaning) {
- return;
- }
- isCleaning = true;
- }
- Thread cleaner = new Thread("AggregateStatsCache-CleanerThread") {
- @Override
- public void run() {
- numRemovedTTL = 0;
- numRemovedLRU = 0;
- long cleanerStartTime = System.currentTimeMillis();
- LOG.info("AggregateStatsCache is " + getFullPercent() + "% full, with "
- + getCurrentNodes() + " nodes; starting cleaner thread");
- try {
- Iterator<Map.Entry<Key, AggrColStatsList>> mapIterator = cacheStore.entrySet().iterator();
- while (mapIterator.hasNext()) {
- Map.Entry<Key, AggrColStatsList> pair = mapIterator.next();
- AggrColStats node;
- AggrColStatsList candidateList = pair.getValue();
- List<AggrColStats> nodes = candidateList.nodes;
- if (nodes.size() == 0) {
- mapIterator.remove();
- continue;
- }
- boolean isLocked = false;
- try {
- isLocked = candidateList.writeLock.tryLock(maxWriterWaitTime, TimeUnit.MILLISECONDS);
- if (isLocked) {
- for (Iterator<AggrColStats> listIterator = nodes.iterator(); listIterator.hasNext();) {
- node = listIterator.next();
- // Remove the node if it has expired
- if (isExpired(node)) {
- listIterator.remove();
- numRemovedTTL++;
- currentNodes.getAndDecrement();
- }
- }
- }
- } catch (InterruptedException e) {
- LOG.debug("Interrupted Exception ignored ",e);
- } finally {
- if (isLocked) {
- candidateList.writeLock.unlock();
- }
- }
- // We want to make sure this runs at a low priority in the background
- Thread.yield();
- }
- // If the expired nodes did not result in cache being cleanUntil% in size,
- // start removing LRU nodes
- while ((double) getCurrentNodes() / maxCacheNodes > cleanUntil) {
- evictOneNode();
- }
- } finally {
- isCleaning = false;
- LOG.info("Stopping cleaner thread; AggregateStatsCache is now " + getFullPercent()
- + "% full, with " + getCurrentNodes() + " nodes");
- LOG.info("Number of expired nodes removed: " + numRemovedTTL);
- LOG.info("Number of LRU nodes removed: " + numRemovedLRU);
- LOG.info("Cleaner ran for: " + (System.currentTimeMillis() - cleanerStartTime) + "ms");
- }
- }
- };
- cleaner.setPriority(Thread.MIN_PRIORITY);
- cleaner.setDaemon(true);
- cleaner.start();
- }
-
- /**
- * Evict one node: an expired node if we encounter one while scanning, otherwise the LRU node
- */
- private void evictOneNode() {
- // Get the LRU key, value
- Key lruKey = null;
- AggrColStatsList lruValue = null;
- for (Map.Entry<Key, AggrColStatsList> entry : cacheStore.entrySet()) {
- Key key = entry.getKey();
- AggrColStatsList value = entry.getValue();
- if (lruKey == null) {
- lruKey = key;
- lruValue = value;
- continue;
- }
- if ((value.lastAccessTime < lruValue.lastAccessTime) && !(value.nodes.isEmpty())) {
- lruKey = key;
- lruValue = value;
- }
- }
- // Now delete a node for this key's list
- AggrColStatsList candidateList = cacheStore.get(lruKey);
- boolean isLocked = false;
- try {
- isLocked = candidateList.writeLock.tryLock(maxWriterWaitTime, TimeUnit.MILLISECONDS);
- if (isLocked) {
- AggrColStats candidate;
- AggrColStats lruNode = null;
- int currentIndex = 0;
- int deleteIndex = 0;
- for (Iterator<AggrColStats> iterator = candidateList.nodes.iterator(); iterator.hasNext();) {
- candidate = iterator.next();
- // We only need to free one slot, so if we find an expired node we
- // remove it and return immediately
- if (isExpired(candidate)) {
- iterator.remove();
- currentNodes.getAndDecrement();
- numRemovedTTL++;
- return;
- }
- // Update the LRU node from what we've seen so far; currentIndex must
- // advance every iteration so deleteIndex stays in sync with the list
- if (lruNode == null || candidate.lastAccessTime < lruNode.lastAccessTime) {
- lruNode = candidate;
- deleteIndex = currentIndex;
- }
- ++currentIndex;
- }
- candidateList.nodes.remove(deleteIndex);
- currentNodes.getAndDecrement();
- numRemovedLRU++;
- }
- } catch (InterruptedException e) {
- LOG.debug("Interrupted Exception ignored ",e);
- } finally {
- if (isLocked) {
- candidateList.writeLock.unlock();
- }
- }
- }
-
- private boolean isExpired(AggrColStats aggrColStats) {
- return (System.currentTimeMillis() - aggrColStats.lastAccessTime) > timeToLiveMs;
- }
-
- /**
- * Key object for the stats cache hashtable
- */
- static class Key {
- private final String catName;
- private final String dbName;
- private final String tblName;
- private final String colName;
-
- Key(String cat, String db, String table, String col) {
- // Don't construct an illegal cache key
- if (cat == null || (db == null) || (table == null) || (col == null)) {
- throw new IllegalArgumentException("catName, dbName, tblName, colName can't be null");
- }
- catName = cat;
- dbName = db;
- tblName = table;
- colName = col;
- }
-
- @Override
- public boolean equals(Object other) {
- if (!(other instanceof Key)) { // instanceof is already false for null
- return false;
- }
- Key that = (Key) other;
- return catName.equals(that.catName) && dbName.equals(that.dbName) &&
- tblName.equals(that.tblName) && colName.equals(that.colName);
- }
-
- @Override
- public int hashCode() {
- // Chain the multiplications so each field's position matters; summing
- // independent "field * 31" terms lets permuted field values collide
- return ((catName.hashCode() * 31 + dbName.hashCode()) * 31 + tblName.hashCode()) * 31 +
- colName.hashCode();
- }
-
- @Override
- public String toString() {
- return "catalog: " + catName + ", database:" + dbName + ", table:" + tblName + ", column:" +
- colName;
- }
-
- }
-
- static class AggrColStatsList {
- // TODO: figure out a better data structure for node list(?)
- private List<AggrColStats> nodes = new ArrayList<>();
- private final ReadWriteLock lock = new ReentrantReadWriteLock();
- // Read lock for get operation
- private final Lock readLock = lock.readLock();
- // Write lock for add, evict and clean operation
- private final Lock writeLock = lock.writeLock();
- // Using volatile instead of locking updates to this variable,
- // since we can rely on approx lastAccessTime but don't want a performance hit
- private volatile long lastAccessTime = 0;
-
- List<AggrColStats> getNodes() {
- return nodes;
- }
-
- void updateLastAccessTime() {
- this.lastAccessTime = System.currentTimeMillis();
- }
- }
-
- public static class AggrColStats {
- private final long numPartsCached;
- private final BloomFilter bloomFilter;
- private final ColumnStatisticsObj colStats;
- private volatile long lastAccessTime;
-
- public AggrColStats(long numPartsCached, BloomFilter bloomFilter,
- ColumnStatisticsObj colStats) {
- this.numPartsCached = numPartsCached;
- this.bloomFilter = bloomFilter;
- this.colStats = colStats;
- this.lastAccessTime = System.currentTimeMillis();
- }
-
- public long getNumPartsCached() {
- return numPartsCached;
- }
-
- public ColumnStatisticsObj getColStats() {
- return colStats;
- }
-
- public BloomFilter getBloomFilter() {
- return bloomFilter;
- }
-
- void updateLastAccessTime() {
- this.lastAccessTime = System.currentTimeMillis();
- }
- }
-
- /**
- * Intermediate object, used to collect hits and misses for each cache node that is evaluated
- * for an incoming request
- */
- private static class MatchStats {
- private int hits = 0;
- private int misses = 0;
-
- MatchStats(int hits, int misses) {
- this.hits = hits;
- this.misses = misses;
- }
- }
-}
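To make the cache API above concrete, here is a hedged usage sketch: build a bloom filter over the partition names being aggregated, add a node, then probe the cache with the same key. The BloomFilter(expectedEntries, fpp) constructor and add(byte[]) method are assumed to mirror the test(byte[]) call used in findBestMatch; the catalog, table, and column names are invented for illustration.

    import java.util.Arrays;
    import java.util.List;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hive.metastore.AggregateStatsCache;
    import org.apache.hadoop.hive.metastore.AggregateStatsCache.AggrColStats;
    import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
    import org.apache.hive.common.util.BloomFilter;

    public class AggregateStatsCacheExample {
      public static void main(String[] args) {
        AggregateStatsCache cache = AggregateStatsCache.getInstance(new Configuration());

        List<String> partNames = Arrays.asList("ds=2018-07-01", "ds=2018-07-02", "ds=2018-07-03");
        // Bloom filter over the partition names this aggregate covers
        BloomFilter bloomFilter = new BloomFilter(partNames.size(), cache.getFalsePositiveProbability());
        for (String partName : partNames) {
          bloomFilter.add(partName.getBytes());
        }

        ColumnStatisticsObj colStats = new ColumnStatisticsObj(); // stats payload elided
        cache.add("hive", "default", "web_logs", "user_id", partNames.size(), colStats, bloomFilter);

        // A request whose partition list is within maxVariance of the cached
        // node (and mostly passes the bloom filter test) is served from cache
        AggrColStats cached = cache.get("hive", "default", "web_logs", "user_id", partNames);
        System.out.println(cached != null ? "cache hit" : "cache miss");
      }
    }

Note that get() may return a node whose partition set differs slightly from the request; maxVariance bounds how far off the cached aggregate is allowed to be, trading exactness for fewer direct metastore scans.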