You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ga...@apache.org on 2015/07/30 19:13:19 UTC
hive git commit: HIVE-11382 Invalidate aggregate column stats on
alter partition (gates)
Repository: hive
Updated Branches:
refs/heads/hbase-metastore 9d3d4ebfe -> 7e7f461b0
HIVE-11382 Invalidate aggregate column stats on alter partition (gates)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/7e7f461b
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/7e7f461b
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/7e7f461b
Branch: refs/heads/hbase-metastore
Commit: 7e7f461b0ba86e40224564e0ad1e320c4f6d62b3
Parents: 9d3d4eb
Author: Alan Gates <ga...@hortonworks.com>
Authored: Thu Jul 30 10:12:35 2015 -0700
Committer: Alan Gates <ga...@hortonworks.com>
Committed: Thu Jul 30 10:12:35 2015 -0700
----------------------------------------------------------------------
.../TestHBaseAggrStatsCacheIntegration.java | 192 +++++++++++++++++++
.../hadoop/hive/metastore/hbase/HBaseStore.java | 7 +
2 files changed, 199 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/7e7f461b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/hbase/TestHBaseAggrStatsCacheIntegration.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/hbase/TestHBaseAggrStatsCacheIntegration.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/hbase/TestHBaseAggrStatsCacheIntegration.java
index 7e6a2ef..ad76b2e 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/hbase/TestHBaseAggrStatsCacheIntegration.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/hbase/TestHBaseAggrStatsCacheIntegration.java
@@ -496,4 +496,196 @@ public class TestHBaseAggrStatsCacheIntegration extends HBaseIntegrationTests {
store.backdoor().getStatsCache().wakeInvalidator();
}
}
+
+ @Test
+ public void alterInvalidation() throws Exception {
+ try {
+ String dbName = "default";
+ String tableName = "ai";
+ List<String> partVals1 = Arrays.asList("today");
+ List<String> partVals2 = Arrays.asList("yesterday");
+ List<String> partVals3 = Arrays.asList("tomorrow");
+ long now = System.currentTimeMillis();
+
+ List<FieldSchema> cols = new ArrayList<>();
+ cols.add(new FieldSchema("col1", "boolean", "nocomment"));
+ SerDeInfo serde = new SerDeInfo("serde", "seriallib", null);
+ StorageDescriptor sd = new StorageDescriptor(cols, "file:/tmp", "input", "output", false, 0,
+ serde, null, null, Collections.<String, String>emptyMap());
+ List<FieldSchema> partCols = new ArrayList<>();
+ partCols.add(new FieldSchema("ds", "string", ""));
+ Table table = new Table(tableName, dbName, "me", (int) now, (int) now, 0, sd, partCols,
+ Collections.<String, String>emptyMap(), null, null, null);
+ store.createTable(table);
+
+ Partition[] partitions = new Partition[3];
+ int partnum = 0;
+ for (List<String> partVals : Arrays.asList(partVals1, partVals2, partVals3)) {
+ StorageDescriptor psd = new StorageDescriptor(sd);
+ psd.setLocation("file:/tmp/default/invalidation/ds=" + partVals.get(0));
+ Partition part = new Partition(partVals, dbName, tableName, (int) now, (int) now, psd,
+ Collections.<String, String>emptyMap());
+ partitions[partnum++] = part;
+ store.addPartition(part);
+
+ ColumnStatistics cs = new ColumnStatistics();
+ ColumnStatisticsDesc desc = new ColumnStatisticsDesc(false, dbName, tableName);
+ desc.setLastAnalyzed(now);
+ desc.setPartName("ds=" + partVals.get(0));
+ cs.setStatsDesc(desc);
+ ColumnStatisticsObj obj = new ColumnStatisticsObj();
+ obj.setColName("col1");
+ obj.setColType("boolean");
+ ColumnStatisticsData data = new ColumnStatisticsData();
+ BooleanColumnStatsData bcsd = new BooleanColumnStatsData();
+ bcsd.setNumFalses(10);
+ bcsd.setNumTrues(20);
+ bcsd.setNumNulls(30);
+ data.setBooleanStats(bcsd);
+ obj.setStatsData(data);
+ cs.addToStatsObj(obj);
+
+ store.updatePartitionColumnStatistics(cs, partVals);
+ }
+
+ AggrStats aggrStats = store.get_aggr_stats_for(dbName, tableName,
+ Arrays.asList("ds=today", "ds=tomorrow"), Arrays.asList("col1"));
+ aggrStats = store.get_aggr_stats_for(dbName, tableName,
+ Arrays.asList("ds=today", "ds=yesterday"), Arrays.asList("col1"));
+
+ // Both aggregate requests so far must be misses: 2 gets, 2 misses, 0 HBase hits.
+ Assert.assertEquals(0, store.backdoor().getStatsCache().hbaseHits.getCnt());
+ Assert.assertEquals(2, store.backdoor().getStatsCache().totalGets.getCnt());
+ Assert.assertEquals(2, store.backdoor().getStatsCache().misses.getCnt());
+
+ // Wake the invalidator once before altering the partition. NOTE(review): nothing is asserted
+ // between this wake-up and the alter below, so no "check again" actually happens here.
+ store.backdoor().getStatsCache().wakeInvalidator();
+
+ Partition newPart = new Partition(partitions[2]);
+ newPart.setLastAccessTime((int)System.currentTimeMillis());
+ store.alterPartition(dbName, tableName, partVals3, newPart);
+
+ store.backdoor().getStatsCache().setRunInvalidatorEvery(100);
+ store.backdoor().getStatsCache().wakeInvalidator();
+
+ aggrStats = store.get_aggr_stats_for(dbName, tableName,
+ Arrays.asList("ds=tomorrow", "ds=today"), Arrays.asList("col1"));
+
+ // A third miss means the aggregate covering ds=tomorrow was dropped after the alter.
+ Assert.assertEquals(0, store.backdoor().getStatsCache().hbaseHits.getCnt());
+ Assert.assertEquals(3, store.backdoor().getStatsCache().totalGets.getCnt());
+ Assert.assertEquals(3, store.backdoor().getStatsCache().misses.getCnt());
+
+ // The aggregate without ds=tomorrow must still be cached: gets go to 4, misses stay at 3.
+ aggrStats = store.get_aggr_stats_for(dbName, tableName,
+ Arrays.asList("ds=yesterday", "ds=today"), Arrays.asList("col1"));
+
+ Assert.assertEquals(0, store.backdoor().getStatsCache().hbaseHits.getCnt());
+ Assert.assertEquals(4, store.backdoor().getStatsCache().totalGets.getCnt());
+ Assert.assertEquals(3, store.backdoor().getStatsCache().misses.getCnt());
+ } finally {
+ store.backdoor().getStatsCache().setRunInvalidatorEvery(5000);
+ store.backdoor().getStatsCache().setMaxTimeInCache(500000);
+ store.backdoor().getStatsCache().wakeInvalidator();
+ }
+ }
+
+ @Test
+ public void altersInvalidation() throws Exception {
+ try {
+ String dbName = "default";
+ String tableName = "asi";
+ List<String> partVals1 = Arrays.asList("today");
+ List<String> partVals2 = Arrays.asList("yesterday");
+ List<String> partVals3 = Arrays.asList("tomorrow");
+ long now = System.currentTimeMillis();
+
+ List<FieldSchema> cols = new ArrayList<>();
+ cols.add(new FieldSchema("col1", "boolean", "nocomment"));
+ SerDeInfo serde = new SerDeInfo("serde", "seriallib", null);
+ StorageDescriptor sd = new StorageDescriptor(cols, "file:/tmp", "input", "output", false, 0,
+ serde, null, null, Collections.<String, String>emptyMap());
+ List<FieldSchema> partCols = new ArrayList<>();
+ partCols.add(new FieldSchema("ds", "string", ""));
+ Table table = new Table(tableName, dbName, "me", (int) now, (int) now, 0, sd, partCols,
+ Collections.<String, String>emptyMap(), null, null, null);
+ store.createTable(table);
+
+ Partition[] partitions = new Partition[3];
+ int partnum = 0;
+ for (List<String> partVals : Arrays.asList(partVals1, partVals2, partVals3)) {
+ StorageDescriptor psd = new StorageDescriptor(sd);
+ psd.setLocation("file:/tmp/default/invalidation/ds=" + partVals.get(0));
+ Partition part = new Partition(partVals, dbName, tableName, (int) now, (int) now, psd,
+ Collections.<String, String>emptyMap());
+ partitions[partnum++] = part;
+ store.addPartition(part);
+
+ ColumnStatistics cs = new ColumnStatistics();
+ ColumnStatisticsDesc desc = new ColumnStatisticsDesc(false, dbName, tableName);
+ desc.setLastAnalyzed(now);
+ desc.setPartName("ds=" + partVals.get(0));
+ cs.setStatsDesc(desc);
+ ColumnStatisticsObj obj = new ColumnStatisticsObj();
+ obj.setColName("col1");
+ obj.setColType("boolean");
+ ColumnStatisticsData data = new ColumnStatisticsData();
+ BooleanColumnStatsData bcsd = new BooleanColumnStatsData();
+ bcsd.setNumFalses(10);
+ bcsd.setNumTrues(20);
+ bcsd.setNumNulls(30);
+ data.setBooleanStats(bcsd);
+ obj.setStatsData(data);
+ cs.addToStatsObj(obj);
+
+ store.updatePartitionColumnStatistics(cs, partVals);
+ }
+
+ AggrStats aggrStats = store.get_aggr_stats_for(dbName, tableName,
+ Arrays.asList("ds=today", "ds=tomorrow"), Arrays.asList("col1"));
+ aggrStats = store.get_aggr_stats_for(dbName, tableName,
+ Arrays.asList("ds=today", "ds=yesterday"), Arrays.asList("col1"));
+
+ // Both aggregate requests so far must be misses: 2 gets, 2 misses, 0 HBase hits.
+ Assert.assertEquals(0, store.backdoor().getStatsCache().hbaseHits.getCnt());
+ Assert.assertEquals(2, store.backdoor().getStatsCache().totalGets.getCnt());
+ Assert.assertEquals(2, store.backdoor().getStatsCache().misses.getCnt());
+
+ // Wake the invalidator once before altering the partitions. NOTE(review): nothing is asserted
+ // between this wake-up and the alter below, so no "check again" actually happens here.
+ store.backdoor().getStatsCache().wakeInvalidator();
+
+ Partition[] newParts = new Partition[2];
+ newParts[0] = new Partition(partitions[0]);
+ newParts[0].setLastAccessTime((int)System.currentTimeMillis());
+ newParts[1] = new Partition(partitions[2]);
+ newParts[1].setLastAccessTime((int) System.currentTimeMillis());
+ store.alterPartitions(dbName, tableName, Arrays.asList(partVals1, partVals3),
+ Arrays.asList(newParts));
+
+ store.backdoor().getStatsCache().setRunInvalidatorEvery(100);
+ store.backdoor().getStatsCache().wakeInvalidator();
+
+ aggrStats = store.get_aggr_stats_for(dbName, tableName,
+ Arrays.asList("ds=tomorrow", "ds=today"), Arrays.asList("col1"));
+
+ // A third miss means the (today, tomorrow) aggregate was dropped; both partitions were altered.
+ Assert.assertEquals(0, store.backdoor().getStatsCache().hbaseHits.getCnt());
+ Assert.assertEquals(3, store.backdoor().getStatsCache().totalGets.getCnt());
+ Assert.assertEquals(3, store.backdoor().getStatsCache().misses.getCnt());
+
+ // ds=today was altered too, so the (today, yesterday) aggregate must also miss: misses is 4.
+ aggrStats = store.get_aggr_stats_for(dbName, tableName,
+ Arrays.asList("ds=yesterday", "ds=today"), Arrays.asList("col1"));
+
+ Assert.assertEquals(0, store.backdoor().getStatsCache().hbaseHits.getCnt());
+ Assert.assertEquals(4, store.backdoor().getStatsCache().totalGets.getCnt());
+ Assert.assertEquals(4, store.backdoor().getStatsCache().misses.getCnt());
+ } finally {
+ store.backdoor().getStatsCache().setRunInvalidatorEvery(5000);
+ store.backdoor().getStatsCache().setMaxTimeInCache(500000);
+ store.backdoor().getStatsCache().wakeInvalidator();
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/7e7f461b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseStore.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseStore.java
index 744070d..f8042fc 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseStore.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseStore.java
@@ -522,6 +522,9 @@ public class HBaseStore implements RawStore {
try {
Partition oldPart = getHBase().getPartition(db_name, tbl_name, part_vals);
getHBase().replacePartition(oldPart, new_part);
+ // Drop any cached stats that reference this partition
+ getHBase().getStatsCache().invalidate(db_name, tbl_name,
+ buildExternalPartName(db_name, tbl_name, part_vals));
commit = true;
} catch (IOException e) {
LOG.error("Unable to add partition", e);
@@ -540,6 +543,10 @@ public class HBaseStore implements RawStore {
try {
List<Partition> oldParts = getHBase().getPartitions(db_name, tbl_name, part_vals_list);
getHBase().replacePartitions(oldParts, new_parts);
+ for (List<String> part_vals : part_vals_list) {
+ getHBase().getStatsCache().invalidate(db_name, tbl_name,
+ buildExternalPartName(db_name, tbl_name, part_vals));
+ }
commit = true;
} catch (IOException e) {
LOG.error("Unable to add partition", e);