Posted to issues@spark.apache.org by "Marcelo Vanzin (JIRA)" <ji...@apache.org> on 2019/02/12 22:20:00 UTC
[jira] [Updated] (SPARK-26436) Dataframe resulting from a GroupByKey and flatMapGroups operation throws java.lang.UnsupportedException when groupByKey is applied on it.
[ https://issues.apache.org/jira/browse/SPARK-26436?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Marcelo Vanzin updated SPARK-26436:
-----------------------------------
Component/s: (was: Spark Core)
SQL
> Dataframe resulting from a GroupByKey and flatMapGroups operation throws java.lang.UnsupportedException when groupByKey is applied on it.
> -----------------------------------------------------------------------------------------------------------------------------------------
>
> Key: SPARK-26436
> URL: https://issues.apache.org/jira/browse/SPARK-26436
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 2.4.0
> Reporter: Manish
> Priority: Major
>
> There appears to be a bug in the groupByKey API when it is applied to a Dataset produced by a previous groupByKey and flatMapGroups invocation.
> In that case groupByKey throws the following exception:
> java.lang.UnsupportedException: fieldIndex on a Row without schema is undefined.
>
> This happens even though the Dataset has a valid schema, and groupBy("key") or repartition($"key") calls on the same Dataset and key succeed.
>
> Following is the code that reproduces the scenario:
>
> {code:scala}
> import org.apache.spark.sql.catalyst.encoders.RowEncoder
> import org.apache.spark.sql.{Row, SparkSession}
> import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
> import scala.collection.mutable.ListBuffer
>
> object Test {
>   def main(args: Array[String]): Unit = {
>     val values = List(List("1", "One"), List("1", "Two"), List("2", "Three"), List("2", "4"))
>       .map(x => (x(0), x(1)))
>     val session = SparkSession.builder.config("spark.master", "local").getOrCreate
>     import session.implicits._
>
>     val dataFrame = values.toDF
>     dataFrame.show()
>     dataFrame.printSchema()
>
>     // First groupByKey/flatMapGroups: append a per-key Count column. This succeeds.
>     val newSchema = StructType(dataFrame.schema.fields :+ StructField("Count", IntegerType, false))
>     val expr = RowEncoder.apply(newSchema)
>     val transform = dataFrame.groupByKey(row => row.getAs[String]("_1")).flatMapGroups((key, inputItr) => {
>       val inputSeq = inputItr.toSeq
>       val counter = inputSeq.size
>       val listBuff = new ListBuffer[Row]()
>       for (x <- inputSeq) {
>         listBuff += Row.fromSeq(x.toSeq :+ counter)
>       }
>       listBuff.iterator
>     })(expr)
>     transform.show
>
>     // Second groupByKey on the result: row.getAs[String]("_1") throws
>     // "fieldIndex on a Row without schema is undefined."
>     val newSchema1 = StructType(transform.schema.fields :+ StructField("Count1", IntegerType, false))
>     val expr1 = RowEncoder.apply(newSchema1)
>     val transform2 = transform.groupByKey(row => row.getAs[String]("_1")).flatMapGroups((key, inputItr) => {
>       val inputSeq = inputItr.toSeq
>       val counter = inputSeq.size
>       val listBuff = new ListBuffer[Row]()
>       for (x <- inputSeq) {
>         listBuff += Row.fromSeq(x.toSeq :+ counter)
>       }
>       listBuff.iterator
>     })(expr1)
>     transform2.show
>   }
> }
>
> Test.main(null)
> {code}
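For illustration, the failing name-based lookup can be modeled without Spark. This is a minimal sketch under an assumption consistent with the reported error: the rows yielded inside the second flatMapGroups carry no attached schema, so resolving a field name has nothing to consult, while positional access remains well-defined. `SimpleRow` is a hypothetical stand-in, not Spark's `Row`:

```scala
// Hypothetical stand-in for Spark's Row (illustration only).
// A row with field names can resolve getAs(name); a row without a
// schema cannot, mirroring the reported exception message.
final case class SimpleRow(values: Seq[Any], fieldNames: Option[Seq[String]]) {
  // Positional access needs no schema.
  def get(i: Int): Any = values(i)

  // Name-based access must map the name to an index via the schema.
  def getAs(name: String): Any = fieldNames match {
    case Some(names) if names.contains(name) =>
      values(names.indexOf(name))
    case Some(_) =>
      throw new IllegalArgumentException(s"no field named: $name")
    case None =>
      throw new UnsupportedOperationException(
        "fieldIndex on a Row without schema is undefined.")
  }
}
```

If this model matches what happens in the repro, replacing `row.getAs[String]("_1")` with positional access such as `row.getString(0)` in the second groupByKey may serve as a workaround, since index-based getters do not need to resolve a field name.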
>
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org