Posted to commits@hive.apache.org by vg...@apache.org on 2019/04/03 17:42:08 UTC

[hive] branch master updated: HIVE-21386: Extend the fetch task enhancement done in HIVE-21279 to make it work with query result cache (Vineet Garg, reviewed by Jason Dere)

This is an automated email from the ASF dual-hosted git repository.

vgarg pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new e8ca5a2  HIVE-21386: Extend the fetch task enhancement done in HIVE-21279 to make it work with query result cache (Vineet Garg, reviewed by Jason Dere)
e8ca5a2 is described below

commit e8ca5a27c8c1ea29b5b0b518f0061f10626a25cc
Author: Vineet Garg <vg...@apache.org>
AuthorDate: Wed Apr 3 10:41:30 2019 -0700

    HIVE-21386: Extend the fetch task enhancement done in HIVE-21279 to make it work with query result cache (Vineet Garg, reviewed by Jason Dere)
---
 ql/src/java/org/apache/hadoop/hive/ql/Context.java | 63 ++++++++++++--
 ql/src/java/org/apache/hadoop/hive/ql/Driver.java  | 11 ++-
 .../hive/ql/cache/results/QueryResultsCache.java   | 95 ++++++++--------------
 .../org/apache/hadoop/hive/ql/exec/Utilities.java  |  3 +-
 .../hadoop/hive/ql/parse/SemanticAnalyzer.java     | 35 ++++++--
 .../llap/results_cache_diff_fs.q.out               |  8 +-
 6 files changed, 131 insertions(+), 84 deletions(-)

diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Context.java b/ql/src/java/org/apache/hadoop/hive/ql/Context.java
index cbe0d04..7af4531 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/Context.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Context.java
@@ -99,6 +99,9 @@ public class Context {
   // Keeps track of scratch directories created for different scheme/authority
   private final Map<String, Path> fsScratchDirs = new HashMap<String, Path>();
 
+  // keeps track of the result cache dir for this query; it is removed later as part of context cleanup
+  private Path fsResultCacheDirs = null;
+
   private Configuration conf;
   protected int pathid = 10000;
   protected ExplainConfiguration explainConfig = null;
@@ -343,6 +346,7 @@ public class Context {
     this.localScratchDir = ctx.localScratchDir;
     this.scratchDirPermission = ctx.scratchDirPermission;
     this.fsScratchDirs.putAll(ctx.fsScratchDirs);
+    this.fsResultCacheDirs = ctx.fsResultCacheDirs;
     this.conf = ctx.conf;
     this.pathid = ctx.pathid;
     this.explainConfig = ctx.explainConfig;
@@ -372,6 +376,14 @@ public class Context {
     return fsScratchDirs;
   }
 
+  public void setFsResultCacheDirs(Path fsResultCacheDirs) {
+    this.fsResultCacheDirs = fsResultCacheDirs;
+  }
+
+  public Path getFsResultCacheDirs() {
+    return this.fsResultCacheDirs;
+  }
+
   public Map<LoadTableDesc, WriteEntity> getLoadTableOutputMap() {
     return loadTableOutputMap;
   }
@@ -628,14 +640,40 @@ public class Context {
   /**
-   * Remove any created scratch directories.
+   * Remove the result cache directory created for this query, if any.
    */
+  public void removeResultCacheDir() {
+    if (this.fsResultCacheDirs != null) {
+      try {
+        Path p = this.fsResultCacheDirs;
+        FileSystem fs = p.getFileSystem(conf);
+        LOG.debug("Deleting result cache dir: {}", p);
+        fs.delete(p, true);
+        fs.cancelDeleteOnExit(p);
+      } catch (Exception e) {
+        LOG.warn("Error removing result cache dir: "
+                     + StringUtils.stringifyException(e));
+      }
+    }
+  }
+
+  /**
+   * Remove any created scratch directories.
+   */
   public void removeScratchDir() {
+    String resultCacheDir = null;
+    if (this.fsResultCacheDirs != null) {
+      resultCacheDir = this.fsResultCacheDirs.toUri().getPath();
+    }
     for (Map.Entry<String, Path> entry : fsScratchDirs.entrySet()) {
       try {
         Path p = entry.getValue();
+        if (resultCacheDir == null || !p.toUri().getPath().contains(resultCacheDir)) {
+          // delete only the paths that aren't under the result cache dir,
+          // because those are taken care of by removeResultCacheDir
-        FileSystem fs = p.getFileSystem(conf);
-        LOG.debug("Deleting scratch dir: {}",  p);
-        fs.delete(p, true);
-        fs.cancelDeleteOnExit(p);
+          FileSystem fs = p.getFileSystem(conf);
+          LOG.debug("Deleting scratch dir: {}",  p);
+          fs.delete(p, true);
+          fs.cancelDeleteOnExit(p);
+        }
       } catch (Exception e) {
         LOG.warn("Error Removing Scratch: "
             + StringUtils.stringifyException(e));
@@ -774,21 +812,25 @@ public class Context {
     resDirPaths = null;
   }
 
   public void clear() throws IOException {
+    this.clear(true);
+  }
+
+  public void clear(boolean deleteResultDir) throws IOException {
     // First clear the other contexts created by this query
     for (Context subContext : rewrittenStatementContexts) {
       subContext.clear();
     }
     // Then clear this context
     if (resDir != null) {
       try {
         FileSystem fs = resDir.getFileSystem(conf);
         LOG.debug("Deleting result dir: {}",  resDir);
         fs.delete(resDir, true);
       } catch (IOException e) {
         LOG.info("Context clear error: " + StringUtils.stringifyException(e));
       }
     }
 
     if (resFile != null) {
       try {
@@ -799,6 +841,9 @@ public class Context {
         LOG.info("Context clear error: " + StringUtils.stringifyException(e));
       }
     }
+    if (deleteResultDir) {
+      removeResultCacheDir();
+    }
     removeMaterializedCTEs();
     removeScratchDir();
     originalTracker = null;
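
Taken together, the Context changes split query cleanup into two paths: removeScratchDir() now skips any scratch path that falls under the registered result cache dir, and removeResultCacheDir() deletes that dir only when clear(true) is used. A minimal standalone sketch of that contract, using java.nio paths and println stubs in place of Hadoop's FileSystem (only the method names and the skip test come from the diff above; everything else is illustrative):

    import java.nio.file.Path;
    import java.nio.file.Paths;
    import java.util.HashMap;
    import java.util.Map;

    public class CleanupSketch {
      private final Map<String, Path> fsScratchDirs = new HashMap<>();
      private Path fsResultCacheDir; // set when query results are written under the cache dir

      void removeScratchDir() {
        String resultCacheDir = fsResultCacheDir == null ? null : fsResultCacheDir.toString();
        for (Path p : fsScratchDirs.values()) {
          // skip paths under the result cache dir; removeResultCacheDir() owns those
          if (resultCacheDir == null || !p.toString().contains(resultCacheDir)) {
            System.out.println("deleting scratch dir " + p);
          }
        }
      }

      void removeResultCacheDir() {
        if (fsResultCacheDir != null) {
          System.out.println("deleting result cache dir " + fsResultCacheDir);
        }
      }

      void clear(boolean deleteResultDir) {
        if (deleteResultDir) {
          removeResultCacheDir();
        }
        removeScratchDir();
      }

      public static void main(String[] args) {
        CleanupSketch ctx = new CleanupSketch();
        ctx.fsResultCacheDir = Paths.get("/tmp/_resultscache_/1234");
        ctx.fsScratchDirs.put("cache", Paths.get("/tmp/_resultscache_/1234"));
        ctx.fsScratchDirs.put("hdfs", Paths.get("/tmp/scratch/q1"));
        ctx.clear(false); // results were cached: only the non-cache scratch dir is deleted
      }
    }
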
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
index cac14a6..4f14fa5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
@@ -2835,7 +2835,14 @@ public class Driver implements IDriver {
   private void releaseContext() {
     try {
       if (ctx != null) {
-        ctx.clear();
+        boolean deleteResultDir = true;
+        // don't let the context delete result dirs and scratch dirs if the result was cached
+        if (this.cacheUsage != null
+            && this.cacheUsage.getStatus() == CacheUsage.CacheStatus.QUERY_USING_CACHE) {
+          deleteResultDir = false;
+        }
+        ctx.clear(deleteResultDir);
         if (ctx.getHiveLocks() != null) {
           hiveLocks.addAll(ctx.getHiveLocks());
           ctx.setHiveLocks(null);
@@ -2931,10 +2938,10 @@ public class Driver implements IDriver {
         lDrvState.abort();
       }
       releasePlan();
+      releaseContext();
       releaseCachedResult();
       releaseFetchTask();
       releaseResStream();
-      releaseContext();
       lDrvState.driverState = DriverState.CLOSED;
     } finally {
       lDrvState.stateLock.unlock();
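
The Driver change has two pieces. First, releaseContext() now runs before releaseCachedResult() in close(), so cacheUsage is still populated when the context decides what to delete. Second, clear(false) is passed whenever the query was answered from the cache, because in that case the "result dir" is data owned by the shared cache entry. A compact sketch of that decision (the enum is a stand-in for Hive's CacheUsage.CacheStatus; the class around it is illustrative):

    class ReleaseSketch {
      // stand-in for CacheUsage.CacheStatus
      enum CacheStatus { QUERY_USING_CACHE, CAN_CACHE_QUERY_RESULTS }

      CacheStatus cacheStatus; // null when the query never touched the cache

      boolean shouldDeleteResultDir() {
        // a query served from the cache reads files owned by the cache entry;
        // deleting them on close would break every later hit on that entry
        return cacheStatus != CacheStatus.QUERY_USING_CACHE;
      }
    }
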
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/cache/results/QueryResultsCache.java b/ql/src/java/org/apache/hadoop/hive/ql/cache/results/QueryResultsCache.java
index 0b7166b..517ead8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/cache/results/QueryResultsCache.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/cache/results/QueryResultsCache.java
@@ -48,6 +48,7 @@ import java.util.stream.Stream;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.ContentSummary;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
@@ -191,6 +192,7 @@ public final class QueryResultsCache {
     private QueryInfo queryInfo;
     private FetchWork fetchWork;
     private Path cachedResultsPath;
+    private Set<FileStatus> cachedResultPaths;
 
     // Cache administration
     private long size;
@@ -275,8 +277,9 @@ public final class QueryResultsCache {
 
     public FetchWork getFetchWork() {
       // FetchWork's sink is used to hold results, so each query needs a separate copy of FetchWork
-      FetchWork fetch = new FetchWork(cachedResultsPath, fetchWork.getTblDesc(), fetchWork.getLimit());
+      FetchWork fetch = new FetchWork(fetchWork.getTblDir(), fetchWork.getTblDesc(), fetchWork.getLimit());
       fetch.setCachedResult(true);
+      fetch.setFilesToFetch(this.cachedResultPaths);
       return fetch;
     }
 
@@ -409,6 +412,10 @@ public final class QueryResultsCache {
     return instance;
   }
 
+  public Path getCacheDirPath() {
+    return cacheDirPath;
+  }
+
   /**
    * Check if the cache contains an entry for the requested LookupInfo.
    * @param request
@@ -517,30 +524,26 @@ public final class QueryResultsCache {
    * @return
    */
   public boolean setEntryValid(CacheEntry cacheEntry, FetchWork fetchWork) {
-    String queryText = cacheEntry.getQueryText();
-    boolean dataDirMoved = false;
     Path queryResultsPath = null;
     Path cachedResultsPath = null;
 
     try {
-      boolean requiresMove = true;
+      // if we are here, the file sink op should have created files to fetch from
+      assert(fetchWork.getFilesToFetch() != null);
+
+      boolean requiresCaching = true;
       queryResultsPath = fetchWork.getTblDir();
       FileSystem resultsFs = queryResultsPath.getFileSystem(conf);
-      long resultSize;
-      if (resultsFs.exists(queryResultsPath)) {
-        ContentSummary cs = resultsFs.getContentSummary(queryResultsPath);
-        resultSize = cs.getLength();
-      } else {
-        // No actual result directory, no need to move anything.
-        cachedResultsPath = zeroRowsPath;
-        // Even if there are no results to move, at least check that we have permission
-        // to check the existence of zeroRowsPath, or the read using the cache will fail.
-        // A failure here will cause this query to not be added to the cache.
-        FileSystem cacheFs = cachedResultsPath.getFileSystem(conf);
-        boolean fakePathExists = cacheFs.exists(zeroRowsPath);
-
-        resultSize = 0;
-        requiresMove = false;
+
+      long resultSize = 0;
+      for (FileStatus fs : fetchWork.getFilesToFetch()) {
+        if (resultsFs.exists(fs.getPath())) {
+          resultSize += fs.getLen();
+        } else {
+          // No actual result files, no need to cache anything.
+          requiresCaching = false;
+          break;
+        }
       }
 
       if (!shouldEntryBeAdded(cacheEntry, resultSize)) {
@@ -555,20 +558,22 @@ public final class QueryResultsCache {
           return false;
         }
 
-        if (requiresMove) {
-          // Move the query results to the query cache directory.
-          cachedResultsPath = moveResultsToCacheDirectory(queryResultsPath);
-          dataDirMoved = true;
+        if (requiresCaching) {
+          cacheEntry.cachedResultPaths = new HashSet<>();
+          for (FileStatus fs : fetchWork.getFilesToFetch()) {
+            cacheEntry.cachedResultPaths.add(fs);
+          }
+          LOG.info("Cached query result paths located at {} (size {}) for query '{}'",
+              queryResultsPath, resultSize, cacheEntry.getQueryText());
         }
-        LOG.info("Moved query results from {} to {} (size {}) for query '{}'",
-            queryResultsPath, cachedResultsPath, resultSize, queryText);
 
         // Create a new FetchWork to reference the new cache location.
         FetchWork fetchWorkForCache =
-            new FetchWork(cachedResultsPath, fetchWork.getTblDesc(), fetchWork.getLimit());
+            new FetchWork(fetchWork.getTblDir(), fetchWork.getTblDesc(), fetchWork.getLimit());
         fetchWorkForCache.setCachedResult(true);
+        fetchWorkForCache.setFilesToFetch(fetchWork.getFilesToFetch());
         cacheEntry.fetchWork = fetchWorkForCache;
-        cacheEntry.cachedResultsPath = cachedResultsPath;
         cacheEntry.size = resultSize;
         this.cacheSize += resultSize;
 
@@ -585,23 +590,10 @@ public final class QueryResultsCache {
       incrementMetric(MetricsConstant.QC_VALID_ENTRIES);
       incrementMetric(MetricsConstant.QC_TOTAL_ENTRIES_ADDED);
     } catch (Exception err) {
+      String queryText = cacheEntry.getQueryText();
       LOG.error("Failed to create cache entry for query results for query: " + queryText, err);
-
-      if (dataDirMoved) {
-        // If data was moved from original location to cache directory, we need to move it back!
-        LOG.info("Restoring query results from {} back to {}", cachedResultsPath, queryResultsPath);
-        try {
-          FileSystem fs = cachedResultsPath.getFileSystem(conf);
-          fs.rename(cachedResultsPath, queryResultsPath);
-          cacheEntry.size = 0;
-          cacheEntry.cachedResultsPath = null;
-        } catch (Exception err2) {
-          String errMsg = "Failed cleanup during failed attempt to cache query: " + queryText;
-          LOG.error(errMsg);
-          throw new RuntimeException(errMsg);
-        }
-      }
-
+      cacheEntry.size = 0;
+      cacheEntry.cachedResultsPath = null;
       // Invalidate the entry. Rely on query cleanup to remove from lookup.
       cacheEntry.invalidate();
       return false;
@@ -783,23 +775,6 @@ public final class QueryResultsCache {
     return true;
   }
 
-  private Path moveResultsToCacheDirectory(Path queryResultsPath) throws IOException {
-    String dirName = UUID.randomUUID().toString();
-    Path cachedResultsPath = new Path(cacheDirPath, dirName);
-    FileSystem fs = cachedResultsPath.getFileSystem(conf);
-    try {
-      boolean resultsMoved = Hive.moveFile(conf, queryResultsPath, cachedResultsPath, false, false, false);
-      if (!resultsMoved) {
-        throw new IOException("Failed to move " + queryResultsPath + " to " + cachedResultsPath);
-      }
-    } catch (IOException err) {
-      throw err;
-    } catch (Exception err) {
-      throw new IOException("Error moving " + queryResultsPath + " to " + cachedResultsPath, err);
-    }
-    return cachedResultsPath;
-  }
-
   private boolean hasSpaceForCacheEntry(CacheEntry entry, long size) {
     if (maxCacheSize >= 0) {
       return (cacheSize + size) <= maxCacheSize;
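
With results already written under the cache directory by the file sink, setEntryValid() no longer needs moveResultsToCacheDirectory() at all: it verifies that each file reported in FetchWork.getFilesToFetch() still exists, sums their lengths to size the entry, and records the set on the entry for later fetches. A standalone sketch of that bookkeeping (FileStat is a stand-in for Hadoop's FileStatus, and the quota check done by shouldEntryBeAdded is omitted):

    import java.util.LinkedHashSet;
    import java.util.Set;

    class CacheEntrySketch {
      record FileStat(String path, long len, boolean exists) {} // FileStatus stand-in

      Set<FileStat> cachedResultPaths;
      long size;

      void setEntryValid(Set<FileStat> filesToFetch) {
        long resultSize = 0;
        boolean requiresCaching = true;
        for (FileStat fs : filesToFetch) {
          if (fs.exists()) {
            resultSize += fs.len();
          } else {
            // no result files on disk (e.g. an empty result): nothing to record
            requiresCaching = false;
            break;
          }
        }
        if (requiresCaching) {
          // record, don't move: the files already live under the cache dir
          cachedResultPaths = new LinkedHashSet<>(filesToFetch);
        }
        size = resultSize;
      }
    }
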
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
index 052b70f..36bc08f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
@@ -1487,8 +1487,7 @@ public final class Utilities {
-    //  * query cache is disabled
     //  * if it is select query
     if (conf != null && conf.getIsQuery() && conf.getFilesToFetch() != null
-        && HiveConf.getVar(hConf, ConfVars.HIVE_EXECUTION_ENGINE).equalsIgnoreCase("tez")
-        && !HiveConf.getBoolVar(hConf, ConfVars.HIVE_QUERY_RESULTS_CACHE_ENABLED)){
+        && HiveConf.getVar(hConf, ConfVars.HIVE_EXECUTION_ENGINE).equalsIgnoreCase("tez")){
       return true;
     }
     return false;
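
The Utilities change simply drops the clause that disabled the fetch-from-files optimization whenever the results cache was enabled; with this patch, any Tez select query that produced a file list qualifies. A one-method sketch of the relaxed check (class and parameter names are illustrative):

    final class FetchEligibilitySketch {
      static boolean canUseFilesToFetch(boolean isQuery, Object filesToFetch, String engine) {
        // the former "results cache must be disabled" condition is gone
        return isQuery && filesToFetch != null && "tez".equalsIgnoreCase(engine);
      }
    }
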
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
index 6b0fe73..73ae3ba 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
@@ -42,6 +42,7 @@ import java.util.Set;
 import java.util.SortedMap;
 import java.util.TreeMap;
 import java.util.TreeSet;
+import java.util.UUID;
 import java.util.regex.Pattern;
 import java.util.regex.PatternSyntaxException;
 import java.util.stream.Collectors;
@@ -7176,6 +7177,23 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
     return currUDF;
   }
 
+  private Path getDestinationFilePath(final String destinationFile, boolean isMmTable)
+      throws SemanticException {
+    if (this.isResultsCacheEnabled() && this.queryTypeCanUseCache()) {
+      assert (!isMmTable);
+      QueryResultsCache instance = QueryResultsCache.getInstance();
+      // QueryResultsCache should have been initialized by now
+      if (instance != null) {
+        Path resultCacheTopDir = instance.getCacheDirPath();
+        String dirName = UUID.randomUUID().toString();
+        Path resultDir = new Path(resultCacheTopDir, dirName);
+        this.ctx.setFsResultCacheDirs(resultDir);
+        return resultDir;
+      }
+    }
+    return new Path(destinationFile);
+  }
+
   @SuppressWarnings("nls")
   protected Operator genFileSinkPlan(String dest, QB qb, Operator input)
       throws SemanticException {
@@ -7440,7 +7458,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
       isLocal = true;
       // fall through
     case QBMetaData.DEST_DFS_FILE: {
-      destinationPath = new Path(qbm.getDestFileForAlias(dest));
+      destinationPath = getDestinationFilePath(qbm.getDestFileForAlias(dest), isMmTable);
 
       // CTAS case: the file output format and serde are defined by the create
       // table command rather than taking the default value
@@ -15062,6 +15080,9 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
    * Some initial checks for a query to see if we can look this query up in the results cache.
    */
   private boolean queryTypeCanUseCache() {
+    if (this.qb == null || this.qb.getParseInfo() == null) {
+      return false;
+    }
     if (this instanceof ColumnStatsSemanticAnalyzer) {
       // Column stats generates "select compute_stats() .." queries.
       // Disable caching for these.
@@ -15072,13 +15093,13 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
       return false;
     }
 
     if (qb.getParseInfo().isAnalyzeCommand()) {
       return false;
     }
 
     if (qb.getParseInfo().hasInsertTables()) {
       return false;
     }
 
     // HIVE-19096 - disable for explain analyze
     if (ctx.getExplainAnalyze() != null) {
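
On the planning side, getDestinationFilePath() is what makes the no-move design work: for a cache-eligible SELECT, the file sink's destination is redirected to a fresh UUID subdirectory of the cache's top dir, and that path is registered on the Context so cleanup can treat it specially. A standalone sketch using java.nio paths (the boolean flag stands in for isResultsCacheEnabled() && queryTypeCanUseCache(); the class is illustrative):

    import java.nio.file.Path;
    import java.nio.file.Paths;
    import java.util.UUID;

    class DestinationSketch {
      Path fsResultCacheDir; // mirrors Context.setFsResultCacheDirs(...)

      Path getDestinationFilePath(String destinationFile, boolean cacheEligible, Path cacheTopDir) {
        if (cacheEligible && cacheTopDir != null) {
          // write results straight into the cache dir so no later move is needed
          Path resultDir = cacheTopDir.resolve(UUID.randomUUID().toString());
          fsResultCacheDir = resultDir;
          return resultDir;
        }
        return Paths.get(destinationFile);
      }
    }
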
diff --git a/ql/src/test/results/clientpositive/llap/results_cache_diff_fs.q.out b/ql/src/test/results/clientpositive/llap/results_cache_diff_fs.q.out
index ed6af87..54a3451 100644
--- a/ql/src/test/results/clientpositive/llap/results_cache_diff_fs.q.out
+++ b/ql/src/test/results/clientpositive/llap/results_cache_diff_fs.q.out
@@ -3,12 +3,12 @@ PREHOOK: query: explain
 select count(*) from src a join src b on (a.key = b.key)
 PREHOOK: type: QUERY
 PREHOOK: Input: default@src
-PREHOOK: Output: hdfs://### HDFS PATH ###
+#### A masked pattern was here ####
 POSTHOOK: query: explain
 select count(*) from src a join src b on (a.key = b.key)
 POSTHOOK: type: QUERY
 POSTHOOK: Input: default@src
-POSTHOOK: Output: hdfs://### HDFS PATH ###
+#### A masked pattern was here ####
 STAGE DEPENDENCIES:
   Stage-1 is a root stage
   Stage-0 depends on stages: Stage-1
@@ -107,11 +107,11 @@ STAGE PLANS:
 PREHOOK: query: select count(*) from src a join src b on (a.key = b.key)
 PREHOOK: type: QUERY
 PREHOOK: Input: default@src
-PREHOOK: Output: hdfs://### HDFS PATH ###
+#### A masked pattern was here ####
 POSTHOOK: query: select count(*) from src a join src b on (a.key = b.key)
 POSTHOOK: type: QUERY
 POSTHOOK: Input: default@src
-POSTHOOK: Output: hdfs://### HDFS PATH ###
+#### A masked pattern was here ####
 1028
 test.comment="Cache should be used for this query"
 PREHOOK: query: explain