Posted to dev@drill.apache.org by ja...@apache.org on 2014/05/06 23:42:08 UTC
[08/15] git commit: Drill-400 change parquet reader to place varbinary fields into VarCharVectors

Drill-400 change parquet reader to place varbinary fields into VarCharVectors, allowing them to be returned by default as UTF-8 strings. Note that this will only be the case with newer Parquet files that were produced after Converted Types were added to the format. This metadata stores the desired interpretation of a column, but was not originally in the format. For older files you will still need to cast binary data to VarChar.
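For anyone trying this against real data: the bytes on disk are unchanged either way, only the SQL type Drill reports changes. A minimal sketch of the difference through JDBC follows — the connection URL, driver class, storage plugin path, file names, and column name are illustrative assumptions for the example, not part of this commit:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class VarCharReadSketch {
  public static void main(String[] args) throws Exception {
    // Usual Drill JDBC driver and embedded-ZooKeeper URL; adjust for your cluster.
    Class.forName("org.apache.drill.jdbc.Driver");
    try (Connection conn = DriverManager.getConnection("jdbc:drill:zk=local");
         Statement stmt = conn.createStatement()) {

      // Newer file: the schema carries ConvertedType.UTF8, so after this commit
      // the column is exposed as VARCHAR and reads back as a string directly.
      try (ResultSet rs = stmt.executeQuery(
          "SELECT name FROM dfs.`/tmp/new_strings.parquet`")) {
        while (rs.next()) {
          System.out.println(rs.getString("name"));
        }
      }

      // Older file: no converted-type metadata, so the column is still
      // VARBINARY and needs an explicit cast to come back as readable text.
      try (ResultSet rs = stmt.executeQuery(
          "SELECT CAST(name AS VARCHAR(200)) AS name "
              + "FROM dfs.`/tmp/old_strings.parquet`")) {
        while (rs.next()) {
          System.out.println(rs.getString("name"));
        }
      }
    }
  }
}

With a file written after converted types were added, the first query suffices; with an older file, the explicit cast in the second query is still required, as the commit message notes.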
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/f071aca7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/f071aca7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/f071aca7
Branch: refs/heads/master
Commit: f071aca7f8c85eace6f96d931068bceabdb2c419
Parents: a2355d4
Author: Jason Altekruse <al...@gmail.com>
Authored: Tue Apr 22 21:31:14 2014 -0500
Committer: Steven Phillips <sp...@maprtech.com>
Committed: Mon May 5 18:51:02 2014 -0700
----------------------------------------------------------------------
 exec/java-exec/pom.xml                           |   4 +-
 .../drill/exec/store/parquet/BitReader.java      |   9 +-
 .../drill/exec/store/parquet/ColumnReader.java   |  20 ++-
 .../store/parquet/FixedByteAlignedReader.java    |   5 +-
 .../exec/store/parquet/NullableBitReader.java    |   7 +-
 .../store/parquet/NullableColumnReader.java      |  19 +-
 .../parquet/NullableFixedByteAlignedReader.java  |   5 +-
 .../exec/store/parquet/PageReadStatus.java       |   6 +-
 .../exec/store/parquet/ParquetRecordReader.java  | 106 ++++++++---
 .../exec/store/parquet/VarLenBinaryReader.java   | 176 ++++++++++++++++---
 .../store/parquet/ParquetRecordReaderTest.java   |  21 ++-
 .../store/parquet/ParquetResultListener.java     |  31 +---
 12 files changed, 305 insertions(+), 104 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f071aca7/exec/java-exec/pom.xml
----------------------------------------------------------------------
diff --git a/exec/java-exec/pom.xml b/exec/java-exec/pom.xml
index 196b095..3e26662 100644
--- a/exec/java-exec/pom.xml
+++ b/exec/java-exec/pom.xml
@@ -95,7 +95,7 @@
<dependency>
<groupId>com.twitter</groupId>
<artifactId>parquet-column</artifactId>
- <version>1.2.8</version>
+ <version>1.4.0</version>
<exclusions>
<exclusion>
<groupId>org.apache.hadoop</groupId>
@@ -110,7 +110,7 @@
<dependency>
<groupId>com.twitter</groupId>
<artifactId>parquet-hadoop</artifactId>
- <version>1.2.8</version>
+ <version>1.4.0</version>
<exclusions>
<exclusion>
<groupId>org.apache.hadoop</groupId>
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f071aca7/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/BitReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/BitReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/BitReader.java
index c489d5b..c323222 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/BitReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/BitReader.java
@@ -21,6 +21,7 @@ import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.exec.vector.BaseDataValueVector;
import org.apache.drill.exec.vector.ValueVector;
import parquet.column.ColumnDescriptor;
+import parquet.format.ConvertedType;
import parquet.hadoop.metadata.ColumnChunkMetaData;
final class BitReader extends ColumnReader {
@@ -30,8 +31,8 @@ final class BitReader extends ColumnReader {
private byte[] bytes;
BitReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData,
- boolean fixedLength, ValueVector v) throws ExecutionSetupException {
- super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v);
+ boolean fixedLength, ValueVector v, ConvertedType convertedType) throws ExecutionSetupException {
+ super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, convertedType);
}
@Override
@@ -47,11 +48,11 @@ final class BitReader extends ColumnReader {
bytes = pageReadStatus.pageDataByteArray;
// standard read, using memory mapping
if (pageReadStatus.bitShift == 0) {
- ((BaseDataValueVector) valueVecHolder.getValueVector()).getData().writeBytes(bytes,
+ ((BaseDataValueVector) valueVec).getData().writeBytes(bytes,
(int) readStartInBytes, (int) readLength);
} else { // read in individual values, because a bitshift is necessary with where the last page or batch ended
- vectorData = ((BaseDataValueVector) valueVecHolder.getValueVector()).getData();
+ vectorData = ((BaseDataValueVector) valueVec).getData();
nextByte = bytes[(int) Math.max(0, Math.ceil(pageReadStatus.valuesRead / 8.0) - 1)];
readLengthInBits = recordsReadInThisIteration + pageReadStatus.bitShift;
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f071aca7/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ColumnReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ColumnReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ColumnReader.java
index d5c88ef..97ecfb8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ColumnReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ColumnReader.java
@@ -24,17 +24,18 @@ import org.apache.drill.exec.vector.BaseValueVector;
import org.apache.drill.exec.vector.ValueVector;
import org.apache.hadoop.fs.FSDataInputStream;
import parquet.column.ColumnDescriptor;
+import parquet.format.ConvertedType;
import parquet.hadoop.metadata.ColumnChunkMetaData;
import parquet.schema.PrimitiveType;
import java.io.IOException;
-abstract class ColumnReader {
+abstract class ColumnReader<V extends ValueVector> {
final ParquetRecordReader parentReader;
// Value Vector for this column
- final VectorHolder valueVecHolder;
+ final V valueVec;
// column description from the parquet library
final ColumnDescriptor columnDescriptor;
// metadata of the column, from the parquet library
@@ -42,6 +43,8 @@ abstract class ColumnReader {
// status information on the current page
final PageReadStatus pageReadStatus;
+ final ConvertedType convertedType;
+
// quick reference to see if the field is fixed length (as this requires an instanceof)
final boolean isFixedLength;
@@ -62,16 +65,17 @@ abstract class ColumnReader {
long readStartInBytes = 0, readLength = 0, readLengthInBits = 0, recordsReadInThisIteration = 0;
protected ColumnReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,
- ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, ValueVector v) throws ExecutionSetupException {
+ ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, V v, ConvertedType convertedType) throws ExecutionSetupException {
this.parentReader = parentReader;
this.columnDescriptor = descriptor;
this.columnChunkMetaData = columnChunkMetaData;
this.isFixedLength = fixedLength;
+ this.convertedType = convertedType;
if (allocateSize > 1) {
- valueVecHolder = new VectorHolder(allocateSize, v);
+ valueVec = v;
} else {
- valueVecHolder = new VectorHolder(5000, v);
+ valueVec = v;
}
@@ -88,7 +92,7 @@ abstract class ColumnReader {
readLength = 0;
readLengthInBits = 0;
recordsReadInThisIteration = 0;
- vectorData = ((BaseValueVector) valueVecHolder.getValueVector()).getData();
+ vectorData = ((BaseValueVector) valueVec).getData();
do {
// if no page has been read, or all of the records have been read out of a page, read the next one
if (pageReadStatus.currentPage == null || pageReadStatus.valuesRead == pageReadStatus.currentPage.getValueCount()) {
@@ -108,11 +112,11 @@ abstract class ColumnReader {
pageReadStatus.readPosInBytes = readStartInBytes + readLength;
}
} while (valuesReadInCurrentPass < recordsToReadInThisPass && pageReadStatus.currentPage != null);
- valueVecHolder.getValueVector().getMutator().setValueCount(valuesReadInCurrentPass);
+ valueVec.getMutator().setValueCount(valuesReadInCurrentPass);
}
public void clear() {
- this.valueVecHolder.reset();
+ valueVec.clear();
this.pageReadStatus.clear();
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f071aca7/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/FixedByteAlignedReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/FixedByteAlignedReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/FixedByteAlignedReader.java
index 4f14f60..0aa18cf 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/FixedByteAlignedReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/FixedByteAlignedReader.java
@@ -21,6 +21,7 @@ import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.exec.vector.BaseDataValueVector;
import org.apache.drill.exec.vector.ValueVector;
import parquet.column.ColumnDescriptor;
+import parquet.format.ConvertedType;
import parquet.hadoop.metadata.ColumnChunkMetaData;
class FixedByteAlignedReader extends ColumnReader {
@@ -29,8 +30,8 @@ class FixedByteAlignedReader extends ColumnReader {
FixedByteAlignedReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData,
- boolean fixedLength, ValueVector v) throws ExecutionSetupException {
- super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v);
+ boolean fixedLength, ValueVector v, ConvertedType convertedType) throws ExecutionSetupException {
+ super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, convertedType);
}
// this method is called by its superclass during a read loop
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f071aca7/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/NullableBitReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/NullableBitReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/NullableBitReader.java
index 16c2715..22933ba 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/NullableBitReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/NullableBitReader.java
@@ -23,6 +23,7 @@ import org.apache.drill.exec.vector.BaseValueVector;
import org.apache.drill.exec.vector.NullableBitVector;
import org.apache.drill.exec.vector.ValueVector;
import parquet.column.ColumnDescriptor;
+import parquet.format.ConvertedType;
import parquet.hadoop.metadata.ColumnChunkMetaData;
import java.io.IOException;
@@ -38,8 +39,8 @@ import java.io.IOException;
final class NullableBitReader extends ColumnReader {
NullableBitReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData,
- boolean fixedLength, ValueVector v) throws ExecutionSetupException {
- super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v);
+ boolean fixedLength, ValueVector v, ConvertedType convertedType) throws ExecutionSetupException {
+ super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, convertedType);
}
@Override
@@ -52,7 +53,7 @@ final class NullableBitReader extends ColumnReader {
defLevel = pageReadStatus.definitionLevels.readInteger();
// if the value is defined
if (defLevel == columnDescriptor.getMaxDefinitionLevel()){
- if (!((NullableBitVector)valueVecHolder.getValueVector()).getMutator().setSafe(i + valuesReadInCurrentPass,
+ if (!((NullableBitVector)valueVec).getMutator().setSafe(i + valuesReadInCurrentPass,
pageReadStatus.valueReader.readBoolean() ? 1 : 0 )) {
throw new RuntimeException();
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f071aca7/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/NullableColumnReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/NullableColumnReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/NullableColumnReader.java
index b6ae715..8faf756 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/NullableColumnReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/NullableColumnReader.java
@@ -22,6 +22,7 @@ import org.apache.drill.exec.vector.BaseValueVector;
import org.apache.drill.exec.vector.NullableVectorDefinitionSetter;
import org.apache.drill.exec.vector.ValueVector;
import parquet.column.ColumnDescriptor;
+import parquet.format.ConvertedType;
import parquet.hadoop.metadata.ColumnChunkMetaData;
import java.io.IOException;
@@ -35,8 +36,8 @@ abstract class NullableColumnReader extends ColumnReader{
int bitsUsed;
NullableColumnReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData,
- boolean fixedLength, ValueVector v) throws ExecutionSetupException {
- super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v);
+ boolean fixedLength, ValueVector v, ConvertedType convertedType) throws ExecutionSetupException {
+ super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, convertedType);
}
public void readAllFixedFields(long recordsToReadInThisPass, ColumnReader firstColumnStatus) throws IOException {
@@ -44,7 +45,7 @@ abstract class NullableColumnReader extends ColumnReader{
readLength = 0;
readLengthInBits = 0;
recordsReadInThisIteration = 0;
- vectorData = ((BaseValueVector)valueVecHolder.getValueVector()).getData();
+ vectorData = ((BaseValueVector)valueVec).getData();
do {
// if no page has been read, or all of the records have been read out of a page, read the next one
@@ -72,11 +73,11 @@ abstract class NullableColumnReader extends ColumnReader{
lastValueWasNull = true;
nullsFound = 0;
if (currentValueIndexInVector - totalValuesRead == recordsToReadInThisPass
- || currentValueIndexInVector >= valueVecHolder.getValueVector().getValueCapacity()){
+ || currentValueIndexInVector >= valueVec.getValueCapacity()){
break;
}
while(currentValueIndexInVector - totalValuesRead < recordsToReadInThisPass
- && currentValueIndexInVector < valueVecHolder.getValueVector().getValueCapacity()
+ && currentValueIndexInVector < valueVec.getValueCapacity()
&& pageReadStatus.valuesRead + definitionLevelsRead < pageReadStatus.currentPage.getValueCount()){
currentDefinitionLevel = pageReadStatus.definitionLevels.readInteger();
definitionLevelsRead++;
@@ -96,7 +97,7 @@ abstract class NullableColumnReader extends ColumnReader{
lastValueWasNull = false;
}
runLength++;
- ((NullableVectorDefinitionSetter)valueVecHolder.getValueVector().getMutator()).setIndexDefined(currentValueIndexInVector);
+ ((NullableVectorDefinitionSetter)valueVec.getMutator()).setIndexDefined(currentValueIndexInVector);
}
currentValueIndexInVector++;
}
@@ -104,9 +105,9 @@ abstract class NullableColumnReader extends ColumnReader{
recordsReadInThisIteration = runLength;
readField( runLength, firstColumnStatus);
- int writerIndex = ((BaseValueVector) valueVecHolder.getValueVector()).getData().writerIndex();
+ int writerIndex = ((BaseValueVector) valueVec).getData().writerIndex();
if ( dataTypeLengthInBits > 8 || (dataTypeLengthInBits < 8 && totalValuesRead + runLength % 8 == 0)){
- ((BaseValueVector) valueVecHolder.getValueVector()).getData().setIndex(0, writerIndex + (int) Math.ceil( nullsFound * dataTypeLengthInBits / 8.0));
+ ((BaseValueVector) valueVec).getData().setIndex(0, writerIndex + (int) Math.ceil( nullsFound * dataTypeLengthInBits / 8.0));
}
else if (dataTypeLengthInBits < 8){
rightBitShift += dataTypeLengthInBits * nullsFound;
@@ -125,7 +126,7 @@ abstract class NullableColumnReader extends ColumnReader{
}
}
while (valuesReadInCurrentPass < recordsToReadInThisPass && pageReadStatus.currentPage != null);
- valueVecHolder.getValueVector().getMutator().setValueCount(
+ valueVec.getMutator().setValueCount(
valuesReadInCurrentPass);
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f071aca7/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/NullableFixedByteAlignedReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/NullableFixedByteAlignedReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/NullableFixedByteAlignedReader.java
index c2fc606..038f2d7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/NullableFixedByteAlignedReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/NullableFixedByteAlignedReader.java
@@ -21,6 +21,7 @@ import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.exec.vector.ValueVector;
import parquet.column.ColumnDescriptor;
+import parquet.format.ConvertedType;
import parquet.hadoop.metadata.ColumnChunkMetaData;
class NullableFixedByteAlignedReader extends NullableColumnReader {
@@ -28,8 +29,8 @@ class NullableFixedByteAlignedReader extends NullableColumnReader {
private byte[] bytes;
NullableFixedByteAlignedReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,
- ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, ValueVector v) throws ExecutionSetupException {
- super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v);
+ ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, ValueVector v, ConvertedType convertedType) throws ExecutionSetupException {
+ super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, convertedType);
}
// this method is called by its superclass during a read loop
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f071aca7/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/PageReadStatus.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/PageReadStatus.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/PageReadStatus.java
index 67262f6..fe83159 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/PageReadStatus.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/PageReadStatus.java
@@ -115,9 +115,9 @@ final class PageReadStatus {
if (parentColumnReader.columnDescriptor.getMaxDefinitionLevel() != 0){
definitionLevels = currentPage.getDlEncoding().getValuesReader(parentColumnReader.columnDescriptor, ValuesType.DEFINITION_LEVEL);
valueReader = currentPage.getValueEncoding().getValuesReader(parentColumnReader.columnDescriptor, ValuesType.VALUES);
- int endOfDefinitionLevels = definitionLevels.initFromPage(currentPage.getValueCount(), pageDataByteArray, 0);
- valueReader.initFromPage(currentPage.getValueCount(), pageDataByteArray, endOfDefinitionLevels);
- readPosInBytes = endOfDefinitionLevels;
+ definitionLevels.initFromPage(currentPage.getValueCount(), pageDataByteArray, 0);
+ readPosInBytes = definitionLevels.getNextOffset();
+ valueReader.initFromPage(currentPage.getValueCount(), pageDataByteArray, (int) readPosInBytes);
}
return true;
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f071aca7/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java
index 9acb557..463f3ed 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java
@@ -21,6 +21,7 @@ import static com.google.common.base.Preconditions.checkArgument;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
import org.apache.drill.common.exceptions.DrillRuntimeException;
@@ -38,12 +39,21 @@ import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.physical.impl.OutputMutator;
import org.apache.drill.exec.record.MaterializedField;
import org.apache.drill.exec.store.RecordReader;
-import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.*;
+import org.apache.drill.exec.vector.NullableVarBinaryVector;
+import org.apache.drill.exec.vector.NullableVarCharVector;
+import org.apache.drill.exec.vector.VarBinaryVector;
+import org.apache.drill.exec.vector.VarCharVector;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import parquet.column.ColumnDescriptor;
+import parquet.format.ConvertedType;
+import parquet.format.FileMetaData;
+import parquet.format.SchemaElement;
+import parquet.format.converter.ParquetMetadataConverter;
import parquet.hadoop.CodecFactoryExposer;
+import parquet.hadoop.ParquetFileWriter;
import parquet.hadoop.metadata.ColumnChunkMetaData;
import parquet.hadoop.metadata.ParquetMetadata;
import parquet.schema.PrimitiveType;
@@ -173,11 +183,23 @@ class ParquetRecordReader implements RecordReader {
int columnsToScan = 0;
MaterializedField field;
+ ParquetMetadataConverter metaConverter = new ParquetMetadataConverter();
+ FileMetaData fileMetaData;
+
+ // TODO - figure out how to deal with this better once we add nested reading, note also look where this map is used below
+ // store a map from column name to converted types if they are non-null
+ HashMap<String, ConvertedType> convertedTypes = new HashMap<>();
+ fileMetaData = new ParquetMetadataConverter().toParquetMetadata(ParquetFileWriter.CURRENT_VERSION, footer);
+ for (SchemaElement se : fileMetaData.getSchema()) {
+ convertedTypes.put(se.getName(), se.getConverted_type());
+ }
+
// loop to add up the length of the fixed width columns and build the schema
for (int i = 0; i < columns.size(); ++i) {
column = columns.get(i);
+ logger.debug("name: " + fileMetaData.getSchema().get(i).name);
field = MaterializedField.create(toFieldName(column.getPath()),
- toMajorType(column.getType(), getDataMode(column)));
+ toMajorType(column.getType(), getDataMode(column), convertedTypes.get(column.getPath()[0])));
if ( ! fieldSelected(field)){
continue;
}
@@ -203,9 +225,22 @@ class ParquetRecordReader implements RecordReader {
return;
}
if (allFieldsFixedLength) {
- recordsPerBatch = (int) Math.min(batchSize / bitWidthAllFixedFields, footer.getBlocks().get(0).getColumns().get(0).getValueCount());
+ recordsPerBatch = (int) Math.min(Math.min(batchSize / bitWidthAllFixedFields,
+ footer.getBlocks().get(0).getColumns().get(0).getValueCount()), 65535);
+ }
+ else {
+ recordsPerBatch = DEFAULT_RECORDS_TO_READ_IF_NOT_FIXED_WIDTH;
}
+// for (SchemaElement se : fileMetaData.getSchema()) {
+// if (fieldSelected())
+// System.out.println("convertedtype :" + se.getConverted_type());
+// System.out.println("name:" + se.getName());
+// System.out.println();
+//
+// }
try {
+ ValueVector v;
+ ConvertedType convertedType;
ArrayList<VarLenBinaryReader.VarLengthColumn> varLengthColumns = new ArrayList<>();
ArrayList<VarLenBinaryReader.NullableVarLengthColumn> nullableVarLengthColumns = new ArrayList<>();
// initialize all of the column read status objects
@@ -213,21 +248,38 @@ class ParquetRecordReader implements RecordReader {
for (int i = 0; i < columns.size(); ++i) {
column = columns.get(i);
columnChunkMetaData = footer.getBlocks().get(0).getColumns().get(i);
- MajorType type = toMajorType(column.getType(), getDataMode(column));
+ convertedType = convertedTypes.get(column.getPath()[0]);
+ MajorType type = toMajorType(column.getType(), getDataMode(column), convertedType);
field = MaterializedField.create(toFieldName(column.getPath()), type);
// the field was not requested to be read
if ( ! fieldSelected(field)) continue;
+ //convertedTypes.put()
fieldFixedLength = column.getType() != PrimitiveType.PrimitiveTypeName.BINARY;
- ValueVector v = output.addField(field, TypeHelper.getValueVectorClass(type.getMinorType(), type.getMode()));
+ v = output.addField(field, TypeHelper.getValueVectorClass(type.getMinorType(), type.getMode()));
if (column.getType() != PrimitiveType.PrimitiveTypeName.BINARY) {
- createFixedColumnReader(fieldFixedLength, column, columnChunkMetaData, recordsPerBatch, v);
+ createFixedColumnReader(fieldFixedLength, column, columnChunkMetaData, recordsPerBatch, v,
+ convertedType);
} else {
if (column.getMaxDefinitionLevel() == 0){// column is required
- varLengthColumns.add(new VarLenBinaryReader.VarLengthColumn(this, -1, column, columnChunkMetaData, false, v));
+ if (convertedType == ConvertedType.UTF8) {
+ varLengthColumns.add(
+ new VarLenBinaryReader.VarCharColumn(this, -1, column, columnChunkMetaData, false, (VarCharVector) v, convertedType));
+ } else {
+ varLengthColumns.add(
+ new VarLenBinaryReader.VarBinaryColumn(this, -1, column, columnChunkMetaData, false, (VarBinaryVector) v, convertedType));
+ }
}
else{
- nullableVarLengthColumns.add(new VarLenBinaryReader.NullableVarLengthColumn(this, -1, column, columnChunkMetaData, false, v));
+ if (convertedType == ConvertedType.UTF8) {
+ nullableVarLengthColumns.add(
+ new VarLenBinaryReader.NullableVarCharColumn(this, -1, column, columnChunkMetaData, false,
+ (NullableVarCharVector) v, convertedType));
+ } else {
+ nullableVarLengthColumns.add(
+ new VarLenBinaryReader.NullableVarBinaryColumn(this, -1, column, columnChunkMetaData, false,
+ (NullableVarBinaryVector) v, convertedType));
+ }
}
}
}
@@ -259,15 +311,15 @@ class ParquetRecordReader implements RecordReader {
private void resetBatch() {
for (ColumnReader column : columnStatuses) {
- column.valueVecHolder.reset();
+ AllocationHelper.allocate(column.valueVec, recordsPerBatch, 10, 5);
column.valuesReadInCurrentPass = 0;
}
for (VarLenBinaryReader.VarLengthColumn r : varLengthReader.columns){
- r.valueVecHolder.reset();
+ AllocationHelper.allocate(r.valueVec, recordsPerBatch, 10, 5);
r.valuesReadInCurrentPass = 0;
}
for (VarLenBinaryReader.NullableVarLengthColumn r : varLengthReader.nullableColumns){
- r.valueVecHolder.reset();
+ AllocationHelper.allocate(r.valueVec, recordsPerBatch, 10, 5);
r.valuesReadInCurrentPass = 0;
}
}
@@ -281,28 +333,29 @@ class ParquetRecordReader implements RecordReader {
* @throws SchemaChangeException
*/
private boolean createFixedColumnReader(boolean fixedLength, ColumnDescriptor descriptor,
- ColumnChunkMetaData columnChunkMetaData, int allocateSize, ValueVector v)
+ ColumnChunkMetaData columnChunkMetaData, int allocateSize, ValueVector v,
+ ConvertedType convertedType)
throws SchemaChangeException, ExecutionSetupException {
// if the column is required
if (descriptor.getMaxDefinitionLevel() == 0){
if (columnChunkMetaData.getType() == PrimitiveType.PrimitiveTypeName.BOOLEAN){
columnStatuses.add(new BitReader(this, allocateSize, descriptor, columnChunkMetaData,
- fixedLength, v));
+ fixedLength, v, convertedType));
}
else{
columnStatuses.add(new FixedByteAlignedReader(this, allocateSize, descriptor, columnChunkMetaData,
- fixedLength, v));
+ fixedLength, v, convertedType));
}
return true;
}
else { // if the column is nullable
if (columnChunkMetaData.getType() == PrimitiveType.PrimitiveTypeName.BOOLEAN){
columnStatuses.add(new NullableBitReader(this, allocateSize, descriptor, columnChunkMetaData,
- fixedLength, v));
+ fixedLength, v, convertedType));
}
else{
columnStatuses.add(new NullableFixedByteAlignedReader(this, allocateSize, descriptor, columnChunkMetaData,
- fixedLength, v));
+ fixedLength, v, convertedType));
}
return true;
}
@@ -363,18 +416,21 @@ class ParquetRecordReader implements RecordReader {
}
static TypeProtos.MajorType toMajorType(PrimitiveType.PrimitiveTypeName primitiveTypeName,
- TypeProtos.DataMode mode) {
- return toMajorType(primitiveTypeName, 0, mode);
+ TypeProtos.DataMode mode, ConvertedType convertedType) {
+ return toMajorType(primitiveTypeName, 0, mode, convertedType);
}
static TypeProtos.MajorType toMajorType(PrimitiveType.PrimitiveTypeName primitiveTypeName, int length,
- TypeProtos.DataMode mode) {
+ TypeProtos.DataMode mode, ConvertedType convertedType) {
switch (mode) {
case OPTIONAL:
switch (primitiveTypeName) {
case BINARY:
- return Types.optional(TypeProtos.MinorType.VARBINARY);
+ if (convertedType == ConvertedType.UTF8)
+ return Types.optional(TypeProtos.MinorType.VARCHAR);
+ else
+ return Types.optional(TypeProtos.MinorType.VARBINARY);
case INT64:
return Types.optional(TypeProtos.MinorType.BIGINT);
case INT32:
@@ -400,7 +456,10 @@ class ParquetRecordReader implements RecordReader {
case REQUIRED:
switch (primitiveTypeName) {
case BINARY:
- return Types.required(TypeProtos.MinorType.VARBINARY);
+ if (convertedType == ConvertedType.UTF8)
+ return Types.required(TypeProtos.MinorType.VARCHAR);
+ else
+ return Types.required(TypeProtos.MinorType.VARBINARY);
case INT64:
return Types.required(TypeProtos.MinorType.BIGINT);
case INT32:
@@ -426,7 +485,10 @@ class ParquetRecordReader implements RecordReader {
case REPEATED:
switch (primitiveTypeName) {
case BINARY:
- return Types.repeated(TypeProtos.MinorType.VARBINARY);
+ if (convertedType == ConvertedType.UTF8)
+ return Types.required(TypeProtos.MinorType.VARCHAR);
+ else
+ return Types.repeated(TypeProtos.MinorType.VARBINARY);
case INT64:
return Types.repeated(TypeProtos.MinorType.BIGINT);
case INT32:
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f071aca7/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/VarLenBinaryReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/VarLenBinaryReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/VarLenBinaryReader.java
index 09d19a8..ae01104 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/VarLenBinaryReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/VarLenBinaryReader.java
@@ -18,11 +18,14 @@
package org.apache.drill.exec.store.parquet;
import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.exec.vector.*;
import org.apache.drill.exec.vector.NullableVarBinaryVector;
-import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.NullableVarCharVector;
import org.apache.drill.exec.vector.VarBinaryVector;
+import org.apache.drill.exec.vector.VarCharVector;
import parquet.bytes.BytesUtils;
import parquet.column.ColumnDescriptor;
+import parquet.format.ConvertedType;
import parquet.hadoop.metadata.ColumnChunkMetaData;
import java.io.IOException;
@@ -42,25 +45,156 @@ public class VarLenBinaryReader {
this.columns = columns;
}
- public static class VarLengthColumn extends ColumnReader {
+ public static abstract class VarLengthColumn<V extends ValueVector> extends ColumnReader {
- VarLengthColumn(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, ValueVector v) throws ExecutionSetupException {
- super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v);
+ VarLengthColumn(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,
+ ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, V v,
+ ConvertedType convertedType) throws ExecutionSetupException {
+ super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, convertedType);
}
@Override
protected void readField(long recordsToRead, ColumnReader firstColumnStatus) {
throw new UnsupportedOperationException();
}
+
+ public abstract boolean setSafe(int index, byte[] bytes, int start, int length);
+
+ public abstract int capacity();
+
+ }
+
+ public static abstract class NullableVarLengthColumn<V extends ValueVector> extends ColumnReader {
+
+ int nullsRead;
+ boolean currentValNull = false;
+
+ NullableVarLengthColumn(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,
+ ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, V v,
+ ConvertedType convertedType ) throws ExecutionSetupException {
+ super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, convertedType);
+ }
+
+ public abstract boolean setSafe(int index, byte[] value, int start, int length);
+
+ public abstract int capacity();
+
+ @Override
+ protected void readField(long recordsToRead, ColumnReader firstColumnStatus) {
+ throw new UnsupportedOperationException();
+ }
+ }
+
+ public static class VarCharColumn extends VarLengthColumn <VarCharVector> {
+
+ // store a hard reference to the vector (which is also stored in the superclass) to prevent repetitive casting
+ protected VarCharVector varCharVector;
+
+ VarCharColumn(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,
+ ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, VarCharVector v,
+ ConvertedType convertedType) throws ExecutionSetupException {
+ super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, convertedType);
+ varCharVector = v;
+ }
+
+ @Override
+ protected void readField(long recordsToRead, ColumnReader firstColumnStatus) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean setSafe(int index, byte[] bytes, int start, int length) {
+ return varCharVector.getMutator().setSafe(valuesReadInCurrentPass, bytes,
+ (int) (pageReadStatus.readPosInBytes + 4), dataTypeLengthInBits);
+ }
+
+ @Override
+ public int capacity() {
+ return varCharVector.getData().capacity();
+ }
}
- public static class NullableVarLengthColumn extends ColumnReader {
+ public static class NullableVarCharColumn extends NullableVarLengthColumn <NullableVarCharVector> {
int nullsRead;
boolean currentValNull = false;
+ // store a hard reference to the vector (which is also stored in the superclass) to prevent repetitive casting
+ protected NullableVarCharVector nullableVarCharVector;
+
+ NullableVarCharColumn(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,
+ ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, NullableVarCharVector v,
+ ConvertedType convertedType ) throws ExecutionSetupException {
+ super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, convertedType);
+ nullableVarCharVector = v;
+ }
- NullableVarLengthColumn(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, ValueVector v) throws ExecutionSetupException {
- super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v);
+ public boolean setSafe(int index, byte[] value, int start, int length) {
+ return nullableVarCharVector.getMutator().setSafe(index, value,
+ start, length);
+ }
+
+ @Override
+ public int capacity() {
+ return nullableVarCharVector.getData().capacity();
+ }
+
+ @Override
+ protected void readField(long recordsToRead, ColumnReader firstColumnStatus) {
+ throw new UnsupportedOperationException();
+ }
+ }
+
+ public static class VarBinaryColumn extends VarLengthColumn <VarBinaryVector> {
+
+ // store a hard reference to the vector (which is also stored in the superclass) to prevent repetitive casting
+ protected VarBinaryVector varBinaryVector;
+
+ VarBinaryColumn(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,
+ ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, VarBinaryVector v,
+ ConvertedType convertedType) throws ExecutionSetupException {
+ super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, convertedType);
+ varBinaryVector = v;
+ }
+
+ @Override
+ protected void readField(long recordsToRead, ColumnReader firstColumnStatus) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean setSafe(int index, byte[] bytes, int start, int length) {
+ return varBinaryVector.getMutator().setSafe(valuesReadInCurrentPass, bytes,
+ (int) (pageReadStatus.readPosInBytes + 4), dataTypeLengthInBits);
+ }
+
+ @Override
+ public int capacity() {
+ return varBinaryVector.getData().capacity();
+ }
+ }
+
+ public static class NullableVarBinaryColumn extends NullableVarLengthColumn <NullableVarBinaryVector> {
+
+ int nullsRead;
+ boolean currentValNull = false;
+ // store a hard reference to the vector (which is also stored in the superclass) to prevent repetitive casting
+ protected NullableVarBinaryVector nullableVarBinaryVector;
+
+ NullableVarBinaryColumn(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,
+ ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, NullableVarBinaryVector v,
+ ConvertedType convertedType ) throws ExecutionSetupException {
+ super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, convertedType);
+ nullableVarBinaryVector = v;
+ }
+
+ public boolean setSafe(int index, byte[] value, int start, int length) {
+ return nullableVarBinaryVector.getMutator().setSafe(index, value,
+ start, length);
+ }
+
+ @Override
+ public int capacity() {
+ return nullableVarBinaryVector.getData().capacity();
}
@Override
@@ -83,8 +217,6 @@ public class VarLenBinaryReader {
int lengthVarFieldsInCurrentRecord;
boolean rowGroupFinished = false;
byte[] bytes;
- VarBinaryVector currVec;
- NullableVarBinaryVector currNullVec;
// write the first 0 offset
for (ColumnReader columnReader : columns) {
columnReader.bytesReadInCurrentPass = 0;
@@ -98,8 +230,8 @@ public class VarLenBinaryReader {
}
outer: do {
lengthVarFieldsInCurrentRecord = 0;
- for (ColumnReader columnReader : columns) {
- if (recordsReadInCurrentPass == columnReader.valueVecHolder.getValueVector().getValueCapacity()){
+ for (VarLengthColumn columnReader : columns) {
+ if (recordsReadInCurrentPass == columnReader.valueVec.getValueCapacity()){
rowGroupFinished = true;
break;
}
@@ -118,7 +250,7 @@ public class VarLenBinaryReader {
(int) columnReader.pageReadStatus.readPosInBytes);
lengthVarFieldsInCurrentRecord += columnReader.dataTypeLengthInBits;
- if (columnReader.bytesReadInCurrentPass + columnReader.dataTypeLengthInBits > ((VarBinaryVector) columnReader.valueVecHolder.getValueVector()).getData().capacity()) {
+ if (columnReader.bytesReadInCurrentPass + columnReader.dataTypeLengthInBits > columnReader.capacity()) {
break outer;
}
@@ -126,7 +258,7 @@ public class VarLenBinaryReader {
for (NullableVarLengthColumn columnReader : nullableColumns) {
// check to make sure there is capacity for the next value (for nullables this is a check to see if there is
// still space in the nullability recording vector)
- if (recordsReadInCurrentPass == columnReader.valueVecHolder.getValueVector().getValueCapacity()){
+ if (recordsReadInCurrentPass == columnReader.valueVec.getValueCapacity()){
rowGroupFinished = true;
break;
}
@@ -151,7 +283,7 @@ public class VarLenBinaryReader {
(int) columnReader.pageReadStatus.readPosInBytes);
lengthVarFieldsInCurrentRecord += columnReader.dataTypeLengthInBits;
- if (columnReader.bytesReadInCurrentPass + columnReader.dataTypeLengthInBits > ((NullableVarBinaryVector) columnReader.valueVecHolder.getValueVector()).getData().capacity()) {
+ if (columnReader.bytesReadInCurrentPass + columnReader.dataTypeLengthInBits > columnReader.capacity()) {
break outer;
}
}
@@ -160,12 +292,11 @@ public class VarLenBinaryReader {
> parentReader.getBatchSize()){
break outer;
}
- for (ColumnReader columnReader : columns) {
+ for (VarLengthColumn columnReader : columns) {
bytes = columnReader.pageReadStatus.pageDataByteArray;
- currVec = (VarBinaryVector) columnReader.valueVecHolder.getValueVector();
// again, I am re-purposing the unused field here, it is a length n BYTES, not bits
- boolean success = currVec.getMutator().setSafe(columnReader.valuesReadInCurrentPass, bytes,
- (int) columnReader.pageReadStatus.readPosInBytes + 4, columnReader.dataTypeLengthInBits);
+ boolean success = columnReader.setSafe(columnReader.valuesReadInCurrentPass, bytes,
+ (int) columnReader.pageReadStatus.readPosInBytes + 4, columnReader.dataTypeLengthInBits);
assert success;
columnReader.pageReadStatus.readPosInBytes += columnReader.dataTypeLengthInBits + 4;
columnReader.bytesReadInCurrentPass += columnReader.dataTypeLengthInBits + 4;
@@ -174,11 +305,10 @@ public class VarLenBinaryReader {
}
for (NullableVarLengthColumn columnReader : nullableColumns) {
bytes = columnReader.pageReadStatus.pageDataByteArray;
- currNullVec = (NullableVarBinaryVector) columnReader.valueVecHolder.getValueVector();
// again, I am re-purposing the unused field here, it is a length n BYTES, not bits
if (!columnReader.currentValNull && columnReader.dataTypeLengthInBits > 0){
- boolean success = currNullVec.getMutator().setSafe(columnReader.valuesReadInCurrentPass, bytes,
- (int) columnReader.pageReadStatus.readPosInBytes + 4, columnReader.dataTypeLengthInBits);
+ boolean success = columnReader.setSafe(columnReader.valuesReadInCurrentPass, bytes,
+ (int) columnReader.pageReadStatus.readPosInBytes + 4, columnReader.dataTypeLengthInBits);
assert success;
}
columnReader.currentValNull = false;
@@ -195,10 +325,10 @@ public class VarLenBinaryReader {
recordsReadInCurrentPass++;
} while (recordsReadInCurrentPass < recordsToReadInThisPass);
for (VarLengthColumn columnReader : columns) {
- columnReader.valueVecHolder.getValueVector().getMutator().setValueCount((int) recordsReadInCurrentPass);
+ columnReader.valueVec.getMutator().setValueCount((int) recordsReadInCurrentPass);
}
for (NullableVarLengthColumn columnReader : nullableColumns) {
- columnReader.valueVecHolder.getValueVector().getMutator().setValueCount((int) recordsReadInCurrentPass);
+ columnReader.valueVec.getMutator().setValueCount((int) recordsReadInCurrentPass);
}
return recordsReadInCurrentPass;
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f071aca7/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
index 9ba94fa..67b5394 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
@@ -103,6 +103,11 @@ public class ParquetRecordReaderTest extends BaseTestQuery{
testParquetFullEngineLocalPath(planName, fileName, 2, numberRowGroups, recordsPerRowGroup);
}
+ public String getPlanForFile(String pathFileName, String parquetFileName) throws IOException {
+ return Files.toString(FileUtils.getResourceAsFile(pathFileName), Charsets.UTF_8)
+ .replaceFirst("&REPLACED_IN_PARQUET_TEST&", parquetFileName);
+ }
+
@Test
public void testMultipleRowGroupsAndReads2() throws Exception {
String readEntries;
@@ -273,15 +278,27 @@ public class ParquetRecordReaderTest extends BaseTestQuery{
@Ignore
@Test
+ /**
+ * Tests the reading of nullable var length columns, runs the tests twice, once on a file that has
+ * a converted type of UTF-8 to make sure it can be read
+ */
public void testNullableColumnsVarLen() throws Exception {
HashMap<String, FieldInfo> fields = new HashMap<>();
ParquetTestProperties props = new ParquetTestProperties(1, 300000, DEFAULT_BYTES_PER_PAGE, fields);
byte[] val = {'b'};
byte[] val2 = {'b', '2'};
- byte[] val3 = { 'l','o','n','g','e','r',' ','s','t','r','i','n','g'};
- Object[] boolVals = { val, val2, val3};
+ byte[] val3 = {'b', '3'};
+ byte[] val4 = { 'l','o','n','g','e','r',' ','s','t','r','i','n','g'};
+ Object[] boolVals = { val, val2, val4};
props.fields.put("a", new FieldInfo("boolean", "a", 1, boolVals, TypeProtos.MinorType.BIT, props));
+ //
testParquetFullEngineEventBased(false, "/parquet/parquet_nullable_varlen.json", "/tmp/nullable_varlen.parquet", 1, props);
+ fields.clear();
+ // pass strings instead of byte arrays
+ Object[] boolVals2 = { "b", "b2", "b3"};
+ props.fields.put("a", new FieldInfo("boolean", "a", 1, boolVals2, TypeProtos.MinorType.BIT, props));
+ testParquetFullEngineEventBased(false, "/parquet/parquet_scan_screen_read_entry_replace.json",
+ "\"/tmp/varLen.parquet/a\"", "unused", 1, props);
}
@Test
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f071aca7/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetResultListener.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetResultListener.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetResultListener.java
index 73af98c..257a49e 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetResultListener.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetResultListener.java
@@ -78,7 +78,10 @@ public class ParquetResultListener implements UserResultsListener {
T val = (T) valueVector.getAccessor().getObject(index);
if (val instanceof byte[]) {
- assertEquals(true, Arrays.equals((byte[]) value, (byte[]) val));
+ assert(Arrays.equals((byte[]) value, (byte[]) val));
+ }
+ else if (val instanceof String) {
+ assert(val.equals(value));
} else {
assertEquals(value, val);
}
@@ -120,16 +123,7 @@ public class ParquetResultListener implements UserResultsListener {
}
for (int j = 0; j < vv.getAccessor().getValueCount(); j++) {
if (ParquetRecordReaderTest.VERBOSE_DEBUG){
- if (vv.getAccessor().getObject(j) instanceof byte[]){
- System.out.print("[len:" + ((byte[]) vv.getAccessor().getObject(j)).length + " - (");
- for (int k = 0; k < ((byte[]) vv.getAccessor().getObject(j)).length; k++){
- System.out.print((char)((byte[])vv.getAccessor().getObject(j))[k] + ",");
- }
- System.out.print(") ]");
- }
- else{
- System.out.print(Strings.padStart(vv.getAccessor().getObject(j) + "", 20, ' ') + " ");
- }
+ System.out.print(Strings.padStart(vv.getAccessor().getObject(j) + "", 20, ' ') + " ");
System.out.print(", " + (j % 25 == 0 ? "\n batch:" + batchCounter + " v:" + j + " - " : ""));
}
if (testValues){
@@ -161,20 +155,9 @@ public class ParquetResultListener implements UserResultsListener {
for (VectorWrapper vw : batchLoader) {
ValueVector v = vw.getValueVector();
- if (v.getAccessor().getObject(i) instanceof byte[]){
- System.out.print("[len:" + ((byte[]) v.getAccessor().getObject(i)).length + " - (");
- for (int j = 0; j < ((byte[]) v.getAccessor().getObject(i)).length; j++){
- System.out.print(((byte[])v.getAccessor().getObject(i))[j] + ",");
- }
- System.out.print(") ]");
- }
- else{
- System.out.print(Strings.padStart(v.getAccessor().getObject(i) + "", 20, ' ') + " ");
- }
+ System.out.print(Strings.padStart(v.getAccessor().getObject(i) + "", 20, ' ') + " ");
}
- System.out.println(
-
- );
+ System.out.println();
}
}
batchCounter++;