Posted to commits@drill.apache.org by ti...@apache.org on 2018/06/30 01:52:27 UTC
[drill] branch master updated: DRILL-6147: Adding Columnar Parquet Batch Sizing functionality
This is an automated email from the ASF dual-hosted git repository.
timothyfarkas pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git
The following commit(s) were added to refs/heads/master by this push:
new f481a7c DRILL-6147: Adding Columnar Parquet Batch Sizing functionality
f481a7c is described below
commit f481a7c2833b8c7ebabe02a37590cdd3e559ca5e
Author: Salim Achouche <sa...@gmail.com>
AuthorDate: Tue Jun 19 19:23:41 2018 -0700
DRILL-6147: Adding Columnar Parquet Batch Sizing functionality
closes #1330
---
.../java/org/apache/drill/exec/ExecConstants.java | 16 +
.../apache/drill/exec/physical/impl/ScanBatch.java | 45 ++
.../exec/server/options/SystemOptionManager.java | 4 +
.../store/parquet/columnreaders/BatchReader.java | 53 +-
.../store/parquet/columnreaders/BitReader.java | 4 +-
.../store/parquet/columnreaders/ColumnReader.java | 2 +-
.../parquet/columnreaders/ColumnReaderFactory.java | 115 ++--
.../columnreaders/FixedByteAlignedReader.java | 32 +-
.../columnreaders/FixedWidthRepeatedReader.java | 6 +-
.../parquet/columnreaders/NullableBitReader.java | 4 +-
.../columnreaders/NullableColumnReader.java | 4 +-
.../NullableFixedByteAlignedReaders.java | 64 +-
.../NullableVarLengthValuesColumn.java | 4 +-
.../columnreaders/ParquetColumnMetadata.java | 18 +-
.../ParquetFixedWidthDictionaryReaders.java | 44 +-
.../parquet/columnreaders/ParquetRecordReader.java | 74 ++-
.../store/parquet/columnreaders/ParquetSchema.java | 18 +-
.../store/parquet/columnreaders/ReadState.java | 76 ++-
.../columnreaders/VarLenAbstractEntryReader.java | 75 +--
.../VarLenAbstractPageEntryReader.java | 100 ++++
.../parquet/columnreaders/VarLenBinaryReader.java | 239 +++++++-
.../columnreaders/VarLenBulkPageReader.java | 78 ++-
.../columnreaders/VarLenColumnBulkEntry.java | 16 +
.../columnreaders/VarLenColumnBulkInput.java | 272 +++++++--
.../columnreaders/VarLenEntryDictionaryReader.java | 14 +-
.../parquet/columnreaders/VarLenEntryReader.java | 30 +-
.../columnreaders/VarLenFixedEntryReader.java | 8 +-
.../VarLenNullableDictionaryReader.java | 16 +-
.../columnreaders/VarLenNullableEntryReader.java | 14 +-
.../VarLenNullableFixedEntryReader.java | 8 +-
.../columnreaders/VarLenOverflowReader.java | 372 ++++++++++++
.../parquet/columnreaders/VarLengthColumn.java | 4 +-
.../columnreaders/VarLengthColumnReaders.java | 24 +-
.../columnreaders/VarLengthValuesColumn.java | 4 +-
.../batchsizing/BatchOverflowOptimizer.java | 147 +++++
.../batchsizing/BatchSizingMemoryUtil.java | 329 ++++++++++
.../batchsizing/OverflowSerDeUtil.java | 366 ++++++++++++
.../batchsizing/RecordBatchOverflow.java | 177 ++++++
.../batchsizing/RecordBatchSizerManager.java | 661 +++++++++++++++++++++
.../drill/exec/util/record/RecordBatchStats.java | 225 +++++++
.../java-exec/src/main/resources/drill-module.conf | 6 +
.../columnreaders/TestBatchSizingMemoryUtil.java | 167 ++++++
.../codegen/templates/NullableValueVectors.java | 7 +
.../codegen/templates/VariableLengthVectors.java | 12 +-
44 files changed, 3525 insertions(+), 429 deletions(-)
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
index 634f670..bc16272 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -316,6 +316,13 @@ public final class ExecConstants {
public static final String PARQUET_FLAT_READER_BULK = "store.parquet.flat.reader.bulk";
public static final OptionValidator PARQUET_FLAT_READER_BULK_VALIDATOR = new BooleanValidator(PARQUET_FLAT_READER_BULK);
+ // Controls the flat parquet reader batching constraints (number of records and memory limit)
+ public static final String PARQUET_FLAT_BATCH_NUM_RECORDS = "store.parquet.flat.batch.num_records";
+ public static final OptionValidator PARQUET_FLAT_BATCH_NUM_RECORDS_VALIDATOR = new RangeLongValidator(PARQUET_FLAT_BATCH_NUM_RECORDS, 1, Integer.MAX_VALUE);
+ public static final String PARQUET_FLAT_BATCH_MEMORY_SIZE = "store.parquet.flat.batch.memory_size";
+ // This configuration is used to override the common memory batch sizing configuration property
+ public static final OptionValidator PARQUET_FLAT_BATCH_MEMORY_SIZE_VALIDATOR = new RangeLongValidator(PARQUET_FLAT_BATCH_MEMORY_SIZE, 0, Integer.MAX_VALUE);
+
public static final String JSON_ALL_TEXT_MODE = "store.json.all_text_mode";
public static final BooleanValidator JSON_READER_ALL_TEXT_MODE_VALIDATOR = new BooleanValidator(JSON_ALL_TEXT_MODE);
public static final BooleanValidator JSON_EXTENDED_TYPES = new BooleanValidator("store.json.extended_types");
@@ -689,4 +696,13 @@ public final class ExecConstants {
public static final String ALLOW_LOOPBACK_ADDRESS_BINDING = "drill.exec.allow_loopback_address_binding";
+ /** Enables batch size statistics logging */
+ public static final String STATS_LOGGING_BATCH_SIZE_OPTION = "drill.exec.stats.logging.batch_size";
+ public static final BooleanValidator STATS_LOGGING_BATCH_SIZE_VALIDATOR = new BooleanValidator(STATS_LOGGING_BATCH_SIZE_OPTION);
+
+ /** Enables fine-grained batch size statistics logging */
+ public static final String STATS_LOGGING_FG_BATCH_SIZE_OPTION = "drill.exec.stats.logging.fine_grained.batch_size";
+ public static final BooleanValidator STATS_LOGGING_BATCH_FG_SIZE_VALIDATOR = new BooleanValidator(STATS_LOGGING_FG_BATCH_SIZE_OPTION);
+
+
}
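
Both new options follow the existing ExecConstants pattern, so a reader can resolve them through the fragment context the same way this commit reads the INT96 option further down. A minimal sketch, assuming the usual OptionManager accessors (the local variable names are illustrative only):

    // Sketch: resolving the new batch-sizing options at reader setup time.
    // getOption(...).num_val mirrors the getOption(...).bool_val pattern
    // already used in this patch for PARQUET_READER_INT96_AS_TIMESTAMP.
    long numRecords = recordReader.getFragmentContext().getOptions()
        .getOption(ExecConstants.PARQUET_FLAT_BATCH_NUM_RECORDS).num_val;
    long memorySize = recordReader.getFragmentContext().getOptions()
        .getOption(ExecConstants.PARQUET_FLAT_BATCH_MEMORY_SIZE).num_val;
    // Assumption: the memory_size range starts at 0, so 0 presumably defers
    // to the common batch-size property this option is documented to override.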
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
index 4a62752..09e785e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
@@ -47,6 +47,8 @@ import org.apache.drill.exec.store.RecordReader;
import org.apache.drill.exec.testing.ControlsInjector;
import org.apache.drill.exec.testing.ControlsInjectorFactory;
import org.apache.drill.exec.util.CallBack;
+import org.apache.drill.exec.util.record.RecordBatchStats;
+import org.apache.drill.exec.util.record.RecordBatchStats.RecordBatchStatsContext;
import org.apache.drill.exec.vector.AllocationHelper;
import org.apache.drill.exec.vector.NullableVarCharVector;
import org.apache.drill.exec.vector.SchemaChangeCallBack;
@@ -80,6 +82,8 @@ public class ScanBatch implements CloseableRecordBatch {
private final BufferAllocator allocator;
private final List<Map<String, String>> implicitColumnList;
private String currentReaderClassName;
+ private final RecordBatchStatsContext batchStatsLogging;
+
/**
*
* @param context
@@ -117,6 +121,7 @@ public class ScanBatch implements CloseableRecordBatch {
this.implicitColumnList = implicitColumnList;
addImplicitVectors();
currentReader = null;
+ batchStatsLogging = new RecordBatchStatsContext(context, oContext);
} finally {
oContext.getStats().stopProcessing();
}
@@ -174,6 +179,7 @@ public class ScanBatch implements CloseableRecordBatch {
boolean isNewSchema = mutator.isNewSchema();
populateImplicitVectorsAndSetCount();
oContext.getStats().batchReceived(0, recordCount, isNewSchema);
+ logRecordBatchStats();
if (recordCount == 0) {
currentReader.close();
@@ -291,6 +297,45 @@ public class ScanBatch implements CloseableRecordBatch {
return container.getValueAccessorById(clazz, ids);
}
+ private void logRecordBatchStats() {
+ final int MAX_FQN_LENGTH = 50;
+
+ if (recordCount == 0) {
+ return; // NOOP
+ }
+
+ RecordBatchStats.logRecordBatchStats(
+ batchStatsLogging.getContextOperatorId(),
+ getFQNForLogging(MAX_FQN_LENGTH),
+ this,
+ batchStatsLogging,
+ logger);
+ }
+
+ /** Might truncate the FQN if too long */
+ private String getFQNForLogging(int maxLength) {
+ final String FQNKey = "FQN";
+ final ValueVector v = mutator.implicitFieldVectorMap.get(FQNKey);
+
+ final Object fqnObj;
+
+ if (v == null
+ || v.getAccessor().getValueCount() == 0
+ || (fqnObj = ((NullableVarCharVector) v).getAccessor().getObject(0)) == null) {
+
+ return "NA";
+ }
+
+ String fqn = fqnObj.toString();
+
+ if (fqn != null && fqn.length() > maxLength) {
+ fqn = fqn.substring(fqn.length() - maxLength, fqn.length());
+ }
+
+ return fqn;
+ }
+
+
/**
* Row set mutator implementation provided to record readers created by
* this scan batch. Made visible so that tests can create this mutator
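Note on getFQNForLogging(): truncation keeps the tail of the fully qualified name, which is the most discriminating part of a file path. A small illustration of the rule with hypothetical values:

    // Sketch: keep only the last maxLength characters of the FQN.
    String fqn = "/data/warehouse/sales/2018/06/part-00042.parquet"; // hypothetical path
    int maxLength = 20;
    if (fqn.length() > maxLength) {
      fqn = fqn.substring(fqn.length() - maxLength); // -> "6/part-00042.parquet"
    }
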
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
index 06b4c57..e6368f5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
@@ -149,6 +149,8 @@ public class SystemOptionManager extends BaseOptionManager implements AutoClosea
new OptionDefinition(ExecConstants.PARQUET_PAGEREADER_USE_FADVISE_VALIDATOR),
new OptionDefinition(ExecConstants.PARQUET_READER_INT96_AS_TIMESTAMP_VALIDATOR),
new OptionDefinition(ExecConstants.PARQUET_FLAT_READER_BULK_VALIDATOR),
+ new OptionDefinition(ExecConstants.PARQUET_FLAT_BATCH_NUM_RECORDS_VALIDATOR, new OptionMetaData(OptionValue.AccessibleScopes.SYSTEM_AND_SESSION, true, true)),
+ new OptionDefinition(ExecConstants.PARQUET_FLAT_BATCH_MEMORY_SIZE_VALIDATOR, new OptionMetaData(OptionValue.AccessibleScopes.SYSTEM_AND_SESSION, true, true)),
new OptionDefinition(ExecConstants.JSON_READER_ALL_TEXT_MODE_VALIDATOR),
new OptionDefinition(ExecConstants.JSON_WRITER_NAN_INF_NUMBERS_VALIDATOR),
new OptionDefinition(ExecConstants.JSON_READER_NAN_INF_NUMBERS_VALIDATOR),
@@ -229,6 +231,8 @@ public class SystemOptionManager extends BaseOptionManager implements AutoClosea
new OptionDefinition(ExecConstants.ENABLE_VECTOR_VALIDATOR),
new OptionDefinition(ExecConstants.ENABLE_ITERATOR_VALIDATOR),
new OptionDefinition(ExecConstants.OUTPUT_BATCH_SIZE_VALIDATOR, new OptionMetaData(OptionValue.AccessibleScopes.SYSTEM, true, false)),
+ new OptionDefinition(ExecConstants.STATS_LOGGING_BATCH_SIZE_VALIDATOR, new OptionMetaData(OptionValue.AccessibleScopes.SYSTEM_AND_SESSION, true, true)),
+ new OptionDefinition(ExecConstants.STATS_LOGGING_BATCH_FG_SIZE_VALIDATOR, new OptionMetaData(OptionValue.AccessibleScopes.SYSTEM_AND_SESSION, true, true)),
new OptionDefinition(ExecConstants.FRAG_RUNNER_RPC_TIMEOUT_VALIDATOR, new OptionMetaData(OptionValue.AccessibleScopes.SYSTEM, true, true)),
};
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/BatchReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/BatchReader.java
index 367b226..25dfbc8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/BatchReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/BatchReader.java
@@ -21,7 +21,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
-
+import com.google.common.base.Preconditions;
import com.google.common.base.Stopwatch;
import com.google.common.collect.Lists;
@@ -38,14 +38,14 @@ public abstract class BatchReader {
public int readBatch() throws Exception {
ColumnReader<?> firstColumnStatus = readState.getFirstColumnReader();
- long recordsToRead = Math.min(getReadCount(firstColumnStatus), readState.getRecordsToRead());
+ int currBatchNumRecords = readState.batchSizerMgr().getCurrentRecordsPerBatch();
+ long recordsToRead = Math.min(currBatchNumRecords, readState.getRemainingValuesToRead());
int readCount = readRecords(firstColumnStatus, recordsToRead);
+
readState.fillNullVectors(readCount);
return readCount;
}
- protected abstract long getReadCount(ColumnReader<?> firstColumnStatus);
-
protected abstract int readRecords(ColumnReader<?> firstColumnStatus, long recordsToRead) throws Exception;
protected void readAllFixedFields(long recordsToRead) throws Exception {
@@ -59,14 +59,14 @@ public abstract class BatchReader {
}
protected void readAllFixedFieldsSerial(long recordsToRead) throws IOException {
- for (ColumnReader<?> crs : readState.getColumnReaders()) {
+ for (ColumnReader<?> crs : readState.getFixedLenColumnReaders()) {
crs.processPages(recordsToRead);
}
}
protected void readAllFixedFieldsParallel(long recordsToRead) throws Exception {
ArrayList<Future<Long>> futures = Lists.newArrayList();
- for (ColumnReader<?> crs : readState.getColumnReaders()) {
+ for (ColumnReader<?> crs : readState.getFixedLenColumnReaders()) {
Future<Long> f = crs.processPagesAsync(recordsToRead);
if (f != null) {
futures.add(f);
@@ -106,15 +106,6 @@ public abstract class BatchReader {
}
@Override
- protected long getReadCount(ColumnReader<?> firstColumnStatus) {
- if (readState.recordsRead() == readState.schema().getGroupRecordCount()) {
- return 0;
- }
- return Math.min(ParquetRecordReader.DEFAULT_RECORDS_TO_READ_IF_VARIABLE_WIDTH,
- readState.schema().getGroupRecordCount() - readState.recordsRead());
- }
-
- @Override
protected int readRecords(ColumnReader<?> firstColumnStatus, long recordsToRead) {
readState.updateCounts((int) recordsToRead);
return (int) recordsToRead;
@@ -133,15 +124,15 @@ public abstract class BatchReader {
}
@Override
- protected long getReadCount(ColumnReader<?> firstColumnStatus) {
- return Math.min(readState.schema().getRecordsPerBatch(),
- firstColumnStatus.columnChunkMetaData.getValueCount() - firstColumnStatus.totalValuesRead);
- }
-
- @Override
protected int readRecords(ColumnReader<?> firstColumnStatus, long recordsToRead) throws Exception {
readAllFixedFields(recordsToRead);
- return firstColumnStatus.getRecordsReadInCurrentPass();
+
+ Preconditions.checkNotNull(firstColumnStatus);
+ readState.setValuesReadInCurrentPass(firstColumnStatus.getRecordsReadInCurrentPass()); // get the number of rows read
+
+ readState.updateCounts((int) recordsToRead); // update the shared Reader State
+
+ return readState.getValuesReadInCurrentPass();
}
}
@@ -157,15 +148,21 @@ public abstract class BatchReader {
}
@Override
- protected long getReadCount(ColumnReader<?> firstColumnStatus) {
- return ParquetRecordReader.DEFAULT_RECORDS_TO_READ_IF_VARIABLE_WIDTH;
- }
-
- @Override
protected int readRecords(ColumnReader<?> firstColumnStatus, long recordsToRead) throws Exception {
+ // We should not rely on the "firstColumnStatus.getRecordsReadInCurrentPass()" when dealing
+ // with variable length columns as each might return a different number of records. The batch size
+ // will be the lowest value. The variable column readers will update the "readState" object to
+ // reflect the correct information.
long fixedRecordsToRead = readState.varLengthReader().readFields(recordsToRead);
readAllFixedFields(fixedRecordsToRead);
- return firstColumnStatus.getRecordsReadInCurrentPass();
+
+ // Sanity check that the fixed readers read the expected number of rows
+ Preconditions.checkArgument(firstColumnStatus == null
+ || firstColumnStatus.getRecordsReadInCurrentPass() == readState.getValuesReadInCurrentPass());
+
+ readState.updateCounts((int) fixedRecordsToRead);
+
+ return readState.getValuesReadInCurrentPass();
}
}
}
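
The net effect in readBatch() is that the per-batch record target now comes from the batch sizer manager rather than from the removed hard-coded getReadCount() heuristics, with the values remaining in the row group as the cap. A sketch of the arithmetic with hypothetical numbers:

    // Sketch (hypothetical values): the sizer proposes a target and the
    // remaining values in the row group cap it.
    int currBatchNumRecords = 4096; // readState.batchSizerMgr().getCurrentRecordsPerBatch()
    long remainingValues = 1500;    // readState.getRemainingValuesToRead()
    long recordsToRead = Math.min(currBatchNumRecords, remainingValues); // 1500
    // Variable-width readers may stop even earlier (e.g. on a memory limit),
    // which is why the shared ReadState, not any single column reader, now
    // reports the rows read in the current pass.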
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/BitReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/BitReader.java
index 8865f0b..b4f1f25 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/BitReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/BitReader.java
@@ -25,9 +25,9 @@ import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
final class BitReader extends ColumnReader<BitVector> {
- BitReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData,
+ BitReader(ParquetRecordReader parentReader, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData,
boolean fixedLength, BitVector v, SchemaElement schemaElement) throws ExecutionSetupException {
- super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
+ super(parentReader, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
}
@Override
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReader.java
index dabd8ca..c343d31 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReader.java
@@ -83,7 +83,7 @@ public abstract class ColumnReader<V extends ValueVector> {
volatile boolean isShuttingDown; //Indicate to not submit any new AsyncPageReader Tasks during clear()
- protected ColumnReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,
+ protected ColumnReader(ParquetRecordReader parentReader, ColumnDescriptor descriptor,
ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, V v, SchemaElement schemaElement) throws ExecutionSetupException {
this.parentReader = parentReader;
this.columnDescriptor = descriptor;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReaderFactory.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReaderFactory.java
index ad5d23a..798d3c2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReaderFactory.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReaderFactory.java
@@ -60,12 +60,11 @@ public class ColumnReaderFactory {
* @param fixedLength
* @param descriptor
* @param columnChunkMetaData
- * @param allocateSize - the size of the vector to create
- * @return
+ * @return ColumnReader object instance
* @throws SchemaChangeException
*/
static ColumnReader<?> createFixedColumnReader(ParquetRecordReader recordReader, boolean fixedLength, ColumnDescriptor descriptor,
- ColumnChunkMetaData columnChunkMetaData, int allocateSize, ValueVector v,
+ ColumnChunkMetaData columnChunkMetaData, ValueVector v,
SchemaElement schemaElement)
throws Exception {
ConvertedType convertedType = schemaElement.getConverted_type();
@@ -73,30 +72,30 @@ public class ColumnReaderFactory {
// ColumnReader for actually transferring data into the data vector inside of our repeated vector
if (descriptor.getMaxDefinitionLevel() == 0 || descriptor.getMaxRepetitionLevel() > 0) {
if (columnChunkMetaData.getType() == PrimitiveType.PrimitiveTypeName.BOOLEAN){
- return new BitReader(recordReader, allocateSize, descriptor, columnChunkMetaData,
+ return new BitReader(recordReader, descriptor, columnChunkMetaData,
fixedLength, (BitVector) v, schemaElement);
} else if (!columnChunkMetaData.getEncodings().contains(Encoding.PLAIN_DICTIONARY) && (
columnChunkMetaData.getType() == PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY
|| columnChunkMetaData.getType() == PrimitiveType.PrimitiveTypeName.INT96)) {
switch (convertedType) {
case DECIMAL:
- return new FixedByteAlignedReader.VarDecimalReader(recordReader, allocateSize, descriptor,
+ return new FixedByteAlignedReader.VarDecimalReader(recordReader, descriptor,
columnChunkMetaData, fixedLength, (VarDecimalVector) v, schemaElement);
case INTERVAL:
- return new FixedByteAlignedReader.IntervalReader(recordReader, allocateSize, descriptor,
+ return new FixedByteAlignedReader.IntervalReader(recordReader, descriptor,
columnChunkMetaData, fixedLength, (IntervalVector) v, schemaElement);
default:
- return new FixedByteAlignedReader.FixedBinaryReader(recordReader, allocateSize, descriptor,
+ return new FixedByteAlignedReader.FixedBinaryReader(recordReader, descriptor,
columnChunkMetaData, (VariableWidthVector) v, schemaElement);
}
} else if (columnChunkMetaData.getType() == PrimitiveType.PrimitiveTypeName.INT32 && convertedType == ConvertedType.DATE){
switch(recordReader.getDateCorruptionStatus()) {
case META_SHOWS_CORRUPTION:
- return new FixedByteAlignedReader.CorruptDateReader(recordReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, (DateVector) v, schemaElement);
+ return new FixedByteAlignedReader.CorruptDateReader(recordReader, descriptor, columnChunkMetaData, fixedLength, (DateVector) v, schemaElement);
case META_SHOWS_NO_CORRUPTION:
- return new FixedByteAlignedReader.DateReader(recordReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, (DateVector) v, schemaElement);
+ return new FixedByteAlignedReader.DateReader(recordReader, descriptor, columnChunkMetaData, fixedLength, (DateVector) v, schemaElement);
case META_UNCLEAR_TEST_VALUES:
- return new FixedByteAlignedReader.CorruptionDetectingDateReader(recordReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, (DateVector) v, schemaElement);
+ return new FixedByteAlignedReader.CorruptionDetectingDateReader(recordReader, descriptor, columnChunkMetaData, fixedLength, (DateVector) v, schemaElement);
default:
throw new ExecutionSetupException(
String.format("Issue setting up parquet reader for date type, " +
@@ -108,78 +107,78 @@ public class ColumnReaderFactory {
switch (columnChunkMetaData.getType()) {
case INT32:
if (convertedType == null) {
- return new ParquetFixedWidthDictionaryReaders.DictionaryIntReader(recordReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, (IntVector) v, schemaElement);
+ return new ParquetFixedWidthDictionaryReaders.DictionaryIntReader(recordReader, descriptor, columnChunkMetaData, fixedLength, (IntVector) v, schemaElement);
}
switch (convertedType) {
case DECIMAL:
- return new ParquetFixedWidthDictionaryReaders.DictionaryVarDecimalReader(recordReader, allocateSize,
+ return new ParquetFixedWidthDictionaryReaders.DictionaryVarDecimalReader(recordReader,
descriptor, columnChunkMetaData, fixedLength, (VarDecimalVector) v, schemaElement);
case TIME_MILLIS:
- return new ParquetFixedWidthDictionaryReaders.DictionaryTimeReader(recordReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, (TimeVector) v, schemaElement);
+ return new ParquetFixedWidthDictionaryReaders.DictionaryTimeReader(recordReader, descriptor, columnChunkMetaData, fixedLength, (TimeVector) v, schemaElement);
case INT_8:
case INT_16:
case INT_32:
- return new ParquetFixedWidthDictionaryReaders.DictionaryIntReader(recordReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, (IntVector) v, schemaElement);
+ return new ParquetFixedWidthDictionaryReaders.DictionaryIntReader(recordReader, descriptor, columnChunkMetaData, fixedLength, (IntVector) v, schemaElement);
case UINT_8:
case UINT_16:
case UINT_32:
- return new ParquetFixedWidthDictionaryReaders.DictionaryUInt4Reader(recordReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, (UInt4Vector) v, schemaElement);
+ return new ParquetFixedWidthDictionaryReaders.DictionaryUInt4Reader(recordReader, descriptor, columnChunkMetaData, fixedLength, (UInt4Vector) v, schemaElement);
default:
throw new ExecutionSetupException("Unsupported dictionary converted type " + convertedType + " for primitive type INT32");
}
case INT64:
if (convertedType == null) {
- return new ParquetFixedWidthDictionaryReaders.DictionaryBigIntReader(recordReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, (BigIntVector) v, schemaElement);
+ return new ParquetFixedWidthDictionaryReaders.DictionaryBigIntReader(recordReader, descriptor, columnChunkMetaData, fixedLength, (BigIntVector) v, schemaElement);
}
switch (convertedType) {
case INT_64:
- return new ParquetFixedWidthDictionaryReaders.DictionaryBigIntReader(recordReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, (BigIntVector) v, schemaElement);
+ return new ParquetFixedWidthDictionaryReaders.DictionaryBigIntReader(recordReader, descriptor, columnChunkMetaData, fixedLength, (BigIntVector) v, schemaElement);
case UINT_64:
- return new ParquetFixedWidthDictionaryReaders.DictionaryUInt8Reader(recordReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, (UInt8Vector) v, schemaElement);
+ return new ParquetFixedWidthDictionaryReaders.DictionaryUInt8Reader(recordReader, descriptor, columnChunkMetaData, fixedLength, (UInt8Vector) v, schemaElement);
case DECIMAL:
- return new ParquetFixedWidthDictionaryReaders.DictionaryVarDecimalReader(recordReader, allocateSize,
+ return new ParquetFixedWidthDictionaryReaders.DictionaryVarDecimalReader(recordReader,
descriptor, columnChunkMetaData, fixedLength, (VarDecimalVector) v, schemaElement);
case TIMESTAMP_MILLIS:
- return new ParquetFixedWidthDictionaryReaders.DictionaryTimeStampReader(recordReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, (TimeStampVector) v, schemaElement);
+ return new ParquetFixedWidthDictionaryReaders.DictionaryTimeStampReader(recordReader, descriptor, columnChunkMetaData, fixedLength, (TimeStampVector) v, schemaElement);
default:
throw new ExecutionSetupException("Unsupported dictionary converted type " + convertedType + " for primitive type INT64");
}
case FLOAT:
- return new ParquetFixedWidthDictionaryReaders.DictionaryFloat4Reader(recordReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, (Float4Vector) v, schemaElement);
+ return new ParquetFixedWidthDictionaryReaders.DictionaryFloat4Reader(recordReader, descriptor, columnChunkMetaData, fixedLength, (Float4Vector) v, schemaElement);
case DOUBLE:
- return new ParquetFixedWidthDictionaryReaders.DictionaryFloat8Reader(recordReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, (Float8Vector) v, schemaElement);
+ return new ParquetFixedWidthDictionaryReaders.DictionaryFloat8Reader(recordReader, descriptor, columnChunkMetaData, fixedLength, (Float8Vector) v, schemaElement);
case FIXED_LEN_BYTE_ARRAY:
- return new ParquetFixedWidthDictionaryReaders.DictionaryFixedBinaryReader(recordReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, (VarBinaryVector) v, schemaElement);
+ return new ParquetFixedWidthDictionaryReaders.DictionaryFixedBinaryReader(recordReader, descriptor, columnChunkMetaData, fixedLength, (VarBinaryVector) v, schemaElement);
case INT96:
if (recordReader.getFragmentContext().getOptions().getOption(ExecConstants.PARQUET_READER_INT96_AS_TIMESTAMP).bool_val) {
- return new ParquetFixedWidthDictionaryReaders.DictionaryBinaryAsTimeStampReader(recordReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, (TimeStampVector) v, schemaElement);
+ return new ParquetFixedWidthDictionaryReaders.DictionaryBinaryAsTimeStampReader(recordReader, descriptor, columnChunkMetaData, fixedLength, (TimeStampVector) v, schemaElement);
} else {
- return new ParquetFixedWidthDictionaryReaders.DictionaryFixedBinaryReader(recordReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, (VarBinaryVector) v, schemaElement);
+ return new ParquetFixedWidthDictionaryReaders.DictionaryFixedBinaryReader(recordReader, descriptor, columnChunkMetaData, fixedLength, (VarBinaryVector) v, schemaElement);
}
default:
throw new ExecutionSetupException("Unsupported dictionary column type " + descriptor.getType().name() );
}
} else if (convertedType == ConvertedType.DECIMAL) {
- return new FixedByteAlignedReader.VarDecimalReader(recordReader, allocateSize,
+ return new FixedByteAlignedReader.VarDecimalReader(recordReader,
descriptor, columnChunkMetaData, fixedLength, (VarDecimalVector) v, schemaElement);
} else {
- return new FixedByteAlignedReader<>(recordReader, allocateSize, descriptor, columnChunkMetaData,
+ return new FixedByteAlignedReader<>(recordReader, descriptor, columnChunkMetaData,
fixedLength, v, schemaElement);
}
}
} else { // if the column is nullable
if (columnChunkMetaData.getType() == PrimitiveType.PrimitiveTypeName.BOOLEAN) {
- return new NullableBitReader(recordReader, allocateSize, descriptor, columnChunkMetaData,
+ return new NullableBitReader(recordReader, descriptor, columnChunkMetaData,
fixedLength, (NullableBitVector) v, schemaElement);
} else if (columnChunkMetaData.getType() == PrimitiveType.PrimitiveTypeName.INT32 && convertedType == ConvertedType.DATE) {
switch(recordReader.getDateCorruptionStatus()) {
case META_SHOWS_CORRUPTION:
- return new NullableFixedByteAlignedReaders.NullableCorruptDateReader(recordReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, (NullableDateVector)v, schemaElement);
+ return new NullableFixedByteAlignedReaders.NullableCorruptDateReader(recordReader, descriptor, columnChunkMetaData, fixedLength, (NullableDateVector)v, schemaElement);
case META_SHOWS_NO_CORRUPTION:
- return new NullableFixedByteAlignedReaders.NullableDateReader(recordReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, (NullableDateVector) v, schemaElement);
+ return new NullableFixedByteAlignedReaders.NullableDateReader(recordReader, descriptor, columnChunkMetaData, fixedLength, (NullableDateVector) v, schemaElement);
case META_UNCLEAR_TEST_VALUES:
- return new NullableFixedByteAlignedReaders.CorruptionDetectingNullableDateReader(recordReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, (NullableDateVector) v, schemaElement);
+ return new NullableFixedByteAlignedReaders.CorruptionDetectingNullableDateReader(recordReader, descriptor, columnChunkMetaData, fixedLength, (NullableDateVector) v, schemaElement);
default:
throw new ExecutionSetupException(
String.format("Issue setting up parquet reader for date type, " +
@@ -188,21 +187,21 @@ public class ColumnReaderFactory {
}
} else if (columnChunkMetaData.getType() == PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY) {
if (convertedType == ConvertedType.DECIMAL) {
- return new NullableFixedByteAlignedReaders.NullableVarDecimalReader(recordReader, allocateSize,
+ return new NullableFixedByteAlignedReaders.NullableVarDecimalReader(recordReader,
descriptor, columnChunkMetaData, fixedLength, (NullableVarDecimalVector) v, schemaElement);
} else if (convertedType == ConvertedType.INTERVAL) {
- return new NullableFixedByteAlignedReaders.NullableIntervalReader(recordReader, allocateSize, descriptor,
+ return new NullableFixedByteAlignedReaders.NullableIntervalReader(recordReader, descriptor,
columnChunkMetaData, fixedLength, (NullableIntervalVector) v, schemaElement);
}
} else {
- return getNullableColumnReader(recordReader, allocateSize, descriptor,
+ return getNullableColumnReader(recordReader, descriptor,
columnChunkMetaData, fixedLength, v, schemaElement);
}
}
throw new Exception("Unexpected parquet metadata configuration.");
}
- static VarLengthValuesColumn<?> getReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,
+ static VarLengthValuesColumn<?> getReader(ParquetRecordReader parentReader, ColumnDescriptor descriptor,
ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, ValueVector v,
SchemaElement schemaElement
) throws ExecutionSetupException {
@@ -210,39 +209,39 @@ public class ColumnReaderFactory {
switch (descriptor.getMaxDefinitionLevel()) {
case 0:
if (convertedType == null) {
- return new VarLengthColumnReaders.VarBinaryColumn(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, (VarBinaryVector) v, schemaElement);
+ return new VarLengthColumnReaders.VarBinaryColumn(parentReader, descriptor, columnChunkMetaData, fixedLength, (VarBinaryVector) v, schemaElement);
}
switch (convertedType) {
case UTF8:
case ENUM:
- return new VarLengthColumnReaders.VarCharColumn(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, (VarCharVector) v, schemaElement);
+ return new VarLengthColumnReaders.VarCharColumn(parentReader, descriptor, columnChunkMetaData, fixedLength, (VarCharVector) v, schemaElement);
case DECIMAL:
if (v instanceof VarDecimalVector) {
- return new VarLengthColumnReaders.VarDecimalColumn(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, (VarDecimalVector) v, schemaElement);
+ return new VarLengthColumnReaders.VarDecimalColumn(parentReader, descriptor, columnChunkMetaData, fixedLength, (VarDecimalVector) v, schemaElement);
}
default:
- return new VarLengthColumnReaders.VarBinaryColumn(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, (VarBinaryVector) v, schemaElement);
+ return new VarLengthColumnReaders.VarBinaryColumn(parentReader, descriptor, columnChunkMetaData, fixedLength, (VarBinaryVector) v, schemaElement);
}
default:
if (convertedType == null) {
- return new VarLengthColumnReaders.NullableVarBinaryColumn(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, (NullableVarBinaryVector) v, schemaElement);
+ return new VarLengthColumnReaders.NullableVarBinaryColumn(parentReader, descriptor, columnChunkMetaData, fixedLength, (NullableVarBinaryVector) v, schemaElement);
}
switch (convertedType) {
case UTF8:
case ENUM:
- return new VarLengthColumnReaders.NullableVarCharColumn(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, (NullableVarCharVector) v, schemaElement);
+ return new VarLengthColumnReaders.NullableVarCharColumn(parentReader, descriptor, columnChunkMetaData, fixedLength, (NullableVarCharVector) v, schemaElement);
case DECIMAL:
if (v instanceof NullableVarDecimalVector) {
- return new VarLengthColumnReaders.NullableVarDecimalColumn(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, (NullableVarDecimalVector) v, schemaElement);
+ return new VarLengthColumnReaders.NullableVarDecimalColumn(parentReader, descriptor, columnChunkMetaData, fixedLength, (NullableVarDecimalVector) v, schemaElement);
}
default:
- return new VarLengthColumnReaders.NullableVarBinaryColumn(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, (NullableVarBinaryVector) v, schemaElement);
+ return new VarLengthColumnReaders.NullableVarBinaryColumn(parentReader, descriptor, columnChunkMetaData, fixedLength, (NullableVarBinaryVector) v, schemaElement);
}
}
}
- public static NullableColumnReader<?> getNullableColumnReader(ParquetRecordReader parentReader, int allocateSize,
+ public static NullableColumnReader<?> getNullableColumnReader(ParquetRecordReader parentReader,
ColumnDescriptor columnDescriptor,
ColumnChunkMetaData columnChunkMetaData,
boolean fixedLength,
@@ -254,58 +253,58 @@ public class ColumnReaderFactory {
if (columnDescriptor.getType() == PrimitiveType.PrimitiveTypeName.INT96) {
// TODO: check convertedType once parquet support TIMESTAMP_NANOS type annotation.
if (parentReader.getFragmentContext().getOptions().getOption(ExecConstants.PARQUET_READER_INT96_AS_TIMESTAMP).bool_val) {
- return new NullableFixedByteAlignedReaders.NullableFixedBinaryAsTimeStampReader(parentReader, allocateSize, columnDescriptor, columnChunkMetaData, true, (NullableTimeStampVector) valueVec, schemaElement);
+ return new NullableFixedByteAlignedReaders.NullableFixedBinaryAsTimeStampReader(parentReader, columnDescriptor, columnChunkMetaData, true, (NullableTimeStampVector) valueVec, schemaElement);
} else {
- return new NullableFixedByteAlignedReaders.NullableFixedBinaryReader(parentReader, allocateSize, columnDescriptor, columnChunkMetaData, true, (NullableVarBinaryVector) valueVec, schemaElement);
+ return new NullableFixedByteAlignedReaders.NullableFixedBinaryReader(parentReader, columnDescriptor, columnChunkMetaData, true, (NullableVarBinaryVector) valueVec, schemaElement);
}
} else if (convertedType == ConvertedType.DECIMAL) {
// NullableVarDecimalVector allows storing of values with different width,
// so every time when the value is added, offset vector should be updated.
// Therefore NullableVarDecimalReader is used here instead of NullableFixedByteAlignedReader.
- return new NullableFixedByteAlignedReaders.NullableVarDecimalReader(parentReader, allocateSize,
+ return new NullableFixedByteAlignedReaders.NullableVarDecimalReader(parentReader,
columnDescriptor, columnChunkMetaData, fixedLength, (NullableVarDecimalVector) valueVec, schemaElement);
} else {
- return new NullableFixedByteAlignedReaders.NullableFixedByteAlignedReader<>(parentReader, allocateSize, columnDescriptor, columnChunkMetaData, fixedLength, valueVec, schemaElement);
+ return new NullableFixedByteAlignedReaders.NullableFixedByteAlignedReader<>(parentReader, columnDescriptor, columnChunkMetaData, fixedLength, valueVec, schemaElement);
}
} else {
switch (columnDescriptor.getType()) {
case INT32:
if (convertedType == null) {
- return new NullableFixedByteAlignedReaders.NullableDictionaryIntReader(parentReader, allocateSize, columnDescriptor, columnChunkMetaData, fixedLength, (NullableIntVector) valueVec, schemaElement);
+ return new NullableFixedByteAlignedReaders.NullableDictionaryIntReader(parentReader, columnDescriptor, columnChunkMetaData, fixedLength, (NullableIntVector) valueVec, schemaElement);
}
switch (convertedType) {
case DECIMAL:
- return new NullableFixedByteAlignedReaders.NullableDictionaryVarDecimalReader(parentReader, allocateSize,
+ return new NullableFixedByteAlignedReaders.NullableDictionaryVarDecimalReader(parentReader,
columnDescriptor, columnChunkMetaData, fixedLength, (NullableVarDecimalVector) valueVec, schemaElement);
case TIME_MILLIS:
- return new NullableFixedByteAlignedReaders.NullableDictionaryTimeReader(parentReader, allocateSize, columnDescriptor, columnChunkMetaData, fixedLength, (NullableTimeVector)valueVec, schemaElement);
+ return new NullableFixedByteAlignedReaders.NullableDictionaryTimeReader(parentReader, columnDescriptor, columnChunkMetaData, fixedLength, (NullableTimeVector)valueVec, schemaElement);
default:
throw new ExecutionSetupException("Unsupported nullable converted type " + convertedType + " for primitive type INT32");
}
case INT64:
if (convertedType == null) {
- return new NullableFixedByteAlignedReaders.NullableDictionaryBigIntReader(parentReader, allocateSize, columnDescriptor, columnChunkMetaData, fixedLength, (NullableBigIntVector)valueVec, schemaElement);
+ return new NullableFixedByteAlignedReaders.NullableDictionaryBigIntReader(parentReader, columnDescriptor, columnChunkMetaData, fixedLength, (NullableBigIntVector)valueVec, schemaElement);
}
switch (convertedType) {
case DECIMAL:
- return new NullableFixedByteAlignedReaders.NullableDictionaryVarDecimalReader(parentReader, allocateSize,
+ return new NullableFixedByteAlignedReaders.NullableDictionaryVarDecimalReader(parentReader,
columnDescriptor, columnChunkMetaData, fixedLength, (NullableVarDecimalVector) valueVec, schemaElement);
case TIMESTAMP_MILLIS:
- return new NullableFixedByteAlignedReaders.NullableDictionaryTimeStampReader(parentReader, allocateSize, columnDescriptor, columnChunkMetaData, fixedLength, (NullableTimeStampVector)valueVec, schemaElement);
+ return new NullableFixedByteAlignedReaders.NullableDictionaryTimeStampReader(parentReader, columnDescriptor, columnChunkMetaData, fixedLength, (NullableTimeStampVector)valueVec, schemaElement);
default:
throw new ExecutionSetupException("Unsupported nullable converted type " + convertedType + " for primitive type INT64");
}
case INT96:
// TODO: check convertedType once parquet support TIMESTAMP_NANOS type annotation.
if (parentReader.getFragmentContext().getOptions().getOption(ExecConstants.PARQUET_READER_INT96_AS_TIMESTAMP).bool_val) {
- return new NullableFixedByteAlignedReaders.NullableFixedBinaryAsTimeStampReader(parentReader, allocateSize, columnDescriptor, columnChunkMetaData, true, (NullableTimeStampVector) valueVec, schemaElement);
+ return new NullableFixedByteAlignedReaders.NullableFixedBinaryAsTimeStampReader(parentReader, columnDescriptor, columnChunkMetaData, true, (NullableTimeStampVector) valueVec, schemaElement);
} else {
- return new NullableFixedByteAlignedReaders.NullableFixedBinaryReader(parentReader, allocateSize, columnDescriptor, columnChunkMetaData, true, (NullableVarBinaryVector) valueVec, schemaElement);
+ return new NullableFixedByteAlignedReaders.NullableFixedBinaryReader(parentReader, columnDescriptor, columnChunkMetaData, true, (NullableVarBinaryVector) valueVec, schemaElement);
}
case FLOAT:
- return new NullableFixedByteAlignedReaders.NullableDictionaryFloat4Reader(parentReader, allocateSize, columnDescriptor, columnChunkMetaData, fixedLength, (NullableFloat4Vector)valueVec, schemaElement);
+ return new NullableFixedByteAlignedReaders.NullableDictionaryFloat4Reader(parentReader, columnDescriptor, columnChunkMetaData, fixedLength, (NullableFloat4Vector)valueVec, schemaElement);
case DOUBLE:
- return new NullableFixedByteAlignedReaders.NullableDictionaryFloat8Reader(parentReader, allocateSize, columnDescriptor, columnChunkMetaData, fixedLength, (NullableFloat8Vector)valueVec, schemaElement);
+ return new NullableFixedByteAlignedReaders.NullableDictionaryFloat8Reader(parentReader, columnDescriptor, columnChunkMetaData, fixedLength, (NullableFloat8Vector)valueVec, schemaElement);
default:
throw new ExecutionSetupException("Unsupported nullable column type " + columnDescriptor.getType().name() );
}
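
Every factory method above loses its allocateSize parameter: vector allocation is no longer a fixed per-reader hint but is driven by the new RecordBatchSizerManager. A hedged sketch of the replacement wiring, reusing getCurrentRecordsPerBatch() from this commit (the other names are illustrative assumptions, not the commit's actual wiring):

    // Sketch: size vectors from the sizer's current target instead of a
    // constructor-supplied allocateSize hint.
    int recordsPerBatch = readState.batchSizerMgr().getCurrentRecordsPerBatch();
    AllocationHelper.allocate(valueVector, recordsPerBatch, estimatedBytesPerValue);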
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/FixedByteAlignedReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/FixedByteAlignedReader.java
index 385b117..338b08d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/FixedByteAlignedReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/FixedByteAlignedReader.java
@@ -38,9 +38,9 @@ class FixedByteAlignedReader<V extends ValueVector> extends ColumnReader<V> {
protected DrillBuf bytebuf;
- FixedByteAlignedReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData,
+ FixedByteAlignedReader(ParquetRecordReader parentReader, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData,
boolean fixedLength, V v, SchemaElement schemaElement) throws ExecutionSetupException {
- super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
+ super(parentReader, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
}
// this method is called by its superclass during a read loop
@@ -67,9 +67,9 @@ class FixedByteAlignedReader<V extends ValueVector> extends ColumnReader<V> {
// TODO - replace this with fixed binary type in drill
VariableWidthVector castedVector;
- FixedBinaryReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData,
+ FixedBinaryReader(ParquetRecordReader parentReader, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData,
VariableWidthVector v, SchemaElement schemaElement) throws ExecutionSetupException {
- super(parentReader, allocateSize, descriptor, columnChunkMetaData, true, v, schemaElement);
+ super(parentReader, descriptor, columnChunkMetaData, true, v, schemaElement);
castedVector = v;
}
@@ -91,9 +91,9 @@ class FixedByteAlignedReader<V extends ValueVector> extends ColumnReader<V> {
protected int dataTypeLengthInBytes;
- ConvertedReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData,
+ ConvertedReader(ParquetRecordReader parentReader, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData,
boolean fixedLength, V v, SchemaElement schemaElement) throws ExecutionSetupException {
- super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
+ super(parentReader, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
}
@Override
@@ -115,9 +115,9 @@ class FixedByteAlignedReader<V extends ValueVector> extends ColumnReader<V> {
public static class DateReader extends ConvertedReader<DateVector> {
private final DateVector.Mutator mutator;
- DateReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData,
+ DateReader(ParquetRecordReader parentReader, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData,
boolean fixedLength, DateVector v, SchemaElement schemaElement) throws ExecutionSetupException {
- super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
+ super(parentReader, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
mutator = v.getMutator();
}
@@ -141,9 +141,9 @@ class FixedByteAlignedReader<V extends ValueVector> extends ColumnReader<V> {
private final DateVector.Mutator mutator;
- CorruptDateReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData,
+ CorruptDateReader(ParquetRecordReader parentReader, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData,
boolean fixedLength, DateVector v, SchemaElement schemaElement) throws ExecutionSetupException {
- super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
+ super(parentReader, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
mutator = v.getMutator();
}
@@ -171,9 +171,9 @@ class FixedByteAlignedReader<V extends ValueVector> extends ColumnReader<V> {
private final DateVector.Mutator mutator;
- CorruptionDetectingDateReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData,
+ CorruptionDetectingDateReader(ParquetRecordReader parentReader, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData,
boolean fixedLength, DateVector v, SchemaElement schemaElement) throws ExecutionSetupException {
- super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
+ super(parentReader, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
mutator = v.getMutator();
}
@@ -197,9 +197,9 @@ class FixedByteAlignedReader<V extends ValueVector> extends ColumnReader<V> {
public static class VarDecimalReader extends ConvertedReader<VarDecimalVector> {
- VarDecimalReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData,
+ VarDecimalReader(ParquetRecordReader parentReader, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData,
boolean fixedLength, VarDecimalVector v, SchemaElement schemaElement) throws ExecutionSetupException {
- super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
+ super(parentReader, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
}
@Override
@@ -220,9 +220,9 @@ class FixedByteAlignedReader<V extends ValueVector> extends ColumnReader<V> {
}
public static class IntervalReader extends ConvertedReader<IntervalVector> {
- IntervalReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData,
+ IntervalReader(ParquetRecordReader parentReader, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData,
boolean fixedLength, IntervalVector v, SchemaElement schemaElement) throws ExecutionSetupException {
- super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
+ super(parentReader, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
}
@Override
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/FixedWidthRepeatedReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/FixedWidthRepeatedReader.java
index 162e502..0b2d678 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/FixedWidthRepeatedReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/FixedWidthRepeatedReader.java
@@ -46,8 +46,10 @@ public class FixedWidthRepeatedReader extends VarLengthColumn<RepeatedValueVecto
boolean notFishedReadingList;
byte[] leftOverBytes;
- FixedWidthRepeatedReader(ParquetRecordReader parentReader, ColumnReader<?> dataReader, int dataTypeLengthInBytes, int allocateSize, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, RepeatedValueVector valueVector, SchemaElement schemaElement) throws ExecutionSetupException {
- super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, valueVector, schemaElement);
+ FixedWidthRepeatedReader(ParquetRecordReader parentReader, ColumnReader<?> dataReader, int dataTypeLengthInBytes,
+ ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData, boolean fixedLength,
+ RepeatedValueVector valueVector, SchemaElement schemaElement) throws ExecutionSetupException {
+ super(parentReader, descriptor, columnChunkMetaData, fixedLength, valueVector, schemaElement);
this.dataTypeLengthInBytes = dataTypeLengthInBytes;
this.dataReader = dataReader;
this.dataReader.pageReader.clear();
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableBitReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableBitReader.java
index b97bc82..e3a8f31 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableBitReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableBitReader.java
@@ -33,9 +33,9 @@ import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
*/
final class NullableBitReader extends ColumnReader<NullableBitVector> {
- NullableBitReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData,
+ NullableBitReader(ParquetRecordReader parentReader, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData,
boolean fixedLength, NullableBitVector v, SchemaElement schemaElement) throws ExecutionSetupException {
- super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
+ super(parentReader, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
}
@Override
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableColumnReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableColumnReader.java
index 949474b..ed61bef 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableColumnReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableColumnReader.java
@@ -38,9 +38,9 @@ abstract class NullableColumnReader<V extends ValueVector> extends ColumnReader<
/** Definition level wrapper to handle {@link ValueVector} limitations */
private final DefLevelReaderWrapper definitionLevelWrapper = new DefLevelReaderWrapper();
- NullableColumnReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData,
+ NullableColumnReader(ParquetRecordReader parentReader, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData,
boolean fixedLength, V v, SchemaElement schemaElement) throws ExecutionSetupException {
- super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
+ super(parentReader, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
castedBaseVector = (BaseDataValueVector) v;
castedVectorMutator = (NullableVectorDefinitionSetter) v.getMutator();
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableFixedByteAlignedReaders.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableFixedByteAlignedReaders.java
index 9e66b1f..6a09bd6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableFixedByteAlignedReaders.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableFixedByteAlignedReaders.java
@@ -49,9 +49,9 @@ public class NullableFixedByteAlignedReaders {
static class NullableFixedByteAlignedReader<V extends ValueVector> extends NullableColumnReader<V> {
protected DrillBuf bytebuf;
- NullableFixedByteAlignedReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,
+ NullableFixedByteAlignedReader(ParquetRecordReader parentReader, ColumnDescriptor descriptor,
ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, V v, SchemaElement schemaElement) throws ExecutionSetupException {
- super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
+ super(parentReader, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
}
// this method is called by its superclass during a read loop
@@ -70,9 +70,9 @@ public class NullableFixedByteAlignedReaders {
* each value.
*/
static class NullableFixedBinaryReader extends NullableFixedByteAlignedReader<NullableVarBinaryVector> {
- NullableFixedBinaryReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,
+ NullableFixedBinaryReader(ParquetRecordReader parentReader, ColumnDescriptor descriptor,
ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, NullableVarBinaryVector v, SchemaElement schemaElement) throws ExecutionSetupException {
- super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
+ super(parentReader, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
}
@Override
@@ -109,9 +109,9 @@ public class NullableFixedByteAlignedReaders {
* impala timestamp values with nanoseconds precision (12 bytes). It reads such values as a drill timestamp (8 bytes).
*/
static class NullableFixedBinaryAsTimeStampReader extends NullableFixedByteAlignedReader<NullableTimeStampVector> {
- NullableFixedBinaryAsTimeStampReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,
+ NullableFixedBinaryAsTimeStampReader(ParquetRecordReader parentReader, ColumnDescriptor descriptor,
ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, NullableTimeStampVector v, SchemaElement schemaElement) throws ExecutionSetupException {
- super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
+ super(parentReader, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
}
@Override
@@ -135,10 +135,10 @@ public class NullableFixedByteAlignedReaders {
static class NullableDictionaryIntReader extends NullableColumnReader<NullableIntVector> {
- NullableDictionaryIntReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,
+ NullableDictionaryIntReader(ParquetRecordReader parentReader, ColumnDescriptor descriptor,
ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, NullableIntVector v,
SchemaElement schemaElement) throws ExecutionSetupException {
- super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
+ super(parentReader, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
}
// this method is called by its superclass during a read loop
@@ -161,10 +161,10 @@ public class NullableFixedByteAlignedReaders {
static class NullableDictionaryTimeReader extends NullableColumnReader<NullableTimeVector> {
- NullableDictionaryTimeReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,
+ NullableDictionaryTimeReader(ParquetRecordReader parentReader, ColumnDescriptor descriptor,
ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, NullableTimeVector v,
SchemaElement schemaElement) throws ExecutionSetupException {
- super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
+ super(parentReader, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
}
// this method is called by its superclass during a read loop
@@ -184,10 +184,10 @@ public class NullableFixedByteAlignedReaders {
static class NullableDictionaryBigIntReader extends NullableColumnReader<NullableBigIntVector> {
- NullableDictionaryBigIntReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,
+ NullableDictionaryBigIntReader(ParquetRecordReader parentReader, ColumnDescriptor descriptor,
ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, NullableBigIntVector v,
SchemaElement schemaElement) throws ExecutionSetupException {
- super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
+ super(parentReader, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
}
// this method is called by its superclass during a read loop
@@ -207,10 +207,10 @@ public class NullableFixedByteAlignedReaders {
static class NullableDictionaryTimeStampReader extends NullableColumnReader<NullableTimeStampVector> {
- NullableDictionaryTimeStampReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,
+ NullableDictionaryTimeStampReader(ParquetRecordReader parentReader, ColumnDescriptor descriptor,
ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, NullableTimeStampVector v,
SchemaElement schemaElement) throws ExecutionSetupException {
- super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
+ super(parentReader, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
}
// this method is called by its superclass during a read loop
@@ -230,10 +230,10 @@ public class NullableFixedByteAlignedReaders {
static class NullableDictionaryVarDecimalReader extends NullableColumnReader<NullableVarDecimalVector> {
- NullableDictionaryVarDecimalReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,
+ NullableDictionaryVarDecimalReader(ParquetRecordReader parentReader, ColumnDescriptor descriptor,
ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, NullableVarDecimalVector v,
SchemaElement schemaElement) throws ExecutionSetupException {
- super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
+ super(parentReader, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
}
// this method is called by its superclass during a read loop
@@ -276,10 +276,10 @@ public class NullableFixedByteAlignedReaders {
static class NullableDictionaryFloat4Reader extends NullableColumnReader<NullableFloat4Vector> {
- NullableDictionaryFloat4Reader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,
+ NullableDictionaryFloat4Reader(ParquetRecordReader parentReader, ColumnDescriptor descriptor,
ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, NullableFloat4Vector v,
SchemaElement schemaElement) throws ExecutionSetupException {
- super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
+ super(parentReader, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
}
// this method is called by its superclass during a read loop
@@ -299,10 +299,10 @@ public class NullableFixedByteAlignedReaders {
static class NullableDictionaryFloat8Reader extends NullableColumnReader<NullableFloat8Vector> {
- NullableDictionaryFloat8Reader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,
+ NullableDictionaryFloat8Reader(ParquetRecordReader parentReader, ColumnDescriptor descriptor,
ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, NullableFloat8Vector v,
SchemaElement schemaElement) throws ExecutionSetupException {
- super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
+ super(parentReader, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
}
// this method is called by its superclass during a read loop
@@ -324,9 +324,9 @@ public class NullableFixedByteAlignedReaders {
protected int dataTypeLengthInBytes;
- NullableConvertedReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,
+ NullableConvertedReader(ParquetRecordReader parentReader, ColumnDescriptor descriptor,
ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, V v, SchemaElement schemaElement) throws ExecutionSetupException {
- super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
+ super(parentReader, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
}
@Override
@@ -344,9 +344,9 @@ public class NullableFixedByteAlignedReaders {
}
public static class NullableDateReader extends NullableConvertedReader<NullableDateVector> {
- NullableDateReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData,
+ NullableDateReader(ParquetRecordReader parentReader, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData,
boolean fixedLength, NullableDateVector v, SchemaElement schemaElement) throws ExecutionSetupException {
- super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
+ super(parentReader, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
}
@Override
@@ -368,9 +368,9 @@ public class NullableFixedByteAlignedReaders {
*/
public static class NullableCorruptDateReader extends NullableConvertedReader<NullableDateVector> {
- NullableCorruptDateReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData,
+ NullableCorruptDateReader(ParquetRecordReader parentReader, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData,
boolean fixedLength, NullableDateVector v, SchemaElement schemaElement) throws ExecutionSetupException {
- super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
+ super(parentReader, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
}
@Override
@@ -395,11 +395,11 @@ public class NullableFixedByteAlignedReaders {
*/
public static class CorruptionDetectingNullableDateReader extends NullableConvertedReader<NullableDateVector> {
- CorruptionDetectingNullableDateReader(ParquetRecordReader parentReader, int allocateSize,
+ CorruptionDetectingNullableDateReader(ParquetRecordReader parentReader,
ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData,
boolean fixedLength, NullableDateVector v, SchemaElement schemaElement)
throws ExecutionSetupException {
- super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
+ super(parentReader, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
}
@Override
@@ -420,9 +420,9 @@ public class NullableFixedByteAlignedReaders {
}
public static class NullableVarDecimalReader extends NullableConvertedReader<NullableVarDecimalVector> {
- NullableVarDecimalReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData,
+ NullableVarDecimalReader(ParquetRecordReader parentReader, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData,
boolean fixedLength, NullableVarDecimalVector v, SchemaElement schemaElement) throws ExecutionSetupException {
- super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
+ super(parentReader, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
}
// TODO: allow reading page instead of reading every record separately
@@ -445,9 +445,9 @@ public class NullableFixedByteAlignedReaders {
}
public static class NullableIntervalReader extends NullableConvertedReader<NullableIntervalVector> {
- NullableIntervalReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData,
+ NullableIntervalReader(ParquetRecordReader parentReader, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData,
boolean fixedLength, NullableIntervalVector v, SchemaElement schemaElement) throws ExecutionSetupException {
- super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
+ super(parentReader, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
}
@Override
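The hunks above repeat one mechanical change across every reader in this file: the allocateSize constructor argument disappears because vector allocation is now driven by the new batch sizer rather than by a per-reader constant. A minimal before/after sketch of the call-site effect (reader and argument names are taken from the hunks above; the surrounding factory code is elided):

    // Before: each reader was told up front how many values to allocate.
    // new NullableDateReader(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);

    // After: readers no longer size their own vectors; the RecordBatchSizerManager does.
    new NullableDateReader(parentReader, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);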
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableVarLengthValuesColumn.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableVarLengthValuesColumn.java
index f236315..7608d17 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableVarLengthValuesColumn.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableVarLengthValuesColumn.java
@@ -33,10 +33,10 @@ public abstract class NullableVarLengthValuesColumn<V extends ValueVector> exten
int nullsRead;
boolean currentValNull = false;
- NullableVarLengthValuesColumn(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,
+ NullableVarLengthValuesColumn(ParquetRecordReader parentReader, ColumnDescriptor descriptor,
ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, V v,
SchemaElement schemaElement) throws ExecutionSetupException {
- super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
+ super(parentReader, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
}
@Override
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetColumnMetadata.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetColumnMetadata.java
index d49a416..147938d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetColumnMetadata.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetColumnMetadata.java
@@ -106,7 +106,7 @@ public class ParquetColumnMetadata {
*
* @return the length if fixed width, else <tt>UNDEFINED_LENGTH</tt> (-1)
*/
- private int getDataTypeLength() {
+ public int getDataTypeLength() {
if (! isFixedLength()) {
return UNDEFINED_LENGTH;
} else if (isRepeated()) {
@@ -126,29 +126,33 @@ public class ParquetColumnMetadata {
return column.getMaxRepetitionLevel() > 0;
}
+ public MaterializedField getField() {
+ return field;
+ }
+
ValueVector buildVector(OutputMutator output) throws SchemaChangeException {
Class<? extends ValueVector> vectorClass = TypeHelper.getValueVectorClass(type.getMinorType(), type.getMode());
vector = output.addField(field, vectorClass);
return vector;
}
- ColumnReader<?> makeFixedWidthReader(ParquetRecordReader reader, int recordsPerBatch) throws Exception {
+ ColumnReader<?> makeFixedWidthReader(ParquetRecordReader reader) throws Exception {
return ColumnReaderFactory.createFixedColumnReader(reader, true,
- column, columnChunkMetaData, recordsPerBatch, vector, se);
+ column, columnChunkMetaData, vector, se);
}
@SuppressWarnings("resource")
- FixedWidthRepeatedReader makeRepeatedFixedWidthReader(ParquetRecordReader reader, int recordsPerBatch) throws Exception {
+ FixedWidthRepeatedReader makeRepeatedFixedWidthReader(ParquetRecordReader reader) throws Exception {
final RepeatedValueVector repeatedVector = RepeatedValueVector.class.cast(vector);
ColumnReader<?> dataReader = ColumnReaderFactory.createFixedColumnReader(reader, true,
- column, columnChunkMetaData, recordsPerBatch,
+ column, columnChunkMetaData,
repeatedVector.getDataVector(), se);
return new FixedWidthRepeatedReader(reader, dataReader,
- getTypeLengthInBits(column.getType()), UNDEFINED_LENGTH, column, columnChunkMetaData, false, repeatedVector, se);
+ getTypeLengthInBits(column.getType()), column, columnChunkMetaData, false, repeatedVector, se);
}
VarLengthValuesColumn<?> makeVariableWidthReader(ParquetRecordReader reader) throws ExecutionSetupException {
- return ColumnReaderFactory.getReader(reader, UNDEFINED_LENGTH, column, columnChunkMetaData, false, vector, se);
+ return ColumnReaderFactory.getReader(reader, column, columnChunkMetaData, false, vector, se);
}
}
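Note that getDataTypeLength() is made public above so that the batch sizing code can query column widths. A rough sketch of how a caller might branch on its result (the branch bodies are illustrative, not actual Drill code):

    int len = columnMetadata.getDataTypeLength();
    if (len == UNDEFINED_LENGTH) { // -1, per the Javadoc above
      // variable-width column: sizing is deferred to the batch sizer
    } else {
      // fixed-width column: 'len' is the fixed value length
    }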
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetFixedWidthDictionaryReaders.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetFixedWidthDictionaryReaders.java
index 38ab700..171983e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetFixedWidthDictionaryReaders.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetFixedWidthDictionaryReaders.java
@@ -42,10 +42,10 @@ public class ParquetFixedWidthDictionaryReaders {
private static final double BITS_COUNT_IN_BYTE_DOUBLE_VALUE = 8.0;
static class DictionaryIntReader extends FixedByteAlignedReader<IntVector> {
- DictionaryIntReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,
+ DictionaryIntReader(ParquetRecordReader parentReader, ColumnDescriptor descriptor,
ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, IntVector v,
SchemaElement schemaElement) throws ExecutionSetupException {
- super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
+ super(parentReader, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
}
// this method is called by its superclass during a read loop
@@ -67,10 +67,10 @@ public class ParquetFixedWidthDictionaryReaders {
* This class is used for reading unsigned integer fields.
*/
static class DictionaryUInt4Reader extends FixedByteAlignedReader<UInt4Vector> {
- DictionaryUInt4Reader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,
+ DictionaryUInt4Reader(ParquetRecordReader parentReader, ColumnDescriptor descriptor,
ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, UInt4Vector v,
SchemaElement schemaElement) throws ExecutionSetupException {
- super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
+ super(parentReader, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
}
// this method is called by its superclass during a read loop
@@ -99,10 +99,10 @@ public class ParquetFixedWidthDictionaryReaders {
}
static class DictionaryFixedBinaryReader extends FixedByteAlignedReader<VarBinaryVector> {
- DictionaryFixedBinaryReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,
+ DictionaryFixedBinaryReader(ParquetRecordReader parentReader, ColumnDescriptor descriptor,
ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, VarBinaryVector v,
SchemaElement schemaElement) throws ExecutionSetupException {
- super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
+ super(parentReader, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
}
// this method is called by its superclass during a read loop
@@ -141,10 +141,10 @@ public class ParquetFixedWidthDictionaryReaders {
}
static class DictionaryTimeReader extends FixedByteAlignedReader<TimeVector> {
- DictionaryTimeReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,
+ DictionaryTimeReader(ParquetRecordReader parentReader, ColumnDescriptor descriptor,
ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, TimeVector v,
SchemaElement schemaElement) throws ExecutionSetupException {
- super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
+ super(parentReader, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
}
// this method is called by its superclass during a read loop
@@ -163,10 +163,10 @@ public class ParquetFixedWidthDictionaryReaders {
}
static class DictionaryBigIntReader extends FixedByteAlignedReader<BigIntVector> {
- DictionaryBigIntReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,
+ DictionaryBigIntReader(ParquetRecordReader parentReader, ColumnDescriptor descriptor,
ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, BigIntVector v,
SchemaElement schemaElement) throws ExecutionSetupException {
- super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
+ super(parentReader, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
}
// this method is called by its superclass during a read loop
@@ -198,10 +198,10 @@ public class ParquetFixedWidthDictionaryReaders {
* This class is used for reading unsigned BigInt fields.
*/
static class DictionaryUInt8Reader extends FixedByteAlignedReader<UInt8Vector> {
- DictionaryUInt8Reader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,
+ DictionaryUInt8Reader(ParquetRecordReader parentReader, ColumnDescriptor descriptor,
ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, UInt8Vector v,
SchemaElement schemaElement) throws ExecutionSetupException {
- super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
+ super(parentReader, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
}
// this method is called by its superclass during a read loop
@@ -231,10 +231,10 @@ public class ParquetFixedWidthDictionaryReaders {
static class DictionaryVarDecimalReader extends FixedByteAlignedReader<VarDecimalVector> {
- DictionaryVarDecimalReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,
+ DictionaryVarDecimalReader(ParquetRecordReader parentReader, ColumnDescriptor descriptor,
ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, VarDecimalVector v,
SchemaElement schemaElement) throws ExecutionSetupException {
- super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
+ super(parentReader, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
}
// this method is called by its superclass during a read loop
@@ -288,10 +288,10 @@ public class ParquetFixedWidthDictionaryReaders {
}
static class DictionaryTimeStampReader extends FixedByteAlignedReader<TimeStampVector> {
- DictionaryTimeStampReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,
+ DictionaryTimeStampReader(ParquetRecordReader parentReader, ColumnDescriptor descriptor,
ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, TimeStampVector v,
SchemaElement schemaElement) throws ExecutionSetupException {
- super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
+ super(parentReader, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
}
// this method is called by its superclass during a read loop
@@ -312,10 +312,10 @@ public class ParquetFixedWidthDictionaryReaders {
}
static class DictionaryBinaryAsTimeStampReader extends FixedByteAlignedReader<TimeStampVector> {
- DictionaryBinaryAsTimeStampReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,
+ DictionaryBinaryAsTimeStampReader(ParquetRecordReader parentReader, ColumnDescriptor descriptor,
ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, TimeStampVector v,
SchemaElement schemaElement) throws ExecutionSetupException {
- super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
+ super(parentReader, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
}
// this method is called by its superclass during a read loop
@@ -337,10 +337,10 @@ public class ParquetFixedWidthDictionaryReaders {
}
static class DictionaryFloat4Reader extends FixedByteAlignedReader<Float4Vector> {
- DictionaryFloat4Reader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,
+ DictionaryFloat4Reader(ParquetRecordReader parentReader, ColumnDescriptor descriptor,
ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, Float4Vector v,
SchemaElement schemaElement) throws ExecutionSetupException {
- super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
+ super(parentReader, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
}
// this method is called by its superclass during a read loop
@@ -356,10 +356,10 @@ public class ParquetFixedWidthDictionaryReaders {
}
static class DictionaryFloat8Reader extends FixedByteAlignedReader<Float8Vector> {
- DictionaryFloat8Reader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,
+ DictionaryFloat8Reader(ParquetRecordReader parentReader, ColumnDescriptor descriptor,
ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, Float8Vector v,
SchemaElement schemaElement) throws ExecutionSetupException {
- super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
+ super(parentReader, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
}
// this method is called by its superclass during a read loop
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
index 712bbce..e1ca73f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
@@ -17,10 +17,11 @@
*/
package org.apache.drill.exec.store.parquet.columnreaders;
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.ImmutableList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
-
import org.apache.drill.common.exceptions.DrillRuntimeException;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.expression.SchemaPath;
@@ -33,25 +34,17 @@ import org.apache.drill.exec.physical.impl.OutputMutator;
import org.apache.drill.exec.store.AbstractRecordReader;
import org.apache.drill.exec.store.parquet.ParquetReaderStats;
import org.apache.drill.exec.store.parquet.ParquetReaderUtility;
-import org.apache.drill.exec.vector.AllocationHelper;
+import org.apache.drill.exec.store.parquet.columnreaders.batchsizing.RecordBatchSizerManager;
import org.apache.drill.exec.vector.ValueVector;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.CodecFactory;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
-import com.google.common.base.Stopwatch;
-import com.google.common.collect.ImmutableList;
-
public class ParquetRecordReader extends AbstractRecordReader {
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ParquetRecordReader.class);
- // this value has been inflated to read in multiple value vectors at once, and then break them up into smaller vectors
- private static final int NUMBER_OF_VECTORS = 1;
- private static final long DEFAULT_BATCH_LENGTH = 256 * 1024 * NUMBER_OF_VECTORS; // 256kb
- private static final long DEFAULT_BATCH_LENGTH_IN_BITS = DEFAULT_BATCH_LENGTH * 8; // 256kb
- static final char DEFAULT_RECORDS_TO_READ_IF_VARIABLE_WIDTH = 32*1024; // 32K
- static final int DEFAULT_RECORDS_TO_READ_IF_FIXED_WIDTH = 64*1024 - 1; // 64K - 1, max SV2 can address
+ /** Used when the caller wants to read all rows contained in the Parquet file */
static final int NUM_RECORDS_TO_READ_NOT_SPECIFIED = -1;
// When no column is required by the downstream operator, ask SCAN to return a DEFAULT column. If such column does not exist,
@@ -69,8 +62,7 @@ public class ParquetRecordReader extends AbstractRecordReader {
private OperatorContext operatorContext;
private FileSystem fileSystem;
- private final long batchSize;
- private long numRecordsToRead; // number of records to read
+ private final long numRecordsToRead; // number of records to read
Path hadoopPath;
private ParquetMetadata footer;
@@ -80,8 +72,12 @@ public class ParquetRecordReader extends AbstractRecordReader {
private final FragmentContext fragmentContext;
ParquetReaderUtility.DateCorruptionStatus dateCorruptionStatus;
+ /** Parquet Schema */
ParquetSchema schema;
+ /** Container object for holding Parquet columnar readers state */
ReadState readState;
+ /** Responsible for managing record batch size constraints */
+ RecordBatchSizerManager batchSizerMgr;
public boolean useAsyncColReader;
public boolean useAsyncPageReader;
@@ -134,7 +130,7 @@ public class ParquetRecordReader extends AbstractRecordReader {
ParquetMetadata footer,
List<SchemaPath> columns,
ParquetReaderUtility.DateCorruptionStatus dateCorruptionStatus) throws ExecutionSetupException {
- this(fragmentContext, DEFAULT_BATCH_LENGTH_IN_BITS, numRecordsToRead,
+ this(fragmentContext, numRecordsToRead,
path, rowGroupIndex, fs, codecFactory, footer, columns, dateCorruptionStatus);
}
@@ -147,13 +143,12 @@ public class ParquetRecordReader extends AbstractRecordReader {
List<SchemaPath> columns,
ParquetReaderUtility.DateCorruptionStatus dateCorruptionStatus)
throws ExecutionSetupException {
- this(fragmentContext, DEFAULT_BATCH_LENGTH_IN_BITS, footer.getBlocks().get(rowGroupIndex).getRowCount(),
+ this(fragmentContext, footer.getBlocks().get(rowGroupIndex).getRowCount(),
path, rowGroupIndex, fs, codecFactory, footer, columns, dateCorruptionStatus);
}
public ParquetRecordReader(
FragmentContext fragmentContext,
- long batchSize,
long numRecordsToRead,
String path,
int rowGroupIndex,
@@ -168,11 +163,10 @@ public class ParquetRecordReader extends AbstractRecordReader {
this.fileSystem = fs;
this.codecFactory = codecFactory;
this.rowGroupIndex = rowGroupIndex;
- this.batchSize = batchSize;
this.footer = footer;
this.dateCorruptionStatus = dateCorruptionStatus;
this.fragmentContext = fragmentContext;
- this.numRecordsToRead = numRecordsToRead;
+ this.numRecordsToRead = initNumRecordsToRead(numRecordsToRead, rowGroupIndex, footer);
this.useAsyncColReader = fragmentContext.getOptions().getOption(ExecConstants.PARQUET_COLUMNREADER_ASYNC).bool_val;
this.useAsyncPageReader = fragmentContext.getOptions().getOption(ExecConstants.PARQUET_PAGEREADER_ASYNC).bool_val;
this.useBufferedReader = fragmentContext.getOptions().getOption(ExecConstants.PARQUET_PAGEREADER_USE_BUFFERED_READ).bool_val;
@@ -215,8 +209,8 @@ public class ParquetRecordReader extends AbstractRecordReader {
return schema.getBitWidthAllFixedFields();
}
- public long getBatchSize() {
- return batchSize;
+ public RecordBatchSizerManager getBatchSizesMgr() {
+ return batchSizerMgr;
}
public OperatorContext getOperatorContext() {
@@ -242,18 +236,19 @@ public class ParquetRecordReader extends AbstractRecordReader {
* contains at least one variable-width field, or is a "mock" scan consisting
* only of null fields (fields in the SELECT clause but not in the Parquet file).
*/
-
@Override
public void setup(OperatorContext operatorContext, OutputMutator output) throws ExecutionSetupException {
this.operatorContext = operatorContext;
schema = new ParquetSchema(fragmentContext.getOptions(), rowGroupIndex, footer, isStarQuery() ? null : getColumns());
+ batchSizerMgr = new RecordBatchSizerManager(fragmentContext.getOptions(), schema, numRecordsToRead);
logger.debug("Reading row group({}) with {} records in file {}.", rowGroupIndex, footer.getBlocks().get(rowGroupIndex).getRowCount(),
hadoopPath.toUri().getPath());
try {
- schema.buildSchema(batchSize);
- readState = new ReadState(schema, parquetReaderStats, numRecordsToRead, useAsyncColReader);
+ schema.buildSchema();
+ batchSizerMgr.setup();
+ readState = new ReadState(schema, batchSizerMgr, parquetReaderStats, numRecordsToRead, useAsyncColReader);
readState.buildReader(this, output);
} catch (Exception e) {
throw handleException("Failure in setting up reader", e);
@@ -277,21 +272,13 @@ public class ParquetRecordReader extends AbstractRecordReader {
@Override
public void allocate(Map<String, ValueVector> vectorMap) throws OutOfMemoryException {
- try {
- int recordsPerBatch = schema.getRecordsPerBatch();
- for (final ValueVector v : vectorMap.values()) {
- AllocationHelper.allocate(v, recordsPerBatch, 50, 10);
- }
- } catch (NullPointerException e) {
- throw new OutOfMemoryException();
- }
+ batchSizerMgr.allocate(vectorMap);
}
/**
* Read the next record batch from the file using the reader and read state
* created previously.
*/
-
@Override
public int next() {
readState.resetBatch();
@@ -314,14 +301,25 @@ public class ParquetRecordReader extends AbstractRecordReader {
logger.debug("Read {} records out of row group({}) in file '{}'",
recordsRead, rowGroupIndex,
hadoopPath.toUri().getPath());
+
// enable this for debugging when it is known that a whole file will be read
// limit kills upstream operators once it has enough records, so this assert will fail
-// assert totalRecordsRead == footer.getBlocks().get(rowGroupIndex).getRowCount();
+ // assert totalRecordsRead == footer.getBlocks().get(rowGroupIndex).getRowCount();
+
+ // NOTE - We check that the Parquet reader data structures are not null before calling close();
+ // this is because close() can be invoked before setup() has executed (likely because of
+ // spurious failures).
+
if (readState != null) {
readState.close();
readState = null;
}
+ if (batchSizerMgr != null) {
+ batchSizerMgr.close();
+ batchSizerMgr = null;
+ }
+
codecFactory.release();
if (parquetReaderStats != null) {
@@ -339,4 +337,14 @@ public class ParquetRecordReader extends AbstractRecordReader {
protected List<SchemaPath> getDefaultColumnsToRead() {
return DEFAULT_COLS_TO_READ;
}
+
+ private int initNumRecordsToRead(long numRecordsToRead, int rowGroupIndex, ParquetMetadata footer) {
+ // Callers can pass -1 if they want to read all rows.
+ if (numRecordsToRead == NUM_RECORDS_TO_READ_NOT_SPECIFIED) {
+ return (int) footer.getBlocks().get(rowGroupIndex).getRowCount();
+ } else {
+ assert (numRecordsToRead >= 0);
+ return (int) Math.min(numRecordsToRead, footer.getBlocks().get(rowGroupIndex).getRowCount());
+ }
+ }
}
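The new initNumRecordsToRead() clamps the requested count to the row group size. A worked illustration of the rule (row counts are made up for the example):

    // rowGroupRows = footer.getBlocks().get(rowGroupIndex).getRowCount(), say 1,000,000
    // numRecordsToRead == -1 (NUM_RECORDS_TO_READ_NOT_SPECIFIED) -> read all 1,000,000 rows
    // numRecordsToRead == 500                                    -> read 500 rows
    // numRecordsToRead == 2,000,000                              -> clamped to 1,000,000 rows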
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetSchema.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetSchema.java
index 3935919..6717445 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetSchema.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetSchema.java
@@ -48,7 +48,7 @@ import com.google.common.collect.Lists;
* to the schema that Drill and the Parquet reader use.
*/
-public class ParquetSchema {
+public final class ParquetSchema {
/**
* Set of columns specified in the SELECT clause. Will be null for
* a SELECT * query.
@@ -74,7 +74,6 @@ public class ParquetSchema {
private int bitWidthAllFixedFields;
private boolean allFieldsFixedLength;
private long groupRecordCount;
- private int recordsPerBatch;
/**
* Build the Parquet schema. The schema can be based on a "SELECT *",
@@ -108,22 +107,13 @@ public class ParquetSchema {
* Build the schema for this read as a combination of the schema specified in
* the Parquet footer and the list of columns selected in the query.
*
- * @param batchSize target size of the batch, in rows
* @throws Exception if anything goes wrong
*/
- public void buildSchema(long batchSize) throws Exception {
+ public void buildSchema() throws Exception {
groupRecordCount = footer.getBlocks().get(rowGroupIndex).getRowCount();
loadParquetSchema();
computeFixedPart();
-
- if (! selectedColumnMetadata.isEmpty() && allFieldsFixedLength) {
- recordsPerBatch = (int) Math.min(Math.min(batchSize / bitWidthAllFixedFields,
- footer.getBlocks().get(0).getColumns().get(0).getValueCount()), ParquetRecordReader.DEFAULT_RECORDS_TO_READ_IF_FIXED_WIDTH);
- }
- else {
- recordsPerBatch = ParquetRecordReader.DEFAULT_RECORDS_TO_READ_IF_VARIABLE_WIDTH;
- }
}
/**
@@ -168,7 +158,6 @@ public class ParquetSchema {
public boolean isStarQuery() { return selectedCols == null; }
public ParquetMetadata footer() { return footer; }
public int getBitWidthAllFixedFields() { return bitWidthAllFixedFields; }
- public int getRecordsPerBatch() { return recordsPerBatch; }
public boolean allFieldsFixedLength() { return allFieldsFixedLength; }
public List<ParquetColumnMetadata> getColumnMetadata() { return selectedColumnMetadata; }
@@ -195,9 +184,6 @@ public class ParquetSchema {
*/
private boolean fieldSelected(MaterializedField field) {
- // TODO - not sure if this is how we want to represent this
- // for now it makes the existing tests pass, simply selecting
- // all available data if no columns are provided
if (isStarQuery()) {
return true;
}
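For context, the sizing rule deleted above was purely width-based: recordsPerBatch = min(batchSize / bitWidthAllFixedFields, columnValueCount, 65535). With the old default batch of 2,097,152 bits (256 KiB) and fixed fields totaling 128 bits per row, that yields min(16384, columnValueCount, 65535) rows per batch; the RecordBatchSizerManager replaces this with memory-budget-driven sizing.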
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ReadState.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ReadState.java
index f94edf1..c545862 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ReadState.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ReadState.java
@@ -24,6 +24,7 @@ import java.util.Map;
import org.apache.drill.exec.physical.impl.OutputMutator;
import org.apache.drill.exec.store.parquet.ParquetReaderStats;
+import org.apache.drill.exec.store.parquet.columnreaders.batchsizing.RecordBatchSizerManager;
import org.apache.drill.exec.vector.NullableIntVector;
import org.apache.drill.exec.vector.ValueVector;
import org.apache.parquet.column.ColumnDescriptor;
@@ -39,7 +40,10 @@ import org.apache.parquet.hadoop.metadata.BlockMetaData;
*/
public class ReadState {
+ /** The Parquet Schema */
private final ParquetSchema schema;
+ /** Responsible for managing record batch size constraints */
+ private final RecordBatchSizerManager batchSizerMgr;
private final ParquetReaderStats parquetReaderStats;
private VarLenBinaryReader varLengthReader;
/**
@@ -48,8 +52,12 @@ public class ReadState {
* that need only have their value count set at the end of each call to next(), as the values default to null.
*/
private List<NullableIntVector> nullFilledVectors;
- private List<ColumnReader<?>> columnReaders = new ArrayList<>();
- private long numRecordsToRead; // number of records to read
+ private List<ColumnReader<?>> fixedLenColumnReaders = new ArrayList<>();
+ private final long totalNumRecordsToRead; // number of records to read
+
+ // counter for the values that have been read in this pass (a single call to the next() method)
+ private int valuesReadInCurrentBatch;
+
/**
* Keeps track of the number of records read thus far.
* <p>
@@ -60,19 +68,29 @@ public class ReadState {
private long totalRecordsRead;
private boolean useAsyncColReader;
- public ReadState(ParquetSchema schema, ParquetReaderStats parquetReaderStats, long numRecordsToRead, boolean useAsyncColReader) {
+ public ReadState(ParquetSchema schema,
+ RecordBatchSizerManager batchSizerMgr, ParquetReaderStats parquetReaderStats, long numRecordsToRead,
+ boolean useAsyncColReader) {
+
this.schema = schema;
+ this.batchSizerMgr = batchSizerMgr;
this.parquetReaderStats = parquetReaderStats;
this.useAsyncColReader = useAsyncColReader;
if (! schema.isStarQuery()) {
nullFilledVectors = new ArrayList<>();
}
+
+ // Because of JIRA DRILL-6528, the Parquet reader sometimes receives the wrong
+ // number of rows to read. For now, return all of the file's data (until the
+ // downstream operator stops consuming).
+ numRecordsToRead = -1;
+
// Callers can pass -1 if they want to read all rows.
if (numRecordsToRead == ParquetRecordReader.NUM_RECORDS_TO_READ_NOT_SPECIFIED) {
- this.numRecordsToRead = schema.getGroupRecordCount();
+ this.totalNumRecordsToRead = schema.getGroupRecordCount();
} else {
assert (numRecordsToRead >= 0);
- this.numRecordsToRead = Math.min(numRecordsToRead, schema.getGroupRecordCount());
+ this.totalNumRecordsToRead = Math.min(numRecordsToRead, schema.getGroupRecordCount());
}
}
@@ -99,10 +117,10 @@ public class ReadState {
// create a reader and add it to the appropriate list
varLengthColumns.add(columnMetadata.makeVariableWidthReader(reader));
} else if (columnMetadata.isRepeated()) {
- varLengthColumns.add(columnMetadata.makeRepeatedFixedWidthReader(reader, schema.getRecordsPerBatch()));
+ varLengthColumns.add(columnMetadata.makeRepeatedFixedWidthReader(reader));
}
else {
- columnReaders.add(columnMetadata.makeFixedWidthReader(reader, schema.getRecordsPerBatch()));
+ fixedLenColumnReaders.add(columnMetadata.makeFixedWidthReader(reader));
}
}
varLengthReader = new VarLenBinaryReader(reader, varLengthColumns);
@@ -119,8 +137,8 @@ public class ReadState {
*/
public ColumnReader<?> getFirstColumnReader() {
- if (columnReaders.size() > 0) {
- return columnReaders.get(0);
+ if (fixedLenColumnReaders.size() > 0) {
+ return fixedLenColumnReaders.get(0);
}
else if (varLengthReader.columns.size() > 0) {
return varLengthReader.columns.get(0);
@@ -130,23 +148,48 @@ public class ReadState {
}
public void resetBatch() {
- for (final ColumnReader<?> column : columnReaders) {
+ for (final ColumnReader<?> column : fixedLenColumnReaders) {
column.valuesReadInCurrentPass = 0;
}
for (final VarLengthColumn<?> r : varLengthReader.columns) {
r.valuesReadInCurrentPass = 0;
}
+ setValuesReadInCurrentPass(0);
}
public ParquetSchema schema() { return schema; }
- public List<ColumnReader<?>> getColumnReaders() { return columnReaders; }
+ public RecordBatchSizerManager batchSizerMgr() { return batchSizerMgr; }
+ public List<ColumnReader<?>> getFixedLenColumnReaders() { return fixedLenColumnReaders; }
public long recordsRead() { return totalRecordsRead; }
public VarLenBinaryReader varLengthReader() { return varLengthReader; }
- public long getRecordsToRead() { return numRecordsToRead; }
+ public long getTotalRecordsToRead() { return totalNumRecordsToRead; }
public boolean useAsyncColReader() { return useAsyncColReader; }
public ParquetReaderStats parquetReaderStats() { return parquetReaderStats; }
/**
+ * @return values read within the latest batch
+ */
+ public int getValuesReadInCurrentPass() {
+ return valuesReadInCurrentBatch;
+ }
+
+ /**
+ * @return remaining values to read
+ */
+ public int getRemainingValuesToRead() {
+ assert totalNumRecordsToRead >= totalRecordsRead;
+ return (int) (totalNumRecordsToRead - totalRecordsRead);
+ }
+
+ /**
+ * @param valuesReadInCurrentBatch number of values read in the current batch
+ */
+ public void setValuesReadInCurrentPass(int valuesReadInCurrentBatch) {
+ this.valuesReadInCurrentBatch = valuesReadInCurrentBatch;
+ }
+
+ /**
* When the SELECT clause references columns that do not exist in the Parquet
* file, we don't issue an error; instead we simply make up a column and
* fill it with nulls. This method does the work of null-filling the made-up
@@ -170,16 +213,15 @@ public class ReadState {
public void updateCounts(int readCount) {
totalRecordsRead += readCount;
- numRecordsToRead -= readCount;
}
public void close() {
- if (columnReaders != null) {
- for (final ColumnReader<?> column : columnReaders) {
+ if (fixedLenColumnReaders != null) {
+ for (final ColumnReader<?> column : fixedLenColumnReaders) {
column.clear();
}
- columnReaders.clear();
- columnReaders = null;
+ fixedLenColumnReaders.clear();
+ fixedLenColumnReaders = null;
}
if (varLengthReader != null) {
for (final VarLengthColumn<? extends ValueVector> r : varLengthReader.columns) {
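Taken together, the renamed fields and new accessors support a simple per-batch bookkeeping pattern. A minimal sketch of a driver loop, where readBatch() is a hypothetical stand-in for the actual reading logic:

    while (readState.getRemainingValuesToRead() > 0) {
      readState.resetBatch();           // zero valuesReadInCurrentPass on every reader
      int read = readBatch(readState);  // hypothetical: fills the vectors, returns the row count
      readState.updateCounts(read);     // advances totalRecordsRead
    }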
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenAbstractEntryReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenAbstractEntryReader.java
index f6fab66..3c9610a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenAbstractEntryReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenAbstractEntryReader.java
@@ -17,24 +17,20 @@
*/
package org.apache.drill.exec.store.parquet.columnreaders;
+import io.netty.buffer.DrillBuf;
import java.nio.ByteBuffer;
-import org.apache.drill.exec.store.parquet.columnreaders.VarLenColumnBulkInput.ColumnPrecisionInfo;
-import org.apache.drill.exec.store.parquet.columnreaders.VarLenColumnBulkInput.PageDataInfo;
-
-import io.netty.buffer.DrillBuf;
+import org.apache.drill.exec.store.parquet.columnreaders.VarLenColumnBulkInput.VarLenColumnBulkInputCallback;
/** Abstract class for sub-classes implementing several algorithms for loading a Bulk Entry */
abstract class VarLenAbstractEntryReader {
/** byte buffer used for buffering page data */
protected final ByteBuffer buffer;
- /** Page Data Information */
- protected final PageDataInfo pageInfo;
- /** expected precision type: fixed or variable length */
- protected final ColumnPrecisionInfo columnPrecInfo;
/** Bulk entry */
protected final VarLenColumnBulkEntry entry;
+ /** A callback to allow bulk readers to interact with their container */
+ private final VarLenColumnBulkInputCallback containerCallback;
/**
* CTOR.
@@ -44,14 +40,12 @@ abstract class VarLenAbstractEntryReader {
* @param entry reusable bulk entry object
*/
VarLenAbstractEntryReader(ByteBuffer buffer,
- PageDataInfo pageInfo,
- ColumnPrecisionInfo columnPrecInfo,
- VarLenColumnBulkEntry entry) {
+ VarLenColumnBulkEntry entry,
+ VarLenColumnBulkInputCallback containerCallback) {
this.buffer = buffer;
- this.pageInfo = pageInfo;
- this.columnPrecInfo = columnPrecInfo;
this.entry = entry;
+ this.containerCallback = containerCallback;
}
/**
@@ -61,53 +55,14 @@ abstract class VarLenAbstractEntryReader {
abstract VarLenColumnBulkEntry getEntry(int valsToReadWithinPage);
/**
- * Indicates whether to use bulk processing
- */
- protected final boolean bulkProcess() {
- return columnPrecInfo.bulkProcess;
- }
-
- /**
- * Loads new data into the buffer if empty or the force flag is set.
- *
- * @param force flag to force loading new data into the buffer
- */
- protected final boolean load(boolean force) {
-
- if (!force && buffer.hasRemaining()) {
- return true; // NOOP
- }
-
- // We're here either because the buffer is empty or we want to force a new load operation.
- // In the case of force, there might be unprocessed data (still in the buffer) which is fine
- // since the caller updates the page data buffer's offset only for the data it has consumed; this
- // means unread data will be loaded again but this time will be positioned in the beginning of the
- // buffer. This can happen only for the last entry in the buffer when either of its length or value
- // is incomplete.
- buffer.clear();
-
- final int bufferCapacity = VarLenBulkPageReader.BUFF_SZ;
- final int remaining = remainingPageData();
- final int toCopy = remaining > bufferCapacity ? bufferCapacity : remaining;
-
- if (toCopy == 0) {
- return false;
- }
-
- pageInfo.pageData.getBytes(pageInfo.pageDataOff, buffer.array(), buffer.position(), toCopy);
-
- buffer.limit(toCopy);
-
- // At this point the buffer position is 0 and its limit set to the number of bytes copied.
-
- return true;
- }
-
- /**
- * @return remaining data in current page
+ * @param newBitsMemory new "bits" memory size
+ * @param newOffsetsMemory new "offsets" memory size
+ * @param newDataMemory new "data" memory size
+ * @return true if the new payload ("bits", "offsets", "data") will trigger a constraint violation; false
+ * otherwise
*/
- protected final int remainingPageData() {
- return pageInfo.pageDataLen - pageInfo.pageDataOff;
+ protected boolean batchMemoryConstraintsReached(int newBitsMemory, int newOffsetsMemory, int newDataMemory) {
+ return containerCallback.batchMemoryConstraintsReached(newBitsMemory, newOffsetsMemory, newDataMemory);
}
/**
@@ -115,7 +70,7 @@ abstract class VarLenAbstractEntryReader {
* @param pos start position
* @return an integer encoded as little endian
*/
- protected final int getInt(final byte[] buff, final int pos) {
+ static final int getInt(final byte[] buff, final int pos) {
return DrillBuf.getInt(buff, pos);
}
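Making getInt() static lets entry readers decode a value's 4-byte little-endian length prefix straight from the buffered page bytes (the standard Parquet plain encoding for BYTE_ARRAY values). A minimal sketch, with the offset chosen for illustration:

    byte[] page = buffer.array();
    int pos = 0;
    int valueLen = getInt(page, pos); // little-endian int32 length prefix
    pos += 4;                         // the next valueLen bytes are the value itself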
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenAbstractPageEntryReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenAbstractPageEntryReader.java
new file mode 100644
index 0000000..fecf1ce
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenAbstractPageEntryReader.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.parquet.columnreaders;
+
+import java.nio.ByteBuffer;
+import org.apache.drill.exec.store.parquet.columnreaders.VarLenColumnBulkInput.VarLenColumnBulkInputCallback;
+import org.apache.drill.exec.store.parquet.columnreaders.VarLenColumnBulkInput.ColumnPrecisionInfo;
+import org.apache.drill.exec.store.parquet.columnreaders.VarLenColumnBulkInput.PageDataInfo;
+
+/** Abstract class for sub-classes implementing several strategies for loading a Bulk Entry from a Parquet page */
+abstract class VarLenAbstractPageEntryReader extends VarLenAbstractEntryReader {
+
+ protected final PageDataInfo pageInfo;
+ /** expected precision type: fixed or variable length */
+ protected final ColumnPrecisionInfo columnPrecInfo;
+
+ /**
+ * CTOR.
+ * @param _buffer byte buffer for data buffering (within CPU cache)
+ * @param _pageInfo page being processed information
+ * @param _columnPrecInfo column precision information
+ * @param _entry reusable bulk entry object
+ */
+ VarLenAbstractPageEntryReader(ByteBuffer _buffer,
+ PageDataInfo _pageInfo,
+ ColumnPrecisionInfo _columnPrecInfo,
+ VarLenColumnBulkEntry _entry,
+ VarLenColumnBulkInputCallback _containerCallback) {
+
+ super(_buffer, _entry, _containerCallback);
+
+ this.pageInfo = _pageInfo;
+ this.columnPrecInfo = _columnPrecInfo;
+ }
+
+ /**
+ * Indicates whether to use bulk processing
+ */
+ protected final boolean bulkProcess() {
+ return columnPrecInfo.bulkProcess;
+ }
+
+ /**
+ * Loads new data into the buffer if empty or the force flag is set.
+ *
+ * @param force flag to force loading new data into the buffer
+ */
+ protected final boolean load(boolean force) {
+
+ if (!force && buffer.hasRemaining()) {
+ return true; // NOOP
+ }
+
+ // We're here either because the buffer is empty or we want to force a new load operation.
+ // In the case of force, there might be unprocessed data (still in the buffer) which is fine
+ // since the caller updates the page data buffer's offset only for the data it has consumed; this
+ // means unread data will be loaded again, but this time positioned at the beginning of the
+ // buffer. This can happen only for the last entry in the buffer, when either its length or
+ // its value is incomplete.
+ buffer.clear();
+
+ int remaining = remainingPageData();
+ int bufferCapacity = buffer.capacity() - VarLenBulkPageReader.PADDING;
+ int toCopy = remaining > bufferCapacity ? bufferCapacity : remaining;
+
+ if (toCopy == 0) {
+ return false;
+ }
+
+ pageInfo.pageData.getBytes(pageInfo.pageDataOff, buffer.array(), buffer.position(), toCopy);
+
+ buffer.limit(toCopy);
+
+ // At this point the buffer position is 0 and its limit set to the number of bytes copied.
+
+ return true;
+ }
+
+ /**
+ * @return remaining data in current page
+ */
+ protected final int remainingPageData() {
+ return pageInfo.pageDataLen - pageInfo.pageDataOff;
+ }
+}
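The new base class centralizes the page-buffering strategy: load() copies up to (capacity - PADDING) bytes of page data into a small ByteBuffer, and a trailing partial entry is re-buffered on a later call with force == true. A sketch of how a subclass's getEntry() might drive it (parsing details elided; parseEntries() is hypothetical):

    VarLenColumnBulkEntry getEntry(int valsToRead) {
      if (!load(false)) {   // ensure buffered page data is available
        return null;        // page exhausted
      }
      // Decode as many complete length-prefixed values as fit in 'buffer';
      // advance pageInfo.pageDataOff only past fully consumed bytes so that
      // a partial trailing value is re-loaded from position 0 next time.
      return parseEntries(valsToRead);
    }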
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenBinaryReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenBinaryReader.java
index 8156db1..7bdc33e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenBinaryReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenBinaryReader.java
@@ -19,39 +19,43 @@ package org.apache.drill.exec.store.parquet.columnreaders;
import com.google.common.base.Stopwatch;
import com.google.common.collect.Lists;
-import org.apache.drill.common.exceptions.DrillRuntimeException;
-import org.apache.drill.exec.vector.ValueVector;
-
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.exec.store.parquet.columnreaders.batchsizing.RecordBatchOverflow;
+import org.apache.drill.exec.store.parquet.columnreaders.batchsizing.RecordBatchOverflow.FieldOverflowDefinition;
+import org.apache.drill.exec.store.parquet.columnreaders.batchsizing.RecordBatchSizerManager;
+import org.apache.drill.exec.store.parquet.columnreaders.batchsizing.RecordBatchSizerManager.FieldOverflowState;
+import org.apache.drill.exec.store.parquet.columnreaders.batchsizing.RecordBatchSizerManager.FieldOverflowStateContainer;
+import org.apache.drill.exec.store.parquet.columnreaders.batchsizing.RecordBatchSizerManager.VarLenColumnBatchStats;
+import org.apache.drill.exec.vector.ValueVector;
+/** Class which handles reading a batch of rows from a set of variable-length columns */
public class VarLenBinaryReader {
+ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(VarLenBinaryReader.class);
- ParquetRecordReader parentReader;
+ final ParquetRecordReader parentReader;
+ final RecordBatchSizerManager batchSizer;
final List<VarLengthColumn<? extends ValueVector>> columns;
+ /** Columns ordered to minimize overflow */
+ final List<VLColumnContainer> orderedColumns;
+ private final Comparator<VLColumnContainer> comparator = new VLColumnComparator();
final boolean useAsyncTasks;
- private final long targetRecordCount;
private final boolean useBulkReader;
public VarLenBinaryReader(ParquetRecordReader parentReader, List<VarLengthColumn<? extends ValueVector>> columns) {
this.parentReader = parentReader;
+ this.batchSizer = parentReader.getBatchSizesMgr();
this.columns = columns;
+ this.orderedColumns = populateOrderedColumns();
this.useAsyncTasks = parentReader.useAsyncColReader;
this.useBulkReader = parentReader.useBulkReader();
-
- // Can't read any more records than fixed width fields will fit.
- // Note: this calculation is very likely wrong; it is a simplified
- // version of earlier code, but probably needs even more attention.
-
- int totalFixedFieldWidth = parentReader.getBitWidthAllFixedFields() / 8;
- if (totalFixedFieldWidth == 0) {
- targetRecordCount = 0;
- } else {
- targetRecordCount = parentReader.getBatchSize() / totalFixedFieldWidth;
- }
}
/**
@@ -70,11 +74,8 @@ public class VarLenBinaryReader {
}
Stopwatch timer = Stopwatch.createStarted();
- // Can't read any more records than fixed width fields will fit.
-
- if (targetRecordCount > 0) {
- recordsToReadInThisPass = Math.min(recordsToReadInThisPass, targetRecordCount);
- }
+ // Ensure we do not read more than the batch record count
+ recordsToReadInThisPass = Math.min(recordsToReadInThisPass, batchSizer.getCurrentRecordsPerBatch());
long recordsReadInCurrentPass = 0;
@@ -90,22 +91,161 @@ public class VarLenBinaryReader {
recordsReadInCurrentPass = readRecordsInBulk((int) recordsToReadInThisPass);
}
+ // Publish this information
+ parentReader.readState.setValuesReadInCurrentPass((int) recordsReadInCurrentPass);
+
+ // Update the stats
parentReader.parquetReaderStats.timeVarColumnRead.addAndGet(timer.elapsed(TimeUnit.NANOSECONDS));
return recordsReadInCurrentPass;
}
private int readRecordsInBulk(int recordsToReadInThisPass) throws IOException {
- int recordsReadInCurrentPass = -1;
+ int batchNumRecords = recordsToReadInThisPass;
+ List<VarLenColumnBatchStats> columnStats = new ArrayList<VarLenColumnBatchStats>(columns.size());
+ int prevReadColumns = -1;
+ boolean overflowCondition = false;
+
+ for (VLColumnContainer columnContainer : orderedColumns) {
+ VarLengthColumn<?> columnReader = columnContainer.column;
+
+ // Read the column data
+ int readColumns = columnReader.readRecordsInBulk(batchNumRecords);
+ assert readColumns <= batchNumRecords : "Reader cannot return more values than requested";
+
+ if (!overflowCondition) {
+ if (prevReadColumns >= 0 && prevReadColumns != readColumns) {
+ overflowCondition = true;
+ } else {
+ prevReadColumns = readColumns;
+ }
+ }
+
+ // Enqueue this column's entry information to handle overflow conditions; we will not know
+ // whether an overflow happened until all variable-length columns have been processed
+ columnStats.add(new VarLenColumnBatchStats(columnReader.valueVec, readColumns));
+ // Decrease the number of records to read when a column returns fewer records (minimize overflow)
+ if (batchNumRecords > readColumns) {
+ batchNumRecords = readColumns;
+ // It seems this column caused an overflow (the higher layer will not ask for more values than remain)
+ ++columnContainer.numCausedOverflows;
+ }
+ }
+
+ // Set the value-count for each column
for (VarLengthColumn<?> columnReader : columns) {
- int readColumns = columnReader.readRecordsInBulk(recordsToReadInThisPass);
- assert (readColumns >= 0 && recordsReadInCurrentPass == readColumns || recordsReadInCurrentPass == -1);
+ columnReader.valuesReadInCurrentPass = batchNumRecords;
+ }
+
+ // Publish this batch's statistics
+ publishBatchStats(columnStats, batchNumRecords);
- recordsReadInCurrentPass = readColumns;
+ // Handle column(s) overflow if any
+ if (overflowCondition) {
+ handleColumnOverflow(columnStats, batchNumRecords);
}
- return recordsReadInCurrentPass;
+ return batchNumRecords;
+ }
+
+ private void handleColumnOverflow(List<VarLenColumnBatchStats> columnStats, int batchNumRecords) {
+ // Overflow happens when a column returns more values than "batchNumRecords"; this can occur
+ // when a column Ci is read first and returns Ni values, and then another column Cj is read
+ // which returns fewer values than Ni.
+ RecordBatchOverflow.Builder builder = null;
+
+ // We need to collect all columns which are subject to an overflow (except for the ones which are already
+ // returning values from a previous batch overflow)
+ for (VarLenColumnBatchStats columnStat : columnStats) {
+ if (columnStat.numValuesRead > batchNumRecords) {
+ // We need to figure out whether this column was already returning values from a previous batch
+ // overflow; if it is, then this is a NOOP (as the overflow data is still available to be replayed)
+ if (fieldHasAlreadyOverflowData(columnStat.vector.getField().getName())) {
+ continue;
+ }
+
+ // We need to set the value-count as otherwise some size-related vector APIs won't work
+ columnStat.vector.getMutator().setValueCount(batchNumRecords);
+
+ // Lazy initialization
+ if (builder == null) {
+ builder = RecordBatchOverflow.newBuilder(parentReader.getOperatorContext().getAllocator());
+ }
+
+ final int numOverflowValues = columnStat.numValuesRead - batchNumRecords;
+ builder.addFieldOverflow(columnStat.vector, batchNumRecords, numOverflowValues);
+ }
+ }
+
+ // Register batch overflow data with the record batch sizer manager (if any)
+ if (builder != null) {
+ Map<String, FieldOverflowStateContainer> overflowContainerMap = parentReader.batchSizerMgr.getFieldOverflowMap();
+ Map<String, FieldOverflowDefinition> overflowDefMap = builder.build().getRecordOverflowDefinition().getFieldOverflowDefs();
+
+ for (Map.Entry<String, FieldOverflowDefinition> entry : overflowDefMap.entrySet()) {
+ FieldOverflowStateContainer overflowStateContainer = new FieldOverflowStateContainer(entry.getValue(), null);
+ // Register this overflow condition
+ overflowContainerMap.put(entry.getKey(), overflowStateContainer);
+ }
+ }
+
+ reorderVLColumns();
+ }
+
+ private void reorderVLColumns() {
+ // Finally, re-order the variable-length columns since an overflow occurred
+ Collections.sort(orderedColumns, comparator);
+
+ if (logger.isDebugEnabled()) {
+ boolean isFirstValue = true;
+ final StringBuilder msg = new StringBuilder(RecordBatchSizerManager.BATCH_STATS_PREFIX);
+ msg.append(": Dumping the variable length columns read order: ");
+
+ for (VLColumnContainer container : orderedColumns) {
+ if (!isFirstValue) {
+ msg.append(", ");
+ } else {
+ isFirstValue = false;
+ }
+ msg.append(container.column.valueVec.getField().getName());
+ }
+ msg.append('.');
+
+ logger.debug(msg.toString());
+ }
+ }
+
+ private boolean fieldHasAlreadyOverflowData(String field) {
+ FieldOverflowStateContainer container = parentReader.batchSizerMgr.getFieldOverflowContainer(field);
+
+ if (container == null) {
+ return false;
+ }
+
+ if (container.overflowState == null || container.overflowState.isOverflowDataFullyConsumed()) {
+ parentReader.batchSizerMgr.releaseFieldOverflowContainer(field);
+ return false;
+ }
+ return true;
+ }
+
+ private void publishBatchStats(List<VarLenColumnBatchStats> stats, int batchNumRecords) {
+ // First, let us inform the variable columns of the number of records returned by this batch; this
+ // is for managing overflow data state.
+ Map<String, FieldOverflowStateContainer> overflowMap =
+ parentReader.batchSizerMgr.getFieldOverflowMap();
+
+ for (FieldOverflowStateContainer container : overflowMap.values()) {
+ FieldOverflowState overflowState = container.overflowState;
+
+ if (overflowState != null) {
+ overflowState.onNewBatchValuesConsumed(batchNumRecords);
+ }
+ }
+
+ // Now publish the same to the record batch sizer manager
+ parentReader.batchSizerMgr.onEndOfBatch(batchNumRecords, stats);
}
private long determineSizesSerial(long recordsToReadInThisPass) throws IOException {
@@ -168,4 +308,51 @@ public class VarLenBinaryReader {
throw new DrillRuntimeException(message, e);
}
+ private List<VLColumnContainer> populateOrderedColumns() {
+ List<VLColumnContainer> result = new ArrayList<VLColumnContainer>(columns.size());
+
+ // First, populate the list
+ for (VarLengthColumn<? extends ValueVector> column : columns) {
+ result.add(new VLColumnContainer(column));
+ }
+
+ // Now perform the sorting
+ Collections.sort(result, comparator);
+
+ return result;
+ }
+
+// ----------------------------------------------------------------------------
+// Internal Data Structure
+// ----------------------------------------------------------------------------
+
+ /** Container class which allows us to implement an ordering so as to minimize overflow */
+ private static final class VLColumnContainer {
+ /** Variable length column */
+ private final VarLengthColumn<? extends ValueVector> column;
+ /** Number of times this column caused overflow */
+ private int numCausedOverflows;
+
+ /** Constructor */
+ private VLColumnContainer(VarLengthColumn<? extends ValueVector> column) {
+ this.column = column;
+ }
+ }
+
+ /** Comparator class to minimize overflow; columns with the highest chance of causing overflow
+ * should be iterated first (i.e., have the smallest sort order).
+ */
+ private static final class VLColumnComparator implements Comparator<VLColumnContainer> {
+ // Columns which caused overflows should be read earlier (lowest sort order)
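+ // Example (hypothetical counts): compare() on columns with 3 and 1 past overflows returns -1,
+ // so the column that overflowed 3 times sorts (and is thus read) first.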
+ @Override
+ public int compare(VLColumnContainer o1, VLColumnContainer o2) {
+ assert o1 != null && o2 != null;
+
+ if (o1.numCausedOverflows == o2.numCausedOverflows) { return 0; }
+ if (o1.numCausedOverflows < o2.numCausedOverflows) { return 1; }
+
+ return -1;
+ }
+ }
+
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenBulkPageReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenBulkPageReader.java
index 385cb83..0e50406 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenBulkPageReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenBulkPageReader.java
@@ -25,7 +25,8 @@ import org.apache.drill.common.exceptions.DrillRuntimeException;
import org.apache.drill.exec.store.parquet.columnreaders.VarLenColumnBulkInput.ColumnPrecisionInfo;
import org.apache.drill.exec.store.parquet.columnreaders.VarLenColumnBulkInput.ColumnPrecisionType;
import org.apache.drill.exec.store.parquet.columnreaders.VarLenColumnBulkInput.PageDataInfo;
-import org.apache.drill.exec.store.parquet.columnreaders.VarLenColumnBulkInput.VLColumnBulkInputCallback;
+import org.apache.drill.exec.store.parquet.columnreaders.VarLenColumnBulkInput.VarLenColumnBulkInputCallback;
+import org.apache.drill.exec.store.parquet.columnreaders.batchsizing.RecordBatchSizerManager.FieldOverflowStateContainer;
/** Provides bulk reads when accessing Parquet's page payload for variable length columns */
final class VarLenBulkPageReader {
@@ -47,7 +48,9 @@ final class VarLenBulkPageReader {
/** Bulk entry */
private final VarLenColumnBulkEntry entry;
/** A callback to allow bulk readers to interact with their container */
- private final VLColumnBulkInputCallback containerCallback;
+ private final VarLenColumnBulkInputCallback containerCallback;
+ /** A reference to the column's overflow data (could be null) */
+ private FieldOverflowStateContainer fieldOverflowStateContainer;
// Various BulkEntry readers
final VarLenAbstractEntryReader fixedReader;
@@ -57,10 +60,14 @@ final class VarLenBulkPageReader {
final VarLenAbstractEntryReader dictionaryReader;
final VarLenAbstractEntryReader nullableDictionaryReader;
+ // Overflow reader
+ private VarLenOverflowReader overflowReader;
+
VarLenBulkPageReader(
PageDataInfo pageInfoInput,
ColumnPrecisionInfo columnPrecInfoInput,
- VLColumnBulkInputCallback containerCallbackInput) {
+ VarLenColumnBulkInputCallback containerCallbackInput,
+ FieldOverflowStateContainer fieldOverflowStateContainer) {
// Set the buffer to the native byte order
this.buffer.order(ByteOrder.nativeOrder());
@@ -72,14 +79,22 @@ final class VarLenBulkPageReader {
this.columnPrecInfo = columnPrecInfoInput;
this.entry = new VarLenColumnBulkEntry(this.columnPrecInfo);
this.containerCallback = containerCallbackInput;
+ this.fieldOverflowStateContainer = fieldOverflowStateContainer;
// Initialize the Variable Length Entry Readers
- fixedReader = new VarLenFixedEntryReader(buffer, pageInfo, columnPrecInfo, entry);
- nullableFixedReader = new VarLenNullableFixedEntryReader(buffer, pageInfo, columnPrecInfo, entry);
- variableLengthReader = new VarLenEntryReader(buffer, pageInfo, columnPrecInfo, entry);
- nullableVLReader = new VarLenNullableEntryReader(buffer, pageInfo, columnPrecInfo, entry);
- dictionaryReader = new VarLenEntryDictionaryReader(buffer, pageInfo, columnPrecInfo, entry);
- nullableDictionaryReader = new VarLenNullableDictionaryReader(buffer, pageInfo, columnPrecInfo, entry);
+ fixedReader = new VarLenFixedEntryReader(buffer, pageInfo, columnPrecInfo, entry, containerCallback);
+ nullableFixedReader = new VarLenNullableFixedEntryReader(buffer, pageInfo, columnPrecInfo, entry, containerCallback);
+ variableLengthReader = new VarLenEntryReader(buffer, pageInfo, columnPrecInfo, entry, containerCallback);
+ nullableVLReader = new VarLenNullableEntryReader(buffer, pageInfo, columnPrecInfo, entry, containerCallback);
+ dictionaryReader = new VarLenEntryDictionaryReader(buffer, pageInfo, columnPrecInfo, entry, containerCallback);
+ nullableDictionaryReader = new VarLenNullableDictionaryReader(buffer, pageInfo, columnPrecInfo, entry, containerCallback);
+
+ // Overflow reader is initialized only when a previous batch produced overflow data for this column
+ if (this.fieldOverflowStateContainer == null) {
+ overflowReader = null;
+ } else {
+ overflowReader = new VarLenOverflowReader(buffer, entry, containerCallback, fieldOverflowStateContainer);
+ }
}
final void set(PageDataInfo pageInfoInput, boolean clear) {
@@ -91,13 +106,25 @@ final class VarLenBulkPageReader {
pageInfo.dictionaryValueReader = pageInfoInput.dictionaryValueReader;
pageInfo.numPageValues = pageInfoInput.numPageValues;
if (clear) {
- buffer.clear();
- }
+ buffer.clear();
+ }
}
final VarLenColumnBulkEntry getEntry(int valuesToRead) {
VarLenColumnBulkEntry entry = null;
+ // If there is overflow data, then we need to consume it first
+ if (overflowDataAvailable()) {
+ entry = overflowReader.getEntry(valuesToRead);
+ entry.setReadFromPage(false); // entry was read from the overflow data
+
+ return entry;
+ }
+
+ // There is no more overflow data; if we were previously reading from it, then it
+ // needs to be de-initialized before reading new page data.
+ deinitOverflowDataIfNeeded();
+
if (ColumnPrecisionType.isPrecTypeFixed(columnPrecInfo.columnPrecisionType)) {
if ((entry = getFixedEntry(valuesToRead)) == null) {
// The only reason for a null to be returned is when the "getFixedEntry" method discovers
@@ -118,14 +145,15 @@ final class VarLenBulkPageReader {
}
columnPrecInfo.columnPrecisionType = ColumnPrecisionType.DT_PRECISION_IS_VARIABLE;
- entry = getVLEntry(valuesToRead);
+ entry = getVarLenEntry(valuesToRead);
}
} else {
- entry = getVLEntry(valuesToRead);
+ entry = getVarLenEntry(valuesToRead);
}
if (entry != null) {
+ entry.setReadFromPage(true); // entry was read from a Parquet page
pageInfo.numPageFieldsRead += entry.getNumValues();
}
return entry;
@@ -139,10 +167,9 @@ final class VarLenBulkPageReader {
}
}
- private final VarLenColumnBulkEntry getVLEntry(int valuesToRead) {
- if (pageInfo.dictionaryValueReader == null
- || !pageInfo.dictionaryValueReader.isDefined()) {
-
+ private final VarLenColumnBulkEntry getVarLenEntry(int valuesToRead) {
+ // Let's start with the non-dictionary encoding as it is predominant
+ if (!pageInfo.dictionaryValueReader.isDefined()) {
if (pageInfo.definitionLevels.hasDefinitionLevels()) {
return nullableVLReader.getEntry(valuesToRead);
} else {
@@ -157,4 +184,19 @@ final class VarLenBulkPageReader {
}
}
-}
+ private boolean overflowDataAvailable() {
+ if (overflowReader == null) {
+ return false;
+ }
+ return overflowReader.getRemainingOverflowData() > 0;
+ }
+
+ private void deinitOverflowDataIfNeeded() {
+ if (overflowReader != null) {
+ containerCallback.deinitOverflowData();
+ overflowReader = null;
+ fieldOverflowStateContainer = null;
+ }
+ }
+
+}
\ No newline at end of file
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenColumnBulkEntry.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenColumnBulkEntry.java
index e6158e1..bc77415 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenColumnBulkEntry.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenColumnBulkEntry.java
@@ -47,6 +47,8 @@ final class VarLenColumnBulkEntry implements VarLenBulkEntry {
private boolean arrayBacked;
/** indicator on whether the current data buffer is externally or internally owned */
private boolean internalDataBuf;
+ /** indicates whether the entry was read from the overflow data or page data */
+ private boolean readFromPage;
VarLenColumnBulkEntry(ColumnPrecisionInfo columnPrecInfo) {
this(columnPrecInfo, VarLenBulkPageReader.BUFF_SZ);
@@ -169,4 +171,18 @@ final class VarLenColumnBulkEntry implements VarLenBulkEntry {
return lengths.length;
}
+ /**
+ * @return true if the entry was read from page data; false if it came from overflow data
+ */
+ boolean isReadFromPage() {
+ return readFromPage;
+ }
+
+ /**
+ * @param readFromPage whether the entry was read from page data (vs. overflow data)
+ */
+ void setReadFromPage(boolean readFromPage) {
+ this.readFromPage = readFromPage;
+ }
+
}
\ No newline at end of file
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenColumnBulkInput.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenColumnBulkInput.java
index 1b30737..4dbd482 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenColumnBulkInput.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenColumnBulkInput.java
@@ -22,6 +22,12 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.exec.store.parquet.columnreaders.VarLenOverflowReader.FieldOverflowStateImpl;
+import org.apache.drill.exec.store.parquet.columnreaders.batchsizing.BatchSizingMemoryUtil;
+import org.apache.drill.exec.store.parquet.columnreaders.batchsizing.BatchSizingMemoryUtil.ColumnMemoryUsageInfo;
+import org.apache.drill.exec.store.parquet.columnreaders.batchsizing.RecordBatchSizerManager;
+import org.apache.drill.exec.store.parquet.columnreaders.batchsizing.RecordBatchSizerManager.ColumnMemoryQuota;
+import org.apache.drill.exec.store.parquet.columnreaders.batchsizing.RecordBatchSizerManager.FieldOverflowStateContainer;
import org.apache.drill.exec.vector.ValueVector;
import org.apache.drill.exec.vector.VarLenBulkEntry;
import org.apache.drill.exec.vector.VarLenBulkInput;
@@ -29,12 +35,16 @@ import org.apache.parquet.column.values.ValuesReader;
import org.apache.parquet.io.api.Binary;
/** Implements the {@link VarLenBulkInput} interface to optimize data copy */
-final class VarLenColumnBulkInput<V extends ValueVector> implements VarLenBulkInput<VarLenBulkEntry> {
+public final class VarLenColumnBulkInput<V extends ValueVector> implements VarLenBulkInput<VarLenBulkEntry> {
+ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(VarLenColumnBulkInput.class);
+
/** A cut off number for bulk processing */
private static final int BULK_PROCESSING_MAX_PREC_LEN = 1 << 10;
/** parent object */
private final VarLengthValuesColumn<V> parentInst;
+ /** Batch sizer manager */
+ private final RecordBatchSizerManager batchSizerMgr;
/** Column precision type information (owner by caller) */
private final ColumnPrecisionInfo columnPrecInfo;
/** Custom definition level reader */
@@ -44,6 +54,10 @@ final class VarLenColumnBulkInput<V extends ValueVector> implements VarLenBulkIn
/** The records to read */
private final int recordsToRead;
+ /** Maximum memory (in bytes) to use for loading this field's data
+ * (a soft limit, as a single row can go beyond this value)
+ */
+ private ColumnMemoryQuota columnMemoryQuota;
/** Current operation bulk reader state */
private final OprBulkReadState oprReadState;
/** Container class for holding page data information */
@@ -51,7 +65,11 @@ final class VarLenColumnBulkInput<V extends ValueVector> implements VarLenBulkIn
/** Buffered page payload */
private VarLenBulkPageReader buffPagePayload;
/** A callback to allow child readers interact with this class */
- private final VLColumnBulkInputCallback callback;
+ private final VarLenColumnBulkInputCallback callback;
+ /** Column memory usage information */
+ private final ColumnMemoryUsageInfo columnMemoryUsage = new ColumnMemoryUsageInfo();
+ /** A reference to the column's overflow data (could be null) */
+ private FieldOverflowStateContainer fieldOverflowStateContainer;
/**
* CTOR.
@@ -64,17 +82,19 @@ final class VarLenColumnBulkInput<V extends ValueVector> implements VarLenBulkIn
int recordsToRead, BulkReaderState bulkReaderState) throws IOException {
this.parentInst = parentInst;
+ this.batchSizerMgr = this.parentInst.parentReader.batchSizerMgr;
this.recordsToRead = recordsToRead;
- this.callback = new VLColumnBulkInputCallback(parentInst.pageReader);
+ this.callback = new VarLenColumnBulkInputCallback(this);
this.columnPrecInfo = bulkReaderState.columnPrecInfo;
this.custDefLevelReader = bulkReaderState.definitionLevelReader;
this.custDictionaryReader = bulkReaderState.dictionaryReader;
+ this.fieldOverflowStateContainer = this.batchSizerMgr.getFieldOverflowContainer(parentInst.valueVec.getField().getName());
// Load page if none have been read
loadPageIfNeeed();
// Create the internal READ_STATE object based on the current page-reader state
- this.oprReadState = new OprBulkReadState(parentInst.pageReader.readyToReadPosInBytes, parentInst.pageReader.valuesRead, 0);
+ this.oprReadState = new OprBulkReadState(parentInst.pageReader.readyToReadPosInBytes, parentInst.pageReader.valuesRead);
// Let's try to figure out whether this column is fixed or variable length; this information
// is not always accurate within the Parquet schema metadata.
@@ -90,26 +110,34 @@ final class VarLenColumnBulkInput<V extends ValueVector> implements VarLenBulkIn
@Override
public boolean hasNext() {
try {
- if (oprReadState.batchFieldIndex < recordsToRead) {
- // We need to ensure there is a page of data to be read
- if (!parentInst.pageReader.hasPage() || parentInst.pageReader.currentPageCount == oprReadState.numPageFieldsProcessed) {
- long totalValueCount = parentInst.columnChunkMetaData.getValueCount();
-
- if (totalValueCount == (parentInst.totalValuesRead + oprReadState.batchFieldIndex) || !parentInst.pageReader.next()) {
- parentInst.hitRowGroupEnd();
- return false;
+ if (!batchConstraintsReached()) {
+ // If there is overflow data, then proceed; otherwise, make sure there is still Parquet data to be
+ // read.
+ if (!overflowDataAvailable()) {
+ // We need to ensure there is a page of data to be read
+ if (!parentInst.pageReader.hasPage() || parentInst.pageReader.currentPageCount == oprReadState.numPageFieldsProcessed) {
+ long totalValueCount = parentInst.columnChunkMetaData.getValueCount();
+
+ if (totalValueCount == (parentInst.totalValuesRead + oprReadState.batchNumValuesReadFromPages)
+ || !parentInst.pageReader.next()) {
+
+ parentInst.hitRowGroupEnd();
+ return false;
+ }
+
+ // Reset the state object page read metadata
+ oprReadState.numPageFieldsProcessed = 0;
+ oprReadState.pageReadPos = parentInst.pageReader.readyToReadPosInBytes;
+
+ // Update the value readers information
+ setValuesReadersOnNewPage();
+
+ // Update the buffered-page-payload since we've read a new page
+ setBufferedPagePayload();
}
-
- // Reset the state object page read metadata
- oprReadState.numPageFieldsProcessed = 0;
- oprReadState.pageReadPos = parentInst.pageReader.readyToReadPosInBytes;
-
- // Update the value readers information
- setValuesReadersOnNewPage();
-
- // Update the buffered-page-payload since we've read a new page
- setBufferedPagePayload();
}
+ // Alright, we didn't hit a batch constraint and are able to read either from the overflow data or
+ // the Parquet data.
return true;
} else {
return false;
@@ -122,19 +150,22 @@ final class VarLenColumnBulkInput<V extends ValueVector> implements VarLenBulkIn
/** {@inheritDoc} */
@Override
public final VarLenBulkEntry next() {
- final int toReadRemaining = recordsToRead - oprReadState.batchFieldIndex;
- final int pageRemaining = parentInst.pageReader.currentPageCount - oprReadState.numPageFieldsProcessed;
- final int remaining = Math.min(toReadRemaining, pageRemaining);
- final VarLenBulkEntry result = buffPagePayload.getEntry(remaining);
+ final int remaining = getRemainingRecords();
+ final VarLenColumnBulkEntry result = buffPagePayload.getEntry(remaining);
// Update position for next read
- if (result != null) {
- // Page read position is meaningful only when dictionary mode is off
- if (pageInfo.dictionaryValueReader == null
- || !pageInfo.dictionaryValueReader.isDefined()) {
- oprReadState.pageReadPos += (result.getTotalLength() + 4 * result.getNumNonNullValues());
+ if (result != null && result.getNumValues() > 0) {
+ // We need to update page stats only when we are reading directly from Parquet pages; there are
+ // situations where we have to return the overflow data (read in a previous batch)
+ if (result.isReadFromPage()) {
+ // Page read position is meaningful only when dictionary mode is off
+ if (!pageInfo.dictionaryValueReader.isDefined()) {
+ oprReadState.pageReadPos += (result.getTotalLength() + 4 * result.getNumNonNullValues());
+ }
+ oprReadState.numPageFieldsProcessed += result.getNumValues();
+ oprReadState.batchNumValuesReadFromPages += result.getNumValues();
}
- oprReadState.numPageFieldsProcessed += result.getNumValues();
+ // Update the batch field index
oprReadState.batchFieldIndex += result.getNumValues();
}
return result;
@@ -160,11 +191,41 @@ final class VarLenColumnBulkInput<V extends ValueVector> implements VarLenBulkIn
// Page read position is meaningful only when dictionary mode is off
if (pageInfo.dictionaryValueReader == null
- || !pageInfo.dictionaryValueReader.isDefined()) {
+ || !pageInfo.dictionaryValueReader.isDefined()) {
parentInst.pageReader.readyToReadPosInBytes = oprReadState.pageReadPos;
}
parentInst.pageReader.valuesRead = oprReadState.numPageFieldsProcessed;
- parentInst.totalValuesRead += oprReadState.batchFieldIndex;
+ parentInst.totalValuesRead += oprReadState.batchNumValuesReadFromPages;
+
+ if (logger.isDebugEnabled()) {
+ String message = String.format("requested=%d, returned=%d, total-returned=%d",
+ recordsToRead,
+ oprReadState.batchFieldIndex,
+ parentInst.totalValuesRead);
+
+ logger.debug(message);
+ }
+ }
+
+ /**
+ * @return minimum memory size required to process a variable column in a columnar manner
+ */
+ public static int getMinVLColumnMemorySize() {
+ // How did we come up with this number?
+ // Let's first lay down some facts:
+ // a) The allocator's rounding-to-next-power-of-two has to be accounted for
+ // b) VL columns use up to three vectors: "bits", "offsets", and "values"
+ // c) The maximum number of entries per chunk is chunk-size / 4
+ // d) The "data" and "offsets" sizes within a chunk have an inverse relationship (if one grows, then the other shrinks)
+ // e) The "data" and "offsets" within a chunk cannot exceed 1 chunk-size each when loaded into the ValueVector
+ // This information gives the following upper bound:
+ // - max-chunk-vv-footprint < (chunk-size/4 + 1 chunk-size + 1/2 chunk-size) < 2 chunk-sizes
+ // Why?
+ // - The max "bits" footprint is controlled by c)
+ // - "data" and "offsets" can each have a max footprint of 1 chunk-size when they exceed chunk-size/2, since
+ // round-up happens; if one goes beyond chunk-size/2, then the other is less than that (inverse relationship),
+ // which leads to a maximum memory footprint of 1 chunk-size + 1/2 chunk-size
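+ // Worked example (hypothetical chunk size of 4096 bytes): max "bits" < 4096/4 = 1024 bytes, and
+ // "offsets" + "data" < 4096 + 2048 = 6144 bytes after round-up, for a total below
+ // 1024 + 6144 = 7168 bytes, which is within the 2 * 4096 = 8192 bytes returned below.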
+ return VarLenBulkPageReader.BUFF_SZ * 2;
}
final int getReadBatchFields() {
@@ -201,14 +262,14 @@ final class VarLenColumnBulkInput<V extends ValueVector> implements VarLenBulkIn
pageInfo.numPageFieldsRead = oprReadState.numPageFieldsProcessed;
if (buffPagePayload == null) {
- buffPagePayload = new VarLenBulkPageReader(pageInfo, columnPrecInfo, callback);
+ buffPagePayload = new VarLenBulkPageReader(pageInfo, columnPrecInfo, callback, fieldOverflowStateContainer);
} else {
buffPagePayload.set(pageInfo, true);
}
} else {
if (buffPagePayload == null) {
- buffPagePayload = new VarLenBulkPageReader(null, columnPrecInfo, callback);
+ buffPagePayload = new VarLenBulkPageReader(null, columnPrecInfo, callback, fieldOverflowStateContainer);
}
}
}
@@ -311,6 +372,113 @@ final class VarLenColumnBulkInput<V extends ValueVector> implements VarLenBulkIn
}
}
+ private boolean batchConstraintsReached() {
+ // Let's update this column's memory quota
+ columnMemoryQuota = batchSizerMgr.getCurrentFieldBatchMemory(parentInst.valueVec.getField().getName());
+ assert columnMemoryQuota.getMaxMemoryUsage() > 0;
+
+ // Now try to figure out whether the next chunk will take us beyond the memory quota
+ final int maxNumRecordsInChunk = VarLenBulkPageReader.BUFF_SZ / BatchSizingMemoryUtil.INT_VALUE_WIDTH;
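+ // Example (assuming BUFF_SZ = 4096, INT_VALUE_WIDTH = 4, and BYTE_VALUE_WIDTH = 1):
+ // maxNumRecordsInChunk = 1024, so a nullable column is checked against at most 1024 new "bits"
+ // bytes, 4096 new "offsets" bytes, and 4096 new "data" bytes per chunk.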
+
+ if (this.parentInst.valueVec.getField().isNullable()) {
+ return batchConstraintsReached(
+ maxNumRecordsInChunk * BatchSizingMemoryUtil.BYTE_VALUE_WIDTH, // max "bits" space within a chunk
+ maxNumRecordsInChunk * BatchSizingMemoryUtil.INT_VALUE_WIDTH, // max "offsets" space within a chunk
+ VarLenBulkPageReader.BUFF_SZ // max "data" space within a chunk
+ );
+
+ } else {
+ return batchConstraintsReached(
+ 0,
+ maxNumRecordsInChunk * BatchSizingMemoryUtil.INT_VALUE_WIDTH, // max "offsets" space within a chunk
+ VarLenBulkPageReader.BUFF_SZ // max "data" space within a chunk
+ );
+ }
+ }
+
+ private boolean batchConstraintsReached(int newBitsMemory, int newOffsetsMemory, int newDataMemory) {
+ assert oprReadState.batchFieldIndex <= recordsToRead; // cannot read beyond the batch size
+
+ // Did we reach the batch size limit?
+ if (oprReadState.batchFieldIndex == recordsToRead) {
+ return true; // batch size reached
+ }
+
+ // Memory constraint check logic:
+ // - If this is the first chunk to process, then let it proceed, as we need to return at least
+ // one row; this also means the minimum batch memory shouldn't be lower than 2 chunks (please refer
+ // to the getMinVLColumnMemorySize() method for more information).
+ //
+ // - Otherwise, we make sure that the memory growth after processing a chunk cannot go beyond the maximum
+ // batch memory for this column.
+ // - There is also a caveat that needs to be handled during processing:
+ // o The page-bulk-reader will stop loading entries if it encounters a large value (one that doesn't fit
+ // within the chunk)
+ // o There is an exception, though: the entry is still loaded if it is the first one within the batch
+ // (this is to ensure that we always make progress)
+ // o In this situation, a callback to this object is made to assess whether this large entry can be loaded
+ // into the ValueVector.
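+ // Example (hypothetical numbers): with a 1 MB quota for this column and 900 KB already used, a
+ // chunk that would add another 200 KB of "offsets" and "data" fails the canAddNewData() check
+ // below and the batch stops growing for this column.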
+
+ // Is this the first chunk to be processed?
+ if (oprReadState.batchFieldIndex == 0) {
+ return false; // we should process at least one chunk
+ }
+
+ // Is the next processed chunk going to cause memory to overflow beyond the allowed limit?
+ columnMemoryUsage.vector = parentInst.valueVec;
+ columnMemoryUsage.memoryQuota = columnMemoryQuota;
+ columnMemoryUsage.currValueCount = oprReadState.batchFieldIndex;
+
+ // Return true if we cannot add this new payload
+ return !BatchSizingMemoryUtil.canAddNewData(columnMemoryUsage, newBitsMemory, newOffsetsMemory, newDataMemory);
+ }
+
+ private int getRemainingRecords() {
+ // remaining records to return within this batch
+ final int toReadRemaining = recordsToRead - oprReadState.batchFieldIndex;
+ final int remainingOverflowData = getRemainingOverflowData();
+ final int remaining;
+
+ // This method's remainder semantics depend on whether we are dealing with page data or
+ // overflow data, since overflow data now behaves like a source of input
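+ // Example (hypothetical counts): with 4096 records still to return in this batch and 1000
+ // un-replayed overflow values, remaining = min(4096, 1000) = 1000; page data is consulted only
+ // once the overflow data is drained.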
+ if (remainingOverflowData == 0) {
+ final int pageRemaining = parentInst.pageReader.currentPageCount - oprReadState.numPageFieldsProcessed;
+ remaining = Math.min(toReadRemaining, pageRemaining);
+
+ } else {
+ remaining = Math.min(toReadRemaining, remainingOverflowData);
+ }
+
+ return remaining;
+ }
+
+ private boolean overflowDataAvailable() {
+ return getRemainingOverflowData() > 0;
+ }
+
+ private int getRemainingOverflowData() {
+
+ if (fieldOverflowStateContainer != null) {
+ FieldOverflowStateImpl overflowState =
+ (FieldOverflowStateImpl) fieldOverflowStateContainer.overflowState;
+
+ if (overflowState != null) {
+ return overflowState.getRemainingOverflowData();
+ } else {
+ // This can happen if this is the first time we are accessing this container, as
+ // the overflow reader hasn't had the chance to consume any overflow data yet.
+ return fieldOverflowStateContainer.overflowDef.numValues;
+ }
+ }
+ return 0;
+ }
+
+ private void deinitOverflowData() {
+ batchSizerMgr.releaseFieldOverflowContainer(parentInst.valueVec.getField().getName());
+
+ fieldOverflowStateContainer = null;
+ }
+
// --------------------------------------------------------------------------
// Inner Classes
// --------------------------------------------------------------------------
@@ -377,11 +545,14 @@ final class VarLenColumnBulkInput<V extends ValueVector> implements VarLenBulkIn
int numPageFieldsProcessed;
/** field index within current batch */
int batchFieldIndex;
+ /** number of values actually read from Parquet pages (not from overflow data) within this batch */
+ int batchNumValuesReadFromPages;
- OprBulkReadState(long pageReadPos, int numPageFieldsRead, int batchFieldIndex) {
+ OprBulkReadState(long pageReadPos, int numPageFieldsRead) {
this.pageReadPos = pageReadPos;
this.numPageFieldsProcessed = numPageFieldsRead;
- this.batchFieldIndex = batchFieldIndex;
+ this.batchFieldIndex = 0;
+ this.batchNumValuesReadFromPages = 0;
}
}
@@ -404,12 +575,15 @@ final class VarLenColumnBulkInput<V extends ValueVector> implements VarLenBulkIn
}
/** Callback to allow a bulk reader to interact with its parent */
- final static class VLColumnBulkInputCallback {
+ final static class VarLenColumnBulkInputCallback {
+ /** Parent instance */
+ final VarLenColumnBulkInput<? extends ValueVector> parentInst;
/** Page reader object */
PageReader pageReader;
- VLColumnBulkInputCallback(PageReader _pageReader) {
- this.pageReader = _pageReader;
+ VarLenColumnBulkInputCallback(VarLenColumnBulkInput<? extends ValueVector> parentInst) {
+ this.parentInst = parentInst;
+ this.pageReader = this.parentInst.parentInst.pageReader;
}
/**
@@ -428,6 +602,22 @@ final class VarLenColumnBulkInput<V extends ValueVector> implements VarLenBulkIn
ValuesReader getDefinitionLevelsReader() {
return pageReader.definitionLevels;
}
+
+ /**
+ * @param newBitsMemory new "bits" memory size
+ * @param newOffsetsMemory new "offsets" memory size
+ * @param newDataMemory new "data" memory size
+ * @return true if the new payload ("bits", "offsets", "data") will trigger a constraint violation; false
+ * otherwise
+ */
+ boolean batchMemoryConstraintsReached(int newBitsMemory, int newOffsetsMemory, int newDataMemory) {
+ return parentInst.batchConstraintsReached(newBitsMemory, newOffsetsMemory, newDataMemory);
+ }
+
+ /** Informs the parent that the overflow data cannot be used anymore */
+ void deinitOverflowData() {
+ parentInst.deinitOverflowData();
+ }
}
/** A wrapper value reader with the ability to control when to read the next value */
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenEntryDictionaryReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenEntryDictionaryReader.java
index 0cd2416..8ba7ac4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenEntryDictionaryReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenEntryDictionaryReader.java
@@ -21,17 +21,19 @@ import java.nio.ByteBuffer;
import org.apache.drill.exec.store.parquet.columnreaders.VarLenColumnBulkInput.ColumnPrecisionInfo;
import org.apache.drill.exec.store.parquet.columnreaders.VarLenColumnBulkInput.DictionaryReaderWrapper;
import org.apache.drill.exec.store.parquet.columnreaders.VarLenColumnBulkInput.PageDataInfo;
+import org.apache.drill.exec.store.parquet.columnreaders.VarLenColumnBulkInput.VarLenColumnBulkInputCallback;
import org.apache.parquet.io.api.Binary;
/** Handles variable data types using a dictionary */
-final class VarLenEntryDictionaryReader extends VarLenAbstractEntryReader {
+final class VarLenEntryDictionaryReader extends VarLenAbstractPageEntryReader {
VarLenEntryDictionaryReader(ByteBuffer buffer,
PageDataInfo pageInfo,
ColumnPrecisionInfo columnPrecInfo,
- VarLenColumnBulkEntry entry) {
+ VarLenColumnBulkEntry entry,
+ VarLenColumnBulkInputCallback containerCallback) {
- super(buffer, pageInfo, columnPrecInfo, entry);
+ super(buffer, pageInfo, columnPrecInfo, entry, containerCallback);
}
/** {@inheritDoc} */
@@ -92,6 +94,12 @@ final class VarLenEntryDictionaryReader extends VarLenAbstractEntryReader {
final Binary currEntry = valueReader.getEntry();
final int dataLen = currEntry.length();
+ // Is there enough memory to handle this large value?
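+ // (the arguments denote the new payload: 0 new "bits" bytes as this reader handles non-nullable
+ // values, 4 new "offsets" bytes for a single value, and dataLen new "data" bytes)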
+ if (batchMemoryConstraintsReached(0, 4, dataLen)) {
+ entry.set(0, 0, 0, 0); // no data to be consumed
+ return entry;
+ }
+
// Set the value length
valueLengths[0] = dataLen;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenEntryReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenEntryReader.java
index e570b3e..d95050d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenEntryReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenEntryReader.java
@@ -21,16 +21,18 @@ import java.nio.ByteBuffer;
import org.apache.drill.common.exceptions.DrillRuntimeException;
import org.apache.drill.exec.store.parquet.columnreaders.VarLenColumnBulkInput.ColumnPrecisionInfo;
import org.apache.drill.exec.store.parquet.columnreaders.VarLenColumnBulkInput.PageDataInfo;
+import org.apache.drill.exec.store.parquet.columnreaders.VarLenColumnBulkInput.VarLenColumnBulkInputCallback;
/** Handles variable data types. */
-final class VarLenEntryReader extends VarLenAbstractEntryReader {
+final class VarLenEntryReader extends VarLenAbstractPageEntryReader {
VarLenEntryReader(ByteBuffer buffer,
PageDataInfo pageInfo,
ColumnPrecisionInfo columnPrecInfo,
- VarLenColumnBulkEntry entry) {
+ VarLenColumnBulkEntry entry,
+ VarLenColumnBulkInputCallback containerCallback) {
- super(buffer, pageInfo, columnPrecInfo, entry);
+ super(buffer, pageInfo, columnPrecInfo, entry, containerCallback);
}
/** {@inheritDoc} */
@@ -64,23 +66,23 @@ final class VarLenEntryReader extends VarLenAbstractEntryReader {
break;
}
- final int data_len = getInt(srcBuff, srcPos);
+ final int dataLen = getInt(srcBuff, srcPos);
srcPos += 4;
- if (srcLen < (srcPos + data_len)
- || tgtLen < (tgtPos + data_len)) {
+ if (srcLen < (srcPos + dataLen)
+ || tgtLen < (tgtPos + dataLen)) {
break;
}
- valueLengths[numValues++] = data_len;
+ valueLengths[numValues++] = dataLen;
- if (data_len > 0) {
- vlCopy(srcBuff, srcPos, tgtBuff, tgtPos, data_len);
+ if (dataLen > 0) {
+ vlCopy(srcBuff, srcPos, tgtBuff, tgtPos, dataLen);
// Update the counters
- srcPos += data_len;
- tgtPos += data_len;
+ srcPos += dataLen;
+ tgtPos += dataLen;
}
}
@@ -119,6 +121,12 @@ final class VarLenEntryReader extends VarLenAbstractEntryReader {
throw new DrillRuntimeException(message);
}
+ // Is there enough memory to handle this large value?
+ if (batchMemoryConstraintsReached(0, 4, dataLen)) {
+ entry.set(0, 0, 0, 0); // no data to be consumed
+ return entry;
+ }
+
// Register the length
valueLengths[0] = dataLen;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenFixedEntryReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenFixedEntryReader.java
index a69488b..e8dc15f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenFixedEntryReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenFixedEntryReader.java
@@ -20,16 +20,18 @@ package org.apache.drill.exec.store.parquet.columnreaders;
import java.nio.ByteBuffer;
import org.apache.drill.exec.store.parquet.columnreaders.VarLenColumnBulkInput.ColumnPrecisionInfo;
import org.apache.drill.exec.store.parquet.columnreaders.VarLenColumnBulkInput.PageDataInfo;
+import org.apache.drill.exec.store.parquet.columnreaders.VarLenColumnBulkInput.VarLenColumnBulkInputCallback;
/** Handles fixed data types that have been erroneously tagged as Variable Length. */
-final class VarLenFixedEntryReader extends VarLenAbstractEntryReader {
+final class VarLenFixedEntryReader extends VarLenAbstractPageEntryReader {
VarLenFixedEntryReader(ByteBuffer buffer,
PageDataInfo pageInfo,
ColumnPrecisionInfo columnPrecInfo,
- VarLenColumnBulkEntry entry) {
+ VarLenColumnBulkEntry entry,
+ VarLenColumnBulkInputCallback containerCallback) {
- super(buffer, pageInfo, columnPrecInfo, entry);
+ super(buffer, pageInfo, columnPrecInfo, entry, containerCallback);
}
/** {@inheritDoc} */
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenNullableDictionaryReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenNullableDictionaryReader.java
index c95a108..f7b6dce 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenNullableDictionaryReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenNullableDictionaryReader.java
@@ -21,17 +21,19 @@ import java.nio.ByteBuffer;
import org.apache.drill.exec.store.parquet.columnreaders.VarLenColumnBulkInput.ColumnPrecisionInfo;
import org.apache.drill.exec.store.parquet.columnreaders.VarLenColumnBulkInput.DictionaryReaderWrapper;
import org.apache.drill.exec.store.parquet.columnreaders.VarLenColumnBulkInput.PageDataInfo;
+import org.apache.drill.exec.store.parquet.columnreaders.VarLenColumnBulkInput.VarLenColumnBulkInputCallback;
import org.apache.parquet.io.api.Binary;
/** Handles nullable variable data types using a dictionary */
-final class VarLenNullableDictionaryReader extends VarLenAbstractEntryReader {
+final class VarLenNullableDictionaryReader extends VarLenAbstractPageEntryReader {
VarLenNullableDictionaryReader(ByteBuffer buffer,
PageDataInfo pageInfo,
ColumnPrecisionInfo columnPrecInfo,
- VarLenColumnBulkEntry entry) {
+ VarLenColumnBulkEntry entry,
+ VarLenColumnBulkInputCallback containerCallback) {
- super(buffer, pageInfo, columnPrecInfo, entry);
+ super(buffer, pageInfo, columnPrecInfo, entry, containerCallback);
}
/** {@inheritDoc} */
@@ -102,15 +104,21 @@ final class VarLenNullableDictionaryReader extends VarLenAbstractEntryReader {
private final VarLenColumnBulkEntry getEntrySingle(int valsToReadWithinPage) {
final int[] valueLengths = entry.getValuesLength();
+ final DictionaryReaderWrapper valueReader = pageInfo.dictionaryValueReader;
// Initialize the reader if needed
pageInfo.definitionLevels.readFirstIntegerIfNeeded();
if (pageInfo.definitionLevels.readCurrInteger() == 1) {
- final DictionaryReaderWrapper valueReader = pageInfo.dictionaryValueReader;
final Binary currEntry = valueReader.getEntry();
final int dataLen = currEntry.length();
+ // Is there enough memory to handle this large value?
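+ // (the arguments denote the new payload: 1 new "bits" byte as this reader handles nullable
+ // values, 4 new "offsets" bytes for a single value, and dataLen new "data" bytes)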
+ if (batchMemoryConstraintsReached(1, 4, dataLen)) {
+ entry.set(0, 0, 0, 0); // no data to be consumed
+ return entry;
+ }
+
// Set the value length
valueLengths[0] = dataLen;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenNullableEntryReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenNullableEntryReader.java
index 7a172bb..7ffb27a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenNullableEntryReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenNullableEntryReader.java
@@ -21,16 +21,18 @@ import java.nio.ByteBuffer;
import org.apache.drill.common.exceptions.DrillRuntimeException;
import org.apache.drill.exec.store.parquet.columnreaders.VarLenColumnBulkInput.ColumnPrecisionInfo;
import org.apache.drill.exec.store.parquet.columnreaders.VarLenColumnBulkInput.PageDataInfo;
+import org.apache.drill.exec.store.parquet.columnreaders.VarLenColumnBulkInput.VarLenColumnBulkInputCallback;
/** Handles variable data types. */
-final class VarLenNullableEntryReader extends VarLenAbstractEntryReader {
+final class VarLenNullableEntryReader extends VarLenAbstractPageEntryReader {
VarLenNullableEntryReader(ByteBuffer buffer,
PageDataInfo pageInfo,
ColumnPrecisionInfo columnPrecInfo,
- VarLenColumnBulkEntry entry) {
+ VarLenColumnBulkEntry entry,
+ VarLenColumnBulkInputCallback containerCallback) {
- super(buffer, pageInfo, columnPrecInfo, entry);
+ super(buffer, pageInfo, columnPrecInfo, entry, containerCallback);
}
/** {@inheritDoc} */
@@ -142,6 +144,12 @@ final class VarLenNullableEntryReader extends VarLenAbstractEntryReader {
throw new DrillRuntimeException(message);
}
+ // Is there enough memory to handle this large value?
+ if (batchMemoryConstraintsReached(1, 4, dataLen)) {
+ entry.set(0, 0, 0, 0); // no data to be consumed
+ return entry;
+ }
+
// Register the length
valueLengths[0] = dataLen;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenNullableFixedEntryReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenNullableFixedEntryReader.java
index c776934..98089fd 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenNullableFixedEntryReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenNullableFixedEntryReader.java
@@ -20,17 +20,19 @@ package org.apache.drill.exec.store.parquet.columnreaders;
import java.nio.ByteBuffer;
import org.apache.drill.exec.store.parquet.columnreaders.VarLenColumnBulkInput.ColumnPrecisionInfo;
import org.apache.drill.exec.store.parquet.columnreaders.VarLenColumnBulkInput.PageDataInfo;
+import org.apache.drill.exec.store.parquet.columnreaders.VarLenColumnBulkInput.VarLenColumnBulkInputCallback;
import org.apache.parquet.column.values.ValuesReader;
/** Handles nullable fixed data types that have been erroneously tagged as Variable Length. */
-final class VarLenNullableFixedEntryReader extends VarLenAbstractEntryReader {
+final class VarLenNullableFixedEntryReader extends VarLenAbstractPageEntryReader {
VarLenNullableFixedEntryReader(ByteBuffer buffer,
PageDataInfo pageInfo,
ColumnPrecisionInfo columnPrecInfo,
- VarLenColumnBulkEntry entry) {
+ VarLenColumnBulkEntry entry,
+ VarLenColumnBulkInputCallback containerCallback) {
- super(buffer, pageInfo, columnPrecInfo, entry);
+ super(buffer, pageInfo, columnPrecInfo, entry, containerCallback);
}
/** {@inheritDoc} */
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenOverflowReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenOverflowReader.java
new file mode 100644
index 0000000..cacd5c8
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenOverflowReader.java
@@ -0,0 +1,372 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.parquet.columnreaders;
+
+import java.nio.ByteBuffer;
+
+import org.apache.drill.exec.store.parquet.columnreaders.VarLenColumnBulkInput.VarLenColumnBulkInputCallback;
+import org.apache.drill.exec.store.parquet.columnreaders.batchsizing.BatchSizingMemoryUtil;
+import org.apache.drill.exec.store.parquet.columnreaders.batchsizing.RecordBatchOverflow.FieldOverflowDefinition;
+import org.apache.drill.exec.store.parquet.columnreaders.batchsizing.RecordBatchSizerManager.FieldOverflowState;
+import org.apache.drill.exec.store.parquet.columnreaders.batchsizing.RecordBatchSizerManager.FieldOverflowStateContainer;
+
+/**
+ * This class is responsible for processing serialized overflow data (generated in a previous batch); this way,
+ * overflow data becomes an input source and is thus a) efficiently re-loaded into the current
+ * batch's ValueVector and b) subjected to the same batching constraint rules.
+ */
+public final class VarLenOverflowReader extends VarLenAbstractEntryReader {
+ private final FieldOverflowStateContainer fieldOverflowContainer;
+ private final boolean isNullable;
+ private final FieldOverflowStateImpl overflowState;
+
+ /**
+ * CTOR.
+ * @param buffer byte buffer for data buffering (within CPU cache)
+ * @param entry reusable bulk entry object
+ * @param containerCallback callback to allow this reader to interact with its container
+ * @param fieldOverflowContainer this column's overflow data container
+ */
+ VarLenOverflowReader(ByteBuffer buffer,
+ VarLenColumnBulkEntry entry,
+ VarLenColumnBulkInputCallback containerCallback,
+ FieldOverflowStateContainer fieldOverflowContainer) {
+
+ super(buffer, entry, containerCallback);
+
+ this.fieldOverflowContainer = fieldOverflowContainer;
+ this.isNullable = fieldOverflowContainer.overflowDef.field.isNullable();
+
+ // Initialize the overflow state object
+ initOverflowStateIfNeeded();
+
+ // By now the overflow state object should be initialized
+ overflowState = (FieldOverflowStateImpl) fieldOverflowContainer.overflowState;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ VarLenColumnBulkEntry getEntry(int valuesToRead) {
+
+ if (getRemainingOverflowData() == 0) {
+ return null; // overflow data fully committed
+ }
+
+ final int[] valueLengths = entry.getValuesLength();
+ final FieldOverflowDefinition overflowDef = fieldOverflowContainer.overflowDef;
+ final OverflowDataCache overflowDataCache = overflowState.overflowDataCache;
+ final int maxDataSize = VarLenBulkPageReader.BUFF_SZ;
+
+ // Flush the cache across batches
+ if (overflowState.currValueIdx == overflowState.numCommittedValues) {
+ overflowDataCache.flush();
+ }
+
+ // Load some overflow data for processing
+ final int maxValues = Math.min(entry.getMaxEntries(), valuesToRead);
+ final int numAvailableValues = overflowDataCache.load(overflowState.currValueIdx, maxValues);
+ final int firstValueDataOffset = getDataBufferStartOffset() + adjustDataOffset(overflowState.currValueIdx);
+ int totalDataLen = 0;
+ int currValueIdx = overflowState.currValueIdx;
+ int idx = 0;
+ int numNulls = 0;
+
+ for ( ; idx < numAvailableValues; idx++, currValueIdx++) {
+ // Is this value defined?
+ if (!isNullable || overflowDataCache.getNullable(currValueIdx) == 1) {
+ final int dataLen = overflowDataCache.getDataLength(currValueIdx);
+
+ if ((totalDataLen + dataLen) > maxDataSize) {
+ break;
+ }
+
+ totalDataLen += dataLen;
+ valueLengths[idx] = dataLen;
+
+ } else {
+ valueLengths[idx] = -1;
+ ++numNulls;
+ }
+ }
+
+ // We encountered a large value or no overflow data; need special handling
+ if (idx == 0) {
+ final int dataLen = overflowDataCache.getDataLength(currValueIdx);
+ return handleLargeEntry(maxDataSize, firstValueDataOffset, dataLen);
+ }
+
+ // Update the next overflow value index to be processed
+ overflowState.currValueIdx = currValueIdx;
+
+ // Now set the bulk entry
+ entry.set(firstValueDataOffset, totalDataLen, idx, idx - numNulls, overflowDef.buffer);
+
+ return entry;
+ }
+
+ /**
+ * @return remaining overflow data (total-overflow-data - (committed + returned-within-current-batch))
+ */
+ int getRemainingOverflowData() {
+ return overflowState.getRemainingOverflowData();
+ }
+
+ private VarLenColumnBulkEntry handleLargeEntry(int maxDataSize, int firstValueDataOffset, int totalDataLen) {
+
+ final FieldOverflowDefinition overflowDef = fieldOverflowContainer.overflowDef;
+ final FieldOverflowStateImpl overflowState = (FieldOverflowStateImpl) fieldOverflowContainer.overflowState;
+ final int[] valueLengths = entry.getValuesLength();
+
+ // Is there enough memory to handle this large value?
+ if (batchMemoryConstraintsReached(isNullable ? 1 : 0, 4, totalDataLen)) {
+ entry.set(0, 0, 0, 0); // no data to be consumed
+ return entry;
+ }
+
+ // Register the length
+ valueLengths[0] = totalDataLen;
+
+ // We already have all the information we need
+ entry.set(firstValueDataOffset, totalDataLen, 1, 1, overflowDef.buffer);
+
+ // Update the current value index
+ overflowState.currValueIdx++;
+ return entry;
+ }
+
+ void initOverflowStateIfNeeded() {
+ // An overflow happened in the previous batch; this is the first time we are trying to
+ // consume this overflow data (in some cases, several batches are needed if somehow the
+ // number-of-records-per-batch becomes small).
+ if (fieldOverflowContainer.overflowState == null) {
+ fieldOverflowContainer.overflowState = new FieldOverflowStateImpl(buffer, fieldOverflowContainer.overflowDef);
+ }
+ }
+
+ private int adjustDataOffset(int valueIdx) {
+ // The overflow definition stores offsets without adjustment (the offsets still refer to the original
+ // buffer). We need to perform a minor transformation to compute the correct offset within the new buffer:
+ // adjusted-offset(value-i) = offset(value-i) - offset(value-0)
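+ // Example (hypothetical offsets): if the original offsets are [100, 110, 125], then the value at
+ // index 2 starts at adjusted offset 125 - 100 = 25 within the overflow buffer's data section.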
+ int firstOffset, targetOffset;
+
+ if (!isNullable) {
+ firstOffset = fieldOverflowContainer.overflowDef.buffer.getInt(0);
+ targetOffset = fieldOverflowContainer.overflowDef.buffer.getInt(valueIdx * 4);
+
+ } else {
+ final int numOverflowValues = fieldOverflowContainer.overflowDef.numValues;
+ firstOffset = fieldOverflowContainer.overflowDef.buffer.getInt(numOverflowValues);
+ targetOffset = fieldOverflowContainer.overflowDef.buffer.getInt(numOverflowValues + valueIdx * 4);
+ }
+
+ return targetOffset - firstOffset;
+ }
+
+ private int getDataBufferStartOffset() {
+ if (!isNullable) {
+ // <num-values+1 offsets><data>
+ return (fieldOverflowContainer.overflowDef.numValues + 1) * 4;
+
+ } else {
+ // <num-values nullable bytes><num-values+1 offsets><data>
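+ // e.g., with numValues = 3 (hypothetical): 3 nullable bytes + (3 + 1) * 4 offset bytes = 19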
+ return fieldOverflowContainer.overflowDef.numValues + (fieldOverflowContainer.overflowDef.numValues + 1) * 4;
+ }
+ }
+
+// ----------------------------------------------------------------------------
+// Inner Data Structure
+// ----------------------------------------------------------------------------
+
+ /** Allows overflow reader to maintain overflow data state */
+ final static class FieldOverflowStateImpl implements FieldOverflowState {
+ /**
+ * The number of overflow values consumed by previous batches; this means that if a new overflow
+ * happens, then we should return uncommitted values (un-consumed)
+ */
+ private int numCommittedValues;
+ /** Next value index to be processed */
+ private int currValueIdx;
+ /** A heap cache to accelerate loading of overflow data into bulk entries */
+ private final OverflowDataCache overflowDataCache;
+
+ private FieldOverflowStateImpl(ByteBuffer buffer, FieldOverflowDefinition overflowDef) {
+ overflowDataCache = new OverflowDataCache(buffer, overflowDef);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void onNewBatchValuesConsumed(int numValues) {
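+ // Example (hypothetical): with 50 total overflow values, a batch consuming 20 of them moves
+ // numCommittedValues to 20, and replay resumes at currValueIdx = 20 in the next batch.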
+ if (numCommittedValues < overflowDataCache.overflowDef.numValues) {
+ numCommittedValues += numValues;
+ currValueIdx = numCommittedValues;
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public boolean isOverflowDataFullyConsumed() {
+ return numCommittedValues == overflowDataCache.overflowDef.numValues;
+ }
+
+ /**
+ * @return remaining overflow data (total-overflow-data - (committed + returned-within-current-batch))
+ */
+ int getRemainingOverflowData() {
+ assert currValueIdx <= overflowDataCache.overflowDef.numValues;
+
+ return overflowDataCache.overflowDef.numValues - currValueIdx;
+ }
+
+ }
+
+ /** Enables us to reuse cached overflow data across calls */
+ final static class OverflowDataCache {
+ /** Cache format: [<nullvalue>*][<offset>*]; the trailing "-1" accounts for the extra offset to load,
+ * since computing each value's length requires two offsets */
+ private static final int MAX_NUM_VALUES = (VarLenBulkPageReader.BUFF_SZ / (BatchSizingMemoryUtil.INT_VALUE_WIDTH + 1)) - 1;
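+ // e.g., assuming BUFF_SZ = 4096 and INT_VALUE_WIDTH = 4: MAX_NUM_VALUES = 4096 / 5 - 1 = 818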
+
+ /** Byte buffer for CPU caching */
+ private final byte[] bufferArray;
+ /** Overflow definition */
+ private final FieldOverflowDefinition overflowDef;
+ /** Whether this column is optional */
+ final boolean isNullable;
+ /** start index of the first cached overflow data */
+ private int firstCachedValueIdx;
+ /** number of cached values */
+ private int numCachedValues;
+
+ private OverflowDataCache(ByteBuffer buffer, FieldOverflowDefinition overflowDef) {
+ this.bufferArray = buffer.array();
+ this.overflowDef = overflowDef;
+ this.isNullable = this.overflowDef.field.isNullable();
+ this.firstCachedValueIdx = -1;
+ this.numCachedValues = -1;
+ }
+
+ private void flush() {
+ this.firstCachedValueIdx = -1;
+ this.numCachedValues = -1;
+ }
+
+ private int load(int currentValueIdx, int maxValues) {
+ if (currentValueIdx >= overflowDef.numValues) {
+ throw new IllegalArgumentException(String.format(
+ "Value index [%d] is beyond the number of overflow values [%d]", currentValueIdx, overflowDef.numValues));
+ }
+
+ if (numCachedValues > 0
+ && currentValueIdx >= lowerBound()
+ && currentValueIdx <= upperBound()) {
+
+ return upperBound() - currentValueIdx + 1;
+ }
+
+ // Either cache is empty or the requested values are not in the cache
+ loadInternal(currentValueIdx, maxValues);
+
+ return numCachedValues;
+ }
+
+ /**
+ * @return the lowest overflow value index in the cache
+ */
+ private int lowerBound() {
+ return firstCachedValueIdx;
+ }
+
+ /**
+ * @return the highest overflow value index in the cache
+ */
+ private int upperBound() {
+ return firstCachedValueIdx + numCachedValues - 1;
+ }
+
+ private byte getNullable(int valueIdx) {
+ assert isNullable;
+ assert valueIdx >= lowerBound();
+ assert valueIdx <= upperBound();
+
+ // We need to map the overflow value index to the buffer array representation
+ final int cacheIdx = (valueIdx - lowerBound()) * BatchSizingMemoryUtil.BYTE_VALUE_WIDTH;
+ return bufferArray[cacheIdx];
+ }
+
+ private int getDataLength(int valueIdx) {
+ assert valueIdx >= lowerBound();
+ assert valueIdx <= upperBound();
+
+ // We need to map the overflow value index to the buffer array representation
+ int cacheIdx1, cacheIdx2;
+
+ if (!isNullable) {
+ cacheIdx1 = (valueIdx - lowerBound()) * BatchSizingMemoryUtil.INT_VALUE_WIDTH;
+ } else {
+ cacheIdx1 = numCachedValues +
+ (valueIdx - lowerBound()) * BatchSizingMemoryUtil.INT_VALUE_WIDTH;
+ }
+ cacheIdx2 = cacheIdx1 + BatchSizingMemoryUtil.INT_VALUE_WIDTH;
+
+ return VarLenOverflowReader.getInt(bufferArray, cacheIdx2) - VarLenOverflowReader.getInt(bufferArray, cacheIdx1);
+ }
+
+ private void loadInternal(int targetIdx, int maxValues) {
+ // We need to load a new batch of overflow data (bits and offsets)
+ firstCachedValueIdx = targetIdx;
+
+ final int remaining = remaining();
+ assert remaining > 0;
+
+ final int maxValuesToLoad = Math.min(MAX_NUM_VALUES, maxValues);
+ numCachedValues = Math.min(remaining, maxValuesToLoad);
+
+ // Let us load the nullable & offsets data (the actual data doesn't have to be loaded)
+ loadNullable();
+ loadOffsets();
+ }
+
+ void loadNullable() {
+ if (!isNullable) {
+ return; // NOOP
+ }
+
+ overflowDef.buffer.getBytes(firstCachedValueIdx, bufferArray, 0, numCachedValues);
+ }
+
+ void loadOffsets() {
+ int sourceIdx, targetIdx;
+
+ if (!isNullable) {
+ sourceIdx = firstCachedValueIdx * BatchSizingMemoryUtil.INT_VALUE_WIDTH;
+ targetIdx = 0;
+
+ } else {
+ sourceIdx = overflowDef.numValues + (firstCachedValueIdx * BatchSizingMemoryUtil.INT_VALUE_WIDTH);
+ targetIdx = numCachedValues;
+ }
+
+ // We always load one extra offset since, to compute the length of value-i, we need both offset-i and offset-i+1
+ overflowDef.buffer.getBytes(sourceIdx, bufferArray, targetIdx, (numCachedValues + 1) * BatchSizingMemoryUtil.INT_VALUE_WIDTH);
+ }
+
+ private int remaining() {
+ return overflowDef.numValues - firstCachedValueIdx;
+ }
+ }
+
+}
\ No newline at end of file
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLengthColumn.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLengthColumn.java
index 16ccb85..c5e5bfd 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLengthColumn.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLengthColumn.java
@@ -32,10 +32,10 @@ public abstract class VarLengthColumn<V extends ValueVector> extends ColumnReade
Binary currDictVal;
- VarLengthColumn(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,
+ VarLengthColumn(ParquetRecordReader parentReader, ColumnDescriptor descriptor,
ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, V v,
SchemaElement schemaElement) throws ExecutionSetupException {
- super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
+ super(parentReader, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
if (columnChunkMetaData.getEncodings().contains(Encoding.PLAIN_DICTIONARY)) {
usingDictionary = true;
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLengthColumnReaders.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLengthColumnReaders.java
index 17453fd..f4b9fe6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLengthColumnReaders.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLengthColumnReaders.java
@@ -41,10 +41,10 @@ public final class VarLengthColumnReaders {
protected VarDecimalVector varDecimalVector;
protected VarDecimalVector.Mutator mutator;
- VarDecimalColumn(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,
+ VarDecimalColumn(ParquetRecordReader parentReader, ColumnDescriptor descriptor,
ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, VarDecimalVector v,
SchemaElement schemaElement) throws ExecutionSetupException {
- super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
+ super(parentReader, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
this.varDecimalVector = v;
this.mutator = v.getMutator();
}
@@ -87,10 +87,10 @@ public final class VarLengthColumnReaders {
protected NullableVarDecimalVector nullableVarDecimalVector;
protected NullableVarDecimalVector.Mutator mutator;
- NullableVarDecimalColumn(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,
+ NullableVarDecimalColumn(ParquetRecordReader parentReader, ColumnDescriptor descriptor,
ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, NullableVarDecimalVector v,
SchemaElement schemaElement) throws ExecutionSetupException {
- super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
+ super(parentReader, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
nullableVarDecimalVector = v;
this.mutator = v.getMutator();
}
@@ -133,10 +133,10 @@ public final class VarLengthColumnReaders {
private final VarCharVector.Mutator mutator;
private final VarCharVector varCharVector;
- VarCharColumn(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,
+ VarCharColumn(ParquetRecordReader parentReader, ColumnDescriptor descriptor,
ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, VarCharVector v,
SchemaElement schemaElement) throws ExecutionSetupException {
- super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
+ super(parentReader, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
this.varCharVector = v;
this.mutator = v.getMutator();
}
@@ -181,10 +181,10 @@ public final class VarLengthColumnReaders {
protected final NullableVarCharVector.Mutator mutator;
private final NullableVarCharVector vector;
- NullableVarCharColumn(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,
+ NullableVarCharColumn(ParquetRecordReader parentReader, ColumnDescriptor descriptor,
ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, NullableVarCharVector v,
SchemaElement schemaElement) throws ExecutionSetupException {
- super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
+ super(parentReader, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
this.vector = v;
this.mutator = vector.getMutator();
}
@@ -228,10 +228,10 @@ public final class VarLengthColumnReaders {
private final VarBinaryVector varBinaryVector;
private final VarBinaryVector.Mutator mutator;
- VarBinaryColumn(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,
+ VarBinaryColumn(ParquetRecordReader parentReader, ColumnDescriptor descriptor,
ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, VarBinaryVector v,
SchemaElement schemaElement) throws ExecutionSetupException {
- super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
+ super(parentReader, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
this.varBinaryVector = v;
this.mutator = v.getMutator();
@@ -277,10 +277,10 @@ public final class VarLengthColumnReaders {
private final NullableVarBinaryVector nullableVarBinaryVector;
private final NullableVarBinaryVector.Mutator mutator;
- NullableVarBinaryColumn(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,
+ NullableVarBinaryColumn(ParquetRecordReader parentReader, ColumnDescriptor descriptor,
ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, NullableVarBinaryVector v,
SchemaElement schemaElement) throws ExecutionSetupException {
- super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
+ super(parentReader, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
this.nullableVarBinaryVector = v;
this.mutator = v.getMutator();
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLengthValuesColumn.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLengthValuesColumn.java
index 4dc59af..5212ed5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLengthValuesColumn.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLengthValuesColumn.java
@@ -44,11 +44,11 @@ public abstract class VarLengthValuesColumn<V extends ValueVector> extends VarLe
/** Bulk read operation state that needs to be maintained across batch calls */
protected final BulkReaderState bulkReaderState = new BulkReaderState();
- VarLengthValuesColumn(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,
+ VarLengthValuesColumn(ParquetRecordReader parentReader, ColumnDescriptor descriptor,
ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, V v,
SchemaElement schemaElement) throws ExecutionSetupException {
- super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
+ super(parentReader, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
variableWidthVector = (VariableWidthVector) valueVec;
if (columnChunkMetaData.getEncodings().contains(Encoding.PLAIN_DICTIONARY)) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/batchsizing/BatchOverflowOptimizer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/batchsizing/BatchOverflowOptimizer.java
new file mode 100644
index 0000000..ca8fc05
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/batchsizing/BatchOverflowOptimizer.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.parquet.columnreaders.batchsizing;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.drill.common.map.CaseInsensitiveMap;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.store.parquet.columnreaders.ParquetColumnMetadata;
+import org.apache.drill.exec.store.parquet.columnreaders.batchsizing.BatchSizingMemoryUtil.VectorMemoryUsageInfo;
+import org.apache.drill.exec.store.parquet.columnreaders.batchsizing.RecordBatchSizerManager.ColumnMemoryInfo;
+import org.apache.drill.exec.store.parquet.columnreaders.batchsizing.RecordBatchSizerManager.VarLenColumnBatchStats;
+
+/**
+ * A class which uses aggregate statistics to minimize overflow (in terms
+ * of size and occurrence).
+ */
+final class BatchOverflowOptimizer {
+ /** A threshold to trigger column memory redistribution based on latest statistics */
+ private static final int MAX_NUM_OVERFLOWS = 3;
+
+ /** Field to column memory information map */
+ private final Map<String, ColumnMemoryInfo> columnMemoryInfoMap;
+ /** Stats for each VL column */
+ private final Map<String, ColumnPrecisionStats> columnStatsMap = CaseInsensitiveMap.newHashMap();
+ /** Number of batches */
+ private int numBatches;
+ /** Number of overflows */
+ private int numOverflows;
+ /** An indicator to track whether a column precision has changed */
+ boolean columnPrecisionChanged;
+
+ BatchOverflowOptimizer(Map<String, ColumnMemoryInfo> columnMemoryInfoMap) {
+ this.columnMemoryInfoMap = columnMemoryInfoMap;
+ }
+
+ void setup() {
+ for (ColumnMemoryInfo columnInfo : columnMemoryInfoMap.values()) {
+ final ParquetColumnMetadata columnMeta = columnInfo.columnMeta;
+
+ if (!columnMeta.isFixedLength()) {
+ columnStatsMap.put(
+ columnMeta.getField().getName(),
+ new ColumnPrecisionStats(columnMeta.getField())
+ );
+ }
+ }
+ }
+
+ boolean onEndOfBatch(int batchNumRecords, List<VarLenColumnBatchStats> batchStats) {
+
+ if (batchNumRecords == 0) {
+ return false; // NOOP
+ }
+
+ ++numBatches; // increment the number of batches
+
+ // Indicator that an overflow happened
+ boolean overflow = false;
+
+ // Reusable container to compute a VL column's amount of data within
+ // this batch.
+ final VectorMemoryUsageInfo vectorMemoryUsage = new VectorMemoryUsageInfo();
+
+ for (VarLenColumnBatchStats stat : batchStats) {
+ final String columnName = stat.vector.getField().getName();
+ ColumnPrecisionStats columnPrecisionStats = columnStatsMap.get(columnName);
+ assert columnPrecisionStats != null;
+
+ if (stat.numValuesRead > batchNumRecords) {
+ overflow = true;
+ }
+
+ // Compute this column's data usage in the last batch; note that we
+ // do not account for null values since we are interested in the
+ // actual data stored within the batch.
+ BatchSizingMemoryUtil.getMemoryUsage(stat.vector, stat.numValuesRead, vectorMemoryUsage);
+ final int batchColumnPrecision = Math.max(1, vectorMemoryUsage.dataBytesUsed / stat.numValuesRead);
+
+ double currAvgPrecision = columnPrecisionStats.avgPrecision;
+ double newAvgPrecision = ((numBatches - 1) * currAvgPrecision + batchColumnPrecision) / numBatches;
+
+ if (newAvgPrecision > currAvgPrecision) {
+ columnPrecisionStats.avgPrecision = (int) Math.ceil(newAvgPrecision);
+ columnPrecisionChanged = true;
+ }
+ }
+
+ if (overflow) {
+ ++numOverflows;
+ }
+
+ if (numBatches == 1 // the first batch used only default estimates; update them with observed stats
+ || (columnPrecisionChanged && numOverflows >= MAX_NUM_OVERFLOWS) // precisions grew and overflows keep recurring
+ ) {
+
+ for (ColumnPrecisionStats columnPrecisionStats : columnStatsMap.values()) {
+ ColumnMemoryInfo columnInfo = columnMemoryInfoMap.get(columnPrecisionStats.field.getName());
+ assert columnInfo != null;
+
+ // update the precision
+ columnInfo.columnPrecision = columnPrecisionStats.avgPrecision;
+ columnInfo.columnMemoryQuota.reset();
+ }
+
+ // Reset some tracking counters
+ numOverflows = 0;
+ columnPrecisionChanged = false;
+
+ return true;
+ }
+
+ return false; // NOOP
+ }
+
+// ----------------------------------------------------------------------------
+// Inner Classes
+// ----------------------------------------------------------------------------
+
+ /** Container class which tracks the average precision of a variable length column */
+ private static final class ColumnPrecisionStats {
+ /** Materialized field */
+ private final MaterializedField field;
+ /** Average column precision */
+ private int avgPrecision;
+
+ private ColumnPrecisionStats(MaterializedField field) {
+ this.field = field;
+ }
+ }
+}
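The precision update in onEndOfBatch() above is a cumulative moving average that is only allowed to grow (the ceiling keeps allocations from shrinking below observed data). A minimal sketch of the same arithmetic (illustrative only, not part of the patch):

    // Running average of per-batch column precision; e.g. batches with
    // precisions 8, 12, 40 yield averages 8, 10, 20.
    double avgPrecision = 0;
    int numBatches = 0;

    int updateAvgPrecision(int batchPrecision) {
      ++numBatches;
      avgPrecision = ((numBatches - 1) * avgPrecision + batchPrecision) / numBatches;
      return (int) Math.ceil(avgPrecision);
    }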
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/batchsizing/BatchSizingMemoryUtil.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/batchsizing/BatchSizingMemoryUtil.java
new file mode 100644
index 0000000..302d0eb
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/batchsizing/BatchSizingMemoryUtil.java
@@ -0,0 +1,329 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.parquet.columnreaders.batchsizing;
+
+import org.apache.drill.common.types.TypeProtos.MajorType;
+import org.apache.drill.exec.expr.TypeHelper;
+import org.apache.drill.exec.memory.BaseAllocator;
+import org.apache.drill.exec.store.parquet.columnreaders.ParquetColumnMetadata;
+import org.apache.drill.exec.store.parquet.columnreaders.batchsizing.RecordBatchSizerManager.ColumnMemoryQuota;
+import org.apache.drill.exec.vector.NullableVarBinaryVector;
+import org.apache.drill.exec.vector.NullableVarCharVector;
+import org.apache.drill.exec.vector.NullableVarDecimalVector;
+import org.apache.drill.exec.vector.UInt1Vector;
+import org.apache.drill.exec.vector.UInt4Vector;
+import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.VarBinaryVector;
+import org.apache.drill.exec.vector.VarCharVector;
+import org.apache.drill.exec.vector.VarDecimalVector;
+import org.apache.drill.exec.vector.VariableWidthVector;
+
+/** Helper class to assist the flat Parquet reader in building batches which adhere to memory sizing constraints */
+public final class BatchSizingMemoryUtil {
+
+ /** BYTE in-memory width */
+ public static final int BYTE_VALUE_WIDTH = UInt1Vector.VALUE_WIDTH;
+ /** INT in-memory width */
+ public static final int INT_VALUE_WIDTH = UInt4Vector.VALUE_WIDTH;
+ /** Default variable length column average precision;
+ * computed in such a way that 64k values will fit within one MB to minimize internal fragmentation
+ */
+ public static final int DEFAULT_VL_COLUMN_AVG_PRECISION = 16;
+
+ /**
+ * Checks whether a new payload can be added to a column's value vector without exceeding its memory
+ * quota; it also loads detailed information about this column's current memory usage (with regard
+ * to the value vectors).
+ *
+ * @param columnMemoryUsage container which contains column's memory usage information (usage information will
+ * be automatically updated by this method)
+ * @param newBitsMemory New nullable data which might be inserted when processing a new input chunk
+ * @param newOffsetsMemory New offsets data which might be inserted when processing a new input chunk
+ * @param newDataMemory New data which might be inserted when processing a new input chunk
+ *
+ * @return true if adding the new data will not cause this column's value vector to exceed the allowed
+ * limit; false otherwise
+ */
+ public static boolean canAddNewData(ColumnMemoryUsageInfo columnMemoryUsage,
+ int newBitsMemory,
+ int newOffsetsMemory,
+ int newDataMemory) {
+
+ // First we need to update the vector memory usage
+ final VectorMemoryUsageInfo vectorMemoryUsage = columnMemoryUsage.vectorMemoryUsage;
+ getMemoryUsage(columnMemoryUsage.vector, columnMemoryUsage.currValueCount, vectorMemoryUsage);
+
+ // We need to compute the new ValueVector memory usage if we attempt to add the new payload
+ int totalBitsMemory = computeNewVectorCapacity(vectorMemoryUsage.bitsBytesUsed,
+ newBitsMemory,
+ vectorMemoryUsage.bitsBytesCapacity);
+
+ int totalOffsetsMemory = computeNewVectorCapacity(vectorMemoryUsage.offsetsBytesUsed,
+ newOffsetsMemory,
+ vectorMemoryUsage.offsetsByteCapacity);
+
+ int totalDataMemory = computeNewVectorCapacity(vectorMemoryUsage.dataBytesUsed,
+ newDataMemory,
+ vectorMemoryUsage.dataByteCapacity);
+
+ // Alright now we can figure out whether the new payload will take us over the maximum memory threshold
+ int totalMemory = totalBitsMemory + totalOffsetsMemory + totalDataMemory;
+ assert totalMemory >= 0;
+
+ return totalMemory <= columnMemoryUsage.memoryQuota.getMaxMemoryUsage();
+ }
+
+ /**
+ * Load memory usage information for a variable length value vector
+ *
+ * @param sourceVector source value vector
+ * @param currValueCount current value count
+ * @param vectorMemoryUsage result object which contains source vector memory usage information
+ */
+ public static void getMemoryUsage(ValueVector sourceVector,
+ int currValueCount,
+ VectorMemoryUsageInfo vectorMemoryUsage) {
+
+ assert sourceVector instanceof VariableWidthVector;
+
+ vectorMemoryUsage.reset(); // reset result container
+
+ final MajorType type = sourceVector.getField().getType();
+
+ switch (type.getMinorType()) {
+ case VARCHAR: {
+ switch (type.getMode()) {
+ case REQUIRED: {
+ VarCharVector vector = (VarCharVector) sourceVector;
+ vectorMemoryUsage.offsetsByteCapacity = vector.getOffsetVector().getValueCapacity() * INT_VALUE_WIDTH;
+ vectorMemoryUsage.dataByteCapacity = vector.getByteCapacity();
+ vectorMemoryUsage.offsetsBytesUsed = vector.getOffsetVector().getPayloadByteCount(currValueCount);
+ vectorMemoryUsage.dataBytesUsed = vector.getPayloadByteCount(currValueCount) - vectorMemoryUsage.offsetsBytesUsed;
+ break;
+ }
+ case OPTIONAL: {
+ NullableVarCharVector vector = (NullableVarCharVector) sourceVector;
+ VarCharVector values = vector.getValuesVector();
+ vectorMemoryUsage.bitsBytesCapacity = vector.getBitsValueCapacity();
+ vectorMemoryUsage.offsetsByteCapacity = values.getOffsetVector().getValueCapacity() * INT_VALUE_WIDTH;
+ vectorMemoryUsage.dataByteCapacity = values.getByteCapacity();
+ vectorMemoryUsage.bitsBytesUsed = currValueCount * BYTE_VALUE_WIDTH;
+ vectorMemoryUsage.offsetsBytesUsed = values.getOffsetVector().getPayloadByteCount(currValueCount);
+ vectorMemoryUsage.dataBytesUsed = values.getPayloadByteCount(currValueCount) - vectorMemoryUsage.offsetsBytesUsed;
+ break;
+ }
+
+ default : throw new IllegalArgumentException("Mode [" + type.getMode().name() + "] not supported..");
+ }
+ break;
+ }
+
+ case VARBINARY: {
+ switch (type.getMode()) {
+ case REQUIRED: {
+ VarBinaryVector vector = (VarBinaryVector) sourceVector;
+ vectorMemoryUsage.offsetsByteCapacity = vector.getOffsetVector().getValueCapacity() * INT_VALUE_WIDTH;
+ vectorMemoryUsage.dataByteCapacity = vector.getByteCapacity();
+ vectorMemoryUsage.offsetsBytesUsed = vector.getOffsetVector().getPayloadByteCount(currValueCount);
+ vectorMemoryUsage.dataBytesUsed = vector.getPayloadByteCount(currValueCount) - vectorMemoryUsage.offsetsBytesUsed;
+ break;
+ }
+ case OPTIONAL: {
+ NullableVarBinaryVector vector = (NullableVarBinaryVector) sourceVector;
+ VarBinaryVector values = vector.getValuesVector();
+ vectorMemoryUsage.bitsBytesCapacity = vector.getBitsValueCapacity();
+ vectorMemoryUsage.offsetsByteCapacity = values.getOffsetVector().getValueCapacity() * INT_VALUE_WIDTH;
+ vectorMemoryUsage.dataByteCapacity = values.getByteCapacity();
+ vectorMemoryUsage.bitsBytesUsed = currValueCount * BYTE_VALUE_WIDTH;
+ vectorMemoryUsage.offsetsBytesUsed = values.getOffsetVector().getPayloadByteCount(currValueCount);
+ vectorMemoryUsage.dataBytesUsed = values.getPayloadByteCount(currValueCount) - vectorMemoryUsage.offsetsBytesUsed;
+ break;
+ }
+
+ default : throw new IllegalArgumentException("Mode [" + type.getMode().name() + "] not supported..");
+ }
+ break;
+ }
+
+ case VARDECIMAL: {
+ switch (type.getMode()) {
+ case REQUIRED: {
+ VarDecimalVector vector = (VarDecimalVector) sourceVector;
+ vectorMemoryUsage.offsetsByteCapacity = vector.getOffsetVector().getValueCapacity() * INT_VALUE_WIDTH;
+ vectorMemoryUsage.dataByteCapacity = vector.getByteCapacity();
+ vectorMemoryUsage.offsetsBytesUsed = vector.getOffsetVector().getPayloadByteCount(currValueCount);
+ vectorMemoryUsage.dataBytesUsed = vector.getPayloadByteCount(currValueCount) - vectorMemoryUsage.offsetsBytesUsed;
+ break;
+ }
+ case OPTIONAL: {
+ NullableVarDecimalVector vector = (NullableVarDecimalVector) sourceVector;
+ VarDecimalVector values = vector.getValuesVector();
+ vectorMemoryUsage.bitsBytesCapacity = vector.getBitsValueCapacity();
+ vectorMemoryUsage.offsetsByteCapacity = values.getOffsetVector().getValueCapacity() * INT_VALUE_WIDTH;
+ vectorMemoryUsage.dataByteCapacity = values.getByteCapacity();
+ vectorMemoryUsage.bitsBytesUsed = currValueCount * BYTE_VALUE_WIDTH;
+ vectorMemoryUsage.offsetsBytesUsed = values.getOffsetVector().getPayloadByteCount(currValueCount);
+ vectorMemoryUsage.dataBytesUsed = values.getPayloadByteCount(currValueCount) - vectorMemoryUsage.offsetsBytesUsed;
+ break;
+ }
+
+ default : throw new IllegalArgumentException("Mode [" + type.getMode().name() + "] not supported..");
+ }
+ break;
+ }
+
+ default : throw new IllegalArgumentException("Type [" + type.getMinorType().name() + "] not supported..");
+ } // End of minor-type-switch-statement
+
+ assert vectorMemoryUsage.bitsBytesCapacity >= 0;
+ assert vectorMemoryUsage.bitsBytesUsed >= 0;
+ assert vectorMemoryUsage.offsetsByteCapacity >= 0;
+ assert vectorMemoryUsage.offsetsBytesUsed >= 0;
+ assert vectorMemoryUsage.dataByteCapacity >= 0;
+ assert vectorMemoryUsage.dataBytesUsed >= 0;
+
+ }
+
+ /**
+ * @param column fixed column's metadata
+ * @return column byte precision
+ */
+ public static int getFixedColumnTypePrecision(ParquetColumnMetadata column) {
+ assert column.isFixedLength();
+
+ return TypeHelper.getSize(column.getField().getType());
+ }
+
+ /**
+ * This method returns a default average precision for variable length columns; it aims at minimizing internal fragmentation.
+ * <p><b>Note</b> that the {@link TypeHelper} uses a large default value which might not always be appropriate.
+ *
+ * @param column variable length column's metadata
+ * @return column byte precision
+ */
+ public static int getAvgVariableLengthColumnTypePrecision(ParquetColumnMetadata column) {
+ assert !column.isFixedLength();
+
+ return DEFAULT_VL_COLUMN_AVG_PRECISION;
+ }
+
+ /**
+ * @param column fixed length column's metadata
+ * @param valueCount number of column values
+ * @return memory size required to store "valueCount" values within a value vector
+ */
+ public static int computeFixedLengthVectorMemory(ParquetColumnMetadata column, int valueCount) {
+ assert column.isFixedLength();
+
+ // Formula: memory-usage = next-power-of-two(type-precision * valueCount) // data storage
+ // + next-power-of-two(BYTE_VALUE_WIDTH * valueCount) // nullable "bits" storage (if any)
+
+ int memoryUsage = BaseAllocator.nextPowerOfTwo(getFixedColumnTypePrecision(column) * valueCount);
+
+ if (column.getField().isNullable()) {
+ memoryUsage += BaseAllocator.nextPowerOfTwo(BYTE_VALUE_WIDTH * valueCount);
+ }
+
+ return memoryUsage;
+ }
+
+ /**
+ * @param column variable length column's metadata
+ * @param averagePrecision VL column average precision
+ * @param valueCount number of column values
+ * @return memory size required to store "valueCount" values within a value vector
+ */
+ public static int computeVariableLengthVectorMemory(ParquetColumnMetadata column,
+ int averagePrecision, int valueCount) {
+
+ assert !column.isFixedLength();
+
+ // Formula: memory-usage = next-power-of-two(avg-precision * valueCount) // data storage
+ // + next-power-of-two(INT_VALUE_WIDTH * (valueCount + 1)) // offsets storage
+ // + next-power-of-two(valueCount) // nullable "bits" storage (if any)
+ int memoryUsage = BaseAllocator.nextPowerOfTwo(averagePrecision * valueCount);
+ memoryUsage += BaseAllocator.nextPowerOfTwo(INT_VALUE_WIDTH * (valueCount + 1));
+
+ if (column.getField().isNullable()) {
+ memoryUsage += BaseAllocator.nextPowerOfTwo(valueCount);
+ }
+ return memoryUsage;
+ }
+
+// ----------------------------------------------------------------------------
+// Internal implementation
+// ----------------------------------------------------------------------------
+
+ private static int computeNewVectorCapacity(int usedCapacity, int newPayload, int currentCapacity) {
+ int newUsedCapacity = BaseAllocator.nextPowerOfTwo(usedCapacity + newPayload);
+ assert newUsedCapacity >= 0;
+
+ return Math.max(currentCapacity, newUsedCapacity);
+ }
+
+// ----------------------------------------------------------------------------
+// Inner data structure
+// ----------------------------------------------------------------------------
+
+ /**
+ * A container class to hold a column's batch memory usage information.
+ */
+ public static final class ColumnMemoryUsageInfo {
+ /** Value vector which contains the column batch data */
+ public ValueVector vector;
+ /** Column memory quota */
+ public ColumnMemoryQuota memoryQuota;
+ /** Current record count stored within the value vector */
+ public int currValueCount;
+ /** Current vector memory usage */
+ public final VectorMemoryUsageInfo vectorMemoryUsage = new VectorMemoryUsageInfo();
+ }
+
+ /** Container class which holds memory usage information about a variable length {@link ValueVector};
+ * all values are in bytes.
+ */
+ public static final class VectorMemoryUsageInfo {
+ /** Bits vector capacity */
+ public int bitsBytesCapacity;
+ /** Offsets vector capacity */
+ public int offsetsByteCapacity;
+ /** Data vector capacity */
+ public int dataByteCapacity;
+ /** Bits vector used up capacity */
+ public int bitsBytesUsed;
+ /** Offsets vector used up capacity */
+ public int offsetsBytesUsed;
+ /** Data vector used up capacity */
+ public int dataBytesUsed;
+
+ public void reset() {
+ bitsBytesCapacity = 0;
+ offsetsByteCapacity = 0;
+ dataByteCapacity = 0;
+ bitsBytesUsed = 0;
+ offsetsBytesUsed = 0;
+ dataBytesUsed = 0;
+ }
+ }
+
+ /** Disabling object instantiation */
+ private BatchSizingMemoryUtil() {
+ // NOOP
+ }
+
+}
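As a worked example of computeVariableLengthVectorMemory() above (illustrative numbers): a nullable VarChar column with the default average precision of 16 bytes and 4096 values needs

    data    = next-power-of-two(16 * 4096)      = 65536 bytes
    offsets = next-power-of-two(4 * (4096 + 1)) = 32768 bytes
    bits    = next-power-of-two(4096)           =  4096 bytes
    total   = 102400 bytes (100 KiB)

The power-of-two rounding mirrors how the allocator sizes the underlying buffers, so the estimate tracks actual memory consumption rather than raw payload size.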
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/batchsizing/OverflowSerDeUtil.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/batchsizing/OverflowSerDeUtil.java
new file mode 100644
index 0000000..c542803
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/batchsizing/OverflowSerDeUtil.java
@@ -0,0 +1,366 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.parquet.columnreaders.batchsizing;
+
+import io.netty.buffer.DrillBuf;
+import java.util.List;
+import java.util.Map;
+import org.apache.drill.common.map.CaseInsensitiveMap;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.store.parquet.columnreaders.batchsizing.RecordBatchOverflow.FieldOverflowDefinition;
+import org.apache.drill.exec.store.parquet.columnreaders.batchsizing.RecordBatchOverflow.FieldOverflowEntry;
+import org.apache.drill.exec.store.parquet.columnreaders.batchsizing.RecordBatchOverflow.RecordOverflowContainer;
+import org.apache.drill.exec.store.parquet.columnreaders.batchsizing.RecordBatchOverflow.RecordOverflowDefinition;
+import org.apache.drill.exec.vector.UInt1Vector;
+import org.apache.drill.exec.vector.UInt4Vector;
+
+/**
+ * Field overflow SERDE utility; note that overflow data is serialized as a way to minimize
+ * memory usage. This information is deserialized back to ValueVectors when it is needed in
+ * the next batch.
+ *
+ * <p><b>NOTE -</b>We use a specialized implementation for overflow SERDE (instead of reusing
+ * existing ones) because of the following reasons:
+ * <ul>
+ * <li>We want to only serialize a subset of the VV data
+ * <li>Other SERDE methods will not copy the data contiguously and instead rely on the
+ * RPC layer to write the drill buffers in the correct order so that they are
+ * de-serialized as a single contiguous drill buffer
+ * </ul>
+ */
+final class OverflowSerDeUtil {
+ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(OverflowSerDeUtil.class);
+
+ /**
+ * Serializes a collection of overflow fields into a memory buffer:
+ * <ul>
+ * <li>Serialization logic can handle a subset of values (should be contiguous)
+ * <li>Serialized data is copied into a single DrillBuf
+ * <li>Currently, only variable length data is supported
+ * </ul>
+ *
+ * @param fieldOverflowEntries input collection of field overflow entries
+ * @param allocator buffer allocator
+ * @return record overflow container; null if the input list is empty
+ */
+ static RecordOverflowContainer serialize(List<FieldOverflowEntry> fieldOverflowEntries,
+ BufferAllocator allocator) {
+
+ if (fieldOverflowEntries == null || fieldOverflowEntries.isEmpty()) {
+ return null;
+ }
+
+ // We need to:
+ // - Construct a map of VLVectorSerDe for each overflow field
+ // - Compute the total space required for efficient serialization of all overflow data
+ final Map<String, VLVectorSerializer> fieldSerDeMap = CaseInsensitiveMap.newHashMap();
+ int bufferLength = 0;
+
+ for (FieldOverflowEntry fieldOverflowEntry : fieldOverflowEntries) {
+ final VLVectorSerializer fieldVLSerDe = new VLVectorSerializer(fieldOverflowEntry);
+ fieldSerDeMap.put(fieldOverflowEntry.vector.getField().getName(), fieldVLSerDe);
+
+ bufferLength += fieldVLSerDe.getBytesUsed(fieldOverflowEntry.firstValueIdx, fieldOverflowEntry.numValues);
+ }
+ assert bufferLength >= 0;
+
+ // Allocate the required memory to serialize the overflow fields
+ final DrillBuf buffer = allocator.buffer(bufferLength);
+
+ if (logger.isDebugEnabled()) {
+ logger.debug(String.format("Allocated a buffer of length %d to handle overflow", bufferLength));
+ }
+
+ // Create the result object
+ final RecordOverflowContainer recordOverflowContainer = new RecordOverflowContainer();
+ final RecordOverflowDefinition recordOverflowDef = recordOverflowContainer.recordOverflowDef;
+
+ // Now serialize field overflow into the drill buffer
+ int bufferOffset = 0;
+ FieldSerializerContainer fieldSerializerContainer = new FieldSerializerContainer();
+
+ for (FieldOverflowEntry fieldOverflowEntry : fieldOverflowEntries) {
+ fieldSerializerContainer.clear();
+
+ // Serialize the field overflow data into the buffer
+ VLVectorSerializer fieldSerDe = fieldSerDeMap.get(fieldOverflowEntry.vector.getField().getName());
+ assert fieldSerDe != null;
+
+ fieldSerDe.copyValueVector(fieldOverflowEntry.firstValueIdx,
+ fieldOverflowEntry.numValues,
+ buffer,
+ bufferOffset,
+ fieldSerializerContainer);
+
+ // Create a view DrillBuf for isolating this field overflow data
+ DrillBuf fieldBuffer = buffer.slice(bufferOffset, fieldSerializerContainer.totalByteLength);
+ fieldBuffer.retain(1); // Increase the reference count
+ fieldBuffer.writerIndex(fieldSerializerContainer.totalByteLength);
+
+ // Enqueue a field overflow definition object for the current field
+ FieldOverflowDefinition fieldOverflowDef = new FieldOverflowDefinition(
+ fieldOverflowEntry.vector.getField(),
+ fieldOverflowEntry.numValues,
+ fieldSerializerContainer.dataByteLen,
+ fieldBuffer);
+
+ recordOverflowDef.getFieldOverflowDefs().put(fieldOverflowEntry.vector.getField().getName(), fieldOverflowDef);
+
+ // Update this drill buffer current offset
+ bufferOffset += fieldSerializerContainer.totalByteLength;
+ }
+
+ // Finally, release the original buffer
+ boolean isReleased = buffer.release();
+ assert !isReleased; // the reference count should still be higher than zero
+
+ return recordOverflowContainer;
+ }
+
+ /** Disabling object instantiation */
+ private OverflowSerDeUtil() {
+ // NOOP
+ }
+
+// ----------------------------------------------------------------------------
+// Internal Data Structure
+// ----------------------------------------------------------------------------
+
+ /** Container class to store the result of field overflow serialization */
+ private static final class FieldSerializerContainer {
+ /** Data byte length */
+ int dataByteLen;
+ /** Total byte length */
+ int totalByteLength;
+
+ void clear() {
+ dataByteLen = 0;
+ totalByteLength = 0;
+ }
+ }
+
+ /**
+ * Helper class for handling variable length {@link ValueVector} overflow serialization logic
+ */
+ private static final class VLVectorSerializer {
+ private static final int BYTE_VALUE_WIDTH = UInt1Vector.VALUE_WIDTH;
+ private static final int INT_VALUE_WIDTH = UInt4Vector.VALUE_WIDTH;
+
+ /** Field overflow entry */
+ private final FieldOverflowEntry fieldOverflowEntry;
+
+ /** Set of DrillBuf's that make up the underlying ValueVector. Only nullable
+ * variable length vectors have three entries. The format is
+ * ["bits-vector"] "offsets-vector" "values-vector"
+ */
+ private final DrillBuf[] buffers;
+
+ /**
+ * Constructor.
+ * @param fieldOverflowEntry field overflow entry
+ */
+ private VLVectorSerializer(FieldOverflowEntry fieldOverflowEntry) {
+ this.fieldOverflowEntry = fieldOverflowEntry;
+ this.buffers = this.fieldOverflowEntry.vector.getBuffers(false);
+ }
+
+ /**
+ * The number of bytes used (by the {@link ValueVector}) to store a value range delimited
+ * by the parameters "firstValueIdx" and "numValues".
+ *
+ * @param firstValueIdx start index of the first value to copy
+ * @param numValues number of values to copy
+ * @return number of bytes used by the requested value range
+ */
+ private int getBytesUsed(int firstValueIdx, int numValues) {
+ int bytesUsed = 0;
+
+ // Only nullable variable length vectors have three entries. The format is
+ // ["bits-vector"] "offsets-vector" "values-vector"
+ if (isNullable()) {
+ bytesUsed += numValues * BYTE_VALUE_WIDTH;
+ }
+
+ // Add the length of the "offsets" vector for the requested range
+ bytesUsed += (numValues + 1) * INT_VALUE_WIDTH;
+
+ // Add the length of the "values" vector for the requested range
+ bytesUsed += getDataLen(firstValueIdx, numValues);
+
+ return bytesUsed;
+ }
+
+ private void copyValueVector(int firstValueIdx,
+ int numValues,
+ DrillBuf targetBuffer,
+ int targetStartIdx,
+ FieldSerializerContainer fieldSerializerContainer) {
+
+ int len = 0;
+ int totalLen = 0;
+
+ // First copy the "bits" vector
+ len = copyBitsVector(firstValueIdx, numValues, targetBuffer, targetStartIdx);
+ assert len >= 0;
+
+ // Then copy the "offsets" vector
+ totalLen += len;
+ len = copyOffsetVector(firstValueIdx, numValues, targetBuffer, targetStartIdx + totalLen);
+ assert len >= 0;
+
+ // Then copy the "values" vector
+ totalLen += len;
+ len = copyValuesVector(firstValueIdx, numValues, targetBuffer, targetStartIdx + totalLen);
+ assert len >= 0;
+
+ // Finally, update the field serializer container
+ fieldSerializerContainer.dataByteLen = len;
+ fieldSerializerContainer.totalByteLength = (totalLen + len);
+ }
+
+ /**
+ * Copy the "bits" vector if the {@link ValueVector} is nullable; NOOP otherwise.
+ *
+ * @param firstValueIdx start index of the first value to copy
+ * @param numValues number of values to copy
+ * @param targetBuffer target buffer
+ * @param targetStartIdx target buffer start index
+ *
+ * @return number of bytes written
+ */
+ private int copyBitsVector(int firstValueIdx, int numValues, DrillBuf targetBuffer, int targetStartIdx) {
+ int bytesCopied = 0;
+
+ if (!isNullable()) {
+ return bytesCopied;
+ }
+
+ DrillBuf srcBuffer = getBitsBuffer();
+ assert srcBuffer != null;
+
+ bytesCopied = numValues * BYTE_VALUE_WIDTH;
+
+ // Now copy the bits data into the target buffer
+ srcBuffer.getBytes(firstValueIdx * BYTE_VALUE_WIDTH,
+ targetBuffer,
+ targetStartIdx,
+ bytesCopied);
+
+ return bytesCopied;
+ }
+
+ /**
+ * Copy the "offset" vector if the {@link ValueVector}; note that no adjustment of the offsets will be done.
+ * This task will be done during overflow data de-serialization.
+ *
+ * @param firstValueIdx start index of the first value to copy
+ * @param numValues number of values to copy
+ * @param targetBuffer target buffer
+ * @param targetStartIdx target buffer start index
+ *
+ * @return number of bytes written
+ */
+ private int copyOffsetVector(int firstValueIdx,
+ int numValues,
+ DrillBuf targetBuffer,
+ int targetStartIdx) {
+
+ final int bytesCopied = (numValues + 1) * INT_VALUE_WIDTH;
+
+ DrillBuf srcBuffer = getOffsetsBuffer();
+ assert srcBuffer != null;
+
+ // Now copy the offsets data into the target buffer
+ srcBuffer.getBytes(firstValueIdx * INT_VALUE_WIDTH,
+ targetBuffer,
+ targetStartIdx,
+ bytesCopied);
+
+ return bytesCopied;
+ }
+
+ /**
+ * Copy the "values" vector if the {@link ValueVector}.
+ *
+ * @param firstValueIdx start index of the first value to copy
+ * @param numValues number of values to copy
+ * @param targetBuffer target buffer
+ * @param targetStartIdx target buffer start index
+ *
+ * @return number of bytes written
+ */
+ private int copyValuesVector(int firstValueIdx,
+ int numValues,
+ DrillBuf targetBuffer,
+ int targetStartIdx) {
+
+ final DrillBuf offsets = getOffsetsBuffer();
+ final int startDataOffset = offsets.getInt(firstValueIdx * INT_VALUE_WIDTH);
+ final int bytesCopied = getDataLen(firstValueIdx, numValues);
+ final DrillBuf srcBuffer = getValuesBuffer();
+ assert srcBuffer != null;
+
+ // Now copy the values data into the target buffer
+ srcBuffer.getBytes(startDataOffset,
+ targetBuffer,
+ targetStartIdx,
+ bytesCopied);
+
+ return bytesCopied;
+ }
+
+ /**
+ *
+ * @param firstValueIdx start index of the first value to copy
+ * @param numValues number of values to copy
+ *
+ * @return data length in bytes for the value range [firstValueIdx, firstValueIdx + numValues)
+ */
+ private int getDataLen(int firstValueIdx, int numValues) {
+ // The data length of a value range is given by the following formula:
+ // offsets[firstValueIdx + numValues] - offsets[firstValueIdx]
+ final DrillBuf offsets = getOffsetsBuffer();
+ final int endDataOffset = offsets.getInt((firstValueIdx + numValues) * INT_VALUE_WIDTH);
+ final int startDataOffset = offsets.getInt(firstValueIdx * INT_VALUE_WIDTH);
+
+ return endDataOffset - startDataOffset;
+
+ }
+
+ /** @return "bits" buffer; null if the associated {@link ValueVector} is not nullable */
+ private DrillBuf getBitsBuffer() {
+ return buffers.length == 3 ? buffers[0] : null;
+ }
+
+ /** @return "offsets" buffer */
+ private DrillBuf getOffsetsBuffer() {
+ return buffers.length == 3 ? buffers[1] : buffers[0];
+ }
+
+ /** @return "values" buffer */
+ private DrillBuf getValuesBuffer() {
+ return buffers.length == 3 ? buffers[2] : buffers[1];
+ }
+
+ /** @return whether this vector is nullable */
+ private boolean isNullable() {
+ return getBitsBuffer() != null;
+ }
+
+ } // End of VLVectorSerializer class
+
+}
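For reference, the per-field serialized layout produced by copyValueVector() above is simply the three vector buffers copied back to back:

    [bits: numValues * 1] [offsets: (numValues + 1) * 4] [values: offsets[first + numValues] - offsets[first]]

For example (illustrative numbers), a nullable field with 100 overflow values spanning 1000 data bytes serializes to 100 + 404 + 1000 = 1504 bytes, which is what getBytesUsed() returns.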
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/batchsizing/RecordBatchOverflow.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/batchsizing/RecordBatchOverflow.java
new file mode 100644
index 0000000..76422ae
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/batchsizing/RecordBatchOverflow.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.parquet.columnreaders.batchsizing;
+
+import io.netty.buffer.DrillBuf;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import org.apache.drill.common.map.CaseInsensitiveMap;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.VariableWidthVector;
+
+/**
+ * Logic for handling batch record overflow; this class essentially serializes overflow vector data in a
+ * compact manner so that it can be reused for building the next record batch.
+ */
+public final class RecordBatchOverflow {
+ /** Record Overflow Definition */
+ final RecordOverflowDefinition recordOverflowDef;
+ /** Buffer allocator */
+ final BufferAllocator allocator;
+
+ /**
+ * @param allocator buffer allocator
+ * @return new builder object
+ */
+ public static Builder newBuilder(BufferAllocator allocator) {
+ return new Builder(allocator);
+ }
+
+ /**
+ * @return the record overflow definition
+ */
+ public RecordOverflowDefinition getRecordOverflowDefinition() {
+ return recordOverflowDef;
+ }
+
+ /**
+ * Constructor.
+ *
+ * @param recordOverflowDef record overflow definition
+ * @param allocator buffer allocator
+ */
+ private RecordBatchOverflow(RecordOverflowDefinition recordOverflowDef,
+ BufferAllocator allocator) {
+
+ this.recordOverflowDef = recordOverflowDef;
+ this.allocator = allocator;
+ }
+
+// ----------------------------------------------------------------------------
+// Inner Data Structure
+// ----------------------------------------------------------------------------
+
+ /** Builder class to construct a {@link RecordBatchOverflow} object */
+ public static final class Builder {
+ /** Field overflow list */
+ private final List<FieldOverflowEntry> fieldOverflowEntries = new ArrayList<FieldOverflowEntry>();
+ /** Buffer allocator */
+ private final BufferAllocator allocator;
+
+ /**
+ * Constructor for the {@link RecordBatchOverflow} builder.
+ * @param allocator buffer allocator
+ */
+ private Builder(BufferAllocator allocator) {
+ this.allocator = allocator;
+ }
+
+ /**
+ * Add an overflow field to this batch record overflow object; note that currently only
+ * variable length columns are supported.
+ *
+ * @param vector a value vector with overflow values
+ * @param firstValueIdx index of first overflow value
+ * @param numValues the number of overflow values starting at index "firstValueIdx"
+ */
+ public void addFieldOverflow(ValueVector vector, int firstValueIdx, int numValues) {
+ assert vector instanceof VariableWidthVector;
+ fieldOverflowEntries.add(new FieldOverflowEntry(vector, firstValueIdx, numValues));
+ }
+
+ /**
+ * @return a newly built {@link RecordBatchOverflow} object instance
+ */
+ public RecordBatchOverflow build() {
+ RecordOverflowContainer overflowContainer = OverflowSerDeUtil.serialize(fieldOverflowEntries, allocator);
+ RecordBatchOverflow result =
+ new RecordBatchOverflow(overflowContainer.recordOverflowDef, allocator);
+
+ return result;
+ }
+ } // End of Builder
+
+ /** Field overflow entry */
+ static final class FieldOverflowEntry {
+ /** A value vector with overflow values */
+ final ValueVector vector;
+ /** index of first overflow value */
+ final int firstValueIdx;
+ /** The number of overflow values starting at index "firstValueIdx" */
+ final int numValues;
+
+ /**
+ * Field overflow entry constructor
+ *
+ * @param vector a value vector with overflow values
+ * @param firstValueIdx index of first overflow value
+ * @param numValues the number of overflow values starting at index "firstValueIdx"
+ */
+ private FieldOverflowEntry(ValueVector vector, int firstValueIdx, int numValues) {
+ this.vector = vector;
+ this.firstValueIdx = firstValueIdx;
+ this.numValues = numValues;
+ }
+ } // End of FieldOverflowEntry
+
+ /** Record overflow definition */
+ public static final class RecordOverflowDefinition {
+ private final Map<String, FieldOverflowDefinition> fieldOverflowDefs = CaseInsensitiveMap.newHashMap();
+
+ public Map<String, FieldOverflowDefinition> getFieldOverflowDefs() {
+ return fieldOverflowDefs;
+ }
+ } // End of RecordOverflowDefinition
+
+ /** Field overflow definition */
+ public static final class FieldOverflowDefinition {
+ /** Materialized field */
+ public final MaterializedField field;
+ /** Number of values */
+ public final int numValues;
+ /** Data byte length */
+ public final int dataByteLen;
+ /** DrillBuf where the serialized data is stored */
+ public final DrillBuf buffer;
+
+ /**
+ * Field overflow definition constructor
+ * @param field materialized field
+ * @param numValues number of values
+ * @param dataByteLen data byte length
+ * @param buffer DrillBuf where the serialized data is stored
+ */
+ FieldOverflowDefinition(MaterializedField field, int numValues, int dataByteLen, DrillBuf buffer) {
+ this.field = field;
+ this.numValues = numValues;
+ this.dataByteLen = dataByteLen;
+ this.buffer = buffer;
+ }
+ } // End of FieldOverflowDefinition
+
+ /** Record overflow container */
+ static final class RecordOverflowContainer {
+ /** Record Overflow Definition */
+ final RecordOverflowDefinition recordOverflowDef = new RecordOverflowDefinition();
+
+ } // End of RecordOverflowContainer
+
+}
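A minimal usage sketch of the builder API above (allocator, vector, firstValueIdx, and numValues are placeholders in scope; illustrative only, not part of the patch):

    // Spill the trailing values of a variable length vector into a compact
    // overflow buffer that the next batch can consume.
    RecordBatchOverflow.Builder builder = RecordBatchOverflow.newBuilder(allocator);
    builder.addFieldOverflow(vector, firstValueIdx, numValues); // VariableWidthVector only
    RecordBatchOverflow overflow = builder.build();

    // The serialized per-field definitions (field, numValues, dataByteLen, buffer)
    // are then available for de-serialization in the next batch.
    Map<String, RecordBatchOverflow.FieldOverflowDefinition> defs =
        overflow.getRecordOverflowDefinition().getFieldOverflowDefs();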
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/batchsizing/RecordBatchSizerManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/batchsizing/RecordBatchSizerManager.java
new file mode 100644
index 0000000..01644f7
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/batchsizing/RecordBatchSizerManager.java
@@ -0,0 +1,661 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.parquet.columnreaders.batchsizing;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import org.apache.drill.common.map.CaseInsensitiveMap;
+import org.apache.drill.common.types.TypeProtos.MajorType;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.exception.OutOfMemoryException;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.server.options.OptionManager;
+import org.apache.drill.exec.store.parquet.columnreaders.ParquetColumnMetadata;
+import org.apache.drill.exec.store.parquet.columnreaders.ParquetSchema;
+import org.apache.drill.exec.store.parquet.columnreaders.VarLenColumnBulkInput;
+import org.apache.drill.exec.store.parquet.columnreaders.batchsizing.RecordBatchOverflow.FieldOverflowDefinition;
+import org.apache.drill.exec.vector.AllocationHelper;
+import org.apache.drill.exec.vector.ValueVector;
+
+/**
+ * This class is tasked with managing all aspects of flat Parquet reader record batch sizing logic.
+ * Currently a record batch size is constrained by two parameters: number of rows and memory usage.
+ */
+public final class RecordBatchSizerManager {
+ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RecordBatchSizerManager.class);
+ public static final String BATCH_STATS_PREFIX = "BATCH_STATS";
+
+ /** Minimum column memory size */
+ private static final int MIN_COLUMN_MEMORY_SZ = VarLenColumnBulkInput.getMinVLColumnMemorySize();
+
+ /** Parquet schema object */
+ private final ParquetSchema schema;
+ /** Total records to read */
+ private final long totalRecordsToRead;
+ /** Logic to minimize overflow occurrences */
+ private final BatchOverflowOptimizer overflowOptimizer;
+
+ /** Configured Parquet records per batch */
+ private final int configRecordsPerBatch;
+ /** Configured Parquet memory size per batch */
+ private final int configMemorySizePerBatch;
+ /** An upper bound on the Parquet records per batch based on the configured value and schema */
+ private int maxRecordsPerBatch;
+ /** An upper bound on the Parquet memory size per batch based on the configured value and schema */
+ private int maxMemorySizePerBatch;
+ /** The current number of records per batch as it can be dynamically optimized */
+ private int recordsPerBatch;
+
+ /** List of fixed columns */
+ private final List<ColumnMemoryInfo> fixedLengthColumns = new ArrayList<ColumnMemoryInfo>();
+ /** List of variable columns */
+ private final List<ColumnMemoryInfo> variableLengthColumns = new ArrayList<ColumnMemoryInfo>();
+ /** Field to column memory information map */
+ private final Map<String, ColumnMemoryInfo> columnMemoryInfoMap = CaseInsensitiveMap.newHashMap();
+ /** Flag which indicates that one or more column precisions have changed */
+ private boolean columnPrecisionChanged;
+
+ /**
+ * Field overflow map; this information is stored within this class for two reasons:
+ * a) centralization to simplify resource deallocation (overflow data is backed by Direct Memory)
+ * b) overflow is a result of batch constraint enforcement, and this class manages the overflow logic
+ */
+ private Map<String, FieldOverflowStateContainer> fieldOverflowMap = CaseInsensitiveMap.newHashMap();
+
+ /**
+ * Constructor.
+ *
+ * @param options drill options
+ * @param schema current reader schema
+ * @param totalRecordsToRead total number of rows to read
+ */
+ public RecordBatchSizerManager(OptionManager options,
+ ParquetSchema schema,
+ long totalRecordsToRead) {
+
+ this.schema = schema;
+ this.totalRecordsToRead = totalRecordsToRead;
+ this.configRecordsPerBatch = (int) options.getLong(ExecConstants.PARQUET_FLAT_BATCH_NUM_RECORDS);
+ this.configMemorySizePerBatch = getConfiguredMaxBatchMemory(options);
+ this.maxMemorySizePerBatch = this.configMemorySizePerBatch;
+ this.maxRecordsPerBatch = this.configRecordsPerBatch;
+ this.recordsPerBatch = this.configRecordsPerBatch;
+ this.overflowOptimizer = new BatchOverflowOptimizer(columnMemoryInfoMap);
+ }
+
+ /**
+ * Tunes record batch parameters based on configuration and schema.
+ */
+ public void setup() {
+
+ // Normalize batch parameters
+ this.maxMemorySizePerBatch = normalizeMemorySizePerBatch();
+ this.maxRecordsPerBatch = normalizeNumRecordsPerBatch();
+
+ // Let's load the column metadata
+ loadColumnsPrecisionInfo();
+
+ if (getNumColumns() == 0) {
+ return; // there are cases where downstream operators don't select any columns
+ // in such a case, Parquet will return the pseudo column _DEFAULT_COL_TO_READ_
+ }
+
+ // We need to divide the overall memory pool amongst all columns
+ assignColumnsBatchMemory();
+
+ // Initialize the overflow optimizer
+ overflowOptimizer.setup();
+ }
+
+ /**
+ * @return the schema
+ */
+ public ParquetSchema getSchema() {
+ return schema;
+ }
+
+ /**
+ * Allocates value vectors for the current batch.
+ *
+ * @param vectorMap a collection of value vectors keyed by their field names
+ * @throws OutOfMemoryException
+ */
+ public void allocate(Map<String, ValueVector> vectorMap) throws OutOfMemoryException {
+
+ if (columnPrecisionChanged) {
+ // We need to divide the overall memory pool amongst all columns
+ assignColumnsBatchMemory();
+ }
+
+ try {
+ for (final ValueVector v : vectorMap.values()) {
+ ColumnMemoryInfo columnMemoryInfo = columnMemoryInfoMap.get(v.getField().getName());
+
+ if (columnMemoryInfo != null) {
+ AllocationHelper.allocate(v, recordsPerBatch, columnMemoryInfo.columnPrecision, 0);
+ } else {
+ // This column was found in another Parquet file but not the current one; so we inject
+ // a null value. At this time, we do not account for such columns. Why? The right design is
+ // to create a ZERO byte all-nulls value vector to handle such columns (there could be hundreds of these).
+ AllocationHelper.allocate(v, recordsPerBatch, 0, 0); // the helper will still use a precision of 1
+ }
+ }
+ } catch (NullPointerException e) {
+ throw new OutOfMemoryException();
+ }
+ }
+
+ /**
+ * @return the field overflow state map
+ */
+ public Map<String, FieldOverflowStateContainer> getFieldOverflowMap() {
+ return fieldOverflowMap;
+ }
+
+ /**
+ * @param field materialized field
+ * @return field overflow state container
+ */
+ public FieldOverflowStateContainer getFieldOverflowContainer(String field) {
+ return fieldOverflowMap.get(field);
+ }
+
+ /**
+ * Releases the overflow data resources associated with this field; also removes the overflow
+ * container from the overflow containers map.
+ *
+ * @param field materialized field
+ * @return true if this field's overflow container was removed from the overflow containers map
+ */
+ public boolean releaseFieldOverflowContainer(String field) {
+ return releaseFieldOverflowContainer(field, true);
+ }
+
+ /**
+ * @param field materialized field
+ * @return field batch memory quota
+ */
+ public ColumnMemoryQuota getCurrentFieldBatchMemory(String field) {
+ return columnMemoryInfoMap.get(field).columnMemoryQuota;
+ }
+
+ /**
+ * @return current number of records per batch (may change across batches)
+ */
+ public int getCurrentRecordsPerBatch() {
+ return recordsPerBatch;
+ }
+
+ /**
+ * @return current total memory per batch (may change across batches)
+ */
+ public int getCurrentMemorySizePerBatch() {
+ return maxMemorySizePerBatch; // Current logic doesn't mutate the max-memory after it has been set
+ }
+
+ /**
+ * @return configured number of records per batch (may be different from the enforced one)
+ */
+ public int getConfigRecordsPerBatch() {
+ return configRecordsPerBatch;
+ }
+
+ /**
+ * @return configured memory size per batch (may be different from the enforced one)
+ */
+ public int getConfigMemorySizePerBatch() {
+ return configMemorySizePerBatch;
+ }
+
+ /**
+ * Enables this object to optimize the impact of overflows by computing more
+ * accurate VL column precision.
+ *
+ * @param batchNumRecords number of records in this batch
+ * @param batchStats columns statistics
+ */
+ public void onEndOfBatch(int batchNumRecords, List<VarLenColumnBatchStats> batchStats) {
+ columnPrecisionChanged = overflowOptimizer.onEndOfBatch(batchNumRecords, batchStats);
+ }
+
+ /**
+ * Closes all resources managed by this object
+ */
+ public void close() {
+
+ for (String field: fieldOverflowMap.keySet()) {
+ releaseFieldOverflowContainer(field, false);
+ }
+
+ // now clear the map
+ fieldOverflowMap.clear();
+ }
+
+// ----------------------------------------------------------------------------
+// Internal implementation logic
+// ----------------------------------------------------------------------------
+
+ private int getConfiguredMaxBatchMemory(OptionManager options) {
+ // Use the parquet specific configuration if set
+ int maxMemory = (int) options.getLong(ExecConstants.PARQUET_FLAT_BATCH_MEMORY_SIZE);
+
+ // Otherwise, use the common property
+ if (maxMemory <= 0) {
+ maxMemory = (int) options.getLong(ExecConstants.OUTPUT_BATCH_SIZE);
+ }
+ return maxMemory;
+ }
+
+ private int normalizeNumRecordsPerBatch() {
+ int normalizedNumRecords = configRecordsPerBatch;
+
+ if (configRecordsPerBatch <= 0) {
+ final String message = String.format("Invalid Parquet number of record(s) per batch [%d]",
+ configRecordsPerBatch);
+
+ throw new IllegalArgumentException(message);
+ }
+
+ if (normalizedNumRecords > totalRecordsToRead) {
+ if (logger.isDebugEnabled()) {
+ final String message = String.format("The requested number of record(s) to read is lower than the records per batch; "
+ + "updating the number of record(s) per batch from [%d] to [%d]",
+ normalizedNumRecords, totalRecordsToRead);
+ logger.debug(message);
+ }
+ normalizedNumRecords = (int) totalRecordsToRead;
+ }
+
+ if (logger.isDebugEnabled()) {
+ final String message = String.format("%s: The Parquet reader number of record(s) has been set to [%d]",
+ BATCH_STATS_PREFIX, normalizedNumRecords);
+ logger.debug(message);
+ }
+
+ return normalizedNumRecords;
+ }
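+
+ // Worked example (values illustrative): with the default
+ // store.parquet.flat.batch.num_records = 32767 and a scan that only needs
+ // totalRecordsToRead = 1000, the number of records per batch is clamped to 1000.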
+
+ private int normalizeMemorySizePerBatch() {
+ int normalizedMemorySize = configMemorySizePerBatch;
+
+ if (normalizedMemorySize <= 0) {
+ final String message = String.format("Invalid Parquet memory per batch [%d] byte(s)",
+ configMemorySizePerBatch);
+
+ throw new IllegalArgumentException(message);
+ }
+
+ // Ensure the minimal memory size per column is satisfied
+ final int numColumns = schema.getColumnMetadata().size();
+
+ if (numColumns == 0) {
+ return normalizedMemorySize; // NOOP
+ }
+
+ final int memorySizePerColumn = normalizedMemorySize / numColumns;
+
+ if (memorySizePerColumn < MIN_COLUMN_MEMORY_SZ) {
+ final int prevValue = normalizedMemorySize;
+ normalizedMemorySize = MIN_COLUMN_MEMORY_SZ * numColumns;
+
+ final String message = String.format("The Parquet memory per batch [%d] byte(s) is too low for this query ; using [%d] bytes",
+ prevValue, normalizedMemorySize);
+ logger.warn(message);
+ }
+
+ if (logger.isDebugEnabled()) {
+ final String message = String.format("%s: The Parquet reader batch memory has been set to [%d] byte(s)",
+ BATCH_STATS_PREFIX, normalizedMemorySize);
+ logger.debug(message);
+ }
+
+ return normalizedMemorySize;
+ }
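+
+ // Worked example (illustrative; MIN_COLUMN_MEMORY_SZ is a constant defined
+ // elsewhere in this class): with 8 columns and a configured batch memory of
+ // 64KB, each column would get 8KB; if MIN_COLUMN_MEMORY_SZ were 16KB, the
+ // batch memory would be raised to 8 x 16KB = 128KB.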
+
+ private void loadColumnsPrecisionInfo() {
+ assert fixedLengthColumns.size() == 0;
+ assert variableLengthColumns.size() == 0;
+
+ for (ParquetColumnMetadata columnMetadata : schema.getColumnMetadata()) {
+ assert !columnMetadata.isRepeated() : "This reader doesn't handle repeated columns..";
+
+ ColumnMemoryInfo columnMemoryInfo = new ColumnMemoryInfo();
+ columnMemoryInfoMap.put(columnMetadata.getField().getName(), columnMemoryInfo);
+
+ if (columnMetadata.isFixedLength()) {
+ columnMemoryInfo.columnMeta = columnMetadata;
+ columnMemoryInfo.columnPrecision = BatchSizingMemoryUtil.getFixedColumnTypePrecision(columnMetadata);
+ columnMemoryInfo.columnMemoryQuota.reset();
+
+ fixedLengthColumns.add(columnMemoryInfo);
+ } else {
+
+ columnMemoryInfo.columnMeta = columnMetadata;
+ columnMemoryInfo.columnPrecision = BatchSizingMemoryUtil.getAvgVariableLengthColumnTypePrecision(columnMetadata);
+ columnMemoryInfo.columnMemoryQuota.reset();
+
+ variableLengthColumns.add(columnMemoryInfo);
+ }
+ }
+ }
+
+ private void assignColumnsBatchMemory() {
+
+ if (getNumColumns() == 0) {
+ return;
+ }
+
+ recordsPerBatch = maxRecordsPerBatch;
+
+ // Cache the original records per batch as it may change
+ int originalRecordsPerBatch = recordsPerBatch;
+
+ // Perform the fine-grained memory quota assignment
+ assignFineGrainedMemoryQuota();
+
+ // log the new record batch if it changed
+ if (logger.isDebugEnabled()) {
+ assert recordsPerBatch <= maxRecordsPerBatch;
+
+ if (originalRecordsPerBatch != recordsPerBatch) {
+ final String message = String.format("%s: The Parquet records per batch [%d] has been decreased to [%d]",
+ BATCH_STATS_PREFIX, originalRecordsPerBatch, recordsPerBatch);
+ logger.debug(message);
+ }
+
+ // Now dump the per column memory quotas
+ dumpColumnMemoryQuotas();
+ }
+ }
+
+ private void assignFineGrainedMemoryQuota() {
+
+ // - Compute the memory required based on the current batch size and assigned column precision
+ // - Compute the ratio have-memory / needed-memory
+ // - If less than one, new-num-records = num-records * ratio; go back to the previous steps
+ // - Distribute the extra memory uniformly across the variable length columns
+ // (a worked example follows this method)
+
+ MemoryRequirementContainer requiredMemory = new MemoryRequirementContainer();
+ int newRecordsPerBatch = recordsPerBatch;
+
+ while (true) {
+ // Compute max-memory / needed-memory-for-current-num-records
+ recordsPerBatch = newRecordsPerBatch;
+ double neededMemoryRatio = computeNeededMemoryRatio(requiredMemory);
+ assert neededMemoryRatio <= 1;
+
+ newRecordsPerBatch = (int) (recordsPerBatch * neededMemoryRatio);
+ assert newRecordsPerBatch <= recordsPerBatch;
+
+ if (newRecordsPerBatch <= 1) {
+ recordsPerBatch = 1;
+ computeNeededMemoryRatio(requiredMemory); // update the memory quota with this new number of records
+ break; // we cannot process less than one row
+
+ } else if (newRecordsPerBatch < recordsPerBatch) {
+ // We computed a new number of records per batch; we need to
+ // a) make sure this new number satisfies our memory needs and
+ // b) recompute the per-column quota
+ continue;
+ }
+ assert recordsPerBatch == newRecordsPerBatch;
+
+ // Alright, we have now found the target number of records; we need
+ // only to adjust the remaining memory (if any) amongst the variable
+ // length columns.
+ distributeExtraMemorySpace(requiredMemory);
+ break; // we're done
+ }
+ }
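+
+ // Worked example (numbers illustrative): say maxMemorySizePerBatch is 1MB and,
+ // at recordsPerBatch = 8192, the columns need 4MB; the ratio is 0.25, so the
+ // next iteration tries 8192 * 0.25 = 2048 records. Once the needed memory fits
+ // (the ratio caps at 1), the loop exits and any leftover memory is distributed
+ // across the variable length columns.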
+
+ private void distributeExtraMemorySpace(MemoryRequirementContainer requiredMemory) {
+ // Distribute uniformly the extra memory space to the variable length columns
+ // to minimize the chance of overflow conditions.
+ final int numVariableLengthColumns = variableLengthColumns.size();
+
+ if (numVariableLengthColumns == 0) {
+ return; // we're done
+ }
+
+ final int totalMemoryNeeded = requiredMemory.fixedLenRequiredMemory + requiredMemory.variableLenRequiredMemory;
+ final int extraMemorySpace = maxMemorySizePerBatch - totalMemoryNeeded;
+ final int perColumnExtraSpace = extraMemorySpace / numVariableLengthColumns;
+
+ if (perColumnExtraSpace == 0) {
+ return;
+ }
+
+ for (ColumnMemoryInfo columnInfo : variableLengthColumns) {
+ columnInfo.columnMemoryQuota.maxMemoryUsage += perColumnExtraSpace;
+ }
+ }
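+
+ // For example (illustrative): with 64KB of unused batch memory and 4 variable
+ // length columns, each column's quota grows by 16KB, reducing the chance that
+ // a long value triggers an overflow.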
+
+ private int getNumColumns() {
+ return fixedLengthColumns.size() + variableLengthColumns.size();
+ }
+
+ private boolean releaseFieldOverflowContainer(String field, boolean remove) {
+ FieldOverflowStateContainer container = getFieldOverflowContainer(field);
+
+ if (container == null) {
+ return false; // NOOP
+ }
+
+ // We need to release resources associated with this container
+ container.release();
+ container.overflowDef = null;
+ container.overflowState = null;
+
+ if (remove) {
+ // Finally remove this container from the map
+ fieldOverflowMap.remove(field);
+ }
+
+ return remove;
+ }
+
+ private int computeVectorMemory(ColumnMemoryInfo columnInfo, int numValues) {
+ if (columnInfo.columnMeta.isFixedLength()) {
+ return BatchSizingMemoryUtil.computeFixedLengthVectorMemory(columnInfo.columnMeta, numValues);
+ }
+ return BatchSizingMemoryUtil.computeVariableLengthVectorMemory(
+ columnInfo.columnMeta,
+ columnInfo.columnPrecision,
+ numValues);
+ }
+
+ private double computeNeededMemoryRatio(MemoryRequirementContainer requiredMemory) {
+ requiredMemory.reset();
+
+ for (ColumnMemoryInfo columnInfo : fixedLengthColumns) {
+ columnInfo.columnMemoryQuota.maxMemoryUsage = computeVectorMemory(columnInfo, recordsPerBatch);
+ columnInfo.columnMemoryQuota.maxNumValues = recordsPerBatch;
+ requiredMemory.fixedLenRequiredMemory += columnInfo.columnMemoryQuota.maxMemoryUsage;
+ }
+
+ for (ColumnMemoryInfo columnInfo : variableLengthColumns) {
+ columnInfo.columnMemoryQuota.maxMemoryUsage = computeVectorMemory(columnInfo, recordsPerBatch);
+ columnInfo.columnMemoryQuota.maxNumValues = recordsPerBatch;
+ requiredMemory.variableLenRequiredMemory += columnInfo.columnMemoryQuota.maxMemoryUsage;
+ }
+
+ final int totalMemoryNeeded = requiredMemory.fixedLenRequiredMemory + requiredMemory.variableLenRequiredMemory;
+ assert totalMemoryNeeded > 0;
+
+ double neededMemoryRatio = ((double) maxMemorySizePerBatch) / totalMemoryNeeded;
+
+ return neededMemoryRatio > 1 ? 1 : neededMemoryRatio;
+ }
+
+ private void dumpColumnMemoryQuotas() {
+ StringBuilder msg = new StringBuilder(BATCH_STATS_PREFIX);
+ msg.append(": Field Quotas:\n\tName\tType\tPrec\tQuota\n");
+
+ for (ColumnMemoryInfo columnInfo : columnMemoryInfoMap.values()) {
+ msg.append("\t");
+ msg.append(BATCH_STATS_PREFIX);
+ msg.append("\t");
+ msg.append(columnInfo.columnMeta.getField().getName());
+ msg.append("\t");
+ printType(columnInfo.columnMeta.getField(), msg);
+ msg.append("\t");
+ msg.append(columnInfo.columnPrecision);
+ msg.append("\t");
+ msg.append(columnInfo.columnMemoryQuota.maxMemoryUsage);
+ msg.append("\n");
+ }
+
+ logger.debug(msg.toString());
+ }
+
+ private static void printType(MaterializedField field, StringBuilder msg) {
+ final MajorType type = field.getType();
+
+ msg.append(type.getMinorType().name());
+ msg.append(':');
+ msg.append(type.getMode().name());
+ }
+
+// ----------------------------------------------------------------------------
+// Inner Data Structure
+// ----------------------------------------------------------------------------
+
+ /** An abstraction to allow column readers to attach custom field overflow state */
+ public interface FieldOverflowState {
+
+ /** Overflow data can become an input source for the next batch(es); this method
+ * allows the reader framework to inform individual readers of the number of
+ * values that have been consumed from the current overflow data
+ *
+ * @param numValues the number of values consumed within the current batch
+ */
+ void onNewBatchValuesConsumed(int numValues);
+
+ /**
+ * @return true if the overflow data has been fully consumed (all overflow data consumed by
+ * the Parquet reader)
+ */
+ boolean isOverflowDataFullyConsumed();
+ }
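+
+ // Illustrative sketch only (not part of this commit): a reader could implement
+ // this interface as a simple cursor over the overflow data, e.g.
+ //
+ // private static final class OverflowCursor implements FieldOverflowState {
+ //   private final int numOverflowValues; // total number of values in the overflow data
+ //   private int numConsumed;             // values consumed so far
+ //
+ //   private OverflowCursor(int numOverflowValues) { this.numOverflowValues = numOverflowValues; }
+ //
+ //   @Override
+ //   public void onNewBatchValuesConsumed(int numValues) { numConsumed += numValues; }
+ //
+ //   @Override
+ //   public boolean isOverflowDataFullyConsumed() { return numConsumed >= numOverflowValues; }
+ // }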
+
+ /** Container object to hold current field overflow state */
+ public static final class FieldOverflowStateContainer {
+ /** Field overflow definition */
+ public FieldOverflowDefinition overflowDef;
+ /** Field overflow state */
+ public FieldOverflowState overflowState;
+
+ public FieldOverflowStateContainer(FieldOverflowDefinition overflowDef, FieldOverflowState overflowState) {
+ this.overflowDef = overflowDef;
+ this.overflowState = overflowState;
+ }
+
+ private void release() {
+ if (overflowDef != null) {
+ if (logger.isDebugEnabled()) {
+ logger.debug(String.format(
+ "Releasing a buffer of length %d used to handle overflow data", overflowDef.buffer.capacity()));
+ }
+ overflowDef.buffer.release();
+ }
+ overflowDef = null;
+ overflowState = null;
+ }
+ }
+
+ /** Container object to supply variable columns statistics to the batch sizer */
+ public static final class VarLenColumnBatchStats {
+ /** Value vector associated with a VL column */
+ public final ValueVector vector;
+ /** Number of values read in the current batch */
+ public final int numValuesRead;
+
+ /**
+ * Constructor.
+ * @param vector value vector
+ * @param numValuesRead number of values read in the current batch
+ */
+ public VarLenColumnBatchStats(ValueVector vector, int numValuesRead) {
+ this.vector = vector;
+ this.numValuesRead = numValuesRead;
+ }
+ }
+
+ /** Field memory quota */
+ public static final class ColumnMemoryQuota {
+ /** Maximum cumulative memory that could be used */
+ private int maxMemoryUsage;
+ /** Maximum number of values that could be inserted */
+ private int maxNumValues;
+
+ public ColumnMemoryQuota() {
+ }
+
+ /**
+ * @param maxMemoryUsage maximum cumulative memory that could be used
+ */
+ public ColumnMemoryQuota(int maxMemoryUsage) {
+ this.maxMemoryUsage = maxMemoryUsage;
+ }
+
+ /**
+ * @return the maxMemoryUsage
+ */
+ public int getMaxMemoryUsage() {
+ return maxMemoryUsage;
+ }
+
+ /**
+ * @return the maxNumValues
+ */
+ public int getMaxNumValues() {
+ return maxNumValues;
+ }
+
+ void reset() {
+ maxMemoryUsage = 0;
+ maxNumValues = 0;
+ }
+ }
+
+ /** A container which holds a column's memory precision and current quota information */
+ static final class ColumnMemoryInfo {
+ /** Column metadata */
+ ParquetColumnMetadata columnMeta;
+ /** Column value precision (maximum length for VL columns) */
+ int columnPrecision;
+ /** Column current memory quota within a batch */
+ final ColumnMemoryQuota columnMemoryQuota = new ColumnMemoryQuota();
+ }
+
+ /** Memory requirements container */
+ static final class MemoryRequirementContainer {
+ /** Memory needed for the fixed length columns given a specific record count */
+ private int fixedLenRequiredMemory;
+ /** Memory needed for the variable length columns given a specific record count */
+ private int variableLenRequiredMemory;
+
+ private void reset() {
+ this.fixedLenRequiredMemory = 0;
+ this.variableLenRequiredMemory = 0;
+ }
+ }
+
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/util/record/RecordBatchStats.java b/exec/java-exec/src/main/java/org/apache/drill/exec/util/record/RecordBatchStats.java
new file mode 100644
index 0000000..8b213a8
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/util/record/RecordBatchStats.java
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.util.record;
+
+import org.apache.drill.common.types.TypeProtos.MajorType;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.FragmentContextImpl;
+import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
+import org.apache.drill.exec.proto.helper.QueryIdHelper;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.RecordBatchSizer;
+import org.apache.drill.exec.record.RecordBatchSizer.ColumnSize;
+
+/**
+ * Utility class to capture key record batch statistics.
+ */
+public final class RecordBatchStats {
+ /** A prefix for all batch stats to simplify search */
+ public static final String BATCH_STATS_PREFIX = "BATCH_STATS";
+
+ /** Helper class which loads contextual record batch logging options */
+ public static final class RecordBatchStatsContext {
+ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RecordBatchStatsContext.class);
+
+ /** Batch size logging for all readers */
+ private final boolean enableBatchSzLogging;
+ /** Fine grained batch size logging */
+ private final boolean enableFgBatchSzLogging;
+ /** Unique Operator Identifier */
+ private final String contextOperatorId;
+
+ /**
+ * @param context fragment context used to read the logging options
+ * @param oContext operator context used to build a unique operator identifier
+ */
+ public RecordBatchStatsContext(FragmentContext context, OperatorContext oContext) {
+ enableBatchSzLogging = context.getOptions().getBoolean(ExecConstants.STATS_LOGGING_BATCH_SIZE_OPTION);
+ enableFgBatchSzLogging = context.getOptions().getBoolean(ExecConstants.STATS_LOGGING_FG_BATCH_SIZE_OPTION);
+ contextOperatorId = new StringBuilder()
+ .append(getQueryId(context))
+ .append(":")
+ .append(oContext.getStats().getId())
+ .toString();
+ }
+
+ /**
+ * @return the enableBatchSzLogging
+ */
+ public boolean isEnableBatchSzLogging() {
+ return enableBatchSzLogging || enableFgBatchSzLogging || logger.isDebugEnabled();
+ }
+
+ /**
+ * @return the enableFgBatchSzLogging
+ */
+ public boolean isEnableFgBatchSzLogging() {
+ return enableFgBatchSzLogging || logger.isDebugEnabled();
+ }
+
+ /**
+ * @return whether stats messages should be logged at the info level (rather than debug)
+ */
+ public boolean useInfoLevelLogging() {
+ return isEnableBatchSzLogging() && !logger.isDebugEnabled();
+ }
+
+ /**
+ * @return the contextOperatorId
+ */
+ public String getContextOperatorId() {
+ return contextOperatorId;
+ }
+
+ private String getQueryId(FragmentContext context) {
+ if (context instanceof FragmentContextImpl) {
+ final FragmentHandle handle = ((FragmentContextImpl) context).getHandle();
+
+ if (handle != null) {
+ return QueryIdHelper.getQueryIdentifier(handle);
+ }
+ }
+ return "NA";
+ }
+ }
+
+ /**
+ * Constructs record batch statistics for the input record batch
+ *
+ * @param statsId instance identifier
+ * @param sourceId optional source identifier for scanners
+ * @param recordBatch a set of records
+ * @param verbose whether to include fine-grained stats
+ *
+ * @return a string containing the record batch statistics
+ */
+ public static String printRecordBatchStats(String statsId,
+ String sourceId,
+ RecordBatch recordBatch,
+ boolean verbose) {
+
+ final RecordBatchSizer batchSizer = new RecordBatchSizer(recordBatch);
+ final StringBuilder msg = new StringBuilder();
+
+ msg.append(BATCH_STATS_PREFIX);
+ msg.append(" Originator: {");
+ msg.append(statsId);
+ if (sourceId != null) {
+ msg.append(':');
+ msg.append(sourceId);
+ }
+ msg.append("}, Batch size: {");
+ msg.append( " Records: " );
+ msg.append(batchSizer.rowCount());
+ msg.append(", Total size: ");
+ msg.append(batchSizer.getActualSize());
+ msg.append(", Data size: ");
+ msg.append(batchSizer.getNetBatchSize());
+ msg.append(", Gross row width: ");
+ msg.append(batchSizer.getGrossRowWidth());
+ msg.append(", Net row width: ");
+ msg.append(batchSizer.getNetRowWidth());
+ msg.append(", Density: ");
+ msg.append(batchSizer.getAvgDensity());
+ msg.append("% }\n");
+
+ if (verbose) {
+ msg.append("Batch schema & sizes: {\n");
+ for (ColumnSize colSize : batchSizer.columns().values()) {
+ msg.append(BATCH_STATS_PREFIX);
+ msg.append("\t");
+ msg.append(statsId);
+ msg.append('\t');
+ msg.append(colSize.toString());
+ msg.append(" }\n");
+ }
+ msg.append(" }\n");
+ }
+
+ return msg.toString();
+ }
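+
+ // Following the appends above, an emitted line looks roughly like this
+ // (originator and numbers are made up for illustration):
+ //
+ // BATCH_STATS Originator: {ScanBatch:file.parquet}, Batch size: { Records: 4096, Total size: 1048576, Data size: 917504, Gross row width: 256, Net row width: 224, Density: 87% }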
+
+ /**
+ * Logs record batch statistics for the input record batch (logging happens only
+ * when record statistics logging is enabled).
+ *
+ * @param statsId instance identifier
+ * @param sourceId optional source identifier for scanners
+ * @param recordBatch a set of records
+ * @param batchStatsLogging contextual batch stats logging options
+ * @param logger logger where to print the record batch statistics
+ */
+ public static void logRecordBatchStats(String statsId,
+ String sourceId,
+ RecordBatch recordBatch,
+ RecordBatchStatsContext batchStatsLogging,
+ org.slf4j.Logger logger) {
+
+ if (!batchStatsLogging.isEnableBatchSzLogging()) {
+ return; // NOOP
+ }
+
+ final boolean verbose = batchStatsLogging.isEnableFgBatchSzLogging();
+ final String msg = printRecordBatchStats(statsId, sourceId, recordBatch, verbose);
+
+ if (batchStatsLogging.useInfoLevelLogging()) {
+ logger.info(msg);
+ } else {
+ logger.debug(msg);
+ }
+ }
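+
+ // Illustrative usage only, inside an operator that implements RecordBatch
+ // (the fields below are hypothetical):
+ //
+ // private final RecordBatchStatsContext batchStatsContext =
+ //   new RecordBatchStatsContext(context, oContext);
+ // ...
+ // RecordBatchStats.logRecordBatchStats("ScanBatch", filePath, this,
+ //   batchStatsContext, logger);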
+
+ /**
+ * Prints a materialized field type
+ * @param field materialized field
+ * @param msg string builder where to append the field type
+ */
+ public static void printFieldType(MaterializedField field, StringBuilder msg) {
+ final MajorType type = field.getType();
+
+ msg.append(type.getMinorType().name());
+ msg.append(':');
+ msg.append(type.getMode().name());
+ }
+
+ /**
+ * Dumps allocator statistics
+ *
+ * @param allocator buffer allocator
+ * @return string with allocator statistics
+ */
+ public static String printAllocatorStats(BufferAllocator allocator) {
+ StringBuilder msg = new StringBuilder();
+ msg.append(BATCH_STATS_PREFIX);
+ msg.append(": dumping allocator statistics:\n");
+ msg.append(BATCH_STATS_PREFIX);
+ msg.append(": ");
+ msg.append(allocator.toString());
+
+ return msg.toString();
+ }
+
+ /**
+ * Prevents instantiation of this utility class.
+ */
+ private RecordBatchStats() {
+ }
+
+}
\ No newline at end of file
diff --git a/exec/java-exec/src/main/resources/drill-module.conf b/exec/java-exec/src/main/resources/drill-module.conf
index 99caeab..23d59d3 100644
--- a/exec/java-exec/src/main/resources/drill-module.conf
+++ b/exec/java-exec/src/main/resources/drill-module.conf
@@ -482,6 +482,8 @@ drill.exec.options: {
exec.storage.enable_new_text_reader: true,
exec.udf.enable_dynamic_support: true,
exec.udf.use_dynamic: true,
+ drill.exec.stats.logging.batch_size: false,
+ drill.exec.stats.logging.fine_grained.batch_size: false,
new_view_default_permissions: 700,
org.apache.drill.exec.compile.ClassTransformer.scalar_replacement: "try",
planner.add_producer_consumer: false,
@@ -581,6 +583,10 @@ drill.exec.options: {
store.parquet.writer.logical_type_for_decimals: "fixed_len_byte_array",
store.parquet.writer.use_single_fs_block: false,
store.parquet.flat.reader.bulk: true,
+ store.parquet.flat.batch.num_records: 32767,
+ # Using common operators batch configuration unless the Parquet specific
+ # configuration is used
+ store.parquet.flat.batch.memory_size: 0,
store.partition.hash_distribute: false,
store.text.estimated_row_size_bytes: 100.0,
store.kafka.all_text_mode: false,
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/columnreaders/TestBatchSizingMemoryUtil.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/columnreaders/TestBatchSizingMemoryUtil.java
new file mode 100644
index 0000000..9f4e026
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/columnreaders/TestBatchSizingMemoryUtil.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.parquet.columnreaders;
+
+import java.math.BigDecimal;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.exec.physical.unit.PhysicalOpUnitTestBase;
+import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.store.parquet.columnreaders.batchsizing.BatchSizingMemoryUtil;
+import org.apache.drill.exec.store.parquet.columnreaders.batchsizing.BatchSizingMemoryUtil.ColumnMemoryUsageInfo;
+import org.apache.drill.exec.store.parquet.columnreaders.batchsizing.RecordBatchSizerManager.ColumnMemoryQuota;
+import org.apache.drill.test.rowSet.RowSet;
+import org.apache.drill.test.rowSet.RowSetBuilder;
+import org.apache.drill.test.rowSet.schema.SchemaBuilder;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestBatchSizingMemoryUtil extends PhysicalOpUnitTestBase {
+// private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestBatchSizingMemoryUtil.class);
+
+ // Batch schema
+ private static TupleMetadata schema;
+ private static TupleMetadata nullableSchema;
+
+ // Row set
+ private RowSet.SingleRowSet rowSet;
+
+ // Column memory usage information
+ private final ColumnMemoryUsageInfo[] columnMemoryInfo = new ColumnMemoryUsageInfo[3];
+
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ schema = new SchemaBuilder()
+ .add("name_vchar", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.REQUIRED)
+ .add("name_vbinary", TypeProtos.MinorType.VARBINARY, TypeProtos.DataMode.REQUIRED)
+ .add("name_vdecimal", TypeProtos.MinorType.VARDECIMAL, TypeProtos.DataMode.REQUIRED)
+ .buildSchema();
+
+ nullableSchema = new SchemaBuilder()
+ .add("name_vchar", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+ .add("name_vbinary", TypeProtos.MinorType.VARBINARY, TypeProtos.DataMode.OPTIONAL)
+ .add("name_vdecimal", TypeProtos.MinorType.VARDECIMAL, TypeProtos.DataMode.OPTIONAL)
+ .buildSchema();
+ }
+
+ @Test
+ public void testCanAddNewData() {
+ try {
+ testCanAddNewData(false);
+
+ } finally {
+ if (rowSet != null) {
+ rowSet.clear();
+ }
+ }
+ }
+
+ @Test
+ public void testCanAddNewNullableData() {
+ try {
+ testCanAddNewData(true);
+
+ } finally {
+ if (rowSet != null) {
+ rowSet.clear();
+ }
+ }
+ }
+
+ private void testCanAddNewData(boolean isOptional) {
+ final Object[] data = {"0123456789", new byte[10], new BigDecimal(Long.MAX_VALUE) };
+ final int numRows = 1;
+
+ // Load the test data into the associated Value Vectors
+ loadTestData(numRows, isOptional, data);
+
+ for (int columnIdx = 0; columnIdx < 3; columnIdx++) {
+ final ColumnMemoryUsageInfo columnInfo = columnMemoryInfo[columnIdx];
+ final int remainingBitsCapacity = getRemainingBitsCapacity(columnInfo);
+ final int remainingOffsetsCapacity = getRemainingOffsetsCapacity(columnInfo);
+ final int remainingDataCapacity = getRemainingDataCapacity(columnInfo);
+
+ // Test current VV is within quota (since we are not adding new entries)
+ Assert.assertTrue(BatchSizingMemoryUtil.canAddNewData(columnInfo, 0, 0, 0));
+
+ if (isOptional) {
+ // Test add the maximum offsets and data
+ Assert.assertTrue(BatchSizingMemoryUtil.canAddNewData(columnInfo, remainingBitsCapacity, remainingOffsetsCapacity, remainingDataCapacity));
+
+ // Test VV overflow: for bits, offsets, data, and then all
+ Assert.assertFalse(BatchSizingMemoryUtil.canAddNewData(columnInfo, remainingBitsCapacity + 1, remainingOffsetsCapacity, remainingDataCapacity));
+ Assert.assertFalse(BatchSizingMemoryUtil.canAddNewData(columnInfo, remainingBitsCapacity, remainingOffsetsCapacity + 1, remainingDataCapacity));
+ Assert.assertFalse(BatchSizingMemoryUtil.canAddNewData(columnInfo, remainingBitsCapacity, remainingOffsetsCapacity, remainingDataCapacity + 1));
+ Assert.assertFalse(BatchSizingMemoryUtil.canAddNewData(columnInfo, remainingBitsCapacity + 1, remainingOffsetsCapacity + 1, remainingDataCapacity + 1));
+ } else {
+ // Test add the maximum offsets and data
+ Assert.assertTrue(BatchSizingMemoryUtil.canAddNewData(columnInfo, 0, remainingOffsetsCapacity, remainingDataCapacity));
+
+ // Test VV overflow: for offsets, data, and then both
+ Assert.assertFalse(BatchSizingMemoryUtil.canAddNewData(columnInfo, 0, remainingOffsetsCapacity + 1, remainingDataCapacity));
+ Assert.assertFalse(BatchSizingMemoryUtil.canAddNewData(columnInfo, 0, remainingOffsetsCapacity, remainingDataCapacity + 1));
+ Assert.assertFalse(BatchSizingMemoryUtil.canAddNewData(columnInfo, 0, remainingOffsetsCapacity + 1, remainingDataCapacity + 1));
+ }
+ }
+ }
+
+ private void loadTestData(int numRows, boolean isOptional, Object...data) {
+ // First, let's create a row set
+ rowSet = null;
+ final TupleMetadata targetSchema = isOptional ? nullableSchema : schema;
+ final RowSetBuilder builder = operatorFixture.rowSetBuilder(targetSchema);
+
+ for (int rowIdx = 0; rowIdx < numRows; ++rowIdx) {
+ builder.addRow(data);
+ }
+ rowSet = builder.build();
+
+ // Now load the column memory information
+ for (int columnIdx = 0; columnIdx < columnMemoryInfo.length; columnIdx++) {
+ columnMemoryInfo[columnIdx] = getColumnMemoryUsageInfo(columnIdx);
+ }
+ }
+
+ private ColumnMemoryUsageInfo getColumnMemoryUsageInfo(int columnIdx) {
+ final VectorContainer vectorContainer = rowSet.container();
+ final ColumnMemoryUsageInfo result = new ColumnMemoryUsageInfo();
+
+ result.vector = vectorContainer.getValueVector(columnIdx).getValueVector();
+ result.currValueCount = vectorContainer.getRecordCount();
+ result.memoryQuota = new ColumnMemoryQuota(result.vector.getAllocatedSize());
+
+ // Load the VV memory usage information
+ BatchSizingMemoryUtil.getMemoryUsage(result.vector, result.currValueCount, result.vectorMemoryUsage);
+
+ return result;
+ }
+
+ private static int getRemainingBitsCapacity(ColumnMemoryUsageInfo columnInfo) {
+ return columnInfo.vectorMemoryUsage.bitsBytesCapacity - columnInfo.vectorMemoryUsage.bitsBytesUsed;
+ }
+
+ private static int getRemainingOffsetsCapacity(ColumnMemoryUsageInfo columnInfo) {
+ return columnInfo.vectorMemoryUsage.offsetsByteCapacity - columnInfo.vectorMemoryUsage.offsetsBytesUsed;
+ }
+
+ private static int getRemainingDataCapacity(ColumnMemoryUsageInfo columnInfo) {
+ return columnInfo.vectorMemoryUsage.dataByteCapacity - columnInfo.vectorMemoryUsage.dataBytesUsed;
+ }
+
+}
diff --git a/exec/vector/src/main/codegen/templates/NullableValueVectors.java b/exec/vector/src/main/codegen/templates/NullableValueVectors.java
index cdff556..f30cfae 100644
--- a/exec/vector/src/main/codegen/templates/NullableValueVectors.java
+++ b/exec/vector/src/main/codegen/templates/NullableValueVectors.java
@@ -395,6 +395,13 @@ public final class ${className} extends BaseDataValueVector implements <#if type
return v;
}
+ /**
+ * @return Underlying "bits" vector value capacity
+ */
+ public int getBitsValueCapacity() {
+ return bits.getValueCapacity();
+ }
+
public void copyFrom(int fromIndex, int thisIndex, Nullable${minor.class}Vector from){
final Accessor fromAccessor = from.getAccessor();
if (!fromAccessor.isNull(fromIndex)) {
diff --git a/exec/vector/src/main/codegen/templates/VariableLengthVectors.java b/exec/vector/src/main/codegen/templates/VariableLengthVectors.java
index daef7ba..c35728e 100644
--- a/exec/vector/src/main/codegen/templates/VariableLengthVectors.java
+++ b/exec/vector/src/main/codegen/templates/VariableLengthVectors.java
@@ -632,6 +632,10 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V
// Let's process the input
while (input.hasNext()) {
T entry = input.next();
+
+ if (entry == null || entry.getNumValues() == 0) {
+ break; // this could happen when handling columnar batch sizing constraints
+ }
bufferedMutator.setSafe(entry);
if (callback != null) {
@@ -897,7 +901,7 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V
this.parent = parent;
this.dataBuffOff = this.parent.offsetVector.getAccessor().get(startIdx);
this.totalDataLen = this.dataBuffOff;
- this.offsetsMutator = new UInt4Vector.BufferedMutator(startIdx, buffSz * 4, parent.offsetVector);
+ this.offsetsMutator = new UInt4Vector.BufferedMutator(startIdx, buffSz, parent.offsetVector);
// Forcing the offsetsMutator to operate at index+1
this.offsetsMutator.setSafe(this.dataBuffOff);
@@ -970,6 +974,7 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V
// Update counters
dataBuffOff += buffer.position();
+ assert dataBuffOff == totalDataLen;
// Reset the byte buffer
buffer.clear();
@@ -1008,6 +1013,11 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V
remaining -= toCopy;
} while (remaining > 0);
+
+ // We need to flush as offset data can be accessed during loading to
+ // figure out current payload size.
+ offsetsMutator.flush();
+
}
}
}