Posted to commits@drill.apache.org by cg...@apache.org on 2022/12/12 17:28:49 UTC

[drill] branch master updated: DRILL-8358: Storage plugin for querying other Apache Drill clusters (#2709)

This is an automated email from the ASF dual-hosted git repository.

cgivre pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git


The following commit(s) were added to refs/heads/master by this push:
     new 314105cf9e DRILL-8358: Storage plugin for querying other Apache Drill clusters (#2709)
314105cf9e is described below

commit 314105cf9e24326c60f10b26a37c8d2ee5a23786
Author: Volodymyr Vysotskyi <vv...@gmail.com>
AuthorDate: Mon Dec 12 19:28:42 2022 +0200

    DRILL-8358: Storage plugin for querying other Apache Drill clusters (#2709)
---
 .../drill/exec/store/excel/ExcelBatchReader.java   |   4 +
 .../drill/exec/store/excel/TestExcelFormat.java    |  17 +-
 contrib/pom.xml                                    |   1 +
 contrib/storage-drill/README.md                    |  36 +++
 contrib/storage-drill/pom.xml                      |  80 ++++++
 .../exec/store/drill/plugin/DrillGroupScan.java    | 156 +++++++++++
 .../exec/store/drill/plugin/DrillRecordReader.java | 238 ++++++++++++++++
 .../store/drill/plugin/DrillScanBatchCreator.java  |  27 +-
 .../exec/store/drill/plugin/DrillScanSpec.java     |  69 +++++
 .../store/drill/plugin/DrillStoragePlugin.java     | 143 ++++++++++
 .../drill/plugin/DrillStoragePluginConfig.java     | 169 +++++++++++
 .../exec/store/drill/plugin/DrillSubScan.java      |  91 ++++++
 .../exec/store/drill/plugin/package-info.java      |  23 +-
 .../drill/plugin/plan/DrillPluginImplementor.java  | 208 ++++++++++++++
 .../drill/plugin/schema/DrillPluginSchema.java     | 139 +++++++++
 .../drill/plugin/schema/DrillSchemaFactory.java    |  68 +++++
 .../main/resources/bootstrap-storage-plugins.json  |   9 +
 .../src/main/resources/drill-module.conf           |  24 ++
 .../store/drill/plugin/DrillPluginQueriesTest.java | 312 +++++++++++++++++++++
 distribution/pom.xml                               |   5 +
 distribution/src/assemble/component.xml            |   1 +
 .../java/org/apache/drill/exec/ExecConstants.java  |   2 +
 .../drill/exec/planner/physical/LeafPrel.java      |   7 +
 .../physical/visitor/StarColumnConverter.java      |   3 +-
 .../physical/visitor/TopProjectVisitor.java        |  10 +-
 .../exec/rpc/user/BlockingResultsListener.java     | 252 +++++++++++++++++
 .../exec/store/enumerable/plan/EnumerablePrel.java |   6 -
 .../drill/exec/store/plan/rel/PluginPrel.java      |  17 +-
 .../java-exec/src/main/resources/drill-module.conf |   3 +-
 .../exec/planner/rm/TestMemoryCalculator.java      |   2 +-
 .../org/apache/drill/jdbc/impl/DrillCursor.java    | 244 +---------------
 .../apache/drill/jdbc/impl/DrillResultSetImpl.java |   4 +-
 .../apache/drill/jdbc/PreparedStatementTest.java   |   4 +-
 .../java/org/apache/drill/jdbc/StatementTest.java  |   4 +-
 34 files changed, 2066 insertions(+), 312 deletions(-)
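
In outline, the new plugin generates SQL for the pushed-down part of a plan, submits it to the remote cluster through an ordinary DrillClient, and streams the returned record batches straight into the local fragment. A condensed sketch of that flow, simplified from the DrillRecordReader added below (error handling and operator wiring omitted; BlockingResultsListener is also introduced by this commit):

```java
import org.apache.drill.exec.client.DrillClient;
import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.proto.UserBitShared.QueryType;
import org.apache.drill.exec.record.RecordBatchLoader;
import org.apache.drill.exec.rpc.user.BlockingResultsListener;
import org.apache.drill.exec.rpc.user.QueryDataBatch;
import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch;

class RemoteBatchSketch {
  static void streamRemoteBatches(DrillClient client, BufferAllocator allocator,
      String sql, long queryTimeout, int queueThreshold) throws Exception {
    Stopwatch stopwatch = Stopwatch.createStarted();
    BlockingResultsListener listener =
        new BlockingResultsListener(() -> stopwatch, () -> queryTimeout, queueThreshold);
    client.runQuery(QueryType.SQL, sql, listener); // submit the pushed-down SQL remotely
    listener.awaitFirstMessage();                  // block until the first response arrives

    RecordBatchLoader loader = new RecordBatchLoader(allocator);
    QueryDataBatch batch;
    while ((batch = listener.getNext()) != null) {
      // load() binds the incoming buffers into local value vectors;
      // no per-row conversion happens, unlike the JDBC route.
      loader.load(batch.getHeader().getDef(), batch.getData());
      batch.release();
      // ... downstream operators consume loader.getContainer() here ...
    }
  }
}
```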

diff --git a/contrib/format-excel/src/main/java/org/apache/drill/exec/store/excel/ExcelBatchReader.java b/contrib/format-excel/src/main/java/org/apache/drill/exec/store/excel/ExcelBatchReader.java
index 3af6cc74e1..baa99b74b0 100644
--- a/contrib/format-excel/src/main/java/org/apache/drill/exec/store/excel/ExcelBatchReader.java
+++ b/contrib/format-excel/src/main/java/org/apache/drill/exec/store/excel/ExcelBatchReader.java
@@ -20,6 +20,7 @@ package org.apache.drill.exec.store.excel;
 
 import com.github.pjfanning.xlsx.StreamingReader;
 import com.github.pjfanning.xlsx.impl.StreamingWorkbook;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.drill.common.exceptions.CustomErrorContext;
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.types.TypeProtos.DataMode;
@@ -381,6 +382,9 @@ public class ExcelBatchReader implements ManagedReader {
 
             // Remove leading and trailing whitespace
             tempColumnName = tempColumnName.trim();
+            if (StringUtils.isEmpty(tempColumnName)) {
+              tempColumnName = MISSING_FIELD_NAME_HEADER + (colPosition + 1);
+            }
             tempColumnName = deconflictColumnNames(tempColumnName);
             makeColumn(builder, tempColumnName, MinorType.FLOAT8);
             excelFieldNames.add(colPosition, tempColumnName);
diff --git a/contrib/format-excel/src/test/java/org/apache/drill/exec/store/excel/TestExcelFormat.java b/contrib/format-excel/src/test/java/org/apache/drill/exec/store/excel/TestExcelFormat.java
index 9fe9f8e52b..960fcbe37d 100644
--- a/contrib/format-excel/src/test/java/org/apache/drill/exec/store/excel/TestExcelFormat.java
+++ b/contrib/format-excel/src/test/java/org/apache/drill/exec/store/excel/TestExcelFormat.java
@@ -710,6 +710,7 @@ public class TestExcelFormat extends ClusterTest {
 
     RowSet results = client.queryBuilder().sql(sql).rowSet();
     TupleMetadata expectedSchema = new SchemaBuilder()
+      .addNullable("field_1", MinorType.FLOAT8)
       .addNullable("original_id", MinorType.VARCHAR)
       .addNullable("original_nameFirst", MinorType.VARCHAR)
       .addNullable("original_nameLast", MinorType.VARCHAR)
@@ -722,14 +723,14 @@ public class TestExcelFormat extends ClusterTest {
       .buildSchema();
 
     RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
-      .addRow("XXXX00000001", "James", "Kushner", null, null, 10235, "US", LocalDate.parse("1957-04-18").atStartOfDay().toInstant(ZoneOffset.UTC), "NY")
-      .addRow("XXXX00000002", "Steve", "Hecht", null, null, 11213, "US", LocalDate.parse("1982-08-10").atStartOfDay().toInstant(ZoneOffset.UTC), "NY")
-      .addRow("XXXX00000003", "Ethan", "Stein", null, null, 10028, "US", LocalDate.parse("1991-04-11").atStartOfDay().toInstant(ZoneOffset.UTC), "NY")
-      .addRow("XXXX00000004", "Mohammed", "Fatima", null, "Baltimore", 21202, "US", LocalDate.parse("1990-05-15").atStartOfDay().toInstant(ZoneOffset.UTC), "MD")
-      .addRow("XXXX00000005", "Yakov", "Borodin", null, "Teaneck", 7666, "US", LocalDate.parse("1986-12-20").atStartOfDay().toInstant(ZoneOffset.UTC), "NJ")
-      .addRow("XXXX00000006", "Akhil", "Chavda", null, null, null, "US", null, null)
-      .addRow("XXXX00000007", "Mark", "Rahman", null, "Ellicott City", 21043, null, LocalDate.parse("1974-06-13").atStartOfDay().toInstant(ZoneOffset.UTC), "MD")
-      .addRow("XXXX00000008", "Henry", "Smith", "xxxx@gmail.com", null, null, null, null, null)
+      .addRow(0F, "XXXX00000001", "James", "Kushner", null, null, 10235, "US", LocalDate.parse("1957-04-18").atStartOfDay().toInstant(ZoneOffset.UTC), "NY")
+      .addRow(1F, "XXXX00000002", "Steve", "Hecht", null, null, 11213, "US", LocalDate.parse("1982-08-10").atStartOfDay().toInstant(ZoneOffset.UTC), "NY")
+      .addRow(2F, "XXXX00000003", "Ethan", "Stein", null, null, 10028, "US", LocalDate.parse("1991-04-11").atStartOfDay().toInstant(ZoneOffset.UTC), "NY")
+      .addRow(3F, "XXXX00000004", "Mohammed", "Fatima", null, "Baltimore", 21202, "US", LocalDate.parse("1990-05-15").atStartOfDay().toInstant(ZoneOffset.UTC), "MD")
+      .addRow(4F, "XXXX00000005", "Yakov", "Borodin", null, "Teaneck", 7666, "US", LocalDate.parse("1986-12-20").atStartOfDay().toInstant(ZoneOffset.UTC), "NJ")
+      .addRow(5F, "XXXX00000006", "Akhil", "Chavda", null, null, null, "US", null, null)
+      .addRow(6F, "XXXX00000007", "Mark", "Rahman", null, "Ellicott City", 21043, null, LocalDate.parse("1974-06-13").atStartOfDay().toInstant(ZoneOffset.UTC), "MD")
+      .addRow(7F, "XXXX00000008", "Henry", "Smith", "xxxx@gmail.com", null, null, null, null, null)
       .build();
 
     new RowSetComparison(expected).verifyAndClearAll(results);
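
Aside from the new plugin, the commit also hardens the Excel reader: a header cell that is blank after trimming now receives a synthesized column name. Rendered standalone, and assuming MISSING_FIELD_NAME_HEADER is the `field_` prefix that the test's expected `field_1` column suggests, the added branch behaves like this:

```java
// Standalone sketch of the blank-header handling added above; the "field_"
// prefix is an assumption inferred from the updated test expectations.
int colPosition = 0;
String tempColumnName = "   ".trim(); // a blank header cell trims to ""
if (StringUtils.isEmpty(tempColumnName)) {
  tempColumnName = "field_" + (colPosition + 1); // first column becomes "field_1"
}
```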
diff --git a/contrib/pom.xml b/contrib/pom.xml
index e728da95d3..476bc7f063 100644
--- a/contrib/pom.xml
+++ b/contrib/pom.xml
@@ -58,6 +58,7 @@
     <module>format-pcapng</module>
     <module>format-iceberg</module>
     <module>format-deltalake</module>
+    <module>storage-drill</module>
     <module>storage-phoenix</module>
     <module>storage-googlesheets</module>
     <module>storage-hive</module>
diff --git a/contrib/storage-drill/README.md b/contrib/storage-drill/README.md
new file mode 100644
index 0000000000..0f4491ba54
--- /dev/null
+++ b/contrib/storage-drill/README.md
@@ -0,0 +1,36 @@
+# Apache Drill storage plugin
+
+This storage plugin allows Drill to query other Drill clusters.
+Unlike the JDBC driver, this plugin performs no extra conversion of the input data and passes it
+as is to the consuming operators. Like the JDBC driver, however, it fetches data batches only when
+it is ready to process them, which avoids memory issues.
+
+## Supported optimizations and features
+
+The Drill storage plugin supports push-down for every operator Drill has.
+It determines which parts of the query plan can be pushed down and converts them to SQL queries
+that are submitted to the underlying Drill cluster.
+
+## Configuration
+
+Drill storage plugin has the following configuration properties:
+
+- `type` - storage plugin type, should be `'drill'`
+- `connection` - JDBC connection string used to connect to the underlying Drill cluster. Please refer to the
+  [Configuration](https://drill.apache.org/docs/using-the-jdbc-driver/#using-the-jdbc-url-for-a-random-drillbit-connection) page for more details
+- `properties` - connection properties. Please refer to the same
+  [Configuration](https://drill.apache.org/docs/using-the-jdbc-driver/#using-the-jdbc-url-for-a-random-drillbit-connection) page for more details
+- `credentialsProvider` - credentials provider
+
+### Storage config example:
+
+```json
+{
+  "storage":{
+    "drill" : {
+      "type":"drill",
+      "connection":"jdbc:drill:drillbit=localhost:31010",
+      "enabled": false
+    }
+  }
+}
+```
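
Once a config like the one above is enabled, the remote cluster's schemas appear as sub-schemas under the plugin name. A hedged usage sketch in the style of Drill's test client (the plugin name `drill` and the table path are illustrative assumptions):

```java
// Assumes the plugin is registered as "drill" and the remote cluster exposes
// a table at dfs.tmp.`t.json`; both names are hypothetical.
String sql = "SELECT * FROM drill.dfs.tmp.`t.json`";
RowSet results = client.queryBuilder().sql(sql).rowSet();
```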
diff --git a/contrib/storage-drill/pom.xml b/contrib/storage-drill/pom.xml
new file mode 100644
index 0000000000..f3b88bb28c
--- /dev/null
+++ b/contrib/storage-drill/pom.xml
@@ -0,0 +1,80 @@
+<?xml version="1.0"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <artifactId>drill-contrib-parent</artifactId>
+    <groupId>org.apache.drill.contrib</groupId>
+    <version>2.0.0-SNAPSHOT</version>
+  </parent>
+
+  <artifactId>drill-storage</artifactId>
+
+  <name>Drill : Contrib : Storage : Drill</name>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.drill.exec</groupId>
+      <artifactId>drill-java-exec</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+
+    <!-- Test dependency -->
+    <dependency>
+      <groupId>org.apache.drill.exec</groupId>
+      <artifactId>drill-java-exec</artifactId>
+      <classifier>tests</classifier>
+      <version>${project.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.drill</groupId>
+      <artifactId>drill-common</artifactId>
+      <classifier>tests</classifier>
+      <version>${project.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.drill.contrib.data</groupId>
+      <artifactId>tpch-sample-data</artifactId>
+      <version>${project.version}</version>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-surefire-plugin</artifactId>
+        <configuration>
+          <systemProperties>
+            <property>
+              <name>logback.log.dir</name>
+              <value>${project.build.directory}/surefire-reports</value>
+            </property>
+          </systemProperties>
+        </configuration>
+      </plugin>
+    </plugins>
+  </build>
+
+</project>
diff --git a/contrib/storage-drill/src/main/java/org/apache/drill/exec/store/drill/plugin/DrillGroupScan.java b/contrib/storage-drill/src/main/java/org/apache/drill/exec/store/drill/plugin/DrillGroupScan.java
new file mode 100644
index 0000000000..48c60ec99b
--- /dev/null
+++ b/contrib/storage-drill/src/main/java/org/apache/drill/exec/store/drill/plugin/DrillGroupScan.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.drill.plugin;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.calcite.avatica.util.Casing;
+import org.apache.calcite.sql.SqlDialect;
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+import org.apache.drill.common.PlanStringBuilder;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.physical.base.AbstractGroupScan;
+import org.apache.drill.exec.physical.base.GroupScan;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.ScanStats;
+import org.apache.drill.exec.physical.base.SubScan;
+import org.apache.drill.exec.planner.sql.conversion.SqlConverter;
+import org.apache.drill.exec.proto.CoordinationProtos;
+
+import java.util.List;
+
+public class DrillGroupScan extends AbstractGroupScan {
+  private static final double ROWS = 1e6;
+
+  private final DrillStoragePluginConfig pluginConfig;
+  private final DrillScanSpec scanSpec;
+
+  @JsonCreator
+  public DrillGroupScan(
+      @JsonProperty("userName") String userName,
+      @JsonProperty("pluginConfig") DrillStoragePluginConfig pluginConfig,
+      @JsonProperty("scanSpec") DrillScanSpec scanSpec) {
+    super(userName);
+    this.pluginConfig = pluginConfig;
+    this.scanSpec = scanSpec;
+  }
+
+  public DrillGroupScan(DrillGroupScan that) {
+    super(that);
+    this.pluginConfig = that.pluginConfig;
+    this.scanSpec = that.scanSpec;
+  }
+
+  @JsonProperty("pluginConfig")
+  public DrillStoragePluginConfig getPluginConfig() {
+    return pluginConfig;
+  }
+
+  @JsonProperty("scanSpec")
+  public DrillScanSpec getScanSpec() {
+    return scanSpec;
+  }
+
+  @Override
+  public void applyAssignments(List<CoordinationProtos.DrillbitEndpoint> endpoints) {
+  }
+
+  @Override
+  public SubScan getSpecificScan(int minorFragmentId) {
+    return new DrillSubScan(userName, pluginConfig, scanSpec.getQuery());
+  }
+
+  @JsonIgnore
+  public SqlDialect getDialect() {
+    return new SqlDialect(SqlDialect.EMPTY_CONTEXT
+      .withIdentifierQuoteString(pluginConfig.getIdentifierQuoteString())
+      .withConformance(SqlConverter.DRILL_CONFORMANCE)
+      .withUnquotedCasing(Casing.UNCHANGED)
+      .withQuotedCasing(Casing.UNCHANGED));
+  }
+
+  @Override
+  public int getMaxParallelizationWidth() {
+    return 1;
+  }
+
+  @Override
+  public String getDigest() {
+    return toString();
+  }
+
+  @Override
+  public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) {
+    return new DrillGroupScan(this);
+  }
+
+  @Override
+  public GroupScan clone(List<SchemaPath> columns) {
+    return new DrillGroupScan(this);
+  }
+
+  @Override
+  public ScanStats getScanStats() {
+    return new ScanStats(
+      ScanStats.GroupScanProperty.NO_EXACT_ROW_COUNT,
+      (long) Math.max(ROWS, 1),
+      1,
+      1);
+  }
+
+  @Override
+  public String toString() {
+    return new PlanStringBuilder(this)
+      .field("scanSpec", scanSpec)
+      .toString();
+  }
+
+  @Override
+  @JsonIgnore
+  public List<SchemaPath> getColumns() {
+    return super.getColumns();
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+
+    DrillGroupScan that = (DrillGroupScan) o;
+
+    return new EqualsBuilder()
+      .append(getPluginConfig(), that.getPluginConfig())
+      .append(getScanSpec(), that.getScanSpec())
+      .isEquals();
+  }
+
+  @Override
+  public int hashCode() {
+    return new HashCodeBuilder(17, 37)
+      .append(getPluginConfig())
+      .append(getScanSpec())
+      .toHashCode();
+  }
+}
diff --git a/contrib/storage-drill/src/main/java/org/apache/drill/exec/store/drill/plugin/DrillRecordReader.java b/contrib/storage-drill/src/main/java/org/apache/drill/exec/store/drill/plugin/DrillRecordReader.java
new file mode 100644
index 0000000000..d680657511
--- /dev/null
+++ b/contrib/storage-drill/src/main/java/org/apache/drill/exec/store/drill/plugin/DrillRecordReader.java
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.drill.plugin;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.client.DrillClient;
+import org.apache.drill.exec.exception.OutOfMemoryException;
+import org.apache.drill.exec.ops.ExecutorFragmentContext;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.proto.UserBitShared;
+import org.apache.drill.exec.proto.UserBitShared.QueryType;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.CloseableRecordBatch;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.RecordBatchLoader;
+import org.apache.drill.exec.record.TypedFieldId;
+import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.record.VectorWrapper;
+import org.apache.drill.exec.record.WritableBatch;
+import org.apache.drill.exec.record.selection.SelectionVector2;
+import org.apache.drill.exec.record.selection.SelectionVector4;
+import org.apache.drill.exec.rpc.user.BlockingResultsListener;
+import org.apache.drill.exec.rpc.user.QueryDataBatch;
+import org.apache.drill.exec.testing.ControlsInjector;
+import org.apache.drill.exec.testing.ControlsInjectorFactory;
+import org.apache.drill.exec.util.ImpersonationUtil;
+import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.SQLTimeoutException;
+import java.util.Iterator;
+import java.util.Optional;
+
+public class DrillRecordReader implements CloseableRecordBatch {
+  private static final Logger logger = LoggerFactory.getLogger(DrillRecordReader.class);
+
+  private static final ControlsInjector injector = ControlsInjectorFactory.getInjector(DrillRecordReader.class);
+
+  private final DrillClient drillClient;
+  private final RecordBatchLoader batchLoader;
+  private final FragmentContext context;
+
+  private final BlockingResultsListener resultsListener;
+  private final UserBitShared.QueryId id;
+  private BatchSchema schema;
+  private boolean first = true;
+  private final OperatorContext oContext;
+
+  public DrillRecordReader(ExecutorFragmentContext context, DrillSubScan config)
+      throws OutOfMemoryException, ExecutionSetupException {
+    this.context = context;
+    this.oContext = context.newOperatorContext(config);
+    this.batchLoader = new RecordBatchLoader(oContext.getAllocator());
+
+    String userName = Optional.ofNullable(config.getUserName()).orElse(ImpersonationUtil.getProcessUserName());
+    this.drillClient = config.getPluginConfig().getDrillClient(userName, oContext.getAllocator());
+    long queryTimeout = drillClient.getConfig().getLong(ExecConstants.JDBC_QUERY_TIMEOUT);
+    int batchQueueThrottlingThreshold = drillClient.getConfig()
+      .getInt(ExecConstants.JDBC_BATCH_QUEUE_THROTTLING_THRESHOLD);
+    Stopwatch stopwatch = Stopwatch.createStarted();
+    this.resultsListener =
+      new BlockingResultsListener(() -> stopwatch, () -> queryTimeout, batchQueueThrottlingThreshold);
+    this.drillClient.runQuery(QueryType.SQL, config.getQuery(), resultsListener);
+    this.id = resultsListener.getQueryId();
+    try {
+      resultsListener.awaitFirstMessage();
+    } catch (InterruptedException | SQLTimeoutException e) {
+      throw new ExecutionSetupException(e);
+    }
+  }
+
+  @Override
+  public FragmentContext getContext() {
+    return context;
+  }
+
+  @Override
+  public BatchSchema getSchema() {
+    return schema;
+  }
+
+  @Override
+  public int getRecordCount() {
+    return batchLoader.getRecordCount();
+  }
+
+  @Override
+  public void cancel() {
+    drillClient.cancelQuery(id);
+  }
+
+  @Override
+  public Iterator<VectorWrapper<?>> iterator() {
+    return batchLoader.iterator();
+  }
+
+  @Override
+  public SelectionVector2 getSelectionVector2() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public SelectionVector4 getSelectionVector4() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public TypedFieldId getValueVectorId(SchemaPath path) {
+    return batchLoader.getValueVectorId(path);
+  }
+
+  @Override
+  public VectorWrapper<?> getValueAccessorById(Class<?> clazz, int... ids) {
+    return batchLoader.getValueAccessorById(clazz, ids);
+  }
+
+  private QueryDataBatch getNextBatch() {
+    try {
+      injector.injectChecked(context.getExecutionControls(), "next-allocate", OutOfMemoryException.class);
+      return resultsListener.getNext();
+    } catch(InterruptedException e) {
+      // Preserve evidence that the interruption occurred so that code higher up
+      // on the call stack can learn of the
+      // interruption and respond to it if it wants to.
+      Thread.currentThread().interrupt();
+
+      return null;
+    } catch (Exception e) {
+      throw UserException.dataReadError(e)
+        .addContext("Failure when reading incoming batch")
+        .build(logger);
+    }
+  }
+
+  @Override
+  public RecordBatch.IterOutcome next() {
+    batchLoader.resetRecordCount();
+    oContext.getStats().startProcessing();
+    try {
+      QueryDataBatch batch;
+      try {
+        oContext.getStats().startWait();
+        batch = getNextBatch();
+
+        // skip over empty batches. we do this since these are basically control messages.
+        while (batch != null && batch.getHeader().getDef().getRecordCount() == 0
+          && (!first || batch.getHeader().getDef().getFieldCount() == 0)) {
+          batch = getNextBatch();
+        }
+      } finally {
+        oContext.getStats().stopWait();
+      }
+
+      first = false;
+
+      if (batch == null) {
+        // No more batches are available: finish with IterOutcome.NONE after
+        // a final check that the fragment should still continue.
+        IterOutcome lastOutcome = IterOutcome.NONE;
+        batchLoader.zero();
+        context.getExecutorState().checkContinue();
+        return lastOutcome;
+      }
+
+      if (context.getAllocator().isOverLimit()) {
+        context.requestMemory(this);
+        if (context.getAllocator().isOverLimit()) {
+          throw new OutOfMemoryException("Allocator over limit");
+        }
+      }
+
+      UserBitShared.RecordBatchDef rbd = batch.getHeader().getDef();
+      boolean schemaChanged = batchLoader.load(rbd, batch.getData());
+
+      batch.release();
+      if (schemaChanged) {
+        this.schema = batchLoader.getSchema();
+        oContext.getStats().batchReceived(0, rbd.getRecordCount(), true);
+        return RecordBatch.IterOutcome.OK_NEW_SCHEMA;
+      } else {
+        oContext.getStats().batchReceived(0, rbd.getRecordCount(), false);
+        return RecordBatch.IterOutcome.OK;
+      }
+    } finally {
+      oContext.getStats().stopProcessing();
+    }
+  }
+
+  @Override
+  public WritableBatch getWritableBatch() {
+    return batchLoader.getWritableBatch();
+  }
+
+  @Override
+  public void close() {
+    logger.debug("Closing {}", getClass().getCanonicalName());
+    batchLoader.clear();
+    resultsListener.close();
+    drillClient.close();
+  }
+
+  @Override
+  public VectorContainer getOutgoingContainer() {
+    throw new UnsupportedOperationException(
+      String.format("You should not call getOutgoingContainer() for class %s",
+        getClass().getCanonicalName()));
+  }
+
+  @Override
+  public VectorContainer getContainer() {
+    return batchLoader.getContainer();
+  }
+
+  @Override
+  public void dump() {
+    logger.error("DrillRecordReader[batchLoader={}, schema={}]", batchLoader, schema);
+  }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/LeafPrel.java b/contrib/storage-drill/src/main/java/org/apache/drill/exec/store/drill/plugin/DrillScanBatchCreator.java
similarity index 50%
copy from exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/LeafPrel.java
copy to contrib/storage-drill/src/main/java/org/apache/drill/exec/store/drill/plugin/DrillScanBatchCreator.java
index 55a79556bc..6837831f4e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/LeafPrel.java
+++ b/contrib/storage-drill/src/main/java/org/apache/drill/exec/store/drill/plugin/DrillScanBatchCreator.java
@@ -15,23 +15,24 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.drill.exec.planner.physical;
+package org.apache.drill.exec.store.drill.plugin;
 
-import java.util.Collections;
-import java.util.Iterator;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.exec.ops.ExecutorFragmentContext;
+import org.apache.drill.exec.physical.impl.BatchCreator;
+import org.apache.drill.exec.record.CloseableRecordBatch;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
 
-/**
- * Prel without children.
- */
-public interface LeafPrel extends Prel {
+import java.util.List;
 
-  @Override
-  default boolean needsFinalColumnReordering() {
-    return true;
-  }
+@SuppressWarnings("unused")
+public class DrillScanBatchCreator implements BatchCreator<DrillSubScan> {
 
   @Override
-  default Iterator<Prel> iterator() {
-    return Collections.emptyIterator();
+  public CloseableRecordBatch getBatch(ExecutorFragmentContext context, DrillSubScan subScan,
+      List<RecordBatch> children) throws ExecutionSetupException {
+    Preconditions.checkArgument(children.isEmpty());
+    return new DrillRecordReader(context, subScan);
   }
 }
diff --git a/contrib/storage-drill/src/main/java/org/apache/drill/exec/store/drill/plugin/DrillScanSpec.java b/contrib/storage-drill/src/main/java/org/apache/drill/exec/store/drill/plugin/DrillScanSpec.java
new file mode 100644
index 0000000000..f50602a15d
--- /dev/null
+++ b/contrib/storage-drill/src/main/java/org/apache/drill/exec/store/drill/plugin/DrillScanSpec.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.drill.plugin;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.drill.common.PlanStringBuilder;
+import org.apache.drill.exec.planner.logical.DrillTableSelection;
+
+import java.util.List;
+
+public class DrillScanSpec implements DrillTableSelection {
+  private List<String> schemaPath;
+  private String collectionName;
+
+  private String query;
+
+  @JsonCreator
+  public DrillScanSpec(@JsonProperty("schemaPath") List<String> schemaPath,
+      @JsonProperty("collectionName") String collectionName) {
+    this.schemaPath = schemaPath;
+    this.collectionName = collectionName;
+  }
+
+  public DrillScanSpec(String query) {
+    this.query = query;
+  }
+
+  public List<String> getSchemaPath() {
+    return this.schemaPath;
+  }
+
+  public String getCollectionName() {
+    return this.collectionName;
+  }
+
+  public String getQuery() {
+    return this.query;
+  }
+
+  @Override
+  public String toString() {
+    return new PlanStringBuilder(this)
+      .field("schemaPath", schemaPath)
+      .field("collectionName", collectionName)
+      .field("query", query)
+      .toString();
+  }
+
+  @Override
+  public String digest() {
+    return toString();
+  }
+}
diff --git a/contrib/storage-drill/src/main/java/org/apache/drill/exec/store/drill/plugin/DrillStoragePlugin.java b/contrib/storage-drill/src/main/java/org/apache/drill/exec/store/drill/plugin/DrillStoragePlugin.java
new file mode 100644
index 0000000000..b4c77ef5cc
--- /dev/null
+++ b/contrib/storage-drill/src/main/java/org/apache/drill/exec/store/drill/plugin/DrillStoragePlugin.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.drill.plugin;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.drill.common.AutoCloseables;
+import org.apache.drill.common.JSONOptions;
+import org.apache.drill.exec.client.DrillClient;
+import org.apache.drill.exec.ops.OptimizerRulesContext;
+import org.apache.drill.exec.physical.base.AbstractGroupScan;
+import org.apache.drill.exec.planner.PlannerPhase;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.store.AbstractStoragePlugin;
+import org.apache.drill.exec.store.PluginRulesProviderImpl;
+import org.apache.drill.exec.store.SchemaConfig;
+import org.apache.drill.exec.store.StoragePluginRulesSupplier;
+import org.apache.drill.exec.store.drill.plugin.plan.DrillPluginImplementor;
+import org.apache.drill.exec.store.drill.plugin.schema.DrillSchemaFactory;
+import org.apache.drill.exec.store.plan.rel.PluginRel;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class DrillStoragePlugin extends AbstractStoragePlugin {
+
+  private final DrillStoragePluginConfig drillConfig;
+  private final DrillSchemaFactory schemaFactory;
+  private final StoragePluginRulesSupplier storagePluginRulesSupplier;
+  private final Map<String, DrillClient> userClients;
+
+  public DrillStoragePlugin(
+      DrillStoragePluginConfig drillConfig,
+      DrillbitContext context,
+      String name) {
+    super(context, name);
+    this.drillConfig = drillConfig;
+    this.schemaFactory = new DrillSchemaFactory(this, name);
+    this.storagePluginRulesSupplier = storagePluginRulesSupplier(name);
+
+    assert drillConfig.getConnection().startsWith(DrillStoragePluginConfig.CONNECTION_STRING_PREFIX);
+
+    this.userClients = new ConcurrentHashMap<>();
+  }
+
+  private static StoragePluginRulesSupplier storagePluginRulesSupplier(String name) {
+    Convention convention = new Convention.Impl("DRILL." + name, PluginRel.class);
+    return StoragePluginRulesSupplier.builder()
+      .rulesProvider(new PluginRulesProviderImpl(convention, DrillPluginImplementor::new))
+      .supportsProjectPushdown(true)
+      .supportsSortPushdown(true)
+      .supportsAggregatePushdown(true)
+      .supportsFilterPushdown(true)
+      .supportsLimitPushdown(true)
+      .supportsUnionPushdown(true)
+      .supportsJoinPushdown(true)
+      .convention(convention)
+      .build();
+  }
+
+  @Override
+  public DrillStoragePluginConfig getConfig() {
+    return drillConfig;
+  }
+
+  @Override
+  public void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent) {
+    schemaFactory.registerSchemas(schemaConfig, parent);
+  }
+
+  @Override
+  public boolean supportsRead() {
+    return true;
+  }
+
+  @Override
+  public AbstractGroupScan getPhysicalScan(String userName, JSONOptions selection) throws IOException {
+    DrillScanSpec scanSpec = selection.getListWith(new ObjectMapper(), new TypeReference<DrillScanSpec>() {
+    });
+    return new DrillGroupScan(userName, drillConfig, scanSpec);
+  }
+
+  @Override
+  public Set<? extends RelOptRule> getOptimizerRules(OptimizerRulesContext optimizerContext, PlannerPhase phase) {
+    switch (phase) {
+      case PHYSICAL:
+      case LOGICAL:
+        return storagePluginRulesSupplier.getOptimizerRules();
+      case LOGICAL_PRUNE_AND_JOIN:
+      case LOGICAL_PRUNE:
+      case PARTITION_PRUNING:
+      case JOIN_PLANNING:
+      default:
+        return Collections.emptySet();
+    }
+  }
+
+  public Convention convention() {
+    return storagePluginRulesSupplier.convention();
+  }
+
+  public DrillClient getClient(String userName) {
+    userClients.computeIfAbsent(userName, this::createClient);
+    // recompute drill client for the case of closed connection
+    return userClients.computeIfPresent(userName, (name, value) -> {
+      if (!value.connectionIsActive()) {
+        AutoCloseables.closeSilently(value);
+        return createClient(name);
+      }
+      return value;
+    });
+  }
+
+  private DrillClient createClient(String userName) {
+    return drillConfig.getDrillClient(userName, null);
+  }
+
+  @Override
+  public void close() {
+    AutoCloseables.closeSilently(userClients.values().toArray(new AutoCloseable[0]));
+  }
+}
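
A note on getClient() above: the two ConcurrentHashMap calls first create a client on demand, then atomically swap in a replacement when the cached connection has died. The same pattern in isolation, with a hypothetical Client standing in for DrillClient and isAlive() for connectionIsActive():

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

interface Client { boolean isAlive(); void close(); }

class ClientCacheSketch {
  static Client getClient(ConcurrentHashMap<String, Client> cache, String user,
      Function<String, Client> open) {
    cache.computeIfAbsent(user, open);              // create on first use
    return cache.computeIfPresent(user, (k, c) -> { // refresh if the cached one died
      if (!c.isAlive()) {
        c.close();                                  // discard the dead client
        return open.apply(k);
      }
      return c;
    });
  }
}
```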
diff --git a/contrib/storage-drill/src/main/java/org/apache/drill/exec/store/drill/plugin/DrillStoragePluginConfig.java b/contrib/storage-drill/src/main/java/org/apache/drill/exec/store/drill/plugin/DrillStoragePluginConfig.java
new file mode 100644
index 0000000000..3dfb72216b
--- /dev/null
+++ b/contrib/storage-drill/src/main/java/org/apache/drill/exec/store/drill/plugin/DrillStoragePluginConfig.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.drill.plugin;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.calcite.avatica.ConnectStringParser;
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.common.config.DrillProperties;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.logical.StoragePluginConfig;
+import org.apache.drill.common.logical.security.CredentialsProvider;
+import org.apache.drill.common.logical.security.PlainCredentialsProvider;
+import org.apache.drill.exec.client.DrillClient;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.proto.UserBitShared.UserCredentials;
+import org.apache.drill.exec.rpc.RpcException;
+import org.apache.drill.exec.store.security.UsernamePasswordCredentials;
+import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.SQLException;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Properties;
+
+@JsonTypeName(DrillStoragePluginConfig.NAME)
+public class DrillStoragePluginConfig extends StoragePluginConfig {
+  private static final Logger logger = LoggerFactory.getLogger(DrillStoragePluginConfig.class);
+
+  public static final String NAME = "drill";
+  public static final String CONNECTION_STRING_PREFIX = "jdbc:drill:";
+
+  private static final String DEFAULT_QUOTING_IDENTIFIER = "`";
+
+  private final String connection;
+  private final Properties properties;
+
+  @JsonCreator
+  public DrillStoragePluginConfig(
+      @JsonProperty("connection") String connection,
+      @JsonProperty("properties") Properties properties,
+      @JsonProperty("credentialsProvider") CredentialsProvider credentialsProvider,
+      @JsonProperty("authMode") String authMode) {
+    super(getCredentialsProvider(credentialsProvider), credentialsProvider == null,
+      AuthMode.parseOrDefault(authMode, AuthMode.SHARED_USER));
+    this.connection = connection;
+    this.properties = Optional.ofNullable(properties).orElse(new Properties());
+  }
+
+  private DrillStoragePluginConfig(DrillStoragePluginConfig that,
+    CredentialsProvider credentialsProvider) {
+    super(getCredentialsProvider(credentialsProvider),
+      credentialsProvider == null, that.authMode);
+    this.connection = that.connection;
+    this.properties = that.properties;
+  }
+
+  @JsonProperty("connection")
+  public String getConnection() {
+    return connection;
+  }
+
+  @JsonProperty("properties")
+  public Properties getProperties() {
+    return properties;
+  }
+
+  private static CredentialsProvider getCredentialsProvider(CredentialsProvider credentialsProvider) {
+    return credentialsProvider != null ? credentialsProvider : PlainCredentialsProvider.EMPTY_CREDENTIALS_PROVIDER;
+  }
+
+  @JsonIgnore
+  public String getIdentifierQuoteString() {
+    return properties.getProperty(DrillProperties.QUOTING_IDENTIFIERS, DEFAULT_QUOTING_IDENTIFIER);
+  }
+
+  @Override
+  public DrillStoragePluginConfig updateCredentialProvider(CredentialsProvider credentialsProvider) {
+    return new DrillStoragePluginConfig(this, credentialsProvider);
+  }
+
+  private Optional<UsernamePasswordCredentials> getUsernamePasswordCredentials(
+    UserCredentials userCredentials) {
+    switch (authMode) {
+    case SHARED_USER:
+      return new UsernamePasswordCredentials.Builder()
+        .setCredentialsProvider(credentialsProvider)
+        .build();
+    case USER_TRANSLATION:
+      Preconditions.checkNotNull(
+        userCredentials,
+        "A drill query user is required for user translation auth mode."
+      );
+      return new UsernamePasswordCredentials.Builder()
+        .setCredentialsProvider(credentialsProvider)
+        .setQueryUser(userCredentials.getUserName())
+        .build();
+    default:
+      throw UserException.validationError()
+        .message("This storage plugin does not support auth mode: %s", authMode)
+        .build(logger);
+    }
+  }
+
+  private Map<String, String> getCredentials(UserCredentials userCredentials) {
+    return getUsernamePasswordCredentials(userCredentials)
+      .<Map<String, String>>map(creds -> ImmutableMap.of(DrillProperties.USER, creds.getUsername(),
+        DrillProperties.PASSWORD, creds.getPassword()))
+      .orElse(Collections.emptyMap());
+  }
+
+  @JsonIgnore
+  public DrillClient getDrillClient(String userName, BufferAllocator allocator) {
+    try {
+      String urlSuffix = connection.substring(CONNECTION_STRING_PREFIX.length());
+      Properties props = ConnectStringParser.parse(urlSuffix, properties);
+      props.putAll(getCredentials(UserCredentials.newBuilder().setUserName(userName).build()));
+
+      DrillConfig dConfig = DrillConfig.forClient();
+      boolean isDirect = props.getProperty(DrillProperties.DRILLBIT_CONNECTION) != null;
+      DrillClient client = new DrillClient(dConfig, null, allocator, isDirect);
+
+      String connect = props.getProperty(DrillProperties.ZOOKEEPER_CONNECTION);
+      client.connect(connect, props);
+      return client;
+    } catch (RpcException | SQLException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    DrillStoragePluginConfig that = (DrillStoragePluginConfig) o;
+    return Objects.equals(connection, that.connection);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(connection);
+  }
+}
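
A small sketch of what getDrillClient() above does with the connection string: everything after the `jdbc:drill:` prefix is handed to Avatica's ConnectStringParser, and the presence of a `drillbit` property selects a direct drillbit connection instead of ZooKeeper discovery:

```java
// Sketch of the parsing step (ConnectStringParser.parse throws SQLException,
// which the real method wraps in a RuntimeException).
String connection = "jdbc:drill:drillbit=localhost:31010";
String urlSuffix = connection.substring("jdbc:drill:".length());
Properties props = ConnectStringParser.parse(urlSuffix, new Properties());
// props now contains drillbit=localhost:31010, so DrillProperties.DRILLBIT_CONNECTION
// is present and the client connects directly rather than via ZooKeeper.
```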
diff --git a/contrib/storage-drill/src/main/java/org/apache/drill/exec/store/drill/plugin/DrillSubScan.java b/contrib/storage-drill/src/main/java/org/apache/drill/exec/store/drill/plugin/DrillSubScan.java
new file mode 100644
index 0000000000..bd7061651f
--- /dev/null
+++ b/contrib/storage-drill/src/main/java/org/apache/drill/exec/store/drill/plugin/DrillSubScan.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.drill.plugin;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.drill.common.logical.StoragePluginConfig;
+import org.apache.drill.exec.physical.base.AbstractBase;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.PhysicalVisitor;
+import org.apache.drill.exec.physical.base.SubScan;
+import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
+
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+
+@JsonTypeName("drill-read")
+public class DrillSubScan extends AbstractBase implements SubScan {
+
+  public static final String OPERATOR_TYPE = "DRILL_SUB_SCAN";
+
+  private final String query;
+
+  @JsonProperty
+  private final DrillStoragePluginConfig pluginConfig;
+
+  @JsonCreator
+  public DrillSubScan(
+      @JsonProperty("userName") String userName,
+      @JsonProperty("pluginConfig") StoragePluginConfig pluginConfig,
+      @JsonProperty("query") String query) {
+    super(userName);
+    this.pluginConfig = (DrillStoragePluginConfig) pluginConfig;
+    this.query = query;
+  }
+
+  public DrillSubScan(String userName,
+    DrillStoragePluginConfig storagePluginConfig,
+    String query) {
+    super(userName);
+    this.pluginConfig = storagePluginConfig;
+    this.query = query;
+  }
+
+  @Override
+  public <T, X, E extends Throwable> T accept(
+      PhysicalVisitor<T, X, E> physicalVisitor, X value) throws E {
+    return physicalVisitor.visitSubScan(this, value);
+  }
+
+  @Override
+  public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) {
+    Preconditions.checkArgument(children.isEmpty());
+    return new DrillSubScan(getUserName(), pluginConfig, query);
+  }
+
+  public DrillStoragePluginConfig getPluginConfig() {
+    return pluginConfig;
+  }
+
+  public String getQuery() {
+    return query;
+  }
+
+  @Override
+  public String getOperatorType() {
+    return OPERATOR_TYPE;
+  }
+
+  @Override
+  public Iterator<PhysicalOperator> iterator() {
+    return Collections.emptyIterator();
+  }
+}
diff --git a/exec/jdbc/src/main/java/org/apache/drill/jdbc/SqlTimeoutException.java b/contrib/storage-drill/src/main/java/org/apache/drill/exec/store/drill/plugin/package-info.java
similarity index 57%
rename from exec/jdbc/src/main/java/org/apache/drill/jdbc/SqlTimeoutException.java
rename to contrib/storage-drill/src/main/java/org/apache/drill/exec/store/drill/plugin/package-info.java
index 7087a8c9e8..a210cc6446 100644
--- a/exec/jdbc/src/main/java/org/apache/drill/jdbc/SqlTimeoutException.java
+++ b/contrib/storage-drill/src/main/java/org/apache/drill/exec/store/drill/plugin/package-info.java
@@ -15,24 +15,9 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.drill.jdbc;
-
-import java.sql.SQLTimeoutException;
-
 /**
- * Indicates that an operation timed out. This is not an error; you can
- * retry the operation.
+ * Drill storage plugin.
+ * <p>
+ * Enables querying Drill as a data store.
  */
-public class SqlTimeoutException extends SQLTimeoutException {
-  private static final long serialVersionUID = 2017_04_03L;
-
-  SqlTimeoutException() {
-    // SQLException(reason, SQLState, vendorCode)
-    // REVIEW mb 19-Jul-05 Is there a standard SQLState?
-    super("timeout", null, 0);
-  }
-
-  public SqlTimeoutException(long timeoutValueInSeconds) {
-    super("Query timed out in "+ timeoutValueInSeconds + " seconds");
-  }
-}
+package org.apache.drill.exec.store.drill.plugin;
diff --git a/contrib/storage-drill/src/main/java/org/apache/drill/exec/store/drill/plugin/plan/DrillPluginImplementor.java b/contrib/storage-drill/src/main/java/org/apache/drill/exec/store/drill/plugin/plan/DrillPluginImplementor.java
new file mode 100644
index 0000000000..7f1cb16903
--- /dev/null
+++ b/contrib/storage-drill/src/main/java/org/apache/drill/exec/store/drill/plugin/plan/DrillPluginImplementor.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.drill.plugin.plan;
+
+import org.apache.calcite.adapter.java.JavaTypeFactory;
+import org.apache.calcite.adapter.jdbc.JdbcImplementor;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Aggregate;
+import org.apache.calcite.rel.core.Filter;
+import org.apache.calcite.rel.core.Join;
+import org.apache.calcite.rel.core.Project;
+import org.apache.calcite.rel.core.Sort;
+import org.apache.calcite.rel.core.TableScan;
+import org.apache.calcite.rel.core.Union;
+import org.apache.calcite.sql.SqlDialect;
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.drill.exec.physical.base.GroupScan;
+import org.apache.drill.exec.planner.common.DrillLimitRelBase;
+import org.apache.drill.exec.store.StoragePlugin;
+import org.apache.drill.exec.store.drill.plugin.DrillGroupScan;
+import org.apache.drill.exec.store.drill.plugin.DrillScanSpec;
+import org.apache.drill.exec.store.drill.plugin.DrillStoragePlugin;
+import org.apache.drill.exec.store.plan.AbstractPluginImplementor;
+import org.apache.drill.exec.store.plan.rel.PluginAggregateRel;
+import org.apache.drill.exec.store.plan.rel.PluginFilterRel;
+import org.apache.drill.exec.store.plan.rel.PluginJoinRel;
+import org.apache.drill.exec.store.plan.rel.PluginLimitRel;
+import org.apache.drill.exec.store.plan.rel.PluginProjectRel;
+import org.apache.drill.exec.store.plan.rel.PluginSortRel;
+import org.apache.drill.exec.store.plan.rel.PluginUnionRel;
+import org.apache.drill.exec.store.plan.rel.StoragePluginTableScan;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Optional;
+
+public class DrillPluginImplementor extends AbstractPluginImplementor {
+  private DrillGroupScan groupScan;
+  private boolean isRoot = true;
+
+  @Override
+  protected Class<? extends StoragePlugin> supportedPlugin() {
+    return DrillStoragePlugin.class;
+  }
+
+  @Override
+  public void implement(StoragePluginTableScan scan) {
+    groupScan = (DrillGroupScan) scan.getGroupScan();
+
+    if (isRoot) {
+      completeProcessing(scan);
+    }
+  }
+
+  private void completeProcessing(RelNode scan) {
+    String query = buildQuery(scan);
+    DrillScanSpec scanSpec = new DrillScanSpec(query);
+    groupScan = new DrillGroupScan(groupScan.getUserName(), groupScan.getPluginConfig(), scanSpec);
+  }
+
+  @Override
+  public void implement(PluginAggregateRel aggregate) throws IOException {
+    process(aggregate);
+  }
+
+  private void process(RelNode relNode) throws IOException {
+    boolean isRoot = this.isRoot;
+    this.isRoot = false;
+    for (RelNode input : relNode.getInputs()) {
+      visitChild(input);
+    }
+
+    if (isRoot) {
+      completeProcessing(relNode);
+    }
+  }
+
+  @Override
+  public void implement(PluginFilterRel filter) throws IOException {
+    process(filter);
+  }
+
+  @Override
+  public void implement(PluginLimitRel limit) throws IOException {
+    process(limit);
+  }
+
+  @Override
+  public void implement(PluginProjectRel project) throws IOException {
+    process(project);
+  }
+
+  @Override
+  public void implement(PluginSortRel sort) throws IOException {
+    process(sort);
+  }
+
+  @Override
+  public void implement(PluginUnionRel union) throws IOException {
+    process(union);
+  }
+
+  @Override
+  public void implement(PluginJoinRel join) throws IOException {
+    process(join);
+  }
+
+  @Override
+  public boolean canImplement(Aggregate aggregate) {
+    return true;
+  }
+
+  @Override
+  public boolean canImplement(Filter filter) {
+    return true;
+  }
+
+  @Override
+  public boolean canImplement(DrillLimitRelBase limit) {
+    return true;
+  }
+
+  @Override
+  public boolean canImplement(Project project) {
+    return true;
+  }
+
+  @Override
+  public boolean canImplement(Sort sort) {
+    return true;
+  }
+
+  @Override
+  public boolean canImplement(Union union) {
+    return true;
+  }
+
+  @Override
+  public boolean canImplement(TableScan scan) {
+    return true;
+  }
+
+  @Override
+  public boolean canImplement(Join scan) {
+    return true;
+  }
+
+  @Override
+  protected boolean hasPluginGroupScan(RelNode node) {
+    return findGroupScan(node) instanceof DrillGroupScan;
+  }
+
+  @Override
+  public GroupScan getPhysicalOperator() {
+    return groupScan;
+  }
+
+  public String buildQuery(RelNode node) {
+    SqlDialect dialect = groupScan.getDialect();
+    JdbcImplementor jdbcImplementor = new DrillRelToSqlConverter(dialect,
+      (JavaTypeFactory) node.getCluster().getTypeFactory());
+    JdbcImplementor.Result result = jdbcImplementor.visitRoot(node);
+    return result.asStatement().toSqlString(dialect).getSql();
+  }
+
+  public static class DrillRelToSqlConverter extends JdbcImplementor {
+
+    public DrillRelToSqlConverter(SqlDialect dialect, JavaTypeFactory typeFactory) {
+      super(dialect, typeFactory);
+    }
+
+    @SuppressWarnings("unused")
+    public Result visit(PluginLimitRel e) {
+      Result x = visitInput(e, 0, Clause.OFFSET, Clause.FETCH);
+      Builder builder = x.builder(e);
+      Optional.ofNullable(e.getFetch())
+        .ifPresent(fetch -> builder.setFetch(builder.context.toSql(null, fetch)));
+      Optional.ofNullable(e.getOffset())
+        .ifPresent(offset -> builder.setOffset(builder.context.toSql(null, offset)));
+      return builder.result();
+    }
+
+    @Override
+    public Result visit(TableScan scan) {
+      List<String> qualifiedName = scan.getTable().getQualifiedName();
+      SqlIdentifier sqlIdentifier = new SqlIdentifier(
+        qualifiedName.subList(1, qualifiedName.size()), SqlParserPos.ZERO);
+      return result(sqlIdentifier, ImmutableList.of(Clause.FROM), scan, null);
+    }
+  }
+}
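
For a feel of the output: buildQuery() renders the pushed-down subtree through JdbcImplementor.visitRoot() using the backtick-quoting dialect from DrillGroupScan, and visit(TableScan) strips the leading plugin name from the qualified table name, so a table addressed locally as drill.cp.`employee.json` is referenced remotely as cp.`employee.json`. A hypothetical filter-plus-project push-down might come out roughly as:

```java
// Hypothetical rendering only; the exact text is produced by Calcite for the
// configured dialect, and the table/column names here are illustrative.
String pushedDown = "SELECT `full_name` FROM `cp`.`employee.json` WHERE `employee_id` = 1";
```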
diff --git a/contrib/storage-drill/src/main/java/org/apache/drill/exec/store/drill/plugin/schema/DrillPluginSchema.java b/contrib/storage-drill/src/main/java/org/apache/drill/exec/store/drill/plugin/schema/DrillPluginSchema.java
new file mode 100644
index 0000000000..0fde3d4fee
--- /dev/null
+++ b/contrib/storage-drill/src/main/java/org/apache/drill/exec/store/drill/plugin/schema/DrillPluginSchema.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.drill.plugin.schema;
+
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.schema.Table;
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.planner.logical.DrillTable;
+import org.apache.drill.exec.proto.UserProtos;
+import org.apache.drill.exec.store.AbstractSchema;
+import org.apache.drill.exec.store.drill.plugin.DrillScanSpec;
+import org.apache.drill.exec.store.drill.plugin.DrillStoragePlugin;
+import org.apache.drill.exec.store.drill.plugin.DrillStoragePluginConfig;
+import org.apache.drill.exec.store.plan.rel.PluginDrillTable;
+import org.apache.drill.exec.util.ImpersonationUtil;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+public class DrillPluginSchema extends AbstractSchema {
+
+  private final DrillStoragePlugin plugin;
+
+  private final Map<String, DrillPluginSchema> schemaMap = new HashMap<>();
+
+  private final Map<String, DrillTable> drillTables = new HashMap<>();
+  private final String userName;
+
+  public DrillPluginSchema(DrillStoragePlugin plugin, String name, String userName) {
+    super(Collections.emptyList(), name);
+    this.plugin = plugin;
+    this.userName = Optional.ofNullable(userName).orElse(ImpersonationUtil.getProcessUserName());
+
+    getSchemasList().stream()
+      .map(UserProtos.SchemaMetadata::getSchemaName)
+      .map(String::toLowerCase)
+      .map(SchemaPath::parseFromString)
+      .forEach(this::addSubSchema);
+  }
+
+  private DrillPluginSchema(DrillStoragePlugin plugin, List<String> parentSchemaPath, String name, String userName) {
+    super(parentSchemaPath, name);
+    this.plugin = plugin;
+    this.userName = userName;
+  }
+
+  private void addSubSchema(SchemaPath path) {
+    DrillPluginSchema drillSchema = new DrillPluginSchema(plugin, getSchemaPath(), path.getRootSegmentPath(), userName);
+    schemaMap.put(path.getRootSegmentPath(), drillSchema);
+    while (!path.isLeaf()) {
+      path = new SchemaPath(path.getRootSegment().getChild().getNameSegment());
+      DrillPluginSchema child = new DrillPluginSchema(plugin, drillSchema.getSchemaPath(), path.getRootSegmentPath(), userName);
+      drillSchema.schemaMap.put(path.getRootSegmentPath(), child);
+      drillSchema = child;
+    }
+  }
+
+  private List<UserProtos.SchemaMetadata> getSchemasList() {
+    try {
+      return plugin.getClient(userName).getSchemas(null, null)
+        .get().getSchemasList();
+    } catch (InterruptedException | ExecutionException e) {
+      throw new DrillRuntimeException(e);
+    }
+  }
+
+  @Override
+  public AbstractSchema getSubSchema(String name) {
+    return schemaMap.get(name);
+  }
+
+  void setHolder(SchemaPlus plusOfThis) {
+    getSubSchemaNames().forEach(s -> plusOfThis.add(s, getSubSchema(s)));
+  }
+
+  @Override
+  public boolean showInInformationSchema() {
+    return true;
+  }
+
+  @Override
+  public Set<String> getSubSchemaNames() {
+    return schemaMap.keySet();
+  }
+
+  @Override
+  public Table getTable(String name) {
+    return drillTables.computeIfAbsent(name,
+      key -> {
+        DrillScanSpec drillScanSpec = new DrillScanSpec(getSchemaPath(), key);
+        return new PluginDrillTable(plugin, getName(), null, drillScanSpec, plugin.convention());
+      });
+  }
+
+  @Override
+  public String getTypeName() {
+    return DrillStoragePluginConfig.NAME;
+  }
+
+  @Override
+  public Set<String> getTableNames() {
+    try {
+      List<String> schemaPaths = getSchemaPath();
+      String schemaPath = SchemaPath.getCompoundPath(
+        schemaPaths.subList(1, schemaPaths.size()).toArray(new String[0])).getAsUnescapedPath();
+      UserProtos.LikeFilter schemaNameFilter = UserProtos.LikeFilter.newBuilder()
+        .setPattern(schemaPath)
+        .build();
+      return plugin.getClient(userName).getTables(null, schemaNameFilter, null, null)
+        .get().getTablesList().stream()
+        .map(UserProtos.TableMetadata::getTableName)
+        .collect(Collectors.toSet());
+    } catch (InterruptedException | ExecutionException e) {
+      throw new DrillRuntimeException(e);
+    }
+  }
+}
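
A worked example of the schema mapping above: getSchemasList() fetches the remote
cluster's schema names (e.g. dfs.tmp) and addSubSchema() registers them under this
plugin's root, so they surface locally as drill.dfs.tmp. Conversely, getTableNames()
strips the leading plugin name from the schema path before building the LikeFilter,
so a query such as

    SELECT * FROM drill.dfs.tmp.test_table

is resolved against dfs.tmp on the remote cluster. (This is a reading of the code
above; DrillPluginQueriesTest below exercises this mapping.)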
diff --git a/contrib/storage-drill/src/main/java/org/apache/drill/exec/store/drill/plugin/schema/DrillSchemaFactory.java b/contrib/storage-drill/src/main/java/org/apache/drill/exec/store/drill/plugin/schema/DrillSchemaFactory.java
new file mode 100644
index 0000000000..ee68ec3b25
--- /dev/null
+++ b/contrib/storage-drill/src/main/java/org/apache/drill/exec/store/drill/plugin/schema/DrillSchemaFactory.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.drill.plugin.schema;
+
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.exec.store.AbstractSchemaFactory;
+import org.apache.drill.exec.store.SchemaConfig;
+import org.apache.drill.exec.store.drill.plugin.DrillStoragePlugin;
+import org.apache.drill.exec.util.ImpersonationUtil;
+import org.apache.drill.shaded.guava.com.google.common.cache.CacheBuilder;
+import org.apache.drill.shaded.guava.com.google.common.cache.CacheLoader;
+import org.apache.drill.shaded.guava.com.google.common.cache.LoadingCache;
+
+import java.util.Optional;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+public class DrillSchemaFactory extends AbstractSchemaFactory {
+
+  private final LoadingCache<Pair<String, String>, DrillPluginSchema> databases;
+  private final DrillStoragePlugin plugin;
+
+  public DrillSchemaFactory(DrillStoragePlugin plugin, String schemaName) {
+    super(schemaName);
+    this.plugin = plugin;
+
+    databases = CacheBuilder
+        .newBuilder()
+        .expireAfterAccess(1, TimeUnit.MINUTES)
+        .build(new DatabaseLoader());
+  }
+
+  @Override
+  public void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent) {
+    try {
+      String userName = Optional.ofNullable(schemaConfig.getUserName()).orElse(ImpersonationUtil.getProcessUserName());
+      DrillPluginSchema schema = databases.get(Pair.of(getName(), userName));
+      SchemaPlus hPlus = parent.add(getName(), schema);
+      schema.setHolder(hPlus);
+    } catch (ExecutionException e) {
+      throw new DrillRuntimeException(e);
+    }
+  }
+
+  private class DatabaseLoader extends CacheLoader<Pair<String, String>, DrillPluginSchema> {
+    @Override
+    public DrillPluginSchema load(Pair<String, String> key) {
+      return new DrillPluginSchema(plugin, key.getKey(), key.getValue());
+    }
+  }
+}
diff --git a/contrib/storage-drill/src/main/resources/bootstrap-storage-plugins.json b/contrib/storage-drill/src/main/resources/bootstrap-storage-plugins.json
new file mode 100644
index 0000000000..b1cb94d67b
--- /dev/null
+++ b/contrib/storage-drill/src/main/resources/bootstrap-storage-plugins.json
@@ -0,0 +1,9 @@
+{
+  "storage":{
+    "drill" : {
+      "type":"drill",
+      "connection":"jdbc:drill:drillbit=localhost:31010",
+      "enabled": false
+    }
+  }
+}
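
The bootstrap entry above ships disabled and points at a single drillbit on
localhost. As a hedged sketch of a more realistic configuration (the zk= form is the
standard Drill JDBC URL; hosts and cluster id here are placeholders, and
DrillStoragePluginConfig also accepts connection properties and credentials through
its other constructor arguments):

    {
      "type": "drill",
      "connection": "jdbc:drill:zk=zk1:2181,zk2:2181/drill/drillbits1",
      "enabled": true
    }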
diff --git a/contrib/storage-drill/src/main/resources/drill-module.conf b/contrib/storage-drill/src/main/resources/drill-module.conf
new file mode 100644
index 0000000000..33539df806
--- /dev/null
+++ b/contrib/storage-drill/src/main/resources/drill-module.conf
@@ -0,0 +1,24 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+#  This file tells Drill to consider this module when scanning the class path.
+#  This file can also include any supplementary configuration information.
+#  This file is in HOCON format; see https://github.com/typesafehub/config/blob/master/HOCON.md for more information.
+drill.classpath.scanning: {
+  packages += "org.apache.drill.exec.store.drill.plugin"
+}
diff --git a/contrib/storage-drill/src/test/java/org/apache/drill/exec/store/drill/plugin/DrillPluginQueriesTest.java b/contrib/storage-drill/src/test/java/org/apache/drill/exec/store/drill/plugin/DrillPluginQueriesTest.java
new file mode 100644
index 0000000000..2bef81998d
--- /dev/null
+++ b/contrib/storage-drill/src/test/java/org/apache/drill/exec/store/drill/plugin/DrillPluginQueriesTest.java
@@ -0,0 +1,312 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.drill.plugin;
+
+import org.apache.drill.common.AutoCloseables;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.exec.physical.rowSet.RowSet;
+import org.apache.drill.exec.physical.rowSet.RowSetBuilder;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.test.ClientFixture;
+import org.apache.drill.test.ClusterFixture;
+import org.apache.drill.test.ClusterTest;
+import org.apache.drill.test.rowSet.RowSetComparison;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.Properties;
+
+import static org.junit.Assert.assertEquals;
+
+public class DrillPluginQueriesTest extends ClusterTest {
+
+  private static final String TABLE_NAME = "dfs.tmp.test_table";
+
+  private static ClusterFixture drill;
+  private static ClientFixture drillClient;
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    initPlugin();
+  }
+
+  @AfterClass
+  public static void shutdown() throws Exception {
+    AutoCloseables.close(drill, drillClient);
+  }
+
+  private static void initPlugin() throws Exception {
+    startCluster(ClusterFixture.builder(dirTestWatcher));
+    drill = ClusterFixture.builder(dirTestWatcher).build();
+
+    DrillStoragePluginConfig config = new DrillStoragePluginConfig(
+      "jdbc:drill:drillbit=localhost:" + drill.drillbit().getUserPort(),
+      new Properties(), null, null);
+    config.setEnabled(true);
+    cluster.defineStoragePlugin("drill", config);
+    cluster.defineStoragePlugin("drill2", config);
+    drillClient = drill.clientFixture();
+
+    drillClient.queryBuilder()
+      .sql("create table %s as select * from cp.`tpch/nation.parquet`", TABLE_NAME)
+      .run();
+  }
+
+  @Test
+  public void testSerDe() throws Exception {
+    String plan = queryBuilder().sql("select * from drill.%s", TABLE_NAME).explainJson();
+    long count = queryBuilder().physical(plan).run().recordCount();
+    assertEquals(25, count);
+  }
+
+  @Test
+  public void testShowDatabases() throws Exception {
+    testBuilder()
+      .sqlQuery("show databases where SCHEMA_NAME='drill.dfs.tmp'")
+      .unOrdered()
+      .baselineColumns("SCHEMA_NAME")
+      .baselineValues("drill.dfs.tmp")
+      .go();
+  }
+
+  @Test
+  public void testShowTables() throws Exception {
+    testBuilder()
+      .sqlQuery("show tables IN drill.INFORMATION_SCHEMA")
+      .unOrdered()
+      .baselineColumns("TABLE_SCHEMA", "TABLE_NAME")
+      .baselineValues("drill.information_schema", "VIEWS")
+      .baselineValues("drill.information_schema", "CATALOGS")
+      .baselineValues("drill.information_schema", "COLUMNS")
+      .baselineValues("drill.information_schema", "PARTITIONS")
+      .baselineValues("drill.information_schema", "FILES")
+      .baselineValues("drill.information_schema", "SCHEMATA")
+      .baselineValues("drill.information_schema", "TABLES")
+      .go();
+  }
+
+  @Test
+  public void testProjectPushDown() throws Exception {
+    String query = "select n_nationkey, n_regionkey, n_name from drill.%s";
+
+    queryBuilder()
+        .sql(query, TABLE_NAME)
+        .planMatcher()
+        .include("query=\"SELECT `n_nationkey`, `n_regionkey`, `n_name`")
+        .exclude("\\*")
+        .match();
+
+    RowSet sets = queryBuilder()
+      .sql(query, TABLE_NAME)
+      .rowSet();
+
+    TupleMetadata schema = new SchemaBuilder()
+      .add("n_nationkey", TypeProtos.MinorType.INT)
+      .add("n_regionkey", TypeProtos.MinorType.INT)
+      .add("n_name", TypeProtos.MinorType.VARCHAR)
+      .build();
+
+    RowSet expected = new RowSetBuilder(client.allocator(), schema)
+      .addRow(0, 0, "ALGERIA")
+      .addRow(1, 1, "ARGENTINA")
+      .addRow(2, 1, "BRAZIL")
+      .addRow(3, 1, "CANADA")
+      .addRow(4, 4, "EGYPT")
+      .addRow(5, 0, "ETHIOPIA")
+      .addRow(6, 3, "FRANCE")
+      .addRow(7, 3, "GERMANY")
+      .addRow(8, 2, "INDIA")
+      .addRow(9, 2, "INDONESIA")
+      .addRow(10, 4, "IRAN")
+      .addRow(11, 4, "IRAQ")
+      .addRow(12, 2, "JAPAN")
+      .addRow(13, 4, "JORDAN")
+      .addRow(14, 0, "KENYA")
+      .addRow(15, 0, "MOROCCO")
+      .addRow(16, 0, "MOZAMBIQUE")
+      .addRow(17, 1, "PERU")
+      .addRow(18, 2, "CHINA")
+      .addRow(19, 3, "ROMANIA")
+      .addRow(20, 4, "SAUDI ARABIA")
+      .addRow(21, 2, "VIETNAM")
+      .addRow(22, 3, "RUSSIA")
+      .addRow(23, 3, "UNITED KINGDOM")
+      .addRow(24, 1, "UNITED STATES")
+      .build();
+
+    new RowSetComparison(expected).verifyAndClearAll(sets);
+  }
+
+  @Test
+  public void testFilterPushDown() throws Exception {
+    String query = "select n_name, n_nationkey from drill.%s where n_nationkey = 0";
+    queryBuilder()
+        .sql(query, TABLE_NAME)
+        .planMatcher()
+        .include("WHERE")
+        .exclude("Filter")
+        .match();
+
+    testBuilder()
+      .unOrdered()
+      .sqlQuery(query, TABLE_NAME)
+      .baselineColumns("n_name", "n_nationkey")
+      .baselineValues("ALGERIA", 0)
+      .go();
+  }
+
+  @Test
+  public void testFilterPushDownWithJoin() throws Exception {
+    String query = "select * from drill.%s e\n" +
+        "join drill.%s s on e.n_nationkey = s.n_nationkey where e.n_name = 'BRAZIL'";
+
+    queryBuilder()
+        .sql(query, TABLE_NAME, TABLE_NAME)
+        .planMatcher()
+        .include("INNER JOIN")
+        .match();
+
+    testBuilder()
+      .ordered()
+      .sqlQuery(query, TABLE_NAME, TABLE_NAME)
+      .baselineColumns("n_nationkey", "n_name", "n_regionkey", "n_comment", "n_nationkey0",
+        "n_name0", "n_regionkey0", "n_comment0")
+      .baselineValues(2, "BRAZIL", 1, "y alongside of the pending deposits. carefully special " +
+        "packages are about the ironic forges. slyly special ", 2, "BRAZIL", 1, "y alongside of " +
+        "the pending deposits. carefully special packages are about the ironic forges. slyly special ")
+      .go();
+  }
+
+  @Test
+  public void testJoinDifferentDrillPlugins() throws Exception {
+    String query = "select * from drill.%s e\n" +
+      "join drill2.cp.`tpch/nation.parquet` s on e.n_nationkey = s.n_nationkey where e.n_name = 'BRAZIL'";
+
+    queryBuilder()
+      .sql(query, TABLE_NAME)
+      .planMatcher()
+      .include("HashJoin")
+      .exclude("INNER JOIN")
+      .match();
+
+    testBuilder()
+      .unOrdered()
+      .sqlQuery(query, TABLE_NAME)
+      .baselineColumns("n_nationkey", "n_name", "n_regionkey", "n_comment", "n_nationkey0",
+        "n_name0", "n_regionkey0", "n_comment0")
+      .baselineValues(2, "BRAZIL", 1, "y alongside of the pending deposits. carefully special " +
+        "packages are about the ironic forges. slyly special ", 2, "BRAZIL", 1, "y alongside of " +
+        "the pending deposits. carefully special packages are about the ironic forges. slyly special ")
+      .go();
+  }
+
+  @Test
+  public void testAggregationPushDown() throws Exception {
+    String query = "select count(*) c from drill.%s";
+    queryBuilder()
+        .sql(query, TABLE_NAME)
+        .planMatcher()
+        .include("query=\"SELECT COUNT\\(\\*\\)")
+        .match();
+
+    testBuilder()
+      .unOrdered()
+      .sqlQuery(query, TABLE_NAME)
+      .baselineColumns("c")
+      .baselineValues(25L)
+      .go();
+  }
+
+  @Test
+  public void testLimitPushDown() throws Exception {
+    String query = "select n_name from drill.%s FETCH NEXT 1 ROWS ONLY";
+    queryBuilder()
+        .sql(query, TABLE_NAME)
+        .planMatcher()
+        .include("FETCH NEXT 1 ROWS ONLY")
+        .match();
+
+    testBuilder()
+      .unOrdered()
+      .sqlQuery(query, TABLE_NAME)
+      .baselineColumns("n_name")
+      .baselineValues("ALGERIA")
+      .go();
+  }
+
+  @Test
+  public void testLimitWithSortPushDown() throws Exception {
+    String query = "select n_nationkey from drill.%s order by n_name limit 3";
+    queryBuilder()
+        .sql(query, TABLE_NAME)
+        .planMatcher()
+        .include("ORDER BY `n_name`", "FETCH NEXT 3 ROWS ONLY")
+        .match();
+
+    testBuilder()
+      .unOrdered()
+      .sqlQuery(query, TABLE_NAME)
+      .baselineColumns("n_nationkey")
+      .baselineValues(0)
+      .baselineValues(1)
+      .baselineValues(2)
+      .go();
+  }
+
+  @Test
+  public void testAggregationWithGroupByPushDown() throws Exception {
+    String query = "select sum(n_nationkey) s from drill.%s group by n_regionkey";
+    queryBuilder()
+        .sql(query, TABLE_NAME)
+        .planMatcher()
+        .include("query=\"SELECT SUM\\(`n_nationkey`\\)", "GROUP BY `n_regionkey`")
+        .match();
+
+    testBuilder()
+      .unOrdered()
+      .sqlQuery(query, TABLE_NAME)
+      .baselineColumns("s")
+      .baselineValues(47L)
+      .baselineValues(50L)
+      .baselineValues(58L)
+      .baselineValues(68L)
+      .baselineValues(77L)
+      .go();
+  }
+
+  @Test
+  public void testUnionAllPushDown() throws Exception {
+    String query = "select col1, col2 from drill.%s " +
+      "union all " +
+      "select col1, col2 from drill.%s";
+    queryBuilder()
+      .sql(query, TABLE_NAME, TABLE_NAME)
+      .planMatcher()
+      .include("UNION ALL")
+      .match();
+
+    long recordCount = queryBuilder()
+      .sql(query, TABLE_NAME, TABLE_NAME)
+      .run()
+      .recordCount();
+
+    assertEquals(50L, recordCount);
+  }
+}
diff --git a/distribution/pom.xml b/distribution/pom.xml
index f0fb9537c9..eff3b81671 100644
--- a/distribution/pom.xml
+++ b/distribution/pom.xml
@@ -489,6 +489,11 @@
           <artifactId>drill-deltalake-format</artifactId>
           <version>${project.version}</version>
         </dependency>
+        <dependency>
+          <groupId>org.apache.drill.contrib</groupId>
+          <artifactId>drill-storage</artifactId>
+          <version>${project.version}</version>
+        </dependency>
       </dependencies>
     </profile>
 
diff --git a/distribution/src/assemble/component.xml b/distribution/src/assemble/component.xml
index 7708be4929..36c7133430 100644
--- a/distribution/src/assemble/component.xml
+++ b/distribution/src/assemble/component.xml
@@ -70,6 +70,7 @@
         <include>org.apache.drill.contrib:drill-druid-storage:jar</include>
         <include>org.apache.drill.contrib:drill-iceberg-format:jar</include>
         <include>org.apache.drill.contrib:drill-deltalake-format:jar</include>
+        <include>org.apache.drill.contrib:drill-storage:jar</include>
       </includes>
       <outputDirectory>jars</outputDirectory>
       <useProjectArtifact>false</useProjectArtifact>
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
index ded472e32c..740f8996ce 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -308,6 +308,8 @@ public final class ExecConstants {
   /** Size of JDBC batch queue (in batches) above which throttling begins. */
   public static final String JDBC_BATCH_QUEUE_THROTTLING_THRESHOLD =
       "drill.jdbc.batch_queue_throttling_threshold";
+  public static final String JDBC_QUERY_TIMEOUT =
+    "drill.jdbc.query_timeout";
   // Thread pool size for scan threads. Used by the Parquet scan.
   public static final String SCAN_THREADPOOL_SIZE = "drill.exec.scan.threadpool_size";
   // The size of the thread pool used by a scan to decode the data. Used by Parquet
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/LeafPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/LeafPrel.java
index 55a79556bc..bbd992faa2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/LeafPrel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/LeafPrel.java
@@ -17,6 +17,8 @@
  */
 package org.apache.drill.exec.planner.physical;
 
+import org.apache.drill.exec.planner.physical.visitor.PrelVisitor;
+
 import java.util.Collections;
 import java.util.Iterator;
 
@@ -34,4 +36,9 @@ public interface LeafPrel extends Prel {
   default Iterator<Prel> iterator() {
     return Collections.emptyIterator();
   }
+
+  @Override
+  default <T, X, E extends Throwable> T accept(PrelVisitor<T, X, E> logicalVisitor, X value) throws E {
+    return logicalVisitor.visitLeaf(this, value);
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/StarColumnConverter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/StarColumnConverter.java
index 8d91628bf1..03a72a619a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/StarColumnConverter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/StarColumnConverter.java
@@ -193,7 +193,8 @@ public class StarColumnConverter extends BasePrelVisitor<Prel, Void, RuntimeExce
     // Require prefix rename : there exists other expression, in addition to a star column.
     if (!prefixedForStar  // not set yet.
         && StarColumnHelper.containsStarColumn(prel.getRowType())
-        && prel.getRowType().getFieldNames().size() > 1) {
+        && prel.getRowType().getFieldNames().size() > 1
+        && !(prel instanceof LeafPrel)) {
       prefixedForStar = true;
     }
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/TopProjectVisitor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/TopProjectVisitor.java
index 29a75aa490..03908a5347 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/TopProjectVisitor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/TopProjectVisitor.java
@@ -20,6 +20,7 @@ package org.apache.drill.exec.planner.physical.visitor;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.core.Project;
 import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeField;
 import org.apache.calcite.rex.RexBuilder;
 import org.apache.calcite.rex.RexNode;
 import org.apache.calcite.rex.RexUtil;
@@ -74,19 +75,20 @@ public class TopProjectVisitor extends BasePrelVisitor<Prel, Void, RuntimeExcept
 
   @Override
   public Prel visitScreen(ScreenPrel prel, Void value) {
-    // insert project under screen only if we don't have writer underneath
-    if (containsWriter(prel)) {
+    // insert project under screen only if there is no writer underneath and not every field is a dynamic star
+    if (containsWriter(prel)
+      || prel.getRowType().getFieldList().stream().allMatch(RelDataTypeField::isDynamicStar)) {
       return prel;
     }
 
     Prel newChild = ((Prel) prel.getInput()).accept(this, value);
-    return prel.copy(prel.getTraitSet(), Collections.singletonList((RelNode)addTopProjectPrel(newChild, validatedRowType)));
+    return prel.copy(prel.getTraitSet(), Collections.singletonList(addTopProjectPrel(newChild, validatedRowType)));
   }
 
   @Override
   public Prel visitWriter(WriterPrel prel, Void value) {
     Prel newChild = ((Prel) prel.getInput()).accept(this, value);
-    return prel.copy(prel.getTraitSet(), Collections.singletonList((RelNode)addTopProjectPrel(newChild, validatedRowType)));
+    return prel.copy(prel.getTraitSet(), Collections.singletonList(addTopProjectPrel(newChild, validatedRowType)));
   }
 
   /**
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/BlockingResultsListener.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/BlockingResultsListener.java
new file mode 100644
index 0000000000..976904b323
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/BlockingResultsListener.java
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.rpc.user;
+
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.proto.UserBitShared;
+import org.apache.drill.exec.proto.helper.QueryIdHelper;
+import org.apache.drill.exec.rpc.ConnectionThrottle;
+import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch;
+import org.apache.drill.shaded.guava.com.google.common.collect.Queues;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.SQLTimeoutException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Supplier;
+
+public class BlockingResultsListener implements UserResultsListener {
+  private static final Logger logger = LoggerFactory.getLogger(BlockingResultsListener.class);
+
+  private static final AtomicInteger NEXT_INSTANCE_ID = new AtomicInteger(1);
+
+  private final int instanceId;
+
+  private final int batchQueueThrottlingThreshold;
+
+  private volatile UserBitShared.QueryId queryId;
+
+  private int lastReceivedBatchNumber;
+
+  private int lastDequeuedBatchNumber;
+
+  private volatile UserException executionFailureException;
+
+  private volatile boolean completed;
+
+  /**
+   * Whether throttling of incoming data is active.
+   */
+  private final AtomicBoolean throttled = new AtomicBoolean(false);
+
+  private volatile ConnectionThrottle throttle;
+
+  private volatile boolean closed;
+
+  private final CountDownLatch firstMessageReceived = new CountDownLatch(1);
+
+  private final LinkedBlockingDeque<QueryDataBatch> batchQueue =
+    Queues.newLinkedBlockingDeque();
+
+  private final Supplier<Stopwatch> elapsedTimer;
+
+  private final Supplier<Long> timeoutInMilliseconds;
+
+  public BlockingResultsListener(Supplier<Stopwatch> elapsedTimer, Supplier<Long> timeoutInMilliseconds,
+    int batchQueueThrottlingThreshold) {
+    this.elapsedTimer = elapsedTimer;
+    this.timeoutInMilliseconds = timeoutInMilliseconds;
+    this.instanceId = NEXT_INSTANCE_ID.getAndIncrement();
+    this.batchQueueThrottlingThreshold = batchQueueThrottlingThreshold;
+    logger.debug("[#{}] Query listener created.", instanceId);
+  }
+
+  /**
+   * Starts throttling if not currently throttling.
+   *
+   * @param throttle the "throttlable" object to throttle
+   * @return true if actually started (wasn't throttling already)
+   */
+  private boolean startThrottlingIfNot(ConnectionThrottle throttle) {
+    final boolean started = throttled.compareAndSet(false, true);
+    if (started) {
+      this.throttle = throttle;
+      throttle.setAutoRead(false);
+    }
+    return started;
+  }
+
+  /**
+   * Stops throttling if currently throttling.
+   *
+   * @return true if actually stopped (was throttling)
+   */
+  private boolean stopThrottlingIfSo() {
+    final boolean stopped = throttled.compareAndSet(true, false);
+    if (stopped) {
+      throttle.setAutoRead(true);
+      throttle = null;
+    }
+    return stopped;
+  }
+
+  public void awaitFirstMessage() throws InterruptedException, SQLTimeoutException {
+    // Check if a non-zero timeout has been set
+    if (timeoutInMilliseconds.get() > 0) {
+      // Compute the remaining wait in milliseconds from the configured timeout
+      // and the time already elapsed
+      long timeToTimeout =
+        timeoutInMilliseconds.get() - elapsedTimer.get().elapsed(TimeUnit.MILLISECONDS);
+      if (timeToTimeout <= 0 || !firstMessageReceived.await(timeToTimeout, TimeUnit.MILLISECONDS)) {
+        throw new SQLTimeoutException("Query timed out after " + TimeUnit.MILLISECONDS.toSeconds(timeoutInMilliseconds.get()) + " seconds");
+      }
+    } else {
+      firstMessageReceived.await();
+    }
+  }
+
+  private void releaseIfFirst() {
+    firstMessageReceived.countDown();
+  }
+
+  @Override
+  public void queryIdArrived(UserBitShared.QueryId queryId) {
+    logger.debug("[#{}] Received query ID: {}.",
+      instanceId, QueryIdHelper.getQueryId(queryId));
+    this.queryId = queryId;
+  }
+
+  @Override
+  public void submissionFailed(UserException ex) {
+    logger.debug("Received query failure: {} {}", instanceId, ex);
+    this.executionFailureException = ex;
+    this.completed = true;
+    close();
+    logger.info("[#{}] Query failed: ", instanceId, ex);
+  }
+
+  @Override
+  public void dataArrived(QueryDataBatch result, ConnectionThrottle throttle) {
+    lastReceivedBatchNumber++;
+    logger.debug("[#{}] Received query data batch #{}: {}.",
+      instanceId, lastReceivedBatchNumber, result);
+
+    // If we're in a closed state, just release the message.
+    if (closed) {
+      result.release();
+      completed = true;
+      return;
+    }
+
+    // We're active; let's add to the queue.
+    batchQueue.add(result);
+
+    // Throttle server if queue size has exceeded the threshold.
+    if (batchQueue.size() > batchQueueThrottlingThreshold) {
+      if (startThrottlingIfNot(throttle)) {
+        logger.debug("[#{}] Throttling started at queue size {}.",
+          instanceId, batchQueue.size());
+      }
+    }
+
+    releaseIfFirst();
+  }
+
+  @Override
+  public void queryCompleted(UserBitShared.QueryResult.QueryState state) {
+    logger.debug("[#{}] Received query completion: {}.", instanceId, state);
+    releaseIfFirst();
+    completed = true;
+  }
+
+  public UserBitShared.QueryId getQueryId() {
+    return queryId;
+  }
+
+  /**
+   * Gets the next batch of query results from the queue.
+   *
+   * @return the next batch, or {@code null} after last batch has been returned
+   * @throws UserException        if the query failed
+   * @throws InterruptedException if waiting on the queue was interrupted
+   */
+  public QueryDataBatch getNext() throws UserException, InterruptedException, SQLTimeoutException {
+    while (true) {
+      if (executionFailureException != null) {
+        logger.debug("[#{}] Dequeued query failure exception: {}.",
+          instanceId, executionFailureException);
+        throw executionFailureException;
+      }
+      if (completed && batchQueue.isEmpty()) {
+        return null;
+      } else {
+        QueryDataBatch qdb = batchQueue.poll(50, TimeUnit.MILLISECONDS);
+        if (qdb != null) {
+          lastDequeuedBatchNumber++;
+          logger.debug("[#{}] Dequeued query data batch #{}: {}.",
+            instanceId, lastDequeuedBatchNumber, qdb);
+
+          // Unthrottle server if queue size has dropped far enough below the threshold:
+          if (batchQueue.size() < batchQueueThrottlingThreshold / 2
+            || batchQueue.size() == 0  // (in case threshold < 2)
+          ) {
+            if (stopThrottlingIfSo()) {
+              logger.debug("[#{}] Throttling stopped at queue size {}.",
+                instanceId, batchQueue.size());
+            }
+          }
+          return qdb;
+        }
+
+        // Check and throw SQLTimeoutException
+        if (timeoutInMilliseconds.get() > 0 && elapsedTimer.get().elapsed(TimeUnit.MILLISECONDS) >= timeoutInMilliseconds.get()) {
+          throw new SQLTimeoutException("Query timed out after " + TimeUnit.MILLISECONDS.toSeconds(timeoutInMilliseconds.get()) + " seconds");
+        }
+      }
+    }
+  }
+
+  public void close() {
+    logger.debug("[#{}] Query listener closing.", instanceId);
+    closed = true;
+    if (stopThrottlingIfSo()) {
+      logger.debug("[#{}] Throttling stopped at close() (at queue size {}).",
+        instanceId, batchQueue.size());
+    }
+    while (!batchQueue.isEmpty()) {
+      // Don't bother with query timeout, we're closing the cursor
+      QueryDataBatch qdb = batchQueue.poll();
+      if (qdb != null && qdb.getData() != null) {
+        qdb.getData().release();
+      }
+    }
+    // Close may be called before the first result is received and therefore
+    // when the main thread is blocked waiting for the result.  In that case
+    // we want to unblock the main thread.
+    releaseIfFirst();
+    completed = true;
+  }
+
+  public boolean isCompleted() {
+    return completed;
+  }
+}
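
BlockingResultsListener is the piece DrillCursor (below) now delegates to. A
minimal, self-contained usage sketch, assuming a reachable drillbit on the default
ports; the class name and the trivial query are illustrative only, and
runQuery(QueryType, String, UserResultsListener) is the existing DrillClient
submission API the JDBC cursor uses:

    import org.apache.drill.exec.client.DrillClient;
    import org.apache.drill.exec.proto.UserBitShared.QueryType;
    import org.apache.drill.exec.rpc.user.BlockingResultsListener;
    import org.apache.drill.exec.rpc.user.QueryDataBatch;
    import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch;

    public class BlockingResultsListenerSketch {
      public static void main(String[] args) throws Exception {
        try (DrillClient client = new DrillClient()) {
          client.connect();
          Stopwatch timer = Stopwatch.createStarted();
          BlockingResultsListener listener = new BlockingResultsListener(
              () -> timer,  // one shared stopwatch, as DrillCursor supplies its own
              () -> 0L,     // 0 disables both timeout paths
              100);         // batch queue throttling threshold
          client.runQuery(QueryType.SQL, "SELECT 1", listener);
          listener.awaitFirstMessage();
          QueryDataBatch batch;
          while ((batch = listener.getNext()) != null) {
            batch.release();  // a real consumer would load the batch first
          }
          listener.close();
        }
      }
    }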
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/enumerable/plan/EnumerablePrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/enumerable/plan/EnumerablePrel.java
index 2d6e5c8572..f22c69a6ba 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/enumerable/plan/EnumerablePrel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/enumerable/plan/EnumerablePrel.java
@@ -31,7 +31,6 @@ import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.planner.common.DrillRelOptUtil;
 import org.apache.drill.exec.planner.physical.LeafPrel;
 import org.apache.drill.exec.planner.physical.PhysicalPlanCreator;
-import org.apache.drill.exec.planner.physical.visitor.PrelVisitor;
 import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.exec.record.metadata.TupleMetadata;
 import org.apache.drill.exec.record.metadata.schema.SchemaProvider;
@@ -104,11 +103,6 @@ public class EnumerablePrel extends AbstractRelNode implements LeafPrel {
     return rows;
   }
 
-  @Override
-  public <T, X, E extends Throwable> T accept(PrelVisitor<T, X, E> logicalVisitor, X value) throws E {
-    return logicalVisitor.visitLeaf(this, value);
-  }
-
   @Override
   public BatchSchema.SelectionVectorMode[] getSupportedEncodings() {
     return BatchSchema.SelectionVectorMode.DEFAULT;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/PluginPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/PluginPrel.java
index cc9642687f..1d4fff8df1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/PluginPrel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/PluginPrel.java
@@ -25,21 +25,18 @@ import org.apache.calcite.rel.type.RelDataType;
 import org.apache.drill.common.exceptions.DrillRuntimeException;
 import org.apache.drill.exec.physical.base.GroupScan;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.planner.physical.LeafPrel;
 import org.apache.drill.exec.planner.physical.PhysicalPlanCreator;
-import org.apache.drill.exec.planner.physical.Prel;
-import org.apache.drill.exec.planner.physical.visitor.PrelVisitor;
 import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.exec.store.SubsetRemover;
 import org.apache.drill.exec.store.plan.PluginImplementor;
 
 import java.io.IOException;
-import java.util.Collections;
-import java.util.Iterator;
 
 /**
  * Represents a plugin-specific plan once children nodes have been pushed down into group scan.
  */
-public class PluginPrel extends AbstractRelNode implements Prel {
+public class PluginPrel extends AbstractRelNode implements LeafPrel {
   private final GroupScan groupScan;
   private final RelDataType rowType;
 
@@ -61,11 +58,6 @@ public class PluginPrel extends AbstractRelNode implements Prel {
     return creator.addMetadata(this, groupScan);
   }
 
-  @Override
-  public <T, X, E extends Throwable> T accept(PrelVisitor<T, X, E> logicalVisitor, X value) throws E {
-    return logicalVisitor.visitPrel(this, value);
-  }
-
   @Override
   public BatchSchema.SelectionVectorMode[] getSupportedEncodings() {
     return BatchSchema.SelectionVectorMode.DEFAULT;
@@ -81,11 +73,6 @@ public class PluginPrel extends AbstractRelNode implements Prel {
     return false;
   }
 
-  @Override
-  public Iterator<Prel> iterator() {
-    return Collections.emptyIterator();
-  }
-
   @Override
   public RelWriter explainTerms(RelWriter pw) {
     return super.explainTerms(pw).item("groupScan", groupScan);
diff --git a/exec/java-exec/src/main/resources/drill-module.conf b/exec/java-exec/src/main/resources/drill-module.conf
index 5468f2695a..4dc45ec857 100644
--- a/exec/java-exec/src/main/resources/drill-module.conf
+++ b/exec/java-exec/src/main/resources/drill-module.conf
@@ -507,7 +507,8 @@ drill.exec: {
 }
 
 drill.jdbc: {
-  batch_queue_throttling_threshold: 100
+  batch_queue_throttling_threshold: 100,
+  query_timeout: 600000
 }
 
 # The following are defaults for system and session options.
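
The new drill.jdbc.query_timeout default above is plain HOCON, so it should be
tunable per installation through the usual drill-override.conf mechanism, e.g.
(a sketch; 600000 ms, i.e. ten minutes, is the shipped default):

    drill.jdbc: {
      query_timeout: 300000  # milliseconds
    }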
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/planner/rm/TestMemoryCalculator.java b/exec/java-exec/src/test/java/org/apache/drill/exec/planner/rm/TestMemoryCalculator.java
index 891b4a6a22..d27bec118b 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/planner/rm/TestMemoryCalculator.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/planner/rm/TestMemoryCalculator.java
@@ -193,7 +193,7 @@ public class TestMemoryCalculator extends PlanTestBase {
     SimpleParallelizer parallelizer = new QueueQueryParallelizer(false, queryContext);
     PlanningSet planningSet = preparePlanningSet(activeEndpoints, DEFAULT_SLICE_TARGET, resources, sql, parallelizer);
     parallelizer.adjustMemory(planningSet, createSet(planningSet.getRootWrapper()), activeEndpoints);
-    assertTrue("memory requirement is different", Iterables.all(resources.entrySet(), (e) -> e.getValue().getMemory() == 30));
+    assertTrue("memory requirement is different", Iterables.all(resources.entrySet(), (e) -> e.getValue().getMemory() == 20));
   }
 
 
diff --git a/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillCursor.java b/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillCursor.java
index 65d94dbcc0..f627b3ffcd 100644
--- a/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillCursor.java
+++ b/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillCursor.java
@@ -20,15 +20,12 @@ package org.apache.drill.jdbc.impl;
 import static org.slf4j.LoggerFactory.getLogger;
 
 import java.sql.SQLException;
-import java.sql.SQLTimeoutException;
 import java.util.ArrayList;
 import java.util.Calendar;
 import java.util.List;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.LinkedBlockingDeque;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
 
+import org.apache.drill.exec.rpc.user.BlockingResultsListener;
 import org.apache.drill.jdbc.DrillStatement;
 import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch;
 import org.apache.calcite.avatica.AvaticaStatement;
@@ -40,249 +37,18 @@ import org.apache.calcite.avatica.util.Cursor;
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.client.DrillClient;
-import org.apache.drill.exec.proto.UserBitShared.QueryId;
-import org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState;
 import org.apache.drill.exec.proto.UserBitShared.QueryType;
 import org.apache.drill.exec.proto.UserProtos.PreparedStatement;
 import org.apache.drill.exec.proto.helper.QueryIdHelper;
 import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.exec.record.RecordBatchLoader;
-import org.apache.drill.exec.rpc.ConnectionThrottle;
 import org.apache.drill.exec.rpc.user.QueryDataBatch;
-import org.apache.drill.exec.rpc.user.UserResultsListener;
 import org.apache.drill.exec.store.ischema.InfoSchemaConstants;
 import org.apache.drill.jdbc.SchemaChangeListener;
-import org.apache.drill.jdbc.SqlTimeoutException;
 import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.drill.shaded.guava.com.google.common.collect.Queues;
 
 public class DrillCursor implements Cursor {
 
-  ////////////////////////////////////////
-  // ResultsListener:
-  static class ResultsListener implements UserResultsListener {
-    private static final Logger logger = LoggerFactory.getLogger(ResultsListener.class);
-
-    private static volatile int nextInstanceId = 1;
-
-    /** (Just for logging.) */
-    private final int instanceId;
-
-    private final int batchQueueThrottlingThreshold;
-
-    /** (Just for logging.) */
-    private volatile QueryId queryId;
-
-    /** (Just for logging.) */
-    private int lastReceivedBatchNumber;
-    /** (Just for logging.) */
-    private int lastDequeuedBatchNumber;
-
-    private volatile UserException executionFailureException;
-
-    // TODO:  Revisit "completed".  Determine and document exactly what it
-    // means.  Some uses imply that it means that incoming messages indicate
-    // that the _query_ has _terminated_ (not necessarily _completing_
-    // normally), while some uses imply that it's some other state of the
-    // ResultListener.  Some uses seem redundant.)
-    volatile boolean completed;
-
-    /** Whether throttling of incoming data is active. */
-    private final AtomicBoolean throttled = new AtomicBoolean(false);
-    private volatile ConnectionThrottle throttle;
-
-    private volatile boolean closed;
-
-    private final CountDownLatch firstMessageReceived = new CountDownLatch(1);
-
-    final LinkedBlockingDeque<QueryDataBatch> batchQueue =
-        Queues.newLinkedBlockingDeque();
-
-    private final DrillCursor parent;
-    Stopwatch elapsedTimer;
-
-    /**
-     * ...
-     * @param parent
-     *        reference to DrillCursor
-     * @param batchQueueThrottlingThreshold
-     *        queue size threshold for throttling server
-     */
-    ResultsListener(DrillCursor parent, int batchQueueThrottlingThreshold) {
-      this.parent = parent;
-      instanceId = nextInstanceId++;
-      this.batchQueueThrottlingThreshold = batchQueueThrottlingThreshold;
-      logger.debug("[#{}] Query listener created.", instanceId);
-    }
-
-    /**
-     * Starts throttling if not currently throttling.
-     * @param  throttle  the "throttlable" object to throttle
-     * @return  true if actually started (wasn't throttling already)
-     */
-    private boolean startThrottlingIfNot(ConnectionThrottle throttle) {
-      final boolean started = throttled.compareAndSet(false, true);
-      if (started) {
-        this.throttle = throttle;
-        throttle.setAutoRead(false);
-      }
-      return started;
-    }
-
-    /**
-     * Stops throttling if currently throttling.
-     * @return  true if actually stopped (was throttling)
-     */
-    private boolean stopThrottlingIfSo() {
-      final boolean stopped = throttled.compareAndSet(true, false);
-      if (stopped) {
-        throttle.setAutoRead(true);
-        throttle = null;
-      }
-      return stopped;
-    }
-
-    public void awaitFirstMessage() throws InterruptedException, SQLTimeoutException {
-      //Check if a non-zero timeout has been set
-      if (parent.timeoutInMilliseconds > 0) {
-        //Identifying remaining in milliseconds to maintain a granularity close to integer value of timeout
-        long timeToTimeout = parent.timeoutInMilliseconds - parent.elapsedTimer.elapsed(TimeUnit.MILLISECONDS);
-        if (timeToTimeout <= 0 || !firstMessageReceived.await(timeToTimeout, TimeUnit.MILLISECONDS)) {
-            throw new SqlTimeoutException(TimeUnit.MILLISECONDS.toSeconds(parent.timeoutInMilliseconds));
-        }
-      } else {
-        firstMessageReceived.await();
-      }
-    }
-
-    private void releaseIfFirst() {
-      firstMessageReceived.countDown();
-    }
-
-    @Override
-    public void queryIdArrived(QueryId queryId) {
-      logger.debug("[#{}] Received query ID: {}.",
-                    instanceId, QueryIdHelper.getQueryId(queryId));
-      this.queryId = queryId;
-    }
-
-    @Override
-    public void submissionFailed(UserException ex) {
-      logger.debug("Received query failure: {} {}", instanceId, ex);
-      this.executionFailureException = ex;
-      completed = true;
-      close();
-      logger.info("[#{}] Query failed: ", instanceId, ex);
-    }
-
-    @Override
-    public void dataArrived(QueryDataBatch result, ConnectionThrottle throttle) {
-      lastReceivedBatchNumber++;
-      logger.debug("[#{}] Received query data batch #{}: {}.",
-                    instanceId, lastReceivedBatchNumber, result);
-
-      // If we're in a closed state, just release the message.
-      if (closed) {
-        result.release();
-        // TODO:  Revisit member completed:  Is ResultListener really completed
-        // after only one data batch after being closed?
-        completed = true;
-        return;
-      }
-
-      // We're active; let's add to the queue.
-      batchQueue.add(result);
-
-      // Throttle server if queue size has exceed threshold.
-      if (batchQueue.size() > batchQueueThrottlingThreshold) {
-        if (startThrottlingIfNot(throttle)) {
-          logger.debug("[#{}] Throttling started at queue size {}.",
-                        instanceId, batchQueue.size());
-        }
-      }
-
-      releaseIfFirst();
-    }
-
-    @Override
-    public void queryCompleted(QueryState state) {
-      logger.debug("[#{}] Received query completion: {}.", instanceId, state);
-      releaseIfFirst();
-      completed = true;
-    }
-
-    QueryId getQueryId() {
-      return queryId;
-    }
-
-    /**
-     * Gets the next batch of query results from the queue.
-     * @return  the next batch, or {@code null} after last batch has been returned
-     * @throws UserException
-     *         if the query failed
-     * @throws InterruptedException
-     *         if waiting on the queue was interrupted
-     */
-    QueryDataBatch getNext() throws UserException, InterruptedException, SQLTimeoutException {
-      while (true) {
-        if (executionFailureException != null) {
-          logger.debug("[#{}] Dequeued query failure exception: {}.",
-                        instanceId, executionFailureException);
-          throw executionFailureException;
-        }
-        if (completed && batchQueue.isEmpty()) {
-          return null;
-        } else {
-          QueryDataBatch qdb = batchQueue.poll(50, TimeUnit.MILLISECONDS);
-          if (qdb != null) {
-            lastDequeuedBatchNumber++;
-            logger.debug("[#{}] Dequeued query data batch #{}: {}.",
-                          instanceId, lastDequeuedBatchNumber, qdb);
-
-            // Unthrottle server if queue size has dropped enough below threshold:
-            if (batchQueue.size() < batchQueueThrottlingThreshold / 2
-                 || batchQueue.size() == 0  // (in case threshold < 2)
-                ) {
-              if (stopThrottlingIfSo()) {
-                logger.debug("[#{}] Throttling stopped at queue size {}.",
-                              instanceId, batchQueue.size());
-              }
-            }
-            return qdb;
-          }
-
-          // Check and throw SQLTimeoutException
-          if (parent.timeoutInMilliseconds > 0 && parent.elapsedTimer.elapsed(TimeUnit.MILLISECONDS) >= parent.timeoutInMilliseconds) {
-            throw new SqlTimeoutException(TimeUnit.MILLISECONDS.toSeconds(parent.timeoutInMilliseconds));
-          }
-        }
-      }
-    }
-
-    void close() {
-      logger.debug("[#{}] Query listener closing.", instanceId);
-      closed = true;
-      if (stopThrottlingIfSo()) {
-        logger.debug("[#{}] Throttling stopped at close() (at queue size {}).",
-                      instanceId, batchQueue.size());
-      }
-      while (!batchQueue.isEmpty()) {
-        // Don't bother with query timeout, we're closing the cursor
-        QueryDataBatch qdb = batchQueue.poll();
-        if (qdb != null && qdb.getData() != null) {
-          qdb.getData().release();
-        }
-      }
-      // Close may be called before the first result is received and therefore
-      // when the main thread is blocked waiting for the result.  In that case
-      // we want to unblock the main thread.
-      firstMessageReceived.countDown(); // TODO:  Why not call releaseIfFirst as used elsewhere?
-      completed = true;
-    }
-  }
-
   private static final Logger logger = getLogger(DrillCursor.class);
 
   /** JDBC-specified string for unknown catalog, schema, and table names. */
@@ -295,7 +61,7 @@ public class DrillCursor implements Cursor {
   /** Holds current batch of records (none before first load). */
   private final RecordBatchLoader currentBatchHolder;
 
-  private final ResultsListener resultsListener;
+  private final BlockingResultsListener resultsListener;
   private SchemaChangeListener changeListener;
 
   private final DrillAccessorList accessors = new DrillAccessorList();
@@ -355,7 +121,9 @@ public class DrillCursor implements Cursor {
     final int batchQueueThrottlingThreshold =
         client.getConfig().getInt(
             ExecConstants.JDBC_BATCH_QUEUE_THROTTLING_THRESHOLD);
-    resultsListener = new ResultsListener(this, batchQueueThrottlingThreshold);
+    resultsListener = new BlockingResultsListener(this::getElapsedTimer,
+      this::getTimeoutInMilliseconds,
+      batchQueueThrottlingThreshold);
     currentBatchHolder = new RecordBatchLoader(client.getAllocator());
 
     // Set Query Timeout
@@ -395,7 +163,7 @@ public class DrillCursor implements Cursor {
   }
 
   synchronized void cleanup() {
-    if (resultsListener.getQueryId() != null && ! resultsListener.completed) {
+    if (resultsListener.getQueryId() != null && ! resultsListener.isCompleted()) {
       connection.getClient().cancelQuery(resultsListener.getQueryId());
     }
     resultsListener.close();
diff --git a/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillResultSetImpl.java b/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillResultSetImpl.java
index 0ce1e796b9..86dac42adf 100644
--- a/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillResultSetImpl.java
+++ b/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillResultSetImpl.java
@@ -30,6 +30,7 @@ import java.sql.ResultSetMetaData;
 import java.sql.RowId;
 import java.sql.SQLException;
 import java.sql.SQLFeatureNotSupportedException;
+import java.sql.SQLTimeoutException;
 import java.sql.SQLType;
 import java.sql.SQLXML;
 import java.sql.Time;
@@ -48,7 +49,6 @@ import org.apache.calcite.avatica.util.Cursor;
 import org.apache.drill.jdbc.AlreadyClosedSqlException;
 import org.apache.drill.jdbc.DrillResultSet;
 import org.apache.drill.jdbc.ExecutionCanceledSqlException;
-import org.apache.drill.jdbc.SqlTimeoutException;
 
 import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch;
 
@@ -100,7 +100,7 @@ public class DrillResultSetImpl extends AvaticaResultSet implements DrillResultS
     if (elapsedTimer != null) {
       //The timer has already been started by the DrillCursor at this point
       if (elapsedTimer.elapsed(TimeUnit.MILLISECONDS) > this.queryTimeoutInMilliseconds) {
-        throw new SqlTimeoutException(TimeUnit.MILLISECONDS.toSeconds(this.queryTimeoutInMilliseconds));
+        throw new SQLTimeoutException("Query timed out after " + TimeUnit.MILLISECONDS.toSeconds(queryTimeoutInMilliseconds) + " seconds");
       }
     }
   }
diff --git a/exec/jdbc/src/test/java/org/apache/drill/jdbc/PreparedStatementTest.java b/exec/jdbc/src/test/java/org/apache/drill/jdbc/PreparedStatementTest.java
index f8ce2e2e37..43fe2dffa4 100644
--- a/exec/jdbc/src/test/java/org/apache/drill/jdbc/PreparedStatementTest.java
+++ b/exec/jdbc/src/test/java/org/apache/drill/jdbc/PreparedStatementTest.java
@@ -347,7 +347,7 @@ public class PreparedStatementTest extends JdbcTestBase {
    * Test setting timeout for a query that actually times out because of lack of timely server response
    */
   @Ignore ( "Pause Injection appears broken for PreparedStatement" )
-  @Test ( expected = SqlTimeoutException.class )
+  @Test ( expected = SQLTimeoutException.class )
   public void testServerTriggeredQueryTimeout() throws Exception {
     //Setting to a very low value (2sec)
     int timeoutDuration = 2;
@@ -383,7 +383,7 @@ public class PreparedStatementTest extends JdbcTestBase {
         }
       } catch (SQLTimeoutException sqlEx) {
         logger.info("SQLTimeoutException thrown: {}", sqlEx.getMessage());
-        throw (SqlTimeoutException) sqlEx;
+        throw sqlEx;
       } finally {
         //Pause briefly to wait for server to unblock
         try {
diff --git a/exec/jdbc/src/test/java/org/apache/drill/jdbc/StatementTest.java b/exec/jdbc/src/test/java/org/apache/drill/jdbc/StatementTest.java
index de15b05d11..2da66bedcf 100644
--- a/exec/jdbc/src/test/java/org/apache/drill/jdbc/StatementTest.java
+++ b/exec/jdbc/src/test/java/org/apache/drill/jdbc/StatementTest.java
@@ -173,7 +173,7 @@ public class StatementTest extends JdbcTestBase {
   /**
    * Test setting timeout for a query that actually times out because of lack of timely server response
    */
-  @Test ( expected = SqlTimeoutException.class )
+  @Test ( expected = SQLTimeoutException.class )
   public void testServerTriggeredQueryTimeout() throws Exception {
     // Setting to a very low value (2sec)
     int timeoutDuration = 2;
@@ -209,7 +209,7 @@ public class StatementTest extends JdbcTestBase {
         }
       } catch (SQLTimeoutException sqlEx) {
         logger.info("SQLTimeoutException thrown: {}", sqlEx.getMessage());
-        throw (SqlTimeoutException) sqlEx;
+        throw sqlEx;
       } finally {
         // Pause briefly to wait for server to unblock
         try {