Posted to commits@hive.apache.org by we...@apache.org on 2017/05/24 23:52:10 UTC
[36/54] [abbrv] hive git commit: HIVE-16579: CachedStore: improvements to partition col stats caching and cache column stats for unpartitioned table (Vaibhav Gumashta reviewed by Daniel Dai, Thejas Nair)
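For orientation before the diff: the patch adds a table-level column statistics cache (tableColStatsCache) in SharedCache alongside the existing partition-level cache. Entries are keyed by strings that CacheUtils.buildKey composes from database, table, optional partition values, and column name. A minimal usage sketch in Java, assuming the CacheUtils key helpers shown in the diff are visible to the caller; the table, column, and stats values are illustrative:

    import java.util.Arrays;
    import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData;
    import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
    import org.apache.hadoop.hive.metastore.api.LongColumnStatsData;
    import org.apache.hadoop.hive.metastore.cache.CacheUtils;
    import org.apache.hadoop.hive.metastore.cache.SharedCache;

    public class TableColStatsCacheSketch {
      public static void main(String[] args) {
        // Build a stats object for integer column "col1" of db1.tbl1
        LongColumnStatsData longStats = new LongColumnStatsData(10 /* numNulls */, 20 /* numDVs */);
        longStats.setLowValue(5);
        longStats.setHighValue(500);
        ColumnStatisticsData data = new ColumnStatisticsData();
        data.setLongStats(longStats);
        ColumnStatisticsObj colStats = new ColumnStatisticsObj("col1", "int", data);
        // Populate the new per-table column stats cache...
        SharedCache.addTableColStatsToCache("db1", "tbl1", Arrays.asList(colStats));
        // ...and read one column's stats back using the same key scheme
        ColumnStatisticsObj cached =
            SharedCache.getCachedTableColStats(CacheUtils.buildKey("db1", "tbl1", "col1"));
        System.out.println(cached);
      }
    }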
http://git-wip-us.apache.org/repos/asf/hive/blob/d85beaa9/metastore/src/java/org/apache/hadoop/hive/metastore/cache/SharedCache.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/cache/SharedCache.java b/metastore/src/java/org/apache/hadoop/hive/metastore/cache/SharedCache.java
index 7beee42..6b6355b 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/cache/SharedCache.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/cache/SharedCache.java
@@ -21,14 +21,18 @@ import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.TreeMap;
-import org.apache.hadoop.hive.common.StatsSetupConst;
+import org.apache.hadoop.hive.metastore.StatObjectConverter;
+import org.apache.hadoop.hive.metastore.Warehouse;
import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.Table;
@@ -38,17 +42,26 @@ import org.apache.hadoop.hive.metastore.cache.CachedStore.StorageDescriptorWrapp
import org.apache.hadoop.hive.metastore.cache.CachedStore.TableWrapper;
import org.apache.hadoop.hive.metastore.hbase.HBaseUtils;
import org.apache.hive.common.util.HiveStringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import com.google.common.annotations.VisibleForTesting;
public class SharedCache {
private static Map<String, Database> databaseCache = new TreeMap<String, Database>();
private static Map<String, TableWrapper> tableCache = new TreeMap<String, TableWrapper>();
- private static Map<String, PartitionWrapper> partitionCache = new TreeMap<String, PartitionWrapper>();
- private static Map<String, ColumnStatisticsObj> partitionColStatsCache = new TreeMap<String, ColumnStatisticsObj>();
- private static Map<ByteArrayWrapper, StorageDescriptorWrapper> sdCache = new HashMap<ByteArrayWrapper, StorageDescriptorWrapper>();
+ private static Map<String, PartitionWrapper> partitionCache =
+ new TreeMap<String, PartitionWrapper>();
+ private static Map<String, ColumnStatisticsObj> partitionColStatsCache =
+ new TreeMap<String, ColumnStatisticsObj>();
+ private static Map<String, ColumnStatisticsObj> tableColStatsCache =
+ new TreeMap<String, ColumnStatisticsObj>();
+ private static Map<ByteArrayWrapper, StorageDescriptorWrapper> sdCache =
+ new HashMap<ByteArrayWrapper, StorageDescriptorWrapper>();
private static MessageDigest md;
+ private static final Logger LOG = LoggerFactory.getLogger(SharedCache.class.getName());
+
static {
try {
md = MessageDigest.getInstance("MD5");
@@ -97,11 +110,13 @@ public class SharedCache {
Table tblCopy = tbl.deepCopy();
tblCopy.setDbName(HiveStringUtils.normalizeIdentifier(dbName));
tblCopy.setTableName(HiveStringUtils.normalizeIdentifier(tblName));
- for (FieldSchema fs : tblCopy.getPartitionKeys()) {
- fs.setName(HiveStringUtils.normalizeIdentifier(fs.getName()));
+ if (tblCopy.getPartitionKeys() != null) {
+ for (FieldSchema fs : tblCopy.getPartitionKeys()) {
+ fs.setName(HiveStringUtils.normalizeIdentifier(fs.getName()));
+ }
}
TableWrapper wrapper;
- if (tbl.getSd()!=null) {
+ if (tbl.getSd() != null) {
byte[] sdHash = HBaseUtils.hashStorageDescriptor(tbl.getSd(), md);
StorageDescriptor sd = tbl.getSd();
increSd(sd, sdHash);
@@ -121,10 +136,54 @@ public class SharedCache {
}
}
+ public static synchronized ColumnStatisticsObj getCachedTableColStats(String colStatsCacheKey) {
+ return tableColStatsCache.get(colStatsCacheKey);
+ }
+
+ public static synchronized void removeTableColStatsFromCache(String dbName, String tblName) {
+ String partialKey = CacheUtils.buildKeyWithDelimit(dbName, tblName);
+ Iterator<Entry<String, ColumnStatisticsObj>> iterator =
+ tableColStatsCache.entrySet().iterator();
+ while (iterator.hasNext()) {
+ Entry<String, ColumnStatisticsObj> entry = iterator.next();
+ String key = entry.getKey();
+ if (key.toLowerCase().startsWith(partialKey.toLowerCase())) {
+ iterator.remove();
+ }
+ }
+ }
+
+ public static synchronized void removeTableColStatsFromCache(String dbName, String tblName,
+ String colName) {
+ tableColStatsCache.remove(CacheUtils.buildKey(dbName, tblName, colName));
+ }
+
+ public static synchronized void updateTableColStatsInCache(String dbName, String tableName,
+ List<ColumnStatisticsObj> colStatsForTable) {
+ for (ColumnStatisticsObj colStatObj : colStatsForTable) {
+ // Get old stats object if present
+ String key = CacheUtils.buildKey(dbName, tableName, colStatObj.getColName());
+ ColumnStatisticsObj oldStatsObj = tableColStatsCache.get(key);
+ if (oldStatsObj != null) {
+ LOG.debug("CachedStore: updating table column stats for column: " + colStatObj.getColName()
+ + ", of table: " + tableName + " and database: " + dbName);
+ // Update existing stat object's field
+ StatObjectConverter.setFieldsIntoOldStats(oldStatsObj, colStatObj);
+ } else {
+ // No stats exist for this key; add a new object to the cache
+ tableColStatsCache.put(key, colStatObj);
+ }
+ }
+ }
+
public static synchronized void alterTableInCache(String dbName, String tblName, Table newTable) {
removeTableFromCache(dbName, tblName);
addTableToCache(HiveStringUtils.normalizeIdentifier(newTable.getDbName()),
HiveStringUtils.normalizeIdentifier(newTable.getTableName()), newTable);
+ }
+
+ public static synchronized void alterTableInPartitionCache(String dbName, String tblName,
+ Table newTable) {
if (!dbName.equals(newTable.getDbName()) || !tblName.equals(newTable.getTableName())) {
List<Partition> partitions = listCachedPartitions(dbName, tblName, -1);
for (Partition part : partitions) {
@@ -137,6 +196,58 @@ public class SharedCache {
}
}
+ public static synchronized void alterTableInTableColStatsCache(String dbName, String tblName,
+ Table newTable) {
+ if (!dbName.equals(newTable.getDbName()) || !tblName.equals(newTable.getTableName())) {
+ String oldPartialTableStatsKey = CacheUtils.buildKeyWithDelimit(dbName, tblName);
+ Iterator<Entry<String, ColumnStatisticsObj>> iterator =
+ tableColStatsCache.entrySet().iterator();
+ Map<String, ColumnStatisticsObj> newTableColStats =
+ new HashMap<String, ColumnStatisticsObj>();
+ while (iterator.hasNext()) {
+ Entry<String, ColumnStatisticsObj> entry = iterator.next();
+ String key = entry.getKey();
+ ColumnStatisticsObj colStatObj = entry.getValue();
+ if (key.toLowerCase().startsWith(oldPartialTableStatsKey.toLowerCase())) {
+ String[] decomposedKey = CacheUtils.splitTableColStats(key);
+ String newKey = CacheUtils.buildKey(HiveStringUtils.normalizeIdentifier(newTable.getDbName()),
+ HiveStringUtils.normalizeIdentifier(newTable.getTableName()), decomposedKey[2]);
+ newTableColStats.put(newKey, colStatObj);
+ iterator.remove();
+ }
+ }
+ tableColStatsCache.putAll(newTableColStats);
+ }
+ }
+
+ public static synchronized void alterTableInPartitionColStatsCache(String dbName, String tblName,
+ Table newTable) {
+ if (!dbName.equals(newTable.getDbName()) || !tblName.equals(newTable.getTableName())) {
+ List<Partition> partitions = listCachedPartitions(dbName, tblName, -1);
+ Map<String, ColumnStatisticsObj> newPartitionColStats =
+ new HashMap<String, ColumnStatisticsObj>();
+ for (Partition part : partitions) {
+ String oldPartialPartitionKey =
+ CacheUtils.buildKeyWithDelimit(dbName, tblName, part.getValues());
+ Iterator<Entry<String, ColumnStatisticsObj>> iterator =
+ partitionColStatsCache.entrySet().iterator();
+ while (iterator.hasNext()) {
+ Entry<String, ColumnStatisticsObj> entry = iterator.next();
+ String key = entry.getKey();
+ ColumnStatisticsObj colStatObj = entry.getValue();
+ if (key.toLowerCase().startsWith(oldPartialPartitionKey.toLowerCase())) {
+ Object[] decomposedKey = CacheUtils.splitPartitionColStats(key);
+ String newKey =
+ CacheUtils.buildKey(HiveStringUtils.normalizeIdentifier(newTable.getDbName()),
+ HiveStringUtils.normalizeIdentifier(newTable.getTableName()),
+ (List<String>) decomposedKey[2], (String) decomposedKey[3]);
+ newPartitionColStats.put(newKey, colStatObj);
+ iterator.remove();
+ }
+ }
+ }
+ partitionColStatsCache.putAll(newPartitionColStats);
+ }
+ }
+
public static synchronized int getCachedTableCount() {
return tableCache.size();
}
@@ -151,18 +262,6 @@ public class SharedCache {
return tables;
}
- public static synchronized void updateTableColumnStatistics(String dbName, String tableName,
- List<ColumnStatisticsObj> statsObjs) {
- Table tbl = getTableFromCache(dbName, tableName);
- tbl.getSd().getParameters();
- List<String> colNames = new ArrayList<>();
- for (ColumnStatisticsObj statsObj:statsObjs) {
- colNames.add(statsObj.getColName());
- }
- StatsSetupConst.setColumnStatsState(tbl.getParameters(), colNames);
- alterTableInCache(dbName, tableName, tbl);
- }
-
public static synchronized List<TableMeta> getTableMeta(String dbNames, String tableNames, List<String> tableTypes) {
List<TableMeta> tableMetas = new ArrayList<TableMeta>();
for (String dbName : listCachedDatabases()) {
@@ -214,14 +313,51 @@ public class SharedCache {
return partitionCache.containsKey(CacheUtils.buildKey(dbName, tblName, part_vals));
}
- public static synchronized Partition removePartitionFromCache(String dbName, String tblName, List<String> part_vals) {
- PartitionWrapper wrapper = partitionCache.remove(CacheUtils.buildKey(dbName, tblName, part_vals));
- if (wrapper.getSdHash()!=null) {
+ public static synchronized Partition removePartitionFromCache(String dbName, String tblName,
+ List<String> part_vals) {
+ PartitionWrapper wrapper =
+ partitionCache.remove(CacheUtils.buildKey(dbName, tblName, part_vals));
+ if (wrapper.getSdHash() != null) {
decrSd(wrapper.getSdHash());
}
return wrapper.getPartition();
}
+ // Remove cached column stats for all partitions of a table
+ public static synchronized void removePartitionColStatsFromCache(String dbName, String tblName) {
+ String partialKey = CacheUtils.buildKeyWithDelimit(dbName, tblName);
+ Iterator<Entry<String, ColumnStatisticsObj>> iterator =
+ partitionColStatsCache.entrySet().iterator();
+ while (iterator.hasNext()) {
+ Entry<String, ColumnStatisticsObj> entry = iterator.next();
+ String key = entry.getKey();
+ if (key.toLowerCase().startsWith(partialKey.toLowerCase())) {
+ iterator.remove();
+ }
+ }
+ }
+
+ // Remove cached column stats for a particular partition of a table
+ public static synchronized void removePartitionColStatsFromCache(String dbName, String tblName,
+ List<String> partVals) {
+ String partialKey = CacheUtils.buildKeyWithDelimit(dbName, tblName, partVals);
+ Iterator<Entry<String, ColumnStatisticsObj>> iterator =
+ partitionColStatsCache.entrySet().iterator();
+ while (iterator.hasNext()) {
+ Entry<String, ColumnStatisticsObj> entry = iterator.next();
+ String key = entry.getKey();
+ if (key.toLowerCase().startsWith(partialKey.toLowerCase())) {
+ iterator.remove();
+ }
+ }
+ }
+
+ // Remove cached column stats for a particular partition and a particular column of a table
+ public static synchronized void removePartitionColStatsFromCache(String dbName, String tblName,
+ List<String> partVals, String colName) {
+ partitionColStatsCache.remove(CacheUtils.buildKey(dbName, tblName, partVals, colName));
+ }
+
public static synchronized List<Partition> listCachedPartitions(String dbName, String tblName, int max) {
List<Partition> partitions = new ArrayList<Partition>();
int count = 0;
@@ -236,22 +372,53 @@ public class SharedCache {
return partitions;
}
- public static synchronized void alterPartitionInCache(String dbName, String tblName, List<String> partVals, Partition newPart) {
+ public static synchronized void alterPartitionInCache(String dbName, String tblName,
+ List<String> partVals, Partition newPart) {
removePartitionFromCache(dbName, tblName, partVals);
addPartitionToCache(HiveStringUtils.normalizeIdentifier(newPart.getDbName()),
HiveStringUtils.normalizeIdentifier(newPart.getTableName()), newPart);
}
- public static synchronized void updatePartitionColumnStatistics(String dbName, String tableName,
- List<String> partVals, List<ColumnStatisticsObj> statsObjs) {
- Partition part = getPartitionFromCache(dbName, tableName, partVals);
- part.getSd().getParameters();
- List<String> colNames = new ArrayList<>();
- for (ColumnStatisticsObj statsObj:statsObjs) {
- colNames.add(statsObj.getColName());
+ public static synchronized void alterPartitionInColStatsCache(String dbName, String tblName,
+ List<String> partVals, Partition newPart) {
+ String oldPartialPartitionKey = CacheUtils.buildKeyWithDelimit(dbName, tblName, partVals);
+ Map<String, ColumnStatisticsObj> newPartitionColStats =
+ new HashMap<String, ColumnStatisticsObj>();
+ Iterator<Entry<String, ColumnStatisticsObj>> iterator =
+ partitionColStatsCache.entrySet().iterator();
+ while (iterator.hasNext()) {
+ Entry<String, ColumnStatisticsObj> entry = iterator.next();
+ String key = entry.getKey();
+ ColumnStatisticsObj colStatObj = entry.getValue();
+ if (key.toLowerCase().startsWith(oldPartialPartitionKey.toLowerCase())) {
+ Object[] decomposedKey = CacheUtils.splitPartitionColStats(key);
+ String newKey =
+ CacheUtils.buildKey(HiveStringUtils.normalizeIdentifier(newPart.getDbName()),
+ HiveStringUtils.normalizeIdentifier(newPart.getTableName()), newPart.getValues(),
+ (String) decomposedKey[3]);
+ newPartitionColStats.put(newKey, colStatObj);
+ iterator.remove();
+ }
+ }
+ partitionColStatsCache.putAll(newPartitionColStats);
+ }
+
+ public static synchronized void updatePartitionColStatsInCache(String dbName, String tableName,
+ List<String> partVals, List<ColumnStatisticsObj> colStatsObjs) {
+ for (ColumnStatisticsObj colStatObj : colStatsObjs) {
+ // Get old stats object if present
+ String key = CacheUtils.buildKey(dbName, tableName, partVals, colStatObj.getColName());
+ ColumnStatisticsObj oldStatsObj = partitionColStatsCache.get(key);
+ if (oldStatsObj != null) {
+ // Update existing stat object's field
+ LOG.debug("CachedStore: updating partition column stats for column: "
+ + colStatObj.getColName() + ", of table: " + tableName + " and database: " + dbName);
+ StatObjectConverter.setFieldsIntoOldStats(oldStatsObj, colStatObj);
+ } else {
+ // No stats exist for this key; add a new object to the cache
+ partitionColStatsCache.put(key, colStatObj);
+ }
}
- StatsSetupConst.setColumnStatsState(part.getParameters(), colNames);
- alterPartitionInCache(dbName, tableName, partVals, part);
}
public static synchronized int getCachedPartitionCount() {
@@ -262,10 +429,47 @@ public class SharedCache {
return partitionColStatsCache.get(key);
}
- public static synchronized void addPartitionColStatsToCache(Map<String, ColumnStatisticsObj> aggrStatsPerPartition) {
- partitionColStatsCache.putAll(aggrStatsPerPartition);
+ public static synchronized void addPartitionColStatsToCache(String dbName, String tableName,
+ Map<String, List<ColumnStatisticsObj>> colStatsPerPartition) {
+ for (Map.Entry<String, List<ColumnStatisticsObj>> entry : colStatsPerPartition.entrySet()) {
+ String partName = entry.getKey();
+ try {
+ List<String> partVals = Warehouse.getPartValuesFromPartName(partName);
+ for (ColumnStatisticsObj colStatObj : entry.getValue()) {
+ String key = CacheUtils.buildKey(dbName, tableName, partVals, colStatObj.getColName());
+ partitionColStatsCache.put(key, colStatObj);
+ }
+ } catch (MetaException e) {
+ LOG.info("Unable to add partition: " + partName + " to SharedCache", e);
+ }
+ }
}
+ public static synchronized void refreshPartitionColStats(String dbName, String tableName,
+ Map<String, List<ColumnStatisticsObj>> newColStatsPerPartition) {
+ LOG.debug("CachedStore: updating cached partition column stats objects for database: " + dbName
+ + " and table: " + tableName);
+ removePartitionColStatsFromCache(dbName, tableName);
+ addPartitionColStatsToCache(dbName, tableName, newColStatsPerPartition);
+ }
+
+ public static synchronized void addTableColStatsToCache(String dbName, String tableName,
+ List<ColumnStatisticsObj> colStatsForTable) {
+ for (ColumnStatisticsObj colStatObj : colStatsForTable) {
+ String key = CacheUtils.buildKey(dbName, tableName, colStatObj.getColName());
+ tableColStatsCache.put(key, colStatObj);
+ }
+ }
+
+ public static synchronized void refreshTableColStats(String dbName, String tableName,
+ List<ColumnStatisticsObj> colStatsForTable) {
+ LOG.debug("CachedStore: updating cached table column stats objects for database: " + dbName
+ + " and table: " + tableName);
+ // Remove all old cache entries for this table
+ removeTableColStatsFromCache(dbName, tableName);
+ // Add new entries to cache
+ addTableColStatsToCache(dbName, tableName, colStatsForTable);
+ }
public static void increSd(StorageDescriptor sd, byte[] sdHash) {
ByteArrayWrapper byteArray = new ByteArrayWrapper(sdHash);
@@ -295,6 +499,7 @@ public class SharedCache {
// Replace databases in databaseCache with the new list
public static synchronized void refreshDatabases(List<Database> databases) {
+ LOG.debug("CachedStore: updating cached database objects");
for (String dbName : listCachedDatabases()) {
removeDatabaseFromCache(dbName);
}
@@ -305,6 +510,7 @@ public class SharedCache {
// Replace tables in tableCache with the new list
public static synchronized void refreshTables(String dbName, List<Table> tables) {
+ LOG.debug("CachedStore: updating cached table objects for database: " + dbName);
for (Table tbl : listCachedTables(dbName)) {
removeTableFromCache(dbName, tbl.getTableName());
}
@@ -313,17 +519,18 @@ public class SharedCache {
}
}
- public static void refreshPartitions(String dbName, String tblName, List<Partition> partitions) {
- List<String> keysToRemove = new ArrayList<String>();
- for (Map.Entry<String, PartitionWrapper> entry : partitionCache.entrySet()) {
- if (entry.getValue().getPartition().getDbName().equals(dbName)
- && entry.getValue().getPartition().getTableName().equals(tblName)) {
- keysToRemove.add(entry.getKey());
+ public static synchronized void refreshPartitions(String dbName, String tblName,
+ List<Partition> partitions) {
+ LOG.debug("CachedStore: updating cached partition objects for database: " + dbName
+ + " and table: " + tblName);
+ Iterator<Entry<String, PartitionWrapper>> iterator = partitionCache.entrySet().iterator();
+ while (iterator.hasNext()) {
+ PartitionWrapper partitionWrapper = iterator.next().getValue();
+ if (partitionWrapper.getPartition().getDbName().equals(dbName)
+ && partitionWrapper.getPartition().getTableName().equals(tblName)) {
+ iterator.remove();
}
}
- for (String key : keysToRemove) {
- partitionCache.remove(key);
- }
for (Partition part : partitions) {
addPartitionToCache(dbName, tblName, part);
}
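End of the SharedCache changes. Note that what used to be one alterTableInCache method is now split into per-cache variants; the CachedStore side (not part of this excerpt) presumably calls them together so that all four cached views move to the new name at once. A hedged sketch of such a call sequence, using only the methods added above:

    // Illustrative only: keep every SharedCache view consistent on a table rename.
    static void onAlterTable(String dbName, String tblName, Table newTable) {
      SharedCache.alterTableInCache(dbName, tblName, newTable);
      // Re-points cached partitions at the new db/table name
      SharedCache.alterTableInPartitionCache(dbName, tblName, newTable);
      // Re-keys cached table-level column stats
      SharedCache.alterTableInTableColStatsCache(dbName, tblName, newTable);
      // Re-keys cached partition-level column stats
      SharedCache.alterTableInPartitionColStatsCache(dbName, tblName, newTable);
    }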
http://git-wip-us.apache.org/repos/asf/hive/blob/d85beaa9/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseStore.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseStore.java
index 0c7d8bb..a7681dd 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseStore.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseStore.java
@@ -2854,7 +2854,7 @@ public class HBaseStore implements RawStore {
}
@Override
- public Map<String, ColumnStatisticsObj> getAggrColStatsForTablePartitions(String dbName,
+ public Map<String, List<ColumnStatisticsObj>> getColStatsForTablePartitions(String dbName,
String tableName) throws MetaException, NoSuchObjectException {
// TODO: see if it makes sense to implement this here
return null;
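The RawStore method is renamed and its return type widens from one ColumnStatisticsObj per partition to the full per-column list, the shape that SharedCache.addPartitionColStatsToCache consumes above. A short fragment showing how the new map is iterated, assuming standard Thrift partition names such as "part1=aaa"; the store handle and names are illustrative:

    // rawStore is any RawStore implementation; throws MetaException, NoSuchObjectException
    Map<String, List<ColumnStatisticsObj>> colStatsPerPartition =
        rawStore.getColStatsForTablePartitions("db1", "tbl1");
    for (Map.Entry<String, List<ColumnStatisticsObj>> e : colStatsPerPartition.entrySet()) {
      // The key is a partition name like "part1=aaa";
      // Warehouse.getPartValuesFromPartName(e.getKey()) recovers ["aaa"]
      for (ColumnStatisticsObj stat : e.getValue()) {
        System.out.println(e.getKey() + " -> " + stat.getColName());
      }
    }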
http://git-wip-us.apache.org/repos/asf/hive/blob/d85beaa9/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/merge/ColumnStatsMergerFactory.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/merge/ColumnStatsMergerFactory.java b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/merge/ColumnStatsMergerFactory.java
index da6cd46..fe890e4 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/merge/ColumnStatsMergerFactory.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/merge/ColumnStatsMergerFactory.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.hive.metastore.api.BooleanColumnStatsData;
import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData;
import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData._Fields;
import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
+import org.apache.hadoop.hive.metastore.api.DateColumnStatsData;
import org.apache.hadoop.hive.metastore.api.DecimalColumnStatsData;
import org.apache.hadoop.hive.metastore.api.DoubleColumnStatsData;
import org.apache.hadoop.hive.metastore.api.LongColumnStatsData;
@@ -35,7 +36,7 @@ public class ColumnStatsMergerFactory {
private ColumnStatsMergerFactory() {
}
-
+
// we depend on the toString() method for javolution.util.FastCollection.
private static int countNumBitVectors(String s) {
if (s != null) {
@@ -88,8 +89,15 @@ public class ColumnStatsMergerFactory {
numBitVectors = nbvNew == nbvOld ? nbvNew : 0;
break;
}
+ case DATE_STATS: {
+ agg = new DateColumnStatsMerger();
+ int nbvNew = countNumBitVectors(statsObjNew.getStatsData().getDateStats().getBitVectors());
+ int nbvOld = countNumBitVectors(statsObjOld.getStatsData().getDateStats().getBitVectors());
+ numBitVectors = nbvNew == nbvOld ? nbvNew : 0;
+ break;
+ }
default:
- throw new RuntimeException("Woh, bad. Unknown stats type " + typeNew.toString());
+ throw new IllegalArgumentException("Unknown stats type " + typeNew.toString());
}
if (numBitVectors > 0) {
agg.ndvEstimator = new NumDistinctValueEstimator(numBitVectors);
@@ -127,8 +135,12 @@ public class ColumnStatsMergerFactory {
csd.setDecimalStats(new DecimalColumnStatsData());
break;
+ case DATE_STATS:
+ csd.setDateStats(new DateColumnStatsData());
+ break;
+
default:
- throw new RuntimeException("Woh, bad. Unknown stats type!");
+ throw new IllegalArgumentException("Unknown stats type");
}
cso.setStatsData(csd);
http://git-wip-us.apache.org/repos/asf/hive/blob/d85beaa9/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/merge/DateColumnStatsMerger.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/merge/DateColumnStatsMerger.java b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/merge/DateColumnStatsMerger.java
new file mode 100644
index 0000000..3179b23
--- /dev/null
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/merge/DateColumnStatsMerger.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.hive.metastore.hbase.stats.merge;
+
+import org.apache.hadoop.hive.metastore.NumDistinctValueEstimator;
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
+import org.apache.hadoop.hive.metastore.api.Date;
+import org.apache.hadoop.hive.metastore.api.DateColumnStatsData;
+
+public class DateColumnStatsMerger extends ColumnStatsMerger {
+ @Override
+ public void merge(ColumnStatisticsObj aggregateColStats, ColumnStatisticsObj newColStats) {
+ DateColumnStatsData aggregateData = aggregateColStats.getStatsData().getDateStats();
+ DateColumnStatsData newData = newColStats.getStatsData().getDateStats();
+ Date lowValue =
+ aggregateData.getLowValue().compareTo(newData.getLowValue()) < 0 ? aggregateData
+ .getLowValue() : newData.getLowValue();
+ aggregateData.setLowValue(lowValue);
+ Date highValue =
+ aggregateData.getHighValue().compareTo(newData.getHighValue()) >= 0 ? aggregateData
+ .getHighValue() : newData.getHighValue();
+ aggregateData.setHighValue(highValue);
+ aggregateData.setNumNulls(aggregateData.getNumNulls() + newData.getNumNulls());
+ if (ndvEstimator == null || !newData.isSetBitVectors() || newData.getBitVectors().length() == 0) {
+ aggregateData.setNumDVs(Math.max(aggregateData.getNumDVs(), newData.getNumDVs()));
+ } else {
+ ndvEstimator.mergeEstimators(new NumDistinctValueEstimator(aggregateData.getBitVectors(),
+ ndvEstimator.getnumBitVectors()));
+ ndvEstimator.mergeEstimators(new NumDistinctValueEstimator(newData.getBitVectors(),
+ ndvEstimator.getnumBitVectors()));
+ long ndv = ndvEstimator.estimateNumDistinctValues();
+ LOG.debug("Use bitvector to merge column " + aggregateColStats.getColName() + "'s ndvs of "
+ + aggregateData.getNumDVs() + " and " + newData.getNumDVs() + " to be " + ndv);
+ aggregateData.setNumDVs(ndv);
+ aggregateData.setBitVectors(ndvEstimator.serialize().toString());
+ }
+ }
+}
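A small usage sketch for the new merger, reusing the metastore Thrift types from the imports above (Date wraps days since epoch; DateColumnStatsData takes numNulls and numDVs). With no bit vectors set, the merge widens the low/high range, sums null counts, and takes the larger distinct-value count, as the code above shows:

    ColumnStatisticsData aggData = new ColumnStatisticsData();
    DateColumnStatsData aggDates = new DateColumnStatsData(2 /* numNulls */, 10 /* numDVs */);
    aggDates.setLowValue(new Date(17000));
    aggDates.setHighValue(new Date(17100));
    aggData.setDateStats(aggDates);
    ColumnStatisticsObj agg = new ColumnStatisticsObj("d", "date", aggData);

    ColumnStatisticsData freshData = new ColumnStatisticsData();
    DateColumnStatsData freshDates = new DateColumnStatsData(1 /* numNulls */, 15 /* numDVs */);
    freshDates.setLowValue(new Date(16900));
    freshDates.setHighValue(new Date(17050));
    freshData.setDateStats(freshDates);
    ColumnStatisticsObj fresh = new ColumnStatisticsObj("d", "date", freshData);

    new DateColumnStatsMerger().merge(agg, fresh);
    // agg now holds: low = 16900, high = 17100, numNulls = 3, numDVs = max(10, 15) = 15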
http://git-wip-us.apache.org/repos/asf/hive/blob/d85beaa9/metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreControlledCommit.java
----------------------------------------------------------------------
diff --git a/metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreControlledCommit.java b/metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreControlledCommit.java
index f613c30..f53944f 100644
--- a/metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreControlledCommit.java
+++ b/metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreControlledCommit.java
@@ -872,7 +872,7 @@ public class DummyRawStoreControlledCommit implements RawStore, Configurable {
}
@Override
- public Map<String, ColumnStatisticsObj> getAggrColStatsForTablePartitions(String dbName,
+ public Map<String, List<ColumnStatisticsObj>> getColStatsForTablePartitions(String dbName,
String tableName) throws MetaException, NoSuchObjectException {
// TODO Auto-generated method stub
return null;
http://git-wip-us.apache.org/repos/asf/hive/blob/d85beaa9/metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreForJdoConnection.java
----------------------------------------------------------------------
diff --git a/metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreForJdoConnection.java b/metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreForJdoConnection.java
index 1720e37..e0f5cdb 100644
--- a/metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreForJdoConnection.java
+++ b/metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreForJdoConnection.java
@@ -888,7 +888,7 @@ public class DummyRawStoreForJdoConnection implements RawStore {
}
@Override
- public Map<String, ColumnStatisticsObj> getAggrColStatsForTablePartitions(String dbName,
+ public Map<String, List<ColumnStatisticsObj>> getColStatsForTablePartitions(String dbName,
String tableName) throws MetaException, NoSuchObjectException {
// TODO Auto-generated method stub
return null;
http://git-wip-us.apache.org/repos/asf/hive/blob/d85beaa9/metastore/src/test/org/apache/hadoop/hive/metastore/cache/TestCachedStore.java
----------------------------------------------------------------------
diff --git a/metastore/src/test/org/apache/hadoop/hive/metastore/cache/TestCachedStore.java b/metastore/src/test/org/apache/hadoop/hive/metastore/cache/TestCachedStore.java
index 0ab20d6..7a3ec09 100644
--- a/metastore/src/test/org/apache/hadoop/hive/metastore/cache/TestCachedStore.java
+++ b/metastore/src/test/org/apache/hadoop/hive/metastore/cache/TestCachedStore.java
@@ -25,11 +25,22 @@ import java.util.Map;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.ObjectStore;
+import org.apache.hadoop.hive.metastore.TableType;
import org.apache.hadoop.hive.metastore.TestObjectStore.MockPartitionExpressionProxy;
+import org.apache.hadoop.hive.metastore.api.BooleanColumnStatsData;
+import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData;
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsDesc;
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.LongColumnStatsData;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.PrincipalType;
+import org.apache.hadoop.hive.metastore.api.SerDeInfo;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.StringColumnStatsData;
import org.apache.hadoop.hive.metastore.api.Table;
import org.junit.Assert;
import org.junit.Before;
@@ -37,18 +48,24 @@ import org.junit.Test;
public class TestCachedStore {
- private CachedStore cachedStore = new CachedStore();
+ private ObjectStore objectStore;
+ private CachedStore cachedStore;
@Before
public void setUp() throws Exception {
HiveConf conf = new HiveConf();
- conf.setVar(HiveConf.ConfVars.METASTORE_EXPRESSION_PROXY_CLASS, MockPartitionExpressionProxy.class.getName());
-
- ObjectStore objectStore = new ObjectStore();
+ conf.setBoolean(HiveConf.ConfVars.HIVE_IN_TEST.varname, true);
+ conf.setVar(HiveConf.ConfVars.METASTORE_EXPRESSION_PROXY_CLASS,
+ MockPartitionExpressionProxy.class.getName());
+ objectStore = new ObjectStore();
objectStore.setConf(conf);
+ cachedStore = new CachedStore();
+ cachedStore.setConf(conf);
+ // Stop the CachedStore cache update service. We'll start it explicitly to control the test
+ cachedStore.stopCacheUpdateService(1);
- cachedStore.setRawStore(objectStore);
-
SharedCache.getDatabaseCache().clear();
SharedCache.getTableCache().clear();
SharedCache.getPartitionCache().clear();
@@ -56,6 +73,426 @@ public class TestCachedStore {
SharedCache.getPartitionColStatsCache().clear();
}
+ /**********************************************************************************************
+ * Methods that test CachedStore
+ *********************************************************************************************/
+
+ @Test
+ public void testDatabaseOps() throws Exception {
+ // Add a db via ObjectStore
+ String dbName = "testDatabaseOps";
+ String dbDescription = "testDatabaseOps";
+ String dbLocation = "file:/tmp";
+ Map<String, String> dbParams = new HashMap<String, String>();
+ String dbOwner = "user1";
+ Database db = new Database(dbName, dbDescription, dbLocation, dbParams);
+ db.setOwnerName(dbOwner);
+ db.setOwnerType(PrincipalType.USER);
+ objectStore.createDatabase(db);
+ db = objectStore.getDatabase(dbName);
+ // Prewarm CachedStore
+ cachedStore.prewarm();
+
+ // Read database via CachedStore
+ Database dbNew = cachedStore.getDatabase(dbName);
+ Assert.assertEquals(db, dbNew);
+
+ // Add another db via CachedStore
+ final String dbName1 = "testDatabaseOps1";
+ final String dbDescription1 = "testDatabaseOps1";
+ Database db1 = new Database(dbName1, dbDescription1, dbLocation, dbParams);
+ db1.setOwnerName(dbOwner);
+ db1.setOwnerType(PrincipalType.USER);
+ cachedStore.createDatabase(db1);
+ db1 = cachedStore.getDatabase(dbName1);
+
+ // Read db via ObjectStore
+ dbNew = objectStore.getDatabase(dbName1);
+ Assert.assertEquals(db1, dbNew);
+
+ // Alter the db via CachedStore (can only alter owner or parameters)
+ db = new Database(dbName, dbDescription, dbLocation, dbParams);
+ dbOwner = "user2";
+ db.setOwnerName(dbOwner);
+ db.setOwnerType(PrincipalType.USER);
+ cachedStore.alterDatabase(dbName, db);
+ db = cachedStore.getDatabase(dbName);
+
+ // Read db via ObjectStore
+ dbNew = objectStore.getDatabase(dbName);
+ Assert.assertEquals(db, dbNew);
+
+ // Add another db via ObjectStore
+ final String dbName2 = "testDatabaseOps2";
+ final String dbDescription2 = "testDatabaseOps2";
+ Database db2 = new Database(dbName2, dbDescription2, dbLocation, dbParams);
+ db2.setOwnerName(dbOwner);
+ db2.setOwnerType(PrincipalType.USER);
+ objectStore.createDatabase(db2);
+ db2 = objectStore.getDatabase(dbName2);
+
+ // Alter db "testDatabaseOps" via ObjectStore
+ dbOwner = "user1";
+ db = new Database(dbName, dbDescription, dbLocation, dbParams);
+ db.setOwnerName(dbOwner);
+ db.setOwnerType(PrincipalType.USER);
+ objectStore.alterDatabase(dbName, db);
+ db = objectStore.getDatabase(dbName);
+
+ // Drop db "testDatabaseOps1" via ObjectStore
+ objectStore.dropDatabase(dbName1);
+
+ // We update twice to accurately detect if cache is dirty or not
+ updateCache(cachedStore, 100, 500, 100);
+ updateCache(cachedStore, 100, 500, 100);
+
+ // Read the newly added db via CachedStore
+ dbNew = cachedStore.getDatabase(dbName2);
+ Assert.assertEquals(db2, dbNew);
+
+ // Read the altered db via CachedStore (altered user from "user2" to "user1")
+ dbNew = cachedStore.getDatabase(dbName);
+ Assert.assertEquals(db, dbNew);
+
+ // Try to read the dropped db after cache update
+ try {
+ dbNew = cachedStore.getDatabase(dbName1);
+ Assert.fail("The database: " + dbName1
+ + " should have been removed from the cache after running the update service");
+ } catch (NoSuchObjectException e) {
+ // Expected
+ }
+
+ // Clean up
+ objectStore.dropDatabase(dbName);
+ objectStore.dropDatabase(dbName2);
+ }
+
+ @Test
+ public void testTableOps() throws Exception {
+ // Add a db via ObjectStore
+ String dbName = "testTableOps";
+ String dbDescription = "testTableOps";
+ String dbLocation = "file:/tmp";
+ Map<String, String> dbParams = new HashMap<String, String>();
+ String dbOwner = "user1";
+ Database db = new Database(dbName, dbDescription, dbLocation, dbParams);
+ db.setOwnerName(dbOwner);
+ db.setOwnerType(PrincipalType.USER);
+ objectStore.createDatabase(db);
+ db = objectStore.getDatabase(dbName);
+
+ // Add a table via ObjectStore
+ String tblName = "tbl";
+ String tblOwner = "user1";
+ String serdeLocation = "file:/tmp";
+ FieldSchema col1 = new FieldSchema("col1", "int", "integer column");
+ FieldSchema col2 = new FieldSchema("col2", "string", "string column");
+ List<FieldSchema> cols = new ArrayList<FieldSchema>();
+ cols.add(col1);
+ cols.add(col2);
+ Map<String, String> serdeParams = new HashMap<String, String>();
+ Map<String, String> tblParams = new HashMap<String, String>();
+ SerDeInfo serdeInfo = new SerDeInfo("serde", "seriallib", new HashMap<String, String>());
+ StorageDescriptor sd =
+ new StorageDescriptor(cols, serdeLocation, "input", "output", false, 0, serdeInfo, null,
+ null, serdeParams);
+ sd.setStoredAsSubDirectories(false);
+ Table tbl =
+ new Table(tblName, dbName, tblOwner, 0, 0, 0, sd, new ArrayList<FieldSchema>(), tblParams,
+ null, null, TableType.MANAGED_TABLE.toString());
+ objectStore.createTable(tbl);
+ tbl = objectStore.getTable(dbName, tblName);
+
+ // Prewarm CachedStore
+ cachedStore.prewarm();
+
+ // Read database, table via CachedStore
+ Database dbNew = cachedStore.getDatabase(dbName);
+ Assert.assertEquals(db, dbNew);
+ Table tblNew = cachedStore.getTable(dbName, tblName);
+ Assert.assertEquals(tbl, tblNew);
+
+ // Add a new table via CachedStore
+ String tblName1 = "tbl1";
+ Table tbl1 =
+ new Table(tblName1, dbName, tblOwner, 0, 0, 0, sd, new ArrayList<FieldSchema>(), tblParams,
+ null, null, TableType.MANAGED_TABLE.toString());
+ cachedStore.createTable(tbl1);
+ tbl1 = cachedStore.getTable(dbName, tblName1);
+
+ // Read via object store
+ tblNew = objectStore.getTable(dbName, tblName1);
+ Assert.assertEquals(tbl1, tblNew);
+
+ // Add a new table via ObjectStore
+ String tblName2 = "tbl2";
+ Table tbl2 =
+ new Table(tblName2, dbName, tblOwner, 0, 0, 0, sd, new ArrayList<FieldSchema>(), tblParams,
+ null, null, TableType.MANAGED_TABLE.toString());
+ objectStore.createTable(tbl2);
+ tbl2 = objectStore.getTable(dbName, tblName2);
+
+ // Alter table "tbl" via ObjectStore
+ tblOwner = "user2";
+ tbl =
+ new Table(tblName, dbName, tblOwner, 0, 0, 0, sd, new ArrayList<FieldSchema>(), tblParams,
+ null, null, TableType.MANAGED_TABLE.toString());
+ objectStore.alterTable(dbName, tblName, tbl);
+ tbl = objectStore.getTable(dbName, tblName);
+
+ // Drop table "tbl1" via ObjectStore
+ objectStore.dropTable(dbName, tblName1);
+
+ // We update twice to accurately detect if cache is dirty or not
+ updateCache(cachedStore, 100, 500, 100);
+ updateCache(cachedStore, 100, 500, 100);
+
+ // Read "tbl2" via CachedStore
+ tblNew = cachedStore.getTable(dbName, tblName2);
+ Assert.assertEquals(tbl2, tblNew);
+
+ // Read the altered "tbl" via CachedStore
+ tblNew = cachedStore.getTable(dbName, tblName);
+ Assert.assertEquals(tbl, tblNew);
+
+ // Try to read the dropped "tbl1" via CachedStore (getTable should now return null)
+ tblNew = cachedStore.getTable(dbName, tblName1);
+ Assert.assertNull(tblNew);
+
+ // Should return "tbl" and "tbl2"
+ List<String> tblNames = cachedStore.getTables(dbName, "*");
+ Assert.assertTrue(tblNames.contains(tblName));
+ Assert.assertTrue(!tblNames.contains(tblName1));
+ Assert.assertTrue(tblNames.contains(tblName2));
+
+ // Clean up
+ objectStore.dropTable(dbName, tblName);
+ objectStore.dropTable(dbName, tblName2);
+ objectStore.dropDatabase(dbName);
+ }
+
+ @Test
+ public void testPartitionOps() throws Exception {
+ // Add a db via ObjectStore
+ String dbName = "testPartitionOps";
+ String dbDescription = "testPartitionOps";
+ String dbLocation = "file:/tmp";
+ Map<String, String> dbParams = new HashMap<String, String>();
+ String dbOwner = "user1";
+ Database db = new Database(dbName, dbDescription, dbLocation, dbParams);
+ db.setOwnerName(dbOwner);
+ db.setOwnerType(PrincipalType.USER);
+ objectStore.createDatabase(db);
+ db = objectStore.getDatabase(dbName);
+
+ // Add a table via ObjectStore
+ String tblName = "tbl";
+ String tblOwner = "user1";
+ String serdeLocation = "file:/tmp";
+ FieldSchema col1 = new FieldSchema("col1", "int", "integer column");
+ FieldSchema col2 = new FieldSchema("col2", "string", "string column");
+ List<FieldSchema> cols = new ArrayList<FieldSchema>();
+ cols.add(col1);
+ cols.add(col2);
+ Map<String, String> serdeParams = new HashMap<String, String>();
+ Map<String, String> tblParams = new HashMap<String, String>();
+ SerDeInfo serdeInfo = new SerDeInfo("serde", "seriallib", null);
+ StorageDescriptor sd =
+ new StorageDescriptor(cols, serdeLocation, "input", "output", false, 0, serdeInfo, null,
+ null, serdeParams);
+ FieldSchema ptnCol1 = new FieldSchema("part1", "string", "string partition column");
+ List<FieldSchema> ptnCols = new ArrayList<FieldSchema>();
+ ptnCols.add(ptnCol1);
+ Table tbl =
+ new Table(tblName, dbName, tblOwner, 0, 0, 0, sd, ptnCols, tblParams, null, null,
+ TableType.MANAGED_TABLE.toString());
+ objectStore.createTable(tbl);
+ tbl = objectStore.getTable(dbName, tblName);
+ final String ptnColVal1 = "aaa";
+ Map<String, String> partParams = new HashMap<String, String>();
+ Partition ptn1 =
+ new Partition(Arrays.asList(ptnColVal1), dbName, tblName, 0, 0, sd, partParams);
+ objectStore.addPartition(ptn1);
+ ptn1 = objectStore.getPartition(dbName, tblName, Arrays.asList(ptnColVal1));
+ final String ptnColVal2 = "bbb";
+ Partition ptn2 =
+ new Partition(Arrays.asList(ptnColVal2), dbName, tblName, 0, 0, sd, partParams);
+ objectStore.addPartition(ptn2);
+ ptn2 = objectStore.getPartition(dbName, tblName, Arrays.asList(ptnColVal2));
+
+ // Prewarm CachedStore
+ cachedStore.prewarm();
+
+ // Read database, table, partition via CachedStore
+ Database dbNew = cachedStore.getDatabase(dbName);
+ Assert.assertEquals(db, dbNew);
+ Table tblNew = cachedStore.getTable(dbName, tblName);
+ Assert.assertEquals(tbl, tblNew);
+ Partition newPtn1 = cachedStore.getPartition(dbName, tblName, Arrays.asList(ptnColVal1));
+ Assert.assertEquals(ptn1, newPtn1);
+ Partition newPtn2 = cachedStore.getPartition(dbName, tblName, Arrays.asList(ptnColVal2));
+ Assert.assertEquals(ptn2, newPtn2);
+
+ // Add a new partition via ObjectStore
+ final String ptnColVal3 = "ccc";
+ Partition ptn3 =
+ new Partition(Arrays.asList(ptnColVal3), dbName, tblName, 0, 0, sd, partParams);
+ objectStore.addPartition(ptn3);
+ ptn3 = objectStore.getPartition(dbName, tblName, Arrays.asList(ptnColVal3));
+
+ // Alter an existing partition ("aaa") via ObjectStore
+ final String ptnColVal1Alt = "aaaAlt";
+ Partition ptn1Alt =
+ new Partition(Arrays.asList(ptnColVal1Alt), dbName, tblName, 0, 0, sd, partParams);
+ objectStore.alterPartition(dbName, tblName, Arrays.asList(ptnColVal1), ptn1Alt);
+ ptn1Alt = objectStore.getPartition(dbName, tblName, Arrays.asList(ptnColVal1Alt));
+
+ // Drop an existing partition ("bbb") via ObjectStore
+ objectStore.dropPartition(dbName, tblName, Arrays.asList(ptnColVal2));
+
+ // We update twice to accurately detect if cache is dirty or not
+ updateCache(cachedStore, 100, 500, 100);
+ updateCache(cachedStore, 100, 500, 100);
+
+ // Read the newly added partition via CachedStore
+ Partition newPtn = cachedStore.getPartition(dbName, tblName, Arrays.asList(ptnColVal3));
+ Assert.assertEquals(ptn3, newPtn);
+
+ // Read the altered partition via CachedStore
+ newPtn = cachedStore.getPartition(dbName, tblName, Arrays.asList(ptnColVal1Alt));
+ Assert.assertEquals(ptn1Alt, newPtn);
+
+ // Try to read the dropped partition via CachedStore
+ try {
+ newPtn = cachedStore.getPartition(dbName, tblName, Arrays.asList(ptnColVal2));
+ Assert.fail("The partition: " + ptnColVal2
+ + " should have been removed from the cache after running the update service");
+ } catch (NoSuchObjectException e) {
+ // Expected
+ }
+ }
+
+ //@Test
+ public void testTableColStatsOps() throws Exception {
+ // Add a db via ObjectStore
+ String dbName = "testTableColStatsOps";
+ String dbDescription = "testTableColStatsOps";
+ String dbLocation = "file:/tmp";
+ Map<String, String> dbParams = new HashMap<String, String>();
+ String dbOwner = "user1";
+ Database db = new Database(dbName, dbDescription, dbLocation, dbParams);
+ db.setOwnerName(dbOwner);
+ db.setOwnerType(PrincipalType.USER);
+ objectStore.createDatabase(db);
+ db = objectStore.getDatabase(dbName);
+
+ // Add a table via ObjectStore
+ final String tblName = "tbl";
+ final String tblOwner = "user1";
+ final String serdeLocation = "file:/tmp";
+ final FieldSchema col1 = new FieldSchema("col1", "int", "integer column");
+ // Stats values for col1
+ long col1LowVal = 5;
+ long col1HighVal = 500;
+ long col1Nulls = 10;
+ long col1DV = 20;
+ final FieldSchema col2 = new FieldSchema("col2", "string", "string column");
+ // Stats values for col2
+ long col2MaxColLen = 100;
+ double col2AvgColLen = 45.5;
+ long col2Nulls = 5;
+ long col2DV = 40;
+ final FieldSchema col3 = new FieldSchema("col3", "boolean", "boolean column");
+ // Stats values for col3
+ long col3NumTrues = 100;
+ long col3NumFalses = 30;
+ long col3Nulls = 10;
+ final List<FieldSchema> cols = new ArrayList<FieldSchema>();
+ cols.add(col1);
+ cols.add(col2);
+ cols.add(col3);
+ Map<String, String> serdeParams = new HashMap<String, String>();
+ Map<String, String> tblParams = new HashMap<String, String>();
+ final SerDeInfo serdeInfo = new SerDeInfo("serde", "seriallib", null);
+ StorageDescriptor sd =
+ new StorageDescriptor(cols, serdeLocation, "input", "output", false, 0, serdeInfo, null,
+ null, serdeParams);
+ Table tbl =
+ new Table(tblName, dbName, tblOwner, 0, 0, 0, sd, new ArrayList<FieldSchema>(), tblParams,
+ null, null, TableType.MANAGED_TABLE.toString());
+ objectStore.createTable(tbl);
+ tbl = objectStore.getTable(dbName, tblName);
+
+ // Add ColumnStatistics for tbl to metastore DB via ObjectStore
+ ColumnStatistics stats = new ColumnStatistics();
+ ColumnStatisticsDesc statsDesc = new ColumnStatisticsDesc(true, dbName, tblName);
+ List<ColumnStatisticsObj> colStatObjs = new ArrayList<ColumnStatisticsObj>();
+
+ // Col1
+ ColumnStatisticsData data1 = new ColumnStatisticsData();
+ ColumnStatisticsObj col1Stats = new ColumnStatisticsObj(col1.getName(), col1.getType(), data1);
+ LongColumnStatsData longStats = new LongColumnStatsData();
+ longStats.setLowValue(col1LowVal);
+ longStats.setHighValue(col1HighVal);
+ longStats.setNumNulls(col1Nulls);
+ longStats.setNumDVs(col1DV);
+ data1.setLongStats(longStats);
+ colStatObjs.add(col1Stats);
+
+ // Col2
+ ColumnStatisticsData data2 = new ColumnStatisticsData();
+ ColumnStatisticsObj col2Stats = new ColumnStatisticsObj(col2.getName(), col2.getType(), data2);
+ StringColumnStatsData stringStats = new StringColumnStatsData();
+ stringStats.setMaxColLen(col2MaxColLen);
+ stringStats.setAvgColLen(col2AvgColLen);
+ stringStats.setNumNulls(col2Nulls);
+ stringStats.setNumDVs(col2DV);
+ data2.setStringStats(stringStats);
+ colStatObjs.add(col2Stats);
+
+ // Col3
+ ColumnStatisticsData data3 = new ColumnStatisticsData();
+ ColumnStatisticsObj col3Stats = new ColumnStatisticsObj(col3.getName(), col3.getType(), data3);
+ BooleanColumnStatsData boolStats = new BooleanColumnStatsData();
+ boolStats.setNumTrues(col3NumTrues);
+ boolStats.setNumFalses(col3NumFalses);
+ boolStats.setNumNulls(col3Nulls);
+ data3.setBooleanStats(boolStats);
+ colStatObjs.add(col3Stats);
+
+ stats.setStatsDesc(statsDesc);
+ stats.setStatsObj(colStatObjs);
+
+ // Save to DB
+ objectStore.updateTableColumnStatistics(stats);
+
+ // Prewarm CachedStore
+ cachedStore.prewarm();
+
+ // Read table stats via CachedStore
+ ColumnStatistics newStats =
+ cachedStore.getTableColumnStatistics(dbName, tblName,
+ Arrays.asList(col1.getName(), col2.getName(), col3.getName()));
+ Assert.assertEquals(stats, newStats);
+ }
+
+ private void updateCache(CachedStore cachedStore, long frequency, long sleepTime,
+ long shutdownTimeout) throws InterruptedException {
+ // Set the cache refresh period to the requested frequency (in milliseconds)
+ cachedStore.setCacheRefreshPeriod(frequency);
+ // Start the CachedStore update service
+ cachedStore.startCacheUpdateService();
+ // Sleep so that the cache update can complete
+ Thread.sleep(sleepTime);
+ // Stop the cache update service, waiting up to shutdownTimeout
+ cachedStore.stopCacheUpdateService(shutdownTimeout);
+ }
+
+ /**********************************************************************************************
+ * Methods that test SharedCache
+ *********************************************************************************************/
+
@Test
public void testSharedStoreDb() {
Database db1 = new Database();
@@ -160,6 +597,7 @@ public class TestCachedStore {
Assert.assertEquals(SharedCache.getSdCache().size(), 2);
}
+
@Test
public void testSharedStorePartition() {
Partition part1 = new Partition();