You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by na...@apache.org on 2014/09/02 06:41:31 UTC

svn commit: r1621912 [2/2] - in /hive/trunk: common/src/java/org/apache/hadoop/hive/conf/ hcatalog/core/src/test/java/org/apache/hive/hcatalog/cli/ hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/ itests/hive-unit/src/test/java/org/apach...

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java?rev=1621912&r1=1621911&r2=1621912&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java Tue Sep  2 04:41:29 2014
@@ -43,6 +43,7 @@ import java.io.IOException;
 import java.security.PrivilegedExceptionAction;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
 
 /**
  * A class to initiate compactions.  This will run in a separate thread.
@@ -50,7 +51,6 @@ import java.util.Set;
 public class Initiator extends CompactorThread {
   static final private String CLASS_NAME = Initiator.class.getName();
   static final private Log LOG = LogFactory.getLog(CLASS_NAME);
-  static final private int threadId = 10000;
 
   static final private String NO_COMPACTION = "NO_AUTO_COMPACTION";
 
@@ -63,7 +63,7 @@ public class Initiator extends Compactor
     try {
       recoverFailedCompactions(false);
 
-      int abortedThreashold = HiveConf.getIntVar(conf,
+      int abortedThreshold = HiveConf.getIntVar(conf,
           HiveConf.ConfVars.HIVE_COMPACTOR_ABORTEDTXN_THRESHOLD);
 
       // Make sure we run through the loop once before checking to stop as this makes testing
@@ -77,7 +77,7 @@ public class Initiator extends Compactor
         try {
           ShowCompactResponse currentCompactions = txnHandler.showCompact(new ShowCompactRequest());
           ValidTxnList txns = TxnHandler.createValidTxnList(txnHandler.getOpenTxns());
-          Set<CompactionInfo> potentials = txnHandler.findPotentialCompactions(abortedThreashold);
+          Set<CompactionInfo> potentials = txnHandler.findPotentialCompactions(abortedThreshold);
           LOG.debug("Found " + potentials.size() + " potential compactions, " +
               "checking to see if we should compact any of them");
           for (CompactionInfo ci : potentials) {
@@ -140,13 +140,13 @@ public class Initiator extends Compactor
   public void init(BooleanPointer stop) throws MetaException {
     super.init(stop);
     checkInterval =
-        HiveConf.getLongVar(conf, HiveConf.ConfVars.HIVE_COMPACTOR_CHECK_INTERVAL) * 1000;
+        conf.getTimeVar(HiveConf.ConfVars.HIVE_COMPACTOR_CHECK_INTERVAL, TimeUnit.MILLISECONDS) ;
   }
 
   private void recoverFailedCompactions(boolean remoteOnly) throws MetaException {
     if (!remoteOnly) txnHandler.revokeFromLocalWorkers(Worker.hostname());
-    txnHandler.revokeTimedoutWorkers(HiveConf.getLongVar(conf,
-        HiveConf.ConfVars.HIVE_COMPACTOR_WORKER_TIMEOUT));
+    txnHandler.revokeTimedoutWorkers(HiveConf.getTimeVar(conf,
+        HiveConf.ConfVars.HIVE_COMPACTOR_WORKER_TIMEOUT, TimeUnit.MILLISECONDS));
   }
 
   // Figure out if there are any currently running compactions on the same table or partition.

Modified: hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java?rev=1621912&r1=1621911&r2=1621912&view=diff
==============================================================================
--- hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java (original)
+++ hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java Tue Sep  2 04:41:29 2014
@@ -30,6 +30,7 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
 
 /**
  * Tests for the compactor Initiator thread.
@@ -89,7 +90,7 @@ public class TestInitiator extends Compa
     txnHandler.findNextToCompact("nosuchhost-193892");
 
     HiveConf conf = new HiveConf();
-    HiveConf.setLongVar(conf, HiveConf.ConfVars.HIVE_COMPACTOR_WORKER_TIMEOUT, 1L);
+    conf.setTimeVar(HiveConf.ConfVars.HIVE_COMPACTOR_WORKER_TIMEOUT, 1L, TimeUnit.MILLISECONDS);
 
     startInitiator(conf);
 

Modified: hive/trunk/ql/src/test/results/clientpositive/show_conf.q.out
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/results/clientpositive/show_conf.q.out?rev=1621912&r1=1621911&r2=1621912&view=diff
==============================================================================
--- hive/trunk/ql/src/test/results/clientpositive/show_conf.q.out (original)
+++ hive/trunk/ql/src/test/results/clientpositive/show_conf.q.out Tue Sep  2 04:41:29 2014
@@ -7,4 +7,4 @@ PREHOOK: query: show conf "hive.stats.re
 PREHOOK: type: SHOWCONF
 POSTHOOK: query: show conf "hive.stats.retries.wait"
 POSTHOOK: type: SHOWCONF
-3000	INT	The base waiting window (in milliseconds) before the next retry. The actual wait time is calculated by baseWindow * failures baseWindow * (failure  1) * (random number between [0.0,1.0]).
+3000ms	STRING(TIME)	Expects a time value with unit (d/day, h/hour, m/min, s/sec, ms/msec, us/usec, ns/nsec), which is msec if not specified. The base waiting window before the next retry. The actual wait time is calculated by baseWindow * failures baseWindow * (failure + 1) * (random number between [0.0,1.0]).

Modified: hive/trunk/service/src/java/org/apache/hadoop/hive/service/HiveServer.java
URL: http://svn.apache.org/viewvc/hive/trunk/service/src/java/org/apache/hadoop/hive/service/HiveServer.java?rev=1621912&r1=1621911&r2=1621912&view=diff
==============================================================================
--- hive/trunk/service/src/java/org/apache/hadoop/hive/service/HiveServer.java (original)
+++ hive/trunk/service/src/java/org/apache/hadoop/hive/service/HiveServer.java Tue Sep  2 04:41:29 2014
@@ -30,6 +30,7 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.cli.OptionBuilder;
 import org.apache.commons.logging.Log;
@@ -62,8 +63,6 @@ import org.apache.thrift.transport.TServ
 import org.apache.thrift.transport.TServerTransport;
 import org.apache.thrift.transport.TTransport;
 import org.apache.thrift.transport.TTransportFactory;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 import com.facebook.fb303.fb_status;
 
 /**
@@ -670,8 +669,11 @@ public class HiveServer extends ThriftHi
 
 
       boolean tcpKeepAlive = conf.getBoolVar(HiveConf.ConfVars.SERVER_TCP_KEEP_ALIVE);
+      int timeout = (int) HiveConf.getTimeVar(
+          conf, HiveConf.ConfVars.SERVER_READ_SOCKET_TIMEOUT, TimeUnit.MILLISECONDS);
 
-      TServerTransport serverTransport = tcpKeepAlive ? new TServerSocketKeepAlive(cli.port) : new TServerSocket(cli.port, 1000 * conf.getIntVar(HiveConf.ConfVars.SERVER_READ_SOCKET_TIMEOUT));
+      TServerTransport serverTransport =
+          tcpKeepAlive ? new TServerSocketKeepAlive(cli.port) : new TServerSocket(cli.port, timeout);
 
       // set all properties specified on the command line
       for (Map.Entry<Object, Object> item : hiveconf.entrySet()) {

Modified: hive/trunk/service/src/java/org/apache/hive/service/cli/CLIService.java
URL: http://svn.apache.org/viewvc/hive/trunk/service/src/java/org/apache/hive/service/cli/CLIService.java?rev=1621912&r1=1621911&r2=1621912&view=diff
==============================================================================
--- hive/trunk/service/src/java/org/apache/hive/service/cli/CLIService.java (original)
+++ hive/trunk/service/src/java/org/apache/hive/service/cli/CLIService.java Tue Sep  2 04:41:29 2014
@@ -362,8 +362,9 @@ public class CLIService extends Composit
      * However, if the background operation is complete, we return immediately.
      */
     if (operation.shouldRunAsync()) {
-      long timeout = operation.getParentSession().getHiveConf().getLongVar(
-          HiveConf.ConfVars.HIVE_SERVER2_LONG_POLLING_TIMEOUT);
+      HiveConf conf = operation.getParentSession().getHiveConf();
+      long timeout = HiveConf.getTimeVar(conf,
+          HiveConf.ConfVars.HIVE_SERVER2_LONG_POLLING_TIMEOUT, TimeUnit.MILLISECONDS);
       try {
         operation.getBackgroundHandle().get(timeout, TimeUnit.MILLISECONDS);
       } catch (TimeoutException e) {

Modified: hive/trunk/service/src/java/org/apache/hive/service/cli/OperationState.java
URL: http://svn.apache.org/viewvc/hive/trunk/service/src/java/org/apache/hive/service/cli/OperationState.java?rev=1621912&r1=1621911&r2=1621912&view=diff
==============================================================================
--- hive/trunk/service/src/java/org/apache/hive/service/cli/OperationState.java (original)
+++ hive/trunk/service/src/java/org/apache/hive/service/cli/OperationState.java Tue Sep  2 04:41:29 2014
@@ -25,29 +25,26 @@ import org.apache.hive.service.cli.thrif
  *
  */
 public enum OperationState {
-  INITIALIZED(TOperationState.INITIALIZED_STATE),
-  RUNNING(TOperationState.RUNNING_STATE),
-  FINISHED(TOperationState.FINISHED_STATE),
-  CANCELED(TOperationState.CANCELED_STATE),
-  CLOSED(TOperationState.CLOSED_STATE),
-  ERROR(TOperationState.ERROR_STATE),
-  UNKNOWN(TOperationState.UKNOWN_STATE),
-  PENDING(TOperationState.PENDING_STATE);
+  INITIALIZED(TOperationState.INITIALIZED_STATE, false),
+  RUNNING(TOperationState.RUNNING_STATE, false),
+  FINISHED(TOperationState.FINISHED_STATE, true),
+  CANCELED(TOperationState.CANCELED_STATE, true),
+  CLOSED(TOperationState.CLOSED_STATE, true),
+  ERROR(TOperationState.ERROR_STATE, true),
+  UNKNOWN(TOperationState.UKNOWN_STATE, false),
+  PENDING(TOperationState.PENDING_STATE, false);
 
   private final TOperationState tOperationState;
+  private final boolean terminal;
 
-  OperationState(TOperationState tOperationState) {
+  OperationState(TOperationState tOperationState, boolean terminal) {
     this.tOperationState = tOperationState;
+    this.terminal = terminal;
   }
 
+  // constants must stay in the same order as TOperationState values (looked up by index)
   public static OperationState getOperationState(TOperationState tOperationState) {
-    // TODO: replace this with a Map?
-    for (OperationState opState : values()) {
-      if (tOperationState.equals(opState.tOperationState)) {
-        return opState;
-      }
-    }
-    return OperationState.UNKNOWN;
+    return OperationState.values()[tOperationState.getValue()];
   }
 
   public static void validateTransition(OperationState oldState,
@@ -91,7 +88,8 @@ public enum OperationState {
     default:
       // fall-through
     }
-    throw new HiveSQLException("Illegal Operation state transition");
+    throw new HiveSQLException("Illegal Operation state transition " +
+        "from " + oldState + " to " + newState);
   }
 
   public void validateTransition(OperationState newState)
@@ -102,4 +100,8 @@ public enum OperationState {
   public TOperationState toTOperationState() {
     return tOperationState;
   }
+
+  public boolean isTerminal() {
+    return terminal;
+  }
 }

Modified: hive/trunk/service/src/java/org/apache/hive/service/cli/operation/Operation.java
URL: http://svn.apache.org/viewvc/hive/trunk/service/src/java/org/apache/hive/service/cli/operation/Operation.java?rev=1621912&r1=1621911&r2=1621912&view=diff
==============================================================================
--- hive/trunk/service/src/java/org/apache/hive/service/cli/operation/Operation.java (original)
+++ hive/trunk/service/src/java/org/apache/hive/service/cli/operation/Operation.java Tue Sep  2 04:41:29 2014
@@ -21,6 +21,7 @@ import java.io.File;
 import java.io.FileNotFoundException;
 import java.util.EnumSet;
 import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -52,14 +53,19 @@ public abstract class Operation {
   protected OperationLog operationLog;
   protected boolean isOperationLogEnabled;
 
+  private long operationTimeout;
+  private long lastAccessTime;
+
   protected static final EnumSet<FetchOrientation> DEFAULT_FETCH_ORIENTATION_SET =
       EnumSet.of(FetchOrientation.FETCH_NEXT,FetchOrientation.FETCH_FIRST);
 
   protected Operation(HiveSession parentSession, OperationType opType, boolean runInBackground) {
-    super();
     this.parentSession = parentSession;
     this.runAsync = runInBackground;
     this.opHandle = new OperationHandle(opType, parentSession.getProtocolVersion());
+    lastAccessTime = System.currentTimeMillis();
+    operationTimeout = HiveConf.getTimeVar(parentSession.getHiveConf(),
+        HiveConf.ConfVars.HIVE_SERVER2_IDLE_OPERATION_TIMEOUT, TimeUnit.MILLISECONDS);
   }
 
   public Future<?> getBackgroundHandle() {
@@ -111,7 +117,6 @@ public abstract class Operation {
     opHandle.setHasResultSet(hasResultSet);
   }
 
-
   public OperationLog getOperationLog() {
     return operationLog;
   }
@@ -119,9 +124,33 @@ public abstract class Operation {
   protected final OperationState setState(OperationState newState) throws HiveSQLException {
     state.validateTransition(newState);
     this.state = newState;
+    this.lastAccessTime = System.currentTimeMillis();
     return this.state;
   }
 
+  public boolean isTimedOut(long current) {
+    if (operationTimeout == 0) {
+      return false;
+    }
+    if (operationTimeout > 0) {
+      // check only when it's in terminal state
+      return state.isTerminal() && lastAccessTime + operationTimeout <= current;
+    }
+    return lastAccessTime + -operationTimeout <= current;
+  }
+
+  public long getLastAccessTime() {
+    return lastAccessTime;
+  }
+
+  public long getOperationTimeout() {
+    return operationTimeout;
+  }
+
+  public void setOperationTimeout(long operationTimeout) {
+    this.operationTimeout = operationTimeout;
+  }
+
   protected void setOperationException(HiveSQLException operationException) {
     this.operationException = operationException;
   }
@@ -130,6 +159,7 @@ public abstract class Operation {
     if (this.state != state) {
       throw new HiveSQLException("Expected state " + state + ", but found " + this.state);
     }
+    this.lastAccessTime = System.currentTimeMillis();
   }
 
   public boolean isRunning() {

Modified: hive/trunk/service/src/java/org/apache/hive/service/cli/operation/OperationManager.java
URL: http://svn.apache.org/viewvc/hive/trunk/service/src/java/org/apache/hive/service/cli/operation/OperationManager.java?rev=1621912&r1=1621911&r2=1621912&view=diff
==============================================================================
--- hive/trunk/service/src/java/org/apache/hive/service/cli/operation/OperationManager.java (original)
+++ hive/trunk/service/src/java/org/apache/hive/service/cli/operation/OperationManager.java Tue Sep  2 04:41:29 2014
@@ -19,6 +19,7 @@
 package org.apache.hive.service.cli.operation;
 
 import java.util.Enumeration;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -155,15 +156,27 @@ public class OperationManager extends Ab
     return operation;
   }
 
-  public synchronized Operation getOperation(OperationHandle operationHandle)
-      throws HiveSQLException {
-    Operation operation = handleToOperation.get(operationHandle);
+  public Operation getOperation(OperationHandle operationHandle) throws HiveSQLException {
+    Operation operation = getOperationInternal(operationHandle);
     if (operation == null) {
       throw new HiveSQLException("Invalid OperationHandle: " + operationHandle);
     }
     return operation;
   }
 
+  private synchronized Operation getOperationInternal(OperationHandle operationHandle) {
+    return handleToOperation.get(operationHandle);
+  }
+
+  private synchronized Operation removeTimedOutOperation(OperationHandle operationHandle) {
+    Operation operation = handleToOperation.get(operationHandle);
+    if (operation != null && operation.isTimedOut(System.currentTimeMillis())) {
+      handleToOperation.remove(operationHandle);
+      return operation;
+    }
+    return null;
+  }
+
   private synchronized void addOperation(Operation operation) {
     handleToOperation.put(operation.getHandle(), operation);
   }
@@ -252,4 +265,16 @@ public class OperationManager extends Ab
   public OperationLog getOperationLogByThread() {
     return OperationLog.getCurrentOperationLog();
   }
+
+  public List<Operation> removeExpiredOperations(OperationHandle[] handles) {
+    List<Operation> removed = new ArrayList<Operation>();
+    for (OperationHandle handle : handles) {
+      Operation operation = removeTimedOutOperation(handle);
+      if (operation != null) {
+        LOG.warn("Operation " + handle + " is timed-out and will be closed");
+        removed.add(operation);
+      }
+    }
+    return removed;
+  }
 }

Modified: hive/trunk/service/src/java/org/apache/hive/service/cli/session/HiveSession.java
URL: http://svn.apache.org/viewvc/hive/trunk/service/src/java/org/apache/hive/service/cli/session/HiveSession.java?rev=1621912&r1=1621911&r2=1621912&view=diff
==============================================================================
--- hive/trunk/service/src/java/org/apache/hive/service/cli/session/HiveSession.java (original)
+++ hive/trunk/service/src/java/org/apache/hive/service/cli/session/HiveSession.java Tue Sep  2 04:41:29 2014
@@ -27,9 +27,9 @@ import org.apache.hive.service.cli.*;
 
 public interface HiveSession extends HiveSessionBase {
 
-  public void open();
+  void open();
 
-  public IMetaStoreClient getMetaStoreClient() throws HiveSQLException;
+  IMetaStoreClient getMetaStoreClient() throws HiveSQLException;
 
   /**
    * getInfo operation handler
@@ -37,7 +37,7 @@ public interface HiveSession extends Hiv
    * @return
    * @throws HiveSQLException
    */
-  public GetInfoValue getInfo(GetInfoType getInfoType) throws HiveSQLException;
+  GetInfoValue getInfo(GetInfoType getInfoType) throws HiveSQLException;
 
   /**
    * execute operation handler
@@ -46,7 +46,7 @@ public interface HiveSession extends Hiv
    * @return
    * @throws HiveSQLException
    */
-  public OperationHandle executeStatement(String statement,
+  OperationHandle executeStatement(String statement,
       Map<String, String> confOverlay) throws HiveSQLException;
 
   /**
@@ -56,7 +56,7 @@ public interface HiveSession extends Hiv
    * @return
    * @throws HiveSQLException
    */
-  public OperationHandle executeStatementAsync(String statement,
+  OperationHandle executeStatementAsync(String statement,
       Map<String, String> confOverlay) throws HiveSQLException;
 
   /**
@@ -64,14 +64,14 @@ public interface HiveSession extends Hiv
    * @return
    * @throws HiveSQLException
    */
-  public OperationHandle getTypeInfo() throws HiveSQLException;
+  OperationHandle getTypeInfo() throws HiveSQLException;
 
   /**
    * getCatalogs operation handler
    * @return
    * @throws HiveSQLException
    */
-  public OperationHandle getCatalogs() throws HiveSQLException;
+  OperationHandle getCatalogs() throws HiveSQLException;
 
   /**
    * getSchemas operation handler
@@ -80,7 +80,7 @@ public interface HiveSession extends Hiv
    * @return
    * @throws HiveSQLException
    */
-  public OperationHandle getSchemas(String catalogName, String schemaName)
+  OperationHandle getSchemas(String catalogName, String schemaName)
       throws HiveSQLException;
 
   /**
@@ -92,7 +92,7 @@ public interface HiveSession extends Hiv
    * @return
    * @throws HiveSQLException
    */
-  public OperationHandle getTables(String catalogName, String schemaName,
+  OperationHandle getTables(String catalogName, String schemaName,
       String tableName, List<String> tableTypes) throws HiveSQLException;
 
   /**
@@ -100,7 +100,7 @@ public interface HiveSession extends Hiv
    * @return
    * @throws HiveSQLException
    */
-  public OperationHandle getTableTypes() throws HiveSQLException ;
+  OperationHandle getTableTypes() throws HiveSQLException ;
 
   /**
    * getColumns operation handler
@@ -111,7 +111,7 @@ public interface HiveSession extends Hiv
    * @return
    * @throws HiveSQLException
    */
-  public OperationHandle getColumns(String catalogName, String schemaName,
+  OperationHandle getColumns(String catalogName, String schemaName,
       String tableName, String columnName)  throws HiveSQLException;
 
   /**
@@ -122,31 +122,33 @@ public interface HiveSession extends Hiv
    * @return
    * @throws HiveSQLException
    */
-  public OperationHandle getFunctions(String catalogName, String schemaName,
+  OperationHandle getFunctions(String catalogName, String schemaName,
       String functionName) throws HiveSQLException;
 
   /**
    * close the session
    * @throws HiveSQLException
    */
-  public void close() throws HiveSQLException;
+  void close() throws HiveSQLException;
 
-  public void cancelOperation(OperationHandle opHandle) throws HiveSQLException;
+  void cancelOperation(OperationHandle opHandle) throws HiveSQLException;
 
-  public void closeOperation(OperationHandle opHandle) throws HiveSQLException;
+  void closeOperation(OperationHandle opHandle) throws HiveSQLException;
 
-  public TableSchema getResultSetMetadata(OperationHandle opHandle)
+  TableSchema getResultSetMetadata(OperationHandle opHandle)
       throws HiveSQLException;
 
-  public RowSet fetchResults(OperationHandle opHandle, FetchOrientation orientation,
+  RowSet fetchResults(OperationHandle opHandle, FetchOrientation orientation,
       long maxRows, FetchType fetchType) throws HiveSQLException;
 
-  public String getDelegationToken(HiveAuthFactory authFactory, String owner,
+  String getDelegationToken(HiveAuthFactory authFactory, String owner,
       String renewer) throws HiveSQLException;
 
-  public void cancelDelegationToken(HiveAuthFactory authFactory, String tokenStr)
+  void cancelDelegationToken(HiveAuthFactory authFactory, String tokenStr)
       throws HiveSQLException;
 
-  public void renewDelegationToken(HiveAuthFactory authFactory, String tokenStr)
+  void renewDelegationToken(HiveAuthFactory authFactory, String tokenStr)
       throws HiveSQLException;
+
+  void closeExpiredOperations();
 }

Modified: hive/trunk/service/src/java/org/apache/hive/service/cli/session/HiveSessionBase.java
URL: http://svn.apache.org/viewvc/hive/trunk/service/src/java/org/apache/hive/service/cli/session/HiveSessionBase.java?rev=1621912&r1=1621911&r2=1621912&view=diff
==============================================================================
--- hive/trunk/service/src/java/org/apache/hive/service/cli/session/HiveSessionBase.java (original)
+++ hive/trunk/service/src/java/org/apache/hive/service/cli/session/HiveSessionBase.java Tue Sep  2 04:41:29 2014
@@ -92,4 +92,6 @@ public interface HiveSessionBase {
   String getIpAddress();
 
   void setIpAddress(String ipAddress);
+
+  long getLastAccessTime();
 }

Modified: hive/trunk/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
URL: http://svn.apache.org/viewvc/hive/trunk/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java?rev=1621912&r1=1621911&r2=1621912&view=diff
==============================================================================
--- hive/trunk/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java (original)
+++ hive/trunk/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java Tue Sep  2 04:41:29 2014
@@ -54,6 +54,7 @@ import org.apache.hive.service.cli.opera
 import org.apache.hive.service.cli.operation.GetTableTypesOperation;
 import org.apache.hive.service.cli.operation.GetTypeInfoOperation;
 import org.apache.hive.service.cli.operation.MetadataOperation;
+import org.apache.hive.service.cli.operation.Operation;
 import org.apache.hive.service.cli.operation.OperationManager;
 import org.apache.hive.service.cli.thrift.TProtocolVersion;
 import org.apache.hive.service.server.ThreadWithGarbageCleanup;
@@ -84,6 +85,8 @@ public class HiveSessionImpl implements 
   private boolean isOperationLogEnabled;
   private File sessionLogDir;
 
+  private volatile long lastAccessTime;
+
   public HiveSessionImpl(TProtocolVersion protocol, String username, String password,
       HiveConf serverhiveConf, String ipAddress) {
     this.username = username;
@@ -108,6 +111,8 @@ public class HiveSessionImpl implements 
     sessionState = new SessionState(hiveConf, username);
     sessionState.setUserIpAddress(ipAddress);
     sessionState.setIsHiveServerQuery(true);
+
+    lastAccessTime = System.currentTimeMillis();
     SessionState.start(sessionState);
   }
 
@@ -239,10 +244,13 @@ public class HiveSessionImpl implements 
     SessionState.start(sessionState);
   }
 
-  protected synchronized void acquire() throws HiveSQLException {
+  protected synchronized void acquire(boolean userAccess) {
     // Need to make sure that the this HiveServer2's session's session state is
     // stored in the thread local for the handler thread.
     SessionState.setCurrentSessionState(sessionState);
+    if (userAccess) {
+      lastAccessTime = System.currentTimeMillis();
+    }
   }
 
   /**
@@ -252,14 +260,16 @@ public class HiveSessionImpl implements 
    * when this thread is garbage collected later.
    * @see org.apache.hive.service.server.ThreadWithGarbageCleanup#finalize()
    */
-  protected synchronized void release() {
-    assert sessionState != null;
+  protected synchronized void release(boolean userAccess) {
     SessionState.detachSession();
     if (ThreadWithGarbageCleanup.currentThread() instanceof ThreadWithGarbageCleanup) {
       ThreadWithGarbageCleanup currentThread =
           (ThreadWithGarbageCleanup) ThreadWithGarbageCleanup.currentThread();
       currentThread.cacheThreadLocalRawStore();
     }
+    if (userAccess) {
+      lastAccessTime = System.currentTimeMillis();
+    }
   }
 
   @Override
@@ -298,7 +308,7 @@ public class HiveSessionImpl implements 
   @Override
   public GetInfoValue getInfo(GetInfoType getInfoType)
       throws HiveSQLException {
-    acquire();
+    acquire(true);
     try {
       switch (getInfoType) {
       case CLI_SERVER_NAME:
@@ -318,7 +328,7 @@ public class HiveSessionImpl implements 
         throw new HiveSQLException("Unrecognized GetInfoType value: " + getInfoType.toString());
       }
     } finally {
-      release();
+      release(true);
     }
   }
 
@@ -337,7 +347,7 @@ public class HiveSessionImpl implements 
   private OperationHandle executeStatementInternal(String statement, Map<String, String> confOverlay,
       boolean runAsync)
           throws HiveSQLException {
-    acquire();
+    acquire(true);
 
     OperationManager operationManager = getOperationManager();
     ExecuteStatementOperation operation = operationManager
@@ -355,14 +365,14 @@ public class HiveSessionImpl implements 
       }
       throw e;
     } finally {
-      release();
+      release(true);
     }
   }
 
   @Override
   public OperationHandle getTypeInfo()
       throws HiveSQLException {
-    acquire();
+    acquire(true);
 
     OperationManager operationManager = getOperationManager();
     GetTypeInfoOperation operation = operationManager.newGetTypeInfoOperation(getSession());
@@ -375,14 +385,14 @@ public class HiveSessionImpl implements 
       operationManager.closeOperation(opHandle);
       throw e;
     } finally {
-      release();
+      release(true);
     }
   }
 
   @Override
   public OperationHandle getCatalogs()
       throws HiveSQLException {
-    acquire();
+    acquire(true);
 
     OperationManager operationManager = getOperationManager();
     GetCatalogsOperation operation = operationManager.newGetCatalogsOperation(getSession());
@@ -395,14 +405,14 @@ public class HiveSessionImpl implements 
       operationManager.closeOperation(opHandle);
       throw e;
     } finally {
-      release();
+      release(true);
     }
   }
 
   @Override
   public OperationHandle getSchemas(String catalogName, String schemaName)
       throws HiveSQLException {
-    acquire();
+    acquire(true);
 
     OperationManager operationManager = getOperationManager();
     GetSchemasOperation operation =
@@ -416,7 +426,7 @@ public class HiveSessionImpl implements 
       operationManager.closeOperation(opHandle);
       throw e;
     } finally {
-      release();
+      release(true);
     }
   }
 
@@ -424,7 +434,7 @@ public class HiveSessionImpl implements 
   public OperationHandle getTables(String catalogName, String schemaName, String tableName,
       List<String> tableTypes)
           throws HiveSQLException {
-    acquire();
+    acquire(true);
 
     OperationManager operationManager = getOperationManager();
     MetadataOperation operation =
@@ -438,14 +448,14 @@ public class HiveSessionImpl implements 
       operationManager.closeOperation(opHandle);
       throw e;
     } finally {
-      release();
+      release(true);
     }
   }
 
   @Override
   public OperationHandle getTableTypes()
       throws HiveSQLException {
-    acquire();
+    acquire(true);
 
     OperationManager operationManager = getOperationManager();
     GetTableTypesOperation operation = operationManager.newGetTableTypesOperation(getSession());
@@ -458,14 +468,14 @@ public class HiveSessionImpl implements 
       operationManager.closeOperation(opHandle);
       throw e;
     } finally {
-      release();
+      release(true);
     }
   }
 
   @Override
   public OperationHandle getColumns(String catalogName, String schemaName,
       String tableName, String columnName)  throws HiveSQLException {
-    acquire();
+    acquire(true);
 
     OperationManager operationManager = getOperationManager();
     GetColumnsOperation operation = operationManager.newGetColumnsOperation(getSession(),
@@ -479,14 +489,14 @@ public class HiveSessionImpl implements 
       operationManager.closeOperation(opHandle);
       throw e;
     } finally {
-      release();
+      release(true);
     }
   }
 
   @Override
   public OperationHandle getFunctions(String catalogName, String schemaName, String functionName)
       throws HiveSQLException {
-    acquire();
+    acquire(true);
 
     OperationManager operationManager = getOperationManager();
     GetFunctionsOperation operation = operationManager
@@ -500,14 +510,14 @@ public class HiveSessionImpl implements 
       operationManager.closeOperation(opHandle);
       throw e;
     } finally {
-      release();
+      release(true);
     }
   }
 
   @Override
   public void close() throws HiveSQLException {
     try {
-      acquire();
+      acquire(true);
       /**
        * For metadata operations like getTables(), getColumns() etc,
        * the session allocates a private metastore handler which should be
@@ -532,7 +542,7 @@ public class HiveSessionImpl implements 
     } catch (IOException ioe) {
       throw new HiveSQLException("Failure to close", ioe);
     } finally {
-      release();
+      release(true);
     }
   }
 
@@ -562,50 +572,79 @@ public class HiveSessionImpl implements 
   }
 
   @Override
+  public long getLastAccessTime() {  // polled by SessionManager's timeout checker
+    return lastAccessTime;  // checker compares it (plus timeout) with System.currentTimeMillis()
+  }
+
+  @Override
+  public void closeExpiredOperations() {  // called by the timeout checker for still-live sessions
+    OperationHandle[] handles = opHandleSet.toArray(new OperationHandle[opHandleSet.size()]);
+    if (handles.length > 0) {  // session has no operations: skip the operation manager call
+      List<Operation> operations = operationManager.removeExpiredOperations(handles);
+      if (!operations.isEmpty()) {  // skip the acquire/release cycle when nothing was returned
+        closeTimedOutOperations(operations);
+      }
+    }
+  }
+
+  private void closeTimedOutOperations(List<Operation> operations) {  // close each, drop its handle
+    acquire(false);  // userAccess=false (checker-driven); presumably keeps lastAccessTime — confirm
+    try {
+      for (Operation operation : operations) {
+        opHandleSet.remove(operation.getHandle());  // dropped even if close() below fails
+        try {
+          operation.close();
+        } catch (Exception e) {  // best-effort: log and continue with the remaining operations
+          LOG.warn("Exception is thrown closing timed-out operation " + operation.getHandle(), e);
+        }
+      }
+    } finally {
+      release(false);  // same flag as the acquire(false) above
+    }
+  }
+
+  @Override
   public void cancelOperation(OperationHandle opHandle) throws HiveSQLException {
-    acquire();
+    acquire(true);  // userAccess=true: client-driven call, unlike the checker's acquire(false)
     try {
       sessionManager.getOperationManager().cancelOperation(opHandle);
     } finally {
-      release();
+      release(true);  // runs even when the cancel above throws
     }
   }
 
   @Override
   public void closeOperation(OperationHandle opHandle) throws HiveSQLException {
-    acquire();
+    acquire(true);  // userAccess=true: client-driven call
     try {
       operationManager.closeOperation(opHandle);
       opHandleSet.remove(opHandle);
     } finally {
-      release();
+      release(true);  // if closeOperation() threw, the handle is still in opHandleSet here
     }
   }
 
   @Override
   public TableSchema getResultSetMetadata(OperationHandle opHandle) throws HiveSQLException {
-    acquire();
+    acquire(true);  // userAccess=true: client-driven schema lookup
     try {
       return sessionManager.getOperationManager().getOperationResultSetSchema(opHandle);
     } finally {
-      release();
+      release(true);  // runs after the return value is computed, or when the lookup throws
     }
   }
 
   @Override
   public RowSet fetchResults(OperationHandle opHandle, FetchOrientation orientation,
       long maxRows, FetchType fetchType) throws HiveSQLException {
-    acquire();
+    acquire(true);  // userAccess=true: client-driven fetch
     try {
       if (fetchType == FetchType.QUERY_OUTPUT) {
-        return sessionManager.getOperationManager()
-            .getOperationNextRowSet(opHandle, orientation, maxRows);
-      } else {
-        return sessionManager.getOperationManager()
-            .getOperationLogRowSet(opHandle, orientation, maxRows);
+        return operationManager.getOperationNextRowSet(opHandle, orientation, maxRows);
       }
+      return operationManager.getOperationLogRowSet(opHandle, orientation, maxRows);
     } finally {
-      release();
+      release(true);  // QUERY_OUTPUT returned result rows above; any other type returned log rows
     }
   }
 

Modified: hive/trunk/service/src/java/org/apache/hive/service/cli/session/HiveSessionImplwithUGI.java
URL: http://svn.apache.org/viewvc/hive/trunk/service/src/java/org/apache/hive/service/cli/session/HiveSessionImplwithUGI.java?rev=1621912&r1=1621911&r2=1621912&view=diff
==============================================================================
--- hive/trunk/service/src/java/org/apache/hive/service/cli/session/HiveSessionImplwithUGI.java (original)
+++ hive/trunk/service/src/java/org/apache/hive/service/cli/session/HiveSessionImplwithUGI.java Tue Sep  2 04:41:29 2014
@@ -19,7 +19,6 @@
 package org.apache.hive.service.cli.session;
 
 import java.io.IOException;
-import java.util.Map;
 
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.metadata.Hive;
@@ -83,8 +82,8 @@ public class HiveSessionImplwithUGI exte
   }
 
   @Override
-  protected synchronized void acquire() throws HiveSQLException {
-    super.acquire();
+  protected synchronized void acquire(boolean userAccess) {
+    super.acquire(userAccess);
     // if we have a metastore connection with impersonation, then set it first
     if (sessionHive != null) {
       Hive.set(sessionHive);
@@ -98,11 +97,11 @@ public class HiveSessionImplwithUGI exte
   @Override
   public void close() throws HiveSQLException {
     try {
-    acquire();
+    acquire(true);  // inside the try: a failure here still reaches release(true) and super.close()
     ShimLoader.getHadoopShims().closeAllForUGI(sessionUgi);
     cancelDelegationToken();
     } finally {
-      release();
+      release(true);  // NOTE(review): if this throws, super.close() below is skipped
       super.close();
     }
   }

Modified: hive/trunk/service/src/java/org/apache/hive/service/cli/session/SessionManager.java
URL: http://svn.apache.org/viewvc/hive/trunk/service/src/java/org/apache/hive/service/cli/session/SessionManager.java?rev=1621912&r1=1621911&r2=1621912&view=diff
==============================================================================
--- hive/trunk/service/src/java/org/apache/hive/service/cli/session/SessionManager.java (original)
+++ hive/trunk/service/src/java/org/apache/hive/service/cli/session/SessionManager.java Tue Sep  2 04:41:29 2014
@@ -20,6 +20,8 @@ package org.apache.hive.service.cli.sess
 
 import java.io.File;
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Date;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
@@ -59,6 +61,11 @@ public class SessionManager extends Comp
   private boolean isOperationLogEnabled;
   private File operationLogRootDir;
 
+  private long checkInterval;  // ms between timeout-checker passes; start() skips it unless > 0
+  private long sessionTimeout;  // idle-session limit in ms; session expiry is skipped unless > 0
+
+  private volatile boolean shutdown;  // set by stop(); polled by the timeout-checker loop
+
   public SessionManager() {
     super("SessionManager");
   }
@@ -81,20 +88,28 @@ public class SessionManager extends Comp
   }
 
   private void createBackgroundOperationPool() {
-    int backgroundPoolSize = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_ASYNC_EXEC_THREADS);
-    LOG.info("HiveServer2: Background operation thread pool size: " + backgroundPoolSize);
-    int backgroundPoolQueueSize = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_ASYNC_EXEC_WAIT_QUEUE_SIZE);
-    LOG.info("HiveServer2: Background operation thread wait queue size: " + backgroundPoolQueueSize);
-    int keepAliveTime = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_ASYNC_EXEC_KEEPALIVE_TIME);
-    LOG.info("HiveServer2: Background operation thread keepalive time: " + keepAliveTime);
-    // Create a thread pool with #backgroundPoolSize threads
+    int poolSize = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_ASYNC_EXEC_THREADS);
+    LOG.info("HiveServer2: Background operation thread pool size: " + poolSize);
+    int poolQueueSize = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_ASYNC_EXEC_WAIT_QUEUE_SIZE);
+    LOG.info("HiveServer2: Background operation thread wait queue size: " + poolQueueSize);
+    long keepAliveTime = HiveConf.getTimeVar(  // in seconds, matching the executor's unit below
+        hiveConf, ConfVars.HIVE_SERVER2_ASYNC_EXEC_KEEPALIVE_TIME, TimeUnit.SECONDS);
+    LOG.info(
+        "HiveServer2: Background operation thread keepalive time: " + keepAliveTime + " seconds");
+
+    // Create a thread pool with #poolSize threads
     // Threads terminate when they are idle for more than the keepAliveTime
-    // A bounded blocking queue is used to queue incoming operations, if #operations > backgroundPoolSize
+    // A bounded blocking queue is used to queue incoming operations, if #operations > poolSize
     String threadPoolName = "HiveServer2-Background-Pool";
-    backgroundOperationPool = new ThreadPoolExecutor(backgroundPoolSize, backgroundPoolSize,
-        keepAliveTime, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(backgroundPoolQueueSize),
+    backgroundOperationPool = new ThreadPoolExecutor(poolSize, poolSize,  // core size == max size
+        keepAliveTime, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(poolQueueSize),
         new ThreadFactoryWithGarbageCleanup(threadPoolName));
     backgroundOperationPool.allowCoreThreadTimeOut(true);
+
+    checkInterval = HiveConf.getTimeVar(  // ms; start() launches the checker only when > 0
+        hiveConf, ConfVars.HIVE_SERVER2_SESSION_CHECK_INTERVAL, TimeUnit.MILLISECONDS);
+    sessionTimeout = HiveConf.getTimeVar(  // ms; the checker expires sessions only when > 0
+        hiveConf, ConfVars.HIVE_SERVER2_IDLE_SESSION_TIMEOUT, TimeUnit.MILLISECONDS);
   }
 
   private void applyAuthorizationConfigPolicy(HiveConf newHiveConf) throws HiveException {
@@ -139,20 +154,61 @@ public class SessionManager extends Comp
   @Override
   public synchronized void start() {
     super.start();
+    if (checkInterval > 0) {  // a non-positive check interval disables the timeout checker
+      startTimeoutChecker();  // its loop occupies one backgroundOperationPool thread until stop()
+    }
+  }
+
+  private void startTimeoutChecker() {  // runs the session/operation expiry sweep on a pool thread
+    final long interval = Math.max(checkInterval, 3000L);  // minimum 3 seconds
+    Runnable timeoutChecker = new Runnable() {
+      @Override
+      public void run() {  // sleep, sweep, repeat until stop() sets shutdown or an interrupt arrives
+        for (sleepInterval(interval); !shutdown && !Thread.interrupted(); sleepInterval(interval)) {
+          long current = System.currentTimeMillis();
+          for (HiveSession session : new ArrayList<HiveSession>(handleToSession.values())) {
+            if (sessionTimeout > 0 && session.getLastAccessTime() + sessionTimeout <= current) {
+              SessionHandle handle = session.getSessionHandle();
+              LOG.warn("Session " + handle + " is Timed-out (last access : " +
+                  new Date(session.getLastAccessTime()) + ") and will be closed");
+              try {
+                closeSession(handle);
+              } catch (HiveSQLException e) {  // best-effort: keep sweeping the other sessions
+                LOG.warn("Exception is thrown closing session " + handle, e);
+              }
+            } else {  // session still live: only expire its individual operations
+              session.closeExpiredOperations();
+            }
+          }
+        }
+      }
+
+      private void sleepInterval(long interval) {
+        try {
+          Thread.sleep(interval);
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();  // re-set the flag; run()'s loop test sees it and exits
+        }
+      }
+    };
+    backgroundOperationPool.execute(timeoutChecker);
   }
 
   @Override
   public synchronized void stop() {
     super.stop();
+    shutdown = true;  // tells the timeout-checker loop to exit once its current sleep ends
     if (backgroundOperationPool != null) {
       backgroundOperationPool.shutdown();
-      int timeout = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_ASYNC_EXEC_SHUTDOWN_TIMEOUT);
+      long timeout = hiveConf.getTimeVar(  // in seconds, matching awaitTermination's unit below
+          ConfVars.HIVE_SERVER2_ASYNC_EXEC_SHUTDOWN_TIMEOUT, TimeUnit.SECONDS);
       try {
         backgroundOperationPool.awaitTermination(timeout, TimeUnit.SECONDS);
       } catch (InterruptedException e) {
         LOG.warn("HIVE_SERVER2_ASYNC_EXEC_SHUTDOWN_TIMEOUT = " + timeout +
             " seconds has been exceeded. RUNNING background operations will be shut down", e);
       }
+      backgroundOperationPool = null;  // with the null check above, a repeated stop() skips this part
     }
     cleanupLoggingRootDir();
   }

Modified: hive/trunk/service/src/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java
URL: http://svn.apache.org/viewvc/hive/trunk/service/src/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java?rev=1621912&r1=1621911&r2=1621912&view=diff
==============================================================================
--- hive/trunk/service/src/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java (original)
+++ hive/trunk/service/src/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java Tue Sep  2 04:41:29 2014
@@ -70,7 +70,8 @@ public class ThriftBinaryCLIService exte
 
       minWorkerThreads = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_MIN_WORKER_THREADS);
       maxWorkerThreads = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_MAX_WORKER_THREADS);
-      workerKeepAliveTime = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_WORKER_KEEPALIVE_TIME);
+      workerKeepAliveTime = hiveConf.getTimeVar(
+          ConfVars.HIVE_SERVER2_THRIFT_WORKER_KEEPALIVE_TIME, TimeUnit.SECONDS);
       String threadPoolName = "HiveServer2-Handler-Pool";
       ExecutorService executorService = new ThreadPoolExecutor(minWorkerThreads, maxWorkerThreads,
           workerKeepAliveTime, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),

Modified: hive/trunk/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java
URL: http://svn.apache.org/viewvc/hive/trunk/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java?rev=1621912&r1=1621911&r2=1621912&view=diff
==============================================================================
--- hive/trunk/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java (original)
+++ hive/trunk/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java Tue Sep  2 04:41:29 2014
@@ -61,7 +61,7 @@ public abstract class ThriftCLIService e
 
   protected int minWorkerThreads;
   protected int maxWorkerThreads;
-  protected int workerKeepAliveTime;
+  protected long workerKeepAliveTime;
 
   protected static HiveAuthFactory hiveAuthFactory;
 

Modified: hive/trunk/service/src/java/org/apache/hive/service/cli/thrift/ThriftHttpCLIService.java
URL: http://svn.apache.org/viewvc/hive/trunk/service/src/java/org/apache/hive/service/cli/thrift/ThriftHttpCLIService.java?rev=1621912&r1=1621911&r2=1621912&view=diff
==============================================================================
--- hive/trunk/service/src/java/org/apache/hive/service/cli/thrift/ThriftHttpCLIService.java (original)
+++ hive/trunk/service/src/java/org/apache/hive/service/cli/thrift/ThriftHttpCLIService.java Tue Sep  2 04:41:29 2014
@@ -69,7 +69,8 @@ public class ThriftHttpCLIService extend
 
       minWorkerThreads = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_HTTP_MIN_WORKER_THREADS);
       maxWorkerThreads = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_HTTP_MAX_WORKER_THREADS);
-      workerKeepAliveTime = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_HTTP_WORKER_KEEPALIVE_TIME);
+      workerKeepAliveTime = hiveConf.getTimeVar(
+          ConfVars.HIVE_SERVER2_THRIFT_HTTP_WORKER_KEEPALIVE_TIME, TimeUnit.SECONDS);
 
       String httpPath =  getHttpPath(hiveConf.getVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_HTTP_PATH));
 
@@ -110,7 +111,8 @@ public class ThriftHttpCLIService extend
       // Linux:yes, Windows:no
       connector.setReuseAddress(!Shell.WINDOWS);
       
-      int maxIdleTime = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_HTTP_MAX_IDLE_TIME);
+      int maxIdleTime = (int) hiveConf.getTimeVar(
+          ConfVars.HIVE_SERVER2_THRIFT_HTTP_MAX_IDLE_TIME, TimeUnit.MILLISECONDS);
       connector.setMaxIdleTime(maxIdleTime);
       
       httpServer.addConnector(connector);

Modified: hive/trunk/service/src/test/org/apache/hive/service/cli/CLIServiceTest.java
URL: http://svn.apache.org/viewvc/hive/trunk/service/src/test/org/apache/hive/service/cli/CLIServiceTest.java?rev=1621912&r1=1621911&r2=1621912&view=diff
==============================================================================
--- hive/trunk/service/src/test/org/apache/hive/service/cli/CLIServiceTest.java (original)
+++ hive/trunk/service/src/test/org/apache/hive/service/cli/CLIServiceTest.java Tue Sep  2 04:41:29 2014
@@ -26,9 +26,9 @@ import static org.junit.Assert.fail;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -202,7 +202,8 @@ public abstract class CLIServiceTest {
      * to give a compile time error.
      * (compilation is done synchronous as of now)
      */
-    longPollingTimeout = new HiveConf().getLongVar(ConfVars.HIVE_SERVER2_LONG_POLLING_TIMEOUT);
+    longPollingTimeout = HiveConf.getTimeVar(new HiveConf(),
+        HiveConf.ConfVars.HIVE_SERVER2_LONG_POLLING_TIMEOUT, TimeUnit.MILLISECONDS);
     queryString = "SELECT NON_EXISTING_COLUMN FROM " + tableName;
     try {
       runQueryAsync(sessionHandle, queryString, confOverlay, OperationState.ERROR, longPollingTimeout);
@@ -295,7 +296,7 @@ public abstract class CLIServiceTest {
     long longPollingTimeDelta;
     OperationStatus opStatus = null;
     OperationState state = null;
-    confOverlay.put(HiveConf.ConfVars.HIVE_SERVER2_LONG_POLLING_TIMEOUT.varname, String.valueOf(longPollingTimeout));
+    confOverlay.put(HiveConf.ConfVars.HIVE_SERVER2_LONG_POLLING_TIMEOUT.varname, longPollingTimeout + "ms");
     OperationHandle opHandle = client.executeStatementAsync(sessionHandle, queryString, confOverlay);
     int count = 0;
     while (true) {