You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by vg...@apache.org on 2019/04/03 17:42:08 UTC
[hive] branch master updated: HIVE-21386: Extend the fetch task
enhancement done in HIVE-21279 to make it work with query result cache
(Vineet Garg, reviewed by Jason Dere)
This is an automated email from the ASF dual-hosted git repository.
vgarg pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new e8ca5a2 HIVE-21386: Extend the fetch task enhancement done in HIVE-21279 to make it work with query result cache (Vineet Garg, reviewed by Jason Dere)
e8ca5a2 is described below
commit e8ca5a27c8c1ea29b5b0b518f0061f10626a25cc
Author: Vineet Garg <vg...@apache.org>
AuthorDate: Wed Apr 3 10:41:30 2019 -0700
HIVE-21386: Extend the fetch task enhancement done in HIVE-21279 to make it work with query result cache (Vineet Garg, reviewed by Jason Dere)
---
ql/src/java/org/apache/hadoop/hive/ql/Context.java | 63 ++++++++++++--
ql/src/java/org/apache/hadoop/hive/ql/Driver.java | 11 ++-
.../hive/ql/cache/results/QueryResultsCache.java | 95 ++++++++--------------
.../org/apache/hadoop/hive/ql/exec/Utilities.java | 3 +-
.../hadoop/hive/ql/parse/SemanticAnalyzer.java | 35 ++++++--
.../llap/results_cache_diff_fs.q.out | 8 +-
6 files changed, 131 insertions(+), 84 deletions(-)
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Context.java b/ql/src/java/org/apache/hadoop/hive/ql/Context.java
index cbe0d04..7af4531 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/Context.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Context.java
@@ -99,6 +99,9 @@ public class Context {
// Keeps track of scratch directories created for different scheme/authority
private final Map<String, Path> fsScratchDirs = new HashMap<String, Path>();
+ // keeps track of result cache dir for the query, later cleaned up by context cleanup
+ private Path fsResultCacheDirs = null;
+
private Configuration conf;
protected int pathid = 10000;
protected ExplainConfiguration explainConfig = null;
@@ -343,6 +346,7 @@ public class Context {
this.localScratchDir = ctx.localScratchDir;
this.scratchDirPermission = ctx.scratchDirPermission;
this.fsScratchDirs.putAll(ctx.fsScratchDirs);
+ this.fsResultCacheDirs = ctx.fsResultCacheDirs;
this.conf = ctx.conf;
this.pathid = ctx.pathid;
this.explainConfig = ctx.explainConfig;
@@ -372,6 +376,14 @@ public class Context {
return fsScratchDirs;
}
+ public void setFsResultCacheDirs(Path fsResultCacheDirs) {
+ this.fsResultCacheDirs = fsResultCacheDirs;
+ }
+
+ public Path getFsResultCacheDirs() {
+ return this.fsResultCacheDirs;
+ }
+
public Map<LoadTableDesc, WriteEntity> getLoadTableOutputMap() {
return loadTableOutputMap;
}
@@ -628,14 +640,40 @@ public class Context {
/**
* Remove any created scratch directories.
*/
+ public void removeResultCacheDir() {
+ if(this.fsResultCacheDirs != null) {
+ try {
+ Path p = this.fsResultCacheDirs;
+ FileSystem fs = p.getFileSystem(conf);
+ LOG.debug("Deleting result cache dir: {}", p);
+ fs.delete(p, true);
+ fs.cancelDeleteOnExit(p);
+ } catch (Exception e) {
+ LOG.warn("Error Removing result cache dir: "
+ + StringUtils.stringifyException(e));
+ }
+ }
+ }
+
+ /**
+ * Remove any created scratch directories.
+ */
public void removeScratchDir() {
+ String resultCacheDir = null;
+ if(this.fsResultCacheDirs != null) {
+ resultCacheDir = this.fsResultCacheDirs.toUri().getPath();
+ }
for (Map.Entry<String, Path> entry : fsScratchDirs.entrySet()) {
try {
Path p = entry.getValue();
+ if(resultCacheDir == null || !p.toUri().getPath().contains(resultCacheDir)) {
+ // delete only the paths which aren't result cache dir path
+ // because that will be taken care by removeResultCacheDir
FileSystem fs = p.getFileSystem(conf);
LOG.debug("Deleting scratch dir: {}", p);
fs.delete(p, true);
fs.cancelDeleteOnExit(p);
+ }
} catch (Exception e) {
LOG.warn("Error Removing Scratch: "
+ StringUtils.stringifyException(e));
@@ -774,21 +812,25 @@ public class Context {
resDirPaths = null;
}
- public void clear() throws IOException {
+ public void clear() throws IOException{
+ this.clear(true);
+ }
+
+ public void clear(boolean deleteResultDir) throws IOException {
// First clear the other contexts created by this query
for (Context subContext : rewrittenStatementContexts) {
subContext.clear();
}
// Then clear this context
- if (resDir != null) {
- try {
- FileSystem fs = resDir.getFileSystem(conf);
- LOG.debug("Deleting result dir: {}", resDir);
- fs.delete(resDir, true);
- } catch (IOException e) {
- LOG.info("Context clear error: " + StringUtils.stringifyException(e));
+ if (resDir != null) {
+ try {
+ FileSystem fs = resDir.getFileSystem(conf);
+ LOG.debug("Deleting result dir: {}", resDir);
+ fs.delete(resDir, true);
+ } catch (IOException e) {
+ LOG.info("Context clear error: " + StringUtils.stringifyException(e));
+ }
}
- }
if (resFile != null) {
try {
@@ -799,6 +841,9 @@ public class Context {
LOG.info("Context clear error: " + StringUtils.stringifyException(e));
}
}
+ if(deleteResultDir) {
+ removeResultCacheDir();
+ }
removeMaterializedCTEs();
removeScratchDir();
originalTracker = null;
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
index cac14a6..4f14fa5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
@@ -2835,7 +2835,14 @@ public class Driver implements IDriver {
private void releaseContext() {
try {
if (ctx != null) {
- ctx.clear();
+ boolean deleteResultDir = true;
+ // don't let context delete result dirs and scratch dirs if result was cached
+ if(this.cacheUsage != null
+ && this.cacheUsage.getStatus() == CacheUsage.CacheStatus.QUERY_USING_CACHE) {
+ deleteResultDir = false;
+
+ }
+ ctx.clear(deleteResultDir);
if (ctx.getHiveLocks() != null) {
hiveLocks.addAll(ctx.getHiveLocks());
ctx.setHiveLocks(null);
@@ -2931,10 +2938,10 @@ public class Driver implements IDriver {
lDrvState.abort();
}
releasePlan();
+ releaseContext();
releaseCachedResult();
releaseFetchTask();
releaseResStream();
- releaseContext();
lDrvState.driverState = DriverState.CLOSED;
} finally {
lDrvState.stateLock.unlock();
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/cache/results/QueryResultsCache.java b/ql/src/java/org/apache/hadoop/hive/ql/cache/results/QueryResultsCache.java
index 0b7166b..517ead8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/cache/results/QueryResultsCache.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/cache/results/QueryResultsCache.java
@@ -48,6 +48,7 @@ import java.util.stream.Stream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.ContentSummary;
+import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
@@ -191,6 +192,7 @@ public final class QueryResultsCache {
private QueryInfo queryInfo;
private FetchWork fetchWork;
private Path cachedResultsPath;
+ private Set<FileStatus> cachedResultPaths;
// Cache administration
private long size;
@@ -275,8 +277,9 @@ public final class QueryResultsCache {
public FetchWork getFetchWork() {
// FetchWork's sink is used to hold results, so each query needs a separate copy of FetchWork
- FetchWork fetch = new FetchWork(cachedResultsPath, fetchWork.getTblDesc(), fetchWork.getLimit());
+ FetchWork fetch = new FetchWork(fetchWork.getTblDir(), fetchWork.getTblDesc(), fetchWork.getLimit());
fetch.setCachedResult(true);
+ fetch.setFilesToFetch(this.cachedResultPaths);
return fetch;
}
@@ -409,6 +412,10 @@ public final class QueryResultsCache {
return instance;
}
+ public Path getCacheDirPath() {
+ return cacheDirPath;
+ }
+
/**
* Check if the cache contains an entry for the requested LookupInfo.
* @param request
@@ -517,30 +524,26 @@ public final class QueryResultsCache {
* @return
*/
public boolean setEntryValid(CacheEntry cacheEntry, FetchWork fetchWork) {
- String queryText = cacheEntry.getQueryText();
- boolean dataDirMoved = false;
Path queryResultsPath = null;
Path cachedResultsPath = null;
try {
- boolean requiresMove = true;
+ // if we are here file sink op should have created files to fetch from
+ assert(fetchWork.getFilesToFetch() != null );
+
+ boolean requiresCaching = true;
queryResultsPath = fetchWork.getTblDir();
FileSystem resultsFs = queryResultsPath.getFileSystem(conf);
- long resultSize;
- if (resultsFs.exists(queryResultsPath)) {
- ContentSummary cs = resultsFs.getContentSummary(queryResultsPath);
- resultSize = cs.getLength();
- } else {
- // No actual result directory, no need to move anything.
- cachedResultsPath = zeroRowsPath;
- // Even if there are no results to move, at least check that we have permission
- // to check the existence of zeroRowsPath, or the read using the cache will fail.
- // A failure here will cause this query to not be added to the cache.
- FileSystem cacheFs = cachedResultsPath.getFileSystem(conf);
- boolean fakePathExists = cacheFs.exists(zeroRowsPath);
-
- resultSize = 0;
- requiresMove = false;
+
+ long resultSize = 0;
+ for(FileStatus fs:fetchWork.getFilesToFetch()) {
+ if(resultsFs.exists(fs.getPath())) {
+ resultSize += fs.getLen();
+ } else {
+ // No actual result directory, no need to cache anything.
+ requiresCaching = false;
+ break;
+ }
}
if (!shouldEntryBeAdded(cacheEntry, resultSize)) {
@@ -555,20 +558,22 @@ public final class QueryResultsCache {
return false;
}
- if (requiresMove) {
- // Move the query results to the query cache directory.
- cachedResultsPath = moveResultsToCacheDirectory(queryResultsPath);
- dataDirMoved = true;
+ if (requiresCaching) {
+ cacheEntry.cachedResultPaths = new HashSet<>();
+ for(FileStatus fs:fetchWork.getFilesToFetch()) {
+ cacheEntry.cachedResultPaths.add(fs);
+ }
+ LOG.info("Cached query result paths located at {} (size {}) for query '{}'",
+ queryResultsPath, resultSize, cacheEntry.getQueryText());
}
- LOG.info("Moved query results from {} to {} (size {}) for query '{}'",
- queryResultsPath, cachedResultsPath, resultSize, queryText);
// Create a new FetchWork to reference the new cache location.
FetchWork fetchWorkForCache =
- new FetchWork(cachedResultsPath, fetchWork.getTblDesc(), fetchWork.getLimit());
+ new FetchWork(fetchWork.getTblDir(), fetchWork.getTblDesc(), fetchWork.getLimit());
fetchWorkForCache.setCachedResult(true);
+ fetchWorkForCache.setFilesToFetch(fetchWork.getFilesToFetch());
cacheEntry.fetchWork = fetchWorkForCache;
- cacheEntry.cachedResultsPath = cachedResultsPath;
+ //cacheEntry.cachedResultsPath = cachedResultsPath;
cacheEntry.size = resultSize;
this.cacheSize += resultSize;
@@ -585,23 +590,10 @@ public final class QueryResultsCache {
incrementMetric(MetricsConstant.QC_VALID_ENTRIES);
incrementMetric(MetricsConstant.QC_TOTAL_ENTRIES_ADDED);
} catch (Exception err) {
+ String queryText = cacheEntry.getQueryText();
LOG.error("Failed to create cache entry for query results for query: " + queryText, err);
-
- if (dataDirMoved) {
- // If data was moved from original location to cache directory, we need to move it back!
- LOG.info("Restoring query results from {} back to {}", cachedResultsPath, queryResultsPath);
- try {
- FileSystem fs = cachedResultsPath.getFileSystem(conf);
- fs.rename(cachedResultsPath, queryResultsPath);
- cacheEntry.size = 0;
- cacheEntry.cachedResultsPath = null;
- } catch (Exception err2) {
- String errMsg = "Failed cleanup during failed attempt to cache query: " + queryText;
- LOG.error(errMsg);
- throw new RuntimeException(errMsg);
- }
- }
-
+ cacheEntry.size = 0;
+ cacheEntry.cachedResultsPath = null;
// Invalidate the entry. Rely on query cleanup to remove from lookup.
cacheEntry.invalidate();
return false;
@@ -783,23 +775,6 @@ public final class QueryResultsCache {
return true;
}
- private Path moveResultsToCacheDirectory(Path queryResultsPath) throws IOException {
- String dirName = UUID.randomUUID().toString();
- Path cachedResultsPath = new Path(cacheDirPath, dirName);
- FileSystem fs = cachedResultsPath.getFileSystem(conf);
- try {
- boolean resultsMoved = Hive.moveFile(conf, queryResultsPath, cachedResultsPath, false, false, false);
- if (!resultsMoved) {
- throw new IOException("Failed to move " + queryResultsPath + " to " + cachedResultsPath);
- }
- } catch (IOException err) {
- throw err;
- } catch (Exception err) {
- throw new IOException("Error moving " + queryResultsPath + " to " + cachedResultsPath, err);
- }
- return cachedResultsPath;
- }
-
private boolean hasSpaceForCacheEntry(CacheEntry entry, long size) {
if (maxCacheSize >= 0) {
return (cacheSize + size) <= maxCacheSize;
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
index 052b70f..36bc08f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
@@ -1487,8 +1487,7 @@ public final class Utilities {
// * query cache is disabled
// * if it is select query
if (conf != null && conf.getIsQuery() && conf.getFilesToFetch() != null
- && HiveConf.getVar(hConf, ConfVars.HIVE_EXECUTION_ENGINE).equalsIgnoreCase("tez")
- && !HiveConf.getBoolVar(hConf, ConfVars.HIVE_QUERY_RESULTS_CACHE_ENABLED)){
+ && HiveConf.getVar(hConf, ConfVars.HIVE_EXECUTION_ENGINE).equalsIgnoreCase("tez")){
return true;
}
return false;
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
index 6b0fe73..73ae3ba 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
@@ -42,6 +42,7 @@ import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.TreeSet;
+import java.util.UUID;
import java.util.regex.Pattern;
import java.util.regex.PatternSyntaxException;
import java.util.stream.Collectors;
@@ -7176,6 +7177,23 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
return currUDF;
}
+ private Path getDestinationFilePath(final String destinationFile, boolean isMmTable)
+ throws SemanticException {
+ if (this.isResultsCacheEnabled() && this.queryTypeCanUseCache()) {
+ assert (!isMmTable);
+ QueryResultsCache instance = QueryResultsCache.getInstance();
+ // QueryResultsCache should have been initialized by now
+ if (instance != null) {
+ Path resultCacheTopDir = instance.getCacheDirPath();
+ String dirName = UUID.randomUUID().toString();
+ Path resultDir = new Path(resultCacheTopDir, dirName);
+ this.ctx.setFsResultCacheDirs(resultDir);
+ return resultDir;
+ }
+ }
+ return new Path(destinationFile);
+ }
+
@SuppressWarnings("nls")
protected Operator genFileSinkPlan(String dest, QB qb, Operator input)
throws SemanticException {
@@ -7440,7 +7458,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
isLocal = true;
// fall through
case QBMetaData.DEST_DFS_FILE: {
- destinationPath = new Path(qbm.getDestFileForAlias(dest));
+ destinationPath = getDestinationFilePath(qbm.getDestFileForAlias(dest), isMmTable);
// CTAS case: the file output format and serde are defined by the create
// table command rather than taking the default value
@@ -15062,6 +15080,9 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
* Some initial checks for a query to see if we can look this query up in the results cache.
*/
private boolean queryTypeCanUseCache() {
+ if(this.qb == null || this.qb.getParseInfo() == null) {
+ return false;
+ }
if (this instanceof ColumnStatsSemanticAnalyzer) {
// Column stats generates "select compute_stats() .." queries.
// Disable caching for these.
@@ -15072,13 +15093,13 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
return false;
}
- if (qb.getParseInfo().isAnalyzeCommand()) {
- return false;
- }
+ if (qb.getParseInfo().isAnalyzeCommand()) {
+ return false;
+ }
- if (qb.getParseInfo().hasInsertTables()) {
- return false;
- }
+ if (qb.getParseInfo().hasInsertTables()) {
+ return false;
+ }
// HIVE-19096 - disable for explain analyze
if (ctx.getExplainAnalyze() != null) {
diff --git a/ql/src/test/results/clientpositive/llap/results_cache_diff_fs.q.out b/ql/src/test/results/clientpositive/llap/results_cache_diff_fs.q.out
index ed6af87..54a3451 100644
--- a/ql/src/test/results/clientpositive/llap/results_cache_diff_fs.q.out
+++ b/ql/src/test/results/clientpositive/llap/results_cache_diff_fs.q.out
@@ -3,12 +3,12 @@ PREHOOK: query: explain
select count(*) from src a join src b on (a.key = b.key)
PREHOOK: type: QUERY
PREHOOK: Input: default@src
-PREHOOK: Output: hdfs://### HDFS PATH ###
+#### A masked pattern was here ####
POSTHOOK: query: explain
select count(*) from src a join src b on (a.key = b.key)
POSTHOOK: type: QUERY
POSTHOOK: Input: default@src
-POSTHOOK: Output: hdfs://### HDFS PATH ###
+#### A masked pattern was here ####
STAGE DEPENDENCIES:
Stage-1 is a root stage
Stage-0 depends on stages: Stage-1
@@ -107,11 +107,11 @@ STAGE PLANS:
PREHOOK: query: select count(*) from src a join src b on (a.key = b.key)
PREHOOK: type: QUERY
PREHOOK: Input: default@src
-PREHOOK: Output: hdfs://### HDFS PATH ###
+#### A masked pattern was here ####
POSTHOOK: query: select count(*) from src a join src b on (a.key = b.key)
POSTHOOK: type: QUERY
POSTHOOK: Input: default@src
-POSTHOOK: Output: hdfs://### HDFS PATH ###
+#### A masked pattern was here ####
1028
test.comment="Cache should be used for this query"
PREHOOK: query: explain