Posted to commits@hive.apache.org by da...@apache.org on 2015/09/22 07:03:43 UTC

[12/50] [abbrv] hive git commit: HIVE-11382 Invalidate aggregate column stats on alter partition (gates)

Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/7e7f461b
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/7e7f461b
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/7e7f461b

Branch: refs/heads/master
Commit: 7e7f461b0ba86e40224564e0ad1e320c4f6d62b3
Parents: 9d3d4eb
Author: Alan Gates <ga...@hortonworks.com>
Authored: Thu Jul 30 10:12:35 2015 -0700
Committer: Alan Gates <ga...@hortonworks.com>
Committed: Thu Jul 30 10:12:35 2015 -0700

----------------------------------------------------------------------
 .../TestHBaseAggrStatsCacheIntegration.java     | 192 +++++++++++++++++++
 .../hadoop/hive/metastore/hbase/HBaseStore.java |   7 +
 2 files changed, 199 insertions(+)
----------------------------------------------------------------------
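In short, the change drops any cached aggregate that references an altered partition. A minimal sketch of the pattern, consolidated from the HBaseStore diff below using the method names it shows (transaction begin/commit handling elided):

    // alterPartition: replace the partition, then invalidate any cached
    // aggregate stats keyed by that partition's external name.
    Partition oldPart = getHBase().getPartition(db_name, tbl_name, part_vals);
    getHBase().replacePartition(oldPart, new_part);
    getHBase().getStatsCache().invalidate(db_name, tbl_name,
        buildExternalPartName(db_name, tbl_name, part_vals));

    // alterPartitions: same idea, invalidating once per altered partition.
    for (List<String> part_vals : part_vals_list) {
      getHBase().getStatsCache().invalidate(db_name, tbl_name,
          buildExternalPartName(db_name, tbl_name, part_vals));
    }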


http://git-wip-us.apache.org/repos/asf/hive/blob/7e7f461b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/hbase/TestHBaseAggrStatsCacheIntegration.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/hbase/TestHBaseAggrStatsCacheIntegration.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/hbase/TestHBaseAggrStatsCacheIntegration.java
index 7e6a2ef..ad76b2e 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/hbase/TestHBaseAggrStatsCacheIntegration.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/hbase/TestHBaseAggrStatsCacheIntegration.java
@@ -496,4 +496,196 @@ public class TestHBaseAggrStatsCacheIntegration extends HBaseIntegrationTests {
       store.backdoor().getStatsCache().wakeInvalidator();
     }
   }
+
+  @Test
+  public void alterInvalidation() throws Exception {
+    try {
+      String dbName = "default";
+      String tableName = "ai";
+      List<String> partVals1 = Arrays.asList("today");
+      List<String> partVals2 = Arrays.asList("yesterday");
+      List<String> partVals3 = Arrays.asList("tomorrow");
+      long now = System.currentTimeMillis();
+
+      List<FieldSchema> cols = new ArrayList<>();
+      cols.add(new FieldSchema("col1", "boolean", "nocomment"));
+      SerDeInfo serde = new SerDeInfo("serde", "seriallib", null);
+      StorageDescriptor sd = new StorageDescriptor(cols, "file:/tmp", "input", "output", false, 0,
+          serde, null, null, Collections.<String, String>emptyMap());
+      List<FieldSchema> partCols = new ArrayList<>();
+      partCols.add(new FieldSchema("ds", "string", ""));
+      Table table = new Table(tableName, dbName, "me", (int) now, (int) now, 0, sd, partCols,
+          Collections.<String, String>emptyMap(), null, null, null);
+      store.createTable(table);
+
+      Partition[] partitions = new Partition[3];
+      int partnum = 0;
+      for (List<String> partVals : Arrays.asList(partVals1, partVals2, partVals3)) {
+        StorageDescriptor psd = new StorageDescriptor(sd);
+        psd.setLocation("file:/tmp/default/invalidation/ds=" + partVals.get(0));
+        Partition part = new Partition(partVals, dbName, tableName, (int) now, (int) now, psd,
+            Collections.<String, String>emptyMap());
+        partitions[partnum++] = part;
+        store.addPartition(part);
+
+        ColumnStatistics cs = new ColumnStatistics();
+        ColumnStatisticsDesc desc = new ColumnStatisticsDesc(false, dbName, tableName);
+        desc.setLastAnalyzed(now);
+        desc.setPartName("ds=" + partVals.get(0));
+        cs.setStatsDesc(desc);
+        ColumnStatisticsObj obj = new ColumnStatisticsObj();
+        obj.setColName("col1");
+        obj.setColType("boolean");
+        ColumnStatisticsData data = new ColumnStatisticsData();
+        BooleanColumnStatsData bcsd = new BooleanColumnStatsData();
+        bcsd.setNumFalses(10);
+        bcsd.setNumTrues(20);
+        bcsd.setNumNulls(30);
+        data.setBooleanStats(bcsd);
+        obj.setStatsData(data);
+        cs.addToStatsObj(obj);
+
+        store.updatePartitionColumnStatistics(cs, partVals);
+      }
+
+      AggrStats aggrStats = store.get_aggr_stats_for(dbName, tableName,
+          Arrays.asList("ds=today", "ds=tomorrow"), Arrays.asList("col1"));
+      aggrStats = store.get_aggr_stats_for(dbName, tableName,
+          Arrays.asList("ds=today", "ds=yesterday"), Arrays.asList("col1"));
+
+      // Check that we had to build it from the stats
+      Assert.assertEquals(0, store.backdoor().getStatsCache().hbaseHits.getCnt());
+      Assert.assertEquals(2, store.backdoor().getStatsCache().totalGets.getCnt());
+      Assert.assertEquals(2, store.backdoor().getStatsCache().misses.getCnt());
+
+      // wake the invalidator and check again to make sure it isn't too aggressive about
+      // removing our stuff.
+      store.backdoor().getStatsCache().wakeInvalidator();
+
+      Partition newPart = new Partition(partitions[2]);
+      newPart.setLastAccessTime((int)System.currentTimeMillis());
+      store.alterPartition(dbName, tableName, partVals3, newPart);
+
+      store.backdoor().getStatsCache().setRunInvalidatorEvery(100);
+      store.backdoor().getStatsCache().wakeInvalidator();
+
+      aggrStats = store.get_aggr_stats_for(dbName, tableName,
+          Arrays.asList("ds=tomorrow", "ds=today"), Arrays.asList("col1"));
+
+      // Check that we missed, which means this aggregate was dropped from the cache.
+      Assert.assertEquals(0, store.backdoor().getStatsCache().hbaseHits.getCnt());
+      Assert.assertEquals(3, store.backdoor().getStatsCache().totalGets.getCnt());
+      Assert.assertEquals(3, store.backdoor().getStatsCache().misses.getCnt());
+
+      // Check that our other aggregate is still in the cache.
+      aggrStats = store.get_aggr_stats_for(dbName, tableName,
+          Arrays.asList("ds=yesterday", "ds=today"), Arrays.asList("col1"));
+
+      Assert.assertEquals(0, store.backdoor().getStatsCache().hbaseHits.getCnt());
+      Assert.assertEquals(4, store.backdoor().getStatsCache().totalGets.getCnt());
+      Assert.assertEquals(3, store.backdoor().getStatsCache().misses.getCnt());
+    } finally {
+      store.backdoor().getStatsCache().setRunInvalidatorEvery(5000);
+      store.backdoor().getStatsCache().setMaxTimeInCache(500000);
+      store.backdoor().getStatsCache().wakeInvalidator();
+    }
+  }
+
+  @Test
+  public void altersInvalidation() throws Exception {
+    try {
+      String dbName = "default";
+      String tableName = "asi";
+      List<String> partVals1 = Arrays.asList("today");
+      List<String> partVals2 = Arrays.asList("yesterday");
+      List<String> partVals3 = Arrays.asList("tomorrow");
+      long now = System.currentTimeMillis();
+
+      List<FieldSchema> cols = new ArrayList<>();
+      cols.add(new FieldSchema("col1", "boolean", "nocomment"));
+      SerDeInfo serde = new SerDeInfo("serde", "seriallib", null);
+      StorageDescriptor sd = new StorageDescriptor(cols, "file:/tmp", "input", "output", false, 0,
+          serde, null, null, Collections.<String, String>emptyMap());
+      List<FieldSchema> partCols = new ArrayList<>();
+      partCols.add(new FieldSchema("ds", "string", ""));
+      Table table = new Table(tableName, dbName, "me", (int) now, (int) now, 0, sd, partCols,
+          Collections.<String, String>emptyMap(), null, null, null);
+      store.createTable(table);
+
+      Partition[] partitions = new Partition[3];
+      int partnum = 0;
+      for (List<String> partVals : Arrays.asList(partVals1, partVals2, partVals3)) {
+        StorageDescriptor psd = new StorageDescriptor(sd);
+        psd.setLocation("file:/tmp/default/invalidation/ds=" + partVals.get(0));
+        Partition part = new Partition(partVals, dbName, tableName, (int) now, (int) now, psd,
+            Collections.<String, String>emptyMap());
+        partitions[partnum++] = part;
+        store.addPartition(part);
+
+        ColumnStatistics cs = new ColumnStatistics();
+        ColumnStatisticsDesc desc = new ColumnStatisticsDesc(false, dbName, tableName);
+        desc.setLastAnalyzed(now);
+        desc.setPartName("ds=" + partVals.get(0));
+        cs.setStatsDesc(desc);
+        ColumnStatisticsObj obj = new ColumnStatisticsObj();
+        obj.setColName("col1");
+        obj.setColType("boolean");
+        ColumnStatisticsData data = new ColumnStatisticsData();
+        BooleanColumnStatsData bcsd = new BooleanColumnStatsData();
+        bcsd.setNumFalses(10);
+        bcsd.setNumTrues(20);
+        bcsd.setNumNulls(30);
+        data.setBooleanStats(bcsd);
+        obj.setStatsData(data);
+        cs.addToStatsObj(obj);
+
+        store.updatePartitionColumnStatistics(cs, partVals);
+      }
+
+      AggrStats aggrStats = store.get_aggr_stats_for(dbName, tableName,
+          Arrays.asList("ds=today", "ds=tomorrow"), Arrays.asList("col1"));
+      aggrStats = store.get_aggr_stats_for(dbName, tableName,
+          Arrays.asList("ds=today", "ds=yesterday"), Arrays.asList("col1"));
+
+      // Check that we had to build it from the stats
+      Assert.assertEquals(0, store.backdoor().getStatsCache().hbaseHits.getCnt());
+      Assert.assertEquals(2, store.backdoor().getStatsCache().totalGets.getCnt());
+      Assert.assertEquals(2, store.backdoor().getStatsCache().misses.getCnt());
+
+      // wake the invalidator and check again to make sure it isn't too aggressive about
+      // removing our stuff.
+      store.backdoor().getStatsCache().wakeInvalidator();
+
+      Partition[] newParts = new Partition[2];
+      newParts[0] = new Partition(partitions[0]);
+      newParts[0].setLastAccessTime((int)System.currentTimeMillis());
+      newParts[1] = new Partition(partitions[2]);
+      newParts[1].setLastAccessTime((int) System.currentTimeMillis());
+      store.alterPartitions(dbName, tableName, Arrays.asList(partVals1, partVals3),
+          Arrays.asList(newParts));
+
+      store.backdoor().getStatsCache().setRunInvalidatorEvery(100);
+      store.backdoor().getStatsCache().wakeInvalidator();
+
+      aggrStats = store.get_aggr_stats_for(dbName, tableName,
+          Arrays.asList("ds=tomorrow", "ds=today"), Arrays.asList("col1"));
+
+      // Check that we missed, which means this aggregate was dropped from the cache.
+      Assert.assertEquals(0, store.backdoor().getStatsCache().hbaseHits.getCnt());
+      Assert.assertEquals(3, store.backdoor().getStatsCache().totalGets.getCnt());
+      Assert.assertEquals(3, store.backdoor().getStatsCache().misses.getCnt());
+
+      // Check that our other aggregate got dropped too
+      aggrStats = store.get_aggr_stats_for(dbName, tableName,
+          Arrays.asList("ds=yesterday", "ds=today"), Arrays.asList("col1"));
+
+      Assert.assertEquals(0, store.backdoor().getStatsCache().hbaseHits.getCnt());
+      Assert.assertEquals(4, store.backdoor().getStatsCache().totalGets.getCnt());
+      Assert.assertEquals(4, store.backdoor().getStatsCache().misses.getCnt());
+    } finally {
+      store.backdoor().getStatsCache().setRunInvalidatorEvery(5000);
+      store.backdoor().getStatsCache().setMaxTimeInCache(500000);
+      store.backdoor().getStatsCache().wakeInvalidator();
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/7e7f461b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseStore.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseStore.java
index 744070d..f8042fc 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseStore.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseStore.java
@@ -522,6 +522,9 @@ public class HBaseStore implements RawStore {
     try {
       Partition oldPart = getHBase().getPartition(db_name, tbl_name, part_vals);
       getHBase().replacePartition(oldPart, new_part);
+      // Drop any cached aggregate stats that reference this partition
+      getHBase().getStatsCache().invalidate(db_name, tbl_name,
+          buildExternalPartName(db_name, tbl_name, part_vals));
       commit = true;
     } catch (IOException e) {
       LOG.error("Unable to add partition", e);
@@ -540,6 +543,10 @@ public class HBaseStore implements RawStore {
     try {
       List<Partition> oldParts = getHBase().getPartitions(db_name, tbl_name, part_vals_list);
       getHBase().replacePartitions(oldParts, new_parts);
+      for (List<String> part_vals : part_vals_list) {
+        getHBase().getStatsCache().invalidate(db_name, tbl_name,
+            buildExternalPartName(db_name, tbl_name, part_vals));
+      }
       commit = true;
     } catch (IOException e) {
       LOG.error("Unable to add partition", e);
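For reference, the new tests exercise this path roughly as follows (condensed from alterInvalidation() above; the backdoor and counter names are those used in the test):

    // After an alter, a get_aggr_stats_for over a partition set that includes
    // the altered partition must miss the cache and rebuild the aggregate,
    // while aggregates over untouched partitions stay cached.
    store.alterPartition(dbName, tableName, partVals3, newPart);
    store.backdoor().getStatsCache().setRunInvalidatorEvery(100);
    store.backdoor().getStatsCache().wakeInvalidator();
    store.get_aggr_stats_for(dbName, tableName,
        Arrays.asList("ds=tomorrow", "ds=today"), Arrays.asList("col1"));
    Assert.assertEquals(3, store.backdoor().getStatsCache().misses.getCnt());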