You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by gu...@apache.org on 2013/08/26 22:37:06 UTC
svn commit: r1517689 - in /hive/branches/tez: conf/
ql/src/java/org/apache/hadoop/hive/ql/exec/tez/
ql/src/java/org/apache/hadoop/hive/ql/session/
Author: gunther
Date: Mon Aug 26 20:37:06 2013
New Revision: 1517689
URL: http://svn.apache.org/r1517689
Log:
HIVE-5148: Jam sessions w/ Tez (Gunther Hagleitner)
Added:
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
Modified:
hive/branches/tez/conf/hive-default.xml.template
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
Modified: hive/branches/tez/conf/hive-default.xml.template
URL: http://svn.apache.org/viewvc/hive/branches/tez/conf/hive-default.xml.template?rev=1517689&r1=1517688&r2=1517689&view=diff
==============================================================================
--- hive/branches/tez/conf/hive-default.xml.template (original)
+++ hive/branches/tez/conf/hive-default.xml.template Mon Aug 26 20:37:06 2013
@@ -1919,6 +1919,14 @@
</description>
</property>
+<property>
+ <name>hive.optimize.tez</name>
+ <value>false</value>
+ <description>
+ Setting this property turns on Tez execution. Needs tez installed on the
+ cluster. (Only available on Hadoop 2)
+ </description>
+</property>
</configuration>
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java?rev=1517689&r1=1517688&r2=1517689&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java Mon Aug 26 20:37:06 2013
@@ -27,8 +27,8 @@ import java.util.Map;
import javax.security.auth.login.LoginException;
-import org.apache.commons.lang.StringUtils;
import org.apache.commons.io.FilenameUtils;
+import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@@ -143,7 +143,7 @@ public class DagUtils {
* @param w The second vertex (sink)
* @return
*/
- public static Edge createEdge(JobConf vConf, Vertex v, JobConf wConf, Vertex w)
+ public static Edge createEdge(JobConf vConf, Vertex v, JobConf wConf, Vertex w)
throws IOException {
// Tez needs to setup output subsequent input pairs correctly
@@ -180,18 +180,18 @@ public class DagUtils {
}
// write out the operator plan
- Path planPath = Utilities.setMapWork(conf, mapWork,
+ Path planPath = Utilities.setMapWork(conf, mapWork,
mrScratchDir.toUri().toString(), false);
LocalResource planLr = createLocalResource(fs,
planPath, LocalResourceType.FILE,
LocalResourceVisibility.APPLICATION);
// setup input paths and split info
- List<Path> inputPaths = Utilities.getInputPaths(conf, mapWork,
+ List<Path> inputPaths = Utilities.getInputPaths(conf, mapWork,
mrScratchDir.toUri().toString(), ctx);
Utilities.setInputPaths(conf, inputPaths);
- InputSplitInfo inputSplitInfo = MRHelpers.generateInputSplits(conf,
+ InputSplitInfo inputSplitInfo = MRHelpers.generateInputSplits(conf,
new Path(tezDir, ""+seqNo));
// create the directories FileSinkOperators need
@@ -332,7 +332,7 @@ public class DagUtils {
* @throws LoginException if we are unable to figure user information
* @throws IOException when any dfs operation fails.
*/
- private static Path getDefaultDestDir(Configuration conf) throws LoginException, IOException {
+ public static Path getDefaultDestDir(Configuration conf) throws LoginException, IOException {
UserGroupInformation ugi = ShimLoader.getHadoopShims().getUGIForConf(conf);
String userName = ShimLoader.getHadoopShims().getShortUserName(ugi);
String userPathStr = HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_USER_INSTALL_DIR);
@@ -409,11 +409,11 @@ public class DagUtils {
}
// the api that finds the jar being used by this class on disk
- private static String getExecJarPathLocal () throws URISyntaxException {
+ public static String getExecJarPathLocal () throws URISyntaxException {
// returns the location on disc of the jar of this class.
return DagUtils.class.getProtectionDomain().getCodeSource().getLocation().toURI().toString();
}
-
+
/*
* Helper function to retrieve the basename of a local resource
*/
@@ -425,7 +425,7 @@ public class DagUtils {
* @param pathStr - the string from which we try to determine the resource base name
* @return the name of the resource from a given path string.
*/
- private static String getResourceBaseName(String pathStr) {
+ public static String getResourceBaseName(String pathStr) {
String[] splits = pathStr.split("/");
return splits[splits.length - 1];
}
@@ -466,7 +466,7 @@ public class DagUtils {
* @return localresource from tez localization.
* @throws IOException when any file system related calls fails.
*/
- private static LocalResource localizeResource(Path src, Path dest, Configuration conf)
+ public static LocalResource localizeResource(Path src, Path dest, Configuration conf)
throws IOException {
FileSystem destFS = dest.getFileSystem(conf);
if (!(destFS instanceof DistributedFileSystem)) {
@@ -486,79 +486,6 @@ public class DagUtils {
}
/**
- * Returns a local resource representing the hive-exec jar. This resource will
- * be used to execute the plan on the cluster.
- * @param conf
- * @return LocalResource corresponding to the localized hive exec resource.
- * @throws IOException when any file system related call fails.
- * @throws LoginException when we are unable to determine the user.
- * @throws URISyntaxException when current jar location cannot be determined.
- */
- public static LocalResource createHiveExecLocalResource(HiveConf conf)
- throws IOException, LoginException, URISyntaxException {
- String hiveJarDir = conf.getVar(HiveConf.ConfVars.HIVE_JAR_DIRECTORY);
- String currentVersionPathStr = getExecJarPathLocal();
- String currentJarName = getResourceBaseName(currentVersionPathStr);
- FileSystem fs = null;
- Path jarPath = null;
- FileStatus dirStatus = null;
-
- if (hiveJarDir != null) {
- // check if it is a valid directory in HDFS
- Path hiveJarDirPath = new Path(hiveJarDir);
- fs = hiveJarDirPath.getFileSystem(conf);
-
- if (!(fs instanceof DistributedFileSystem)) {
- throw new IOException(ErrorMsg.INVALID_HDFS_URI.format(hiveJarDir));
- }
-
- try {
- dirStatus = fs.getFileStatus(hiveJarDirPath);
- } catch (FileNotFoundException fe) {
- // do nothing
- }
- if ((dirStatus != null) && (dirStatus.isDir())) {
- FileStatus[] listFileStatus = fs.listStatus(hiveJarDirPath);
- for (FileStatus fstatus : listFileStatus) {
- String jarName = getResourceBaseName(fstatus.getPath().toString());
- if (jarName.equals(currentJarName)) {
- // we have found the jar we need.
- jarPath = fstatus.getPath();
- return localizeResource(null, jarPath, conf);
- }
- }
-
- // jar wasn't in the directory, copy the one in current use
- if (jarPath == null) {
- return localizeResource(new Path(currentVersionPathStr), hiveJarDirPath, conf);
- }
- }
- }
-
- /*
- * specified location does not exist or is not a directory
- * try to push the jar to the hdfs location pointed by
- * config variable HIVE_INSTALL_DIR. Path will be
- * HIVE_INSTALL_DIR/{username}/.hiveJars/
- */
- if ((hiveJarDir == null) || (dirStatus == null) ||
- ((dirStatus != null) && (!dirStatus.isDir()))) {
- Path dest = getDefaultDestDir(conf);
- String destPathStr = dest.toString();
- String jarPathStr = destPathStr + "/" + currentJarName;
- dirStatus = fs.getFileStatus(dest);
- if (dirStatus.isDir()) {
- return localizeResource(new Path(currentVersionPathStr), new Path(jarPathStr), conf);
- } else {
- throw new IOException(ErrorMsg.INVALID_DIR.format(dest.toString()));
- }
- }
-
- // we couldn't find any valid locations. Throw exception
- throw new IOException(ErrorMsg.NO_VALID_LOCATIONS.getMsg());
- }
-
- /**
* Creates and initializes a JobConf object that can be used to execute
* the DAG. The configuration object will contain configurations from mapred-site
* overlaid with key/value pairs from the hiveConf object. Finally it will also
Added: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java?rev=1517689&view=auto
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java (added)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java Mon Aug 26 20:37:06 2013
@@ -0,0 +1,247 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.tez;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.security.auth.login.LoginException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.tez.client.AMConfiguration;
+import org.apache.tez.client.TezSession;
+import org.apache.tez.client.TezSessionConfiguration;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.mapreduce.hadoop.MRHelpers;
+
+/**
+ * Holds session state related to Tez
+ */
+public class TezSessionState {
+
+  private static final Log LOG = LogFactory.getLog(TezSessionState.class.getName());
+  private static final String TEZ_DIR = "_tez_session_dir";
+
+  private HiveConf conf;
+  private Path tezScratchDir;
+  private LocalResource appJarLr;
+  private TezSession session;
+  private String sessionId;
+
+  /**
+   * Constructor. We do not automatically connect, because we only want to
+   * load tez classes when the user has tez installed.
+   */
+  public TezSessionState() {
+  }
+
+  /**
+   * No-op, kept only for source compatibility: the constructor used to be
+   * mis-declared as this void method.
+   * @deprecated use {@link #TezSessionState()} instead.
+   */
+  @Deprecated
+  public void TezSessionContext() {
+  }
+
+  /**
+   * Returns whether a session has been established
+   */
+  public boolean isOpen() {
+    return session != null;
+  }
+
+  /**
+   * Creates a tez session. A session is tied to either a cli or an hs2 session. You can
+   * submit multiple DAGs against a session (as long as they are executed serially).
+   * @param sessionId id of the hive session; names the tez session and its scratch dir
+   * @param conf configuration used for the scratch dir, the jar and the tez session
+   * @throws IOException when the scratch dir or the hive-exec jar can't be set up
+   * @throws URISyntaxException when the location of the current jar can't be determined
+   * @throws LoginException when we are unable to determine the user
+   * @throws TezException if raised by tez while creating or starting the session
+   */
+  public void open(String sessionId, HiveConf conf)
+    throws IOException, LoginException, URISyntaxException, TezException {
+    this.sessionId = sessionId;
+    this.conf = conf;
+
+    // create the tez tmp dir
+    tezScratchDir = createTezDir(sessionId);
+
+    // generate basic tez config
+    TezConfiguration tezConfig = new TezConfiguration(conf);
+    tezConfig.set(TezConfiguration.TEZ_AM_JAVA_OPTS,
+        MRHelpers.getMRAMJavaOpts(conf));
+    tezConfig.set(TezConfiguration.TEZ_AM_STAGING_DIR, tezScratchDir.toUri().toString());
+
+    // unless already installed on all the cluster nodes, we'll have to
+    // localize hive-exec.jar as well.
+    appJarLr = createHiveExecLocalResource();
+
+    // configuration for the application master
+    Map<String, LocalResource> commonLocalResources = new HashMap<String, LocalResource>();
+    commonLocalResources.put(DagUtils.getBaseName(appJarLr), appJarLr);
+
+    AMConfiguration amConfig = new AMConfiguration("default", null, commonLocalResources,
+        tezConfig, null);
+
+    // configuration for the session
+    TezSessionConfiguration sessionConfig = new TezSessionConfiguration(amConfig, tezConfig);
+
+    // and finally we're ready to create and start the session
+    TezSession newSession = new TezSession("HIVE-"+sessionId, sessionConfig);
+
+    LOG.info("Opening new Tez Session (id: "+sessionId+", scratch dir: "+tezScratchDir+")");
+    newSession.start();
+    // publish only after start() succeeded so isOpen() never reports a dead session
+    session = newSession;
+  }
+
+  /**
+   * Close a tez session. Will cleanup any tez/am related resources. After closing a session
+   * no further DAGs can be executed against it. The state is reset even when stopping the
+   * session or removing the scratch dir fails.
+   * @throws IOException when the scratch dir can't be removed
+   * @throws TezException if raised by tez while stopping the session
+   */
+  public void close() throws TezException, IOException {
+    if (!isOpen()) {
+      return;
+    }
+
+    LOG.info("Closing Tez Session");
+    try {
+      session.stop();
+    } finally {
+      try {
+        FileSystem fs = tezScratchDir.getFileSystem(conf);
+        fs.delete(tezScratchDir, true);
+      } finally {
+        session = null;
+        tezScratchDir = null;
+        conf = null;
+        appJarLr = null;
+      }
+    }
+  }
+
+  public String getSessionId() {
+    return sessionId;
+  }
+
+  public TezSession getSession() {
+    return session;
+  }
+
+  public Path getTezScratchDir() {
+    return tezScratchDir;
+  }
+
+  public LocalResource getAppJarLr() {
+    return appJarLr;
+  }
+
+  /**
+   * createTezDir creates a temporary directory in the scratchDir folder to
+   * be used with Tez. Assumes scratchDir exists.
+   */
+  private Path createTezDir(String sessionId) throws IOException {
+    // tez needs its own scratch dir (per session)
+    Path tezDir = new Path(HiveConf.getVar(conf, HiveConf.ConfVars.SCRATCHDIR), TEZ_DIR);
+    tezDir = new Path(tezDir, sessionId);
+    FileSystem fs = tezDir.getFileSystem(conf);
+    fs.mkdirs(tezDir);
+
+    // don't keep the directory around on non-clean exit
+    fs.deleteOnExit(tezDir);
+
+    return tezDir;
+  }
+
+  /**
+   * Returns a local resource representing the hive-exec jar. This resource will
+   * be used to execute the plan on the cluster.
+   * @return LocalResource corresponding to the localized hive exec resource.
+   * @throws IOException when any file system related call fails.
+   * @throws LoginException when we are unable to determine the user.
+   * @throws URISyntaxException when current jar location cannot be determined.
+   */
+  private LocalResource createHiveExecLocalResource()
+    throws IOException, LoginException, URISyntaxException {
+    String hiveJarDir = conf.getVar(HiveConf.ConfVars.HIVE_JAR_DIRECTORY);
+    String currentVersionPathStr = DagUtils.getExecJarPathLocal();
+    String currentJarName = DagUtils.getResourceBaseName(currentVersionPathStr);
+
+    if (hiveJarDir != null) {
+      // check if it is a valid directory in HDFS
+      Path hiveJarDirPath = new Path(hiveJarDir);
+      FileSystem fs = hiveJarDirPath.getFileSystem(conf);
+
+      if (!(fs instanceof DistributedFileSystem)) {
+        throw new IOException(ErrorMsg.INVALID_HDFS_URI.format(hiveJarDir));
+      }
+
+      FileStatus dirStatus = null;
+      try {
+        dirStatus = fs.getFileStatus(hiveJarDirPath);
+      } catch (FileNotFoundException fe) {
+        // the directory doesn't exist; fall through to the default location
+      }
+      if ((dirStatus != null) && (dirStatus.isDir())) {
+        for (FileStatus fstatus : fs.listStatus(hiveJarDirPath)) {
+          String jarName = DagUtils.getResourceBaseName(fstatus.getPath().toString());
+          if (jarName.equals(currentJarName)) {
+            // we have found the jar we need.
+            return DagUtils.localizeResource(null, fstatus.getPath(), conf);
+          }
+        }
+
+        // jar wasn't in the directory, copy the one in current use
+        return DagUtils.localizeResource(new Path(currentVersionPathStr), hiveJarDirPath, conf);
+      }
+    }
+
+    /*
+     * specified location does not exist or is not a directory
+     * try to push the jar to the hdfs location pointed by
+     * config variable HIVE_USER_INSTALL_DIR. Path will be
+     * HIVE_USER_INSTALL_DIR/{username}/.hiveJars/
+     */
+    Path dest = DagUtils.getDefaultDestDir(conf);
+    // resolve the file system from dest: none was looked up when hiveJarDir is null
+    FileSystem destFs = dest.getFileSystem(conf);
+    if (!destFs.getFileStatus(dest).isDir()) {
+      throw new IOException(ErrorMsg.INVALID_DIR.format(dest.toString()));
+    }
+    Path jarPath = new Path(dest.toString() + "/" + currentJarName);
+    return DagUtils.localizeResource(new Path(currentVersionPathStr), jarPath, conf);
+  }
+}
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java?rev=1517689&r1=1517688&r2=1517689&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java Mon Aug 26 20:37:06 2013
@@ -36,17 +36,16 @@ import org.apache.hadoop.hive.ql.exec.Ut
import org.apache.hadoop.hive.ql.plan.BaseWork;
import org.apache.hadoop.hive.ql.plan.TezWork;
import org.apache.hadoop.hive.ql.plan.api.StageType;
+import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.LocalResource;
-import org.apache.tez.client.TezClient;
+import org.apache.tez.client.TezSession;
import org.apache.tez.dag.api.DAG;
import org.apache.tez.dag.api.Edge;
-import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezException;
import org.apache.tez.dag.api.Vertex;
import org.apache.tez.dag.api.client.DAGClient;
-import org.apache.tez.mapreduce.hadoop.MRHelpers;
/**
*
@@ -67,6 +66,7 @@ public class TezTask extends Task<TezWor
boolean cleanContext = false;
Context ctx = null;
DAGClient client = null;
+ TezSessionState session = null;
try {
// Get or create Context object. If we create it we have to clean
@@ -77,6 +77,16 @@ public class TezTask extends Task<TezWor
cleanContext = true;
}
+ // Need to remove this static hack. But this is the way currently to
+ // get a session.
+ SessionState ss = SessionState.get();
+ session = ss.getTezSession();
+ if (!session.isOpen()) {
+ // can happen if the user sets the tez flag after the session was
+ // established
+ session.open(ss.getSessionId(), conf);
+ }
+
// we will localize all the files (jars, plans, hashtables) to the
// scratch dir. let's create this first.
Path scratchDir = new Path(ctx.getMRScratchDir());
@@ -89,13 +99,13 @@ public class TezTask extends Task<TezWor
// unless already installed on all the cluster nodes, we'll have to
// localize hive-exec.jar as well.
- LocalResource appJarLr = DagUtils.createHiveExecLocalResource(conf);
+ LocalResource appJarLr = session.getAppJarLr();
// next we translate the TezWork to a Tez DAG
DAG dag = build(jobConf, work, scratchDir, appJarLr, ctx);
// submit will send the job to the cluster and start executing
- client = submit(jobConf, dag, scratchDir, appJarLr);
+ client = submit(jobConf, dag, scratchDir, appJarLr, session.getSession());
// finally monitor will print progress until the job is done
TezJobMonitor monitor = new TezJobMonitor();
@@ -151,7 +161,7 @@ public class TezTask extends Task<TezWor
// translate work to vertex
JobConf wxConf = DagUtils.initializeVertexConf(conf, w);
- Vertex wx = DagUtils.createVertex(wxConf, w, tezDir,
+ Vertex wx = DagUtils.createVertex(wxConf, w, tezDir,
i--, appJarLr, additionalLr, fs, ctx);
dag.addVertex(wx);
workToVertex.put(w, wx);
@@ -168,25 +178,12 @@ public class TezTask extends Task<TezWor
return dag;
}
- private DAGClient submit(JobConf conf, DAG dag, Path scratchDir, LocalResource appJarLr)
+ private DAGClient submit(JobConf conf, DAG dag, Path scratchDir,
+ LocalResource appJarLr, TezSession session)
throws IOException, TezException, InterruptedException {
- TezClient tezClient = new TezClient(new TezConfiguration(conf));
-
- // environment variables used by application master
- Map<String,String> amEnv = new HashMap<String, String>();
- MRHelpers.updateEnvironmentForMRTasks(conf, amEnv, false);
-
- // setup local resources used by application master
- Map<String, LocalResource> amLrs = new HashMap<String, LocalResource>();
- amLrs.put(DagUtils.getBaseName(appJarLr), appJarLr);
-
- Path tezDir = DagUtils.getTezDir(scratchDir);
-
// ready to start execution on the cluster
- DAGClient dagClient = tezClient.submitDAGApplication(dag, tezDir,
- null, "default", Collections.singletonList(""), amEnv, amLrs,
- new TezConfiguration(conf));
+ DAGClient dagClient = session.submitDAG(dag);
return dagClient;
}
@@ -209,7 +206,7 @@ public class TezTask extends Task<TezWor
// jobClose needs to execute successfully otherwise fail task
if (rc == 0) {
rc = 3;
- String mesg = "Job Commit failed with exception '"
+ String mesg = "Job Commit failed with exception '"
+ Utilities.getNameMessage(e) + "'";
console.printError(mesg, "\n" + StringUtils.stringifyException(e));
}
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java?rev=1517689&r1=1517688&r2=1517689&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java Mon Aug 26 20:37:06 2013
@@ -43,6 +43,7 @@ import org.apache.hadoop.hive.conf.HiveC
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.ql.MapRedStats;
import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.tez.TezSessionState;
import org.apache.hadoop.hive.ql.history.HiveHistory;
import org.apache.hadoop.hive.ql.log.PerfLogger;
import org.apache.hadoop.hive.ql.metadata.HiveException;
@@ -131,6 +132,8 @@ public class SessionState {
private Map<String, List<String>> localMapRedErrors;
+ private final TezSessionState tezSessionState;
+
/**
* Lineage state.
*/
@@ -189,6 +192,7 @@ public class SessionState {
this.conf = conf;
isSilent = conf.getBoolVar(HiveConf.ConfVars.HIVESESSIONSILENT);
ls = new LineageState();
+ tezSessionState = new TezSessionState();
overriddenConfigurations = new HashMap<String, String>();
overriddenConfigurations.putAll(HiveConf.getConfSystemProperties());
// if there isn't already a session name, go ahead and create it.
@@ -281,6 +285,16 @@ public class SessionState {
throw new RuntimeException(e);
}
+ if (HiveConf.getBoolVar(startSs.getConf(), HiveConf.ConfVars.HIVE_OPTIMIZE_TEZ)) {
+ try {
+ startSs.tezSessionState.open(startSs.getSessionId(), startSs.conf);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ } else {
+ LOG.info("No Tez session required at this point. hive.optimize.tez is false.");
+ }
+
return startSs;
}
@@ -749,6 +763,12 @@ public class SessionState {
} catch (IOException e) {
LOG.info("Error removing session resource dir " + resourceDir, e);
}
+
+ try {
+ tezSessionState.close();
+ } catch (Exception e) {
+ LOG.info("Error closing tez session", e);
+ }
}
/**
@@ -771,4 +791,8 @@ public class SessionState {
return perfLogger;
}
+ public TezSessionState getTezSession() {
+ return tezSessionState;
+ }
+
}