You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by pa...@apache.org on 2014/11/27 18:32:03 UTC
[2/4] drill git commit: DRILL-1738: Allow case insensitive read in
Complex parquet reader
DRILL-1738: Allow case insensitive read in Complex parquet reader
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/41e444dd
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/41e444dd
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/41e444dd
Branch: refs/heads/master
Commit: 41e444dd51217a0c4b417d3c0b2aed386a287696
Parents: 163917e
Author: Parth Chandra <pc...@maprtech.com>
Authored: Mon Nov 24 09:48:35 2014 -0800
Committer: Parth Chandra <pc...@maprtech.com>
Committed: Wed Nov 26 14:51:04 2014 -0800
----------------------------------------------------------------------
.../parquet2/DrillParquetGroupConverter.java | 59 +++++++++--
.../exec/store/parquet2/DrillParquetReader.java | 103 ++++++++++++++-----
.../DrillParquetRecordMaterializer.java | 8 +-
3 files changed, 134 insertions(+), 36 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/41e444dd/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetGroupConverter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetGroupConverter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetGroupConverter.java
index c6310b1..9bc4a1e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetGroupConverter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetGroupConverter.java
@@ -20,8 +20,13 @@ package org.apache.drill.exec.store.parquet2;
import io.netty.buffer.DrillBuf;
import java.math.BigDecimal;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
import java.util.List;
+import org.apache.drill.common.expression.PathSegment;
+import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.exec.expr.holders.BigIntHolder;
import org.apache.drill.exec.expr.holders.BitHolder;
import org.apache.drill.exec.expr.holders.DateHolder;
@@ -76,35 +81,73 @@ public class DrillParquetGroupConverter extends GroupConverter {
private MapWriter mapWriter;
private final OutputMutator mutator;
- public DrillParquetGroupConverter(OutputMutator mutator, ComplexWriterImpl complexWriter, MessageType schema) {
- this(mutator, complexWriter.rootAsMap(), schema);
+ public DrillParquetGroupConverter(OutputMutator mutator, ComplexWriterImpl complexWriter, MessageType schema, Collection<SchemaPath> columns) {
+ this(mutator, complexWriter.rootAsMap(), schema, columns);
}
- public DrillParquetGroupConverter(OutputMutator mutator, MapWriter mapWriter, GroupType schema) {
+ // This constructor assumes that the fields in the schema parameter appear in the same order as the fields in the columns parameter.
+ // The columns parameter may, however, contain fields that are not present in the schema.
+ public DrillParquetGroupConverter(OutputMutator mutator, MapWriter mapWriter, GroupType schema, Collection<SchemaPath> columns) {
this.mapWriter = mapWriter;
this.mutator = mutator;
converters = Lists.newArrayList();
+
+ Iterator<SchemaPath> colIterator=columns.iterator();
+
for (Type type : schema.getFields()) {
Repetition rep = type.getRepetition();
boolean isPrimitive = type.isPrimitive();
+
+ // Match the name of the field in the schema definition to the name of the field in the query.
+ String name = null;
+ SchemaPath col=null;
+ PathSegment colPath = null;
+ PathSegment colNextChild = null;
+ if(colIterator.hasNext()) {
+ col = colIterator.next();
+ colPath=col.getRootSegment();
+ colNextChild = colPath.getChild();
+ }
+ if( colPath!=null && colPath.isNamed() && (!colPath.getNameSegment().getPath().equals("*")) ){
+ name=colPath.getNameSegment().getPath();
+ // We may have a field that does not exist in the schema
+ if(!name.equalsIgnoreCase(type.getName())){
+ continue;
+ }
+ }else{
+ name=type.getName();
+ }
+
if (!isPrimitive) {
+ Collection<SchemaPath> c = new ArrayList<SchemaPath>();
+
+ while(colNextChild!=null) {
+ if(colNextChild.isNamed()) {
+ break;
+ }
+ colNextChild=colNextChild.getChild();
+ }
+
+ if(colNextChild!=null) {
+ SchemaPath s = new SchemaPath(colNextChild.getNameSegment());
+ c.add(s);
+ }
if (rep != Repetition.REPEATED) {
- DrillParquetGroupConverter converter = new DrillParquetGroupConverter(mutator, mapWriter.map(type.getName()), type.asGroupType());
+ DrillParquetGroupConverter converter = new DrillParquetGroupConverter(mutator, mapWriter.map(name), type.asGroupType(), c);
converters.add(converter);
} else {
- DrillParquetGroupConverter converter = new DrillParquetGroupConverter(mutator, mapWriter.list(type.getName()).map(), type.asGroupType());
+ DrillParquetGroupConverter converter = new DrillParquetGroupConverter(mutator, mapWriter.list(name).map(), type.asGroupType(), c);
converters.add(converter);
}
} else {
- PrimitiveConverter converter = getConverterForType(type.asPrimitiveType());
+ PrimitiveConverter converter = getConverterForType(name, type.asPrimitiveType());
converters.add(converter);
}
}
}
- private PrimitiveConverter getConverterForType(PrimitiveType type) {
+ private PrimitiveConverter getConverterForType(String name, PrimitiveType type) {
- String name = type.getName();
switch(type.getPrimitiveTypeName()) {
case INT32: {
if (type.getOriginalType() == null) {
http://git-wip-us.apache.org/repos/asf/drill/blob/41e444dd/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java
index c3e8330..4d223f0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java
@@ -44,6 +44,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import parquet.column.ColumnDescriptor;
import parquet.hadoop.CodecFactoryExposer;
import parquet.hadoop.ColumnChunkIncReadStore;
import parquet.hadoop.metadata.BlockMetaData;
@@ -56,9 +57,12 @@ import parquet.io.MessageColumnIO;
import parquet.schema.GroupType;
import parquet.schema.MessageType;
import parquet.schema.Type;
+import parquet.schema.PrimitiveType;
+
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
+import parquet.schema.Types;
public class DrillParquetReader extends AbstractRecordReader {
@@ -96,31 +100,51 @@ public class DrillParquetReader extends AbstractRecordReader {
public static MessageType getProjection(MessageType schema, Collection<SchemaPath> columns) {
MessageType projection = null;
+
+ String messageName = schema.getName();
+ List<ColumnDescriptor> schemaColumns = schema.getColumns();
+
for (SchemaPath path : columns) {
- List<String> segments = Lists.newArrayList();
- PathSegment rootSegment = path.getRootSegment();
- PathSegment seg = rootSegment;
- String messageName = schema.getName();
- while(seg != null){
- if(seg.isNamed()) {
- segments.add(seg.getNameSegment().getPath());
+ for (ColumnDescriptor colDesc: schemaColumns) {
+ String[] schemaColDesc = colDesc.getPath();
+ SchemaPath schemaPath = SchemaPath.getCompoundPath(schemaColDesc);
+
+ PathSegment schemaSeg = schemaPath.getRootSegment();
+ PathSegment colSeg = path.getRootSegment();
+ List<String> segments = Lists.newArrayList();
+ List<String> colSegments = Lists.newArrayList();
+ while(schemaSeg != null && colSeg != null){
+ if (colSeg.isNamed()) {
+ // DRILL-1739 - Use case insensitive name comparison
+ if(schemaSeg.getNameSegment().getPath().equalsIgnoreCase(colSeg.getNameSegment().getPath())) {
+ segments.add(schemaSeg.getNameSegment().getPath());
+ colSegments.add(colSeg.getNameSegment().getPath());
+ }else{
+ break;
+ }
+ }else{
+ colSeg=colSeg.getChild();
+ continue;
+ }
+ colSeg = colSeg.getChild();
+ schemaSeg = schemaSeg.getChild();
}
- seg = seg.getChild();
- }
- String[] pathSegments = new String[segments.size()];
- segments.toArray(pathSegments);
- Type type = null;
- try {
- type = schema.getType(pathSegments);
- } catch (InvalidRecordException e) {
- logger.warn("Invalid record" , e);
- }
- if (type != null) {
- Type t = getType(pathSegments, 0, schema);
- if (projection == null) {
- projection = new MessageType(messageName, t);
- } else {
- projection = projection.union(new MessageType(messageName, t));
+ // Field exists in schema
+ if (!segments.isEmpty()) {
+ String[] pathSegments = new String[segments.size()];
+ segments.toArray(pathSegments);
+ String[] colPathSegments = new String[colSegments.size()];
+ colSegments.toArray(colPathSegments);
+
+ // Use the field names from the schema or we get an exception if the case of the name doesn't match
+ Type t = getType(colPathSegments, pathSegments, 0, schema);
+
+ if (projection == null) {
+ projection = new MessageType(messageName, t);
+ } else {
+ projection = projection.union(new MessageType(messageName, t));
+ }
+ break;
}
}
}
@@ -184,7 +208,7 @@ public class DrillParquetReader extends AbstractRecordReader {
}
writer = new VectorContainerWriter(output);
- recordMaterializer = new DrillParquetRecordMaterializer(output, writer, projection);
+ recordMaterializer = new DrillParquetRecordMaterializer(output, writer, projection, getColumns());
primitiveVectors = writer.getMapVector().getPrimitiveVectors();
recordReader = columnIO.getRecordReader(pageReadStore, recordMaterializer);
} catch (Exception e) {
@@ -192,13 +216,29 @@ public class DrillParquetReader extends AbstractRecordReader {
}
}
- private static Type getType(String[] pathSegments, int depth, MessageType schema) {
+ private static Type getType(String[] colSegs, String[] pathSegments, int depth, MessageType schema) {
Type type = schema.getType(Arrays.copyOfRange(pathSegments, 0, depth + 1));
+ //String name = colSegs[depth]; // get the name from the column list not the schema
if (depth + 1 == pathSegments.length) {
+ //type.name = colSegs[depth];
+ //Type newType = type;
+
+ //if(type.isPrimitive()){
+ // //newType = new PrimitiveType(type.getRepetition(), type.asPrimitiveType().getPrimitiveTypeName(), name, type.getOriginalType());
+ // Types.PrimitiveBuilder<PrimitiveType> builder = Types.primitive(
+ // type.asPrimitiveType().getPrimitiveTypeName(), type.getRepetition());
+ // if (PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY == type.asPrimitiveType().getPrimitiveTypeName()) {
+ // builder.length(type.asPrimitiveType().getTypeLength());
+ // }
+ // newType = builder.named(name);
+ //}else{
+ // //newType = new GroupType(type.getRepetition(), name, type);
+ // newType = new GroupType(type.getRepetition(), name, type.asGroupType().getFields());
+ //}
return type;
} else {
Preconditions.checkState(!type.isPrimitive());
- return new GroupType(type.getRepetition(), type.getName(), getType(pathSegments, depth + 1, schema));
+ return new GroupType(type.getRepetition(), type.getName(), getType(colSegs, pathSegments, depth + 1, schema));
}
}
@@ -254,5 +294,16 @@ public class DrillParquetReader extends AbstractRecordReader {
this.operatorContext = operatorContext;
}
+ static public class ProjectedColumnType{
+ ProjectedColumnType(String projectedColumnName, MessageType type){
+ this.projectedColumnName=projectedColumnName;
+ this.type=type;
+ }
+
+ public String projectedColumnName;
+ public MessageType type;
+
+
+ }
}
http://git-wip-us.apache.org/repos/asf/drill/blob/41e444dd/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetRecordMaterializer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetRecordMaterializer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetRecordMaterializer.java
index f9c3480..720e8be 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetRecordMaterializer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetRecordMaterializer.java
@@ -17,6 +17,7 @@
*/
package org.apache.drill.exec.store.parquet2;
+import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.exec.physical.impl.OutputMutator;
import org.apache.drill.exec.vector.complex.writer.BaseWriter.ComplexWriter;
@@ -24,14 +25,17 @@ import parquet.io.api.GroupConverter;
import parquet.io.api.RecordMaterializer;
import parquet.schema.MessageType;
+import java.util.Collection;
+import java.util.List;
+
public class DrillParquetRecordMaterializer extends RecordMaterializer<Void> {
public DrillParquetGroupConverter root;
private ComplexWriter complexWriter;
- public DrillParquetRecordMaterializer(OutputMutator mutator, ComplexWriter complexWriter, MessageType schema) {
+ public DrillParquetRecordMaterializer(OutputMutator mutator, ComplexWriter complexWriter, MessageType schema, Collection<SchemaPath> columns) {
this.complexWriter = complexWriter;
- root = new DrillParquetGroupConverter(mutator, complexWriter.rootAsMap(), schema);
+ root = new DrillParquetGroupConverter(mutator, complexWriter.rootAsMap(), schema, columns);
}
public void setPosition(int position) {