Posted to commits@hive.apache.org by jd...@apache.org on 2018/02/07 21:40:43 UTC

[3/3] hive git commit: HIVE-18513: Query results caching (Jason Dere, reviewed by Jesus Camacho Rodriguez)

HIVE-18513: Query results caching (Jason Dere, reviewed by Jesus Camacho Rodriguez)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/1733a371
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/1733a371
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/1733a371

Branch: refs/heads/master
Commit: 1733a3712f0c548f9181e6e5b5298f466cad755e
Parents: 075077d
Author: Jason Dere <jd...@hortonworks.com>
Authored: Wed Feb 7 13:39:41 2018 -0800
Committer: Jason Dere <jd...@hortonworks.com>
Committed: Wed Feb 7 13:39:41 2018 -0800

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/conf/HiveConf.java   |  21 +
 .../apache/hadoop/hive/ql/log/PerfLogger.java   |   1 +
 data/conf/hive-site.xml                         |   5 +
 data/conf/llap/hive-site.xml                    |   5 +
 data/conf/perf-reg/spark/hive-site.xml          |   5 +
 data/conf/perf-reg/tez/hive-site.xml            |   5 +
 data/conf/rlist/hive-site.xml                   |   5 +
 data/conf/spark/local/hive-site.xml             |   5 +
 data/conf/spark/standalone/hive-site.xml        |   5 +
 data/conf/spark/yarn-client/hive-site.xml       |   5 +
 data/conf/tez/hive-site.xml                     |   5 +
 .../src/test/resources/hive-site.xml            |   5 +
 .../test/resources/testconfiguration.properties |   1 +
 .../org/apache/hadoop/hive/ql/QTestUtil.java    |   4 +
 .../java/org/apache/hadoop/hive/ql/Driver.java  |  64 ++
 .../hive/ql/cache/results/CacheUsage.java       |  73 ++
 .../ql/cache/results/QueryResultsCache.java     | 666 +++++++++++++++++++
 .../apache/hadoop/hive/ql/exec/Utilities.java   |  33 +
 .../ql/optimizer/calcite/HiveCalciteUtil.java   |  38 ++
 .../HiveRelOpMaterializationValidator.java      | 285 ++++++++
 .../hive/ql/parse/BaseSemanticAnalyzer.java     |  13 +
 .../hadoop/hive/ql/parse/CalcitePlanner.java    |  11 +
 .../hadoop/hive/ql/parse/QBParseInfo.java       |   3 +
 .../hadoop/hive/ql/parse/SemanticAnalyzer.java  | 273 ++++++++
 .../apache/hadoop/hive/ql/plan/FetchWork.java   |  13 +
 .../hadoop/hive/ql/session/SessionState.java    |  22 +-
 .../queries/clientpositive/results_cache_1.q    |  96 +++
 .../queries/clientpositive/results_cache_2.q    |  41 ++
 .../clientpositive/results_cache_capacity.q     |  52 ++
 .../clientpositive/results_cache_lifetime.q     |  14 +
 .../clientpositive/results_cache_temptable.q    |  42 ++
 .../clientpositive/results_cache_with_masking.q |  17 +
 .../clientpositive/llap/results_cache_1.q.out   | 584 ++++++++++++++++
 .../clientpositive/results_cache_1.q.out        | 579 ++++++++++++++++
 .../clientpositive/results_cache_2.q.out        | 176 +++++
 .../clientpositive/results_cache_capacity.q.out | 238 +++++++
 .../clientpositive/results_cache_lifetime.q.out | 112 ++++
 .../results_cache_temptable.q.out               | 293 ++++++++
 .../results_cache_with_masking.q.out            | 106 +++
 .../apache/hive/service/server/HiveServer2.java |  10 +
 40 files changed, 3910 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/1733a371/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 26e08e4..67e22f6 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -3707,6 +3707,27 @@ public class HiveConf extends Configuration {
     HIVE_EXEC_INPUT_LISTING_MAX_THREADS("hive.exec.input.listing.max.threads", 0, new  SizeValidator(0L, true, 1024L, true),
         "Maximum number of threads that Hive uses to list file information from file systems (recommended > 1 for blobstore)."),
 
+    HIVE_QUERY_RESULTS_CACHE_ENABLED("hive.query.results.cache.enabled", true,
+        "If the query results cache is enabled. This will keep results of previously executed queries " +
+        "to be reused if the same query is executed again."),
+
+    HIVE_QUERY_RESULTS_CACHE_DIRECTORY("hive.query.results.cache.directory",
+        "/tmp/hive/_resultscache_",
+        "Location of the query results cache directory. Temporary results from queries " +
+        "will be moved to this location."),
+
+    HIVE_QUERY_RESULTS_CACHE_MAX_ENTRY_LIFETIME("hive.query.results.cache.max.entry.lifetime", "3600s",
+        new TimeValidator(TimeUnit.SECONDS),
+        "Maximum lifetime in seconds for an entry in the query results cache. A nonpositive value means infinite."),
+
+    HIVE_QUERY_RESULTS_CACHE_MAX_SIZE("hive.query.results.cache.max.size",
+        (long) 2 * 1024 * 1024 * 1024,
+        "Maximum total size in bytes that the query results cache directory is allowed to use on the filesystem."),
+
+    HIVE_QUERY_RESULTS_CACHE_MAX_ENTRY_SIZE("hive.query.results.cache.max.entry.size",
+        (long) 10 * 1024 * 1024,
+        "Maximum size in bytes that a single query result is allowed to use in the results cache directory"),
+
     /* BLOBSTORE section */
 
     HIVE_BLOBSTORE_SUPPORTED_SCHEMES("hive.blobstore.supported.schemes", "s3,s3a,s3n",

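For orientation, here is a minimal sketch of how the settings introduced above could be tuned programmatically. The ConfVars names come from this hunk; the wrapper class and the chosen values are illustrative only.

    import java.util.concurrent.TimeUnit;

    import org.apache.hadoop.hive.conf.HiveConf;

    // Hypothetical helper (not part of this patch): override the new results cache settings.
    public class ResultsCacheConfExample {
      public static HiveConf configuredConf() {
        HiveConf conf = new HiveConf();
        conf.setBoolVar(HiveConf.ConfVars.HIVE_QUERY_RESULTS_CACHE_ENABLED, true);
        conf.setVar(HiveConf.ConfVars.HIVE_QUERY_RESULTS_CACHE_DIRECTORY, "/tmp/hive/_resultscache_");
        // Expire entries after 30 minutes instead of the 3600s default.
        conf.setTimeVar(HiveConf.ConfVars.HIVE_QUERY_RESULTS_CACHE_MAX_ENTRY_LIFETIME, 30, TimeUnit.MINUTES);
        // Cap the cache at 1 GB overall and 5 MB per entry.
        conf.setLongVar(HiveConf.ConfVars.HIVE_QUERY_RESULTS_CACHE_MAX_SIZE, 1024L * 1024 * 1024);
        conf.setLongVar(HiveConf.ConfVars.HIVE_QUERY_RESULTS_CACHE_MAX_ENTRY_SIZE, 5L * 1024 * 1024);
        return conf;
      }
    }
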
http://git-wip-us.apache.org/repos/asf/hive/blob/1733a371/common/src/java/org/apache/hadoop/hive/ql/log/PerfLogger.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/ql/log/PerfLogger.java b/common/src/java/org/apache/hadoop/hive/ql/log/PerfLogger.java
index 2767bca..764a832 100644
--- a/common/src/java/org/apache/hadoop/hive/ql/log/PerfLogger.java
+++ b/common/src/java/org/apache/hadoop/hive/ql/log/PerfLogger.java
@@ -71,6 +71,7 @@ public class PerfLogger {
   public static final String TEZ_INIT_OPERATORS = "TezInitializeOperators";
   public static final String LOAD_HASHTABLE = "LoadHashtable";
   public static final String TEZ_GET_SESSION = "TezGetSession";
+  public static final String SAVE_TO_RESULTS_CACHE = "saveToResultsCache";
 
   public static final String SPARK_SUBMIT_TO_RUNNING = "SparkSubmitToRunning";
   public static final String SPARK_BUILD_PLAN = "SparkBuildPlan";

http://git-wip-us.apache.org/repos/asf/hive/blob/1733a371/data/conf/hive-site.xml
----------------------------------------------------------------------
diff --git a/data/conf/hive-site.xml b/data/conf/hive-site.xml
index 01f83d1..b56cbd2 100644
--- a/data/conf/hive-site.xml
+++ b/data/conf/hive-site.xml
@@ -328,4 +328,9 @@
   <value>99</value>
 </property>
 
+<property>
+  <name>hive.query.results.cache.enabled</name>
+  <value>false</value>
+</property>
+
 </configuration>

http://git-wip-us.apache.org/repos/asf/hive/blob/1733a371/data/conf/llap/hive-site.xml
----------------------------------------------------------------------
diff --git a/data/conf/llap/hive-site.xml b/data/conf/llap/hive-site.xml
index cdda875..c4c299c 100644
--- a/data/conf/llap/hive-site.xml
+++ b/data/conf/llap/hive-site.xml
@@ -348,4 +348,9 @@
   <value>99</value>
 </property>
 
+<property>
+  <name>hive.query.results.cache.enabled</name>
+  <value>false</value>
+</property>
+
 </configuration>

http://git-wip-us.apache.org/repos/asf/hive/blob/1733a371/data/conf/perf-reg/spark/hive-site.xml
----------------------------------------------------------------------
diff --git a/data/conf/perf-reg/spark/hive-site.xml b/data/conf/perf-reg/spark/hive-site.xml
index 497a61f..5ca660d 100644
--- a/data/conf/perf-reg/spark/hive-site.xml
+++ b/data/conf/perf-reg/spark/hive-site.xml
@@ -265,4 +265,9 @@
   <value>org.apache.hadoop.hive.metastore.ObjectStore</value>
 </property>
 
+<property>
+  <name>hive.query.results.cache.enabled</name>
+  <value>false</value>
+</property>
+
 </configuration>

http://git-wip-us.apache.org/repos/asf/hive/blob/1733a371/data/conf/perf-reg/tez/hive-site.xml
----------------------------------------------------------------------
diff --git a/data/conf/perf-reg/tez/hive-site.xml b/data/conf/perf-reg/tez/hive-site.xml
index 012369f..62ecb74 100644
--- a/data/conf/perf-reg/tez/hive-site.xml
+++ b/data/conf/perf-reg/tez/hive-site.xml
@@ -282,4 +282,9 @@
   <value>true</value>
 </property>
 
+<property>
+  <name>hive.query.results.cache.enabled</name>
+  <value>false</value>
+</property>
+
 </configuration>

http://git-wip-us.apache.org/repos/asf/hive/blob/1733a371/data/conf/rlist/hive-site.xml
----------------------------------------------------------------------
diff --git a/data/conf/rlist/hive-site.xml b/data/conf/rlist/hive-site.xml
index 9de00e5..630e481 100644
--- a/data/conf/rlist/hive-site.xml
+++ b/data/conf/rlist/hive-site.xml
@@ -319,4 +319,9 @@
   <value>99</value>
 </property>
 
+<property>
+  <name>hive.query.results.cache.enabled</name>
+  <value>false</value>
+</property>
+
 </configuration>

http://git-wip-us.apache.org/repos/asf/hive/blob/1733a371/data/conf/spark/local/hive-site.xml
----------------------------------------------------------------------
diff --git a/data/conf/spark/local/hive-site.xml b/data/conf/spark/local/hive-site.xml
index fd0e6a0..8ff6256 100644
--- a/data/conf/spark/local/hive-site.xml
+++ b/data/conf/spark/local/hive-site.xml
@@ -261,4 +261,9 @@
   <value>false</value>
 </property>
 
+<property>
+  <name>hive.query.results.cache.enabled</name>
+  <value>false</value>
+</property>
+
 </configuration>

http://git-wip-us.apache.org/repos/asf/hive/blob/1733a371/data/conf/spark/standalone/hive-site.xml
----------------------------------------------------------------------
diff --git a/data/conf/spark/standalone/hive-site.xml b/data/conf/spark/standalone/hive-site.xml
index 7095979..84851c7 100644
--- a/data/conf/spark/standalone/hive-site.xml
+++ b/data/conf/spark/standalone/hive-site.xml
@@ -266,4 +266,9 @@
   <value>false</value>
 </property>
 
+<property>
+  <name>hive.query.results.cache.enabled</name>
+  <value>false</value>
+</property>
+
 </configuration>

http://git-wip-us.apache.org/repos/asf/hive/blob/1733a371/data/conf/spark/yarn-client/hive-site.xml
----------------------------------------------------------------------
diff --git a/data/conf/spark/yarn-client/hive-site.xml b/data/conf/spark/yarn-client/hive-site.xml
index a9a788b..6c63362 100644
--- a/data/conf/spark/yarn-client/hive-site.xml
+++ b/data/conf/spark/yarn-client/hive-site.xml
@@ -306,4 +306,9 @@
   <value>false</value>
 </property>
 
+<property>
+  <name>hive.query.results.cache.enabled</name>
+  <value>false</value>
+</property>
+
 </configuration>

http://git-wip-us.apache.org/repos/asf/hive/blob/1733a371/data/conf/tez/hive-site.xml
----------------------------------------------------------------------
diff --git a/data/conf/tez/hive-site.xml b/data/conf/tez/hive-site.xml
index 4519678..236adc7 100644
--- a/data/conf/tez/hive-site.xml
+++ b/data/conf/tez/hive-site.xml
@@ -293,4 +293,9 @@
   <value>99</value>
 </property>
 
+<property>
+  <name>hive.query.results.cache.enabled</name>
+  <value>false</value>
+</property>
+
 </configuration>

http://git-wip-us.apache.org/repos/asf/hive/blob/1733a371/itests/hive-blobstore/src/test/resources/hive-site.xml
----------------------------------------------------------------------
diff --git a/itests/hive-blobstore/src/test/resources/hive-site.xml b/itests/hive-blobstore/src/test/resources/hive-site.xml
index 038db0d..775c559 100644
--- a/itests/hive-blobstore/src/test/resources/hive-site.xml
+++ b/itests/hive-blobstore/src/test/resources/hive-site.xml
@@ -284,6 +284,11 @@
     <value>hdfs,pfile,file,s3,s3a,pblob</value>
   </property>
 
+  <property>
+    <name>hive.query.results.cache.enabled</name>
+    <value>false</value>
+  </property>
+
   <!--
   To run these tests:
   # Create a file blobstore-conf.xml  - DO NOT ADD TO REVISION CONTROL

http://git-wip-us.apache.org/repos/asf/hive/blob/1733a371/itests/src/test/resources/testconfiguration.properties
----------------------------------------------------------------------
diff --git a/itests/src/test/resources/testconfiguration.properties b/itests/src/test/resources/testconfiguration.properties
index 9a76b85..2a22db9 100644
--- a/itests/src/test/resources/testconfiguration.properties
+++ b/itests/src/test/resources/testconfiguration.properties
@@ -254,6 +254,7 @@ minillaplocal.shared.query.files=alter_merge_2_orc.q,\
   ptf.q,\
   ptf_matchpath.q,\
   ptf_streaming.q,\
+  results_cache_1.q,\
   sample1.q,\
   selectDistinctStar.q,\
   select_dummy_source.q,\

http://git-wip-us.apache.org/repos/asf/hive/blob/1733a371/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java
----------------------------------------------------------------------
diff --git a/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java b/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java
index 2d0aca0..fcce531 100644
--- a/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java
+++ b/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java
@@ -101,6 +101,7 @@ import org.apache.hadoop.hive.llap.daemon.MiniLlapCluster;
 import org.apache.hadoop.hive.llap.io.api.LlapProxy;
 import org.apache.hadoop.hive.metastore.Warehouse;
 import org.apache.hadoop.hive.metastore.api.Index;
+import org.apache.hadoop.hive.ql.cache.results.QueryResultsCache;
 import org.apache.hadoop.hive.ql.exec.FunctionRegistry;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.Utilities;
@@ -1013,6 +1014,9 @@ public class QTestUtil {
       return;
     }
 
+    // Remove any cached results from the previous test.
+    QueryResultsCache.cleanupInstance();
+
     // allocate and initialize a new conf since a test can
     // modify conf by using 'set' commands
     conf = new HiveConf(IDriver.class);

http://git-wip-us.apache.org/repos/asf/hive/blob/1733a371/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
index 2d7e459..c6f7d64 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
@@ -55,6 +55,9 @@ import org.apache.hadoop.hive.metastore.ColumnType;
 import org.apache.hadoop.hive.metastore.HiveMetaStoreUtils;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.Schema;
+import org.apache.hadoop.hive.ql.cache.results.CacheUsage;
+import org.apache.hadoop.hive.ql.cache.results.QueryResultsCache;
+import org.apache.hadoop.hive.ql.cache.results.QueryResultsCache.CacheEntry;
 import org.apache.hadoop.hive.ql.exec.ConditionalTask;
 import org.apache.hadoop.hive.ql.exec.DagUtils;
 import org.apache.hadoop.hive.ql.exec.ExplainTask;
@@ -188,6 +191,9 @@ public class Driver implements IDriver {
   // either initTxnMgr or from the SessionState, in that order.
   private HiveTxnManager queryTxnMgr;
 
+  private CacheUsage cacheUsage;
+  private CacheEntry usedCacheEntry;
+
   private enum DriverState {
     INITIALIZED,
     COMPILING,
@@ -638,6 +644,11 @@ public class Driver implements IDriver {
       }
       LOG.info("Semantic Analysis Completed");
 
+      // Retrieve information about cache usage for the query.
+      if (conf.getBoolVar(HiveConf.ConfVars.HIVE_QUERY_RESULTS_CACHE_ENABLED)) {
+        cacheUsage = sem.getCacheUsage();
+      }
+
       // validate the plan
       sem.validate();
       perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.ANALYZE);
@@ -1788,6 +1799,45 @@ public class Driver implements IDriver {
     return new CommandProcessorResponse(ret, errorMessage, SQLState, downstreamError);
   }
 
+  private void useFetchFromCache(CacheEntry cacheEntry) {
+    // Change query FetchTask to use new location specified in results cache.
+    FetchTask fetchTaskFromCache = (FetchTask) TaskFactory.get(cacheEntry.getFetchWork(), conf);
+    fetchTaskFromCache.initialize(queryState, plan, null, ctx.getOpContext());
+    plan.setFetchTask(fetchTaskFromCache);
+    cacheUsage = new CacheUsage(CacheUsage.CacheStatus.QUERY_USING_CACHE, cacheEntry);
+  }
+
+  private void checkCacheUsage() throws Exception {
+    if (cacheUsage != null) {
+      if (cacheUsage.getStatus() == CacheUsage.CacheStatus.QUERY_USING_CACHE) {
+        // Using a previously cached result.
+        CacheEntry cacheEntry = cacheUsage.getCacheEntry();
+
+        // Reader count already incremented during cache lookup.
+        // Save to usedCacheEntry to ensure reader is released after query.
+        usedCacheEntry = cacheEntry;
+      } else if (cacheUsage.getStatus() == CacheUsage.CacheStatus.CAN_CACHE_QUERY_RESULTS &&
+          plan.getFetchTask() != null) {
+        // The query could not be resolved using the cache, but the query results
+        // can be added to the cache for future queries to use.
+        PerfLogger perfLogger = SessionState.getPerfLogger();
+        perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.SAVE_TO_RESULTS_CACHE);
+
+        CacheEntry savedCacheEntry =
+            QueryResultsCache.getInstance().addToCache(
+                cacheUsage.getQueryInfo(),
+                plan.getFetchTask().getWork());
+        if (savedCacheEntry != null) {
+          useFetchFromCache(savedCacheEntry);
+          // addToCache() already increments the reader count. Set usedCacheEntry so it gets released.
+          usedCacheEntry = savedCacheEntry;
+        }
+
+        perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.SAVE_TO_RESULTS_CACHE);
+      }
+    }
+  }
+
   private void execute() throws CommandProcessorResponse {
     PerfLogger perfLogger = SessionState.getPerfLogger();
     perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.DRIVER_EXECUTE);
@@ -2002,6 +2052,8 @@ public class Driver implements IDriver {
       }
       perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.RUN_TASKS);
 
+      checkCacheUsage();
+
       // in case we decided to run everything in local mode, restore the
       // the jobtracker setting to its initial value
       ctx.restoreOriginalTracker();
@@ -2410,12 +2462,23 @@ public class Driver implements IDriver {
       LOG.debug(" Exception while clearing the FetchTask ", e);
     }
   }
+
+  private void releaseCachedResult() {
+    // Assumes the reader count has been incremented automatically by the results cache by either
+    // lookup or creating the cache entry.
+    if (usedCacheEntry != null) {
+      usedCacheEntry.releaseReader();
+      usedCacheEntry = null;
+    }
+  }
+
   // Close and release resources within a running query process. Since it runs under
   // driver state COMPILING, EXECUTING or INTERRUPT, it would not have race condition
   // with the releases probably running in the other closing thread.
   private int closeInProcess(boolean destroyed) {
     releaseDriverContext();
     releasePlan();
+    releaseCachedResult();
     releaseFetchTask();
     releaseResStream();
     releaseContext();
@@ -2445,6 +2508,7 @@ public class Driver implements IDriver {
         return 0;
       }
       releasePlan();
+      releaseCachedResult();
       releaseFetchTask();
       releaseResStream();
       releaseContext();

http://git-wip-us.apache.org/repos/asf/hive/blob/1733a371/ql/src/java/org/apache/hadoop/hive/ql/cache/results/CacheUsage.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/cache/results/CacheUsage.java b/ql/src/java/org/apache/hadoop/hive/ql/cache/results/CacheUsage.java
new file mode 100644
index 0000000..08b791a
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/cache/results/CacheUsage.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.cache.results;
+
+import org.apache.hadoop.hive.ql.cache.results.QueryResultsCache.CacheEntry;
+import org.apache.hadoop.hive.ql.cache.results.QueryResultsCache.QueryInfo;
+
+/**
+ * Helper class during semantic analysis that indicates if the query can use the cache,
+ * or if the results from the query can be added to the results cache.
+ */
+public class CacheUsage {
+
+  public enum CacheStatus {
+    CACHE_NOT_USED,
+    QUERY_USING_CACHE,
+    CAN_CACHE_QUERY_RESULTS,
+  };
+
+  private CacheUsage.CacheStatus status;
+  private CacheEntry cacheEntry;
+  private QueryInfo queryInfo;
+
+  public CacheUsage(CacheStatus status, CacheEntry cacheEntry) {
+    this.status = status;
+    this.cacheEntry = cacheEntry;
+  }
+
+  public CacheUsage(CacheStatus status, QueryInfo queryInfo) {
+    this.status = status;
+    this.queryInfo = queryInfo;
+  }
+
+  public CacheUsage.CacheStatus getStatus() {
+    return status;
+  }
+
+  public void setStatus(CacheUsage.CacheStatus status) {
+    this.status = status;
+  }
+
+  public CacheEntry getCacheEntry() {
+    return cacheEntry;
+  }
+
+  public void setCacheEntry(CacheEntry cacheEntry) {
+    this.cacheEntry = cacheEntry;
+  }
+
+  public QueryInfo getQueryInfo() {
+    return queryInfo;
+  }
+
+  public void setQueryInfo(QueryInfo queryInfo) {
+    this.queryInfo = queryInfo;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/1733a371/ql/src/java/org/apache/hadoop/hive/ql/cache/results/QueryResultsCache.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/cache/results/QueryResultsCache.java b/ql/src/java/org/apache/hadoop/hive/ql/cache/results/QueryResultsCache.java
new file mode 100644
index 0000000..131127e
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/cache/results/QueryResultsCache.java
@@ -0,0 +1,666 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.cache.results;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.hadoop.fs.ContentSummary;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.hooks.Entity.Type;
+import org.apache.hadoop.hive.ql.hooks.ReadEntity;
+import org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreClient;
+import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.parse.ColumnAccessInfo;
+import org.apache.hadoop.hive.ql.parse.TableAccessInfo;
+import org.apache.hadoop.hive.ql.plan.FetchWork;
+import org.apache.hadoop.hive.ql.plan.HiveOperation;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A class to handle management and lookup of cached Hive query results.
+ */
+public final class QueryResultsCache {
+
+  private static final Logger LOG = LoggerFactory.getLogger(QueryResultsCache.class);
+
+  public static class LookupInfo {
+    private String queryText;
+
+    public LookupInfo(String queryText) {
+      super();
+      this.queryText = queryText;
+    }
+
+    public String getQueryText() {
+      return queryText;
+    }
+  }
+
+  public static class QueryInfo {
+    private LookupInfo lookupInfo;
+    private HiveOperation hiveOperation;
+    private List<FieldSchema> resultSchema;
+    private TableAccessInfo tableAccessInfo;
+    private ColumnAccessInfo columnAccessInfo;
+    private Set<ReadEntity> inputs;
+
+    public QueryInfo(
+        LookupInfo lookupInfo,
+        HiveOperation hiveOperation,
+        List<FieldSchema> resultSchema,
+        TableAccessInfo tableAccessInfo,
+        ColumnAccessInfo columnAccessInfo,
+        Set<ReadEntity> inputs) {
+      this.lookupInfo = lookupInfo;
+      this.hiveOperation = hiveOperation;
+      this.resultSchema = resultSchema;
+      this.tableAccessInfo = tableAccessInfo;
+      this.columnAccessInfo = columnAccessInfo;
+      this.inputs = inputs;
+    }
+
+    public LookupInfo getLookupInfo() {
+      return lookupInfo;
+    }
+
+    public void setLookupInfo(LookupInfo lookupInfo) {
+      this.lookupInfo = lookupInfo;
+    }
+
+    public HiveOperation getHiveOperation() {
+      return hiveOperation;
+    }
+
+    public void setHiveOperation(HiveOperation hiveOperation) {
+      this.hiveOperation = hiveOperation;
+    }
+
+    public List<FieldSchema> getResultSchema() {
+      return resultSchema;
+    }
+
+    public void setResultSchema(List<FieldSchema> resultSchema) {
+      this.resultSchema = resultSchema;
+    }
+
+    public TableAccessInfo getTableAccessInfo() {
+      return tableAccessInfo;
+    }
+
+    public void setTableAccessInfo(TableAccessInfo tableAccessInfo) {
+      this.tableAccessInfo = tableAccessInfo;
+    }
+
+    public ColumnAccessInfo getColumnAccessInfo() {
+      return columnAccessInfo;
+    }
+
+    public void setColumnAccessInfo(ColumnAccessInfo columnAccessInfo) {
+      this.columnAccessInfo = columnAccessInfo;
+    }
+
+    public Set<ReadEntity> getInputs() {
+      return inputs;
+    }
+
+    public void setInputs(Set<ReadEntity> inputs) {
+      this.inputs = inputs;
+    }
+  }
+
+  public static class CacheEntry {
+    private QueryInfo queryInfo;
+    private FetchWork fetchWork;
+    private Path cachedResultsPath;
+
+    // Cache administration
+    private long createTime;
+    private long size;
+    private AtomicBoolean valid = new AtomicBoolean(false);
+    private AtomicInteger readers = new AtomicInteger(0);
+    private ScheduledFuture<?> invalidationFuture = null;
+
+    public boolean isValid() {
+      return valid.get();
+    }
+
+    public void releaseReader() {
+      int readerCount = 0;
+      synchronized (this) {
+        readerCount = readers.decrementAndGet();
+      }
+      LOG.debug("releaseReader: entry: {}, readerCount: {}", this, readerCount);
+      Preconditions.checkState(readerCount >= 0);
+
+      cleanupIfNeeded();
+    }
+
+    public String toString() {
+      return "CacheEntry query: [" + getQueryInfo().getLookupInfo().getQueryText()
+          + "], location: " + cachedResultsPath
+          + ", size: " + size;
+    }
+
+    public boolean addReader() {
+      boolean added = false;
+      int readerCount = 0;
+      synchronized (this) {
+        if (valid.get()) {
+          readerCount = readers.incrementAndGet();
+          added = true;
+        }
+      }
+      Preconditions.checkState(readerCount > 0);
+      LOG.debug("addReader: entry: {}, readerCount: {}", this, readerCount);
+      return added;
+    }
+
+    private int numReaders() {
+      return readers.get();
+    }
+
+    private void invalidate() {
+      boolean wasValid = setValidity(false);
+
+      if (wasValid) {
+        LOG.info("Invalidated cache entry: {}", this);
+
+        if (invalidationFuture != null) {
+          // The cache entry has just been invalidated, no need for the scheduled invalidation.
+          invalidationFuture.cancel(false);
+        }
+        cleanupIfNeeded();
+      }
+    }
+
+    /**
+     * Set the validity, returning the previous validity value.
+     * @param valid
+     * @return
+     */
+    private boolean setValidity(boolean valid) {
+      synchronized(this) {
+        return this.valid.getAndSet(valid);
+      }
+    }
+
+    private void cleanupIfNeeded() {
+      if (!isValid() && readers.get() <= 0) {
+        QueryResultsCache.cleanupEntry(this);
+      }
+    }
+
+    private String getQueryText() {
+      return getQueryInfo().getLookupInfo().getQueryText();
+    }
+
+    public FetchWork getFetchWork() {
+      return fetchWork;
+    }
+
+    public QueryInfo getQueryInfo() {
+      return queryInfo;
+    }
+
+    public Path getCachedResultsPath() {
+      return cachedResultsPath;
+    }
+  }
+
+  // Allow lookup by query string
+  private final Map<String, Set<CacheEntry>> queryMap = new HashMap<String, Set<CacheEntry>>();
+
+  // LRU. Could also implement LRU as a doubly linked list if CacheEntry keeps its node.
+  // Use synchronized map since even read actions cause the lru to get updated.
+  private final Map<CacheEntry, CacheEntry> lru = Collections.synchronizedMap(
+      new LinkedHashMap<CacheEntry, CacheEntry>(INITIAL_LRU_SIZE, LRU_LOAD_FACTOR, true));
+
+  private final HiveConf conf;
+  private Path cacheDirPath;
+  private long cacheSize = 0;
+  private long maxCacheSize;
+  private long maxEntrySize;
+  private long maxEntryLifetime;
+  private ReadWriteLock rwLock = new ReentrantReadWriteLock();
+
+  private QueryResultsCache(HiveConf configuration) throws IOException {
+    this.conf = configuration;
+
+    // Set up cache directory
+    Path rootCacheDir = new Path(conf.getVar(HiveConf.ConfVars.HIVE_QUERY_RESULTS_CACHE_DIRECTORY));
+    LOG.info("Initializing query results cache at {}", rootCacheDir);
+    Utilities.ensurePathIsWritable(rootCacheDir, conf);
+
+    String currentCacheDirName = "results-" + UUID.randomUUID().toString();
+    cacheDirPath = new Path(rootCacheDir, currentCacheDirName);
+    FileSystem fs = cacheDirPath.getFileSystem(conf);
+    FsPermission fsPermission = new FsPermission("700");
+    fs.mkdirs(cacheDirPath, fsPermission);
+
+    // Results cache directory should be cleaned up at process termination.
+    fs.deleteOnExit(cacheDirPath);
+
+    maxCacheSize = conf.getLongVar(HiveConf.ConfVars.HIVE_QUERY_RESULTS_CACHE_MAX_SIZE);
+    maxEntrySize = conf.getLongVar(HiveConf.ConfVars.HIVE_QUERY_RESULTS_CACHE_MAX_ENTRY_SIZE);
+    maxEntryLifetime = conf.getTimeVar(
+        HiveConf.ConfVars.HIVE_QUERY_RESULTS_CACHE_MAX_ENTRY_LIFETIME,
+        TimeUnit.MILLISECONDS);
+
+    LOG.info("Query results cache: cacheDirectory {}, maxCacheSize {}, maxEntrySize {}, maxEntryLifetime {}",
+        cacheDirPath, maxCacheSize, maxEntrySize, maxEntryLifetime);
+  }
+
+  private static final AtomicBoolean inited = new AtomicBoolean(false);
+  private static QueryResultsCache instance;
+
+  public static void initialize(HiveConf conf) throws IOException {
+    if (!inited.getAndSet(true)) {
+      try {
+        instance = new QueryResultsCache(conf);
+      } catch (IOException err) {
+        inited.set(false);
+        throw err;
+      }
+    }
+  }
+
+  public static QueryResultsCache getInstance() {
+    return instance;
+  }
+
+  /**
+   * Check if the cache contains an entry for the requested LookupInfo.
+   * @param request
+   * @param addReader Whether the reader count should be incremented during the lookup.
+   *        This will ensure the returned entry can be used after the lookup.
+   *        If true, the caller will be responsible for decrementing the reader count
+   *        using CacheEntry.releaseReader().
+   * @return  The cached result if there is a match in the cache, or null if no match is found.
+   */
+  public CacheEntry lookup(LookupInfo request, boolean addReader) {
+    CacheEntry result = null;
+
+    LOG.debug("QueryResultsCache lookup for query: {}", request.queryText);
+
+    Lock readLock = rwLock.readLock();
+    try {
+      readLock.lock();
+      Set<CacheEntry> candidates = queryMap.get(request.queryText);
+      if (candidates != null) {
+        for (CacheEntry candidate : candidates) {
+          if (entryMatches(request, candidate)) {
+            result = candidate;
+            break;
+          }
+        }
+
+        if (result != null) {
+          lru.get(result);  // Update LRU
+
+          if (!result.isValid()) {
+            // Entry is in the cache, but not valid.
+            // This can happen when the entry is first added, before the data has been moved
+            // to the results cache directory. We cannot use this entry yet.
+            result = null;
+          } else {
+            if (addReader) {
+              // Caller will need to be responsible for releasing the reader count.
+              result.addReader();
+            }
+          }
+        }
+      }
+    } finally {
+      readLock.unlock();
+    }
+
+    LOG.debug("QueryResultsCache lookup result: {}", result);
+
+    return result;
+  }
+
+  /**
+   * Add an entry to the query results cache.
+   * Important: Adding the entry to the cache will increment the reader count for the cache entry.
+   * CacheEntry.releaseReader() should be called when the caller is done with the cache entry.
+   *
+   * @param queryInfo
+   * @param fetchWork
+   * @return The entry if added to the cache. null if the entry is not added.
+   */
+  public CacheEntry addToCache(QueryInfo queryInfo, FetchWork fetchWork) {
+
+    CacheEntry addedEntry = null;
+    boolean dataDirMoved = false;
+    Path queryResultsPath = null;
+    Path cachedResultsPath = null;
+    String queryText = queryInfo.getLookupInfo().getQueryText();
+
+    // Should we remove other candidate entries if they are equivalent to these query results?
+    try {
+      CacheEntry potentialEntry = new CacheEntry();
+      potentialEntry.queryInfo = queryInfo;
+      queryResultsPath = fetchWork.getTblDir();
+      FileSystem resultsFs = queryResultsPath.getFileSystem(conf);
+      ContentSummary cs = resultsFs.getContentSummary(queryResultsPath);
+      potentialEntry.size = cs.getLength();
+
+      Lock writeLock = rwLock.writeLock();
+      try {
+        writeLock.lock();
+
+        if (!shouldEntryBeAdded(potentialEntry)) {
+          return null;
+        }
+        if (!clearSpaceForCacheEntry(potentialEntry)) {
+          return null;
+        }
+
+        LOG.info("Adding cache entry for query '{}'", queryText);
+
+        // Add the entry to the cache structures while under write lock. Do not mark the entry
+        // as valid yet, since the query results have not yet been moved to the cache directory.
+        // Do the data move after unlocking since it might take time.
+        // Mark the entry as valid once the data has been moved to the cache directory.
+        Set<CacheEntry> entriesForQuery = queryMap.get(queryText);
+        if (entriesForQuery == null) {
+          entriesForQuery = new HashSet<CacheEntry>();
+          queryMap.put(queryText, entriesForQuery);
+        }
+        entriesForQuery.add(potentialEntry);
+        lru.put(potentialEntry, potentialEntry);
+        cacheSize += potentialEntry.size;
+        addedEntry = potentialEntry;
+
+      } finally {
+        writeLock.unlock();
+      }
+
+      // Move the query results to the query cache directory.
+      cachedResultsPath = moveResultsToCacheDirectory(queryResultsPath);
+      dataDirMoved = true;
+      LOG.info("Moved query results from {} to {} (size {}) for query '{}'",
+          queryResultsPath, cachedResultsPath, cs.getLength(), queryText);
+
+      // Create a new FetchWork to reference the new cache location.
+      FetchWork fetchWorkForCache =
+          new FetchWork(cachedResultsPath, fetchWork.getTblDesc(), fetchWork.getLimit());
+      fetchWorkForCache.setCachedResult(true);
+      addedEntry.fetchWork = fetchWorkForCache;
+      addedEntry.cachedResultsPath = cachedResultsPath;
+      addedEntry.createTime = System.currentTimeMillis();
+      addedEntry.setValidity(true);
+
+      // Mark this entry as being in use. Caller will need to release later.
+      addedEntry.addReader();
+
+      scheduleEntryInvalidation(addedEntry);
+    } catch (Exception err) {
+      LOG.error("Failed to create cache entry for query results for query: " + queryText, err);
+
+      if (addedEntry != null) {
+        // If the entry was already added to the cache when we hit error, clean up properly.
+
+        if (dataDirMoved) {
+          // If data was moved from original location to cache directory, we need to move it back!
+          LOG.info("Restoring query results from {} back to {}", cachedResultsPath, queryResultsPath);
+          try {
+            FileSystem fs = cachedResultsPath.getFileSystem(conf);
+            fs.rename(cachedResultsPath, queryResultsPath);
+            addedEntry.cachedResultsPath = null;
+          } catch (Exception err2) {
+            String errMsg = "Failed cleanup during failed attempt to cache query: " + queryText;
+            LOG.error(errMsg, err2);
+            throw new RuntimeException(errMsg, err2);
+          }
+        }
+
+        addedEntry.invalidate();
+        if (addedEntry.numReaders() > 0) {
+          addedEntry.releaseReader();
+        }
+      }
+
+      return null;
+    }
+
+    return addedEntry;
+  }
+
+  public void clear() {
+    Lock writeLock = rwLock.writeLock();
+    try {
+      writeLock.lock();
+      LOG.info("Clearing the results cache");
+      for (CacheEntry entry : lru.keySet().toArray(EMPTY_CACHEENTRY_ARRAY)) {
+        try {
+          removeEntry(entry);
+        } catch (Exception err) {
+          LOG.error("Error removing cache entry " + entry, err);
+        }
+      }
+    } finally {
+      writeLock.unlock();
+    }
+  }
+
+  public long getSize() {
+    Lock readLock = rwLock.readLock();
+    try {
+      readLock.lock();
+      return cacheSize;
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  private static final int INITIAL_LRU_SIZE = 16;
+  private static final float LRU_LOAD_FACTOR = 0.75f;
+  private static final CacheEntry[] EMPTY_CACHEENTRY_ARRAY = {};
+
+  private boolean entryMatches(LookupInfo lookupInfo, CacheEntry entry) {
+    QueryInfo queryInfo = entry.getQueryInfo();
+    for (ReadEntity readEntity : queryInfo.getInputs()) {
+      // Check that the tables used do not resolve to temp tables.
+      if (readEntity.getType() == Type.TABLE) {
+        Table tableUsed = readEntity.getTable();
+        Map<String, Table> tempTables =
+            SessionHiveMetaStoreClient.getTempTablesForDatabase(tableUsed.getDbName());
+        if (tempTables != null && tempTables.containsKey(tableUsed.getTableName())) {
+          LOG.info("{} resolves to a temporary table in the current session. This query cannot use the cache.",
+              tableUsed.getTableName());
+          return false;
+        }
+      }
+    }
+
+    return true;
+  }
+
+  private void removeEntry(CacheEntry entry) {
+    entry.invalidate();
+    removeFromLookup(entry);
+    lru.remove(entry);
+    // Should the cache size be updated here, or after the result data has actually been deleted?
+    cacheSize -= entry.size;
+  }
+
+  private void removeFromLookup(CacheEntry entry) {
+    String queryString = entry.getQueryText();
+    Set<CacheEntry> entries = queryMap.get(queryString);
+    Preconditions.checkState(entries != null);
+    boolean deleted = entries.remove(entry);
+    Preconditions.checkState(deleted);
+    if (entries.isEmpty()) {
+      queryMap.remove(queryString);
+    }
+  }
+
+  private void calculateEntrySize(CacheEntry entry, FetchWork fetchWork) throws IOException {
+    Path queryResultsPath = fetchWork.getTblDir();
+    FileSystem resultsFs = queryResultsPath.getFileSystem(conf);
+    ContentSummary cs = resultsFs.getContentSummary(queryResultsPath);
+    entry.size = cs.getLength();
+  }
+
+  /**
+   * Determines if the cache entry should be added to the results cache.
+   */
+  private boolean shouldEntryBeAdded(CacheEntry entry) {
+    // Assumes the cache lock has already been taken.
+    if (maxEntrySize >= 0 && entry.size > maxEntrySize) {
+      LOG.debug("Cache entry size {} larger than max entry size ({})", entry.size, maxEntrySize);
+      return false;
+    }
+
+    return true;
+  }
+
+  private Path moveResultsToCacheDirectory(Path queryResultsPath) throws IOException {
+    String dirName = UUID.randomUUID().toString();
+    Path cachedResultsPath = new Path(cacheDirPath, dirName);
+    FileSystem fs = cachedResultsPath.getFileSystem(conf);
+    fs.rename(queryResultsPath, cachedResultsPath);
+    return cachedResultsPath;
+  }
+
+  private boolean hasSpaceForCacheEntry(CacheEntry entry) {
+    if (maxCacheSize >= 0) {
+      return (cacheSize + entry.size) <= maxCacheSize;
+    }
+    // Negative max cache size means unbounded.
+    return true;
+  }
+
+  private boolean clearSpaceForCacheEntry(CacheEntry entry) {
+    if (hasSpaceForCacheEntry(entry)) {
+      return true;
+    }
+
+    LOG.info("Clearing space for cache entry for query: [{}] with size {}",
+        entry.getQueryText(), entry.size);
+
+    // Entries should be in LRU order in the keyset iterator.
+    CacheEntry[] entries = lru.keySet().toArray(EMPTY_CACHEENTRY_ARRAY);
+    for (CacheEntry removalCandidate : entries) {
+      if (!removalCandidate.isValid()) {
+        // Likely an entry which is still getting its results moved to the cache directory.
+        continue;
+      }
+      // Only delete the entry if it has no readers.
+      if (!(removalCandidate.numReaders() > 0)) {
+        LOG.info("Removing entry: {}", removalCandidate);
+        removeEntry(removalCandidate);
+        if (hasSpaceForCacheEntry(entry)) {
+          return true;
+        }
+      }
+    }
+
+    LOG.info("Could not free enough space for cache entry for query: [{}] withe size {}",
+        entry.getQueryText(), entry.size);
+    return false;
+  }
+
+
+  @VisibleForTesting
+  public static void cleanupInstance() {
+    // This should only ever be called in testing scenarios.
+    // There should not be any other users of the cache or its entries or this may mess up cleanup.
+    if (inited.get()) {
+      getInstance().clear();
+      instance = null;
+      inited.set(false);
+    }
+  }
+
+  private static ScheduledExecutorService invalidationExecutor = null;
+  private static ExecutorService deletionExecutor = null;
+
+  static {
+    ThreadFactory threadFactory =
+        new ThreadFactoryBuilder().setDaemon(true).setNameFormat("QueryResultsCache %d").build();
+    invalidationExecutor = Executors.newSingleThreadScheduledExecutor(threadFactory);
+    deletionExecutor = Executors.newSingleThreadExecutor(threadFactory);
+  }
+
+  private void scheduleEntryInvalidation(final CacheEntry entry) {
+    if (maxEntryLifetime >= 0) {
+      // Schedule task to invalidate cache entry.
+      ScheduledFuture<?> future = invalidationExecutor.schedule(new Runnable() {
+        @Override
+        public void run() {
+          entry.invalidate();
+        }
+      }, maxEntryLifetime, TimeUnit.MILLISECONDS);
+      entry.invalidationFuture = future;
+    }
+  }
+
+  private static void cleanupEntry(final CacheEntry entry) {
+    Preconditions.checkState(!entry.isValid());
+
+    if (entry.cachedResultsPath != null) {
+      deletionExecutor.execute(new Runnable() {
+        @Override
+        public void run() {
+          Path path = entry.cachedResultsPath;
+          LOG.info("Cache directory cleanup: deleting {}", path);
+          try {
+            FileSystem fs = entry.cachedResultsPath.getFileSystem(getInstance().conf);
+            fs.delete(entry.cachedResultsPath, true);
+          } catch (Exception err) {
+            LOG.error("Error while trying to delete " + path, err);
+          }
+        }
+      });
+    }
+  }
+}

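To make the reader-count contract in the lookup()/addToCache() javadoc concrete, here is a minimal sketch of a caller. The cache calls are the ones added above; the wrapper class and its parameters are hypothetical.

    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.ql.cache.results.QueryResultsCache;
    import org.apache.hadoop.hive.ql.cache.results.QueryResultsCache.CacheEntry;
    import org.apache.hadoop.hive.ql.cache.results.QueryResultsCache.LookupInfo;
    import org.apache.hadoop.hive.ql.cache.results.QueryResultsCache.QueryInfo;
    import org.apache.hadoop.hive.ql.plan.FetchWork;

    // Hypothetical caller (not part of this patch): shows when the reader count is pinned and released.
    public class ResultsCacheUsageExample {
      public static void demonstrate(HiveConf conf, String queryText,
          QueryInfo queryInfo, FetchWork executedWork) throws Exception {
        QueryResultsCache.initialize(conf);
        QueryResultsCache cache = QueryResultsCache.getInstance();

        // lookup() with addReader=true pins a matching entry so it cannot be cleaned up underneath us.
        CacheEntry entry = cache.lookup(new LookupInfo(queryText), true);
        if (entry == null) {
          // No hit: after running the query, offer its results; a non-null return is already pinned.
          entry = cache.addToCache(queryInfo, executedWork);
        }
        if (entry != null) {
          FetchWork cachedWork = entry.getFetchWork();  // fetch rows from the cached location
          // ... consume cachedWork ...
          entry.releaseReader();  // release the pin once the results have been consumed
        }
      }
    }
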
http://git-wip-us.apache.org/repos/asf/hive/blob/1733a371/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
index 675ca12..8f44c94 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
@@ -2626,6 +2626,10 @@ public final class Utilities {
     return getTasks(tasks, new TaskFilterFunction<>(ExecDriver.class));
   }
 
+  public static int getNumClusterJobs(List<Task<? extends Serializable>> tasks) {
+    return getMRTasks(tasks).size() + getTezTasks(tasks).size() + getSparkTasks(tasks).size();
+  }
+
   static class TaskFilterFunction<T> implements DAGTraversal.Function {
     private Set<Task<? extends Serializable>> visited = new HashSet<>();
     private Class<T> requiredType;
@@ -4445,4 +4449,33 @@ public final class Utilities {
     return AcidUtils.ORIGINAL_PATTERN.matcher(path.getName()).matches() ||
       AcidUtils.ORIGINAL_PATTERN_COPY.matcher(path.getName()).matches();
   }
+
+  /**
+   * Checks if path passed in exists and has writable permissions.
+   * The path will be created if it does not exist.
+   * @param rootHDFSDirPath
+   * @param conf
+   */
+  public static void ensurePathIsWritable(Path rootHDFSDirPath, HiveConf conf) throws IOException {
+    FsPermission writableHDFSDirPermission = new FsPermission((short)00733);
+    FileSystem fs = rootHDFSDirPath.getFileSystem(conf);
+    if (!fs.exists(rootHDFSDirPath)) {
+      Utilities.createDirsWithPermission(conf, rootHDFSDirPath, writableHDFSDirPermission, true);
+    }
+    FsPermission currentHDFSDirPermission = fs.getFileStatus(rootHDFSDirPath).getPermission();
+    if (rootHDFSDirPath != null && rootHDFSDirPath.toUri() != null) {
+      String schema = rootHDFSDirPath.toUri().getScheme();
+      LOG.debug("HDFS dir: " + rootHDFSDirPath + " with schema " + schema + ", permission: " +
+          currentHDFSDirPermission);
+    } else {
+      LOG.debug(
+        "HDFS dir: " + rootHDFSDirPath + ", permission: " + currentHDFSDirPermission);
+    }
+    // If the root HDFS scratch dir already exists, make sure it is writeable.
+    if (!((currentHDFSDirPermission.toShort() & writableHDFSDirPermission
+        .toShort()) == writableHDFSDirPermission.toShort())) {
+      throw new RuntimeException("The dir: " + rootHDFSDirPath
+          + " on HDFS should be writable. Current permissions are: " + currentHDFSDirPermission);
+    }
+  }
 }

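A worked example of the mask check in ensurePathIsWritable(): the directory's current mode must contain every bit of the 00733 mask (rwx for the owner, write and execute for group and other). Sketch only; the class name is hypothetical.

    import org.apache.hadoop.fs.permission.FsPermission;

    // Hypothetical snippet (not part of this patch): the bitwise check used by ensurePathIsWritable().
    public class WritableDirCheckExample {
      public static void main(String[] args) {
        short required = new FsPermission((short) 00733).toShort();   // rwx-wx-wx
        short open     = new FsPermission((short) 00777).toShort();   // rwxrwxrwx
        short typical  = new FsPermission((short) 00755).toShort();   // rwxr-xr-x

        // A directory passes only if every bit of the 00733 mask is present in its current mode.
        System.out.println((open & required) == required);     // true
        System.out.println((typical & required) == required);  // false: group/other lack write
      }
    }
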
http://git-wip-us.apache.org/repos/asf/hive/blob/1733a371/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/HiveCalciteUtil.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/HiveCalciteUtil.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/HiveCalciteUtil.java
index f0dd167..1f8a48c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/HiveCalciteUtil.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/HiveCalciteUtil.java
@@ -967,6 +967,44 @@ public class HiveCalciteUtil {
     return AggregateCall.create(aggFunction, false, argList, -1, aggFnRetType, null);
   }
 
+  /**
+   * Is the expression usable for query materialization.
+   */
+  public static boolean isMaterializable(RexNode expr) {
+    return (checkMaterializable(expr) == null);
+  }
+
+  /**
+   * Check if the expression is usable for query materialization, returning the first failing expression.
+   */
+  public static RexCall checkMaterializable(RexNode expr) {
+    boolean deterministic = true;
+    RexCall failingCall = null;
+
+    if (expr == null) {
+      return null;
+    }
+
+    RexVisitor<Void> visitor = new RexVisitorImpl<Void>(true) {
+      @Override
+      public Void visitCall(org.apache.calcite.rex.RexCall call) {
+        // non-deterministic functions as well as runtime constants are not materializable.
+        if (!call.getOperator().isDeterministic() || call.getOperator().isDynamicFunction()) {
+          throw new Util.FoundOne(call);
+        }
+        return super.visitCall(call);
+      }
+    };
+
+    try {
+      expr.accept(visitor);
+    } catch (Util.FoundOne e) {
+      failingCall = (RexCall) e.getNode();
+    }
+
+    return failingCall;
+  }
+
   public static HiveTableFunctionScan createUDTFForSetOp(RelOptCluster cluster, RelNode input)
       throws SemanticException {
     RelTraitSet traitSet = TraitsUtil.getDefaultTraitSet(cluster);

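A short sketch of how the new checkMaterializable() helper might be consumed. The wrapper class is hypothetical; expr stands for any RexNode taken from the plan being validated.

    import org.apache.calcite.rex.RexCall;
    import org.apache.calcite.rex.RexNode;

    import org.apache.hadoop.hive.ql.optimizer.calcite.HiveCalciteUtil;

    // Hypothetical caller (not part of this patch): reports why an expression blocks materialization.
    public class MaterializableCheckExample {
      public static String whyNotMaterializable(RexNode expr) {
        RexCall offending = HiveCalciteUtil.checkMaterializable(expr);
        return (offending == null)
            ? null  // expression is usable for query materialization
            : offending.getOperator().getName() + " is non-deterministic or a runtime constant";
      }
    }
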
http://git-wip-us.apache.org/repos/asf/hive/blob/1733a371/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/HiveRelOpMaterializationValidator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/HiveRelOpMaterializationValidator.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/HiveRelOpMaterializationValidator.java
new file mode 100644
index 0000000..8c1bcb3
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/HiveRelOpMaterializationValidator.java
@@ -0,0 +1,285 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.optimizer.calcite;
+
+import java.util.List;
+
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.AggregateCall;
+import org.apache.calcite.rel.core.TableFunctionScan;
+import org.apache.calcite.rel.core.TableScan;
+import org.apache.calcite.rel.logical.LogicalAggregate;
+import org.apache.calcite.rel.logical.LogicalCorrelate;
+import org.apache.calcite.rel.logical.LogicalExchange;
+import org.apache.calcite.rel.logical.LogicalFilter;
+import org.apache.calcite.rel.logical.LogicalIntersect;
+import org.apache.calcite.rel.logical.LogicalJoin;
+import org.apache.calcite.rel.logical.LogicalMatch;
+import org.apache.calcite.rel.logical.LogicalMinus;
+import org.apache.calcite.rel.logical.LogicalProject;
+import org.apache.calcite.rel.logical.LogicalSort;
+import org.apache.calcite.rel.logical.LogicalUnion;
+import org.apache.calcite.rel.logical.LogicalValues;
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.util.Util;
+
+import org.apache.hadoop.hive.metastore.TableType;
+
+import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.optimizer.calcite.HiveCalciteUtil.JoinPredicateInfo;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveAggregate;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveExcept;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveFilter;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveIntersect;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveJoin;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveProject;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveSemiJoin;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveSortLimit;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveTableScan;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveUnion;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Checks the query plan for conditions that would make the plan unsuitable for
+ * materialized views or query caching:
+ * - References to temporary or external tables
+ * - References to non-deterministic functions.
+ */
+public class HiveRelOpMaterializationValidator extends HiveRelShuttleImpl {
+  static final Logger LOG = LoggerFactory.getLogger(HiveRelOpMaterializationValidator.class);
+
+  protected String invalidMaterializationReason;
+
+  public void validateQueryMaterialization(RelNode relNode) {
+    try {
+      relNode.accept(this);
+    } catch (Util.FoundOne e) {
+      // Can ignore - the check failed.
+    }
+  }
+
+  @Override
+  public RelNode visit(TableScan scan) {
+    if (scan instanceof HiveTableScan) {
+      HiveTableScan hiveScan = (HiveTableScan) scan;
+      RelOptHiveTable relOptHiveTable = (RelOptHiveTable) hiveScan.getTable();
+      Table tab = relOptHiveTable.getHiveTableMD();
+      if (tab.isTemporary()) {
+        fail(tab.getTableName() + " is a temporary table");
+      }
+      TableType tt = tab.getTableType();
+      if (tt == TableType.EXTERNAL_TABLE) {
+        fail(tab.getFullyQualifiedName() + " is an external table");
+      }
+      return scan;
+    }
+
+    // TableScan of a non-Hive table - not supported for materializations.
+    fail(scan.getTable().getQualifiedName() + " is a table scan of a non-Hive table.");
+    return scan;
+  }
+
+  @Override
+  public RelNode visit(HiveProject project) {
+    for (RexNode expr : project.getProjects()) {
+      checkExpr(expr);
+    }
+    return super.visit(project);
+  }
+
+  @Override
+  public RelNode visit(HiveFilter filter) {
+    checkExpr(filter.getCondition());
+    return super.visit(filter);
+  }
+
+  @Override
+  public RelNode visit(HiveJoin join) {
+    checkExpr(join.getCondition());
+    return super.visit(join);
+  }
+
+  @Override
+  public RelNode visit(HiveAggregate aggregate) {
+    // Is there anything to check here?
+    return super.visit(aggregate);
+  }
+
+  @Override
+  public RelNode visit(RelNode node) {
+    // There are several Hive RelNode types which do not have their own visit() method
+    // defined in the HiveRelShuttle interface, which need to be handled appropriately here.
+    // Per jcamachorodriguez we should not encounter HiveMultiJoin/HiveSortExchange
+    // during these checks, so no need to add those here.
+    if (node instanceof HiveUnion) {
+      return visit((HiveUnion) node);
+    } else if (node instanceof HiveSortLimit) {
+      return visit((HiveSortLimit) node);
+    } else if (node instanceof HiveSemiJoin) {
+      return visit((HiveSemiJoin) node);
+    } else if (node instanceof HiveExcept) {
+      return visit((HiveExcept) node);
+    } else if (node instanceof HiveIntersect) {
+      return visit((HiveIntersect) node);
+    }
+
+    // Fall-back for an unexpected RelNode type
+    return fail(node);
+  }
+
+  @Override
+  public RelNode visit(TableFunctionScan scan) {
+    checkExpr(scan.getCall());
+    return super.visit(scan);
+  }
+
+  @Override
+  public RelNode visit(LogicalValues values) {
+    // Not expected to be encountered for Hive - fail
+    return fail(values);
+  }
+
+  @Override
+  public RelNode visit(LogicalFilter filter) {
+    // Not expected to be encountered for Hive - fail
+    return fail(filter);
+  }
+
+  @Override
+  public RelNode visit(LogicalProject project) {
+    // Not expected to be encountered for Hive - fail
+    return fail(project);
+  }
+
+  @Override
+  public RelNode visit(LogicalJoin join) {
+    // Not expected to be encountered for Hive - fail
+    return fail(join);
+  }
+
+  @Override
+  public RelNode visit(LogicalCorrelate correlate) {
+    // Not expected to be encountered for Hive - fail
+    return fail(correlate);
+  }
+
+  @Override
+  public RelNode visit(LogicalUnion union) {
+    // Not expected to be encountered for Hive - fail
+    return fail(union);
+  }
+
+  @Override
+  public RelNode visit(LogicalIntersect intersect) {
+    // Not expected to be encountered for Hive - fail
+    return fail(intersect);
+  }
+
+  @Override
+  public RelNode visit(LogicalMinus minus) {
+    // Not expected to be encountered for Hive - fail
+    return fail(minus);
+  }
+
+  @Override
+  public RelNode visit(LogicalAggregate aggregate) {
+    // Not expected to be encountered for Hive - fail
+    return fail(aggregate);
+  }
+
+  @Override
+  public RelNode visit(LogicalMatch match) {
+    // Not expected to be encountered for Hive - fail
+    return fail(match);
+  }
+
+  @Override
+  public RelNode visit(LogicalSort sort) {
+    // Not expected to be encountered for Hive - fail
+    return fail(sort);
+  }
+
+  @Override
+  public RelNode visit(LogicalExchange exchange) {
+    // Not expected to be encountered for Hive - fail
+    return fail(exchange);
+  }
+
+  // Note: Not currently part of the HiveRelShuttle interface
+  private RelNode visit(HiveUnion union) {
+    return visitChildren(union);
+  }
+
+  // Note: Not currently part of the HiveRelShuttle interface
+  private RelNode visit(HiveSortLimit sort) {
+    checkExpr(sort.getFetchExpr());
+    checkExpr(sort.getOffsetExpr());
+    return visitChildren(sort);
+  }
+
+  // Note: Not currently part of the HiveRelShuttle interface
+  private RelNode visit(HiveSemiJoin semiJoin) {
+    checkExpr(semiJoin.getCondition());
+    checkExpr(semiJoin.getJoinFilter());
+    return visitChildren(semiJoin);
+  }
+
+  // Note: Not currently part of the HiveRelShuttle interface
+  private RelNode visit(HiveExcept except) {
+    return visitChildren(except);
+  }
+
+  // Note: Not currently part of the HiveRelShuttle interface
+  private RelNode visit(HiveIntersect intersect) {
+    return visitChildren(intersect);
+  }
+
+  private void fail(String reason) {
+    setInvalidMaterializationReason(reason);
+    throw Util.FoundOne.NULL;
+  }
+
+  private RelNode fail(RelNode node) {
+    setInvalidMaterializationReason("Unsupported RelNode type " + node.getRelTypeName() +
+        " encountered in the query plan");
+    throw Util.FoundOne.NULL;
+  }
+
+  private void checkExpr(RexNode expr) {
+    RexCall invalidCall = HiveCalciteUtil.checkMaterializable(expr);
+    if (invalidCall != null) {
+      fail(invalidCall.getOperator().getName() + " is not a deterministic function");
+    }
+  }
+
+  public String getInvalidMaterializationReason() {
+    return invalidMaterializationReason;
+  }
+
+  public void setInvalidMaterializationReason(String invalidMaterializationReason) {
+    this.invalidMaterializationReason = invalidMaterializationReason;
+  }
+
+  public boolean isValidMaterialization() {
+    return invalidMaterializationReason == null;
+  }
+}
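
A minimal sketch of how the validator is driven (this mirrors the CalcitePlanner hunk
below; the plan variable and logger are illustrative):

    HiveRelOpMaterializationValidator validator = new HiveRelOpMaterializationValidator();
    // accept() is invoked on the plan; the first disqualifying node throws Util.FoundOne,
    // which validateQueryMaterialization() swallows after recording the reason.
    validator.validateQueryMaterialization(calciteGenPlan);
    if (!validator.isValidMaterialization()) {
      LOG.info("Plan not eligible for caching/materialization: {}",
          validator.getInvalidMaterializationReason());
    }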

http://git-wip-us.apache.org/repos/asf/hive/blob/1733a371/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java
index e553a81..8a1bfd2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java
@@ -55,6 +55,8 @@ import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.QueryProperties;
 import org.apache.hadoop.hive.ql.QueryState;
+import org.apache.hadoop.hive.ql.cache.results.CacheUsage;
+import org.apache.hadoop.hive.ql.cache.results.QueryResultsCache;
 import org.apache.hadoop.hive.ql.exec.FetchTask;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.TaskFactory;
@@ -148,6 +150,9 @@ public abstract class BaseSemanticAnalyzer {
   protected LineageInfo linfo;
   protected TableAccessInfo tableAccessInfo;
   protected ColumnAccessInfo columnAccessInfo;
+
+  protected CacheUsage cacheUsage;
+
   /**
    * Columns accessed by updates
    */
@@ -1945,4 +1950,12 @@ public abstract class BaseSemanticAnalyzer {
     }
     return SessionState.get().getTxnMgr();
   }
+
+  public CacheUsage getCacheUsage() {
+    return cacheUsage;
+  }
+
+  public void setCacheUsage(CacheUsage cacheUsage) {
+    this.cacheUsage = cacheUsage;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/1733a371/ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java
index 85a1f34..cf2bc13 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java
@@ -154,6 +154,7 @@ import org.apache.hadoop.hive.ql.optimizer.calcite.HiveConfPlannerContext;
 import org.apache.hadoop.hive.ql.optimizer.calcite.HiveDefaultRelMetadataProvider;
 import org.apache.hadoop.hive.ql.optimizer.calcite.HivePlannerContext;
 import org.apache.hadoop.hive.ql.optimizer.calcite.HiveRelFactories;
+import org.apache.hadoop.hive.ql.optimizer.calcite.HiveRelOpMaterializationValidator;
 import org.apache.hadoop.hive.ql.optimizer.calcite.HiveRexExecutorImpl;
 import org.apache.hadoop.hive.ql.optimizer.calcite.HiveTypeSystemImpl;
 import org.apache.hadoop.hive.ql.optimizer.calcite.RelOptHiveTable;
@@ -1442,6 +1443,16 @@ public class CalcitePlanner extends SemanticAnalyzer {
       }
       perfLogger.PerfLogEnd(this.getClass().getName(), PerfLogger.OPTIMIZER, "Calcite: Plan generation");
 
+      // Validate query materialization (materialized views, query results caching).
+      // This check needs to occur before constant folding, which may remove some
+      // function calls from the query plan.
+      HiveRelOpMaterializationValidator matValidator = new HiveRelOpMaterializationValidator();
+      matValidator.validateQueryMaterialization(calciteGenPlan);
+      if (!matValidator.isValidMaterialization()) {
+        String reason = matValidator.getInvalidMaterializationReason();
+        setInvalidQueryMaterializationReason(reason);
+      }
+
       // Create executor
       RexExecutor executorProvider = new HiveRexExecutorImpl(optCluster);
       calciteGenPlan.getCluster().getPlanner().setExecutor(executorProvider);

http://git-wip-us.apache.org/repos/asf/hive/blob/1733a371/ql/src/java/org/apache/hadoop/hive/ql/parse/QBParseInfo.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/QBParseInfo.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/QBParseInfo.java
index ae2ec3d..5789ee0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/QBParseInfo.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/QBParseInfo.java
@@ -675,6 +675,9 @@ public class QBParseInfo {
     return insertOverwriteTables;
   }
 
+  public boolean hasInsertTables() {
+    return this.insertIntoTables.size() > 0 || this.insertOverwriteTables.size() > 0;
+  }
 }
 
 

http://git-wip-us.apache.org/repos/asf/hive/blob/1733a371/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
index e6ee968..8e587f1 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
@@ -84,10 +84,13 @@ import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.QueryProperties;
 import org.apache.hadoop.hive.ql.QueryState;
+import org.apache.hadoop.hive.ql.cache.results.CacheUsage;
+import org.apache.hadoop.hive.ql.cache.results.QueryResultsCache;
 import org.apache.hadoop.hive.ql.exec.AbstractMapJoinOperator;
 import org.apache.hadoop.hive.ql.exec.ArchiveUtils;
 import org.apache.hadoop.hive.ql.exec.ColumnInfo;
 import org.apache.hadoop.hive.ql.exec.ExprNodeEvaluatorFactory;
+import org.apache.hadoop.hive.ql.exec.FetchTask;
 import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
 import org.apache.hadoop.hive.ql.exec.FilterOperator;
 import org.apache.hadoop.hive.ql.exec.FunctionInfo;
@@ -361,6 +364,8 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
       HiveParser.TOK_ORDERBY, HiveParser.TOK_WINDOWSPEC, HiveParser.TOK_CLUSTERBY,
       HiveParser.TOK_DISTRIBUTEBY, HiveParser.TOK_SORTBY);
 
+  private String invalidQueryMaterializationReason;
+
   static class Phase1Ctx {
     String dest;
     int nextNum;
@@ -11211,6 +11216,76 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
     return getTableObjectByName(tableName, true);
   }
 
+  private static void walkASTAndQualifyNames(ASTNode ast,
+      Set<String> cteAlias, Context ctx, Hive db, Set<Integer> ignoredTokens, UnparseTranslator unparseTranslator) {
+    Queue<Node> queue = new LinkedList<>();
+    queue.add(ast);
+    while (!queue.isEmpty()) {
+      ASTNode astNode = (ASTNode) queue.poll();
+      if (astNode.getToken().getType() == HiveParser.TOK_TABNAME) {
+        // Check if this is table name is qualified or not
+        String tabIdName = getUnescapedName(astNode).toLowerCase();
+        // if alias to CTE contains the table name, we do not do the translation because
+        // cte is actually a subquery.
+        if (!cteAlias.contains(tabIdName)) {
+          unparseTranslator.addTableNameTranslation(astNode, SessionState.get().getCurrentDatabase());
+        }
+      }
+
+      if (astNode.getChildCount() > 0 && !ignoredTokens.contains(astNode.getToken().getType())) {
+        for (Node child : astNode.getChildren()) {
+          queue.offer(child);
+        }
+      }
+    }
+  }
+
+  // Walk through the AST.
+  // Replace all TOK_TABREF with fully qualified table name, if it is not already fully qualified.
+  protected String rewriteQueryWithQualifiedNames(ASTNode ast, TokenRewriteStream tokenRewriteStream)
+      throws SemanticException {
+    UnparseTranslator unparseTranslator = new UnparseTranslator(conf);
+    unparseTranslator.enable();
+
+    // 1. collect information about CTE if there is any.
+    // The base table of CTE should be masked.
+    // The CTE itself should not be masked in the references in the following main query.
+    Set<String> cteAlias = new HashSet<>();
+    if (ast.getChildCount() > 0
+        && HiveParser.TOK_CTE == ((ASTNode) ast.getChild(0)).getToken().getType()) {
+      // the structure inside CTE is like this
+      // TOK_CTE
+      // TOK_SUBQUERY
+      // sq1 (may refer to sq2)
+      // ...
+      // TOK_SUBQUERY
+      // sq2
+      ASTNode cte = (ASTNode) ast.getChild(0);
+      // we start from sq2, end up with sq1.
+      for (int index = cte.getChildCount() - 1; index >= 0; index--) {
+        ASTNode subq = (ASTNode) cte.getChild(index);
+        String alias = unescapeIdentifier(subq.getChild(1).getText());
+        if (cteAlias.contains(alias)) {
+          throw new SemanticException("Duplicate definition of " + alias);
+        } else {
+          cteAlias.add(alias);
+          walkASTAndQualifyNames(ast, cteAlias, ctx, db, ignoredTokens, unparseTranslator);
+        }
+      }
+      // walk the other part of ast
+      for (int index = 1; index < ast.getChildCount(); index++) {
+        walkASTAndQualifyNames(ast, cteAlias, ctx, db, ignoredTokens, unparseTranslator);
+      }
+    } else { // there is no CTE, walk the whole AST
+      walkASTAndQualifyNames(ast, cteAlias, ctx, db, ignoredTokens, unparseTranslator);
+    }
+
+    unparseTranslator.applyTranslations(tokenRewriteStream);
+    String rewrittenQuery = tokenRewriteStream.toString(
+        ast.getTokenStartIndex(), ast.getTokenStopIndex());
+    return rewrittenQuery;
+  }
+
   private static void walkASTMarkTABREF(TableMask tableMask, ASTNode ast, Set<String> cteAlias,
                                         Context ctx, Hive db, Map<String, Table> tabNameToTabObject, Set<Integer> ignoredTokens)
       throws SemanticException {
@@ -11549,6 +11624,20 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
       }
     }
 
+    // Check query results cache.
+    // If no masking/filtering required, then we can check the cache now, before
+    // generating the operator tree and going through CBO.
+    // Otherwise we have to wait until after the masking/filtering step.
+    boolean isCacheEnabled = conf.getBoolVar(HiveConf.ConfVars.HIVE_QUERY_RESULTS_CACHE_ENABLED);
+    QueryResultsCache.LookupInfo lookupInfo = null;
+    boolean needsTransform = needsTransform();
+    if (isCacheEnabled && !needsTransform && queryTypeCanUseCache()) {
+      lookupInfo = createLookupInfoForQuery(ast);
+      if (checkResultsCache(lookupInfo)) {
+        return;
+      }
+    }
+
     // 2. Gen OP Tree from resolved Parse Tree
     Operator sinkOp = genOPTree(ast, plannerCtx);
 
@@ -11571,6 +11660,16 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
       }
     }
 
+    // Check query results cache
+    // In the case that row or column masking/filtering was required, the cache must be checked
+    // here, after applying the masking/filtering rewrite rules to the AST.
+    if (isCacheEnabled && needsTransform && queryTypeCanUseCache()) {
+      lookupInfo = createLookupInfoForQuery(ast);
+      if (checkResultsCache(lookupInfo)) {
+        return;
+      }
+    }
+
     // 3. Deduce Resultset Schema
     if (createVwDesc != null && !this.ctx.isCboSucceeded()) {
       resultSchema = convertRowSchemaToViewSchema(opParseCtx.get(sinkOp).getRowResolver());
@@ -11705,6 +11804,15 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
       putAccessedColumnsToReadEntity(inputs, columnAccessInfo);
     }
 
+    if (isCacheEnabled && lookupInfo != null) {
+      if (queryCanBeCached()) {
+        QueryResultsCache.QueryInfo queryInfo = createCacheQueryInfoForQuery(lookupInfo);
+
+        // Specify that the results of this query can be cached.
+        setCacheUsage(new CacheUsage(
+            CacheUsage.CacheStatus.CAN_CACHE_QUERY_RESULTS, queryInfo));
+      }
+    }
   }
 
   private void putAccessedColumnsToReadEntity(HashSet<ReadEntity> inputs, ColumnAccessInfo columnAccessInfo) {
@@ -13913,6 +14021,158 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
     this.loadFileWork = loadFileWork;
   }
 
+  /**
+   * Generate the query string for this query (with fully resolved table references).
+   * @return The query string with resolved references. NULL if an error occurred.
+   */
+  private String getQueryStringForCache(ASTNode ast) {
+    // Use the UnparseTranslator to resolve unqualified table names.
+    String queryString = ctx.getTokenRewriteStream().toString(ast.getTokenStartIndex(), ast.getTokenStopIndex());
+
+    // Re-using the TokenRewriteStream map for views so we do not overwrite the current TokenRewriteStream
+    String rewriteStreamName = "__qualified_query_string__";
+    ASTNode astNode;
+    try {
+      astNode = ParseUtils.parse(queryString, ctx, rewriteStreamName);
+      TokenRewriteStream tokenRewriteStream = ctx.getViewTokenRewriteStream(rewriteStreamName);
+      String fullyQualifiedQuery = rewriteQueryWithQualifiedNames(astNode, tokenRewriteStream);
+      return fullyQualifiedQuery;
+    } catch (Exception err) {
+      LOG.error("Unexpected error while reparsing the query string [" + queryString + "]", err);
+      // Don't fail the query - just return null (caller should skip cache lookup).
+      return null;
+    }
+  }
+
+  private QueryResultsCache.LookupInfo createLookupInfoForQuery(ASTNode astNode) {
+    QueryResultsCache.LookupInfo lookupInfo = null;
+    String queryString = getQueryStringForCache(astNode);
+    if (queryString != null) {
+      lookupInfo = new QueryResultsCache.LookupInfo(queryString);
+    }
+    return lookupInfo;
+  }
+
+  /**
+   * Set the query plan to use the given cache entry to return the query results.
+   * @param cacheEntry The results cache entry that will be used to resolve the query.
+   */
+  private void useCachedResult(QueryResultsCache.CacheEntry cacheEntry) {
+    // Change query FetchTask to use new location specified in results cache.
+    FetchTask fetchTask = (FetchTask) TaskFactory.get(cacheEntry.getFetchWork(), conf);
+    setFetchTask(fetchTask);
+
+    queryState.setCommandType(cacheEntry.getQueryInfo().getHiveOperation());
+    resultSchema = cacheEntry.getQueryInfo().getResultSchema();
+    setTableAccessInfo(cacheEntry.getQueryInfo().getTableAccessInfo());
+    setColumnAccessInfo(cacheEntry.getQueryInfo().getColumnAccessInfo());
+    inputs.addAll(cacheEntry.getQueryInfo().getInputs());
+
+    // Indicate that the query will use a cached result.
+    setCacheUsage(new CacheUsage(
+        CacheUsage.CacheStatus.QUERY_USING_CACHE, cacheEntry));
+  }
+
+  private QueryResultsCache.QueryInfo createCacheQueryInfoForQuery(QueryResultsCache.LookupInfo lookupInfo) {
+    return new QueryResultsCache.QueryInfo(lookupInfo, queryState.getHiveOperation(),
+        resultSchema, getTableAccessInfo(), getColumnAccessInfo(), inputs);
+  }
+
+  /**
+   * Some initial checks for a query to see if we can look this query up in the results cache.
+   */
+  private boolean queryTypeCanUseCache() {
+    if (this instanceof ColumnStatsSemanticAnalyzer) {
+      // Column stats generates "select compute_stats() .." queries.
+      // Disable caching for these.
+      return false;
+    }
+
+    if (queryState.getHiveOperation() != HiveOperation.QUERY) {
+      return false;
+    }
+
+    if (qb.getParseInfo().isAnalyzeCommand()) {
+      return false;
+    }
+
+    if (qb.getParseInfo().hasInsertTables()) {
+      return false;
+    }
+
+    return true;
+  }
+
+  private boolean needsTransform() {
+    return SessionState.get().getAuthorizerV2() != null &&
+        SessionState.get().getAuthorizerV2().needTransform();
+  }
+
+  /**
+   * Called after a query plan has been generated, to determine if the results of this query
+   * can be added to the results cache.
+   */
+  private boolean queryCanBeCached() {
+    if (!queryTypeCanUseCache()) {
+      LOG.info("Not eligible for results caching - wrong query type");
+      return false;
+    }
+
+    // Query should have a fetch task.
+    if (getFetchTask() == null) {
+      LOG.info("Not eligible for results caching - no fetch task");
+      return false;
+    }
+
+    // At least one mr/tez/spark job
+    if (Utilities.getNumClusterJobs(getRootTasks()) == 0) {
+      LOG.info("Not eligible for results caching - no mr/tez/spark jobs");
+      return false;
+    }
+
+    // The query materialization validation check only occurs in CBO. Thus only cache results if CBO was used.
+    if (!ctx.isCboSucceeded()) {
+      LOG.info("Caching of query results is disabled if CBO was not run.");
+      return false;
+    }
+
+    if (!isValidQueryMaterialization()) {
+      LOG.info("Not eligible for results caching - {}", getInvalidQueryMaterializationReason());
+      return false;
+    }
+
+    return true;
+  }
+
+  /**
+   * Check the query results cache to see if the query represented by the lookupInfo can be
+   * answered using the results cache. If the cache contains a suitable entry, the semantic analyzer
+   * will be configured to use the found cache entry to answer the query.
+   */
+  private boolean checkResultsCache(QueryResultsCache.LookupInfo lookupInfo) {
+    if (lookupInfo == null) {
+      return false;
+    }
+    try {
+      // In case this has not been initialized elsewhere.
+      QueryResultsCache.initialize(conf);
+    } catch (Exception err) {
+      throw new IllegalStateException(err);
+    }
+    // Don't increment the reader count for explain queries.
+    boolean isExplainQuery = (ctx.getExplainConfig() != null);
+    QueryResultsCache.CacheEntry cacheEntry =
+        QueryResultsCache.getInstance().lookup(lookupInfo, !isExplainQuery);
+    if (cacheEntry != null) {
+      // Use the cache rather than full query execution.
+      useCachedResult(cacheEntry);
+
+      // At this point the caller should return from semantic analysis.
+      return true;
+    }
+    return false;
+  }
+
   private static final class ColsAndTypes {
     public ColsAndTypes(String cols, String colTypes) {
       this.cols = cols;
@@ -13921,4 +14181,17 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
     public String cols;
     public String colTypes;
   }
+
+  public String getInvalidQueryMaterializationReason() {
+    return invalidQueryMaterializationReason;
+  }
+
+  public void setInvalidQueryMaterializationReason(
+      String invalidQueryMaterializationReason) {
+    this.invalidQueryMaterializationReason = invalidQueryMaterializationReason;
+  }
+
+  public boolean isValidQueryMaterialization() {
+    return (invalidQueryMaterializationReason == null);
+  }
 }
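
Putting the new analyzer pieces together: the query text is fully qualified, used to build
a LookupInfo, and probed against the cache; on a hit the cached FetchWork becomes the plan,
otherwise the query may later be marked cacheable. A condensed sketch of the lookup half,
using only APIs that appear in this patch (variable names are illustrative):

    QueryResultsCache.initialize(conf);   // no-op if already initialized elsewhere
    QueryResultsCache.LookupInfo lookup =
        new QueryResultsCache.LookupInfo(fullyQualifiedQueryText);
    // The boolean is assumed to control whether the entry's reader count is incremented;
    // checkResultsCache() passes false for EXPLAIN queries.
    QueryResultsCache.CacheEntry entry = QueryResultsCache.getInstance().lookup(lookup, true);
    if (entry != null) {
      FetchTask fetchTask = (FetchTask) TaskFactory.get(entry.getFetchWork(), conf);
      // ...then adopt the cached schema, inputs and access info, as useCachedResult() does.
    }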

http://git-wip-us.apache.org/repos/asf/hive/blob/1733a371/ql/src/java/org/apache/hadoop/hive/ql/plan/FetchWork.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/FetchWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/FetchWork.java
index 7243dc7..1f139c8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/FetchWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/FetchWork.java
@@ -75,6 +75,11 @@ public class FetchWork implements Serializable {
    */
   private boolean isUsingThriftJDBCBinarySerDe = false;
 
+  /**
+   * Whether this FetchWork is returning a cached query result
+   */
+  private boolean isCachedResult = false;
+
   public boolean isHiveServerQuery() {
 	return isHiveServerQuery;
   }
@@ -364,4 +369,12 @@ public class FetchWork implements Serializable {
     }
     return new FetchExplainVectorization(this);
   }
+  @Explain(displayName = "Cached Query Result", displayOnlyOnTrue = true, explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED })
+  public boolean isCachedResult() {
+    return isCachedResult;
+  }
+
+  public void setCachedResult(boolean isCachedResult) {
+    this.isCachedResult = isCachedResult;
+  }
 }
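
Since the new @Explain annotation uses displayOnlyOnTrue, the flag only appears in plans
that actually read from the cache. A hedged sketch of the producer side (the patch sets the
flag from QueryResultsCache, which is not part of this excerpt):

    // Illustrative only: mark the FetchWork returned for a cache hit so that EXPLAIN
    // renders the "Cached Query Result" flag on the fetch stage.
    FetchWork cachedFetch = cacheEntry.getFetchWork();
    cachedFetch.setCachedResult(true);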

http://git-wip-us.apache.org/repos/asf/hive/blob/1733a371/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java b/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
index 3946d4a..dfc2dfa 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
@@ -725,27 +725,7 @@ public class SessionState {
    */
   private Path createRootHDFSDir(HiveConf conf) throws IOException {
     Path rootHDFSDirPath = new Path(HiveConf.getVar(conf, HiveConf.ConfVars.SCRATCHDIR));
-    FsPermission writableHDFSDirPermission = new FsPermission((short)00733);
-    FileSystem fs = rootHDFSDirPath.getFileSystem(conf);
-    if (!fs.exists(rootHDFSDirPath)) {
-      Utilities.createDirsWithPermission(conf, rootHDFSDirPath, writableHDFSDirPermission, true);
-    }
-    FsPermission currentHDFSDirPermission = fs.getFileStatus(rootHDFSDirPath).getPermission();
-    if (rootHDFSDirPath != null && rootHDFSDirPath.toUri() != null) {
-      String schema = rootHDFSDirPath.toUri().getScheme();
-      LOG.debug(
-        "HDFS root scratch dir: " + rootHDFSDirPath + " with schema " + schema + ", permission: " +
-          currentHDFSDirPermission);
-    } else {
-      LOG.debug(
-        "HDFS root scratch dir: " + rootHDFSDirPath + ", permission: " + currentHDFSDirPermission);
-    }
-    // If the root HDFS scratch dir already exists, make sure it is writeable.
-    if (!((currentHDFSDirPermission.toShort() & writableHDFSDirPermission
-        .toShort()) == writableHDFSDirPermission.toShort())) {
-      throw new RuntimeException("The root scratch dir: " + rootHDFSDirPath
-          + " on HDFS should be writable. Current permissions are: " + currentHDFSDirPermission);
-    }
+    Utilities.ensurePathIsWritable(rootHDFSDirPath, conf);
     return rootHDFSDirPath;
   }
 
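
The deleted block above is folded into a shared helper. Based solely on the removed lines,
its behavior is presumably along these lines (the actual Utilities.ensurePathIsWritable may
differ):

    // Inferred sketch, not the real implementation.
    FsPermission writable = new FsPermission((short) 00733);
    FileSystem fs = path.getFileSystem(conf);
    if (!fs.exists(path)) {
      Utilities.createDirsWithPermission(conf, path, writable, true);
    }
    FsPermission current = fs.getFileStatus(path).getPermission();
    if ((current.toShort() & writable.toShort()) != writable.toShort()) {
      throw new RuntimeException("Scratch dir " + path + " should be writable; current permissions: " + current);
    }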

http://git-wip-us.apache.org/repos/asf/hive/blob/1733a371/ql/src/test/queries/clientpositive/results_cache_1.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/results_cache_1.q b/ql/src/test/queries/clientpositive/results_cache_1.q
new file mode 100644
index 0000000..4aea60e
--- /dev/null
+++ b/ql/src/test/queries/clientpositive/results_cache_1.q
@@ -0,0 +1,96 @@
+
+set hive.query.results.cache.enabled=true;
+
+explain
+select count(*) from src a join src b on (a.key = b.key);
+select count(*) from src a join src b on (a.key = b.key);
+
+set test.comment="Cache should be used for this query";
+set test.comment;
+explain
+select count(*) from src a join src b on (a.key = b.key);
+select count(*) from src a join src b on (a.key = b.key);
+
+set hive.query.results.cache.enabled=false;
+set test.comment="Cache is disabled, should not be used here.";
+set test.comment;
+explain
+select count(*) from src a join src b on (a.key = b.key);
+
+create database db1;
+use db1;
+create table src as select key, value from default.src;
+
+set hive.query.results.cache.enabled=true;
+set test.comment="Same query string, but different current database. Cache should not be used since unqualified tablenames resolve to different tables";
+set test.comment;
+explain
+select count(*) from src a join src b on (a.key = b.key);
+
+use default;
+
+-- Union
+select * from src where key = 0
+union all
+select * from src where key = 2;
+
+set test.comment="Union all. Cache should be used now";
+set test.comment;
+explain
+select * from src where key = 0
+union all
+select * from src where key = 2;
+
+select * from src where key = 0
+union all
+select * from src where key = 2;
+
+
+-- CTE
+with q1 as ( select distinct key from q2 ),
+q2 as ( select key, value from src where key < 10 )
+select * from q1 a, q1 b where a.key = b.key;
+
+set test.comment="CTE. Cache should be used now";
+set test.comment;
+explain
+with q1 as ( select distinct key from q2 ),
+q2 as ( select key, value from src where key < 10 )
+select * from q1 a, q1 b where a.key = b.key;
+
+with q1 as ( select distinct key from q2 ),
+q2 as ( select key, value from src where key < 10 )
+select * from q1 a, q1 b where a.key = b.key;
+
+-- Intersect/Except
+with q1 as ( select distinct key, value from src ),
+q2 as ( select key, value from src where key < 10 ),
+q3 as ( select key, value from src where key = 0 )
+select * from q1 intersect all select * from q2 except all select * from q3;
+
+set test.comment="Intersect/Except. Cache should be used now";
+set test.comment;
+explain
+with q1 as ( select distinct key, value from src ),
+q2 as ( select key, value from src where key < 10 ),
+q3 as ( select key, value from src where key = 0 )
+select * from q1 intersect all select * from q2 except all select * from q3;
+
+with q1 as ( select distinct key, value from src ),
+q2 as ( select key, value from src where key < 10 ),
+q3 as ( select key, value from src where key = 0 )
+select * from q1 intersect all select * from q2 except all select * from q3;
+
+-- Semijoin. Use settings from cbo_semijoin
+set hive.mapred.mode=nonstrict;
+set hive.exec.check.crossproducts=false;
+set hive.stats.fetch.column.stats=true;
+set hive.auto.convert.join=false;
+
+select a, c, count(*)  from (select key as a, c_int+1 as b, sum(c_int) as c from cbo_t1 where (cbo_t1.c_int + 1 >= 0) and (cbo_t1.c_int > 0 or cbo_t1.c_float >= 0)  group by c_float, cbo_t1.c_int, key having cbo_t1.c_float > 0 and (c_int >=1 or c_float >= 1) and (c_int + c_float) >= 0 order by a+b desc, c asc limit 5) cbo_t1 left semi join (select key as p, c_int+1 as q, sum(c_int) as r from cbo_t2 where (cbo_t2.c_int + 1 >= 0) and (cbo_t2.c_int > 0 or cbo_t2.c_float >= 0)  group by c_float, cbo_t2.c_int, key having cbo_t2.c_float > 0 and (c_int >=1 or c_float >= 1) and (c_int + c_float) >= 0 order by q+r/10 desc, p limit 5) cbo_t2 on cbo_t1.a=p left semi join cbo_t3 on cbo_t1.a=key where (b + 1  >= 0) and (b > 0 or a >= 0) group by a, c  having a > 0 and (a >=1 or c >= 1) and (a + c) >= 0 order by c, a;
+
+set test.comment="Semijoin. Cache should be used now";
+set test.comment;
+explain
+select a, c, count(*)  from (select key as a, c_int+1 as b, sum(c_int) as c from cbo_t1 where (cbo_t1.c_int + 1 >= 0) and (cbo_t1.c_int > 0 or cbo_t1.c_float >= 0)  group by c_float, cbo_t1.c_int, key having cbo_t1.c_float > 0 and (c_int >=1 or c_float >= 1) and (c_int + c_float) >= 0 order by a+b desc, c asc limit 5) cbo_t1 left semi join (select key as p, c_int+1 as q, sum(c_int) as r from cbo_t2 where (cbo_t2.c_int + 1 >= 0) and (cbo_t2.c_int > 0 or cbo_t2.c_float >= 0)  group by c_float, cbo_t2.c_int, key having cbo_t2.c_float > 0 and (c_int >=1 or c_float >= 1) and (c_int + c_float) >= 0 order by q+r/10 desc, p limit 5) cbo_t2 on cbo_t1.a=p left semi join cbo_t3 on cbo_t1.a=key where (b + 1  >= 0) and (b > 0 or a >= 0) group by a, c  having a > 0 and (a >=1 or c >= 1) and (a + c) >= 0 order by c, a;
+select a, c, count(*)  from (select key as a, c_int+1 as b, sum(c_int) as c from cbo_t1 where (cbo_t1.c_int + 1 >= 0) and (cbo_t1.c_int > 0 or cbo_t1.c_float >= 0)  group by c_float, cbo_t1.c_int, key having cbo_t1.c_float > 0 and (c_int >=1 or c_float >= 1) and (c_int + c_float) >= 0 order by a+b desc, c asc limit 5) cbo_t1 left semi join (select key as p, c_int+1 as q, sum(c_int) as r from cbo_t2 where (cbo_t2.c_int + 1 >= 0) and (cbo_t2.c_int > 0 or cbo_t2.c_float >= 0)  group by c_float, cbo_t2.c_int, key having cbo_t2.c_float > 0 and (c_int >=1 or c_float >= 1) and (c_int + c_float) >= 0 order by q+r/10 desc, p limit 5) cbo_t2 on cbo_t1.a=p left semi join cbo_t3 on cbo_t1.a=key where (b + 1  >= 0) and (b > 0 or a >= 0) group by a, c  having a > 0 and (a >=1 or c >= 1) and (a + c) >= 0 order by c, a;

http://git-wip-us.apache.org/repos/asf/hive/blob/1733a371/ql/src/test/queries/clientpositive/results_cache_2.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/results_cache_2.q b/ql/src/test/queries/clientpositive/results_cache_2.q
new file mode 100644
index 0000000..96a9092
--- /dev/null
+++ b/ql/src/test/queries/clientpositive/results_cache_2.q
@@ -0,0 +1,41 @@
+
+set hive.query.results.cache.enabled=true;
+set hive.fetch.task.conversion=more;
+
+-- Test 1: fetch task
+explain
+select key, value from src where key=0;
+select key, value from src where key=0;
+
+set test.comment=Query only requires fetch task - should not use results cache;
+set test.comment;
+explain
+select key, value from src where key=0;
+
+
+-- Test 2: deterministic function should use cache.
+select c1, count(*)
+from (select sign(value) c1, value from src where key < 10) q
+group by c1;
+
+set test.comment=This query should use the cache;
+set test.comment;
+explain
+select c1, count(*)
+from (select sign(value) c1, value from src where key < 10) q
+group by c1;
+
+-- Test 3: non-deterministic functions should not be cached
+-- Set current timestamp config to get repeatable result.
+set hive.test.currenttimestamp=2012-01-01 01:02:03;
+
+select c1, count(*)
+from (select current_timestamp c1, value from src where key < 10) q
+group by c1;
+
+set test.comment=Queries using non-deterministic functions should not use results cache;
+set test.comment;
+explain
+select c1, count(*)
+from (select current_timestamp c1, value from src where key < 10) q
+group by c1;