You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by gu...@apache.org on 2013/08/26 22:37:06 UTC

svn commit: r1517689 - in /hive/branches/tez: conf/ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ ql/src/java/org/apache/hadoop/hive/ql/session/

Author: gunther
Date: Mon Aug 26 20:37:06 2013
New Revision: 1517689

URL: http://svn.apache.org/r1517689
Log:
HIVE-5148: Jam sessions w/ Tez (Gunther Hagleitner)

Added:
    hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
Modified:
    hive/branches/tez/conf/hive-default.xml.template
    hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
    hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
    hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java

Modified: hive/branches/tez/conf/hive-default.xml.template
URL: http://svn.apache.org/viewvc/hive/branches/tez/conf/hive-default.xml.template?rev=1517689&r1=1517688&r2=1517689&view=diff
==============================================================================
--- hive/branches/tez/conf/hive-default.xml.template (original)
+++ hive/branches/tez/conf/hive-default.xml.template Mon Aug 26 20:37:06 2013
@@ -1919,6 +1919,14 @@
   </description>
 </property>
 
+<property>
+  <name>hive.optimize.tez</name>
+  <value>false</value>
+  <description>
+    Setting this property turns on Tez execution. Needs tez installed on the
+    cluster. (Only available on hadoop 2)
+  </description>
+</property>
 
 </configuration>
 

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java?rev=1517689&r1=1517688&r2=1517689&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java Mon Aug 26 20:37:06 2013
@@ -27,8 +27,8 @@ import java.util.Map;
 
 import javax.security.auth.login.LoginException;
 
-import org.apache.commons.lang.StringUtils;
 import org.apache.commons.io.FilenameUtils;
+import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -143,7 +143,7 @@ public class DagUtils {
    * @param w The second vertex (sink)
    * @return
    */
-  public static Edge createEdge(JobConf vConf, Vertex v, JobConf wConf, Vertex w) 
+  public static Edge createEdge(JobConf vConf, Vertex v, JobConf wConf, Vertex w)
       throws IOException {
 
     // Tez needs to setup output subsequent input pairs correctly
@@ -180,18 +180,18 @@ public class DagUtils {
     }
 
     // write out the operator plan
-    Path planPath = Utilities.setMapWork(conf, mapWork, 
+    Path planPath = Utilities.setMapWork(conf, mapWork,
         mrScratchDir.toUri().toString(), false);
     LocalResource planLr = createLocalResource(fs,
         planPath, LocalResourceType.FILE,
         LocalResourceVisibility.APPLICATION);
 
     // setup input paths and split info
-    List<Path> inputPaths = Utilities.getInputPaths(conf, mapWork, 
+    List<Path> inputPaths = Utilities.getInputPaths(conf, mapWork,
         mrScratchDir.toUri().toString(), ctx);
     Utilities.setInputPaths(conf, inputPaths);
 
-    InputSplitInfo inputSplitInfo = MRHelpers.generateInputSplits(conf, 
+    InputSplitInfo inputSplitInfo = MRHelpers.generateInputSplits(conf,
         new Path(tezDir, ""+seqNo));
 
     // create the directories FileSinkOperators need
@@ -332,7 +332,7 @@ public class DagUtils {
    * @throws LoginException if we are unable to figure user information
    * @throws IOException when any dfs operation fails.
    */
-  private static Path getDefaultDestDir(Configuration conf) throws LoginException, IOException {
+  public static Path getDefaultDestDir(Configuration conf) throws LoginException, IOException {
     UserGroupInformation ugi = ShimLoader.getHadoopShims().getUGIForConf(conf);
     String userName = ShimLoader.getHadoopShims().getShortUserName(ugi);
     String userPathStr = HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_USER_INSTALL_DIR);
@@ -409,11 +409,11 @@ public class DagUtils {
   }
 
   // the api that finds the jar being used by this class on disk
-  private static String getExecJarPathLocal () throws URISyntaxException {
+  public static String getExecJarPathLocal () throws URISyntaxException {
       // returns the location on disc of the jar of this class.
     return DagUtils.class.getProtectionDomain().getCodeSource().getLocation().toURI().toString();
   }
-  
+
   /*
    * Helper function to retrieve the basename of a local resource
    */
@@ -425,7 +425,7 @@ public class DagUtils {
    * @param pathStr - the string from which we try to determine the resource base name
    * @return the name of the resource from a given path string.
    */
-  private static String getResourceBaseName(String pathStr) {
+  public static String getResourceBaseName(String pathStr) {
     String[] splits = pathStr.split("/");
     return splits[splits.length - 1];
   }
@@ -466,7 +466,7 @@ public class DagUtils {
    * @return localresource from tez localization.
    * @throws IOException when any file system related calls fails.
    */
-  private static LocalResource localizeResource(Path src, Path dest, Configuration conf)
+  public static LocalResource localizeResource(Path src, Path dest, Configuration conf)
       throws IOException {
     FileSystem destFS = dest.getFileSystem(conf);
     if (!(destFS instanceof DistributedFileSystem)) {
@@ -486,79 +486,6 @@ public class DagUtils {
   }
 
   /**
-   * Returns a local resource representing the hive-exec jar. This resource will
-   * be used to execute the plan on the cluster.
-   * @param conf
-   * @return LocalResource corresponding to the localized hive exec resource.
-   * @throws IOException when any file system related call fails.
-   * @throws LoginException when we are unable to determine the user.
-   * @throws URISyntaxException when current jar location cannot be determined.
-   */
-  public static LocalResource createHiveExecLocalResource(HiveConf conf)
-      throws IOException, LoginException, URISyntaxException {
-    String hiveJarDir = conf.getVar(HiveConf.ConfVars.HIVE_JAR_DIRECTORY);
-    String currentVersionPathStr = getExecJarPathLocal();
-    String currentJarName = getResourceBaseName(currentVersionPathStr);
-    FileSystem fs = null;
-    Path jarPath = null;
-    FileStatus dirStatus = null;
-
-    if (hiveJarDir != null) {
-      // check if it is a valid directory in HDFS
-      Path hiveJarDirPath = new Path(hiveJarDir);
-      fs = hiveJarDirPath.getFileSystem(conf);
-
-      if (!(fs instanceof DistributedFileSystem)) {
-        throw new IOException(ErrorMsg.INVALID_HDFS_URI.format(hiveJarDir));
-      }
-
-      try {
-        dirStatus = fs.getFileStatus(hiveJarDirPath);
-      } catch (FileNotFoundException fe) {
-        // do nothing
-      }
-      if ((dirStatus != null) && (dirStatus.isDir())) {
-        FileStatus[] listFileStatus = fs.listStatus(hiveJarDirPath);
-        for (FileStatus fstatus : listFileStatus) {
-          String jarName = getResourceBaseName(fstatus.getPath().toString());
-          if (jarName.equals(currentJarName)) {
-            // we have found the jar we need.
-            jarPath = fstatus.getPath();
-            return localizeResource(null, jarPath, conf);
-          }
-        }
-
-        // jar wasn't in the directory, copy the one in current use
-        if (jarPath == null) {
-          return localizeResource(new Path(currentVersionPathStr), hiveJarDirPath, conf);
-        }
-      }
-    }
-
-    /*
-     * specified location does not exist or is not a directory
-     * try to push the jar to the hdfs location pointed by
-     * config variable HIVE_INSTALL_DIR. Path will be
-     * HIVE_INSTALL_DIR/{username}/.hiveJars/
-     */
-    if ((hiveJarDir == null) || (dirStatus == null) ||
-        ((dirStatus != null) && (!dirStatus.isDir()))) {
-      Path dest = getDefaultDestDir(conf);
-      String destPathStr = dest.toString();
-      String jarPathStr = destPathStr + "/" + currentJarName;
-      dirStatus = fs.getFileStatus(dest);
-      if (dirStatus.isDir()) {
-        return localizeResource(new Path(currentVersionPathStr), new Path(jarPathStr), conf);
-      } else {
-        throw new IOException(ErrorMsg.INVALID_DIR.format(dest.toString()));
-      }
-    }
-
-    // we couldn't find any valid locations. Throw exception
-    throw new IOException(ErrorMsg.NO_VALID_LOCATIONS.getMsg());
-  }
-
-  /**
    * Creates and initializes a JobConf object that can be used to execute
    * the DAG. The configuration object will contain configurations from mapred-site
    * overlaid with key/value pairs from the hiveConf object. Finally it will also

Added: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java?rev=1517689&view=auto
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java (added)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java Mon Aug 26 20:37:06 2013
@@ -0,0 +1,247 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.tez;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.security.auth.login.LoginException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.tez.client.AMConfiguration;
+import org.apache.tez.client.TezSession;
+import org.apache.tez.client.TezSessionConfiguration;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.mapreduce.hadoop.MRHelpers;
+
+/**
+ * Holds session state related to Tez
+ */
+public class TezSessionState {
+
+  private static final Log LOG = LogFactory.getLog(TezSessionState.class.getName());
+  private static final String TEZ_DIR = "_tez_session_dir";
+
+  private HiveConf conf;
+  private Path tezScratchDir;
+  private LocalResource appJarLr;
+  private TezSession session;
+  private String sessionId;
+
+  /**
+   * Constructor. We do not automatically connect, because we only want to
+   * load tez classes when the user has tez installed.
+   */
+  public TezSessionState() {
+  }
+
+  /**
+   * Returns whether a session has been established
+   */
+  public boolean isOpen() {
+    return session != null;
+  }
+
+  /**
+   * Creates a tez session. A session is tied to either a cli/hs2 session. You can
+   * submit multiple DAGs against a session (as long as they are executed serially).
+   * @param sessionId id of the hive session; used in the tez session name and scratch dir path
+   * @param conf configuration used to build the tez config and to access the file systems
+   * @throws IOException
+   * @throws URISyntaxException
+   * @throws LoginException
+   * @throws TezException
+   */
+  public void open(String sessionId, HiveConf conf)
+      throws IOException, LoginException, URISyntaxException, TezException {
+
+    this.sessionId = sessionId;
+    this.conf = conf;
+
+    // create the tez tmp dir
+    tezScratchDir = createTezDir(sessionId);
+
+    // generate basic tez config
+    TezConfiguration tezConfig = new TezConfiguration(conf);
+    tezConfig.set(TezConfiguration.TEZ_AM_JAVA_OPTS, MRHelpers.getMRAMJavaOpts(conf));
+    tezConfig.set(TezConfiguration.TEZ_AM_STAGING_DIR, tezScratchDir.toUri().toString());
+
+    // unless already installed on all the cluster nodes, we'll have to
+    // localize hive-exec.jar as well.
+    appJarLr = createHiveExecLocalResource();
+
+    // configuration for the application master
+    Map<String, LocalResource> commonLocalResources = new HashMap<String, LocalResource>();
+    commonLocalResources.put(DagUtils.getBaseName(appJarLr), appJarLr);
+
+    AMConfiguration amConfig = new AMConfiguration("default", null, commonLocalResources,
+         tezConfig, null);
+
+    // configuration for the session
+    TezSessionConfiguration sessionConfig = new TezSessionConfiguration(amConfig, tezConfig);
+
+    // create and start the session; the field is only set once start() has
+    // succeeded so that isOpen() never reports a session that failed to start
+    TezSession newSession = new TezSession("HIVE-"+sessionId, sessionConfig);
+
+    LOG.info("Opening new Tez Session (id: "+sessionId+", scratch dir: "+tezScratchDir+")");
+    newSession.start();
+    session = newSession;
+  }
+
+  /**
+   * Close a tez session. Will cleanup any tez/am related resources. After closing a session
+   * no further DAGs can be executed against it. The session is marked closed even if
+   * stopping it or removing the scratch dir fails.
+   * @throws IOException
+   * @throws TezException
+   */
+  public void close() throws TezException, IOException {
+    if (!isOpen()) {
+      return;
+    }
+
+    LOG.info("Closing Tez Session");
+    try {
+      session.stop();
+      FileSystem fs = tezScratchDir.getFileSystem(conf);
+      fs.delete(tezScratchDir, true);
+    } finally {
+      session = null;
+      tezScratchDir = null;
+      conf = null;
+      appJarLr = null;
+    }
+  }
+
+  public String getSessionId() {
+    return sessionId;
+  }
+
+  public TezSession getSession() {
+    return session;
+  }
+
+  public Path getTezScratchDir() {
+    return tezScratchDir;
+  }
+
+  public LocalResource getAppJarLr() {
+    return appJarLr;
+  }
+
+  /**
+   * createTezDir creates a temporary directory in the scratchDir folder to
+   * be used with Tez. Assumes scratchDir exists.
+   */
+  private Path createTezDir(String sessionId) throws IOException {
+    // tez needs its own scratch dir (per session)
+    Path tezDir = new Path(HiveConf.getVar(conf, HiveConf.ConfVars.SCRATCHDIR), TEZ_DIR);
+    tezDir = new Path(tezDir, sessionId);
+    FileSystem fs = tezDir.getFileSystem(conf);
+    fs.mkdirs(tezDir);
+
+    // don't keep the directory around on non-clean exit
+    fs.deleteOnExit(tezDir);
+
+    return tezDir;
+  }
+
+  /**
+   * Returns a local resource representing the hive-exec jar. This resource will
+   * be used to execute the plan on the cluster. The jar directory is taken from the
+   * conf passed to open().
+   * @return LocalResource corresponding to the localized hive exec resource.
+   * @throws IOException when any file system related call fails.
+   * @throws LoginException when we are unable to determine the user.
+   * @throws URISyntaxException when current jar location cannot be determined.
+   */
+  private LocalResource createHiveExecLocalResource()
+      throws IOException, LoginException, URISyntaxException {
+    String hiveJarDir = conf.getVar(HiveConf.ConfVars.HIVE_JAR_DIRECTORY);
+    String currentVersionPathStr = DagUtils.getExecJarPathLocal();
+    String currentJarName = DagUtils.getResourceBaseName(currentVersionPathStr);
+    FileStatus dirStatus = null;
+
+    if (hiveJarDir != null) {
+      // check if it is a valid directory in HDFS
+      Path hiveJarDirPath = new Path(hiveJarDir);
+      FileSystem fs = hiveJarDirPath.getFileSystem(conf);
+
+      if (!(fs instanceof DistributedFileSystem)) {
+        throw new IOException(ErrorMsg.INVALID_HDFS_URI.format(hiveJarDir));
+      }
+
+      try {
+        dirStatus = fs.getFileStatus(hiveJarDirPath);
+      } catch (FileNotFoundException fe) {
+        // directory is missing; dirStatus stays null and the default location is used
+      }
+      if ((dirStatus != null) && (dirStatus.isDir())) {
+        FileStatus[] listFileStatus = fs.listStatus(hiveJarDirPath);
+        for (FileStatus fstatus : listFileStatus) {
+          String jarName = DagUtils.getResourceBaseName(fstatus.getPath().toString());
+          if (jarName.equals(currentJarName)) {
+            // we have found the jar we need.
+            return DagUtils.localizeResource(null, fstatus.getPath(), conf);
+          }
+        }
+
+        // jar wasn't in the directory, copy the one in current use
+        // NOTE(review): dest is the directory here but a full file path in the
+        // fallback below - confirm localizeResource handles a directory dest.
+        return DagUtils.localizeResource(new Path(currentVersionPathStr), hiveJarDirPath, conf);
+      }
+    }
+
+    /*
+     * specified location does not exist or is not a directory
+     * try to push the jar to the hdfs location pointed by
+     * config variable HIVE_INSTALL_DIR. Path will be
+     * HIVE_INSTALL_DIR/{username}/.hiveJars/
+     */
+    if ((hiveJarDir == null) || (dirStatus == null) ||
+        ((dirStatus != null) && (!dirStatus.isDir()))) {
+      Path dest = DagUtils.getDefaultDestDir(conf);
+      String destPathStr = dest.toString();
+      String jarPathStr = destPathStr + "/" + currentJarName;
+      // resolve dest via its own FileSystem: none was opened when hiveJarDir is null
+      dirStatus = dest.getFileSystem(conf).getFileStatus(dest);
+      if (dirStatus.isDir()) {
+        return DagUtils.localizeResource(new Path(currentVersionPathStr), new Path(jarPathStr), conf);
+      } else {
+        throw new IOException(ErrorMsg.INVALID_DIR.format(dest.toString()));
+      }
+    }
+
+    // we couldn't find any valid locations. Throw exception
+    throw new IOException(ErrorMsg.NO_VALID_LOCATIONS.getMsg());
+  }
+}

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java?rev=1517689&r1=1517688&r2=1517689&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java Mon Aug 26 20:37:06 2013
@@ -36,17 +36,16 @@ import org.apache.hadoop.hive.ql.exec.Ut
 import org.apache.hadoop.hive.ql.plan.BaseWork;
 import org.apache.hadoop.hive.ql.plan.TezWork;
 import org.apache.hadoop.hive.ql.plan.api.StageType;
+import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.yarn.api.records.LocalResource;
-import org.apache.tez.client.TezClient;
+import org.apache.tez.client.TezSession;
 import org.apache.tez.dag.api.DAG;
 import org.apache.tez.dag.api.Edge;
-import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezException;
 import org.apache.tez.dag.api.Vertex;
 import org.apache.tez.dag.api.client.DAGClient;
-import org.apache.tez.mapreduce.hadoop.MRHelpers;
 
 /**
  *
@@ -67,6 +66,7 @@ public class TezTask extends Task<TezWor
     boolean cleanContext = false;
     Context ctx = null;
     DAGClient client = null;
+    TezSessionState session = null;
 
     try {
       // Get or create Context object. If we create it we have to clean
@@ -77,6 +77,16 @@ public class TezTask extends Task<TezWor
         cleanContext = true;
       }
 
+      // Need to remove this static hack. But this is the way currently to
+      // get a session.
+      SessionState ss = SessionState.get();
+      session = ss.getTezSession();
+      if (!session.isOpen()) {
+        // can happen if the user sets the tez flag after the session was
+        // established
+        session.open(ss.getSessionId(), conf);
+      }
+
       // we will localize all the files (jars, plans, hashtables) to the
       // scratch dir. let's create this first.
       Path scratchDir = new Path(ctx.getMRScratchDir());
@@ -89,13 +99,13 @@ public class TezTask extends Task<TezWor
 
       // unless already installed on all the cluster nodes, we'll have to
       // localize hive-exec.jar as well.
-      LocalResource appJarLr = DagUtils.createHiveExecLocalResource(conf);
+      LocalResource appJarLr = session.getAppJarLr();
 
       // next we translate the TezWork to a Tez DAG
       DAG dag = build(jobConf, work, scratchDir, appJarLr, ctx);
 
       // submit will send the job to the cluster and start executing
-      client = submit(jobConf, dag, scratchDir, appJarLr);
+      client = submit(jobConf, dag, scratchDir, appJarLr, session.getSession());
 
       // finally monitor will print progress until the job is done
       TezJobMonitor monitor = new TezJobMonitor();
@@ -151,7 +161,7 @@ public class TezTask extends Task<TezWor
 
       // translate work to vertex
       JobConf wxConf = DagUtils.initializeVertexConf(conf, w);
-      Vertex wx = DagUtils.createVertex(wxConf, w, tezDir, 
+      Vertex wx = DagUtils.createVertex(wxConf, w, tezDir,
           i--, appJarLr, additionalLr, fs, ctx);
       dag.addVertex(wx);
       workToVertex.put(w, wx);
@@ -168,25 +178,12 @@ public class TezTask extends Task<TezWor
     return dag;
   }
 
-  private DAGClient submit(JobConf conf, DAG dag, Path scratchDir, LocalResource appJarLr)
+  private DAGClient submit(JobConf conf, DAG dag, Path scratchDir,
+      LocalResource appJarLr, TezSession session)
       throws IOException, TezException, InterruptedException {
 
-    TezClient tezClient = new TezClient(new TezConfiguration(conf));
-
-    // environment variables used by application master
-    Map<String,String> amEnv = new HashMap<String, String>();
-    MRHelpers.updateEnvironmentForMRTasks(conf, amEnv, false);
-
-    // setup local resources used by application master
-    Map<String, LocalResource> amLrs = new HashMap<String, LocalResource>();
-    amLrs.put(DagUtils.getBaseName(appJarLr), appJarLr);
-
-    Path tezDir = DagUtils.getTezDir(scratchDir);
-
     // ready to start execution on the cluster
-    DAGClient dagClient = tezClient.submitDAGApplication(dag, tezDir,
-        null, "default", Collections.singletonList(""), amEnv, amLrs,
-        new TezConfiguration(conf));
+    DAGClient dagClient = session.submitDAG(dag);
 
     return dagClient;
   }
@@ -209,7 +206,7 @@ public class TezTask extends Task<TezWor
       // jobClose needs to execute successfully otherwise fail task
       if (rc == 0) {
         rc = 3;
-        String mesg = "Job Commit failed with exception '" 
+        String mesg = "Job Commit failed with exception '"
           + Utilities.getNameMessage(e) + "'";
         console.printError(mesg, "\n" + StringUtils.stringifyException(e));
       }

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java?rev=1517689&r1=1517688&r2=1517689&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java Mon Aug 26 20:37:06 2013
@@ -43,6 +43,7 @@ import org.apache.hadoop.hive.conf.HiveC
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.ql.MapRedStats;
 import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.tez.TezSessionState;
 import org.apache.hadoop.hive.ql.history.HiveHistory;
 import org.apache.hadoop.hive.ql.log.PerfLogger;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
@@ -131,6 +132,8 @@ public class SessionState {
 
   private Map<String, List<String>> localMapRedErrors;
 
+  private final TezSessionState tezSessionState;
+
   /**
    * Lineage state.
    */
@@ -189,6 +192,7 @@ public class SessionState {
     this.conf = conf;
     isSilent = conf.getBoolVar(HiveConf.ConfVars.HIVESESSIONSILENT);
     ls = new LineageState();
+    tezSessionState = new TezSessionState();
     overriddenConfigurations = new HashMap<String, String>();
     overriddenConfigurations.putAll(HiveConf.getConfSystemProperties());
     // if there isn't already a session name, go ahead and create it.
@@ -281,6 +285,16 @@ public class SessionState {
       throw new RuntimeException(e);
     }
 
+    if (HiveConf.getBoolVar(startSs.getConf(), HiveConf.ConfVars.HIVE_OPTIMIZE_TEZ)) {
+      try {
+        startSs.tezSessionState.open(startSs.getSessionId(), startSs.conf);
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+    } else {
+       LOG.info("No Tez session required at this point. hive.optimize.tez is false.");
+    }
+
     return startSs;
   }
 
@@ -749,6 +763,12 @@ public class SessionState {
     } catch (IOException e) {
       LOG.info("Error removing session resource dir " + resourceDir, e);
     }
+
+    try {
+      tezSessionState.close();
+    } catch (Exception e) {
+      LOG.info("Error closing tez session", e);
+    }
   }
 
   /**
@@ -771,4 +791,8 @@ public class SessionState {
     return perfLogger;
   }
 
+  public TezSessionState getTezSession() {
+    return tezSessionState;
+  }
+
 }