You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ha...@apache.org on 2017/12/08 23:33:48 UTC

[2/2] hive git commit: HIVE-18196 : Druid Mini Cluster to run Qtests integrations tests. (Slim Bouguerra via Ashutosh Chauhan)

HIVE-18196 : Druid Mini Cluster to run Qtests integrations tests. (Slim Bouguerra via Ashutosh Chauhan)

Signed-off-by: Ashutosh Chauhan <ha...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/1b3711b3
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/1b3711b3
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/1b3711b3

Branch: refs/heads/master
Commit: 1b3711b33cdce33688eabcc715880d2134242692
Parents: 1d53035
Author: Slim Bouguerra <sl...@gmail.com>
Authored: Fri Dec 8 15:32:46 2017 -0800
Committer: Ashutosh Chauhan <ha...@apache.org>
Committed: Fri Dec 8 15:32:46 2017 -0800

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/conf/HiveConf.java   |   4 +-
 data/scripts/q_test_cleanup_druid.sql           |   2 +
 data/scripts/q_test_druid_init.sql              |  29 +
 druid-handler/pom.xml                           |   1 +
 .../hadoop/hive/druid/DruidStorageHandler.java  | 365 ++++-----
 .../hive/druid/DruidStorageHandlerUtils.java    |  34 +-
 .../hive/druid/TestDruidStorageHandler.java     |  21 +-
 .../hive/ql/io/TestDruidRecordWriter.java       |   2 +-
 itests/hive-unit-hadoop2/pom.xml                |   6 +
 itests/hive-unit/pom.xml                        |   6 +
 itests/pom.xml                                  |   1 +
 itests/qtest-accumulo/pom.xml                   |   4 +
 itests/qtest-druid/pom.xml                      | 255 +++++++
 .../java/org/apache/hive/druid/DruidNode.java   |  26 +
 .../org/apache/hive/druid/ForkingDruidNode.java | 160 ++++
 .../org/apache/hive/druid/MiniDruidCluster.java | 194 +++++
 itests/qtest-spark/pom.xml                      |   6 +
 itests/qtest/pom.xml                            |  22 +-
 .../hadoop/hive/cli/TestMiniDruidCliDriver.java |  62 ++
 .../test/resources/testconfiguration.properties |   4 +
 itests/util/pom.xml                             |  37 +
 .../hadoop/hive/cli/control/CliConfigs.java     |  24 +
 .../org/apache/hadoop/hive/ql/QTestUtil.java    |  38 +-
 pom.xml                                         |   4 +-
 .../queries/clientpositive/druidmini_test1.q    | 121 +++
 .../clientpositive/druidmini_test_insert.q      |  53 ++
 .../clientpositive/druid/druidmini_test1.q.out  | 748 +++++++++++++++++++
 .../druid/druidmini_test_insert.q.out           | 150 ++++
 28 files changed, 2177 insertions(+), 202 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/1b3711b3/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index adfa139..a0b163d 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2066,7 +2066,7 @@ public class HiveConf extends Configuration {
             , "druid deep storage location."),
     DRUID_METADATA_BASE("hive.druid.metadata.base", "druid", "Default prefix for metadata tables"),
     DRUID_METADATA_DB_TYPE("hive.druid.metadata.db.type", "mysql",
-            new PatternSet("mysql", "postgresql"), "Type of the metadata database."
+            new PatternSet("mysql", "postgresql", "derby"), "Type of the metadata database."
     ),
     DRUID_METADATA_DB_USERNAME("hive.druid.metadata.username", "",
             "Username to connect to Type of the metadata DB."
@@ -2081,7 +2081,7 @@ public class HiveConf extends Configuration {
             "Default hdfs working directory used to store some intermediate metadata"
     ),
     HIVE_DRUID_MAX_TRIES("hive.druid.maxTries", 5, "Maximum number of retries before giving up"),
-    HIVE_DRUID_PASSIVE_WAIT_TIME("hive.druid.passiveWaitTimeMs", 30000,
+    HIVE_DRUID_PASSIVE_WAIT_TIME("hive.druid.passiveWaitTimeMs", 30000L,
             "Wait time in ms default to 30 seconds."
     ),
     HIVE_DRUID_BITMAP_FACTORY_TYPE("hive.druid.bitmap.type", "roaring", new PatternSet("roaring", "concise"), "Coding algorithm use to encode the bitmaps"),

http://git-wip-us.apache.org/repos/asf/hive/blob/1b3711b3/data/scripts/q_test_cleanup_druid.sql
----------------------------------------------------------------------
diff --git a/data/scripts/q_test_cleanup_druid.sql b/data/scripts/q_test_cleanup_druid.sql
new file mode 100644
index 0000000..b0d3425
--- /dev/null
+++ b/data/scripts/q_test_cleanup_druid.sql
@@ -0,0 +1,2 @@
+DROP TABLE IF EXISTS alltypesorc;
+DROP TABLE IF EXISTS druid_table;

http://git-wip-us.apache.org/repos/asf/hive/blob/1b3711b3/data/scripts/q_test_druid_init.sql
----------------------------------------------------------------------
diff --git a/data/scripts/q_test_druid_init.sql b/data/scripts/q_test_druid_init.sql
new file mode 100644
index 0000000..ee025f1
--- /dev/null
+++ b/data/scripts/q_test_druid_init.sql
@@ -0,0 +1,29 @@
+set hive.stats.dbclass=fs;
+--
+-- Table alltypesorc
+--
+DROP TABLE IF EXISTS alltypesorc;
+CREATE TABLE alltypesorc(
+    ctinyint TINYINT,
+    csmallint SMALLINT,
+    cint INT,
+    cbigint BIGINT,
+    cfloat FLOAT,
+    cdouble DOUBLE,
+    cstring1 STRING,
+    cstring2 STRING,
+    ctimestamp1 TIMESTAMP,
+    ctimestamp2 TIMESTAMP,
+    cboolean1 BOOLEAN,
+    cboolean2 BOOLEAN)
+    STORED AS ORC;
+
+LOAD DATA LOCAL INPATH "${hiveconf:test.data.dir}/alltypesorc"
+OVERWRITE INTO  TABLE alltypesorc;
+
+ANALYZE TABLE alltypesorc COMPUTE STATISTICS;
+
+ANALYZE TABLE alltypesorc COMPUTE STATISTICS FOR COLUMNS ctinyint,csmallint,cint,cbigint,cfloat,cdouble,cstring1,cstring2,ctimestamp1,ctimestamp2,cboolean1,cboolean2;
+
+-- Druid Table
+

http://git-wip-us.apache.org/repos/asf/hive/blob/1b3711b3/druid-handler/pom.xml
----------------------------------------------------------------------
diff --git a/druid-handler/pom.xml b/druid-handler/pom.xml
index 5c8b521..2a62b90 100644
--- a/druid-handler/pom.xml
+++ b/druid-handler/pom.xml
@@ -341,6 +341,7 @@
                       <include>net.jpountz.lz4:*</include>
                       <include>org.apache.commons:*</include>
                       <include>org.roaringbitmap:*</include>
+                      <include>org.apache.derby:*</include>
                     </includes>
                   </artifactSet>
                   <filters>

http://git-wip-us.apache.org/repos/asf/hive/blob/1b3711b3/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java
index fe66a44..33d811d 100644
--- a/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java
@@ -17,10 +17,25 @@
  */
 package org.apache.hadoop.hive.druid;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import com.google.common.base.Supplier;
+import com.google.common.base.Suppliers;
+import com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import com.metamx.common.RetryUtils;
+import com.metamx.common.lifecycle.Lifecycle;
+import com.metamx.http.client.HttpClient;
+import com.metamx.http.client.HttpClientConfig;
+import com.metamx.http.client.HttpClientInit;
 import io.druid.metadata.MetadataStorageConnectorConfig;
 import io.druid.metadata.MetadataStorageTablesConfig;
 import io.druid.metadata.SQLMetadataConnector;
+import io.druid.metadata.storage.derby.DerbyConnector;
+import io.druid.metadata.storage.derby.DerbyMetadataStorage;
 import io.druid.metadata.storage.mysql.MySQLConnector;
 import io.druid.metadata.storage.postgresql.PostgreSQLConnector;
 import io.druid.segment.loading.DataSegmentPusher;
@@ -28,7 +43,6 @@ import io.druid.segment.loading.SegmentLoadingException;
 import io.druid.storage.hdfs.HdfsDataSegmentPusher;
 import io.druid.storage.hdfs.HdfsDataSegmentPusherConfig;
 import io.druid.timeline.DataSegment;
-
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -58,25 +72,7 @@ import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.OutputFormat;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hive.common.util.ShutdownHookManager;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Predicate;
-import com.google.common.base.Strings;
-import com.google.common.base.Supplier;
-import com.google.common.base.Suppliers;
-import com.google.common.base.Throwables;
-import com.google.common.collect.FluentIterable;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Sets;
-import com.metamx.common.RetryUtils;
-import com.metamx.common.lifecycle.Lifecycle;
-import com.metamx.http.client.HttpClient;
-import com.metamx.http.client.HttpClientConfig;
-import com.metamx.http.client.HttpClientInit;
-
 import org.joda.time.DateTime;
-import org.joda.time.DateTimeZone;
 import org.joda.time.Period;
 import org.skife.jdbi.v2.exceptions.CallbackFailedException;
 import org.slf4j.Logger;
@@ -88,6 +84,8 @@ import java.net.URL;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
 
 /**
  * DruidStorageHandler provides a HiveStorageHandler implementation for Druid.
@@ -116,9 +114,9 @@ public class DruidStorageHandler extends DefaultHiveMetaHook implements HiveStor
     ShutdownHookManager.addShutdownHook(() -> lifecycle.stop());
   }
 
-  private final SQLMetadataConnector connector;
+  private SQLMetadataConnector connector;
 
-  private final MetadataStorageTablesConfig druidMetadataStorageTablesConfig;
+  private MetadataStorageTablesConfig druidMetadataStorageTablesConfig = null;
 
   private String uniqueId = null;
 
@@ -127,48 +125,6 @@ public class DruidStorageHandler extends DefaultHiveMetaHook implements HiveStor
   private Configuration conf;
 
   public DruidStorageHandler() {
-    //this is the default value in druid
-    final String base = HiveConf
-            .getVar(SessionState.getSessionConf(), HiveConf.ConfVars.DRUID_METADATA_BASE);
-    final String dbType = HiveConf
-            .getVar(SessionState.getSessionConf(), HiveConf.ConfVars.DRUID_METADATA_DB_TYPE);
-    final String username = HiveConf
-            .getVar(SessionState.getSessionConf(), HiveConf.ConfVars.DRUID_METADATA_DB_USERNAME);
-    final String password = HiveConf
-            .getVar(SessionState.getSessionConf(), HiveConf.ConfVars.DRUID_METADATA_DB_PASSWORD);
-    final String uri = HiveConf
-            .getVar(SessionState.getSessionConf(), HiveConf.ConfVars.DRUID_METADATA_DB_URI);
-    druidMetadataStorageTablesConfig = MetadataStorageTablesConfig.fromBase(base);
-
-    final Supplier<MetadataStorageConnectorConfig> storageConnectorConfigSupplier = Suppliers.<MetadataStorageConnectorConfig>ofInstance(
-            new MetadataStorageConnectorConfig() {
-              @Override
-              public String getConnectURI() {
-                return uri;
-              }
-
-              @Override
-              public String getUser() {
-                return username;
-              }
-
-              @Override
-              public String getPassword() {
-                return password;
-              }
-            });
-
-    if (dbType.equals("mysql")) {
-      connector = new MySQLConnector(storageConnectorConfigSupplier,
-              Suppliers.ofInstance(druidMetadataStorageTablesConfig)
-      );
-    } else if (dbType.equals("postgresql")) {
-      connector = new PostgreSQLConnector(storageConnectorConfigSupplier,
-              Suppliers.ofInstance(druidMetadataStorageTablesConfig)
-      );
-    } else {
-      throw new IllegalStateException(String.format("Unknown metadata storage type [%s]", dbType));
-    }
   }
 
   @VisibleForTesting
@@ -250,13 +206,13 @@ public class DruidStorageHandler extends DefaultHiveMetaHook implements HiveStor
     // We need to check the Druid metadata
     dataSourceName = Warehouse.getQualifiedName(table);
     try {
-      connector.createSegmentTable();
+      getConnector().createSegmentTable();
     } catch (Exception e) {
       LOG.error("Exception while trying to create druid segments table", e);
       throw new MetaException(e.getMessage());
     }
     Collection<String> existingDataSources = DruidStorageHandlerUtils
-            .getAllDataSourceNames(connector, druidMetadataStorageTablesConfig);
+            .getAllDataSourceNames(getConnector(), getDruidMetadataStorageTablesConfig());
     LOG.debug("pre-create data source with name {}", dataSourceName);
     if (existingDataSources.contains(dataSourceName)) {
       throw new MetaException(String.format("Data source [%s] already existing", dataSourceName));
@@ -272,7 +228,7 @@ public class DruidStorageHandler extends DefaultHiveMetaHook implements HiveStor
     final Path segmentDescriptorDir = getSegmentDescriptorDir();
     try {
       List<DataSegment> dataSegmentList = DruidStorageHandlerUtils
-              .getPublishedSegments(segmentDescriptorDir, getConf());
+              .getCreatedSegments(segmentDescriptorDir, getConf());
       for (DataSegment dataSegment : dataSegmentList) {
         try {
           deleteSegment(dataSegment);
@@ -290,144 +246,146 @@ public class DruidStorageHandler extends DefaultHiveMetaHook implements HiveStor
 
   @Override
   public void commitCreateTable(Table table) throws MetaException {
-    LOG.debug("commit create table {}", table.getTableName());
     if (MetaStoreUtils.isExternalTable(table)) {
       // For external tables, we do not need to do anything else
       return;
     }
-    publishSegments(table, true);
+    loadDruidSegments(table, true);
   }
 
-  public void publishSegments(Table table, boolean overwrite) throws MetaException {
+
+  protected void loadDruidSegments(Table table, boolean overwrite) throws MetaException {
+    // at this point we have Druid segments from reducers but we need to atomically
+    // rename and commit to metadata
     final String dataSourceName = table.getParameters().get(Constants.DRUID_DATA_SOURCE);
     final List<DataSegment> segmentList = Lists.newArrayList();
     final Path tableDir = getSegmentDescriptorDir();
-    console.logInfo(String.format("Committing hive table {} druid data source {} to the druid metadata store",
-            table.getTableName(), dataSourceName
-    ));
+    // Read the created segments metadata from the table staging directory
     try {
-      segmentList.addAll(DruidStorageHandlerUtils.getPublishedSegments(tableDir, getConf()));
+      segmentList.addAll(DruidStorageHandlerUtils.getCreatedSegments(tableDir, getConf()));
     } catch (IOException e) {
       LOG.error("Failed to load segments descriptor from directory {}", tableDir.toString());
       Throwables.propagate(e);
       cleanWorkingDir();
     }
+    // Moving Druid segments and committing to druid metadata as one transaction.
+    final HdfsDataSegmentPusherConfig hdfsSegmentPusherConfig = new HdfsDataSegmentPusherConfig();
+    List<DataSegment> publishedDataSegmentList = Lists.newArrayList();
+    final String segmentDirectory =
+            table.getParameters().get(Constants.DRUID_SEGMENT_DIRECTORY) != null
+                    ? table.getParameters().get(Constants.DRUID_SEGMENT_DIRECTORY)
+                    : HiveConf.getVar(getConf(), HiveConf.ConfVars.DRUID_SEGMENT_DIRECTORY);
+    LOG.info(String.format(
+            "Moving [%s] Druid segments from staging directory [%s] to Deep storage [%s]",
+            segmentList.size(),
+            getStagingWorkingDir(),
+            segmentDirectory
+
+            ));
+    hdfsSegmentPusherConfig.setStorageDirectory(segmentDirectory);
     try {
-      final String segmentDirectory =
-              table.getParameters().get(Constants.DRUID_SEGMENT_DIRECTORY) != null
-                      ? table.getParameters().get(Constants.DRUID_SEGMENT_DIRECTORY)
-                      : HiveConf.getVar(getConf(), HiveConf.ConfVars.DRUID_SEGMENT_DIRECTORY);
-      LOG.info(
-              String.format("Will move [%s] druid segments from [%s] to [%s]",
-                      segmentList.size(),
-                      getStagingWorkingDir(),
-                      segmentDirectory
-
-              ));
-      HdfsDataSegmentPusherConfig pusherConfig = new HdfsDataSegmentPusherConfig();
-      pusherConfig.setStorageDirectory(segmentDirectory);
-      DataSegmentPusher dataSegmentPusher = new HdfsDataSegmentPusher(pusherConfig, getConf(), DruidStorageHandlerUtils.JSON_MAPPER);
-      DruidStorageHandlerUtils.publishSegments(
-              connector,
-              druidMetadataStorageTablesConfig,
+      DataSegmentPusher dataSegmentPusher = new HdfsDataSegmentPusher(hdfsSegmentPusherConfig,
+              getConf(),
+              DruidStorageHandlerUtils.JSON_MAPPER
+      );
+      publishedDataSegmentList = DruidStorageHandlerUtils.publishSegmentsAndCommit(
+              getConnector(),
+              getDruidMetadataStorageTablesConfig(),
               dataSourceName,
               segmentList,
               overwrite,
-              segmentDirectory,
               getConf(),
               dataSegmentPusher
-
       );
-    } catch (CallbackFailedException | IOException e ) {
-      LOG.error("Failed to publish segments");
-      if (e instanceof CallbackFailedException)  {
+
+    } catch (CallbackFailedException | IOException e) {
+      LOG.error("Failed to move segments from staging directory");
+      if (e instanceof CallbackFailedException) {
         Throwables.propagate(e.getCause());
       }
       Throwables.propagate(e);
     } finally {
       cleanWorkingDir();
     }
-      final String coordinatorAddress = HiveConf
-              .getVar(getConf(), HiveConf.ConfVars.HIVE_DRUID_COORDINATOR_DEFAULT_ADDRESS);
-      int maxTries = HiveConf.getIntVar(getConf(), HiveConf.ConfVars.HIVE_DRUID_MAX_TRIES);
-      if (maxTries == 0) {
-        return;
-      }
-      LOG.debug("checking load status from coordinator {}", coordinatorAddress);
+      checkLoadStatus(publishedDataSegmentList);
+  }
+
+  /**
+   * This function checks the load status of Druid segments by polling druid coordinator.
+   * @param segments List of druid segments to check for
+   *
+   * @return count of yet to load segments.
+   */
+  private int checkLoadStatus(List<DataSegment> segments){
+    final String coordinatorAddress = HiveConf
+            .getVar(getConf(), HiveConf.ConfVars.HIVE_DRUID_COORDINATOR_DEFAULT_ADDRESS);
+    int maxTries = HiveConf.getIntVar(getConf(), HiveConf.ConfVars.HIVE_DRUID_MAX_TRIES);
+    if (maxTries == 0) {
+      return segments.size();
+    }
+    LOG.debug("checking load status from coordinator {}", coordinatorAddress);
 
-      String coordinatorResponse = null;
+    String coordinatorResponse;
+    try {
+      coordinatorResponse = RetryUtils.retry(() -> DruidStorageHandlerUtils.getURL(getHttpClient(),
+              new URL(String.format("http://%s/status", coordinatorAddress))
+      ), input -> input instanceof IOException, maxTries);
+    } catch (Exception e) {
+      console.printInfo(
+              "Will skip waiting for data loading, coordinator unavailable");
+      return segments.size();
+    }
+    if (Strings.isNullOrEmpty(coordinatorResponse)) {
+      console.printInfo(
+              "Will skip waiting for data loading empty response from coordinator");
+      return segments.size();
+    }
+    console.printInfo(
+            String.format("Waiting for the loading of [%s] segments", segments.size()));
+    long passiveWaitTimeMs = HiveConf
+            .getLongVar(getConf(), HiveConf.ConfVars.HIVE_DRUID_PASSIVE_WAIT_TIME);
+    Set<URL> UrlsOfUnloadedSegments = segments.stream().map(dataSegment -> {
       try {
-        coordinatorResponse = RetryUtils.retry(() -> DruidStorageHandlerUtils.getURL(getHttpClient(),
-                new URL(String.format("http://%s/status", coordinatorAddress))
-        ), input -> input instanceof IOException, maxTries);
-      } catch (Exception e) {
-        console.printInfo(
-                "Will skip waiting for data loading, coordinator unavailable");
-        return;
-      }
-      if (Strings.isNullOrEmpty(coordinatorResponse)) {
-        console.printInfo(
-                "Will skip waiting for data loading empty response from coordinator");
-        return;
+        //Need to make sure that we are using segment identifier
+        return new URL(String.format("http://%s/druid/coordinator/v1/datasources/%s/segments/%s",
+                coordinatorAddress, dataSegment.getDataSource(), dataSegment.getIdentifier()
+        ));
+      } catch (MalformedURLException e) {
+        Throwables.propagate(e);
       }
-      console.printInfo(
-              String.format("Waiting for the loading of [%s] segments", segmentList.size()));
-      long passiveWaitTimeMs = HiveConf
-              .getLongVar(getConf(), HiveConf.ConfVars.HIVE_DRUID_PASSIVE_WAIT_TIME);
-      ImmutableSet<URL> setOfUrls = FluentIterable.from(segmentList)
-              .transform(dataSegment -> {
-                try {
-                  //Need to make sure that we are using UTC since most of the druid cluster use UTC by default
-                  return new URL(String
-                          .format("http://%s/druid/coordinator/v1/datasources/%s/segments/%s",
-                                  coordinatorAddress, dataSourceName, DataSegment
-                                          .makeDataSegmentIdentifier(dataSegment.getDataSource(),
-                                                  new DateTime(dataSegment.getInterval()
-                                                          .getStartMillis(), DateTimeZone.UTC),
-                                                  new DateTime(dataSegment.getInterval()
-                                                          .getEndMillis(), DateTimeZone.UTC),
-                                                  dataSegment.getVersion(),
-                                                  dataSegment.getShardSpec()
-                                          )
-                          ));
-                } catch (MalformedURLException e) {
-                  Throwables.propagate(e);
-                }
-                return null;
-              }).toSet();
-
-      int numRetries = 0;
-      while (numRetries++ < maxTries && !setOfUrls.isEmpty()) {
-        setOfUrls = ImmutableSet.copyOf(Sets.filter(setOfUrls, new Predicate<URL>() {
-          @Override
-          public boolean apply(URL input) {
-            try {
-              String result = DruidStorageHandlerUtils.getURL(getHttpClient(), input);
-              LOG.debug("Checking segment {} response is {}", input, result);
-              return Strings.isNullOrEmpty(result);
-            } catch (IOException e) {
-              LOG.error(String.format("Error while checking URL [%s]", input), e);
-              return true;
-            }
-          }
-        }));
+      return null;
+    }).collect(Collectors.toSet());
 
+    int numRetries = 0;
+    while (numRetries++ < maxTries && !UrlsOfUnloadedSegments.isEmpty()) {
+      UrlsOfUnloadedSegments = ImmutableSet.copyOf(Sets.filter(UrlsOfUnloadedSegments, input -> {
         try {
-          if (!setOfUrls.isEmpty()) {
-            Thread.sleep(passiveWaitTimeMs);
-          }
-        } catch (InterruptedException e) {
-          Thread.interrupted();
-          Throwables.propagate(e);
+          String result = DruidStorageHandlerUtils.getURL(getHttpClient(), input);
+          LOG.debug("Checking segment [{}] response is [{}]", input, result);
+          return Strings.isNullOrEmpty(result);
+        } catch (IOException e) {
+          LOG.error(String.format("Error while checking URL [%s]", input), e);
+          return true;
         }
+      }));
+
+      try {
+        if (!UrlsOfUnloadedSegments.isEmpty()) {
+          Thread.sleep(passiveWaitTimeMs);
+        }
+      } catch (InterruptedException e) {
+        Thread.interrupted();
+        Throwables.propagate(e);
       }
-      if (!setOfUrls.isEmpty()) {
-        // We are not Throwing an exception since it might be a transient issue that is blocking loading
-        console.printError(String.format(
-                "Wait time exhausted and we have [%s] out of [%s] segments not loaded yet",
-                setOfUrls.size(), segmentList.size()
-        ));
-      }
+    }
+    if (!UrlsOfUnloadedSegments.isEmpty()) {
+      // We are not Throwing an exception since it might be a transient issue that is blocking loading
+      console.printError(String.format(
+              "Wait time exhausted and we have [%s] out of [%s] segments not loaded yet",
+              UrlsOfUnloadedSegments.size(), segments.size()
+      ));
+    }
+    return UrlsOfUnloadedSegments.size();
   }
 
   @VisibleForTesting
@@ -503,7 +461,7 @@ public class DruidStorageHandler extends DefaultHiveMetaHook implements HiveStor
     if (deleteData == true) {
       LOG.info("Dropping with purge all the data for data source {}", dataSourceName);
       List<DataSegment> dataSegmentList = DruidStorageHandlerUtils
-              .getDataSegmentList(connector, druidMetadataStorageTablesConfig, dataSourceName);
+              .getDataSegmentList(getConnector(), getDruidMetadataStorageTablesConfig(), dataSourceName);
       if (dataSegmentList.isEmpty()) {
         LOG.info("Nothing to delete for data source {}", dataSourceName);
         return;
@@ -517,7 +475,7 @@ public class DruidStorageHandler extends DefaultHiveMetaHook implements HiveStor
       }
     }
     if (DruidStorageHandlerUtils
-            .disableDataSource(connector, druidMetadataStorageTablesConfig, dataSourceName)) {
+            .disableDataSource(getConnector(), getDruidMetadataStorageTablesConfig(), dataSourceName)) {
       LOG.info("Successfully dropped druid data source {}", dataSourceName);
     }
   }
@@ -529,7 +487,7 @@ public class DruidStorageHandler extends DefaultHiveMetaHook implements HiveStor
     if (MetaStoreUtils.isExternalTable(table)) {
       throw new MetaException("Cannot insert data into external table backed by Druid");
     }
-    this.publishSegments(table, overwrite);
+    this.loadDruidSegments(table, overwrite);
   }
 
   @Override
@@ -602,6 +560,69 @@ public class DruidStorageHandler extends DefaultHiveMetaHook implements HiveStor
     return new Path(getRootWorkingDir(), makeStagingName());
   }
 
+  private MetadataStorageTablesConfig getDruidMetadataStorageTablesConfig() {
+    if (druidMetadataStorageTablesConfig != null) {
+      return druidMetadataStorageTablesConfig;
+    }
+    final String base = HiveConf
+            .getVar(getConf(), HiveConf.ConfVars.DRUID_METADATA_BASE);
+    druidMetadataStorageTablesConfig = MetadataStorageTablesConfig.fromBase(base);
+    return druidMetadataStorageTablesConfig;
+  }
+
+  private SQLMetadataConnector getConnector() {
+    if (connector != null) {
+      return connector;
+    }
+
+    final String dbType = HiveConf
+            .getVar(getConf(), HiveConf.ConfVars.DRUID_METADATA_DB_TYPE);
+    final String username = HiveConf
+            .getVar(getConf(), HiveConf.ConfVars.DRUID_METADATA_DB_USERNAME);
+    final String password = HiveConf
+            .getVar(getConf(), HiveConf.ConfVars.DRUID_METADATA_DB_PASSWORD);
+    final String uri = HiveConf
+            .getVar(getConf(), HiveConf.ConfVars.DRUID_METADATA_DB_URI);
+
+
+    final Supplier<MetadataStorageConnectorConfig> storageConnectorConfigSupplier = Suppliers.<MetadataStorageConnectorConfig>ofInstance(
+            new MetadataStorageConnectorConfig() {
+              @Override
+              public String getConnectURI() {
+                return uri;
+              }
+
+              @Override
+              public String getUser() {
+                return Strings.emptyToNull(username);
+              }
+
+              @Override
+              public String getPassword() {
+                return Strings.emptyToNull(password);
+              }
+            });
+    if (dbType.equals("mysql")) {
+      connector = new MySQLConnector(storageConnectorConfigSupplier,
+              Suppliers.ofInstance(getDruidMetadataStorageTablesConfig())
+      );
+    } else if (dbType.equals("postgresql")) {
+      connector = new PostgreSQLConnector(storageConnectorConfigSupplier,
+              Suppliers.ofInstance(getDruidMetadataStorageTablesConfig())
+      );
+
+    } else if (dbType.equals("derby")) {
+      connector = new DerbyConnector(new DerbyMetadataStorage(storageConnectorConfigSupplier.get()),
+              storageConnectorConfigSupplier, Suppliers.ofInstance(getDruidMetadataStorageTablesConfig())
+      );
+    }
+    else {
+      throw new IllegalStateException(String.format("Unknown metadata storage type [%s]", dbType));
+    }
+
+    return connector;
+  }
+
   @VisibleForTesting
   protected String makeStagingName() {
     return ".staging-".concat(getUniqueId().replace(":", ""));

http://git-wip-us.apache.org/repos/asf/hive/blob/1b3711b3/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java
index fbceaac..d8afb5d 100644
--- a/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java
@@ -83,7 +83,6 @@ import org.skife.jdbi.v2.PreparedBatch;
 import org.skife.jdbi.v2.Query;
 import org.skife.jdbi.v2.ResultIterator;
 import org.skife.jdbi.v2.StatementContext;
-import org.skife.jdbi.v2.TransactionCallback;
 import org.skife.jdbi.v2.exceptions.CallbackFailedException;
 import org.skife.jdbi.v2.tweak.HandleCallback;
 import org.skife.jdbi.v2.util.ByteArrayMapper;
@@ -101,7 +100,9 @@ import java.net.URL;
 import java.net.UnknownHostException;
 import java.sql.SQLException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -246,7 +247,7 @@ public final class DruidStorageHandlerUtils {
    *
    * @throws IOException can be for the case we did not produce data.
    */
-  public static List<DataSegment> getPublishedSegments(Path taskDir, Configuration conf)
+  public static List<DataSegment> getCreatedSegments(Path taskDir, Configuration conf)
           throws IOException {
     ImmutableList.Builder<DataSegment> publishedSegmentsBuilder = ImmutableList.builder();
     FileSystem fs = taskDir.getFileSystem(conf);
@@ -373,17 +374,34 @@ public final class DruidStorageHandlerUtils {
     return true;
   }
 
-  public static void publishSegments(final SQLMetadataConnector connector,
+  /**
+   * First computes the segments timeline to accommodate new segments for insert into case
+   * Then moves segments to druid deep storage with updated metadata/version
+   * ALL IS DONE IN ONE TRANSACTION
+   *
+   * @param connector DBI connector to commit
+   * @param metadataStorageTablesConfig Druid metadata tables definitions
+   * @param dataSource Druid datasource name
+   * @param segments List of segments to move and commit to metadata
+   * @param overwrite if it is an insert overwrite
+   * @param conf Configuration
+   * @param dataSegmentPusher segment pusher
+   *
+   * @return List of successfully published Druid segments.
+   * This list has the updated versions and metadata about segments after move and timeline sorting
+   *
+   * @throws CallbackFailedException
+   */
+  public static List<DataSegment> publishSegmentsAndCommit(final SQLMetadataConnector connector,
           final MetadataStorageTablesConfig metadataStorageTablesConfig,
           final String dataSource,
           final List<DataSegment> segments,
           boolean overwrite,
-          String segmentDirectory,
           Configuration conf,
           DataSegmentPusher dataSegmentPusher
   ) throws CallbackFailedException {
-    connector.getDBI().inTransaction(
-            (TransactionCallback<Void>) (handle, transactionStatus) -> {
+    return connector.getDBI().inTransaction(
+            (handle, transactionStatus) -> {
               // We create the timeline for the existing and new segments
               VersionedIntervalTimeline<String, DataSegment> timeline;
               if (overwrite) {
@@ -397,7 +415,7 @@ public final class DruidStorageHandlerUtils {
                 // Append Mode
                 if (segments.isEmpty()) {
                   // If there are no new segments, we can just bail out
-                  return null;
+                  return Collections.EMPTY_LIST;
                 }
                 // Otherwise, build a timeline of existing segments in metadata storage
                 Interval indexedInterval = JodaUtils
@@ -504,7 +522,7 @@ public final class DruidStorageHandlerUtils {
               }
               batch.execute();
 
-              return null;
+              return finalSegmentsToPublish;
             }
     );
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/1b3711b3/druid-handler/src/test/org/apache/hadoop/hive/druid/TestDruidStorageHandler.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/test/org/apache/hadoop/hive/druid/TestDruidStorageHandler.java b/druid-handler/src/test/org/apache/hadoop/hive/druid/TestDruidStorageHandler.java
index 0b13a08..6f7fc78 100644
--- a/druid-handler/src/test/org/apache/hadoop/hive/druid/TestDruidStorageHandler.java
+++ b/druid-handler/src/test/org/apache/hadoop/hive/druid/TestDruidStorageHandler.java
@@ -323,10 +323,9 @@ public class TestDruidStorageHandler {
             .asList(createSegment(new Path(taskDirPath, DruidStorageHandlerUtils.INDEX_ZIP).toString(),
                     new Interval(100, 150), "v0", new LinearShardSpec(0)));
     DruidStorageHandlerUtils
-            .publishSegments(connector, metadataStorageTablesConfig, DATA_SOURCE_NAME,
+            .publishSegmentsAndCommit(connector, metadataStorageTablesConfig, DATA_SOURCE_NAME,
                     existingSegments,
                     true,
-                    taskDirPath.toString(),
                     config,
                     dataSegmentPusher
             );
@@ -377,10 +376,9 @@ public class TestDruidStorageHandler {
             .asList(createSegment(new Path(taskDirPath, DruidStorageHandlerUtils.INDEX_ZIP).toString(),
                     new Interval(100, 150), "v0", new LinearShardSpec(0)));
     DruidStorageHandlerUtils
-            .publishSegments(connector, metadataStorageTablesConfig, DATA_SOURCE_NAME,
+            .publishSegmentsAndCommit(connector, metadataStorageTablesConfig, DATA_SOURCE_NAME,
                     existingSegments,
                     true,
-                    taskDirPath.toString(),
                     config,
                     dataSegmentPusher
             );
@@ -522,10 +520,9 @@ public class TestDruidStorageHandler {
     DataSegmentPusher dataSegmentPusher = new HdfsDataSegmentPusher(pusherConfig, config, DruidStorageHandlerUtils.JSON_MAPPER);
 
     DruidStorageHandlerUtils
-            .publishSegments(connector, metadataStorageTablesConfig, DATA_SOURCE_NAME,
+            .publishSegmentsAndCommit(connector, metadataStorageTablesConfig, DATA_SOURCE_NAME,
                     existingSegments,
                     true,
-                    taskDirPath.toString(),
                     config,
                     dataSegmentPusher
             );
@@ -576,10 +573,9 @@ public class TestDruidStorageHandler {
             .asList(createSegment(new Path(taskDirPath, DruidStorageHandlerUtils.INDEX_ZIP).toString(),
                     new Interval(100, 150), "v0", new LinearShardSpec(0)));
     DruidStorageHandlerUtils
-            .publishSegments(connector, metadataStorageTablesConfig, DATA_SOURCE_NAME,
+            .publishSegmentsAndCommit(connector, metadataStorageTablesConfig, DATA_SOURCE_NAME,
                     existingSegments,
                     true,
-                    taskDirPath.toString(),
                     config,
                     dataSegmentPusher
             );
@@ -630,10 +626,9 @@ public class TestDruidStorageHandler {
     pusherConfig.setStorageDirectory(config.get(String.valueOf(HiveConf.ConfVars.DRUID_SEGMENT_DIRECTORY)));
     DataSegmentPusher dataSegmentPusher = new HdfsDataSegmentPusher(pusherConfig, config, DruidStorageHandlerUtils.JSON_MAPPER);
     DruidStorageHandlerUtils
-            .publishSegments(connector, metadataStorageTablesConfig, DATA_SOURCE_NAME,
+            .publishSegmentsAndCommit(connector, metadataStorageTablesConfig, DATA_SOURCE_NAME,
                     existingSegments,
                     true,
-                    taskDirPath.toString(),
                     config,
                     dataSegmentPusher
             );
@@ -699,10 +694,9 @@ public class TestDruidStorageHandler {
     pusherConfig.setStorageDirectory(taskDirPath.toString());
     DataSegmentPusher dataSegmentPusher = new HdfsDataSegmentPusher(pusherConfig, config, DruidStorageHandlerUtils.JSON_MAPPER);
     DruidStorageHandlerUtils
-            .publishSegments(connector, metadataStorageTablesConfig, DATA_SOURCE_NAME,
+            .publishSegmentsAndCommit(connector, metadataStorageTablesConfig, DATA_SOURCE_NAME,
                     existingSegments,
                     true,
-                    taskDirPath.toString(),
                     config,
                     dataSegmentPusher
             );
@@ -738,10 +732,9 @@ public class TestDruidStorageHandler {
     pusherConfig.setStorageDirectory(taskDirPath.toString());
     DataSegmentPusher dataSegmentPusher = new HdfsDataSegmentPusher(pusherConfig, config, DruidStorageHandlerUtils.JSON_MAPPER);
     DruidStorageHandlerUtils
-            .publishSegments(connector, metadataStorageTablesConfig, DATA_SOURCE_NAME,
+            .publishSegmentsAndCommit(connector, metadataStorageTablesConfig, DATA_SOURCE_NAME,
                     existingSegments,
                     true,
-                    taskDirPath.toString(),
                     config,
                     dataSegmentPusher
             );

http://git-wip-us.apache.org/repos/asf/hive/blob/1b3711b3/druid-handler/src/test/org/apache/hadoop/hive/ql/io/TestDruidRecordWriter.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/test/org/apache/hadoop/hive/ql/io/TestDruidRecordWriter.java b/druid-handler/src/test/org/apache/hadoop/hive/ql/io/TestDruidRecordWriter.java
index af75bfb..4bf3fa2 100644
--- a/druid-handler/src/test/org/apache/hadoop/hive/ql/io/TestDruidRecordWriter.java
+++ b/druid-handler/src/test/org/apache/hadoop/hive/ql/io/TestDruidRecordWriter.java
@@ -187,7 +187,7 @@ public class TestDruidRecordWriter {
     }
     druidRecordWriter.close(false);
     List<DataSegment> dataSegmentList = DruidStorageHandlerUtils
-            .getPublishedSegments(segmentDescriptroPath, config);
+            .getCreatedSegments(segmentDescriptroPath, config);
     Assert.assertEquals(1, dataSegmentList.size());
     File tmpUnzippedSegmentDir = temporaryFolder.newFolder();
     new LocalDataSegmentPuller().getSegmentFiles(dataSegmentList.get(0), tmpUnzippedSegmentDir);

http://git-wip-us.apache.org/repos/asf/hive/blob/1b3711b3/itests/hive-unit-hadoop2/pom.xml
----------------------------------------------------------------------
diff --git a/itests/hive-unit-hadoop2/pom.xml b/itests/hive-unit-hadoop2/pom.xml
index ebf81c9..fb31fd4 100644
--- a/itests/hive-unit-hadoop2/pom.xml
+++ b/itests/hive-unit-hadoop2/pom.xml
@@ -72,6 +72,12 @@
       <groupId>org.apache.hive</groupId>
       <artifactId>hive-it-util</artifactId>
       <version>${project.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>org.apache.hive</groupId>
+          <artifactId>hive-it-druid</artifactId>
+        </exclusion>
+      </exclusions>
       <scope>test</scope>
     </dependency>
     <dependency>

http://git-wip-us.apache.org/repos/asf/hive/blob/1b3711b3/itests/hive-unit/pom.xml
----------------------------------------------------------------------
diff --git a/itests/hive-unit/pom.xml b/itests/hive-unit/pom.xml
index 73a454f..626bbfb 100644
--- a/itests/hive-unit/pom.xml
+++ b/itests/hive-unit/pom.xml
@@ -94,6 +94,12 @@
       <groupId>org.apache.hive</groupId>
       <artifactId>hive-it-util</artifactId>
       <version>${project.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>org.apache.hive</groupId>
+          <artifactId>hive-it-druid</artifactId>
+        </exclusion>
+      </exclusions>
     </dependency>
     <!-- inter-project -->
     <dependency>

http://git-wip-us.apache.org/repos/asf/hive/blob/1b3711b3/itests/pom.xml
----------------------------------------------------------------------
diff --git a/itests/pom.xml b/itests/pom.xml
index 3bf29f9..a782cd2 100644
--- a/itests/pom.xml
+++ b/itests/pom.xml
@@ -46,6 +46,7 @@
    <module>hive-jmh</module>
    <module>hive-unit-hadoop2</module>
    <module>hive-minikdc</module>
+   <module>qtest-druid</module>
   </modules>
 
   <profiles>

http://git-wip-us.apache.org/repos/asf/hive/blob/1b3711b3/itests/qtest-accumulo/pom.xml
----------------------------------------------------------------------
diff --git a/itests/qtest-accumulo/pom.xml b/itests/qtest-accumulo/pom.xml
index eae436b..1386ce7 100644
--- a/itests/qtest-accumulo/pom.xml
+++ b/itests/qtest-accumulo/pom.xml
@@ -94,6 +94,10 @@
           <groupId>org.apache.hive</groupId>
           <artifactId>hive-exec</artifactId>
         </exclusion>
+        <exclusion>
+          <groupId>org.apache.hive</groupId>
+          <artifactId>hive-it-druid</artifactId>
+        </exclusion>
       </exclusions>
     </dependency>
     <dependency>

http://git-wip-us.apache.org/repos/asf/hive/blob/1b3711b3/itests/qtest-druid/pom.xml
----------------------------------------------------------------------
diff --git a/itests/qtest-druid/pom.xml b/itests/qtest-druid/pom.xml
new file mode 100644
index 0000000..a807d03
--- /dev/null
+++ b/itests/qtest-druid/pom.xml
@@ -0,0 +1,255 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~      http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <parent>
+    <artifactId>hive-it</artifactId>
+    <groupId>org.apache.hive</groupId>
+    <version>3.0.0-SNAPSHOT</version>
+    <relativePath>../pom.xml</relativePath>
+  </parent>
+  <modelVersion>4.0.0</modelVersion>
+
+  <artifactId>hive-it-druid</artifactId>
+
+  <packaging>jar</packaging>
+  <name>Hive Integration - QFile Druid Tests</name>
+
+  <!-- dependencies are always listed in sorted order by groupId, artifectId -->
+  <!-- test intra-project -->
+  <properties>
+    <hive.path.to.root>../..</hive.path.to.root>
+    <druid.curator.version>2.11.0</druid.curator.version>
+    <druid.jersey.version>1.19.3</druid.jersey.version>
+    <druid.jetty.version>9.3.19.v20170502</druid.jetty.version>
+    <druid.derby.version>10.11.1.1</druid.derby.version>
+    <druid.guava.version>16.0.1</druid.guava.version>
+    <druid.guice.version>4.1.0</druid.guice.version>
+  </properties>
+      <dependencies>
+        <dependency>
+          <groupId>io.druid</groupId>
+          <artifactId>druid-server</artifactId>
+          <version>${druid.version}</version>
+          <exclusions>
+            <exclusion>
+              <artifactId>jersey-server</artifactId>
+              <groupId>com.sun.jersey</groupId>
+            </exclusion>
+            <exclusion>
+              <artifactId>jersey-servlet</artifactId>
+              <groupId>com.sun.jersey</groupId>
+            </exclusion>
+            <exclusion>
+              <artifactId>jersey-core</artifactId>
+              <groupId>com.sun.jersey</groupId>
+            </exclusion>
+          </exclusions>
+        </dependency>
+        <dependency>
+          <groupId>io.druid</groupId>
+          <artifactId>druid-services</artifactId>
+          <version>${druid.version}</version>
+          <exclusions>
+            <exclusion>
+              <artifactId>jersey-server</artifactId>
+              <groupId>com.sun.jersey</groupId>
+            </exclusion>
+            <exclusion>
+              <artifactId>jersey-servlet</artifactId>
+              <groupId>com.sun.jersey</groupId>
+            </exclusion>
+            <exclusion>
+              <artifactId>jersey-core</artifactId>
+              <groupId>com.sun.jersey</groupId>
+            </exclusion>
+          </exclusions>
+        </dependency>
+        <dependency>
+          <groupId>com.google.guava</groupId>
+          <artifactId>guava</artifactId>
+          <version>${druid.guava.version}</version>
+        </dependency>
+        <dependency>
+          <groupId>io.druid.extensions</groupId>
+          <artifactId>druid-hdfs-storage</artifactId>
+          <version>${druid.version}</version>
+          <exclusions>
+            <exclusion>
+              <groupId>com.sun.jersey</groupId>
+              <artifactId>jersey-servlet</artifactId>
+            </exclusion>
+            <exclusion>
+              <groupId>com.sun.jersey</groupId>
+              <artifactId>jersey-client</artifactId>
+            </exclusion>
+          </exclusions>
+        </dependency>
+        <dependency>
+          <groupId>org.apache.logging.log4j</groupId>
+          <artifactId>log4j-api</artifactId>
+          <version>${log4j2.version}</version>
+        </dependency>
+        <dependency>
+          <groupId>org.apache.logging.log4j</groupId>
+          <artifactId>log4j-core</artifactId>
+          <version>${log4j2.version}</version>
+        </dependency>
+        <dependency>
+          <groupId>org.apache.curator</groupId>
+          <artifactId>curator-framework</artifactId>
+          <version>${druid.curator.version}</version>
+        </dependency>
+        <dependency>
+          <groupId>org.apache.curator</groupId>
+          <artifactId>curator-client</artifactId>
+          <version>${druid.curator.version}</version>
+        </dependency>
+        <dependency>
+          <groupId>org.apache.curator</groupId>
+          <artifactId>curator-x-discovery</artifactId>
+          <version>${druid.curator.version}</version>
+        </dependency>
+        <dependency>
+          <groupId>org.apache.curator</groupId>
+          <artifactId>curator-recipes</artifactId>
+          <version>${druid.curator.version}</version>
+        </dependency>
+       <dependency>
+          <groupId>com.sun.jersey</groupId>
+          <artifactId>jersey-bundle</artifactId>
+          <version>${druid.jersey.version}</version>
+        </dependency>
+        <dependency>
+          <groupId>org.eclipse.jetty</groupId>
+          <artifactId>jetty-server</artifactId>
+          <version>${druid.jetty.version}</version>
+        </dependency>
+        <dependency>
+          <groupId>org.eclipse.jetty</groupId>
+          <artifactId>jetty-servlet</artifactId>
+          <version>${druid.jetty.version}</version>
+        </dependency>
+        <dependency>
+          <groupId>org.eclipse.jetty</groupId>
+          <artifactId>jetty-servlets</artifactId>
+          <version>${druid.jetty.version}</version>
+        </dependency>
+        <dependency>
+          <groupId>org.eclipse.jetty</groupId>
+          <artifactId>jetty-proxy</artifactId>
+          <version>${druid.jetty.version}</version>
+        </dependency>
+        <dependency>
+          <groupId>org.eclipse.jetty</groupId>
+          <artifactId>jetty-util</artifactId>
+          <version>${druid.jetty.version}</version>
+        </dependency>
+        <dependency>
+          <groupId>org.eclipse.jetty</groupId>
+          <artifactId>jetty-security</artifactId>
+          <version>${druid.jetty.version}</version>
+        </dependency>
+        <dependency>
+          <groupId>com.fasterxml.jackson.core</groupId>
+          <artifactId>jackson-core</artifactId>
+          <version>${jackson.new.version}</version>
+        </dependency>
+        <dependency>
+          <groupId>com.fasterxml.jackson.core</groupId>
+          <artifactId>jackson-databind</artifactId>
+          <version>${jackson.new.version}</version>
+        </dependency>
+        <dependency>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-annotations</artifactId>
+          <scope>compile</scope>
+          <version>${hadoop.version}</version>
+        </dependency>
+        <dependency>
+          <groupId>com.google.inject</groupId>
+          <artifactId>guice</artifactId>
+          <version>${druid.guice.version}</version>
+        </dependency>
+        <!-- inter-project -->
+        <dependency>
+          <groupId>junit</groupId>
+          <artifactId>junit</artifactId>
+          <version>${junit.version}</version>
+          <scope>test</scope>
+        </dependency>
+      </dependencies>
+  <build>
+
+    <plugins>
+      <!-- Maven Shade Plugin -->
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-shade-plugin</artifactId>
+        <version>${maven.shade.plugin.version}</version>
+        <executions>
+          <!-- Run shade goal on package phase -->
+          <execution>
+            <phase>package</phase>
+            <goals>
+              <goal>shade</goal>
+            </goals>
+
+
+            <configuration>
+              <shadeTestJar>false</shadeTestJar>
+              <createDependencyReducedPom>false</createDependencyReducedPom>
+              <transformers>
+                  <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+                    <mainClass>io.druid.cli.Main</mainClass>
+                  </transformer>
+                  <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
+              </transformers>
+              <artifactSet>
+                <excludes>
+                  <exclude>junit:*</exclude>
+                  <exclude>jmock:*</exclude>
+                  <exclude>*:xml-apis</exclude>
+                  <exclude>org.apache.maven:lib:tests</exclude>
+                  <exclude>javax.ws.rs:jsr311-api</exclude>
+                  <exclude>*:javax.el-api</exclude>
+                  <exclude>*:jsp-api*</exclude>
+                </excludes>
+              </artifactSet>
+              <filters>
+                <filter>
+                  <artifact>*:*</artifact>
+                  <excludes>
+                    <exclude>META-INF/*.SF</exclude>
+                    <exclude>META-INF/*.DSA</exclude>
+                    <exclude>META-INF/*.RSA</exclude>
+                  </excludes>
+                </filter>
+              </filters>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+
+
+</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hive/blob/1b3711b3/itests/qtest-druid/src/main/java/org/apache/hive/druid/DruidNode.java
----------------------------------------------------------------------
diff --git a/itests/qtest-druid/src/main/java/org/apache/hive/druid/DruidNode.java b/itests/qtest-druid/src/main/java/org/apache/hive/druid/DruidNode.java
new file mode 100644
index 0000000..1911144
--- /dev/null
+++ b/itests/qtest-druid/src/main/java/org/apache/hive/druid/DruidNode.java
@@ -0,0 +1,26 @@
+package org.apache.hive.druid;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+public abstract class DruidNode implements Closeable{
+
+  private final String nodeType;
+
+  public DruidNode(String nodeId) {this.nodeType = nodeId;}
+
+  final public String getNodeType() {
+    return nodeType;
+  }
+
+  /**
+   * starts the druid node
+   */
+  public abstract void start() throws IOException;
+
+  /**
+   * @return true if the process is working
+   */
+  public abstract boolean isAlive();
+
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/1b3711b3/itests/qtest-druid/src/main/java/org/apache/hive/druid/ForkingDruidNode.java
----------------------------------------------------------------------
diff --git a/itests/qtest-druid/src/main/java/org/apache/hive/druid/ForkingDruidNode.java b/itests/qtest-druid/src/main/java/org/apache/hive/druid/ForkingDruidNode.java
new file mode 100644
index 0000000..f81a0ca
--- /dev/null
+++ b/itests/qtest-druid/src/main/java/org/apache/hive/druid/ForkingDruidNode.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.druid;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import com.google.common.base.Throwables;
+import com.google.common.collect.Lists;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URLClassLoader;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+public class ForkingDruidNode extends DruidNode {
+  private final static String DEFAULT_JAVA_CMD = "java";
+
+  private final static Logger log = LoggerFactory.getLogger(ForkingDruidNode.class);
+
+  private final String classpath;
+
+  private final Map<String, String> properties;
+
+  private final List<String> jvmArgs;
+
+  private final File logLocation;
+
+  private final File logFile;
+
+  private final String javaCmd;
+
+  private final ProcessBuilder processBuilder = new ProcessBuilder();
+
+  private Process druidProcess = null;
+
+  private Boolean started = false;
+
+  private final List<String> allowedPrefixes = Lists.newArrayList(
+          "com.metamx",
+          "druid",
+          "io.druid",
+          "java.io.tmpdir",
+          "hadoop"
+  );
+
+  public ForkingDruidNode(String nodeType,
+          String extraClasspath,
+          Map<String, String> properties,
+          List<String> jvmArgs,
+          File logLocation,
+          String javaCmd
+  ) {
+    super(nodeType);
+
+    final List<String> command = Lists.newArrayList();
+    this.classpath = Strings.isNullOrEmpty(extraClasspath)
+            ? System.getProperty("java.class.path")
+            : extraClasspath;
+    this.properties = properties == null ? new HashMap<>() : properties;
+    this.jvmArgs = Preconditions.checkNotNull(jvmArgs);
+    this.logLocation = logLocation == null ? new File("/tmp/druid") : logLocation;
+    if (!this.logLocation.exists()) {
+      this.logLocation.mkdirs();
+    }
+
+    this.javaCmd = javaCmd == null ? DEFAULT_JAVA_CMD : javaCmd;
+
+    logFile = new File(this.logLocation, getNodeType() + ".log");
+    // set the log stream
+    processBuilder.redirectErrorStream(true);
+    processBuilder.redirectOutput(ProcessBuilder.Redirect.appendTo(logFile));
+    command.add(this.javaCmd);
+    command.addAll(this.jvmArgs);
+    command.add("-server");
+    command.add("-cp");
+    command.add(classpath);
+
+    // inject properties from the main App that matches allowedPrefix
+    for (String propName : System.getProperties().stringPropertyNames()) {
+      for (String allowedPrefix : allowedPrefixes) {
+        if (propName.startsWith(allowedPrefix)) {
+          command.add(
+                  String.format(
+                          "-D%s=%s",
+                          propName,
+                          System.getProperty(propName)
+                  )
+          );
+        }
+      }
+    }
+    this.properties
+            .forEach((key, value) -> command.add(String.format("-D%s=%s", key, value)));
+    command.addAll(Lists.newArrayList("io.druid.cli.Main", "server", getNodeType()));
+    processBuilder.command(command);
+    log.info("Creating forking druid node with " + String.join(" ", processBuilder.command()));
+  }
+
+  @Override
+  public void start() throws IOException {
+    synchronized (started) {
+      if (started == false) {
+        druidProcess = processBuilder.start();
+        started = true;
+      }
+      log.info("Started " + getNodeType());
+    }
+  }
+
+  @Override
+  public boolean isAlive() {
+    synchronized (started) {
+      return started && druidProcess != null && druidProcess.isAlive();
+    }
+  }
+
+  @Override
+  public void close() throws IOException {
+    synchronized (started) {
+      if (druidProcess != null && druidProcess.isAlive()) {
+        druidProcess.destroy();
+      }
+      try {
+        log.info("Waiting for " + getNodeType());
+        if (druidProcess.waitFor(5000, TimeUnit.MILLISECONDS)) {
+          log.info(String.format("Shutdown completed for node [%s]", getNodeType()));
+        } else {
+          log.info(String.format("Waiting to shutdown node [%s] exhausted shutting down forcibly", getNodeType()));
+          druidProcess.destroyForcibly();
+        }
+      } catch (InterruptedException e) {
+        Thread.interrupted();
+        Throwables.propagate(e);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/1b3711b3/itests/qtest-druid/src/main/java/org/apache/hive/druid/MiniDruidCluster.java
----------------------------------------------------------------------
diff --git a/itests/qtest-druid/src/main/java/org/apache/hive/druid/MiniDruidCluster.java b/itests/qtest-druid/src/main/java/org/apache/hive/druid/MiniDruidCluster.java
new file mode 100644
index 0000000..71259dc
--- /dev/null
+++ b/itests/qtest-druid/src/main/java/org/apache/hive/druid/MiniDruidCluster.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.druid;
+
+import com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableMap;
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.service.AbstractService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * This class has the hooks to start and stop the external Druid Nodes
+ */
+public class MiniDruidCluster extends AbstractService {
+  private static final Logger log = LoggerFactory.getLogger(MiniDruidCluster.class);
+
+  private static final String COMMON_DRUID_JVM_PROPPERTIES = "-Duser.timezone=UTC -Dfile.encoding=UTF-8 -Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager -Ddruid.emitter=logging -Ddruid.emitter.logging.logLevel=info";
+
+  private static final List<String> HISTORICAL_JVM_CONF = Arrays
+          .asList("-server", "-XX:MaxDirectMemorySize=10g", "-Xmx512m", "-Xmx512m",
+                  COMMON_DRUID_JVM_PROPPERTIES
+          );
+
+  private static final List<String> COORDINATOR_JVM_CONF = Arrays
+          .asList("-server", "-XX:MaxDirectMemorySize=2g", "-Xmx512m", "-Xms512m",
+                  COMMON_DRUID_JVM_PROPPERTIES
+          );
+
+  private static final Map<String, String> COMMON_DRUID_CONF = ImmutableMap.of(
+          "druid.metadata.storage.type", "derby"
+  );
+
+  private static final Map<String, String> COMMON_DRUID_HISTORICAL = ImmutableMap.of(
+          "druid.processing.buffer.sizeBytes", "213870912",
+          "druid.processing.numThreads", "2",
+          "druid.server.maxSize", "130000000000"
+  );
+
+  private static final Map<String, String> COMMON_COORDINATOR_INDEXER = ImmutableMap
+          .of(
+                  "druid.indexer.logs.type", "file",
+                  "druid.coordinator.asOverlord.enabled", "true",
+                  "druid.coordinator.asOverlord.overlordService", "druid/overlord",
+                  "druid.coordinator.period", "PT10S",
+                  "druid.manager.segments.pollDuration", "PT10S"
+          );
+
+  private final DruidNode historical;
+
+  private final DruidNode broker;
+
+  // Coordinator is running as Overlord as well.
+  private final DruidNode coordinator;
+
+  private final List<DruidNode> druidNodes;
+
+  private final File dataDirectory;
+
+  private final File logDirectory;
+
+  public MiniDruidCluster(String name) {
+    this(name, "/tmp/miniDruid/log", "/tmp/miniDruid/data", 2181, null);
+  }
+
+
+  public MiniDruidCluster(String name, String logDir, String dataDir, Integer zookeeperPort, String classpath) {
+    super(name);
+    this.dataDirectory = new File(dataDir, "druid-data");
+    this.logDirectory = new File(logDir);
+    try {
+
+      if (dataDirectory.exists()) {
+        // need to clean data directory to ensure that there is no interference from old runs
+        // Cleaning is happening here to allow debugging in case of tests fail
+        // we don;t have to clean logs since it is an append mode
+        log.info("Cleaning the druid-data directory [{}]", dataDirectory.getAbsolutePath());
+        FileUtils.deleteDirectory(dataDirectory);
+      } else {
+        log.info("Creating the druid-data directory [{}]", dataDirectory.getAbsolutePath());
+        dataDirectory.mkdirs();
+      }
+    } catch (IOException e) {
+      log.error("Failed to clean data directory");
+      Throwables.propagate(e);
+    }
+    String derbyURI = String
+            .format("jdbc:derby://localhost:1527/%s/druid_derby/metadata.db;create=true",
+                    dataDirectory.getAbsolutePath()
+            );
+    String segmentsCache = String
+            .format("[{\"path\":\"%s/druid/segment-cache\",\"maxSize\":130000000000}]",
+                    dataDirectory.getAbsolutePath()
+            );
+    String indexingLogDir = new File(logDirectory, "indexer-log").getAbsolutePath();
+
+    ImmutableMap.Builder<String, String> coordinatorMapBuilder = new ImmutableMap.Builder();
+    ImmutableMap.Builder<String, String> historicalMapBuilder = new ImmutableMap.Builder();
+
+    Map<String, String> coordinatorProperties = coordinatorMapBuilder.putAll(COMMON_DRUID_CONF)
+            .putAll(COMMON_COORDINATOR_INDEXER)
+            .put("druid.metadata.storage.connector.connectURI", derbyURI)
+            .put("druid.indexer.logs.directory", indexingLogDir)
+            .put("druid.zk.service.host", "localhost:" + zookeeperPort)
+            .put("druid.coordinator.startDelay", "PT1S")
+            .build();
+    Map<String, String> historicalProperties = historicalMapBuilder.putAll(COMMON_DRUID_CONF)
+            .putAll(COMMON_DRUID_HISTORICAL)
+            .put("druid.zk.service.host", "localhost:" + zookeeperPort)
+            .put("druid.segmentCache.locations", segmentsCache)
+            .put("druid.storage.storageDirectory", getDeepStorageDir())
+            .put("druid.storage.type", "hdfs")
+            .build();
+    coordinator = new ForkingDruidNode("coordinator", classpath, coordinatorProperties,
+            COORDINATOR_JVM_CONF,
+            logDirectory, null
+    );
+    historical = new ForkingDruidNode("historical", classpath, historicalProperties, HISTORICAL_JVM_CONF,
+            logDirectory, null
+    );
+    broker = new ForkingDruidNode("broker", classpath, historicalProperties, HISTORICAL_JVM_CONF,
+            logDirectory, null
+    );
+    druidNodes = Arrays.asList(coordinator, historical, broker);
+
+  }
+
+  @Override
+  protected void serviceStart() throws Exception {
+    druidNodes.stream().forEach(node -> {
+      try {
+        node.start();
+      } catch (IOException e) {
+        log.error("Failed to start node " + node.getNodeType()
+                + " Consequently will destroy the cluster");
+        druidNodes.stream().filter(node1 -> node1.isAlive()).forEach(nodeToStop -> {
+          try {
+            log.info("Stopping Node " + nodeToStop.getNodeType());
+            nodeToStop.close();
+          } catch (IOException e1) {
+            log.error("Error while stopping " + nodeToStop.getNodeType(), e1);
+          }
+        });
+        Throwables.propagate(e);
+      }
+    });
+  }
+
+  @Override
+  protected void serviceStop() throws Exception {
+    druidNodes.stream().forEach(node -> {
+      try {
+        node.close();
+      } catch (IOException e) {
+        // nothing that we can really do about it
+        log.error(String.format("Failed to stop druid node [%s]", node.getNodeType()), e);
+      }
+    });
+  }
+
+
+  public String getMetadataURI() {
+    return String.format("jdbc:derby://localhost:1527/%s/druid_derby/metadata.db",
+            dataDirectory.getAbsolutePath()
+    );
+  }
+
+  public String getDeepStorageDir() {
+    return dataDirectory.getAbsolutePath() + File.separator + "deep-storage";
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/1b3711b3/itests/qtest-spark/pom.xml
----------------------------------------------------------------------
diff --git a/itests/qtest-spark/pom.xml b/itests/qtest-spark/pom.xml
index 4b25223..72b13a1 100644
--- a/itests/qtest-spark/pom.xml
+++ b/itests/qtest-spark/pom.xml
@@ -142,6 +142,12 @@
       <groupId>org.apache.hive</groupId>
       <artifactId>hive-it-util</artifactId>
       <version>${project.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>org.apache.hive</groupId>
+          <artifactId>hive-it-druid</artifactId>
+        </exclusion>
+      </exclusions>
       <scope>test</scope>
     </dependency>
     <dependency>

http://git-wip-us.apache.org/repos/asf/hive/blob/1b3711b3/itests/qtest/pom.xml
----------------------------------------------------------------------
diff --git a/itests/qtest/pom.xml b/itests/qtest/pom.xml
index 1ac6b30..f60ae1b 100644
--- a/itests/qtest/pom.xml
+++ b/itests/qtest/pom.xml
@@ -383,6 +383,26 @@
           </exclusion>
       </exclusions>
    </dependency>
+    <dependency>
+      <groupId>org.apache.hive</groupId>
+      <artifactId>hive-it-druid</artifactId>
+      <version>${project.version}</version>
+      <scope>test</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>com.sun.jersey</groupId>
+          <artifactId>jersey-bundle</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-log4j12</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>commmons-logging</groupId>
+          <artifactId>commons-logging</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
   </dependencies>
   <profiles>
     <profile>
@@ -428,7 +448,7 @@
                 <taskdef resource="net/sf/antcontrib/antcontrib.properties"
                   classpathref="maven.plugin.classpath" />
                 <mkdir dir="${project.build.directory}/qfile-results/clientpositive/" />
-                <mkdir dir="${project.build.directory}/qfile-results/clientpositive/perf" /> 
+                <mkdir dir="${project.build.directory}/qfile-results/clientpositive/perf" />
                 <mkdir dir="${project.build.directory}/qfile-results/clientnegative/" />
                 <mkdir dir="${project.build.directory}/qfile-results/clientcompare"/>
                 <mkdir dir="${project.build.directory}/qfile-results/positive/" />

http://git-wip-us.apache.org/repos/asf/hive/blob/1b3711b3/itests/qtest/src/test/java/org/apache/hadoop/hive/cli/TestMiniDruidCliDriver.java
----------------------------------------------------------------------
diff --git a/itests/qtest/src/test/java/org/apache/hadoop/hive/cli/TestMiniDruidCliDriver.java b/itests/qtest/src/test/java/org/apache/hadoop/hive/cli/TestMiniDruidCliDriver.java
new file mode 100644
index 0000000..fa75d65
--- /dev/null
+++ b/itests/qtest/src/test/java/org/apache/hadoop/hive/cli/TestMiniDruidCliDriver.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.cli;
+
+import org.apache.hadoop.hive.cli.control.CliAdapter;
+import org.apache.hadoop.hive.cli.control.CliConfigs;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestRule;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+import java.io.File;
+import java.util.List;
+
+@RunWith(Parameterized.class)
+public class TestMiniDruidCliDriver {
+
+  static CliAdapter adapter = new CliConfigs.MiniDruidCliConfig().getCliAdapter();
+
+  @Parameters(name = "{0}")
+  public static List<Object[]> getParameters() throws Exception {
+    return adapter.getParameters();
+  }
+
+  @ClassRule
+  public static TestRule cliClassRule = adapter.buildClassRule();
+
+  @Rule
+  public TestRule cliTestRule = adapter.buildTestRule();
+
+  private String name;
+  private File qfile;
+
+  public TestMiniDruidCliDriver(String name, File qfile) {
+    this.name = name;
+    this.qfile = qfile;
+  }
+
+  @Test
+  public void testCliDriver() throws Exception {
+    adapter.runTest(name, qfile);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/1b3711b3/itests/src/test/resources/testconfiguration.properties
----------------------------------------------------------------------
diff --git a/itests/src/test/resources/testconfiguration.properties b/itests/src/test/resources/testconfiguration.properties
index c910712..37079b7 100644
--- a/itests/src/test/resources/testconfiguration.properties
+++ b/itests/src/test/resources/testconfiguration.properties
@@ -1594,3 +1594,7 @@ spark.query.negative.files=groupby2_map_skew_multi_distinct.q,\
 
 spark.perf.disabled.query.files=query14.q,\
   query64.q
+
+druid.query.files=druidmini_test1.q,\
+  druidmini_test_insert.q
+

http://git-wip-us.apache.org/repos/asf/hive/blob/1b3711b3/itests/util/pom.xml
----------------------------------------------------------------------
diff --git a/itests/util/pom.xml b/itests/util/pom.xml
index d311507..abff200 100644
--- a/itests/util/pom.xml
+++ b/itests/util/pom.xml
@@ -93,6 +93,12 @@
       <groupId>org.apache.hive</groupId>
       <artifactId>hive-metastore</artifactId>
       <version>${project.version}</version>
+      <exclusions>
+        <exclusion>
+          <artifactId>guice</artifactId>
+          <groupId>com.google.inject</groupId>
+        </exclusion>
+      </exclusions>
     </dependency>
     <dependency>
       <groupId>org.apache.hive</groupId>
@@ -126,11 +132,23 @@
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-common</artifactId>
       <version>${hadoop.version}</version>
+      <exclusions>
+        <exclusion>
+          <artifactId>guice</artifactId>
+          <groupId>com.google.inject</groupId>
+        </exclusion>
+      </exclusions>
     </dependency>
     <dependency>
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-mapreduce-client-core</artifactId>
       <version>${hadoop.version}</version>
+      <exclusions>
+        <exclusion>
+          <artifactId>guice</artifactId>
+          <groupId>com.google.inject</groupId>
+        </exclusion>
+      </exclusions>
     </dependency>
     <dependency>
       <groupId>org.apache.hbase</groupId>
@@ -192,5 +210,24 @@
       <version>${hbase.version}</version>
       <classifier>tests</classifier>
     </dependency>
+    <dependency>
+      <groupId>org.apache.hive</groupId>
+      <artifactId>hive-it-druid</artifactId>
+      <version>${project.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>com.sun.jersey</groupId>
+          <artifactId>jersey-bundle</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-log4j12</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>commmons-logging</groupId>
+          <artifactId>commons-logging</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
   </dependencies>
 </project>

http://git-wip-us.apache.org/repos/asf/hive/blob/1b3711b3/itests/util/src/main/java/org/apache/hadoop/hive/cli/control/CliConfigs.java
----------------------------------------------------------------------
diff --git a/itests/util/src/main/java/org/apache/hadoop/hive/cli/control/CliConfigs.java b/itests/util/src/main/java/org/apache/hadoop/hive/cli/control/CliConfigs.java
index 438d296..dd6f15c 100644
--- a/itests/util/src/main/java/org/apache/hadoop/hive/cli/control/CliConfigs.java
+++ b/itests/util/src/main/java/org/apache/hadoop/hive/cli/control/CliConfigs.java
@@ -55,6 +55,7 @@ public class CliConfigs {
         excludesFrom(testConfigProps, "miniSparkOnYarn.only.query.files");
         excludesFrom(testConfigProps, "disabled.query.files");
         excludesFrom(testConfigProps, "localSpark.only.query.files");
+        excludesFrom(testConfigProps, "druid.query.files");
 
         setResultsDir("ql/src/test/results/clientpositive");
         setLogDir("itests/qtest/target/qfile-results/clientpositive");
@@ -163,6 +164,29 @@ public class CliConfigs {
     }
   }
 
+  public static class MiniDruidCliConfig extends AbstractCliConfig {
+    public MiniDruidCliConfig() {
+      super(CoreCliDriver.class);
+      try {
+        setQueryDir("ql/src/test/queries/clientpositive");
+
+        includesFrom(testConfigProps, "druid.query.files");
+
+        setResultsDir("ql/src/test/results/clientpositive/druid");
+        setLogDir("itests/qtest/target/tmp/log");
+
+        setInitScript("q_test_druid_init.sql");
+        setCleanupScript("q_test_cleanup_druid.sql");
+        setHiveConfDir("");
+        setClusterType(MiniClusterType.druid);
+        setMetastoreType(MetastoreType.sql);
+        setFsType(QTestUtil.FsType.hdfs);
+      } catch (Exception e) {
+        throw new RuntimeException("can't construct cliconfig", e);
+      }
+    }
+  }
+
   public static class MiniLlapLocalCliConfig extends AbstractCliConfig {
 
     public MiniLlapLocalCliConfig() {

http://git-wip-us.apache.org/repos/asf/hive/blob/1b3711b3/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java
----------------------------------------------------------------------
diff --git a/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java b/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java
index 36ad581..88034d7 100644
--- a/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java
+++ b/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java
@@ -96,6 +96,7 @@ import org.apache.hadoop.hive.common.io.SortAndDigestPrintStream;
 import org.apache.hadoop.hive.common.io.SortPrintStream;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hive.druid.MiniDruidCluster;
 import org.apache.hadoop.hive.llap.LlapItUtils;
 import org.apache.hadoop.hive.llap.daemon.MiniLlapCluster;
 import org.apache.hadoop.hive.llap.io.api.LlapProxy;
@@ -200,6 +201,8 @@ public class QTestUtil {
   private final String initScript;
   private final String cleanupScript;
 
+  private MiniDruidCluster druidCluster;
+
   public interface SuiteAddTestFunctor {
     public void addTestToSuite(TestSuite suite, Object setup, String tName);
   }
@@ -363,6 +366,17 @@ public class QTestUtil {
         conf.set(confEntry.getKey(), clusterSpecificConf.get(confEntry.getKey()));
       }
     }
+    if (druidCluster != null) {
+      final Path druidDeepStorage = fs.makeQualified(new Path(druidCluster.getDeepStorageDir()));
+      fs.mkdirs(druidDeepStorage);
+      conf.set("hive.druid.storage.storageDirectory", druidDeepStorage.toUri().getPath());
+      conf.set("hive.druid.metadata.db.type", "derby");
+      conf.set("hive.druid.metadata.uri", druidCluster.getMetadataURI());
+      final Path scratchDir = fs
+              .makeQualified(new Path(System.getProperty("test.tmp.dir"), "druidStagingDir"));
+      fs.mkdirs(scratchDir);
+      conf.set("hive.druid.working.directory", scratchDir.toUri().getPath());
+    }
   }
 
   private void setFsRelatedProperties(HiveConf conf, boolean isLocalFs, FileSystem fs) {
@@ -438,7 +452,8 @@ public class QTestUtil {
   private enum CoreClusterType {
     MR,
     TEZ,
-    SPARK
+    SPARK,
+    DRUID
   }
 
   public enum FsType {
@@ -456,7 +471,8 @@ public class QTestUtil {
     miniSparkOnYarn(CoreClusterType.SPARK, FsType.hdfs),
     llap(CoreClusterType.TEZ, FsType.hdfs),
     llap_local(CoreClusterType.TEZ, FsType.local),
-    none(CoreClusterType.MR, FsType.local);
+    none(CoreClusterType.MR, FsType.local),
+    druid(CoreClusterType.DRUID, FsType.hdfs);
 
 
     private final CoreClusterType coreClusterType;
@@ -491,6 +507,8 @@ public class QTestUtil {
         return llap;
       } else if (type.equals("llap_local")) {
         return llap_local;
+      } else if (type.equals("druid")) {
+      return druid;
       } else {
         return none;
       }
@@ -645,6 +663,18 @@ public class QTestUtil {
       mr = shims.getMiniSparkCluster(conf, 2, uriString, 1);
     } else if (clusterType == MiniClusterType.mr) {
       mr = shims.getMiniMrCluster(conf, 2, uriString, 1);
+    } else if (clusterType == MiniClusterType.druid) {
+      final String tempDir = System.getProperty("test.tmp.dir");
+      druidCluster = new MiniDruidCluster("mini-druid",
+              getLogDirectory(),
+              tempDir,
+              setup.zkPort,
+              Utilities.jarFinderGetJar(MiniDruidCluster.class)
+      );
+      druidCluster.init(conf);
+      final Path druidDeepStorage = fs.makeQualified(new Path(druidCluster.getDeepStorageDir()));
+      fs.mkdirs(druidDeepStorage);
+      druidCluster.start();
     }
   }
 
@@ -657,6 +687,10 @@ public class QTestUtil {
     if (clusterType.getCoreClusterType() == CoreClusterType.TEZ) {
       SessionState.get().getTezSession().destroy();
     }
+    if (druidCluster != null) {
+      druidCluster.stop();
+      druidCluster = null;
+    }
     setup.tearDown();
     if (sparkSession != null) {
       try {

http://git-wip-us.apache.org/repos/asf/hive/blob/1b3711b3/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 6d8ab5e..3287ab0 100644
--- a/pom.xml
+++ b/pom.xml
@@ -137,7 +137,7 @@
     <commons-lang3.version>3.2</commons-lang3.version>
     <commons-pool.version>1.5.4</commons-pool.version>
     <commons-dbcp.version>1.4</commons-dbcp.version>
-    <derby.version>10.10.2.0</derby.version>
+    <derby.version>10.11.1.1</derby.version>
     <dropwizard.version>3.1.0</dropwizard.version>
     <dropwizard-metrics-hadoop-metrics2-reporter.version>0.1.2</dropwizard-metrics-hadoop-metrics2-reporter.version>
     <druid.version>0.10.1</druid.version>
@@ -168,7 +168,7 @@
     <jdo-api.version>3.0.1</jdo-api.version>
     <jettison.version>1.1</jettison.version>
     <jetty.version>9.3.8.v20160314</jetty.version>
-    <jersey.version>1.14</jersey.version>
+    <jersey.version>1.19</jersey.version>
     <!-- Glassfish jersey is included for Spark client test only -->
     <glassfish.jersey.version>2.22.2</glassfish.jersey.version>
     <jline.version>2.12</jline.version>

http://git-wip-us.apache.org/repos/asf/hive/blob/1b3711b3/ql/src/test/queries/clientpositive/druidmini_test1.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/druidmini_test1.q b/ql/src/test/queries/clientpositive/druidmini_test1.q
new file mode 100644
index 0000000..630e617
--- /dev/null
+++ b/ql/src/test/queries/clientpositive/druidmini_test1.q
@@ -0,0 +1,121 @@
+CREATE TABLE druid_table
+STORED BY 'org.apache.hadoop.hive.druid.DruidStorageHandler'
+TBLPROPERTIES ("druid.segment.granularity" = "HOUR", "druid.query.granularity" = "MINUTE")
+AS
+SELECT cast (`ctimestamp1` as timestamp with local time zone) as `__time`,
+  cstring1,
+  cstring2,
+  cdouble,
+  cfloat,
+  ctinyint,
+  csmallint,
+  cint,
+  cbigint,
+  cboolean1,
+  cboolean2
+  FROM alltypesorc where ctimestamp1 IS NOT NULL;
+
+-- Time Series Query
+explain select count(*) FROM druid_table;
+SELECT count(*) FROM druid_table;
+
+
+EXPLAIN SELECT floor_year(`__time`), SUM(cfloat), SUM(cdouble), SUM(ctinyint), SUM(csmallint),SUM(cint), SUM(cbigint)
+FROM druid_table GROUP BY floor_year(`__time`);
+
+SELECT floor_year(`__time`), SUM(cfloat), SUM(cdouble), SUM(ctinyint), SUM(csmallint),SUM(cint), SUM(cbigint)
+FROM druid_table GROUP BY floor_year(`__time`);
+
+EXPLAIN SELECT floor_year(`__time`), MIN(cfloat), MIN(cdouble), MIN(ctinyint), MIN(csmallint),MIN(cint), MIN(cbigint)
+FROM druid_table GROUP BY floor_year(`__time`);
+
+SELECT floor_year(`__time`), MIN(cfloat), MIN(cdouble), MIN(ctinyint), MIN(csmallint),MIN(cint), MIN(cbigint)
+FROM druid_table GROUP BY floor_year(`__time`);
+
+
+EXPLAIN SELECT floor_year(`__time`), MAX(cfloat), MAX(cdouble), MAX(ctinyint), MAX(csmallint),MAX(cint), MAX(cbigint)
+FROM druid_table GROUP BY floor_year(`__time`);
+
+SELECT floor_year(`__time`), MAX(cfloat), MAX(cdouble), MAX(ctinyint), MAX(csmallint),MAX(cint), MAX(cbigint)
+FROM druid_table GROUP BY floor_year(`__time`);
+
+
+-- Group By
+
+
+EXPLAIN SELECT cstring1, SUM(cdouble) as s FROM druid_table GROUP BY cstring1 ORDER BY s ASC LIMIT 10;
+
+SELECT cstring1, SUM(cdouble) as s FROM druid_table GROUP BY cstring1 ORDER BY s ASC LIMIT 10;
+
+
+EXPLAIN SELECT cstring2, MAX(cdouble) FROM druid_table GROUP BY cstring2 ORDER BY cstring2 ASC LIMIT 10;
+
+SELECT cstring2, MAX(cdouble) FROM druid_table GROUP BY cstring2 ORDER BY cstring2 ASC LIMIT 10;
+
+
+-- TIME STUFF
+
+EXPLAIN
+SELECT `__time`
+FROM druid_table ORDER BY `__time` ASC LIMIT 10;
+
+SELECT `__time`
+FROM druid_table ORDER BY `__time` ASC LIMIT 10;
+
+EXPLAIN
+SELECT `__time`
+FROM druid_table
+WHERE `__time` < '1970-03-01 00:00:00' ORDER BY `__time` ASC LIMIT 10;
+
+
+SELECT `__time`
+FROM druid_table
+WHERE `__time` < '1970-03-01 00:00:00' ORDER BY `__time` ASC LIMIT 10;
+
+
+EXPLAIN
+SELECT `__time`
+FROM druid_table
+WHERE `__time` >= '1968-01-01 00:00:00' AND `__time` <= '1970-03-01 00:00:00' ORDER BY `__time` ASC LIMIT 10;
+
+
+SELECT `__time`
+FROM druid_table
+WHERE `__time` >= '1968-01-01 00:00:00' AND `__time` <= '1970-03-01 00:00:00' ORDER BY `__time` ASC LIMIT 10;
+
+
+EXPLAIN
+SELECT `__time`
+FROM druid_table
+WHERE `__time` >= '1968-01-01 00:00:00' AND `__time` <= '1970-03-01 00:00:00'
+    AND `__time` < '2011-01-01 00:00:00' ORDER BY `__time` ASC LIMIT 10;
+
+
+SELECT `__time`
+FROM druid_table
+WHERE `__time` >= '1968-01-01 00:00:00' AND `__time` <= '1970-03-01 00:00:00'
+    AND `__time` < '2011-01-01 00:00:00' ORDER BY `__time` ASC LIMIT 10;
+
+
+EXPLAIN
+SELECT `__time`
+FROM druid_table
+WHERE `__time` BETWEEN '1968-01-01 00:00:00' AND '1970-01-01 00:00:00' ORDER BY `__time` ASC LIMIT 10;;
+
+
+SELECT `__time`
+FROM druid_table
+WHERE `__time` BETWEEN '1968-01-01 00:00:00' AND '1970-01-01 00:00:00' ORDER BY `__time` ASC LIMIT 10;;
+
+
+EXPLAIN
+SELECT `__time`
+FROM druid_table
+WHERE (`__time` BETWEEN '1968-01-01 00:00:00' AND '1970-01-01 00:00:00')
+    OR (`__time` BETWEEN '1968-02-01 00:00:00' AND '1970-04-01 00:00:00') ORDER BY `__time` ASC LIMIT 10;
+
+
+SELECT `__time`
+FROM druid_table
+WHERE (`__time` BETWEEN '1968-01-01 00:00:00' AND '1970-01-01 00:00:00')
+    OR (`__time` BETWEEN '1968-02-01 00:00:00' AND '1970-04-01 00:00:00') ORDER BY `__time` ASC LIMIT 10;

http://git-wip-us.apache.org/repos/asf/hive/blob/1b3711b3/ql/src/test/queries/clientpositive/druidmini_test_insert.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/druidmini_test_insert.q b/ql/src/test/queries/clientpositive/druidmini_test_insert.q
new file mode 100644
index 0000000..558e246
--- /dev/null
+++ b/ql/src/test/queries/clientpositive/druidmini_test_insert.q
@@ -0,0 +1,53 @@
+CREATE TABLE druid_alltypesorc
+STORED BY 'org.apache.hadoop.hive.druid.DruidStorageHandler'
+TBLPROPERTIES ("druid.segment.granularity" = "HOUR", "druid.query.granularity" = "MINUTE")
+AS
+SELECT cast (`ctimestamp2` as timestamp with local time zone) as `__time`,
+  cstring1,
+  cstring2,
+  cdouble,
+  cfloat,
+  ctinyint,
+  csmallint,
+  cint,
+  cbigint,
+  cboolean1,
+  cboolean2
+  FROM alltypesorc where ctimestamp2 IS NOT NULL;
+
+SELECT COUNT(*) FROM druid_alltypesorc;
+
+INSERT INTO TABLE druid_alltypesorc
+SELECT cast (`ctimestamp1` as timestamp with local time zone) as `__time`,
+  cstring1,
+  cstring2,
+  cdouble,
+  cfloat,
+  ctinyint,
+  csmallint,
+  cint,
+  cbigint,
+  cboolean1,
+  cboolean2
+  FROM alltypesorc where ctimestamp1 IS NOT NULL;
+
+
+SELECT COUNT(*) FROM druid_alltypesorc;
+
+INSERT OVERWRITE TABLE druid_alltypesorc
+SELECT cast (`ctimestamp1` as timestamp with local time zone) as `__time`,
+  cstring1,
+  cstring2,
+  cdouble,
+  cfloat,
+  ctinyint,
+  csmallint,
+  cint,
+  cbigint,
+  cboolean1,
+  cboolean2
+  FROM alltypesorc where ctimestamp1 IS NOT NULL;
+
+SELECT COUNT(*) FROM druid_alltypesorc;
+
+DROP TABLE druid_alltypesorc;