Posted to commits@hive.apache.org by vg...@apache.org on 2017/05/22 22:57:59 UTC
[1/2] hive git commit: HIVE-16579: CachedStore: improvements to partition col stats caching and cache column stats for unpartitioned table (Daniel Dai, Thejas Nair, Vaibhav Gumashta reviewed by Daniel Dai, Thejas Nair)
Repository: hive
Updated Branches:
refs/heads/master 952fe6e17 -> d85beaa99
http://git-wip-us.apache.org/repos/asf/hive/blob/d85beaa9/metastore/src/java/org/apache/hadoop/hive/metastore/cache/SharedCache.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/cache/SharedCache.java b/metastore/src/java/org/apache/hadoop/hive/metastore/cache/SharedCache.java
index 7beee42..6b6355b 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/cache/SharedCache.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/cache/SharedCache.java
@@ -21,14 +21,18 @@ import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.TreeMap;
-import org.apache.hadoop.hive.common.StatsSetupConst;
+import org.apache.hadoop.hive.metastore.StatObjectConverter;
+import org.apache.hadoop.hive.metastore.Warehouse;
import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.Table;
@@ -38,17 +42,26 @@ import org.apache.hadoop.hive.metastore.cache.CachedStore.StorageDescriptorWrapp
import org.apache.hadoop.hive.metastore.cache.CachedStore.TableWrapper;
import org.apache.hadoop.hive.metastore.hbase.HBaseUtils;
import org.apache.hive.common.util.HiveStringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import com.google.common.annotations.VisibleForTesting;
public class SharedCache {
private static Map<String, Database> databaseCache = new TreeMap<String, Database>();
private static Map<String, TableWrapper> tableCache = new TreeMap<String, TableWrapper>();
- private static Map<String, PartitionWrapper> partitionCache = new TreeMap<String, PartitionWrapper>();
- private static Map<String, ColumnStatisticsObj> partitionColStatsCache = new TreeMap<String, ColumnStatisticsObj>();
- private static Map<ByteArrayWrapper, StorageDescriptorWrapper> sdCache = new HashMap<ByteArrayWrapper, StorageDescriptorWrapper>();
+ private static Map<String, PartitionWrapper> partitionCache =
+ new TreeMap<String, PartitionWrapper>();
+ private static Map<String, ColumnStatisticsObj> partitionColStatsCache =
+ new TreeMap<String, ColumnStatisticsObj>();
+ private static Map<String, ColumnStatisticsObj> tableColStatsCache =
+ new TreeMap<String, ColumnStatisticsObj>();
+ private static Map<ByteArrayWrapper, StorageDescriptorWrapper> sdCache =
+ new HashMap<ByteArrayWrapper, StorageDescriptorWrapper>();
private static MessageDigest md;
+ static final private Logger LOG = LoggerFactory.getLogger(SharedCache.class.getName());
+
static {
try {
md = MessageDigest.getInstance("MD5");
@@ -97,11 +110,13 @@ public class SharedCache {
Table tblCopy = tbl.deepCopy();
tblCopy.setDbName(HiveStringUtils.normalizeIdentifier(dbName));
tblCopy.setTableName(HiveStringUtils.normalizeIdentifier(tblName));
- for (FieldSchema fs : tblCopy.getPartitionKeys()) {
- fs.setName(HiveStringUtils.normalizeIdentifier(fs.getName()));
+ if (tblCopy.getPartitionKeys() != null) {
+ for (FieldSchema fs : tblCopy.getPartitionKeys()) {
+ fs.setName(HiveStringUtils.normalizeIdentifier(fs.getName()));
+ }
}
TableWrapper wrapper;
- if (tbl.getSd()!=null) {
+ if (tbl.getSd() != null) {
byte[] sdHash = HBaseUtils.hashStorageDescriptor(tbl.getSd(), md);
StorageDescriptor sd = tbl.getSd();
increSd(sd, sdHash);
@@ -121,10 +136,54 @@ public class SharedCache {
}
}
+ public static synchronized ColumnStatisticsObj getCachedTableColStats(String colStatsCacheKey) {
+ return tableColStatsCache.get(colStatsCacheKey);
+ }
+
+ public static synchronized void removeTableColStatsFromCache(String dbName, String tblName) {
+ String partialKey = CacheUtils.buildKeyWithDelimit(dbName, tblName);
+ Iterator<Entry<String, ColumnStatisticsObj>> iterator =
+ tableColStatsCache.entrySet().iterator();
+ while (iterator.hasNext()) {
+ Entry<String, ColumnStatisticsObj> entry = iterator.next();
+ String key = entry.getKey();
+ if (key.toLowerCase().startsWith(partialKey.toLowerCase())) {
+ iterator.remove();
+ }
+ }
+ }
+
+ public static synchronized void removeTableColStatsFromCache(String dbName, String tblName,
+ String colName) {
+ tableColStatsCache.remove(CacheUtils.buildKey(dbName, tblName, colName));
+ }
+
+ public static synchronized void updateTableColStatsInCache(String dbName, String tableName,
+ List<ColumnStatisticsObj> colStatsForTable) {
+ for (ColumnStatisticsObj colStatObj : colStatsForTable) {
+ // Get old stats object if present
+ String key = CacheUtils.buildKey(dbName, tableName, colStatObj.getColName());
+ ColumnStatisticsObj oldStatsObj = tableColStatsCache.get(key);
+ if (oldStatsObj != null) {
+ LOG.debug("CachedStore: updating table column stats for column: " + colStatObj.getColName()
+ + ", of table: " + tableName + " and database: " + dbName);
+ // Update existing stat object's field
+ StatObjectConverter.setFieldsIntoOldStats(oldStatsObj, colStatObj);
+ } else {
+ // No stats exist for this key; add a new object to the cache
+ tableColStatsCache.put(key, colStatObj);
+ }
+ }
+ }
+
public static synchronized void alterTableInCache(String dbName, String tblName, Table newTable) {
removeTableFromCache(dbName, tblName);
addTableToCache(HiveStringUtils.normalizeIdentifier(newTable.getDbName()),
HiveStringUtils.normalizeIdentifier(newTable.getTableName()), newTable);
+ }
+
+ public static synchronized void alterTableInPartitionCache(String dbName, String tblName,
+ Table newTable) {
if (!dbName.equals(newTable.getDbName()) || !tblName.equals(newTable.getTableName())) {
List<Partition> partitions = listCachedPartitions(dbName, tblName, -1);
for (Partition part : partitions) {
@@ -137,6 +196,58 @@ public class SharedCache {
}
}
+ public static synchronized void alterTableInTableColStatsCache(String dbName, String tblName,
+ Table newTable) {
+ if (!dbName.equals(newTable.getDbName()) || !tblName.equals(newTable.getTableName())) {
+ String oldPartialTableStatsKey = CacheUtils.buildKeyWithDelimit(dbName, tblName);
+ Iterator<Entry<String, ColumnStatisticsObj>> iterator =
+ tableColStatsCache.entrySet().iterator();
+ Map<String, ColumnStatisticsObj> newTableColStats =
+ new HashMap<String, ColumnStatisticsObj>();
+ while (iterator.hasNext()) {
+ Entry<String, ColumnStatisticsObj> entry = iterator.next();
+ String key = entry.getKey();
+ ColumnStatisticsObj colStatObj = entry.getValue();
+ if (key.toLowerCase().startsWith(oldPartialTableStatsKey.toLowerCase())) {
+ String[] decomposedKey = CacheUtils.splitTableColStats(key);
+ String newKey = CacheUtils.buildKey(decomposedKey[0], decomposedKey[1], decomposedKey[2]);
+ newTableColStats.put(newKey, colStatObj);
+ iterator.remove();
+ }
+ }
+ tableColStatsCache.putAll(newTableColStats);
+ }
+ }
+
+ public static synchronized void alterTableInPartitionColStatsCache(String dbName, String tblName,
+ Table newTable) {
+ if (!dbName.equals(newTable.getDbName()) || !tblName.equals(newTable.getTableName())) {
+ List<Partition> partitions = listCachedPartitions(dbName, tblName, -1);
+ Map<String, ColumnStatisticsObj> newPartitionColStats =
+ new HashMap<String, ColumnStatisticsObj>();
+ for (Partition part : partitions) {
+ String oldPartialPartitionKey =
+ CacheUtils.buildKeyWithDelimit(dbName, tblName, part.getValues());
+ Iterator<Entry<String, ColumnStatisticsObj>> iterator =
+ partitionColStatsCache.entrySet().iterator();
+ while (iterator.hasNext()) {
+ Entry<String, ColumnStatisticsObj> entry = iterator.next();
+ String key = entry.getKey();
+ ColumnStatisticsObj colStatObj = entry.getValue();
+ if (key.toLowerCase().startsWith(oldPartialPartitionKey.toLowerCase())) {
+ Object[] decomposedKey = CacheUtils.splitPartitionColStats(key);
+ String newKey =
+ CacheUtils.buildKey((String) decomposedKey[0], (String) decomposedKey[1],
+ (List<String>) decomposedKey[2], (String) decomposedKey[3]);
+ newPartitionColStats.put(newKey, colStatObj);
+ iterator.remove();
+ }
+ }
+ }
+ partitionColStatsCache.putAll(newPartitionColStats);
+ }
+ }
+
public static synchronized int getCachedTableCount() {
return tableCache.size();
}
@@ -151,18 +262,6 @@ public class SharedCache {
return tables;
}
- public static synchronized void updateTableColumnStatistics(String dbName, String tableName,
- List<ColumnStatisticsObj> statsObjs) {
- Table tbl = getTableFromCache(dbName, tableName);
- tbl.getSd().getParameters();
- List<String> colNames = new ArrayList<>();
- for (ColumnStatisticsObj statsObj:statsObjs) {
- colNames.add(statsObj.getColName());
- }
- StatsSetupConst.setColumnStatsState(tbl.getParameters(), colNames);
- alterTableInCache(dbName, tableName, tbl);
- }
-
public static synchronized List<TableMeta> getTableMeta(String dbNames, String tableNames, List<String> tableTypes) {
List<TableMeta> tableMetas = new ArrayList<TableMeta>();
for (String dbName : listCachedDatabases()) {
@@ -214,14 +313,51 @@ public class SharedCache {
return partitionCache.containsKey(CacheUtils.buildKey(dbName, tblName, part_vals));
}
- public static synchronized Partition removePartitionFromCache(String dbName, String tblName, List<String> part_vals) {
- PartitionWrapper wrapper = partitionCache.remove(CacheUtils.buildKey(dbName, tblName, part_vals));
- if (wrapper.getSdHash()!=null) {
+ public static synchronized Partition removePartitionFromCache(String dbName, String tblName,
+ List<String> part_vals) {
+ PartitionWrapper wrapper =
+ partitionCache.remove(CacheUtils.buildKey(dbName, tblName, part_vals));
+ if (wrapper.getSdHash() != null) {
decrSd(wrapper.getSdHash());
}
return wrapper.getPartition();
}
+ // Remove cached column stats for all partitions of a table
+ public static synchronized void removePartitionColStatsFromCache(String dbName, String tblName) {
+ String partialKey = CacheUtils.buildKeyWithDelimit(dbName, tblName);
+ Iterator<Entry<String, ColumnStatisticsObj>> iterator =
+ partitionColStatsCache.entrySet().iterator();
+ while (iterator.hasNext()) {
+ Entry<String, ColumnStatisticsObj> entry = iterator.next();
+ String key = entry.getKey();
+ if (key.toLowerCase().startsWith(partialKey.toLowerCase())) {
+ iterator.remove();
+ }
+ }
+ }
+
+ // Remove cached column stats for a particular partition of a table
+ public static synchronized void removePartitionColStatsFromCache(String dbName, String tblName,
+ List<String> partVals) {
+ String partialKey = CacheUtils.buildKeyWithDelimit(dbName, tblName, partVals);
+ Iterator<Entry<String, ColumnStatisticsObj>> iterator =
+ partitionColStatsCache.entrySet().iterator();
+ while (iterator.hasNext()) {
+ Entry<String, ColumnStatisticsObj> entry = iterator.next();
+ String key = entry.getKey();
+ if (key.toLowerCase().startsWith(partialKey.toLowerCase())) {
+ iterator.remove();
+ }
+ }
+ }
+
+ // Remove cached column stats for a particular partition and a particular column of a table
+ public static synchronized void removePartitionColStatsFromCache(String dbName, String tblName,
+ List<String> partVals, String colName) {
+ partitionColStatsCache.remove(CacheUtils.buildKey(dbName, tblName, partVals, colName));
+ }
+
public static synchronized List<Partition> listCachedPartitions(String dbName, String tblName, int max) {
List<Partition> partitions = new ArrayList<Partition>();
int count = 0;
@@ -236,22 +372,53 @@ public class SharedCache {
return partitions;
}
- public static synchronized void alterPartitionInCache(String dbName, String tblName, List<String> partVals, Partition newPart) {
+ public static synchronized void alterPartitionInCache(String dbName, String tblName,
+ List<String> partVals, Partition newPart) {
removePartitionFromCache(dbName, tblName, partVals);
addPartitionToCache(HiveStringUtils.normalizeIdentifier(newPart.getDbName()),
HiveStringUtils.normalizeIdentifier(newPart.getTableName()), newPart);
}
- public static synchronized void updatePartitionColumnStatistics(String dbName, String tableName,
- List<String> partVals, List<ColumnStatisticsObj> statsObjs) {
- Partition part = getPartitionFromCache(dbName, tableName, partVals);
- part.getSd().getParameters();
- List<String> colNames = new ArrayList<>();
- for (ColumnStatisticsObj statsObj:statsObjs) {
- colNames.add(statsObj.getColName());
+ public static synchronized void alterPartitionInColStatsCache(String dbName, String tblName,
+ List<String> partVals, Partition newPart) {
+ String oldPartialPartitionKey = CacheUtils.buildKeyWithDelimit(dbName, tblName, partVals);
+ Map<String, ColumnStatisticsObj> newPartitionColStats =
+ new HashMap<String, ColumnStatisticsObj>();
+ Iterator<Entry<String, ColumnStatisticsObj>> iterator =
+ partitionColStatsCache.entrySet().iterator();
+ while (iterator.hasNext()) {
+ Entry<String, ColumnStatisticsObj> entry = iterator.next();
+ String key = entry.getKey();
+ ColumnStatisticsObj colStatObj = entry.getValue();
+ if (key.toLowerCase().startsWith(oldPartialPartitionKey.toLowerCase())) {
+ Object[] decomposedKey = CacheUtils.splitPartitionColStats(key);
+ String newKey =
+ CacheUtils.buildKey(HiveStringUtils.normalizeIdentifier(newPart.getDbName()),
+ HiveStringUtils.normalizeIdentifier(newPart.getTableName()), newPart.getValues(),
+ (String) decomposedKey[3]);
+ newPartitionColStats.put(newKey, colStatObj);
+ iterator.remove();
+ }
+ }
+ partitionColStatsCache.putAll(newPartitionColStats);
+ }
+
+ public static synchronized void updatePartitionColStatsInCache(String dbName, String tableName,
+ List<String> partVals, List<ColumnStatisticsObj> colStatsObjs) {
+ for (ColumnStatisticsObj colStatObj : colStatsObjs) {
+ // Get old stats object if present
+ String key = CacheUtils.buildKey(dbName, tableName, partVals, colStatObj.getColName());
+ ColumnStatisticsObj oldStatsObj = partitionColStatsCache.get(key);
+ if (oldStatsObj != null) {
+ // Update existing stat object's field
+ LOG.debug("CachedStore: updating partition column stats for column: "
+ + colStatObj.getColName() + ", of table: " + tableName + " and database: " + dbName);
+ StatObjectConverter.setFieldsIntoOldStats(oldStatsObj, colStatObj);
+ } else {
+ // No stats exist for this key; add a new object to the cache
+ partitionColStatsCache.put(key, colStatObj);
+ }
}
- StatsSetupConst.setColumnStatsState(part.getParameters(), colNames);
- alterPartitionInCache(dbName, tableName, partVals, part);
}
public static synchronized int getCachedPartitionCount() {
@@ -262,10 +429,47 @@ public class SharedCache {
return partitionColStatsCache.get(key);
}
- public static synchronized void addPartitionColStatsToCache(Map<String, ColumnStatisticsObj> aggrStatsPerPartition) {
- partitionColStatsCache.putAll(aggrStatsPerPartition);
+ public static synchronized void addPartitionColStatsToCache(String dbName, String tableName,
+ Map<String, List<ColumnStatisticsObj>> colStatsPerPartition) {
+ for (Map.Entry<String, List<ColumnStatisticsObj>> entry : colStatsPerPartition.entrySet()) {
+ String partName = entry.getKey();
+ try {
+ List<String> partVals = Warehouse.getPartValuesFromPartName(partName);
+ for (ColumnStatisticsObj colStatObj : entry.getValue()) {
+ String key = CacheUtils.buildKey(dbName, tableName, partVals, colStatObj.getColName());
+ partitionColStatsCache.put(key, colStatObj);
+ }
+ } catch (MetaException e) {
+ LOG.info("Unable to add partition: " + partName + " to SharedCache", e);
+ }
+ }
}
+ public static synchronized void refreshPartitionColStats(String dbName, String tableName,
+ Map<String, List<ColumnStatisticsObj>> newColStatsPerPartition) {
+ LOG.debug("CachedStore: updating cached partition column stats objects for database: " + dbName
+ + " and table: " + tableName);
+ removePartitionColStatsFromCache(dbName, tableName);
+ addPartitionColStatsToCache(dbName, tableName, newColStatsPerPartition);
+ }
+
+ public static synchronized void addTableColStatsToCache(String dbName, String tableName,
+ List<ColumnStatisticsObj> colStatsForTable) {
+ for (ColumnStatisticsObj colStatObj : colStatsForTable) {
+ String key = CacheUtils.buildKey(dbName, tableName, colStatObj.getColName());
+ tableColStatsCache.put(key, colStatObj);
+ }
+ }
+
+ public static synchronized void refreshTableColStats(String dbName, String tableName,
+ List<ColumnStatisticsObj> colStatsForTable) {
+ LOG.debug("CachedStore: updating cached table column stats objects for database: " + dbName
+ + " and table: " + tableName);
+ // Remove all old cache entries for this table
+ removeTableColStatsFromCache(dbName, tableName);
+ // Add new entries to cache
+ addTableColStatsToCache(dbName, tableName, colStatsForTable);
+ }
public static void increSd(StorageDescriptor sd, byte[] sdHash) {
ByteArrayWrapper byteArray = new ByteArrayWrapper(sdHash);
@@ -295,6 +499,7 @@ public class SharedCache {
// Replace databases in databaseCache with the new list
public static synchronized void refreshDatabases(List<Database> databases) {
+ LOG.debug("CachedStore: updating cached database objects");
for (String dbName : listCachedDatabases()) {
removeDatabaseFromCache(dbName);
}
@@ -305,6 +510,7 @@ public class SharedCache {
// Replace tables in tableCache with the new list
public static synchronized void refreshTables(String dbName, List<Table> tables) {
+ LOG.debug("CachedStore: updating cached table objects for database: " + dbName);
for (Table tbl : listCachedTables(dbName)) {
removeTableFromCache(dbName, tbl.getTableName());
}
@@ -313,17 +519,18 @@ public class SharedCache {
}
}
- public static void refreshPartitions(String dbName, String tblName, List<Partition> partitions) {
- List<String> keysToRemove = new ArrayList<String>();
- for (Map.Entry<String, PartitionWrapper> entry : partitionCache.entrySet()) {
- if (entry.getValue().getPartition().getDbName().equals(dbName)
- && entry.getValue().getPartition().getTableName().equals(tblName)) {
- keysToRemove.add(entry.getKey());
+ public static synchronized void refreshPartitions(String dbName, String tblName,
+ List<Partition> partitions) {
+ LOG.debug("CachedStore: updating cached partition objects for database: " + dbName
+ + " and table: " + tblName);
+ Iterator<Entry<String, PartitionWrapper>> iterator = partitionCache.entrySet().iterator();
+ while (iterator.hasNext()) {
+ PartitionWrapper partitionWrapper = iterator.next().getValue();
+ if (partitionWrapper.getPartition().getDbName().equals(dbName)
+ && partitionWrapper.getPartition().getTableName().equals(tblName)) {
+ iterator.remove();
}
}
- for (String key : keysToRemove) {
- partitionCache.remove(key);
- }
for (Partition part : partitions) {
addPartitionToCache(dbName, tblName, part);
}
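A note on the new col stats key scheme: both the table and partition col stats caches above are keyed by strings from CacheUtils.buildKey, and bulk removal/rename walks the map comparing lowercased key prefixes built by CacheUtils.buildKeyWithDelimit. A minimal standalone sketch of that idiom, assuming a hypothetical '#' delimiter (the real delimiter and key layout live in CacheUtils):

import java.util.Iterator;
import java.util.Map.Entry;
import java.util.TreeMap;

public class PrefixKeyCacheSketch {
  // Hypothetical stand-ins for CacheUtils.buildKey/buildKeyWithDelimit.
  static String buildKey(String db, String tbl, String col) {
    return db + "#" + tbl + "#" + col;
  }

  static String buildKeyWithDelimit(String db, String tbl) {
    return db + "#" + tbl + "#";
  }

  public static void main(String[] args) {
    TreeMap<String, Long> statsCache = new TreeMap<String, Long>();
    statsCache.put(buildKey("db1", "tbl1", "col1"), 10L);
    statsCache.put(buildKey("db1", "tbl1", "col2"), 20L);
    statsCache.put(buildKey("db1", "tbl2", "col1"), 30L);

    // Drop every entry for db1.tbl1, mirroring removeTableColStatsFromCache.
    String partialKey = buildKeyWithDelimit("db1", "tbl1");
    Iterator<Entry<String, Long>> it = statsCache.entrySet().iterator();
    while (it.hasNext()) {
      if (it.next().getKey().toLowerCase().startsWith(partialKey.toLowerCase())) {
        it.remove();
      }
    }
    System.out.println(statsCache); // {db1#tbl2#col1=30}
  }
}

A TreeMap keeps keys for one table adjacent, though the methods above still scan the whole entry set on each removal.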
http://git-wip-us.apache.org/repos/asf/hive/blob/d85beaa9/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseStore.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseStore.java
index 0c7d8bb..a7681dd 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseStore.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseStore.java
@@ -2854,7 +2854,7 @@ public class HBaseStore implements RawStore {
}
@Override
- public Map<String, ColumnStatisticsObj> getAggrColStatsForTablePartitions(String dbName,
+ public Map<String, List<ColumnStatisticsObj>> getColStatsForTablePartitions(String dbName,
String tableName) throws MetaException, NoSuchObjectException {
// TODO: see if it makes sense to implement this here
return null;
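The rename from getAggrColStatsForTablePartitions to getColStatsForTablePartitions (here and in the RawStore test doubles below) also changes the value type: callers now receive the raw per-partition stats objects instead of one pre-aggregated ColumnStatisticsObj per (partition, column) key. A sketch of the new shape, with placeholder values:

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;

public class ColStatsShapeSketch {
  public static void main(String[] args) {
    // New shape: partition name -> one raw stats object per column that has
    // stats in that partition (empty placeholder lists here).
    Map<String, List<ColumnStatisticsObj>> colStatsPerPartition =
        new HashMap<String, List<ColumnStatisticsObj>>();
    colStatsPerPartition.put("part1=aaa", new ArrayList<ColumnStatisticsObj>());
    colStatsPerPartition.put("part1=bbb", new ArrayList<ColumnStatisticsObj>());
    System.out.println(colStatsPerPartition.keySet());
  }
}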
http://git-wip-us.apache.org/repos/asf/hive/blob/d85beaa9/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/merge/ColumnStatsMergerFactory.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/merge/ColumnStatsMergerFactory.java b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/merge/ColumnStatsMergerFactory.java
index da6cd46..fe890e4 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/merge/ColumnStatsMergerFactory.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/merge/ColumnStatsMergerFactory.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.hive.metastore.api.BooleanColumnStatsData;
import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData;
import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData._Fields;
import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
+import org.apache.hadoop.hive.metastore.api.DateColumnStatsData;
import org.apache.hadoop.hive.metastore.api.DecimalColumnStatsData;
import org.apache.hadoop.hive.metastore.api.DoubleColumnStatsData;
import org.apache.hadoop.hive.metastore.api.LongColumnStatsData;
@@ -35,7 +36,7 @@ public class ColumnStatsMergerFactory {
private ColumnStatsMergerFactory() {
}
-
+
// we depend on the toString() method for javolution.util.FastCollection.
private static int countNumBitVectors(String s) {
if (s != null) {
@@ -88,8 +89,15 @@ public class ColumnStatsMergerFactory {
numBitVectors = nbvNew == nbvOld ? nbvNew : 0;
break;
}
+ case DATE_STATS: {
+ agg = new DateColumnStatsMerger();
+ int nbvNew = countNumBitVectors(statsObjNew.getStatsData().getDateStats().getBitVectors());
+ int nbvOld = countNumBitVectors(statsObjOld.getStatsData().getDateStats().getBitVectors());
+ numBitVectors = nbvNew == nbvOld ? nbvNew : 0;
+ break;
+ }
default:
- throw new RuntimeException("Woh, bad. Unknown stats type " + typeNew.toString());
+ throw new IllegalArgumentException("Unknown stats type " + typeNew.toString());
}
if (numBitVectors > 0) {
agg.ndvEstimator = new NumDistinctValueEstimator(numBitVectors);
@@ -127,8 +135,12 @@ public class ColumnStatsMergerFactory {
csd.setDecimalStats(new DecimalColumnStatsData());
break;
+ case DATE_STATS:
+ csd.setDateStats(new DateColumnStatsData());
+ break;
+
default:
- throw new RuntimeException("Woh, bad. Unknown stats type!");
+ throw new IllegalArgumentException("Unknown stats type");
}
cso.setStatsData(csd);
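The new DATE_STATS branch follows the same convention as the existing types: an NDV estimator is only built when both stats objects serialized the same number of bit vectors; otherwise numBitVectors is 0 and the NDV merge degrades to max(numDVs). A condensed, self-contained restatement of that rule (illustrative names only):

public class BitVectorAgreementSketch {
  // Mirrors the numBitVectors handling in ColumnStatsMergerFactory.
  static int agreedNumBitVectors(int nbvNew, int nbvOld) {
    return nbvNew == nbvOld ? nbvNew : 0; // 0 => fall back to max(numDVs)
  }

  public static void main(String[] args) {
    System.out.println(agreedNumBitVectors(16, 16)); // 16: estimators merge
    System.out.println(agreedNumBitVectors(16, 32)); // 0: counts disagree
  }
}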
http://git-wip-us.apache.org/repos/asf/hive/blob/d85beaa9/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/merge/DateColumnStatsMerger.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/merge/DateColumnStatsMerger.java b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/merge/DateColumnStatsMerger.java
new file mode 100644
index 0000000..3179b23
--- /dev/null
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/merge/DateColumnStatsMerger.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.hive.metastore.hbase.stats.merge;
+
+import org.apache.hadoop.hive.metastore.NumDistinctValueEstimator;
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
+import org.apache.hadoop.hive.metastore.api.Date;
+import org.apache.hadoop.hive.metastore.api.DateColumnStatsData;
+
+public class DateColumnStatsMerger extends ColumnStatsMerger {
+ @Override
+ public void merge(ColumnStatisticsObj aggregateColStats, ColumnStatisticsObj newColStats) {
+ DateColumnStatsData aggregateData = aggregateColStats.getStatsData().getDateStats();
+ DateColumnStatsData newData = newColStats.getStatsData().getDateStats();
+ Date lowValue =
+ aggregateData.getLowValue().compareTo(newData.getLowValue()) < 0 ? aggregateData
+ .getLowValue() : newData.getLowValue();
+ aggregateData.setLowValue(lowValue);
+ Date highValue =
+ aggregateData.getHighValue().compareTo(newData.getHighValue()) >= 0 ? aggregateData
+ .getHighValue() : newData.getHighValue();
+ aggregateData.setHighValue(highValue);
+ aggregateData.setNumNulls(aggregateData.getNumNulls() + newData.getNumNulls());
+ if (ndvEstimator == null || !newData.isSetBitVectors() || newData.getBitVectors().length() == 0) {
+ aggregateData.setNumDVs(Math.max(aggregateData.getNumDVs(), newData.getNumDVs()));
+ } else {
+ ndvEstimator.mergeEstimators(new NumDistinctValueEstimator(aggregateData.getBitVectors(),
+ ndvEstimator.getnumBitVectors()));
+ ndvEstimator.mergeEstimators(new NumDistinctValueEstimator(newData.getBitVectors(),
+ ndvEstimator.getnumBitVectors()));
+ long ndv = ndvEstimator.estimateNumDistinctValues();
+ LOG.debug("Use bitvector to merge column " + aggregateColStats.getColName() + "'s ndvs of "
+ + aggregateData.getNumDVs() + " and " + newData.getNumDVs() + " to be " + ndv);
+ aggregateData.setNumDVs(ndv);
+ aggregateData.setBitVectors(ndvEstimator.serialize().toString());
+ }
+ }
+}
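DateColumnStatsMerger behaves like the other mergers: min of low values, max of high values, summed null counts, and either an estimator-based NDV merge or a max(numDVs) fallback when no bit vectors are present. A minimal usage sketch, assuming the thrift-generated constructors Date(daysSinceEpoch) and DateColumnStatsData(numNulls, numDVs):

import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData;
import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
import org.apache.hadoop.hive.metastore.api.Date;
import org.apache.hadoop.hive.metastore.api.DateColumnStatsData;
import org.apache.hadoop.hive.metastore.hbase.stats.merge.DateColumnStatsMerger;

public class DateMergeSketch {
  static ColumnStatisticsObj dateStats(long lowDays, long highDays, long nulls, long ndv) {
    DateColumnStatsData d = new DateColumnStatsData(nulls, ndv);
    d.setLowValue(new Date(lowDays));
    d.setHighValue(new Date(highDays));
    ColumnStatisticsData data = new ColumnStatisticsData();
    data.setDateStats(d);
    return new ColumnStatisticsObj("d", "date", data);
  }

  public static void main(String[] args) {
    ColumnStatisticsObj aggregate = dateStats(100, 200, 1, 50);
    ColumnStatisticsObj incoming = dateStats(50, 180, 2, 60);
    new DateColumnStatsMerger().merge(aggregate, incoming);
    DateColumnStatsData merged = aggregate.getStatsData().getDateStats();
    // Expect low=50, high=200, numNulls=3, numDVs=max(50, 60)=60, since no
    // bit vectors were set and hence no NDV estimator is in play.
    System.out.println(merged.getLowValue().getDaysSinceEpoch() + " "
        + merged.getHighValue().getDaysSinceEpoch() + " "
        + merged.getNumNulls() + " " + merged.getNumDVs());
  }
}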
http://git-wip-us.apache.org/repos/asf/hive/blob/d85beaa9/metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreControlledCommit.java
----------------------------------------------------------------------
diff --git a/metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreControlledCommit.java b/metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreControlledCommit.java
index f613c30..f53944f 100644
--- a/metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreControlledCommit.java
+++ b/metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreControlledCommit.java
@@ -872,7 +872,7 @@ public class DummyRawStoreControlledCommit implements RawStore, Configurable {
}
@Override
- public Map<String, ColumnStatisticsObj> getAggrColStatsForTablePartitions(String dbName,
+ public Map<String, List<ColumnStatisticsObj>> getColStatsForTablePartitions(String dbName,
String tableName) throws MetaException, NoSuchObjectException {
// TODO Auto-generated method stub
return null;
http://git-wip-us.apache.org/repos/asf/hive/blob/d85beaa9/metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreForJdoConnection.java
----------------------------------------------------------------------
diff --git a/metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreForJdoConnection.java b/metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreForJdoConnection.java
index 1720e37..e0f5cdb 100644
--- a/metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreForJdoConnection.java
+++ b/metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreForJdoConnection.java
@@ -888,7 +888,7 @@ public class DummyRawStoreForJdoConnection implements RawStore {
}
@Override
- public Map<String, ColumnStatisticsObj> getAggrColStatsForTablePartitions(String dbName,
+ public Map<String, List<ColumnStatisticsObj>> getColStatsForTablePartitions(String dbName,
String tableName) throws MetaException, NoSuchObjectException {
// TODO Auto-generated method stub
return null;
http://git-wip-us.apache.org/repos/asf/hive/blob/d85beaa9/metastore/src/test/org/apache/hadoop/hive/metastore/cache/TestCachedStore.java
----------------------------------------------------------------------
diff --git a/metastore/src/test/org/apache/hadoop/hive/metastore/cache/TestCachedStore.java b/metastore/src/test/org/apache/hadoop/hive/metastore/cache/TestCachedStore.java
index 0ab20d6..7a3ec09 100644
--- a/metastore/src/test/org/apache/hadoop/hive/metastore/cache/TestCachedStore.java
+++ b/metastore/src/test/org/apache/hadoop/hive/metastore/cache/TestCachedStore.java
@@ -25,11 +25,22 @@ import java.util.Map;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.ObjectStore;
+import org.apache.hadoop.hive.metastore.TableType;
import org.apache.hadoop.hive.metastore.TestObjectStore.MockPartitionExpressionProxy;
+import org.apache.hadoop.hive.metastore.api.BooleanColumnStatsData;
+import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData;
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsDesc;
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.LongColumnStatsData;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.PrincipalType;
+import org.apache.hadoop.hive.metastore.api.SerDeInfo;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.StringColumnStatsData;
import org.apache.hadoop.hive.metastore.api.Table;
import org.junit.Assert;
import org.junit.Before;
@@ -37,18 +48,24 @@ import org.junit.Test;
public class TestCachedStore {
- private CachedStore cachedStore = new CachedStore();
+ private ObjectStore objectStore;
+ private CachedStore cachedStore;
@Before
public void setUp() throws Exception {
HiveConf conf = new HiveConf();
- conf.setVar(HiveConf.ConfVars.METASTORE_EXPRESSION_PROXY_CLASS, MockPartitionExpressionProxy.class.getName());
-
- ObjectStore objectStore = new ObjectStore();
+ conf.setBoolean(HiveConf.ConfVars.HIVE_IN_TEST.varname, true);
+ conf.setVar(HiveConf.ConfVars.METASTORE_EXPRESSION_PROXY_CLASS,
+ MockPartitionExpressionProxy.class.getName());
+ objectStore = new ObjectStore();
objectStore.setConf(conf);
+ cachedStore = new CachedStore();
+ cachedStore.setConf(conf);
+ // Stop the CachedStore cache update service. We'll start it explicitly to control the test
+ cachedStore.stopCacheUpdateService(1);
- cachedStore.setRawStore(objectStore);
-
SharedCache.getDatabaseCache().clear();
SharedCache.getTableCache().clear();
SharedCache.getPartitionCache().clear();
@@ -56,6 +73,426 @@ public class TestCachedStore {
SharedCache.getPartitionColStatsCache().clear();
}
+ /**********************************************************************************************
+ * Methods that test CachedStore
+ *********************************************************************************************/
+
+ @Test
+ public void testDatabaseOps() throws Exception {
+ // Add a db via ObjectStore
+ String dbName = "testDatabaseOps";
+ String dbDescription = "testDatabaseOps";
+ String dbLocation = "file:/tmp";
+ Map<String, String> dbParams = new HashMap<String, String>();
+ String dbOwner = "user1";
+ Database db = new Database(dbName, dbDescription, dbLocation, dbParams);
+ db.setOwnerName(dbOwner);
+ db.setOwnerType(PrincipalType.USER);
+ objectStore.createDatabase(db);
+ db = objectStore.getDatabase(dbName);
+ // Prewarm CachedStore
+ cachedStore.prewarm();
+
+ // Read database via CachedStore
+ Database dbNew = cachedStore.getDatabase(dbName);
+ Assert.assertEquals(db, dbNew);
+
+ // Add another db via CachedStore
+ final String dbName1 = "testDatabaseOps1";
+ final String dbDescription1 = "testDatabaseOps1";
+ Database db1 = new Database(dbName1, dbDescription1, dbLocation, dbParams);
+ db1.setOwnerName(dbOwner);
+ db1.setOwnerType(PrincipalType.USER);
+ cachedStore.createDatabase(db1);
+ db1 = cachedStore.getDatabase(dbName1);
+
+ // Read db via ObjectStore
+ dbNew = objectStore.getDatabase(dbName1);
+ Assert.assertEquals(db1, dbNew);
+
+ // Alter the db via CachedStore (can only alter owner or parameters)
+ db = new Database(dbName, dbDescription, dbLocation, dbParams);
+ dbOwner = "user2";
+ db.setOwnerName(dbOwner);
+ db.setOwnerType(PrincipalType.USER);
+ cachedStore.alterDatabase(dbName, db);
+ db = cachedStore.getDatabase(dbName);
+
+ // Read db via ObjectStore
+ dbNew = objectStore.getDatabase(dbName);
+ Assert.assertEquals(db, dbNew);
+
+ // Add another db via ObjectStore
+ final String dbName2 = "testDatabaseOps2";
+ final String dbDescription2 = "testDatabaseOps2";
+ Database db2 = new Database(dbName2, dbDescription2, dbLocation, dbParams);
+ db2.setOwnerName(dbOwner);
+ db2.setOwnerType(PrincipalType.USER);
+ objectStore.createDatabase(db2);
+ db2 = objectStore.getDatabase(dbName2);
+
+ // Alter db "testDatabaseOps" via ObjectStore
+ dbOwner = "user1";
+ db = new Database(dbName, dbDescription, dbLocation, dbParams);
+ db.setOwnerName(dbOwner);
+ db.setOwnerType(PrincipalType.USER);
+ objectStore.alterDatabase(dbName, db);
+ db = objectStore.getDatabase(dbName);
+
+ // Drop db "testDatabaseOps1" via ObjectStore
+ objectStore.dropDatabase(dbName1);
+
+ // We update twice to accurately detect if cache is dirty or not
+ updateCache(cachedStore, 100, 500, 100);
+ updateCache(cachedStore, 100, 500, 100);
+
+ // Read the newly added db via CachedStore
+ dbNew = cachedStore.getDatabase(dbName2);
+ Assert.assertEquals(db2, dbNew);
+
+ // Read the altered db via CachedStore (altered user from "user2" to "user1")
+ dbNew = cachedStore.getDatabase(dbName);
+ Assert.assertEquals(db, dbNew);
+
+ // Try to read the dropped db after cache update
+ try {
+ dbNew = cachedStore.getDatabase(dbName1);
+ Assert.fail("The database: " + dbName1
+ + " should have been removed from the cache after running the update service");
+ } catch (NoSuchObjectException e) {
+ // Expected
+ }
+
+ // Clean up
+ objectStore.dropDatabase(dbName);
+ objectStore.dropDatabase(dbName2);
+ }
+
+ @Test
+ public void testTableOps() throws Exception {
+ // Add a db via ObjectStore
+ String dbName = "testTableOps";
+ String dbDescription = "testTableOps";
+ String dbLocation = "file:/tmp";
+ Map<String, String> dbParams = new HashMap<String, String>();
+ String dbOwner = "user1";
+ Database db = new Database(dbName, dbDescription, dbLocation, dbParams);
+ db.setOwnerName(dbOwner);
+ db.setOwnerType(PrincipalType.USER);
+ objectStore.createDatabase(db);
+ db = objectStore.getDatabase(dbName);
+
+ // Add a table via ObjectStore
+ String tblName = "tbl";
+ String tblOwner = "user1";
+ String serdeLocation = "file:/tmp";
+ FieldSchema col1 = new FieldSchema("col1", "int", "integer column");
+ FieldSchema col2 = new FieldSchema("col2", "string", "string column");
+ List<FieldSchema> cols = new ArrayList<FieldSchema>();
+ cols.add(col1);
+ cols.add(col2);
+ Map<String, String> serdeParams = new HashMap<String, String>();
+ Map<String, String> tblParams = new HashMap<String, String>();
+ SerDeInfo serdeInfo = new SerDeInfo("serde", "seriallib", new HashMap<String, String>());
+ StorageDescriptor sd =
+ new StorageDescriptor(cols, serdeLocation, "input", "output", false, 0, serdeInfo, null,
+ null, serdeParams);
+ sd.setStoredAsSubDirectories(false);
+ Table tbl =
+ new Table(tblName, dbName, tblOwner, 0, 0, 0, sd, new ArrayList<FieldSchema>(), tblParams,
+ null, null, TableType.MANAGED_TABLE.toString());
+ objectStore.createTable(tbl);
+ tbl = objectStore.getTable(dbName, tblName);
+
+ // Prewarm CachedStore
+ cachedStore.prewarm();
+
+ // Read database, table via CachedStore
+ Database dbNew = cachedStore.getDatabase(dbName);
+ Assert.assertEquals(db, dbNew);
+ Table tblNew = cachedStore.getTable(dbName, tblName);
+ Assert.assertEquals(tbl, tblNew);
+
+ // Add a new table via CachedStore
+ String tblName1 = "tbl1";
+ Table tbl1 =
+ new Table(tblName1, dbName, tblOwner, 0, 0, 0, sd, new ArrayList<FieldSchema>(), tblParams,
+ null, null, TableType.MANAGED_TABLE.toString());
+ cachedStore.createTable(tbl1);
+ tbl1 = cachedStore.getTable(dbName, tblName1);
+
+ // Read via object store
+ tblNew = objectStore.getTable(dbName, tblName1);
+ Assert.assertEquals(tbl1, tblNew);
+
+ // Add a new table via ObjectStore
+ String tblName2 = "tbl2";
+ Table tbl2 =
+ new Table(tblName2, dbName, tblOwner, 0, 0, 0, sd, new ArrayList<FieldSchema>(), tblParams,
+ null, null, TableType.MANAGED_TABLE.toString());
+ objectStore.createTable(tbl2);
+ tbl2 = objectStore.getTable(dbName, tblName2);
+
+ // Alter table "tbl" via ObjectStore
+ tblOwner = "user2";
+ tbl =
+ new Table(tblName, dbName, tblOwner, 0, 0, 0, sd, new ArrayList<FieldSchema>(), tblParams,
+ null, null, TableType.MANAGED_TABLE.toString());
+ objectStore.alterTable(dbName, tblName, tbl);
+ tbl = objectStore.getTable(dbName, tblName);
+
+ // Drop table "tbl1" via ObjectStore
+ objectStore.dropTable(dbName, tblName1);
+
+ // We update twice to accurately detect if cache is dirty or not
+ updateCache(cachedStore, 100, 500, 100);
+ updateCache(cachedStore, 100, 500, 100);
+
+ // Read "tbl2" via CachedStore
+ tblNew = cachedStore.getTable(dbName, tblName2);
+ Assert.assertEquals(tbl2, tblNew);
+
+ // Read the altered "tbl" via CachedStore
+ tblNew = cachedStore.getTable(dbName, tblName);
+ Assert.assertEquals(tbl, tblNew);
+
+ // Try to read the dropped "tbl1" via CachedStore (should throw exception)
+ tblNew = cachedStore.getTable(dbName, tblName1);
+ Assert.assertNull(tblNew);
+
+ // Should return "tbl" and "tbl2"
+ List<String> tblNames = cachedStore.getTables(dbName, "*");
+ Assert.assertTrue(tblNames.contains(tblName));
+ Assert.assertTrue(!tblNames.contains(tblName1));
+ Assert.assertTrue(tblNames.contains(tblName2));
+
+ // Clean up
+ objectStore.dropTable(dbName, tblName);
+ objectStore.dropTable(dbName, tblName2);
+ objectStore.dropDatabase(dbName);
+ }
+
+ @Test
+ public void testPartitionOps() throws Exception {
+ // Add a db via ObjectStore
+ String dbName = "testPartitionOps";
+ String dbDescription = "testPartitionOps";
+ String dbLocation = "file:/tmp";
+ Map<String, String> dbParams = new HashMap<String, String>();
+ String dbOwner = "user1";
+ Database db = new Database(dbName, dbDescription, dbLocation, dbParams);
+ db.setOwnerName(dbOwner);
+ db.setOwnerType(PrincipalType.USER);
+ objectStore.createDatabase(db);
+ db = objectStore.getDatabase(dbName);
+
+ // Add a table via ObjectStore
+ String tblName = "tbl";
+ String tblOwner = "user1";
+ String serdeLocation = "file:/tmp";
+ FieldSchema col1 = new FieldSchema("col1", "int", "integer column");
+ FieldSchema col2 = new FieldSchema("col2", "string", "string column");
+ List<FieldSchema> cols = new ArrayList<FieldSchema>();
+ cols.add(col1);
+ cols.add(col2);
+ Map<String, String> serdeParams = new HashMap<String, String>();
+ Map<String, String> tblParams = new HashMap<String, String>();
+ SerDeInfo serdeInfo = new SerDeInfo("serde", "seriallib", null);
+ StorageDescriptor sd =
+ new StorageDescriptor(cols, serdeLocation, "input", "output", false, 0, serdeInfo, null,
+ null, serdeParams);
+ FieldSchema ptnCol1 = new FieldSchema("part1", "string", "string partition column");
+ List<FieldSchema> ptnCols = new ArrayList<FieldSchema>();
+ ptnCols.add(ptnCol1);
+ Table tbl =
+ new Table(tblName, dbName, tblOwner, 0, 0, 0, sd, ptnCols, tblParams, null, null,
+ TableType.MANAGED_TABLE.toString());
+ objectStore.createTable(tbl);
+ tbl = objectStore.getTable(dbName, tblName);
+ final String ptnColVal1 = "aaa";
+ Map<String, String> partParams = new HashMap<String, String>();
+ Partition ptn1 =
+ new Partition(Arrays.asList(ptnColVal1), dbName, tblName, 0, 0, sd, partParams);
+ objectStore.addPartition(ptn1);
+ ptn1 = objectStore.getPartition(dbName, tblName, Arrays.asList(ptnColVal1));
+ final String ptnColVal2 = "bbb";
+ Partition ptn2 =
+ new Partition(Arrays.asList(ptnColVal2), dbName, tblName, 0, 0, sd, partParams);
+ objectStore.addPartition(ptn2);
+ ptn2 = objectStore.getPartition(dbName, tblName, Arrays.asList(ptnColVal2));
+
+ // Prewarm CachedStore
+ cachedStore.prewarm();
+
+ // Read database, table, partition via CachedStore
+ Database dbNew = cachedStore.getDatabase(dbName);
+ Assert.assertEquals(db, dbNew);
+ Table tblNew = cachedStore.getTable(dbName, tblName);
+ Assert.assertEquals(tbl, tblNew);
+ Partition newPtn1 = cachedStore.getPartition(dbName, tblName, Arrays.asList(ptnColVal1));
+ Assert.assertEquals(ptn1, newPtn1);
+ Partition newPtn2 = cachedStore.getPartition(dbName, tblName, Arrays.asList(ptnColVal2));
+ Assert.assertEquals(ptn2, newPtn2);
+
+ // Add a new partition via ObjectStore
+ final String ptnColVal3 = "ccc";
+ Partition ptn3 =
+ new Partition(Arrays.asList(ptnColVal3), dbName, tblName, 0, 0, sd, partParams);
+ objectStore.addPartition(ptn3);
+ ptn3 = objectStore.getPartition(dbName, tblName, Arrays.asList(ptnColVal3));
+
+ // Alter an existing partition ("aaa") via ObjectStore
+ final String ptnColVal1Alt = "aaaAlt";
+ Partition ptn1Alt =
+ new Partition(Arrays.asList(ptnColVal1Alt), dbName, tblName, 0, 0, sd, partParams);
+ objectStore.alterPartition(dbName, tblName, Arrays.asList(ptnColVal1), ptn1Alt);
+ ptn1Alt = objectStore.getPartition(dbName, tblName, Arrays.asList(ptnColVal1Alt));
+
+ // Drop an existing partition ("bbb") via ObjectStore
+ objectStore.dropPartition(dbName, tblName, Arrays.asList(ptnColVal2));
+
+ // We update twice to accurately detect if cache is dirty or not
+ updateCache(cachedStore, 100, 500, 100);
+ updateCache(cachedStore, 100, 500, 100);
+
+ // Read the newly added partition via CachedStore
+ Partition newPtn = cachedStore.getPartition(dbName, tblName, Arrays.asList(ptnColVal3));
+ Assert.assertEquals(ptn3, newPtn);
+
+ // Read the altered partition via CachedStore
+ newPtn = cachedStore.getPartition(dbName, tblName, Arrays.asList(ptnColVal1Alt));
+ Assert.assertEquals(ptn1Alt, newPtn);
+
+ // Try to read the dropped partition via CachedStore
+ try {
+ newPtn = cachedStore.getPartition(dbName, tblName, Arrays.asList(ptnColVal2));
+ Assert.fail("The partition: " + ptnColVal2
+ + " should have been removed from the cache after running the update service");
+ } catch (NoSuchObjectException e) {
+ // Expected
+ }
+ }
+
+ //@Test
+ public void testTableColStatsOps() throws Exception {
+ // Add a db via ObjectStore
+ String dbName = "testTableColStatsOps";
+ String dbDescription = "testTableColStatsOps";
+ String dbLocation = "file:/tmp";
+ Map<String, String> dbParams = new HashMap<String, String>();
+ String dbOwner = "user1";
+ Database db = new Database(dbName, dbDescription, dbLocation, dbParams);
+ db.setOwnerName(dbOwner);
+ db.setOwnerType(PrincipalType.USER);
+ objectStore.createDatabase(db);
+ db = objectStore.getDatabase(dbName);
+
+ // Add a table via ObjectStore
+ final String tblName = "tbl";
+ final String tblOwner = "user1";
+ final String serdeLocation = "file:/tmp";
+ final FieldSchema col1 = new FieldSchema("col1", "int", "integer column");
+ // Stats values for col1
+ long col1LowVal = 5;
+ long col1HighVal = 500;
+ long col1Nulls = 10;
+ long col1DV = 20;
+ final FieldSchema col2 = new FieldSchema("col2", "string", "string column");
+ // Stats values for col2
+ long col2MaxColLen = 100;
+ double col2AvgColLen = 45.5;
+ long col2Nulls = 5;
+ long col2DV = 40;
+ final FieldSchema col3 = new FieldSchema("col3", "boolean", "boolean column");
+ // Stats values for col3
+ long col3NumTrues = 100;
+ long col3NumFalses = 30;
+ long col3Nulls = 10;
+ final List<FieldSchema> cols = new ArrayList<FieldSchema>();
+ cols.add(col1);
+ cols.add(col2);
+ cols.add(col3);
+ Map<String, String> serdeParams = new HashMap<String, String>();
+ Map<String, String> tblParams = new HashMap<String, String>();
+ final SerDeInfo serdeInfo = new SerDeInfo("serde", "seriallib", null);
+ StorageDescriptor sd =
+ new StorageDescriptor(cols, serdeLocation, "input", "output", false, 0, serdeInfo, null,
+ null, serdeParams);
+ Table tbl =
+ new Table(tblName, dbName, tblOwner, 0, 0, 0, sd, new ArrayList<FieldSchema>(), tblParams,
+ null, null, TableType.MANAGED_TABLE.toString());
+ objectStore.createTable(tbl);
+ tbl = objectStore.getTable(dbName, tblName);
+
+ // Add ColumnStatistics for tbl to metastore DB via ObjectStore
+ ColumnStatistics stats = new ColumnStatistics();
+ ColumnStatisticsDesc statsDesc = new ColumnStatisticsDesc(true, dbName, tblName);
+ List<ColumnStatisticsObj> colStatObjs = new ArrayList<ColumnStatisticsObj>();
+
+ // Col1
+ ColumnStatisticsData data1 = new ColumnStatisticsData();
+ ColumnStatisticsObj col1Stats = new ColumnStatisticsObj(col1.getName(), col1.getType(), data1);
+ LongColumnStatsData longStats = new LongColumnStatsData();
+ longStats.setLowValue(col1LowVal);
+ longStats.setHighValue(col1HighVal);
+ longStats.setNumNulls(col1Nulls);
+ longStats.setNumDVs(col1DV);
+ data1.setLongStats(longStats);
+ colStatObjs.add(col1Stats);
+
+ // Col2
+ ColumnStatisticsData data2 = new ColumnStatisticsData();
+ ColumnStatisticsObj col2Stats = new ColumnStatisticsObj(col2.getName(), col2.getType(), data2);
+ StringColumnStatsData stringStats = new StringColumnStatsData();
+ stringStats.setMaxColLen(col2MaxColLen);
+ stringStats.setAvgColLen(col2AvgColLen);
+ stringStats.setNumNulls(col2Nulls);
+ stringStats.setNumDVs(col2DV);
+ data2.setStringStats(stringStats);
+ colStatObjs.add(col2Stats);
+
+ // Col3
+ ColumnStatisticsData data3 = new ColumnStatisticsData();
+ ColumnStatisticsObj col3Stats = new ColumnStatisticsObj(col3.getName(), col3.getType(), data3);
+ BooleanColumnStatsData boolStats = new BooleanColumnStatsData();
+ boolStats.setNumTrues(col3NumTrues);
+ boolStats.setNumFalses(col3NumFalses);
+ boolStats.setNumNulls(col3Nulls);
+ data3.setBooleanStats(boolStats);
+ colStatObjs.add(col3Stats);
+
+ stats.setStatsDesc(statsDesc);
+ stats.setStatsObj(colStatObjs);
+
+ // Save to DB
+ objectStore.updateTableColumnStatistics(stats);
+
+ // Prewarm CachedStore
+ cachedStore.prewarm();
+
+ // Read table stats via CachedStore
+ ColumnStatistics newStats =
+ cachedStore.getTableColumnStatistics(dbName, tblName,
+ Arrays.asList(col1.getName(), col2.getName(), col3.getName()));
+ Assert.assertEquals(stats, newStats);
+ }
+
+ private void updateCache(CachedStore cachedStore, long frequency, long sleepTime,
+ long shutdownTimeout) throws InterruptedException {
+ // Set the cache refresh period (100 ms in these tests)
+ cachedStore.setCacheRefreshPeriod(frequency);
+ // Start the CachedStore update service
+ cachedStore.startCacheUpdateService();
+ // Sleep so that the cache update can complete
+ Thread.sleep(sleepTime);
+ // Stop the cache update service
+ cachedStore.stopCacheUpdateService(shutdownTimeout);
+ }
+
+ /**********************************************************************************************
+ * Methods that test SharedCache
+ *********************************************************************************************/
+
@Test
public void testSharedStoreDb() {
Database db1 = new Database();
@@ -160,6 +597,7 @@ public class TestCachedStore {
Assert.assertEquals(SharedCache.getSdCache().size(), 2);
}
+
@Test
public void testSharedStorePartition() {
Partition part1 = new Partition();
[2/2] hive git commit: HIVE-16579: CachedStore: improvements to partition col stats caching and cache column stats for unpartitioned table (Daniel Dai, Thejas Nair, Vaibhav Gumashta reviewed by Daniel Dai, Thejas Nair)
Posted by vg...@apache.org.
HIVE-16579: CachedStore: improvements to partition col stats caching and cache column stats for unpartitioned table (Daniel Dai, Thejas Nair, Vaibhav Gumashta reviewed by Daniel Dai, Thejas Nair)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/d85beaa9
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/d85beaa9
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/d85beaa9
Branch: refs/heads/master
Commit: d85beaa99ba349d9334d3d96abb6e89c94db8481
Parents: 952fe6e
Author: Vaibhav Gumashta <vg...@hortonworks.com>
Authored: Mon May 22 15:52:58 2017 -0700
Committer: Vaibhav Gumashta <vg...@hortonworks.com>
Committed: Mon May 22 15:52:58 2017 -0700
----------------------------------------------------------------------
.../listener/DummyRawStoreFailEvent.java | 4 +-
.../org/apache/hadoop/hive/ql/QTestUtil.java | 2 +-
.../hive/metastore/MetaStoreDirectSql.java | 73 +-
.../hadoop/hive/metastore/MetaStoreUtils.java | 11 +-
.../hadoop/hive/metastore/ObjectStore.java | 19 +-
.../apache/hadoop/hive/metastore/RawStore.java | 8 +-
.../hive/metastore/StatObjectConverter.java | 148 +++
.../hadoop/hive/metastore/cache/CacheUtils.java | 31 +
.../hive/metastore/cache/CachedStore.java | 943 ++++++++++++-------
.../hive/metastore/cache/SharedCache.java | 293 +++++-
.../hadoop/hive/metastore/hbase/HBaseStore.java | 2 +-
.../stats/merge/ColumnStatsMergerFactory.java | 18 +-
.../stats/merge/DateColumnStatsMerger.java | 55 ++
.../DummyRawStoreControlledCommit.java | 2 +-
.../DummyRawStoreForJdoConnection.java | 2 +-
.../hive/metastore/cache/TestCachedStore.java | 450 ++++++++-
16 files changed, 1637 insertions(+), 424 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/d85beaa9/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/DummyRawStoreFailEvent.java
----------------------------------------------------------------------
diff --git a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/DummyRawStoreFailEvent.java b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/DummyRawStoreFailEvent.java
index 91a3a38..3dc63bd 100644
--- a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/DummyRawStoreFailEvent.java
+++ b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/DummyRawStoreFailEvent.java
@@ -914,9 +914,9 @@ public class DummyRawStoreFailEvent implements RawStore, Configurable {
}
@Override
- public Map<String, ColumnStatisticsObj> getAggrColStatsForTablePartitions(String dbName,
+ public Map<String, List<ColumnStatisticsObj>> getColStatsForTablePartitions(String dbName,
String tableName) throws MetaException, NoSuchObjectException {
- return objectStore.getAggrColStatsForTablePartitions(dbName, tableName);
+ return objectStore.getColStatsForTablePartitions(dbName, tableName);
}
@Override
http://git-wip-us.apache.org/repos/asf/hive/blob/d85beaa9/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java
----------------------------------------------------------------------
diff --git a/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java b/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java
index d296851..111cc11 100644
--- a/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java
+++ b/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java
@@ -350,7 +350,7 @@ public class QTestUtil {
if (!useHBaseMetastore) {
// Plug verifying metastore in for testing DirectSQL.
conf.setVar(HiveConf.ConfVars.METASTORE_RAW_STORE_IMPL,
- "org.apache.hadoop.hive.metastore.VerifyingObjectStore");
+ "org.apache.hadoop.hive.metastore.VerifyingObjectStore");
} else {
conf.setVar(ConfVars.METASTORE_RAW_STORE_IMPL, HBaseStore.class.getName());
conf.setBoolVar(ConfVars.METASTORE_FASTPATH, true);
http://git-wip-us.apache.org/repos/asf/hive/blob/d85beaa9/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java b/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java
index b96c27e..df73693 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java
@@ -1208,7 +1208,9 @@ class MetaStoreDirectSql {
}
};
List<Object[]> list = runBatched(colNames, b);
- if (list.isEmpty()) return null;
+ if (list.isEmpty()) {
+ return null;
+ }
ColumnStatisticsDesc csd = new ColumnStatisticsDesc(true, dbName, tableName);
ColumnStatistics result = makeColumnStats(list, csd, 0);
b.closeAllQueries();
@@ -1343,41 +1345,26 @@ class MetaStoreDirectSql {
// Get aggregated column stats for a table per partition for all columns in the partition
// This is primarily used to populate stats object when using CachedStore (Check CachedStore#prewarm)
- public Map<String, ColumnStatisticsObj> getAggrColStatsForTablePartitions(String dbName,
- String tblName, boolean useDensityFunctionForNDVEstimation, double ndvTuner) throws MetaException {
- String queryText = "select \"PARTITION_NAME\", \"COLUMN_NAME\", \"COLUMN_TYPE\", "
- + "min(\"LONG_LOW_VALUE\"), max(\"LONG_HIGH_VALUE\"), min(\"DOUBLE_LOW_VALUE\"), max(\"DOUBLE_HIGH_VALUE\"), "
- + "min(cast(\"BIG_DECIMAL_LOW_VALUE\" as decimal)), max(cast(\"BIG_DECIMAL_HIGH_VALUE\" as decimal)), "
- + "sum(\"NUM_NULLS\"), max(\"NUM_DISTINCTS\"), "
- + "max(\"AVG_COL_LEN\"), max(\"MAX_COL_LEN\"), sum(\"NUM_TRUES\"), sum(\"NUM_FALSES\"), "
- // The following data is used to compute a partitioned table's NDV based
- // on partitions' NDV when useDensityFunctionForNDVEstimation = true. Global NDVs cannot be
- // accurately derived from partition NDVs, because the domain of column value two partitions
- // can overlap. If there is no overlap then global NDV is just the sum
- // of partition NDVs (UpperBound). But if there is some overlay then
- // global NDV can be anywhere between sum of partition NDVs (no overlap)
- // and same as one of the partition NDV (domain of column value in all other
- // partitions is subset of the domain value in one of the partition)
- // (LowerBound).But under uniform distribution, we can roughly estimate the global
- // NDV by leveraging the min/max values.
- // And, we also guarantee that the estimation makes sense by comparing it to the
- // UpperBound (calculated by "sum(\"NUM_DISTINCTS\")")
- // and LowerBound (calculated by "max(\"NUM_DISTINCTS\")")
- + "avg((\"LONG_HIGH_VALUE\"-\"LONG_LOW_VALUE\")/cast(\"NUM_DISTINCTS\" as decimal)),"
- + "avg((\"DOUBLE_HIGH_VALUE\"-\"DOUBLE_LOW_VALUE\")/\"NUM_DISTINCTS\"),"
- + "avg((cast(\"BIG_DECIMAL_HIGH_VALUE\" as decimal)-cast(\"BIG_DECIMAL_LOW_VALUE\" as decimal))/\"NUM_DISTINCTS\"),"
- + "sum(\"NUM_DISTINCTS\") from \"PART_COL_STATS\""
- + " where \"DB_NAME\" = ? and \"TABLE_NAME\" = ? group by \"PARTITION_NAME\", \"COLUMN_NAME\", \"COLUMN_TYPE\"";
+ public Map<String, List<ColumnStatisticsObj>> getColStatsForTablePartitions(String dbName,
+ String tblName) throws MetaException {
+ String queryText =
+ "select \"PARTITION_NAME\", \"COLUMN_NAME\", \"COLUMN_TYPE\", \"LONG_LOW_VALUE\", "
+ + "\"LONG_HIGH_VALUE\", \"DOUBLE_LOW_VALUE\", \"DOUBLE_HIGH_VALUE\", "
+ + "\"BIG_DECIMAL_LOW_VALUE\", \"BIG_DECIMAL_HIGH_VALUE\", \"NUM_NULLS\", "
+ + "\"NUM_DISTINCTS\", \"AVG_COL_LEN\", \"MAX_COL_LEN\", \"NUM_TRUES\", \"NUM_FALSES\""
+ + " from \"PART_COL_STATS\" where \"DB_NAME\" = ? and \"TABLE_NAME\" = ?"
+ + " order by \"PARTITION_NAME\"";
long start = 0;
long end = 0;
Query query = null;
boolean doTrace = LOG.isDebugEnabled();
Object qResult = null;
- ForwardQueryResult fqr = null;
start = doTrace ? System.nanoTime() : 0;
query = pm.newQuery("javax.jdo.query.SQL", queryText);
- qResult = executeWithArray(query,
- prepareParams(dbName, tblName, new ArrayList<String>(), new ArrayList<String>()), queryText);
+ qResult =
+ executeWithArray(query,
+ prepareParams(dbName, tblName, new ArrayList<String>(), new ArrayList<String>()),
+ queryText);
if (qResult == null) {
query.closeAll();
return Maps.newHashMap();
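The long comment removed from the old query remains the right mental model: given per-partition NDVs ndv_1 .. ndv_k, the global NDV is bounded by max_i(ndv_i) <= ndv_global <= sum_i(ndv_i). For example, two partitions with NDVs 10 and 15 pin the global NDV to [15, 25]: 15 when one column's value domain contains the other, 25 when they are disjoint. The deleted density-function logic interpolated a point estimate inside that interval using low/high values; the new query skips aggregation and returns the raw per-partition rows, leaving any aggregation to the caller.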
@@ -1385,13 +1372,31 @@ class MetaStoreDirectSql {
end = doTrace ? System.nanoTime() : 0;
timingTrace(doTrace, queryText, start, end);
List<Object[]> list = ensureList(qResult);
- Map<String, ColumnStatisticsObj> partColStatsMap = new HashMap<String, ColumnStatisticsObj>();
+ Map<String, List<ColumnStatisticsObj>> partColStatsMap =
+ new HashMap<String, List<ColumnStatisticsObj>>();
+ String partNameCurrent = null;
+ List<ColumnStatisticsObj> partColStatsList = new ArrayList<ColumnStatisticsObj>();
for (Object[] row : list) {
String partName = (String) row[0];
- String colName = (String) row[1];
- partColStatsMap.put(
- CacheUtils.buildKey(dbName, tblName, CachedStore.partNameToVals(partName), colName),
- prepareCSObjWithAdjustedNDV(row, 1, useDensityFunctionForNDVEstimation, ndvTuner));
+ if (partNameCurrent == null) {
+ // Update the current partition we are working on
+ partNameCurrent = partName;
+ // Create a new list for this new partition
+ partColStatsList = new ArrayList<ColumnStatisticsObj>();
+ // Add the col stat for the current column
+ partColStatsList.add(prepareCSObj(row, 1));
+ } else if (!partNameCurrent.equalsIgnoreCase(partName)) {
+ // Save the previous partition and its col stat list
+ partColStatsMap.put(partNameCurrent, partColStatsList);
+ // Update the current partition we are working on
+ partNameCurrent = partName;
+ // Create a new list for this new partition
+ partColStatsList = new ArrayList<ColumnStatisticsObj>();
+ // Add the col stat for the current column
+ partColStatsList.add(prepareCSObj(row, 1));
+ } else {
+ partColStatsList.add(prepareCSObj(row, 1));
+ }
Deadline.checkTimeout();
}
+ // Flush the last partition and its col stat list; the loop above only saves a
+ // partition's list when it encounters the next partition name
+ if (partNameCurrent != null) {
+ partColStatsMap.put(partNameCurrent, partColStatsList);
+ }
query.closeAll();
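
The rewritten query no longer aggregates in SQL; it simply orders rows by "PARTITION_NAME" so that one pass over the result set can build a list of col stats per partition. For illustration, a minimal standalone sketch of the same group-by-sorted-key pass (plain strings stand in for the prepared ColumnStatisticsObj values; none of the names below are Hive API):

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class GroupBySortedKeySketch {
      // Groups (partitionName, colStat) pairs that arrive sorted by partition name.
      public static Map<String, List<String>> group(List<String[]> sortedRows) {
        Map<String, List<String>> byPartition = new HashMap<String, List<String>>();
        String current = null;
        List<String> stats = new ArrayList<String>();
        for (String[] row : sortedRows) {
          String partName = row[0];
          if (current == null || !current.equalsIgnoreCase(partName)) {
            if (current != null) {
              byPartition.put(current, stats); // flush the finished partition
            }
            current = partName;
            stats = new ArrayList<String>();
          }
          stats.add(row[1]);
        }
        if (current != null) {
          byPartition.put(current, stats); // flush the last partition
        }
        return byPartition;
      }
    }

Note the final flush after the loop: a grouping pass keyed on "previous key changed" only saves a group when it sees the next one, so the last group needs an explicit save.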
http://git-wip-us.apache.org/repos/asf/hive/blob/d85beaa9/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreUtils.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreUtils.java b/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreUtils.java
index 870896c..8328428 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreUtils.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreUtils.java
@@ -1171,6 +1171,15 @@ public class MetaStoreUtils {
return addCols(getSchemaWithoutCols(sd, tblsd, parameters, databaseName, tableName, partitionKeys), tblsd.getCols());
}
+ public static List<String> getColumnNamesForTable(Table table) {
+ List<String> colNames = new ArrayList<String>();
+ Iterator<FieldSchema> colsIterator = table.getSd().getColsIterator();
+ while (colsIterator.hasNext()) {
+ colNames.add(colsIterator.next().getName());
+ }
+ return colNames;
+ }
+
public static String getColumnNameDelimiter(List<FieldSchema> fieldSchemas) {
// we first take a look if any fieldSchemas contain COMMA
for (int i = 0; i < fieldSchemas.size(); i++) {
@@ -1180,7 +1189,7 @@ public class MetaStoreUtils {
}
return String.valueOf(SerDeUtils.COMMA);
}
-
+
/**
* Convert FieldSchemas to columnNames.
*/
http://git-wip-us.apache.org/repos/asf/hive/blob/d85beaa9/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java
index b28983f..19becb8 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java
@@ -7173,23 +7173,18 @@ public class ObjectStore implements RawStore, Configurable {
}
@Override
- public Map<String, ColumnStatisticsObj> getAggrColStatsForTablePartitions(String dbName,
+ public Map<String, List<ColumnStatisticsObj>> getColStatsForTablePartitions(String dbName,
String tableName) throws MetaException, NoSuchObjectException {
- final boolean useDensityFunctionForNDVEstimation = HiveConf.getBoolVar(getConf(),
- HiveConf.ConfVars.HIVE_METASTORE_STATS_NDV_DENSITY_FUNCTION);
- final double ndvTuner = HiveConf.getFloatVar(getConf(),
- HiveConf.ConfVars.HIVE_METASTORE_STATS_NDV_TUNER);
- return new GetHelper<Map<String, ColumnStatisticsObj>>(dbName, tableName, true, false) {
+ return new GetHelper<Map<String, List<ColumnStatisticsObj>>>(dbName, tableName, true, false) {
@Override
- protected Map<String, ColumnStatisticsObj> getSqlResult(
- GetHelper<Map<String, ColumnStatisticsObj>> ctx) throws MetaException {
- return directSql.getAggrColStatsForTablePartitions(dbName, tblName,
- useDensityFunctionForNDVEstimation, ndvTuner);
+ protected Map<String, List<ColumnStatisticsObj>> getSqlResult(
+ GetHelper<Map<String, List<ColumnStatisticsObj>>> ctx) throws MetaException {
+ return directSql.getColStatsForTablePartitions(dbName, tblName);
}
@Override
- protected Map<String, ColumnStatisticsObj> getJdoResult(
- GetHelper<Map<String, ColumnStatisticsObj>> ctx) throws MetaException,
+ protected Map<String, List<ColumnStatisticsObj>> getJdoResult(
+ GetHelper<Map<String, List<ColumnStatisticsObj>>> ctx) throws MetaException,
NoSuchObjectException {
// This is the fast path for query optimizations; if we can find this info
// quickly using directSql, do it. No point in falling back to the slow path
http://git-wip-us.apache.org/repos/asf/hive/blob/d85beaa9/metastore/src/java/org/apache/hadoop/hive/metastore/RawStore.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/RawStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/RawStore.java
index c1af690..964ffb2 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/RawStore.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/RawStore.java
@@ -579,14 +579,16 @@ public interface RawStore extends Configurable {
List<String> partNames, List<String> colNames) throws MetaException, NoSuchObjectException;
/**
- * Get all partition column statistics for a table
+ * Get all partition column statistics for a table in a db
+ *
* @param dbName
* @param tableName
- * @return Map of partition column statistics
+ * @return Map of partition column statistics. The key in the map is the partition name; the
+ * value is the list of column stat objects for the columns in that partition
* @throws MetaException
* @throws NoSuchObjectException
*/
- public Map<String, ColumnStatisticsObj> getAggrColStatsForTablePartitions(String dbName,
+ public Map<String, List<ColumnStatisticsObj>> getColStatsForTablePartitions(String dbName,
String tableName) throws MetaException, NoSuchObjectException;
/**
http://git-wip-us.apache.org/repos/asf/hive/blob/d85beaa9/metastore/src/java/org/apache/hadoop/hive/metastore/StatObjectConverter.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/StatObjectConverter.java b/metastore/src/java/org/apache/hadoop/hive/metastore/StatObjectConverter.java
index fcf6f27..2dc2804 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/StatObjectConverter.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/StatObjectConverter.java
@@ -37,6 +37,7 @@ import org.apache.hadoop.hive.metastore.api.LongColumnStatsData;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
import org.apache.hadoop.hive.metastore.api.StringColumnStatsData;
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData._Fields;
import org.apache.hadoop.hive.metastore.model.MPartition;
import org.apache.hadoop.hive.metastore.model.MPartitionColumnStatistics;
import org.apache.hadoop.hive.metastore.model.MTable;
@@ -700,4 +701,154 @@ public class StatObjectConverter {
return new BigDecimal(new BigInteger(d.getUnscaled()), d.getScale()).toString();
}
+ /**
+ * Copy the field values that are set in newStatObj into oldStatObj
+ * @param oldStatObj the (cached) stats object to be updated in place
+ * @param newStatObj the stats object carrying the new values; must hold the same type of stats data
+ */
+ public static void setFieldsIntoOldStats(ColumnStatisticsObj oldStatObj,
+ ColumnStatisticsObj newStatObj) {
+ _Fields typeNew = newStatObj.getStatsData().getSetField();
+ _Fields typeOld = oldStatObj.getStatsData().getSetField();
+ if (typeNew != typeOld) {
+ // Switching on a null enum would throw an unhelpful NPE; fail with a clear message instead
+ throw new IllegalArgumentException("stats type " + typeNew + " does not match " + typeOld);
+ }
+ switch (typeNew) {
+ case BOOLEAN_STATS:
+ BooleanColumnStatsData oldBooleanStatsData = oldStatObj.getStatsData().getBooleanStats();
+ BooleanColumnStatsData newBooleanStatsData = newStatObj.getStatsData().getBooleanStats();
+ if (newBooleanStatsData.isSetNumTrues()) {
+ oldBooleanStatsData.setNumTrues(newBooleanStatsData.getNumTrues());
+ }
+ if (newBooleanStatsData.isSetNumFalses()) {
+ oldBooleanStatsData.setNumFalses(newBooleanStatsData.getNumFalses());
+ }
+ if (newBooleanStatsData.isSetNumNulls()) {
+ oldBooleanStatsData.setNumNulls(newBooleanStatsData.getNumNulls());
+ }
+ if (newBooleanStatsData.isSetBitVectors()) {
+ oldBooleanStatsData.setBitVectors(newBooleanStatsData.getBitVectors());
+ }
+ break;
+ case LONG_STATS: {
+ LongColumnStatsData oldLongStatsData = oldStatObj.getStatsData().getLongStats();
+ LongColumnStatsData newLongStatsData = newStatObj.getStatsData().getLongStats();
+ if (newLongStatsData.isSetHighValue()) {
+ oldLongStatsData.setHighValue(newLongStatsData.getHighValue());
+ }
+ if (newLongStatsData.isSetLowValue()) {
+ oldLongStatsData.setLowValue(newLongStatsData.getLowValue());
+ }
+ if (newLongStatsData.isSetNumNulls()) {
+ oldLongStatsData.setNumNulls(newLongStatsData.getNumNulls());
+ }
+ if (newLongStatsData.isSetNumDVs()) {
+ oldLongStatsData.setNumDVs(newLongStatsData.getNumDVs());
+ }
+ if (newLongStatsData.isSetBitVectors()) {
+ oldLongStatsData.setBitVectors(newLongStatsData.getBitVectors());
+ }
+ break;
+ }
+ case DOUBLE_STATS: {
+ DoubleColumnStatsData oldDoubleStatsData = oldStatObj.getStatsData().getDoubleStats();
+ DoubleColumnStatsData newDoubleStatsData = newStatObj.getStatsData().getDoubleStats();
+ if (newDoubleStatsData.isSetHighValue()) {
+ oldDoubleStatsData.setHighValue(newDoubleStatsData.getHighValue());
+ }
+ if (newDoubleStatsData.isSetLowValue()) {
+ oldDoubleStatsData.setLowValue(newDoubleStatsData.getLowValue());
+ }
+ if (newDoubleStatsData.isSetNumNulls()) {
+ oldDoubleStatsData.setNumNulls(newDoubleStatsData.getNumNulls());
+ }
+ if (newDoubleStatsData.isSetNumDVs()) {
+ oldDoubleStatsData.setNumDVs(newDoubleStatsData.getNumDVs());
+ }
+ if (newDoubleStatsData.isSetBitVectors()) {
+ oldDoubleStatsData.setBitVectors(newDoubleStatsData.getBitVectors());
+ }
+ break;
+ }
+ case STRING_STATS: {
+ StringColumnStatsData oldStringStatsData = oldStatObj.getStatsData().getStringStats();
+ StringColumnStatsData newStringStatsData = newStatObj.getStatsData().getStringStats();
+ if (newStringStatsData.isSetMaxColLen()) {
+ oldStringStatsData.setMaxColLen(newStringStatsData.getMaxColLen());
+ }
+ if (newStringStatsData.isSetAvgColLen()) {
+ oldStringStatsData.setAvgColLen(newStringStatsData.getAvgColLen());
+ }
+ if (newStringStatsData.isSetNumNulls()) {
+ oldStringStatsData.setNumNulls(newStringStatsData.getNumNulls());
+ }
+ if (newStringStatsData.isSetNumDVs()) {
+ oldStringStatsData.setNumDVs(newStringStatsData.getNumDVs());
+ }
+ if (newStringStatsData.isSetBitVectors()) {
+ oldStringStatsData.setBitVectors(newStringStatsData.getBitVectors());
+ }
+ break;
+ }
+ case BINARY_STATS:
+ BinaryColumnStatsData oldBinaryStatsData = oldStatObj.getStatsData().getBinaryStats();
+ BinaryColumnStatsData newBinaryStatsData = newStatObj.getStatsData().getBinaryStats();
+ if (newBinaryStatsData.isSetMaxColLen()) {
+ oldBinaryStatsData.setMaxColLen(newBinaryStatsData.getMaxColLen());
+ }
+ if (newBinaryStatsData.isSetAvgColLen()) {
+ oldBinaryStatsData.setAvgColLen(newBinaryStatsData.getAvgColLen());
+ }
+ if (newBinaryStatsData.isSetNumNulls()) {
+ oldBinaryStatsData.setNumNulls(newBinaryStatsData.getNumNulls());
+ }
+ if (newBinaryStatsData.isSetBitVectors()) {
+ oldBinaryStatsData.setBitVectors(newBinaryStatsData.getBitVectors());
+ }
+ break;
+ case DECIMAL_STATS: {
+ DecimalColumnStatsData oldDecimalStatsData = oldStatObj.getStatsData().getDecimalStats();
+ DecimalColumnStatsData newDecimalStatsData = newStatObj.getStatsData().getDecimalStats();
+ if (newDecimalStatsData.isSetHighValue()) {
+ oldDecimalStatsData.setHighValue(newDecimalStatsData.getHighValue());
+ }
+ if (newDecimalStatsData.isSetLowValue()) {
+ oldDecimalStatsData.setLowValue(newDecimalStatsData.getLowValue());
+ }
+ if (newDecimalStatsData.isSetNumNulls()) {
+ oldDecimalStatsData.setNumNulls(newDecimalStatsData.getNumNulls());
+ }
+ if (newDecimalStatsData.isSetNumDVs()) {
+ oldDecimalStatsData.setNumDVs(newDecimalStatsData.getNumDVs());
+ }
+ if (newDecimalStatsData.isSetBitVectors()) {
+ oldDecimalStatsData.setBitVectors(newDecimalStatsData.getBitVectors());
+ }
+ break;
+ }
+ case DATE_STATS: {
+ DateColumnStatsData oldDateStatsData = oldStatObj.getStatsData().getDateStats();
+ DateColumnStatsData newDateStatsData = newStatObj.getStatsData().getDateStats();
+ if (newDateStatsData.isSetHighValue()) {
+ oldDateStatsData.setHighValue(newDateStatsData.getHighValue());
+ }
+ if (newDateStatsData.isSetLowValue()) {
+ oldDateStatsData.setLowValue(newDateStatsData.getLowValue());
+ }
+ if (newDateStatsData.isSetNumNulls()) {
+ oldDateStatsData.setNumNulls(newDateStatsData.getNumNulls());
+ }
+ if (newDateStatsData.isSetNumDVs()) {
+ oldDateStatsData.setNumDVs(newDateStatsData.getNumDVs());
+ }
+ if (newDateStatsData.isSetBitVectors()) {
+ oldDateStatsData.setBitVectors(newDateStatsData.getBitVectors());
+ }
+ break;
+ }
+ default:
+ throw new IllegalArgumentException("Unknown stats type: " + typeNew.toString());
+ }
+ }
}
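
For context, a hedged sketch of how a caller might use the new setFieldsIntoOldStats to fold freshly written stats into a cached object. The column name, type, and values are made up for illustration; only the Thrift stats classes and setFieldsIntoOldStats itself come from the code above:

    import org.apache.hadoop.hive.metastore.StatObjectConverter;
    import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData;
    import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
    import org.apache.hadoop.hive.metastore.api.LongColumnStatsData;

    public class SetFieldsSketch {
      public static void main(String[] args) {
        // Cached stats for a hypothetical bigint column "id": numNulls=5, numDVs=100
        LongColumnStatsData oldData = new LongColumnStatsData(5, 100);
        oldData.setLowValue(1);
        oldData.setHighValue(50);
        ColumnStatisticsObj oldObj = new ColumnStatisticsObj("id", "bigint",
            ColumnStatisticsData.longStats(oldData));

        // Newly written stats that carry an updated highValue and numDVs
        LongColumnStatsData newData = new LongColumnStatsData(5, 120);
        newData.setHighValue(99);
        ColumnStatisticsObj newObj = new ColumnStatisticsObj("id", "bigint",
            ColumnStatisticsData.longStats(newData));

        // Copies only the fields that are set on newObj; the cached lowValue stays 1
        StatObjectConverter.setFieldsIntoOldStats(oldObj, newObj);
      }
    }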
http://git-wip-us.apache.org/repos/asf/hive/blob/d85beaa9/metastore/src/java/org/apache/hadoop/hive/metastore/cache/CacheUtils.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/cache/CacheUtils.java b/metastore/src/java/org/apache/hadoop/hive/metastore/cache/CacheUtils.java
index 668499b..280655d 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/cache/CacheUtils.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/cache/CacheUtils.java
@@ -38,6 +38,10 @@ public class CacheUtils {
return dbName + delimit + tableName;
}
+ public static String buildKeyWithDelimit(String dbName, String tableName) {
+ return buildKey(dbName, tableName) + delimit;
+ }
+
public static String buildKey(String dbName, String tableName, List<String> partVals) {
String key = buildKey(dbName, tableName);
if (partVals == null || partVals.size() == 0) {
@@ -52,11 +56,38 @@ public class CacheUtils {
return key;
}
+ public static String buildKeyWithDelimit(String dbName, String tableName, List<String> partVals) {
+ return buildKey(dbName, tableName, partVals) + delimit;
+ }
+
public static String buildKey(String dbName, String tableName, List<String> partVals, String colName) {
String key = buildKey(dbName, tableName, partVals);
return key + delimit + colName;
}
+ public static String buildKey(String dbName, String tableName, String colName) {
+ String key = buildKey(dbName, tableName);
+ return key + delimit + colName;
+ }
+
+ public static String[] splitTableColStats(String key) {
+ return key.split(delimit);
+ }
+
+ public static Object[] splitPartitionColStats(String key) {
+ Object[] result = new Object[4];
+ String[] comps = key.split(delimit);
+ result[0] = comps[0];
+ result[1] = comps[1];
+ List<String> vals = new ArrayList<String>();
+ for (int i = 2; i < comps.length - 1; i++) {
+ vals.add(comps[i]);
+ }
+ result[2] = vals;
+ result[3] = comps[comps.length-1];
+ return result;
+ }
+
public static Table assemble(TableWrapper wrapper) {
Table t = wrapper.getTable().deepCopy();
if (wrapper.getSdHash()!=null) {
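
The col stats cache keys are delimiter-joined strings, so buildKey and the split helpers above must stay in sync on the component order: dbName, tableName, partition values, column name. A rough round-trip sketch, assuming for illustration that delimit is a single '#' (CacheUtils defines the real delimiter elsewhere) and inlining a simplified buildKey:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    public class KeyRoundTripSketch {
      private static final String delimit = "#"; // assumption for this sketch only

      static String buildKey(String db, String tbl, List<String> partVals, String col) {
        StringBuilder key = new StringBuilder(db).append(delimit).append(tbl);
        for (String v : partVals) {
          key.append(delimit).append(v);
        }
        return key.append(delimit).append(col).toString();
      }

      public static void main(String[] args) {
        String key = buildKey("default", "sales", Arrays.asList("2017", "05"), "amount");
        String[] comps = key.split(delimit);
        // comps = [dbName, tableName, partVal..., colName]
        List<String> vals = new ArrayList<String>();
        for (int i = 2; i < comps.length - 1; i++) { // everything between table and column
          vals.add(comps[i]);
        }
        // prints: default.sales [2017, 05] col=amount
        System.out.println(comps[0] + "." + comps[1] + " " + vals
            + " col=" + comps[comps.length - 1]);
      }
    }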
http://git-wip-us.apache.org/repos/asf/hive/blob/d85beaa9/metastore/src/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java
index 1cc838f..78aab91 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java
@@ -26,12 +26,15 @@ import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.common.FileUtils;
+import org.apache.hadoop.hive.common.StatsSetupConst;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.Deadline;
import org.apache.hadoop.hive.metastore.FileMetadataHandler;
import org.apache.hadoop.hive.metastore.MetaStoreUtils;
import org.apache.hadoop.hive.metastore.ObjectStore;
@@ -41,18 +44,11 @@ import org.apache.hadoop.hive.metastore.RawStore;
import org.apache.hadoop.hive.metastore.TableType;
import org.apache.hadoop.hive.metastore.Warehouse;
import org.apache.hadoop.hive.metastore.api.AggrStats;
-import org.apache.hadoop.hive.metastore.api.BinaryColumnStatsData;
-import org.apache.hadoop.hive.metastore.api.BooleanColumnStatsData;
import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
-import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData;
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsDesc;
import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId;
import org.apache.hadoop.hive.metastore.api.Database;
-import org.apache.hadoop.hive.metastore.api.Date;
-import org.apache.hadoop.hive.metastore.api.DateColumnStatsData;
-import org.apache.hadoop.hive.metastore.api.Decimal;
-import org.apache.hadoop.hive.metastore.api.DecimalColumnStatsData;
-import org.apache.hadoop.hive.metastore.api.DoubleColumnStatsData;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.FileMetadataExprType;
import org.apache.hadoop.hive.metastore.api.Function;
@@ -61,7 +57,6 @@ import org.apache.hadoop.hive.metastore.api.Index;
import org.apache.hadoop.hive.metastore.api.InvalidInputException;
import org.apache.hadoop.hive.metastore.api.InvalidObjectException;
import org.apache.hadoop.hive.metastore.api.InvalidPartitionException;
-import org.apache.hadoop.hive.metastore.api.LongColumnStatsData;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
import org.apache.hadoop.hive.metastore.api.NotificationEvent;
@@ -77,13 +72,14 @@ import org.apache.hadoop.hive.metastore.api.RolePrincipalGrant;
import org.apache.hadoop.hive.metastore.api.SQLForeignKey;
import org.apache.hadoop.hive.metastore.api.SQLPrimaryKey;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
-import org.apache.hadoop.hive.metastore.api.StringColumnStatsData;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.api.TableMeta;
import org.apache.hadoop.hive.metastore.api.Type;
import org.apache.hadoop.hive.metastore.api.UnknownDBException;
import org.apache.hadoop.hive.metastore.api.UnknownPartitionException;
import org.apache.hadoop.hive.metastore.api.UnknownTableException;
+import org.apache.hadoop.hive.metastore.hbase.stats.merge.ColumnStatsMerger;
+import org.apache.hadoop.hive.metastore.hbase.stats.merge.ColumnStatsMergerFactory;
import org.apache.hadoop.hive.metastore.partition.spec.PartitionSpecProxy;
import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
@@ -104,15 +100,25 @@ import com.google.common.annotations.VisibleForTesting;
// TODO initial load slow?
// TODO size estimation
// TODO factor in extrapolation logic (using partitions found) during aggregate stats calculation
-// TODO factor in NDV estimation (density based estimation) logic when merging NDVs from 2 colStats object
-// TODO refactor to use same common code with StatObjectConverter (for merging 2 col stats objects)
public class CachedStore implements RawStore, Configurable {
private static ScheduledExecutorService cacheUpdateMaster = null;
- private static AtomicReference<Thread> runningMasterThread = new AtomicReference<Thread>(null);
+ private static ReentrantReadWriteLock databaseCacheLock = new ReentrantReadWriteLock(true);
+ private static AtomicBoolean isDatabaseCacheDirty = new AtomicBoolean(false);
+ private static ReentrantReadWriteLock tableCacheLock = new ReentrantReadWriteLock(true);
+ private static AtomicBoolean isTableCacheDirty = new AtomicBoolean(false);
+ private static ReentrantReadWriteLock partitionCacheLock = new ReentrantReadWriteLock(true);
+ private static AtomicBoolean isPartitionCacheDirty = new AtomicBoolean(false);
+ private static ReentrantReadWriteLock tableColStatsCacheLock = new ReentrantReadWriteLock(true);
+ private static AtomicBoolean isTableColStatsCacheDirty = new AtomicBoolean(false);
+ private static ReentrantReadWriteLock partitionColStatsCacheLock = new ReentrantReadWriteLock(
+ true);
+ private static AtomicBoolean isPartitionColStatsCacheDirty = new AtomicBoolean(false);
RawStore rawStore;
Configuration conf;
private PartitionExpressionProxy expressionProxy = null;
+ // Default value set to 100 milliseconds for test purposes
+ private long cacheRefreshPeriod = 100;
static boolean firstTime = true;
static final private Logger LOG = LoggerFactory.getLogger(CachedStore.class.getName());
@@ -209,6 +215,8 @@ public class CachedStore implements RawStore, Configurable {
LOG.info("Prewarming CachedStore");
prewarm();
LOG.info("CachedStore initialized");
+ // Start the cache update master-worker threads
+ startCacheUpdateService();
} catch (Exception e) {
throw new RuntimeException(e);
}
@@ -216,7 +224,10 @@ public class CachedStore implements RawStore, Configurable {
}
}
- private void prewarm() throws Exception {
+ @VisibleForTesting
+ void prewarm() throws Exception {
+ // Prevents throwing exceptions in our raw store calls since we're not using RawStoreProxy
+ Deadline.registerIfNot(1000000);
List<String> dbNames = rawStore.getAllDatabases();
for (String dbName : dbNames) {
Database db = rawStore.getDatabase(dbName);
@@ -226,35 +237,83 @@ public class CachedStore implements RawStore, Configurable {
Table table = rawStore.getTable(dbName, tblName);
SharedCache.addTableToCache(HiveStringUtils.normalizeIdentifier(dbName),
HiveStringUtils.normalizeIdentifier(tblName), table);
+ Deadline.startTimer("getPartitions");
List<Partition> partitions = rawStore.getPartitions(dbName, tblName, Integer.MAX_VALUE);
+ Deadline.stopTimer();
for (Partition partition : partitions) {
SharedCache.addPartitionToCache(HiveStringUtils.normalizeIdentifier(dbName),
HiveStringUtils.normalizeIdentifier(tblName), partition);
}
- Map<String, ColumnStatisticsObj> aggrStatsPerPartition = rawStore
- .getAggrColStatsForTablePartitions(dbName, tblName);
- SharedCache.addPartitionColStatsToCache(aggrStatsPerPartition);
+ // Cache partition column stats
+ Deadline.startTimer("getColStatsForTablePartitions");
+ Map<String, List<ColumnStatisticsObj>> colStatsPerPartition =
+ rawStore.getColStatsForTablePartitions(dbName, tblName);
+ Deadline.stopTimer();
+ if (colStatsPerPartition != null) {
+ SharedCache.addPartitionColStatsToCache(dbName, tblName, colStatsPerPartition);
+ }
+ // Cache table column stats
+ List<String> colNames = MetaStoreUtils.getColumnNamesForTable(table);
+ Deadline.startTimer("getTableColumnStatistics");
+ ColumnStatistics tableColStats =
+ rawStore.getTableColumnStatistics(dbName, tblName, colNames);
+ Deadline.stopTimer();
+ if ((tableColStats != null) && (tableColStats.getStatsObjSize() > 0)) {
+ SharedCache.addTableColStatsToCache(HiveStringUtils.normalizeIdentifier(dbName),
+ HiveStringUtils.normalizeIdentifier(tblName), tableColStats.getStatsObj());
+ }
}
}
- // Start the cache update master-worker threads
- startCacheUpdateService();
}
- private synchronized void startCacheUpdateService() {
+ @VisibleForTesting
+ synchronized void startCacheUpdateService() {
if (cacheUpdateMaster == null) {
cacheUpdateMaster = Executors.newScheduledThreadPool(1, new ThreadFactory() {
public Thread newThread(Runnable r) {
Thread t = Executors.defaultThreadFactory().newThread(r);
+ t.setName("CachedStore-CacheUpdateService: Thread-" + t.getId());
t.setDaemon(true);
return t;
}
});
- cacheUpdateMaster.scheduleAtFixedRate(new CacheUpdateMasterWork(this), 0, HiveConf
- .getTimeVar(conf, HiveConf.ConfVars.METASTORE_CACHED_RAW_STORE_CACHE_UPDATE_FREQUENCY,
- TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS);
+ if (!HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_IN_TEST)) {
+ cacheRefreshPeriod =
+ HiveConf.getTimeVar(conf,
+ HiveConf.ConfVars.METASTORE_CACHED_RAW_STORE_CACHE_UPDATE_FREQUENCY,
+ TimeUnit.MILLISECONDS);
+ }
+ LOG.info("CachedStore: starting cache update service (run every " + cacheRefreshPeriod + "ms");
+ cacheUpdateMaster.scheduleAtFixedRate(new CacheUpdateMasterWork(this), cacheRefreshPeriod,
+ cacheRefreshPeriod, TimeUnit.MILLISECONDS);
}
}
+ @VisibleForTesting
+ synchronized boolean stopCacheUpdateService(long timeout) {
+ boolean tasksStoppedBeforeShutdown = false;
+ if (cacheUpdateMaster != null) {
+ LOG.info("CachedStore: shutting down cache update service");
+ try {
+ tasksStoppedBeforeShutdown =
+ cacheUpdateMaster.awaitTermination(timeout, TimeUnit.MILLISECONDS);
+ } catch (InterruptedException e) {
+ LOG.info("CachedStore: cache update service was interrupted while waiting for tasks to "
+ + "complete before shutting down. Will make a hard stop now.");
+ }
+ cacheUpdateMaster.shutdownNow();
+ cacheUpdateMaster = null;
+ }
+ return tasksStoppedBeforeShutdown;
+ }
+
+ @VisibleForTesting
+ void setCacheRefreshPeriod(long time) {
+ this.cacheRefreshPeriod = time;
+ }
+
static class CacheUpdateMasterWork implements Runnable {
private CachedStore cachedStore;
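
Outside of tests (HIVE_IN_TEST unset), startCacheUpdateService reads the refresh period from METASTORE_CACHED_RAW_STORE_CACHE_UPDATE_FREQUENCY. A small sketch of how a deployment or test harness might set that interval before the CachedStore comes up; the 60-second value is arbitrary:

    import java.util.concurrent.TimeUnit;
    import org.apache.hadoop.hive.conf.HiveConf;

    public class RefreshPeriodSketch {
      public static void main(String[] args) {
        HiveConf conf = new HiveConf();
        // Run the cache update master every 60 seconds
        conf.setTimeVar(HiveConf.ConfVars.METASTORE_CACHED_RAW_STORE_CACHE_UPDATE_FREQUENCY,
            60, TimeUnit.SECONDS);
        // This is the same lookup startCacheUpdateService performs
        long millis = HiveConf.getTimeVar(conf,
            HiveConf.ConfVars.METASTORE_CACHED_RAW_STORE_CACHE_UPDATE_FREQUENCY,
            TimeUnit.MILLISECONDS);
        System.out.println("cache update period: " + millis + "ms"); // 60000ms
      }
    }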
@@ -265,86 +322,175 @@ public class CachedStore implements RawStore, Configurable {
@Override
public void run() {
- runningMasterThread.set(Thread.currentThread());
- RawStore rawStore = cachedStore.getRawStore();
+ // Prevents throwing exceptions in our raw store calls since we're not using RawStoreProxy
+ Deadline.registerIfNot(1000000);
+ LOG.debug("CachedStore: updating cached objects");
+ String rawStoreClassName =
+ HiveConf.getVar(cachedStore.conf, HiveConf.ConfVars.METASTORE_CACHED_RAW_STORE_IMPL,
+ ObjectStore.class.getName());
try {
+ RawStore rawStore =
+ ((Class<? extends RawStore>) MetaStoreUtils.getClass(rawStoreClassName)).newInstance();
+ rawStore.setConf(cachedStore.conf);
List<String> dbNames = rawStore.getAllDatabases();
- // Update the database in cache
- if (!updateDatabases(rawStore, dbNames)) {
- return;
- }
- // Update the tables and their partitions in cache
- if (!updateTables(rawStore, dbNames)) {
- return;
+ if (dbNames != null) {
+ // Update the database in cache
+ updateDatabases(rawStore, dbNames);
+ for (String dbName : dbNames) {
+ // Update the tables in cache
+ updateTables(rawStore, dbName);
+ List<String> tblNames = cachedStore.getAllTables(dbName);
+ for (String tblName : tblNames) {
+ // Update the partitions for a table in cache
+ updateTablePartitions(rawStore, dbName, tblName);
+ // Update the table column stats for a table in cache
+ updateTableColStats(rawStore, dbName, tblName);
+ // Update the partitions column stats for a table in cache
+ updateTablePartitionColStats(rawStore, dbName, tblName);
+ }
+ }
}
} catch (MetaException e) {
LOG.error("Updating CachedStore: error getting database names", e);
+ } catch (InstantiationException | IllegalAccessException e) {
+ throw new RuntimeException("Cannot instantiate " + rawStoreClassName, e);
}
}
- private boolean updateDatabases(RawStore rawStore, List<String> dbNames) {
- if (dbNames != null) {
- List<Database> databases = new ArrayList<Database>();
- for (String dbName : dbNames) {
- // If a preemption of this thread was requested, simply return before proceeding
- if (Thread.interrupted()) {
- return false;
+ private void updateDatabases(RawStore rawStore, List<String> dbNames) {
+ // Prepare the list of databases
+ List<Database> databases = new ArrayList<Database>();
+ for (String dbName : dbNames) {
+ Database db;
+ try {
+ db = rawStore.getDatabase(dbName);
+ databases.add(db);
+ } catch (NoSuchObjectException e) {
+ LOG.info("Updating CachedStore: database - " + dbName + " does not exist.", e);
+ }
+ }
+ // Update the cached database objects
+ try {
+ if (databaseCacheLock.writeLock().tryLock()) {
+ // Skip background updates if we detect change
+ if (isDatabaseCacheDirty.compareAndSet(true, false)) {
+ LOG.debug("Skipping database cache update; the database list we have is dirty.");
+ return;
}
- Database db;
- try {
- db = rawStore.getDatabase(dbName);
- databases.add(db);
- } catch (NoSuchObjectException e) {
- LOG.info("Updating CachedStore: database - " + dbName + " does not exist.", e);
+ SharedCache.refreshDatabases(databases);
+ }
+ } finally {
+ if (databaseCacheLock.isWriteLockedByCurrentThread()) {
+ databaseCacheLock.writeLock().unlock();
+ }
+ }
+ }
+
+ // Update the cached table objects
+ private void updateTables(RawStore rawStore, String dbName) {
+ List<Table> tables = new ArrayList<Table>();
+ try {
+ List<String> tblNames = rawStore.getAllTables(dbName);
+ for (String tblName : tblNames) {
+ Table table =
+ rawStore.getTable(HiveStringUtils.normalizeIdentifier(dbName),
+ HiveStringUtils.normalizeIdentifier(tblName));
+ tables.add(table);
+ }
+ if (tableCacheLock.writeLock().tryLock()) {
+ // Skip background updates if we detect change
+ if (isTableCacheDirty.compareAndSet(true, false)) {
+ LOG.debug("Skipping table cache update; the table list we have is dirty.");
+ return;
}
+ SharedCache.refreshTables(dbName, tables);
+ }
+ } catch (MetaException e) {
+ LOG.info("Updating CachedStore: unable to read tables for database - " + dbName, e);
+ } finally {
+ if (tableCacheLock.isWriteLockedByCurrentThread()) {
+ tableCacheLock.writeLock().unlock();
}
- // Update the cached database objects
- SharedCache.refreshDatabases(databases);
}
- return true;
}
- private boolean updateTables(RawStore rawStore, List<String> dbNames) {
- if (dbNames != null) {
- List<Table> tables = new ArrayList<Table>();
- for (String dbName : dbNames) {
- try {
- List<String> tblNames = rawStore.getAllTables(dbName);
- for (String tblName : tblNames) {
- // If a preemption of this thread was requested, simply return before proceeding
- if (Thread.interrupted()) {
- return false;
- }
- Table table = rawStore.getTable(dbName, tblName);
- tables.add(table);
- }
- // Update the cached database objects
- SharedCache.refreshTables(dbName, tables);
- for (String tblName : tblNames) {
- // If a preemption of this thread was requested, simply return before proceeding
- if (Thread.interrupted()) {
- return false;
- }
- List<Partition> partitions =
- rawStore.getPartitions(dbName, tblName, Integer.MAX_VALUE);
- SharedCache.refreshPartitions(dbName, tblName, partitions);
- }
- } catch (MetaException | NoSuchObjectException e) {
- LOG.error("Updating CachedStore: unable to read table", e);
- return false;
+ // Update the cached partition objects for a table
+ private void updateTablePartitions(RawStore rawStore, String dbName, String tblName) {
+ try {
+ Deadline.startTimer("getPartitions");
+ List<Partition> partitions = rawStore.getPartitions(dbName, tblName, Integer.MAX_VALUE);
+ Deadline.stopTimer();
+ if (partitionCacheLock.writeLock().tryLock()) {
+ // Skip background updates if we detect change
+ if (isPartitionCacheDirty.compareAndSet(true, false)) {
+ LOG.debug("Skipping partition cache update; the partition list we have is dirty.");
+ return;
}
+ SharedCache.refreshPartitions(HiveStringUtils.normalizeIdentifier(dbName),
+ HiveStringUtils.normalizeIdentifier(tblName), partitions);
+ }
+ } catch (MetaException | NoSuchObjectException e) {
+ LOG.info("Updating CachedStore: unable to read partitions of table: " + tblName, e);
+ } finally {
+ if (partitionCacheLock.isWriteLockedByCurrentThread()) {
+ partitionCacheLock.writeLock().unlock();
}
}
- return true;
}
- }
- // Interrupt the cache update background thread
- // Fire and forget (the master will respond appropriately when it gets a chance)
- // All writes to the cache go through synchronized methods, so fire & forget is fine.
- private void interruptCacheUpdateMaster() {
- if (runningMasterThread.get() != null) {
- runningMasterThread.get().interrupt();
+ // Update the cached col stats for this table
+ private void updateTableColStats(RawStore rawStore, String dbName, String tblName) {
+ try {
+ Table table = rawStore.getTable(dbName, tblName);
+ List<String> colNames = MetaStoreUtils.getColumnNamesForTable(table);
+ Deadline.startTimer("getTableColumnStatistics");
+ ColumnStatistics tableColStats =
+ rawStore.getTableColumnStatistics(dbName, tblName, colNames);
+ Deadline.stopTimer();
+ if (tableColStatsCacheLock.writeLock().tryLock()) {
+ // Skip background updates if we detect change
+ if (isTableColStatsCacheDirty.compareAndSet(true, false)) {
+ LOG.debug("Skipping table column stats cache update; the table column stats list we "
+ + "have is dirty.");
+ return;
+ }
+ SharedCache.refreshTableColStats(HiveStringUtils.normalizeIdentifier(dbName),
+ HiveStringUtils.normalizeIdentifier(tblName), tableColStats.getStatsObj());
+ }
+ } catch (MetaException | NoSuchObjectException e) {
+ LOG.info("Updating CachedStore: unable to read table column stats of table: " + tblName, e);
+ } finally {
+ if (tableColStatsCacheLock.isWriteLockedByCurrentThread()) {
+ tableColStatsCacheLock.writeLock().unlock();
+ }
+ }
+ }
+
+ // Update the cached partition col stats for a table
+ private void updateTablePartitionColStats(RawStore rawStore, String dbName, String tblName) {
+ try {
+ Deadline.startTimer("getColStatsForTablePartitions");
+ Map<String, List<ColumnStatisticsObj>> colStatsPerPartition =
+ rawStore.getColStatsForTablePartitions(dbName, tblName);
+ Deadline.stopTimer();
+ if (partitionColStatsCacheLock.writeLock().tryLock()) {
+ // Skip background updates if we detect change
+ if (isPartitionColStatsCacheDirty.compareAndSet(true, false)) {
+ LOG.debug("Skipping partition column stats cache update; the partition column stats "
+ + "list we have is dirty.");
+ return;
+ }
+ SharedCache.refreshPartitionColStats(HiveStringUtils.normalizeIdentifier(dbName),
+ HiveStringUtils.normalizeIdentifier(tblName), colStatsPerPartition);
+ }
+ } catch (MetaException | NoSuchObjectException e) {
+ LOG.info("Updating CachedStore: unable to read partitions column stats of table: "
+ + tblName, e);
+ } finally {
+ if (partitionColStatsCacheLock.isWriteLockedByCurrentThread()) {
+ partitionColStatsCacheLock.writeLock().unlock();
+ }
+ }
}
}
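
The locking scheme used throughout the updater inverts the usual roles: request-path writers take the read lock (so they can proceed concurrently) and raise a dirty flag, while the background refresher takes the write lock with tryLock and aborts if the flag flipped, so a bulk snapshot never overwrites a newer direct update. A condensed sketch of that protocol, detached from the Hive types:

    import java.util.concurrent.atomic.AtomicBoolean;
    import java.util.concurrent.locks.ReentrantReadWriteLock;

    public class DirtyFlagRefreshSketch {
      private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);
      private final AtomicBoolean dirty = new AtomicBoolean(false);
      private volatile String cached = "";

      // Request path: called whenever the underlying store changes.
      public void applyDirectUpdate(String newValue) {
        lock.readLock().lock(); // blocks only while a bulk refresh holds the write lock
        try {
          dirty.set(true);
          cached = newValue;
        } finally {
          lock.readLock().unlock();
        }
      }

      // Background path: called periodically with a full snapshot from the store.
      public void refreshFromStore(String snapshot) {
        if (lock.writeLock().tryLock()) { // back off rather than wait for in-flight writers
          try {
            if (dirty.compareAndSet(true, false)) {
              return; // a direct update raced ahead of this snapshot; skip the stale refresh
            }
            cached = snapshot;
          } finally {
            lock.writeLock().unlock();
          }
        }
      }
    }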
@@ -374,11 +520,17 @@ public class CachedStore implements RawStore, Configurable {
}
@Override
- public void createDatabase(Database db)
- throws InvalidObjectException, MetaException {
+ public void createDatabase(Database db) throws InvalidObjectException, MetaException {
rawStore.createDatabase(db);
- interruptCacheUpdateMaster();
- SharedCache.addDatabaseToCache(HiveStringUtils.normalizeIdentifier(db.getName()), db.deepCopy());
+ try {
+ // Wait if background cache update is happening
+ databaseCacheLock.readLock().lock();
+ isDatabaseCacheDirty.set(true);
+ SharedCache.addDatabaseToCache(HiveStringUtils.normalizeIdentifier(db.getName()),
+ db.deepCopy());
+ } finally {
+ databaseCacheLock.readLock().unlock();
+ }
}
@Override
@@ -387,26 +539,38 @@ public class CachedStore implements RawStore, Configurable {
if (db == null) {
throw new NoSuchObjectException();
}
- return SharedCache.getDatabaseFromCache(HiveStringUtils.normalizeIdentifier(dbName));
+ return db;
}
@Override
public boolean dropDatabase(String dbname) throws NoSuchObjectException, MetaException {
boolean succ = rawStore.dropDatabase(dbname);
if (succ) {
- interruptCacheUpdateMaster();
- SharedCache.removeDatabaseFromCache(HiveStringUtils.normalizeIdentifier(dbname));
+ try {
+ // Wait if background cache update is happening
+ databaseCacheLock.readLock().lock();
+ isDatabaseCacheDirty.set(true);
+ SharedCache.removeDatabaseFromCache(HiveStringUtils.normalizeIdentifier(dbname));
+ } finally {
+ databaseCacheLock.readLock().unlock();
+ }
}
return succ;
}
@Override
- public boolean alterDatabase(String dbName, Database db)
- throws NoSuchObjectException, MetaException {
+ public boolean alterDatabase(String dbName, Database db) throws NoSuchObjectException,
+ MetaException {
boolean succ = rawStore.alterDatabase(dbName, db);
if (succ) {
- interruptCacheUpdateMaster();
- SharedCache.alterDatabaseInCache(HiveStringUtils.normalizeIdentifier(dbName), db);
+ try {
+ // Wait if background cache update is happening
+ databaseCacheLock.readLock().lock();
+ isDatabaseCacheDirty.set(true);
+ SharedCache.alterDatabaseInCache(HiveStringUtils.normalizeIdentifier(dbName), db);
+ } finally {
+ databaseCacheLock.readLock().unlock();
+ }
}
return succ;
}
@@ -462,24 +626,45 @@ public class CachedStore implements RawStore, Configurable {
}
@Override
- public void createTable(Table tbl)
- throws InvalidObjectException, MetaException {
+ public void createTable(Table tbl) throws InvalidObjectException, MetaException {
rawStore.createTable(tbl);
- interruptCacheUpdateMaster();
validateTableType(tbl);
- SharedCache.addTableToCache(HiveStringUtils.normalizeIdentifier(tbl.getDbName()),
- HiveStringUtils.normalizeIdentifier(tbl.getTableName()), tbl);
+ try {
+ // Wait if background cache update is happening
+ tableCacheLock.readLock().lock();
+ isTableCacheDirty.set(true);
+ SharedCache.addTableToCache(HiveStringUtils.normalizeIdentifier(tbl.getDbName()),
+ HiveStringUtils.normalizeIdentifier(tbl.getTableName()), tbl);
+ } finally {
+ tableCacheLock.readLock().unlock();
+ }
}
@Override
- public boolean dropTable(String dbName, String tableName)
- throws MetaException, NoSuchObjectException, InvalidObjectException,
- InvalidInputException {
+ public boolean dropTable(String dbName, String tableName) throws MetaException,
+ NoSuchObjectException, InvalidObjectException, InvalidInputException {
boolean succ = rawStore.dropTable(dbName, tableName);
if (succ) {
- interruptCacheUpdateMaster();
- SharedCache.removeTableFromCache(HiveStringUtils.normalizeIdentifier(dbName),
- HiveStringUtils.normalizeIdentifier(tableName));
+ // Remove table
+ try {
+ // Wait if background table cache update is happening
+ tableCacheLock.readLock().lock();
+ isTableCacheDirty.set(true);
+ SharedCache.removeTableFromCache(HiveStringUtils.normalizeIdentifier(dbName),
+ HiveStringUtils.normalizeIdentifier(tableName));
+ } finally {
+ tableCacheLock.readLock().unlock();
+ }
+ // Remove table col stats
+ try {
+ // Wait if background table col stats cache update is happening
+ tableColStatsCacheLock.readLock().lock();
+ isTableColStatsCacheDirty.set(true);
+ SharedCache.removeTableColStatsFromCache(HiveStringUtils.normalizeIdentifier(dbName),
+ HiveStringUtils.normalizeIdentifier(tableName));
+ } finally {
+ tableColStatsCacheLock.readLock().unlock();
+ }
}
return succ;
}
@@ -496,57 +681,74 @@ public class CachedStore implements RawStore, Configurable {
}
@Override
- public boolean addPartition(Partition part)
- throws InvalidObjectException, MetaException {
+ public boolean addPartition(Partition part) throws InvalidObjectException, MetaException {
boolean succ = rawStore.addPartition(part);
if (succ) {
- interruptCacheUpdateMaster();
- SharedCache.addPartitionToCache(HiveStringUtils.normalizeIdentifier(part.getDbName()),
- HiveStringUtils.normalizeIdentifier(part.getTableName()), part);
+ try {
+ // Wait if background cache update is happening
+ partitionCacheLock.readLock().lock();
+ isPartitionCacheDirty.set(true);
+ SharedCache.addPartitionToCache(HiveStringUtils.normalizeIdentifier(part.getDbName()),
+ HiveStringUtils.normalizeIdentifier(part.getTableName()), part);
+ } finally {
+ partitionCacheLock.readLock().unlock();
+ }
}
return succ;
}
@Override
- public boolean addPartitions(String dbName, String tblName,
- List<Partition> parts) throws InvalidObjectException, MetaException {
+ public boolean addPartitions(String dbName, String tblName, List<Partition> parts)
+ throws InvalidObjectException, MetaException {
boolean succ = rawStore.addPartitions(dbName, tblName, parts);
if (succ) {
- interruptCacheUpdateMaster();
- for (Partition part : parts) {
- SharedCache.addPartitionToCache(HiveStringUtils.normalizeIdentifier(part.getDbName()),
- HiveStringUtils.normalizeIdentifier(part.getTableName()), part);
+ try {
+ // Wait if background cache update is happening
+ partitionCacheLock.readLock().lock();
+ isPartitionCacheDirty.set(true);
+ for (Partition part : parts) {
+ SharedCache.addPartitionToCache(HiveStringUtils.normalizeIdentifier(dbName),
+ HiveStringUtils.normalizeIdentifier(tblName), part);
+ }
+ } finally {
+ partitionCacheLock.readLock().unlock();
}
}
return succ;
}
@Override
- public boolean addPartitions(String dbName, String tblName,
- PartitionSpecProxy partitionSpec, boolean ifNotExists)
- throws InvalidObjectException, MetaException {
+ public boolean addPartitions(String dbName, String tblName, PartitionSpecProxy partitionSpec,
+ boolean ifNotExists) throws InvalidObjectException, MetaException {
boolean succ = rawStore.addPartitions(dbName, tblName, partitionSpec, ifNotExists);
if (succ) {
- interruptCacheUpdateMaster();
- PartitionSpecProxy.PartitionIterator iterator = partitionSpec.getPartitionIterator();
- while (iterator.hasNext()) {
- Partition part = iterator.next();
- SharedCache.addPartitionToCache(HiveStringUtils.normalizeIdentifier(dbName),
- HiveStringUtils.normalizeIdentifier(tblName), part);
+ try {
+ // Wait if background cache update is happening
+ partitionCacheLock.readLock().lock();
+ isPartitionCacheDirty.set(true);
+ PartitionSpecProxy.PartitionIterator iterator = partitionSpec.getPartitionIterator();
+ while (iterator.hasNext()) {
+ Partition part = iterator.next();
+ SharedCache.addPartitionToCache(HiveStringUtils.normalizeIdentifier(dbName),
+ HiveStringUtils.normalizeIdentifier(tblName), part);
+ }
+ } finally {
+ partitionCacheLock.readLock().unlock();
}
}
return succ;
}
@Override
- public Partition getPartition(String dbName, String tableName,
- List<String> part_vals) throws MetaException, NoSuchObjectException {
- Partition part = SharedCache.getPartitionFromCache(HiveStringUtils.normalizeIdentifier(dbName),
- HiveStringUtils.normalizeIdentifier(tableName), part_vals);
+ public Partition getPartition(String dbName, String tableName, List<String> part_vals)
+ throws MetaException, NoSuchObjectException {
+ Partition part =
+ SharedCache.getPartitionFromCache(HiveStringUtils.normalizeIdentifier(dbName),
+ HiveStringUtils.normalizeIdentifier(tableName), part_vals);
if (part != null) {
part.unsetPrivileges();
} else {
- throw new NoSuchObjectException();
+ throw new NoSuchObjectException("partition values=" + part_vals.toString());
}
return part;
}
@@ -559,14 +761,30 @@ public class CachedStore implements RawStore, Configurable {
}
@Override
- public boolean dropPartition(String dbName, String tableName,
- List<String> part_vals) throws MetaException, NoSuchObjectException,
- InvalidObjectException, InvalidInputException {
+ public boolean dropPartition(String dbName, String tableName, List<String> part_vals)
+ throws MetaException, NoSuchObjectException, InvalidObjectException, InvalidInputException {
boolean succ = rawStore.dropPartition(dbName, tableName, part_vals);
if (succ) {
- interruptCacheUpdateMaster();
- SharedCache.removePartitionFromCache(HiveStringUtils.normalizeIdentifier(dbName),
- HiveStringUtils.normalizeIdentifier(tableName), part_vals);
+ // Remove partition
+ try {
+ // Wait if background cache update is happening
+ partitionCacheLock.readLock().lock();
+ isPartitionCacheDirty.set(true);
+ SharedCache.removePartitionFromCache(HiveStringUtils.normalizeIdentifier(dbName),
+ HiveStringUtils.normalizeIdentifier(tableName), part_vals);
+ } finally {
+ partitionCacheLock.readLock().unlock();
+ }
+ // Remove partition col stats
+ try {
+ // Wait if background cache update is happening
+ partitionColStatsCacheLock.readLock().lock();
+ isPartitionColStatsCacheDirty.set(true);
+ SharedCache.removePartitionColStatsFromCache(HiveStringUtils.normalizeIdentifier(dbName),
+ HiveStringUtils.normalizeIdentifier(tableName), part_vals);
+ } finally {
+ partitionColStatsCacheLock.readLock().unlock();
+ }
}
return succ;
}
@@ -588,10 +806,28 @@ public class CachedStore implements RawStore, Configurable {
public void alterTable(String dbName, String tblName, Table newTable)
throws InvalidObjectException, MetaException {
rawStore.alterTable(dbName, tblName, newTable);
- interruptCacheUpdateMaster();
validateTableType(newTable);
- SharedCache.alterTableInCache(HiveStringUtils.normalizeIdentifier(dbName),
- HiveStringUtils.normalizeIdentifier(tblName), newTable);
+ // Update table cache
+ try {
+ // Wait if background cache update is happening
+ tableCacheLock.readLock().lock();
+ isTableCacheDirty.set(true);
+ SharedCache.alterTableInCache(HiveStringUtils.normalizeIdentifier(dbName),
+ HiveStringUtils.normalizeIdentifier(tblName), newTable);
+ } finally {
+ tableCacheLock.readLock().unlock();
+ }
+ // Update partition cache (the key might have changed, since the table name is a
+ // component of the key)
+ try {
+ // Wait if background cache update is happening
+ partitionCacheLock.readLock().lock();
+ isPartitionCacheDirty.set(true);
+ SharedCache.alterTableInPartitionCache(HiveStringUtils.normalizeIdentifier(dbName),
+ HiveStringUtils.normalizeIdentifier(tblName), newTable);
+ } finally {
+ partitionCacheLock.readLock().unlock();
+ }
}
@Override
@@ -685,26 +921,62 @@ public class CachedStore implements RawStore, Configurable {
}
@Override
- public void alterPartition(String dbName, String tblName,
- List<String> partVals, Partition newPart)
+ public void alterPartition(String dbName, String tblName, List<String> partVals, Partition newPart)
throws InvalidObjectException, MetaException {
rawStore.alterPartition(dbName, tblName, partVals, newPart);
- interruptCacheUpdateMaster();
- SharedCache.alterPartitionInCache(HiveStringUtils.normalizeIdentifier(dbName),
- HiveStringUtils.normalizeIdentifier(tblName), partVals, newPart);
+ // Update partition cache
+ try {
+ // Wait if background cache update is happening
+ partitionCacheLock.readLock().lock();
+ isPartitionCacheDirty.set(true);
+ SharedCache.alterPartitionInCache(HiveStringUtils.normalizeIdentifier(dbName),
+ HiveStringUtils.normalizeIdentifier(tblName), partVals, newPart);
+ } finally {
+ partitionCacheLock.readLock().unlock();
+ }
+ // Update partition column stats cache
+ try {
+ // Wait if background cache update is happening
+ partitionColStatsCacheLock.readLock().lock();
+ isPartitionColStatsCacheDirty.set(true);
+ SharedCache.alterPartitionInColStatsCache(HiveStringUtils.normalizeIdentifier(dbName),
+ HiveStringUtils.normalizeIdentifier(tblName), partVals, newPart);
+ } finally {
+ partitionColStatsCacheLock.readLock().unlock();
+ }
}
@Override
- public void alterPartitions(String dbName, String tblName,
- List<List<String>> partValsList, List<Partition> newParts)
- throws InvalidObjectException, MetaException {
+ public void alterPartitions(String dbName, String tblName, List<List<String>> partValsList,
+ List<Partition> newParts) throws InvalidObjectException, MetaException {
rawStore.alterPartitions(dbName, tblName, partValsList, newParts);
- interruptCacheUpdateMaster();
- for (int i=0;i<partValsList.size();i++) {
- List<String> partVals = partValsList.get(i);
- Partition newPart = newParts.get(i);
- SharedCache.alterPartitionInCache(HiveStringUtils.normalizeIdentifier(dbName),
- HiveStringUtils.normalizeIdentifier(tblName), partVals, newPart);
+ // Update partition cache
+ try {
+ // Wait if background cache update is happening
+ partitionCacheLock.readLock().lock();
+ isPartitionCacheDirty.set(true);
+ for (int i = 0; i < partValsList.size(); i++) {
+ List<String> partVals = partValsList.get(i);
+ Partition newPart = newParts.get(i);
+ SharedCache.alterPartitionInCache(HiveStringUtils.normalizeIdentifier(dbName),
+ HiveStringUtils.normalizeIdentifier(tblName), partVals, newPart);
+ }
+ } finally {
+ partitionCacheLock.readLock().unlock();
+ }
+ // Update partition column stats cache
+ try {
+ // Wait if background cache update is happening
+ partitionColStatsCacheLock.readLock().lock();
+ isPartitionColStatsCacheDirty.set(true);
+ for (int i = 0; i < partValsList.size(); i++) {
+ List<String> partVals = partValsList.get(i);
+ Partition newPart = newParts.get(i);
+ SharedCache.alterPartitionInColStatsCache(HiveStringUtils.normalizeIdentifier(dbName),
+ HiveStringUtils.normalizeIdentifier(tblName), partVals, newPart);
+ }
+ } finally {
+ partitionColStatsCacheLock.readLock().unlock();
}
}
@@ -1095,55 +1367,199 @@ public class CachedStore implements RawStore, Configurable {
@Override
public boolean updateTableColumnStatistics(ColumnStatistics colStats)
- throws NoSuchObjectException, MetaException, InvalidObjectException,
- InvalidInputException {
+ throws NoSuchObjectException, MetaException, InvalidObjectException, InvalidInputException {
boolean succ = rawStore.updateTableColumnStatistics(colStats);
if (succ) {
- SharedCache.updateTableColumnStatistics(HiveStringUtils.normalizeIdentifier(colStats.getStatsDesc().getDbName()),
- HiveStringUtils.normalizeIdentifier(colStats.getStatsDesc().getTableName()), colStats.getStatsObj());
+ String dbName = colStats.getStatsDesc().getDbName();
+ String tableName = colStats.getStatsDesc().getTableName();
+ List<ColumnStatisticsObj> statsObjs = colStats.getStatsObj();
+ Table tbl = getTable(dbName, tableName);
+ List<String> colNames = new ArrayList<>();
+ for (ColumnStatisticsObj statsObj : statsObjs) {
+ colNames.add(statsObj.getColName());
+ }
+ StatsSetupConst.setColumnStatsState(tbl.getParameters(), colNames);
+
+ // Update table
+ try {
+ // Wait if background cache update is happening
+ tableCacheLock.readLock().lock();
+ isTableCacheDirty.set(true);
+ SharedCache.alterTableInCache(HiveStringUtils.normalizeIdentifier(dbName),
+ HiveStringUtils.normalizeIdentifier(tableName), tbl);
+ } finally {
+ tableCacheLock.readLock().unlock();
+ }
+
+ // Update table col stats
+ try {
+ // Wait if background cache update is happening
+ tableColStatsCacheLock.readLock().lock();
+ isTableColStatsCacheDirty.set(true);
+ SharedCache.updateTableColStatsInCache(HiveStringUtils.normalizeIdentifier(dbName),
+ HiveStringUtils.normalizeIdentifier(tableName), statsObjs);
+ } finally {
+ tableColStatsCacheLock.readLock().unlock();
+ }
}
return succ;
}
@Override
- public boolean updatePartitionColumnStatistics(ColumnStatistics colStats,
- List<String> partVals) throws NoSuchObjectException, MetaException,
- InvalidObjectException, InvalidInputException {
- boolean succ = rawStore.updatePartitionColumnStatistics(colStats, partVals);
+ public ColumnStatistics getTableColumnStatistics(String dbName, String tableName,
+ List<String> colNames) throws MetaException, NoSuchObjectException {
+ ColumnStatisticsDesc csd = new ColumnStatisticsDesc(true, dbName, tableName);
+ List<ColumnStatisticsObj> colStatObjs = new ArrayList<ColumnStatisticsObj>();
+ for (String colName : colNames) {
+ String colStatsCacheKey =
+ CacheUtils.buildKey(HiveStringUtils.normalizeIdentifier(dbName),
+ HiveStringUtils.normalizeIdentifier(tableName), colName);
+ ColumnStatisticsObj colStat = SharedCache.getCachedTableColStats(colStatsCacheKey);
+ if (colStat != null) {
+ colStatObjs.add(colStat);
+ }
+ }
+ if (colStatObjs.isEmpty()) {
+ return null;
+ } else {
+ return new ColumnStatistics(csd, colStatObjs);
+ }
+ }
+
+ @Override
+ public boolean deleteTableColumnStatistics(String dbName, String tableName, String colName)
+ throws NoSuchObjectException, MetaException, InvalidObjectException, InvalidInputException {
+ boolean succ = rawStore.deleteTableColumnStatistics(dbName, tableName, colName);
if (succ) {
- SharedCache.updatePartitionColumnStatistics(HiveStringUtils.normalizeIdentifier(colStats.getStatsDesc().getDbName()),
- HiveStringUtils.normalizeIdentifier(colStats.getStatsDesc().getTableName()), partVals, colStats.getStatsObj());
+ try {
+ // Wait if background cache update is happening
+ tableColStatsCacheLock.readLock().lock();
+ isTableColStatsCacheDirty.set(true);
+ SharedCache.removeTableColStatsFromCache(HiveStringUtils.normalizeIdentifier(dbName),
+ HiveStringUtils.normalizeIdentifier(tableName), colName);
+ } finally {
+ tableColStatsCacheLock.readLock().unlock();
+ }
}
return succ;
}
@Override
- public ColumnStatistics getTableColumnStatistics(String dbName,
- String tableName, List<String> colName)
- throws MetaException, NoSuchObjectException {
- return rawStore.getTableColumnStatistics(dbName, tableName, colName);
+ public boolean updatePartitionColumnStatistics(ColumnStatistics colStats, List<String> partVals)
+ throws NoSuchObjectException, MetaException, InvalidObjectException, InvalidInputException {
+ boolean succ = rawStore.updatePartitionColumnStatistics(colStats, partVals);
+ if (succ) {
+ String dbName = colStats.getStatsDesc().getDbName();
+ String tableName = colStats.getStatsDesc().getTableName();
+ List<ColumnStatisticsObj> statsObjs = colStats.getStatsObj();
+ Partition part = getPartition(dbName, tableName, partVals);
+ List<String> colNames = new ArrayList<>();
+ for (ColumnStatisticsObj statsObj : statsObjs) {
+ colNames.add(statsObj.getColName());
+ }
+ StatsSetupConst.setColumnStatsState(part.getParameters(), colNames);
+
+ // Update partition
+ try {
+ // Wait if background cache update is happening
+ partitionCacheLock.readLock().lock();
+ isPartitionCacheDirty.set(true);
+ SharedCache.alterPartitionInCache(HiveStringUtils.normalizeIdentifier(dbName),
+ HiveStringUtils.normalizeIdentifier(tableName), partVals, part);
+ } finally {
+ partitionCacheLock.readLock().unlock();
+ }
+
+ // Update partition column stats
+ try {
+ // Wait if background cache update is happening
+ partitionColStatsCacheLock.readLock().lock();
+ isPartitionColStatsCacheDirty.set(true);
+ SharedCache.updatePartitionColStatsInCache(
+ HiveStringUtils.normalizeIdentifier(colStats.getStatsDesc().getDbName()),
+ HiveStringUtils.normalizeIdentifier(colStats.getStatsDesc().getTableName()), partVals,
+ colStats.getStatsObj());
+ } finally {
+ partitionColStatsCacheLock.readLock().unlock();
+ }
+ }
+ return succ;
}
@Override
- public List<ColumnStatistics> getPartitionColumnStatistics(String dbName,
- String tblName, List<String> partNames, List<String> colNames)
- throws MetaException, NoSuchObjectException {
+ // TODO: calculate from cached values.
+ // Need to see if it makes sense to do this, as some col stats may be out of date or missing in the cache.
+ public List<ColumnStatistics> getPartitionColumnStatistics(String dbName, String tblName,
+ List<String> partNames, List<String> colNames) throws MetaException, NoSuchObjectException {
return rawStore.getPartitionColumnStatistics(dbName, tblName, partNames, colNames);
}
@Override
- public boolean deletePartitionColumnStatistics(String dbName,
- String tableName, String partName, List<String> partVals, String colName)
- throws NoSuchObjectException, MetaException, InvalidObjectException,
- InvalidInputException {
- return rawStore.deletePartitionColumnStatistics(dbName, tableName, partName, partVals, colName);
+ public boolean deletePartitionColumnStatistics(String dbName, String tableName, String partName,
+ List<String> partVals, String colName) throws NoSuchObjectException, MetaException,
+ InvalidObjectException, InvalidInputException {
+ boolean succ =
+ rawStore.deletePartitionColumnStatistics(dbName, tableName, partName, partVals, colName);
+ if (succ) {
+ try {
+ // Wait if background cache update is happening
+ partitionColStatsCacheLock.readLock().lock();
+ isPartitionColStatsCacheDirty.set(true);
+ SharedCache.removePartitionColStatsFromCache(HiveStringUtils.normalizeIdentifier(dbName),
+ HiveStringUtils.normalizeIdentifier(tableName), partVals, colName);
+ } finally {
+ partitionColStatsCacheLock.readLock().unlock();
+ }
+ }
+ return succ;
}
@Override
- public boolean deleteTableColumnStatistics(String dbName, String tableName,
- String colName) throws NoSuchObjectException, MetaException,
- InvalidObjectException, InvalidInputException {
- return rawStore.deleteTableColumnStatistics(dbName, tableName, colName);
+ public AggrStats get_aggr_stats_for(String dbName, String tblName, List<String> partNames,
+ List<String> colNames) throws MetaException, NoSuchObjectException {
+ List<ColumnStatisticsObj> colStats = new ArrayList<ColumnStatisticsObj>(colNames.size());
+ for (String colName : colNames) {
+ ColumnStatisticsObj colStat =
+ mergeColStatsForPartitions(HiveStringUtils.normalizeIdentifier(dbName),
+ HiveStringUtils.normalizeIdentifier(tblName), partNames, colName);
+ if (colStat == null) {
+ // Stop and fall back to the underlying RawStore
+ colStats = null;
+ break;
+ } else {
+ colStats.add(colStat);
+ }
+ }
+ if (colStats == null) {
+ return rawStore.get_aggr_stats_for(dbName, tblName, partNames, colNames);
+ } else {
+ return new AggrStats(colStats, partNames.size());
+ }
+ }
+
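Note the all-or-nothing contract here: if even a single (partition, column) pair is missing from the cache, the whole request is delegated to the underlying RawStore rather than mixing cached and uncached values. That is also what makes new AggrStats(colStats, partNames.size()) honest: whenever the cached path answers, stats were found for every requested partition.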
+ private ColumnStatisticsObj mergeColStatsForPartitions(String dbName, String tblName,
+ List<String> partNames, String colName) throws MetaException {
+ ColumnStatisticsObj colStats = null;
+ for (String partName : partNames) {
+ String colStatsCacheKey =
+ CacheUtils.buildKey(dbName, tblName, partNameToVals(partName), colName);
+ ColumnStatisticsObj colStatsForPart =
+ SharedCache.getCachedPartitionColStats(colStatsCacheKey);
+ if (colStatsForPart == null) {
+ // We don't have stats for all the partitions, and the extrapolation logic
+ // hasn't been added to CachedStore yet.
+ // So stop now and fall back to the underlying RawStore.
+ return null;
+ }
+ if (colStats == null) {
+ colStats = colStatsForPart;
+ } else {
+ ColumnStatsMerger merger =
+ ColumnStatsMergerFactory.getColumnStatsMerger(colStats, colStatsForPart);
+ merger.merge(colStats, colStatsForPart);
+ }
+ }
+ return colStats;
}
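The cache lookups above are keyed by a flat string built from the normalized database/table names, the partition values, and the column name. The real delimiter and escaping rules live in CacheUtils.buildKey; a purely hypothetical equivalent, just to show the shape of the key:

import java.util.Arrays;
import java.util.List;

public class CacheKeySketch {
  // Hypothetical stand-in for CacheUtils.buildKey; the delimiter and the
  // absence of escaping are assumptions, not the actual implementation.
  static String buildKey(String dbName, String tblName, List<String> partVals, String colName) {
    return String.join("#", dbName, tblName, String.join("#", partVals), colName);
  }

  public static void main(String[] args) {
    // Prints: default#sales#2017-05-22#10#id
    System.out.println(buildKey("default", "sales",
        Arrays.asList("2017-05-22", "10"), "id"));
  }
}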
@Override
@@ -1209,14 +1625,34 @@ public class CachedStore implements RawStore, Configurable {
}
@Override
- public void dropPartitions(String dbName, String tblName,
- List<String> partNames) throws MetaException, NoSuchObjectException {
+ public void dropPartitions(String dbName, String tblName, List<String> partNames)
+ throws MetaException, NoSuchObjectException {
rawStore.dropPartitions(dbName, tblName, partNames);
- interruptCacheUpdateMaster();
- for (String partName : partNames) {
- List<String> vals = partNameToVals(partName);
- SharedCache.removePartitionFromCache(HiveStringUtils.normalizeIdentifier(dbName),
- HiveStringUtils.normalizeIdentifier(tblName), vals);
+ // Remove partitions
+ try {
+ // Wait if background cache update is happening
+ partitionCacheLock.readLock().lock();
+ isPartitionCacheDirty.set(true);
+ for (String partName : partNames) {
+ List<String> vals = partNameToVals(partName);
+ SharedCache.removePartitionFromCache(HiveStringUtils.normalizeIdentifier(dbName),
+ HiveStringUtils.normalizeIdentifier(tblName), vals);
+ }
+ } finally {
+ partitionCacheLock.readLock().unlock();
+ }
+ // Remove partition col stats
+ try {
+ // Wait if background cache update is happening
+ partitionColStatsCacheLock.readLock().lock();
+ isPartitionColStatsCacheDirty.set(true);
+ for (String partName : partNames) {
+ List<String> partVals = partNameToVals(partName);
+ SharedCache.removePartitionColStatsFromCache(HiveStringUtils.normalizeIdentifier(dbName),
+ HiveStringUtils.normalizeIdentifier(tblName), partVals);
+ }
+ } finally {
+ partitionColStatsCacheLock.readLock().unlock();
}
}
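dropPartitions receives partition *names* (e.g. "ds=2017-05-22/hr=10") while the cache entries are keyed by partition *values*, hence the partNameToVals conversion before each removal. A hypothetical sketch of that conversion (the real helper also unescapes values; escaping is ignored here):

import java.util.ArrayList;
import java.util.List;

public class PartNameSketch {
  // Hypothetical equivalent of partNameToVals:
  // "ds=2017-05-22/hr=10" -> ["2017-05-22", "10"]. Escaping is ignored.
  static List<String> partNameToVals(String partName) {
    List<String> vals = new ArrayList<>();
    for (String kv : partName.split("/")) {
      vals.add(kv.substring(kv.indexOf('=') + 1));
    }
    return vals;
  }
}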
@@ -1326,130 +1762,6 @@ public class CachedStore implements RawStore, Configurable {
}
@Override
- public AggrStats get_aggr_stats_for(String dbName, String tblName,
- List<String> partNames, List<String> colNames)
- throws MetaException, NoSuchObjectException {
- List<ColumnStatisticsObj> colStats = new ArrayList<ColumnStatisticsObj>(colNames.size());
- for (String colName : colNames) {
- colStats.add(mergeColStatsForPartitions(HiveStringUtils.normalizeIdentifier(dbName),
- HiveStringUtils.normalizeIdentifier(tblName), partNames, colName));
- }
- // TODO: revisit the partitions not found case for extrapolation
- return new AggrStats(colStats, partNames.size());
- }
-
- private ColumnStatisticsObj mergeColStatsForPartitions(String dbName, String tblName,
- List<String> partNames, String colName) throws MetaException {
- ColumnStatisticsObj colStats = null;
- for (String partName : partNames) {
- String colStatsCacheKey = CacheUtils.buildKey(dbName, tblName, partNameToVals(partName), colName);
- ColumnStatisticsObj colStatsForPart = SharedCache.getCachedPartitionColStats(
- colStatsCacheKey);
- if (colStats == null) {
- colStats = colStatsForPart;
- } else {
- colStats = mergeColStatsObj(colStats, colStatsForPart);
- }
- }
- return colStats;
- }
-
- private ColumnStatisticsObj mergeColStatsObj(ColumnStatisticsObj colStats1,
- ColumnStatisticsObj colStats2) throws MetaException {
- if ((!colStats1.getColType().equalsIgnoreCase(colStats2.getColType()))
- && (!colStats1.getColName().equalsIgnoreCase(colStats2.getColName()))) {
- throw new MetaException("Can't merge column stats for two partitions for different columns.");
- }
- ColumnStatisticsData csd = new ColumnStatisticsData();
- ColumnStatisticsObj cso = new ColumnStatisticsObj(colStats1.getColName(),
- colStats1.getColType(), csd);
- ColumnStatisticsData csData1 = colStats1.getStatsData();
- ColumnStatisticsData csData2 = colStats2.getStatsData();
- String colType = colStats1.getColType().toLowerCase();
- if (colType.equals("boolean")) {
- BooleanColumnStatsData boolStats = new BooleanColumnStatsData();
- boolStats.setNumFalses(csData1.getBooleanStats().getNumFalses()
- + csData2.getBooleanStats().getNumFalses());
- boolStats.setNumTrues(csData1.getBooleanStats().getNumTrues()
- + csData2.getBooleanStats().getNumTrues());
- boolStats.setNumNulls(csData1.getBooleanStats().getNumNulls()
- + csData2.getBooleanStats().getNumNulls());
- csd.setBooleanStats(boolStats);
- } else if (colType.equals("string") || colType.startsWith("varchar")
- || colType.startsWith("char")) {
- StringColumnStatsData stringStats = new StringColumnStatsData();
- stringStats.setNumNulls(csData1.getStringStats().getNumNulls()
- + csData2.getStringStats().getNumNulls());
- stringStats.setAvgColLen(Math.max(csData1.getStringStats().getAvgColLen(), csData2
- .getStringStats().getAvgColLen()));
- stringStats.setMaxColLen(Math.max(csData1.getStringStats().getMaxColLen(), csData2
- .getStringStats().getMaxColLen()));
- stringStats.setNumDVs(Math.max(csData1.getStringStats().getNumDVs(), csData2.getStringStats()
- .getNumDVs()));
- csd.setStringStats(stringStats);
- } else if (colType.equals("binary")) {
- BinaryColumnStatsData binaryStats = new BinaryColumnStatsData();
- binaryStats.setNumNulls(csData1.getBinaryStats().getNumNulls()
- + csData2.getBinaryStats().getNumNulls());
- binaryStats.setAvgColLen(Math.max(csData1.getBinaryStats().getAvgColLen(), csData2
- .getBinaryStats().getAvgColLen()));
- binaryStats.setMaxColLen(Math.max(csData1.getBinaryStats().getMaxColLen(), csData2
- .getBinaryStats().getMaxColLen()));
- csd.setBinaryStats(binaryStats);
- } else if (colType.equals("bigint") || colType.equals("int") || colType.equals("smallint")
- || colType.equals("tinyint") || colType.equals("timestamp")) {
- LongColumnStatsData longStats = new LongColumnStatsData();
- longStats.setNumNulls(csData1.getLongStats().getNumNulls()
- + csData2.getLongStats().getNumNulls());
- longStats.setHighValue(Math.max(csData1.getLongStats().getHighValue(), csData2.getLongStats()
- .getHighValue()));
- longStats.setLowValue(Math.min(csData1.getLongStats().getLowValue(), csData2.getLongStats()
- .getLowValue()));
- longStats.setNumDVs(Math.max(csData1.getLongStats().getNumDVs(), csData2.getLongStats()
- .getNumDVs()));
- csd.setLongStats(longStats);
- } else if (colType.equals("date")) {
- DateColumnStatsData dateStats = new DateColumnStatsData();
- dateStats.setNumNulls(csData1.getDateStats().getNumNulls()
- + csData2.getDateStats().getNumNulls());
- dateStats.setHighValue(new Date(Math.max(csData1.getDateStats().getHighValue()
- .getDaysSinceEpoch(), csData2.getDateStats().getHighValue().getDaysSinceEpoch())));
- dateStats.setHighValue(new Date(Math.min(csData1.getDateStats().getLowValue()
- .getDaysSinceEpoch(), csData2.getDateStats().getLowValue().getDaysSinceEpoch())));
- dateStats.setNumDVs(Math.max(csData1.getDateStats().getNumDVs(), csData2.getDateStats()
- .getNumDVs()));
- csd.setDateStats(dateStats);
- } else if (colType.equals("double") || colType.equals("float")) {
- DoubleColumnStatsData doubleStats = new DoubleColumnStatsData();
- doubleStats.setNumNulls(csData1.getDoubleStats().getNumNulls()
- + csData2.getDoubleStats().getNumNulls());
- doubleStats.setHighValue(Math.max(csData1.getDoubleStats().getHighValue(), csData2
- .getDoubleStats().getHighValue()));
- doubleStats.setLowValue(Math.min(csData1.getDoubleStats().getLowValue(), csData2
- .getDoubleStats().getLowValue()));
- doubleStats.setNumDVs(Math.max(csData1.getDoubleStats().getNumDVs(), csData2.getDoubleStats()
- .getNumDVs()));
- csd.setDoubleStats(doubleStats);
- } else if (colType.startsWith("decimal")) {
- DecimalColumnStatsData decimalStats = new DecimalColumnStatsData();
- decimalStats.setNumNulls(csData1.getDecimalStats().getNumNulls()
- + csData2.getDecimalStats().getNumNulls());
- Decimal high = (csData1.getDecimalStats().getHighValue()
- .compareTo(csData2.getDecimalStats().getHighValue()) > 0) ? csData1.getDecimalStats()
- .getHighValue() : csData2.getDecimalStats().getHighValue();
- decimalStats.setHighValue(high);
- Decimal low = (csData1.getDecimalStats().getLowValue()
- .compareTo(csData2.getDecimalStats().getLowValue()) < 0) ? csData1.getDecimalStats()
- .getLowValue() : csData2.getDecimalStats().getLowValue();
- decimalStats.setLowValue(low);
- decimalStats.setNumDVs(Math.max(csData1.getDecimalStats().getNumDVs(), csData2
- .getDecimalStats().getNumDVs()));
- csd.setDecimalStats(decimalStats);
- }
- return cso;
- }
-
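The removed inline merge above encodes the same per-type semantics that the ColumnStatsMerger implementations now provide: null counts add, high/low bounds widen, and NDV takes the max, since distinct-value counts cannot simply be summed across partitions without overcounting. A minimal sketch of those semantics for the long case (illustration only, not the merger's actual code):

import org.apache.hadoop.hive.metastore.api.LongColumnStatsData;

public class LongStatsMergeSketch {
  static LongColumnStatsData merge(LongColumnStatsData a, LongColumnStatsData b) {
    LongColumnStatsData out = new LongColumnStatsData();
    out.setNumNulls(a.getNumNulls() + b.getNumNulls());             // null counts add
    out.setHighValue(Math.max(a.getHighValue(), b.getHighValue())); // bounds widen
    out.setLowValue(Math.min(a.getLowValue(), b.getLowValue()));
    out.setNumDVs(Math.max(a.getNumDVs(), b.getNumDVs()));          // NDV: max is a safe lower bound
    return out;
  }
}

Incidentally, the removed date branch wrote the low value through setHighValue, one more reason to centralize this logic in the merger classes instead of maintaining a hand-rolled copy per type.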
- @Override
public NotificationEventResponse getNextNotification(
NotificationEventRequest rqst) {
return rawStore.getNextNotification(rqst);
@@ -1565,10 +1877,9 @@ public class CachedStore implements RawStore, Configurable {
}
@Override
- public Map<String, ColumnStatisticsObj> getAggrColStatsForTablePartitions(
- String dbName, String tableName)
- throws MetaException, NoSuchObjectException {
- return rawStore.getAggrColStatsForTablePartitions(dbName, tableName);
+ public Map<String, List<ColumnStatisticsObj>> getColStatsForTablePartitions(String dbName,
+ String tableName) throws MetaException, NoSuchObjectException {
+ return rawStore.getColStatsForTablePartitions(dbName, tableName);
}
public RawStore getRawStore() {