You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by se...@apache.org on 2016/01/27 23:47:15 UTC
[02/14] hive git commit: HIVE-12915 : Tez session pool has concurrency issues during init (Sergey Shelukhin, reviewed by Siddharth Seth)
HIVE-12915 : Tez session pool has concurrency issues during init (Sergey Shelukhin, reviewed by Siddharth Seth)
Conflicts:
ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/c09cc1d2
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/c09cc1d2
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/c09cc1d2
Branch: refs/heads/branch-2.0
Commit: c09cc1d2d944d9b79a803cccd3f1f4570187e276
Parents: 36f8b75
Author: Sergey Shelukhin <se...@apache.org>
Authored: Wed Jan 27 14:13:40 2016 -0800
Committer: Sergey Shelukhin <se...@apache.org>
Committed: Wed Jan 27 14:17:21 2016 -0800
----------------------------------------------------------------------
.../org/apache/hadoop/hive/conf/HiveConf.java | 11 +++
.../hive/ql/exec/spark/HashTableLoader.java | 1 -
.../hive/ql/exec/tez/TezSessionPoolManager.java | 96 ++++++++++++--------
.../hive/ql/exec/tez/TestTezSessionPool.java | 2 +-
.../apache/hive/service/server/HiveServer2.java | 12 ++-
5 files changed, 79 insertions(+), 43 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/c09cc1d2/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 7a25ece..a11d1bc 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -3140,6 +3140,17 @@ public class HiveConf extends Configuration {
return conf.getTrimmed(var.varname, var.defaultStrVal);
}
+ public static String[] getTrimmedStringsVar(Configuration conf, ConfVars var) {
+ assert (var.valClass == String.class) : var.varname;
+ String[] result = conf.getTrimmedStrings(var.varname, (String[])null);
+ if (result != null) return result;
+ if (var.altName != null) {
+ result = conf.getTrimmedStrings(var.altName, (String[])null);
+ if (result != null) return result;
+ }
+ return org.apache.hadoop.util.StringUtils.getTrimmedStrings(var.defaultStrVal);
+ }
+
public static String getVar(Configuration conf, ConfVars var, String defaultVal) {
if (var.altName != null) {
return conf.get(var.varname, conf.get(var.altName, defaultVal));
http://git-wip-us.apache.org/repos/asf/hive/blob/c09cc1d2/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HashTableLoader.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HashTableLoader.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HashTableLoader.java
index 64474e6..1634f42 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HashTableLoader.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HashTableLoader.java
@@ -161,7 +161,6 @@ public class HashTableLoader implements org.apache.hadoop.hive.ql.exec.HashTable
}
MapJoinTableContainer mapJoinTable = SmallTableCache.get(path);
if (mapJoinTable == null) {
- // TODO#: HERE?
synchronized (path.toString().intern()) {
mapJoinTable = SmallTableCache.get(path);
if (mapJoinTable == null) {
http://git-wip-us.apache.org/repos/asf/hive/blob/c09cc1d2/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java
index 1321b5f..891d2a8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java
@@ -26,16 +26,17 @@ import java.net.URISyntaxException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.Collection;
-import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
+import java.util.Queue;
import java.util.Random;
import java.util.Set;
@@ -77,13 +78,18 @@ public class TezSessionPoolManager {
private Semaphore llapQueue;
- private int blockingQueueLength = -1;
private HiveConf initConf = null;
int numConcurrentLlapQueries = -1;
private long sessionLifetimeMs = 0;
private long sessionLifetimeJitterMs = 0;
-
- private boolean inited = false;
+ /** A queue for initial sessions that have not been started yet. */
+ private Queue<TezSessionPoolSession> initialSessions =
+ new ConcurrentLinkedQueue<TezSessionPoolSession>();
+ /**
+ * Indicates whether we should try to use defaultSessionPool.
+ * We assume that setupPool is either called before any activity, or not called at all.
+ */
+ private volatile boolean hasInitialSessions = false;
private static TezSessionPoolManager sessionPool = null;
@@ -101,18 +107,23 @@ public class TezSessionPoolManager {
protected TezSessionPoolManager() {
}
+ private void startInitialSession(TezSessionPoolSession sessionState) throws Exception {
+ HiveConf newConf = new HiveConf(initConf);
+ boolean isUsable = sessionState.tryUse();
+ if (!isUsable) throw new IOException(sessionState + " is not usable at pool startup");
+ newConf.set("tez.queue.name", sessionState.getQueueName());
+ sessionState.open(newConf);
+ if (sessionState.returnAfterUse()) {
+ defaultQueuePool.put(sessionState);
+ }
+ }
+
public void startPool() throws Exception {
- this.inited = true;
- for (int i = 0; i < blockingQueueLength; i++) {
- HiveConf newConf = new HiveConf(initConf);
- TezSessionPoolSession sessionState = defaultQueuePool.take();
- boolean isUsable = sessionState.tryUse();
- if (!isUsable) throw new IOException(sessionState + " is not usable at pool startup");
- newConf.set("tez.queue.name", sessionState.getQueueName());
- sessionState.open(newConf);
- if (sessionState.returnAfterUse()) {
- defaultQueuePool.put(sessionState);
- }
+ if (initialSessions.isEmpty()) return;
+ while (true) {
+ TezSessionPoolSession session = initialSessions.poll();
+ if (session == null) break;
+ startInitialSession(session);
}
if (expirationThread != null) {
expirationThread.start();
@@ -121,16 +132,32 @@ public class TezSessionPoolManager {
}
public void setupPool(HiveConf conf) throws InterruptedException {
- String defaultQueues = HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_SERVER2_TEZ_DEFAULT_QUEUES);
+ String[] defaultQueueList = HiveConf.getTrimmedStringsVar(
+ conf, HiveConf.ConfVars.HIVE_SERVER2_TEZ_DEFAULT_QUEUES);
+ int emptyNames = 0; // We don't create sessions for empty entries.
+ for (String queueName : defaultQueueList) {
+ if (queueName.isEmpty()) {
+ ++emptyNames;
+ }
+ }
int numSessions = conf.getIntVar(ConfVars.HIVE_SERVER2_TEZ_SESSIONS_PER_DEFAULT_QUEUE);
+ int numSessionsTotal = numSessions * (defaultQueueList.length - emptyNames);
+ if (numSessionsTotal > 0) {
+ defaultQueuePool = new ArrayBlockingQueue<TezSessionPoolSession>(numSessionsTotal);
+ }
+
numConcurrentLlapQueries = conf.getIntVar(ConfVars.HIVE_SERVER2_LLAP_CONCURRENT_QUERIES);
+ llapQueue = new Semaphore(numConcurrentLlapQueries, true);
+
+ this.initConf = conf;
+
sessionLifetimeMs = conf.getTimeVar(
ConfVars.HIVE_SERVER2_TEZ_SESSION_LIFETIME, TimeUnit.MILLISECONDS);
if (sessionLifetimeMs != 0) {
sessionLifetimeJitterMs = conf.getTimeVar(
ConfVars.HIVE_SERVER2_TEZ_SESSION_LIFETIME_JITTER, TimeUnit.MILLISECONDS);
if (LOG.isDebugEnabled()) {
- LOG.debug("Starting session expiration threads; session lifetime is "
+ LOG.debug("Session expiration is enabled; session lifetime is "
+ sessionLifetimeMs + " + [0, " + sessionLifetimeJitterMs + ") ms");
}
expirationQueue = new PriorityBlockingQueue<>(11, new Comparator<TezSessionPoolSession>() {
@@ -141,6 +168,11 @@ public class TezSessionPoolManager {
}
});
restartQueue = new LinkedBlockingQueue<>();
+ }
+ this.hasInitialSessions = numSessionsTotal > 0;
+ // From this point on, session creation will wait for the default pool (if # of sessions > 0).
+
+ if (sessionLifetimeMs != 0) {
expirationThread = new Thread(new Runnable() {
@Override
public void run() {
@@ -155,27 +187,18 @@ public class TezSessionPoolManager {
}, "TezSessionPool-cleanup");
}
- // the list of queues is a comma separated list.
- String defaultQueueList[] = defaultQueues.split(",");
- defaultQueuePool = new ArrayBlockingQueue<TezSessionPoolSession>(
- numSessions * defaultQueueList.length);
- llapQueue = new Semaphore(numConcurrentLlapQueries, true);
-
- this.initConf = conf;
/*
* with this the ordering of sessions in the queue will be (with 2 sessions 3 queues)
* s1q1, s1q2, s1q3, s2q1, s2q2, s2q3 there by ensuring uniform distribution of
* the sessions across queues at least to begin with. Then as sessions get freed up, the list
* may change this ordering.
*/
- blockingQueueLength = 0;
for (int i = 0; i < numSessions; i++) {
- for (String queue : defaultQueueList) {
- if (queue.length() == 0) {
+ for (String queueName : defaultQueueList) {
+ if (queueName.isEmpty()) {
continue;
}
- defaultQueuePool.put(createAndInitSession(queue, true));
- blockingQueueLength++;
+ initialSessions.add(createAndInitSession(queueName, true));
}
}
}
@@ -196,7 +219,6 @@ public class TezSessionPoolManager {
private TezSessionState getSession(HiveConf conf, boolean doOpen,
boolean forceCreate)
throws Exception {
-
String queueName = conf.get("tez.queue.name");
boolean nonDefaultUser = conf.getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_ENABLE_DOAS);
@@ -207,12 +229,10 @@ public class TezSessionPoolManager {
* their own credentials. We expect that with the new security model, things will
* run as user hive in most cases.
*/
- if (forceCreate || !(this.inited)
- || ((queueName != null) && (!queueName.isEmpty()))
- || (nonDefaultUser) || (defaultQueuePool == null) || (blockingQueueLength <= 0)) {
+ if (forceCreate || nonDefaultUser || !hasInitialSessions
+ || ((queueName != null) && !queueName.isEmpty())) {
LOG.info("QueueName: " + queueName + " nonDefaultUser: " + nonDefaultUser +
- " defaultQueuePool: " + defaultQueuePool +
- " blockingQueueLength: " + blockingQueueLength);
+ " defaultQueuePool: " + defaultQueuePool + " hasInitialSessions: " + hasInitialSessions);
return getNewSessionState(conf, queueName, doOpen);
}
@@ -266,7 +286,7 @@ public class TezSessionPoolManager {
// session in the SessionState
}
- public void closeIfNotDefault(
+ public static void closeIfNotDefault(
TezSessionState tezSessionState, boolean keepTmpDir) throws Exception {
LOG.info("Closing tez session default? " + tezSessionState.isDefault());
if (!tezSessionState.isDefault()) {
@@ -275,7 +295,7 @@ public class TezSessionPoolManager {
}
public void stop() throws Exception {
- if ((sessionPool == null) || (this.inited == false)) {
+ if ((sessionPool == null) || !this.hasInitialSessions) {
return;
}
@@ -328,7 +348,7 @@ public class TezSessionPoolManager {
* sessions for e.g. when a CLI session is started. The CLI session could re-use the
* same tez session eliminating the latencies of new AM and containers.
*/
- private boolean canWorkWithSameSession(TezSessionState session, HiveConf conf)
+ private static boolean canWorkWithSameSession(TezSessionState session, HiveConf conf)
throws HiveException {
if (session == null || conf == null) {
return false;
http://git-wip-us.apache.org/repos/asf/hive/blob/c09cc1d2/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java
index a2791a1..a38f4ba 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java
@@ -124,7 +124,7 @@ public class TestTezSessionPool {
poolManager.setupPool(conf);
poolManager.startPool();
} catch (Exception e) {
- e.printStackTrace();
+ LOG.error("Initialization error", e);
fail();
}
http://git-wip-us.apache.org/repos/asf/hive/blob/c09cc1d2/service/src/java/org/apache/hive/service/server/HiveServer2.java
----------------------------------------------------------------------
diff --git a/service/src/java/org/apache/hive/service/server/HiveServer2.java b/service/src/java/org/apache/hive/service/server/HiveServer2.java
index 2f55591..512a810 100644
--- a/service/src/java/org/apache/hive/service/server/HiveServer2.java
+++ b/service/src/java/org/apache/hive/service/server/HiveServer2.java
@@ -500,6 +500,13 @@ public class HiveServer2 extends CompositeService {
maxAttempts = hiveConf.getLongVar(HiveConf.ConfVars.HIVE_SERVER2_MAX_START_ATTEMPTS);
HiveServer2 server = null;
try {
+ // Initialize the pool before we start the server; don't start yet.
+ TezSessionPoolManager sessionPool = null;
+ if (hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_TEZ_INITIALIZE_DEFAULT_SESSIONS)) {
+ sessionPool = TezSessionPoolManager.getInstance();
+ sessionPool.setupPool(hiveConf);
+ }
+
// Cleanup the scratch dir before starting
ServerUtils.cleanUpScratchDir(hiveConf);
server = new HiveServer2();
@@ -522,9 +529,8 @@ public class HiveServer2 extends CompositeService {
if (hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_SUPPORT_DYNAMIC_SERVICE_DISCOVERY)) {
server.addServerInstanceToZooKeeper(hiveConf);
}
- if (hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_TEZ_INITIALIZE_DEFAULT_SESSIONS)) {
- TezSessionPoolManager sessionPool = TezSessionPoolManager.getInstance();
- sessionPool.setupPool(hiveConf);
+
+ if (sessionPool != null) {
sessionPool.startPool();
}