Posted to commits@drill.apache.org by ti...@apache.org on 2019/03/13 19:17:08 UTC

[drill] branch master updated: DRILL-7100: Fixed IllegalArgumentException when reading Parquet data

This is an automated email from the ASF dual-hosted git repository.

timothyfarkas pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git


The following commit(s) were added to refs/heads/master by this push:
     new b20a2e6  DRILL-7100: Fixed IllegalArgumentException when reading Parquet data
b20a2e6 is described below

commit b20a2e6b5ee82814011a031d8a8282b0fec3ffe1
Author: Salim Achouche <sa...@gmail.com>
AuthorDate: Tue Mar 12 18:06:43 2019 -0700

    DRILL-7100: Fixed IllegalArgumentException when reading Parquet data
---
 .../batchsizing/BatchOverflowOptimizer.java        |  4 +-
 .../batchsizing/BatchSizingMemoryUtil.java         | 46 ++++++++++----------
 .../batchsizing/RecordBatchSizerManager.java       | 49 ++++++++++++----------
 .../columnreaders/TestBatchSizingMemoryUtil.java   | 12 +++---
 .../apache/drill/exec/memory/BaseAllocator.java    | 16 +++++++
 5 files changed, 73 insertions(+), 54 deletions(-)
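
For context on the failure mode: each of the fields widened below previously used int arithmetic, so intermediate products such as average-precision * value-count could exceed Integer.MAX_VALUE, wrap negative, and later surface as an IllegalArgumentException in the allocator. A minimal sketch of that overflow; the column width and value count are hypothetical, chosen only to cross the 2GB line:

    // Hypothetical sizes: a 40,000-byte average precision over 64K values.
    int averagePrecision = 40_000;
    int valueCount = 65_536;

    int overflowed = averagePrecision * valueCount;        // wraps to a negative int
    long correct   = (long) averagePrecision * valueCount; // widened before multiplying

    System.out.println(overflowed); // -1673527296
    System.out.println(correct);    // 2621440000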

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/batchsizing/BatchOverflowOptimizer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/batchsizing/BatchOverflowOptimizer.java
index ca8fc05..3a7177f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/batchsizing/BatchOverflowOptimizer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/batchsizing/BatchOverflowOptimizer.java
@@ -91,7 +91,7 @@ final class BatchOverflowOptimizer {
       // do not account for null values as we are interested in the
       // actual data that is being stored within a batch.
       BatchSizingMemoryUtil.getMemoryUsage(stat.vector, stat.numValuesRead, vectorMemoryUsage);
-      final int batchColumnPrecision = Math.max(1, vectorMemoryUsage.dataBytesUsed / stat.numValuesRead);
+      final long batchColumnPrecision = Math.max(1, vectorMemoryUsage.dataBytesUsed / stat.numValuesRead);
 
       double currAvgPrecision = columnPrecisionStats.avgPrecision;
       double newAvgPrecision  = ((numBatches - 1) * currAvgPrecision + batchColumnPrecision) / numBatches;
@@ -138,7 +138,7 @@ final class BatchOverflowOptimizer {
     /** Materialized field */
     private final MaterializedField field;
     /** Average column precision */
-    private int avgPrecision;
+    private long avgPrecision;
 
     private ColumnPrecisionStats(MaterializedField field) {
       this.field = field;
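
The first hunk above also shows the running-average update the optimizer applies to column precision. A standalone sketch of that incremental mean, with variable names mirroring the diff and hypothetical sample values:

    // Incremental mean: fold the latest batch's precision into the running
    // average without keeping a history of per-batch values.
    double currAvgPrecision = 12.0;  // hypothetical running average after 3 batches
    int numBatches = 4;              // the batch being folded in
    long batchColumnPrecision = 20;  // hypothetical precision observed in batch 4

    double newAvgPrecision =
        ((numBatches - 1) * currAvgPrecision + batchColumnPrecision) / numBatches;
    // (3 * 12.0 + 20) / 4 = 14.0
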
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/batchsizing/BatchSizingMemoryUtil.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/batchsizing/BatchSizingMemoryUtil.java
index 302d0eb..8782568 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/batchsizing/BatchSizingMemoryUtil.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/batchsizing/BatchSizingMemoryUtil.java
@@ -59,9 +59,9 @@ public final class BatchSizingMemoryUtil {
    *         limit; false otherwise
    */
   public static boolean canAddNewData(ColumnMemoryUsageInfo columnMemoryUsage,
-    int newBitsMemory,
-    int newOffsetsMemory,
-    int newDataMemory) {
+    long newBitsMemory,
+    long newOffsetsMemory,
+    long newDataMemory) {
 
     // First we need to update the vector memory usage
     final VectorMemoryUsageInfo vectorMemoryUsage = columnMemoryUsage.vectorMemoryUsage;
@@ -69,20 +69,20 @@ public final class BatchSizingMemoryUtil {
 
     // We need to compute the new ValueVector memory usage if we attempt to add the new payload
     // parameters: usedCapacity, newPayload, currentCapacity
-    int totalBitsMemory = computeNewVectorCapacity(vectorMemoryUsage.bitsBytesUsed,
+    long totalBitsMemory = computeNewVectorCapacity(vectorMemoryUsage.bitsBytesUsed,
       newBitsMemory,
       vectorMemoryUsage.bitsBytesCapacity);
 
-    int totalOffsetsMemory = computeNewVectorCapacity(vectorMemoryUsage.offsetsBytesUsed,
+    long totalOffsetsMemory = computeNewVectorCapacity(vectorMemoryUsage.offsetsBytesUsed,
       newOffsetsMemory,
       vectorMemoryUsage.offsetsByteCapacity);
 
-    int totalDataMemory = computeNewVectorCapacity(vectorMemoryUsage.dataBytesUsed,
+    long totalDataMemory = computeNewVectorCapacity(vectorMemoryUsage.dataBytesUsed,
       newDataMemory,
       vectorMemoryUsage.dataByteCapacity);
 
     // Alright now we can figure out whether the new payload will take us over the maximum memory threshold
-    int totalMemory = totalBitsMemory + totalOffsetsMemory + totalDataMemory;
+    long totalMemory = totalBitsMemory + totalOffsetsMemory + totalDataMemory;
     assert totalMemory >= 0;
 
     return totalMemory <= columnMemoryUsage.memoryQuota.getMaxMemoryUsage();
@@ -227,16 +227,16 @@ public final class BatchSizingMemoryUtil {
    * @param valueCount number of column values
    * @return memory size required to store "valueCount" within a value vector
    */
-  public static int computeFixedLengthVectorMemory(ParquetColumnMetadata column, int valueCount) {
+  public static long computeFixedLengthVectorMemory(ParquetColumnMetadata column, int valueCount) {
     assert column.isFixedLength();
 
     // Formula:  memory-usage = next-power-of-two(byte-size * valueCount)  // nullable storage (if any)
     //         + next-power-of-two(DT_LEN * valueCount)                    // data storage
 
-    int memoryUsage = BaseAllocator.nextPowerOfTwo(getFixedColumnTypePrecision(column) * valueCount);
+    long memoryUsage = BaseAllocator.longNextPowerOfTwo(getFixedColumnTypePrecision(column) * valueCount);
 
     if (column.getField().isNullable()) {
-      memoryUsage += BaseAllocator.nextPowerOfTwo(BYTE_VALUE_WIDTH * valueCount);
+      memoryUsage += BaseAllocator.longNextPowerOfTwo(BYTE_VALUE_WIDTH * valueCount);
     }
 
     return memoryUsage;
@@ -248,19 +248,19 @@ public final class BatchSizingMemoryUtil {
    * @param valueCount number of column values
    * @return memory size required to store "valueCount" within a value vector
    */
-  public static int computeVariableLengthVectorMemory(ParquetColumnMetadata column,
-    int averagePrecision, int valueCount) {
+  public static long computeVariableLengthVectorMemory(ParquetColumnMetadata column,
+    long averagePrecision, int valueCount) {
 
     assert !column.isFixedLength();
 
     // Formula:  memory-usage = next-power-of-two(byte-size * valueCount)  // nullable storage (if any)
     //         + next-power-of-two(int-size * valueCount)                  // offsets storage
     //         + next-power-of-two(DT_LEN * valueCount)                    // data storage
-    int memoryUsage = BaseAllocator.nextPowerOfTwo(averagePrecision * valueCount);
-    memoryUsage += BaseAllocator.nextPowerOfTwo(INT_VALUE_WIDTH * (valueCount + 1));
+    long memoryUsage = BaseAllocator.longNextPowerOfTwo(averagePrecision * valueCount);
+    memoryUsage += BaseAllocator.longNextPowerOfTwo(INT_VALUE_WIDTH * (valueCount + 1));
 
     if (column.getField().isNullable()) {
-      memoryUsage += BaseAllocator.nextPowerOfTwo(valueCount);
+      memoryUsage += BaseAllocator.longNextPowerOfTwo(valueCount);
     }
     return memoryUsage;
   }
@@ -269,8 +269,8 @@ public final class BatchSizingMemoryUtil {
 // Internal implementation
 // ----------------------------------------------------------------------------
 
-  private static int computeNewVectorCapacity(int usedCapacity, int newPayload, int currentCapacity) {
-    int newUsedCapacity = BaseAllocator.nextPowerOfTwo(usedCapacity + newPayload);
+  private static long computeNewVectorCapacity(long usedCapacity, long newPayload, long currentCapacity) {
+    long newUsedCapacity = BaseAllocator.longNextPowerOfTwo(usedCapacity + newPayload);
     assert newUsedCapacity >= 0;
 
     return Math.max(currentCapacity, newUsedCapacity);
@@ -299,17 +299,17 @@ public final class BatchSizingMemoryUtil {
    */
   public static final class VectorMemoryUsageInfo {
     /** Bits vector capacity */
-    public int bitsBytesCapacity;
+    public long bitsBytesCapacity;
     /** Offsets vector capacity */
-    public int offsetsByteCapacity;
+    public long offsetsByteCapacity;
     /** Data vector capacity */
-    public int dataByteCapacity;
+    public long dataByteCapacity;
     /** Bits vector used up capacity */
-    public int bitsBytesUsed;
+    public long bitsBytesUsed;
     /** Offsets vector used up capacity */
-    public int offsetsBytesUsed;
+    public long offsetsBytesUsed;
     /** Data vector used up capacity */
-    public int dataBytesUsed;
+    public long dataBytesUsed;
 
     public void reset() {
       bitsBytesCapacity = 0;
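
To make the memory formulas in the comments above concrete, here is a self-contained sketch of the variable-length estimate after this change. The local nextPow2 mirrors the new BaseAllocator.longNextPowerOfTwo so the snippet runs standalone; the 4-byte offset width and the inputs are assumptions:

    // Standalone mirror of the longNextPowerOfTwo helper added by this commit.
    static long nextPow2(long val) {
      long highestBit = Long.highestOneBit(val);
      return (highestBit == val) ? val : highestBit << 1;
    }

    // memory = pow2(avgPrecision * count)     // data storage
    //        + pow2(INT_WIDTH * (count + 1))  // offsets storage
    //        + pow2(count)                    // nullable "bits" storage (if any)
    static long variableLengthMemory(long avgPrecision, int valueCount, boolean nullable) {
      long memory = nextPow2(avgPrecision * valueCount);
      memory += nextPow2(4L * (valueCount + 1)); // assumes INT_VALUE_WIDTH == 4
      if (nullable) {
        memory += nextPow2(valueCount);          // one byte per value
      }
      return memory;
    }

With long arithmetic, a call such as variableLengthMemory(40_000L, 65_536, true) stays positive where the old int math would have wrapped.
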
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/batchsizing/RecordBatchSizerManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/batchsizing/RecordBatchSizerManager.java
index 5ddcf7e..2e22ce3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/batchsizing/RecordBatchSizerManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/batchsizing/RecordBatchSizerManager.java
@@ -20,6 +20,7 @@ package org.apache.drill.exec.store.parquet.columnreaders.batchsizing;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+
 import org.apache.drill.common.map.CaseInsensitiveMap;
 import org.apache.drill.common.types.TypeProtos.MajorType;
 import org.apache.drill.exec.ExecConstants;
@@ -34,6 +35,7 @@ import org.apache.drill.exec.util.record.RecordBatchStats;
 import org.apache.drill.exec.util.record.RecordBatchStats.RecordBatchStatsContext;
 import org.apache.drill.exec.vector.AllocationHelper;
 import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
 
 /**
  * This class is tasked with managing all aspects of flat Parquet reader record batch sizing logic.
@@ -56,11 +58,11 @@ public final class RecordBatchSizerManager {
   /** Configured Parquet records per batch */
   private final int configRecordsPerBatch;
   /** Configured Parquet memory size per batch */
-  private final int configMemorySizePerBatch;
+  private final long configMemorySizePerBatch;
   /** An upper bound on the Parquet records per batch based on the configured value and schema */
   private int maxRecordsPerBatch;
   /** An upper bound on the Parquet memory size per batch based on the configured value and schema  */
-  private int maxMemorySizePerBatch;
+  private long maxMemorySizePerBatch;
   /** The current number of records per batch as it can be dynamically optimized */
   private int recordsPerBatch;
 
@@ -162,7 +164,8 @@ public final class RecordBatchSizerManager {
         ColumnMemoryInfo columnMemoryInfo = columnMemoryInfoMap.get(v.getField().getName());
 
         if (columnMemoryInfo != null) {
-          AllocationHelper.allocate(v, recordsPerBatch, columnMemoryInfo.columnPrecision, 0);
+          Preconditions.checkState(columnMemoryInfo.columnPrecision <= Integer.MAX_VALUE, "Column precision cannot exceed 2GB");
+          AllocationHelper.allocate(v, recordsPerBatch, (int) columnMemoryInfo.columnPrecision, 0);
         } else {
           // This column was found in another Parquet file but not the current one; so we inject
           // a null value. At this time, we do not account for such columns. Why? the right design is
@@ -219,7 +222,7 @@ public final class RecordBatchSizerManager {
   /**
    * @return current total memory per batch (may change across batches)
    */
-  public int getCurrentMemorySizePerBatch() {
+  public long getCurrentMemorySizePerBatch() {
     return maxMemorySizePerBatch; // Current logic doesn't mutate the max-memory after it has been set
   }
 
@@ -233,7 +236,7 @@ public final class RecordBatchSizerManager {
   /**
    * @return configured memory size per batch (may be different from the enforced one)
    */
-  public int getConfigMemorySizePerBatch() {
+  public long getConfigMemorySizePerBatch() {
     return configMemorySizePerBatch;
   }
 
@@ -265,13 +268,13 @@ public final class RecordBatchSizerManager {
 // Internal implementation logic
 // ----------------------------------------------------------------------------
 
-  private int getConfiguredMaxBatchMemory(OptionManager options) {
+  private long getConfiguredMaxBatchMemory(OptionManager options) {
     // Use the parquet specific configuration if set
-    int maxMemory = (int) options.getLong(ExecConstants.PARQUET_FLAT_BATCH_MEMORY_SIZE);
+    long maxMemory = options.getLong(ExecConstants.PARQUET_FLAT_BATCH_MEMORY_SIZE);
 
     // Otherwise, use the common property
     if (maxMemory <= 0) {
-      maxMemory = (int) options.getLong(ExecConstants.OUTPUT_BATCH_SIZE);
+      maxMemory = options.getLong(ExecConstants.OUTPUT_BATCH_SIZE);
     }
     return maxMemory;
   }
@@ -304,8 +307,8 @@ public final class RecordBatchSizerManager {
     return normalizedNumRecords;
   }
 
-  private int normalizeMemorySizePerBatch() {
-    int normalizedMemorySize = configMemorySizePerBatch;
+  private long normalizeMemorySizePerBatch() {
+    long normalizedMemorySize = configMemorySizePerBatch;
 
     if (normalizedMemorySize <= 0) {
       final String message = String.format("Invalid Parquet memory per batch [%d] byte(s)",
@@ -321,10 +324,10 @@ public final class RecordBatchSizerManager {
       return normalizedMemorySize; // NOOP
     }
 
-    final int memorySizePerColumn = normalizedMemorySize / numColumns;
+    final long memorySizePerColumn = normalizedMemorySize / numColumns;
 
     if (memorySizePerColumn < MIN_COLUMN_MEMORY_SZ) {
-      final int prevValue   = normalizedMemorySize;
+      final long prevValue   = normalizedMemorySize;
       normalizedMemorySize  = MIN_COLUMN_MEMORY_SZ * numColumns;
 
      final String message = String.format("The Parquet memory per batch [%d] byte(s) is too low for this query; using [%d] bytes",
@@ -444,9 +447,9 @@ public final class RecordBatchSizerManager {
       return; // we're done
     }
 
-    final int totalMemoryNeeded = requiredMemory.fixedLenRequiredMemory + requiredMemory.variableLenRequiredMemory;
-    final int extraMemorySpace = maxMemorySizePerBatch - totalMemoryNeeded;
-    final int perColumnExtraSpace = extraMemorySpace / numVariableLengthColumns;
+    final long totalMemoryNeeded = requiredMemory.fixedLenRequiredMemory + requiredMemory.variableLenRequiredMemory;
+    final long extraMemorySpace = maxMemorySizePerBatch - totalMemoryNeeded;
+    final long perColumnExtraSpace = extraMemorySpace / numVariableLengthColumns;
 
     if (perColumnExtraSpace == 0) {
       return;
@@ -481,7 +484,7 @@ public final class RecordBatchSizerManager {
     return remove;
   }
 
-  private int computeVectorMemory(ColumnMemoryInfo columnInfo, int numValues) {
+  private long computeVectorMemory(ColumnMemoryInfo columnInfo, int numValues) {
     if (columnInfo.columnMeta.isFixedLength()) {
       return BatchSizingMemoryUtil.computeFixedLengthVectorMemory(columnInfo.columnMeta, numValues);
     }
@@ -506,7 +509,7 @@ public final class RecordBatchSizerManager {
       requiredMemory.variableLenRequiredMemory   += columnInfo.columnMemoryQuota.maxMemoryUsage;
     }
 
-    final int totalMemoryNeeded = requiredMemory.fixedLenRequiredMemory + requiredMemory.variableLenRequiredMemory;
+    final long totalMemoryNeeded = requiredMemory.fixedLenRequiredMemory + requiredMemory.variableLenRequiredMemory;
     assert totalMemoryNeeded > 0;
 
     double neededMemoryRatio = ((double) maxMemorySizePerBatch) / totalMemoryNeeded;
@@ -612,7 +615,7 @@ public final class RecordBatchSizerManager {
   /** Field memory quota */
   public static final class ColumnMemoryQuota {
     /** Maximum cumulative memory that could be used */
-    private int maxMemoryUsage;
+    private long maxMemoryUsage;
     /** Maximum number of values that could be inserted */
     private int maxNumValues;
 
@@ -622,14 +625,14 @@ public final class RecordBatchSizerManager {
     /**
      * @param maxMemoryUsage maximum cumulative memory that could be used
      */
-    public ColumnMemoryQuota(int maxMemoryUsage) {
+    public ColumnMemoryQuota(long maxMemoryUsage) {
       this.maxMemoryUsage = maxMemoryUsage;
     }
 
     /**
      * @return the maxMemoryUsage
      */
-    public int getMaxMemoryUsage() {
+    public long getMaxMemoryUsage() {
       return maxMemoryUsage;
     }
 
@@ -651,7 +654,7 @@ public final class RecordBatchSizerManager {
     /** Column metadata */
     ParquetColumnMetadata columnMeta;
     /** Column value precision (maximum length for VL columns) */
-    int columnPrecision;
+    long columnPrecision;
     /** Column current memory quota within a batch */
     final ColumnMemoryQuota columnMemoryQuota = new ColumnMemoryQuota();
   }
@@ -659,9 +662,9 @@ public final class RecordBatchSizerManager {
   /** Memory requirements container */
   static final class MemoryRequirementContainer {
     /** Memory needed for the fixed length columns given a specific record size */
-    private int fixedLenRequiredMemory;
+    private long fixedLenRequiredMemory;
     /** Memory needed for the variable length columns given a specific record size */
-    private int variableLenRequiredMemory;
+    private long variableLenRequiredMemory;
 
     private void reset() {
       this.fixedLenRequiredMemory    = 0;
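
One idiom worth noting in this file: quotas are now tracked as long, but AllocationHelper.allocate() still takes an int precision, so the commit guards the narrowing cast with a Preconditions check before casting. A minimal sketch of that guard-then-cast pattern, using plain Java in place of the shaded Guava import (the helper name is hypothetical):

    // Guard a long -> int narrowing before handing the value to an int-only API.
    static int toIntPrecision(long columnPrecision) {
      if (columnPrecision > Integer.MAX_VALUE) {
        throw new IllegalStateException("Column precision cannot exceed 2GB");
      }
      return (int) columnPrecision;
    }
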
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/columnreaders/TestBatchSizingMemoryUtil.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/columnreaders/TestBatchSizingMemoryUtil.java
index 896675a..4cf1503 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/columnreaders/TestBatchSizingMemoryUtil.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/columnreaders/TestBatchSizingMemoryUtil.java
@@ -93,9 +93,9 @@ public class TestBatchSizingMemoryUtil extends PhysicalOpUnitTestBase {
 
     for (int columnIdx = 0; columnIdx < 3; columnIdx++) {
       final ColumnMemoryUsageInfo columnInfo = columnMemoryInfo[columnIdx];
-      final int remainingBitsCapacity = getRemainingBitsCapacity(columnInfo);
-      final int remainingOffsetsCapacity = getRemainingOffsetsCapacity(columnInfo);
-      final int remainingDataCapacity = getRemainingDataCapacity(columnInfo);
+      final long remainingBitsCapacity = getRemainingBitsCapacity(columnInfo);
+      final long remainingOffsetsCapacity = getRemainingOffsetsCapacity(columnInfo);
+      final long remainingDataCapacity = getRemainingDataCapacity(columnInfo);
 
       // Test current VV is within quota (since we are not adding new entries)
       Assert.assertTrue(BatchSizingMemoryUtil.canAddNewData(columnInfo, 0, 0, 0));
@@ -152,15 +152,15 @@ public class TestBatchSizingMemoryUtil extends PhysicalOpUnitTestBase {
     return result;
   }
 
-  private static int getRemainingBitsCapacity(ColumnMemoryUsageInfo columnInfo) {
+  private static long getRemainingBitsCapacity(ColumnMemoryUsageInfo columnInfo) {
     return columnInfo.vectorMemoryUsage.bitsBytesCapacity - columnInfo.vectorMemoryUsage.bitsBytesUsed;
   }
 
-  private static int getRemainingOffsetsCapacity(ColumnMemoryUsageInfo columnInfo) {
+  private static long getRemainingOffsetsCapacity(ColumnMemoryUsageInfo columnInfo) {
     return columnInfo.vectorMemoryUsage.offsetsByteCapacity - columnInfo.vectorMemoryUsage.offsetsBytesUsed;
   }
 
-  private static int getRemainingDataCapacity(ColumnMemoryUsageInfo columnInfo) {
+  private static long getRemainingDataCapacity(ColumnMemoryUsageInfo columnInfo) {
     return columnInfo.vectorMemoryUsage.dataByteCapacity - columnInfo.vectorMemoryUsage.dataBytesUsed;
   }
 
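The test updates follow the same widening: remaining capacity is capacity minus used, now computed in long so the subtraction cannot wrap. A hedged illustration with hypothetical buffer sizes:

    long dataByteCapacity = 4L * 1024 * 1024 * 1024;  // hypothetical 4 GiB capacity
    long dataBytesUsed = 3L * 1024 * 1024 * 1024;     // hypothetical 3 GiB in use
    long remaining = dataByteCapacity - dataBytesUsed; // 1 GiB; both operands exceed int range
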
diff --git a/exec/memory/base/src/main/java/org/apache/drill/exec/memory/BaseAllocator.java b/exec/memory/base/src/main/java/org/apache/drill/exec/memory/BaseAllocator.java
index ab3ff1e..408b865 100644
--- a/exec/memory/base/src/main/java/org/apache/drill/exec/memory/BaseAllocator.java
+++ b/exec/memory/base/src/main/java/org/apache/drill/exec/memory/BaseAllocator.java
@@ -577,6 +577,22 @@ public abstract class BaseAllocator extends Accountant implements BufferAllocato
   }
 
   /**
+   * Rounds up the provided value to the next power of two.
+   *
+   * @param val
+   *          a long value
+   * @return the smallest power of two greater than or equal to the input value
+   */
+  public static long longNextPowerOfTwo(long val) {
+    long highestBit = Long.highestOneBit(val);
+    if (highestBit == val) {
+      return val;
+    } else {
+      return highestBit << 1;
+    }
+  }
+
+  /**
    * Verifies the accounting state of the allocator. Only works for DEBUG.
    *
    * @throws IllegalStateException
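
A brief usage note on the new helper: Long.highestOneBit() isolates the top set bit, so a value that is already a power of two is returned unchanged, and anything else is rounded up by one extra left shift. Sample calls, with results worked out by hand:

    BaseAllocator.longNextPowerOfTwo(1L);             // 1  (already a power of two)
    BaseAllocator.longNextPowerOfTwo(4096L);          // 4096
    BaseAllocator.longNextPowerOfTwo(4097L);          // 8192
    BaseAllocator.longNextPowerOfTwo(3_000_000_000L); // 4294967296, beyond int range -- the motivating case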