You are viewing a plain text version of this content; the canonical (HTML) link was not preserved in this extraction.
Posted to commits@hive.apache.org by jd...@apache.org on 2018/04/09 23:48:57 UTC
hive git commit: HIVE-19127: Concurrency fixes in QueryResultsCache
(Jason Dere, reviewed by Deepak Jaiswal)
Repository: hive
Updated Branches:
refs/heads/master 76b696c26 -> a1034102d
HIVE-19127: Concurrency fixes in QueryResultsCache (Jason Dere, reviewed by Deepak Jaiswal)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/a1034102
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/a1034102
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/a1034102
Branch: refs/heads/master
Commit: a1034102d3580922f6c8f9d186272280d6917802
Parents: 76b696c
Author: Jason Dere <jd...@hortonworks.com>
Authored: Mon Apr 9 16:48:23 2018 -0700
Committer: Jason Dere <jd...@hortonworks.com>
Committed: Mon Apr 9 16:48:23 2018 -0700
----------------------------------------------------------------------
.../ql/cache/results/QueryResultsCache.java | 112 +++++++++++--------
1 file changed, 68 insertions(+), 44 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/a1034102/ql/src/java/org/apache/hadoop/hive/ql/cache/results/QueryResultsCache.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/cache/results/QueryResultsCache.java b/ql/src/java/org/apache/hadoop/hive/ql/cache/results/QueryResultsCache.java
index ac5ae57..b1a3646 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/cache/results/QueryResultsCache.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/cache/results/QueryResultsCache.java
@@ -502,32 +502,39 @@ public final class QueryResultsCache {
return false;
}
- if (requiresMove) {
- // Move the query results to the query cache directory.
- cachedResultsPath = moveResultsToCacheDirectory(queryResultsPath);
- dataDirMoved = true;
- }
- LOG.info("Moved query results from {} to {} (size {}) for query '{}'",
- queryResultsPath, cachedResultsPath, resultSize, queryText);
-
- // Create a new FetchWork to reference the new cache location.
- FetchWork fetchWorkForCache =
- new FetchWork(cachedResultsPath, fetchWork.getTblDesc(), fetchWork.getLimit());
- fetchWorkForCache.setCachedResult(true);
- cacheEntry.fetchWork = fetchWorkForCache;
- cacheEntry.cachedResultsPath = cachedResultsPath;
- cacheEntry.size = resultSize;
- this.cacheSize += resultSize;
- cacheEntry.createTime = System.currentTimeMillis();
-
- cacheEntry.setStatus(CacheEntryStatus.VALID);
- // Mark this entry as being in use. Caller will need to release later.
- cacheEntry.addReader();
-
- scheduleEntryInvalidation(cacheEntry);
-
- // Notify any queries waiting on this cacheEntry to become valid.
+ // Synchronize on the cache entry so that no one else can invalidate this entry
+ // while we are in the process of setting it to valid.
synchronized (cacheEntry) {
+ if (cacheEntry.getStatus() == CacheEntryStatus.INVALID) {
+ // Entry either expired, or was invalidated due to table updates
+ return false;
+ }
+
+ if (requiresMove) {
+ // Move the query results to the query cache directory.
+ cachedResultsPath = moveResultsToCacheDirectory(queryResultsPath);
+ dataDirMoved = true;
+ }
+ LOG.info("Moved query results from {} to {} (size {}) for query '{}'",
+ queryResultsPath, cachedResultsPath, resultSize, queryText);
+
+ // Create a new FetchWork to reference the new cache location.
+ FetchWork fetchWorkForCache =
+ new FetchWork(cachedResultsPath, fetchWork.getTblDesc(), fetchWork.getLimit());
+ fetchWorkForCache.setCachedResult(true);
+ cacheEntry.fetchWork = fetchWorkForCache;
+ cacheEntry.cachedResultsPath = cachedResultsPath;
+ cacheEntry.size = resultSize;
+ this.cacheSize += resultSize;
+ cacheEntry.createTime = System.currentTimeMillis();
+
+ cacheEntry.setStatus(CacheEntryStatus.VALID);
+ // Mark this entry as being in use. Caller will need to release later.
+ cacheEntry.addReader();
+
+ scheduleEntryInvalidation(cacheEntry);
+
+ // Notify any queries waiting on this cacheEntry to become valid.
cacheEntry.notifyAll();
}
@@ -564,7 +571,11 @@ public final class QueryResultsCache {
try {
writeLock.lock();
LOG.info("Clearing the results cache");
- for (CacheEntry entry : lru.keySet().toArray(EMPTY_CACHEENTRY_ARRAY)) {
+ CacheEntry[] allEntries = null;
+ synchronized (lru) {
+ allEntries = lru.keySet().toArray(EMPTY_CACHEENTRY_ARRAY);
+ }
+ for (CacheEntry entry : allEntries) {
try {
removeEntry(entry);
} catch (Exception err) {
@@ -611,10 +622,15 @@ public final class QueryResultsCache {
public void removeEntry(CacheEntry entry) {
entry.invalidate();
- removeFromLookup(entry);
- lru.remove(entry);
- // Should the cache size be updated here, or after the result data has actually been deleted?
- cacheSize -= entry.size;
+ rwLock.writeLock().lock();
+ try {
+ removeFromLookup(entry);
+ lru.remove(entry);
+ // Should the cache size be updated here, or after the result data has actually been deleted?
+ cacheSize -= entry.size;
+ } finally {
+ rwLock.writeLock().unlock();
+ }
}
private void removeFromLookup(CacheEntry entry) {
@@ -674,6 +690,20 @@ public final class QueryResultsCache {
return true;
}
+ private CacheEntry findEntryToRemove() {
+ // Entries should be in LRU order in the keyset iterator.
+ Set<CacheEntry> entries = lru.keySet();
+ synchronized (lru) {
+ for (CacheEntry removalCandidate : entries) {
+ if (removalCandidate.getStatus() != CacheEntryStatus.VALID) {
+ continue;
+ }
+ return removalCandidate;
+ }
+ }
+ return null;
+ }
+
private boolean clearSpaceForCacheEntry(CacheEntry entry, long size) {
if (hasSpaceForCacheEntry(entry, size)) {
return true;
@@ -682,20 +712,14 @@ public final class QueryResultsCache {
LOG.info("Clearing space for cache entry for query: [{}] with size {}",
entry.getQueryText(), size);
- // Entries should be in LRU order in the keyset iterator.
- CacheEntry[] entries = lru.keySet().toArray(EMPTY_CACHEENTRY_ARRAY);
- for (CacheEntry removalCandidate : entries) {
- if (removalCandidate.getStatus() != CacheEntryStatus.VALID) {
- // Only entries marked as valid should have results that can be removed.
- continue;
- }
- // Only delete the entry if it has no readers.
- if (!(removalCandidate.numReaders() > 0)) {
- LOG.info("Removing entry: {}", removalCandidate);
- removeEntry(removalCandidate);
- if (hasSpaceForCacheEntry(entry, size)) {
- return true;
- }
+ CacheEntry removalCandidate;
+ while ((removalCandidate = findEntryToRemove()) != null) {
+ LOG.info("Removing entry: {}", removalCandidate);
+ removeEntry(removalCandidate);
+ // TODO: Should we wait for the entry to actually be deleted from HDFS? Would have to
+ // poll the reader count, waiting for it to reach 0, at which point cleanup should occur.
+ if (hasSpaceForCacheEntry(entry, size)) {
+ return true;
}
}