You are viewing a plain-text version of this content. The hyperlink to the canonical version was not preserved in this plain-text rendering.
Posted to dev@drill.apache.org by ja...@apache.org on 2013/08/02 05:03:10 UTC
[10/13] git commit: Fix JsonRecordReader nested test
Fix JsonRecordReader nested test
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/bff8ab65
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/bff8ab65
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/bff8ab65
Branch: refs/heads/master
Commit: bff8ab65c83f5256e8fc95ea58261071a161deb2
Parents: fdb5c41
Author: Timothy Chen <tn...@gmail.com>
Authored: Tue Jul 23 21:20:06 2013 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Thu Aug 1 19:30:47 2013 -0700
----------------------------------------------------------------------
.../drill/exec/store/JSONRecordReader.java | 683 ++++++++++---------
.../drill/exec/store/JSONRecordReaderTest.java | 84 +--
2 files changed, 369 insertions(+), 398 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/bff8ab65/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/JSONRecordReader.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/JSONRecordReader.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/JSONRecordReader.java
index f1b5538..f72b519 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/JSONRecordReader.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/JSONRecordReader.java
@@ -31,13 +31,7 @@ import org.apache.drill.exec.schema.OrderedField;
import org.apache.drill.exec.schema.RecordSchema;
import org.apache.drill.exec.schema.SchemaIdGenerator;
import org.apache.drill.exec.schema.json.jackson.JacksonHelper;
-import org.apache.drill.exec.vector.AllocationHelper;
-import org.apache.drill.exec.vector.NullableBitVector;
-import org.apache.drill.exec.vector.NullableFloat4Vector;
-import org.apache.drill.exec.vector.NullableIntVector;
-import org.apache.drill.exec.vector.NullableVarCharVector;
-import org.apache.drill.exec.vector.TypeHelper;
-import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.*;
import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonParser;
@@ -51,377 +45,384 @@ import com.google.common.io.InputSupplier;
import com.google.common.io.Resources;
public class JSONRecordReader implements RecordReader {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(JSONRecordReader.class);
- private static final int DEFAULT_LENGTH = 256 * 1024; // 256kb
- public static final Charset UTF_8 = Charset.forName("UTF-8");
-
- private final String inputPath;
-
- private final Map<Field, VectorHolder> valueVectorMap;
-
- private JsonParser parser;
- private SchemaIdGenerator generator;
- private DiffSchema diffSchema;
- private RecordSchema currentSchema;
- private List<Field> removedFields;
- private OutputMutator outputMutator;
- private BufferAllocator allocator;
- private int batchSize;
-
- public JSONRecordReader(FragmentContext fragmentContext, String inputPath, int batchSize) {
- this.inputPath = inputPath;
- this.allocator = fragmentContext.getAllocator();
- this.batchSize = batchSize;
- valueVectorMap = Maps.newHashMap();
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(JSONRecordReader.class);
+ private static final int DEFAULT_LENGTH = 256 * 1024; // 256kb
+ public static final Charset UTF_8 = Charset.forName("UTF-8");
+
+ private final String inputPath;
+
+ private final Map<Field, VectorHolder> valueVectorMap;
+
+ private JsonParser parser;
+ private SchemaIdGenerator generator;
+ private DiffSchema diffSchema;
+ private RecordSchema currentSchema;
+ private List<Field> removedFields;
+ private OutputMutator outputMutator;
+ private BufferAllocator allocator;
+ private int batchSize;
+
+ public JSONRecordReader(FragmentContext fragmentContext, String inputPath, int batchSize) {
+ this.inputPath = inputPath;
+ this.allocator = fragmentContext.getAllocator();
+ this.batchSize = batchSize;
+ valueVectorMap = Maps.newHashMap();
+ }
+
+ public JSONRecordReader(FragmentContext fragmentContext, String inputPath) {
+ this(fragmentContext, inputPath, DEFAULT_LENGTH);
+ }
+
+ private JsonParser getParser() {
+ return parser;
+ }
+
+ @Override
+ public void setup(OutputMutator output) throws ExecutionSetupException {
+ outputMutator = output;
+ currentSchema = new ObjectSchema();
+ diffSchema = new DiffSchema();
+ removedFields = Lists.newArrayList();
+
+ try {
+ InputSupplier<InputStreamReader> input;
+ if (inputPath.startsWith("resource:")) {
+ input = Resources.newReaderSupplier(Resources.getResource(inputPath.substring(9)), Charsets.UTF_8);
+ } else {
+ input = Files.newReaderSupplier(new File(inputPath), Charsets.UTF_8);
+ }
+
+ JsonFactory factory = new JsonFactory();
+ parser = factory.createJsonParser(input.getInput());
+ parser.nextToken(); // Read to the first START_OBJECT token
+ generator = new SchemaIdGenerator();
+ } catch (IOException e) {
+ throw new ExecutionSetupException(e);
}
+ }
- public JSONRecordReader(FragmentContext fragmentContext, String inputPath) {
- this(fragmentContext, inputPath, DEFAULT_LENGTH);
+ @Override
+ public int next() {
+ if (parser.isClosed() || !parser.hasCurrentToken()) {
+ return 0;
}
- private JsonParser getParser() {
- return parser;
- }
+ resetBatch();
- @Override
- public void setup(OutputMutator output) throws ExecutionSetupException {
- outputMutator = output;
- currentSchema = new ObjectSchema();
- diffSchema = new DiffSchema();
- removedFields = Lists.newArrayList();
-
- try {
- InputSupplier<InputStreamReader> input;
- if (inputPath.startsWith("resource:")) {
- input = Resources.newReaderSupplier(Resources.getResource(inputPath.substring(9)), Charsets.UTF_8);
- } else {
- input = Files.newReaderSupplier(new File(inputPath), Charsets.UTF_8);
- }
-
- JsonFactory factory = new JsonFactory();
- parser = factory.createJsonParser(input.getInput());
- parser.nextToken(); // Read to the first START_OBJECT token
- generator = new SchemaIdGenerator();
- } catch (IOException e) {
- throw new ExecutionSetupException(e);
- }
- }
-
- @Override
- public int next() {
- if (parser.isClosed() || !parser.hasCurrentToken()) {
- return 0;
- }
+ int nextRowIndex = 0;
- resetBatch();
+ try {
+ while (ReadType.OBJECT.readRecord(null, this, null, nextRowIndex++)) {
+ parser.nextToken(); // Read to START_OBJECT token
- int nextRowIndex = 0;
-
- try {
- while (ReadType.OBJECT.readRecord(null, this, null, nextRowIndex++)) {
- parser.nextToken(); // Read to START_OBJECT token
-
- if (!parser.hasCurrentToken()) {
- parser.close();
- break;
- }
- }
+ if (!parser.hasCurrentToken()) {
+ parser.close();
+ break;
+ }
+ }
- parser.nextToken();
+ parser.nextToken();
- if (!parser.hasCurrentToken()) {
- parser.close();
- }
+ if (!parser.hasCurrentToken()) {
+ parser.close();
+ }
- // Garbage collect fields never referenced in this batch
- for (Field field : Iterables.concat(currentSchema.removeUnreadFields(), removedFields)) {
- diffSchema.addRemovedField(field);
- outputMutator.removeField(field.getAsMaterializedField());
- }
+ // Garbage collect fields never referenced in this batch
+ for (Field field : Iterables.concat(currentSchema.removeUnreadFields(), removedFields)) {
+ diffSchema.addRemovedField(field);
+ outputMutator.removeField(field.getAsMaterializedField());
+ }
- } catch (IOException | SchemaChangeException e) {
- logger.error("Error reading next in Json reader", e);
- }
- return nextRowIndex;
+ } catch (IOException | SchemaChangeException e) {
+ logger.error("Error reading next in Json reader", e);
}
+ return nextRowIndex;
+ }
- private void resetBatch() {
- for (VectorHolder value : valueVectorMap.values()) {
- value.reset();
- }
-
- currentSchema.resetMarkedFields();
- diffSchema.reset();
- removedFields.clear();
+ private void resetBatch() {
+ for (VectorHolder value : valueVectorMap.values()) {
+ value.reset();
}
- @Override
- public void cleanup() {
- try {
- parser.close();
- } catch (IOException e) {
- logger.warn("Error closing Json parser", e);
- }
+ currentSchema.resetMarkedFields();
+ diffSchema.reset();
+ removedFields.clear();
+ }
+
+ @Override
+ public void cleanup() {
+ try {
+ parser.close();
+ } catch (IOException e) {
+ logger.warn("Error closing Json parser", e);
}
-
-
- private RecordSchema getCurrentSchema() {
- return currentSchema;
+ }
+
+
+ private RecordSchema getCurrentSchema() {
+ return currentSchema;
+ }
+
+ private void setCurrentSchema(RecordSchema schema) {
+ currentSchema = schema;
+ }
+
+ private List<Field> getRemovedFields() {
+ return removedFields;
+ }
+
+ private DiffSchema getDiffSchema() {
+ return diffSchema;
+ }
+
+ public BufferAllocator getAllocator() {
+ return allocator;
+ }
+
+ public OutputMutator getOutputMutator() {
+ return outputMutator;
+ }
+
+ public static enum ReadType {
+ ARRAY(END_ARRAY) {
+ @Override
+ public Field createField(RecordSchema parentSchema, String prefixFieldName, String fieldName, MajorType fieldType, int index) {
+ return new OrderedField(parentSchema, fieldType, prefixFieldName, index);
+ }
+
+ @Override
+ public RecordSchema createSchema() throws IOException {
+ return new ListSchema();
+ }
+ },
+ OBJECT(END_OBJECT) {
+ @Override
+ public Field createField(RecordSchema parentSchema,
+ String prefixFieldName,
+ String fieldName,
+ MajorType fieldType,
+ int index) {
+ return new NamedField(parentSchema, prefixFieldName, fieldName, fieldType);
+ }
+
+ @Override
+ public RecordSchema createSchema() throws IOException {
+ return new ObjectSchema();
+ }
+ };
+
+ private final JsonToken endObject;
+
+ ReadType(JsonToken endObject) {
+ this.endObject = endObject;
}
- private void setCurrentSchema(RecordSchema schema) {
- currentSchema = schema;
+ public JsonToken getEndObject() {
+ return endObject;
}
- private List<Field> getRemovedFields() {
- return removedFields;
- }
+ public boolean readRecord(Field parentField,
+ JSONRecordReader reader,
+ String prefixFieldName,
+ int rowIndex) throws IOException, SchemaChangeException {
+ JsonParser parser = reader.getParser();
+ JsonToken token = parser.nextToken();
+ JsonToken endObject = getEndObject();
+ int colIndex = 0;
+ boolean isFull = false;
+ while (token != endObject) {
+ if (token == FIELD_NAME) {
+ token = parser.nextToken();
+ continue;
+ }
- private DiffSchema getDiffSchema() {
- return diffSchema;
- }
+ String fieldName = parser.getCurrentName();
+ MajorType fieldType = JacksonHelper.getFieldType(token);
+ ReadType readType = null;
+ switch (token) {
+ case START_ARRAY:
+ readType = ReadType.ARRAY;
+ break;
+ case START_OBJECT:
+ readType = ReadType.OBJECT;
+ break;
+ }
+ if (fieldType != null) { // Including nulls
+ boolean currentFieldNotFull = recordData(
+ parentField,
+ readType,
+ reader,
+ fieldType,
+ prefixFieldName,
+ fieldName,
+ rowIndex, colIndex);
+
+ isFull = isFull || !currentFieldNotFull;
- public BufferAllocator getAllocator() {
- return allocator;
+ }
+ token = parser.nextToken();
+ colIndex += 1;
+ }
+ return !isFull;
}
- public OutputMutator getOutputMutator() {
- return outputMutator;
+ private void removeChildFields(List<Field> removedFields, Field field) {
+ RecordSchema schema = field.getAssignedSchema();
+ if (schema == null) {
+ return;
+ }
+ for (Field childField : schema.getFields()) {
+ removedFields.add(childField);
+ if (childField.hasSchema()) {
+ removeChildFields(removedFields, childField);
+ }
+ }
}
- public static enum ReadType {
- ARRAY(END_ARRAY) {
- @Override
- public Field createField(RecordSchema parentSchema, String prefixFieldName, String fieldName, MajorType fieldType, int index) {
- return new OrderedField(parentSchema, fieldType, prefixFieldName, index);
- }
-
- @Override
- public RecordSchema createSchema() throws IOException {
- return new ListSchema();
- }
- },
- OBJECT(END_OBJECT) {
- @Override
- public Field createField(RecordSchema parentSchema,
- String prefixFieldName,
- String fieldName,
- MajorType fieldType,
- int index) {
- return new NamedField(parentSchema, prefixFieldName, fieldName, fieldType);
- }
-
- @Override
- public RecordSchema createSchema() throws IOException {
- return new ObjectSchema();
- }
- };
-
- private final JsonToken endObject;
-
- ReadType(JsonToken endObject) {
- this.endObject = endObject;
+ private boolean recordData(Field parentField,
+ JSONRecordReader.ReadType readType,
+ JSONRecordReader reader,
+ MajorType fieldType,
+ String prefixFieldName,
+ String fieldName,
+ int rowIndex,
+ int colIndex) throws IOException, SchemaChangeException {
+ RecordSchema currentSchema = reader.getCurrentSchema();
+ Field field = currentSchema.getField(fieldName, colIndex);
+ boolean isFieldFound = field != null;
+ List<Field> removedFields = reader.getRemovedFields();
+ if (!isFieldFound || !field.getFieldType().equals(fieldType)) {
+ if (isFieldFound) {
+ if (field.hasSchema()) {
+ removeChildFields(removedFields, field);
+ }
+ removedFields.add(field);
+ currentSchema.removeField(field, colIndex);
}
- public JsonToken getEndObject() {
- return endObject;
- }
+ field = createField(
+ currentSchema,
+ prefixFieldName,
+ fieldName,
+ fieldType,
+ colIndex
+ );
+
+ reader.recordNewField(field);
+ currentSchema.addField(field);
+ }
+
+ field.setRead(true);
+
+ VectorHolder holder = getOrCreateVectorHolder(reader, field);
+ if (readType != null) {
+ RecordSchema fieldSchema = field.getAssignedSchema();
+ reader.setCurrentSchema(fieldSchema);
+
+ RecordSchema newSchema = readType.createSchema();
+ field.assignSchemaIfNull(newSchema);
+
+ if (fieldSchema == null) reader.setCurrentSchema(newSchema);
+ readType.readRecord(field, reader, field.getFullFieldName(), rowIndex);
+
+ reader.setCurrentSchema(currentSchema);
+ } else if (holder != null) {
+ return addValueToVector(
+ rowIndex,
+ holder,
+ reader.getAllocator(),
+ JacksonHelper.getValueFromFieldType(
+ reader.getParser(),
+ fieldType.getMinorType()
+ ),
+ fieldType.getMinorType()
+ );
+ }
+
+ return true;
+ }
- public boolean readRecord(Field parentField,
- JSONRecordReader reader,
- String prefixFieldName,
- int rowIndex) throws IOException, SchemaChangeException {
- JsonParser parser = reader.getParser();
- JsonToken token = parser.nextToken();
- JsonToken endObject = getEndObject();
- int colIndex = 0;
- boolean isFull = false;
- while (token != endObject) {
- if (token == FIELD_NAME) {
- token = parser.nextToken();
- continue;
- }
-
- String fieldName = parser.getCurrentName();
- MajorType fieldType = JacksonHelper.getFieldType(token);
- ReadType readType = null;
- switch (token) {
- case START_ARRAY:
- readType = ReadType.ARRAY;
- break;
- case START_OBJECT:
- readType = ReadType.OBJECT;
- break;
- }
- if (fieldType != null) { // Including nulls
- isFull = isFull ||
- !recordData(
- parentField,
- readType,
- reader,
- fieldType,
- prefixFieldName,
- fieldName,
- rowIndex, colIndex);
- }
- token = parser.nextToken();
- colIndex += 1;
- }
- return !isFull;
+ private static <T> boolean addValueToVector(int index, VectorHolder holder, BufferAllocator allocator, T val, MinorType minorType) {
+ switch (minorType) {
+ case INT: {
+ holder.incAndCheckLength(32 + 1);
+ NullableIntVector int4 = (NullableIntVector) holder.getValueVector();
+ NullableIntVector.Mutator m = int4.getMutator();
+ if (val != null) {
+ m.set(index, (Integer) val);
+ }
+ return holder.hasEnoughSpace(32 + 1);
}
-
- private void removeChildFields(List<Field> removedFields, Field field) {
- RecordSchema schema = field.getAssignedSchema();
- if (schema == null) {
- return;
- }
- for (Field childField : schema.getFields()) {
- removedFields.add(childField);
- if (childField.hasSchema()) {
- removeChildFields(removedFields, childField);
- }
- }
+ case FLOAT4: {
+ holder.incAndCheckLength(32 + 1);
+ NullableFloat4Vector float4 = (NullableFloat4Vector) holder.getValueVector();
+ NullableFloat4Vector.Mutator m = float4.getMutator();
+ if (val != null) {
+ m.set(index, (Float) val);
+ }
+ return holder.hasEnoughSpace(32 + 1);
}
-
- private boolean recordData(Field parentField,
- JSONRecordReader.ReadType readType,
- JSONRecordReader reader,
- MajorType fieldType,
- String prefixFieldName,
- String fieldName,
- int rowIndex,
- int colIndex) throws IOException, SchemaChangeException {
- RecordSchema currentSchema = reader.getCurrentSchema();
- Field field = currentSchema.getField(fieldName, colIndex);
- boolean isFieldFound = field != null;
- List<Field> removedFields = reader.getRemovedFields();
- if (!isFieldFound || !field.getFieldType().equals(fieldType)) {
- if (isFieldFound) {
- if (field.hasSchema()) {
- removeChildFields(removedFields, field);
- }
- removedFields.add(field);
- currentSchema.removeField(field, colIndex);
- }
-
- field = createField(
- currentSchema,
- prefixFieldName,
- fieldName,
- fieldType,
- colIndex
- );
-
- reader.recordNewField(field);
- currentSchema.addField(field);
- }
-
- field.setRead(true);
-
- VectorHolder holder = getOrCreateVectorHolder(reader, field);
- if (readType != null) {
- RecordSchema fieldSchema = field.getAssignedSchema();
- reader.setCurrentSchema(fieldSchema);
-
- RecordSchema newSchema = readType.createSchema();
- field.assignSchemaIfNull(newSchema);
-
- if (fieldSchema == null) reader.setCurrentSchema(newSchema);
- readType.readRecord(field, reader, field.getFullFieldName(), rowIndex);
-
- reader.setCurrentSchema(currentSchema);
- } else {
- return addValueToVector(
- rowIndex,
- holder,
- reader.getAllocator(),
- JacksonHelper.getValueFromFieldType(
- reader.getParser(),
- fieldType.getMinorType()
- ),
- fieldType.getMinorType()
- );
- }
-
- return true;
+ case VARCHAR: {
+ if (val == null) {
+ return (index + 1) * 4 <= holder.getLength();
+ } else {
+ byte[] bytes = ((String) val).getBytes(UTF_8);
+ int length = bytes.length;
+ holder.incAndCheckLength(length);
+ NullableVarCharVector varLen4 = (NullableVarCharVector) holder.getValueVector();
+ NullableVarCharVector.Mutator m = varLen4.getMutator();
+ m.set(index, bytes);
+ return holder.hasEnoughSpace(length + 4 + 1);
+ }
}
-
- private static <T> boolean addValueToVector(int index, VectorHolder holder, BufferAllocator allocator, T val, MinorType minorType) {
- switch (minorType) {
- case INT: {
- holder.incAndCheckLength(32);
- NullableIntVector int4 = (NullableIntVector) holder.getValueVector();
- NullableIntVector.Mutator m = int4.getMutator();
- if (val != null) {
- m.set(index, (Integer) val);
- }
- return holder.hasEnoughSpace(32);
- }
- case FLOAT4: {
- holder.incAndCheckLength(32);
- NullableFloat4Vector float4 = (NullableFloat4Vector) holder.getValueVector();
- NullableFloat4Vector.Mutator m = float4.getMutator();
- if (val != null) {
- m.set(index, (Float) val);
- }
- return holder.hasEnoughSpace(32);
- }
- case VARCHAR: {
- if (val == null) {
- return (index + 1) * 4 <= holder.getLength();
- } else {
- byte[] bytes = ((String) val).getBytes(UTF_8);
- int length = bytes.length;
- holder.incAndCheckLength(length);
- NullableVarCharVector varLen4 = (NullableVarCharVector) holder.getValueVector();
- NullableVarCharVector.Mutator m = varLen4.getMutator();
- m.set(index, bytes);
- return holder.hasEnoughSpace(length);
- }
- }
- case BIT: {
- holder.incAndCheckLength(1);
- NullableBitVector bit = (NullableBitVector) holder.getValueVector();
- if (val != null) {
- bit.getMutator().set(index, (Boolean)val ? 1 : 0);
- }
- return holder.hasEnoughSpace(1);
- }
- default:
- throw new DrillRuntimeException("Type not supported to add value. Type: " + minorType);
- }
+ case BIT: {
+ holder.incAndCheckLength(1);
+ NullableBitVector bit = (NullableBitVector) holder.getValueVector();
+ if (val != null) {
+ bit.getMutator().set(index, (Boolean) val ? 1 : 0);
+ }
+ return holder.hasEnoughSpace(1 + 1);
}
+ default:
+ throw new DrillRuntimeException("Type not supported to add value. Type: " + minorType);
+ }
+ }
- private VectorHolder getOrCreateVectorHolder(JSONRecordReader reader, Field field) throws SchemaChangeException {
- return reader.getOrCreateVectorHolder(field);
- }
+ private VectorHolder getOrCreateVectorHolder(JSONRecordReader reader, Field field) throws SchemaChangeException {
+ return reader.getOrCreateVectorHolder(field);
+ }
- public abstract RecordSchema createSchema() throws IOException;
+ public abstract RecordSchema createSchema() throws IOException;
- public abstract Field createField(RecordSchema parentSchema,
- String prefixFieldName,
- String fieldName,
- MajorType fieldType,
- int index);
- }
+ public abstract Field createField(RecordSchema parentSchema,
+ String prefixFieldName,
+ String fieldName,
+ MajorType fieldType,
+ int index);
+ }
- private void recordNewField(Field field) {
- diffSchema.recordNewField(field);
- }
+ private void recordNewField(Field field) {
+ diffSchema.recordNewField(field);
+ }
- private VectorHolder getOrCreateVectorHolder(Field field) throws SchemaChangeException {
- VectorHolder holder = valueVectorMap.get(field);
-
- if (holder == null) {
- MajorType type = field.getFieldType();
- MaterializedField f = MaterializedField.create(new SchemaPath(field.getFieldName(), ExpressionPosition.UNKNOWN), type);
- ValueVector v = TypeHelper.getNewVector(f, allocator);
- AllocationHelper.allocate(v, batchSize, 50);
- holder = new VectorHolder(batchSize, v);
- valueVectorMap.put(field, holder);
- outputMutator.addField(v);
- return holder;
- }
- return holder;
+ private VectorHolder getOrCreateVectorHolder(Field field) throws SchemaChangeException {
+ VectorHolder holder = valueVectorMap.get(field);
+
+ if (holder == null) {
+ MajorType type = field.getFieldType();
+ MaterializedField f = MaterializedField.create(new SchemaPath(field.getFullFieldName(), ExpressionPosition.UNKNOWN), type);
+
+ if (f.getType().getMinorType().equals(MinorType.MAP)) {
+ return null;
+ }
+
+ ValueVector v = TypeHelper.getNewVector(f, allocator);
+ AllocationHelper.allocate(v, batchSize, 50);
+ holder = new VectorHolder(batchSize, v);
+ valueVectorMap.put(field, holder);
+ outputMutator.addField(v);
+ return holder;
}
+ return holder;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/bff8ab65/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/JSONRecordReaderTest.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/JSONRecordReaderTest.java b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/JSONRecordReaderTest.java
index 661b029..0ebb529 100644
--- a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/JSONRecordReaderTest.java
+++ b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/JSONRecordReaderTest.java
@@ -8,6 +8,8 @@ import java.nio.charset.Charset;
import java.util.Arrays;
import java.util.List;
+import com.google.common.base.Predicate;
+import com.google.common.collect.Iterables;
import mockit.Expectations;
import mockit.Injectable;
@@ -65,7 +67,11 @@ public class JSONRecordReaderTest {
UserBitShared.FieldMetadata metadata = valueVector.getMetadata();
SchemaDefProtos.FieldDef def = metadata.getDef();
assertEquals(expectedMinorType, def.getMajorType().getMinorType());
- assertEquals(name, def.getNameList().get(0).getName());
+ String[] parts = name.split("\\.");
+ assertEquals(parts.length, def.getNameList().size());
+ for(int i = 0; i < parts.length; ++i) {
+ assertEquals(parts[i], def.getName(i).getName());
+ }
if (expectedMinorType == MinorType.MAP) {
return;
@@ -144,7 +150,7 @@ public class JSONRecordReaderTest {
assertEquals(0, jr.next());
}
- @Test @Ignore
+ @Test
public void testChangedSchemaInTwoBatches(@Injectable final FragmentContext context) throws IOException,
ExecutionSetupException {
new Expectations() {
@@ -177,7 +183,7 @@ public class JSONRecordReaderTest {
assertField(addFields.get(4), 0, MinorType.VARCHAR, "test2".getBytes(UTF_8), "str1");
assertField(addFields.get(5), 0, MinorType.INT, 4, "d");
assertEquals(1, removedFields.size());
- //assertEquals(3, (int) removedFields.get(0));
+ assertEquals("c", removedFields.get(0).getName());
removedFields.clear();
assertEquals(1, jr.next());
assertEquals(8, addFields.size()); // The reappearing of field 'c' is also included
@@ -187,12 +193,22 @@ public class JSONRecordReaderTest {
assertField(addFields.get(6), 0, MinorType.FLOAT4, (float) 5.16, "c");
assertField(addFields.get(7), 0, MinorType.VARCHAR, "test3".getBytes(UTF_8), "str2");
assertEquals(2, removedFields.size());
-// assertTrue(removedFields.contains(5));
-// assertTrue(removedFields.contains(2));
+ Iterables.find(removedFields, new Predicate<MaterializedField>() {
+ @Override
+ public boolean apply(MaterializedField materializedField) {
+ return materializedField.getName().equals("str1");
+ }
+ });
+ Iterables.find(removedFields, new Predicate<MaterializedField>() {
+ @Override
+ public boolean apply(MaterializedField materializedField) {
+ return materializedField.getName().equals("b");
+ }
+ });
assertEquals(0, jr.next());
}
- @Test @Ignore
+ @Test
public void testNestedFieldInSameBatch(@Injectable final FragmentContext context) throws ExecutionSetupException {
new Expectations() {
{
@@ -207,61 +223,15 @@ public class JSONRecordReaderTest {
List<ValueVector> addFields = mutator.getAddFields();
jr.setup(mutator);
assertEquals(2, jr.next());
- assertEquals(5, addFields.size());
+ assertEquals(3, addFields.size());
assertField(addFields.get(0), 0, MinorType.INT, 123, "test");
- assertField(addFields.get(1), 0, MinorType.MAP, null, "a");
- assertField(addFields.get(2), 0, MinorType.VARCHAR, "test".getBytes(UTF_8), "b");
- assertField(addFields.get(3), 0, MinorType.MAP, null, "a");
- assertField(addFields.get(4), 0, MinorType.BIT, true, "d");
+ assertField(addFields.get(1), 0, MinorType.VARCHAR, "test".getBytes(UTF_8), "a.b");
+ assertField(addFields.get(2), 0, MinorType.BIT, true, "a.a.d");
assertField(addFields.get(0), 1, MinorType.INT, 1234, "test");
- assertField(addFields.get(1), 1, MinorType.MAP, null, "a");
- assertField(addFields.get(2), 1, MinorType.VARCHAR, "test2".getBytes(UTF_8), "b");
- assertField(addFields.get(3), 1, MinorType.MAP, null, "a");
- assertField(addFields.get(4), 1, MinorType.BIT, true, "d");
+ assertField(addFields.get(1), 1, MinorType.VARCHAR, "test2".getBytes(UTF_8), "a.b");
+ assertField(addFields.get(2), 1, MinorType.BIT, false, "a.a.d");
assertEquals(0, jr.next());
assertTrue(mutator.getRemovedFields().isEmpty());
}
-
- /*
- *
- * @Test public void testScanJsonRemovedOneField() throws IOException { ScanJson sj = new
- * ScanJson(getResource("scan_json_test_3.json")); PhysicalOperatorIterator iterator = sj.getIterator();
- * expectSchemaChanged(iterator); DiffSchema diffSchema = expectSchemaChanged(iterator).getSchemaChanges();
- * assertEquals(0, diffSchema.getAddedFields().size()); assertEquals(1, diffSchema.getRemovedFields().size());
- * assertEquals(PhysicalOperatorIterator.NextOutcome.NONE_LEFT, iterator.next()); }
- *
- * @Test public void testScanJsonAddOneRemoveOne() throws IOException { ScanJson sj = new
- * ScanJson(getResource("scan_json_test_4.json")); PhysicalOperatorIterator iterator = sj.getIterator();
- * expectSchemaChanged(iterator); DiffSchema diffSchema = expectSchemaChanged(iterator).getSchemaChanges();
- * assertEquals(1, diffSchema.getAddedFields().size()); assertEquals(1, diffSchema.getRemovedFields().size());
- * assertEquals(PhysicalOperatorIterator.NextOutcome.NONE_LEFT, iterator.next()); }
- *
- * @Test public void testScanJsonCycleAdditions() throws IOException { ScanJson sj = new
- * ScanJson(getResource("scan_json_test_5.json")); PhysicalOperatorIterator iterator = sj.getIterator();
- * expectSchemaChanged(iterator); DiffSchema diffSchema = expectSchemaChanged(iterator).getSchemaChanges();
- * assertEquals(1, diffSchema.getAddedFields().size()); assertEquals(1, diffSchema.getRemovedFields().size());
- * diffSchema = expectSchemaChanged(iterator).getSchemaChanges(); assertEquals(1, diffSchema.getAddedFields().size());
- * assertEquals(Field.FieldType.FLOAT, diffSchema.getAddedFields().get(0).getFieldType()); assertEquals("test2",
- * diffSchema.getAddedFields().get(0).getFieldName()); assertEquals(1, diffSchema.getRemovedFields().size());
- * assertEquals(Field.FieldType.BOOLEAN, diffSchema.getRemovedFields().get(0).getFieldType()); assertEquals("test3",
- * diffSchema.getRemovedFields().get(0).getFieldName()); assertEquals(PhysicalOperatorIterator.NextOutcome.NONE_LEFT,
- * iterator.next()); }
- *
- * @Test public void testScanJsonModifiedOneFieldType() throws IOException { ScanJson sj = new
- * ScanJson(getResource("scan_json_test_6.json")); PhysicalOperatorIterator iterator = sj.getIterator();
- * expectSchemaChanged(iterator); DiffSchema diffSchema = expectSchemaChanged(iterator).getSchemaChanges();
- * List<Field> addedFields = diffSchema.getAddedFields(); assertEquals(4, addedFields.size()); List<Field>
- * removedFields = diffSchema.getRemovedFields(); assertEquals(4, removedFields.size()); assertFieldExists("test",
- * Field.FieldType.STRING, addedFields); assertFieldExists("test2", Field.FieldType.BOOLEAN, addedFields);
- * assertFieldExists("b", Field.FieldType.ARRAY, addedFields); assertFieldExists("[0]", Field.FieldType.INTEGER,
- * addedFields); assertFieldExists("test", Field.FieldType.INTEGER, removedFields); assertFieldExists("test2",
- * Field.FieldType.ARRAY, removedFields); assertFieldExists("b", Field.FieldType.INTEGER, removedFields);
- * assertFieldExists("[0]", Field.FieldType.INTEGER, removedFields);
- * assertEquals(PhysicalOperatorIterator.NextOutcome.NONE_LEFT, iterator.next()); }
- *
- * private void expectSchemaChanged(PhysicalOperatorIterator iterator) throws IOException { }
- *
- * private void expectDataRecord(PhysicalOperatorIterator iterator) throws IOException { }
- */
}