You are viewing a plain-text version of this content. The canonical version was linked from the original archive page; that hyperlink is not preserved in this copy.
Posted to commits@hive.apache.org by se...@apache.org on 2016/01/27 23:47:15 UTC

[02/14] hive git commit: HIVE-12915 : Tez session pool has concurrency issues during init (Sergey Shelukhin, reviewed by Siddharth Seth)

HIVE-12915 : Tez session pool has concurrency issues during init (Sergey Shelukhin, reviewed by Siddharth Seth)

Conflicts:
	ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/c09cc1d2
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/c09cc1d2
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/c09cc1d2

Branch: refs/heads/branch-2.0
Commit: c09cc1d2d944d9b79a803cccd3f1f4570187e276
Parents: 36f8b75
Author: Sergey Shelukhin <se...@apache.org>
Authored: Wed Jan 27 14:13:40 2016 -0800
Committer: Sergey Shelukhin <se...@apache.org>
Committed: Wed Jan 27 14:17:21 2016 -0800

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/conf/HiveConf.java   | 11 +++
 .../hive/ql/exec/spark/HashTableLoader.java     |  1 -
 .../hive/ql/exec/tez/TezSessionPoolManager.java | 96 ++++++++++++--------
 .../hive/ql/exec/tez/TestTezSessionPool.java    |  2 +-
 .../apache/hive/service/server/HiveServer2.java | 12 ++-
 5 files changed, 79 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/c09cc1d2/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 7a25ece..a11d1bc 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -3140,6 +3140,17 @@ public class HiveConf extends Configuration {
     return conf.getTrimmed(var.varname, var.defaultStrVal);
   }
 
+  public static String[] getTrimmedStringsVar(Configuration conf, ConfVars var) { // trimmed values of var, else var.altName, else default
+    assert (var.valClass == String.class) : var.varname;
+    String[] result = conf.getTrimmedStrings(var.varname, (String[])null); // null default => null result means "not set"
+    if (result != null) return result;
+    if (var.altName != null) { // fall back to the alternate property name before using the default
+      result = conf.getTrimmedStrings(var.altName, (String[])null);
+      if (result != null) return result;
+    }
+    return org.apache.hadoop.util.StringUtils.getTrimmedStrings(var.defaultStrVal);
+  }
+
   public static String getVar(Configuration conf, ConfVars var, String defaultVal) {
     if (var.altName != null) {
       return conf.get(var.varname, conf.get(var.altName, defaultVal));

http://git-wip-us.apache.org/repos/asf/hive/blob/c09cc1d2/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HashTableLoader.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HashTableLoader.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HashTableLoader.java
index 64474e6..1634f42 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HashTableLoader.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HashTableLoader.java
@@ -161,7 +161,6 @@ public class HashTableLoader implements org.apache.hadoop.hive.ql.exec.HashTable
     }
     MapJoinTableContainer mapJoinTable = SmallTableCache.get(path);
     if (mapJoinTable == null) {
-      // TODO#: HERE?
       synchronized (path.toString().intern()) {
         mapJoinTable = SmallTableCache.get(path);
         if (mapJoinTable == null) {

http://git-wip-us.apache.org/repos/asf/hive/blob/c09cc1d2/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java
index 1321b5f..891d2a8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java
@@ -26,16 +26,17 @@ import java.net.URISyntaxException;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.PriorityBlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.Comparator;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Queue;
 import java.util.Random;
 import java.util.Set;
 
@@ -77,13 +78,18 @@ public class TezSessionPoolManager {
 
 
   private Semaphore llapQueue;
-  private int blockingQueueLength = -1;
   private HiveConf initConf = null;
   int numConcurrentLlapQueries = -1;
   private long sessionLifetimeMs = 0;
   private long sessionLifetimeJitterMs = 0;
-
-  private boolean inited = false;
+  /** Sessions created by setupPool that startPool has not opened yet (drained via poll in startPool). */
+  private Queue<TezSessionPoolSession> initialSessions =
+      new ConcurrentLinkedQueue<TezSessionPoolSession>();
+  /**
+   * Indicates whether we should try to use defaultQueuePool; setupPool sets it when it plans at least one session.
+   * We assume that setupPool is either called before any activity, or not called at all.
+   */
+  private volatile boolean hasInitialSessions = false;
 
   private static TezSessionPoolManager sessionPool = null;
 
@@ -101,18 +107,23 @@ public class TezSessionPoolManager {
   protected TezSessionPoolManager() {
   }
 
+  private void startInitialSession(TezSessionPoolSession sessionState) throws Exception { // opens one session queued by setupPool
+    HiveConf newConf = new HiveConf(initConf); // per-session conf: the queue name below is set here, not on initConf
+    boolean isUsable = sessionState.tryUse();
+    if (!isUsable) throw new IOException(sessionState + " is not usable at pool startup");
+    newConf.set("tez.queue.name", sessionState.getQueueName());
+    sessionState.open(newConf);
+    if (sessionState.returnAfterUse()) { // only then is the opened session offered via defaultQueuePool (put may block if full)
+      defaultQueuePool.put(sessionState);
+    }
+  }
+
   public void startPool() throws Exception {
-    this.inited = true;
-    for (int i = 0; i < blockingQueueLength; i++) {
-      HiveConf newConf = new HiveConf(initConf);
-      TezSessionPoolSession sessionState = defaultQueuePool.take();
-      boolean isUsable = sessionState.tryUse();
-      if (!isUsable) throw new IOException(sessionState + " is not usable at pool startup");
-      newConf.set("tez.queue.name", sessionState.getQueueName());
-      sessionState.open(newConf);
-      if (sessionState.returnAfterUse()) {
-        defaultQueuePool.put(sessionState);
-      }
+    if (initialSessions.isEmpty()) return;
+    while (true) {
+      TezSessionPoolSession session = initialSessions.poll();
+      if (session == null) break;
+      startInitialSession(session);
     }
     if (expirationThread != null) {
       expirationThread.start();
@@ -121,16 +132,32 @@ public class TezSessionPoolManager {
   }
 
   public void setupPool(HiveConf conf) throws InterruptedException {
-    String defaultQueues = HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_SERVER2_TEZ_DEFAULT_QUEUES);
+    String[] defaultQueueList = HiveConf.getTrimmedStringsVar(
+        conf, HiveConf.ConfVars.HIVE_SERVER2_TEZ_DEFAULT_QUEUES);
+    int emptyNames = 0; // We don't create sessions for empty entries.
+    for (String queueName : defaultQueueList) {
+      if (queueName.isEmpty()) {
+        ++emptyNames;
+      }
+    }
     int numSessions = conf.getIntVar(ConfVars.HIVE_SERVER2_TEZ_SESSIONS_PER_DEFAULT_QUEUE);
+    int numSessionsTotal = numSessions * (defaultQueueList.length - emptyNames);
+    if (numSessionsTotal > 0) {
+      defaultQueuePool = new ArrayBlockingQueue<TezSessionPoolSession>(numSessionsTotal);
+    }
+
     numConcurrentLlapQueries = conf.getIntVar(ConfVars.HIVE_SERVER2_LLAP_CONCURRENT_QUERIES);
+    llapQueue = new Semaphore(numConcurrentLlapQueries, true);
+
+    this.initConf = conf;
+
     sessionLifetimeMs = conf.getTimeVar(
         ConfVars.HIVE_SERVER2_TEZ_SESSION_LIFETIME, TimeUnit.MILLISECONDS);
     if (sessionLifetimeMs != 0) {
       sessionLifetimeJitterMs = conf.getTimeVar(
           ConfVars.HIVE_SERVER2_TEZ_SESSION_LIFETIME_JITTER, TimeUnit.MILLISECONDS);
       if (LOG.isDebugEnabled()) {
-        LOG.debug("Starting session expiration threads; session lifetime is "
+        LOG.debug("Session expiration is enabled; session lifetime is "
             + sessionLifetimeMs + " + [0, " + sessionLifetimeJitterMs + ") ms");
       }
       expirationQueue = new PriorityBlockingQueue<>(11, new Comparator<TezSessionPoolSession>() {
@@ -141,6 +168,11 @@ public class TezSessionPoolManager {
         }
       });
       restartQueue = new LinkedBlockingQueue<>();
+    }
+    this.hasInitialSessions = numSessionsTotal > 0;
+    // From this point on, session creation will wait for the default pool (if # of sessions > 0).
+
+    if (sessionLifetimeMs != 0) {
       expirationThread = new Thread(new Runnable() {
         @Override
         public void run() {
@@ -155,27 +187,18 @@ public class TezSessionPoolManager {
       }, "TezSessionPool-cleanup");
     }
 
-    // the list of queues is a comma separated list.
-    String defaultQueueList[] = defaultQueues.split(",");
-    defaultQueuePool = new ArrayBlockingQueue<TezSessionPoolSession>(
-        numSessions * defaultQueueList.length);
-    llapQueue = new Semaphore(numConcurrentLlapQueries, true);
-
-    this.initConf = conf;
     /*
      *  with this the ordering of sessions in the queue will be (with 2 sessions 3 queues)
      *  s1q1, s1q2, s1q3, s2q1, s2q2, s2q3 there by ensuring uniform distribution of
      *  the sessions across queues at least to begin with. Then as sessions get freed up, the list
      *  may change this ordering.
      */
-    blockingQueueLength = 0;
     for (int i = 0; i < numSessions; i++) {
-      for (String queue : defaultQueueList) {
-        if (queue.length() == 0) {
+      for (String queueName : defaultQueueList) {
+        if (queueName.isEmpty()) {
           continue;
         }
-        defaultQueuePool.put(createAndInitSession(queue, true));
-        blockingQueueLength++;
+        initialSessions.add(createAndInitSession(queueName, true));
       }
     }
   }
@@ -196,7 +219,6 @@ public class TezSessionPoolManager {
   private TezSessionState getSession(HiveConf conf, boolean doOpen,
       boolean forceCreate)
       throws Exception {
-
     String queueName = conf.get("tez.queue.name");
 
     boolean nonDefaultUser = conf.getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_ENABLE_DOAS);
@@ -207,12 +229,10 @@ public class TezSessionPoolManager {
      * their own credentials. We expect that with the new security model, things will
      * run as user hive in most cases.
      */
-    if (forceCreate || !(this.inited)
-        || ((queueName != null) && (!queueName.isEmpty()))
-        || (nonDefaultUser) || (defaultQueuePool == null) || (blockingQueueLength <= 0)) {
+    if (forceCreate || nonDefaultUser || !hasInitialSessions
+        || ((queueName != null) && !queueName.isEmpty())) {
       LOG.info("QueueName: " + queueName + " nonDefaultUser: " + nonDefaultUser +
-          " defaultQueuePool: " + defaultQueuePool +
-          " blockingQueueLength: " + blockingQueueLength);
+          " defaultQueuePool: " + defaultQueuePool + " hasInitialSessions: " + hasInitialSessions);
       return getNewSessionState(conf, queueName, doOpen);
     }
 
@@ -266,7 +286,7 @@ public class TezSessionPoolManager {
     // session in the SessionState
   }
 
-  public void closeIfNotDefault(
+  public static void closeIfNotDefault(
       TezSessionState tezSessionState, boolean keepTmpDir) throws Exception {
     LOG.info("Closing tez session default? " + tezSessionState.isDefault());
     if (!tezSessionState.isDefault()) {
@@ -275,7 +295,7 @@ public class TezSessionPoolManager {
   }
 
   public void stop() throws Exception {
-    if ((sessionPool == null) || (this.inited == false)) {
+    if ((sessionPool == null) || !this.hasInitialSessions) {
       return;
     }
 
@@ -328,7 +348,7 @@ public class TezSessionPoolManager {
    * sessions for e.g. when a CLI session is started. The CLI session could re-use the
    * same tez session eliminating the latencies of new AM and containers.
    */
-  private boolean canWorkWithSameSession(TezSessionState session, HiveConf conf)
+  private static boolean canWorkWithSameSession(TezSessionState session, HiveConf conf)
        throws HiveException {
     if (session == null || conf == null) {
       return false;

http://git-wip-us.apache.org/repos/asf/hive/blob/c09cc1d2/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java
index a2791a1..a38f4ba 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java
@@ -124,7 +124,7 @@ public class TestTezSessionPool {
       poolManager.setupPool(conf);
       poolManager.startPool();
     } catch (Exception e) {
-      e.printStackTrace();
+      LOG.error("Initialization error", e);
       fail();
     }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/c09cc1d2/service/src/java/org/apache/hive/service/server/HiveServer2.java
----------------------------------------------------------------------
diff --git a/service/src/java/org/apache/hive/service/server/HiveServer2.java b/service/src/java/org/apache/hive/service/server/HiveServer2.java
index 2f55591..512a810 100644
--- a/service/src/java/org/apache/hive/service/server/HiveServer2.java
+++ b/service/src/java/org/apache/hive/service/server/HiveServer2.java
@@ -500,6 +500,13 @@ public class HiveServer2 extends CompositeService {
       maxAttempts = hiveConf.getLongVar(HiveConf.ConfVars.HIVE_SERVER2_MAX_START_ATTEMPTS);
       HiveServer2 server = null;
       try {
+        // Initialize the pool before we start the server; don't start yet.
+        TezSessionPoolManager sessionPool = null;
+        if (hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_TEZ_INITIALIZE_DEFAULT_SESSIONS)) {
+          sessionPool = TezSessionPoolManager.getInstance();
+          sessionPool.setupPool(hiveConf);
+        }
+
         // Cleanup the scratch dir before starting
         ServerUtils.cleanUpScratchDir(hiveConf);
         server = new HiveServer2();
@@ -522,9 +529,8 @@ public class HiveServer2 extends CompositeService {
         if (hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_SUPPORT_DYNAMIC_SERVICE_DISCOVERY)) {
           server.addServerInstanceToZooKeeper(hiveConf);
         }
-        if (hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_TEZ_INITIALIZE_DEFAULT_SESSIONS)) {
-          TezSessionPoolManager sessionPool = TezSessionPoolManager.getInstance();
-          sessionPool.setupPool(hiveConf);
+
+        if (sessionPool != null) {
           sessionPool.startPool();
         }