You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Wang Zekai (Jira)" <ji...@apache.org> on 2021/10/05 09:27:00 UTC

[jira] [Created] (SPARK-36932) Misuse "merge schema" when mapGroups

Wang Zekai created SPARK-36932:
----------------------------------

             Summary: Misuse "merge schema" when mapGroups
                 Key: SPARK-36932
                 URL: https://issues.apache.org/jira/browse/SPARK-36932
             Project: Spark
          Issue Type: Bug
          Components: Java API
    Affects Versions: 3.0.0
            Reporter: Wang Zekai


{code:java}
// Test case for this bug
val spark = SparkSession.builder().master("local[*]").getOrCreate()

val data1 = Seq(
  Row("0", 1),
  Row("0", 2))
val schema1 = StructType(List(
  StructField("col0", StringType),
  StructField("col1", IntegerType))
)
val data2 = Seq(
  Row("0", 1),
  Row("0", 2))
val schema2 = StructType(List(
  StructField("str0", StringType),
  StructField("col0", IntegerType))
)

val df1 = spark.createDataFrame(spark.sparkContext.makeRDD(data1), schema1)
val df2 = spark.createDataFrame(spark.sparkContext.makeRDD(data2), schema2)

val joined = df1.join(df2, df1("col0") === df2("str0"), "left")

import spark.implicits._
val distinct = joined
  .groupByKey {
    row => row.getInt(1)
  }
  .mapGroups {
    case (_, iter) =>
        iter.maxBy(row => {
          row.getInt(3)
        })
  }(RowEncoder(joined.schema))

distinct.show(){code}
{code:java}
 // A part of errors
Exception in thread "main" org.apache.spark.SparkException: Failed to merge fields 'col0' and 'col0'. Failed to merge incompatible data types string and int 
at org.apache.spark.sql.types.StructType$.$anonfun$merge$2(StructType.scala:593)
at scala.Option.map(Option.scala:163)
at org.apache.spark.sql.types.StructType$.$anonfun$merge$1(StructType.scala:585)
at org.apache.spark.sql.types.StructType$.$anonfun$merge$1$adapted
(StructType.scala:582)
at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198) 
at org.apache.spark.sql.types.StructType$.merge(StructType.scala:582) 
at org.apache.spark.sql.types.StructType.merge(StructType.scala:492)
at org.apache.spark.sql.catalyst.expressions.SchemaPruning$.$
anonfun$pruneDataSchema$2(SchemaPruning.scala:36)
at scala.collection.LinearSeqOptimized.foldLeft(LinearSeqOptimized.scala:126)
at scala.collection.LinearSeqOptimized.foldLeft$(LinearSeqOptimized.scala:122) 
at scala.collection.immutable.List.foldLeft(List.scala:89) 
at scala.collection.LinearSeqOptimized.reduceLeft(LinearSeqOptimized.scala:140) 
at scala.collection.LinearSeqOptimized.reduceLeft$(LinearSeqOptimized.scala:138) 
at scala.collection.immutable.List.reduceLeft(List.scala:89)
{code}

After left join two dataframe which have two shemas with the same name but different types, we use groupByKey and mapGroups to get the result. But it will makes some mistakes. Is it my grammatical mistake? If not, I think It may be related to schema merge in StructType.scala: 593. How can I turn off schema merging?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org