Posted to commits@hive.apache.org by se...@apache.org on 2015/07/16 00:40:02 UTC
hive git commit: HIVE-10535 : LLAP: Cleanup map join cache when a query completes (Sergey Shelukhin, reviewed by Siddharth Seth)
Repository: hive
Updated Branches:
refs/heads/llap 08aabedca -> 53b0cb750
HIVE-10535 : LLAP: Cleanup map join cache when a query completes (Sergey Shelukhin, reviewed by Siddharth Seth)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/53b0cb75
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/53b0cb75
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/53b0cb75
Branch: refs/heads/llap
Commit: 53b0cb7505887474aff954a62ae93ca8e90b6f63
Parents: 08aabed
Author: Sergey Shelukhin <se...@apache.org>
Authored: Wed Jul 15 15:39:50 2015 -0700
Committer: Sergey Shelukhin <se...@apache.org>
Committed: Wed Jul 15 15:39:50 2015 -0700
----------------------------------------------------------------------
.../llap/daemon/impl/ContainerRunnerImpl.java | 22 ++++++--
.../hive/llap/daemon/impl/QueryTracker.java | 18 +++++--
.../llap/shufflehandler/ShuffleHandler.java | 1 +
.../hadoop/hive/ql/exec/MapJoinOperator.java | 6 +--
.../hadoop/hive/ql/exec/ObjectCacheFactory.java | 46 +++++++++++++---
.../hadoop/hive/ql/exec/ObjectCacheWrapper.java | 57 ++++++++++++++++++++
.../hive/ql/exec/tez/LlapObjectCache.java | 5 +-
.../hive/ql/exec/tez/MapRecordProcessor.java | 9 ++--
.../ql/exec/tez/MergeFileRecordProcessor.java | 6 +--
.../hive/ql/exec/tez/RecordProcessor.java | 2 +-
.../hive/ql/exec/tez/ReduceRecordProcessor.java | 6 +--
.../hadoop/hive/ql/exec/tez/TezProcessor.java | 17 +++++-
.../apache/hadoop/hive/shims/Hadoop23Shims.java | 2 +-
13 files changed, 163 insertions(+), 34 deletions(-)
----------------------------------------------------------------------
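In short: LLAP daemons are long-lived and serve many queries from a single JVM, so map join hash tables held in the object cache would otherwise outlive the query that built them. The patch keys the LLAP object cache by query id and evicts the whole per-query cache once the QueryTracker learns the query has completed. A minimal sketch of that registry pattern, with illustrative names (the real code is in ObjectCacheFactory below):

    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;

    // Illustrative sketch only; not the patch's code.
    final class PerQueryCacheRegistry<V> {
      private final ConcurrentMap<String, ConcurrentMap<String, V>> caches =
          new ConcurrentHashMap<>();

      // Get-or-create: safe even when fragment start and dag start
      // arrive out of order, which LLAP cannot rule out.
      ConcurrentMap<String, V> forQuery(String queryId) {
        ConcurrentMap<String, V> cache = caches.get(queryId);
        if (cache != null) return cache;
        ConcurrentMap<String, V> fresh = new ConcurrentHashMap<>();
        ConcurrentMap<String, V> prior = caches.putIfAbsent(queryId, fresh);
        return (prior != null) ? prior : fresh;
      }

      // Called on query completion; drops everything cached for the query.
      void remove(String queryId) {
        caches.remove(queryId);
      }
    }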
http://git-wip-us.apache.org/repos/asf/hive/blob/53b0cb75/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
index cba057c..710c593 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
@@ -26,6 +26,7 @@ import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.llap.configuration.LlapConfiguration;
import org.apache.hadoop.hive.llap.daemon.ContainerRunner;
import org.apache.hadoop.hive.llap.daemon.FragmentCompletionHandler;
@@ -42,6 +43,7 @@ import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWor
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.TerminateFragmentRequestProto;
import org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorMetrics;
import org.apache.hadoop.hive.llap.shufflehandler.ShuffleHandler;
+import org.apache.hadoop.hive.ql.exec.tez.TezProcessor;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.token.Token;
@@ -185,14 +187,13 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu
Token<JobTokenIdentifier> jobToken = TokenCache.getSessionToken(credentials);
- // TODO Unregistering does not happen at the moment, since there's no signals on when an app completes.
LOG.info("DEBUG: Registering request with the ShuffleHandler");
ShuffleHandler.get()
.registerDag(request.getApplicationIdString(), dagIdentifier, jobToken,
request.getUser(), localDirs);
TaskRunnerCallable callable = new TaskRunnerCallable(request, fragmentInfo, new Configuration(getConfig()),
- new ExecutionContextImpl(localAddress.get().getHostName()), env,
+ new LlapExecutionContext(localAddress.get().getHostName(), queryTracker), env,
credentials, memoryPerExecutor, amReporter, confParams, metrics, killedTaskHandler,
this);
try {
@@ -209,6 +210,21 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu
}
}
+ private static class LlapExecutionContext extends ExecutionContextImpl
+ implements TezProcessor.Hook {
+ private final QueryTracker queryTracker;
+ public LlapExecutionContext(String hostname, QueryTracker queryTracker) {
+ super(hostname);
+ this.queryTracker = queryTracker;
+ }
+
+ @Override
+ public void initializeHook(TezProcessor source) {
+ queryTracker.registerDagQueryId(source.getContext().getDAGName(),
+ HiveConf.getVar(source.getConf(), HiveConf.ConfVars.HIVEQUERYID));
+ }
+ }
+
@Override
public void sourceStateUpdated(SourceStateUpdatedRequestProto request) {
LOG.info("Processing state update: " + stringifySourceStateUpdateRequest(request));
@@ -307,7 +323,7 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu
public void queryFailed(String queryId, String dagName) {
LOG.info("Processing query failed notification for {}", dagName);
List<QueryFragmentInfo> knownFragments =
- queryTracker.queryComplete(null, dagName, -1);
+ queryTracker.queryComplete(queryId, dagName, -1);
LOG.info("DBG: Pending fragment count for failed query {} = {}", dagName,
knownFragments.size());
for (QueryFragmentInfo fragmentInfo : knownFragments) {
http://git-wip-us.apache.org/repos/asf/hive/blob/53b0cb75/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java
index 19147e3..2db2833 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java
@@ -20,6 +20,7 @@ import org.apache.hadoop.hive.llap.configuration.LlapConfiguration;
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentSpecProto;
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateProto;
import org.apache.hadoop.hive.llap.shufflehandler.ShuffleHandler;
+import org.apache.hadoop.hive.ql.exec.ObjectCacheFactory;
import org.apache.hadoop.service.CompositeService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -69,6 +70,10 @@ public class QueryTracker extends CompositeService {
// may be processed by a thread which ends up executing before a task.
private final ConcurrentMap<String, ConcurrentMap<String, SourceStateProto>> sourceCompletionMap = new ConcurrentHashMap<>();
+ // Tracks queryId by dagName. This can only be set when config is parsed in TezProcessor,
+ // as all the other existing code passes a null queryId everywhere.
+ private final ConcurrentHashMap<String, String> dagNameToQueryId = new ConcurrentHashMap<>();
+
public QueryTracker(Configuration conf, String[] localDirsBase) {
super("QueryTracker");
this.localDirsBase = localDirsBase;
@@ -158,7 +163,6 @@ public class QueryTracker extends CompositeService {
completedDagMap.add(dagName);
LOG.info("Processing queryComplete for dagName={} with deleteDelay={} seconds", dagName,
deleteDelay);
- completedDagMap.add(dagName);
QueryInfo queryInfo = queryInfoMap.remove(dagName);
if (queryInfo == null) {
LOG.warn("Ignoring query complete for unknown dag: {}", dagName);
@@ -176,10 +180,13 @@ public class QueryTracker extends CompositeService {
// and the structures are cleaned up once all tasks complete. New requests, however, should not
// be allowed after a query complete is received.
sourceCompletionMap.remove(dagName);
+ String savedQueryId = dagNameToQueryId.remove(dagName);
+ queryId = queryId == null ? savedQueryId : queryId;
dagSpecificLocks.remove(dagName);
+ if (queryId != null) {
+ ObjectCacheFactory.removeLlapQueryCache(queryId);
+ }
return queryInfo.getRegisteredFragments();
- // TODO HIVE-10762 Issue a kill message to all running fragments for this container.
- // TODO HIVE-10535 Cleanup map join cache
} finally {
dagLock.writeLock().unlock();
}
@@ -227,4 +234,9 @@ public class QueryTracker extends CompositeService {
}
return dagMap;
}
+
+ public void registerDagQueryId(String dagName, String queryId) {
+ if (queryId == null) return;
+ dagNameToQueryId.putIfAbsent(dagName, queryId);
+ }
}
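Note the putIfAbsent in registerDagQueryId: many fragments of the same dag may race to register, and only the first mapping should win, atomically. A standalone demo of that first-writer-wins behavior (the main driver is hypothetical; in the patch, registration happens from the TezProcessor hook):

    import java.util.concurrent.ConcurrentHashMap;

    public class RegisterDemo {
      private static final ConcurrentHashMap<String, String> dagNameToQueryId =
          new ConcurrentHashMap<>();

      static void registerDagQueryId(String dagName, String queryId) {
        if (queryId == null) return; // config may not carry a query id
        dagNameToQueryId.putIfAbsent(dagName, queryId);
      }

      public static void main(String[] args) {
        registerDagQueryId("dag_1", "query_abc");
        registerDagQueryId("dag_1", "query_xyz"); // ignored: first writer wins
        System.out.println(dagNameToQueryId.get("dag_1")); // prints query_abc
      }
    }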
http://git-wip-us.apache.org/repos/asf/hive/blob/53b0cb75/llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/ShuffleHandler.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/ShuffleHandler.java b/llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/ShuffleHandler.java
index 741c7f2..099cf26 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/ShuffleHandler.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/ShuffleHandler.java
@@ -263,6 +263,7 @@ public class ShuffleHandler implements AttemptRegistrationListener {
maxShuffleThreads = 2 * Runtime.getRuntime().availableProcessors();
}
+ // TODO: this is never used
localDirs = conf.getTrimmedStrings(SHUFFLE_HANDLER_LOCAL_DIRS);
shuffleBufferSize = conf.getInt(SHUFFLE_BUFFER_SIZE,
http://git-wip-us.apache.org/repos/asf/hive/blob/53b0cb75/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
index a77b37a..31c5723 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
@@ -132,10 +132,10 @@ public class MapJoinOperator extends AbstractMapJoinOperator<MapJoinDesc> implem
// On Tez only: The hash map might already be cached in the container we run
// the task in. On MR: The cache is a no-op.
- cacheKey = HiveConf.getVar(hconf, HiveConf.ConfVars.HIVEQUERYID)
- + "__HASH_MAP_"+this.getOperatorId()+"_container";
+ String queryId = HiveConf.getVar(hconf, HiveConf.ConfVars.HIVEQUERYID);
+ cacheKey = "HASH_MAP_" + this.getOperatorId() + "_container";
- cache = ObjectCacheFactory.getCache(hconf);
+ cache = ObjectCacheFactory.getCache(hconf, queryId);
loader = getHashTableLoader(hconf);
hashMapRowGetters = null;
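The key change above repeats in every record processor below: cache keys used to be namespaced by hand with the query id; after this patch, isolation comes from the cache itself (per-query in LLAP, or via the new ObjectCacheWrapper elsewhere). An illustrative before/after with a made-up operator id:

    // Illustrative only: how the cache key shape changes in this patch.
    public class CacheKeySketch {
      public static void main(String[] args) {
        String queryId = "hive_20150715_example"; // hypothetical query id
        String opId = "MAPJOIN_7";                // hypothetical operator id
        String before = queryId + "__HASH_MAP_" + opId + "_container";
        String after = "HASH_MAP_" + opId + "_container";
        System.out.println(before); // hive_20150715_example__HASH_MAP_MAPJOIN_7_container
        System.out.println(after);  // HASH_MAP_MAPJOIN_7_container
      }
    }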
http://git-wip-us.apache.org/repos/asf/hive/blob/53b0cb75/ql/src/java/org/apache/hadoop/hive/ql/exec/ObjectCacheFactory.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ObjectCacheFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ObjectCacheFactory.java
index 5d48651..dcf16f9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ObjectCacheFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ObjectCacheFactory.java
@@ -18,6 +18,10 @@
package org.apache.hadoop.hive.ql.exec;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.llap.io.api.LlapIoProxy;
@@ -29,6 +33,9 @@ import org.apache.hadoop.hive.ql.exec.tez.LlapObjectCache;
* the hive conf.
*/
public class ObjectCacheFactory {
+ private static final ConcurrentHashMap<String, ObjectCache> llapQueryCaches =
+ new ConcurrentHashMap<>();
+ private static final Log LOG = LogFactory.getLog(ObjectCacheFactory.class);
private ObjectCacheFactory() {
// avoid instantiation
@@ -37,19 +44,42 @@ public class ObjectCacheFactory {
/**
* Returns the appropriate cache
*/
- public static ObjectCache getCache(Configuration conf) {
+ public static ObjectCache getCache(Configuration conf, String queryId) {
if (HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("tez")) {
if (LlapIoProxy.isDaemon()) { // daemon
- if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_OBJECT_CACHE_ENABLED)) {
- return new org.apache.hadoop.hive.ql.exec.tez.LlapObjectCache();
- } else { // no cache
- return new org.apache.hadoop.hive.ql.exec.mr.ObjectCache();
- }
+ if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_OBJECT_CACHE_ENABLED)) {
+ return getLlapObjectCache(queryId);
+ } else { // no cache
+ return new ObjectCacheWrapper(
+ new org.apache.hadoop.hive.ql.exec.mr.ObjectCache(), queryId);
+ }
} else { // container
- return new org.apache.hadoop.hive.ql.exec.tez.ObjectCache();
+ return new ObjectCacheWrapper(
+ new org.apache.hadoop.hive.ql.exec.tez.ObjectCache(), queryId);
}
} else { // mr or spark
- return new org.apache.hadoop.hive.ql.exec.mr.ObjectCache();
+ return new ObjectCacheWrapper(
+ new org.apache.hadoop.hive.ql.exec.mr.ObjectCache(), queryId);
+ }
+ }
+
+ private static ObjectCache getLlapObjectCache(String queryId) {
+ // If order of events (i.e. dagstart and fragmentstart) was guaranteed, we could just
+ // create the cache when dag starts, and blindly return it to execution here.
+ ObjectCache result = llapQueryCaches.get(queryId);
+ if (result != null) return result;
+ result = new LlapObjectCache();
+ ObjectCache old = llapQueryCaches.putIfAbsent(queryId, result);
+ if (old == null && LOG.isDebugEnabled()) {
+ LOG.debug("Created object cache for " + queryId);
+ }
+ return (old != null) ? old : result;
+ }
+
+ public static void removeLlapQueryCache(String queryId) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Removing object cache for " + queryId);
}
+ llapQueryCaches.remove(queryId);
}
}
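From a caller's point of view, the new two-argument getCache looks as below. The caller class and key are assumptions; the getCache and retrieve signatures match the diff:

    import java.util.concurrent.Callable;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.ql.exec.ObjectCache;
    import org.apache.hadoop.hive.ql.exec.ObjectCacheFactory;
    import org.apache.hadoop.hive.ql.metadata.HiveException;

    // Hypothetical caller; in LLAP, the query id selects (or lazily
    // creates) the per-query cache that queryComplete later evicts.
    public class CacheUsageSketch {
      Object loadOrReuse(Configuration conf) throws HiveException {
        String queryId = HiveConf.getVar(conf, HiveConf.ConfVars.HIVEQUERYID);
        ObjectCache cache = ObjectCacheFactory.getCache(conf, queryId);
        // No queryId prefix in the key; isolation comes from the cache.
        return cache.retrieve("HASH_MAP_OP7_container", new Callable<Object>() {
          @Override
          public Object call() {
            return new Object(); // stand-in for building the hash table
          }
        });
      }
    }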
http://git-wip-us.apache.org/repos/asf/hive/blob/53b0cb75/ql/src/java/org/apache/hadoop/hive/ql/exec/ObjectCacheWrapper.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ObjectCacheWrapper.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ObjectCacheWrapper.java
new file mode 100644
index 0000000..9768efa
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ObjectCacheWrapper.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.Future;
+
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+
+public class ObjectCacheWrapper implements ObjectCache {
+ private final String queryId;
+ private final ObjectCache globalCache;
+ public ObjectCacheWrapper(ObjectCache globalCache, String queryId) {
+ this.queryId = queryId;
+ this.globalCache = globalCache;
+ }
+
+ @Override
+ public void release(String key) {
+ globalCache.release(makeKey(key));
+ }
+
+ @Override
+ public <T> T retrieve(String key, Callable<T> fn) throws HiveException {
+ return globalCache.retrieve(makeKey(key), fn);
+ }
+
+ @Override
+ public <T> Future<T> retrieveAsync(String key, Callable<T> fn)
+ throws HiveException {
+ return globalCache.retrieveAsync(makeKey(key), fn);
+ }
+
+ @Override
+ public void remove(String key) {
+ globalCache.remove(makeKey(key));
+ }
+
+ private String makeKey(String key) {
+ return queryId + "_" + key;
+ }
+}
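The wrapper simply prefixes every key with the query id, so queries sharing one process-wide cache cannot collide on identical logical keys. A brief usage sketch (illustrative; the no-op MR cache stands in here because it can be constructed outside a task):

    import org.apache.hadoop.hive.ql.exec.ObjectCache;
    import org.apache.hadoop.hive.ql.exec.ObjectCacheWrapper;

    public class WrapperSketch {
      public static void main(String[] args) {
        ObjectCache shared = new org.apache.hadoop.hive.ql.exec.mr.ObjectCache();
        ObjectCache q1 = new ObjectCacheWrapper(shared, "query_1");
        ObjectCache q2 = new ObjectCacheWrapper(shared, "query_2");
        // q1 addresses "query_1_PLAN", q2 addresses "query_2_PLAN";
        // the same logical key "PLAN" never collides across queries.
        q1.release("PLAN");
        q2.release("PLAN");
      }
    }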
http://git-wip-us.apache.org/repos/asf/hive/blob/53b0cb75/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/LlapObjectCache.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/LlapObjectCache.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/LlapObjectCache.java
index b4a3236..00f3c54 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/LlapObjectCache.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/LlapObjectCache.java
@@ -52,17 +52,14 @@ public class LlapObjectCache implements org.apache.hadoop.hive.ql.exec.ObjectCac
private static ExecutorService staticPool = Executors.newCachedThreadPool();
- private static final boolean isLogDebugEnabled = LOG.isDebugEnabled();
private static final boolean isLogInfoEnabled = LOG.isInfoEnabled();
- public LlapObjectCache() {
- }
-
@Override
public void release(String key) {
// nothing to do, soft references will clean themselves up
}
+ @SuppressWarnings("unchecked")
@Override
public <T> T retrieve(String key, Callable<T> fn) throws HiveException {
http://git-wip-us.apache.org/repos/asf/hive/blob/53b0cb75/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
index 9ce8b8c..c758000 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
@@ -94,10 +94,11 @@ public class MapRecordProcessor extends RecordProcessor {
public MapRecordProcessor(final JobConf jconf, final ProcessorContext context) throws Exception {
super(jconf, context);
+ String queryId = HiveConf.getVar(jconf, HiveConf.ConfVars.HIVEQUERYID);
if (LlapIoProxy.isDaemon()) { // do not cache plan
cache = new org.apache.hadoop.hive.ql.exec.mr.ObjectCache();
} else {
- cache = ObjectCacheFactory.getCache(jconf);
+ cache = ObjectCacheFactory.getCache(jconf, queryId);
}
execContext = new ExecMapperContext(jconf);
execContext.setJc(jconf);
@@ -111,8 +112,8 @@ public class MapRecordProcessor extends RecordProcessor {
perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_INIT_OPERATORS);
super.init(mrReporter, inputs, outputs);
- String queryId = HiveConf.getVar(jconf, HiveConf.ConfVars.HIVEQUERYID);
- String key = queryId + processorContext.getTaskVertexName() + MAP_PLAN_KEY;
+
+ String key = processorContext.getTaskVertexName() + MAP_PLAN_KEY;
cacheKeys.add(key);
// create map and fetch operators
@@ -133,7 +134,7 @@ public class MapRecordProcessor extends RecordProcessor {
continue;
}
- key = queryId + processorContext.getTaskVertexName() + prefix;
+ key = processorContext.getTaskVertexName() + prefix;
cacheKeys.add(key);
mergeWorkList.add(
http://git-wip-us.apache.org/repos/asf/hive/blob/53b0cb75/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MergeFileRecordProcessor.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MergeFileRecordProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MergeFileRecordProcessor.java
index 7c0eb89..f5d34db 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MergeFileRecordProcessor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MergeFileRecordProcessor.java
@@ -92,14 +92,14 @@ public class MergeFileRecordProcessor extends RecordProcessor {
.initialize();
}
+ String queryId = HiveConf.getVar(jconf, HiveConf.ConfVars.HIVEQUERYID);
org.apache.hadoop.hive.ql.exec.ObjectCache cache = ObjectCacheFactory
- .getCache(jconf);
+ .getCache(jconf, queryId);
try {
execContext.setJc(jconf);
- String queryId = HiveConf.getVar(jconf, HiveConf.ConfVars.HIVEQUERYID);
- cacheKey = queryId + MAP_PLAN_KEY;
+ cacheKey = MAP_PLAN_KEY;
MapWork mapWork = (MapWork) cache.retrieve(cacheKey, new Callable<Object>() {
@Override
http://git-wip-us.apache.org/repos/asf/hive/blob/53b0cb75/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java
index 0859dc4..6182dab 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java
@@ -131,7 +131,7 @@ public abstract class RecordProcessor {
continue;
}
- key = queryId + prefix;
+ key = prefix;
cacheKeys.add(key);
mergeWorkList.add((BaseWork) cache.retrieve(key, new Callable<Object>() {
http://git-wip-us.apache.org/repos/asf/hive/blob/53b0cb75/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
index e5b2d93..77677a0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
@@ -91,14 +91,14 @@ public class ReduceRecordProcessor extends RecordProcessor{
ObjectCache cache;
+ String queryId = HiveConf.getVar(jconf, HiveConf.ConfVars.HIVEQUERYID);
if (LlapIoProxy.isDaemon()) { // don't cache plan
cache = new org.apache.hadoop.hive.ql.exec.mr.ObjectCache();
} else {
- cache = ObjectCacheFactory.getCache(jconf);
+ cache = ObjectCacheFactory.getCache(jconf, queryId);
}
- String queryId = HiveConf.getVar(jconf, HiveConf.ConfVars.HIVEQUERYID);
- String cacheKey = queryId + processorContext.getTaskVertexName() + REDUCE_PLAN_KEY;
+ String cacheKey = processorContext.getTaskVertexName() + REDUCE_PLAN_KEY;
cacheKeys = Lists.newArrayList(cacheKey);
reduceWork = (ReduceWork) cache.retrieve(cacheKey, new Callable<Object>() {
@Override
http://git-wip-us.apache.org/repos/asf/hive/blob/53b0cb75/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java
index 9baa0c1..1abcf3b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java
@@ -34,6 +34,7 @@ import org.apache.tez.common.TezUtils;
import org.apache.tez.mapreduce.processor.MRTaskReporter;
import org.apache.tez.runtime.api.AbstractLogicalIOProcessor;
import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.ExecutionContext;
import org.apache.tez.runtime.api.LogicalInput;
import org.apache.tez.runtime.api.LogicalOutput;
import org.apache.tez.runtime.api.ProcessorContext;
@@ -45,7 +46,13 @@ import org.apache.tez.runtime.library.api.KeyValueWriter;
*/
public class TezProcessor extends AbstractLogicalIOProcessor {
-
+ /**
+ * This provides the ability to pass things into TezProcessor, which is normally impossible
+ * because of how Tez APIs are structured. Piggyback on ExecutionContext.
+ */
+ public static interface Hook {
+ void initializeHook(TezProcessor source);
+ }
private static final Log LOG = LogFactory.getLog(TezProcessor.class);
protected boolean isMap = false;
@@ -92,6 +99,10 @@ public class TezProcessor extends AbstractLogicalIOProcessor {
Configuration conf = TezUtils.createConfFromUserPayload(getContext().getUserPayload());
this.jobConf = new JobConf(conf);
this.processorContext = getContext();
+ ExecutionContext execCtx = processorContext.getExecutionContext();
+ if (execCtx instanceof Hook) {
+ ((Hook)execCtx).initializeHook(this);
+ }
setupMRLegacyConfigs(processorContext);
perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TEZ_INITIALIZE_PROCESSOR);
}
@@ -217,4 +228,8 @@ public class TezProcessor extends AbstractLogicalIOProcessor {
writer.write(key, value);
}
}
+
+ public JobConf getConf() {
+ return jobConf;
+ }
}
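The Hook interface is the glue for the whole patch: Tez instantiates TezProcessor itself, so the daemon cannot hand it a QueryTracker directly. Instead the daemon supplies an ExecutionContext that also implements Hook, and initialize() probes for it with instanceof (see LlapExecutionContext in ContainerRunnerImpl above). A minimal host-side sketch, assuming only what the diff shows:

    import org.apache.hadoop.hive.ql.exec.tez.TezProcessor;
    import org.apache.tez.runtime.api.impl.ExecutionContextImpl;

    // Host-side half of the piggyback: any ExecutionContext that also
    // implements TezProcessor.Hook receives a callback once the processor
    // has parsed its configuration (and so knows the real query id).
    class HookedContext extends ExecutionContextImpl implements TezProcessor.Hook {
      HookedContext(String hostname) {
        super(hostname);
      }

      @Override
      public void initializeHook(TezProcessor source) {
        // source.getConf() is populated here; e.g. read hive.query.id
        // and register it with whatever needs per-query bookkeeping.
      }
    }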
http://git-wip-us.apache.org/repos/asf/hive/blob/53b0cb75/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
----------------------------------------------------------------------
diff --git a/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java b/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
index cc2ee56..21dde51 100644
--- a/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
+++ b/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
@@ -708,7 +708,7 @@ public class Hadoop23Shims extends HadoopShimsSecure {
if (!filter.accept(filterPath)) continue;
}
LocatedFileStatus lfs = next.makeQualifiedLocated(fsUri, p);
- result.add(new HdfsFileStatusWithIdImpl(lfs, next.getFileId())); // TODO#: here
+ result.add(new HdfsFileStatusWithIdImpl(lfs, next.getFileId()));
}
current = current.hasMore() ? dfsc.listPaths(src, current.getLastName(), true) : null;
}