Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2022/08/06 15:58:52 UTC

[GitHub] [spark] sunchao commented on a diff in pull request #34199: [SPARK-36935][SQL] Extend ParquetSchemaConverter to compute Parquet repetition & definition level

sunchao commented on code in PR #34199:
URL: https://github.com/apache/spark/pull/34199#discussion_r939545064


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala:
##########
@@ -222,17 +302,36 @@ class ParquetToSparkSchemaConverter(
       case _: ListLogicalTypeAnnotation =>
         ParquetSchemaConverter.checkConversionRequirement(
           field.getFieldCount == 1, s"Invalid list type $field")
+        ParquetSchemaConverter.checkConversionRequirement(
+          sparkReadType.forall(_.isInstanceOf[ArrayType]),
+          s"Invalid Spark read type: expected $field to be list type but found $sparkReadType")
 
-        val repeatedType = field.getType(0)
+        val repeated = groupColumn.getChild(0)
+        val repeatedType = repeated.getType
         ParquetSchemaConverter.checkConversionRequirement(
           repeatedType.isRepetition(REPEATED), s"Invalid list type $field")
+        val sparkReadElementType = sparkReadType.map(_.asInstanceOf[ArrayType].elementType)
 
         if (isElementType(repeatedType, field.getName)) {
-          ArrayType(convertField(repeatedType), containsNull = false)
+          var converted = convertField(repeated, sparkReadElementType)
+          val convertedType = sparkReadElementType.getOrElse(converted.sparkType)
+
+          // legacy format such as:
+          //   optional group my_list (LIST) {
+          //     repeated int32 element;
+          //   }
+          // we should mark the primitive field as required
+          if (repeatedType.isPrimitive) converted = converted.copy(required = true)
+
+          ParquetColumn(ArrayType(convertedType, containsNull = false),
+            groupColumn, Seq(converted))
         } else {
-          val elementType = repeatedType.asGroupType().getType(0)
-          val optional = elementType.isRepetition(OPTIONAL)
-          ArrayType(convertField(elementType), containsNull = optional)
+          val element = repeated.asInstanceOf[GroupColumnIO].getChild(0)
+          val converted = convertField(element, sparkReadElementType)
+          val convertedType = sparkReadElementType.getOrElse(converted.sparkType)

Review Comment:
   The purpose here is to make sure we don't lose any precision during the conversion. For instance, `sparkReadElementType` may be smallint or tinyint while Parquet only supports int. In that case, after conversion we still want to use the read schema passed down from Spark.
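   To make the precedence concrete, here is a minimal, self-contained sketch of the `getOrElse` fallback above. The object and helper names (`ReadTypePrecedence`, `resolve`, `inferredFromParquet`) are hypothetical illustrations, not the PR's actual code path:

   ```scala
   import org.apache.spark.sql.types._

   object ReadTypePrecedence {
     // Parquet has no 8- or 16-bit integer physical type: a TINYINT/SMALLINT
     // column is stored as INT32, so inference from the file schema alone
     // yields IntegerType.
     val inferredFromParquet: DataType = IntegerType

     // Models the optional read schema pushed down from Spark's catalog.
     def resolve(sparkReadType: Option[DataType]): DataType =
       sparkReadType.getOrElse(inferredFromParquet)

     def main(args: Array[String]): Unit = {
       assert(resolve(Some(ShortType)) == ShortType) // keep the narrower catalog type
       assert(resolve(None) == IntegerType)          // fall back to the file-inferred type
     }
   }
   ```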



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org