You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by ti...@apache.org on 2018/06/30 01:52:27 UTC

[drill] branch master updated: DRILL-6147: Adding Columnar Parquet Batch Sizing functionality

This is an automated email from the ASF dual-hosted git repository.

timothyfarkas pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git


The following commit(s) were added to refs/heads/master by this push:
     new f481a7c  DRILL-6147: Adding Columnar Parquet Batch Sizing functionality
f481a7c is described below

commit f481a7c2833b8c7ebabe02a37590cdd3e559ca5e
Author: Salim Achouche <sa...@gmail.com>
AuthorDate: Tue Jun 19 19:23:41 2018 -0700

    DRILL-6147: Adding Columnar Parquet Batch Sizing functionality
    
    closes #1330
---
 .../java/org/apache/drill/exec/ExecConstants.java  |  16 +
 .../apache/drill/exec/physical/impl/ScanBatch.java |  45 ++
 .../exec/server/options/SystemOptionManager.java   |   4 +
 .../store/parquet/columnreaders/BatchReader.java   |  53 +-
 .../store/parquet/columnreaders/BitReader.java     |   4 +-
 .../store/parquet/columnreaders/ColumnReader.java  |   2 +-
 .../parquet/columnreaders/ColumnReaderFactory.java | 115 ++--
 .../columnreaders/FixedByteAlignedReader.java      |  32 +-
 .../columnreaders/FixedWidthRepeatedReader.java    |   6 +-
 .../parquet/columnreaders/NullableBitReader.java   |   4 +-
 .../columnreaders/NullableColumnReader.java        |   4 +-
 .../NullableFixedByteAlignedReaders.java           |  64 +-
 .../NullableVarLengthValuesColumn.java             |   4 +-
 .../columnreaders/ParquetColumnMetadata.java       |  18 +-
 .../ParquetFixedWidthDictionaryReaders.java        |  44 +-
 .../parquet/columnreaders/ParquetRecordReader.java |  74 ++-
 .../store/parquet/columnreaders/ParquetSchema.java |  18 +-
 .../store/parquet/columnreaders/ReadState.java     |  76 ++-
 .../columnreaders/VarLenAbstractEntryReader.java   |  75 +--
 .../VarLenAbstractPageEntryReader.java             | 100 ++++
 .../parquet/columnreaders/VarLenBinaryReader.java  | 239 +++++++-
 .../columnreaders/VarLenBulkPageReader.java        |  78 ++-
 .../columnreaders/VarLenColumnBulkEntry.java       |  16 +
 .../columnreaders/VarLenColumnBulkInput.java       | 272 +++++++--
 .../columnreaders/VarLenEntryDictionaryReader.java |  14 +-
 .../parquet/columnreaders/VarLenEntryReader.java   |  30 +-
 .../columnreaders/VarLenFixedEntryReader.java      |   8 +-
 .../VarLenNullableDictionaryReader.java            |  16 +-
 .../columnreaders/VarLenNullableEntryReader.java   |  14 +-
 .../VarLenNullableFixedEntryReader.java            |   8 +-
 .../columnreaders/VarLenOverflowReader.java        | 372 ++++++++++++
 .../parquet/columnreaders/VarLengthColumn.java     |   4 +-
 .../columnreaders/VarLengthColumnReaders.java      |  24 +-
 .../columnreaders/VarLengthValuesColumn.java       |   4 +-
 .../batchsizing/BatchOverflowOptimizer.java        | 147 +++++
 .../batchsizing/BatchSizingMemoryUtil.java         | 329 ++++++++++
 .../batchsizing/OverflowSerDeUtil.java             | 366 ++++++++++++
 .../batchsizing/RecordBatchOverflow.java           | 177 ++++++
 .../batchsizing/RecordBatchSizerManager.java       | 661 +++++++++++++++++++++
 .../drill/exec/util/record/RecordBatchStats.java   | 225 +++++++
 .../java-exec/src/main/resources/drill-module.conf |   6 +
 .../columnreaders/TestBatchSizingMemoryUtil.java   | 167 ++++++
 .../codegen/templates/NullableValueVectors.java    |   7 +
 .../codegen/templates/VariableLengthVectors.java   |  12 +-
 44 files changed, 3525 insertions(+), 429 deletions(-)

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
index 634f670..bc16272 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -316,6 +316,13 @@ public final class ExecConstants {
   public static final String PARQUET_FLAT_READER_BULK = "store.parquet.flat.reader.bulk";
   public static final OptionValidator PARQUET_FLAT_READER_BULK_VALIDATOR = new BooleanValidator(PARQUET_FLAT_READER_BULK);
 
+  // Controls the flat parquet reader batching constraints (number of records and memory limit)
+  public static final String PARQUET_FLAT_BATCH_NUM_RECORDS = "store.parquet.flat.batch.num_records";
+  public static final OptionValidator PARQUET_FLAT_BATCH_NUM_RECORDS_VALIDATOR = new RangeLongValidator(PARQUET_FLAT_BATCH_NUM_RECORDS, 1, Integer.MAX_VALUE);
+  public static final String PARQUET_FLAT_BATCH_MEMORY_SIZE = "store.parquet.flat.batch.memory_size";
+  // This configuration is used to override the common memory batch sizing configuration property
+  public static final OptionValidator PARQUET_FLAT_BATCH_MEMORY_SIZE_VALIDATOR = new RangeLongValidator(PARQUET_FLAT_BATCH_MEMORY_SIZE, 0, Integer.MAX_VALUE);
+
   public static final String JSON_ALL_TEXT_MODE = "store.json.all_text_mode";
   public static final BooleanValidator JSON_READER_ALL_TEXT_MODE_VALIDATOR = new BooleanValidator(JSON_ALL_TEXT_MODE);
   public static final BooleanValidator JSON_EXTENDED_TYPES = new BooleanValidator("store.json.extended_types");
@@ -689,4 +696,13 @@ public final class ExecConstants {
 
   public static final String ALLOW_LOOPBACK_ADDRESS_BINDING = "drill.exec.allow_loopback_address_binding";
 
+  /** Enables batch size statistics logging */
+  public static final String STATS_LOGGING_BATCH_SIZE_OPTION = "drill.exec.stats.logging.batch_size";
+  public static final BooleanValidator STATS_LOGGING_BATCH_SIZE_VALIDATOR = new BooleanValidator(STATS_LOGGING_BATCH_SIZE_OPTION);
+
+  /** Enables fine-grained batch size statistics logging */
+  public static final String STATS_LOGGING_FG_BATCH_SIZE_OPTION = "drill.exec.stats.logging.fine_grained.batch_size";
+  public static final BooleanValidator STATS_LOGGING_BATCH_FG_SIZE_VALIDATOR = new BooleanValidator(STATS_LOGGING_FG_BATCH_SIZE_OPTION);
+
+
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
index 4a62752..09e785e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
@@ -47,6 +47,8 @@ import org.apache.drill.exec.store.RecordReader;
 import org.apache.drill.exec.testing.ControlsInjector;
 import org.apache.drill.exec.testing.ControlsInjectorFactory;
 import org.apache.drill.exec.util.CallBack;
+import org.apache.drill.exec.util.record.RecordBatchStats;
+import org.apache.drill.exec.util.record.RecordBatchStats.RecordBatchStatsContext;
 import org.apache.drill.exec.vector.AllocationHelper;
 import org.apache.drill.exec.vector.NullableVarCharVector;
 import org.apache.drill.exec.vector.SchemaChangeCallBack;
@@ -80,6 +82,8 @@ public class ScanBatch implements CloseableRecordBatch {
   private final BufferAllocator allocator;
   private final List<Map<String, String>> implicitColumnList;
   private String currentReaderClassName;
+  private final RecordBatchStatsContext batchStatsLogging;
+
   /**
    *
    * @param context
@@ -117,6 +121,7 @@ public class ScanBatch implements CloseableRecordBatch {
       this.implicitColumnList = implicitColumnList;
       addImplicitVectors();
       currentReader = null;
+      batchStatsLogging = new RecordBatchStatsContext(context, oContext);
     } finally {
       oContext.getStats().stopProcessing();
     }
@@ -174,6 +179,7 @@ public class ScanBatch implements CloseableRecordBatch {
         boolean isNewSchema = mutator.isNewSchema();
         populateImplicitVectorsAndSetCount();
         oContext.getStats().batchReceived(0, recordCount, isNewSchema);
+        logRecordBatchStats();
 
         if (recordCount == 0) {
           currentReader.close();
@@ -291,6 +297,45 @@ public class ScanBatch implements CloseableRecordBatch {
     return container.getValueAccessorById(clazz, ids);
   }
 
+  private void logRecordBatchStats() {
+    final int MAX_FQN_LENGTH = 50;
+
+    if (recordCount == 0) {
+      return; // NOOP
+    }
+
+    RecordBatchStats.logRecordBatchStats(
+      batchStatsLogging.getContextOperatorId(),
+      getFQNForLogging(MAX_FQN_LENGTH),
+      this,
+      batchStatsLogging,
+      logger);
+  }
+
+  /** Returns the FQN for logging; if it is longer than maxLength, only its last maxLength characters are kept */
+  private String getFQNForLogging(int maxLength) {
+    final String FQNKey = "FQN";
+    final ValueVector v = mutator.implicitFieldVectorMap.get(FQNKey);
+
+    final Object fqnObj;
+
+    if (v == null
+     || v.getAccessor().getValueCount() == 0
+     || (fqnObj = ((NullableVarCharVector) v).getAccessor().getObject(0)) == null) {
+
+      return "NA";
+    }
+
+    String fqn = fqnObj.toString();
+
+    if (fqn != null && fqn.length() > maxLength) {
+      fqn = fqn.substring(fqn.length() - maxLength, fqn.length());
+    }
+
+    return fqn;
+  }
+
+
   /**
    * Row set mutator implementation provided to record readers created by
    * this scan batch. Made visible so that tests can create this mutator
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
index 06b4c57..e6368f5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
@@ -149,6 +149,8 @@ public class SystemOptionManager extends BaseOptionManager implements AutoClosea
       new OptionDefinition(ExecConstants.PARQUET_PAGEREADER_USE_FADVISE_VALIDATOR),
       new OptionDefinition(ExecConstants.PARQUET_READER_INT96_AS_TIMESTAMP_VALIDATOR),
       new OptionDefinition(ExecConstants.PARQUET_FLAT_READER_BULK_VALIDATOR),
+      new OptionDefinition(ExecConstants.PARQUET_FLAT_BATCH_NUM_RECORDS_VALIDATOR, new OptionMetaData(OptionValue.AccessibleScopes.SYSTEM_AND_SESSION, true, true)),
+      new OptionDefinition(ExecConstants.PARQUET_FLAT_BATCH_MEMORY_SIZE_VALIDATOR, new OptionMetaData(OptionValue.AccessibleScopes.SYSTEM_AND_SESSION, true, true)),
       new OptionDefinition(ExecConstants.JSON_READER_ALL_TEXT_MODE_VALIDATOR),
       new OptionDefinition(ExecConstants.JSON_WRITER_NAN_INF_NUMBERS_VALIDATOR),
       new OptionDefinition(ExecConstants.JSON_READER_NAN_INF_NUMBERS_VALIDATOR),
@@ -229,6 +231,8 @@ public class SystemOptionManager extends BaseOptionManager implements AutoClosea
       new OptionDefinition(ExecConstants.ENABLE_VECTOR_VALIDATOR),
       new OptionDefinition(ExecConstants.ENABLE_ITERATOR_VALIDATOR),
       new OptionDefinition(ExecConstants.OUTPUT_BATCH_SIZE_VALIDATOR, new OptionMetaData(OptionValue.AccessibleScopes.SYSTEM, true, false)),
+      new OptionDefinition(ExecConstants.STATS_LOGGING_BATCH_SIZE_VALIDATOR, new OptionMetaData(OptionValue.AccessibleScopes.SYSTEM_AND_SESSION, true, true)),
+      new OptionDefinition(ExecConstants.STATS_LOGGING_BATCH_FG_SIZE_VALIDATOR,new OptionMetaData(OptionValue.AccessibleScopes.SYSTEM_AND_SESSION, true, true)),
       new OptionDefinition(ExecConstants.FRAG_RUNNER_RPC_TIMEOUT_VALIDATOR, new OptionMetaData(OptionValue.AccessibleScopes.SYSTEM, true, true)),
     };
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/BatchReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/BatchReader.java
index 367b226..25dfbc8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/BatchReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/BatchReader.java
@@ -21,7 +21,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
-
+import com.google.common.base.Preconditions;
 import com.google.common.base.Stopwatch;
 import com.google.common.collect.Lists;
 
@@ -38,14 +38,14 @@ public abstract class BatchReader {
 
   public int readBatch() throws Exception {
     ColumnReader<?> firstColumnStatus = readState.getFirstColumnReader();
-    long recordsToRead = Math.min(getReadCount(firstColumnStatus), readState.getRecordsToRead());
+    int currBatchNumRecords = readState.batchSizerMgr().getCurrentRecordsPerBatch();
+    long recordsToRead = Math.min(currBatchNumRecords, readState.getRemainingValuesToRead());
     int readCount = readRecords(firstColumnStatus, recordsToRead);
+
     readState.fillNullVectors(readCount);
     return readCount;
   }
 
-  protected abstract long getReadCount(ColumnReader<?> firstColumnStatus);
-
   protected abstract int readRecords(ColumnReader<?> firstColumnStatus, long recordsToRead) throws Exception;
 
   protected void readAllFixedFields(long recordsToRead) throws Exception {
@@ -59,14 +59,14 @@ public abstract class BatchReader {
   }
 
   protected void readAllFixedFieldsSerial(long recordsToRead) throws IOException {
-    for (ColumnReader<?> crs : readState.getColumnReaders()) {
+    for (ColumnReader<?> crs : readState.getFixedLenColumnReaders()) {
       crs.processPages(recordsToRead);
     }
   }
 
   protected void readAllFixedFieldsParallel(long recordsToRead) throws Exception {
     ArrayList<Future<Long>> futures = Lists.newArrayList();
-    for (ColumnReader<?> crs : readState.getColumnReaders()) {
+    for (ColumnReader<?> crs : readState.getFixedLenColumnReaders()) {
       Future<Long> f = crs.processPagesAsync(recordsToRead);
       if (f != null) {
         futures.add(f);
@@ -106,15 +106,6 @@ public abstract class BatchReader {
     }
 
     @Override
-    protected long getReadCount(ColumnReader<?> firstColumnStatus) {
-      if (readState.recordsRead() == readState.schema().getGroupRecordCount()) {
-        return 0;
-      }
-      return Math.min(ParquetRecordReader.DEFAULT_RECORDS_TO_READ_IF_VARIABLE_WIDTH,
-                      readState.schema().getGroupRecordCount() - readState.recordsRead());
-    }
-
-    @Override
     protected int readRecords(ColumnReader<?> firstColumnStatus, long recordsToRead) {
       readState.updateCounts((int) recordsToRead);
       return (int) recordsToRead;
@@ -133,15 +124,15 @@ public abstract class BatchReader {
     }
 
     @Override
-    protected long getReadCount(ColumnReader<?> firstColumnStatus) {
-      return Math.min(readState.schema().getRecordsPerBatch(),
-                      firstColumnStatus.columnChunkMetaData.getValueCount() - firstColumnStatus.totalValuesRead);
-    }
-
-    @Override
     protected int readRecords(ColumnReader<?> firstColumnStatus, long recordsToRead) throws Exception {
       readAllFixedFields(recordsToRead);
-      return firstColumnStatus.getRecordsReadInCurrentPass();
+
+      Preconditions.checkNotNull(firstColumnStatus); // check the reader itself; a boxed boolean expression is never null
+      readState.setValuesReadInCurrentPass(firstColumnStatus.getRecordsReadInCurrentPass()); // get the number of rows read
+
+      readState.updateCounts((int) recordsToRead); // update the shared Reader State
+
+      return readState.getValuesReadInCurrentPass();
     }
   }
 
@@ -157,15 +148,21 @@ public abstract class BatchReader {
     }
 
     @Override
-    protected long getReadCount(ColumnReader<?> firstColumnStatus) {
-      return ParquetRecordReader.DEFAULT_RECORDS_TO_READ_IF_VARIABLE_WIDTH;
-    }
-
-    @Override
     protected int readRecords(ColumnReader<?> firstColumnStatus, long recordsToRead) throws Exception {
+      // We should not rely on the "firstColumnStatus.getRecordsReadInCurrentPass()" when dealing
+      // with variable length columns as each might return a different number of records. The batch size
+      // will be the lowest value. The variable column readers will update the "readState" object to
+      // reflect the correct information.
       long fixedRecordsToRead = readState.varLengthReader().readFields(recordsToRead);
       readAllFixedFields(fixedRecordsToRead);
-      return firstColumnStatus.getRecordsReadInCurrentPass();
+
+      // Sanity check: the fixed readers must have read the expected number of rows
+      Preconditions.checkArgument(firstColumnStatus == null
+        || firstColumnStatus.getRecordsReadInCurrentPass() == readState.getValuesReadInCurrentPass());
+
+      readState.updateCounts((int) fixedRecordsToRead);
+
+      return readState.getValuesReadInCurrentPass();
     }
   }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/BitReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/BitReader.java
index 8865f0b..b4f1f25 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/BitReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/BitReader.java
@@ -25,9 +25,9 @@ import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
 
 final class BitReader extends ColumnReader<BitVector> {
 
-  BitReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData,
+  BitReader(ParquetRecordReader parentReader, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData,
             boolean fixedLength, BitVector v, SchemaElement schemaElement) throws ExecutionSetupException {
-    super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
+    super(parentReader, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
   }
 
   @Override
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReader.java
index dabd8ca..c343d31 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReader.java
@@ -83,7 +83,7 @@ public abstract class ColumnReader<V extends ValueVector> {
 
   volatile boolean isShuttingDown; //Indicate to not submit any new AsyncPageReader Tasks during clear()
 
-  protected ColumnReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,
+  protected ColumnReader(ParquetRecordReader parentReader, ColumnDescriptor descriptor,
       ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, V v, SchemaElement schemaElement) throws ExecutionSetupException {
     this.parentReader = parentReader;
     this.columnDescriptor = descriptor;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReaderFactory.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReaderFactory.java
index ad5d23a..798d3c2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReaderFactory.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReaderFactory.java
@@ -60,12 +60,11 @@ public class ColumnReaderFactory {
    * @param fixedLength
    * @param descriptor
    * @param columnChunkMetaData
-   * @param allocateSize - the size of the vector to create
-   * @return
+   * @return ColumnReader object instance
    * @throws SchemaChangeException
    */
   static ColumnReader<?> createFixedColumnReader(ParquetRecordReader recordReader, boolean fixedLength, ColumnDescriptor descriptor,
-                                               ColumnChunkMetaData columnChunkMetaData, int allocateSize, ValueVector v,
+                                               ColumnChunkMetaData columnChunkMetaData, ValueVector v,
                                                SchemaElement schemaElement)
       throws Exception {
     ConvertedType convertedType = schemaElement.getConverted_type();
@@ -73,30 +72,30 @@ public class ColumnReaderFactory {
     // ColumnReader for actually transferring data into the data vector inside of our repeated vector
     if (descriptor.getMaxDefinitionLevel() == 0 || descriptor.getMaxRepetitionLevel() > 0) {
       if (columnChunkMetaData.getType() == PrimitiveType.PrimitiveTypeName.BOOLEAN){
-        return new BitReader(recordReader, allocateSize, descriptor, columnChunkMetaData,
+        return new BitReader(recordReader, descriptor, columnChunkMetaData,
             fixedLength, (BitVector) v, schemaElement);
       } else if (!columnChunkMetaData.getEncodings().contains(Encoding.PLAIN_DICTIONARY) && (
           columnChunkMetaData.getType() == PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY
               || columnChunkMetaData.getType() == PrimitiveType.PrimitiveTypeName.INT96)) {
         switch (convertedType) {
           case DECIMAL:
-            return new FixedByteAlignedReader.VarDecimalReader(recordReader, allocateSize, descriptor,
+            return new FixedByteAlignedReader.VarDecimalReader(recordReader, descriptor,
                 columnChunkMetaData, fixedLength, (VarDecimalVector) v, schemaElement);
           case INTERVAL:
-            return new FixedByteAlignedReader.IntervalReader(recordReader, allocateSize, descriptor,
+            return new FixedByteAlignedReader.IntervalReader(recordReader, descriptor,
                 columnChunkMetaData, fixedLength, (IntervalVector) v, schemaElement);
           default:
-            return new FixedByteAlignedReader.FixedBinaryReader(recordReader, allocateSize, descriptor,
+            return new FixedByteAlignedReader.FixedBinaryReader(recordReader, descriptor,
                 columnChunkMetaData, (VariableWidthVector) v, schemaElement);
         }
       } else if (columnChunkMetaData.getType() == PrimitiveType.PrimitiveTypeName.INT32 && convertedType == ConvertedType.DATE){
         switch(recordReader.getDateCorruptionStatus()) {
           case META_SHOWS_CORRUPTION:
-            return new FixedByteAlignedReader.CorruptDateReader(recordReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, (DateVector) v, schemaElement);
+            return new FixedByteAlignedReader.CorruptDateReader(recordReader, descriptor, columnChunkMetaData, fixedLength, (DateVector) v, schemaElement);
           case META_SHOWS_NO_CORRUPTION:
-            return new FixedByteAlignedReader.DateReader(recordReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, (DateVector) v, schemaElement);
+            return new FixedByteAlignedReader.DateReader(recordReader, descriptor, columnChunkMetaData, fixedLength, (DateVector) v, schemaElement);
           case META_UNCLEAR_TEST_VALUES:
-            return new FixedByteAlignedReader.CorruptionDetectingDateReader(recordReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, (DateVector) v, schemaElement);
+            return new FixedByteAlignedReader.CorruptionDetectingDateReader(recordReader, descriptor, columnChunkMetaData, fixedLength, (DateVector) v, schemaElement);
           default:
             throw new ExecutionSetupException(
                 String.format("Issue setting up parquet reader for date type, " +
@@ -108,78 +107,78 @@ public class ColumnReaderFactory {
           switch (columnChunkMetaData.getType()) {
             case INT32:
               if (convertedType == null) {
-                return new ParquetFixedWidthDictionaryReaders.DictionaryIntReader(recordReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, (IntVector) v, schemaElement);
+                return new ParquetFixedWidthDictionaryReaders.DictionaryIntReader(recordReader, descriptor, columnChunkMetaData, fixedLength, (IntVector) v, schemaElement);
               }
               switch (convertedType) {
                 case DECIMAL:
-                  return new ParquetFixedWidthDictionaryReaders.DictionaryVarDecimalReader(recordReader, allocateSize,
+                  return new ParquetFixedWidthDictionaryReaders.DictionaryVarDecimalReader(recordReader,
                       descriptor, columnChunkMetaData, fixedLength, (VarDecimalVector) v, schemaElement);
                 case TIME_MILLIS:
-                  return new ParquetFixedWidthDictionaryReaders.DictionaryTimeReader(recordReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, (TimeVector) v, schemaElement);
+                  return new ParquetFixedWidthDictionaryReaders.DictionaryTimeReader(recordReader, descriptor, columnChunkMetaData, fixedLength, (TimeVector) v, schemaElement);
                 case INT_8:
                 case INT_16:
                 case INT_32:
-                  return new ParquetFixedWidthDictionaryReaders.DictionaryIntReader(recordReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, (IntVector) v, schemaElement);
+                  return new ParquetFixedWidthDictionaryReaders.DictionaryIntReader(recordReader, descriptor, columnChunkMetaData, fixedLength, (IntVector) v, schemaElement);
                 case UINT_8:
                 case UINT_16:
                 case UINT_32:
-                  return new ParquetFixedWidthDictionaryReaders.DictionaryUInt4Reader(recordReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, (UInt4Vector) v, schemaElement);
+                  return new ParquetFixedWidthDictionaryReaders.DictionaryUInt4Reader(recordReader, descriptor, columnChunkMetaData, fixedLength, (UInt4Vector) v, schemaElement);
                 default:
                   throw new ExecutionSetupException("Unsupported dictionary converted type " + convertedType + " for primitive type INT32");
               }
             case INT64:
               if (convertedType == null) {
-                return new ParquetFixedWidthDictionaryReaders.DictionaryBigIntReader(recordReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, (BigIntVector) v, schemaElement);
+                return new ParquetFixedWidthDictionaryReaders.DictionaryBigIntReader(recordReader, descriptor, columnChunkMetaData, fixedLength, (BigIntVector) v, schemaElement);
               }
               switch (convertedType) {
                 case INT_64:
-                  return new ParquetFixedWidthDictionaryReaders.DictionaryBigIntReader(recordReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, (BigIntVector) v, schemaElement);
+                  return new ParquetFixedWidthDictionaryReaders.DictionaryBigIntReader(recordReader, descriptor, columnChunkMetaData, fixedLength, (BigIntVector) v, schemaElement);
                 case UINT_64:
-                  return new ParquetFixedWidthDictionaryReaders.DictionaryUInt8Reader(recordReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, (UInt8Vector) v, schemaElement);
+                  return new ParquetFixedWidthDictionaryReaders.DictionaryUInt8Reader(recordReader, descriptor, columnChunkMetaData, fixedLength, (UInt8Vector) v, schemaElement);
                 case DECIMAL:
-                  return new ParquetFixedWidthDictionaryReaders.DictionaryVarDecimalReader(recordReader, allocateSize,
+                  return new ParquetFixedWidthDictionaryReaders.DictionaryVarDecimalReader(recordReader,
                       descriptor, columnChunkMetaData, fixedLength, (VarDecimalVector) v, schemaElement);
                 case TIMESTAMP_MILLIS:
-                  return new ParquetFixedWidthDictionaryReaders.DictionaryTimeStampReader(recordReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, (TimeStampVector) v, schemaElement);
+                  return new ParquetFixedWidthDictionaryReaders.DictionaryTimeStampReader(recordReader, descriptor, columnChunkMetaData, fixedLength, (TimeStampVector) v, schemaElement);
                 default:
                   throw new ExecutionSetupException("Unsupported dictionary converted type " + convertedType + " for primitive type INT64");
               }
             case FLOAT:
-              return new ParquetFixedWidthDictionaryReaders.DictionaryFloat4Reader(recordReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, (Float4Vector) v, schemaElement);
+              return new ParquetFixedWidthDictionaryReaders.DictionaryFloat4Reader(recordReader, descriptor, columnChunkMetaData, fixedLength, (Float4Vector) v, schemaElement);
             case DOUBLE:
-              return new ParquetFixedWidthDictionaryReaders.DictionaryFloat8Reader(recordReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, (Float8Vector) v, schemaElement);
+              return new ParquetFixedWidthDictionaryReaders.DictionaryFloat8Reader(recordReader, descriptor, columnChunkMetaData, fixedLength, (Float8Vector) v, schemaElement);
             case FIXED_LEN_BYTE_ARRAY:
-              return new ParquetFixedWidthDictionaryReaders.DictionaryFixedBinaryReader(recordReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, (VarBinaryVector) v, schemaElement);
+              return new ParquetFixedWidthDictionaryReaders.DictionaryFixedBinaryReader(recordReader, descriptor, columnChunkMetaData, fixedLength, (VarBinaryVector) v, schemaElement);
             case INT96:
               if (recordReader.getFragmentContext().getOptions().getOption(ExecConstants.PARQUET_READER_INT96_AS_TIMESTAMP).bool_val) {
-                return new ParquetFixedWidthDictionaryReaders.DictionaryBinaryAsTimeStampReader(recordReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, (TimeStampVector) v, schemaElement);
+                return new ParquetFixedWidthDictionaryReaders.DictionaryBinaryAsTimeStampReader(recordReader, descriptor, columnChunkMetaData, fixedLength, (TimeStampVector) v, schemaElement);
               } else {
-                return new ParquetFixedWidthDictionaryReaders.DictionaryFixedBinaryReader(recordReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, (VarBinaryVector) v, schemaElement);
+                return new ParquetFixedWidthDictionaryReaders.DictionaryFixedBinaryReader(recordReader, descriptor, columnChunkMetaData, fixedLength, (VarBinaryVector) v, schemaElement);
               }
             default:
               throw new ExecutionSetupException("Unsupported dictionary column type " + descriptor.getType().name() );
           }
 
         } else if (convertedType == ConvertedType.DECIMAL) {
-          return new FixedByteAlignedReader.VarDecimalReader(recordReader, allocateSize,
+          return new FixedByteAlignedReader.VarDecimalReader(recordReader,
             descriptor, columnChunkMetaData, fixedLength, (VarDecimalVector) v, schemaElement);
         } else {
-          return new FixedByteAlignedReader<>(recordReader, allocateSize, descriptor, columnChunkMetaData,
+          return new FixedByteAlignedReader<>(recordReader, descriptor, columnChunkMetaData,
               fixedLength, v, schemaElement);
         }
       }
     } else { // if the column is nullable
       if (columnChunkMetaData.getType() == PrimitiveType.PrimitiveTypeName.BOOLEAN) {
-        return new NullableBitReader(recordReader, allocateSize, descriptor, columnChunkMetaData,
+        return new NullableBitReader(recordReader, descriptor, columnChunkMetaData,
             fixedLength, (NullableBitVector) v, schemaElement);
       } else if (columnChunkMetaData.getType() == PrimitiveType.PrimitiveTypeName.INT32 && convertedType == ConvertedType.DATE) {
         switch(recordReader.getDateCorruptionStatus()) {
           case META_SHOWS_CORRUPTION:
-            return new NullableFixedByteAlignedReaders.NullableCorruptDateReader(recordReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, (NullableDateVector)v, schemaElement);
+            return new NullableFixedByteAlignedReaders.NullableCorruptDateReader(recordReader, descriptor, columnChunkMetaData, fixedLength, (NullableDateVector)v, schemaElement);
           case META_SHOWS_NO_CORRUPTION:
-            return new NullableFixedByteAlignedReaders.NullableDateReader(recordReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, (NullableDateVector) v, schemaElement);
+            return new NullableFixedByteAlignedReaders.NullableDateReader(recordReader, descriptor, columnChunkMetaData, fixedLength, (NullableDateVector) v, schemaElement);
           case META_UNCLEAR_TEST_VALUES:
-            return new NullableFixedByteAlignedReaders.CorruptionDetectingNullableDateReader(recordReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, (NullableDateVector) v, schemaElement);
+            return new NullableFixedByteAlignedReaders.CorruptionDetectingNullableDateReader(recordReader, descriptor, columnChunkMetaData, fixedLength, (NullableDateVector) v, schemaElement);
           default:
             throw new ExecutionSetupException(
                 String.format("Issue setting up parquet reader for date type, " +
@@ -188,21 +187,21 @@ public class ColumnReaderFactory {
         }
       } else if (columnChunkMetaData.getType() == PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY) {
         if (convertedType == ConvertedType.DECIMAL) {
-          return new NullableFixedByteAlignedReaders.NullableVarDecimalReader(recordReader, allocateSize,
+          return new NullableFixedByteAlignedReaders.NullableVarDecimalReader(recordReader,
               descriptor, columnChunkMetaData, fixedLength, (NullableVarDecimalVector) v, schemaElement);
         } else if (convertedType == ConvertedType.INTERVAL) {
-          return new NullableFixedByteAlignedReaders.NullableIntervalReader(recordReader, allocateSize, descriptor,
+          return new NullableFixedByteAlignedReaders.NullableIntervalReader(recordReader, descriptor,
               columnChunkMetaData, fixedLength, (NullableIntervalVector) v, schemaElement);
         }
       } else {
-        return getNullableColumnReader(recordReader, allocateSize, descriptor,
+        return getNullableColumnReader(recordReader, descriptor,
             columnChunkMetaData, fixedLength, v, schemaElement);
       }
     }
     throw new Exception("Unexpected parquet metadata configuration.");
   }
 
-  static VarLengthValuesColumn<?> getReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,
+  static VarLengthValuesColumn<?> getReader(ParquetRecordReader parentReader, ColumnDescriptor descriptor,
                                           ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, ValueVector v,
                                           SchemaElement schemaElement
   ) throws ExecutionSetupException {
@@ -210,39 +209,39 @@ public class ColumnReaderFactory {
     switch (descriptor.getMaxDefinitionLevel()) {
       case 0:
         if (convertedType == null) {
-          return new VarLengthColumnReaders.VarBinaryColumn(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, (VarBinaryVector) v, schemaElement);
+          return new VarLengthColumnReaders.VarBinaryColumn(parentReader, descriptor, columnChunkMetaData, fixedLength, (VarBinaryVector) v, schemaElement);
         }
         switch (convertedType) {
           case UTF8:
           case ENUM:
-            return new VarLengthColumnReaders.VarCharColumn(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, (VarCharVector) v, schemaElement);
+            return new VarLengthColumnReaders.VarCharColumn(parentReader, descriptor, columnChunkMetaData, fixedLength, (VarCharVector) v, schemaElement);
           case DECIMAL:
             if (v instanceof VarDecimalVector) {
-              return new VarLengthColumnReaders.VarDecimalColumn(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, (VarDecimalVector) v, schemaElement);
+              return new VarLengthColumnReaders.VarDecimalColumn(parentReader, descriptor, columnChunkMetaData, fixedLength, (VarDecimalVector) v, schemaElement);
             }
           default:
-            return new VarLengthColumnReaders.VarBinaryColumn(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, (VarBinaryVector) v, schemaElement);
+            return new VarLengthColumnReaders.VarBinaryColumn(parentReader, descriptor, columnChunkMetaData, fixedLength, (VarBinaryVector) v, schemaElement);
         }
       default:
         if (convertedType == null) {
-          return new VarLengthColumnReaders.NullableVarBinaryColumn(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, (NullableVarBinaryVector) v, schemaElement);
+          return new VarLengthColumnReaders.NullableVarBinaryColumn(parentReader, descriptor, columnChunkMetaData, fixedLength, (NullableVarBinaryVector) v, schemaElement);
         }
 
         switch (convertedType) {
           case UTF8:
           case ENUM:
-            return new VarLengthColumnReaders.NullableVarCharColumn(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, (NullableVarCharVector) v, schemaElement);
+            return new VarLengthColumnReaders.NullableVarCharColumn(parentReader, descriptor, columnChunkMetaData, fixedLength, (NullableVarCharVector) v, schemaElement);
           case DECIMAL:
             if (v instanceof NullableVarDecimalVector) {
-              return new VarLengthColumnReaders.NullableVarDecimalColumn(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, (NullableVarDecimalVector) v, schemaElement);
+              return new VarLengthColumnReaders.NullableVarDecimalColumn(parentReader, descriptor, columnChunkMetaData, fixedLength, (NullableVarDecimalVector) v, schemaElement);
             }
           default:
-            return new VarLengthColumnReaders.NullableVarBinaryColumn(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, (NullableVarBinaryVector) v, schemaElement);
+            return new VarLengthColumnReaders.NullableVarBinaryColumn(parentReader, descriptor, columnChunkMetaData, fixedLength, (NullableVarBinaryVector) v, schemaElement);
         }
     }
   }
 
-  public static NullableColumnReader<?> getNullableColumnReader(ParquetRecordReader parentReader, int allocateSize,
+  public static NullableColumnReader<?> getNullableColumnReader(ParquetRecordReader parentReader,
                                                              ColumnDescriptor columnDescriptor,
                                                              ColumnChunkMetaData columnChunkMetaData,
                                                              boolean fixedLength,
@@ -254,58 +253,58 @@ public class ColumnReaderFactory {
       if (columnDescriptor.getType() == PrimitiveType.PrimitiveTypeName.INT96) {
          // TODO: check convertedType once parquet support TIMESTAMP_NANOS type annotation.
         if (parentReader.getFragmentContext().getOptions().getOption(ExecConstants.PARQUET_READER_INT96_AS_TIMESTAMP).bool_val) {
-          return new NullableFixedByteAlignedReaders.NullableFixedBinaryAsTimeStampReader(parentReader, allocateSize, columnDescriptor, columnChunkMetaData, true, (NullableTimeStampVector) valueVec, schemaElement);
+          return new NullableFixedByteAlignedReaders.NullableFixedBinaryAsTimeStampReader(parentReader, columnDescriptor, columnChunkMetaData, true, (NullableTimeStampVector) valueVec, schemaElement);
         } else {
-          return new NullableFixedByteAlignedReaders.NullableFixedBinaryReader(parentReader, allocateSize, columnDescriptor, columnChunkMetaData, true, (NullableVarBinaryVector) valueVec, schemaElement);
+          return new NullableFixedByteAlignedReaders.NullableFixedBinaryReader(parentReader, columnDescriptor, columnChunkMetaData, true, (NullableVarBinaryVector) valueVec, schemaElement);
         }
       } else if (convertedType == ConvertedType.DECIMAL) {
         // NullableVarDecimalVector allows storing of values with different width,
         // so every time when the value is added, offset vector should be updated.
         // Therefore NullableVarDecimalReader is used here instead of NullableFixedByteAlignedReader.
-        return new NullableFixedByteAlignedReaders.NullableVarDecimalReader(parentReader, allocateSize,
+        return new NullableFixedByteAlignedReaders.NullableVarDecimalReader(parentReader,
             columnDescriptor, columnChunkMetaData, fixedLength, (NullableVarDecimalVector) valueVec, schemaElement);
       } else {
-        return new NullableFixedByteAlignedReaders.NullableFixedByteAlignedReader<>(parentReader, allocateSize, columnDescriptor, columnChunkMetaData, fixedLength, valueVec, schemaElement);
+        return new NullableFixedByteAlignedReaders.NullableFixedByteAlignedReader<>(parentReader, columnDescriptor, columnChunkMetaData, fixedLength, valueVec, schemaElement);
       }
     } else {
       switch (columnDescriptor.getType()) {
         case INT32:
           if (convertedType == null) {
-            return new NullableFixedByteAlignedReaders.NullableDictionaryIntReader(parentReader, allocateSize, columnDescriptor, columnChunkMetaData, fixedLength, (NullableIntVector) valueVec, schemaElement);
+            return new NullableFixedByteAlignedReaders.NullableDictionaryIntReader(parentReader, columnDescriptor, columnChunkMetaData, fixedLength, (NullableIntVector) valueVec, schemaElement);
           }
           switch (convertedType) {
             case DECIMAL:
-              return new NullableFixedByteAlignedReaders.NullableDictionaryVarDecimalReader(parentReader, allocateSize,
+              return new NullableFixedByteAlignedReaders.NullableDictionaryVarDecimalReader(parentReader,
                   columnDescriptor, columnChunkMetaData, fixedLength, (NullableVarDecimalVector) valueVec, schemaElement);
             case TIME_MILLIS:
-              return new NullableFixedByteAlignedReaders.NullableDictionaryTimeReader(parentReader, allocateSize, columnDescriptor, columnChunkMetaData, fixedLength, (NullableTimeVector)valueVec, schemaElement);
+              return new NullableFixedByteAlignedReaders.NullableDictionaryTimeReader(parentReader, columnDescriptor, columnChunkMetaData, fixedLength, (NullableTimeVector)valueVec, schemaElement);
             default:
               throw new ExecutionSetupException("Unsupported nullable converted type " + convertedType + " for primitive type INT32");
           }
         case INT64:
           if (convertedType == null) {
-            return new NullableFixedByteAlignedReaders.NullableDictionaryBigIntReader(parentReader, allocateSize, columnDescriptor, columnChunkMetaData, fixedLength, (NullableBigIntVector)valueVec, schemaElement);
+            return new NullableFixedByteAlignedReaders.NullableDictionaryBigIntReader(parentReader, columnDescriptor, columnChunkMetaData, fixedLength, (NullableBigIntVector)valueVec, schemaElement);
           }
           switch (convertedType) {
             case DECIMAL:
-              return new NullableFixedByteAlignedReaders.NullableDictionaryVarDecimalReader(parentReader, allocateSize,
+              return new NullableFixedByteAlignedReaders.NullableDictionaryVarDecimalReader(parentReader,
                   columnDescriptor, columnChunkMetaData, fixedLength, (NullableVarDecimalVector) valueVec, schemaElement);
             case TIMESTAMP_MILLIS:
-              return new NullableFixedByteAlignedReaders.NullableDictionaryTimeStampReader(parentReader, allocateSize, columnDescriptor, columnChunkMetaData, fixedLength, (NullableTimeStampVector)valueVec, schemaElement);
+              return new NullableFixedByteAlignedReaders.NullableDictionaryTimeStampReader(parentReader, columnDescriptor, columnChunkMetaData, fixedLength, (NullableTimeStampVector)valueVec, schemaElement);
             default:
               throw new ExecutionSetupException("Unsupported nullable converted type " + convertedType + " for primitive type INT64");
           }
         case INT96:
           // TODO: check convertedType once parquet support TIMESTAMP_NANOS type annotation.
           if (parentReader.getFragmentContext().getOptions().getOption(ExecConstants.PARQUET_READER_INT96_AS_TIMESTAMP).bool_val) {
-            return new NullableFixedByteAlignedReaders.NullableFixedBinaryAsTimeStampReader(parentReader, allocateSize, columnDescriptor, columnChunkMetaData, true, (NullableTimeStampVector) valueVec, schemaElement);
+            return new NullableFixedByteAlignedReaders.NullableFixedBinaryAsTimeStampReader(parentReader, columnDescriptor, columnChunkMetaData, true, (NullableTimeStampVector) valueVec, schemaElement);
           } else {
-            return new NullableFixedByteAlignedReaders.NullableFixedBinaryReader(parentReader, allocateSize, columnDescriptor, columnChunkMetaData, true, (NullableVarBinaryVector) valueVec, schemaElement);
+            return new NullableFixedByteAlignedReaders.NullableFixedBinaryReader(parentReader, columnDescriptor, columnChunkMetaData, true, (NullableVarBinaryVector) valueVec, schemaElement);
           }
         case FLOAT:
-          return new NullableFixedByteAlignedReaders.NullableDictionaryFloat4Reader(parentReader, allocateSize, columnDescriptor, columnChunkMetaData, fixedLength, (NullableFloat4Vector)valueVec, schemaElement);
+          return new NullableFixedByteAlignedReaders.NullableDictionaryFloat4Reader(parentReader, columnDescriptor, columnChunkMetaData, fixedLength, (NullableFloat4Vector)valueVec, schemaElement);
         case DOUBLE:
-          return new NullableFixedByteAlignedReaders.NullableDictionaryFloat8Reader(parentReader, allocateSize, columnDescriptor, columnChunkMetaData, fixedLength, (NullableFloat8Vector)valueVec, schemaElement);
+          return new NullableFixedByteAlignedReaders.NullableDictionaryFloat8Reader(parentReader, columnDescriptor, columnChunkMetaData, fixedLength, (NullableFloat8Vector)valueVec, schemaElement);
         default:
           throw new ExecutionSetupException("Unsupported nullable column type " + columnDescriptor.getType().name() );
       }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/FixedByteAlignedReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/FixedByteAlignedReader.java
index 385b117..338b08d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/FixedByteAlignedReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/FixedByteAlignedReader.java
@@ -38,9 +38,9 @@ class FixedByteAlignedReader<V extends ValueVector> extends ColumnReader<V> {
   protected DrillBuf bytebuf;
 
 
-  FixedByteAlignedReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData,
+  FixedByteAlignedReader(ParquetRecordReader parentReader, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData,
                          boolean fixedLength, V v, SchemaElement schemaElement) throws ExecutionSetupException {
-    super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
+    super(parentReader, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
   }
 
   // this method is called by its superclass during a read loop
@@ -67,9 +67,9 @@ class FixedByteAlignedReader<V extends ValueVector> extends ColumnReader<V> {
     // TODO - replace this with fixed binary type in drill
     VariableWidthVector castedVector;
 
-    FixedBinaryReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData,
+    FixedBinaryReader(ParquetRecordReader parentReader, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData,
                     VariableWidthVector v, SchemaElement schemaElement) throws ExecutionSetupException {
-      super(parentReader, allocateSize, descriptor, columnChunkMetaData, true, v, schemaElement);
+      super(parentReader, descriptor, columnChunkMetaData, true, v, schemaElement);
       castedVector = v;
     }
 
@@ -91,9 +91,9 @@ class FixedByteAlignedReader<V extends ValueVector> extends ColumnReader<V> {
 
     protected int dataTypeLengthInBytes;
 
-    ConvertedReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData,
+    ConvertedReader(ParquetRecordReader parentReader, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData,
                            boolean fixedLength, V v, SchemaElement schemaElement) throws ExecutionSetupException {
-      super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
+      super(parentReader, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
     }
 
     @Override
@@ -115,9 +115,9 @@ class FixedByteAlignedReader<V extends ValueVector> extends ColumnReader<V> {
   public static class DateReader extends ConvertedReader<DateVector> {
 
     private final DateVector.Mutator mutator;
-    DateReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData,
+    DateReader(ParquetRecordReader parentReader, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData,
                     boolean fixedLength, DateVector v, SchemaElement schemaElement) throws ExecutionSetupException {
-      super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
+      super(parentReader, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
       mutator = v.getMutator();
     }
 
@@ -141,9 +141,9 @@ class FixedByteAlignedReader<V extends ValueVector> extends ColumnReader<V> {
 
     private final DateVector.Mutator mutator;
 
-    CorruptDateReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData,
+    CorruptDateReader(ParquetRecordReader parentReader, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData,
                       boolean fixedLength, DateVector v, SchemaElement schemaElement) throws ExecutionSetupException {
-      super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
+      super(parentReader, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
       mutator = v.getMutator();
     }
 
@@ -171,9 +171,9 @@ class FixedByteAlignedReader<V extends ValueVector> extends ColumnReader<V> {
 
     private final DateVector.Mutator mutator;
 
-    CorruptionDetectingDateReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData,
+    CorruptionDetectingDateReader(ParquetRecordReader parentReader, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData,
                                   boolean fixedLength, DateVector v, SchemaElement schemaElement) throws ExecutionSetupException {
-      super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
+      super(parentReader, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
       mutator = v.getMutator();
     }
 
@@ -197,9 +197,9 @@ class FixedByteAlignedReader<V extends ValueVector> extends ColumnReader<V> {
 
   public static class VarDecimalReader extends ConvertedReader<VarDecimalVector> {
 
-    VarDecimalReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData,
+    VarDecimalReader(ParquetRecordReader parentReader, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData,
         boolean fixedLength, VarDecimalVector v, SchemaElement schemaElement) throws ExecutionSetupException {
-      super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
+      super(parentReader, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
     }
 
     @Override
@@ -220,9 +220,9 @@ class FixedByteAlignedReader<V extends ValueVector> extends ColumnReader<V> {
   }
 
   public static class IntervalReader extends ConvertedReader<IntervalVector> {
-    IntervalReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData,
+    IntervalReader(ParquetRecordReader parentReader, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData,
                    boolean fixedLength, IntervalVector v, SchemaElement schemaElement) throws ExecutionSetupException {
-      super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
+      super(parentReader, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
     }
 
     @Override
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/FixedWidthRepeatedReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/FixedWidthRepeatedReader.java
index 162e502..0b2d678 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/FixedWidthRepeatedReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/FixedWidthRepeatedReader.java
@@ -46,8 +46,10 @@ public class FixedWidthRepeatedReader extends VarLengthColumn<RepeatedValueVecto
   boolean notFishedReadingList;
   byte[] leftOverBytes;
 
-  FixedWidthRepeatedReader(ParquetRecordReader parentReader, ColumnReader<?> dataReader, int dataTypeLengthInBytes, int allocateSize, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, RepeatedValueVector valueVector, SchemaElement schemaElement) throws ExecutionSetupException {
-    super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, valueVector, schemaElement);
+  FixedWidthRepeatedReader(ParquetRecordReader parentReader, ColumnReader<?> dataReader, int dataTypeLengthInBytes,
+                           ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData, boolean fixedLength,
+                           RepeatedValueVector valueVector, SchemaElement schemaElement) throws ExecutionSetupException {
+    super(parentReader, descriptor, columnChunkMetaData, fixedLength, valueVector, schemaElement);
     this.dataTypeLengthInBytes = dataTypeLengthInBytes;
     this.dataReader = dataReader;
     this.dataReader.pageReader.clear();
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableBitReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableBitReader.java
index b97bc82..e3a8f31 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableBitReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableBitReader.java
@@ -33,9 +33,9 @@ import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
  */
 final class NullableBitReader extends ColumnReader<NullableBitVector> {
 
-  NullableBitReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData,
+  NullableBitReader(ParquetRecordReader parentReader, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData,
                     boolean fixedLength, NullableBitVector v, SchemaElement schemaElement) throws ExecutionSetupException {
-    super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
+    super(parentReader, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
   }
 
   @Override
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableColumnReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableColumnReader.java
index 949474b..ed61bef 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableColumnReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableColumnReader.java
@@ -38,9 +38,9 @@ abstract class NullableColumnReader<V extends ValueVector> extends ColumnReader<
   /** Definition level wrapper to handle {@link ValueVector} limitations */
   private final DefLevelReaderWrapper definitionLevelWrapper = new DefLevelReaderWrapper();
 
-  NullableColumnReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData,
+  NullableColumnReader(ParquetRecordReader parentReader, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData,
                boolean fixedLength, V v, SchemaElement schemaElement) throws ExecutionSetupException {
-    super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
+    super(parentReader, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
 
     castedBaseVector = (BaseDataValueVector) v;
     castedVectorMutator = (NullableVectorDefinitionSetter) v.getMutator();
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableFixedByteAlignedReaders.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableFixedByteAlignedReaders.java
index 9e66b1f..6a09bd6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableFixedByteAlignedReaders.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableFixedByteAlignedReaders.java
@@ -49,9 +49,9 @@ public class NullableFixedByteAlignedReaders {
   static class NullableFixedByteAlignedReader<V extends ValueVector> extends NullableColumnReader<V> {
     protected DrillBuf bytebuf;
 
-    NullableFixedByteAlignedReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,
+    NullableFixedByteAlignedReader(ParquetRecordReader parentReader, ColumnDescriptor descriptor,
                       ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, V v, SchemaElement schemaElement) throws ExecutionSetupException {
-      super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
+      super(parentReader, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
     }
 
     // this method is called by its superclass during a read loop
@@ -70,9 +70,9 @@ public class NullableFixedByteAlignedReaders {
    * each value.
    */
   static class NullableFixedBinaryReader extends NullableFixedByteAlignedReader<NullableVarBinaryVector> {
-    NullableFixedBinaryReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,
+    NullableFixedBinaryReader(ParquetRecordReader parentReader, ColumnDescriptor descriptor,
                                    ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, NullableVarBinaryVector v, SchemaElement schemaElement) throws ExecutionSetupException {
-      super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
+      super(parentReader, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
     }
 
     @Override
@@ -109,9 +109,9 @@ public class NullableFixedByteAlignedReaders {
    * impala timestamp values with nanoseconds precision (12 bytes). It reads such values as a drill timestamp (8 bytes).
    */
   static class NullableFixedBinaryAsTimeStampReader extends NullableFixedByteAlignedReader<NullableTimeStampVector> {
-    NullableFixedBinaryAsTimeStampReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,
+    NullableFixedBinaryAsTimeStampReader(ParquetRecordReader parentReader, ColumnDescriptor descriptor,
                               ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, NullableTimeStampVector v, SchemaElement schemaElement) throws ExecutionSetupException {
-      super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
+      super(parentReader, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
     }
 
     @Override
@@ -135,10 +135,10 @@ public class NullableFixedByteAlignedReaders {
 
   static class NullableDictionaryIntReader extends NullableColumnReader<NullableIntVector> {
 
-    NullableDictionaryIntReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,
+    NullableDictionaryIntReader(ParquetRecordReader parentReader, ColumnDescriptor descriptor,
                                 ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, NullableIntVector v,
                                 SchemaElement schemaElement) throws ExecutionSetupException {
-      super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
+      super(parentReader, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
     }
 
     // this method is called by its superclass during a read loop
@@ -161,10 +161,10 @@ public class NullableFixedByteAlignedReaders {
 
   static class NullableDictionaryTimeReader extends NullableColumnReader<NullableTimeVector> {
 
-    NullableDictionaryTimeReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,
+    NullableDictionaryTimeReader(ParquetRecordReader parentReader, ColumnDescriptor descriptor,
                                      ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, NullableTimeVector v,
                                      SchemaElement schemaElement) throws ExecutionSetupException {
-      super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
+      super(parentReader, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
     }
 
     // this method is called by its superclass during a read loop
@@ -184,10 +184,10 @@ public class NullableFixedByteAlignedReaders {
 
   static class NullableDictionaryBigIntReader extends NullableColumnReader<NullableBigIntVector> {
 
-    NullableDictionaryBigIntReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,
+    NullableDictionaryBigIntReader(ParquetRecordReader parentReader, ColumnDescriptor descriptor,
                                    ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, NullableBigIntVector v,
                                    SchemaElement schemaElement) throws ExecutionSetupException {
-      super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
+      super(parentReader, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
     }
 
     // this method is called by its superclass during a read loop
@@ -207,10 +207,10 @@ public class NullableFixedByteAlignedReaders {
 
   static class NullableDictionaryTimeStampReader extends NullableColumnReader<NullableTimeStampVector> {
 
-    NullableDictionaryTimeStampReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,
+    NullableDictionaryTimeStampReader(ParquetRecordReader parentReader, ColumnDescriptor descriptor,
                                    ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, NullableTimeStampVector v,
                                    SchemaElement schemaElement) throws ExecutionSetupException {
-      super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
+      super(parentReader, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
     }
 
     // this method is called by its superclass during a read loop
@@ -230,10 +230,10 @@ public class NullableFixedByteAlignedReaders {
 
   static class NullableDictionaryVarDecimalReader extends NullableColumnReader<NullableVarDecimalVector> {
 
-    NullableDictionaryVarDecimalReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,
+    NullableDictionaryVarDecimalReader(ParquetRecordReader parentReader, ColumnDescriptor descriptor,
         ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, NullableVarDecimalVector v,
         SchemaElement schemaElement) throws ExecutionSetupException {
-      super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
+      super(parentReader, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
     }
 
     // this method is called by its superclass during a read loop
@@ -276,10 +276,10 @@ public class NullableFixedByteAlignedReaders {
 
   static class NullableDictionaryFloat4Reader extends NullableColumnReader<NullableFloat4Vector> {
 
-    NullableDictionaryFloat4Reader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,
+    NullableDictionaryFloat4Reader(ParquetRecordReader parentReader, ColumnDescriptor descriptor,
                                    ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, NullableFloat4Vector v,
                                    SchemaElement schemaElement) throws ExecutionSetupException {
-      super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
+      super(parentReader, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
     }
 
     // this method is called by its superclass during a read loop
@@ -299,10 +299,10 @@ public class NullableFixedByteAlignedReaders {
 
   static class NullableDictionaryFloat8Reader extends NullableColumnReader<NullableFloat8Vector> {
 
-    NullableDictionaryFloat8Reader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,
+    NullableDictionaryFloat8Reader(ParquetRecordReader parentReader, ColumnDescriptor descriptor,
                                   ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, NullableFloat8Vector v,
                                   SchemaElement schemaElement) throws ExecutionSetupException {
-      super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
+      super(parentReader, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
     }
 
     // this method is called by its superclass during a read loop
@@ -324,9 +324,9 @@ public class NullableFixedByteAlignedReaders {
 
     protected int dataTypeLengthInBytes;
 
-    NullableConvertedReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,
+    NullableConvertedReader(ParquetRecordReader parentReader, ColumnDescriptor descriptor,
                             ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, V v, SchemaElement schemaElement) throws ExecutionSetupException {
-      super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
+      super(parentReader, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
     }
 
     @Override
@@ -344,9 +344,9 @@ public class NullableFixedByteAlignedReaders {
   }
 
   public static class NullableDateReader extends NullableConvertedReader<NullableDateVector> {
-    NullableDateReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData,
+    NullableDateReader(ParquetRecordReader parentReader, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData,
                        boolean fixedLength, NullableDateVector v, SchemaElement schemaElement) throws ExecutionSetupException {
-      super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
+      super(parentReader, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
     }
 
     @Override
@@ -368,9 +368,9 @@ public class NullableFixedByteAlignedReaders {
    */
   public static class NullableCorruptDateReader extends NullableConvertedReader<NullableDateVector> {
 
-    NullableCorruptDateReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData,
+    NullableCorruptDateReader(ParquetRecordReader parentReader, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData,
                        boolean fixedLength, NullableDateVector v, SchemaElement schemaElement) throws ExecutionSetupException {
-      super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
+      super(parentReader, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
     }
 
     @Override
@@ -395,11 +395,11 @@ public class NullableFixedByteAlignedReaders {
    */
   public static class CorruptionDetectingNullableDateReader extends NullableConvertedReader<NullableDateVector> {
 
-    CorruptionDetectingNullableDateReader(ParquetRecordReader parentReader, int allocateSize,
+    CorruptionDetectingNullableDateReader(ParquetRecordReader parentReader,
                                           ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData,
                                           boolean fixedLength, NullableDateVector v, SchemaElement schemaElement)
         throws ExecutionSetupException {
-      super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
+      super(parentReader, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
     }
 
     @Override
@@ -420,9 +420,9 @@ public class NullableFixedByteAlignedReaders {
   }
 
   public static class NullableVarDecimalReader extends NullableConvertedReader<NullableVarDecimalVector> {
-    NullableVarDecimalReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData,
+    NullableVarDecimalReader(ParquetRecordReader parentReader, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData,
         boolean fixedLength, NullableVarDecimalVector v, SchemaElement schemaElement) throws ExecutionSetupException {
-      super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
+      super(parentReader, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
     }
 
     // TODO: allow reading page instead of reading every record separately
@@ -445,9 +445,9 @@ public class NullableFixedByteAlignedReaders {
   }
 
   public static class NullableIntervalReader extends NullableConvertedReader<NullableIntervalVector> {
-    NullableIntervalReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData,
+    NullableIntervalReader(ParquetRecordReader parentReader, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData,
                    boolean fixedLength, NullableIntervalVector v, SchemaElement schemaElement) throws ExecutionSetupException {
-      super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
+      super(parentReader, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
     }
 
     @Override
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableVarLengthValuesColumn.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableVarLengthValuesColumn.java
index f236315..7608d17 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableVarLengthValuesColumn.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableVarLengthValuesColumn.java
@@ -33,10 +33,10 @@ public abstract class NullableVarLengthValuesColumn<V extends ValueVector> exten
   int nullsRead;
   boolean currentValNull = false;
 
-  NullableVarLengthValuesColumn(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,
+  NullableVarLengthValuesColumn(ParquetRecordReader parentReader, ColumnDescriptor descriptor,
                                 ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, V v,
                                 SchemaElement schemaElement) throws ExecutionSetupException {
-    super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
+    super(parentReader, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
   }
 
   @Override
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetColumnMetadata.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetColumnMetadata.java
index d49a416..147938d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetColumnMetadata.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetColumnMetadata.java
@@ -106,7 +106,7 @@ public class ParquetColumnMetadata {
    *
    * @return the length if fixed width, else <tt>UNDEFINED_LENGTH</tt> (-1)
    */
-  private int getDataTypeLength() {
+  public int getDataTypeLength() {
     if (! isFixedLength()) {
       return UNDEFINED_LENGTH;
     } else if (isRepeated()) {
@@ -126,29 +126,33 @@ public class ParquetColumnMetadata {
     return column.getMaxRepetitionLevel() > 0;
   }
 
+  public MaterializedField getField() {
+    return field;
+  }
+
   ValueVector buildVector(OutputMutator output) throws SchemaChangeException {
     Class<? extends ValueVector> vectorClass = TypeHelper.getValueVectorClass(type.getMinorType(), type.getMode());
     vector = output.addField(field, vectorClass);
     return vector;
   }
 
-  ColumnReader<?> makeFixedWidthReader(ParquetRecordReader reader, int recordsPerBatch) throws Exception {
+  ColumnReader<?> makeFixedWidthReader(ParquetRecordReader reader) throws Exception {
     return ColumnReaderFactory.createFixedColumnReader(reader, true,
-        column, columnChunkMetaData, recordsPerBatch, vector, se);
+        column, columnChunkMetaData, vector, se);
   }
 
   @SuppressWarnings("resource")
-  FixedWidthRepeatedReader makeRepeatedFixedWidthReader(ParquetRecordReader reader, int recordsPerBatch) throws Exception {
+  FixedWidthRepeatedReader makeRepeatedFixedWidthReader(ParquetRecordReader reader) throws Exception {
     final RepeatedValueVector repeatedVector = RepeatedValueVector.class.cast(vector);
     ColumnReader<?> dataReader = ColumnReaderFactory.createFixedColumnReader(reader, true,
-        column, columnChunkMetaData, recordsPerBatch,
+        column, columnChunkMetaData,
         repeatedVector.getDataVector(), se);
     return new FixedWidthRepeatedReader(reader, dataReader,
-        getTypeLengthInBits(column.getType()), UNDEFINED_LENGTH, column, columnChunkMetaData, false, repeatedVector, se);
+        getTypeLengthInBits(column.getType()), column, columnChunkMetaData, false, repeatedVector, se);
   }
 
   VarLengthValuesColumn<?> makeVariableWidthReader(ParquetRecordReader reader) throws ExecutionSetupException {
-    return ColumnReaderFactory.getReader(reader, UNDEFINED_LENGTH, column, columnChunkMetaData, false, vector, se);
+    return ColumnReaderFactory.getReader(reader, column, columnChunkMetaData, false, vector, se);
   }
 
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetFixedWidthDictionaryReaders.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetFixedWidthDictionaryReaders.java
index 38ab700..171983e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetFixedWidthDictionaryReaders.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetFixedWidthDictionaryReaders.java
@@ -42,10 +42,10 @@ public class ParquetFixedWidthDictionaryReaders {
   private static final double BITS_COUNT_IN_BYTE_DOUBLE_VALUE = 8.0;
 
   static class DictionaryIntReader extends FixedByteAlignedReader<IntVector> {
-    DictionaryIntReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,
+    DictionaryIntReader(ParquetRecordReader parentReader, ColumnDescriptor descriptor,
                                 ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, IntVector v,
                                 SchemaElement schemaElement) throws ExecutionSetupException {
-      super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
+      super(parentReader, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
     }
 
     // this method is called by its superclass during a read loop
@@ -67,10 +67,10 @@ public class ParquetFixedWidthDictionaryReaders {
    * This class uses for reading unsigned integer fields.
    */
   static class DictionaryUInt4Reader extends FixedByteAlignedReader<UInt4Vector> {
-    DictionaryUInt4Reader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,
+    DictionaryUInt4Reader(ParquetRecordReader parentReader, ColumnDescriptor descriptor,
                         ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, UInt4Vector v,
                         SchemaElement schemaElement) throws ExecutionSetupException {
-      super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
+      super(parentReader, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
     }
 
     // this method is called by its superclass during a read loop
@@ -99,10 +99,10 @@ public class ParquetFixedWidthDictionaryReaders {
   }
 
   static class DictionaryFixedBinaryReader extends FixedByteAlignedReader<VarBinaryVector> {
-    DictionaryFixedBinaryReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,
+    DictionaryFixedBinaryReader(ParquetRecordReader parentReader, ColumnDescriptor descriptor,
                         ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, VarBinaryVector v,
                         SchemaElement schemaElement) throws ExecutionSetupException {
-      super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
+      super(parentReader, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
     }
 
     // this method is called by its superclass during a read loop
@@ -141,10 +141,10 @@ public class ParquetFixedWidthDictionaryReaders {
   }
 
   static class DictionaryTimeReader extends FixedByteAlignedReader<TimeVector> {
-    DictionaryTimeReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,
+    DictionaryTimeReader(ParquetRecordReader parentReader, ColumnDescriptor descriptor,
                         ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, TimeVector v,
                         SchemaElement schemaElement) throws ExecutionSetupException {
-      super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
+      super(parentReader, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
     }
 
     // this method is called by its superclass during a read loop
@@ -163,10 +163,10 @@ public class ParquetFixedWidthDictionaryReaders {
   }
 
   static class DictionaryBigIntReader extends FixedByteAlignedReader<BigIntVector> {
-    DictionaryBigIntReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,
+    DictionaryBigIntReader(ParquetRecordReader parentReader, ColumnDescriptor descriptor,
                                    ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, BigIntVector v,
                                    SchemaElement schemaElement) throws ExecutionSetupException {
-      super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
+      super(parentReader, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
     }
 
     // this method is called by its superclass during a read loop
@@ -198,10 +198,10 @@ public class ParquetFixedWidthDictionaryReaders {
    * This class uses for reading unsigned BigInt fields.
    */
   static class DictionaryUInt8Reader extends FixedByteAlignedReader<UInt8Vector> {
-    DictionaryUInt8Reader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,
+    DictionaryUInt8Reader(ParquetRecordReader parentReader, ColumnDescriptor descriptor,
                            ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, UInt8Vector v,
                            SchemaElement schemaElement) throws ExecutionSetupException {
-      super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
+      super(parentReader, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
     }
 
     // this method is called by its superclass during a read loop
@@ -231,10 +231,10 @@ public class ParquetFixedWidthDictionaryReaders {
 
   static class DictionaryVarDecimalReader extends FixedByteAlignedReader<VarDecimalVector> {
 
-    DictionaryVarDecimalReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,
+    DictionaryVarDecimalReader(ParquetRecordReader parentReader, ColumnDescriptor descriptor,
         ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, VarDecimalVector v,
         SchemaElement schemaElement) throws ExecutionSetupException {
-      super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
+      super(parentReader, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
     }
 
     // this method is called by its superclass during a read loop
@@ -288,10 +288,10 @@ public class ParquetFixedWidthDictionaryReaders {
   }
 
   static class DictionaryTimeStampReader extends FixedByteAlignedReader<TimeStampVector> {
-    DictionaryTimeStampReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,
+    DictionaryTimeStampReader(ParquetRecordReader parentReader, ColumnDescriptor descriptor,
                            ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, TimeStampVector v,
                            SchemaElement schemaElement) throws ExecutionSetupException {
-      super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
+      super(parentReader, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
     }
 
     // this method is called by its superclass during a read loop
@@ -312,10 +312,10 @@ public class ParquetFixedWidthDictionaryReaders {
   }
 
   static class DictionaryBinaryAsTimeStampReader extends FixedByteAlignedReader<TimeStampVector> {
-    DictionaryBinaryAsTimeStampReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,
+    DictionaryBinaryAsTimeStampReader(ParquetRecordReader parentReader, ColumnDescriptor descriptor,
                               ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, TimeStampVector v,
                               SchemaElement schemaElement) throws ExecutionSetupException {
-      super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
+      super(parentReader, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
     }
 
     // this method is called by its superclass during a read loop
@@ -337,10 +337,10 @@ public class ParquetFixedWidthDictionaryReaders {
   }
 
   static class DictionaryFloat4Reader extends FixedByteAlignedReader<Float4Vector> {
-    DictionaryFloat4Reader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,
+    DictionaryFloat4Reader(ParquetRecordReader parentReader, ColumnDescriptor descriptor,
                                    ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, Float4Vector v,
                                    SchemaElement schemaElement) throws ExecutionSetupException {
-      super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
+      super(parentReader, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
     }
 
     // this method is called by its superclass during a read loop
@@ -356,10 +356,10 @@ public class ParquetFixedWidthDictionaryReaders {
   }
 
   static class DictionaryFloat8Reader extends FixedByteAlignedReader<Float8Vector> {
-    DictionaryFloat8Reader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,
+    DictionaryFloat8Reader(ParquetRecordReader parentReader, ColumnDescriptor descriptor,
                                    ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, Float8Vector v,
                                    SchemaElement schemaElement) throws ExecutionSetupException {
-      super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
+      super(parentReader, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
     }
 
     // this method is called by its superclass during a read loop
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
index 712bbce..e1ca73f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
@@ -17,10 +17,11 @@
  */
 package org.apache.drill.exec.store.parquet.columnreaders;
 
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.ImmutableList;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
-
 import org.apache.drill.common.exceptions.DrillRuntimeException;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.expression.SchemaPath;
@@ -33,25 +34,17 @@ import org.apache.drill.exec.physical.impl.OutputMutator;
 import org.apache.drill.exec.store.AbstractRecordReader;
 import org.apache.drill.exec.store.parquet.ParquetReaderStats;
 import org.apache.drill.exec.store.parquet.ParquetReaderUtility;
-import org.apache.drill.exec.vector.AllocationHelper;
+import org.apache.drill.exec.store.parquet.columnreaders.batchsizing.RecordBatchSizerManager;
 import org.apache.drill.exec.vector.ValueVector;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.parquet.hadoop.CodecFactory;
 import org.apache.parquet.hadoop.metadata.ParquetMetadata;
 
-import com.google.common.base.Stopwatch;
-import com.google.common.collect.ImmutableList;
-
 public class ParquetRecordReader extends AbstractRecordReader {
   private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ParquetRecordReader.class);
 
-  // this value has been inflated to read in multiple value vectors at once, and then break them up into smaller vectors
-  private static final int NUMBER_OF_VECTORS = 1;
-  private static final long DEFAULT_BATCH_LENGTH = 256 * 1024 * NUMBER_OF_VECTORS; // 256kb
-  private static final long DEFAULT_BATCH_LENGTH_IN_BITS = DEFAULT_BATCH_LENGTH * 8; // 256kb
-  static final char DEFAULT_RECORDS_TO_READ_IF_VARIABLE_WIDTH = 32*1024; // 32K
-  static final int DEFAULT_RECORDS_TO_READ_IF_FIXED_WIDTH = 64*1024 - 1; // 64K - 1, max SV2 can address
+  /** Passed by callers that want to read all the rows of the selected Parquet row group */
   static final int NUM_RECORDS_TO_READ_NOT_SPECIFIED = -1;
 
   // When no column is required by the downstream operator, ask SCAN to return a DEFAULT column. If such column does not exist,
@@ -69,8 +62,7 @@ public class ParquetRecordReader extends AbstractRecordReader {
   private OperatorContext operatorContext;
 
   private FileSystem fileSystem;
-  private final long batchSize;
-  private long numRecordsToRead; // number of records to read
+  private final long numRecordsToRead; // number of records to read
 
   Path hadoopPath;
   private ParquetMetadata footer;
@@ -80,8 +72,12 @@ public class ParquetRecordReader extends AbstractRecordReader {
   private final FragmentContext fragmentContext;
   ParquetReaderUtility.DateCorruptionStatus dateCorruptionStatus;
 
+  /** Parquet Schema */
   ParquetSchema schema;
+  /** Container object for holding Parquet columnar readers state */
   ReadState readState;
+  /** Responsible for managing record batch size constraints */
+  RecordBatchSizerManager batchSizerMgr;
 
   public boolean useAsyncColReader;
   public boolean useAsyncPageReader;
@@ -134,7 +130,7 @@ public class ParquetRecordReader extends AbstractRecordReader {
       ParquetMetadata footer,
       List<SchemaPath> columns,
       ParquetReaderUtility.DateCorruptionStatus dateCorruptionStatus) throws ExecutionSetupException {
-    this(fragmentContext, DEFAULT_BATCH_LENGTH_IN_BITS, numRecordsToRead,
+    this(fragmentContext, numRecordsToRead,
          path, rowGroupIndex, fs, codecFactory, footer, columns, dateCorruptionStatus);
   }
 
@@ -147,13 +143,12 @@ public class ParquetRecordReader extends AbstractRecordReader {
       List<SchemaPath> columns,
       ParquetReaderUtility.DateCorruptionStatus dateCorruptionStatus)
       throws ExecutionSetupException {
-      this(fragmentContext, DEFAULT_BATCH_LENGTH_IN_BITS, footer.getBlocks().get(rowGroupIndex).getRowCount(),
+      this(fragmentContext, footer.getBlocks().get(rowGroupIndex).getRowCount(),
            path, rowGroupIndex, fs, codecFactory, footer, columns, dateCorruptionStatus);
   }
 
   public ParquetRecordReader(
       FragmentContext fragmentContext,
-      long batchSize,
       long numRecordsToRead,
       String path,
       int rowGroupIndex,
@@ -168,11 +163,10 @@ public class ParquetRecordReader extends AbstractRecordReader {
     this.fileSystem = fs;
     this.codecFactory = codecFactory;
     this.rowGroupIndex = rowGroupIndex;
-    this.batchSize = batchSize;
     this.footer = footer;
     this.dateCorruptionStatus = dateCorruptionStatus;
     this.fragmentContext = fragmentContext;
-    this.numRecordsToRead = numRecordsToRead;
+    this.numRecordsToRead = initNumRecordsToRead(numRecordsToRead, rowGroupIndex, footer);
     this.useAsyncColReader = fragmentContext.getOptions().getOption(ExecConstants.PARQUET_COLUMNREADER_ASYNC).bool_val;
     this.useAsyncPageReader = fragmentContext.getOptions().getOption(ExecConstants.PARQUET_PAGEREADER_ASYNC).bool_val;
     this.useBufferedReader = fragmentContext.getOptions().getOption(ExecConstants.PARQUET_PAGEREADER_USE_BUFFERED_READ).bool_val;
@@ -215,8 +209,8 @@ public class ParquetRecordReader extends AbstractRecordReader {
     return schema.getBitWidthAllFixedFields();
   }
 
-  public long getBatchSize() {
-    return batchSize;
+  public RecordBatchSizerManager getBatchSizesMgr() {
+    return batchSizerMgr;
   }
 
   public OperatorContext getOperatorContext() {
@@ -242,18 +236,19 @@ public class ParquetRecordReader extends AbstractRecordReader {
    * contains at least one variable-width field, or is a "mock" scan consisting
    * only of null fields (fields in the SELECT clause but not in the Parquet file.)
    */
-
   @Override
   public void setup(OperatorContext operatorContext, OutputMutator output) throws ExecutionSetupException {
     this.operatorContext = operatorContext;
     schema = new ParquetSchema(fragmentContext.getOptions(), rowGroupIndex, footer, isStarQuery() ? null : getColumns());
+    batchSizerMgr = new RecordBatchSizerManager(fragmentContext.getOptions(), schema, numRecordsToRead);
 
     logger.debug("Reading row group({}) with {} records in file {}.", rowGroupIndex, footer.getBlocks().get(rowGroupIndex).getRowCount(),
         hadoopPath.toUri().getPath());
 
     try {
-      schema.buildSchema(batchSize);
-      readState = new ReadState(schema, parquetReaderStats, numRecordsToRead, useAsyncColReader);
+      schema.buildSchema();
+      batchSizerMgr.setup();
+      readState = new ReadState(schema, batchSizerMgr, parquetReaderStats, numRecordsToRead, useAsyncColReader);
       readState.buildReader(this, output);
     } catch (Exception e) {
       throw handleException("Failure in setting up reader", e);
@@ -277,21 +272,13 @@ public class ParquetRecordReader extends AbstractRecordReader {
 
   @Override
   public void allocate(Map<String, ValueVector> vectorMap) throws OutOfMemoryException {
-    try {
-      int recordsPerBatch = schema.getRecordsPerBatch();
-      for (final ValueVector v : vectorMap.values()) {
-        AllocationHelper.allocate(v, recordsPerBatch, 50, 10);
-      }
-    } catch (NullPointerException e) {
-      throw new OutOfMemoryException();
-    }
+    batchSizerMgr.allocate(vectorMap);
   }
 
   /**
    * Read the next record batch from the file using the reader and read state
    * created previously.
    */
-
   @Override
   public int next() {
     readState.resetBatch();
@@ -314,14 +301,25 @@ public class ParquetRecordReader extends AbstractRecordReader {
     logger.debug("Read {} records out of row group({}) in file '{}'",
         recordsRead, rowGroupIndex,
         hadoopPath.toUri().getPath());
+
     // enable this for debugging when it is know that a whole file will be read
     // limit kills upstream operators once it has enough records, so this assert will fail
-//    assert totalRecordsRead == footer.getBlocks().get(rowGroupIndex).getRowCount();
+    //    assert totalRecordsRead == footer.getBlocks().get(rowGroupIndex).getRowCount();
+
+    // NOTE - We check whether the parquet reader data structure is not null before calling close();
+    //        this is because close() can be invoked before setup() has executed (probably because of
+    //        spurious failures).
+
     if (readState != null) {
       readState.close();
       readState = null;
     }
 
+    if (batchSizerMgr != null) {
+      batchSizerMgr.close();
+      batchSizerMgr = null;
+    }
+
     codecFactory.release();
 
     if (parquetReaderStats != null) {
@@ -339,4 +337,14 @@ public class ParquetRecordReader extends AbstractRecordReader {
   protected List<SchemaPath> getDefaultColumnsToRead() {
     return DEFAULT_COLS_TO_READ;
   }
+
+  /** Resolves how many rows to read: the whole row group when unspecified (-1), else capped at its row count. */
+  private long initNumRecordsToRead(long numRecordsToRead, int rowGroupIndex, ParquetMetadata footer) {
+    final long rowGroupRowCount = footer.getBlocks().get(rowGroupIndex).getRowCount();
+    if (numRecordsToRead == NUM_RECORDS_TO_READ_NOT_SPECIFIED) {
+      return rowGroupRowCount; // callers can pass -1 if they want to read all rows
+    }
+    assert (numRecordsToRead >= 0);
+    return Math.min(numRecordsToRead, rowGroupRowCount); // stays long: an (int) cast would silently wrap large counts
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetSchema.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetSchema.java
index 3935919..6717445 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetSchema.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetSchema.java
@@ -48,7 +48,7 @@ import com.google.common.collect.Lists;
  * to the schema that Drill and the Parquet reader uses.
  */
 
-public class ParquetSchema {
+public final class ParquetSchema {
   /**
    * Set of columns specified in the SELECT clause. Will be null for
    * a SELECT * query.
@@ -74,7 +74,6 @@ public class ParquetSchema {
   private int bitWidthAllFixedFields;
   private boolean allFieldsFixedLength;
   private long groupRecordCount;
-  private int recordsPerBatch;
 
   /**
    * Build the Parquet schema. The schema can be based on a "SELECT *",
@@ -108,22 +107,13 @@ public class ParquetSchema {
    * Build the schema for this read as a combination of the schema specified in
    * the Parquet footer and the list of columns selected in the query.
    *
-   * @param batchSize target size of the batch, in rows
    * @throws Exception if anything goes wrong
    */
 
-  public void buildSchema(long batchSize) throws Exception {
+  public void buildSchema() throws Exception {
     groupRecordCount = footer.getBlocks().get(rowGroupIndex).getRowCount();
     loadParquetSchema();
     computeFixedPart();
-
-    if (! selectedColumnMetadata.isEmpty() && allFieldsFixedLength) {
-      recordsPerBatch = (int) Math.min(Math.min(batchSize / bitWidthAllFixedFields,
-          footer.getBlocks().get(0).getColumns().get(0).getValueCount()), ParquetRecordReader.DEFAULT_RECORDS_TO_READ_IF_FIXED_WIDTH);
-    }
-    else {
-      recordsPerBatch = ParquetRecordReader.DEFAULT_RECORDS_TO_READ_IF_VARIABLE_WIDTH;
-    }
   }
 
   /**
@@ -168,7 +158,6 @@ public class ParquetSchema {
   public boolean isStarQuery() { return selectedCols == null; }
   public ParquetMetadata footer() { return footer; }
   public int getBitWidthAllFixedFields() { return bitWidthAllFixedFields; }
-  public int getRecordsPerBatch() { return recordsPerBatch; }
   public boolean allFieldsFixedLength() { return allFieldsFixedLength; }
   public List<ParquetColumnMetadata> getColumnMetadata() { return selectedColumnMetadata; }
 
@@ -195,9 +184,6 @@ public class ParquetSchema {
    */
 
   private boolean fieldSelected(MaterializedField field) {
-    // TODO - not sure if this is how we want to represent this
-    // for now it makes the existing tests pass, simply selecting
-    // all available data if no columns are provided
     if (isStarQuery()) {
       return true;
     }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ReadState.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ReadState.java
index f94edf1..c545862 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ReadState.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ReadState.java
@@ -24,6 +24,7 @@ import java.util.Map;
 
 import org.apache.drill.exec.physical.impl.OutputMutator;
 import org.apache.drill.exec.store.parquet.ParquetReaderStats;
+import org.apache.drill.exec.store.parquet.columnreaders.batchsizing.RecordBatchSizerManager;
 import org.apache.drill.exec.vector.NullableIntVector;
 import org.apache.drill.exec.vector.ValueVector;
 import org.apache.parquet.column.ColumnDescriptor;
@@ -39,7 +40,10 @@ import org.apache.parquet.hadoop.metadata.BlockMetaData;
  */
 
 public class ReadState {
+  /** The Parquet Schema */
   private final ParquetSchema schema;
+  /** Responsible for managing record batch size constraints */
+  private final RecordBatchSizerManager batchSizerMgr;
   private final ParquetReaderStats parquetReaderStats;
   private VarLenBinaryReader varLengthReader;
   /**
@@ -48,8 +52,12 @@ public class ReadState {
    * that need only have their value count set at the end of each call to next(), as the values default to null.
    */
   private List<NullableIntVector> nullFilledVectors;
-  private List<ColumnReader<?>> columnReaders = new ArrayList<>();
-  private long numRecordsToRead; // number of records to read
+  private List<ColumnReader<?>> fixedLenColumnReaders = new ArrayList<>();
+  private final long totalNumRecordsToRead; // number of records to read
+
+  // counter for the values that have been read in this pass (a single call to the next() method)
+  private int valuesReadInCurrentBatch;
+
   /**
    * Keeps track of the number of records read thus far.
    * <p>
@@ -60,19 +68,29 @@ public class ReadState {
   private long totalRecordsRead;
   private boolean useAsyncColReader;
 
-  public ReadState(ParquetSchema schema, ParquetReaderStats parquetReaderStats, long numRecordsToRead, boolean useAsyncColReader) {
+  public ReadState(ParquetSchema schema,
+    RecordBatchSizerManager batchSizerMgr, ParquetReaderStats parquetReaderStats, long numRecordsToRead,
+    boolean useAsyncColReader) {
+
     this.schema = schema;
+    this.batchSizerMgr = batchSizerMgr;
     this.parquetReaderStats = parquetReaderStats;
     this.useAsyncColReader = useAsyncColReader;
     if (! schema.isStarQuery()) {
       nullFilledVectors = new ArrayList<>();
     }
+
+    // Because of JIRA DRILL-6528, the Parquet reader sometimes gets the wrong
+    // number of rows to read. For now, ignore the requested count and return all
+    // of the row group's records (until the downstream operator stops consuming).
+    numRecordsToRead = -1;
+
     // Callers can pass -1 if they want to read all rows.
     if (numRecordsToRead == ParquetRecordReader.NUM_RECORDS_TO_READ_NOT_SPECIFIED) {
-      this.numRecordsToRead = schema.getGroupRecordCount();
+      this.totalNumRecordsToRead = schema.getGroupRecordCount();
     } else {
       assert (numRecordsToRead >= 0);
-      this.numRecordsToRead = Math.min(numRecordsToRead, schema.getGroupRecordCount());
+      this.totalNumRecordsToRead = Math.min(numRecordsToRead, schema.getGroupRecordCount());
     }
   }
 
@@ -99,10 +117,10 @@ public class ReadState {
         // create a reader and add it to the appropriate list
         varLengthColumns.add(columnMetadata.makeVariableWidthReader(reader));
       } else if (columnMetadata.isRepeated()) {
-        varLengthColumns.add(columnMetadata.makeRepeatedFixedWidthReader(reader, schema.getRecordsPerBatch()));
+        varLengthColumns.add(columnMetadata.makeRepeatedFixedWidthReader(reader));
       }
       else {
-        columnReaders.add(columnMetadata.makeFixedWidthReader(reader, schema.getRecordsPerBatch()));
+        fixedLenColumnReaders.add(columnMetadata.makeFixedWidthReader(reader));
       }
     }
     varLengthReader = new VarLenBinaryReader(reader, varLengthColumns);
@@ -119,8 +137,8 @@ public class ReadState {
    */
 
   public ColumnReader<?> getFirstColumnReader() {
-    if (columnReaders.size() > 0) {
-      return columnReaders.get(0);
+    if (fixedLenColumnReaders.size() > 0) {
+      return fixedLenColumnReaders.get(0);
     }
     else if (varLengthReader.columns.size() > 0) {
       return varLengthReader.columns.get(0);
@@ -130,23 +148,48 @@ public class ReadState {
   }
 
   public void resetBatch() {
-    for (final ColumnReader<?> column : columnReaders) {
+    for (final ColumnReader<?> column : fixedLenColumnReaders) {
       column.valuesReadInCurrentPass = 0;
     }
     for (final VarLengthColumn<?> r : varLengthReader.columns) {
       r.valuesReadInCurrentPass = 0;
     }
+    setValuesReadInCurrentPass(0);
   }
 
   public ParquetSchema schema() { return schema; }
-  public List<ColumnReader<?>> getColumnReaders() { return columnReaders; }
+  public RecordBatchSizerManager batchSizerMgr() { return batchSizerMgr; }
+  public List<ColumnReader<?>> getFixedLenColumnReaders() { return fixedLenColumnReaders; }
   public long recordsRead() { return totalRecordsRead; }
   public VarLenBinaryReader varLengthReader() { return varLengthReader; }
-  public long getRecordsToRead() { return numRecordsToRead; }
+  public long getTotalRecordsToRead() { return totalNumRecordsToRead; }
   public boolean useAsyncColReader() { return useAsyncColReader; }
   public ParquetReaderStats parquetReaderStats() { return parquetReaderStats; }
 
   /**
+   * @return values read within the latest batch
+   */
+  public int getValuesReadInCurrentPass() {
+    return valuesReadInCurrentBatch;
+  }
+
+  /**
+   * @return remaining values to read
+   */
+  public int getRemainingValuesToRead() {
+    assert totalNumRecordsToRead >= totalRecordsRead;
+    return (int) (totalNumRecordsToRead - totalRecordsRead);
+  }
+
+  /**
+   * @param valuesReadInCurrentBatch number of values read within the latest batch
+   */
+  public void setValuesReadInCurrentPass(int valuesReadInCurrentBatch) {
+    this.valuesReadInCurrentBatch = valuesReadInCurrentBatch;
+  }
+
+
+  /**
    * When the SELECT clause references columns that do not exist in the Parquet
    * file, we don't issue an error; instead we simply make up a column and
    * fill it with nulls. This method does the work of null-filling the made-up
@@ -170,16 +213,15 @@ public class ReadState {
 
   public void updateCounts(int readCount) {
     totalRecordsRead += readCount;
-    numRecordsToRead -= readCount;
   }
 
   public void close() {
-    if (columnReaders != null) {
-      for (final ColumnReader<?> column : columnReaders) {
+    if (fixedLenColumnReaders != null) {
+      for (final ColumnReader<?> column : fixedLenColumnReaders) {
         column.clear();
       }
-      columnReaders.clear();
-      columnReaders = null;
+      fixedLenColumnReaders.clear();
+      fixedLenColumnReaders = null;
     }
     if (varLengthReader != null) {
       for (final VarLengthColumn<? extends ValueVector> r : varLengthReader.columns) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenAbstractEntryReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenAbstractEntryReader.java
index f6fab66..3c9610a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenAbstractEntryReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenAbstractEntryReader.java
@@ -17,24 +17,20 @@
  */
 package org.apache.drill.exec.store.parquet.columnreaders;
 
+import io.netty.buffer.DrillBuf;
 import java.nio.ByteBuffer;
 
-import org.apache.drill.exec.store.parquet.columnreaders.VarLenColumnBulkInput.ColumnPrecisionInfo;
-import org.apache.drill.exec.store.parquet.columnreaders.VarLenColumnBulkInput.PageDataInfo;
-
-import io.netty.buffer.DrillBuf;
+import org.apache.drill.exec.store.parquet.columnreaders.VarLenColumnBulkInput.VarLenColumnBulkInputCallback;
 
 /** Abstract class for sub-classes implementing several algorithms for loading a Bulk Entry */
 abstract class VarLenAbstractEntryReader {
 
   /** byte buffer used for buffering page data */
   protected final ByteBuffer buffer;
-  /** Page Data Information */
-  protected final PageDataInfo pageInfo;
-  /** expected precision type: fixed or variable length */
-  protected final ColumnPrecisionInfo columnPrecInfo;
   /** Bulk entry */
   protected final VarLenColumnBulkEntry entry;
+  /** A callback to allow bulk readers to interact with their container */
+  private final VarLenColumnBulkInputCallback containerCallback;
 
   /**
    * CTOR.
@@ -44,14 +40,12 @@ abstract class VarLenAbstractEntryReader {
    * @param entry reusable bulk entry object
    */
   VarLenAbstractEntryReader(ByteBuffer buffer,
-    PageDataInfo pageInfo,
-    ColumnPrecisionInfo columnPrecInfo,
-    VarLenColumnBulkEntry entry) {
+    VarLenColumnBulkEntry entry,
+    VarLenColumnBulkInputCallback containerCallback) {
 
     this.buffer = buffer;
-    this.pageInfo = pageInfo;
-    this.columnPrecInfo = columnPrecInfo;
     this.entry = entry;
+    this.containerCallback = containerCallback;
   }
 
   /**
@@ -61,53 +55,14 @@ abstract class VarLenAbstractEntryReader {
   abstract VarLenColumnBulkEntry getEntry(int valsToReadWithinPage);
 
   /**
-   * Indicates whether to use bulk processing
-   */
-  protected final boolean bulkProcess() {
-    return columnPrecInfo.bulkProcess;
-  }
-
-  /**
-   * Loads new data into the buffer if empty or the force flag is set.
-   *
-   * @param force flag to force loading new data into the buffer
-   */
-  protected final boolean load(boolean force) {
-
-    if (!force && buffer.hasRemaining()) {
-      return true; // NOOP
-    }
-
-    // We're here either because the buffer is empty or we want to force a new load operation.
-    // In the case of force, there might be unprocessed data (still in the buffer) which is fine
-    // since the caller updates the page data buffer's offset only for the data it has consumed; this
-    // means unread data will be loaded again but this time will be positioned in the beginning of the
-    // buffer. This can happen only for the last entry in the buffer when either of its length or value
-    // is incomplete.
-    buffer.clear();
-
-    final int bufferCapacity = VarLenBulkPageReader.BUFF_SZ;
-    final int remaining = remainingPageData();
-    final int toCopy = remaining > bufferCapacity ? bufferCapacity : remaining;
-
-    if (toCopy == 0) {
-      return false;
-    }
-
-    pageInfo.pageData.getBytes(pageInfo.pageDataOff, buffer.array(), buffer.position(), toCopy);
-
-    buffer.limit(toCopy);
-
-    // At this point the buffer position is 0 and its limit set to the number of bytes copied.
-
-    return true;
-  }
-
-  /**
-   * @return remaining data in current page
+   * @param newBitsMemory new "bits" memory size
+   * @param newOffsetsMemory new "offsets" memory size
+   * @param newDataMemory new "data" memory size
+   * @return true if the new payload ("bits", "offsets", "data") will trigger a constraint violation; false
+   *         otherwise
    */
-  protected final int remainingPageData() {
-    return pageInfo.pageDataLen - pageInfo.pageDataOff;
+  protected boolean batchMemoryConstraintsReached(int newBitsMemory, int newOffsetsMemory, int newDataMemory) {
+    return containerCallback.batchMemoryConstraintsReached(newBitsMemory, newOffsetsMemory, newDataMemory);
   }
 
   /**
@@ -115,7 +70,7 @@ abstract class VarLenAbstractEntryReader {
    * @param pos start position
    * @return an integer encoded as a low endian
    */
-  protected final int getInt(final byte[] buff, final int pos) {
+  static final int getInt(final byte[] buff, final int pos) {
     return DrillBuf.getInt(buff, pos);
   }
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenAbstractPageEntryReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenAbstractPageEntryReader.java
new file mode 100644
index 0000000..fecf1ce
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenAbstractPageEntryReader.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.parquet.columnreaders;
+
+import java.nio.ByteBuffer;
+import org.apache.drill.exec.store.parquet.columnreaders.VarLenColumnBulkInput.VarLenColumnBulkInputCallback;
+import org.apache.drill.exec.store.parquet.columnreaders.VarLenColumnBulkInput.ColumnPrecisionInfo;
+import org.apache.drill.exec.store.parquet.columnreaders.VarLenColumnBulkInput.PageDataInfo;
+
+/** Abstract class for sub-classes implementing several strategies for loading a Bulk Entry from a Parquet page */
+abstract class VarLenAbstractPageEntryReader extends VarLenAbstractEntryReader {
+
+  protected final PageDataInfo pageInfo;
+  /** expected precision type: fixed or variable length */
+  protected final ColumnPrecisionInfo columnPrecInfo;
+
+  /**
+   * CTOR.
+   * @param _buffer byte buffer for data buffering (within CPU cache)
+   * @param _pageInfo page being processed information
+   * @param _columnPrecInfo column precision information
+   * @param _entry reusable bulk entry object
+   */
+  VarLenAbstractPageEntryReader(ByteBuffer _buffer,
+    PageDataInfo _pageInfo,
+    ColumnPrecisionInfo _columnPrecInfo,
+    VarLenColumnBulkEntry _entry,
+    VarLenColumnBulkInputCallback _containerCallback) {
+
+    super(_buffer, _entry, _containerCallback);
+
+    this.pageInfo = _pageInfo;
+    this.columnPrecInfo    = _columnPrecInfo;
+  }
+
+  /**
+   * Indicates whether to use bulk processing
+   */
+  protected final boolean bulkProcess() {
+    return columnPrecInfo.bulkProcess;
+  }
+
+  /**
+   * Loads new data into the buffer if empty or the force flag is set.
+   *
+   * @param force flag to force loading new data into the buffer
+   */
+  protected final boolean load(boolean force) {
+
+    if (!force && buffer.hasRemaining()) {
+      return true; // NOOP
+    }
+
+    // We're here either because the buffer is empty or we want to force a new load operation.
+    // In the case of force, there might be unprocessed data (still in the buffer) which is fine
+    // since the caller updates the page data buffer's offset only for the data it has consumed; this
+    // means unread data will be loaded again but this time will be positioned in the beginning of the
+    // buffer. This can happen only for the last entry in the buffer when either of its length or value
+    // is incomplete.
+    buffer.clear();
+
+    int remaining = remainingPageData();
+    int bufferCapacity = buffer.capacity() - VarLenBulkPageReader.PADDING;
+    int toCopy = remaining > bufferCapacity ? bufferCapacity : remaining;
+
+    if (toCopy == 0) {
+      return false;
+    }
+
+    pageInfo.pageData.getBytes(pageInfo.pageDataOff, buffer.array(), buffer.position(), toCopy);
+
+    buffer.limit(toCopy);
+
+    // At this point the buffer position is 0 and its limit set to the number of bytes copied.
+
+    return true;
+  }
+
+  /**
+   * @return remaining data in current page
+   */
+  protected final int remainingPageData() {
+    return pageInfo.pageDataLen - pageInfo.pageDataOff;
+  }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenBinaryReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenBinaryReader.java
index 8156db1..7bdc33e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenBinaryReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenBinaryReader.java
@@ -19,39 +19,43 @@ package org.apache.drill.exec.store.parquet.columnreaders;
 
 import com.google.common.base.Stopwatch;
 import com.google.common.collect.Lists;
-import org.apache.drill.common.exceptions.DrillRuntimeException;
-import org.apache.drill.exec.vector.ValueVector;
-
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.exec.store.parquet.columnreaders.batchsizing.RecordBatchOverflow;
+import org.apache.drill.exec.store.parquet.columnreaders.batchsizing.RecordBatchOverflow.FieldOverflowDefinition;
+import org.apache.drill.exec.store.parquet.columnreaders.batchsizing.RecordBatchSizerManager;
+import org.apache.drill.exec.store.parquet.columnreaders.batchsizing.RecordBatchSizerManager.FieldOverflowState;
+import org.apache.drill.exec.store.parquet.columnreaders.batchsizing.RecordBatchSizerManager.FieldOverflowStateContainer;
+import org.apache.drill.exec.store.parquet.columnreaders.batchsizing.RecordBatchSizerManager.VarLenColumnBatchStats;
+import org.apache.drill.exec.vector.ValueVector;
 
+/** Class which handles reading a batch of rows from a set of variable columns */
 public class VarLenBinaryReader {
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(VarLenBinaryReader.class);
 
-  ParquetRecordReader parentReader;
+  final ParquetRecordReader parentReader;
+  final RecordBatchSizerManager batchSizer;
   final List<VarLengthColumn<? extends ValueVector>> columns;
+  /** Sorting columns to minimize overflow */
+  final List<VLColumnContainer> orderedColumns;
+  private final Comparator<VLColumnContainer> comparator = new VLColumnComparator();
   final boolean useAsyncTasks;
-  private final long targetRecordCount;
   private final boolean useBulkReader;
 
   public VarLenBinaryReader(ParquetRecordReader parentReader, List<VarLengthColumn<? extends ValueVector>> columns) {
     this.parentReader = parentReader;
+    this.batchSizer = parentReader.getBatchSizesMgr();
     this.columns = columns;
+    this.orderedColumns = populateOrderedColumns();
     this.useAsyncTasks = parentReader.useAsyncColReader;
     this.useBulkReader = parentReader.useBulkReader();
-
-    // Can't read any more records than fixed width fields will fit.
-    // Note: this calculation is very likely wrong; it is a simplified
-    // version of earlier code, but probably needs even more attention.
-
-    int totalFixedFieldWidth = parentReader.getBitWidthAllFixedFields() / 8;
-    if (totalFixedFieldWidth == 0) {
-      targetRecordCount = 0;
-    } else {
-      targetRecordCount = parentReader.getBatchSize() / totalFixedFieldWidth;
-    }
   }
 
   /**
@@ -70,11 +74,8 @@ public class VarLenBinaryReader {
     }
     Stopwatch timer = Stopwatch.createStarted();
 
-    // Can't read any more records than fixed width fields will fit.
-
-    if (targetRecordCount > 0) {
-      recordsToReadInThisPass = Math.min(recordsToReadInThisPass, targetRecordCount);
-    }
+    // Ensure we do not read more than batch record count
+    recordsToReadInThisPass = Math.min(recordsToReadInThisPass, batchSizer.getCurrentRecordsPerBatch());
 
     long recordsReadInCurrentPass = 0;
 
@@ -90,22 +91,161 @@ public class VarLenBinaryReader {
       recordsReadInCurrentPass = readRecordsInBulk((int) recordsToReadInThisPass);
     }
 
+    // Publish this information
+    parentReader.readState.setValuesReadInCurrentPass((int) recordsReadInCurrentPass);
+
+    // Update the stats
     parentReader.parquetReaderStats.timeVarColumnRead.addAndGet(timer.elapsed(TimeUnit.NANOSECONDS));
 
     return recordsReadInCurrentPass;
   }
 
   private int readRecordsInBulk(int recordsToReadInThisPass) throws IOException {
-    int recordsReadInCurrentPass = -1;
+    int batchNumRecords = recordsToReadInThisPass;
+    List<VarLenColumnBatchStats> columnStats = new ArrayList<VarLenColumnBatchStats>(columns.size());
+    int prevReadColumns = -1;
+    boolean overflowCondition = false;
+
+    for (VLColumnContainer columnContainer : orderedColumns) {
+      VarLengthColumn<?> columnReader = columnContainer.column;
+
+      // Read the column data
+      int readColumns = columnReader.readRecordsInBulk(batchNumRecords);
+      assert readColumns <= batchNumRecords : "Reader cannot return more values than requested..";
+
+      if (!overflowCondition) {
+        if (prevReadColumns >= 0 && prevReadColumns != readColumns) {
+          overflowCondition = true;
+        } else {
+          prevReadColumns = readColumns;
+        }
+      }
+
+      // Enqueue this column entry information to handle overflow conditions; we will not know
+      // whether an overflow happened till all variable length columns have been processed
+      columnStats.add(new VarLenColumnBatchStats(columnReader.valueVec, readColumns));
 
+      // Decrease the number of records to read when a column returns fewer records (minimize overflow)
+      if (batchNumRecords > readColumns) {
+        batchNumRecords = readColumns;
+        // it seems this column caused an overflow (higher layer will not ask for more values than remaining)
+        ++columnContainer.numCausedOverflows;
+      }
+    }
+
+    // Set the value-count for each column
     for (VarLengthColumn<?> columnReader : columns) {
-      int readColumns = columnReader.readRecordsInBulk(recordsToReadInThisPass);
-      assert (readColumns >= 0 && recordsReadInCurrentPass == readColumns || recordsReadInCurrentPass == -1);
+      columnReader.valuesReadInCurrentPass = batchNumRecords;
+    }
+
+    // Publish this batch statistics
+    publishBatchStats(columnStats, batchNumRecords);
 
-      recordsReadInCurrentPass = readColumns;
+    // Handle column(s) overflow if any
+    if (overflowCondition) {
+      handleColumnOverflow(columnStats, batchNumRecords);
     }
 
-    return recordsReadInCurrentPass;
+    return batchNumRecords;
+  }
+
+  private void handleColumnOverflow(List<VarLenColumnBatchStats> columnStats, int batchNumRecords) {
+    // Overflow happens if a column returned more values than "batchNumRecords"; this can happen
+    // when a column Ci is called first, returns num-values-i, and then another column Cj is called which
+    // returns fewer values than num-values-i.
+    RecordBatchOverflow.Builder builder = null;
+
+    // We need to collect all columns which are subject to an overflow (except for the ones which are already
+    // returning values from previous batch overflow)
+    for (VarLenColumnBatchStats columnStat : columnStats) {
+      if (columnStat.numValuesRead > batchNumRecords) {
+        // We need to figure out whether this column was already returning values from a previous batch
+        // overflow; if it is, then this is a NOOP (as the overflow data is still available to be replayed)
+        if (fieldHasAlreadyOverflowData(columnStat.vector.getField().getName())) {
+          continue;
+        }
+
+        // We need to set the value-count as otherwise some size related vector APIs won't work
+        columnStat.vector.getMutator().setValueCount(batchNumRecords);
+
+        // Lazy initialization
+        if (builder == null) {
+          builder = RecordBatchOverflow.newBuilder(parentReader.getOperatorContext().getAllocator());
+        }
+
+        final int numOverflowValues = columnStat.numValuesRead - batchNumRecords;
+        builder.addFieldOverflow(columnStat.vector, batchNumRecords, numOverflowValues);
+      }
+    }
+
+    // Register batch overflow data with the record batch sizer manager (if any)
+    if (builder != null) {
+      Map<String, FieldOverflowStateContainer> overflowContainerMap = parentReader.batchSizerMgr.getFieldOverflowMap();
+      Map<String, FieldOverflowDefinition> overflowDefMap           = builder.build().getRecordOverflowDefinition().getFieldOverflowDefs();
+
+      for (Map.Entry<String, FieldOverflowDefinition> entry : overflowDefMap.entrySet()) {
+        FieldOverflowStateContainer overflowStateContainer = new FieldOverflowStateContainer(entry.getValue(), null);
+        // Register this overflow condition
+        overflowContainerMap.put(entry.getKey(), overflowStateContainer);
+      }
+    }
+
+    reorderVLColumns();
+  }
+
+  private void reorderVLColumns() {
+    // Finally, re-order the variable length columns since an overflow occurred
+    Collections.sort(orderedColumns, comparator);
+
+    if (logger.isDebugEnabled()) {
+      boolean isFirstValue    = true;
+      final StringBuilder msg = new StringBuilder(RecordBatchSizerManager.BATCH_STATS_PREFIX);
+      msg.append(": Dumping the variable length columns read order: ");
+
+      for (VLColumnContainer container : orderedColumns) {
+        if (!isFirstValue) {
+          msg.append(", ");
+        } else {
+          isFirstValue = false;
+        }
+        msg.append(container.column.valueVec.getField().getName());
+      }
+      msg.append('.');
+
+      logger.debug(msg.toString());
+    }
+  }
+
+  private boolean fieldHasAlreadyOverflowData(String field) {
+    FieldOverflowStateContainer container = parentReader.batchSizerMgr.getFieldOverflowContainer(field);
+
+    if (container == null) {
+      return false;
+    }
+
+    if (container.overflowState == null || container.overflowState.isOverflowDataFullyConsumed()) {
+      parentReader.batchSizerMgr.releaseFieldOverflowContainer(field);
+      return false;
+    }
+    return true;
+  }
+
+  private void publishBatchStats(List<VarLenColumnBatchStats> stats, int batchNumRecords) {
+    // First, let us inform the variable columns of the number of records returned by this batch; this
+    // is for managing overflow data state.
+    Map<String, FieldOverflowStateContainer> overflowMap =
+      parentReader.batchSizerMgr.getFieldOverflowMap();
+
+    for (FieldOverflowStateContainer container : overflowMap.values()) {
+      FieldOverflowState overflowState = container.overflowState;
+
+      if (overflowState != null) {
+        overflowState.onNewBatchValuesConsumed(batchNumRecords);
+      }
+    }
+
+    // Now publish the same to the record batch sizer manager
+    parentReader.batchSizerMgr.onEndOfBatch(batchNumRecords, stats);
   }
 
   private long determineSizesSerial(long recordsToReadInThisPass) throws IOException {
@@ -168,4 +308,51 @@ public class VarLenBinaryReader {
     throw new DrillRuntimeException(message, e);
   }
 
+  private List<VLColumnContainer> populateOrderedColumns() {
+    List<VLColumnContainer> result = new ArrayList<VLColumnContainer>(columns.size());
+
+    // first, we need to populate this list
+    for (VarLengthColumn<? extends ValueVector> column : columns) {
+      result.add(new VLColumnContainer(column));
+    }
+
+    // now perform the sorting
+    Collections.sort(result, comparator);
+
+    return result;
+  }
+
+// ----------------------------------------------------------------------------
+// Internal Data Structure
+// ----------------------------------------------------------------------------
+
+  /** Container class which allows us to order the columns so as to minimize overflow */
+  private static final class VLColumnContainer {
+    /** Variable length column */
+    private final VarLengthColumn<? extends ValueVector> column;
+    /** Number of times this column caused overflow */
+    private int numCausedOverflows;
+
+    /** Constructor */
+    private VLColumnContainer(VarLengthColumn<? extends ValueVector> column) {
+      this.column = column;
+    }
+  }
+
+  /** Comparator class to minimize overflow; columns with highest chance of causing overflow
+   * should be iterated first (have smallest value).
+   */
+  private static final class VLColumnComparator implements Comparator<VLColumnContainer> {
+    // columns which caused overflows, should execute earlier (lowest order)
+    @Override
+    public int compare(VLColumnContainer o1, VLColumnContainer o2) {
+      assert o1 != null && o2 != null;
+
+      if (o1.numCausedOverflows == o2.numCausedOverflows) { return 0; }
+      if (o1.numCausedOverflows < o2.numCausedOverflows)  { return 1; }
+
+      return -1;
+    }
+  }
+
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenBulkPageReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenBulkPageReader.java
index 385cb83..0e50406 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenBulkPageReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenBulkPageReader.java
@@ -25,7 +25,8 @@ import org.apache.drill.common.exceptions.DrillRuntimeException;
 import org.apache.drill.exec.store.parquet.columnreaders.VarLenColumnBulkInput.ColumnPrecisionInfo;
 import org.apache.drill.exec.store.parquet.columnreaders.VarLenColumnBulkInput.ColumnPrecisionType;
 import org.apache.drill.exec.store.parquet.columnreaders.VarLenColumnBulkInput.PageDataInfo;
-import org.apache.drill.exec.store.parquet.columnreaders.VarLenColumnBulkInput.VLColumnBulkInputCallback;
+import org.apache.drill.exec.store.parquet.columnreaders.VarLenColumnBulkInput.VarLenColumnBulkInputCallback;
+import org.apache.drill.exec.store.parquet.columnreaders.batchsizing.RecordBatchSizerManager.FieldOverflowStateContainer;
 
 /** Provides bulk reads when accessing Parquet's page payload for variable length columns */
 final class VarLenBulkPageReader {
@@ -47,7 +48,9 @@ final class VarLenBulkPageReader {
   /** Bulk entry */
   private final VarLenColumnBulkEntry entry;
   /** A callback to allow bulk readers interact with their container */
-  private final VLColumnBulkInputCallback containerCallback;
+  private final VarLenColumnBulkInputCallback containerCallback;
+  /** A reference to column's overflow data (could be null) */
+  private FieldOverflowStateContainer fieldOverflowStateContainer;
 
   // Various BulkEntry readers
   final VarLenAbstractEntryReader fixedReader;
@@ -57,10 +60,14 @@ final class VarLenBulkPageReader {
   final VarLenAbstractEntryReader dictionaryReader;
   final VarLenAbstractEntryReader nullableDictionaryReader;
 
+  // Overflow reader
+  private VarLenOverflowReader overflowReader;
+
   VarLenBulkPageReader(
     PageDataInfo pageInfoInput,
     ColumnPrecisionInfo columnPrecInfoInput,
-    VLColumnBulkInputCallback containerCallbackInput) {
+    VarLenColumnBulkInputCallback containerCallbackInput,
+    FieldOverflowStateContainer fieldOverflowStateContainer) {
 
     // Set the buffer to the native byte order
     this.buffer.order(ByteOrder.nativeOrder());
@@ -72,14 +79,22 @@ final class VarLenBulkPageReader {
     this.columnPrecInfo = columnPrecInfoInput;
     this.entry = new VarLenColumnBulkEntry(this.columnPrecInfo);
     this.containerCallback = containerCallbackInput;
+    this.fieldOverflowStateContainer = fieldOverflowStateContainer;
 
     // Initialize the Variable Length Entry Readers
-    fixedReader = new VarLenFixedEntryReader(buffer, pageInfo, columnPrecInfo, entry);
-    nullableFixedReader = new VarLenNullableFixedEntryReader(buffer, pageInfo, columnPrecInfo, entry);
-    variableLengthReader = new VarLenEntryReader(buffer, pageInfo, columnPrecInfo, entry);
-    nullableVLReader = new VarLenNullableEntryReader(buffer, pageInfo, columnPrecInfo, entry);
-    dictionaryReader = new VarLenEntryDictionaryReader(buffer, pageInfo, columnPrecInfo, entry);
-    nullableDictionaryReader = new VarLenNullableDictionaryReader(buffer, pageInfo, columnPrecInfo, entry);
+    fixedReader = new VarLenFixedEntryReader(buffer, pageInfo, columnPrecInfo, entry, containerCallback);
+    nullableFixedReader = new VarLenNullableFixedEntryReader(buffer, pageInfo, columnPrecInfo, entry, containerCallback);
+    variableLengthReader = new VarLenEntryReader(buffer, pageInfo, columnPrecInfo, entry, containerCallback);
+    nullableVLReader = new VarLenNullableEntryReader(buffer, pageInfo, columnPrecInfo, entry, containerCallback);
+    dictionaryReader = new VarLenEntryDictionaryReader(buffer, pageInfo, columnPrecInfo, entry, containerCallback);
+    nullableDictionaryReader = new VarLenNullableDictionaryReader(buffer, pageInfo, columnPrecInfo, entry, containerCallback);
+
+    // Overflow reader is initialized only when a previous batch produced overflow data for this column
+    if (this.fieldOverflowStateContainer == null) {
+      overflowReader = null;
+    } else {
+      overflowReader = new VarLenOverflowReader(buffer, entry, containerCallback, fieldOverflowStateContainer);
+    }
   }
 
   final void set(PageDataInfo pageInfoInput, boolean clear) {
@@ -91,13 +106,25 @@ final class VarLenBulkPageReader {
     pageInfo.dictionaryValueReader = pageInfoInput.dictionaryValueReader;
     pageInfo.numPageValues = pageInfoInput.numPageValues;
     if (clear) {
-      buffer.clear();
-    }
+    buffer.clear();
+  }
   }
 
   final VarLenColumnBulkEntry getEntry(int valuesToRead) {
     VarLenColumnBulkEntry entry = null;
 
+    // If there is overflow data, then we need to consume it first
+    if (overflowDataAvailable()) {
+      entry = overflowReader.getEntry(valuesToRead);
+      entry.setReadFromPage(false); // entry was read from the overflow data
+
+      return entry;
+    }
+
+    // It seems there is no overflow data anymore; if we previously were reading from it, then it
+    // needs to get de-initialized before reading new page data.
+    deinitOverflowDataIfNeeded();
+
     if (ColumnPrecisionType.isPrecTypeFixed(columnPrecInfo.columnPrecisionType)) {
       if ((entry = getFixedEntry(valuesToRead)) == null) {
         // The only reason for a null to be returned is when the "getFixedEntry" method discovers
@@ -118,14 +145,15 @@ final class VarLenBulkPageReader {
         }
 
         columnPrecInfo.columnPrecisionType = ColumnPrecisionType.DT_PRECISION_IS_VARIABLE;
-        entry = getVLEntry(valuesToRead);
+        entry = getVarLenEntry(valuesToRead);
       }
 
     } else {
-      entry = getVLEntry(valuesToRead);
+      entry = getVarLenEntry(valuesToRead);
     }
 
     if (entry != null) {
+      entry.setReadFromPage(true); // entry was read from a Parquet page
       pageInfo.numPageFieldsRead += entry.getNumValues();
     }
     return entry;
@@ -139,10 +167,9 @@ final class VarLenBulkPageReader {
     }
   }
 
-  private final VarLenColumnBulkEntry getVLEntry(int valuesToRead) {
-    if (pageInfo.dictionaryValueReader == null
-     || !pageInfo.dictionaryValueReader.isDefined()) {
-
+  private final VarLenColumnBulkEntry getVarLenEntry(int valuesToRead) {
+    // Let's start with non-dictionary encoding as it is predominant
+    if (!pageInfo.dictionaryValueReader.isDefined()) {
       if (pageInfo.definitionLevels.hasDefinitionLevels()) {
         return nullableVLReader.getEntry(valuesToRead);
       } else {
@@ -157,4 +184,19 @@ final class VarLenBulkPageReader {
     }
   }
 
-}
+  private boolean overflowDataAvailable() {
+    if (overflowReader == null) {
+      return false;
+    }
+    return overflowReader.getRemainingOverflowData() > 0;
+  }
+
+  private void deinitOverflowDataIfNeeded() {
+    if (overflowReader != null) {
+      containerCallback.deinitOverflowData();
+      overflowReader = null;
+      fieldOverflowStateContainer = null;
+    }
+  }
+
+}
\ No newline at end of file
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenColumnBulkEntry.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenColumnBulkEntry.java
index e6158e1..bc77415 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenColumnBulkEntry.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenColumnBulkEntry.java
@@ -47,6 +47,8 @@ final class VarLenColumnBulkEntry implements VarLenBulkEntry {
   private boolean arrayBacked;
   /** indicator on whether the current data buffer is externally or internally owned */
   private boolean internalDataBuf;
+  /** indicates whether the entry was read from the overflow data or page data */
+  private boolean readFromPage;
 
   VarLenColumnBulkEntry(ColumnPrecisionInfo columnPrecInfo) {
     this(columnPrecInfo, VarLenBulkPageReader.BUFF_SZ);
@@ -169,4 +171,18 @@ final class VarLenColumnBulkEntry implements VarLenBulkEntry {
     return lengths.length;
   }
 
+  /**
+   * @return true if this entry was read from page data; false if it was read from the overflow data
+   */
+  boolean isReadFromPage() {
+    return readFromPage;
+  }
+
+  /**
+   * @param readFromPage true if this entry was read from page data; false if it was read from the overflow data
+   */
+  void setReadFromPage(boolean readFromPage) {
+    this.readFromPage = readFromPage;
+  }
+
 }
\ No newline at end of file
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenColumnBulkInput.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenColumnBulkInput.java
index 1b30737..4dbd482 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenColumnBulkInput.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenColumnBulkInput.java
@@ -22,6 +22,12 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
 import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.exec.store.parquet.columnreaders.VarLenOverflowReader.FieldOverflowStateImpl;
+import org.apache.drill.exec.store.parquet.columnreaders.batchsizing.BatchSizingMemoryUtil;
+import org.apache.drill.exec.store.parquet.columnreaders.batchsizing.BatchSizingMemoryUtil.ColumnMemoryUsageInfo;
+import org.apache.drill.exec.store.parquet.columnreaders.batchsizing.RecordBatchSizerManager;
+import org.apache.drill.exec.store.parquet.columnreaders.batchsizing.RecordBatchSizerManager.ColumnMemoryQuota;
+import org.apache.drill.exec.store.parquet.columnreaders.batchsizing.RecordBatchSizerManager.FieldOverflowStateContainer;
 import org.apache.drill.exec.vector.ValueVector;
 import org.apache.drill.exec.vector.VarLenBulkEntry;
 import org.apache.drill.exec.vector.VarLenBulkInput;
@@ -29,12 +35,16 @@ import org.apache.parquet.column.values.ValuesReader;
 import org.apache.parquet.io.api.Binary;
 
 /** Implements the {@link VarLenBulkInput} interface to optimize data copy */
-final class VarLenColumnBulkInput<V extends ValueVector> implements VarLenBulkInput<VarLenBulkEntry> {
+public final class VarLenColumnBulkInput<V extends ValueVector> implements VarLenBulkInput<VarLenBulkEntry> {
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(VarLenColumnBulkInput.class);
+
   /** A cut off number for bulk processing */
   private static final int BULK_PROCESSING_MAX_PREC_LEN = 1 << 10;
 
   /** parent object */
   private final VarLengthValuesColumn<V> parentInst;
+  /** Batch sizer manager */
+  private final RecordBatchSizerManager batchSizerMgr;
   /** Column precision type information (owner by caller) */
   private final ColumnPrecisionInfo columnPrecInfo;
   /** Custom definition level reader */
@@ -44,6 +54,10 @@ final class VarLenColumnBulkInput<V extends ValueVector> implements VarLenBulkIn
 
   /** The records to read */
   private final int recordsToRead;
+  /** Maximum Memory (in bytes) to use for loading this field's data
+   * (soft limit as a single row can go beyond this value)
+   */
+  private ColumnMemoryQuota columnMemoryQuota;
   /** Current operation bulk reader state */
   private final OprBulkReadState oprReadState;
   /** Container class for holding page data information */
@@ -51,7 +65,11 @@ final class VarLenColumnBulkInput<V extends ValueVector> implements VarLenBulkIn
   /** Buffered page payload */
   private VarLenBulkPageReader buffPagePayload;
   /** A callback to allow child readers interact with this class */
-  private final VLColumnBulkInputCallback callback;
+  private final VarLenColumnBulkInputCallback callback;
+  /** Column memory usage information */
+  private final ColumnMemoryUsageInfo columnMemoryUsage = new ColumnMemoryUsageInfo();
+  /** A reference to column's overflow data (could be null) */
+  private FieldOverflowStateContainer fieldOverflowStateContainer;
 
   /**
    * CTOR.
@@ -64,17 +82,19 @@ final class VarLenColumnBulkInput<V extends ValueVector> implements VarLenBulkIn
     int recordsToRead, BulkReaderState bulkReaderState) throws IOException {
 
     this.parentInst = parentInst;
+    this.batchSizerMgr = this.parentInst.parentReader.batchSizerMgr;
     this.recordsToRead = recordsToRead;
-    this.callback = new VLColumnBulkInputCallback(parentInst.pageReader);
+    this.callback = new VarLenColumnBulkInputCallback(this);
     this.columnPrecInfo = bulkReaderState.columnPrecInfo;
     this.custDefLevelReader = bulkReaderState.definitionLevelReader;
     this.custDictionaryReader = bulkReaderState.dictionaryReader;
+    this.fieldOverflowStateContainer = this.batchSizerMgr.getFieldOverflowContainer(parentInst.valueVec.getField().getName());
 
     // Load page if none have been read
     loadPageIfNeeed();
 
     // Create the internal READ_STATE object based on the current page-reader state
-    this.oprReadState = new OprBulkReadState(parentInst.pageReader.readyToReadPosInBytes, parentInst.pageReader.valuesRead, 0);
+    this.oprReadState = new OprBulkReadState(parentInst.pageReader.readyToReadPosInBytes, parentInst.pageReader.valuesRead);
 
     // Let's try to figure out whether this columns is fixed or variable length; this information
     // is not always accurate within the Parquet schema metadata.
@@ -90,26 +110,34 @@ final class VarLenColumnBulkInput<V extends ValueVector> implements VarLenBulkIn
   @Override
   public boolean hasNext() {
     try {
-      if (oprReadState.batchFieldIndex < recordsToRead) {
-        // We need to ensure there is a page of data to be read
-        if (!parentInst.pageReader.hasPage() || parentInst.pageReader.currentPageCount == oprReadState.numPageFieldsProcessed) {
-          long totalValueCount = parentInst.columnChunkMetaData.getValueCount();
-
-          if (totalValueCount == (parentInst.totalValuesRead + oprReadState.batchFieldIndex) || !parentInst.pageReader.next()) {
-            parentInst.hitRowGroupEnd();
-            return false;
+      if (!batchConstraintsReached()) {
+        // If there is overflow data, then proceed; otherwise, make sure there is still Parquet data to be
+        // read.
+        if (!overflowDataAvailable()) {
+          // We need to ensure there is a page of data to be read
+          if (!parentInst.pageReader.hasPage() || parentInst.pageReader.currentPageCount == oprReadState.numPageFieldsProcessed) {
+            long totalValueCount = parentInst.columnChunkMetaData.getValueCount();
+
+            if (totalValueCount == (parentInst.totalValuesRead + oprReadState.batchNumValuesReadFromPages)
+                || !parentInst.pageReader.next()) {
+
+              parentInst.hitRowGroupEnd();
+              return false;
+            }
+
+            // Reset the state object page read metadata
+            oprReadState.numPageFieldsProcessed = 0;
+            oprReadState.pageReadPos = parentInst.pageReader.readyToReadPosInBytes;
+
+            // Update the value readers information
+            setValuesReadersOnNewPage();
+
+            // Update the buffered-page-payload since we've read a new page
+            setBufferedPagePayload();
           }
-
-          // Reset the state object page read metadata
-          oprReadState.numPageFieldsProcessed = 0;
-          oprReadState.pageReadPos = parentInst.pageReader.readyToReadPosInBytes;
-
-          // Update the value readers information
-          setValuesReadersOnNewPage();
-
-          // Update the buffered-page-payload since we've read a new page
-          setBufferedPagePayload();
         }
+        // Alright, we didn't hit a batch constraint and are able to read either from the overflow data or
+        // the Parquet data.
         return true;
       } else {
         return false;
@@ -122,19 +150,22 @@ final class VarLenColumnBulkInput<V extends ValueVector> implements VarLenBulkIn
   /** {@inheritDoc} */
   @Override
   public final VarLenBulkEntry next() {
-    final int toReadRemaining = recordsToRead - oprReadState.batchFieldIndex;
-    final int pageRemaining = parentInst.pageReader.currentPageCount - oprReadState.numPageFieldsProcessed;
-    final int remaining = Math.min(toReadRemaining, pageRemaining);
-    final VarLenBulkEntry result = buffPagePayload.getEntry(remaining);
+    final int remaining = getRemainingRecords();
+    final VarLenColumnBulkEntry result = buffPagePayload.getEntry(remaining);
 
     // Update position for next read
-    if (result != null) {
-      // Page read position is meaningful only when dictionary mode is off
-      if (pageInfo.dictionaryValueReader == null
-       || !pageInfo.dictionaryValueReader.isDefined()) {
-        oprReadState.pageReadPos += (result.getTotalLength() + 4 * result.getNumNonNullValues());
+    if (result != null && result.getNumValues() > 0) {
+      // We need to update page stats only when we are reading directly from Parquet pages; there are
+      // situations where we have to return the overflow data (read in a previous batch)
+      if (result.isReadFromPage()) {
+        // Page read position is meaningful only when dictionary mode is off
+        if (!pageInfo.dictionaryValueReader.isDefined()) {
+          oprReadState.pageReadPos += (result.getTotalLength() + 4 * result.getNumNonNullValues());
+        }
+        oprReadState.numPageFieldsProcessed += result.getNumValues();
+        oprReadState.batchNumValuesReadFromPages += result.getNumValues();
       }
-      oprReadState.numPageFieldsProcessed += result.getNumValues();
+      // Update the batch field index
       oprReadState.batchFieldIndex += result.getNumValues();
     }
     return result;
@@ -160,11 +191,41 @@ final class VarLenColumnBulkInput<V extends ValueVector> implements VarLenBulkIn
 
     // Page read position is meaningful only when dictionary mode is off
     if (pageInfo.dictionaryValueReader == null
-     || !pageInfo.dictionaryValueReader.isDefined()) {
+        || !pageInfo.dictionaryValueReader.isDefined()) {
       parentInst.pageReader.readyToReadPosInBytes = oprReadState.pageReadPos;
     }
     parentInst.pageReader.valuesRead = oprReadState.numPageFieldsProcessed;
-    parentInst.totalValuesRead += oprReadState.batchFieldIndex;
+    parentInst.totalValuesRead += oprReadState.batchNumValuesReadFromPages;
+
+    if (logger.isDebugEnabled()) {
+      String message = String.format("requested=%d, returned=%d, total-returned=%d",
+        recordsToRead,
+        oprReadState.batchFieldIndex,
+        parentInst.totalValuesRead);
+
+      logger.debug(message);
+    }
+  }
+
+  /**
+   * @return minimum memory size required to process a variable length column in a columnar manner
+   */
+  public static int getMinVLColumnMemorySize() {
+    // How did we come up with this number?
+    // Let's first lay down some facts
+    // a) the allocator-rounding-to-next-power-of-two has to be accounted for
+    // b) VL columns use up to three vectors "bits", "offsets", and "values"
+    // c) The maximum number of entries per chunk is chunk-size / 4
+    // d) "data" and "length" sizes within a chunk have a reverse relationship (if one grows, then the other shrinks)
+    // e) "data" and "length" within a chunk cannot exceed 1 chunk-size each when loaded into the ValueVector
+    // This information gives the following upper bound
+    // - max-chunk-vv-footprint < (chunk-size/4 + 1 chunk-size + 1/2 chunk-size) < 2 chunk-sizes
+    // Why?
+    // - max-bits footprint is controlled by c)
+    // - "data" and "offsets" can have a max footprint of 1 chunk size when they are over chunk-size/2 since
+    //   roundup happens; if one goes beyond chunk-size/2, then the other is less than that (inverse relationship)
+    //   which leads to a maximum memory footprint of 1 chunk-size + 1/2 chunk-size
+    return VarLenBulkPageReader.BUFF_SZ * 2;
   }
 
   final int getReadBatchFields() {
@@ -201,14 +262,14 @@ final class VarLenColumnBulkInput<V extends ValueVector> implements VarLenBulkIn
       pageInfo.numPageFieldsRead = oprReadState.numPageFieldsProcessed;
 
       if (buffPagePayload == null) {
-        buffPagePayload = new VarLenBulkPageReader(pageInfo, columnPrecInfo, callback);
+        buffPagePayload = new VarLenBulkPageReader(pageInfo, columnPrecInfo, callback, fieldOverflowStateContainer);
 
       } else {
         buffPagePayload.set(pageInfo, true);
       }
     } else {
       if (buffPagePayload == null) {
-        buffPagePayload = new VarLenBulkPageReader(null, columnPrecInfo, callback);
+        buffPagePayload = new VarLenBulkPageReader(null, columnPrecInfo, callback, fieldOverflowStateContainer);
       }
     }
   }
@@ -311,6 +372,113 @@ final class VarLenColumnBulkInput<V extends ValueVector> implements VarLenBulkIn
     }
   }
 
+  private boolean batchConstraintsReached() {
+    // Let's update this column's memory quota
+    columnMemoryQuota = batchSizerMgr.getCurrentFieldBatchMemory(parentInst.valueVec.getField().getName());
+    assert columnMemoryQuota.getMaxMemoryUsage() > 0;
+
+    // Now try to figure out whether the next chunk will take us beyond the memory quota
+    final int maxNumRecordsInChunk = VarLenBulkPageReader.BUFF_SZ / BatchSizingMemoryUtil.INT_VALUE_WIDTH;
+
+    if (this.parentInst.valueVec.getField().isNullable()) {
+      return batchConstraintsReached(
+        maxNumRecordsInChunk * BatchSizingMemoryUtil.BYTE_VALUE_WIDTH, // max "bits"    space within a chunk
+        maxNumRecordsInChunk * BatchSizingMemoryUtil.INT_VALUE_WIDTH,  // max "offsets" space within a chunk
+        VarLenBulkPageReader.BUFF_SZ                                   // max "data"    space within a chunk
+      );
+
+    } else {
+      return batchConstraintsReached(
+        0,
+        maxNumRecordsInChunk * BatchSizingMemoryUtil.INT_VALUE_WIDTH,  // max "offsets" space within a chunk
+        VarLenBulkPageReader.BUFF_SZ                                   // max "data"    space within a chunk
+      );
+    }
+  }
+
+  private boolean batchConstraintsReached(int newBitsMemory, int newOffsetsMemory, int newDataMemory) {
+    assert oprReadState.batchFieldIndex <= recordsToRead; // cannot read beyond the batch size
+
+    // Did we reach the batch size limit?
+    if (oprReadState.batchFieldIndex == recordsToRead) {
+      return true; // batch size reached
+    }
+
+    // Memory constraint check logic:
+    // - if this is the first chunk to process, then let it proceed as we need to at least return
+    //   one row; this also means the minimum batch memory shouldn't be lower than 2 chunks (please refer
+    //   to getMinVLColumnMemorySize() method for more information).
+    //
+    // - Otherwise, we make sure that the memory growth after processing a chunk cannot go beyond the maximum
+    //   batch memory for this column
+    // - There is also a caveat that needs to be handled during processing:
+    //   o The page-bulk-reader will stop loading entries if it encounters a large value (doesn't fit within
+    //     the chunk)
+    //   o There is an exception though, which is if the entry is the first one within the batch (this is to
+    //     ensure that we always make progress)
+    //   o In this situation a callback to this object is made to assess whether this large entry can be loaded
+    //     into the ValueVector.
+
+    // Is this the first chunk to be processed?
+    if (oprReadState.batchFieldIndex == 0) {
+      return false; // we should process at least one chunk
+    }
+
+    // Is the next processed chunk going to cause memory to overflow beyond the allowed limit?
+    columnMemoryUsage.vector = parentInst.valueVec;
+    columnMemoryUsage.memoryQuota = columnMemoryQuota;
+    columnMemoryUsage.currValueCount = oprReadState.batchFieldIndex;
+
+    // Return true if we cannot add this new payload
+    return !BatchSizingMemoryUtil.canAddNewData(columnMemoryUsage, newBitsMemory, newOffsetsMemory, newDataMemory);
+  }
+
+  private int getRemainingRecords() {
+    // remaining records to return within this batch
+    final int toReadRemaining       = recordsToRead - oprReadState.batchFieldIndex;
+    final int remainingOverflowData = getRemainingOverflowData();
+    final int remaining;
+
+    // The meaning of "remaining" depends on whether we are dealing with page data or
+    // overflow data, since overflow data now behaves like another source of input
+    if (remainingOverflowData == 0) {
+      final int pageRemaining = parentInst.pageReader.currentPageCount - oprReadState.numPageFieldsProcessed;
+      remaining               = Math.min(toReadRemaining, pageRemaining);
+
+    } else {
+      remaining = Math.min(toReadRemaining, remainingOverflowData);
+    }
+
+    return remaining;
+  }
+
+  private boolean overflowDataAvailable() {
+    return getRemainingOverflowData() > 0;
+  }
+
+  private int getRemainingOverflowData() {
+
+    if (fieldOverflowStateContainer != null) {
+      FieldOverflowStateImpl overflowState =
+        (FieldOverflowStateImpl) fieldOverflowStateContainer.overflowState;
+
+      if (overflowState != null) {
+        return overflowState.getRemainingOverflowData();
+      } else {
+        // This can happen if this is the first time we are accessing this container as
+        // the overflow reader didn't have the chance to consume any overflow data yet.
+        return fieldOverflowStateContainer.overflowDef.numValues;
+      }
+    }
+    return 0;
+  }
+
+  private void deinitOverflowData() {
+    batchSizerMgr.releaseFieldOverflowContainer(parentInst.valueVec.getField().getName());
+
+    fieldOverflowStateContainer = null;
+  }
+
   // --------------------------------------------------------------------------
   // Inner Classes
   // --------------------------------------------------------------------------
@@ -377,11 +545,14 @@ final class VarLenColumnBulkInput<V extends ValueVector> implements VarLenBulkIn
     int numPageFieldsProcessed;
     /** field index within current batch */
     int batchFieldIndex;
+    /** number of values actually read from Parquet pages (not from overflow data) within this batch */
+    int batchNumValuesReadFromPages;
 
-      OprBulkReadState(long pageReadPos, int numPageFieldsRead, int batchFieldIndex) {
+      OprBulkReadState(long pageReadPos, int numPageFieldsRead) {
         this.pageReadPos = pageReadPos;
         this.numPageFieldsProcessed = numPageFieldsRead;
-        this.batchFieldIndex = batchFieldIndex;
+        this.batchFieldIndex = 0;
+        this.batchNumValuesReadFromPages = 0;
       }
   }
 
@@ -404,12 +575,15 @@ final class VarLenColumnBulkInput<V extends ValueVector> implements VarLenBulkIn
   }
 
   /** Callback to allow a bulk reader interact with its parent */
-  final static class VLColumnBulkInputCallback {
+  final static class VarLenColumnBulkInputCallback {
+    /** Parent instance */
+    final VarLenColumnBulkInput<? extends ValueVector> parentInst;
     /** Page reader object */
     PageReader pageReader;
 
-    VLColumnBulkInputCallback(PageReader _pageReader) {
-      this.pageReader = _pageReader;
+    VarLenColumnBulkInputCallback(VarLenColumnBulkInput<? extends ValueVector> parentInst) {
+      this.parentInst = parentInst;
+      this.pageReader = this.parentInst.parentInst.pageReader;
     }
 
     /**
@@ -428,6 +602,22 @@ final class VarLenColumnBulkInput<V extends ValueVector> implements VarLenBulkIn
     ValuesReader getDefinitionLevelsReader() {
       return pageReader.definitionLevels;
     }
+
+    /**
+     * @param newBitsMemory new "bits" memory size
+     * @param newOffsetsMemory new "offsets" memory size
+     * @param newDataMemory new "data" memory size
+     * @return true if the new payload ("bits", "offsets", "data") will trigger a constraint violation; false
+     *         otherwise
+     */
+    boolean batchMemoryConstraintsReached(int newBitsMemory, int newOffsetsMemory, int newDataMemory) {
+      return parentInst.batchConstraintsReached(newBitsMemory, newOffsetsMemory, newDataMemory);
+    }
+
+    /** Informs the parent that the overflow data cannot be used anymore */
+    void deinitOverflowData() {
+      parentInst.deinitOverflowData();
+    }
   }
 
   /** A wrapper value reader with the ability to control when to read the next value */
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenEntryDictionaryReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenEntryDictionaryReader.java
index 0cd2416..8ba7ac4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenEntryDictionaryReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenEntryDictionaryReader.java
@@ -21,17 +21,19 @@ import java.nio.ByteBuffer;
 import org.apache.drill.exec.store.parquet.columnreaders.VarLenColumnBulkInput.ColumnPrecisionInfo;
 import org.apache.drill.exec.store.parquet.columnreaders.VarLenColumnBulkInput.DictionaryReaderWrapper;
 import org.apache.drill.exec.store.parquet.columnreaders.VarLenColumnBulkInput.PageDataInfo;
+import org.apache.drill.exec.store.parquet.columnreaders.VarLenColumnBulkInput.VarLenColumnBulkInputCallback;
 import org.apache.parquet.io.api.Binary;
 
 /** Handles variable data types using a dictionary */
-final class VarLenEntryDictionaryReader extends VarLenAbstractEntryReader {
+final class VarLenEntryDictionaryReader extends VarLenAbstractPageEntryReader {
 
   VarLenEntryDictionaryReader(ByteBuffer buffer,
     PageDataInfo pageInfo,
     ColumnPrecisionInfo columnPrecInfo,
-    VarLenColumnBulkEntry entry) {
+    VarLenColumnBulkEntry entry,
+    VarLenColumnBulkInputCallback containerCallback) {
 
-    super(buffer, pageInfo, columnPrecInfo, entry);
+    super(buffer, pageInfo, columnPrecInfo, entry, containerCallback);
   }
 
   /** {@inheritDoc} */
@@ -92,6 +94,12 @@ final class VarLenEntryDictionaryReader extends VarLenAbstractEntryReader {
     final Binary currEntry = valueReader.getEntry();
     final int dataLen = currEntry.length();
 
+    // Is there enough memory to handle this large value?
+    if (batchMemoryConstraintsReached(0, 4, dataLen)) {
+      entry.set(0, 0, 0, 0); // no data to be consumed
+      return entry;
+    }
+
     // Set the value length
     valueLengths[0] = dataLen;
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenEntryReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenEntryReader.java
index e570b3e..d95050d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenEntryReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenEntryReader.java
@@ -21,16 +21,18 @@ import java.nio.ByteBuffer;
 import org.apache.drill.common.exceptions.DrillRuntimeException;
 import org.apache.drill.exec.store.parquet.columnreaders.VarLenColumnBulkInput.ColumnPrecisionInfo;
 import org.apache.drill.exec.store.parquet.columnreaders.VarLenColumnBulkInput.PageDataInfo;
+import org.apache.drill.exec.store.parquet.columnreaders.VarLenColumnBulkInput.VarLenColumnBulkInputCallback;
 
 /** Handles variable data types. */
-final class VarLenEntryReader extends VarLenAbstractEntryReader {
+final class VarLenEntryReader extends VarLenAbstractPageEntryReader {
 
   VarLenEntryReader(ByteBuffer buffer,
     PageDataInfo pageInfo,
     ColumnPrecisionInfo columnPrecInfo,
-    VarLenColumnBulkEntry entry) {
+    VarLenColumnBulkEntry entry,
+    VarLenColumnBulkInputCallback containerCallback) {
 
-    super(buffer, pageInfo, columnPrecInfo, entry);
+    super(buffer, pageInfo, columnPrecInfo, entry, containerCallback);
   }
 
   /** {@inheritDoc} */
@@ -64,23 +66,23 @@ final class VarLenEntryReader extends VarLenAbstractEntryReader {
         break;
       }
 
-      final int data_len = getInt(srcBuff, srcPos);
+      final int dataLen = getInt(srcBuff, srcPos);
       srcPos += 4;
 
-      if (srcLen < (srcPos + data_len)
-       || tgtLen < (tgtPos + data_len)) {
+      if (srcLen < (srcPos + dataLen)
+       || tgtLen < (tgtPos + dataLen)) {
 
         break;
       }
 
-      valueLengths[numValues++] = data_len;
+      valueLengths[numValues++] = dataLen;
 
-      if (data_len > 0) {
-        vlCopy(srcBuff, srcPos, tgtBuff, tgtPos, data_len);
+      if (dataLen > 0) {
+        vlCopy(srcBuff, srcPos, tgtBuff, tgtPos, dataLen);
 
         // Update the counters
-        srcPos += data_len;
-        tgtPos += data_len;
+        srcPos += dataLen;
+        tgtPos += dataLen;
       }
     }
 
@@ -119,6 +121,12 @@ final class VarLenEntryReader extends VarLenAbstractEntryReader {
       throw new DrillRuntimeException(message);
     }
 
+    // Is there enough memory to handle this large value?
+    if (batchMemoryConstraintsReached(0, 4, dataLen)) {
+      entry.set(0, 0, 0, 0); // no data to be consumed
+      return entry;
+    }
+
     // Register the length
     valueLengths[0] = dataLen;
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenFixedEntryReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenFixedEntryReader.java
index a69488b..e8dc15f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenFixedEntryReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenFixedEntryReader.java
@@ -20,16 +20,18 @@ package org.apache.drill.exec.store.parquet.columnreaders;
 import java.nio.ByteBuffer;
 import org.apache.drill.exec.store.parquet.columnreaders.VarLenColumnBulkInput.ColumnPrecisionInfo;
 import org.apache.drill.exec.store.parquet.columnreaders.VarLenColumnBulkInput.PageDataInfo;
+import org.apache.drill.exec.store.parquet.columnreaders.VarLenColumnBulkInput.VarLenColumnBulkInputCallback;
 
 /** Handles fixed data types that have been erroneously tagged as Variable Length. */
-final class VarLenFixedEntryReader extends VarLenAbstractEntryReader {
+final class VarLenFixedEntryReader extends VarLenAbstractPageEntryReader {
 
   VarLenFixedEntryReader(ByteBuffer buffer,
     PageDataInfo pageInfo,
     ColumnPrecisionInfo columnPrecInfo,
-    VarLenColumnBulkEntry entry) {
+    VarLenColumnBulkEntry entry,
+    VarLenColumnBulkInputCallback containerCallback) {
 
-    super(buffer, pageInfo, columnPrecInfo, entry);
+    super(buffer, pageInfo, columnPrecInfo, entry, containerCallback);
   }
 
   /** {@inheritDoc} */
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenNullableDictionaryReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenNullableDictionaryReader.java
index c95a108..f7b6dce 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenNullableDictionaryReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenNullableDictionaryReader.java
@@ -21,17 +21,19 @@ import java.nio.ByteBuffer;
 import org.apache.drill.exec.store.parquet.columnreaders.VarLenColumnBulkInput.ColumnPrecisionInfo;
 import org.apache.drill.exec.store.parquet.columnreaders.VarLenColumnBulkInput.DictionaryReaderWrapper;
 import org.apache.drill.exec.store.parquet.columnreaders.VarLenColumnBulkInput.PageDataInfo;
+import org.apache.drill.exec.store.parquet.columnreaders.VarLenColumnBulkInput.VarLenColumnBulkInputCallback;
 import org.apache.parquet.io.api.Binary;
 
 /** Handles nullable variable data types using a dictionary */
-final class VarLenNullableDictionaryReader extends VarLenAbstractEntryReader {
+final class VarLenNullableDictionaryReader extends VarLenAbstractPageEntryReader {
 
   VarLenNullableDictionaryReader(ByteBuffer buffer,
     PageDataInfo pageInfo,
     ColumnPrecisionInfo columnPrecInfo,
-    VarLenColumnBulkEntry entry) {
+    VarLenColumnBulkEntry entry,
+    VarLenColumnBulkInputCallback containerCallback) {
 
-    super(buffer, pageInfo, columnPrecInfo, entry);
+    super(buffer, pageInfo, columnPrecInfo, entry, containerCallback);
   }
 
   /** {@inheritDoc} */
@@ -102,15 +104,21 @@ final class VarLenNullableDictionaryReader extends VarLenAbstractEntryReader {
 
   private final VarLenColumnBulkEntry getEntrySingle(int valsToReadWithinPage) {
     final int[] valueLengths = entry.getValuesLength();
+    final DictionaryReaderWrapper valueReader = pageInfo.dictionaryValueReader;
 
     // Initialize the reader if needed
     pageInfo.definitionLevels.readFirstIntegerIfNeeded();
 
     if (pageInfo.definitionLevels.readCurrInteger() == 1) {
-      final DictionaryReaderWrapper valueReader = pageInfo.dictionaryValueReader;
       final Binary currEntry = valueReader.getEntry();
       final int dataLen = currEntry.length();
 
+      // Is there enough memory to handle this large value?
+      if (batchMemoryConstraintsReached(1, 4, dataLen)) {
+        entry.set(0, 0, 0, 0); // no data to be consumed
+        return entry;
+      }
+
       // Set the value length
       valueLengths[0] = dataLen;
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenNullableEntryReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenNullableEntryReader.java
index 7a172bb..7ffb27a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenNullableEntryReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenNullableEntryReader.java
@@ -21,16 +21,18 @@ import java.nio.ByteBuffer;
 import org.apache.drill.common.exceptions.DrillRuntimeException;
 import org.apache.drill.exec.store.parquet.columnreaders.VarLenColumnBulkInput.ColumnPrecisionInfo;
 import org.apache.drill.exec.store.parquet.columnreaders.VarLenColumnBulkInput.PageDataInfo;
+import org.apache.drill.exec.store.parquet.columnreaders.VarLenColumnBulkInput.VarLenColumnBulkInputCallback;
 
 /** Handles variable data types. */
-final class VarLenNullableEntryReader extends VarLenAbstractEntryReader {
+final class VarLenNullableEntryReader extends VarLenAbstractPageEntryReader {
 
   VarLenNullableEntryReader(ByteBuffer buffer,
       PageDataInfo pageInfo,
       ColumnPrecisionInfo columnPrecInfo,
-      VarLenColumnBulkEntry entry) {
+      VarLenColumnBulkEntry entry,
+      VarLenColumnBulkInputCallback containerCallback) {
 
-    super(buffer, pageInfo, columnPrecInfo, entry);
+    super(buffer, pageInfo, columnPrecInfo, entry, containerCallback);
   }
 
   /** {@inheritDoc} */
@@ -142,6 +144,12 @@ final class VarLenNullableEntryReader extends VarLenAbstractEntryReader {
         throw new DrillRuntimeException(message);
       }
 
+      // Is there enough memory to handle this large value?
+      if (batchMemoryConstraintsReached(1, 4, dataLen)) {
+        entry.set(0, 0, 0, 0); // no data to be consumed
+        return entry;
+      }
+
       // Register the length
       valueLengths[0] = dataLen;
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenNullableFixedEntryReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenNullableFixedEntryReader.java
index c776934..98089fd 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenNullableFixedEntryReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenNullableFixedEntryReader.java
@@ -20,17 +20,19 @@ package org.apache.drill.exec.store.parquet.columnreaders;
 import java.nio.ByteBuffer;
 import org.apache.drill.exec.store.parquet.columnreaders.VarLenColumnBulkInput.ColumnPrecisionInfo;
 import org.apache.drill.exec.store.parquet.columnreaders.VarLenColumnBulkInput.PageDataInfo;
+import org.apache.drill.exec.store.parquet.columnreaders.VarLenColumnBulkInput.VarLenColumnBulkInputCallback;
 import org.apache.parquet.column.values.ValuesReader;
 
 /** Handles nullable fixed data types that have been erroneously tagged as Variable Length. */
-final class VarLenNullableFixedEntryReader extends VarLenAbstractEntryReader {
+final class VarLenNullableFixedEntryReader extends VarLenAbstractPageEntryReader {
 
   VarLenNullableFixedEntryReader(ByteBuffer buffer,
     PageDataInfo pageInfo,
     ColumnPrecisionInfo columnPrecInfo,
-    VarLenColumnBulkEntry entry) {
+    VarLenColumnBulkEntry entry,
+    VarLenColumnBulkInputCallback containerCallback) {
 
-    super(buffer, pageInfo, columnPrecInfo, entry);
+    super(buffer, pageInfo, columnPrecInfo, entry, containerCallback);
   }
 
   /** {@inheritDoc} */
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenOverflowReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenOverflowReader.java
new file mode 100644
index 0000000..cacd5c8
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenOverflowReader.java
@@ -0,0 +1,372 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.parquet.columnreaders;
+
+import java.nio.ByteBuffer;
+
+import org.apache.drill.exec.store.parquet.columnreaders.VarLenColumnBulkInput.VarLenColumnBulkInputCallback;
+import org.apache.drill.exec.store.parquet.columnreaders.batchsizing.BatchSizingMemoryUtil;
+import org.apache.drill.exec.store.parquet.columnreaders.batchsizing.RecordBatchOverflow.FieldOverflowDefinition;
+import org.apache.drill.exec.store.parquet.columnreaders.batchsizing.RecordBatchSizerManager.FieldOverflowState;
+import org.apache.drill.exec.store.parquet.columnreaders.batchsizing.RecordBatchSizerManager.FieldOverflowStateContainer;
+
+/**
+ * This class is responsible for processing serialized overflow data (generated in a previous batch); this way
+ * overflow data becomes an input source and is thus a) efficiently re-loaded into the current
+ * batch ValueVector and b) subjected to the same batching constraints rules.
+ */
+public final class VarLenOverflowReader extends VarLenAbstractEntryReader {
+  private final FieldOverflowStateContainer fieldOverflowContainer;
+  private final boolean isNullable;
+  private final FieldOverflowStateImpl overflowState;
+
+  /**
+   * CTOR.
+   * @param buffer byte buffer for data buffering (within CPU cache)
+   * @param entry reusable bulk entry object
+   * @param containerCallback callback forwarded to the parent entry reader
+   * @param fieldOverflowContainer holder of this field's overflow definition and (lazily created) overflow state
+   */
+  VarLenOverflowReader(ByteBuffer buffer,
+    VarLenColumnBulkEntry entry,
+    VarLenColumnBulkInputCallback containerCallback,
+    FieldOverflowStateContainer fieldOverflowContainer) {
+
+    super(buffer, entry, containerCallback);
+
+    this.fieldOverflowContainer = fieldOverflowContainer;
+    this.isNullable = fieldOverflowContainer.overflowDef.field.isNullable();
+
+    // Initialize the overflow state object
+    initOverflowStateIfNeeded();
+
+    // By now the overflow state object should be initialized
+    overflowState = (FieldOverflowStateImpl) fieldOverflowContainer.overflowState;
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  VarLenColumnBulkEntry getEntry(int valuesToRead) {
+    // Nothing left to hand out: every overflow value was either committed or already returned
+    if (getRemainingOverflowData() == 0) {
+      return null; // overflow data fully committed
+    }
+
+    final int[] valueLengths = entry.getValuesLength();
+    final FieldOverflowDefinition overflowDef = fieldOverflowContainer.overflowDef;
+    final OverflowDataCache overflowDataCache = overflowState.overflowDataCache;
+    final int maxDataSize = VarLenBulkPageReader.BUFF_SZ;
+
+    // Flush the cache across batches (no value returned yet beyond the committed ones)
+    if (overflowState.currValueIdx == overflowState.numCommittedValues) {
+      overflowDataCache.flush();
+    }
+
+    // Load some overflow data for processing (bounded by entry.getMaxEntries() and the requested count)
+    final int maxValues = Math.min(entry.getMaxEntries(), valuesToRead);
+    final int numAvailableValues = overflowDataCache.load(overflowState.currValueIdx, maxValues);
+    final int firstValueDataOffset = getDataBufferStartOffset() + adjustDataOffset(overflowState.currValueIdx);
+    int totalDataLen = 0;
+    int currValueIdx = overflowState.currValueIdx;
+    int idx = 0;
+    int numNulls = 0;
+
+    for ( ; idx < numAvailableValues; idx++, currValueIdx++) {
+      // Is this value defined?
+      if (!isNullable || overflowDataCache.getNullable(currValueIdx) == 1) {
+        final int dataLen = overflowDataCache.getDataLength(currValueIdx);
+        // Stop as soon as the accumulated data would exceed the data budget
+        if ((totalDataLen + dataLen) > maxDataSize) {
+          break;
+        }
+
+        totalDataLen += dataLen;
+        valueLengths[idx] = dataLen;
+
+      } else {
+        valueLengths[idx] = -1; // undefined (null) value: recorded with a -1 length
+        ++numNulls;
+      }
+    }
+
+    // Nothing fitted: the first value alone exceeds the budget (or no data was loaded); need special handling
+    if (idx == 0) {
+      final int dataLen = overflowDataCache.getDataLength(currValueIdx);
+      return handleLargeEntry(maxDataSize, firstValueDataOffset, dataLen);
+    }
+
+    // Update the next overflow value index to be processed
+    overflowState.currValueIdx = currValueIdx;
+
+    // Now set the bulk entry (start offset, data length, values read, values read minus nulls, overflow buffer)
+    entry.set(firstValueDataOffset, totalDataLen, idx, idx - numNulls, overflowDef.buffer);
+
+    return entry;
+  }
+
+  /** @return remaining overflow data (total-overflow-data - (committed + returned-within-current-batch)) */
+  int getRemainingOverflowData() {
+    // The bookkeeping lives within the overflow state object
+    final int remaining = overflowState.getRemainingOverflowData();
+    return remaining;
+  }
+
+  /** Builds a single-value bulk entry, or an empty one when the batch memory constraints are reached. */
+  private VarLenColumnBulkEntry handleLargeEntry(int maxDataSize, int firstValueDataOffset, int totalDataLen) {
+    final FieldOverflowStateImpl state = (FieldOverflowStateImpl) fieldOverflowContainer.overflowState;
+    final int nullableArg = isNullable ? 1 : 0;
+
+    // Is there enough memory to handle this large value?
+    final boolean limitReached = batchMemoryConstraintsReached(nullableArg, 4, totalDataLen);
+    if (limitReached) {
+      entry.set(0, 0, 0, 0); // no data to be consumed
+      return entry;
+    }
+
+    // Register the length of the one and only value
+    entry.getValuesLength()[0] = totalDataLen;
+
+    // We already have all the information we need
+    entry.set(firstValueDataOffset, totalDataLen, 1, 1, fieldOverflowContainer.overflowDef.buffer);
+
+    // Update the current value index
+    state.currValueIdx++;
+    return entry;
+  }
+
+  /** Creates the overflow state the first time this field's overflow data is about to be consumed. */
+  void initOverflowStateIfNeeded() {
+    if (fieldOverflowContainer.overflowState != null) {
+      return; // already created (several batches may be needed to consume the overflow data)
+    }
+    // An overflow happened in the previous batch; this is the first time we are trying to consume it
+    fieldOverflowContainer.overflowState = new FieldOverflowStateImpl(buffer, fieldOverflowContainer.overflowDef);
+  }
+
+  /**
+   * The overflow definition stores offsets without adjustment (the offsets still refer to the original
+   * buffer); a minor transformation yields the correct offset within the new buffer:
+   * adjusted-offset(value-idx) = offset(value-idx) - offset(value-0)
+   *
+   * @param valueIdx overflow value index
+   * @return offset of the value's data relative to the data of the first overflow value
+   */
+  private int adjustDataOffset(int valueIdx) {
+    final FieldOverflowDefinition overflowDef = fieldOverflowContainer.overflowDef;
+
+    // Nullable fields store one byte per value ahead of the offsets; skip them to reach offset(value-0)
+    final int offsetsStartIdx = isNullable ? overflowDef.numValues : 0;
+    final int firstOffset = overflowDef.buffer.getInt(offsetsStartIdx);
+    final int targetOffset = overflowDef.buffer.getInt(offsetsStartIdx + valueIdx * 4);
+
+    return targetOffset - firstOffset;
+  }
+
+  /** @return index within the overflow buffer at which the values' data starts */
+  private int getDataBufferStartOffset() {
+    final int numValues = fieldOverflowContainer.overflowDef.numValues;
+    // Non-nullable layout: <num-values+1 offsets><data>
+    final int offsetsSize = (numValues + 1) * 4;
+    // Nullable layout: <num-values nullable bytes><num-values+1 offsets><data>
+    final int nullableSize = isNullable ? numValues : 0;
+
+    return nullableSize + offsetsSize;
+  }
+
+// ----------------------------------------------------------------------------
+// Inner Data Structure
+// ----------------------------------------------------------------------------
+
+  /** Per-field bookkeeping which lets the overflow reader resume overflow data consumption across batches */
+  final static class FieldOverflowStateImpl implements FieldOverflowState {
+    /** Overflow values consumed by previous batches; on a new overflow, uncommitted values are to be returned */
+    private int numCommittedValues;
+    /** Index of the next overflow value to be processed */
+    private int currValueIdx;
+    /** A heap cache to accelerate loading of overflow data into bulk entries */
+    private final OverflowDataCache overflowDataCache;
+
+    private FieldOverflowStateImpl(ByteBuffer buffer, FieldOverflowDefinition overflowDef) {
+      this.overflowDataCache = new OverflowDataCache(buffer, overflowDef);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void onNewBatchValuesConsumed(int numValues) {
+      if (numCommittedValues >= totalValues()) {
+        return; // every overflow value was already committed
+      }
+      numCommittedValues += numValues;
+      currValueIdx = numCommittedValues; // processing resumes right after the committed values
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public boolean isOverflowDataFullyConsumed() {
+      return totalValues() == numCommittedValues;
+    }
+
+    /** @return remaining overflow data (total-overflow-data - (committed + returned-within-current-batch)) */
+    int getRemainingOverflowData() {
+      final int total = totalValues();
+      assert currValueIdx <= total;
+      return total - currValueIdx;
+    }
+
+    /** @return total number of values within the overflow definition */
+    private int totalValues() {
+      return overflowDataCache.overflowDef.numValues;
+    }
+  }
+
+  /** Enable us to reuse cached overflow data (nullable bytes and offsets) across calls */
+  final static class OverflowDataCache {
+    /** Cache format: [<nullvalue>*][<offset>*]; the last "-1" is to account for the extra offset to retrieve
+     * since each value requires two offsets  */
+    private static final int MAX_NUM_VALUES = (VarLenBulkPageReader.BUFF_SZ / (BatchSizingMemoryUtil.INT_VALUE_WIDTH + 1)) - 1;
+
+    /** Byte buffer for CPU caching */
+    private final byte[] bufferArray;
+    /** Overflow definition */
+    private final FieldOverflowDefinition overflowDef;
+    /** Whether this column is optional */
+    final boolean isNullable;
+    /** start index of the first cached overflow data (-1 when the cache is empty) */
+    private int firstCachedValueIdx;
+    /** number of cached values (-1 when the cache is empty) */
+    private int numCachedValues;
+
+    private OverflowDataCache(ByteBuffer buffer, FieldOverflowDefinition overflowDef) {
+      this.bufferArray = buffer.array();
+      this.overflowDef = overflowDef;
+      this.isNullable = this.overflowDef.field.isNullable();
+      this.firstCachedValueIdx = -1;
+      this.numCachedValues = -1;
+    }
+
+    private void flush() {
+      this.firstCachedValueIdx = -1; // marks the cache as empty; the next load() is a miss
+      this.numCachedValues = -1;
+    }
+
+    /**
+     * @return number of cached values available from {@code currentValueIdx} (the cache is reloaded on a miss)
+     * @throws IllegalStateException if {@code currentValueIdx} is past the last overflow value
+     */
+    private int load(int currentValueIdx, int maxValues) {
+      if (currentValueIdx >= overflowDef.numValues) {
+        throw new IllegalStateException("Overflow value index [" + currentValueIdx
+          + "] is out of range; number of overflow values is [" + overflowDef.numValues + "]");
+      }
+
+      if (numCachedValues > 0 && currentValueIdx >= lowerBound() && currentValueIdx <= upperBound()) {
+        return upperBound() - currentValueIdx + 1; // NOTE(review): not capped by maxValues - confirm intended
+      }
+
+      // Either cache is empty or the requested values are not in the cache
+      loadInternal(currentValueIdx, maxValues);
+      return numCachedValues;
+    }
+
+    /**
+     * @return the lowest overflow value index in the cache
+     */
+    private int lowerBound() {
+      return firstCachedValueIdx;
+    }
+
+    /**
+     * @return the highest overflow value index in the cache
+     */
+    private int upperBound() {
+      return firstCachedValueIdx + numCachedValues - 1;
+    }
+
+    private byte getNullable(int valueIdx) {
+      assert isNullable;
+      assert valueIdx >= lowerBound();
+      assert valueIdx <= upperBound();
+
+      // We need to map the overflow value index to the buffer array representation
+      final int cacheIdx = (valueIdx - lowerBound()) * BatchSizingMemoryUtil.BYTE_VALUE_WIDTH;
+      return bufferArray[cacheIdx];
+    }
+
+    private int getDataLength(int valueIdx) {
+      assert valueIdx >= lowerBound();
+      assert valueIdx <= upperBound();
+
+      // We need to map the overflow value index to the buffer array representation
+      int cacheIdx1, cacheIdx2;
+
+      if (!isNullable) {
+        cacheIdx1 = (valueIdx - lowerBound()) * BatchSizingMemoryUtil.INT_VALUE_WIDTH;
+      } else {
+        cacheIdx1 = numCachedValues + // the cached offsets come after the cached nullable bytes
+          (valueIdx - lowerBound()) * BatchSizingMemoryUtil.INT_VALUE_WIDTH;
+      }
+      cacheIdx2 = cacheIdx1 + BatchSizingMemoryUtil.INT_VALUE_WIDTH;
+      // length(value-i) = offset(value-i+1) - offset(value-i)
+      return VarLenOverflowReader.getInt(bufferArray, cacheIdx2) - VarLenOverflowReader.getInt(bufferArray, cacheIdx1);
+    }
+
+    private void loadInternal(int targetIdx, int maxValues) {
+      // We need to load a new batch of overflow data (bits and offsets)
+      firstCachedValueIdx = targetIdx;
+
+      final int remaining = remaining();
+      assert remaining > 0;
+
+      final int maxValuesToLoad = Math.min(MAX_NUM_VALUES, maxValues);
+      numCachedValues           = Math.min(remaining, maxValuesToLoad);
+
+      // Let us load the nullable & offsets data (the actual data doesn't have to be loaded)
+      loadNullable();
+      loadOffsets();
+    }
+
+    void loadNullable() {
+      if (!isNullable) {
+        return; // NOOP
+      }
+      // The nullable bytes go to the start of the cache array
+      overflowDef.buffer.getBytes(firstCachedValueIdx, bufferArray, 0, numCachedValues);
+    }
+
+    void loadOffsets() {
+      int sourceIdx, targetIdx;
+
+      if (!isNullable) {
+        sourceIdx = firstCachedValueIdx * BatchSizingMemoryUtil.INT_VALUE_WIDTH;
+        targetIdx = 0;
+
+      } else {
+        sourceIdx = overflowDef.numValues + (firstCachedValueIdx * BatchSizingMemoryUtil.INT_VALUE_WIDTH);
+        targetIdx = numCachedValues;
+      }
+
+      // We always get one extra value as to compute the length of value-i we need offset-i and offset-i+1
+      overflowDef.buffer.getBytes(sourceIdx, bufferArray, targetIdx, (numCachedValues + 1) * BatchSizingMemoryUtil.INT_VALUE_WIDTH);
+    }
+
+    private int remaining() {
+      return overflowDef.numValues - firstCachedValueIdx;
+    }
+  }
+
+}
\ No newline at end of file
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLengthColumn.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLengthColumn.java
index 16ccb85..c5e5bfd 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLengthColumn.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLengthColumn.java
@@ -32,10 +32,10 @@ public abstract class VarLengthColumn<V extends ValueVector> extends ColumnReade
 
   Binary currDictVal;
 
-  VarLengthColumn(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,
+  VarLengthColumn(ParquetRecordReader parentReader, ColumnDescriptor descriptor,
                   ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, V v,
                   SchemaElement schemaElement) throws ExecutionSetupException {
-    super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
+    super(parentReader, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
       if (columnChunkMetaData.getEncodings().contains(Encoding.PLAIN_DICTIONARY)) {
         usingDictionary = true;
       }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLengthColumnReaders.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLengthColumnReaders.java
index 17453fd..f4b9fe6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLengthColumnReaders.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLengthColumnReaders.java
@@ -41,10 +41,10 @@ public final class VarLengthColumnReaders {
     protected VarDecimalVector varDecimalVector;
     protected VarDecimalVector.Mutator mutator;
 
-    VarDecimalColumn(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,
+    VarDecimalColumn(ParquetRecordReader parentReader, ColumnDescriptor descriptor,
                     ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, VarDecimalVector v,
                     SchemaElement schemaElement) throws ExecutionSetupException {
-      super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
+      super(parentReader, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
       this.varDecimalVector = v;
       this.mutator = v.getMutator();
     }
@@ -87,10 +87,10 @@ public final class VarLengthColumnReaders {
     protected NullableVarDecimalVector nullableVarDecimalVector;
     protected NullableVarDecimalVector.Mutator mutator;
 
-    NullableVarDecimalColumn(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,
+    NullableVarDecimalColumn(ParquetRecordReader parentReader, ColumnDescriptor descriptor,
                             ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, NullableVarDecimalVector v,
                             SchemaElement schemaElement) throws ExecutionSetupException {
-      super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
+      super(parentReader, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
       nullableVarDecimalVector = v;
       this.mutator = v.getMutator();
     }
@@ -133,10 +133,10 @@ public final class VarLengthColumnReaders {
     private final VarCharVector.Mutator mutator;
     private final VarCharVector varCharVector;
 
-    VarCharColumn(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,
+    VarCharColumn(ParquetRecordReader parentReader, ColumnDescriptor descriptor,
                   ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, VarCharVector v,
                   SchemaElement schemaElement) throws ExecutionSetupException {
-      super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
+      super(parentReader, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
       this.varCharVector = v;
       this.mutator       = v.getMutator();
     }
@@ -181,10 +181,10 @@ public final class VarLengthColumnReaders {
     protected final NullableVarCharVector.Mutator mutator;
     private final NullableVarCharVector vector;
 
-    NullableVarCharColumn(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,
+    NullableVarCharColumn(ParquetRecordReader parentReader, ColumnDescriptor descriptor,
                           ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, NullableVarCharVector v,
                           SchemaElement schemaElement) throws ExecutionSetupException {
-      super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
+      super(parentReader, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
       this.vector  = v;
       this.mutator = vector.getMutator();
     }
@@ -228,10 +228,10 @@ public final class VarLengthColumnReaders {
     private final VarBinaryVector varBinaryVector;
     private final VarBinaryVector.Mutator mutator;
 
-    VarBinaryColumn(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,
+    VarBinaryColumn(ParquetRecordReader parentReader, ColumnDescriptor descriptor,
                     ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, VarBinaryVector v,
                     SchemaElement schemaElement) throws ExecutionSetupException {
-      super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
+      super(parentReader, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
 
       this.varBinaryVector = v;
       this.mutator         = v.getMutator();
@@ -277,10 +277,10 @@ public final class VarLengthColumnReaders {
     private final NullableVarBinaryVector nullableVarBinaryVector;
     private final NullableVarBinaryVector.Mutator mutator;
 
-    NullableVarBinaryColumn(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,
+    NullableVarBinaryColumn(ParquetRecordReader parentReader, ColumnDescriptor descriptor,
                             ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, NullableVarBinaryVector v,
                             SchemaElement schemaElement) throws ExecutionSetupException {
-      super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
+      super(parentReader, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
       this.nullableVarBinaryVector = v;
       this.mutator                 = v.getMutator();
     }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLengthValuesColumn.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLengthValuesColumn.java
index 4dc59af..5212ed5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLengthValuesColumn.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLengthValuesColumn.java
@@ -44,11 +44,11 @@ public abstract class VarLengthValuesColumn<V extends ValueVector> extends VarLe
   /** Bulk read operation state that needs to be maintained across batch calls */
   protected final BulkReaderState bulkReaderState = new BulkReaderState();
 
-  VarLengthValuesColumn(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,
+  VarLengthValuesColumn(ParquetRecordReader parentReader, ColumnDescriptor descriptor,
                         ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, V v,
                         SchemaElement schemaElement) throws ExecutionSetupException {
 
-    super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
+    super(parentReader, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
     variableWidthVector = (VariableWidthVector) valueVec;
 
     if (columnChunkMetaData.getEncodings().contains(Encoding.PLAIN_DICTIONARY)) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/batchsizing/BatchOverflowOptimizer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/batchsizing/BatchOverflowOptimizer.java
new file mode 100644
index 0000000..ca8fc05
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/batchsizing/BatchOverflowOptimizer.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.parquet.columnreaders.batchsizing;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.drill.common.map.CaseInsensitiveMap;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.store.parquet.columnreaders.ParquetColumnMetadata;
+import org.apache.drill.exec.store.parquet.columnreaders.batchsizing.BatchSizingMemoryUtil.VectorMemoryUsageInfo;
+import org.apache.drill.exec.store.parquet.columnreaders.batchsizing.RecordBatchSizerManager.ColumnMemoryInfo;
+import org.apache.drill.exec.store.parquet.columnreaders.batchsizing.RecordBatchSizerManager.VarLenColumnBatchStats;
+
+/**
+ * A class which uses aggregate statistics to minimize overflow (in terms
+ * of size and occurrence).
+ */
+final class BatchOverflowOptimizer {
+  /** A threshold to trigger column memory redistribution based on latest statistics */
+  private static final int MAX_NUM_OVERFLOWS = 3;
+
+  /** Field to column memory information map */
+  private final Map<String, ColumnMemoryInfo> columnMemoryInfoMap;
+  /** Stats for each VL column */
+  private final Map<String, ColumnPrecisionStats> columnStatsMap = CaseInsensitiveMap.newHashMap();
+  /** Number of batches */
+  private int numBatches;
+  /** Number of overflows */
+  private int numOverflows;
+  /** An indicator to track whether a column precision has changed */
+  boolean columnPrecisionChanged;
+
+  BatchOverflowOptimizer(Map<String, ColumnMemoryInfo> columnMemoryInfoMap) {
+    this.columnMemoryInfoMap = columnMemoryInfoMap; // the caller's map is stored as is (not copied)
+  }
+
+  void setup() {
+    // Only variable length columns get a precision stats entry; fixed length ones are skipped
+    for (ColumnMemoryInfo memoryInfo : columnMemoryInfoMap.values()) {
+      final ParquetColumnMetadata metadata = memoryInfo.columnMeta;
+      if (metadata.isFixedLength()) {
+        continue;
+      }
+      // The stats are keyed by the column's field name
+      final MaterializedField field = metadata.getField();
+      columnStatsMap.put(field.getName(), new ColumnPrecisionStats(field));
+    }
+  }
+
+  boolean onEndOfBatch(int batchNumRecords, List<VarLenColumnBatchStats> batchStats) {
+    // Returns true when the column precisions were refreshed and their memory quotas reset
+    if (batchNumRecords == 0) {
+      return false; // NOOP
+    }
+
+    ++numBatches; // increment the number of batches
+
+    // Indicator that an overflow happened
+    boolean overflow = false;
+
+    // Reusable container to compute a VL column's amount of data within
+    // this batch.
+    final VectorMemoryUsageInfo vectorMemoryUsage = new VectorMemoryUsageInfo();
+
+    for (VarLenColumnBatchStats stat : batchStats) {
+      final String columnName                   = stat.vector.getField().getName();
+      ColumnPrecisionStats columnPrecisionStats = columnStatsMap.get(columnName);
+      assert columnPrecisionStats != null;
+
+      if (stat.numValuesRead > batchNumRecords) { // more values read than records in this batch
+        overflow = true;
+      }
+
+      // Compute this column data usage in the last batch (null values are not accounted for, we
+      // are interested in the actual data stored within a batch). The divisor is clamped to one
+      // so that a column reporting zero values read cannot raise an ArithmeticException.
+      BatchSizingMemoryUtil.getMemoryUsage(stat.vector, stat.numValuesRead, vectorMemoryUsage);
+      final int batchColumnPrecision = Math.max(1, vectorMemoryUsage.dataBytesUsed / Math.max(1, stat.numValuesRead));
+      // Running average over all the batches seen so far; it is only stored when it grows
+      double currAvgPrecision = columnPrecisionStats.avgPrecision;
+      double newAvgPrecision  = ((numBatches - 1) * currAvgPrecision + batchColumnPrecision) / numBatches;
+
+      if (newAvgPrecision > currAvgPrecision) {
+        columnPrecisionStats.avgPrecision = (int) Math.ceil(newAvgPrecision);
+        columnPrecisionChanged            = true;
+      }
+    }
+
+    if (overflow) {
+      ++numOverflows;
+    }
+
+    if (numBatches == 1 // In the first batch, we only used defaults; we need to update
+     || (columnPrecisionChanged && numOverflows >= MAX_NUM_OVERFLOWS) // better stats and overflow occurred
+    ) {
+
+      for (ColumnPrecisionStats columnPrecisionStats : columnStatsMap.values()) {
+        ColumnMemoryInfo columnInfo = columnMemoryInfoMap.get(columnPrecisionStats.field.getName());
+        assert columnInfo != null;
+
+        // Publish the latest average precision and reset the column's memory quota
+        columnInfo.columnPrecision   = columnPrecisionStats.avgPrecision;
+        columnInfo.columnMemoryQuota.reset();
+      }
+
+      // Reset some tracking counters
+      numOverflows           = 0;
+      columnPrecisionChanged = false;
+
+      return true;
+    }
+
+    return false; // NOOP
+  }
+
+// ----------------------------------------------------------------------------
+// Inner Classes
+// ----------------------------------------------------------------------------
+
+  /** Container class which holds the running average precision of one variable length column */
+  private static final class ColumnPrecisionStats {
+    /** Materialized field */
+    private final MaterializedField field;
+    /** Average column precision; zero until onEndOfBatch() stores a first (rounded up) average */
+    private int avgPrecision;
+
+    private ColumnPrecisionStats(MaterializedField field) {
+      this.field = field;
+    }
+  }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/batchsizing/BatchSizingMemoryUtil.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/batchsizing/BatchSizingMemoryUtil.java
new file mode 100644
index 0000000..302d0eb
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/batchsizing/BatchSizingMemoryUtil.java
@@ -0,0 +1,329 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.parquet.columnreaders.batchsizing;
+
+import org.apache.drill.common.types.TypeProtos.MajorType;
+import org.apache.drill.exec.expr.TypeHelper;
+import org.apache.drill.exec.memory.BaseAllocator;
+import org.apache.drill.exec.store.parquet.columnreaders.ParquetColumnMetadata;
+import org.apache.drill.exec.store.parquet.columnreaders.batchsizing.RecordBatchSizerManager.ColumnMemoryQuota;
+import org.apache.drill.exec.vector.NullableVarBinaryVector;
+import org.apache.drill.exec.vector.NullableVarCharVector;
+import org.apache.drill.exec.vector.NullableVarDecimalVector;
+import org.apache.drill.exec.vector.UInt1Vector;
+import org.apache.drill.exec.vector.UInt4Vector;
+import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.VarBinaryVector;
+import org.apache.drill.exec.vector.VarCharVector;
+import org.apache.drill.exec.vector.VarDecimalVector;
+import org.apache.drill.exec.vector.VariableWidthVector;
+
+/** Helper class which assists the flat Parquet reader in building batches that adhere to memory sizing constraints */
+public final class BatchSizingMemoryUtil {
+
+  /** BYTE in-memory width */
+  public static final int BYTE_VALUE_WIDTH = UInt1Vector.VALUE_WIDTH;
+  /** INT in-memory width */
+  public static final int INT_VALUE_WIDTH  = UInt4Vector.VALUE_WIDTH;
+  /** Default variable length column average precision;
+   * computed in such a way that 64k values will fit within one MB to minimize internal fragmentation
+   */
+  public static final int DEFAULT_VL_COLUMN_AVG_PRECISION = 16;
+
+  /**
+   * Checks whether a new payload fits within a column's memory quota; as a side effect, this method
+   * refreshes the column's current memory usage information (with regard to the value vectors).
+   *
+   * @param columnMemoryUsage container which contains column's memory usage information (usage information will
+   *        be automatically updated by this method)
+   * @param newBitsMemory New nullable data which might be inserted when processing a new input chunk
+   * @param newOffsetsMemory New offsets data which might be inserted when processing a new input chunk
+   * @param newDataMemory New data which might be inserted when processing a new input chunk
+   *
+   * @return true if adding the new data will not lead this column's Value Vector go beyond the allowed
+   *         limit; false otherwise
+   */
+  public static boolean canAddNewData(ColumnMemoryUsageInfo columnMemoryUsage,
+    int newBitsMemory,
+    int newOffsetsMemory,
+    int newDataMemory) {
+
+    // First we need to update the vector memory usage
+    final VectorMemoryUsageInfo vectorMemoryUsage = columnMemoryUsage.vectorMemoryUsage;
+    getMemoryUsage(columnMemoryUsage.vector, columnMemoryUsage.currValueCount, vectorMemoryUsage);
+
+    // We need to compute the new ValueVector memory usage if we attempt to add the new payload
+    // (each buffer is sized as max(current capacity, next-power-of-two(used + new payload)))
+    int totalBitsMemory = computeNewVectorCapacity(vectorMemoryUsage.bitsBytesUsed,
+      newBitsMemory,
+      vectorMemoryUsage.bitsBytesCapacity);
+
+    int totalOffsetsMemory = computeNewVectorCapacity(vectorMemoryUsage.offsetsBytesUsed,
+      newOffsetsMemory,
+      vectorMemoryUsage.offsetsByteCapacity);
+
+    int totalDataMemory = computeNewVectorCapacity(vectorMemoryUsage.dataBytesUsed,
+      newDataMemory,
+      vectorMemoryUsage.dataByteCapacity);
+
+    // Now figure out whether the new payload takes us over the maximum memory threshold; the sum is
+    // widened to long because three int capacities can exceed Integer.MAX_VALUE and wrap negative
+    long totalMemory = (long) totalBitsMemory + totalOffsetsMemory + totalDataMemory;
+    assert totalMemory >= 0;
+    return totalMemory <= columnMemoryUsage.memoryQuota.getMaxMemoryUsage();
+  }
+
+  /**
+   * Load memory usage information for a variable length value vector
+   *
+   * @param sourceVector source value vector (VARCHAR, VARBINARY or VARDECIMAL; REQUIRED or OPTIONAL mode)
+   * @param currValueCount current value count
+   * @param vectorMemoryUsage result object which contains source vector memory usage information
+   */
+  public static void getMemoryUsage(ValueVector sourceVector,
+    int currValueCount,
+    VectorMemoryUsageInfo vectorMemoryUsage) {
+
+    assert sourceVector instanceof VariableWidthVector;
+
+    vectorMemoryUsage.reset(); // reset result container
+
+    final MajorType type = sourceVector.getField().getType();
+
+    switch (type.getMinorType()) {
+    case VARCHAR: {
+      switch (type.getMode()) {
+        case REQUIRED: { // no "bits" information is loaded here; those fields keep the zero set by reset()
+          VarCharVector vector = (VarCharVector) sourceVector;
+          vectorMemoryUsage.offsetsByteCapacity = vector.getOffsetVector().getValueCapacity() * INT_VALUE_WIDTH;
+          vectorMemoryUsage.dataByteCapacity = vector.getByteCapacity();
+          vectorMemoryUsage.offsetsBytesUsed = vector.getOffsetVector().getPayloadByteCount(currValueCount);
+          vectorMemoryUsage.dataBytesUsed = vector.getPayloadByteCount(currValueCount) - vectorMemoryUsage.offsetsBytesUsed;
+          break;
+        }
+        case OPTIONAL: {
+          NullableVarCharVector vector = (NullableVarCharVector) sourceVector;
+          VarCharVector values = vector.getValuesVector();
+          vectorMemoryUsage.bitsBytesCapacity = vector.getBitsValueCapacity(); // NOTE(review): a value capacity stored as bytes; presumably one byte per value - confirm
+          vectorMemoryUsage.offsetsByteCapacity = values.getOffsetVector().getValueCapacity() * INT_VALUE_WIDTH;
+          vectorMemoryUsage.dataByteCapacity = values.getByteCapacity();
+          vectorMemoryUsage.bitsBytesUsed = currValueCount * BYTE_VALUE_WIDTH;
+          vectorMemoryUsage.offsetsBytesUsed = values.getOffsetVector().getPayloadByteCount(currValueCount);
+          vectorMemoryUsage.dataBytesUsed = values.getPayloadByteCount(currValueCount) - vectorMemoryUsage.offsetsBytesUsed;
+          break;
+        }
+
+        default : throw new IllegalArgumentException("Mode [" + type.getMode().name() + "] not supported..");
+      }
+      break;
+    }
+
+    case VARBINARY: {
+      switch (type.getMode()) {
+        case REQUIRED: {
+          VarBinaryVector vector = (VarBinaryVector) sourceVector;
+          vectorMemoryUsage.offsetsByteCapacity = vector.getOffsetVector().getValueCapacity() * INT_VALUE_WIDTH;
+          vectorMemoryUsage.dataByteCapacity = vector.getByteCapacity();
+          vectorMemoryUsage.offsetsBytesUsed = vector.getOffsetVector().getPayloadByteCount(currValueCount);
+          vectorMemoryUsage.dataBytesUsed    = vector.getPayloadByteCount(currValueCount) - vectorMemoryUsage.offsetsBytesUsed;
+          break;
+        }
+        case OPTIONAL: {
+          NullableVarBinaryVector vector = (NullableVarBinaryVector) sourceVector;
+          VarBinaryVector values = vector.getValuesVector();
+          vectorMemoryUsage.bitsBytesCapacity = vector.getBitsValueCapacity();
+          vectorMemoryUsage.offsetsByteCapacity = values.getOffsetVector().getValueCapacity() * INT_VALUE_WIDTH;
+          vectorMemoryUsage.dataByteCapacity = values.getByteCapacity();
+          vectorMemoryUsage.bitsBytesUsed = currValueCount * BYTE_VALUE_WIDTH;
+          vectorMemoryUsage.offsetsBytesUsed = values.getOffsetVector().getPayloadByteCount(currValueCount);
+          vectorMemoryUsage.dataBytesUsed = values.getPayloadByteCount(currValueCount) - vectorMemoryUsage.offsetsBytesUsed;
+          break;
+        }
+
+        default : throw new IllegalArgumentException("Mode [" + type.getMode().name() + "] not supported..");
+      }
+      break;
+    }
+
+    case VARDECIMAL: {
+      switch (type.getMode()) {
+        case REQUIRED: {
+          VarDecimalVector vector = (VarDecimalVector) sourceVector;
+          vectorMemoryUsage.offsetsByteCapacity = vector.getOffsetVector().getValueCapacity() * INT_VALUE_WIDTH;
+          vectorMemoryUsage.dataByteCapacity = vector.getByteCapacity();
+          vectorMemoryUsage.offsetsBytesUsed = vector.getOffsetVector().getPayloadByteCount(currValueCount);
+          vectorMemoryUsage.dataBytesUsed = vector.getPayloadByteCount(currValueCount) - vectorMemoryUsage.offsetsBytesUsed;
+          break;
+        }
+        case OPTIONAL: {
+          NullableVarDecimalVector vector = (NullableVarDecimalVector) sourceVector;
+          VarDecimalVector values = vector.getValuesVector();
+          vectorMemoryUsage.bitsBytesCapacity = vector.getBitsValueCapacity();
+          vectorMemoryUsage.offsetsByteCapacity = values.getOffsetVector().getValueCapacity() * INT_VALUE_WIDTH;
+          vectorMemoryUsage.dataByteCapacity = values.getByteCapacity();
+          vectorMemoryUsage.bitsBytesUsed = currValueCount * BYTE_VALUE_WIDTH;
+          vectorMemoryUsage.offsetsBytesUsed = values.getOffsetVector().getPayloadByteCount(currValueCount);
+          vectorMemoryUsage.dataBytesUsed = values.getPayloadByteCount(currValueCount) - vectorMemoryUsage.offsetsBytesUsed;
+          break;
+        }
+
+        default : throw new IllegalArgumentException("Mode [" + type.getMode().name() + "] not supported..");
+      }
+      break;
+    }
+
+    default : throw new IllegalArgumentException("Type [" + type.getMinorType().name() + "] not supported..");
+    } // End of minor-type-switch-statement
+
+    assert vectorMemoryUsage.bitsBytesCapacity >= 0;
+    assert vectorMemoryUsage.bitsBytesUsed >= 0;
+    assert vectorMemoryUsage.offsetsByteCapacity >= 0;
+    assert vectorMemoryUsage.offsetsBytesUsed >= 0;
+    assert vectorMemoryUsage.dataByteCapacity >= 0;
+    assert vectorMemoryUsage.dataBytesUsed >= 0;
+
+  }
+
+  /**
+   * @param column metadata of a fixed length column
+   * @return column byte precision, i.e. the type size reported by {@link TypeHelper#getSize}
+   */
+  public static int getFixedColumnTypePrecision(ParquetColumnMetadata column) {
+    assert column.isFixedLength();
+    final MajorType columnType = column.getField().getType();
+    return TypeHelper.getSize(columnType);
+  }
+
+  /**
+   * This method will return a default value for variable columns; it aims at minimizing internal fragmentation.
+   * <p><b>Note</b> that the {@link TypeHelper} uses a large default value which might not be always appropriate.
+   *
+   * @param column variable length column's metadata (only checked by an assertion; the result does not depend on it)
+   * @return default average column byte precision ({@link #DEFAULT_VL_COLUMN_AVG_PRECISION})
+   */
+  public static int getAvgVariableLengthColumnTypePrecision(ParquetColumnMetadata column) {
+    assert !column.isFixedLength();
+
+    return DEFAULT_VL_COLUMN_AVG_PRECISION;
+  }
+
+  /**
+   * @param column fixed length column's metadata
+   * @param valueCount number of column values
+   * @return memory size required to store "valueCount" within a value vector
+   */
+  public static int computeFixedLengthVectorMemory(ParquetColumnMetadata column, int valueCount) {
+    assert column.isFixedLength();
+
+    // Formula:  memory-usage = next-power-of-two(DT_LEN * valueCount)     // data storage
+    //         + next-power-of-two(byte-size * valueCount)                 // nullable storage (if any)
+    final int dataMemory = BaseAllocator.nextPowerOfTwo(getFixedColumnTypePrecision(column) * valueCount);
+
+    if (!column.getField().isNullable()) {
+      return dataMemory; // non-nullable column: no nullable storage to add
+    }
+
+    // Nullable column: BYTE_VALUE_WIDTH additional bytes per value
+    return dataMemory + BaseAllocator.nextPowerOfTwo(BYTE_VALUE_WIDTH * valueCount);
+  }
+
+  /**
+   * @param column variable length column's metadata
+   * @param averagePrecision VL column average precision
+   * @param valueCount number of column values
+   * @return memory size required to store "valueCount" within a value vector
+   */
+  public static int computeVariableLengthVectorMemory(ParquetColumnMetadata column,
+    int averagePrecision, int valueCount) {
+
+    assert !column.isFixedLength();
+
+    // Formula:  memory-usage = next-power-of-two(averagePrecision * valueCount)  // data storage
+    //         + next-power-of-two(int-size * (valueCount + 1))                   // offsets storage
+    //         + next-power-of-two(valueCount)                                    // nullable storage (if any)
+    final int dataMemory    = BaseAllocator.nextPowerOfTwo(averagePrecision * valueCount);
+    final int offsetsMemory = BaseAllocator.nextPowerOfTwo(INT_VALUE_WIDTH * (valueCount + 1));
+    // The nullable storage is only accounted for when the column's field is nullable
+    final int bitsMemory    = column.getField().isNullable() ? BaseAllocator.nextPowerOfTwo(valueCount) : 0;
+
+    return dataMemory + offsetsMemory + bitsMemory;
+  }
+
+// ----------------------------------------------------------------------------
+// Internal implementation
+// ----------------------------------------------------------------------------
+
+  private static int computeNewVectorCapacity(int usedCapacity, int newPayload, int currentCapacity) {
+    // Capacity required once the payload is appended; an already larger allocation is kept as is
+    final int requiredCapacity = BaseAllocator.nextPowerOfTwo(usedCapacity + newPayload);
+    assert requiredCapacity >= 0;
+    return (currentCapacity >= requiredCapacity) ? currentCapacity : requiredCapacity;
+  }
+
+// ----------------------------------------------------------------------------
+// Inner data structure
+// ----------------------------------------------------------------------------
+
+  /**
+   * A mutable container which holds a column's batch memory usage information; it is the input of canAddNewData().
+   */
+  public static final class ColumnMemoryUsageInfo {
+    /** Value vector which contains the column batch data */
+    public ValueVector vector;
+    /** Column memory quota; its getMaxMemoryUsage() value is the limit checked by canAddNewData() */
+    public ColumnMemoryQuota memoryQuota;
+    /** Current record count stored within the value vector */
+    public int currValueCount;
+    /** Current vector memory usage; reloaded on every canAddNewData() call */
+    public final VectorMemoryUsageInfo vectorMemoryUsage = new VectorMemoryUsageInfo();
+  }
+
+  /** Mutable holder for the memory usage of a variable length {@link ValueVector};
+   * all values are in bytes.
+   */
+  public static final class VectorMemoryUsageInfo {
+    /** Capacity of the bits vector */
+    public int bitsBytesCapacity;
+    /** Capacity of the offsets vector */
+    public int offsetsByteCapacity;
+    /** Capacity of the data vector */
+    public int dataByteCapacity;
+    /** Used up portion of the bits vector */
+    public int bitsBytesUsed;
+    /** Used up portion of the offsets vector */
+    public int offsetsBytesUsed;
+    /** Used up portion of the data vector */
+    public int dataBytesUsed;
+
+    /** Zeroes every capacity and usage counter so that this holder can be reused */
+    public void reset() {
+      // Capacities
+      bitsBytesCapacity = offsetsByteCapacity = dataByteCapacity = 0;
+
+      // Used up capacities
+      bitsBytesUsed = offsetsBytesUsed = dataBytesUsed = 0;
+    }
+  }
+
+  /** Private constructor: this class only exposes static members and is not meant to be instantiated */
+  private BatchSizingMemoryUtil() {
+    // NOOP
+  }
+
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/batchsizing/OverflowSerDeUtil.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/batchsizing/OverflowSerDeUtil.java
new file mode 100644
index 0000000..c542803
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/batchsizing/OverflowSerDeUtil.java
@@ -0,0 +1,366 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.parquet.columnreaders.batchsizing;
+
+import io.netty.buffer.DrillBuf;
+import java.util.List;
+import java.util.Map;
+import org.apache.drill.common.map.CaseInsensitiveMap;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.store.parquet.columnreaders.batchsizing.RecordBatchOverflow.FieldOverflowDefinition;
+import org.apache.drill.exec.store.parquet.columnreaders.batchsizing.RecordBatchOverflow.FieldOverflowEntry;
+import org.apache.drill.exec.store.parquet.columnreaders.batchsizing.RecordBatchOverflow.RecordOverflowContainer;
+import org.apache.drill.exec.store.parquet.columnreaders.batchsizing.RecordBatchOverflow.RecordOverflowDefinition;
+import org.apache.drill.exec.vector.UInt1Vector;
+import org.apache.drill.exec.vector.UInt4Vector;
+
+/**
+ * Field overflow SERDE utility; note that overflow data is serialized as a way to minimize
+ * memory usage. This information is deserialized back to ValueVectors when it is needed in
+ * the next batch.
+ *
+ * <p><b>NOTE -</b>We use a specialized implementation for overflow SERDE (instead of reusing
+ * existing ones) because of the following reasons:
+ * <ul>
+ * <li>We want to only serialize a subset of the VV data
+ * <li>Other SERDE methods will not copy the data contiguously and instead rely on the
+ *     RPC layer to write the drill buffers in the correct order so that they are
+ *     de-serialized as a single contiguous drill buffer
+ * </ul>
+ */
+final class OverflowSerDeUtil {
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(OverflowSerDeUtil.class);
+
+  /**
+   * Serializes a collection of overflow fields into a memory buffer:
+   * <ul>
+   * <li>Serialization logic can handle a subset of values (should be contiguous)
+   * <li>Serialized data is copied into a single DrillBuf
+   * <li>Currently, only variable length data is supported
+   * </ul>
+   *
+   * @param fieldOverflowEntries input collection of field overflow entries
+   * @param allocator buffer allocator
+   * @return record overflow container; null if the input collection is null or empty
+   */
+  static RecordOverflowContainer serialize(List<FieldOverflowEntry> fieldOverflowEntries,
+    BufferAllocator allocator) {
+
+    if (fieldOverflowEntries == null || fieldOverflowEntries.isEmpty()) {
+      return null;
+    }
+
+    // We need to:
+    // - Construct a map of VLVectorSerializer for each overflow field
+    // - Compute the total space required for efficient serialization of all overflow data
+    final Map<String, VLVectorSerializer> fieldSerDeMap = CaseInsensitiveMap.newHashMap();
+    int bufferLength = 0;
+
+    for (FieldOverflowEntry fieldOverflowEntry : fieldOverflowEntries) {
+      final VLVectorSerializer fieldVLSerDe = new VLVectorSerializer(fieldOverflowEntry);
+      fieldSerDeMap.put(fieldOverflowEntry.vector.getField().getName(), fieldVLSerDe);
+
+      bufferLength += fieldVLSerDe.getBytesUsed(fieldOverflowEntry.firstValueIdx, fieldOverflowEntry.numValues);
+    }
+    assert bufferLength >= 0;
+
+    // Allocate the required memory to serialize the overflow fields
+    final DrillBuf buffer = allocator.buffer(bufferLength);
+
+    // Parameterized logging: the message is only formatted when debug logging is enabled,
+    // so no explicit isDebugEnabled() guard is needed
+    logger.debug("Allocated a buffer of length {} to handle overflow", bufferLength);
+
+    // Create the result object
+    final RecordOverflowContainer recordOverflowContainer = new RecordOverflowContainer();
+    final RecordOverflowDefinition recordOverflowDef = recordOverflowContainer.recordOverflowDef;
+
+    // Now serialize field overflow into the drill buffer
+    int bufferOffset = 0;
+    FieldSerializerContainer fieldSerializerContainer = new FieldSerializerContainer();
+
+    for (FieldOverflowEntry fieldOverflowEntry : fieldOverflowEntries) {
+      fieldSerializerContainer.clear();
+
+      // Serialize the field overflow data into the buffer
+      VLVectorSerializer fieldSerDe = fieldSerDeMap.get(fieldOverflowEntry.vector.getField().getName());
+      assert fieldSerDe != null;
+
+      fieldSerDe.copyValueVector(fieldOverflowEntry.firstValueIdx,
+        fieldOverflowEntry.numValues,
+        buffer,
+        bufferOffset,
+        fieldSerializerContainer);
+
+      // Create a view DrillBuf for isolating this field overflow data
+      DrillBuf fieldBuffer = buffer.slice(bufferOffset, fieldSerializerContainer.totalByteLength);
+      fieldBuffer.retain(1); // Increase the reference count
+      fieldBuffer.writerIndex(fieldSerializerContainer.totalByteLength);
+
+      // Enqueue a field overflow definition object for the current field
+      FieldOverflowDefinition fieldOverflowDef = new FieldOverflowDefinition(
+        fieldOverflowEntry.vector.getField(),
+        fieldOverflowEntry.numValues,
+        fieldSerializerContainer.dataByteLen,
+        fieldBuffer);
+
+      recordOverflowDef.getFieldOverflowDefs().put(fieldOverflowEntry.vector.getField().getName(), fieldOverflowDef);
+
+      // The next field is written right after the bytes serialized for this one
+      bufferOffset += fieldSerializerContainer.totalByteLength;
+    }
+
+    // Finally, release the original buffer
+    boolean isReleased = buffer.release();
+    assert !isReleased; // the reference count should still be higher than zero
+
+    return recordOverflowContainer;
+  }
+
+  /** Private constructor: this utility class is not meant to be instantiated */
+  private  OverflowSerDeUtil() {
+    // NOOP
+  }
+
+// ----------------------------------------------------------------------------
+// Internal Data Structure
+// ----------------------------------------------------------------------------
+
+  /** Reusable holder for the byte lengths produced when one field's overflow data is serialized */
+  private static final class FieldSerializerContainer {
+    /** Length in bytes of the serialized "values" data only */
+    int dataByteLen;
+    /** Length in bytes of everything serialized for the field ("bits", "offsets" and "values") */
+    int totalByteLength;
+
+    /** Zeroes both lengths so that the holder can be reused for the next field */
+    void clear() {
+      dataByteLen = totalByteLength = 0;
+    }
+  }
+
+  /**
+   * Helper class for handling variable length {@link ValueVector} overflow serialization logic
+   */
+  private static final class VLVectorSerializer {
+    private static final int BYTE_VALUE_WIDTH = UInt1Vector.VALUE_WIDTH;
+    private static final int INT_VALUE_WIDTH  = UInt4Vector.VALUE_WIDTH;
+
+    /** Field overflow entry */
+    private final FieldOverflowEntry fieldOverflowEntry;
+
+    /** Set of DrillBuf's that make up the underlying ValueVector. Only nullable
+     * (VarChar or VarBinary) vectors have three entries. The format is
+     * ["bits-vector"] "offsets-vector" "values-vector"
+     */
+    private final DrillBuf[] buffers;
+
+    /**
+     * Captures the overflow entry along with the buffers returned by its vector's getBuffers() call.
+     * @param fieldOverflowEntry field overflow entry whose vector will be serialized
+     */
+    private VLVectorSerializer(FieldOverflowEntry fieldOverflowEntry) {
+      this.fieldOverflowEntry = fieldOverflowEntry;
+      this.buffers = this.fieldOverflowEntry.vector.getBuffers(false); // NOTE(review): "false" presumably means "do not clear the vector" - confirm
+    }
+
+    /**
+     * Computes the number of bytes needed to serialize a value range of the wrapped
+     * {@link ValueVector}; the range is delimited by "firstValueIdx" and "numValues".
+     * The result is the sum of the "bits" (nullable vectors only), "offsets" and
+     * "values" lengths.
+     *
+     * @param firstValueIdx start index of the first value to copy
+     * @param numValues number of values to copy
+     *
+     * @return number of bytes used by the requested range
+     */
+    private int getBytesUsed(int firstValueIdx, int numValues) {
+      // Serialized layout: ["bits-vector"] "offsets-vector" "values-vector"; the "bits"
+      // part is only accounted for when the vector is nullable
+      final int bitsLen = isNullable() ? numValues * BYTE_VALUE_WIDTH : 0;
+
+      // The "offsets" range holds one more entry than there are values
+      final int offsetsLen = (numValues + 1) * INT_VALUE_WIDTH;
+
+      // Length of the "values" data for the requested range
+      final int dataLen = getDataLen(firstValueIdx, numValues);
+
+      return bitsLen + offsetsLen + dataLen;
+    }
+
+    private void copyValueVector(int firstValueIdx,
+      int numValues,
+      DrillBuf targetBuffer,
+      int targetStartIdx,
+      FieldSerializerContainer fieldSerializerContainer) {
+
+      // The three parts are written back to back; "writeIdx" is where the next part begins
+      int writeIdx = targetStartIdx;
+
+      // "bits" part (zero bytes when the vector is not nullable)
+      final int bitsLen = copyBitsVector(firstValueIdx, numValues, targetBuffer, writeIdx);
+      assert bitsLen >= 0;
+      writeIdx += bitsLen;
+
+      // "offsets" part
+      final int offsetsLen = copyOffsetVector(firstValueIdx, numValues, targetBuffer, writeIdx);
+      assert offsetsLen >= 0;
+      writeIdx += offsetsLen;
+
+      // "values" part
+      final int dataLen = copyValuesVector(firstValueIdx, numValues, targetBuffer, writeIdx);
+      assert dataLen >= 0;
+
+      // Report the data length and the overall length through the container
+      fieldSerializerContainer.dataByteLen = dataLen;
+      fieldSerializerContainer.totalByteLength = bitsLen + offsetsLen + dataLen;
+    }
+
+    /**
+     * Copies the requested range of the "bits" data into the target buffer when the
+     * {@link ValueVector} is nullable; nothing is written (and zero is returned)
+     * otherwise.
+     *
+     * @param firstValueIdx start index of the first value to copy
+     * @param numValues number of values to copy
+     * @param targetBuffer target buffer
+     * @param targetStartIdx target buffer start index
+     *
+     * @return number of bytes written
+     */
+    private int copyBitsVector(int firstValueIdx, int numValues, DrillBuf targetBuffer, int targetStartIdx) {
+      // Non-nullable vectors: there is no "bits" data to copy
+      if (!isNullable()) {
+        return 0;
+      }
+
+      final DrillBuf bitsBuffer = getBitsBuffer();
+      assert bitsBuffer != null;
+
+      // Source position and length of the requested range within the "bits" data
+      final int srcStartIdx = firstValueIdx * BYTE_VALUE_WIDTH;
+      final int rangeLen    = numValues * BYTE_VALUE_WIDTH;
+
+      // Transfer the range into the target buffer at "targetStartIdx"
+      bitsBuffer.getBytes(srcStartIdx, targetBuffer, targetStartIdx, rangeLen);
+
+      return rangeLen;
+    }
+
+    /**
+     * Copies the "offsets" entries covering the requested range. The offsets are copied
+     * verbatim (no adjustment is applied); that task is left to the overflow data
+     * de-serialization. Note that {@code numValues + 1} entries are copied since the end
+     * of the last value is stored in the entry following it.
+     *
+     * @param firstValueIdx start index of the first value to copy
+     * @param numValues number of values to copy
+     * @param targetBuffer target buffer
+     * @param targetStartIdx target buffer start index
+     *
+     * @return number of bytes written
+     */
+    private int copyOffsetVector(int firstValueIdx,
+      int numValues,
+      DrillBuf targetBuffer,
+      int targetStartIdx) {
+
+      final int numBytes = (numValues + 1) * INT_VALUE_WIDTH;
+
+      final DrillBuf offsetsBuffer = getOffsetsBuffer();
+      assert offsetsBuffer != null;
+
+      // Copy the offsets data into the target buffer
+      final int srcStartIdx = firstValueIdx * INT_VALUE_WIDTH;
+      offsetsBuffer.getBytes(srcStartIdx, targetBuffer, targetStartIdx, numBytes);
+
+      return numBytes;
+    }
+
+    /**
+     * Copies the "values" bytes that belong to the requested range; the byte range is
+     * located through the "offsets" buffer.
+     *
+     * @param firstValueIdx start index of the first value to copy
+     * @param numValues number of values to copy
+     * @param targetBuffer target buffer
+     * @param targetStartIdx target buffer start index
+     *
+     * @return number of bytes written
+     */
+    private int copyValuesVector(int firstValueIdx,
+      int numValues,
+      DrillBuf targetBuffer,
+      int targetStartIdx) {
+
+      // The offset entry of the first value gives the position of its data
+      final DrillBuf offsetsBuffer = getOffsetsBuffer();
+      final int srcStartIdx = offsetsBuffer.getInt(firstValueIdx * INT_VALUE_WIDTH);
+      final int numBytes = getDataLen(firstValueIdx, numValues);
+
+      final DrillBuf valuesBuffer = getValuesBuffer();
+      assert valuesBuffer != null;
+
+      // Copy the values data into the target buffer
+      valuesBuffer.getBytes(srcStartIdx, targetBuffer, targetStartIdx, numBytes);
+
+      return numBytes;
+    }
+
+    /**
+     * Computes the number of data bytes used by a range of values, i.e.
+     * {@code offsets[firstValueIdx + numValues] - offsets[firstValueIdx]}.
+     *
+     * @param firstValueIdx start index of the first value to copy
+     * @param numValues number of values to copy
+     *
+     * @return data length of the values in the range [firstValueIdx, firstValueIdx + numValues)
+     */
+    private int getDataLen(int firstValueIdx, int numValues) {
+      final DrillBuf offsetsBuffer = getOffsetsBuffer();
+      final int endOffsetPos = (firstValueIdx + numValues) * INT_VALUE_WIDTH;
+      final int startOffsetPos = firstValueIdx * INT_VALUE_WIDTH;
+
+      return offsetsBuffer.getInt(endOffsetPos) - offsetsBuffer.getInt(startOffsetPos);
+    }
+
+    /**
+     * @return the "bits" buffer (first of three buffers); null when the associated
+     *         {@link ValueVector} is not nullable, i.e. it is not backed by three buffers
+     */
+    private DrillBuf getBitsBuffer() {
+      if (buffers.length != 3) {
+        return null;
+      }
+      return buffers[0];
+    }
+
+    /** @return the "offsets" buffer; it follows the "bits" buffer when three buffers are present */
+    private DrillBuf getOffsetsBuffer() {
+      if (buffers.length == 3) {
+        return buffers[1];
+      }
+      return buffers[0];
+    }
+
+    /** @return the "values" buffer; it is always the buffer following the "offsets" buffer */
+    private DrillBuf getValuesBuffer() {
+      if (buffers.length == 3) {
+        return buffers[2];
+      }
+      return buffers[1];
+    }
+
+    /** @return whether this vector is nullable */
+    private boolean isNullable() {
+      return getBitsBuffer() != null;
+    }
+
+  } // End of VLVectorSerDe class
+
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/batchsizing/RecordBatchOverflow.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/batchsizing/RecordBatchOverflow.java
new file mode 100644
index 0000000..76422ae
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/batchsizing/RecordBatchOverflow.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.parquet.columnreaders.batchsizing;
+
+import io.netty.buffer.DrillBuf;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import org.apache.drill.common.map.CaseInsensitiveMap;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.VariableWidthVector;
+
+/**
+ * Logic for handling batch record overflow; this class essentially serializes overflow vector data in a
+ * compact manner so that it is reused for building the next record batch.
+ */
+public final class RecordBatchOverflow {
+  /** Record overflow definition produced when the overflow entries were serialized */
+  final RecordOverflowDefinition recordOverflowDef;
+  /** Buffer allocator this object was built with */
+  final BufferAllocator allocator;
+
+  /**
+   * Creates a builder used to register overflow fields and then build a
+   * {@link RecordBatchOverflow} object.
+   *
+   * @param allocator buffer allocator
+   * @return new builder object
+   */
+  public static Builder newBuilder(BufferAllocator allocator) {
+    return new Builder(allocator);
+  }
+
+  /**
+   * @return the record overflow definition
+   */
+  public RecordOverflowDefinition getRecordOverflowDefinition() {
+    return recordOverflowDef;
+  }
+
+  /**
+   * Constructor; instances are created through {@link Builder#build()}.
+   *
+   * @param recordOverflowDef record overflow definition
+   * @param allocator buffer allocator
+   */
+  private RecordBatchOverflow(RecordOverflowDefinition recordOverflowDef,
+    BufferAllocator allocator) {
+
+    this.recordOverflowDef = recordOverflowDef;
+    this.allocator = allocator;
+  }
+
+// ----------------------------------------------------------------------------
+// Inner Data Structure
+// ----------------------------------------------------------------------------
+
+  /** Builder class to construct a {@link RecordBatchOverflow} object */
+  public static final class Builder {
+    /** Field overflow list (one entry per call to {@link #addFieldOverflow}) */
+    private final List<FieldOverflowEntry> fieldOverflowEntries = new ArrayList<FieldOverflowEntry>();
+    /** Buffer allocator */
+    private final BufferAllocator allocator;
+
+    /**
+     * Builder constructor; use {@link RecordBatchOverflow#newBuilder(BufferAllocator)}.
+     * @param allocator buffer allocator
+     */
+    private Builder(BufferAllocator allocator) {
+      this.allocator = allocator;
+    }
+
+    /**
+     * Add an overflow field to this record batch overflow object; note that currently only
+     * variable length vectors are supported (enforced through an assertion).
+     *
+     * @param vector a value vector with overflow values
+     * @param firstValueIdx index of first overflow value
+     * @param numValues the number of overflow values starting at index "firstValueIdx"
+     */
+    public void addFieldOverflow(ValueVector vector, int firstValueIdx, int numValues) {
+      assert vector instanceof VariableWidthVector;
+      fieldOverflowEntries.add(new FieldOverflowEntry(vector, firstValueIdx, numValues));
+    }
+
+    /**
+     * Serializes the registered overflow entries and wraps the resulting record overflow
+     * definition into a new object.
+     *
+     * @return a newly built {@link RecordBatchOverflow} object instance
+     */
+    public RecordBatchOverflow build() {
+      RecordOverflowContainer overflowContainer = OverflowSerDeUtil.serialize(fieldOverflowEntries, allocator);
+      RecordBatchOverflow result                =
+        new RecordBatchOverflow(overflowContainer.recordOverflowDef, allocator);
+
+      return result;
+    }
+  } // End of Builder
+
+  /** Field overflow entry: identifies a range of overflow values within a value vector */
+  static final class FieldOverflowEntry {
+    /** A value vector with overflow values */
+    final ValueVector vector;
+    /** Index of first overflow value */
+    final int firstValueIdx;
+    /** The number of overflow values starting at index "firstValueIdx" */
+    final int numValues;
+
+    /**
+     * Field overflow entry constructor
+     *
+     * @param vector a value vector with overflow values
+     * @param firstValueIdx index of first overflow value
+     * @param numValues the number of overflow values starting at index "firstValueIdx"
+     */
+    private FieldOverflowEntry(ValueVector vector, int firstValueIdx, int numValues) {
+      this.vector = vector;
+      this.firstValueIdx = firstValueIdx;
+      this.numValues = numValues;
+    }
+  } // End of FieldOverflowEntry
+
+  /** Record overflow definition: groups the overflow definitions of the individual fields */
+  public static final class RecordOverflowDefinition {
+    /** Field overflow definitions held in a case insensitive map (presumably keyed by field name; confirm against the serializer) */
+    private final Map<String, FieldOverflowDefinition> fieldOverflowDefs = CaseInsensitiveMap.newHashMap();
+
+    /**
+     * @return the (mutable) map of field overflow definitions
+     */
+    public Map<String, FieldOverflowDefinition> getFieldOverflowDefs() {
+      return fieldOverflowDefs;
+    }
+  } // End of RecordOverflowDefinition
+
+  /** Field overflow definition */
+  public static final class FieldOverflowDefinition {
+    /** Materialized field */
+    public final MaterializedField field;
+    /** Number of values */
+    public final int numValues;
+    /** Data byte length */
+    public final int dataByteLen;
+    /** DrillBuf where the serialized data is stored */
+    public final DrillBuf buffer;
+
+    /**
+     * Field overflow definition constructor
+     * @param field materialized field
+     * @param numValues number of values
+     * @param dataByteLen data byte length
+     * @param buffer DrillBuf where the serialized data is stored
+     */
+    FieldOverflowDefinition(MaterializedField field, int numValues, int dataByteLen, DrillBuf buffer) {
+      this.field = field;
+      this.numValues = numValues;
+      this.dataByteLen = dataByteLen;
+      this.buffer = buffer;
+    }
+  } // End of FieldOverflowDefinition
+
+  /** Record overflow container: holder for the record overflow definition filled in during serialization */
+  static final class RecordOverflowContainer {
+    /** Record Overflow Definition */
+    final RecordOverflowDefinition recordOverflowDef = new RecordOverflowDefinition();
+
+  } // End of RecordOverflowContainer
+
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/batchsizing/RecordBatchSizerManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/batchsizing/RecordBatchSizerManager.java
new file mode 100644
index 0000000..01644f7
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/batchsizing/RecordBatchSizerManager.java
@@ -0,0 +1,661 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.parquet.columnreaders.batchsizing;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import org.apache.drill.common.map.CaseInsensitiveMap;
+import org.apache.drill.common.types.TypeProtos.MajorType;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.exception.OutOfMemoryException;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.server.options.OptionManager;
+import org.apache.drill.exec.store.parquet.columnreaders.ParquetColumnMetadata;
+import org.apache.drill.exec.store.parquet.columnreaders.ParquetSchema;
+import org.apache.drill.exec.store.parquet.columnreaders.VarLenColumnBulkInput;
+import org.apache.drill.exec.store.parquet.columnreaders.batchsizing.RecordBatchOverflow.FieldOverflowDefinition;
+import org.apache.drill.exec.vector.AllocationHelper;
+import org.apache.drill.exec.vector.ValueVector;
+
+/**
+ * This class is tasked with managing all aspects of flat Parquet reader record batch sizing logic.
+ * Currently a record batch size is constrained with two parameters: Number of rows and Memory usage.
+ */
+public final class RecordBatchSizerManager {
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RecordBatchSizerManager.class);
+  public static final String BATCH_STATS_PREFIX = "BATCH_STATS";
+
+  /** Minimum column memory size */
+  private static final int MIN_COLUMN_MEMORY_SZ = VarLenColumnBulkInput.getMinVLColumnMemorySize();
+
+  /** Parquet schema object */
+  private final ParquetSchema schema;
+  /** Total records to read */
+  private final long totalRecordsToRead;
+  /** Logic to minimize overflow occurrences */
+  private final BatchOverflowOptimizer overflowOptimizer;
+
+  /** Configured Parquet records per batch */
+  private final int configRecordsPerBatch;
+  /** Configured Parquet memory size per batch */
+  private final int configMemorySizePerBatch;
+  /** An upper bound on the Parquet records per batch based on the configured value and schema */
+  private int maxRecordsPerBatch;
+  /** An upper bound on the Parquet memory size per batch based on the configured value and schema  */
+  private int maxMemorySizePerBatch;
+  /** The current number of records per batch as it can be dynamically optimized */
+  private int recordsPerBatch;
+
+  /** List of fixed columns */
+  private final List<ColumnMemoryInfo> fixedLengthColumns = new ArrayList<ColumnMemoryInfo>();
+  /** List of variable columns */
+  private final List<ColumnMemoryInfo> variableLengthColumns = new ArrayList<ColumnMemoryInfo>();
+  /** Field to column memory information map */
+  private final Map<String, ColumnMemoryInfo> columnMemoryInfoMap = CaseInsensitiveMap.newHashMap();
+  /** Indicator invoked when column(s) precision change */
+  private boolean columnPrecisionChanged;
+
+  /**
+   * Field overflow map; this information is stored within this class for two reasons:
+   * a) centralization to simplify resource deallocation (overflow data is backed by Direct Memory)
+   * b) overflow is a result of batch constraints enforcement which this class manages the overflow logic
+   */
+  private Map<String, FieldOverflowStateContainer> fieldOverflowMap = CaseInsensitiveMap.newHashMap();
+
+  /**
+   * Constructor.
+   *
+   * @param options drill options
+   * @param schema current reader schema
+   * @param totalRecordsToRead total number of rows to read
+   */
+  public RecordBatchSizerManager(OptionManager options,
+    ParquetSchema schema,
+    long totalRecordsToRead) {
+
+    this.schema = schema;
+    this.totalRecordsToRead = totalRecordsToRead;
+    this.configRecordsPerBatch = (int) options.getLong(ExecConstants.PARQUET_FLAT_BATCH_NUM_RECORDS);
+    this.configMemorySizePerBatch = getConfiguredMaxBatchMemory(options);
+    this.maxMemorySizePerBatch = this.configMemorySizePerBatch;
+    this.maxRecordsPerBatch = this.configRecordsPerBatch;
+    this.recordsPerBatch = this.configRecordsPerBatch;
+    this.overflowOptimizer = new BatchOverflowOptimizer(columnMemoryInfoMap);
+  }
+
+  /**
+   * Tunes the record batch parameters based on the configuration and the schema: the
+   * batch limits are normalized, the column precision information is loaded and, when
+   * there is at least one column, the batch memory is split amongst the columns.
+   */
+  public void setup() {
+
+    // Step 1: normalize the batch limits
+    this.maxMemorySizePerBatch = normalizeMemorySizePerBatch();
+    this.maxRecordsPerBatch = normalizeNumRecordsPerBatch();
+
+    // Step 2: load the column metadata
+    loadColumnsPrecisionInfo();
+
+    // There are cases where downstream operators don't select any columns; in such
+    // a case, Parquet will return the pseudo column _DEFAULT_COL_TO_READ_ and there
+    // is nothing left to do.
+    if (getNumColumns() > 0) {
+      // Step 3: divide the overall memory pool amongst all columns
+      assignColumnsBatchMemory();
+
+      // Step 4: initialize the overflow optimizer
+      overflowOptimizer.setup();
+    }
+  }
+
+  /**
+   * Returns the Parquet schema object this manager was constructed with.
+   *
+   * @return the schema
+   */
+  public ParquetSchema getSchema() {
+    return schema;
+  }
+
+  /**
+   * Allocates value vectors for the current batch. When the column precision changed
+   * since the previous batch, the per column memory quotas are recomputed first.
+   *
+   * @param vectorMap a collection of value vectors keyed by their field names
+   * @throws OutOfMemoryException if a {@link NullPointerException} is raised while allocating
+   *         the vectors; the original exception is attached as the cause
+   */
+  public void allocate(Map<String, ValueVector> vectorMap) throws OutOfMemoryException {
+
+    if (columnPrecisionChanged) {
+      // We need to divide the overall memory pool amongst all columns
+      assignColumnsBatchMemory();
+    }
+
+    try {
+      for (final ValueVector v : vectorMap.values()) {
+        final ColumnMemoryInfo columnMemoryInfo = columnMemoryInfoMap.get(v.getField().getName());
+
+        // A missing entry means that this column was found in another Parquet file but not the
+        // current one; so we inject a null value. At this time, we do not account for such columns.
+        // Why? the right design is to create a ZERO byte all-nulls value vector to handle such
+        // columns (there could be hundred of these). A precision of zero is requested in that
+        // case (the helper will still use a precision of 1).
+        final int precision = columnMemoryInfo != null ? columnMemoryInfo.columnPrecision : 0;
+
+        AllocationHelper.allocate(v, recordsPerBatch, precision, 0);
+      }
+    } catch (NullPointerException e) {
+      // NOTE(review): presumably a failed allocation surfaces as an NPE - confirm; the cause is
+      // preserved so that genuine null dereferences remain diagnosable.
+      throw new OutOfMemoryException(e);
+    }
+  }
+
+  /**
+   * Returns the internal (mutable) field overflow state map; no copy is made.
+   *
+   * @return the field overflow state map
+   */
+  public Map<String, FieldOverflowStateContainer> getFieldOverflowMap() {
+    return fieldOverflowMap;
+  }
+
+  /**
+   * Looks up the overflow state container registered under the given key.
+   *
+   * @param field key into the field overflow map (presumably the field's name; confirm with callers)
+   * @return field overflow state container; null if none is registered for this key
+   */
+  public FieldOverflowStateContainer getFieldOverflowContainer(String field) {
+    return fieldOverflowMap.get(field);
+  }
+
+  /**
+   * Releases the overflow data resources associated with this field; also removes the overflow
+   * container from the overflow containers map.
+   *
+   * @param field key into the field overflow map (presumably the field's name; confirm with callers)
+   * @return true if this field's overflow container was removed from the overflow containers map;
+   *         false if no container was registered for this field
+   */
+  public boolean releaseFieldOverflowContainer(String field) {
+    return releaseFieldOverflowContainer(field, true);
+  }
+
+  /**
+   * Returns the memory quota currently assigned to a column. No null check is performed on
+   * the lookup result, so a field without column memory information results in a
+   * {@link NullPointerException}.
+   *
+   * @param field field name (the column memory information map is keyed by field name)
+   * @return field batch memory quota
+   */
+  public ColumnMemoryQuota getCurrentFieldBatchMemory(String field) {
+    return columnMemoryInfoMap.get(field).columnMemoryQuota;
+  }
+
+  /**
+   * Returns the number of records per batch currently enforced; this value is recomputed
+   * whenever the column memory quotas are reassigned.
+   *
+   * @return current number of records per batch (may change across batches)
+   */
+  public int getCurrentRecordsPerBatch() {
+    return recordsPerBatch;
+  }
+
+  /**
+   * Returns the upper bound on the memory size per batch.
+   *
+   * @return current total memory per batch (may change across batches)
+   */
+  public int getCurrentMemorySizePerBatch() {
+    return maxMemorySizePerBatch; // Current logic doesn't mutate the max-memory after it has been set
+  }
+
+  /**
+   * Returns the number of records per batch as read from the options at construction time.
+   *
+   * @return configured number of records per batch (may be different from the enforced one)
+   */
+  public int getConfigRecordsPerBatch() {
+    return configRecordsPerBatch;
+  }
+
+  /**
+   * Returns the memory size per batch as read from the options at construction time.
+   *
+   * @return configured memory size per batch (may be different from the enforced one)
+   */
+  public int getConfigMemorySizePerBatch() {
+    return configMemorySizePerBatch;
+  }
+
+  /**
+   * Enables this object to optimize the impact of overflows by computing more
+   * accurate VL column precision. The statistics are handed to the overflow optimizer and
+   * its result is remembered so that the next call to {@link #allocate(Map)} reassigns the
+   * column memory quotas when the column precision changed.
+   *
+   * @param batchNumRecords number of records in this batch
+   * @param batchStats columns statistics
+   */
+  public void onEndOfBatch(int batchNumRecords, List<VarLenColumnBatchStats> batchStats) {
+    columnPrecisionChanged = overflowOptimizer.onEndOfBatch(batchNumRecords, batchStats);
+  }
+
+  /**
+   * Closes all resources managed by this object: every registered overflow container is
+   * released and the overflow map is emptied.
+   */
+  public void close() {
+
+    // Release the containers without removing them as the map is being iterated over
+    for (final String fieldName : fieldOverflowMap.keySet()) {
+      releaseFieldOverflowContainer(fieldName, false);
+    }
+
+    // All containers have been released; drop them in one go
+    fieldOverflowMap.clear();
+  }
+
+// ----------------------------------------------------------------------------
+// Internal implementation logic
+// ----------------------------------------------------------------------------
+
+  private int getConfiguredMaxBatchMemory(OptionManager options) {
+    // Use the parquet specific configuration if set
+    int maxMemory = (int) options.getLong(ExecConstants.PARQUET_FLAT_BATCH_MEMORY_SIZE);
+
+    // Otherwise, use the common property
+    if (maxMemory <= 0) {
+      maxMemory = (int) options.getLong(ExecConstants.OUTPUT_BATCH_SIZE);
+    }
+    return maxMemory;
+  }
+
+  /**
+   * Validates the configured number of records per batch and caps it with the total
+   * number of records to read.
+   *
+   * @return the number of records per batch to enforce
+   * @throws IllegalArgumentException if the configured number of records is not strictly positive
+   */
+  private int normalizeNumRecordsPerBatch() {
+    if (configRecordsPerBatch <= 0) {
+      throw new IllegalArgumentException(
+        String.format("Invalid Parquet number of record(s) per batch [%d]", configRecordsPerBatch));
+    }
+
+    int numRecords = configRecordsPerBatch;
+
+    // There is no point in sizing a batch for more records than will ever be read
+    if (numRecords > totalRecordsToRead) {
+      if (logger.isDebugEnabled()) {
+        logger.debug(String.format("The requested number of record(s) to read is lower than the records per batch; "
+          + "updating the number of record(s) per batch from [%d] to [%d]",
+          numRecords, totalRecordsToRead));
+      }
+      numRecords = (int) totalRecordsToRead;
+    }
+
+    if (logger.isDebugEnabled()) {
+      logger.debug(String.format("%s: The Parquet reader number of record(s) has been set to [%d]",
+        BATCH_STATS_PREFIX, numRecords));
+    }
+
+    return numRecords;
+  }
+
+  /**
+   * Validates the configured memory size per batch and raises it, when needed, so that
+   * every schema column gets at least the minimum column memory size.
+   *
+   * @return the memory size per batch to enforce
+   * @throws IllegalArgumentException if the configured memory size is not strictly positive
+   */
+  private int normalizeMemorySizePerBatch() {
+    if (configMemorySizePerBatch <= 0) {
+      throw new IllegalArgumentException(
+        String.format("Invalid Parquet memory per batch [%d] byte(s)", configMemorySizePerBatch));
+    }
+
+    final int numColumns = schema.getColumnMetadata().size();
+
+    if (numColumns == 0) {
+      return configMemorySizePerBatch; // NOOP
+    }
+
+    int memorySize = configMemorySizePerBatch;
+
+    // Ensure the minimal memory size per column is satisfied
+    if (memorySize / numColumns < MIN_COLUMN_MEMORY_SZ) {
+      memorySize = MIN_COLUMN_MEMORY_SZ * numColumns;
+
+      logger.warn(String.format(
+        "The Parquet memory per batch [%d] byte(s) is too low for this query ; using [%d] bytes",
+        configMemorySizePerBatch, memorySize));
+    }
+
+    if (logger.isDebugEnabled()) {
+      logger.debug(String.format("%s: The Parquet reader batch memory has been set to [%d] byte(s)",
+        BATCH_STATS_PREFIX, memorySize));
+    }
+
+    return memorySize;
+  }
+
+  private void loadColumnsPrecisionInfo() {
+    assert fixedLengthColumns.size() == 0;
+    assert variableLengthColumns.size() == 0;
+
+    for (ParquetColumnMetadata columnMetadata : schema.getColumnMetadata()) {
+      assert !columnMetadata.isRepeated() : "This reader doesn't handle repeated columns..";
+
+      ColumnMemoryInfo columnMemoryInfo = new ColumnMemoryInfo();
+      columnMemoryInfoMap.put(columnMetadata.getField().getName(), columnMemoryInfo);
+
+      if (columnMetadata.isFixedLength()) {
+        columnMemoryInfo.columnMeta = columnMetadata;
+        columnMemoryInfo.columnPrecision = BatchSizingMemoryUtil.getFixedColumnTypePrecision(columnMetadata);
+        columnMemoryInfo.columnMemoryQuota.reset();
+
+        fixedLengthColumns.add(columnMemoryInfo);
+      } else {
+
+        columnMemoryInfo.columnMeta = columnMetadata;
+        columnMemoryInfo.columnPrecision = BatchSizingMemoryUtil.getAvgVariableLengthColumnTypePrecision(columnMetadata);
+        columnMemoryInfo.columnMemoryQuota.reset();
+
+        variableLengthColumns.add(columnMemoryInfo);
+      }
+    }
+  }
+
+  /**
+   * Divides the batch memory amongst the columns; the number of records per batch is reset
+   * to its upper bound beforehand and may be decreased by the quota assignment.
+   */
+  private void assignColumnsBatchMemory() {
+
+    if (getNumColumns() == 0) {
+      return; // nothing to assign
+    }
+
+    // Start from the upper bound and remember it as the assignment may lower the value
+    final int originalRecordsPerBatch = maxRecordsPerBatch;
+    recordsPerBatch = originalRecordsPerBatch;
+
+    // Perform the fine-grained memory quota assignment
+    assignFineGrainedMemoryQuota();
+
+    if (!logger.isDebugEnabled()) {
+      return;
+    }
+    assert recordsPerBatch <= maxRecordsPerBatch;
+
+    // Log the new records per batch if it changed
+    if (recordsPerBatch != originalRecordsPerBatch) {
+      logger.debug(String.format("%s: The Parquet records per batch [%d] has been decreased to [%d]",
+        BATCH_STATS_PREFIX, originalRecordsPerBatch, recordsPerBatch));
+    }
+
+    // Now dump the per column memory quotas
+    dumpColumnMemoryQuotas();
+  }
+
+  /**
+   * Computes the number of records per batch and the per column memory quotas such that
+   * the memory needed by all columns fits within the maximum memory size per batch. The
+   * number of records is repeatedly scaled down by the available / needed memory ratio
+   * until it stops decreasing (or reaches one record).
+   */
+  private void assignFineGrainedMemoryQuota() {
+
+    // - Compute the memory required based on the current batch size and assigned column precision
+    // - Compute the ratio have-memory / needed-memory
+    // - if less than one, new-num-records = num-recs * ratio; go back to previous steps
+    // - distribute the extra memory uniformly across the variable length columns
+
+    MemoryRequirementContainer requiredMemory = new MemoryRequirementContainer();
+    int newRecordsPerBatch = recordsPerBatch;
+
+    while (true) {
+      // Compute max-memory / needed-memory-for-current-num-records
+      recordsPerBatch          = newRecordsPerBatch;
+      double neededMemoryRatio = computeNeededMemoryRatio(requiredMemory);
+      assert neededMemoryRatio <= 1;
+
+      // The cast truncates, so the candidate never exceeds the current number of records
+      newRecordsPerBatch = (int) (recordsPerBatch * neededMemoryRatio);
+      assert newRecordsPerBatch <= recordsPerBatch;
+
+      if (newRecordsPerBatch <= 1) {
+        recordsPerBatch = 1;
+        computeNeededMemoryRatio(requiredMemory); // update the memory quota with this new number of records
+        break; // we cannot process less than one row
+
+      } else if (newRecordsPerBatch < recordsPerBatch) {
+        // We computed a new number of records per batch; we need to
+        // a) make sure this new number satisfies our needs and b)
+        // recompute the per column quota
+        continue;
+      }
+      assert recordsPerBatch == newRecordsPerBatch;
+
+      // Alright, we have now found the target number of records; we need
+      // only to adjust the remaining memory (if any) amongst the variable
+      // length columns.
+      distributeExtraMemorySpace(requiredMemory);
+      break; // we're done
+    }
+  }
+
+  /**
+   * Uniformly distributes the memory left over (maximum batch memory minus the required
+   * memory) to the variable length columns to minimize the chance of overflow conditions.
+   *
+   * @param requiredMemory memory required by the fixed and variable length columns
+   */
+  private void distributeExtraMemorySpace(MemoryRequirementContainer requiredMemory) {
+    if (variableLengthColumns.isEmpty()) {
+      return; // only variable length columns receive extra memory
+    }
+
+    final int neededMemory = requiredMemory.fixedLenRequiredMemory + requiredMemory.variableLenRequiredMemory;
+    final int extraPerColumn = (maxMemorySizePerBatch - neededMemory) / variableLengthColumns.size();
+
+    if (extraPerColumn != 0) {
+      for (ColumnMemoryInfo info : variableLengthColumns) {
+        info.columnMemoryQuota.maxMemoryUsage += extraPerColumn;
+      }
+    }
+  }
+
+  /**
+   * @return the number of columns (fixed plus variable length) for which precision
+   *         information has been loaded
+   */
+  private int getNumColumns() {
+    return fixedLengthColumns.size() + variableLengthColumns.size();
+  }
+
+  private boolean releaseFieldOverflowContainer(String field, boolean remove) {
+    FieldOverflowStateContainer container = getFieldOverflowContainer(field);
+
+    if (container == null) {
+      return false; // NOOP
+    }
+
+    // We need to release resources associated with this container
+    container.release();
+    container.overflowDef = null;
+    container.overflowState = null;
+
+    if (remove) {
+      // Finally remove this container from the map
+      fieldOverflowMap.remove(field);
+    }
+
+    return remove;
+  }
+
+  private int computeVectorMemory(ColumnMemoryInfo columnInfo, int numValues) {
+    if (columnInfo.columnMeta.isFixedLength()) {
+      return BatchSizingMemoryUtil.computeFixedLengthVectorMemory(columnInfo.columnMeta, numValues);
+    }
+    return BatchSizingMemoryUtil.computeVariableLengthVectorMemory(
+      columnInfo.columnMeta,
+      columnInfo.columnPrecision,
+      numValues);
+  }
+
+  /**
+   * Refreshes the quota of every column for the current number of records per batch and
+   * computes the ratio between the maximum batch memory and the memory needed.
+   *
+   * @param requiredMemory container updated with the memory needed by the fixed and variable
+   *        length columns
+   * @return the max-memory / needed-memory ratio, capped to one
+   */
+  private double computeNeededMemoryRatio(MemoryRequirementContainer requiredMemory) {
+    requiredMemory.reset();
+
+    requiredMemory.fixedLenRequiredMemory    += refreshColumnQuotas(fixedLengthColumns);
+    requiredMemory.variableLenRequiredMemory += refreshColumnQuotas(variableLengthColumns);
+
+    final int totalMemoryNeeded = requiredMemory.fixedLenRequiredMemory + requiredMemory.variableLenRequiredMemory;
+    assert totalMemoryNeeded > 0;
+
+    final double ratio = ((double) maxMemorySizePerBatch) / totalMemoryNeeded;
+
+    if (ratio > 1) {
+      return 1; // there is more memory than needed
+    }
+    return ratio;
+  }
+
+  /**
+   * Assigns each column of the list the quota matching the current number of records per batch.
+   *
+   * @param columns columns to update
+   * @return cumulative memory needed by these columns
+   */
+  private int refreshColumnQuotas(List<ColumnMemoryInfo> columns) {
+    int neededMemory = 0;
+
+    for (ColumnMemoryInfo info : columns) {
+      final ColumnMemoryQuota quota = info.columnMemoryQuota;
+
+      quota.maxMemoryUsage = computeVectorMemory(info, recordsPerBatch);
+      quota.maxNumValues   = recordsPerBatch;
+      neededMemory        += quota.maxMemoryUsage;
+    }
+    return neededMemory;
+  }
+
+  private void dumpColumnMemoryQuotas() {
+    StringBuilder msg = new StringBuilder(BATCH_STATS_PREFIX);
+    msg.append(": Field Quotas:\n\tName\tType\tPrec\tQuota\n");
+
+    for (ColumnMemoryInfo columnInfo : columnMemoryInfoMap.values()) {
+      msg.append("\t");
+      msg.append(BATCH_STATS_PREFIX);
+      msg.append("\t");
+      msg.append(columnInfo.columnMeta.getField().getName());
+      msg.append("\t");
+      printType(columnInfo.columnMeta.getField(), msg);
+      msg.append("\t");
+      msg.append(columnInfo.columnPrecision);
+      msg.append("\t");
+      msg.append(columnInfo.columnMemoryQuota.maxMemoryUsage);
+      msg.append("\n");
+    }
+
+    logger.debug(msg.toString());
+  }
+
+  /**
+   * Appends a field's type as "minor-type:mode" to the message.
+   *
+   * @param field materialized field
+   * @param msg message being built
+   */
+  private  static void printType(MaterializedField field, StringBuilder msg) {
+    final MajorType majorType = field.getType();
+    final String minorTypeName = majorType.getMinorType().name();
+    final String modeName = majorType.getMode().name();
+
+    msg.append(minorTypeName).append(':').append(modeName);
+  }
+
+
+// ----------------------------------------------------------------------------
+// Inner Data Structure
+// ----------------------------------------------------------------------------
+
+  /** An abstraction which allows column readers to attach custom field overflow state */
+  public static interface FieldOverflowState {
+
+    /** Overflow data can become an input source for the next batch(es); this method
+     * allows the reader framework to inform individual readers of the number of
+     * values that have been consumed from the current overflow data
+     *
+     * @param numValues the number of values consumed within the current batch
+     */
+    void onNewBatchValuesConsumed(int numValues);
+
+    /**
+     * @return true if the overflow data has been fully consumed (all overflow data consumed by
+     *              the Parquet reader)
+     */
+    boolean isOverflowDataFullyConsumed();
+  }
+
+  /** Container object holding a field's current overflow definition and overflow state */
+  public static final class FieldOverflowStateContainer {
+    /** Field overflow definition */
+    public FieldOverflowDefinition overflowDef;
+    /** Field overflow state */
+    public FieldOverflowState overflowState;
+
+    /**
+     * Constructor.
+     * @param overflowDef field overflow definition
+     * @param overflowState field overflow state
+     */
+    public FieldOverflowStateContainer(FieldOverflowDefinition overflowDef, FieldOverflowState overflowState) {
+      this.overflowDef   = overflowDef;
+      this.overflowState = overflowState;
+    }
+
+    /** Releases the overflow buffer (if any) and clears both references held by this container */
+    private void release() {
+      if (overflowDef == null) {
+        overflowState = null; // no buffer to release
+        return;
+      }
+
+      if (logger.isDebugEnabled()) {
+        logger.debug(String.format(
+          "Releasing a buffer of length %d used to handle overflow data", overflowDef.buffer.capacity()));
+      }
+      overflowDef.buffer.release();
+
+      overflowDef = null;
+      overflowState = null;
+    }
+  }
+
+  /**
+   * Container object to supply variable length (VL) column statistics to the batch sizer;
+   * instances are immutable holders passed to {@link RecordBatchSizerManager#onEndOfBatch}.
+   */
+  public final static class VarLenColumnBatchStats {
+    /** Value vector associated with a VL column */
+    public final ValueVector vector;
+    /** Number of values read in the current batch */
+    public final int numValuesRead;
+
+    /**
+     * Constructor.
+     * @param vector value vector associated with a VL column
+     * @param numValuesRead number of values read in the current batch
+     */
+    public VarLenColumnBatchStats(ValueVector vector, int numValuesRead) {
+      this.vector = vector;
+      this.numValuesRead = numValuesRead;
+    }
+  }
+
+  /** Field memory quota: the memory and value count limits assigned to a column within a batch */
+  public static final class ColumnMemoryQuota {
+    /** Maximum cumulative memory that could be used */
+    private int maxMemoryUsage;
+    /** Maximum number of values that could be inserted */
+    private int maxNumValues;
+
+    /** Creates a quota with both limits set to zero */
+    public ColumnMemoryQuota() {
+    }
+
+    /**
+     * Creates a quota with the given memory limit; the maximum number of values is left at zero.
+     *
+     * @param maxMemoryUsage maximum cumulative memory that could be used
+     */
+    public ColumnMemoryQuota(int maxMemoryUsage) {
+      this.maxMemoryUsage = maxMemoryUsage;
+    }
+
+    /**
+     * @return the maximum cumulative memory that could be used
+     */
+    public int getMaxMemoryUsage() {
+      return maxMemoryUsage;
+    }
+
+    /**
+     * @return the maximum number of values that could be inserted
+     */
+    public int getMaxNumValues() {
+      return maxNumValues;
+    }
+
+    /** Sets both limits back to zero */
+    void reset() {
+      maxMemoryUsage = 0;
+      maxNumValues = 0;
+    }
+  }
+
+  /** A container which holds a column's metadata, memory precision and current quota information */
+  static final class ColumnMemoryInfo {
+    /** Column metadata */
+    ParquetColumnMetadata columnMeta;
+    /**
+     * Column value precision (maximum length for VL columns).
+     * NOTE(review): for VL columns this is initialized through
+     * getAvgVariableLengthColumnTypePrecision(), which suggests an average rather than
+     * a maximum length - confirm.
+     */
+    int columnPrecision;
+    /** Column current memory quota within a batch */
+    final ColumnMemoryQuota columnMemoryQuota = new ColumnMemoryQuota();
+  }
+
+  /** Memory requirements container: cumulative memory needed per column kind */
+  static final class MemoryRequirementContainer {
+    /** Memory needed for the fixed length columns given a specific record size */
+    private int fixedLenRequiredMemory;
+    /** Memory needed for the variable length columns given a specific record size */
+    private int variableLenRequiredMemory;
+
+    /** Sets both memory requirements back to zero */
+    private void reset() {
+      this.fixedLenRequiredMemory    = 0;
+      this.variableLenRequiredMemory = 0;
+    }
+  }
+
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/util/record/RecordBatchStats.java b/exec/java-exec/src/main/java/org/apache/drill/exec/util/record/RecordBatchStats.java
new file mode 100644
index 0000000..8b213a8
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/util/record/RecordBatchStats.java
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.util.record;
+
+import org.apache.drill.common.types.TypeProtos.MajorType;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.FragmentContextImpl;
+import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
+import org.apache.drill.exec.proto.helper.QueryIdHelper;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.RecordBatchSizer;
+import org.apache.drill.exec.record.RecordBatchSizer.ColumnSize;
+
+/**
+ * Utility class to capture key record batch statistics.
+ */
+public final class RecordBatchStats {
+  /** A prefix for all batch stats to simplify search */
+  public static final String BATCH_STATS_PREFIX = "BATCH_STATS";
+
+  /** Helper class which loads contextual record batch logging options */
+  public static final class RecordBatchStatsContext {
+    private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RecordBatchStatsContext.class);
+
+    /** batch size logging for all readers */
+    private final boolean enableBatchSzLogging;
+    /** Fine grained batch size logging */
+    private final boolean enableFgBatchSzLogging;
+    /** Unique Operator Identifier */
+    private final String contextOperatorId;
+
+    /**
+     * @param context fragment context providing the logging options and query id; {@code oContext} provides the operator id
+     */
+    public RecordBatchStatsContext(FragmentContext context, OperatorContext oContext) {
+      enableBatchSzLogging = context.getOptions().getBoolean(ExecConstants.STATS_LOGGING_BATCH_SIZE_OPTION);
+      enableFgBatchSzLogging = context.getOptions().getBoolean(ExecConstants.STATS_LOGGING_FG_BATCH_SIZE_OPTION);
+      contextOperatorId = new StringBuilder()
+        .append(getQueryId(context))
+        .append(":")
+        .append(oContext.getStats().getId())
+        .toString();
+    }
+
+    /**
+     * @return true if batch size logging, fine grained batch size logging, or debug logging is enabled
+     */
+    public boolean isEnableBatchSzLogging() {
+      return enableBatchSzLogging || enableFgBatchSzLogging || logger.isDebugEnabled();
+    }
+
+    /**
+     * @return true if fine grained batch size logging or debug logging is enabled
+     */
+    public boolean isEnableFgBatchSzLogging() {
+      return enableFgBatchSzLogging || logger.isDebugEnabled();
+    }
+
+    /**
+     * @return true if stats messages should be logged at info level rather than debug level
+     */
+    public boolean useInfoLevelLogging() {
+      return isEnableBatchSzLogging() && !logger.isDebugEnabled();
+    }
+
+    /**
+     * @return the contextOperatorId
+     */
+    public String getContextOperatorId() {
+      return contextOperatorId;
+    }
+
+    private String getQueryId(FragmentContext _context) {
+      if (_context instanceof FragmentContextImpl) {
+        final FragmentContextImpl context = (FragmentContextImpl) _context;
+        final FragmentHandle handle       = context.getHandle();
+
+        if (handle != null) {
+          return QueryIdHelper.getQueryIdentifier(handle);
+        }
+      }
+      return "NA";
+    }
+  }
+
+  /**
+   * Constructs record batch statistics for the input record batch
+   *
+   * @param statsId instance identifier
+   * @param sourceId optional source identifier for scanners
+   * @param recordBatch a set of records
+   * @param verbose whether to include fine-grained stats
+   *
+   * @return a string containing the record batch statistics
+   */
+  public static String printRecordBatchStats(String statsId,
+    String sourceId,
+    RecordBatch recordBatch,
+    boolean verbose) {
+
+    final RecordBatchSizer batchSizer = new RecordBatchSizer(recordBatch);
+    final StringBuilder msg = new StringBuilder();
+
+    msg.append(BATCH_STATS_PREFIX);
+    msg.append(" Originator: {");
+    msg.append(statsId);
+    if (sourceId != null) {
+      msg.append(':');
+      msg.append(sourceId);
+    }
+    msg.append("}, Batch size: {");
+    msg.append( "  Records: " );
+    msg.append(batchSizer.rowCount());
+    msg.append(", Total size: ");
+    msg.append(batchSizer.getActualSize());
+    msg.append(", Data size: ");
+    msg.append(batchSizer.getNetBatchSize());
+    msg.append(", Gross row width: ");
+    msg.append(batchSizer.getGrossRowWidth());
+    msg.append(", Net row width: ");
+    msg.append(batchSizer.getNetRowWidth());
+    msg.append(", Density: ");
+    msg.append(batchSizer.getAvgDensity());
+    msg.append("% }\n");
+
+    if (verbose) {
+      msg.append("Batch schema & sizes: {\n");
+      for (ColumnSize colSize : batchSizer.columns().values()) {
+        msg.append(BATCH_STATS_PREFIX);
+        msg.append("\t");
+        msg.append(statsId);
+        msg.append('\t');
+        msg.append(colSize.toString());
+        msg.append(" }\n");
+      }
+      msg.append(" }\n");
+    }
+
+    return msg.toString();
+  }
+
+  /**
+   * Logs record batch statistics for the input record batch (logging happens only
+   * when record statistics logging is enabled).
+   *
+   * @param statsId instance identifier
+   * @param sourceId optional source identifier for scanners
+   * @param recordBatch a set of records
+   * @param batchStatsLogging logging context which controls whether stats are logged, at which level, and how verbosely
+   * @param logger Logger where to print the record batch statistics
+   */
+  public static void logRecordBatchStats(String statsId,
+    String sourceId,
+    RecordBatch recordBatch,
+    RecordBatchStatsContext batchStatsLogging,
+    org.slf4j.Logger logger) {
+
+    if (!batchStatsLogging.isEnableBatchSzLogging()) {
+      return; // NOOP
+    }
+
+    final boolean verbose = batchStatsLogging.isEnableFgBatchSzLogging();
+    final String msg = printRecordBatchStats(statsId, sourceId, recordBatch, verbose);
+
+    if (batchStatsLogging.useInfoLevelLogging()) {
+      logger.info(msg);
+    } else {
+      logger.debug(msg);
+    }
+  }
+
+  /**
+   * Prints a materialized field type
+   * @param field materialized field
+   * @param msg string builder where to append the field type
+   */
+  public static void printFieldType(MaterializedField field, StringBuilder msg) {
+    final MajorType type = field.getType();
+
+    msg.append(type.getMinorType().name());
+    msg.append(':');
+    msg.append(type.getMode().name());
+  }
+
+  /**
+   * @param allocator allocator whose statistics are dumped
+   * @return string with allocator statistics
+   */
+  public static String printAllocatorStats(BufferAllocator allocator) {
+    StringBuilder msg = new StringBuilder();
+    msg.append(BATCH_STATS_PREFIX);
+    msg.append(": dumping allocator statistics:\n");
+    msg.append(BATCH_STATS_PREFIX);
+    msg.append(": ");
+    msg.append(allocator.toString());
+
+    return msg.toString();
+  }
+
+  /**
+   * Disabling class object instantiation.
+   */
+  private RecordBatchStats() {
+  }
+
+}
\ No newline at end of file
diff --git a/exec/java-exec/src/main/resources/drill-module.conf b/exec/java-exec/src/main/resources/drill-module.conf
index 99caeab..23d59d3 100644
--- a/exec/java-exec/src/main/resources/drill-module.conf
+++ b/exec/java-exec/src/main/resources/drill-module.conf
@@ -482,6 +482,8 @@ drill.exec.options: {
     exec.storage.enable_new_text_reader: true,
     exec.udf.enable_dynamic_support: true,
     exec.udf.use_dynamic: true,
+    drill.exec.stats.logging.batch_size: false,
+    drill.exec.stats.logging.fine_grained.batch_size: false,
     new_view_default_permissions: 700,
     org.apache.drill.exec.compile.ClassTransformer.scalar_replacement: "try",
     planner.add_producer_consumer: false,
@@ -581,6 +583,10 @@ drill.exec.options: {
     store.parquet.writer.logical_type_for_decimals: "fixed_len_byte_array",
     store.parquet.writer.use_single_fs_block: false,
     store.parquet.flat.reader.bulk: true,
+    store.parquet.flat.batch.num_records: 32767,
+    # Using common operators batch configuration unless the Parquet specific
+    # configuration is used
+    store.parquet.flat.batch.memory_size: 0,
     store.partition.hash_distribute: false,
     store.text.estimated_row_size_bytes: 100.0,
     store.kafka.all_text_mode: false,
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/columnreaders/TestBatchSizingMemoryUtil.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/columnreaders/TestBatchSizingMemoryUtil.java
new file mode 100644
index 0000000..9f4e026
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/columnreaders/TestBatchSizingMemoryUtil.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.parquet.columnreaders;
+
+import java.math.BigDecimal;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.exec.physical.unit.PhysicalOpUnitTestBase;
+import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.store.parquet.columnreaders.batchsizing.BatchSizingMemoryUtil;
+import org.apache.drill.exec.store.parquet.columnreaders.batchsizing.BatchSizingMemoryUtil.ColumnMemoryUsageInfo;
+import org.apache.drill.exec.store.parquet.columnreaders.batchsizing.RecordBatchSizerManager.ColumnMemoryQuota;
+import org.apache.drill.test.rowSet.RowSet;
+import org.apache.drill.test.rowSet.RowSetBuilder;
+import org.apache.drill.test.rowSet.schema.SchemaBuilder;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestBatchSizingMemoryUtil extends PhysicalOpUnitTestBase {
+//  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestBatchSizingMemoryUtil.class);
+
+  // Batch schema
+  private static TupleMetadata schema;
+  private static TupleMetadata nullableSchema;
+
+  // Row set
+  private RowSet.SingleRowSet rowSet;
+
+  // Column memory usage information
+  private final ColumnMemoryUsageInfo[] columnMemoryInfo = new ColumnMemoryUsageInfo[3];
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    schema = new SchemaBuilder()
+      .add("name_vchar", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.REQUIRED)
+      .add("name_vbinary", TypeProtos.MinorType.VARBINARY, TypeProtos.DataMode.REQUIRED)
+      .add("name_vdecimal", TypeProtos.MinorType.VARDECIMAL, TypeProtos.DataMode.REQUIRED)
+      .buildSchema();
+
+    nullableSchema = new SchemaBuilder()
+      .add("name_vchar", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+      .add("name_vbinary", TypeProtos.MinorType.VARBINARY, TypeProtos.DataMode.OPTIONAL)
+      .add("name_vdecimal", TypeProtos.MinorType.VARDECIMAL, TypeProtos.DataMode.OPTIONAL)
+      .buildSchema();
+  }
+
+  @Test
+  public void testCanAddNewData() {
+   try {
+     testCanAddNewData(false);
+
+    } finally {
+      if (rowSet != null) {
+        rowSet.clear();
+      }
+    }
+  }
+
+  @Test
+  public void testCanAddNewNullabalData() {
+   try {
+     testCanAddNewData(true);
+
+    } finally {
+      if (rowSet != null) {
+        rowSet.clear();
+      }
+    }
+  }
+
+  private void testCanAddNewData(boolean isOptional) {
+    final Object[] data = {"0123456789", new byte[10], new BigDecimal(Long.MAX_VALUE) };
+    final int numRows = 1;
+
+    // Load the test data into the associated Value Vectors
+    loadTestData(numRows, isOptional, data);
+
+    for (int columnIdx = 0; columnIdx < 3; columnIdx++) {
+      final ColumnMemoryUsageInfo columnInfo = columnMemoryInfo[columnIdx];
+      final int remainingBitsCapacity = getRemainingBitsCapacity(columnInfo);
+      final int remainingOffsetsCapacity = getRemainingOffsetsCapacity(columnInfo);
+      final int remainingDataCapacity = getRemainingDataCapacity(columnInfo);
+
+      // Test current VV is within quota (since we are not adding new entries)
+      Assert.assertTrue(BatchSizingMemoryUtil.canAddNewData(columnInfo, 0, 0, 0));
+
+      if (isOptional) {
+        // Test adding the maximum bits, offsets, and data
+        Assert.assertTrue(BatchSizingMemoryUtil.canAddNewData(columnInfo, remainingBitsCapacity, remainingOffsetsCapacity, remainingDataCapacity));
+
+        // Test VV overflow: for bits, offsets, data, and then all
+        Assert.assertFalse(BatchSizingMemoryUtil.canAddNewData(columnInfo, remainingBitsCapacity + 1, remainingOffsetsCapacity, remainingDataCapacity));
+        Assert.assertFalse(BatchSizingMemoryUtil.canAddNewData(columnInfo, remainingBitsCapacity, remainingOffsetsCapacity + 1, remainingDataCapacity));
+        Assert.assertFalse(BatchSizingMemoryUtil.canAddNewData(columnInfo, remainingBitsCapacity, remainingOffsetsCapacity, remainingDataCapacity + 1));
+        Assert.assertFalse(BatchSizingMemoryUtil.canAddNewData(columnInfo, remainingBitsCapacity + 1, remainingOffsetsCapacity + 1, remainingDataCapacity + 1));
+      } else {
+        // Test add the maximum offsets and data
+        Assert.assertTrue(BatchSizingMemoryUtil.canAddNewData(columnInfo, 0, remainingOffsetsCapacity, remainingDataCapacity));
+
+        // Test VV overflow: for offsets, data, and then both
+        Assert.assertFalse(BatchSizingMemoryUtil.canAddNewData(columnInfo, 0, remainingOffsetsCapacity + 1, remainingDataCapacity));
+        Assert.assertFalse(BatchSizingMemoryUtil.canAddNewData(columnInfo, 0, remainingOffsetsCapacity, remainingDataCapacity + 1));
+        Assert.assertFalse(BatchSizingMemoryUtil.canAddNewData(columnInfo, 0, remainingOffsetsCapacity + 1, remainingDataCapacity + 1));
+      }
+    }
+  }
+
+  private void loadTestData(int numRows, boolean isOptional, Object...data) {
+    // First, let's create a row set
+    rowSet = null;
+    final TupleMetadata targetSchema = isOptional ? nullableSchema : schema;
+    final RowSetBuilder builder = operatorFixture.rowSetBuilder(targetSchema);
+
+    for (int rowIdx = 0; rowIdx < numRows; ++rowIdx) {
+      builder.addRow(data);
+    }
+    rowSet = builder.build();
+
+    // Now load the column memory information
+    for (int columnIdx = 0; columnIdx < columnMemoryInfo.length; columnIdx++) {
+      columnMemoryInfo[columnIdx] = getColumnMemoryUsageInfo(columnIdx);
+    }
+  }
+
+  private ColumnMemoryUsageInfo getColumnMemoryUsageInfo(int columnIdx) {
+    final VectorContainer vectorContainer = rowSet.container();
+    final ColumnMemoryUsageInfo result = new ColumnMemoryUsageInfo();
+
+    result.vector = vectorContainer.getValueVector(columnIdx).getValueVector();
+    result.currValueCount = vectorContainer.getRecordCount();
+    result.memoryQuota = new ColumnMemoryQuota(result.vector.getAllocatedSize());
+
+    // Load the VV memory usage information
+    BatchSizingMemoryUtil.getMemoryUsage(result.vector, result.currValueCount, result.vectorMemoryUsage);
+
+    return result;
+  }
+
+  private static int getRemainingBitsCapacity(ColumnMemoryUsageInfo columnInfo) {
+    return columnInfo.vectorMemoryUsage.bitsBytesCapacity - columnInfo.vectorMemoryUsage.bitsBytesUsed;
+  }
+
+  private static int getRemainingOffsetsCapacity(ColumnMemoryUsageInfo columnInfo) {
+    return columnInfo.vectorMemoryUsage.offsetsByteCapacity - columnInfo.vectorMemoryUsage.offsetsBytesUsed;
+  }
+
+  private static int getRemainingDataCapacity(ColumnMemoryUsageInfo columnInfo) {
+    return columnInfo.vectorMemoryUsage.dataByteCapacity - columnInfo.vectorMemoryUsage.dataBytesUsed;
+  }
+
+}
diff --git a/exec/vector/src/main/codegen/templates/NullableValueVectors.java b/exec/vector/src/main/codegen/templates/NullableValueVectors.java
index cdff556..f30cfae 100644
--- a/exec/vector/src/main/codegen/templates/NullableValueVectors.java
+++ b/exec/vector/src/main/codegen/templates/NullableValueVectors.java
@@ -395,6 +395,13 @@ public final class ${className} extends BaseDataValueVector implements <#if type
     return v;
   }
 
+  /**
+   * @return Underlying "bits" vector value capacity
+   */
+  public int getBitsValueCapacity() {
+    return bits.getValueCapacity();
+  }
+
   public void copyFrom(int fromIndex, int thisIndex, Nullable${minor.class}Vector from){
     final Accessor fromAccessor = from.getAccessor();
     if (!fromAccessor.isNull(fromIndex)) {
diff --git a/exec/vector/src/main/codegen/templates/VariableLengthVectors.java b/exec/vector/src/main/codegen/templates/VariableLengthVectors.java
index daef7ba..c35728e 100644
--- a/exec/vector/src/main/codegen/templates/VariableLengthVectors.java
+++ b/exec/vector/src/main/codegen/templates/VariableLengthVectors.java
@@ -632,6 +632,10 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V
       // Let's process the input
       while (input.hasNext()) {
         T entry = input.next();
+
+        if (entry == null || entry.getNumValues() == 0) {
+          break; // this could happen when handling columnar batch sizing constraints
+        }
         bufferedMutator.setSafe(entry);
 
         if (callback != null) {
@@ -897,7 +901,7 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V
       this.parent = parent;
       this.dataBuffOff = this.parent.offsetVector.getAccessor().get(startIdx);
       this.totalDataLen = this.dataBuffOff;
-      this.offsetsMutator = new UInt4Vector.BufferedMutator(startIdx, buffSz * 4, parent.offsetVector);
+      this.offsetsMutator = new UInt4Vector.BufferedMutator(startIdx, buffSz, parent.offsetVector);
 
       // Forcing the offsetsMutator to operate at index+1
       this.offsetsMutator.setSafe(this.dataBuffOff);
@@ -970,6 +974,7 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V
 
       // Update counters
       dataBuffOff += buffer.position();
+      assert dataBuffOff == totalDataLen;
 
       // Reset the byte buffer
       buffer.clear();
@@ -1008,6 +1013,11 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V
         remaining -= toCopy;
 
       } while (remaining > 0);
+
+      // We need to flush as offset data can be accessed during loading to
+      // figure out current payload size.
+      offsetsMutator.flush();
+
     }
   }
 }