You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by vg...@apache.org on 2018/04/11 22:45:47 UTC

hive git commit: HIVE-18840: CachedStore: Prioritize loading of recently accessed tables during prewarm (Vaibhav Gumashta reviewed by Daniel Dai)

Repository: hive
Updated Branches:
  refs/heads/master 42187fdbc -> b3fe6522e


HIVE-18840: CachedStore: Prioritize loading of recently accessed tables during prewarm (Vaibhav Gumashta reviewed by Daniel Dai)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/b3fe6522
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/b3fe6522
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/b3fe6522

Branch: refs/heads/master
Commit: b3fe6522e651fa4f00f1a1a75e6f12c132eacf21
Parents: 42187fd
Author: Vaibhav Gumashta <vg...@hortonworks.com>
Authored: Wed Apr 11 15:39:30 2018 -0700
Committer: Vaibhav Gumashta <vg...@hortonworks.com>
Committed: Wed Apr 11 15:39:30 2018 -0700

----------------------------------------------------------------------
 .../hive/metastore/cache/CachedStore.java       | 198 +++++++++++--------
 1 file changed, 114 insertions(+), 84 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/b3fe6522/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java
index c47856d..1ce86bb 100644
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java
@@ -18,23 +18,21 @@
 package org.apache.hadoop.hive.metastore.cache;
 
 
-import java.io.Closeable;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
-import java.util.Collections;
+import java.util.EmptyStackException;
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Stack;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
@@ -100,7 +98,6 @@ import org.apache.hadoop.hive.metastore.api.SQLUniqueConstraint;
 import org.apache.hadoop.hive.metastore.api.SchemaVersion;
 import org.apache.hadoop.hive.metastore.api.SchemaVersionDescriptor;
 import org.apache.hadoop.hive.metastore.api.SerDeInfo;
-import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.metastore.api.TableMeta;
 import org.apache.hadoop.hive.metastore.api.Type;
@@ -146,6 +143,7 @@ public class CachedStore implements RawStore, Configurable {
   // Time after which metastore cache is updated from metastore DB by the background update thread
   private static long cacheRefreshPeriodMS = DEFAULT_CACHE_REFRESH_PERIOD;
   private static AtomicBoolean isCachePrewarmed = new AtomicBoolean(false);
+  private static TablesPendingPrewarm tblsPendingPrewarm = new TablesPendingPrewarm();
   private RawStore rawStore = null;
   private Configuration conf;
   private PartitionExpressionProxy expressionProxy = null;
@@ -153,10 +151,6 @@ public class CachedStore implements RawStore, Configurable {
 
   static final private Logger LOG = LoggerFactory.getLogger(CachedStore.class.getName());
 
-  public CachedStore() {
-
-  }
-
   @Override
   public void setConf(Configuration conf) {
     setConfInternal(conf);
@@ -211,12 +205,13 @@ public class CachedStore implements RawStore, Configurable {
       Collection<String> catalogsToCache;
       try {
         catalogsToCache = catalogsToCache(rawStore);
-        LOG.info("Going to cache catalogs: " +
-            org.apache.commons.lang.StringUtils.join(catalogsToCache, ", "));
+        LOG.info("Going to cache catalogs: "
+            + org.apache.commons.lang.StringUtils.join(catalogsToCache, ", "));
         List<Catalog> catalogs = new ArrayList<>(catalogsToCache.size());
-        for (String catName : catalogsToCache) catalogs.add(rawStore.getCatalog(catName));
+        for (String catName : catalogsToCache)
+          catalogs.add(rawStore.getCatalog(catName));
         sharedCache.populateCatalogsInCache(catalogs);
-      } catch (MetaException|NoSuchObjectException e) {
+      } catch (MetaException | NoSuchObjectException e) {
         LOG.warn("Failed to populate catalogs in cache, going to try again", e);
         // try again
         continue;
@@ -232,8 +227,8 @@ public class CachedStore implements RawStore, Configurable {
               databases.add(rawStore.getDatabase(catName, dbName));
             } catch (NoSuchObjectException e) {
               // Continue with next database
-              LOG.warn("Failed to cache database " +
-                  Warehouse.getCatalogQualifiedDbName(catName, dbName) + ", moving on", e);
+              LOG.warn("Failed to cache database "
+                  + Warehouse.getCatalogQualifiedDbName(catName, dbName) + ", moving on", e);
             }
           }
         } catch (MetaException e) {
@@ -251,87 +246,92 @@ public class CachedStore implements RawStore, Configurable {
         try {
           tblNames = rawStore.getAllTables(catName, dbName);
         } catch (MetaException e) {
-          LOG.warn("Failed to cache tables for database " +
-              Warehouse.getCatalogQualifiedDbName(catName, dbName) + ", moving on");
+          LOG.warn("Failed to cache tables for database "
+              + Warehouse.getCatalogQualifiedDbName(catName, dbName) + ", moving on");
           // Continue with next database
           continue;
         }
+        tblsPendingPrewarm.addTableNamesForPrewarming(tblNames);
+        int totalTablesToCache = tblNames.size();
         int numberOfTablesCachedSoFar = 0;
-        for (String tblName : tblNames) {
-          tblName = StringUtils.normalizeIdentifier(tblName);
-          if (!shouldCacheTable(catName, dbName, tblName)) {
-            continue;
-
-          }
-          Table table;
+        while (tblsPendingPrewarm.hasMoreTablesToPrewarm()) {
           try {
-            table = rawStore.getTable(catName, dbName, tblName);
-          } catch (MetaException e) {
-            LOG.warn("Failed cache table " +
-                Warehouse.getCatalogQualifiedTableName(catName, dbName, tblName) +
-                ", moving on");
-            // It is possible the table is deleted during fetching tables of the database,
-            // in that case, continue with the next table
-            continue;
-          }
-          List<String> colNames = MetaStoreUtils.getColumnNamesForTable(table);
-          try {
-            ColumnStatistics tableColStats = null;
-            List<Partition> partitions = null;
-            List<ColumnStatistics> partitionColStats = null;
-            AggrStats aggrStatsAllPartitions = null;
-            AggrStats aggrStatsAllButDefaultPartition = null;
-            if (table.isSetPartitionKeys()) {
-              Deadline.startTimer("getPartitions");
-              partitions = rawStore.getPartitions(catName, dbName, tblName, Integer.MAX_VALUE);
-              Deadline.stopTimer();
-              List<String> partNames = new ArrayList<>(partitions.size());
-              for (Partition p : partitions) {
-                partNames.add(Warehouse.makePartName(table.getPartitionKeys(), p.getValues()));
-              }
-              if (!partNames.isEmpty()) {
-                // Get partition column stats for this table
-                Deadline.startTimer("getPartitionColumnStatistics");
-                partitionColStats =
-                    rawStore.getPartitionColumnStatistics(catName, dbName, tblName, partNames, colNames);
-                Deadline.stopTimer();
-                // Get aggregate stats for all partitions of a table and for all but default
-                // partition
-                Deadline.startTimer("getAggrPartitionColumnStatistics");
-                aggrStatsAllPartitions =
-                    rawStore.get_aggr_stats_for(catName, dbName, tblName, partNames, colNames);
+            String tblName =
+                StringUtils.normalizeIdentifier(tblsPendingPrewarm.getNextTableNameToPrewarm());
+            if (!shouldCacheTable(catName, dbName, tblName)) {
+              continue;
+            }
+            Table table;
+            try {
+              table = rawStore.getTable(catName, dbName, tblName);
+            } catch (MetaException e) {
+              // It is possible the table is deleted during fetching tables of the database,
+              // in that case, continue with the next table
+              continue;
+            }
+            List<String> colNames = MetaStoreUtils.getColumnNamesForTable(table);
+            try {
+              ColumnStatistics tableColStats = null;
+              List<Partition> partitions = null;
+              List<ColumnStatistics> partitionColStats = null;
+              AggrStats aggrStatsAllPartitions = null;
+              AggrStats aggrStatsAllButDefaultPartition = null;
+              if (table.isSetPartitionKeys()) {
+                Deadline.startTimer("getPartitions");
+                partitions = rawStore.getPartitions(catName, dbName, tblName, Integer.MAX_VALUE);
                 Deadline.stopTimer();
-                // Remove default partition from partition names and get aggregate
-                // stats again
-                List<FieldSchema> partKeys = table.getPartitionKeys();
-                String defaultPartitionValue =
-                    MetastoreConf.getVar(rawStore.getConf(), ConfVars.DEFAULTPARTITIONNAME);
-                List<String> partCols = new ArrayList<>();
-                List<String> partVals = new ArrayList<>();
-                for (FieldSchema fs : partKeys) {
-                  partCols.add(fs.getName());
-                  partVals.add(defaultPartitionValue);
+                List<String> partNames = new ArrayList<>(partitions.size());
+                for (Partition p : partitions) {
+                  partNames.add(Warehouse.makePartName(table.getPartitionKeys(), p.getValues()));
+                }
+                if (!partNames.isEmpty()) {
+                  // Get partition column stats for this table
+                  Deadline.startTimer("getPartitionColumnStatistics");
+                  partitionColStats = rawStore.getPartitionColumnStatistics(catName, dbName,
+                      tblName, partNames, colNames);
+                  Deadline.stopTimer();
+                  // Get aggregate stats for all partitions of a table and for all but default
+                  // partition
+                  Deadline.startTimer("getAggrPartitionColumnStatistics");
+                  aggrStatsAllPartitions =
+                      rawStore.get_aggr_stats_for(catName, dbName, tblName, partNames, colNames);
+                  Deadline.stopTimer();
+                  // Remove default partition from partition names and get aggregate
+                  // stats again
+                  List<FieldSchema> partKeys = table.getPartitionKeys();
+                  String defaultPartitionValue =
+                      MetastoreConf.getVar(rawStore.getConf(), ConfVars.DEFAULTPARTITIONNAME);
+                  List<String> partCols = new ArrayList<>();
+                  List<String> partVals = new ArrayList<>();
+                  for (FieldSchema fs : partKeys) {
+                    partCols.add(fs.getName());
+                    partVals.add(defaultPartitionValue);
+                  }
+                  String defaultPartitionName = FileUtils.makePartName(partCols, partVals);
+                  partNames.remove(defaultPartitionName);
+                  Deadline.startTimer("getAggrPartitionColumnStatistics");
+                  aggrStatsAllButDefaultPartition =
+                      rawStore.get_aggr_stats_for(catName, dbName, tblName, partNames, colNames);
+                  Deadline.stopTimer();
                 }
-                String defaultPartitionName = FileUtils.makePartName(partCols, partVals);
-                partNames.remove(defaultPartitionName);
-                Deadline.startTimer("getAggrPartitionColumnStatistics");
-                aggrStatsAllButDefaultPartition =
-                    rawStore.get_aggr_stats_for(catName, dbName, tblName, partNames, colNames);
+              } else {
+                Deadline.startTimer("getTableColumnStatistics");
+                tableColStats =
+                    rawStore.getTableColumnStatistics(catName, dbName, tblName, colNames);
                 Deadline.stopTimer();
               }
-            } else {
-              Deadline.startTimer("getTableColumnStatistics");
-              tableColStats = rawStore.getTableColumnStatistics(catName, dbName, tblName, colNames);
-              Deadline.stopTimer();
+              sharedCache.populateTableInCache(table, tableColStats, partitions, partitionColStats,
+                  aggrStatsAllPartitions, aggrStatsAllButDefaultPartition);
+            } catch (MetaException | NoSuchObjectException e) {
+              // Continue with next table
+              continue;
             }
-            sharedCache.populateTableInCache(table, tableColStats, partitions, partitionColStats,
-                aggrStatsAllPartitions, aggrStatsAllButDefaultPartition);
-          } catch (MetaException | NoSuchObjectException e) {
-            // Continue with next table
+            LOG.debug("Processed database: {}'s table: {}. Cached {} / {}  tables so far.", dbName,
+                tblName, ++numberOfTablesCachedSoFar, totalTablesToCache);
+          } catch (EmptyStackException e) {
+            // We've prewarmed this database, continue with the next one
             continue;
           }
-          LOG.debug("Processed database: {}'s table: {}. Cached {} / {}  tables so far.", dbName,
-              tblName, ++numberOfTablesCachedSoFar, tblNames.size());
         }
         LOG.debug("Processed database: {}. Cached {} / {} databases so far.", dbName,
             ++numberOfDatabasesCachedSoFar, databases.size());
@@ -344,6 +344,32 @@ public class CachedStore implements RawStore, Configurable {
     sharedCache.completeTableCachePrewarm();
   }
 
+  static class TablesPendingPrewarm {
+    private Stack<String> tableNames = new Stack<>();
+
+    private synchronized void addTableNamesForPrewarming(List<String> tblNames) {
+      tableNames.clear();
+      if (tblNames != null) {
+        tableNames.addAll(tblNames);
+      }
+    }
+
+    private synchronized boolean hasMoreTablesToPrewarm() {
+      return !tableNames.empty();
+    }
+
+    private synchronized String getNextTableNameToPrewarm() {
+      return tableNames.pop();
+    }
+
+    private synchronized void prioritizeTableForPrewarm(String tblName) {
+      // If the table is in the pending prewarm list, move it to the top
+      if (tableNames.remove(tblName)) {
+        tableNames.push(tblName);
+      }
+    }
+  }
+
   @VisibleForTesting
   static void setCachePrewarmedState(boolean state) {
     isCachePrewarmed.set(state);
@@ -830,6 +856,10 @@ public class CachedStore implements RawStore, Configurable {
     Table tbl = sharedCache.getTableFromCache(catName, dbName, tblName);
     if (tbl == null) {
       // This table is not yet loaded in cache
+      // If the prewarm thread is working on this table's database,
+      // let's move this table to the top of the tblsPendingPrewarm stack,
+      // so that it gets loaded to the cache faster and is available for subsequent requests
+      tblsPendingPrewarm.prioritizeTableForPrewarm(tblName);
       return rawStore.getTable(catName, dbName, tblName);
     }
     if (tbl != null) {