You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by pa...@apache.org on 2014/11/27 18:32:02 UTC
[1/4] drill git commit: DRILL-1739 Handle missing fields in complex
parquet reader
Repository: drill
Updated Branches:
refs/heads/master 163917ea1 -> 3119011df
DRILL-1739 Handle missing fields in complex parquet reader
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/cf9e71d6
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/cf9e71d6
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/cf9e71d6
Branch: refs/heads/master
Commit: cf9e71d654ddcc203900b4c3187e4bea0ba801d5
Parents: 41e444d
Author: Parth Chandra <pc...@maprtech.com>
Authored: Tue Nov 25 16:57:06 2014 -0800
Committer: Parth Chandra <pc...@maprtech.com>
Committed: Wed Nov 26 14:51:04 2014 -0800
----------------------------------------------------------------------
.../parquet2/DrillParquetGroupConverter.java | 23 ++--
.../exec/store/parquet2/DrillParquetReader.java | 117 +++++++++++++------
2 files changed, 93 insertions(+), 47 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/cf9e71d6/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetGroupConverter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetGroupConverter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetGroupConverter.java
index 9bc4a1e..d219927 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetGroupConverter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetGroupConverter.java
@@ -103,19 +103,22 @@ public class DrillParquetGroupConverter extends GroupConverter {
SchemaPath col=null;
PathSegment colPath = null;
PathSegment colNextChild = null;
- if(colIterator.hasNext()) {
+ while(colIterator.hasNext()) {
col = colIterator.next();
- colPath=col.getRootSegment();
+ colPath = col.getRootSegment();
colNextChild = colPath.getChild();
- }
- if( colPath!=null && colPath.isNamed() && (!colPath.getNameSegment().getPath().equals("*")) ){
- name=colPath.getNameSegment().getPath();
- // We may have a field that does not exist in the schema
- if(!name.equalsIgnoreCase(type.getName())){
- continue;
+
+ if (colPath != null && colPath.isNamed() && (!colPath.getNameSegment().getPath().equals("*"))) {
+ name = colPath.getNameSegment().getPath();
+ // We may have a field that does not exist in the schema
+ if (!name.equalsIgnoreCase(type.getName())) {
+ continue;
+ }
}
- }else{
- name=type.getName();
+ break;
+ }
+ if (name == null) {
+ name = type.getName();
}
if (!isPrimitive) {
http://git-wip-us.apache.org/repos/asf/drill/blob/cf9e71d6/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java
index 4d223f0..4455c50 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java
@@ -19,25 +19,30 @@ package org.apache.drill.exec.store.parquet2;
import java.io.IOException;
import java.util.Arrays;
-import java.util.Collection;
-import java.util.HashMap;
import java.util.List;
+import java.util.Collection;
+import java.util.ArrayList;
import java.util.Map;
+import java.util.HashMap;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.expression.PathSegment;
import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.types.TypeProtos;
import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.expr.TypeHelper;
import org.apache.drill.exec.memory.OutOfMemoryException;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.ops.OperatorContext;
import org.apache.drill.exec.physical.impl.OutputMutator;
+import org.apache.drill.exec.record.MaterializedField;
import org.apache.drill.exec.record.MaterializedField.Key;
import org.apache.drill.exec.store.AbstractRecordReader;
import org.apache.drill.exec.store.parquet.RowGroupReadEntry;
import org.apache.drill.exec.vector.AllocationHelper;
import org.apache.drill.exec.vector.BaseValueVector;
import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.NullableBitVector;
import org.apache.drill.exec.vector.VariableWidthVector;
import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter;
import org.apache.hadoop.conf.Configuration;
@@ -68,6 +73,10 @@ public class DrillParquetReader extends AbstractRecordReader {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillParquetReader.class);
+ // same as the DEFAULT_RECORDS_TO_READ_IF_NOT_FIXED_WIDTH in ParquetRecordReader
+
+ private static final char DEFAULT_RECORDS_TO_READ = 32*1024;
+
private ParquetMetadata footer;
private MessageType schema;
private Configuration conf;
@@ -87,6 +96,17 @@ public class DrillParquetReader extends AbstractRecordReader {
private final int fillLevelCheckThreshold;
private FragmentContext fragmentContext;
+ // For columns not found in the file, we need to return a schema element with the correct number of values
+ // at that position in the schema. Currently this requires a vector be present. Here is a list of all of these vectors
+ // that need only have their value count set at the end of each call to next(), as the values default to null.
+ private List<NullableBitVector> nullFilledVectors;
+ // Keeps track of the number of records returned in the case where only columns outside of the file were selected.
+ // No actual data needs to be read out of the file, we only need to return batches until we have 'read' the number of
+ // records specified in the row group metadata
+ long mockRecordsRead=0;
+ private List<SchemaPath> columnsNotFound=null;
+ boolean noColumnsFound = false; // true if none of the columns in the projection list is found in the schema
+
public DrillParquetReader(FragmentContext fragmentContext, ParquetMetadata footer, RowGroupReadEntry entry, List<SchemaPath> columns, Configuration conf) {
this.footer = footer;
@@ -98,27 +118,27 @@ public class DrillParquetReader extends AbstractRecordReader {
fillLevelCheckThreshold = this.fragmentContext.getOptions().getOption(ExecConstants.PARQUET_VECTOR_FILL_THRESHOLD).num_val.intValue();
}
- public static MessageType getProjection(MessageType schema, Collection<SchemaPath> columns) {
+ public static MessageType getProjection(MessageType schema,
+ Collection<SchemaPath> columns,
+ List<SchemaPath> columnsNotFound) {
MessageType projection = null;
String messageName = schema.getName();
List<ColumnDescriptor> schemaColumns = schema.getColumns();
for (SchemaPath path : columns) {
+ boolean colNotFound=true;
for (ColumnDescriptor colDesc: schemaColumns) {
- String[] schemaColDesc = colDesc.getPath();
+ String[] schemaColDesc = Arrays.copyOf(colDesc.getPath(), colDesc.getPath().length);
SchemaPath schemaPath = SchemaPath.getCompoundPath(schemaColDesc);
-
PathSegment schemaSeg = schemaPath.getRootSegment();
PathSegment colSeg = path.getRootSegment();
List<String> segments = Lists.newArrayList();
- List<String> colSegments = Lists.newArrayList();
while(schemaSeg != null && colSeg != null){
if (colSeg.isNamed()) {
// DRILL-1739 - Use case insensitive name comparison
if(schemaSeg.getNameSegment().getPath().equalsIgnoreCase(colSeg.getNameSegment().getPath())) {
segments.add(schemaSeg.getNameSegment().getPath());
- colSegments.add(colSeg.getNameSegment().getPath());
}else{
break;
}
@@ -133,11 +153,9 @@ public class DrillParquetReader extends AbstractRecordReader {
if (!segments.isEmpty()) {
String[] pathSegments = new String[segments.size()];
segments.toArray(pathSegments);
- String[] colPathSegments = new String[colSegments.size()];
- colSegments.toArray(colPathSegments);
-
- // Use the field names from the schema or we get an exception if the case of the name doesn't match
- Type t = getType(colPathSegments, pathSegments, 0, schema);
+ colNotFound=false;
+ // Use the field names from the schema otherwise we get an exception if the case of the name doesn't match
+ Type t = getType(pathSegments, 0, schema);
if (projection == null) {
projection = new MessageType(messageName, t);
@@ -147,6 +165,9 @@ public class DrillParquetReader extends AbstractRecordReader {
break;
}
}
+ if(colNotFound){
+ columnsNotFound.add(path);
+ }
}
return projection;
}
@@ -172,9 +193,23 @@ public class DrillParquetReader extends AbstractRecordReader {
if (isStarQuery()) {
projection = schema;
} else {
- projection = getProjection(schema, getColumns());
- if (projection == null) {
- projection = schema;
+ columnsNotFound=new ArrayList<SchemaPath>();
+ projection = getProjection(schema, getColumns(), columnsNotFound);
+ if(projection == null){
+ projection = schema;
+ }
+ if(columnsNotFound!=null && columnsNotFound.size()>0) {
+ nullFilledVectors = new ArrayList();
+ for(SchemaPath col: columnsNotFound){
+ nullFilledVectors.add(
+ (NullableBitVector)output.addField(MaterializedField.create(col,
+ org.apache.drill.common.types.Types.optional(TypeProtos.MinorType.BIT)),
+ (Class<? extends ValueVector>) TypeHelper.getValueVectorClass(TypeProtos.MinorType.BIT,
+ TypeProtos.DataMode.OPTIONAL)));
+ }
+ if(columnsNotFound.size()==getColumns().size()){
+ noColumnsFound=true;
+ }
}
}
@@ -207,38 +242,24 @@ public class DrillParquetReader extends AbstractRecordReader {
}
}
- writer = new VectorContainerWriter(output);
- recordMaterializer = new DrillParquetRecordMaterializer(output, writer, projection, getColumns());
- primitiveVectors = writer.getMapVector().getPrimitiveVectors();
- recordReader = columnIO.getRecordReader(pageReadStore, recordMaterializer);
+ if(!noColumnsFound) {
+ writer = new VectorContainerWriter(output);
+ recordMaterializer = new DrillParquetRecordMaterializer(output, writer, projection, getColumns());
+ primitiveVectors = writer.getMapVector().getPrimitiveVectors();
+ recordReader = columnIO.getRecordReader(pageReadStore, recordMaterializer);
+ }
} catch (Exception e) {
throw new ExecutionSetupException(e);
}
}
- private static Type getType(String[] colSegs, String[] pathSegments, int depth, MessageType schema) {
+ private static Type getType(String[] pathSegments, int depth, MessageType schema) {
Type type = schema.getType(Arrays.copyOfRange(pathSegments, 0, depth + 1));
- //String name = colSegs[depth]; // get the name from the column list not the schema
if (depth + 1 == pathSegments.length) {
- //type.name = colSegs[depth];
- //Type newType = type;
-
- //if(type.isPrimitive()){
- // //newType = new PrimitiveType(type.getRepetition(), type.asPrimitiveType().getPrimitiveTypeName(), name, type.getOriginalType());
- // Types.PrimitiveBuilder<PrimitiveType> builder = Types.primitive(
- // type.asPrimitiveType().getPrimitiveTypeName(), type.getRepetition());
- // if (PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY == type.asPrimitiveType().getPrimitiveTypeName()) {
- // builder.length(type.asPrimitiveType().getTypeLength());
- // }
- // newType = builder.named(name);
- //}else{
- // //newType = new GroupType(type.getRepetition(), name, type);
- // newType = new GroupType(type.getRepetition(), name, type.asGroupType().getFields());
- //}
return type;
} else {
Preconditions.checkState(!type.isPrimitive());
- return new GroupType(type.getRepetition(), type.getName(), getType(colSegs, pathSegments, depth + 1, schema));
+ return new GroupType(type.getRepetition(), type.getName(), getType(pathSegments, depth + 1, schema));
}
}
@@ -247,6 +268,21 @@ public class DrillParquetReader extends AbstractRecordReader {
@Override
public int next() {
int count = 0;
+
+ // No columns found in the file were selected, simply return a full batch of null records for each column requested
+ if (noColumnsFound) {
+ if (mockRecordsRead == footer.getBlocks().get(entry.getRowGroupIndex()).getRowCount()) {
+ return 0;
+ }
+ long recordsToRead = 0;
+ recordsToRead = Math.min(DEFAULT_RECORDS_TO_READ, footer.getBlocks().get(entry.getRowGroupIndex()).getRowCount() - mockRecordsRead);
+ for (ValueVector vv : nullFilledVectors ) {
+ vv.getMutator().setValueCount( (int) recordsToRead);
+ }
+ mockRecordsRead += recordsToRead;
+ return (int) recordsToRead;
+ }
+
while (count < 4000 && totalRead < recordCount) {
recordMaterializer.setPosition(count);
recordReader.read();
@@ -262,6 +298,13 @@ public class DrillParquetReader extends AbstractRecordReader {
}
}
writer.setValueCount(count);
+ // if we have requested columns that were not found in the file fill their vectors with null
+ // (by simply setting the value counts inside of them, as they start null filled)
+ if (nullFilledVectors != null) {
+ for (ValueVector vv : nullFilledVectors ) {
+ vv.getMutator().setValueCount(count);
+ }
+ }
return count;
}
[2/4] drill git commit: DRILL-1738: Allow case insensitive read in
Complex parquet reader
Posted by pa...@apache.org.
DRILL-1738: Allow case insensitive read in Complex parquet reader
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/41e444dd
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/41e444dd
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/41e444dd
Branch: refs/heads/master
Commit: 41e444dd51217a0c4b417d3c0b2aed386a287696
Parents: 163917e
Author: Parth Chandra <pc...@maprtech.com>
Authored: Mon Nov 24 09:48:35 2014 -0800
Committer: Parth Chandra <pc...@maprtech.com>
Committed: Wed Nov 26 14:51:04 2014 -0800
----------------------------------------------------------------------
.../parquet2/DrillParquetGroupConverter.java | 59 +++++++++--
.../exec/store/parquet2/DrillParquetReader.java | 103 ++++++++++++++-----
.../DrillParquetRecordMaterializer.java | 8 +-
3 files changed, 134 insertions(+), 36 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/41e444dd/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetGroupConverter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetGroupConverter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetGroupConverter.java
index c6310b1..9bc4a1e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetGroupConverter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetGroupConverter.java
@@ -20,8 +20,13 @@ package org.apache.drill.exec.store.parquet2;
import io.netty.buffer.DrillBuf;
import java.math.BigDecimal;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
import java.util.List;
+import org.apache.drill.common.expression.PathSegment;
+import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.exec.expr.holders.BigIntHolder;
import org.apache.drill.exec.expr.holders.BitHolder;
import org.apache.drill.exec.expr.holders.DateHolder;
@@ -76,35 +81,73 @@ public class DrillParquetGroupConverter extends GroupConverter {
private MapWriter mapWriter;
private final OutputMutator mutator;
- public DrillParquetGroupConverter(OutputMutator mutator, ComplexWriterImpl complexWriter, MessageType schema) {
- this(mutator, complexWriter.rootAsMap(), schema);
+ public DrillParquetGroupConverter(OutputMutator mutator, ComplexWriterImpl complexWriter, MessageType schema, Collection<SchemaPath> columns) {
+ this(mutator, complexWriter.rootAsMap(), schema, columns);
}
- public DrillParquetGroupConverter(OutputMutator mutator, MapWriter mapWriter, GroupType schema) {
+ // This function assumes that the fields in the schema parameter are in the same order as the fields in the columns parameter. The
+ // columns parameter may have fields that are not present in the schema, though.
+ public DrillParquetGroupConverter(OutputMutator mutator, MapWriter mapWriter, GroupType schema, Collection<SchemaPath> columns) {
this.mapWriter = mapWriter;
this.mutator = mutator;
converters = Lists.newArrayList();
+
+ Iterator<SchemaPath> colIterator=columns.iterator();
+
for (Type type : schema.getFields()) {
Repetition rep = type.getRepetition();
boolean isPrimitive = type.isPrimitive();
+
+ // Match the name of the field in the schema definition to the name of the field in the query.
+ String name = null;
+ SchemaPath col=null;
+ PathSegment colPath = null;
+ PathSegment colNextChild = null;
+ if(colIterator.hasNext()) {
+ col = colIterator.next();
+ colPath=col.getRootSegment();
+ colNextChild = colPath.getChild();
+ }
+ if( colPath!=null && colPath.isNamed() && (!colPath.getNameSegment().getPath().equals("*")) ){
+ name=colPath.getNameSegment().getPath();
+ // We may have a field that does not exist in the schema
+ if(!name.equalsIgnoreCase(type.getName())){
+ continue;
+ }
+ }else{
+ name=type.getName();
+ }
+
if (!isPrimitive) {
+ Collection<SchemaPath> c = new ArrayList<SchemaPath>();
+
+ while(colNextChild!=null) {
+ if(colNextChild.isNamed()) {
+ break;
+ }
+ colNextChild=colNextChild.getChild();
+ }
+
+ if(colNextChild!=null) {
+ SchemaPath s = new SchemaPath(colNextChild.getNameSegment());
+ c.add(s);
+ }
if (rep != Repetition.REPEATED) {
- DrillParquetGroupConverter converter = new DrillParquetGroupConverter(mutator, mapWriter.map(type.getName()), type.asGroupType());
+ DrillParquetGroupConverter converter = new DrillParquetGroupConverter(mutator, mapWriter.map(name), type.asGroupType(), c);
converters.add(converter);
} else {
- DrillParquetGroupConverter converter = new DrillParquetGroupConverter(mutator, mapWriter.list(type.getName()).map(), type.asGroupType());
+ DrillParquetGroupConverter converter = new DrillParquetGroupConverter(mutator, mapWriter.list(name).map(), type.asGroupType(), c);
converters.add(converter);
}
} else {
- PrimitiveConverter converter = getConverterForType(type.asPrimitiveType());
+ PrimitiveConverter converter = getConverterForType(name, type.asPrimitiveType());
converters.add(converter);
}
}
}
- private PrimitiveConverter getConverterForType(PrimitiveType type) {
+ private PrimitiveConverter getConverterForType(String name, PrimitiveType type) {
- String name = type.getName();
switch(type.getPrimitiveTypeName()) {
case INT32: {
if (type.getOriginalType() == null) {
http://git-wip-us.apache.org/repos/asf/drill/blob/41e444dd/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java
index c3e8330..4d223f0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java
@@ -44,6 +44,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import parquet.column.ColumnDescriptor;
import parquet.hadoop.CodecFactoryExposer;
import parquet.hadoop.ColumnChunkIncReadStore;
import parquet.hadoop.metadata.BlockMetaData;
@@ -56,9 +57,12 @@ import parquet.io.MessageColumnIO;
import parquet.schema.GroupType;
import parquet.schema.MessageType;
import parquet.schema.Type;
+import parquet.schema.PrimitiveType;
+
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
+import parquet.schema.Types;
public class DrillParquetReader extends AbstractRecordReader {
@@ -96,31 +100,51 @@ public class DrillParquetReader extends AbstractRecordReader {
public static MessageType getProjection(MessageType schema, Collection<SchemaPath> columns) {
MessageType projection = null;
+
+ String messageName = schema.getName();
+ List<ColumnDescriptor> schemaColumns = schema.getColumns();
+
for (SchemaPath path : columns) {
- List<String> segments = Lists.newArrayList();
- PathSegment rootSegment = path.getRootSegment();
- PathSegment seg = rootSegment;
- String messageName = schema.getName();
- while(seg != null){
- if(seg.isNamed()) {
- segments.add(seg.getNameSegment().getPath());
+ for (ColumnDescriptor colDesc: schemaColumns) {
+ String[] schemaColDesc = colDesc.getPath();
+ SchemaPath schemaPath = SchemaPath.getCompoundPath(schemaColDesc);
+
+ PathSegment schemaSeg = schemaPath.getRootSegment();
+ PathSegment colSeg = path.getRootSegment();
+ List<String> segments = Lists.newArrayList();
+ List<String> colSegments = Lists.newArrayList();
+ while(schemaSeg != null && colSeg != null){
+ if (colSeg.isNamed()) {
+ // DRILL-1739 - Use case insensitive name comparison
+ if(schemaSeg.getNameSegment().getPath().equalsIgnoreCase(colSeg.getNameSegment().getPath())) {
+ segments.add(schemaSeg.getNameSegment().getPath());
+ colSegments.add(colSeg.getNameSegment().getPath());
+ }else{
+ break;
+ }
+ }else{
+ colSeg=colSeg.getChild();
+ continue;
+ }
+ colSeg = colSeg.getChild();
+ schemaSeg = schemaSeg.getChild();
}
- seg = seg.getChild();
- }
- String[] pathSegments = new String[segments.size()];
- segments.toArray(pathSegments);
- Type type = null;
- try {
- type = schema.getType(pathSegments);
- } catch (InvalidRecordException e) {
- logger.warn("Invalid record" , e);
- }
- if (type != null) {
- Type t = getType(pathSegments, 0, schema);
- if (projection == null) {
- projection = new MessageType(messageName, t);
- } else {
- projection = projection.union(new MessageType(messageName, t));
+ // Field exists in schema
+ if (!segments.isEmpty()) {
+ String[] pathSegments = new String[segments.size()];
+ segments.toArray(pathSegments);
+ String[] colPathSegments = new String[colSegments.size()];
+ colSegments.toArray(colPathSegments);
+
+ // Use the field names from the schema or we get an exception if the case of the name doesn't match
+ Type t = getType(colPathSegments, pathSegments, 0, schema);
+
+ if (projection == null) {
+ projection = new MessageType(messageName, t);
+ } else {
+ projection = projection.union(new MessageType(messageName, t));
+ }
+ break;
}
}
}
@@ -184,7 +208,7 @@ public class DrillParquetReader extends AbstractRecordReader {
}
writer = new VectorContainerWriter(output);
- recordMaterializer = new DrillParquetRecordMaterializer(output, writer, projection);
+ recordMaterializer = new DrillParquetRecordMaterializer(output, writer, projection, getColumns());
primitiveVectors = writer.getMapVector().getPrimitiveVectors();
recordReader = columnIO.getRecordReader(pageReadStore, recordMaterializer);
} catch (Exception e) {
@@ -192,13 +216,29 @@ public class DrillParquetReader extends AbstractRecordReader {
}
}
- private static Type getType(String[] pathSegments, int depth, MessageType schema) {
+ private static Type getType(String[] colSegs, String[] pathSegments, int depth, MessageType schema) {
Type type = schema.getType(Arrays.copyOfRange(pathSegments, 0, depth + 1));
+ //String name = colSegs[depth]; // get the name from the column list not the schema
if (depth + 1 == pathSegments.length) {
+ //type.name = colSegs[depth];
+ //Type newType = type;
+
+ //if(type.isPrimitive()){
+ // //newType = new PrimitiveType(type.getRepetition(), type.asPrimitiveType().getPrimitiveTypeName(), name, type.getOriginalType());
+ // Types.PrimitiveBuilder<PrimitiveType> builder = Types.primitive(
+ // type.asPrimitiveType().getPrimitiveTypeName(), type.getRepetition());
+ // if (PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY == type.asPrimitiveType().getPrimitiveTypeName()) {
+ // builder.length(type.asPrimitiveType().getTypeLength());
+ // }
+ // newType = builder.named(name);
+ //}else{
+ // //newType = new GroupType(type.getRepetition(), name, type);
+ // newType = new GroupType(type.getRepetition(), name, type.asGroupType().getFields());
+ //}
return type;
} else {
Preconditions.checkState(!type.isPrimitive());
- return new GroupType(type.getRepetition(), type.getName(), getType(pathSegments, depth + 1, schema));
+ return new GroupType(type.getRepetition(), type.getName(), getType(colSegs, pathSegments, depth + 1, schema));
}
}
@@ -254,5 +294,16 @@ public class DrillParquetReader extends AbstractRecordReader {
this.operatorContext = operatorContext;
}
+ static public class ProjectedColumnType{
+ ProjectedColumnType(String projectedColumnName, MessageType type){
+ this.projectedColumnName=projectedColumnName;
+ this.type=type;
+ }
+
+ public String projectedColumnName;
+ public MessageType type;
+
+
+ }
}
http://git-wip-us.apache.org/repos/asf/drill/blob/41e444dd/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetRecordMaterializer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetRecordMaterializer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetRecordMaterializer.java
index f9c3480..720e8be 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetRecordMaterializer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetRecordMaterializer.java
@@ -17,6 +17,7 @@
*/
package org.apache.drill.exec.store.parquet2;
+import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.exec.physical.impl.OutputMutator;
import org.apache.drill.exec.vector.complex.writer.BaseWriter.ComplexWriter;
@@ -24,14 +25,17 @@ import parquet.io.api.GroupConverter;
import parquet.io.api.RecordMaterializer;
import parquet.schema.MessageType;
+import java.util.Collection;
+import java.util.List;
+
public class DrillParquetRecordMaterializer extends RecordMaterializer<Void> {
public DrillParquetGroupConverter root;
private ComplexWriter complexWriter;
- public DrillParquetRecordMaterializer(OutputMutator mutator, ComplexWriter complexWriter, MessageType schema) {
+ public DrillParquetRecordMaterializer(OutputMutator mutator, ComplexWriter complexWriter, MessageType schema, Collection<SchemaPath> columns) {
this.complexWriter = complexWriter;
- root = new DrillParquetGroupConverter(mutator, complexWriter.rootAsMap(), schema);
+ root = new DrillParquetGroupConverter(mutator, complexWriter.rootAsMap(), schema, columns);
}
public void setPosition(int position) {
[4/4] drill git commit: DRILL-1743: check capacity before writing
into map vector
Posted by pa...@apache.org.
DRILL-1743: check capacity before writing into map vector
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/3119011d
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/3119011d
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/3119011d
Branch: refs/heads/master
Commit: 3119011df44d28d7590de6613f536d64366a1736
Parents: a94a028
Author: Hanifi Gunes <hg...@maprtech.com>
Authored: Wed Nov 19 18:06:01 2014 -0800
Committer: Parth Chandra <pc...@maprtech.com>
Committed: Wed Nov 26 14:54:36 2014 -0800
----------------------------------------------------------------------
.../java-exec/src/main/codegen/templates/MapWriters.java | 11 +++++++----
1 file changed, 7 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/3119011d/exec/java-exec/src/main/codegen/templates/MapWriters.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/codegen/templates/MapWriters.java b/exec/java-exec/src/main/codegen/templates/MapWriters.java
index 1fdb4db..b8bd73e 100644
--- a/exec/java-exec/src/main/codegen/templates/MapWriters.java
+++ b/exec/java-exec/src/main/codegen/templates/MapWriters.java
@@ -129,16 +129,16 @@ public class ${mode}MapWriter extends AbstractFieldWriter{
}
}
-
+
public void end(){
// noop
}
<#else>
-
+
public void setValueCount(int count){
container.getMutator().setValueCount(count);
}
-
+
public void setPosition(int index){
super.setPosition(index);
for(FieldWriter w: fields.values()){
@@ -146,7 +146,10 @@ public class ${mode}MapWriter extends AbstractFieldWriter{
}
}
public void start(){
- // noop
+ // check capacity only after we have a non empty container
+ if(container.size() > 0 && ok()) {
+ checkValueCapacity();
+ }
}
public void end(){
[3/4] drill git commit: DRILL-1660: return accurate buffers size when
repeated map vector has no underlying vectors
Posted by pa...@apache.org.
DRILL-1660: return accurate buffers size when repeated map vector has no underlying vectors
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/a94a028c
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/a94a028c
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/a94a028c
Branch: refs/heads/master
Commit: a94a028cfc37fc5b5e8eed319a389cc74682ba68
Parents: cf9e71d
Author: Hanifi Gunes <hg...@maprtech.com>
Authored: Wed Nov 12 13:03:57 2014 -0800
Committer: Parth Chandra <pc...@maprtech.com>
Committed: Wed Nov 26 14:54:25 2014 -0800
----------------------------------------------------------------------
.../fn/TestJsonReaderWithSparseFiles.java | 142 ++++++++++---------
.../complex/fn/single-record-with-empties.json | 1 +
2 files changed, 78 insertions(+), 65 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/a94a028c/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/fn/TestJsonReaderWithSparseFiles.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/fn/TestJsonReaderWithSparseFiles.java b/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/fn/TestJsonReaderWithSparseFiles.java
index 3cfdc1d..3765a29 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/fn/TestJsonReaderWithSparseFiles.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/fn/TestJsonReaderWithSparseFiles.java
@@ -22,11 +22,10 @@ import java.util.List;
import java.util.Objects;
import org.apache.drill.BaseTestQuery;
-import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.exec.proto.UserBitShared;
-import org.apache.drill.exec.proto.beans.QueryResult;
import org.apache.drill.exec.record.RecordBatchLoader;
import org.apache.drill.exec.rpc.user.QueryResultBatch;
+import org.apache.drill.exec.util.JsonStringArrayList;
+import org.apache.drill.exec.util.JsonStringHashMap;
import org.apache.drill.exec.vector.ValueVector;
import org.junit.Test;
@@ -36,6 +35,55 @@ public class TestJsonReaderWithSparseFiles extends BaseTestQuery {
void apply(T param);
}
+ static class TypeConverter {
+
+ public Object convert(Object obj) {
+ if (obj == null) {
+ return null;
+ }
+ if (obj instanceof JsonStringArrayList || obj instanceof JsonStringHashMap) {
+ return obj.toString();
+ }
+ return obj;
+ }
+
+ }
+
+ static class Verifier implements Function<RecordBatchLoader> {
+
+ private final int count;
+ private final Object[][] values;
+ private final TypeConverter converter = new TypeConverter();
+
+ protected Verifier(int count, Object[][] values) {
+ this.count = count;
+ this.values = values;
+ }
+
+ @Override
+ public void apply(RecordBatchLoader loader) {
+ assert loader.getRecordCount() == count : "invalid record count returned";
+
+ Object[] row;
+ Object expected;
+ Object actual;
+ for (int r=0;r<values.length;r++) {
+ row = values[r];
+ for (int c=0; c<values[r].length; c++) {
+ expected = row[c];
+ actual = loader.getValueAccessorById(ValueVector.class, c).getValueVector().getAccessor().getObject(r);
+ actual = converter.convert(actual);
+ assert Objects.equals(actual, expected) : String.format("row:%d - col:%d - expected:%s[%s] - actual:%s[%s]",
+ r, c,
+ expected,
+ expected==null?"null":expected.getClass().getSimpleName(),
+ actual,
+ actual==null?"null":actual.getClass().getSimpleName());
+ }
+ }
+ }
+ }
+
protected void query(final String query, final Function<RecordBatchLoader> testBody) throws Exception {
List<QueryResultBatch> batches = testSqlWithResults(query);
RecordBatchLoader loader = new RecordBatchLoader(client.getAllocator());
@@ -55,73 +103,37 @@ public class TestJsonReaderWithSparseFiles extends BaseTestQuery {
@Test
public void testIfDrillCanReadSparseRecords() throws Exception {
final String sql = "select * from cp.`vector/complex/fn/sparse.json`";
- query(sql, new Function<RecordBatchLoader>() {
- @Override
- public void apply(RecordBatchLoader loader) {
- assert loader.getRecordCount() == 4 : "invalid record count returned";
-
- //XXX: make sure value order matches vector order
- final Object[][] values = new Object[][] {
- {null, null},
- {1L, null},
- {null, 2L},
- {3L, 3L}
- };
-
- Object[] row;
- Object expected;
- Object actual;
- for (int r=0;r<values.length;r++) {
- row = values[r];
- for (int c=0; c<values[r].length; c++) {
- expected = row[c];
- actual = loader.getValueAccessorById(ValueVector.class, c).getValueVector().getAccessor().getObject(r);
- assert Objects.equals(expected, actual) : String.format("row:%d - col:%d - expected:%s[%s] - actual:%s[%s]",
- r, c,
- expected,
- expected==null?"null":expected.getClass().getSimpleName(),
- actual,
- actual==null?"null":actual.getClass().getSimpleName());
- }
- }
- }
- });
+ // NOTE: expected values must be listed in vector order; Verifier reads column c from the vector with id c.
+ final Object[][] values = new Object[][] {
+ {null, null},
+ {1L, null},
+ {null, 2L},
+ {3L, 3L}
+ };
+ query(sql, new Verifier(4, values));
}
@Test
public void testIfDrillCanReadSparseNestedRecordsWithoutRaisingException() throws Exception {
final String sql = "select * from cp.`vector/complex/fn/nested-with-nulls.json`";
- query(sql, new Function<RecordBatchLoader>() {
- @Override
- public void apply(RecordBatchLoader loader) {
- assert loader.getRecordCount() == 4 : "invalid record count returned";
-
- //XXX: make sure value order matches vector order
- final Object[][] values = new Object[][] {
- {"[{},{},{},{\"name\":\"doe\"},{}]"},
- {"[]"},
- {"[{\"name\":\"john\",\"id\":10}]"},
- {"[{},{}]"},
- };
-
- Object[] row;
- Object expected;
- Object actual;
- for (int r=0;r<values.length;r++) {
- row = values[r];
- for (int c = 0; c < values[r].length; c++) {
- expected = row[c];
- actual = loader.getValueAccessorById(ValueVector.class, c).getValueVector().getAccessor().getObject(r);
- assert Objects.equals(actual, expected) : String.format("row:%d - col:%d - expected:%s[%s] - actual:%s[%s]",
- r, c,
- expected,
- expected == null ? "null" : expected.getClass().getSimpleName(),
- actual,
- actual == null ? "null" : actual.getClass().getSimpleName());
- }
- }
- }
- });
+ // NOTE: expected values must be listed in vector order; Verifier reads column c from the vector with id c.
+ final Object[][] values = new Object[][] {
+ {"[{},{},{},{\"name\":\"doe\"},{}]"},
+ {"[]"},
+ {"[{\"name\":\"john\",\"id\":10}]"},
+ {"[{},{}]"},
+ };
+ query(sql, new Verifier(4, values));
+ }
+
+ @Test
+ public void testIfDrillCanQuerySingleRecordWithEmpties() throws Exception {
+ final String sql = "select * from cp.`vector/complex/fn/single-record-with-empties.json`";
+ // Expected values must follow vector order; the fixture's {"name":null} element is expected to render as {}.
+ final Object[][] values = new Object[][] {
+ {"[{},{}]"},
+ };
+ query(sql, new Verifier(1, values));
 }
}
http://git-wip-us.apache.org/repos/asf/drill/blob/a94a028c/exec/java-exec/src/test/resources/vector/complex/fn/single-record-with-empties.json
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/vector/complex/fn/single-record-with-empties.json b/exec/java-exec/src/test/resources/vector/complex/fn/single-record-with-empties.json
new file mode 100644
index 0000000..a3c2fbf
--- /dev/null
+++ b/exec/java-exec/src/test/resources/vector/complex/fn/single-record-with-empties.json
@@ -0,0 +1 @@
+{"users":[{}, {"name":null}]}
\ No newline at end of file