You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by th...@apache.org on 2014/02/22 23:18:40 UTC
svn commit: r1570935 - in /hive/trunk:
common/src/java/org/apache/hadoop/hive/conf/ conf/
service/src/java/org/apache/hive/service/cli/
service/src/java/org/apache/hive/service/cli/operation/
service/src/test/org/apache/hive/service/cli/
Author: thejas
Date: Sat Feb 22 22:18:39 2014
New Revision: 1570935
URL: http://svn.apache.org/r1570935
Log:
HIVE-5217 : Add long polling to asynchronous execution in HiveServer2 (Vaibhav Gumashta via Thejas Nair)
Modified:
hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
hive/trunk/conf/hive-default.xml.template
hive/trunk/service/src/java/org/apache/hive/service/cli/CLIService.java
hive/trunk/service/src/java/org/apache/hive/service/cli/operation/ExecuteStatementOperation.java
hive/trunk/service/src/java/org/apache/hive/service/cli/operation/HiveCommandOperation.java
hive/trunk/service/src/java/org/apache/hive/service/cli/operation/MetadataOperation.java
hive/trunk/service/src/java/org/apache/hive/service/cli/operation/Operation.java
hive/trunk/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java
hive/trunk/service/src/test/org/apache/hive/service/cli/CLIServiceTest.java
Modified: hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
URL: http://svn.apache.org/viewvc/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java?rev=1570935&r1=1570934&r2=1570935&view=diff
==============================================================================
--- hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (original)
+++ hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java Sat Feb 22 22:18:39 2014
@@ -816,6 +816,9 @@ public class HiveConf extends Configurat
// will wait for a new task to arrive before terminating
HIVE_SERVER2_ASYNC_EXEC_KEEPALIVE_TIME("hive.server2.async.exec.keepalive.time", 10),
+ // Time in milliseconds that HiveServer2 will wait,
+ // before responding to asynchronous calls that use long polling
+ HIVE_SERVER2_LONG_POLLING_TIMEOUT("hive.server2.long.polling.timeout", 5000L),
// HiveServer2 auth configuration
HIVE_SERVER2_AUTHENTICATION("hive.server2.authentication", "NONE",
Modified: hive/trunk/conf/hive-default.xml.template
URL: http://svn.apache.org/viewvc/hive/trunk/conf/hive-default.xml.template?rev=1570935&r1=1570934&r2=1570935&view=diff
==============================================================================
--- hive/trunk/conf/hive-default.xml.template (original)
+++ hive/trunk/conf/hive-default.xml.template Sat Feb 22 22:18:39 2014
@@ -2119,6 +2119,12 @@
</property>
<property>
+ <name>hive.server2.long.polling.timeout</name>
+ <value>5000</value>
+ <description>Time in milliseconds that HiveServer2 will wait, before responding to asynchronous calls that use long polling</description>
+</property>
+
+<property>
<name>hive.server2.async.exec.wait.queue.size</name>
<value>100</value>
<description>Size of the wait queue for async thread pool in HiveServer2.
Modified: hive/trunk/service/src/java/org/apache/hive/service/cli/CLIService.java
URL: http://svn.apache.org/viewvc/hive/trunk/service/src/java/org/apache/hive/service/cli/CLIService.java?rev=1570935&r1=1570934&r2=1570935&view=diff
==============================================================================
--- hive/trunk/service/src/java/org/apache/hive/service/cli/CLIService.java (original)
+++ hive/trunk/service/src/java/org/apache/hive/service/cli/CLIService.java Sat Feb 22 22:18:39 2014
@@ -21,6 +21,10 @@ package org.apache.hive.service.cli;
import java.io.IOException;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import javax.security.auth.login.LoginException;
@@ -38,6 +42,7 @@ import org.apache.hadoop.hive.shims.Shim
import org.apache.hive.service.CompositeService;
import org.apache.hive.service.ServiceException;
import org.apache.hive.service.auth.HiveAuthFactory;
+import org.apache.hive.service.cli.operation.Operation;
import org.apache.hive.service.cli.session.SessionManager;
import org.apache.hive.service.cli.thrift.TProtocolVersion;
@@ -123,7 +128,7 @@ public class CLIService extends Composit
public SessionHandle openSessionWithImpersonation(TProtocolVersion protocol, String username,
String password, Map<String, String> configuration, String delegationToken)
- throws HiveSQLException {
+ throws HiveSQLException {
SessionHandle sessionHandle = sessionManager.openSession(protocol, username, password, configuration,
true, delegationToken);
LOG.debug(sessionHandle + ": openSession()");
@@ -146,9 +151,9 @@ public class CLIService extends Composit
*/
@Override
public SessionHandle openSessionWithImpersonation(String username, String password, Map<String, String> configuration,
- String delegationToken) throws HiveSQLException {
+ String delegationToken) throws HiveSQLException {
SessionHandle sessionHandle = sessionManager.openSession(SERVER_VERSION, username, password, configuration,
- true, delegationToken);
+ true, delegationToken);
LOG.debug(sessionHandle + ": openSession()");
return sessionHandle;
}
@@ -297,8 +302,33 @@ public class CLIService extends Composit
@Override
public OperationStatus getOperationStatus(OperationHandle opHandle)
throws HiveSQLException {
- OperationStatus opStatus = sessionManager.getOperationManager()
- .getOperationStatus(opHandle);
+ Operation operation = sessionManager.getOperationManager().getOperation(opHandle);
+ /**
+ * If this is a background operation run asynchronously,
+ * we block for a configured duration, before we return
+ * (duration: HIVE_SERVER2_LONG_POLLING_TIMEOUT).
+ * However, if the background operation is complete, we return immediately.
+ */
+ if (operation.shouldRunAsync()) {
+ long timeout = operation.getParentSession().getHiveConf().getLongVar(
+ HiveConf.ConfVars.HIVE_SERVER2_LONG_POLLING_TIMEOUT);
+ try {
+ operation.getBackgroundHandle().get(timeout, TimeUnit.MILLISECONDS);
+ } catch (TimeoutException e) {
+ // No Op, return to the caller since long polling timeout has expired
+ LOG.trace(opHandle + ": Long polling timed out");
+ } catch (CancellationException e) {
+ // The background operation thread was cancelled
+ LOG.trace(opHandle + ": The background operation was cancelled", e);
+ } catch (ExecutionException e) {
+ // The background operation thread was aborted
+ LOG.trace(opHandle + ": The background operation was aborted", e);
+ } catch (InterruptedException e) {
+ // No op, this thread was interrupted
+ // In this case, the call might return sooner than long polling timeout
+ }
+ }
+ OperationStatus opStatus = operation.getStatus();
LOG.debug(opHandle + ": getOperationStatus()");
return opStatus;
}
@@ -310,7 +340,7 @@ public class CLIService extends Composit
public void cancelOperation(OperationHandle opHandle)
throws HiveSQLException {
sessionManager.getOperationManager().getOperation(opHandle)
- .getParentSession().cancelOperation(opHandle);
+ .getParentSession().cancelOperation(opHandle);
LOG.debug(opHandle + ": cancelOperation()");
}
@@ -321,7 +351,7 @@ public class CLIService extends Composit
public void closeOperation(OperationHandle opHandle)
throws HiveSQLException {
sessionManager.getOperationManager().getOperation(opHandle)
- .getParentSession().closeOperation(opHandle);
+ .getParentSession().closeOperation(opHandle);
LOG.debug(opHandle + ": closeOperation");
}
Modified: hive/trunk/service/src/java/org/apache/hive/service/cli/operation/ExecuteStatementOperation.java
URL: http://svn.apache.org/viewvc/hive/trunk/service/src/java/org/apache/hive/service/cli/operation/ExecuteStatementOperation.java?rev=1570935&r1=1570934&r2=1570935&view=diff
==============================================================================
--- hive/trunk/service/src/java/org/apache/hive/service/cli/operation/ExecuteStatementOperation.java (original)
+++ hive/trunk/service/src/java/org/apache/hive/service/cli/operation/ExecuteStatementOperation.java Sat Feb 22 22:18:39 2014
@@ -19,16 +19,12 @@ package org.apache.hive.service.cli.oper
+import java.sql.SQLException;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.Map;
-import java.util.Set;
-import java.sql.SQLException;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.processors.CommandProcessorFactory;
import org.apache.hadoop.hive.ql.processors.CommandProcessor;
-import org.apache.hadoop.hive.ql.processors.HiveCommand;
+import org.apache.hadoop.hive.ql.processors.CommandProcessorFactory;
import org.apache.hive.service.cli.HiveSQLException;
import org.apache.hive.service.cli.OperationType;
import org.apache.hive.service.cli.session.HiveSession;
@@ -37,8 +33,9 @@ public abstract class ExecuteStatementOp
protected String statement = null;
protected Map<String, String> confOverlay = new HashMap<String, String>();
- public ExecuteStatementOperation(HiveSession parentSession, String statement, Map<String, String> confOverlay) {
- super(parentSession, OperationType.EXECUTE_STATEMENT);
+ public ExecuteStatementOperation(HiveSession parentSession, String statement,
+ Map<String, String> confOverlay, boolean runInBackground) {
+ super(parentSession, OperationType.EXECUTE_STATEMENT, runInBackground);
this.statement = statement;
setConfOverlay(confOverlay);
}
@@ -49,7 +46,7 @@ public abstract class ExecuteStatementOp
public static ExecuteStatementOperation newExecuteStatementOperation(
HiveSession parentSession, String statement, Map<String, String> confOverlay, boolean runAsync)
- throws HiveSQLException {
+ throws HiveSQLException {
String[] tokens = statement.trim().split("\\s+");
CommandProcessor processor = null;
try {
Modified: hive/trunk/service/src/java/org/apache/hive/service/cli/operation/HiveCommandOperation.java
URL: http://svn.apache.org/viewvc/hive/trunk/service/src/java/org/apache/hive/service/cli/operation/HiveCommandOperation.java?rev=1570935&r1=1570934&r2=1570935&view=diff
==============================================================================
--- hive/trunk/service/src/java/org/apache/hive/service/cli/operation/HiveCommandOperation.java (original)
+++ hive/trunk/service/src/java/org/apache/hive/service/cli/operation/HiveCommandOperation.java Sat Feb 22 22:18:39 2014
@@ -60,7 +60,7 @@ public class HiveCommandOperation extend
protected HiveCommandOperation(HiveSession parentSession, String statement,
CommandProcessor commandProcessor, Map<String, String> confOverlay) {
- super(parentSession, statement, confOverlay);
+ super(parentSession, statement, confOverlay, false);
this.commandProcessor = commandProcessor;
setupSessionIO(parentSession.getSessionState());
}
Modified: hive/trunk/service/src/java/org/apache/hive/service/cli/operation/MetadataOperation.java
URL: http://svn.apache.org/viewvc/hive/trunk/service/src/java/org/apache/hive/service/cli/operation/MetadataOperation.java?rev=1570935&r1=1570934&r2=1570935&view=diff
==============================================================================
--- hive/trunk/service/src/java/org/apache/hive/service/cli/operation/MetadataOperation.java (original)
+++ hive/trunk/service/src/java/org/apache/hive/service/cli/operation/MetadataOperation.java Sat Feb 22 22:18:39 2014
@@ -35,7 +35,7 @@ public abstract class MetadataOperation
private static final char SEARCH_STRING_ESCAPE = '\\';
protected MetadataOperation(HiveSession parentSession, OperationType opType) {
- super(parentSession, opType);
+ super(parentSession, opType, false);
setHasResultSet(true);
}
@@ -85,15 +85,15 @@ public abstract class MetadataOperation
* format '.*' This is driven by the datanucleusFormat flag.
*/
private String convertPattern(final String pattern, boolean datanucleusFormat) {
- String wStr;
- if (datanucleusFormat) {
- wStr = "*";
- } else {
- wStr = ".*";
- }
- return pattern
- .replaceAll("([^\\\\])%", "$1" + wStr).replaceAll("\\\\%", "%").replaceAll("^%", wStr)
- .replaceAll("([^\\\\])_", "$1.").replaceAll("\\\\_", "_").replaceAll("^_", ".");
+ String wStr;
+ if (datanucleusFormat) {
+ wStr = "*";
+ } else {
+ wStr = ".*";
+ }
+ return pattern
+ .replaceAll("([^\\\\])%", "$1" + wStr).replaceAll("\\\\%", "%").replaceAll("^%", wStr)
+ .replaceAll("([^\\\\])_", "$1.").replaceAll("\\\\_", "_").replaceAll("^_", ".");
}
}
Modified: hive/trunk/service/src/java/org/apache/hive/service/cli/operation/Operation.java
URL: http://svn.apache.org/viewvc/hive/trunk/service/src/java/org/apache/hive/service/cli/operation/Operation.java?rev=1570935&r1=1570934&r2=1570935&view=diff
==============================================================================
--- hive/trunk/service/src/java/org/apache/hive/service/cli/operation/Operation.java (original)
+++ hive/trunk/service/src/java/org/apache/hive/service/cli/operation/Operation.java Sat Feb 22 22:18:39 2014
@@ -18,6 +18,7 @@
package org.apache.hive.service.cli.operation;
import java.util.EnumSet;
+import java.util.concurrent.Future;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -43,16 +44,31 @@ public abstract class Operation {
public static final long DEFAULT_FETCH_MAX_ROWS = 100;
protected boolean hasResultSet;
protected volatile HiveSQLException operationException;
+ protected final boolean runAsync;
+ protected volatile Future<?> backgroundHandle;
protected static final EnumSet<FetchOrientation> DEFAULT_FETCH_ORIENTATION_SET =
EnumSet.of(FetchOrientation.FETCH_NEXT,FetchOrientation.FETCH_FIRST);
-
- protected Operation(HiveSession parentSession, OperationType opType) {
+
+ protected Operation(HiveSession parentSession, OperationType opType, boolean runInBackground) {
super();
this.parentSession = parentSession;
+ this.runAsync = runInBackground;
this.opHandle = new OperationHandle(opType, parentSession.getProtocolVersion());
}
+ public Future<?> getBackgroundHandle() {
+ return backgroundHandle;
+ }
+
+ protected void setBackgroundHandle(Future<?> backgroundHandle) {
+ this.backgroundHandle = backgroundHandle;
+ }
+
+ public boolean shouldRunAsync() {
+ return runAsync;
+ }
+
public void setConfiguration(HiveConf configuration) {
this.configuration = new HiveConf(configuration);
}
@@ -160,7 +176,7 @@ public abstract class Operation {
EnumSet<FetchOrientation> supportedOrientations) throws HiveSQLException {
if (!supportedOrientations.contains(orientation)) {
throw new HiveSQLException("The fetch type " + orientation.toString() +
- " is not supported for this resultset", "HY106");
+ " is not supported for this resultset", "HY106");
}
}
}
Modified: hive/trunk/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java
URL: http://svn.apache.org/viewvc/hive/trunk/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java?rev=1570935&r1=1570934&r2=1570935&view=diff
==============================================================================
--- hive/trunk/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java (original)
+++ hive/trunk/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java Sat Feb 22 22:18:39 2014
@@ -66,15 +66,12 @@ public class SQLOperation extends Execut
private TableSchema resultSchema = null;
private Schema mResultSchema = null;
private SerDe serde = null;
- private final boolean runAsync;
- private volatile Future<?> backgroundHandle;
private boolean fetchStarted = false;
public SQLOperation(HiveSession parentSession, String statement, Map<String,
String> confOverlay, boolean runInBackground) {
// TODO: call setRemoteUser in ExecuteStatementOperation or higher.
- super(parentSession, statement, confOverlay);
- this.runAsync = runInBackground;
+ super(parentSession, statement, confOverlay, runInBackground);
}
/***
@@ -159,7 +156,7 @@ public class SQLOperation extends Execut
public void run() throws HiveSQLException {
setState(OperationState.PENDING);
prepare(getConfigForOperation());
- if (!runAsync) {
+ if (!shouldRunAsync()) {
runInternal(getConfigForOperation());
} else {
Runnable backgroundOperation = new Runnable() {
@@ -177,8 +174,9 @@ public class SQLOperation extends Execut
};
try {
// This submit blocks if no background threads are available to run this operation
- backgroundHandle =
+ Future<?> backgroundHandle =
getParentSession().getSessionManager().submitBackgroundOperation(backgroundOperation);
+ setBackgroundHandle(backgroundHandle);
} catch (RejectedExecutionException rejected) {
setState(OperationState.ERROR);
throw new HiveSQLException("All the asynchronous threads are currently busy, " +
@@ -189,7 +187,8 @@ public class SQLOperation extends Execut
private void cleanup(OperationState state) throws HiveSQLException {
setState(state);
- if (runAsync) {
+ if (shouldRunAsync()) {
+ Future<?> backgroundHandle = getBackgroundHandle();
if (backgroundHandle != null) {
backgroundHandle.cancel(true);
}
@@ -349,7 +348,7 @@ public class SQLOperation extends Execut
*/
private HiveConf getConfigForOperation() throws HiveSQLException {
HiveConf sqlOperationConf = getParentSession().getHiveConf();
- if (!getConfOverlay().isEmpty() || runAsync) {
+ if (!getConfOverlay().isEmpty() || shouldRunAsync()) {
// clone the parent session config for this query
sqlOperationConf = new HiveConf(sqlOperationConf);
@@ -364,5 +363,4 @@ public class SQLOperation extends Execut
}
return sqlOperationConf;
}
-
}
Modified: hive/trunk/service/src/test/org/apache/hive/service/cli/CLIServiceTest.java
URL: http://svn.apache.org/viewvc/hive/trunk/service/src/test/org/apache/hive/service/cli/CLIServiceTest.java?rev=1570935&r1=1570934&r2=1570935&view=diff
==============================================================================
--- hive/trunk/service/src/test/org/apache/hive/service/cli/CLIServiceTest.java (original)
+++ hive/trunk/service/src/test/org/apache/hive/service/cli/CLIServiceTest.java Sat Feb 22 22:18:39 2014
@@ -22,14 +22,13 @@ import static org.junit.Assert.assertEqu
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
-import static org.junit.Assert.assertTrue;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -57,7 +56,7 @@ public abstract class CLIServiceTest {
}
@Test
- public void openSessionTest() throws Exception {
+ public void testOpenSession() throws Exception {
SessionHandle sessionHandle = client.openSession(
"tom", "password", Collections.<String, String>emptyMap());
assertNotNull(sessionHandle);
@@ -69,7 +68,7 @@ public abstract class CLIServiceTest {
}
@Test
- public void getFunctionsTest() throws Exception {
+ public void testGetFunctions() throws Exception {
SessionHandle sessionHandle = client.openSession("tom", "password");
assertNotNull(sessionHandle);
@@ -106,7 +105,7 @@ public abstract class CLIServiceTest {
}
@Test
- public void getInfoTest() throws Exception {
+ public void testGetInfo() throws Exception {
SessionHandle sessionHandle = client.openSession(
"tom", "password", Collections.<String, String>emptyMap());
assertNotNull(sessionHandle);
@@ -123,6 +122,10 @@ public abstract class CLIServiceTest {
client.closeSession(sessionHandle);
}
+ /**
+ * Test the blocking execution of a query
+ * @throws Exception
+ */
@Test
public void testExecuteStatement() throws Exception {
HashMap<String, String> confOverlay = new HashMap<String, String>();
@@ -161,113 +164,171 @@ public abstract class CLIServiceTest {
client.closeSession(sessionHandle);
}
+ /**
+ * Test async execution of a well-formed and a malformed query with different long polling durations
+ * - Test malformed query with default long polling timeout
+ * - Test well-formed query with default long polling timeout
+ * - Test well-formed query with long polling timeout set to 0
+ * - Test well-formed query with long polling timeout set to 500 millis
+ * - Test well-formed query cancellation
+ * @throws Exception
+ */
@Test
public void testExecuteStatementAsync() throws Exception {
- HashMap<String, String> confOverlay = new HashMap<String, String>();
- SessionHandle sessionHandle = client.openSession("tom", "password",
- new HashMap<String, String>());
- // Timeout for the poll in case of asynchronous execute
- long pollTimeout = System.currentTimeMillis() + 100000;
+ Map<String, String> confOverlay = new HashMap<String, String>();
+ String tableName = "TEST_EXEC_ASYNC";
+ String columnDefinitions = "(ID STRING)";
+ String queryString;
+
+ // Open a session and set up the test data
+ SessionHandle sessionHandle = setupTestData(tableName, columnDefinitions, confOverlay);
assertNotNull(sessionHandle);
+
OperationState state = null;
OperationHandle opHandle;
OperationStatus opStatus = null;
// Change lock manager, otherwise unit-test doesn't go through
- String queryString = "SET " + HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname
+ queryString = "SET " + HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname
+ " = false";
opHandle = client.executeStatement(sessionHandle, queryString, confOverlay);
client.closeOperation(opHandle);
- // Drop the table if it exists
- queryString = "DROP TABLE IF EXISTS TEST_EXEC_ASYNC";
- opHandle = client.executeStatement(sessionHandle, queryString, confOverlay);
- client.closeOperation(opHandle);
+ // Set longPollingTimeout to a custom value for different test cases
+ long longPollingTimeout;
- // Create a test table
- queryString = "CREATE TABLE TEST_EXEC_ASYNC(ID STRING)";
- opHandle = client.executeStatement(sessionHandle, queryString, confOverlay);
- client.closeOperation(opHandle);
-
- // Test async execution response when query is malformed
- // Compile time error
- // This query will error out during compilation (which is done synchronous as of now)
- String wrongQueryString = "SELECT NON_EXISTANT_COLUMN FROM TEST_EXEC_ASYNC";
+ /**
+ * Execute a malformed async query with default config,
+ * to give a compile time error.
+ * (compilation is done synchronously as of now)
+ */
+ longPollingTimeout = new HiveConf().getLongVar(ConfVars.HIVE_SERVER2_LONG_POLLING_TIMEOUT);
+ queryString = "SELECT NON_EXISTING_COLUMN FROM " + tableName;
try {
- opHandle = client.executeStatementAsync(sessionHandle, wrongQueryString, confOverlay);
- fail("Async syntax excution should fail");
- } catch (HiveSQLException e) {
+ runQueryAsync(sessionHandle, queryString, confOverlay, OperationState.ERROR, longPollingTimeout);
+ }
+ catch (HiveSQLException e) {
// expected error
}
-
-
- // Runtime error
- wrongQueryString = "CREATE TABLE NON_EXISTING_TAB (ID STRING) location 'hdfs://fooNN:10000/a/b/c'";
- opHandle = client.executeStatementAsync(sessionHandle, wrongQueryString, confOverlay);
-
- int count = 0;
- while (true) {
- // Break if polling times out
- if (System.currentTimeMillis() > pollTimeout) {
- System.out.println("Polling timed out");
- break;
- }
- opStatus = client.getOperationStatus(opHandle);
- state = opStatus.getState();
- System.out.println("Polling: " + opHandle + " count=" + (++count)
- + " state=" + state);
- if (state == OperationState.CANCELED || state == OperationState.CLOSED
- || state == OperationState.FINISHED || state == OperationState.ERROR) {
- break;
- }
- Thread.sleep(1000);
- }
- assertEquals("Operation should be in error state", OperationState.ERROR, state);
+ /**
+ * Execute a malformed async query with default config,
+ * to give a runtime error.
+ * Also check that the sqlState and errorCode should be set
+ */
+ queryString = "CREATE TABLE NON_EXISTING_TAB (ID STRING) location 'hdfs://localhost:10000/a/b/c'";
+ opStatus = runQueryAsync(sessionHandle, queryString, confOverlay, OperationState.ERROR, longPollingTimeout);
// sqlState, errorCode should be set
assertEquals(opStatus.getOperationException().getSQLState(), "08S01");
assertEquals(opStatus.getOperationException().getErrorCode(), 1);
- client.closeOperation(opHandle);
-
- // Test async execution when query is well formed
- queryString = "SELECT ID FROM TEST_EXEC_ASYNC";
+ /**
+ * Execute an async query with default config
+ */
+ queryString = "SELECT ID FROM " + tableName;
+ runQueryAsync(sessionHandle, queryString, confOverlay, OperationState.FINISHED, longPollingTimeout);
+
+ /**
+ * Execute an async query with long polling timeout set to 0
+ */
+ longPollingTimeout = 0;
+ queryString = "SELECT ID FROM " + tableName;
+ runQueryAsync(sessionHandle, queryString, confOverlay, OperationState.FINISHED, longPollingTimeout);
+
+ /**
+ * Execute an async query with long polling timeout set to 500 millis
+ */
+ longPollingTimeout = 500;
+ queryString = "SELECT ID FROM " + tableName;
+ runQueryAsync(sessionHandle, queryString, confOverlay, OperationState.FINISHED, longPollingTimeout);
+
+ /**
+ * Cancellation test
+ */
+ queryString = "SELECT ID FROM " + tableName;
opHandle = client.executeStatementAsync(sessionHandle, queryString, confOverlay);
- assertTrue(opHandle.hasResultSet());
-
- count = 0;
+ System.out.println("Cancelling " + opHandle);
+ client.cancelOperation(opHandle);
+ state = client.getOperationStatus(opHandle).getState();
+ System.out.println(opHandle + " after cancelling, state= " + state);
+ assertEquals("Query should be cancelled", OperationState.CANCELED, state);
+
+ // Cleanup
+ queryString = "DROP TABLE " + tableName;
+ client.executeStatement(sessionHandle, queryString, confOverlay);
+ client.closeSession(sessionHandle);
+ }
+
+ /**
+ * Sets up a test specific table with the given column definitions and config
+ * @param tableName
+ * @param columnDefinitions
+ * @param confOverlay
+ * @throws Exception
+ */
+ private SessionHandle setupTestData(String tableName, String columnDefinitions,
+ Map<String, String> confOverlay) throws Exception {
+ SessionHandle sessionHandle = client.openSession("tom", "password", confOverlay);
+ assertNotNull(sessionHandle);
+
+ String queryString = "SET " + HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname
+ + " = false";
+ client.executeStatement(sessionHandle, queryString, confOverlay);
+
+ // Drop the table if it exists
+ queryString = "DROP TABLE IF EXISTS " + tableName;
+ client.executeStatement(sessionHandle, queryString, confOverlay);
+
+ // Create a test table
+ queryString = "CREATE TABLE " + tableName + columnDefinitions;
+ client.executeStatement(sessionHandle, queryString, confOverlay);
+
+ return sessionHandle;
+ }
+
+ private OperationStatus runQueryAsync(SessionHandle sessionHandle, String queryString,
+ Map<String, String> confOverlay, OperationState expectedState,
+ long longPollingTimeout) throws HiveSQLException {
+ // Timeout for the iteration in case of asynchronous execute
+ long testIterationTimeout = System.currentTimeMillis() + 100000;
+ long longPollingStart;
+ long longPollingEnd;
+ long longPollingTimeDelta;
+ OperationStatus opStatus = null;
+ OperationState state = null;
+ confOverlay.put(HiveConf.ConfVars.HIVE_SERVER2_LONG_POLLING_TIMEOUT.varname, String.valueOf(longPollingTimeout));
+ OperationHandle opHandle = client.executeStatementAsync(sessionHandle, queryString, confOverlay);
+ int count = 0;
while (true) {
- // Break if polling times out
- if (System.currentTimeMillis() > pollTimeout) {
+ // Break if iteration times out
+ if (System.currentTimeMillis() > testIterationTimeout) {
System.out.println("Polling timed out");
break;
}
+ longPollingStart = System.currentTimeMillis();
+ System.out.println("Long polling starts at: " + longPollingStart);
opStatus = client.getOperationStatus(opHandle);
state = opStatus.getState();
+ longPollingEnd = System.currentTimeMillis();
+ System.out.println("Long polling ends at: " + longPollingEnd);
+
System.out.println("Polling: " + opHandle + " count=" + (++count)
+ " state=" + state);
- if (state == OperationState.CANCELED || state == OperationState.CLOSED
- || state == OperationState.FINISHED || state == OperationState.ERROR) {
+ if (state == OperationState.CANCELED ||
+ state == OperationState.CLOSED ||
+ state == OperationState.FINISHED ||
+ state == OperationState.ERROR) {
break;
+ } else {
+ // Verify that getOperationStatus returned only after the long polling timeout
+ longPollingTimeDelta = longPollingEnd - longPollingStart;
+ // Scale down by a factor of 0.9 to account for approximate values
+ assertTrue(longPollingTimeDelta - 0.9*longPollingTimeout > 0);
}
- Thread.sleep(1000);
}
- assertEquals("Query should be finished", OperationState.FINISHED, state);
- client.closeOperation(opHandle);
-
- // Cancellation test
- opHandle = client.executeStatementAsync(sessionHandle, queryString, confOverlay);
- System.out.println("cancelling " + opHandle);
- client.cancelOperation(opHandle);
- state = client.getOperationStatus(opHandle).getState();
- System.out.println(opHandle + " after cancelling, state= " + state);
- assertEquals("Query should be cancelled", OperationState.CANCELED, state);
-
- // Cleanup
- queryString = "DROP TABLE IF EXISTS TEST_EXEC_ASYNC";
- opHandle = client.executeStatement(sessionHandle, queryString, confOverlay);
+ assertEquals(expectedState, client.getOperationStatus(opHandle).getState());
client.closeOperation(opHandle);
- client.closeSession(sessionHandle);
+ return opStatus;
}
/**