You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by pr...@apache.org on 2014/03/22 18:26:02 UTC
svn commit: r1580264 - in /hive/trunk:
itests/hive-unit/src/test/java/org/apache/hive/jdbc/
jdbc/src/java/org/apache/hive/jdbc/
service/src/java/org/apache/hive/service/cli/
service/src/java/org/apache/hive/service/cli/operation/
Author: prasadm
Date: Sat Mar 22 17:26:01 2014
New Revision: 1580264
URL: http://svn.apache.org/r1580264
Log:
HIVE-6472: JDBC cancel will not work with current HiveServer2 (Vaibhav Gumashta via Prasad Mujumdar)
Modified:
hive/trunk/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcDriver2.java
hive/trunk/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java
hive/trunk/service/src/java/org/apache/hive/service/cli/OperationState.java
hive/trunk/service/src/java/org/apache/hive/service/cli/operation/ExecuteStatementOperation.java
hive/trunk/service/src/java/org/apache/hive/service/cli/operation/Operation.java
hive/trunk/service/src/java/org/apache/hive/service/cli/operation/OperationManager.java
hive/trunk/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java
Modified: hive/trunk/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcDriver2.java
URL: http://svn.apache.org/viewvc/hive/trunk/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcDriver2.java?rev=1580264&r1=1580263&r2=1580264&view=diff
==============================================================================
--- hive/trunk/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcDriver2.java (original)
+++ hive/trunk/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcDriver2.java Sat Mar 22 17:26:01 2014
@@ -49,6 +49,7 @@ import java.util.regex.Pattern;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.TableType;
+import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.hadoop.hive.ql.processors.DfsProcessor;
import org.apache.hadoop.hive.ql.processors.SetProcessor;
import org.apache.hive.common.util.HiveVersionInfo;
@@ -2005,4 +2006,68 @@ public class TestJdbcDriver2 {
assertEquals("role1", res.getString(1));
res.close();
}
+
+ /**
+ * Test the cancellation of a query that is running.
+ * We spawn 2 threads - one running the query and
+ * the other attempting to cancel.
+ * We're using a dummy udf to simulate a query,
+ * that runs for a sufficiently long time.
+ * @throws Exception
+ */
+ @Test
+ public void testQueryCancel() throws Exception {
+ String udfName = SleepUDF.class.getName();
+ Statement stmt1 = con.createStatement();
+ stmt1.execute("create temporary function sleepUDF as '" + udfName + "'");
+ stmt1.close();
+ final Statement stmt = con.createStatement();
+ // Thread executing the query
+ Thread tExecute = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ System.out.println("Executing query: ");
+ stmt.executeQuery("select sleepUDF(t1.under_col) as u0, t1.under_col as u1, " +
+ "t2.under_col as u2 from " + tableName + "t1 join " + tableName +
+ " t2 on t1.under_col = t2.under_col");
+ fail("Expecting SQLException");
+ } catch (SQLException e) {
+ // This thread should throw an exception
+ assertNotNull(e);
+ System.out.println(e.toString());
+ }
+ }
+ });
+ // Thread cancelling the query
+ Thread tCancel = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ Thread.sleep(1000);
+ System.out.println("Cancelling query: ");
+ stmt.cancel();
+ } catch (Exception e) {
+ // No-op
+ }
+ }
+ });
+ tExecute.start();
+ tCancel.start();
+ tExecute.join();
+ tCancel.join();
+ stmt.close();
+ }
+
+ // A udf which sleeps for 100ms to simulate a long running query
+ public static class SleepUDF extends UDF {
+ public Integer evaluate(final Integer value) {
+ try {
+ Thread.sleep(100);
+ } catch (InterruptedException e) {
+ // No-op
+ }
+ return value;
+ }
+ }
}
Modified: hive/trunk/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java
URL: http://svn.apache.org/viewvc/hive/trunk/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java?rev=1580264&r1=1580263&r2=1580264&view=diff
==============================================================================
--- hive/trunk/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java (original)
+++ hive/trunk/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java Sat Mar 22 17:26:01 2014
@@ -24,6 +24,7 @@ import java.sql.SQLException;
import java.sql.SQLWarning;
import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.locks.ReentrantLock;
import org.apache.hive.service.cli.thrift.TCLIService;
import org.apache.hive.service.cli.thrift.TCancelOperationReq;
@@ -75,9 +76,9 @@ public class HiveStatement implements ja
*/
private boolean isClosed = false;
- /**
- *
- */
+ // A fair reentrant lock
+ private ReentrantLock transportLock = new ReentrantLock(true);
+
public HiveStatement(HiveConnection connection, TCLIService.Iface client,
TSessionHandle sessHandle) {
this(connection, client, sessHandle, false);
@@ -121,7 +122,9 @@ public class HiveStatement implements ja
TCancelOperationReq cancelReq = new TCancelOperationReq();
cancelReq.setOperationHandle(stmtHandle);
try {
+ transportLock.lock();
TCancelOperationResp cancelResp = client.CancelOperation(cancelReq);
+ transportLock.unlock();
Utils.verifySuccessWithInfo(cancelResp.getStatus());
} catch (SQLException e) {
throw e;
@@ -157,7 +160,9 @@ public class HiveStatement implements ja
if (stmtHandle != null) {
TCloseOperationReq closeReq = new TCloseOperationReq();
closeReq.setOperationHandle(stmtHandle);
+ transportLock.lock();
TCloseOperationResp closeResp = client.CloseOperation(closeReq);
+ transportLock.unlock();
Utils.verifySuccessWithInfo(closeResp.getStatus());
}
} catch (SQLException e) {
@@ -217,9 +222,11 @@ public class HiveStatement implements ja
*/
execReq.setRunAsync(true);
execReq.setConfOverlay(sessConf);
+ transportLock.lock();
TExecuteStatementResp execResp = client.ExecuteStatement(execReq);
Utils.verifySuccessWithInfo(execResp.getStatus());
stmtHandle = execResp.getOperationHandle();
+ transportLock.unlock();
} catch (SQLException eS) {
throw eS;
} catch (Exception ex) {
@@ -237,7 +244,9 @@ public class HiveStatement implements ja
* For an async SQLOperation, GetOperationStatus will use the long polling approach
* It will essentially return after the HIVE_SERVER2_LONG_POLLING_TIMEOUT (a server config) expires
*/
+ transportLock.lock();
statusResp = client.GetOperationStatus(statusReq);
+ transportLock.unlock();
Utils.verifySuccessWithInfo(statusResp.getStatus());
if (statusResp.isSetOperationState()) {
switch (statusResp.getOperationState()) {
Modified: hive/trunk/service/src/java/org/apache/hive/service/cli/OperationState.java
URL: http://svn.apache.org/viewvc/hive/trunk/service/src/java/org/apache/hive/service/cli/OperationState.java?rev=1580264&r1=1580263&r2=1580264&view=diff
==============================================================================
--- hive/trunk/service/src/java/org/apache/hive/service/cli/OperationState.java (original)
+++ hive/trunk/service/src/java/org/apache/hive/service/cli/OperationState.java Sat Mar 22 17:26:01 2014
@@ -40,7 +40,6 @@ public enum OperationState {
this.tOperationState = tOperationState;
}
-
public static OperationState getOperationState(TOperationState tOperationState) {
// TODO: replace this with a Map?
for (OperationState opState : values()) {
@@ -51,13 +50,15 @@ public enum OperationState {
return OperationState.UNKNOWN;
}
- public static void validateTransition(OperationState oldState, OperationState newState)
- throws HiveSQLException {
+ public static void validateTransition(OperationState oldState,
+ OperationState newState)
+ throws HiveSQLException {
switch (oldState) {
case INITIALIZED:
switch (newState) {
case PENDING:
case RUNNING:
+ case CANCELED:
case CLOSED:
return;
}
Modified: hive/trunk/service/src/java/org/apache/hive/service/cli/operation/ExecuteStatementOperation.java
URL: http://svn.apache.org/viewvc/hive/trunk/service/src/java/org/apache/hive/service/cli/operation/ExecuteStatementOperation.java?rev=1580264&r1=1580263&r2=1580264&view=diff
==============================================================================
--- hive/trunk/service/src/java/org/apache/hive/service/cli/operation/ExecuteStatementOperation.java (original)
+++ hive/trunk/service/src/java/org/apache/hive/service/cli/operation/ExecuteStatementOperation.java Sat Mar 22 17:26:01 2014
@@ -17,8 +17,6 @@
*/
package org.apache.hive.service.cli.operation;
-
-
import java.sql.SQLException;
import java.util.HashMap;
import java.util.Map;
Modified: hive/trunk/service/src/java/org/apache/hive/service/cli/operation/Operation.java
URL: http://svn.apache.org/viewvc/hive/trunk/service/src/java/org/apache/hive/service/cli/operation/Operation.java?rev=1580264&r1=1580263&r2=1580264&view=diff
==============================================================================
--- hive/trunk/service/src/java/org/apache/hive/service/cli/operation/Operation.java (original)
+++ hive/trunk/service/src/java/org/apache/hive/service/cli/operation/Operation.java Sat Mar 22 17:26:01 2014
@@ -34,7 +34,6 @@ import org.apache.hive.service.cli.Table
import org.apache.hive.service.cli.session.HiveSession;
import org.apache.hive.service.cli.thrift.TProtocolVersion;
-
public abstract class Operation {
protected final HiveSession parentSession;
private OperationState state = OperationState.INITIALIZED;
Modified: hive/trunk/service/src/java/org/apache/hive/service/cli/operation/OperationManager.java
URL: http://svn.apache.org/viewvc/hive/trunk/service/src/java/org/apache/hive/service/cli/operation/OperationManager.java?rev=1580264&r1=1580263&r2=1580264&view=diff
==============================================================================
--- hive/trunk/service/src/java/org/apache/hive/service/cli/operation/OperationManager.java (original)
+++ hive/trunk/service/src/java/org/apache/hive/service/cli/operation/OperationManager.java Sat Mar 22 17:26:01 2014
@@ -22,11 +22,14 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hive.service.AbstractService;
import org.apache.hive.service.cli.FetchOrientation;
import org.apache.hive.service.cli.HiveSQLException;
import org.apache.hive.service.cli.OperationHandle;
+import org.apache.hive.service.cli.OperationState;
import org.apache.hive.service.cli.OperationStatus;
import org.apache.hive.service.cli.RowSet;
import org.apache.hive.service.cli.TableSchema;
@@ -38,6 +41,8 @@ import org.apache.hive.service.cli.sessi
*/
public class OperationManager extends AbstractService {
+ private final Log LOG = LogFactory.getLog(OperationManager.class.getName());
+
private HiveConf hiveConf;
private final Map<OperationHandle, Operation> handleToOperation =
new HashMap<OperationHandle, Operation>();
@@ -124,7 +129,8 @@ public class OperationManager extends Ab
return operation;
}
- public synchronized Operation getOperation(OperationHandle operationHandle) throws HiveSQLException {
+ public synchronized Operation getOperation(OperationHandle operationHandle)
+ throws HiveSQLException {
Operation operation = handleToOperation.get(operationHandle);
if (operation == null) {
throw new HiveSQLException("Invalid OperationHandle: " + operationHandle);
@@ -140,12 +146,26 @@ public class OperationManager extends Ab
return handleToOperation.remove(opHandle);
}
- public OperationStatus getOperationStatus(OperationHandle opHandle) throws HiveSQLException {
+ public OperationStatus getOperationStatus(OperationHandle opHandle)
+ throws HiveSQLException {
return getOperation(opHandle).getStatus();
}
public void cancelOperation(OperationHandle opHandle) throws HiveSQLException {
- getOperation(opHandle).cancel();
+ Operation operation = getOperation(opHandle);
+ OperationState opState = operation.getStatus().getState();
+ if (opState == OperationState.CANCELED ||
+ opState == OperationState.CLOSED ||
+ opState == OperationState.FINISHED ||
+ opState == OperationState.ERROR ||
+ opState == OperationState.UNKNOWN) {
+ // Cancel should be a no-op in either cases
+ LOG.debug(opHandle + ": Operation is already aborted in state - " + opState);
+ }
+ else {
+ LOG.debug(opHandle + ": Attempting to cancel from state - " + opState);
+ operation.cancel();
+ }
}
public void closeOperation(OperationHandle opHandle) throws HiveSQLException {
@@ -161,7 +181,8 @@ public class OperationManager extends Ab
return getOperation(opHandle).getResultSetSchema();
}
- public RowSet getOperationNextRowSet(OperationHandle opHandle) throws HiveSQLException {
+ public RowSet getOperationNextRowSet(OperationHandle opHandle)
+ throws HiveSQLException {
return getOperation(opHandle).getNextRowSet();
}
Modified: hive/trunk/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java
URL: http://svn.apache.org/viewvc/hive/trunk/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java?rev=1580264&r1=1580263&r2=1580264&view=diff
==============================================================================
--- hive/trunk/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java (original)
+++ hive/trunk/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java Sat Mar 22 17:26:01 2014
@@ -135,16 +135,23 @@ public class SQLOperation extends Execut
// case, when calling fetch queries since execute() has returned.
// For now, we disable the test attempts.
driver.setTryCount(Integer.MAX_VALUE);
-
response = driver.run();
if (0 != response.getResponseCode()) {
throw new HiveSQLException("Error while processing statement: "
+ response.getErrorMessage(), response.getSQLState(), response.getResponseCode());
}
-
} catch (HiveSQLException e) {
- setState(OperationState.ERROR);
- throw e;
+ // If the operation was cancelled by another thread,
+ // Driver#run will return a non-zero response code.
+ // We will simply return if the operation state is CANCELED,
+ // otherwise throw an exception
+ if (getStatus().getState() == OperationState.CANCELED) {
+ return;
+ }
+ else {
+ setState(OperationState.ERROR);
+ throw e;
+ }
} catch (Exception e) {
setState(OperationState.ERROR);
throw new HiveSQLException("Error running query: " + e.toString(), e);