Posted to user@spark.apache.org by Matt Smith <ma...@gmail.com> on 2016/10/25 06:10:53 UTC

Grouping into Arrays

I worked up the following for grouping a DataFrame by a key and aggregating
one column into arrays. It works, but I think it is horrible. Is there a
better way, especially one that does not require RDDs? This is a common
pattern for us: we often explode JSON arrays, enrich the data, then collapse
it back into a structure similar to the pre-exploded one, but with the
enriched data. collect_list seems to be the function I am looking for, but
it only works with Hive and only with primitives. Help?

thx.

  import org.apache.spark.sql.{DataFrame, Row}
  import org.apache.spark.sql.types.{ArrayType, StructField, StructType}

  def groupToArray(df: DataFrame, groupByCols: Seq[String], arrayCol: String): DataFrame = {
    val sourceSchema = df.schema
    // Output column: an array of the source column's element type.
    val arrayField = StructField(arrayCol, ArrayType(sourceSchema(arrayCol).dataType))
    val groupByIndexes = groupByCols.map(colName => sourceSchema.fieldIndex(colName))
    val arrayIndex = sourceSchema.fieldIndex(arrayCol)
    // Output schema: the grouping columns followed by the array column.
    val destSchema = StructType(
      groupByCols.map(colName => sourceSchema(colName)) :+
      arrayField
    )
    val rowRdd = df
      .rdd
      .groupBy(r => groupByIndexes.map(r(_)))
      .map { case (_, rowsIter) =>
        val rowValues = rowsIter.head.toSeq
        // Materialize as a Seq so the value matches the ArrayType field.
        val arr = rowsIter.map(r => r(arrayIndex)).toSeq
        val keys = groupByIndexes.map(ndx => rowValues(ndx))
        Row.fromSeq(keys :+ arr)
      }

    df.sqlContext.createDataFrame(rowRdd, destSchema)
  }
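
For comparison, here is a minimal sketch of the same grouping done without RDDs. It assumes Spark 2.0 or later, where collect_list is a native aggregate that does not need Hive and accepts non-primitive columns such as structs. The object name CollectListSketch, the function name groupToArrayNative, and the sample column names are hypothetical and used only for illustration.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, collect_list}

object CollectListSketch {
  // Hypothetical helper: same signature as groupToArray, but stays in the DataFrame API.
  // Assumes Spark 2.0+, where collect_list does not require a HiveContext.
  def groupToArrayNative(df: DataFrame, groupByCols: Seq[String], arrayCol: String): DataFrame =
    df.groupBy(groupByCols.map(col): _*)
      .agg(collect_list(col(arrayCol)).as(arrayCol))

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("collect-list-sketch")
      .getOrCreate()
    import spark.implicits._

    // Sample data with hypothetical column names.
    val df = Seq((1, "a"), (1, "b"), (2, "c")).toDF("id", "value")
    groupToArrayNative(df, Seq("id"), "value").show()

    spark.stop()
  }
}
```

Two differences from the RDD version are worth keeping in mind: collect_list skips null values, and the order of elements within each array is not guaranteed after the shuffle.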