Posted to issues@spark.apache.org by "Marcelo Vanzin (JIRA)" <ji...@apache.org> on 2019/02/12 22:20:00 UTC

[jira] [Updated] (SPARK-26436) Dataframe resulting from a GroupByKey and flatMapGroups operation throws java.lang.UnsupportedException when groupByKey is applied on it.

     [ https://issues.apache.org/jira/browse/SPARK-26436?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Marcelo Vanzin updated SPARK-26436:
-----------------------------------
    Component/s:     (was: Spark Core)
                 SQL

> Dataframe resulting from a GroupByKey and flatMapGroups operation throws java.lang.UnsupportedException when groupByKey is applied on it.
> -----------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-26436
>                 URL: https://issues.apache.org/jira/browse/SPARK-26436
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.4.0
>            Reporter: Manish
>            Priority: Major
>
> There seems to be a bug in the groupByKey API when it is applied to a Dataset produced by an earlier groupByKey and flatMapGroups invocation.
> In such cases groupByKey throws the following exception:
> java.lang.UnsupportedOperationException: fieldIndex on a Row without schema is undefined.
>
> However, the DataFrame has a valid schema, and groupBy("key") or repartition($"key") calls on the same DataFrame and key succeed.
>  
> Following is the code that reproduces the scenario:
>  
> {code:scala}
> import org.apache.spark.sql.catalyst.encoders.RowEncoder
> import org.apache.spark.sql.{Row, SparkSession}
> import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
> import scala.collection.mutable.ListBuffer
>
> object Test {
>     def main(args: Array[String]): Unit = {
>       val values = List(List("1", "One"), List("1", "Two"), List("2", "Three"), List("2", "4")).map(x => (x(0), x(1)))
>       val session = SparkSession.builder.config("spark.master", "local").getOrCreate
>       import session.implicits._
>       val dataFrame = values.toDF
>       dataFrame.show()
>       dataFrame.printSchema()
>       val newSchema = StructType(dataFrame.schema.fields ++ Array(StructField("Count", IntegerType, false)))
>       val expr = RowEncoder.apply(newSchema)
>       // First pass: group by "_1" and append the group size as a "Count" column.
>       val transform = dataFrame.groupByKey(row => row.getAs[String]("_1")).flatMapGroups((key, inputItr) => {
>         val inputSeq = inputItr.toSeq
>         val length = inputSeq.size
>         val listBuff = new ListBuffer[Row]()
>         // counter ends up equal to the number of rows in the group
>         var counter: Int = 0
>         for (i <- 0 until length) {
>           counter += 1
>         }
>         for (i <- 0 until length) {
>           val x = inputSeq(i)
>           listBuff += Row.fromSeq(x.toSeq ++ Array[Int](counter))
>         }
>         listBuff.iterator
>       })(expr)
>       transform.show
>       val newSchema1 = StructType(transform.schema.fields ++ Array(StructField("Count1", IntegerType, false)))
>       val expr1 = RowEncoder.apply(newSchema1)
>       // Second pass: the same operation on the result of the first pass.
>       // This is the groupByKey for which the exception is reported.
>       val transform2 = transform.groupByKey(row => row.getAs[String]("_1")).flatMapGroups((key, inputItr) => {
>         val inputSeq = inputItr.toSeq
>         val length = inputSeq.size
>         val listBuff = new ListBuffer[Row]()
>         // counter ends up equal to the number of rows in the group
>         var counter: Int = 0
>         for (i <- 0 until length) {
>           counter += 1
>         }
>         for (i <- 0 until length) {
>           val x = inputSeq(i)
>           listBuff += Row.fromSeq(x.toSeq ++ Array[Int](counter))
>         }
>         listBuff.iterator
>       })(expr1)
>       transform2.show
>     }
> }
> Test.main(null)
> {code}
>  
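
The exception message in the description can be seen in isolation, without any grouping. The following is a minimal sketch, not taken from the report: it assumes only that spark-sql is on the classpath, and the object name RowSchemaSketch and its two-column schema are hypothetical stand-ins for the "_1"/"_2" columns above. It shows that a Row built with Row.fromSeq carries no schema, so lookup by field name fails while positional access still works. Whether this is the actual cause of the reported failure is not established here.

```scala
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import scala.util.Try

object RowSchemaSketch {
  // Hypothetical schema mirroring the "_1"/"_2" columns of the report.
  val schema: StructType = StructType(Seq(
    StructField("_1", StringType, nullable = true),
    StructField("_2", StringType, nullable = true)))

  // Row.fromSeq builds a row that carries no schema.
  val bare: Row = Row.fromSeq(Seq("1", "One"))

  // The same values with a schema attached (GenericRowWithSchema is an
  // internal Catalyst class, used here only for illustration).
  val withSchema: Row = new GenericRowWithSchema(Array[Any]("1", "One"), schema)

  def main(args: Array[String]): Unit = {
    // Positional access does not need a schema.
    println(bare.getString(0))
    // Name-based access calls fieldIndex, which fails on a schema-less row.
    println(Try(bare.getAs[String]("_1")))
    // Name-based access succeeds once a schema is attached.
    println(withSchema.getAs[String]("_1"))
  }
}
```

If the rows reaching the second groupByKey are schema-less in this way, a key function using positional access such as row.getString(0) might sidestep the failure. That is an untested assumption, not a confirmed workaround for SPARK-26436.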


