You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by se...@apache.org on 2015/07/02 19:43:45 UTC

hive git commit: HIVE-11015 : LLAP: MiniTez tez_smb_main, tez_bmj_schema_evolution fail with NPE (Sergey Shelukhin, reviewed by Vikram Dixit K)

Repository: hive
Updated Branches:
  refs/heads/llap 255220119 -> 44e550b63


HIVE-11015 : LLAP: MiniTez tez_smb_main, tez_bmj_schema_evolution fail with NPE (Sergey Shelukhin, reviewed by Vikram Dixit K)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/44e550b6
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/44e550b6
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/44e550b6

Branch: refs/heads/llap
Commit: 44e550b63bbb47ae6d1c731b07cad12289cc2b41
Parents: 2552201
Author: Sergey Shelukhin <se...@apache.org>
Authored: Thu Jul 2 10:43:26 2015 -0700
Committer: Sergey Shelukhin <se...@apache.org>
Committed: Thu Jul 2 10:43:26 2015 -0700

----------------------------------------------------------------------
 .../llap/daemon/impl/TaskRunnerCallable.java    |  62 +++---
 .../hadoop/hive/ql/exec/FilterOperator.java     |   3 +-
 .../hive/ql/exec/mr/ExecMapperContext.java      |  10 +-
 .../hive/ql/exec/tez/MapRecordProcessor.java    |   2 +-
 .../ql/io/HiveContextAwareRecordReader.java     |   2 +-
 .../hadoop/hive/ql/io/HiveInputFormat.java      |   1 -
 .../org/apache/hadoop/hive/ql/io/IOContext.java |  55 -----
 .../apache/hadoop/hive/ql/io/IOContextMap.java  | 117 +++++++++++
 .../hadoop/hive/ql/exec/TestOperators.java      |   3 +-
 .../ql/io/TestHiveBinarySearchRecordReader.java |   2 +-
 .../hadoop/hive/ql/io/TestIOContextMap.java     | 207 +++++++++++++++++++
 11 files changed, 368 insertions(+), 96 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/44e550b6/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
index 9b14fa3..1a125cb 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWor
 import org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorMetrics;
 import org.apache.hadoop.hive.llap.protocol.LlapTaskUmbilicalProtocol;
 import org.apache.hadoop.hive.llap.tezplugins.Converters;
+import org.apache.hadoop.hive.ql.io.IOContextMap;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.Credentials;
@@ -198,38 +199,43 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> {
         new AtomicLong(0),
         request.getContainerIdString());
 
-    synchronized (this) {
-      if (shouldRunTask) {
-        taskRunner = new TezTaskRunner2(conf, taskUgi, fragmentInfo.getLocalDirs(),
-            taskSpec,
-            request.getAppAttemptNumber(),
-            serviceConsumerMetadata, envMap, startedInputsMap, taskReporter, executor,
-            objectRegistry,
-            pid,
-            executionContext, memoryAvailable);
-      }
-    }
-    if (taskRunner == null) {
-      LOG.info("Not starting task {} since it was killed earlier", taskSpec.getTaskAttemptID());
-      return new TaskRunner2Result(EndReason.KILL_REQUESTED, null, false);
-    }
-
+    String attemptId = fragmentInfo.getFragmentIdentifierString();
+    IOContextMap.setThreadAttemptId(attemptId);
     try {
-      TaskRunner2Result result = taskRunner.run();
-      if (result.isContainerShutdownRequested()) {
-        LOG.warn("Unexpected container shutdown requested while running task. Ignoring");
+      synchronized (this) {
+        if (shouldRunTask) {
+          taskRunner = new TezTaskRunner2(conf, taskUgi, fragmentInfo.getLocalDirs(),
+              taskSpec,
+              request.getAppAttemptNumber(),
+              serviceConsumerMetadata, envMap, startedInputsMap, taskReporter, executor,
+              objectRegistry,
+              pid,
+              executionContext, memoryAvailable);
+        }
+      }
+      if (taskRunner == null) {
+        LOG.info("Not starting task {} since it was killed earlier", taskSpec.getTaskAttemptID());
+        return new TaskRunner2Result(EndReason.KILL_REQUESTED, null, false);
       }
-      isCompleted.set(true);
-      return result;
 
-    } finally {
-      // TODO Fix UGI and FS Handling. Closing UGI here causes some errors right now.
-      //        FileSystem.closeAllForUGI(taskUgi);
-      LOG.info("ExecutionTime for Container: " + request.getContainerIdString() + "=" +
-          runtimeWatch.stop().elapsedMillis());
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("canFinish post completion: " + taskSpec.getTaskAttemptID() + ": " + canFinish());
+      try {
+        TaskRunner2Result result = taskRunner.run();
+        if (result.isContainerShutdownRequested()) {
+          LOG.warn("Unexpected container shutdown requested while running task. Ignoring");
+        }
+        isCompleted.set(true);
+        return result;
+      } finally {
+        // TODO Fix UGI and FS Handling. Closing UGI here causes some errors right now.
+        //        FileSystem.closeAllForUGI(taskUgi);
+        LOG.info("ExecutionTime for Container: " + request.getContainerIdString() + "=" +
+            runtimeWatch.stop().elapsedMillis());
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("canFinish post completion: " + taskSpec.getTaskAttemptID() + ": " + canFinish());
+        }
       }
+    } finally {
+      IOContextMap.clearThreadAttempt(attemptId);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/44e550b6/ql/src/java/org/apache/hadoop/hive/ql/exec/FilterOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FilterOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FilterOperator.java
index ed78593..0e7e79d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FilterOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FilterOperator.java
@@ -25,6 +25,7 @@ import java.util.concurrent.Future;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.io.IOContext;
+import org.apache.hadoop.hive.ql.io.IOContextMap;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.FilterDesc;
 import org.apache.hadoop.hive.ql.plan.api.OperatorType;
@@ -61,7 +62,7 @@ public class FilterOperator extends Operator<FilterDesc> implements
       }
 
       conditionInspector = null;
-      ioContext = IOContext.get(hconf);
+      ioContext = IOContextMap.get(hconf);
     } catch (Throwable e) {
       throw new HiveException(e);
     }

http://git-wip-us.apache.org/repos/asf/hive/blob/44e550b6/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapperContext.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapperContext.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapperContext.java
index 13d0650..fc5abfe 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapperContext.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapperContext.java
@@ -22,8 +22,8 @@ import java.util.Map;
 import org.apache.commons.logging.Log;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.ql.exec.FetchOperator;
-import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.io.IOContext;
+import org.apache.hadoop.hive.ql.io.IOContextMap;
 import org.apache.hadoop.hive.ql.plan.MapredLocalWork;
 import org.apache.hadoop.mapred.JobConf;
 
@@ -63,11 +63,11 @@ public class ExecMapperContext {
 
   public ExecMapperContext(JobConf jc) {
     this.jc = jc;
-    ioCxt = IOContext.get(jc);
+    ioCxt = IOContextMap.get(jc);
   }
 
   public void clear() {
-    IOContext.clear();
+    IOContextMap.clear();
     ioCxt = null;
   }
 
@@ -151,8 +151,4 @@ public class ExecMapperContext {
   public IOContext getIoCxt() {
     return ioCxt;
   }
-
-  public void setIoCxt(IOContext ioCxt) {
-    this.ioCxt = ioCxt;
-  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/44e550b6/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
index 2172fdb..e205f1e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
@@ -201,7 +201,7 @@ public class MapRecordProcessor extends RecordProcessor {
             mergeMapOp.setChildren(jconf);
 
             DummyStoreOperator dummyOp = getJoinParentOp(mergeMapOp);
-	          mapOp.setConnectedOperators(mergeMapWork.getTag(), dummyOp);
+            mapOp.setConnectedOperators(mergeMapWork.getTag(), dummyOp);
 
             mergeMapOp.passExecContext(new ExecMapperContext(jconf));
             mergeMapOp.initializeLocalWork(jconf);

http://git-wip-us.apache.org/repos/asf/hive/blob/44e550b6/ql/src/java/org/apache/hadoop/hive/ql/io/HiveContextAwareRecordReader.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveContextAwareRecordReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveContextAwareRecordReader.java
index 9b3f8ec..738ca9c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveContextAwareRecordReader.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveContextAwareRecordReader.java
@@ -162,7 +162,7 @@ public abstract class HiveContextAwareRecordReader<K, V> implements RecordReader
   }
 
   public IOContext getIOContext() {
-    return IOContext.get(jobConf);
+    return IOContextMap.get(jobConf);
   }
 
   private void initIOContext(long startPos, boolean isBlockPointer,

http://git-wip-us.apache.org/repos/asf/hive/blob/44e550b6/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
index 55cc7df..0d9b644 100755
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
@@ -264,7 +264,6 @@ public class HiveInputFormat<K extends WritableComparable, V extends Writable>
 
   public RecordReader getRecordReader(InputSplit split, JobConf job,
       Reporter reporter) throws IOException {
-
     HiveInputSplit hsplit = (HiveInputSplit) split;
 
     InputSplit inputSplit = hsplit.getInputSplit();

http://git-wip-us.apache.org/repos/asf/hive/blob/44e550b6/ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java b/ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java
index fc88949..019db8d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java
@@ -18,13 +18,7 @@
 
 package org.apache.hadoop.hive.ql.io;
 
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.exec.Utilities;
 
 /**
  * IOContext basically contains the position information of the current
@@ -35,55 +29,6 @@ import org.apache.hadoop.hive.ql.exec.Utilities;
  * nextBlockStart refers the end of current row and beginning of next row.
  */
 public class IOContext {
-  public static final String DEFAULT_CONTEXT = "";
-
-  private static final ThreadLocal<Map<String,IOContext>> threadLocalMap
-      = new ThreadLocal<Map<String,IOContext>>() {
-    @Override
-    protected synchronized Map<String,IOContext> initialValue() {
-      Map<String, IOContext> map = new HashMap<String, IOContext>(); 
-      map.put(DEFAULT_CONTEXT, new IOContext());
-      return map;
-    }
-  };
-
-  /**
-   * Spark uses this thread local TODO: no it doesn't?
-   */
-  private static final ThreadLocal<IOContext> threadLocal = new ThreadLocal<IOContext>(){
-    @Override
-    protected IOContext initialValue() { return new IOContext(); }
-  };
-
-  private static IOContext get() {
-      return IOContext.threadLocalMap.get().get(DEFAULT_CONTEXT);
-  }
-
-  /**
-   * Tez and MR use this map but are single threaded per JVM thus no synchronization is required.
-   */
-  private static final Map<String, IOContext> inputNameIOContextMap = new HashMap<String, IOContext>();
-
-  public static IOContext get(Configuration conf) {
-    String inputName = conf.get(Utilities.INPUT_NAME);
-    Map<String, IOContext> inputNameIOContextMap = threadLocalMap.get();
-
-    if (inputName == null) {
-      inputName = DEFAULT_CONTEXT;
-    }
-
-    if (!inputNameIOContextMap.containsKey(inputName)) {
-      IOContext ioContext = new IOContext();
-      inputNameIOContextMap.put(inputName, ioContext);
-    }
-
-    return inputNameIOContextMap.get(inputName);
-  }
-
-  public static void clear() {
-      threadLocal.remove();
-  }
-
   private long currentBlockStart;
   private long nextBlockStart;
   private long currentRow;

http://git-wip-us.apache.org/repos/asf/hive/blob/44e550b6/ql/src/java/org/apache/hadoop/hive/ql/io/IOContextMap.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/IOContextMap.java b/ql/src/java/org/apache/hadoop/hive/ql/io/IOContextMap.java
new file mode 100644
index 0000000..57e7e2a
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/IOContextMap.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io;
+
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+
+/**
+ * There used to be a global static map of IOContext-s inside IOContext (Hive style!).
+ * Unfortunately, due to variety of factors, this is now a giant fustercluck.
+ * 1) Spark doesn't apparently care about multiple inputs, but has multiple threads, so one
+ *    threadlocal IOContext was added for it.
+ * 2) LLAP has lots of tasks in the same process so globals no longer cut it either.
+ * 3) However, Tez runs 2+ threads for one task (e.g. TezTaskEventRouter and TezChild), and these
+ *    surprisingly enough need the same context. Tez, in its infinite wisdom, doesn't allow them
+ *    to communicate in any way nor provide any shared context.
+ * So we are going to...
+ * 1) Keep the good ol' global map for MR and Tez. Hive style!
+ * 2) Keep the threadlocal for Spark. Hive style!
+ * 3) Create inheritable (TADA!) threadlocal with attemptId, only set in LLAP; that will propagate
+ *    to all the little Tez threads, and we will keep a map per attempt. Hive style squared!
+ */
+public class IOContextMap {
+  public static final String DEFAULT_CONTEXT = "";
+  private static final Log LOG = LogFactory.getLog(IOContextMap.class);
+
+  /** Used for Tez and MR */
+  private static final ConcurrentHashMap<String, IOContext> globalMap =
+      new ConcurrentHashMap<String, IOContext>();
+
+  /** Used for Spark */
+  private static final ThreadLocal<IOContext> sparkThreadLocal = new ThreadLocal<IOContext>(){
+    @Override
+    protected IOContext initialValue() { return new IOContext(); }
+  };
+
+  /** Used for Tez+LLAP */
+  private static final ConcurrentHashMap<String, ConcurrentHashMap<String, IOContext>> attemptMap =
+      new ConcurrentHashMap<String, ConcurrentHashMap<String, IOContext>>();
+
+  // TODO: This depends on Tez creating separate threads, as it does now. If that changes, some
+  //       other way to propagate/find out attempt ID would be needed (e.g. see TEZ-2587).
+  private static final InheritableThreadLocal<String> threadAttemptId =
+      new InheritableThreadLocal<>();
+
+  public static void setThreadAttemptId(String attemptId) {
+    assert attemptId != null;
+    threadAttemptId.set(attemptId);
+  }
+
+  public static void clearThreadAttempt(String attemptId) {
+    assert attemptId != null;
+    String attemptIdCheck = threadAttemptId.get();
+    if (!attemptId.equals(attemptIdCheck)) {
+      LOG.error("Thread is clearing context for "
+          + attemptId + ", but " + attemptIdCheck + " expected");
+    }
+    attemptMap.remove(attemptId);
+    threadAttemptId.remove();
+  }
+
+  public static IOContext get(Configuration conf) {
+    if (HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("spark")) {
+      return sparkThreadLocal.get();
+    }
+    String inputName = conf.get(Utilities.INPUT_NAME);
+    if (inputName == null) {
+      inputName = DEFAULT_CONTEXT;
+    }
+    String attemptId = threadAttemptId.get();
+    ConcurrentHashMap<String, IOContext> map;
+    if (attemptId == null) {
+      map = globalMap;
+    } else {
+      map = attemptMap.get(attemptId);
+      if (map == null) {
+        map = new ConcurrentHashMap<>();
+        ConcurrentHashMap<String, IOContext> oldMap = attemptMap.putIfAbsent(attemptId, map);
+        if (oldMap != null) {
+          map = oldMap;
+        }
+      }
+    }
+
+    IOContext ioContext = map.get(inputName);
+    if (ioContext != null) return ioContext;
+    ioContext = new IOContext();
+    IOContext oldContext = map.putIfAbsent(inputName, ioContext);
+    return (oldContext == null) ? ioContext : oldContext;
+  }
+
+  public static void clear() {
+    sparkThreadLocal.remove();
+    globalMap.clear();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/44e550b6/ql/src/test/org/apache/hadoop/hive/ql/exec/TestOperators.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestOperators.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestOperators.java
index 62057d8..c3a36c0 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestOperators.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestOperators.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.Driver;
 import org.apache.hadoop.hive.ql.io.IOContext;
+import org.apache.hadoop.hive.ql.io.IOContextMap;
 import org.apache.hadoop.hive.ql.parse.TypeCheckProcFactory;
 import org.apache.hadoop.hive.ql.plan.CollectDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
@@ -272,7 +273,7 @@ public class TestOperators extends TestCase {
       JobConf hconf = new JobConf(TestOperators.class);
       HiveConf.setVar(hconf, HiveConf.ConfVars.HADOOPMAPFILENAME,
           "hdfs:///testDir/testFile");
-      IOContext.get(hconf).setInputPath(
+      IOContextMap.get(hconf).setInputPath(
           new Path("hdfs:///testDir/testFile"));
 
       // initialize pathToAliases

http://git-wip-us.apache.org/repos/asf/hive/blob/44e550b6/ql/src/test/org/apache/hadoop/hive/ql/io/TestHiveBinarySearchRecordReader.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/TestHiveBinarySearchRecordReader.java b/ql/src/test/org/apache/hadoop/hive/ql/io/TestHiveBinarySearchRecordReader.java
index 7a1748c..9dc4f5b 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/TestHiveBinarySearchRecordReader.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/TestHiveBinarySearchRecordReader.java
@@ -116,7 +116,7 @@ public class TestHiveBinarySearchRecordReader extends TestCase {
 
   private void resetIOContext() {
     conf.set(Utilities.INPUT_NAME, "TestHiveBinarySearchRecordReader");
-    ioContext = IOContext.get(conf);
+    ioContext = IOContextMap.get(conf);
     ioContext.setUseSorted(false);
     ioContext.setBinarySearching(false);
     ioContext.setEndBinarySearch(false);

http://git-wip-us.apache.org/repos/asf/hive/blob/44e550b6/ql/src/test/org/apache/hadoop/hive/ql/io/TestIOContextMap.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/TestIOContextMap.java b/ql/src/test/org/apache/hadoop/hive/ql/io/TestIOContextMap.java
new file mode 100644
index 0000000..dad5536
--- /dev/null
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/TestIOContextMap.java
@@ -0,0 +1,207 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io;
+
+import static org.junit.Assert.*;
+
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.junit.Test;
+
+import com.google.common.collect.Sets;
+
+public class TestIOContextMap {
+
+  private void syncThreadStart(final CountDownLatch cdlIn, final CountDownLatch cdlOut) {
+    cdlIn.countDown();
+    try {
+      cdlOut.await();
+    } catch (InterruptedException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Test
+  public void testMRTezGlobalMap() throws Exception {
+    // Tests concurrent modification, and that results are the same per input across threads
+    // but different between inputs.
+    final int THREAD_COUNT = 2, ITER_COUNT = 1000;
+    final AtomicInteger countdown = new AtomicInteger(ITER_COUNT);
+    final CountDownLatch phase1End = new CountDownLatch(THREAD_COUNT);
+    final IOContext[] results = new IOContext[ITER_COUNT];
+    ExecutorService executor = Executors.newFixedThreadPool(THREAD_COUNT);
+    final CountDownLatch cdlIn = new CountDownLatch(THREAD_COUNT), cdlOut = new CountDownLatch(1);
+
+    @SuppressWarnings("unchecked")
+    FutureTask<Void>[] tasks = new FutureTask[THREAD_COUNT];
+    for (int i = 0; i < tasks.length; ++i) {
+      tasks[i] = new FutureTask<Void>(new Callable<Void>() {
+        public Void call() throws Exception {
+          Configuration conf = new Configuration();
+          syncThreadStart(cdlIn, cdlOut);
+          // Phase 1 - create objects.
+          while (true) {
+            int nextIx = countdown.decrementAndGet();
+            if (nextIx < 0) break;
+            conf.set(Utilities.INPUT_NAME, "Input " + nextIx);
+            results[nextIx] = IOContextMap.get(conf);
+            if (nextIx == 0) break;
+          }
+          phase1End.countDown();
+          phase1End.await();
+          // Phase 2 - verify we get the expected objects created by all threads.
+          for (int i = 0; i < ITER_COUNT; ++i) {
+            conf.set(Utilities.INPUT_NAME, "Input " + i);
+            IOContext ctx = IOContextMap.get(conf);
+            assertSame(results[i], ctx);
+          }
+          return null;
+        }
+      });
+      executor.execute(tasks[i]);
+    }
+
+    cdlIn.await(); // Wait for all threads to be ready.
+    cdlOut.countDown(); // Release them at the same time.
+    for (int i = 0; i < tasks.length; ++i) {
+      tasks[i].get();
+    }
+    Set<IOContext> resultSet = Sets.newIdentityHashSet();
+    for (int i = 0; i < results.length; ++i) {
+      assertTrue(resultSet.add(results[i])); // All the objects must be different.
+    }
+  }
+
+  @Test
+  public void testTezLlapAttemptMap() throws Exception {
+    // Tests that different threads get the same object per attempt per input, and different
+    // between attempts/inputs; that attempt is inherited between threads; and that clearing
+    // the attempt produces a different result.
+    final int THREAD_COUNT = 2, ITER_COUNT = 1000, ATTEMPT_COUNT = 3;
+    final AtomicInteger countdown = new AtomicInteger(ITER_COUNT);
+    final IOContext[] results = new IOContext[ITER_COUNT * ATTEMPT_COUNT];
+    ExecutorService executor = Executors.newFixedThreadPool(THREAD_COUNT);
+    final CountDownLatch cdlIn = new CountDownLatch(THREAD_COUNT), cdlOut = new CountDownLatch(1);
+
+    @SuppressWarnings("unchecked")
+    FutureTask<Void>[] tasks = new FutureTask[THREAD_COUNT];
+    for (int i = 0; i < tasks.length; ++i) {
+      tasks[i] = new FutureTask<Void>(new Callable<Void>() {
+        public Void call() throws Exception {
+          final Configuration conf = new Configuration(), conf2 = new Configuration();
+          syncThreadStart(cdlIn, cdlOut);
+          while (true) {
+            int nextIx = countdown.decrementAndGet();
+            if (nextIx < 0) break;
+            String input1 = "Input " + nextIx;
+            conf.set(Utilities.INPUT_NAME, input1);
+            for (int j = 0; j < ATTEMPT_COUNT; ++j) {
+              String attemptId = "Attempt " + nextIx + ":" + j;
+              IOContextMap.setThreadAttemptId(attemptId);
+              final IOContext r1 = results[(nextIx * ATTEMPT_COUNT) + j] = IOContextMap.get(conf);
+              // For some attempts, check inheritance.
+              if ((nextIx % (ITER_COUNT / 10)) == 0) {
+                String input2 = "Input2 " + nextIx;
+                conf2.set(Utilities.INPUT_NAME, input2);
+                final AtomicReference<IOContext> ref2 = new AtomicReference<>();
+                Thread t = new Thread(new Runnable() {
+                  public void run() {
+                    assertSame(r1, IOContextMap.get(conf));
+                    ref2.set(IOContextMap.get(conf2));
+                  }
+                });
+                t.start();
+                t.join();
+                assertSame(ref2.get(), IOContextMap.get(conf2));
+              }
+              // Don't clear the attempt ID, or the stuff will be cleared.
+            }
+            if (nextIx == 0) break;
+          }
+          return null;
+        }
+      });
+      executor.execute(tasks[i]);
+    }
+
+    cdlIn.await(); // Wait for all threads to be ready.
+    cdlOut.countDown(); // Release them at the same time.
+    for (int i = 0; i < tasks.length; ++i) {
+      tasks[i].get();
+    }
+    Configuration conf = new Configuration();
+    Set<IOContext> resultSet = Sets.newIdentityHashSet();
+    for (int i = 0; i < ITER_COUNT; ++i) {
+      conf.set(Utilities.INPUT_NAME, "Input " + i);
+      for (int j = 0; j < ATTEMPT_COUNT; ++j) {
+        String attemptId = "Attempt " + i + ":" + j;
+        IOContext result = results[(i * ATTEMPT_COUNT) + j];
+        assertTrue(resultSet.add(result)); // All the objects must be different.
+        IOContextMap.setThreadAttemptId(attemptId);
+        assertSame(result, IOContextMap.get(conf)); // Matching result for attemptId + input.
+        IOContextMap.clearThreadAttempt(attemptId);
+        IOContextMap.setThreadAttemptId(attemptId);
+        assertNotSame(result, IOContextMap.get(conf)); // Different result after clearing.
+      }
+    }
+  }
+
+  @Test
+  public void testSparkThreadLocal() throws Exception {
+    // Test that input name does not change IOContext returned, and that each thread gets its own.
+    final Configuration conf1 = new Configuration();
+    conf1.set(HiveConf.ConfVars.HIVE_EXECUTION_ENGINE.varname, "spark");
+    final Configuration conf2 = new Configuration(conf1);
+    conf2.set(Utilities.INPUT_NAME, "Other input");
+    final int THREAD_COUNT = 2;
+    ExecutorService executor = Executors.newFixedThreadPool(THREAD_COUNT);
+    final CountDownLatch cdlIn = new CountDownLatch(THREAD_COUNT), cdlOut = new CountDownLatch(1);
+    @SuppressWarnings("unchecked")
+    FutureTask<IOContext>[] tasks = new FutureTask[THREAD_COUNT];
+    for (int i = 0; i < tasks.length; ++i) {
+      tasks[i] = new FutureTask<IOContext>(new Callable<IOContext>() {
+        public IOContext call() throws Exception {
+          syncThreadStart(cdlIn, cdlOut);
+          IOContext c1 = IOContextMap.get(conf1), c2 = IOContextMap.get(conf2);
+          assertSame(c1, c2);
+          return c1;
+        }
+      });
+      executor.execute(tasks[i]);
+    }
+
+    cdlIn.await(); // Wait for all threads to be ready.
+    cdlOut.countDown(); // Release them at the same time.
+    Set<IOContext> results = Sets.newIdentityHashSet();
+    for (int i = 0; i < tasks.length; ++i) {
+      assertTrue(results.add(tasks[i].get())); // All the objects must be different.
+    }
+  }
+
+}