You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@drill.apache.org by ja...@apache.org on 2013/08/16 03:44:42 UTC
[04/27] git commit: Fix late type binding for JSON record reader
Fix late type binding for JSON record reader
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/b1e48b32
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/b1e48b32
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/b1e48b32
Branch: refs/heads/master
Commit: b1e48b32e3bc5e240a01d75f83ac5d2be4b2e7ae
Parents: a15f5b1
Author: Timothy Chen <tn...@gmail.com>
Authored: Sun Aug 11 11:55:24 2013 -0700
Committer: Timothy Chen <tn...@gmail.com>
Committed: Sun Aug 11 11:55:24 2013 -0700
----------------------------------------------------------------------
.../org/apache/drill/common/types/Types.java | 27 ++++-
.../org/apache/drill/exec/schema/Field.java | 111 +++++++++++--------
.../exec/schema/json/jackson/JacksonHelper.java | 1 +
.../drill/exec/store/JSONRecordReader.java | 49 +++++---
.../drill/exec/store/JSONRecordReaderTest.java | 41 +++++--
.../src/test/resources/scan_json_test_5.json | 33 +++---
6 files changed, 170 insertions(+), 92 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b1e48b32/sandbox/prototype/common/src/main/java/org/apache/drill/common/types/Types.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/common/src/main/java/org/apache/drill/common/types/Types.java b/sandbox/prototype/common/src/main/java/org/apache/drill/common/types/Types.java
index e81bc89..f07f726 100644
--- a/sandbox/prototype/common/src/main/java/org/apache/drill/common/types/Types.java
+++ b/sandbox/prototype/common/src/main/java/org/apache/drill/common/types/Types.java
@@ -4,6 +4,8 @@ import org.apache.drill.common.types.TypeProtos.DataMode;
import org.apache.drill.common.types.TypeProtos.MajorType;
import org.apache.drill.common.types.TypeProtos.MinorType;
+import static org.apache.drill.common.types.TypeProtos.DataMode.REPEATED;
+
public class Types {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(Types.class);
@@ -16,7 +18,7 @@ public class Types {
}
public static boolean isNumericType(MajorType type){
- if(type.getMode() == DataMode.REPEATED) return false;
+ if(type.getMode() == REPEATED) return false;
switch(type.getMinorType()){
case BIGINT:
@@ -40,7 +42,7 @@ public class Types {
}
public static boolean usesHolderForGet(MajorType type){
- if(type.getMode() == DataMode.REPEATED) return true;
+ if(type.getMode() == REPEATED) return true;
switch(type.getMinorType()){
case BIGINT:
case DECIMAL4:
@@ -76,7 +78,7 @@ public class Types {
public static boolean isStringScalarType(MajorType type){
- if(type.getMode() == DataMode.REPEATED) return false;
+ if(type.getMode() == REPEATED) return false;
switch(type.getMinorType()){
case FIXEDCHAR:
case FIXED16CHAR:
@@ -89,7 +91,7 @@ public class Types {
}
public static boolean isBytesScalarType(MajorType type){
- if(type.getMode() == DataMode.REPEATED) return false;
+ if(type.getMode() == REPEATED) return false;
switch(type.getMinorType()){
case FIXEDBINARY:
case VARBINARY:
@@ -100,7 +102,7 @@ public class Types {
}
public static Comparability getComparability(MajorType type){
- if(type.getMode() == DataMode.REPEATED) return Comparability.NONE;
+ if(type.getMode() == REPEATED) return Comparability.NONE;
if(type.getMinorType() == MinorType.LATE) return Comparability.UNKNOWN;
switch(type.getMinorType()){
@@ -144,12 +146,25 @@ public class Types {
}
public static MajorType repeated(MinorType type){
- return MajorType.newBuilder().setMode(DataMode.REPEATED).setMinorType(type).build();
+ return MajorType.newBuilder().setMode(REPEATED).setMinorType(type).build();
}
public static MajorType optional(MinorType type){
return MajorType.newBuilder().setMode(DataMode.OPTIONAL).setMinorType(type).build();
}
+
+ public static MajorType overrideMinorType(MajorType originalMajorType, MinorType overrideMinorType) {
+ switch(originalMajorType.getMode()) {
+ case REPEATED:
+ return repeated(overrideMinorType);
+ case OPTIONAL:
+ return optional(overrideMinorType);
+ case REQUIRED:
+ return required(overrideMinorType);
+ default:
+ throw new UnsupportedOperationException();
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b1e48b32/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/Field.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/Field.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/Field.java
index 85bbdf3..080be92 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/Field.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/Field.java
@@ -20,6 +20,7 @@ package org.apache.drill.exec.schema;
import org.apache.drill.common.expression.ExpressionPosition;
import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.types.TypeProtos;
import org.apache.drill.common.types.TypeProtos.MajorType;
import org.apache.drill.exec.record.MaterializedField;
@@ -27,64 +28,80 @@ import com.google.common.base.Objects;
import com.google.common.base.Strings;
public abstract class Field {
- final MajorType fieldType;
- final String prefixFieldName;
- RecordSchema schema;
- RecordSchema parentSchema;
- boolean read;
-
- public Field(RecordSchema parentSchema, MajorType type, String prefixFieldName) {
- fieldType = type;
- this.prefixFieldName = prefixFieldName;
- this.parentSchema = parentSchema;
+ final String prefixFieldName;
+ MajorType fieldType;
+ RecordSchema schema;
+ RecordSchema parentSchema;
+ boolean read;
+
+ public Field(RecordSchema parentSchema, MajorType type, String prefixFieldName) {
+ fieldType = type;
+ this.prefixFieldName = prefixFieldName;
+ this.parentSchema = parentSchema;
+ }
+
+ public MaterializedField getAsMaterializedField() {
+ return MaterializedField.create(new SchemaPath(getFieldName(), ExpressionPosition.UNKNOWN), fieldType);
+ }
+
+ public abstract String getFieldName();
+
+ public String getFullFieldName() {
+ String fieldName = getFieldName();
+ if(Strings.isNullOrEmpty(prefixFieldName)) {
+ return fieldName;
+ } else if(Strings.isNullOrEmpty(fieldName)) {
+ return prefixFieldName;
+ } else {
+ return prefixFieldName + "." + getFieldName();
}
+ }
- public MaterializedField getAsMaterializedField(){
- return MaterializedField.create(new SchemaPath(getFieldName(), ExpressionPosition.UNKNOWN), fieldType);
- }
-
- public abstract String getFieldName();
+ public void setRead(boolean read) {
+ this.read = read;
+ }
- public String getFullFieldName() {
- return Strings.isNullOrEmpty(prefixFieldName) ? getFieldName() : prefixFieldName + "." + getFieldName();
- }
+ protected abstract Objects.ToStringHelper addAttributesToHelper(Objects.ToStringHelper helper);
- public void setRead(boolean read) {
- this.read = read;
- }
+ Objects.ToStringHelper getAttributesStringHelper() {
+ return Objects.toStringHelper(this).add("type", fieldType)
+ .add("fullFieldName", getFullFieldName())
+ .add("schema", schema == null ? null : schema.toSchemaString()).omitNullValues();
+ }
- protected abstract Objects.ToStringHelper addAttributesToHelper(Objects.ToStringHelper helper);
+ @Override
+ public String toString() {
+ return addAttributesToHelper(getAttributesStringHelper()).toString();
+ }
- Objects.ToStringHelper getAttributesStringHelper() {
- return Objects.toStringHelper(this).add("type", fieldType)
- .add("fullFieldName", getFullFieldName())
- .add("schema", schema == null ? null : schema.toSchemaString()).omitNullValues();
- }
+ public RecordSchema getAssignedSchema() {
+ return schema;
+ }
- @Override
- public String toString() {
- return addAttributesToHelper(getAttributesStringHelper()).toString();
+ public void assignSchemaIfNull(RecordSchema newSchema) {
+ if (!hasSchema()) {
+ schema = newSchema;
}
+ }
- public RecordSchema getAssignedSchema() {
- return schema;
- }
+ public boolean isRead() {
+ return read;
+ }
- public void assignSchemaIfNull(RecordSchema newSchema) {
- if (!hasSchema()) {
- schema = newSchema;
- }
- }
+ public boolean hasSchema() {
+ return schema != null;
+ }
- public boolean isRead() {
- return read;
- }
+ public MajorType getFieldType() {
+ return fieldType;
+ }
- public boolean hasSchema() {
- return schema != null;
- }
+ public void setFieldType(MajorType fieldType) {
+ this.fieldType = fieldType;
+ }
- public MajorType getFieldType() {
- return fieldType;
- }
+ @Override
+ public int hashCode() {
+ return getFullFieldName().hashCode();
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b1e48b32/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/json/jackson/JacksonHelper.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/json/jackson/JacksonHelper.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/json/jackson/JacksonHelper.java
index d8f0646..22167b1 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/json/jackson/JacksonHelper.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/json/jackson/JacksonHelper.java
@@ -92,6 +92,7 @@ public class JacksonHelper {
case BIT:
return parser.getBooleanValue();
case LATE:
+ case NULL:
return null;
default:
throw new RuntimeException("Unexpected Field type to return value: " + fieldType.toString());
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b1e48b32/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/JSONRecordReader.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/JSONRecordReader.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/JSONRecordReader.java
index a4887c0..21b8c1b 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/JSONRecordReader.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/JSONRecordReader.java
@@ -16,6 +16,7 @@ import org.apache.drill.common.expression.ExpressionPosition;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.types.TypeProtos.MajorType;
import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.common.types.Types;
import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.ops.FragmentContext;
@@ -42,7 +43,7 @@ public class JSONRecordReader implements RecordReader {
private final String inputPath;
- private final Map<Field, VectorHolder> valueVectorMap;
+ private final Map<String, VectorHolder> valueVectorMap;
private JsonParser parser;
private SchemaIdGenerator generator;
@@ -181,13 +182,11 @@ public class JSONRecordReader implements RecordReader {
@Override
public Field createField(RecordSchema parentSchema, String prefixFieldName, String fieldName, MajorType fieldType, int index) {
return new NamedField(parentSchema, prefixFieldName, fieldName, fieldType);
- //return new OrderedField(parentSchema, fieldType, prefixFieldName, index);
}
@Override
public RecordSchema createSchema() throws IOException {
return new ObjectSchema();
- //return new ListSchema();
}
},
OBJECT(END_OBJECT) {
@@ -287,18 +286,30 @@ public class JSONRecordReader implements RecordReader {
int colIndex,
int groupCount) throws IOException, SchemaChangeException {
RecordSchema currentSchema = reader.getCurrentSchema();
- Field field = currentSchema.getField(fieldName, colIndex);
+ Field field = currentSchema.getField(fieldName == null ? prefixFieldName : fieldName, colIndex);
boolean isFieldFound = field != null;
List<Field> removedFields = reader.getRemovedFields();
- if (!isFieldFound || !field.getFieldType().equals(fieldType)) {
- if (isFieldFound) {
+ boolean newFieldLateBound = fieldType.getMinorType().equals(MinorType.LATE);
+
+ if (isFieldFound && !field.getFieldType().equals(fieldType)) {
+ boolean existingFieldLateBound = field.getFieldType().getMinorType().equals(MinorType.LATE);
+
+ if (newFieldLateBound && !existingFieldLateBound) {
+ fieldType = Types.overrideMinorType(fieldType, field.getFieldType().getMinorType());
+ } else if (!newFieldLateBound && existingFieldLateBound) {
+ field.setFieldType(Types.overrideMinorType(field.getFieldType(), fieldType.getMinorType()));
+ } else if (!newFieldLateBound && !existingFieldLateBound) {
if (field.hasSchema()) {
removeChildFields(removedFields, field);
}
removedFields.add(field);
currentSchema.removeField(field, colIndex);
+
+ isFieldFound = false;
}
+ }
+ if (!isFieldFound) {
field = createField(
currentSchema,
prefixFieldName,
@@ -316,16 +327,19 @@ public class JSONRecordReader implements RecordReader {
VectorHolder holder = getOrCreateVectorHolder(reader, field);
if (readType != null) {
RecordSchema fieldSchema = field.getAssignedSchema();
- reader.setCurrentSchema(fieldSchema);
-
RecordSchema newSchema = readType.createSchema();
- field.assignSchemaIfNull(newSchema);
- if (fieldSchema == null) reader.setCurrentSchema(newSchema);
- readType.readRecord(reader, field.getFullFieldName(), rowIndex, groupCount);
+ if (readType != ReadType.ARRAY) {
+ reader.setCurrentSchema(fieldSchema);
+ if (fieldSchema == null) reader.setCurrentSchema(newSchema);
+ readType.readRecord(reader, field.getFullFieldName(), rowIndex, groupCount);
+ } else {
+ readType.readRecord(reader, field.getFullFieldName(), rowIndex, groupCount);
+ }
reader.setCurrentSchema(currentSchema);
- } else {
+
+ } else if (holder != null && !newFieldLateBound && fieldType.getMinorType() != MinorType.LATE) {
return addValueToVector(
rowIndex,
holder,
@@ -447,22 +461,23 @@ public class JSONRecordReader implements RecordReader {
}
private VectorHolder getOrCreateVectorHolder(Field field) throws SchemaChangeException {
- VectorHolder holder = valueVectorMap.get(field);
+ String fullFieldName = field.getFullFieldName();
+ VectorHolder holder = valueVectorMap.get(fullFieldName);
if (holder == null) {
MajorType type = field.getFieldType();
- MaterializedField f = MaterializedField.create(new SchemaPath(field.getFullFieldName(), ExpressionPosition.UNKNOWN), type);
-
- MinorType minorType = f.getType().getMinorType();
+ MinorType minorType = type.getMinorType();
if (minorType.equals(MinorType.MAP) || minorType.equals(MinorType.LATE)) {
return null;
}
+ MaterializedField f = MaterializedField.create(new SchemaPath(fullFieldName, ExpressionPosition.UNKNOWN), type);
+
ValueVector v = TypeHelper.getNewVector(f, allocator);
AllocationHelper.allocate(v, batchSize, 50);
holder = new VectorHolder(batchSize, v);
- valueVectorMap.put(field, holder);
+ valueVectorMap.put(fullFieldName, holder);
outputMutator.addField(v);
return holder;
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b1e48b32/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/JSONRecordReaderTest.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/JSONRecordReaderTest.java b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/JSONRecordReaderTest.java
index b39ac8a..6b353ae 100644
--- a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/JSONRecordReaderTest.java
+++ b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/JSONRecordReaderTest.java
@@ -70,10 +70,6 @@ public class JSONRecordReaderTest {
assertEquals(expectedMinorType, def.getMajorType().getMinorType());
String[] parts = name.split("\\.");
int expected = parts.length;
- boolean expectingArray = List.class.isAssignableFrom(value.getClass());
- if (expectingArray) {
- expected += 1;
- }
assertEquals(expected, def.getNameList().size());
for(int i = 0; i < parts.length; ++i) {
assertEquals(parts[i], def.getName(i).getName());
@@ -203,12 +199,12 @@ public class JSONRecordReaderTest {
assertEquals("c", removedFields.get(0).getName());
removedFields.clear();
assertEquals(1, jr.next());
- assertEquals(8, addFields.size()); // The reappearing of field 'c' is also included
+ assertEquals(7, addFields.size()); // The reappearing of field 'c' is also included
assertField(addFields.get(0), 0, MinorType.INT, 12345, "test");
assertField(addFields.get(3), 0, MinorType.BIT, true, "bool");
assertField(addFields.get(5), 0, MinorType.INT, 6, "d");
- assertField(addFields.get(6), 0, MinorType.FLOAT4, (float) 5.16, "c");
- assertField(addFields.get(7), 0, MinorType.VARCHAR, "test3".getBytes(UTF_8), "str2");
+ assertField(addFields.get(2), 0, MinorType.FLOAT4, (float) 5.16, "c");
+ assertField(addFields.get(6), 0, MinorType.VARCHAR, "test3".getBytes(UTF_8), "str2");
assertEquals(2, removedFields.size());
Iterables.find(removedFields, new Predicate<MaterializedField>() {
@Override
@@ -282,4 +278,35 @@ public class JSONRecordReaderTest {
assertEquals(0, jr.next());
assertTrue(mutator.getRemovedFields().isEmpty());
}
+
+ @Test
+ public void testRepeatedMissingFields(@Injectable final FragmentContext context) throws ExecutionSetupException {
+ new Expectations() {
+ {
+ context.getAllocator();
+ returns(new DirectBufferAllocator());
+ }
+ };
+
+ JSONRecordReader jr = new JSONRecordReader(context, getResource("scan_json_test_5.json"));
+
+ MockOutputMutator mutator = new MockOutputMutator();
+ List<ValueVector> addFields = mutator.getAddFields();
+ jr.setup(mutator);
+ assertEquals(9, jr.next());
+ assertEquals(1, addFields.size());
+ assertField(addFields.get(0), 0, MinorType.INT, Arrays.<Integer>asList(), "test");
+ assertField(addFields.get(0), 1, MinorType.INT, Arrays.asList(1, 2, 3), "test");
+ assertField(addFields.get(0), 2, MinorType.INT, Arrays.<Integer>asList(), "test");
+ assertField(addFields.get(0), 3, MinorType.INT, Arrays.<Integer>asList(), "test");
+ assertField(addFields.get(0), 4, MinorType.INT, Arrays.asList(4, 5, 6), "test");
+ assertField(addFields.get(0), 5, MinorType.INT, Arrays.<Integer>asList(), "test");
+ assertField(addFields.get(0), 6, MinorType.INT, Arrays.<Integer>asList(), "test");
+ assertField(addFields.get(0), 7, MinorType.INT, Arrays.asList(7, 8, 9), "test");
+ assertField(addFields.get(0), 8, MinorType.INT, Arrays.<Integer>asList(), "test");
+
+
+ assertEquals(0, jr.next());
+ assertTrue(mutator.getRemovedFields().isEmpty());
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b1e48b32/sandbox/prototype/exec/java-exec/src/test/resources/scan_json_test_5.json
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/resources/scan_json_test_5.json b/sandbox/prototype/exec/java-exec/src/test/resources/scan_json_test_5.json
index ae1aaf2..4977c60 100644
--- a/sandbox/prototype/exec/java-exec/src/test/resources/scan_json_test_5.json
+++ b/sandbox/prototype/exec/java-exec/src/test/resources/scan_json_test_5.json
@@ -1,21 +1,24 @@
{
- "test": 123,
- "test2": [1,2,3],
- "a": {
- "b": 1
- }
+ "test": []
}
{
- "test": 1234,
- "test3": false,
- "a": {
- "b": 2
- }
+ "test": [1,2,3]
+}
+{
+ "test": []
+}
+{
+ "test": null
+}
+{
+ "test": [4,5,6]
+}
+{
+}
+{
+}
+{
+ "test": [7,8,9]
}
{
- "test": 1234,
- "test2": 1.5,
- "a": {
- "b": 2
- }
}
\ No newline at end of file