You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by ad...@apache.org on 2016/09/13 01:32:30 UTC
[43/50] [abbrv] drill git commit: MD-572: Column names in MapR-DB
JSON tables are case-sensitive
MD-572: Column names in MapR-DB JSON tables are case-sensitive
Add an `enablePushdown` format plugin option that controls pushdown of both filters and projections (it defaults to true; set it to false to disable pushdown).
This will allow Drill to handle both of these operators in a case-insensitive way.
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/156819d0
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/156819d0
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/156819d0
Branch: refs/heads/master
Commit: 156819d0470b95a2822c03a498ed5c5b872d2022
Parents: c74d75c
Author: Aditya <ad...@mapr.com>
Authored: Tue Mar 8 16:57:52 2016 -0800
Committer: Aditya Kishore <ad...@apache.org>
Committed: Fri Sep 9 10:08:39 2016 -0700
----------------------------------------------------------------------
.../store/mapr/db/MapRDBFormatPluginConfig.java | 16 +-
.../store/mapr/db/MapRDBPushFilterIntoScan.java | 4 +-
.../store/mapr/db/json/JsonTableGroupScan.java | 11 +-
.../mapr/db/json/MaprDBJsonRecordReader.java | 382 +++++++------------
.../drill/maprdb/tests/json/TestSimpleJson.java | 32 +-
5 files changed, 200 insertions(+), 245 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/156819d0/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBFormatPluginConfig.java
----------------------------------------------------------------------
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBFormatPluginConfig.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBFormatPluginConfig.java
index 82b360c..7295265 100644
--- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBFormatPluginConfig.java
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBFormatPluginConfig.java
@@ -27,8 +27,9 @@ import com.fasterxml.jackson.annotation.JsonTypeName;
@JsonTypeName("maprdb") @JsonInclude(Include.NON_DEFAULT)
public class MapRDBFormatPluginConfig extends TableFormatPluginConfig {
- private boolean allTextMode = false;
- private boolean readAllNumbersAsDouble = false;
+ public boolean allTextMode = false;
+ public boolean readAllNumbersAsDouble = false;
+ public boolean enablePushdown = true;
@Override
public int hashCode() {
@@ -42,6 +43,8 @@ public class MapRDBFormatPluginConfig extends TableFormatPluginConfig {
return false;
} else if (allTextMode != other.allTextMode) {
return false;
+ } else if (enablePushdown != other.enablePushdown) {
+ return false;
}
return true;
@@ -65,4 +68,13 @@ public class MapRDBFormatPluginConfig extends TableFormatPluginConfig {
readAllNumbersAsDouble = read;
}
+ public boolean isEnablePushdown() {
+ return enablePushdown;
+ }
+
+ @JsonProperty("enablePushdown")
+ public void setEnablePushdown(boolean enablePushdown) {
+ this.enablePushdown = enablePushdown;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/drill/blob/156819d0/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBPushFilterIntoScan.java
----------------------------------------------------------------------
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBPushFilterIntoScan.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBPushFilterIntoScan.java
index 7292182..6a286a8 100644
--- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBPushFilterIntoScan.java
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBPushFilterIntoScan.java
@@ -112,7 +112,8 @@ public abstract class MapRDBPushFilterIntoScan extends StoragePluginOptimizerRul
FilterPrel filter, final ProjectPrel project, ScanPrel scan,
JsonTableGroupScan groupScan, RexNode condition) {
- if (groupScan.isFilterPushedDown()) {
+ if (groupScan.isDisablePushdown() // Do not pushdown filter if it is disabled in plugin configuration
+ || groupScan.isFilterPushedDown()) { // see below
/*
* The rule can get triggered again due to the transformed "scan => filter" sequence
* created by the earlier execution of this rule when we could not do a complete
@@ -202,4 +203,5 @@ public abstract class MapRDBPushFilterIntoScan extends StoragePluginOptimizerRul
call.transformTo(filter.copy(filter.getTraitSet(), ImmutableList.of(childRel)));
}
}
+
}
http://git-wip-us.apache.org/repos/asf/drill/blob/156819d0/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/JsonTableGroupScan.java
----------------------------------------------------------------------
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/JsonTableGroupScan.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/JsonTableGroupScan.java
index 9e23af7..0c8ffda 100644
--- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/JsonTableGroupScan.java
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/JsonTableGroupScan.java
@@ -112,7 +112,7 @@ public class JsonTableGroupScan extends MapRDBGroupScan {
regionsToScan = new TreeMap<TabletFragmentInfo, String>();
for (TabletInfo tabletInfo : tabletInfos) {
TabletInfoImpl tabletInfoImpl = (TabletInfoImpl) tabletInfo;
- if (!foundStartRegion
+ if (!foundStartRegion
&& !isNullOrEmpty(scanSpec.getStartRow())
&& !tabletInfoImpl.containsRow(scanSpec.getStartRow())) {
continue;
@@ -171,6 +171,15 @@ public class JsonTableGroupScan extends MapRDBGroupScan {
return scanSpec.getTableName();
}
+ public boolean isDisablePushdown() {
+ return !formatPluginConfig.isEnablePushdown();
+ }
+
+ @JsonIgnore
+ public boolean canPushdownProjects(List<SchemaPath> columns) {
+ return formatPluginConfig.isEnablePushdown();
+ }
+
@Override
public String toString() {
return "JsonTableGroupScan [ScanSpec=" + scanSpec + ", columns=" + columns + "]";
http://git-wip-us.apache.org/repos/asf/drill/blob/156819d0/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/MaprDBJsonRecordReader.java
----------------------------------------------------------------------
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/MaprDBJsonRecordReader.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/MaprDBJsonRecordReader.java
index 7fbcd1b..cb86e32 100644
--- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/MaprDBJsonRecordReader.java
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/MaprDBJsonRecordReader.java
@@ -18,10 +18,8 @@
package org.apache.drill.exec.store.mapr.db.json;
import static org.ojai.DocumentConstants.ID_KEY;
-import io.netty.buffer.DrillBuf;
import java.nio.ByteBuffer;
-import java.nio.charset.Charset;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
@@ -42,11 +40,8 @@ import org.apache.drill.exec.store.AbstractRecordReader;
import org.apache.drill.exec.store.mapr.db.MapRDBFormatPluginConfig;
import org.apache.drill.exec.store.mapr.db.MapRDBSubScanSpec;
import org.apache.drill.exec.vector.BaseValueVector;
+import org.apache.drill.exec.vector.complex.impl.MapOrListWriterImpl;
import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter;
-import org.apache.drill.exec.vector.complex.writer.BaseWriter.ListWriter;
-import org.apache.drill.exec.vector.complex.writer.BaseWriter.MapWriter;
-import org.apache.drill.exec.vector.complex.writer.VarBinaryWriter;
-import org.apache.drill.exec.vector.complex.writer.VarCharWriter;
import org.ojai.DocumentReader;
import org.ojai.DocumentReader.EventType;
import org.ojai.DocumentStream;
@@ -54,7 +49,6 @@ import org.ojai.FieldPath;
import org.ojai.FieldSegment;
import org.ojai.Value;
import org.ojai.store.QueryCondition;
-import org.ojai.types.OTime;
import com.google.common.base.Preconditions;
import com.google.common.base.Stopwatch;
@@ -68,10 +62,13 @@ import com.mapr.db.ojai.DBDocumentReaderBase;
import com.mapr.db.util.ByteBufs;
import com.mapr.org.apache.hadoop.hbase.util.Bytes;
+import io.netty.buffer.DrillBuf;
+
public class MaprDBJsonRecordReader extends AbstractRecordReader {
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MaprDBJsonRecordReader.class);
public static final SchemaPath ID_PATH = SchemaPath.getSimplePath(ID_KEY);
+ private final long MILLISECONDS_IN_A_DAY = (long)1000 * 60 * 60 * 24;
private Table table;
private QueryCondition condition;
@@ -79,7 +76,7 @@ public class MaprDBJsonRecordReader extends AbstractRecordReader {
private final String tableName;
private OperatorContext operatorContext;
- private VectorContainerWriter writer;
+ private VectorContainerWriter vectorWriter;
private DrillBuf buffer;
@@ -91,8 +88,8 @@ public class MaprDBJsonRecordReader extends AbstractRecordReader {
private boolean idOnly;
private final boolean unionEnabled;
private final boolean readNumbersAsDouble;
+ private boolean disablePushdown;
private final boolean allTextMode;
- private final long MILLISECONDS_IN_A_DAY = (long)1000 * 60 * 60 * 24;
public MaprDBJsonRecordReader(MapRDBSubScanSpec subScanSpec,
MapRDBFormatPluginConfig formatPluginConfig,
@@ -114,12 +111,13 @@ public class MaprDBJsonRecordReader extends AbstractRecordReader {
unionEnabled = context.getOptions().getOption(ExecConstants.ENABLE_UNION_TYPE);
readNumbersAsDouble = formatPluginConfig.isReadAllNumbersAsDouble();
allTextMode = formatPluginConfig.isAllTextMode();
+ disablePushdown = !formatPluginConfig.isEnablePushdown();
}
@Override
protected Collection<SchemaPath> transformColumns(Collection<SchemaPath> columns) {
Set<SchemaPath> transformed = Sets.newLinkedHashSet();
- if (!isStarQuery()) {
+ if (!isStarQuery() && !disablePushdown) {
Set<FieldPath> projectedFieldsSet = Sets.newTreeSet();
for (SchemaPath column : columns) {
if (column.getRootSegment().getPath().equalsIgnoreCase(ID_KEY)) {
@@ -154,7 +152,7 @@ public class MaprDBJsonRecordReader extends AbstractRecordReader {
@Override
public void setup(OperatorContext context, OutputMutator output) throws ExecutionSetupException {
- this.writer = new VectorContainerWriter(output, unionEnabled);
+ this.vectorWriter = new VectorContainerWriter(output, unionEnabled);
this.operatorContext = context;
try {
@@ -172,8 +170,8 @@ public class MaprDBJsonRecordReader extends AbstractRecordReader {
Stopwatch watch = Stopwatch.createUnstarted();
watch.start();
- writer.allocate();
- writer.reset();
+ vectorWriter.allocate();
+ vectorWriter.reset();
int recordCount = 0;
DBDocumentReaderBase reader = null;
@@ -182,24 +180,18 @@ public class MaprDBJsonRecordReader extends AbstractRecordReader {
try {
reader = nextDocumentReader();
if (reader == null) break;
- writer.setPosition(recordCount);
+
+ vectorWriter.setPosition(recordCount);
+ MapOrListWriterImpl writer = new MapOrListWriterImpl(vectorWriter.rootAsMap());
if (idOnly) {
Value id = reader.getId();
- MapWriter map = writer.rootAsMap();
-
try {
switch(id.getType()) {
case STRING:
- writeString(map.varChar(ID_KEY), id.getString());
- recordCount++;
+ writeString(writer, ID_KEY, id.getString());
break;
case BINARY:
- if (allTextMode) {
- writeString(map.varChar(ID_KEY), new String(id.getBinary().array(), Charset.forName("UTF-8")));
- } else {
- writeBinary(map.varBinary(ID_KEY), id.getBinary());
- }
- recordCount++;
+ writeBinary(writer, ID_KEY, id.getBinary());
break;
default:
throw new UnsupportedOperationException(id.getType() +
@@ -213,9 +205,9 @@ public class MaprDBJsonRecordReader extends AbstractRecordReader {
if (reader.next() != EventType.START_MAP) {
throw dataReadError("The document did not start with START_MAP!");
}
- writeToMap(reader, writer.rootAsMap());
- recordCount++;
+ writeToListOrMap(writer, reader);
}
+ recordCount++;
} catch (UserException e) {
throw UserException.unsupportedError(e)
.addContext(String.format("Table: %s, document id: '%s'",
@@ -225,132 +217,74 @@ public class MaprDBJsonRecordReader extends AbstractRecordReader {
}
}
- writer.setValueCount(recordCount);
+ vectorWriter.setValueCount(recordCount);
logger.debug("Took {} ms to get {} records", watch.elapsed(TimeUnit.MILLISECONDS), recordCount);
return recordCount;
}
- private void writeToMap(DBDocumentReaderBase reader, MapWriter map) {
- map.start();
+ private void writeToListOrMap(MapOrListWriterImpl writer, DBDocumentReaderBase reader) {
+ String fieldName = null;
+ writer.start();
outside: while (true) {
EventType event = reader.next();
- if (event == null || event == EventType.END_MAP) break outside;
+ if (event == null
+ || event == EventType.END_MAP
+ || event == EventType.END_ARRAY) {
+ break outside;
+ } else if (reader.inMap()) {
+ fieldName = reader.getFieldName();
+ }
- String fieldName = reader.getFieldName();
try {
switch (event) {
case NULL:
break; // not setting the field will leave it as null
case BINARY:
- if (allTextMode) {
- writeString(map.varChar(fieldName), new String(reader.getBinary().array(), Charset.forName("UTF-8")));
- } else {
- writeBinary(map.varBinary(fieldName), reader.getBinary());
- }
+ writeBinary(writer, fieldName, reader.getBinary());
break;
case BOOLEAN:
- if (allTextMode) {
- writeString(map.varChar(fieldName), String.valueOf(reader.getBoolean()));
- } else {
- map.bit(fieldName).writeBit(reader.getBoolean() ? 1 : 0);
- }
+ writeBoolean(writer, fieldName, reader);
break;
case STRING:
- writeString(map.varChar(fieldName), reader.getString());
+ writeString(writer, fieldName, reader.getString());
break;
case BYTE:
- if (allTextMode) {
- writeString(map.varChar(fieldName), String.valueOf(reader.getByte()));
- } else if (readNumbersAsDouble) {
- map.float8(fieldName).writeFloat8(reader.getByte());
- } else {
- map.tinyInt(fieldName).writeTinyInt(reader.getByte());
- }
+ writeByte(writer, fieldName, reader);
break;
case SHORT:
- if (allTextMode) {
- writeString(map.varChar(fieldName), String.valueOf(reader.getShort()));
- } else if (readNumbersAsDouble) {
- map.float8(fieldName).writeFloat8(reader.getShort());
- } else {
- map.smallInt(fieldName).writeSmallInt(reader.getShort());
- }
+ writeShort(writer, fieldName, reader);
break;
case INT:
- if (allTextMode) {
- writeString(map.varChar(fieldName), String.valueOf(reader.getInt()));
- } else if (readNumbersAsDouble) {
- map.float8(fieldName).writeFloat8(reader.getInt());
- } else {
- map.integer(fieldName).writeInt(reader.getInt());
- }
+ writeInt(writer, fieldName, reader);
break;
case LONG:
- if (allTextMode) {
- writeString(map.varChar(fieldName), String.valueOf(reader.getLong()));
- } else if (readNumbersAsDouble) {
- map.float8(fieldName).writeFloat8(reader.getLong());
- } else {
- map.bigInt(fieldName).writeBigInt(reader.getLong());
- }
+ writeLong(writer, fieldName, reader);
break;
case FLOAT:
- if (allTextMode) {
- writeString(map.varChar(fieldName), String.valueOf(reader.getFloat()));
- } else if (readNumbersAsDouble) {
- map.float8(fieldName).writeFloat8(reader.getFloat());
- } else {
- map.float4(fieldName).writeFloat4(reader.getFloat());
- }
+ writeFloat(writer, fieldName, reader);
break;
case DOUBLE:
- if (allTextMode) {
- writeString(map.varChar(fieldName), String.valueOf(reader.getDouble()));
- } else {
- map.float8(fieldName).writeFloat8(reader.getDouble());
- }
+ writeDouble(writer, fieldName, reader);
break;
case DECIMAL:
throw unsupportedError("Decimal type is currently not supported.");
case DATE:
- if (allTextMode) {
- writeString(map.varChar(fieldName), reader.getDate().toString());
- } else {
-
- long milliSecondsSinceEpoch = reader.getDate().toDaysSinceEpoch() * MILLISECONDS_IN_A_DAY;
- map.date(fieldName).writeDate(milliSecondsSinceEpoch);
- }
+ writeDate(writer, fieldName, reader);
break;
case TIME:
- if (allTextMode) {
- writeString(map.varChar(fieldName), reader.getTime().toString());
- } else {
- OTime t = reader.getTime();
- int h = t.getHour();
- int m = t.getMinute();
- int s = t.getSecond();
- int ms = t.getMilliSecond();
- int millisOfDay = ms + (s + ((m + (h * 60)) * 60)) * 1000;
- map.time(fieldName).writeTime(millisOfDay);
- }
+ writeTime(writer, fieldName, reader);
break;
case TIMESTAMP:
- if (allTextMode) {
- writeString(map.varChar(fieldName), reader.getTimestamp().toString());
- } else {
- map.timeStamp(fieldName).writeTimeStamp(reader.getTimestampLong());
- }
+ writeTimeStamp(writer, fieldName, reader);
break;
case INTERVAL:
throw unsupportedError("Interval type is currently not supported.");
case START_MAP:
- writeToMap(reader, map.map(fieldName));
+ writeToListOrMap((MapOrListWriterImpl) (reader.inMap() ? writer.map(fieldName) : writer.listoftmap(fieldName)), reader);
break;
case START_ARRAY:
- writeToList(reader, map.list(fieldName));
+ writeToListOrMap((MapOrListWriterImpl) writer.list(fieldName), reader);
break;
- case END_ARRAY:
- throw dataReadError("Encountered an END_ARRAY event inside a map.");
default:
throw unsupportedError("Unsupported type: %s encountered during the query.", event);
}
@@ -359,145 +293,115 @@ public class MaprDBJsonRecordReader extends AbstractRecordReader {
IdCodec.asString(reader.getId()), fieldName), e);
}
}
- map.end();
+ writer.end();
}
- private void writeToList(DBDocumentReaderBase reader, ListWriter list) {
- list.startList();
- outside: while (true) {
- EventType event = reader.next();
- if (event == null || event == EventType.END_ARRAY) break outside;
-
- switch (event) {
- case NULL:
- throw unsupportedError("Null values are not supported in lists.");
- case BINARY:
- if (allTextMode) {
- writeString(list.varChar(), new String(reader.getBinary().array(), Charset.forName("UTF-8")));
- } else {
- writeBinary(list.varBinary(), reader.getBinary());
- }
- break;
- case BOOLEAN:
- if (allTextMode) {
- writeString(list.varChar(), String.valueOf(reader.getBoolean()));
- } else {
- list.bit().writeBit(reader.getBoolean() ? 1 : 0);
- }
- break;
- case STRING:
- writeString(list.varChar(), reader.getString());
- break;
- case BYTE:
- if (allTextMode) {
- writeString(list.varChar(), String.valueOf(reader.getByte()));
- } else if (readNumbersAsDouble) {
- list.float8().writeFloat8(reader.getByte());
- } else {
- list.tinyInt().writeTinyInt(reader.getByte());
- }
- break;
- case SHORT:
- if (allTextMode) {
- writeString(list.varChar(), String.valueOf(reader.getShort()));
- } else if (readNumbersAsDouble) {
- list.float8().writeFloat8(reader.getShort());
- } else {
- list.smallInt().writeSmallInt(reader.getShort());
- }
- break;
- case INT:
- if (allTextMode) {
- writeString(list.varChar(), String.valueOf(reader.getInt()));
- } else if (readNumbersAsDouble) {
- list.float8().writeFloat8(reader.getInt());
- } else {
- list.integer().writeInt(reader.getInt());
- }
- break;
- case LONG:
- if (allTextMode) {
- writeString(list.varChar(), String.valueOf(reader.getLong()));
- } else if (readNumbersAsDouble) {
- list.float8().writeFloat8(reader.getLong());
- } else {
- list.bigInt().writeBigInt(reader.getLong());
- }
- break;
- case FLOAT:
- if (allTextMode) {
- writeString(list.varChar(), String.valueOf(reader.getFloat()));
- } else if (readNumbersAsDouble) {
- list.float8().writeFloat8(reader.getFloat());
- } else {
- list.float4().writeFloat4(reader.getFloat());
- }
- break;
- case DOUBLE:
- if (allTextMode) {
- writeString(list.varChar(), String.valueOf(reader.getDouble()));
- } else {
- list.float8().writeFloat8(reader.getDouble());
- }
- break;
- case DECIMAL:
- throw unsupportedError("Decimals are currently not supported.");
- case DATE:
- if (allTextMode) {
- writeString(list.varChar(), reader.getDate().toString());
- } else {
- long milliSecondsSinceEpoch = reader.getDate().toDaysSinceEpoch() * MILLISECONDS_IN_A_DAY;
- list.date().writeDate(milliSecondsSinceEpoch);
- }
- break;
- case TIME:
- if (allTextMode) {
- writeString(list.varChar(), reader.getTime().toString());
- } else {
- OTime t = reader.getTime();
- int h = t.getHour();
- int m = t.getMinute();
- int s = t.getSecond();
- int ms = t.getMilliSecond();
- int millisOfDay = ms + (s + ((m + (h * 60)) * 60)) * 1000;
- list.time().writeTime(millisOfDay);
- }
- break;
- case TIMESTAMP:
- if (allTextMode) {
- writeString(list.varChar(), reader.getTimestamp().toString());
- } else {
- list.timeStamp().writeTimeStamp(reader.getTimestampLong());
- }
- break;
- case INTERVAL:
- throw unsupportedError("Interval is currently not supported.");
- case START_MAP:
- writeToMap(reader, list.map());
- break;
- case END_MAP:
- throw dataReadError("Encountered an END_MAP event inside a list.");
- case START_ARRAY:
- writeToList(reader, list.list());
- break;
- default:
- throw unsupportedError("Unsupported type: %s encountered during the query.%s", event);
- }
+ private void writeTimeStamp(MapOrListWriterImpl writer, String fieldName, DBDocumentReaderBase reader) {
+ if (allTextMode) {
+ writeString(writer, fieldName, reader.getTimestamp().toUTCString());
+ } else {
+ ((writer.map != null) ? writer.map.timeStamp(fieldName) : writer.list.timeStamp()).writeTimeStamp(reader.getTimestampLong());
+ }
+ }
+
+ private void writeTime(MapOrListWriterImpl writer, String fieldName, DBDocumentReaderBase reader) {
+ if (allTextMode) { // all-text mode: store the time's string form as a VARCHAR under fieldName
+ writeString(writer, fieldName, reader.getTime().toTimeStr());
+ } else {
+ ((writer.map != null) ? writer.map.time(fieldName) : writer.list.time()).writeTime(reader.getTimeInt());
+ }
+ }
+
+ private void writeDate(MapOrListWriterImpl writer, String fieldName, DBDocumentReaderBase reader) {
+ if (allTextMode) { // all-text mode: store the date's string form as a VARCHAR under fieldName
+ writeString(writer, fieldName, reader.getDate().toDateStr());
+ } else {
+ long milliSecondsSinceEpoch = reader.getDateInt() * MILLISECONDS_IN_A_DAY;
+ ((writer.map != null) ? writer.map.date(fieldName) : writer.list.date()).writeDate(milliSecondsSinceEpoch);
+ }
+ }
+
+ private void writeDouble(MapOrListWriterImpl writer, String fieldName, DBDocumentReaderBase reader) {
+ if (allTextMode) { // all-text mode: store the double's string form as a VARCHAR under fieldName
+ writeString(writer, fieldName, String.valueOf(reader.getDouble()));
+ } else {
+ writer.float8(fieldName).writeFloat8(reader.getDouble());
+ }
+ }
+
+ private void writeFloat(MapOrListWriterImpl writer, String fieldName, DBDocumentReaderBase reader) {
+ if (allTextMode) { // all-text mode: store the float's string form as a VARCHAR under fieldName
+ writeString(writer, fieldName, String.valueOf(reader.getFloat()));
+ } else if (readNumbersAsDouble) { // widen to FLOAT8 when all numbers are read as double
+ writer.float8(fieldName).writeFloat8(reader.getFloat());
+ } else {
+ writer.float4(fieldName).writeFloat4(reader.getFloat());
+ }
+ }
+
+ private void writeLong(MapOrListWriterImpl writer, String fieldName, DBDocumentReaderBase reader) {
+ if (allTextMode) { // all-text mode: store the long's string form as a VARCHAR under fieldName
+ writeString(writer, fieldName, String.valueOf(reader.getLong()));
+ } else if (readNumbersAsDouble) { // write as FLOAT8 when all numbers are read as double
+ writer.float8(fieldName).writeFloat8(reader.getLong());
+ } else {
+ writer.bigInt(fieldName).writeBigInt(reader.getLong());
+ }
+ }
+
+ private void writeInt(MapOrListWriterImpl writer, String fieldName, DBDocumentReaderBase reader) {
+ if (allTextMode) { // all-text mode: store the int's string form as a VARCHAR under fieldName
+ writeString(writer, fieldName, String.valueOf(reader.getInt()));
+ } else if (readNumbersAsDouble) { // write as FLOAT8 when all numbers are read as double
+ writer.float8(fieldName).writeFloat8(reader.getInt());
+ } else {
+ writer.integer(fieldName).writeInt(reader.getInt());
+ }
+ }
+
+ private void writeShort(MapOrListWriterImpl writer, String fieldName, DBDocumentReaderBase reader) {
+ if (allTextMode) { // all-text mode: store the short's string form as a VARCHAR under fieldName
+ writeString(writer, fieldName, String.valueOf(reader.getShort()));
+ } else if (readNumbersAsDouble) { // write as FLOAT8 when all numbers are read as double
+ writer.float8(fieldName).writeFloat8(reader.getShort());
+ } else {
+ ((writer.map != null) ? writer.map.smallInt(fieldName) : writer.list.smallInt()).writeSmallInt(reader.getShort());
}
- list.endList();
}
- private void writeBinary(VarBinaryWriter binaryWriter, ByteBuffer buf) {
- buffer = buffer.reallocIfNeeded(buf.remaining());
- buffer.setBytes(0, buf, buf.position(), buf.remaining());
- binaryWriter.writeVarBinary(0, buf.remaining(), buffer);
+ private void writeByte(MapOrListWriterImpl writer, String fieldName, DBDocumentReaderBase reader) {
+ if (allTextMode) { // all-text mode: store the byte's string form as a VARCHAR under fieldName
+ writeString(writer, fieldName, String.valueOf(reader.getByte()));
+ } else if (readNumbersAsDouble) { // write as FLOAT8 when all numbers are read as double
+ writer.float8(fieldName).writeFloat8(reader.getByte());
+ } else {
+ ((writer.map != null) ? writer.map.tinyInt(fieldName) : writer.list.tinyInt()).writeTinyInt(reader.getByte());
+ }
+ }
+
+ private void writeBoolean(MapOrListWriterImpl writer, String fieldName, DBDocumentReaderBase reader) {
+ if (allTextMode) { // all-text mode: store "true"/"false" as a VARCHAR under fieldName
+ writeString(writer, fieldName, String.valueOf(reader.getBoolean()));
+ } else {
+ writer.bit(fieldName).writeBit(reader.getBoolean() ? 1 : 0);
+ }
+ }
+
+ private void writeBinary(MapOrListWriterImpl writer, String fieldName, ByteBuffer buf) {
+ if (allTextMode) {
+ writeString(writer, fieldName, Bytes.toString(buf));
+ } else {
+ buffer = buffer.reallocIfNeeded(buf.remaining());
+ buffer.setBytes(0, buf, buf.position(), buf.remaining());
+ writer.binary(fieldName).writeVarBinary(0, buf.remaining(), buffer);
+ }
}
- private void writeString(VarCharWriter varCharWriter, String string) {
- final byte[] strBytes = Bytes.toBytes(string);
+ private void writeString(MapOrListWriterImpl writer, String fieldName, String value) {
+ final byte[] strBytes = Bytes.toBytes(value);
buffer = buffer.reallocIfNeeded(strBytes.length);
buffer.setBytes(0, strBytes);
- varCharWriter.writeVarChar(0, strBytes.length, buffer);
+ writer.varChar(fieldName).writeVarChar(0, strBytes.length, buffer);
}
private UserException unsupportedError(String format, Object... args) {
http://git-wip-us.apache.org/repos/asf/drill/blob/156819d0/contrib/format-maprdb/src/test/java/com/mapr/drill/maprdb/tests/json/TestSimpleJson.java
----------------------------------------------------------------------
diff --git a/contrib/format-maprdb/src/test/java/com/mapr/drill/maprdb/tests/json/TestSimpleJson.java b/contrib/format-maprdb/src/test/java/com/mapr/drill/maprdb/tests/json/TestSimpleJson.java
index 225fb2f..2bf2c31 100644
--- a/contrib/format-maprdb/src/test/java/com/mapr/drill/maprdb/tests/json/TestSimpleJson.java
+++ b/contrib/format-maprdb/src/test/java/com/mapr/drill/maprdb/tests/json/TestSimpleJson.java
@@ -39,7 +39,16 @@ import com.mapr.tests.annotations.ClusterTest;
public class TestSimpleJson extends BaseJsonTest {
@Test
- public void testMe() throws Exception {
+ public void testSelectStar() throws Exception {
+ final String sql = "SELECT\n"
+ + " *\n"
+ + "FROM\n"
+ + " hbase.`business` business";
+ runSQLAndVerifyCount(sql, 10);
+ }
+
+ @Test
+ public void testSelectId() throws Exception {
setColumnWidths(new int[] {23});
final String sql = "SELECT\n"
+ " _id\n"
@@ -58,6 +67,24 @@ public class TestSimpleJson extends BaseJsonTest {
}
@Test
+ public void testPushdownDisabled() throws Exception {
+ setColumnWidths(new int[] {25, 40, 40, 40});
+ final String sql = "SELECT\n"
+ + " _id, name, categories, full_address\n"
+ + "FROM\n"
+ + " table(hbase.`business`(type => 'maprdb', enablePushdown => false)) business\n"
+ + "WHERE\n"
+ + " name <> 'Sprint'"
+ ;
+ runSQLAndVerifyCount(sql, 9);
+
+ final String[] expectedPlan = {"condition=null", "columns=\\[`\\*`\\]"};
+ final String[] excludedPlan = {"condition=\\(name != \"Sprint\"\\)", "columns=\\[`name`, `_id`, `categories`, `full_address`\\]"};
+
+ PlanTestBase.testPlanMatchingPatterns(sql, expectedPlan, excludedPlan);
+ }
+
+ @Test
public void testPushdownStringEqual() throws Exception {
setColumnWidths(new int[] {25, 40, 40, 40});
final String sql = "SELECT\n"
@@ -131,7 +158,7 @@ public class TestSimpleJson extends BaseJsonTest {
;
runSQLAndVerifyCount(sql, 9);
- final String[] expectedPlan = {"condition=\\(name != \"Sprint\"\\)"};
+ final String[] expectedPlan = {"condition=\\(name != \"Sprint\"\\)", "columns=\\[`name`, `_id`, `categories`, `full_address`\\]"};
final String[] excludedPlan = {};
PlanTestBase.testPlanMatchingPatterns(sql, expectedPlan, excludedPlan);
@@ -409,4 +436,5 @@ public class TestSimpleJson extends BaseJsonTest {
PlanTestBase.testPlanMatchingPatterns(sql, expectedPlan, excludedPlan);
}
+
}