You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by jd...@apache.org on 2018/02/07 21:40:43 UTC
[3/3] hive git commit: HIVE-18513: Query results caching (Jason Dere,
reviewed by Jesus Camacho Rodriguez)
HIVE-18513: Query results caching (Jason Dere, reviewed by Jesus Camacho Rodriguez)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/1733a371
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/1733a371
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/1733a371
Branch: refs/heads/master
Commit: 1733a3712f0c548f9181e6e5b5298f466cad755e
Parents: 075077d
Author: Jason Dere <jd...@hortonworks.com>
Authored: Wed Feb 7 13:39:41 2018 -0800
Committer: Jason Dere <jd...@hortonworks.com>
Committed: Wed Feb 7 13:39:41 2018 -0800
----------------------------------------------------------------------
.../org/apache/hadoop/hive/conf/HiveConf.java | 21 +
.../apache/hadoop/hive/ql/log/PerfLogger.java | 1 +
data/conf/hive-site.xml | 5 +
data/conf/llap/hive-site.xml | 5 +
data/conf/perf-reg/spark/hive-site.xml | 5 +
data/conf/perf-reg/tez/hive-site.xml | 5 +
data/conf/rlist/hive-site.xml | 5 +
data/conf/spark/local/hive-site.xml | 5 +
data/conf/spark/standalone/hive-site.xml | 5 +
data/conf/spark/yarn-client/hive-site.xml | 5 +
data/conf/tez/hive-site.xml | 5 +
.../src/test/resources/hive-site.xml | 5 +
.../test/resources/testconfiguration.properties | 1 +
.../org/apache/hadoop/hive/ql/QTestUtil.java | 4 +
.../java/org/apache/hadoop/hive/ql/Driver.java | 64 ++
.../hive/ql/cache/results/CacheUsage.java | 73 ++
.../ql/cache/results/QueryResultsCache.java | 666 +++++++++++++++++++
.../apache/hadoop/hive/ql/exec/Utilities.java | 33 +
.../ql/optimizer/calcite/HiveCalciteUtil.java | 38 ++
.../HiveRelOpMaterializationValidator.java | 285 ++++++++
.../hive/ql/parse/BaseSemanticAnalyzer.java | 13 +
.../hadoop/hive/ql/parse/CalcitePlanner.java | 11 +
.../hadoop/hive/ql/parse/QBParseInfo.java | 3 +
.../hadoop/hive/ql/parse/SemanticAnalyzer.java | 273 ++++++++
.../apache/hadoop/hive/ql/plan/FetchWork.java | 13 +
.../hadoop/hive/ql/session/SessionState.java | 22 +-
.../queries/clientpositive/results_cache_1.q | 96 +++
.../queries/clientpositive/results_cache_2.q | 41 ++
.../clientpositive/results_cache_capacity.q | 52 ++
.../clientpositive/results_cache_lifetime.q | 14 +
.../clientpositive/results_cache_temptable.q | 42 ++
.../clientpositive/results_cache_with_masking.q | 17 +
.../clientpositive/llap/results_cache_1.q.out | 584 ++++++++++++++++
.../clientpositive/results_cache_1.q.out | 579 ++++++++++++++++
.../clientpositive/results_cache_2.q.out | 176 +++++
.../clientpositive/results_cache_capacity.q.out | 238 +++++++
.../clientpositive/results_cache_lifetime.q.out | 112 ++++
.../results_cache_temptable.q.out | 293 ++++++++
.../results_cache_with_masking.q.out | 106 +++
.../apache/hive/service/server/HiveServer2.java | 10 +
40 files changed, 3910 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/1733a371/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 26e08e4..67e22f6 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -3707,6 +3707,27 @@ public class HiveConf extends Configuration {
HIVE_EXEC_INPUT_LISTING_MAX_THREADS("hive.exec.input.listing.max.threads", 0, new SizeValidator(0L, true, 1024L, true),
"Maximum number of threads that Hive uses to list file information from file systems (recommended > 1 for blobstore)."),
+ HIVE_QUERY_RESULTS_CACHE_ENABLED("hive.query.results.cache.enabled", true,
+ "If the query results cache is enabled. This will keep results of previously executed queries " +
+ "to be reused if the same query is executed again."),
+
+ HIVE_QUERY_RESULTS_CACHE_DIRECTORY("hive.query.results.cache.directory",
+ "/tmp/hive/_resultscache_",
+ "Location of the query results cache directory. Temporary results from queries " +
+ "will be moved to this location."),
+
+ HIVE_QUERY_RESULTS_CACHE_MAX_ENTRY_LIFETIME("hive.query.results.cache.max.entry.lifetime", "3600s",
+ new TimeValidator(TimeUnit.SECONDS),
+ "Maximum lifetime in seconds for an entry in the query results cache. A nonpositive value means infinite."),
+
+ HIVE_QUERY_RESULTS_CACHE_MAX_SIZE("hive.query.results.cache.max.size",
+ (long) 2 * 1024 * 1024 * 1024,
+ "Maximum total size in bytes that the query results cache directory is allowed to use on the filesystem."),
+
+ HIVE_QUERY_RESULTS_CACHE_MAX_ENTRY_SIZE("hive.query.results.cache.max.entry.size",
+ (long) 10 * 1024 * 1024,
+ "Maximum size in bytes that a single query result is allowed to use in the results cache directory"),
+
/* BLOBSTORE section */
HIVE_BLOBSTORE_SUPPORTED_SCHEMES("hive.blobstore.supported.schemes", "s3,s3a,s3n",
http://git-wip-us.apache.org/repos/asf/hive/blob/1733a371/common/src/java/org/apache/hadoop/hive/ql/log/PerfLogger.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/ql/log/PerfLogger.java b/common/src/java/org/apache/hadoop/hive/ql/log/PerfLogger.java
index 2767bca..764a832 100644
--- a/common/src/java/org/apache/hadoop/hive/ql/log/PerfLogger.java
+++ b/common/src/java/org/apache/hadoop/hive/ql/log/PerfLogger.java
@@ -71,6 +71,7 @@ public class PerfLogger {
public static final String TEZ_INIT_OPERATORS = "TezInitializeOperators";
public static final String LOAD_HASHTABLE = "LoadHashtable";
public static final String TEZ_GET_SESSION = "TezGetSession";
+ public static final String SAVE_TO_RESULTS_CACHE = "saveToResultsCache";
public static final String SPARK_SUBMIT_TO_RUNNING = "SparkSubmitToRunning";
public static final String SPARK_BUILD_PLAN = "SparkBuildPlan";
http://git-wip-us.apache.org/repos/asf/hive/blob/1733a371/data/conf/hive-site.xml
----------------------------------------------------------------------
diff --git a/data/conf/hive-site.xml b/data/conf/hive-site.xml
index 01f83d1..b56cbd2 100644
--- a/data/conf/hive-site.xml
+++ b/data/conf/hive-site.xml
@@ -328,4 +328,9 @@
<value>99</value>
</property>
+<property>
+ <name>hive.query.results.cache.enabled</name>
+ <value>false</value>
+</property>
+
</configuration>
http://git-wip-us.apache.org/repos/asf/hive/blob/1733a371/data/conf/llap/hive-site.xml
----------------------------------------------------------------------
diff --git a/data/conf/llap/hive-site.xml b/data/conf/llap/hive-site.xml
index cdda875..c4c299c 100644
--- a/data/conf/llap/hive-site.xml
+++ b/data/conf/llap/hive-site.xml
@@ -348,4 +348,9 @@
<value>99</value>
</property>
+<property>
+ <name>hive.query.results.cache.enabled</name>
+ <value>false</value>
+</property>
+
</configuration>
http://git-wip-us.apache.org/repos/asf/hive/blob/1733a371/data/conf/perf-reg/spark/hive-site.xml
----------------------------------------------------------------------
diff --git a/data/conf/perf-reg/spark/hive-site.xml b/data/conf/perf-reg/spark/hive-site.xml
index 497a61f..5ca660d 100644
--- a/data/conf/perf-reg/spark/hive-site.xml
+++ b/data/conf/perf-reg/spark/hive-site.xml
@@ -265,4 +265,9 @@
<value>org.apache.hadoop.hive.metastore.ObjectStore</value>
</property>
+<property>
+ <name>hive.query.results.cache.enabled</name>
+ <value>false</value>
+</property>
+
</configuration>
http://git-wip-us.apache.org/repos/asf/hive/blob/1733a371/data/conf/perf-reg/tez/hive-site.xml
----------------------------------------------------------------------
diff --git a/data/conf/perf-reg/tez/hive-site.xml b/data/conf/perf-reg/tez/hive-site.xml
index 012369f..62ecb74 100644
--- a/data/conf/perf-reg/tez/hive-site.xml
+++ b/data/conf/perf-reg/tez/hive-site.xml
@@ -282,4 +282,9 @@
<value>true</value>
</property>
+<property>
+ <name>hive.query.results.cache.enabled</name>
+ <value>false</value>
+</property>
+
</configuration>
http://git-wip-us.apache.org/repos/asf/hive/blob/1733a371/data/conf/rlist/hive-site.xml
----------------------------------------------------------------------
diff --git a/data/conf/rlist/hive-site.xml b/data/conf/rlist/hive-site.xml
index 9de00e5..630e481 100644
--- a/data/conf/rlist/hive-site.xml
+++ b/data/conf/rlist/hive-site.xml
@@ -319,4 +319,9 @@
<value>99</value>
</property>
+<property>
+ <name>hive.query.results.cache.enabled</name>
+ <value>false</value>
+</property>
+
</configuration>
http://git-wip-us.apache.org/repos/asf/hive/blob/1733a371/data/conf/spark/local/hive-site.xml
----------------------------------------------------------------------
diff --git a/data/conf/spark/local/hive-site.xml b/data/conf/spark/local/hive-site.xml
index fd0e6a0..8ff6256 100644
--- a/data/conf/spark/local/hive-site.xml
+++ b/data/conf/spark/local/hive-site.xml
@@ -261,4 +261,9 @@
<value>false</value>
</property>
+<property>
+ <name>hive.query.results.cache.enabled</name>
+ <value>false</value>
+</property>
+
</configuration>
http://git-wip-us.apache.org/repos/asf/hive/blob/1733a371/data/conf/spark/standalone/hive-site.xml
----------------------------------------------------------------------
diff --git a/data/conf/spark/standalone/hive-site.xml b/data/conf/spark/standalone/hive-site.xml
index 7095979..84851c7 100644
--- a/data/conf/spark/standalone/hive-site.xml
+++ b/data/conf/spark/standalone/hive-site.xml
@@ -266,4 +266,9 @@
<value>false</value>
</property>
+<property>
+ <name>hive.query.results.cache.enabled</name>
+ <value>false</value>
+</property>
+
</configuration>
http://git-wip-us.apache.org/repos/asf/hive/blob/1733a371/data/conf/spark/yarn-client/hive-site.xml
----------------------------------------------------------------------
diff --git a/data/conf/spark/yarn-client/hive-site.xml b/data/conf/spark/yarn-client/hive-site.xml
index a9a788b..6c63362 100644
--- a/data/conf/spark/yarn-client/hive-site.xml
+++ b/data/conf/spark/yarn-client/hive-site.xml
@@ -306,4 +306,9 @@
<value>false</value>
</property>
+<property>
+ <name>hive.query.results.cache.enabled</name>
+ <value>false</value>
+</property>
+
</configuration>
http://git-wip-us.apache.org/repos/asf/hive/blob/1733a371/data/conf/tez/hive-site.xml
----------------------------------------------------------------------
diff --git a/data/conf/tez/hive-site.xml b/data/conf/tez/hive-site.xml
index 4519678..236adc7 100644
--- a/data/conf/tez/hive-site.xml
+++ b/data/conf/tez/hive-site.xml
@@ -293,4 +293,9 @@
<value>99</value>
</property>
+<property>
+ <name>hive.query.results.cache.enabled</name>
+ <value>false</value>
+</property>
+
</configuration>
http://git-wip-us.apache.org/repos/asf/hive/blob/1733a371/itests/hive-blobstore/src/test/resources/hive-site.xml
----------------------------------------------------------------------
diff --git a/itests/hive-blobstore/src/test/resources/hive-site.xml b/itests/hive-blobstore/src/test/resources/hive-site.xml
index 038db0d..775c559 100644
--- a/itests/hive-blobstore/src/test/resources/hive-site.xml
+++ b/itests/hive-blobstore/src/test/resources/hive-site.xml
@@ -284,6 +284,11 @@
<value>hdfs,pfile,file,s3,s3a,pblob</value>
</property>
+ <property>
+ <name>hive.query.results.cache.enabled</name>
+ <value>false</value>
+ </property>
+
<!--
To run these tests:
# Create a file blobstore-conf.xml - DO NOT ADD TO REVISION CONTROL
http://git-wip-us.apache.org/repos/asf/hive/blob/1733a371/itests/src/test/resources/testconfiguration.properties
----------------------------------------------------------------------
diff --git a/itests/src/test/resources/testconfiguration.properties b/itests/src/test/resources/testconfiguration.properties
index 9a76b85..2a22db9 100644
--- a/itests/src/test/resources/testconfiguration.properties
+++ b/itests/src/test/resources/testconfiguration.properties
@@ -254,6 +254,7 @@ minillaplocal.shared.query.files=alter_merge_2_orc.q,\
ptf.q,\
ptf_matchpath.q,\
ptf_streaming.q,\
+ results_cache_1.q,\
sample1.q,\
selectDistinctStar.q,\
select_dummy_source.q,\
http://git-wip-us.apache.org/repos/asf/hive/blob/1733a371/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java
----------------------------------------------------------------------
diff --git a/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java b/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java
index 2d0aca0..fcce531 100644
--- a/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java
+++ b/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java
@@ -101,6 +101,7 @@ import org.apache.hadoop.hive.llap.daemon.MiniLlapCluster;
import org.apache.hadoop.hive.llap.io.api.LlapProxy;
import org.apache.hadoop.hive.metastore.Warehouse;
import org.apache.hadoop.hive.metastore.api.Index;
+import org.apache.hadoop.hive.ql.cache.results.QueryResultsCache;
import org.apache.hadoop.hive.ql.exec.FunctionRegistry;
import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.exec.Utilities;
@@ -1013,6 +1014,9 @@ public class QTestUtil {
return;
}
+ // Remove any cached results from the previous test.
+ QueryResultsCache.cleanupInstance();
+
// allocate and initialize a new conf since a test can
// modify conf by using 'set' commands
conf = new HiveConf(IDriver.class);
http://git-wip-us.apache.org/repos/asf/hive/blob/1733a371/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
index 2d7e459..c6f7d64 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
@@ -55,6 +55,9 @@ import org.apache.hadoop.hive.metastore.ColumnType;
import org.apache.hadoop.hive.metastore.HiveMetaStoreUtils;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.Schema;
+import org.apache.hadoop.hive.ql.cache.results.CacheUsage;
+import org.apache.hadoop.hive.ql.cache.results.QueryResultsCache;
+import org.apache.hadoop.hive.ql.cache.results.QueryResultsCache.CacheEntry;
import org.apache.hadoop.hive.ql.exec.ConditionalTask;
import org.apache.hadoop.hive.ql.exec.DagUtils;
import org.apache.hadoop.hive.ql.exec.ExplainTask;
@@ -188,6 +191,9 @@ public class Driver implements IDriver {
// either initTxnMgr or from the SessionState, in that order.
private HiveTxnManager queryTxnMgr;
+ private CacheUsage cacheUsage;
+ private CacheEntry usedCacheEntry;
+
private enum DriverState {
INITIALIZED,
COMPILING,
@@ -638,6 +644,11 @@ public class Driver implements IDriver {
}
LOG.info("Semantic Analysis Completed");
+ // Retrieve information about cache usage for the query.
+ if (conf.getBoolVar(HiveConf.ConfVars.HIVE_QUERY_RESULTS_CACHE_ENABLED)) {
+ cacheUsage = sem.getCacheUsage();
+ }
+
// validate the plan
sem.validate();
perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.ANALYZE);
@@ -1788,6 +1799,45 @@ public class Driver implements IDriver {
return new CommandProcessorResponse(ret, errorMessage, SQLState, downstreamError);
}
+  /**
+   * Points the plan's fetch task at the results stored in the given cache entry and records
+   * that this query is now being answered from the results cache.
+   *
+   * @param cacheEntry entry whose FetchWork should be used to read the query results
+   */
+  private void useFetchFromCache(CacheEntry cacheEntry) {
+    FetchTask cachedFetchTask = (FetchTask) TaskFactory.get(cacheEntry.getFetchWork(), conf);
+    cachedFetchTask.initialize(queryState, plan, null, ctx.getOpContext());
+    plan.setFetchTask(cachedFetchTask);
+    cacheUsage = new CacheUsage(CacheUsage.CacheStatus.QUERY_USING_CACHE, cacheEntry);
+  }
+
+  /**
+   * Post-execution handling of the query results cache for the current query.
+   *
+   * <p>If the query was answered from the cache, the entry is remembered in usedCacheEntry so
+   * that the reader reference taken during lookup is released when the query is closed. If the
+   * query results are cacheable (and there is a fetch task), they are added to the cache and the
+   * plan's fetch task is switched to read from the cached location.
+   *
+   * @throws Exception if switching the fetch task to the cached results fails
+   */
+  private void checkCacheUsage() throws Exception {
+    if (cacheUsage == null) {
+      return;
+    }
+    if (cacheUsage.getStatus() == CacheUsage.CacheStatus.QUERY_USING_CACHE) {
+      // Reader count already incremented during cache lookup.
+      // Save to usedCacheEntry to ensure reader is released after query.
+      usedCacheEntry = cacheUsage.getCacheEntry();
+    } else if (cacheUsage.getStatus() == CacheUsage.CacheStatus.CAN_CACHE_QUERY_RESULTS &&
+        plan.getFetchTask() != null) {
+      // The query could not be resolved using the cache, but the query results
+      // can be added to the cache for future queries to use.
+      PerfLogger perfLogger = SessionState.getPerfLogger();
+      perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.SAVE_TO_RESULTS_CACHE);
+      try {
+        CacheEntry savedCacheEntry = QueryResultsCache.getInstance().addToCache(
+            cacheUsage.getQueryInfo(), plan.getFetchTask().getWork());
+        if (savedCacheEntry != null) {
+          // addToCache() already incremented the reader count. Record the entry BEFORE switching
+          // the fetch task, so releaseCachedResult() still releases the reader even if
+          // useFetchFromCache() throws.
+          usedCacheEntry = savedCacheEntry;
+          useFetchFromCache(savedCacheEntry);
+        }
+      } finally {
+        perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.SAVE_TO_RESULTS_CACHE);
+      }
+    }
+  }
+
private void execute() throws CommandProcessorResponse {
PerfLogger perfLogger = SessionState.getPerfLogger();
perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.DRIVER_EXECUTE);
@@ -2002,6 +2052,8 @@ public class Driver implements IDriver {
}
perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.RUN_TASKS);
+ checkCacheUsage();
+
// in case we decided to run everything in local mode, restore the
// the jobtracker setting to its initial value
ctx.restoreOriginalTracker();
@@ -2410,12 +2462,23 @@ public class Driver implements IDriver {
LOG.debug(" Exception while clearing the FetchTask ", e);
}
}
+
+  /**
+   * Releases the reader reference this query holds on a results cache entry, if it holds one.
+   * The reference was taken on the query's behalf either by the cache lookup or by addToCache().
+   */
+  private void releaseCachedResult() {
+    CacheEntry entryToRelease = usedCacheEntry;
+    if (entryToRelease == null) {
+      return;
+    }
+    entryToRelease.releaseReader();
+    usedCacheEntry = null;
+  }
+
// Close and release resources within a running query process. Since it runs under
// driver state COMPILING, EXECUTING or INTERRUPT, it would not have race condition
// with the releases probably running in the other closing thread.
private int closeInProcess(boolean destroyed) {
releaseDriverContext();
releasePlan();
+ releaseCachedResult();
releaseFetchTask();
releaseResStream();
releaseContext();
@@ -2445,6 +2508,7 @@ public class Driver implements IDriver {
return 0;
}
releasePlan();
+ releaseCachedResult();
releaseFetchTask();
releaseResStream();
releaseContext();
http://git-wip-us.apache.org/repos/asf/hive/blob/1733a371/ql/src/java/org/apache/hadoop/hive/ql/cache/results/CacheUsage.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/cache/results/CacheUsage.java b/ql/src/java/org/apache/hadoop/hive/ql/cache/results/CacheUsage.java
new file mode 100644
index 0000000..08b791a
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/cache/results/CacheUsage.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.cache.results;
+
+import org.apache.hadoop.hive.ql.cache.results.QueryResultsCache.CacheEntry;
+import org.apache.hadoop.hive.ql.cache.results.QueryResultsCache.QueryInfo;
+
+/**
+ * Helper class during semantic analysis that indicates if the query can use the cache,
+ * or if the results from the query can be added to the results cache.
+ */
+public class CacheUsage {
+
+ public enum CacheStatus {
+ CACHE_NOT_USED,
+ QUERY_USING_CACHE,
+ CAN_CACHE_QUERY_RESULTS,
+ };
+
+ private CacheUsage.CacheStatus status;
+ private CacheEntry cacheEntry;
+ private QueryInfo queryInfo;
+
+ public CacheUsage(CacheStatus status, CacheEntry cacheEntry) {
+ this.status = status;
+ this.cacheEntry = cacheEntry;
+ }
+
+ public CacheUsage(CacheStatus status, QueryInfo queryInfo) {
+ this.status = status;
+ this.queryInfo = queryInfo;
+ }
+
+ public CacheUsage.CacheStatus getStatus() {
+ return status;
+ }
+
+ public void setStatus(CacheUsage.CacheStatus status) {
+ this.status = status;
+ }
+
+ public CacheEntry getCacheEntry() {
+ return cacheEntry;
+ }
+
+ public void setCacheEntry(CacheEntry cacheEntry) {
+ this.cacheEntry = cacheEntry;
+ }
+
+ public QueryInfo getQueryInfo() {
+ return queryInfo;
+ }
+
+ public void setQueryInfo(QueryInfo queryInfo) {
+ this.queryInfo = queryInfo;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/1733a371/ql/src/java/org/apache/hadoop/hive/ql/cache/results/QueryResultsCache.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/cache/results/QueryResultsCache.java b/ql/src/java/org/apache/hadoop/hive/ql/cache/results/QueryResultsCache.java
new file mode 100644
index 0000000..131127e
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/cache/results/QueryResultsCache.java
@@ -0,0 +1,666 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.cache.results;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.hadoop.fs.ContentSummary;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.hooks.Entity.Type;
+import org.apache.hadoop.hive.ql.hooks.ReadEntity;
+import org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreClient;
+import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.parse.ColumnAccessInfo;
+import org.apache.hadoop.hive.ql.parse.TableAccessInfo;
+import org.apache.hadoop.hive.ql.plan.FetchWork;
+import org.apache.hadoop.hive.ql.plan.HiveOperation;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A class to handle management and lookup of cached Hive query results.
+ */
+public final class QueryResultsCache {
+
+ private static final Logger LOG = LoggerFactory.getLogger(QueryResultsCache.class);
+
+  /**
+   * Key used when looking up entries in the results cache. It currently holds only the
+   * query text.
+   */
+  public static class LookupInfo {
+    private String queryText;
+
+    public LookupInfo(String queryText) {
+      this.queryText = queryText;
+    }
+
+    /** @return the query text this lookup is keyed on */
+    public String getQueryText() {
+      return queryText;
+    }
+  }
+
+  /**
+   * Information about a query that is stored with a cache entry: the lookup key, the operation,
+   * the result schema, table/column access info and the query's input entities.
+   */
+  public static class QueryInfo {
+    private LookupInfo lookupInfo;
+    private HiveOperation hiveOperation;
+    private List<FieldSchema> resultSchema;
+    private TableAccessInfo tableAccessInfo;
+    private ColumnAccessInfo columnAccessInfo;
+    private Set<ReadEntity> inputs;
+
+    public QueryInfo(
+        LookupInfo lookupInfo,
+        HiveOperation hiveOperation,
+        List<FieldSchema> resultSchema,
+        TableAccessInfo tableAccessInfo,
+        ColumnAccessInfo columnAccessInfo,
+        Set<ReadEntity> inputs) {
+      this.lookupInfo = lookupInfo;
+      this.hiveOperation = hiveOperation;
+      this.resultSchema = resultSchema;
+      this.tableAccessInfo = tableAccessInfo;
+      this.columnAccessInfo = columnAccessInfo;
+      this.inputs = inputs;
+    }
+
+    // ---- accessors ----
+
+    public LookupInfo getLookupInfo() {
+      return lookupInfo;
+    }
+
+    public HiveOperation getHiveOperation() {
+      return hiveOperation;
+    }
+
+    public List<FieldSchema> getResultSchema() {
+      return resultSchema;
+    }
+
+    public TableAccessInfo getTableAccessInfo() {
+      return tableAccessInfo;
+    }
+
+    public ColumnAccessInfo getColumnAccessInfo() {
+      return columnAccessInfo;
+    }
+
+    public Set<ReadEntity> getInputs() {
+      return inputs;
+    }
+
+    // ---- mutators ----
+
+    public void setLookupInfo(LookupInfo lookupInfo) {
+      this.lookupInfo = lookupInfo;
+    }
+
+    public void setHiveOperation(HiveOperation hiveOperation) {
+      this.hiveOperation = hiveOperation;
+    }
+
+    public void setResultSchema(List<FieldSchema> resultSchema) {
+      this.resultSchema = resultSchema;
+    }
+
+    public void setTableAccessInfo(TableAccessInfo tableAccessInfo) {
+      this.tableAccessInfo = tableAccessInfo;
+    }
+
+    public void setColumnAccessInfo(ColumnAccessInfo columnAccessInfo) {
+      this.columnAccessInfo = columnAccessInfo;
+    }
+
+    public void setInputs(Set<ReadEntity> inputs) {
+      this.inputs = inputs;
+    }
+  }
+
+  /**
+   * A single entry in the query results cache. It holds the query it answers, the FetchWork used
+   * to read the cached results and the results location, plus bookkeeping for validity and for
+   * the number of active readers.
+   */
+  public static class CacheEntry {
+    private QueryInfo queryInfo;
+    private FetchWork fetchWork;
+    private Path cachedResultsPath;
+
+    // Cache administration
+    private long createTime;
+    private long size;
+    private AtomicBoolean valid = new AtomicBoolean(false);
+    private AtomicInteger readers = new AtomicInteger(0);
+    private ScheduledFuture<?> invalidationFuture = null;
+
+    public boolean isValid() {
+      return valid.get();
+    }
+
+    /**
+     * Releases a reader reference obtained through addReader() (or taken on the caller's behalf
+     * by the cache). If the entry is invalid and no readers remain, the entry is cleaned up.
+     *
+     * @throws IllegalStateException if the reader count drops below zero
+     */
+    public void releaseReader() {
+      int readerCount = 0;
+      synchronized (this) {
+        readerCount = readers.decrementAndGet();
+      }
+      LOG.debug("releaseReader: entry: {}, readerCount: {}", this, readerCount);
+      Preconditions.checkState(readerCount >= 0);
+
+      cleanupIfNeeded();
+    }
+
+    @Override
+    public String toString() {
+      return "CacheEntry query: [" + getQueryInfo().getLookupInfo().getQueryText()
+          + "], location: " + cachedResultsPath
+          + ", size: " + size;
+    }
+
+    /**
+     * Tries to register a new reader of this entry.
+     *
+     * @return true if the entry was valid and the reader count was incremented, in which case
+     *     the caller must eventually call releaseReader(). Returns false if the entry is not
+     *     (or no longer) valid. In that case no reader was registered and the entry must not
+     *     be used.
+     */
+    public boolean addReader() {
+      boolean added = false;
+      int readerCount = 0;
+      // Validity check and increment happen under the same monitor that setValidity() uses.
+      synchronized (this) {
+        if (valid.get()) {
+          readerCount = readers.incrementAndGet();
+          added = true;
+        }
+      }
+      // Only a successful add must yield a positive count. An invalid entry is reported through
+      // the return value rather than by throwing.
+      Preconditions.checkState(!added || readerCount > 0);
+      LOG.debug("addReader: entry: {}, readerCount: {}", this, readerCount);
+      return added;
+    }
+
+    private int numReaders() {
+      return readers.get();
+    }
+
+    private void invalidate() {
+      boolean wasValid = setValidity(false);
+
+      if (wasValid) {
+        LOG.info("Invalidated cache entry: {}", this);
+
+        if (invalidationFuture != null) {
+          // The cache entry has just been invalidated, no need for the scheduled invalidation.
+          invalidationFuture.cancel(false);
+        }
+        cleanupIfNeeded();
+      }
+    }
+
+    /**
+     * Set the validity, returning the previous validity value.
+     * @param valid the new validity of this entry
+     * @return the validity of this entry before the call
+     */
+    private boolean setValidity(boolean valid) {
+      synchronized(this) {
+        return this.valid.getAndSet(valid);
+      }
+    }
+
+    // Hands the entry to QueryResultsCache.cleanupEntry() once it is invalid and unread.
+    private void cleanupIfNeeded() {
+      if (!isValid() && readers.get() <= 0) {
+        QueryResultsCache.cleanupEntry(this);
+      }
+    }
+
+    private String getQueryText() {
+      return getQueryInfo().getLookupInfo().getQueryText();
+    }
+
+    public FetchWork getFetchWork() {
+      return fetchWork;
+    }
+
+    public QueryInfo getQueryInfo() {
+      return queryInfo;
+    }
+
+    public Path getCachedResultsPath() {
+      return cachedResultsPath;
+    }
+  }
+
+  // Cache entries indexed by query text. Several entries may share the same query text;
+  // lookup() picks among them with entryMatches().
+  private final Map<String, Set<CacheEntry>> queryMap = new HashMap<>();
+
+  // Access-ordered LinkedHashMap acting as the LRU list (a doubly linked list kept on
+  // CacheEntry would also work). It is synchronized because even lookups reorder it.
+  private final Map<CacheEntry, CacheEntry> lru = Collections.synchronizedMap(
+      new LinkedHashMap<CacheEntry, CacheEntry>(INITIAL_LRU_SIZE, LRU_LOAD_FACTOR, true));
+
+  private final HiveConf conf;
+  private Path cacheDirPath;
+  // Running total of cached data, plus limits read from configuration in the constructor
+  // (sizes in bytes, maxEntryLifetime in milliseconds).
+  private long cacheSize = 0;
+  private long maxCacheSize;
+  private long maxEntrySize;
+  private long maxEntryLifetime;
+  // lookup() takes the read lock and addToCache() takes the write lock.
+  private ReadWriteLock rwLock = new ReentrantReadWriteLock();
+
+  /**
+   * Sets up the results cache: makes sure the configured root directory is writable, creates a
+   * uniquely named owner-only (700) subdirectory for this instance, and reads the size and
+   * lifetime limits.
+   *
+   * @param configuration configuration providing the cache directory and limits
+   * @throws IOException if the cache directory cannot be prepared or created
+   */
+  private QueryResultsCache(HiveConf configuration) throws IOException {
+    this.conf = configuration;
+
+    // Set up cache directory
+    Path rootCacheDir = new Path(conf.getVar(HiveConf.ConfVars.HIVE_QUERY_RESULTS_CACHE_DIRECTORY));
+    LOG.info("Initializing query results cache at {}", rootCacheDir);
+    Utilities.ensurePathIsWritable(rootCacheDir, conf);
+
+    String currentCacheDirName = "results-" + UUID.randomUUID().toString();
+    cacheDirPath = new Path(rootCacheDir, currentCacheDirName);
+    FileSystem fs = cacheDirPath.getFileSystem(conf);
+    FsPermission fsPermission = new FsPermission("700");
+    // Fail fast: without the directory, every later attempt to move results into the cache fails.
+    if (!fs.mkdirs(cacheDirPath, fsPermission)) {
+      throw new IOException("Failed to create query results cache directory " + cacheDirPath);
+    }
+
+    // Results cache directory should be cleaned up at process termination.
+    fs.deleteOnExit(cacheDirPath);
+
+    maxCacheSize = conf.getLongVar(HiveConf.ConfVars.HIVE_QUERY_RESULTS_CACHE_MAX_SIZE);
+    maxEntrySize = conf.getLongVar(HiveConf.ConfVars.HIVE_QUERY_RESULTS_CACHE_MAX_ENTRY_SIZE);
+    maxEntryLifetime = conf.getTimeVar(
+        HiveConf.ConfVars.HIVE_QUERY_RESULTS_CACHE_MAX_ENTRY_LIFETIME,
+        TimeUnit.MILLISECONDS);
+
+    LOG.info("Query results cache: cacheDirectory {}, maxCacheSize {}, maxEntrySize {}, maxEntryLifetime {}",
+        cacheDirPath, maxCacheSize, maxEntrySize, maxEntryLifetime);
+  }
+
+  private static final AtomicBoolean inited = new AtomicBoolean(false);
+  // volatile: initialize() and getInstance() may run on different threads. Without it,
+  // a reader is not guaranteed to see the fully constructed instance.
+  private static volatile QueryResultsCache instance;
+
+  /**
+   * Creates the process-wide query results cache. Only the first call constructs the cache.
+   * Later calls return immediately, even while the first one is still in progress. If
+   * construction fails, the initialized flag is reset so that a later call can retry.
+   *
+   * @param conf configuration used to set up the cache directory and limits
+   * @throws IOException if the cache could not be created
+   */
+  public static void initialize(HiveConf conf) throws IOException {
+    if (!inited.getAndSet(true)) {
+      try {
+        instance = new QueryResultsCache(conf);
+      } catch (IOException err) {
+        inited.set(false);
+        throw err;
+      }
+    }
+  }
+
+  /**
+   * @return the cache instance, or null if initialize() has not (yet) completed successfully
+   */
+  public static QueryResultsCache getInstance() {
+    return instance;
+  }
+
+  /**
+   * Check if the cache contains an entry for the requested LookupInfo.
+   * @param request the lookup key (query text) to search for
+   * @param addReader Should the reader count be incremented during the lookup.
+   *                  This will ensure the returned entry can be used after the lookup.
+   *                  If true, the caller will be responsible for decrementing the reader count
+   *                  using CacheEntry.releaseReader().
+   * @return The cached result if there is a valid match in the cache (and, when addReader is
+   *         true, a reader was successfully registered), or null otherwise.
+   */
+  public CacheEntry lookup(LookupInfo request, boolean addReader) {
+    CacheEntry result = null;
+
+    LOG.debug("QueryResultsCache lookup for query: {}", request.queryText);
+
+    Lock readLock = rwLock.readLock();
+    // Acquire before entering try, so finally never unlocks a lock this thread does not hold.
+    readLock.lock();
+    try {
+      Set<CacheEntry> candidates = queryMap.get(request.queryText);
+      if (candidates != null) {
+        for (CacheEntry candidate : candidates) {
+          if (entryMatches(request, candidate)) {
+            result = candidate;
+            break;
+          }
+        }
+
+        if (result != null) {
+          lru.get(result); // Update LRU
+
+          if (!result.isValid()) {
+            // Entry is in the cache, but not valid.
+            // This can happen when the entry is first added, before the data has been moved
+            // to the results cache directory. We cannot use this entry yet.
+            result = null;
+          } else if (addReader && !result.addReader()) {
+            // The entry was invalidated after the validity check above. No reader was
+            // registered, so the caller must not receive (or later release) this entry.
+            result = null;
+          }
+        }
+      }
+    } finally {
+      readLock.unlock();
+    }
+
+    LOG.debug("QueryResultsCache lookup result: {}", result);
+
+    return result;
+  }
+
+ /**
+ * Add an entry to the query results cache.
+ * Important: Adding the entry to the cache will increment the reader count for the cache entry.
+ * CacheEntry.releaseReader() should be called when the caller is done with the cache entry.
+ *
+ * @param queryInfo
+ * @param fetchWork
+ * @return The entry if added to the cache. null if the entry is not added.
+ */
+ public CacheEntry addToCache(QueryInfo queryInfo, FetchWork fetchWork) {
+
+ CacheEntry addedEntry = null;
+ boolean dataDirMoved = false;
+ Path queryResultsPath = null;
+ Path cachedResultsPath = null;
+ String queryText = queryInfo.getLookupInfo().getQueryText();
+
+ // Should we remove other candidate entries if they are equivalent to these query results?
+ try {
+ CacheEntry potentialEntry = new CacheEntry();
+ potentialEntry.queryInfo = queryInfo;
+ queryResultsPath = fetchWork.getTblDir();
+ FileSystem resultsFs = queryResultsPath.getFileSystem(conf);
+ ContentSummary cs = resultsFs.getContentSummary(queryResultsPath);
+ potentialEntry.size = cs.getLength();
+
+ Lock writeLock = rwLock.writeLock();
+ try {
+ writeLock.lock();
+
+ if (!shouldEntryBeAdded(potentialEntry)) {
+ return null;
+ }
+ if (!clearSpaceForCacheEntry(potentialEntry)) {
+ return null;
+ }
+
+ LOG.info("Adding cache entry for query '{}'", queryText);
+
+ // Add the entry to the cache structures while under write lock. Do not mark the entry
+ // as valid yet, since the query results have not yet been moved to the cache directory.
+ // Do the data move after unlocking since it might take time.
+ // Mark the entry as valid once the data has been moved to the cache directory.
+ Set<CacheEntry> entriesForQuery = queryMap.get(queryText);
+ if (entriesForQuery == null) {
+ entriesForQuery = new HashSet<CacheEntry>();
+ queryMap.put(queryText, entriesForQuery);
+ }
+ entriesForQuery.add(potentialEntry);
+ lru.put(potentialEntry, potentialEntry);
+ cacheSize += potentialEntry.size;
+ addedEntry = potentialEntry;
+
+ } finally {
+ writeLock.unlock();
+ }
+
+ // Move the query results to the query cache directory.
+ cachedResultsPath = moveResultsToCacheDirectory(queryResultsPath);
+ dataDirMoved = true;
+ LOG.info("Moved query results from {} to {} (size {}) for query '{}'",
+ queryResultsPath, cachedResultsPath, cs.getLength(), queryText);
+
+ // Create a new FetchWork to reference the new cache location.
+ FetchWork fetchWorkForCache =
+ new FetchWork(cachedResultsPath, fetchWork.getTblDesc(), fetchWork.getLimit());
+ fetchWorkForCache.setCachedResult(true);
+ addedEntry.fetchWork = fetchWorkForCache;
+ addedEntry.cachedResultsPath = cachedResultsPath;
+ addedEntry.createTime = System.currentTimeMillis();
+ addedEntry.setValidity(true);
+
+ // Mark this entry as being in use. Caller will need to release later.
+ addedEntry.addReader();
+
+ scheduleEntryInvalidation(addedEntry);
+ } catch (Exception err) {
+ LOG.error("Failed to create cache entry for query results for query: " + queryText, err);
+
+ if (addedEntry != null) {
+ // If the entry was already added to the cache when we hit error, clean up properly.
+
+ if (dataDirMoved) {
+ // If data was moved from original location to cache directory, we need to move it back!
+ LOG.info("Restoring query results from {} back to {}", cachedResultsPath, queryResultsPath);
+ try {
+ FileSystem fs = cachedResultsPath.getFileSystem(conf);
+ fs.rename(cachedResultsPath, queryResultsPath);
+ addedEntry.cachedResultsPath = null;
+ } catch (Exception err2) {
+ String errMsg = "Failed cleanup during failed attempt to cache query: " + queryText;
+ LOG.error(errMsg);
+ throw new RuntimeException(errMsg);
+ }
+ }
+
+ addedEntry.invalidate();
+ if (addedEntry.numReaders() > 0) {
+ addedEntry.releaseReader();
+ }
+ }
+
+ return null;
+ }
+
+ return addedEntry;
+ }
+
+  /**
+   * Removes every entry from the cache, invalidating each one. A failure to remove one entry
+   * is logged and does not stop the remaining entries from being removed.
+   */
+  public void clear() {
+    Lock writeLock = rwLock.writeLock();
+    // Acquire before entering try, so the finally block only unlocks a lock that is held.
+    writeLock.lock();
+    try {
+      LOG.info("Clearing the results cache");
+      // Iterate over a snapshot of the keys, since removeEntry() modifies lru.
+      for (CacheEntry entry : lru.keySet().toArray(EMPTY_CACHEENTRY_ARRAY)) {
+        try {
+          removeEntry(entry);
+        } catch (Exception err) {
+          LOG.error("Error removing cache entry " + entry, err);
+        }
+      }
+    } finally {
+      writeLock.unlock();
+    }
+  }
+
+  /**
+   * @return the summed size of all entries currently registered in the cache (each entry's
+   *     size is added when the entry is added and subtracted when it is removed)
+   */
+  public long getSize() {
+    Lock readLock = rwLock.readLock();
+    // Acquire before entering try, so the finally block only unlocks a lock that is held.
+    readLock.lock();
+    try {
+      return cacheSize;
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  // Sizing parameters, presumably used when constructing the LRU map (construction not shown).
+  private static final int INITIAL_LRU_SIZE = 16;
+  private static final float LRU_LOAD_FACTOR = 0.75f;
+  // Zero-length array handed to toArray() when snapshotting the LRU key set.
+  private static final CacheEntry[] EMPTY_CACHEENTRY_ARRAY = new CacheEntry[0];
+
+  /**
+   * Decides whether a cached entry may be served. An entry is rejected when any table it read
+   * resolves to a temporary table in the current session. The lookupInfo argument is currently
+   * not consulted.
+   */
+  private boolean entryMatches(LookupInfo lookupInfo, CacheEntry entry) {
+    for (ReadEntity input : entry.getQueryInfo().getInputs()) {
+      // Only table inputs can be shadowed by temp tables.
+      if (input.getType() != Type.TABLE) {
+        continue;
+      }
+      Table table = input.getTable();
+      Map<String, Table> sessionTempTables =
+          SessionHiveMetaStoreClient.getTempTablesForDatabase(table.getDbName());
+      boolean shadowedByTempTable =
+          sessionTempTables != null && sessionTempTables.containsKey(table.getTableName());
+      if (shadowedByTempTable) {
+        LOG.info("{} resolves to a temporary table in the current session. This query cannot use the cache.",
+            table.getTableName());
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Invalidates an entry, drops it from both the lookup map and the LRU map, and subtracts its
+   * size from the cache total. Called from clear() and clearSpaceForCacheEntry() while the
+   * write lock is held.
+   */
+  private void removeEntry(CacheEntry entry) {
+    entry.invalidate();
+    removeFromLookup(entry);
+    lru.remove(entry);
+    // Open question: should the cache size be adjusted here, or only once the cached result
+    // data has actually been deleted?
+    long freedSize = entry.size;
+    cacheSize -= freedSize;
+  }
+
+  /**
+   * Removes an entry from the per-query-text lookup map, dropping the query text's set once it
+   * becomes empty.
+   *
+   * @throws IllegalStateException if the entry is not currently registered under its query text
+   */
+  private void removeFromLookup(CacheEntry entry) {
+    String queryText = entry.getQueryText();
+    Set<CacheEntry> sameQueryEntries = queryMap.get(queryText);
+    Preconditions.checkState(sameQueryEntries != null);
+    boolean wasPresent = sameQueryEntries.remove(entry);
+    Preconditions.checkState(wasPresent);
+    if (!sameQueryEntries.isEmpty()) {
+      return;
+    }
+    queryMap.remove(queryText);
+  }
+
+  /**
+   * Sets entry.size to the total length of the files under the fetch work's table directory.
+   *
+   * @throws IOException if the file system or its content summary cannot be obtained
+   */
+  private void calculateEntrySize(CacheEntry entry, FetchWork fetchWork) throws IOException {
+    Path resultsDir = fetchWork.getTblDir();
+    FileSystem resultsFs = resultsDir.getFileSystem(conf);
+    entry.size = resultsFs.getContentSummary(resultsDir).getLength();
+  }
+
+  /**
+   * Determines if the cache entry should be added to the results cache: it is rejected only
+   * when a max entry size is configured (non-negative) and the entry exceeds it.
+   */
+  private boolean shouldEntryBeAdded(CacheEntry entry) {
+    // Assumes the cache lock has already been taken.
+    boolean entryLimitEnabled = maxEntrySize >= 0;
+    boolean tooLarge = entryLimitEnabled && entry.size > maxEntrySize;
+    if (tooLarge) {
+      LOG.debug("Cache entry size {} larger than max entry size ({})", entry.size, maxEntrySize);
+    }
+    return !tooLarge;
+  }
+
+  /**
+   * Moves a query's results directory into a new, uniquely named (random UUID) subdirectory of
+   * the cache directory.
+   *
+   * @param queryResultsPath current location of the query results
+   * @return the new location of the results inside the cache directory
+   * @throws IOException if the move fails, including when FileSystem.rename() reports failure
+   *     by returning false instead of throwing
+   */
+  private Path moveResultsToCacheDirectory(Path queryResultsPath) throws IOException {
+    String dirName = UUID.randomUUID().toString();
+    Path cachedResultsPath = new Path(cacheDirPath, dirName);
+    FileSystem fs = cachedResultsPath.getFileSystem(conf);
+    // Without this check, a failed rename would leave the cache entry pointing at a directory
+    // that does not exist.
+    if (!fs.rename(queryResultsPath, cachedResultsPath)) {
+      throw new IOException("Failed to move query results from " + queryResultsPath
+          + " to " + cachedResultsPath);
+    }
+    return cachedResultsPath;
+  }
+
+  /**
+   * @return true if adding the entry keeps the cache within maxCacheSize; always true when
+   *     maxCacheSize is negative (unbounded cache)
+   */
+  private boolean hasSpaceForCacheEntry(CacheEntry entry) {
+    // Negative max cache size means unbounded.
+    boolean unbounded = maxCacheSize < 0;
+    return unbounded || cacheSize + entry.size <= maxCacheSize;
+  }
+
+  /**
+   * Evicts valid entries that have no readers, in LRU-map iteration order, until the new entry
+   * fits. Called from addToCache() while the write lock is held.
+   *
+   * @return true if there is (now) enough space for the entry, false otherwise
+   */
+  private boolean clearSpaceForCacheEntry(CacheEntry entry) {
+    if (hasSpaceForCacheEntry(entry)) {
+      return true;
+    }
+
+    LOG.info("Clearing space for cache entry for query: [{}] with size {}",
+        entry.getQueryText(), entry.size);
+
+    // Entries should be in LRU order in the keyset iterator. Iterate over a snapshot, since
+    // removeEntry() modifies lru.
+    CacheEntry[] entries = lru.keySet().toArray(EMPTY_CACHEENTRY_ARRAY);
+    for (CacheEntry removalCandidate : entries) {
+      if (!removalCandidate.isValid()) {
+        // Likely an entry which is still getting its results moved to the cache directory.
+        // NOTE(review): entries invalidated by scheduleEntryInvalidation() also appear to stay
+        // in lru and count toward cacheSize, yet are skipped here; confirm how they are reclaimed.
+        continue;
+      }
+      // Only delete the entry if it has no readers.
+      if (removalCandidate.numReaders() <= 0) {
+        LOG.info("Removing entry: {}", removalCandidate);
+        removeEntry(removalCandidate);
+        if (hasSpaceForCacheEntry(entry)) {
+          return true;
+        }
+      }
+    }
+
+    LOG.info("Could not free enough space for cache entry for query: [{}] with size {}",
+        entry.getQueryText(), entry.size);
+    return false;
+  }
+
+
+  /**
+   * Test-only teardown: clears the singleton cache (if initialized) and forgets it, so that a
+   * later initialize() creates a fresh instance.
+   */
+  @VisibleForTesting
+  public static void cleanupInstance() {
+    // This should only ever be called in testing scenarios.
+    // There should not be any other users of the cache or its entries or this may mess up cleanup.
+    if (!inited.get()) {
+      return;
+    }
+    getInstance().clear();
+    instance = null;
+    inited.set(false);
+  }
+
+ private static ScheduledExecutorService invalidationExecutor = null;
+ private static ExecutorService deletionExecutor = null;
+
+ static {
+ ThreadFactory threadFactory =
+ new ThreadFactoryBuilder().setDaemon(true).setNameFormat("QueryResultsCache %d").build();
+ invalidationExecutor = Executors.newSingleThreadScheduledExecutor(threadFactory);
+ deletionExecutor = Executors.newSingleThreadExecutor(threadFactory);
+ }
+
+  /**
+   * Schedules the entry to be invalidated once maxEntryLifetime milliseconds have elapsed and
+   * stores the resulting future on the entry. Nothing is scheduled when maxEntryLifetime is
+   * negative.
+   */
+  private void scheduleEntryInvalidation(final CacheEntry entry) {
+    if (maxEntryLifetime < 0) {
+      return;
+    }
+    Runnable expireTask = new Runnable() {
+      @Override
+      public void run() {
+        entry.invalidate();
+      }
+    };
+    // Schedule task to invalidate cache entry.
+    ScheduledFuture<?> expiry =
+        invalidationExecutor.schedule(expireTask, maxEntryLifetime, TimeUnit.MILLISECONDS);
+    entry.invalidationFuture = expiry;
+  }
+
+  /**
+   * Asynchronously deletes (recursively) the cached results directory of an invalidated entry,
+   * if it has one. Deletion errors are logged, not propagated.
+   *
+   * @param entry an entry that must already be invalid
+   * @throws IllegalStateException if the entry is still valid
+   */
+  private static void cleanupEntry(final CacheEntry entry) {
+    Preconditions.checkState(!entry.isValid());
+
+    // Read the path once, so the null check and the deletion task use the same value even if
+    // entry.cachedResultsPath is reassigned before the task runs on the deletion thread.
+    final Path path = entry.cachedResultsPath;
+    if (path != null) {
+      deletionExecutor.execute(new Runnable() {
+        @Override
+        public void run() {
+          LOG.info("Cache directory cleanup: deleting {}", path);
+          try {
+            FileSystem fs = path.getFileSystem(getInstance().conf);
+            fs.delete(path, true);
+          } catch (Exception err) {
+            LOG.error("Error while trying to delete " + path, err);
+          }
+        }
+      });
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/1733a371/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
index 675ca12..8f44c94 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
@@ -2626,6 +2626,10 @@ public final class Utilities {
return getTasks(tasks, new TaskFilterFunction<>(ExecDriver.class));
}
+ public static int getNumClusterJobs(List<Task<? extends Serializable>> tasks) {
+ return getMRTasks(tasks).size() + getTezTasks(tasks).size() + getSparkTasks(tasks).size();
+ }
+
static class TaskFilterFunction<T> implements DAGTraversal.Function {
private Set<Task<? extends Serializable>> visited = new HashSet<>();
private Class<T> requiredType;
@@ -4445,4 +4449,33 @@ public final class Utilities {
return AcidUtils.ORIGINAL_PATTERN.matcher(path.getName()).matches() ||
AcidUtils.ORIGINAL_PATTERN_COPY.matcher(path.getName()).matches();
}
+
+ /**
+ * Checks if path passed in exists and has writable permissions.
+ * The path will be created if it does not exist.
+ * @param rootHDFSDirPath
+ * @param conf
+ */
+ public static void ensurePathIsWritable(Path rootHDFSDirPath, HiveConf conf) throws IOException {
+ FsPermission writableHDFSDirPermission = new FsPermission((short)00733);
+ FileSystem fs = rootHDFSDirPath.getFileSystem(conf);
+ if (!fs.exists(rootHDFSDirPath)) {
+ Utilities.createDirsWithPermission(conf, rootHDFSDirPath, writableHDFSDirPermission, true);
+ }
+ FsPermission currentHDFSDirPermission = fs.getFileStatus(rootHDFSDirPath).getPermission();
+ if (rootHDFSDirPath != null && rootHDFSDirPath.toUri() != null) {
+ String schema = rootHDFSDirPath.toUri().getScheme();
+ LOG.debug("HDFS dir: " + rootHDFSDirPath + " with schema " + schema + ", permission: " +
+ currentHDFSDirPermission);
+ } else {
+ LOG.debug(
+ "HDFS dir: " + rootHDFSDirPath + ", permission: " + currentHDFSDirPermission);
+ }
+ // If the root HDFS scratch dir already exists, make sure it is writeable.
+ if (!((currentHDFSDirPermission.toShort() & writableHDFSDirPermission
+ .toShort()) == writableHDFSDirPermission.toShort())) {
+ throw new RuntimeException("The dir: " + rootHDFSDirPath
+ + " on HDFS should be writable. Current permissions are: " + currentHDFSDirPermission);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/1733a371/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/HiveCalciteUtil.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/HiveCalciteUtil.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/HiveCalciteUtil.java
index f0dd167..1f8a48c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/HiveCalciteUtil.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/HiveCalciteUtil.java
@@ -967,6 +967,44 @@ public class HiveCalciteUtil {
return AggregateCall.create(aggFunction, false, argList, -1, aggFnRetType, null);
}
+ /**
+ * Is the expression usable for query materialization.
+ */
+ public static boolean isMaterializable(RexNode expr) {
+ return (checkMaterializable(expr) == null);
+ }
+
+  /**
+   * Check if the expression is usable for query materialization, returning the first failing
+   * expression. A call fails when its operator is non-deterministic or is a dynamic function
+   * (a runtime constant). The visit stops at the first failing call it encounters.
+   *
+   * @param expr expression to inspect; may be null
+   * @return the first failing call, or null if expr is null or contains no failing call
+   */
+  public static RexCall checkMaterializable(RexNode expr) {
+    if (expr == null) {
+      return null;
+    }
+
+    RexVisitor<Void> visitor = new RexVisitorImpl<Void>(true) {
+      @Override
+      public Void visitCall(RexCall call) {
+        // non-deterministic functions as well as runtime constants are not materializable.
+        if (!call.getOperator().isDeterministic() || call.getOperator().isDynamicFunction()) {
+          // Abort the traversal, carrying the offending call out to the caller.
+          throw new Util.FoundOne(call);
+        }
+        return super.visitCall(call);
+      }
+    };
+
+    try {
+      expr.accept(visitor);
+    } catch (Util.FoundOne e) {
+      return (RexCall) e.getNode();
+    }
+    return null;
+  }
+
public static HiveTableFunctionScan createUDTFForSetOp(RelOptCluster cluster, RelNode input)
throws SemanticException {
RelTraitSet traitSet = TraitsUtil.getDefaultTraitSet(cluster);
http://git-wip-us.apache.org/repos/asf/hive/blob/1733a371/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/HiveRelOpMaterializationValidator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/HiveRelOpMaterializationValidator.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/HiveRelOpMaterializationValidator.java
new file mode 100644
index 0000000..8c1bcb3
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/HiveRelOpMaterializationValidator.java
@@ -0,0 +1,285 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.optimizer.calcite;
+
+import java.util.List;
+
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.AggregateCall;
+import org.apache.calcite.rel.core.TableFunctionScan;
+import org.apache.calcite.rel.core.TableScan;
+import org.apache.calcite.rel.logical.LogicalAggregate;
+import org.apache.calcite.rel.logical.LogicalCorrelate;
+import org.apache.calcite.rel.logical.LogicalExchange;
+import org.apache.calcite.rel.logical.LogicalFilter;
+import org.apache.calcite.rel.logical.LogicalIntersect;
+import org.apache.calcite.rel.logical.LogicalJoin;
+import org.apache.calcite.rel.logical.LogicalMatch;
+import org.apache.calcite.rel.logical.LogicalMinus;
+import org.apache.calcite.rel.logical.LogicalProject;
+import org.apache.calcite.rel.logical.LogicalSort;
+import org.apache.calcite.rel.logical.LogicalUnion;
+import org.apache.calcite.rel.logical.LogicalValues;
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.util.Util;
+
+import org.apache.hadoop.hive.metastore.TableType;
+
+import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.optimizer.calcite.HiveCalciteUtil.JoinPredicateInfo;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveAggregate;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveExcept;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveFilter;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveIntersect;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveJoin;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveProject;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveSemiJoin;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveSortLimit;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveTableScan;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveUnion;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Checks the query plan for conditions that would make the plan unsuitable for
+ * materialized views or query caching:
+ * - References to temporary or external tables
+ * - References to non-deterministic (or dynamic) functions
+ * - Table scans of non-Hive tables, or RelNode types not expected in a Hive plan.
+ *
+ * The traversal stops at the first failing condition; its description is then available from
+ * {@link #getInvalidMaterializationReason()}.
+ */
+public class HiveRelOpMaterializationValidator extends HiveRelShuttleImpl {
+  static final Logger LOG = LoggerFactory.getLogger(HiveRelOpMaterializationValidator.class);
+
+  /** Description of the first failing condition found, or null if none has been found. */
+  protected String invalidMaterializationReason;
+
+  /**
+   * Walks the plan rooted at {@code relNode}. Afterwards, {@link #isValidMaterialization()}
+   * reports whether a failing condition was found.
+   */
+  public void validateQueryMaterialization(RelNode relNode) {
+    try {
+      relNode.accept(this);
+    } catch (Util.FoundOne e) {
+      // Can ignore - the check failed, and fail() has already recorded the reason.
+    }
+  }
+
+  @Override
+  public RelNode visit(TableScan scan) {
+    if (scan instanceof HiveTableScan) {
+      HiveTableScan hiveScan = (HiveTableScan) scan;
+      RelOptHiveTable relOptHiveTable = (RelOptHiveTable) hiveScan.getTable();
+      Table tab = relOptHiveTable.getHiveTableMD();
+      if (tab.isTemporary()) {
+        fail(tab.getTableName() + " is a temporary table");
+      }
+      if (tab.getTableType() == TableType.EXTERNAL_TABLE) {
+        fail(tab.getFullyQualifiedName() + " is an external table");
+      }
+      return scan;
+    }
+
+    // TableScan of a non-Hive table - don't support for materializations.
+    fail(scan.getTable().getQualifiedName() + " is a table scan of a non-Hive table.");
+    return scan;
+  }
+
+  @Override
+  public RelNode visit(HiveProject project) {
+    for (RexNode expr : project.getProjects()) {
+      checkExpr(expr);
+    }
+    return super.visit(project);
+  }
+
+  @Override
+  public RelNode visit(HiveFilter filter) {
+    checkExpr(filter.getCondition());
+    return super.visit(filter);
+  }
+
+  @Override
+  public RelNode visit(HiveJoin join) {
+    checkExpr(join.getCondition());
+    return super.visit(join);
+  }
+
+  @Override
+  public RelNode visit(HiveAggregate aggregate) {
+    // Is there anything to check here?
+    return super.visit(aggregate);
+  }
+
+  @Override
+  public RelNode visit(RelNode node) {
+    // There are several Hive RelNode types which do not have their own visit() method
+    // defined in the HiveRelShuttle interface, which need to be handled appropriately here.
+    // Per jcamachorodriguez we should not encounter HiveMultiJoin/HiveSortExchange
+    // during these checks, so no need to add those here.
+    if (node instanceof HiveUnion) {
+      return visit((HiveUnion) node);
+    } else if (node instanceof HiveSortLimit) {
+      return visit((HiveSortLimit) node);
+    } else if (node instanceof HiveSemiJoin) {
+      return visit((HiveSemiJoin) node);
+    } else if (node instanceof HiveExcept) {
+      return visit((HiveExcept) node);
+    } else if (node instanceof HiveIntersect) {
+      return visit((HiveIntersect) node);
+    }
+
+    // Fall-back for an unexpected RelNode type
+    return fail(node);
+  }
+
+  @Override
+  public RelNode visit(TableFunctionScan scan) {
+    checkExpr(scan.getCall());
+    return super.visit(scan);
+  }
+
+  @Override
+  public RelNode visit(LogicalValues values) {
+    // Not expected to be encountered for Hive - fail
+    return fail(values);
+  }
+
+  @Override
+  public RelNode visit(LogicalFilter filter) {
+    // Not expected to be encountered for Hive - fail
+    return fail(filter);
+  }
+
+  @Override
+  public RelNode visit(LogicalProject project) {
+    // Not expected to be encountered for Hive - fail
+    return fail(project);
+  }
+
+  @Override
+  public RelNode visit(LogicalJoin join) {
+    // Not expected to be encountered for Hive - fail
+    return fail(join);
+  }
+
+  @Override
+  public RelNode visit(LogicalCorrelate correlate) {
+    // Not expected to be encountered for Hive - fail
+    return fail(correlate);
+  }
+
+  @Override
+  public RelNode visit(LogicalUnion union) {
+    // Not expected to be encountered for Hive - fail
+    return fail(union);
+  }
+
+  @Override
+  public RelNode visit(LogicalIntersect intersect) {
+    // Not expected to be encountered for Hive - fail
+    return fail(intersect);
+  }
+
+  @Override
+  public RelNode visit(LogicalMinus minus) {
+    // Not expected to be encountered for Hive - fail
+    return fail(minus);
+  }
+
+  @Override
+  public RelNode visit(LogicalAggregate aggregate) {
+    // Not expected to be encountered for Hive - fail
+    return fail(aggregate);
+  }
+
+  @Override
+  public RelNode visit(LogicalMatch match) {
+    // Not expected to be encountered for Hive - fail
+    return fail(match);
+  }
+
+  @Override
+  public RelNode visit(LogicalSort sort) {
+    // Not expected to be encountered for Hive - fail
+    return fail(sort);
+  }
+
+  @Override
+  public RelNode visit(LogicalExchange exchange) {
+    // Not expected to be encountered for Hive - fail
+    return fail(exchange);
+  }
+
+  // Note: Not currently part of the HiveRelShuttle interface
+  private RelNode visit(HiveUnion union) {
+    return visitChildren(union);
+  }
+
+  // Note: Not currently part of the HiveRelShuttle interface
+  private RelNode visit(HiveSortLimit sort) {
+    checkExpr(sort.getFetchExpr());
+    checkExpr(sort.getOffsetExpr());
+    return visitChildren(sort);
+  }
+
+  // Note: Not currently part of the HiveRelShuttle interface
+  private RelNode visit(HiveSemiJoin semiJoin) {
+    checkExpr(semiJoin.getCondition());
+    checkExpr(semiJoin.getJoinFilter());
+    return visitChildren(semiJoin);
+  }
+
+  // Note: Not currently part of the HiveRelShuttle interface
+  private RelNode visit(HiveExcept except) {
+    return visitChildren(except);
+  }
+
+  // Note: Not currently part of the HiveRelShuttle interface
+  private RelNode visit(HiveIntersect intersect) {
+    return visitChildren(intersect);
+  }
+
+  /** Records {@code reason} and aborts the traversal by throwing {@code Util.FoundOne.NULL}. */
+  private void fail(String reason) {
+    setInvalidMaterializationReason(reason);
+    throw Util.FoundOne.NULL;
+  }
+
+  /**
+   * Records that {@code node}'s type is unsupported and aborts the traversal. Declared to return
+   * RelNode only so visit methods can {@code return fail(node)}; it never returns normally.
+   */
+  private RelNode fail(RelNode node) {
+    setInvalidMaterializationReason("Unsupported RelNode type " + node.getRelTypeName() +
+        " encountered in the query plan");
+    throw Util.FoundOne.NULL;
+  }
+
+  /** Fails the validation if {@code expr} (possibly null) contains a non-materializable call. */
+  private void checkExpr(RexNode expr) {
+    RexCall invalidCall = HiveCalciteUtil.checkMaterializable(expr);
+    if (invalidCall != null) {
+      fail(invalidCall.getOperator().getName() + " is not a deterministic function");
+    }
+  }
+
+  public String getInvalidMaterializationReason() {
+    return invalidMaterializationReason;
+  }
+
+  public void setInvalidMaterializationReason(String invalidMaterializationReason) {
+    this.invalidMaterializationReason = invalidMaterializationReason;
+  }
+
+  /** @return true if no failing condition has been recorded */
+  public boolean isValidMaterialization() {
+    return invalidMaterializationReason == null;
+  }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/1733a371/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java
index e553a81..8a1bfd2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java
@@ -55,6 +55,8 @@ import org.apache.hadoop.hive.ql.Context;
import org.apache.hadoop.hive.ql.ErrorMsg;
import org.apache.hadoop.hive.ql.QueryProperties;
import org.apache.hadoop.hive.ql.QueryState;
+import org.apache.hadoop.hive.ql.cache.results.CacheUsage;
+import org.apache.hadoop.hive.ql.cache.results.QueryResultsCache;
import org.apache.hadoop.hive.ql.exec.FetchTask;
import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.exec.TaskFactory;
@@ -148,6 +150,9 @@ public abstract class BaseSemanticAnalyzer {
protected LineageInfo linfo;
protected TableAccessInfo tableAccessInfo;
protected ColumnAccessInfo columnAccessInfo;
+
+ /** How this query uses the query results cache; null until set via setCacheUsage(). */
+ protected CacheUsage cacheUsage;
+
/**
* Columns accessed by updates
*/
@@ -1945,4 +1950,12 @@ public abstract class BaseSemanticAnalyzer {
}
return SessionState.get().getTxnMgr();
}
+
+  /**
+   * @return how this query uses the query results cache, or null if no usage has been set
+   */
+  public CacheUsage getCacheUsage() {
+    return this.cacheUsage;
+  }
+
+  /**
+   * Records how this query uses the query results cache.
+   *
+   * @param cacheUsage the cache usage descriptor for this query
+   */
+  public void setCacheUsage(CacheUsage cacheUsage) {
+    this.cacheUsage = cacheUsage;
+  }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/1733a371/ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java
index 85a1f34..cf2bc13 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java
@@ -154,6 +154,7 @@ import org.apache.hadoop.hive.ql.optimizer.calcite.HiveConfPlannerContext;
import org.apache.hadoop.hive.ql.optimizer.calcite.HiveDefaultRelMetadataProvider;
import org.apache.hadoop.hive.ql.optimizer.calcite.HivePlannerContext;
import org.apache.hadoop.hive.ql.optimizer.calcite.HiveRelFactories;
+import org.apache.hadoop.hive.ql.optimizer.calcite.HiveRelOpMaterializationValidator;
import org.apache.hadoop.hive.ql.optimizer.calcite.HiveRexExecutorImpl;
import org.apache.hadoop.hive.ql.optimizer.calcite.HiveTypeSystemImpl;
import org.apache.hadoop.hive.ql.optimizer.calcite.RelOptHiveTable;
@@ -1442,6 +1443,16 @@ public class CalcitePlanner extends SemanticAnalyzer {
}
perfLogger.PerfLogEnd(this.getClass().getName(), PerfLogger.OPTIMIZER, "Calcite: Plan generation");
+ // Validate query materialization (materialized views, query results caching.
+ // This check needs to occur before constant folding, which may remove some
+ // function calls from the query plan.
+ HiveRelOpMaterializationValidator matValidator = new HiveRelOpMaterializationValidator();
+ matValidator.validateQueryMaterialization(calciteGenPlan);
+ if (!matValidator.isValidMaterialization()) {
+ String reason = matValidator.getInvalidMaterializationReason();
+ setInvalidQueryMaterializationReason(reason);
+ }
+
// Create executor
RexExecutor executorProvider = new HiveRexExecutorImpl(optCluster);
calciteGenPlan.getCluster().getPlanner().setExecutor(executorProvider);
http://git-wip-us.apache.org/repos/asf/hive/blob/1733a371/ql/src/java/org/apache/hadoop/hive/ql/parse/QBParseInfo.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/QBParseInfo.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/QBParseInfo.java
index ae2ec3d..5789ee0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/QBParseInfo.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/QBParseInfo.java
@@ -675,6 +675,9 @@ public class QBParseInfo {
return insertOverwriteTables;
}
+  /**
+   * @return true if at least one insert-into or insert-overwrite target table is recorded
+   */
+  public boolean hasInsertTables() {
+    return !insertIntoTables.isEmpty() || !insertOverwriteTables.isEmpty();
+  }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/1733a371/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
index e6ee968..8e587f1 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
@@ -84,10 +84,13 @@ import org.apache.hadoop.hive.ql.Context;
import org.apache.hadoop.hive.ql.ErrorMsg;
import org.apache.hadoop.hive.ql.QueryProperties;
import org.apache.hadoop.hive.ql.QueryState;
+import org.apache.hadoop.hive.ql.cache.results.CacheUsage;
+import org.apache.hadoop.hive.ql.cache.results.QueryResultsCache;
import org.apache.hadoop.hive.ql.exec.AbstractMapJoinOperator;
import org.apache.hadoop.hive.ql.exec.ArchiveUtils;
import org.apache.hadoop.hive.ql.exec.ColumnInfo;
import org.apache.hadoop.hive.ql.exec.ExprNodeEvaluatorFactory;
+import org.apache.hadoop.hive.ql.exec.FetchTask;
import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
import org.apache.hadoop.hive.ql.exec.FilterOperator;
import org.apache.hadoop.hive.ql.exec.FunctionInfo;
@@ -361,6 +364,8 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
HiveParser.TOK_ORDERBY, HiveParser.TOK_WINDOWSPEC, HiveParser.TOK_CLUSTERBY,
HiveParser.TOK_DISTRIBUTEBY, HiveParser.TOK_SORTBY);
+ // Why this query's plan is unsuitable for materialization / query results caching (e.g. a
+ // non-deterministic function or a temporary/external table); presumably null when none found.
+ private String invalidQueryMaterializationReason;
+
static class Phase1Ctx {
String dest;
int nextNum;
@@ -11211,6 +11216,76 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
return getTableObjectByName(tableName, true);
}
+  /**
+   * Breadth-first walk of {@code ast} that, for every TOK_TABNAME node whose (lower-cased) name
+   * is not a CTE alias, registers a table-name translation using the current database.
+   * Children of nodes whose token type is in {@code ignoredTokens} are not visited.
+   * The {@code ctx} and {@code db} parameters are currently unused.
+   */
+  private static void walkASTAndQualifyNames(ASTNode ast,
+      Set<String> cteAlias, Context ctx, Hive db, Set<Integer> ignoredTokens, UnparseTranslator unparseTranslator) {
+    Queue<Node> pending = new LinkedList<>();
+    pending.add(ast);
+    while (!pending.isEmpty()) {
+      ASTNode current = (ASTNode) pending.poll();
+      int tokenType = current.getToken().getType();
+      if (tokenType == HiveParser.TOK_TABNAME) {
+        String tableName = getUnescapedName(current).toLowerCase();
+        // A CTE alias names a subquery rather than a table, so it must not be translated.
+        if (!cteAlias.contains(tableName)) {
+          unparseTranslator.addTableNameTranslation(current, SessionState.get().getCurrentDatabase());
+        }
+      }
+
+      boolean descend = current.getChildCount() > 0 && !ignoredTokens.contains(tokenType);
+      if (descend) {
+        pending.addAll(current.getChildren());
+      }
+    }
+  }
+
+  /**
+   * Walks the AST and qualifies every table reference with the current database (references to
+   * CTE aliases are left alone), then returns the query text rebuilt from the token stream.
+   *
+   * @param ast root of the query AST; a leading TOK_CTE child holds the WITH-clause definitions
+   * @param tokenRewriteStream token stream the AST was parsed from; translations are applied to it
+   * @return the rewritten text for the token range covered by {@code ast}
+   * @throws SemanticException if a CTE alias is defined more than once
+   */
+  // Walk through the AST.
+  // Replace all TOK_TABREF with fully qualified table name, if it is not already fully qualified.
+  protected String rewriteQueryWithQualifiedNames(ASTNode ast, TokenRewriteStream tokenRewriteStream)
+      throws SemanticException {
+    UnparseTranslator unparseTranslator = new UnparseTranslator(conf);
+    unparseTranslator.enable();
+
+    // 1. collect information about CTE if there is any.
+    // References to a CTE alias name a subquery, not a table, so they must not be qualified.
+    Set<String> cteAlias = new HashSet<>();
+    if (ast.getChildCount() > 0
+        && HiveParser.TOK_CTE == ((ASTNode) ast.getChild(0)).getToken().getType()) {
+      // the structure inside CTE is like this
+      // TOK_CTE
+      //   TOK_SUBQUERY
+      //     sq1 (may refer to sq2)
+      //   ...
+      //   TOK_SUBQUERY
+      //     sq2
+      ASTNode cte = (ASTNode) ast.getChild(0);
+      // we start from sq2, end up with sq1, so the aliases of the CTEs a definition may refer to
+      // are already registered when that definition is walked.
+      for (int index = cte.getChildCount() - 1; index >= 0; index--) {
+        ASTNode subq = (ASTNode) cte.getChild(index);
+        String alias = unescapeIdentifier(subq.getChild(1).getText());
+        if (cteAlias.contains(alias)) {
+          throw new SemanticException("Duplicate definition of " + alias);
+        }
+        cteAlias.add(alias);
+        // Walk only this CTE definition. Walking the whole AST here would visit the main query
+        // before all CTE aliases are known (qualifying CTE references with the database name)
+        // and would register the same translations repeatedly.
+        walkASTAndQualifyNames(subq, cteAlias, ctx, db, ignoredTokens, unparseTranslator);
+      }
+      // walk the other part of ast (every child after the TOK_CTE node)
+      for (int index = 1; index < ast.getChildCount(); index++) {
+        walkASTAndQualifyNames((ASTNode) ast.getChild(index), cteAlias, ctx, db, ignoredTokens,
+            unparseTranslator);
+      }
+    } else { // there is no CTE, walk the whole AST
+      walkASTAndQualifyNames(ast, cteAlias, ctx, db, ignoredTokens, unparseTranslator);
+    }
+
+    unparseTranslator.applyTranslations(tokenRewriteStream);
+    String rewrittenQuery = tokenRewriteStream.toString(
+        ast.getTokenStartIndex(), ast.getTokenStopIndex());
+    return rewrittenQuery;
+  }
+
private static void walkASTMarkTABREF(TableMask tableMask, ASTNode ast, Set<String> cteAlias,
Context ctx, Hive db, Map<String, Table> tabNameToTabObject, Set<Integer> ignoredTokens)
throws SemanticException {
@@ -11549,6 +11624,20 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
}
}
+ // Check query results cache.
+ // If no masking/filtering required, then we can check the cache now, before
+ // generating the operator tree and going through CBO.
+ // Otherwise we have to wait until after the masking/filtering step.
+ boolean isCacheEnabled = conf.getBoolVar(HiveConf.ConfVars.HIVE_QUERY_RESULTS_CACHE_ENABLED);
+ QueryResultsCache.LookupInfo lookupInfo = null;
+ boolean needsTransform = needsTransform();
+ if (isCacheEnabled && !needsTransform && queryTypeCanUseCache()) {
+ lookupInfo = createLookupInfoForQuery(ast);
+ if (checkResultsCache(lookupInfo)) {
+ return;
+ }
+ }
+
// 2. Gen OP Tree from resolved Parse Tree
Operator sinkOp = genOPTree(ast, plannerCtx);
@@ -11571,6 +11660,16 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
}
}
+ // Check query results cache
+ // In the case that row or column masking/filtering was required, the cache must be checked
+ // here, after applying the masking/filtering rewrite rules to the AST.
+ if (isCacheEnabled && needsTransform && queryTypeCanUseCache()) {
+ lookupInfo = createLookupInfoForQuery(ast);
+ if (checkResultsCache(lookupInfo)) {
+ return;
+ }
+ }
+
// 3. Deduce Resultset Schema
if (createVwDesc != null && !this.ctx.isCboSucceeded()) {
resultSchema = convertRowSchemaToViewSchema(opParseCtx.get(sinkOp).getRowResolver());
@@ -11705,6 +11804,15 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
putAccessedColumnsToReadEntity(inputs, columnAccessInfo);
}
+ if (isCacheEnabled && lookupInfo != null) {
+ if (queryCanBeCached()) {
+ QueryResultsCache.QueryInfo queryInfo = createCacheQueryInfoForQuery(lookupInfo);
+
+ // Specify that the results of this query can be cached.
+ setCacheUsage(new CacheUsage(
+ CacheUsage.CacheStatus.CAN_CACHE_QUERY_RESULTS, queryInfo));
+ }
+ }
}
private void putAccessedColumnsToReadEntity(HashSet<ReadEntity> inputs, ColumnAccessInfo columnAccessInfo) {
@@ -13913,6 +14021,158 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
this.loadFileWork = loadFileWork;
}
+  /**
+   * Produces the text used to look this query up in the query results cache. This is the
+   * original query text with unqualified table references replaced by fully qualified names.
+   *
+   * @param ast root of the query being analyzed
+   * @return the rewritten query text, or {@code null} if an error occurred (the caller should
+   *     then skip the cache lookup)
+   */
+  private String getQueryStringForCache(ASTNode ast) {
+    // Original query text covered by this AST.
+    String queryString =
+        ctx.getTokenRewriteStream().toString(ast.getTokenStartIndex(), ast.getTokenStopIndex());
+
+    // Re-parse into a separately named view token stream. Applying the qualifying
+    // translations then cannot overwrite the current TokenRewriteStream.
+    String rewriteStreamName = "__qualified_query_string__";
+    try {
+      ASTNode reparsedAst = ParseUtils.parse(queryString, ctx, rewriteStreamName);
+      return rewriteQueryWithQualifiedNames(
+          reparsedAst, ctx.getViewTokenRewriteStream(rewriteStreamName));
+    } catch (Exception err) {
+      // Best effort: a failure here must not fail the query; it only disables the cache lookup.
+      LOG.error("Unexpected error while reparsing the query string [" + queryString + "]", err);
+      return null;
+    }
+  }
+
+  /**
+   * Creates the results-cache lookup descriptor for a query.
+   *
+   * @param astNode root of the query being analyzed
+   * @return lookup info built from the fully qualified query text, or {@code null} if that
+   *     text could not be produced (callers then skip the cache)
+   */
+  private QueryResultsCache.LookupInfo createLookupInfoForQuery(ASTNode astNode) {
+    String qualifiedText = getQueryStringForCache(astNode);
+    return (qualifiedText == null) ? null : new QueryResultsCache.LookupInfo(qualifiedText);
+  }
+
+  /**
+   * Configures this analyzer to answer the query from a results cache entry instead of
+   * executing it.
+   *
+   * @param cacheEntry the results cache entry that will be used to resolve the query
+   */
+  private void useCachedResult(QueryResultsCache.CacheEntry cacheEntry) {
+    // Point the query's FetchTask at the fetch work of the cached results.
+    setFetchTask((FetchTask) TaskFactory.get(cacheEntry.getFetchWork(), conf));
+
+    // Restore the analysis state that was stored with the cache entry.
+    QueryResultsCache.QueryInfo cachedInfo = cacheEntry.getQueryInfo();
+    queryState.setCommandType(cachedInfo.getHiveOperation());
+    resultSchema = cachedInfo.getResultSchema();
+    setTableAccessInfo(cachedInfo.getTableAccessInfo());
+    setColumnAccessInfo(cachedInfo.getColumnAccessInfo());
+    inputs.addAll(cachedInfo.getInputs());
+
+    // Record that the query will be served from the cache.
+    setCacheUsage(new CacheUsage(CacheUsage.CacheStatus.QUERY_USING_CACHE, cacheEntry));
+  }
+
+  /**
+   * Captures what must be restored when a later query is answered from this query's cached
+   * results: operation type, result schema, table/column access info and inputs.
+   *
+   * @param lookupInfo lookup descriptor for this query
+   * @return a new QueryInfo describing this query
+   */
+  private QueryResultsCache.QueryInfo createCacheQueryInfoForQuery(
+      QueryResultsCache.LookupInfo lookupInfo) {
+    HiveOperation operation = queryState.getHiveOperation();
+    return new QueryResultsCache.QueryInfo(lookupInfo, operation, resultSchema,
+        getTableAccessInfo(), getColumnAccessInfo(), inputs);
+  }
+
+  /**
+   * Initial checks on whether this query may be looked up in the results cache. Only plain
+   * QUERY operations qualify. Column-stats analyzers, ANALYZE commands and queries that
+   * insert into tables are excluded.
+   */
+  private boolean queryTypeCanUseCache() {
+    // Column stats generates "select compute_stats() .." queries, which are never cached.
+    // The conditions short-circuit in this order, so qb is only consulted for QUERY operations.
+    return !(this instanceof ColumnStatsSemanticAnalyzer)
+        && queryState.getHiveOperation() == HiveOperation.QUERY
+        && !qb.getParseInfo().isAnalyzeCommand()
+        && !qb.getParseInfo().hasInsertTables();
+  }
+
+  /**
+   * Whether the session's V2 authorizer requires the query to be transformed
+   * (masking/filtering) before it runs.
+   */
+  private boolean needsTransform() {
+    SessionState session = SessionState.get();
+    return session.getAuthorizerV2() != null && session.getAuthorizerV2().needTransform();
+  }
+
+  /**
+   * Called after a query plan has been generated, to decide whether this query's results
+   * can be added to the results cache. All of the following must hold:
+   * an eligible query type, a fetch task, at least one mr/tez/spark job, a successful CBO run
+   * (the materialization validation only happens in CBO), and a valid query materialization.
+   * The first failing requirement is logged.
+   */
+  private boolean queryCanBeCached() {
+    if (!queryTypeCanUseCache()) {
+      LOG.info("Not eligible for results caching - wrong query type");
+    } else if (getFetchTask() == null) {
+      LOG.info("Not eligible for results caching - no fetch task");
+    } else if (Utilities.getNumClusterJobs(getRootTasks()) == 0) {
+      LOG.info("Not eligible for results caching - no mr/tez/spark jobs");
+    } else if (!ctx.isCboSucceeded()) {
+      LOG.info("Caching of query results is disabled if CBO was not run.");
+    } else if (!isValidQueryMaterialization()) {
+      LOG.info("Not eligible for results caching - {}", getInvalidQueryMaterializationReason());
+    } else {
+      return true;
+    }
+    return false;
+  }
+
+  /**
+   * Looks up the query described by {@code lookupInfo} in the query results cache. If a
+   * suitable entry exists, this analyzer is configured to answer the query from it.
+   *
+   * @param lookupInfo lookup descriptor; {@code null} means no lookup is possible
+   * @return {@code true} if a cache entry will be used, in which case the caller should return
+   *     from semantic analysis; {@code false} otherwise
+   * @throws IllegalStateException if initializing the results cache fails
+   */
+  private boolean checkResultsCache(QueryResultsCache.LookupInfo lookupInfo) {
+    if (lookupInfo == null) {
+      return false;
+    }
+    try {
+      // In case this has not been initialized elsewhere.
+      QueryResultsCache.initialize(conf);
+    } catch (Exception err) {
+      throw new IllegalStateException(err);
+    }
+    // Explain queries must not increment the entry's reader count.
+    boolean countAsReader = ctx.getExplainConfig() == null;
+    QueryResultsCache.CacheEntry cacheEntry =
+        QueryResultsCache.getInstance().lookup(lookupInfo, countAsReader);
+    if (cacheEntry == null) {
+      return false;
+    }
+    // Serve the query from the cache rather than executing it in full.
+    useCachedResult(cacheEntry);
+    return true;
+  }
+
private static final class ColsAndTypes {
public ColsAndTypes(String cols, String colTypes) {
this.cols = cols;
@@ -13921,4 +14181,17 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
public String cols;
public String colTypes;
}
+
+  /**
+   * @return the recorded reason this query is not a valid query materialization, or
+   *     {@code null} if none was recorded
+   */
+  public String getInvalidQueryMaterializationReason() {
+    return this.invalidQueryMaterializationReason;
+  }
+
+  /**
+   * Records why this query is not a valid query materialization. Passing {@code null} clears
+   * the reason, so {@link #isValidQueryMaterialization()} returns {@code true} again.
+   */
+  public void setInvalidQueryMaterializationReason(String invalidQueryMaterializationReason) {
+    this.invalidQueryMaterializationReason = invalidQueryMaterializationReason;
+  }
+
+  /** @return {@code true} when no invalid-materialization reason has been recorded */
+  public boolean isValidQueryMaterialization() {
+    return this.invalidQueryMaterializationReason == null;
+  }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/1733a371/ql/src/java/org/apache/hadoop/hive/ql/plan/FetchWork.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/FetchWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/FetchWork.java
index 7243dc7..1f139c8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/FetchWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/FetchWork.java
@@ -75,6 +75,11 @@ public class FetchWork implements Serializable {
*/
private boolean isUsingThriftJDBCBinarySerDe = false;
+ /**
+ * Whether this FetchWork is returning a cached query result
+ */
+ private boolean isCachedResult = false;
+
public boolean isHiveServerQuery() {
return isHiveServerQuery;
}
@@ -364,4 +369,12 @@ public class FetchWork implements Serializable {
}
return new FetchExplainVectorization(this);
}
+ @Explain(displayName = "Cached Query Result", displayOnlyOnTrue = true, explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED })
+ public boolean isCachedResult() {
+ return isCachedResult;
+ }
+
+ public void setCachedResult(boolean isCachedResult) {
+ this.isCachedResult = isCachedResult;
+ }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/1733a371/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java b/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
index 3946d4a..dfc2dfa 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
@@ -725,27 +725,7 @@ public class SessionState {
*/
 private Path createRootHDFSDir(HiveConf conf) throws IOException {
 Path rootHDFSDirPath = new Path(HiveConf.getVar(conf, HiveConf.ConfVars.SCRATCHDIR));
- FsPermission writableHDFSDirPermission = new FsPermission((short)00733);
- FileSystem fs = rootHDFSDirPath.getFileSystem(conf);
- if (!fs.exists(rootHDFSDirPath)) {
- Utilities.createDirsWithPermission(conf, rootHDFSDirPath, writableHDFSDirPermission, true);
- }
- FsPermission currentHDFSDirPermission = fs.getFileStatus(rootHDFSDirPath).getPermission();
- if (rootHDFSDirPath != null && rootHDFSDirPath.toUri() != null) {
- String schema = rootHDFSDirPath.toUri().getScheme();
- LOG.debug(
- "HDFS root scratch dir: " + rootHDFSDirPath + " with schema " + schema + ", permission: " +
- currentHDFSDirPermission);
- } else {
- LOG.debug(
- "HDFS root scratch dir: " + rootHDFSDirPath + ", permission: " + currentHDFSDirPermission);
- }
- // If the root HDFS scratch dir already exists, make sure it is writeable.
- if (!((currentHDFSDirPermission.toShort() & writableHDFSDirPermission
- .toShort()) == writableHDFSDirPermission.toShort())) {
- throw new RuntimeException("The root scratch dir: " + rootHDFSDirPath
- + " on HDFS should be writable. Current permissions are: " + currentHDFSDirPermission);
- }
+    // The inline create-if-missing / writability check removed above is delegated to a shared
+    // helper. NOTE(review): its body is not shown here; confirm it still creates the dir with
+    // 0733 when missing and throws when the existing dir is not writable. The debug logging of
+    // the dir's scheme and permission is no longer emitted from this method.
+ Utilities.ensurePathIsWritable(rootHDFSDirPath, conf);
 return rootHDFSDirPath;
 }
http://git-wip-us.apache.org/repos/asf/hive/blob/1733a371/ql/src/test/queries/clientpositive/results_cache_1.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/results_cache_1.q b/ql/src/test/queries/clientpositive/results_cache_1.q
new file mode 100644
index 0000000..4aea60e
--- /dev/null
+++ b/ql/src/test/queries/clientpositive/results_cache_1.q
@@ -0,0 +1,96 @@
+-- Query results cache tests. Each case first runs a query (populating the cache when the
+-- query is eligible), then uses EXPLAIN and/or re-execution to show whether the cached
+-- result is used.
+
+set hive.query.results.cache.enabled=true;
+
+explain
+select count(*) from src a join src b on (a.key = b.key);
+select count(*) from src a join src b on (a.key = b.key);
+
+set test.comment="Cache should be used for this query";
+set test.comment;
+explain
+select count(*) from src a join src b on (a.key = b.key);
+select count(*) from src a join src b on (a.key = b.key);
+
+set hive.query.results.cache.enabled=false;
+set test.comment="Cache is disabled, should not be used here.";
+set test.comment;
+explain
+select count(*) from src a join src b on (a.key = b.key);
+
+create database db1;
+use db1;
+create table src as select key, value from default.src;
+
+-- While db1 is the current database, an unqualified "src" resolves to db1.src, not default.src.
+set hive.query.results.cache.enabled=true;
+set test.comment="Same query string, but different current database. Cache should not be used since unqualified tablenames resolve to different tables";
+set test.comment;
+explain
+select count(*) from src a join src b on (a.key = b.key);
+
+use default;
+
+-- Union
+select * from src where key = 0
+union all
+select * from src where key = 2;
+
+set test.comment="Union all. Cache should be used now";
+set test.comment;
+explain
+select * from src where key = 0
+union all
+select * from src where key = 2;
+
+select * from src where key = 0
+union all
+select * from src where key = 2;
+
+
+-- CTE
+with q1 as ( select distinct key from q2 ),
+q2 as ( select key, value from src where key < 10 )
+select * from q1 a, q1 b where a.key = b.key;
+
+set test.comment="CTE. Cache should be used now";
+set test.comment;
+explain
+with q1 as ( select distinct key from q2 ),
+q2 as ( select key, value from src where key < 10 )
+select * from q1 a, q1 b where a.key = b.key;
+
+with q1 as ( select distinct key from q2 ),
+q2 as ( select key, value from src where key < 10 )
+select * from q1 a, q1 b where a.key = b.key;
+
+-- Intersect/Except
+with q1 as ( select distinct key, value from src ),
+q2 as ( select key, value from src where key < 10 ),
+q3 as ( select key, value from src where key = 0 )
+select * from q1 intersect all select * from q2 except all select * from q3;
+
+set test.comment="Intersect/Except. Cache should be used now";
+set test.comment;
+explain
+with q1 as ( select distinct key, value from src ),
+q2 as ( select key, value from src where key < 10 ),
+q3 as ( select key, value from src where key = 0 )
+select * from q1 intersect all select * from q2 except all select * from q3;
+
+with q1 as ( select distinct key, value from src ),
+q2 as ( select key, value from src where key < 10 ),
+q3 as ( select key, value from src where key = 0 )
+select * from q1 intersect all select * from q2 except all select * from q3;
+
+-- Semijoin. Use settings from cbo_semijoin
+set hive.mapred.mode=nonstrict;
+set hive.exec.check.crossproducts=false;
+set hive.stats.fetch.column.stats=true;
+set hive.auto.convert.join=false;
+
+select a, c, count(*) from (select key as a, c_int+1 as b, sum(c_int) as c from cbo_t1 where (cbo_t1.c_int + 1 >= 0) and (cbo_t1.c_int > 0 or cbo_t1.c_float >= 0) group by c_float, cbo_t1.c_int, key having cbo_t1.c_float > 0 and (c_int >=1 or c_float >= 1) and (c_int + c_float) >= 0 order by a+b desc, c asc limit 5) cbo_t1 left semi join (select key as p, c_int+1 as q, sum(c_int) as r from cbo_t2 where (cbo_t2.c_int + 1 >= 0) and (cbo_t2.c_int > 0 or cbo_t2.c_float >= 0) group by c_float, cbo_t2.c_int, key having cbo_t2.c_float > 0 and (c_int >=1 or c_float >= 1) and (c_int + c_float) >= 0 order by q+r/10 desc, p limit 5) cbo_t2 on cbo_t1.a=p left semi join cbo_t3 on cbo_t1.a=key where (b + 1 >= 0) and (b > 0 or a >= 0) group by a, c having a > 0 and (a >=1 or c >= 1) and (a + c) >= 0 order by c, a;
+
+set test.comment="Semijoin. Cache should be used now";
+set test.comment;
+explain
+select a, c, count(*) from (select key as a, c_int+1 as b, sum(c_int) as c from cbo_t1 where (cbo_t1.c_int + 1 >= 0) and (cbo_t1.c_int > 0 or cbo_t1.c_float >= 0) group by c_float, cbo_t1.c_int, key having cbo_t1.c_float > 0 and (c_int >=1 or c_float >= 1) and (c_int + c_float) >= 0 order by a+b desc, c asc limit 5) cbo_t1 left semi join (select key as p, c_int+1 as q, sum(c_int) as r from cbo_t2 where (cbo_t2.c_int + 1 >= 0) and (cbo_t2.c_int > 0 or cbo_t2.c_float >= 0) group by c_float, cbo_t2.c_int, key having cbo_t2.c_float > 0 and (c_int >=1 or c_float >= 1) and (c_int + c_float) >= 0 order by q+r/10 desc, p limit 5) cbo_t2 on cbo_t1.a=p left semi join cbo_t3 on cbo_t1.a=key where (b + 1 >= 0) and (b > 0 or a >= 0) group by a, c having a > 0 and (a >=1 or c >= 1) and (a + c) >= 0 order by c, a;
+select a, c, count(*) from (select key as a, c_int+1 as b, sum(c_int) as c from cbo_t1 where (cbo_t1.c_int + 1 >= 0) and (cbo_t1.c_int > 0 or cbo_t1.c_float >= 0) group by c_float, cbo_t1.c_int, key having cbo_t1.c_float > 0 and (c_int >=1 or c_float >= 1) and (c_int + c_float) >= 0 order by a+b desc, c asc limit 5) cbo_t1 left semi join (select key as p, c_int+1 as q, sum(c_int) as r from cbo_t2 where (cbo_t2.c_int + 1 >= 0) and (cbo_t2.c_int > 0 or cbo_t2.c_float >= 0) group by c_float, cbo_t2.c_int, key having cbo_t2.c_float > 0 and (c_int >=1 or c_float >= 1) and (c_int + c_float) >= 0 order by q+r/10 desc, p limit 5) cbo_t2 on cbo_t1.a=p left semi join cbo_t3 on cbo_t1.a=key where (b + 1 >= 0) and (b > 0 or a >= 0) group by a, c having a > 0 and (a >=1 or c >= 1) and (a + c) >= 0 order by c, a;
http://git-wip-us.apache.org/repos/asf/hive/blob/1733a371/ql/src/test/queries/clientpositive/results_cache_2.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/results_cache_2.q b/ql/src/test/queries/clientpositive/results_cache_2.q
new file mode 100644
index 0000000..96a9092
--- /dev/null
+++ b/ql/src/test/queries/clientpositive/results_cache_2.q
@@ -0,0 +1,41 @@
+-- Results cache eligibility: queries that only need a fetch task, and queries using
+-- non-deterministic functions, must not use the cache; deterministic queries may.
+
+set hive.query.results.cache.enabled=true;
+-- Allows the simple filter query in Test 1 to run as a fetch task only.
+set hive.fetch.task.conversion=more;
+
+-- Test 1: fetch task
+explain
+select key, value from src where key=0;
+select key, value from src where key=0;
+
+set test.comment=Query only requires fetch task - should not use results cache;
+set test.comment;
+explain
+select key, value from src where key=0;
+
+
+-- Test 2: deterministic function should use cache.
+select c1, count(*)
+from (select sign(value) c1, value from src where key < 10) q
+group by c1;
+
+set test.comment=This query should use the cache;
+set test.comment;
+explain
+select c1, count(*)
+from (select sign(value) c1, value from src where key < 10) q
+group by c1;
+
+-- Test 3: non-deterministic functions should not be cached
+-- Set current timestamp config to get repeatable result.
+set hive.test.currenttimestamp=2012-01-01 01:02:03;
+
+select c1, count(*)
+from (select current_timestamp c1, value from src where key < 10) q
+group by c1;
+
+set test.comment=Queries using non-deterministic functions should not use results cache;
+set test.comment;
+explain
+select c1, count(*)
+from (select current_timestamp c1, value from src where key < 10) q
+group by c1;