You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by xu...@apache.org on 2015/07/16 09:52:32 UTC

[14/50] [abbrv] hive git commit: HIVE-11170 : port parts of HIVE-11015 to master for ease of future merging (Sergey Shelukhin, reviewed by Vikram Dixit K)

HIVE-11170 : port parts of HIVE-11015 to master for ease of future merging (Sergey Shelukhin, reviewed by Vikram Dixit K)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/d89a7d1e
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/d89a7d1e
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/d89a7d1e

Branch: refs/heads/parquet
Commit: d89a7d1e7fe7fb51aeb514e4357ae149158b2a34
Parents: d314425
Author: Sergey Shelukhin <se...@apache.org>
Authored: Thu Jul 9 17:50:32 2015 -0700
Committer: Sergey Shelukhin <se...@apache.org>
Committed: Thu Jul 9 17:50:32 2015 -0700

----------------------------------------------------------------------
 .../hadoop/hive/ql/exec/FilterOperator.java     |   3 +-
 .../hive/ql/exec/mr/ExecMapperContext.java      |  10 +-
 .../ql/io/HiveContextAwareRecordReader.java     |   2 +-
 .../org/apache/hadoop/hive/ql/io/IOContext.java |  43 ------
 .../apache/hadoop/hive/ql/io/IOContextMap.java  |  81 +++++++++++
 .../hadoop/hive/ql/exec/TestOperators.java      |   3 +-
 .../ql/io/TestHiveBinarySearchRecordReader.java |   2 +-
 .../hadoop/hive/ql/io/TestIOContextMap.java     | 133 +++++++++++++++++++
 8 files changed, 223 insertions(+), 54 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/d89a7d1e/ql/src/java/org/apache/hadoop/hive/ql/exec/FilterOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FilterOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FilterOperator.java
index 65301c0..ae35766 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FilterOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FilterOperator.java
@@ -25,6 +25,7 @@ import java.util.concurrent.Future;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.io.IOContext;
+import org.apache.hadoop.hive.ql.io.IOContextMap;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.FilterDesc;
 import org.apache.hadoop.hive.ql.plan.api.OperatorType;
@@ -61,7 +62,7 @@ public class FilterOperator extends Operator<FilterDesc> implements
       }
 
       conditionInspector = null;
-      ioContext = IOContext.get(hconf);
+      ioContext = IOContextMap.get(hconf);
     } catch (Throwable e) {
       throw new HiveException(e);
     }

http://git-wip-us.apache.org/repos/asf/hive/blob/d89a7d1e/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapperContext.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapperContext.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapperContext.java
index 13d0650..fc5abfe 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapperContext.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapperContext.java
@@ -22,8 +22,8 @@ import java.util.Map;
 import org.apache.commons.logging.Log;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.ql.exec.FetchOperator;
-import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.io.IOContext;
+import org.apache.hadoop.hive.ql.io.IOContextMap;
 import org.apache.hadoop.hive.ql.plan.MapredLocalWork;
 import org.apache.hadoop.mapred.JobConf;
 
@@ -63,11 +63,11 @@ public class ExecMapperContext {
 
   public ExecMapperContext(JobConf jc) {
     this.jc = jc;
-    ioCxt = IOContext.get(jc);
+    ioCxt = IOContextMap.get(jc);
   }
 
   public void clear() {
-    IOContext.clear();
+    IOContextMap.clear();
     ioCxt = null;
   }
 
@@ -151,8 +151,4 @@ public class ExecMapperContext {
   public IOContext getIoCxt() {
     return ioCxt;
   }
-
-  public void setIoCxt(IOContext ioCxt) {
-    this.ioCxt = ioCxt;
-  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/d89a7d1e/ql/src/java/org/apache/hadoop/hive/ql/io/HiveContextAwareRecordReader.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveContextAwareRecordReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveContextAwareRecordReader.java
index 9b3f8ec..738ca9c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveContextAwareRecordReader.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveContextAwareRecordReader.java
@@ -162,7 +162,7 @@ public abstract class HiveContextAwareRecordReader<K, V> implements RecordReader
   }
 
   public IOContext getIOContext() {
-    return IOContext.get(jobConf);
+    return IOContextMap.get(jobConf);
   }
 
   private void initIOContext(long startPos, boolean isBlockPointer,

http://git-wip-us.apache.org/repos/asf/hive/blob/d89a7d1e/ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java b/ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java
index ebad0a6..019db8d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java
@@ -18,13 +18,7 @@
 
 package org.apache.hadoop.hive.ql.io;
 
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.exec.Utilities;
 
 /**
  * IOContext basically contains the position information of the current
@@ -35,43 +29,6 @@ import org.apache.hadoop.hive.ql.exec.Utilities;
  * nextBlockStart refers the end of current row and beginning of next row.
  */
 public class IOContext {
-
-  /**
-   * Spark uses this thread local
-   */
-  private static final ThreadLocal<IOContext> threadLocal = new ThreadLocal<IOContext>(){
-    @Override
-    protected IOContext initialValue() { return new IOContext(); }
- };
-
-  private static IOContext get() {
-    return IOContext.threadLocal.get();
-  }
-
-  /**
-   * Tez and MR use this map but are single threaded per JVM thus no synchronization is required.
-   */
-  private static final Map<String, IOContext> inputNameIOContextMap = new HashMap<String, IOContext>();
-
-
-  public static IOContext get(Configuration conf) {
-    if (HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("spark")) {
-      return get();
-    }
-    String inputName = conf.get(Utilities.INPUT_NAME);
-    if (!inputNameIOContextMap.containsKey(inputName)) {
-      IOContext ioContext = new IOContext();
-      inputNameIOContextMap.put(inputName, ioContext);
-    }
-
-    return inputNameIOContextMap.get(inputName);
-  }
-
-  public static void clear() {
-    IOContext.threadLocal.remove();
-    inputNameIOContextMap.clear();
-  }
-
   private long currentBlockStart;
   private long nextBlockStart;
   private long currentRow;

http://git-wip-us.apache.org/repos/asf/hive/blob/d89a7d1e/ql/src/java/org/apache/hadoop/hive/ql/io/IOContextMap.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/IOContextMap.java b/ql/src/java/org/apache/hadoop/hive/ql/io/IOContextMap.java
new file mode 100644
index 0000000..342c526
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/IOContextMap.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io;
+
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+
+/**
+ * NOTE: before the LLAP branch merge, there is no LLAP code here.
+ * IOContext instances used to live in a global static map inside IOContext itself.
+ * Different execution engines now need different lookup semantics:
+ * 1) Spark does not appear to care about multiple inputs, but runs multiple threads, so one
+ *    thread-local IOContext was added for it.
+ * 2) LLAP runs many tasks in the same process, so globals alone are no longer sufficient.
+ * 3) However, Tez runs 2+ threads for one task (e.g. TezTaskEventRouter and TezChild), and these
+ *    need the same context. Tez does not let them communicate directly, nor does it provide
+ *    any shared context between them.
+ * The resulting design:
+ * 1) MR and Tez: a single static map from input name (Utilities.INPUT_NAME) to IOContext.
+ * 2) Spark: one thread-local IOContext per thread; the input name is ignored.
+ * 3) LLAP (not present before the LLAP merge): an inheritable thread-local holding the attempt
+ *    id, set only in LLAP, propagates to all of a task's Tez threads; one map is kept per attempt.
+ */
+public class IOContextMap {
+  public static final String DEFAULT_CONTEXT = ""; // key used when Utilities.INPUT_NAME is unset
+  private static final Log LOG = LogFactory.getLog(IOContextMap.class); // NOTE(review): unused here
+
+  /** Static map from input name to IOContext, shared by all threads; used for Tez and MR. */
+  private static final ConcurrentHashMap<String, IOContext> globalMap =
+      new ConcurrentHashMap<String, IOContext>();
+
+  /** One IOContext per thread, independent of input name; used for Spark. */
+  private static final ThreadLocal<IOContext> sparkThreadLocal = new ThreadLocal<IOContext>(){
+    @Override
+    protected IOContext initialValue() { return new IOContext(); }
+  };
+
+  public static IOContext get(Configuration conf) { // Returns the IOContext for conf's engine/input.
+    if (HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("spark")) {
+      return sparkThreadLocal.get(); // Spark: per-thread context; INPUT_NAME is not consulted
+    }
+    String inputName = conf.get(Utilities.INPUT_NAME);
+    if (inputName == null) {
+      inputName = DEFAULT_CONTEXT; // no input name configured: all such callers share one context
+    }
+    ConcurrentHashMap<String, IOContext> map;
+    map = globalMap; // NOTE(review): indirection presumably kept so per-attempt (LLAP) maps can slot in
+
+    IOContext ioContext = map.get(inputName); // fast path: reuse an existing context, no allocation
+    if (ioContext != null) return ioContext;
+    ioContext = new IOContext();
+    IOContext oldContext = map.putIfAbsent(inputName, ioContext); // first writer wins on a race
+    return (oldContext == null) ? ioContext : oldContext; // losers adopt the winner's context
+  }
+
+  public static void clear() { // Drops the caller's Spark context and every MR/Tez context.
+    sparkThreadLocal.remove(); // affects only the calling thread's thread-local value
+    globalMap.clear();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/d89a7d1e/ql/src/test/org/apache/hadoop/hive/ql/exec/TestOperators.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestOperators.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestOperators.java
index 62057d8..c3a36c0 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestOperators.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestOperators.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.Driver;
 import org.apache.hadoop.hive.ql.io.IOContext;
+import org.apache.hadoop.hive.ql.io.IOContextMap;
 import org.apache.hadoop.hive.ql.parse.TypeCheckProcFactory;
 import org.apache.hadoop.hive.ql.plan.CollectDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
@@ -272,7 +273,7 @@ public class TestOperators extends TestCase {
       JobConf hconf = new JobConf(TestOperators.class);
       HiveConf.setVar(hconf, HiveConf.ConfVars.HADOOPMAPFILENAME,
           "hdfs:///testDir/testFile");
-      IOContext.get(hconf).setInputPath(
+      IOContextMap.get(hconf).setInputPath(
           new Path("hdfs:///testDir/testFile"));
 
       // initialize pathToAliases

http://git-wip-us.apache.org/repos/asf/hive/blob/d89a7d1e/ql/src/test/org/apache/hadoop/hive/ql/io/TestHiveBinarySearchRecordReader.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/TestHiveBinarySearchRecordReader.java b/ql/src/test/org/apache/hadoop/hive/ql/io/TestHiveBinarySearchRecordReader.java
index 7a1748c..9dc4f5b 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/TestHiveBinarySearchRecordReader.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/TestHiveBinarySearchRecordReader.java
@@ -116,7 +116,7 @@ public class TestHiveBinarySearchRecordReader extends TestCase {
 
   private void resetIOContext() {
     conf.set(Utilities.INPUT_NAME, "TestHiveBinarySearchRecordReader");
-    ioContext = IOContext.get(conf);
+    ioContext = IOContextMap.get(conf);
     ioContext.setUseSorted(false);
     ioContext.setBinarySearching(false);
     ioContext.setEndBinarySearch(false);

http://git-wip-us.apache.org/repos/asf/hive/blob/d89a7d1e/ql/src/test/org/apache/hadoop/hive/ql/io/TestIOContextMap.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/TestIOContextMap.java b/ql/src/test/org/apache/hadoop/hive/ql/io/TestIOContextMap.java
new file mode 100644
index 0000000..4469353
--- /dev/null
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/TestIOContextMap.java
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io;
+
+import static org.junit.Assert.*;
+
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.junit.Test;
+
+import com.google.common.collect.Sets;
+
+public class TestIOContextMap { // Concurrency tests for IOContextMap's MR/Tez map and Spark thread-local.
+
+  private void syncThreadStart(final CountDownLatch cdlIn, final CountDownLatch cdlOut) { // start barrier
+    cdlIn.countDown(); // signal that this worker is ready
+    try {
+      cdlOut.await(); // block until the test thread releases all workers at once
+    } catch (InterruptedException e) { // NOTE(review): interrupt status is not restored before rethrow
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Test
+  public void testMRTezGlobalMap() throws Exception {
+    // Tests concurrent modification, and that results are the same per input across threads
+    // but different between inputs.
+    final int THREAD_COUNT = 2, ITER_COUNT = 1000;
+    final AtomicInteger countdown = new AtomicInteger(ITER_COUNT); // (next index to claim) + 1
+    final CountDownLatch phase1End = new CountDownLatch(THREAD_COUNT);
+    final IOContext[] results = new IOContext[ITER_COUNT]; // slot i written once, by the claimer of i
+    ExecutorService executor = Executors.newFixedThreadPool(THREAD_COUNT); // NOTE(review): never shut down
+    final CountDownLatch cdlIn = new CountDownLatch(THREAD_COUNT), cdlOut = new CountDownLatch(1);
+
+    @SuppressWarnings("unchecked")
+    FutureTask<Void>[] tasks = new FutureTask[THREAD_COUNT];
+    for (int i = 0; i < tasks.length; ++i) {
+      tasks[i] = new FutureTask<Void>(new Callable<Void>() {
+        public Void call() throws Exception {
+          Configuration conf = new Configuration(); // engine left unset: presumably the non-Spark path
+          syncThreadStart(cdlIn, cdlOut);
+          // Phase 1 - create objects.
+          while (true) {
+            int nextIx = countdown.decrementAndGet(); // atomically claim a unique index
+            if (nextIx < 0) break;
+            conf.set(Utilities.INPUT_NAME, "Input " + nextIx);
+            results[nextIx] = IOContextMap.get(conf);
+            if (nextIx == 0) break;
+          }
+          phase1End.countDown();
+          phase1End.await(); // latch also makes the other thread's results[] writes visible here
+          // Phase 2 - verify we get the expected objects created by all threads.
+          for (int i = 0; i < ITER_COUNT; ++i) {
+            conf.set(Utilities.INPUT_NAME, "Input " + i);
+            IOContext ctx = IOContextMap.get(conf);
+            assertSame(results[i], ctx);
+          }
+          return null;
+        }
+      });
+      executor.execute(tasks[i]);
+    }
+
+    cdlIn.await(); // Wait for all threads to be ready.
+    cdlOut.countDown(); // Release them at the same time.
+    for (int i = 0; i < tasks.length; ++i) {
+      tasks[i].get(); // surfaces worker assertion failures (wrapped in ExecutionException)
+    }
+    Set<IOContext> resultSet = Sets.newIdentityHashSet();
+    for (int i = 0; i < results.length; ++i) {
+      assertTrue(resultSet.add(results[i])); // All the objects must be different.
+    }
+  }
+
+  @Test
+  public void testSparkThreadLocal() throws Exception {
+    // Tests that the input name does not change the IOContext returned, and that each thread gets its own.
+    final Configuration conf1 = new Configuration();
+    conf1.set(HiveConf.ConfVars.HIVE_EXECUTION_ENGINE.varname, "spark");
+    final Configuration conf2 = new Configuration(conf1); // copy of conf1 (engine=spark), other input name
+    conf2.set(Utilities.INPUT_NAME, "Other input");
+    final int THREAD_COUNT = 2;
+    ExecutorService executor = Executors.newFixedThreadPool(THREAD_COUNT); // NOTE(review): never shut down
+    final CountDownLatch cdlIn = new CountDownLatch(THREAD_COUNT), cdlOut = new CountDownLatch(1);
+    @SuppressWarnings("unchecked")
+    FutureTask<IOContext>[] tasks = new FutureTask[THREAD_COUNT];
+    for (int i = 0; i < tasks.length; ++i) {
+      tasks[i] = new FutureTask<IOContext>(new Callable<IOContext>() {
+        public IOContext call() throws Exception {
+          syncThreadStart(cdlIn, cdlOut); // both tasks block here together, so they run on distinct threads
+          IOContext c1 = IOContextMap.get(conf1), c2 = IOContextMap.get(conf2);
+          assertSame(c1, c2); // under Spark the input name must not matter within one thread
+          return c1;
+        }
+      });
+      executor.execute(tasks[i]);
+    }
+
+    cdlIn.await(); // Wait for all threads to be ready.
+    cdlOut.countDown(); // Release them at the same time.
+    Set<IOContext> results = Sets.newIdentityHashSet();
+    for (int i = 0; i < tasks.length; ++i) {
+      assertTrue(results.add(tasks[i].get())); // All the objects must be different.
+    }
+  }
+
+}