You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by ad...@apache.org on 2016/09/13 01:32:12 UTC
[25/50] [abbrv] drill git commit: MD-670: Querying MapR-DB JSON
Tables returns no results
MD-670: Querying MapR-DB JSON Tables returns no results
* Use DocumentReader API to emit "_id" field instead of handling it as a special case.
* Update the DrillBuf reference field when reallocation happen.
* Catch the correct exception when schema change happens and include the field name in the warning message.
+ Get rid of unused code.
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/869cfbdf
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/869cfbdf
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/869cfbdf
Branch: refs/heads/master
Commit: 869cfbdf0599c708db62fb7a4d308e22ccbe2fb7
Parents: 004aad9
Author: Aditya <ad...@mapr.com>
Authored: Mon Feb 1 17:43:42 2016 -0800
Committer: Aditya Kishore <ad...@apache.org>
Committed: Fri Sep 9 10:08:34 2016 -0700
----------------------------------------------------------------------
.../maprdb/json/MaprDBJsonRecordReader.java | 94 +++++++-------------
.../drill/maprdb/tests/MaprDBTestsSuite.java | 5 ++
.../drill/maprdb/tests/json/TestSimpleJson.java | 62 ++++++-------
3 files changed, 66 insertions(+), 95 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/869cfbdf/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/json/MaprDBJsonRecordReader.java
----------------------------------------------------------------------
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/json/MaprDBJsonRecordReader.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/json/MaprDBJsonRecordReader.java
index fd8bf93..b5e4ceb 100644
--- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/json/MaprDBJsonRecordReader.java
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/json/MaprDBJsonRecordReader.java
@@ -37,7 +37,6 @@ import org.apache.drill.exec.ops.OperatorStats;
import org.apache.drill.exec.physical.impl.OutputMutator;
import org.apache.drill.exec.store.AbstractRecordReader;
import org.apache.drill.exec.store.maprdb.MapRDBSubScanSpec;
-import org.apache.drill.exec.store.maprdb.util.CommonFns;
import org.apache.drill.exec.vector.BaseValueVector;
import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter;
import org.apache.drill.exec.vector.complex.writer.BaseWriter.ListWriter;
@@ -49,9 +48,7 @@ import org.ojai.DocumentReader;
import org.ojai.DocumentReader.EventType;
import org.ojai.DocumentStream;
import org.ojai.FieldPath;
-import org.ojai.Value;
import org.ojai.store.QueryCondition;
-import org.ojai.store.QueryCondition.Op;
import com.google.common.base.Preconditions;
import com.google.common.base.Stopwatch;
@@ -86,9 +83,11 @@ public class MaprDBJsonRecordReader extends AbstractRecordReader {
private DocumentStream<Document> documentStream;
private Iterator<DocumentReader> documentReaderIterators;
-
+
private boolean includeId;
+ private String currentFieldName;
+
public MaprDBJsonRecordReader(MapRDBSubScanSpec subScanSpec,
List<SchemaPath> projectedColumns, FragmentContext context) {
buffer = context.getManagedBuffer();
@@ -98,22 +97,6 @@ public class MaprDBJsonRecordReader extends AbstractRecordReader {
setColumns(projectedColumns);
}
- private void addKeyCondition(QueryCondition condition, Op op, byte[] key) {
- if (!CommonFns.isNullOrEmpty(key)) {
- Value value = IdCodec.decode(key);
- switch (value.getType()) {
- case STRING:
- condition.is(ID_FIELD, op, value.getString());
- return;
- case BINARY:
- condition.is(ID_FIELD, op, value.getBinary());
- return;
- default:
- throw new UnsupportedOperationException("");
- }
- }
- }
-
@Override
protected Collection<SchemaPath> transformColumns(Collection<SchemaPath> columns) {
Set<SchemaPath> transformed = Sets.newLinkedHashSet();
@@ -145,7 +128,7 @@ public class MaprDBJsonRecordReader extends AbstractRecordReader {
try {
table = MapRDB.getTable(tableName);
- table.setOption(TableOption.EXCLUDEID, true);
+ table.setOption(TableOption.EXCLUDEID, !includeId);
documentStream = table.find(condition, projectedFields);
documentReaderIterators = documentStream.documentReaders().iterator();
} catch (DBException e) {
@@ -171,25 +154,11 @@ public class MaprDBJsonRecordReader extends AbstractRecordReader {
throw new IllegalStateException("The document did not start with START_MAP!");
}
try {
- MapWriter map = writer.rootAsMap();
- if (includeId && reader.getId() != null) {
- switch (reader.getId().getType()) {
- case BINARY:
- writeBinary(map.varBinary(ID_KEY), reader.getId().getBinary());
- break;
- case STRING:
- writeString(map.varChar(ID_KEY), reader.getId().getString());
- break;
- default:
- throw new UnsupportedOperationException(reader.getId().getType() +
- " is not a supported type for _id field.");
- }
- }
- writeToMap(reader, map);
+ writeToMap(reader, writer.rootAsMap());
recordCount++;
- } catch (IllegalStateException e) {
- logger.warn(String.format("Possible schema change at _id: %s",
- IdCodec.asString(reader.getId())), e);
+ } catch (IllegalArgumentException e) {
+ logger.warn(String.format("Possible schema change at _id: '%s', field: '%s'",
+ IdCodec.asString(reader.getId()), currentFieldName), e);
}
}
@@ -199,62 +168,60 @@ public class MaprDBJsonRecordReader extends AbstractRecordReader {
}
private void writeToMap(DBDocumentReaderBase reader, MapWriter map) {
- String fieldName = null;
map.start();
outside: while (true) {
EventType event = reader.next();
- if (event == null) break outside;
- fieldName = reader.getFieldName();
+ if (event == null || event == EventType.END_MAP) break outside;
+
+ currentFieldName = reader.getFieldName();
switch (event) {
case NULL:
- map.varChar(fieldName).write(null); // treat as VARCHAR for now
+ map.varChar(currentFieldName).write(null); // treat as VARCHAR for now
case BINARY:
- writeBinary(map.varBinary(fieldName), reader.getBinary());
+ writeBinary(map.varBinary(currentFieldName), reader.getBinary());
break;
case BOOLEAN:
- map.bit(fieldName).writeBit(reader.getBoolean() ? 1 : 0);
+ map.bit(currentFieldName).writeBit(reader.getBoolean() ? 1 : 0);
break;
case STRING:
- writeString(map.varChar(fieldName), reader.getString());
+ writeString(map.varChar(currentFieldName), reader.getString());
break;
case BYTE:
- map.tinyInt(fieldName).writeTinyInt(reader.getByte());
+ map.tinyInt(currentFieldName).writeTinyInt(reader.getByte());
break;
case SHORT:
- map.smallInt(fieldName).writeSmallInt(reader.getShort());
+ map.smallInt(currentFieldName).writeSmallInt(reader.getShort());
break;
case INT:
- map.integer(fieldName).writeInt(reader.getInt());
+ map.integer(currentFieldName).writeInt(reader.getInt());
break;
case LONG:
- map.bigInt(fieldName).writeBigInt(reader.getLong());
+ map.bigInt(currentFieldName).writeBigInt(reader.getLong());
break;
case FLOAT:
- map.float4(fieldName).writeFloat4(reader.getFloat());
+ map.float4(currentFieldName).writeFloat4(reader.getFloat());
break;
case DOUBLE:
- map.float8(fieldName).writeFloat8(reader.getDouble());
+ map.float8(currentFieldName).writeFloat8(reader.getDouble());
break;
case DECIMAL:
throw new UnsupportedOperationException("Decimals are currently not supported.");
case DATE:
- map.date(fieldName).writeDate(reader.getDate().toDate().getTime());
+ map.date(currentFieldName).writeDate(reader.getDate().toDate().getTime());
break;
case TIME:
- map.time(fieldName).writeTime(reader.getTimeInt());
+ map.time(currentFieldName).writeTime(reader.getTimeInt());
break;
case TIMESTAMP:
- map.timeStamp(fieldName).writeTimeStamp(reader.getTimestampLong());
+ map.timeStamp(currentFieldName).writeTimeStamp(reader.getTimestampLong());
break;
case INTERVAL:
throw new UnsupportedOperationException("Interval is currently not supported.");
case START_MAP:
- writeToMap(reader, map.map(fieldName));
+ writeToMap(reader, map.map(currentFieldName));
break;
- case END_MAP:
- break outside;
case START_ARRAY:
- writeToList(reader, map.list(fieldName));
+ writeToList(reader, map.list(currentFieldName));
break;
case END_ARRAY:
throw new IllegalStateException("Shouldn't get a END_ARRAY inside a map");
@@ -269,7 +236,8 @@ public class MaprDBJsonRecordReader extends AbstractRecordReader {
list.startList();
outside: while (true) {
EventType event = reader.next();
- if (event == null) break outside;
+ if (event == null || event == EventType.END_ARRAY) break outside;
+
switch (event) {
case NULL:
list.varChar().write(null); // treat as VARCHAR for now
@@ -321,8 +289,6 @@ public class MaprDBJsonRecordReader extends AbstractRecordReader {
case START_ARRAY:
writeToList(reader, list.list());
break;
- case END_ARRAY:
- break outside;
default:
throw new UnsupportedOperationException("Unsupported type: " + event);
}
@@ -331,14 +297,14 @@ public class MaprDBJsonRecordReader extends AbstractRecordReader {
}
private void writeBinary(VarBinaryWriter binaryWriter, ByteBuffer buf) {
- buffer.reallocIfNeeded(buf.remaining());
+ buffer = buffer.reallocIfNeeded(buf.remaining());
buffer.setBytes(0, buf, buf.position(), buf.remaining());
binaryWriter.writeVarBinary(0, buf.remaining(), buffer);
}
private void writeString(VarCharWriter varCharWriter, String string) {
final byte[] strBytes = Bytes.toBytes(string);
- buffer.reallocIfNeeded(strBytes.length);
+ buffer = buffer.reallocIfNeeded(strBytes.length);
buffer.setBytes(0, strBytes);
varCharWriter.writeVarChar(0, strBytes.length, buffer);
}
http://git-wip-us.apache.org/repos/asf/drill/blob/869cfbdf/contrib/format-maprdb/src/test/java/com/mapr/drill/maprdb/tests/MaprDBTestsSuite.java
----------------------------------------------------------------------
diff --git a/contrib/format-maprdb/src/test/java/com/mapr/drill/maprdb/tests/MaprDBTestsSuite.java b/contrib/format-maprdb/src/test/java/com/mapr/drill/maprdb/tests/MaprDBTestsSuite.java
index 0f54796..e81aa09 100644
--- a/contrib/format-maprdb/src/test/java/com/mapr/drill/maprdb/tests/MaprDBTestsSuite.java
+++ b/contrib/format-maprdb/src/test/java/com/mapr/drill/maprdb/tests/MaprDBTestsSuite.java
@@ -108,6 +108,11 @@ public class MaprDBTestsSuite {
" \"location\": \"/tmp\"," +
" \"writable\": false," +
" \"defaultInputFormat\": \"maprdb\"" +
+ " }," +
+ " \"root\": {" +
+ " \"location\": \"/\"," +
+ " \"writable\": false," +
+ " \"defaultInputFormat\": \"maprdb\"" +
" }" +
" }," +
" \"formats\": {" +
http://git-wip-us.apache.org/repos/asf/drill/blob/869cfbdf/contrib/format-maprdb/src/test/java/com/mapr/drill/maprdb/tests/json/TestSimpleJson.java
----------------------------------------------------------------------
diff --git a/contrib/format-maprdb/src/test/java/com/mapr/drill/maprdb/tests/json/TestSimpleJson.java b/contrib/format-maprdb/src/test/java/com/mapr/drill/maprdb/tests/json/TestSimpleJson.java
index f4c7e89..f05b87a 100644
--- a/contrib/format-maprdb/src/test/java/com/mapr/drill/maprdb/tests/json/TestSimpleJson.java
+++ b/contrib/format-maprdb/src/test/java/com/mapr/drill/maprdb/tests/json/TestSimpleJson.java
@@ -67,10 +67,10 @@ public class TestSimpleJson extends BaseTestQuery {
+ " name = 'Sprint'"
;
runSQLAndVerifyCount(sql, 1);
-
+
final String[] expectedPlan = {"condition=\\(name = \"Sprint\"\\)"};
final String[] excludedPlan = {};
-
+
PlanTestBase.testPlanMatchingPatterns(sql, expectedPlan, excludedPlan);
}
@@ -85,13 +85,13 @@ public class TestSimpleJson extends BaseTestQuery {
+ " name LIKE 'S%'"
;
runSQLAndVerifyCount(sql, 3);
-
+
final String[] expectedPlan = {"condition=\\(name MATCHES \"\\^\\\\\\\\QS\\\\\\\\E\\.\\*\\$\"\\)"};
final String[] excludedPlan = {};
-
+
PlanTestBase.testPlanMatchingPatterns(sql, expectedPlan, excludedPlan);
}
-
+
@Test
public void testPushdownStringNotEqual() throws Exception {
setColumnWidths(new int[] {25, 40, 40, 40});
@@ -103,10 +103,10 @@ public class TestSimpleJson extends BaseTestQuery {
+ " name <> 'Sprint'"
;
runSQLAndVerifyCount(sql, 9);
-
+
final String[] expectedPlan = {"condition=\\(name != \"Sprint\"\\)"};
final String[] excludedPlan = {};
-
+
PlanTestBase.testPlanMatchingPatterns(sql, expectedPlan, excludedPlan);
}
@@ -121,10 +121,10 @@ public class TestSimpleJson extends BaseTestQuery {
+ " zip = 85260"
;
runSQLAndVerifyCount(sql, 1);
-
+
final String[] expectedPlan = {"condition=\\(zip = \\{\"\\$numberLong\":85260\\}\\)"};
final String[] excludedPlan = {};
-
+
PlanTestBase.testPlanMatchingPatterns(sql, expectedPlan, excludedPlan);
}
@@ -141,10 +141,10 @@ public class TestSimpleJson extends BaseTestQuery {
+ " city = 'Las Vegas'"
;
runSQLAndVerifyCount(sql, 4);
-
+
final String[] expectedPlan = {"condition=\\(\\(zip = \\{\"\\$numberLong\":85260\\}\\) or \\(city = \"Las Vegas\"\\)\\)"};
final String[] excludedPlan = {};
-
+
PlanTestBase.testPlanMatchingPatterns(sql, expectedPlan, excludedPlan);
}
@@ -159,10 +159,10 @@ public class TestSimpleJson extends BaseTestQuery {
+ " _id = 'jFTZmywe7StuZ2hEjxyA'"
;
runSQLAndVerifyCount(sql, 1);
-
+
final String[] expectedPlan = {"condition=\\(_id = \"jFTZmywe7StuZ2hEjxyA\"\\)"};
final String[] excludedPlan ={};
-
+
PlanTestBase.testPlanMatchingPatterns(sql, expectedPlan, excludedPlan);
}
@@ -179,10 +179,10 @@ public class TestSimpleJson extends BaseTestQuery {
+ " name = 'Subway'"
;
runSQLAndVerifyCount(sql, 1);
-
+
final String[] expectedPlan = {"condition=\\(\\(_id = \"jFTZmywe7StuZ2hEjxyA\"\\) and \\(name = \"Subway\"\\)\\)"};
final String[] excludedPlan ={};
-
+
PlanTestBase.testPlanMatchingPatterns(sql, expectedPlan, excludedPlan);
}
@@ -197,10 +197,10 @@ public class TestSimpleJson extends BaseTestQuery {
+ " b.`attributes.Ambience.casual` = false"
;
runSQLAndVerifyCount(sql, 1);
-
+
final String[] expectedPlan = {"condition=\\(attributes.Ambience.casual = false\\)"};
final String[] excludedPlan ={};
-
+
PlanTestBase.testPlanMatchingPatterns(sql, expectedPlan, excludedPlan);
}
@@ -215,10 +215,10 @@ public class TestSimpleJson extends BaseTestQuery {
+ " b.`attributes.Attire` = 'casual'"
;
runSQLAndVerifyCount(sql, 4);
-
+
final String[] expectedPlan = {"condition=\\(attributes.Attire = \"casual\"\\)"};
final String[] excludedPlan ={};
-
+
PlanTestBase.testPlanMatchingPatterns(sql, expectedPlan, excludedPlan);
}
@Test
@@ -233,10 +233,10 @@ public class TestSimpleJson extends BaseTestQuery {
+ " business.`attributes.Ambience.casual` IS NULL"
;
runSQLAndVerifyCount(sql, 7);
-
+
final String[] expectedPlan = {"condition=\\(attributes.Ambience.casual = null\\)"};
final String[] excludedPlan ={};
-
+
PlanTestBase.testPlanMatchingPatterns(sql, expectedPlan, excludedPlan);
}
@@ -252,10 +252,10 @@ public class TestSimpleJson extends BaseTestQuery {
+ " b.`attributes.Ambience.casual` IS NOT NULL"
;
runSQLAndVerifyCount(sql, 3);
-
+
final String[] expectedPlan = {"condition=\\(attributes.Ambience.casual != null\\)"};
final String[] excludedPlan ={};
-
+
PlanTestBase.testPlanMatchingPatterns(sql, expectedPlan, excludedPlan);
}
@@ -270,10 +270,10 @@ public class TestSimpleJson extends BaseTestQuery {
+ " b.`attributes.Accepts Credit Cards` IS NULL"
;
runSQLAndVerifyCount(sql, 3);
-
+
final String[] expectedPlan = {"condition=\\(attributes.Accepts Credit Cards = null\\)"};
final String[] excludedPlan ={};
-
+
PlanTestBase.testPlanMatchingPatterns(sql, expectedPlan, excludedPlan);
}
@@ -287,10 +287,10 @@ public class TestSimpleJson extends BaseTestQuery {
+ " stars > 4.0"
;
runSQLAndVerifyCount(sql, 2);
-
+
final String[] expectedPlan = {"condition=\\(stars > 4\\)"};
final String[] excludedPlan ={};
-
+
PlanTestBase.testPlanMatchingPatterns(sql, expectedPlan, excludedPlan);
}
@@ -305,13 +305,13 @@ public class TestSimpleJson extends BaseTestQuery {
+ " stars > 4.1"
;
runSQLAndVerifyCount(sql, 1);
-
+
final String[] expectedPlan = {"condition=\\(\\(attributes.Good For.lunch = true\\) and \\(stars > 4.1\\)\\)"};
final String[] excludedPlan ={};
-
+
PlanTestBase.testPlanMatchingPatterns(sql, expectedPlan, excludedPlan);
}
-
+
/*
@Test
public void testPushDownSubField5() throws Exception {
@@ -363,7 +363,7 @@ public class TestSimpleJson extends BaseTestQuery {
runSQLAndVerifyCount(sql, 1);
}
*/
-
+
protected List<QueryDataBatch> runHBaseSQLlWithResults(String sql) throws Exception {
System.out.println("Running query:\n" + sql);
return testSqlWithResults(sql);