You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by se...@apache.org on 2016/01/23 01:04:41 UTC
hive git commit: HIVE-12528 : don't start HS2 Tez sessions in a single thread (Sergey Shelukhin, reviewed by Siddharth Seth)
Repository: hive
Updated Branches:
refs/heads/master 00aba1b1b -> bc0f7d9f0
HIVE-12528 : don't start HS2 Tez sessions in a single thread (Sergey Shelukhin, reviewed by Siddharth Seth)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/bc0f7d9f
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/bc0f7d9f
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/bc0f7d9f
Branch: refs/heads/master
Commit: bc0f7d9f013fa538282ef9f0de12e1e1eee1a78f
Parents: 00aba1b
Author: Sergey Shelukhin <se...@apache.org>
Authored: Fri Jan 22 16:04:00 2016 -0800
Committer: Sergey Shelukhin <se...@apache.org>
Committed: Fri Jan 22 16:04:28 2016 -0800
----------------------------------------------------------------------
.../org/apache/hadoop/hive/conf/HiveConf.java | 4 +
.../hadoop/hive/ql/exec/tez/DagUtils.java | 99 +++++++++++++-------
.../hive/ql/exec/tez/TezSessionPoolManager.java | 76 ++++++++++++---
.../hive/ql/exec/tez/TestTezSessionPool.java | 52 +++++++---
4 files changed, 174 insertions(+), 57 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/bc0f7d9f/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 50a525c..fb62ae2 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -1921,6 +1921,10 @@ public class HiveConf extends Configuration {
HIVE_SERVER2_TEZ_SESSION_LIFETIME_JITTER("hive.server2.tez.session.lifetime.jitter", "3h",
new TimeValidator(TimeUnit.HOURS),
"The jitter for Tez session lifetime; prevents all the sessions from restarting at once."),
+ // Read by TezSessionPoolManager.startPool(), which caps it at the number of default sessions
+ // and rejects non-positive values when there are default sessions to start.
+ HIVE_SERVER2_TEZ_SESSION_MAX_INIT_THREADS("hive.server2.tez.sessions.init.threads", 16,
+ "If hive.server2.tez.initialize.default.sessions is enabled, the maximum number of\n" +
+ "threads to use to initialize the default sessions."),
+
// Operation log configuration
HIVE_SERVER2_LOGGING_OPERATION_ENABLED("hive.server2.logging.operation.enabled", true,
http://git-wip-us.apache.org/repos/asf/hive/blob/bc0f7d9f/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
index e8864ae..aa44d5f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
@@ -17,6 +17,8 @@
*/
package org.apache.hadoop.hive.ql.exec.tez;
+import java.util.concurrent.ConcurrentHashMap;
+
import com.google.common.base.Function;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
@@ -138,13 +140,21 @@ public class DagUtils {
public static final String TEZ_TMP_DIR_KEY = "_hive_tez_tmp_dir";
private static final Logger LOG = LoggerFactory.getLogger(DagUtils.class.getName());
private static final String TEZ_DIR = "_tez_scratch_dir";
- private static DagUtils instance;
+ // Created eagerly so that getInstance() needs no unsynchronized lazy initialization.
+ private static final DagUtils instance = new DagUtils();
// The merge file being currently processed.
public static final String TEZ_MERGE_CURRENT_MERGE_FILE_PREFIX =
"hive.tez.current.merge.file.prefix";
// "A comma separated list of work names used as prefix.
public static final String TEZ_MERGE_WORK_FILE_PREFIXES = "hive.tez.merge.file.prefixes";
+ /**
+ * Notifiers to synchronize resource localization across threads, keyed by the source path
+ * string. If one thread is localizing a file, other threads can wait on the corresponding
+ * notifier object instead of just sleeping before re-checking HDFS. This is used just to avoid
+ * unnecessary waits; HDFS check still needs to be performed to make sure the resource is there
+ * and matches the expected file.
+ */
+ private final ConcurrentHashMap<String, Object> copyNotifiers = new ConcurrentHashMap<>();
+
private void addCredentials(MapWork mapWork, DAG dag) {
Set<String> paths = mapWork.getPathToAliases().keySet();
if (!paths.isEmpty()) {
@@ -968,6 +978,7 @@ public class DagUtils {
}
/**
+ * Localizes a resource. Should be thread-safe.
* @param src path to the source for the resource
* @param dest path in hdfs for the resource
* @param type local resource type (File/Archive)
@@ -975,51 +986,78 @@ public class DagUtils {
* @return localresource from tez localization.
* @throws IOException when any file system related calls fails.
*/
- public LocalResource localizeResource(Path src, Path dest, LocalResourceType type, Configuration conf)
- throws IOException {
+ public LocalResource localizeResource(
+ Path src, Path dest, LocalResourceType type, Configuration conf) throws IOException {
FileSystem destFS = dest.getFileSystem(conf);
-
- if (src != null && checkPreExisting(src, dest, conf) == false) {
+ if (src != null && !checkPreExisting(src, dest, conf)) {
// copy the src to the destination and create local resource.
// do not overwrite.
- LOG.info("Localizing resource because it does not exist: " + src + " to dest: " + dest);
+ String srcStr = src.toString();
+ LOG.info("Localizing resource because it does not exist: " + srcStr + " to dest: " + dest);
+ // Register our notifier for this source. If another thread in this process registered first,
+ // notifierOld is that thread's notifier and we briefly wait on it before trying to copy.
+ Object notifierNew = new Object();
+ Object notifierOld = copyNotifiers.putIfAbsent(srcStr, notifierNew);
+ Object notifier = (notifierOld == null) ? notifierNew : notifierOld;
+ // To avoid timing issues with notifications (and given that HDFS check is anyway the
+ // authoritative one), don't wait infinitely for the notifier, just wait a little bit
+ // and check HDFS before and after.
+ if (notifierOld != null
+ && checkOrWaitForTheFile(src, dest, conf, notifierOld, 1, 150, false)) {
+ return createLocalResource(destFS, dest, type, LocalResourceVisibility.PRIVATE);
+ }
try {
destFS.copyFromLocalFile(false, false, src, dest);
+ synchronized (notifier) {
+ notifier.notifyAll(); // Notify if we have successfully copied the file.
+ }
+ // Also clears another thread's notifier if we copied after giving up waiting on it.
+ copyNotifiers.remove(srcStr, notifier);
} catch (IOException e) {
- LOG.info("Looks like another thread is writing the same file will wait.");
- int waitAttempts =
- conf.getInt(HiveConf.ConfVars.HIVE_LOCALIZE_RESOURCE_NUM_WAIT_ATTEMPTS.varname,
- HiveConf.ConfVars.HIVE_LOCALIZE_RESOURCE_NUM_WAIT_ATTEMPTS.defaultIntVal);
+ LOG.info("Looks like another thread or process is writing the same file");
+ int waitAttempts = HiveConf.getIntVar(
+ conf, ConfVars.HIVE_LOCALIZE_RESOURCE_NUM_WAIT_ATTEMPTS);
long sleepInterval = HiveConf.getTimeVar(
- conf, HiveConf.ConfVars.HIVE_LOCALIZE_RESOURCE_WAIT_INTERVAL,
- TimeUnit.MILLISECONDS);
- LOG.info("Number of wait attempts: " + waitAttempts + ". Wait interval: "
- + sleepInterval);
- boolean found = false;
- for (int i = 0; i < waitAttempts; i++) {
- if (!checkPreExisting(src, dest, conf)) {
- try {
- Thread.sleep(sleepInterval);
- } catch (InterruptedException interruptedException) {
- throw new IOException(interruptedException);
- }
- } else {
- found = true;
- break;
- }
- }
- if (!found) {
+ conf, HiveConf.ConfVars.HIVE_LOCALIZE_RESOURCE_WAIT_INTERVAL, TimeUnit.MILLISECONDS);
+ // Only log on the first wait, and check after wait on the last iteration.
+ if (!checkOrWaitForTheFile(
+ src, dest, conf, notifierOld, waitAttempts, sleepInterval, true)) {
LOG.error("Could not find the jar that was being uploaded");
- throw new IOException("Previous writer likely failed to write " + dest +
- ". Failing because I am unlikely to write too.");
+ // Chain the failed copy as the cause so the underlying error is not lost.
+ throw new IOException("Previous writer likely failed to write " + dest +
+ ". Failing because I am unlikely to write too.", e);
}
+ } finally {
+ if (notifier == notifierNew) {
+ copyNotifiers.remove(srcStr, notifierNew);
+ }
}
}
-
return createLocalResource(destFS, dest, type,
LocalResourceVisibility.PRIVATE);
}
+ /**
+ * Checks whether the resource from {@code src} is present at {@code dest}, waiting between
+ * checks while it is not.
+ * @param notifier object that a copying thread in this process calls notifyAll() on after a
+ * successful copy; if null, plain sleeps are used (another process may be writing the file)
+ * @param waitAttempts maximum number of waits before the final check
+ * @param sleepInterval maximum time to wait per attempt, in milliseconds
+ * @param doLog whether to log once before the first wait
+ * @return true if any of the checks found the file
+ * @throws IOException if checking for the file fails, or if interrupted while waiting (the
+ * thread's interrupt status is preserved)
+ */
+ public boolean checkOrWaitForTheFile(Path src, Path dest, Configuration conf, Object notifier,
+ int waitAttempts, long sleepInterval, boolean doLog) throws IOException {
+ for (int i = 0; i < waitAttempts; i++) {
+ if (checkPreExisting(src, dest, conf)) {
+ return true;
+ }
+ if (doLog && i == 0) {
+ LOG.info("Waiting for the file " + dest + " (" + waitAttempts + " attempts, with "
+ + sleepInterval + "ms interval)");
+ }
+ try {
+ if (notifier != null && sleepInterval > 0) {
+ // The writing thread has given us an object to wait on. Object.wait(0) would block
+ // until notified (possibly forever), so a zero interval falls back to sleeping below.
+ synchronized (notifier) {
+ notifier.wait(sleepInterval);
+ }
+ } else {
+ // Some other process is probably writing the file. Just sleep.
+ Thread.sleep(sleepInterval);
+ }
+ } catch (InterruptedException interruptedException) {
+ // Restore the interrupt flag so callers can still observe the interruption.
+ Thread.currentThread().interrupt();
+ throw new IOException(interruptedException);
+ }
+ }
+ return checkPreExisting(src, dest, conf); // One last check.
+ }
+
/**
* Creates and initializes a JobConf object that can be used to execute
* the DAG. The configuration object will contain configurations from mapred-site
@@ -1201,9 +1239,6 @@ public class DagUtils {
* @return instance of this class
*/
public static DagUtils getInstance() {
- if (instance == null) {
- instance = new DagUtils();
- }
+ // instance is created eagerly in its static final initializer, so no lazy-init check is needed.
return instance;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/bc0f7d9f/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java
index 1321b5f..0d9fa6d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java
@@ -19,17 +19,20 @@
package org.apache.hadoop.hive.ql.exec.tez;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
import java.io.IOException;
import java.net.URISyntaxException;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
@@ -101,17 +104,63 @@ public class TezSessionPoolManager {
protected TezSessionPoolManager() {
}
+ /**
+ * Takes the next session from the default queue pool, opens it on its queue, and puts it back
+ * into the pool if returnAfterUse() allows.
+ * @throws IOException if the session taken from the pool is not usable
+ */
+ private void startNextSessionFromQueue() throws Exception {
+ HiveConf newConf = new HiveConf(initConf);
+ TezSessionPoolSession sessionState = defaultQueuePool.take();
+ if (!sessionState.tryUse()) {
+ throw new IOException(sessionState + " is not usable at pool startup");
+ }
+ newConf.set("tez.queue.name", sessionState.getQueueName());
+ sessionState.open(newConf);
+ if (sessionState.returnAfterUse()) {
+ defaultQueuePool.put(sessionState);
+ }
+ }
+
+ /**
+ * Opens each of the blockingQueueLength default sessions once. It uses up to
+ * HIVE_SERVER2_TEZ_SESSION_MAX_INIT_THREADS threads, counting the calling thread.
+ * The first failure is rethrown after all threads finish; later failures are only logged.
+ */
+ private void startInitialSessions() throws Exception {
+ int maxThreads = HiveConf.getIntVar(initConf, ConfVars.HIVE_SERVER2_TEZ_SESSION_MAX_INIT_THREADS);
+ Preconditions.checkArgument(maxThreads > 0, "%s must be positive, but was %s",
+ ConfVars.HIVE_SERVER2_TEZ_SESSION_MAX_INIT_THREADS.varname, maxThreads);
+ int threadCount = Math.min(blockingQueueLength, maxThreads);
+ if (threadCount == 1) {
+ // The queue is FIFO, so if we cycle through length items, we'd start each session once.
+ for (int i = 0; i < blockingQueueLength; i++) {
+ startNextSessionFromQueue();
+ }
+ return;
+ }
+ final SessionState parentSessionState = SessionState.get();
+ // The queue is FIFO, so taking length items in total across all threads starts each session once.
+ final AtomicInteger remainingToStart = new AtomicInteger(blockingQueueLength);
+ final AtomicReference<Exception> firstError = new AtomicReference<>(null);
+ // The runnable has no mutable state of its own, so each thread can run the same instance.
+ Runnable runnable = new Runnable() {
+ @Override
+ public void run() {
+ if (parentSessionState != null) {
+ SessionState.setCurrentSessionState(parentSessionState);
+ }
+ while (remainingToStart.decrementAndGet() >= 0) {
+ try {
+ startNextSessionFromQueue();
+ } catch (Exception e) {
+ if (!firstError.compareAndSet(null, e)) {
+ LOG.error("Failed to start session; ignoring due to previous error", e);
+ }
+ }
+ }
+ }
+ };
+ // The calling thread runs the runnable too, so start one thread fewer.
+ Thread[] threads = new Thread[threadCount - 1];
+ for (int i = 0; i < threads.length; ++i) {
+ threads[i] = new Thread(runnable, "Tez session init " + i);
+ threads[i].start();
+ }
+ runnable.run();
+ for (Thread thread : threads) {
+ thread.join();
+ }
+ Exception ex = firstError.get();
+ if (ex != null) {
+ throw ex;
+ }
+ }
+
public void startPool() throws Exception {
this.inited = true;
- for (int i = 0; i < blockingQueueLength; i++) {
- HiveConf newConf = new HiveConf(initConf);
- TezSessionPoolSession sessionState = defaultQueuePool.take();
- boolean isUsable = sessionState.tryUse();
- if (!isUsable) throw new IOException(sessionState + " is not usable at pool startup");
- newConf.set("tez.queue.name", sessionState.getQueueName());
- sessionState.open(newConf);
- if (sessionState.returnAfterUse()) {
- defaultQueuePool.put(sessionState);
- }
+ // Don't return early when there are no default sessions: that would skip the
+ // expirationThread handling below, which the pool has always reached.
+ if (blockingQueueLength > 0) {
+ startInitialSessions();
}
if (expirationThread != null) {
@@ -163,10 +212,11 @@ public class TezSessionPoolManager {
this.initConf = conf;
/*
- * with this the ordering of sessions in the queue will be (with 2 sessions 3 queues)
- * s1q1, s1q2, s1q3, s2q1, s2q2, s2q3 there by ensuring uniform distribution of
- * the sessions across queues at least to begin with. Then as sessions get freed up, the list
- * may change this ordering.
+ * In a single-threaded init case, with this the ordering of sessions in the queue will be
+ * (with 2 sessions 3 queues) s1q1, s1q2, s1q3, s2q1, s2q2, s2q3, thereby ensuring uniform
+ * distribution of the sessions across queues at least to begin with. Then as sessions get
+ * freed up, the list may change this ordering.
+ * In a multi-threaded init case it's a free-for-all: sessions are put back into the queue in
+ * whatever order their threads finish opening them.
*/
blockingQueueLength = 0;
for (int i = 0; i < numSessions; i++) {
http://git-wip-us.apache.org/repos/asf/hive/blob/bc0f7d9f/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java
index a2791a1..b03f063 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java
@@ -30,6 +30,7 @@ import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
public class TestTezSessionPool {
@@ -77,29 +78,24 @@ public class TestTezSessionPool {
@Test
public void testSessionPoolGetInOrder() {
try {
- conf.setBoolVar(HiveConf.ConfVars.HIVE_SERVER2_ENABLE_DOAS, false);
- conf.setVar(HiveConf.ConfVars.HIVE_SERVER2_TEZ_DEFAULT_QUEUES, "a,b,c");
- conf.setIntVar(HiveConf.ConfVars.HIVE_SERVER2_TEZ_SESSIONS_PER_DEFAULT_QUEUE, 2);
+ conf.setBoolVar(ConfVars.HIVE_SERVER2_ENABLE_DOAS, false);
+ conf.setVar(ConfVars.HIVE_SERVER2_TEZ_DEFAULT_QUEUES, "a,b,c");
+ conf.setIntVar(ConfVars.HIVE_SERVER2_TEZ_SESSIONS_PER_DEFAULT_QUEUE, 2);
+ // Single-threaded init preserves the round-robin queue order (a, b, c) asserted below.
+ conf.setIntVar(ConfVars.HIVE_SERVER2_TEZ_SESSION_MAX_INIT_THREADS, 1);
poolManager = new TestTezSessionPoolManager();
poolManager.setupPool(conf);
poolManager.startPool();
TezSessionState sessionState = poolManager.getSession(null, conf, true, false);
- if (sessionState.getQueueName().compareTo("a") != 0) {
- fail();
- }
+ assertEquals("a", sessionState.getQueueName());
poolManager.returnSession(sessionState, false);
sessionState = poolManager.getSession(null, conf, true, false);
- if (sessionState.getQueueName().compareTo("b") != 0) {
- fail();
- }
+ assertEquals("b", sessionState.getQueueName());
poolManager.returnSession(sessionState, false);
sessionState = poolManager.getSession(null, conf, true, false);
- if (sessionState.getQueueName().compareTo("c") != 0) {
- fail();
- }
+ assertEquals("c", sessionState.getQueueName());
poolManager.returnSession(sessionState, false);
sessionState = poolManager.getSession(null, conf, true, false);
@@ -115,6 +111,38 @@ public class TestTezSessionPool {
}
}
+
+ @Test
+ public void testSessionPoolThreads() {
+ // Make sure we get a correct number of sessions in each queue and that we don't crash.
+ try {
+ conf.setBoolVar(ConfVars.HIVE_SERVER2_ENABLE_DOAS, false);
+ conf.setVar(ConfVars.HIVE_SERVER2_TEZ_DEFAULT_QUEUES, "0,1,2");
+ conf.setIntVar(ConfVars.HIVE_SERVER2_TEZ_SESSIONS_PER_DEFAULT_QUEUE, 4);
+ // 16 exceeds the 3 x 4 = 12 default sessions, so startPool() caps the thread count
+ // at the session count.
+ conf.setIntVar(ConfVars.HIVE_SERVER2_TEZ_SESSION_MAX_INIT_THREADS, 16);
+
+ poolManager = new TestTezSessionPoolManager();
+ poolManager.setupPool(conf);
+ poolManager.startPool();
+ // Take all 12 sessions at once; the queue names "0".."2" double as indices into queueCounts.
+ TezSessionState[] sessions = new TezSessionState[12];
+ int[] queueCounts = new int[3];
+ for (int i = 0; i < sessions.length; ++i) {
+ sessions[i] = poolManager.getSession(null, conf, true, false);
+ queueCounts[Integer.parseInt(sessions[i].getQueueName())] += 1;
+ }
+ for (int i = 0; i < queueCounts.length; ++i) {
+ assertEquals(4, queueCounts[i]);
+ }
+ for (int i = 0; i < sessions.length; ++i) {
+ poolManager.returnSession(sessions[i], false);
+ }
+
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail();
+ }
+ }
+
@Test
public void testLlapSessionQueuing() {
try {