Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2022/08/06 13:35:13 UTC

[GitHub] [spark] zinking commented on a diff in pull request #34199: [SPARK-36935][SQL] Extend ParquetSchemaConverter to compute Parquet repetition & definition level

zinking commented on code in PR #34199:
URL: https://github.com/apache/spark/pull/34199#discussion_r939529361


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala:
##########
@@ -222,17 +302,36 @@ class ParquetToSparkSchemaConverter(
       case _: ListLogicalTypeAnnotation =>
         ParquetSchemaConverter.checkConversionRequirement(
           field.getFieldCount == 1, s"Invalid list type $field")
+        ParquetSchemaConverter.checkConversionRequirement(
+          sparkReadType.forall(_.isInstanceOf[ArrayType]),
+          s"Invalid Spark read type: expected $field to be list type but found $sparkReadType")
 
-        val repeatedType = field.getType(0)
+        val repeated = groupColumn.getChild(0)
+        val repeatedType = repeated.getType
         ParquetSchemaConverter.checkConversionRequirement(
           repeatedType.isRepetition(REPEATED), s"Invalid list type $field")
+        val sparkReadElementType = sparkReadType.map(_.asInstanceOf[ArrayType].elementType)
 
         if (isElementType(repeatedType, field.getName)) {
-          ArrayType(convertField(repeatedType), containsNull = false)
+          var converted = convertField(repeated, sparkReadElementType)
+          val convertedType = sparkReadElementType.getOrElse(converted.sparkType)
+
+          // legacy format such as:
+          //   optional group my_list (LIST) {
+          //     repeated int32 element;
+          //   }
+          // we should mark the primitive field as required
+          if (repeatedType.isPrimitive) converted = converted.copy(required = true)
+
+          ParquetColumn(ArrayType(convertedType, containsNull = false),
+            groupColumn, Seq(converted))
         } else {
-          val elementType = repeatedType.asGroupType().getType(0)
-          val optional = elementType.isRepetition(OPTIONAL)
-          ArrayType(convertField(elementType), containsNull = optional)
+          val element = repeated.asInstanceOf[GroupColumnIO].getChild(0)
+          val converted = convertField(element, sparkReadElementType)
+          val convertedType = sparkReadElementType.getOrElse(converted.sparkType)

Review Comment:
   @sunchao if the type is already converted, why try to use the `sparkElement` first? This causes https://issues.apache.org/jira/browse/SPARK-39997
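   For readers following along, the pattern the comment questions is the `getOrElse` in the diff: when a caller-supplied Spark read type is present, it takes precedence over the type just derived from the Parquet schema. A minimal standalone sketch of that preference (hypothetical names, not the actual Spark code; `Converted` stands in for `ParquetColumn`):

   ```scala
   // Stand-in for the result of convertField: carries the type derived
   // from the Parquet schema.
   case class Converted(sparkType: String)

   // Mirrors `sparkReadElementType.getOrElse(converted.sparkType)`:
   // the caller-supplied read type, when present, wins over the
   // schema-derived type -- which is the behavior the comment flags.
   def elementType(sparkReadElementType: Option[String], converted: Converted): String =
     sparkReadElementType.getOrElse(converted.sparkType)
   ```

   So a caller passing `Some("int")` gets `"int"` back even if the schema-derived type was `"long"`; only a `None` read type falls through to the converted type.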



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

