You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by se...@apache.org on 2015/07/16 00:40:02 UTC

hive git commit: HIVE-10535 : LLAP: Cleanup map join cache when a query completes (Sergey Shelukhin, reviewed by Siddharth Seth)

Repository: hive
Updated Branches:
  refs/heads/llap 08aabedca -> 53b0cb750


HIVE-10535 : LLAP: Cleanup map join cache when a query completes (Sergey Shelukhin, reviewed by Siddharth Seth)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/53b0cb75
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/53b0cb75
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/53b0cb75

Branch: refs/heads/llap
Commit: 53b0cb7505887474aff954a62ae93ca8e90b6f63
Parents: 08aabed
Author: Sergey Shelukhin <se...@apache.org>
Authored: Wed Jul 15 15:39:50 2015 -0700
Committer: Sergey Shelukhin <se...@apache.org>
Committed: Wed Jul 15 15:39:50 2015 -0700

----------------------------------------------------------------------
 .../llap/daemon/impl/ContainerRunnerImpl.java   | 22 ++++++--
 .../hive/llap/daemon/impl/QueryTracker.java     | 18 +++++--
 .../llap/shufflehandler/ShuffleHandler.java     |  1 +
 .../hadoop/hive/ql/exec/MapJoinOperator.java    |  6 +--
 .../hadoop/hive/ql/exec/ObjectCacheFactory.java | 46 +++++++++++++---
 .../hadoop/hive/ql/exec/ObjectCacheWrapper.java | 57 ++++++++++++++++++++
 .../hive/ql/exec/tez/LlapObjectCache.java       |  5 +-
 .../hive/ql/exec/tez/MapRecordProcessor.java    |  9 ++--
 .../ql/exec/tez/MergeFileRecordProcessor.java   |  6 +--
 .../hive/ql/exec/tez/RecordProcessor.java       |  2 +-
 .../hive/ql/exec/tez/ReduceRecordProcessor.java |  6 +--
 .../hadoop/hive/ql/exec/tez/TezProcessor.java   | 17 +++++-
 .../apache/hadoop/hive/shims/Hadoop23Shims.java |  2 +-
 13 files changed, 163 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/53b0cb75/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
index cba057c..710c593 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
@@ -26,6 +26,7 @@ import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.llap.configuration.LlapConfiguration;
 import org.apache.hadoop.hive.llap.daemon.ContainerRunner;
 import org.apache.hadoop.hive.llap.daemon.FragmentCompletionHandler;
@@ -42,6 +43,7 @@ import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWor
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.TerminateFragmentRequestProto;
 import org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorMetrics;
 import org.apache.hadoop.hive.llap.shufflehandler.ShuffleHandler;
+import org.apache.hadoop.hive.ql.exec.tez.TezProcessor;
 import org.apache.hadoop.io.DataInputBuffer;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.token.Token;
@@ -185,14 +187,13 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu
 
       Token<JobTokenIdentifier> jobToken = TokenCache.getSessionToken(credentials);
 
-      // TODO Unregistering does not happen at the moment, since there's no signals on when an app completes.
       LOG.info("DEBUG: Registering request with the ShuffleHandler");
       ShuffleHandler.get()
           .registerDag(request.getApplicationIdString(), dagIdentifier, jobToken,
               request.getUser(), localDirs);
 
       TaskRunnerCallable callable = new TaskRunnerCallable(request, fragmentInfo, new Configuration(getConfig()),
-          new ExecutionContextImpl(localAddress.get().getHostName()), env,
+          new LlapExecutionContext(localAddress.get().getHostName(), queryTracker), env,
           credentials, memoryPerExecutor, amReporter, confParams, metrics, killedTaskHandler,
           this);
       try {
@@ -209,6 +210,21 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu
     }
   }
 
+  private static class LlapExecutionContext extends ExecutionContextImpl
+      implements TezProcessor.Hook { // Hook: TezProcessor invokes initializeHook on this context
+    private final QueryTracker queryTracker; // receives the dagName -> queryId registration
+    public LlapExecutionContext(String hostname, QueryTracker queryTracker) {
+      super(hostname);
+      this.queryTracker = queryTracker;
+    }
+
+    @Override
+    public void initializeHook(TezProcessor source) { // records which query the DAG belongs to
+      queryTracker.registerDagQueryId(source.getContext().getDAGName(), // key: DAG name
+          HiveConf.getVar(source.getConf(), HiveConf.ConfVars.HIVEQUERYID));
+    }
+  }
+
   @Override
   public void sourceStateUpdated(SourceStateUpdatedRequestProto request) {
     LOG.info("Processing state update: " + stringifySourceStateUpdateRequest(request));
@@ -307,7 +323,7 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu
   public void queryFailed(String queryId, String dagName) {
     LOG.info("Processing query failed notification for {}", dagName);
     List<QueryFragmentInfo> knownFragments =
-        queryTracker.queryComplete(null, dagName, -1);
+        queryTracker.queryComplete(queryId, dagName, -1);
     LOG.info("DBG: Pending fragment count for failed query {} = {}", dagName,
         knownFragments.size());
     for (QueryFragmentInfo fragmentInfo : knownFragments) {

http://git-wip-us.apache.org/repos/asf/hive/blob/53b0cb75/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java
index 19147e3..2db2833 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java
@@ -20,6 +20,7 @@ import org.apache.hadoop.hive.llap.configuration.LlapConfiguration;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentSpecProto;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateProto;
 import org.apache.hadoop.hive.llap.shufflehandler.ShuffleHandler;
+import org.apache.hadoop.hive.ql.exec.ObjectCacheFactory;
 import org.apache.hadoop.service.CompositeService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -69,6 +70,10 @@ public class QueryTracker extends CompositeService {
   // may be processed by a thread which ends up executing before a task.
   private final ConcurrentMap<String, ConcurrentMap<String, SourceStateProto>> sourceCompletionMap = new ConcurrentHashMap<>();
 
+  // Tracks queryId by dagName. This can only be set when config is parsed in TezProcessor;
+  // all the other existing code passes queryId equal to 0 everywhere.
+  private final ConcurrentHashMap<String, String> dagNameToQueryId = new ConcurrentHashMap<>();
+
   public QueryTracker(Configuration conf, String[] localDirsBase) {
     super("QueryTracker");
     this.localDirsBase = localDirsBase;
@@ -158,7 +163,6 @@ public class QueryTracker extends CompositeService {
       completedDagMap.add(dagName);
       LOG.info("Processing queryComplete for dagName={} with deleteDelay={} seconds", dagName,
           deleteDelay);
-      completedDagMap.add(dagName);
       QueryInfo queryInfo = queryInfoMap.remove(dagName);
       if (queryInfo == null) {
         LOG.warn("Ignoring query complete for unknown dag: {}", dagName);
@@ -176,10 +180,13 @@ public class QueryTracker extends CompositeService {
       // and the structures are cleaned up once all tasks complete. New requests, however, should not
       // be allowed after a query complete is received.
       sourceCompletionMap.remove(dagName);
+      String savedQueryId = dagNameToQueryId.remove(dagName);
+      queryId = queryId == null ? savedQueryId : queryId;
       dagSpecificLocks.remove(dagName);
+      if (queryId != null) {
+        ObjectCacheFactory.removeLlapQueryCache(queryId);
+      }
       return queryInfo.getRegisteredFragments();
-      // TODO HIVE-10762 Issue a kill message to all running fragments for this container.
-      // TODO HIVE-10535 Cleanup map join cache
     } finally {
       dagLock.writeLock().unlock();
     }
@@ -227,4 +234,9 @@ public class QueryTracker extends CompositeService {
     }
     return dagMap;
   }
+
+  public void registerDagQueryId(String dagName, String queryId) {
+    if (queryId == null) return; // nothing to record
+    dagNameToQueryId.putIfAbsent(dagName, queryId); // first registration for a dag wins
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/53b0cb75/llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/ShuffleHandler.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/ShuffleHandler.java b/llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/ShuffleHandler.java
index 741c7f2..099cf26 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/ShuffleHandler.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/ShuffleHandler.java
@@ -263,6 +263,7 @@ public class ShuffleHandler implements AttemptRegistrationListener {
       maxShuffleThreads = 2 * Runtime.getRuntime().availableProcessors();
     }
 
+    // TODO: this is never used
     localDirs = conf.getTrimmedStrings(SHUFFLE_HANDLER_LOCAL_DIRS);
 
     shuffleBufferSize = conf.getInt(SHUFFLE_BUFFER_SIZE,

http://git-wip-us.apache.org/repos/asf/hive/blob/53b0cb75/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
index a77b37a..31c5723 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
@@ -132,10 +132,10 @@ public class MapJoinOperator extends AbstractMapJoinOperator<MapJoinDesc> implem
 
     // On Tez only: The hash map might already be cached in the container we run
     // the task in. On MR: The cache is a no-op.
-    cacheKey = HiveConf.getVar(hconf, HiveConf.ConfVars.HIVEQUERYID)
-      + "__HASH_MAP_"+this.getOperatorId()+"_container";
+    String queryId = HiveConf.getVar(hconf, HiveConf.ConfVars.HIVEQUERYID);
+    cacheKey = "HASH_MAP_" + this.getOperatorId() + "_container";
 
-    cache = ObjectCacheFactory.getCache(hconf);
+    cache = ObjectCacheFactory.getCache(hconf, queryId);
     loader = getHashTableLoader(hconf);
 
     hashMapRowGetters = null;

http://git-wip-us.apache.org/repos/asf/hive/blob/53b0cb75/ql/src/java/org/apache/hadoop/hive/ql/exec/ObjectCacheFactory.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ObjectCacheFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ObjectCacheFactory.java
index 5d48651..dcf16f9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ObjectCacheFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ObjectCacheFactory.java
@@ -18,6 +18,10 @@
 
 package org.apache.hadoop.hive.ql.exec;
 
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.llap.io.api.LlapIoProxy;
@@ -29,6 +33,9 @@ import org.apache.hadoop.hive.ql.exec.tez.LlapObjectCache;
  * the hive conf.
  */
 public class ObjectCacheFactory {
+  private static final ConcurrentHashMap<String, ObjectCache> llapQueryCaches =
+      new ConcurrentHashMap<>();
+  private static final Log LOG = LogFactory.getLog(ObjectCacheFactory.class);
 
   private ObjectCacheFactory() {
     // avoid instantiation
@@ -37,19 +44,42 @@ public class ObjectCacheFactory {
   /**
    * Returns the appropriate cache
    */
-  public static ObjectCache getCache(Configuration conf) {
+  public static ObjectCache getCache(Configuration conf, String queryId) {
     if (HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("tez")) {
       if (LlapIoProxy.isDaemon()) { // daemon
-	if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_OBJECT_CACHE_ENABLED)) {
-	  return new org.apache.hadoop.hive.ql.exec.tez.LlapObjectCache();
-	} else { // no cache
-	  return new org.apache.hadoop.hive.ql.exec.mr.ObjectCache();
-	}
+        if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_OBJECT_CACHE_ENABLED)) {
+          return getLlapObjectCache(queryId);
+        } else { // no cache
+          return new ObjectCacheWrapper(
+              new org.apache.hadoop.hive.ql.exec.mr.ObjectCache(), queryId);
+        }
       } else { // container
-	return new org.apache.hadoop.hive.ql.exec.tez.ObjectCache();
+        return new ObjectCacheWrapper(
+            new org.apache.hadoop.hive.ql.exec.tez.ObjectCache(), queryId);
       }
     } else { // mr or spark
-      return new org.apache.hadoop.hive.ql.exec.mr.ObjectCache();
+      return new ObjectCacheWrapper(
+          new  org.apache.hadoop.hive.ql.exec.mr.ObjectCache(), queryId);
+    }
+  }
+
+  private static ObjectCache getLlapObjectCache(String queryId) {
+    // If order of events (i.e. dagstart and fragmentstart) were guaranteed, we could just
+    // create the cache when dag starts, and blindly return it to execution here.
+    ObjectCache result = llapQueryCaches.get(queryId);
+    if (result != null) return result; // a cache already exists for this query
+    result = new LlapObjectCache();
+    ObjectCache old = llapQueryCaches.putIfAbsent(queryId, result); // non-null if we lost a race
+    if (old == null && LOG.isDebugEnabled()) {
+      LOG.debug("Created object cache for " + queryId);
+    }
+    return (old != null) ? old : result; // always hand out the instance stored in the map
+  }
+
+  public static void removeLlapQueryCache(String queryId) {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Removing object cache for " + queryId);
     }
+    llapQueryCaches.remove(queryId); // drops the map's reference to this query's cache, if any
   }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/53b0cb75/ql/src/java/org/apache/hadoop/hive/ql/exec/ObjectCacheWrapper.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ObjectCacheWrapper.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ObjectCacheWrapper.java
new file mode 100644
index 0000000..9768efa
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ObjectCacheWrapper.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.Future;
+
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+
+public class ObjectCacheWrapper implements ObjectCache { // namespaces cache keys by query id
+  private final String queryId; // prefix applied to every key
+  private final ObjectCache globalCache; // delegate that receives the prefixed keys
+  public ObjectCacheWrapper(ObjectCache globalCache, String queryId) {
+    this.queryId = queryId;
+    this.globalCache = globalCache;
+  }
+
+  @Override
+  public void release(String key) {
+    globalCache.release(makeKey(key));
+  }
+
+  @Override
+  public <T> T retrieve(String key, Callable<T> fn) throws HiveException {
+    return globalCache.retrieve(makeKey(key), fn);
+  }
+
+  @Override
+  public <T> Future<T> retrieveAsync(String key, Callable<T> fn)
+      throws HiveException {
+    return globalCache.retrieveAsync(makeKey(key), fn);
+  }
+
+  @Override
+  public void remove(String key) {
+    globalCache.remove(makeKey(key));
+  }
+
+  private String makeKey(String key) { // builds the key actually passed to the delegate
+    return queryId + "_" + key;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/53b0cb75/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/LlapObjectCache.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/LlapObjectCache.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/LlapObjectCache.java
index b4a3236..00f3c54 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/LlapObjectCache.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/LlapObjectCache.java
@@ -52,17 +52,14 @@ public class LlapObjectCache implements org.apache.hadoop.hive.ql.exec.ObjectCac
 
   private static ExecutorService staticPool = Executors.newCachedThreadPool();
 
-  private static final boolean isLogDebugEnabled = LOG.isDebugEnabled();
   private static final boolean isLogInfoEnabled = LOG.isInfoEnabled();
 
-  public LlapObjectCache() {
-  }
-
   @Override
   public void release(String key) {
     // nothing to do, soft references will clean themselves up
   }
 
+  @SuppressWarnings("unchecked")
   @Override
   public <T> T retrieve(String key, Callable<T> fn) throws HiveException {
 

http://git-wip-us.apache.org/repos/asf/hive/blob/53b0cb75/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
index 9ce8b8c..c758000 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
@@ -94,10 +94,11 @@ public class MapRecordProcessor extends RecordProcessor {
 
   public MapRecordProcessor(final JobConf jconf, final ProcessorContext context) throws Exception {
     super(jconf, context);
+    String queryId = HiveConf.getVar(jconf, HiveConf.ConfVars.HIVEQUERYID);
     if (LlapIoProxy.isDaemon()) { // do not cache plan
       cache = new org.apache.hadoop.hive.ql.exec.mr.ObjectCache();
     } else {
-      cache = ObjectCacheFactory.getCache(jconf);
+      cache = ObjectCacheFactory.getCache(jconf, queryId);
     }
     execContext = new ExecMapperContext(jconf);
     execContext.setJc(jconf);
@@ -111,8 +112,8 @@ public class MapRecordProcessor extends RecordProcessor {
     perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_INIT_OPERATORS);
     super.init(mrReporter, inputs, outputs);
 
-    String queryId = HiveConf.getVar(jconf, HiveConf.ConfVars.HIVEQUERYID);
-    String key = queryId + processorContext.getTaskVertexName() + MAP_PLAN_KEY;
+
+    String key = processorContext.getTaskVertexName() + MAP_PLAN_KEY;
     cacheKeys.add(key);
 
     // create map and fetch operators
@@ -133,7 +134,7 @@ public class MapRecordProcessor extends RecordProcessor {
           continue;
         }
 
-        key = queryId + processorContext.getTaskVertexName() + prefix;
+        key = processorContext.getTaskVertexName() + prefix;
         cacheKeys.add(key);
 
         mergeWorkList.add(

http://git-wip-us.apache.org/repos/asf/hive/blob/53b0cb75/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MergeFileRecordProcessor.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MergeFileRecordProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MergeFileRecordProcessor.java
index 7c0eb89..f5d34db 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MergeFileRecordProcessor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MergeFileRecordProcessor.java
@@ -92,14 +92,14 @@ public class MergeFileRecordProcessor extends RecordProcessor {
           .initialize();
     }
 
+    String queryId = HiveConf.getVar(jconf, HiveConf.ConfVars.HIVEQUERYID);
     org.apache.hadoop.hive.ql.exec.ObjectCache cache = ObjectCacheFactory
-      .getCache(jconf);
+      .getCache(jconf, queryId);
 
     try {
       execContext.setJc(jconf);
 
-      String queryId = HiveConf.getVar(jconf, HiveConf.ConfVars.HIVEQUERYID);
-      cacheKey = queryId + MAP_PLAN_KEY;
+      cacheKey = MAP_PLAN_KEY;
 
       MapWork mapWork = (MapWork) cache.retrieve(cacheKey, new Callable<Object>() {
         @Override

http://git-wip-us.apache.org/repos/asf/hive/blob/53b0cb75/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java
index 0859dc4..6182dab 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java
@@ -131,7 +131,7 @@ public abstract class RecordProcessor  {
           continue;
         }
 
-        key = queryId + prefix;
+        key = prefix;
         cacheKeys.add(key);
 
         mergeWorkList.add((BaseWork) cache.retrieve(key, new Callable<Object>() {

http://git-wip-us.apache.org/repos/asf/hive/blob/53b0cb75/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
index e5b2d93..77677a0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
@@ -91,14 +91,14 @@ public class ReduceRecordProcessor  extends RecordProcessor{
 
     ObjectCache cache;
 
+    String queryId = HiveConf.getVar(jconf, HiveConf.ConfVars.HIVEQUERYID);
     if (LlapIoProxy.isDaemon()) { // don't cache plan
       cache = new org.apache.hadoop.hive.ql.exec.mr.ObjectCache();
     } else {
-      cache = ObjectCacheFactory.getCache(jconf);
+      cache = ObjectCacheFactory.getCache(jconf, queryId);
     }
 
-    String queryId = HiveConf.getVar(jconf, HiveConf.ConfVars.HIVEQUERYID);
-    String cacheKey = queryId + processorContext.getTaskVertexName() + REDUCE_PLAN_KEY;
+    String cacheKey = processorContext.getTaskVertexName() + REDUCE_PLAN_KEY;
     cacheKeys = Lists.newArrayList(cacheKey);
     reduceWork = (ReduceWork) cache.retrieve(cacheKey, new Callable<Object>() {
         @Override

http://git-wip-us.apache.org/repos/asf/hive/blob/53b0cb75/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java
index 9baa0c1..1abcf3b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java
@@ -34,6 +34,7 @@ import org.apache.tez.common.TezUtils;
 import org.apache.tez.mapreduce.processor.MRTaskReporter;
 import org.apache.tez.runtime.api.AbstractLogicalIOProcessor;
 import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.ExecutionContext;
 import org.apache.tez.runtime.api.LogicalInput;
 import org.apache.tez.runtime.api.LogicalOutput;
 import org.apache.tez.runtime.api.ProcessorContext;
@@ -45,7 +46,13 @@ import org.apache.tez.runtime.library.api.KeyValueWriter;
  */
 public class TezProcessor extends AbstractLogicalIOProcessor {
 
-
+  /**
+   * This provides the ability to pass things into TezProcessor, which is normally impossible
+   * because of how Tez APIs are structured. Piggyback on ExecutionContext.
+   */
+  public static interface Hook {
+    void initializeHook(TezProcessor source); // source: the processor invoking the hook
+  }
 
   private static final Log LOG = LogFactory.getLog(TezProcessor.class);
   protected boolean isMap = false;
@@ -92,6 +99,10 @@ public class TezProcessor extends AbstractLogicalIOProcessor {
     Configuration conf = TezUtils.createConfFromUserPayload(getContext().getUserPayload());
     this.jobConf = new JobConf(conf);
     this.processorContext = getContext();
+    ExecutionContext execCtx = processorContext.getExecutionContext();
+    if (execCtx instanceof Hook) {
+      ((Hook)execCtx).initializeHook(this);
+    }
     setupMRLegacyConfigs(processorContext);
     perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TEZ_INITIALIZE_PROCESSOR);
   }
@@ -217,4 +228,8 @@ public class TezProcessor extends AbstractLogicalIOProcessor {
       writer.write(key, value);
     }
   }
+
+  public JobConf getConf() { // exposes the job configuration, e.g. to Hook implementations
+    return jobConf;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/53b0cb75/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
----------------------------------------------------------------------
diff --git a/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java b/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
index cc2ee56..21dde51 100644
--- a/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
+++ b/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
@@ -708,7 +708,7 @@ public class Hadoop23Shims extends HadoopShimsSecure {
           if (!filter.accept(filterPath)) continue;
         }
         LocatedFileStatus lfs = next.makeQualifiedLocated(fsUri, p);
-        result.add(new HdfsFileStatusWithIdImpl(lfs, next.getFileId())); // TODO#: here
+        result.add(new HdfsFileStatusWithIdImpl(lfs, next.getFileId()));
       }
       current = current.hasMore() ? dfsc.listPaths(src, current.getLastName(), true) : null;
     }