You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@drill.apache.org by GitBox <gi...@apache.org> on 2020/05/11 00:33:17 UTC

[GitHub] [drill] cgivre commented on a change in pull request #1888: DRILL-5956: Add Storage Plugin for Apache Druid

cgivre commented on a change in pull request #1888:
URL: https://github.com/apache/drill/pull/1888#discussion_r363591692



##########
File path: contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidQueryClient.java
##########
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.druid;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.drill.exec.store.druid.druid.DruidSelectResponse;
+import org.apache.drill.exec.store.druid.druid.PagingIdentifier;
+import org.apache.http.HttpEntity;
+import org.apache.http.HttpResponse;
+import org.apache.http.client.HttpClient;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.entity.ByteArrayEntity;
+import org.apache.http.impl.client.DefaultHttpClient;
+import org.apache.http.util.EntityUtils;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.Map;
+
+import static org.apache.http.protocol.HTTP.CONTENT_TYPE;
+import static javax.ws.rs.core.MediaType.APPLICATION_JSON;
+
+public class DruidQueryClient {
+
+    private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DruidQueryClient.class);

Review comment:
       Same here.

##########
File path: contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidRecordReader.java
##########
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.druid;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.physical.impl.OutputMutator;
+import org.apache.drill.exec.store.AbstractRecordReader;
+import org.apache.drill.exec.store.druid.druid.DruidSelectResponse;
+import org.apache.drill.exec.store.druid.druid.PagingIdentifier;
+import org.apache.drill.exec.store.druid.druid.PagingSpec;
+import org.apache.drill.exec.store.druid.druid.SelectQuery;
+import org.apache.drill.exec.vector.complex.fn.JsonReader;
+import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
+import org.apache.drill.shaded.guava.com.google.common.collect.Sets;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
+
+public class DruidRecordReader extends AbstractRecordReader {
+
+    private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DruidRecordReader.class);
+    private DruidStoragePlugin plugin;
+    private final DruidSubScan.DruidSubScanSpec scanSpec;
+    private List<String> dimensions;
+    private String filters;
+    private ArrayList<PagingIdentifier> pagingIdentifiers = new ArrayList<>();
+
+    private JsonReader jsonReader;
+    private VectorContainerWriter writer;
+
+    private OutputMutator output;
+    private OperatorContext context;
+    private final FragmentContext fragmentContext;
+
+    private ObjectMapper objectMapper = new ObjectMapper();
+
+    /**
+     * Creates a reader for a single Druid sub-scan.
+     *
+     * @param subScanSpec      sub-scan specification; its filter is captured here and the
+     *                         spec itself is kept for later use by {@code next()}
+     * @param projectedColumns columns requested by the query, handed to {@code setColumns}
+     * @param context          fragment context, kept for use in {@code setup}
+     * @param plugin           storage plugin that later supplies the Druid query client
+     */
+    public DruidRecordReader(DruidSubScan.DruidSubScanSpec subScanSpec, List<SchemaPath> projectedColumns,
+                             FragmentContext context, DruidStoragePlugin plugin) {
+        // NOTE(review): dimensions must exist before setColumns() runs because transformColumns()
+        // appends to it; presumably setColumns() invokes transformColumns() -- confirm in AbstractRecordReader.
+        dimensions = new ArrayList<String>();
+        setColumns(projectedColumns);
+        this.plugin = plugin;
+        scanSpec = subScanSpec;
+        fragmentContext = context;
+        this.filters = subScanSpec.getFilter();
+    }
+
+    @Override
+    protected Collection<SchemaPath> transformColumns(Collection<SchemaPath> projectedColumns) {
+        Set<SchemaPath> transformed = Sets.newLinkedHashSet();
+        if (isStarQuery()) {
+            transformed.add(SchemaPath.STAR_COLUMN);
+        } else {
+            for (SchemaPath column : projectedColumns) {
+                String fieldName = column.getRootSegment().getPath();
+                transformed.add(column);
+                this.dimensions.add(fieldName);
+            }
+        }
+        return transformed;
+    }
+
+    /**
+     * Stores the operator context and output mutator, then creates the vector writer and the
+     * JSON reader used by {@code next()}.
+     *
+     * @param context operator context, stored on this reader
+     * @param output  output mutator, stored and wrapped in a {@link VectorContainerWriter}
+     * @throws ExecutionSetupException declared by the overridden method
+     */
+    @Override
+    public void setup(OperatorContext context, OutputMutator output) throws ExecutionSetupException {
+        this.output = output;
+        this.context = context;
+        this.writer = new VectorContainerWriter(output);
+
+        // The reader is restricted to the reader's columns and configured with skipOuterList(true).
+        final ImmutableList<SchemaPath> projected = ImmutableList.copyOf(getColumns());
+        this.jsonReader = new JsonReader.Builder(fragmentContext.getManagedBuffer())
+                .schemaPathColumns(projected)
+                .skipOuterList(true)
+                .build();
+
+        logger.debug(" Initialized JsonRecordReader. ");
+    }
+
+    /**
+     * Reads the next page of events from Druid into the output vectors.
+     *
+     * <p>Builds a select query for the scan's data source from the collected dimensions and
+     * filter, passes along the paging identifiers remembered from the previous call, executes
+     * the query, stores the updated paging identifiers, and writes every returned event through
+     * the JSON reader.
+     *
+     * @return the number of events written by this call
+     * @throws DrillRuntimeException if an {@link IOException} is raised while building or
+     *                               executing the query, or while writing an event
+     */
+    @Override
+    public int next() {
+
+        writer.allocate();
+        writer.reset();
+        SelectQuery selectQuery = new SelectQuery(scanSpec.dataSourceName);
+        selectQuery.setDimensions(this.dimensions);
+        selectQuery.setFilter(this.filters);
+
+        // One (segment name -> offset) entry per identifier remembered from the previous page.
+        // pagingIdentifiers is never null in this class, and an empty list simply adds nothing.
+        ObjectNode paging = objectMapper.createObjectNode();
+        for (PagingIdentifier pagingIdentifier : this.pagingIdentifiers) {
+            paging.put(pagingIdentifier.getSegmentName(), pagingIdentifier.getSegmentOffset());
+        }
+
+        PagingSpec pagingSpec = new PagingSpec(paging);
+        selectQuery.setPagingSpec(pagingSpec);
+
+        DruidQueryClient druidQueryClient = plugin.getDruidQueryClient();
+
+        try {
+            String query = selectQuery.toJson();
+            // Parameterized logging: the message is only assembled when debug is enabled.
+            logger.debug("Executing DRUID query - {}", query);
+            DruidSelectResponse druidSelectResponse = druidQueryClient.ExecuteQuery(query);
+            ArrayList<PagingIdentifier> newPagingIdentifiers = druidSelectResponse.getPagingIdentifiers();
+
+            // Segment names present in the response, held in a set so the lookups below
+            // do not rescan a list for every remembered identifier.
+            Set<String> newPagingIdentifierNames = Sets.newHashSet();
+            for (PagingIdentifier pagingIdentifier : newPagingIdentifiers) {
+                newPagingIdentifierNames.add(pagingIdentifier.getSegmentName());
+            }
+
+            // Identifiers known from earlier pages but absent from this response are carried
+            // forward with their offset advanced by one.
+            for (PagingIdentifier pagingIdentifier : this.pagingIdentifiers) {
+                if (!newPagingIdentifierNames.contains(pagingIdentifier.getSegmentName())) {
+                    newPagingIdentifiers.add(
+                            new PagingIdentifier(pagingIdentifier.getSegmentName(),
+                                    pagingIdentifier.getSegmentOffset() + 1)
+                    );
+                }
+            }
+
+            //update the paging identifiers
+            this.pagingIdentifiers = newPagingIdentifiers;
+
+            int docCount = 0;
+            for (ObjectNode eventNode : druidSelectResponse.getEvents()) {
+                writer.setPosition(docCount);
+                jsonReader.setSource(eventNode);
+                try {
+                    jsonReader.write(writer);
+                } catch (IOException e) {
+                    String msg = "Failure while reading document. - Parser was at record: " + eventNode.toString();
+                    logger.error(msg, e);
+                    throw new DrillRuntimeException(msg, e);
+                }
+                docCount++;
+            }
+
+            writer.setValueCount(docCount);
+            return docCount;
+        } catch (IOException e) {
+            String msg = "Failure while reading documents";
+            logger.error(msg, e);
+            throw new DrillRuntimeException(msg, e);
+        }
+    }
+
+    @Override
+    public void close() throws Exception {
+

Review comment:
       You should close out any readers here.

##########
File path: contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidGroupScan.java
##########
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.druid;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.physical.EndpointAffinity;
+import org.apache.drill.exec.physical.base.AbstractGroupScan;
+import org.apache.drill.exec.physical.base.GroupScan;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.ScanStats;
+import org.apache.drill.exec.proto.CoordinationProtos;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+
+import org.apache.drill.exec.store.schedule.AffinityCreator;
+import org.apache.drill.exec.store.schedule.AssignmentCreator;
+import org.apache.drill.exec.store.schedule.CompleteWork;
+import org.apache.drill.exec.store.schedule.EndpointByteMap;
+import org.apache.drill.exec.store.schedule.EndpointByteMapImpl;
+import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
+import org.apache.drill.shaded.guava.com.google.common.collect.ListMultimap;
+import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+
+@JsonTypeName("druid-scan")
+public class DruidGroupScan extends AbstractGroupScan {
+
+    private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DruidGroupScan.class);

Review comment:
       Please import the `org.slf4j` class directly.

##########
File path: contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidRecordReader.java
##########
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.druid;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.physical.impl.OutputMutator;
+import org.apache.drill.exec.store.AbstractRecordReader;
+import org.apache.drill.exec.store.druid.druid.DruidSelectResponse;
+import org.apache.drill.exec.store.druid.druid.PagingIdentifier;
+import org.apache.drill.exec.store.druid.druid.PagingSpec;
+import org.apache.drill.exec.store.druid.druid.SelectQuery;
+import org.apache.drill.exec.vector.complex.fn.JsonReader;
+import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
+import org.apache.drill.shaded.guava.com.google.common.collect.Sets;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
+
+public class DruidRecordReader extends AbstractRecordReader {
+
+    private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DruidRecordReader.class);

Review comment:
       And here...

##########
File path: contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidScanBatchCreator.java
##########
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.druid;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.ops.ExecutorFragmentContext;
+import org.apache.drill.exec.physical.base.GroupScan;
+import org.apache.drill.exec.physical.impl.BatchCreator;
+import org.apache.drill.exec.physical.impl.ScanBatch;
+import org.apache.drill.exec.record.CloseableRecordBatch;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.store.RecordReader;
+import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
+import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
+
+import java.util.List;
+
+public class DruidScanBatchCreator implements BatchCreator<DruidSubScan> {
+
+    static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DruidScanBatchCreator.class);

Review comment:
       Please make private and import the `org.slf4j` class as well.  

##########
File path: contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidStoragePlugin.java
##########
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.druid;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.drill.common.JSONOptions;
+import org.apache.drill.exec.physical.base.AbstractGroupScan;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.store.AbstractStoragePlugin;
+import org.apache.drill.exec.store.SchemaConfig;
+import org.apache.drill.exec.store.druid.schema.DruidSchemaFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+public class DruidStoragePlugin extends AbstractStoragePlugin {
+
+    static final Logger logger = LoggerFactory.getLogger(DruidStoragePlugin.class);
+
+    private final DrillbitContext context;
+    private final DruidStoragePluginConfig pluginConfig;
+    private final DruidAdminClient druidAdminClient;
+    private final DruidQueryClient druidQueryClient;
+    private final DruidSchemaFactory schemaFactory;
+
+    /**
+     * Creates the plugin together with the Druid clients and schema factory it uses.
+     *
+     * @param pluginConfig plugin configuration that supplies the coordinator and broker URIs
+     * @param context      Drillbit context, stored here and also passed to the superclass
+     * @param name         name passed to the superclass and to the schema factory
+     */
+    public DruidStoragePlugin(DruidStoragePluginConfig pluginConfig, DrillbitContext context, String name) {
+        super(context, name);
+        this.pluginConfig = pluginConfig;
+        this.context = context;
+        // The admin client is built from the coordinator URI, the query client from the broker URI.
+        this.druidAdminClient = new DruidAdminClient(pluginConfig.GetCoordinatorURI());
+        this.druidQueryClient = new DruidQueryClient(pluginConfig.GetBrokerURI());
+        // The factory receives this plugin instance; it is created last, after the other fields are assigned.
+        this.schemaFactory = new DruidSchemaFactory(this, name);
+    }
+
+    /**
+     * Deserializes the scan selection into a {@code DruidScanSpec} and wraps it in a group scan.
+     *
+     * @param userName  name of the querying user; not used by this implementation
+     * @param selection JSON options describing what to scan
+     * @return a new {@code DruidGroupScan} for this plugin and the decoded scan spec
+     * @throws IOException if the selection cannot be read as a {@code DruidScanSpec}
+     */
+    @Override
+    public AbstractGroupScan getPhysicalScan(String userName, JSONOptions selection) throws IOException {
+        final TypeReference<DruidScanSpec> specType = new TypeReference<DruidScanSpec>() {};
+        final DruidScanSpec decodedSpec = selection.getListWith(new ObjectMapper(), specType);
+        return new DruidGroupScan(this, decodedSpec, null);
+    }
+
+    /**
+     * Registers this plugin's schemas by delegating to the schema factory.
+     *
+     * @param schemaConfig schema configuration, passed through unchanged
+     * @param parent       parent schema, passed through unchanged
+     * @throws IOException declared by the overridden method; propagated if the factory throws it
+     */
+    @Override
+    public void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent) throws IOException {
+        schemaFactory.registerSchemas(schemaConfig, parent);
+    }
+
+/*    @Override
+    public Set<StoragePluginOptimizerRule> getPhysicalOptimizerRules(OptimizerRulesContext optimizerRulesContext) {
+        return ImmutableSet.of(DruidPushDownFilterForScan.INSTANCE);
+    }*/

Review comment:
       Please remove the commented out code.

##########
File path: contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidStoragePluginConfig.java
##########
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.druid;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.drill.common.logical.StoragePluginConfigBase;
+
+@JsonTypeName(DruidStoragePluginConfig.NAME)
+public class DruidStoragePluginConfig extends StoragePluginConfigBase {
+
+    private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DruidStoragePluginConfig.class);
+
+    public static final String NAME = "druid";
+
+    @JsonProperty
+    private final String brokerAddress;
+
+    @JsonProperty
+    private final String coordinatorAddress;
+
+    /**
+     * Creates the plugin configuration from its two JSON properties and logs both addresses
+     * at info level.
+     *
+     * @param brokerAddress      value of the {@code brokerAddress} property
+     * @param coordinatorAddress value of the {@code coordinatorAddress} property
+     */
+    @JsonCreator
+    public DruidStoragePluginConfig(
+            @JsonProperty("brokerAddress") String brokerAddress,
+            @JsonProperty("coordinatorAddress") String coordinatorAddress) {
+        logger.info("Broker Address - {}, Coordinator Address - {}", brokerAddress, coordinatorAddress);
+
+        this.coordinatorAddress = coordinatorAddress;
+        this.brokerAddress = brokerAddress;
+        // TODO (carried over from the original): make this configurable.
+    }
+
+    /**
+     * Compares this configuration with another object.
+     *
+     * <p>Two configurations are equal when they are of the same class and have equal broker
+     * and coordinator addresses. Addresses are compared null-safely, so a configuration with a
+     * missing address no longer causes a {@link NullPointerException} here.
+     *
+     * @param that object to compare with, may be {@code null}
+     * @return {@code true} if {@code that} is an equal configuration, {@code false} otherwise
+     */
+    @Override
+    public boolean equals(Object that) {
+        if (this == that) {
+            return true;
+        }
+        if (that == null || getClass() != that.getClass()) {
+            return false;
+        }
+        DruidStoragePluginConfig thatConfig = (DruidStoragePluginConfig) that;
+        // hashCode() already tolerates null addresses, so equals() has to tolerate them as well.
+        return java.util.Objects.equals(this.brokerAddress, thatConfig.brokerAddress)
+                && java.util.Objects.equals(this.coordinatorAddress, thatConfig.coordinatorAddress);
+    }
+
+    @Override
+    public int hashCode() {
+        int brokerAddressHashCode = this.brokerAddress != null ? this.brokerAddress.hashCode() : 0;
+        int coordinatorAddressHashCode = this.coordinatorAddress != null ? this.coordinatorAddress.hashCode() : 0;
+        return brokerAddressHashCode ^ coordinatorAddressHashCode;

Review comment:
       There are easier ways of doing this. 
   Take a look here: 
   https://github.com/apache/drill/blob/6c9257b6c73e038d86c9d7b65fb7ac798d1dac4d/contrib/format-excel/src/main/java/org/apache/drill/exec/store/excel/ExcelFormatConfig.java#L86-#L107

##########
File path: contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidStoragePluginConfig.java
##########
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.druid;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.drill.common.logical.StoragePluginConfigBase;
+
+@JsonTypeName(DruidStoragePluginConfig.NAME)
+public class DruidStoragePluginConfig extends StoragePluginConfigBase {
+
+    private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DruidStoragePluginConfig.class);

Review comment:
       You know the routine about the logger ;-)

##########
File path: contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidAdminClient.java
##########
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.druid;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.http.HttpResponse;
+import org.apache.http.client.HttpClient;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.impl.client.DefaultHttpClient;
+import org.apache.http.util.EntityUtils;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+
+import static javax.ws.rs.core.HttpHeaders.CONTENT_TYPE;
+import static javax.ws.rs.core.MediaType.APPLICATION_JSON;
+
+public class DruidAdminClient {
+
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DruidAdminClient.class);

Review comment:
       Please fix `logger` import.  

##########
File path: contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidGroupScan.java
##########
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.druid;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.physical.EndpointAffinity;
+import org.apache.drill.exec.physical.base.AbstractGroupScan;
+import org.apache.drill.exec.physical.base.GroupScan;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.ScanStats;
+import org.apache.drill.exec.proto.CoordinationProtos;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+
+import org.apache.drill.exec.store.schedule.AffinityCreator;
+import org.apache.drill.exec.store.schedule.AssignmentCreator;
+import org.apache.drill.exec.store.schedule.CompleteWork;
+import org.apache.drill.exec.store.schedule.EndpointByteMap;
+import org.apache.drill.exec.store.schedule.EndpointByteMapImpl;
+import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
+import org.apache.drill.shaded.guava.com.google.common.collect.ListMultimap;
+import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+
+@JsonTypeName("druid-scan")
+public class DruidGroupScan extends AbstractGroupScan {
+
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DruidGroupScan.class);

Review comment:
       See comment re: `logger`

##########
File path: contrib/storage-druid/README.md
##########
@@ -0,0 +1,24 @@
+# Drill Apache Druid Plugin
+
+The Drill Druid storage plugin allows you to run SQL queries against Druid datasources.
+
+### Tested with Druid version
+[0.16.0-incubating](https://github.com/apache/incubator-druid/releases/tag/druid-0.16.0-incubating)
+
+### Supported Druid Native Query Types
+
+1. [Select](https://druid.apache.org/docs/latest/querying/select-query.html)

Review comment:
       I think this section is misleading.  If you're referring to the fact that only certain types of queries are pushed down to Druid, you should say that explicitly.  This makes it sound like you can't execute all kinds of queries in Drill. 

##########
File path: contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidSubScan.java
##########
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.druid;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.logical.StoragePluginConfig;
+import org.apache.drill.exec.physical.base.AbstractBase;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.PhysicalVisitor;
+import org.apache.drill.exec.physical.base.SubScan;
+import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
+
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+
+import static java.util.Collections.emptyIterator;
+
+/**
+ * A Class containing information to read a single druid data source.
+ */
+@JsonTypeName("druid-datasource-scan")
+public class DruidSubScan extends AbstractBase implements SubScan {
+
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DruidSubScan.class);
+
+  @JsonProperty
+  public final DruidStoragePluginConfig druidStoragePluginConfig;
+
+  @JsonIgnore
+  private final DruidStoragePlugin druidStoragePlugin;
+
+  private final List<DruidSubScanSpec> dataSourceScanSpecList;
+  private final List<SchemaPath> columns;
+
+  @JsonCreator
+  public DruidSubScan(@JacksonInject StoragePluginRegistry registry,
+                      @JsonProperty("userName") String userName,
+                      @JsonProperty("druidStoragePluginConfig") StoragePluginConfig druidStoragePluginConfig,
+                      @JsonProperty("datasourceScanSpecList") LinkedList<DruidSubScanSpec> datasourceScanSpecList,
+                      @JsonProperty("columns") List<SchemaPath> columns) throws ExecutionSetupException {
+    super(userName);
+    druidStoragePlugin = (DruidStoragePlugin) registry.getPlugin(druidStoragePluginConfig);
+    this.dataSourceScanSpecList = datasourceScanSpecList;
+    this.druidStoragePluginConfig = (DruidStoragePluginConfig) druidStoragePluginConfig;
+    this.columns = columns;
+  }
+
+  public DruidSubScan(String userName, DruidStoragePlugin plugin, DruidStoragePluginConfig config,
+                      List<DruidSubScanSpec> dataSourceInfoList, List<SchemaPath> columns) {
+    super(userName);
+    druidStoragePlugin = plugin;
+    druidStoragePluginConfig = config;
+    this.dataSourceScanSpecList = dataSourceInfoList;
+    this.columns = columns;
+  }
+
+  @Override
+  public <T, X, E extends Throwable> T accept(PhysicalVisitor<T, X, E> physicalVisitor, X value) throws E {
+    return physicalVisitor.visitSubScan(this, value);
+  }
+
+  public List<DruidSubScanSpec> getDataSourceScanSpecList() {
+    return dataSourceScanSpecList;
+  }
+
+  @JsonIgnore
+  public DruidStoragePluginConfig getStorageConfig() {
+    return druidStoragePluginConfig;
+  }
+
+  public List<SchemaPath> getColumns() {
+    return columns;
+  }
+
+  @Override
+  public boolean isExecutable() {
+    return false;
+  }
+
+  @JsonIgnore
+  public DruidStoragePlugin getStorageEngine(){
+    return druidStoragePlugin;
+  }
+
+  @Override
+  public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) throws ExecutionSetupException {

Review comment:
       Do we need the `throws` clause here?

##########
File path: contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidQueryClient.java
##########
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.druid;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.drill.exec.store.druid.druid.DruidSelectResponse;
+import org.apache.drill.exec.store.druid.druid.PagingIdentifier;
+import org.apache.http.HttpEntity;
+import org.apache.http.HttpResponse;
+import org.apache.http.client.HttpClient;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.entity.ByteArrayEntity;
+import org.apache.http.impl.client.DefaultHttpClient;
+import org.apache.http.util.EntityUtils;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.Map;
+
+import static org.apache.http.protocol.HTTP.CONTENT_TYPE;
+import static javax.ws.rs.core.MediaType.APPLICATION_JSON;
+
+public class DruidQueryClient {
+
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DruidQueryClient.class);

Review comment:
       Sorry... more logger... 
   Please add the import for `org.slf4j.Logger`

##########
File path: contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidSubScan.java
##########
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.druid;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.logical.StoragePluginConfig;
+import org.apache.drill.exec.physical.base.AbstractBase;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.PhysicalVisitor;
+import org.apache.drill.exec.physical.base.SubScan;
+import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
+
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+
+import static java.util.Collections.emptyIterator;
+
+/**
+ * A Class containing information to read a single druid data source.
+ */
+@JsonTypeName("druid-datasource-scan")
+public class DruidSubScan extends AbstractBase implements SubScan {
+
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DruidSubScan.class);
+
+  @JsonProperty
+  public final DruidStoragePluginConfig druidStoragePluginConfig;
+
+  @JsonIgnore
+  private final DruidStoragePlugin druidStoragePlugin;
+
+  private final List<DruidSubScanSpec> dataSourceScanSpecList;
+  private final List<SchemaPath> columns;
+
+  @JsonCreator
+  public DruidSubScan(@JacksonInject StoragePluginRegistry registry,
+                      @JsonProperty("userName") String userName,
+                      @JsonProperty("druidStoragePluginConfig") StoragePluginConfig druidStoragePluginConfig,
+                      @JsonProperty("datasourceScanSpecList") LinkedList<DruidSubScanSpec> datasourceScanSpecList,
+                      @JsonProperty("columns") List<SchemaPath> columns) throws ExecutionSetupException {
+    super(userName);
+    druidStoragePlugin = (DruidStoragePlugin) registry.getPlugin(druidStoragePluginConfig);
+    this.dataSourceScanSpecList = datasourceScanSpecList;
+    this.druidStoragePluginConfig = (DruidStoragePluginConfig) druidStoragePluginConfig;
+    this.columns = columns;
+  }
+
+  public DruidSubScan(String userName, DruidStoragePlugin plugin, DruidStoragePluginConfig config,
+                      List<DruidSubScanSpec> dataSourceInfoList, List<SchemaPath> columns) {
+    super(userName);
+    druidStoragePlugin = plugin;
+    druidStoragePluginConfig = config;
+    this.dataSourceScanSpecList = dataSourceInfoList;
+    this.columns = columns;
+  }
+
+  @Override
+  public <T, X, E extends Throwable> T accept(PhysicalVisitor<T, X, E> physicalVisitor, X value) throws E {
+    return physicalVisitor.visitSubScan(this, value);
+  }
+
+  public List<DruidSubScanSpec> getDataSourceScanSpecList() {
+    return dataSourceScanSpecList;
+  }
+
+  @JsonIgnore
+  public DruidStoragePluginConfig getStorageConfig() {
+    return druidStoragePluginConfig;
+  }
+
+  public List<SchemaPath> getColumns() {
+    return columns;
+  }
+
+  @Override
+  public boolean isExecutable() {
+    return false;
+  }
+
+  @JsonIgnore
+  public DruidStoragePlugin getStorageEngine(){
+    return druidStoragePlugin;
+  }
+
+  @Override
+  public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) throws ExecutionSetupException {
+    Preconditions.checkArgument(children.isEmpty());
+    return new DruidSubScan(getUserName(), druidStoragePlugin, druidStoragePluginConfig, dataSourceScanSpecList, columns);
+  }
+
+  @Override
+  public int getOperatorType() {
+    return CoreOperatorType.DRUID_SUB_SCAN_VALUE;
+  }
+
+  @Override
+  public Iterator<PhysicalOperator> iterator() {
+    return emptyIterator();
+  }
+
+  public static class DruidSubScanSpec {
+
+    protected String dataSourceName;
+    protected String filter;
+
+    @JsonCreator
+    public DruidSubScanSpec(@JsonProperty("dataSourceName") String dataSourceName,
+                            @JsonProperty("filters") String filters) {
+      this.dataSourceName = dataSourceName;
+      this.filter = filters;
+    }
+
+    DruidSubScanSpec() {

Review comment:
       Please remove this if it is unneeded.

##########
File path: contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidStoragePluginConfig.java
##########
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.druid;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.drill.common.logical.StoragePluginConfigBase;
+
+@JsonTypeName(DruidStoragePluginConfig.NAME)
+public class DruidStoragePluginConfig extends StoragePluginConfigBase {
+
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DruidStoragePluginConfig.class);
+
+  public static final String NAME = "druid";
+
+  @JsonProperty
+  private final String brokerAddress;
+
+  @JsonProperty
+  private final String coordinatorAddress;
+
+  @JsonCreator
+  public DruidStoragePluginConfig(
+    @JsonProperty("brokerAddress") String brokerAddress,

Review comment:
       One thing I see missing here is any kind of credential or authentication.  I don't really know much about Druid, but I assume it has various authentication methods.  Are we planning on supporting them?  If not in the initial commit, let's put some TODOs and JIRAs in place so that we know where and how to implement that.

##########
File path: contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidStoragePluginConfig.java
##########
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.druid;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.drill.common.logical.StoragePluginConfigBase;
+
+@JsonTypeName(DruidStoragePluginConfig.NAME)
+public class DruidStoragePluginConfig extends StoragePluginConfigBase {
+
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DruidStoragePluginConfig.class);
+
+  public static final String NAME = "druid";
+
+  @JsonProperty
+  private final String brokerAddress;
+
+  @JsonProperty
+  private final String coordinatorAddress;
+
+  @JsonCreator
+  public DruidStoragePluginConfig(
+    @JsonProperty("brokerAddress") String brokerAddress,
+    @JsonProperty("coordinatorAddress") String coordinatorAddress) {
+
+    this.brokerAddress = brokerAddress;
+    this.coordinatorAddress = coordinatorAddress;
+    logger.info("Broker Address - {}, Coordinator Address - {}", brokerAddress, coordinatorAddress);
+    //TODO Make this configurable.
+  }
+
+  @Override
+  public boolean equals(Object that) {
+    if (this == that) {
+      return true;
+    } else if (that == null || getClass() != that.getClass()) {
+      return false;
+    }
+    DruidStoragePluginConfig thatConfig = (DruidStoragePluginConfig) that;

Review comment:
       Please consider using `Objects.equals()` for this.  See the Excel Format plugin for an example.

##########
File path: contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidSubScan.java
##########
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.druid;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.logical.StoragePluginConfig;
+import org.apache.drill.exec.physical.base.AbstractBase;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.PhysicalVisitor;
+import org.apache.drill.exec.physical.base.SubScan;
+import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
+
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+
+import static java.util.Collections.emptyIterator;
+
+/**
+ * A Class containing information to read a single druid data source.
+ */
+@JsonTypeName("druid-datasource-scan")
+public class DruidSubScan extends AbstractBase implements SubScan {
+
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DruidSubScan.class);
+
+  @JsonProperty
+  public final DruidStoragePluginConfig druidStoragePluginConfig;
+
+  @JsonIgnore
+  private final DruidStoragePlugin druidStoragePlugin;
+
+  private final List<DruidSubScanSpec> dataSourceScanSpecList;
+  private final List<SchemaPath> columns;
+
+  @JsonCreator
+  public DruidSubScan(@JacksonInject StoragePluginRegistry registry,
+                      @JsonProperty("userName") String userName,
+                      @JsonProperty("druidStoragePluginConfig") StoragePluginConfig druidStoragePluginConfig,
+                      @JsonProperty("datasourceScanSpecList") LinkedList<DruidSubScanSpec> datasourceScanSpecList,
+                      @JsonProperty("columns") List<SchemaPath> columns) throws ExecutionSetupException {
+    super(userName);
+    druidStoragePlugin = (DruidStoragePlugin) registry.getPlugin(druidStoragePluginConfig);
+    this.dataSourceScanSpecList = datasourceScanSpecList;
+    this.druidStoragePluginConfig = (DruidStoragePluginConfig) druidStoragePluginConfig;
+    this.columns = columns;
+  }
+
+  public DruidSubScan(String userName, DruidStoragePlugin plugin, DruidStoragePluginConfig config,
+                      List<DruidSubScanSpec> dataSourceInfoList, List<SchemaPath> columns) {
+    super(userName);
+    druidStoragePlugin = plugin;
+    druidStoragePluginConfig = config;
+    this.dataSourceScanSpecList = dataSourceInfoList;
+    this.columns = columns;
+  }
+
+  @Override
+  public <T, X, E extends Throwable> T accept(PhysicalVisitor<T, X, E> physicalVisitor, X value) throws E {
+    return physicalVisitor.visitSubScan(this, value);
+  }
+
+  public List<DruidSubScanSpec> getDataSourceScanSpecList() {
+    return dataSourceScanSpecList;
+  }
+
+  @JsonIgnore
+  public DruidStoragePluginConfig getStorageConfig() {
+    return druidStoragePluginConfig;
+  }
+
+  public List<SchemaPath> getColumns() {
+    return columns;
+  }
+
+  @Override
+  public boolean isExecutable() {
+    return false;
+  }
+
+  @JsonIgnore
+  public DruidStoragePlugin getStorageEngine(){
+    return druidStoragePlugin;
+  }
+
+  @Override
+  public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) throws ExecutionSetupException {
+    Preconditions.checkArgument(children.isEmpty());
+    return new DruidSubScan(getUserName(), druidStoragePlugin, druidStoragePluginConfig, dataSourceScanSpecList, columns);
+  }
+
+  @Override
+  public int getOperatorType() {
+    return CoreOperatorType.DRUID_SUB_SCAN_VALUE;
+  }
+
+  @Override
+  public Iterator<PhysicalOperator> iterator() {
+    return emptyIterator();
+  }
+
+  public static class DruidSubScanSpec {
+
+    protected String dataSourceName;
+    protected String filter;

Review comment:
       Nit: Should the `filter` variable be singular or plural?  If it is singular, please change it in the constructor.

##########
File path: contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidStoragePluginConfig.java
##########
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.druid;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.drill.common.logical.StoragePluginConfigBase;
+
+@JsonTypeName(DruidStoragePluginConfig.NAME)
+public class DruidStoragePluginConfig extends StoragePluginConfigBase {
+
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DruidStoragePluginConfig.class);
+
+  public static final String NAME = "druid";
+
+  @JsonProperty
+  private final String brokerAddress;
+
+  @JsonProperty
+  private final String coordinatorAddress;
+
+  @JsonCreator
+  public DruidStoragePluginConfig(
+    @JsonProperty("brokerAddress") String brokerAddress,
+    @JsonProperty("coordinatorAddress") String coordinatorAddress) {
+
+    this.brokerAddress = brokerAddress;
+    this.coordinatorAddress = coordinatorAddress;
+    logger.info("Broker Address - {}, Coordinator Address - {}", brokerAddress, coordinatorAddress);

Review comment:
       @akkapur
   To avoid logger clutter please make this a `debug` message.

##########
File path: contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidStoragePluginConfig.java
##########
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.druid;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.drill.common.logical.StoragePluginConfigBase;
+
+@JsonTypeName(DruidStoragePluginConfig.NAME)
+public class DruidStoragePluginConfig extends StoragePluginConfigBase {
+
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DruidStoragePluginConfig.class);
+
+  public static final String NAME = "druid";
+
+  @JsonProperty
+  private final String brokerAddress;
+
+  @JsonProperty
+  private final String coordinatorAddress;
+
+  @JsonCreator
+  public DruidStoragePluginConfig(
+    @JsonProperty("brokerAddress") String brokerAddress,
+    @JsonProperty("coordinatorAddress") String coordinatorAddress) {
+
+    this.brokerAddress = brokerAddress;
+    this.coordinatorAddress = coordinatorAddress;
+    logger.info("Broker Address - {}, Coordinator Address - {}", brokerAddress, coordinatorAddress);
+    //TODO Make this configurable.

Review comment:
       Please avoid `TODO` in production code.  If you don't want to implement this for this PR, you can leave the `TODO` but please create a JIRA ticket for it and add the JIRA number in a comment. 

##########
File path: contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidStoragePluginConfig.java
##########
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.druid;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.drill.common.logical.StoragePluginConfigBase;
+
+@JsonTypeName(DruidStoragePluginConfig.NAME)
+public class DruidStoragePluginConfig extends StoragePluginConfigBase {
+
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DruidStoragePluginConfig.class);

Review comment:
       Please import logger class here and elsewhere.  

##########
File path: contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidScanSpecBuilder.java
##########
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.druid;
+
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.store.druid.common.DruidBoundFilter;
+import org.apache.drill.exec.store.druid.common.DruidIntervalFilter;
+import org.apache.drill.exec.store.druid.common.DruidNotFilter;
+import org.apache.drill.exec.store.druid.common.DruidRegexFilter;
+import org.apache.drill.exec.store.druid.common.DruidSearchFilter;
+import org.apache.drill.exec.store.druid.common.DruidSelectorFilter;
+import org.apache.drill.exec.store.druid.druid.SelectQuery;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+class DruidScanSpecBuilder {

Review comment:
       Should this class be `public`?

##########
File path: contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidStoragePlugin.java
##########
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.druid;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.drill.common.JSONOptions;
+import org.apache.drill.exec.ops.OptimizerRulesContext;
+import org.apache.drill.exec.physical.base.AbstractGroupScan;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.store.AbstractStoragePlugin;
+import org.apache.drill.exec.store.SchemaConfig;
+import org.apache.drill.exec.store.StoragePluginOptimizerRule;
+import org.apache.drill.exec.store.druid.rest.DruidAdminClient;
+import org.apache.drill.exec.store.druid.rest.DruidQueryClient;
+import org.apache.drill.exec.store.druid.rest.RestClient;
+import org.apache.drill.exec.store.druid.rest.RestClientWrapper;
+import org.apache.drill.exec.store.druid.schema.DruidSchemaFactory;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Set;
+
+public class DruidStoragePlugin extends AbstractStoragePlugin {
+
+  static final Logger logger = LoggerFactory.getLogger(DruidStoragePlugin.class);

Review comment:
       Please make the logger private.  Here and elsewhere.

##########
File path: contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidSubScan.java
##########
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.druid;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.logical.StoragePluginConfig;
+import org.apache.drill.exec.physical.base.AbstractBase;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.PhysicalVisitor;
+import org.apache.drill.exec.physical.base.SubScan;
+import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
+
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+
+import static java.util.Collections.emptyIterator;
+
+/**
+ * A Class containing information to read a single druid data source.
+ */
+@JsonTypeName("druid-datasource-scan")
+public class DruidSubScan extends AbstractBase implements SubScan {
+
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DruidSubScan.class);

Review comment:
       Please import the logger class directly.  In this class, the logger is not used at all, so please consider removing it if it is unnecessary.  If you keep it, please also make it `private`.

##########
File path: contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidStoragePlugin.java
##########
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.druid;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.drill.common.JSONOptions;
+import org.apache.drill.exec.ops.OptimizerRulesContext;
+import org.apache.drill.exec.physical.base.AbstractGroupScan;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.store.AbstractStoragePlugin;
+import org.apache.drill.exec.store.SchemaConfig;
+import org.apache.drill.exec.store.StoragePluginOptimizerRule;
+import org.apache.drill.exec.store.druid.rest.DruidAdminClient;
+import org.apache.drill.exec.store.druid.rest.DruidQueryClient;
+import org.apache.drill.exec.store.druid.rest.RestClient;
+import org.apache.drill.exec.store.druid.rest.RestClientWrapper;
+import org.apache.drill.exec.store.druid.schema.DruidSchemaFactory;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Set;
+
+public class DruidStoragePlugin extends AbstractStoragePlugin {
+
+  static final Logger logger = LoggerFactory.getLogger(DruidStoragePlugin.class);
+
+  private final DrillbitContext context;
+  private final DruidStoragePluginConfig pluginConfig;
+  private final DruidAdminClient druidAdminClient;
+  private final DruidQueryClient druidQueryClient;
+  private final DruidSchemaFactory schemaFactory;
+
+  public DruidStoragePlugin(DruidStoragePluginConfig pluginConfig, DrillbitContext context, String name) {
+    super(context, name);
+    this.pluginConfig = pluginConfig;
+    this.context = context;
+    RestClient restClient = new RestClientWrapper();
+    this.druidAdminClient = new DruidAdminClient(pluginConfig.GetCoordinatorURI(), restClient);
+    this.druidQueryClient = new DruidQueryClient(pluginConfig.GetBrokerURI(), restClient);
+    this.schemaFactory = new DruidSchemaFactory(this, name);
+  }
+
+  @Override
+  public AbstractGroupScan getPhysicalScan(String userName, JSONOptions selection) throws IOException {

Review comment:
       Was there a reason why this is `AbstractGroupScan` instead of `DruidGroupScan`?

##########
File path: contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidStoragePluginConfig.java
##########
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.druid;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.drill.common.logical.StoragePluginConfigBase;
+
+@JsonTypeName(DruidStoragePluginConfig.NAME)
+public class DruidStoragePluginConfig extends StoragePluginConfigBase {
+
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DruidStoragePluginConfig.class);
+
+  public static final String NAME = "druid";
+
+  @JsonProperty
+  private final String brokerAddress;
+
+  @JsonProperty
+  private final String coordinatorAddress;
+
+  @JsonCreator
+  public DruidStoragePluginConfig(
+    @JsonProperty("brokerAddress") String brokerAddress,
+    @JsonProperty("coordinatorAddress") String coordinatorAddress) {
+
+    this.brokerAddress = brokerAddress;
+    this.coordinatorAddress = coordinatorAddress;
+    logger.info("Broker Address - {}, Coordinator Address - {}", brokerAddress, coordinatorAddress);
+    //TODO Make this configurable.
+  }
+
+  @Override
+  public boolean equals(Object that) {
+    if (this == that) {
+      return true;
+    } else if (that == null || getClass() != that.getClass()) {
+      return false;
+    }
+    DruidStoragePluginConfig thatConfig = (DruidStoragePluginConfig) that;
+    return
+      (this.brokerAddress.equals(thatConfig.brokerAddress)
+        && this.coordinatorAddress.equals(thatConfig.coordinatorAddress));
+  }
+
+  @Override
+  public int hashCode() {

Review comment:
       Please consider using `Arrays.hashCode()` for this. 
   See: https://github.com/apache/drill/blob/cd0d5ec10aaf532787396c4b5d562fd05f0f28fc/contrib/format-excel/src/main/java/org/apache/drill/exec/store/excel/ExcelFormatConfig.java#L86-#L90

##########
File path: contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidScanSpecBuilder.java
##########
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.druid;
+
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.store.druid.common.DruidBoundFilter;
+import org.apache.drill.exec.store.druid.common.DruidIntervalFilter;
+import org.apache.drill.exec.store.druid.common.DruidNotFilter;
+import org.apache.drill.exec.store.druid.common.DruidRegexFilter;
+import org.apache.drill.exec.store.druid.common.DruidSearchFilter;
+import org.apache.drill.exec.store.druid.common.DruidSelectorFilter;
+import org.apache.drill.exec.store.druid.druid.SelectQuery;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+class DruidScanSpecBuilder {
+
+  private static final Logger logger = LoggerFactory.getLogger(DruidScanSpecBuilder.class);
+
+  private static final String REGEX_KEYWORD_HINT = "$regex$_";
+
+  DruidScanSpec build(String dataSourceName,
+                      String functionName,
+                      SchemaPath field,
+                      Object fieldValue) throws IOException {
+    // extract the field name
+
+    String fieldName = field.getAsNamePart().getName(); //.getAsUnescapedPath();
+    String filter;
+
+    logger.debug("createDruidScanSpec called. FunctionName - "

Review comment:
       Please use `{}` in logger messages as shown below.  (Forgive errors here)
   
   ```
   logger.debug("createDruidScanSpec called. FunctionName - {}  field - {}, fieldValue - {}", functionName, fieldName, fieldValue);
   ```
   
   Here and elsewhere.  Thanks!

##########
File path: contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidScanSpecBuilder.java
##########
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.druid;
+
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.store.druid.common.DruidBoundFilter;
+import org.apache.drill.exec.store.druid.common.DruidIntervalFilter;
+import org.apache.drill.exec.store.druid.common.DruidNotFilter;
+import org.apache.drill.exec.store.druid.common.DruidRegexFilter;
+import org.apache.drill.exec.store.druid.common.DruidSearchFilter;
+import org.apache.drill.exec.store.druid.common.DruidSelectorFilter;
+import org.apache.drill.exec.store.druid.druid.SelectQuery;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+class DruidScanSpecBuilder {
+
+  private static final Logger logger = LoggerFactory.getLogger(DruidScanSpecBuilder.class);
+
+  private static final String REGEX_KEYWORD_HINT = "$regex$_";
+
+  DruidScanSpec build(String dataSourceName,
+                      String functionName,
+                      SchemaPath field,
+                      Object fieldValue) throws IOException {
+    // extract the field name
+
+    String fieldName = field.getAsNamePart().getName(); //.getAsUnescapedPath();

Review comment:
       Please remove commented out code.

##########
File path: contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/schema/DruidSchemaFactory.java
##########
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.druid.schema;
+
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.schema.Table;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.drill.exec.planner.logical.DrillTable;
+import org.apache.drill.exec.planner.logical.DynamicDrillTable;
+import org.apache.drill.exec.store.AbstractSchema;
+import org.apache.drill.exec.store.AbstractSchemaFactory;
+import org.apache.drill.exec.store.SchemaConfig;
+import org.apache.drill.exec.store.druid.DruidScanSpec;
+import org.apache.drill.exec.store.druid.DruidStoragePlugin;
+import org.apache.drill.exec.store.druid.DruidStoragePluginConfig;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
+import org.apache.drill.shaded.guava.com.google.common.collect.Maps;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+
+public class DruidSchemaFactory extends AbstractSchemaFactory {
+
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DruidSchemaFactory.class);

Review comment:
       @akkapur 
   Please make all loggers private and without the full package.  Here and elsewhere.

##########
File path: contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/rest/RestClientWrapper.java
##########
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.druid.rest;
+
+import org.apache.http.HttpEntity;
+import org.apache.http.HttpResponse;
+import org.apache.http.client.HttpClient;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.entity.ByteArrayEntity;
+import org.apache.http.impl.client.DefaultHttpClient;
+
+import javax.ws.rs.core.HttpHeaders;
+import java.io.IOException;
+
+import static javax.ws.rs.core.MediaType.APPLICATION_JSON;
+import static org.apache.http.protocol.HTTP.CONTENT_TYPE;
+
+public class RestClientWrapper implements RestClient {

Review comment:
       I'd like to see this committed.  Once it is, can we create a JIRA to convert this to use `okhttp3` as the library for REST requests?  We don't have to make the change now; as long as there is a JIRA and a `TODO` in the code that references the JIRA, that's fine.

##########
File path: contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/schema/DruidSchemaFactory.java
##########
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.druid.schema;
+
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.schema.Table;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.drill.exec.planner.logical.DrillTable;
+import org.apache.drill.exec.planner.logical.DynamicDrillTable;
+import org.apache.drill.exec.store.AbstractSchema;
+import org.apache.drill.exec.store.AbstractSchemaFactory;
+import org.apache.drill.exec.store.SchemaConfig;
+import org.apache.drill.exec.store.druid.DruidScanSpec;
+import org.apache.drill.exec.store.druid.DruidStoragePlugin;
+import org.apache.drill.exec.store.druid.DruidStoragePluginConfig;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
+import org.apache.drill.shaded.guava.com.google.common.collect.Maps;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+
+public class DruidSchemaFactory extends AbstractSchemaFactory {
+
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DruidSchemaFactory.class);
+  final DruidStoragePlugin plugin;
+
+  public DruidSchemaFactory(DruidStoragePlugin plugin, String schemaName) {
+    super(schemaName);
+    this.plugin = plugin;
+  }
+
+  @Override
+  public void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent) {
+    DruidDataSources schema = new DruidDataSources(getName());
+    SchemaPlus hPlus = parent.add(getName(), schema);
+    schema.setHolder(hPlus);
+  }
+
+  public class DruidDataSources extends AbstractSchema {
+
+    private final Set<String> tableNames;
+    private final Map<String, DrillTable> drillTables = Maps.newHashMap();
+
+    public DruidDataSources(String name) {
+      super(ImmutableList.<String>of(), name);
+      this.tableNames = this.getTableNames();
+    }
+
+    public void setHolder(SchemaPlus plusOfThis) {
+    }
+
+    @Override
+    public AbstractSchema getSubSchema(String name) {
+      return null;
+    }
+
+    @Override
+    public Set<String> getSubSchemaNames() {
+      return Collections.emptySet();
+    }
+
+    @Override
+    public Table getTable(String tableName) {
+
+      if (!tableNames.contains(tableName)) { // table does not exist
+        return null;
+      }
+
+      try {
+
+        if (! drillTables.containsKey(tableName)) {
+          DruidScanSpec scanSpec = new DruidScanSpec(tableName);
+          DynamicDrillTable dynamicDrillTable = new DynamicDrillTable(plugin, getName(), null, scanSpec);
+          drillTables.put(tableName, dynamicDrillTable);
+        }
+
+        return drillTables.get(tableName);
+      } catch (Exception e) {
+        logger.warn("Failure while retrieving druid table {}", tableName, e);

Review comment:
       Would you want to throw a `UserException` here to let the user know that something went wrong?

##########
File path: contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidScanSpecBuilder.java
##########
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.druid;
+
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.store.druid.common.DruidBoundFilter;
+import org.apache.drill.exec.store.druid.common.DruidIntervalFilter;
+import org.apache.drill.exec.store.druid.common.DruidNotFilter;
+import org.apache.drill.exec.store.druid.common.DruidRegexFilter;
+import org.apache.drill.exec.store.druid.common.DruidSearchFilter;
+import org.apache.drill.exec.store.druid.common.DruidSelectorFilter;
+import org.apache.drill.exec.store.druid.druid.SelectQuery;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+class DruidScanSpecBuilder {
+
+  private static final Logger logger = LoggerFactory.getLogger(DruidScanSpecBuilder.class);
+
+  private static final String REGEX_KEYWORD_HINT = "$regex$_";
+
+  DruidScanSpec build(String dataSourceName,
+                      String functionName,
+                      SchemaPath field,
+                      Object fieldValue) throws IOException {
+    // extract the field name
+
+    String fieldName = field.getAsNamePart().getName(); //.getAsUnescapedPath();
+    String filter;
+
+    logger.debug("createDruidScanSpec called. FunctionName - "
+      + functionName + ", field - " + fieldName + ", fieldValue - " + fieldValue);
+
+    switch (functionName) {
+      case "equal":

Review comment:
       Would it be better to make this an `enum` instead of doing string comparisons?

##########
File path: contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/rest/DruidQueryClient.java
##########
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.druid.rest;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.drill.exec.store.druid.druid.DruidSelectResponse;
+import org.apache.drill.exec.store.druid.druid.PagingIdentifier;
+import org.apache.http.HttpResponse;
+import org.apache.http.HttpStatus;
+import org.apache.http.util.EntityUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.Map;
+
+public class DruidQueryClient {
+
+  private static final Logger logger = LoggerFactory.getLogger(DruidQueryClient.class);
+
+  private static final String QUERY_BASE_URI = "/druid/v2";
+  private static final ObjectMapper mapper = new ObjectMapper();
+
+  private RestClient restClient;
+  private String queryUrl;
+
+  public DruidQueryClient(String brokerURI, RestClient restClient) {
+    queryUrl = brokerURI + QUERY_BASE_URI;
+    this.restClient = restClient;
+    logger.debug("Initialized DruidQueryClient with druidURL - " + this.queryUrl);
+  }
+
+  public DruidSelectResponse executeQuery(String query) throws Exception {
+    logger.debug("Executing Query - " + query);
+
+    HttpResponse response = restClient.post(queryUrl, query);
+
+    if (response.getStatusLine().getStatusCode() != HttpStatus.SC_OK) {
+      throw new Exception(

Review comment:
       Here and elsewhere, please use the `UserException` class to help give the user good error messages.  
   Code below is from the HTTP storage plugin as an example.
   
   ```
   if (!response.isSuccessful()) {
           throw UserException
             .dataReadError()
             .message("HTTP request failed")
             .addContext("Response code", response.code())
             .addContext("Response message", response.message())
             .addContext(errorContext)
             .build(logger);
         }
   ```

##########
File path: contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidStoragePluginConfig.java
##########
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.druid;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.drill.common.logical.StoragePluginConfigBase;
+
+@JsonTypeName(DruidStoragePluginConfig.NAME)
+public class DruidStoragePluginConfig extends StoragePluginConfigBase {
+
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DruidStoragePluginConfig.class);

Review comment:
       @akkapur 

##########
File path: contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/schema/DruidSchemaFactory.java
##########
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.druid.schema;
+
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.schema.Table;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.drill.exec.planner.logical.DrillTable;
+import org.apache.drill.exec.planner.logical.DynamicDrillTable;
+import org.apache.drill.exec.store.AbstractSchema;
+import org.apache.drill.exec.store.AbstractSchemaFactory;
+import org.apache.drill.exec.store.SchemaConfig;
+import org.apache.drill.exec.store.druid.DruidScanSpec;
+import org.apache.drill.exec.store.druid.DruidStoragePlugin;
+import org.apache.drill.exec.store.druid.DruidStoragePluginConfig;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
+import org.apache.drill.shaded.guava.com.google.common.collect.Maps;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+
+public class DruidSchemaFactory extends AbstractSchemaFactory {
+
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DruidSchemaFactory.class);
+  final DruidStoragePlugin plugin;
+
+  public DruidSchemaFactory(DruidStoragePlugin plugin, String schemaName) {
+    super(schemaName);
+    this.plugin = plugin;
+  }
+
+  @Override
+  public void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent) {
+    DruidDataSources schema = new DruidDataSources(getName());
+    SchemaPlus hPlus = parent.add(getName(), schema);
+    schema.setHolder(hPlus);
+  }
+
+  public class DruidDataSources extends AbstractSchema {
+
+    private final Set<String> tableNames;
+    private final Map<String, DrillTable> drillTables = Maps.newHashMap();
+
+    public DruidDataSources(String name) {
+      super(ImmutableList.<String>of(), name);
+      this.tableNames = this.getTableNames();
+    }
+
+    public void setHolder(SchemaPlus plusOfThis) {
+    }
+
+    @Override
+    public AbstractSchema getSubSchema(String name) {
+      return null;
+    }
+
+    @Override
+    public Set<String> getSubSchemaNames() {
+      return Collections.emptySet();
+    }
+
+    @Override
+    public Table getTable(String tableName) {
+
+      if (!tableNames.contains(tableName)) { // table does not exist
+        return null;
+      }
+
+      try {
+
+        if (! drillTables.containsKey(tableName)) {
+          DruidScanSpec scanSpec = new DruidScanSpec(tableName);
+          DynamicDrillTable dynamicDrillTable = new DynamicDrillTable(plugin, getName(), null, scanSpec);
+          drillTables.put(tableName, dynamicDrillTable);
+        }
+
+        return drillTables.get(tableName);
+      } catch (Exception e) {
+        logger.warn("Failure while retrieving druid table {}", tableName, e);
+        return null;
+      }
+    }
+
+    @Override
+    public Set<String> getTableNames() {
+      try {
+        Set<String> dataSources = plugin.getAdminClient().GetDataSources();
+        logger.debug("Found Druid DataSources - " + StringUtils.join(dataSources, ","));

Review comment:
       Please check that all logger statements use the parameterized format below rather than string concatenation:
   `logger.debug("Message: {}", message);`
   
   Here and elsewhere.

##########
File path: distribution/src/main/resources/storage-plugins-override-example.conf
##########
@@ -66,3 +66,11 @@
     enabled: true
   }
 }
+"storage": {
+  druid: {
+    type : "druid",
+    brokerAddress : "http://localhost:8082",
+    coordinatorAddress: "http://localhost:8081",
+    enabled : true

Review comment:
       I would remove this and create a `bootstrap-storage-plugins.json` file in the resources folder of your plugin.

##########
File path: distribution/pom.xml
##########
@@ -573,7 +578,7 @@
                   </sources>
                 </mapping>
                 <mapping>
-		<directory>/opt/drill/sample-data</directory>

Review comment:
       Please revert spacing.

##########
File path: contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidScanSpecBuilder.java
##########
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.druid;
+
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.store.druid.common.DruidBoundFilter;
+import org.apache.drill.exec.store.druid.common.DruidIntervalFilter;
+import org.apache.drill.exec.store.druid.common.DruidNotFilter;
+import org.apache.drill.exec.store.druid.common.DruidRegexFilter;
+import org.apache.drill.exec.store.druid.common.DruidSearchFilter;
+import org.apache.drill.exec.store.druid.common.DruidSelectorFilter;
+import org.apache.drill.exec.store.druid.druid.SelectQuery;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+class DruidScanSpecBuilder {
+
+  private static final Logger logger = LoggerFactory.getLogger(DruidScanSpecBuilder.class);
+
+  private static final String REGEX_KEYWORD_HINT = "$regex$_";
+
+  DruidScanSpec build(String dataSourceName,
+                      String functionName,
+                      SchemaPath field,
+                      Object fieldValue) throws IOException {
+    // extract the field name
+
+    String fieldName = field.getAsNamePart().getName(); //.getAsUnescapedPath();
+    String filter;
+
+    logger.debug("createDruidScanSpec called. FunctionName - "
+      + functionName + ", field - " + fieldName + ", fieldValue - " + fieldValue);
+
+    switch (functionName) {
+      case "equal":
+      {
+        if (fieldName.equalsIgnoreCase(SelectQuery.IntervalDimensionName)) {
+          DruidIntervalFilter druidIntervalFilter = new DruidIntervalFilter((String)fieldValue);
+          filter = druidIntervalFilter.toJson();
+          break;
+        } else {
+          DruidSelectorFilter druidSelectorFilter = new DruidSelectorFilter(fieldName, (String) fieldValue);
+          filter = druidSelectorFilter.toJson();
+          break;
+        }
+      }
+      case "not_equal":
+      {
+        DruidSelectorFilter druidSelectorFilter = new DruidSelectorFilter(fieldName, String.valueOf(fieldValue));
+        String selectorFilter = druidSelectorFilter.toJson();
+        DruidNotFilter druidNotFilter = new DruidNotFilter(selectorFilter);
+        filter = druidNotFilter.toJson();
+        break;
+      }
+      case "greater_than_or_equal_to":
+      {
+        DruidBoundFilter druidBoundFilter = new DruidBoundFilter(fieldName, String.valueOf(fieldValue), null);
+        filter = druidBoundFilter.toJson();
+        break;
+      }
+      case "greater_than":
+      {
+        DruidBoundFilter druidBoundFilter = new DruidBoundFilter(fieldName, String.valueOf(fieldValue), null);
+        druidBoundFilter.setLowerStrict(true);
+        filter = druidBoundFilter.toJson();
+        break;
+      }
+      case "less_than_or_equal_to":
+      {
+        DruidBoundFilter druidBoundFilter = new DruidBoundFilter(fieldName, null, String.valueOf(fieldValue));
+        filter = druidBoundFilter.toJson();
+        break;
+      }
+      case "less_than":
+      {
+        DruidBoundFilter druidBoundFilter = new DruidBoundFilter(fieldName, null, String.valueOf(fieldValue));
+        druidBoundFilter.setUpperStrict(true);
+        filter = druidBoundFilter.toJson();
+        break;
+      }
+      case "isnull":
+      case "isNull":
+      case "is null":
+      {
+        DruidSelectorFilter druidSelectorFilter = new DruidSelectorFilter(fieldName, null);
+        filter = druidSelectorFilter.toJson();
+        break;
+      }
+      case "isnotnull":
+      case "isNotNull":
+      case "is not null":
+      {
+        DruidSelectorFilter druidSelectorFilter = new DruidSelectorFilter(fieldName, null);
+        String selectorFilter = druidSelectorFilter.toJson();
+        DruidNotFilter druidNotFilter = new DruidNotFilter(selectorFilter);
+        filter = druidNotFilter.toJson();
+        break;
+      }
+      case "like":
+      {
+        String val = String.valueOf(fieldValue);
+        if(val.startsWith(REGEX_KEYWORD_HINT) ) {
+          DruidRegexFilter druidRegexFilter = new DruidRegexFilter(fieldName, val.substring(REGEX_KEYWORD_HINT.length()));
+          filter = druidRegexFilter.toJson();
+        }
+        else {
+          DruidSearchFilter druidSearchFilter = new DruidSearchFilter(fieldName, false, val);
+          filter = druidSearchFilter.toJson();
+        }
+        break;
+      }
+      default:
+        String message = "No support for functionName-" + functionName;

Review comment:
       A few things here:
   1.  If I'm following the logic correctly, this is where query functions are pushed down to Druid.  My first question: if the user is using a function in Drill that does not exist in Druid, wouldn't we just want to ignore it here and let Drill apply that function after the data is received?
   
   2. I would recommend using Drill's `UserException` class for errors.  Take a look here:
   https://github.com/apache/drill/blob/cc51f2458c49ab786634a0a739652a4b7d454b44/contrib/format-excel/src/main/java/org/apache/drill/exec/store/excel/ExcelBatchReader.java#L259-L262
   

##########
File path: contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidScanSpec.java
##########
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.druid;
+
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.drill.exec.store.druid.common.DruidFilter;
+
+public class DruidScanSpec {
+
+  private final String dataSourceName;
+  private DruidFilter filter;
+
+  @JsonCreator
+  public DruidScanSpec(@JsonProperty("dataSourceName") String dataSourceName) {
+    this.dataSourceName = dataSourceName;
+  }
+
+  public DruidScanSpec(String dataSourceName, DruidFilter filter) {
+    this.dataSourceName = dataSourceName;
+    this.filter = filter;
+  }
+
+  public String getDataSourceName() {
+    return this.dataSourceName;
+  }
+
+  public DruidFilter getFilter() {
+    return this.filter;
+  }
+
+  @Override
+  public String toString() {
+    String filter = this.filter == null ? "" : this.filter.toJson();

Review comment:
       Please use `PlanStringBuilder` here and anywhere there is a `toString()` function in a class that is used for query planning.

##########
File path: contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidScanBatchCreator.java
##########
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.druid;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.ops.ExecutorFragmentContext;
+import org.apache.drill.exec.physical.impl.BatchCreator;
+import org.apache.drill.exec.physical.impl.ScanBatch;
+import org.apache.drill.exec.record.CloseableRecordBatch;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.store.RecordReader;
+import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
+import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+public class DruidScanBatchCreator implements BatchCreator<DruidSubScan> {
+
+  static final Logger logger = LoggerFactory.getLogger(DruidScanBatchCreator.class);
+
+  @Override
+  public CloseableRecordBatch getBatch(ExecutorFragmentContext context, DruidSubScan subScan, List<RecordBatch> children) throws ExecutionSetupException {
+    Preconditions.checkArgument(children.isEmpty());
+    List<RecordReader> readers = Lists.newArrayList();
+    List<SchemaPath> columns;
+
+    for (DruidSubScan.DruidSubScanSpec scanSpec : subScan.getScanSpec()) {
+      try {
+        columns = subScan.getColumns();
+        readers.add(new DruidRecordReader(scanSpec, columns, context, subScan.getStorageEngine()));
+      } catch (Exception e1) {
+        throw new ExecutionSetupException(e1);
+      }
+    }
+    logger.debug("Number of record readers initialized : " + readers.size());

Review comment:
       Please do not use string concatenation in logging statements; use parameterized messages (`{}` placeholders) instead.

##########
File path: contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidScanBatchCreator.java
##########
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.druid;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.ops.ExecutorFragmentContext;
+import org.apache.drill.exec.physical.impl.BatchCreator;
+import org.apache.drill.exec.physical.impl.ScanBatch;
+import org.apache.drill.exec.record.CloseableRecordBatch;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.store.RecordReader;
+import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
+import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+public class DruidScanBatchCreator implements BatchCreator<DruidSubScan> {
+
+  static final Logger logger = LoggerFactory.getLogger(DruidScanBatchCreator.class);

Review comment:
       Please make this logger `private`.

##########
File path: contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/druid/PagingIdentifier.java
##########
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.druid.druid;
+
+public class PagingIdentifier {
+
+  private String _segmentName;

Review comment:
       Also, please follow the standard camelCase naming convention for fields (no leading underscore).

##########
File path: contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/druid/PagingIdentifier.java
##########
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.druid.druid;
+
+public class PagingIdentifier {
+
+  private String _segmentName;

Review comment:
       Can this field be `final`?

##########
File path: contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidGroupScan.java
##########
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.druid;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.physical.EndpointAffinity;
+import org.apache.drill.exec.physical.base.AbstractGroupScan;
+import org.apache.drill.exec.physical.base.GroupScan;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.ScanStats;
+import org.apache.drill.exec.proto.CoordinationProtos;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+
+import org.apache.drill.exec.store.schedule.AffinityCreator;
+import org.apache.drill.exec.store.schedule.AssignmentCreator;
+import org.apache.drill.exec.store.schedule.CompleteWork;
+import org.apache.drill.exec.store.schedule.EndpointByteMap;
+import org.apache.drill.exec.store.schedule.EndpointByteMapImpl;
+import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
+import org.apache.drill.shaded.guava.com.google.common.collect.ListMultimap;
+import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+
+@JsonTypeName("druid-scan")
+public class DruidGroupScan extends AbstractGroupScan {
+
+  private static final Logger logger = LoggerFactory.getLogger(DruidGroupScan.class);
+  private static final long DEFAULT_TABLET_SIZE = 1000;
+  // assuming a 500 MB segment size with 5 millions rows per segment
+  private static final int DEFAULT_ROW_SIZE = 100;
+  private final DruidScanSpec scanSpec;
+  private final DruidStoragePlugin storagePlugin;
+
+  private List<SchemaPath> columns;
+  private boolean filterPushedDown = false;
+  private List<DruidWork> druidWorkList = new ArrayList<>();
+  private ListMultimap<Integer,DruidWork> assignments;
+  private List<EndpointAffinity> affinities;
+
+  @JsonCreator
+  public DruidGroupScan(@JsonProperty("userName") String userName,
+                        @JsonProperty("scanSpec") DruidScanSpec scanSpec,
+                        @JsonProperty("storagePluginConfig") DruidStoragePluginConfig storagePluginConfig,
+                        @JsonProperty("columns") List<SchemaPath> columns,
+                        @JacksonInject StoragePluginRegistry pluginRegistry) {
+    this(userName,
+        pluginRegistry.resolve(storagePluginConfig, DruidStoragePlugin.class),
+        scanSpec,
+        columns);
+  }
+
+  public DruidGroupScan(String userName, DruidStoragePlugin storagePlugin, DruidScanSpec scanSpec,
+                        List<SchemaPath> columns) {
+    super(userName);
+    this.storagePlugin = storagePlugin;
+    this.scanSpec = scanSpec;
+    this.columns = columns == null || columns.size() == 0? ALL_COLUMNS : columns;
+    init();
+  }
+
+  /**
+   * Private constructor, used for cloning.
+   * @param that The DruidGroupScan to clone
+   */
+  private DruidGroupScan(DruidGroupScan that) {
+    super(that);
+    this.columns = that.columns;
+    this.scanSpec = that.scanSpec;
+    this.storagePlugin = that.storagePlugin;
+    this.filterPushedDown = that.filterPushedDown;
+    this.druidWorkList = that.druidWorkList;
+    this.assignments = that.assignments;
+  }
+
+  @Override
+  public GroupScan clone(List<SchemaPath> columns) {
+    DruidGroupScan newScan = new DruidGroupScan(this);
+    newScan.columns = columns;
+    return newScan;
+  }
+
+  @Override
+  public List<EndpointAffinity> getOperatorAffinity() {
+    if (affinities == null) {
+      affinities = AffinityCreator.getAffinityMap(druidWorkList);
+    }
+    return affinities;
+  }
+
+  @Override
+  public boolean canPushdownProjects(List<SchemaPath> columns) {
+    return true;
+  }
+
+  @JsonIgnore
+  public boolean isFilterPushedDown() {
+    return filterPushedDown;
+  }
+
+  @JsonIgnore
+  public void setFilterPushedDown(boolean filterPushedDown) {
+    this.filterPushedDown = filterPushedDown;
+  }
+
+  private void init() {
+    logger.debug("Adding Druid Work for Table - " + getTableName() + " Filter - " + getScanSpec().getFilter());

Review comment:
       Please remove string concatenation from logger statements.

##########
File path: contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidGroupScan.java
##########
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.druid;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.physical.EndpointAffinity;
+import org.apache.drill.exec.physical.base.AbstractGroupScan;
+import org.apache.drill.exec.physical.base.GroupScan;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.ScanStats;
+import org.apache.drill.exec.proto.CoordinationProtos;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+
+import org.apache.drill.exec.store.schedule.AffinityCreator;
+import org.apache.drill.exec.store.schedule.AssignmentCreator;
+import org.apache.drill.exec.store.schedule.CompleteWork;
+import org.apache.drill.exec.store.schedule.EndpointByteMap;
+import org.apache.drill.exec.store.schedule.EndpointByteMapImpl;
+import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
+import org.apache.drill.shaded.guava.com.google.common.collect.ListMultimap;
+import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+
+@JsonTypeName("druid-scan")
+public class DruidGroupScan extends AbstractGroupScan {
+
+  private static final Logger logger = LoggerFactory.getLogger(DruidGroupScan.class);
+  private static final long DEFAULT_TABLET_SIZE = 1000;
+  // assuming a 500 MB segment size with 5 millions rows per segment
+  private static final int DEFAULT_ROW_SIZE = 100;

Review comment:
       Is this something that you might want the reader to configure?  If so perhaps include it as an option in the config.

##########
File path: contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidSubScan.java
##########
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.druid;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.logical.StoragePluginConfig;
+import org.apache.drill.exec.physical.base.AbstractBase;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.PhysicalVisitor;
+import org.apache.drill.exec.physical.base.SubScan;
+import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.drill.exec.store.druid.common.DruidFilter;
+import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+
+import static java.util.Collections.emptyIterator;
+
+/**
+ * A Class containing information to read a single druid data source.
+ */
+@JsonTypeName("druid-datasource-scan")
+public class DruidSubScan extends AbstractBase implements SubScan {
+
+  static final Logger logger = LoggerFactory.getLogger(DruidSubScan.class);
+
+  @JsonIgnore
+  private final DruidStoragePlugin druidStoragePlugin;
+
+  private final List<DruidSubScanSpec> scanSpec;
+  private final List<SchemaPath> columns;
+
+  @JsonCreator
+  public DruidSubScan(@JacksonInject StoragePluginRegistry registry,
+                      @JsonProperty("userName") String userName,
+                      @JsonProperty("config") StoragePluginConfig config,
+                      @JsonProperty("scanSpec") LinkedList<DruidSubScanSpec> datasourceScanSpecList,
+                      @JsonProperty("columns") List<SchemaPath> columns) {
+    super(userName);
+    druidStoragePlugin = registry.resolve(config, DruidStoragePlugin.class);
+    this.scanSpec = datasourceScanSpecList;
+    this.columns = columns;
+  }
+
+  public DruidSubScan(String userName, DruidStoragePlugin plugin,
+                      List<DruidSubScanSpec> dataSourceInfoList, List<SchemaPath> columns) {
+    super(userName);
+    this.druidStoragePlugin = plugin;
+    this.scanSpec = dataSourceInfoList;
+    this.columns = columns;
+  }
+
+  @Override
+  public <T, X, E extends Throwable> T accept(PhysicalVisitor<T, X, E> physicalVisitor, X value) throws E {
+    return physicalVisitor.visitSubScan(this, value);
+  }
+
+  @JsonIgnore
+  public List<DruidSubScanSpec> getScanSpec() {
+    return scanSpec;
+  }
+
+  public List<SchemaPath> getColumns() {
+    return columns;
+  }
+
+  @JsonIgnore
+  @Override
+  public boolean isExecutable() {
+    return false;
+  }
+
+  @JsonIgnore
+  public DruidStoragePlugin getStorageEngine(){
+    return druidStoragePlugin;
+  }
+
+  @Override
+  public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) {
+    Preconditions.checkArgument(children.isEmpty());
+    return new DruidSubScan(getUserName(), druidStoragePlugin, scanSpec, columns);
+  }
+
+  @JsonIgnore
+  @Override
+  public int getOperatorType() {
+    return CoreOperatorType.DRUID_SUB_SCAN_VALUE;
+  }
+
+  @Override
+  public Iterator<PhysicalOperator> iterator() {
+    return emptyIterator();
+  }
+
+  public static class DruidSubScanSpec {
+
+    protected final String dataSourceName;
+    protected final DruidFilter filter;
+
+    @JsonCreator
+    public DruidSubScanSpec(@JsonProperty("dataSourceName") String dataSourceName,
+                            @JsonProperty("filter") DruidFilter filter) {
+      this.dataSourceName = dataSourceName;
+      this.filter = filter;
+    }
+
+    public String getDataSourceName() {
+      return dataSourceName;
+    }
+
+    public DruidFilter getFilter() { return filter; }
+
+    @Override
+    public String toString() {
+      return "DruidSubScanSpec [dataSourceName=" + dataSourceName  + ", filter=" + filter + "]";

Review comment:
       Please use `PlanStringBuilder` for the `toString()` method.

##########
File path: contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidRecordReader.java
##########
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.druid;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.physical.impl.OutputMutator;
+import org.apache.drill.exec.store.AbstractRecordReader;
+import org.apache.drill.exec.store.druid.common.DruidFilter;
+import org.apache.drill.exec.store.druid.druid.DruidSelectResponse;
+import org.apache.drill.exec.store.druid.druid.PagingIdentifier;
+import org.apache.drill.exec.store.druid.druid.PagingSpec;
+import org.apache.drill.exec.store.druid.druid.SelectQuery;
+import org.apache.drill.exec.store.druid.rest.DruidQueryClient;
+import org.apache.drill.exec.vector.complex.fn.JsonReader;
+import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
+import org.apache.drill.shaded.guava.com.google.common.collect.Sets;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
+
+public class DruidRecordReader extends AbstractRecordReader {
+
+  private static final Logger logger = LoggerFactory.getLogger(DruidRecordReader.class);
+  private static final int BATCH_SIZE = 1000;
+  private final DruidStoragePlugin plugin;
+  private final DruidSubScan.DruidSubScanSpec scanSpec;
+  private final List<String> dimensions;
+  private final DruidFilter filter;
+  private ArrayList<PagingIdentifier> pagingIdentifiers = new ArrayList<>();
+
+  private JsonReader jsonReader;
+  private VectorContainerWriter writer;
+
+  private OutputMutator output;
+  private OperatorContext context;
+  private final FragmentContext fragmentContext;
+
+  private ObjectMapper objectMapper = new ObjectMapper();

Review comment:
       Likewise here: please convert this to a local variable.

##########
File path: contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidGroupScan.java
##########
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.druid;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.physical.EndpointAffinity;
+import org.apache.drill.exec.physical.base.AbstractGroupScan;
+import org.apache.drill.exec.physical.base.GroupScan;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.ScanStats;
+import org.apache.drill.exec.proto.CoordinationProtos;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+
+import org.apache.drill.exec.store.schedule.AffinityCreator;
+import org.apache.drill.exec.store.schedule.AssignmentCreator;
+import org.apache.drill.exec.store.schedule.CompleteWork;
+import org.apache.drill.exec.store.schedule.EndpointByteMap;
+import org.apache.drill.exec.store.schedule.EndpointByteMapImpl;
+import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
+import org.apache.drill.shaded.guava.com.google.common.collect.ListMultimap;
+import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+
+@JsonTypeName("druid-scan")
+public class DruidGroupScan extends AbstractGroupScan {
+
+  private static final Logger logger = LoggerFactory.getLogger(DruidGroupScan.class);
+  private static final long DEFAULT_TABLET_SIZE = 1000;
+  // assuming a 500 MB segment size with 5 millions rows per segment
+  private static final int DEFAULT_ROW_SIZE = 100;
+  private final DruidScanSpec scanSpec;
+  private final DruidStoragePlugin storagePlugin;
+
+  private List<SchemaPath> columns;
+  private boolean filterPushedDown = false;
+  private List<DruidWork> druidWorkList = new ArrayList<>();
+  private ListMultimap<Integer,DruidWork> assignments;
+  private List<EndpointAffinity> affinities;
+
+  @JsonCreator
+  public DruidGroupScan(@JsonProperty("userName") String userName,
+                        @JsonProperty("scanSpec") DruidScanSpec scanSpec,
+                        @JsonProperty("storagePluginConfig") DruidStoragePluginConfig storagePluginConfig,
+                        @JsonProperty("columns") List<SchemaPath> columns,
+                        @JacksonInject StoragePluginRegistry pluginRegistry) {
+    this(userName,
+        pluginRegistry.resolve(storagePluginConfig, DruidStoragePlugin.class),
+        scanSpec,
+        columns);
+  }
+
+  public DruidGroupScan(String userName, DruidStoragePlugin storagePlugin, DruidScanSpec scanSpec,
+                        List<SchemaPath> columns) {
+    super(userName);
+    this.storagePlugin = storagePlugin;
+    this.scanSpec = scanSpec;
+    this.columns = columns == null || columns.size() == 0? ALL_COLUMNS : columns;
+    init();
+  }
+
+  /**
+   * Private constructor, used for cloning.
+   * @param that The DruidGroupScan to clone
+   */
+  private DruidGroupScan(DruidGroupScan that) {
+    super(that);
+    this.columns = that.columns;
+    this.scanSpec = that.scanSpec;
+    this.storagePlugin = that.storagePlugin;
+    this.filterPushedDown = that.filterPushedDown;
+    this.druidWorkList = that.druidWorkList;
+    this.assignments = that.assignments;
+  }
+
+  @Override
+  public GroupScan clone(List<SchemaPath> columns) {
+    DruidGroupScan newScan = new DruidGroupScan(this);
+    newScan.columns = columns;
+    return newScan;
+  }
+
+  @Override
+  public List<EndpointAffinity> getOperatorAffinity() {
+    if (affinities == null) {
+      affinities = AffinityCreator.getAffinityMap(druidWorkList);
+    }
+    return affinities;
+  }
+
+  @Override
+  public boolean canPushdownProjects(List<SchemaPath> columns) {
+    return true;
+  }
+
+  @JsonIgnore
+  public boolean isFilterPushedDown() {
+    return filterPushedDown;
+  }
+
+  @JsonIgnore
+  public void setFilterPushedDown(boolean filterPushedDown) {
+    this.filterPushedDown = filterPushedDown;
+  }
+
+  private void init() {
+    logger.debug("Adding Druid Work for Table - " + getTableName() + " Filter - " + getScanSpec().getFilter());
+
+    DruidWork druidWork =
+      new DruidWork(
+        new DruidSubScan.DruidSubScanSpec(
+          getTableName(),
+          getScanSpec().getFilter()
+        )
+      );
+    druidWorkList.add(druidWork);
+  }
+
+  private static class DruidWork implements CompleteWork {
+    private final EndpointByteMapImpl byteMap = new EndpointByteMapImpl();
+    private final DruidSubScan.DruidSubScanSpec druidSubScanSpec;
+
+    public DruidWork(DruidSubScan.DruidSubScanSpec druidSubScanSpec) {
+      this.druidSubScanSpec = druidSubScanSpec;
+    }
+
+    public DruidSubScan.DruidSubScanSpec getDruidSubScanSpec() {
+      return druidSubScanSpec;
+    }
+
+    @Override
+    public long getTotalBytes() {
+      return DEFAULT_TABLET_SIZE;
+    }
+
+    @Override
+    public EndpointByteMap getByteMap() {
+      return byteMap;
+    }
+
+    @Override
+    public int compareTo(CompleteWork o) {
+      return 0;
+    }
+  }
+
+  //TODO - MAY GET MORE PRECISE COUNT FROM DRUID ITSELF.
+  public ScanStats getScanStats() {
+    long recordCount = 100000 * druidWorkList.size();
+    return new ScanStats(
+        ScanStats.GroupScanProperty.NO_EXACT_ROW_COUNT,
+        recordCount,
+        1,
+        recordCount * DEFAULT_ROW_SIZE);
+  }
+
+  @Override
+  public void applyAssignments(List<CoordinationProtos.DrillbitEndpoint> endpoints) {
+    assignments = AssignmentCreator.getMappings(endpoints, druidWorkList);
+  }
+
+  @Override
+  public DruidSubScan getSpecificScan(int minorFragmentId) {
+
+    List<DruidWork> workList = assignments.get(minorFragmentId);
+
+    List<DruidSubScan.DruidSubScanSpec> scanSpecList = Lists.newArrayList();
+    for (DruidWork druidWork : workList) {
+      scanSpecList
+        .add(
+          new DruidSubScan.DruidSubScanSpec(
+            druidWork.getDruidSubScanSpec().getDataSourceName(),
+            druidWork.getDruidSubScanSpec().getFilter()
+          )
+        );
+    }
+
+    return new DruidSubScan(getUserName(), storagePlugin, scanSpecList, this.columns);
+  }
+
+  @JsonIgnore
+  public String getTableName() {
+    return getScanSpec().getDataSourceName();
+  }
+
+  @Override
+  public int getMaxParallelizationWidth() {
+    return druidWorkList.size();
+  }
+
+  @Override
+  public String getDigest() {
+    return toString();
+  }
+
+  @JsonProperty("druidScanSpec")
+  public DruidScanSpec getScanSpec() {
+    return scanSpec;
+  }
+
+  @JsonIgnore
+  public DruidStoragePlugin getStoragePlugin() {
+    return storagePlugin;
+  }
+
+  @JsonProperty
+  public List<SchemaPath> getColumns() {
+    return columns;
+  }
+
+  @Override
+  public String toString() {
+    return String.format("DruidGroupScan [DruidScanSpec=%s, columns=%s]", scanSpec, columns);

Review comment:
       Please use `PlanStringBuilder` to build the `toString()` output, both here and elsewhere.

##########
File path: contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/schema/DruidSchemaFactory.java
##########
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.druid.schema;
+
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.schema.Table;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.drill.exec.planner.logical.DrillTable;
+import org.apache.drill.exec.planner.logical.DynamicDrillTable;
+import org.apache.drill.exec.store.AbstractSchema;
+import org.apache.drill.exec.store.AbstractSchemaFactory;
+import org.apache.drill.exec.store.SchemaConfig;
+import org.apache.drill.exec.store.druid.DruidScanSpec;
+import org.apache.drill.exec.store.druid.DruidStoragePlugin;
+import org.apache.drill.exec.store.druid.DruidStoragePluginConfig;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
+import org.apache.drill.shaded.guava.com.google.common.collect.Maps;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+
+public class DruidSchemaFactory extends AbstractSchemaFactory {
+
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DruidSchemaFactory.class);
+  final DruidStoragePlugin plugin;
+
+  public DruidSchemaFactory(DruidStoragePlugin plugin, String schemaName) {
+    super(schemaName);
+    this.plugin = plugin;
+  }
+
+  @Override
+  public void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent) {
+    DruidDataSources schema = new DruidDataSources(getName());
+    SchemaPlus hPlus = parent.add(getName(), schema);
+    schema.setHolder(hPlus);
+  }
+
+  public class DruidDataSources extends AbstractSchema {
+
+    private final Set<String> tableNames;
+    private final Map<String, DrillTable> drillTables = Maps.newHashMap();
+
+    public DruidDataSources(String name) {
+      super(ImmutableList.<String>of(), name);

Review comment:
       nit: the explicit `<String>` type argument is not necessary here; `ImmutableList.of()` is sufficient.

##########
File path: contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/druid/SelectQuery.java
##########
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.druid.druid;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.commons.lang3.StringUtils;
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Objects;
+
+public class SelectQuery {
+
+  private static final ObjectMapper objectMapper = new ObjectMapper();
+  public static String IntervalDimensionName = "eventInterval";

Review comment:
       nit: Move `IntervalDimensionName` lower in the list. (Please group all `final` variables together.)

##########
File path: contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/druid/SelectQuery.java
##########
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.druid.druid;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.commons.lang3.StringUtils;
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Objects;
+
+public class SelectQuery {
+
+  private static final ObjectMapper objectMapper = new ObjectMapper();
+  public static String IntervalDimensionName = "eventInterval";
+  private static final String ISO8601DateStringFormat = "yyyy-MM-dd'T'HH:mm:ss.SSSZ";
+  private static final String NixStartTime = "1970-01-01T00:00:00.000Z";
+  private String queryType = "select";
+  private String dataSource;
+  private boolean descending = false;
+  private ArrayList<String> dimensions = new ArrayList<>();
+  private ArrayList<String> metrics = new ArrayList<>();
+  private String granularity = "all";
+  private List<String> intervals = new ArrayList<>();
+  private PagingSpec pagingSpec = new PagingSpec(null);
+  private String filter;
+
+  public SelectQuery(String dataSource, List<String> intervals) {
+    this.dataSource = dataSource;
+    this.intervals = intervals;
+  }
+
+  public SelectQuery(String dataSource) {
+    this.dataSource = dataSource;
+
+    //Note - Interval is always this by default because there is no way to provide an interval via SQL
+    DateTime now = new DateTime();
+    DateTime zulu = now.toDateTime( DateTimeZone.UTC );
+    String interval = NixStartTime + "/" + zulu;
+    this.intervals.add(interval);
+  }
+
+  public String getQueryType() {
+    return queryType;
+  }
+
+  public void setQueryType(String queryType) {
+    this.queryType = queryType;
+  }
+
+  public String getDataSource() {
+    return dataSource;
+  }
+
+  public void setDataSource(String dataSource) {
+    this.dataSource = dataSource;
+  }
+
+  public boolean isDescending() {
+    return descending;
+  }
+
+  public void setDescending(boolean descending) {
+    this.descending = descending;
+  }
+
+  public ArrayList<String> getDimensions() {
+    return dimensions;
+  }
+
+  public void setDimensions(List<String> dimensions) {
+    this.dimensions = (ArrayList<String>) dimensions;
+  }
+
+  public ArrayList<String> getMetrics() {
+    return metrics;
+  }
+
+  public void setMetrics(ArrayList<String> metrics) {
+    this.metrics = metrics;
+  }
+
+  public String getGranularity() {
+    return granularity;
+  }
+
+  public void setGranularity(String granularity) {
+    this.granularity = granularity;
+  }
+
+  public List<String> getIntervals() {
+    return intervals;
+  }
+
+  public void setIntervals(ArrayList<String> intervals) {
+    this.intervals = intervals;
+  }
+
+  public PagingSpec getPagingSpec() {
+    return pagingSpec;
+  }
+
+  public void setPagingSpec(PagingSpec pagingSpec) {
+    this.pagingSpec = pagingSpec;
+  }
+
+  public String getFilter() {
+    return filter;
+  }
+
+  public void setFilter(String filter) {
+    this.filter = filter;
+  }
+
+  public String toJson() throws IOException {

Review comment:
       I think @paul-rogers mentioned this in another file, but is there some reason that Jackson was not used for serialization/deserialization?  I think you could avoid having a `toJson()` method if you did that.
   

##########
File path: contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidScanSpecBuilder.java
##########
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.druid;
+
+import org.apache.drill.common.FunctionNames;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.store.druid.common.DruidBoundFilter;
+import org.apache.drill.exec.store.druid.common.DruidFilter;
+import org.apache.drill.exec.store.druid.common.DruidIntervalFilter;
+import org.apache.drill.exec.store.druid.common.DruidNotFilter;
+import org.apache.drill.exec.store.druid.common.DruidRegexFilter;
+import org.apache.drill.exec.store.druid.common.DruidSearchFilter;
+import org.apache.drill.exec.store.druid.common.DruidSearchQuerySpec;
+import org.apache.drill.exec.store.druid.common.DruidSelectorFilter;
+import org.apache.drill.exec.store.druid.druid.SelectQuery;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+class DruidScanSpecBuilder {
+
+  private static final Logger logger = LoggerFactory.getLogger(DruidScanSpecBuilder.class);
+  private static final String REGEX_KEYWORD_HINT = "$regex$_";
+
+  DruidScanSpec build(String dataSourceName,
+                      String functionName,
+                      SchemaPath field,
+                      Object fieldValue) throws IOException {

Review comment:
       If this method doesn't actually throw `IOException`, please remove the `throws` clause.

##########
File path: contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/rest/RestClientWrapper.java
##########
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.druid.rest;
+
+import org.apache.http.HttpEntity;
+import org.apache.http.HttpResponse;
+import org.apache.http.client.HttpClient;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.entity.ByteArrayEntity;
+import org.apache.http.impl.client.DefaultHttpClient;
+
+import javax.ws.rs.core.HttpHeaders;
+import java.io.IOException;
+
+import static javax.ws.rs.core.MediaType.APPLICATION_JSON;
+import static org.apache.http.protocol.HTTP.CONTENT_TYPE;
+
+public class RestClientWrapper implements RestClient {
+  private static final HttpClient httpClient = new DefaultHttpClient();

Review comment:
       `DefaultHttpClient()` is deprecated.  Is there a substitute in that package?  If not, can we replace it with `okhttp3`?

##########
File path: contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidRecordReader.java
##########
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.druid;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.physical.impl.OutputMutator;
+import org.apache.drill.exec.store.AbstractRecordReader;
+import org.apache.drill.exec.store.druid.common.DruidFilter;
+import org.apache.drill.exec.store.druid.druid.DruidSelectResponse;
+import org.apache.drill.exec.store.druid.druid.PagingIdentifier;
+import org.apache.drill.exec.store.druid.druid.PagingSpec;
+import org.apache.drill.exec.store.druid.druid.SelectQuery;
+import org.apache.drill.exec.store.druid.rest.DruidQueryClient;
+import org.apache.drill.exec.vector.complex.fn.JsonReader;
+import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
+import org.apache.drill.shaded.guava.com.google.common.collect.Sets;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
+
+public class DruidRecordReader extends AbstractRecordReader {
+
+  private static final Logger logger = LoggerFactory.getLogger(DruidRecordReader.class);
+  private static final int BATCH_SIZE = 1000;
+  private final DruidStoragePlugin plugin;
+  private final DruidSubScan.DruidSubScanSpec scanSpec;
+  private final List<String> dimensions;
+  private final DruidFilter filter;
+  private ArrayList<PagingIdentifier> pagingIdentifiers = new ArrayList<>();
+
+  private JsonReader jsonReader;
+  private VectorContainerWriter writer;
+
+  private OutputMutator output;
+  private OperatorContext context;

Review comment:
       Please convert `context` to a local variable.

##########
File path: contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/rest/DruidQueryClient.java
##########
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.druid.rest;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.drill.exec.store.druid.druid.DruidSelectResponse;
+import org.apache.drill.exec.store.druid.druid.PagingIdentifier;
+import org.apache.http.HttpResponse;
+import org.apache.http.HttpStatus;
+import org.apache.http.util.EntityUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.Map;
+
+public class DruidQueryClient {
+
+  private static final Logger logger = LoggerFactory.getLogger(DruidQueryClient.class);
+
+  private static final String QUERY_BASE_URI = "/druid/v2";
+  private static final ObjectMapper mapper = new ObjectMapper();
+
+  private RestClient restClient;
+  private String queryUrl;
+
+  public DruidQueryClient(String brokerURI, RestClient restClient) {
+    queryUrl = brokerURI + QUERY_BASE_URI;
+    this.restClient = restClient;
+    logger.debug("Initialized DruidQueryClient with druidURL - " + this.queryUrl);
+  }
+
+  public DruidSelectResponse executeQuery(String query) throws Exception {
+    logger.debug("Executing Query - " + query);

Review comment:
       Same comment about loggers

##########
File path: contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidFilterBuilder.java
##########
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.druid;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.drill.common.FunctionNames;
+import org.apache.drill.common.expression.BooleanOperator;
+import org.apache.drill.common.expression.FunctionCall;
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.common.expression.visitors.AbstractExprVisitor;
+import org.apache.drill.exec.store.druid.common.DruidFilter;
+import org.apache.drill.exec.store.druid.common.DruidUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+public class DruidFilterBuilder extends
+  AbstractExprVisitor<DruidScanSpec, Void, RuntimeException> {
+
+  private static final Logger logger = LoggerFactory.getLogger(DruidFilterBuilder.class);
+
+  private final DruidGroupScan groupScan;
+  private final LogicalExpression le;
+  private final DruidScanSpecBuilder druidScanSpecBuilder;
+  private boolean allExpressionsConverted = true;
+
+  public DruidFilterBuilder(DruidGroupScan groupScan,
+                            LogicalExpression conditionExp) {
+    this.groupScan = groupScan;
+    this.le = conditionExp;
+    this.druidScanSpecBuilder = new DruidScanSpecBuilder();
+  }
+
+  public DruidScanSpec parseTree() throws JsonProcessingException {

Review comment:
       If this doesn't throw an exception, please remove the `throws` clause.

##########
File path: contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidPushDownFilterForScan.java
##########
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.druid;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rex.RexNode;
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.exec.planner.logical.DrillOptiq;
+import org.apache.drill.exec.planner.logical.DrillParseContext;
+import org.apache.drill.exec.planner.logical.RelOptHelper;
+import org.apache.drill.exec.planner.physical.FilterPrel;
+import org.apache.drill.exec.planner.physical.PrelUtil;
+import org.apache.drill.exec.planner.physical.ScanPrel;
+import org.apache.drill.exec.store.StoragePluginOptimizerRule;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DruidPushDownFilterForScan extends StoragePluginOptimizerRule {
+
+  private static final Logger logger = LoggerFactory.getLogger(DruidPushDownFilterForScan.class);
+
+  public static final StoragePluginOptimizerRule INSTANCE = new DruidPushDownFilterForScan();
+
+  private DruidPushDownFilterForScan() {
+    super(
+      RelOptHelper.some(FilterPrel.class, RelOptHelper.any(ScanPrel.class)),
+      "DruidPushDownFilterForScan");
+  }
+
+  @Override
+  public void onMatch(RelOptRuleCall relOptRuleCall) {
+    final ScanPrel scan = (ScanPrel) relOptRuleCall.rel(1);
+    final FilterPrel filter = (FilterPrel) relOptRuleCall.rel(0);
+    final RexNode condition = filter.getCondition();
+
+    DruidGroupScan groupScan = (DruidGroupScan) scan.getGroupScan();
+    if (groupScan.isFilterPushedDown()) {
+      return;
+    }
+
+    LogicalExpression conditionExp =
+      DrillOptiq.toDrill(
+        new DrillParseContext(PrelUtil.getPlannerSettings(relOptRuleCall.getPlanner())),
+        scan,
+        condition);
+
+    DruidFilterBuilder druidFilterBuilder =
+      new DruidFilterBuilder(groupScan, conditionExp);
+
+    DruidScanSpec newScanSpec = null;
+    try {
+      newScanSpec = druidFilterBuilder.parseTree();
+    } catch (JsonProcessingException e) {
+      logger.error("Error in onMatch. Exception - " + e.getMessage());
+    }
+    if (newScanSpec == null) {
+      return; // no filter pushdown so nothing to apply.
+    }
+
+    DruidGroupScan newGroupsScan =
+        new DruidGroupScan(
+            groupScan.getUserName(),
+            groupScan.getStoragePlugin(),
+            newScanSpec,
+            groupScan.getColumns());
+    newGroupsScan.setFilterPushedDown(true);
+
+    final ScanPrel newScanPrel = ScanPrel.create(scan, filter.getTraitSet(),
+      newGroupsScan, scan.getRowType());
+    if (druidFilterBuilder.isAllExpressionsConverted()) {
+      /*
+       * Since we could convert the entire filter condition expression into an
+       * Druid filter, we can eliminate the filter operator altogether.
+       */
+      relOptRuleCall.transformTo(newScanPrel);
+    } else {
+      relOptRuleCall.transformTo(filter.copy(filter.getTraitSet(),
+        ImmutableList.of((RelNode) newScanPrel)));
+    }
+  }
+
+  @Override
+  public boolean matches(RelOptRuleCall call) {
+    final ScanPrel scan = (ScanPrel) call.rel(1);

Review comment:
       One more redundant cast: the explicit `(ScanPrel)` on `call.rel(1)` can be dropped.

##########
File path: contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidPushDownFilterForScan.java
##########
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.druid;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rex.RexNode;
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.exec.planner.logical.DrillOptiq;
+import org.apache.drill.exec.planner.logical.DrillParseContext;
+import org.apache.drill.exec.planner.logical.RelOptHelper;
+import org.apache.drill.exec.planner.physical.FilterPrel;
+import org.apache.drill.exec.planner.physical.PrelUtil;
+import org.apache.drill.exec.planner.physical.ScanPrel;
+import org.apache.drill.exec.store.StoragePluginOptimizerRule;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DruidPushDownFilterForScan extends StoragePluginOptimizerRule {
+
+  private static final Logger logger = LoggerFactory.getLogger(DruidPushDownFilterForScan.class);
+
+  public static final StoragePluginOptimizerRule INSTANCE = new DruidPushDownFilterForScan();
+
+  private DruidPushDownFilterForScan() {
+    super(
+      RelOptHelper.some(FilterPrel.class, RelOptHelper.any(ScanPrel.class)),
+      "DruidPushDownFilterForScan");
+  }
+
+  @Override
+  public void onMatch(RelOptRuleCall relOptRuleCall) {
+    final ScanPrel scan = (ScanPrel) relOptRuleCall.rel(1);

Review comment:
       Please remove redundant casts.

##########
File path: contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidSubScan.java
##########
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.druid;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.logical.StoragePluginConfig;
+import org.apache.drill.exec.physical.base.AbstractBase;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.PhysicalVisitor;
+import org.apache.drill.exec.physical.base.SubScan;
+import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.drill.exec.store.druid.common.DruidFilter;
+import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+
+import static java.util.Collections.emptyIterator;
+
+/**
+ * A Class containing information to read a single druid data source.
+ */
+@JsonTypeName("druid-datasource-scan")
+public class DruidSubScan extends AbstractBase implements SubScan {
+
+  static final Logger logger = LoggerFactory.getLogger(DruidSubScan.class);

Review comment:
       Remove logger if unused.

##########
File path: contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidRecordReader.java
##########
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.druid;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.physical.impl.OutputMutator;
+import org.apache.drill.exec.store.AbstractRecordReader;
+import org.apache.drill.exec.store.druid.common.DruidFilter;
+import org.apache.drill.exec.store.druid.druid.DruidSelectResponse;
+import org.apache.drill.exec.store.druid.druid.PagingIdentifier;
+import org.apache.drill.exec.store.druid.druid.PagingSpec;
+import org.apache.drill.exec.store.druid.druid.SelectQuery;
+import org.apache.drill.exec.store.druid.rest.DruidQueryClient;
+import org.apache.drill.exec.vector.complex.fn.JsonReader;
+import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
+import org.apache.drill.shaded.guava.com.google.common.collect.Sets;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
+
+public class DruidRecordReader extends AbstractRecordReader {
+
+  private static final Logger logger = LoggerFactory.getLogger(DruidRecordReader.class);
+  private static final int BATCH_SIZE = 1000;
+  private final DruidStoragePlugin plugin;
+  private final DruidSubScan.DruidSubScanSpec scanSpec;
+  private final List<String> dimensions;
+  private final DruidFilter filter;
+  private ArrayList<PagingIdentifier> pagingIdentifiers = new ArrayList<>();
+
+  private JsonReader jsonReader;
+  private VectorContainerWriter writer;
+
+  private OutputMutator output;
+  private OperatorContext context;
+  private final FragmentContext fragmentContext;
+
+  private ObjectMapper objectMapper = new ObjectMapper();
+
+  public DruidRecordReader(DruidSubScan.DruidSubScanSpec subScanSpec, List<SchemaPath> projectedColumns,
+                           FragmentContext context, DruidStoragePlugin plugin) {
+    dimensions = new ArrayList<>();
+    setColumns(projectedColumns);
+    this.plugin = plugin;
+    scanSpec = subScanSpec;
+    fragmentContext = context;
+    this.filter = subScanSpec.getFilter();
+  }
+
+  @Override
+  protected Collection<SchemaPath> transformColumns(Collection<SchemaPath> projectedColumns) {
+    Set<SchemaPath> transformed = Sets.newLinkedHashSet();
+    if (isStarQuery()) {
+      transformed.add(SchemaPath.STAR_COLUMN);
+    } else {
+      for (SchemaPath column : projectedColumns) {
+        String fieldName = column.getRootSegment().getPath();
+        transformed.add(column);
+        this.dimensions.add(fieldName);
+      }
+    }
+    return transformed;
+  }
+
+  @Override
+  public void setup(OperatorContext context, OutputMutator output) {
+    this.context = context;
+    this.output = output;
+    this.writer = new VectorContainerWriter(output);
+
+    this.jsonReader =
+      new JsonReader.Builder(fragmentContext.getManagedBuffer())
+        .schemaPathColumns(ImmutableList.copyOf(getColumns()))
+        .skipOuterList(true)
+        .build();
+    logger.debug(" Initialized JsonRecordReader. ");
+  }
+
+  @Override
+  public int next() {
+    writer.allocate();
+    writer.reset();
+    SelectQuery selectQuery = new SelectQuery(scanSpec.dataSourceName);
+    selectQuery.setDimensions(this.dimensions);
+    if (this.filter != null) {
+      selectQuery.setFilter(this.filter.toJson());
+    }
+
+    ObjectNode paging = objectMapper.createObjectNode();
+    if (this.pagingIdentifiers != null && !this.pagingIdentifiers.isEmpty()) {
+      for (PagingIdentifier pagingIdentifier : this.pagingIdentifiers) {
+        paging.put(pagingIdentifier.getSegmentName(), pagingIdentifier.getSegmentOffset());
+      }
+    }
+
+    PagingSpec pagingSpec = new PagingSpec(paging, BATCH_SIZE);
+    selectQuery.setPagingSpec(pagingSpec);
+
+    DruidQueryClient druidQueryClient = plugin.getDruidQueryClient();
+
+    try {
+      String query = selectQuery.toJson();
+      logger.debug("Executing DRUID query - " + query);
+      DruidSelectResponse druidSelectResponse = druidQueryClient.executeQuery(query);
+      ArrayList<PagingIdentifier> newPagingIdentifiers = druidSelectResponse.getPagingIdentifiers();
+
+      ArrayList<String> newPagingIdentifierNames = new ArrayList<>();
+      for (PagingIdentifier pagingIdentifier : newPagingIdentifiers) {
+        newPagingIdentifierNames.add(pagingIdentifier.getSegmentName());
+      }
+
+      for (PagingIdentifier pagingIdentifier : this.pagingIdentifiers) {
+        if (!newPagingIdentifierNames.contains(pagingIdentifier.getSegmentName())) {
+          newPagingIdentifiers.add(
+            new PagingIdentifier(pagingIdentifier.getSegmentName(),
+              pagingIdentifier.getSegmentOffset() + 1)
+          );
+        }
+      }
+
+      //update the paging identifiers
+      this.pagingIdentifiers = newPagingIdentifiers;
+
+      int docCount = 0;
+      for (ObjectNode eventNode : druidSelectResponse.getEvents()) {
+        writer.setPosition(docCount);

Review comment:
       I don't know if you've been following the PRs for the new JSON readers that @paul-rogers was working on, but I would suggest taking a look, as they can greatly simplify this class. Basically, you can pass the new readers an `InputStream` containing the JSON data.
   
   Take a look here:
   https://github.com/apache/drill/blob/5a067daf114ebdc61334989bfcdc249fb335f68d/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpBatchReader.java#L52-L96
   

##########
File path: contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidStoragePlugin.java
##########
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.druid;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.drill.common.JSONOptions;
+import org.apache.drill.exec.ops.OptimizerRulesContext;
+import org.apache.drill.exec.physical.base.AbstractGroupScan;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.store.AbstractStoragePlugin;
+import org.apache.drill.exec.store.SchemaConfig;
+import org.apache.drill.exec.store.StoragePluginOptimizerRule;
+import org.apache.drill.exec.store.druid.rest.DruidAdminClient;
+import org.apache.drill.exec.store.druid.rest.DruidQueryClient;
+import org.apache.drill.exec.store.druid.rest.RestClient;
+import org.apache.drill.exec.store.druid.rest.RestClientWrapper;
+import org.apache.drill.exec.store.druid.schema.DruidSchemaFactory;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Set;
+
+public class DruidStoragePlugin extends AbstractStoragePlugin {
+
+  static final Logger logger = LoggerFactory.getLogger(DruidStoragePlugin.class);

Review comment:
       Actually, please remove the logger, as it is unused in this class.

##########
File path: contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidRecordReader.java
##########
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.druid;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.physical.impl.OutputMutator;
+import org.apache.drill.exec.store.AbstractRecordReader;
+import org.apache.drill.exec.store.druid.common.DruidFilter;
+import org.apache.drill.exec.store.druid.druid.DruidSelectResponse;
+import org.apache.drill.exec.store.druid.druid.PagingIdentifier;
+import org.apache.drill.exec.store.druid.druid.PagingSpec;
+import org.apache.drill.exec.store.druid.druid.SelectQuery;
+import org.apache.drill.exec.store.druid.rest.DruidQueryClient;
+import org.apache.drill.exec.vector.complex.fn.JsonReader;
+import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
+import org.apache.drill.shaded.guava.com.google.common.collect.Sets;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
+
+public class DruidRecordReader extends AbstractRecordReader {
+
+  private static final Logger logger = LoggerFactory.getLogger(DruidRecordReader.class);
+  private static final int BATCH_SIZE = 1000;
+  private final DruidStoragePlugin plugin;
+  private final DruidSubScan.DruidSubScanSpec scanSpec;
+  private final List<String> dimensions;
+  private final DruidFilter filter;
+  private ArrayList<PagingIdentifier> pagingIdentifiers = new ArrayList<>();
+
+  private JsonReader jsonReader;
+  private VectorContainerWriter writer;
+
+  private OutputMutator output;

Review comment:
       Please convert this to a local variable.

##########
File path: contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidGroupScan.java
##########
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.druid;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.physical.EndpointAffinity;
+import org.apache.drill.exec.physical.base.AbstractGroupScan;
+import org.apache.drill.exec.physical.base.GroupScan;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.ScanStats;
+import org.apache.drill.exec.proto.CoordinationProtos;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+
+import org.apache.drill.exec.store.schedule.AffinityCreator;
+import org.apache.drill.exec.store.schedule.AssignmentCreator;
+import org.apache.drill.exec.store.schedule.CompleteWork;
+import org.apache.drill.exec.store.schedule.EndpointByteMap;
+import org.apache.drill.exec.store.schedule.EndpointByteMapImpl;
+import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
+import org.apache.drill.shaded.guava.com.google.common.collect.ListMultimap;
+import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+
+@JsonTypeName("druid-scan")
+public class DruidGroupScan extends AbstractGroupScan {
+
+  private static final Logger logger = LoggerFactory.getLogger(DruidGroupScan.class);
+  private static final long DEFAULT_TABLET_SIZE = 1000;
+  // assuming a 500 MB segment size with 5 millions rows per segment
+  private static final int DEFAULT_ROW_SIZE = 100;
+  private final DruidScanSpec scanSpec;
+  private final DruidStoragePlugin storagePlugin;
+
+  private List<SchemaPath> columns;
+  private boolean filterPushedDown = false;
+  private List<DruidWork> druidWorkList = new ArrayList<>();
+  private ListMultimap<Integer,DruidWork> assignments;
+  private List<EndpointAffinity> affinities;
+
+  @JsonCreator
+  public DruidGroupScan(@JsonProperty("userName") String userName,
+                        @JsonProperty("scanSpec") DruidScanSpec scanSpec,
+                        @JsonProperty("storagePluginConfig") DruidStoragePluginConfig storagePluginConfig,
+                        @JsonProperty("columns") List<SchemaPath> columns,
+                        @JacksonInject StoragePluginRegistry pluginRegistry) {
+    this(userName,
+        pluginRegistry.resolve(storagePluginConfig, DruidStoragePlugin.class),
+        scanSpec,
+        columns);
+  }
+
+  public DruidGroupScan(String userName, DruidStoragePlugin storagePlugin, DruidScanSpec scanSpec,
+                        List<SchemaPath> columns) {
+    super(userName);
+    this.storagePlugin = storagePlugin;
+    this.scanSpec = scanSpec;
+    this.columns = columns == null || columns.size() == 0? ALL_COLUMNS : columns;
+    init();
+  }
+
+  /**
+   * Private constructor, used for cloning.
+   * @param that The DruidGroupScan to clone
+   */
+  private DruidGroupScan(DruidGroupScan that) {
+    super(that);
+    this.columns = that.columns;
+    this.scanSpec = that.scanSpec;
+    this.storagePlugin = that.storagePlugin;
+    this.filterPushedDown = that.filterPushedDown;
+    this.druidWorkList = that.druidWorkList;
+    this.assignments = that.assignments;
+  }
+
+  @Override
+  public GroupScan clone(List<SchemaPath> columns) {
+    DruidGroupScan newScan = new DruidGroupScan(this);
+    newScan.columns = columns;
+    return newScan;
+  }
+
+  @Override
+  public List<EndpointAffinity> getOperatorAffinity() {
+    if (affinities == null) {
+      affinities = AffinityCreator.getAffinityMap(druidWorkList);
+    }
+    return affinities;
+  }
+
+  @Override
+  public boolean canPushdownProjects(List<SchemaPath> columns) {
+    return true;
+  }
+
+  @JsonIgnore
+  public boolean isFilterPushedDown() {
+    return filterPushedDown;
+  }
+
+  @JsonIgnore
+  public void setFilterPushedDown(boolean filterPushedDown) {
+    this.filterPushedDown = filterPushedDown;
+  }
+
+  private void init() {
+    logger.debug("Adding Druid Work for Table - " + getTableName() + " Filter - " + getScanSpec().getFilter());
+
+    DruidWork druidWork =
+      new DruidWork(
+        new DruidSubScan.DruidSubScanSpec(
+          getTableName(),
+          getScanSpec().getFilter()
+        )
+      );
+    druidWorkList.add(druidWork);
+  }
+
+  private static class DruidWork implements CompleteWork {
+    private final EndpointByteMapImpl byteMap = new EndpointByteMapImpl();
+    private final DruidSubScan.DruidSubScanSpec druidSubScanSpec;
+
+    public DruidWork(DruidSubScan.DruidSubScanSpec druidSubScanSpec) {
+      this.druidSubScanSpec = druidSubScanSpec;
+    }
+
+    public DruidSubScan.DruidSubScanSpec getDruidSubScanSpec() {
+      return druidSubScanSpec;
+    }
+
+    @Override
+    public long getTotalBytes() {
+      return DEFAULT_TABLET_SIZE;
+    }
+
+    @Override
+    public EndpointByteMap getByteMap() {
+      return byteMap;
+    }
+
+    @Override
+    public int compareTo(CompleteWork o) {
+      return 0;
+    }
+  }
+
+  //TODO - MAY GET MORE PRECISE COUNT FROM DRUID ITSELF.
+  public ScanStats getScanStats() {
+    long recordCount = 100000 * druidWorkList.size();
+    return new ScanStats(
+        ScanStats.GroupScanProperty.NO_EXACT_ROW_COUNT,
+        recordCount,
+        1,
+        recordCount * DEFAULT_ROW_SIZE);
+  }
+

Review comment:
       To ensure that filter and other pushdowns actually happen, this function might need some work. The idea here is that a group scan with the filters pushed down should have a lower cost than one without them. Take a look at the HTTP plugin's group scan here:
   
   https://github.com/apache/drill/blob/5a067daf114ebdc61334989bfcdc249fb335f68d/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpGroupScan.java#L185-L220
   
   

##########
File path: contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/rest/DruidQueryClient.java
##########
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.druid.rest;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.drill.exec.store.druid.druid.DruidSelectResponse;
+import org.apache.drill.exec.store.druid.druid.PagingIdentifier;
+import org.apache.http.HttpResponse;
+import org.apache.http.HttpStatus;
+import org.apache.http.util.EntityUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.Map;
+
+public class DruidQueryClient {
+
+  private static final Logger logger = LoggerFactory.getLogger(DruidQueryClient.class);
+
+  private static final String QUERY_BASE_URI = "/druid/v2";
+  private static final ObjectMapper mapper = new ObjectMapper();
+
+  private RestClient restClient;

Review comment:
       These two variables can be `final`.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org