You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2022/01/27 23:21:20 UTC

[GitHub] [hudi] manojpec commented on a change in pull request #4705: [HUDI-3337] Fixing Parquet Column Range metadata extraction

manojpec commented on a change in pull request #4705:
URL: https://github.com/apache/hudi/pull/4705#discussion_r794079202



##########
File path: hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/columnstats/ColumnStatsIndexHelper.java
##########
@@ -447,8 +448,8 @@ private static String composeZIndexColName(String col, String statName) {
           new Float(colMetadata.getMaxValue().toString()));
     } else if (colType instanceof BinaryType) {
       return Pair.of(
-          ((Binary) colMetadata.getMinValue()).getBytes(),
-          ((Binary) colMetadata.getMaxValue()).getBytes());
+          ((ByteBuffer) colMetadata.getMinValue()).array(),

Review comment:
       Now that `convertToNativeJavaType()` returns a java String type for ParquetUTF8BinaryStringType, Line 426 needs to change right? No need for Binary casting there.  

##########
File path: hudi-common/src/main/java/org/apache/hudi/common/model/HoodieColumnRangeMetadata.java
##########
@@ -31,15 +29,13 @@
   private final T minValue;
   private final T maxValue;
   private final long numNulls;
-  private final PrimitiveStringifier stringifier;
 
-  public HoodieColumnRangeMetadata(final String filePath, final String columnName, final T minValue, final T maxValue, final long numNulls, final PrimitiveStringifier stringifier) {
+  public HoodieColumnRangeMetadata(final String filePath, final String columnName, final T minValue, final T maxValue, final long numNulls) {

Review comment:
       Each data type has its own Stringifier type. A common one is sufficient here?

##########
File path: hudi-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java
##########
@@ -360,24 +361,56 @@ public Boolean apply(String recordKey) {
 
     return new HoodieColumnRangeMetadata<T>(
         one.getFilePath(),
-        one.getColumnName(), minValue, maxValue, one.getNumNulls() + another.getNumNulls(), one.getStringifier());
+        one.getColumnName(), minValue, maxValue, one.getNumNulls() + another.getNumNulls());
   }
 
   private static Comparable<?> convertToNativeJavaType(PrimitiveType primitiveType, Comparable val) {
     if (primitiveType.getOriginalType() == OriginalType.DECIMAL) {
-      DecimalMetadata decimalMetadata = primitiveType.getDecimalMetadata();
-      return BigDecimal.valueOf((Integer) val, decimalMetadata.getScale());
+      return extractDecimal(val, primitiveType.getDecimalMetadata());
     } else if (primitiveType.getOriginalType() == OriginalType.DATE) {
       // NOTE: This is a workaround to address race-condition in using
       //       {@code SimpleDataFormat} concurrently (w/in {@code DateStringifier})
       // TODO cleanup after Parquet upgrade to 1.12
       synchronized (primitiveType.stringifier()) {
+        // Date logical type is implemented as a signed INT32
+        // REF: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md
         return java.sql.Date.valueOf(
             primitiveType.stringifier().stringify((Integer) val)
         );
       }
+    } else if (primitiveType.getOriginalType() == OriginalType.UTF8) {
+      // NOTE: UTF8 type designates a byte array that should be interpreted as a
+      // UTF-8 encoded character string
+      // REF: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md
+      return ((Binary) val).toStringUsingUTF8();
+    } else if (primitiveType.getPrimitiveTypeName() == PrimitiveType.PrimitiveTypeName.BINARY) {
+      // NOTE: `getBytes` access makes a copy of the underlying byte buffer
+      return ((Binary) val).toByteBuffer();
     }
 
     return val;
   }
+
+  @Nonnull
+  private static BigDecimal extractDecimal(Object val, DecimalMetadata decimalMetadata) {
+    // In Parquet, Decimal could be represented as either of
+    //    1. INT32 (for 1 <= precision <= 9)
+    //    2. INT64 (for 1 <= precision <= 18)
+    //    3. FIXED_LEN_BYTE_ARRAY (precision is limited by the array size. Length n can store <= floor(log_10(2^(8*n - 1) - 1)) base-10 digits)
+    //    4. BINARY (precision is not limited)
+    // REF: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#DECIMAL
+    int scale = decimalMetadata.getScale();
+    if (val == null) {
+      return null;
+    } else if (val instanceof Integer) {
+      return BigDecimal.valueOf((Integer) val, scale);
+    } else if (val instanceof Long) {
+      return BigDecimal.valueOf((Long) val, scale);
+    } else if (val instanceof Binary) {
+      // NOTE: Unscaled number is stored in BE format (most significant byte is 0th)
+      return new BigDecimal(new BigInteger(((Binary)val).getBytesUnsafe()), scale);
+    } else {
+      throw new UnsupportedOperationException("not supported");

Review comment:
       Throwing error with some extra details will be helpful for debugging. Maybe the val type or value ?

##########
File path: hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndex.scala
##########
@@ -268,11 +276,86 @@ class TestColumnStatsIndex extends HoodieClientTestBase {
 
         df.selectExpr(exprs: _*)
           .collect()
-      }),
+      }).asJava,
       indexSchema
     )
   }
 
+  @Test
+  def testParquetMetadataRangeExtraction(): Unit = {
+    val df = generateRandomDataFrame(spark)
+
+    val pathStr = tempDir.resolve("min-max").toAbsolutePath.toString
+
+    df.write.format("parquet")
+      .mode(SaveMode.Overwrite)
+      .save(pathStr)
+
+    val utils = new ParquetUtils

Review comment:
       nit: few white lines can be trimmed here

##########
File path: hudi-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java
##########
@@ -360,24 +361,56 @@ public Boolean apply(String recordKey) {
 
     return new HoodieColumnRangeMetadata<T>(
         one.getFilePath(),
-        one.getColumnName(), minValue, maxValue, one.getNumNulls() + another.getNumNulls(), one.getStringifier());
+        one.getColumnName(), minValue, maxValue, one.getNumNulls() + another.getNumNulls());
   }
 
   private static Comparable<?> convertToNativeJavaType(PrimitiveType primitiveType, Comparable val) {
     if (primitiveType.getOriginalType() == OriginalType.DECIMAL) {
-      DecimalMetadata decimalMetadata = primitiveType.getDecimalMetadata();
-      return BigDecimal.valueOf((Integer) val, decimalMetadata.getScale());
+      return extractDecimal(val, primitiveType.getDecimalMetadata());
     } else if (primitiveType.getOriginalType() == OriginalType.DATE) {
       // NOTE: This is a workaround to address race-condition in using
       //       {@code SimpleDataFormat} concurrently (w/in {@code DateStringifier})
       // TODO cleanup after Parquet upgrade to 1.12
       synchronized (primitiveType.stringifier()) {
+        // Date logical type is implemented as a signed INT32

Review comment:
       I heard Parquet is already upgraded. If so, can we remove this sync workaround ?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org