You are viewing a plain text version of this content; the canonical (HTML) version is available in the dev@drill.apache.org mail archive.
Posted to dev@drill.apache.org by ja...@apache.org on 2013/08/16 03:44:40 UTC
[02/27] git commit: Repeated values in JSONRecordReader
Repeated values in JSONRecordReader
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/38ab96f3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/38ab96f3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/38ab96f3
Branch: refs/heads/master
Commit: 38ab96f335537c6bbdb6a4a64b1c6e13755172f6
Parents: 73fad99
Author: Timothy Chen <tn...@gmail.com>
Authored: Sat Aug 3 15:29:35 2013 -0700
Committer: Timothy Chen <tn...@gmail.com>
Committed: Thu Aug 8 23:36:41 2013 -0700
----------------------------------------------------------------------
.../drill/common/expression/SchemaPath.java | 2 +-
.../templates/RepeatedValueVectors.java | 10 +-
.../exec/schema/json/jackson/JacksonHelper.java | 116 +++++++++++--------
.../drill/exec/store/JSONRecordReader.java | 86 ++++++++------
.../apache/drill/exec/store/VectorHolder.java | 20 ++--
.../drill/exec/vector/AllocationHelper.java | 2 +-
.../drill/exec/vector/RepeatedMutator.java | 23 ----
.../physical/impl/TestSimpleFragmentRun.java | 81 ++++++++++++-
.../drill/exec/store/JSONRecordReaderTest.java | 56 ++++++++-
.../src/test/resources/scan_json_test_4.json | 23 ++--
10 files changed, 279 insertions(+), 140 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/38ab96f3/sandbox/prototype/common/src/main/java/org/apache/drill/common/expression/SchemaPath.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/common/src/main/java/org/apache/drill/common/expression/SchemaPath.java b/sandbox/prototype/common/src/main/java/org/apache/drill/common/expression/SchemaPath.java
index 19d1069..6f1a733 100644
--- a/sandbox/prototype/common/src/main/java/org/apache/drill/common/expression/SchemaPath.java
+++ b/sandbox/prototype/common/src/main/java/org/apache/drill/common/expression/SchemaPath.java
@@ -42,7 +42,7 @@ public class SchemaPath extends LogicalExpressionBase {
")*$";
// reads well in RegexBuddy
- private static final String SEGMENT_REGEX = "(?:\n" + "(\\[\\d+\\])\n" + "|\n" + "'?\n"
+ private static final String SEGMENT_REGEX = "(?:\n" + "\\[(\\d+)\\]\n" + "|\n" + "'?\n"
+ "([^\\.\\[\\+\\-\\!\\]\\}]+) # identifier\n" + "'?\n" + ")\n"
+ "([\\+\\-\\!\\]\\}]?) # collision type";
private static final int GROUP_INDEX = 1;
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/38ab96f3/sandbox/prototype/exec/java-exec/src/main/codegen/ValueVectors/templates/RepeatedValueVectors.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/codegen/ValueVectors/templates/RepeatedValueVectors.java b/sandbox/prototype/exec/java-exec/src/main/codegen/ValueVectors/templates/RepeatedValueVectors.java
index c629a1d..f4a7049 100644
--- a/sandbox/prototype/exec/java-exec/src/main/codegen/ValueVectors/templates/RepeatedValueVectors.java
+++ b/sandbox/prototype/exec/java-exec/src/main/codegen/ValueVectors/templates/RepeatedValueVectors.java
@@ -1,4 +1,4 @@
-
+import org.apache.drill.exec.vector.ValueVector;
<@pp.dropOutputFile />
<#list types as type>
@@ -249,7 +249,7 @@ import com.google.common.collect.Lists;
}
}
- public final class Mutator implements RepeatedMutator {
+ public final class Mutator implements ValueVector.Mutator {
private Mutator(){
@@ -264,12 +264,18 @@ import com.google.common.collect.Lists;
*/
public void add(int index, <#if type.major == "VarLen">byte[]<#elseif (type.width < 4)>int<#else>${minor.javaType!type.javaType}</#if> value) {
int nextOffset = offsets.getAccessor().get(index+1);
+ if (index > 0 && nextOffset == 0) {
+ nextOffset = offsets.getAccessor().get(index);
+ }
values.getMutator().set(nextOffset, value);
offsets.getMutator().set(index+1, nextOffset+1);
}
public void add(int index, ${minor.class}Holder holder){
int nextOffset = offsets.getAccessor().get(index+1);
+ if (index > 0 && nextOffset == 0) {
+ nextOffset = offsets.getAccessor().get(index);
+ }
values.getMutator().set(nextOffset, holder);
offsets.getMutator().set(index+1, nextOffset+1);
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/38ab96f3/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/json/jackson/JacksonHelper.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/json/jackson/JacksonHelper.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/json/jackson/JacksonHelper.java
index 0e2c052..d8f0646 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/json/jackson/JacksonHelper.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/json/jackson/JacksonHelper.java
@@ -18,63 +18,83 @@
package org.apache.drill.exec.schema.json.jackson;
-import java.io.IOException;
-
-import org.apache.drill.common.types.Types;
-import org.apache.drill.common.types.TypeProtos.DataMode;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonToken;
import org.apache.drill.common.types.TypeProtos.MajorType;
import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.common.types.Types;
-import com.fasterxml.jackson.core.JsonParser;
-import com.fasterxml.jackson.core.JsonToken;
+import java.io.IOException;
public class JacksonHelper {
- public static final MajorType STRING_TYPE = Types.optional(MinorType.VARCHAR);
- public static final MajorType BOOLEAN_TYPE = Types.optional(MinorType.BIT);
- public static final MajorType ARRAY_TYPE = Types.repeated(MinorType.LATE);
- public static final MajorType MAP_TYPE = Types.repeated(MinorType.MAP);
- public static final MajorType INT_TYPE = Types.optional(MinorType.INT);
- public static final MajorType FLOAT_TYPE = Types.optional(MinorType.FLOAT4);
- public static final MajorType NULL_TYPE = Types.optional(MinorType.LATE);
+ public static final MajorType STRING_TYPE = Types.optional(MinorType.VARCHAR);
+ public static final MajorType REPEATED_STRING_TYPE = Types.repeated(MinorType.VARCHAR);
+ public static final MajorType BOOLEAN_TYPE = Types.optional(MinorType.BIT);
+ public static final MajorType REPEATED_BOOLEAN_TYPE = Types.repeated(MinorType.BIT);
+ public static final MajorType ARRAY_TYPE = Types.repeated(MinorType.LATE);
+ public static final MajorType MAP_TYPE = Types.repeated(MinorType.MAP);
+ public static final MajorType INT_TYPE = Types.optional(MinorType.INT);
+ public static final MajorType REPEATED_INT_TYPE = Types.repeated(MinorType.INT);
+ public static final MajorType FLOAT_TYPE = Types.optional(MinorType.FLOAT4);
+ public static final MajorType REPEATED_FLOAT_TYPE = Types.repeated(MinorType.FLOAT4);
+ public static final MajorType NULL_TYPE = Types.optional(MinorType.LATE);
- public static MajorType getFieldType(JsonToken token) {
- switch(token) {
- case VALUE_STRING:
- return STRING_TYPE;
- case VALUE_FALSE:
- return BOOLEAN_TYPE;
- case VALUE_TRUE:
- return BOOLEAN_TYPE;
- case START_ARRAY:
- return ARRAY_TYPE;
- case START_OBJECT:
- return MAP_TYPE;
- case VALUE_NUMBER_INT:
- return INT_TYPE;
- case VALUE_NUMBER_FLOAT:
- return FLOAT_TYPE;
- case VALUE_NULL:
- return NULL_TYPE;
- }
+ public static MajorType getFieldType(JsonToken token, boolean repeated) {
+ if (repeated) {
+ switch (token) {
+ case VALUE_STRING:
+ return REPEATED_STRING_TYPE;
+ case VALUE_FALSE:
+ case VALUE_TRUE:
+ return REPEATED_BOOLEAN_TYPE;
+ case START_ARRAY:
+ return ARRAY_TYPE;
+ case START_OBJECT:
+ return MAP_TYPE;
+ case VALUE_NUMBER_INT:
+ return REPEATED_INT_TYPE;
+ case VALUE_NUMBER_FLOAT:
+ return REPEATED_FLOAT_TYPE;
+ }
+ } else {
- throw new UnsupportedOperationException("Unsupported Jackson type: " + token);
+ switch (token) {
+ case VALUE_STRING:
+ return STRING_TYPE;
+ case VALUE_FALSE:
+ case VALUE_TRUE:
+ return BOOLEAN_TYPE;
+ case START_ARRAY:
+ return ARRAY_TYPE;
+ case START_OBJECT:
+ return MAP_TYPE;
+ case VALUE_NUMBER_INT:
+ return INT_TYPE;
+ case VALUE_NUMBER_FLOAT:
+ return FLOAT_TYPE;
+ case VALUE_NULL:
+ return NULL_TYPE;
+ }
}
- public static Object getValueFromFieldType(JsonParser parser, MinorType fieldType) throws IOException {
- switch (fieldType) {
- case INT:
- return parser.getIntValue();
- case VARCHAR:
- return parser.getValueAsString();
- case FLOAT4:
- return parser.getFloatValue();
- case BIT:
- return parser.getBooleanValue();
- case LATE:
- return null;
- default:
- throw new RuntimeException("Unexpected Field type to return value: " + fieldType.toString());
- }
+ throw new UnsupportedOperationException("Unsupported Jackson type: " + token + ", Repeated: " + repeated);
+ }
+
+ public static Object getValueFromFieldType(JsonParser parser, MinorType fieldType) throws IOException {
+ switch (fieldType) {
+ case INT:
+ return parser.getIntValue();
+ case VARCHAR:
+ return parser.getValueAsString();
+ case FLOAT4:
+ return parser.getFloatValue();
+ case BIT:
+ return parser.getBooleanValue();
+ case LATE:
+ return null;
+ default:
+ throw new RuntimeException("Unexpected Field type to return value: " + fieldType.toString());
}
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/38ab96f3/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/JSONRecordReader.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/JSONRecordReader.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/JSONRecordReader.java
index ff7d315..a4887c0 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/JSONRecordReader.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/JSONRecordReader.java
@@ -103,7 +103,7 @@ public class JSONRecordReader implements RecordReader {
int nextRowIndex = 0;
try {
- while (ReadType.OBJECT.readRecord(null, this, null, nextRowIndex++, 0)) {
+ while (ReadType.OBJECT.readRecord(this, null, nextRowIndex++, 0)) {
parser.nextToken(); // Read to START_OBJECT token
if (!parser.hasCurrentToken()) {
@@ -180,12 +180,14 @@ public class JSONRecordReader implements RecordReader {
ARRAY(END_ARRAY) {
@Override
public Field createField(RecordSchema parentSchema, String prefixFieldName, String fieldName, MajorType fieldType, int index) {
- return new OrderedField(parentSchema, fieldType, prefixFieldName, index);
+ return new NamedField(parentSchema, prefixFieldName, fieldName, fieldType);
+ //return new OrderedField(parentSchema, fieldType, prefixFieldName, index);
}
@Override
public RecordSchema createSchema() throws IOException {
- return new ListSchema();
+ return new ObjectSchema();
+ //return new ListSchema();
}
},
OBJECT(END_OBJECT) {
@@ -215,8 +217,7 @@ public class JSONRecordReader implements RecordReader {
}
@SuppressWarnings("ConstantConditions")
- public boolean readRecord(Field parentField,
- JSONRecordReader reader,
+ public boolean readRecord(JSONRecordReader reader,
String prefixFieldName,
int rowIndex,
int groupCount) throws IOException, SchemaChangeException {
@@ -232,7 +233,7 @@ public class JSONRecordReader implements RecordReader {
}
String fieldName = parser.getCurrentName();
- MajorType fieldType = JacksonHelper.getFieldType(token);
+ MajorType fieldType = JacksonHelper.getFieldType(token, this == ReadType.ARRAY);
ReadType readType = null;
switch (token) {
case START_ARRAY:
@@ -246,17 +247,17 @@ public class JSONRecordReader implements RecordReader {
}
if (fieldType != null) { // Including nulls
- isFull = isFull ||
- !recordData(
- parentField,
- readType,
- reader,
- fieldType,
- prefixFieldName,
- fieldName,
- rowIndex,
- colIndex,
- groupCount);
+ boolean currentFieldFull = !recordData(
+ readType,
+ reader,
+ fieldType,
+ prefixFieldName,
+ fieldName,
+ rowIndex,
+ colIndex,
+ groupCount);
+
+ isFull = isFull || currentFieldFull;
}
token = parser.nextToken();
colIndex += 1;
@@ -277,8 +278,7 @@ public class JSONRecordReader implements RecordReader {
}
}
- private boolean recordData(Field parentField,
- JSONRecordReader.ReadType readType,
+ private boolean recordData(JSONRecordReader.ReadType readType,
JSONRecordReader reader,
MajorType fieldType,
String prefixFieldName,
@@ -322,18 +322,13 @@ public class JSONRecordReader implements RecordReader {
field.assignSchemaIfNull(newSchema);
if (fieldSchema == null) reader.setCurrentSchema(newSchema);
- if(readType == ReadType.ARRAY) {
- readType.readRecord(field, reader, field.getFullFieldName(), rowIndex, groupCount);
- } else {
- readType.readRecord(field, reader, field.getFullFieldName(), rowIndex, groupCount);
- }
+ readType.readRecord(reader, field.getFullFieldName(), rowIndex, groupCount);
reader.setCurrentSchema(currentSchema);
} else {
return addValueToVector(
rowIndex,
holder,
- reader.getAllocator(),
JacksonHelper.getValueFromFieldType(
reader.getParser(),
fieldType.getMinorType()
@@ -346,10 +341,10 @@ public class JSONRecordReader implements RecordReader {
return true;
}
- private static <T> boolean addValueToVector(int index, VectorHolder holder, BufferAllocator allocator, T val, MinorType minorType, int groupCount) {
+ private static <T> boolean addValueToVector(int index, VectorHolder holder, T val, MinorType minorType, int groupCount) {
switch (minorType) {
case INT: {
- holder.incAndCheckLength(32);
+ holder.incAndCheckLength(32 + 1);
if (groupCount == 0) {
if (val != null) {
NullableIntVector int4 = (NullableIntVector) holder.getValueVector();
@@ -363,13 +358,14 @@ public class JSONRecordReader implements RecordReader {
RepeatedIntVector repeatedInt4 = (RepeatedIntVector) holder.getValueVector();
RepeatedIntVector.Mutator m = repeatedInt4.getMutator();
+ holder.setGroupCount(index);
m.add(index, (Integer) val);
}
- return holder.hasEnoughSpace(32);
+ return holder.hasEnoughSpace(32 + 1);
}
case FLOAT4: {
- holder.incAndCheckLength(32);
+ holder.incAndCheckLength(32 + 1);
if (groupCount == 0) {
if (val != null) {
NullableFloat4Vector float4 = (NullableFloat4Vector) holder.getValueVector();
@@ -383,9 +379,10 @@ public class JSONRecordReader implements RecordReader {
RepeatedFloat4Vector repeatedFloat4 = (RepeatedFloat4Vector) holder.getValueVector();
RepeatedFloat4Vector.Mutator m = repeatedFloat4.getMutator();
- m.add(groupCount, (Float) val);
+ holder.setGroupCount(index);
+ m.add(index, (Float) val);
}
- return holder.hasEnoughSpace(32);
+ return holder.hasEnoughSpace(32 + 1);
}
case VARCHAR: {
if (val == null) {
@@ -401,16 +398,29 @@ public class JSONRecordReader implements RecordReader {
} else {
RepeatedVarCharVector repeatedVarLen4 = (RepeatedVarCharVector) holder.getValueVector();
RepeatedVarCharVector.Mutator m = repeatedVarLen4.getMutator();
+ holder.setGroupCount(index);
m.add(index, bytes);
}
- return holder.hasEnoughSpace(length);
+ return holder.hasEnoughSpace(length + 4 + 1);
}
}
case BIT: {
- holder.incAndCheckLength(1);
- NullableBitVector bit = (NullableBitVector) holder.getValueVector();
- if (val != null) {
- bit.getMutator().set(index, (Boolean) val ? 1 : 0);
+ holder.incAndCheckLength(1 + 1);
+ if (groupCount == 0) {
+ if (val != null) {
+ NullableBitVector bit = (NullableBitVector) holder.getValueVector();
+ NullableBitVector.Mutator m = bit.getMutator();
+ m.set(index, (Boolean) val ? 1 : 0);
+ }
+ } else {
+ if (val == null) {
+ throw new UnsupportedOperationException("Nullable repeated boolean is not supported.");
+ }
+
+ RepeatedBitVector repeatedBit = (RepeatedBitVector) holder.getValueVector();
+ RepeatedBitVector.Mutator m = repeatedBit.getMutator();
+ holder.setGroupCount(index);
+ m.add(index, (Boolean) val ? 1 : 0);
}
return holder.hasEnoughSpace(1 + 1);
}
@@ -443,7 +453,9 @@ public class JSONRecordReader implements RecordReader {
MajorType type = field.getFieldType();
MaterializedField f = MaterializedField.create(new SchemaPath(field.getFullFieldName(), ExpressionPosition.UNKNOWN), type);
- if (f.getType().getMinorType().equals(MinorType.MAP)) {
+ MinorType minorType = f.getType().getMinorType();
+
+ if (minorType.equals(MinorType.MAP) || minorType.equals(MinorType.LATE)) {
return null;
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/38ab96f3/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/VectorHolder.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/VectorHolder.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/VectorHolder.java
index 43d3cd9..2c28082 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/VectorHolder.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/VectorHolder.java
@@ -64,22 +64,18 @@ public class VectorHolder {
public void populateVectorLength() {
ValueVector.Mutator mutator = vector.getMutator();
- if(mutator instanceof NonRepeatedMutator) {
- ((NonRepeatedMutator)mutator).setValueCount(count);
- } else if(mutator instanceof RepeatedMutator) {
- ((RepeatedMutator)mutator).setGroupAndValueCount(groupCount, count);
+ if(vector instanceof RepeatedFixedWidthVector || vector instanceof RepeatedVariableWidthVector) {
+ mutator.setValueCount(groupCount);
} else {
- throw new UnsupportedOperationException("Mutator not supported: " + mutator.getClass().getName());
+ mutator.setValueCount(count);
}
}
public void allocateNew(int valueLength) {
- if (vector instanceof FixedWidthVector) {
- ((FixedWidthVector) vector).allocateNew(valueLength);
- } else if (vector instanceof VariableWidthVector) {
- ((VariableWidthVector) vector).allocateNew(valueLength * 10, valueLength);
- } else {
- throw new UnsupportedOperationException();
- }
+ AllocationHelper.allocate(vector, valueLength, 10, 5);
+ }
+
+ public void allocateNew(int valueLength, int repeatedPerTop) {
+ AllocationHelper.allocate(vector, valueLength, 10, repeatedPerTop);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/38ab96f3/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/AllocationHelper.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/AllocationHelper.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/AllocationHelper.java
index 69c17f4..5007dbd 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/AllocationHelper.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/AllocationHelper.java
@@ -10,7 +10,7 @@ public class AllocationHelper {
public static void allocate(ValueVector v, int valueCount, int bytesPerValue, int repeatedPerTop){
if(v instanceof FixedWidthVector){
((FixedWidthVector) v).allocateNew(valueCount);
- }else if(v instanceof VariableWidthVector){
+ } else if (v instanceof VariableWidthVector) {
((VariableWidthVector) v).allocateNew(valueCount * bytesPerValue, valueCount);
}else if(v instanceof RepeatedFixedWidthVector){
((RepeatedFixedWidthVector) v).allocateNew(valueCount, valueCount * repeatedPerTop);
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/38ab96f3/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/RepeatedMutator.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/RepeatedMutator.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/RepeatedMutator.java
deleted file mode 100644
index 1227d02..0000000
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/RepeatedMutator.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- ******************************************************************************/
-
-package org.apache.drill.exec.vector;
-
-public interface RepeatedMutator extends ValueVector.Mutator {
- public void setGroupAndValueCount(int groupCount, int valueCount);
-}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/38ab96f3/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestSimpleFragmentRun.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestSimpleFragmentRun.java b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestSimpleFragmentRun.java
index 5d4e700..cabe9b3 100644
--- a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestSimpleFragmentRun.java
+++ b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestSimpleFragmentRun.java
@@ -87,7 +87,6 @@ public class TestSimpleFragmentRun extends PopUnitTestBase {
System.out.println();
}
-
for (int i = 0; i < batchLoader.getRecordCount(); i++) {
boolean first = true;
recordCount++;
@@ -101,13 +100,87 @@ public class TestSimpleFragmentRun extends PopUnitTestBase {
}
if(!first) System.out.println();
}
-
-
-
}
logger.debug("Received results {}", results);
assertEquals(recordCount, 200);
}
}
+
+ @Test
+ public void runJSONScanPopFragment() throws Exception {
+ try (RemoteServiceSet serviceSet = RemoteServiceSet.getLocalServiceSet();
+ Drillbit bit = new Drillbit(CONFIG, serviceSet);
+ DrillClient client = new DrillClient(CONFIG, serviceSet.getCoordinator())) {
+
+ // run query.
+ bit.run();
+ client.connect();
+ List<QueryResultBatch> results = client.runQuery(QueryType.PHYSICAL, Files.toString(FileUtils.getResourceAsFile("/physical_json_scan_test1.json"), Charsets.UTF_8));
+
+ // look at records
+ RecordBatchLoader batchLoader = new RecordBatchLoader(bit.getContext().getAllocator());
+ int recordCount = 0;
+
+ int expectedBatchCount = 2;
+
+ assertEquals(expectedBatchCount, results.size());
+
+ for (int i = 0; i < results.size(); ++i) {
+ QueryResultBatch batch = results.get(i);
+ if (i == 0) {
+ assertTrue(batch.hasData());
+ } else {
+ assertFalse(batch.hasData());
+ return;
+ }
+
+ assertTrue(batchLoader.load(batch.getHeader().getDef(), batch.getData()));
+ boolean firstColumn = true;
+
+ // print headers.
+ System.out.println("\n\n========NEW SCHEMA=========\n\n");
+ for (VectorWrapper<?> v : batchLoader) {
+
+ if (firstColumn) {
+ firstColumn = false;
+ } else {
+ System.out.print("\t");
+ }
+ System.out.print(v.getField().getName());
+ System.out.print("[");
+ System.out.print(v.getField().getType().getMinorType());
+ System.out.print("]");
+ }
+
+ System.out.println();
+
+
+ for (int r = 0; i < batchLoader.getRecordCount(); r++) {
+ boolean first = true;
+ recordCount++;
+ for (VectorWrapper<?> v : batchLoader) {
+ if (first) {
+ first = false;
+ } else {
+ System.out.print("\t");
+ }
+
+ ValueVector.Accessor accessor = v.getValueVector().getAccessor();
+
+ if (v.getField().getType().getMinorType() == TypeProtos.MinorType.VARCHAR) {
+ System.out.println(new String((byte[]) accessor.getObject(r), UTF_8));
+ } else {
+ System.out.print(accessor.getObject(r));
+ }
+ }
+ if (!first) System.out.println();
+ }
+
+ }
+
+ assertEquals(2, recordCount);
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/38ab96f3/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/JSONRecordReaderTest.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/JSONRecordReaderTest.java b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/JSONRecordReaderTest.java
index 0ebb529..b39ac8a 100644
--- a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/JSONRecordReaderTest.java
+++ b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/JSONRecordReaderTest.java
@@ -5,6 +5,7 @@ import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.nio.charset.Charset;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
@@ -68,7 +69,12 @@ public class JSONRecordReaderTest {
SchemaDefProtos.FieldDef def = metadata.getDef();
assertEquals(expectedMinorType, def.getMajorType().getMinorType());
String[] parts = name.split("\\.");
- assertEquals(parts.length, def.getNameList().size());
+ int expected = parts.length;
+ boolean expectingArray = List.class.isAssignableFrom(value.getClass());
+ if (expectingArray) {
+ expected += 1;
+ }
+ assertEquals(expected, def.getNameList().size());
for(int i = 0; i < parts.length; ++i) {
assertEquals(parts[i], def.getName(i).getName());
}
@@ -78,10 +84,21 @@ public class JSONRecordReaderTest {
}
T val = (T) valueVector.getAccessor().getObject(index);
- if (val instanceof byte[]) {
- assertTrue(Arrays.equals((byte[]) value, (byte[]) val));
+ assertValue(value, val);
+ }
+
+ private void assertValue(Object expected, Object found) {
+ if (found instanceof byte[]) {
+ assertTrue(Arrays.equals((byte[]) expected, (byte[]) found));
+ } else if(found instanceof ArrayList) {
+ List expectedArray = (List) expected;
+ List foundArray = (List) found;
+ assertEquals(expectedArray.size(), foundArray.size());
+ for(int i = 0; i < expectedArray.size(); ++i) {
+ assertValue(expectedArray.get(i), foundArray.get(i));
+ }
} else {
- assertEquals(value, val);
+ assertEquals(expected, found);
}
}
@@ -234,4 +251,35 @@ public class JSONRecordReaderTest {
assertEquals(0, jr.next());
assertTrue(mutator.getRemovedFields().isEmpty());
}
+
+ @Test
+ public void testRepeatedFields(@Injectable final FragmentContext context) throws ExecutionSetupException {
+ new Expectations() {
+ {
+ context.getAllocator();
+ returns(new DirectBufferAllocator());
+ }
+ };
+
+ JSONRecordReader jr = new JSONRecordReader(context, getResource("scan_json_test_4.json"));
+
+ MockOutputMutator mutator = new MockOutputMutator();
+ List<ValueVector> addFields = mutator.getAddFields();
+ jr.setup(mutator);
+ assertEquals(2, jr.next());
+ assertEquals(7, addFields.size());
+ assertField(addFields.get(0), 0, MinorType.INT, 123, "test");
+ assertField(addFields.get(1), 0, MinorType.INT, Arrays.asList(1, 2, 3), "test2");
+ assertField(addFields.get(2), 0, MinorType.INT, Arrays.asList(4, 5, 6), "test3.a");
+ assertField(addFields.get(3), 0, MinorType.INT, Arrays.asList(7, 8, 9), "test3.b");
+ assertField(addFields.get(4), 0, MinorType.INT, Arrays.asList(10, 11, 12), "test3.c.d");
+ assertField(addFields.get(5), 0, MinorType.FLOAT4, Arrays.<Float>asList((float) 1.1, (float) 1.2, (float) 1.3), "testFloat");
+ assertField(addFields.get(6), 0, MinorType.VARCHAR, Arrays.asList("hello".getBytes(UTF_8), "drill".getBytes(UTF_8)), "testStr");
+ assertField(addFields.get(1), 1, MinorType.INT, Arrays.asList(1, 2), "test2");
+ assertField(addFields.get(2), 1, MinorType.INT, Arrays.asList(7, 7, 7, 8), "test3.a");
+ assertField(addFields.get(5), 1, MinorType.FLOAT4, Arrays.<Float>asList((float) 2.2, (float) 2.3,(float) 2.4), "testFloat");
+
+ assertEquals(0, jr.next());
+ assertTrue(mutator.getRemovedFields().isEmpty());
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/38ab96f3/sandbox/prototype/exec/java-exec/src/test/resources/scan_json_test_4.json
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/resources/scan_json_test_4.json b/sandbox/prototype/exec/java-exec/src/test/resources/scan_json_test_4.json
index 0fb3202..fd003ac 100644
--- a/sandbox/prototype/exec/java-exec/src/test/resources/scan_json_test_4.json
+++ b/sandbox/prototype/exec/java-exec/src/test/resources/scan_json_test_4.json
@@ -1,14 +1,21 @@
{
"test": 123,
"test2": [1,2,3],
- "a": {
- "b": 1
- }
+ "test3": {
+ "a": [4,5,6],
+ "b": [7,8,9],
+ "c": {
+ "d": [10, 11, 12]
+ }
+ },
+ "testFloat": [1.1, 1.2, 1.3],
+ "testStr": ["hello", "drill"]
}
{
- "test": 1234,
- "test3": false,
- "a": {
- "b": 2
- }
+ "test2": [1,2],
+ "test3": {
+ "a": [7,7,7,8],
+ "b": []
+ },
+ "testFloat": [2.2, 2.3, 2.4]
}
\ No newline at end of file