Posted to user@spark.apache.org by Matt Smith <ma...@gmail.com> on 2016/10/25 06:10:53 UTC
Grouping into Arrays
I worked up the following for grouping a DataFrame by a key and aggregating
a column into arrays. It works, but I think it is horrible. Is there a better
way, especially one that does not require RDDs? This is a common pattern for
us: we often want to explode JSON arrays, do something to enrich the data,
then collapse it back into a structure similar to the pre-exploded one, but
with the enriched data. collect_list seems to be what I am looking for, but
it only works with Hive and only with primitives. Help?
thx.
import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.types.{ArrayType, StructField, StructType}

def groupToArray(df: DataFrame, groupByCols: Seq[String], arrayCol: String): DataFrame = {
  val sourceSchema = df.schema
  // Output column: an array whose element type is the source column's type.
  val arrayField = StructField(arrayCol, ArrayType(sourceSchema(arrayCol).dataType))
  val groupByIndexes = groupByCols.map(colName => sourceSchema.fieldIndex(colName))
  val arrayIndex = sourceSchema.fieldIndex(arrayCol)
  // Result schema: the group-by columns followed by the array column.
  val destSchema = StructType(
    groupByCols.map(colName => sourceSchema(colName)) :+ arrayField
  )
  val rowRdd = df
    .rdd
    .groupBy(r => groupByIndexes.map(r(_)))
    .map { case (keys, rowsIter) =>
      // Convert to a Seq, the Scala type used for an ArrayType column.
      val arr = rowsIter.map(r => r(arrayIndex)).toSeq
      // The group key already holds the group-by values in column order.
      Row.fromSeq(keys :+ arr)
    }
  df.sqlContext.createDataFrame(rowRdd, destSchema)
}
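
To make the explode / enrich / collapse pattern described above concrete, here is a minimal sketch on plain Scala collections rather than DataFrames. It is only an illustration of the shape of the transformation: the Order, EnrichedItem and EnrichedOrder types and the "name length" enrichment are hypothetical, invented for this example, and nothing here uses Spark.

```scala
// Hypothetical record types, for illustration only (not from the post).
final case class Order(id: Int, items: Seq[String])
final case class EnrichedItem(name: String, length: Int)
final case class EnrichedOrder(id: Int, items: Seq[EnrichedItem])

object ExplodeEnrichCollapse {
  // Explode: emit one (key, element) pair per array element.
  // Note: an order with an empty items list produces no pairs at all.
  def explode(orders: Seq[Order]): Seq[(Int, String)] =
    orders.flatMap(o => o.items.map(item => (o.id, item)))

  // Enrich: attach derived data to each exploded element
  // (here simply the length of the name, as a stand-in).
  def enrich(rows: Seq[(Int, String)]): Seq[(Int, EnrichedItem)] =
    rows.map { case (id, name) => (id, EnrichedItem(name, name.length)) }

  // Collapse: group by key and gather the enriched elements back
  // into a sequence, sorted by key for a deterministic result.
  def collapse(rows: Seq[(Int, EnrichedItem)]): Seq[EnrichedOrder] =
    rows.groupBy(_._1).toSeq.sortBy(_._1).map { case (id, grouped) =>
      EnrichedOrder(id, grouped.map(_._2))
    }
}
```

The collapse step plays the same role as groupToArray above: the key columns survive once per group and the enriched values end up in an array. One caveat this sketch makes visible is that keys whose array was empty disappear after the explode step, so they would have to be joined back in if they matter.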