Posted to issues@spark.apache.org by "Manish (JIRA)" <ji...@apache.org> on 2018/12/24 15:10:00 UTC

[jira] [Created] (SPARK-26436) Dataframe as a result of GroupByKey and flatMapGroups operation throws java.lang.UnsupportedOperationException when groupByKey is applied on it.

Manish created SPARK-26436:
------------------------------

             Summary: Dataframe as a result of GroupByKey and flatMapGroups operation throws java.lang.UnsupportedOperationException when groupByKey is applied on it.
                 Key: SPARK-26436
                 URL: https://issues.apache.org/jira/browse/SPARK-26436
             Project: Spark
          Issue Type: Bug
          Components: Spark Core
    Affects Versions: 2.4.0
            Reporter: Manish


There seems to be a bug in the groupByKey API when it is applied to a Dataset that results from a previous groupByKey and flatMapGroups invocation.

In such cases groupByKey throws the following exception:

java.lang.UnsupportedOperationException: fieldIndex on a Row without schema is undefined.

 

The DataFrame does have a valid schema, and groupBy("key") or repartition($"key") calls on the same DataFrame and key succeed.
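
The failure can also be seen in isolation on a single Row (a minimal sketch against the Spark 2.4 Row API, not part of the reproduction below): a Row built with Row.fromSeq carries no schema, so getAs[String]("_1"), which resolves the name through fieldIndex, has nothing to look the field up in, while positional access still works.

{code}
import org.apache.spark.sql.Row

// Row.fromSeq produces a GenericRow with no schema attached.
val bare = Row.fromSeq(Seq("1", "One", 2))

bare.getString(0)        // ok: positional access needs no schema
bare.getAs[String]("_1") // throws java.lang.UnsupportedOperationException:
                         //   fieldIndex on a Row without schema is undefined.
{code}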

 

Following is the code that reproduces the scenario:

 

{code}
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

import scala.collection.mutable.ListBuffer

object Test {
  def main(args: Array[String]): Unit = {
    val values = List(List("1", "One"), List("1", "Two"), List("2", "Three"), List("2", "4"))
      .map(x => (x(0), x(1)))

    val session = SparkSession.builder.config("spark.master", "local").getOrCreate
    import session.implicits._

    val dataFrame = values.toDF
    dataFrame.show()
    dataFrame.printSchema()

    // First groupByKey/flatMapGroups: append a "Count" column (the group size) to each row.
    val newSchema = StructType(dataFrame.schema.fields ++ Array(StructField("Count", IntegerType, false)))
    val expr = RowEncoder.apply(newSchema)
    val tranform = dataFrame.groupByKey(row => row.getAs[String]("_1")).flatMapGroups((key, inputItr) => {
      val inputSeq = inputItr.toSeq
      val counter = inputSeq.size
      val listBuff = new ListBuffer[Row]()
      for (x <- inputSeq) {
        listBuff += Row.fromSeq(x.toSeq ++ Array[Int](counter))
      }
      listBuff.iterator
    })(expr)
    tranform.show

    // Second groupByKey on the result of the first: the key extractor's
    // getAs[String]("_1") throws, because the incoming rows carry no schema.
    val newSchema1 = StructType(tranform.schema.fields ++ Array(StructField("Count1", IntegerType, false)))
    val expr1 = RowEncoder.apply(newSchema1)
    val tranform2 = tranform.groupByKey(row => row.getAs[String]("_1")).flatMapGroups((key, inputItr) => {
      val inputSeq = inputItr.toSeq
      val counter = inputSeq.size
      val listBuff = new ListBuffer[Row]()
      for (x <- inputSeq) {
        listBuff += Row.fromSeq(x.toSeq ++ Array[Int](counter))
      }
      listBuff.iterator
    })(expr1)
    tranform2.show
  }
}
{code}
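
A possible workaround (an untested sketch, not a confirmed fix): key the second groupByKey by position, e.g. row.getString(0), or have the first flatMapGroups emit rows that carry the schema, for example via GenericRowWithSchema:

{code}
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

val schemaWithCount = StructType(Seq(
  StructField("_1", StringType, true),
  StructField("_2", StringType, true),
  StructField("Count", IntegerType, false)))

// A row that carries its schema can answer name-based lookups, so a
// subsequent groupByKey(row => row.getAs[String]("_1")) can resolve the field.
val withSchema = new GenericRowWithSchema(Array("1", "One", 2), schemaWithCount)
withSchema.getAs[String]("_1") // "1"
{code}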

 



