Posted to commits@drill.apache.org by ad...@apache.org on 2016/09/13 01:32:03 UTC

[16/50] [abbrv] drill git commit: Adding support for Json tables.

Adding support for Json tables.

+ Refactored the code to separate the binary-table-specific code from the common code (a sketch of the resulting split follows below).
+ Added test cases.
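
For orientation, a minimal sketch of the class split this commit introduces. Only the class names and the extends relationship are taken from the diffs below; the comments summarizing each class's role are a reading of the file list and are illustrative, not part of the commit.

    // Common scan logic moves into an abstract base class (MapRDBGroupScan.java
    // below becomes "public abstract class MapRDBGroupScan extends
    // AbstractGroupScan"); each table format gets its own subclass.
    public abstract class MapRDBGroupScan extends AbstractGroupScan {
      // endpoint affinity, fragment-to-endpoint assignment, column projection
    }

    public class BinaryTableGroupScan extends MapRDBGroupScan {
      // binary tables: HBaseScanSpec plus HBase-style filter push-down
    }

    public class JsonTableGroupScan extends MapRDBGroupScan {
      // JSON tables: MapRDBSubScanSpec, rows read by MaprDBJsonRecordReader
    }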


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/f97a3332
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/f97a3332
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/f97a3332

Branch: refs/heads/master
Commit: f97a33321887458996cf7e311252fc9e8719feb3
Parents: e19c048
Author: Aditya <ad...@mapr.com>
Authored: Wed Oct 14 23:49:09 2015 -0700
Committer: Aditya Kishore <ad...@apache.org>
Committed: Fri Sep 9 10:08:31 2016 -0700

----------------------------------------------------------------------
 contrib/format-maprdb/pom.xml                   | 221 ++++++--
 .../store/maprdb/CompareFunctionsProcessor.java | 547 -------------------
 .../exec/store/maprdb/MapRDBFilterBuilder.java  | 356 ------------
 .../exec/store/maprdb/MapRDBFormatPlugin.java   |  51 +-
 .../exec/store/maprdb/MapRDBGroupScan.java      | 270 ++-------
 .../store/maprdb/MapRDBPushFilterIntoScan.java  | 144 -----
 .../store/maprdb/MapRDBScanBatchCreator.java    |  21 +-
 .../drill/exec/store/maprdb/MapRDBSubScan.java  | 177 ++----
 .../exec/store/maprdb/MapRDBSubScanSpec.java    | 113 ++++
 .../exec/store/maprdb/MapRDBTableStats.java     |  11 +-
 .../exec/store/maprdb/TabletFragmentInfo.java   | 108 ++++
 .../maprdb/binary/BinaryTableGroupScan.java     | 216 ++++++++
 .../binary/CompareFunctionsProcessor.java       | 547 +++++++++++++++++++
 .../maprdb/binary/MapRDBFilterBuilder.java      | 356 ++++++++++++
 .../maprdb/binary/MapRDBPushFilterIntoScan.java | 141 +++++
 .../store/maprdb/json/JsonTableGroupScan.java   | 186 +++++++
 .../maprdb/json/MaprDBJsonRecordReader.java     | 386 +++++++++++++
 .../drill/exec/store/maprdb/util/CommonFns.java |  26 +
 .../drill/maprdb/tests/MaprDBTestsSuite.java    | 162 ++++++
 .../tests/binary/TestMapRDBFilterPushDown.java  |  47 ++
 .../maprdb/tests/binary/TestMapRDBSimple.java   |  53 ++
 .../drill/maprdb/tests/json/TestSimpleJson.java |  75 +++
 .../src/test/resources/hbase-site.xml           |  25 +
 .../src/test/resources/json/business.json       |  10 +
 .../src/test/resources/logback.xml              |  12 +-
 25 files changed, 2771 insertions(+), 1490 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/f97a3332/contrib/format-maprdb/pom.xml
----------------------------------------------------------------------
diff --git a/contrib/format-maprdb/pom.xml b/contrib/format-maprdb/pom.xml
index 9e299b5..bcb6c29 100644
--- a/contrib/format-maprdb/pom.xml
+++ b/contrib/format-maprdb/pom.xml
@@ -15,102 +15,223 @@
  See the License for the specific language governing permissions and
  limitations under the License.
 -->
-<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" xmlns="http://maven.apache.org/POM/4.0.0"
+<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
+    xmlns="http://maven.apache.org/POM/4.0.0"
     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
   <modelVersion>4.0.0</modelVersion>
+
   <parent>
-    <artifactId>drill-contrib-parent</artifactId>
-    <groupId>org.apache.drill.contrib</groupId>
-    <version>1.5.0-SNAPSHOT</version>
+    <groupId>com.mapr</groupId>
+    <artifactId>mapr-release</artifactId>
+    <version>5.1.0-mapr-SNAPSHOT</version>
+    <relativePath/>
   </parent>
 
   <artifactId>drill-storage-maprdb</artifactId>
-
   <name>maprdb-storage-plugin</name>
+  <version>1.5.0-SNAPSHOT</version>
 
   <properties>
-    <hbase.TestSuite>**/HBaseTestsSuite.class</hbase.TestSuite>
+    <mapr.version>${project.parent.version}</mapr.version>
+    <drill.version>${project.version}</drill.version>
+    <hbase.version>0.98.12-mapr-1506</hbase.version>
+    <maprdb.TestSuite>**/MaprDBTestsSuite.class</maprdb.TestSuite>
   </properties>
 
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-surefire-plugin</artifactId>
+        <configuration>
+          <includes>
+            <include>${maprdb.TestSuite}</include>
+          </includes>
+          <systemProperties>
+            <property>
+              <name>logback.log.dir</name>
+              <value>${project.build.directory}/surefire-reports</value>
+            </property>
+          </systemProperties>
+        </configuration>
+      </plugin>
+    </plugins>
+  </build>
+
   <dependencies>
     <dependency>
+      <groupId>com.mapr.hadoop</groupId>
+      <artifactId>maprfs</artifactId>
+      <exclusions>
+        <exclusion>
+          <artifactId>commons-logging</artifactId>
+          <groupId>commons-logging</groupId>
+        </exclusion>
+        <exclusion>
+          <artifactId>slf4j-log4j12</artifactId>
+          <groupId>org.slf4j</groupId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>com.mapr.fs</groupId>
+      <artifactId>mapr-hbase</artifactId>
+      <exclusions>
+        <exclusion>
+          <artifactId>slf4j-log4j12</artifactId>
+          <groupId>org.slf4j</groupId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.sun.jersey</groupId>
+          <artifactId>jersey-core</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.sun.jersey</groupId>
+          <artifactId>jersey-server</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.sun.jersey</groupId>
+          <artifactId>jersey-json</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>commons-logging</groupId>
+          <artifactId>commons-logging</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>asm</groupId>
+          <artifactId>asm</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.mortbay.jetty</groupId>
+          <artifactId>servlet-api-2.5</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>com.mapr.db</groupId>
+      <artifactId>maprdb</artifactId>
+      <exclusions>
+        <exclusion>
+          <artifactId>slf4j-log4j12</artifactId>
+          <groupId>org.slf4j</groupId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
       <groupId>org.apache.drill.exec</groupId>
       <artifactId>drill-java-exec</artifactId>
-      <version>${project.version}</version>
+      <version>${drill.version}</version>
+      <exclusions>
+        <exclusion>
+          <artifactId>log4j-over-slf4j</artifactId>
+          <groupId>org.slf4j</groupId>
+        </exclusion>
+      </exclusions>
     </dependency>
     <dependency>
       <groupId>org.apache.drill.contrib</groupId>
       <artifactId>drill-storage-hbase</artifactId>
-      <version>${project.version}</version>
+      <version>${drill.version}</version>
+      <exclusions>
+        <exclusion>
+          <artifactId>log4j-over-slf4j</artifactId>
+          <groupId>org.slf4j</groupId>
+        </exclusion>
+        <exclusion>
+          <artifactId>hbase-client</artifactId>
+          <groupId>org.apache.hbase</groupId>
+        </exclusion>
+      </exclusions>
     </dependency>
+
+    <!-- Test dependencies -->
     <dependency>
-      <groupId>com.mapr.hadoop</groupId>
-      <artifactId>maprfs</artifactId>
+      <groupId>ch.qos.logback</groupId>
+      <artifactId>logback-core</artifactId>
     </dependency>
     <dependency>
-      <groupId>com.mapr.fs</groupId>
-      <artifactId>mapr-hbase</artifactId>
-      <version>4.1.0-mapr</version>
+      <groupId>ch.qos.logback</groupId>
+      <artifactId>logback-classic</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>de.huxhorn.lilith</groupId>
+      <artifactId>de.huxhorn.lilith.logback.appender.multiplex-classic</artifactId>
+      <version>0.9.44</version>
+      <scope>test</scope>
     </dependency>
 
-    <!-- Test dependencies -->
+    <dependency>
+      <groupId>com.mapr</groupId>
+      <artifactId>mapr-java-utils</artifactId>
+      <classifier>tests</classifier>
+      <exclusions>
+        <exclusion>
+          <artifactId>slf4j-log4j12</artifactId>
+          <groupId>org.slf4j</groupId>
+        </exclusion>
+      </exclusions>
+    </dependency>
     <dependency>
       <groupId>org.apache.drill.exec</groupId>
       <artifactId>drill-java-exec</artifactId>
+      <version>${drill.version}</version>
       <classifier>tests</classifier>
-      <version>${project.version}</version>
       <scope>test</scope>
     </dependency>
     <dependency>
       <groupId>org.apache.drill</groupId>
       <artifactId>drill-common</artifactId>
+      <version>${drill.version}</version>
+      <classifier>tests</classifier>
+      <scope>test</scope>
+      <exclusions>
+        <exclusion>
+          <artifactId>log4j-over-slf4j</artifactId>
+          <groupId>org.slf4j</groupId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.drill.contrib</groupId>
+      <artifactId>drill-storage-hbase</artifactId>
+      <version>${drill.version}</version>
       <classifier>tests</classifier>
-      <version>${project.version}</version>
       <scope>test</scope>
     </dependency>
     <dependency>
-      <groupId>com.yammer.metrics</groupId>
-      <artifactId>metrics-core</artifactId>
-      <version>2.1.1</version>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-server</artifactId>
       <scope>test</scope>
+      <classifier>tests</classifier>
+      <exclusions>
+        <exclusion>
+          <artifactId>commons-logging</artifactId>
+          <groupId>commons-logging</groupId>
+        </exclusion>
+        <exclusion>
+          <artifactId>servlet-api-2.5</artifactId>
+          <groupId>org.mortbay.jetty</groupId>
+        </exclusion>
+        <exclusion>
+          <artifactId>servlet-api</artifactId>
+          <groupId>javax.servlet</groupId>
+        </exclusion>
+        <exclusion>
+          <artifactId>slf4j-log4j12</artifactId>
+          <groupId>org.slf4j</groupId>
+        </exclusion>
+      </exclusions>
     </dependency>
+
   </dependencies>
 
   <repositories>
     <repository>
       <id>mapr-releases</id>
-        <url>http://repository.mapr.com/nexus/content/repositories/releases</url>
-      <snapshots>
-        <enabled>true</enabled>
-      </snapshots>
-      <releases>
-        <enabled>true</enabled>
-      </releases>
+      <url>http://repository.mapr.com/nexus/content/repositories/releases</url>
+      <snapshots><enabled>true</enabled></snapshots>
+      <releases><enabled>true</enabled></releases>
     </repository>
   </repositories>
 
-  <build>
-    <plugins>
-      <plugin>
-        <groupId>org.apache.maven.plugins</groupId>
-        <artifactId>maven-surefire-plugin</artifactId>
-        <configuration>
-          <includes>
-            <include>${hbase.TestSuite}</include>
-          </includes>
-          <systemProperties>
-            <property>
-              <name>hbase.test.root</name>
-              <value>${project.build.directory}/data</value>
-            </property>
-            <property>
-              <name>logback.log.dir</name>
-              <value>${project.build.directory}/surefire-reports</value>
-            </property>
-          </systemProperties>
-        </configuration>
-      </plugin>
-    </plugins>
-  </build>
-
 </project>

http://git-wip-us.apache.org/repos/asf/drill/blob/f97a3332/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/CompareFunctionsProcessor.java
----------------------------------------------------------------------
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/CompareFunctionsProcessor.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/CompareFunctionsProcessor.java
deleted file mode 100644
index c6c2504..0000000
--- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/CompareFunctionsProcessor.java
+++ /dev/null
@@ -1,547 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store.maprdb;
-
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.Unpooled;
-
-import java.nio.ByteBuffer;
-import java.nio.ByteOrder;
-
-import org.apache.drill.common.expression.CastExpression;
-import org.apache.drill.common.expression.ConvertExpression;
-import org.apache.drill.common.expression.FunctionCall;
-import org.apache.drill.common.expression.LogicalExpression;
-import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.common.expression.ValueExpressions.BooleanExpression;
-import org.apache.drill.common.expression.ValueExpressions.DateExpression;
-import org.apache.drill.common.expression.ValueExpressions.DoubleExpression;
-import org.apache.drill.common.expression.ValueExpressions.FloatExpression;
-import org.apache.drill.common.expression.ValueExpressions.IntExpression;
-import org.apache.drill.common.expression.ValueExpressions.LongExpression;
-import org.apache.drill.common.expression.ValueExpressions.QuotedString;
-import org.apache.drill.common.expression.ValueExpressions.TimeExpression;
-import org.apache.drill.common.expression.ValueExpressions.TimeStampExpression;
-import org.apache.drill.common.expression.visitors.AbstractExprVisitor;
-import org.apache.hadoop.hbase.util.Order;
-import org.apache.hadoop.hbase.util.PositionedByteRange;
-import org.apache.hadoop.hbase.util.SimplePositionedByteRange;
-
-import org.apache.drill.exec.store.hbase.DrillHBaseConstants;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.filter.Filter;
-import org.apache.hadoop.hbase.filter.PrefixFilter;
-
-import com.google.common.base.Charsets;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableSet;
-
-class CompareFunctionsProcessor extends AbstractExprVisitor<Boolean, LogicalExpression, RuntimeException> {
-  private byte[] value;
-  private boolean success;
-  private boolean isEqualityFn;
-  private SchemaPath path;
-  private String functionName;
-  private boolean sortOrderAscending;
-
-  // Fields for row-key prefix comparison
-  // If the query is on row-key prefix, we cannot use a standard template to identify startRow, stopRow and filter
-  // Hence, we use these local variables(set depending upon the encoding type in user query)
-  private boolean isRowKeyPrefixComparison;
-  byte[] rowKeyPrefixStartRow;
-  byte[] rowKeyPrefixStopRow;
-  Filter rowKeyPrefixFilter;
-
-  public static boolean isCompareFunction(String functionName) {
-    return COMPARE_FUNCTIONS_TRANSPOSE_MAP.keySet().contains(functionName);
-  }
-
-  public static CompareFunctionsProcessor process(FunctionCall call, boolean nullComparatorSupported) {
-    String functionName = call.getName();
-    LogicalExpression nameArg = call.args.get(0);
-    LogicalExpression valueArg = call.args.size() >= 2 ? call.args.get(1) : null;
-    CompareFunctionsProcessor evaluator = new CompareFunctionsProcessor(functionName);
-
-    if (valueArg != null) { // binary function
-      if (VALUE_EXPRESSION_CLASSES.contains(nameArg.getClass())) {
-        LogicalExpression swapArg = valueArg;
-        valueArg = nameArg;
-        nameArg = swapArg;
-        evaluator.functionName = COMPARE_FUNCTIONS_TRANSPOSE_MAP.get(functionName);
-      }
-      evaluator.success = nameArg.accept(evaluator, valueArg);
-    } else if (nullComparatorSupported && call.args.get(0) instanceof SchemaPath) {
-      evaluator.success = true;
-      evaluator.path = (SchemaPath) nameArg;
-    }
-
-    return evaluator;
-  }
-
-  public CompareFunctionsProcessor(String functionName) {
-    this.success = false;
-    this.functionName = functionName;
-    this.isEqualityFn = COMPARE_FUNCTIONS_TRANSPOSE_MAP.containsKey(functionName)
-        && COMPARE_FUNCTIONS_TRANSPOSE_MAP.get(functionName).equals(functionName);
-    this.isRowKeyPrefixComparison = false;
-    this.sortOrderAscending = true;
-  }
-
-  public byte[] getValue() {
-    return value;
-  }
-
-  public boolean isSuccess() {
-    return success;
-  }
-
-  public SchemaPath getPath() {
-    return path;
-  }
-
-  public String getFunctionName() {
-    return functionName;
-  }
-
-  public boolean isRowKeyPrefixComparison() {
-	return isRowKeyPrefixComparison;
-  }
-
-  public byte[] getRowKeyPrefixStartRow() {
-    return rowKeyPrefixStartRow;
-  }
-
-  public byte[] getRowKeyPrefixStopRow() {
-  return rowKeyPrefixStopRow;
-  }
-
-  public Filter getRowKeyPrefixFilter() {
-  return rowKeyPrefixFilter;
-  }
-
-  public boolean isSortOrderAscending() {
-    return sortOrderAscending;
-  }
-
-  @Override
-  public Boolean visitCastExpression(CastExpression e, LogicalExpression valueArg) throws RuntimeException {
-    if (e.getInput() instanceof CastExpression || e.getInput() instanceof SchemaPath) {
-      return e.getInput().accept(this, valueArg);
-    }
-    return false;
-  }
-
-  @Override
-  public Boolean visitConvertExpression(ConvertExpression e, LogicalExpression valueArg) throws RuntimeException {
-    if (e.getConvertFunction() == ConvertExpression.CONVERT_FROM) {
-
-      String encodingType = e.getEncodingType();
-      int prefixLength    = 0;
-
-      // Handle scan pruning in the following scenario:
-      // The row-key is a composite key and the CONVERT_FROM() function has byte_substr() as input function which is
-      // querying for the first few bytes of the row-key(start-offset 1)
-      // Example WHERE clause:
-      // CONVERT_FROM(BYTE_SUBSTR(row_key, 1, 8), 'DATE_EPOCH_BE') < DATE '2015-06-17'
-      if (e.getInput() instanceof FunctionCall) {
-
-        // We can prune scan range only for big-endian encoded data
-        if (encodingType.endsWith("_BE") == false) {
-          return false;
-        }
-
-        FunctionCall call = (FunctionCall)e.getInput();
-        String functionName = call.getName();
-        if (!functionName.equalsIgnoreCase("byte_substr")) {
-          return false;
-        }
-
-        LogicalExpression nameArg = call.args.get(0);
-        LogicalExpression valueArg1 = call.args.size() >= 2 ? call.args.get(1) : null;
-        LogicalExpression valueArg2 = call.args.size() >= 3 ? call.args.get(2) : null;
-
-        if (((nameArg instanceof SchemaPath) == false) ||
-             (valueArg1 == null) || ((valueArg1 instanceof IntExpression) == false) ||
-             (valueArg2 == null) || ((valueArg2 instanceof IntExpression) == false)) {
-          return false;
-        }
-
-        boolean isRowKey = ((SchemaPath)nameArg).getAsUnescapedPath().equals(DrillHBaseConstants.ROW_KEY);
-        int offset = ((IntExpression)valueArg1).getInt();
-
-        if (!isRowKey || (offset != 1)) {
-          return false;
-        }
-
-        this.path    = (SchemaPath)nameArg;
-        prefixLength = ((IntExpression)valueArg2).getInt();
-        this.isRowKeyPrefixComparison = true;
-        return visitRowKeyPrefixConvertExpression(e, prefixLength, valueArg);
-      }
-
-      if (e.getInput() instanceof SchemaPath) {
-        ByteBuf bb = null;
-
-        switch (encodingType) {
-        case "INT_BE":
-        case "INT":
-        case "UINT_BE":
-        case "UINT":
-        case "UINT4_BE":
-        case "UINT4":
-          if (valueArg instanceof IntExpression
-              && (isEqualityFn || encodingType.startsWith("U"))) {
-            bb = newByteBuf(4, encodingType.endsWith("_BE"));
-            bb.writeInt(((IntExpression)valueArg).getInt());
-          }
-          break;
-        case "BIGINT_BE":
-        case "BIGINT":
-        case "UINT8_BE":
-        case "UINT8":
-          if (valueArg instanceof LongExpression
-              && (isEqualityFn || encodingType.startsWith("U"))) {
-            bb = newByteBuf(8, encodingType.endsWith("_BE"));
-            bb.writeLong(((LongExpression)valueArg).getLong());
-          }
-          break;
-        case "FLOAT":
-          if (valueArg instanceof FloatExpression && isEqualityFn) {
-            bb = newByteBuf(4, true);
-            bb.writeFloat(((FloatExpression)valueArg).getFloat());
-          }
-          break;
-        case "DOUBLE":
-          if (valueArg instanceof DoubleExpression && isEqualityFn) {
-            bb = newByteBuf(8, true);
-            bb.writeDouble(((DoubleExpression)valueArg).getDouble());
-          }
-          break;
-        case "TIME_EPOCH":
-        case "TIME_EPOCH_BE":
-          if (valueArg instanceof TimeExpression) {
-            bb = newByteBuf(8, encodingType.endsWith("_BE"));
-            bb.writeLong(((TimeExpression)valueArg).getTime());
-          }
-          break;
-        case "DATE_EPOCH":
-        case "DATE_EPOCH_BE":
-          if (valueArg instanceof DateExpression) {
-            bb = newByteBuf(8, encodingType.endsWith("_BE"));
-            bb.writeLong(((DateExpression)valueArg).getDate());
-          }
-          break;
-        case "BOOLEAN_BYTE":
-          if (valueArg instanceof BooleanExpression) {
-            bb = newByteBuf(1, false /* does not matter */);
-            bb.writeByte(((BooleanExpression)valueArg).getBoolean() ? 1 : 0);
-          }
-          break;
-        case "DOUBLE_OB":
-        case "DOUBLE_OBD":
-          if (valueArg instanceof DoubleExpression) {
-            bb = newByteBuf(9, true);
-            PositionedByteRange br = new SimplePositionedByteRange(bb.array(), 0, 9);
-            if (encodingType.endsWith("_OBD")) {
-              org.apache.hadoop.hbase.util.OrderedBytes.encodeFloat64(br,
-                  ((DoubleExpression)valueArg).getDouble(), Order.DESCENDING);
-              this.sortOrderAscending = false;
-            } else {
-              org.apache.hadoop.hbase.util.OrderedBytes.encodeFloat64(br,
-                  ((DoubleExpression)valueArg).getDouble(), Order.ASCENDING);
-            }
-          }
-          break;
-        case "FLOAT_OB":
-        case "FLOAT_OBD":
-          if (valueArg instanceof FloatExpression) {
-            bb = newByteBuf(5, true);
-            PositionedByteRange br = new SimplePositionedByteRange(bb.array(), 0, 5);
-            if (encodingType.endsWith("_OBD")) {
-              org.apache.hadoop.hbase.util.OrderedBytes.encodeFloat32(br,
-                  ((FloatExpression)valueArg).getFloat(), Order.DESCENDING);
-              this.sortOrderAscending = false;
-            } else {
-              org.apache.hadoop.hbase.util.OrderedBytes.encodeFloat32(br,
-                        ((FloatExpression)valueArg).getFloat(), Order.ASCENDING);
-            }
-          }
-          break;
-        case "BIGINT_OB":
-        case "BIGINT_OBD":
-          if (valueArg instanceof LongExpression) {
-            bb = newByteBuf(9, true);
-            PositionedByteRange br = new SimplePositionedByteRange(bb.array(), 0, 9);
-            if (encodingType.endsWith("_OBD")) {
-              org.apache.hadoop.hbase.util.OrderedBytes.encodeInt64(br,
-                        ((LongExpression)valueArg).getLong(), Order.DESCENDING);
-              this.sortOrderAscending = false;
-            } else {
-              org.apache.hadoop.hbase.util.OrderedBytes.encodeInt64(br,
-                  ((LongExpression)valueArg).getLong(), Order.ASCENDING);
-            }
-          }
-          break;
-        case "INT_OB":
-        case "INT_OBD":
-          if (valueArg instanceof IntExpression) {
-            bb = newByteBuf(5, true);
-            PositionedByteRange br = new SimplePositionedByteRange(bb.array(), 0, 5);
-            if (encodingType.endsWith("_OBD")) {
-              org.apache.hadoop.hbase.util.OrderedBytes.encodeInt32(br,
-                  ((IntExpression)valueArg).getInt(), Order.DESCENDING);
-              this.sortOrderAscending = false;
-            } else {
-              org.apache.hadoop.hbase.util.OrderedBytes.encodeInt32(br,
-                        ((IntExpression)valueArg).getInt(), Order.ASCENDING);
-            }
-          }
-          break;
-        case "UTF8_OB":
-        case "UTF8_OBD":
-          if (valueArg instanceof QuotedString) {
-            int stringLen = ((QuotedString) valueArg).value.getBytes(Charsets.UTF_8).length;
-            bb = newByteBuf(stringLen + 2, true);
-            PositionedByteRange br = new SimplePositionedByteRange(bb.array(), 0, stringLen + 2);
-            if (encodingType.endsWith("_OBD")) {
-              org.apache.hadoop.hbase.util.OrderedBytes.encodeString(br,
-                  ((QuotedString)valueArg).value, Order.DESCENDING);
-              this.sortOrderAscending = false;
-            } else {
-              org.apache.hadoop.hbase.util.OrderedBytes.encodeString(br,
-                        ((QuotedString)valueArg).value, Order.ASCENDING);
-            }
-          }
-          break;
-        case "UTF8":
-        // let visitSchemaPath() handle this.
-          return e.getInput().accept(this, valueArg);
-        }
-
-        if (bb != null) {
-          this.value = bb.array();
-          this.path = (SchemaPath)e.getInput();
-          return true;
-        }
-      }
-    }
-    return false;
-  }
-
-  private Boolean visitRowKeyPrefixConvertExpression(ConvertExpression e,
-    int prefixLength, LogicalExpression valueArg) {
-    String encodingType = e.getEncodingType();
-    rowKeyPrefixStartRow = HConstants.EMPTY_START_ROW;
-    rowKeyPrefixStopRow  = HConstants.EMPTY_START_ROW;
-    rowKeyPrefixFilter   = null;
-
-    if ((encodingType.compareTo("UINT4_BE") == 0) ||
-        (encodingType.compareTo("UINT_BE") == 0)) {
-      if (prefixLength != 4) {
-        throw new RuntimeException("Invalid length(" + prefixLength + ") of row-key prefix");
-      }
-
-      int val;
-      if ((valueArg instanceof IntExpression) == false) {
-        return false;
-      }
-
-      val = ((IntExpression)valueArg).getInt();
-
-      // For TIME_EPOCH_BE/BIGINT_BE encoding, the operators that we push-down are =, <>, <, <=, >, >=
-      switch (functionName) {
-      case "equal":
-        rowKeyPrefixFilter = new PrefixFilter(ByteBuffer.allocate(4).putInt(val).array());
-        rowKeyPrefixStartRow = ByteBuffer.allocate(4).putInt(val).array();
-        rowKeyPrefixStopRow = ByteBuffer.allocate(4).putInt(val + 1).array();
-        return true;
-      case "greater_than_or_equal_to":
-        rowKeyPrefixStartRow = ByteBuffer.allocate(4).putInt(val).array();
-        return true;
-      case "greater_than":
-        rowKeyPrefixStartRow = ByteBuffer.allocate(4).putInt(val + 1).array();
-        return true;
-      case "less_than_or_equal_to":
-        rowKeyPrefixStopRow = ByteBuffer.allocate(4).putInt(val + 1).array();
-        return true;
-      case "less_than":
-        rowKeyPrefixStopRow = ByteBuffer.allocate(4).putInt(val).array();
-        return true;
-      }
-
-      return false;
-    }
-
-    if ((encodingType.compareTo("TIMESTAMP_EPOCH_BE") == 0) ||
-        (encodingType.compareTo("TIME_EPOCH_BE") == 0) ||
-        (encodingType.compareTo("UINT8_BE") == 0)) {
-
-      if (prefixLength != 8) {
-        throw new RuntimeException("Invalid length(" + prefixLength + ") of row-key prefix");
-      }
-
-      long val;
-      if (encodingType.compareTo("TIME_EPOCH_BE") == 0) {
-        if ((valueArg instanceof TimeExpression) == false) {
-          return false;
-        }
-
-        val = ((TimeExpression)valueArg).getTime();
-      } else if (encodingType.compareTo("UINT8_BE") == 0){
-        if ((valueArg instanceof LongExpression) == false) {
-          return false;
-        }
-
-        val = ((LongExpression)valueArg).getLong();
-      } else if (encodingType.compareTo("TIMESTAMP_EPOCH_BE") == 0) {
-        if ((valueArg instanceof TimeStampExpression) == false) {
-          return false;
-        }
-
-        val = ((TimeStampExpression)valueArg).getTimeStamp();
-      } else {
-        // Should not reach here.
-        return false;
-      }
-
-      // For TIME_EPOCH_BE/BIGINT_BE encoding, the operators that we push-down are =, <>, <, <=, >, >=
-      switch (functionName) {
-      case "equal":
-        rowKeyPrefixFilter = new PrefixFilter(ByteBuffer.allocate(8).putLong(val).array());
-        rowKeyPrefixStartRow = ByteBuffer.allocate(8).putLong(val).array();
-        rowKeyPrefixStopRow = ByteBuffer.allocate(8).putLong(val + 1).array();
-        return true;
-      case "greater_than_or_equal_to":
-        rowKeyPrefixStartRow = ByteBuffer.allocate(8).putLong(val).array();
-        return true;
-      case "greater_than":
-        rowKeyPrefixStartRow = ByteBuffer.allocate(8).putLong(val + 1).array();
-        return true;
-      case "less_than_or_equal_to":
-        rowKeyPrefixStopRow = ByteBuffer.allocate(8).putLong(val + 1).array();
-        return true;
-      case "less_than":
-        rowKeyPrefixStopRow = ByteBuffer.allocate(8).putLong(val).array();
-        return true;
-      }
-
-      return false;
-    }
-
-    if (encodingType.compareTo("DATE_EPOCH_BE") == 0) {
-      if ((valueArg instanceof DateExpression) == false) {
-        return false;
-      }
-
-      if (prefixLength != 8) {
-        throw new RuntimeException("Invalid length(" + prefixLength + ") of row-key prefix");
-      }
-
-      final long MILLISECONDS_IN_A_DAY  = (long)1000 * 60 * 60 * 24;
-      long dateToSet;
-      // For DATE encoding, the operators that we push-down are =, <>, <, <=, >, >=
-      switch (functionName) {
-      case "equal":
-        long startDate = ((DateExpression)valueArg).getDate();
-        rowKeyPrefixStartRow = ByteBuffer.allocate(8).putLong(startDate).array();
-        long stopDate  = ((DateExpression)valueArg).getDate() + MILLISECONDS_IN_A_DAY;
-        rowKeyPrefixStopRow = ByteBuffer.allocate(8).putLong(stopDate).array();
-        return true;
-      case "greater_than_or_equal_to":
-        dateToSet = ((DateExpression)valueArg).getDate();
-        rowKeyPrefixStartRow = ByteBuffer.allocate(8).putLong(dateToSet).array();
-        return true;
-      case "greater_than":
-        dateToSet = ((DateExpression)valueArg).getDate() + MILLISECONDS_IN_A_DAY;
-        rowKeyPrefixStartRow = ByteBuffer.allocate(8).putLong(dateToSet).array();
-        return true;
-      case "less_than_or_equal_to":
-        dateToSet = ((DateExpression)valueArg).getDate() + MILLISECONDS_IN_A_DAY;
-        rowKeyPrefixStopRow = ByteBuffer.allocate(8).putLong(dateToSet).array();
-        return true;
-      case "less_than":
-        dateToSet = ((DateExpression)valueArg).getDate();
-        rowKeyPrefixStopRow = ByteBuffer.allocate(8).putLong(dateToSet).array();
-        return true;
-      }
-
-      return false;
-    }
-
-    return false;
-  }
-
-  @Override
-  public Boolean visitUnknown(LogicalExpression e, LogicalExpression valueArg) throws RuntimeException {
-    return false;
-  }
-
-  @Override
-  public Boolean visitSchemaPath(SchemaPath path, LogicalExpression valueArg) throws RuntimeException {
-    if (valueArg instanceof QuotedString) {
-      this.value = ((QuotedString) valueArg).value.getBytes(Charsets.UTF_8);
-      this.path = path;
-      return true;
-    }
-    return false;
-  }
-
-  private static ByteBuf newByteBuf(int size, boolean bigEndian) {
-    return Unpooled.wrappedBuffer(new byte[size])
-        .order(bigEndian ? ByteOrder.BIG_ENDIAN : ByteOrder.LITTLE_ENDIAN)
-        .writerIndex(0);
-  }
-
-  private static final ImmutableSet<Class<? extends LogicalExpression>> VALUE_EXPRESSION_CLASSES;
-  static {
-    ImmutableSet.Builder<Class<? extends LogicalExpression>> builder = ImmutableSet.builder();
-    VALUE_EXPRESSION_CLASSES = builder
-        .add(BooleanExpression.class)
-        .add(DateExpression.class)
-        .add(DoubleExpression.class)
-        .add(FloatExpression.class)
-        .add(IntExpression.class)
-        .add(LongExpression.class)
-        .add(QuotedString.class)
-        .add(TimeExpression.class)
-        .build();
-  }
-
-  private static final ImmutableMap<String, String> COMPARE_FUNCTIONS_TRANSPOSE_MAP;
-  static {
-    ImmutableMap.Builder<String, String> builder = ImmutableMap.builder();
-    COMPARE_FUNCTIONS_TRANSPOSE_MAP = builder
-        // unary functions
-        .put("isnotnull", "isnotnull")
-        .put("isNotNull", "isNotNull")
-        .put("is not null", "is not null")
-        .put("isnull", "isnull")
-        .put("isNull", "isNull")
-        .put("is null", "is null")
-        // binary functions
-        .put("like", "like")
-        .put("equal", "equal")
-        .put("not_equal", "not_equal")
-        .put("greater_than_or_equal_to", "less_than_or_equal_to")
-        .put("greater_than", "less_than")
-        .put("less_than_or_equal_to", "greater_than_or_equal_to")
-        .put("less_than", "greater_than")
-        .build();
-  }
-
-}
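
The processor deleted here is re-added under binary/ (see the file list above). Its core technique is encoding comparison values big-endian so that a comparison on a row-key prefix can be pruned to a scan range. A minimal, self-contained sketch of the start/stop-row arithmetic for the "equal" case on a 4-byte UINT4_BE prefix, mirroring visitRowKeyPrefixConvertExpression() above (the class name here is illustrative):

    import java.nio.ByteBuffer;
    import java.util.Arrays;

    // For "equal" on a 4-byte big-endian unsigned prefix, the scan range is
    // [val, val + 1): the start row is the encoded value itself and the stop
    // row is the encoding of the next integer, exactly as in the code above.
    public class RowKeyPrefixExample {
      public static void main(String[] args) {
        int val = 42;
        byte[] startRow = ByteBuffer.allocate(4).putInt(val).array();
        byte[] stopRow  = ByteBuffer.allocate(4).putInt(val + 1).array();
        System.out.println(Arrays.toString(startRow)); // [0, 0, 0, 42]
        System.out.println(Arrays.toString(stopRow));  // [0, 0, 0, 43]
      }
    }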

http://git-wip-us.apache.org/repos/asf/drill/blob/f97a3332/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/MapRDBFilterBuilder.java
----------------------------------------------------------------------
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/MapRDBFilterBuilder.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/MapRDBFilterBuilder.java
deleted file mode 100644
index 857d799..0000000
--- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/MapRDBFilterBuilder.java
+++ /dev/null
@@ -1,356 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store.maprdb;
-
-import java.util.Arrays;
-
-import org.apache.drill.common.expression.BooleanOperator;
-import org.apache.drill.common.expression.FunctionCall;
-import org.apache.drill.common.expression.LogicalExpression;
-import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.common.expression.visitors.AbstractExprVisitor;
-import org.apache.drill.exec.store.hbase.DrillHBaseConstants;
-import org.apache.drill.exec.store.hbase.HBaseRegexParser;
-import org.apache.drill.exec.store.hbase.HBaseScanSpec;
-import org.apache.drill.exec.store.hbase.HBaseUtils;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.filter.BinaryComparator;
-import org.apache.hadoop.hbase.filter.ByteArrayComparable;
-import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
-import org.apache.hadoop.hbase.filter.Filter;
-import org.apache.hadoop.hbase.filter.NullComparator;
-import org.apache.hadoop.hbase.filter.PrefixFilter;
-import org.apache.hadoop.hbase.filter.RegexStringComparator;
-import org.apache.hadoop.hbase.filter.RowFilter;
-import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
-
-import com.google.common.base.Charsets;
-import com.google.common.collect.ImmutableList;
-
-public class MapRDBFilterBuilder extends AbstractExprVisitor<HBaseScanSpec, Void, RuntimeException> implements DrillHBaseConstants {
-
-  final private MapRDBGroupScan groupScan;
-
-  final private LogicalExpression le;
-
-  private boolean allExpressionsConverted = true;
-
-  private static Boolean nullComparatorSupported = null;
-
-  MapRDBFilterBuilder(MapRDBGroupScan groupScan, LogicalExpression le) {
-    this.groupScan = groupScan;
-    this.le = le;
-  }
-
-  public HBaseScanSpec parseTree() {
-    HBaseScanSpec parsedSpec = le.accept(this, null);
-    if (parsedSpec != null) {
-      parsedSpec = mergeScanSpecs("booleanAnd", this.groupScan.getHBaseScanSpec(), parsedSpec);
-      /*
-       * If RowFilter is THE filter attached to the scan specification,
-       * remove it since its effect is also achieved through startRow and stopRow.
-       */
-      Filter filter = parsedSpec.getFilter();
-      if (filter instanceof RowFilter &&
-          ((RowFilter)filter).getOperator() != CompareOp.NOT_EQUAL &&
-          ((RowFilter)filter).getComparator() instanceof BinaryComparator) {
-    	  filter = null;
-      }
-    }
-    return parsedSpec;
-  }
-
-  public boolean isAllExpressionsConverted() {
-    return allExpressionsConverted;
-  }
-
-  @Override
-  public HBaseScanSpec visitUnknown(LogicalExpression e, Void value) throws RuntimeException {
-    allExpressionsConverted = false;
-    return null;
-  }
-
-  @Override
-  public HBaseScanSpec visitBooleanOperator(BooleanOperator op, Void value) throws RuntimeException {
-    return visitFunctionCall(op, value);
-  }
-
-  @Override
-  public HBaseScanSpec visitFunctionCall(FunctionCall call, Void value) throws RuntimeException {
-    HBaseScanSpec nodeScanSpec = null;
-    String functionName = call.getName();
-    ImmutableList<LogicalExpression> args = call.args;
-
-    if (CompareFunctionsProcessor.isCompareFunction(functionName)) {
-      /*
-       * HBASE-10848: Bug in HBase versions (0.94.[0-18], 0.96.[0-2], 0.98.[0-1])
-       * causes a filter with NullComparator to fail. Enable only if specified in
-       * the configuration (after ensuring that the HBase cluster has the fix).
-       */
-      if (nullComparatorSupported == null) {
-        nullComparatorSupported = groupScan.getHBaseConf().getBoolean("drill.hbase.supports.null.comparator", false);
-      }
-
-      CompareFunctionsProcessor processor = CompareFunctionsProcessor.process(call, nullComparatorSupported);
-      if (processor.isSuccess()) {
-        nodeScanSpec = createHBaseScanSpec(call, processor);
-      }
-    } else {
-      switch (functionName) {
-      case "booleanAnd":
-      case "booleanOr":
-        HBaseScanSpec firstScanSpec = args.get(0).accept(this, null);
-        for (int i = 1; i < args.size(); ++i) {
-          HBaseScanSpec nextScanSpec = args.get(i).accept(this, null);
-          if (firstScanSpec != null && nextScanSpec != null) {
-            nodeScanSpec = mergeScanSpecs(functionName, firstScanSpec, nextScanSpec);
-          } else {
-            allExpressionsConverted = false;
-            if ("booleanAnd".equals(functionName)) {
-              nodeScanSpec = firstScanSpec == null ? nextScanSpec : firstScanSpec;
-            }
-          }
-          firstScanSpec = nodeScanSpec;
-        }
-        break;
-      }
-    }
-
-    if (nodeScanSpec == null) {
-      allExpressionsConverted = false;
-    }
-
-    return nodeScanSpec;
-  }
-
-  private HBaseScanSpec mergeScanSpecs(String functionName, HBaseScanSpec leftScanSpec, HBaseScanSpec rightScanSpec) {
-    Filter newFilter = null;
-    byte[] startRow = HConstants.EMPTY_START_ROW;
-    byte[] stopRow = HConstants.EMPTY_END_ROW;
-
-    switch (functionName) {
-    case "booleanAnd":
-      newFilter = HBaseUtils.andFilterAtIndex(leftScanSpec.getFilter(), -1, rightScanSpec.getFilter()); //HBaseUtils.LAST_FILTER
-      startRow = HBaseUtils.maxOfStartRows(leftScanSpec.getStartRow(), rightScanSpec.getStartRow());
-      stopRow = HBaseUtils.minOfStopRows(leftScanSpec.getStopRow(), rightScanSpec.getStopRow());
-      break;
-    case "booleanOr":
-      newFilter = HBaseUtils.orFilterAtIndex(leftScanSpec.getFilter(), -1, rightScanSpec.getFilter()); //HBaseUtils.LAST_FILTER
-      startRow = HBaseUtils.minOfStartRows(leftScanSpec.getStartRow(), rightScanSpec.getStartRow());
-      stopRow = HBaseUtils.maxOfStopRows(leftScanSpec.getStopRow(), rightScanSpec.getStopRow());
-    }
-    return new HBaseScanSpec(groupScan.getTableName(), startRow, stopRow, newFilter);
-  }
-
-  private HBaseScanSpec createHBaseScanSpec(FunctionCall call, CompareFunctionsProcessor processor) {
-    String functionName = processor.getFunctionName();
-    SchemaPath field = processor.getPath();
-    byte[] fieldValue = processor.getValue();
-    boolean sortOrderAscending = processor.isSortOrderAscending();
-    boolean isRowKey = field.getAsUnescapedPath().equals(ROW_KEY);
-    if (!(isRowKey
-        || (!field.getRootSegment().isLastPath()
-            && field.getRootSegment().getChild().isLastPath()
-            && field.getRootSegment().getChild().isNamed())
-           )
-        ) {
-      /*
-       * if the field in this function is neither the row_key nor a qualified HBase column, return.
-       */
-      return null;
-    }
-
-    if (processor.isRowKeyPrefixComparison()) {
-      return createRowKeyPrefixScanSpec(call, processor);
-    }
-
-    CompareOp compareOp = null;
-    boolean isNullTest = false;
-    ByteArrayComparable comparator = new BinaryComparator(fieldValue);
-    byte[] startRow = HConstants.EMPTY_START_ROW;
-    byte[] stopRow = HConstants.EMPTY_END_ROW;
-    switch (functionName) {
-    case "equal":
-      compareOp = CompareOp.EQUAL;
-      if (isRowKey) {
-        startRow = fieldValue;
-        /* stopRow should be just greater than 'value'*/
-        stopRow = Arrays.copyOf(fieldValue, fieldValue.length+1);
-        compareOp = CompareOp.EQUAL;
-      }
-      break;
-    case "not_equal":
-      compareOp = CompareOp.NOT_EQUAL;
-      break;
-    case "greater_than_or_equal_to":
-      if (sortOrderAscending) {
-        compareOp = CompareOp.GREATER_OR_EQUAL;
-        if (isRowKey) {
-          startRow = fieldValue;
-        }
-      } else {
-        compareOp = CompareOp.LESS_OR_EQUAL;
-        if (isRowKey) {
-          // stopRow should be just greater than 'value'
-          stopRow = Arrays.copyOf(fieldValue, fieldValue.length+1);
-        }
-      }
-      break;
-    case "greater_than":
-      if (sortOrderAscending) {
-        compareOp = CompareOp.GREATER;
-        if (isRowKey) {
-          // startRow should be just greater than 'value'
-          startRow = Arrays.copyOf(fieldValue, fieldValue.length+1);
-        }
-      } else {
-        compareOp = CompareOp.LESS;
-        if (isRowKey) {
-          stopRow = fieldValue;
-        }
-      }
-      break;
-    case "less_than_or_equal_to":
-      if (sortOrderAscending) {
-        compareOp = CompareOp.LESS_OR_EQUAL;
-        if (isRowKey) {
-          // stopRow should be just greater than 'value'
-          stopRow = Arrays.copyOf(fieldValue, fieldValue.length+1);
-        }
-      } else {
-        compareOp = CompareOp.GREATER_OR_EQUAL;
-        if (isRowKey) {
-          startRow = fieldValue;
-        }
-      }
-      break;
-    case "less_than":
-      if (sortOrderAscending) {
-        compareOp = CompareOp.LESS;
-        if (isRowKey) {
-          stopRow = fieldValue;
-        }
-      } else {
-        compareOp = CompareOp.GREATER;
-        if (isRowKey) {
-          // startRow should be just greater than 'value'
-          startRow = Arrays.copyOf(fieldValue, fieldValue.length+1);
-        }
-      }
-      break;
-    case "isnull":
-    case "isNull":
-    case "is null":
-      if (isRowKey) {
-        return null;
-      }
-      isNullTest = true;
-      compareOp = CompareOp.EQUAL;
-      comparator = new NullComparator();
-      break;
-    case "isnotnull":
-    case "isNotNull":
-    case "is not null":
-      if (isRowKey) {
-        return null;
-      }
-      compareOp = CompareOp.NOT_EQUAL;
-      comparator = new NullComparator();
-      break;
-    case "like":
-      /*
-       * Convert the LIKE operand to Regular Expression pattern so that we can
-       * apply RegexStringComparator()
-       */
-      HBaseRegexParser parser = new HBaseRegexParser(call).parse();
-      compareOp = CompareOp.EQUAL;
-      comparator = new RegexStringComparator(parser.getRegexString());
-
-      /*
-       * We can possibly do better if the LIKE operator is on the row_key
-       */
-      if (isRowKey) {
-        String prefix = parser.getPrefixString();
-        if (prefix != null) { // group 3 is literal
-          /*
-           * If there is a literal prefix, it can help us prune the scan to a sub range
-           */
-          if (prefix.equals(parser.getLikeString())) {
-            /* The operand value is literal. This turns the LIKE operator to EQUAL operator */
-            startRow = stopRow = fieldValue;
-            compareOp = null;
-          } else {
-            startRow = prefix.getBytes(Charsets.UTF_8);
-            stopRow = startRow.clone();
-            boolean isMaxVal = true;
-            for (int i = stopRow.length - 1; i >= 0 ; --i) {
-              int nextByteValue = (0xff & stopRow[i]) + 1;
-              if (nextByteValue < 0xff) {
-                stopRow[i] = (byte) nextByteValue;
-                isMaxVal = false;
-                break;
-              } else {
-                stopRow[i] = 0;
-              }
-            }
-            if (isMaxVal) {
-              stopRow = HConstants.EMPTY_END_ROW;
-            }
-          }
-        }
-      }
-      break;
-    }
-
-    if (compareOp != null || startRow != HConstants.EMPTY_START_ROW || stopRow != HConstants.EMPTY_END_ROW) {
-      Filter filter = null;
-      if (isRowKey) {
-        if (compareOp != null) {
-          filter = new RowFilter(compareOp, comparator);
-        }
-      } else {
-        byte[] family = HBaseUtils.getBytes(field.getRootSegment().getPath());
-        byte[] qualifier = HBaseUtils.getBytes(field.getRootSegment().getChild().getNameSegment().getPath());
-        filter = new SingleColumnValueFilter(family, qualifier, compareOp, comparator);
-        ((SingleColumnValueFilter)filter).setLatestVersionOnly(true);
-        if (!isNullTest) {
-          ((SingleColumnValueFilter)filter).setFilterIfMissing(true);
-        }
-      }
-      return new HBaseScanSpec(groupScan.getTableName(), startRow, stopRow, filter);
-    }
-    // else
-    return null;
-  }
-
-  private HBaseScanSpec createRowKeyPrefixScanSpec(FunctionCall call,
-      CompareFunctionsProcessor processor) {
-    byte[] startRow = processor.getRowKeyPrefixStartRow();
-    byte[] stopRow  = processor.getRowKeyPrefixStopRow();
-    Filter filter   = processor.getRowKeyPrefixFilter();
-
-    if (startRow != HConstants.EMPTY_START_ROW ||
-      stopRow != HConstants.EMPTY_END_ROW ||
-      filter != null) {
-      return new HBaseScanSpec(groupScan.getTableName(), startRow, stopRow, filter);
-    }
-
-    // else
-    return null;
-  }
-}
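
The LIKE handling above turns a literal prefix into the half-open row range [prefix, successor(prefix)), where the successor is computed by incrementing the last byte that can be incremented and zeroing everything after it; if no byte can be incremented, the stop row falls back to the empty end row (scan to the end of the table). A minimal standalone sketch of that successor computation (class and method names are illustrative):

    import java.nio.charset.StandardCharsets;

    public class PrefixSuccessorExample {
      // Mirrors the stopRow loop in createHBaseScanSpec() above. Note that,
      // like the original, this treats a 0xfe byte as un-incrementable
      // (0xfe + 1 == 0xff fails the "< 0xff" test), so the computed stop row
      // never contains a 0xff byte.
      static byte[] successor(byte[] prefix) {
        byte[] stopRow = prefix.clone();
        for (int i = stopRow.length - 1; i >= 0; --i) {
          int next = (0xff & stopRow[i]) + 1;
          if (next < 0xff) {
            stopRow[i] = (byte) next;
            return stopRow;
          }
          stopRow[i] = 0; // overflow: zero this byte and carry left
        }
        return null; // stands in for HConstants.EMPTY_END_ROW
      }

      public static void main(String[] args) {
        byte[] stop = successor("ab".getBytes(StandardCharsets.UTF_8));
        System.out.println(new String(stop, StandardCharsets.UTF_8)); // "ac"
      }
    }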

http://git-wip-us.apache.org/repos/asf/drill/blob/f97a3332/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/MapRDBFormatPlugin.java
----------------------------------------------------------------------
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/MapRDBFormatPlugin.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/MapRDBFormatPlugin.java
index aa91901..d22434d 100644
--- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/MapRDBFormatPlugin.java
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/MapRDBFormatPlugin.java
@@ -17,7 +17,11 @@
  */
 package org.apache.drill.exec.store.maprdb;
 
+import static com.mapr.fs.jni.MapRConstants.MAPRFS_PREFIX;
+
 import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
 import java.util.List;
 import java.util.Set;
 
@@ -31,26 +35,36 @@ import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.server.DrillbitContext;
 import org.apache.drill.exec.store.StoragePluginOptimizerRule;
 import org.apache.drill.exec.store.dfs.FileSelection;
+import org.apache.drill.exec.store.dfs.FileSystemConfig;
 import org.apache.drill.exec.store.dfs.FileSystemPlugin;
 import org.apache.drill.exec.store.dfs.FormatMatcher;
 import org.apache.drill.exec.store.dfs.FormatPlugin;
 import org.apache.drill.exec.store.hbase.HBaseScanSpec;
+import org.apache.drill.exec.store.maprdb.binary.BinaryTableGroupScan;
+import org.apache.drill.exec.store.maprdb.binary.MapRDBPushFilterIntoScan;
+import org.apache.drill.exec.store.maprdb.json.JsonTableGroupScan;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
 
 import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.google.common.collect.ImmutableSet;
+import com.mapr.fs.MapRFileSystem;
+import com.mapr.fs.tables.TableProperties;
 
 public class MapRDBFormatPlugin implements FormatPlugin {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory
       .getLogger(MapRDBFormatPlugin.class);
 
-  private final StoragePluginConfig storageConfig;
+  private final FileSystemConfig storageConfig;
   private final MapRDBFormatPluginConfig config;
   private final MapRDBFormatMatcher matcher;
   private final Configuration fsConf;
   private final DrillbitContext context;
   private final String name;
 
+  private volatile FileSystemPlugin storagePlugin;
+  private volatile MapRFileSystem maprfs;
+
   public MapRDBFormatPlugin(String name, DrillbitContext context, Configuration fsConf,
       StoragePluginConfig storageConfig) {
     this(name, context, fsConf, storageConfig, new MapRDBFormatPluginConfig());
@@ -61,9 +75,15 @@ public class MapRDBFormatPlugin implements FormatPlugin {
     this.context = context;
     this.config = formatConfig;
     this.matcher = new MapRDBFormatMatcher(this);
-    this.storageConfig = storageConfig;
+    this.storageConfig = (FileSystemConfig) storageConfig;
     this.fsConf = fsConf;
     this.name = name == null ? "maprdb" : name;
+    try {
+      this.maprfs = new MapRFileSystem();
+      maprfs.initialize(new URI(MAPRFS_PREFIX), fsConf);
+    } catch (IOException | URISyntaxException e) {
+      throw new RuntimeException(e);
+    }
   }
 
   @Override
@@ -108,14 +128,14 @@ public class MapRDBFormatPlugin implements FormatPlugin {
     List<String> files = selection.getFiles();
     assert (files.size() == 1);
     String tableName = files.get(0);
-    HBaseScanSpec scanSpec = new HBaseScanSpec(tableName);
-    try {
-      return new MapRDBGroupScan(userName,
-          (FileSystemPlugin) (context.getStorage().getPlugin(storageConfig)), this, scanSpec,
-          columns);
-    } catch (ExecutionSetupException e) {
-      e.printStackTrace();
-      return null;
+    TableProperties props = maprfs.getTableProperties(new Path(tableName));
+
+    if (props.getAttr().getJson()) {
+      MapRDBSubScanSpec scanSpec = new MapRDBSubScanSpec().setTableName(tableName);
+      return new JsonTableGroupScan(userName, getStoragePlugin(), this, scanSpec, columns);
+    } else {
+      HBaseScanSpec scanSpec = new HBaseScanSpec(tableName);
+      return new BinaryTableGroupScan(userName, getStoragePlugin(), this, scanSpec, columns);
     }
   }
 
@@ -139,4 +159,15 @@ public class MapRDBFormatPlugin implements FormatPlugin {
     return name;
   }
 
+  public synchronized FileSystemPlugin getStoragePlugin() {
+    if (this.storagePlugin == null) {
+      try {
+        this.storagePlugin = (FileSystemPlugin) (context.getStorage().getPlugin(storageConfig));
+      } catch (ExecutionSetupException e) {
+        throw new RuntimeException(e);
+      }
+    }
+    return storagePlugin;
+  }
+
 }
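
With this change the JSON-versus-binary decision happens once, at planning time, by probing the table's MapR-FS properties. Distilled from getGroupScan() above and stripped of the plugin scaffolding (the MapR calls are the ones in the diff; the surrounding method shape is simplified for illustration):

    // Route to the right group scan based on the table's stored attributes.
    AbstractGroupScan groupScanFor(String userName, String tableName,
        List<SchemaPath> columns) throws IOException {
      TableProperties props = maprfs.getTableProperties(new Path(tableName));
      if (props.getAttr().getJson()) {
        return new JsonTableGroupScan(userName, getStoragePlugin(), this,
            new MapRDBSubScanSpec().setTableName(tableName), columns);
      }
      return new BinaryTableGroupScan(userName, getStoragePlugin(), this,
          new HBaseScanSpec(tableName), columns);
    }

Probing the table metadata once here, rather than in each fragment, keeps the rest of the scan pipeline oblivious to the table format.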

http://git-wip-us.apache.org/repos/asf/drill/blob/f97a3332/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/MapRDBGroupScan.java
----------------------------------------------------------------------
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/MapRDBGroupScan.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/MapRDBGroupScan.java
index 9358753..cbfb18c 100644
--- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/MapRDBGroupScan.java
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/MapRDBGroupScan.java
@@ -17,7 +17,6 @@
  */
 package org.apache.drill.exec.store.maprdb;
 
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
@@ -30,163 +29,64 @@ import java.util.NavigableMap;
 import java.util.PriorityQueue;
 import java.util.Queue;
 import java.util.Set;
-import java.util.TreeMap;
 import java.util.concurrent.TimeUnit;
 
-import org.apache.drill.common.exceptions.DrillRuntimeException;
-import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.exec.physical.EndpointAffinity;
 import org.apache.drill.exec.physical.base.AbstractGroupScan;
-import org.apache.drill.exec.physical.base.GroupScan;
-import org.apache.drill.exec.physical.base.PhysicalOperator;
-import org.apache.drill.exec.physical.base.ScanStats;
-import org.apache.drill.exec.physical.base.ScanStats.GroupScanProperty;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
-import org.apache.drill.exec.store.StoragePluginRegistry;
 import org.apache.drill.exec.store.dfs.FileSystemConfig;
 import org.apache.drill.exec.store.dfs.FileSystemPlugin;
-import org.apache.drill.exec.store.hbase.DrillHBaseConstants;
-import org.apache.drill.exec.store.hbase.HBaseScanSpec;
-import org.apache.drill.exec.store.hbase.HBaseSubScan.HBaseSubScanSpec;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.client.HTable;
-import org.codehaus.jackson.annotate.JsonCreator;
-
-import com.fasterxml.jackson.annotation.JacksonInject;
+
 import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.annotation.JsonTypeName;
-import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Stopwatch;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 
-@JsonTypeName("maprdb-scan")
-public class MapRDBGroupScan extends AbstractGroupScan implements DrillHBaseConstants {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MapRDBGroupScan.class);
-
-  private static final Comparator<List<HBaseSubScanSpec>> LIST_SIZE_COMPARATOR = new Comparator<List<HBaseSubScanSpec>>() {
-    @Override
-    public int compare(List<HBaseSubScanSpec> list1, List<HBaseSubScanSpec> list2) {
-      return list1.size() - list2.size();
-    }
-  };
-
-  private static final Comparator<List<HBaseSubScanSpec>> LIST_SIZE_COMPARATOR_REV = Collections.reverseOrder(LIST_SIZE_COMPARATOR);
-
-  private List<SchemaPath> columns;
-
-  private HBaseScanSpec hbaseScanSpec;
+public abstract class MapRDBGroupScan extends AbstractGroupScan {
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MapRDBGroupScan.class);
 
   private FileSystemPlugin storagePlugin;
 
   private MapRDBFormatPlugin formatPlugin;
 
-  private Stopwatch watch = Stopwatch.createUnstarted();
+  protected List<SchemaPath> columns;
 
-  private Map<Integer, List<HBaseSubScanSpec>> endpointFragmentMapping;
+  protected Map<Integer, List<MapRDBSubScanSpec>> endpointFragmentMapping;
 
-  private NavigableMap<HRegionInfo, ServerName> regionsToScan;
-
-  private HTableDescriptor hTableDesc;
+  protected NavigableMap<TabletFragmentInfo, String> regionsToScan;
 
   private boolean filterPushedDown = false;
 
-  private MapRDBTableStats tableStats;
-
-  @JsonCreator
-  public MapRDBGroupScan(@JsonProperty("userName") final String userName,
-                        @JsonProperty("hbaseScanSpec") HBaseScanSpec hbaseScanSpec,
-                        @JsonProperty("storage") FileSystemConfig storagePluginConfig,
-                        @JsonProperty("format") MapRDBFormatPluginConfig formatPluginConfig,
-                        @JsonProperty("columns") List<SchemaPath> columns,
-                        @JacksonInject StoragePluginRegistry pluginRegistry) throws IOException, ExecutionSetupException {
-    this (userName, (FileSystemPlugin) pluginRegistry.getPlugin(storagePluginConfig),
-            (MapRDBFormatPlugin) pluginRegistry.getFormatPlugin(storagePluginConfig, formatPluginConfig),
-            hbaseScanSpec,
-            columns);
-  }
+  private Stopwatch watch = Stopwatch.createUnstarted();
 
-  public MapRDBGroupScan(String userName, FileSystemPlugin storagePlugin, MapRDBFormatPlugin formatPlugin, HBaseScanSpec scanSpec, List<SchemaPath> columns) {
-    super(userName);
-    this.storagePlugin = storagePlugin;
-    this.formatPlugin = formatPlugin;
-    this.hbaseScanSpec = scanSpec;
-    this.columns = columns;
-    init();
-  }
+  private static final Comparator<List<MapRDBSubScanSpec>> LIST_SIZE_COMPARATOR = new Comparator<List<MapRDBSubScanSpec>>() {
+    @Override
+    public int compare(List<MapRDBSubScanSpec> list1, List<MapRDBSubScanSpec> list2) {
+      return list1.size() - list2.size();
+    }
+  };
 
-  /**
-   * Private constructor, used for cloning.
-   * @param that The HBaseGroupScan to clone
-   */
-  private MapRDBGroupScan(MapRDBGroupScan that) {
+  private static final Comparator<List<MapRDBSubScanSpec>> LIST_SIZE_COMPARATOR_REV = Collections.reverseOrder(LIST_SIZE_COMPARATOR);
+
+  public MapRDBGroupScan(MapRDBGroupScan that) {
     super(that);
     this.columns = that.columns;
-    this.hbaseScanSpec = that.hbaseScanSpec;
-    this.endpointFragmentMapping = that.endpointFragmentMapping;
-    this.regionsToScan = that.regionsToScan;
-    this.storagePlugin = that.storagePlugin;
     this.formatPlugin = that.formatPlugin;
-    this.hTableDesc = that.hTableDesc;
+    this.storagePlugin = that.storagePlugin;
+    this.regionsToScan = that.regionsToScan;
     this.filterPushedDown = that.filterPushedDown;
-    this.tableStats = that.tableStats;
-  }
-
-  @Override
-  public GroupScan clone(List<SchemaPath> columns) {
-    MapRDBGroupScan newScan = new MapRDBGroupScan(this);
-    newScan.columns = columns;
-    newScan.verifyColumns();
-    return newScan;
-  }
-
-  private void init() {
-    logger.debug("Getting region locations");
-    try {
-      HTable table = new HTable(HBaseConfiguration.create(), hbaseScanSpec.getTableName());
-      tableStats = new MapRDBTableStats(table);
-      this.hTableDesc = table.getTableDescriptor();
-      NavigableMap<HRegionInfo, ServerName> regionsMap = table.getRegionLocations();
-      table.close();
-
-      boolean foundStartRegion = false;
-      regionsToScan = new TreeMap<HRegionInfo, ServerName>();
-      for (Entry<HRegionInfo, ServerName> mapEntry : regionsMap.entrySet()) {
-        HRegionInfo regionInfo = mapEntry.getKey();
-        if (!foundStartRegion && hbaseScanSpec.getStartRow() != null && hbaseScanSpec.getStartRow().length != 0 && !regionInfo.containsRow(hbaseScanSpec.getStartRow())) {
-          continue;
-        }
-        foundStartRegion = true;
-        regionsToScan.put(regionInfo, mapEntry.getValue());
-        if (hbaseScanSpec.getStopRow() != null && hbaseScanSpec.getStopRow().length != 0 && regionInfo.containsRow(hbaseScanSpec.getStopRow())) {
-          break;
-        }
-      }
-    } catch (Exception e) {
-      throw new DrillRuntimeException("Error getting region info for table: " + hbaseScanSpec.getTableName(), e);
-    }
-    verifyColumns();
   }
 
-  private void verifyColumns() {
-    /*
-    if (columns != null) {
-      for (SchemaPath column : columns) {
-        if (!(column.equals(ROW_KEY_PATH) || hTableDesc.hasFamily(HBaseUtils.getBytes(column.getRootSegment().getPath())))) {
-          DrillRuntimeException.format("The column family '%s' does not exist in HBase table: %s .",
-              column.getRootSegment().getPath(), hTableDesc.getNameAsString());
-        }
-      }
-    }
-    */
+  public MapRDBGroupScan(FileSystemPlugin storagePlugin,
+      MapRDBFormatPlugin formatPlugin, List<SchemaPath> columns, String userName) {
+    super(userName);
+    this.storagePlugin = storagePlugin;
+    this.formatPlugin = formatPlugin;
+    this.columns = columns;
   }
 
   @Override
@@ -199,8 +99,8 @@ public class MapRDBGroupScan extends AbstractGroupScan implements DrillHBaseCons
     }
 
     Map<DrillbitEndpoint, EndpointAffinity> affinityMap = new HashMap<DrillbitEndpoint, EndpointAffinity>();
-    for (ServerName sn : regionsToScan.values()) {
-      DrillbitEndpoint ep = endpointMap.get(sn.getHostname());
+    for (String serverName : regionsToScan.values()) {
+      DrillbitEndpoint ep = endpointMap.get(serverName);
       if (ep != null) {
         EndpointAffinity affinity = affinityMap.get(ep);
         if (affinity == null) {
@@ -247,7 +147,7 @@ public class MapRDBGroupScan extends AbstractGroupScan implements DrillHBaseCons
      * Initialize these two maps
      */
     for (int i = 0; i < numSlots; ++i) {
-      endpointFragmentMapping.put(i, new ArrayList<HBaseSubScanSpec>(maxPerEndpointSlot));
+      endpointFragmentMapping.put(i, new ArrayList<MapRDBSubScanSpec>(maxPerEndpointSlot));
       String hostname = incomingEndpoints.get(i).getAddress();
       Queue<Integer> hostIndexQueue = endpointHostIndexListMap.get(hostname);
       if (hostIndexQueue == null) {
@@ -257,21 +157,21 @@ public class MapRDBGroupScan extends AbstractGroupScan implements DrillHBaseCons
       hostIndexQueue.add(i);
     }
 
-    Set<Entry<HRegionInfo, ServerName>> regionsToAssignSet = Sets.newHashSet(regionsToScan.entrySet());
+    Set<Entry<TabletFragmentInfo, String>> regionsToAssignSet = Sets.newHashSet(regionsToScan.entrySet());
 
     /*
      * First, we assign regions which are hosted on region servers running on drillbit endpoints
      */
-    for (Iterator<Entry<HRegionInfo, ServerName>> regionsIterator = regionsToAssignSet.iterator(); regionsIterator.hasNext(); /*nothing*/) {
-      Entry<HRegionInfo, ServerName> regionEntry = regionsIterator.next();
+    for (Iterator<Entry<TabletFragmentInfo, String>> regionsIterator = regionsToAssignSet.iterator(); regionsIterator.hasNext(); /*nothing*/) {
+      Entry<TabletFragmentInfo, String> regionEntry = regionsIterator.next();
       /*
        * Test if there is a drillbit endpoint which is also an HBase RegionServer that hosts the current HBase region
        */
-      Queue<Integer> endpointIndexlist = endpointHostIndexListMap.get(regionEntry.getValue().getHostname());
+      Queue<Integer> endpointIndexlist = endpointHostIndexListMap.get(regionEntry.getValue());
       if (endpointIndexlist != null) {
         Integer slotIndex = endpointIndexlist.poll();
-        List<HBaseSubScanSpec> endpointSlotScanList = endpointFragmentMapping.get(slotIndex);
-        endpointSlotScanList.add(regionInfoToSubScanSpec(regionEntry.getKey()));
+        List<MapRDBSubScanSpec> endpointSlotScanList = endpointFragmentMapping.get(slotIndex);
+        endpointSlotScanList.add(getSubScanSpec(regionEntry.getKey()));
         // add to the tail of the slot list, to add more later in round robin fashion
         endpointIndexlist.offer(slotIndex);
         // this region has been assigned
@@ -282,9 +182,9 @@ public class MapRDBGroupScan extends AbstractGroupScan implements DrillHBaseCons
     /*
     * Build two priority queues of slots: one with slots holding fewer than 'minPerEndpointSlot' tasks, the other with slots holding more.
      */
-    PriorityQueue<List<HBaseSubScanSpec>> minHeap = new PriorityQueue<List<HBaseSubScanSpec>>(numSlots, LIST_SIZE_COMPARATOR);
-    PriorityQueue<List<HBaseSubScanSpec>> maxHeap = new PriorityQueue<List<HBaseSubScanSpec>>(numSlots, LIST_SIZE_COMPARATOR_REV);
-    for(List<HBaseSubScanSpec> listOfScan : endpointFragmentMapping.values()) {
+    PriorityQueue<List<MapRDBSubScanSpec>> minHeap = new PriorityQueue<List<MapRDBSubScanSpec>>(numSlots, LIST_SIZE_COMPARATOR);
+    PriorityQueue<List<MapRDBSubScanSpec>> maxHeap = new PriorityQueue<List<MapRDBSubScanSpec>>(numSlots, LIST_SIZE_COMPARATOR_REV);
+    for (List<MapRDBSubScanSpec> listOfScan : endpointFragmentMapping.values()) {
       if (listOfScan.size() < minPerEndpointSlot) {
         minHeap.offer(listOfScan);
       } else if (listOfScan.size() > minPerEndpointSlot){
@@ -296,9 +196,9 @@ public class MapRDBGroupScan extends AbstractGroupScan implements DrillHBaseCons
     * Now, assign any remaining unassigned regions to the slots with the fewest assignments.
      */
     if (regionsToAssignSet.size() > 0) {
-      for (Entry<HRegionInfo, ServerName> regionEntry : regionsToAssignSet) {
-        List<HBaseSubScanSpec> smallestList = minHeap.poll();
-        smallestList.add(regionInfoToSubScanSpec(regionEntry.getKey()));
+      for (Entry<TabletFragmentInfo, String> regionEntry : regionsToAssignSet) {
+        List<MapRDBSubScanSpec> smallestList = minHeap.poll();
+        smallestList.add(getSubScanSpec(regionEntry.getKey()));
         if (smallestList.size() < maxPerEndpointSlot) {
           minHeap.offer(smallestList);
         }
@@ -309,8 +209,8 @@ public class MapRDBGroupScan extends AbstractGroupScan implements DrillHBaseCons
     * While there are slots with fewer than 'minPerEndpointSlot' units of work, rebalance from those with more.
      */
     while(minHeap.peek() != null && minHeap.peek().size() < minPerEndpointSlot) {
-      List<HBaseSubScanSpec> smallestList = minHeap.poll();
-      List<HBaseSubScanSpec> largestList = maxHeap.poll();
+      List<MapRDBSubScanSpec> smallestList = minHeap.poll();
+      List<MapRDBSubScanSpec> largestList = maxHeap.poll();
       smallestList.add(largestList.remove(largestList.size()-1));
       if (largestList.size() > minPerEndpointSlot) {
         maxHeap.offer(largestList);
@@ -329,77 +229,21 @@ public class MapRDBGroupScan extends AbstractGroupScan implements DrillHBaseCons
         watch.elapsed(TimeUnit.NANOSECONDS)/1000, incomingEndpoints, endpointFragmentMapping.toString());
   }
 
-  private HBaseSubScanSpec regionInfoToSubScanSpec(HRegionInfo ri) {
-    HBaseScanSpec spec = hbaseScanSpec;
-    HBaseSubScanSpec subScanSpec = new HBaseSubScanSpec(spec.getTableName(),
-            regionsToScan.get(ri).getHostname(),
-            (!isNullOrEmpty(spec.getStartRow()) && ri.containsRow(spec.getStartRow())) ? spec.getStartRow() : ri.getStartKey(),
-            (!isNullOrEmpty(spec.getStopRow()) && ri.containsRow(spec.getStopRow())) ? spec.getStopRow() : ri.getEndKey(),
-            spec.getSerializedFilter(),
-            null);
-    return subScanSpec;
-  }
-
-  private boolean isNullOrEmpty(byte[] key) {
-    return key == null || key.length == 0;
-  }
-
-  @Override
-  public MapRDBSubScan getSpecificScan(int minorFragmentId) {
-    assert minorFragmentId < endpointFragmentMapping.size() : String.format(
-        "Mappings length [%d] should be greater than minor fragment id [%d] but it isn't.", endpointFragmentMapping.size(),
-        minorFragmentId);
-    return new MapRDBSubScan(getUserName(), storagePlugin, storagePlugin.getConfig(), endpointFragmentMapping.get(minorFragmentId), columns);
-  }
-
   @Override
   public int getMaxParallelizationWidth() {
     return regionsToScan.size();
   }
 
-  @Override
-  public ScanStats getScanStats() {
-    //TODO: look at stats for this.
-    long rowCount = (long) ((hbaseScanSpec.getFilter() != null ? .5 : 1) * tableStats.getNumRows());
-    int avgColumnSize = 10;
-    int numColumns = (columns == null || columns.isEmpty()) ? 100 : columns.size();
-    return new ScanStats(GroupScanProperty.NO_EXACT_ROW_COUNT, rowCount, 1, avgColumnSize * numColumns * rowCount);
-  }
-
-  @Override
-  @JsonIgnore
-  public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) {
-    Preconditions.checkArgument(children.isEmpty());
-    return new MapRDBGroupScan(this);
-  }
-
   @JsonIgnore
   public MapRDBFormatPlugin getFormatPlugin() {
     return formatPlugin;
   }
 
-  @JsonIgnore
-  public Configuration getHBaseConf() {
-    return HBaseConfiguration.create();
-  }
-
-  @JsonIgnore
-  public String getTableName() {
-    return getHBaseScanSpec().getTableName();
-  }
-
   @Override
   public String getDigest() {
     return toString();
   }
 
-  @Override
-  public String toString() {
-    return "MapRDBGroupScan [HBaseScanSpec="
-        + hbaseScanSpec + ", columns="
-        + columns + "]";
-  }
-
   @JsonProperty("storage")
   public FileSystemConfig getStorageConfig() {
     return (FileSystemConfig) storagePlugin.getConfig();
@@ -407,7 +251,7 @@ public class MapRDBGroupScan extends AbstractGroupScan implements DrillHBaseCons
 
   @JsonIgnore
   public FileSystemPlugin getStoragePlugin(){
-	  return storagePlugin;
+    return storagePlugin;
   }
 
   @JsonProperty
@@ -415,11 +259,6 @@ public class MapRDBGroupScan extends AbstractGroupScan implements DrillHBaseCons
     return columns;
   }
 
-  @JsonProperty
-  public HBaseScanSpec getHBaseScanSpec() {
-    return hbaseScanSpec;
-  }
-
   @JsonIgnore
   public boolean canPushdownProjects(List<SchemaPath> columns) {
     return true;
@@ -435,29 +274,6 @@ public class MapRDBGroupScan extends AbstractGroupScan implements DrillHBaseCons
     return filterPushedDown;
   }
 
-  /**
-   * Empty constructor, do not use, only for testing.
-   */
-  @VisibleForTesting
-  public MapRDBGroupScan() {
-    super((String)null);
-  }
-
-  /**
-   * Do not use, only for testing.
-   */
-  @VisibleForTesting
-  public void setHBaseScanSpec(HBaseScanSpec hbaseScanSpec) {
-    this.hbaseScanSpec = hbaseScanSpec;
-  }
-
-  /**
-   * Do not use, only for testing.
-   */
-  @JsonIgnore
-  @VisibleForTesting
-  public void setRegionsToScan(NavigableMap<HRegionInfo, ServerName> regionsToScan) {
-    this.regionsToScan = regionsToScan;
-  }
+  protected abstract MapRDBSubScanSpec getSubScanSpec(TabletFragmentInfo key);
 
 }

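Note on the refactor above: MapRDBGroupScan is now a template. Region discovery and sub-scan construction move to the format-specific subclasses via getSubScanSpec(TabletFragmentInfo), while the locality-aware assignment and the two-heap rebalancing stay in the base class. As a reference, here is a minimal stand-alone sketch of just the rebalancing step; the class name and the String work units are hypothetical stand-ins for the real slot lists of MapRDBSubScanSpec. The locality-aware first pass is omitted, and the sketch assumes at least one overfull slot exists whenever an underfull one does (the same implicit assumption the real loop makes).

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Collections;
    import java.util.Comparator;
    import java.util.List;
    import java.util.PriorityQueue;

    /** Hypothetical stand-alone sketch of the two-heap balancing step. */
    public class SlotBalancerSketch {

      // Orders slot lists by how much work they hold, mirroring
      // LIST_SIZE_COMPARATOR in MapRDBGroupScan.
      static final Comparator<List<String>> BY_SIZE = new Comparator<List<String>>() {
        @Override
        public int compare(List<String> a, List<String> b) {
          return a.size() - b.size();
        }
      };

      public static void main(String[] args) {
        // Strings stand in for MapRDBSubScanSpec work units; one inner
        // list per minor-fragment slot.
        List<List<String>> slots = new ArrayList<>();
        slots.add(new ArrayList<>(Arrays.asList("t1", "t2", "t3", "t4")));
        slots.add(new ArrayList<>(Arrays.asList("t5")));
        slots.add(new ArrayList<String>());

        int minPerSlot = 1;
        PriorityQueue<List<String>> minHeap =
            new PriorityQueue<>(slots.size(), BY_SIZE);
        PriorityQueue<List<String>> maxHeap =
            new PriorityQueue<>(slots.size(), Collections.reverseOrder(BY_SIZE));
        for (List<String> slot : slots) {
          if (slot.size() < minPerSlot) {
            minHeap.offer(slot);
          } else if (slot.size() > minPerSlot) {
            maxHeap.offer(slot);
          }
        }

        // Underfull slots steal one unit at a time from the most loaded
        // slot until every slot holds at least minPerSlot units.
        while (minHeap.peek() != null && minHeap.peek().size() < minPerSlot) {
          List<String> smallest = minHeap.poll();
          List<String> largest = maxHeap.poll();
          smallest.add(largest.remove(largest.size() - 1));
          if (largest.size() > minPerSlot) {
            maxHeap.offer(largest);
          }
          if (smallest.size() < minPerSlot) {
            minHeap.offer(smallest);
          }
        }

        System.out.println(slots); // [[t1, t2, t3], [t5], [t4]]
      }
    }

Running the sketch, the empty slot steals exactly one unit from the most loaded slot and the loop stops as soon as every slot meets the minimum, which is the behavior the real applyAssignments() relies on for even fragment sizing.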
http://git-wip-us.apache.org/repos/asf/drill/blob/f97a3332/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/MapRDBPushFilterIntoScan.java
----------------------------------------------------------------------
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/MapRDBPushFilterIntoScan.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/MapRDBPushFilterIntoScan.java
deleted file mode 100644
index 50f3d95..0000000
--- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/MapRDBPushFilterIntoScan.java
+++ /dev/null
@@ -1,144 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.drill.exec.store.maprdb;
-
-import org.apache.drill.common.expression.LogicalExpression;
-import org.apache.drill.exec.planner.logical.DrillOptiq;
-import org.apache.drill.exec.planner.logical.DrillParseContext;
-import org.apache.drill.exec.planner.logical.RelOptHelper;
-import org.apache.drill.exec.planner.physical.FilterPrel;
-import org.apache.drill.exec.planner.physical.PrelUtil;
-import org.apache.drill.exec.planner.physical.ProjectPrel;
-import org.apache.drill.exec.planner.physical.ScanPrel;
-import org.apache.drill.exec.store.StoragePluginOptimizerRule;
-import org.apache.drill.exec.store.hbase.HBaseScanSpec;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.plan.RelOptRuleCall;
-import org.apache.calcite.plan.RelOptRuleOperand;
-import org.apache.calcite.plan.RelOptUtil;
-import org.apache.calcite.rex.RexNode;
-
-import com.google.common.collect.ImmutableList;
-
-public abstract class MapRDBPushFilterIntoScan extends StoragePluginOptimizerRule {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MapRDBPushFilterIntoScan.class);
-	
-//  public static final StoragePluginOptimizerRule INSTANCE = new MapRDBPushFilterIntoScan();
-
-  private MapRDBPushFilterIntoScan(RelOptRuleOperand operand, String description) {
-    super(operand, description);
-  }
-
-  public static final StoragePluginOptimizerRule FILTER_ON_SCAN = new MapRDBPushFilterIntoScan(RelOptHelper.some(FilterPrel.class, RelOptHelper.any(ScanPrel.class)), "MapRDBPushFilterIntoScan:Filter_On_Scan") {
-
-    @Override
-    public void onMatch(RelOptRuleCall call) {
-      final ScanPrel scan = (ScanPrel) call.rel(1);
-      final FilterPrel filter = (FilterPrel) call.rel(0);
-      final RexNode condition = filter.getCondition();
-
-      MapRDBGroupScan groupScan = (MapRDBGroupScan)scan.getGroupScan();
-      if (groupScan.isFilterPushedDown()) {
-        /*
-         * The rule can get triggered again due to the transformed "scan => filter" sequence
-         * created by the earlier execution of this rule when we could not do a complete
-         * conversion of Optiq Filter's condition to HBase Filter. In such cases, we rely upon
-         * this flag to not do a re-processing of the rule on the already transformed call.
-         */
-        return;
-      }
-
-      doPushFilterToScan(call, filter, null, scan, groupScan, condition);
-    }
-
-    @Override
-    public boolean matches(RelOptRuleCall call) {
-      final ScanPrel scan = (ScanPrel) call.rel(1);
-      if (scan.getGroupScan() instanceof MapRDBGroupScan) {
-        return super.matches(call);
-      }
-      return false;
-    }
-  };
-
-  public static final StoragePluginOptimizerRule FILTER_ON_PROJECT = new MapRDBPushFilterIntoScan(RelOptHelper.some(FilterPrel.class, RelOptHelper.some(ProjectPrel.class, RelOptHelper.any(ScanPrel.class))), "MapRDBPushFilterIntoScan:Filter_On_Project") {
-
-    @Override
-    public void onMatch(RelOptRuleCall call) {
-      final ScanPrel scan = (ScanPrel) call.rel(2);
-      final ProjectPrel project = (ProjectPrel) call.rel(1);
-      final FilterPrel filter = (FilterPrel) call.rel(0);
-
-      MapRDBGroupScan groupScan = (MapRDBGroupScan)scan.getGroupScan();
-      if (groupScan.isFilterPushedDown()) {
-        /*
-         * The rule can get triggered again due to the transformed "scan => filter" sequence
-         * created by the earlier execution of this rule when we could not do a complete
-         * conversion of Optiq Filter's condition to HBase Filter. In such cases, we rely upon
-         * this flag to not do a re-processing of the rule on the already transformed call.
-         */
-         return;
-      }
-
-      // convert the filter to one that references the child of the project
-      final RexNode condition =  RelOptUtil.pushFilterPastProject(filter.getCondition(), project);
-
-      doPushFilterToScan(call, filter, project, scan, groupScan, condition);
-    }
-
-    @Override
-    public boolean matches(RelOptRuleCall call) {
-      final ScanPrel scan = (ScanPrel) call.rel(2);
-      if (scan.getGroupScan() instanceof MapRDBGroupScan) {
-        return super.matches(call);
-      }
-      return false;
-    }
-  };
-
-  protected void doPushFilterToScan(final RelOptRuleCall call, final FilterPrel filter, final ProjectPrel project, final ScanPrel scan, final MapRDBGroupScan groupScan, final RexNode condition) {
-
-    final LogicalExpression conditionExp = DrillOptiq.toDrill(new DrillParseContext(PrelUtil.getPlannerSettings(call.getPlanner())), scan, condition);
-    final MapRDBFilterBuilder maprdbFilterBuilder = new MapRDBFilterBuilder(groupScan, conditionExp);
-    final HBaseScanSpec newScanSpec = maprdbFilterBuilder.parseTree();
-    if (newScanSpec == null) {
-      return; //no filter pushdown ==> No transformation.
-    }
-
-    final MapRDBGroupScan newGroupsScan = new MapRDBGroupScan(groupScan.getUserName(), groupScan.getStoragePlugin(),
-                                                              groupScan.getFormatPlugin(), newScanSpec, groupScan.getColumns());
-    newGroupsScan.setFilterPushedDown(true);
-
-    final ScanPrel newScanPrel = ScanPrel.create(scan, filter.getTraitSet(), newGroupsScan, scan.getRowType());
-
-    // Depending on whether is a project in the middle, assign either scan or copy of project to childRel.
-    final RelNode childRel = project == null ? newScanPrel : project.copy(project.getTraitSet(), ImmutableList.of((RelNode)newScanPrel));;
-
-    if (maprdbFilterBuilder.isAllExpressionsConverted()) {
-        /*
-         * Since we could convert the entire filter condition expression into an HBase filter,
-         * we can eliminate the filter operator altogether.
-         */
-      call.transformTo(childRel);
-    } else {
-      call.transformTo(filter.copy(filter.getTraitSet(), ImmutableList.of(childRel)));
-    }
-  }
-
-}

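Per the file list at the top of this commit, the rule is not dropped but re-created as store/maprdb/binary/MapRDBPushFilterIntoScan.java. The relocated file is not reproduced in this excerpt; the condensed skeleton below (hypothetical class name, single FILTER_ON_SCAN-style operand) only restates the shape visible in the code deleted above:

    import org.apache.calcite.plan.RelOptRuleCall;
    import org.apache.drill.common.expression.LogicalExpression;
    import org.apache.drill.exec.planner.logical.DrillOptiq;
    import org.apache.drill.exec.planner.logical.DrillParseContext;
    import org.apache.drill.exec.planner.logical.RelOptHelper;
    import org.apache.drill.exec.planner.physical.FilterPrel;
    import org.apache.drill.exec.planner.physical.PrelUtil;
    import org.apache.drill.exec.planner.physical.ScanPrel;
    import org.apache.drill.exec.store.StoragePluginOptimizerRule;

    // Hypothetical condensed sketch; see the deleted file above for the
    // full matches()/doPushFilterToScan() logic.
    public class PushFilterIntoScanSketch extends StoragePluginOptimizerRule {

      public PushFilterIntoScanSketch() {
        // Match a filter sitting directly on a scan, as in FILTER_ON_SCAN.
        super(RelOptHelper.some(FilterPrel.class, RelOptHelper.any(ScanPrel.class)),
            "PushFilterIntoScanSketch:Filter_On_Scan");
      }

      @Override
      public void onMatch(RelOptRuleCall call) {
        final FilterPrel filter = (FilterPrel) call.rel(0);
        final ScanPrel scan = (ScanPrel) call.rel(1);
        // Translate the Calcite condition into a Drill logical expression...
        final LogicalExpression conditionExp = DrillOptiq.toDrill(
            new DrillParseContext(PrelUtil.getPlannerSettings(call.getPlanner())),
            scan, filter.getCondition());
        // ...then hand it to a table-specific filter builder, rebuild the
        // group scan with the pushed-down spec, and call.transformTo(...)
        // as in the original doPushFilterToScan().
      }
    }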
http://git-wip-us.apache.org/repos/asf/drill/blob/f97a3332/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/MapRDBScanBatchCreator.java
----------------------------------------------------------------------
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/MapRDBScanBatchCreator.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/MapRDBScanBatchCreator.java
index 1beabc9..058de61 100644
--- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/MapRDBScanBatchCreator.java
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/MapRDBScanBatchCreator.java
@@ -26,7 +26,10 @@ import org.apache.drill.exec.physical.impl.ScanBatch;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.store.RecordReader;
 import org.apache.drill.exec.store.hbase.HBaseRecordReader;
-import org.apache.drill.exec.store.hbase.HBaseSubScan;
+import org.apache.drill.exec.store.hbase.HBaseSubScan.HBaseSubScanSpec;
+import org.apache.drill.exec.store.maprdb.binary.BinaryTableGroupScan;
+import org.apache.drill.exec.store.maprdb.json.MaprDBJsonRecordReader;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 
 import com.google.common.base.Preconditions;
@@ -39,11 +42,14 @@ public class MapRDBScanBatchCreator implements BatchCreator<MapRDBSubScan>{
   public ScanBatch getBatch(FragmentContext context, MapRDBSubScan subScan, List<RecordBatch> children) throws ExecutionSetupException {
     Preconditions.checkArgument(children.isEmpty());
     List<RecordReader> readers = Lists.newArrayList();
-    for(HBaseSubScan.HBaseSubScanSpec scanSpec : subScan.getRegionScanSpecList()){
+    Configuration conf = HBaseConfiguration.create();
+    for (MapRDBSubScanSpec scanSpec : subScan.getRegionScanSpecList()) {
       try {
-        readers.add(
-            new HBaseRecordReader(HBaseConfiguration.create(), scanSpec, subScan.getColumns(), context)
-        );
+        if (BinaryTableGroupScan.TABLE_BINARY.equals(subScan.getTableType())) {
+          readers.add(new HBaseRecordReader(conf, getHBaseSubScanSpec(scanSpec), subScan.getColumns(), context));
+        } else {
+          readers.add(new MaprDBJsonRecordReader(scanSpec, subScan.getColumns(), context));
+        }
       } catch (Exception e1) {
         throw new ExecutionSetupException(e1);
       }
@@ -51,4 +57,9 @@ public class MapRDBScanBatchCreator implements BatchCreator<MapRDBSubScan>{
     return new ScanBatch(subScan, context, readers.iterator());
   }
 
+  private HBaseSubScanSpec getHBaseSubScanSpec(MapRDBSubScanSpec scanSpec) {
+    return new HBaseSubScanSpec(scanSpec.getTableName(), scanSpec.getRegionServer(),
+        scanSpec.getStartRow(), scanSpec.getStopRow(), scanSpec.getSerializedFilter(), null);
+  }
+
 }
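The batch creator now branches per table type instead of assuming binary tables. A toy sketch of that dispatch follows; the reader classes and the TABLE_BINARY value are hypothetical stand-ins for HBaseRecordReader, MaprDBJsonRecordReader, and BinaryTableGroupScan.TABLE_BINARY:

    import java.util.ArrayList;
    import java.util.List;

    public class ReaderDispatchSketch {

      // Minimal stand-ins for Drill's RecordReader implementations.
      interface Reader {}
      static class BinaryTableReader implements Reader {
        @Override public String toString() { return "binary"; }
      }
      static class JsonTableReader implements Reader {
        @Override public String toString() { return "json"; }
      }

      static final String TABLE_BINARY = "binary"; // assumed constant value

      // Mirrors the branch in getBatch(): binary tables reuse the HBase
      // reader path, everything else is treated as a JSON document table.
      static List<Reader> readersFor(String tableType, int subScanSpecCount) {
        List<Reader> readers = new ArrayList<>();
        for (int i = 0; i < subScanSpecCount; i++) {
          readers.add(TABLE_BINARY.equals(tableType)
              ? new BinaryTableReader()
              : new JsonTableReader());
        }
        return readers;
      }

      public static void main(String[] args) {
        System.out.println(readersFor(TABLE_BINARY, 2)); // [binary, binary]
        System.out.println(readersFor("json", 1));       // [json]
      }
    }

One reader is built per sub-scan spec, so a minor fragment assigned several tablets scans them sequentially through the same ScanBatch, exactly as in the modified getBatch() above.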