You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by na...@apache.org on 2014/09/02 06:41:31 UTC
svn commit: r1621912 [2/2] - in /hive/trunk:
common/src/java/org/apache/hadoop/hive/conf/
hcatalog/core/src/test/java/org/apache/hive/hcatalog/cli/
hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/
itests/hive-unit/src/test/java/org/apach...
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java?rev=1621912&r1=1621911&r2=1621912&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java Tue Sep 2 04:41:29 2014
@@ -43,6 +43,7 @@ import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.util.List;
import java.util.Set;
+import java.util.concurrent.TimeUnit;
/**
* A class to initiate compactions. This will run in a separate thread.
@@ -50,7 +51,6 @@ import java.util.Set;
public class Initiator extends CompactorThread {
static final private String CLASS_NAME = Initiator.class.getName();
static final private Log LOG = LogFactory.getLog(CLASS_NAME);
- static final private int threadId = 10000;
static final private String NO_COMPACTION = "NO_AUTO_COMPACTION";
@@ -63,7 +63,7 @@ public class Initiator extends Compactor
try {
recoverFailedCompactions(false);
- int abortedThreashold = HiveConf.getIntVar(conf,
+ int abortedThreshold = HiveConf.getIntVar(conf,
HiveConf.ConfVars.HIVE_COMPACTOR_ABORTEDTXN_THRESHOLD);
// Make sure we run through the loop once before checking to stop as this makes testing
@@ -77,7 +77,7 @@ public class Initiator extends Compactor
try {
ShowCompactResponse currentCompactions = txnHandler.showCompact(new ShowCompactRequest());
ValidTxnList txns = TxnHandler.createValidTxnList(txnHandler.getOpenTxns());
- Set<CompactionInfo> potentials = txnHandler.findPotentialCompactions(abortedThreashold);
+ Set<CompactionInfo> potentials = txnHandler.findPotentialCompactions(abortedThreshold);
LOG.debug("Found " + potentials.size() + " potential compactions, " +
"checking to see if we should compact any of them");
for (CompactionInfo ci : potentials) {
@@ -140,13 +140,13 @@ public class Initiator extends Compactor
public void init(BooleanPointer stop) throws MetaException {
super.init(stop);
checkInterval =
- HiveConf.getLongVar(conf, HiveConf.ConfVars.HIVE_COMPACTOR_CHECK_INTERVAL) * 1000;
+ conf.getTimeVar(HiveConf.ConfVars.HIVE_COMPACTOR_CHECK_INTERVAL, TimeUnit.MILLISECONDS) ;
}
private void recoverFailedCompactions(boolean remoteOnly) throws MetaException {
if (!remoteOnly) txnHandler.revokeFromLocalWorkers(Worker.hostname());
- txnHandler.revokeTimedoutWorkers(HiveConf.getLongVar(conf,
- HiveConf.ConfVars.HIVE_COMPACTOR_WORKER_TIMEOUT));
+ txnHandler.revokeTimedoutWorkers(HiveConf.getTimeVar(conf,
+ HiveConf.ConfVars.HIVE_COMPACTOR_WORKER_TIMEOUT, TimeUnit.MILLISECONDS));
}
// Figure out if there are any currently running compactions on the same table or partition.
Modified: hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java?rev=1621912&r1=1621911&r2=1621912&view=diff
==============================================================================
--- hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java (original)
+++ hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java Tue Sep 2 04:41:29 2014
@@ -30,6 +30,7 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.TimeUnit;
/**
* Tests for the compactor Initiator thread.
@@ -89,7 +90,7 @@ public class TestInitiator extends Compa
txnHandler.findNextToCompact("nosuchhost-193892");
HiveConf conf = new HiveConf();
- HiveConf.setLongVar(conf, HiveConf.ConfVars.HIVE_COMPACTOR_WORKER_TIMEOUT, 1L);
+ conf.setTimeVar(HiveConf.ConfVars.HIVE_COMPACTOR_WORKER_TIMEOUT, 1L, TimeUnit.MILLISECONDS);
startInitiator(conf);
Modified: hive/trunk/ql/src/test/results/clientpositive/show_conf.q.out
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/results/clientpositive/show_conf.q.out?rev=1621912&r1=1621911&r2=1621912&view=diff
==============================================================================
--- hive/trunk/ql/src/test/results/clientpositive/show_conf.q.out (original)
+++ hive/trunk/ql/src/test/results/clientpositive/show_conf.q.out Tue Sep 2 04:41:29 2014
@@ -7,4 +7,4 @@ PREHOOK: query: show conf "hive.stats.re
PREHOOK: type: SHOWCONF
POSTHOOK: query: show conf "hive.stats.retries.wait"
POSTHOOK: type: SHOWCONF
-3000 INT The base waiting window (in milliseconds) before the next retry. The actual wait time is calculated by baseWindow * failures baseWindow * (failure 1) * (random number between [0.0,1.0]).
+3000ms STRING(TIME) Expects a time value with unit (d/day, h/hour, m/min, s/sec, ms/msec, us/usec, ns/nsec), which is msec if not specified. The base waiting window before the next retry. The actual wait time is calculated by baseWindow * failures baseWindow * (failure + 1) * (random number between [0.0,1.0]).
Modified: hive/trunk/service/src/java/org/apache/hadoop/hive/service/HiveServer.java
URL: http://svn.apache.org/viewvc/hive/trunk/service/src/java/org/apache/hadoop/hive/service/HiveServer.java?rev=1621912&r1=1621911&r2=1621912&view=diff
==============================================================================
--- hive/trunk/service/src/java/org/apache/hadoop/hive/service/HiveServer.java (original)
+++ hive/trunk/service/src/java/org/apache/hadoop/hive/service/HiveServer.java Tue Sep 2 04:41:29 2014
@@ -30,6 +30,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
+import java.util.concurrent.TimeUnit;
import org.apache.commons.cli.OptionBuilder;
import org.apache.commons.logging.Log;
@@ -62,8 +63,6 @@ import org.apache.thrift.transport.TServ
import org.apache.thrift.transport.TServerTransport;
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportFactory;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
import com.facebook.fb303.fb_status;
/**
@@ -670,8 +669,11 @@ public class HiveServer extends ThriftHi
boolean tcpKeepAlive = conf.getBoolVar(HiveConf.ConfVars.SERVER_TCP_KEEP_ALIVE);
+ int timeout = (int) HiveConf.getTimeVar(
+ conf, HiveConf.ConfVars.SERVER_READ_SOCKET_TIMEOUT, TimeUnit.MILLISECONDS);
- TServerTransport serverTransport = tcpKeepAlive ? new TServerSocketKeepAlive(cli.port) : new TServerSocket(cli.port, 1000 * conf.getIntVar(HiveConf.ConfVars.SERVER_READ_SOCKET_TIMEOUT));
+ TServerTransport serverTransport =
+ tcpKeepAlive ? new TServerSocketKeepAlive(cli.port) : new TServerSocket(cli.port, timeout);
// set all properties specified on the command line
for (Map.Entry<Object, Object> item : hiveconf.entrySet()) {
Modified: hive/trunk/service/src/java/org/apache/hive/service/cli/CLIService.java
URL: http://svn.apache.org/viewvc/hive/trunk/service/src/java/org/apache/hive/service/cli/CLIService.java?rev=1621912&r1=1621911&r2=1621912&view=diff
==============================================================================
--- hive/trunk/service/src/java/org/apache/hive/service/cli/CLIService.java (original)
+++ hive/trunk/service/src/java/org/apache/hive/service/cli/CLIService.java Tue Sep 2 04:41:29 2014
@@ -362,8 +362,9 @@ public class CLIService extends Composit
* However, if the background operation is complete, we return immediately.
*/
if (operation.shouldRunAsync()) {
- long timeout = operation.getParentSession().getHiveConf().getLongVar(
- HiveConf.ConfVars.HIVE_SERVER2_LONG_POLLING_TIMEOUT);
+ HiveConf conf = operation.getParentSession().getHiveConf();
+ long timeout = HiveConf.getTimeVar(conf,
+ HiveConf.ConfVars.HIVE_SERVER2_LONG_POLLING_TIMEOUT, TimeUnit.MILLISECONDS);
try {
operation.getBackgroundHandle().get(timeout, TimeUnit.MILLISECONDS);
} catch (TimeoutException e) {
Modified: hive/trunk/service/src/java/org/apache/hive/service/cli/OperationState.java
URL: http://svn.apache.org/viewvc/hive/trunk/service/src/java/org/apache/hive/service/cli/OperationState.java?rev=1621912&r1=1621911&r2=1621912&view=diff
==============================================================================
--- hive/trunk/service/src/java/org/apache/hive/service/cli/OperationState.java (original)
+++ hive/trunk/service/src/java/org/apache/hive/service/cli/OperationState.java Tue Sep 2 04:41:29 2014
@@ -25,29 +25,26 @@ import org.apache.hive.service.cli.thrif
*
*/
public enum OperationState {
- INITIALIZED(TOperationState.INITIALIZED_STATE),
- RUNNING(TOperationState.RUNNING_STATE),
- FINISHED(TOperationState.FINISHED_STATE),
- CANCELED(TOperationState.CANCELED_STATE),
- CLOSED(TOperationState.CLOSED_STATE),
- ERROR(TOperationState.ERROR_STATE),
- UNKNOWN(TOperationState.UKNOWN_STATE),
- PENDING(TOperationState.PENDING_STATE);
+ INITIALIZED(TOperationState.INITIALIZED_STATE, false),
+ RUNNING(TOperationState.RUNNING_STATE, false),
+ FINISHED(TOperationState.FINISHED_STATE, true),
+ CANCELED(TOperationState.CANCELED_STATE, true),
+ CLOSED(TOperationState.CLOSED_STATE, true),
+ ERROR(TOperationState.ERROR_STATE, true),
+ UNKNOWN(TOperationState.UKNOWN_STATE, false),
+ PENDING(TOperationState.PENDING_STATE, false);
private final TOperationState tOperationState;
+ private final boolean terminal;
- OperationState(TOperationState tOperationState) {
+ OperationState(TOperationState tOperationState, boolean terminal) {
this.tOperationState = tOperationState;
+ this.terminal = terminal;
}
+ // must be sync with TOperationState in order
public static OperationState getOperationState(TOperationState tOperationState) {
- // TODO: replace this with a Map?
- for (OperationState opState : values()) {
- if (tOperationState.equals(opState.tOperationState)) {
- return opState;
- }
- }
- return OperationState.UNKNOWN;
+ return OperationState.values()[tOperationState.getValue()];
}
public static void validateTransition(OperationState oldState,
@@ -91,7 +88,8 @@ public enum OperationState {
default:
// fall-through
}
- throw new HiveSQLException("Illegal Operation state transition");
+ throw new HiveSQLException("Illegal Operation state transition " +
+ "from " + oldState + " to " + newState);
}
public void validateTransition(OperationState newState)
@@ -102,4 +100,8 @@ public enum OperationState {
public TOperationState toTOperationState() {
return tOperationState;
}
+
+ public boolean isTerminal() {
+ return terminal;
+ }
}
Modified: hive/trunk/service/src/java/org/apache/hive/service/cli/operation/Operation.java
URL: http://svn.apache.org/viewvc/hive/trunk/service/src/java/org/apache/hive/service/cli/operation/Operation.java?rev=1621912&r1=1621911&r2=1621912&view=diff
==============================================================================
--- hive/trunk/service/src/java/org/apache/hive/service/cli/operation/Operation.java (original)
+++ hive/trunk/service/src/java/org/apache/hive/service/cli/operation/Operation.java Tue Sep 2 04:41:29 2014
@@ -21,6 +21,7 @@ import java.io.File;
import java.io.FileNotFoundException;
import java.util.EnumSet;
import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -52,14 +53,19 @@ public abstract class Operation {
protected OperationLog operationLog;
protected boolean isOperationLogEnabled;
+ private long operationTimeout;
+ private long lastAccessTime;
+
protected static final EnumSet<FetchOrientation> DEFAULT_FETCH_ORIENTATION_SET =
EnumSet.of(FetchOrientation.FETCH_NEXT,FetchOrientation.FETCH_FIRST);
protected Operation(HiveSession parentSession, OperationType opType, boolean runInBackground) {
- super();
this.parentSession = parentSession;
this.runAsync = runInBackground;
this.opHandle = new OperationHandle(opType, parentSession.getProtocolVersion());
+ lastAccessTime = System.currentTimeMillis();
+ operationTimeout = HiveConf.getTimeVar(parentSession.getHiveConf(),
+ HiveConf.ConfVars.HIVE_SERVER2_IDLE_OPERATION_TIMEOUT, TimeUnit.MILLISECONDS);
}
public Future<?> getBackgroundHandle() {
@@ -111,7 +117,6 @@ public abstract class Operation {
opHandle.setHasResultSet(hasResultSet);
}
-
public OperationLog getOperationLog() {
return operationLog;
}
@@ -119,9 +124,33 @@ public abstract class Operation {
protected final OperationState setState(OperationState newState) throws HiveSQLException {
state.validateTransition(newState);
this.state = newState;
+ this.lastAccessTime = System.currentTimeMillis();
return this.state;
}
+ public boolean isTimedOut(long current) {
+ if (operationTimeout == 0) {
+ return false;
+ }
+ if (operationTimeout > 0) {
+ // check only when it's in terminal state
+ return state.isTerminal() && lastAccessTime + operationTimeout <= current;
+ }
+ return lastAccessTime + -operationTimeout <= current;
+ }
+
+ public long getLastAccessTime() {
+ return lastAccessTime;
+ }
+
+ public long getOperationTimeout() {
+ return operationTimeout;
+ }
+
+ public void setOperationTimeout(long operationTimeout) {
+ this.operationTimeout = operationTimeout;
+ }
+
protected void setOperationException(HiveSQLException operationException) {
this.operationException = operationException;
}
@@ -130,6 +159,7 @@ public abstract class Operation {
if (this.state != state) {
throw new HiveSQLException("Expected state " + state + ", but found " + this.state);
}
+ this.lastAccessTime = System.currentTimeMillis();
}
public boolean isRunning() {
Modified: hive/trunk/service/src/java/org/apache/hive/service/cli/operation/OperationManager.java
URL: http://svn.apache.org/viewvc/hive/trunk/service/src/java/org/apache/hive/service/cli/operation/OperationManager.java?rev=1621912&r1=1621911&r2=1621912&view=diff
==============================================================================
--- hive/trunk/service/src/java/org/apache/hive/service/cli/operation/OperationManager.java (original)
+++ hive/trunk/service/src/java/org/apache/hive/service/cli/operation/OperationManager.java Tue Sep 2 04:41:29 2014
@@ -19,6 +19,7 @@
package org.apache.hive.service.cli.operation;
import java.util.Enumeration;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -155,15 +156,27 @@ public class OperationManager extends Ab
return operation;
}
- public synchronized Operation getOperation(OperationHandle operationHandle)
- throws HiveSQLException {
- Operation operation = handleToOperation.get(operationHandle);
+ public Operation getOperation(OperationHandle operationHandle) throws HiveSQLException {
+ Operation operation = getOperationInternal(operationHandle);
if (operation == null) {
throw new HiveSQLException("Invalid OperationHandle: " + operationHandle);
}
return operation;
}
+ private synchronized Operation getOperationInternal(OperationHandle operationHandle) {
+ return handleToOperation.get(operationHandle);
+ }
+
+ private synchronized Operation removeTimedOutOperation(OperationHandle operationHandle) {
+ Operation operation = handleToOperation.get(operationHandle);
+ if (operation != null && operation.isTimedOut(System.currentTimeMillis())) {
+ handleToOperation.remove(operationHandle);
+ return operation;
+ }
+ return null;
+ }
+
private synchronized void addOperation(Operation operation) {
handleToOperation.put(operation.getHandle(), operation);
}
@@ -252,4 +265,16 @@ public class OperationManager extends Ab
public OperationLog getOperationLogByThread() {
return OperationLog.getCurrentOperationLog();
}
+
+ public List<Operation> removeExpiredOperations(OperationHandle[] handles) {
+ List<Operation> removed = new ArrayList<Operation>();
+ for (OperationHandle handle : handles) {
+ Operation operation = removeTimedOutOperation(handle);
+ if (operation != null) {
+ LOG.warn("Operation " + handle + " is timed-out and will be closed");
+ removed.add(operation);
+ }
+ }
+ return removed;
+ }
}
Modified: hive/trunk/service/src/java/org/apache/hive/service/cli/session/HiveSession.java
URL: http://svn.apache.org/viewvc/hive/trunk/service/src/java/org/apache/hive/service/cli/session/HiveSession.java?rev=1621912&r1=1621911&r2=1621912&view=diff
==============================================================================
--- hive/trunk/service/src/java/org/apache/hive/service/cli/session/HiveSession.java (original)
+++ hive/trunk/service/src/java/org/apache/hive/service/cli/session/HiveSession.java Tue Sep 2 04:41:29 2014
@@ -27,9 +27,9 @@ import org.apache.hive.service.cli.*;
public interface HiveSession extends HiveSessionBase {
- public void open();
+ void open();
- public IMetaStoreClient getMetaStoreClient() throws HiveSQLException;
+ IMetaStoreClient getMetaStoreClient() throws HiveSQLException;
/**
* getInfo operation handler
@@ -37,7 +37,7 @@ public interface HiveSession extends Hiv
* @return
* @throws HiveSQLException
*/
- public GetInfoValue getInfo(GetInfoType getInfoType) throws HiveSQLException;
+ GetInfoValue getInfo(GetInfoType getInfoType) throws HiveSQLException;
/**
* execute operation handler
@@ -46,7 +46,7 @@ public interface HiveSession extends Hiv
* @return
* @throws HiveSQLException
*/
- public OperationHandle executeStatement(String statement,
+ OperationHandle executeStatement(String statement,
Map<String, String> confOverlay) throws HiveSQLException;
/**
@@ -56,7 +56,7 @@ public interface HiveSession extends Hiv
* @return
* @throws HiveSQLException
*/
- public OperationHandle executeStatementAsync(String statement,
+ OperationHandle executeStatementAsync(String statement,
Map<String, String> confOverlay) throws HiveSQLException;
/**
@@ -64,14 +64,14 @@ public interface HiveSession extends Hiv
* @return
* @throws HiveSQLException
*/
- public OperationHandle getTypeInfo() throws HiveSQLException;
+ OperationHandle getTypeInfo() throws HiveSQLException;
/**
* getCatalogs operation handler
* @return
* @throws HiveSQLException
*/
- public OperationHandle getCatalogs() throws HiveSQLException;
+ OperationHandle getCatalogs() throws HiveSQLException;
/**
* getSchemas operation handler
@@ -80,7 +80,7 @@ public interface HiveSession extends Hiv
* @return
* @throws HiveSQLException
*/
- public OperationHandle getSchemas(String catalogName, String schemaName)
+ OperationHandle getSchemas(String catalogName, String schemaName)
throws HiveSQLException;
/**
@@ -92,7 +92,7 @@ public interface HiveSession extends Hiv
* @return
* @throws HiveSQLException
*/
- public OperationHandle getTables(String catalogName, String schemaName,
+ OperationHandle getTables(String catalogName, String schemaName,
String tableName, List<String> tableTypes) throws HiveSQLException;
/**
@@ -100,7 +100,7 @@ public interface HiveSession extends Hiv
* @return
* @throws HiveSQLException
*/
- public OperationHandle getTableTypes() throws HiveSQLException ;
+ OperationHandle getTableTypes() throws HiveSQLException ;
/**
* getColumns operation handler
@@ -111,7 +111,7 @@ public interface HiveSession extends Hiv
* @return
* @throws HiveSQLException
*/
- public OperationHandle getColumns(String catalogName, String schemaName,
+ OperationHandle getColumns(String catalogName, String schemaName,
String tableName, String columnName) throws HiveSQLException;
/**
@@ -122,31 +122,33 @@ public interface HiveSession extends Hiv
* @return
* @throws HiveSQLException
*/
- public OperationHandle getFunctions(String catalogName, String schemaName,
+ OperationHandle getFunctions(String catalogName, String schemaName,
String functionName) throws HiveSQLException;
/**
* close the session
* @throws HiveSQLException
*/
- public void close() throws HiveSQLException;
+ void close() throws HiveSQLException;
- public void cancelOperation(OperationHandle opHandle) throws HiveSQLException;
+ void cancelOperation(OperationHandle opHandle) throws HiveSQLException;
- public void closeOperation(OperationHandle opHandle) throws HiveSQLException;
+ void closeOperation(OperationHandle opHandle) throws HiveSQLException;
- public TableSchema getResultSetMetadata(OperationHandle opHandle)
+ TableSchema getResultSetMetadata(OperationHandle opHandle)
throws HiveSQLException;
- public RowSet fetchResults(OperationHandle opHandle, FetchOrientation orientation,
+ RowSet fetchResults(OperationHandle opHandle, FetchOrientation orientation,
long maxRows, FetchType fetchType) throws HiveSQLException;
- public String getDelegationToken(HiveAuthFactory authFactory, String owner,
+ String getDelegationToken(HiveAuthFactory authFactory, String owner,
String renewer) throws HiveSQLException;
- public void cancelDelegationToken(HiveAuthFactory authFactory, String tokenStr)
+ void cancelDelegationToken(HiveAuthFactory authFactory, String tokenStr)
throws HiveSQLException;
- public void renewDelegationToken(HiveAuthFactory authFactory, String tokenStr)
+ void renewDelegationToken(HiveAuthFactory authFactory, String tokenStr)
throws HiveSQLException;
+
+ void closeExpiredOperations();
}
Modified: hive/trunk/service/src/java/org/apache/hive/service/cli/session/HiveSessionBase.java
URL: http://svn.apache.org/viewvc/hive/trunk/service/src/java/org/apache/hive/service/cli/session/HiveSessionBase.java?rev=1621912&r1=1621911&r2=1621912&view=diff
==============================================================================
--- hive/trunk/service/src/java/org/apache/hive/service/cli/session/HiveSessionBase.java (original)
+++ hive/trunk/service/src/java/org/apache/hive/service/cli/session/HiveSessionBase.java Tue Sep 2 04:41:29 2014
@@ -92,4 +92,6 @@ public interface HiveSessionBase {
String getIpAddress();
void setIpAddress(String ipAddress);
+
+ long getLastAccessTime();
}
Modified: hive/trunk/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
URL: http://svn.apache.org/viewvc/hive/trunk/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java?rev=1621912&r1=1621911&r2=1621912&view=diff
==============================================================================
--- hive/trunk/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java (original)
+++ hive/trunk/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java Tue Sep 2 04:41:29 2014
@@ -54,6 +54,7 @@ import org.apache.hive.service.cli.opera
import org.apache.hive.service.cli.operation.GetTableTypesOperation;
import org.apache.hive.service.cli.operation.GetTypeInfoOperation;
import org.apache.hive.service.cli.operation.MetadataOperation;
+import org.apache.hive.service.cli.operation.Operation;
import org.apache.hive.service.cli.operation.OperationManager;
import org.apache.hive.service.cli.thrift.TProtocolVersion;
import org.apache.hive.service.server.ThreadWithGarbageCleanup;
@@ -84,6 +85,8 @@ public class HiveSessionImpl implements
private boolean isOperationLogEnabled;
private File sessionLogDir;
+ private volatile long lastAccessTime;
+
public HiveSessionImpl(TProtocolVersion protocol, String username, String password,
HiveConf serverhiveConf, String ipAddress) {
this.username = username;
@@ -108,6 +111,8 @@ public class HiveSessionImpl implements
sessionState = new SessionState(hiveConf, username);
sessionState.setUserIpAddress(ipAddress);
sessionState.setIsHiveServerQuery(true);
+
+ lastAccessTime = System.currentTimeMillis();
SessionState.start(sessionState);
}
@@ -239,10 +244,13 @@ public class HiveSessionImpl implements
SessionState.start(sessionState);
}
- protected synchronized void acquire() throws HiveSQLException {
+ protected synchronized void acquire(boolean userAccess) {
// Need to make sure that the this HiveServer2's session's session state is
// stored in the thread local for the handler thread.
SessionState.setCurrentSessionState(sessionState);
+ if (userAccess) {
+ lastAccessTime = System.currentTimeMillis();
+ }
}
/**
@@ -252,14 +260,16 @@ public class HiveSessionImpl implements
* when this thread is garbage collected later.
* @see org.apache.hive.service.server.ThreadWithGarbageCleanup#finalize()
*/
- protected synchronized void release() {
- assert sessionState != null;
+ protected synchronized void release(boolean userAccess) {
SessionState.detachSession();
if (ThreadWithGarbageCleanup.currentThread() instanceof ThreadWithGarbageCleanup) {
ThreadWithGarbageCleanup currentThread =
(ThreadWithGarbageCleanup) ThreadWithGarbageCleanup.currentThread();
currentThread.cacheThreadLocalRawStore();
}
+ if (userAccess) {
+ lastAccessTime = System.currentTimeMillis();
+ }
}
@Override
@@ -298,7 +308,7 @@ public class HiveSessionImpl implements
@Override
public GetInfoValue getInfo(GetInfoType getInfoType)
throws HiveSQLException {
- acquire();
+ acquire(true);
try {
switch (getInfoType) {
case CLI_SERVER_NAME:
@@ -318,7 +328,7 @@ public class HiveSessionImpl implements
throw new HiveSQLException("Unrecognized GetInfoType value: " + getInfoType.toString());
}
} finally {
- release();
+ release(true);
}
}
@@ -337,7 +347,7 @@ public class HiveSessionImpl implements
private OperationHandle executeStatementInternal(String statement, Map<String, String> confOverlay,
boolean runAsync)
throws HiveSQLException {
- acquire();
+ acquire(true);
OperationManager operationManager = getOperationManager();
ExecuteStatementOperation operation = operationManager
@@ -355,14 +365,14 @@ public class HiveSessionImpl implements
}
throw e;
} finally {
- release();
+ release(true);
}
}
@Override
public OperationHandle getTypeInfo()
throws HiveSQLException {
- acquire();
+ acquire(true);
OperationManager operationManager = getOperationManager();
GetTypeInfoOperation operation = operationManager.newGetTypeInfoOperation(getSession());
@@ -375,14 +385,14 @@ public class HiveSessionImpl implements
operationManager.closeOperation(opHandle);
throw e;
} finally {
- release();
+ release(true);
}
}
@Override
public OperationHandle getCatalogs()
throws HiveSQLException {
- acquire();
+ acquire(true);
OperationManager operationManager = getOperationManager();
GetCatalogsOperation operation = operationManager.newGetCatalogsOperation(getSession());
@@ -395,14 +405,14 @@ public class HiveSessionImpl implements
operationManager.closeOperation(opHandle);
throw e;
} finally {
- release();
+ release(true);
}
}
@Override
public OperationHandle getSchemas(String catalogName, String schemaName)
throws HiveSQLException {
- acquire();
+ acquire(true);
OperationManager operationManager = getOperationManager();
GetSchemasOperation operation =
@@ -416,7 +426,7 @@ public class HiveSessionImpl implements
operationManager.closeOperation(opHandle);
throw e;
} finally {
- release();
+ release(true);
}
}
@@ -424,7 +434,7 @@ public class HiveSessionImpl implements
public OperationHandle getTables(String catalogName, String schemaName, String tableName,
List<String> tableTypes)
throws HiveSQLException {
- acquire();
+ acquire(true);
OperationManager operationManager = getOperationManager();
MetadataOperation operation =
@@ -438,14 +448,14 @@ public class HiveSessionImpl implements
operationManager.closeOperation(opHandle);
throw e;
} finally {
- release();
+ release(true);
}
}
@Override
public OperationHandle getTableTypes()
throws HiveSQLException {
- acquire();
+ acquire(true);
OperationManager operationManager = getOperationManager();
GetTableTypesOperation operation = operationManager.newGetTableTypesOperation(getSession());
@@ -458,14 +468,14 @@ public class HiveSessionImpl implements
operationManager.closeOperation(opHandle);
throw e;
} finally {
- release();
+ release(true);
}
}
@Override
public OperationHandle getColumns(String catalogName, String schemaName,
String tableName, String columnName) throws HiveSQLException {
- acquire();
+ acquire(true);
OperationManager operationManager = getOperationManager();
GetColumnsOperation operation = operationManager.newGetColumnsOperation(getSession(),
@@ -479,14 +489,14 @@ public class HiveSessionImpl implements
operationManager.closeOperation(opHandle);
throw e;
} finally {
- release();
+ release(true);
}
}
@Override
public OperationHandle getFunctions(String catalogName, String schemaName, String functionName)
throws HiveSQLException {
- acquire();
+ acquire(true);
OperationManager operationManager = getOperationManager();
GetFunctionsOperation operation = operationManager
@@ -500,14 +510,14 @@ public class HiveSessionImpl implements
operationManager.closeOperation(opHandle);
throw e;
} finally {
- release();
+ release(true);
}
}
@Override
public void close() throws HiveSQLException {
try {
- acquire();
+ acquire(true);
/**
* For metadata operations like getTables(), getColumns() etc,
* the session allocates a private metastore handler which should be
@@ -532,7 +542,7 @@ public class HiveSessionImpl implements
} catch (IOException ioe) {
throw new HiveSQLException("Failure to close", ioe);
} finally {
- release();
+ release(true);
}
}
@@ -562,50 +572,79 @@ public class HiveSessionImpl implements
}
@Override
+ public long getLastAccessTime() {
+ return lastAccessTime;
+ }
+
+ @Override
+ public void closeExpiredOperations() {
+ OperationHandle[] handles = opHandleSet.toArray(new OperationHandle[opHandleSet.size()]);
+ if (handles.length > 0) {
+ List<Operation> operations = operationManager.removeExpiredOperations(handles);
+ if (!operations.isEmpty()) {
+ closeTimedOutOperations(operations);
+ }
+ }
+ }
+
+ private void closeTimedOutOperations(List<Operation> operations) {
+ acquire(false);
+ try {
+ for (Operation operation : operations) {
+ opHandleSet.remove(operation.getHandle());
+ try {
+ operation.close();
+ } catch (Exception e) {
+ LOG.warn("Exception is thrown closing timed-out operation " + operation.getHandle(), e);
+ }
+ }
+ } finally {
+ release(false);
+ }
+ }
+
+ @Override
public void cancelOperation(OperationHandle opHandle) throws HiveSQLException {
- acquire();
+ acquire(true);
try {
sessionManager.getOperationManager().cancelOperation(opHandle);
} finally {
- release();
+ release(true);
}
}
@Override
public void closeOperation(OperationHandle opHandle) throws HiveSQLException {
- acquire();
+ acquire(true);
try {
operationManager.closeOperation(opHandle);
opHandleSet.remove(opHandle);
} finally {
- release();
+ release(true);
}
}
@Override
public TableSchema getResultSetMetadata(OperationHandle opHandle) throws HiveSQLException {
- acquire();
+ acquire(true);
try {
return sessionManager.getOperationManager().getOperationResultSetSchema(opHandle);
} finally {
- release();
+ release(true);
}
}
@Override
public RowSet fetchResults(OperationHandle opHandle, FetchOrientation orientation,
long maxRows, FetchType fetchType) throws HiveSQLException {
- acquire();
+ acquire(true);
try {
if (fetchType == FetchType.QUERY_OUTPUT) {
- return sessionManager.getOperationManager()
- .getOperationNextRowSet(opHandle, orientation, maxRows);
- } else {
- return sessionManager.getOperationManager()
- .getOperationLogRowSet(opHandle, orientation, maxRows);
+ return operationManager.getOperationNextRowSet(opHandle, orientation, maxRows);
}
+ return operationManager.getOperationLogRowSet(opHandle, orientation, maxRows);
} finally {
- release();
+ release(true);
}
}
Modified: hive/trunk/service/src/java/org/apache/hive/service/cli/session/HiveSessionImplwithUGI.java
URL: http://svn.apache.org/viewvc/hive/trunk/service/src/java/org/apache/hive/service/cli/session/HiveSessionImplwithUGI.java?rev=1621912&r1=1621911&r2=1621912&view=diff
==============================================================================
--- hive/trunk/service/src/java/org/apache/hive/service/cli/session/HiveSessionImplwithUGI.java (original)
+++ hive/trunk/service/src/java/org/apache/hive/service/cli/session/HiveSessionImplwithUGI.java Tue Sep 2 04:41:29 2014
@@ -19,7 +19,6 @@
package org.apache.hive.service.cli.session;
import java.io.IOException;
-import java.util.Map;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.metadata.Hive;
@@ -83,8 +82,8 @@ public class HiveSessionImplwithUGI exte
}
@Override
- protected synchronized void acquire() throws HiveSQLException {
- super.acquire();
+ protected synchronized void acquire(boolean userAccess) {
+ super.acquire(userAccess);
// if we have a metastore connection with impersonation, then set it first
if (sessionHive != null) {
Hive.set(sessionHive);
@@ -98,11 +97,11 @@ public class HiveSessionImplwithUGI exte
@Override
public void close() throws HiveSQLException {
try {
- acquire();
+ acquire(true);
ShimLoader.getHadoopShims().closeAllForUGI(sessionUgi);
cancelDelegationToken();
} finally {
- release();
+ release(true);
super.close();
}
}
Modified: hive/trunk/service/src/java/org/apache/hive/service/cli/session/SessionManager.java
URL: http://svn.apache.org/viewvc/hive/trunk/service/src/java/org/apache/hive/service/cli/session/SessionManager.java?rev=1621912&r1=1621911&r2=1621912&view=diff
==============================================================================
--- hive/trunk/service/src/java/org/apache/hive/service/cli/session/SessionManager.java (original)
+++ hive/trunk/service/src/java/org/apache/hive/service/cli/session/SessionManager.java Tue Sep 2 04:41:29 2014
@@ -20,6 +20,8 @@ package org.apache.hive.service.cli.sess
import java.io.File;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -59,6 +61,11 @@ public class SessionManager extends Comp
private boolean isOperationLogEnabled;
private File operationLogRootDir;
+ private long checkInterval;
+ private long sessionTimeout;
+
+ private volatile boolean shutdown;
+
public SessionManager() {
super("SessionManager");
}
@@ -81,20 +88,28 @@ public class SessionManager extends Comp
}
private void createBackgroundOperationPool() {
- int backgroundPoolSize = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_ASYNC_EXEC_THREADS);
- LOG.info("HiveServer2: Background operation thread pool size: " + backgroundPoolSize);
- int backgroundPoolQueueSize = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_ASYNC_EXEC_WAIT_QUEUE_SIZE);
- LOG.info("HiveServer2: Background operation thread wait queue size: " + backgroundPoolQueueSize);
- int keepAliveTime = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_ASYNC_EXEC_KEEPALIVE_TIME);
- LOG.info("HiveServer2: Background operation thread keepalive time: " + keepAliveTime);
- // Create a thread pool with #backgroundPoolSize threads
+ int poolSize = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_ASYNC_EXEC_THREADS);
+ LOG.info("HiveServer2: Background operation thread pool size: " + poolSize);
+ int poolQueueSize = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_ASYNC_EXEC_WAIT_QUEUE_SIZE);
+ LOG.info("HiveServer2: Background operation thread wait queue size: " + poolQueueSize);
+ long keepAliveTime = HiveConf.getTimeVar(
+ hiveConf, ConfVars.HIVE_SERVER2_ASYNC_EXEC_KEEPALIVE_TIME, TimeUnit.SECONDS);
+ LOG.info(
+ "HiveServer2: Background operation thread keepalive time: " + keepAliveTime + " seconds");
+
+ // Create a thread pool with #poolSize threads
// Threads terminate when they are idle for more than the keepAliveTime
- // A bounded blocking queue is used to queue incoming operations, if #operations > backgroundPoolSize
+ // A bounded blocking queue is used to queue incoming operations, if #operations > poolSize
String threadPoolName = "HiveServer2-Background-Pool";
- backgroundOperationPool = new ThreadPoolExecutor(backgroundPoolSize, backgroundPoolSize,
- keepAliveTime, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(backgroundPoolQueueSize),
+ backgroundOperationPool = new ThreadPoolExecutor(poolSize, poolSize,
+ keepAliveTime, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(poolQueueSize),
new ThreadFactoryWithGarbageCleanup(threadPoolName));
backgroundOperationPool.allowCoreThreadTimeOut(true);
+
+ checkInterval = HiveConf.getTimeVar(
+ hiveConf, ConfVars.HIVE_SERVER2_SESSION_CHECK_INTERVAL, TimeUnit.MILLISECONDS);
+ sessionTimeout = HiveConf.getTimeVar(
+ hiveConf, ConfVars.HIVE_SERVER2_IDLE_SESSION_TIMEOUT, TimeUnit.MILLISECONDS);
}
private void applyAuthorizationConfigPolicy(HiveConf newHiveConf) throws HiveException {
@@ -139,20 +154,61 @@ public class SessionManager extends Comp
@Override
public synchronized void start() {
super.start();
+ if (checkInterval > 0) {
+ startTimeoutChecker();
+ }
+ }
+
+ private void startTimeoutChecker() {
+ final long interval = Math.max(checkInterval, 3000l); // minimum 3 seconds
+ Runnable timeoutChecker = new Runnable() {
+ @Override
+ public void run() {
+ for (sleepInterval(interval); !shutdown; sleepInterval(interval)) {
+ long current = System.currentTimeMillis();
+ for (HiveSession session : new ArrayList<HiveSession>(handleToSession.values())) {
+ if (sessionTimeout > 0 && session.getLastAccessTime() + sessionTimeout <= current) {
+ SessionHandle handle = session.getSessionHandle();
+ LOG.warn("Session " + handle + " is Timed-out (last access : " +
+ new Date(session.getLastAccessTime()) + ") and will be closed");
+ try {
+ closeSession(handle);
+ } catch (HiveSQLException e) {
+ LOG.warn("Exception is thrown closing session " + handle, e);
+ }
+ } else {
+ session.closeExpiredOperations();
+ }
+ }
+ }
+ }
+
+ private void sleepInterval(long interval) {
+ try {
+ Thread.sleep(interval);
+ } catch (InterruptedException e) {
+ // ignore
+ }
+ }
+ };
+ backgroundOperationPool.execute(timeoutChecker);
}
@Override
public synchronized void stop() {
super.stop();
+ shutdown = true;
if (backgroundOperationPool != null) {
backgroundOperationPool.shutdown();
- int timeout = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_ASYNC_EXEC_SHUTDOWN_TIMEOUT);
+ long timeout = hiveConf.getTimeVar(
+ ConfVars.HIVE_SERVER2_ASYNC_EXEC_SHUTDOWN_TIMEOUT, TimeUnit.SECONDS);
try {
backgroundOperationPool.awaitTermination(timeout, TimeUnit.SECONDS);
} catch (InterruptedException e) {
LOG.warn("HIVE_SERVER2_ASYNC_EXEC_SHUTDOWN_TIMEOUT = " + timeout +
" seconds has been exceeded. RUNNING background operations will be shut down", e);
}
+ backgroundOperationPool = null;
}
cleanupLoggingRootDir();
}
Modified: hive/trunk/service/src/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java
URL: http://svn.apache.org/viewvc/hive/trunk/service/src/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java?rev=1621912&r1=1621911&r2=1621912&view=diff
==============================================================================
--- hive/trunk/service/src/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java (original)
+++ hive/trunk/service/src/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java Tue Sep 2 04:41:29 2014
@@ -70,7 +70,8 @@ public class ThriftBinaryCLIService exte
minWorkerThreads = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_MIN_WORKER_THREADS);
maxWorkerThreads = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_MAX_WORKER_THREADS);
- workerKeepAliveTime = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_WORKER_KEEPALIVE_TIME);
+ workerKeepAliveTime = hiveConf.getTimeVar(
+ ConfVars.HIVE_SERVER2_THRIFT_WORKER_KEEPALIVE_TIME, TimeUnit.SECONDS);
String threadPoolName = "HiveServer2-Handler-Pool";
ExecutorService executorService = new ThreadPoolExecutor(minWorkerThreads, maxWorkerThreads,
workerKeepAliveTime, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
Modified: hive/trunk/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java
URL: http://svn.apache.org/viewvc/hive/trunk/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java?rev=1621912&r1=1621911&r2=1621912&view=diff
==============================================================================
--- hive/trunk/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java (original)
+++ hive/trunk/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java Tue Sep 2 04:41:29 2014
@@ -61,7 +61,7 @@ public abstract class ThriftCLIService e
protected int minWorkerThreads;
protected int maxWorkerThreads;
- protected int workerKeepAliveTime;
+ protected long workerKeepAliveTime;
protected static HiveAuthFactory hiveAuthFactory;
Modified: hive/trunk/service/src/java/org/apache/hive/service/cli/thrift/ThriftHttpCLIService.java
URL: http://svn.apache.org/viewvc/hive/trunk/service/src/java/org/apache/hive/service/cli/thrift/ThriftHttpCLIService.java?rev=1621912&r1=1621911&r2=1621912&view=diff
==============================================================================
--- hive/trunk/service/src/java/org/apache/hive/service/cli/thrift/ThriftHttpCLIService.java (original)
+++ hive/trunk/service/src/java/org/apache/hive/service/cli/thrift/ThriftHttpCLIService.java Tue Sep 2 04:41:29 2014
@@ -69,7 +69,8 @@ public class ThriftHttpCLIService extend
minWorkerThreads = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_HTTP_MIN_WORKER_THREADS);
maxWorkerThreads = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_HTTP_MAX_WORKER_THREADS);
- workerKeepAliveTime = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_HTTP_WORKER_KEEPALIVE_TIME);
+ workerKeepAliveTime = hiveConf.getTimeVar(
+ ConfVars.HIVE_SERVER2_THRIFT_HTTP_WORKER_KEEPALIVE_TIME, TimeUnit.SECONDS);
String httpPath = getHttpPath(hiveConf.getVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_HTTP_PATH));
@@ -110,7 +111,8 @@ public class ThriftHttpCLIService extend
// Linux:yes, Windows:no
connector.setReuseAddress(!Shell.WINDOWS);
- int maxIdleTime = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_HTTP_MAX_IDLE_TIME);
+ int maxIdleTime = (int) hiveConf.getTimeVar(
+ ConfVars.HIVE_SERVER2_THRIFT_HTTP_MAX_IDLE_TIME, TimeUnit.MILLISECONDS);
connector.setMaxIdleTime(maxIdleTime);
httpServer.addConnector(connector);
Modified: hive/trunk/service/src/test/org/apache/hive/service/cli/CLIServiceTest.java
URL: http://svn.apache.org/viewvc/hive/trunk/service/src/test/org/apache/hive/service/cli/CLIServiceTest.java?rev=1621912&r1=1621911&r2=1621912&view=diff
==============================================================================
--- hive/trunk/service/src/test/org/apache/hive/service/cli/CLIServiceTest.java (original)
+++ hive/trunk/service/src/test/org/apache/hive/service/cli/CLIServiceTest.java Tue Sep 2 04:41:29 2014
@@ -26,9 +26,9 @@ import static org.junit.Assert.fail;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -202,7 +202,8 @@ public abstract class CLIServiceTest {
* to give a compile time error.
* (compilation is done synchronous as of now)
*/
- longPollingTimeout = new HiveConf().getLongVar(ConfVars.HIVE_SERVER2_LONG_POLLING_TIMEOUT);
+ longPollingTimeout = HiveConf.getTimeVar(new HiveConf(),
+ HiveConf.ConfVars.HIVE_SERVER2_LONG_POLLING_TIMEOUT, TimeUnit.MILLISECONDS);
queryString = "SELECT NON_EXISTING_COLUMN FROM " + tableName;
try {
runQueryAsync(sessionHandle, queryString, confOverlay, OperationState.ERROR, longPollingTimeout);
@@ -295,7 +296,7 @@ public abstract class CLIServiceTest {
long longPollingTimeDelta;
OperationStatus opStatus = null;
OperationState state = null;
- confOverlay.put(HiveConf.ConfVars.HIVE_SERVER2_LONG_POLLING_TIMEOUT.varname, String.valueOf(longPollingTimeout));
+ confOverlay.put(HiveConf.ConfVars.HIVE_SERVER2_LONG_POLLING_TIMEOUT.varname, longPollingTimeout + "ms");
OperationHandle opHandle = client.executeStatementAsync(sessionHandle, queryString, confOverlay);
int count = 0;
while (true) {