Posted to commits@spark.apache.org by sr...@apache.org on 2022/06/16 13:25:19 UTC

[spark] branch master updated: [SPARK-39446][MLLIB] Add relevance score for nDCG evaluation

This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 81208992cb0 [SPARK-39446][MLLIB] Add relevance score for nDCG evaluation
81208992cb0 is described below

commit 81208992cb007e4addc6148600433da1f1a5c9ab
Author: uchiiii <uc...@gmail.com>
AuthorDate: Thu Jun 16 08:24:57 2022 -0500

    [SPARK-39446][MLLIB] Add relevance score for nDCG evaluation
    
    ### What changes were proposed in this pull request?
    - Add relevance scores to the nDCG evaluation in the `ndcgAt` function.
    - Extend the constructor interface of the `RankingMetrics` class to accept these scores.
    
    ### Why are the changes needed?
    - The precise definition of nDCG is [here on Wikipedia](https://en.wikipedia.org/wiki/Discounted_cumulative_gain), where a relevance score is used. Currently, the nDCG implementation in Spark (MLlib) treats relevance as binary (0 or 1). This PR extends the `ndcgAt` function to handle graded relevance scores.
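    
    For reference, a minimal sketch of DCG/nDCG with graded relevance in plain Scala (illustrative only, not the Spark implementation; the function and parameter names here are made up):
    
    ```scala
    // DCG@k = sum_{i=1..k} (2^{rel_i} - 1) / log(i + 1), with graded relevance.
    // `relevances` lists the relevance of each item in ranked order.
    def dcg(relevances: Seq[Double], k: Int): Double =
      relevances.take(k).zipWithIndex.map { case (r, i) =>
        (math.pow(2.0, r) - 1.0) / math.log(i + 2) // i is 0-based, so this is position i + 1
      }.sum
    
    // nDCG@k divides the DCG of the predicted order by the DCG of the ideal order,
    // i.e. the ground-truth relevances sorted in descending order.
    def ndcg(predictedOrderRel: Seq[Double], groundTruthRel: Seq[Double], k: Int): Double = {
      val ideal = dcg(groundTruthRel.sortBy(r => -r), k)
      if (ideal == 0.0) 0.0 else dcg(predictedOrderRel, k) / ideal
    }
    ```
    
    With binary relevance (0 or 1), the gain `2^rel - 1` reduces to 0 or 1, which is the current behavior.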
    
    ### Does this PR introduce _any_ user-facing change?
    - I extended the constructor interface of the `RankingMetrics` class: it now accepts either `RDD[(Array[T], Array[T], Array[Double])]` or `RDD[(Array[T], Array[T])]`, whereas previously only `RDD[(Array[T], Array[T])]` was accepted. `ndcgAt` uses the relevance values when they are provided.
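    
    A usage sketch of the extended constructor (illustrative values; assumes an existing `SparkContext` named `sc`):
    
    ```scala
    import org.apache.spark.mllib.evaluation.RankingMetrics
    
    // Each record: (predicted ranking, ground truth set, relevance of each ground truth item).
    val withRelevance = sc.parallelize(Seq(
      (Array(1, 6, 2, 7, 8), Array(1, 2, 3), Array(3.0, 2.0, 1.0)),
      (Array(4, 1, 5, 6, 2), Array(1, 2), Array(2.0, 1.0))))
    val metrics = new RankingMetrics(withRelevance)
    metrics.ndcgAt(5) // uses the graded relevance values
    
    // The existing two-element form still works; relevance is then treated as binary.
    val binary = sc.parallelize(Seq((Array(1, 6, 2), Array(1, 2, 3))))
    new RankingMetrics(binary).ndcgAt(5)
    ```
    
    The two-element constructor is kept, so existing callers are unaffected.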
    
    ### How was this patch tested?
    - One test was added in [mllib/src/test/scala/org/apache/spark/mllib/evaluation/RankingMetricsSuite.scala](https://github.com/apache/spark/pull/36843/files#diff-b85de09fb1428c03471c7c46305ebb9171964e9743d3ac5f4cd9bcb14bdcf6fd)
    
    Closes #36843 from uchiiii/add_relevance_to_ndcg.
    
    Authored-by: uchiiii <uc...@gmail.com>
    Signed-off-by: Sean Owen <sr...@gmail.com>
---
 .../spark/mllib/evaluation/RankingMetrics.scala    | 78 +++++++++++++------
 .../mllib/evaluation/RankingMetricsSuite.scala     | 87 ++++++++++++++++------
 2 files changed, 120 insertions(+), 45 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/mllib/evaluation/RankingMetrics.scala b/mllib/src/main/scala/org/apache/spark/mllib/evaluation/RankingMetrics.scala
index 9e35ee2d60f..7fccff9a24e 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/evaluation/RankingMetrics.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/evaluation/RankingMetrics.scala
@@ -32,11 +32,23 @@ import org.apache.spark.rdd.RDD
  *
  * Java users should use `RankingMetrics$.of` to create a [[RankingMetrics]] instance.
  *
- * @param predictionAndLabels an RDD of (predicted ranking, ground truth set) pairs.
+ * @param predictionAndLabels an RDD of (predicted ranking, ground truth set) pairs
+ *                            or (predicted ranking, ground truth set,
+ *                            relevance value of ground truth set) triplets.
+ *                            Since 3.4.0, it supports NDCG evaluation with relevance values.
  */
 @Since("1.2.0")
-class RankingMetrics[T: ClassTag](predictionAndLabels: RDD[(Array[T], Array[T])])
-  extends Logging with Serializable {
+class RankingMetrics[T: ClassTag] @Since("3.4.0") (
+    predictionAndLabels: RDD[(Array[T], Array[T], Array[Double])])
+    extends Logging
+    with Serializable {
+
+  @Since("1.2.0")
+  def this(predictionAndLabelsWithoutRelevance: => RDD[(Array[T], Array[T])]) = {
+    this(predictionAndLabelsWithoutRelevance.map {
+      case (pred, lab) => (pred, lab, Array.empty[Double])
+    })
+  }
 
   /**
    * Compute the average precision of all the queries, truncated at ranking position k.
@@ -58,7 +70,7 @@ class RankingMetrics[T: ClassTag](predictionAndLabels: RDD[(Array[T], Array[T])]
   @Since("1.2.0")
   def precisionAt(k: Int): Double = {
     require(k > 0, "ranking position k should be positive")
-    predictionAndLabels.map { case (pred, lab) =>
+    predictionAndLabels.map { case (pred, lab, _) =>
       countRelevantItemRatio(pred, lab, k, k)
     }.mean()
   }
@@ -70,7 +82,7 @@ class RankingMetrics[T: ClassTag](predictionAndLabels: RDD[(Array[T], Array[T])]
    */
   @Since("1.2.0")
   lazy val meanAveragePrecision: Double = {
-    predictionAndLabels.map { case (pred, lab) =>
+    predictionAndLabels.map { case (pred, lab, _) =>
       val labSet = lab.toSet
       val k = math.max(pred.length, labSet.size)
       averagePrecision(pred, labSet, k)
@@ -87,7 +99,7 @@ class RankingMetrics[T: ClassTag](predictionAndLabels: RDD[(Array[T], Array[T])]
   @Since("3.0.0")
   def meanAveragePrecisionAt(k: Int): Double = {
     require(k > 0, "ranking position k should be positive")
-    predictionAndLabels.map { case (pred, lab) =>
+    predictionAndLabels.map { case (pred, lab, _) =>
       averagePrecision(pred, lab.toSet, k)
     }.mean()
   }
@@ -127,7 +139,7 @@ class RankingMetrics[T: ClassTag](predictionAndLabels: RDD[(Array[T], Array[T])]
    * The discounted cumulative gain at position k is computed as:
    *    sum,,i=1,,^k^ (2^{relevance of ''i''th item}^ - 1) / log(i + 1),
    * and the NDCG is obtained by dividing the DCG value on the ground truth set. In the current
-   * implementation, the relevance value is binary.
+   * implementation, relevance is treated as binary if no relevance values are provided.
 
    * If a query has an empty ground truth set, zero will be used as ndcg together with
    * a log warning.
@@ -142,8 +154,15 @@ class RankingMetrics[T: ClassTag](predictionAndLabels: RDD[(Array[T], Array[T])]
   @Since("1.2.0")
   def ndcgAt(k: Int): Double = {
     require(k > 0, "ranking position k should be positive")
-    predictionAndLabels.map { case (pred, lab) =>
+    predictionAndLabels.map { case (pred, lab, rel) =>
+      val useBinary = rel.isEmpty
       val labSet = lab.toSet
+      val relMap = lab.zip(rel).toMap
+      if (!useBinary && lab.size != rel.size) {
+        logWarning(
+          "# of ground truth set and # of relevance value set should be equal, " +
+            "check input data")
+      }
 
       if (labSet.nonEmpty) {
         val labSetSize = labSet.size
@@ -152,18 +171,32 @@ class RankingMetrics[T: ClassTag](predictionAndLabels: RDD[(Array[T], Array[T])]
         var dcg = 0.0
         var i = 0
         while (i < n) {
-          // Base of the log doesn't matter for calculating NDCG,
-          // if the relevance value is binary.
-          val gain = 1.0 / math.log(i + 2)
-          if (i < pred.length && labSet.contains(pred(i))) {
-            dcg += gain
-          }
-          if (i < labSetSize) {
-            maxDcg += gain
+          if (useBinary) {
+            // Base of the log doesn't matter for calculating NDCG,
+            // if the relevance value is binary.
+            val gain = 1.0 / math.log(i + 2)
+            if (i < pred.length && labSet.contains(pred(i))) {
+              dcg += gain
+            }
+            if (i < labSetSize) {
+              maxDcg += gain
+            }
+          } else {
+            if (i < pred.length) {
+              dcg += (math.pow(2.0, relMap.getOrElse(pred(i), 0.0)) - 1) / math.log(i + 2)
+            }
+            if (i < labSetSize) {
+              maxDcg += (math.pow(2.0, relMap.getOrElse(lab(i), 0.0)) - 1) / math.log(i + 2)
+            }
           }
           i += 1
         }
-        dcg / maxDcg
+        if (maxDcg == 0.0) {
+          logWarning("Maximum of relevance of ground truth set is zero, check input data")
+          0.0
+        } else {
+          dcg / maxDcg
+        }
       } else {
         logWarning("Empty ground truth set, check input data")
         0.0
@@ -191,7 +224,7 @@ class RankingMetrics[T: ClassTag](predictionAndLabels: RDD[(Array[T], Array[T])]
   @Since("3.0.0")
   def recallAt(k: Int): Double = {
     require(k > 0, "ranking position k should be positive")
-    predictionAndLabels.map { case (pred, lab) =>
+    predictionAndLabels.map { case (pred, lab, _) =>
       countRelevantItemRatio(pred, lab, k, lab.toSet.size)
     }.mean()
   }
@@ -207,10 +240,11 @@ class RankingMetrics[T: ClassTag](predictionAndLabels: RDD[(Array[T], Array[T])]
    * @param denominator the denominator of ratio
    * @return relevant item ratio at the first k ranking positions
    */
-  private def countRelevantItemRatio(pred: Array[T],
-                                     lab: Array[T],
-                                     k: Int,
-                                     denominator: Int): Double = {
+  private def countRelevantItemRatio(
+      pred: Array[T],
+      lab: Array[T],
+      k: Int,
+      denominator: Int): Double = {
     val labSet = lab.toSet
     if (labSet.nonEmpty) {
       val n = math.min(pred.length, k)
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/evaluation/RankingMetricsSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/evaluation/RankingMetricsSuite.scala
index 489eb15f4db..a10cb5c9a4e 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/evaluation/RankingMetricsSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/evaluation/RankingMetricsSuite.scala
@@ -28,20 +28,20 @@ class RankingMetricsSuite extends SparkFunSuite with MLlibTestSparkContext {
       Seq(
         (Array(1, 6, 2, 7, 8, 3, 9, 10, 4, 5), Array(1, 2, 3, 4, 5)),
         (Array(4, 1, 5, 6, 2, 7, 3, 8, 9, 10), Array(1, 2, 3)),
-        (Array(1, 2, 3, 4, 5), Array.empty[Int])
-      ), 2)
-    val eps = 1.0E-5
+        (Array(1, 2, 3, 4, 5), Array.empty[Int])),
+      2)
+    val eps = 1.0e-5
 
     val metrics = new RankingMetrics(predictionAndLabels)
     val map = metrics.meanAveragePrecision
 
-    assert(metrics.precisionAt(1) ~== 1.0/3 absTol eps)
-    assert(metrics.precisionAt(2) ~== 1.0/3 absTol eps)
-    assert(metrics.precisionAt(3) ~== 1.0/3 absTol eps)
-    assert(metrics.precisionAt(4) ~== 0.75/3 absTol eps)
-    assert(metrics.precisionAt(5) ~== 0.8/3 absTol eps)
-    assert(metrics.precisionAt(10) ~== 0.8/3 absTol eps)
-    assert(metrics.precisionAt(15) ~== 8.0/45 absTol eps)
+    assert(metrics.precisionAt(1) ~== 1.0 / 3 absTol eps)
+    assert(metrics.precisionAt(2) ~== 1.0 / 3 absTol eps)
+    assert(metrics.precisionAt(3) ~== 1.0 / 3 absTol eps)
+    assert(metrics.precisionAt(4) ~== 0.75 / 3 absTol eps)
+    assert(metrics.precisionAt(5) ~== 0.8 / 3 absTol eps)
+    assert(metrics.precisionAt(10) ~== 0.8 / 3 absTol eps)
+    assert(metrics.precisionAt(15) ~== 8.0 / 45 absTol eps)
 
     assert(map ~== 0.355026 absTol eps)
 
@@ -49,27 +49,68 @@ class RankingMetricsSuite extends SparkFunSuite with MLlibTestSparkContext {
     assert(metrics.meanAveragePrecisionAt(2) ~== 0.25 absTol eps)
     assert(metrics.meanAveragePrecisionAt(3) ~== 0.24074 absTol eps)
 
-    assert(metrics.ndcgAt(3) ~== 1.0/3 absTol eps)
+    assert(metrics.ndcgAt(3) ~== 1.0 / 3 absTol eps)
     assert(metrics.ndcgAt(5) ~== 0.328788 absTol eps)
     assert(metrics.ndcgAt(10) ~== 0.487913 absTol eps)
     assert(metrics.ndcgAt(15) ~== metrics.ndcgAt(10) absTol eps)
 
-    assert(metrics.recallAt(1) ~== 1.0/15 absTol eps)
-    assert(metrics.recallAt(2) ~== 8.0/45 absTol eps)
-    assert(metrics.recallAt(3) ~== 11.0/45 absTol eps)
-    assert(metrics.recallAt(4) ~== 11.0/45 absTol eps)
-    assert(metrics.recallAt(5) ~== 16.0/45 absTol eps)
-    assert(metrics.recallAt(10) ~== 2.0/3 absTol eps)
-    assert(metrics.recallAt(15) ~== 2.0/3 absTol eps)
+    assert(metrics.recallAt(1) ~== 1.0 / 15 absTol eps)
+    assert(metrics.recallAt(2) ~== 8.0 / 45 absTol eps)
+    assert(metrics.recallAt(3) ~== 11.0 / 45 absTol eps)
+    assert(metrics.recallAt(4) ~== 11.0 / 45 absTol eps)
+    assert(metrics.recallAt(5) ~== 16.0 / 45 absTol eps)
+    assert(metrics.recallAt(10) ~== 2.0 / 3 absTol eps)
+    assert(metrics.recallAt(15) ~== 2.0 / 3 absTol eps)
   }
 
-  test("MAP, NDCG, Recall with few predictions (SPARK-14886)") {
+  test("Ranking metrics: NDCG with relevance") {
     val predictionAndLabels = sc.parallelize(
       Seq(
-        (Array(1, 6, 2), Array(1, 2, 3, 4, 5)),
-        (Array.empty[Int], Array(1, 2, 3))
-      ), 2)
-    val eps = 1.0E-5
+        (
+          Array(1, 6, 2, 7, 8, 3, 9, 10, 4, 5),
+          Array(1, 2, 3, 4, 5),
+          Array(3.0, 2.0, 1.0, 1.0, 1.0)),
+        (Array(4, 1, 5, 6, 2, 7, 3, 8, 9, 10), Array(1, 2, 3), Array(2.0, 0.0, 0.0)),
+        (Array(1, 2, 3, 4, 5), Array.empty[Int], Array.empty[Double])),
+      2)
+    val eps = 1.0e-5
+
+    val metrics = new RankingMetrics(predictionAndLabels)
+    val map = metrics.meanAveragePrecision
+
+    assert(metrics.precisionAt(1) ~== 1.0 / 3 absTol eps)
+    assert(metrics.precisionAt(2) ~== 1.0 / 3 absTol eps)
+    assert(metrics.precisionAt(3) ~== 1.0 / 3 absTol eps)
+    assert(metrics.precisionAt(4) ~== 0.75 / 3 absTol eps)
+    assert(metrics.precisionAt(5) ~== 0.8 / 3 absTol eps)
+    assert(metrics.precisionAt(10) ~== 0.8 / 3 absTol eps)
+    assert(metrics.precisionAt(15) ~== 8.0 / 45 absTol eps)
+
+    assert(map ~== 0.355026 absTol eps)
+
+    assert(metrics.meanAveragePrecisionAt(1) ~== 0.333334 absTol eps)
+    assert(metrics.meanAveragePrecisionAt(2) ~== 0.25 absTol eps)
+    assert(metrics.meanAveragePrecisionAt(3) ~== 0.24074 absTol eps)
+
+    assert(metrics.ndcgAt(3) ~== 0.511959 absTol eps)
+    assert(metrics.ndcgAt(5) ~== 0.487806 absTol eps)
+    assert(metrics.ndcgAt(10) ~== 0.518700 absTol eps)
+    assert(metrics.ndcgAt(15) ~== metrics.ndcgAt(10) absTol eps)
+
+    assert(metrics.recallAt(1) ~== 1.0 / 15 absTol eps)
+    assert(metrics.recallAt(2) ~== 8.0 / 45 absTol eps)
+    assert(metrics.recallAt(3) ~== 11.0 / 45 absTol eps)
+    assert(metrics.recallAt(4) ~== 11.0 / 45 absTol eps)
+    assert(metrics.recallAt(5) ~== 16.0 / 45 absTol eps)
+    assert(metrics.recallAt(10) ~== 2.0 / 3 absTol eps)
+    assert(metrics.recallAt(15) ~== 2.0 / 3 absTol eps)
+  }
+
+  test("MAP, NDCG, Recall with few predictions (SPARK-14886)") {
+    val predictionAndLabels = sc.parallelize(
+      Seq((Array(1, 6, 2), Array(1, 2, 3, 4, 5)), (Array.empty[Int], Array(1, 2, 3))),
+      2)
+    val eps = 1.0e-5
 
     val metrics = new RankingMetrics(predictionAndLabels)
     assert(metrics.precisionAt(1) ~== 0.5 absTol eps)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org