Posted to commits@hive.apache.org by da...@apache.org on 2015/09/22 07:03:34 UTC
[03/50] [abbrv] hive git commit: HIVE-11294 Use HBase to cache aggregated stats (gates)
HIVE-11294 Use HBase to cache aggregated stats (gates)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/c53c6f45
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/c53c6f45
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/c53c6f45
Branch: refs/heads/master
Commit: c53c6f45988db869d56abe3b1d831ff775f4fa73
Parents: 1a1c0d8
Author: Alan Gates <ga...@hortonworks.com>
Authored: Wed Jul 22 11:17:01 2015 -0700
Committer: Alan Gates <ga...@hortonworks.com>
Committed: Wed Jul 22 11:17:01 2015 -0700
----------------------------------------------------------------------
.../org/apache/hadoop/hive/conf/HiveConf.java | 51 +-
.../apache/hive/common/util/BloomFilter.java | 20 +-
.../TestHBaseAggrStatsCacheIntegration.java | 499 +++
.../metastore/hbase/HbaseMetastoreProto.java | 4189 +++++++++++++++++-
.../hbase/AggrStatsInvalidatorFilter.java | 121 +
.../hadoop/hive/metastore/hbase/Counter.java | 6 +
.../hive/metastore/hbase/HBaseReadWrite.java | 316 +-
.../hadoop/hive/metastore/hbase/HBaseStore.java | 47 +-
.../hadoop/hive/metastore/hbase/HBaseUtils.java | 81 +-
.../hadoop/hive/metastore/hbase/StatsCache.java | 326 ++
.../stats/ColumnStatsAggregatorFactory.java | 51 +
.../metastore/hbase/hbase_metastore_proto.proto | 30 +
.../hbase/TestHBaseAggregateStatsCache.java | 316 ++
.../hive/metastore/hbase/TestHBaseStore.java | 2 +-
14 files changed, 5717 insertions(+), 338 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/c53c6f45/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 5eb11c2..c42b030 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -18,25 +18,7 @@
package org.apache.hadoop.hive.conf;
-import java.io.ByteArrayOutputStream;
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.PrintStream;
-import java.net.URL;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Properties;
-import java.util.concurrent.TimeUnit;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import javax.security.auth.login.LoginException;
-
+import com.google.common.base.Joiner;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -54,7 +36,23 @@ import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.Shell;
import org.apache.hive.common.HiveCompat;
-import com.google.common.base.Joiner;
+import javax.security.auth.login.LoginException;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.PrintStream;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
/**
* Hive Configuration.
@@ -417,6 +415,19 @@ public class HiveConf extends Configuration {
METASTORE_HBASE_CONNECTION_CLASS("hive.metastore.hbase.connection.class",
"org.apache.hadoop.hive.metastore.hbase.VanillaHBaseConnection",
"Class used to connection to HBase"),
+ METASTORE_HBASE_AGGR_STATS_CACHE_ENTRIES("hive.metastore.hbase.aggr.stats.cache.entries",
+ 10000, "How many in stats objects to cache in memory"),
+ METASTORE_HBASE_AGGR_STATS_MEMORY_TTL("hive.metastore.hbase.aggr.stats.memory.ttl", "60s",
+ new TimeValidator(TimeUnit.SECONDS),
+ "Number of seconds stats objects live in memory after they are read from HBase."),
+ METASTORE_HBASE_AGGR_STATS_INVALIDATOR_FREQUENCY(
+ "hive.metastore.hbase.aggr.stats.invalidator.frequency", "5s",
+ new TimeValidator(TimeUnit.SECONDS),
+ "How often the stats cache scans its HBase entries and looks for expired entries"),
+ METASTORE_HBASE_AGGR_STATS_HBASE_TTL("hive.metastore.hbase.aggr.stats.hbase.ttl", "604800s",
+ new TimeValidator(TimeUnit.SECONDS),
+ "Number of seconds stats entries live in HBase cache after they are created. They may be" +
+ " invalided by updates or partition drops before this. Default is one week."),
METASTORETHRIFTCONNECTIONRETRIES("hive.metastore.connect.retries", 3,
"Number of retries while opening a connection to metastore"),
http://git-wip-us.apache.org/repos/asf/hive/blob/c53c6f45/common/src/java/org/apache/hive/common/util/BloomFilter.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hive/common/util/BloomFilter.java b/common/src/java/org/apache/hive/common/util/BloomFilter.java
index 656ba8a..d894241 100644
--- a/common/src/java/org/apache/hive/common/util/BloomFilter.java
+++ b/common/src/java/org/apache/hive/common/util/BloomFilter.java
@@ -18,9 +18,10 @@
package org.apache.hive.common.util;
-import static com.google.common.base.Preconditions.checkArgument;
-
import java.util.Arrays;
+import java.util.List;
+
+import static com.google.common.base.Preconditions.checkArgument;
/**
* BloomFilter is a probabilistic data structure for set membership check. BloomFilters are
@@ -63,6 +64,21 @@ public class BloomFilter {
this.bitSet = new BitSet(numBits);
}
+ /**
+ * A constructor to support rebuilding the BloomFilter from a serialized representation.
+ * @param bits words of the serialized bit set
+ * @param numBits number of bits in the filter
+ * @param numFuncs number of hash functions the filter uses
+ */
+ public BloomFilter(List<Long> bits, int numBits, int numFuncs) {
+ super();
+ long[] copied = new long[bits.size()];
+ for (int i = 0; i < bits.size(); i++) copied[i] = bits.get(i);
+ bitSet = new BitSet(copied);
+ this.numBits = numBits;
+ numHashFunctions = numFuncs;
+ }
+
static int optimalNumOfHashFunctions(long n, long m) {
return Math.max(1, (int) Math.round((double) m / n * Math.log(2)));
}
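
As a round-trip sketch of the new constructor (the getBitSet(), getBitSize(), and getNumHashFunctions() accessors are assumed here; only the List&lt;Long&gt; constructor above is from this commit):

    import java.util.ArrayList;
    import java.util.List;
    import org.apache.hive.common.util.BloomFilter;

    // Build a filter, flatten its bit set into a List<Long> (e.g. for
    // protobuf serialization), then rebuild it and re-test membership.
    BloomFilter original = new BloomFilter(1000, 0.05);
    original.addLong(42L);

    List<Long> words = new ArrayList<>();
    for (long w : original.getBitSet()) {   // getBitSet() assumed to return long[]
      words.add(w);
    }

    BloomFilter rebuilt =
        new BloomFilter(words, original.getBitSize(), original.getNumHashFunctions());
    assert rebuilt.testLong(42L); // membership survives the round trip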
http://git-wip-us.apache.org/repos/asf/hive/blob/c53c6f45/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/hbase/TestHBaseAggrStatsCacheIntegration.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/hbase/TestHBaseAggrStatsCacheIntegration.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/hbase/TestHBaseAggrStatsCacheIntegration.java
new file mode 100644
index 0000000..7e6a2ef
--- /dev/null
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/hbase/TestHBaseAggrStatsCacheIntegration.java
@@ -0,0 +1,499 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hive.metastore.hbase;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.metastore.api.AggrStats;
+import org.apache.hadoop.hive.metastore.api.BooleanColumnStatsData;
+import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData;
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsDesc;
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.LongColumnStatsData;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.SerDeInfo;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.StringColumnStatsData;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Integration tests of the HBaseStore aggregated stats cache, run against an HBase mini-cluster
+ */
+public class TestHBaseAggrStatsCacheIntegration extends HBaseIntegrationTests {
+
+ private static final Log LOG = LogFactory.getLog(TestHBaseAggrStatsCacheIntegration.class.getName());
+
+ @Rule public ExpectedException thrown = ExpectedException.none();
+
+ @BeforeClass
+ public static void startup() throws Exception {
+ HBaseIntegrationTests.startMiniCluster();
+ }
+
+ @AfterClass
+ public static void shutdown() throws Exception {
+ HBaseIntegrationTests.shutdownMiniCluster();
+ }
+
+ @Before
+ public void setup() throws IOException {
+ setupConnection();
+ setupHBaseStore();
+ store.backdoor().getStatsCache().resetCounters();
+ }
+
+ private interface Checker {
+ void checkStats(AggrStats aggrStats) throws Exception;
+ }
+
+ @Test
+ public void hit() throws Exception {
+ String dbName = "default";
+ String tableName = "hit";
+ List<String> partVals1 = Arrays.asList("today");
+ List<String> partVals2 = Arrays.asList("yesterday");
+ long now = System.currentTimeMillis();
+
+ List<FieldSchema> cols = new ArrayList<>();
+ cols.add(new FieldSchema("col1", "boolean", "nocomment"));
+ cols.add(new FieldSchema("col2", "varchar", "nocomment"));
+ SerDeInfo serde = new SerDeInfo("serde", "seriallib", null);
+ StorageDescriptor sd = new StorageDescriptor(cols, "file:/tmp", "input", "output", false, 0,
+ serde, null, null, Collections.<String, String>emptyMap());
+ List<FieldSchema> partCols = new ArrayList<>();
+ partCols.add(new FieldSchema("ds", "string", ""));
+ Table table = new Table(tableName, dbName, "me", (int)now, (int)now, 0, sd, partCols,
+ Collections.<String, String>emptyMap(), null, null, null);
+ store.createTable(table);
+
+ for (List<String> partVals : Arrays.asList(partVals1, partVals2)) {
+ StorageDescriptor psd = new StorageDescriptor(sd);
+ psd.setLocation("file:/tmp/default/hit/ds=" + partVals.get(0));
+ Partition part = new Partition(partVals, dbName, tableName, (int) now, (int) now, psd,
+ Collections.<String, String>emptyMap());
+ store.addPartition(part);
+
+ ColumnStatistics cs = new ColumnStatistics();
+ ColumnStatisticsDesc desc = new ColumnStatisticsDesc(false, dbName, tableName);
+ desc.setLastAnalyzed(now);
+ desc.setPartName("ds=" + partVals.get(0));
+ cs.setStatsDesc(desc);
+ ColumnStatisticsObj obj = new ColumnStatisticsObj();
+ obj.setColName("col1");
+ obj.setColType("boolean");
+ ColumnStatisticsData data = new ColumnStatisticsData();
+ BooleanColumnStatsData bcsd = new BooleanColumnStatsData();
+ bcsd.setNumFalses(10);
+ bcsd.setNumTrues(20);
+ bcsd.setNumNulls(30);
+ data.setBooleanStats(bcsd);
+ obj.setStatsData(data);
+ cs.addToStatsObj(obj);
+
+ obj = new ColumnStatisticsObj();
+ obj.setColName("col2");
+ obj.setColType("varchar");
+ data = new ColumnStatisticsData();
+ StringColumnStatsData scsd = new StringColumnStatsData();
+ scsd.setAvgColLen(10.3);
+ scsd.setMaxColLen(2000);
+ scsd.setNumNulls(3);
+ scsd.setNumDVs(12342);
+ data.setStringStats(scsd);
+ obj.setStatsData(data);
+ cs.addToStatsObj(obj);
+
+ store.updatePartitionColumnStatistics(cs, partVals);
+ }
+
+ Checker statChecker = new Checker() {
+ @Override
+ public void checkStats(AggrStats aggrStats) throws Exception {
+ Assert.assertEquals(4, aggrStats.getPartsFound());
+ Assert.assertEquals(2, aggrStats.getColStatsSize());
+ ColumnStatisticsObj cso = aggrStats.getColStats().get(0);
+ Assert.assertEquals("col1", cso.getColName());
+ Assert.assertEquals("boolean", cso.getColType());
+ BooleanColumnStatsData bcsd = cso.getStatsData().getBooleanStats();
+ Assert.assertEquals(20, bcsd.getNumFalses());
+ Assert.assertEquals(40, bcsd.getNumTrues());
+ Assert.assertEquals(60, bcsd.getNumNulls());
+
+ cso = aggrStats.getColStats().get(1);
+ Assert.assertEquals("col2", cso.getColName());
+ Assert.assertEquals("string", cso.getColType());
+ StringColumnStatsData scsd = cso.getStatsData().getStringStats();
+ Assert.assertEquals(10.3, scsd.getAvgColLen(), 0.1);
+ Assert.assertEquals(2000, scsd.getMaxColLen());
+ Assert.assertEquals(6, scsd.getNumNulls());
+ Assert.assertEquals(12342, scsd.getNumDVs());
+ }
+ };
+
+ AggrStats aggrStats = store.get_aggr_stats_for(dbName, tableName,
+ Arrays.asList("ds=today", "ds=yesterday"), Arrays.asList("col1", "col2"));
+ statChecker.checkStats(aggrStats);
+
+ // Check that we had to build it from the stats
+ Assert.assertEquals(0, store.backdoor().getStatsCache().hbaseHits.getCnt());
+ Assert.assertEquals(2, store.backdoor().getStatsCache().totalGets.getCnt());
+ Assert.assertEquals(2, store.backdoor().getStatsCache().misses.getCnt());
+
+ // Call again, this time it should come from memory. Also, reverse the name order this time
+ // to ensure that we still hit.
+ aggrStats = store.get_aggr_stats_for(dbName, tableName,
+ Arrays.asList("ds=yesterday", "ds=today"), Arrays.asList("col1", "col2"));
+ statChecker.checkStats(aggrStats);
+
+ Assert.assertEquals(0, store.backdoor().getStatsCache().hbaseHits.getCnt());
+ Assert.assertEquals(4, store.backdoor().getStatsCache().totalGets.getCnt());
+ Assert.assertEquals(2, store.backdoor().getStatsCache().misses.getCnt());
+
+ store.backdoor().getStatsCache().flushMemory();
+ // Call again, this time it should come from hbase
+ aggrStats = store.get_aggr_stats_for(dbName, tableName,
+ Arrays.asList("ds=today", "ds=yesterday"), Arrays.asList("col1", "col2"));
+ statChecker.checkStats(aggrStats);
+
+ Assert.assertEquals(2, store.backdoor().getStatsCache().hbaseHits.getCnt());
+ Assert.assertEquals(6, store.backdoor().getStatsCache().totalGets.getCnt());
+ Assert.assertEquals(2, store.backdoor().getStatsCache().misses.getCnt());
+ }
+
+ @Test
+ public void someWithStats() throws Exception {
+ String dbName = "default";
+ String tableName = "psws";
+ List<String> partVals1 = Arrays.asList("today");
+ List<String> partVals2 = Arrays.asList("yesterday");
+ long now = System.currentTimeMillis();
+
+ List<FieldSchema> cols = new ArrayList<>();
+ cols.add(new FieldSchema("col1", "long", "nocomment"));
+ SerDeInfo serde = new SerDeInfo("serde", "seriallib", null);
+ StorageDescriptor sd = new StorageDescriptor(cols, "file:/tmp", "input", "output", false, 0,
+ serde, null, null, Collections.<String, String>emptyMap());
+ List<FieldSchema> partCols = new ArrayList<>();
+ partCols.add(new FieldSchema("ds", "string", ""));
+ Table table = new Table(tableName, dbName, "me", (int)now, (int)now, 0, sd, partCols,
+ Collections.<String, String>emptyMap(), null, null, null);
+ store.createTable(table);
+
+ boolean first = true;
+ for (List<String> partVals : Arrays.asList(partVals1, partVals2)) {
+ StorageDescriptor psd = new StorageDescriptor(sd);
+ psd.setLocation("file:/tmp/default/psws/ds=" + partVals.get(0));
+ Partition part = new Partition(partVals, dbName, tableName, (int) now, (int) now, psd,
+ Collections.<String, String>emptyMap());
+ store.addPartition(part);
+
+ if (first) {
+ ColumnStatistics cs = new ColumnStatistics();
+ ColumnStatisticsDesc desc = new ColumnStatisticsDesc(false, dbName, tableName);
+ desc.setLastAnalyzed(now);
+ desc.setPartName("ds=" + partVals.get(0));
+ cs.setStatsDesc(desc);
+ ColumnStatisticsObj obj = new ColumnStatisticsObj();
+ obj.setColName("col1");
+ obj.setColType("long");
+ ColumnStatisticsData data = new ColumnStatisticsData();
+ LongColumnStatsData lcsd = new LongColumnStatsData();
+ lcsd.setHighValue(192L);
+ lcsd.setLowValue(-20L);
+ lcsd.setNumNulls(30);
+ lcsd.setNumDVs(32);
+ data.setLongStats(lcsd);
+ obj.setStatsData(data);
+ cs.addToStatsObj(obj);
+
+ store.updatePartitionColumnStatistics(cs, partVals);
+ first = false;
+ }
+ }
+
+ Checker statChecker = new Checker() {
+ @Override
+ public void checkStats(AggrStats aggrStats) throws Exception {
+ Assert.assertEquals(1, aggrStats.getPartsFound());
+ Assert.assertEquals(1, aggrStats.getColStatsSize());
+ ColumnStatisticsObj cso = aggrStats.getColStats().get(0);
+ Assert.assertEquals("col1", cso.getColName());
+ Assert.assertEquals("long", cso.getColType());
+ LongColumnStatsData lcsd = cso.getStatsData().getLongStats();
+ Assert.assertEquals(192L, lcsd.getHighValue());
+ Assert.assertEquals(-20L, lcsd.getLowValue());
+ Assert.assertEquals(30, lcsd.getNumNulls());
+ Assert.assertEquals(32, lcsd.getNumDVs());
+ }
+ };
+
+ AggrStats aggrStats = store.get_aggr_stats_for(dbName, tableName,
+ Arrays.asList("ds=today", "ds=yesterday"), Arrays.asList("col1"));
+ statChecker.checkStats(aggrStats);
+
+ // Check that we had to build it from the stats
+ Assert.assertEquals(0, store.backdoor().getStatsCache().hbaseHits.getCnt());
+ Assert.assertEquals(1, store.backdoor().getStatsCache().totalGets.getCnt());
+ Assert.assertEquals(1, store.backdoor().getStatsCache().misses.getCnt());
+
+ // Call again, this time it should come from memory. Also, reverse the name order this time
+ // to ensure that we still hit.
+ aggrStats = store.get_aggr_stats_for(dbName, tableName,
+ Arrays.asList("ds=yesterday", "ds=today"), Arrays.asList("col1"));
+ statChecker.checkStats(aggrStats);
+
+ Assert.assertEquals(0, store.backdoor().getStatsCache().hbaseHits.getCnt());
+ Assert.assertEquals(2, store.backdoor().getStatsCache().totalGets.getCnt());
+ Assert.assertEquals(1, store.backdoor().getStatsCache().misses.getCnt());
+
+ store.backdoor().getStatsCache().flushMemory();
+ // Call again, this time it should come from hbase
+ aggrStats = store.get_aggr_stats_for(dbName, tableName,
+ Arrays.asList("ds=today", "ds=yesterday"), Arrays.asList("col1"));
+ statChecker.checkStats(aggrStats);
+
+ Assert.assertEquals(1, store.backdoor().getStatsCache().hbaseHits.getCnt());
+ Assert.assertEquals(3, store.backdoor().getStatsCache().totalGets.getCnt());
+ Assert.assertEquals(1, store.backdoor().getStatsCache().misses.getCnt());
+ }
+
+ @Test
+ public void invalidation() throws Exception {
+ try {
+ String dbName = "default";
+ String tableName = "invalidation";
+ List<String> partVals1 = Arrays.asList("today");
+ List<String> partVals2 = Arrays.asList("yesterday");
+ List<String> partVals3 = Arrays.asList("tomorrow");
+ long now = System.currentTimeMillis();
+
+ List<FieldSchema> cols = new ArrayList<>();
+ cols.add(new FieldSchema("col1", "boolean", "nocomment"));
+ SerDeInfo serde = new SerDeInfo("serde", "seriallib", null);
+ StorageDescriptor sd = new StorageDescriptor(cols, "file:/tmp", "input", "output", false, 0,
+ serde, null, null, Collections.<String, String>emptyMap());
+ List<FieldSchema> partCols = new ArrayList<>();
+ partCols.add(new FieldSchema("ds", "string", ""));
+ Table table = new Table(tableName, dbName, "me", (int) now, (int) now, 0, sd, partCols,
+ Collections.<String, String>emptyMap(), null, null, null);
+ store.createTable(table);
+
+ for (List<String> partVals : Arrays.asList(partVals1, partVals2, partVals3)) {
+ StorageDescriptor psd = new StorageDescriptor(sd);
+ psd.setLocation("file:/tmp/default/invalidation/ds=" + partVals.get(0));
+ Partition part = new Partition(partVals, dbName, tableName, (int) now, (int) now, psd,
+ Collections.<String, String>emptyMap());
+ store.addPartition(part);
+
+ ColumnStatistics cs = new ColumnStatistics();
+ ColumnStatisticsDesc desc = new ColumnStatisticsDesc(false, dbName, tableName);
+ desc.setLastAnalyzed(now);
+ desc.setPartName("ds=" + partVals.get(0));
+ cs.setStatsDesc(desc);
+ ColumnStatisticsObj obj = new ColumnStatisticsObj();
+ obj.setColName("col1");
+ obj.setColType("boolean");
+ ColumnStatisticsData data = new ColumnStatisticsData();
+ BooleanColumnStatsData bcsd = new BooleanColumnStatsData();
+ bcsd.setNumFalses(10);
+ bcsd.setNumTrues(20);
+ bcsd.setNumNulls(30);
+ data.setBooleanStats(bcsd);
+ obj.setStatsData(data);
+ cs.addToStatsObj(obj);
+
+ store.updatePartitionColumnStatistics(cs, partVals);
+ }
+
+ Checker statChecker = new Checker() {
+ @Override
+ public void checkStats(AggrStats aggrStats) throws Exception {
+ Assert.assertEquals(2, aggrStats.getPartsFound());
+ Assert.assertEquals(1, aggrStats.getColStatsSize());
+ ColumnStatisticsObj cso = aggrStats.getColStats().get(0);
+ Assert.assertEquals("col1", cso.getColName());
+ Assert.assertEquals("boolean", cso.getColType());
+ BooleanColumnStatsData bcsd = cso.getStatsData().getBooleanStats();
+ Assert.assertEquals(20, bcsd.getNumFalses());
+ Assert.assertEquals(40, bcsd.getNumTrues());
+ Assert.assertEquals(60, bcsd.getNumNulls());
+ }
+ };
+
+ AggrStats aggrStats = store.get_aggr_stats_for(dbName, tableName,
+ Arrays.asList("ds=today", "ds=yesterday"), Arrays.asList("col1"));
+ statChecker.checkStats(aggrStats);
+
+ // Check that we had to build it from the stats
+ Assert.assertEquals(0, store.backdoor().getStatsCache().hbaseHits.getCnt());
+ Assert.assertEquals(1, store.backdoor().getStatsCache().totalGets.getCnt());
+ Assert.assertEquals(1, store.backdoor().getStatsCache().misses.getCnt());
+
+ // Call again, this time it should come from memory. Also, reverse the name order this time
+ // to ensure that we still hit.
+ aggrStats = store.get_aggr_stats_for(dbName, tableName,
+ Arrays.asList("ds=yesterday", "ds=today"), Arrays.asList("col1"));
+ statChecker.checkStats(aggrStats);
+
+ Assert.assertEquals(0, store.backdoor().getStatsCache().hbaseHits.getCnt());
+ Assert.assertEquals(2, store.backdoor().getStatsCache().totalGets.getCnt());
+ Assert.assertEquals(1, store.backdoor().getStatsCache().misses.getCnt());
+
+ // Now call a different combination to get it in memory too
+ aggrStats = store.get_aggr_stats_for(dbName, tableName,
+ Arrays.asList("ds=tomorrow", "ds=today"), Arrays.asList("col1"));
+ statChecker.checkStats(aggrStats);
+
+ Assert.assertEquals(0, store.backdoor().getStatsCache().hbaseHits.getCnt());
+ Assert.assertEquals(3, store.backdoor().getStatsCache().totalGets.getCnt());
+ Assert.assertEquals(2, store.backdoor().getStatsCache().misses.getCnt());
+
+ aggrStats = store.get_aggr_stats_for(dbName, tableName,
+ Arrays.asList("ds=tomorrow", "ds=today"), Arrays.asList("col1"));
+ statChecker.checkStats(aggrStats);
+
+ Assert.assertEquals(0, store.backdoor().getStatsCache().hbaseHits.getCnt());
+ Assert.assertEquals(4, store.backdoor().getStatsCache().totalGets.getCnt());
+ Assert.assertEquals(2, store.backdoor().getStatsCache().misses.getCnt());
+
+ // wake the invalidator and check again to make sure it isn't too aggressive about
+ // removing our stuff.
+ store.backdoor().getStatsCache().wakeInvalidator();
+
+ aggrStats = store.get_aggr_stats_for(dbName, tableName,
+ Arrays.asList("ds=tomorrow", "ds=today"), Arrays.asList("col1"));
+ statChecker.checkStats(aggrStats);
+
+ Assert.assertEquals(0, store.backdoor().getStatsCache().hbaseHits.getCnt());
+ Assert.assertEquals(5, store.backdoor().getStatsCache().totalGets.getCnt());
+ Assert.assertEquals(2, store.backdoor().getStatsCache().misses.getCnt());
+
+ // Update statistics for 'tomorrow'
+ ColumnStatistics cs = new ColumnStatistics();
+ ColumnStatisticsDesc desc = new ColumnStatisticsDesc(false, dbName, tableName);
+ desc.setLastAnalyzed(now);
+ desc.setPartName("ds=" + partVals3.get(0));
+ cs.setStatsDesc(desc);
+ ColumnStatisticsObj obj = new ColumnStatisticsObj();
+ obj.setColName("col1");
+ obj.setColType("boolean");
+ ColumnStatisticsData data = new ColumnStatisticsData();
+ BooleanColumnStatsData bcsd = new BooleanColumnStatsData();
+ bcsd.setNumFalses(100);
+ bcsd.setNumTrues(200);
+ bcsd.setNumNulls(300);
+ data.setBooleanStats(bcsd);
+ obj.setStatsData(data);
+ cs.addToStatsObj(obj);
+
+ Checker afterUpdate = new Checker() {
+ @Override
+ public void checkStats(AggrStats aggrStats) throws Exception {
+ Assert.assertEquals(2, aggrStats.getPartsFound());
+ Assert.assertEquals(1, aggrStats.getColStatsSize());
+ ColumnStatisticsObj cso = aggrStats.getColStats().get(0);
+ Assert.assertEquals("col1", cso.getColName());
+ Assert.assertEquals("boolean", cso.getColType());
+ BooleanColumnStatsData bcsd = cso.getStatsData().getBooleanStats();
+ Assert.assertEquals(110, bcsd.getNumFalses());
+ Assert.assertEquals(220, bcsd.getNumTrues());
+ Assert.assertEquals(330, bcsd.getNumNulls());
+ }
+ };
+
+ store.updatePartitionColumnStatistics(cs, partVals3);
+
+ store.backdoor().getStatsCache().setRunInvalidatorEvery(100);
+ store.backdoor().getStatsCache().wakeInvalidator();
+
+ aggrStats = store.get_aggr_stats_for(dbName, tableName,
+ Arrays.asList("ds=tomorrow", "ds=today"), Arrays.asList("col1"));
+ afterUpdate.checkStats(aggrStats);
+
+ // Check that we missed, which means this aggregate was dropped from the cache.
+ Assert.assertEquals(0, store.backdoor().getStatsCache().hbaseHits.getCnt());
+ Assert.assertEquals(6, store.backdoor().getStatsCache().totalGets.getCnt());
+ Assert.assertEquals(3, store.backdoor().getStatsCache().misses.getCnt());
+
+ // Check that our other aggregate is still in the cache.
+ aggrStats = store.get_aggr_stats_for(dbName, tableName,
+ Arrays.asList("ds=yesterday", "ds=today"), Arrays.asList("col1"));
+ statChecker.checkStats(aggrStats);
+
+ Assert.assertEquals(0, store.backdoor().getStatsCache().hbaseHits.getCnt());
+ Assert.assertEquals(7, store.backdoor().getStatsCache().totalGets.getCnt());
+ Assert.assertEquals(3, store.backdoor().getStatsCache().misses.getCnt());
+
+ // Drop 'yesterday', so our first aggregate should be dumped from memory and hbase
+ store.dropPartition(dbName, tableName, partVals2);
+
+ store.backdoor().getStatsCache().wakeInvalidator();
+
+ aggrStats = store.get_aggr_stats_for(dbName, tableName,
+ Arrays.asList("ds=yesterday", "ds=today"), Arrays.asList("col1"));
+ new Checker() {
+ @Override
+ public void checkStats(AggrStats aggrStats) throws Exception {
+ Assert.assertEquals(1, aggrStats.getPartsFound());
+ Assert.assertEquals(1, aggrStats.getColStatsSize());
+ ColumnStatisticsObj cso = aggrStats.getColStats().get(0);
+ Assert.assertEquals("col1", cso.getColName());
+ Assert.assertEquals("boolean", cso.getColType());
+ BooleanColumnStatsData bcsd = cso.getStatsData().getBooleanStats();
+ Assert.assertEquals(10, bcsd.getNumFalses());
+ Assert.assertEquals(20, bcsd.getNumTrues());
+ Assert.assertEquals(30, bcsd.getNumNulls());
+ }
+ }.checkStats(aggrStats);
+
+ // Check that we missed, which means this aggregate was dropped from the cache.
+ Assert.assertEquals(0, store.backdoor().getStatsCache().hbaseHits.getCnt());
+ Assert.assertEquals(8, store.backdoor().getStatsCache().totalGets.getCnt());
+ Assert.assertEquals(4, store.backdoor().getStatsCache().misses.getCnt());
+
+ // Check that our other aggregate is still in the cache.
+ aggrStats = store.get_aggr_stats_for(dbName, tableName,
+ Arrays.asList("ds=tomorrow", "ds=today"), Arrays.asList("col1"));
+ afterUpdate.checkStats(aggrStats);
+
+ Assert.assertEquals(0, store.backdoor().getStatsCache().hbaseHits.getCnt());
+ Assert.assertEquals(9, store.backdoor().getStatsCache().totalGets.getCnt());
+ Assert.assertEquals(4, store.backdoor().getStatsCache().misses.getCnt());
+ } finally {
+ store.backdoor().getStatsCache().setRunInvalidatorEvery(5000);
+ store.backdoor().getStatsCache().setMaxTimeInCache(500000);
+ store.backdoor().getStatsCache().wakeInvalidator();
+ }
+ }
+}