Posted to commits@drill.apache.org by pa...@apache.org on 2014/11/27 18:32:02 UTC

[1/4] drill git commit: DRILL-1739 Handle missing fields in complex parquet reader

Repository: drill
Updated Branches:
  refs/heads/master 163917ea1 -> 3119011df


DRILL-1739 Handle missing fields in complex parquet reader


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/cf9e71d6
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/cf9e71d6
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/cf9e71d6

Branch: refs/heads/master
Commit: cf9e71d654ddcc203900b4c3187e4bea0ba801d5
Parents: 41e444d
Author: Parth Chandra <pc...@maprtech.com>
Authored: Tue Nov 25 16:57:06 2014 -0800
Committer: Parth Chandra <pc...@maprtech.com>
Committed: Wed Nov 26 14:51:04 2014 -0800

----------------------------------------------------------------------
 .../parquet2/DrillParquetGroupConverter.java    |  23 ++--
 .../exec/store/parquet2/DrillParquetReader.java | 117 +++++++++++++------
 2 files changed, 93 insertions(+), 47 deletions(-)
----------------------------------------------------------------------
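In outline: when a projected column does not exist in the file, the reader now materializes it as an optional (nullable) vector and leaves it null-filled; when none of the projected columns exist in the file at all, next() skips Parquet entirely and just counts out null records against the row group's row count. Below is a minimal, self-contained sketch of that mock-read loop, assuming a plain long row count in place of the footer metadata (class and field names are illustrative, not Drill's):

public class MockReadSketch {
  // same default batch size the reader uses (32K records)
  private static final int DEFAULT_RECORDS_TO_READ = 32 * 1024;
  private long mockRecordsRead = 0;
  private final long rowGroupRowCount; // taken from row group metadata in the real reader

  public MockReadSketch(long rowGroupRowCount) {
    this.rowGroupRowCount = rowGroupRowCount;
  }

  // Returns the number of null records "read" in this batch, 0 once the
  // row group is exhausted.
  public int next() {
    if (mockRecordsRead == rowGroupRowCount) {
      return 0;
    }
    long recordsToRead = Math.min(DEFAULT_RECORDS_TO_READ, rowGroupRowCount - mockRecordsRead);
    // the real reader sets each null-filled vector's value count here;
    // the values themselves default to null, so nothing is copied
    mockRecordsRead += recordsToRead;
    return (int) recordsToRead;
  }

  public static void main(String[] args) {
    MockReadSketch reader = new MockReadSketch(100_000);
    long total = 0;
    for (int batch; (batch = reader.next()) > 0; ) {
      total += batch;
    }
    System.out.println(total); // prints 100000
  }
}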


http://git-wip-us.apache.org/repos/asf/drill/blob/cf9e71d6/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetGroupConverter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetGroupConverter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetGroupConverter.java
index 9bc4a1e..d219927 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetGroupConverter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetGroupConverter.java
@@ -103,19 +103,22 @@ public class DrillParquetGroupConverter extends GroupConverter {
       SchemaPath col=null;
       PathSegment colPath = null;
       PathSegment colNextChild = null;
-      if(colIterator.hasNext()) {
+      while (colIterator.hasNext()) {
         col = colIterator.next();
-        colPath=col.getRootSegment();
+        colPath = col.getRootSegment();
         colNextChild = colPath.getChild();
-      }
-      if( colPath!=null && colPath.isNamed() && (!colPath.getNameSegment().getPath().equals("*")) ){
-        name=colPath.getNameSegment().getPath();
-        // We may have a field that does not exist in the schema
-        if(!name.equalsIgnoreCase(type.getName())){
-          continue;
+
+        if (colPath != null && colPath.isNamed() && (!colPath.getNameSegment().getPath().equals("*"))) {
+          name = colPath.getNameSegment().getPath();
+          // We may have a field that does not exist in the schema
+          if (!name.equalsIgnoreCase(type.getName())) {
+            continue;
+          }
         }
-      }else{
-        name=type.getName();
+        break;
+      }
+      if (name == null) {
+        name = type.getName();
       }
 
       if (!isPrimitive) {
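The hunk above turns a single if into a while over the projected columns: the converter now skips past projected names that do not exist in the schema instead of mispairing them with the current field, and falls back to the schema's own field name when nothing matched (star queries, unnamed segments). A simplified standalone sketch of that matching loop, using plain strings in place of SchemaPath/PathSegment (the helper below is hypothetical):

import java.util.Arrays;
import java.util.Iterator;

class FieldNameMatchSketch {
  static String resolveName(Iterator<String> colIterator, String schemaFieldName) {
    String name = null;
    while (colIterator.hasNext()) {
      String col = colIterator.next();
      if (!col.equals("*")) {
        name = col;
        // the projection may name a field that does not exist in the schema;
        // skip it and try the next projected column
        if (!name.equalsIgnoreCase(schemaFieldName)) {
          continue;
        }
      }
      break;
    }
    // nothing matched by name (e.g. a star query): use the schema's own name
    return name == null ? schemaFieldName : name;
  }

  public static void main(String[] args) {
    Iterator<String> cols = Arrays.asList("absent_col", "MyField").iterator();
    System.out.println(resolveName(cols, "myfield")); // prints MyField
  }
}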

http://git-wip-us.apache.org/repos/asf/drill/blob/cf9e71d6/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java
index 4d223f0..4455c50 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java
@@ -19,25 +19,30 @@ package org.apache.drill.exec.store.parquet2;
 
 import java.io.IOException;
 import java.util.Arrays;
-import java.util.Collection;
-import java.util.HashMap;
 import java.util.List;
+import java.util.Collection;
+import java.util.ArrayList;
 import java.util.Map;
+import java.util.HashMap;
 
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.expression.PathSegment;
 import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.types.TypeProtos;
 import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.expr.TypeHelper;
 import org.apache.drill.exec.memory.OutOfMemoryException;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.ops.OperatorContext;
 import org.apache.drill.exec.physical.impl.OutputMutator;
+import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.record.MaterializedField.Key;
 import org.apache.drill.exec.store.AbstractRecordReader;
 import org.apache.drill.exec.store.parquet.RowGroupReadEntry;
 import org.apache.drill.exec.vector.AllocationHelper;
 import org.apache.drill.exec.vector.BaseValueVector;
 import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.NullableBitVector;
 import org.apache.drill.exec.vector.VariableWidthVector;
 import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter;
 import org.apache.hadoop.conf.Configuration;
@@ -68,6 +73,10 @@ public class DrillParquetReader extends AbstractRecordReader {
 
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillParquetReader.class);
 
+  // same as the DEFAULT_RECORDS_TO_READ_IF_NOT_FIXED_WIDTH in ParquetRecordReader
+
+  private static final int DEFAULT_RECORDS_TO_READ = 32 * 1024;
+
   private ParquetMetadata footer;
   private MessageType schema;
   private Configuration conf;
@@ -87,6 +96,17 @@ public class DrillParquetReader extends AbstractRecordReader {
   private final int fillLevelCheckThreshold;
   private FragmentContext fragmentContext;
 
+  // For columns not found in the file, we need to return a schema element with the correct number of values
+  // at that position in the schema. Currently this requires a vector be present. Here is a list of all of these vectors
+  // that need only have their value count set at the end of each call to next(), as the values default to null.
+  private List<NullableBitVector> nullFilledVectors;
+  // Keeps track of the number of records returned in the case where only columns outside of the file were selected.
+  // No actual data needs to be read out of the file; we only need to return batches until we have 'read' the number of
+  // records specified in the row group metadata.
+  long mockRecordsRead = 0;
+  private List<SchemaPath> columnsNotFound = null;
+  boolean noColumnsFound = false; // true if none of the columns in the projection list is found in the schema
+
 
   public DrillParquetReader(FragmentContext fragmentContext, ParquetMetadata footer, RowGroupReadEntry entry, List<SchemaPath> columns, Configuration conf) {
     this.footer = footer;
@@ -98,27 +118,27 @@ public class DrillParquetReader extends AbstractRecordReader {
     fillLevelCheckThreshold = this.fragmentContext.getOptions().getOption(ExecConstants.PARQUET_VECTOR_FILL_THRESHOLD).num_val.intValue();
   }
 
-  public static MessageType getProjection(MessageType schema, Collection<SchemaPath> columns) {
+  public static MessageType getProjection(MessageType schema,
+                                          Collection<SchemaPath> columns,
+                                          List<SchemaPath> columnsNotFound) {
     MessageType projection = null;
 
     String messageName = schema.getName();
     List<ColumnDescriptor> schemaColumns = schema.getColumns();
 
     for (SchemaPath path : columns) {
+      boolean colNotFound = true;
       for (ColumnDescriptor colDesc: schemaColumns) {
-        String[] schemaColDesc = colDesc.getPath();
+        String[] schemaColDesc = Arrays.copyOf(colDesc.getPath(), colDesc.getPath().length);
         SchemaPath schemaPath = SchemaPath.getCompoundPath(schemaColDesc);
-
         PathSegment schemaSeg = schemaPath.getRootSegment();
         PathSegment colSeg = path.getRootSegment();
         List<String> segments = Lists.newArrayList();
-        List<String> colSegments = Lists.newArrayList();
         while(schemaSeg != null && colSeg != null){
           if (colSeg.isNamed()) {
             // DRILL-1739 - Use case insensitive name comparison
             if(schemaSeg.getNameSegment().getPath().equalsIgnoreCase(colSeg.getNameSegment().getPath())) {
               segments.add(schemaSeg.getNameSegment().getPath());
-              colSegments.add(colSeg.getNameSegment().getPath());
             }else{
               break;
             }
@@ -133,11 +153,9 @@ public class DrillParquetReader extends AbstractRecordReader {
         if (!segments.isEmpty()) {
           String[] pathSegments = new String[segments.size()];
           segments.toArray(pathSegments);
-          String[] colPathSegments = new String[colSegments.size()];
-          colSegments.toArray(colPathSegments);
-
-          // Use the field names from the schema or we get an exception if the case of the name doesn't match
-          Type t = getType(colPathSegments, pathSegments, 0, schema);
+          colNotFound = false;
+          // Use the field names from the schema; otherwise we get an exception if the case of the name doesn't match.
+          Type t = getType(pathSegments, 0, schema);
 
           if (projection == null) {
             projection = new MessageType(messageName, t);
@@ -147,6 +165,9 @@ public class DrillParquetReader extends AbstractRecordReader {
           break;
         }
       }
+      if (colNotFound) {
+        columnsNotFound.add(path);
+      }
     }
     return projection;
   }
@@ -172,9 +193,23 @@ public class DrillParquetReader extends AbstractRecordReader {
       if (isStarQuery()) {
         projection = schema;
       } else {
-        projection = getProjection(schema, getColumns());
-        if (projection == null) {
-          projection = schema;
+        columnsNotFound=new ArrayList<SchemaPath>();
+        projection = getProjection(schema, getColumns(), columnsNotFound);
+        if (projection == null) {
+          projection = schema;
+        }
+        if (columnsNotFound != null && columnsNotFound.size() > 0) {
+          nullFilledVectors = new ArrayList<>();
+          for (SchemaPath col : columnsNotFound) {
+            nullFilledVectors.add(
+              (NullableBitVector) output.addField(MaterializedField.create(col,
+                  org.apache.drill.common.types.Types.optional(TypeProtos.MinorType.BIT)),
+                (Class<? extends ValueVector>) TypeHelper.getValueVectorClass(TypeProtos.MinorType.BIT,
+                  TypeProtos.DataMode.OPTIONAL)));
+          }
+          if (columnsNotFound.size() == getColumns().size()) {
+            noColumnsFound = true;
+          }
         }
       }
 
@@ -207,38 +242,24 @@ public class DrillParquetReader extends AbstractRecordReader {
         }
       }
 
-      writer = new VectorContainerWriter(output);
-      recordMaterializer = new DrillParquetRecordMaterializer(output, writer, projection, getColumns());
-      primitiveVectors = writer.getMapVector().getPrimitiveVectors();
-      recordReader = columnIO.getRecordReader(pageReadStore, recordMaterializer);
+      if(!noColumnsFound) {
+        writer = new VectorContainerWriter(output);
+        recordMaterializer = new DrillParquetRecordMaterializer(output, writer, projection, getColumns());
+        primitiveVectors = writer.getMapVector().getPrimitiveVectors();
+        recordReader = columnIO.getRecordReader(pageReadStore, recordMaterializer);
+      }
     } catch (Exception e) {
       throw new ExecutionSetupException(e);
     }
   }
 
-  private static Type getType(String[] colSegs, String[] pathSegments, int depth, MessageType schema) {
+  private static Type getType(String[] pathSegments, int depth, MessageType schema) {
     Type type = schema.getType(Arrays.copyOfRange(pathSegments, 0, depth + 1));
-    //String name = colSegs[depth]; // get the name from the column list not the schema
     if (depth + 1 == pathSegments.length) {
-      //type.name = colSegs[depth];
-      //Type newType = type;
-
-      //if(type.isPrimitive()){
-      //  //newType = new PrimitiveType(type.getRepetition(), type.asPrimitiveType().getPrimitiveTypeName(), name, type.getOriginalType());
-      //  Types.PrimitiveBuilder<PrimitiveType> builder = Types.primitive(
-      //    type.asPrimitiveType().getPrimitiveTypeName(), type.getRepetition());
-      //  if (PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY == type.asPrimitiveType().getPrimitiveTypeName()) {
-      //    builder.length(type.asPrimitiveType().getTypeLength());
-      //  }
-      //  newType = builder.named(name);
-      //}else{
-      //  //newType = new GroupType(type.getRepetition(), name, type);
-      //  newType = new GroupType(type.getRepetition(), name, type.asGroupType().getFields());
-      //}
       return type;
     } else {
       Preconditions.checkState(!type.isPrimitive());
-      return new GroupType(type.getRepetition(), type.getName(), getType(colSegs, pathSegments, depth + 1, schema));
+      return new GroupType(type.getRepetition(), type.getName(), getType(pathSegments, depth + 1, schema));
     }
   }
 
@@ -247,6 +268,21 @@ public class DrillParquetReader extends AbstractRecordReader {
   @Override
   public int next() {
     int count = 0;
+
+    // None of the selected columns were found in the file; simply return a full batch of null records for each column requested
+    if (noColumnsFound) {
+      if (mockRecordsRead == footer.getBlocks().get(entry.getRowGroupIndex()).getRowCount()) {
+        return 0;
+      }
+      long recordsToRead = 0;
+      recordsToRead = Math.min(DEFAULT_RECORDS_TO_READ, footer.getBlocks().get(entry.getRowGroupIndex()).getRowCount() - mockRecordsRead);
+      for (ValueVector vv : nullFilledVectors ) {
+        vv.getMutator().setValueCount( (int) recordsToRead);
+      }
+      mockRecordsRead += recordsToRead;
+      return (int) recordsToRead;
+    }
+
     while (count < 4000 && totalRead < recordCount) {
       recordMaterializer.setPosition(count);
       recordReader.read();
@@ -262,6 +298,13 @@ public class DrillParquetReader extends AbstractRecordReader {
       }
     }
     writer.setValueCount(count);
+    // if we have requested columns that were not found in the file, fill their vectors with nulls
+    // (by simply setting their value counts, as they start out null-filled)
+    if (nullFilledVectors != null) {
+      for (ValueVector vv : nullFilledVectors ) {
+        vv.getMutator().setValueCount(count);
+      }
+    }
     return count;
   }
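The companion change at the end of next() handles the mixed case, where some projected columns exist in the file and some do not: the missing ones only need their value counts set, because optional vectors start out null-filled. A stripped-down sketch of that bookkeeping, with a stand-in interface in place of Drill's ValueVector mutator:

import java.util.ArrayList;
import java.util.List;

class NullFillSketch {
  // stand-in for ValueVector.getMutator().setValueCount(...)
  interface NullFilledVector { void setValueCount(int count); }

  private final List<NullFilledVector> nullFilledVectors = new ArrayList<>();

  void addMissingColumn(NullFilledVector vv) {
    // created OPTIONAL in the real reader, so every entry defaults to null
    nullFilledVectors.add(vv);
  }

  // called after the read loop: mark `count` values present in each
  // null-filled vector so the missing columns surface as NULLs
  int finishBatch(int count) {
    for (NullFilledVector vv : nullFilledVectors) {
      vv.setValueCount(count);
    }
    return count;
  }
}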
 


[2/4] drill git commit: DRILL-1738: Allow case insensitive read in Complex parquet reader

Posted by pa...@apache.org.
DRILL-1738: Allow case insensitive read in Complex parquet reader


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/41e444dd
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/41e444dd
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/41e444dd

Branch: refs/heads/master
Commit: 41e444dd51217a0c4b417d3c0b2aed386a287696
Parents: 163917e
Author: Parth Chandra <pc...@maprtech.com>
Authored: Mon Nov 24 09:48:35 2014 -0800
Committer: Parth Chandra <pc...@maprtech.com>
Committed: Wed Nov 26 14:51:04 2014 -0800

----------------------------------------------------------------------
 .../parquet2/DrillParquetGroupConverter.java    |  59 +++++++++--
 .../exec/store/parquet2/DrillParquetReader.java | 103 ++++++++++++++-----
 .../DrillParquetRecordMaterializer.java         |   8 +-
 3 files changed, 134 insertions(+), 36 deletions(-)
----------------------------------------------------------------------
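The heart of this commit is in getProjection(): instead of looking the query path up in the Parquet schema directly (a case-sensitive lookup that previously caught InvalidRecordException and dropped the column when only the case differed), it walks the schema's column descriptors and compares segment names case-insensitively, keeping the schema's spelling for the actual lookup. A standalone sketch of that comparison over plain string paths (array segments, which the real PathSegment walk skips, are elided here):

import java.util.ArrayList;
import java.util.List;

class CaseInsensitiveMatchSketch {
  // Returns the schema-cased segments matched by the query path,
  // or an empty list when the paths diverge.
  static List<String> match(String[] schemaPath, String[] queryPath) {
    List<String> segments = new ArrayList<>();
    int n = Math.min(schemaPath.length, queryPath.length);
    for (int i = 0; i < n; i++) {
      if (!schemaPath[i].equalsIgnoreCase(queryPath[i])) {
        return new ArrayList<>(); // this schema column is not the projected one
      }
      // keep the schema's case; Parquet's own type lookup is case sensitive
      segments.add(schemaPath[i]);
    }
    return segments;
  }

  public static void main(String[] args) {
    System.out.println(match(new String[]{"Trans", "Amount"},
                             new String[]{"trans", "amount"})); // prints [Trans, Amount]
  }
}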


http://git-wip-us.apache.org/repos/asf/drill/blob/41e444dd/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetGroupConverter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetGroupConverter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetGroupConverter.java
index c6310b1..9bc4a1e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetGroupConverter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetGroupConverter.java
@@ -20,8 +20,13 @@ package org.apache.drill.exec.store.parquet2;
 import io.netty.buffer.DrillBuf;
 
 import java.math.BigDecimal;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
 import java.util.List;
 
+import org.apache.drill.common.expression.PathSegment;
+import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.exec.expr.holders.BigIntHolder;
 import org.apache.drill.exec.expr.holders.BitHolder;
 import org.apache.drill.exec.expr.holders.DateHolder;
@@ -76,35 +81,73 @@ public class DrillParquetGroupConverter extends GroupConverter {
   private MapWriter mapWriter;
   private final OutputMutator mutator;
 
-  public DrillParquetGroupConverter(OutputMutator mutator, ComplexWriterImpl complexWriter, MessageType schema) {
-    this(mutator, complexWriter.rootAsMap(), schema);
+  public DrillParquetGroupConverter(OutputMutator mutator, ComplexWriterImpl complexWriter, MessageType schema, Collection<SchemaPath> columns) {
+    this(mutator, complexWriter.rootAsMap(), schema, columns);
   }
 
-  public DrillParquetGroupConverter(OutputMutator mutator, MapWriter mapWriter, GroupType schema) {
+  // This function assumes that the fields in the schema parameter are in the same order as the fields in the columns parameter. The
+  // columns parameter may have fields that are not present in the schema, though.
+  public DrillParquetGroupConverter(OutputMutator mutator, MapWriter mapWriter, GroupType schema, Collection<SchemaPath> columns) {
     this.mapWriter = mapWriter;
     this.mutator = mutator;
     converters = Lists.newArrayList();
+
+    Iterator<SchemaPath> colIterator=columns.iterator();
+
     for (Type type : schema.getFields()) {
       Repetition rep = type.getRepetition();
       boolean isPrimitive = type.isPrimitive();
+
+      // Match the name of the field in the schema definition to the name of the field in the query.
+      String name = null;
+      SchemaPath col=null;
+      PathSegment colPath = null;
+      PathSegment colNextChild = null;
+      if(colIterator.hasNext()) {
+        col = colIterator.next();
+        colPath=col.getRootSegment();
+        colNextChild = colPath.getChild();
+      }
+      if( colPath!=null && colPath.isNamed() && (!colPath.getNameSegment().getPath().equals("*")) ){
+        name=colPath.getNameSegment().getPath();
+        // We may have a field that does not exist in the schema
+        if(!name.equalsIgnoreCase(type.getName())){
+          continue;
+        }
+      }else{
+        name=type.getName();
+      }
+
       if (!isPrimitive) {
+        Collection<SchemaPath> c = new ArrayList<SchemaPath>();
+
+        while(colNextChild!=null) {
+          if(colNextChild.isNamed()) {
+            break;
+          }
+          colNextChild=colNextChild.getChild();
+        }
+
+        if(colNextChild!=null) {
+          SchemaPath s = new SchemaPath(colNextChild.getNameSegment());
+          c.add(s);
+        }
         if (rep != Repetition.REPEATED) {
-          DrillParquetGroupConverter converter = new DrillParquetGroupConverter(mutator, mapWriter.map(type.getName()), type.asGroupType());
+          DrillParquetGroupConverter converter = new DrillParquetGroupConverter(mutator, mapWriter.map(name), type.asGroupType(), c);
           converters.add(converter);
         } else {
-          DrillParquetGroupConverter converter = new DrillParquetGroupConverter(mutator, mapWriter.list(type.getName()).map(), type.asGroupType());
+          DrillParquetGroupConverter converter = new DrillParquetGroupConverter(mutator, mapWriter.list(name).map(), type.asGroupType(), c);
           converters.add(converter);
         }
       } else {
-        PrimitiveConverter converter = getConverterForType(type.asPrimitiveType());
+        PrimitiveConverter converter = getConverterForType(name, type.asPrimitiveType());
         converters.add(converter);
       }
     }
   }
 
-  private PrimitiveConverter getConverterForType(PrimitiveType type) {
+  private PrimitiveConverter getConverterForType(String name, PrimitiveType type) {
 
-    String name = type.getName();
     switch(type.getPrimitiveTypeName()) {
       case INT32: {
         if (type.getOriginalType() == null) {

http://git-wip-us.apache.org/repos/asf/drill/blob/41e444dd/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java
index c3e8330..4d223f0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java
@@ -44,6 +44,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 
+import parquet.column.ColumnDescriptor;
 import parquet.hadoop.CodecFactoryExposer;
 import parquet.hadoop.ColumnChunkIncReadStore;
 import parquet.hadoop.metadata.BlockMetaData;
@@ -56,9 +57,12 @@ import parquet.io.MessageColumnIO;
 import parquet.schema.GroupType;
 import parquet.schema.MessageType;
 import parquet.schema.Type;
+import parquet.schema.PrimitiveType;
+
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
+import parquet.schema.Types;
 
 public class DrillParquetReader extends AbstractRecordReader {
 
@@ -96,31 +100,51 @@ public class DrillParquetReader extends AbstractRecordReader {
 
   public static MessageType getProjection(MessageType schema, Collection<SchemaPath> columns) {
     MessageType projection = null;
+
+    String messageName = schema.getName();
+    List<ColumnDescriptor> schemaColumns = schema.getColumns();
+
     for (SchemaPath path : columns) {
-      List<String> segments = Lists.newArrayList();
-      PathSegment rootSegment = path.getRootSegment();
-      PathSegment seg = rootSegment;
-      String messageName = schema.getName();
-      while(seg != null){
-        if(seg.isNamed()) {
-          segments.add(seg.getNameSegment().getPath());
+      for (ColumnDescriptor colDesc: schemaColumns) {
+        String[] schemaColDesc = colDesc.getPath();
+        SchemaPath schemaPath = SchemaPath.getCompoundPath(schemaColDesc);
+
+        PathSegment schemaSeg = schemaPath.getRootSegment();
+        PathSegment colSeg = path.getRootSegment();
+        List<String> segments = Lists.newArrayList();
+        List<String> colSegments = Lists.newArrayList();
+        while(schemaSeg != null && colSeg != null){
+          if (colSeg.isNamed()) {
+            // DRILL-1739 - Use case insensitive name comparison
+            if(schemaSeg.getNameSegment().getPath().equalsIgnoreCase(colSeg.getNameSegment().getPath())) {
+              segments.add(schemaSeg.getNameSegment().getPath());
+              colSegments.add(colSeg.getNameSegment().getPath());
+            }else{
+              break;
+            }
+          }else{
+            colSeg=colSeg.getChild();
+            continue;
+          }
+          colSeg = colSeg.getChild();
+          schemaSeg = schemaSeg.getChild();
         }
-        seg = seg.getChild();
-      }
-      String[] pathSegments = new String[segments.size()];
-      segments.toArray(pathSegments);
-      Type type = null;
-      try {
-        type = schema.getType(pathSegments);
-      } catch (InvalidRecordException e) {
-        logger.warn("Invalid record" , e);
-      }
-      if (type != null) {
-        Type t = getType(pathSegments, 0, schema);
-        if (projection == null) {
-          projection = new MessageType(messageName, t);
-        } else {
-          projection = projection.union(new MessageType(messageName, t));
+        // Field exists in schema
+        if (!segments.isEmpty()) {
+          String[] pathSegments = new String[segments.size()];
+          segments.toArray(pathSegments);
+          String[] colPathSegments = new String[colSegments.size()];
+          colSegments.toArray(colPathSegments);
+
+          // Use the field names from the schema or we get an exception if the case of the name doesn't match
+          Type t = getType(colPathSegments, pathSegments, 0, schema);
+
+          if (projection == null) {
+            projection = new MessageType(messageName, t);
+          } else {
+            projection = projection.union(new MessageType(messageName, t));
+          }
+          break;
         }
       }
     }
@@ -184,7 +208,7 @@ public class DrillParquetReader extends AbstractRecordReader {
       }
 
       writer = new VectorContainerWriter(output);
-      recordMaterializer = new DrillParquetRecordMaterializer(output, writer, projection);
+      recordMaterializer = new DrillParquetRecordMaterializer(output, writer, projection, getColumns());
       primitiveVectors = writer.getMapVector().getPrimitiveVectors();
       recordReader = columnIO.getRecordReader(pageReadStore, recordMaterializer);
     } catch (Exception e) {
@@ -192,13 +216,29 @@ public class DrillParquetReader extends AbstractRecordReader {
     }
   }
 
-  private static Type getType(String[] pathSegments, int depth, MessageType schema) {
+  private static Type getType(String[] colSegs, String[] pathSegments, int depth, MessageType schema) {
     Type type = schema.getType(Arrays.copyOfRange(pathSegments, 0, depth + 1));
+    //String name = colSegs[depth]; // get the name from the column list not the schema
     if (depth + 1 == pathSegments.length) {
+      //type.name = colSegs[depth];
+      //Type newType = type;
+
+      //if(type.isPrimitive()){
+      //  //newType = new PrimitiveType(type.getRepetition(), type.asPrimitiveType().getPrimitiveTypeName(), name, type.getOriginalType());
+      //  Types.PrimitiveBuilder<PrimitiveType> builder = Types.primitive(
+      //    type.asPrimitiveType().getPrimitiveTypeName(), type.getRepetition());
+      //  if (PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY == type.asPrimitiveType().getPrimitiveTypeName()) {
+      //    builder.length(type.asPrimitiveType().getTypeLength());
+      //  }
+      //  newType = builder.named(name);
+      //}else{
+      //  //newType = new GroupType(type.getRepetition(), name, type);
+      //  newType = new GroupType(type.getRepetition(), name, type.asGroupType().getFields());
+      //}
       return type;
     } else {
       Preconditions.checkState(!type.isPrimitive());
-      return new GroupType(type.getRepetition(), type.getName(), getType(pathSegments, depth + 1, schema));
+      return new GroupType(type.getRepetition(), type.getName(), getType(colSegs, pathSegments, depth + 1, schema));
     }
   }
 
@@ -254,5 +294,16 @@ public class DrillParquetReader extends AbstractRecordReader {
     this.operatorContext = operatorContext;
   }
 
+  public static class ProjectedColumnType {
+    ProjectedColumnType(String projectedColumnName, MessageType type) {
+      this.projectedColumnName = projectedColumnName;
+      this.type = type;
+    }
+
+    public String projectedColumnName;
+    public MessageType type;
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/41e444dd/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetRecordMaterializer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetRecordMaterializer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetRecordMaterializer.java
index f9c3480..720e8be 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetRecordMaterializer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetRecordMaterializer.java
@@ -17,6 +17,7 @@
  */
 package org.apache.drill.exec.store.parquet2;
 
+import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.exec.physical.impl.OutputMutator;
 import org.apache.drill.exec.vector.complex.writer.BaseWriter.ComplexWriter;
 
@@ -24,14 +25,17 @@ import parquet.io.api.GroupConverter;
 import parquet.io.api.RecordMaterializer;
 import parquet.schema.MessageType;
 
+import java.util.Collection;
+import java.util.List;
+
 public class DrillParquetRecordMaterializer extends RecordMaterializer<Void> {
 
   public DrillParquetGroupConverter root;
   private ComplexWriter complexWriter;
 
-  public DrillParquetRecordMaterializer(OutputMutator mutator, ComplexWriter complexWriter, MessageType schema) {
+  public DrillParquetRecordMaterializer(OutputMutator mutator, ComplexWriter complexWriter, MessageType schema, Collection<SchemaPath> columns) {
     this.complexWriter = complexWriter;
-    root = new DrillParquetGroupConverter(mutator, complexWriter.rootAsMap(), schema);
+    root = new DrillParquetGroupConverter(mutator, complexWriter.rootAsMap(), schema, columns);
   }
 
   public void setPosition(int position) {


[4/4] drill git commit: DRILL-1743: check capacity before writing into map vector

Posted by pa...@apache.org.
DRILL-1743: check capacity before writing into map vector


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/3119011d
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/3119011d
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/3119011d

Branch: refs/heads/master
Commit: 3119011df44d28d7590de6613f536d64366a1736
Parents: a94a028
Author: Hanifi Gunes <hg...@maprtech.com>
Authored: Wed Nov 19 18:06:01 2014 -0800
Committer: Parth Chandra <pc...@maprtech.com>
Committed: Wed Nov 26 14:54:36 2014 -0800

----------------------------------------------------------------------
 .../java-exec/src/main/codegen/templates/MapWriters.java | 11 +++++++----
 1 file changed, 7 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
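The fix itself is a few lines in the generated ${mode}MapWriter: start(), previously a no-op, now checks the remaining value capacity, but only once the underlying container actually holds vectors, since an empty map vector has no capacity to measure. A hedged sketch of the shape of the generated code (the interface and method bodies below are stand-ins, not the template's output):

class MapWriterStartSketch {
  interface VectorContainer { int size(); } // stand-in for the map's container

  private final VectorContainer container;
  private boolean ok = true; // cleared when the current batch runs out of room

  MapWriterStartSketch(VectorContainer container) {
    this.container = container;
  }

  public void start() {
    // check capacity only after we have a non-empty container
    if (container.size() > 0 && ok) {
      checkValueCapacity();
    }
  }

  private void checkValueCapacity() {
    // the generated writer compares its write index against the child
    // vectors' capacity here and clears `ok` when the batch is full
  }
}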


http://git-wip-us.apache.org/repos/asf/drill/blob/3119011d/exec/java-exec/src/main/codegen/templates/MapWriters.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/codegen/templates/MapWriters.java b/exec/java-exec/src/main/codegen/templates/MapWriters.java
index 1fdb4db..b8bd73e 100644
--- a/exec/java-exec/src/main/codegen/templates/MapWriters.java
+++ b/exec/java-exec/src/main/codegen/templates/MapWriters.java
@@ -129,16 +129,16 @@ public class ${mode}MapWriter extends AbstractFieldWriter{
     }
   }
   
-  
+
   public void end(){
     // noop
   }
   <#else>
-  
+
   public void setValueCount(int count){
     container.getMutator().setValueCount(count);
   }
-  
+
   public void setPosition(int index){
     super.setPosition(index);
     for(FieldWriter w: fields.values()){
@@ -146,7 +146,10 @@ public class ${mode}MapWriter extends AbstractFieldWriter{
     }
   }
   public void start(){
-    // noop
+    // check capacity only after we have a non-empty container
+    if(container.size() > 0 && ok()) {
+      checkValueCapacity();
+    }
   }
   
   public void end(){


[3/4] drill git commit: DRILL-1660: return accurate buffers size when repeated map vector has no underlying vectors

Posted by pa...@apache.org.
DRILL-1660: return accurate buffers size when repeated map vector has no underlying vectors


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/a94a028c
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/a94a028c
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/a94a028c

Branch: refs/heads/master
Commit: a94a028cfc37fc5b5e8eed319a389cc74682ba68
Parents: cf9e71d
Author: Hanifi Gunes <hg...@maprtech.com>
Authored: Wed Nov 12 13:03:57 2014 -0800
Committer: Parth Chandra <pc...@maprtech.com>
Committed: Wed Nov 26 14:54:25 2014 -0800

----------------------------------------------------------------------
 .../fn/TestJsonReaderWithSparseFiles.java       | 142 ++++++++++---------
 .../complex/fn/single-record-with-empties.json  |   1 +
 2 files changed, 78 insertions(+), 65 deletions(-)
----------------------------------------------------------------------
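Most of this diff is a test cleanup that makes room for the new case: the copy-pasted assert loops in TestJsonReaderWithSparseFiles become a reusable Verifier, and a one-record file with empty maps covers the repeated-map-without-child-vectors case the fix addresses. A standalone analogue of the Verifier pattern over plain arrays (Drill's version reads rows out of a RecordBatchLoader instead):

import java.util.Objects;

class VerifierSketch {
  private final Object[][] expected;

  VerifierSketch(Object[][] expected) {
    this.expected = expected;
  }

  // Compare the loaded rows against the expected table, reporting the
  // row/column coordinates of the first mismatch.
  void verify(Object[][] actual) {
    if (actual.length != expected.length) {
      throw new AssertionError("invalid record count returned");
    }
    for (int r = 0; r < expected.length; r++) {
      for (int c = 0; c < expected[r].length; c++) {
        if (!Objects.equals(actual[r][c], expected[r][c])) {
          throw new AssertionError(String.format(
              "row:%d - col:%d - expected:%s - actual:%s",
              r, c, expected[r][c], actual[r][c]));
        }
      }
    }
  }

  public static void main(String[] args) {
    Object[][] values = { {null, null}, {1L, null} };
    new VerifierSketch(values).verify(values); // passes silently
  }
}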


http://git-wip-us.apache.org/repos/asf/drill/blob/a94a028c/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/fn/TestJsonReaderWithSparseFiles.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/fn/TestJsonReaderWithSparseFiles.java b/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/fn/TestJsonReaderWithSparseFiles.java
index 3cfdc1d..3765a29 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/fn/TestJsonReaderWithSparseFiles.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/fn/TestJsonReaderWithSparseFiles.java
@@ -22,11 +22,10 @@ import java.util.List;
 import java.util.Objects;
 
 import org.apache.drill.BaseTestQuery;
-import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.exec.proto.UserBitShared;
-import org.apache.drill.exec.proto.beans.QueryResult;
 import org.apache.drill.exec.record.RecordBatchLoader;
 import org.apache.drill.exec.rpc.user.QueryResultBatch;
+import org.apache.drill.exec.util.JsonStringArrayList;
+import org.apache.drill.exec.util.JsonStringHashMap;
 import org.apache.drill.exec.vector.ValueVector;
 import org.junit.Test;
 
@@ -36,6 +35,55 @@ public class TestJsonReaderWithSparseFiles extends BaseTestQuery {
     void apply(T param);
   }
 
+  static class TypeConverter {
+
+    public Object convert(Object obj) {
+      if (obj == null) {
+        return null;
+      }
+      if (obj instanceof JsonStringArrayList || obj instanceof JsonStringHashMap) {
+        return obj.toString();
+      }
+      return obj;
+    }
+
+  }
+
+  static class Verifier implements Function<RecordBatchLoader> {
+
+    private final int count;
+    private final Object[][] values;
+    private final TypeConverter converter = new TypeConverter();
+
+    protected Verifier(int count, Object[][] values) {
+      this.count = count;
+      this.values = values;
+    }
+
+    @Override
+    public void apply(RecordBatchLoader loader) {
+      assert loader.getRecordCount() == count : "invalid record count returned";
+
+      Object[] row;
+      Object expected;
+      Object actual;
+      for (int r=0;r<values.length;r++) {
+        row = values[r];
+        for (int c=0; c<values[r].length; c++) {
+          expected = row[c];
+          actual = loader.getValueAccessorById(ValueVector.class, c).getValueVector().getAccessor().getObject(r);
+          actual = converter.convert(actual);
+          assert Objects.equals(actual, expected) : String.format("row:%d - col:%d - expected:%s[%s] - actual:%s[%s]",
+              r, c,
+              expected,
+              expected==null?"null":expected.getClass().getSimpleName(),
+              actual,
+              actual==null?"null":actual.getClass().getSimpleName());
+        }
+      }
+    }
+  }
+
   protected void query(final String query, final Function<RecordBatchLoader> testBody) throws Exception {
     List<QueryResultBatch> batches = testSqlWithResults(query);
     RecordBatchLoader loader = new RecordBatchLoader(client.getAllocator());
@@ -55,73 +103,37 @@ public class TestJsonReaderWithSparseFiles extends BaseTestQuery {
   @Test
   public void testIfDrillCanReadSparseRecords() throws Exception {
     final String sql = "select * from cp.`vector/complex/fn/sparse.json`";
-    query(sql, new Function<RecordBatchLoader>() {
-      @Override
-      public void apply(RecordBatchLoader loader) {
-        assert loader.getRecordCount() == 4 : "invalid record count returned";
-
-        //XXX: make sure value order matches vector order
-        final Object[][] values = new Object[][] {
-            {null, null},
-            {1L, null},
-            {null, 2L},
-            {3L, 3L}
-        };
-
-        Object[] row;
-        Object expected;
-        Object actual;
-        for (int r=0;r<values.length;r++) {
-          row = values[r];
-          for (int c=0; c<values[r].length; c++) {
-            expected = row[c];
-            actual = loader.getValueAccessorById(ValueVector.class, c).getValueVector().getAccessor().getObject(r);
-            assert Objects.equals(expected, actual) : String.format("row:%d - col:%d - expected:%s[%s] - actual:%s[%s]",
-                r, c,
-                expected,
-                expected==null?"null":expected.getClass().getSimpleName(),
-                actual,
-                actual==null?"null":actual.getClass().getSimpleName());
-          }
-        }
-      }
-    });
+    //XXX: make sure value order matches vector order
+    final Object[][] values = new Object[][] {
+        {null, null},
+        {1L, null},
+        {null, 2L},
+        {3L, 3L}
+    };
+    query(sql, new Verifier(4, values));
   }
 
   @Test
   public void testIfDrillCanReadSparseNestedRecordsWithoutRaisingException() throws Exception {
     final String sql = "select * from cp.`vector/complex/fn/nested-with-nulls.json`";
-    query(sql, new Function<RecordBatchLoader>() {
-      @Override
-      public void apply(RecordBatchLoader loader) {
-        assert loader.getRecordCount() == 4 : "invalid record count returned";
-
-        //XXX: make sure value order matches vector order
-        final Object[][] values = new Object[][] {
-            {"[{},{},{},{\"name\":\"doe\"},{}]"},
-            {"[]"},
-            {"[{\"name\":\"john\",\"id\":10}]"},
-            {"[{},{}]"},
-        };
-
-        Object[] row;
-        Object expected;
-        Object actual;
-        for (int r=0;r<values.length;r++) {
-          row = values[r];
-          for (int c = 0; c < values[r].length; c++) {
-            expected = row[c];
-            actual = loader.getValueAccessorById(ValueVector.class, c).getValueVector().getAccessor().getObject(r);
-            assert Objects.equals(actual, expected) : String.format("row:%d - col:%d - expected:%s[%s] - actual:%s[%s]",
-                r, c,
-                expected,
-                expected == null ? "null" : expected.getClass().getSimpleName(),
-                actual,
-                actual == null ? "null" : actual.getClass().getSimpleName());
-          }
-        }
-      }
-    });
+    //XXX: make sure value order matches vector order
+    final Object[][] values = new Object[][] {
+        {"[{},{},{},{\"name\":\"doe\"},{}]"},
+        {"[]"},
+        {"[{\"name\":\"john\",\"id\":10}]"},
+        {"[{},{}]"},
+    };
+    query(sql, new Verifier(4, values));
+  }
+
+  @Test
+  public void testIfDrillCanQuerySingleRecordWithEmpties() throws Exception {
+    final String sql = "select * from cp.`vector/complex/fn/single-record-with-empties.json`";
+    //XXX: make sure value order matches vector order
+    final Object[][] values = new Object[][] {
+        {"[{},{}]"},
+    };
+    query(sql, new Verifier(1, values));
   }
 
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/a94a028c/exec/java-exec/src/test/resources/vector/complex/fn/single-record-with-empties.json
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/vector/complex/fn/single-record-with-empties.json b/exec/java-exec/src/test/resources/vector/complex/fn/single-record-with-empties.json
new file mode 100644
index 0000000..a3c2fbf
--- /dev/null
+++ b/exec/java-exec/src/test/resources/vector/complex/fn/single-record-with-empties.json
@@ -0,0 +1 @@
+{"users":[{}, {"name":null}]}
\ No newline at end of file