Posted to commits@hive.apache.org by jd...@apache.org on 2018/03/15 22:57:54 UTC

hive git commit: HIVE-18846: Query results cache: Allow queries to refer to the pending results of a query that has not finished yet (Jason Dere, reviewed by GopalV)

Repository: hive
Updated Branches:
  refs/heads/master 165e35c04 -> 50e0077a8


HIVE-18846: Query results cache: Allow queries to refer to the pending results of a query that has not finished yet (Jason Dere, reviewed by GopalV)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/50e0077a
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/50e0077a
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/50e0077a

Branch: refs/heads/master
Commit: 50e0077a8c1fc3914b22c496b73833e004d14658
Parents: 165e35c
Author: Jason Dere <jd...@hortonworks.com>
Authored: Thu Mar 15 15:57:29 2018 -0700
Committer: Jason Dere <jd...@hortonworks.com>
Committed: Thu Mar 15 15:57:29 2018 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/conf/HiveConf.java   |   4 +
 .../test/resources/testconfiguration.properties |   1 +
 .../java/org/apache/hadoop/hive/ql/Driver.java  |  62 +++-
 .../ql/cache/results/QueryResultsCache.java     | 324 +++++++++++--------
 .../hadoop/hive/ql/parse/SemanticAnalyzer.java  |  36 ++-
 .../clientpositive/results_cache_empty_result.q |  13 +
 .../llap/results_cache_empty_result.q.out       |  99 ++++++
 .../results_cache_empty_result.q.out            |  91 ++++++
 8 files changed, 479 insertions(+), 151 deletions(-)
----------------------------------------------------------------------
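
At its core, the patch replaces the old boolean validity flag on a cache entry
with a three-state lifecycle (PENDING, VALID, INVALID) plus wait/notify, so a
second identical query can block until the first execution publishes or abandons
its results. The following is a minimal, self-contained Java sketch of that
handshake; all names are invented for illustration and are not the Hive classes
(see QueryResultsCache.CacheEntry in the diff below for the real code).

    // Sketch of the PENDING -> VALID/INVALID handshake used by the results cache.
    public class PendingResultDemo {

      enum Status { PENDING, VALID, INVALID }

      static final class Entry {
        private volatile Status status = Status.PENDING;

        // Producer side: publish a final status and wake up any waiters.
        synchronized void complete(Status finalStatus) {
          status = finalStatus;
          notifyAll();
        }

        // Consumer side: block until the entry leaves PENDING.
        synchronized boolean waitForValid() throws InterruptedException {
          while (status == Status.PENDING) {
            wait(1000);  // bounded wait; re-check status on each wakeup
          }
          return status == Status.VALID;
        }
      }

      public static void main(String[] args) throws Exception {
        Entry entry = new Entry();
        Thread consumer = new Thread(() -> {
          try {
            System.out.println("entry usable: " + entry.waitForValid());
          } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
          }
        });
        consumer.start();
        Thread.sleep(100);             // simulate the first query still running
        entry.complete(Status.VALID);  // results saved; waiters may proceed
        consumer.join();
      }
    }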


http://git-wip-us.apache.org/repos/asf/hive/blob/50e0077a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 907d064..8bbf1be 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -3713,6 +3713,10 @@ public class HiveConf extends Configuration {
         "If the query results cache is enabled. This will keep results of previously executed queries " +
         "to be reused if the same query is executed again."),
 
+    HIVE_QUERY_RESULTS_CACHE_WAIT_FOR_PENDING_RESULTS("hive.query.results.cache.wait.for.pending.results", true,
+        "Should a query wait for the pending results of an already running query, " +
+        "in order to use the cached result when it becomes ready"),
+
     HIVE_QUERY_RESULTS_CACHE_DIRECTORY("hive.query.results.cache.directory",
         "/tmp/hive/_resultscache_",
         "Location of the query results cache directory. Temporary results from queries " +

http://git-wip-us.apache.org/repos/asf/hive/blob/50e0077a/itests/src/test/resources/testconfiguration.properties
----------------------------------------------------------------------
diff --git a/itests/src/test/resources/testconfiguration.properties b/itests/src/test/resources/testconfiguration.properties
index 99d3817..f513fe5 100644
--- a/itests/src/test/resources/testconfiguration.properties
+++ b/itests/src/test/resources/testconfiguration.properties
@@ -255,6 +255,7 @@ minillaplocal.shared.query.files=alter_merge_2_orc.q,\
   ptf_matchpath.q,\
   ptf_streaming.q,\
   results_cache_1.q,\
+  results_cache_empty_result.q,\
   sample1.q,\
   selectDistinctStar.q,\
   select_dummy_source.q,\

http://git-wip-us.apache.org/repos/asf/hive/blob/50e0077a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
index d789ed0..eefcaea 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
@@ -1860,7 +1860,23 @@ public class Driver implements IDriver {
     cacheUsage = new CacheUsage(CacheUsage.CacheStatus.QUERY_USING_CACHE, cacheEntry);
   }
 
-  private void checkCacheUsage() throws Exception {
+  private void preExecutionCacheActions() throws Exception {
+    if (cacheUsage != null) {
+      if (cacheUsage.getStatus() == CacheUsage.CacheStatus.CAN_CACHE_QUERY_RESULTS &&
+          plan.getFetchTask() != null) {
+        // The results of this query execution might be cacheable.
+        // Add a placeholder entry in the cache so other queries know this result is pending.
+        CacheEntry pendingCacheEntry =
+            QueryResultsCache.getInstance().addToCache(cacheUsage.getQueryInfo());
+        if (pendingCacheEntry != null) {
+          // Update cacheUsage to reference the pending entry.
+          this.cacheUsage.setCacheEntry(pendingCacheEntry);
+        }
+      }
+    }
+  }
+
+  private void postExecutionCacheActions() throws Exception {
     if (cacheUsage != null) {
       if (cacheUsage.getStatus() == CacheUsage.CacheStatus.QUERY_USING_CACHE) {
         // Using a previously cached result.
@@ -1868,22 +1884,22 @@ public class Driver implements IDriver {
 
         // Reader count already incremented during cache lookup.
         // Save to usedCacheEntry to ensure reader is released after query.
-        usedCacheEntry = cacheEntry;
+        this.usedCacheEntry = cacheEntry;
       } else if (cacheUsage.getStatus() == CacheUsage.CacheStatus.CAN_CACHE_QUERY_RESULTS &&
+          cacheUsage.getCacheEntry() != null &&
           plan.getFetchTask() != null) {
-        // The query could not be resolved using the cache, but the query results
-        // can be added to the cache for future queries to use.
+        // Save results to the cache for future queries to use.
         PerfLogger perfLogger = SessionState.getPerfLogger();
         perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.SAVE_TO_RESULTS_CACHE);
 
-        CacheEntry savedCacheEntry =
-            QueryResultsCache.getInstance().addToCache(
-                cacheUsage.getQueryInfo(),
-                plan.getFetchTask().getWork());
-        if (savedCacheEntry != null) {
-          useFetchFromCache(savedCacheEntry);
-          // addToCache() already increments the reader count. Set usedCacheEntry so it gets released.
-          usedCacheEntry = savedCacheEntry;
+        boolean savedToCache = QueryResultsCache.getInstance().setEntryValid(
+            cacheUsage.getCacheEntry(),
+            plan.getFetchTask().getWork());
+        LOG.info("savedToCache: {}", savedToCache);
+        if (savedToCache) {
+          useFetchFromCache(cacheUsage.getCacheEntry());
+          // setEntryValid() already increments the reader count. Set usedCacheEntry so it gets released.
+          this.usedCacheEntry = cacheUsage.getCacheEntry();
         }
 
         perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.SAVE_TO_RESULTS_CACHE);
@@ -2000,6 +2016,8 @@ public class Driver implements IDriver {
         }
       }
 
+      preExecutionCacheActions();
+
       perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.RUN_TASKS);
       // Loop while you either have tasks running, or tasks queued up
       while (driverCxt.isRunning()) {
@@ -2099,7 +2117,7 @@ public class Driver implements IDriver {
       }
       perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.RUN_TASKS);
 
-      checkCacheUsage();
+      postExecutionCacheActions();
 
       // in case we decided to run everything in local mode, restore the
       // the jobtracker setting to its initial value
@@ -2499,13 +2517,31 @@ public class Driver implements IDriver {
     }
   }
 
+  private boolean hasBadCacheAttempt() {
+    // Check if the query results were cacheable and created a pending cache entry.
+    // If we successfully saved the results, the usage would have changed to QUERY_USING_CACHE.
+    return (cacheUsage != null &&
+        cacheUsage.getStatus() == CacheUsage.CacheStatus.CAN_CACHE_QUERY_RESULTS &&
+        cacheUsage.getCacheEntry() != null);
+  }
+
   private void releaseCachedResult() {
     // Assumes the reader count has been incremented automatically by the results cache by either
     // lookup or creating the cache entry.
     if (usedCacheEntry != null) {
       usedCacheEntry.releaseReader();
       usedCacheEntry = null;
+    } else if (hasBadCacheAttempt()) {
+      // This query created a pending cache entry, but it was never saved with real results; clean up.
+      // This step is required, as there may be queries waiting on this pending cache entry.
+      // Removing/invalidating this entry will notify the waiters that this entry cannot be used.
+      try {
+        QueryResultsCache.getInstance().removeEntry(cacheUsage.getCacheEntry());
+      } catch (Exception err) {
+        LOG.error("Error removing failed cache entry " + cacheUsage.getCacheEntry(), err);
+      }
     }
+    cacheUsage = null;
   }
 
   // Close and release resources within a running query process. Since it runs under
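
The Driver now brackets task execution with the cache calls above: a placeholder
entry is registered before the tasks run (preExecutionCacheActions), promoted via
setEntryValid() afterwards (postExecutionCacheActions), and removed in
releaseCachedResult() if the query never produced usable results. A self-contained
model of that contract, with stand-in classes rather than the real Hive types:

    // Model of the Driver-side contract; CacheEntry/ResultsCache are stand-ins.
    import java.util.concurrent.ConcurrentHashMap;

    public class DriverCacheContractDemo {

      enum Status { PENDING, VALID, INVALID }

      static final class CacheEntry {
        volatile Status status = Status.PENDING;
      }

      static final class ResultsCache {
        private final ConcurrentHashMap<String, CacheEntry> entries = new ConcurrentHashMap<>();

        CacheEntry addToCache(String queryText) {   // placeholder in PENDING state
          CacheEntry e = new CacheEntry();
          entries.put(queryText, e);
          return e;
        }

        boolean setEntryValid(CacheEntry e) {       // promote after a successful run
          e.status = Status.VALID;
          return true;
        }

        void removeEntry(CacheEntry e) {            // failure path: unblock any waiters
          e.status = Status.INVALID;
          entries.values().remove(e);
        }
      }

      public static void main(String[] args) {
        ResultsCache cache = new ResultsCache();
        CacheEntry pending = cache.addToCache("select ...");
        boolean published = false;
        try {
          // runTasks() would execute the query here; assume it succeeded.
          published = cache.setEntryValid(pending);
        } finally {
          if (!published) {
            cache.removeEntry(pending);             // never leave a stuck PENDING entry
          }
        }
        System.out.println("entry status: " + pending.status);
      }
    }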

http://git-wip-us.apache.org/repos/asf/hive/blob/50e0077a/ql/src/java/org/apache/hadoop/hive/ql/cache/results/QueryResultsCache.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/cache/results/QueryResultsCache.java b/ql/src/java/org/apache/hadoop/hive/ql/cache/results/QueryResultsCache.java
index 9cdc5fb..4fa1044 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/cache/results/QueryResultsCache.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/cache/results/QueryResultsCache.java
@@ -155,6 +155,10 @@ public final class QueryResultsCache {
     }
   }
 
+  public enum CacheEntryStatus {
+    VALID, INVALID, PENDING
+  }
+
   public static class CacheEntry {
     private QueryInfo queryInfo;
     private FetchWork fetchWork;
@@ -163,13 +167,9 @@ public final class QueryResultsCache {
     // Cache administration
     private long createTime;
     private long size;
-    private AtomicBoolean valid = new AtomicBoolean(false);
     private AtomicInteger readers = new AtomicInteger(0);
     private ScheduledFuture<?> invalidationFuture = null;
-
-    public boolean isValid() {
-      return valid.get();
-    }
+    private volatile CacheEntryStatus status = CacheEntryStatus.PENDING;
 
     public void releaseReader() {
       int readerCount = 0;
@@ -177,14 +177,13 @@ public final class QueryResultsCache {
         readerCount = readers.decrementAndGet();
       }
       LOG.debug("releaseReader: entry: {}, readerCount: {}", this, readerCount);
-      Preconditions.checkState(readerCount >= 0);
 
       cleanupIfNeeded();
     }
 
     public String toString() {
       return "CacheEntry query: [" + getQueryInfo().getLookupInfo().getQueryText()
-          + "], location: " + cachedResultsPath
+          + "], status: " + status + ", location: " + cachedResultsPath
           + ", size: " + size;
     }
 
@@ -192,13 +191,12 @@ public final class QueryResultsCache {
       boolean added = false;
       int readerCount = 0;
       synchronized (this) {
-        if (valid.get()) {
+        if (status == CacheEntryStatus.VALID) {
           readerCount = readers.incrementAndGet();
           added = true;
         }
       }
-      Preconditions.checkState(readerCount > 0);
-      LOG.debug("addReader: entry: {}, readerCount: {}", this, readerCount);
+      LOG.debug("addReader: entry: {}, readerCount: {}, added: {}", this, readerCount, added);
       return added;
     }
 
@@ -207,32 +205,36 @@ public final class QueryResultsCache {
     }
 
     private void invalidate() {
-      boolean wasValid = setValidity(false);
-
-      if (wasValid) {
-        LOG.info("Invalidated cache entry: {}", this);
-
+      LOG.info("Invalidating cache entry: {}", this);
+      CacheEntryStatus prevStatus = setStatus(CacheEntryStatus.INVALID);
+      if (prevStatus == CacheEntryStatus.VALID) {
         if (invalidationFuture != null) {
           // The cache entry has just been invalidated, no need for the scheduled invalidation.
           invalidationFuture.cancel(false);
         }
         cleanupIfNeeded();
+      } else if (prevStatus == CacheEntryStatus.PENDING) {
+        // Need to notify any queries waiting on the change from pending status.
+        synchronized (this) {
+          this.notifyAll();
+        }
       }
     }
 
-    /**
-     * Set the validity, returning the previous validity value.
-     * @param valid
-     * @return
-     */
-    private boolean setValidity(boolean valid) {
-      synchronized(this) {
-        return this.valid.getAndSet(valid);
+    public CacheEntryStatus getStatus() {
+      return status;
+    }
+
+    private CacheEntryStatus setStatus(CacheEntryStatus newStatus) {
+      synchronized (this) {
+        CacheEntryStatus oldStatus = status;
+        status = newStatus;
+        return oldStatus;
       }
     }
 
     private void cleanupIfNeeded() {
-      if (!isValid() && readers.get() <= 0) {
+      if (status == CacheEntryStatus.INVALID && readers.get() <= 0) {
         QueryResultsCache.cleanupEntry(this);
       }
     }
@@ -255,6 +257,37 @@ public final class QueryResultsCache {
     public Path getCachedResultsPath() {
       return cachedResultsPath;
     }
+
+    /**
+     * Wait for the cache entry to go from PENDING to VALID status.
+     * @return true if the cache entry successfully changed to VALID status,
+     *         false if the entry became INVALID or the wait was interrupted
+     */
+    public boolean waitForValidStatus() {
+      LOG.info("Waiting on pending cacheEntry");
+      long timeout = 1000;
+
+      while (true) {
+        try {
+          switch (status) {
+          case VALID:
+            return true;
+          case INVALID:
+            return false;
+          case PENDING:
+            // Status has not changed, continue waiting.
+            break;
+          }
+
+          synchronized (this) {
+            this.wait(timeout);
+          }
+        } catch (InterruptedException err) {
+          Thread.currentThread().interrupt();
+          return false;
+        }
+      }
+    }
   }
 
   // Allow lookup by query string
@@ -267,6 +300,7 @@ public final class QueryResultsCache {
 
   private final HiveConf conf;
   private Path cacheDirPath;
+  private Path zeroRowsPath;
   private long cacheSize = 0;
   private long maxCacheSize;
   private long maxEntrySize;
@@ -287,6 +321,9 @@ public final class QueryResultsCache {
     FsPermission fsPermission = new FsPermission("700");
     fs.mkdirs(cacheDirPath, fsPermission);
 
+    // Sentinel path used for 0-row results; this path is never actually created.
+    zeroRowsPath = new Path(cacheDirPath, "dummy_zero_rows");
+
     // Results cache directory should be cleaned up at process termination.
     fs.deleteOnExit(cacheDirPath);
 
@@ -327,7 +364,7 @@ public final class QueryResultsCache {
    *        using CacheEntry.releaseReader().
    * @return  The cached result if there is a match in the cache, or null if no match is found.
    */
-  public CacheEntry lookup(LookupInfo request, boolean addReader) {
+  public CacheEntry lookup(LookupInfo request) {
     CacheEntry result = null;
 
     LOG.debug("QueryResultsCache lookup for query: {}", request.queryText);
@@ -337,27 +374,26 @@ public final class QueryResultsCache {
       readLock.lock();
       Set<CacheEntry> candidates = queryMap.get(request.queryText);
       if (candidates != null) {
+        CacheEntry pendingResult = null;
         for (CacheEntry candidate : candidates) {
           if (entryMatches(request, candidate)) {
-            result = candidate;
-            break;
+            CacheEntryStatus entryStatus = candidate.status;
+            if (entryStatus == CacheEntryStatus.VALID) {
+              result = candidate;
+              break;
+            } else if (entryStatus == CacheEntryStatus.PENDING && pendingResult == null) {
+              pendingResult = candidate;
+            }
           }
         }
 
+        // Try to find a valid entry, but settle for a pending entry if that is all we have.
+        if (result == null && pendingResult != null) {
+          result = pendingResult;
+        }
+
         if (result != null) {
           lru.get(result);  // Update LRU
-
-          if (!result.isValid()) {
-            // Entry is in the cache, but not valid.
-            // This can happen when the entry is first added, before the data has been moved
-            // to the results cache directory. We cannot use this entry yet.
-            result = null;
-          } else {
-            if (addReader) {
-              // Caller will need to be responsible for releasing the reader count.
-              result.addReader();
-            }
-          }
         }
       }
     } finally {
@@ -370,111 +406,124 @@ public final class QueryResultsCache {
   }
 
   /**
-   * Add an entry to the query results cache.
+   * Add an entry to the cache.
+   * The new entry will be in PENDING state and not usable until setEntryValid() is called on the entry.
+   * @param queryInfo query info for the query whose results may be cached
+   * @return the new cache entry in PENDING state
+   */
+  public CacheEntry addToCache(QueryInfo queryInfo) {
+    // Create placeholder entry with PENDING state.
+    String queryText = queryInfo.getLookupInfo().getQueryText();
+    CacheEntry addedEntry = new CacheEntry();
+    addedEntry.queryInfo = queryInfo;
+
+    Lock writeLock = rwLock.writeLock();
+    try {
+      writeLock.lock();
+
+      LOG.info("Adding placeholder cache entry for query '{}'", queryText);
+
+      // Add the entry to the cache structures while under write lock.
+      Set<CacheEntry> entriesForQuery = queryMap.get(queryText);
+      if (entriesForQuery == null) {
+        entriesForQuery = new HashSet<CacheEntry>();
+        queryMap.put(queryText, entriesForQuery);
+      }
+      entriesForQuery.add(addedEntry);
+      lru.put(addedEntry, addedEntry);
+    } finally {
+      writeLock.unlock();
+    }
+
+    return addedEntry;
+  }
+
+  /**
+   * Updates a pending cache entry with a FetchWork result from a finished query.
+   * If successful, the cache entry will be set to valid status and be usable for cached queries.
    * Important: Adding the entry to the cache will increment the reader count for the cache entry.
    * CacheEntry.releaseReader() should be called when the caller is done with the cache entry.
-   *
-   * @param queryInfo
+   * @param cacheEntry the pending cache entry to update
    * @param fetchWork
-   * @return The entry if added to the cache. null if the entry is not added.
+   * @return true if the cache entry was successfully updated and marked valid, false otherwise
    */
-  public CacheEntry addToCache(QueryInfo queryInfo, FetchWork fetchWork) {
-
-    CacheEntry addedEntry = null;
+  public boolean setEntryValid(CacheEntry cacheEntry, FetchWork fetchWork) {
+    String queryText = cacheEntry.getQueryText();
     boolean dataDirMoved = false;
     Path queryResultsPath = null;
     Path cachedResultsPath = null;
-    String queryText = queryInfo.getLookupInfo().getQueryText();
 
-    // Should we remove other candidate entries if they are equivalent to these query results?
     try {
-      CacheEntry potentialEntry = new CacheEntry();
-      potentialEntry.queryInfo = queryInfo;
+      boolean requiresMove = true;
       queryResultsPath = fetchWork.getTblDir();
       FileSystem resultsFs = queryResultsPath.getFileSystem(conf);
-      ContentSummary cs = resultsFs.getContentSummary(queryResultsPath);
-      potentialEntry.size = cs.getLength();
-
-      Lock writeLock = rwLock.writeLock();
-      try {
-        writeLock.lock();
-
-        if (!shouldEntryBeAdded(potentialEntry)) {
-          return null;
-        }
-        if (!clearSpaceForCacheEntry(potentialEntry)) {
-          return null;
-        }
-
-        LOG.info("Adding cache entry for query '{}'", queryText);
-
-        // Add the entry to the cache structures while under write lock. Do not mark the entry
-        // as valid yet, since the query results have not yet been moved to the cache directory.
-        // Do the data move after unlocking since it might take time.
-        // Mark the entry as valid once the data has been moved to the cache directory.
-        Set<CacheEntry> entriesForQuery = queryMap.get(queryText);
-        if (entriesForQuery == null) {
-          entriesForQuery = new HashSet<CacheEntry>();
-          queryMap.put(queryText, entriesForQuery);
-        }
-        entriesForQuery.add(potentialEntry);
-        lru.put(potentialEntry, potentialEntry);
-        cacheSize += potentialEntry.size;
-        addedEntry = potentialEntry;
+      long resultSize;
+      if (resultsFs.exists(queryResultsPath)) {
+        ContentSummary cs = resultsFs.getContentSummary(queryResultsPath);
+        resultSize = cs.getLength();
+      } else {
+        // No actual result directory, no need to move anything.
+        cachedResultsPath = zeroRowsPath;
+        resultSize = 0;
+        requiresMove = false;
+      }
 
-      } finally {
-        writeLock.unlock();
+      if (!shouldEntryBeAdded(cacheEntry, resultSize)) {
+        return false;
       }
 
-      // Move the query results to the query cache directory.
-      cachedResultsPath = moveResultsToCacheDirectory(queryResultsPath);
-      dataDirMoved = true;
+      if (requiresMove) {
+        // Move the query results to the query cache directory.
+        cachedResultsPath = moveResultsToCacheDirectory(queryResultsPath);
+        dataDirMoved = true;
+      }
       LOG.info("Moved query results from {} to {} (size {}) for query '{}'",
-          queryResultsPath, cachedResultsPath, cs.getLength(), queryText);
+          queryResultsPath, cachedResultsPath, resultSize, queryText);
 
       // Create a new FetchWork to reference the new cache location.
       FetchWork fetchWorkForCache =
           new FetchWork(cachedResultsPath, fetchWork.getTblDesc(), fetchWork.getLimit());
       fetchWorkForCache.setCachedResult(true);
-      addedEntry.fetchWork = fetchWorkForCache;
-      addedEntry.cachedResultsPath = cachedResultsPath;
-      addedEntry.createTime = System.currentTimeMillis();
-      addedEntry.setValidity(true);
+      cacheEntry.fetchWork = fetchWorkForCache;
+      cacheEntry.cachedResultsPath = cachedResultsPath;
+      cacheEntry.size = resultSize;
+      this.cacheSize += resultSize;
+      cacheEntry.createTime = System.currentTimeMillis();
 
+      cacheEntry.setStatus(CacheEntryStatus.VALID);
       // Mark this entry as being in use. Caller will need to release later.
-      addedEntry.addReader();
+      cacheEntry.addReader();
 
-      scheduleEntryInvalidation(addedEntry);
+      scheduleEntryInvalidation(cacheEntry);
+
+      // Notify any queries waiting on this cacheEntry to become valid.
+      synchronized (cacheEntry) {
+        cacheEntry.notifyAll();
+      }
     } catch (Exception err) {
       LOG.error("Failed to create cache entry for query results for query: " + queryText, err);
 
-      if (addedEntry != null) {
-        // If the entry was already added to the cache when we hit error, clean up properly.
-
-        if (dataDirMoved) {
-          // If data was moved from original location to cache directory, we need to move it back!
-          LOG.info("Restoring query results from {} back to {}", cachedResultsPath, queryResultsPath);
-          try {
-            FileSystem fs = cachedResultsPath.getFileSystem(conf);
-            fs.rename(cachedResultsPath, queryResultsPath);
-            addedEntry.cachedResultsPath = null;
-          } catch (Exception err2) {
-            String errMsg = "Failed cleanup during failed attempt to cache query: " + queryText;
-            LOG.error(errMsg);
-            throw new RuntimeException(errMsg);
-          }
-        }
-
-        addedEntry.invalidate();
-        if (addedEntry.numReaders() > 0) {
-          addedEntry.releaseReader();
+      if (dataDirMoved) {
+        // If data was moved from original location to cache directory, we need to move it back!
+        LOG.info("Restoring query results from {} back to {}", cachedResultsPath, queryResultsPath);
+        try {
+          FileSystem fs = cachedResultsPath.getFileSystem(conf);
+          fs.rename(cachedResultsPath, queryResultsPath);
+          cacheEntry.size = 0;
+          cacheEntry.cachedResultsPath = null;
+        } catch (Exception err2) {
+          String errMsg = "Failed cleanup during failed attempt to cache query: " + queryText;
+          LOG.error(errMsg);
+          throw new RuntimeException(errMsg);
         }
       }
 
-      return null;
+      // Invalidate the entry. Rely on query cleanup to remove from lookup.
+      cacheEntry.invalidate();
+      return false;
     }
 
-    return addedEntry;
+    return true;
   }
 
   public void clear() {
@@ -527,7 +576,7 @@ public final class QueryResultsCache {
     return true;
   }
 
-  private void removeEntry(CacheEntry entry) {
+  public void removeEntry(CacheEntry entry) {
     entry.invalidate();
     removeFromLookup(entry);
     lru.remove(entry);
@@ -538,9 +587,14 @@ public final class QueryResultsCache {
   private void removeFromLookup(CacheEntry entry) {
     String queryString = entry.getQueryText();
     Set<CacheEntry> entries = queryMap.get(queryString);
-    Preconditions.checkState(entries != null);
+    if (entries == null) {
+      LOG.warn("ResultsCache: no entry for {}", queryString);
+      return;
+    }
     boolean deleted = entries.remove(entry);
-    Preconditions.checkState(deleted);
+    if (!deleted) {
+      LOG.warn("ResultsCache: Attempted to remove entry but it was not in the cache: {}", entry);
+    }
     if (entries.isEmpty()) {
       queryMap.remove(queryString);
     }
@@ -556,10 +610,14 @@ public final class QueryResultsCache {
   /**
    * Determines if the cache entry should be added to the results cache.
    */
-  private boolean shouldEntryBeAdded(CacheEntry entry) {
+  private boolean shouldEntryBeAdded(CacheEntry entry, long size) {
     // Assumes the cache lock has already been taken.
-    if (maxEntrySize >= 0 && entry.size > maxEntrySize) {
-      LOG.debug("Cache entry size {} larger than max entry size ({})", entry.size, maxEntrySize);
+    if (maxEntrySize >= 0 && size > maxEntrySize) {
+      LOG.debug("Cache entry size {} larger than max entry size ({})", size, maxEntrySize);
+      return false;
+    }
+
+    if (!clearSpaceForCacheEntry(entry, size)) {
       return false;
     }
 
@@ -574,41 +632,41 @@ public final class QueryResultsCache {
     return cachedResultsPath;
   }
 
-  private boolean hasSpaceForCacheEntry(CacheEntry entry) {
+  private boolean hasSpaceForCacheEntry(CacheEntry entry, long size) {
     if (maxCacheSize >= 0) {
-      return (cacheSize + entry.size) <= maxCacheSize;
+      return (cacheSize + size) <= maxCacheSize;
     }
     // Negative max cache size means unbounded.
     return true;
   }
 
-  private boolean clearSpaceForCacheEntry(CacheEntry entry) {
-    if (hasSpaceForCacheEntry(entry)) {
+  private boolean clearSpaceForCacheEntry(CacheEntry entry, long size) {
+    if (hasSpaceForCacheEntry(entry, size)) {
       return true;
     }
 
     LOG.info("Clearing space for cache entry for query: [{}] with size {}",
-        entry.getQueryText(), entry.size);
+        entry.getQueryText(), size);
 
     // Entries should be in LRU order in the keyset iterator.
     CacheEntry[] entries = lru.keySet().toArray(EMPTY_CACHEENTRY_ARRAY);
     for (CacheEntry removalCandidate : entries) {
-      if (!removalCandidate.isValid()) {
-        // Likely an entry which is still getting its results moved to the cache directory.
+      if (removalCandidate.getStatus() != CacheEntryStatus.VALID) {
+        // Only entries marked as valid should have results that can be removed.
         continue;
       }
       // Only delete the entry if it has no readers.
       if (!(removalCandidate.numReaders() > 0)) {
         LOG.info("Removing entry: {}", removalCandidate);
         removeEntry(removalCandidate);
-        if (hasSpaceForCacheEntry(entry)) {
+        if (hasSpaceForCacheEntry(entry, size)) {
           return true;
         }
       }
     }
 
     LOG.info("Could not free enough space for cache entry for query: [{}] withe size {}",
-        entry.getQueryText(), entry.size);
+        entry.getQueryText(), size);
     return false;
   }
 
@@ -636,11 +694,11 @@ public final class QueryResultsCache {
 
   private void scheduleEntryInvalidation(final CacheEntry entry) {
     if (maxEntryLifetime >= 0) {
-      // Schedule task to invalidate cache entry.
+      // Schedule task to invalidate cache entry and remove from lookup.
       ScheduledFuture<?> future = invalidationExecutor.schedule(new Runnable() {
         @Override
         public void run() {
-          entry.invalidate();
+          removeEntry(entry);
         }
       }, maxEntryLifetime, TimeUnit.MILLISECONDS);
       entry.invalidationFuture = future;
@@ -648,9 +706,11 @@ public final class QueryResultsCache {
   }
 
   private static void cleanupEntry(final CacheEntry entry) {
-    Preconditions.checkState(!entry.isValid());
+    Preconditions.checkState(entry.getStatus() == CacheEntryStatus.INVALID);
+    final HiveConf conf = getInstance().conf;
 
-    if (entry.cachedResultsPath != null) {
+    if (entry.cachedResultsPath != null &&
+        !getInstance().zeroRowsPath.equals(entry.cachedResultsPath)) {
       deletionExecutor.execute(new Runnable() {
         @Override
         public void run() {
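
One subtlety in the hunks above: a query that returns no rows leaves no results
directory to move, so the cache records a shared sentinel path (zeroRowsPath)
with size 0, and cleanupEntry() skips deletion for that path so empty-result
entries never delete each other's data. A small illustrative sketch of the
sentinel-path idea (names invented here, not the Hive code):

    import java.nio.file.Path;
    import java.nio.file.Paths;

    public class ZeroRowsSentinelDemo {
      // One shared, never-created path stands in for every empty result set.
      static final Path ZERO_ROWS = Paths.get("/tmp/hive/_resultscache_/dummy_zero_rows");

      static void cleanup(Path cachedResultsPath) {
        if (cachedResultsPath == null || ZERO_ROWS.equals(cachedResultsPath)) {
          return;  // nothing on disk to delete; the sentinel is shared by all empty results
        }
        System.out.println("would delete " + cachedResultsPath);
      }

      public static void main(String[] args) {
        cleanup(ZERO_ROWS);                                       // no-op
        cleanup(Paths.get("/tmp/hive/_resultscache_/results-1")); // would be deleted
      }
    }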

http://git-wip-us.apache.org/repos/asf/hive/blob/50e0077a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
index c65ad45..2342fff 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
@@ -14309,14 +14309,38 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
     }
     // Don't increment the reader count for explain queries.
     boolean isExplainQuery = (ctx.getExplainConfig() != null);
-    QueryResultsCache.CacheEntry cacheEntry =
-        QueryResultsCache.getInstance().lookup(lookupInfo, !isExplainQuery);
+    QueryResultsCache.CacheEntry cacheEntry = QueryResultsCache.getInstance().lookup(lookupInfo);
     if (cacheEntry != null) {
-      // Use the cache rather than full query execution.
-      useCachedResult(cacheEntry);
+      // Potentially wait on the cache entry if the entry is in PENDING status.
+      // Blocking here can be dangerous: if the global compile lock is used, this will block
+      // all subsequent queries that try to acquire the compile lock, so it should not be
+      // done unless parallel compilation is enabled.
+      // Explain queries do not wait on pending results either.
+      if (cacheEntry.getStatus() == QueryResultsCache.CacheEntryStatus.PENDING) {
+        if (!isExplainQuery &&
+            conf.getBoolVar(HiveConf.ConfVars.HIVE_QUERY_RESULTS_CACHE_WAIT_FOR_PENDING_RESULTS) &&
+            conf.getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_PARALLEL_COMPILATION)) {
+          if (!cacheEntry.waitForValidStatus()) {
+            LOG.info("Waiting on pending cacheEntry, but it failed to become valid");
+            return false;
+          }
+        } else {
+          LOG.info("Not waiting for pending cacheEntry");
+          return false;
+        }
+      }
 
-      // At this point the caller should return from semantic analysis.
-      return true;
+      if (cacheEntry.getStatus() == QueryResultsCache.CacheEntryStatus.VALID) {
+        if (!isExplainQuery) {
+          if (!cacheEntry.addReader()) {
+            return false;
+          }
+        }
+        // Use the cache rather than full query execution.
+        // At this point the caller should return from semantic analysis.
+        useCachedResult(cacheEntry);
+        return true;
+      }
     }
     return false;
   }
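
The analyzer-side decision reduces to: explain queries never wait, and a query
waits on a pending entry only when both the new flag and parallel compilation
are enabled, since blocking while holding a global compile lock would stall
every other session. A condensed restatement of that decision table (method and
parameter names invented here):

    public class PendingWaitDecisionDemo {

      static boolean shouldWaitForPending(boolean isExplain,
                                          boolean waitFlagEnabled,
                                          boolean parallelCompilation) {
        // Waiting is only safe when compilation is parallel; otherwise the wait
        // would hold the global compile lock and block every other session.
        return !isExplain && waitFlagEnabled && parallelCompilation;
      }

      public static void main(String[] args) {
        System.out.println(shouldWaitForPending(false, true, true));   // true: wait
        System.out.println(shouldWaitForPending(false, true, false));  // false: compile normally
        System.out.println(shouldWaitForPending(true, true, true));    // false: explain never waits
      }
    }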

http://git-wip-us.apache.org/repos/asf/hive/blob/50e0077a/ql/src/test/queries/clientpositive/results_cache_empty_result.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/results_cache_empty_result.q b/ql/src/test/queries/clientpositive/results_cache_empty_result.q
new file mode 100644
index 0000000..6213671
--- /dev/null
+++ b/ql/src/test/queries/clientpositive/results_cache_empty_result.q
@@ -0,0 +1,13 @@
+
+set hive.query.results.cache.enabled=true;
+
+explain
+select count(*), key from src a where key < 0 group by key;
+select count(*), key from src a where key < 0 group by key;
+
+set test.comment="Cache should be used for this query";
+set test.comment;
+explain
+select count(*), key from src a where key < 0 group by key;
+select count(*), key from src a where key < 0 group by key;
+

http://git-wip-us.apache.org/repos/asf/hive/blob/50e0077a/ql/src/test/results/clientpositive/llap/results_cache_empty_result.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/llap/results_cache_empty_result.q.out b/ql/src/test/results/clientpositive/llap/results_cache_empty_result.q.out
new file mode 100644
index 0000000..b63d079
--- /dev/null
+++ b/ql/src/test/results/clientpositive/llap/results_cache_empty_result.q.out
@@ -0,0 +1,99 @@
+PREHOOK: query: explain
+select count(*), key from src a where key < 0 group by key
+PREHOOK: type: QUERY
+POSTHOOK: query: explain
+select count(*), key from src a where key < 0 group by key
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+  Stage-1 is a root stage
+  Stage-0 depends on stages: Stage-1
+
+STAGE PLANS:
+  Stage: Stage-1
+    Tez
+#### A masked pattern was here ####
+      Edges:
+        Reducer 2 <- Map 1 (SIMPLE_EDGE)
+#### A masked pattern was here ####
+      Vertices:
+        Map 1 
+            Map Operator Tree:
+                TableScan
+                  alias: a
+                  Statistics: Num rows: 500 Data size: 43500 Basic stats: COMPLETE Column stats: COMPLETE
+                  Filter Operator
+                    predicate: (UDFToDouble(key) < 0.0D) (type: boolean)
+                    Statistics: Num rows: 166 Data size: 14442 Basic stats: COMPLETE Column stats: COMPLETE
+                    Group By Operator
+                      aggregations: count()
+                      keys: key (type: string)
+                      mode: hash
+                      outputColumnNames: _col0, _col1
+                      Statistics: Num rows: 83 Data size: 7885 Basic stats: COMPLETE Column stats: COMPLETE
+                      Reduce Output Operator
+                        key expressions: _col0 (type: string)
+                        sort order: +
+                        Map-reduce partition columns: _col0 (type: string)
+                        Statistics: Num rows: 83 Data size: 7885 Basic stats: COMPLETE Column stats: COMPLETE
+                        value expressions: _col1 (type: bigint)
+            Execution mode: llap
+            LLAP IO: no inputs
+        Reducer 2 
+            Execution mode: llap
+            Reduce Operator Tree:
+              Group By Operator
+                aggregations: count(VALUE._col0)
+                keys: KEY._col0 (type: string)
+                mode: mergepartial
+                outputColumnNames: _col0, _col1
+                Statistics: Num rows: 83 Data size: 7885 Basic stats: COMPLETE Column stats: COMPLETE
+                Select Operator
+                  expressions: _col1 (type: bigint), _col0 (type: string)
+                  outputColumnNames: _col0, _col1
+                  Statistics: Num rows: 83 Data size: 7885 Basic stats: COMPLETE Column stats: COMPLETE
+                  File Output Operator
+                    compressed: false
+                    Statistics: Num rows: 83 Data size: 7885 Basic stats: COMPLETE Column stats: COMPLETE
+                    table:
+                        input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+                        output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+                        serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+
+  Stage: Stage-0
+    Fetch Operator
+      limit: -1
+      Processor Tree:
+        ListSink
+
+PREHOOK: query: select count(*), key from src a where key < 0 group by key
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+#### A masked pattern was here ####
+POSTHOOK: query: select count(*), key from src a where key < 0 group by key
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+#### A masked pattern was here ####
+test.comment="Cache should be used for this query"
+PREHOOK: query: explain
+select count(*), key from src a where key < 0 group by key
+PREHOOK: type: QUERY
+POSTHOOK: query: explain
+select count(*), key from src a where key < 0 group by key
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+  Stage-0 is a root stage
+
+STAGE PLANS:
+  Stage: Stage-0
+    Fetch Operator
+      limit: -1
+      Processor Tree:
+        ListSink
+      Cached Query Result: true
+
+PREHOOK: query: select count(*), key from src a where key < 0 group by key
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+POSTHOOK: query: select count(*), key from src a where key < 0 group by key
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src

http://git-wip-us.apache.org/repos/asf/hive/blob/50e0077a/ql/src/test/results/clientpositive/results_cache_empty_result.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/results_cache_empty_result.q.out b/ql/src/test/results/clientpositive/results_cache_empty_result.q.out
new file mode 100644
index 0000000..000e5f0
--- /dev/null
+++ b/ql/src/test/results/clientpositive/results_cache_empty_result.q.out
@@ -0,0 +1,91 @@
+PREHOOK: query: explain
+select count(*), key from src a where key < 0 group by key
+PREHOOK: type: QUERY
+POSTHOOK: query: explain
+select count(*), key from src a where key < 0 group by key
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+  Stage-1 is a root stage
+  Stage-0 depends on stages: Stage-1
+
+STAGE PLANS:
+  Stage: Stage-1
+    Map Reduce
+      Map Operator Tree:
+          TableScan
+            alias: a
+            Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+            Filter Operator
+              predicate: (UDFToDouble(key) < 0.0D) (type: boolean)
+              Statistics: Num rows: 166 Data size: 1763 Basic stats: COMPLETE Column stats: NONE
+              Group By Operator
+                aggregations: count()
+                keys: key (type: string)
+                mode: hash
+                outputColumnNames: _col0, _col1
+                Statistics: Num rows: 166 Data size: 1763 Basic stats: COMPLETE Column stats: NONE
+                Reduce Output Operator
+                  key expressions: _col0 (type: string)
+                  sort order: +
+                  Map-reduce partition columns: _col0 (type: string)
+                  Statistics: Num rows: 166 Data size: 1763 Basic stats: COMPLETE Column stats: NONE
+                  value expressions: _col1 (type: bigint)
+      Reduce Operator Tree:
+        Group By Operator
+          aggregations: count(VALUE._col0)
+          keys: KEY._col0 (type: string)
+          mode: mergepartial
+          outputColumnNames: _col0, _col1
+          Statistics: Num rows: 83 Data size: 881 Basic stats: COMPLETE Column stats: NONE
+          Select Operator
+            expressions: _col1 (type: bigint), _col0 (type: string)
+            outputColumnNames: _col0, _col1
+            Statistics: Num rows: 83 Data size: 881 Basic stats: COMPLETE Column stats: NONE
+            File Output Operator
+              compressed: false
+              Statistics: Num rows: 83 Data size: 881 Basic stats: COMPLETE Column stats: NONE
+              table:
+                  input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+                  output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+                  serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+
+  Stage: Stage-0
+    Fetch Operator
+      limit: -1
+      Processor Tree:
+        ListSink
+
+PREHOOK: query: select count(*), key from src a where key < 0 group by key
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+#### A masked pattern was here ####
+POSTHOOK: query: select count(*), key from src a where key < 0 group by key
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+#### A masked pattern was here ####
+test.comment="Cache should be used for this query"
+PREHOOK: query: explain
+select count(*), key from src a where key < 0 group by key
+PREHOOK: type: QUERY
+POSTHOOK: query: explain
+select count(*), key from src a where key < 0 group by key
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+  Stage-0 is a root stage
+
+STAGE PLANS:
+  Stage: Stage-0
+    Fetch Operator
+      limit: -1
+      Processor Tree:
+        ListSink
+      Cached Query Result: true
+
+PREHOOK: query: select count(*), key from src a where key < 0 group by key
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+#### A masked pattern was here ####
+POSTHOOK: query: select count(*), key from src a where key < 0 group by key
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+#### A masked pattern was here ####