You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by th...@apache.org on 2014/03/03 11:29:55 UTC
svn commit: r1573516 -
/hive/trunk/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java
Author: thejas
Date: Mon Mar 3 10:29:54 2014
New Revision: 1573516
URL: http://svn.apache.org/r1573516
Log:
HIVE-5232 : Make JDBC use the new HiveServer2 async execution API by default (Thejas Nair via Vaibhav Gumashta)
Modified:
hive/trunk/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java
Modified: hive/trunk/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java
URL: http://svn.apache.org/viewvc/hive/trunk/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java?rev=1573516&r1=1573515&r2=1573516&view=diff
==============================================================================
--- hive/trunk/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java (original)
+++ hive/trunk/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java Mon Mar 3 10:29:54 2014
@@ -84,7 +84,7 @@ public class HiveStatement implements ja
}
public HiveStatement(HiveConnection connection, TCLIService.Iface client,
- TSessionHandle sessHandle, boolean isScrollableResultset) {
+ TSessionHandle sessHandle, boolean isScrollableResultset) {
this.connection = connection;
this.client = client;
this.sessHandle = sessHandle;
@@ -97,6 +97,7 @@ public class HiveStatement implements ja
* @see java.sql.Statement#addBatch(java.lang.String)
*/
+ @Override
public void addBatch(String sql) throws SQLException {
throw new SQLException("Method not supported");
}
@@ -107,6 +108,7 @@ public class HiveStatement implements ja
* @see java.sql.Statement#cancel()
*/
+ @Override
public void cancel() throws SQLException {
if (isClosed) {
throw new SQLException("Can't cancel after statement has been closed");
@@ -134,6 +136,7 @@ public class HiveStatement implements ja
* @see java.sql.Statement#clearBatch()
*/
+ @Override
public void clearBatch() throws SQLException {
throw new SQLException("Method not supported");
}
@@ -144,6 +147,7 @@ public class HiveStatement implements ja
* @see java.sql.Statement#clearWarnings()
*/
+ @Override
public void clearWarnings() throws SQLException {
warningChain = null;
}
@@ -169,6 +173,7 @@ public class HiveStatement implements ja
*
* @see java.sql.Statement#close()
*/
+ @Override
public void close() throws SQLException {
if (isClosed) {
return;
@@ -192,6 +197,7 @@ public class HiveStatement implements ja
* @see java.sql.Statement#execute(java.lang.String)
*/
+ @Override
public boolean execute(String sql) throws SQLException {
if (isClosed) {
throw new SQLException("Can't execute after statement has been closed");
@@ -203,6 +209,13 @@ public class HiveStatement implements ja
}
TExecuteStatementReq execReq = new TExecuteStatementReq(sessHandle, sql);
+ /**
+ * Run asynchronously whenever possible.
+ * Currently only a SQLOperation can be run asynchronously,
+ * in a background operation thread.
+ * Compilation is synchronous; execution is asynchronous.
+ */
+ execReq.setRunAsync(true);
execReq.setConfOverlay(sessConf);
TExecuteStatementResp execResp = client.ExecuteStatement(execReq);
Utils.verifySuccessWithInfo(execResp.getStatus());
@@ -213,46 +226,49 @@ public class HiveStatement implements ja
throw new SQLException(ex.toString(), "08S01", ex);
}
- if (!stmtHandle.isHasResultSet()) {
- // Poll until the query has completed one way or another. DML queries will not return a result
- // set, but we should not return from this method until the query has completed to avoid
- // racing with possible subsequent session shutdown, or queries that depend on the results
- // materialised here.
- TGetOperationStatusReq statusReq = new TGetOperationStatusReq(stmtHandle);
- boolean requestComplete = false;
- while (!requestComplete) {
- try {
- TGetOperationStatusResp statusResp = client.GetOperationStatus(statusReq);
- Utils.verifySuccessWithInfo(statusResp.getStatus());
- if (statusResp.isSetOperationState()) {
- switch (statusResp.getOperationState()) {
- case CLOSED_STATE:
- case FINISHED_STATE:
- return false;
- case CANCELED_STATE:
- // 01000 -> warning
- throw new SQLException("Query was cancelled", "01000");
- case ERROR_STATE:
- // HY000 -> general error
- throw new SQLException("Query failed", "HY000");
- case UKNOWN_STATE:
- throw new SQLException("Unknown query", "HY000");
- case INITIALIZED_STATE:
- case PENDING_STATE:
- case RUNNING_STATE:
- break;
- }
+ TGetOperationStatusReq statusReq = new TGetOperationStatusReq(stmtHandle);
+ boolean operationComplete = false;
+ TGetOperationStatusResp statusResp;
+
+ // Poll the operation status until the operation is complete
+ while (!operationComplete) {
+ try {
+ /**
+ * For an async SQLOperation, GetOperationStatus will use the long polling approach.
+ * It will essentially return after HIVE_SERVER2_LONG_POLLING_TIMEOUT (a server config) expires.
+ */
+ statusResp = client.GetOperationStatus(statusReq);
+ Utils.verifySuccessWithInfo(statusResp.getStatus());
+ if (statusResp.isSetOperationState()) {
+ switch (statusResp.getOperationState()) {
+ case CLOSED_STATE:
+ case FINISHED_STATE:
+ operationComplete = true;
+ break;
+ case CANCELED_STATE:
+ // 01000 -> warning
+ throw new SQLException("Query was cancelled", "01000");
+ case ERROR_STATE:
+ // Get the error details from the underlying exception
+ throw new SQLException(statusResp.getErrorMessage(),
+ statusResp.getSqlState(), statusResp.getErrorCode());
+ case UKNOWN_STATE:
+ throw new SQLException("Unknown query", "HY000");
+ case INITIALIZED_STATE:
+ case PENDING_STATE:
+ case RUNNING_STATE:
+ break;
}
- } catch (Exception ex) {
- throw new SQLException(ex.toString(), "08S01", ex);
- }
-
- try {
- Thread.sleep(100);
- } catch (InterruptedException ex) {
- // Ignore
}
+ } catch (SQLException e) {
+ throw e;
+ } catch (Exception e) {
+ throw new SQLException(e.toString(), "08S01", e);
}
+ }
+
+ // The query should be completed by now
+ if (!stmtHandle.isHasResultSet()) {
return false;
}
resultSet = new HiveQueryResultSet.Builder(this).setClient(client).setSessionHandle(sessHandle)
@@ -268,6 +284,7 @@ public class HiveStatement implements ja
* @see java.sql.Statement#execute(java.lang.String, int)
*/
+ @Override
public boolean execute(String sql, int autoGeneratedKeys) throws SQLException {
throw new SQLException("Method not supported");
}
@@ -278,6 +295,7 @@ public class HiveStatement implements ja
* @see java.sql.Statement#execute(java.lang.String, int[])
*/
+ @Override
public boolean execute(String sql, int[] columnIndexes) throws SQLException {
throw new SQLException("Method not supported");
}
@@ -288,6 +306,7 @@ public class HiveStatement implements ja
* @see java.sql.Statement#execute(java.lang.String, java.lang.String[])
*/
+ @Override
public boolean execute(String sql, String[] columnNames) throws SQLException {
throw new SQLException("Method not supported");
}
@@ -298,6 +317,7 @@ public class HiveStatement implements ja
* @see java.sql.Statement#executeBatch()
*/
+ @Override
public int[] executeBatch() throws SQLException {
throw new SQLException("Method not supported");
}
@@ -308,6 +328,7 @@ public class HiveStatement implements ja
* @see java.sql.Statement#executeQuery(java.lang.String)
*/
+ @Override
public ResultSet executeQuery(String sql) throws SQLException {
if (!execute(sql)) {
throw new SQLException("The query did not generate a result set!");
@@ -321,6 +342,7 @@ public class HiveStatement implements ja
* @see java.sql.Statement#executeUpdate(java.lang.String)
*/
+ @Override
public int executeUpdate(String sql) throws SQLException {
execute(sql);
return 0;
@@ -332,6 +354,7 @@ public class HiveStatement implements ja
* @see java.sql.Statement#executeUpdate(java.lang.String, int)
*/
+ @Override
public int executeUpdate(String sql, int autoGeneratedKeys) throws SQLException {
throw new SQLException("Method not supported");
}
@@ -342,6 +365,7 @@ public class HiveStatement implements ja
* @see java.sql.Statement#executeUpdate(java.lang.String, int[])
*/
+ @Override
public int executeUpdate(String sql, int[] columnIndexes) throws SQLException {
throw new SQLException("Method not supported");
}
@@ -352,6 +376,7 @@ public class HiveStatement implements ja
* @see java.sql.Statement#executeUpdate(java.lang.String, java.lang.String[])
*/
+ @Override
public int executeUpdate(String sql, String[] columnNames) throws SQLException {
throw new SQLException("Method not supported");
}
@@ -362,6 +387,7 @@ public class HiveStatement implements ja
* @see java.sql.Statement#getConnection()
*/
+ @Override
public Connection getConnection() throws SQLException {
return this.connection;
}
@@ -372,6 +398,7 @@ public class HiveStatement implements ja
* @see java.sql.Statement#getFetchDirection()
*/
+ @Override
public int getFetchDirection() throws SQLException {
throw new SQLException("Method not supported");
}
@@ -382,6 +409,7 @@ public class HiveStatement implements ja
* @see java.sql.Statement#getFetchSize()
*/
+ @Override
public int getFetchSize() throws SQLException {
return fetchSize;
}
@@ -392,6 +420,7 @@ public class HiveStatement implements ja
* @see java.sql.Statement#getGeneratedKeys()
*/
+ @Override
public ResultSet getGeneratedKeys() throws SQLException {
throw new SQLException("Method not supported");
}
@@ -402,6 +431,7 @@ public class HiveStatement implements ja
* @see java.sql.Statement#getMaxFieldSize()
*/
+ @Override
public int getMaxFieldSize() throws SQLException {
throw new SQLException("Method not supported");
}
@@ -412,6 +442,7 @@ public class HiveStatement implements ja
* @see java.sql.Statement#getMaxRows()
*/
+ @Override
public int getMaxRows() throws SQLException {
return maxRows;
}
@@ -422,6 +453,7 @@ public class HiveStatement implements ja
* @see java.sql.Statement#getMoreResults()
*/
+ @Override
public boolean getMoreResults() throws SQLException {
throw new SQLException("Method not supported");
}
@@ -432,6 +464,7 @@ public class HiveStatement implements ja
* @see java.sql.Statement#getMoreResults(int)
*/
+ @Override
public boolean getMoreResults(int current) throws SQLException {
throw new SQLException("Method not supported");
}
@@ -442,6 +475,7 @@ public class HiveStatement implements ja
* @see java.sql.Statement#getQueryTimeout()
*/
+ @Override
public int getQueryTimeout() throws SQLException {
throw new SQLException("Method not supported");
}
@@ -452,6 +486,7 @@ public class HiveStatement implements ja
* @see java.sql.Statement#getResultSet()
*/
+ @Override
public ResultSet getResultSet() throws SQLException {
return resultSet;
}
@@ -462,6 +497,7 @@ public class HiveStatement implements ja
* @see java.sql.Statement#getResultSetConcurrency()
*/
+ @Override
public int getResultSetConcurrency() throws SQLException {
throw new SQLException("Method not supported");
}
@@ -472,6 +508,7 @@ public class HiveStatement implements ja
* @see java.sql.Statement#getResultSetHoldability()
*/
+ @Override
public int getResultSetHoldability() throws SQLException {
throw new SQLException("Method not supported");
}
@@ -482,6 +519,7 @@ public class HiveStatement implements ja
* @see java.sql.Statement#getResultSetType()
*/
+ @Override
public int getResultSetType() throws SQLException {
throw new SQLException("Method not supported");
}
@@ -492,6 +530,7 @@ public class HiveStatement implements ja
* @see java.sql.Statement#getUpdateCount()
*/
+ @Override
public int getUpdateCount() throws SQLException {
return 0;
}
@@ -502,6 +541,7 @@ public class HiveStatement implements ja
* @see java.sql.Statement#getWarnings()
*/
+ @Override
public SQLWarning getWarnings() throws SQLException {
return warningChain;
}
@@ -512,6 +552,7 @@ public class HiveStatement implements ja
* @see java.sql.Statement#isClosed()
*/
+ @Override
public boolean isClosed() throws SQLException {
return isClosed;
}
@@ -527,6 +568,7 @@ public class HiveStatement implements ja
* @see java.sql.Statement#isPoolable()
*/
+ @Override
public boolean isPoolable() throws SQLException {
throw new SQLException("Method not supported");
}
@@ -537,6 +579,7 @@ public class HiveStatement implements ja
* @see java.sql.Statement#setCursorName(java.lang.String)
*/
+ @Override
public void setCursorName(String name) throws SQLException {
throw new SQLException("Method not supported");
}
@@ -547,6 +590,7 @@ public class HiveStatement implements ja
* @see java.sql.Statement#setEscapeProcessing(boolean)
*/
+ @Override
public void setEscapeProcessing(boolean enable) throws SQLException {
throw new SQLException("Method not supported");
}
@@ -557,6 +601,7 @@ public class HiveStatement implements ja
* @see java.sql.Statement#setFetchDirection(int)
*/
+ @Override
public void setFetchDirection(int direction) throws SQLException {
throw new SQLException("Method not supported");
}
@@ -567,6 +612,7 @@ public class HiveStatement implements ja
* @see java.sql.Statement#setFetchSize(int)
*/
+ @Override
public void setFetchSize(int rows) throws SQLException {
fetchSize = rows;
}
@@ -577,6 +623,7 @@ public class HiveStatement implements ja
* @see java.sql.Statement#setMaxFieldSize(int)
*/
+ @Override
public void setMaxFieldSize(int max) throws SQLException {
throw new SQLException("Method not supported");
}
@@ -587,6 +634,7 @@ public class HiveStatement implements ja
* @see java.sql.Statement#setMaxRows(int)
*/
+ @Override
public void setMaxRows(int max) throws SQLException {
if (max < 0) {
throw new SQLException("max must be >= 0");
@@ -600,6 +648,7 @@ public class HiveStatement implements ja
* @see java.sql.Statement#setPoolable(boolean)
*/
+ @Override
public void setPoolable(boolean poolable) throws SQLException {
throw new SQLException("Method not supported");
}
@@ -610,6 +659,7 @@ public class HiveStatement implements ja
* @see java.sql.Statement#setQueryTimeout(int)
*/
+ @Override
public void setQueryTimeout(int seconds) throws SQLException {
throw new SQLException("Method not supported");
}
@@ -620,6 +670,7 @@ public class HiveStatement implements ja
* @see java.sql.Wrapper#isWrapperFor(java.lang.Class)
*/
+ @Override
public boolean isWrapperFor(Class<?> iface) throws SQLException {
throw new SQLException("Method not supported");
}
@@ -630,6 +681,7 @@ public class HiveStatement implements ja
* @see java.sql.Wrapper#unwrap(java.lang.Class)
*/
+ @Override
public <T> T unwrap(Class<T> iface) throws SQLException {
throw new SQLException("Method not supported");
}