You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by se...@apache.org on 2016/07/13 04:36:16 UTC
[04/10] hive git commit: HIVE-14111 : better concurrency handling for
TezSessionState - part I (Sergey Shelukhin, reviewed by Siddharth Seth)
HIVE-14111 : better concurrency handling for TezSessionState - part I (Sergey Shelukhin, reviewed by Siddharth Seth)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/09fb8f12
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/09fb8f12
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/09fb8f12
Branch: refs/heads/branch-2.1
Commit: 09fb8f127c067089a27f218cbe1dc680ee6b41e1
Parents: 605e5aa
Author: Sergey Shelukhin <se...@apache.org>
Authored: Tue Jul 12 21:05:14 2016 -0700
Committer: Sergey Shelukhin <se...@apache.org>
Committed: Tue Jul 12 21:06:23 2016 -0700
----------------------------------------------------------------------
.../org/apache/hadoop/hive/ql/QTestUtil.java | 2 +-
.../hive/ql/exec/tez/TezSessionPoolManager.java | 69 +++++-----
.../hive/ql/exec/tez/TezSessionState.java | 31 ++++-
.../apache/hadoop/hive/ql/exec/tez/TezTask.java | 135 ++++++++++---------
.../hadoop/hive/ql/session/SessionState.java | 17 ++-
.../hive/ql/exec/tez/TestTezSessionPool.java | 4 +-
.../cli/operation/MetadataOperation.java | 2 +-
.../hive/service/cli/operation/Operation.java | 4 +-
.../service/cli/operation/SQLOperation.java | 1 +
9 files changed, 155 insertions(+), 110 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/09fb8f12/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java
----------------------------------------------------------------------
diff --git a/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java b/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java
index fa68b75..655352d 100644
--- a/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java
+++ b/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java
@@ -981,8 +981,8 @@ public class QTestUtil {
&& (clusterType == MiniClusterType.tez || clusterType == MiniClusterType.llap)) {
// Copy the tezSessionState from the old CliSessionState.
tezSessionState = oldSs.getTezSession();
- ss.setTezSession(tezSessionState);
oldSs.setTezSession(null);
+ ss.setTezSession(tezSessionState);
oldSs.close();
}
http://git-wip-us.apache.org/repos/asf/hive/blob/09fb8f12/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java
index f8f3cad..917268f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java
@@ -45,6 +45,7 @@ import java.util.Set;
import javax.security.auth.login.LoginException;
+import org.apache.tez.dag.api.TezConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.fs.Path;
@@ -82,7 +83,8 @@ public class TezSessionPoolManager {
private Semaphore llapQueue;
private HiveConf initConf = null;
- int numConcurrentLlapQueries = -1;
+ // Config settings.
+ private int numConcurrentLlapQueries = -1;
private long sessionLifetimeMs = 0;
private long sessionLifetimeJitterMs = 0;
/** A queue for initial sessions that have not been started yet. */
@@ -112,10 +114,11 @@ public class TezSessionPoolManager {
}
private void startInitialSession(TezSessionPoolSession sessionState) throws Exception {
- HiveConf newConf = new HiveConf(initConf);
+ HiveConf newConf = new HiveConf(initConf); // TODO Why is this configuration management not happening inside TezSessionPool.
+ // Makes no sense for it to be mixed up like this.
boolean isUsable = sessionState.tryUse();
if (!isUsable) throw new IOException(sessionState + " is not usable at pool startup");
- newConf.set("tez.queue.name", sessionState.getQueueName());
+ newConf.set(TezConfiguration.TEZ_QUEUE_NAME, sessionState.getQueueName());
sessionState.open(newConf);
if (sessionState.returnAfterUse()) {
defaultQueuePool.put(sessionState);
@@ -134,6 +137,7 @@ public class TezSessionPoolManager {
startInitialSession(session);
}
} else {
+ // TODO What is this doing now?
final SessionState parentSessionState = SessionState.get();
// The runnable has no mutable state, so each thread can run the same thing.
final AtomicReference<Exception> firstError = new AtomicReference<>(null);
@@ -150,6 +154,7 @@ public class TezSessionPoolManager {
} catch (Exception e) {
if (!firstError.compareAndSet(null, e)) {
LOG.error("Failed to start session; ignoring due to previous error", e);
+ // TODO Why even continue after this? We're already in a state where things are messed up.
}
}
}
@@ -248,8 +253,11 @@ public class TezSessionPoolManager {
}
}
+ // TODO Create and init session sets up queue, isDefault - but does not initialize the configuration
private TezSessionPoolSession createAndInitSession(String queue, boolean isDefault) {
TezSessionPoolSession sessionState = createSession(TezSessionState.makeSessionId());
+ // TODO When will the queue ever be null?
+ // Pass queue and default in as constructor parameters, and make them final.
if (queue != null) {
sessionState.setQueueName(queue);
}
@@ -266,6 +274,7 @@ public class TezSessionPoolManager {
throws Exception {
String queueName = conf.get("tez.queue.name");
+ // TODO Session re-use completely disabled for doAs=true. Always launches a new session.
boolean nonDefaultUser = conf.getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_ENABLE_DOAS);
/*
@@ -333,7 +342,7 @@ public class TezSessionPoolManager {
public static void closeIfNotDefault(
TezSessionState tezSessionState, boolean keepTmpDir) throws Exception {
- LOG.info("Closing tez session default? " + tezSessionState.isDefault());
+ LOG.info("Closing tez session if not default: " + tezSessionState);
if (!tezSessionState.isDefault()) {
tezSessionState.close(keepTmpDir);
}
@@ -401,6 +410,11 @@ public class TezSessionPoolManager {
try {
UserGroupInformation ugi = Utils.getUGI();
String userName = ugi.getShortUserName();
+ // TODO Will these checks work if some other user logs in? Isn't a doAs check required somewhere here as well?
+ // Should a doAs check happen here instead of after the user test?
+ // With HiveServer2, who is the incoming user in terms of UGI (the hive user itself, or the user who actually submitted the query)?
+
+ // Working on the assumption that the user here will be the hive user if doAs = false, we'll make it past this false check.
LOG.info("The current user: " + userName + ", session user: " + session.getUser());
if (userName.equals(session.getUser()) == false) {
LOG.info("Different users incoming: " + userName + " existing: " + session.getUser());
@@ -412,34 +426,19 @@ public class TezSessionPoolManager {
boolean doAsEnabled = conf.getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_ENABLE_DOAS);
// either variables will never be null because a default value is returned in case of absence
- if (doAsEnabled != conf.getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_ENABLE_DOAS)) {
+ if (doAsEnabled != session.getConf().getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_ENABLE_DOAS)) {
return false;
}
if (!session.isDefault()) {
String queueName = session.getQueueName();
- LOG.info("Current queue name is " + queueName + " incoming queue name is "
- + conf.get("tez.queue.name"));
- if (queueName == null) {
- if (conf.get("tez.queue.name") != null) {
- // queue names are different
- return false;
- } else {
- return true;
- }
- }
-
- if (!queueName.equals(conf.get("tez.queue.name"))) {
- // the String.equals method handles the case of conf not having the queue name as well.
- return false;
- }
+ String confQueueName = conf.get(TezConfiguration.TEZ_QUEUE_NAME);
+ LOG.info("Current queue name is " + queueName + " incoming queue name is " + confQueueName);
+ return (queueName == null) ? confQueueName == null : queueName.equals(confQueueName);
} else {
// this session should never be a default session unless something has messed up.
- throw new HiveException("Default queue should always be returned." +
- "Hence we should not be here.");
+ throw new HiveException("The pool session " + session + " should have been returned to the pool");
}
-
- return true;
}
public TezSessionState getSession(TezSessionState session, HiveConf conf, boolean doOpen,
@@ -447,6 +446,7 @@ public class TezSessionPoolManager {
if (llap && (this.numConcurrentLlapQueries > 0)) {
llapQueue.acquire(); // blocks if no more llap queries can be submitted.
}
+
if (canWorkWithSameSession(session, conf)) {
return session;
}
@@ -458,13 +458,16 @@ public class TezSessionPoolManager {
return getSession(conf, doOpen, forceCreate);
}
- public void closeAndOpen(TezSessionState sessionState, HiveConf conf,
+ /** Reopens the session that was found to not be running. */
+ public void reopenSession(TezSessionState sessionState, HiveConf conf,
String[] additionalFiles, boolean keepTmpDir) throws Exception {
HiveConf sessionConf = sessionState.getConf();
- if (sessionConf != null && sessionConf.get("tez.queue.name") != null) {
- conf.set("tez.queue.name", sessionConf.get("tez.queue.name"));
+ if (sessionConf != null && sessionConf.get(TezConfiguration.TEZ_QUEUE_NAME) != null) {
+ conf.set(TezConfiguration.TEZ_QUEUE_NAME, sessionConf.get(TezConfiguration.TEZ_QUEUE_NAME));
}
- closeIfNotDefault(sessionState, keepTmpDir);
+ // TODO: close basically resets the object to a bunch of nulls.
+ // We should ideally not reuse the object because it's pointless and error-prone.
+ sessionState.close(keepTmpDir); // Clean up stuff.
sessionState.open(conf, additionalFiles);
}
@@ -479,7 +482,8 @@ public class TezSessionPoolManager {
}
}
- private void closeAndReopen(TezSessionPoolSession oldSession) throws Exception {
+ /** Closes a running (expired) pool session and reopens it. */
+ private void closeAndReopenPoolSession(TezSessionPoolSession oldSession) throws Exception {
String queueName = oldSession.getQueueName();
HiveConf conf = oldSession.getConf();
Path scratchDir = oldSession.getTezScratchDir();
@@ -502,7 +506,7 @@ public class TezSessionPoolManager {
TezSessionPoolSession next = restartQueue.take();
LOG.info("Restarting the expired session [" + next + "]");
try {
- closeAndReopen(next);
+ closeAndReopenPoolSession(next);
} catch (InterruptedException ie) {
throw ie;
} catch (Exception e) {
@@ -573,7 +577,7 @@ public class TezSessionPoolManager {
* if it's time, the expiration is triggered; in that case, or if it was already triggered, the
* caller gets a different session. When the session is in use when it expires, the expiration
* thread ignores it and lets the return to the pool take care of the expiration.
- * */
+ */
@VisibleForTesting
static class TezSessionPoolSession extends TezSessionState {
private static final int STATE_NONE = 0, STATE_IN_USE = 1, STATE_EXPIRED = 2;
@@ -670,6 +674,7 @@ public class TezSessionPoolManager {
public boolean tryExpire(boolean isAsync) throws Exception {
if (expirationNs == null) return true;
if (!shouldExpire()) return false;
+ // Try to expire the session if it's not in use; if in use, bail.
while (true) {
if (sessionState.get() != STATE_NONE) return true; // returnAfterUse will take care of this
if (sessionState.compareAndSet(STATE_NONE, STATE_EXPIRED)) {
@@ -683,7 +688,7 @@ public class TezSessionPoolManager {
if (async) {
parent.restartQueue.add(this);
} else {
- parent.closeAndReopen(this);
+ parent.closeAndReopenPoolSession(this);
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/09fb8f12/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
index d04cfa3..1009359 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
@@ -39,6 +39,7 @@ import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
import javax.security.auth.login.LoginException;
@@ -115,6 +116,8 @@ public class TezSessionState {
private boolean defaultQueue = false;
private String user;
+ private AtomicReference<String> ownerThread = new AtomicReference<>(null);
+
private final Set<String> additionalFilesNotFromConf = new HashSet<String>();
private final Set<LocalResource> localizedResources = new HashSet<LocalResource>();
private boolean doAsEnabled;
@@ -129,7 +132,7 @@ public class TezSessionState {
public String toString() {
return "sessionId=" + sessionId + ", queueName=" + queueName + ", user=" + user
- + ", doAs=" + doAsEnabled + ", isOpen=" + isOpen();
+ + ", doAs=" + doAsEnabled + ", isOpen=" + isOpen() + ", isDefault=" + defaultQueue;
}
/**
@@ -226,12 +229,14 @@ public class TezSessionState {
boolean isAsync, LogHelper console, Path scratchDir) throws IOException, LoginException,
IllegalArgumentException, URISyntaxException, TezException {
this.conf = conf;
- this.queueName = conf.get("tez.queue.name");
+ // TODO Why is the queue name set again? It has already been set up via setQueueName. Do only one of the two.
+ this.queueName = conf.get(TezConfiguration.TEZ_QUEUE_NAME);
this.doAsEnabled = conf.getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_ENABLE_DOAS);
- final boolean llapMode = "llap".equals(HiveConf.getVar(
+ final boolean llapMode = "llap".equalsIgnoreCase(HiveConf.getVar(
conf, HiveConf.ConfVars.HIVE_EXECUTION_MODE));
+ // TODO This (at least for the session pool) will always be the hive user. How does doAs above this affect things?
UserGroupInformation ugi = Utils.getUGI();
user = ugi.getShortUserName();
LOG.info("User of session id " + sessionId + " is " + user);
@@ -285,6 +290,7 @@ public class TezSessionState {
llapCredentials = new Credentials();
llapCredentials.addToken(LlapTokenIdentifier.KIND_NAME, getLlapToken(user, tezConfig));
}
+ // TODO Change this to not serialize the entire Configuration - minor.
UserPayload servicePluginPayload = TezUtils.createUserPayloadFromConf(tezConfig);
// we need plugins to handle llap and uber mode
servicePluginsDescriptor = ServicePluginsDescriptor.create(true,
@@ -664,7 +670,7 @@ public class TezSessionState {
}
public void setDefault() {
- defaultQueue = true;
+ defaultQueue = true;
}
public boolean isDefault() {
@@ -686,4 +692,21 @@ public class TezSessionState {
public boolean getDoAsEnabled() {
return doAsEnabled;
}
+
+ /** Mark session as free for use from TezTask, for safety/debugging purposes. */
+ public void markFree() {
+ if (ownerThread.getAndSet(null) == null) throw new AssertionError("Not in use");
+ }
+
+ /** Mark session as being in use from TezTask, for safety/debugging purposes. */
+ public void markInUse() {
+ String newName = Thread.currentThread().getName();
+ do {
+ String oldName = ownerThread.get();
+ if (oldName != null) {
+ throw new AssertionError("Tez session is already in use from "
+ + oldName + "; cannot use from " + newName);
+ }
+ } while (!ownerThread.compareAndSet(null, newName));
+ }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/09fb8f12/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
index 9e114c0..e4b69a5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
@@ -130,81 +130,73 @@ public class TezTask extends Task<TezWork> {
// Need to remove this static hack. But this is the way currently to get a session.
SessionState ss = SessionState.get();
session = ss.getTezSession();
- session =
- TezSessionPoolManager.getInstance().getSession(session, conf, false,
- getWork().getLlapMode());
+ session = TezSessionPoolManager.getInstance().getSession(
+ session, conf, false, getWork().getLlapMode());
ss.setTezSession(session);
+ try {
+ // jobConf will hold all the configuration for hadoop, tez, and hive
+ JobConf jobConf = utils.createConfiguration(conf);
- // jobConf will hold all the configuration for hadoop, tez, and hive
- JobConf jobConf = utils.createConfiguration(conf);
+ // Get all user jars from work (e.g. input format stuff).
+ String[] inputOutputJars = work.configureJobConfAndExtractJars(jobConf);
- // Get all user jars from work (e.g. input format stuff).
- String[] inputOutputJars = work.configureJobConfAndExtractJars(jobConf);
+ // we will localize all the files (jars, plans, hashtables) to the
+ // scratch dir. let's create this and tmp first.
+ Path scratchDir = ctx.getMRScratchDir();
- // we will localize all the files (jars, plans, hashtables) to the
- // scratch dir. let's create this and tmp first.
- Path scratchDir = ctx.getMRScratchDir();
+ // create the tez tmp dir
+ scratchDir = utils.createTezDir(scratchDir, conf);
- // create the tez tmp dir
- scratchDir = utils.createTezDir(scratchDir, conf);
+ Map<String,LocalResource> inputOutputLocalResources =
+ getExtraLocalResources(jobConf, scratchDir, inputOutputJars);
- Map<String,LocalResource> inputOutputLocalResources =
- getExtraLocalResources(jobConf, scratchDir, inputOutputJars);
+ // Ensure the session is open and has the necessary local resources
+ updateSession(session, jobConf, scratchDir, inputOutputJars, inputOutputLocalResources);
- // Ensure the session is open and has the necessary local resources
- updateSession(session, jobConf, scratchDir, inputOutputJars, inputOutputLocalResources);
+ List<LocalResource> additionalLr = session.getLocalizedResources();
+ logResources(additionalLr);
- List<LocalResource> additionalLr = session.getLocalizedResources();
+ // unless already installed on all the cluster nodes, we'll have to
+ // localize hive-exec.jar as well.
+ LocalResource appJarLr = session.getAppJarLr();
- // log which resources we're adding (apart from the hive exec)
- if (LOG.isDebugEnabled()) {
- if (additionalLr == null || additionalLr.size() == 0) {
- LOG.debug("No local resources to process (other than hive-exec)");
- } else {
- for (LocalResource lr: additionalLr) {
- LOG.debug("Adding local resource: " + lr.getResource());
- }
- }
- }
+ // next we translate the TezWork to a Tez DAG
+ DAG dag = build(jobConf, work, scratchDir, appJarLr, additionalLr, ctx);
+ CallerContext callerContext = CallerContext.create(
+ "HIVE", queryPlan.getQueryId(),
+ "HIVE_QUERY_ID", queryPlan.getQueryStr());
+ dag.setCallerContext(callerContext);
- // unless already installed on all the cluster nodes, we'll have to
- // localize hive-exec.jar as well.
- LocalResource appJarLr = session.getAppJarLr();
-
- // next we translate the TezWork to a Tez DAG
- DAG dag = build(jobConf, work, scratchDir, appJarLr, additionalLr, ctx);
- CallerContext callerContext = CallerContext.create(
- "HIVE", queryPlan.getQueryId(),
- "HIVE_QUERY_ID", queryPlan.getQueryStr());
- dag.setCallerContext(callerContext);
-
- // Add the extra resources to the dag
- addExtraResourcesToDag(session, dag, inputOutputJars, inputOutputLocalResources);
-
- // submit will send the job to the cluster and start executing
- dagClient = submit(jobConf, dag, scratchDir, appJarLr, session,
- additionalLr, inputOutputJars, inputOutputLocalResources);
-
- // finally monitor will print progress until the job is done
- TezJobMonitor monitor = new TezJobMonitor(work.getWorkMap());
- rc = monitor.monitorExecution(dagClient, conf, dag, ctx);
- if (rc != 0) {
- this.setException(new HiveException(monitor.getDiagnostics()));
- }
+ // Add the extra resources to the dag
+ addExtraResourcesToDag(session, dag, inputOutputJars, inputOutputLocalResources);
- // fetch the counters
- try {
- Set<StatusGetOpts> statusGetOpts = EnumSet.of(StatusGetOpts.GET_COUNTERS);
- counters = dagClient.getDAGStatus(statusGetOpts).getDAGCounters();
- } catch (Exception err) {
- // Don't fail execution due to counters - just don't print summary info
- LOG.error("Failed to get counters: " + err, err);
- counters = null;
+ // submit will send the job to the cluster and start executing
+ dagClient = submit(jobConf, dag, scratchDir, appJarLr, session,
+ additionalLr, inputOutputJars, inputOutputLocalResources);
+
+ // finally monitor will print progress until the job is done
+ TezJobMonitor monitor = new TezJobMonitor(work.getWorkMap());
+ rc = monitor.monitorExecution(dagClient, conf, dag, ctx);
+ if (rc != 0) {
+ this.setException(new HiveException(monitor.getDiagnostics()));
+ }
+
+ // fetch the counters
+ try {
+ Set<StatusGetOpts> statusGetOpts = EnumSet.of(StatusGetOpts.GET_COUNTERS);
+ counters = dagClient.getDAGStatus(statusGetOpts).getDAGCounters();
+ } catch (Exception err) {
+ // Don't fail execution due to counters - just don't print summary info
+ LOG.error("Failed to get counters: " + err, err);
+ counters = null;
+ }
+ } finally {
+ // We return this to the pool even if it's unusable; reopen is supposed to handle this.
+ TezSessionPoolManager.getInstance().returnSession(session, getWork().getLlapMode());
}
- TezSessionPoolManager.getInstance().returnSession(session, getWork().getLlapMode());
if (LOG.isInfoEnabled() && counters != null
- && (conf.getBoolVar(conf, HiveConf.ConfVars.TEZ_EXEC_SUMMARY) ||
+ && (HiveConf.getBoolVar(conf, HiveConf.ConfVars.TEZ_EXEC_SUMMARY) ||
Utilities.isPerfOrAboveLogging(conf))) {
for (CounterGroup group: counters) {
LOG.info(group.getDisplayName() +":");
@@ -244,6 +236,18 @@ public class TezTask extends Task<TezWork> {
return rc;
}
+ private void logResources(List<LocalResource> additionalLr) {
+ // log which resources we're adding (apart from the hive exec)
+ if (!LOG.isDebugEnabled()) return;
+ if (additionalLr == null || additionalLr.size() == 0) {
+ LOG.debug("No local resources to process (other than hive-exec)");
+ } else {
+ for (LocalResource lr: additionalLr) {
+ LOG.debug("Adding local resource: " + lr.getResource());
+ }
+ }
+ }
+
/**
* Converted the list of jars into local resources
*/
@@ -270,6 +274,7 @@ public class TezTask extends Task<TezWork> {
.hasResources(inputOutputJars);
TezClient client = session.getSession();
+ // TODO null can also mean that this operation was interrupted. Should we really try to re-create the session in that case?
if (client == null) {
// can happen if the user sets the tez flag after the session was
// established
@@ -448,8 +453,10 @@ public class TezTask extends Task<TezWork> {
console.printInfo("Tez session was closed. Reopening...");
// close the old one, but keep the tmp files around
- TezSessionPoolManager.getInstance().closeAndOpen(sessionState, this.conf, inputOutputJars,
- true);
+ // TODO Why is the session being created using a conf instance belonging to TezTask
+ // instead of the session's own conf instance?
+ TezSessionPoolManager.getInstance().reopenSession(
+ sessionState, this.conf, inputOutputJars, true);
console.printInfo("Session re-established.");
dagClient = sessionState.getSession().submitDAG(dag);
@@ -459,7 +466,7 @@ public class TezTask extends Task<TezWork> {
try {
console.printInfo("Dag submit failed due to " + e.getMessage() + " stack trace: "
+ Arrays.toString(e.getStackTrace()) + " retrying...");
- TezSessionPoolManager.getInstance().closeAndOpen(sessionState, this.conf, inputOutputJars,
+ TezSessionPoolManager.getInstance().reopenSession(sessionState, this.conf, inputOutputJars,
true);
dagClient = sessionState.getSession().submitDAG(dag);
} catch (Exception retryException) {
http://git-wip-us.apache.org/repos/asf/hive/blob/09fb8f12/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java b/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
index ce43f7d..e318b92 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
@@ -583,7 +583,7 @@ public class SessionState {
try {
if (startSs.tezSessionState == null) {
- startSs.tezSessionState = new TezSessionState(startSs.getSessionId());
+ startSs.setTezSession(new TezSessionState(startSs.getSessionId()));
}
if (startSs.tezSessionState.isOpen()) {
return;
@@ -1487,12 +1487,12 @@ public class SessionState {
try {
if (tezSessionState != null) {
- TezSessionPoolManager.getInstance().closeIfNotDefault(tezSessionState, false);
+ TezSessionPoolManager.closeIfNotDefault(tezSessionState, false);
}
} catch (Exception e) {
LOG.info("Error closing tez session", e);
} finally {
- tezSessionState = null;
+ setTezSession(null);
}
try {
@@ -1577,8 +1577,17 @@ public class SessionState {
return tezSessionState;
}
+ /** Called from TezTask to attach a TezSession to use to the threadlocal. Ugly pattern... */
public void setTezSession(TezSessionState session) {
- this.tezSessionState = session;
+ if (tezSessionState == session) return; // The same object.
+ if (tezSessionState != null) {
+ tezSessionState.markFree();
+ tezSessionState = null;
+ }
+ if (session != null) {
+ session.markInUse();
+ }
+ tezSessionState = session;
}
public String getUserName() {
http://git-wip-us.apache.org/repos/asf/hive/blob/09fb8f12/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java
index 956fd29..ec90801 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java
@@ -238,7 +238,7 @@ public class TestTezSessionPool {
TezSessionState session = Mockito.mock(TezSessionState.class);
Mockito.when(session.isDefault()).thenReturn(false);
- poolManager.closeAndOpen(session, conf, null, false);
+ poolManager.reopenSession(session, conf, null, false);
Mockito.verify(session).close(false);
String[] files = null;
@@ -261,7 +261,7 @@ public class TestTezSessionPool {
Mockito.when(session.isDefault()).thenReturn(false);
String[] extraResources = new String[] { "file:///tmp/foo.jar" };
- poolManager.closeAndOpen(session, conf, extraResources, false);
+ poolManager.reopenSession(session, conf, extraResources, false);
Mockito.verify(session).close(false);
Mockito.verify(session).open(conf, extraResources);
http://git-wip-us.apache.org/repos/asf/hive/blob/09fb8f12/service/src/java/org/apache/hive/service/cli/operation/MetadataOperation.java
----------------------------------------------------------------------
diff --git a/service/src/java/org/apache/hive/service/cli/operation/MetadataOperation.java b/service/src/java/org/apache/hive/service/cli/operation/MetadataOperation.java
index 44463c9..d216454 100644
--- a/service/src/java/org/apache/hive/service/cli/operation/MetadataOperation.java
+++ b/service/src/java/org/apache/hive/service/cli/operation/MetadataOperation.java
@@ -44,7 +44,7 @@ public abstract class MetadataOperation extends Operation {
private static final char SEARCH_STRING_ESCAPE = '\\';
protected MetadataOperation(HiveSession parentSession, OperationType opType) {
- super(parentSession, opType, false);
+ super(parentSession, opType);
setHasResultSet(true);
}
http://git-wip-us.apache.org/repos/asf/hive/blob/09fb8f12/service/src/java/org/apache/hive/service/cli/operation/Operation.java
----------------------------------------------------------------------
diff --git a/service/src/java/org/apache/hive/service/cli/operation/Operation.java b/service/src/java/org/apache/hive/service/cli/operation/Operation.java
index 4e56ea5..c3be295 100644
--- a/service/src/java/org/apache/hive/service/cli/operation/Operation.java
+++ b/service/src/java/org/apache/hive/service/cli/operation/Operation.java
@@ -85,8 +85,8 @@ public abstract class Operation {
protected static final EnumSet<FetchOrientation> DEFAULT_FETCH_ORIENTATION_SET =
EnumSet.of(FetchOrientation.FETCH_NEXT,FetchOrientation.FETCH_FIRST);
- protected Operation(HiveSession parentSession, OperationType opType, boolean runInBackground) {
- this(parentSession, null, opType, runInBackground);
+ protected Operation(HiveSession parentSession, OperationType opType) {
+ this(parentSession, null, opType, false);
}
protected Operation(HiveSession parentSession, Map<String, String> confOverlay, OperationType opType, boolean runInBackground) {
http://git-wip-us.apache.org/repos/asf/hive/blob/09fb8f12/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java
----------------------------------------------------------------------
diff --git a/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java b/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java
index 81e790c..57c954a 100644
--- a/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java
+++ b/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java
@@ -290,6 +290,7 @@ public class SQLOperation extends ExecuteStatementOperation {
@Override
public Object run() throws HiveSQLException {
Hive.set(parentHive);
+ // TODO: can this result in cross-thread reuse of session state?
SessionState.setCurrentSessionState(parentSessionState);
PerfLogger.setPerfLogger(parentPerfLogger);
// Set current OperationLog in this async thread for keeping on saving query log.