You are viewing a plain text version of this content. The canonical link for it is here (the hyperlink was not preserved in this plain-text copy).
Posted to dev@drill.apache.org by ja...@apache.org on 2013/08/16 03:44:42 UTC

[04/27] git commit: Fix late type binding for json record reader

Fix late type binding for json record reader


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/b1e48b32
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/b1e48b32
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/b1e48b32

Branch: refs/heads/master
Commit: b1e48b32e3bc5e240a01d75f83ac5d2be4b2e7ae
Parents: a15f5b1
Author: Timothy Chen <tn...@gmail.com>
Authored: Sun Aug 11 11:55:24 2013 -0700
Committer: Timothy Chen <tn...@gmail.com>
Committed: Sun Aug 11 11:55:24 2013 -0700

----------------------------------------------------------------------
 .../org/apache/drill/common/types/Types.java    |  27 ++++-
 .../org/apache/drill/exec/schema/Field.java     | 111 +++++++++++--------
 .../exec/schema/json/jackson/JacksonHelper.java |   1 +
 .../drill/exec/store/JSONRecordReader.java      |  49 +++++---
 .../drill/exec/store/JSONRecordReaderTest.java  |  41 +++++--
 .../src/test/resources/scan_json_test_5.json    |  33 +++---
 6 files changed, 170 insertions(+), 92 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b1e48b32/sandbox/prototype/common/src/main/java/org/apache/drill/common/types/Types.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/common/src/main/java/org/apache/drill/common/types/Types.java b/sandbox/prototype/common/src/main/java/org/apache/drill/common/types/Types.java
index e81bc89..f07f726 100644
--- a/sandbox/prototype/common/src/main/java/org/apache/drill/common/types/Types.java
+++ b/sandbox/prototype/common/src/main/java/org/apache/drill/common/types/Types.java
@@ -4,6 +4,8 @@ import org.apache.drill.common.types.TypeProtos.DataMode;
 import org.apache.drill.common.types.TypeProtos.MajorType;
 import org.apache.drill.common.types.TypeProtos.MinorType;
 
+import static org.apache.drill.common.types.TypeProtos.DataMode.REPEATED;
+
 public class Types {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(Types.class);
   
@@ -16,7 +18,7 @@ public class Types {
   }
   
   public static boolean isNumericType(MajorType type){
-    if(type.getMode() == DataMode.REPEATED) return false;
+    if(type.getMode() == REPEATED) return false;
     
     switch(type.getMinorType()){
     case BIGINT:
@@ -40,7 +42,7 @@ public class Types {
   }
   
   public static boolean usesHolderForGet(MajorType type){
-    if(type.getMode() == DataMode.REPEATED) return true;
+    if(type.getMode() == REPEATED) return true;
     switch(type.getMinorType()){
     case BIGINT:
     case DECIMAL4:
@@ -76,7 +78,7 @@ public class Types {
   
   
   public static boolean isStringScalarType(MajorType type){
-    if(type.getMode() == DataMode.REPEATED) return false;
+    if(type.getMode() == REPEATED) return false;
     switch(type.getMinorType()){
     case FIXEDCHAR:
     case FIXED16CHAR:
@@ -89,7 +91,7 @@ public class Types {
   }
   
   public static boolean isBytesScalarType(MajorType type){
-    if(type.getMode() == DataMode.REPEATED) return false;
+    if(type.getMode() == REPEATED) return false;
     switch(type.getMinorType()){
     case FIXEDBINARY:
     case VARBINARY:
@@ -100,7 +102,7 @@ public class Types {
   }
   
   public static Comparability getComparability(MajorType type){
-    if(type.getMode() == DataMode.REPEATED) return Comparability.NONE;
+    if(type.getMode() == REPEATED) return Comparability.NONE;
     if(type.getMinorType() == MinorType.LATE) return Comparability.UNKNOWN;
     
     switch(type.getMinorType()){
@@ -144,12 +146,25 @@ public class Types {
   }
   
   public static MajorType repeated(MinorType type){
-    return MajorType.newBuilder().setMode(DataMode.REPEATED).setMinorType(type).build();
+    return MajorType.newBuilder().setMode(REPEATED).setMinorType(type).build();
   }
   
   public static MajorType optional(MinorType type){
     return MajorType.newBuilder().setMode(DataMode.OPTIONAL).setMinorType(type).build();
   }
+
+  public static MajorType overrideMinorType(MajorType originalMajorType, MinorType overrideMinorType) {
+    switch(originalMajorType.getMode()) {
+      case REPEATED:
+        return repeated(overrideMinorType);
+      case OPTIONAL:
+        return optional(overrideMinorType);
+      case REQUIRED:
+        return required(overrideMinorType);
+      default:
+        throw new UnsupportedOperationException();
+    }
+  }
   
   
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b1e48b32/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/Field.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/Field.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/Field.java
index 85bbdf3..080be92 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/Field.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/Field.java
@@ -20,6 +20,7 @@ package org.apache.drill.exec.schema;
 
 import org.apache.drill.common.expression.ExpressionPosition;
 import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.types.TypeProtos;
 import org.apache.drill.common.types.TypeProtos.MajorType;
 import org.apache.drill.exec.record.MaterializedField;
 
@@ -27,64 +28,80 @@ import com.google.common.base.Objects;
 import com.google.common.base.Strings;
 
 public abstract class Field {
-    final MajorType fieldType;
-    final String prefixFieldName;
-    RecordSchema schema;
-    RecordSchema parentSchema;
-    boolean read;
-
-    public Field(RecordSchema parentSchema, MajorType type, String prefixFieldName) {
-        fieldType = type;
-        this.prefixFieldName = prefixFieldName;
-        this.parentSchema = parentSchema;
+  final String prefixFieldName;
+  MajorType fieldType;
+  RecordSchema schema;
+  RecordSchema parentSchema;
+  boolean read;
+
+  public Field(RecordSchema parentSchema, MajorType type, String prefixFieldName) {
+    fieldType = type;
+    this.prefixFieldName = prefixFieldName;
+    this.parentSchema = parentSchema;
+  }
+
+  public MaterializedField getAsMaterializedField() {
+    return MaterializedField.create(new SchemaPath(getFieldName(), ExpressionPosition.UNKNOWN), fieldType);
+  }
+
+  public abstract String getFieldName();
+
+  public String getFullFieldName() {
+    String fieldName = getFieldName();
+    if(Strings.isNullOrEmpty(prefixFieldName)) {
+      return fieldName;
+    } else if(Strings.isNullOrEmpty(fieldName)) {
+      return prefixFieldName;
+    } else {
+      return prefixFieldName + "." + getFieldName();
     }
+  }
 
-    public MaterializedField getAsMaterializedField(){
-      return MaterializedField.create(new SchemaPath(getFieldName(), ExpressionPosition.UNKNOWN), fieldType);
-    }
-    
-    public abstract String getFieldName();
+  public void setRead(boolean read) {
+    this.read = read;
+  }
 
-    public String getFullFieldName() {
-        return Strings.isNullOrEmpty(prefixFieldName) ? getFieldName() : prefixFieldName + "." + getFieldName();
-    }
+  protected abstract Objects.ToStringHelper addAttributesToHelper(Objects.ToStringHelper helper);
 
-    public void setRead(boolean read) {
-        this.read = read;
-    }
+  Objects.ToStringHelper getAttributesStringHelper() {
+    return Objects.toStringHelper(this).add("type", fieldType)
+        .add("fullFieldName", getFullFieldName())
+        .add("schema", schema == null ? null : schema.toSchemaString()).omitNullValues();
+  }
 
-    protected abstract Objects.ToStringHelper addAttributesToHelper(Objects.ToStringHelper helper);
+  @Override
+  public String toString() {
+    return addAttributesToHelper(getAttributesStringHelper()).toString();
+  }
 
-    Objects.ToStringHelper getAttributesStringHelper() {
-        return Objects.toStringHelper(this).add("type", fieldType)
-                .add("fullFieldName", getFullFieldName())
-                .add("schema", schema == null ? null : schema.toSchemaString()).omitNullValues();
-    }
+  public RecordSchema getAssignedSchema() {
+    return schema;
+  }
 
-    @Override
-    public String toString() {
-        return addAttributesToHelper(getAttributesStringHelper()).toString();
+  public void assignSchemaIfNull(RecordSchema newSchema) {
+    if (!hasSchema()) {
+      schema = newSchema;
     }
+  }
 
-    public RecordSchema getAssignedSchema() {
-        return schema;
-    }
+  public boolean isRead() {
+    return read;
+  }
 
-    public void assignSchemaIfNull(RecordSchema newSchema) {
-        if (!hasSchema()) {
-            schema = newSchema;
-        }
-    }
+  public boolean hasSchema() {
+    return schema != null;
+  }
 
-    public boolean isRead() {
-        return read;
-    }
+  public MajorType getFieldType() {
+    return fieldType;
+  }
 
-    public boolean hasSchema() {
-        return schema != null;
-    }
+  public void setFieldType(MajorType fieldType) {
+    this.fieldType = fieldType;
+  }
 
-    public MajorType getFieldType() {
-        return fieldType;
-    }
+  @Override
+  public int hashCode() {
+    return getFullFieldName().hashCode();
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b1e48b32/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/json/jackson/JacksonHelper.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/json/jackson/JacksonHelper.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/json/jackson/JacksonHelper.java
index d8f0646..22167b1 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/json/jackson/JacksonHelper.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/json/jackson/JacksonHelper.java
@@ -92,6 +92,7 @@ public class JacksonHelper {
       case BIT:
         return parser.getBooleanValue();
       case LATE:
+      case NULL:
         return null;
       default:
         throw new RuntimeException("Unexpected Field type to return value: " + fieldType.toString());

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b1e48b32/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/JSONRecordReader.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/JSONRecordReader.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/JSONRecordReader.java
index a4887c0..21b8c1b 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/JSONRecordReader.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/JSONRecordReader.java
@@ -16,6 +16,7 @@ import org.apache.drill.common.expression.ExpressionPosition;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.types.TypeProtos.MajorType;
 import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.common.types.Types;
 import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.ops.FragmentContext;
@@ -42,7 +43,7 @@ public class JSONRecordReader implements RecordReader {
 
   private final String inputPath;
 
-  private final Map<Field, VectorHolder> valueVectorMap;
+  private final Map<String, VectorHolder> valueVectorMap;
 
   private JsonParser parser;
   private SchemaIdGenerator generator;
@@ -181,13 +182,11 @@ public class JSONRecordReader implements RecordReader {
       @Override
       public Field createField(RecordSchema parentSchema, String prefixFieldName, String fieldName, MajorType fieldType, int index) {
         return new NamedField(parentSchema, prefixFieldName, fieldName, fieldType);
-        //return new OrderedField(parentSchema, fieldType, prefixFieldName, index);
       }
 
       @Override
       public RecordSchema createSchema() throws IOException {
         return new ObjectSchema();
-        //return new ListSchema();
       }
     },
     OBJECT(END_OBJECT) {
@@ -287,18 +286,30 @@ public class JSONRecordReader implements RecordReader {
                                int colIndex,
                                int groupCount) throws IOException, SchemaChangeException {
       RecordSchema currentSchema = reader.getCurrentSchema();
-      Field field = currentSchema.getField(fieldName, colIndex);
+      Field field = currentSchema.getField(fieldName == null ? prefixFieldName : fieldName, colIndex);
       boolean isFieldFound = field != null;
       List<Field> removedFields = reader.getRemovedFields();
-      if (!isFieldFound || !field.getFieldType().equals(fieldType)) {
-        if (isFieldFound) {
+      boolean newFieldLateBound = fieldType.getMinorType().equals(MinorType.LATE);
+
+      if (isFieldFound && !field.getFieldType().equals(fieldType)) {
+        boolean existingFieldLateBound = field.getFieldType().getMinorType().equals(MinorType.LATE);
+
+        if (newFieldLateBound && !existingFieldLateBound) {
+          fieldType = Types.overrideMinorType(fieldType, field.getFieldType().getMinorType());
+        } else if (!newFieldLateBound && existingFieldLateBound) {
+          field.setFieldType(Types.overrideMinorType(field.getFieldType(), fieldType.getMinorType()));
+        } else if (!newFieldLateBound && !existingFieldLateBound) {
           if (field.hasSchema()) {
             removeChildFields(removedFields, field);
           }
           removedFields.add(field);
           currentSchema.removeField(field, colIndex);
+
+          isFieldFound = false;
         }
+      }
 
+      if (!isFieldFound) {
         field = createField(
             currentSchema,
             prefixFieldName,
@@ -316,16 +327,19 @@ public class JSONRecordReader implements RecordReader {
       VectorHolder holder = getOrCreateVectorHolder(reader, field);
       if (readType != null) {
         RecordSchema fieldSchema = field.getAssignedSchema();
-        reader.setCurrentSchema(fieldSchema);
-
         RecordSchema newSchema = readType.createSchema();
-        field.assignSchemaIfNull(newSchema);
 
-        if (fieldSchema == null) reader.setCurrentSchema(newSchema);
-        readType.readRecord(reader, field.getFullFieldName(), rowIndex, groupCount);
+        if (readType != ReadType.ARRAY) {
+          reader.setCurrentSchema(fieldSchema);
+          if (fieldSchema == null) reader.setCurrentSchema(newSchema);
+          readType.readRecord(reader, field.getFullFieldName(), rowIndex, groupCount);
+        } else {
+          readType.readRecord(reader, field.getFullFieldName(), rowIndex, groupCount);
+        }
 
         reader.setCurrentSchema(currentSchema);
-      } else {
+
+      } else if (holder != null && !newFieldLateBound && fieldType.getMinorType() != MinorType.LATE) {
         return addValueToVector(
             rowIndex,
             holder,
@@ -447,22 +461,23 @@ public class JSONRecordReader implements RecordReader {
   }
 
   private VectorHolder getOrCreateVectorHolder(Field field) throws SchemaChangeException {
-    VectorHolder holder = valueVectorMap.get(field);
+    String fullFieldName = field.getFullFieldName();
+    VectorHolder holder = valueVectorMap.get(fullFieldName);
 
     if (holder == null) {
       MajorType type = field.getFieldType();
-      MaterializedField f = MaterializedField.create(new SchemaPath(field.getFullFieldName(), ExpressionPosition.UNKNOWN), type);
-
-      MinorType minorType = f.getType().getMinorType();
+      MinorType minorType = type.getMinorType();
 
       if (minorType.equals(MinorType.MAP) || minorType.equals(MinorType.LATE)) {
         return null;
       }
 
+      MaterializedField f = MaterializedField.create(new SchemaPath(fullFieldName, ExpressionPosition.UNKNOWN), type);
+
       ValueVector v = TypeHelper.getNewVector(f, allocator);
       AllocationHelper.allocate(v, batchSize, 50);
       holder = new VectorHolder(batchSize, v);
-      valueVectorMap.put(field, holder);
+      valueVectorMap.put(fullFieldName, holder);
       outputMutator.addField(v);
       return holder;
     }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b1e48b32/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/JSONRecordReaderTest.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/JSONRecordReaderTest.java b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/JSONRecordReaderTest.java
index b39ac8a..6b353ae 100644
--- a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/JSONRecordReaderTest.java
+++ b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/JSONRecordReaderTest.java
@@ -70,10 +70,6 @@ public class JSONRecordReaderTest {
     assertEquals(expectedMinorType, def.getMajorType().getMinorType());
     String[] parts = name.split("\\.");
     int expected = parts.length;
-    boolean expectingArray = List.class.isAssignableFrom(value.getClass());
-    if (expectingArray) {
-      expected += 1;
-    }
     assertEquals(expected, def.getNameList().size());
     for(int i = 0; i < parts.length; ++i) {
       assertEquals(parts[i], def.getName(i).getName());
@@ -203,12 +199,12 @@ public class JSONRecordReaderTest {
     assertEquals("c", removedFields.get(0).getName());
     removedFields.clear();
     assertEquals(1, jr.next());
-    assertEquals(8, addFields.size()); // The reappearing of field 'c' is also included
+    assertEquals(7, addFields.size()); // The reappearing of field 'c' is also included
     assertField(addFields.get(0), 0, MinorType.INT, 12345, "test");
     assertField(addFields.get(3), 0, MinorType.BIT, true, "bool");
     assertField(addFields.get(5), 0, MinorType.INT, 6, "d");
-    assertField(addFields.get(6), 0, MinorType.FLOAT4, (float) 5.16, "c");
-    assertField(addFields.get(7), 0, MinorType.VARCHAR, "test3".getBytes(UTF_8), "str2");
+    assertField(addFields.get(2), 0, MinorType.FLOAT4, (float) 5.16, "c");
+    assertField(addFields.get(6), 0, MinorType.VARCHAR, "test3".getBytes(UTF_8), "str2");
     assertEquals(2, removedFields.size());
     Iterables.find(removedFields, new Predicate<MaterializedField>() {
       @Override
@@ -282,4 +278,35 @@ public class JSONRecordReaderTest {
     assertEquals(0, jr.next());
     assertTrue(mutator.getRemovedFields().isEmpty());
   }
+
+  @Test
+  public void testRepeatedMissingFields(@Injectable final FragmentContext context) throws ExecutionSetupException {
+    new Expectations() {
+      {
+        context.getAllocator();
+        returns(new DirectBufferAllocator());
+      }
+    };
+
+    JSONRecordReader jr = new JSONRecordReader(context, getResource("scan_json_test_5.json"));
+
+    MockOutputMutator mutator = new MockOutputMutator();
+    List<ValueVector> addFields = mutator.getAddFields();
+    jr.setup(mutator);
+    assertEquals(9, jr.next());
+    assertEquals(1, addFields.size());
+    assertField(addFields.get(0), 0, MinorType.INT, Arrays.<Integer>asList(), "test");
+    assertField(addFields.get(0), 1, MinorType.INT, Arrays.asList(1, 2, 3), "test");
+    assertField(addFields.get(0), 2, MinorType.INT, Arrays.<Integer>asList(), "test");
+    assertField(addFields.get(0), 3, MinorType.INT, Arrays.<Integer>asList(), "test");
+    assertField(addFields.get(0), 4, MinorType.INT, Arrays.asList(4, 5, 6), "test");
+    assertField(addFields.get(0), 5, MinorType.INT, Arrays.<Integer>asList(), "test");
+    assertField(addFields.get(0), 6, MinorType.INT, Arrays.<Integer>asList(), "test");
+    assertField(addFields.get(0), 7, MinorType.INT, Arrays.asList(7, 8, 9), "test");
+    assertField(addFields.get(0), 8, MinorType.INT, Arrays.<Integer>asList(), "test");
+
+
+    assertEquals(0, jr.next());
+    assertTrue(mutator.getRemovedFields().isEmpty());
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b1e48b32/sandbox/prototype/exec/java-exec/src/test/resources/scan_json_test_5.json
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/resources/scan_json_test_5.json b/sandbox/prototype/exec/java-exec/src/test/resources/scan_json_test_5.json
index ae1aaf2..4977c60 100644
--- a/sandbox/prototype/exec/java-exec/src/test/resources/scan_json_test_5.json
+++ b/sandbox/prototype/exec/java-exec/src/test/resources/scan_json_test_5.json
@@ -1,21 +1,24 @@
 {
-    "test": 123,
-    "test2": [1,2,3],
-    "a": {
-    	 "b": 1
-    }
+    "test": []
 }
 {
-    "test": 1234,
-    "test3": false,
-    "a": {
-    	 "b": 2
-    }
+    "test": [1,2,3]
+}
+{
+    "test": []
+}
+{
+    "test": null
+}
+{
+    "test": [4,5,6]
+}
+{
+}
+{
+}
+{
+    "test": [7,8,9]
 }
 {
-    "test": 1234,
-    "test2": 1.5,
-    "a": {
-    	 "b": 2
-    }
 }
\ No newline at end of file