Posted to user@spark.apache.org by Everett Anderson <ev...@nuna.com.INVALID> on 2017/05/01 16:47:35 UTC
Re: Calculate mode separately for multiple columns in row
Two more ways:
*Using the Typed Dataset API with Rows*
Caveat: The docs about flatMapGroups do warn "This function does not
support partial aggregation, and as a result requires shuffling all the
data in the Dataset. If an application intends to perform an aggregation
over each key, it is best to use the reduce function or an Aggregator."
import scala.collection.mutable

class FrequencyCalculator[T]() {
  private val frequencies: mutable.Map[T, Long] = mutable.Map[T, Long]()

  def update(value: T): Unit = {
    frequencies.put(value, frequencies.getOrElse(value, 0L) + 1L)
  }

  def getMode: T = {
    frequencies.maxBy(_._2)._1
  }
}
val encoder = RowEncoder.apply(input.schema)

val output = input
  .groupByKey(_.getString(0))
  .flatMapGroups((k, rows) => {
    val colorCalculator = new FrequencyCalculator[String]()
    val featureCalculator = new FrequencyCalculator[String]()
    rows.foreach(r => {
      colorCalculator.update(r.getString(r.fieldIndex("color")))
      featureCalculator.update(r.getString(r.fieldIndex("feature")))
    })
    Iterator(Row(k, colorCalculator.getMode, featureCalculator.getMode))
  })(encoder)
*Using SQL API with Self-Joins Per Column*
Caveat: I imagine this is inefficient when you need the mode for a large set
of columns, since it does one join per column.
object SqlModeUtils {
  // Calculates a table of group column => mode for the data column.
  private def getModeSubTable(df: DataFrame, groupCol: Column, dataCol: Column): DataFrame = {
    val counts = df.groupBy(groupCol, dataCol).count
    val windowSpec = Window.partitionBy(groupCol).orderBy(counts("count").desc)
    counts
      .withColumn("rank", functions.dense_rank().over(windowSpec))
      .where("rank = 1")
      .select(groupCol, dataCol)
  }

  // Calculates a table of group col => data1 mode, data2 mode, data3 mode.
  def getModePerColumn(groupCol: String, df: DataFrame, dataCols: String*): DataFrame = {
    dataCols
      .map(dataCol => getModeSubTable(df, df(groupCol), df(dataCol)))
      .reduce((left: DataFrame, right: DataFrame) => left.join(right, groupCol))
  }
}
val output = SqlModeUtils.getModePerColumn("creature", input, "color", "feature")
I believe this is equivalent to the SQL
SELECT color_mode.creature,
       color_mode.color,
       feature_mode.feature
FROM (SELECT creature,
             color
      FROM (SELECT creature,
                   color,
                   DENSE_RANK() OVER (PARTITION BY creature
                                      ORDER BY color_count DESC) AS rank
            FROM (SELECT creature,
                         color,
                         COUNT(*) AS color_count
                  FROM data
                  GROUP BY creature, color) color_counts) ranked_colors
      WHERE rank = 1) AS color_mode
     JOIN (SELECT creature,
                  feature
           FROM (SELECT creature,
                        feature,
                        DENSE_RANK() OVER (PARTITION BY creature
                                           ORDER BY feature_count DESC) AS rank
                 FROM (SELECT creature,
                              feature,
                              COUNT(*) AS feature_count
                       FROM data
                       GROUP BY creature, feature) feature_counts) ranked_features
           WHERE rank = 1) AS feature_mode
       ON color_mode.creature = feature_mode.creature
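For the curious, here's a self-contained way to try the color half of that pattern against the thread's toy data (a local SparkSession and a "data" temp view are my assumptions; note I filter the rank in an outer WHERE clause, since a window function's result can't be filtered in the query that computes it):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[1]")
  .appName("mode-check")
  .getOrCreate()
import spark.implicits._

// The toy data from later in this thread.
val input = Seq(
  ("catosaur", "black", "claws"),
  ("catosaur", "orange", "scales"),
  ("catosaur", "black", "scales"),
  ("catosaur", "orange", "scales"),
  ("catosaur", "black", "spikes"),
  ("bearcopter", "gray", "claws"),
  ("bearcopter", "black", "fur"),
  ("bearcopter", "gray", "flight"),
  ("bearcopter", "gray", "flight")
).toDF("creature", "color", "feature")

input.createOrReplaceTempView("data")

// group-by + count, rank counts per creature, keep rank 1 = the mode.
val colorMode = spark.sql("""
  SELECT creature, color
  FROM (SELECT creature,
               color,
               DENSE_RANK() OVER (PARTITION BY creature
                                  ORDER BY color_count DESC) AS rank
        FROM (SELECT creature, color, COUNT(*) AS color_count
              FROM data
              GROUP BY creature, color) color_counts) ranked_colors
  WHERE rank = 1
""")
```

With this data there are no count ties; with ties, DENSE_RANK would keep multiple rows per creature.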
On Thu, Apr 27, 2017 at 9:40 AM, Everett Anderson <ev...@nuna.com> wrote:
> For the curious, I played around with a UDAF for this (shown below). On
> the downside, it assembles a Map of all possible values of the column
> that'll need to be stored in memory somewhere.
>
> I suspect some kind of sorted groupByKey + cogroup could stream values
> through, though might not support partial aggregation, then. Will try that
> next.
>
> /**
>  * [[UserDefinedAggregateFunction]] for computing the mode of a string column.
>  *
>  * WARNING: This will assemble a Map of all possible values in memory.
>  *
>  * It'll ignore null values and return null if all values are null.
>  */
> class ModeAggregateFunction extends UserDefinedAggregateFunction {
>
>   override def inputSchema: StructType =
>     StructType(StructField("value", StringType) :: Nil)
>
>   override def bufferSchema: StructType = StructType(
>     StructField("values", MapType(StringType, LongType, valueContainsNull = false)) :: Nil)
>
>   override def dataType: DataType = StringType
>
>   override def deterministic: Boolean = true
>
>   // This is the initial value for your buffer schema.
>   override def initialize(buffer: MutableAggregationBuffer): Unit = {
>     buffer(0) = Map[String, Long]()
>   }
>
>   override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
>     if (input == null || input.getString(0) == null) {
>       return
>     }
>
>     val value = input.getString(0)
>     val frequencies = buffer.getAs[Map[String, Long]](0)
>     val count = frequencies.getOrElse(value, 0L)
>
>     buffer(0) = frequencies + (value -> (count + 1L))
>   }
>
>   override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
>     val frequencies1 = buffer1.getAs[Map[String, Long]](0)
>     val frequencies2 = buffer2.getAs[Map[String, Long]](0)
>
>     buffer1(0) = frequencies1 ++ frequencies2.map({
>       case (k: String, v: Long) => k -> (v + frequencies1.getOrElse(k, 0L))
>     })
>   }
>
>   override def evaluate(buffer: Row): Any = {
>     val frequencies = buffer.getAs[Map[String, Long]](0)
>     if (frequencies.isEmpty) {
>       return null
>     }
>     frequencies.maxBy(_._2)._1
>   }
> }
>
>
>
>
> On Wed, Apr 26, 2017 at 10:21 AM, Everett Anderson <ev...@nuna.com>
> wrote:
>
>> Hi,
>>
>> One common situation I run across is that I want to compact my data and
>> select the mode (most frequent value) in several columns for each group.
>>
>> Even calculating mode for one column in SQL is a bit tricky. The ways
>> I've seen usually involve a nested sub-select with a group by + count and
>> then a window function using rank().
>>
>> However, what if you want to calculate the mode for several columns,
>> producing a new row with the results? And let's say the set of columns is
>> only known at runtime.
>>
>> In Spark SQL, I start going down a road of many self-joins. The more
>> efficient way leads me to either RDD[Row] or Dataset[Row] where I could do
>> a groupByKey + flatMapGroups, keeping state as I iterate over the Rows in
>> each group.
>>
>> What's the best way?
>>
>> Here's a contrived example:
>>
>> val input = spark.sparkContext.parallelize(Seq(
>>     ("catosaur", "black", "claws"),
>>     ("catosaur", "orange", "scales"),
>>     ("catosaur", "black", "scales"),
>>     ("catosaur", "orange", "scales"),
>>     ("catosaur", "black", "spikes"),
>>     ("bearcopter", "gray", "claws"),
>>     ("bearcopter", "black", "fur"),
>>     ("bearcopter", "gray", "flight"),
>>     ("bearcopter", "gray", "flight")))
>>   .toDF("creature", "color", "feature")
>>
>> +----------+------+-------+
>> |creature |color |feature|
>> +----------+------+-------+
>> |catosaur |black |claws |
>> |catosaur |orange|scales |
>> |catosaur |black |scales |
>> |catosaur |orange|scales |
>> |catosaur |black |spikes |
>> |bearcopter|gray |claws |
>> |bearcopter|black |fur |
>> |bearcopter|gray |flight |
>> |bearcopter|gray |flight |
>> +----------+------+-------+
>>
>> val expectedOutput = spark.sparkContext.parallelize(Seq(
>>     ("catosaur", "black", "scales"),
>>     ("bearcopter", "gray", "flight")))
>>   .toDF("creature", "color", "feature")
>>
>> +----------+-----+-------+
>> |creature |color|feature|
>> +----------+-----+-------+
>> |catosaur |black|scales |
>> |bearcopter|gray |flight |
>> +----------+-----+-------+