You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by jd...@apache.org on 2018/03/15 22:57:54 UTC
hive git commit: HIVE-18846: Query results cache: Allow queries to
refer to the pending results of a query that has not finished yet (Jason Dere,
reviewed by GopalV)
Repository: hive
Updated Branches:
refs/heads/master 165e35c04 -> 50e0077a8
HIVE-18846: Query results cache: Allow queries to refer to the pending results of a query that has not finished yet (Jason Dere, reviewed by GopalV)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/50e0077a
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/50e0077a
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/50e0077a
Branch: refs/heads/master
Commit: 50e0077a8c1fc3914b22c496b73833e004d14658
Parents: 165e35c
Author: Jason Dere <jd...@hortonworks.com>
Authored: Thu Mar 15 15:57:29 2018 -0700
Committer: Jason Dere <jd...@hortonworks.com>
Committed: Thu Mar 15 15:57:29 2018 -0700
----------------------------------------------------------------------
.../org/apache/hadoop/hive/conf/HiveConf.java | 4 +
.../test/resources/testconfiguration.properties | 1 +
.../java/org/apache/hadoop/hive/ql/Driver.java | 62 +++-
.../ql/cache/results/QueryResultsCache.java | 324 +++++++++++--------
.../hadoop/hive/ql/parse/SemanticAnalyzer.java | 36 ++-
.../clientpositive/results_cache_empty_result.q | 13 +
.../llap/results_cache_empty_result.q.out | 99 ++++++
.../results_cache_empty_result.q.out | 91 ++++++
8 files changed, 479 insertions(+), 151 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/50e0077a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 907d064..8bbf1be 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -3713,6 +3713,10 @@ public class HiveConf extends Configuration {
"If the query results cache is enabled. This will keep results of previously executed queries " +
"to be reused if the same query is executed again."),
+ HIVE_QUERY_RESULTS_CACHE_WAIT_FOR_PENDING_RESULTS("hive.query.results.cache.wait.for.pending.results", true,
+ "Should a query wait for the pending results of an already running query, " +
+ "in order to use the cached result when it becomes ready"),
+
HIVE_QUERY_RESULTS_CACHE_DIRECTORY("hive.query.results.cache.directory",
"/tmp/hive/_resultscache_",
"Location of the query results cache directory. Temporary results from queries " +
http://git-wip-us.apache.org/repos/asf/hive/blob/50e0077a/itests/src/test/resources/testconfiguration.properties
----------------------------------------------------------------------
diff --git a/itests/src/test/resources/testconfiguration.properties b/itests/src/test/resources/testconfiguration.properties
index 99d3817..f513fe5 100644
--- a/itests/src/test/resources/testconfiguration.properties
+++ b/itests/src/test/resources/testconfiguration.properties
@@ -255,6 +255,7 @@ minillaplocal.shared.query.files=alter_merge_2_orc.q,\
ptf_matchpath.q,\
ptf_streaming.q,\
results_cache_1.q,\
+ results_cache_empty_result.q,\
sample1.q,\
selectDistinctStar.q,\
select_dummy_source.q,\
http://git-wip-us.apache.org/repos/asf/hive/blob/50e0077a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
index d789ed0..eefcaea 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
@@ -1860,7 +1860,23 @@ public class Driver implements IDriver {
cacheUsage = new CacheUsage(CacheUsage.CacheStatus.QUERY_USING_CACHE, cacheEntry);
}
- private void checkCacheUsage() throws Exception {
+ private void preExecutionCacheActions() throws Exception {
+ if (cacheUsage != null) {
+ if (cacheUsage.getStatus() == CacheUsage.CacheStatus.CAN_CACHE_QUERY_RESULTS &&
+ plan.getFetchTask() != null) {
+ // The results of this query execution might be cacheable.
+ // Add a placeholder entry in the cache so other queries know this result is pending.
+ CacheEntry pendingCacheEntry =
+ QueryResultsCache.getInstance().addToCache(cacheUsage.getQueryInfo());
+ if (pendingCacheEntry != null) {
+ // Update cacheUsage to reference the pending entry.
+ this.cacheUsage.setCacheEntry(pendingCacheEntry);
+ }
+ }
+ }
+ }
+
+ private void postExecutionCacheActions() throws Exception {
if (cacheUsage != null) {
if (cacheUsage.getStatus() == CacheUsage.CacheStatus.QUERY_USING_CACHE) {
// Using a previously cached result.
@@ -1868,22 +1884,22 @@ public class Driver implements IDriver {
// Reader count already incremented during cache lookup.
// Save to usedCacheEntry to ensure reader is released after query.
- usedCacheEntry = cacheEntry;
+ this.usedCacheEntry = cacheEntry;
} else if (cacheUsage.getStatus() == CacheUsage.CacheStatus.CAN_CACHE_QUERY_RESULTS &&
+ cacheUsage.getCacheEntry() != null &&
plan.getFetchTask() != null) {
- // The query could not be resolved using the cache, but the query results
- // can be added to the cache for future queries to use.
+ // Save results to the cache for future queries to use.
PerfLogger perfLogger = SessionState.getPerfLogger();
perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.SAVE_TO_RESULTS_CACHE);
- CacheEntry savedCacheEntry =
- QueryResultsCache.getInstance().addToCache(
- cacheUsage.getQueryInfo(),
- plan.getFetchTask().getWork());
- if (savedCacheEntry != null) {
- useFetchFromCache(savedCacheEntry);
- // addToCache() already increments the reader count. Set usedCacheEntry so it gets released.
- usedCacheEntry = savedCacheEntry;
+ boolean savedToCache = QueryResultsCache.getInstance().setEntryValid(
+ cacheUsage.getCacheEntry(),
+ plan.getFetchTask().getWork());
+ LOG.info("savedToCache: {}", savedToCache);
+ if (savedToCache) {
+ useFetchFromCache(cacheUsage.getCacheEntry());
+ // setEntryValid() already increments the reader count. Set usedCacheEntry so it gets released.
+ this.usedCacheEntry = cacheUsage.getCacheEntry();
}
perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.SAVE_TO_RESULTS_CACHE);
@@ -2000,6 +2016,8 @@ public class Driver implements IDriver {
}
}
+ preExecutionCacheActions();
+
perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.RUN_TASKS);
// Loop while you either have tasks running, or tasks queued up
while (driverCxt.isRunning()) {
@@ -2099,7 +2117,7 @@ public class Driver implements IDriver {
}
perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.RUN_TASKS);
- checkCacheUsage();
+ postExecutionCacheActions();
// in case we decided to run everything in local mode, restore the
// the jobtracker setting to its initial value
@@ -2499,13 +2517,31 @@ public class Driver implements IDriver {
}
}
+ private boolean hasBadCacheAttempt() {
+ // Check if the query results were cacheable, and created a pending cache entry.
+ // If we successfully saved the results, the usage would have changed to QUERY_USING_CACHE.
+ return (cacheUsage != null &&
+ cacheUsage.getStatus() == CacheUsage.CacheStatus.CAN_CACHE_QUERY_RESULTS &&
+ cacheUsage.getCacheEntry() != null);
+ }
+
private void releaseCachedResult() {
// Assumes the reader count has been incremented automatically by the results cache by either
// lookup or creating the cache entry.
if (usedCacheEntry != null) {
usedCacheEntry.releaseReader();
usedCacheEntry = null;
+ } else if (hasBadCacheAttempt()) {
+ // This query created a pending cache entry but it was never saved with real results; clean up.
+ // This step is required, as there may be queries waiting on this pending cache entry.
+ // Removing/invalidating this entry will notify the waiters that this entry cannot be used.
+ try {
+ QueryResultsCache.getInstance().removeEntry(cacheUsage.getCacheEntry());
+ } catch (Exception err) {
+ LOG.error("Error removing failed cache entry " + cacheUsage.getCacheEntry(), err);
+ }
}
+ cacheUsage = null;
}
// Close and release resources within a running query process. Since it runs under
http://git-wip-us.apache.org/repos/asf/hive/blob/50e0077a/ql/src/java/org/apache/hadoop/hive/ql/cache/results/QueryResultsCache.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/cache/results/QueryResultsCache.java b/ql/src/java/org/apache/hadoop/hive/ql/cache/results/QueryResultsCache.java
index 9cdc5fb..4fa1044 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/cache/results/QueryResultsCache.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/cache/results/QueryResultsCache.java
@@ -155,6 +155,10 @@ public final class QueryResultsCache {
}
}
+ public enum CacheEntryStatus {
+ VALID, INVALID, PENDING
+ }
+
public static class CacheEntry {
private QueryInfo queryInfo;
private FetchWork fetchWork;
@@ -163,13 +167,9 @@ public final class QueryResultsCache {
// Cache administration
private long createTime;
private long size;
- private AtomicBoolean valid = new AtomicBoolean(false);
private AtomicInteger readers = new AtomicInteger(0);
private ScheduledFuture<?> invalidationFuture = null;
-
- public boolean isValid() {
- return valid.get();
- }
+ private volatile CacheEntryStatus status = CacheEntryStatus.PENDING;
public void releaseReader() {
int readerCount = 0;
@@ -177,14 +177,13 @@ public final class QueryResultsCache {
readerCount = readers.decrementAndGet();
}
LOG.debug("releaseReader: entry: {}, readerCount: {}", this, readerCount);
- Preconditions.checkState(readerCount >= 0);
cleanupIfNeeded();
}
public String toString() {
return "CacheEntry query: [" + getQueryInfo().getLookupInfo().getQueryText()
- + "], location: " + cachedResultsPath
+ + "], status: " + status + ", location: " + cachedResultsPath
+ ", size: " + size;
}
@@ -192,13 +191,12 @@ public final class QueryResultsCache {
boolean added = false;
int readerCount = 0;
synchronized (this) {
- if (valid.get()) {
+ if (status == CacheEntryStatus.VALID) {
readerCount = readers.incrementAndGet();
added = true;
}
}
- Preconditions.checkState(readerCount > 0);
- LOG.debug("addReader: entry: {}, readerCount: {}", this, readerCount);
+ LOG.debug("addReader: entry: {}, readerCount: {}, added: {}", this, readerCount, added);
return added;
}
@@ -207,32 +205,36 @@ public final class QueryResultsCache {
}
private void invalidate() {
- boolean wasValid = setValidity(false);
-
- if (wasValid) {
- LOG.info("Invalidated cache entry: {}", this);
-
+ LOG.info("Invalidating cache entry: {}", this);
+ CacheEntryStatus prevStatus = setStatus(CacheEntryStatus.INVALID);
+ if (prevStatus == CacheEntryStatus.VALID) {
if (invalidationFuture != null) {
// The cache entry has just been invalidated, no need for the scheduled invalidation.
invalidationFuture.cancel(false);
}
cleanupIfNeeded();
+ } else if (prevStatus == CacheEntryStatus.PENDING) {
+ // Need to notify any queries waiting on the change from pending status.
+ synchronized (this) {
+ this.notifyAll();
+ }
}
}
- /**
- * Set the validity, returning the previous validity value.
- * @param valid
- * @return
- */
- private boolean setValidity(boolean valid) {
- synchronized(this) {
- return this.valid.getAndSet(valid);
+ public CacheEntryStatus getStatus() {
+ return status;
+ }
+
+ private CacheEntryStatus setStatus(CacheEntryStatus newStatus) {
+ synchronized (this) {
+ CacheEntryStatus oldStatus = status;
+ status = newStatus;
+ return oldStatus;
}
}
private void cleanupIfNeeded() {
- if (!isValid() && readers.get() <= 0) {
+ if (status == CacheEntryStatus.INVALID && readers.get() <= 0) {
QueryResultsCache.cleanupEntry(this);
}
}
@@ -255,6 +257,37 @@ public final class QueryResultsCache {
public Path getCachedResultsPath() {
return cachedResultsPath;
}
+
+ /**
+ * Wait for the cache entry to go from PENDING to VALID status.
+ * @return true if the cache entry successfully changed to VALID status,
+ * false if the status changes from PENDING to INVALID
+ */
+ public boolean waitForValidStatus() {
+ LOG.info("Waiting on pending cacheEntry");
+ long timeout = 1000;
+
+ while (true) {
+ try {
+ switch (status) {
+ case VALID:
+ return true;
+ case INVALID:
+ return false;
+ case PENDING:
+ // Status has not changed, continue waiting.
+ break;
+ }
+
+ synchronized (this) {
+ this.wait(timeout);
+ }
+ } catch (InterruptedException err) {
+ Thread.currentThread().interrupt();
+ return false;
+ }
+ }
+ }
}
// Allow lookup by query string
@@ -267,6 +300,7 @@ public final class QueryResultsCache {
private final HiveConf conf;
private Path cacheDirPath;
+ private Path zeroRowsPath;
private long cacheSize = 0;
private long maxCacheSize;
private long maxEntrySize;
@@ -287,6 +321,9 @@ public final class QueryResultsCache {
FsPermission fsPermission = new FsPermission("700");
fs.mkdirs(cacheDirPath, fsPermission);
+ // Create non-existent path for 0-row results
+ zeroRowsPath = new Path(cacheDirPath, "dummy_zero_rows");
+
// Results cache directory should be cleaned up at process termination.
fs.deleteOnExit(cacheDirPath);
@@ -327,7 +364,7 @@ public final class QueryResultsCache {
* using CacheEntry.releaseReader().
* @return The cached result if there is a match in the cache, or null if no match is found.
*/
- public CacheEntry lookup(LookupInfo request, boolean addReader) {
+ public CacheEntry lookup(LookupInfo request) {
CacheEntry result = null;
LOG.debug("QueryResultsCache lookup for query: {}", request.queryText);
@@ -337,27 +374,26 @@ public final class QueryResultsCache {
readLock.lock();
Set<CacheEntry> candidates = queryMap.get(request.queryText);
if (candidates != null) {
+ CacheEntry pendingResult = null;
for (CacheEntry candidate : candidates) {
if (entryMatches(request, candidate)) {
- result = candidate;
- break;
+ CacheEntryStatus entryStatus = candidate.status;
+ if (entryStatus == CacheEntryStatus.VALID) {
+ result = candidate;
+ break;
+ } else if (entryStatus == CacheEntryStatus.PENDING && pendingResult == null) {
+ pendingResult = candidate;
+ }
}
}
+ // Try to find valid entry, but settle for pending entry if that is all we have.
+ if (result == null && pendingResult != null) {
+ result = pendingResult;
+ }
+
if (result != null) {
lru.get(result); // Update LRU
-
- if (!result.isValid()) {
- // Entry is in the cache, but not valid.
- // This can happen when the entry is first added, before the data has been moved
- // to the results cache directory. We cannot use this entry yet.
- result = null;
- } else {
- if (addReader) {
- // Caller will need to be responsible for releasing the reader count.
- result.addReader();
- }
- }
}
}
} finally {
@@ -370,111 +406,124 @@ public final class QueryResultsCache {
}
/**
- * Add an entry to the query results cache.
+ * Add an entry to the cache.
+ * The new entry will be in PENDING state and not usable until setEntryValid() is called on the entry.
+ * @param queryInfo
+ * @return
+ */
+ public CacheEntry addToCache(QueryInfo queryInfo) {
+ // Create placeholder entry with PENDING state.
+ String queryText = queryInfo.getLookupInfo().getQueryText();
+ CacheEntry addedEntry = new CacheEntry();
+ addedEntry.queryInfo = queryInfo;
+
+ Lock writeLock = rwLock.writeLock();
+ try {
+ writeLock.lock();
+
+ LOG.info("Adding placeholder cache entry for query '{}'", queryText);
+
+ // Add the entry to the cache structures while under write lock.
+ Set<CacheEntry> entriesForQuery = queryMap.get(queryText);
+ if (entriesForQuery == null) {
+ entriesForQuery = new HashSet<CacheEntry>();
+ queryMap.put(queryText, entriesForQuery);
+ }
+ entriesForQuery.add(addedEntry);
+ lru.put(addedEntry, addedEntry);
+ } finally {
+ writeLock.unlock();
+ }
+
+ return addedEntry;
+ }
+
+ /**
+ * Updates a pending cache entry with a FetchWork result from a finished query.
+ * If successful the cache entry will be set to valid status and be usable for cached queries.
* Important: Adding the entry to the cache will increment the reader count for the cache entry.
* CacheEntry.releaseReader() should be called when the caller is done with the cache entry.
- *
- * @param queryInfo
+ * @param cacheEntry
* @param fetchWork
- * @return The entry if added to the cache. null if the entry is not added.
+ * @return
*/
- public CacheEntry addToCache(QueryInfo queryInfo, FetchWork fetchWork) {
-
- CacheEntry addedEntry = null;
+ public boolean setEntryValid(CacheEntry cacheEntry, FetchWork fetchWork) {
+ String queryText = cacheEntry.getQueryText();
boolean dataDirMoved = false;
Path queryResultsPath = null;
Path cachedResultsPath = null;
- String queryText = queryInfo.getLookupInfo().getQueryText();
- // Should we remove other candidate entries if they are equivalent to these query results?
try {
- CacheEntry potentialEntry = new CacheEntry();
- potentialEntry.queryInfo = queryInfo;
+ boolean requiresMove = true;
queryResultsPath = fetchWork.getTblDir();
FileSystem resultsFs = queryResultsPath.getFileSystem(conf);
- ContentSummary cs = resultsFs.getContentSummary(queryResultsPath);
- potentialEntry.size = cs.getLength();
-
- Lock writeLock = rwLock.writeLock();
- try {
- writeLock.lock();
-
- if (!shouldEntryBeAdded(potentialEntry)) {
- return null;
- }
- if (!clearSpaceForCacheEntry(potentialEntry)) {
- return null;
- }
-
- LOG.info("Adding cache entry for query '{}'", queryText);
-
- // Add the entry to the cache structures while under write lock. Do not mark the entry
- // as valid yet, since the query results have not yet been moved to the cache directory.
- // Do the data move after unlocking since it might take time.
- // Mark the entry as valid once the data has been moved to the cache directory.
- Set<CacheEntry> entriesForQuery = queryMap.get(queryText);
- if (entriesForQuery == null) {
- entriesForQuery = new HashSet<CacheEntry>();
- queryMap.put(queryText, entriesForQuery);
- }
- entriesForQuery.add(potentialEntry);
- lru.put(potentialEntry, potentialEntry);
- cacheSize += potentialEntry.size;
- addedEntry = potentialEntry;
+ long resultSize;
+ if (resultsFs.exists(queryResultsPath)) {
+ ContentSummary cs = resultsFs.getContentSummary(queryResultsPath);
+ resultSize = cs.getLength();
+ } else {
+ // No actual result directory, no need to move anything.
+ cachedResultsPath = zeroRowsPath;
+ resultSize = 0;
+ requiresMove = false;
+ }
- } finally {
- writeLock.unlock();
+ if (!shouldEntryBeAdded(cacheEntry, resultSize)) {
+ return false;
}
- // Move the query results to the query cache directory.
- cachedResultsPath = moveResultsToCacheDirectory(queryResultsPath);
- dataDirMoved = true;
+ if (requiresMove) {
+ // Move the query results to the query cache directory.
+ cachedResultsPath = moveResultsToCacheDirectory(queryResultsPath);
+ dataDirMoved = true;
+ }
LOG.info("Moved query results from {} to {} (size {}) for query '{}'",
- queryResultsPath, cachedResultsPath, cs.getLength(), queryText);
+ queryResultsPath, cachedResultsPath, resultSize, queryText);
// Create a new FetchWork to reference the new cache location.
FetchWork fetchWorkForCache =
new FetchWork(cachedResultsPath, fetchWork.getTblDesc(), fetchWork.getLimit());
fetchWorkForCache.setCachedResult(true);
- addedEntry.fetchWork = fetchWorkForCache;
- addedEntry.cachedResultsPath = cachedResultsPath;
- addedEntry.createTime = System.currentTimeMillis();
- addedEntry.setValidity(true);
+ cacheEntry.fetchWork = fetchWorkForCache;
+ cacheEntry.cachedResultsPath = cachedResultsPath;
+ cacheEntry.size = resultSize;
+ this.cacheSize += resultSize;
+ cacheEntry.createTime = System.currentTimeMillis();
+ cacheEntry.setStatus(CacheEntryStatus.VALID);
// Mark this entry as being in use. Caller will need to release later.
- addedEntry.addReader();
+ cacheEntry.addReader();
- scheduleEntryInvalidation(addedEntry);
+ scheduleEntryInvalidation(cacheEntry);
+
+ // Notify any queries waiting on this cacheEntry to become valid.
+ synchronized (cacheEntry) {
+ cacheEntry.notifyAll();
+ }
} catch (Exception err) {
LOG.error("Failed to create cache entry for query results for query: " + queryText, err);
- if (addedEntry != null) {
- // If the entry was already added to the cache when we hit error, clean up properly.
-
- if (dataDirMoved) {
- // If data was moved from original location to cache directory, we need to move it back!
- LOG.info("Restoring query results from {} back to {}", cachedResultsPath, queryResultsPath);
- try {
- FileSystem fs = cachedResultsPath.getFileSystem(conf);
- fs.rename(cachedResultsPath, queryResultsPath);
- addedEntry.cachedResultsPath = null;
- } catch (Exception err2) {
- String errMsg = "Failed cleanup during failed attempt to cache query: " + queryText;
- LOG.error(errMsg);
- throw new RuntimeException(errMsg);
- }
- }
-
- addedEntry.invalidate();
- if (addedEntry.numReaders() > 0) {
- addedEntry.releaseReader();
+ if (dataDirMoved) {
+ // If data was moved from original location to cache directory, we need to move it back!
+ LOG.info("Restoring query results from {} back to {}", cachedResultsPath, queryResultsPath);
+ try {
+ FileSystem fs = cachedResultsPath.getFileSystem(conf);
+ fs.rename(cachedResultsPath, queryResultsPath);
+ cacheEntry.size = 0;
+ cacheEntry.cachedResultsPath = null;
+ } catch (Exception err2) {
+ String errMsg = "Failed cleanup during failed attempt to cache query: " + queryText;
+ LOG.error(errMsg);
+ throw new RuntimeException(errMsg);
}
}
- return null;
+ // Invalidate the entry. Rely on query cleanup to remove from lookup.
+ cacheEntry.invalidate();
+ return false;
}
- return addedEntry;
+ return true;
}
public void clear() {
@@ -527,7 +576,7 @@ public final class QueryResultsCache {
return true;
}
- private void removeEntry(CacheEntry entry) {
+ public void removeEntry(CacheEntry entry) {
entry.invalidate();
removeFromLookup(entry);
lru.remove(entry);
@@ -538,9 +587,14 @@ public final class QueryResultsCache {
private void removeFromLookup(CacheEntry entry) {
String queryString = entry.getQueryText();
Set<CacheEntry> entries = queryMap.get(queryString);
- Preconditions.checkState(entries != null);
+ if (entries == null) {
+ LOG.warn("ResultsCache: no entry for {}", queryString);
+ return;
+ }
boolean deleted = entries.remove(entry);
- Preconditions.checkState(deleted);
+ if (!deleted) {
+ LOG.warn("ResultsCache: Attempted to remove entry but it was not in the cache: {}", entry);
+ }
if (entries.isEmpty()) {
queryMap.remove(queryString);
}
@@ -556,10 +610,14 @@ public final class QueryResultsCache {
/**
* Determines if the cache entry should be added to the results cache.
*/
- private boolean shouldEntryBeAdded(CacheEntry entry) {
+ private boolean shouldEntryBeAdded(CacheEntry entry, long size) {
// Assumes the cache lock has already been taken.
- if (maxEntrySize >= 0 && entry.size > maxEntrySize) {
- LOG.debug("Cache entry size {} larger than max entry size ({})", entry.size, maxEntrySize);
+ if (maxEntrySize >= 0 && size > maxEntrySize) {
+ LOG.debug("Cache entry size {} larger than max entry size ({})", size, maxEntrySize);
+ return false;
+ }
+
+ if (!clearSpaceForCacheEntry(entry, size)) {
return false;
}
@@ -574,41 +632,41 @@ public final class QueryResultsCache {
return cachedResultsPath;
}
- private boolean hasSpaceForCacheEntry(CacheEntry entry) {
+ private boolean hasSpaceForCacheEntry(CacheEntry entry, long size) {
if (maxCacheSize >= 0) {
- return (cacheSize + entry.size) <= maxCacheSize;
+ return (cacheSize + size) <= maxCacheSize;
}
// Negative max cache size means unbounded.
return true;
}
- private boolean clearSpaceForCacheEntry(CacheEntry entry) {
- if (hasSpaceForCacheEntry(entry)) {
+ private boolean clearSpaceForCacheEntry(CacheEntry entry, long size) {
+ if (hasSpaceForCacheEntry(entry, size)) {
return true;
}
LOG.info("Clearing space for cache entry for query: [{}] with size {}",
- entry.getQueryText(), entry.size);
+ entry.getQueryText(), size);
// Entries should be in LRU order in the keyset iterator.
CacheEntry[] entries = lru.keySet().toArray(EMPTY_CACHEENTRY_ARRAY);
for (CacheEntry removalCandidate : entries) {
- if (!removalCandidate.isValid()) {
- // Likely an entry which is still getting its results moved to the cache directory.
+ if (removalCandidate.getStatus() != CacheEntryStatus.VALID) {
+ // Only entries marked as valid should have results that can be removed.
continue;
}
// Only delete the entry if it has no readers.
if (!(removalCandidate.numReaders() > 0)) {
LOG.info("Removing entry: {}", removalCandidate);
removeEntry(removalCandidate);
- if (hasSpaceForCacheEntry(entry)) {
+ if (hasSpaceForCacheEntry(entry, size)) {
return true;
}
}
}
LOG.info("Could not free enough space for cache entry for query: [{}] withe size {}",
- entry.getQueryText(), entry.size);
+ entry.getQueryText(), size);
return false;
}
@@ -636,11 +694,11 @@ public final class QueryResultsCache {
private void scheduleEntryInvalidation(final CacheEntry entry) {
if (maxEntryLifetime >= 0) {
- // Schedule task to invalidate cache entry.
+ // Schedule task to invalidate cache entry and remove from lookup.
ScheduledFuture<?> future = invalidationExecutor.schedule(new Runnable() {
@Override
public void run() {
- entry.invalidate();
+ removeEntry(entry);
}
}, maxEntryLifetime, TimeUnit.MILLISECONDS);
entry.invalidationFuture = future;
@@ -648,9 +706,11 @@ public final class QueryResultsCache {
}
private static void cleanupEntry(final CacheEntry entry) {
- Preconditions.checkState(!entry.isValid());
+ Preconditions.checkState(entry.getStatus() == CacheEntryStatus.INVALID);
+ final HiveConf conf = getInstance().conf;
- if (entry.cachedResultsPath != null) {
+ if (entry.cachedResultsPath != null &&
+ !getInstance().zeroRowsPath.equals(entry.cachedResultsPath)) {
deletionExecutor.execute(new Runnable() {
@Override
public void run() {
http://git-wip-us.apache.org/repos/asf/hive/blob/50e0077a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
index c65ad45..2342fff 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
@@ -14309,14 +14309,38 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
}
// Don't increment the reader count for explain queries.
boolean isExplainQuery = (ctx.getExplainConfig() != null);
- QueryResultsCache.CacheEntry cacheEntry =
- QueryResultsCache.getInstance().lookup(lookupInfo, !isExplainQuery);
+ QueryResultsCache.CacheEntry cacheEntry = QueryResultsCache.getInstance().lookup(lookupInfo);
if (cacheEntry != null) {
- // Use the cache rather than full query execution.
- useCachedResult(cacheEntry);
+ // Potentially wait on the cache entry if entry is in PENDING status
+ // Blocking here can potentially be dangerous - for example if the global compile lock
+ // is used this will block all subsequent queries that try to acquire the compile lock,
+ // so it should not be done unless parallel compilation is enabled.
+ // We might not want to block for explain queries as well.
+ if (cacheEntry.getStatus() == QueryResultsCache.CacheEntryStatus.PENDING) {
+ if (!isExplainQuery &&
+ conf.getBoolVar(HiveConf.ConfVars.HIVE_QUERY_RESULTS_CACHE_WAIT_FOR_PENDING_RESULTS) &&
+ conf.getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_PARALLEL_COMPILATION)) {
+ if (!cacheEntry.waitForValidStatus()) {
+ LOG.info("Waiting on pending cacheEntry, but it failed to become valid");
+ return false;
+ }
+ } else {
+ LOG.info("Not waiting for pending cacheEntry");
+ return false;
+ }
+ }
- // At this point the caller should return from semantic analysis.
- return true;
+ if (cacheEntry.getStatus() == QueryResultsCache.CacheEntryStatus.VALID) {
+ if (!isExplainQuery) {
+ if (!cacheEntry.addReader()) {
+ return false;
+ }
+ }
+ // Use the cache rather than full query execution.
+ // At this point the caller should return from semantic analysis.
+ useCachedResult(cacheEntry);
+ return true;
+ }
}
return false;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/50e0077a/ql/src/test/queries/clientpositive/results_cache_empty_result.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/results_cache_empty_result.q b/ql/src/test/queries/clientpositive/results_cache_empty_result.q
new file mode 100644
index 0000000..6213671
--- /dev/null
+++ b/ql/src/test/queries/clientpositive/results_cache_empty_result.q
@@ -0,0 +1,13 @@
+
+set hive.query.results.cache.enabled=true;
+
+explain
+select count(*), key from src a where key < 0 group by key;
+select count(*), key from src a where key < 0 group by key;
+
+set test.comment="Cache should be used for this query";
+set test.comment;
+explain
+select count(*), key from src a where key < 0 group by key;
+select count(*), key from src a where key < 0 group by key;
+
http://git-wip-us.apache.org/repos/asf/hive/blob/50e0077a/ql/src/test/results/clientpositive/llap/results_cache_empty_result.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/llap/results_cache_empty_result.q.out b/ql/src/test/results/clientpositive/llap/results_cache_empty_result.q.out
new file mode 100644
index 0000000..b63d079
--- /dev/null
+++ b/ql/src/test/results/clientpositive/llap/results_cache_empty_result.q.out
@@ -0,0 +1,99 @@
+PREHOOK: query: explain
+select count(*), key from src a where key < 0 group by key
+PREHOOK: type: QUERY
+POSTHOOK: query: explain
+select count(*), key from src a where key < 0 group by key
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+ Stage-1 is a root stage
+ Stage-0 depends on stages: Stage-1
+
+STAGE PLANS:
+ Stage: Stage-1
+ Tez
+#### A masked pattern was here ####
+ Edges:
+ Reducer 2 <- Map 1 (SIMPLE_EDGE)
+#### A masked pattern was here ####
+ Vertices:
+ Map 1
+ Map Operator Tree:
+ TableScan
+ alias: a
+ Statistics: Num rows: 500 Data size: 43500 Basic stats: COMPLETE Column stats: COMPLETE
+ Filter Operator
+ predicate: (UDFToDouble(key) < 0.0D) (type: boolean)
+ Statistics: Num rows: 166 Data size: 14442 Basic stats: COMPLETE Column stats: COMPLETE
+ Group By Operator
+ aggregations: count()
+ keys: key (type: string)
+ mode: hash
+ outputColumnNames: _col0, _col1
+ Statistics: Num rows: 83 Data size: 7885 Basic stats: COMPLETE Column stats: COMPLETE
+ Reduce Output Operator
+ key expressions: _col0 (type: string)
+ sort order: +
+ Map-reduce partition columns: _col0 (type: string)
+ Statistics: Num rows: 83 Data size: 7885 Basic stats: COMPLETE Column stats: COMPLETE
+ value expressions: _col1 (type: bigint)
+ Execution mode: llap
+ LLAP IO: no inputs
+ Reducer 2
+ Execution mode: llap
+ Reduce Operator Tree:
+ Group By Operator
+ aggregations: count(VALUE._col0)
+ keys: KEY._col0 (type: string)
+ mode: mergepartial
+ outputColumnNames: _col0, _col1
+ Statistics: Num rows: 83 Data size: 7885 Basic stats: COMPLETE Column stats: COMPLETE
+ Select Operator
+ expressions: _col1 (type: bigint), _col0 (type: string)
+ outputColumnNames: _col0, _col1
+ Statistics: Num rows: 83 Data size: 7885 Basic stats: COMPLETE Column stats: COMPLETE
+ File Output Operator
+ compressed: false
+ Statistics: Num rows: 83 Data size: 7885 Basic stats: COMPLETE Column stats: COMPLETE
+ table:
+ input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+ serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+
+ Stage: Stage-0
+ Fetch Operator
+ limit: -1
+ Processor Tree:
+ ListSink
+
+PREHOOK: query: select count(*), key from src a where key < 0 group by key
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+#### A masked pattern was here ####
+POSTHOOK: query: select count(*), key from src a where key < 0 group by key
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+#### A masked pattern was here ####
+test.comment="Cache should be used for this query"
+PREHOOK: query: explain
+select count(*), key from src a where key < 0 group by key
+PREHOOK: type: QUERY
+POSTHOOK: query: explain
+select count(*), key from src a where key < 0 group by key
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+ Stage-0 is a root stage
+
+STAGE PLANS:
+ Stage: Stage-0
+ Fetch Operator
+ limit: -1
+ Processor Tree:
+ ListSink
+ Cached Query Result: true
+
+PREHOOK: query: select count(*), key from src a where key < 0 group by key
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+POSTHOOK: query: select count(*), key from src a where key < 0 group by key
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
http://git-wip-us.apache.org/repos/asf/hive/blob/50e0077a/ql/src/test/results/clientpositive/results_cache_empty_result.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/results_cache_empty_result.q.out b/ql/src/test/results/clientpositive/results_cache_empty_result.q.out
new file mode 100644
index 0000000..000e5f0
--- /dev/null
+++ b/ql/src/test/results/clientpositive/results_cache_empty_result.q.out
@@ -0,0 +1,91 @@
+PREHOOK: query: explain
+select count(*), key from src a where key < 0 group by key
+PREHOOK: type: QUERY
+POSTHOOK: query: explain
+select count(*), key from src a where key < 0 group by key
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+ Stage-1 is a root stage
+ Stage-0 depends on stages: Stage-1
+
+STAGE PLANS:
+ Stage: Stage-1
+ Map Reduce
+ Map Operator Tree:
+ TableScan
+ alias: a
+ Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+ Filter Operator
+ predicate: (UDFToDouble(key) < 0.0D) (type: boolean)
+ Statistics: Num rows: 166 Data size: 1763 Basic stats: COMPLETE Column stats: NONE
+ Group By Operator
+ aggregations: count()
+ keys: key (type: string)
+ mode: hash
+ outputColumnNames: _col0, _col1
+ Statistics: Num rows: 166 Data size: 1763 Basic stats: COMPLETE Column stats: NONE
+ Reduce Output Operator
+ key expressions: _col0 (type: string)
+ sort order: +
+ Map-reduce partition columns: _col0 (type: string)
+ Statistics: Num rows: 166 Data size: 1763 Basic stats: COMPLETE Column stats: NONE
+ value expressions: _col1 (type: bigint)
+ Reduce Operator Tree:
+ Group By Operator
+ aggregations: count(VALUE._col0)
+ keys: KEY._col0 (type: string)
+ mode: mergepartial
+ outputColumnNames: _col0, _col1
+ Statistics: Num rows: 83 Data size: 881 Basic stats: COMPLETE Column stats: NONE
+ Select Operator
+ expressions: _col1 (type: bigint), _col0 (type: string)
+ outputColumnNames: _col0, _col1
+ Statistics: Num rows: 83 Data size: 881 Basic stats: COMPLETE Column stats: NONE
+ File Output Operator
+ compressed: false
+ Statistics: Num rows: 83 Data size: 881 Basic stats: COMPLETE Column stats: NONE
+ table:
+ input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+ serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+
+ Stage: Stage-0
+ Fetch Operator
+ limit: -1
+ Processor Tree:
+ ListSink
+
+PREHOOK: query: select count(*), key from src a where key < 0 group by key
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+#### A masked pattern was here ####
+POSTHOOK: query: select count(*), key from src a where key < 0 group by key
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+#### A masked pattern was here ####
+test.comment="Cache should be used for this query"
+PREHOOK: query: explain
+select count(*), key from src a where key < 0 group by key
+PREHOOK: type: QUERY
+POSTHOOK: query: explain
+select count(*), key from src a where key < 0 group by key
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+ Stage-0 is a root stage
+
+STAGE PLANS:
+ Stage: Stage-0
+ Fetch Operator
+ limit: -1
+ Processor Tree:
+ ListSink
+ Cached Query Result: true
+
+PREHOOK: query: select count(*), key from src a where key < 0 group by key
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+#### A masked pattern was here ####
+POSTHOOK: query: select count(*), key from src a where key < 0 group by key
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+#### A masked pattern was here ####