You are viewing a plain text version of this content. The canonical HTML version is available from the Apache mailing list archive.
Posted to commits@hive.apache.org by xu...@apache.org on 2015/07/16 09:52:32 UTC
[14/50] [abbrv] hive git commit: HIVE-11170 : port parts of
HIVE-11015 to master for ease of future merging (Sergey Shelukhin,
reviewed by Vikram Dixit K)
HIVE-11170 : port parts of HIVE-11015 to master for ease of future merging (Sergey Shelukhin, reviewed by Vikram Dixit K)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/d89a7d1e
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/d89a7d1e
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/d89a7d1e
Branch: refs/heads/parquet
Commit: d89a7d1e7fe7fb51aeb514e4357ae149158b2a34
Parents: d314425
Author: Sergey Shelukhin <se...@apache.org>
Authored: Thu Jul 9 17:50:32 2015 -0700
Committer: Sergey Shelukhin <se...@apache.org>
Committed: Thu Jul 9 17:50:32 2015 -0700
----------------------------------------------------------------------
.../hadoop/hive/ql/exec/FilterOperator.java | 3 +-
.../hive/ql/exec/mr/ExecMapperContext.java | 10 +-
.../ql/io/HiveContextAwareRecordReader.java | 2 +-
.../org/apache/hadoop/hive/ql/io/IOContext.java | 43 ------
.../apache/hadoop/hive/ql/io/IOContextMap.java | 81 +++++++++++
.../hadoop/hive/ql/exec/TestOperators.java | 3 +-
.../ql/io/TestHiveBinarySearchRecordReader.java | 2 +-
.../hadoop/hive/ql/io/TestIOContextMap.java | 133 +++++++++++++++++++
8 files changed, 223 insertions(+), 54 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/d89a7d1e/ql/src/java/org/apache/hadoop/hive/ql/exec/FilterOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FilterOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FilterOperator.java
index 65301c0..ae35766 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FilterOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FilterOperator.java
@@ -25,6 +25,7 @@ import java.util.concurrent.Future;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.io.IOContext;
+import org.apache.hadoop.hive.ql.io.IOContextMap;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.plan.FilterDesc;
import org.apache.hadoop.hive.ql.plan.api.OperatorType;
@@ -61,7 +62,7 @@ public class FilterOperator extends Operator<FilterDesc> implements
}
conditionInspector = null;
- ioContext = IOContext.get(hconf);
+ ioContext = IOContextMap.get(hconf);
} catch (Throwable e) {
throw new HiveException(e);
}
http://git-wip-us.apache.org/repos/asf/hive/blob/d89a7d1e/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapperContext.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapperContext.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapperContext.java
index 13d0650..fc5abfe 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapperContext.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapperContext.java
@@ -22,8 +22,8 @@ import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.FetchOperator;
-import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.io.IOContext;
+import org.apache.hadoop.hive.ql.io.IOContextMap;
import org.apache.hadoop.hive.ql.plan.MapredLocalWork;
import org.apache.hadoop.mapred.JobConf;
@@ -63,11 +63,11 @@ public class ExecMapperContext {
public ExecMapperContext(JobConf jc) {
this.jc = jc;
- ioCxt = IOContext.get(jc);
+ ioCxt = IOContextMap.get(jc);
}
public void clear() {
- IOContext.clear();
+ IOContextMap.clear();
ioCxt = null;
}
@@ -151,8 +151,4 @@ public class ExecMapperContext {
public IOContext getIoCxt() {
return ioCxt;
}
-
- public void setIoCxt(IOContext ioCxt) {
- this.ioCxt = ioCxt;
- }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/d89a7d1e/ql/src/java/org/apache/hadoop/hive/ql/io/HiveContextAwareRecordReader.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveContextAwareRecordReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveContextAwareRecordReader.java
index 9b3f8ec..738ca9c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveContextAwareRecordReader.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveContextAwareRecordReader.java
@@ -162,7 +162,7 @@ public abstract class HiveContextAwareRecordReader<K, V> implements RecordReader
}
public IOContext getIOContext() {
- return IOContext.get(jobConf);
+ return IOContextMap.get(jobConf);
}
private void initIOContext(long startPos, boolean isBlockPointer,
http://git-wip-us.apache.org/repos/asf/hive/blob/d89a7d1e/ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java b/ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java
index ebad0a6..019db8d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java
@@ -18,13 +18,7 @@
package org.apache.hadoop.hive.ql.io;
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.exec.Utilities;
/**
* IOContext basically contains the position information of the current
@@ -35,43 +29,6 @@ import org.apache.hadoop.hive.ql.exec.Utilities;
* nextBlockStart refers the end of current row and beginning of next row.
*/
public class IOContext {
-
- /**
- * Spark uses this thread local
- */
- private static final ThreadLocal<IOContext> threadLocal = new ThreadLocal<IOContext>(){
- @Override
- protected IOContext initialValue() { return new IOContext(); }
- };
-
- private static IOContext get() {
- return IOContext.threadLocal.get();
- }
-
- /**
- * Tez and MR use this map but are single threaded per JVM thus no synchronization is required.
- */
- private static final Map<String, IOContext> inputNameIOContextMap = new HashMap<String, IOContext>();
-
-
- public static IOContext get(Configuration conf) {
- if (HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("spark")) {
- return get();
- }
- String inputName = conf.get(Utilities.INPUT_NAME);
- if (!inputNameIOContextMap.containsKey(inputName)) {
- IOContext ioContext = new IOContext();
- inputNameIOContextMap.put(inputName, ioContext);
- }
-
- return inputNameIOContextMap.get(inputName);
- }
-
- public static void clear() {
- IOContext.threadLocal.remove();
- inputNameIOContextMap.clear();
- }
-
private long currentBlockStart;
private long nextBlockStart;
private long currentRow;
http://git-wip-us.apache.org/repos/asf/hive/blob/d89a7d1e/ql/src/java/org/apache/hadoop/hive/ql/io/IOContextMap.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/IOContextMap.java b/ql/src/java/org/apache/hadoop/hive/ql/io/IOContextMap.java
new file mode 100644
index 0000000..342c526
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/IOContextMap.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io;
+
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+
+/**
+ * NOTE: before LLAP branch merge, there's no LLAP code here.
+ * There used to be a global static map of IOContext-s inside IOContext (Hive style!).
+ * Unfortunately, due to variety of factors, this is now a giant fustercluck.
+ * 1) Spark doesn't apparently care about multiple inputs, but has multiple threads, so one
+ * threadlocal IOContext was added for it.
+ * 2) LLAP has lots of tasks in the same process so globals no longer cut it either.
+ * 3) However, Tez runs 2+ threads for one task (e.g. TezTaskEventRouter and TezChild), and these
+ * surprisingly enough need the same context. Tez, in its infinite wisdom, doesn't allow them
+ * to communicate in any way nor provide any shared context.
+ * So we are going to...
+ * 1) Keep the good ol' global map for MR and Tez. Hive style!
+ * 2) Keep the threadlocal for Spark. Hive style!
+ * 3) Create inheritable (TADA!) threadlocal with attemptId, only set in LLAP; that will propagate
+ * to all the little Tez threads, and we will keep a map per attempt. Hive style squared!
+ */
+public class IOContextMap {
+  // Map key used when the job conf does not carry an input name (Utilities.INPUT_NAME unset).
+  public static final String DEFAULT_CONTEXT = "";
+  private static final Log LOG = LogFactory.getLog(IOContextMap.class);
+
+  /** Used for Tez and MR */
+  private static final ConcurrentHashMap<String, IOContext> globalMap =
+      new ConcurrentHashMap<String, IOContext>();
+
+  /** Used for Spark */
+  private static final ThreadLocal<IOContext> sparkThreadLocal = new ThreadLocal<IOContext>(){
+    @Override
+    protected IOContext initialValue() { return new IOContext(); }
+  };
+
+  /**
+   * Returns the IOContext for the current engine and input.
+   * Spark: one context per thread (input name is ignored).
+   * MR/Tez: one shared context per input name, taken from the global concurrent map;
+   * a missing input name falls back to {@link #DEFAULT_CONTEXT}.
+   */
+  public static IOContext get(Configuration conf) {
+    if (HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("spark")) {
+      return sparkThreadLocal.get();
+    }
+    String inputName = conf.get(Utilities.INPUT_NAME);
+    if (inputName == null) {
+      inputName = DEFAULT_CONTEXT;
+    }
+    ConcurrentHashMap<String, IOContext> map;
+    // Local alias; per the class comment above, an attempt-scoped map is expected to be
+    // selected here after the LLAP branch merge.
+    map = globalMap;
+
+    IOContext ioContext = map.get(inputName);
+    if (ioContext != null) return ioContext;
+    ioContext = new IOContext();
+    // putIfAbsent resolves concurrent insertion races: keep whichever instance won.
+    IOContext oldContext = map.putIfAbsent(inputName, ioContext);
+    return (oldContext == null) ? ioContext : oldContext;
+  }
+
+  /** Clears the Spark thread-local (current thread only) and the entire global map. */
+  public static void clear() {
+    sparkThreadLocal.remove();
+    globalMap.clear();
+  }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/d89a7d1e/ql/src/test/org/apache/hadoop/hive/ql/exec/TestOperators.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestOperators.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestOperators.java
index 62057d8..c3a36c0 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestOperators.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestOperators.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.Driver;
import org.apache.hadoop.hive.ql.io.IOContext;
+import org.apache.hadoop.hive.ql.io.IOContextMap;
import org.apache.hadoop.hive.ql.parse.TypeCheckProcFactory;
import org.apache.hadoop.hive.ql.plan.CollectDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
@@ -272,7 +273,7 @@ public class TestOperators extends TestCase {
JobConf hconf = new JobConf(TestOperators.class);
HiveConf.setVar(hconf, HiveConf.ConfVars.HADOOPMAPFILENAME,
"hdfs:///testDir/testFile");
- IOContext.get(hconf).setInputPath(
+ IOContextMap.get(hconf).setInputPath(
new Path("hdfs:///testDir/testFile"));
// initialize pathToAliases
http://git-wip-us.apache.org/repos/asf/hive/blob/d89a7d1e/ql/src/test/org/apache/hadoop/hive/ql/io/TestHiveBinarySearchRecordReader.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/TestHiveBinarySearchRecordReader.java b/ql/src/test/org/apache/hadoop/hive/ql/io/TestHiveBinarySearchRecordReader.java
index 7a1748c..9dc4f5b 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/TestHiveBinarySearchRecordReader.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/TestHiveBinarySearchRecordReader.java
@@ -116,7 +116,7 @@ public class TestHiveBinarySearchRecordReader extends TestCase {
private void resetIOContext() {
conf.set(Utilities.INPUT_NAME, "TestHiveBinarySearchRecordReader");
- ioContext = IOContext.get(conf);
+ ioContext = IOContextMap.get(conf);
ioContext.setUseSorted(false);
ioContext.setBinarySearching(false);
ioContext.setEndBinarySearch(false);
http://git-wip-us.apache.org/repos/asf/hive/blob/d89a7d1e/ql/src/test/org/apache/hadoop/hive/ql/io/TestIOContextMap.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/TestIOContextMap.java b/ql/src/test/org/apache/hadoop/hive/ql/io/TestIOContextMap.java
new file mode 100644
index 0000000..4469353
--- /dev/null
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/TestIOContextMap.java
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io;
+
+import static org.junit.Assert.*;
+
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.junit.Test;
+
+import com.google.common.collect.Sets;
+
+public class TestIOContextMap {
+
+  // Rendezvous helper: signal readiness on cdlIn, then block until the driver
+  // opens cdlOut, so all worker threads enter the racy section simultaneously.
+  private void syncThreadStart(final CountDownLatch cdlIn, final CountDownLatch cdlOut) {
+    cdlIn.countDown();
+    try {
+      cdlOut.await();
+    } catch (InterruptedException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Test
+  public void testMRTezGlobalMap() throws Exception {
+    // Tests concurrent modification, and that results are the same per input across threads
+    // but different between inputs.
+    final int THREAD_COUNT = 2, ITER_COUNT = 1000;
+    final AtomicInteger countdown = new AtomicInteger(ITER_COUNT);
+    final CountDownLatch phase1End = new CountDownLatch(THREAD_COUNT);
+    final IOContext[] results = new IOContext[ITER_COUNT];
+    ExecutorService executor = Executors.newFixedThreadPool(THREAD_COUNT);
+    final CountDownLatch cdlIn = new CountDownLatch(THREAD_COUNT), cdlOut = new CountDownLatch(1);
+
+    @SuppressWarnings("unchecked")
+    FutureTask<Void>[] tasks = new FutureTask[THREAD_COUNT];
+    for (int i = 0; i < tasks.length; ++i) {
+      tasks[i] = new FutureTask<Void>(new Callable<Void>() {
+        public Void call() throws Exception {
+          Configuration conf = new Configuration();
+          syncThreadStart(cdlIn, cdlOut);
+          // Phase 1 - create objects.
+          while (true) {
+            // Threads share the atomic countdown, so each input index is claimed exactly once.
+            int nextIx = countdown.decrementAndGet();
+            if (nextIx < 0) break;
+            conf.set(Utilities.INPUT_NAME, "Input " + nextIx);
+            results[nextIx] = IOContextMap.get(conf);
+            if (nextIx == 0) break;
+          }
+          // Wait for all threads to finish phase 1 before cross-verifying results.
+          phase1End.countDown();
+          phase1End.await();
+          // Phase 2 - verify we get the expected objects created by all threads.
+          for (int i = 0; i < ITER_COUNT; ++i) {
+            conf.set(Utilities.INPUT_NAME, "Input " + i);
+            IOContext ctx = IOContextMap.get(conf);
+            assertSame(results[i], ctx);
+          }
+          return null;
+        }
+      });
+      executor.execute(tasks[i]);
+    }
+
+    cdlIn.await(); // Wait for all threads to be ready.
+    cdlOut.countDown(); // Release them at the same time.
+    for (int i = 0; i < tasks.length; ++i) {
+      tasks[i].get();
+    }
+    // Identity set: distinct inputs must map to distinct IOContext instances.
+    Set<IOContext> resultSet = Sets.newIdentityHashSet();
+    for (int i = 0; i < results.length; ++i) {
+      assertTrue(resultSet.add(results[i])); // All the objects must be different.
+    }
+  }
+
+  @Test
+  public void testSparkThreadLocal() throws Exception {
+    // Test that input name does not change IOContext returned, and that each thread gets its own.
+    final Configuration conf1 = new Configuration();
+    conf1.set(HiveConf.ConfVars.HIVE_EXECUTION_ENGINE.varname, "spark");
+    final Configuration conf2 = new Configuration(conf1);
+    conf2.set(Utilities.INPUT_NAME, "Other input");
+    final int THREAD_COUNT = 2;
+    ExecutorService executor = Executors.newFixedThreadPool(THREAD_COUNT);
+    final CountDownLatch cdlIn = new CountDownLatch(THREAD_COUNT), cdlOut = new CountDownLatch(1);
+    @SuppressWarnings("unchecked")
+    FutureTask<IOContext>[] tasks = new FutureTask[THREAD_COUNT];
+    for (int i = 0; i < tasks.length; ++i) {
+      tasks[i] = new FutureTask<IOContext>(new Callable<IOContext>() {
+        public IOContext call() throws Exception {
+          syncThreadStart(cdlIn, cdlOut);
+          // In Spark mode the input name must be irrelevant: both confs yield this thread's context.
+          IOContext c1 = IOContextMap.get(conf1), c2 = IOContextMap.get(conf2);
+          assertSame(c1, c2);
+          return c1;
+        }
+      });
+      executor.execute(tasks[i]);
+    }
+
+    cdlIn.await(); // Wait for all threads to be ready.
+    cdlOut.countDown(); // Release them at the same time.
+    Set<IOContext> results = Sets.newIdentityHashSet();
+    for (int i = 0; i < tasks.length; ++i) {
+      assertTrue(results.add(tasks[i].get())); // All the objects must be different.
+    }
+  }
+
+}