You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by gu...@apache.org on 2014/09/02 21:57:07 UTC

svn commit: r1622108 [27/27] - in /hive/branches/tez: ./ accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/ beeline/src/java/org/apache/hive/beeline/ beeline/src/test/org/apache/hive/beeline/ bin/ bin/ext/ checkstyle/ common/src/java/...

Modified: hive/branches/tez/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java?rev=1622108&r1=1622107&r2=1622108&view=diff
==============================================================================
--- hive/branches/tez/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java (original)
+++ hive/branches/tez/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java Tue Sep  2 19:56:56 2014
@@ -28,6 +28,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.commons.io.FileUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hive.common.cli.HiveFileProcessor;
@@ -44,14 +45,7 @@ import org.apache.hadoop.hive.ql.process
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hive.common.util.HiveVersionInfo;
 import org.apache.hive.service.auth.HiveAuthFactory;
-import org.apache.hive.service.cli.FetchOrientation;
-import org.apache.hive.service.cli.GetInfoType;
-import org.apache.hive.service.cli.GetInfoValue;
-import org.apache.hive.service.cli.HiveSQLException;
-import org.apache.hive.service.cli.OperationHandle;
-import org.apache.hive.service.cli.RowSet;
-import org.apache.hive.service.cli.SessionHandle;
-import org.apache.hive.service.cli.TableSchema;
+import org.apache.hive.service.cli.*;
 import org.apache.hive.service.cli.operation.ExecuteStatementOperation;
 import org.apache.hive.service.cli.operation.GetCatalogsOperation;
 import org.apache.hive.service.cli.operation.GetColumnsOperation;
@@ -60,8 +54,10 @@ import org.apache.hive.service.cli.opera
 import org.apache.hive.service.cli.operation.GetTableTypesOperation;
 import org.apache.hive.service.cli.operation.GetTypeInfoOperation;
 import org.apache.hive.service.cli.operation.MetadataOperation;
+import org.apache.hive.service.cli.operation.Operation;
 import org.apache.hive.service.cli.operation.OperationManager;
 import org.apache.hive.service.cli.thrift.TProtocolVersion;
+import org.apache.hive.service.server.ThreadWithGarbageCleanup;
 
 /**
  * HiveSession
@@ -86,6 +82,10 @@ public class HiveSessionImpl implements 
   private OperationManager operationManager;
   private IMetaStoreClient metastoreClient = null;
   private final Set<OperationHandle> opHandleSet = new HashSet<OperationHandle>();
+  private boolean isOperationLogEnabled;
+  private File sessionLogDir;
+
+  private volatile long lastAccessTime;
 
   public HiveSessionImpl(TProtocolVersion protocol, String username, String password,
       HiveConf serverhiveConf, String ipAddress) {
@@ -95,27 +95,32 @@ public class HiveSessionImpl implements 
     this.hiveConf = new HiveConf(serverhiveConf);
     this.ipAddress = ipAddress;
 
-    // set an explicit session name to control the download directory name
+    // Set an explicit session name to control the download directory name
     hiveConf.set(ConfVars.HIVESESSIONID.varname,
         sessionHandle.getHandleIdentifier().toString());
-    // use thrift transportable formatter
+    // Use thrift transportable formatter
     hiveConf.set(ListSinkOperator.OUTPUT_FORMATTER,
         FetchFormatter.ThriftFormatter.class.getName());
     hiveConf.setInt(ListSinkOperator.OUTPUT_PROTOCOL, protocol.getValue());
 
+    /**
+     * Create a new SessionState object that will be associated with this HiveServer2 session.
+     * When the server executes multiple queries in the same session,
+     * this SessionState object is reused across multiple queries.
+     */
     sessionState = new SessionState(hiveConf, username);
     sessionState.setUserIpAddress(ipAddress);
     sessionState.setIsHiveServerQuery(true);
+
+    lastAccessTime = System.currentTimeMillis();
     SessionState.start(sessionState);
   }
 
   @Override
   public void initialize(Map<String, String> sessionConfMap) throws Exception {
-    //process global init file: .hiverc
+    // Process global init file: .hiverc
     processGlobalInitFile();
-    SessionState.setCurrentSessionState(sessionState);
-
-    //set conf properties specified by user from client side
+    // Set conf properties specified by user from client side
     if (sessionConfMap != null) {
       configureSession(sessionConfMap);
     }
@@ -169,6 +174,7 @@ public class HiveSessionImpl implements 
   }
 
   private void configureSession(Map<String, String> sessionConfMap) throws Exception {
+    SessionState.setCurrentSessionState(sessionState);
     for (Map.Entry<String, String> entry : sessionConfMap.entrySet()) {
       String key = entry.getKey();
       if (key.startsWith("set:")) {
@@ -182,6 +188,34 @@ public class HiveSessionImpl implements 
   }
 
   @Override
+  public void setOperationLogSessionDir(File operationLogRootDir) {
+    sessionLogDir = new File(operationLogRootDir, sessionHandle.getHandleIdentifier().toString());
+    isOperationLogEnabled = true;
+
+    if (!sessionLogDir.exists()) {
+      if (!sessionLogDir.mkdir()) {
+        LOG.warn("Unable to create operation log session directory: " +
+            sessionLogDir.getAbsolutePath());
+        isOperationLogEnabled = false;
+      }
+    }
+
+    if (isOperationLogEnabled) {
+      LOG.info("Operation log session directory is created: " + sessionLogDir.getAbsolutePath());
+    }
+  }
+
+  @Override
+  public boolean isOperationLogEnabled() {
+    return isOperationLogEnabled;
+  }
+
+  @Override
+  public File getOperationLogSessionDir() {
+    return sessionLogDir;
+  }
+
+  @Override
   public TProtocolVersion getProtocolVersion() {
     return sessionHandle.getProtocolVersion();
   }
@@ -210,15 +244,32 @@ public class HiveSessionImpl implements 
     SessionState.start(sessionState);
   }
 
-  protected synchronized void acquire() throws HiveSQLException {
-    // need to make sure that the this connections session state is
-    // stored in the thread local for sessions.
+  protected synchronized void acquire(boolean userAccess) {
+    // Need to make sure that this HiveServer2 session's SessionState is
+    // stored in the thread local for the handler thread.
     SessionState.setCurrentSessionState(sessionState);
+    if (userAccess) {
+      lastAccessTime = System.currentTimeMillis();
+    }
   }
 
-  protected synchronized void release() {
-    assert sessionState != null;
+  /**
+   * 1. We'll remove the ThreadLocal SessionState as this thread might now serve
+   * other requests.
+   * 2. We'll cache the ThreadLocal RawStore object for this background thread for an orderly cleanup
+   * when this thread is garbage collected later.
+   * @see org.apache.hive.service.server.ThreadWithGarbageCleanup#finalize()
+   */
+  protected synchronized void release(boolean userAccess) {
     SessionState.detachSession();
+    if (ThreadWithGarbageCleanup.currentThread() instanceof ThreadWithGarbageCleanup) {
+      ThreadWithGarbageCleanup currentThread =
+          (ThreadWithGarbageCleanup) ThreadWithGarbageCleanup.currentThread();
+      currentThread.cacheThreadLocalRawStore();
+    }
+    if (userAccess) {
+      lastAccessTime = System.currentTimeMillis();
+    }
   }
 
   @Override
@@ -257,7 +308,7 @@ public class HiveSessionImpl implements 
   @Override
   public GetInfoValue getInfo(GetInfoType getInfoType)
       throws HiveSQLException {
-    acquire();
+    acquire(true);
     try {
       switch (getInfoType) {
       case CLI_SERVER_NAME:
@@ -277,7 +328,7 @@ public class HiveSessionImpl implements 
         throw new HiveSQLException("Unrecognized GetInfoType value: " + getInfoType.toString());
       }
     } finally {
-      release();
+      release(true);
     }
   }
 
@@ -296,7 +347,7 @@ public class HiveSessionImpl implements 
   private OperationHandle executeStatementInternal(String statement, Map<String, String> confOverlay,
       boolean runAsync)
           throws HiveSQLException {
-    acquire();
+    acquire(true);
 
     OperationManager operationManager = getOperationManager();
     ExecuteStatementOperation operation = operationManager
@@ -314,14 +365,14 @@ public class HiveSessionImpl implements 
       }
       throw e;
     } finally {
-      release();
+      release(true);
     }
   }
 
   @Override
   public OperationHandle getTypeInfo()
       throws HiveSQLException {
-    acquire();
+    acquire(true);
 
     OperationManager operationManager = getOperationManager();
     GetTypeInfoOperation operation = operationManager.newGetTypeInfoOperation(getSession());
@@ -334,14 +385,14 @@ public class HiveSessionImpl implements 
       operationManager.closeOperation(opHandle);
       throw e;
     } finally {
-      release();
+      release(true);
     }
   }
 
   @Override
   public OperationHandle getCatalogs()
       throws HiveSQLException {
-    acquire();
+    acquire(true);
 
     OperationManager operationManager = getOperationManager();
     GetCatalogsOperation operation = operationManager.newGetCatalogsOperation(getSession());
@@ -354,14 +405,14 @@ public class HiveSessionImpl implements 
       operationManager.closeOperation(opHandle);
       throw e;
     } finally {
-      release();
+      release(true);
     }
   }
 
   @Override
   public OperationHandle getSchemas(String catalogName, String schemaName)
       throws HiveSQLException {
-    acquire();
+    acquire(true);
 
     OperationManager operationManager = getOperationManager();
     GetSchemasOperation operation =
@@ -375,7 +426,7 @@ public class HiveSessionImpl implements 
       operationManager.closeOperation(opHandle);
       throw e;
     } finally {
-      release();
+      release(true);
     }
   }
 
@@ -383,7 +434,7 @@ public class HiveSessionImpl implements 
   public OperationHandle getTables(String catalogName, String schemaName, String tableName,
       List<String> tableTypes)
           throws HiveSQLException {
-    acquire();
+    acquire(true);
 
     OperationManager operationManager = getOperationManager();
     MetadataOperation operation =
@@ -397,14 +448,14 @@ public class HiveSessionImpl implements 
       operationManager.closeOperation(opHandle);
       throw e;
     } finally {
-      release();
+      release(true);
     }
   }
 
   @Override
   public OperationHandle getTableTypes()
       throws HiveSQLException {
-    acquire();
+    acquire(true);
 
     OperationManager operationManager = getOperationManager();
     GetTableTypesOperation operation = operationManager.newGetTableTypesOperation(getSession());
@@ -417,14 +468,14 @@ public class HiveSessionImpl implements 
       operationManager.closeOperation(opHandle);
       throw e;
     } finally {
-      release();
+      release(true);
     }
   }
 
   @Override
   public OperationHandle getColumns(String catalogName, String schemaName,
       String tableName, String columnName)  throws HiveSQLException {
-    acquire();
+    acquire(true);
 
     OperationManager operationManager = getOperationManager();
     GetColumnsOperation operation = operationManager.newGetColumnsOperation(getSession(),
@@ -438,14 +489,14 @@ public class HiveSessionImpl implements 
       operationManager.closeOperation(opHandle);
       throw e;
     } finally {
-      release();
+      release(true);
     }
   }
 
   @Override
   public OperationHandle getFunctions(String catalogName, String schemaName, String functionName)
       throws HiveSQLException {
-    acquire();
+    acquire(true);
 
     OperationManager operationManager = getOperationManager();
     GetFunctionsOperation operation = operationManager
@@ -459,16 +510,16 @@ public class HiveSessionImpl implements 
       operationManager.closeOperation(opHandle);
       throw e;
     } finally {
-      release();
+      release(true);
     }
   }
 
   @Override
   public void close() throws HiveSQLException {
     try {
-      acquire();
+      acquire(true);
       /**
-       *  For metadata operations like getTables(), getColumns() etc,
+       * For metadata operations like getTables(), getColumns() etc,
        * the session allocates a private metastore handler which should be
        * closed at the end of the session
        */
@@ -480,6 +531,9 @@ public class HiveSessionImpl implements 
         operationManager.closeOperation(opHandle);
       }
       opHandleSet.clear();
+      // Cleanup session log directory.
+      cleanupSessionLogDir();
+
       HiveHistory hiveHist = sessionState.getHiveHistory();
       if (null != hiveHist) {
         hiveHist.closeStream();
@@ -488,7 +542,17 @@ public class HiveSessionImpl implements 
     } catch (IOException ioe) {
       throw new HiveSQLException("Failure to close", ioe);
     } finally {
-      release();
+      release(true);
+    }
+  }
+
+  private void cleanupSessionLogDir() {
+    if (isOperationLogEnabled) {
+      try {
+        FileUtils.forceDelete(sessionLogDir);
+      } catch (Exception e) {
+        LOG.error("Failed to cleanup session log dir: " + sessionHandle, e);
+      }
     }
   }
 
@@ -508,55 +572,79 @@ public class HiveSessionImpl implements 
   }
 
   @Override
+  public long getLastAccessTime() {
+    return lastAccessTime;
+  }
+
+  @Override
+  public void closeExpiredOperations() {
+    OperationHandle[] handles = opHandleSet.toArray(new OperationHandle[opHandleSet.size()]);
+    if (handles.length > 0) {
+      List<Operation> operations = operationManager.removeExpiredOperations(handles);
+      if (!operations.isEmpty()) {
+        closeTimedOutOperations(operations);
+      }
+    }
+  }
+
+  private void closeTimedOutOperations(List<Operation> operations) {
+    acquire(false);
+    try {
+      for (Operation operation : operations) {
+        opHandleSet.remove(operation.getHandle());
+        try {
+          operation.close();
+        } catch (Exception e) {
+          LOG.warn("Exception is thrown closing timed-out operation " + operation.getHandle(), e);
+        }
+      }
+    } finally {
+      release(false);
+    }
+  }
+
+  @Override
   public void cancelOperation(OperationHandle opHandle) throws HiveSQLException {
-    acquire();
+    acquire(true);
     try {
       sessionManager.getOperationManager().cancelOperation(opHandle);
     } finally {
-      release();
+      release(true);
     }
   }
 
   @Override
   public void closeOperation(OperationHandle opHandle) throws HiveSQLException {
-    acquire();
+    acquire(true);
     try {
       operationManager.closeOperation(opHandle);
       opHandleSet.remove(opHandle);
     } finally {
-      release();
+      release(true);
     }
   }
 
   @Override
   public TableSchema getResultSetMetadata(OperationHandle opHandle) throws HiveSQLException {
-    acquire();
+    acquire(true);
     try {
       return sessionManager.getOperationManager().getOperationResultSetSchema(opHandle);
     } finally {
-      release();
+      release(true);
     }
   }
 
   @Override
-  public RowSet fetchResults(OperationHandle opHandle, FetchOrientation orientation, long maxRows)
-      throws HiveSQLException {
-    acquire();
+  public RowSet fetchResults(OperationHandle opHandle, FetchOrientation orientation,
+      long maxRows, FetchType fetchType) throws HiveSQLException {
+    acquire(true);
     try {
-      return sessionManager.getOperationManager()
-          .getOperationNextRowSet(opHandle, orientation, maxRows);
-    } finally {
-      release();
-    }
-  }
-
-  @Override
-  public RowSet fetchResults(OperationHandle opHandle) throws HiveSQLException {
-    acquire();
-    try {
-      return sessionManager.getOperationManager().getOperationNextRowSet(opHandle);
+      if (fetchType == FetchType.QUERY_OUTPUT) {
+        return operationManager.getOperationNextRowSet(opHandle, orientation, maxRows);
+      }
+      return operationManager.getOperationLogRowSet(opHandle, orientation, maxRows);
     } finally {
-      release();
+      release(true);
     }
   }
 

Modified: hive/branches/tez/service/src/java/org/apache/hive/service/cli/session/HiveSessionImplwithUGI.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/service/src/java/org/apache/hive/service/cli/session/HiveSessionImplwithUGI.java?rev=1622108&r1=1622107&r2=1622108&view=diff
==============================================================================
--- hive/branches/tez/service/src/java/org/apache/hive/service/cli/session/HiveSessionImplwithUGI.java (original)
+++ hive/branches/tez/service/src/java/org/apache/hive/service/cli/session/HiveSessionImplwithUGI.java Tue Sep  2 19:56:56 2014
@@ -19,7 +19,6 @@
 package org.apache.hive.service.cli.session;
 
 import java.io.IOException;
-import java.util.Map;
 
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.metadata.Hive;
@@ -48,6 +47,14 @@ public class HiveSessionImplwithUGI exte
     super(protocol, username, password, hiveConf, ipAddress);
     setSessionUGI(username);
     setDelegationToken(delegationToken);
+
+    // create a new metastore connection for this particular user session
+    Hive.set(null);
+    try {
+      sessionHive = Hive.get(getHiveConf());
+    } catch (HiveException e) {
+      throw new HiveSQLException("Failed to setup metastore connection", e);
+    }
   }
 
   // setup appropriate UGI for the session
@@ -75,8 +82,8 @@ public class HiveSessionImplwithUGI exte
   }
 
   @Override
-  protected synchronized void acquire() throws HiveSQLException {
-    super.acquire();
+  protected synchronized void acquire(boolean userAccess) {
+    super.acquire(userAccess);
     // if we have a metastore connection with impersonation, then set it first
     if (sessionHive != null) {
       Hive.set(sessionHive);
@@ -90,11 +97,11 @@ public class HiveSessionImplwithUGI exte
   @Override
   public void close() throws HiveSQLException {
     try {
-    acquire();
+    acquire(true);
     ShimLoader.getHadoopShims().closeAllForUGI(sessionUgi);
     cancelDelegationToken();
     } finally {
-      release();
+      release(true);
       super.close();
     }
   }
@@ -115,13 +122,6 @@ public class HiveSessionImplwithUGI exte
       } catch (IOException e) {
         throw new HiveSQLException("Couldn't setup delegation token in the ugi", e);
       }
-      // create a new metastore connection using the delegation token
-      Hive.set(null);
-      try {
-        sessionHive = Hive.get(getHiveConf());
-      } catch (HiveException e) {
-        throw new HiveSQLException("Failed to setup metastore connection", e);
-      }
     }
   }
 

Modified: hive/branches/tez/service/src/java/org/apache/hive/service/cli/session/SessionManager.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/service/src/java/org/apache/hive/service/cli/session/SessionManager.java?rev=1622108&r1=1622107&r2=1622108&view=diff
==============================================================================
--- hive/branches/tez/service/src/java/org/apache/hive/service/cli/session/SessionManager.java (original)
+++ hive/branches/tez/service/src/java/org/apache/hive/service/cli/session/SessionManager.java Tue Sep  2 19:56:56 2014
@@ -18,6 +18,10 @@
 
 package org.apache.hive.service.cli.session;
 
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Date;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
@@ -26,6 +30,7 @@ import java.util.concurrent.LinkedBlocki
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.commons.io.FileUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -38,6 +43,7 @@ import org.apache.hive.service.cli.HiveS
 import org.apache.hive.service.cli.SessionHandle;
 import org.apache.hive.service.cli.operation.OperationManager;
 import org.apache.hive.service.cli.thrift.TProtocolVersion;
+import org.apache.hive.service.server.ThreadFactoryWithGarbageCleanup;
 
 /**
  * SessionManager.
@@ -52,6 +58,13 @@ public class SessionManager extends Comp
       new ConcurrentHashMap<SessionHandle, HiveSession>();
   private final OperationManager operationManager = new OperationManager();
   private ThreadPoolExecutor backgroundOperationPool;
+  private boolean isOperationLogEnabled;
+  private File operationLogRootDir;
+
+  private long checkInterval;
+  private long sessionTimeout;
+
+  private volatile boolean shutdown;
 
   public SessionManager() {
     super("SessionManager");
@@ -64,24 +77,41 @@ public class SessionManager extends Comp
     } catch (HiveException e) {
       throw new RuntimeException("Error applying authorization policy on hive configuration", e);
     }
-
     this.hiveConf = hiveConf;
-    int backgroundPoolSize = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_ASYNC_EXEC_THREADS);
-    LOG.info("HiveServer2: Async execution thread pool size: " + backgroundPoolSize);
-    int backgroundPoolQueueSize = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_ASYNC_EXEC_WAIT_QUEUE_SIZE);
-    LOG.info("HiveServer2: Async execution wait queue size: " + backgroundPoolQueueSize);
-    int keepAliveTime = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_ASYNC_EXEC_KEEPALIVE_TIME);
-    LOG.info("HiveServer2: Async execution thread keepalive time: " + keepAliveTime);
-    // Create a thread pool with #backgroundPoolSize threads
-    // Threads terminate when they are idle for more than the keepAliveTime
-    // An bounded blocking queue is used to queue incoming operations, if #operations > backgroundPoolSize
-    backgroundOperationPool = new ThreadPoolExecutor(backgroundPoolSize, backgroundPoolSize,
-        keepAliveTime, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(backgroundPoolQueueSize));
-    backgroundOperationPool.allowCoreThreadTimeOut(true);
+    //Create operation log root directory, if operation logging is enabled
+    if (hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_LOGGING_OPERATION_ENABLED)) {
+      initOperationLogRootDir();
+    }
+    createBackgroundOperationPool();
     addService(operationManager);
     super.init(hiveConf);
   }
 
+  private void createBackgroundOperationPool() {
+    int poolSize = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_ASYNC_EXEC_THREADS);
+    LOG.info("HiveServer2: Background operation thread pool size: " + poolSize);
+    int poolQueueSize = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_ASYNC_EXEC_WAIT_QUEUE_SIZE);
+    LOG.info("HiveServer2: Background operation thread wait queue size: " + poolQueueSize);
+    long keepAliveTime = HiveConf.getTimeVar(
+        hiveConf, ConfVars.HIVE_SERVER2_ASYNC_EXEC_KEEPALIVE_TIME, TimeUnit.SECONDS);
+    LOG.info(
+        "HiveServer2: Background operation thread keepalive time: " + keepAliveTime + " seconds");
+
+    // Create a thread pool with #poolSize threads
+    // Threads terminate when they are idle for more than the keepAliveTime
+    // A bounded blocking queue is used to queue incoming operations, if #operations > poolSize
+    String threadPoolName = "HiveServer2-Background-Pool";
+    backgroundOperationPool = new ThreadPoolExecutor(poolSize, poolSize,
+        keepAliveTime, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(poolQueueSize),
+        new ThreadFactoryWithGarbageCleanup(threadPoolName));
+    backgroundOperationPool.allowCoreThreadTimeOut(true);
+
+    checkInterval = HiveConf.getTimeVar(
+        hiveConf, ConfVars.HIVE_SERVER2_SESSION_CHECK_INTERVAL, TimeUnit.MILLISECONDS);
+    sessionTimeout = HiveConf.getTimeVar(
+        hiveConf, ConfVars.HIVE_SERVER2_IDLE_SESSION_TIMEOUT, TimeUnit.MILLISECONDS);
+  }
+
   private void applyAuthorizationConfigPolicy(HiveConf newHiveConf) throws HiveException {
     // authorization setup using SessionState should be revisited eventually, as
     // authorization and authentication are not session specific settings
@@ -91,23 +121,106 @@ public class SessionManager extends Comp
     ss.applyAuthorizationPolicy();
   }
 
+  private void initOperationLogRootDir() {
+    operationLogRootDir = new File(
+        hiveConf.getVar(ConfVars.HIVE_SERVER2_LOGGING_OPERATION_LOG_LOCATION));
+    isOperationLogEnabled = true;
+
+    if (operationLogRootDir.exists() && !operationLogRootDir.isDirectory()) {
+      LOG.warn("The operation log root directory exists, but it is not a directory: " +
+          operationLogRootDir.getAbsolutePath());
+      isOperationLogEnabled = false;
+    }
+
+    if (!operationLogRootDir.exists()) {
+      if (!operationLogRootDir.mkdirs()) {
+        LOG.warn("Unable to create operation log root directory: " +
+            operationLogRootDir.getAbsolutePath());
+        isOperationLogEnabled = false;
+      }
+    }
+
+    if (isOperationLogEnabled) {
+      LOG.info("Operation log root directory is created: " + operationLogRootDir.getAbsolutePath());
+      try {
+        FileUtils.forceDeleteOnExit(operationLogRootDir);
+      } catch (IOException e) {
+        LOG.warn("Failed to schedule cleanup HS2 operation logging root dir: " +
+            operationLogRootDir.getAbsolutePath(), e);
+      }
+    }
+  }
+
   @Override
   public synchronized void start() {
     super.start();
+    if (checkInterval > 0) {
+      startTimeoutChecker();
+    }
+  }
+
+  private void startTimeoutChecker() {
+    final long interval = Math.max(checkInterval, 3000l);  // minimum 3 seconds
+    Runnable timeoutChecker = new Runnable() {
+      @Override
+      public void run() {
+        for (sleepInterval(interval); !shutdown; sleepInterval(interval)) {
+          long current = System.currentTimeMillis();
+          for (HiveSession session : new ArrayList<HiveSession>(handleToSession.values())) {
+            if (sessionTimeout > 0 && session.getLastAccessTime() + sessionTimeout <= current) {
+              SessionHandle handle = session.getSessionHandle();
+              LOG.warn("Session " + handle + " is Timed-out (last access : " +
+                  new Date(session.getLastAccessTime()) + ") and will be closed");
+              try {
+                closeSession(handle);
+              } catch (HiveSQLException e) {
+                LOG.warn("Exception is thrown closing session " + handle, e);
+              }
+            } else {
+              session.closeExpiredOperations();
+            }
+          }
+        }
+      }
+
+      private void sleepInterval(long interval) {
+        try {
+          Thread.sleep(interval);
+        } catch (InterruptedException e) {
+          // ignore
+        }
+      }
+    };
+    backgroundOperationPool.execute(timeoutChecker);
   }
 
   @Override
   public synchronized void stop() {
     super.stop();
+    shutdown = true;
     if (backgroundOperationPool != null) {
       backgroundOperationPool.shutdown();
-      int timeout = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_ASYNC_EXEC_SHUTDOWN_TIMEOUT);
+      long timeout = hiveConf.getTimeVar(
+          ConfVars.HIVE_SERVER2_ASYNC_EXEC_SHUTDOWN_TIMEOUT, TimeUnit.SECONDS);
       try {
         backgroundOperationPool.awaitTermination(timeout, TimeUnit.SECONDS);
       } catch (InterruptedException e) {
         LOG.warn("HIVE_SERVER2_ASYNC_EXEC_SHUTDOWN_TIMEOUT = " + timeout +
             " seconds has been exceeded. RUNNING background operations will be shut down", e);
       }
+      backgroundOperationPool = null;
+    }
+    cleanupLoggingRootDir();
+  }
+
+  private void cleanupLoggingRootDir() {
+    if (isOperationLogEnabled) {
+      try {
+        FileUtils.forceDelete(operationLogRootDir);
+      } catch (Exception e) {
+        LOG.warn("Failed to cleanup root dir of HS2 logging: " + operationLogRootDir
+            .getAbsolutePath(), e);
+      }
     }
   }
 
@@ -132,6 +245,9 @@ public class SessionManager extends Comp
     session.setOperationManager(operationManager);
     try {
       session.initialize(sessionConf);
+      if (isOperationLogEnabled) {
+        session.setOperationLogSessionDir(operationLogRootDir);
+      }
       session.open();
     } catch (Exception e) {
       throw new HiveSQLException("Failed to open new session", e);

Modified: hive/branches/tez/service/src/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/service/src/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java?rev=1622108&r1=1622107&r2=1622108&view=diff
==============================================================================
--- hive/branches/tez/service/src/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java (original)
+++ hive/branches/tez/service/src/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java Tue Sep  2 19:56:56 2014
@@ -19,12 +19,17 @@
 package org.apache.hive.service.cli.thrift;
 
 import java.net.InetSocketAddress;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.hive.service.auth.HiveAuthFactory;
 import org.apache.hive.service.cli.CLIService;
+import org.apache.hive.service.server.ThreadFactoryWithGarbageCleanup;
 import org.apache.thrift.TProcessorFactory;
 import org.apache.thrift.protocol.TBinaryProtocol;
 import org.apache.thrift.server.TThreadPoolServer;
@@ -65,6 +70,12 @@ public class ThriftBinaryCLIService exte
 
       minWorkerThreads = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_MIN_WORKER_THREADS);
       maxWorkerThreads = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_MAX_WORKER_THREADS);
+      workerKeepAliveTime = hiveConf.getTimeVar(
+          ConfVars.HIVE_SERVER2_THRIFT_WORKER_KEEPALIVE_TIME, TimeUnit.SECONDS);
+      String threadPoolName = "HiveServer2-Handler-Pool";
+      ExecutorService executorService = new ThreadPoolExecutor(minWorkerThreads, maxWorkerThreads,
+          workerKeepAliveTime, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
+          new ThreadFactoryWithGarbageCleanup(threadPoolName));
 
       TServerSocket serverSocket = null;
       if (!hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_USE_SSL)) {
@@ -84,8 +95,7 @@ public class ThriftBinaryCLIService exte
       .processorFactory(processorFactory)
       .transportFactory(transportFactory)
       .protocolFactory(new TBinaryProtocol.Factory())
-      .minWorkerThreads(minWorkerThreads)
-      .maxWorkerThreads(maxWorkerThreads);
+      .executorService(executorService);
 
       server = new TThreadPoolServer(sargs);
 

Modified: hive/branches/tez/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java?rev=1622108&r1=1622107&r2=1622108&view=diff
==============================================================================
--- hive/branches/tez/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java (original)
+++ hive/branches/tez/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java Tue Sep  2 19:56:56 2014
@@ -29,20 +29,10 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
-import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hive.service.AbstractService;
 import org.apache.hive.service.auth.HiveAuthFactory;
 import org.apache.hive.service.auth.TSetIpAddressProcessor;
-import org.apache.hive.service.cli.CLIService;
-import org.apache.hive.service.cli.FetchOrientation;
-import org.apache.hive.service.cli.GetInfoType;
-import org.apache.hive.service.cli.GetInfoValue;
-import org.apache.hive.service.cli.HiveSQLException;
-import org.apache.hive.service.cli.OperationHandle;
-import org.apache.hive.service.cli.OperationStatus;
-import org.apache.hive.service.cli.RowSet;
-import org.apache.hive.service.cli.SessionHandle;
-import org.apache.hive.service.cli.TableSchema;
+import org.apache.hive.service.cli.*;
 import org.apache.hive.service.cli.session.SessionManager;
 import org.apache.thrift.TException;
 import org.apache.thrift.server.TServer;
@@ -71,6 +61,7 @@ public abstract class ThriftCLIService e
 
   protected int minWorkerThreads;
   protected int maxWorkerThreads;
+  protected long workerKeepAliveTime;
 
   protected static HiveAuthFactory hiveAuthFactory;
 
@@ -242,7 +233,9 @@ public abstract class ThriftCLIService e
     if (userName == null) {
       userName = req.getUsername();
     }
-    return getProxyUser(userName, req.getConfiguration(), getIpAddress());
+    String effectiveClientUser = getProxyUser(userName, req.getConfiguration(), getIpAddress());
+    LOG.debug("Client's username: " + effectiveClientUser);
+    return effectiveClientUser;
   }
 
   /**
@@ -532,7 +525,8 @@ public abstract class ThriftCLIService e
       RowSet rowSet = cliService.fetchResults(
           new OperationHandle(req.getOperationHandle()),
           FetchOrientation.getFetchOrientation(req.getOrientation()),
-          req.getMaxRows());
+          req.getMaxRows(),
+          FetchType.getFetchType(req.getFetchType()));
       resp.setResults(rowSet.toTRowSet());
       resp.setHasMoreRows(false);
       resp.setStatus(OK_STATUS);

Modified: hive/branches/tez/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIServiceClient.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIServiceClient.java?rev=1622108&r1=1622107&r2=1622108&view=diff
==============================================================================
--- hive/branches/tez/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIServiceClient.java (original)
+++ hive/branches/tez/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIServiceClient.java Tue Sep  2 19:56:56 2014
@@ -22,18 +22,7 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.hive.service.auth.HiveAuthFactory;
-import org.apache.hive.service.cli.CLIServiceClient;
-import org.apache.hive.service.cli.FetchOrientation;
-import org.apache.hive.service.cli.GetInfoType;
-import org.apache.hive.service.cli.GetInfoValue;
-import org.apache.hive.service.cli.HiveSQLException;
-import org.apache.hive.service.cli.OperationHandle;
-import org.apache.hive.service.cli.OperationState;
-import org.apache.hive.service.cli.OperationStatus;
-import org.apache.hive.service.cli.RowSet;
-import org.apache.hive.service.cli.RowSetFactory;
-import org.apache.hive.service.cli.SessionHandle;
-import org.apache.hive.service.cli.TableSchema;
+import org.apache.hive.service.cli.*;
 import org.apache.thrift.TException;
 
 /**
@@ -377,17 +366,15 @@ public class ThriftCLIServiceClient exte
     }
   }
 
-  /* (non-Javadoc)
-   * @see org.apache.hive.service.cli.ICLIService#fetchResults(org.apache.hive.service.cli.OperationHandle, org.apache.hive.service.cli.FetchOrientation, long)
-   */
   @Override
-  public RowSet fetchResults(OperationHandle opHandle, FetchOrientation orientation, long maxRows)
-      throws HiveSQLException {
+  public RowSet fetchResults(OperationHandle opHandle, FetchOrientation orientation, long maxRows,
+      FetchType fetchType) throws HiveSQLException {
     try {
       TFetchResultsReq req = new TFetchResultsReq();
       req.setOperationHandle(opHandle.toTOperationHandle());
       req.setOrientation(orientation.toTFetchOrientation());
       req.setMaxRows(maxRows);
+      req.setFetchType(fetchType.toTFetchType());
       TFetchResultsResp resp = cliService.FetchResults(req);
       checkStatus(resp.getStatus());
       return RowSetFactory.create(resp.getResults(), opHandle.getProtocolVersion());
@@ -404,7 +391,7 @@ public class ThriftCLIServiceClient exte
   @Override
   public RowSet fetchResults(OperationHandle opHandle) throws HiveSQLException {
     // TODO: set the correct default fetch size
-    return fetchResults(opHandle, FetchOrientation.FETCH_NEXT, 10000);
+    return fetchResults(opHandle, FetchOrientation.FETCH_NEXT, 10000, FetchType.QUERY_OUTPUT);
   }
 
   @Override

Modified: hive/branches/tez/service/src/java/org/apache/hive/service/cli/thrift/ThriftHttpCLIService.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/service/src/java/org/apache/hive/service/cli/thrift/ThriftHttpCLIService.java?rev=1622108&r1=1622107&r2=1622108&view=diff
==============================================================================
--- hive/branches/tez/service/src/java/org/apache/hive/service/cli/thrift/ThriftHttpCLIService.java (original)
+++ hive/branches/tez/service/src/java/org/apache/hive/service/cli/thrift/ThriftHttpCLIService.java Tue Sep  2 19:56:56 2014
@@ -18,6 +18,11 @@
 
 package org.apache.hive.service.cli.thrift;
 
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.shims.ShimLoader;
@@ -26,6 +31,7 @@ import org.apache.hadoop.util.Shell;
 import org.apache.hive.service.auth.HiveAuthFactory;
 import org.apache.hive.service.auth.HiveAuthFactory.AuthTypes;
 import org.apache.hive.service.cli.CLIService;
+import org.apache.hive.service.server.ThreadFactoryWithGarbageCleanup;
 import org.apache.thrift.TProcessor;
 import org.apache.thrift.TProcessorFactory;
 import org.apache.thrift.protocol.TBinaryProtocol;
@@ -36,7 +42,7 @@ import org.eclipse.jetty.server.ssl.SslS
 import org.eclipse.jetty.servlet.ServletContextHandler;
 import org.eclipse.jetty.servlet.ServletHolder;
 import org.eclipse.jetty.util.ssl.SslContextFactory;
-import org.eclipse.jetty.util.thread.QueuedThreadPool;
+import org.eclipse.jetty.util.thread.ExecutorThreadPool;
 
 
 public class ThriftHttpCLIService extends ThriftCLIService {
@@ -63,13 +69,18 @@ public class ThriftHttpCLIService extend
 
       minWorkerThreads = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_HTTP_MIN_WORKER_THREADS);
       maxWorkerThreads = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_HTTP_MAX_WORKER_THREADS);
+      workerKeepAliveTime = hiveConf.getTimeVar(
+          ConfVars.HIVE_SERVER2_THRIFT_HTTP_WORKER_KEEPALIVE_TIME, TimeUnit.SECONDS);
 
       String httpPath =  getHttpPath(hiveConf.getVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_HTTP_PATH));
 
       httpServer = new org.eclipse.jetty.server.Server();
-      QueuedThreadPool threadPool = new QueuedThreadPool();
-      threadPool.setMinThreads(minWorkerThreads);
-      threadPool.setMaxThreads(maxWorkerThreads);
+      String threadPoolName = "HiveServer2-HttpHandler-Pool";
+      ExecutorService executorService = new ThreadPoolExecutor(minWorkerThreads, maxWorkerThreads,
+          workerKeepAliveTime, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(),
+          new ThreadFactoryWithGarbageCleanup(threadPoolName));
+
+      ExecutorThreadPool threadPool = new ExecutorThreadPool(executorService);
       httpServer.setThreadPool(threadPool);
 
       SelectChannelConnector connector = new SelectChannelConnector();;
@@ -100,7 +111,8 @@ public class ThriftHttpCLIService extend
       // Linux:yes, Windows:no
       connector.setReuseAddress(!Shell.WINDOWS);
       
-      int maxIdleTime = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_HTTP_MAX_IDLE_TIME);
+      int maxIdleTime = (int) hiveConf.getTimeVar(
+          ConfVars.HIVE_SERVER2_THRIFT_HTTP_MAX_IDLE_TIME, TimeUnit.MILLISECONDS);
       connector.setMaxIdleTime(maxIdleTime);
       
       httpServer.addConnector(connector);

Modified: hive/branches/tez/service/src/test/org/apache/hive/service/cli/CLIServiceTest.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/service/src/test/org/apache/hive/service/cli/CLIServiceTest.java?rev=1622108&r1=1622107&r2=1622108&view=diff
==============================================================================
--- hive/branches/tez/service/src/test/org/apache/hive/service/cli/CLIServiceTest.java (original)
+++ hive/branches/tez/service/src/test/org/apache/hive/service/cli/CLIServiceTest.java Tue Sep  2 19:56:56 2014
@@ -26,9 +26,9 @@ import static org.junit.Assert.fail;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -202,7 +202,8 @@ public abstract class CLIServiceTest {
      * to give a compile time error.
      * (compilation is done synchronous as of now)
      */
-    longPollingTimeout = new HiveConf().getLongVar(ConfVars.HIVE_SERVER2_LONG_POLLING_TIMEOUT);
+    longPollingTimeout = HiveConf.getTimeVar(new HiveConf(),
+        HiveConf.ConfVars.HIVE_SERVER2_LONG_POLLING_TIMEOUT, TimeUnit.MILLISECONDS);
     queryString = "SELECT NON_EXISTING_COLUMN FROM " + tableName;
     try {
       runQueryAsync(sessionHandle, queryString, confOverlay, OperationState.ERROR, longPollingTimeout);
@@ -295,7 +296,7 @@ public abstract class CLIServiceTest {
     long longPollingTimeDelta;
     OperationStatus opStatus = null;
     OperationState state = null;
-    confOverlay.put(HiveConf.ConfVars.HIVE_SERVER2_LONG_POLLING_TIMEOUT.varname, String.valueOf(longPollingTimeout));
+    confOverlay.put(HiveConf.ConfVars.HIVE_SERVER2_LONG_POLLING_TIMEOUT.varname, longPollingTimeout + "ms");
     OperationHandle opHandle = client.executeStatementAsync(sessionHandle, queryString, confOverlay);
     int count = 0;
     while (true) {