Posted to commits@iceberg.apache.org by ao...@apache.org on 2020/11/08 03:34:15 UTC

[iceberg] branch 0.10.x created (now fe96812)

This is an automated email from the ASF dual-hosted git repository.

aokolnychyi pushed a change to branch 0.10.x
in repository https://gitbox.apache.org/repos/asf/iceberg.git.


      at fe96812  Parquet: Fix vectorized reads for negative decimals (#1736)

This branch includes the following new commits:

     new de0d9cc  Core: Fix NPE in SnapshotManager.validateCurrentSnapshot (#1725)
     new 09d5a4a  Core: Fix NullPointerException in ManifestReader (#1730)
     new 5a822f7  API: Fix Expressions.notNull operation (#1722)
     new fe96812  Parquet: Fix vectorized reads for negative decimals (#1736)

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[iceberg] 01/04: Core: Fix NPE in SnapshotManager.validateCurrentSnapshot (#1725)

Posted by ao...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

aokolnychyi pushed a commit to branch 0.10.x
in repository https://gitbox.apache.org/repos/asf/iceberg.git

commit de0d9cca2e5c9da6d13263ec465a9853c6b6294f
Author: Jungtaek Lim <ka...@gmail.com>
AuthorDate: Fri Nov 6 10:02:10 2020 +0900

    Core: Fix NPE in SnapshotManager.validateCurrentSnapshot (#1725)
---
 core/src/main/java/org/apache/iceberg/SnapshotManager.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/core/src/main/java/org/apache/iceberg/SnapshotManager.java b/core/src/main/java/org/apache/iceberg/SnapshotManager.java
index 93ae46f..a663e06 100644
--- a/core/src/main/java/org/apache/iceberg/SnapshotManager.java
+++ b/core/src/main/java/org/apache/iceberg/SnapshotManager.java
@@ -235,7 +235,7 @@ public class SnapshotManager extends MergingSnapshotProducer<ManageSnapshots> im
   }
 
   private static void validateCurrentSnapshot(TableMetadata meta, Long requiredSnapshotId) {
-    if (requiredSnapshotId != null) {
+    if (requiredSnapshotId != null && meta.currentSnapshot() != null) {
       ValidationException.check(meta.currentSnapshot().snapshotId() == requiredSnapshotId,
           "Cannot fast-forward to non-append snapshot; current has changed: current=%s != required=%s",
           meta.currentSnapshot().snapshotId(), requiredSnapshotId);

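For context, TableMetadata.currentSnapshot() returns null for a table that has never committed a snapshot, so the unguarded check dereferenced null. A minimal sketch of the guarded logic as a standalone helper (hypothetical name; the empty-table scenario is an assumption, not taken from the commit):

    import org.apache.iceberg.Snapshot;

    // With the added null check, an empty table (currentSnapshot() == null)
    // skips validation instead of throwing a NullPointerException.
    static boolean currentSnapshotMatches(Snapshot current, Long requiredSnapshotId) {
      if (requiredSnapshotId == null || current == null) {
        return true;  // nothing to validate against
      }
      return current.snapshotId() == requiredSnapshotId;
    }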

[iceberg] 03/04: API: Fix Expressions.notNull operation (#1722)

Posted by ao...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

aokolnychyi pushed a commit to branch 0.10.x
in repository https://gitbox.apache.org/repos/asf/iceberg.git

commit 5a822f76e4bd3c4b3384e2b0976f2c06f7906c51
Author: yyanyy <71...@users.noreply.github.com>
AuthorDate: Fri Nov 6 12:12:45 2020 -0800

    API: Fix Expressions.notNull operation (#1722)
---
 api/src/main/java/org/apache/iceberg/expressions/Expressions.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/api/src/main/java/org/apache/iceberg/expressions/Expressions.java b/api/src/main/java/org/apache/iceberg/expressions/Expressions.java
index bd2639a..8089b15 100644
--- a/api/src/main/java/org/apache/iceberg/expressions/Expressions.java
+++ b/api/src/main/java/org/apache/iceberg/expressions/Expressions.java
@@ -120,7 +120,7 @@ public class Expressions {
   }
 
   public static <T> UnboundPredicate<T> notNull(UnboundTerm<T> expr) {
-    return new UnboundPredicate<>(Expression.Operation.IS_NULL, expr);
+    return new UnboundPredicate<>(Expression.Operation.NOT_NULL, expr);
   }
 
   public static <T> UnboundPredicate<T> lessThan(String name, T value) {

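A hedged usage sketch (the table variable is assumed, not from the patch): before this fix, notNull built its predicate with Operation.IS_NULL, so a scan filtered with it kept rows where the column is null instead of rows where it is set.

    import org.apache.iceberg.expressions.Expressions;

    // Plan a scan over rows whose "data" column is set. Prior to this commit,
    // the predicate carried IS_NULL and the scan matched null rows instead.
    table.newScan()
        .filter(Expressions.notNull("data"))
        .planFiles();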

[iceberg] 04/04: Parquet: Fix vectorized reads for negative decimals (#1736)

Posted by ao...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

aokolnychyi pushed a commit to branch 0.10.x
in repository https://gitbox.apache.org/repos/asf/iceberg.git

commit fe968122352f90c921c577e76599c36a693dc4fc
Author: Samarth Jain <sa...@apache.org>
AuthorDate: Fri Nov 6 13:21:30 2020 -0800

    Parquet: Fix vectorized reads for negative decimals (#1736)
---
 .../java/org/apache/iceberg/util/RandomUtil.java   |  24 ++--
 .../arrow/vectorized/VectorizedArrowReader.java    |  14 ++-
 .../parquet/VectorizedColumnIterator.java          |  22 +++-
 ...orizedDictionaryEncodedParquetValuesReader.java |  51 +++++++--
 .../vectorized/parquet/VectorizedPageIterator.java |  39 ++++++-
 .../VectorizedParquetDefinitionLevelReader.java    | 125 +++++++++++++++++----
 6 files changed, 218 insertions(+), 57 deletions(-)

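The root cause, summarized: the readers zero-padded decimal values into a full 16-byte buffer, which leaves the high-order bytes 0x00 instead of the sign-extended 0xFF that a negative value needs, so negative decimals read back as large positive numbers. The patch instead passes only the typeWidth bytes and relies on sign extension of the shorter array. A standalone sketch of the sign loss (plain BigInteger, no Arrow needed; values are illustrative):

    import java.math.BigInteger;

    byte[] src = {(byte) 0xFF, (byte) 0xFF, (byte) 0xFF, (byte) 0xFF};  // -1 as big-endian INT32

    // Zero-padding to 16 bytes drops the sign: the leading byte is now 0x00.
    byte[] padded = new byte[16];
    System.arraycopy(src, 0, padded, 16 - src.length, src.length);

    new BigInteger(src);     // -1          (sign preserved)
    new BigInteger(padded);  // 4294967295  (sign lost)
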
diff --git a/api/src/test/java/org/apache/iceberg/util/RandomUtil.java b/api/src/test/java/org/apache/iceberg/util/RandomUtil.java
index a9254cf..f01db61 100644
--- a/api/src/test/java/org/apache/iceberg/util/RandomUtil.java
+++ b/api/src/test/java/org/apache/iceberg/util/RandomUtil.java
@@ -31,6 +31,10 @@ public class RandomUtil {
   private RandomUtil() {
   }
 
+  private static boolean negate(int num) {
+    return num % 2 == 1;
+  }
+
   @SuppressWarnings("RandomModInteger")
   public static Object generatePrimitive(Type.PrimitiveType primitive,
                                          Random random) {
@@ -49,7 +53,7 @@ public class RandomUtil {
           case 3:
             return 0;
           default:
-            return random.nextInt();
+            return negate(choice) ? -1 * random.nextInt() : random.nextInt();
         }
 
       case LONG:
@@ -61,7 +65,7 @@ public class RandomUtil {
           case 3:
             return 0L;
           default:
-            return random.nextLong();
+            return negate(choice) ? -1L * random.nextLong() : random.nextLong();
         }
 
       case FLOAT:
@@ -83,7 +87,7 @@ public class RandomUtil {
           case 8:
             return Float.NaN;
           default:
-            return random.nextFloat();
+            return negate(choice) ? -1.0F * random.nextFloat() : random.nextFloat();
         }
 
       case DOUBLE:
@@ -105,7 +109,7 @@ public class RandomUtil {
           case 8:
             return Double.NaN;
           default:
-            return random.nextDouble();
+            return negate(choice) ? -1.0D * random.nextDouble() : random.nextDouble();
         }
 
       case DATE:
@@ -140,7 +144,8 @@ public class RandomUtil {
       case DECIMAL:
         Types.DecimalType type = (Types.DecimalType) primitive;
         BigInteger unscaled = randomUnscaled(type.precision(), random);
-        return new BigDecimal(unscaled, type.scale());
+        BigDecimal bigDecimal = new BigDecimal(unscaled, type.scale());
+        return negate(choice) ? bigDecimal.negate() : bigDecimal;
 
       default:
         throw new IllegalArgumentException(
@@ -155,11 +160,11 @@ public class RandomUtil {
         return true; // doesn't really matter for booleans since they are not dictionary encoded
       case INTEGER:
       case DATE:
-        return value;
+        return negate(value) ? -1 * value : value;
       case FLOAT:
-        return (float) value;
+        return negate(value) ? -1.0F * (float) value : (float) value;
       case DOUBLE:
-        return (double) value;
+        return negate(value) ? -1.0D * (double) value : (double) value;
       case LONG:
       case TIME:
       case TIMESTAMP:
@@ -177,7 +182,8 @@ public class RandomUtil {
       case DECIMAL:
         Types.DecimalType type = (Types.DecimalType) primitive;
         BigInteger unscaled = new BigInteger(String.valueOf(value + 1));
-        return new BigDecimal(unscaled, type.scale());
+        BigDecimal bd = new BigDecimal(unscaled, type.scale());
+        return negate(value) ? bd.negate() : bd;
       default:
         throw new IllegalArgumentException(
             "Cannot generate random value for unknown type: " + primitive);
diff --git a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorizedArrowReader.java b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorizedArrowReader.java
index bb571e0..4eb8091 100644
--- a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorizedArrowReader.java
+++ b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorizedArrowReader.java
@@ -92,7 +92,8 @@ public class VectorizedArrowReader implements VectorizedReader<VectorHolder> {
 
   private enum ReadType {
     FIXED_LENGTH_DECIMAL,
-    INT_LONG_BACKED_DECIMAL,
+    INT_BACKED_DECIMAL,
+    LONG_BACKED_DECIMAL,
     VARCHAR,
     VARBINARY,
     FIXED_WIDTH_BINARY,
@@ -130,8 +131,11 @@ public class VectorizedArrowReader implements VectorizedReader<VectorHolder> {
           case FIXED_LENGTH_DECIMAL:
             vectorizedColumnIterator.nextBatchFixedLengthDecimal(vec, typeWidth, nullabilityHolder);
             break;
-          case INT_LONG_BACKED_DECIMAL:
-            vectorizedColumnIterator.nextBatchIntLongBackedDecimal(vec, typeWidth, nullabilityHolder);
+          case INT_BACKED_DECIMAL:
+            vectorizedColumnIterator.nextBatchIntBackedDecimal(vec, nullabilityHolder);
+            break;
+          case LONG_BACKED_DECIMAL:
+            vectorizedColumnIterator.nextBatchLongBackedDecimal(vec, nullabilityHolder);
             break;
           case VARBINARY:
             vectorizedColumnIterator.nextBatchVarWidthType(vec, nullabilityHolder);
@@ -237,11 +241,11 @@ public class VectorizedArrowReader implements VectorizedReader<VectorHolder> {
                 this.typeWidth = primitive.getTypeLength();
                 break;
               case INT64:
-                this.readType = ReadType.INT_LONG_BACKED_DECIMAL;
+                this.readType = ReadType.LONG_BACKED_DECIMAL;
                 this.typeWidth = (int) BigIntVector.TYPE_WIDTH;
                 break;
               case INT32:
-                this.readType = ReadType.INT_LONG_BACKED_DECIMAL;
+                this.readType = ReadType.INT_BACKED_DECIMAL;
                 this.typeWidth = (int) IntVector.TYPE_WIDTH;
                 break;
               default:
diff --git a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedColumnIterator.java b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedColumnIterator.java
index cb9d278..d963c45 100644
--- a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedColumnIterator.java
+++ b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedColumnIterator.java
@@ -129,16 +129,30 @@ public class VectorizedColumnIterator extends BaseColumnIterator {
     }
   }
 
-  public void nextBatchIntLongBackedDecimal(
+  public void nextBatchIntBackedDecimal(
       FieldVector fieldVector,
-      int typeWidth,
       NullabilityHolder nullabilityHolder) {
     int rowsReadSoFar = 0;
     while (rowsReadSoFar < batchSize && hasNext()) {
       advance();
       int rowsInThisBatch =
-          vectorizedPageIterator.nextBatchIntLongBackedDecimal(fieldVector, batchSize - rowsReadSoFar,
-              rowsReadSoFar, typeWidth, nullabilityHolder);
+          vectorizedPageIterator.nextBatchIntBackedDecimal(fieldVector, batchSize - rowsReadSoFar,
+              rowsReadSoFar, nullabilityHolder);
+      rowsReadSoFar += rowsInThisBatch;
+      this.triplesRead += rowsInThisBatch;
+      fieldVector.setValueCount(rowsReadSoFar);
+    }
+  }
+
+  public void nextBatchLongBackedDecimal(
+          FieldVector fieldVector,
+          NullabilityHolder nullabilityHolder) {
+    int rowsReadSoFar = 0;
+    while (rowsReadSoFar < batchSize && hasNext()) {
+      advance();
+      int rowsInThisBatch =
+              vectorizedPageIterator.nextBatchLongBackedDecimal(fieldVector, batchSize - rowsReadSoFar,
+                      rowsReadSoFar, nullabilityHolder);
       rowsReadSoFar += rowsInThisBatch;
       this.triplesRead += rowsInThisBatch;
       fieldVector.setValueCount(rowsReadSoFar);
diff --git a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedDictionaryEncodedParquetValuesReader.java b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedDictionaryEncodedParquetValuesReader.java
index 52e389e..74d9e15 100644
--- a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedDictionaryEncodedParquetValuesReader.java
+++ b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedDictionaryEncodedParquetValuesReader.java
@@ -286,8 +286,8 @@ public class VectorizedDictionaryEncodedParquetValuesReader extends BaseVectoriz
         case RLE:
           for (int i = 0; i < num; i++) {
             byte[] decimalBytes = dict.decodeToBinary(currentValue).getBytesUnsafe();
-            byte[] vectorBytes = new byte[DecimalVector.TYPE_WIDTH];
-            System.arraycopy(decimalBytes, 0, vectorBytes, DecimalVector.TYPE_WIDTH - typeWidth, typeWidth);
+            byte[] vectorBytes = new byte[typeWidth];
+            System.arraycopy(decimalBytes, 0, vectorBytes, 0, typeWidth);
             ((DecimalVector) vector).setBigEndian(idx, vectorBytes);
             nullabilityHolder.setNotNull(idx);
             idx++;
@@ -296,8 +296,8 @@ public class VectorizedDictionaryEncodedParquetValuesReader extends BaseVectoriz
         case PACKED:
           for (int i = 0; i < num; i++) {
             byte[] decimalBytes = dict.decodeToBinary(packedValuesBuffer[packedValuesBufferIdx++]).getBytesUnsafe();
-            byte[] vectorBytes = new byte[DecimalVector.TYPE_WIDTH];
-            System.arraycopy(decimalBytes, 0, vectorBytes, DecimalVector.TYPE_WIDTH - typeWidth, typeWidth);
+            byte[] vectorBytes = new byte[typeWidth];
+            System.arraycopy(decimalBytes, 0, vectorBytes, 0, typeWidth);
             ((DecimalVector) vector).setBigEndian(idx, vectorBytes);
             nullabilityHolder.setNotNull(idx);
             idx++;
@@ -343,7 +343,7 @@ public class VectorizedDictionaryEncodedParquetValuesReader extends BaseVectoriz
     }
   }
 
-  void readBatchOfDictionaryEncodedIntLongBackedDecimals(FieldVector vector, int typeWidth, int startOffset,
+  void readBatchOfDictionaryEncodedIntBackedDecimals(FieldVector vector, int startOffset,
                                                          int numValuesToRead, Dictionary dict,
                                                          NullabilityHolder nullabilityHolder) {
     int left = numValuesToRead;
@@ -358,7 +358,7 @@ public class VectorizedDictionaryEncodedParquetValuesReader extends BaseVectoriz
           for (int i = 0; i < num; i++) {
             ((DecimalVector) vector).set(
                 idx,
-                typeWidth == Integer.BYTES ? dict.decodeToInt(currentValue) : dict.decodeToLong(currentValue));
+                dict.decodeToInt(currentValue));
             nullabilityHolder.setNotNull(idx);
             idx++;
           }
@@ -366,10 +366,41 @@ public class VectorizedDictionaryEncodedParquetValuesReader extends BaseVectoriz
         case PACKED:
           for (int i = 0; i < num; i++) {
             ((DecimalVector) vector).set(
-                idx,
-                typeWidth == Integer.BYTES ?
-                    dict.decodeToInt(packedValuesBuffer[packedValuesBufferIdx++])
-                    : dict.decodeToLong(packedValuesBuffer[packedValuesBufferIdx++]));
+                idx, dict.decodeToInt(packedValuesBuffer[packedValuesBufferIdx++]));
+            nullabilityHolder.setNotNull(idx);
+            idx++;
+          }
+          break;
+      }
+      left -= num;
+      currentCount -= num;
+    }
+  }
+
+  void readBatchOfDictionaryEncodedLongBackedDecimals(FieldVector vector, int startOffset,
+                                                     int numValuesToRead, Dictionary dict,
+                                                     NullabilityHolder nullabilityHolder) {
+    int left = numValuesToRead;
+    int idx = startOffset;
+    while (left > 0) {
+      if (this.currentCount == 0) {
+        this.readNextGroup();
+      }
+      int num = Math.min(left, this.currentCount);
+      switch (mode) {
+        case RLE:
+          for (int i = 0; i < num; i++) {
+            ((DecimalVector) vector).set(
+                    idx,
+                    dict.decodeToLong(currentValue));
+            nullabilityHolder.setNotNull(idx);
+            idx++;
+          }
+          break;
+        case PACKED:
+          for (int i = 0; i < num; i++) {
+            ((DecimalVector) vector).set(
+                    idx, dict.decodeToLong(packedValuesBuffer[packedValuesBufferIdx++]));
             nullabilityHolder.setNotNull(idx);
             idx++;
           }
diff --git a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedPageIterator.java b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedPageIterator.java
index 2aa6f2c..9876962 100644
--- a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedPageIterator.java
+++ b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedPageIterator.java
@@ -274,28 +274,26 @@ public class VectorizedPageIterator extends BasePageIterator {
    * Method for reading a batch of decimals backed by INT32 and INT64 parquet data types. Since Arrow stores all
    * decimals in 16 bytes, byte arrays are appropriately padded before being written to Arrow data buffers.
    */
-  public int nextBatchIntLongBackedDecimal(
+  public int nextBatchIntBackedDecimal(
       final FieldVector vector, final int expectedBatchSize, final int numValsInVector,
-      final int typeWidth, NullabilityHolder nullabilityHolder) {
+      NullabilityHolder nullabilityHolder) {
     final int actualBatchSize = getActualBatchSize(expectedBatchSize);
     if (actualBatchSize <= 0) {
       return 0;
     }
     if (dictionaryDecodeMode == DictionaryDecodeMode.EAGER) {
       vectorizedDefinitionLevelReader
-          .readBatchOfDictionaryEncodedIntLongBackedDecimals(
+          .readBatchOfDictionaryEncodedIntBackedDecimals(
               vector,
               numValsInVector,
-              typeWidth,
               actualBatchSize,
               nullabilityHolder,
               dictionaryEncodedValuesReader,
               dictionary);
     } else {
-      vectorizedDefinitionLevelReader.readBatchOfIntLongBackedDecimals(
+      vectorizedDefinitionLevelReader.readBatchOfIntBackedDecimals(
           vector,
           numValsInVector,
-          typeWidth,
           actualBatchSize,
           nullabilityHolder,
           plainValuesReader);
@@ -305,6 +303,35 @@ public class VectorizedPageIterator extends BasePageIterator {
     return actualBatchSize;
   }
 
+  public int nextBatchLongBackedDecimal(
+          final FieldVector vector, final int expectedBatchSize, final int numValsInVector,
+          NullabilityHolder nullabilityHolder) {
+    final int actualBatchSize = getActualBatchSize(expectedBatchSize);
+    if (actualBatchSize <= 0) {
+      return 0;
+    }
+    if (dictionaryDecodeMode == DictionaryDecodeMode.EAGER) {
+      vectorizedDefinitionLevelReader
+              .readBatchOfDictionaryEncodedLongBackedDecimals(
+                      vector,
+                      numValsInVector,
+                      actualBatchSize,
+                      nullabilityHolder,
+                      dictionaryEncodedValuesReader,
+                      dictionary);
+    } else {
+      vectorizedDefinitionLevelReader.readBatchOfLongBackedDecimals(
+              vector,
+              numValsInVector,
+              actualBatchSize,
+              nullabilityHolder,
+              plainValuesReader);
+    }
+    triplesRead += actualBatchSize;
+    this.hasNext = triplesRead < triplesCount;
+    return actualBatchSize;
+  }
+
   /**
    * Method for reading a batch of decimals backed by fixed length byte array parquet data type. Arrow stores all
    * decimals in 16 bytes. This method provides the necessary padding to the decimals read. Moreover, Arrow interprets
diff --git a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedParquetDefinitionLevelReader.java b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedParquetDefinitionLevelReader.java
index 7c21119..d330f09 100644
--- a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedParquetDefinitionLevelReader.java
+++ b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedParquetDefinitionLevelReader.java
@@ -623,12 +623,12 @@ public final class VectorizedParquetDefinitionLevelReader extends BaseVectorized
         this.readNextGroup();
       }
       int num = Math.min(left, this.currentCount);
-      byte[] byteArray = new byte[DecimalVector.TYPE_WIDTH];
+      byte[] byteArray = new byte[typeWidth];
       switch (mode) {
         case RLE:
           if (currentValue == maxDefLevel) {
             for (int i = 0; i < num; i++) {
-              valuesReader.getBuffer(typeWidth).get(byteArray, DecimalVector.TYPE_WIDTH - typeWidth, typeWidth);
+              valuesReader.getBuffer(typeWidth).get(byteArray, 0, typeWidth);
               ((DecimalVector) vector).setBigEndian(bufferIdx, byteArray);
               nullabilityHolder.setNotNull(bufferIdx);
               bufferIdx++;
@@ -641,7 +641,7 @@ public final class VectorizedParquetDefinitionLevelReader extends BaseVectorized
         case PACKED:
           for (int i = 0; i < num; ++i) {
             if (packedValuesBuffer[packedValuesBufferIdx++] == maxDefLevel) {
-              valuesReader.getBuffer(typeWidth).get(byteArray, DecimalVector.TYPE_WIDTH - typeWidth, typeWidth);
+              valuesReader.getBuffer(typeWidth).get(byteArray, 0, typeWidth);
               ((DecimalVector) vector).setBigEndian(bufferIdx, byteArray);
               nullabilityHolder.setNotNull(bufferIdx);
             } else {
@@ -685,8 +685,8 @@ public final class VectorizedParquetDefinitionLevelReader extends BaseVectorized
           for (int i = 0; i < num; i++) {
             if (packedValuesBuffer[packedValuesBufferIdx++] == maxDefLevel) {
               ByteBuffer decimalBytes = dict.decodeToBinary(dictionaryEncodedValuesReader.readInteger()).toByteBuffer();
-              byte[] vectorBytes = new byte[DecimalVector.TYPE_WIDTH];
-              System.arraycopy(decimalBytes, 0, vectorBytes, DecimalVector.TYPE_WIDTH - typeWidth, typeWidth);
+              byte[] vectorBytes = new byte[typeWidth];
+              System.arraycopy(decimalBytes, 0, vectorBytes, 0, typeWidth);
               ((DecimalVector) vector).setBigEndian(idx, vectorBytes);
               nullabilityHolder.setNotNull(idx);
             } else {
@@ -805,10 +805,48 @@ public final class VectorizedParquetDefinitionLevelReader extends BaseVectorized
     }
   }
 
-  public void readBatchOfIntLongBackedDecimals(
+  public void readBatchOfIntBackedDecimals(
       final FieldVector vector, final int startOffset,
-      final int typeWidth, final int numValsToRead, NullabilityHolder nullabilityHolder,
-      ValuesAsBytesReader valuesReader) {
+      final int numValsToRead, NullabilityHolder nullabilityHolder, ValuesAsBytesReader valuesReader) {
+    int bufferIdx = startOffset;
+    int left = numValsToRead;
+    while (left > 0) {
+      if (this.currentCount == 0) {
+        this.readNextGroup();
+      }
+      int num = Math.min(left, this.currentCount);
+      byte[] byteArray = new byte[Integer.BYTES];
+      switch (mode) {
+        case RLE:
+          if (currentValue == maxDefLevel) {
+            for (int i = 0; i < num; i++) {
+              setIntBackedDecimal(vector, nullabilityHolder, valuesReader, bufferIdx, byteArray);
+              bufferIdx++;
+            }
+          } else {
+            setNulls(nullabilityHolder, bufferIdx, num, vector.getValidityBuffer());
+            bufferIdx += num;
+          }
+          break;
+        case PACKED:
+          for (int i = 0; i < num; ++i) {
+            if (packedValuesBuffer[packedValuesBufferIdx++] == maxDefLevel) {
+              setIntBackedDecimal(vector, nullabilityHolder, valuesReader, bufferIdx, byteArray);
+            } else {
+              setNull(nullabilityHolder, bufferIdx, vector.getValidityBuffer());
+            }
+            bufferIdx++;
+          }
+          break;
+      }
+      left -= num;
+      currentCount -= num;
+    }
+  }
+
+  public void readBatchOfLongBackedDecimals(
+          final FieldVector vector, final int startOffset,
+          final int numValsToRead, NullabilityHolder nullabilityHolder, ValuesAsBytesReader valuesReader) {
     int bufferIdx = startOffset;
     int left = numValsToRead;
     while (left > 0) {
@@ -816,12 +854,12 @@ public final class VectorizedParquetDefinitionLevelReader extends BaseVectorized
         this.readNextGroup();
       }
       int num = Math.min(left, this.currentCount);
-      byte[] byteArray = new byte[DecimalVector.TYPE_WIDTH];
+      byte[] byteArray = new byte[Long.BYTES];
       switch (mode) {
         case RLE:
           if (currentValue == maxDefLevel) {
             for (int i = 0; i < num; i++) {
-              setIntLongBackedDecimal(vector, typeWidth, nullabilityHolder, valuesReader, bufferIdx, byteArray);
+              setLongBackedDecimal(vector, nullabilityHolder, valuesReader, bufferIdx, byteArray);
               bufferIdx++;
             }
           } else {
@@ -832,7 +870,7 @@ public final class VectorizedParquetDefinitionLevelReader extends BaseVectorized
         case PACKED:
           for (int i = 0; i < num; ++i) {
             if (packedValuesBuffer[packedValuesBufferIdx++] == maxDefLevel) {
-              setIntLongBackedDecimal(vector, typeWidth, nullabilityHolder, valuesReader, bufferIdx, byteArray);
+              setLongBackedDecimal(vector, nullabilityHolder, valuesReader, bufferIdx, byteArray);
             } else {
               setNull(nullabilityHolder, bufferIdx, vector.getValidityBuffer());
             }
@@ -845,20 +883,21 @@ public final class VectorizedParquetDefinitionLevelReader extends BaseVectorized
     }
   }
 
-  private void setIntLongBackedDecimal(FieldVector vector, int typeWidth, NullabilityHolder nullabilityHolder,
+  private void setIntBackedDecimal(FieldVector vector, NullabilityHolder nullabilityHolder,
                                        ValuesAsBytesReader valuesReader, int bufferIdx, byte[] byteArray) {
-    valuesReader.getBuffer(typeWidth).get(byteArray, 0, typeWidth);
-    vector.getDataBuffer().setBytes(bufferIdx * DecimalVector.TYPE_WIDTH, byteArray);
+    ((DecimalVector) vector).set(bufferIdx, valuesReader.getBuffer(Integer.BYTES).getInt());
     nullabilityHolder.setNotNull(bufferIdx);
-    if (setArrowValidityVector) {
-      BitVectorHelper.setBit(vector.getValidityBuffer(), bufferIdx);
-    }
   }
 
-  public void readBatchOfDictionaryEncodedIntLongBackedDecimals(
+  private void setLongBackedDecimal(FieldVector vector, NullabilityHolder nullabilityHolder,
+                                   ValuesAsBytesReader valuesReader, int bufferIdx, byte[] byteArray) {
+    ((DecimalVector) vector).set(bufferIdx, valuesReader.getBuffer(Long.BYTES).getLong());
+    nullabilityHolder.setNotNull(bufferIdx);
+  }
+
+  public void readBatchOfDictionaryEncodedIntBackedDecimals(
       final FieldVector vector,
       final int startOffset,
-      final int typeWidth,
       final int numValsToRead,
       NullabilityHolder nullabilityHolder,
       VectorizedDictionaryEncodedParquetValuesReader dictionaryEncodedValuesReader,
@@ -873,7 +912,7 @@ public final class VectorizedParquetDefinitionLevelReader extends BaseVectorized
       switch (mode) {
         case RLE:
           if (currentValue == maxDefLevel) {
-            dictionaryEncodedValuesReader.readBatchOfDictionaryEncodedIntLongBackedDecimals(vector, typeWidth, idx,
+            dictionaryEncodedValuesReader.readBatchOfDictionaryEncodedIntBackedDecimals(vector, idx,
                 num, dict, nullabilityHolder);
           } else {
             setNulls(nullabilityHolder, idx, num, vector.getValidityBuffer());
@@ -885,9 +924,49 @@ public final class VectorizedParquetDefinitionLevelReader extends BaseVectorized
             if (packedValuesBuffer[packedValuesBufferIdx++] == maxDefLevel) {
               ((DecimalVector) vector).set(
                   idx,
-                  typeWidth == Integer.BYTES ?
-                      dict.decodeToInt(dictionaryEncodedValuesReader.readInteger())
-                      : dict.decodeToLong(dictionaryEncodedValuesReader.readInteger()));
+                      dict.decodeToInt(dictionaryEncodedValuesReader.readInteger()));
+              nullabilityHolder.setNotNull(idx);
+            } else {
+              setNull(nullabilityHolder, idx, vector.getValidityBuffer());
+            }
+            idx++;
+          }
+          break;
+      }
+      left -= num;
+      currentCount -= num;
+    }
+  }
+
+  public void readBatchOfDictionaryEncodedLongBackedDecimals(
+          final FieldVector vector,
+          final int startOffset,
+          final int numValsToRead,
+          NullabilityHolder nullabilityHolder,
+          VectorizedDictionaryEncodedParquetValuesReader dictionaryEncodedValuesReader,
+          Dictionary dict) {
+    int idx = startOffset;
+    int left = numValsToRead;
+    while (left > 0) {
+      if (this.currentCount == 0) {
+        this.readNextGroup();
+      }
+      int num = Math.min(left, this.currentCount);
+      switch (mode) {
+        case RLE:
+          if (currentValue == maxDefLevel) {
+            dictionaryEncodedValuesReader.readBatchOfDictionaryEncodedLongBackedDecimals(vector, idx,
+                    num, dict, nullabilityHolder);
+          } else {
+            setNulls(nullabilityHolder, idx, num, vector.getValidityBuffer());
+          }
+          idx += num;
+          break;
+        case PACKED:
+          for (int i = 0; i < num; i++) {
+            if (packedValuesBuffer[packedValuesBufferIdx++] == maxDefLevel) {
+              ((DecimalVector) vector).set(
+                      idx, dict.decodeToLong(dictionaryEncodedValuesReader.readInteger()));
               nullabilityHolder.setNotNull(idx);
             } else {
               setNull(nullabilityHolder, idx, vector.getValidityBuffer());

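Relatedly, splitting INT_LONG_BACKED_DECIMAL into INT_BACKED_DECIMAL and LONG_BACKED_DECIMAL lets the readers hand the decoded value to DecimalVector.set(index, long); a two's-complement long carries its sign, so no manual byte padding is involved. A hedged, self-contained sketch of that Arrow call (precision and scale chosen arbitrarily):

    import org.apache.arrow.memory.RootAllocator;
    import org.apache.arrow.vector.DecimalVector;

    try (RootAllocator allocator = new RootAllocator();
         DecimalVector vec = new DecimalVector("d", allocator, 10, 2)) {
      vec.allocateNew();
      vec.set(0, -1234L);    // unscaled value: -12.34 at scale 2
      vec.setValueCount(1);
      vec.getObject(0);      // BigDecimal -12.34, sign intact
    }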

[iceberg] 02/04: Core: Fix NullPointerException in ManifestReader (#1730)

Posted by ao...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

aokolnychyi pushed a commit to branch 0.10.x
in repository https://gitbox.apache.org/repos/asf/iceberg.git

commit 09d5a4a4ccb640992b5d0c6a79e54f780948923b
Author: Ryan Blue <bl...@apache.org>
AuthorDate: Thu Nov 5 17:48:18 2020 -0800

    Core: Fix NullPointerException in ManifestReader (#1730)
    
    This fixes a NullPointerException thrown by a ManifestReader for delete files when there is a query filter. DeleteFileIndex projects all fields of a delete manifest, so it does not call select to choose specific columns, unlike ManifestGroup, which selects * by default. When select is not called, the methods that check whether stats columns are needed fail; the failure surfaces only when there is a row filter, because stats columns are not needed without one.
    
    Existing tests either called select to configure the reader or didn't pass a row filter and projected all rows. This adds a test that uses DeleteFileIndex and a test for ManifestReader. This also fixes dropStats in addition to requireStatsProjection.
    
    Co-authored-by: 钟保罗 <zh...@shxgroup.net>
---
 .../java/org/apache/iceberg/ManifestReader.java    |  2 +
 .../org/apache/iceberg/TestManifestReader.java     | 20 ++++++++++
 .../spark/source/TestSparkReaderDeletes.java       | 46 ++++++++++++++++++++++
 3 files changed, 68 insertions(+)

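In code, the failing path looks like the new test added below: filtering a manifest without selecting columns reached requireStatsProjection with a null column collection. A hedged repro sketch (manifest and FILE_IO as in the test fixtures):

    import org.apache.iceberg.expressions.Expressions;

    // Threw a NullPointerException before this fix, because no select() call
    // meant requireStatsProjection received columns == null.
    try (ManifestReader<DataFile> reader = ManifestFiles.read(manifest, FILE_IO)
        .filterRows(Expressions.equal("id", 0))) {
      reader.forEach(file -> file.path());
    }
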
diff --git a/core/src/main/java/org/apache/iceberg/ManifestReader.java b/core/src/main/java/org/apache/iceberg/ManifestReader.java
index 8311c19..939de8d 100644
--- a/core/src/main/java/org/apache/iceberg/ManifestReader.java
+++ b/core/src/main/java/org/apache/iceberg/ManifestReader.java
@@ -271,6 +271,7 @@ public class ManifestReader<F extends ContentFile<F>>
   private static boolean requireStatsProjection(Expression rowFilter, Collection<String> columns) {
     // Make sure we have all stats columns for metrics evaluator
     return rowFilter != Expressions.alwaysTrue() &&
+        columns != null &&
         !columns.containsAll(ManifestReader.ALL_COLUMNS) &&
         !columns.containsAll(STATS_COLUMNS);
   }
@@ -279,6 +280,7 @@ public class ManifestReader<F extends ContentFile<F>>
     // Make sure we only drop all stats if we had projected all stats
     // We do not drop stats even if we had partially added some stats columns
     return rowFilter != Expressions.alwaysTrue() &&
+        columns != null &&
         !columns.containsAll(ManifestReader.ALL_COLUMNS) &&
         Sets.intersection(Sets.newHashSet(columns), STATS_COLUMNS).isEmpty();
   }
diff --git a/core/src/test/java/org/apache/iceberg/TestManifestReader.java b/core/src/test/java/org/apache/iceberg/TestManifestReader.java
index 1706404..c1c2423 100644
--- a/core/src/test/java/org/apache/iceberg/TestManifestReader.java
+++ b/core/src/test/java/org/apache/iceberg/TestManifestReader.java
@@ -21,8 +21,12 @@ package org.apache.iceberg;
 
 import java.io.IOException;
 import java.util.List;
+import java.util.stream.Collectors;
 import org.apache.iceberg.ManifestEntry.Status;
+import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Streams;
 import org.apache.iceberg.types.Types;
 import org.junit.Assert;
 import org.junit.Test;
@@ -52,6 +56,22 @@ public class TestManifestReader extends TableTestBase {
   }
 
   @Test
+  public void testReaderWithFilterWithoutSelect() throws IOException {
+    ManifestFile manifest = writeManifest(1000L, FILE_A, FILE_B, FILE_C);
+    try (ManifestReader<DataFile> reader = ManifestFiles.read(manifest, FILE_IO)
+        .filterRows(Expressions.equal("id", 0))) {
+      List<String> files = Streams.stream(reader)
+          .map(file -> file.path().toString())
+          .collect(Collectors.toList());
+
+      // note that all files are returned because the reader returns data files that may match, and the partition is
+      // bucketing by data, which doesn't help filter files
+      Assert.assertEquals("Should read the expected files",
+          Lists.newArrayList(FILE_A.path(), FILE_B.path(), FILE_C.path()), files);
+    }
+  }
+
+  @Test
   public void testInvalidUsage() throws IOException {
     ManifestFile manifest = writeManifest(FILE_A, FILE_B);
     AssertHelpers.assertThrows(
diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java b/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java
index 229a361..68787a2 100644
--- a/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java
+++ b/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java
@@ -19,19 +19,28 @@
 
 package org.apache.iceberg.spark.source;
 
+import java.io.IOException;
+import java.util.List;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.iceberg.BaseTable;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.Files;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableMetadata;
 import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.TestHelpers;
 import org.apache.iceberg.catalog.Namespace;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.data.DeleteReadTests;
+import org.apache.iceberg.data.FileHelpers;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.Record;
 import org.apache.iceberg.exceptions.AlreadyExistsException;
 import org.apache.iceberg.hive.HiveCatalog;
 import org.apache.iceberg.hive.TestHiveMetastore;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.spark.SparkStructLike;
 import org.apache.iceberg.types.Types;
 import org.apache.iceberg.util.StructLikeSet;
@@ -40,7 +49,9 @@ import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
 import org.apache.spark.sql.internal.SQLConf;
 import org.junit.AfterClass;
+import org.junit.Assert;
 import org.junit.BeforeClass;
+import org.junit.Test;
 
 import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTOREURIS;
 
@@ -113,4 +124,39 @@ public abstract class TestSparkReaderDeletes extends DeleteReadTests {
 
     return set;
   }
+
+  @Test
+  public void testEqualityDeleteWithFilter() throws IOException {
+    String tableName = "test_with_filter";
+    Table table = createTable(tableName, SCHEMA, SPEC);
+    Schema deleteRowSchema = table.schema().select("data");
+    Record dataDelete = GenericRecord.create(deleteRowSchema);
+    List<Record> dataDeletes = Lists.newArrayList(
+        dataDelete.copy("data", "a"), // id = 29
+        dataDelete.copy("data", "d"), // id = 89
+        dataDelete.copy("data", "g") // id = 122
+    );
+
+    DeleteFile eqDeletes = FileHelpers.writeDeleteFile(
+        table, Files.localOutput(temp.newFile()), TestHelpers.Row.of(0), dataDeletes, deleteRowSchema);
+
+    table.newRowDelta()
+        .addDeletes(eqDeletes)
+        .commit();
+
+    Types.StructType projection = table.schema().select("*").asStruct();
+    Dataset<Row> df = spark.read()
+        .format("iceberg")
+        .load(TableIdentifier.of("default", tableName).toString())
+        .filter("data = 'a'") // select a deleted row
+        .selectExpr("*");
+
+    StructLikeSet actual = StructLikeSet.create(projection);
+    df.collectAsList().forEach(row -> {
+      SparkStructLike rowWrapper = new SparkStructLike(projection);
+      actual.add(rowWrapper.wrap(row));
+    });
+
+    Assert.assertEquals("Table should contain no rows", 0, actual.size());
+  }
 }