You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by pr...@apache.org on 2014/03/22 18:26:02 UTC

svn commit: r1580264 - in /hive/trunk: itests/hive-unit/src/test/java/org/apache/hive/jdbc/ jdbc/src/java/org/apache/hive/jdbc/ service/src/java/org/apache/hive/service/cli/ service/src/java/org/apache/hive/service/cli/operation/

Author: prasadm
Date: Sat Mar 22 17:26:01 2014
New Revision: 1580264

URL: http://svn.apache.org/r1580264
Log:
HIVE-6472: JDBC cancel will not work with current HiveServer2 (Vaibhav Gumashta via Prasad Mujumdar)

Modified:
    hive/trunk/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcDriver2.java
    hive/trunk/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java
    hive/trunk/service/src/java/org/apache/hive/service/cli/OperationState.java
    hive/trunk/service/src/java/org/apache/hive/service/cli/operation/ExecuteStatementOperation.java
    hive/trunk/service/src/java/org/apache/hive/service/cli/operation/Operation.java
    hive/trunk/service/src/java/org/apache/hive/service/cli/operation/OperationManager.java
    hive/trunk/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java

Modified: hive/trunk/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcDriver2.java
URL: http://svn.apache.org/viewvc/hive/trunk/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcDriver2.java?rev=1580264&r1=1580263&r2=1580264&view=diff
==============================================================================
--- hive/trunk/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcDriver2.java (original)
+++ hive/trunk/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcDriver2.java Sat Mar 22 17:26:01 2014
@@ -49,6 +49,7 @@ import java.util.regex.Pattern;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.TableType;
+import org.apache.hadoop.hive.ql.exec.UDF;
 import org.apache.hadoop.hive.ql.processors.DfsProcessor;
 import org.apache.hadoop.hive.ql.processors.SetProcessor;
 import org.apache.hive.common.util.HiveVersionInfo;
@@ -2005,4 +2006,68 @@ public class TestJdbcDriver2 {
     assertEquals("role1", res.getString(1));
     res.close();
   }
+
+  /**
+   * Test the cancellation of a query that is running. We spawn 2 threads:
+   * one runs the query, the other cancels it after a 1 second delay.
+   * A dummy udf (SleepUDF) keeps the query running long enough.
+   * NOTE(review): fail() is raised on the worker thread, so it probably
+   * does not fail the test itself - confirm and propagate if needed.
+   * @throws Exception
+   */
+  @Test
+  public void testQueryCancel() throws Exception {
+    String udfName = SleepUDF.class.getName();
+    Statement stmt1 = con.createStatement();
+    stmt1.execute("create temporary function sleepUDF as '" + udfName + "'");
+    stmt1.close();
+    final Statement stmt = con.createStatement();
+    // Thread executing the query
+    Thread tExecute = new Thread(new Runnable() {
+      @Override
+      public void run() {
+        try {
+          System.out.println("Executing query: ");
+          // Keep the space before each alias, otherwise it fuses with the table name
+          stmt.executeQuery("select sleepUDF(t1.under_col) as u0, t1.under_col as u1, " +
+              "t2.under_col as u2 from " + tableName + " t1 join " + tableName +
+              " t2 on t1.under_col = t2.under_col");
+          fail("Expecting SQLException");
+        } catch (SQLException e) {
+          // Expected path: the cancelled query surfaces as a SQLException here
+          System.out.println(e.toString());
+        }
+      }
+    });
+    // Thread cancelling the query
+    Thread tCancel = new Thread(new Runnable() {
+      @Override
+      public void run() {
+        try {
+          Thread.sleep(1000);
+          System.out.println("Cancelling query: ");
+          stmt.cancel();
+        } catch (Exception e) {
+          System.out.println("Cancel failed: " + e);
+        }
+      }
+    });
+    tExecute.start();
+    tCancel.start();
+    tExecute.join();
+    tCancel.join();
+    stmt.close();
+  }
+
+  // A udf which sleeps for 100ms on every evaluate() call to simulate a long running query
+  public static class SleepUDF extends UDF {
+    public Integer evaluate(final Integer value) {
+      try {
+        Thread.sleep(100);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt(); // keep the interrupt visible to the caller
+      }
+      return value;
+    }
+  }
 }

Modified: hive/trunk/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java
URL: http://svn.apache.org/viewvc/hive/trunk/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java?rev=1580264&r1=1580263&r2=1580264&view=diff
==============================================================================
--- hive/trunk/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java (original)
+++ hive/trunk/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java Sat Mar 22 17:26:01 2014
@@ -24,6 +24,7 @@ import java.sql.SQLException;
 import java.sql.SQLWarning;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.hive.service.cli.thrift.TCLIService;
 import org.apache.hive.service.cli.thrift.TCancelOperationReq;
@@ -75,9 +76,9 @@ public class HiveStatement implements ja
    */
   private boolean isClosed = false;
 
-  /**
-   *
-   */
+  // A fair reentrant lock
+  private ReentrantLock transportLock = new ReentrantLock(true);
+
   public HiveStatement(HiveConnection connection, TCLIService.Iface client,
       TSessionHandle sessHandle) {
     this(connection, client, sessHandle, false);
@@ -121,7 +122,9 @@ public class HiveStatement implements ja
     TCancelOperationReq cancelReq = new TCancelOperationReq();
     cancelReq.setOperationHandle(stmtHandle);
     try {
+      transportLock.lock();
       TCancelOperationResp cancelResp = client.CancelOperation(cancelReq);
+      transportLock.unlock();
       Utils.verifySuccessWithInfo(cancelResp.getStatus());
     } catch (SQLException e) {
       throw e;
@@ -157,7 +160,9 @@ public class HiveStatement implements ja
       if (stmtHandle != null) {
         TCloseOperationReq closeReq = new TCloseOperationReq();
         closeReq.setOperationHandle(stmtHandle);
+        transportLock.lock();
         TCloseOperationResp closeResp = client.CloseOperation(closeReq);
+        transportLock.unlock();
         Utils.verifySuccessWithInfo(closeResp.getStatus());
       }
     } catch (SQLException e) {
@@ -217,9 +222,11 @@ public class HiveStatement implements ja
        */
       execReq.setRunAsync(true);
       execReq.setConfOverlay(sessConf);
+      transportLock.lock();
       TExecuteStatementResp execResp = client.ExecuteStatement(execReq);
       Utils.verifySuccessWithInfo(execResp.getStatus());
       stmtHandle = execResp.getOperationHandle();
+      transportLock.unlock();
     } catch (SQLException eS) {
       throw eS;
     } catch (Exception ex) {
@@ -237,7 +244,9 @@ public class HiveStatement implements ja
          * For an async SQLOperation, GetOperationStatus will use the long polling approach
          * It will essentially return after the HIVE_SERVER2_LONG_POLLING_TIMEOUT (a server config) expires
          */
+        transportLock.lock();
         statusResp = client.GetOperationStatus(statusReq);
+        transportLock.unlock();
         Utils.verifySuccessWithInfo(statusResp.getStatus());
         if (statusResp.isSetOperationState()) {
           switch (statusResp.getOperationState()) {

Modified: hive/trunk/service/src/java/org/apache/hive/service/cli/OperationState.java
URL: http://svn.apache.org/viewvc/hive/trunk/service/src/java/org/apache/hive/service/cli/OperationState.java?rev=1580264&r1=1580263&r2=1580264&view=diff
==============================================================================
--- hive/trunk/service/src/java/org/apache/hive/service/cli/OperationState.java (original)
+++ hive/trunk/service/src/java/org/apache/hive/service/cli/OperationState.java Sat Mar 22 17:26:01 2014
@@ -40,7 +40,6 @@ public enum OperationState {
     this.tOperationState = tOperationState;
   }
 
-
   public static OperationState getOperationState(TOperationState tOperationState) {
     // TODO: replace this with a Map?
     for (OperationState opState : values()) {
@@ -51,13 +50,15 @@ public enum OperationState {
     return OperationState.UNKNOWN;
   }
 
-  public static void validateTransition(OperationState oldState, OperationState newState)
-      throws HiveSQLException {
+  public static void validateTransition(OperationState oldState,
+      OperationState newState)
+          throws HiveSQLException {
     switch (oldState) {
     case INITIALIZED:
       switch (newState) {
       case PENDING:
       case RUNNING:
+      case CANCELED:
       case CLOSED:
         return;
       }

Modified: hive/trunk/service/src/java/org/apache/hive/service/cli/operation/ExecuteStatementOperation.java
URL: http://svn.apache.org/viewvc/hive/trunk/service/src/java/org/apache/hive/service/cli/operation/ExecuteStatementOperation.java?rev=1580264&r1=1580263&r2=1580264&view=diff
==============================================================================
--- hive/trunk/service/src/java/org/apache/hive/service/cli/operation/ExecuteStatementOperation.java (original)
+++ hive/trunk/service/src/java/org/apache/hive/service/cli/operation/ExecuteStatementOperation.java Sat Mar 22 17:26:01 2014
@@ -17,8 +17,6 @@
  */
 package org.apache.hive.service.cli.operation;
 
-
-
 import java.sql.SQLException;
 import java.util.HashMap;
 import java.util.Map;

Modified: hive/trunk/service/src/java/org/apache/hive/service/cli/operation/Operation.java
URL: http://svn.apache.org/viewvc/hive/trunk/service/src/java/org/apache/hive/service/cli/operation/Operation.java?rev=1580264&r1=1580263&r2=1580264&view=diff
==============================================================================
--- hive/trunk/service/src/java/org/apache/hive/service/cli/operation/Operation.java (original)
+++ hive/trunk/service/src/java/org/apache/hive/service/cli/operation/Operation.java Sat Mar 22 17:26:01 2014
@@ -34,7 +34,6 @@ import org.apache.hive.service.cli.Table
 import org.apache.hive.service.cli.session.HiveSession;
 import org.apache.hive.service.cli.thrift.TProtocolVersion;
 
-
 public abstract class Operation {
   protected final HiveSession parentSession;
   private OperationState state = OperationState.INITIALIZED;

Modified: hive/trunk/service/src/java/org/apache/hive/service/cli/operation/OperationManager.java
URL: http://svn.apache.org/viewvc/hive/trunk/service/src/java/org/apache/hive/service/cli/operation/OperationManager.java?rev=1580264&r1=1580263&r2=1580264&view=diff
==============================================================================
--- hive/trunk/service/src/java/org/apache/hive/service/cli/operation/OperationManager.java (original)
+++ hive/trunk/service/src/java/org/apache/hive/service/cli/operation/OperationManager.java Sat Mar 22 17:26:01 2014
@@ -22,11 +22,14 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hive.service.AbstractService;
 import org.apache.hive.service.cli.FetchOrientation;
 import org.apache.hive.service.cli.HiveSQLException;
 import org.apache.hive.service.cli.OperationHandle;
+import org.apache.hive.service.cli.OperationState;
 import org.apache.hive.service.cli.OperationStatus;
 import org.apache.hive.service.cli.RowSet;
 import org.apache.hive.service.cli.TableSchema;
@@ -38,6 +41,8 @@ import org.apache.hive.service.cli.sessi
  */
 public class OperationManager extends AbstractService {
 
+  private final Log LOG = LogFactory.getLog(OperationManager.class.getName());
+
   private HiveConf hiveConf;
   private final Map<OperationHandle, Operation> handleToOperation =
       new HashMap<OperationHandle, Operation>();
@@ -124,7 +129,8 @@ public class OperationManager extends Ab
     return operation;
   }
 
-  public synchronized Operation getOperation(OperationHandle operationHandle) throws HiveSQLException {
+  public synchronized Operation getOperation(OperationHandle operationHandle)
+      throws HiveSQLException {
     Operation operation = handleToOperation.get(operationHandle);
     if (operation == null) {
       throw new HiveSQLException("Invalid OperationHandle: " + operationHandle);
@@ -140,12 +146,26 @@ public class OperationManager extends Ab
     return handleToOperation.remove(opHandle);
   }
 
-  public OperationStatus getOperationStatus(OperationHandle opHandle) throws HiveSQLException {
+  public OperationStatus getOperationStatus(OperationHandle opHandle)
+      throws HiveSQLException {
     return getOperation(opHandle).getStatus();
   }
 
   public void cancelOperation(OperationHandle opHandle) throws HiveSQLException {
-    getOperation(opHandle).cancel();
+    Operation operation = getOperation(opHandle);
+    OperationState opState = operation.getStatus().getState();
+    if (opState == OperationState.CANCELED ||
+        opState == OperationState.CLOSED ||
+        opState == OperationState.FINISHED ||
+        opState == OperationState.ERROR ||
+        opState == OperationState.UNKNOWN) {
+      // Cancel is a no-op in any of these states; only log the request
+      LOG.debug(opHandle + ": Operation is already aborted in state - " + opState);
+    }
+    else {
+      LOG.debug(opHandle + ": Attempting to cancel from state - " + opState);
+      operation.cancel();
+    }
   }
 
   public void closeOperation(OperationHandle opHandle) throws HiveSQLException {
@@ -161,7 +181,8 @@ public class OperationManager extends Ab
     return getOperation(opHandle).getResultSetSchema();
   }
 
-  public RowSet getOperationNextRowSet(OperationHandle opHandle) throws HiveSQLException {
+  public RowSet getOperationNextRowSet(OperationHandle opHandle)
+      throws HiveSQLException {
     return getOperation(opHandle).getNextRowSet();
   }
 

Modified: hive/trunk/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java
URL: http://svn.apache.org/viewvc/hive/trunk/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java?rev=1580264&r1=1580263&r2=1580264&view=diff
==============================================================================
--- hive/trunk/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java (original)
+++ hive/trunk/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java Sat Mar 22 17:26:01 2014
@@ -135,16 +135,23 @@ public class SQLOperation extends Execut
       // case, when calling fetch queries since execute() has returned.
       // For now, we disable the test attempts.
       driver.setTryCount(Integer.MAX_VALUE);
-
       response = driver.run();
       if (0 != response.getResponseCode()) {
         throw new HiveSQLException("Error while processing statement: "
             + response.getErrorMessage(), response.getSQLState(), response.getResponseCode());
       }
-
     } catch (HiveSQLException e) {
-      setState(OperationState.ERROR);
-      throw e;
+      // If the operation was cancelled by another thread,
+      // Driver#run will return a non-zero response code.
+      // We will simply return if the operation state is CANCELED,
+      // otherwise throw an exception
+      if (getStatus().getState() == OperationState.CANCELED) {
+        return;
+      }
+      else {
+        setState(OperationState.ERROR);
+        throw e;
+      }
     } catch (Exception e) {
       setState(OperationState.ERROR);
       throw new HiveSQLException("Error running query: " + e.toString(), e);