You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Roberto Mirizzi (JIRA)" <ji...@apache.org> on 2017/01/17 00:42:26 UTC
[jira] [Commented] (SPARK-14409) Investigate adding a
RankingEvaluator to ML
[ https://issues.apache.org/jira/browse/SPARK-14409?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15824774#comment-15824774 ]
Roberto Mirizzi commented on SPARK-14409:
-----------------------------------------
I implemented the RankingEvaluator to be used with ALS. Here's the code:
{code:java}
package org.apache.spark.ml.evaluation
import org.apache.spark.annotation.Experimental
import org.apache.spark.ml.evaluation.Evaluator
import org.apache.spark.ml.param.{Params, Param, ParamMap, ParamValidators}
import org.apache.spark.ml.util.{DefaultParamsReadable, DefaultParamsWritable, Identifiable, SchemaUtils}
import org.apache.spark.mllib.evaluation.RankingMetrics
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{IntegerType, DoubleType, FloatType}
/**
* Created by Roberto Mirizzi on 12/5/16.
*/
/**
* :: Experimental ::
* Evaluator for ranking, which expects two input columns: prediction and label.
*/
@Experimental
final class RankingEvaluator(override val uid: String)
  extends Evaluator with HasUserCol with HasItemCol with HasPredictionCol with HasLabelCol
    with DefaultParamsWritable {

  def this() = this(Identifiable.randomUID("rankEval"))

  /**
   * Param for metric name in evaluation. Supports:
   *  - `"map"` (default): mean average precision
   *  - `"p@k"`: precision@k (1 <= k <= 10)
   *  - `"ndcg@k"`: normalized discounted cumulative gain@k (1 <= k <= 10)
   *
   * @group param
   */
  val metricName: Param[String] = {
    // Generate "map", "p@1".."p@10", "ndcg@1".."ndcg@10" instead of spelling
    // out 21 string literals by hand.
    val supportedMetrics =
      "map" +: ((1 to 10).map(k => s"p@$k") ++ (1 to 10).map(k => s"ndcg@$k"))
    val allowedParams = ParamValidators.inArray(supportedMetrics.toArray)
    new Param(this, "metricName",
      s"metric name in evaluation (${supportedMetrics.mkString("|")})", allowedParams)
  }

  /**
   * Param for the minimum label value for an item to count as relevant
   * ("good") ground truth. Kept as a Param[String] for interface
   * compatibility; use [[getGoodThreshold]]/[[setGoodThreshold]] for the
   * Double view.
   *
   * @group param
   */
  val goodThreshold: Param[String] = {
    new Param(this, "goodThreshold", "threshold for good labels")
  }

  /** @group getParam */
  def getMetricName: String = $(metricName)

  /** @group setParam */
  def setMetricName(value: String): this.type = set(metricName, value)

  /** @group getParam */
  def getGoodThreshold: Double = $(goodThreshold).toDouble

  /** @group setParam */
  def setGoodThreshold(value: Double): this.type = set(goodThreshold, value.toString)

  /** @group setParam */
  def setUserCol(value: String): this.type = set(userCol, value)

  /** @group setParam */
  def setItemCol(value: String): this.type = set(itemCol, value)

  /** @group setParam */
  def setLabelCol(value: String): this.type = set(labelCol, value)

  /** @group setParam */
  def setPredictionCol(value: String): this.type = set(predictionCol, value)

  setDefault(metricName -> "map")
  setDefault(goodThreshold -> "0")

  /**
   * Evaluates ranking quality: for each user, the top-10 items by predicted
   * score are compared against the set of items whose label exceeds
   * `goodThreshold`, using the metric selected by `metricName`.
   *
   * Note: users present in only one of the two sides (no relevant items, or
   * no predictions) are dropped by the inner join and do not contribute.
   */
  override def evaluate(dataset: Dataset[_]): Double = {
    val spark = dataset.sparkSession
    import spark.implicits._

    val schema = dataset.schema
    SchemaUtils.checkNumericType(schema, $(userCol))
    SchemaUtils.checkNumericType(schema, $(itemCol))
    SchemaUtils.checkColumnTypes(schema, $(labelCol), Seq(DoubleType, FloatType))
    SchemaUtils.checkColumnTypes(schema, $(predictionCol), Seq(DoubleType, FloatType))

    // Rank each user's items by descending predicted score and keep the
    // top 10 as that user's recommendation list.
    val byPredictionDesc = Window.partitionBy(col($(userCol))).orderBy(col($(predictionCol)).desc)
    val predictionDataset = dataset.select(
        col($(userCol)).cast(IntegerType),
        col($(itemCol)).cast(IntegerType),
        col($(predictionCol)).cast(FloatType),
        row_number().over(byPredictionDesc).as("rank"))
      .where(col("rank") <= 10)
      .groupBy(col($(userCol)))
      .agg(collect_list(col($(itemCol))).as("prediction_list"))
      .withColumnRenamed($(userCol), "predicted_userId")
      .as[(Int, Array[Int])]

    // Ground truth: every item a user labeled strictly above goodThreshold.
    // Filter before projecting so the label column is still resolvable.
    val actualDataset = dataset
      .where(col($(labelCol)).cast(DoubleType) > getGoodThreshold)
      .select(col($(userCol)).cast(IntegerType), col($(itemCol)).cast(IntegerType))
      .groupBy(col($(userCol)))
      .agg(collect_list(col($(itemCol))).as("actual_list"))
      .withColumnRenamed($(userCol), "actual_userId")
      .as[(Int, Array[Int])]

    // RankingMetrics expects (predicted, actual) pairs, in that order.
    val predictionAndLabels = actualDataset
      .join(predictionDataset, actualDataset("actual_userId") === predictionDataset("predicted_userId"))
      .select("prediction_list", "actual_list")
      .as[(Array[Int], Array[Int])]
      .rdd

    val metrics = new RankingMetrics[Int](predictionAndLabels)

    // Parse "p@k" / "ndcg@k" instead of enumerating 21 cases. The param
    // validator guarantees k is in 1..10, so the match is exhaustive for any
    // value that passed validation.
    val AtK = "(p|ndcg)@(\\d+)".r
    $(metricName) match {
      case "map" => metrics.meanAveragePrecision
      case AtK("p", k) => metrics.precisionAt(k.toInt)
      case AtK("ndcg", k) => metrics.ndcgAt(k.toInt)
    }
  }

  // MAP, precision@k and NDCG@k are all "higher is better", so this is a
  // constant; the original 21-arm match also threw MatchError for any
  // unexpected value instead of failing gracefully.
  override def isLargerBetter: Boolean = true

  override def copy(extra: ParamMap): RankingEvaluator = defaultCopy(extra)
}
/**
 * Companion providing [[DefaultParamsReadable]] persistence support.
 * `load` is overridden only to refine the return type from the generic
 * reader's type to `RankingEvaluator` (useful for Java callers).
 */
object RankingEvaluator extends DefaultParamsReadable[RankingEvaluator] {
  override def load(path: String): RankingEvaluator = super.load(path)
}
/**
 * Trait for shared param userCol (default: "user").
 */
private[evaluation] trait HasUserCol extends Params {
  // NOTE: the qualifier must name the enclosing package `evaluation`
  // (org.apache.spark.ml.evaluation); `private[evaluator]` does not compile.

  /**
   * Param for user column name.
   *
   * @group param
   */
  final val userCol: Param[String] = new Param[String](this, "userCol", "user column name")

  setDefault(userCol, "user")

  /** @group getParam */
  final def getUserCol: String = $(userCol)
}
/**
 * Trait for shared param itemCol (default: "item").
 */
private[evaluation] trait HasItemCol extends Params {
  // NOTE: the qualifier must name the enclosing package `evaluation`
  // (org.apache.spark.ml.evaluation); `private[evaluator]` does not compile.

  /**
   * Param for item column name.
   *
   * @group param
   */
  final val itemCol: Param[String] = new Param[String](this, "itemCol", "item column name")

  setDefault(itemCol, "item")

  /** @group getParam */
  final def getItemCol: String = $(itemCol)
}
/**
 * Trait for shared param labelCol (default: "label").
 */
private[evaluation] trait HasLabelCol extends Params {
  // NOTE: the qualifier must name the enclosing package `evaluation`
  // (org.apache.spark.ml.evaluation); `private[evaluator]` does not compile.

  /**
   * Param for label column name.
   *
   * @group param
   */
  final val labelCol: Param[String] = new Param[String](this, "labelCol", "label column name")

  setDefault(labelCol, "label")

  /** @group getParam */
  final def getLabelCol: String = $(labelCol)
}
/**
 * Trait for shared param predictionCol (default: "prediction").
 */
private[evaluation] trait HasPredictionCol extends Params {
  // NOTE: the qualifier must name the enclosing package `evaluation`
  // (org.apache.spark.ml.evaluation); `private[evaluator]` does not compile.

  /**
   * Param for prediction column name.
   *
   * @group param
   */
  final val predictionCol: Param[String] = new Param[String](this, "predictionCol", "prediction column name")

  setDefault(predictionCol, "prediction")

  /** @group getParam */
  final def getPredictionCol: String = $(predictionCol)
}
{code}
> Investigate adding a RankingEvaluator to ML
> -------------------------------------------
>
> Key: SPARK-14409
> URL: https://issues.apache.org/jira/browse/SPARK-14409
> Project: Spark
> Issue Type: New Feature
> Components: ML
> Reporter: Nick Pentreath
> Priority: Minor
>
> {{mllib.evaluation}} contains a {{RankingMetrics}} class, while there is no {{RankingEvaluator}} in {{ml.evaluation}}. Such an evaluator can be useful for recommendation evaluation (and can be useful in other settings potentially).
> Should be thought about in conjunction with adding the "recommendAll" methods in SPARK-13857, so that top-k ranking metrics can be used in cross-validators.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org