Posted to commits@drill.apache.org by ja...@apache.org on 2015/09/14 07:28:52 UTC

[05/15] drill git commit: DRILL-3180: JDBC Storage Plugin updates.

DRILL-3180: JDBC Storage Plugin updates.

- Move to leverage Calcite's JDBC adapter capabilities for pushdowns, schema, etc.
- Add test cases using Derby
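
For illustration, a minimal sketch of the configuration object this commit
introduces (field names follow JdbcStorageConfig below; the Derby driver
class and URL are placeholder values, not part of the commit):

    // Minimal sketch, assuming a Derby server is reachable at the given URL.
    JdbcStorageConfig config = new JdbcStorageConfig(
        "org.apache.derby.jdbc.ClientDriver",          // driver
        "jdbc:derby://localhost:20000/memory:testDB",  // url
        null,                                          // username (optional)
        null);                                         // password (optional)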


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/e12cd470
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/e12cd470
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/e12cd470

Branch: refs/heads/master
Commit: e12cd470e4ab57b025840fdfa200a051a01df029
Parents: 8478e9f
Author: Jacques Nadeau <ja...@apache.org>
Authored: Sat Aug 1 18:11:51 2015 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Sun Sep 13 18:26:44 2015 -0700

----------------------------------------------------------------------
 .../org/apache/drill/common/AutoCloseables.java |   6 +-
 contrib/pom.xml                                 |   1 +
 contrib/storage-jdbc/pom.xml                    |  76 +++
 .../exec/store/jdbc/DrillJdbcRuleBase.java      | 119 +++++
 .../drill/exec/store/jdbc/JdbcBatchCreator.java |  42 ++
 .../apache/drill/exec/store/jdbc/JdbcDrel.java  |  51 ++
 .../exec/store/jdbc/JdbcExpressionCheck.java    | 119 +++++
 .../drill/exec/store/jdbc/JdbcGroupScan.java    | 107 +++++
 .../exec/store/jdbc/JdbcIntermediatePrel.java   |  79 ++++
 .../apache/drill/exec/store/jdbc/JdbcPrel.java  | 120 +++++
 .../drill/exec/store/jdbc/JdbcRecordReader.java | 431 +++++++++++++++++
 .../exec/store/jdbc/JdbcStorageConfig.java      | 120 +++++
 .../exec/store/jdbc/JdbcStoragePlugin.java      | 400 ++++++++++++++++
 .../drill/exec/store/jdbc/JdbcSubScan.java      |  71 +++
 .../resources/bootstrap-storage-plugins.json    |  10 +
 .../src/main/resources/drill-module.conf        |  18 +
 .../drill/exec/store/jdbc/TestJdbcPlugin.java   | 181 +++++++
 .../resources/bootstrap-storage-plugins.json    |  10 +
 .../storage-jdbc/src/test/resources/logback.xml |  48 ++
 contrib/storage-mpjdbc/pom.xml                  |  79 ----
 .../exec/store/mpjdbc/MPJdbcBatchCreator.java   |  54 ---
 .../drill/exec/store/mpjdbc/MPJdbcClient.java   | 300 ------------
 .../exec/store/mpjdbc/MPJdbcClientOptions.java  |  52 --
 .../exec/store/mpjdbc/MPJdbcCnxnManager.java    |  69 ---
 .../exec/store/mpjdbc/MPJdbcFilterBuilder.java  | 235 ---------
 .../exec/store/mpjdbc/MPJdbcFilterRule.java     |  60 ---
 .../exec/store/mpjdbc/MPJdbcFormatConfig.java   | 109 -----
 .../exec/store/mpjdbc/MPJdbcFormatPlugin.java   | 170 -------
 .../exec/store/mpjdbc/MPJdbcGroupScan.java      | 181 -------
 .../exec/store/mpjdbc/MPJdbcRecordReader.java   | 471 -------------------
 .../drill/exec/store/mpjdbc/MPJdbcScanSpec.java |  76 ---
 .../exec/store/mpjdbc/MPJdbcSchemaConfig.java   |  80 ----
 .../exec/store/mpjdbc/MPJdbcSchemaFilter.java   |  23 -
 .../exec/store/mpjdbc/MPJdbcSchemaSubScan.java  |  55 ---
 .../drill/exec/store/mpjdbc/MPJdbcSubScan.java  | 119 -----
 .../resources/bootstrap-storage-plugins.json    |  12 -
 .../src/main/resources/checkstyle-config.xml    |  41 --
 .../main/resources/checkstyle-suppressions.xml  |  19 -
 .../src/main/resources/drill-module.conf        |  30 --
 .../exec/planner/logical/DrillJoinRel.java      |  26 +-
 .../exec/planner/logical/DrillRuleSets.java     |  11 +-
 .../physical/DrillDistributionTraitDef.java     |   7 +-
 .../physical/explain/NumberingRelWriter.java    |  18 +-
 .../drill/exec/planner/sql/SchemaUtilites.java  |   2 +-
 .../planner/sql/handlers/DefaultSqlHandler.java |  13 +-
 .../planner/sql/handlers/PrelFinalizable.java   |  47 ++
 .../apache/drill/exec/store/AbstractSchema.java |   6 +-
 .../drill/exec/store/AbstractStoragePlugin.java |   3 +-
 .../apache/drill/exec/store/StoragePlugin.java  |   3 +-
 .../drill/exec/store/StoragePluginRegistry.java |   8 +-
 .../drill/exec/store/SubSchemaWrapper.java      |  15 +-
 51 files changed, 2115 insertions(+), 2288 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/e12cd470/common/src/main/java/org/apache/drill/common/AutoCloseables.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/drill/common/AutoCloseables.java b/common/src/main/java/org/apache/drill/common/AutoCloseables.java
index c080c52..3c4aa23 100644
--- a/common/src/main/java/org/apache/drill/common/AutoCloseables.java
+++ b/common/src/main/java/org/apache/drill/common/AutoCloseables.java
@@ -36,10 +36,14 @@ public class AutoCloseables {
    * @param logger the logger to use to record the exception if there was one
    */
   public static void close(final AutoCloseable ac, final Logger logger) {
+    if (ac == null) {
+      return;
+    }
+
     try {
       ac.close();
     } catch(Exception e) {
-      logger.warn("Failure on close(): " + e);
+      logger.warn("Failure on close(): {}", e);
     }
   }
 

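With the null guard above, cleanup paths can close resources that may never
have been opened. A minimal usage sketch, mirroring JdbcRecordReader.cleanup()
later in this commit, where the fields may still be null if setup failed:

    AutoCloseables.close(resultSet, logger);   // no-op when resultSet is null
    AutoCloseables.close(statement, logger);
    AutoCloseables.close(connection, logger);
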
http://git-wip-us.apache.org/repos/asf/drill/blob/e12cd470/contrib/pom.xml
----------------------------------------------------------------------
diff --git a/contrib/pom.xml b/contrib/pom.xml
index 8c00e76..2f3ac9f 100644
--- a/contrib/pom.xml
+++ b/contrib/pom.xml
@@ -35,6 +35,7 @@
     <module>storage-hbase</module>
     <module>storage-hive</module>
     <module>storage-mongo</module>
+    <module>storage-jdbc</module>
     <module>sqlline</module>
     <module>data</module>
   </modules>

http://git-wip-us.apache.org/repos/asf/drill/blob/e12cd470/contrib/storage-jdbc/pom.xml
----------------------------------------------------------------------
diff --git a/contrib/storage-jdbc/pom.xml b/contrib/storage-jdbc/pom.xml
new file mode 100755
index 0000000..be20811
--- /dev/null
+++ b/contrib/storage-jdbc/pom.xml
@@ -0,0 +1,76 @@
+<?xml version="1.0"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" xmlns="http://maven.apache.org/POM/4.0.0"
+    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <artifactId>drill-contrib-parent</artifactId>
+    <groupId>org.apache.drill.contrib</groupId>
+    <version>1.2.0-SNAPSHOT</version>
+  </parent>
+
+  <artifactId>drill-jdbc-storage</artifactId>
+
+  <name>contrib/jdbc-storage-plugin</name>
+
+  <dependencies>
+
+
+    <dependency>
+      <groupId>org.apache.drill.exec</groupId>
+      <artifactId>drill-java-exec</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    
+    <!-- Test dependencies -->
+    <dependency>
+      <groupId>org.apache.drill.exec</groupId>
+      <artifactId>drill-java-exec</artifactId>
+      <classifier>tests</classifier>
+      <version>${project.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.drill</groupId>
+      <artifactId>drill-common</artifactId>
+      <classifier>tests</classifier>
+      <version>${project.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>com.yammer.metrics</groupId>
+      <artifactId>metrics-core</artifactId>
+      <version>2.1.1</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.derby</groupId>
+      <artifactId>derbyclient</artifactId>
+      <version>10.11.1.1</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.derby</groupId>
+      <artifactId>derbynet</artifactId>
+      <version>10.11.1.1</version>
+      <scope>test</scope>
+    </dependency>    
+  </dependencies>
+
+  
+</project>

http://git-wip-us.apache.org/repos/asf/drill/blob/e12cd470/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/DrillJdbcRuleBase.java
----------------------------------------------------------------------
diff --git a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/DrillJdbcRuleBase.java b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/DrillJdbcRuleBase.java
new file mode 100644
index 0000000..bbb4daf
--- /dev/null
+++ b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/DrillJdbcRuleBase.java
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.jdbc;
+
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.calcite.adapter.jdbc.JdbcConvention;
+import org.apache.calcite.adapter.jdbc.JdbcRules;
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelTrait;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.convert.ConverterRule;
+import org.apache.calcite.rel.logical.LogicalFilter;
+import org.apache.calcite.rel.logical.LogicalProject;
+import org.apache.calcite.rex.RexNode;
+
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+
+abstract class DrillJdbcRuleBase extends ConverterRule {
+
+  protected final LoadingCache<RexNode, Boolean> checkedExpressions = CacheBuilder.newBuilder()
+      .maximumSize(1000)
+      .expireAfterWrite(10, TimeUnit.MINUTES)
+      .build(
+          new CacheLoader<RexNode, Boolean>() {
+            public Boolean load(RexNode expr) {
+              return JdbcExpressionCheck.isOnlyStandardExpressions(expr);
+            }
+          });
+
+  protected final JdbcConvention out;
+
+  private DrillJdbcRuleBase(Class<? extends RelNode> clazz, RelTrait in, JdbcConvention out, String description) {
+    super(clazz, in, out, description);
+    this.out = out;
+  }
+
+  static class DrillJdbcProjectRule extends DrillJdbcRuleBase {
+
+    public DrillJdbcProjectRule(JdbcConvention out) {
+      super(LogicalProject.class, Convention.NONE, out, "JdbcProjectRule");
+    }
+
+    public RelNode convert(RelNode rel) {
+      LogicalProject project = (LogicalProject) rel;
+      return new JdbcRules.JdbcProject(rel.getCluster(), rel.getTraitSet().replace(this.out), convert(
+          project.getInput(), project.getInput().getTraitSet().replace(this.out)), project.getProjects(),
+          project.getRowType());
+    }
+
+    @Override
+    public boolean matches(RelOptRuleCall call) {
+      try {
+
+        final LogicalProject project = (LogicalProject) call.rel(0);
+        for (RexNode node : project.getChildExps()) {
+          if (!checkedExpressions.get(node)) {
+            return false;
+          }
+        }
+        return true;
+
+      } catch (ExecutionException e) {
+        throw new IllegalStateException("Failure while trying to evaluate pushdown.", e);
+      }
+    }
+  }
+
+  static class DrillJdbcFilterRule extends DrillJdbcRuleBase {
+
+    public DrillJdbcFilterRule(JdbcConvention out) {
+      super(LogicalFilter.class, Convention.NONE, out, "DrillJdbcFilterRule");
+    }
+
+    public RelNode convert(RelNode rel) {
+      LogicalFilter filter = (LogicalFilter) rel;
+
+      return new JdbcRules.JdbcFilter(rel.getCluster(), rel.getTraitSet().replace(this.out), convert(filter.getInput(),
+          filter.getInput().getTraitSet().replace(this.out)), filter.getCondition());
+    }
+
+    @Override
+    public boolean matches(RelOptRuleCall call) {
+      try {
+
+        final LogicalFilter filter = (LogicalFilter) call.rel(0);
+        for (RexNode node : filter.getChildExps()) {
+          if (!checkedExpressions.get(node)) {
+            return false;
+          }
+        }
+        return true;
+
+      } catch (ExecutionException e) {
+        throw new IllegalStateException("Failure while trying to evaluate pushdown.", e);
+      }
+    }
+  }
+
+}
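
A sketch of how these converter rules get registered for one plugin instance;
this mirrors DrillJdbcConvention in JdbcStoragePlugin.java later in this diff,
with "convention" assumed to be the plugin's JdbcConvention and ImmutableSet /
RelOptRule imported as in that file:

    // Each rule fires only when every expression in the Project/Filter passes
    // JdbcExpressionCheck, so Drill-specific functions stay in Drill while
    // plain SQL is pushed down to the external database.
    ImmutableSet.Builder<RelOptRule> builder = ImmutableSet.builder();
    builder.add(new DrillJdbcRuleBase.DrillJdbcProjectRule(convention));
    builder.add(new DrillJdbcRuleBase.DrillJdbcFilterRule(convention));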

http://git-wip-us.apache.org/repos/asf/drill/blob/e12cd470/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcBatchCreator.java
----------------------------------------------------------------------
diff --git a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcBatchCreator.java b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcBatchCreator.java
new file mode 100755
index 0000000..fa44b55
--- /dev/null
+++ b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcBatchCreator.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.jdbc;
+
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.impl.BatchCreator;
+import org.apache.drill.exec.physical.impl.ScanBatch;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.store.RecordReader;
+
+import com.google.common.base.Preconditions;
+
+public class JdbcBatchCreator implements BatchCreator<JdbcSubScan> {
+  @Override
+  public ScanBatch getBatch(FragmentContext context, JdbcSubScan config,
+      List<RecordBatch> children) throws ExecutionSetupException {
+    Preconditions.checkArgument(children.isEmpty());
+    JdbcStoragePlugin plugin = config.getPlugin();
+    RecordReader reader = new JdbcRecordReader(context, plugin.getSource(), config.getSql(), plugin.getName());
+    return new ScanBatch(config, context, Collections.singletonList(reader).iterator());
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/e12cd470/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcDrel.java
----------------------------------------------------------------------
diff --git a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcDrel.java b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcDrel.java
new file mode 100644
index 0000000..52dd29f
--- /dev/null
+++ b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcDrel.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.jdbc;
+
+import java.util.List;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.SingleRel;
+import org.apache.drill.common.logical.data.LogicalOperator;
+import org.apache.drill.exec.planner.logical.DrillImplementor;
+import org.apache.drill.exec.planner.logical.DrillRel;
+
+public class JdbcDrel extends SingleRel implements DrillRel {
+
+  public JdbcDrel(RelOptCluster cluster, RelTraitSet traits, RelNode child) {
+    super(cluster, traits, child);
+  }
+
+  @Override
+  public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
+    return new JdbcDrel(getCluster(), traitSet, inputs.iterator().next());
+  }
+
+  @Override
+  protected Object clone() throws CloneNotSupportedException {
+    return copy(getTraitSet(), getInputs());
+  }
+
+  @Override
+  public LogicalOperator implement(DrillImplementor implementor) {
+    throw new UnsupportedOperationException();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/e12cd470/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcExpressionCheck.java
----------------------------------------------------------------------
diff --git a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcExpressionCheck.java b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcExpressionCheck.java
new file mode 100644
index 0000000..2015a77
--- /dev/null
+++ b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcExpressionCheck.java
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.jdbc;
+
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexCorrelVariable;
+import org.apache.calcite.rex.RexDynamicParam;
+import org.apache.calcite.rex.RexFieldAccess;
+import org.apache.calcite.rex.RexFieldCollation;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexLiteral;
+import org.apache.calcite.rex.RexLocalRef;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexOver;
+import org.apache.calcite.rex.RexRangeRef;
+import org.apache.calcite.rex.RexVisitor;
+import org.apache.calcite.rex.RexWindow;
+import org.apache.drill.exec.planner.sql.DrillSqlOperator;
+
+/**
+ * Visitor that determines whether a particular RexNode expression tree contains only standard SQL expressions.
+ * If the tree contains any Drill-specific expressions, the visitor returns false.
+ */
+class JdbcExpressionCheck implements RexVisitor<Boolean> {
+
+  private static final JdbcExpressionCheck INSTANCE = new JdbcExpressionCheck();
+
+  public static boolean isOnlyStandardExpressions(RexNode rex) {
+    return rex.accept(INSTANCE);
+  }
+
+  @Override
+  public Boolean visitInputRef(RexInputRef paramRexInputRef) {
+    return true;
+  }
+
+  @Override
+  public Boolean visitLocalRef(RexLocalRef paramRexLocalRef) {
+    return true;
+  }
+
+  @Override
+  public Boolean visitLiteral(RexLiteral paramRexLiteral) {
+    return true;
+  }
+
+  @Override
+  public Boolean visitCall(RexCall paramRexCall) {
+    if (paramRexCall.getOperator() instanceof DrillSqlOperator) {
+      return false;
+    } else {
+      for (RexNode operand : paramRexCall.operands) {
+        if (!operand.accept(this)) {
+          return false;
+        }
+      }
+    }
+    return true;
+  }
+
+  @Override
+  public Boolean visitOver(RexOver over) {
+    if (!visitCall(over)) {
+      return false;
+    }
+
+    final RexWindow window = over.getWindow();
+    for (RexFieldCollation orderKey : window.orderKeys) {
+      if (!((RexNode) orderKey.left).accept(this)) {
+        return false;
+      }
+    }
+
+    for (RexNode partitionKey : window.partitionKeys) {
+      if (!partitionKey.accept(this)) {
+        return false;
+      }
+    }
+
+    return true;
+
+  }
+
+  @Override
+  public Boolean visitCorrelVariable(RexCorrelVariable paramRexCorrelVariable) {
+    return true;
+  }
+
+  @Override
+  public Boolean visitDynamicParam(RexDynamicParam paramRexDynamicParam) {
+    return true;
+  }
+
+  @Override
+  public Boolean visitRangeRef(RexRangeRef paramRexRangeRef) {
+    return true;
+  }
+
+  @Override
+  public Boolean visitFieldAccess(RexFieldAccess paramRexFieldAccess) {
+    return paramRexFieldAccess.getReferenceExpr().accept(this);
+  }
+
+}
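
A minimal usage sketch of the check; RexBuilder is Calcite's expression
factory, and typeFactory is assumed to come from the planner context:

    // A plain literal contains only standard expressions, so it is pushable;
    // a tree containing a DrillSqlOperator call would return false instead.
    RexBuilder rexBuilder = new RexBuilder(typeFactory);
    RexNode literal = rexBuilder.makeLiteral("drill");
    boolean pushable = JdbcExpressionCheck.isOnlyStandardExpressions(literal); // true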

http://git-wip-us.apache.org/repos/asf/drill/blob/e12cd470/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcGroupScan.java
----------------------------------------------------------------------
diff --git a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcGroupScan.java b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcGroupScan.java
new file mode 100644
index 0000000..95b03cf
--- /dev/null
+++ b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcGroupScan.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.jdbc;
+
+import java.util.List;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.logical.StoragePluginConfig;
+import org.apache.drill.exec.physical.PhysicalOperatorSetupException;
+import org.apache.drill.exec.physical.base.AbstractGroupScan;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.ScanStats;
+import org.apache.drill.exec.physical.base.ScanStats.GroupScanProperty;
+import org.apache.drill.exec.physical.base.SubScan;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+
+@JsonTypeName("jdbc-scan")
+public class JdbcGroupScan extends AbstractGroupScan {
+
+  private final String sql;
+  private final JdbcStoragePlugin plugin;
+  private final double rows;
+
+  @JsonCreator
+  public JdbcGroupScan(
+      @JsonProperty("sql") String sql,
+      @JsonProperty("config") StoragePluginConfig config,
+      @JsonProperty("rows") double rows,
+      @JacksonInject StoragePluginRegistry plugins) throws ExecutionSetupException {
+    super("");
+    this.sql = sql;
+    this.plugin = (JdbcStoragePlugin) plugins.getPlugin(config);
+    this.rows = rows;
+  }
+
+  JdbcGroupScan(String sql, JdbcStoragePlugin plugin, double rows) {
+    super("");
+    this.sql = sql;
+    this.plugin = plugin;
+    this.rows = rows;
+  }
+
+  @Override
+  public void applyAssignments(List<DrillbitEndpoint> endpoints) throws PhysicalOperatorSetupException {
+  }
+
+  @Override
+  public SubScan getSpecificScan(int minorFragmentId) throws ExecutionSetupException {
+    return new JdbcSubScan(sql, plugin);
+  }
+
+  @Override
+  public int getMaxParallelizationWidth() {
+    return 1;
+  }
+
+  @Override
+  public ScanStats getScanStats() {
+    return new ScanStats(
+        GroupScanProperty.NO_EXACT_ROW_COUNT,
+        (long) Math.max(rows, 1),
+        1,
+        1);
+  }
+
+  public String getSql() {
+    return sql;
+  }
+
+  @Override
+  public String getDigest() {
+    return sql + String.valueOf(plugin.getConfig());
+  }
+
+  public StoragePluginConfig getConfig() {
+    return plugin.getConfig();
+  }
+
+  @Override
+  public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) throws ExecutionSetupException {
+    return new JdbcGroupScan(sql, plugin, rows);
+  }
+
+
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/drill/blob/e12cd470/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcIntermediatePrel.java
----------------------------------------------------------------------
diff --git a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcIntermediatePrel.java b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcIntermediatePrel.java
new file mode 100644
index 0000000..0adb5e0
--- /dev/null
+++ b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcIntermediatePrel.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.jdbc;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.planner.physical.PhysicalPlanCreator;
+import org.apache.drill.exec.planner.physical.Prel;
+import org.apache.drill.exec.planner.physical.SinglePrel;
+import org.apache.drill.exec.planner.physical.visitor.PrelVisitor;
+import org.apache.drill.exec.planner.sql.handlers.PrelFinalizable;
+import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
+
+/**
+ * Prel used to represent a JDBC conversion within an expression tree. This Prel will be replaced with a full JdbcPrel
+ * before execution can happen.
+ */
+public class JdbcIntermediatePrel extends SinglePrel implements PrelFinalizable {
+
+  public JdbcIntermediatePrel(RelOptCluster cluster, RelTraitSet traits, RelNode child) {
+    super(cluster, traits, child);
+  }
+
+  @Override
+  public PhysicalOperator getPhysicalOperator(PhysicalPlanCreator creator) throws IOException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
+    return new JdbcIntermediatePrel(getCluster(), traitSet, getInput());
+  }
+
+  @Override
+  protected Object clone() throws CloneNotSupportedException {
+    return copy(getTraitSet(), getInputs());
+  }
+
+  @Override
+  public SelectionVectorMode getEncoding() {
+    return SelectionVectorMode.NONE;
+  }
+
+  @Override
+  public Prel finalizeRel() {
+    return new JdbcPrel(getCluster(), getTraitSet(), this);
+  }
+
+  @Override
+  public <T, X, E extends Throwable> T accept(PrelVisitor<T, X, E> logicalVisitor, X value) throws E {
+    throw new UnsupportedOperationException("This needs to be finalized before using a PrelVisitor.");
+  }
+
+  @Override
+  public boolean needsFinalColumnReordering() {
+    return false;
+  }
+
+}
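
A hedged sketch of the finalize step described in the javadoc above: during
plan finalization (see PrelFinalizable in this commit), each
JdbcIntermediatePrel found in the plan is swapped for an executable JdbcPrel:

    // intermediatePrel is assumed to be a JdbcIntermediatePrel discovered
    // while walking the finished plan; finalizeRel() builds the JdbcPrel,
    // whose constructor generates the SQL string for the subtree.
    Prel executable = intermediatePrel.finalizeRel();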

http://git-wip-us.apache.org/repos/asf/drill/blob/e12cd470/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcPrel.java
----------------------------------------------------------------------
diff --git a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcPrel.java b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcPrel.java
new file mode 100644
index 0000000..2433fbd
--- /dev/null
+++ b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcPrel.java
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.jdbc;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Iterator;
+
+import org.apache.calcite.adapter.java.JavaTypeFactory;
+import org.apache.calcite.adapter.jdbc.JdbcImplementor;
+import org.apache.calcite.plan.ConventionTraitDef;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.plan.volcano.RelSubset;
+import org.apache.calcite.rel.AbstractRelNode;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.RelShuttleImpl;
+import org.apache.calcite.rel.RelWriter;
+import org.apache.calcite.sql.SqlDialect;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.planner.physical.PhysicalPlanCreator;
+import org.apache.drill.exec.planner.physical.Prel;
+import org.apache.drill.exec.planner.physical.visitor.PrelVisitor;
+import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
+import org.apache.drill.exec.store.jdbc.JdbcStoragePlugin.DrillJdbcConvention;
+
+/**
+ * Represents a JDBC plan once the child nodes have been rewritten into SQL.
+ */
+public class JdbcPrel extends AbstractRelNode implements Prel {
+
+  private final String sql;
+  private final double rows;
+  private final DrillJdbcConvention convention;
+
+  public JdbcPrel(RelOptCluster cluster, RelTraitSet traitSet, JdbcIntermediatePrel prel) {
+    super(cluster, traitSet);
+    final RelNode input = prel.getInput();
+    rows = input.getRows();
+    convention = (DrillJdbcConvention) input.getTraitSet().getTrait(ConventionTraitDef.INSTANCE);
+
+    // generate sql for tree.
+    final SqlDialect dialect = convention.getPlugin().getDialect();
+    final JdbcImplementor jdbcImplementor = new JdbcImplementor(
+        dialect,
+        (JavaTypeFactory) getCluster().getTypeFactory());
+    final JdbcImplementor.Result result =
+        jdbcImplementor.visitChild(0, input.accept(new SubsetRemover()));
+    sql = result.asQuery().toSqlString(dialect).getSql();
+    rowType = input.getRowType();
+  }
+
+  private class SubsetRemover extends RelShuttleImpl {
+
+    @Override
+    public RelNode visit(RelNode other) {
+      if (other instanceof RelSubset) {
+        return ((RelSubset) other).getBest().accept(this);
+      } else {
+        return super.visit(other);
+      }
+    }
+
+  }
+
+  @Override
+  public PhysicalOperator getPhysicalOperator(PhysicalPlanCreator creator) throws IOException {
+    return new JdbcGroupScan(sql, convention.getPlugin(), rows);
+  }
+
+  @Override
+  public RelWriter explainTerms(RelWriter pw) {
+    return super.explainTerms(pw).item("sql", sql);
+  }
+
+  @Override
+  public double getRows() {
+    return rows;
+  }
+
+  @Override
+  public Iterator<Prel> iterator() {
+    return Collections.emptyIterator();
+  }
+
+  @Override
+  public <T, X, E extends Throwable> T accept(PrelVisitor<T, X, E> logicalVisitor, X value) throws E {
+    return logicalVisitor.visitPrel(this, value);
+  }
+
+  @Override
+  public SelectionVectorMode[] getSupportedEncodings() {
+    return SelectionVectorMode.DEFAULT;
+  }
+
+  @Override
+  public SelectionVectorMode getEncoding() {
+    return SelectionVectorMode.NONE;
+  }
+
+  @Override
+  public boolean needsFinalColumnReordering() {
+    return false;
+  }
+}
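
Illustrative only: the kind of string the constructor above produces. For a
pushed-down filter over a hypothetical PERSON table, the JdbcImplementor
renders the subset-stripped tree as a single dialect-specific query:

    // Hypothetical result of result.asQuery().toSqlString(dialect).getSql()
    // for a Filter(A > 21) over a Scan of PERSON; quoting varies by dialect.
    String sql = "SELECT * FROM \"APP\".\"PERSON\" WHERE \"A\" > 21";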

http://git-wip-us.apache.org/repos/asf/drill/blob/e12cd470/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcRecordReader.java
----------------------------------------------------------------------
diff --git a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcRecordReader.java b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcRecordReader.java
new file mode 100755
index 0000000..69c45c2
--- /dev/null
+++ b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcRecordReader.java
@@ -0,0 +1,431 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.jdbc;
+
+import java.math.BigDecimal;
+import java.sql.Connection;
+import java.sql.Date;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.sql.Time;
+import java.sql.Timestamp;
+
+import javax.sql.DataSource;
+
+import org.apache.drill.common.AutoCloseables;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.types.TypeProtos.MajorType;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.common.types.Types;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.expr.TypeHelper;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.physical.impl.OutputMutator;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.store.AbstractRecordReader;
+import org.apache.drill.exec.vector.NullableBigIntVector;
+import org.apache.drill.exec.vector.NullableBitVector;
+import org.apache.drill.exec.vector.NullableDateVector;
+import org.apache.drill.exec.vector.NullableFloat4Vector;
+import org.apache.drill.exec.vector.NullableFloat8Vector;
+import org.apache.drill.exec.vector.NullableIntVector;
+import org.apache.drill.exec.vector.NullableTimeStampVector;
+import org.apache.drill.exec.vector.NullableTimeVector;
+import org.apache.drill.exec.vector.NullableVarBinaryVector;
+import org.apache.drill.exec.vector.NullableVarCharVector;
+import org.apache.drill.exec.vector.ValueVector;
+
+import com.google.common.base.Charsets;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+
+@SuppressWarnings("unchecked")
+class JdbcRecordReader extends AbstractRecordReader {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory
+      .getLogger(JdbcRecordReader.class);
+
+  private static final ImmutableMap<Integer, MinorType> JDBC_TYPE_MAPPINGS;
+  private final DataSource source;
+  private ResultSet resultSet;
+  private final String storagePluginName;
+  private FragmentContext fragmentContext;
+  private Connection connection;
+  private Statement statement;
+  private final String sql;
+  private ImmutableList<ValueVector> vectors;
+  private ImmutableList<Copier<?>> copiers;
+
+  private OperatorContext operatorContext;
+
+  public JdbcRecordReader(FragmentContext fragmentContext, DataSource source, String sql, String storagePluginName) {
+    this.fragmentContext = fragmentContext;
+    this.source = source;
+    this.sql = sql;
+    this.storagePluginName = storagePluginName;
+  }
+
+  static {
+    JDBC_TYPE_MAPPINGS = (ImmutableMap<Integer, MinorType>) (Object) ImmutableMap.builder()
+        .put(java.sql.Types.DOUBLE, MinorType.FLOAT8)
+        .put(java.sql.Types.FLOAT, MinorType.FLOAT4)
+        .put(java.sql.Types.TINYINT, MinorType.INT)
+        .put(java.sql.Types.SMALLINT, MinorType.INT)
+        .put(java.sql.Types.INTEGER, MinorType.INT)
+        .put(java.sql.Types.BIGINT, MinorType.BIGINT)
+
+        .put(java.sql.Types.CHAR, MinorType.VARCHAR)
+        .put(java.sql.Types.VARCHAR, MinorType.VARCHAR)
+        .put(java.sql.Types.LONGVARCHAR, MinorType.VARCHAR)
+
+        .put(java.sql.Types.NCHAR, MinorType.VARCHAR)
+        .put(java.sql.Types.NVARCHAR, MinorType.VARCHAR)
+        .put(java.sql.Types.LONGNVARCHAR, MinorType.VARCHAR)
+
+        .put(java.sql.Types.VARBINARY, MinorType.VARBINARY)
+        .put(java.sql.Types.LONGVARBINARY, MinorType.VARBINARY)
+
+        .put(java.sql.Types.NUMERIC, MinorType.FLOAT8)
+        .put(java.sql.Types.DECIMAL, MinorType.FLOAT8)
+        .put(java.sql.Types.REAL, MinorType.FLOAT8)
+
+        .put(java.sql.Types.DATE, MinorType.DATE)
+        .put(java.sql.Types.TIME, MinorType.TIME)
+        .put(java.sql.Types.TIMESTAMP, MinorType.TIMESTAMP)
+
+        .put(java.sql.Types.BOOLEAN, MinorType.BIT)
+
+        .build();
+  }
+
+  private Copier<?> getCopier(int jdbcType, int offset, ResultSet result, ValueVector v) {
+
+    if (v instanceof NullableBigIntVector) {
+      return new BigIntCopier(offset, result, (NullableBigIntVector.Mutator) v.getMutator());
+    } else if (v instanceof NullableFloat4Vector) {
+      return new Float4Copier(offset, result, (NullableFloat4Vector.Mutator) v.getMutator());
+    } else if (v instanceof NullableFloat8Vector) {
+      return new Float8Copier(offset, result, (NullableFloat8Vector.Mutator) v.getMutator());
+    } else if (v instanceof NullableIntVector) {
+      return new IntCopier(offset, result, (NullableIntVector.Mutator) v.getMutator());
+    } else if (v instanceof NullableVarCharVector) {
+      return new VarCharCopier(offset, result, (NullableVarCharVector.Mutator) v.getMutator());
+    } else if (v instanceof NullableVarBinaryVector) {
+      return new VarBinaryCopier(offset, result, (NullableVarBinaryVector.Mutator) v.getMutator());
+    } else if (v instanceof NullableDateVector) {
+      return new DateCopier(offset, result, (NullableDateVector.Mutator) v.getMutator());
+    } else if (v instanceof NullableTimeVector) {
+      return new TimeCopier(offset, result, (NullableTimeVector.Mutator) v.getMutator());
+    } else if (v instanceof NullableTimeStampVector) {
+      return new TimeStampCopier(offset, result, (NullableTimeStampVector.Mutator) v.getMutator());
+    } else if (v instanceof NullableBitVector) {
+      return new BitCopier(offset, result, (NullableBitVector.Mutator) v.getMutator());
+    }
+
+    throw new IllegalArgumentException("Unknown vector type: " + v.getClass().getName() + ".");
+  }
+
+  @Override
+  public void setup(OperatorContext operatorContext, OutputMutator output) throws ExecutionSetupException {
+    try {
+
+      this.operatorContext = operatorContext;
+      connection = source.getConnection();
+      statement = connection.createStatement();
+      resultSet = statement.executeQuery(sql);
+
+      final ResultSetMetaData meta = resultSet.getMetaData();
+      final int columns = meta.getColumnCount();
+      ImmutableList.Builder<ValueVector> vectorBuilder = ImmutableList.builder();
+      ImmutableList.Builder<Copier<?>> copierBuilder = ImmutableList.builder();
+
+      for (int i = 1; i <= columns; i++) {
+        final String name = meta.getColumnLabel(i);
+        final int jdbcType = meta.getColumnType(i);
+        final int width = meta.getPrecision(i);
+        final int scale = meta.getScale(i);
+        MinorType minorType = JDBC_TYPE_MAPPINGS.get(jdbcType);
+        if (minorType == null) {
+          throw UserException.dataReadError()
+              .message("The JDBC storage plugin failed while trying to execute a query. "
+                  + "The JDBC data type %d is not currently supported.", jdbcType)
+
+              .addContext("sql", sql)
+              .addContext("plugin", storagePluginName)
+              .build(logger);
+        }
+
+        final MajorType type = Types.optional(minorType);
+        final MaterializedField field = MaterializedField.create(name, type);
+        final Class<? extends ValueVector> clazz = (Class<? extends ValueVector>) TypeHelper.getValueVectorClass(
+            minorType, type.getMode());
+        ValueVector vector = output.addField(field, clazz);
+        vectorBuilder.add(vector);
+        copierBuilder.add(getCopier(jdbcType, i, resultSet, vector));
+
+      }
+
+      vectors = vectorBuilder.build();
+      copiers = copierBuilder.build();
+
+    } catch (SQLException | SchemaChangeException e) {
+      throw UserException.dataReadError(e)
+          .message("The JDBC storage plugin failed while trying setup the SQL query. ")
+          .addContext("sql", sql)
+          .addContext("plugin", storagePluginName)
+          .build(logger);
+    }
+  }
+
+
+  @Override
+  public int next() {
+    int counter = 0;
+    try {
+      // Stop at 4095 rows: nullable vectors use one more slot than the
+      // record count and we allocate in powers of two.
+      while (counter < 4095 && resultSet.next()) {
+        for (Copier<?> c : copiers) {
+          c.copy(counter);
+        }
+        counter++;
+      }
+    } catch (SQLException e) {
+      throw UserException
+          .dataReadError(e)
+          .message("Failure while attempting to read from database.")
+          .addContext("sql", sql)
+          .addContext("plugin", storagePluginName)
+          .build(logger);
+    }
+
+    for (ValueVector vv : vectors) {
+      vv.getMutator().setValueCount(counter);
+    }
+
+    return counter;
+  }
+
+  @Override
+  public void cleanup() {
+    AutoCloseables.close(resultSet, logger);
+    AutoCloseables.close(statement, logger);
+    AutoCloseables.close(connection, logger);
+  }
+
+  private abstract class Copier<T extends ValueVector.Mutator> {
+    protected final int columnIndex;
+    protected final ResultSet result;
+    protected final T mutator;
+
+    public Copier(int columnIndex, ResultSet result, T mutator) {
+      super();
+      this.columnIndex = columnIndex;
+      this.result = result;
+      this.mutator = mutator;
+    }
+
+    abstract void copy(int index) throws SQLException;
+  }
+
+  private class IntCopier extends Copier<NullableIntVector.Mutator> {
+    public IntCopier(int offset, ResultSet set, NullableIntVector.Mutator mutator) {
+      super(offset, set, mutator);
+    }
+
+    @Override
+    void copy(int index) throws SQLException {
+      mutator.setSafe(index, result.getInt(columnIndex));
+      if (result.wasNull()) {
+        mutator.setNull(index);
+      }
+    }
+  }
+
+  private class BigIntCopier extends Copier<NullableBigIntVector.Mutator> {
+    public BigIntCopier(int offset, ResultSet set, NullableBigIntVector.Mutator mutator) {
+      super(offset, set, mutator);
+    }
+
+    @Override
+    void copy(int index) throws SQLException {
+      mutator.setSafe(index, result.getLong(columnIndex));
+      if (result.wasNull()) {
+        mutator.setNull(index);
+      }
+    }
+
+  }
+
+  private class Float4Copier extends Copier<NullableFloat4Vector.Mutator> {
+
+    public Float4Copier(int columnIndex, ResultSet result, NullableFloat4Vector.Mutator mutator) {
+      super(columnIndex, result, mutator);
+    }
+
+    @Override
+    void copy(int index) throws SQLException {
+      mutator.setSafe(index, result.getFloat(columnIndex));
+      if (result.wasNull()) {
+        mutator.setNull(index);
+      }
+    }
+
+  }
+
+
+  private class Float8Copier extends Copier<NullableFloat8Vector.Mutator> {
+
+    public Float8Copier(int columnIndex, ResultSet result, NullableFloat8Vector.Mutator mutator) {
+      super(columnIndex, result, mutator);
+    }
+
+    @Override
+    void copy(int index) throws SQLException {
+      mutator.setSafe(index, result.getDouble(columnIndex));
+      if (result.wasNull()) {
+        mutator.setNull(index);
+      }
+
+    }
+
+  }
+
+  private class DecimalCopier extends Copier<NullableFloat8Vector.Mutator> {
+
+    public DecimalCopier(int columnIndex, ResultSet result, NullableFloat8Vector.Mutator mutator) {
+      super(columnIndex, result, mutator);
+    }
+
+    @Override
+    void copy(int index) throws SQLException {
+      BigDecimal decimal = result.getBigDecimal(columnIndex);
+      if (decimal != null) {
+        mutator.setSafe(index, decimal.doubleValue());
+      }
+    }
+
+  }
+
+  private class VarCharCopier extends Copier<NullableVarCharVector.Mutator> {
+
+    public VarCharCopier(int columnIndex, ResultSet result, NullableVarCharVector.Mutator mutator) {
+      super(columnIndex, result, mutator);
+    }
+
+    @Override
+    void copy(int index) throws SQLException {
+      String val = resultSet.getString(columnIndex);
+      if (val != null) {
+        byte[] record = val.getBytes(Charsets.UTF_8);
+        mutator.setSafe(index, record, 0, record.length);
+      }
+    }
+
+  }
+
+  private class VarBinaryCopier extends Copier<NullableVarBinaryVector.Mutator> {
+
+    public VarBinaryCopier(int columnIndex, ResultSet result, NullableVarBinaryVector.Mutator mutator) {
+      super(columnIndex, result, mutator);
+    }
+
+    @Override
+    void copy(int index) throws SQLException {
+      byte[] record = result.getBytes(columnIndex);
+      if (record != null) {
+        mutator.setSafe(index, record, 0, record.length);
+      }
+    }
+
+  }
+
+  private class DateCopier extends Copier<NullableDateVector.Mutator> {
+
+    public DateCopier(int columnIndex, ResultSet result, NullableDateVector.Mutator mutator) {
+      super(columnIndex, result, mutator);
+    }
+
+    @Override
+    void copy(int index) throws SQLException {
+      Date date = result.getDate(columnIndex);
+      if (date != null) {
+        mutator.setSafe(index, date.getTime());
+      }
+    }
+
+  }
+
+  private class TimeCopier extends Copier<NullableTimeVector.Mutator> {
+
+    public TimeCopier(int columnIndex, ResultSet result, NullableTimeVector.Mutator mutator) {
+      super(columnIndex, result, mutator);
+    }
+
+    @Override
+    void copy(int index) throws SQLException {
+      Time time = result.getTime(columnIndex);
+      if (time != null) {
+        mutator.setSafe(index, (int) time.getTime());
+      }
+
+    }
+
+  }
+
+  private class TimeStampCopier extends Copier<NullableTimeStampVector.Mutator> {
+
+    public TimeStampCopier(int columnIndex, ResultSet result, NullableTimeStampVector.Mutator mutator) {
+      super(columnIndex, result, mutator);
+    }
+
+    @Override
+    void copy(int index) throws SQLException {
+      Timestamp stamp = result.getTimestamp(columnIndex);
+      if (stamp != null) {
+        mutator.setSafe(index, stamp.getTime());
+      }
+
+    }
+
+  }
+
+  private class BitCopier extends Copier<NullableBitVector.Mutator> {
+
+    public BitCopier(int columnIndex, ResultSet result, NullableBitVector.Mutator mutator) {
+      super(columnIndex, result, mutator);
+    }
+
+    @Override
+    void copy(int index) throws SQLException {
+      mutator.setSafe(index, result.getBoolean(columnIndex) ? 1 : 0);
+      if (result.wasNull()) {
+        mutator.setNull(index);
+      }
+    }
+
+  }
+
+}
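
A small usage sketch of the type mapping above, as setup() applies it per
column; an unmapped JDBC type returns null and surfaces as the dataReadError
UserException:

    // java.sql.Types.INTEGER resolves to MinorType.INT; a type such as
    // java.sql.Types.ARRAY is absent from the map and would abort setup().
    MinorType mapped = JDBC_TYPE_MAPPINGS.get(java.sql.Types.INTEGER);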

http://git-wip-us.apache.org/repos/asf/drill/blob/e12cd470/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcStorageConfig.java
----------------------------------------------------------------------
diff --git a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcStorageConfig.java b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcStorageConfig.java
new file mode 100755
index 0000000..5a921d4
--- /dev/null
+++ b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcStorageConfig.java
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.jdbc;
+
+import org.apache.drill.common.logical.StoragePluginConfig;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+
+@JsonTypeName(JdbcStorageConfig.NAME)
+public class JdbcStorageConfig extends StoragePluginConfig {
+
+  public static final String NAME = "jdbc";
+
+  private final String driver;
+  private final String url;
+  private final String username;
+  private final String password;
+
+  @JsonCreator
+  public JdbcStorageConfig(
+      @JsonProperty("driver") String driver,
+      @JsonProperty("url") String url,
+      @JsonProperty("username") String username,
+      @JsonProperty("password") String password) {
+    super();
+    this.driver = driver;
+    this.url = url;
+    this.username = username;
+    this.password = password;
+  }
+
+  public String getDriver() {
+    return driver;
+  }
+
+  public String getUrl() {
+    return url;
+  }
+
+  public String getUsername() {
+    return username;
+  }
+
+  public String getPassword() {
+    return password;
+  }
+
+  @Override
+  public int hashCode() {
+    final int prime = 31;
+    int result = 1;
+    result = prime * result + ((driver == null) ? 0 : driver.hashCode());
+    result = prime * result + ((password == null) ? 0 : password.hashCode());
+    result = prime * result + ((url == null) ? 0 : url.hashCode());
+    result = prime * result + ((username == null) ? 0 : username.hashCode());
+    return result;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (obj == null) {
+      return false;
+    }
+    if (getClass() != obj.getClass()) {
+      return false;
+    }
+    JdbcStorageConfig other = (JdbcStorageConfig) obj;
+    if (driver == null) {
+      if (other.driver != null) {
+        return false;
+      }
+    } else if (!driver.equals(other.driver)) {
+      return false;
+    }
+    if (password == null) {
+      if (other.password != null) {
+        return false;
+      }
+    } else if (!password.equals(other.password)) {
+      return false;
+    }
+    if (url == null) {
+      if (other.url != null) {
+        return false;
+      }
+    } else if (!url.equals(other.url)) {
+      return false;
+    }
+    if (username == null) {
+      if (other.username != null) {
+        return false;
+      }
+    } else if (!username.equals(other.username)) {
+      return false;
+    }
+    return true;
+  }
+
+
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/e12cd470/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcStoragePlugin.java
----------------------------------------------------------------------
diff --git a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcStoragePlugin.java b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcStoragePlugin.java
new file mode 100755
index 0000000..f27f6f1
--- /dev/null
+++ b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcStoragePlugin.java
@@ -0,0 +1,400 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.jdbc;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import javax.sql.DataSource;
+
+import org.apache.calcite.adapter.jdbc.JdbcConvention;
+import org.apache.calcite.adapter.jdbc.JdbcRules;
+import org.apache.calcite.adapter.jdbc.JdbcRules.JdbcJoin;
+import org.apache.calcite.adapter.jdbc.JdbcSchema;
+import org.apache.calcite.linq4j.tree.ConstantUntypedNull;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.convert.ConverterRule;
+import org.apache.calcite.rel.rules.FilterSetOpTransposeRule;
+import org.apache.calcite.rel.rules.ProjectRemoveRule;
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.schema.Function;
+import org.apache.calcite.schema.Schema;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.schema.Table;
+import org.apache.calcite.sql.SqlDialect;
+import org.apache.commons.dbcp.BasicDataSource;
+import org.apache.drill.common.JSONOptions;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.ops.OptimizerRulesContext;
+import org.apache.drill.exec.physical.base.AbstractGroupScan;
+import org.apache.drill.exec.planner.logical.DrillRel;
+import org.apache.drill.exec.planner.physical.Prel;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.store.AbstractSchema;
+import org.apache.drill.exec.store.AbstractStoragePlugin;
+import org.apache.drill.exec.store.SchemaConfig;
+import org.apache.drill.exec.store.jdbc.DrillJdbcRuleBase.DrillJdbcFilterRule;
+import org.apache.drill.exec.store.jdbc.DrillJdbcRuleBase.DrillJdbcProjectRule;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Maps;
+
+public class JdbcStoragePlugin extends AbstractStoragePlugin {
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(JdbcStoragePlugin.class);
+
+  // Rules from Calcite's JdbcRules class that we want to avoid using.
+  private static final String[] RULES_TO_AVOID = {
+      "JdbcToEnumerableConverterRule", "JdbcFilterRule", "JdbcProjectRule"
+  };
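+  // Drill registers its own variants of these below (DrillJdbcProjectRule, DrillJdbcFilterRule)
+  // and converts into Drill's physical convention rather than Calcite's enumerable one.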
+
+
+  private final JdbcStorageConfig config;
+  private final DrillbitContext context;
+  private final DataSource source;
+  private final String name;
+  private final SqlDialect dialect;
+  private final DrillJdbcConvention convention;
+
+
+  public JdbcStoragePlugin(JdbcStorageConfig config, DrillbitContext context, String name) {
+    this.context = context;
+    this.config = config;
+    this.name = name;
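+    // A DBCP BasicDataSource pools connections to the external database.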
+    BasicDataSource source = new BasicDataSource();
+    source.setDriverClassName(config.getDriver());
+    source.setUrl(config.getUrl());
+
+    if (config.getUsername() != null) {
+      source.setUsername(config.getUsername());
+    }
+
+    if (config.getPassword() != null) {
+      source.setPassword(config.getPassword());
+    }
+
+    this.source = source;
+    this.dialect = JdbcSchema.createDialect(source);
+    this.convention = new DrillJdbcConvention(dialect, name);
+  }
+
+
+  class DrillJdbcConvention extends JdbcConvention {
+
+    private final ImmutableSet<RelOptRule> rules;
+
+    public DrillJdbcConvention(SqlDialect dialect, String name) {
+      super(dialect, ConstantUntypedNull.INSTANCE, name);
+
+
+      // build rules for this convention.
+      ImmutableSet.Builder<RelOptRule> builder = ImmutableSet.builder();
+
+      builder.add(JDBC_PRULE_INSTANCE);
+      builder.add(new JdbcDrelConverterRule(this));
+      builder.add(new DrillJdbcProjectRule(this));
+      builder.add(new DrillJdbcFilterRule(this));
+
+      outside: for (RelOptRule rule : JdbcRules.rules(this)) {
+        final String description = rule.toString();
+
+        // We want to blacklist some rules, but the parent Calcite package exposes them
+        // all or nothing. Therefore, we skip any rule whose name matches one we don't want.
+        for (String ruleName : RULES_TO_AVOID) {
+          if (description.equals(ruleName)) {
+            continue outside;
+          }
+        }
+
+        builder.add(rule);
+      }
+
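+      // Generic Calcite rules that expose more pushdown opportunities: push filters below
+      // set operations and remove trivial projects.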
+      builder.add(FilterSetOpTransposeRule.INSTANCE);
+      builder.add(ProjectRemoveRule.INSTANCE);
+
+      rules = builder.build();
+    }
+
+    @Override
+    public void register(RelOptPlanner planner) {
+      for (RelOptRule rule : rules) {
+        planner.addRule(rule);
+      }
+    }
+
+    public Set<RelOptRule> getRules() {
+      return rules;
+    }
+
+    public JdbcStoragePlugin getPlugin() {
+      return JdbcStoragePlugin.this;
+    }
+  }
+
+  /**
+   * Returns whether a condition is supported by {@link JdbcJoin}.
+   *
+   * <p>Corresponds to the capabilities of
+   * {@link JdbcJoin#convertConditionToSqlNode}.
+   *
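+   * <p>For example, {@code a.x = b.y AND a.w < b.z} is supported, while
+   * {@code a.x + 1 = b.y} is not, since each comparison operand must be a
+   * plain column reference ({@link RexInputRef}).
+   *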
+   * @param node Condition
+   * @return Whether condition is supported
+   */
+  private static boolean canJoinOnCondition(RexNode node) {
+    final List<RexNode> operands;
+    switch (node.getKind()) {
+    case AND:
+    case OR:
+      operands = ((RexCall) node).getOperands();
+      for (RexNode operand : operands) {
+        if (!canJoinOnCondition(operand)) {
+          return false;
+        }
+      }
+      return true;
+
+    case EQUALS:
+    case IS_NOT_DISTINCT_FROM:
+    case NOT_EQUALS:
+    case GREATER_THAN:
+    case GREATER_THAN_OR_EQUAL:
+    case LESS_THAN:
+    case LESS_THAN_OR_EQUAL:
+      operands = ((RexCall) node).getOperands();
+      if ((operands.get(0) instanceof RexInputRef)
+          && (operands.get(1) instanceof RexInputRef)) {
+        return true;
+      }
+      // fall through
+
+    default:
+      return false;
+    }
+  }
+
+
+  private static final JdbcPrule JDBC_PRULE_INSTANCE = new JdbcPrule();
+
+  private static class JdbcPrule extends ConverterRule {
+
+    private JdbcPrule() {
+      super(JdbcDrel.class, DrillRel.DRILL_LOGICAL, Prel.DRILL_PHYSICAL, "JDBC_PREL_Converter");
+    }
+
+    @Override
+    public RelNode convert(RelNode in) {
+
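+      // Wrap the JDBC subtree in an intermediate Prel so it participates in Drill's
+      // physical planning phase.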
+      return new JdbcIntermediatePrel(
+          in.getCluster(),
+          in.getTraitSet().replace(getOutTrait()),
+          in.getInput(0));
+    }
+
+  }
+
+  private class JdbcDrelConverterRule extends ConverterRule {
+
+    public JdbcDrelConverterRule(DrillJdbcConvention in) {
+      super(RelNode.class, in, DrillRel.DRILL_LOGICAL, "JDBC_DREL_Converter" + in.getName());
+    }
+
+    @Override
+    public RelNode convert(RelNode in) {
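+      // Convert the JDBC-convention subtree and wrap it in a JdbcDrel so it can be
+      // planned as part of Drill's logical plan.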
+      return new JdbcDrel(in.getCluster(), in.getTraitSet().replace(DrillRel.DRILL_LOGICAL),
+          convert(in, in.getTraitSet().replace(this.getInTrait())));
+    }
+
+  }
+
+  private class CapitalizingJdbcSchema extends AbstractSchema {
+
+    private final JdbcSchema inner;
+
+    public CapitalizingJdbcSchema(List<String> parentSchemaPath, String name, DataSource dataSource,
+        SqlDialect dialect, JdbcConvention convention, String catalog, String schema) {
+      super(parentSchemaPath, name);
+      inner = new JdbcSchema(dataSource, dialect, convention, catalog, schema);
+    }
+
+    @Override
+    public String getTypeName() {
+      return JdbcStorageConfig.NAME;
+    }
+
+    @Override
+    public Collection<Function> getFunctions(String name) {
+      return inner.getFunctions(name);
+    }
+
+    @Override
+    public Set<String> getFunctionNames() {
+      return inner.getFunctionNames();
+    }
+
+    @Override
+    public Schema getSubSchema(String name) {
+      return inner.getSubSchema(name);
+    }
+
+    @Override
+    public Set<String> getSubSchemaNames() {
+      return inner.getSubSchemaNames();
+    }
+
+    @Override
+    public Set<String> getTableNames() {
+      return inner.getTableNames();
+    }
+
+    @Override
+    public Table getTable(String name) {
+      Table table = inner.getTable(name);
+      if (table != null) {
+        return table;
+      }
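+      // Unquoted SQL identifiers are stored upper-cased by many databases (e.g. Derby),
+      // so fall back to an upper-cased lookup before giving up.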
+      return inner.getTable(name.toUpperCase());
+    }
+
+  }
+
+  private class JdbcCatalogSchema extends AbstractSchema {
+
+    private final Map<String, CapitalizingJdbcSchema> schemaMap = Maps.newHashMap();
+    private final CapitalizingJdbcSchema defaultSchema;
+
+    public JdbcCatalogSchema(String name) {
+      super(ImmutableList.<String> of(), name);
+
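+      // Enumerate the database's catalogs through JDBC metadata; each catalog becomes a
+      // Drill sub-schema.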
+      try (Connection con = source.getConnection(); ResultSet set = con.getMetaData().getCatalogs()) {
+        while (set.next()) {
+          final String catalogName = set.getString(1);
+          CapitalizingJdbcSchema schema = new CapitalizingJdbcSchema(getSchemaPath(), catalogName, source, dialect,
+              convention, catalogName, null);
+          schemaMap.put(catalogName, schema);
+        }
+      } catch (SQLException e) {
+        logger.warn("Failure while attempting to load JDBC schema.", e);
+      }
+
+      // If no catalogs could be read, fall back to a single schema exposing the
+      // connection's default namespace.
+      if (schemaMap.isEmpty()) {
+        schemaMap.put("default", new CapitalizingJdbcSchema(ImmutableList.<String> of(), name, source, dialect,
+            convention, null, null));
+      }
+
+      defaultSchema = schemaMap.values().iterator().next();
+    }
+
+    @Override
+    public String getTypeName() {
+      return JdbcStorageConfig.NAME;
+    }
+
+    @Override
+    public Schema getDefaultSchema() {
+      return defaultSchema;
+    }
+
+    @Override
+    public Schema getSubSchema(String name) {
+      return schemaMap.get(name);
+    }
+
+    @Override
+    public Set<String> getSubSchemaNames() {
+      return schemaMap.keySet();
+    }
+
+    @Override
+    public Table getTable(String name) {
+      Schema schema = getDefaultSchema();
+      if (schema != null) {
+        Table t = schema.getTable(name);
+        if (t != null) {
+          return t;
+        }
+        return schema.getTable(name.toUpperCase());
+      } else {
+        return null;
+      }
+    }
+
+    @Override
+    public Set<String> getTableNames() {
+      return defaultSchema.getTableNames();
+    }
+
+  }
+
+  @Override
+  public void registerSchemas(SchemaConfig config, SchemaPlus parent) {
+    JdbcCatalogSchema schema = new JdbcCatalogSchema(name);
+    parent.add(name, schema);
+  }
+
+  @Override
+  public JdbcStorageConfig getConfig() {
+    return config;
+  }
+
+  public DrillbitContext getContext() {
+    return this.context;
+  }
+
+  public String getName() {
+    return this.name;
+  }
+
+  @Override
+  public boolean supportsRead() {
+    return true;
+  }
+
+  public DataSource getSource() {
+    return source;
+  }
+
+  public SqlDialect getDialect() {
+    return dialect;
+  }
+
+  @Override
+  public AbstractGroupScan getPhysicalScan(String userName, JSONOptions selection, List<SchemaPath> columns)
+      throws IOException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public Set<RelOptRule> getOptimizerRules(OptimizerRulesContext context) {
+    return convention.getRules();
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/e12cd470/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcSubScan.java
----------------------------------------------------------------------
diff --git a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcSubScan.java b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcSubScan.java
new file mode 100755
index 0000000..fcafd4c
--- /dev/null
+++ b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcSubScan.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.jdbc;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.logical.StoragePluginConfig;
+import org.apache.drill.exec.physical.base.AbstractSubScan;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+
+@JsonTypeName("jdbc-sub-scan")
+public class JdbcSubScan extends AbstractSubScan {
+
+  private final String sql;
+  private final JdbcStoragePlugin plugin;
+
+  @JsonCreator
+  public JdbcSubScan(
+      @JsonProperty("sql") String sql,
+      @JsonProperty("config") StoragePluginConfig config,
+      @JacksonInject StoragePluginRegistry plugins) throws ExecutionSetupException {
+    super("");
+    this.sql = sql;
+    this.plugin = (JdbcStoragePlugin) plugins.getPlugin(config);
+  }
+
+  JdbcSubScan(String sql, JdbcStoragePlugin plugin) {
+    super("");
+    this.sql = sql;
+    this.plugin = plugin;
+  }
+
+  @Override
+  public int getOperatorType() {
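+    // No dedicated core operator type is registered for the JDBC sub-scan yet.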
+    return -1;
+  }
+
+  public String getSql() {
+    return sql;
+  }
+
+  public StoragePluginConfig getConfig() {
+    return plugin.getConfig();
+  }
+
+  @JsonIgnore
+  public JdbcStoragePlugin getPlugin() {
+    return plugin;
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/drill/blob/e12cd470/contrib/storage-jdbc/src/main/resources/bootstrap-storage-plugins.json
----------------------------------------------------------------------
diff --git a/contrib/storage-jdbc/src/main/resources/bootstrap-storage-plugins.json b/contrib/storage-jdbc/src/main/resources/bootstrap-storage-plugins.json
new file mode 100755
index 0000000..7d88052
--- /dev/null
+++ b/contrib/storage-jdbc/src/main/resources/bootstrap-storage-plugins.json
@@ -0,0 +1,10 @@
+{
+  "storage":{
+    "jdbc" : {
+      type:"jdbc",
+      enabled: false,
+      driver:"org.apache.derby.jdbc.ClientDriver",
+      url:"jdbc:derby://localhost:20000/memory:testDB;"
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/e12cd470/contrib/storage-jdbc/src/main/resources/drill-module.conf
----------------------------------------------------------------------
diff --git a/contrib/storage-jdbc/src/main/resources/drill-module.conf b/contrib/storage-jdbc/src/main/resources/drill-module.conf
new file mode 100755
index 0000000..721a599
--- /dev/null
+++ b/contrib/storage-jdbc/src/main/resources/drill-module.conf
@@ -0,0 +1,18 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+//  This file tells Drill to consider this module when class path scanning.
+//  This file can also include any supplementary configuration information.
+//  This file is in HOCON format, see https://github.com/typesafehub/config/blob/master/HOCON.md for more information.

http://git-wip-us.apache.org/repos/asf/drill/blob/e12cd470/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcPlugin.java
----------------------------------------------------------------------
diff --git a/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcPlugin.java b/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcPlugin.java
new file mode 100644
index 0000000..1f15068
--- /dev/null
+++ b/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcPlugin.java
@@ -0,0 +1,181 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.drill.exec.store.jdbc;
+
+import static org.junit.Assert.assertEquals;
+
+import java.net.InetAddress;
+import java.sql.Connection;
+
+import org.apache.commons.dbcp.BasicDataSource;
+import org.apache.derby.drda.NetworkServerControl;
+import org.apache.drill.BaseTestQuery;
+import org.apache.drill.PlanTestBase;
+import org.apache.drill.exec.proto.UserBitShared.QueryType;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestJdbcPlugin extends PlanTestBase {
+
+  static NetworkServerControl server;
+
+  @BeforeClass
+  public static void setupDefaultTestCluster() throws Exception {
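+    // Start an in-memory Derby instance exposed over the network on port 20000, so the
+    // plugin can connect through the Derby client driver.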
+    System.setProperty("derby.drda.startNetworkServer", "true");
+    server = new NetworkServerControl(InetAddress.getByName("localhost"),
+        20000,
+        "admin",
+        "admin");
+    java.io.PrintWriter consoleWriter = new java.io.PrintWriter(System.out, true);
+    server.start(consoleWriter);
+
+    BasicDataSource source = new BasicDataSource();
+    source.setUrl("jdbc:derby://localhost:20000/memory:testDB;create=true");
+    source.setDriverClassName("org.apache.derby.jdbc.ClientDriver");
+
+    final String insertValues1 = "INSERT INTO person VALUES (1, 'Smith', null, '{number:\"123 Main\"}','mtrx', "
+        + "'xy', 333.333, 444.444, 555.00, TIME('15:09:02'), DATE('1994-02-23'), TIMESTAMP('1962-09-23 03:23:34.234'),"
+        + " 666.66, 1, -1, false)";
+    final String insertValues2 = "INSERT INTO person (PersonId) VALUES (null)";
+    try (Connection c = source.getConnection()) {
+      c.createStatement().execute("CREATE TABLE person\n" +
+          "(\n" +
+          "PersonID int,\n" +
+          "LastName varchar(255),\n" +
+          "FirstName varchar(255),\n" +
+          "Address varchar(255),\n" +
+          "City varchar(255),\n" +
+          "Code char(2),\n" +
+          "dbl double,\n" +
+          "flt float,\n" +
+          "rel real,\n" +
+          "tm time,\n" +
+          "dt date,\n" +
+          "tms timestamp,\n" +
+          "num numeric(10,2), \n" +
+          "sm smallint,\n" +
+          "bi bigint,\n" +
+          "bool boolean\n" +
+
+          ")");
+
+      c.createStatement().execute(insertValues1);
+      c.createStatement().execute(insertValues2);
+      c.createStatement().execute(insertValues1);
+    }
+
+    BaseTestQuery.setupDefaultTestCluster();
+  }
+
+  @AfterClass
+  public static void shutdownDb() throws Exception {
+    server.shutdown();
+  }
+
+  @Test
+  public void validateResult() throws Exception {
+    // Validate all columns except date, time and timestamp; Derby mangles those due to improper timezone support.
+    testBuilder()
+        .sqlQuery(
+            "select PERSONID, LASTNAME, FIRSTNAME, ADDRESS, CITY, CODE, DBL, FLT, REL, NUM, SM, BI, BOOL from testdb.`default`.PERSON")
+        .ordered()
+        .baselineColumns("PERSONID", "LASTNAME", "FIRSTNAME", "ADDRESS", "CITY", "CODE", "DBL", "FLT", "REL",
+            "NUM", "SM", "BI", "BOOL")
+        .baselineValues(1, "Smith", null, "{number:\"123 Main\"}", "mtrx", "xy", 333.333, 444.444, 555.00,
+            666.66, 1, -1L, false)
+        .baselineValues(null, null, null, null, null, null, null, null, null, null, null, null, null)
+        .baselineValues(1, "Smith", null, "{number:\"123 Main\"}", "mtrx", "xy", 333.333, 444.444, 555.00,
+            666.66, 1, -1L, false)
+        .build().run();
+  }
+
+  @Test
+  public void queryDefaultSchema() throws Exception {
+    testNoResult("select * from testdb.PERSON");
+  }
+
+  @Test
+  public void queryDifferentCase() throws Exception {
+    testNoResult("select * from testdb.person");
+  }
+
+  @Test
+  public void pushdownJoin() throws Exception {
+    testNoResult("use testdb");
+    String query = "select x.PersonId from (select PersonId from person)x "
+        + "join (select PersonId from person)y on x.PersonId = y.PersonId ";
+    testPlanMatchingPatterns(query, new String[] {}, new String[] { "Join" });
+  }
+
+  @Test
+  public void pushdownJoinAndFilterPushDown() throws Exception {
+    final String query = "select * from \n" +
+        "testdb.`default`.PERSON e\n" +
+        "INNER JOIN \n" +
+        "testdb.`default`.PERSON s\n" +
+        "ON e.FirstName = s.FirstName\n" +
+        "WHERE e.LastName > 'hello'";
+
+    testPlanMatchingPatterns(query, new String[] {}, new String[] { "Join", "Filter" });
+  }
+
+  @Test
+  public void pushdownAggregation() throws Exception {
+    final String query = "select count(*) from \n" +
+        "testdb.`default`.PERSON";
+
+    testPlanMatchingPatterns(query, new String[] {}, new String[] { "Aggregate" });
+  }
+
+  @Test
+  public void pushdownDoubleJoinAndFilter() throws Exception {
+    final String query = "select * from \n" +
+        "testdb.`default`.PERSON e\n" +
+        "INNER JOIN \n" +
+        "testdb.`default`.PERSON s\n" +
+        "ON e.PersonId = s.PersonId\n" +
+        "INNER JOIN \n" +
+        "testdb.`default`.PERSON ed\n" +
+        "ON e.PersonId = ed.PersonId\n" +
+        "WHERE s.FirstName > 'abc' and ed.FirstName > 'efg'";
+    testPlanMatchingPatterns(query, new String[] {}, new String[] { "Join", "Filter" });
+  }
+
+  @Test
+  public void showTablesDefaultSchema() throws Exception {
+    testNoResult("use testdb");
+    assertEquals(1, testRunAndPrint(QueryType.SQL, "show tables like 'PERSON'"));
+  }
+
+  @Test
+  public void describe() throws Exception {
+    testNoResult("use testdb");
+    assertEquals(16, testRunAndPrint(QueryType.SQL, "describe PERSON"));
+  }
+
+  @Test
+  public void ensureDrillFunctionsAreNotPushedDown() throws Exception {
+    // Verifies that we're not trying to push CONVERT_FROM into the JDBC storage plugin. If we were
+    // pushing this Drill-specific function down, the generated SQL query would fail.
+    testNoResult("select CONVERT_FROM(Address, 'JSON') from testdb.person where PersonId = 1");
+  }
+
+  @Test
+  public void pushdownFilter() throws Exception {
+    testNoResult("use testdb");
+    String query = "select * from person where PersonId = 1";
+    testPlanMatchingPatterns(query, new String[] {}, new String[] { "Filter" });
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/e12cd470/contrib/storage-jdbc/src/test/resources/bootstrap-storage-plugins.json
----------------------------------------------------------------------
diff --git a/contrib/storage-jdbc/src/test/resources/bootstrap-storage-plugins.json b/contrib/storage-jdbc/src/test/resources/bootstrap-storage-plugins.json
new file mode 100755
index 0000000..200ab93
--- /dev/null
+++ b/contrib/storage-jdbc/src/test/resources/bootstrap-storage-plugins.json
@@ -0,0 +1,10 @@
+{
+  "storage":{
+    testdb : {
+      type:"jdbc",
+      enabled: true,
+      driver:"org.apache.derby.jdbc.ClientDriver",
+      url:"jdbc:derby://localhost:20000/memory:testDB;"
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/e12cd470/contrib/storage-jdbc/src/test/resources/logback.xml
----------------------------------------------------------------------
diff --git a/contrib/storage-jdbc/src/test/resources/logback.xml b/contrib/storage-jdbc/src/test/resources/logback.xml
new file mode 100644
index 0000000..5facafe
--- /dev/null
+++ b/contrib/storage-jdbc/src/test/resources/logback.xml
@@ -0,0 +1,48 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<configuration>
+
+  <appender name="SOCKET" class="de.huxhorn.lilith.logback.appender.ClassicMultiplexSocketAppender">
+    <Compressing>true</Compressing>
+    <ReconnectionDelay>10000</ReconnectionDelay>
+    <IncludeCallerData>true</IncludeCallerData>
+    <RemoteHosts>${LILITH_HOSTNAME:-localhost}</RemoteHosts>
+  </appender>
+
+ <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+    <!-- encoders are assigned the type
+         ch.qos.logback.classic.encoder.PatternLayoutEncoder by default -->
+    <encoder>
+      <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
+    </encoder>
+    <level value="warn" />
+  </appender>
+
+  <logger name="org.apache.drill" additivity="false">
+    <level value="debug" />
+    <appender-ref ref="SOCKET" />
+<!--     <appender-ref ref="STDOUT" /> -->
+  </logger>
+
+  <root>
+    <level value="debug" />
+    <appender-ref ref="SOCKET" />
+<!--     <appender-ref ref="STDOUT" /> -->
+  </root>
+
+</configuration>

http://git-wip-us.apache.org/repos/asf/drill/blob/e12cd470/contrib/storage-mpjdbc/pom.xml
----------------------------------------------------------------------
diff --git a/contrib/storage-mpjdbc/pom.xml b/contrib/storage-mpjdbc/pom.xml
deleted file mode 100755
index 5e9afca..0000000
--- a/contrib/storage-mpjdbc/pom.xml
+++ /dev/null
@@ -1,79 +0,0 @@
-<?xml version="1.0"?>
-<!--
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements.  See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License.  You may obtain a copy of the License at
-
-     http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
--->
-<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" xmlns="http://maven.apache.org/POM/4.0.0"
-    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
-  <modelVersion>4.0.0</modelVersion>
-  <parent>
-    <artifactId>drill-contrib-parent</artifactId>
-    <groupId>org.apache.drill.contrib</groupId>
-    <version>0.9.0-SNAPSHOT</version>
-  </parent>
-
-  <artifactId>drill-mpjdbc-storage</artifactId>
-
-  <name>contrib/mpjdbc-storage-plugin</name>
-
-  <dependencies>
-    <dependency>
-      <groupId>org.apache.drill.exec</groupId>
-      <artifactId>drill-java-exec</artifactId>
-      <version>${project.version}</version>
-    </dependency>
-    
-
-    <!-- Test dependencie -->
-    <dependency>
-      <groupId>org.apache.drill.exec</groupId>
-      <artifactId>drill-java-exec</artifactId>
-      <classifier>tests</classifier>
-      <version>${project.version}</version>
-      <scope>test</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.drill</groupId>
-      <artifactId>drill-common</artifactId>
-      <classifier>tests</classifier>
-      <version>${project.version}</version>
-      <scope>test</scope>
-    </dependency>
-    <dependency>
-      <groupId>com.yammer.metrics</groupId>
-      <artifactId>metrics-core</artifactId>
-      <version>2.1.1</version>
-      <scope>test</scope>
-    </dependency>
-  </dependencies>
-
-  <build>
-    <plugins>
-      <plugin>
-        <groupId>org.apache.maven.plugins</groupId>
-        <artifactId>maven-surefire-plugin</artifactId>
-        <configuration>
-          <systemProperties>
-            <property>
-              <name>logback.log.dir</name>
-              <value>${project.build.directory}/surefire-reports</value>
-            </property>
-          </systemProperties>
-        </configuration>
-      </plugin>
-    </plugins>
-  </build>
-  
-</project>

http://git-wip-us.apache.org/repos/asf/drill/blob/e12cd470/contrib/storage-mpjdbc/src/main/java/org/apache/drill/exec/store/mpjdbc/MPJdbcBatchCreator.java
----------------------------------------------------------------------
diff --git a/contrib/storage-mpjdbc/src/main/java/org/apache/drill/exec/store/mpjdbc/MPJdbcBatchCreator.java b/contrib/storage-mpjdbc/src/main/java/org/apache/drill/exec/store/mpjdbc/MPJdbcBatchCreator.java
deleted file mode 100644
index af08b2e..0000000
--- a/contrib/storage-mpjdbc/src/main/java/org/apache/drill/exec/store/mpjdbc/MPJdbcBatchCreator.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store.mpjdbc;
-
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.physical.base.GroupScan;
-import org.apache.drill.exec.physical.impl.BatchCreator;
-import org.apache.drill.exec.physical.impl.ScanBatch;
-//import org.apache.drill.exec.record.CloseableRecordBatch;
-import org.apache.drill.exec.record.RecordBatch;
-import org.apache.drill.exec.store.RecordReader;
-import org.apache.drill.exec.store.mpjdbc.MPJdbcSchemaSubScan;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-
-public class MPJdbcBatchCreator implements BatchCreator<MPJdbcSubScan> {
-  @Override
-  public RecordBatch getBatch(FragmentContext context, MPJdbcSubScan config,
-      List<RecordBatch> children) throws ExecutionSetupException {
-    Preconditions.checkArgument(children.isEmpty());
-    List<RecordReader> readers = Lists.newArrayList();
-    List<SchemaPath> columns = null;
-    try {
-      if ((columns = config.getColumns()) == null) {
-        columns = GroupScan.ALL_COLUMNS;
-      }
-      readers.add(new MPJdbcRecordReader(context,config));
-    } catch (Exception e1) {
-      throw new ExecutionSetupException(e1);
-    }
-    return new ScanBatch(config, context, readers.iterator());
-  }
-}