You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by vg...@apache.org on 2018/04/11 22:45:47 UTC
hive git commit: HIVE-18840: CachedStore: Prioritize loading of recently accessed tables during prewarm (Vaibhav Gumashta reviewed by Daniel Dai)
Repository: hive
Updated Branches:
refs/heads/master 42187fdbc -> b3fe6522e
HIVE-18840: CachedStore: Prioritize loading of recently accessed tables during prewarm (Vaibhav Gumashta reviewed by Daniel Dai)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/b3fe6522
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/b3fe6522
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/b3fe6522
Branch: refs/heads/master
Commit: b3fe6522e651fa4f00f1a1a75e6f12c132eacf21
Parents: 42187fd
Author: Vaibhav Gumashta <vg...@hortonworks.com>
Authored: Wed Apr 11 15:39:30 2018 -0700
Committer: Vaibhav Gumashta <vg...@hortonworks.com>
Committed: Wed Apr 11 15:39:30 2018 -0700
----------------------------------------------------------------------
.../hive/metastore/cache/CachedStore.java | 198 +++++++++++--------
1 file changed, 114 insertions(+), 84 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/b3fe6522/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java
index c47856d..1ce86bb 100644
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java
@@ -18,23 +18,21 @@
package org.apache.hadoop.hive.metastore.cache;
-import java.io.Closeable;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
-import java.util.Collections;
+import java.util.EmptyStackException;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.Stack;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@@ -100,7 +98,6 @@ import org.apache.hadoop.hive.metastore.api.SQLUniqueConstraint;
import org.apache.hadoop.hive.metastore.api.SchemaVersion;
import org.apache.hadoop.hive.metastore.api.SchemaVersionDescriptor;
import org.apache.hadoop.hive.metastore.api.SerDeInfo;
-import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.api.TableMeta;
import org.apache.hadoop.hive.metastore.api.Type;
@@ -146,6 +143,7 @@ public class CachedStore implements RawStore, Configurable {
// Time after which metastore cache is updated from metastore DB by the background update thread
private static long cacheRefreshPeriodMS = DEFAULT_CACHE_REFRESH_PERIOD;
private static AtomicBoolean isCachePrewarmed = new AtomicBoolean(false);
+ private static TablesPendingPrewarm tblsPendingPrewarm = new TablesPendingPrewarm();
private RawStore rawStore = null;
private Configuration conf;
private PartitionExpressionProxy expressionProxy = null;
@@ -153,10 +151,6 @@ public class CachedStore implements RawStore, Configurable {
static final private Logger LOG = LoggerFactory.getLogger(CachedStore.class.getName());
- public CachedStore() {
-
- }
-
@Override
public void setConf(Configuration conf) {
setConfInternal(conf);
@@ -211,12 +205,13 @@ public class CachedStore implements RawStore, Configurable {
Collection<String> catalogsToCache;
try {
catalogsToCache = catalogsToCache(rawStore);
- LOG.info("Going to cache catalogs: " +
- org.apache.commons.lang.StringUtils.join(catalogsToCache, ", "));
+ LOG.info("Going to cache catalogs: "
+ + org.apache.commons.lang.StringUtils.join(catalogsToCache, ", "));
List<Catalog> catalogs = new ArrayList<>(catalogsToCache.size());
- for (String catName : catalogsToCache) catalogs.add(rawStore.getCatalog(catName));
+ for (String catName : catalogsToCache)
+ catalogs.add(rawStore.getCatalog(catName));
sharedCache.populateCatalogsInCache(catalogs);
- } catch (MetaException|NoSuchObjectException e) {
+ } catch (MetaException | NoSuchObjectException e) {
LOG.warn("Failed to populate catalogs in cache, going to try again", e);
// try again
continue;
@@ -232,8 +227,8 @@ public class CachedStore implements RawStore, Configurable {
databases.add(rawStore.getDatabase(catName, dbName));
} catch (NoSuchObjectException e) {
// Continue with next database
- LOG.warn("Failed to cache database " +
- Warehouse.getCatalogQualifiedDbName(catName, dbName) + ", moving on", e);
+ LOG.warn("Failed to cache database "
+ + Warehouse.getCatalogQualifiedDbName(catName, dbName) + ", moving on", e);
}
}
} catch (MetaException e) {
@@ -251,87 +246,92 @@ public class CachedStore implements RawStore, Configurable {
try {
tblNames = rawStore.getAllTables(catName, dbName);
} catch (MetaException e) {
- LOG.warn("Failed to cache tables for database " +
- Warehouse.getCatalogQualifiedDbName(catName, dbName) + ", moving on");
+ LOG.warn("Failed to cache tables for database "
+ + Warehouse.getCatalogQualifiedDbName(catName, dbName) + ", moving on");
// Continue with next database
continue;
}
+ tblsPendingPrewarm.addTableNamesForPrewarming(tblNames);
+ int totalTablesToCache = tblNames.size();
int numberOfTablesCachedSoFar = 0;
- for (String tblName : tblNames) {
- tblName = StringUtils.normalizeIdentifier(tblName);
- if (!shouldCacheTable(catName, dbName, tblName)) {
- continue;
-
- }
- Table table;
+ while (tblsPendingPrewarm.hasMoreTablesToPrewarm()) {
try {
- table = rawStore.getTable(catName, dbName, tblName);
- } catch (MetaException e) {
- LOG.warn("Failed cache table " +
- Warehouse.getCatalogQualifiedTableName(catName, dbName, tblName) +
- ", moving on");
- // It is possible the table is deleted during fetching tables of the database,
- // in that case, continue with the next table
- continue;
- }
- List<String> colNames = MetaStoreUtils.getColumnNamesForTable(table);
- try {
- ColumnStatistics tableColStats = null;
- List<Partition> partitions = null;
- List<ColumnStatistics> partitionColStats = null;
- AggrStats aggrStatsAllPartitions = null;
- AggrStats aggrStatsAllButDefaultPartition = null;
- if (table.isSetPartitionKeys()) {
- Deadline.startTimer("getPartitions");
- partitions = rawStore.getPartitions(catName, dbName, tblName, Integer.MAX_VALUE);
- Deadline.stopTimer();
- List<String> partNames = new ArrayList<>(partitions.size());
- for (Partition p : partitions) {
- partNames.add(Warehouse.makePartName(table.getPartitionKeys(), p.getValues()));
- }
- if (!partNames.isEmpty()) {
- // Get partition column stats for this table
- Deadline.startTimer("getPartitionColumnStatistics");
- partitionColStats =
- rawStore.getPartitionColumnStatistics(catName, dbName, tblName, partNames, colNames);
- Deadline.stopTimer();
- // Get aggregate stats for all partitions of a table and for all but default
- // partition
- Deadline.startTimer("getAggrPartitionColumnStatistics");
- aggrStatsAllPartitions =
- rawStore.get_aggr_stats_for(catName, dbName, tblName, partNames, colNames);
+ String tblName =
+ StringUtils.normalizeIdentifier(tblsPendingPrewarm.getNextTableNameToPrewarm());
+ if (!shouldCacheTable(catName, dbName, tblName)) {
+ continue;
+ }
+ Table table;
+ try {
+ table = rawStore.getTable(catName, dbName, tblName);
+ } catch (MetaException e) {
+ // It is possible the table is deleted during fetching tables of the database,
+ // in that case, continue with the next table
+ continue;
+ }
+ List<String> colNames = MetaStoreUtils.getColumnNamesForTable(table);
+ try {
+ ColumnStatistics tableColStats = null;
+ List<Partition> partitions = null;
+ List<ColumnStatistics> partitionColStats = null;
+ AggrStats aggrStatsAllPartitions = null;
+ AggrStats aggrStatsAllButDefaultPartition = null;
+ if (table.isSetPartitionKeys()) {
+ Deadline.startTimer("getPartitions");
+ partitions = rawStore.getPartitions(catName, dbName, tblName, Integer.MAX_VALUE);
Deadline.stopTimer();
- // Remove default partition from partition names and get aggregate
- // stats again
- List<FieldSchema> partKeys = table.getPartitionKeys();
- String defaultPartitionValue =
- MetastoreConf.getVar(rawStore.getConf(), ConfVars.DEFAULTPARTITIONNAME);
- List<String> partCols = new ArrayList<>();
- List<String> partVals = new ArrayList<>();
- for (FieldSchema fs : partKeys) {
- partCols.add(fs.getName());
- partVals.add(defaultPartitionValue);
+ List<String> partNames = new ArrayList<>(partitions.size());
+ for (Partition p : partitions) {
+ partNames.add(Warehouse.makePartName(table.getPartitionKeys(), p.getValues()));
+ }
+ if (!partNames.isEmpty()) {
+ // Get partition column stats for this table
+ Deadline.startTimer("getPartitionColumnStatistics");
+ partitionColStats = rawStore.getPartitionColumnStatistics(catName, dbName,
+ tblName, partNames, colNames);
+ Deadline.stopTimer();
+ // Get aggregate stats for all partitions of a table and for all but default
+ // partition
+ Deadline.startTimer("getAggrPartitionColumnStatistics");
+ aggrStatsAllPartitions =
+ rawStore.get_aggr_stats_for(catName, dbName, tblName, partNames, colNames);
+ Deadline.stopTimer();
+ // Remove default partition from partition names and get aggregate
+ // stats again
+ List<FieldSchema> partKeys = table.getPartitionKeys();
+ String defaultPartitionValue =
+ MetastoreConf.getVar(rawStore.getConf(), ConfVars.DEFAULTPARTITIONNAME);
+ List<String> partCols = new ArrayList<>();
+ List<String> partVals = new ArrayList<>();
+ for (FieldSchema fs : partKeys) {
+ partCols.add(fs.getName());
+ partVals.add(defaultPartitionValue);
+ }
+ String defaultPartitionName = FileUtils.makePartName(partCols, partVals);
+ partNames.remove(defaultPartitionName);
+ Deadline.startTimer("getAggrPartitionColumnStatistics");
+ aggrStatsAllButDefaultPartition =
+ rawStore.get_aggr_stats_for(catName, dbName, tblName, partNames, colNames);
+ Deadline.stopTimer();
}
- String defaultPartitionName = FileUtils.makePartName(partCols, partVals);
- partNames.remove(defaultPartitionName);
- Deadline.startTimer("getAggrPartitionColumnStatistics");
- aggrStatsAllButDefaultPartition =
- rawStore.get_aggr_stats_for(catName, dbName, tblName, partNames, colNames);
+ } else {
+ Deadline.startTimer("getTableColumnStatistics");
+ tableColStats =
+ rawStore.getTableColumnStatistics(catName, dbName, tblName, colNames);
Deadline.stopTimer();
}
- } else {
- Deadline.startTimer("getTableColumnStatistics");
- tableColStats = rawStore.getTableColumnStatistics(catName, dbName, tblName, colNames);
- Deadline.stopTimer();
+ sharedCache.populateTableInCache(table, tableColStats, partitions, partitionColStats,
+ aggrStatsAllPartitions, aggrStatsAllButDefaultPartition);
+ } catch (MetaException | NoSuchObjectException e) {
+ // Continue with next table
+ continue;
}
- sharedCache.populateTableInCache(table, tableColStats, partitions, partitionColStats,
- aggrStatsAllPartitions, aggrStatsAllButDefaultPartition);
- } catch (MetaException | NoSuchObjectException e) {
- // Continue with next table
+ LOG.debug("Processed database: {}'s table: {}. Cached {} / {} tables so far.", dbName,
+ tblName, ++numberOfTablesCachedSoFar, totalTablesToCache);
+ } catch (EmptyStackException e) {
+ // We've prewarmed this database, continue with the next one
continue;
}
- LOG.debug("Processed database: {}'s table: {}. Cached {} / {} tables so far.", dbName,
- tblName, ++numberOfTablesCachedSoFar, tblNames.size());
}
LOG.debug("Processed database: {}. Cached {} / {} databases so far.", dbName,
++numberOfDatabasesCachedSoFar, databases.size());
@@ -344,6 +344,32 @@ public class CachedStore implements RawStore, Configurable {
sharedCache.completeTableCachePrewarm();
}
+ /**
+ * Names of the tables in the database currently being prewarmed that have not been
+ * processed yet. The prewarm loop refills it once per database and drains it one name at
+ * a time. The table-lookup path calls prioritizeTableForPrewarm() when a table is not yet
+ * in the cache. Every method is synchronized on this instance.
+ */
+ static class TablesPendingPrewarm {
+ // Used as a LIFO: the element on top of the stack is the next table to prewarm.
+ private Stack<String> tableNames = new Stack<>();
+
+ /**
+ * Discards any names still pending and replaces them with tblNames (null leaves the stack
+ * empty). Names are appended in list order, so they are popped in reverse list order.
+ */
+ private synchronized void addTableNamesForPrewarming(List<String> tblNames) {
+ tableNames.clear();
+ if (tblNames != null) {
+ tableNames.addAll(tblNames);
+ }
+ }
+
+ /** Returns true while at least one table name is still pending. */
+ private synchronized boolean hasMoreTablesToPrewarm() {
+ return !tableNames.empty();
+ }
+
+ /**
+ * Removes and returns the table name on top of the stack, exactly as it was supplied
+ * (not normalized). Throws EmptyStackException when nothing is pending; the prewarm loop
+ * catches that exception and moves on to the next database.
+ */
+ private synchronized String getNextTableNameToPrewarm() {
+ return tableNames.pop();
+ }
+
+ /**
+ * Moves tblName to the top of the stack so it is popped next; does nothing if the name is
+ * not pending. Matching is exact string equality on the bare table name.
+ * NOTE(review): catalog and database are not compared, so a same-named table in another
+ * database also matches while this database is being prewarmed; consider keying entries
+ * by catalog/db/table.
+ * NOTE(review): pending names are normalized only after they are popped, so this matches
+ * only if the caller passes the name in the same form getAllTables returned it; confirm.
+ */
+ private synchronized void prioritizeTableForPrewarm(String tblName) {
+ // If the table is in the pending prewarm list, move it to the top
+ if (tableNames.remove(tblName)) {
+ tableNames.push(tblName);
+ }
+ }
+ }
+
@VisibleForTesting
static void setCachePrewarmedState(boolean state) {
isCachePrewarmed.set(state);
@@ -830,6 +856,10 @@ public class CachedStore implements RawStore, Configurable {
Table tbl = sharedCache.getTableFromCache(catName, dbName, tblName);
if (tbl == null) {
// This table is not yet loaded in cache
+ // If the prewarm thread is working on this table's database,
+ // let's move this table to the top of tblNamesBeingPrewarmed stack,
+ // so that it gets loaded to the cache faster and is available for subsequent requests
+ tblsPendingPrewarm.prioritizeTableForPrewarm(tblName);
return rawStore.getTable(catName, dbName, tblName);
}
if (tbl != null) {