Posted to commits@impala.apache.org by ta...@apache.org on 2019/08/16 23:01:45 UTC

[impala] branch master updated (f24432f -> df2c6f2)

This is an automated email from the ASF dual-hosted git repository.

tarmstrong pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git.


    from f24432f  IMPALA-8862: Don't ship jetty and ant
     new 4ee31de  IMPALA-8661 : Add randomized tests to stress MetastoreEventsProcessor
     new 70b0492  IMPALA-8847: Fix test_event_processing.py on Hive-3
     new df2c6f2  IMPALA-8841: Try to fix Tez related dataload flakiness

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../catalog/events/MetastoreEventsProcessor.java   |   15 +-
 .../catalog/events/EventsProcessorStressTest.java  |  232 ++++
 .../apache/impala/testutil/HiveJdbcClientPool.java |  163 +++
 .../apache/impala/testutil/ImpalaJdbcClient.java   |   27 +-
 .../java/org/apache/impala/testutil/TestUtils.java |   16 +
 .../apache/impala/util/RandomHiveQueryRunner.java  | 1169 ++++++++++++++++++++
 testdata/bin/create-load-data.sh                   |   15 +
 tests/custom_cluster/test_event_processing.py      |    2 +-
 tests/test-hive-udfs/pom.xml                       |    4 +
 9 files changed, 1618 insertions(+), 25 deletions(-)
 create mode 100644 fe/src/test/java/org/apache/impala/catalog/events/EventsProcessorStressTest.java
 create mode 100644 fe/src/test/java/org/apache/impala/testutil/HiveJdbcClientPool.java
 create mode 100644 fe/src/test/java/org/apache/impala/util/RandomHiveQueryRunner.java


[impala] 02/03: IMPALA-8847: Fix test_event_processing.py on Hive-3

Posted by ta...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tarmstrong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 70b0492e6b0322ec5b673a2ab20e02d9e35d0a93
Author: Vihang Karajgaonkar <vi...@apache.org>
AuthorDate: Fri Aug 16 10:16:23 2019 -0700

    IMPALA-8847: Fix test_event_processing.py on Hive-3
    
    The test_event_processing test fails on Hive-3 due to an error in the
    syntax of the Hive query for transactional tables. Unfortunately, this
    was missed when IMPALA-8847 was merged.
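
    On Hive-3 the TBLPROPERTIES clause must follow the STORED AS clause; a
    sketch of the corrected statement shape (values from the test):

        create table <tbl> (key string, value string)
          partitioned by (year int) stored as parquet
          tblproperties ('transactional'='true',
                         'transactional_properties'='insert_only')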
    
    Testing done:
    Ran the test on both Hive-2 and Hive-3 and it passes now
    
    Change-Id: I6507540bfbc0d131a061865ed9ed94792ccfa758
    Reviewed-on: http://gerrit.cloudera.org:8080/14082
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
    Reviewed-by: Vihang Karajgaonkar <vi...@cloudera.com>
---
 tests/custom_cluster/test_event_processing.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/tests/custom_cluster/test_event_processing.py b/tests/custom_cluster/test_event_processing.py
index f099978..9605381 100644
--- a/tests/custom_cluster/test_event_processing.py
+++ b/tests/custom_cluster/test_event_processing.py
@@ -150,7 +150,7 @@ class TestEventProcessing(CustomClusterTestSuite):
            "'transactional_properties'='insert_only')"
     test_tbl = unique_database + ".test_events"
     self.run_stmt_in_hive("create table {0} (key string, value string) \
-      partitioned by (year int) {1} stored as parquet".format(test_tbl, TBLPROPERTIES))
+      partitioned by (year int) stored as parquet {1}".format(test_tbl, TBLPROPERTIES))
     EventProcessorUtils.wait_for_event_processing(self.hive_client)
     self.client.execute("describe {0}".format(test_tbl))
 


[impala] 03/03: IMPALA-8841: Try to fix Tez related dataload flakiness

Posted by ta...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tarmstrong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit df2c6f200f66e6849e17ef177c99adf035766d6a
Author: Csaba Ringhofer <cs...@cloudera.com>
AuthorDate: Fri Aug 16 16:18:29 2019 +0200

    IMPALA-8841: Try to fix Tez related dataload flakiness
    
    The flakiness may be related to starting Hive queries in parallel, which
    triggers initializing Tez resources in parallel (only needed for the
    first statement that uses Tez). Running a single non-parallel statement
    first may solve the issue.
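
    Concretely, the warm-up just runs a few trivial statements through
    beeline before any parallel load starts, for example:

        beeline -n $USER -u "$JDBC_URL" \
          -e "insert overwrite table hive_warm_up_tbl values (1);"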
    
    Also includes a fix for a recent issue in 'build-and-copy-hive-udfs'
    introduced by the version bump in https://gerrit.cloudera.org/#/c/14043/
    
    Change-Id: Id21d57483fe7a4f72f450fb71f8f53b3c1ef6327
    Reviewed-on: http://gerrit.cloudera.org:8080/14081
    Reviewed-by: Vihang Karajgaonkar <vi...@cloudera.com>
    Reviewed-by: Tim Armstrong <ta...@cloudera.com>
    Tested-by: Tim Armstrong <ta...@cloudera.com>
---
 testdata/bin/create-load-data.sh | 15 +++++++++++++++
 tests/test-hive-udfs/pom.xml     |  4 ++++
 2 files changed, 19 insertions(+)

diff --git a/testdata/bin/create-load-data.sh b/testdata/bin/create-load-data.sh
index 74f0f63..a081280 100755
--- a/testdata/bin/create-load-data.sh
+++ b/testdata/bin/create-load-data.sh
@@ -51,6 +51,7 @@ SNAPSHOT_FILE=""
 LOAD_DATA_ARGS=""
 EXPLORATION_STRATEGY="exhaustive"
 export JDBC_URL="jdbc:hive2://${HS2_HOST_PORT}/default;"
+HIVE_CMD="beeline -n $USER -u $JDBC_URL"
 
 # For logging when using run-step.
 LOG_DIR=${IMPALA_DATA_LOADING_LOGS_DIR}
@@ -588,6 +589,15 @@ function check-hdfs-health {
   done
 }
 
+function warm-up-hive {
+  echo "Running warm up Hive statements"
+  $HIVE_CMD -e "create database if not exists functional;"
+  $HIVE_CMD -e "create table if not exists hive_warm_up_tbl (i int);"
+  # The insert below starts a Tez session (if Hive uses Tez) and initializes
+  # .hiveJars directory in HDFS, see IMPALA-8841.
+  $HIVE_CMD -e "insert overwrite table hive_warm_up_tbl values (1);"
+}
+
 # For kerberized clusters, use kerberos
 if ${CLUSTER_DIR}/admin is_kerberized; then
   LOAD_DATA_ARGS="${LOAD_DATA_ARGS} --use_kerberos --principal=${MINIKDC_PRINC_HIVE}"
@@ -607,6 +617,11 @@ if [[ "${TARGET_FILESYSTEM}" == "hdfs" ]]; then
 fi
 
 if [ $SKIP_METADATA_LOAD -eq 0 ]; then
+  # Using Hive in non-parallel mode before starting parallel execution may help with some
+  # flakiness during data load, see IMPALA-8841. The problem only occurs in Hive 3
+  # environment, but always doing the warm up shouldn't hurt much and may make it easier
+  # to investigate future issues where Hive doesn't work at all.
+  warm-up-hive
   run-step "Loading custom schemas" load-custom-schemas.log load-custom-schemas
   # Run some steps in parallel, with run-step-backgroundable / run-step-wait-all.
   # This is effective on steps that take a long time and don't depend on each
diff --git a/tests/test-hive-udfs/pom.xml b/tests/test-hive-udfs/pom.xml
index eb3ff82..e51d598 100644
--- a/tests/test-hive-udfs/pom.xml
+++ b/tests/test-hive-udfs/pom.xml
@@ -68,6 +68,10 @@ under the License.
           <groupId>net.minidev</groupId>
           <artifactId>json-smart</artifactId>
         </exclusion>
+        <exclusion>
+          <groupId>org.apache.hive.shims</groupId>
+          <artifactId>hive-shims-0.20</artifactId>
+        </exclusion>
       </exclusions>
     </dependency>
     <dependency>


[impala] 01/03: IMPALA-8661 : Add randomized tests to stress MetastoreEventsProcessor

Posted by ta...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tarmstrong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 4ee31de3a104b4d64008a087925b4830e95ea826
Author: Vihang Karajgaonkar <vi...@cloudera.com>
AuthorDate: Wed Jul 10 10:24:03 2019 -0700

    IMPALA-8661 : Add randomized tests to stress MetastoreEventsProcessor
    
    This change adds a new stress test for MetastoreEventsProcessor. The
    test randomly executes Hive queries to generate a lot of events. The
    event processor is invoked at random intervals so that a variable batch
    of events is processed every time. After each batch is processed, the
    test checks the status of the events processor. By default, on CDH
    builds the test is configured to run with 4 concurrent Hive clients,
    and each client runs 50 random Hive queries. These defaults can be
    overridden by passing system properties via Maven command arguments
    "-DnumClients" and "-DnumQueriesPerClient". Additionally, the test
    also creates Impala clients which keep issuing refresh table commands
    on the test databases to make sure that the event processor is doing
    some real work rather than invalidating/refreshing tables which are
    already incomplete.
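
    For example (assuming the standard Surefire -Dtest selector used for
    the frontend tests), a smaller run could look like:

        mvn test -Dtest=EventsProcessorStressTest \
          -DnumClients=2 -DnumQueriesPerClient=25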
    
    This test is added as a JUnit test and uses Hive JDBC to issue the SQL
    statements. This is much faster than the end-to-end Python tests, which
    issue each Hive query in a separate Beeline session that re-establishes
    the connection every time.
    
    Notes:
    1. Ran the test with defaults. It generates about 500 events
    and runs for close to 4.5 min. This can be changed to a lower
    value if we see significantly increased delay in the test job runtimes.
    2. On CDP builds the concurrent Hive queries run very slowly due to
    container provisioning time on the minicluster. I have left this as a
    TODO to investigate. The test runs in single-threaded mode with an
    increased number of queries when running against Hive-3.
    
    Change-Id: I8c85b83efd4f56b5ae0e8d1dc6a2ee2feb6721ce
    Reviewed-on: http://gerrit.cloudera.org:8080/13932
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
    Reviewed-by: Vihang Karajgaonkar <vi...@cloudera.com>
---
 .../catalog/events/MetastoreEventsProcessor.java   |   15 +-
 .../catalog/events/EventsProcessorStressTest.java  |  232 ++++
 .../apache/impala/testutil/HiveJdbcClientPool.java |  163 +++
 .../apache/impala/testutil/ImpalaJdbcClient.java   |   27 +-
 .../java/org/apache/impala/testutil/TestUtils.java |   16 +
 .../apache/impala/util/RandomHiveQueryRunner.java  | 1169 ++++++++++++++++++++
 6 files changed, 1598 insertions(+), 24 deletions(-)

diff --git a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java
index 5edc2d3..e0888f8 100644
--- a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java
+++ b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java
@@ -62,10 +62,11 @@ import org.slf4j.LoggerFactory;
  * configured to talk with the same metastore.
  *
  * This class is used to poll metastore for such events at a given frequency. By observing
- * such events, we can take appropriate action on the catalogD (invalidate/add/remove) so
- * that catalog represents the latest information available in metastore. We keep track of
- * the last synced event id in each polling iteration so the next batch can be requested
- * appropriately. The current batch size is constant and set to MAX_EVENTS_PER_RPC.
+ * such events, we can take appropriate action on the catalogD
+ * (refresh/invalidate/add/remove) so that catalog represents the latest information
+ * available in metastore. We keep track of the last synced event id in each polling
+ * iteration so the next batch can be requested appropriately. The current batch size is
+ * constant and set to MAX_EVENTS_PER_RPC.
  *
  * <pre>
  *      +---------------+   +----------------+        +--------------+
@@ -126,8 +127,10 @@ import org.slf4j.LoggerFactory;
  * +-------------+--------------+------------+------------+
  * </pre>
  *
- * Current event handlers only rely on createTime on Table and Partition. Database
- * createTime is a work-in-progress in Hive in (HIVE-20776)
+ * Currently event handlers rely on creation time on Database, Table and Partition to
+ * uniquely determine if the object from event is same as object in the catalog. This
+ * information is used to make sure that we are deleting the right incarnation of the
+ * object when compared to Metastore.
  *
  * Self-events:
  * Events could be generated by this Catalog's operations as well. Invalidating table
diff --git a/fe/src/test/java/org/apache/impala/catalog/events/EventsProcessorStressTest.java b/fe/src/test/java/org/apache/impala/catalog/events/EventsProcessorStressTest.java
new file mode 100644
index 0000000..a340ed1
--- /dev/null
+++ b/fe/src/test/java/org/apache/impala/catalog/events/EventsProcessorStressTest.java
@@ -0,0 +1,232 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.catalog.events;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId;
+import org.apache.impala.catalog.CatalogException;
+import org.apache.impala.catalog.CatalogServiceCatalog;
+import org.apache.impala.catalog.MetaStoreClientPool.MetaStoreClient;
+import org.apache.impala.catalog.events.MetastoreEventsProcessor.EventProcessorStatus;
+import org.apache.impala.util.RandomHiveQueryRunner;
+import org.apache.impala.common.Pair;
+import org.apache.impala.compat.MetastoreShim;
+import org.apache.impala.testutil.CatalogServiceTestCatalog;
+import org.apache.impala.util.PatternMatcher;
+import org.apache.thrift.TException;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This stress test for EventsProcessor generates a lot of events by running many Hive
+ * queries in parallel. At the same time it creates Impala clients which continuously
+ * keep refreshing the tables in the test database. While events are being generated,
+ * the event processor is triggered at random intervals to simulate variable-sized
+ * event batches. As of July 2019 this test generates about 2110 events on Hive-2 with
+ * the default number of clients and number of queries per client. The defaults can be
+ * overridden by providing the system properties "numClients" and "numQueriesPerClient"
+ * to the test.
+ */
+public class EventsProcessorStressTest {
+
+  // use a fixed seed value to make the test repeatable
+  private static final Random random = new Random(117);
+
+  private static CatalogServiceCatalog catalog_;
+  private static MetastoreEventsProcessor eventsProcessor_;
+  private static final String testDbPrefix_ = "events_stress_db_";
+  private static final String testTblPrefix_ = "stress_test_tbl_";
+  // number of concurrent hive and impala clients
+  private static final int numClients_;
+  // total number of random queries per client
+  private static final int numQueriesPerClient_;
+  private ExecutorService impalaRefreshExecutorService_;
+  private static final Logger LOG =
+      LoggerFactory.getLogger(EventsProcessorStressTest.class);
+
+  static {
+    Pair<Integer, Integer> configs = getConcurrencyConfigs();
+    numClients_ = configs.first;
+    numQueriesPerClient_ = configs.second;
+  }
+
+  /**
+   * Gets the concurrency factor (number of clients and number of queries per client)
+   * for the test.
+   */
+  private static Pair<Integer, Integer> getConcurrencyConfigs() {
+    int numClients = 4;
+    if (System.getProperty("numClients") != null) {
+      numClients = Integer.parseInt(System.getProperty("numClients"));
+    } else if (MetastoreShim.getMajorVersion() >= 3) {
+      // in a CDP environment, we run Hive queries on Tez using YARN applications.
+      // Currently there are limitations due to which we can only run one session at a
+      // time without affecting the performance.
+      numClients = 1;
+    }
+    int numQueriesPerClient = 50;
+    if (System.getProperty("numQueriesPerClient") != null) {
+      numQueriesPerClient = Integer.parseInt(System.getProperty("numQueriesPerClient"));
+    } else if (MetastoreShim.getMajorVersion() >= 3) {
+      // in CDP we use only 1 client, so increase the number of queries to have a
+      // reasonable number of events generated
+      numQueriesPerClient = 200;
+    }
+    return new Pair<>(numClients, numQueriesPerClient);
+  }
+
+  /**
+   * Initialize the test catalog and start a synchronous events processor
+   */
+  @BeforeClass
+  public static void setupTestEnv() throws Exception {
+    catalog_ = CatalogServiceTestCatalog.create();
+    try (MetaStoreClient metaStoreClient = catalog_.getMetaStoreClient()) {
+      CurrentNotificationEventId currentNotificationId =
+          metaStoreClient.getHiveClient().getCurrentNotificationEventId();
+      eventsProcessor_ = new SynchronousHMSEventProcessorForTests(
+          catalog_, currentNotificationId.getEventId(), 10L);
+      eventsProcessor_.start();
+    }
+    catalog_.setMetastoreEventProcessor(eventsProcessor_);
+  }
+
+  /**
+   * Cleans up the test databases and shuts down the event processor
+   */
+  @AfterClass
+  public static void destroyTestEnv() {
+    try {
+      for (int i = 0; i < numClients_; i++) {
+        try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+          msClient.getHiveClient().dropDatabase(testDbPrefix_ + i, true, true, true);
+        }
+        // remove database from catalog as well to clean up catalog state
+        catalog_.removeDb(testDbPrefix_ + i);
+      }
+    } catch (Exception ex) {
+      // ignored
+    } finally {
+      if (eventsProcessor_ != null) {
+        eventsProcessor_.shutdown();
+      }
+    }
+  }
+
+  /**
+   * Creates Impala clients which issue reloads on the tables in the test database in
+   * a loop to simulate an actual load in a real system. Having the tables loaded is
+   * important so that the events processor is doing some real work rather than just
+   * invalidating incomplete tables.
+   */
+  private void startImpalaRefreshClients() {
+    impalaRefreshExecutorService_ = Executors.newFixedThreadPool(numClients_,
+        new ThreadFactoryBuilder().setNameFormat("impala-refresh-client-%d")
+            .setDaemon(true).build());
+    for (int i = 0; i < numClients_; i++) {
+      impalaRefreshExecutorService_.submit((Runnable) () -> {
+        final int clientId =
+            Integer.parseInt(Thread.currentThread().getName().substring(
+                "impala-refresh-client-".length()));
+        final String dbName = testDbPrefix_ + clientId;
+        while (true) {
+          try {
+            List<String> tablenames = catalog_.getTableNames(dbName,
+                PatternMatcher.MATCHER_MATCH_ALL);
+            for (String tbl : tablenames) {
+              catalog_.reloadTable(catalog_.getTable(dbName, tbl), "test refresh "
+                  + "operation for events stress test");
+            }
+            // wait for a random duration between 0 and 3 seconds. We want the refresh
+            // clients to be more aggressive than the event polling thread
+            Thread.sleep(random.nextInt(3000));
+          } catch (CatalogException | InterruptedException e) {
+            // ignore exceptions since it is possible that hive has dropped the
+            // database or table
+          }
+        }
+      });
+    }
+  }
+
+  /**
+   * Stop the impala refresh clients
+   */
+  private void stopImpalaRefreshClients() {
+    impalaRefreshExecutorService_.shutdownNow();
+  }
+
+  /**
+   * Gets the current notification event id
+   */
+  private long getCurrentNotificationId() throws TException {
+    try (MetaStoreClient metaStoreClient = catalog_.getMetaStoreClient()) {
+      return metaStoreClient.getHiveClient().getCurrentNotificationEventId().getEventId();
+    }
+  }
+
+  @Test
+  public void testUsingRandomHiveQueries() throws Exception {
+    LOG.info("Using number of clients: {} number of queries per client: {}", numClients_,
+        numQueriesPerClient_);
+    final RandomHiveQueryRunner queryRunner = new RandomHiveQueryRunner(random,
+        testDbPrefix_, testTblPrefix_, numClients_, numQueriesPerClient_, null);
+    long eventIdBefore = getCurrentNotificationId();
+    queryRunner.start();
+    startImpalaRefreshClients();
+    try {
+      while (!queryRunner.isTerminated()) {
+        // randomly wait between 0 and 10 seconds
+        Thread.sleep(random.nextInt(10000));
+        eventsProcessor_.processEvents();
+        // make sure that events processor is in ACTIVE state after every batch which
+        // is processed
+        Assert.assertEquals(EventProcessorStatus.ACTIVE, eventsProcessor_.getStatus());
+      }
+      queryRunner.checkForErrors();
+    } catch (Exception ex) {
+      LOG.error(ex.getMessage(), ex);
+      Assert.fail(unwrapCause(ex));
+    } finally {
+      stopImpalaRefreshClients();
+      queryRunner.shutdownNow();
+      LOG.info("Total number of events generated {}",
+          getCurrentNotificationId() - eventIdBefore);
+    }
+  }
+
+  /**
+   * Unwraps the exception cause from the trace
+   */
+  private String unwrapCause(Throwable ex) {
+    String cause = ex.getMessage();
+    while (ex.getCause() != null) {
+      cause = ex.getCause().getMessage();
+      ex = ex.getCause();
+    }
+    return cause;
+  }
+}
diff --git a/fe/src/test/java/org/apache/impala/testutil/HiveJdbcClientPool.java b/fe/src/test/java/org/apache/impala/testutil/HiveJdbcClientPool.java
new file mode 100644
index 0000000..c83ca8b
--- /dev/null
+++ b/fe/src/test/java/org/apache/impala/testutil/HiveJdbcClientPool.java
@@ -0,0 +1,163 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.testutil;
+
+import com.google.common.base.Preconditions;
+import java.io.Closeable;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Simple connection pooling implementation which opens a given number of connections
+ * to HS2 and reuses the connections to submit queries. It exposes an AutoCloseable
+ * HiveJdbcClient class which can be used to submit Hive queries. When close() is
+ * called on a HiveJdbcClient, it releases the connection back to the pool so that it
+ * can be reused by another query.
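+ *
+ * A minimal usage sketch (the pool size is illustrative):
+ *
+ *   try (HiveJdbcClientPool pool = HiveJdbcClientPool.create(2)) {
+ *     try (HiveJdbcClient client = pool.getClient()) {
+ *       client.executeSql("show databases");
+ *     }
+ *   }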
+ */
+public class HiveJdbcClientPool implements Closeable {
+
+  private static final Logger LOG = LoggerFactory.getLogger(HiveJdbcClientPool.class);
+  private static AtomicInteger clientIdGenerator = new AtomicInteger(0);
+
+  private final static String HIVE_SERVER2_DRIVER_NAME =
+      "org.apache.hive.jdbc.HiveDriver";
+  private final int poolSize_;
+  private final BlockingQueue<HiveJdbcClient> freeClients_;
+  private final long timeoutInSeconds_;
+  private final static int DEFAULT_PORT_NUMBER = 11050;
+
+  public class HiveJdbcClient implements AutoCloseable {
+
+    private final Connection conn_;
+    private Statement stmt_;
+    private final int clientId;
+
+    private HiveJdbcClient(String connString) throws SQLException {
+      conn_ = DriverManager.getConnection(connString);
+      stmt_ = conn_.createStatement();
+      clientId = clientIdGenerator.getAndIncrement();
+    }
+
+    public int getClientId() { return clientId; }
+
+    @Override
+    public void close() throws SQLException {
+      if (stmt_ != null) {
+        stmt_.close();
+      }
+      freeClients_.add(this);
+    }
+
+    private void validateConnection() throws SQLException {
+      Preconditions.checkNotNull(conn_,"Connection not initialized.");
+      Preconditions.checkState(!conn_.isClosed(), "Connection is not open");
+      Preconditions.checkNotNull(stmt_);
+
+      // Re-open the statement if it has been closed.
+      if (stmt_.isClosed()) {
+        stmt_ = conn_.createStatement();
+      }
+    }
+
+    /*
+     * Executes the given query and returns the ResultSet. Will re-open the Statement
+     * if needed.
+     */
+    public ResultSet execQuery(String query) throws SQLException {
+      validateConnection();
+      LOG.debug("Executing: " + query);
+      return stmt_.executeQuery(query);
+    }
+
+    /**
+     * Executes a given query and returns true if the query is successful
+     */
+    public boolean executeSql(String sql) throws SQLException {
+      validateConnection();
+      LOG.debug("Executing sql : " + sql);
+      return stmt_.execute(sql);
+    }
+  }
+
+  public HiveJdbcClient getClient() throws TimeoutException, InterruptedException {
+    try {
+      HiveJdbcClient client = freeClients_.poll(timeoutInSeconds_, TimeUnit.SECONDS);
+      if (client == null) {
+        throw new TimeoutException("Timed out while waiting to get a "
+            + "new client. Consider increasing the pool size");
+      }
+      return client;
+    } catch (InterruptedException e) {
+      LOG.error("Interrupted while waiting to a Hive JDBC client", e);
+      throw e;
+    }
+  }
+
+  private HiveJdbcClientPool(int poolsize, long timeoutInSeconds)
+      throws ClassNotFoundException, SQLException {
+    Preconditions.checkArgument(poolsize > 0);
+    this.poolSize_ = poolsize;
+    this.timeoutInSeconds_ = timeoutInSeconds;
+    this.freeClients_ = new LinkedBlockingQueue<>(poolsize);
+    LOG.info("Using JDBC Driver Name: " + HIVE_SERVER2_DRIVER_NAME);
+    // Make sure the driver can be found, throws a ClassNotFoundException if
+    // it is not available.
+    Class.forName(HIVE_SERVER2_DRIVER_NAME);
+    String connString = String.format(TestUtils.HS2_CONNECTION_TEMPLATE,
+        DEFAULT_PORT_NUMBER,
+        "default");
+    LOG.info("Using connection string: " + connString);
+    for (int i = 0; i < poolSize_; i++) {
+      freeClients_.add(new HiveJdbcClient(connString));
+    }
+  }
+
+  public static synchronized HiveJdbcClientPool create(int poolSize)
+      throws SQLException, ClassNotFoundException {
+    return new HiveJdbcClientPool(poolSize, 5 * 60);
+  }
+
+  /*
+   * Closes the internal Statement and Connection objects. If they are already closed
+   * this is a no-op.
+   */
+  @Override
+  public void close() {
+    int closedCount = poolSize_;
+    while (closedCount > 0) {
+      try {
+        HiveJdbcClient client = freeClients_.poll(5 * 60, TimeUnit.SECONDS);
+        if (client.stmt_ != null) { client.stmt_.close(); }
+        if (client.conn_ != null) { client.conn_.close(); }
+        closedCount--;
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+    }
+  }
+}
diff --git a/fe/src/test/java/org/apache/impala/testutil/ImpalaJdbcClient.java b/fe/src/test/java/org/apache/impala/testutil/ImpalaJdbcClient.java
index c52399a..7cd076c 100644
--- a/fe/src/test/java/org/apache/impala/testutil/ImpalaJdbcClient.java
+++ b/fe/src/test/java/org/apache/impala/testutil/ImpalaJdbcClient.java
@@ -45,12 +45,6 @@ import com.google.common.collect.Lists;
 public class ImpalaJdbcClient {
   private static final Logger LOG = Logger.getLogger(ImpalaJdbcClient.class);
 
-  // Note: The older Hive Server JDBC driver (Hive .9 and earlier) is named similarly:
-  // "org.apache.hadoop.hive.jdbc.HiveDriver". However, Impala currently only supports
-  // the Hive Server 2 JDBC driver (Hive .10 and later).
-  private final static String HIVE_SERVER2_DRIVER_NAME =
-      "org.apache.hive.jdbc.HiveDriver";
-
   // Hive uses simple SASL by default. The auth configuration 'none' (both for the client
   // and the server) correspond to using simple SASL.
   private final static String SASL_AUTH_SPEC = ";auth=none";
@@ -67,11 +61,6 @@ public class ImpalaJdbcClient {
   private final static int HS2_BINARY_PORT = 21050;
   private final static int HS2_HTTP_PORT = 28000;
 
-  // The default connection string template to connect to localhost on a given port
-  // number.
-  private final static String DEFAULT_CONNECTION_TEMPLATE =
-      "jdbc:hive2://localhost:%d/default";
-
   private final String driverName_;
   private final String connString_;
   private Connection conn_;
@@ -150,15 +139,17 @@ public class ImpalaJdbcClient {
   }
 
   public static ImpalaJdbcClient createClientUsingHiveJdbcDriver() {
-    return createClient(HIVE_SERVER2_DRIVER_NAME, getNoAuthConnectionStr("binary"));
+    return createClient(TestUtils.HIVE_SERVER2_DRIVER_NAME,
+        getNoAuthConnectionStr("binary"));
   }
 
   public static ImpalaJdbcClient createClientUsingHiveJdbcDriver(String connString) {
-    return createClient(HIVE_SERVER2_DRIVER_NAME, connString);
+    return createClient(TestUtils.HIVE_SERVER2_DRIVER_NAME, connString);
   }
 
   public static ImpalaJdbcClient createHttpClientUsingHiveJdbcDriver() {
-    return createClient(HIVE_SERVER2_DRIVER_NAME, getNoAuthConnectionStr("http"));
+    return createClient(TestUtils.HIVE_SERVER2_DRIVER_NAME,
+        getNoAuthConnectionStr("http"));
   }
 
   public static String getNoAuthConnectionStr(String connType) {
@@ -171,12 +162,12 @@ public class ImpalaJdbcClient {
   }
 
   private static String getConnectionStr(String connType, String authStr) {
-    String connString = DEFAULT_CONNECTION_TEMPLATE + authStr;
+    String connString = TestUtils.HS2_CONNECTION_TEMPLATE + authStr;
     if (connType == "binary") {
-      return String.format(connString, HS2_BINARY_PORT);
+      return String.format(connString, HS2_BINARY_PORT, "default");
     } else {
       Preconditions.checkState(connType == "http");
-      return String.format(connString + HTTP_TRANSPORT_SPEC, HS2_HTTP_PORT);
+      return String.format(connString + HTTP_TRANSPORT_SPEC, HS2_HTTP_PORT, "default");
     }
   }
 
@@ -260,7 +251,7 @@ public class ImpalaJdbcClient {
       System.exit(1);
     }
 
-    String driver = cmdArgs.getOptionValue("d", HIVE_SERVER2_DRIVER_NAME);
+    String driver = cmdArgs.getOptionValue("d", TestUtils.HIVE_SERVER2_DRIVER_NAME);
 
     return new ClientExecOptions(connStr, query, driver);
   }
diff --git a/fe/src/test/java/org/apache/impala/testutil/TestUtils.java b/fe/src/test/java/org/apache/impala/testutil/TestUtils.java
index 3849b2e..ae0e0b2 100644
--- a/fe/src/test/java/org/apache/impala/testutil/TestUtils.java
+++ b/fe/src/test/java/org/apache/impala/testutil/TestUtils.java
@@ -43,6 +43,7 @@ import javax.json.JsonWriter;
 import javax.json.JsonWriterFactory;
 import javax.json.stream.JsonGenerator;
 
+import org.apache.commons.lang3.RandomStringUtils;
 import org.apache.impala.catalog.Catalog;
 import org.apache.impala.common.PrintUtils;
 import org.apache.impala.common.RuntimeEnv;
@@ -70,6 +71,14 @@ public class TestUtils {
   private final static String NUMBER_REGEX = "\\d+(\\.\\d+)?";
   private final static String BYTE_SUFFIX_REGEX = "[KMGT]?B";
   private final static String BYTE_VALUE_REGEX = NUMBER_REGEX + BYTE_SUFFIX_REGEX;
+  // Note: The older Hive Server JDBC driver (Hive .9 and earlier) is named similarly:
+  // "org.apache.hadoop.hive.jdbc.HiveDriver". However, Impala currently only supports
+  // the Hive Server 2 JDBC driver (Hive .10 and later).
+  final static String HIVE_SERVER2_DRIVER_NAME =
+      "org.apache.hive.jdbc.HiveDriver";
+  // The default connection string template to connect to localhost on a given port
+  // number and database.
+  final static String HS2_CONNECTION_TEMPLATE = "jdbc:hive2://localhost:%d/%s";
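+  // e.g. String.format(HS2_CONNECTION_TEMPLATE, 21050, "default")
+  //   -> "jdbc:hive2://localhost:21050/default"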
 
   public interface ResultFilter {
     public boolean matches(String input);
@@ -429,4 +438,11 @@ public class TestUtils {
     }
     return null;
   }
+
+  /**
+   * Returns a random alphanumeric string of given length
+   */
+  public static String getRandomString(int size) {
+    return RandomStringUtils.randomAlphanumeric(size);
+  }
 }
diff --git a/fe/src/test/java/org/apache/impala/util/RandomHiveQueryRunner.java b/fe/src/test/java/org/apache/impala/util/RandomHiveQueryRunner.java
new file mode 100644
index 0000000..1e6f91c
--- /dev/null
+++ b/fe/src/test/java/org/apache/impala/util/RandomHiveQueryRunner.java
@@ -0,0 +1,1169 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.impala.util;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Range;
+import com.google.common.collect.RangeMap;
+import com.google.common.collect.TreeRangeMap;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import java.util.Random;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.impala.compat.MetastoreShim;
+import org.apache.impala.testutil.HiveJdbcClientPool;
+import org.apache.impala.testutil.HiveJdbcClientPool.HiveJdbcClient;
+import org.apache.impala.testutil.TestUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class can be used to run a lot of Hive queries in parallel to generate stress
+ * on the catalog. It uses a simple Hive JDBC pool to submit queries in parallel, which
+ * is much faster than the Python end-to-end tests, which submit each Hive query over a
+ * new connection from the Beeline shell. Each Hive query which is executed is
+ * idempotent: it creates the necessary dependent metadata before actually executing.
+ * Currently it supports metadata-modifying Hive queries, which include
+ * create/alter/drop of database/table/partition and dynamic partition insert with
+ * overwrite.
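+ *
+ * A typical usage sketch (mirroring EventsProcessorStressTest; the arguments are
+ * illustrative):
+ *
+ *   RandomHiveQueryRunner runner = new RandomHiveQueryRunner(new Random(117),
+ *       "stress_db_", "stress_tbl_", 4, 50, null);
+ *   runner.start();
+ *   while (!runner.isTerminated()) { Thread.sleep(1000); }
+ *   runner.checkForErrors();
+ *   runner.shutdownNow();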
+ */
+public class RandomHiveQueryRunner {
+
+  private final Random random_;
+  private final String tblNamePrefix_;
+  private final String dbNamePrefix_;
+  // number of hive clients to concurrently execute random hive queries
+  private final int numClients_;
+  // number of queries per hive client
+  private final int numQueriesPerClient_;
+  // used to generate unique partition names for each client, also used to get
+  // partition names to be dropped in case of drop partition queries
+  private final AtomicInteger[] partitionIdGenerators_;
+  private final ExecutorService executorService_;
+  // Hive JDBC connection pool
+  private final HiveJdbcClientPool hiveJdbcClientPool_;
+  // Futures to keep track if any query errored out or not
+  private ArrayList<Future<Void>> futures_;
+  private final AtomicBoolean isStarted_ = new AtomicBoolean(false);
+  private static final Logger LOG =
+      LoggerFactory.getLogger(RandomHiveQueryRunner.class);
+  // total weight of all the queryType defined. Used for calculating the bound for the
+  // random number generator for determining the next random query type
+  private int totalQueryWeight_;
+  // maps a given number to a queryType using the weight distribution of the queryTypes
+  private final RangeMap<Integer, QueryType> rangeMap = TreeRangeMap.create();
+
+  private final List<QueryType> skippedQueryTypes;
+
+  // we need to add a small delay between the start of each query to work around
+  // MAPREDUCE-6441
+  private final Object delayLock_ = new Object();
+  /**
+   * Query type with weight. The weight of a QueryType determines the probability of its
+   * occurrence by the random query runner. The higher the weight, the more likely it
+   * is to occur. This is needed to simulate the behavior in practice where not all
+   * query types are equally likely to occur. Generally, partition-level metadata
+   * updates are most common, then table-level and then database-level. The weights
+   * used below are not based on any real-world data but are reasonable values based
+   * on what a typical ETL workload looks like.
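+   *
+   * For example, the weights below sum to 240, so ADD_PARTITION (weight 25) is
+   * chosen for roughly 25/240, i.e. about 10%, of the generated queries.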
+   */
+  enum QueryType {
+    CREATE_DB(5),
+    DROP_DB(5),
+    CREATE_TABLE(10),
+    CREATE_TABLE_AS_SELECT(10),
+    DROP_TABLE(10),
+    ALTER_TABLE_ADD_COL(5),
+    ALTER_TABLE_ADD_PROPERTY(15),
+    ADD_PARTITION(25),
+    DROP_PARTITION(25),
+    DYN_PARTITION_INSERT(30),
+    DYN_PARTITION_INSERT_OVERWRITE(20),
+    INSERT_PARTITION(20),
+    INSERT_OVERWRITE_PARTITION(15),
+    INSERT_TABLE(25),
+    INSERT_OVERWRITE_TABLE(20),
+    // set config is not interesting since it does not change any metadata and it is
+    // not randomly executed anyway, so keep the weight at 0
+    SET_CONFIG(0);
+
+    private final int weight_;
+
+    QueryType(int weight) {
+      Preconditions.checkArgument(weight >= 0);
+      this.weight_ = weight;
+    }
+  }
+
+  /**
+   * Base class for all the TestHiveQueries. Each test Hive query may have one or more
+   * dependent queries needed to set it up. For example, an add partition query
+   * requires the table and the database to exist in order to succeed. Each query runs
+   * its dependent queries before running the actual query.
+   *
+   * All the TestHiveQueries must be idempotent to make sure there are no issues when
+   * running them in a random order.
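+   *
+   * For example, DropTblQuery.create(db, tbl) first adds dependent queries which
+   * create the database and the table if they do not already exist, and only then
+   * issues the actual "drop table" statement (see DropTblQuery below).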
+   */
+  private static abstract class TestHiveQuery {
+
+    private List<TestHiveQuery> dependentQueries_ = new ArrayList<>();
+
+    protected abstract String getQuery();
+
+    protected final QueryType queryType_;
+
+    TestHiveQuery(QueryType queryType) {
+      this.queryType_ = queryType;
+    }
+
+    private void runInternal(HiveJdbcClient client) throws SQLException {
+      for (TestHiveQuery dependentQuery : dependentQueries_) {
+        dependentQuery.runInternal(client);
+      }
+      client.executeSql(getQuery());
+    }
+
+    /**
+     * Executes this query by using one of the Hive client from the given pool. All the
+     * dependent queries along with this query will use the same hive client.
+     */
+    public void run(HiveJdbcClientPool pool) throws Exception {
+      try (HiveJdbcClient hiveJdbcClient = pool.getClient()) {
+        runInternal(hiveJdbcClient);
+      } catch (Exception e) {
+        LOG.error("Unexpected error received while running the hive query", e);
+        throw e;
+      }
+    }
+
+    /**
+     * Adds a query to the list of dependent queries if it is not already present.
+     */
+    protected void addDependentQuery(TestHiveQuery dependentQuery) {
+      if (!dependentQueries_.contains(dependentQuery)) {
+        dependentQueries_.add(dependentQuery);
+      }
+    }
+
+    @Override
+    public String toString() {
+      StringBuilder sb = new StringBuilder("\n" + getQuery());
+      for (int i = 0; i < dependentQueries_.size(); i++) {
+        sb.append("\n   " + dependentQueries_.get(i).getQuery());
+      }
+      return sb.toString();
+    }
+  }
+
+  /**
+   * Create database [if not exists] query
+   */
+  private static class CreateDbQuery extends TestHiveQuery {
+
+    private final String dbName_;
+    private final boolean ifNotExists_;
+
+    private CreateDbQuery(String dbName, boolean ifNotExists) {
+      super(QueryType.CREATE_DB);
+      dbName_ = Preconditions.checkNotNull(dbName);
+      ifNotExists_ = ifNotExists;
+    }
+
+    static CreateDbQuery create(String dbName_) {
+      CreateDbQuery query = new CreateDbQuery(dbName_, false);
+      // if the user did not provide an "if not exists" clause, the dependent query must
+      // drop an existing db with the same name to make sure that this query succeeds
+      query.addDependentQuery(new DropDbQuery(dbName_, true, true));
+      return query;
+    }
+
+    @Override
+    public String getQuery() {
+      StringBuilder sb = new StringBuilder("create database ");
+      if (ifNotExists_) { sb.append("if not exists "); }
+      sb.append(dbName_);
+      return sb.toString();
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) { return true; }
+      if (o == null || getClass() != o.getClass()) { return false; }
+      CreateDbQuery that = (CreateDbQuery) o;
+      return ifNotExists_ == that.ifNotExists_ &&
+          Objects.equals(dbName_, that.dbName_);
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(dbName_, ifNotExists_);
+    }
+  }
+
+  /**
+   * Drop database [if exists] [cascade] hive query
+   */
+  private static class DropDbQuery extends TestHiveQuery {
+
+    private final String dbName_;
+    private final boolean ifExists_;
+    private final boolean cascade_;
+
+    private DropDbQuery(String dbName, boolean ifExists, boolean cascade) {
+      super(QueryType.DROP_DB);
+      this.dbName_ = Preconditions.checkNotNull(dbName);
+      this.ifExists_ = ifExists;
+      this.cascade_ = cascade;
+    }
+
+    static DropDbQuery create(String dbName) {
+      DropDbQuery query = new DropDbQuery(dbName, false, true);
+      // add a dependent query to create the db if it does not
+      // exist already to make sure that this query is successful
+      query.addDependentQuery(new CreateDbQuery(dbName, true));
+      return query;
+    }
+
+    @Override
+    public String getQuery() {
+      StringBuilder sb = new StringBuilder("drop database ");
+      if (ifExists_) { sb.append("if exists "); }
+      sb.append(dbName_);
+      if (cascade_) { sb.append(" cascade"); }
+      return sb.toString();
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) { return true; }
+      if (o == null || getClass() != o.getClass()) { return false; }
+      DropDbQuery that = (DropDbQuery) o;
+      return ifExists_ == that.ifExists_ &&
+          cascade_ == that.cascade_ &&
+          Objects.equals(dbName_, that.dbName_);
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(dbName_, ifExists_, cascade_);
+    }
+  }
+
+  /**
+   * Creates a non-partitioned table in Hive using the pattern:
+   * create table [if not exists] <dbname>.<tablename> (<tblSpec>)
+   */
+  private static class CreateTblQuery extends TestHiveQuery {
+
+    private final String tblName_;
+    private final String dbName_;
+    private final String tblSpec_;
+    private final boolean ifNotExists_;
+
+    private CreateTblQuery(String dbName, String tblName, String tblSpec,
+        boolean ifNotExists) {
+      super(QueryType.CREATE_TABLE);
+      this.dbName_ = Preconditions.checkNotNull(dbName);
+      this.tblName_ = Preconditions.checkNotNull(tblName);
+      this.tblSpec_ = Preconditions.checkNotNull(tblSpec);
+      this.ifNotExists_ = ifNotExists;
+    }
+
+    static CreateTblQuery create(String dbName, String tblName, String tblSpec) {
+      CreateTblQuery query = new CreateTblQuery(dbName, tblName, tblSpec, false);
+      // in order for this query to succeed, the database must exist
+      query.addDependentQuery(new CreateDbQuery(dbName, true));
+      // if the "if not exists" clause is not provided, add an optional "drop table
+      // if exists" to make sure that this query succeeds
+      query.addDependentQuery(new DropTblQuery(dbName, tblName, true));
+      return query;
+    }
+
+    @Override
+    protected String getQuery() {
+      StringBuilder sb = new StringBuilder("create table ");
+      if (ifNotExists_) { sb.append("if not exists "); }
+      sb.append(String.format("%s.%s ", dbName_, tblName_));
+      sb.append("(" + tblSpec_ + ")");
+      return sb.toString();
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) { return true; }
+      if (o == null || getClass() != o.getClass()) { return false; }
+      CreateTblQuery that = (CreateTblQuery) o;
+      return ifNotExists_ == that.ifNotExists_ &&
+          tblName_.equals(that.tblName_) &&
+          dbName_.equals(that.dbName_) &&
+          tblSpec_.equals(that.tblSpec_);
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(tblName_, dbName_, tblSpec_, ifNotExists_);
+    }
+  }
+
+  /**
+   * Same as CreateTblQuery but creates a partitioned table instead. The random query
+   * runner provides a built-in way to generate valid partition names of the pattern
+   * (part=<int>). If this built-in pattern is not used, callers must make sure that the
+   * partitionSpec is valid for subsequent queries on this table.
+   */
+  private static class CreatePartitionedTblQuery extends CreateTblQuery {
+
+    private final String partitionSpec_;
+
+    private CreatePartitionedTblQuery(String dbName, String tblName, String tblSpec,
+        String partitionSpec, boolean ifNotExists) {
+      super(dbName, tblName, tblSpec, ifNotExists);
+      this.partitionSpec_ = partitionSpec;
+    }
+
+    static CreatePartitionedTblQuery create(String dbName, String tblName,
+        String tblSpec, String partitionSpec) {
+      CreatePartitionedTblQuery query = new CreatePartitionedTblQuery(dbName, tblName,
+          tblSpec, partitionSpec, false);
+      // in order for this query to succeed, the database must exist
+      query.addDependentQuery(new CreateDbQuery(dbName, true));
+      // if the "if not exists" clause is not provided, add an optional "drop table
+      // if exists" to make sure that this query succeeds
+      query.addDependentQuery(new DropTblQuery(dbName, tblName, true));
+      return query;
+    }
+
+    @Override
+    protected String getQuery() {
+      return super.getQuery() + " partitioned by (" + partitionSpec_ + ")";
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) { return true; }
+      if (o == null || getClass() != o.getClass()) { return false; }
+      if (!super.equals(o)) { return false; }
+      CreatePartitionedTblQuery that = (CreatePartitionedTblQuery) o;
+      return Objects.equals(partitionSpec_, that.partitionSpec_);
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(super.hashCode(), partitionSpec_);
+    }
+  }
+
+  private static class CreateTblAsSelect extends TestHiveQuery {
+
+    private final String tblName_;
+    private final String dbName_;
+    private final String srcDbName_;
+    private final String srcTblName_;
+    private final boolean ifNotExists_;
+
+    private CreateTblAsSelect(String dbName, String tblName,
+        String srcDbName, String srcTblName,
+        boolean ifNotExists) {
+      super(QueryType.CREATE_TABLE_AS_SELECT);
+      this.dbName_ = Preconditions.checkNotNull(dbName);
+      this.tblName_ = Preconditions.checkNotNull(tblName);
+      this.srcDbName_ = Preconditions.checkNotNull(srcDbName);
+      this.srcTblName_ = Preconditions.checkNotNull(srcTblName);
+      this.ifNotExists_ = ifNotExists;
+    }
+
+    static CreateTblAsSelect create(String dbName, String tblName, String srcDbName,
+        String srcTblName) {
+      CreateTblAsSelect query = new CreateTblAsSelect(dbName, tblName, srcDbName,
+          srcTblName, false);
+      // in order for this query to succeed, the database must exist
+      query.addDependentQuery(new CreateDbQuery(dbName, true));
+      // add an optional "drop table if exists" to
+      // make sure that this query succeeds
+      query.addDependentQuery(new DropTblQuery(dbName, tblName, true));
+      return query;
+    }
+
+    @Override
+    protected String getQuery() {
+      StringBuilder sb = new StringBuilder("create table ");
+      if (ifNotExists_) { sb.append("if not exists "); }
+      sb.append(String.format("%s.%s ", dbName_, tblName_));
+      sb.append(" like ");
+      sb.append(String.format(" %s.%s ", srcDbName_, srcTblName_));
+      return sb.toString();
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) { return true; }
+      if (o == null || getClass() != o.getClass()) { return false; }
+      CreateTblAsSelect that = (CreateTblAsSelect) o;
+      return ifNotExists_ == that.ifNotExists_ &&
+          tblName_.equals(that.tblName_) &&
+          dbName_.equals(that.dbName_) &&
+          srcDbName_.equals(that.srcDbName_) &&
+          srcTblName_.equals(that.srcTblName_);
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(tblName_, dbName_, srcDbName_, srcTblName_, ifNotExists_);
+    }
+  }
+
+  /**
+   * Drop table [if exists]
+   */
+  private static class DropTblQuery extends TestHiveQuery {
+
+    private final String tblName_;
+    private final String dbName_;
+    private static final String dummyTblSpecForDepQueries_ = "c1 int";
+    private final boolean ifExists_;
+
+    private DropTblQuery(String dbName, String tblName, boolean ifExists) {
+      super(QueryType.DROP_TABLE);
+      this.dbName_ = Preconditions.checkNotNull(dbName);
+      this.tblName_ = Preconditions.checkNotNull(tblName);
+      this.ifExists_ = ifExists;
+    }
+
+    static DropTblQuery create(String dbName, String tblName) {
+      DropTblQuery query = new DropTblQuery(dbName, tblName, false);
+      query.addDependentQuery(new CreateDbQuery(dbName, true));
+      query.addDependentQuery(new CreateTblQuery(dbName, tblName,
+          dummyTblSpecForDepQueries_, true));
+      return query;
+    }
+
+    @Override
+    protected String getQuery() {
+      StringBuilder sb = new StringBuilder("drop table ");
+      if (ifExists_) { sb.append("if exists "); }
+      sb.append(String.format("%s.%s ", dbName_, tblName_));
+      return sb.toString();
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) { return true; }
+      if (o == null || getClass() != o.getClass()) { return false; }
+      DropTblQuery that = (DropTblQuery) o;
+      return ifExists_ == that.ifExists_ &&
+          tblName_.equals(that.tblName_) &&
+          dbName_.equals(that.dbName_) &&
+          dummyTblSpecForDepQueries_.equals(that.dummyTblSpecForDepQueries_);
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(tblName_, dbName_, dummyTblSpecForDepQueries_, ifExists_);
+    }
+  }
+
+  /**
+   * Base class for all the alter table queries
+   */
+  private static abstract class AlterTblQuery extends TestHiveQuery {
+
+    protected final String tblName_;
+    protected final String dbName_;
+    protected final String partitionSpec_;
+
+    private AlterTblQuery(String dbName, String tblName, QueryType queryType) {
+      super(queryType);
+      this.dbName_ = Preconditions.checkNotNull(dbName);
+      this.tblName_ = Preconditions.checkNotNull(tblName);
+      this.partitionSpec_ = null;
+      addDependentQuery(new CreateDbQuery(dbName_, true));
+      addDependentQuery(new CreateTblQuery(dbName_, tblName_,
+          getRandomColName(6) +
+              " string", true));
+    }
+
+    private AlterTblQuery(String dbName, String tblName, String partitionSpec,
+        QueryType queryType) {
+      super(queryType);
+      this.dbName_ = Preconditions.checkNotNull(dbName);
+      this.tblName_ = Preconditions.checkNotNull(tblName);
+      this.partitionSpec_ = partitionSpec;
+    }
+
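+    /**
+     * Derives the partition column definition from a partition spec: e.g. "part=1"
+     * becomes "part int" (the test assumes a single int-typed partition key).
+     */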
+    static String getPartitionColFromSpec(String partitionSpec) {
+      Preconditions.checkNotNull(partitionSpec);
+      if (partitionSpec.contains("=")) {
+        // we assume for simplicity that for this test purpose all partitioned tables
+        // have only one partition key
+        String[] parts = partitionSpec.split("=");
+        return parts[0].trim() + " int";
+      }
+      return partitionSpec;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) { return true; }
+      if (!(o instanceof AlterTblQuery)) { return false; }
+      AlterTblQuery that = (AlterTblQuery) o;
+      return tblName_.equals(that.tblName_) &&
+          dbName_.equals(that.dbName_) &&
+          Objects.equals(partitionSpec_, that.partitionSpec_);
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(tblName_, dbName_, partitionSpec_);
+    }
+  }
+
+  /**
+   * Alter table add col
+   */
+  private static class AlterTblAddColQuery extends AlterTblQuery {
+
+    private final String colSpec_;
+
+    private AlterTblAddColQuery(String dbName, String tblName, String colSpec) {
+      super(dbName, tblName, QueryType.ALTER_TABLE_ADD_COL);
+      this.colSpec_ = Preconditions.checkNotNull(colSpec);
+    }
+
+    static AlterTblAddColQuery create(String dbName, String tblName, String colSpec) {
+      AlterTblAddColQuery query = new AlterTblAddColQuery(dbName, tblName, colSpec);
+      query.addDependentQuery(new CreateDbQuery(dbName, true));
+      query.addDependentQuery(new CreateTblQuery(dbName, tblName,
+          getRandomColName(6) +
+              " string", true));
+      return query;
+    }
+
+    @Override
+    protected String getQuery() {
+      return String.format("alter table %s.%s add columns (%s)", dbName_, tblName_,
+          colSpec_);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) { return true; }
+      if (o == null || getClass() != o.getClass()) { return false; }
+      if (!super.equals(o)) { return false; }
+      AlterTblAddColQuery that = (AlterTblAddColQuery) o;
+      return Objects.equals(colSpec_, that.colSpec_);
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(super.hashCode(), colSpec_);
+    }
+  }
+
+  private static class AlterTblAddProperty extends AlterTblQuery {
+
+    private final String key_;
+    private final String value_;
+
+    private AlterTblAddProperty(String dbName, String tblName, String key, String val) {
+      super(dbName, tblName, QueryType.ALTER_TABLE_ADD_PROPERTY);
+      this.key_ = Preconditions.checkNotNull(key);
+      this.value_ = Preconditions.checkNotNull(val);
+    }
+
+    static AlterTblAddProperty create(String dbName, String tblName, String key,
+        String val) {
+      AlterTblAddProperty query = new AlterTblAddProperty(dbName, tblName, key, val);
+      query.addDependentQuery(new CreateDbQuery(dbName, true));
+      query.addDependentQuery(new CreateTblQuery(dbName, tblName,
+          getRandomColName(6) +
+              " string", true));
+      return query;
+    }
+
+    @Override
+    protected String getQuery() {
+      return String.format("alter table %s.%s set tblproperties ('%s'='%s')", dbName_,
+          tblName_, key_, value_);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) { return true; }
+      if (o == null || getClass() != o.getClass()) { return false; }
+      if (!super.equals(o)) { return false; }
+      AlterTblAddProperty that = (AlterTblAddProperty) o;
+      return key_.equals(that.key_) &&
+          value_.equals(that.value_);
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(super.hashCode(), key_, value_);
+    }
+  }
+
+  private static class AlterTableAddPartition extends AlterTblQuery {
+
+    private final boolean ifNotExists_;
+
+    private AlterTableAddPartition(String dbName, String tblName, String partitionSpec,
+        boolean ifNotExists) {
+      super(dbName, tblName, partitionSpec, QueryType.ADD_PARTITION);
+      ifNotExists_ = ifNotExists;
+    }
+
+    static AlterTableAddPartition create(String dbName, String tblName,
+        String partitionSpec) {
+      Preconditions.checkNotNull(partitionSpec);
+      AlterTableAddPartition alterTableAddPartition = new AlterTableAddPartition(
+          dbName, tblName, partitionSpec, false);
+      alterTableAddPartition.addDependentQuery(new CreateDbQuery(dbName, true));
+      alterTableAddPartition.addDependentQuery(new CreatePartitionedTblQuery(dbName,
+          tblName, getRandomColName(6) + " string",
+          getPartitionColFromSpec(partitionSpec), true));
+      alterTableAddPartition.addDependentQuery(new AlterTableDropPartition(dbName,
+          tblName, partitionSpec, true));
+      return alterTableAddPartition;
+    }
+
+    @Override
+    protected String getQuery() {
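+      // e.g. "alter table <db>.<tbl> add if not exists partition (part=3)"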
+      StringBuilder sb = new StringBuilder();
+      sb.append(String.format("alter table %s.%s add ", dbName_, tblName_));
+      if (ifNotExists_) { sb.append("if not exists "); }
+      sb.append(String.format(" partition (%s)", partitionSpec_));
+      return sb.toString();
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) { return true; }
+      if (o == null || getClass() != o.getClass()) { return false; }
+      if (!super.equals(o)) { return false; }
+      AlterTableAddPartition that = (AlterTableAddPartition) o;
+      return ifNotExists_ == that.ifNotExists_;
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(super.hashCode(), ifNotExists_);
+    }
+  }
+
+  private static class AlterTableDropPartition extends AlterTblQuery {
+
+    private final boolean ifExists_;
+
+    private AlterTableDropPartition(String dbName, String tblName,
+        String partitionSpec, boolean ifExists) {
+      super(dbName, tblName, partitionSpec, QueryType.DROP_PARTITION);
+      ifExists_ = ifExists;
+    }
+
+    static AlterTableDropPartition create(String dbName, String tblName,
+        String partitionSpec) {
+      Preconditions.checkNotNull(partitionSpec);
+      AlterTableDropPartition query = new AlterTableDropPartition(dbName, tblName,
+          partitionSpec, false);
+      query.addDependentQuery(new CreateDbQuery(dbName, true));
+      query.addDependentQuery(new CreatePartitionedTblQuery(dbName,
+          tblName, getRandomColName(6) + " string",
+          getPartitionColFromSpec(partitionSpec), true));
+      query.addDependentQuery(new AlterTableAddPartition(dbName, tblName,
+          partitionSpec, true));
+      return query;
+    }
+
+    @Override
+    protected String getQuery() {
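+      // e.g. "alter table <db>.<tbl> drop if exists partition (part=3)"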
+      StringBuilder sb = new StringBuilder(String.format("alter table %s.%s drop ",
+          dbName_, tblName_));
+      if (ifExists_) { sb.append(" if exists "); }
+      sb.append(String.format(" partition (%s)", partitionSpec_));
+      return sb.toString();
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) { return true; }
+      if (o == null || getClass() != o.getClass()) { return false; }
+      if (!super.equals(o)) { return false; }
+      AlterTableDropPartition that = (AlterTableDropPartition) o;
+      return ifExists_ == that.ifExists_;
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(super.hashCode(), ifExists_);
+    }
+  }
+
+  /**
+   * Hive's set <key>=<value> statement, needed as a precondition for some other
+   * queries such as dynamic partition inserts
+   */
+  private static class SetConfigStmt extends TestHiveQuery {
+
+    private final String key_;
+    private final String val_;
+
+    private SetConfigStmt(String key, String value) {
+      super(QueryType.SET_CONFIG);
+      this.key_ = Preconditions.checkNotNull(key);
+      this.val_ = Preconditions.checkNotNull(value);
+    }
+
+    @Override
+    protected String getQuery() {
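+      // e.g. "set hive.exec.dynamic.partition.mode = nonstrict"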
+      return String.format("set %s = %s", key_, val_);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) { return true; }
+      if (o == null || getClass() != o.getClass()) { return false; }
+      SetConfigStmt that = (SetConfigStmt) o;
+      return key_.equals(that.key_) &&
+          val_.equals(that.val_);
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(key_, val_);
+    }
+  }
+
+  private abstract static class SourceTableBasedQuery extends TestHiveQuery {
+
+    protected final String tblName_;
+    protected final String dbName_;
+    protected final String srcDbName_;
+    protected final String srcTblName_;
+
+    private SourceTableBasedQuery(String dbName, String tblName, String srcDbName,
+        String srcTblName, QueryType queryType) {
+      super(queryType);
+      this.dbName_ = Preconditions.checkNotNull(dbName);
+      this.tblName_ = Preconditions.checkNotNull(tblName);
+      this.srcDbName_ = Preconditions.checkNotNull(srcDbName);
+      this.srcTblName_ = Preconditions.checkNotNull(srcTblName);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) { return true; }
+      if (o == null || getClass() != o.getClass()) { return false; }
+      SourceTableBasedQuery that = (SourceTableBasedQuery) o;
+      return tblName_.equals(that.tblName_) &&
+          dbName_.equals(that.dbName_) &&
+          srcDbName_.equals(that.srcDbName_) &&
+          srcTblName_.equals(that.srcTblName_);
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(tblName_, dbName_, srcDbName_, srcTblName_);
+    }
+  }
+
+  private static class InsertTblOrPartition extends SourceTableBasedQuery {
+
+    private final boolean overwrite_;
+    private final String partitionSpec_;
+    private final int limit_;
+
+    private InsertTblOrPartition(String dbName, String tblName, String srcDbName,
+        String srcTblName, boolean overwrite, String partitionSpec, int limit) {
+      super(dbName, tblName, srcDbName, srcTblName,
+          getQueryType(overwrite, partitionSpec));
+      this.overwrite_ = overwrite;
+      // if partitionSpec is null, this is an insert into table query
+      this.partitionSpec_ = partitionSpec;
+      Preconditions.checkArgument(limit > 0);
+      this.limit_ = limit;
+    }
+
+    private static QueryType getQueryType(boolean overwrite, String partitionSpec) {
+      if (partitionSpec != null) {
+        return overwrite ? QueryType.INSERT_OVERWRITE_PARTITION :
+            QueryType.INSERT_PARTITION;
+      } else {
+        return overwrite ? QueryType.INSERT_OVERWRITE_TABLE :
+            QueryType.INSERT_TABLE;
+      }
+    }
+
+    static InsertTblOrPartition create(String dbName, String tblName, String srcDbName,
+        String srcTblName, boolean overwrite, String partitionSpec, int limit) {
+      InsertTblOrPartition query = new InsertTblOrPartition(dbName, tblName, srcDbName,
+          srcTblName, overwrite, partitionSpec, limit);
+      // in order for this query to succeed, the database must exist
+      query.addDependentQuery(new CreateDbQuery(dbName, true));
+      // add an optional drop table if exists to make sure that this query succeeds
+      query.addDependentQuery(new DropTblQuery(dbName, tblName, true));
+      // make sure that the table exists before running the insert
+      query.addDependentQuery(new CreateTblAsSelect(dbName, tblName, srcDbName,
+          srcTblName, true));
+      query.addDependentQuery(new SetConfigStmt("hive.exec.dynamic.partition.mode",
+          "nonstrict"));
+      query.addDependentQuery(new SetConfigStmt("hive.exec.max.dynamic.partitions",
+          "10000"));
+      query.addDependentQuery(new SetConfigStmt("hive.exec.max.dynamic.partitions"
+          + ".pernode", "10000"));
+      // in CDP builds, where Tez is the default execution engine for Hive, running
+      // many Hive queries in parallel is slow because the application master is not
+      // released until the session is closed. This timeout value closes the Tez
+      // application if no new query is submitted even if the session is not closed,
+      // thereby releasing resources faster
+      if (MetastoreShim.getMajorVersion() >= 3) {
+        query.addDependentQuery(
+            new SetConfigStmt("tez.session.am.dag.submit.timeout.secs", "2"));
+      }
+      return query;
+    }
+
+    @Override
+    protected String getQuery() {
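+      // e.g. "insert overwrite table <db>.<tbl> partition (year,month)
+      // select * from functional.alltypes limit 100"; the partition clause is
+      // omitted when partitionSpec_ is null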
+      StringBuilder sb = new StringBuilder("insert ");
+      if (overwrite_) {
+        sb.append("overwrite table ");
+      } else {
+        sb.append("into table ");
+      }
+      sb.append(String.format("%s.%s ", dbName_, tblName_));
+      if (partitionSpec_ != null) {
+        sb.append(String.format("partition (%s) ", partitionSpec_));
+      }
+      sb.append(String.format("select * from %s.%s limit %s", srcDbName_, srcTblName_,
+          limit_));
+      return sb.toString();
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) { return true; }
+      if (o == null || getClass() != o.getClass()) { return false; }
+      if (!super.equals(o)) { return false; }
+      InsertTblOrPartition that = (InsertTblOrPartition) o;
+      return overwrite_ == that.overwrite_ &&
+          limit_ == that.limit_ &&
+          Objects.equals(partitionSpec_, that.partitionSpec_);
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(super.hashCode(), overwrite_, partitionSpec_, limit_);
+    }
+  }
+
+  /**
+   * Generates an "insert [overwrite] table ... select * from source_table" query to
+   * dynamically add partitions based on the partitions of the source table
+   */
+  static class DynamicPartitionInsert extends SourceTableBasedQuery {
+
+    private final String dyPartitionSpec_;
+    private final boolean overwrite_;
+
+    private DynamicPartitionInsert(String dbName, String tblName,
+        String srcDbName, String srcTblName, String dyPartitionSpec, boolean overwrite) {
+      super(dbName, tblName, srcDbName, srcTblName, overwrite ?
+          QueryType.DYN_PARTITION_INSERT_OVERWRITE : QueryType.DYN_PARTITION_INSERT);
+      this.dyPartitionSpec_ = Preconditions.checkNotNull(dyPartitionSpec);
+      this.overwrite_ = overwrite;
+    }
+
+    static DynamicPartitionInsert create(String dbName, String tblName,
+        String srcDbName, String srcTblName, String dyPartitionSpec, boolean overwrite) {
+      DynamicPartitionInsert query = new DynamicPartitionInsert(dbName, tblName,
+          srcDbName, srcTblName, dyPartitionSpec, overwrite);
+      // in order for this query to succeed, the database must exist
+      query.addDependentQuery(new CreateDbQuery(dbName, true));
+      // add an optional drop table if exists to make sure that this query succeeds
+      query.addDependentQuery(new DropTblQuery(dbName, tblName, true));
+      // make sure that the table exists before running the insert
+      query.addDependentQuery(new CreateTblAsSelect(dbName, tblName, srcDbName,
+          srcTblName, true));
+      query.addDependentQuery(new SetConfigStmt("hive.exec.dynamic.partition.mode",
+          "nonstrict"));
+      query.addDependentQuery(new SetConfigStmt("hive.exec.max.dynamic.partitions",
+          "10000"));
+      query.addDependentQuery(new SetConfigStmt("hive.exec.max.dynamic.partitions"
+          + ".pernode", "10000"));
+      // in CDP builds, where Tez is the default execution engine for Hive, running
+      // many Hive queries in parallel is slow because the application master is not
+      // released until the session is closed. This timeout value closes the Tez
+      // application if no new query is submitted even if the session is not closed,
+      // thereby releasing resources faster
+      if (MetastoreShim.getMajorVersion() >= 3) {
+        query.addDependentQuery(
+            new SetConfigStmt("tez.session.am.dag.submit.timeout.secs", "2"));
+      }
+      return query;
+    }
+
+    @Override
+    protected String getQuery() {
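+      // e.g. "insert overwrite table <db>.<tbl> partition (year,month)
+      // select * from functional.alltypes"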
+      StringBuilder sb = new StringBuilder("insert");
+      if (overwrite_) {
+        sb.append(" overwrite table ");
+      } else {
+        sb.append(" into table ");
+      }
+      sb.append(String.format("%s.%s partition (%s)", dbName_, tblName_,
+          dyPartitionSpec_));
+      sb.append(" select * from ");
+      sb.append(String.format("%s.%s", srcDbName_, srcTblName_));
+      return sb.toString();
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) { return true; }
+      if (o == null || getClass() != o.getClass()) { return false; }
+      if (!super.equals(o)) { return false; }
+      DynamicPartitionInsert that = (DynamicPartitionInsert) o;
+      return overwrite_ == that.overwrite_ &&
+          Objects.equals(dyPartitionSpec_, that.dyPartitionSpec_);
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(super.hashCode(), dyPartitionSpec_, overwrite_);
+    }
+  }
+
+  public RandomHiveQueryRunner(Random random, String dbNamePrefix,
+      String tblNamePrefix, int numClients, int numQueriesPerClient,
+      List<QueryType> skippedQueryTypes)
+      throws SQLException, ClassNotFoundException {
+    this.random_ = Preconditions.checkNotNull(random);
+    this.numClients_ = numClients;
+    this.numQueriesPerClient_ = numQueriesPerClient;
+    this.partitionIdGenerators_ = new AtomicInteger[numClients];
+    for (int i = 0; i < numClients; i++) {
+      partitionIdGenerators_[i] = new AtomicInteger(0);
+    }
+    this.dbNamePrefix_ = Preconditions.checkNotNull(dbNamePrefix);
+    this.tblNamePrefix_ = Preconditions.checkNotNull(tblNamePrefix);
+    this.executorService_ = Executors.newFixedThreadPool(numClients,
+        new ThreadFactoryBuilder()
+            .setNameFormat("hive-query-executor-%d")
+            .build());
+    hiveJdbcClientPool_ = HiveJdbcClientPool.create(numClients);
+    totalQueryWeight_ = 0;
+    this.skippedQueryTypes = skippedQueryTypes;
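+    // build a weighted range map: each QueryType owns the half-open interval
+    // [runningWeight, runningWeight + weight), so a uniform random integer in
+    // [0, totalQueryWeight_) picks a type with probability proportional to its weight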
+    for (QueryType qt : QueryType.values()) {
+      rangeMap
+          .put(Range.closedOpen(totalQueryWeight_, totalQueryWeight_ + qt.weight_), qt);
+      totalQueryWeight_ += qt.weight_;
+    }
+  }
+
+  private QueryType getNextQueryType() {
+    return rangeMap.get(random_.nextInt(totalQueryWeight_));
+  }
+
+  /**
+   * Gets the next random Hive query according to the weight distribution defined by
+   * the QueryTypes
+   */
+  private TestHiveQuery getNext(int clientId) {
+    QueryType type = getNextQueryType();
+    if (skippedQueryTypes != null && skippedQueryTypes.contains(type)) {
+      LOG.info("Skipping this query type {}", type);
+      return null;
+    }
+    String dbName = dbNamePrefix_ + clientId;
+    String tblName = tblNamePrefix_ + clientId;
+    switch (type) {
+      case CREATE_DB:
+        return CreateDbQuery.create(dbName);
+      case DROP_DB:
+        return DropDbQuery.create(dbName);
+      case CREATE_TABLE:
+        return CreateTblQuery.create(dbName, tblName, "c1 int, c2 string");
+      case CREATE_TABLE_AS_SELECT:
+        return CreateTblAsSelect.create(dbName, tblName, "functional", "alltypes");
+      case DROP_TABLE:
+        return DropTblQuery.create(dbName, tblName);
+      case ALTER_TABLE_ADD_COL:
+        return AlterTblAddColQuery.create(dbName, tblName, getRandomColName(6) +
+            " string");
+      case ALTER_TABLE_ADD_PROPERTY:
+        return AlterTblAddProperty.create(dbName, tblName, getRandomColName(6),
+            getRandomColName(6));
+      case ADD_PARTITION:
+        // for partitioned tables use a different table name so that it does not
+        // conflict with non-partitioned table create statements
+        // get the next partition name from the partitionIdGenerator for this client
+        tblName += "_part";
+        return AlterTableAddPartition.create(dbName, tblName,
+            "part=" + partitionIdGenerators_[clientId].getAndIncrement());
+      case DROP_PARTITION:
+        // for partitioned tables use a different table name so that it does not
+        // conflict with non-partitioned table create statements
+        tblName += "_part";
+        // it is possible that this drop partition is called before any partition was
+        // ever added to this table. In that case the partitionIdGenerator for this
+        // client is still 0, which is not a valid bound for random.nextInt(bound), so
+        // fall back to a hard-coded bound of 1
+        int lastPartitionId = partitionIdGenerators_[clientId].get();
+        int bound = 1;
+        if (lastPartitionId > 0) {
+          bound = lastPartitionId;
+        }
+        return AlterTableDropPartition.create(dbName, tblName,
+            "part=" + random_.nextInt(bound));
+      case DYN_PARTITION_INSERT:
+        // dynamic partition insert
+        // use a different table name since the partition keys are different than the
+        // built-in ones
+        tblName += "_alltypes_part";
+        return DynamicPartitionInsert.create(dbName, tblName, "functional",
+            "alltypes",
+            "year,month", false);
+      case DYN_PARTITION_INSERT_OVERWRITE:
+        // dynamic partition insert overwrite
+        // use a different table name since the partition keys are different than the
+        // built-in ones
+        tblName += "_alltypes_part";
+        return DynamicPartitionInsert.create(dbName, tblName, "functional", "alltypes",
+            "year,month", true);
+      case INSERT_PARTITION:
+        // insert into partition
+        tblName += "_alltypes_part";
+        return InsertTblOrPartition.create(dbName, tblName, "functional", "alltypes",
+            false, "year,month", 100);
+      case INSERT_OVERWRITE_PARTITION:
+        // insert overwrite into partition
+        tblName += "_alltypes_part";
+        return InsertTblOrPartition.create(dbName, tblName, "functional", "alltypes",
+            true, "year,month", 100);
+      case INSERT_TABLE:
+        // insert into table
+        tblName += "_alltypes_part";
+        return InsertTblOrPartition.create(dbName, tblName, "functional",
+            "alltypesnopart",
+            false,
+            null, 100);
+      case INSERT_OVERWRITE_TABLE:
+        // insert overwrite into table
+        tblName += "_alltypes_part";
+        return InsertTblOrPartition.create(dbName, tblName, "functional",
+            "alltypesnopart",
+            true,
+            null, 100);
+      default:
+        throw new RuntimeException(String.format("Invalid statement type %s", type));
+    }
+  }
+
+  public void start() throws Exception {
+    if (!isStarted_.compareAndSet(false, true)) {
+      throw new Exception("Random hive query generator is already started");
+    }
+    futures_ = new ArrayList<>();
+    for (int i = 0; i < numClients_; i++) {
+      futures_.add(executorService_.submit(() -> {
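+        // thread names follow the "hive-query-executor-<n>" pattern set by the
+        // ThreadFactoryBuilder above, so the numeric suffix is this client's id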
+        final int clientId =
+            Integer.parseInt(Thread.currentThread().getName().substring(
+                "hive-query-executor-".length()));
+        int queryNumber = 1;
+        while (queryNumber <= numQueriesPerClient_) {
+          TestHiveQuery query = getNext(clientId);
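+          // getNext returns null for skipped query types; such attempts do not
+          // count against numQueriesPerClient_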
+          if (query == null) {
+            continue;
+          }
+          try {
+            LOG.info("Client {} running hive query set {}: {}", clientId, queryNumber,
+                query);
+            // add a delay between the start of each query to work around MAPREDUCE-6441
+            synchronized (delayLock_) {
+              Thread.sleep(10);
+            }
+            query.run(hiveJdbcClientPool_);
+            queryNumber++;
+          } catch (Exception e) {
+            throw new ExecutionException(String.format("Client %s errored out while "
+                    + "executing query set %s (%s) or its dependent queries. "
+                    + "Exception message is: %s",
+                clientId, queryNumber, query, e.getMessage()), e);
+          }
+        }
+        return null;
+      }));
+    }
+    executorService_.shutdown();
+  }
+
+  /**
+   * Checks if any of the queries failed during execution. The query runner must be in
+   * a terminated state to make sure all the pending tasks are completed
+   *
+   * @throws ExecutionException if any of the queries or their dependent queries failed
+   */
+  public void checkForErrors() throws ExecutionException {
+    Preconditions.checkState(isStarted_.get());
+    Preconditions.checkState(executorService_.isTerminated());
+    try {
+      for (Future<Void> result : futures_) {
+        result.get();
+      }
+    } catch (InterruptedException e) {
+      // ignored
+    }
+  }
+
+  /**
+   * Returns true if all the tasks have completed
+   */
+  public boolean isTerminated() {
+    return executorService_.isTerminated();
+  }
+
+  /**
+   * Returns a random column name for the queries
+   */
+  private static String getRandomColName(int size) {
+    return "col_" + TestUtils.getRandomString(size);
+  }
+
+  /**
+   * Terminates the query runner. No guarantees beyond a best effort to terminate
+   * currently running queries
+   */
+  public void shutdownNow() {
+    try {
+      executorService_.shutdownNow();
+    } finally {
+      if (hiveJdbcClientPool_ != null) { hiveJdbcClientPool_.close(); }
+    }
+  }
+}