You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by se...@apache.org on 2015/10/08 21:35:59 UTC
[4/9] hive git commit: HIVE-11969 : start Tez session in background when starting CLI (Sergey Shelukhin, reviewed by Gopal V)
HIVE-11969 : start Tez session in background when starting CLI (Sergey Shelukhin, reviewed by Gopal V)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/7f9023ea
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/7f9023ea
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/7f9023ea
Branch: refs/heads/llap
Commit: 7f9023ea0323821626f17e30d04b5acffb1d3048
Parents: 556877c
Author: Sergey Shelukhin <se...@apache.org>
Authored: Wed Oct 7 13:55:36 2015 -0700
Committer: Sergey Shelukhin <se...@apache.org>
Committed: Wed Oct 7 13:55:36 2015 -0700
----------------------------------------------------------------------
.../org/apache/hadoop/hive/cli/CliDriver.java | 9 +-
.../org/apache/hadoop/hive/conf/HiveConf.java | 3 +
.../hive/ql/exec/tez/TezSessionState.java | 204 +++++++++++++++----
.../apache/hadoop/hive/ql/exec/tez/TezTask.java | 6 +-
.../hadoop/hive/ql/session/SessionState.java | 52 +++--
.../hadoop/hive/ql/exec/tez/TestTezTask.java | 2 +
6 files changed, 222 insertions(+), 54 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/7f9023ea/cli/src/java/org/apache/hadoop/hive/cli/CliDriver.java
----------------------------------------------------------------------
diff --git a/cli/src/java/org/apache/hadoop/hive/cli/CliDriver.java b/cli/src/java/org/apache/hadoop/hive/cli/CliDriver.java
index 4b52578..3a80f99 100644
--- a/cli/src/java/org/apache/hadoop/hive/cli/CliDriver.java
+++ b/cli/src/java/org/apache/hadoop/hive/cli/CliDriver.java
@@ -64,6 +64,7 @@ import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveVariableSource;
import org.apache.hadoop.hive.conf.Validator;
import org.apache.hadoop.hive.conf.VariableSubstitution;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.ql.CommandNeedRetryException;
import org.apache.hadoop.hive.ql.Driver;
@@ -690,7 +691,13 @@ public class CliDriver {
}).substitute(conf, prompt);
prompt2 = spacesForString(prompt);
- SessionState.start(ss);
+ if (HiveConf.getBoolVar(conf, ConfVars.HIVE_CLI_TEZ_SESSION_ASYNC)) {
+ // Start the session in a fire-and-forget manner. When the asynchronously initialized parts of
+ // the session are needed, the corresponding getters and other methods will wait as needed.
+ SessionState.beginStart(ss, console);
+ } else {
+ SessionState.start(ss);
+ }
// execute cli driver work
try {
http://git-wip-us.apache.org/repos/asf/hive/blob/7f9023ea/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 54a529e..bf48f69 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -1712,6 +1712,9 @@ public class HiveConf extends Configuration {
HIVE_CLI_PRINT_HEADER("hive.cli.print.header", false, "Whether to print the names of the columns in query output."),
+ HIVE_CLI_TEZ_SESSION_ASYNC("hive.cli.tez.session.async", true, "Whether to start Tez\n" +
+ "session in background when running CLI with Tez, allowing CLI to be available earlier."),
+
HIVE_ERROR_ON_EMPTY_PARTITION("hive.error.on.empty.partition", false,
"Whether to throw an exception if dynamic partition insert generates empty results."),
http://git-wip-us.apache.org/repos/asf/hive/blob/7f9023ea/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
index 568ebbe..6ed6421 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
@@ -23,14 +23,19 @@ import java.io.IOException;
import java.io.InputStream;
import java.net.URISyntaxException;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
-import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import javax.security.auth.login.LoginException;
@@ -46,7 +51,7 @@ import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.session.SessionState;
-import org.apache.hadoop.hive.shims.ShimLoader;
+import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
import org.apache.hadoop.hive.shims.Utils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.LocalResource;
@@ -70,6 +75,9 @@ public class TezSessionState {
private Path tezScratchDir;
private LocalResource appJarLr;
private TezClient session;
+ private Future<TezClient> sessionFuture;
+ /** Console used for user feedback during async session opening. */
+ private LogHelper console;
private String sessionId;
private final DagUtils utils;
private String queueName;
@@ -97,13 +105,40 @@ public class TezSessionState {
this.sessionId = sessionId;
}
- /**
- * Returns whether a session has been established
- */
+ public boolean isOpening() {
+ if (session != null || sessionFuture == null) return false;
+ try {
+ session = sessionFuture.get(0, TimeUnit.NANOSECONDS);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ return false;
+ } catch (ExecutionException e) {
+ throw new RuntimeException(e);
+ } catch (CancellationException e) {
+ return false;
+ } catch (TimeoutException e) {
+ return true;
+ }
+ return false;
+ }
+
public boolean isOpen() {
- return session != null;
+ if (session != null) return true;
+ if (sessionFuture == null) return false;
+ try {
+ session = sessionFuture.get(0, TimeUnit.NANOSECONDS);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ return false;
+ } catch (ExecutionException e) {
+ throw new RuntimeException(e);
+ } catch (TimeoutException | CancellationException e) {
+ return false;
+ }
+ return true;
}
+
/**
* Get all open sessions. Only used to clean up at shutdown.
* @return List<TezSessionState>
@@ -124,9 +159,21 @@ public class TezSessionState {
* @throws URISyntaxException
* @throws LoginException
* @throws TezException
+ * @throws InterruptedException
*/
public void open(HiveConf conf, String[] additionalFiles)
throws IOException, LoginException, IllegalArgumentException, URISyntaxException, TezException {
+ openInternal(conf, additionalFiles, false, null);
+ }
+
+ public void beginOpen(HiveConf conf, String[] additionalFiles, LogHelper console)
+ throws IOException, LoginException, IllegalArgumentException, URISyntaxException, TezException {
+ openInternal(conf, additionalFiles, true, console);
+ }
+
+ private void openInternal(
+ final HiveConf conf, String[] additionalFiles, boolean isAsync, LogHelper console)
+ throws IOException, LoginException, IllegalArgumentException, URISyntaxException, TezException {
this.conf = conf;
this.queueName = conf.get("tez.queue.name");
this.doAsEnabled = conf.getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_ENABLE_DOAS);
@@ -152,7 +199,7 @@ public class TezSessionState {
appJarLr = createJarLocalResource(utils.getExecJarPathLocal());
// configuration for the application master
- Map<String, LocalResource> commonLocalResources = new HashMap<String, LocalResource>();
+ final Map<String, LocalResource> commonLocalResources = new HashMap<String, LocalResource>();
commonLocalResources.put(utils.getBaseName(appJarLr), appJarLr);
for (LocalResource lr : localizedResources) {
commonLocalResources.put(utils.getBaseName(lr), lr);
@@ -164,7 +211,7 @@ public class TezSessionState {
// and finally we're ready to create and start the session
// generate basic tez config
- TezConfiguration tezConfig = new TezConfiguration(conf);
+ final TezConfiguration tezConfig = new TezConfiguration(conf);
tezConfig.set(TezConfiguration.TEZ_AM_STAGING_DIR, tezScratchDir.toUri().toString());
Utilities.stripHivePasswordDetails(tezConfig);
@@ -176,37 +223,85 @@ public class TezSessionState {
tezConfig.setInt(TezConfiguration.TEZ_AM_SESSION_MIN_HELD_CONTAINERS, n);
}
- session = TezClient.create("HIVE-" + sessionId, tezConfig, true,
+ final TezClient session = TezClient.create("HIVE-" + sessionId, tezConfig, true,
commonLocalResources, null);
LOG.info("Opening new Tez Session (id: " + sessionId
+ ", scratch dir: " + tezScratchDir + ")");
TezJobMonitor.initShutdownHook();
- session.start();
+ if (!isAsync) {
+ startSessionAndContainers(session, conf, commonLocalResources, tezConfig, false);
+ this.session = session;
+ } else {
+ FutureTask<TezClient> sessionFuture = new FutureTask<>(new Callable<TezClient>() {
+ @Override
+ public TezClient call() throws Exception {
+ return startSessionAndContainers(session, conf, commonLocalResources, tezConfig, true);
+ }
+ });
+ new Thread(sessionFuture, "Tez session start thread").start();
+ // We assume here nobody will try to get session before open() returns.
+ this.console = console;
+ this.sessionFuture = sessionFuture;
+ }
+ }
- if (HiveConf.getBoolVar(conf, ConfVars.HIVE_PREWARM_ENABLED)) {
- int n = HiveConf.getIntVar(conf, ConfVars.HIVE_PREWARM_NUM_CONTAINERS);
- LOG.info("Prewarming " + n + " containers (id: " + sessionId
- + ", scratch dir: " + tezScratchDir + ")");
- PreWarmVertex prewarmVertex = utils.createPreWarmVertex(tezConfig, n,
- commonLocalResources);
- try {
- session.preWarm(prewarmVertex);
- } catch (IOException ie) {
- if (ie.getMessage().contains("Interrupted while waiting")) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Hive Prewarm threw an exception ", ie);
+ private TezClient startSessionAndContainers(TezClient session, HiveConf conf,
+ Map<String, LocalResource> commonLocalResources, TezConfiguration tezConfig,
+ boolean isOnThread) throws TezException, IOException {
+ session.start();
+ boolean isSuccessful = false;
+ try {
+ if (HiveConf.getBoolVar(conf, ConfVars.HIVE_PREWARM_ENABLED)) {
+ int n = HiveConf.getIntVar(conf, ConfVars.HIVE_PREWARM_NUM_CONTAINERS);
+ LOG.info("Prewarming " + n + " containers (id: " + sessionId
+ + ", scratch dir: " + tezScratchDir + ")");
+ PreWarmVertex prewarmVertex = utils.createPreWarmVertex(
+ tezConfig, n, commonLocalResources);
+ try {
+ session.preWarm(prewarmVertex);
+ } catch (IOException ie) {
+ if (!isOnThread && ie.getMessage().contains("Interrupted while waiting")) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Hive Prewarm threw an exception ", ie);
+ }
+ } else {
+ throw ie;
}
- } else {
- throw ie;
}
}
+ try {
+ session.waitTillReady();
+ } catch (InterruptedException ie) {
+ if (isOnThread) throw new IOException(ie);
+ //ignore
+ }
+ isSuccessful = true;
+ return session;
+ } finally {
+ if (isOnThread && !isSuccessful) {
+ closeAndIgnoreExceptions(session);
+ }
}
+ }
+
+ private static void closeAndIgnoreExceptions(TezClient session) {
+ try {
+ session.stop();
+ } catch (SessionNotRunning nr) {
+ // Ignore.
+ } catch (IOException | TezException ex) {
+ LOG.info("Failed to close Tez session after failure to initialize: " + ex.getMessage());
+ }
+ }
+
+ public void endOpen() throws InterruptedException, CancellationException {
+ if (this.session != null || this.sessionFuture == null) return;
try {
- session.waitTillReady();
- } catch(InterruptedException ie) {
- //ignore
+ this.session = this.sessionFuture.get();
+ } catch (ExecutionException e) {
+ throw new RuntimeException(e);
}
}
@@ -250,21 +345,32 @@ public class TezSessionState {
* @throws Exception
*/
public void close(boolean keepTmpDir) throws Exception {
- if (!isOpen()) {
- return;
- }
-
- LOG.info("Closing Tez Session");
- try {
- session.stop();
- } catch (SessionNotRunning nr) {
- // ignore
+ if (session != null) {
+ LOG.info("Closing Tez Session");
+ closeClient(session);
+ } else if (sessionFuture != null) {
+ sessionFuture.cancel(true);
+ TezClient asyncSession = null;
+ try {
+ asyncSession = sessionFuture.get(); // In case it was done and noone looked at it.
+ } catch (ExecutionException | CancellationException e) {
+ // ignore
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ // ignore
+ }
+ if (asyncSession != null) {
+ LOG.info("Closing Tez Session");
+ closeClient(asyncSession);
+ }
}
if (!keepTmpDir) {
cleanupScratchDir();
}
session = null;
+ sessionFuture = null;
+ console = null;
tezScratchDir = null;
conf = null;
appJarLr = null;
@@ -272,6 +378,15 @@ public class TezSessionState {
localizedResources.clear();
}
+ private void closeClient(TezClient client) throws TezException,
+ IOException {
+ try {
+ client.stop();
+ } catch (SessionNotRunning nr) {
+ // ignore
+ }
+ }
+
public void cleanupScratchDir () throws IOException {
FileSystem fs = tezScratchDir.getFileSystem(conf);
fs.delete(tezScratchDir, true);
@@ -283,6 +398,21 @@ public class TezSessionState {
}
public TezClient getSession() {
+ if (session == null && sessionFuture != null) {
+ if (!sessionFuture.isDone()) {
+ console.printInfo("Waiting for Tez session and AM to be ready...");
+ }
+ try {
+ session = sessionFuture.get();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ return null;
+ } catch (ExecutionException e) {
+ throw new RuntimeException(e);
+ } catch (CancellationException e) {
+ return null;
+ }
+ }
return session;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/7f9023ea/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
index 2d740ed..c62e929 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
@@ -53,6 +53,7 @@ import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.tez.client.TezClient;
import org.apache.tez.common.counters.CounterGroup;
import org.apache.tez.common.counters.TezCounter;
import org.apache.tez.common.counters.TezCounters;
@@ -251,7 +252,8 @@ public class TezTask extends Task<TezWork> {
final boolean missingLocalResources = !session
.hasResources(inputOutputJars);
- if (!session.isOpen()) {
+ TezClient client = session.getSession();
+ if (client == null) {
// can happen if the user sets the tez flag after the session was
// established
LOG.info("Tez session hasn't been created yet. Opening session");
@@ -263,7 +265,7 @@ public class TezTask extends Task<TezWork> {
if (missingLocalResources) {
LOG.info("Tez session missing resources," +
" adding additional necessary resources");
- session.getSession().addAppMasterLocalFiles(extraResources);
+ client.addAppMasterLocalFiles(extraResources);
}
session.refreshLocalResourcesFromConf(conf);
http://git-wip-us.apache.org/repos/asf/hive/blob/7f9023ea/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java b/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
index dc8c336..56b0fae 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
@@ -38,6 +38,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
+import java.util.concurrent.CancellationException;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.commons.io.FileUtils;
@@ -85,7 +86,6 @@ import org.apache.hadoop.hive.shims.HadoopShims;
import org.apache.hadoop.hive.shims.ShimLoader;
import org.apache.hadoop.hive.shims.Utils;
import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.Shell;
import com.google.common.base.Preconditions;
@@ -474,6 +474,21 @@ public class SessionState {
* when switching from one session to another.
*/
public static SessionState start(SessionState startSs) {
+ start(startSs, false, null);
+ return startSs;
+ }
+
+ public static void beginStart(SessionState startSs, LogHelper console) {
+ start(startSs, true, console);
+ }
+
+ public static void endStart(SessionState startSs)
+ throws CancellationException, InterruptedException {
+ if (startSs.tezSessionState == null) return;
+ startSs.tezSessionState.endOpen();
+ }
+
+ private static void start(SessionState startSs, boolean isAsync, LogHelper console) {
setCurrentSessionState(startSs);
if (startSs.hiveHist == null){
@@ -521,20 +536,31 @@ public class SessionState {
throw new RuntimeException(e);
}
- if (HiveConf.getVar(startSs.getConf(), HiveConf.ConfVars.HIVE_EXECUTION_ENGINE)
- .equals("tez") && (startSs.isHiveServerQuery == false)) {
- try {
- if (startSs.tezSessionState == null) {
- startSs.tezSessionState = new TezSessionState(startSs.getSessionId());
- }
- if (!startSs.tezSessionState.isOpen()) {
- startSs.tezSessionState.open(startSs.conf); // should use conf on session start-up
+ String engine = HiveConf.getVar(startSs.getConf(), HiveConf.ConfVars.HIVE_EXECUTION_ENGINE);
+ if (!engine.equals("tez") || startSs.isHiveServerQuery) return;
+
+ try {
+ if (startSs.tezSessionState == null) {
+ startSs.tezSessionState = new TezSessionState(startSs.getSessionId());
+ }
+ if (startSs.tezSessionState.isOpen()) {
+ return;
+ }
+ if (startSs.tezSessionState.isOpening()) {
+ if (!isAsync) {
+ startSs.tezSessionState.endOpen();
}
- } catch (Exception e) {
- throw new RuntimeException(e);
+ return;
+ }
+ // Neither open nor opening.
+ if (!isAsync) {
+ startSs.tezSessionState.open(startSs.conf); // should use conf on session start-up
+ } else {
+ startSs.tezSessionState.beginOpen(startSs.conf, null, console);
}
+ } catch (Exception e) {
+ throw new RuntimeException(e);
}
- return startSs;
}
/**
@@ -1572,8 +1598,6 @@ public class SessionState {
}
}
-
-
public TezSessionState getTezSession() {
return tezSessionState;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/7f9023ea/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java
index d004a27..858cca0 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java
@@ -236,6 +236,7 @@ public class TestTezTask {
.thenReturn(resources);
when(utils.getBaseName(res)).thenReturn("foo.jar");
when(sessionState.isOpen()).thenReturn(true);
+ when(sessionState.isOpening()).thenReturn(false);
when(sessionState.hasResources(inputOutputJars)).thenReturn(false);
task.updateSession(sessionState, conf, path, inputOutputJars, resMap);
verify(session).addAppMasterLocalFiles(resMap);
@@ -254,6 +255,7 @@ public class TestTezTask {
.thenReturn(resources);
when(utils.getBaseName(res)).thenReturn("foo.jar");
when(sessionState.isOpen()).thenReturn(true);
+ when(sessionState.isOpening()).thenReturn(false);
when(sessionState.hasResources(inputOutputJars)).thenReturn(false);
task.addExtraResourcesToDag(sessionState, dag, inputOutputJars, resMap);
verify(dag).addTaskLocalFiles(resMap);