You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by th...@apache.org on 2014/02/22 23:18:40 UTC

svn commit: r1570935 - in /hive/trunk: common/src/java/org/apache/hadoop/hive/conf/ conf/ service/src/java/org/apache/hive/service/cli/ service/src/java/org/apache/hive/service/cli/operation/ service/src/test/org/apache/hive/service/cli/

Author: thejas
Date: Sat Feb 22 22:18:39 2014
New Revision: 1570935

URL: http://svn.apache.org/r1570935
Log:
HIVE-5217 : Add long polling to asynchronous execution in HiveServer2 (Vaibhav Gumashta via Thejas Nair)

Modified:
    hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
    hive/trunk/conf/hive-default.xml.template
    hive/trunk/service/src/java/org/apache/hive/service/cli/CLIService.java
    hive/trunk/service/src/java/org/apache/hive/service/cli/operation/ExecuteStatementOperation.java
    hive/trunk/service/src/java/org/apache/hive/service/cli/operation/HiveCommandOperation.java
    hive/trunk/service/src/java/org/apache/hive/service/cli/operation/MetadataOperation.java
    hive/trunk/service/src/java/org/apache/hive/service/cli/operation/Operation.java
    hive/trunk/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java
    hive/trunk/service/src/test/org/apache/hive/service/cli/CLIServiceTest.java

Modified: hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
URL: http://svn.apache.org/viewvc/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java?rev=1570935&r1=1570934&r2=1570935&view=diff
==============================================================================
--- hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (original)
+++ hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java Sat Feb 22 22:18:39 2014
@@ -816,6 +816,9 @@ public class HiveConf extends Configurat
     // will wait for a new task to arrive before terminating
     HIVE_SERVER2_ASYNC_EXEC_KEEPALIVE_TIME("hive.server2.async.exec.keepalive.time", 10),
 
+    // Time in milliseconds that HiveServer2 will wait,
+    // before responding to asynchronous calls that use long polling
+    HIVE_SERVER2_LONG_POLLING_TIMEOUT("hive.server2.long.polling.timeout", 5000L),
 
     // HiveServer2 auth configuration
     HIVE_SERVER2_AUTHENTICATION("hive.server2.authentication", "NONE",

Modified: hive/trunk/conf/hive-default.xml.template
URL: http://svn.apache.org/viewvc/hive/trunk/conf/hive-default.xml.template?rev=1570935&r1=1570934&r2=1570935&view=diff
==============================================================================
--- hive/trunk/conf/hive-default.xml.template (original)
+++ hive/trunk/conf/hive-default.xml.template Sat Feb 22 22:18:39 2014
@@ -2119,6 +2119,12 @@
 </property>
 
 <property>
+  <name>hive.server2.long.polling.timeout</name>
+  <value>5000</value>
+  <description>Time in milliseconds that HiveServer2 will wait, before responding to asynchronous calls that use long polling</description>
+</property>
+
+<property>
   <name>hive.server2.async.exec.wait.queue.size</name>
   <value>100</value>
   <description>Size of the wait queue for async thread pool in HiveServer2.

Modified: hive/trunk/service/src/java/org/apache/hive/service/cli/CLIService.java
URL: http://svn.apache.org/viewvc/hive/trunk/service/src/java/org/apache/hive/service/cli/CLIService.java?rev=1570935&r1=1570934&r2=1570935&view=diff
==============================================================================
--- hive/trunk/service/src/java/org/apache/hive/service/cli/CLIService.java (original)
+++ hive/trunk/service/src/java/org/apache/hive/service/cli/CLIService.java Sat Feb 22 22:18:39 2014
@@ -21,6 +21,10 @@ package org.apache.hive.service.cli;
 import java.io.IOException;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 import javax.security.auth.login.LoginException;
 
@@ -38,6 +42,7 @@ import org.apache.hadoop.hive.shims.Shim
 import org.apache.hive.service.CompositeService;
 import org.apache.hive.service.ServiceException;
 import org.apache.hive.service.auth.HiveAuthFactory;
+import org.apache.hive.service.cli.operation.Operation;
 import org.apache.hive.service.cli.session.SessionManager;
 import org.apache.hive.service.cli.thrift.TProtocolVersion;
 
@@ -123,7 +128,7 @@ public class CLIService extends Composit
 
   public SessionHandle openSessionWithImpersonation(TProtocolVersion protocol, String username,
       String password, Map<String, String> configuration, String delegationToken)
-      throws HiveSQLException {
+          throws HiveSQLException {
     SessionHandle sessionHandle = sessionManager.openSession(protocol, username, password, configuration,
         true, delegationToken);
     LOG.debug(sessionHandle + ": openSession()");
@@ -146,9 +151,9 @@ public class CLIService extends Composit
    */
   @Override
   public SessionHandle openSessionWithImpersonation(String username, String password, Map<String, String> configuration,
-       String delegationToken) throws HiveSQLException {
+      String delegationToken) throws HiveSQLException {
     SessionHandle sessionHandle = sessionManager.openSession(SERVER_VERSION, username, password, configuration,
-          true, delegationToken);
+        true, delegationToken);
     LOG.debug(sessionHandle + ": openSession()");
     return sessionHandle;
   }
@@ -297,8 +302,33 @@ public class CLIService extends Composit
   @Override
   public OperationStatus getOperationStatus(OperationHandle opHandle)
       throws HiveSQLException {
-    OperationStatus opStatus = sessionManager.getOperationManager()
-        .getOperationStatus(opHandle);
+    Operation operation = sessionManager.getOperationManager().getOperation(opHandle);
+    /**
+     * If this is a background operation run asynchronously,
+     * we block for a configured duration, before we return
+     * (duration: HIVE_SERVER2_LONG_POLLING_TIMEOUT).
+     * However, if the background operation is complete, we return immediately.
+     */
+    if (operation.shouldRunAsync()) {
+      long timeout = operation.getParentSession().getHiveConf().getLongVar(
+          HiveConf.ConfVars.HIVE_SERVER2_LONG_POLLING_TIMEOUT);
+      try {
+        operation.getBackgroundHandle().get(timeout, TimeUnit.MILLISECONDS);
+      } catch (TimeoutException e) {
+        // No Op, return to the caller since long polling timeout has expired
+        LOG.trace(opHandle + ": Long polling timed out");
+      } catch (CancellationException e) {
+        // The background operation thread was cancelled
+        LOG.trace(opHandle + ": The background operation was cancelled", e);
+      } catch (ExecutionException e) {
+        // The background operation thread was aborted
+        LOG.trace(opHandle + ": The background operation was aborted", e);
+      } catch (InterruptedException e) {
+        // No op, this thread was interrupted
+        // In this case, the call might return sooner than long polling timeout
+      }
+    }
+    OperationStatus opStatus = operation.getStatus();
     LOG.debug(opHandle + ": getOperationStatus()");
     return opStatus;
   }
@@ -310,7 +340,7 @@ public class CLIService extends Composit
   public void cancelOperation(OperationHandle opHandle)
       throws HiveSQLException {
     sessionManager.getOperationManager().getOperation(opHandle)
-        .getParentSession().cancelOperation(opHandle);
+    .getParentSession().cancelOperation(opHandle);
     LOG.debug(opHandle + ": cancelOperation()");
   }
 
@@ -321,7 +351,7 @@ public class CLIService extends Composit
   public void closeOperation(OperationHandle opHandle)
       throws HiveSQLException {
     sessionManager.getOperationManager().getOperation(opHandle)
-        .getParentSession().closeOperation(opHandle);
+    .getParentSession().closeOperation(opHandle);
     LOG.debug(opHandle + ": closeOperation");
   }
 

Modified: hive/trunk/service/src/java/org/apache/hive/service/cli/operation/ExecuteStatementOperation.java
URL: http://svn.apache.org/viewvc/hive/trunk/service/src/java/org/apache/hive/service/cli/operation/ExecuteStatementOperation.java?rev=1570935&r1=1570934&r2=1570935&view=diff
==============================================================================
--- hive/trunk/service/src/java/org/apache/hive/service/cli/operation/ExecuteStatementOperation.java (original)
+++ hive/trunk/service/src/java/org/apache/hive/service/cli/operation/ExecuteStatementOperation.java Sat Feb 22 22:18:39 2014
@@ -19,16 +19,12 @@ package org.apache.hive.service.cli.oper
 
 
 
+import java.sql.SQLException;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Map;
-import java.util.Set;
-import java.sql.SQLException;
 
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.processors.CommandProcessorFactory;
 import org.apache.hadoop.hive.ql.processors.CommandProcessor;
-import org.apache.hadoop.hive.ql.processors.HiveCommand;
+import org.apache.hadoop.hive.ql.processors.CommandProcessorFactory;
 import org.apache.hive.service.cli.HiveSQLException;
 import org.apache.hive.service.cli.OperationType;
 import org.apache.hive.service.cli.session.HiveSession;
@@ -37,8 +33,9 @@ public abstract class ExecuteStatementOp
   protected String statement = null;
   protected Map<String, String> confOverlay = new HashMap<String, String>();
 
-  public ExecuteStatementOperation(HiveSession parentSession, String statement, Map<String, String> confOverlay) {
-    super(parentSession, OperationType.EXECUTE_STATEMENT);
+  public ExecuteStatementOperation(HiveSession parentSession, String statement,
+      Map<String, String> confOverlay, boolean runInBackground) {
+    super(parentSession, OperationType.EXECUTE_STATEMENT, runInBackground);
     this.statement = statement;
     setConfOverlay(confOverlay);
   }
@@ -49,7 +46,7 @@ public abstract class ExecuteStatementOp
 
   public static ExecuteStatementOperation newExecuteStatementOperation(
       HiveSession parentSession, String statement, Map<String, String> confOverlay, boolean runAsync)
-      throws HiveSQLException {
+          throws HiveSQLException {
     String[] tokens = statement.trim().split("\\s+");
     CommandProcessor processor = null;
     try {

Modified: hive/trunk/service/src/java/org/apache/hive/service/cli/operation/HiveCommandOperation.java
URL: http://svn.apache.org/viewvc/hive/trunk/service/src/java/org/apache/hive/service/cli/operation/HiveCommandOperation.java?rev=1570935&r1=1570934&r2=1570935&view=diff
==============================================================================
--- hive/trunk/service/src/java/org/apache/hive/service/cli/operation/HiveCommandOperation.java (original)
+++ hive/trunk/service/src/java/org/apache/hive/service/cli/operation/HiveCommandOperation.java Sat Feb 22 22:18:39 2014
@@ -60,7 +60,7 @@ public class HiveCommandOperation extend
 
   protected HiveCommandOperation(HiveSession parentSession, String statement,
       CommandProcessor commandProcessor, Map<String, String> confOverlay) {
-    super(parentSession, statement, confOverlay);
+    super(parentSession, statement, confOverlay, false);
     this.commandProcessor = commandProcessor;
     setupSessionIO(parentSession.getSessionState());
   }

Modified: hive/trunk/service/src/java/org/apache/hive/service/cli/operation/MetadataOperation.java
URL: http://svn.apache.org/viewvc/hive/trunk/service/src/java/org/apache/hive/service/cli/operation/MetadataOperation.java?rev=1570935&r1=1570934&r2=1570935&view=diff
==============================================================================
--- hive/trunk/service/src/java/org/apache/hive/service/cli/operation/MetadataOperation.java (original)
+++ hive/trunk/service/src/java/org/apache/hive/service/cli/operation/MetadataOperation.java Sat Feb 22 22:18:39 2014
@@ -35,7 +35,7 @@ public abstract class MetadataOperation 
   private static final char SEARCH_STRING_ESCAPE = '\\';
 
   protected MetadataOperation(HiveSession parentSession, OperationType opType) {
-    super(parentSession, opType);
+    super(parentSession, opType, false);
     setHasResultSet(true);
   }
 
@@ -85,15 +85,15 @@ public abstract class MetadataOperation 
    * format '.*'  This is driven by the datanucleusFormat flag.
    */
   private String convertPattern(final String pattern, boolean datanucleusFormat) {
-      String wStr;
-      if (datanucleusFormat) {
-        wStr = "*";
-      } else {
-        wStr = ".*";
-      }
-      return pattern
-          .replaceAll("([^\\\\])%", "$1" + wStr).replaceAll("\\\\%", "%").replaceAll("^%", wStr)
-          .replaceAll("([^\\\\])_", "$1.").replaceAll("\\\\_", "_").replaceAll("^_", ".");
+    String wStr;
+    if (datanucleusFormat) {
+      wStr = "*";
+    } else {
+      wStr = ".*";
+    }
+    return pattern
+        .replaceAll("([^\\\\])%", "$1" + wStr).replaceAll("\\\\%", "%").replaceAll("^%", wStr)
+        .replaceAll("([^\\\\])_", "$1.").replaceAll("\\\\_", "_").replaceAll("^_", ".");
   }
 
 }

Modified: hive/trunk/service/src/java/org/apache/hive/service/cli/operation/Operation.java
URL: http://svn.apache.org/viewvc/hive/trunk/service/src/java/org/apache/hive/service/cli/operation/Operation.java?rev=1570935&r1=1570934&r2=1570935&view=diff
==============================================================================
--- hive/trunk/service/src/java/org/apache/hive/service/cli/operation/Operation.java (original)
+++ hive/trunk/service/src/java/org/apache/hive/service/cli/operation/Operation.java Sat Feb 22 22:18:39 2014
@@ -18,6 +18,7 @@
 package org.apache.hive.service.cli.operation;
 
 import java.util.EnumSet;
+import java.util.concurrent.Future;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -43,16 +44,31 @@ public abstract class Operation {
   public static final long DEFAULT_FETCH_MAX_ROWS = 100;
   protected boolean hasResultSet;
   protected volatile HiveSQLException operationException;
+  protected final boolean runAsync;
+  protected volatile Future<?> backgroundHandle;
 
   protected static final EnumSet<FetchOrientation> DEFAULT_FETCH_ORIENTATION_SET =
       EnumSet.of(FetchOrientation.FETCH_NEXT,FetchOrientation.FETCH_FIRST);
-  
-  protected Operation(HiveSession parentSession, OperationType opType) {
+
+  protected Operation(HiveSession parentSession, OperationType opType, boolean runInBackground) {
     super();
     this.parentSession = parentSession;
+    this.runAsync = runInBackground;
     this.opHandle = new OperationHandle(opType, parentSession.getProtocolVersion());
   }
 
+  public Future<?> getBackgroundHandle() {
+    return backgroundHandle;
+  }
+
+  protected void setBackgroundHandle(Future<?> backgroundHandle) {
+    this.backgroundHandle = backgroundHandle;
+  }
+
+  public boolean shouldRunAsync() {
+    return runAsync;
+  }
+
   public void setConfiguration(HiveConf configuration) {
     this.configuration = new HiveConf(configuration);
   }
@@ -160,7 +176,7 @@ public abstract class Operation {
       EnumSet<FetchOrientation> supportedOrientations) throws HiveSQLException {
     if (!supportedOrientations.contains(orientation)) {
       throw new HiveSQLException("The fetch type " + orientation.toString() +
-        " is not supported for this resultset", "HY106");
+          " is not supported for this resultset", "HY106");
     }
   }
 }

Modified: hive/trunk/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java
URL: http://svn.apache.org/viewvc/hive/trunk/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java?rev=1570935&r1=1570934&r2=1570935&view=diff
==============================================================================
--- hive/trunk/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java (original)
+++ hive/trunk/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java Sat Feb 22 22:18:39 2014
@@ -66,15 +66,12 @@ public class SQLOperation extends Execut
   private TableSchema resultSchema = null;
   private Schema mResultSchema = null;
   private SerDe serde = null;
-  private final boolean runAsync;
-  private volatile Future<?> backgroundHandle;
   private boolean fetchStarted = false;
 
   public SQLOperation(HiveSession parentSession, String statement, Map<String,
       String> confOverlay, boolean runInBackground) {
     // TODO: call setRemoteUser in ExecuteStatementOperation or higher.
-    super(parentSession, statement, confOverlay);
-    this.runAsync = runInBackground;
+    super(parentSession, statement, confOverlay, runInBackground);
   }
 
   /***
@@ -159,7 +156,7 @@ public class SQLOperation extends Execut
   public void run() throws HiveSQLException {
     setState(OperationState.PENDING);
     prepare(getConfigForOperation());
-    if (!runAsync) {
+    if (!shouldRunAsync()) {
       runInternal(getConfigForOperation());
     } else {
       Runnable backgroundOperation = new Runnable() {
@@ -177,8 +174,9 @@ public class SQLOperation extends Execut
       };
       try {
         // This submit blocks if no background threads are available to run this operation
-        backgroundHandle =
+        Future<?> backgroundHandle =
             getParentSession().getSessionManager().submitBackgroundOperation(backgroundOperation);
+        setBackgroundHandle(backgroundHandle);
       } catch (RejectedExecutionException rejected) {
         setState(OperationState.ERROR);
         throw new HiveSQLException("All the asynchronous threads are currently busy, " +
@@ -189,7 +187,8 @@ public class SQLOperation extends Execut
 
   private void cleanup(OperationState state) throws HiveSQLException {
     setState(state);
-    if (runAsync) {
+    if (shouldRunAsync()) {
+      Future<?> backgroundHandle = getBackgroundHandle();
       if (backgroundHandle != null) {
         backgroundHandle.cancel(true);
       }
@@ -349,7 +348,7 @@ public class SQLOperation extends Execut
    */
   private HiveConf getConfigForOperation() throws HiveSQLException {
     HiveConf sqlOperationConf = getParentSession().getHiveConf();
-    if (!getConfOverlay().isEmpty() || runAsync) {
+    if (!getConfOverlay().isEmpty() || shouldRunAsync()) {
       // clone the partent session config for this query
       sqlOperationConf = new HiveConf(sqlOperationConf);
 
@@ -364,5 +363,4 @@ public class SQLOperation extends Execut
     }
     return sqlOperationConf;
   }
-
 }

Modified: hive/trunk/service/src/test/org/apache/hive/service/cli/CLIServiceTest.java
URL: http://svn.apache.org/viewvc/hive/trunk/service/src/test/org/apache/hive/service/cli/CLIServiceTest.java?rev=1570935&r1=1570934&r2=1570935&view=diff
==============================================================================
--- hive/trunk/service/src/test/org/apache/hive/service/cli/CLIServiceTest.java (original)
+++ hive/trunk/service/src/test/org/apache/hive/service/cli/CLIServiceTest.java Sat Feb 22 22:18:39 2014
@@ -22,14 +22,13 @@ import static org.junit.Assert.assertEqu
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
-import static org.junit.Assert.assertTrue;
 
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -57,7 +56,7 @@ public abstract class CLIServiceTest {
   }
 
   @Test
-  public void openSessionTest() throws Exception {
+  public void testOpenSession() throws Exception {
     SessionHandle sessionHandle = client.openSession(
         "tom", "password", Collections.<String, String>emptyMap());
     assertNotNull(sessionHandle);
@@ -69,7 +68,7 @@ public abstract class CLIServiceTest {
   }
 
   @Test
-  public void getFunctionsTest() throws Exception {
+  public void testGetFunctions() throws Exception {
     SessionHandle sessionHandle = client.openSession("tom", "password");
     assertNotNull(sessionHandle);
 
@@ -106,7 +105,7 @@ public abstract class CLIServiceTest {
   }
 
   @Test
-  public void getInfoTest() throws Exception {
+  public void testGetInfo() throws Exception {
     SessionHandle sessionHandle = client.openSession(
         "tom", "password", Collections.<String, String>emptyMap());
     assertNotNull(sessionHandle);
@@ -123,6 +122,10 @@ public abstract class CLIServiceTest {
     client.closeSession(sessionHandle);
   }
 
+  /**
+   * Test the blocking execution of a query
+   * @throws Exception
+   */
   @Test
   public void testExecuteStatement() throws Exception {
     HashMap<String, String> confOverlay = new HashMap<String, String>();
@@ -161,113 +164,171 @@ public abstract class CLIServiceTest {
     client.closeSession(sessionHandle);
   }
 
+  /**
+   * Test async execution of a well-formed and a malformed query with different long polling durations
+   * - Test malformed query with default long polling timeout
+   * - Test well-formed query with default long polling timeout
+   * - Test well-formed query with long polling timeout set to 0
+   * - Test well-formed query with long polling timeout set to 500 millis
+   * - Test well-formed query cancellation
+   * @throws Exception
+   */
   @Test
   public void testExecuteStatementAsync() throws Exception {
-    HashMap<String, String> confOverlay = new HashMap<String, String>();
-    SessionHandle sessionHandle = client.openSession("tom", "password",
-        new HashMap<String, String>());
-    // Timeout for the poll in case of asynchronous execute
-    long pollTimeout = System.currentTimeMillis() + 100000;
+    Map<String, String> confOverlay = new HashMap<String, String>();
+    String tableName = "TEST_EXEC_ASYNC";
+    String columnDefinitions = "(ID STRING)";
+    String queryString;
+
+    // Open a session and set up the test data
+    SessionHandle sessionHandle = setupTestData(tableName, columnDefinitions, confOverlay);
     assertNotNull(sessionHandle);
+
     OperationState state = null;
     OperationHandle opHandle;
     OperationStatus opStatus = null;
 
     // Change lock manager, otherwise unit-test doesn't go through
-    String queryString = "SET " + HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname
+    queryString = "SET " + HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname
         + " = false";
     opHandle = client.executeStatement(sessionHandle, queryString, confOverlay);
     client.closeOperation(opHandle);
 
-    // Drop the table if it exists
-    queryString = "DROP TABLE IF EXISTS TEST_EXEC_ASYNC";
-    opHandle = client.executeStatement(sessionHandle, queryString, confOverlay);
-    client.closeOperation(opHandle);
+    // Set longPollingTimeout to a custom value for different test cases
+    long longPollingTimeout;
 
-    // Create a test table
-    queryString = "CREATE TABLE TEST_EXEC_ASYNC(ID STRING)";
-    opHandle = client.executeStatement(sessionHandle, queryString, confOverlay);
-    client.closeOperation(opHandle);
-
-    // Test async execution response when query is malformed
-    // Compile time error
-    // This query will error out during compilation (which is done synchronous as of now)
-    String wrongQueryString = "SELECT NON_EXISTANT_COLUMN FROM TEST_EXEC_ASYNC";
+    /**
+     * Execute a malformed async query with default config,
+     * to give a compile time error.
+     * (compilation is done synchronously as of now)
+     */
+    longPollingTimeout = new HiveConf().getLongVar(ConfVars.HIVE_SERVER2_LONG_POLLING_TIMEOUT);
+    queryString = "SELECT NON_EXISTING_COLUMN FROM " + tableName;
     try {
-      opHandle = client.executeStatementAsync(sessionHandle, wrongQueryString, confOverlay);
-      fail("Async syntax excution should fail");
-    } catch (HiveSQLException e) {
+      runQueryAsync(sessionHandle, queryString, confOverlay, OperationState.ERROR, longPollingTimeout);
+    }
+    catch (HiveSQLException e) {
       // expected error
     }
-    
-
-    // Runtime error
-    wrongQueryString = "CREATE TABLE NON_EXISTING_TAB (ID STRING) location 'hdfs://fooNN:10000/a/b/c'";
-    opHandle = client.executeStatementAsync(sessionHandle, wrongQueryString, confOverlay);
-
-    int count = 0;
-    while (true) {
-      // Break if polling times out
-      if (System.currentTimeMillis() > pollTimeout) {
-        System.out.println("Polling timed out");
-        break;
-      }
-      opStatus = client.getOperationStatus(opHandle);
-      state = opStatus.getState();
-      System.out.println("Polling: " + opHandle + " count=" + (++count)
-          + " state=" + state);
 
-      if (state == OperationState.CANCELED || state == OperationState.CLOSED
-          || state == OperationState.FINISHED || state == OperationState.ERROR) {
-        break;
-      }
-      Thread.sleep(1000);
-    }
-    assertEquals("Operation should be in error state", OperationState.ERROR, state);
+    /**
+     * Execute a malformed async query with default config,
+     * to give a runtime error.
+     * Also check that the sqlState and errorCode are set
+     */
+    queryString = "CREATE TABLE NON_EXISTING_TAB (ID STRING) location 'hdfs://localhost:10000/a/b/c'";
+    opStatus = runQueryAsync(sessionHandle, queryString, confOverlay, OperationState.ERROR, longPollingTimeout);
     // sqlState, errorCode should be set
     assertEquals(opStatus.getOperationException().getSQLState(), "08S01");
     assertEquals(opStatus.getOperationException().getErrorCode(), 1);
-    client.closeOperation(opHandle);
-    
-    // Test async execution when query is well formed
-    queryString = "SELECT ID FROM TEST_EXEC_ASYNC";
+    /**
+     * Execute an async query with default config
+     */
+    queryString = "SELECT ID FROM " + tableName;
+    runQueryAsync(sessionHandle, queryString, confOverlay, OperationState.FINISHED, longPollingTimeout);
+
+    /**
+     * Execute an async query with long polling timeout set to 0
+     */
+    longPollingTimeout = 0;
+    queryString = "SELECT ID FROM " + tableName;
+    runQueryAsync(sessionHandle, queryString, confOverlay, OperationState.FINISHED, longPollingTimeout);
+
+    /**
+     * Execute an async query with long polling timeout set to 500 millis
+     */
+    longPollingTimeout = 500;
+    queryString = "SELECT ID FROM " + tableName;
+    runQueryAsync(sessionHandle, queryString, confOverlay, OperationState.FINISHED, longPollingTimeout);
+
+    /**
+     * Cancellation test
+     */
+    queryString = "SELECT ID FROM " + tableName;
     opHandle = client.executeStatementAsync(sessionHandle, queryString, confOverlay);
-    assertTrue(opHandle.hasResultSet());
-    
-    count = 0;
+    System.out.println("Cancelling " + opHandle);
+    client.cancelOperation(opHandle);
+    state = client.getOperationStatus(opHandle).getState();
+    System.out.println(opHandle + " after cancelling, state= " + state);
+    assertEquals("Query should be cancelled", OperationState.CANCELED, state);
+
+    // Cleanup
+    queryString = "DROP TABLE " + tableName;
+    client.executeStatement(sessionHandle, queryString, confOverlay);
+    client.closeSession(sessionHandle);
+  }
+
+  /**
+   * Sets up a test specific table with the given column definitions and config
+   * @param tableName
+   * @param columnDefinitions
+   * @param confOverlay
+   * @throws Exception
+   */
+  private SessionHandle setupTestData(String tableName, String columnDefinitions,
+      Map<String, String> confOverlay) throws Exception {
+    SessionHandle sessionHandle = client.openSession("tom", "password", confOverlay);
+    assertNotNull(sessionHandle);
+
+    String queryString = "SET " + HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname
+        + " = false";
+    client.executeStatement(sessionHandle, queryString, confOverlay);
+
+    // Drop the table if it exists
+    queryString = "DROP TABLE IF EXISTS " + tableName;
+    client.executeStatement(sessionHandle, queryString, confOverlay);
+
+    // Create a test table
+    queryString = "CREATE TABLE " + tableName + columnDefinitions;
+    client.executeStatement(sessionHandle, queryString, confOverlay);
+
+    return sessionHandle;
+  }
+
+  private OperationStatus runQueryAsync(SessionHandle sessionHandle, String queryString,
+      Map<String, String> confOverlay, OperationState expectedState,
+      long longPollingTimeout) throws HiveSQLException {
+    // Timeout for the iteration in case of asynchronous execute
+    long testIterationTimeout = System.currentTimeMillis() + 100000;
+    long longPollingStart;
+    long longPollingEnd;
+    long longPollingTimeDelta;
+    OperationStatus opStatus = null;
+    OperationState state = null;
+    confOverlay.put(HiveConf.ConfVars.HIVE_SERVER2_LONG_POLLING_TIMEOUT.varname, String.valueOf(longPollingTimeout));
+    OperationHandle opHandle = client.executeStatementAsync(sessionHandle, queryString, confOverlay);
+    int count = 0;
     while (true) {
-      // Break if polling times out
-      if (System.currentTimeMillis() > pollTimeout) {
+      // Break if iteration times out
+      if (System.currentTimeMillis() > testIterationTimeout) {
         System.out.println("Polling timed out");
         break;
       }
+      longPollingStart = System.currentTimeMillis();
+      System.out.println("Long polling starts at: " + longPollingStart);
       opStatus = client.getOperationStatus(opHandle);
       state = opStatus.getState();
+      longPollingEnd = System.currentTimeMillis();
+      System.out.println("Long polling ends at: " + longPollingEnd);
+
       System.out.println("Polling: " + opHandle + " count=" + (++count)
           + " state=" + state);
 
-      if (state == OperationState.CANCELED || state == OperationState.CLOSED
-          || state == OperationState.FINISHED || state == OperationState.ERROR) {
+      if (state == OperationState.CANCELED ||
+          state == OperationState.CLOSED ||
+          state == OperationState.FINISHED ||
+          state == OperationState.ERROR) {
         break;
+      } else {
+        // Verify that getOperationStatus returned only after the long polling timeout
+        longPollingTimeDelta = longPollingEnd - longPollingStart;
+        // Scale down by a factor of 0.9 to account for approximate values
+        assertTrue(longPollingTimeDelta - 0.9*longPollingTimeout > 0);
       }
-      Thread.sleep(1000);
     }
-    assertEquals("Query should be finished", OperationState.FINISHED, state);
-    client.closeOperation(opHandle);
-
-    // Cancellation test
-    opHandle = client.executeStatementAsync(sessionHandle, queryString, confOverlay);
-    System.out.println("cancelling " + opHandle);
-    client.cancelOperation(opHandle);
-    state = client.getOperationStatus(opHandle).getState();
-    System.out.println(opHandle + " after cancelling, state= " + state);
-    assertEquals("Query should be cancelled", OperationState.CANCELED, state);
-
-    // Cleanup
-    queryString = "DROP TABLE IF EXISTS TEST_EXEC_ASYNC";
-    opHandle = client.executeStatement(sessionHandle, queryString, confOverlay);
+    assertEquals(expectedState, client.getOperationStatus(opHandle).getState());
     client.closeOperation(opHandle);
-    client.closeSession(sessionHandle);
+    return opStatus;
   }
 
   /**