You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by jd...@apache.org on 2018/04/09 23:15:40 UTC
hive git commit: HIVE-19014: utilize YARN-8028 (queue ACL check) in
Hive Tez session pool (Sergey Shelukhin, reviewed by Jason Dere)
Repository: hive
Updated Branches:
refs/heads/master 109c594a1 -> 76b696c26
HIVE-19014: utilize YARN-8028 (queue ACL check) in Hive Tez session pool (Sergey Shelukhin, reviewed by Jason Dere)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/76b696c2
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/76b696c2
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/76b696c2
Branch: refs/heads/master
Commit: 76b696c266122851e9704b5cf4d6ffd55efe0240
Parents: 109c594
Author: Jason Dere <jd...@hortonworks.com>
Authored: Mon Apr 9 16:15:09 2018 -0700
Committer: Jason Dere <jd...@hortonworks.com>
Committed: Mon Apr 9 16:15:09 2018 -0700
----------------------------------------------------------------------
.../org/apache/hadoop/hive/conf/HiveConf.java | 3 +
.../java/org/apache/hadoop/hive/ql/Driver.java | 1 +
.../hadoop/hive/ql/exec/FunctionTask.java | 2 +-
.../hive/ql/exec/tez/TezSessionPoolManager.java | 60 ++++++--
.../apache/hadoop/hive/ql/exec/tez/TezTask.java | 13 +-
.../hive/ql/exec/tez/YarnQueueHelper.java | 143 +++++++++++++++++++
.../hive/ql/parse/DDLSemanticAnalyzer.java | 1 +
.../ql/udf/generic/GenericUDFLoggedInUser.java | 1 +
.../apache/hive/service/server/HiveServer2.java | 9 +-
9 files changed, 214 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/76b696c2/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 0627c35..17b2485 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -3011,6 +3011,9 @@ public class HiveConf extends Configuration {
"This flag is used in HiveServer2 to enable a user to use HiveServer2 without\n" +
"turning on Tez for HiveServer2. The user could potentially want to run queries\n" +
"over Tez without the pool of sessions."),
+ HIVE_SERVER2_TEZ_QUEUE_ACCESS_CHECK("hive.server2.tez.queue.access.check", false,
+ "Whether to check user access to explicitly specified YARN queues. " +
+ "yarn.resourcemanager.webapp.address must be configured to use this."),
HIVE_SERVER2_TEZ_SESSION_LIFETIME("hive.server2.tez.session.lifetime", "162h",
new TimeValidator(TimeUnit.HOURS),
"The lifetime of the Tez sessions launched by HS2 when default sessions are enabled.\n" +
http://git-wip-us.apache.org/repos/asf/hive/blob/76b696c2/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
index 79db006..a88453c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
@@ -2045,6 +2045,7 @@ public class Driver implements IDriver {
SessionState ss = SessionState.get();
+ // TODO: should this use getUserFromAuthenticator?
hookContext = new PrivateHookContext(plan, queryState, ctx.getPathToCS(), SessionState.get().getUserName(),
ss.getUserIpAddress(), InetAddress.getLocalHost().getHostAddress(), operationId,
ss.getSessionId(), Thread.currentThread().getName(), ss.isHiveServerQuery(), perfLogger, queryInfo, ctx);
http://git-wip-us.apache.org/repos/asf/hive/blob/76b696c2/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionTask.java
index 1de333e..a0a90a9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionTask.java
@@ -185,7 +185,7 @@ public class FunctionTask extends Task<FunctionWork> {
funcName,
dbName,
className,
- SessionState.get().getUserName(),
+ SessionState.get().getUserName(), // TODO: should this use getUserFromAuthenticator?
PrincipalType.USER,
(int) (System.currentTimeMillis() / 1000),
org.apache.hadoop.hive.metastore.api.FunctionType.JAVA,
http://git-wip-us.apache.org/repos/asf/hive/blob/76b696c2/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java
index a051f90..2633390 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java
@@ -26,21 +26,23 @@ import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
+
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.metastore.api.WMFullResourcePlan;
import org.apache.hadoop.hive.metastore.api.WMTrigger;
import org.apache.hadoop.hive.ql.exec.tez.TezSessionPoolSession.Manager;
import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.hive.ql.wm.ExecutionTrigger;
import org.apache.hadoop.hive.ql.wm.SessionTriggerProvider;
import org.apache.hadoop.hive.ql.wm.Trigger;
import org.apache.hadoop.hive.ql.wm.TriggerActionHandler;
import org.apache.hadoop.hive.shims.Utils;
-import org.apache.hadoop.security.UserGroupInformation;
import org.apache.tez.dag.api.TezConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
import com.google.common.annotations.VisibleForTesting;
/**
@@ -82,8 +84,9 @@ public class TezSessionPoolManager extends TezSessionPoolSession.AbstractTrigger
/** This is used to close non-default sessions, and also all sessions when stopping. */
private final List<TezSessionState> openSessions = new LinkedList<>();
private SessionTriggerProvider sessionTriggerProvider;
- private TriggerActionHandler triggerActionHandler;
+ private TriggerActionHandler<?> triggerActionHandler;
private TriggerValidatorRunnable triggerValidatorRunnable;
+ private YarnQueueHelper yarnQueueChecker;
/** Note: this is not thread-safe. */
public static TezSessionPoolManager getInstance() {
@@ -99,6 +102,9 @@ public class TezSessionPoolManager extends TezSessionPoolSession.AbstractTrigger
}
public void startPool(HiveConf conf, final WMFullResourcePlan resourcePlan) throws Exception {
+ if (restrictedConfig == null) { // Sanity check; restrictedConfig is always set in setup.
+ throw new AssertionError("setupPool or setupNonPool needs to be called first");
+ }
if (defaultSessionPool != null) {
defaultSessionPool.start();
}
@@ -108,7 +114,8 @@ public class TezSessionPoolManager extends TezSessionPoolSession.AbstractTrigger
initTriggers(conf);
if (resourcePlan != null) {
updateTriggers(resourcePlan);
- LOG.info("Updated tez session pool manager with active resource plan: {}", resourcePlan.getPlan().getName());
+ LOG.info("Updated tez session pool manager with active resource plan: {}",
+ resourcePlan.getPlan().getName());
}
}
@@ -159,10 +166,21 @@ public class TezSessionPoolManager extends TezSessionPoolSession.AbstractTrigger
});
}
+ setupNonPool(conf);
+
+ // Only creates the expiration tracker if expiration is configured.
+ expirationTracker = SessionExpirationTracker.create(conf, this);
+
+ // From this point on, session creation will wait for the default pool (if # of sessions > 0).
+ this.hasInitialSessions = numSessionsTotal > 0;
+ }
+
+ public void setupNonPool(HiveConf conf) {
+ this.initConf = conf;
numConcurrentLlapQueries = conf.getIntVar(ConfVars.HIVE_SERVER2_LLAP_CONCURRENT_QUERIES);
llapQueue = new Semaphore(numConcurrentLlapQueries, true);
- String queueAllowedStr = HiveConf.getVar(initConf,
+ String queueAllowedStr = HiveConf.getVar(conf,
ConfVars.HIVE_SERVER2_TEZ_SESSION_CUSTOM_QUEUE_ALLOWED);
try {
this.customQueueAllowed = CustomQueueAllowed.valueOf(queueAllowedStr.toUpperCase());
@@ -170,16 +188,12 @@ public class TezSessionPoolManager extends TezSessionPoolSession.AbstractTrigger
throw new RuntimeException("Invalid value '" + queueAllowedStr + "' for " +
ConfVars.HIVE_SERVER2_TEZ_SESSION_CUSTOM_QUEUE_ALLOWED.varname);
}
+ if (customQueueAllowed == CustomQueueAllowed.TRUE
+ && HiveConf.getBoolVar(conf, ConfVars.HIVE_SERVER2_TEZ_QUEUE_ACCESS_CHECK)) {
+ this.yarnQueueChecker = new YarnQueueHelper(conf);
+ }
restrictedConfig = new RestrictedConfigChecker(conf);
- // Only creates the expiration tracker if expiration is configured.
- expirationTracker = SessionExpirationTracker.create(conf, this);
-
- // From this point on, session creation will wait for the default pool (if # of sessions > 0).
- this.hasInitialSessions = numSessionsTotal > 0;
- if (!hasInitialSessions) {
- return;
- }
}
public void initTriggers(final HiveConf conf) {
@@ -219,7 +233,8 @@ public class TezSessionPoolManager extends TezSessionPoolSession.AbstractTrigger
boolean hasQueue = (queueName != null) && !queueName.isEmpty();
if (hasQueue) {
switch (customQueueAllowed) {
- case FALSE: throw new HiveException("Specifying " + TezConfiguration.TEZ_QUEUE_NAME + " is not allowed");
+ case FALSE: throw new HiveException("Specifying "
+ + TezConfiguration.TEZ_QUEUE_NAME + " is not allowed");
case IGNORE: {
LOG.warn("User has specified " + queueName + " queue; ignoring the setting");
queueName = null;
@@ -228,6 +243,20 @@ public class TezSessionPoolManager extends TezSessionPoolSession.AbstractTrigger
}
default: // All good.
}
+
+ if (yarnQueueChecker != null) {
+ SessionState ss = SessionState.get();
+ String userName = null;
+ if (ss != null) {
+ userName = ss.getAuthenticator() != null
+ ? ss.getAuthenticator().getUserName() : ss.getUserName();
+ }
+ if (userName == null) {
+ userName = Utils.getUGI().getShortUserName();
+ LOG.info("No session user set; using the UGI user " + userName);
+ }
+ yarnQueueChecker.checkQueueAccess(queueName, userName);
+ }
}
// Check the restricted configs that the users cannot set.
@@ -389,8 +418,9 @@ public class TezSessionPoolManager extends TezSessionPoolSession.AbstractTrigger
}
try {
- UserGroupInformation ugi = Utils.getUGI();
- String userName = ugi.getShortUserName();
+ // Note: this is not the calling user, but rather the user under which this session will
+ // actually run (which is different under doAs=false). This seems to be intended.
+ String userName = Utils.getUGI().getShortUserName();
// TODO Will these checks work if some other user logs in. Isn't a doAs check required somewhere here as well.
// Should a doAs check happen here instead of after the user test.
// With HiveServer2 - who is the incoming user in terms of UGI (the hive user itself, or the user who actually submitted the query)
http://git-wip-us.apache.org/repos/asf/hive/blob/76b696c2/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
index a5f4cb7..84ae157 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
@@ -155,12 +155,12 @@ public class TezTask extends Task<TezWork> {
// We only need a username for UGI to use for groups; getGroups will fetch the groups
// based on Hadoop configuration, as documented at
// https://hadoop.apache.org/docs/r2.8.0/hadoop-project-dist/hadoop-common/GroupsMapping.html
- String userName = ss.getUserName();
+ String userName = getUserNameForGroups(ss);
List<String> groups = null;
if (userName == null) {
userName = "anonymous";
} else {
- groups = UserGroupInformation.createRemoteUser(ss.getUserName()).getGroups();
+ groups = UserGroupInformation.createRemoteUser(userName).getGroups();
}
MappingInput mi = new MappingInput(userName, groups,
ss.getHiveVariables().get("wmpool"), ss.getHiveVariables().get("wmapp"));
@@ -315,6 +315,15 @@ public class TezTask extends Task<TezWork> {
return rc;
}
+  /**
+   * Chooses the user name that is used to look up groups for the session.
+   * The name reported by the session's authenticator wins when both the authenticator
+   * and its user name are non-null; otherwise the session's own user name is returned
+   * (which may itself be null).
+   *
+   * @param ss the current session state
+   * @return the authenticator user name if available, else {@code ss.getUserName()}
+   */
+  private String getUserNameForGroups(SessionState ss) {
+    // This should be removed when authenticator and the 2-username mess is cleaned up.
+    String authenticatorUser =
+        (ss.getAuthenticator() == null) ? null : ss.getAuthenticator().getUserName();
+    return (authenticatorUser != null) ? authenticatorUser : ss.getUserName();
+  }
+
private void closeDagClientOnCancellation(DAGClient dagClient) {
try {
dagClient.tryKillDAG();
http://git-wip-us.apache.org/repos/asf/hive/blob/76b696c2/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/YarnQueueHelper.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/YarnQueueHelper.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/YarnQueueHelper.java
new file mode 100644
index 0000000..c9c859a
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/YarnQueueHelper.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.hadoop.hive.ql.exec.tez;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.HttpURLConnection;
+import java.net.URL;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authentication.client.AuthenticatedURL;
+import org.apache.hadoop.security.authentication.client.AuthenticationException;
+import org.apache.http.HttpStatus;
+import org.json.JSONException;
+import org.json.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Asks the YARN ResourceManager web services whether a user may access a queue.
+ *
+ * The RM web addresses are read from yarn.resourcemanager.webapp.address; when several are
+ * configured they are tried in turn, starting from the one that answered last time.
+ */
+public class YarnQueueHelper {
+  private static final Logger LOG = LoggerFactory.getLogger(YarnQueueHelper.class);
+  private static final String PERMISSION_PATH = "/ws/v1/cluster/queues/%s/access?user=%s";
+  /** Charset used both for escaping URL components and for decoding RM responses. */
+  private static final String UTF_8 = "UTF-8";
+
+  private final String[] rmNodes;
+  /** Index into rmNodes of the RM that most recently returned a usable answer. */
+  private int lastKnownGoodUrl;
+
+  /**
+   * @param conf configuration to read yarn.resourcemanager.webapp.address from
+   * @throws IllegalArgumentException if no RM web address is configured
+   */
+  public YarnQueueHelper(HiveConf conf) {
+    rmNodes = conf.getTrimmedStrings("yarn.resourcemanager.webapp.address");
+    Preconditions.checkArgument((rmNodes != null && rmNodes.length > 0),
+        "yarn.resourcemanager.webapp.address must be set to enable queue access checks");
+    lastKnownGoodUrl = 0;
+  }
+
+  /**
+   * Verifies that the user may access the queue, returning normally only when access is allowed.
+   *
+   * @param queueName the YARN queue to check
+   * @param userName the user whose access is checked
+   * @throws IOException if the RM denies access (the cause is then a HiveException carrying the
+   *         RM diagnostics), or if no configured RM could be queried (the cause is then the
+   *         first failure encountered)
+   */
+  public void checkQueueAccess(String queueName, String userName) throws IOException {
+    // Both values end up inside a URL; escape them so that characters such as '/', '?', '&'
+    // or '#' cannot change the path or smuggle in an additional "user" parameter.
+    String urlSuffix = String.format(PERMISSION_PATH, urlEncode(queueName), urlEncode(userName));
+    // TODO: if we ever use this endpoint for anything else, refactor cycling into a separate class.
+    int urlIx = lastKnownGoodUrl;
+    int lastUrlIx = ((urlIx == 0) ? rmNodes.length : urlIx) - 1;
+    Exception firstError = null;
+    while (true) {
+      String node = rmNodes[urlIx];
+      String error = null;
+      boolean isCallOk = false;
+      try {
+        error = checkQueueAccessFromSingleRm("http://" + node + urlSuffix);
+        isCallOk = true;
+      } catch (Exception ex) {
+        LOG.warn("Cannot check queue access against RM " + node, ex);
+        if (firstError == null) {
+          firstError = ex;
+        }
+      }
+      if (isCallOk) {
+        // The RM gave a definitive answer. This is handled outside of the try block on purpose:
+        // a denial must not be caught above and mistaken for an unreachable RM.
+        lastKnownGoodUrl = urlIx;
+        if (error == null) return; // null error message here means the user has access.
+        HiveException denied = new HiveException(error.isEmpty()
+            ? (userName + " has no access to " + queueName) : error);
+        throw new IOException(denied.getMessage(), denied);
+      }
+      if (urlIx == lastUrlIx) {
+        throw new IOException("Cannot access any RM service; first error", firstError);
+      }
+      urlIx = (urlIx + 1) % rmNodes.length;
+    }
+  }
+
+  /** Escapes a value for use inside the request URL; null is rendered as "null". */
+  private static String urlEncode(String value) throws IOException {
+    return java.net.URLEncoder.encode(String.valueOf(value), UTF_8);
+  }
+
+  /**
+   * Runs the access check against one RM.
+   *
+   * @return null if access is allowed, otherwise the (possibly empty) diagnostics message
+   * @throws IOException on any status code other than 200, or if the response cannot be read
+   */
+  private String checkQueueAccessFromSingleRm(String urlString) throws IOException {
+    URL url = new URL(urlString);
+    HttpURLConnection connection = UserGroupInformation.isSecurityEnabled() ?
+        getSecureConnection(url) : (HttpURLConnection)url.openConnection();
+    try {
+      int statusCode = connection.getResponseCode();
+      switch (statusCode) {
+      case HttpStatus.SC_OK: return processResponse(connection);
+      case HttpStatus.SC_FORBIDDEN: {
+        // Throw a special exception since it's usually a well-known misconfiguration.
+        throw new IOException(handleUnexpectedStatusCode(connection, statusCode, "check that the "
+            + "HiveServer2 principal is in the administrator list of the root YARN queue"));
+      }
+      default: throw new IOException(handleUnexpectedStatusCode(connection, statusCode, null));
+      }
+    } finally {
+      // One request per connection object; release it whatever the outcome was.
+      connection.disconnect();
+    }
+  }
+
+  /**
+   * Parses the JSON body of a successful response.
+   *
+   * @return null if "allowed" is true, otherwise the "diagnostics" text ("" if there is none)
+   */
+  private String processResponse(HttpURLConnection connection) throws IOException {
+    String jsonStr;
+    try (InputStream stream = connection.getInputStream()) {
+      if (stream == null) {
+        throw new IOException(handleUnexpectedStatusCode(
+            connection, HttpStatus.SC_OK, "No input on successful API call"));
+      }
+      jsonStr = IOUtils.toString(stream, UTF_8);
+    }
+    try {
+      JSONObject obj = new JSONObject(jsonStr);
+      if (obj.getBoolean("allowed")) return null;
+      // Diagnostics are optional; a denial without them must still be reported as a denial.
+      return obj.optString("diagnostics", "");
+    } catch (JSONException ex) {
+      LOG.error("Couldn't parse " + jsonStr, ex);
+      throw ex;
+    }
+  }
+
+  /** Gets the Hadoop kerberos secure connection (not an SSL connection). */
+  private HttpURLConnection getSecureConnection(URL url) throws IOException {
+    AuthenticatedURL.Token token = new AuthenticatedURL.Token();
+    try {
+      return new AuthenticatedURL().openConnection(url, token);
+    } catch (AuthenticationException e) {
+      throw new IOException(e);
+    }
+  }
+
+  /**
+   * Builds an error message for a response that cannot be processed normally.
+   *
+   * @param connection the connection the response was received on
+   * @param statusCode the HTTP status code to report
+   * @param errorStr optional extra hint appended in parentheses; may be null
+   * @return "Received <code>[ (<hint>)][: <response body>]"
+   * @throws IOException if the response body is available but cannot be read
+   */
+  public String handleUnexpectedStatusCode(
+      HttpURLConnection connection, int statusCode, String errorStr) throws IOException {
+    // We do not handle anything but OK for now. Again, we need a real client for this API.
+    // TODO: handle 401 and return a new connection? nothing for now
+    String error = "Received " + statusCode + (errorStr == null ? "" : (" (" + errorStr + ")"));
+    InputStream errorStream = connection.getErrorStream();
+    if (errorStream == null) {
+      try {
+        errorStream = connection.getInputStream();
+      } catch (IOException ex) {
+        // getInputStream() may itself fail for an unsuccessful request; the status code message
+        // built above is more useful to the caller than that secondary failure.
+        LOG.debug("No response body available for status code " + statusCode, ex);
+      }
+    }
+    if (errorStream != null) {
+      try {
+        error += ": " + IOUtils.toString(errorStream, UTF_8);
+      } finally {
+        errorStream.close();
+      }
+    }
+    return error;
+  }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/76b696c2/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java
index 9e66422..e029566 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java
@@ -3507,6 +3507,7 @@ public class DDLSemanticAnalyzer extends BaseSemanticAnalyzer {
cmd.append(")");
}
SessionState ss = SessionState.get();
+ // TODO: should this use getUserFromAuthenticator?
String uName = (ss == null? null: ss.getUserName());
Driver driver = new Driver(conf, uName, queryState.getLineageState());
int rc = driver.compile(cmd.toString(), false);
http://git-wip-us.apache.org/repos/asf/hive/blob/76b696c2/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFLoggedInUser.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFLoggedInUser.java b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFLoggedInUser.java
index 3ed793e..3d36e0f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFLoggedInUser.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFLoggedInUser.java
@@ -45,6 +45,7 @@ public class GenericUDFLoggedInUser extends GenericUDF {
}
if (loggedInUser == null) {
+ // TODO: getUserFromAuthenticator?
String loggedInUserName = SessionState.get().getUserName();
if (loggedInUserName != null) {
loggedInUser = new Text(loggedInUserName);
http://git-wip-us.apache.org/repos/asf/hive/blob/76b696c2/service/src/java/org/apache/hive/service/server/HiveServer2.java
----------------------------------------------------------------------
diff --git a/service/src/java/org/apache/hive/service/server/HiveServer2.java b/service/src/java/org/apache/hive/service/server/HiveServer2.java
index 6308c5c..47f84b5 100644
--- a/service/src/java/org/apache/hive/service/server/HiveServer2.java
+++ b/service/src/java/org/apache/hive/service/server/HiveServer2.java
@@ -665,7 +665,12 @@ public class HiveServer2 extends CompositeService {
if (!activePassiveHA) {
LOG.info("HS2 interactive HA not enabled. Starting tez sessions..");
- startOrReconnectTezSessions();
+ try {
+ startOrReconnectTezSessions();
+ } catch (Exception e) {
+ LOG.error("Error starting Tez sessions: ", e);
+ throw new ServiceException(e);
+ }
} else {
LOG.info("HS2 interactive HA enabled. Tez sessions will be started/reconnected by the leader.");
}
@@ -738,6 +743,8 @@ public class HiveServer2 extends CompositeService {
tezSessionPoolManager = TezSessionPoolManager.getInstance();
if (hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_TEZ_INITIALIZE_DEFAULT_SESSIONS)) {
tezSessionPoolManager.setupPool(hiveConf);
+ } else {
+ tezSessionPoolManager.setupNonPool(hiveConf);
}
tezSessionPoolManager.startPool(hiveConf, resourcePlan);
LOG.info("Tez session pool manager initialized.");