You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this copy.)
Posted to commits@hive.apache.org by se...@apache.org on 2015/07/02 19:43:45 UTC
hive git commit: HIVE-11015 : LLAP: MiniTez tez_smb_main, tez_bmj_schema_evolution fail with NPE (Sergey Shelukhin, reviewed by Vikram Dixit K)
Repository: hive
Updated Branches:
refs/heads/llap 255220119 -> 44e550b63
HIVE-11015 : LLAP: MiniTez tez_smb_main, tez_bmj_schema_evolution fail with NPE (Sergey Shelukhin, reviewed by Vikram Dixit K)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/44e550b6
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/44e550b6
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/44e550b6
Branch: refs/heads/llap
Commit: 44e550b63bbb47ae6d1c731b07cad12289cc2b41
Parents: 2552201
Author: Sergey Shelukhin <se...@apache.org>
Authored: Thu Jul 2 10:43:26 2015 -0700
Committer: Sergey Shelukhin <se...@apache.org>
Committed: Thu Jul 2 10:43:26 2015 -0700
----------------------------------------------------------------------
.../llap/daemon/impl/TaskRunnerCallable.java | 62 +++---
.../hadoop/hive/ql/exec/FilterOperator.java | 3 +-
.../hive/ql/exec/mr/ExecMapperContext.java | 10 +-
.../hive/ql/exec/tez/MapRecordProcessor.java | 2 +-
.../ql/io/HiveContextAwareRecordReader.java | 2 +-
.../hadoop/hive/ql/io/HiveInputFormat.java | 1 -
.../org/apache/hadoop/hive/ql/io/IOContext.java | 55 -----
.../apache/hadoop/hive/ql/io/IOContextMap.java | 117 +++++++++++
.../hadoop/hive/ql/exec/TestOperators.java | 3 +-
.../ql/io/TestHiveBinarySearchRecordReader.java | 2 +-
.../hadoop/hive/ql/io/TestIOContextMap.java | 207 +++++++++++++++++++
11 files changed, 368 insertions(+), 96 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/44e550b6/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
index 9b14fa3..1a125cb 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWor
import org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorMetrics;
import org.apache.hadoop.hive.llap.protocol.LlapTaskUmbilicalProtocol;
import org.apache.hadoop.hive.llap.tezplugins.Converters;
+import org.apache.hadoop.hive.ql.io.IOContextMap;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.Credentials;
@@ -198,38 +199,43 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> {
new AtomicLong(0),
request.getContainerIdString());
- synchronized (this) {
- if (shouldRunTask) {
- taskRunner = new TezTaskRunner2(conf, taskUgi, fragmentInfo.getLocalDirs(),
- taskSpec,
- request.getAppAttemptNumber(),
- serviceConsumerMetadata, envMap, startedInputsMap, taskReporter, executor,
- objectRegistry,
- pid,
- executionContext, memoryAvailable);
- }
- }
- if (taskRunner == null) {
- LOG.info("Not starting task {} since it was killed earlier", taskSpec.getTaskAttemptID());
- return new TaskRunner2Result(EndReason.KILL_REQUESTED, null, false);
- }
-
+ String attemptId = fragmentInfo.getFragmentIdentifierString();
+ IOContextMap.setThreadAttemptId(attemptId);
try {
- TaskRunner2Result result = taskRunner.run();
- if (result.isContainerShutdownRequested()) {
- LOG.warn("Unexpected container shutdown requested while running task. Ignoring");
+ synchronized (this) {
+ if (shouldRunTask) {
+ taskRunner = new TezTaskRunner2(conf, taskUgi, fragmentInfo.getLocalDirs(),
+ taskSpec,
+ request.getAppAttemptNumber(),
+ serviceConsumerMetadata, envMap, startedInputsMap, taskReporter, executor,
+ objectRegistry,
+ pid,
+ executionContext, memoryAvailable);
+ }
+ }
+ if (taskRunner == null) {
+ LOG.info("Not starting task {} since it was killed earlier", taskSpec.getTaskAttemptID());
+ return new TaskRunner2Result(EndReason.KILL_REQUESTED, null, false);
}
- isCompleted.set(true);
- return result;
- } finally {
- // TODO Fix UGI and FS Handling. Closing UGI here causes some errors right now.
- // FileSystem.closeAllForUGI(taskUgi);
- LOG.info("ExecutionTime for Container: " + request.getContainerIdString() + "=" +
- runtimeWatch.stop().elapsedMillis());
- if (LOG.isDebugEnabled()) {
- LOG.debug("canFinish post completion: " + taskSpec.getTaskAttemptID() + ": " + canFinish());
+ try {
+ TaskRunner2Result result = taskRunner.run();
+ if (result.isContainerShutdownRequested()) {
+ LOG.warn("Unexpected container shutdown requested while running task. Ignoring");
+ }
+ isCompleted.set(true);
+ return result;
+ } finally {
+ // TODO Fix UGI and FS Handling. Closing UGI here causes some errors right now.
+ // FileSystem.closeAllForUGI(taskUgi);
+ LOG.info("ExecutionTime for Container: " + request.getContainerIdString() + "=" +
+ runtimeWatch.stop().elapsedMillis());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("canFinish post completion: " + taskSpec.getTaskAttemptID() + ": " + canFinish());
+ }
}
+ } finally {
+ IOContextMap.clearThreadAttempt(attemptId);
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/44e550b6/ql/src/java/org/apache/hadoop/hive/ql/exec/FilterOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FilterOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FilterOperator.java
index ed78593..0e7e79d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FilterOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FilterOperator.java
@@ -25,6 +25,7 @@ import java.util.concurrent.Future;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.io.IOContext;
+import org.apache.hadoop.hive.ql.io.IOContextMap;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.plan.FilterDesc;
import org.apache.hadoop.hive.ql.plan.api.OperatorType;
@@ -61,7 +62,7 @@ public class FilterOperator extends Operator<FilterDesc> implements
}
conditionInspector = null;
- ioContext = IOContext.get(hconf);
+ ioContext = IOContextMap.get(hconf);
} catch (Throwable e) {
throw new HiveException(e);
}
http://git-wip-us.apache.org/repos/asf/hive/blob/44e550b6/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapperContext.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapperContext.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapperContext.java
index 13d0650..fc5abfe 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapperContext.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapperContext.java
@@ -22,8 +22,8 @@ import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.FetchOperator;
-import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.io.IOContext;
+import org.apache.hadoop.hive.ql.io.IOContextMap;
import org.apache.hadoop.hive.ql.plan.MapredLocalWork;
import org.apache.hadoop.mapred.JobConf;
@@ -63,11 +63,11 @@ public class ExecMapperContext {
public ExecMapperContext(JobConf jc) {
this.jc = jc;
- ioCxt = IOContext.get(jc);
+ ioCxt = IOContextMap.get(jc);
}
public void clear() {
- IOContext.clear();
+ IOContextMap.clear();
ioCxt = null;
}
@@ -151,8 +151,4 @@ public class ExecMapperContext {
public IOContext getIoCxt() {
return ioCxt;
}
-
- public void setIoCxt(IOContext ioCxt) {
- this.ioCxt = ioCxt;
- }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/44e550b6/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
index 2172fdb..e205f1e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
@@ -201,7 +201,7 @@ public class MapRecordProcessor extends RecordProcessor {
mergeMapOp.setChildren(jconf);
DummyStoreOperator dummyOp = getJoinParentOp(mergeMapOp);
- mapOp.setConnectedOperators(mergeMapWork.getTag(), dummyOp);
+ mapOp.setConnectedOperators(mergeMapWork.getTag(), dummyOp);
mergeMapOp.passExecContext(new ExecMapperContext(jconf));
mergeMapOp.initializeLocalWork(jconf);
http://git-wip-us.apache.org/repos/asf/hive/blob/44e550b6/ql/src/java/org/apache/hadoop/hive/ql/io/HiveContextAwareRecordReader.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveContextAwareRecordReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveContextAwareRecordReader.java
index 9b3f8ec..738ca9c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveContextAwareRecordReader.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveContextAwareRecordReader.java
@@ -162,7 +162,7 @@ public abstract class HiveContextAwareRecordReader<K, V> implements RecordReader
}
public IOContext getIOContext() {
- return IOContext.get(jobConf);
+ return IOContextMap.get(jobConf);
}
private void initIOContext(long startPos, boolean isBlockPointer,
http://git-wip-us.apache.org/repos/asf/hive/blob/44e550b6/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
index 55cc7df..0d9b644 100755
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
@@ -264,7 +264,6 @@ public class HiveInputFormat<K extends WritableComparable, V extends Writable>
public RecordReader getRecordReader(InputSplit split, JobConf job,
Reporter reporter) throws IOException {
-
HiveInputSplit hsplit = (HiveInputSplit) split;
InputSplit inputSplit = hsplit.getInputSplit();
http://git-wip-us.apache.org/repos/asf/hive/blob/44e550b6/ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java b/ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java
index fc88949..019db8d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java
@@ -18,13 +18,7 @@
package org.apache.hadoop.hive.ql.io;
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.exec.Utilities;
/**
* IOContext basically contains the position information of the current
@@ -35,55 +29,6 @@ import org.apache.hadoop.hive.ql.exec.Utilities;
* nextBlockStart refers the end of current row and beginning of next row.
*/
public class IOContext {
- public static final String DEFAULT_CONTEXT = "";
-
- private static final ThreadLocal<Map<String,IOContext>> threadLocalMap
- = new ThreadLocal<Map<String,IOContext>>() {
- @Override
- protected synchronized Map<String,IOContext> initialValue() {
- Map<String, IOContext> map = new HashMap<String, IOContext>();
- map.put(DEFAULT_CONTEXT, new IOContext());
- return map;
- }
- };
-
- /**
- * Spark uses this thread local TODO: no it doesn't?
- */
- private static final ThreadLocal<IOContext> threadLocal = new ThreadLocal<IOContext>(){
- @Override
- protected IOContext initialValue() { return new IOContext(); }
- };
-
- private static IOContext get() {
- return IOContext.threadLocalMap.get().get(DEFAULT_CONTEXT);
- }
-
- /**
- * Tez and MR use this map but are single threaded per JVM thus no synchronization is required.
- */
- private static final Map<String, IOContext> inputNameIOContextMap = new HashMap<String, IOContext>();
-
- public static IOContext get(Configuration conf) {
- String inputName = conf.get(Utilities.INPUT_NAME);
- Map<String, IOContext> inputNameIOContextMap = threadLocalMap.get();
-
- if (inputName == null) {
- inputName = DEFAULT_CONTEXT;
- }
-
- if (!inputNameIOContextMap.containsKey(inputName)) {
- IOContext ioContext = new IOContext();
- inputNameIOContextMap.put(inputName, ioContext);
- }
-
- return inputNameIOContextMap.get(inputName);
- }
-
- public static void clear() {
- threadLocal.remove();
- }
-
private long currentBlockStart;
private long nextBlockStart;
private long currentRow;
http://git-wip-us.apache.org/repos/asf/hive/blob/44e550b6/ql/src/java/org/apache/hadoop/hive/ql/io/IOContextMap.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/IOContextMap.java b/ql/src/java/org/apache/hadoop/hive/ql/io/IOContextMap.java
new file mode 100644
index 0000000..57e7e2a
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/IOContextMap.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io;
+
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+
+/**
+ * Looks up the IOContext for the current input. This used to be a global static map inside
+ * IOContext, but the execution modes now need different lookup rules:
+ * 1) Spark runs multiple threads per process and apparently does not need per-input contexts,
+ * so each thread gets a single thread-local IOContext.
+ * 2) LLAP runs many tasks in the same process, so one process-wide map is not enough.
+ * 3) Tez runs 2+ threads for one task (e.g. TezTaskEventRouter and TezChild) that must see the
+ * same contexts, and Tez provides no shared per-task context through which they could find them.
+ * get(Configuration) therefore resolves the context as follows:
+ * 1) Spark execution engine: the calling thread's IOContext; the input name is ignored.
+ * 2) No attempt ID set on the thread (MR, Tez outside LLAP): a process-wide map by input name.
+ * 3) Attempt ID set via setThreadAttemptId (only done in LLAP): a per-attempt map by input name.
+ * The attempt ID is an InheritableThreadLocal, so threads the task creates afterwards inherit
+ * it; clearThreadAttempt removes the attempt's map and unsets the ID on the calling thread.
+ */
+public class IOContextMap {
+ public static final String DEFAULT_CONTEXT = "";
+ private static final Log LOG = LogFactory.getLog(IOContextMap.class);
+
+ /** Process-wide contexts keyed by input name; used for MR and Tez when no attempt ID is set. */
+ private static final ConcurrentHashMap<String, IOContext> globalMap =
+ new ConcurrentHashMap<String, IOContext>();
+
+ /** One IOContext per thread, regardless of input name; used for Spark. */
+ private static final ThreadLocal<IOContext> sparkThreadLocal = new ThreadLocal<IOContext>(){
+ @Override
+ protected IOContext initialValue() { return new IOContext(); }
+ };
+
+ /** Per-attempt contexts: attempt ID -> (input name -> IOContext); used for Tez on LLAP. */
+ private static final ConcurrentHashMap<String, ConcurrentHashMap<String, IOContext>> attemptMap =
+ new ConcurrentHashMap<String, ConcurrentHashMap<String, IOContext>>();
+
+ // TODO: This depends on Tez creating separate threads, as it does now. If that changes, some
+ // other way to propagate/find out attempt ID would be needed (e.g. see TEZ-2587).
+ private static final InheritableThreadLocal<String> threadAttemptId =
+ new InheritableThreadLocal<>();
+
+ public static void setThreadAttemptId(String attemptId) { // Threads created afterwards inherit this ID.
+ assert attemptId != null;
+ threadAttemptId.set(attemptId);
+ }
+
+ public static void clearThreadAttempt(String attemptId) { // Drops the attempt's contexts and unsets the ID on this thread.
+ assert attemptId != null;
+ String attemptIdCheck = threadAttemptId.get();
+ if (!attemptId.equals(attemptIdCheck)) { // Logged only; the given attempt is still cleared below.
+ LOG.error("Thread is clearing context for "
+ + attemptId + ", but " + attemptIdCheck + " expected");
+ }
+ attemptMap.remove(attemptId);
+ threadAttemptId.remove();
+ }
+
+ public static IOContext get(Configuration conf) { // Never returns null; creates the context on first use.
+ if (HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("spark")) {
+ return sparkThreadLocal.get(); // Input name is ignored for Spark.
+ }
+ String inputName = conf.get(Utilities.INPUT_NAME);
+ if (inputName == null) {
+ inputName = DEFAULT_CONTEXT;
+ }
+ String attemptId = threadAttemptId.get();
+ ConcurrentHashMap<String, IOContext> map;
+ if (attemptId == null) { // Not bound to an attempt: use the process-wide map.
+ map = globalMap;
+ } else {
+ map = attemptMap.get(attemptId);
+ if (map == null) {
+ map = new ConcurrentHashMap<>();
+ ConcurrentHashMap<String, IOContext> oldMap = attemptMap.putIfAbsent(attemptId, map); // Another thread may have created it concurrently; keep the existing map.
+ if (oldMap != null) {
+ map = oldMap;
+ }
+ }
+ }
+
+ IOContext ioContext = map.get(inputName);
+ if (ioContext != null) return ioContext;
+ ioContext = new IOContext();
+ IOContext oldContext = map.putIfAbsent(inputName, ioContext); // Keep whichever context was inserted first.
+ return (oldContext == null) ? ioContext : oldContext;
+ }
+
+ public static void clear() { // Per-attempt maps are not touched here; see clearThreadAttempt.
+ sparkThreadLocal.remove();
+ globalMap.clear();
+ }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/44e550b6/ql/src/test/org/apache/hadoop/hive/ql/exec/TestOperators.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestOperators.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestOperators.java
index 62057d8..c3a36c0 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestOperators.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestOperators.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.Driver;
import org.apache.hadoop.hive.ql.io.IOContext;
+import org.apache.hadoop.hive.ql.io.IOContextMap;
import org.apache.hadoop.hive.ql.parse.TypeCheckProcFactory;
import org.apache.hadoop.hive.ql.plan.CollectDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
@@ -272,7 +273,7 @@ public class TestOperators extends TestCase {
JobConf hconf = new JobConf(TestOperators.class);
HiveConf.setVar(hconf, HiveConf.ConfVars.HADOOPMAPFILENAME,
"hdfs:///testDir/testFile");
- IOContext.get(hconf).setInputPath(
+ IOContextMap.get(hconf).setInputPath(
new Path("hdfs:///testDir/testFile"));
// initialize pathToAliases
http://git-wip-us.apache.org/repos/asf/hive/blob/44e550b6/ql/src/test/org/apache/hadoop/hive/ql/io/TestHiveBinarySearchRecordReader.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/TestHiveBinarySearchRecordReader.java b/ql/src/test/org/apache/hadoop/hive/ql/io/TestHiveBinarySearchRecordReader.java
index 7a1748c..9dc4f5b 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/TestHiveBinarySearchRecordReader.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/TestHiveBinarySearchRecordReader.java
@@ -116,7 +116,7 @@ public class TestHiveBinarySearchRecordReader extends TestCase {
private void resetIOContext() {
conf.set(Utilities.INPUT_NAME, "TestHiveBinarySearchRecordReader");
- ioContext = IOContext.get(conf);
+ ioContext = IOContextMap.get(conf);
ioContext.setUseSorted(false);
ioContext.setBinarySearching(false);
ioContext.setEndBinarySearch(false);
http://git-wip-us.apache.org/repos/asf/hive/blob/44e550b6/ql/src/test/org/apache/hadoop/hive/ql/io/TestIOContextMap.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/TestIOContextMap.java b/ql/src/test/org/apache/hadoop/hive/ql/io/TestIOContextMap.java
new file mode 100644
index 0000000..dad5536
--- /dev/null
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/TestIOContextMap.java
@@ -0,0 +1,207 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io;
+
+import static org.junit.Assert.*;
+
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.junit.Test;
+
+import com.google.common.collect.Sets;
+
+public class TestIOContextMap { // Covers the global (MR/Tez), per-attempt (Tez+LLAP) and thread-local (Spark) lookups.
+
+ private void syncThreadStart(final CountDownLatch cdlIn, final CountDownLatch cdlOut) { // Signal readiness on cdlIn, then block until cdlOut is released.
+ cdlIn.countDown();
+ try {
+ cdlOut.await();
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Test
+ public void testMRTezGlobalMap() throws Exception {
+ // Tests concurrent modification, and that results are the same per input across threads
+ // but different between inputs.
+ final int THREAD_COUNT = 2, ITER_COUNT = 1000;
+ final AtomicInteger countdown = new AtomicInteger(ITER_COUNT);
+ final CountDownLatch phase1End = new CountDownLatch(THREAD_COUNT);
+ final IOContext[] results = new IOContext[ITER_COUNT];
+ ExecutorService executor = Executors.newFixedThreadPool(THREAD_COUNT); // NOTE(review): never shut down.
+ final CountDownLatch cdlIn = new CountDownLatch(THREAD_COUNT), cdlOut = new CountDownLatch(1);
+
+ @SuppressWarnings("unchecked")
+ FutureTask<Void>[] tasks = new FutureTask[THREAD_COUNT];
+ for (int i = 0; i < tasks.length; ++i) {
+ tasks[i] = new FutureTask<Void>(new Callable<Void>() {
+ public Void call() throws Exception {
+ Configuration conf = new Configuration();
+ syncThreadStart(cdlIn, cdlOut);
+ // Phase 1 - threads race to claim input indexes and create one context per input.
+ while (true) {
+ int nextIx = countdown.decrementAndGet();
+ if (nextIx < 0) break;
+ conf.set(Utilities.INPUT_NAME, "Input " + nextIx);
+ results[nextIx] = IOContextMap.get(conf);
+ if (nextIx == 0) break;
+ }
+ phase1End.countDown();
+ phase1End.await();
+ // Phase 2 - verify each thread sees the objects created by all threads, per input.
+ for (int i = 0; i < ITER_COUNT; ++i) {
+ conf.set(Utilities.INPUT_NAME, "Input " + i);
+ IOContext ctx = IOContextMap.get(conf);
+ assertSame(results[i], ctx);
+ }
+ return null;
+ }
+ });
+ executor.execute(tasks[i]);
+ }
+
+ cdlIn.await(); // Wait for all threads to be ready.
+ cdlOut.countDown(); // Release them at the same time.
+ for (int i = 0; i < tasks.length; ++i) {
+ tasks[i].get();
+ }
+ Set<IOContext> resultSet = Sets.newIdentityHashSet();
+ for (int i = 0; i < results.length; ++i) {
+ assertTrue(resultSet.add(results[i])); // All the objects must be different.
+ }
+ }
+
+ @Test
+ public void testTezLlapAttemptMap() throws Exception {
+ // Tests that different threads get the same object per attempt per input, and different
+ // between attempts/inputs; that attempt is inherited between threads; and that clearing
+ // the attempt produces a different result.
+ final int THREAD_COUNT = 2, ITER_COUNT = 1000, ATTEMPT_COUNT = 3;
+ final AtomicInteger countdown = new AtomicInteger(ITER_COUNT);
+ final IOContext[] results = new IOContext[ITER_COUNT * ATTEMPT_COUNT];
+ ExecutorService executor = Executors.newFixedThreadPool(THREAD_COUNT); // NOTE(review): never shut down.
+ final CountDownLatch cdlIn = new CountDownLatch(THREAD_COUNT), cdlOut = new CountDownLatch(1);
+
+ @SuppressWarnings("unchecked")
+ FutureTask<Void>[] tasks = new FutureTask[THREAD_COUNT];
+ for (int i = 0; i < tasks.length; ++i) {
+ tasks[i] = new FutureTask<Void>(new Callable<Void>() {
+ public Void call() throws Exception {
+ final Configuration conf = new Configuration(), conf2 = new Configuration();
+ syncThreadStart(cdlIn, cdlOut);
+ while (true) {
+ int nextIx = countdown.decrementAndGet();
+ if (nextIx < 0) break;
+ String input1 = "Input " + nextIx;
+ conf.set(Utilities.INPUT_NAME, input1);
+ for (int j = 0; j < ATTEMPT_COUNT; ++j) {
+ String attemptId = "Attempt " + nextIx + ":" + j;
+ IOContextMap.setThreadAttemptId(attemptId);
+ final IOContext r1 = results[(nextIx * ATTEMPT_COUNT) + j] = IOContextMap.get(conf);
+ // For every (ITER_COUNT / 10)-th input, check inheritance by a newly created thread.
+ if ((nextIx % (ITER_COUNT / 10)) == 0) {
+ String input2 = "Input2 " + nextIx;
+ conf2.set(Utilities.INPUT_NAME, input2);
+ final AtomicReference<IOContext> ref2 = new AtomicReference<>();
+ Thread t = new Thread(new Runnable() { // Created after setThreadAttemptId, so it inherits the attempt ID.
+ public void run() {
+ assertSame(r1, IOContextMap.get(conf)); // A failure here only surfaces via ref2 staying null below.
+ ref2.set(IOContextMap.get(conf2));
+ }
+ });
+ t.start();
+ t.join();
+ assertSame(ref2.get(), IOContextMap.get(conf2));
+ }
+ // Don't clear the attempt ID here: clearing drops the attempt's map, which the final checks still read.
+ }
+ if (nextIx == 0) break;
+ }
+ return null;
+ }
+ });
+ executor.execute(tasks[i]);
+ }
+
+ cdlIn.await(); // Wait for all threads to be ready.
+ cdlOut.countDown(); // Release them at the same time.
+ for (int i = 0; i < tasks.length; ++i) {
+ tasks[i].get();
+ }
+ Configuration conf = new Configuration();
+ Set<IOContext> resultSet = Sets.newIdentityHashSet();
+ for (int i = 0; i < ITER_COUNT; ++i) {
+ conf.set(Utilities.INPUT_NAME, "Input " + i);
+ for (int j = 0; j < ATTEMPT_COUNT; ++j) {
+ String attemptId = "Attempt " + i + ":" + j;
+ IOContext result = results[(i * ATTEMPT_COUNT) + j];
+ assertTrue(resultSet.add(result)); // All the objects must be different.
+ IOContextMap.setThreadAttemptId(attemptId);
+ assertSame(result, IOContextMap.get(conf)); // Matching result for attemptId + input.
+ IOContextMap.clearThreadAttempt(attemptId);
+ IOContextMap.setThreadAttemptId(attemptId); // NOTE(review): the last ID (and the map re-created below) is never cleared, so it may leak into threads later created by this thread.
+ assertNotSame(result, IOContextMap.get(conf)); // Different result after clearing.
+ }
+ }
+ }
+
+ @Test
+ public void testSparkThreadLocal() throws Exception {
+ // Test that input name does not change IOContext returned, and that each thread gets its own.
+ final Configuration conf1 = new Configuration();
+ conf1.set(HiveConf.ConfVars.HIVE_EXECUTION_ENGINE.varname, "spark");
+ final Configuration conf2 = new Configuration(conf1);
+ conf2.set(Utilities.INPUT_NAME, "Other input");
+ final int THREAD_COUNT = 2;
+ ExecutorService executor = Executors.newFixedThreadPool(THREAD_COUNT); // NOTE(review): never shut down.
+ final CountDownLatch cdlIn = new CountDownLatch(THREAD_COUNT), cdlOut = new CountDownLatch(1);
+ @SuppressWarnings("unchecked")
+ FutureTask<IOContext>[] tasks = new FutureTask[THREAD_COUNT];
+ for (int i = 0; i < tasks.length; ++i) {
+ tasks[i] = new FutureTask<IOContext>(new Callable<IOContext>() {
+ public IOContext call() throws Exception {
+ syncThreadStart(cdlIn, cdlOut);
+ IOContext c1 = IOContextMap.get(conf1), c2 = IOContextMap.get(conf2);
+ assertSame(c1, c2);
+ return c1;
+ }
+ });
+ executor.execute(tasks[i]);
+ }
+
+ cdlIn.await(); // Wait for all threads to be ready.
+ cdlOut.countDown(); // Release them at the same time.
+ Set<IOContext> results = Sets.newIdentityHashSet();
+ for (int i = 0; i < tasks.length; ++i) {
+ assertTrue(results.add(tasks[i].get())); // All the objects must be different.
+ }
+ }
+
+}