Posted to user@spark.apache.org by Everett Anderson <ev...@nuna.com.INVALID> on 2017/05/01 16:47:35 UTC

Re: Calculate mode separately for multiple columns in row

Two more ways:

*Using the Typed Dataset API with Rows*

Caveat: The docs about flatMapGroups do warn "This function does not
support partial aggregation, and as a result requires shuffling all the
data in the Dataset. If an application intends to perform an aggregation
over each key, it is best to use the reduce function or an Aggregator."

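import scala.collection.mutable

// Counts occurrences of each value and reports the most frequent one.
class FrequencyCalculator[T]() {
  private val frequencies: mutable.Map[T, Long] = mutable.Map[T, Long]()

  def update(value: T): Unit = {
    frequencies.put(value, frequencies.getOrElse(value, 0L) + 1L)
  }

  // Ties are broken arbitrarily; throws if no value was ever added.
  def getMode: T = {
    frequencies.maxBy(_._2)._1
  }
}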

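import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.encoders.RowEncoder

// The output rows reuse the input schema (creature, color, feature).
val encoder = RowEncoder.apply(input.schema)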

val output = input
    .groupByKey(_.getString(0))
    .flatMapGroups((k, rows) => {
        val colorCalculator = new FrequencyCalculator[String]()
        val featureCalculator = new FrequencyCalculator[String]()
        rows.foreach(r => {
          colorCalculator.update(r.getString(r.fieldIndex("color")))
          featureCalculator.update(r.getString(r.fieldIndex("feature")))
        })

        Iterator(Row(k, colorCalculator.getMode, featureCalculator.getMode))
    }) (encoder)
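
The flatMapGroups caveat quoted above recommends reduce or an Aggregator because those can combine partial results before the shuffle. Here is a minimal plain-Scala sketch of that idea for the mode. It is not Spark code: ModeAggregation, Counts and the two partitions are hypothetical names for illustration. The four functions mirror the zero/reduce/merge/finish methods of Spark's Aggregator, which would additionally need a bufferEncoder and an outputEncoder. Like the UDAF further down the thread, it keeps a map of all distinct values in memory.

```scala
// Plain-Scala sketch of partial aggregation for the mode; no Spark required.
object ModeAggregation {
  type Counts = Map[String, Long]

  // Empty buffer for a partition that has seen no values yet.
  val zero: Counts = Map.empty

  // Fold one value into a partition-local buffer, ignoring nulls.
  def reduce(counts: Counts, value: String): Counts =
    if (value == null) counts
    else counts.updated(value, counts.getOrElse(value, 0L) + 1L)

  // Combine two partial buffers by summing the counts of shared keys.
  def merge(left: Counts, right: Counts): Counts =
    right.foldLeft(left) { case (acc, (value, n)) =>
      acc.updated(value, acc.getOrElse(value, 0L) + n)
    }

  // Pick the most frequent value; None when no non-null value was seen.
  def finish(counts: Counts): Option[String] =
    if (counts.isEmpty) None else Some(counts.maxBy(_._2)._1)
}

// catosaur colors from the example data, split across two hypothetical partitions.
val partition1 = Seq("black", "orange", "black")
val partition2 = Seq("orange", "black")

// Each partition is reduced on its own, then only the small count maps are merged.
val partial1 = partition1.foldLeft(ModeAggregation.zero)(ModeAggregation.reduce)
val partial2 = partition2.foldLeft(ModeAggregation.zero)(ModeAggregation.reduce)
val colorMode = ModeAggregation.finish(ModeAggregation.merge(partial1, partial2))
```

Only the per-partition count maps would need to cross the network in this scheme, rather than every row of the group.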

*Using SQL API with Self-Joins Per Column*

Caveat: I imagine this is inefficient when you need the mode for a large
set of columns, since it adds one self-join per column.

import org.apache.spark.sql.{Column, DataFrame, functions}
import org.apache.spark.sql.expressions.Window

object SqlModeUtils {
  // Calculates a table of group column => mode for the data column.
  private def getModeSubTable(df: DataFrame, groupCol: Column, dataCol: Column): DataFrame = {
    val counts = df.groupBy(groupCol, dataCol).count
    val windowSpec = Window.partitionBy(groupCol).orderBy(counts("count").desc)

    counts
      .withColumn("rank", functions.dense_rank().over(windowSpec))
      .where("rank = 1")
      .select(groupCol, dataCol)
  }

  // Calculates a table of group col => data1 mode, data2 mode, data3 mode.
  def getModePerColumn(groupCol: String, df: DataFrame, dataCols: String*): DataFrame = {
    dataCols
      .map(dataCol => getModeSubTable(df, df(groupCol), df(dataCol)))
      .reduce((left: DataFrame, right: DataFrame) => left.join(right, groupCol))
  }
}

val output = SqlModeUtils.getModePerColumn("creature", input, "color", "feature")

I believe this is equivalent to the SQL

SELECT color_mode.creature,
       color_mode.color,
       feature_mode.feature
FROM   (SELECT creature,
               color
        FROM   (SELECT creature,
                       color,
                       DENSE_RANK()
                         OVER(
                           PARTITION BY creature
                           ORDER BY color_count DESC) AS rank
                FROM   (SELECT creature,
                               color,
                               COUNT(*) AS color_count
                        FROM   data
                        GROUP  BY creature,
                                  color) AS color_counts) AS color_ranks
        WHERE  rank = 1) AS color_mode
       JOIN (SELECT creature,
                    feature
             FROM   (SELECT creature,
                            feature,
                            DENSE_RANK()
                              OVER(
                                PARTITION BY creature
                                ORDER BY feature_count DESC) AS rank
                     FROM   (SELECT creature,
                                    feature,
                                    COUNT(*) AS feature_count
                             FROM   data
                             GROUP  BY creature,
                                       feature) AS feature_counts) AS feature_ranks
             WHERE  rank = 1) AS feature_mode
         ON color_mode.creature = feature_mode.creature
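
One thing to watch with dense_rank: every value tied for the highest count gets rank 1, so a group with ties yields several rows per column and the join multiplies them. The example data in this thread has no ties, so it is not affected. Below is a small plain-Scala emulation of the count / rank / join steps. The rows and the helper topValues are hypothetical and only stand in for the SQL; this is not Spark code.

```scala
// Hypothetical rows in which catosaur has a tie in both columns.
val rows = Seq(
  ("catosaur", "black", "claws"),
  ("catosaur", "orange", "scales"),
  ("bearcopter", "gray", "flight"))

// Emulates "GROUP BY + COUNT, then DENSE_RANK() = 1": every value whose count
// equals the maximum for its group is kept, so ties survive.
def topValues(pairs: Seq[(String, String)]): Map[String, Seq[String]] =
  pairs.groupBy(_._1).map { case (group, groupPairs) =>
    val counts = groupPairs.groupBy(_._2).map { case (value, vs) => (value, vs.size) }
    val maxCount = counts.values.max
    (group, counts.filter(_._2 == maxCount).keys.toSeq.sorted)
  }

val colorModes = topValues(rows.map(r => (r._1, r._2)))
val featureModes = topValues(rows.map(r => (r._1, r._3)))

// Emulates the inner join on the group column: tied values multiply.
val joined = for {
  (group, colors) <- colorModes.toSeq
  color <- colors
  feature <- featureModes.getOrElse(group, Seq.empty)
} yield (group, color, feature)

// Emulates ROW_NUMBER() = 1 with the value itself as a tie-breaker:
// exactly one row per group.
val single = colorModes.map { case (group, colors) =>
  (group, colors.head, featureModes(group).head)
}.toSeq
```

If exactly one row per group is required, swapping dense_rank for row_number with a deterministic secondary ordering (for example the data column itself) would be one way to get it, at the cost of picking an arbitrary winner among tied values.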


On Thu, Apr 27, 2017 at 9:40 AM, Everett Anderson <ev...@nuna.com> wrote:

> For the curious, I played around with a UDAF for this (shown below). On
> the downside, it assembles a Map of all possible values of the column
> that'll need to be stored in memory somewhere.
>
> I suspect some kind of sorted groupByKey + cogroup could stream values
> through, though it might not support partial aggregation then. Will try
> that next.
>
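> import org.apache.spark.sql.Row
> import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
> import org.apache.spark.sql.types._
>
> /**
>   * [[UserDefinedAggregateFunction]] for computing the mode of a string column.
>   *
>   * WARNING: This will assemble a Map of all possible values in memory.
>   *
>   * It'll ignore null values and return null if all values are null.
>   */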
> class ModeAggregateFunction extends UserDefinedAggregateFunction {
>
>   override def inputSchema: StructType = StructType(StructField("value", StringType) :: Nil)
>
>   override def bufferSchema: StructType = StructType(
>     StructField("values", MapType(StringType, LongType, valueContainsNull = false)) :: Nil)
>
>   override def dataType: DataType = StringType
>
>   override def deterministic: Boolean = true
>
>   // This is the initial value for your buffer schema.
>   override def initialize(buffer: MutableAggregationBuffer): Unit = {
>     buffer(0) = Map[String, Long]()
>   }
>
>   override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
>     if (input == null || input.getString(0) == null) {
>       return
>     }
>
>     val value = input.getString(0)
>     val frequencies = buffer.getAs[Map[String, Long]](0)
>     val count = frequencies.getOrElse(value, 0L)
>
>     buffer(0) = frequencies + (value -> (count + 1L))
>   }
>
>   override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
>     val frequencies1: Map[String, Long] = buffer1.getAs[Map[String, Long]](0)
>     val frequencies2: Map[String, Long] = buffer2.getAs[Map[String, Long]](0)
>
>     buffer1(0) = frequencies1 ++ frequencies2.map({
>       case (k: String, v: Long) => k -> (v + frequencies1.getOrElse(k, 0L))
>     })
>   }
>
>   override def evaluate(buffer: Row): Any = {
>     val frequencies = buffer.getAs[Map[String, Long]](0)
>     if (frequencies.isEmpty) {
>       return null
>     }
>     frequencies.maxBy(_._2)._1
>   }
> }
>
>
>
>
> On Wed, Apr 26, 2017 at 10:21 AM, Everett Anderson <ev...@nuna.com> wrote:
>
>> Hi,
>>
>> One common situation I run across is that I want to compact my data and
>> select the mode (most frequent value) in several columns for each group.
>>
>> Even calculating mode for one column in SQL is a bit tricky. The ways
>> I've seen usually involve a nested sub-select with a group by + count and
>> then a window function using rank().
>>
>> However, what if you want to calculate the mode for several columns,
>> producing a new row with the results? And let's say the set of columns is
>> only known at runtime.
>>
>> In Spark SQL, I start going down a road of many self-joins. The more
>> efficient way leads me to either RDD[Row] or Dataset[Row] where I could do
>> a groupByKey + flatMapGroups, keeping state as I iterate over the Rows in
>> each group.
>>
>> What's the best way?
>>
>> Here's a contrived example:
>>
>> val input = spark.sparkContext.parallelize(Seq(
>>     ("catosaur", "black", "claws"),
>>     ("catosaur", "orange", "scales"),
>>     ("catosaur", "black", "scales"),
>>     ("catosaur", "orange", "scales"),
>>     ("catosaur", "black", "spikes"),
>>     ("bearcopter", "gray", "claws"),
>>     ("bearcopter", "black", "fur"),
>>     ("bearcopter", "gray", "flight"),
>>     ("bearcopter", "gray", "flight")))
>>     .toDF("creature", "color", "feature")
>>
>> +----------+------+-------+
>> |creature  |color |feature|
>> +----------+------+-------+
>> |catosaur  |black |claws  |
>> |catosaur  |orange|scales |
>> |catosaur  |black |scales |
>> |catosaur  |orange|scales |
>> |catosaur  |black |spikes |
>> |bearcopter|gray  |claws  |
>> |bearcopter|black |fur    |
>> |bearcopter|gray  |flight |
>> |bearcopter|gray  |flight |
>> +----------+------+-------+
>>
>> val expectedOutput = spark.sparkContext.parallelize(Seq(
>>     ("catosaur", "black", "scales"),
>>     ("bearcopter", "gray", "flight")))
>>     .toDF("creature", "color", "feature")
>>
>> +----------+-----+-------+
>> |creature  |color|feature|
>> +----------+-----+-------+
>> |catosaur  |black|scales |
>> |bearcopter|gray |flight |
>> +----------+-----+-------+