You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by cg...@apache.org on 2020/06/25 00:23:14 UTC

[drill] branch master updated: DRILL-5956: Add Storage Plugin for Apache Druid

This is an automated email from the ASF dual-hosted git repository.

cgivre pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git


The following commit(s) were added to refs/heads/master by this push:
     new 3d6b67c  DRILL-5956: Add Storage Plugin for Apache Druid
3d6b67c is described below

commit 3d6b67ccf8bb6b5cb8d7fc1ef24c37db5210d07b
Author: akkapur <an...@gmail.com>
AuthorDate: Sat Oct 26 21:20:09 2019 -0400

    DRILL-5956: Add Storage Plugin for Apache Druid
---
 .../apache/drill/categories/DruidStorageTest.java  |  25 ++
 .../native/client/src/protobuf/UserBitShared.pb.cc |  17 +-
 .../native/client/src/protobuf/UserBitShared.pb.h  |   1 +
 contrib/pom.xml                                    |   1 +
 contrib/storage-druid/README.md                    |  59 ++++
 contrib/storage-druid/pom.xml                      |  89 ++++++
 .../store/druid/DruidCompareFunctionProcessor.java | 257 ++++++++++++++++++
 .../drill/exec/store/druid/DruidFilterBuilder.java | 192 +++++++++++++
 .../drill/exec/store/druid/DruidGroupScan.java     | 301 +++++++++++++++++++++
 .../store/druid/DruidPushDownFilterForScan.java    |  98 +++++++
 .../drill/exec/store/druid/DruidRecordReader.java  | 201 ++++++++++++++
 .../exec/store/druid/DruidScanBatchCreator.java    |  56 ++++
 .../drill/exec/store/druid/DruidScanSpec.java      |  87 ++++++
 .../exec/store/druid/DruidScanSpecBuilder.java     |  93 +++++++
 .../drill/exec/store/druid/DruidStoragePlugin.java | 102 +++++++
 .../exec/store/druid/DruidStoragePluginConfig.java |  84 ++++++
 .../drill/exec/store/druid/DruidSubScan.java       | 174 ++++++++++++
 .../exec/store/druid/common/DruidAndFilter.java    |  43 +++
 .../exec/store/druid/common/DruidBoundFilter.java  |  89 ++++++
 .../exec/store/druid/common/DruidCompareOp.java    |  73 +++++
 .../exec/store/druid/common/DruidConstants.java    |  25 ++
 .../drill/exec/store/druid/common/DruidFilter.java |  26 ++
 .../exec/store/druid/common/DruidFilterBase.java   |  34 +++
 .../druid/common/DruidFilterDeserializer.java      |  74 +++++
 .../exec/store/druid/common/DruidInFilter.java     |  49 ++++
 .../store/druid/common/DruidIntervalFilter.java    |  35 +++
 .../exec/store/druid/common/DruidNotFilter.java    |  44 +++
 .../exec/store/druid/common/DruidOrFilter.java     |  43 +++
 .../exec/store/druid/common/DruidRegexFilter.java  |  50 ++++
 .../exec/store/druid/common/DruidSearchFilter.java |  49 ++++
 .../store/druid/common/DruidSearchQuerySpec.java   |  49 ++++
 .../store/druid/common/DruidSelectorFilter.java    |  50 ++++
 .../drill/exec/store/druid/common/DruidUtils.java  |  30 ++
 .../exec/store/druid/druid/DruidDimensionSpec.java |  66 +++++
 .../druid/druid/DruidExtractionFunctionSpec.java   |  44 +++
 .../exec/store/druid/druid/DruidQueryType.java     |  24 ++
 .../store/druid/druid/DruidSelectResponse.java     |  45 +++
 .../exec/store/druid/druid/PagingIdentifier.java   |  39 +++
 .../drill/exec/store/druid/druid/PagingSpec.java   |  63 +++++
 .../drill/exec/store/druid/druid/SelectQuery.java  |  89 ++++++
 .../exec/store/druid/druid/SelectQueryBuilder.java | 108 ++++++++
 .../store/druid/druid/SimpleDatasourceInfo.java    |  44 +++
 .../druid/druid/SimpleDatasourceProperties.java    |  37 +++
 .../exec/store/druid/druid/SimpleSegmentInfo.java  |  58 ++++
 .../exec/store/druid/rest/DruidAdminClient.java    |  64 +++++
 .../exec/store/druid/rest/DruidQueryClient.java    | 108 ++++++++
 .../drill/exec/store/druid/rest/RestClient.java    |  27 ++
 .../exec/store/druid/rest/RestClientWrapper.java   |  52 ++++
 .../store/druid/schema/DruidSchemaFactory.java     | 137 ++++++++++
 .../main/resources/bootstrap-storage-plugins.json  |  11 +
 .../src/main/resources/drill-module.conf           |  24 ++
 .../exec/store/druid/DruidFilterBuilderTest.java   | 123 +++++++++
 .../exec/store/druid/DruidScanSpecBuilderTest.java | 253 +++++++++++++++++
 .../store/druid/DruidStoragePluginConfigTest.java  |  63 +++++
 .../drill/exec/store/druid/DruidTestBase.java      |  58 ++++
 .../drill/exec/store/druid/DruidTestConstants.java |  28 ++
 .../drill/exec/store/druid/DruidTestSuit.java      |  68 +++++
 .../drill/exec/store/druid/TestDataGenerator.java  | 183 +++++++++++++
 .../drill/exec/store/druid/TestDruidQueries.java   |  81 ++++++
 .../store/druid/rest/DruidQueryClientTest.java     | 112 ++++++++
 .../test/resources/bootstrap-storage-plugins.json  |  11 +
 .../src/test/resources/druid/docker-compose.yaml   | 136 ++++++++++
 .../src/test/resources/druid/environment           |  52 ++++
 .../src/test/resources/wikipedia-index.json        |  63 +++++
 distribution/pom.xml                               |  23 +-
 distribution/src/assemble/component.xml            |   1 +
 pom.xml                                            |   2 +
 .../org/apache/drill/exec/proto/UserBitShared.java |  23 +-
 protocol/src/main/protobuf/UserBitShared.proto     |   1 +
 69 files changed, 4897 insertions(+), 24 deletions(-)

diff --git a/common/src/test/java/org/apache/drill/categories/DruidStorageTest.java b/common/src/test/java/org/apache/drill/categories/DruidStorageTest.java
new file mode 100644
index 0000000..cfc9ce1
--- /dev/null
+++ b/common/src/test/java/org/apache/drill/categories/DruidStorageTest.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.categories;
+
+/**
+ * This is a category used to mark unit tests that test the Druid storage plugin.
+ */
+public interface DruidStorageTest {
+}
diff --git a/contrib/native/client/src/protobuf/UserBitShared.pb.cc b/contrib/native/client/src/protobuf/UserBitShared.pb.cc
index bac074a..6dbd625 100644
--- a/contrib/native/client/src/protobuf/UserBitShared.pb.cc
+++ b/contrib/native/client/src/protobuf/UserBitShared.pb.cc
@@ -956,7 +956,7 @@ const char descriptor_table_protodef_UserBitShared_2eproto[] PROTOBUF_SECTION_VA
   "ATEMENT\020\005*\207\001\n\rFragmentState\022\013\n\007SENDING\020\000"
   "\022\027\n\023AWAITING_ALLOCATION\020\001\022\013\n\007RUNNING\020\002\022\014"
   "\n\010FINISHED\020\003\022\r\n\tCANCELLED\020\004\022\n\n\006FAILED\020\005\022"
-  "\032\n\026CANCELLATION_REQUESTED\020\006*\212\013\n\020CoreOper"
+  "\032\n\026CANCELLATION_REQUESTED\020\006*\236\013\n\020CoreOper"
   "atorType\022\021\n\rSINGLE_SENDER\020\000\022\024\n\020BROADCAST"
   "_SENDER\020\001\022\n\n\006FILTER\020\002\022\022\n\016HASH_AGGREGATE\020"
   "\003\022\r\n\tHASH_JOIN\020\004\022\016\n\nMERGE_JOIN\020\005\022\031\n\025HASH"
@@ -991,12 +991,12 @@ const char descriptor_table_protodef_UserBitShared_2eproto[] PROTOBUF_SECTION_VA
   "MERGE\020=\022\021\n\rLTSV_SUB_SCAN\020>\022\021\n\rHDF5_SUB_S"
   "CAN\020\?\022\022\n\016EXCEL_SUB_SCAN\020@\022\020\n\014SHP_SUB_SCA"
   "N\020A\022\024\n\020METADATA_HANDLER\020B\022\027\n\023METADATA_CO"
-  "NTROLLER\020C\022\021\n\rSPSS_SUB_SCAN\020E\022\021\n\rHTTP_SU"
-  "B_SCAN\020F*g\n\nSaslStatus\022\020\n\014SASL_UNKNOWN\020\000"
-  "\022\016\n\nSASL_START\020\001\022\024\n\020SASL_IN_PROGRESS\020\002\022\020"
-  "\n\014SASL_SUCCESS\020\003\022\017\n\013SASL_FAILED\020\004B.\n\033org"
-  ".apache.drill.exec.protoB\rUserBitSharedH"
-  "\001"
+  "NTROLLER\020C\022\022\n\016DRUID_SUB_SCAN\020D\022\021\n\rSPSS_S"
+  "UB_SCAN\020E\022\021\n\rHTTP_SUB_SCAN\020F*g\n\nSaslStat"
+  "us\022\020\n\014SASL_UNKNOWN\020\000\022\016\n\nSASL_START\020\001\022\024\n\020"
+  "SASL_IN_PROGRESS\020\002\022\020\n\014SASL_SUCCESS\020\003\022\017\n\013"
+  "SASL_FAILED\020\004B.\n\033org.apache.drill.exec.p"
+  "rotoB\rUserBitSharedH\001"
   ;
 static const ::PROTOBUF_NAMESPACE_ID::internal::DescriptorTable*const descriptor_table_UserBitShared_2eproto_deps[3] = {
   &::descriptor_table_Coordination_2eproto,
@@ -1030,7 +1030,7 @@ static ::PROTOBUF_NAMESPACE_ID::internal::SCCInfoBase*const descriptor_table_Use
 static ::PROTOBUF_NAMESPACE_ID::internal::once_flag descriptor_table_UserBitShared_2eproto_once;
 static bool descriptor_table_UserBitShared_2eproto_initialized = false;
 const ::PROTOBUF_NAMESPACE_ID::internal::DescriptorTable descriptor_table_UserBitShared_2eproto = {
-  &descriptor_table_UserBitShared_2eproto_initialized, descriptor_table_protodef_UserBitShared_2eproto, "UserBitShared.proto", 5801,
+  &descriptor_table_UserBitShared_2eproto_initialized, descriptor_table_protodef_UserBitShared_2eproto, "UserBitShared.proto", 5821,
   &descriptor_table_UserBitShared_2eproto_once, descriptor_table_UserBitShared_2eproto_sccs, descriptor_table_UserBitShared_2eproto_deps, 22, 3,
   schemas, file_default_instances, TableStruct_UserBitShared_2eproto::offsets,
   file_level_metadata_UserBitShared_2eproto, 22, file_level_enum_descriptors_UserBitShared_2eproto, file_level_service_descriptors_UserBitShared_2eproto,
@@ -1266,6 +1266,7 @@ bool CoreOperatorType_IsValid(int value) {
     case 65:
     case 66:
     case 67:
+    case 68:
     case 69:
     case 70:
       return true;
diff --git a/contrib/native/client/src/protobuf/UserBitShared.pb.h b/contrib/native/client/src/protobuf/UserBitShared.pb.h
index 95cdb20..ae87641 100644
--- a/contrib/native/client/src/protobuf/UserBitShared.pb.h
+++ b/contrib/native/client/src/protobuf/UserBitShared.pb.h
@@ -390,6 +390,7 @@ enum CoreOperatorType : int {
   SHP_SUB_SCAN = 65,
   METADATA_HANDLER = 66,
   METADATA_CONTROLLER = 67,
+  DRUID_SUB_SCAN = 68,
   SPSS_SUB_SCAN = 69,
   HTTP_SUB_SCAN = 70
 };
diff --git a/contrib/pom.xml b/contrib/pom.xml
index d81f717..3fb8647 100644
--- a/contrib/pom.xml
+++ b/contrib/pom.xml
@@ -56,6 +56,7 @@
     <module>storage-kudu</module>
     <module>storage-opentsdb</module>
     <module>storage-http</module>
+    <module>storage-druid</module>
   </modules>
 
 </project>
diff --git a/contrib/storage-druid/README.md b/contrib/storage-druid/README.md
new file mode 100644
index 0000000..36575f3
--- /dev/null
+++ b/contrib/storage-druid/README.md
@@ -0,0 +1,59 @@
+# Drill Apache Druid Plugin
+
+The Drill Druid storage plugin allows you to perform SQL queries against Druid datasource(s).
+This storage plugin is part of [Apache Drill](https://github.com/apache/drill)
+
+### Tested with Druid version
+[0.16.0-incubating](https://github.com/apache/incubator-druid/releases/tag/druid-0.16.0-incubating)
+
+### Druid API
+
+Druid supports multiple native queries to address sundry use-cases.
+To fetch raw Druid rows, the Druid API supports two forms of query, `SELECT` (no relation to SQL) and `SCAN`.
+Currently, this plugin uses the [Select](https://druid.apache.org/docs/latest/querying/select-query.html)
+query API to fetch raw rows from druid as json.
+
+### Filter Push-Down
+
+Filters are pushed down to the native Druid filter structure, converting SQL WHERE clauses to the respective Druid [Filters](https://druid.apache.org/docs/latest/querying/filters.html).
+
+### Plugin Registration
+
+The plugin can be registered in Apache Drill using the drill web interface by navigating to the ```storage``` page.
+Following is the default registration configuration.
+```json
+{
+  "type" : "druid",
+  "brokerAddress" : "http://localhost:8082",
+  "coordinatorAddress": "http://localhost:8081",
+  "averageRowSizeBytes": 100,
+  "enabled" : false
+}
+```
+
+### Druid storage plugin developer notes.
+
+* Building the plugin 
+
+    `mvn install -pl contrib/storage-druid`
+
+* Building DRILL
+
+    `mvn clean install -DskipTests`
+    
+* Start Drill In Embedded Mode (mac)
+
+    ```shell script
+    distribution/target/apache-drill-1.18.0-SNAPSHOT/apache-drill-1.18.0-SNAPSHOT/bin/drill-embedded
+    ```
+  
+* Starting Druid (Docker and Docker Compose required)
+    ```
+    cd contrib/storage-druid/src/test/resources/druid
+    docker-compose up -d
+    ```
+  
+  * There is an `Indexing Task Json` in the same folder as the docker compose file. It can be used to ingest the wikipedia datasource.
+  
+  * Make sure the druid storage plugin is enabled in Drill.
+
diff --git a/contrib/storage-druid/pom.xml b/contrib/storage-druid/pom.xml
new file mode 100755
index 0000000..60b8aae
--- /dev/null
+++ b/contrib/storage-druid/pom.xml
@@ -0,0 +1,89 @@
+<?xml version="1.0"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>drill-contrib-parent</artifactId>
+        <groupId>org.apache.drill.contrib</groupId>
+        <version>1.18.0-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>drill-druid-storage</artifactId>
+    <name>contrib/druid-storage-plugin</name>
+    <properties>
+        <druid.TestSuite>**/DruidTestSuit.class</druid.TestSuite>
+    </properties>
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.drill.exec</groupId>
+            <artifactId>drill-java-exec</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <!-- Test dependencies -->
+        <dependency>
+            <groupId>org.apache.drill.exec</groupId>
+            <artifactId>drill-java-exec</artifactId>
+            <classifier>tests</classifier>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.drill</groupId>
+            <artifactId>drill-common</artifactId>
+            <classifier>tests</classifier>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.assertj</groupId>
+            <artifactId>assertj-core</artifactId>
+            <!-- use 2.9.1 for Java 7 projects -->
+            <version>3.11.1</version>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-surefire-plugin</artifactId>
+                <configuration>
+                    <includes>
+                        <include>${druid.TestSuite}</include>
+                    </includes>
+                    <excludes>
+                        <exclude>**/TestDruidQueries.java</exclude>
+                    </excludes>
+                    <systemProperties>
+                        <property>
+                            <name>logback.log.dir</name>
+                            <value>${project.build.directory}/surefire-reports</value>
+                        </property>
+                    </systemProperties>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
+</project>
diff --git a/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidCompareFunctionProcessor.java b/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidCompareFunctionProcessor.java
new file mode 100755
index 0000000..0ef9615
--- /dev/null
+++ b/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidCompareFunctionProcessor.java
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.druid;
+
+import org.apache.drill.common.FunctionNames;
+import org.apache.drill.common.expression.CastExpression;
+import org.apache.drill.common.expression.ConvertExpression;
+import org.apache.drill.common.expression.FunctionCall;
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.expression.visitors.AbstractExprVisitor;
+import org.apache.drill.common.expression.ValueExpressions.BooleanExpression;
+import org.apache.drill.common.expression.ValueExpressions.DateExpression;
+import org.apache.drill.common.expression.ValueExpressions.DoubleExpression;
+import org.apache.drill.common.expression.ValueExpressions.FloatExpression;
+import org.apache.drill.common.expression.ValueExpressions.IntExpression;
+import org.apache.drill.common.expression.ValueExpressions.LongExpression;
+import org.apache.drill.common.expression.ValueExpressions.QuotedString;
+import org.apache.drill.common.expression.ValueExpressions.TimeExpression;
+
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableMap;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableSet;
+
+public class DruidCompareFunctionProcessor extends
+  AbstractExprVisitor<Boolean, LogicalExpression, RuntimeException> {
+
+  private Object value;
+  private boolean success;
+  private boolean isEqualityFn;
+  private SchemaPath path;
+  private String functionName;
+
+  public static boolean isCompareFunction(String functionName) {
+    return COMPARE_FUNCTIONS_TRANSPOSE_MAP.containsKey(functionName);
+  }
+
+  /** Extracts the column path and literal of a compare call; isSuccess() is false when unsupported. */
+  public static DruidCompareFunctionProcessor process(FunctionCall call) {
+    String functionName = call.getName();
+    int argCount = call.args().size();
+    LogicalExpression nameArg = call.args().get(0);
+    LogicalExpression valueArg = argCount == 2 ? call.args().get(1) : null;
+    DruidCompareFunctionProcessor evaluator =
+        new DruidCompareFunctionProcessor(functionName);
+    if (valueArg != null) { // binary function
+      if (VALUE_EXPRESSION_CLASSES.contains(nameArg.getClass())) {
+        if (FunctionNames.LIKE.equals(functionName)) {
+          return evaluator; // LIKE is not commutative: 'x' LIKE col != col LIKE 'x'
+        }
+        // literal came first: swap the operands and use the mirrored operator
+        evaluator.functionName = COMPARE_FUNCTIONS_TRANSPOSE_MAP.get(functionName);
+        valueArg = nameArg;
+        nameArg = call.args().get(1);
+      }
+      evaluator.success = nameArg.accept(evaluator, valueArg);
+    } else if (argCount == 1 && nameArg instanceof SchemaPath) { // unary function only
+      evaluator.success = true;
+      evaluator.path = (SchemaPath) nameArg;
+    }
+    return evaluator;
+  }
+
+  public DruidCompareFunctionProcessor(String functionName) {
+    this.success = false;
+    this.functionName = functionName;
+    this.isEqualityFn = COMPARE_FUNCTIONS_TRANSPOSE_MAP
+      .containsKey(functionName)
+      && COMPARE_FUNCTIONS_TRANSPOSE_MAP.get(functionName).equals(
+      functionName);
+  }
+
+  public Object getValue() {
+    return value;
+  }
+
+  public boolean isSuccess() {
+    return success;
+  }
+
+  public SchemaPath getPath() {
+    return path;
+  }
+
+  public String getFunctionName() {
+    return functionName;
+  }
+
+  @Override
+  public Boolean visitCastExpression(CastExpression e,
+                                     LogicalExpression valueArg) throws RuntimeException {
+    if (e.getInput() instanceof CastExpression
+      || e.getInput() instanceof SchemaPath) {
+      return e.getInput().accept(this, valueArg);
+    }
+    return false;
+  }
+
+  @Override
+  public Boolean visitConvertExpression(ConvertExpression e,
+                                        LogicalExpression valueArg) throws RuntimeException {
+    if (e.getConvertFunction().equals(ConvertExpression.CONVERT_FROM)
+      && e.getInput() instanceof SchemaPath) {
+      String encodingType = e.getEncodingType();
+      switch (encodingType) {
+        case "INT_BE":
+        case "INT":
+        case "UINT_BE":
+        case "UINT":
+        case "UINT4_BE":
+        case "UINT4":
+          if (valueArg instanceof IntExpression
+            && (isEqualityFn || encodingType.startsWith("U"))) {
+            this.value = ((IntExpression) valueArg).getInt();
+          }
+          break;
+        case "BIGINT_BE":
+        case "BIGINT":
+        case "UINT8_BE":
+        case "UINT8":
+          if (valueArg instanceof LongExpression
+            && (isEqualityFn || encodingType.startsWith("U"))) {
+            this.value = ((LongExpression) valueArg).getLong();
+          }
+          break;
+        case "FLOAT":
+          if (valueArg instanceof FloatExpression && isEqualityFn) {
+            this.value = ((FloatExpression) valueArg).getFloat();
+          }
+          break;
+        case "DOUBLE":
+          if (valueArg instanceof DoubleExpression && isEqualityFn) {
+            this.value = ((DoubleExpression) valueArg).getDouble();
+          }
+          break;
+        case "TIME_EPOCH":
+        case "TIME_EPOCH_BE":
+          if (valueArg instanceof TimeExpression) {
+            this.value = ((TimeExpression) valueArg).getTime();
+          }
+          break;
+        case "DATE_EPOCH":
+        case "DATE_EPOCH_BE":
+          if (valueArg instanceof DateExpression) {
+            this.value = ((DateExpression) valueArg).getDate();
+          }
+          break;
+        case "BOOLEAN_BYTE":
+          if (valueArg instanceof BooleanExpression) {
+            this.value = ((BooleanExpression) valueArg).getBoolean();
+          }
+          break;
+        case "UTF8":
+          // let visitSchemaPath() handle this.
+          return e.getInput().accept(this, valueArg);
+      }
+
+      if (value != null) {
+        this.path = (SchemaPath) e.getInput();
+        return true;
+      }
+    }
+    return false;
+  }
+
+  @Override
+  public Boolean visitUnknown(LogicalExpression e, LogicalExpression valueArg)
+    throws RuntimeException {
+    return false;
+  }
+
+  @Override
+  public Boolean visitSchemaPath(SchemaPath path, LogicalExpression valueArg)
+    throws RuntimeException {
+    if (valueArg instanceof QuotedString) {
+      this.value = ((QuotedString) valueArg).value;
+      this.path = path;
+      return true;
+    }
+
+    if (valueArg instanceof IntExpression) {
+      this.value = ((IntExpression) valueArg).getInt();
+      this.path = path;
+      return true;
+    }
+
+    if (valueArg instanceof LongExpression) {
+      this.value = ((LongExpression) valueArg).getLong();
+      this.path = path;
+      return true;
+    }
+
+    if (valueArg instanceof FloatExpression) {
+      this.value = ((FloatExpression) valueArg).getFloat();
+      this.path = path;
+      return true;
+    }
+
+    if (valueArg instanceof DoubleExpression) {
+      this.value = ((DoubleExpression) valueArg).getDouble();
+      this.path = path;
+      return true;
+    }
+
+    if (valueArg instanceof BooleanExpression) {
+      this.value = ((BooleanExpression) valueArg).getBoolean();
+      this.path = path;
+      return true;
+    }
+
+    return false;
+  }
+
+  private static final ImmutableSet<Class<? extends LogicalExpression>> VALUE_EXPRESSION_CLASSES;
+  static {
+    ImmutableSet.Builder<Class<? extends LogicalExpression>> builder = ImmutableSet.builder();
+    VALUE_EXPRESSION_CLASSES = builder.add(BooleanExpression.class)
+      .add(DateExpression.class).add(DoubleExpression.class)
+      .add(FloatExpression.class).add(IntExpression.class)
+      .add(LongExpression.class).add(QuotedString.class)
+      .add(TimeExpression.class).build();
+  }
+
+  private static final ImmutableMap<String, String> COMPARE_FUNCTIONS_TRANSPOSE_MAP;
+  static {
+    ImmutableMap.Builder<String, String> builder = ImmutableMap.builder();
+    COMPARE_FUNCTIONS_TRANSPOSE_MAP = builder
+      // unary functions
+      .put(FunctionNames.IS_NOT_NULL, FunctionNames.IS_NOT_NULL)
+      .put("isNotNull", "isNotNull")
+      .put("is not null", "is not null")
+      .put(FunctionNames.IS_NULL, FunctionNames.IS_NULL)
+      .put("isNull", "isNull")
+      .put("is null", "is null")
+      // binary functions
+      .put(FunctionNames.EQ, FunctionNames.EQ)
+      .put(FunctionNames.NE, FunctionNames.NE)
+      .put(FunctionNames.LIKE, FunctionNames.LIKE)
+      .put(FunctionNames.GE, FunctionNames.LE)
+      .put(FunctionNames.GT, FunctionNames.LT)
+      .put(FunctionNames.LE, FunctionNames.GE)
+      .put(FunctionNames.LT, FunctionNames.GT).build();
+  }
+}
diff --git a/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidFilterBuilder.java b/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidFilterBuilder.java
new file mode 100755
index 0000000..82ef65b
--- /dev/null
+++ b/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidFilterBuilder.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.druid;
+
+import org.apache.drill.common.FunctionNames;
+import org.apache.drill.common.expression.BooleanOperator;
+import org.apache.drill.common.expression.FunctionCall;
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.common.expression.visitors.AbstractExprVisitor;
+import org.apache.drill.exec.store.druid.common.DruidFilter;
+import org.apache.drill.exec.store.druid.common.DruidUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+public class DruidFilterBuilder extends
+  AbstractExprVisitor<DruidScanSpec, Void, RuntimeException> {
+
+  private static final Logger logger = LoggerFactory.getLogger(DruidFilterBuilder.class);
+
+  private final DruidGroupScan groupScan;
+  private final LogicalExpression le;
+  private final DruidScanSpecBuilder druidScanSpecBuilder;
+  private boolean allExpressionsConverted = true;
+
+  public DruidFilterBuilder(DruidGroupScan groupScan,
+                            LogicalExpression conditionExp) {
+    this.groupScan = groupScan;
+    this.le = conditionExp;
+    this.druidScanSpecBuilder = new DruidScanSpecBuilder();
+  }
+
+  public DruidScanSpec parseTree() {
+    logger.debug("DruidScanSpec parseTree() called.");
+    DruidScanSpec parsedSpec = le.accept(this, null);
+    if (parsedSpec != null) {
+      parsedSpec =
+          mergeScanSpecs(
+              FunctionNames.AND,
+              this.groupScan.getScanSpec(),
+              parsedSpec
+          );
+    }
+    return parsedSpec;
+  }
+
+  private DruidScanSpec mergeScanSpecs(String functionName,
+                                       DruidScanSpec leftScanSpec,
+                                       DruidScanSpec rightScanSpec) {
+    logger.debug("mergeScanSpecs called for functionName - {}", functionName);
+
+    DruidFilter newFilter = null;
+
+    switch (functionName) {
+      case FunctionNames.AND:
+        if (leftScanSpec.getFilter() != null
+            && rightScanSpec.getFilter() != null) {
+          newFilter =
+            DruidUtils
+              .andFilterAtIndex(
+                leftScanSpec.getFilter(),
+                rightScanSpec.getFilter()
+              );
+        } else if (leftScanSpec.getFilter() != null) {
+          newFilter = leftScanSpec.getFilter();
+        } else {
+          newFilter = rightScanSpec.getFilter();
+        }
+        break;
+      case FunctionNames.OR:
+        newFilter =
+          DruidUtils
+            .orFilterAtIndex(
+              leftScanSpec.getFilter(),
+              rightScanSpec.getFilter()
+            );
+    }
+
+    return new DruidScanSpec(
+      groupScan.getScanSpec().getDataSourceName(),
+      newFilter,
+      groupScan.getScanSpec().getDataSourceSize(),
+      groupScan.getScanSpec().getDataSourceMinTime(),
+      groupScan.getScanSpec().getDataSourceMaxTime());
+  }
+
+  public boolean isAllExpressionsConverted() {
+    return allExpressionsConverted;
+  }
+
+  @Override
+  public DruidScanSpec visitUnknown(LogicalExpression e, Void value)
+    throws RuntimeException {
+    allExpressionsConverted = false;
+    return null;
+  }
+
+  /**
+   * Converts AND/OR. An AND may be pushed down partially; an OR is pushed
+   * down only if every operand converts, otherwise rows would be lost.
+   */
+  @Override
+  public DruidScanSpec visitBooleanOperator(BooleanOperator op, Void value) {
+    String functionName = op.getName();
+    logger.debug("visitBooleanOperator Called. FunctionName - {}", functionName);
+    boolean isOr = FunctionNames.OR.equals(functionName);
+    if (!isOr && !FunctionNames.AND.equals(functionName)) {
+      return null;
+    }
+    DruidScanSpec nodeScanSpec = null;
+    for (LogicalExpression arg : op.args()) {
+      DruidScanSpec scanSpec = arg.accept(this, null);
+      if (scanSpec == null) {
+        allExpressionsConverted = false;
+        if (isOr) {
+          return null; // a dropped OR operand would filter out matching rows
+        }
+      } else {
+        nodeScanSpec = nodeScanSpec == null
+            ? scanSpec : mergeScanSpecs(functionName, nodeScanSpec, scanSpec);
+      }
+    }
+    return nodeScanSpec;
+  }
+
+  @Override
+  public DruidScanSpec visitFunctionCall(FunctionCall call, Void value)
+    throws RuntimeException {
+    DruidScanSpec nodeScanSpec = null;
+    String functionName = call.getName();
+    List<LogicalExpression> args = call.args();
+
+    logger.debug("visitFunctionCall Called. FunctionName - {}", functionName);
+
+    if (DruidCompareFunctionProcessor.isCompareFunction(functionName)) {
+      DruidCompareFunctionProcessor processor = DruidCompareFunctionProcessor
+        .process(call);
+      if (processor.isSuccess()) {
+        DruidScanSpec scanSpec = groupScan.getScanSpec();
+        nodeScanSpec =
+          druidScanSpecBuilder
+            .build(scanSpec.getDataSourceName(),
+              scanSpec.getDataSourceSize(),
+              scanSpec.getDataSourceMinTime(),
+              scanSpec.getDataSourceMaxTime(),
+              processor.getFunctionName(),
+              processor.getPath(),
+              processor.getValue()
+            );
+      }
+    } else {
+      switch (functionName) {
+        case FunctionNames.AND:
+        case FunctionNames.OR:
+          DruidScanSpec leftScanSpec = args.get(0).accept(this, null);
+          DruidScanSpec rightScanSpec = args.get(1).accept(this, null);
+          if (leftScanSpec != null && rightScanSpec != null) {
+            nodeScanSpec =
+                mergeScanSpecs(functionName, leftScanSpec, rightScanSpec);
+          } else {
+            allExpressionsConverted = false;
+            if (FunctionNames.AND.equals(functionName)) {
+              nodeScanSpec = leftScanSpec == null ? rightScanSpec : leftScanSpec;
+            }
+          }
+          break;
+      }
+    }
+
+    if (nodeScanSpec == null) {
+      allExpressionsConverted = false;
+    }
+
+    return nodeScanSpec;
+  }
+}
diff --git a/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidGroupScan.java b/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidGroupScan.java
new file mode 100755
index 0000000..08fc56e
--- /dev/null
+++ b/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidGroupScan.java
@@ -0,0 +1,301 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.druid;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+
+import org.apache.drill.common.PlanStringBuilder;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.physical.EndpointAffinity;
+import org.apache.drill.exec.physical.base.AbstractGroupScan;
+import org.apache.drill.exec.physical.base.GroupScan;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.ScanStats;
+import org.apache.drill.exec.proto.CoordinationProtos;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+
+import org.apache.drill.exec.store.schedule.AffinityCreator;
+import org.apache.drill.exec.store.schedule.AssignmentCreator;
+import org.apache.drill.exec.store.schedule.CompleteWork;
+import org.apache.drill.exec.store.schedule.EndpointByteMap;
+import org.apache.drill.exec.store.schedule.EndpointByteMapImpl;
+import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
+import org.apache.drill.shaded.guava.com.google.common.collect.ListMultimap;
+import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Planning-time group scan over a single Druid datasource.
+ *
+ * <p>Construction registers exactly one {@link DruidWork} unit that covers the
+ * whole datasource described by the {@link DruidScanSpec}; the scan is later
+ * turned into per-fragment {@link DruidSubScan}s by
+ * {@link #getSpecificScan(int)}.
+ *
+ * <p>The JSON property names produced by the getters match the names consumed
+ * by the {@link JsonCreator} constructor so that a serialized scan can be read
+ * back.
+ */
+@JsonTypeName("druid-scan")
+public class DruidGroupScan extends AbstractGroupScan {
+
+  private static final Logger logger = LoggerFactory.getLogger(DruidGroupScan.class);
+  private static final long DEFAULT_TABLET_SIZE = 1000;
+  // Fixed per-work-unit row estimate reported to the planner; kept as a long so
+  // the multiplication in getScanStats() is never performed in int arithmetic.
+  private static final long ESTIMATED_ROWS_PER_WORK_UNIT = 100000L;
+  private final DruidScanSpec scanSpec;
+  private final DruidStoragePlugin storagePlugin;
+
+  private List<SchemaPath> columns;
+  private boolean filterPushedDown = false;
+  private int maxRecordsToRead;
+  private List<DruidWork> druidWorkList = new ArrayList<>();
+  private ListMultimap<Integer,DruidWork> assignments;
+  private List<EndpointAffinity> affinities;
+
+  /**
+   * Deserialization constructor. Resolves the storage plugin from its
+   * configuration through the injected registry and delegates to the main
+   * constructor.
+   */
+  @JsonCreator
+  public DruidGroupScan(@JsonProperty("userName") String userName,
+                        @JsonProperty("scanSpec") DruidScanSpec scanSpec,
+                        @JsonProperty("storagePluginConfig") DruidStoragePluginConfig storagePluginConfig,
+                        @JsonProperty("columns") List<SchemaPath> columns,
+                        @JsonProperty("maxRecordsToRead") int maxRecordsToRead,
+                        @JacksonInject StoragePluginRegistry pluginRegistry) {
+    this(userName,
+        pluginRegistry.resolve(storagePluginConfig, DruidStoragePlugin.class),
+        scanSpec,
+        columns,
+        maxRecordsToRead);
+  }
+
+  /**
+   * Main constructor.
+   *
+   * @param userName user the scan runs as
+   * @param storagePlugin plugin instance backing this scan
+   * @param scanSpec datasource description and optional pushed-down filter
+   * @param columns projected columns; {@code null} or empty means all columns
+   * @param maxRecordsToRead record limit handed to the sub scans
+   */
+  public DruidGroupScan(String userName,
+                        DruidStoragePlugin storagePlugin,
+                        DruidScanSpec scanSpec,
+                        List<SchemaPath> columns,
+                        int maxRecordsToRead) {
+    super(userName);
+    this.storagePlugin = storagePlugin;
+    this.scanSpec = scanSpec;
+    this.columns = columns == null || columns.isEmpty() ? ALL_COLUMNS : columns;
+    this.maxRecordsToRead = maxRecordsToRead;
+    init();
+  }
+
+  /**
+   * Private constructor, used for cloning.
+   * The work list and assignments are shared with the source scan, not copied;
+   * the cached affinities are not carried over and are recomputed on demand.
+   * @param that The DruidGroupScan to clone
+   */
+  private DruidGroupScan(DruidGroupScan that) {
+    super(that);
+    this.columns = that.columns;
+    this.maxRecordsToRead = that.maxRecordsToRead;
+    this.scanSpec = that.scanSpec;
+    this.storagePlugin = that.storagePlugin;
+    this.filterPushedDown = that.filterPushedDown;
+    this.druidWorkList = that.druidWorkList;
+    this.assignments = that.assignments;
+  }
+
+  /** Returns a copy of this scan that projects the given columns. */
+  @Override
+  public GroupScan clone(List<SchemaPath> columns) {
+    DruidGroupScan newScan = new DruidGroupScan(this);
+    newScan.columns = columns;
+    return newScan;
+  }
+
+  /** Returns a copy of this scan with a different record limit. */
+  public GroupScan clone(int maxRecordsToRead) {
+    DruidGroupScan newScan = new DruidGroupScan(this);
+    newScan.maxRecordsToRead = maxRecordsToRead;
+    return newScan;
+  }
+
+  /** Lazily computes and caches the endpoint affinities of the work list. */
+  @Override
+  public List<EndpointAffinity> getOperatorAffinity() {
+    if (affinities == null) {
+      affinities = AffinityCreator.getAffinityMap(druidWorkList);
+    }
+    return affinities;
+  }
+
+  @Override
+  public boolean canPushdownProjects(List<SchemaPath> columns) {
+    return true;
+  }
+
+  @Override
+  public boolean supportsLimitPushdown() {
+    return true;
+  }
+
+  /**
+   * Applies a limit to this scan.
+   *
+   * @return a clone carrying the new limit, or {@code null} when the requested
+   *         limit equals the one already set
+   */
+  @Override
+  public GroupScan applyLimit(int maxRecords) {
+    if (maxRecordsToRead == maxRecords) {
+      return null;
+    }
+    return clone(maxRecords);
+  }
+
+  @JsonIgnore
+  public boolean isFilterPushedDown() {
+    return filterPushedDown;
+  }
+
+  @JsonIgnore
+  public void setFilterPushedDown(boolean filterPushedDown) {
+    this.filterPushedDown = filterPushedDown;
+  }
+
+  /** Registers the single work unit that covers the whole datasource. */
+  private void init() {
+    logger.debug("Adding Druid Work for Table - {}. Filter - {}", getTableName(), getScanSpec().getFilter());
+
+    DruidWork druidWork =
+      new DruidWork(
+        new DruidSubScan.DruidSubScanSpec(
+          getTableName(),
+          getScanSpec().getFilter(),
+          getDatasourceSize(),
+          getDataSourceMinTime(),
+          getDataSourceMaxTime()
+        )
+      );
+    druidWorkList.add(druidWork);
+  }
+
+  /**
+   * A unit of scan work wrapping one sub scan spec. It reports a fixed size
+   * and an empty endpoint byte map, and all instances compare as equal.
+   */
+  private static class DruidWork implements CompleteWork {
+    private final EndpointByteMapImpl byteMap = new EndpointByteMapImpl();
+    private final DruidSubScan.DruidSubScanSpec druidSubScanSpec;
+
+    public DruidWork(DruidSubScan.DruidSubScanSpec druidSubScanSpec) {
+      this.druidSubScanSpec = druidSubScanSpec;
+    }
+
+    public DruidSubScan.DruidSubScanSpec getDruidSubScanSpec() {
+      return druidSubScanSpec;
+    }
+
+    @Override
+    public long getTotalBytes() {
+      return DEFAULT_TABLET_SIZE;
+    }
+
+    @Override
+    public EndpointByteMap getByteMap() {
+      return byteMap;
+    }
+
+    @Override
+    public int compareTo(CompleteWork o) {
+      return 0;
+    }
+  }
+
+  /**
+   * Reports an inexact cost estimate: a fixed row count per work unit, with
+   * the disk cost scaled by the configured average row size.
+   */
+  @Override
+  public ScanStats getScanStats() {
+    long recordCount = ESTIMATED_ROWS_PER_WORK_UNIT * druidWorkList.size();
+    return new ScanStats(
+        ScanStats.GroupScanProperty.NO_EXACT_ROW_COUNT,
+        recordCount,
+        1,
+        recordCount * storagePlugin.getConfig().getAverageRowSizeBytes());
+  }
+
+  @Override
+  public void applyAssignments(List<CoordinationProtos.DrillbitEndpoint> endpoints) {
+    assignments = AssignmentCreator.getMappings(endpoints, druidWorkList);
+  }
+
+  /**
+   * Builds the sub scan for one minor fragment from the work units assigned
+   * to it by {@link #applyAssignments(List)}.
+   */
+  @Override
+  public DruidSubScan getSpecificScan(int minorFragmentId) {
+
+    List<DruidWork> workList = assignments.get(minorFragmentId);
+
+    List<DruidSubScan.DruidSubScanSpec> scanSpecList = Lists.newArrayList();
+    for (DruidWork druidWork : workList) {
+      DruidSubScan.DruidSubScanSpec assigned = druidWork.getDruidSubScanSpec();
+      // Hand each fragment its own copy of the spec.
+      scanSpecList
+        .add(
+          new DruidSubScan.DruidSubScanSpec(
+            assigned.getDataSourceName(),
+            assigned.getFilter(),
+            assigned.getDataSourceSize(),
+            assigned.getMinTime(),
+            assigned.getMaxTime()
+          )
+        );
+    }
+
+    return new DruidSubScan(getUserName(), storagePlugin, scanSpecList, this.columns, this.maxRecordsToRead);
+  }
+
+  @JsonIgnore
+  public String getTableName() {
+    return getScanSpec().getDataSourceName();
+  }
+
+  @JsonIgnore
+  public long getDatasourceSize() {
+    return getScanSpec().getDataSourceSize();
+  }
+
+  @JsonIgnore
+  public String getDataSourceMinTime() {
+    return getScanSpec().getDataSourceMinTime();
+  }
+
+  @JsonIgnore
+  public String getDataSourceMaxTime() {
+    return getScanSpec().getDataSourceMaxTime();
+  }
+
+  @Override
+  public int getMaxParallelizationWidth() {
+    return druidWorkList.size();
+  }
+
+  @Override
+  public String getDigest() {
+    return toString();
+  }
+
+  // Property name matches the "scanSpec" parameter of the @JsonCreator
+  // constructor; previously it was written as "druidScanSpec", which the
+  // creator never read.
+  @JsonProperty("scanSpec")
+  public DruidScanSpec getScanSpec() {
+    return scanSpec;
+  }
+
+  // Emits the plugin configuration under the name the @JsonCreator constructor
+  // expects, so the plugin can be resolved again on deserialization.
+  @JsonProperty("storagePluginConfig")
+  public DruidStoragePluginConfig getStorageConfig() {
+    return storagePlugin.getConfig();
+  }
+
+  @JsonIgnore
+  public DruidStoragePlugin getStoragePlugin() {
+    return storagePlugin;
+  }
+
+  @JsonProperty
+  public List<SchemaPath> getColumns() {
+    return columns;
+  }
+
+  @JsonProperty
+  public int getMaxRecordsToRead() {
+    return maxRecordsToRead;
+  }
+
+  /**
+   * Plan string for this scan. Because {@link #getDigest()} returns this
+   * string, the record limit is included so that scans differing only by the
+   * pushed-down limit do not share a digest.
+   */
+  @Override
+  public String toString() {
+    return new PlanStringBuilder(this)
+        .field("druidScanSpec", scanSpec)
+        .field("columns", columns)
+        .field("druidStoragePlugin", storagePlugin)
+        .field("maxRecordsToRead", maxRecordsToRead)
+        .toString();
+  }
+
+  /** A scan is a leaf operator: it accepts no children and returns a copy of itself. */
+  @Override
+  @JsonIgnore
+  public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) {
+    Preconditions.checkArgument(children.isEmpty());
+    return new DruidGroupScan(this);
+  }
+}
diff --git a/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidPushDownFilterForScan.java b/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidPushDownFilterForScan.java
new file mode 100755
index 0000000..6e313c6
--- /dev/null
+++ b/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidPushDownFilterForScan.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.druid;
+
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.rex.RexNode;
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.exec.planner.logical.DrillOptiq;
+import org.apache.drill.exec.planner.logical.DrillParseContext;
+import org.apache.drill.exec.planner.logical.RelOptHelper;
+import org.apache.drill.exec.planner.physical.FilterPrel;
+import org.apache.drill.exec.planner.physical.PrelUtil;
+import org.apache.drill.exec.planner.physical.ScanPrel;
+import org.apache.drill.exec.store.StoragePluginOptimizerRule;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
+
+/**
+ * Optimizer rule that matches a {@link FilterPrel} sitting directly on top of
+ * a {@link ScanPrel} backed by a {@link DruidGroupScan} and tries to translate
+ * the filter condition into a Druid filter carried by the scan itself.
+ */
+public class DruidPushDownFilterForScan extends StoragePluginOptimizerRule {
+
+  public static final StoragePluginOptimizerRule INSTANCE = new DruidPushDownFilterForScan();
+
+  private DruidPushDownFilterForScan() {
+    super(
+      RelOptHelper.some(FilterPrel.class, RelOptHelper.any(ScanPrel.class)),
+      "DruidPushDownFilterForScan");
+  }
+
+  /**
+   * Rewrites the matched filter/scan pair. When the whole condition could be
+   * converted, the filter is replaced by the new scan; otherwise the filter is
+   * kept on top of the new scan. Nothing is done when a filter was already
+   * pushed into the scan or when no part of the condition could be converted.
+   */
+  @Override
+  public void onMatch(RelOptRuleCall relOptRuleCall) {
+    final FilterPrel filterRel = relOptRuleCall.rel(0);
+    final ScanPrel scanRel = relOptRuleCall.rel(1);
+
+    final DruidGroupScan currentScan = (DruidGroupScan) scanRel.getGroupScan();
+    if (currentScan.isFilterPushedDown()) {
+      // This scan already carries a pushed-down filter.
+      return;
+    }
+
+    final RexNode condition = filterRel.getCondition();
+    final DrillParseContext parseContext =
+      new DrillParseContext(PrelUtil.getPlannerSettings(relOptRuleCall.getPlanner()));
+    final LogicalExpression drillCondition = DrillOptiq.toDrill(parseContext, scanRel, condition);
+
+    final DruidFilterBuilder filterBuilder = new DruidFilterBuilder(currentScan, drillCondition);
+    final DruidScanSpec filteredSpec = filterBuilder.parseTree();
+    if (filteredSpec == null) {
+      return; // no filter pushdown so nothing to apply.
+    }
+
+    final DruidGroupScan filteredScan =
+        new DruidGroupScan(
+            currentScan.getUserName(),
+            currentScan.getStoragePlugin(),
+            filteredSpec,
+            currentScan.getColumns(),
+            currentScan.getMaxRecordsToRead());
+    filteredScan.setFilterPushedDown(true);
+
+    final ScanPrel filteredScanRel =
+      ScanPrel.create(scanRel, filterRel.getTraitSet(), filteredScan, scanRel.getRowType());
+
+    if (!filterBuilder.isAllExpressionsConverted()) {
+      // Part of the condition stays outside Druid: keep the filter operator
+      // and only swap the scan underneath it.
+      relOptRuleCall.transformTo(
+        filterRel.copy(filterRel.getTraitSet(), ImmutableList.of(filteredScanRel)));
+      return;
+    }
+
+    // The entire condition became a Druid filter, so the filter operator can
+    // be dropped altogether.
+    relOptRuleCall.transformTo(filteredScanRel);
+  }
+
+  /** Restricts the rule to scans whose group scan is a {@link DruidGroupScan}. */
+  @Override
+  public boolean matches(RelOptRuleCall call) {
+    final ScanPrel scanRel = call.rel(1);
+    return scanRel.getGroupScan() instanceof DruidGroupScan && super.matches(call);
+  }
+}
diff --git a/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidRecordReader.java b/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidRecordReader.java
new file mode 100755
index 0000000..c2bac59
--- /dev/null
+++ b/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidRecordReader.java
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.druid;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.physical.impl.OutputMutator;
+import org.apache.drill.exec.store.AbstractRecordReader;
+import org.apache.drill.exec.store.druid.common.DruidFilter;
+import org.apache.drill.exec.store.druid.druid.DruidSelectResponse;
+import org.apache.drill.exec.store.druid.druid.PagingIdentifier;
+import org.apache.drill.exec.store.druid.druid.PagingSpec;
+import org.apache.drill.exec.store.druid.druid.SelectQuery;
+import org.apache.drill.exec.store.druid.druid.SelectQueryBuilder;
+import org.apache.drill.exec.store.druid.rest.DruidQueryClient;
+import org.apache.drill.exec.vector.BaseValueVector;
+import org.apache.drill.exec.vector.complex.fn.JsonReader;
+import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter;
+import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
+import org.apache.drill.shaded.guava.com.google.common.collect.Sets;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+public class DruidRecordReader extends AbstractRecordReader {
+
+  private static final Logger logger = LoggerFactory.getLogger(DruidRecordReader.class);
+  private static final ObjectMapper objectMapper = new ObjectMapper();
+  private final DruidStoragePlugin plugin;
+  private final DruidSubScan.DruidSubScanSpec scanSpec;
+  private final List<String> dimensions;
+  private final DruidFilter filter;
+  private ArrayList<PagingIdentifier> pagingIdentifiers = new ArrayList<>();
+  private int maxRecordsToRead = -1;
+
+  private JsonReader jsonReader;
+  private VectorContainerWriter writer;
+
+  private final FragmentContext fragmentContext;
+  private final DruidQueryClient druidQueryClient;
+
+  public DruidRecordReader(DruidSubScan.DruidSubScanSpec subScanSpec,
+                           List<SchemaPath> projectedColumns,
+                           int maxRecordsToRead,
+                           FragmentContext context,
+                           DruidStoragePlugin plugin) {
+    dimensions = new ArrayList<>();
+    setColumns(projectedColumns);
+    this.maxRecordsToRead = maxRecordsToRead;
+    this.plugin = plugin;
+    scanSpec = subScanSpec;
+    fragmentContext = context;
+    this.filter = subScanSpec.getFilter();
+    this.druidQueryClient = plugin.getDruidQueryClient();
+  }
+
+  @Override
+  protected Collection<SchemaPath> transformColumns(Collection<SchemaPath> projectedColumns) {
+    Set<SchemaPath> transformed = Sets.newLinkedHashSet();
+    if (isStarQuery()) {
+      transformed.add(SchemaPath.STAR_COLUMN);
+    } else {
+      for (SchemaPath column : projectedColumns) {
+        String fieldName = column.getRootSegment().getPath();
+        transformed.add(column);
+        this.dimensions.add(fieldName);
+      }
+    }
+    return transformed;
+  }
+
+  @Override
+  public void setup(OperatorContext context, OutputMutator output) {
+    this.writer = new VectorContainerWriter(output);
+
+    this.jsonReader =
+      new JsonReader.Builder(fragmentContext.getManagedBuffer())
+        .schemaPathColumns(ImmutableList.copyOf(getColumns()))
+        .skipOuterList(true)
+        .build();
+  }
+
+  @Override
+  public int next() {
+    writer.allocate();
+    writer.reset();
+    Stopwatch watch = Stopwatch.createStarted();
+    try {
+      String query = getQuery();
+      DruidSelectResponse druidSelectResponse = druidQueryClient.executeQuery(query);
+      setNextPagingIdentifiers(druidSelectResponse);
+
+      int docCount = 0;
+      for (ObjectNode eventNode : druidSelectResponse.getEvents()) {
+        writer.setPosition(docCount);
+        jsonReader.setSource(eventNode);
+        try {
+          jsonReader.write(writer);
+        } catch (IOException e) {
+          throw UserException
+            .dataReadError(e)
+            .message("Failure while reading document")
+            .addContext("Failed Query", query)
+            .addContext("Parser was at record", eventNode.toString())
+            .addContext(e.getMessage())
+            .build(logger);
+        }
+        docCount++;
+      }
+
+      writer.setValueCount(docCount);
+      logger.debug("Took {} ms to get {} records", watch.elapsed(TimeUnit.MILLISECONDS), docCount);
+      return docCount;
+    } catch (Exception e) {
+      throw UserException
+        .dataReadError(e)
+        .message("Failure while executing druid query")
+        .addContext(e.getMessage())
+        .build(logger);
+    }
+  }
+
+  private String getQuery() throws JsonProcessingException {
+    int queryThreshold =
+      this.maxRecordsToRead >= 0
+        ? Math.min(BaseValueVector.INITIAL_VALUE_ALLOCATION, this.maxRecordsToRead)
+        : BaseValueVector.INITIAL_VALUE_ALLOCATION;
+    SelectQueryBuilder selectQueryBuilder = plugin.getSelectQueryBuilder();
+    SelectQuery selectQuery =
+      selectQueryBuilder.build(
+        scanSpec.dataSourceName,
+        this.dimensions,
+        this.filter,
+        new PagingSpec(this.pagingIdentifiers, queryThreshold),
+        scanSpec.getMinTime(),
+        scanSpec.getMaxTime()
+      );
+    return objectMapper.writeValueAsString(selectQuery);
+  }
+
+  private void setNextPagingIdentifiers(DruidSelectResponse druidSelectResponse) {
+    ArrayList<PagingIdentifier> newPagingIdentifiers = druidSelectResponse.getPagingIdentifiers();
+
+    Map<String, String> newPagingIdentifierNames =
+      newPagingIdentifiers
+        .stream()
+        .distinct()
+        .collect(Collectors.toMap(PagingIdentifier::getSegmentName, PagingIdentifier::getSegmentName));
+
+    for (PagingIdentifier previousPagingIdentifier : this.pagingIdentifiers) {
+      if (!newPagingIdentifierNames.containsKey(previousPagingIdentifier.getSegmentName())) {
+        newPagingIdentifiers.add(
+          new PagingIdentifier(
+            previousPagingIdentifier.getSegmentName(),
+            previousPagingIdentifier.getSegmentOffset() + 1)
+        );
+      }
+    }
+    this.pagingIdentifiers = newPagingIdentifiers;
+  }
+
+  @Override
+  public void close() throws Exception {
+    if (writer != null) {
+      writer.close();
+    }
+    if (pagingIdentifiers != null) {
+      pagingIdentifiers.clear();
+    }
+    jsonReader = null;
+  }
+}
diff --git a/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidScanBatchCreator.java b/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidScanBatchCreator.java
new file mode 100755
index 0000000..98ae5a3
--- /dev/null
+++ b/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidScanBatchCreator.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.druid;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.ops.ExecutorFragmentContext;
+import org.apache.drill.exec.physical.impl.BatchCreator;
+import org.apache.drill.exec.physical.impl.ScanBatch;
+import org.apache.drill.exec.record.CloseableRecordBatch;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.store.RecordReader;
+import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
+import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+/**
+ * Creates the {@link ScanBatch} for a {@link DruidSubScan}, with one
+ * {@link DruidRecordReader} per sub scan spec.
+ */
+public class DruidScanBatchCreator implements BatchCreator<DruidSubScan> {
+
+  private static final Logger logger = LoggerFactory.getLogger(DruidScanBatchCreator.class);
+
+  /**
+   * @param context fragment context handed to every reader
+   * @param subScan sub scan listing the specs to read
+   * @param children must be empty, a scan has no inputs
+   * @return a scan batch over all created readers
+   * @throws ExecutionSetupException when a reader cannot be created
+   */
+  @Override
+  public CloseableRecordBatch getBatch(ExecutorFragmentContext context, DruidSubScan subScan, List<RecordBatch> children) throws ExecutionSetupException {
+    Preconditions.checkArgument(children.isEmpty());
+
+    List<RecordReader> recordReaders = Lists.newArrayList();
+    for (DruidSubScan.DruidSubScanSpec spec : subScan.getScanSpec()) {
+      recordReaders.add(createReader(context, subScan, spec));
+    }
+
+    logger.debug("Number of record readers initialized - {}", recordReaders.size());
+    return new ScanBatch(subScan, context, recordReaders);
+  }
+
+  /** Builds the reader for one spec, wrapping any failure in an {@link ExecutionSetupException}. */
+  private RecordReader createReader(ExecutorFragmentContext context,
+                                    DruidSubScan subScan,
+                                    DruidSubScan.DruidSubScanSpec spec) throws ExecutionSetupException {
+    try {
+      List<SchemaPath> projected = subScan.getColumns();
+      return new DruidRecordReader(
+        spec,
+        projected,
+        subScan.getMaxRecordsToRead(),
+        context,
+        subScan.getStorageEngine());
+    } catch (Exception cause) {
+      throw new ExecutionSetupException(cause);
+    }
+  }
+}
diff --git a/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidScanSpec.java b/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidScanSpec.java
new file mode 100755
index 0000000..dcd7431
--- /dev/null
+++ b/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidScanSpec.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.druid;
+
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.drill.common.PlanStringBuilder;
+import org.apache.drill.exec.store.druid.common.DruidFilter;
+
+/**
+ * Describes what to scan in Druid: the datasource name, its size, the time
+ * bounds of its data and, optionally, a filter to apply.
+ */
+public class DruidScanSpec {
+
+  private final String dataSourceName;
+  private final long dataSourceSize;
+  private final String dataSourceMinTime;
+  private final String dataSourceMaxTime;
+  // Not set by the @JsonCreator constructor, so it stays null for specs built
+  // through it unless something else assigns it.
+  // NOTE(review): presumably Jackson populates it via field access -- confirm.
+  private DruidFilter filter;
+
+  /** Creates a spec without a filter. */
+  @JsonCreator
+  public DruidScanSpec(@JsonProperty("dataSourceName") String dataSourceName,
+                       @JsonProperty("dataSourceSize") long dataSourceSize,
+                       @JsonProperty("dataSourceMinTime") String dataSourceMinTime,
+                       @JsonProperty("dataSourceMaxTime") String dataSourceMaxTime) {
+    this.dataSourceName = dataSourceName;
+    this.dataSourceSize = dataSourceSize;
+    this.dataSourceMinTime = dataSourceMinTime;
+    this.dataSourceMaxTime = dataSourceMaxTime;
+  }
+
+  /** Creates a spec that carries a filter. */
+  public DruidScanSpec(String dataSourceName,
+                       DruidFilter filter,
+                       long dataSourceSize,
+                       String dataSourceMinTime,
+                       String dataSourceMaxTime) {
+    this.dataSourceName = dataSourceName;
+    this.dataSourceSize = dataSourceSize;
+    this.dataSourceMinTime = dataSourceMinTime;
+    this.dataSourceMaxTime = dataSourceMaxTime;
+    this.filter = filter;
+  }
+
+  public String getDataSourceName() {
+    return this.dataSourceName;
+  }
+
+  public long getDataSourceSize() {
+    return dataSourceSize;
+  }
+
+  public String getDataSourceMinTime() {
+    return dataSourceMinTime;
+  }
+
+  public String getDataSourceMaxTime() {
+    return dataSourceMaxTime;
+  }
+
+  /** @return the filter, or {@code null} when none was set */
+  public DruidFilter getFilter() {
+    return this.filter;
+  }
+
+  /**
+   * Plan string listing every field under its own name. The size and time
+   * bounds were previously emitted with an empty label, which made them
+   * indistinguishable in plan output.
+   */
+  @Override
+  public String toString() {
+    return new PlanStringBuilder(this)
+      .field("dataSourceName", dataSourceName)
+      .field("dataSourceSize", dataSourceSize)
+      .field("dataSourceMinTime", dataSourceMinTime)
+      .field("dataSourceMaxTime", dataSourceMaxTime)
+      .field("filter", filter)
+      .toString();
+  }
+}
diff --git a/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidScanSpecBuilder.java b/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidScanSpecBuilder.java
new file mode 100644
index 0000000..e901ab6
--- /dev/null
+++ b/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidScanSpecBuilder.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.druid;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.drill.common.FunctionNames;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.store.druid.common.DruidBoundFilter;
+import org.apache.drill.exec.store.druid.common.DruidConstants;
+import org.apache.drill.exec.store.druid.common.DruidFilter;
+import org.apache.drill.exec.store.druid.common.DruidIntervalFilter;
+import org.apache.drill.exec.store.druid.common.DruidNotFilter;
+import org.apache.drill.exec.store.druid.common.DruidRegexFilter;
+import org.apache.drill.exec.store.druid.common.DruidSearchFilter;
+import org.apache.drill.exec.store.druid.common.DruidSearchQuerySpec;
+import org.apache.drill.exec.store.druid.common.DruidSelectorFilter;
+
+/**
+ * Translates a single comparison (function name, field, value) into a
+ * {@link DruidScanSpec} carrying the equivalent Druid filter.
+ */
+public class DruidScanSpecBuilder {
+
+  // A LIKE pattern starting with this prefix is treated as a regular
+  // expression; the prefix itself is stripped before use.
+  private static final String REGEX_KEYWORD_HINT = "$regex$_";
+
+  /**
+   * Builds a scan spec for one comparison.
+   *
+   * <p>The two time parameters were previously declared as
+   * {@code maxTime, minTime}, the reverse of what they carry: they are
+   * forwarded positionally to the {@link DruidScanSpec} constructor, which
+   * takes the minimum time before the maximum time. Only the names changed;
+   * positions and behavior are the same.
+   *
+   * @param dataSourceName Druid datasource to scan
+   * @param dataSourceSize size of the datasource
+   * @param minTime minimum time of the datasource
+   * @param maxTime maximum time of the datasource
+   * @param functionName comparison function, one of the {@link FunctionNames} constants
+   * @param field column the comparison applies to
+   * @param fieldValue comparison value; converted with {@link String#valueOf(Object)}
+   * @return the scan spec, or {@code null} when the function is not supported
+   */
+  DruidScanSpec build(String dataSourceName,
+                      long dataSourceSize,
+                      String minTime,
+                      String maxTime,
+                      String functionName,
+                      SchemaPath field,
+                      Object fieldValue) {
+    String fieldName = field.getAsNamePart().getName();
+    DruidFilter filter = translateFilter(functionName, fieldName, String.valueOf(fieldValue));
+    return (filter == null) ? null : new DruidScanSpec(dataSourceName, filter, dataSourceSize, minTime, maxTime);
+  }
+
+  /**
+   * Maps one comparison onto a Druid filter.
+   *
+   * @return the filter, or {@code null} for an unsupported function
+   */
+  private DruidFilter translateFilter(String functionName, String fieldName, String fieldValue) {
+    switch (functionName) {
+      case FunctionNames.EQ: {
+        // Equality on the interval dimension selects a time interval rather
+        // than a dimension value.
+        if (fieldName.equalsIgnoreCase(DruidConstants.INTERVAL_DIMENSION_NAME)) {
+          return new DruidIntervalFilter(fieldValue);
+        } else {
+          return new DruidSelectorFilter(fieldName, fieldValue);
+        }
+      }
+      case FunctionNames.NE: {
+        DruidSelectorFilter druidSelectorFilter = new DruidSelectorFilter(fieldName, fieldValue);
+        return new DruidNotFilter(druidSelectorFilter);
+      }
+      // Bound filter arguments are presumably
+      // (dimension, lower, upper, lowerStrict, upperStrict) -- confirm against
+      // DruidBoundFilter.
+      case FunctionNames.GE: {
+        return new DruidBoundFilter(fieldName, fieldValue, null, null, null);
+      }
+      case FunctionNames.GT: {
+        return new DruidBoundFilter(fieldName, fieldValue, null, true, null);
+      }
+      case FunctionNames.LE: {
+        return new DruidBoundFilter(fieldName, null, fieldValue, null, null);
+      }
+      case FunctionNames.LT: {
+        return new DruidBoundFilter(fieldName, null, fieldValue, null, true);
+      }
+      case FunctionNames.IS_NULL: {
+        return new DruidSelectorFilter(fieldName, null);
+      }
+      case FunctionNames.IS_NOT_NULL: {
+        DruidSelectorFilter druidSelectorFilter = new DruidSelectorFilter(fieldName, null);
+        return new DruidNotFilter(druidSelectorFilter);
+      }
+      case FunctionNames.LIKE: {
+        if (fieldValue.startsWith(REGEX_KEYWORD_HINT) ) {
+          return new DruidRegexFilter(fieldName, fieldValue.substring(REGEX_KEYWORD_HINT.length()));
+        } else {
+          // One trailing and one leading '%' are stripped and the remainder is
+          // handed to a search filter.
+          // NOTE(review): this treats 'abc%', '%abc' and '%abc%' identically --
+          // confirm that is the intended LIKE semantics.
+          String fieldValueNormalized = StringUtils.removeStart(StringUtils.removeEnd(fieldValue, "%"), "%");
+          return new DruidSearchFilter(fieldName, new DruidSearchQuerySpec(fieldValueNormalized, false));
+        }
+      }
+      default:
+        return null;
+    }
+  }
+}
diff --git a/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidStoragePlugin.java b/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidStoragePlugin.java
new file mode 100755
index 0000000..1006197
--- /dev/null
+++ b/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidStoragePlugin.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.druid;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.drill.common.JSONOptions;
+import org.apache.drill.exec.ops.OptimizerRulesContext;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.store.AbstractStoragePlugin;
+import org.apache.drill.exec.store.SchemaConfig;
+import org.apache.drill.exec.store.StoragePluginOptimizerRule;
+import org.apache.drill.exec.store.druid.druid.SelectQueryBuilder;
+import org.apache.drill.exec.store.druid.rest.DruidAdminClient;
+import org.apache.drill.exec.store.druid.rest.DruidQueryClient;
+import org.apache.drill.exec.store.druid.rest.RestClient;
+import org.apache.drill.exec.store.druid.rest.RestClientWrapper;
+import org.apache.drill.exec.store.druid.schema.DruidSchemaFactory;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableSet;
+
+import java.io.IOException;
+import java.util.Set;
+
+/**
+ * Storage plugin that lets Drill read from Apache Druid.
+ *
+ * <p>The plugin reports itself as readable but not writable. On construction it
+ * builds one admin client from the configured coordinator address and one query
+ * client from the configured broker address; both share a single
+ * {@link RestClient} instance.
+ */
+public class DruidStoragePlugin extends AbstractStoragePlugin {
+  private final DrillbitContext context;
+  private final DruidStoragePluginConfig pluginConfig;
+  private final DruidAdminClient druidAdminClient;
+  private final DruidQueryClient druidQueryClient;
+  private final DruidSchemaFactory schemaFactory;
+  private final SelectQueryBuilder selectQueryBuilder;
+
+  /**
+   * Creates the plugin together with its REST clients, schema factory and query builder.
+   *
+   * @param pluginConfig configuration supplying the coordinator and broker addresses
+   * @param context drillbit context, also handed to the parent class
+   * @param name name of this plugin instance, also used for the schema factory
+   */
+  public DruidStoragePlugin(DruidStoragePluginConfig pluginConfig, DrillbitContext context, String name) {
+    super(context, name);
+    this.pluginConfig = pluginConfig;
+    this.context = context;
+    // One REST client is shared by the admin and the query client.
+    RestClient sharedRestClient = new RestClientWrapper();
+    String coordinatorAddress = pluginConfig.getCoordinatorAddress();
+    this.druidAdminClient = new DruidAdminClient(coordinatorAddress, sharedRestClient);
+    String brokerAddress = pluginConfig.getBrokerAddress();
+    this.druidQueryClient = new DruidQueryClient(brokerAddress, sharedRestClient);
+    this.schemaFactory = new DruidSchemaFactory(this, name);
+    this.selectQueryBuilder = new SelectQueryBuilder();
+  }
+
+  /**
+   * Reads the selection as a {@link DruidScanSpec} and wraps it in a new group scan.
+   * The group scan is created with {@code null} and {@code -1} as its last two
+   * constructor arguments.
+   *
+   * @param userName user on whose behalf the scan is created
+   * @param selection JSON selection describing what to scan
+   * @return a new group scan bound to this plugin
+   * @throws IOException propagated from reading the selection
+   */
+  @Override
+  public DruidGroupScan getPhysicalScan(String userName, JSONOptions selection) throws IOException {
+    ObjectMapper mapper = new ObjectMapper();
+    TypeReference<DruidScanSpec> specType = new TypeReference<DruidScanSpec>() {};
+    DruidScanSpec spec = selection.getListWith(mapper, specType);
+    return new DruidGroupScan(userName, this, spec, null, -1);
+  }
+
+  /**
+   * Returns the physical optimizer rules of this plugin: only the filter push-down rule.
+   */
+  @Override
+  public Set<StoragePluginOptimizerRule> getPhysicalOptimizerRules(
+    OptimizerRulesContext optimizerRulesContext) {
+    return ImmutableSet.of(DruidPushDownFilterForScan.INSTANCE);
+  }
+
+  /**
+   * Delegates schema registration to the plugin's schema factory.
+   */
+  @Override
+  public void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent) {
+    this.schemaFactory.registerSchemas(schemaConfig, parent);
+  }
+
+  /** Reading is supported. */
+  @Override
+  public boolean supportsRead() {
+    return true;
+  }
+
+  /** Writing is not supported. */
+  @Override
+  public boolean supportsWrite() {
+    return false;
+  }
+
+  /** Returns the configuration this plugin was created with. */
+  @Override
+  public DruidStoragePluginConfig getConfig() {
+    return this.pluginConfig;
+  }
+
+  /** Returns the drillbit context this plugin was created with. */
+  public DrillbitContext getContext() {
+    return this.context;
+  }
+
+  /** Returns the client built from the coordinator address. */
+  public DruidAdminClient getAdminClient() {
+    return this.druidAdminClient;
+  }
+
+  /** Returns the client built from the broker address. */
+  public DruidQueryClient getDruidQueryClient() {
+    return this.druidQueryClient;
+  }
+
+  /** Returns the select-query builder owned by this plugin. */
+  public SelectQueryBuilder getSelectQueryBuilder() {
+    return this.selectQueryBuilder;
+  }
+}
diff --git a/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidStoragePluginConfig.java b/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidStoragePluginConfig.java
new file mode 100755
index 0000000..1477117
--- /dev/null
+++ b/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidStoragePluginConfig.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.druid;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.drill.common.logical.StoragePluginConfigBase;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Objects;
+
+/**
+ * Configuration of the Druid storage plugin, registered under the type name
+ * {@value #NAME}.
+ *
+ * <p>It carries the broker address, the coordinator address and an average row
+ * size in bytes. All three settings take part in {@link #equals(Object)} and
+ * {@link #hashCode()}, so two configurations that differ in any setting are
+ * treated as different.
+ */
+@JsonTypeName(DruidStoragePluginConfig.NAME)
+public class DruidStoragePluginConfig extends StoragePluginConfigBase {
+
+  private static final Logger logger = LoggerFactory.getLogger(DruidStoragePluginConfig.class);
+  public static final String NAME = "druid";
+  private static final int DEFAULT_AVERAGE_ROW_SIZE_BYTES = 100;
+
+  private final String brokerAddress;
+  private final String coordinatorAddress;
+  private final int averageRowSizeBytes;
+
+  /**
+   * Creates the configuration from its JSON properties.
+   *
+   * @param brokerAddress address of the Druid broker
+   * @param coordinatorAddress address of the Druid coordinator
+   * @param averageRowSizeBytes average row size in bytes; when {@code null} the
+   *        default of {@value #DEFAULT_AVERAGE_ROW_SIZE_BYTES} is used
+   */
+  @JsonCreator
+  public DruidStoragePluginConfig(
+    @JsonProperty("brokerAddress") String brokerAddress,
+    @JsonProperty("coordinatorAddress") String coordinatorAddress,
+    @JsonProperty("averageRowSizeBytes") Integer averageRowSizeBytes) {
+
+    this.brokerAddress = brokerAddress;
+    this.coordinatorAddress = coordinatorAddress;
+    this.averageRowSizeBytes =
+        averageRowSizeBytes == null ? DEFAULT_AVERAGE_ROW_SIZE_BYTES : averageRowSizeBytes;
+    // Log the value actually in effect, not the possibly-null argument.
+    logger.debug(
+        "Broker Address - {}, Coordinator Address - {}, averageRowSizeBytes - {}",
+        brokerAddress,
+        coordinatorAddress,
+        this.averageRowSizeBytes
+    );
+  }
+
+  /**
+   * Two configurations are equal when they are of the same class and agree on the
+   * broker address, the coordinator address and the average row size.
+   */
+  @Override
+  public boolean equals(Object that) {
+    if (this == that) {
+      return true;
+    } else if (that == null || getClass() != that.getClass()) {
+      return false;
+    }
+    DruidStoragePluginConfig thatConfig = (DruidStoragePluginConfig) that;
+    return averageRowSizeBytes == thatConfig.averageRowSizeBytes &&
+           Objects.equals(brokerAddress, thatConfig.brokerAddress) &&
+           Objects.equals(coordinatorAddress, thatConfig.coordinatorAddress);
+  }
+
+  /** Hash code over the same three settings that {@link #equals(Object)} compares. */
+  @Override
+  public int hashCode() {
+    return Objects.hash(brokerAddress, coordinatorAddress, averageRowSizeBytes);
+  }
+
+  /** Returns the configured broker address. */
+  public String getBrokerAddress() {
+    return brokerAddress;
+  }
+
+  /** Returns the configured coordinator address. */
+  public String getCoordinatorAddress() {
+    return coordinatorAddress;
+  }
+
+  /** Returns the average row size in bytes, after the default has been applied. */
+  public int getAverageRowSizeBytes() {
+    return averageRowSizeBytes;
+  }
+}
diff --git a/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidSubScan.java b/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidSubScan.java
new file mode 100755
index 0000000..e8beb3d
--- /dev/null
+++ b/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidSubScan.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.druid;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import org.apache.drill.common.PlanStringBuilder;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.logical.StoragePluginConfig;
+import org.apache.drill.exec.physical.base.AbstractBase;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.PhysicalVisitor;
+import org.apache.drill.exec.physical.base.SubScan;
+import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.drill.exec.store.druid.common.DruidFilter;
+import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
+
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+
+import static java.util.Collections.emptyIterator;
+
+/**
+ * A Class containing information to read a single druid data source.
+ *
+ * <p>The JSON form written for this operator mirrors what the {@link JsonCreator}
+ * constructor reads: {@code userName}, {@code config}, {@code scanSpec},
+ * {@code columns} and {@code maxRecordsToRead}. The plugin itself is never
+ * serialized; it is looked up again from {@code config} on deserialization.
+ */
+@JsonTypeName("druid-datasource-scan")
+public class DruidSubScan extends AbstractBase implements SubScan {
+  @JsonIgnore
+  private final DruidStoragePlugin druidStoragePlugin;
+  private final List<DruidSubScanSpec> scanSpec;
+  private final List<SchemaPath> columns;
+  private final int maxRecordsToRead;
+
+  /**
+   * Deserialization constructor; resolves the plugin from the registry using the
+   * given configuration.
+   *
+   * @param registry injected plugin registry used to resolve {@code config}
+   * @param userName user on whose behalf the scan runs
+   * @param config storage plugin configuration identifying the Druid plugin
+   * @param datasourceScanSpecList per-datasource scan specifications
+   * @param columns columns to read
+   * @param maxRecordsToRead limit carried along with the scan
+   */
+  @JsonCreator
+  public DruidSubScan(@JacksonInject StoragePluginRegistry registry,
+                      @JsonProperty("userName") String userName,
+                      @JsonProperty("config") StoragePluginConfig config,
+                      @JsonProperty("scanSpec") LinkedList<DruidSubScanSpec> datasourceScanSpecList,
+                      @JsonProperty("columns") List<SchemaPath> columns,
+                      @JsonProperty("maxRecordsToRead") int maxRecordsToRead) {
+    super(userName);
+    druidStoragePlugin = registry.resolve(config, DruidStoragePlugin.class);
+    this.scanSpec = datasourceScanSpecList;
+    this.columns = columns;
+    this.maxRecordsToRead = maxRecordsToRead;
+  }
+
+  /**
+   * Creates a sub scan directly from an already resolved plugin.
+   *
+   * @param userName user on whose behalf the scan runs
+   * @param plugin the Druid storage plugin to read through
+   * @param dataSourceInfoList per-datasource scan specifications
+   * @param columns columns to read
+   * @param maxRecordsToRead limit carried along with the scan
+   */
+  public DruidSubScan(String userName,
+                      DruidStoragePlugin plugin,
+                      List<DruidSubScanSpec> dataSourceInfoList,
+                      List<SchemaPath> columns,
+                      int maxRecordsToRead) {
+    super(userName);
+    this.druidStoragePlugin = plugin;
+    this.scanSpec = dataSourceInfoList;
+    this.columns = columns;
+    this.maxRecordsToRead = maxRecordsToRead;
+  }
+
+  @Override
+  public <T, X, E extends Throwable> T accept(PhysicalVisitor<T, X, E> physicalVisitor, X value) throws E {
+    return physicalVisitor.visitSubScan(this, value);
+  }
+
+  /**
+   * Returns the per-datasource scan specifications. Written out as {@code scanSpec}
+   * so that the {@link JsonCreator} constructor can read it back.
+   */
+  @JsonProperty("scanSpec")
+  public List<DruidSubScanSpec> getScanSpec() {
+    return scanSpec;
+  }
+
+  /**
+   * Returns the configuration of the owning plugin. Written out as {@code config},
+   * the property from which the {@link JsonCreator} constructor resolves the plugin.
+   */
+  @JsonProperty("config")
+  public StoragePluginConfig getStorageConfig() {
+    return druidStoragePlugin.getConfig();
+  }
+
+  /** Returns the columns to read. */
+  public List<SchemaPath> getColumns() {
+    return columns;
+  }
+
+  /** Returns the record limit carried by this scan. */
+  public int getMaxRecordsToRead() { return maxRecordsToRead; }
+
+  @JsonIgnore
+  @Override
+  public boolean isExecutable() {
+    return false;
+  }
+
+  /** Returns the plugin this scan reads through; not part of the JSON form. */
+  @JsonIgnore
+  public DruidStoragePlugin getStorageEngine(){
+    return druidStoragePlugin;
+  }
+
+  /**
+   * Returns a copy of this scan; a sub scan has no children, so the list must be empty.
+   */
+  @Override
+  public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) {
+    Preconditions.checkArgument(children.isEmpty());
+    return new DruidSubScan(getUserName(), druidStoragePlugin, scanSpec, columns, maxRecordsToRead);
+  }
+
+  @JsonIgnore
+  @Override
+  public int getOperatorType() {
+    return CoreOperatorType.DRUID_SUB_SCAN_VALUE;
+  }
+
+  /** A sub scan has no child operators. */
+  @Override
+  public Iterator<PhysicalOperator> iterator() {
+    return emptyIterator();
+  }
+
+  /**
+   * Description of one datasource to scan: its name, an optional filter, its size
+   * and the minimum and maximum time strings.
+   */
+  public static class DruidSubScanSpec {
+
+    protected final String dataSourceName;
+    protected final DruidFilter filter;
+    protected final long dataSourceSize;
+    protected final String maxTime;
+    protected final String minTime;
+
+    @JsonCreator
+    public DruidSubScanSpec(@JsonProperty("dataSourceName") String dataSourceName,
+                            @JsonProperty("filter") DruidFilter filter,
+                            @JsonProperty("dataSourceSize") long dataSourceSize,
+                            @JsonProperty("minTime") String minTime,
+                            @JsonProperty("maxTime") String maxTime) {
+      this.dataSourceName = dataSourceName;
+      this.filter = filter;
+      this.dataSourceSize = dataSourceSize;
+      this.minTime = minTime;
+      this.maxTime = maxTime;
+    }
+
+    public String getDataSourceName() {
+      return dataSourceName;
+    }
+
+    public DruidFilter getFilter() { return filter; }
+
+    public long getDataSourceSize() {
+      return dataSourceSize;
+    }
+
+    public String getMinTime() {
+      return minTime;
+    }
+
+    public String getMaxTime() {
+      return maxTime;
+    }
+
+    @Override
+    public String toString() {
+      return new PlanStringBuilder(this)
+        .field("dataSourceName", dataSourceName)
+        .field("filter", filter)
+        .field("dataSourceSize", dataSourceSize)
+        .field("minTime", minTime)
+        .field("maxTime", maxTime)
+        .toString();
+    }
+  }
+}
diff --git a/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/common/DruidAndFilter.java b/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/common/DruidAndFilter.java
new file mode 100755
index 0000000..d66b22f
--- /dev/null
+++ b/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/common/DruidAndFilter.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.druid.common;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonPropertyOrder;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+
+import java.util.List;
+
+/**
+ * Filter that combines a list of child filters under the {@code and} type.
+ * Serialized with {@code type} first, followed by {@code fields}.
+ */
+@JsonDeserialize(as = DruidAndFilter.class)
+@JsonPropertyOrder({ "type", "fields" })
+public class DruidAndFilter extends DruidFilterBase {
+  private final String type = DruidCompareOp.AND.getCompareOp();
+  private final List<DruidFilter> fields;
+
+  /**
+   * @param fields the child filters to combine
+   */
+  public DruidAndFilter(@JsonProperty("fields") List<DruidFilter> fields) {
+    this.fields = fields;
+  }
+
+  /** Returns the filter type name taken from {@link DruidCompareOp#AND}. */
+  public String getType() {
+    return this.type;
+  }
+
+  /** Returns the child filters exactly as they were passed in. */
+  public List<DruidFilter> getFields() {
+    return this.fields;
+  }
+}
diff --git a/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/common/DruidBoundFilter.java b/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/common/DruidBoundFilter.java
new file mode 100755
index 0000000..6ce76ca
--- /dev/null
+++ b/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/common/DruidBoundFilter.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.druid.common;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonPropertyOrder;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import org.apache.commons.lang3.StringUtils;
+
+/**
+ * Bound (range) filter on a single dimension.
+ *
+ * <p>Either bound may be {@code null}; {@code null} properties are left out of the
+ * JSON form. The {@code ordering} property is derived from the bounds: it is
+ * {@code "numeric"} when at least one bound is a decimal number (signed and
+ * fractional values included) and {@code "lexicographic"} otherwise.
+ */
+@JsonDeserialize(as = DruidBoundFilter.class)
+@JsonInclude(JsonInclude.Include.NON_NULL)
+@JsonPropertyOrder({ "type", "dimension", "lower", "upper", "lowerStrict", "upperStrict", "ordering" })
+public class DruidBoundFilter extends DruidFilterBase {
+  private final String type = DruidCompareOp.TYPE_BOUND.getCompareOp();
+  private final String dimension;
+  private final String lower;
+  private final String upper;
+  private final Boolean lowerStrict;
+  private final Boolean upperStrict;
+  private final String ordering;
+
+  /**
+   * @param dimension name of the dimension the bounds apply to
+   * @param lower lower bound, or {@code null} for none
+   * @param upper upper bound, or {@code null} for none
+   * @param lowerStrict strictness flag for the lower bound, or {@code null} to omit it
+   * @param upperStrict strictness flag for the upper bound, or {@code null} to omit it
+   */
+  @JsonCreator
+  public DruidBoundFilter(@JsonProperty("dimension") String dimension,
+                          @JsonProperty("lower") String lower,
+                          @JsonProperty("upper") String upper,
+                          @JsonProperty("lowerStrict") Boolean lowerStrict,
+                          @JsonProperty("upperStrict") Boolean upperStrict) {
+    this.dimension = dimension;
+    this.lower = lower;
+    this.upper= upper;
+    this.lowerStrict = lowerStrict;
+    this.upperStrict = upperStrict;
+    // Must run after both bounds are assigned: the ordering is derived from them.
+    this.ordering = getCompareOrdering();
+  }
+
+  public String getType() {
+    return type;
+  }
+
+  public String getDimension() {
+    return dimension;
+  }
+
+  public String getLower() {
+    return lower;
+  }
+
+  public String getUpper() {
+    return upper;
+  }
+
+  public Boolean getLowerStrict() {
+    return lowerStrict;
+  }
+
+  public Boolean getUpperStrict() {
+    return upperStrict;
+  }
+
+  public String getOrdering() { return ordering; }
+
+  /**
+   * Chooses the ordering from the bounds: numeric as soon as one of them is a
+   * number, lexicographic otherwise.
+   */
+  @JsonIgnore
+  private String getCompareOrdering() {
+    if (isNumericBound(lower) || isNumericBound(upper)) {
+      return "numeric";
+    } else {
+      return "lexicographic";
+    }
+  }
+
+  /**
+   * Tells whether a bound is a decimal number. Unlike a digits-only check this
+   * also accepts a sign, a fraction and an exponent, e.g. {@code "-5"} or
+   * {@code "1.5"}; {@code null} and empty strings are not numeric.
+   */
+  private static boolean isNumericBound(String bound) {
+    if (StringUtils.isEmpty(bound)) {
+      return false;
+    }
+    try {
+      new java.math.BigDecimal(bound);
+      return true;
+    } catch (NumberFormatException ignored) {
+      // Not parseable as a decimal number, so the bound is compared as text.
+      return false;
+    }
+  }
+}
diff --git a/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/common/DruidCompareOp.java b/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/common/DruidCompareOp.java
new file mode 100755
index 0000000..44689c6
--- /dev/null
+++ b/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/common/DruidCompareOp.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.druid.common;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Operator and filter-type names, each paired with the string it is written as.
+ * {@link #get(String)} maps such a string back to its constant.
+ */
+public enum DruidCompareOp {
+  EQUAL("$eq"),
+  NOT_EQUAL("$ne"),
+  GREATER_OR_EQUAL("$gte"),
+  GREATER("$gt"),
+  LESS_OR_EQUAL("$lte"),
+  LESS("$lt"),
+  IN("$in"),
+  AND("and"),
+  OR("or"),
+  NOT("not"),
+  OPTIONS("$options"),
+  PROJECT("$project"),
+  COND("$cond"),
+  IFNULL("$ifNull"),
+  IFNOTNULL("$ifNotNull"),
+  SUM("$sum"),
+  GROUP_BY("$group"),
+  EXISTS("$exists"),
+  TYPE_SELECTOR("selector"),
+  TYPE_IN("in"),
+  TYPE_REGEX("regex"),
+  TYPE_SEARCH("search"),
+  TYPE_SEARCH_CONTAINS("contains"),
+  TYPE_SEARCH_CASESENSITIVE("caseSensitive"),
+  TYPE_BOUND("bound");
+
+  /** Read-only lookup from the written name to its constant. */
+  private static final Map<String, DruidCompareOp> ENUM_MAP = indexByCompareOp();
+
+  private final String compareOp;
+
+  DruidCompareOp(String compareOp) {
+    this.compareOp = compareOp;
+  }
+
+  /** Builds the name-to-constant lookup over all constants. */
+  private static Map<String, DruidCompareOp> indexByCompareOp() {
+    Map<String, DruidCompareOp> byName = new ConcurrentHashMap<>();
+    for (DruidCompareOp op : values()) {
+      byName.put(op.getCompareOp(), op);
+    }
+    return Collections.unmodifiableMap(byName);
+  }
+
+  /** Returns the string this constant is written as. */
+  public String getCompareOp() {
+    return this.compareOp;
+  }
+
+  /**
+   * Looks up a constant by its written name.
+   *
+   * @param name the written name, e.g. {@code "bound"}
+   * @return the matching constant, or {@code null} when no constant uses that name
+   */
+  public static DruidCompareOp get (String name) {
+    return ENUM_MAP.get(name);
+  }
+}
diff --git a/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/common/DruidConstants.java b/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/common/DruidConstants.java
new file mode 100644
index 0000000..c1c99ea
--- /dev/null
+++ b/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/common/DruidConstants.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.druid.common;
+
+public final class DruidConstants {
+  public static final String INTERVAL_DIMENSION_NAME = "eventInterval";
+  public static final String ISO_8601_DATE_STRING_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSSZ";
+  public static final String DRUID_TIME_DIMENSIONS = "__time";
+}
diff --git a/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/common/DruidFilter.java b/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/common/DruidFilter.java
new file mode 100644
index 0000000..3aa583c
--- /dev/null
+++ b/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/common/DruidFilter.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.druid.common;
+
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+
+/**
+ * A Druid query filter. Instances are deserialized through
+ * {@link DruidFilterDeserializer}.
+ */
+@JsonDeserialize(using = DruidFilterDeserializer.class)
+public interface DruidFilter {
+  /**
+   * Returns this filter rendered as a JSON string.
+   */
+  String toJson();
+}
diff --git a/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/common/DruidFilterBase.java b/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/common/DruidFilterBase.java
new file mode 100755
index 0000000..481357b
--- /dev/null
+++ b/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/common/DruidFilterBase.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.druid.common;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+/**
+ * Base class of the filter implementations; provides {@link #toJson()} by
+ * serializing the concrete filter object with a shared Jackson mapper.
+ */
+public class DruidFilterBase implements DruidFilter {
+  private static final ObjectMapper objectMapper = new ObjectMapper();
+
+  /**
+   * Serializes this filter to JSON.
+   *
+   * @return the JSON form of this filter
+   * @throws IllegalStateException if the filter cannot be serialized; the cause is
+   *         the underlying Jackson exception. Failing here avoids handing an empty,
+   *         invalid filter string to the caller.
+   */
+  @Override
+  public String toJson() {
+    try {
+      return objectMapper.writeValueAsString(this);
+    } catch (JsonProcessingException e) {
+      throw new IllegalStateException(
+          "Unable to serialize Druid filter " + getClass().getSimpleName() + " to JSON", e);
+    }
+  }
+}
diff --git a/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/common/DruidFilterDeserializer.java b/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/common/DruidFilterDeserializer.java
new file mode 100644
index 0000000..bf8c9f7
--- /dev/null
+++ b/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/common/DruidFilterDeserializer.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.druid.common;
+
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.databind.DeserializationContext;
+import com.fasterxml.jackson.databind.JsonDeserializer;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+
+import java.io.IOException;
+
+import static org.apache.drill.exec.store.druid.common.DruidCompareOp.TYPE_SELECTOR;
+
+/**
+ * Jackson deserializer that reads a JSON filter object and maps it to the filter
+ * class selected by the object's {@code type} property.
+ */
+public class DruidFilterDeserializer extends JsonDeserializer<DruidFilter> {
+
+  /**
+   * Reads one filter object.
+   *
+   * <p>An object without a {@code type} property is read as a selector filter. A
+   * {@code type} that names a {@link DruidCompareOp} without a dedicated filter
+   * class is read as a selector filter as well.
+   *
+   * @param jp parser positioned at the filter object; its codec must be an
+   *        {@link ObjectMapper}
+   * @param context deserialization context (unused)
+   * @return the deserialized filter
+   * @throws IOException if the {@code type} value is not a known
+   *         {@link DruidCompareOp} name, or if reading the JSON fails
+   */
+  @Override
+  public DruidFilter deserialize(JsonParser jp, DeserializationContext context) throws
+      IOException {
+    ObjectMapper mapper = (ObjectMapper) jp.getCodec();
+    ObjectNode root = mapper.readTree(jp);
+
+    // Filters that do not state their type are treated as selector filters.
+    DruidCompareOp type = TYPE_SELECTOR;
+    if (root.has("type")) {
+      String typeName = root.get("type").asText();
+      type = DruidCompareOp.get(typeName);
+      if (type == null) {
+        // Fail with a message instead of a bare NullPointerException in the switch.
+        throw new IOException("Unsupported Druid filter type: " + typeName);
+      }
+    }
+    return mapper.readValue(root.toString(), filterClassFor(type));
+  }
+
+  /** Picks the filter class for a type; anything without its own class is a selector. */
+  private static Class<? extends DruidFilter> filterClassFor(DruidCompareOp type) {
+    switch (type) {
+      case TYPE_BOUND:
+        return DruidBoundFilter.class;
+      case TYPE_IN:
+        return DruidInFilter.class;
+      case TYPE_REGEX:
+        return DruidRegexFilter.class;
+      case TYPE_SEARCH:
+        return DruidSearchFilter.class;
+      case AND:
+        return DruidAndFilter.class;
+      case OR:
+        return DruidOrFilter.class;
+      case NOT:
+        return DruidNotFilter.class;
+      case TYPE_SELECTOR:
+      default:
+        return DruidSelectorFilter.class;
+    }
+  }
+}
diff --git a/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/common/DruidInFilter.java b/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/common/DruidInFilter.java
new file mode 100755
index 0000000..be04991
--- /dev/null
+++ b/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/common/DruidInFilter.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.druid.common;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+
+/**
+ * Filter of type {@code in}: a dimension together with an array of values.
+ */
+@JsonDeserialize(as = DruidInFilter.class)
+public class DruidInFilter extends DruidFilterBase {
+
+  private final String type = DruidCompareOp.TYPE_IN.getCompareOp();
+  private final String dimension;
+  private final String[] values;
+
+  /**
+   * @param dimension name of the dimension to filter on
+   * @param values the values of this filter; the array is stored as given
+   */
+  @JsonCreator
+  public DruidInFilter(@JsonProperty("dimension") String dimension,
+                       @JsonProperty("values") String[] values) {
+    this.dimension = dimension;
+    this.values = values;
+  }
+
+  /** Returns the filter type name taken from {@link DruidCompareOp#TYPE_IN}. */
+  public String getType() {
+    return this.type;
+  }
+
+  /** Returns the dimension name. */
+  public String getDimension() {
+    return this.dimension;
+  }
+
+  /** Returns the stored value array itself, not a copy. */
+  public String[] getValues() {
+    return this.values;
+  }
+}
diff --git a/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/common/DruidIntervalFilter.java b/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/common/DruidIntervalFilter.java
new file mode 100755
index 0000000..bbdd077
--- /dev/null
+++ b/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/common/DruidIntervalFilter.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.druid.common;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+public class DruidIntervalFilter extends DruidFilterBase {
+  // Filter model wrapping a single interval string, bound to the JSON property "eventInterval".
+  private final String eventInterval;
+
+  @JsonCreator
+  public DruidIntervalFilter(@JsonProperty("eventInterval") String eventInterval) {
+    this.eventInterval = eventInterval; // kept verbatim; no parsing or validation happens here
+  }
+
+  public String getEventInterval() {
+    return eventInterval;
+  }
+}
diff --git a/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/common/DruidNotFilter.java b/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/common/DruidNotFilter.java
new file mode 100755
index 0000000..2f79882
--- /dev/null
+++ b/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/common/DruidNotFilter.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.druid.common;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonPropertyOrder;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+
+@JsonDeserialize(as = DruidNotFilter.class)
+@JsonPropertyOrder({ "type", "field" })
+public class DruidNotFilter extends DruidFilterBase {
+  // Filter model wrapping exactly one nested filter; JSON properties are ordered "type", then "field".
+  private final String type = DruidCompareOp.NOT.getCompareOp(); // fixed per class, callers cannot set it
+  private final DruidFilter field; // the wrapped filter
+
+  @JsonCreator
+  public DruidNotFilter(@JsonProperty("field") DruidFilter field) {
+    this.field = field; // stored as given; null is not rejected here
+  }
+
+  public String getType() {
+    return type;
+  }
+
+  public DruidFilter getField() {
+    return field;
+  }
+}
diff --git a/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/common/DruidOrFilter.java b/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/common/DruidOrFilter.java
new file mode 100755
index 0000000..f00a640
--- /dev/null
+++ b/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/common/DruidOrFilter.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.druid.common;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonPropertyOrder;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+
+import java.util.List;
+
+@JsonDeserialize(as = DruidOrFilter.class)
+@JsonPropertyOrder({ "type", "fields" })
+public class DruidOrFilter extends DruidFilterBase { // filter model over a list of nested filters
+  private final String type = DruidCompareOp.OR.getCompareOp(); // fixed per class, callers cannot set it
+  private final List<DruidFilter> fields;
+  // NOTE(review): no @JsonCreator on this constructor, unlike DruidNotFilter and DruidRegexFilter - confirm intended.
+  public DruidOrFilter(@JsonProperty("fields") List<DruidFilter> fields) {
+    this.fields = fields; // list reference is stored as given, not copied
+  }
+
+  public String getType() {
+    return type;
+  }
+
+  public List<DruidFilter> getFields() {
+    return fields; // returns the same list instance that was passed in
+  }
+}
diff --git a/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/common/DruidRegexFilter.java b/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/common/DruidRegexFilter.java
new file mode 100755
index 0000000..00d6872
--- /dev/null
+++ b/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/common/DruidRegexFilter.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.druid.common;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonPropertyOrder;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+
+@JsonDeserialize(as = DruidRegexFilter.class)
+@JsonPropertyOrder({ "type", "dimension", "pattern" })
+public class DruidRegexFilter extends DruidFilterBase { // filter model pairing a dimension name with a pattern string
+  private final String type = DruidCompareOp.TYPE_REGEX.getCompareOp(); // fixed per class, callers cannot set it
+  private final String dimension;
+  private final String pattern; // kept as a plain string; it is not compiled or validated in this class
+
+  @JsonCreator
+  public DruidRegexFilter(@JsonProperty("dimension") String dimension,
+                          @JsonProperty("pattern") String pattern) {
+    this.dimension = dimension;
+    this.pattern = pattern;
+  }
+
+  public String getType() {
+    return type;
+  }
+
+  public String getDimension() {
+    return dimension;
+  }
+
+  public String getPattern() {
+    return pattern;
+  }
+}
diff --git a/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/common/DruidSearchFilter.java b/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/common/DruidSearchFilter.java
new file mode 100755
index 0000000..d042a6f
--- /dev/null
+++ b/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/common/DruidSearchFilter.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.druid.common;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonPropertyOrder;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+
+@JsonDeserialize(as = DruidSearchFilter.class)
+@JsonPropertyOrder({ "type", "dimension", "query" })
+public class DruidSearchFilter extends DruidFilterBase {
+  // Filter model pairing a dimension name with a DruidSearchQuerySpec; JSON order is "type", "dimension", "query".
+  private final String type = DruidCompareOp.TYPE_SEARCH.getCompareOp(); // fixed per class, callers cannot set it
+  private final String dimension;
+  private final DruidSearchQuerySpec query; // nested spec object, serialized under "query"
+
+  @JsonCreator
+  public DruidSearchFilter(@JsonProperty("dimension") String dimension,
+                            @JsonProperty("query") DruidSearchQuerySpec query) {
+    this.dimension = dimension;
+    this.query = query;
+  }
+
+  public String getType() {
+    return type;
+  }
+
+  public String getDimension() {
+    return dimension;
+  }
+
+  public DruidSearchQuerySpec getQuery() { return query; }
+}
diff --git a/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/common/DruidSearchQuerySpec.java b/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/common/DruidSearchQuerySpec.java
new file mode 100644
index 0000000..59309fd
--- /dev/null
+++ b/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/common/DruidSearchQuerySpec.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.druid.common;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonPropertyOrder;
+
+@JsonPropertyOrder({ "type", "value", "caseSensitive" })
+public class DruidSearchQuerySpec { // search spec model: a value string plus a case-sensitivity flag
+  private final String type = DruidCompareOp.TYPE_SEARCH_CONTAINS.getCompareOp(); // fixed per class
+  private final String value;
+  private final boolean caseSensitive;
+
+  @JsonCreator
+  public DruidSearchQuerySpec(@JsonProperty("value") String value,
+                              @JsonProperty("caseSensitive") boolean caseSensitive) {
+    this.value = value;
+    this.caseSensitive = caseSensitive;
+  }
+
+  public String getType() {
+    return type;
+  }
+
+  public String getValue() {
+    return value;
+  }
+
+  public boolean isCaseSensitive() {
+    return caseSensitive;
+  }
+}
diff --git a/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/common/DruidSelectorFilter.java b/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/common/DruidSelectorFilter.java
new file mode 100755
index 0000000..fe8f887
--- /dev/null
+++ b/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/common/DruidSelectorFilter.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.druid.common;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonPropertyOrder;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+
+@JsonDeserialize(as = DruidSelectorFilter.class)
+@JsonPropertyOrder({ "type", "dimension", "value" })
+public class DruidSelectorFilter extends DruidFilterBase { // filter model pairing a dimension name with one value
+  private final String type = DruidCompareOp.TYPE_SELECTOR.getCompareOp(); // fixed per class, callers cannot set it
+  private final String dimension;
+  private final String value; // always carried as a string, whatever the column type is
+
+  @JsonCreator
+  public DruidSelectorFilter(@JsonProperty("dimension") String dimension,
+                             @JsonProperty("value") String value) {
+    this.dimension = dimension;
+    this.value = value;
+  }
+
+  public String getType() {
+    return type;
+  }
+
+  public String getDimension() {
+    return dimension;
+  }
+
+  public String getValue() {
+    return value;
+  }
+}
diff --git a/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/common/DruidUtils.java b/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/common/DruidUtils.java
new file mode 100755
index 0000000..54bda5a
--- /dev/null
+++ b/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/common/DruidUtils.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.druid.common;
+
+import java.util.Arrays;
+
+public class DruidUtils { // static helpers that combine two filters into one composite filter
+  public static DruidFilter andFilterAtIndex(DruidFilter leftFilter, DruidFilter rightFilter) {
+    return new DruidAndFilter(Arrays.asList(leftFilter, rightFilter)); // fixed-size list: left first, then right
+  }
+  // Counterpart of andFilterAtIndex: wraps the same two-element list in a DruidOrFilter instead.
+  public static DruidFilter orFilterAtIndex(DruidFilter leftFilter, DruidFilter rightFilter) {
+    return new DruidOrFilter(Arrays.asList(leftFilter, rightFilter));
+  }
+}
diff --git a/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/druid/DruidDimensionSpec.java b/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/druid/DruidDimensionSpec.java
new file mode 100644
index 0000000..a9e9cc1
--- /dev/null
+++ b/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/druid/DruidDimensionSpec.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.druid.druid;
+
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonPropertyOrder;
+import org.apache.commons.lang3.StringUtils;
+
+@JsonPropertyOrder({ "type", "dimension", "outputName", "outputType", "extractionFn" })
+@JsonInclude(JsonInclude.Include.NON_NULL)
+public class DruidDimensionSpec { // dimension descriptor; null-valued properties are left out of the JSON
+  private final String type;
+  private final String dimension;
+  private final String outputName;
+  private final String outputType;
+  private final DruidExtractionFunctionSpec extractionFn;
+  // Each constructor argument is bound to the JSON property of the same name.
+  public DruidDimensionSpec(@JsonProperty("type") String type,
+                            @JsonProperty("dimension") String dimension,
+                            @JsonProperty("outputName") String outputName,
+                            @JsonProperty("outputType") String outputType, // own property, distinct from "outputName"
+                            @JsonProperty("extractionFn") DruidExtractionFunctionSpec extractionFn) {
+    this.type = StringUtils.isEmpty(type) ? "default" : type; // null or "" falls back to "default"
+    this.dimension = dimension;
+    this.outputName = outputName;
+    this.outputType = outputType;
+    this.extractionFn = extractionFn;
+  }
+
+  public String getType() {
+    return type; // never null or empty, because of the constructor fallback
+  }
+
+  public String getDimension() {
+    return dimension;
+  }
+
+  public String getOutputName() {
+    return outputName;
+  }
+
+  public String getOutputType() {
+    return outputType;
+  }
+
+  public DruidExtractionFunctionSpec getExtractionFn() {
+    return extractionFn;
+  }
+}
diff --git a/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/druid/DruidExtractionFunctionSpec.java b/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/druid/DruidExtractionFunctionSpec.java
new file mode 100644
index 0000000..bf11bb6
--- /dev/null
+++ b/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/druid/DruidExtractionFunctionSpec.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.druid.druid;
+
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonPropertyOrder;
+
+@JsonPropertyOrder({ "type", "format" })
+@JsonInclude(JsonInclude.Include.NON_NULL)
+public class DruidExtractionFunctionSpec { // two-string descriptor; null-valued properties are left out of the JSON
+  private final String type;
+  private final String format;
+  // Both arguments are stored as given; neither is validated or defaulted.
+  public DruidExtractionFunctionSpec(@JsonProperty("type") String type,
+                                     @JsonProperty("format") String format) {
+    this.type = type;
+    this.format = format;
+  }
+
+  public String getType() {
+    return type;
+  }
+
+  public String getFormat() {
+    return format;
+  }
+}
diff --git a/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/druid/DruidQueryType.java b/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/druid/DruidQueryType.java
new file mode 100644
index 0000000..c760c67
--- /dev/null
+++ b/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/druid/DruidQueryType.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.druid.druid;
+
+public enum DruidQueryType { // query type identifiers; exactly two constants are defined
+  SELECT,
+  SCAN
+}
diff --git a/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/druid/DruidSelectResponse.java b/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/druid/DruidSelectResponse.java
new file mode 100755
index 0000000..dd52b17
--- /dev/null
+++ b/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/druid/DruidSelectResponse.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.druid.druid;
+
+import com.fasterxml.jackson.databind.node.ObjectNode;
+
+import java.util.ArrayList;
+
+public class DruidSelectResponse {
+  // Mutable holder for two lists: event nodes and paging identifiers. Both start out empty.
+  private ArrayList<ObjectNode> events = new ArrayList<>();
+  private ArrayList<PagingIdentifier> pagingIdentifiers = new ArrayList<>();
+
+  // Getters return the live lists; setters replace the reference without copying or null checks.
+  public ArrayList<ObjectNode> getEvents() {
+    return events;
+  }
+
+  public void setEvents(ArrayList<ObjectNode> events) {
+    this.events = events;
+  }
+
+  public ArrayList<PagingIdentifier> getPagingIdentifiers() {
+    return pagingIdentifiers;
+  }
+
+  public void setPagingIdentifiers(ArrayList<PagingIdentifier> pagingIdentifiers) {
+    this.pagingIdentifiers = pagingIdentifiers;
+  }
+}
diff --git a/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/druid/PagingIdentifier.java b/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/druid/PagingIdentifier.java
new file mode 100755
index 0000000..bd5bf0c
--- /dev/null
+++ b/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/druid/PagingIdentifier.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.druid.druid;
+
+public class PagingIdentifier {
+  // Read-only pair of a segment name and an integer offset; both are set once in the constructor.
+  private final String segmentName;
+  private final int segmentOffset;
+
+  public PagingIdentifier(String segmentName, int segmentOffset) {
+    this.segmentName = segmentName; // not null-checked here
+    this.segmentOffset = segmentOffset;
+  }
+
+  public String getSegmentName()
+  {
+    return segmentName;
+  }
+
+  public int getSegmentOffset()
+  {
+    return segmentOffset;
+  }
+}
diff --git a/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/druid/PagingSpec.java b/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/druid/PagingSpec.java
new file mode 100755
index 0000000..e186aa8
--- /dev/null
+++ b/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/druid/PagingSpec.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.druid.druid;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.commons.collections.CollectionUtils;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public class PagingSpec {
+  // Paging model: a constant fromNext flag, a segment-name-to-offset map and a mutable threshold.
+  @JsonProperty
+  private final boolean fromNext = true; // always true; there is no way to change it
+
+  @JsonProperty
+  private final Map<String, Integer> pagingIdentifiers; // key: segment name, value: segment offset
+
+  @JsonProperty
+  private int threshold;
+  // Builds the map from the given identifiers; a null or empty list yields an empty HashMap.
+  public PagingSpec(List<PagingIdentifier> pagingIdentifiers, int threshold) {
+    this.pagingIdentifiers =
+      CollectionUtils.isEmpty(pagingIdentifiers)
+        ? new HashMap<>()
+        : pagingIdentifiers.stream().collect(
+        Collectors.toMap(PagingIdentifier::getSegmentName, PagingIdentifier::getSegmentOffset)); // no merge function: duplicate segment names throw IllegalStateException
+    this.threshold = threshold;
+  }
+
+  public Map<String, Integer> getPagingIdentifiers() {
+    return pagingIdentifiers; // returns the internal map itself, not a copy
+  }
+
+  public int getThreshold() {
+    return threshold;
+  }
+
+  public void setThreshold(int threshold) {
+    this.threshold = threshold;
+  }
+
+  public boolean getFromNext() {
+    return fromNext;
+  }
+}
diff --git a/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/druid/SelectQuery.java b/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/druid/SelectQuery.java
new file mode 100755
index 0000000..4b2c2af
--- /dev/null
+++ b/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/druid/SelectQuery.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.druid.druid;
+
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonPropertyOrder;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+
+import java.util.List;
+
+@JsonPropertyOrder({ "queryType", "dataSource", "descending", "dimensions", "filter", "granularity", "intervals", "pagingSpec" })
+@JsonInclude(JsonInclude.Include.NON_NULL)
+public class SelectQuery {
+  // Query model: three fixed settings (granularity, queryType, descending) plus five values supplied by the caller.
+  @JsonProperty
+  private static final String granularity = "all";
+  // NOTE(review): Jackson normally skips static fields, so these three presumably serialize via the getters below - confirm.
+  @JsonProperty
+  private static final String queryType = "select";
+
+  @JsonProperty
+  private static final boolean descending = false;
+
+  private final String dataSource;
+  private final List<DruidDimensionSpec> dimensions;
+  private final ObjectNode filter; // pre-built JSON tree rather than a typed filter object
+  private final List<String> intervals;
+  private final PagingSpec pagingSpec;
+  // All five arguments are stored as given; nothing is copied, defaulted or validated.
+  public SelectQuery(@JsonProperty("dataSource") String dataSource,
+                     @JsonProperty("dimensions") List<DruidDimensionSpec> dimensions,
+                     @JsonProperty("filter") ObjectNode filter,
+                     @JsonProperty("intervals") List<String> intervals,
+                     @JsonProperty("pagingSpec") PagingSpec pagingSpec) {
+    this.dataSource = dataSource;
+    this.dimensions = dimensions;
+    this.filter = filter;
+    this.intervals = intervals;
+    this.pagingSpec = pagingSpec;
+  }
+
+  public String getQueryType() {
+    return queryType; // constant "select"
+  }
+
+  public boolean isDescending() {
+    return descending; // constant false
+  }
+
+  public String getDataSource() {
+    return dataSource;
+  }
+
+  public List<DruidDimensionSpec> getDimensions() {
+    return dimensions;
+  }
+
+  public String getGranularity() {
+    return granularity; // constant "all"
+  }
+
+  public List<String> getIntervals() {
+    return intervals;
+  }
+
+  public PagingSpec getPagingSpec() {
+    return pagingSpec;
+  }
+
+  public ObjectNode getFilter() {
+    return filter; // may be null, in which case NON_NULL inclusion leaves it out of the JSON
+  }
+}
diff --git a/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/druid/SelectQueryBuilder.java b/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/druid/SelectQueryBuilder.java
new file mode 100644
index 0000000..6d040d5
--- /dev/null
+++ b/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/druid/SelectQueryBuilder.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.druid.druid;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.drill.exec.store.druid.common.DruidConstants;
+import org.apache.drill.exec.store.druid.common.DruidFilter;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.drill.exec.store.druid.common.DruidConstants.INTERVAL_DIMENSION_NAME;
+
+public class SelectQueryBuilder {
+  private static final ObjectMapper objectMapper = new ObjectMapper();
+
+  public SelectQueryBuilder() {}
+
+  public SelectQuery build(String datasource,
+                           List<String> dimensions,
+                           DruidFilter druidFilter,
+                           PagingSpec pagingSpec,
+                           String minTime,
+                           String maxTime)
+    throws JsonProcessingException {
+    List<JsonNode> userInputIntervals =
+      druidFilter == null
+        ? new ArrayList<>()
+        : parseIntervalsFromFilter(druidFilter.toJson());
+    List<String> queryIntervals = getQueryIntervals(userInputIntervals, minTime, maxTime);
+    ObjectNode finalFilter =
+      druidFilter == null
+        ? null
+        : (ObjectNode) objectMapper.readTree(removeIntervalFilter(druidFilter.toJson(), userInputIntervals));
+
+    return new SelectQuery(
+      datasource,getDimensionsAsSpec(dimensions),
+      finalFilter,
+      queryIntervals,
+      pagingSpec);
+  }
+
+  private List<DruidDimensionSpec> getDimensionsAsSpec(List<String> columns) {
+    return columns.stream().map(column -> {
+      String type = StringUtils.equalsAnyIgnoreCase(column, DruidConstants.DRUID_TIME_DIMENSIONS) ? "extraction" : "default";
+      DruidExtractionFunctionSpec extractionFunctionSpec =
+        StringUtils.equalsAnyIgnoreCase(column, DruidConstants.DRUID_TIME_DIMENSIONS) ? getTimeExtractionFunction() : null;
+      return new DruidDimensionSpec(type, column, column, "STRING", extractionFunctionSpec);
+    }).collect(Collectors.toList());
+  }
+
+  private DruidExtractionFunctionSpec getTimeExtractionFunction() {
+    return new DruidExtractionFunctionSpec("timeFormat", DruidConstants.ISO_8601_DATE_STRING_FORMAT);
+  }
+
+  private List<JsonNode> parseIntervalsFromFilter(String filter)
+      throws JsonProcessingException {
+    //if the filter is on the special Interval Dimension, then use it for the interval.
+    JsonNode filterNode = objectMapper.readTree(filter);
+    return filterNode.findValues(INTERVAL_DIMENSION_NAME);
+  }
+
+  private List<String> getQueryIntervals(List<JsonNode> userInputIntervals,
+                                         String minTime,
+                                         String maxTime) {
+    if (userInputIntervals.isEmpty()) {
+      String interval = String.format("%s/%s", minTime, maxTime);
+      return Stream.of(interval).collect(Collectors.toList());
+    }
+
+    JsonNode firstIntervalNode = userInputIntervals.get(0);
+    String interval = firstIntervalNode.asText();
+    return Stream.of(interval).collect(Collectors.toList());
+  }
+
+  private String removeIntervalFilter(String filter, List<JsonNode> userInputIntervals) {
+    for (JsonNode intervalNode : userInputIntervals) {
+      String interval = intervalNode.asText();
+      String intervalSubString1 = ",{\"eventInterval\":\"" + interval + "\"}";
+      String intervalSubString2 = "{\"eventInterval\":\"" + interval + "\"},";
+      filter = filter.replace(intervalSubString1, "");
+      filter = filter.replace(intervalSubString2, "");
+    }
+    return filter;
+  }
+}
diff --git a/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/druid/SimpleDatasourceInfo.java b/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/druid/SimpleDatasourceInfo.java
new file mode 100644
index 0000000..877dea4
--- /dev/null
+++ b/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/druid/SimpleDatasourceInfo.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.druid.druid;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+/**
+ * Jackson-bound value object holding a datasource {@code name} and its {@code properties}.
+ *
+ * <p>Unknown JSON properties are ignored on read; {@code null} values are omitted on write.
+ */
+@JsonInclude(JsonInclude.Include.NON_NULL)
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class SimpleDatasourceInfo {
+  private final String name;
+  private final SimpleDatasourceProperties properties;
+
+  /**
+   * @param name       value of the {@code name} property
+   * @param properties value of the {@code properties} property
+   */
+  public SimpleDatasourceInfo(@JsonProperty("name") String name,
+                              @JsonProperty("properties") SimpleDatasourceProperties properties) {
+    this.name = name;
+    this.properties = properties;
+  }
+
+  /** @return the datasource name */
+  public String getName() {
+    return name;
+  }
+
+  /** @return the datasource properties */
+  public SimpleDatasourceProperties getProperties() {
+    return properties;
+  }
+}
diff --git a/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/druid/SimpleDatasourceProperties.java b/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/druid/SimpleDatasourceProperties.java
new file mode 100644
index 0000000..a9f445a
--- /dev/null
+++ b/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/druid/SimpleDatasourceProperties.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.druid.druid;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+/**
+ * Jackson-bound value object wrapping the {@code segments} property of a datasource.
+ *
+ * <p>Unknown JSON properties are ignored on read; {@code null} values are omitted on write.
+ */
+@JsonInclude(JsonInclude.Include.NON_NULL)
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class SimpleDatasourceProperties {
+  private final SimpleSegmentInfo segments;
+
+  /**
+   * @param segments value of the {@code segments} property
+   */
+  public SimpleDatasourceProperties(@JsonProperty("segments") SimpleSegmentInfo segments) {
+    this.segments = segments;
+  }
+
+  /** @return the segment information */
+  public SimpleSegmentInfo getSegments() {
+    return segments;
+  }
+}
diff --git a/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/druid/SimpleSegmentInfo.java b/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/druid/SimpleSegmentInfo.java
new file mode 100644
index 0000000..884960c
--- /dev/null
+++ b/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/druid/SimpleSegmentInfo.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.druid.druid;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+/**
+ * Jackson-bound value object carrying the {@code minTime}, {@code maxTime}, {@code size} and
+ * {@code count} properties of a datasource's segments.
+ *
+ * <p>Unknown JSON properties are ignored on read; {@code null} values are omitted on write.
+ */
+@JsonInclude(JsonInclude.Include.NON_NULL)
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class SimpleSegmentInfo {
+  private final String maxTime;
+  private final long size;
+  private final String minTime;
+  private final int count;
+
+  /**
+   * @param maxTime value of the {@code maxTime} property
+   * @param size    value of the {@code size} property
+   * @param minTime value of the {@code minTime} property
+   * @param count   value of the {@code count} property
+   */
+  public SimpleSegmentInfo(@JsonProperty("maxTime") String maxTime,
+                           @JsonProperty("size") long size,
+                           @JsonProperty("minTime") String minTime,
+                           @JsonProperty("count") int count) {
+    this.maxTime = maxTime;
+    this.size = size;
+    this.minTime = minTime;
+    this.count = count;
+  }
+
+  /** @return the {@code minTime} value */
+  public String getMinTime() {
+    return minTime;
+  }
+
+  /** @return the {@code maxTime} value */
+  public String getMaxTime() {
+    return maxTime;
+  }
+
+  /** @return the {@code size} value */
+  public long getSize() {
+    return size;
+  }
+
+  /** @return the {@code count} value */
+  public int getCount() {
+    return count;
+  }
+}
diff --git a/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/rest/DruidAdminClient.java b/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/rest/DruidAdminClient.java
new file mode 100755
index 0000000..bbfd336
--- /dev/null
+++ b/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/rest/DruidAdminClient.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.druid.rest;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.store.druid.druid.SimpleDatasourceInfo;
+import org.apache.http.HttpResponse;
+import org.apache.http.HttpStatus;
+import org.apache.http.util.EntityUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+
+public class DruidAdminClient {
+  private static final Logger logger = LoggerFactory.getLogger(DruidAdminClient.class);
+
+  private static final String DATASOURCES_BASE_URI = "/druid/coordinator/v1/datasources?simple";
+  private static final String DEFAULT_ENCODING = "UTF-8";
+  private static final ObjectMapper mapper = new ObjectMapper();
+
+  private final String coordinatorAddress;
+  private final RestClient restClient;
+
+  public DruidAdminClient(String coordinatorAddress, RestClient restClient) {
+    this.coordinatorAddress = coordinatorAddress;
+    this.restClient = restClient;
+  }
+
+  public List<SimpleDatasourceInfo> getDataSources() throws IOException {
+    String url = this.coordinatorAddress + DATASOURCES_BASE_URI;
+    HttpResponse response = restClient.get(url);
+
+    if (response.getStatusLine().getStatusCode() != HttpStatus.SC_OK) {
+      throw UserException
+        .dataReadError()
+        .message("Error getting druid datasources. HTTP request failed")
+        .addContext("Response code", response.getStatusLine().getStatusCode())
+        .addContext("Response message", response.getStatusLine().getReasonPhrase())
+        .build(logger);
+    }
+
+    String responseJson = EntityUtils.toString(response.getEntity(), DEFAULT_ENCODING);
+    return mapper.readValue(responseJson, new TypeReference<List<SimpleDatasourceInfo>>(){});
+  }
+}
diff --git a/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/rest/DruidQueryClient.java b/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/rest/DruidQueryClient.java
new file mode 100755
index 0000000..714c79c
--- /dev/null
+++ b/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/rest/DruidQueryClient.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.druid.rest;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.store.druid.druid.DruidSelectResponse;
+import org.apache.drill.exec.store.druid.druid.PagingIdentifier;
+import org.apache.http.HttpResponse;
+import org.apache.http.HttpStatus;
+import org.apache.http.util.EntityUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.Map;
+
+public class DruidQueryClient {
+
+  private static final Logger logger = LoggerFactory.getLogger(DruidQueryClient.class);
+
+  private static final String QUERY_BASE_URI = "/druid/v2";
+  private static final ObjectMapper mapper = new ObjectMapper();
+
+  private final RestClient restClient;
+  private final String queryUrl;
+
+  public DruidQueryClient(String brokerURI, RestClient restClient) {
+    queryUrl = brokerURI + QUERY_BASE_URI;
+    this.restClient = restClient;
+    logger.debug("Initialized DruidQueryClient with druidURL - {}", this.queryUrl);
+  }
+
+  public DruidSelectResponse executeQuery(String query) throws Exception {
+    logger.debug("Executing Query - {}", query);
+    HttpResponse response = restClient.post(queryUrl, query);
+
+    if (response.getStatusLine().getStatusCode() != HttpStatus.SC_OK) {
+      throw UserException
+          .dataReadError()
+          .message("Error executing druid query. HTTP request failed")
+          .addContext("Response code", response.getStatusLine().getStatusCode())
+          .addContext("Response message", response.getStatusLine().getReasonPhrase())
+          .build(logger);
+    }
+
+    String data = EntityUtils.toString(response.getEntity());
+    ArrayNode responses = mapper.readValue(data, ArrayNode.class);
+    return parseResponse(responses);
+  }
+
+  private DruidSelectResponse parseResponse(ArrayNode responses) {
+    ArrayList<ObjectNode> events = new ArrayList<>();
+    ObjectNode pagingIdentifiersNode = null;
+
+    if (responses.size() > 0) {
+      ObjectNode firstNode = (ObjectNode) responses.get(0);
+      ObjectNode resultNode = (ObjectNode) firstNode.get("result");
+      pagingIdentifiersNode = (ObjectNode) resultNode.get("pagingIdentifiers");
+      ArrayNode eventsNode = (ArrayNode) resultNode.get("events");
+      for(int i=0;i < eventsNode.size(); i++) {
+        ObjectNode eventNode = (ObjectNode) eventsNode.get(i).get("event");
+        events.add(eventNode);
+      }
+    }
+
+    ArrayList<PagingIdentifier> pagingIdentifierList = new ArrayList<>();
+    if (pagingIdentifiersNode != null) {
+      for (Iterator<Map.Entry<String, JsonNode>> iterator = pagingIdentifiersNode.fields(); iterator.hasNext();) {
+        Map.Entry<String, JsonNode> currentNode = iterator.next();
+        if (currentNode != null) {
+          String segmentName = currentNode.getKey();
+          int segmentOffset = currentNode.getValue().asInt();
+          PagingIdentifier pagingIdentifier = new PagingIdentifier(segmentName, segmentOffset);
+          pagingIdentifierList.add(pagingIdentifier);
+        }
+      }
+    }
+
+    DruidSelectResponse druidSelectResponse = new DruidSelectResponse();
+    if (CollectionUtils.isNotEmpty(pagingIdentifierList)) {
+      druidSelectResponse.setPagingIdentifiers(pagingIdentifierList);
+    }
+
+    druidSelectResponse.setEvents(events);
+    return druidSelectResponse;
+  }
+}
diff --git a/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/rest/RestClient.java b/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/rest/RestClient.java
new file mode 100644
index 0000000..d88a41f
--- /dev/null
+++ b/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/rest/RestClient.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.druid.rest;
+
+import org.apache.http.HttpResponse;
+
+import java.io.IOException;
+
+/**
+ * Minimal HTTP abstraction used by the Druid admin and query clients.
+ */
+public interface RestClient {
+
+  /**
+   * Issues a GET request.
+   *
+   * @param url address to request
+   * @return the HTTP response
+   * @throws IOException if the request fails
+   */
+  HttpResponse get(String url) throws IOException;
+
+  /**
+   * Issues a POST request.
+   *
+   * @param url  address to request
+   * @param body request body
+   * @return the HTTP response
+   * @throws IOException if the request fails
+   */
+  HttpResponse post(String url, String body) throws IOException;
+}
diff --git a/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/rest/RestClientWrapper.java b/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/rest/RestClientWrapper.java
new file mode 100644
index 0000000..3dd47ee
--- /dev/null
+++ b/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/rest/RestClientWrapper.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.druid.rest;
+
+import org.apache.http.HttpEntity;
+import org.apache.http.HttpResponse;
+import org.apache.http.client.HttpClient;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.entity.ByteArrayEntity;
+import org.apache.http.impl.client.DefaultHttpClient;
+
+import javax.ws.rs.core.HttpHeaders;
+import java.io.IOException;
+
+import static javax.ws.rs.core.MediaType.APPLICATION_JSON;
+import static org.apache.http.protocol.HTTP.CONTENT_TYPE;
+
+/**
+ * {@link RestClient} implementation backed by Apache HttpClient.
+ *
+ * <p>All instances share one static {@link HttpClient}. Callers are expected to consume the
+ * entity of every returned response so that the underlying connection is released.
+ * NOTE(review): {@code DefaultHttpClient} is deprecated in HttpClient 4.3+ - consider a pooled
+ * client if concurrent requests are expected.
+ */
+public class RestClientWrapper implements RestClient {
+  private static final HttpClient httpClient = new DefaultHttpClient();
+  private static final String DEFAULT_ENCODING = "UTF-8";
+
+  /**
+   * Executes a GET request with a JSON content-type header.
+   *
+   * @param url address to request
+   * @return the HTTP response
+   * @throws IOException if the request fails
+   */
+  @Override
+  public HttpResponse get(String url) throws IOException {
+    HttpGet httpget = new HttpGet(url);
+    httpget.addHeader(HttpHeaders.CONTENT_TYPE, APPLICATION_JSON);
+    return httpClient.execute(httpget);
+  }
+
+  /**
+   * Executes a POST request whose body is the UTF-8 encoding of {@code body}, sent with a JSON
+   * content-type header.
+   *
+   * @param url  address to request
+   * @param body request body
+   * @return the HTTP response
+   * @throws IOException if the request fails
+   */
+  @Override
+  public HttpResponse post(String url, String body) throws IOException {
+    HttpPost httppost = new HttpPost(url);
+    httppost.addHeader(CONTENT_TYPE, APPLICATION_JSON);
+    HttpEntity entity = new ByteArrayEntity(body.getBytes(DEFAULT_ENCODING));
+    httppost.setEntity(entity);
+
+    return httpClient.execute(httppost);
+  }
+}
diff --git a/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/schema/DruidSchemaFactory.java b/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/schema/DruidSchemaFactory.java
new file mode 100755
index 0000000..7de77f4
--- /dev/null
+++ b/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/schema/DruidSchemaFactory.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.druid.schema;
+
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.schema.Table;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.planner.logical.DrillTable;
+import org.apache.drill.exec.planner.logical.DynamicDrillTable;
+import org.apache.drill.exec.store.AbstractSchema;
+import org.apache.drill.exec.store.AbstractSchemaFactory;
+import org.apache.drill.exec.store.SchemaConfig;
+import org.apache.drill.exec.store.druid.DruidScanSpec;
+import org.apache.drill.exec.store.druid.DruidStoragePlugin;
+import org.apache.drill.exec.store.druid.DruidStoragePluginConfig;
+import org.apache.drill.exec.store.druid.druid.SimpleDatasourceInfo;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
+import org.apache.drill.shaded.guava.com.google.common.collect.Maps;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Schema factory that exposes the datasources reported by the plugin's admin client as tables.
+ */
+public class DruidSchemaFactory extends AbstractSchemaFactory {
+
+  private static final Logger logger = LoggerFactory.getLogger(DruidSchemaFactory.class);
+  private final DruidStoragePlugin plugin;
+
+  /**
+   * @param plugin     storage plugin whose admin client supplies the datasource list
+   * @param schemaName name under which the schema is registered
+   */
+  public DruidSchemaFactory(DruidStoragePlugin plugin, String schemaName) {
+    super(schemaName);
+    this.plugin = plugin;
+  }
+
+  /**
+   * Creates a {@link DruidDataSources} schema and adds it to {@code parent} under this
+   * factory's name.
+   */
+  @Override
+  public void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent) {
+    DruidDataSources dataSources = new DruidDataSources(getName());
+    SchemaPlus holder = parent.add(getName(), dataSources);
+    dataSources.setHolder(holder);
+  }
+
+  /**
+   * Schema whose tables are the Druid datasources. The set of table names is fetched once, when
+   * the schema is constructed; tables themselves are created on first request and then reused.
+   */
+  public class DruidDataSources extends AbstractSchema {
+
+    private final Set<String> tableNames;
+    private final Map<String, DrillTable> drillTables = Maps.newHashMap();
+    private Map<String, SimpleDatasourceInfo> druidDatasourceInfos = Maps.newHashMap();
+
+    /**
+     * Creates the schema and immediately loads the datasource names via {@link #getTableNames()}.
+     *
+     * @param name schema name
+     */
+    public DruidDataSources(String name) {
+      super(ImmutableList.of(), name);
+      this.tableNames = this.getTableNames();
+    }
+
+    /** Intentionally does nothing. */
+    public void setHolder(SchemaPlus plusOfThis) {
+    }
+
+    /** @return always {@code null}; this schema has no sub-schemas */
+    @Override
+    public AbstractSchema getSubSchema(String name) {
+      return null;
+    }
+
+    /** @return always the empty set */
+    @Override
+    public Set<String> getSubSchemaNames() {
+      return Collections.emptySet();
+    }
+
+    /**
+     * Looks up the table for a datasource.
+     *
+     * @param tableName datasource name
+     * @return the table, or {@code null} when the name is unknown or building the table fails
+     *         (the failure is logged as a warning)
+     */
+    @Override
+    public Table getTable(String tableName) {
+      if (!tableNames.contains(tableName)) {
+        return null;
+      }
+
+      try {
+        DrillTable table = drillTables.get(tableName);
+        if (table == null) {
+          // First request for this datasource: build its scan spec from the segment metadata.
+          SimpleDatasourceInfo datasourceInfo = druidDatasourceInfos.get(tableName);
+          DruidScanSpec scanSpec =
+            new DruidScanSpec(
+              tableName,
+              datasourceInfo.getProperties().getSegments().getSize(),
+              datasourceInfo.getProperties().getSegments().getMinTime(),
+              datasourceInfo.getProperties().getSegments().getMaxTime()
+            );
+          table = new DynamicDrillTable(plugin, getName(), null, scanSpec);
+          drillTables.put(tableName, table);
+        }
+        return table;
+      } catch (Exception e) {
+        logger.warn("Failure while retrieving druid table {}", tableName, e);
+        return null;
+      }
+    }
+
+    /**
+     * Queries the admin client for the current datasources, replaces the cached datasource
+     * metadata with the result and returns the datasource names.
+     *
+     * @throws UserException if the datasources cannot be loaded
+     */
+    @Override
+    public Set<String> getTableNames() {
+      try {
+        List<SimpleDatasourceInfo> dataSources = plugin.getAdminClient().getDataSources();
+        Map<String, SimpleDatasourceInfo> infosByName =
+          dataSources.stream()
+            .collect(Collectors.toMap(SimpleDatasourceInfo::getName, x -> x));
+        this.druidDatasourceInfos = infosByName;
+        Set<String> dataSourceNames = infosByName.keySet();
+        logger.debug("Found Druid DataSources - {}", StringUtils.join(dataSourceNames, ","));
+        return dataSourceNames;
+      } catch (Exception e) {
+        throw UserException.dataReadError(e)
+            .message("Failure while loading druid datasources for database '%s'.", getName())
+            .build(logger);
+      }
+    }
+
+    /** @return the Druid storage plugin type name */
+    @Override
+    public String getTypeName() {
+      return DruidStoragePluginConfig.NAME;
+    }
+  }
+}
diff --git a/contrib/storage-druid/src/main/resources/bootstrap-storage-plugins.json b/contrib/storage-druid/src/main/resources/bootstrap-storage-plugins.json
new file mode 100755
index 0000000..3a07594
--- /dev/null
+++ b/contrib/storage-druid/src/main/resources/bootstrap-storage-plugins.json
@@ -0,0 +1,11 @@
+{
+  "storage":{
+    "druid" : {
+      "type" : "druid",
+      "brokerAddress" : "http://localhost:8082",
+      "coordinatorAddress": "http://localhost:8081",
+      "averageRowSizeBytes": 100,
+      "enabled" : false
+    }
+  }
+}
diff --git a/contrib/storage-druid/src/main/resources/drill-module.conf b/contrib/storage-druid/src/main/resources/drill-module.conf
new file mode 100755
index 0000000..4a75df4
--- /dev/null
+++ b/contrib/storage-druid/src/main/resources/drill-module.conf
@@ -0,0 +1,24 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+#  This file tells Drill to consider this module when class path scanning.
+#  This file can also include any supplementary configuration information.
+#  This file is in HOCON format, see https://github.com/typesafehub/config/blob/master/HOCON.md for more information.
+drill.classpath.scanning: {
+  packages += "org.apache.drill.exec.store.druid"
+}
diff --git a/contrib/storage-druid/src/test/java/org/apache/drill/exec/store/druid/DruidFilterBuilderTest.java b/contrib/storage-druid/src/test/java/org/apache/drill/exec/store/druid/DruidFilterBuilderTest.java
new file mode 100644
index 0000000..9ca7c0e
--- /dev/null
+++ b/contrib/storage-druid/src/test/java/org/apache/drill/exec/store/druid/DruidFilterBuilderTest.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.druid;
+
+import org.apache.drill.common.FunctionNames;
+import org.apache.drill.common.expression.BooleanOperator;
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.exec.store.druid.common.DruidSelectorFilter;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Unit tests for {@link DruidFilterBuilder}.
+ *
+ * <p>The expression tree is mocked: every mocked {@link LogicalExpression} is stubbed so that
+ * {@code accept(...)} returns a pre-built {@link DruidScanSpec} holding a selector filter. The
+ * tests then compare the JSON of the combined filter against the expected "and"/"or" document.
+ */
+public class DruidFilterBuilderTest {
+
+  private static final String SOME_DATASOURCE_NAME = "some data source";
+  private static final long SOME_DATASOURCE_SIZE = 500;
+  private static final String SOME_DATASOURCE_MIN_TIME = "some min time";
+  private static final String SOME_DATASOURCE_MAX_TIME = "some max time";
+
+  // Instantiated by hand in setup(); this class does not itself initialise @Mock fields.
+  @Mock
+  private LogicalExpression logicalExpression;
+  private DruidFilterBuilder druidFilterBuilder;
+  private DruidScanSpec druidScanSpecLeft;
+  private DruidScanSpec druidScanSpecRight;
+
+  /**
+   * Builds the two scan specs used as operands and a filter builder whose group scan carries the
+   * "left" spec and whose expression yields the "right" spec.
+   *
+   * @throws Exception if stubbing the mocked expression fails; the failure is surfaced so a
+   *                   broken fixture fails the test instead of being silently ignored
+   */
+  @Before
+  public void setup() throws Exception {
+    logicalExpression = mock(LogicalExpression.class);
+    druidScanSpecLeft = scanSpecWith(new DruidSelectorFilter("some dimension", "some value"));
+    druidScanSpecRight = scanSpecWith(new DruidSelectorFilter("some other dimension", "some other value"));
+    when(logicalExpression.accept(any(), any())).thenReturn(druidScanSpecRight);
+
+    DruidGroupScan druidGroupScan = new DruidGroupScan("some username", null, druidScanSpecLeft, null, 5);
+    druidFilterBuilder = new DruidFilterBuilder(druidGroupScan, logicalExpression);
+  }
+
+  @Test
+  public void parseTreeWithAndOfTwoSelectorFilters() {
+    DruidScanSpec parsedSpec = druidFilterBuilder.parseTree();
+    String expectedFilterJson = "{\"type\":\"and\",\"fields\":[{\"type\":\"selector\",\"dimension\":\"some dimension\",\"value\":\"some value\"},{\"type\":\"selector\",\"dimension\":\"some other dimension\",\"value\":\"some other value\"}]}";
+    String actual = parsedSpec.getFilter().toJson();
+    assertThat(actual).isEqualTo(expectedFilterJson);
+  }
+
+  @Test
+  public void visitBooleanOperatorWithAndOperator() throws Exception {
+    BooleanOperator booleanOperator = booleanOperatorOverBothSpecs(FunctionNames.AND);
+    DruidScanSpec druidScanSpec =
+        druidFilterBuilder.visitBooleanOperator(booleanOperator, null);
+    String expectedFilterJson = "{\"type\":\"and\",\"fields\":[{\"type\":\"selector\",\"dimension\":\"some dimension\",\"value\":\"some value\"},{\"type\":\"selector\",\"dimension\":\"some other dimension\",\"value\":\"some other value\"}]}";
+    String actual = druidScanSpec.getFilter().toJson();
+    assertThat(actual).isEqualTo(expectedFilterJson);
+  }
+
+  @Test
+  public void visitBooleanOperatorWithOrOperator() throws Exception {
+    BooleanOperator booleanOperator = booleanOperatorOverBothSpecs(FunctionNames.OR);
+    DruidScanSpec druidScanSpec =
+        druidFilterBuilder.visitBooleanOperator(booleanOperator, null);
+    String expectedFilterJson = "{\"type\":\"or\",\"fields\":[{\"type\":\"selector\",\"dimension\":\"some dimension\",\"value\":\"some value\"},{\"type\":\"selector\",\"dimension\":\"some other dimension\",\"value\":\"some other value\"}]}";
+    String actual = druidScanSpec.getFilter().toJson();
+    assertThat(actual).isEqualTo(expectedFilterJson);
+  }
+
+  /** Creates a scan spec over the shared test datasource constants with the given filter. */
+  private static DruidScanSpec scanSpecWith(DruidSelectorFilter filter) {
+    return new DruidScanSpec(
+        SOME_DATASOURCE_NAME,
+        filter,
+        SOME_DATASOURCE_SIZE,
+        SOME_DATASOURCE_MIN_TIME,
+        SOME_DATASOURCE_MAX_TIME
+    );
+  }
+
+  /**
+   * Builds a boolean operator with two mocked operands: the shared expression, re-stubbed to
+   * yield the "left" spec, and a fresh mock yielding the "right" spec.
+   *
+   * @param functionName the boolean function name, e.g. {@link FunctionNames#AND}
+   * @return the operator wrapping both mocked operands, in left/right order
+   * @throws Exception if stubbing either mock fails
+   */
+  private BooleanOperator booleanOperatorOverBothSpecs(String functionName) throws Exception {
+    LogicalExpression logicalExpression2 = mock(LogicalExpression.class);
+    when(logicalExpression.accept(any(), any())).thenReturn(druidScanSpecLeft);
+    when(logicalExpression2.accept(any(), any())).thenReturn(druidScanSpecRight);
+    return new BooleanOperator(
+        functionName,
+        Stream.of(logicalExpression, logicalExpression2).collect(Collectors.toList()), null
+    );
+  }
+}
diff --git a/contrib/storage-druid/src/test/java/org/apache/drill/exec/store/druid/DruidScanSpecBuilderTest.java b/contrib/storage-druid/src/test/java/org/apache/drill/exec/store/druid/DruidScanSpecBuilderTest.java
new file mode 100644
index 0000000..6c8a4f0
--- /dev/null
+++ b/contrib/storage-druid/src/test/java/org/apache/drill/exec/store/druid/DruidScanSpecBuilderTest.java
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.druid;
+
+import org.apache.drill.common.FunctionNames;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.store.druid.common.DruidConstants;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Unit tests for {@link DruidScanSpecBuilder}: each test builds a scan spec for one comparison
+ * function and checks the JSON of the resulting filter.
+ */
+public class DruidScanSpecBuilderTest {
+
+  private static final String SOME_DATASOURCE_NAME = "some datasource";
+  private static final long SOME_DATASOURCE_SIZE = 500;
+  private static final String SOME_DATASOURCE_MIN_TIME = "some min time";
+  private static final String SOME_DATASOURCE_MAX_TIME = "some max time";
+  private static final String SOME_FIELD = "some field";
+  private static final String SOME_VALUE = "some value";
+
+  private DruidScanSpecBuilder druidScanSpecBuilder;
+
+  @Before
+  public void setup() {
+    druidScanSpecBuilder = new DruidScanSpecBuilder();
+  }
+
+  /**
+   * Runs the builder under test against the shared datasource constants.
+   *
+   * @param functionName comparison function name passed to the builder
+   * @param fieldName    simple column name turned into a {@link SchemaPath}
+   * @param fieldValue   literal value compared against; may be {@code null}
+   * @return whatever the builder returns for these arguments
+   */
+  private DruidScanSpec buildSpec(String functionName, String fieldName, String fieldValue) {
+    SchemaPath fieldPath = SchemaPath.getSimplePath(fieldName);
+    return druidScanSpecBuilder.build(
+      SOME_DATASOURCE_NAME,
+      SOME_DATASOURCE_SIZE,
+      SOME_DATASOURCE_MIN_TIME,
+      SOME_DATASOURCE_MAX_TIME,
+      functionName,
+      fieldPath,
+      fieldValue);
+  }
+
+  /** Shorthand for the JSON form of the filter carried by a spec. */
+  private static String filterJson(DruidScanSpec spec) {
+    return spec.getFilter().toJson();
+  }
+
+  @Test
+  public void buildCalledWithEqualFxShouldBuildSelectorFilter() {
+    DruidScanSpec spec = buildSpec(FunctionNames.EQ, SOME_FIELD, SOME_VALUE);
+
+    assertThat(filterJson(spec)).isEqualTo("{\"type\":\"selector\",\"dimension\":\"some field\",\"value\":\"some value\"}");
+  }
+
+  @Test
+  public void buildCalledWithEqualFxIntervalFieldShouldBuildIntervalFilter() {
+    DruidScanSpec spec = buildSpec(FunctionNames.EQ, DruidConstants.INTERVAL_DIMENSION_NAME, SOME_VALUE);
+
+    assertThat(filterJson(spec)).isEqualTo("{\"eventInterval\":\"some value\"}");
+  }
+
+  @Test
+  public void buildCalledWithNotEqualFxShouldBuildSelectorFilter() {
+    DruidScanSpec spec = buildSpec(FunctionNames.NE, SOME_FIELD, SOME_VALUE);
+
+    assertThat(filterJson(spec)).isEqualTo("{\"type\":\"not\",\"field\":{\"type\":\"selector\",\"dimension\":\"some field\",\"value\":\"some value\"}}");
+  }
+
+  @Test
+  public void buildCalledWithGreaterThanOrEqualToFxShouldBuildBoundFilter() {
+    DruidScanSpec spec = buildSpec(FunctionNames.GE, SOME_FIELD, SOME_VALUE);
+
+    assertThat(filterJson(spec)).isEqualTo("{\"type\":\"bound\",\"dimension\":\"some field\",\"lower\":\"some value\",\"ordering\":\"lexicographic\"}");
+  }
+
+  @Test
+  public void buildCalledWithGreaterThanFxShouldBuildBoundFilter() {
+    DruidScanSpec spec = buildSpec(FunctionNames.GT, SOME_FIELD, SOME_VALUE);
+
+    assertThat(filterJson(spec)).isEqualTo("{\"type\":\"bound\",\"dimension\":\"some field\",\"lower\":\"some value\",\"lowerStrict\":true,\"ordering\":\"lexicographic\"}");
+  }
+
+  @Test
+  public void buildCalledWithGreaterThanFxAndNumericValueShouldBuildBoundFilter() {
+    DruidScanSpec spec = buildSpec(FunctionNames.GT, SOME_FIELD, "1");
+
+    assertThat(filterJson(spec)).isEqualTo("{\"type\":\"bound\",\"dimension\":\"some field\",\"lower\":\"1\",\"lowerStrict\":true,\"ordering\":\"numeric\"}");
+  }
+
+  @Test
+  public void buildCalledWithLessThanOrEqualToFxShouldBuildBoundFilter() {
+    DruidScanSpec spec = buildSpec(FunctionNames.LE, SOME_FIELD, SOME_VALUE);
+
+    assertThat(filterJson(spec)).isEqualTo("{\"type\":\"bound\",\"dimension\":\"some field\",\"upper\":\"some value\",\"ordering\":\"lexicographic\"}");
+  }
+
+  @Test
+  public void buildCalledWithLessThanFxShouldBuildBoundFilter() {
+    DruidScanSpec spec = buildSpec(FunctionNames.LT, SOME_FIELD, SOME_VALUE);
+
+    assertThat(filterJson(spec)).isEqualTo("{\"type\":\"bound\",\"dimension\":\"some field\",\"upper\":\"some value\",\"upperStrict\":true,\"ordering\":\"lexicographic\"}");
+  }
+
+  @Test
+  public void buildCalledWithLessThanFxAndNumericValueShouldBuildBoundFilter() {
+    DruidScanSpec spec = buildSpec(FunctionNames.LT, SOME_FIELD, "1");
+
+    assertThat(filterJson(spec)).isEqualTo("{\"type\":\"bound\",\"dimension\":\"some field\",\"upper\":\"1\",\"upperStrict\":true,\"ordering\":\"numeric\"}");
+  }
+
+  @Test
+  public void buildCalledWithIsNullFxShouldBuildSelectorFilter() {
+    DruidScanSpec spec = buildSpec(FunctionNames.IS_NULL, SOME_FIELD, null);
+
+    assertThat(spec).isNotNull();
+    assertThat(filterJson(spec)).isEqualTo("{\"type\":\"selector\",\"dimension\":\"some field\",\"value\":null}");
+  }
+
+  @Test
+  public void buildCalledWithIsNotNullFxShouldBuildSelectorFilter() {
+    DruidScanSpec spec = buildSpec(FunctionNames.IS_NOT_NULL, SOME_FIELD, null);
+
+    assertThat(spec).isNotNull();
+    assertThat(filterJson(spec)).isEqualTo("{\"type\":\"not\",\"field\":{\"type\":\"selector\",\"dimension\":\"some field\",\"value\":null}}");
+  }
+
+  @Test
+  public void buildCalledWithLikeFxButIfValueIsPrefixedWithRegexKeywordHintShouldBuildRegexFilter() {
+    DruidScanSpec spec = buildSpec(FunctionNames.LIKE, SOME_FIELD, "$regex$_some_regular_expression");
+
+    assertThat(filterJson(spec)).isEqualTo("{\"type\":\"regex\",\"dimension\":\"some field\",\"pattern\":\"some_regular_expression\"}");
+  }
+
+  @Test
+  public void buildCalledWithLikeFxShouldBuildSearchFilter() {
+    DruidScanSpec spec = buildSpec(FunctionNames.LIKE, SOME_FIELD, "some search string");
+
+    assertThat(filterJson(spec)).isEqualTo("{\"type\":\"search\",\"dimension\":\"some field\",\"query\":{\"type\":\"contains\",\"value\":\"some search string\",\"caseSensitive\":false}}");
+  }
+}
diff --git a/contrib/storage-druid/src/test/java/org/apache/drill/exec/store/druid/DruidStoragePluginConfigTest.java b/contrib/storage-druid/src/test/java/org/apache/drill/exec/store/druid/DruidStoragePluginConfigTest.java
new file mode 100644
index 0000000..354b23c
--- /dev/null
+++ b/contrib/storage-druid/src/test/java/org/apache/drill/exec/store/druid/DruidStoragePluginConfigTest.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.druid;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.drill.shaded.guava.com.google.common.io.Resources;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URISyntaxException;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Checks that {@link DruidStoragePluginConfig} deserializes from plugin JSON via Jackson, both
+ * from the {@code bootstrap-storage-plugins.json} classpath resource and from an inline document
+ * that omits {@code averageRowSizeBytes}.
+ */
+public class DruidStoragePluginConfigTest {
+
+  @Test
+  public void testDruidStoragePluginConfigSuccessfullyParsed()
+      throws URISyntaxException, IOException {
+    ObjectMapper jsonMapper = new ObjectMapper();
+    File bootstrapFile = new File(Resources.getResource("bootstrap-storage-plugins.json").toURI());
+    JsonNode druidNode = jsonMapper.readTree(bootstrapFile).get("storage").get("druid");
+
+    DruidStoragePluginConfig config = jsonMapper.treeToValue(druidNode, DruidStoragePluginConfig.class);
+
+    assertThat(config).isNotNull();
+    assertThat(config.getBrokerAddress()).isEqualTo("http://localhost:8082");
+    assertThat(config.getCoordinatorAddress()).isEqualTo("http://localhost:8081");
+    assertThat(config.getAverageRowSizeBytes()).isEqualTo(200);
+    assertThat(config.isEnabled()).isFalse();
+  }
+
+  @Test
+  public void testDefaultRowSizeUsedWhenNotProvidedInConfig()
+      throws JsonProcessingException {
+    // Same shape as the bootstrap document, minus the averageRowSizeBytes entry.
+    String configWithoutRowSize =
+        "{\n"
+        + "  \"storage\":{\n"
+        + "    \"druid\" : {\n"
+        + "      \"type\" : \"druid\",\n"
+        + "      \"brokerAddress\" : \"http://localhost:8082\",\n"
+        + "      \"coordinatorAddress\": \"http://localhost:8081\",\n"
+        + "      \"enabled\" : false\n"
+        + "    }\n"
+        + "  }\n"
+        + "}\n";
+    ObjectMapper jsonMapper = new ObjectMapper();
+    JsonNode druidNode = jsonMapper.readTree(configWithoutRowSize).get("storage").get("druid");
+
+    DruidStoragePluginConfig config = jsonMapper.treeToValue(druidNode, DruidStoragePluginConfig.class);
+
+    assertThat(config.getAverageRowSizeBytes()).isEqualTo(100);
+  }
+}
diff --git a/contrib/storage-druid/src/test/java/org/apache/drill/exec/store/druid/DruidTestBase.java b/contrib/storage-druid/src/test/java/org/apache/drill/exec/store/druid/DruidTestBase.java
new file mode 100644
index 0000000..ef47e26
--- /dev/null
+++ b/contrib/storage-druid/src/test/java/org/apache/drill/exec/store/druid/DruidTestBase.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.druid;
+
+import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.drill.test.ClusterFixture;
+import org.apache.drill.test.ClusterTest;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Base class for Druid query tests. Before the test class runs it starts a Drill cluster, asks
+ * {@link DruidTestSuit} to initialise Druid, and registers the Druid storage plugin under
+ * {@link DruidStoragePluginConfig#NAME}; afterwards it removes that plugin again.
+ */
+public class DruidTestBase extends ClusterTest implements DruidTestConstants {
+  private static final Logger logger = LoggerFactory.getLogger(DruidTestBase.class);
+  private static StoragePluginRegistry pluginRegistry;
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    startCluster(ClusterFixture.builder(dirTestWatcher));
+    pluginRegistry = cluster.drillbit().getContext().getStorage();
+
+    // The plugin config is only available once the suite has initialised Druid.
+    DruidTestSuit.initDruid();
+    pluginRegistry.put(DruidStoragePluginConfig.NAME, DruidTestSuit.getDruidStoragePluginConfig());
+  }
+
+  @AfterClass
+  public static void tearDownDruidTestBase()
+      throws StoragePluginRegistry.PluginException {
+    if (pluginRegistry == null) {
+      logger.warn("Plugin Registry was null");
+      return;
+    }
+    pluginRegistry.remove(DruidStoragePluginConfig.NAME);
+  }
+}
diff --git a/contrib/storage-druid/src/test/java/org/apache/drill/exec/store/druid/DruidTestConstants.java b/contrib/storage-druid/src/test/java/org/apache/drill/exec/store/druid/DruidTestConstants.java
new file mode 100644
index 0000000..adafd4d
--- /dev/null
+++ b/contrib/storage-druid/src/test/java/org/apache/drill/exec/store/druid/DruidTestConstants.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.druid;
+
+/**
+ * Shared constants for the Druid storage plugin tests: the datasource name and SQL query
+ * templates. Each template contains a single {@code %s} placeholder for the datasource name.
+ */
+public interface DruidTestConstants {
+  /** Name of the datasource substituted into the query templates below. */
+  String TEST_DATASOURCE_WIKIPEDIA = "wikipedia";
+  /** Query with a single string-equality predicate on {@code user}. */
+  String TEST_STRING_EQUALS_FILTER_QUERY_TEMPLATE1 = "select * from druid.`%s` as ds where ds.user = 'Eribro'";
+  /** Query combining an equality predicate on {@code user} and a LIKE predicate on {@code comment} with AND. */
+  String TEST_STRING_TWO_AND_EQUALS_FILTER_QUERY_TEMPLATE1 = "select * from druid.`%s` as ds where ds.user = 'Eribro' AND ds.comment like 'Musik'";
+  /** Query combining equality predicates on {@code user} and {@code page} with OR. */
+  String TEST_STRING_TWO_OR_EQUALS_FILTER_QUERY_TEMPLATE1 = "select * from druid.`%s` as ds where ds.user = 'Eribro' OR ds.page = 'Rallicula'";
+  /** Query selecting only the {@code comment} column. */
+  String TEST_QUERY_PROJECT_PUSH_DOWN_TEMPLATE_1 = "SELECT ds.`comment` FROM druid.`%s` as ds";
+  /** Query counting all rows of the datasource. */
+  String TEST_QUERY_COUNT_QUERY_TEMPLATE = "SELECT count(*) as count FROM druid.`%s` as ds";
+}
diff --git a/contrib/storage-druid/src/test/java/org/apache/drill/exec/store/druid/DruidTestSuit.java b/contrib/storage-druid/src/test/java/org/apache/drill/exec/store/druid/DruidTestSuit.java
new file mode 100644
index 0000000..5e1c149
--- /dev/null
+++ b/contrib/storage-druid/src/test/java/org/apache/drill/exec/store/druid/DruidTestSuit.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.druid;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.drill.categories.DruidStorageTest;
+import org.apache.drill.categories.SlowTest;
+import org.apache.drill.exec.store.druid.rest.DruidQueryClientTest;
+import org.apache.drill.shaded.guava.com.google.common.io.Resources;
+import org.junit.BeforeClass;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Suite;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+
+/**
+ * JUnit suite grouping the Druid storage plugin tests. Before the suite runs it loads the Druid
+ * plugin configuration from {@code bootstrap-storage-plugins.json}, enables it, and starts the
+ * test data import.
+ *
+ * <p>Each test class is listed exactly once so that it runs once per suite execution.
+ */
+@RunWith(Suite.class)
+@Suite.SuiteClasses({
+    TestDruidQueries.class,
+    DruidScanSpecBuilderTest.class,
+    DruidStoragePluginConfigTest.class,
+    DruidQueryClientTest.class,
+    DruidFilterBuilderTest.class
+})
+@Category({SlowTest.class, DruidStorageTest.class})
+public class DruidTestSuit {
+  private static final Logger logger = LoggerFactory.getLogger(DruidTestSuit.class);
+
+  private static final ObjectMapper mapper = new ObjectMapper();
+
+  // Stays null if initDruid() has not run or failed before the config was parsed.
+  private static DruidStoragePluginConfig druidStoragePluginConfig = null;
+
+  /**
+   * Parses the Druid plugin configuration from the classpath, enables it and triggers the test
+   * data import. Any failure is logged and not rethrown.
+   */
+  @BeforeClass
+  public static void initDruid() {
+    try {
+      JsonNode storagePluginJson = mapper.readTree(new File(Resources.getResource("bootstrap-storage-plugins.json").toURI()));
+      druidStoragePluginConfig = mapper.treeToValue(storagePluginJson.get("storage").get("druid"), DruidStoragePluginConfig.class);
+      druidStoragePluginConfig.setEnabled(true);
+      TestDataGenerator.startImport(druidStoragePluginConfig);
+    } catch (Exception e) {
+      logger.error("Error importing data into DRUID", e);
+    }
+  }
+
+  /**
+   * @return the configuration parsed by {@link #initDruid()}, or {@code null} if none was parsed
+   */
+  public static DruidStoragePluginConfig getDruidStoragePluginConfig() {
+    return druidStoragePluginConfig;
+  }
+}
diff --git a/contrib/storage-druid/src/test/java/org/apache/drill/exec/store/druid/TestDataGenerator.java b/contrib/storage-druid/src/test/java/org/apache/drill/exec/store/druid/TestDataGenerator.java
new file mode 100644
index 0000000..902a796
--- /dev/null
+++ b/contrib/storage-druid/src/test/java/org/apache/drill/exec/store/druid/TestDataGenerator.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.druid;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.drill.shaded.guava.com.google.common.io.Resources;
+import org.apache.http.HttpEntity;
+import org.apache.http.HttpResponse;
+import org.apache.http.HttpStatus;
+import org.apache.http.StatusLine;
+import org.apache.http.client.HttpClient;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.entity.ByteArrayEntity;
+import org.apache.http.impl.client.DefaultHttpClient;
+import org.apache.http.util.EntityUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.ws.rs.core.HttpHeaders;
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.concurrent.TimeUnit;
+
+import static javax.ws.rs.core.MediaType.APPLICATION_JSON;
+import static org.apache.http.protocol.HTTP.CONTENT_TYPE;
+
+/**
+ * Test helper that loads sample data into Druid: it submits the indexing task described by the
+ * {@code wikipedia-index.json} classpath resource over HTTP and then checks the status the task
+ * reports.
+ */
+public class TestDataGenerator {
+  private static final Logger logger = LoggerFactory.getLogger(TestDataGenerator.class);
+
+  private static final HttpClient httpClient = new DefaultHttpClient();
+
+  private static final ObjectMapper mapper = new ObjectMapper();
+
+  private static final String DEFAULT_ENCODING = "UTF-8";
+
+  private static final String RESPONSE_SUCCESS = "SUCCESS";
+
+  /**
+   * Starts the test data import if the health check succeeds; otherwise only logs an error.
+   *
+   * @param druidStoragePluginConfig plugin configuration supplying the coordinator address
+   * @throws Exception if the indexing task cannot be started or does not report success
+   */
+  public static void startImport(DruidStoragePluginConfig druidStoragePluginConfig) throws Exception {
+    if (isDruidRunning(druidStoragePluginConfig)) {
+      logger.debug("Starting Test Data Import");
+      String taskId = startImportTask(druidStoragePluginConfig);
+      waitForIndexingTaskToFinish(taskId, druidStoragePluginConfig);
+      logger.debug("Finished Test Data Import");
+    }
+    else {
+      logger.error("DRUID does not seem to be running...");
+    }
+  }
+
+  /**
+   * Issues a GET to {@code <coordinatorAddress>/status/health}.
+   *
+   * @return {@code true} only when the response is HTTP 200 with a body equal to "true"
+   *         (ignoring case); {@code false} otherwise, including when the request fails
+   */
+  private static boolean isDruidRunning(DruidStoragePluginConfig druidStoragePluginConfig) {
+    try {
+      String healthCheckUrl = druidStoragePluginConfig.getCoordinatorAddress() + "/status/health";
+      HttpGet httpGet = new HttpGet(healthCheckUrl);
+      HttpResponse response = httpClient.execute(httpGet);
+      StatusLine statusLine = response.getStatusLine();
+      String status = EntityUtils.toString(response.getEntity(), DEFAULT_ENCODING);
+      return statusLine.getStatusCode() == HttpStatus.SC_OK && status.equalsIgnoreCase("true");
+    } catch (Exception ex) {
+      logger.error("Error getting druid status", ex);
+      return false;
+    }
+  }
+
+  /** Returns the task endpoint URL built from the configured coordinator address. */
+  private static String taskUrl(DruidStoragePluginConfig druidStoragePluginConfig) {
+    return druidStoragePluginConfig.getCoordinatorAddress() + "/druid/indexer/v1/task";
+  }
+
+  /**
+   * Posts the task definition from {@code wikipedia-index.json} to the task endpoint.
+   *
+   * @return the task id parsed from the response
+   * @throws URISyntaxException if the resource URL cannot be converted to a URI
+   * @throws IOException if reading the resource, the HTTP call, or response parsing fails
+   */
+  private static String startImportTask(DruidStoragePluginConfig druidStoragePluginConfig) throws URISyntaxException, IOException {
+    try {
+      String url = taskUrl(druidStoragePluginConfig);
+      byte[] taskConfig = Files.readAllBytes(Paths.get(Resources.getResource("wikipedia-index.json").toURI()));
+
+      HttpPost httpPost = new HttpPost(url);
+      httpPost.addHeader(CONTENT_TYPE, APPLICATION_JSON);
+      HttpEntity entity = new ByteArrayEntity(taskConfig);
+      httpPost.setEntity(entity);
+
+      HttpResponse response = httpClient.execute(httpPost);
+      // Decode with the same default charset as the other responses read in this class.
+      String data = EntityUtils.toString(response.getEntity(), DEFAULT_ENCODING);
+      TaskStartResponse taskStartResponse = mapper.readValue(data, TaskStartResponse.class);
+      logger.debug("Started Indexing Task - {}", taskStartResponse.getTaskId());
+      return taskStartResponse.getTaskId();
+    } catch (Exception ex) {
+      // Log the cause as well; the exception is still rethrown to the caller.
+      logger.error("Error starting Indexing Task", ex);
+      throw ex;
+    }
+  }
+
+  /**
+   * Sleeps for a fixed interval, then queries the task status once.
+   *
+   * @param taskId id of the indexing task to check
+   * @throws Exception if the reported status is missing or is not "SUCCESS" (ignoring case),
+   *                   or if the status request or its parsing fails
+   */
+  private static void waitForIndexingTaskToFinish(String taskId, DruidStoragePluginConfig druidStoragePluginConfig) throws Exception {
+    int sleepMinutes = 1;
+    logger.info("Waiting {} minute(s) for Indexing Task - {} to finish", sleepMinutes, taskId);
+    Thread.sleep(TimeUnit.MINUTES.toMillis(sleepMinutes));
+
+    String url = taskUrl(druidStoragePluginConfig) + "/" + taskId + "/status";
+    HttpGet httpget = new HttpGet(url);
+    httpget.addHeader(HttpHeaders.CONTENT_TYPE, APPLICATION_JSON);
+
+    HttpResponse response = httpClient.execute(httpget);
+    String responseJson = EntityUtils.toString(response.getEntity(), DEFAULT_ENCODING);
+    TaskStatusResponse taskStatusResponse = mapper.readValue(responseJson, TaskStatusResponse.class);
+    // A response without a status object (or status string) is reported as a failed task
+    // with a descriptive message instead of surfacing as a NullPointerException.
+    TaskStatus taskStatus = taskStatusResponse.taskStatus;
+    String status = taskStatus == null ? null : taskStatus.status;
+    if (!RESPONSE_SUCCESS.equalsIgnoreCase(status)) {
+      throw new Exception(String.format("Task %s finished with status %s", taskId, status));
+    }
+
+    logger.debug("Task {} finished successfully", taskId);
+  }
+
+  /** Response body of the task submission call; only the {@code task} property is read. */
+  private static class TaskStartResponse {
+    @JsonProperty("task")
+    private String taskId;
+
+    @JsonCreator
+    public TaskStartResponse(@JsonProperty("task") String taskId) {
+      this.taskId = taskId;
+    }
+
+    public String getTaskId() {
+      return taskId;
+    }
+  }
+
+  /** The {@code status} object of a task status response; unknown properties are ignored. */
+  @JsonIgnoreProperties(ignoreUnknown = true)
+  private static class TaskStatus {
+    @JsonProperty
+    String id;
+
+    @JsonProperty
+    String statusCode;
+
+    @JsonProperty
+    String status;
+
+    @JsonProperty
+    int duration;
+
+    @JsonProperty
+    String dataSource;
+
+    @JsonCreator
+    public TaskStatus(@JsonProperty("id") String id, @JsonProperty("statusCode") String statusCode, @JsonProperty("status") String status, @JsonProperty("duration") int duration, @JsonProperty("dataSource") String dataSource) {
+      this.id = id;
+      this.statusCode = statusCode;
+      this.status = status;
+      this.duration = duration;
+      this.dataSource = dataSource;
+    }
+  }
+
+  /** Response body of the task status call: the task id plus its {@link TaskStatus}. */
+  private static class TaskStatusResponse {
+    @JsonProperty("task")
+    String taskId;
+
+    @JsonProperty("status")
+    TaskStatus taskStatus;
+
+    // Marked explicitly as the creator, matching the other response classes in this file.
+    @JsonCreator
+    public TaskStatusResponse(@JsonProperty("task") String taskId, @JsonProperty("status") TaskStatus taskStatus) {
+      this.taskId = taskId;
+      this.taskStatus = taskStatus;
+    }
+  }
+}
diff --git a/contrib/storage-druid/src/test/java/org/apache/drill/exec/store/druid/TestDruidQueries.java b/contrib/storage-druid/src/test/java/org/apache/drill/exec/store/druid/TestDruidQueries.java
new file mode 100644
index 0000000..32f426c
--- /dev/null
+++ b/contrib/storage-druid/src/test/java/org/apache/drill/exec/store/druid/TestDruidQueries.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.druid;
+
+import org.apache.drill.categories.DruidStorageTest;
+import org.apache.drill.categories.SlowTest;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Query tests built on {@code DruidTestBase}. The whole class is skipped by default;
+ * see the {@code @Ignore} reason for how to enable it.
+ */
+@Ignore("These tests require a running druid instance. You may start druid by using the docker-compose provide in resources/druid and enable these tests")
+@Category({SlowTest.class, DruidStorageTest.class})
+public class TestDruidQueries extends DruidTestBase {
+
+  /** Runs the string-equals filter template and expects two records. */
+  @Test
+  public void testEqualsFilter() throws Exception {
+    final String sql = String.format(TEST_STRING_EQUALS_FILTER_QUERY_TEMPLATE1, TEST_DATASOURCE_WIKIPEDIA);
+
+    testBuilder()
+        .sqlQuery(sql)
+        .unOrdered()
+        .expectsNumRecords(2)
+        .go();
+  }
+
+  /** Runs the two-AND-equals filter template and expects one record. */
+  @Test
+  public void testTwoANDdEqualsFilter() throws Exception {
+    final String sql = String.format(TEST_STRING_TWO_AND_EQUALS_FILTER_QUERY_TEMPLATE1, TEST_DATASOURCE_WIKIPEDIA);
+
+    testBuilder()
+        .sqlQuery(sql)
+        .unOrdered()
+        .expectsNumRecords(1)
+        .go();
+  }
+
+  /** Runs the two-OR-equals filter template and expects three records. */
+  @Test
+  public void testTwoOrdEqualsFilter() throws Exception {
+    final String sql = String.format(TEST_STRING_TWO_OR_EQUALS_FILTER_QUERY_TEMPLATE1, TEST_DATASOURCE_WIKIPEDIA);
+
+    testBuilder()
+        .sqlQuery(sql)
+        .unOrdered()
+        .expectsNumRecords(3)
+        .go();
+  }
+
+  /** Runs the project push-down template and expects 39244 records with a {@code comment} column. */
+  @Test
+  public void testSingleColumnProject() throws Exception {
+    final String sql = String.format(TEST_QUERY_PROJECT_PUSH_DOWN_TEMPLATE_1, TEST_DATASOURCE_WIKIPEDIA);
+
+    testBuilder()
+        .sqlQuery(sql)
+        .unOrdered()
+        .baselineColumns("comment")
+        .expectsNumRecords(39244)
+        .go();
+  }
+
+  /** Runs the count template and expects a single {@code count} value of 39244. */
+  @Test
+  public void testCountAllRowsQuery() throws Exception {
+    final String sql = String.format(TEST_QUERY_COUNT_QUERY_TEMPLATE, TEST_DATASOURCE_WIKIPEDIA);
+
+    testBuilder()
+        .sqlQuery(sql)
+        .unOrdered()
+        .baselineColumns("count")
+        .baselineValues(39244L)
+        .go();
+  }
+}
diff --git a/contrib/storage-druid/src/test/java/org/apache/drill/exec/store/druid/rest/DruidQueryClientTest.java b/contrib/storage-druid/src/test/java/org/apache/drill/exec/store/druid/rest/DruidQueryClientTest.java
new file mode 100644
index 0000000..2a55df0
--- /dev/null
+++ b/contrib/storage-druid/src/test/java/org/apache/drill/exec/store/druid/rest/DruidQueryClientTest.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.druid.rest;
+
+import org.apache.drill.exec.store.druid.druid.DruidSelectResponse;
+import org.apache.http.HttpStatus;
+import org.apache.http.HttpResponse;
+import org.apache.http.StatusLine;
+import org.apache.http.HttpEntity;
+import org.apache.http.Header;
+import org.apache.http.HttpHeaders;
+import org.apache.http.message.BasicHeader;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Unit tests for {@code DruidQueryClient.executeQuery} that run against a mocked
+ * {@code RestClient}; the HTTP response, status line and entity are all Mockito mocks.
+ */
+public class DruidQueryClientTest {
+
+  // The four mocks below are assigned explicitly in setup() through Mockito.mock(...);
+  // this class does not depend on annotation processing to populate them.
+  @Mock
+  private RestClient restClient;
+
+  @Mock
+  private HttpResponse httpResponse;
+
+  @Mock
+  private StatusLine statusLine;
+
+  @Mock
+  private HttpEntity httpEntity;
+
+  private DruidQueryClient druidQueryClient;
+  private static final String BROKER_URI = "some broker uri";
+  private static final String QUERY = "some query";
+  private static final Header ENCODING_HEADER =
+      new BasicHeader(HttpHeaders.CONTENT_ENCODING, StandardCharsets.UTF_8.name());
+
+  /**
+   * Builds fresh mocks for every test and stubs a 200 response that is returned when
+   * {@code QUERY} is posted to {@code BROKER_URI + "/druid/v2"}. Individual tests supply
+   * the entity content or override the status code.
+   */
+  @Before
+  public void setup() throws IOException {
+    restClient = mock(RestClient.class);
+    httpResponse = mock(HttpResponse.class);
+    statusLine = mock(StatusLine.class);
+    httpEntity = mock(HttpEntity.class);
+
+    when(statusLine.getStatusCode()).thenReturn(HttpStatus.SC_OK);
+    when(httpEntity.getContentEncoding()).thenReturn(ENCODING_HEADER);
+    when(httpResponse.getStatusLine()).thenReturn(statusLine);
+    when(httpResponse.getEntity()).thenReturn(httpEntity);
+    when(restClient.post(BROKER_URI + "/druid/v2", QUERY))
+        .thenReturn(httpResponse);
+
+    druidQueryClient = new DruidQueryClient(BROKER_URI, restClient);
+  }
+
+  /** A 500 status from the mocked response must make {@code executeQuery} throw. */
+  @Test(expected=Exception.class)
+  public void executeQueryCalledDruidReturnsNon200ShouldThrowError()
+      throws Exception {
+    when(statusLine.getStatusCode()).thenReturn(HttpStatus.SC_INTERNAL_SERVER_ERROR);
+    druidQueryClient.executeQuery(QUERY);
+  }
+
+  /** An empty JSON array body must yield no events and no paging identifiers. */
+  @Test
+  public void executeQueryCalledNoResponsesFoundReturnsEmptyEventList()
+      throws Exception {
+    // Charset overload: no lookup by name and no checked UnsupportedEncodingException.
+    InputStream inputStream =
+        new ByteArrayInputStream("[]".getBytes(StandardCharsets.UTF_8));
+    when(httpEntity.getContent()).thenReturn(inputStream);
+
+    DruidSelectResponse response = druidQueryClient.executeQuery(QUERY);
+    assertThat(response.getEvents()).isEmpty();
+    assertThat(response.getPagingIdentifiers()).isEmpty();
+  }
+
+  /** A body with two events must be parsed into two events, in order, with their property values. */
+  @Test
+  public void executeQueryCalledSuccessfullyParseQueryResults()
+      throws Exception {
+    String result = "[{\"result\":{\"pagingIdentifiers\":{\"some_segment_identifier\":500,\"some_other_segment_identifier\":501},\"events\":[{\"event\":{\"some_property\":\"some value\"}},{\"event\":{\"some_property\":\"some other value\"}}]}}]";
+    InputStream inputStream =
+        new ByteArrayInputStream(result.getBytes(StandardCharsets.UTF_8));
+    when(httpEntity.getContent()).thenReturn(inputStream);
+
+    DruidSelectResponse response = druidQueryClient.executeQuery(QUERY);
+    // hasSize(2) also covers the non-empty check and reports the actual contents on failure.
+    assertThat(response.getEvents()).hasSize(2);
+    assertThat(response.getEvents().get(0).get("some_property").textValue()).isEqualTo("some value");
+    assertThat(response.getEvents().get(1).get("some_property").textValue()).isEqualTo("some other value");
+  }
+}
diff --git a/contrib/storage-druid/src/test/resources/bootstrap-storage-plugins.json b/contrib/storage-druid/src/test/resources/bootstrap-storage-plugins.json
new file mode 100644
index 0000000..7b1f46d
--- /dev/null
+++ b/contrib/storage-druid/src/test/resources/bootstrap-storage-plugins.json
@@ -0,0 +1,11 @@
+{
+  "storage":{
+    "druid" : {
+      "type" : "druid",
+      "brokerAddress" : "http://localhost:8082",
+      "coordinatorAddress": "http://localhost:8081",
+      "averageRowSizeBytes": 200,
+      "enabled" : false
+    }
+  }
+}
diff --git a/contrib/storage-druid/src/test/resources/druid/docker-compose.yaml b/contrib/storage-druid/src/test/resources/druid/docker-compose.yaml
new file mode 100644
index 0000000..ab81225
--- /dev/null
+++ b/contrib/storage-druid/src/test/resources/druid/docker-compose.yaml
@@ -0,0 +1,136 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+version: "2.2"
+
+volumes:
+  metadata_data: {}
+  druid_var:
+
+services:
+  postgres:
+    container_name: postgres
+    image: postgres:latest
+    volumes:
+      - metadata_data:/var/lib/postgresql/data
+    environment:
+      - POSTGRES_PASSWORD=FoolishPassword
+      - POSTGRES_USER=druid
+      - POSTGRES_DB=druid
+
+  # Need 3.5 or later for container nodes
+  zookeeper:
+    container_name: zookeeper
+    image: zookeeper:3.5
+    environment:
+      - ZOO_MY_ID=1
+
+  coordinator:
+    image: apache/incubator-druid:0.16.1-incubating
+    container_name: coordinator
+    volumes:
+      - druid_var:/opt/druid/var
+    depends_on:
+      - zookeeper
+      - postgres
+    ports:
+      - "8081:8081"
+    command:
+      - coordinator
+    env_file:
+      - environment
+
+  broker:
+    image: apache/incubator-druid:0.16.1-incubating
+    container_name: broker
+    volumes:
+      - druid_var:/opt/druid/var
+    depends_on:
+      - zookeeper
+      - postgres
+      - coordinator
+    ports:
+      - "8082:8082"
+    command:
+      - broker
+    env_file:
+      - environment
+
+  historical:
+    image: apache/incubator-druid:0.16.1-incubating
+    container_name: historical
+    volumes:
+      - druid_var:/opt/druid/var
+    depends_on:
+      - zookeeper
+      - postgres
+      - coordinator
+    ports:
+      - "8083:8083"
+    command:
+      - historical
+    env_file:
+      - environment
+
+  overlord:
+    image: apache/incubator-druid:0.16.1-incubating
+    container_name: overlord
+    volumes:
+      - druid_var:/opt/druid/var
+    depends_on:
+      - zookeeper
+      - postgres
+    ports:
+      - "8090:8090"
+    command:
+      - overlord
+    env_file:
+      - environment
+
+  middlemanager:
+    image: apache/incubator-druid:0.16.1-incubating
+    container_name: middlemanager
+    volumes:
+      - druid_var:/opt/druid/var
+    depends_on:
+      - zookeeper
+      - postgres
+      - coordinator
+    ports:
+      - "8091:8091"
+    command:
+      - middleManager
+    env_file:
+      - environment
+
+  router:
+    image: apache/incubator-druid:0.16.1-incubating
+    container_name: router
+    volumes:
+      - druid_var:/opt/druid/var
+    depends_on:
+      - zookeeper
+      - postgres
+      - coordinator
+    ports:
+      - "8888:8888"
+    command:
+      - router
+    env_file:
+      - environment
diff --git a/contrib/storage-druid/src/test/resources/druid/environment b/contrib/storage-druid/src/test/resources/druid/environment
new file mode 100644
index 0000000..259bcf5
--- /dev/null
+++ b/contrib/storage-druid/src/test/resources/druid/environment
@@ -0,0 +1,52 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# Java tuning
+DRUID_XMX=1g
+DRUID_XMS=1g
+DRUID_MAXNEWSIZE=250m
+DRUID_NEWSIZE=250m
+DRUID_MAXDIRECTMEMORYSIZE=3172m
+
+druid_emitter_logging_logLevel=debug
+
+druid_extensions_loadList=["druid-histogram", "druid-datasketches", "druid-lookups-cached-global", "postgresql-metadata-storage"]
+
+druid_zk_service_host=zookeeper
+
+druid_metadata_storage_host=
+druid_metadata_storage_type=postgresql
+druid_metadata_storage_connector_connectURI=jdbc:postgresql://postgres:5432/druid
+druid_metadata_storage_connector_user=druid
+druid_metadata_storage_connector_password=FoolishPassword
+
+druid_coordinator_balancer_strategy=cachingCost
+
+druid_processing_buffer_sizeBytes=10000000
+druid_processing_numMergeBuffers=2
+druid_processing_numThreads=1
+
+druid_coordinator_asOverlord_enabled=false
+
+druid_indexer_runner_javaOptsArray=["-server", "-Xmx1g", "-Xms1g", "-XX:MaxDirectMemorySize=3g", "-Duser.timezone=UTC", "-Dfile.encoding=UTF-8", "-Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager"]
+druid_indexer_fork_property_druid_processing_buffer_sizeBytes=268435456
+
+druid_storage_type=local
+
+DRUID_LOG4J=<?xml version="1.0" encoding="UTF-8" ?><Configuration status="WARN"><Appenders><Console name="Console" target="SYSTEM_OUT"><PatternLayout pattern="%d{ISO8601} %p [%t] %c - %m%n"/></Console></Appenders><Loggers><Root level="info"><AppenderRef ref="Console"/></Root><Logger name="org.apache.druid.jetty.RequestLog" additivity="false" level="DEBUG"><AppenderRef ref="Console"/></Logger></Loggers></Configuration>
\ No newline at end of file
diff --git a/contrib/storage-druid/src/test/resources/wikipedia-index.json b/contrib/storage-druid/src/test/resources/wikipedia-index.json
new file mode 100644
index 0000000..c6d902f
--- /dev/null
+++ b/contrib/storage-druid/src/test/resources/wikipedia-index.json
@@ -0,0 +1,63 @@
+{
+  "type" : "index",
+  "spec" : {
+    "dataSchema" : {
+      "dataSource" : "wikipedia",
+      "parser" : {
+        "type" : "string",
+        "parseSpec" : {
+          "format" : "json",
+          "dimensionsSpec" : {
+            "dimensions" : [
+              "channel",
+              "cityName",
+              "comment",
+              "countryIsoCode",
+              "countryName",
+              "isAnonymous",
+              "isMinor",
+              "isNew",
+              "isRobot",
+              "isUnpatrolled",
+              "metroCode",
+              "namespace",
+              "page",
+              "regionIsoCode",
+              "regionName",
+              "user",
+              { "name": "added", "type": "long" },
+              { "name": "deleted", "type": "long" },
+              { "name": "delta", "type": "long" }
+            ]
+          },
+          "timestampSpec": {
+            "column": "time",
+            "format": "iso"
+          }
+        }
+      },
+      "metricsSpec" : [],
+      "granularitySpec" : {
+        "type" : "uniform",
+        "segmentGranularity" : "day",
+        "queryGranularity" : "none",
+        "intervals" : ["2015-09-12/2015-09-13"],
+        "rollup" : false
+      }
+    },
+    "ioConfig" : {
+      "type" : "index",
+      "firehose" : {
+        "type" : "local",
+        "baseDir" : "quickstart/tutorial/",
+        "filter" : "wikiticker-2015-09-12-sampled.json.gz"
+      },
+      "appendToExisting" : false
+    },
+    "tuningConfig" : {
+      "type" : "index",
+      "maxRowsPerSegment" : 5000000,
+      "maxRowsInMemory" : 25000
+    }
+  }
+}
\ No newline at end of file
diff --git a/distribution/pom.xml b/distribution/pom.xml
index 8b8369a..3f5eeb0 100644
--- a/distribution/pom.xml
+++ b/distribution/pom.xml
@@ -54,12 +54,12 @@
       <groupId>org.apache.drill.memory</groupId>
       <artifactId>drill-memory-base</artifactId>
       <version>${project.version}</version>
-    </dependency>    
+    </dependency>
     <dependency>
       <groupId>org.apache.drill.exec</groupId>
       <artifactId>drill-rpc</artifactId>
       <version>${project.version}</version>
-    </dependency>    
+    </dependency>
     <dependency>
       <groupId>org.apache.drill.exec</groupId>
       <artifactId>drill-java-exec</artifactId>
@@ -160,10 +160,10 @@
       <groupId>org.apache.hbase</groupId>
       <artifactId>hbase-client</artifactId>
       <exclusions>
-       <exclusion>
-           <groupId>io.netty</groupId>
-           <artifactId>netty</artifactId>
-       </exclusion>
+        <exclusion>
+          <groupId>io.netty</groupId>
+          <artifactId>netty</artifactId>
+        </exclusion>
       </exclusions>
     </dependency>
     <dependency>
@@ -327,7 +327,7 @@
           <artifactId>drill-storage-hive-core</artifactId>
           <version>${project.version}</version>
         </dependency>
-	      <dependency>
+        <dependency>
           <groupId>org.apache.drill.contrib</groupId>
           <artifactId>drill-storage-kafka</artifactId>
           <version>${project.version}</version>
@@ -378,6 +378,11 @@
           <artifactId>drill-format-excel</artifactId>
           <version>${project.version}</version>
         </dependency>
+        <dependency>
+          <groupId>org.apache.drill.contrib</groupId>
+          <artifactId>drill-druid-storage</artifactId>
+          <version>${project.version}</version>
+        </dependency>
       </dependencies>
     </profile>
 
@@ -578,7 +583,7 @@
                   </sources>
                 </mapping>
                 <mapping>
-		<directory>/opt/drill/sample-data</directory>
+                  <directory>/opt/drill/sample-data</directory>
                   <sources>
                     <source>
                       <location>target/apache-drill-${project.version}/apache-drill-${project.version}/sample-data</location>
@@ -749,4 +754,4 @@
     </profile>
   </profiles>
 
-</project>
+</project>
\ No newline at end of file
diff --git a/distribution/src/assemble/component.xml b/distribution/src/assemble/component.xml
index 01e371e..b9a2fce 100644
--- a/distribution/src/assemble/component.xml
+++ b/distribution/src/assemble/component.xml
@@ -54,6 +54,7 @@
         <include>org.apache.drill.contrib:drill-storage-http:jar</include>
         <include>org.apache.drill.contrib:drill-opentsdb-storage:jar</include>
         <include>org.apache.drill.contrib:drill-udfs:jar</include>
+        <include>org.apache.drill.contrib:drill-druid-storage:jar</include>
       </includes>
       <outputDirectory>jars</outputDirectory>
       <useProjectArtifact>false</useProjectArtifact>
diff --git a/pom.xml b/pom.xml
index 5329d58..8d716a9 100644
--- a/pom.xml
+++ b/pom.xml
@@ -386,6 +386,7 @@
             <exclude>**/*.csvh-test</exclude>
             <exclude>**/*.tsv</exclude>
             <exclude>**/*.txt</exclude>
+            <exclude>**/*.yaml</exclude>
             <exclude>**/*.ssv</exclude>
             <exclude>**/.buildpath</exclude>
             <exclude>**/target/**</exclude>
@@ -685,6 +686,7 @@
               <exclude>**/*.md</exclude>
               <exclude>**/*.eps</exclude>
               <exclude>**/*.json</exclude>
+              <exclude>**/*.yaml</exclude>
               <exclude>**/*.seq</exclude>
               <exclude>**/*.parquet</exclude>
               <exclude>**/*.avro</exclude>
diff --git a/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java b/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java
index 1510a46..01a51f0 100644
--- a/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java
+++ b/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java
@@ -686,6 +686,10 @@ public final class UserBitShared {
      */
     METADATA_CONTROLLER(67),
     /**
+     * <code>DRUID_SUB_SCAN = 68;</code>
+     */
+    DRUID_SUB_SCAN(68),
+    /**
      * <code>SPSS_SUB_SCAN = 69;</code>
      */
     SPSS_SUB_SCAN(69),
@@ -968,6 +972,10 @@ public final class UserBitShared {
      */
     public static final int METADATA_CONTROLLER_VALUE = 67;
     /**
+     * <code>DRUID_SUB_SCAN = 68;</code>
+     */
+    public static final int DRUID_SUB_SCAN_VALUE = 68;
+    /**
      * <code>SPSS_SUB_SCAN = 69;</code>
      */
     public static final int SPSS_SUB_SCAN_VALUE = 69;
@@ -1065,6 +1073,7 @@ public final class UserBitShared {
         case 65: return SHP_SUB_SCAN;
         case 66: return METADATA_HANDLER;
         case 67: return METADATA_CONTROLLER;
+        case 68: return DRUID_SUB_SCAN;
         case 69: return SPSS_SUB_SCAN;
         case 70: return HTTP_SUB_SCAN;
         default: return null;
@@ -29046,7 +29055,7 @@ public final class UserBitShared {
       "ATEMENT\020\005*\207\001\n\rFragmentState\022\013\n\007SENDING\020\000" +
       "\022\027\n\023AWAITING_ALLOCATION\020\001\022\013\n\007RUNNING\020\002\022\014" +
       "\n\010FINISHED\020\003\022\r\n\tCANCELLED\020\004\022\n\n\006FAILED\020\005\022" +
-      "\032\n\026CANCELLATION_REQUESTED\020\006*\212\013\n\020CoreOper" +
+      "\032\n\026CANCELLATION_REQUESTED\020\006*\236\013\n\020CoreOper" +
       "atorType\022\021\n\rSINGLE_SENDER\020\000\022\024\n\020BROADCAST" +
       "_SENDER\020\001\022\n\n\006FILTER\020\002\022\022\n\016HASH_AGGREGATE\020" +
       "\003\022\r\n\tHASH_JOIN\020\004\022\016\n\nMERGE_JOIN\020\005\022\031\n\025HASH" +
@@ -29081,12 +29090,12 @@ public final class UserBitShared {
       "MERGE\020=\022\021\n\rLTSV_SUB_SCAN\020>\022\021\n\rHDF5_SUB_S" +
       "CAN\020?\022\022\n\016EXCEL_SUB_SCAN\020@\022\020\n\014SHP_SUB_SCA" +
       "N\020A\022\024\n\020METADATA_HANDLER\020B\022\027\n\023METADATA_CO" +
-      "NTROLLER\020C\022\021\n\rSPSS_SUB_SCAN\020E\022\021\n\rHTTP_SU" +
-      "B_SCAN\020F*g\n\nSaslStatus\022\020\n\014SASL_UNKNOWN\020\000" +
-      "\022\016\n\nSASL_START\020\001\022\024\n\020SASL_IN_PROGRESS\020\002\022\020" +
-      "\n\014SASL_SUCCESS\020\003\022\017\n\013SASL_FAILED\020\004B.\n\033org" +
-      ".apache.drill.exec.protoB\rUserBitSharedH" +
-      "\001"
+      "NTROLLER\020C\022\022\n\016DRUID_SUB_SCAN\020D\022\021\n\rSPSS_S" +
+      "UB_SCAN\020E\022\021\n\rHTTP_SUB_SCAN\020F*g\n\nSaslStat" +
+      "us\022\020\n\014SASL_UNKNOWN\020\000\022\016\n\nSASL_START\020\001\022\024\n\020" +
+      "SASL_IN_PROGRESS\020\002\022\020\n\014SASL_SUCCESS\020\003\022\017\n\013" +
+      "SASL_FAILED\020\004B.\n\033org.apache.drill.exec.p" +
+      "rotoB\rUserBitSharedH\001"
     };
     descriptor = com.google.protobuf.Descriptors.FileDescriptor
       .internalBuildGeneratedFileFrom(descriptorData,
diff --git a/protocol/src/main/protobuf/UserBitShared.proto b/protocol/src/main/protobuf/UserBitShared.proto
index 3b99255..f7b7b02 100644
--- a/protocol/src/main/protobuf/UserBitShared.proto
+++ b/protocol/src/main/protobuf/UserBitShared.proto
@@ -379,6 +379,7 @@ enum CoreOperatorType {
   SHP_SUB_SCAN = 65;
   METADATA_HANDLER = 66;
   METADATA_CONTROLLER = 67;
+  DRUID_SUB_SCAN = 68;
   SPSS_SUB_SCAN = 69;
   HTTP_SUB_SCAN = 70;
 }