You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by se...@apache.org on 2016/01/23 01:04:41 UTC

hive git commit: HIVE-12528 : don't start HS2 Tez sessions in a single thread (Sergey Shelukhin, reviewed by Siddharth Seth)

Repository: hive
Updated Branches:
  refs/heads/master 00aba1b1b -> bc0f7d9f0


HIVE-12528 : don't start HS2 Tez sessions in a single thread (Sergey Shelukhin, reviewed by Siddharth Seth)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/bc0f7d9f
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/bc0f7d9f
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/bc0f7d9f

Branch: refs/heads/master
Commit: bc0f7d9f013fa538282ef9f0de12e1e1eee1a78f
Parents: 00aba1b
Author: Sergey Shelukhin <se...@apache.org>
Authored: Fri Jan 22 16:04:00 2016 -0800
Committer: Sergey Shelukhin <se...@apache.org>
Committed: Fri Jan 22 16:04:28 2016 -0800

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/conf/HiveConf.java   |  4 +
 .../hadoop/hive/ql/exec/tez/DagUtils.java       | 99 +++++++++++++-------
 .../hive/ql/exec/tez/TezSessionPoolManager.java | 76 ++++++++++++---
 .../hive/ql/exec/tez/TestTezSessionPool.java    | 52 +++++++---
 4 files changed, 174 insertions(+), 57 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/bc0f7d9f/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 50a525c..fb62ae2 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -1921,6 +1921,10 @@ public class HiveConf extends Configuration {
     HIVE_SERVER2_TEZ_SESSION_LIFETIME_JITTER("hive.server2.tez.session.lifetime.jitter", "3h",
         new TimeValidator(TimeUnit.HOURS),
         "The jitter for Tez session lifetime; prevents all the sessions from restarting at once."),
+    HIVE_SERVER2_TEZ_SESSION_MAX_INIT_THREADS("hive.server2.tez.sessions.init.threads", 16,
+        "If hive.server2.tez.initialize.default.sessions is enabled, the maximum number of\n" +
+        "threads to use to initialize the default sessions."),
+
 
     // Operation log configuration
     HIVE_SERVER2_LOGGING_OPERATION_ENABLED("hive.server2.logging.operation.enabled", true,

http://git-wip-us.apache.org/repos/asf/hive/blob/bc0f7d9f/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
index e8864ae..aa44d5f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.hive.ql.exec.tez;
 
+import java.util.concurrent.ConcurrentHashMap;
+
 import com.google.common.base.Function;
 import com.google.common.collect.Iterators;
 import com.google.common.collect.Lists;
@@ -138,13 +140,21 @@ public class DagUtils {
   public static final String TEZ_TMP_DIR_KEY = "_hive_tez_tmp_dir";
   private static final Logger LOG = LoggerFactory.getLogger(DagUtils.class.getName());
   private static final String TEZ_DIR = "_tez_scratch_dir";
-  private static DagUtils instance;
+  private static final DagUtils instance = new DagUtils();
   // The merge file being currently processed.
   public static final String TEZ_MERGE_CURRENT_MERGE_FILE_PREFIX =
       "hive.tez.current.merge.file.prefix";
   // "A comma separated list of work names used as prefix.
   public static final String TEZ_MERGE_WORK_FILE_PREFIXES = "hive.tez.merge.file.prefixes";
 
+  /**
+   * Notifiers to synchronize resource localization across threads. If one thread is localizing
+   * a file, other threads can wait on the corresponding notifier object instead of just sleeping
+   * before re-checking HDFS. This is used just to avoid unnecessary waits; HDFS check still needs
+   * to be performed to make sure the resource is there and matches the expected file.
+   */
+  private final ConcurrentHashMap<String, Object> copyNotifiers = new ConcurrentHashMap<>();
+
   private void addCredentials(MapWork mapWork, DAG dag) {
     Set<String> paths = mapWork.getPathToAliases().keySet();
     if (!paths.isEmpty()) {
@@ -968,6 +978,7 @@ public class DagUtils {
   }
 
   /**
+   * Localizes a resource. Should be thread-safe.
    * @param src path to the source for the resource
    * @param dest path in hdfs for the resource
    * @param type local resource type (File/Archive)
@@ -975,51 +986,78 @@ public class DagUtils {
    * @return localresource from tez localization.
    * @throws IOException when any file system related calls fails.
    */
-  public LocalResource localizeResource(Path src, Path dest, LocalResourceType type, Configuration conf)
-    throws IOException {
+  public LocalResource localizeResource(
+      Path src, Path dest, LocalResourceType type, Configuration conf) throws IOException {
     FileSystem destFS = dest.getFileSystem(conf);
-
-    if (src != null && checkPreExisting(src, dest, conf) == false) {
+    if (src != null && !checkPreExisting(src, dest, conf)) {
       // copy the src to the destination and create local resource.
       // do not overwrite.
-      LOG.info("Localizing resource because it does not exist: " + src + " to dest: " + dest);
+      String srcStr = src.toString();
+      LOG.info("Localizing resource because it does not exist: " + srcStr + " to dest: " + dest);
+      Object notifierNew = new Object(),
+          notifierOld = copyNotifiers.putIfAbsent(srcStr, notifierNew),
+          notifier = (notifierOld == null) ? notifierNew : notifierOld;
+      // To avoid timing issues with notifications (and given that HDFS check is anyway the
+      // authoritative one), don't wait infinitely for the notifier, just wait a little bit
+      // and check HDFS before and after.
+      if (notifierOld != null
+          && checkOrWaitForTheFile(src, dest, conf, notifierOld, 1, 150, false)) {
+        return createLocalResource(destFS, dest, type, LocalResourceVisibility.PRIVATE);
+      }
       try {
         destFS.copyFromLocalFile(false, false, src, dest);
+        synchronized (notifier) {
+          notifier.notifyAll(); // Notify if we have successfully copied the file.
+        }
+        copyNotifiers.remove(srcStr, notifier);
       } catch (IOException e) {
-        LOG.info("Looks like another thread is writing the same file will wait.");
-        int waitAttempts =
-            conf.getInt(HiveConf.ConfVars.HIVE_LOCALIZE_RESOURCE_NUM_WAIT_ATTEMPTS.varname,
-                HiveConf.ConfVars.HIVE_LOCALIZE_RESOURCE_NUM_WAIT_ATTEMPTS.defaultIntVal);
+        LOG.info("Looks like another thread or process is writing the same file");
+        int waitAttempts = HiveConf.getIntVar(
+            conf, ConfVars.HIVE_LOCALIZE_RESOURCE_NUM_WAIT_ATTEMPTS);
         long sleepInterval = HiveConf.getTimeVar(
-            conf, HiveConf.ConfVars.HIVE_LOCALIZE_RESOURCE_WAIT_INTERVAL,
-            TimeUnit.MILLISECONDS);
-        LOG.info("Number of wait attempts: " + waitAttempts + ". Wait interval: "
-            + sleepInterval);
-        boolean found = false;
-        for (int i = 0; i < waitAttempts; i++) {
-          if (!checkPreExisting(src, dest, conf)) {
-            try {
-              Thread.sleep(sleepInterval);
-            } catch (InterruptedException interruptedException) {
-              throw new IOException(interruptedException);
-            }
-          } else {
-            found = true;
-            break;
-          }
-        }
-        if (!found) {
+            conf, HiveConf.ConfVars.HIVE_LOCALIZE_RESOURCE_WAIT_INTERVAL, TimeUnit.MILLISECONDS);
+        // Only log on the first wait, and check after wait on the last iteration.
+        if (!checkOrWaitForTheFile(
+            src, dest, conf, notifierOld, waitAttempts, sleepInterval, true)) {
           LOG.error("Could not find the jar that was being uploaded");
           throw new IOException("Previous writer likely failed to write " + dest +
               ". Failing because I am unlikely to write too.");
         }
+      } finally {
+        if (notifier == notifierNew) {
+          copyNotifiers.remove(srcStr, notifierNew);
+        }
       }
     }
-
     return createLocalResource(destFS, dest, type,
         LocalResourceVisibility.PRIVATE);
   }
 
+  public boolean checkOrWaitForTheFile(Path src, Path dest, Configuration conf, Object notifier,
+      int waitAttempts, long sleepInterval, boolean doLog) throws IOException {
+    for (int i = 0; i < waitAttempts; i++) {
+      if (checkPreExisting(src, dest, conf)) return true;
+      if (doLog && i == 0) {
+        LOG.info("Waiting for the file " + dest + " (" + waitAttempts + " attempts, with "
+            + sleepInterval + "ms interval)");
+      }
+      try {
+        if (notifier != null) {
+          // The writing thread has given us an object to wait on.
+          synchronized (notifier) {
+            notifier.wait(sleepInterval);
+          }
+        } else {
+          // Some other process is probably writing the file. Just sleep.
+          Thread.sleep(sleepInterval);
+        }
+      } catch (InterruptedException interruptedException) {
+        throw new IOException(interruptedException);
+      }
+    }
+    return checkPreExisting(src, dest, conf); // One last check.
+  }
+
   /**
    * Creates and initializes a JobConf object that can be used to execute
    * the DAG. The configuration object will contain configurations from mapred-site
@@ -1201,9 +1239,6 @@ public class DagUtils {
    * @return instance of this class
    */
   public static DagUtils getInstance() {
-    if (instance == null) {
-      instance = new DagUtils();
-    }
     return instance;
   }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/bc0f7d9f/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java
index 1321b5f..0d9fa6d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java
@@ -19,17 +19,20 @@
 package org.apache.hadoop.hive.ql.exec.tez;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
 
 import java.io.IOException;
 import java.net.URISyntaxException;
 
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.PriorityBlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
@@ -101,17 +104,63 @@ public class TezSessionPoolManager {
   protected TezSessionPoolManager() {
   }
 
+  private void startNextSessionFromQueue() throws Exception {
+    HiveConf newConf = new HiveConf(initConf);
+    TezSessionPoolSession sessionState = defaultQueuePool.take();
+    boolean isUsable = sessionState.tryUse();
+    if (!isUsable) throw new IOException(sessionState + " is not usable at pool startup");
+    newConf.set("tez.queue.name", sessionState.getQueueName());
+    sessionState.open(newConf);
+    if (sessionState.returnAfterUse()) {
+      defaultQueuePool.put(sessionState);
+    }
+  }
+
   public void startPool() throws Exception {
     this.inited = true;
-    for (int i = 0; i < blockingQueueLength; i++) {
-      HiveConf newConf = new HiveConf(initConf);
-      TezSessionPoolSession sessionState = defaultQueuePool.take();
-      boolean isUsable = sessionState.tryUse();
-      if (!isUsable) throw new IOException(sessionState + " is not usable at pool startup");
-      newConf.set("tez.queue.name", sessionState.getQueueName());
-      sessionState.open(newConf);
-      if (sessionState.returnAfterUse()) {
-        defaultQueuePool.put(sessionState);
+    if (blockingQueueLength == 0) return;
+    int threadCount = Math.min(blockingQueueLength,
+        HiveConf.getIntVar(initConf, ConfVars.HIVE_SERVER2_TEZ_SESSION_MAX_INIT_THREADS));
+    Preconditions.checkArgument(threadCount > 0);
+    if (threadCount == 1) {
+      for (int i = 0; i < blockingQueueLength; i++) {
+        // The queue is FIFO, so if we cycle thru length items, we'd start each session once.
+        startNextSessionFromQueue();
+      }
+    } else {
+      final SessionState parentSessionState = SessionState.get();
+      // The queue is FIFO, so if we cycle thru length items, we'd start each session once.
+      final AtomicInteger remainingToStart = new AtomicInteger(blockingQueueLength);
+      // The runnable has no mutable state, so each thread can run the same thing.
+      final AtomicReference<Exception> firstError = new AtomicReference<>(null);
+      Runnable runnable = new Runnable() {
+        public void run() {
+          if (parentSessionState != null) {
+            SessionState.setCurrentSessionState(parentSessionState);
+          }
+          while (remainingToStart.decrementAndGet() >= 0) {
+            try {
+              startNextSessionFromQueue();
+            } catch (Exception e) {
+              if (!firstError.compareAndSet(null, e)) {
+                LOG.error("Failed to start session; ignoring due to previous error", e);
+              }
+            }
+          }
+        }
+      };
+      Thread[] threads = new Thread[threadCount - 1];
+      for (int i = 0; i < threads.length; ++i) {
+        threads[i] = new Thread(runnable, "Tez session init " + i);
+        threads[i].start();
+      }
+      runnable.run();
+      for (int i = 0; i < threads.length; ++i) {
+        threads[i].join();
+      }
+      Exception ex = firstError.get();
+      if (ex != null) {
+        throw ex;
       }
     }
     if (expirationThread != null) {
@@ -163,10 +212,11 @@ public class TezSessionPoolManager {
 
     this.initConf = conf;
     /*
-     *  with this the ordering of sessions in the queue will be (with 2 sessions 3 queues)
-     *  s1q1, s1q2, s1q3, s2q1, s2q2, s2q3 there by ensuring uniform distribution of
-     *  the sessions across queues at least to begin with. Then as sessions get freed up, the list
-     *  may change this ordering.
+     * In a single-threaded init case, with this the ordering of sessions in the queue will be
+     * (with 2 sessions 3 queues) s1q1, s1q2, s1q3, s2q1, s2q2, s2q3, thereby ensuring uniform
+     * distribution of the sessions across queues at least to begin with. Then as sessions get
+     * freed up, the list may change this ordering.
+     * In a multi-threaded init case, it's a free for all.
      */
     blockingQueueLength = 0;
     for (int i = 0; i < numSessions; i++) {

http://git-wip-us.apache.org/repos/asf/hive/blob/bc0f7d9f/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java
index a2791a1..b03f063 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java
@@ -30,6 +30,7 @@ import org.mockito.Mockito;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 
 public class TestTezSessionPool {
 
@@ -77,29 +78,24 @@ public class TestTezSessionPool {
   @Test
   public void testSessionPoolGetInOrder() {
     try {
-      conf.setBoolVar(HiveConf.ConfVars.HIVE_SERVER2_ENABLE_DOAS, false);
-      conf.setVar(HiveConf.ConfVars.HIVE_SERVER2_TEZ_DEFAULT_QUEUES, "a,b,c");
-      conf.setIntVar(HiveConf.ConfVars.HIVE_SERVER2_TEZ_SESSIONS_PER_DEFAULT_QUEUE, 2);
+      conf.setBoolVar(ConfVars.HIVE_SERVER2_ENABLE_DOAS, false);
+      conf.setVar(ConfVars.HIVE_SERVER2_TEZ_DEFAULT_QUEUES, "a,b,c");
+      conf.setIntVar(ConfVars.HIVE_SERVER2_TEZ_SESSIONS_PER_DEFAULT_QUEUE, 2);
+      conf.setIntVar(ConfVars.HIVE_SERVER2_TEZ_SESSION_MAX_INIT_THREADS, 1);
 
       poolManager = new TestTezSessionPoolManager();
       poolManager.setupPool(conf);
       poolManager.startPool();
       TezSessionState sessionState = poolManager.getSession(null, conf, true, false);
-      if (sessionState.getQueueName().compareTo("a") != 0) {
-        fail();
-      }
+      assertEquals("a", sessionState.getQueueName());
       poolManager.returnSession(sessionState, false);
 
       sessionState = poolManager.getSession(null, conf, true, false);
-      if (sessionState.getQueueName().compareTo("b") != 0) {
-        fail();
-      }
+      assertEquals("b", sessionState.getQueueName());
       poolManager.returnSession(sessionState, false);
 
       sessionState = poolManager.getSession(null, conf, true, false);
-      if (sessionState.getQueueName().compareTo("c") != 0) {
-        fail();
-      }
+      assertEquals("c", sessionState.getQueueName());
       poolManager.returnSession(sessionState, false);
 
       sessionState = poolManager.getSession(null, conf, true, false);
@@ -115,6 +111,38 @@ public class TestTezSessionPool {
     }
   }
 
+
+  @Test
+  public void testSessionPoolThreads() {
+    // Make sure we get a correct number of sessions in each queue and that we don't crash.
+    try {
+      conf.setBoolVar(ConfVars.HIVE_SERVER2_ENABLE_DOAS, false);
+      conf.setVar(ConfVars.HIVE_SERVER2_TEZ_DEFAULT_QUEUES, "0,1,2");
+      conf.setIntVar(ConfVars.HIVE_SERVER2_TEZ_SESSIONS_PER_DEFAULT_QUEUE, 4);
+      conf.setIntVar(ConfVars.HIVE_SERVER2_TEZ_SESSION_MAX_INIT_THREADS, 16);
+
+      poolManager = new TestTezSessionPoolManager();
+      poolManager.setupPool(conf);
+      poolManager.startPool();
+      TezSessionState[] sessions = new TezSessionState[12];
+      int[] queueCounts = new int[3];
+      for (int i = 0; i < sessions.length; ++i) {
+        sessions[i] = poolManager.getSession(null, conf, true, false);
+        queueCounts[Integer.parseInt(sessions[i].getQueueName())] += 1;
+      }
+      for (int i = 0; i < queueCounts.length; ++i) {
+        assertEquals(4, queueCounts[i]);
+      }
+      for (int i = 0; i < sessions.length; ++i) {
+        poolManager.returnSession(sessions[i], false);
+      }
+
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail();
+    }
+  }
+
   @Test
   public void testLlapSessionQueuing() {
     try {