You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by th...@apache.org on 2014/03/03 11:29:55 UTC

svn commit: r1573516 - /hive/trunk/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java

Author: thejas
Date: Mon Mar  3 10:29:54 2014
New Revision: 1573516

URL: http://svn.apache.org/r1573516
Log:
HIVE-5232 : Make JDBC use the new HiveServer2 async execution API by default (Thejas Nair via Vaibhav Gumashta)

Modified:
    hive/trunk/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java

Modified: hive/trunk/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java
URL: http://svn.apache.org/viewvc/hive/trunk/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java?rev=1573516&r1=1573515&r2=1573516&view=diff
==============================================================================
--- hive/trunk/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java (original)
+++ hive/trunk/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java Mon Mar  3 10:29:54 2014
@@ -84,7 +84,7 @@ public class HiveStatement implements ja
   }
 
   public HiveStatement(HiveConnection connection, TCLIService.Iface client,
-        TSessionHandle sessHandle, boolean isScrollableResultset) {
+      TSessionHandle sessHandle, boolean isScrollableResultset) {
     this.connection = connection;
     this.client = client;
     this.sessHandle = sessHandle;
@@ -97,6 +97,7 @@ public class HiveStatement implements ja
    * @see java.sql.Statement#addBatch(java.lang.String)
    */
 
+  @Override
   public void addBatch(String sql) throws SQLException {
     throw new SQLException("Method not supported");
   }
@@ -107,6 +108,7 @@ public class HiveStatement implements ja
    * @see java.sql.Statement#cancel()
    */
 
+  @Override
   public void cancel() throws SQLException {
     if (isClosed) {
       throw new SQLException("Can't cancel after statement has been closed");
@@ -134,6 +136,7 @@ public class HiveStatement implements ja
    * @see java.sql.Statement#clearBatch()
    */
 
+  @Override
   public void clearBatch() throws SQLException {
     throw new SQLException("Method not supported");
   }
@@ -144,6 +147,7 @@ public class HiveStatement implements ja
    * @see java.sql.Statement#clearWarnings()
    */
 
+  @Override
   public void clearWarnings() throws SQLException {
     warningChain = null;
   }
@@ -169,6 +173,7 @@ public class HiveStatement implements ja
    *
    * @see java.sql.Statement#close()
    */
+  @Override
   public void close() throws SQLException {
     if (isClosed) {
       return;
@@ -192,6 +197,7 @@ public class HiveStatement implements ja
    * @see java.sql.Statement#execute(java.lang.String)
    */
 
+  @Override
   public boolean execute(String sql) throws SQLException {
     if (isClosed) {
       throw new SQLException("Can't execute after statement has been closed");
@@ -203,6 +209,13 @@ public class HiveStatement implements ja
       }
 
       TExecuteStatementReq execReq = new TExecuteStatementReq(sessHandle, sql);
+      /**
+       * Run asynchronously whenever possible.
+       * Currently only a SQLOperation can be run asynchronously,
+       * in a background operation thread.
+       * Compilation is synchronous; only execution is asynchronous.
+       */
+      execReq.setRunAsync(true);
       execReq.setConfOverlay(sessConf);
       TExecuteStatementResp execResp = client.ExecuteStatement(execReq);
       Utils.verifySuccessWithInfo(execResp.getStatus());
@@ -213,46 +226,49 @@ public class HiveStatement implements ja
       throw new SQLException(ex.toString(), "08S01", ex);
     }
 
-    if (!stmtHandle.isHasResultSet()) {
-      // Poll until the query has completed one way or another. DML queries will not return a result
-      // set, but we should not return from this method until the query has completed to avoid
-      // racing with possible subsequent session shutdown, or queries that depend on the results
-      // materialised here.
-      TGetOperationStatusReq statusReq = new TGetOperationStatusReq(stmtHandle);
-      boolean requestComplete = false;
-      while (!requestComplete) {
-        try {
-          TGetOperationStatusResp statusResp = client.GetOperationStatus(statusReq);
-          Utils.verifySuccessWithInfo(statusResp.getStatus());
-          if (statusResp.isSetOperationState()) {
-            switch (statusResp.getOperationState()) {
-            case CLOSED_STATE:
-            case FINISHED_STATE:
-              return false;
-            case CANCELED_STATE:
-              // 01000 -> warning
-              throw new SQLException("Query was cancelled", "01000");
-            case ERROR_STATE:
-              // HY000 -> general error
-              throw new SQLException("Query failed", "HY000");
-            case UKNOWN_STATE:
-              throw new SQLException("Unknown query", "HY000");
-            case INITIALIZED_STATE:
-            case PENDING_STATE:
-            case RUNNING_STATE:
-              break;
-            }
+    TGetOperationStatusReq statusReq = new TGetOperationStatusReq(stmtHandle);
+    boolean operationComplete = false;
+    TGetOperationStatusResp statusResp;
+
+    // Poll the operation status until the operation is complete
+    while (!operationComplete) {
+      try {
+        /**
+         * For an async SQLOperation, GetOperationStatus uses long polling:
+         * the call may block on the server until HIVE_SERVER2_LONG_POLLING_TIMEOUT (a server config) expires.
+         */
+        statusResp = client.GetOperationStatus(statusReq);
+        Utils.verifySuccessWithInfo(statusResp.getStatus());
+        if (statusResp.isSetOperationState()) {
+          switch (statusResp.getOperationState()) {
+          case CLOSED_STATE:
+          case FINISHED_STATE:
+            operationComplete = true;
+            break;
+          case CANCELED_STATE:
+            // 01000 -> warning
+            throw new SQLException("Query was cancelled", "01000");
+          case ERROR_STATE:
+            // Get the error details from the underlying exception
+            throw new SQLException(statusResp.getErrorMessage(),
+                statusResp.getSqlState(), statusResp.getErrorCode());
+          case UKNOWN_STATE:
+            throw new SQLException("Unknown query", "HY000");
+          case INITIALIZED_STATE:
+          case PENDING_STATE:
+          case RUNNING_STATE:
+            break;
           }
-        } catch (Exception ex) {
-          throw new SQLException(ex.toString(), "08S01", ex);
-        }
-
-        try {
-          Thread.sleep(100);
-        } catch (InterruptedException ex) {
-          // Ignore
         }
+      } catch (SQLException e) {
+        throw e;
+      } catch (Exception e) {
+        throw new SQLException(e.toString(), "08S01", e);
       }
+    }
+
+    // The query should be completed by now
+    if (!stmtHandle.isHasResultSet()) {
       return false;
     }
     resultSet =  new HiveQueryResultSet.Builder(this).setClient(client).setSessionHandle(sessHandle)
@@ -268,6 +284,7 @@ public class HiveStatement implements ja
    * @see java.sql.Statement#execute(java.lang.String, int)
    */
 
+  @Override
   public boolean execute(String sql, int autoGeneratedKeys) throws SQLException {
     throw new SQLException("Method not supported");
   }
@@ -278,6 +295,7 @@ public class HiveStatement implements ja
    * @see java.sql.Statement#execute(java.lang.String, int[])
    */
 
+  @Override
   public boolean execute(String sql, int[] columnIndexes) throws SQLException {
     throw new SQLException("Method not supported");
   }
@@ -288,6 +306,7 @@ public class HiveStatement implements ja
    * @see java.sql.Statement#execute(java.lang.String, java.lang.String[])
    */
 
+  @Override
   public boolean execute(String sql, String[] columnNames) throws SQLException {
     throw new SQLException("Method not supported");
   }
@@ -298,6 +317,7 @@ public class HiveStatement implements ja
    * @see java.sql.Statement#executeBatch()
    */
 
+  @Override
   public int[] executeBatch() throws SQLException {
     throw new SQLException("Method not supported");
   }
@@ -308,6 +328,7 @@ public class HiveStatement implements ja
    * @see java.sql.Statement#executeQuery(java.lang.String)
    */
 
+  @Override
   public ResultSet executeQuery(String sql) throws SQLException {
     if (!execute(sql)) {
       throw new SQLException("The query did not generate a result set!");
@@ -321,6 +342,7 @@ public class HiveStatement implements ja
    * @see java.sql.Statement#executeUpdate(java.lang.String)
    */
 
+  @Override
   public int executeUpdate(String sql) throws SQLException {
     execute(sql);
     return 0;
@@ -332,6 +354,7 @@ public class HiveStatement implements ja
    * @see java.sql.Statement#executeUpdate(java.lang.String, int)
    */
 
+  @Override
   public int executeUpdate(String sql, int autoGeneratedKeys) throws SQLException {
     throw new SQLException("Method not supported");
   }
@@ -342,6 +365,7 @@ public class HiveStatement implements ja
    * @see java.sql.Statement#executeUpdate(java.lang.String, int[])
    */
 
+  @Override
   public int executeUpdate(String sql, int[] columnIndexes) throws SQLException {
     throw new SQLException("Method not supported");
   }
@@ -352,6 +376,7 @@ public class HiveStatement implements ja
    * @see java.sql.Statement#executeUpdate(java.lang.String, java.lang.String[])
    */
 
+  @Override
   public int executeUpdate(String sql, String[] columnNames) throws SQLException {
     throw new SQLException("Method not supported");
   }
@@ -362,6 +387,7 @@ public class HiveStatement implements ja
    * @see java.sql.Statement#getConnection()
    */
 
+  @Override
   public Connection getConnection() throws SQLException {
     return this.connection;
   }
@@ -372,6 +398,7 @@ public class HiveStatement implements ja
    * @see java.sql.Statement#getFetchDirection()
    */
 
+  @Override
   public int getFetchDirection() throws SQLException {
     throw new SQLException("Method not supported");
   }
@@ -382,6 +409,7 @@ public class HiveStatement implements ja
    * @see java.sql.Statement#getFetchSize()
    */
 
+  @Override
   public int getFetchSize() throws SQLException {
     return fetchSize;
   }
@@ -392,6 +420,7 @@ public class HiveStatement implements ja
    * @see java.sql.Statement#getGeneratedKeys()
    */
 
+  @Override
   public ResultSet getGeneratedKeys() throws SQLException {
     throw new SQLException("Method not supported");
   }
@@ -402,6 +431,7 @@ public class HiveStatement implements ja
    * @see java.sql.Statement#getMaxFieldSize()
    */
 
+  @Override
   public int getMaxFieldSize() throws SQLException {
     throw new SQLException("Method not supported");
   }
@@ -412,6 +442,7 @@ public class HiveStatement implements ja
    * @see java.sql.Statement#getMaxRows()
    */
 
+  @Override
   public int getMaxRows() throws SQLException {
     return maxRows;
   }
@@ -422,6 +453,7 @@ public class HiveStatement implements ja
    * @see java.sql.Statement#getMoreResults()
    */
 
+  @Override
   public boolean getMoreResults() throws SQLException {
     throw new SQLException("Method not supported");
   }
@@ -432,6 +464,7 @@ public class HiveStatement implements ja
    * @see java.sql.Statement#getMoreResults(int)
    */
 
+  @Override
   public boolean getMoreResults(int current) throws SQLException {
     throw new SQLException("Method not supported");
   }
@@ -442,6 +475,7 @@ public class HiveStatement implements ja
    * @see java.sql.Statement#getQueryTimeout()
    */
 
+  @Override
   public int getQueryTimeout() throws SQLException {
     throw new SQLException("Method not supported");
   }
@@ -452,6 +486,7 @@ public class HiveStatement implements ja
    * @see java.sql.Statement#getResultSet()
    */
 
+  @Override
   public ResultSet getResultSet() throws SQLException {
     return resultSet;
   }
@@ -462,6 +497,7 @@ public class HiveStatement implements ja
    * @see java.sql.Statement#getResultSetConcurrency()
    */
 
+  @Override
   public int getResultSetConcurrency() throws SQLException {
     throw new SQLException("Method not supported");
   }
@@ -472,6 +508,7 @@ public class HiveStatement implements ja
    * @see java.sql.Statement#getResultSetHoldability()
    */
 
+  @Override
   public int getResultSetHoldability() throws SQLException {
     throw new SQLException("Method not supported");
   }
@@ -482,6 +519,7 @@ public class HiveStatement implements ja
    * @see java.sql.Statement#getResultSetType()
    */
 
+  @Override
   public int getResultSetType() throws SQLException {
     throw new SQLException("Method not supported");
   }
@@ -492,6 +530,7 @@ public class HiveStatement implements ja
    * @see java.sql.Statement#getUpdateCount()
    */
 
+  @Override
   public int getUpdateCount() throws SQLException {
     return 0;
   }
@@ -502,6 +541,7 @@ public class HiveStatement implements ja
    * @see java.sql.Statement#getWarnings()
    */
 
+  @Override
   public SQLWarning getWarnings() throws SQLException {
     return warningChain;
   }
@@ -512,6 +552,7 @@ public class HiveStatement implements ja
    * @see java.sql.Statement#isClosed()
    */
 
+  @Override
   public boolean isClosed() throws SQLException {
     return isClosed;
   }
@@ -527,6 +568,7 @@ public class HiveStatement implements ja
    * @see java.sql.Statement#isPoolable()
    */
 
+  @Override
   public boolean isPoolable() throws SQLException {
     throw new SQLException("Method not supported");
   }
@@ -537,6 +579,7 @@ public class HiveStatement implements ja
    * @see java.sql.Statement#setCursorName(java.lang.String)
    */
 
+  @Override
   public void setCursorName(String name) throws SQLException {
     throw new SQLException("Method not supported");
   }
@@ -547,6 +590,7 @@ public class HiveStatement implements ja
    * @see java.sql.Statement#setEscapeProcessing(boolean)
    */
 
+  @Override
   public void setEscapeProcessing(boolean enable) throws SQLException {
     throw new SQLException("Method not supported");
   }
@@ -557,6 +601,7 @@ public class HiveStatement implements ja
    * @see java.sql.Statement#setFetchDirection(int)
    */
 
+  @Override
   public void setFetchDirection(int direction) throws SQLException {
     throw new SQLException("Method not supported");
   }
@@ -567,6 +612,7 @@ public class HiveStatement implements ja
    * @see java.sql.Statement#setFetchSize(int)
    */
 
+  @Override
   public void setFetchSize(int rows) throws SQLException {
     fetchSize = rows;
   }
@@ -577,6 +623,7 @@ public class HiveStatement implements ja
    * @see java.sql.Statement#setMaxFieldSize(int)
    */
 
+  @Override
   public void setMaxFieldSize(int max) throws SQLException {
     throw new SQLException("Method not supported");
   }
@@ -587,6 +634,7 @@ public class HiveStatement implements ja
    * @see java.sql.Statement#setMaxRows(int)
    */
 
+  @Override
   public void setMaxRows(int max) throws SQLException {
     if (max < 0) {
       throw new SQLException("max must be >= 0");
@@ -600,6 +648,7 @@ public class HiveStatement implements ja
    * @see java.sql.Statement#setPoolable(boolean)
    */
 
+  @Override
   public void setPoolable(boolean poolable) throws SQLException {
     throw new SQLException("Method not supported");
   }
@@ -610,6 +659,7 @@ public class HiveStatement implements ja
    * @see java.sql.Statement#setQueryTimeout(int)
    */
 
+  @Override
   public void setQueryTimeout(int seconds) throws SQLException {
     throw new SQLException("Method not supported");
   }
@@ -620,6 +670,7 @@ public class HiveStatement implements ja
    * @see java.sql.Wrapper#isWrapperFor(java.lang.Class)
    */
 
+  @Override
   public boolean isWrapperFor(Class<?> iface) throws SQLException {
     throw new SQLException("Method not supported");
   }
@@ -630,6 +681,7 @@ public class HiveStatement implements ja
    * @see java.sql.Wrapper#unwrap(java.lang.Class)
    */
 
+  @Override
   public <T> T unwrap(Class<T> iface) throws SQLException {
     throw new SQLException("Method not supported");
   }