Posted to commits@drill.apache.org by pa...@apache.org on 2014/11/27 18:32:03 UTC

[2/4] drill git commit: DRILL-1738: Allow case insensitive read in Complex parquet reader

DRILL-1738: Allow case insensitive read in Complex parquet reader


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/41e444dd
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/41e444dd
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/41e444dd

Branch: refs/heads/master
Commit: 41e444dd51217a0c4b417d3c0b2aed386a287696
Parents: 163917e
Author: Parth Chandra <pc...@maprtech.com>
Authored: Mon Nov 24 09:48:35 2014 -0800
Committer: Parth Chandra <pc...@maprtech.com>
Committed: Wed Nov 26 14:51:04 2014 -0800

----------------------------------------------------------------------
 .../parquet2/DrillParquetGroupConverter.java    |  59 +++++++++--
 .../exec/store/parquet2/DrillParquetReader.java | 103 ++++++++++++++-----
 .../DrillParquetRecordMaterializer.java         |   8 +-
 3 files changed, 134 insertions(+), 36 deletions(-)
----------------------------------------------------------------------
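The change, in short: the column names requested by the query are compared against the field names recorded in the Parquet file schema segment by segment, ignoring case, and the projected schema is then built with the file's own spelling of each name. A minimal standalone sketch of that comparison, using a hypothetical helper class rather than the actual Drill/Parquet types:

    // Hypothetical sketch, not part of the patch: compare a requested column
    // path against a column path taken from the file schema, segment by
    // segment and ignoring case.
    class CaseInsensitiveMatchSketch {
      static boolean matches(String[] requested, String[] schemaPath) {
        if (requested.length > schemaPath.length) {
          return false;   // the query asks for a deeper path than the file provides
        }
        for (int i = 0; i < requested.length; i++) {
          if (!requested[i].equalsIgnoreCase(schemaPath[i])) {
            return false; // names differ in more than case
          }
        }
        return true;      // e.g. {"mycol", "subfield"} matches {"MyCol", "SubField"}
      }
    }

With this rule, a query that spells the column as mycol.subfield can still read a file that declares MyCol.SubField.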


http://git-wip-us.apache.org/repos/asf/drill/blob/41e444dd/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetGroupConverter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetGroupConverter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetGroupConverter.java
index c6310b1..9bc4a1e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetGroupConverter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetGroupConverter.java
@@ -20,8 +20,13 @@ package org.apache.drill.exec.store.parquet2;
 import io.netty.buffer.DrillBuf;
 
 import java.math.BigDecimal;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
 import java.util.List;
 
+import org.apache.drill.common.expression.PathSegment;
+import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.exec.expr.holders.BigIntHolder;
 import org.apache.drill.exec.expr.holders.BitHolder;
 import org.apache.drill.exec.expr.holders.DateHolder;
@@ -76,35 +81,73 @@ public class DrillParquetGroupConverter extends GroupConverter {
   private MapWriter mapWriter;
   private final OutputMutator mutator;
 
-  public DrillParquetGroupConverter(OutputMutator mutator, ComplexWriterImpl complexWriter, MessageType schema) {
-    this(mutator, complexWriter.rootAsMap(), schema);
+  public DrillParquetGroupConverter(OutputMutator mutator, ComplexWriterImpl complexWriter, MessageType schema, Collection<SchemaPath> columns) {
+    this(mutator, complexWriter.rootAsMap(), schema, columns);
   }
 
-  public DrillParquetGroupConverter(OutputMutator mutator, MapWriter mapWriter, GroupType schema) {
+  // This function assumes that the fields in the schema parameter are in the same order as the fields in the columns parameter. The
+  // columns parameter may have fields that are not present in the schema, though.
+  public DrillParquetGroupConverter(OutputMutator mutator, MapWriter mapWriter, GroupType schema, Collection<SchemaPath> columns) {
     this.mapWriter = mapWriter;
     this.mutator = mutator;
     converters = Lists.newArrayList();
+
+    Iterator<SchemaPath> colIterator=columns.iterator();
+
     for (Type type : schema.getFields()) {
       Repetition rep = type.getRepetition();
       boolean isPrimitive = type.isPrimitive();
+
+      // Match the name of the field in the schema definition to the name of the field in the query.
+      String name = null;
+      SchemaPath col=null;
+      PathSegment colPath = null;
+      PathSegment colNextChild = null;
+      if(colIterator.hasNext()) {
+        col = colIterator.next();
+        colPath=col.getRootSegment();
+        colNextChild = colPath.getChild();
+      }
+      if( colPath!=null && colPath.isNamed() && (!colPath.getNameSegment().getPath().equals("*")) ){
+        name=colPath.getNameSegment().getPath();
+        // We may have a field that does not exist in the schema
+        if(!name.equalsIgnoreCase(type.getName())){
+          continue;
+        }
+      }else{
+        name=type.getName();
+      }
+
       if (!isPrimitive) {
+        Collection<SchemaPath> c = new ArrayList<SchemaPath>();
+
+        while(colNextChild!=null) {
+          if(colNextChild.isNamed()) {
+            break;
+          }
+          colNextChild=colNextChild.getChild();
+        }
+
+        if(colNextChild!=null) {
+          SchemaPath s = new SchemaPath(colNextChild.getNameSegment());
+          c.add(s);
+        }
         if (rep != Repetition.REPEATED) {
-          DrillParquetGroupConverter converter = new DrillParquetGroupConverter(mutator, mapWriter.map(type.getName()), type.asGroupType());
+          DrillParquetGroupConverter converter = new DrillParquetGroupConverter(mutator, mapWriter.map(name), type.asGroupType(), c);
           converters.add(converter);
         } else {
-          DrillParquetGroupConverter converter = new DrillParquetGroupConverter(mutator, mapWriter.list(type.getName()).map(), type.asGroupType());
+          DrillParquetGroupConverter converter = new DrillParquetGroupConverter(mutator, mapWriter.list(name).map(), type.asGroupType(), c);
           converters.add(converter);
         }
       } else {
-        PrimitiveConverter converter = getConverterForType(type.asPrimitiveType());
+        PrimitiveConverter converter = getConverterForType(name, type.asPrimitiveType());
         converters.add(converter);
       }
     }
   }
 
-  private PrimitiveConverter getConverterForType(PrimitiveType type) {
+  private PrimitiveConverter getConverterForType(String name, PrimitiveType type) {
 
-    String name = type.getName();
     switch(type.getPrimitiveTypeName()) {
       case INT32: {
         if (type.getOriginalType() == null) {
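
In the group converter above, each field of the file schema is now resolved against the projected column list: a concrete (non-*) projected name is compared with the schema field name via equalsIgnoreCase and the field is skipped when it does not match, while a * or absent projection falls back to the schema's own field name. A rough illustration of that resolution on plain strings, with a hypothetical helper in place of the SchemaPath/PathSegment handling, and assuming, as the comment in the patch notes, that the projected columns arrive in schema order:

    // Hypothetical sketch, not part of the patch: decide which name the map
    // writer should use for a schema field, given the projected column name
    // that lines up with it (or null when there is no projection left).
    class FieldNameResolutionSketch {
      static String resolve(String schemaFieldName, String projectedName) {
        if (projectedName == null || projectedName.equals("*")) {
          return schemaFieldName;     // no explicit projection: keep the file's name
        }
        if (projectedName.equalsIgnoreCase(schemaFieldName)) {
          return projectedName;       // matched: use the query's spelling
        }
        return null;                  // the projected column is a different field: skip it
      }
    }

The constructor in the patch additionally skips array-index segments of the projected path when it builds the child column list handed to nested map and list converters.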

http://git-wip-us.apache.org/repos/asf/drill/blob/41e444dd/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java
index c3e8330..4d223f0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java
@@ -44,6 +44,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 
+import parquet.column.ColumnDescriptor;
 import parquet.hadoop.CodecFactoryExposer;
 import parquet.hadoop.ColumnChunkIncReadStore;
 import parquet.hadoop.metadata.BlockMetaData;
@@ -56,9 +57,12 @@ import parquet.io.MessageColumnIO;
 import parquet.schema.GroupType;
 import parquet.schema.MessageType;
 import parquet.schema.Type;
+import parquet.schema.PrimitiveType;
+
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
+import parquet.schema.Types;
 
 public class DrillParquetReader extends AbstractRecordReader {
 
@@ -96,31 +100,51 @@ public class DrillParquetReader extends AbstractRecordReader {
 
   public static MessageType getProjection(MessageType schema, Collection<SchemaPath> columns) {
     MessageType projection = null;
+
+    String messageName = schema.getName();
+    List<ColumnDescriptor> schemaColumns = schema.getColumns();
+
     for (SchemaPath path : columns) {
-      List<String> segments = Lists.newArrayList();
-      PathSegment rootSegment = path.getRootSegment();
-      PathSegment seg = rootSegment;
-      String messageName = schema.getName();
-      while(seg != null){
-        if(seg.isNamed()) {
-          segments.add(seg.getNameSegment().getPath());
+      for (ColumnDescriptor colDesc: schemaColumns) {
+        String[] schemaColDesc = colDesc.getPath();
+        SchemaPath schemaPath = SchemaPath.getCompoundPath(schemaColDesc);
+
+        PathSegment schemaSeg = schemaPath.getRootSegment();
+        PathSegment colSeg = path.getRootSegment();
+        List<String> segments = Lists.newArrayList();
+        List<String> colSegments = Lists.newArrayList();
+        while(schemaSeg != null && colSeg != null){
+          if (colSeg.isNamed()) {
+            // DRILL-1739 - Use case insensitive name comparison
+            if(schemaSeg.getNameSegment().getPath().equalsIgnoreCase(colSeg.getNameSegment().getPath())) {
+              segments.add(schemaSeg.getNameSegment().getPath());
+              colSegments.add(colSeg.getNameSegment().getPath());
+            }else{
+              break;
+            }
+          }else{
+            colSeg=colSeg.getChild();
+            continue;
+          }
+          colSeg = colSeg.getChild();
+          schemaSeg = schemaSeg.getChild();
         }
-        seg = seg.getChild();
-      }
-      String[] pathSegments = new String[segments.size()];
-      segments.toArray(pathSegments);
-      Type type = null;
-      try {
-        type = schema.getType(pathSegments);
-      } catch (InvalidRecordException e) {
-        logger.warn("Invalid record" , e);
-      }
-      if (type != null) {
-        Type t = getType(pathSegments, 0, schema);
-        if (projection == null) {
-          projection = new MessageType(messageName, t);
-        } else {
-          projection = projection.union(new MessageType(messageName, t));
+        // Field exists in schema
+        if (!segments.isEmpty()) {
+          String[] pathSegments = new String[segments.size()];
+          segments.toArray(pathSegments);
+          String[] colPathSegments = new String[colSegments.size()];
+          colSegments.toArray(colPathSegments);
+
+          // Use the field names from the schema or we get an exception if the case of the name doesn't match
+          Type t = getType(colPathSegments, pathSegments, 0, schema);
+
+          if (projection == null) {
+            projection = new MessageType(messageName, t);
+          } else {
+            projection = projection.union(new MessageType(messageName, t));
+          }
+          break;
         }
       }
     }
@@ -184,7 +208,7 @@ public class DrillParquetReader extends AbstractRecordReader {
       }
 
       writer = new VectorContainerWriter(output);
-      recordMaterializer = new DrillParquetRecordMaterializer(output, writer, projection);
+      recordMaterializer = new DrillParquetRecordMaterializer(output, writer, projection, getColumns());
       primitiveVectors = writer.getMapVector().getPrimitiveVectors();
       recordReader = columnIO.getRecordReader(pageReadStore, recordMaterializer);
     } catch (Exception e) {
@@ -192,13 +216,29 @@ public class DrillParquetReader extends AbstractRecordReader {
     }
   }
 
-  private static Type getType(String[] pathSegments, int depth, MessageType schema) {
+  private static Type getType(String[] colSegs, String[] pathSegments, int depth, MessageType schema) {
     Type type = schema.getType(Arrays.copyOfRange(pathSegments, 0, depth + 1));
+    //String name = colSegs[depth]; // get the name from the column list not the schema
     if (depth + 1 == pathSegments.length) {
+      //type.name = colSegs[depth];
+      //Type newType = type;
+
+      //if(type.isPrimitive()){
+      //  //newType = new PrimitiveType(type.getRepetition(), type.asPrimitiveType().getPrimitiveTypeName(), name, type.getOriginalType());
+      //  Types.PrimitiveBuilder<PrimitiveType> builder = Types.primitive(
+      //    type.asPrimitiveType().getPrimitiveTypeName(), type.getRepetition());
+      //  if (PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY == type.asPrimitiveType().getPrimitiveTypeName()) {
+      //    builder.length(type.asPrimitiveType().getTypeLength());
+      //  }
+      //  newType = builder.named(name);
+      //}else{
+      //  //newType = new GroupType(type.getRepetition(), name, type);
+      //  newType = new GroupType(type.getRepetition(), name, type.asGroupType().getFields());
+      //}
       return type;
     } else {
       Preconditions.checkState(!type.isPrimitive());
-      return new GroupType(type.getRepetition(), type.getName(), getType(pathSegments, depth + 1, schema));
+      return new GroupType(type.getRepetition(), type.getName(), getType(colSegs, pathSegments, depth + 1, schema));
     }
   }
 
@@ -254,5 +294,16 @@ public class DrillParquetReader extends AbstractRecordReader {
     this.operatorContext = operatorContext;
   }
 
+  static public class ProjectedColumnType{
+    ProjectedColumnType(String projectedColumnName, MessageType type){
+      this.projectedColumnName=projectedColumnName;
+      this.type=type;
+    }
+
+    public String projectedColumnName;
+    public MessageType type;
+
+
+  }
 
 }
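
getProjection now walks every requested SchemaPath against each ColumnDescriptor path of the file schema and, on a case-insensitive match, keeps the segments in the file's own casing, so that the later schema.getType() lookup does not fail on a case mismatch; the per-column MessageTypes are then merged with union(). A simplified sketch of the matching step, operating on plain string arrays, ignoring array indexes, and using hypothetical names:

    import java.util.Arrays;
    import java.util.List;

    // Hypothetical sketch, not part of the patch: re-spell a requested path
    // in the file schema's casing, or return null when no schema column
    // starts with that path.
    class ProjectionCaseSketch {
      static String[] withSchemaCase(String[] requested, List<String[]> schemaColumns) {
        for (String[] schemaPath : schemaColumns) {
          if (requested.length > schemaPath.length) {
            continue;                 // the requested path cannot be a prefix of this column
          }
          boolean matches = true;
          for (int i = 0; i < requested.length; i++) {
            if (!requested[i].equalsIgnoreCase(schemaPath[i])) {
              matches = false;
              break;
            }
          }
          if (matches) {
            // keep the file's spelling, e.g. {"mycol"} becomes {"MyCol"}
            return Arrays.copyOfRange(schemaPath, 0, requested.length);
          }
        }
        return null;                  // the requested column does not exist in this file
      }
    }

The real method then builds the nested GroupType chain via getType() before taking the union; the sketch only covers the name matching.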

http://git-wip-us.apache.org/repos/asf/drill/blob/41e444dd/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetRecordMaterializer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetRecordMaterializer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetRecordMaterializer.java
index f9c3480..720e8be 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetRecordMaterializer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetRecordMaterializer.java
@@ -17,6 +17,7 @@
  */
 package org.apache.drill.exec.store.parquet2;
 
+import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.exec.physical.impl.OutputMutator;
 import org.apache.drill.exec.vector.complex.writer.BaseWriter.ComplexWriter;
 
@@ -24,14 +25,17 @@ import parquet.io.api.GroupConverter;
 import parquet.io.api.RecordMaterializer;
 import parquet.schema.MessageType;
 
+import java.util.Collection;
+import java.util.List;
+
 public class DrillParquetRecordMaterializer extends RecordMaterializer<Void> {
 
   public DrillParquetGroupConverter root;
   private ComplexWriter complexWriter;
 
-  public DrillParquetRecordMaterializer(OutputMutator mutator, ComplexWriter complexWriter, MessageType schema) {
+  public DrillParquetRecordMaterializer(OutputMutator mutator, ComplexWriter complexWriter, MessageType schema, Collection<SchemaPath> columns) {
     this.complexWriter = complexWriter;
-    root = new DrillParquetGroupConverter(mutator, complexWriter.rootAsMap(), schema);
+    root = new DrillParquetGroupConverter(mutator, complexWriter.rootAsMap(), schema, columns);
   }
 
   public void setPosition(int position) {