Posted to issues@spark.apache.org by "Wang Zekai (Jira)" <ji...@apache.org> on 2021/10/05 09:27:00 UTC
[jira] [Created] (SPARK-36932) Misuse "merge schema" when mapGroups
Wang Zekai created SPARK-36932:
----------------------------------
Summary: Misuse "merge schema" when mapGroups
Key: SPARK-36932
URL: https://issues.apache.org/jira/browse/SPARK-36932
Project: Spark
Issue Type: Bug
Components: Java API
Affects Versions: 3.0.0
Reporter: Wang Zekai
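{code:scala}
// Test case for this bug
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

val spark = SparkSession.builder().master("local[*]").getOrCreate()
val data1 = Seq(
  Row("0", 1),
  Row("0", 2))
val schema1 = StructType(List(
  StructField("col0", StringType),
  StructField("col1", IntegerType)))
val data2 = Seq(
  Row("0", 1),
  Row("0", 2))
// schema2 also has a field named "col0", but typed as int instead of string
val schema2 = StructType(List(
  StructField("str0", StringType),
  StructField("col0", IntegerType)))
val df1 = spark.createDataFrame(spark.sparkContext.makeRDD(data1), schema1)
val df2 = spark.createDataFrame(spark.sparkContext.makeRDD(data2), schema2)

// Joined schema: col0 (string), col1 (int), str0 (string), col0 (int)
val joined = df1.join(df2, df1("col0") === df2("str0"), "left")

import spark.implicits._ // provides the Encoder[Int] needed by groupByKey
val distinct = joined
  .groupByKey { row => row.getInt(1) } // group by col1
  .mapGroups {
    case (_, iter) =>
      iter.maxBy(row => row.getInt(3)) // keep the row with the largest df2.col0
  }(RowEncoder(joined.schema))
distinct.show()
{code}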
{code:java}
// A part of errors
Exception in thread "main" org.apache.spark.SparkException: Failed to merge fields 'col0' and 'col0'. Failed to merge incompatible data types string and int
at org.apache.spark.sql.types.StructType$.$anonfun$merge$2(StructType.scala:593)
at scala.Option.map(Option.scala:163)
at org.apache.spark.sql.types.StructType$.$anonfun$merge$1(StructType.scala:585)
at org.apache.spark.sql.types.StructType$.$anonfun$merge$1$adapted(StructType.scala:582)
at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
at org.apache.spark.sql.types.StructType$.merge(StructType.scala:582)
at org.apache.spark.sql.types.StructType.merge(StructType.scala:492)
at org.apache.spark.sql.catalyst.expressions.SchemaPruning$.$anonfun$pruneDataSchema$2(SchemaPruning.scala:36)
at scala.collection.LinearSeqOptimized.foldLeft(LinearSeqOptimized.scala:126)
at scala.collection.LinearSeqOptimized.foldLeft$(LinearSeqOptimized.scala:122)
at scala.collection.immutable.List.foldLeft(List.scala:89)
at scala.collection.LinearSeqOptimized.reduceLeft(LinearSeqOptimized.scala:140)
at scala.collection.LinearSeqOptimized.reduceLeft$(LinearSeqOptimized.scala:138)
at scala.collection.immutable.List.reduceLeft(List.scala:89)
{code}
After left-joining two DataFrames whose schemas each contain a column with the same name but a different type (col0 is a string in df1 and an int in df2), we use groupByKey and mapGroups to get the result, but the query fails with the exception above. Is my code wrong? If not, I think the problem may be related to the schema merge at StructType.scala:593. How can I turn off schema merging?
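A possible workaround, rather than turning the merge off, assuming the failure is triggered only by the two fields named col0 in the joined schema: rename one of them before the join so that every field name is unique. This is an untested sketch for spark-shell or a main method; the new name col0_right is a hypothetical, arbitrary choice, and the rest mirrors the test case above.

```scala
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._ // Encoder[Int] for groupByKey

val df1 = spark.createDataFrame(
  spark.sparkContext.makeRDD(Seq(Row("0", 1), Row("0", 2))),
  StructType(List(StructField("col0", StringType), StructField("col1", IntegerType))))
val df2 = spark.createDataFrame(
  spark.sparkContext.makeRDD(Seq(Row("0", 1), Row("0", 2))),
  StructType(List(StructField("str0", StringType), StructField("col0", IntegerType))))

// Workaround (assumption): give df2's "col0" a unique, hypothetical name so the
// joined schema no longer contains two fields called "col0" with different types.
val df2Renamed = df2.withColumnRenamed("col0", "col0_right")
val joined = df1.join(df2Renamed, df1("col0") === df2Renamed("str0"), "left")

// Same aggregation as the test case: group by col1, keep the row with max col0_right
val result = joined
  .groupByKey { row => row.getInt(1) }
  .mapGroups {
    case (_, iter) => iter.maxBy(row => row.getInt(3))
  }(RowEncoder(joined.schema))
result.show()
```

If this variant succeeds while the original fails, that supports the hypothesis that the duplicate column name, not the join or mapGroups itself, triggers the merge error.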
--
This message was sent by Atlassian Jira
(v8.3.4#803005)