Posted to commits@drill.apache.org by ad...@apache.org on 2016/09/13 01:32:30 UTC

[43/50] [abbrv] drill git commit: MD-572: Column names in MapR-DB JSON tables are case-sensitive

MD-572: Column names in MapR-DB JSON tables are case-sensitive

Add a format plugin option, enablePushdown (true by default), that disables pushdown of both filters and projects when set to false.

With pushdown disabled, Drill evaluates both of these operators itself and can therefore handle column names in a case-insensitive way.
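
The new unit test in this commit exercises the option per query through Drill's table-function syntax. A minimal sketch, reusing the test's hbase.`business` table and storage plugin name:

  -- Disable filter/project pushdown for this scan only; Drill then applies
  -- the WHERE clause and the column projection itself, case-insensitively.
  SELECT _id, name, categories, full_address
  FROM table(hbase.`business`(type => 'maprdb', enablePushdown => false)) business
  WHERE name <> 'Sprint';

The same flag can also be set in the storage plugin's "maprdb" format configuration, which MapRDBFormatPluginConfig (below) maps to the enablePushdown JSON property.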


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/156819d0
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/156819d0
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/156819d0

Branch: refs/heads/master
Commit: 156819d0470b95a2822c03a498ed5c5b872d2022
Parents: c74d75c
Author: Aditya <ad...@mapr.com>
Authored: Tue Mar 8 16:57:52 2016 -0800
Committer: Aditya Kishore <ad...@apache.org>
Committed: Fri Sep 9 10:08:39 2016 -0700

----------------------------------------------------------------------
 .../store/mapr/db/MapRDBFormatPluginConfig.java |  16 +-
 .../store/mapr/db/MapRDBPushFilterIntoScan.java |   4 +-
 .../store/mapr/db/json/JsonTableGroupScan.java  |  11 +-
 .../mapr/db/json/MaprDBJsonRecordReader.java    | 382 +++++++------------
 .../drill/maprdb/tests/json/TestSimpleJson.java |  32 +-
 5 files changed, 200 insertions(+), 245 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/156819d0/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBFormatPluginConfig.java
----------------------------------------------------------------------
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBFormatPluginConfig.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBFormatPluginConfig.java
index 82b360c..7295265 100644
--- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBFormatPluginConfig.java
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBFormatPluginConfig.java
@@ -27,8 +27,9 @@ import com.fasterxml.jackson.annotation.JsonTypeName;
 @JsonTypeName("maprdb")  @JsonInclude(Include.NON_DEFAULT)
 public class MapRDBFormatPluginConfig extends TableFormatPluginConfig {
 
-  private boolean allTextMode = false;
-  private boolean readAllNumbersAsDouble = false;
+  public boolean allTextMode = false;
+  public boolean readAllNumbersAsDouble = false;
+  public boolean enablePushdown = true;
 
   @Override
   public int hashCode() {
@@ -42,6 +43,8 @@ public class MapRDBFormatPluginConfig extends TableFormatPluginConfig {
       return false;
     } else if (allTextMode != other.allTextMode) {
       return false;
+    } else if (enablePushdown != other.enablePushdown) {
+      return false;
     }
 
     return true;
@@ -65,4 +68,13 @@ public class MapRDBFormatPluginConfig extends TableFormatPluginConfig {
     readAllNumbersAsDouble = read;
   }
 
+  public boolean isEnablePushdown() {
+    return enablePushdown;
+  }
+
+  @JsonProperty("enablePushdown")
+  public void setEnablePushdown(boolean enablePushdown) {
+    this.enablePushdown = enablePushdown;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/156819d0/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBPushFilterIntoScan.java
----------------------------------------------------------------------
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBPushFilterIntoScan.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBPushFilterIntoScan.java
index 7292182..6a286a8 100644
--- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBPushFilterIntoScan.java
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBPushFilterIntoScan.java
@@ -112,7 +112,8 @@ public abstract class MapRDBPushFilterIntoScan extends StoragePluginOptimizerRul
       FilterPrel filter, final ProjectPrel project, ScanPrel scan,
       JsonTableGroupScan groupScan, RexNode condition) {
 
-    if (groupScan.isFilterPushedDown()) {
+    if (groupScan.isDisablePushdown() // Do not push down the filter if pushdown is disabled in the plugin configuration
+        || groupScan.isFilterPushedDown()) { // see below
       /*
        * The rule can get triggered again due to the transformed "scan => filter" sequence
        * created by the earlier execution of this rule when we could not do a complete
@@ -202,4 +203,5 @@ public abstract class MapRDBPushFilterIntoScan extends StoragePluginOptimizerRul
       call.transformTo(filter.copy(filter.getTraitSet(), ImmutableList.of(childRel)));
     }
   }
+
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/156819d0/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/JsonTableGroupScan.java
----------------------------------------------------------------------
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/JsonTableGroupScan.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/JsonTableGroupScan.java
index 9e23af7..0c8ffda 100644
--- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/JsonTableGroupScan.java
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/JsonTableGroupScan.java
@@ -112,7 +112,7 @@ public class JsonTableGroupScan extends MapRDBGroupScan {
       regionsToScan = new TreeMap<TabletFragmentInfo, String>();
       for (TabletInfo tabletInfo : tabletInfos) {
         TabletInfoImpl tabletInfoImpl = (TabletInfoImpl) tabletInfo;
-        if (!foundStartRegion 
+        if (!foundStartRegion
             && !isNullOrEmpty(scanSpec.getStartRow())
             && !tabletInfoImpl.containsRow(scanSpec.getStartRow())) {
           continue;
@@ -171,6 +171,15 @@ public class JsonTableGroupScan extends MapRDBGroupScan {
     return scanSpec.getTableName();
   }
 
+  public boolean isDisablePushdown() {
+    return !formatPluginConfig.isEnablePushdown();
+  }
+
+  @JsonIgnore
+  public boolean canPushdownProjects(List<SchemaPath> columns) {
+    return formatPluginConfig.isEnablePushdown();
+  }
+
   @Override
   public String toString() {
     return "JsonTableGroupScan [ScanSpec=" + scanSpec + ", columns=" + columns + "]";

http://git-wip-us.apache.org/repos/asf/drill/blob/156819d0/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/MaprDBJsonRecordReader.java
----------------------------------------------------------------------
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/MaprDBJsonRecordReader.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/MaprDBJsonRecordReader.java
index 7fbcd1b..cb86e32 100644
--- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/MaprDBJsonRecordReader.java
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/MaprDBJsonRecordReader.java
@@ -18,10 +18,8 @@
 package org.apache.drill.exec.store.mapr.db.json;
 
 import static org.ojai.DocumentConstants.ID_KEY;
-import io.netty.buffer.DrillBuf;
 
 import java.nio.ByteBuffer;
-import java.nio.charset.Charset;
 import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
@@ -42,11 +40,8 @@ import org.apache.drill.exec.store.AbstractRecordReader;
 import org.apache.drill.exec.store.mapr.db.MapRDBFormatPluginConfig;
 import org.apache.drill.exec.store.mapr.db.MapRDBSubScanSpec;
 import org.apache.drill.exec.vector.BaseValueVector;
+import org.apache.drill.exec.vector.complex.impl.MapOrListWriterImpl;
 import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter;
-import org.apache.drill.exec.vector.complex.writer.BaseWriter.ListWriter;
-import org.apache.drill.exec.vector.complex.writer.BaseWriter.MapWriter;
-import org.apache.drill.exec.vector.complex.writer.VarBinaryWriter;
-import org.apache.drill.exec.vector.complex.writer.VarCharWriter;
 import org.ojai.DocumentReader;
 import org.ojai.DocumentReader.EventType;
 import org.ojai.DocumentStream;
@@ -54,7 +49,6 @@ import org.ojai.FieldPath;
 import org.ojai.FieldSegment;
 import org.ojai.Value;
 import org.ojai.store.QueryCondition;
-import org.ojai.types.OTime;
 
 import com.google.common.base.Preconditions;
 import com.google.common.base.Stopwatch;
@@ -68,10 +62,13 @@ import com.mapr.db.ojai.DBDocumentReaderBase;
 import com.mapr.db.util.ByteBufs;
 import com.mapr.org.apache.hadoop.hbase.util.Bytes;
 
+import io.netty.buffer.DrillBuf;
+
 public class MaprDBJsonRecordReader extends AbstractRecordReader {
   private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MaprDBJsonRecordReader.class);
 
   public static final SchemaPath ID_PATH = SchemaPath.getSimplePath(ID_KEY);
+  private final long MILLISECONDS_IN_A_DAY  = (long)1000 * 60 * 60 * 24;
 
   private Table table;
   private QueryCondition condition;
@@ -79,7 +76,7 @@ public class MaprDBJsonRecordReader extends AbstractRecordReader {
 
   private final String tableName;
   private OperatorContext operatorContext;
-  private VectorContainerWriter writer;
+  private VectorContainerWriter vectorWriter;
 
   private DrillBuf buffer;
 
@@ -91,8 +88,8 @@ public class MaprDBJsonRecordReader extends AbstractRecordReader {
   private boolean idOnly;
   private final boolean unionEnabled;
   private final boolean readNumbersAsDouble;
+  private final boolean disablePushdown;
   private final boolean allTextMode;
-  private final long MILLISECONDS_IN_A_DAY  = (long)1000 * 60 * 60 * 24;
 
   public MaprDBJsonRecordReader(MapRDBSubScanSpec subScanSpec,
       MapRDBFormatPluginConfig formatPluginConfig,
@@ -114,12 +111,13 @@ public class MaprDBJsonRecordReader extends AbstractRecordReader {
     unionEnabled = context.getOptions().getOption(ExecConstants.ENABLE_UNION_TYPE);
     readNumbersAsDouble = formatPluginConfig.isReadAllNumbersAsDouble();
     allTextMode = formatPluginConfig.isAllTextMode();
+    disablePushdown = !formatPluginConfig.isEnablePushdown();
   }
 
   @Override
   protected Collection<SchemaPath> transformColumns(Collection<SchemaPath> columns) {
     Set<SchemaPath> transformed = Sets.newLinkedHashSet();
-    if (!isStarQuery()) {
+    if (!isStarQuery() && !disablePushdown) {
       Set<FieldPath> projectedFieldsSet = Sets.newTreeSet();
       for (SchemaPath column : columns) {
         if (column.getRootSegment().getPath().equalsIgnoreCase(ID_KEY)) {
@@ -154,7 +152,7 @@ public class MaprDBJsonRecordReader extends AbstractRecordReader {
 
   @Override
   public void setup(OperatorContext context, OutputMutator output) throws ExecutionSetupException {
-    this.writer = new VectorContainerWriter(output, unionEnabled);
+    this.vectorWriter = new VectorContainerWriter(output, unionEnabled);
     this.operatorContext = context;
 
     try {
@@ -172,8 +170,8 @@ public class MaprDBJsonRecordReader extends AbstractRecordReader {
     Stopwatch watch = Stopwatch.createUnstarted();
     watch.start();
 
-    writer.allocate();
-    writer.reset();
+    vectorWriter.allocate();
+    vectorWriter.reset();
 
     int recordCount = 0;
     DBDocumentReaderBase reader = null;
@@ -182,24 +180,18 @@ public class MaprDBJsonRecordReader extends AbstractRecordReader {
       try {
         reader = nextDocumentReader();
         if (reader == null) break;
-        writer.setPosition(recordCount);
+
+        vectorWriter.setPosition(recordCount);
+        MapOrListWriterImpl writer = new MapOrListWriterImpl(vectorWriter.rootAsMap());
         if (idOnly) {
           Value id = reader.getId();
-          MapWriter map = writer.rootAsMap();
-
           try {
             switch(id.getType()) {
             case STRING:
-              writeString(map.varChar(ID_KEY), id.getString());
-              recordCount++;
+              writeString(writer, ID_KEY, id.getString());
               break;
             case BINARY:
-              if (allTextMode) {
-                writeString(map.varChar(ID_KEY), new String(id.getBinary().array(), Charset.forName("UTF-8")));
-              } else {
-                writeBinary(map.varBinary(ID_KEY), id.getBinary());
-              }
-              recordCount++;
+              writeBinary(writer, ID_KEY, id.getBinary());
               break;
             default:
               throw new UnsupportedOperationException(id.getType() +
@@ -213,9 +205,9 @@ public class MaprDBJsonRecordReader extends AbstractRecordReader {
           if (reader.next() != EventType.START_MAP) {
             throw dataReadError("The document did not start with START_MAP!");
           }
-          writeToMap(reader, writer.rootAsMap());
-          recordCount++;
+          writeToListOrMap(writer, reader);
         }
+        recordCount++;
       } catch (UserException e) {
         throw UserException.unsupportedError(e)
             .addContext(String.format("Table: %s, document id: '%s'",
@@ -225,132 +217,74 @@ public class MaprDBJsonRecordReader extends AbstractRecordReader {
       }
     }
 
-    writer.setValueCount(recordCount);
+    vectorWriter.setValueCount(recordCount);
     logger.debug("Took {} ms to get {} records", watch.elapsed(TimeUnit.MILLISECONDS), recordCount);
     return recordCount;
   }
 
-  private void writeToMap(DBDocumentReaderBase reader, MapWriter map) {
-    map.start();
+  private void writeToListOrMap(MapOrListWriterImpl writer, DBDocumentReaderBase reader) {
+    String fieldName = null;
+    writer.start();
     outside: while (true) {
       EventType event = reader.next();
-      if (event == null || event == EventType.END_MAP) break outside;
+      if (event == null
+          || event == EventType.END_MAP
+          || event == EventType.END_ARRAY) {
+        break outside;
+      } else if (reader.inMap()) {
+        fieldName = reader.getFieldName();
+      }
 
-      String fieldName = reader.getFieldName();
       try {
         switch (event) {
         case NULL:
           break; // not setting the field will leave it as null
         case BINARY:
-          if (allTextMode) {
-            writeString(map.varChar(fieldName), new String(reader.getBinary().array(), Charset.forName("UTF-8")));
-          } else {
-            writeBinary(map.varBinary(fieldName), reader.getBinary());
-          }
+          writeBinary(writer, fieldName, reader.getBinary());
           break;
         case BOOLEAN:
-          if (allTextMode) {
-            writeString(map.varChar(fieldName), String.valueOf(reader.getBoolean()));
-          } else {
-            map.bit(fieldName).writeBit(reader.getBoolean() ? 1 : 0);
-          }
+          writeBoolean(writer, fieldName, reader);
           break;
         case STRING:
-          writeString(map.varChar(fieldName), reader.getString());
+          writeString(writer, fieldName, reader.getString());
           break;
         case BYTE:
-          if (allTextMode) {
-            writeString(map.varChar(fieldName), String.valueOf(reader.getByte()));
-          } else if (readNumbersAsDouble) {
-            map.float8(fieldName).writeFloat8(reader.getByte());
-          } else {
-            map.tinyInt(fieldName).writeTinyInt(reader.getByte());
-          }
+          writeByte(writer, fieldName, reader);
           break;
         case SHORT:
-          if (allTextMode) {
-            writeString(map.varChar(fieldName), String.valueOf(reader.getShort()));
-          } else if (readNumbersAsDouble) {
-            map.float8(fieldName).writeFloat8(reader.getShort());
-          } else {
-            map.smallInt(fieldName).writeSmallInt(reader.getShort());
-          }
+          writeShort(writer, fieldName, reader);
           break;
         case INT:
-          if (allTextMode) {
-            writeString(map.varChar(fieldName), String.valueOf(reader.getInt()));
-          } else if (readNumbersAsDouble) {
-            map.float8(fieldName).writeFloat8(reader.getInt());
-          } else {
-            map.integer(fieldName).writeInt(reader.getInt());
-          }
+          writeInt(writer, fieldName, reader);
           break;
         case LONG:
-          if (allTextMode) {
-            writeString(map.varChar(fieldName), String.valueOf(reader.getLong()));
-          } else if (readNumbersAsDouble) {
-            map.float8(fieldName).writeFloat8(reader.getLong());
-          } else {
-            map.bigInt(fieldName).writeBigInt(reader.getLong());
-          }
+          writeLong(writer, fieldName, reader);
           break;
         case FLOAT:
-          if (allTextMode) {
-            writeString(map.varChar(fieldName), String.valueOf(reader.getFloat()));
-          } else if (readNumbersAsDouble) {
-            map.float8(fieldName).writeFloat8(reader.getFloat());
-          } else {
-            map.float4(fieldName).writeFloat4(reader.getFloat());
-          }
+          writeFloat(writer, fieldName, reader);
           break;
         case DOUBLE:
-          if (allTextMode) {
-            writeString(map.varChar(fieldName), String.valueOf(reader.getDouble()));
-          } else {
-            map.float8(fieldName).writeFloat8(reader.getDouble());
-          }
+          writeDouble(writer, fieldName, reader);
           break;
         case DECIMAL:
           throw unsupportedError("Decimal type is currently not supported.");
         case DATE:
-          if (allTextMode) {
-            writeString(map.varChar(fieldName), reader.getDate().toString());
-          } else {
-
-            long milliSecondsSinceEpoch = reader.getDate().toDaysSinceEpoch() * MILLISECONDS_IN_A_DAY;
-            map.date(fieldName).writeDate(milliSecondsSinceEpoch);
-          }
+          writeDate(writer, fieldName, reader);
           break;
         case TIME:
-          if (allTextMode) {
-            writeString(map.varChar(fieldName), reader.getTime().toString());
-          } else {
-            OTime t = reader.getTime();
-            int h = t.getHour();
-            int m = t.getMinute();
-            int s = t.getSecond();
-            int ms = t.getMilliSecond();
-            int millisOfDay = ms + (s + ((m + (h * 60)) * 60)) * 1000;
-            map.time(fieldName).writeTime(millisOfDay);
-          }
+          writeTime(writer, fieldName, reader);
           break;
         case TIMESTAMP:
-          if (allTextMode) {
-            writeString(map.varChar(fieldName), reader.getTimestamp().toString());
-          } else {
-            map.timeStamp(fieldName).writeTimeStamp(reader.getTimestampLong());
-          }
+          writeTimeStamp(writer, fieldName, reader);
           break;
         case INTERVAL:
           throw unsupportedError("Interval type is currently not supported.");
         case START_MAP:
-          writeToMap(reader, map.map(fieldName));
+          writeToListOrMap((MapOrListWriterImpl) (reader.inMap() ? writer.map(fieldName) : writer.listoftmap(fieldName)), reader);
           break;
         case START_ARRAY:
-          writeToList(reader, map.list(fieldName));
+          writeToListOrMap((MapOrListWriterImpl) writer.list(fieldName), reader);
           break;
-        case END_ARRAY:
-          throw dataReadError("Encountered an END_ARRAY event inside a map.");
         default:
           throw unsupportedError("Unsupported type: %s encountered during the query.", event);
         }
@@ -359,145 +293,115 @@ public class MaprDBJsonRecordReader extends AbstractRecordReader {
             IdCodec.asString(reader.getId()), fieldName), e);
       }
     }
-    map.end();
+    writer.end();
   }
 
-  private void writeToList(DBDocumentReaderBase reader, ListWriter list) {
-    list.startList();
-    outside: while (true) {
-      EventType event = reader.next();
-      if (event == null || event == EventType.END_ARRAY) break outside;
-
-      switch (event) {
-      case NULL:
-        throw unsupportedError("Null values are not supported in lists.");
-      case BINARY:
-        if (allTextMode) {
-          writeString(list.varChar(), new String(reader.getBinary().array(), Charset.forName("UTF-8")));
-        } else {
-          writeBinary(list.varBinary(), reader.getBinary());
-        }
-        break;
-      case BOOLEAN:
-        if (allTextMode) {
-          writeString(list.varChar(), String.valueOf(reader.getBoolean()));
-        } else {
-          list.bit().writeBit(reader.getBoolean() ? 1 : 0);
-        }
-        break;
-      case STRING:
-        writeString(list.varChar(), reader.getString());
-        break;
-      case BYTE:
-        if (allTextMode) {
-          writeString(list.varChar(), String.valueOf(reader.getByte()));
-        } else if (readNumbersAsDouble) {
-          list.float8().writeFloat8(reader.getByte());
-        } else {
-          list.tinyInt().writeTinyInt(reader.getByte());
-        }
-        break;
-      case SHORT:
-        if (allTextMode) {
-          writeString(list.varChar(), String.valueOf(reader.getShort()));
-        } else if (readNumbersAsDouble) {
-          list.float8().writeFloat8(reader.getShort());
-        } else {
-          list.smallInt().writeSmallInt(reader.getShort());
-        }
-        break;
-      case INT:
-        if (allTextMode) {
-          writeString(list.varChar(), String.valueOf(reader.getInt()));
-        } else if (readNumbersAsDouble) {
-          list.float8().writeFloat8(reader.getInt());
-        } else {
-          list.integer().writeInt(reader.getInt());
-        }
-        break;
-      case LONG:
-        if (allTextMode) {
-          writeString(list.varChar(), String.valueOf(reader.getLong()));
-        } else if (readNumbersAsDouble) {
-          list.float8().writeFloat8(reader.getLong());
-        } else {
-          list.bigInt().writeBigInt(reader.getLong());
-        }
-        break;
-      case FLOAT:
-        if (allTextMode) {
-          writeString(list.varChar(), String.valueOf(reader.getFloat()));
-        } else if (readNumbersAsDouble) {
-          list.float8().writeFloat8(reader.getFloat());
-        } else {
-          list.float4().writeFloat4(reader.getFloat());
-        }
-        break;
-      case DOUBLE:
-        if (allTextMode) {
-          writeString(list.varChar(), String.valueOf(reader.getDouble()));
-        } else {
-          list.float8().writeFloat8(reader.getDouble());
-        }
-        break;
-      case DECIMAL:
-        throw unsupportedError("Decimals are currently not supported.");
-      case DATE:
-        if (allTextMode) {
-          writeString(list.varChar(), reader.getDate().toString());
-        } else {
-          long milliSecondsSinceEpoch = reader.getDate().toDaysSinceEpoch() * MILLISECONDS_IN_A_DAY;
-          list.date().writeDate(milliSecondsSinceEpoch);
-        }
-        break;
-      case TIME:
-        if (allTextMode) {
-          writeString(list.varChar(), reader.getTime().toString());
-        } else {
-          OTime t = reader.getTime();
-          int h = t.getHour();
-          int m = t.getMinute();
-          int s = t.getSecond();
-          int ms = t.getMilliSecond();
-          int millisOfDay = ms + (s + ((m + (h * 60)) * 60)) * 1000;
-          list.time().writeTime(millisOfDay);
-        }
-        break;
-      case TIMESTAMP:
-        if (allTextMode) {
-          writeString(list.varChar(), reader.getTimestamp().toString());
-        } else {
-          list.timeStamp().writeTimeStamp(reader.getTimestampLong());
-        }
-        break;
-      case INTERVAL:
-        throw unsupportedError("Interval is currently not supported.");
-      case START_MAP:
-        writeToMap(reader, list.map());
-        break;
-      case END_MAP:
-        throw dataReadError("Encountered an END_MAP event inside a list.");
-      case START_ARRAY:
-        writeToList(reader, list.list());
-        break;
-      default:
-        throw unsupportedError("Unsupported type: %s encountered during the query.%s", event);
-      }
+  private void writeTimeStamp(MapOrListWriterImpl writer, String fieldName, DBDocumentReaderBase reader) {
+    if (allTextMode) {
+      writeString(writer, fieldName, reader.getTimestamp().toUTCString());
+    } else {
+      ((writer.map != null) ? writer.map.timeStamp(fieldName) : writer.list.timeStamp()).writeTimeStamp(reader.getTimestampLong());
+    }
+  }
+
+  private void writeTime(MapOrListWriterImpl writer, String fieldName, DBDocumentReaderBase reader) {
+    if (allTextMode) {
+      writeString(writer, fieldName, reader.getTime().toTimeStr());
+    } else {
+      ((writer.map != null) ? writer.map.time(fieldName) : writer.list.time()).writeTime(reader.getTimeInt());
+    }
+  }
+
+  private void writeDate(MapOrListWriterImpl writer, String fieldName, DBDocumentReaderBase reader) {
+    if (allTextMode) {
+      writeString(writer, fieldName, reader.getDate().toDateStr());
+    } else {
+      long milliSecondsSinceEpoch = reader.getDateInt() * MILLISECONDS_IN_A_DAY;
+      ((writer.map != null) ? writer.map.date(fieldName) : writer.list.date()).writeDate(milliSecondsSinceEpoch);
+    }
+  }
+
+  private void writeDouble(MapOrListWriterImpl writer, String fieldName, DBDocumentReaderBase reader) {
+    if (allTextMode) {
+      writeString(writer, fieldName, String.valueOf(reader.getDouble()));
+    } else {
+      writer.float8(fieldName).writeFloat8(reader.getDouble());
+    }
+  }
+
+  private void writeFloat(MapOrListWriterImpl writer, String fieldName, DBDocumentReaderBase reader) {
+    if (allTextMode) {
+      writeString(writer, fieldName, String.valueOf(reader.getFloat()));
+    } else if (readNumbersAsDouble) {
+      writer.float8(fieldName).writeFloat8(reader.getFloat());
+    } else {
+      writer.float4(fieldName).writeFloat4(reader.getFloat());
+    }
+  }
+
+  private void writeLong(MapOrListWriterImpl writer, String fieldName, DBDocumentReaderBase reader) {
+    if (allTextMode) {
+      writeString(writer, fieldName, String.valueOf(reader.getLong()));
+    } else if (readNumbersAsDouble) {
+      writer.float8(fieldName).writeFloat8(reader.getLong());
+    } else {
+      writer.bigInt(fieldName).writeBigInt(reader.getLong());
+    }
+  }
+
+  private void writeInt(MapOrListWriterImpl writer, String fieldName, DBDocumentReaderBase reader) {
+    if (allTextMode) {
+      writeString(writer, fieldName, String.valueOf(reader.getInt()));
+    } else if (readNumbersAsDouble) {
+      writer.float8(fieldName).writeFloat8(reader.getInt());
+    } else {
+      writer.integer(fieldName).writeInt(reader.getInt());
+    }
+  }
+
+  private void writeShort(MapOrListWriterImpl writer, String fieldName, DBDocumentReaderBase reader) {
+    if (allTextMode) {
+      writeString(writer, fieldName, String.valueOf(reader.getShort()));
+    } else if (readNumbersAsDouble) {
+      writer.float8(fieldName).writeFloat8(reader.getShort());
+    } else {
+      ((writer.map != null) ? writer.map.smallInt(fieldName) : writer.list.smallInt()).writeSmallInt(reader.getShort());
     }
-    list.endList();
   }
 
-  private void writeBinary(VarBinaryWriter binaryWriter, ByteBuffer buf) {
-    buffer = buffer.reallocIfNeeded(buf.remaining());
-    buffer.setBytes(0, buf, buf.position(), buf.remaining());
-    binaryWriter.writeVarBinary(0, buf.remaining(), buffer);
+  private void writeByte(MapOrListWriterImpl writer, String fieldName, DBDocumentReaderBase reader) {
+    if (allTextMode) {
+      writeString(writer, fieldName, String.valueOf(reader.getByte()));
+    } else if (readNumbersAsDouble) {
+      writer.float8(fieldName).writeFloat8(reader.getByte());
+    } else {
+      ((writer.map != null) ? writer.map.tinyInt(fieldName) : writer.list.tinyInt()).writeTinyInt(reader.getByte());
+    }
+  }
+
+  private void writeBoolean(MapOrListWriterImpl writer, String fieldName, DBDocumentReaderBase reader) {
+    if (allTextMode) {
+      writeString(writer, fieldName, String.valueOf(reader.getBoolean()));
+    } else {
+      writer.bit(fieldName).writeBit(reader.getBoolean() ? 1 : 0);
+    }
+  }
+
+  private void writeBinary(MapOrListWriterImpl writer, String fieldName, ByteBuffer buf) {
+    if (allTextMode) {
+      writeString(writer, fieldName, Bytes.toString(buf));
+    } else {
+      buffer = buffer.reallocIfNeeded(buf.remaining());
+      buffer.setBytes(0, buf, buf.position(), buf.remaining());
+      writer.binary(fieldName).writeVarBinary(0, buf.remaining(), buffer);
+    }
   }
 
-  private void writeString(VarCharWriter varCharWriter, String string) {
-    final byte[] strBytes = Bytes.toBytes(string);
+  private void writeString(MapOrListWriterImpl writer, String fieldName, String value) {
+    final byte[] strBytes = Bytes.toBytes(value);
     buffer = buffer.reallocIfNeeded(strBytes.length);
     buffer.setBytes(0, strBytes);
-    varCharWriter.writeVarChar(0, strBytes.length, buffer);
+    writer.varChar(fieldName).writeVarChar(0, strBytes.length, buffer);
   }
 
   private UserException unsupportedError(String format, Object... args) {

http://git-wip-us.apache.org/repos/asf/drill/blob/156819d0/contrib/format-maprdb/src/test/java/com/mapr/drill/maprdb/tests/json/TestSimpleJson.java
----------------------------------------------------------------------
diff --git a/contrib/format-maprdb/src/test/java/com/mapr/drill/maprdb/tests/json/TestSimpleJson.java b/contrib/format-maprdb/src/test/java/com/mapr/drill/maprdb/tests/json/TestSimpleJson.java
index 225fb2f..2bf2c31 100644
--- a/contrib/format-maprdb/src/test/java/com/mapr/drill/maprdb/tests/json/TestSimpleJson.java
+++ b/contrib/format-maprdb/src/test/java/com/mapr/drill/maprdb/tests/json/TestSimpleJson.java
@@ -39,7 +39,16 @@ import com.mapr.tests.annotations.ClusterTest;
 public class TestSimpleJson extends BaseJsonTest {
 
   @Test
-  public void testMe() throws Exception {
+  public void testSelectStar() throws Exception {
+    final String sql = "SELECT\n"
+        + "  *\n"
+        + "FROM\n"
+        + "  hbase.`business` business";
+    runSQLAndVerifyCount(sql, 10);
+  }
+
+  @Test
+  public void testSelectId() throws Exception {
     setColumnWidths(new int[] {23});
     final String sql = "SELECT\n"
         + "  _id\n"
@@ -58,6 +67,24 @@ public class TestSimpleJson extends BaseJsonTest {
   }
 
   @Test
+  public void testPushdownDisabled() throws Exception {
+    setColumnWidths(new int[] {25, 40, 40, 40});
+    final String sql = "SELECT\n"
+        + "  _id, name, categories, full_address\n"
+        + "FROM\n"
+        + "  table(hbase.`business`(type => 'maprdb', enablePushdown => false)) business\n"
+        + "WHERE\n"
+        + " name <> 'Sprint'"
+        ;
+    runSQLAndVerifyCount(sql, 9);
+
+    final String[] expectedPlan = {"condition=null", "columns=\\[`\\*`\\]"};
+    final String[] excludedPlan = {"condition=\\(name != \"Sprint\"\\)", "columns=\\[`name`, `_id`, `categories`, `full_address`\\]"};
+
+    PlanTestBase.testPlanMatchingPatterns(sql, expectedPlan, excludedPlan);
+  }
+
+  @Test
   public void testPushdownStringEqual() throws Exception {
     setColumnWidths(new int[] {25, 40, 40, 40});
     final String sql = "SELECT\n"
@@ -131,7 +158,7 @@ public class TestSimpleJson extends BaseJsonTest {
         ;
     runSQLAndVerifyCount(sql, 9);
 
-    final String[] expectedPlan = {"condition=\\(name != \"Sprint\"\\)"};
+    final String[] expectedPlan = {"condition=\\(name != \"Sprint\"\\)", "columns=\\[`name`, `_id`, `categories`, `full_address`\\]"};
     final String[] excludedPlan = {};
 
     PlanTestBase.testPlanMatchingPatterns(sql, expectedPlan, excludedPlan);
@@ -409,4 +436,5 @@ public class TestSimpleJson extends BaseJsonTest {
 
     PlanTestBase.testPlanMatchingPatterns(sql, expectedPlan, excludedPlan);
   }
+
 }