You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by daniloascione <gi...@git.apache.org> on 2017/01/17 15:33:17 UTC

[GitHub] spark pull request #16618: [SPARK-14409][ML] Add RankingEvaluator

GitHub user daniloascione opened a pull request:

    https://github.com/apache/spark/pull/16618

    [SPARK-14409][ML] Add RankingEvaluator

    ## What changes were proposed in this pull request?
    
    This patch adds the implementation of a Dataframe api based RankingEvaluator to ML (ml.evaluation)
    
    ## How was this patch tested?
    
    Additional test cast has been added.


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/daniloascione/spark SPARK-14409

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/16618.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #16618
    
----
commit c93ab86d35984e9f70a3b4f543fb88f5541333f0
Author: Danilo Ascione <da...@gmail.com>
Date:   2017-01-05T09:34:23Z

    [SPARK-14409][ML] Add RankingEvaluator and MPR metric

commit bfd7dc5f3d08cbef311b7e4828c22efedf2117d8
Author: Danilo Ascione <da...@gmail.com>
Date:   2017-01-10T17:30:15Z

    [SPARK-14409][ML] Handle NaN in predictions

commit 3b23bfb035514ce2a039b03d3e4ecb881f68a0f6
Author: Danilo Ascione <da...@gmail.com>
Date:   2017-01-17T15:22:57Z

    [SPARK-14409][ML] Add basic test

commit ca212db97632e71e9f999b0eac7fe43adb9ee3c6
Author: Danilo Ascione <da...@gmail.com>
Date:   2017-01-17T15:26:20Z

    Merge remote-tracking branch 'upstream/master' into SPARK-14409

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16618: [SPARK-14409][ML] Add RankingEvaluator

Posted by daniloascione <gi...@git.apache.org>.
Github user daniloascione commented on the issue:

    https://github.com/apache/spark/pull/16618
  
    Please consider this PR WIP. Discussion in JIRA https://issues.apache.org/jira/browse/SPARK-14409


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #16618: [SPARK-14409][ML][WIP] Add RankingEvaluator

Posted by daniloascione <gi...@git.apache.org>.
Github user daniloascione commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16618#discussion_r113702013
  
    --- Diff: mllib/src/main/scala/org/apache/spark/ml/evaluation/RankingEvaluator.scala ---
    @@ -0,0 +1,138 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.ml.evaluation
    +
    +import org.apache.spark.annotation.{Experimental, Since}
    +import org.apache.spark.ml.param.{IntParam, Param, ParamMap, ParamValidators}
    +import org.apache.spark.ml.param.shared.{HasLabelCol, HasPredictionCol}
    +import org.apache.spark.ml.util.{DefaultParamsReadable, DefaultParamsWritable, Identifiable, SchemaUtils}
    +import org.apache.spark.sql.{DataFrame, Dataset}
    +import org.apache.spark.sql.expressions.Window
    +import org.apache.spark.sql.functions.{coalesce, col, collect_list, row_number, udf}
    +import org.apache.spark.sql.types.LongType
    +
    +/**
    + * Evaluator for ranking.
    + */
    +@Since("2.2.0")
    +@Experimental
    +final class RankingEvaluator @Since("2.2.0")(@Since("2.2.0") override val uid: String)
    +  extends Evaluator with HasPredictionCol with HasLabelCol with DefaultParamsWritable {
    +
    +  @Since("2.2.0")
    +  def this() = this(Identifiable.randomUID("rankingEval"))
    +
    +  @Since("2.2.0")
    +  val k = new IntParam(this, "k", "Top-K cutoff", (x: Int) => x > 0)
    +
    +  /** @group getParam */
    +  @Since("2.2.0")
    +  def getK: Int = $(k)
    +
    +  /** @group setParam */
    +  @Since("2.2.0")
    +  def setK(value: Int): this.type = set(k, value)
    +
    +  setDefault(k -> 1)
    +
    +  @Since("2.2.0")
    +  val metricName: Param[String] = {
    +    val allowedParams = ParamValidators.inArray(Array("mpr"))
    +    new Param(this, "metricName", "metric name in evaluation (mpr)", allowedParams)
    +  }
    +
    +  /** @group getParam */
    +  @Since("2.2.0")
    +  def getMetricName: String = $(metricName)
    +
    +  /** @group setParam */
    +  @Since("2.2.0")
    +  def setMetricName(value: String): this.type = set(metricName, value)
    +
    +  /** @group setParam */
    +  @Since("2.2.0")
    +  def setPredictionCol(value: String): this.type = set(predictionCol, value)
    +
    +  /** @group setParam */
    +  @Since("2.2.0")
    +  def setLabelCol(value: String): this.type = set(labelCol, value)
    +
    +  /**
    +   * Param for query column name.
    +   * @group param
    +   */
    +  val queryCol: Param[String] = new Param[String](this, "queryCol", "query column name")
    +
    +  setDefault(queryCol, "query")
    +
    +  /** @group getParam */
    +  @Since("2.2.0")
    +  def getQueryCol: String = $(queryCol)
    +
    +  /** @group setParam */
    +  @Since("2.2.0")
    +  def setQueryCol(value: String): this.type = set(queryCol, value)
    +
    +  setDefault(metricName -> "mpr")
    +
    +  @Since("2.2.0")
    +  override def evaluate(dataset: Dataset[_]): Double = {
    +    val schema = dataset.schema
    +    SchemaUtils.checkNumericType(schema, $(predictionCol))
    +    SchemaUtils.checkNumericType(schema, $(labelCol))
    +    SchemaUtils.checkNumericType(schema, $(queryCol))
    +
    +    val w = Window.partitionBy(col($(queryCol))).orderBy(col($(predictionCol)).desc)
    +
    +    val topAtk: DataFrame = dataset
    +      .na.drop("all", Seq($(predictionCol)))
    +      .select(col($(predictionCol)), col($(labelCol)).cast(LongType), col($(queryCol)))
    +      .withColumn("rn", row_number().over(w)).where(col("rn") <= $(k))
    +      .drop("rn")
    +      .groupBy(col($(queryCol)))
    +      .agg(collect_list($(labelCol)).as("topAtk"))
    +
    +    val mapToEmptyArray_ = udf(() => Array.empty[Long])
    +
    +    val predictionAndLabels: DataFrame = dataset
    +      .join(topAtk, Seq($(queryCol)), "outer")
    +      .withColumn("topAtk", coalesce(col("topAtk"), mapToEmptyArray_()))
    +      .select($(labelCol), "topAtk")
    --- End diff --
    
    Yes, I agree. This is currently done in the previous step, when the topAtk Dataframe is calculated ([line 101](https://github.com/apache/spark/pull/16618/files/fa2155af8947347a2fc1e565cf05a19529022266#diff-0345c4cb1878d3bb0d84297202fdc95fR101)).
    
    Unfortunately this is not compatible with RankingMetrics, which expects the format of predictionAndLabels as input. I didn't want to change RankingMetrics in this same PR. 
    So the predictionAndLabels DataFrame is calculated to use the same RankingMetrics from mllib (well, it is now UDFs based, but I didn't touched its logic).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #16618: [SPARK-14409][ML][WIP] Add RankingEvaluator

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/16618


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #16618: [SPARK-14409][ML][WIP] Add RankingEvaluator

Posted by ebernhardson <gi...@git.apache.org>.
Github user ebernhardson commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16618#discussion_r113358325
  
    --- Diff: mllib/src/main/scala/org/apache/spark/ml/evaluation/RankingEvaluator.scala ---
    @@ -0,0 +1,138 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.ml.evaluation
    +
    +import org.apache.spark.annotation.{Experimental, Since}
    +import org.apache.spark.ml.param.{IntParam, Param, ParamMap, ParamValidators}
    +import org.apache.spark.ml.param.shared.{HasLabelCol, HasPredictionCol}
    +import org.apache.spark.ml.util.{DefaultParamsReadable, DefaultParamsWritable, Identifiable, SchemaUtils}
    +import org.apache.spark.sql.{DataFrame, Dataset}
    +import org.apache.spark.sql.expressions.Window
    +import org.apache.spark.sql.functions.{coalesce, col, collect_list, row_number, udf}
    +import org.apache.spark.sql.types.LongType
    +
    +/**
    + * Evaluator for ranking.
    + */
    +@Since("2.2.0")
    +@Experimental
    +final class RankingEvaluator @Since("2.2.0")(@Since("2.2.0") override val uid: String)
    +  extends Evaluator with HasPredictionCol with HasLabelCol with DefaultParamsWritable {
    +
    +  @Since("2.2.0")
    +  def this() = this(Identifiable.randomUID("rankingEval"))
    +
    +  @Since("2.2.0")
    +  val k = new IntParam(this, "k", "Top-K cutoff", (x: Int) => x > 0)
    +
    +  /** @group getParam */
    +  @Since("2.2.0")
    +  def getK: Int = $(k)
    +
    +  /** @group setParam */
    +  @Since("2.2.0")
    +  def setK(value: Int): this.type = set(k, value)
    +
    +  setDefault(k -> 1)
    +
    +  @Since("2.2.0")
    +  val metricName: Param[String] = {
    +    val allowedParams = ParamValidators.inArray(Array("mpr"))
    +    new Param(this, "metricName", "metric name in evaluation (mpr)", allowedParams)
    +  }
    +
    +  /** @group getParam */
    +  @Since("2.2.0")
    +  def getMetricName: String = $(metricName)
    +
    +  /** @group setParam */
    +  @Since("2.2.0")
    +  def setMetricName(value: String): this.type = set(metricName, value)
    +
    +  /** @group setParam */
    +  @Since("2.2.0")
    +  def setPredictionCol(value: String): this.type = set(predictionCol, value)
    +
    +  /** @group setParam */
    +  @Since("2.2.0")
    +  def setLabelCol(value: String): this.type = set(labelCol, value)
    +
    +  /**
    +   * Param for query column name.
    +   * @group param
    +   */
    +  val queryCol: Param[String] = new Param[String](this, "queryCol", "query column name")
    +
    +  setDefault(queryCol, "query")
    +
    +  /** @group getParam */
    +  @Since("2.2.0")
    +  def getQueryCol: String = $(queryCol)
    +
    +  /** @group setParam */
    +  @Since("2.2.0")
    +  def setQueryCol(value: String): this.type = set(queryCol, value)
    +
    +  setDefault(metricName -> "mpr")
    +
    +  @Since("2.2.0")
    +  override def evaluate(dataset: Dataset[_]): Double = {
    +    val schema = dataset.schema
    +    SchemaUtils.checkNumericType(schema, $(predictionCol))
    +    SchemaUtils.checkNumericType(schema, $(labelCol))
    +    SchemaUtils.checkNumericType(schema, $(queryCol))
    +
    +    val w = Window.partitionBy(col($(queryCol))).orderBy(col($(predictionCol)).desc)
    +
    +    val topAtk: DataFrame = dataset
    +      .na.drop("all", Seq($(predictionCol)))
    +      .select(col($(predictionCol)), col($(labelCol)).cast(LongType), col($(queryCol)))
    +      .withColumn("rn", row_number().over(w)).where(col("rn") <= $(k))
    +      .drop("rn")
    +      .groupBy(col($(queryCol)))
    +      .agg(collect_list($(labelCol)).as("topAtk"))
    +
    +    val mapToEmptyArray_ = udf(() => Array.empty[Long])
    +
    +    val predictionAndLabels: DataFrame = dataset
    +      .join(topAtk, Seq($(queryCol)), "outer")
    +      .withColumn("topAtk", coalesce(col("topAtk"), mapToEmptyArray_()))
    +      .select($(labelCol), "topAtk")
    --- End diff --
    
    Don't we also need to run an aggregation on the label column, roughly the same as the previous aggregation but using labelCol as the sort instead of predictionCol? 
    
    Currently this generates a row per prediction, when ranking tasks should have a row per query. I think the aggregation should be run twice, then those two aggregations should be joined together on queryCol. That would result in a dataset containing (actual labels of top k predictions, actual labels of top k actual)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16618: [SPARK-14409][ML][WIP] Add RankingEvaluator

Posted by MLnick <gi...@git.apache.org>.
Github user MLnick commented on the issue:

    https://github.com/apache/spark/pull/16618
  
    @daniloascione any update?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16618: [SPARK-14409][ML][WIP] Add RankingEvaluator

Posted by Kornel <gi...@git.apache.org>.
Github user Kornel commented on the issue:

    https://github.com/apache/spark/pull/16618
  
    @MLnick I'm wondering what's the status of this issue: seems closed, have you any plans on picking it up again?
    
    I might pick it up, but I'm not sure what's left: move from package mllib to ml and maybe a python API?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16618: [SPARK-14409][ML][WIP] Add RankingEvaluator

Posted by daniloascione <gi...@git.apache.org>.
Github user daniloascione commented on the issue:

    https://github.com/apache/spark/pull/16618
  
    I rewrote the ranking metrics from the mllib package as UDFs (as suggested [here](https://issues.apache.org/jira/browse/SPARK-14409?focusedCommentId=15896933&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-15896933)) with [minimum changes](https://github.com/apache/spark/pull/16618/commits/fa2155af8947347a2fc1e565cf05a19529022266) to the logic.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16618: [SPARK-14409][ML][WIP] Add RankingEvaluator

Posted by MLnick <gi...@git.apache.org>.
Github user MLnick commented on the issue:

    https://github.com/apache/spark/pull/16618
  
    The basic direction looks right - I won't have time to review immediately. Spark 2.2 QA code freeze will happen shortly so this will wait until 2.3 dev cycle starts


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16618: [SPARK-14409][ML][WIP] Add RankingEvaluator

Posted by acompa <gi...@git.apache.org>.
Github user acompa commented on the issue:

    https://github.com/apache/spark/pull/16618
  
    I'm also curious about this @MLnick. Seems like there was a lot of movement earlier this year, but this PR has gotten stale. 
    
    I can also contribute if @Kornel cannot for any reason.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16618: [SPARK-14409][ML] Add RankingEvaluator

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16618
  
    Can one of the admins verify this patch?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #16618: [SPARK-14409][ML][WIP] Add RankingEvaluator

Posted by daniloascione <gi...@git.apache.org>.
Github user daniloascione commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16618#discussion_r113721386
  
    --- Diff: mllib/src/main/scala/org/apache/spark/ml/evaluation/RankingMetrics.scala ---
    @@ -0,0 +1,202 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.ml.evaluation
    +
    +import org.apache.spark.annotation.Since
    +import org.apache.spark.internal.Logging
    +import org.apache.spark.sql.{Column, DataFrame}
    +import org.apache.spark.sql.functions.{mean, sum}
    +import org.apache.spark.sql.functions.udf
    +import org.apache.spark.sql.types.DoubleType
    +
    +@Since("2.2.0")
    +class RankingMetrics(
    +  predictionAndObservations: DataFrame, predictionCol: String, labelCol: String)
    +  extends Logging with Serializable {
    +
    +  /**
    +   * Compute the Mean Percentile Rank (MPR) of all the queries.
    +   *
    +   * See the following paper for detail ("Expected percentile rank" in the paper):
    +   * Hu, Y., Y. Koren, and C. Volinsky. \u201cCollaborative Filtering for Implicit Feedback Datasets.\u201d
    +   * In 2008 Eighth IEEE International Conference on Data Mining, 263\u201372, 2008.
    +   * doi:10.1109/ICDM.2008.22.
    +   *
    +   * @return the mean percentile rank
    +   */
    +  lazy val meanPercentileRank: Double = {
    +
    +    def rank = udf((predicted: Seq[Any], actual: Any) => {
    +      val l_i = predicted.indexOf(actual)
    +
    +      if (l_i == -1) {
    +        1
    +      } else {
    +        l_i.toDouble / predicted.size
    +      }
    +    }, DoubleType)
    +
    +    val R_prime = predictionAndObservations.count()
    +    val predictionColumn: Column = predictionAndObservations.col(predictionCol)
    +    val labelColumn: Column = predictionAndObservations.col(labelCol)
    +
    +    val rankSum: Double = predictionAndObservations
    +      .withColumn("rank", rank(predictionColumn, labelColumn))
    +      .agg(sum("rank")).first().getDouble(0)
    +
    +    rankSum / R_prime
    +  }
    +
    +  /**
    +   * Compute the average precision of all the queries, truncated at ranking position k.
    +   *
    +   * If for a query, the ranking algorithm returns n (n is less than k) results, the precision
    +   * value will be computed as #(relevant items retrieved) / k. This formula also applies when
    +   * the size of the ground truth set is less than k.
    +   *
    +   * If a query has an empty ground truth set, zero will be used as precision together with
    +   * a log warning.
    +   *
    +   * See the following paper for detail:
    +   *
    +   * IR evaluation methods for retrieving highly relevant documents. K. Jarvelin and J. Kekalainen
    +   *
    +   * @param k the position to compute the truncated precision, must be positive
    +   * @return the average precision at the first k ranking positions
    +   */
    +  @Since("2.2.0")
    +  def precisionAt(k: Int): Double = {
    +    require(k > 0, "ranking position k should be positive")
    +
    +    def precisionAtK = udf((predicted: Seq[Any], actual: Seq[Any]) => {
    +      val actualSet = actual.toSet
    +      if (actualSet.nonEmpty) {
    +        val n = math.min(predicted.length, k)
    +        var i = 0
    +        var cnt = 0
    +        while (i < n) {
    +          if (actualSet.contains(predicted(i))) {
    +            cnt += 1
    +          }
    +          i += 1
    +        }
    +        cnt.toDouble / k
    +      } else {
    +        logWarning("Empty ground truth set, check input data")
    +        0.0
    +      }
    +    }, DoubleType)
    +
    +    val predictionColumn: Column = predictionAndObservations.col(predictionCol)
    +    val labelColumn: Column = predictionAndObservations.col(labelCol)
    +
    +    predictionAndObservations
    +      .withColumn("predictionAtK", precisionAtK(predictionColumn, labelColumn))
    +      .agg(mean("predictionAtK")).first().getDouble(0)
    +  }
    +
    +  /**
    +   * Returns the mean average precision (MAP) of all the queries.
    +   * If a query has an empty ground truth set, the average precision will be zero and a log
    +   * warning is generated.
    +   */
    +  lazy val meanAveragePrecision: Double = {
    +
    +    def map = udf((predicted: Seq[Any], actual: Seq[Any]) => {
    +      val actualSet = actual.toSet
    +      if (actualSet.nonEmpty) {
    +        var i = 0
    +        var cnt = 0
    +        var precSum = 0.0
    +        val n = predicted.length
    +        while (i < n) {
    +          if (actualSet.contains(predicted(i))) {
    +            cnt += 1
    +            precSum += cnt.toDouble / (i + 1)
    +          }
    +          i += 1
    +        }
    +        precSum / actualSet.size
    +      } else {
    +        logWarning("Empty ground truth set, check input data")
    +        0.0
    +      }
    +    }, DoubleType)
    +
    +    val predictionColumn: Column = predictionAndObservations.col(predictionCol)
    +    val labelColumn: Column = predictionAndObservations.col(labelCol)
    +
    +    predictionAndObservations
    +      .withColumn("MAP", map(predictionColumn, labelColumn))
    +      .agg(mean("MAP")).first().getDouble(0)
    +  }
    +
    +  /**
    +   * Compute the average NDCG value of all the queries, truncated at ranking position k.
    +   * The discounted cumulative gain at position k is computed as:
    +   *    sum,,i=1,,^k^ (2^{relevance of ''i''th item}^ - 1) / log(i + 1),
    +   * and the NDCG is obtained by dividing the DCG value on the ground truth set. In the current
    +   * implementation, the relevance value is binary.
    +
    +   * If a query has an empty ground truth set, zero will be used as ndcg together with
    +   * a log warning.
    +   *
    +   * See the following paper for detail:
    +   *
    +   * IR evaluation methods for retrieving highly relevant documents. K. Jarvelin and J. Kekalainen
    +   *
    +   * @param k the position to compute the truncated ndcg, must be positive
    +   * @return the average ndcg at the first k ranking positions
    +   */
    +  @Since("2.2.0")
    +  def ndcgAt(k: Int): Double = {
    +    require(k > 0, "ranking position k should be positive")
    +
    +    def ndcgAtK = udf((predicted: Seq[Any], actual: Seq[Any]) => {
    +      val actualSet = actual.toSet
    +
    +      if (actualSet.nonEmpty) {
    +        val labSetSize = actualSet.size
    +        val n = math.min(math.max(predicted.length, labSetSize), k)
    +        var maxDcg = 0.0
    +        var dcg = 0.0
    +        var i = 0
    +        while (i < n) {
    +          val gain = 1.0 / math.log(i + 2)
    +          if (i < predicted.length && actualSet.contains(predicted(i))) {
    --- End diff --
    
    Yes, this should be fixed in another PR to keep changes isolated. FYI, original JIRA for this is [here](https://issues.apache.org/jira/browse/SPARK-3568)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16618: [SPARK-14409][ML][WIP] Add RankingEvaluator

Posted by MLnick <gi...@git.apache.org>.
Github user MLnick commented on the issue:

    https://github.com/apache/spark/pull/16618
  
    @daniloascione are you able to update this? I'd like to target for `2.3`.
    
    But, can we do the following:
    
    1. Port over ranking metrics (udfs) to `ml` with the `RankingEvaluator` as you've done, but _excluding_ MPR
    2. Also don't make any logic changes for the metrics calculation
    
    Let's focus on porting things over and getting the API right. Then create follow up tickets for additional metrics (MPR and any others) as well as looking into correcting the logic and/or naming of the existing metrics.
    
    Thanks!


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #16618: [SPARK-14409][ML][WIP] Add RankingEvaluator

Posted by ebernhardson <gi...@git.apache.org>.
Github user ebernhardson commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16618#discussion_r113355473
  
    --- Diff: mllib/src/main/scala/org/apache/spark/ml/evaluation/RankingMetrics.scala ---
    @@ -0,0 +1,202 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.ml.evaluation
    +
    +import org.apache.spark.annotation.Since
    +import org.apache.spark.internal.Logging
    +import org.apache.spark.sql.{Column, DataFrame}
    +import org.apache.spark.sql.functions.{mean, sum}
    +import org.apache.spark.sql.functions.udf
    +import org.apache.spark.sql.types.DoubleType
    +
    +@Since("2.2.0")
    +class RankingMetrics(
    +  predictionAndObservations: DataFrame, predictionCol: String, labelCol: String)
    +  extends Logging with Serializable {
    +
    +  /**
    +   * Compute the Mean Percentile Rank (MPR) of all the queries.
    +   *
    +   * See the following paper for detail ("Expected percentile rank" in the paper):
    +   * Hu, Y., Y. Koren, and C. Volinsky. \u201cCollaborative Filtering for Implicit Feedback Datasets.\u201d
    +   * In 2008 Eighth IEEE International Conference on Data Mining, 263\u201372, 2008.
    +   * doi:10.1109/ICDM.2008.22.
    +   *
    +   * @return the mean percentile rank
    +   */
    +  lazy val meanPercentileRank: Double = {
    +
    +    def rank = udf((predicted: Seq[Any], actual: Any) => {
    +      val l_i = predicted.indexOf(actual)
    +
    +      if (l_i == -1) {
    +        1
    +      } else {
    +        l_i.toDouble / predicted.size
    +      }
    +    }, DoubleType)
    +
    +    val R_prime = predictionAndObservations.count()
    +    val predictionColumn: Column = predictionAndObservations.col(predictionCol)
    +    val labelColumn: Column = predictionAndObservations.col(labelCol)
    +
    +    val rankSum: Double = predictionAndObservations
    +      .withColumn("rank", rank(predictionColumn, labelColumn))
    +      .agg(sum("rank")).first().getDouble(0)
    +
    +    rankSum / R_prime
    +  }
    +
    +  /**
    +   * Compute the average precision of all the queries, truncated at ranking position k.
    +   *
    +   * If for a query, the ranking algorithm returns n (n is less than k) results, the precision
    +   * value will be computed as #(relevant items retrieved) / k. This formula also applies when
    +   * the size of the ground truth set is less than k.
    +   *
    +   * If a query has an empty ground truth set, zero will be used as precision together with
    +   * a log warning.
    +   *
    +   * See the following paper for detail:
    +   *
    +   * IR evaluation methods for retrieving highly relevant documents. K. Jarvelin and J. Kekalainen
    +   *
    +   * @param k the position to compute the truncated precision, must be positive
    +   * @return the average precision at the first k ranking positions
    +   */
    +  @Since("2.2.0")
    +  def precisionAt(k: Int): Double = {
    +    require(k > 0, "ranking position k should be positive")
    +
    +    def precisionAtK = udf((predicted: Seq[Any], actual: Seq[Any]) => {
    +      val actualSet = actual.toSet
    +      if (actualSet.nonEmpty) {
    +        val n = math.min(predicted.length, k)
    +        var i = 0
    +        var cnt = 0
    +        while (i < n) {
    +          if (actualSet.contains(predicted(i))) {
    +            cnt += 1
    +          }
    +          i += 1
    +        }
    +        cnt.toDouble / k
    +      } else {
    +        logWarning("Empty ground truth set, check input data")
    +        0.0
    +      }
    +    }, DoubleType)
    +
    +    val predictionColumn: Column = predictionAndObservations.col(predictionCol)
    +    val labelColumn: Column = predictionAndObservations.col(labelCol)
    +
    +    predictionAndObservations
    +      .withColumn("predictionAtK", precisionAtK(predictionColumn, labelColumn))
    +      .agg(mean("predictionAtK")).first().getDouble(0)
    +  }
    +
    +  /**
    +   * Returns the mean average precision (MAP) of all the queries.
    +   * If a query has an empty ground truth set, the average precision will be zero and a log
    +   * warning is generated.
    +   */
    +  lazy val meanAveragePrecision: Double = {
    +
    +    def map = udf((predicted: Seq[Any], actual: Seq[Any]) => {
    +      val actualSet = actual.toSet
    +      if (actualSet.nonEmpty) {
    +        var i = 0
    +        var cnt = 0
    +        var precSum = 0.0
    +        val n = predicted.length
    +        while (i < n) {
    +          if (actualSet.contains(predicted(i))) {
    +            cnt += 1
    +            precSum += cnt.toDouble / (i + 1)
    +          }
    +          i += 1
    +        }
    +        precSum / actualSet.size
    +      } else {
    +        logWarning("Empty ground truth set, check input data")
    +        0.0
    +      }
    +    }, DoubleType)
    +
    +    val predictionColumn: Column = predictionAndObservations.col(predictionCol)
    +    val labelColumn: Column = predictionAndObservations.col(labelCol)
    +
    +    predictionAndObservations
    +      .withColumn("MAP", map(predictionColumn, labelColumn))
    +      .agg(mean("MAP")).first().getDouble(0)
    +  }
    +
    +  /**
    +   * Compute the average NDCG value of all the queries, truncated at ranking position k.
    +   * The discounted cumulative gain at position k is computed as:
    +   *    sum,,i=1,,^k^ (2^{relevance of ''i''th item}^ - 1) / log(i + 1),
    +   * and the NDCG is obtained by dividing the DCG value on the ground truth set. In the current
    +   * implementation, the relevance value is binary.
    +
    +   * If a query has an empty ground truth set, zero will be used as ndcg together with
    +   * a log warning.
    +   *
    +   * See the following paper for detail:
    +   *
    +   * IR evaluation methods for retrieving highly relevant documents. K. Jarvelin and J. Kekalainen
    +   *
    +   * @param k the position to compute the truncated ndcg, must be positive
    +   * @return the average ndcg at the first k ranking positions
    +   */
    +  @Since("2.2.0")
    +  def ndcgAt(k: Int): Double = {
    +    require(k > 0, "ranking position k should be positive")
    +
    +    def ndcgAtK = udf((predicted: Seq[Any], actual: Seq[Any]) => {
    +      val actualSet = actual.toSet
    +
    +      if (actualSet.nonEmpty) {
    +        val labSetSize = actualSet.size
    +        val n = math.min(math.max(predicted.length, labSetSize), k)
    +        var maxDcg = 0.0
    +        var dcg = 0.0
    +        var i = 0
    +        while (i < n) {
    +          val gain = 1.0 / math.log(i + 2)
    --- End diff --
    
    Any particular reason to use 1.0 here instead of the label, as described in the method docs? It might require some re-jiggering as you are accepting Seq[Any] and you instead need a number of some sort (both ints and floats are used in practice)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16618: [SPARK-14409][ML] Add RankingEvaluator

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on the issue:

    https://github.com/apache/spark/pull/16618
  
    Could you add `[WIP]` in the title if it is WIP?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #16618: [SPARK-14409][ML][WIP] Add RankingEvaluator

Posted by ebernhardson <gi...@git.apache.org>.
Github user ebernhardson commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16618#discussion_r113360277
  
    --- Diff: mllib/src/main/scala/org/apache/spark/ml/evaluation/RankingMetrics.scala ---
    @@ -0,0 +1,202 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.ml.evaluation
    +
    +import org.apache.spark.annotation.Since
    +import org.apache.spark.internal.Logging
    +import org.apache.spark.sql.{Column, DataFrame}
    +import org.apache.spark.sql.functions.{mean, sum}
    +import org.apache.spark.sql.functions.udf
    +import org.apache.spark.sql.types.DoubleType
    +
    +@Since("2.2.0")
    +class RankingMetrics(
    +  predictionAndObservations: DataFrame, predictionCol: String, labelCol: String)
    +  extends Logging with Serializable {
    +
    +  /**
    +   * Compute the Mean Percentile Rank (MPR) of all the queries.
    +   *
    +   * See the following paper for detail ("Expected percentile rank" in the paper):
    +   * Hu, Y., Y. Koren, and C. Volinsky. \u201cCollaborative Filtering for Implicit Feedback Datasets.\u201d
    +   * In 2008 Eighth IEEE International Conference on Data Mining, 263\u201372, 2008.
    +   * doi:10.1109/ICDM.2008.22.
    +   *
    +   * @return the mean percentile rank
    +   */
    +  lazy val meanPercentileRank: Double = {
    +
    +    def rank = udf((predicted: Seq[Any], actual: Any) => {
    +      val l_i = predicted.indexOf(actual)
    +
    +      if (l_i == -1) {
    +        1
    +      } else {
    +        l_i.toDouble / predicted.size
    +      }
    +    }, DoubleType)
    +
    +    val R_prime = predictionAndObservations.count()
    +    val predictionColumn: Column = predictionAndObservations.col(predictionCol)
    +    val labelColumn: Column = predictionAndObservations.col(labelCol)
    +
    +    val rankSum: Double = predictionAndObservations
    +      .withColumn("rank", rank(predictionColumn, labelColumn))
    +      .agg(sum("rank")).first().getDouble(0)
    +
    +    rankSum / R_prime
    +  }
    +
    +  /**
    +   * Compute the average precision of all the queries, truncated at ranking position k.
    +   *
    +   * If for a query, the ranking algorithm returns n (n is less than k) results, the precision
    +   * value will be computed as #(relevant items retrieved) / k. This formula also applies when
    +   * the size of the ground truth set is less than k.
    +   *
    +   * If a query has an empty ground truth set, zero will be used as precision together with
    +   * a log warning.
    +   *
    +   * See the following paper for detail:
    +   *
    +   * IR evaluation methods for retrieving highly relevant documents. K. Jarvelin and J. Kekalainen
    +   *
    +   * @param k the position to compute the truncated precision, must be positive
    +   * @return the average precision at the first k ranking positions
    +   */
    +  @Since("2.2.0")
    +  def precisionAt(k: Int): Double = {
    +    require(k > 0, "ranking position k should be positive")
    +
    +    def precisionAtK = udf((predicted: Seq[Any], actual: Seq[Any]) => {
    +      val actualSet = actual.toSet
    +      if (actualSet.nonEmpty) {
    +        val n = math.min(predicted.length, k)
    +        var i = 0
    +        var cnt = 0
    +        while (i < n) {
    +          if (actualSet.contains(predicted(i))) {
    +            cnt += 1
    +          }
    +          i += 1
    +        }
    +        cnt.toDouble / k
    +      } else {
    +        logWarning("Empty ground truth set, check input data")
    +        0.0
    +      }
    +    }, DoubleType)
    +
    +    val predictionColumn: Column = predictionAndObservations.col(predictionCol)
    +    val labelColumn: Column = predictionAndObservations.col(labelCol)
    +
    +    predictionAndObservations
    +      .withColumn("predictionAtK", precisionAtK(predictionColumn, labelColumn))
    +      .agg(mean("predictionAtK")).first().getDouble(0)
    +  }
    +
    +  /**
    +   * Returns the mean average precision (MAP) of all the queries.
    +   * If a query has an empty ground truth set, the average precision will be zero and a log
    +   * warning is generated.
    +   */
    +  lazy val meanAveragePrecision: Double = {
    +
    +    def map = udf((predicted: Seq[Any], actual: Seq[Any]) => {
    +      val actualSet = actual.toSet
    +      if (actualSet.nonEmpty) {
    +        var i = 0
    +        var cnt = 0
    +        var precSum = 0.0
    +        val n = predicted.length
    +        while (i < n) {
    +          if (actualSet.contains(predicted(i))) {
    +            cnt += 1
    +            precSum += cnt.toDouble / (i + 1)
    +          }
    +          i += 1
    +        }
    +        precSum / actualSet.size
    +      } else {
    +        logWarning("Empty ground truth set, check input data")
    +        0.0
    +      }
    +    }, DoubleType)
    +
    +    val predictionColumn: Column = predictionAndObservations.col(predictionCol)
    +    val labelColumn: Column = predictionAndObservations.col(labelCol)
    +
    +    predictionAndObservations
    +      .withColumn("MAP", map(predictionColumn, labelColumn))
    +      .agg(mean("MAP")).first().getDouble(0)
    +  }
    +
    +  /**
    +   * Compute the average NDCG value of all the queries, truncated at ranking position k.
    +   * The discounted cumulative gain at position k is computed as:
    +   *    sum,,i=1,,^k^ (2^{relevance of ''i''th item}^ - 1) / log(i + 1),
    +   * and the NDCG is obtained by dividing the DCG value on the ground truth set. In the current
    +   * implementation, the relevance value is binary.
    +
    +   * If a query has an empty ground truth set, zero will be used as ndcg together with
    +   * a log warning.
    +   *
    +   * See the following paper for detail:
    +   *
    +   * IR evaluation methods for retrieving highly relevant documents. K. Jarvelin and J. Kekalainen
    +   *
    +   * @param k the position to compute the truncated ndcg, must be positive
    +   * @return the average ndcg at the first k ranking positions
    +   */
    +  @Since("2.2.0")
    +  def ndcgAt(k: Int): Double = {
    +    require(k > 0, "ranking position k should be positive")
    +
    +    def ndcgAtK = udf((predicted: Seq[Any], actual: Seq[Any]) => {
    +      val actualSet = actual.toSet
    +
    +      if (actualSet.nonEmpty) {
    +        val labSetSize = actualSet.size
    +        val n = math.min(math.max(predicted.length, labSetSize), k)
    +        var maxDcg = 0.0
    +        var dcg = 0.0
    +        var i = 0
    +        while (i < n) {
    +          val gain = 1.0 / math.log(i + 2)
    +          if (i < predicted.length && actualSet.contains(predicted(i))) {
    --- End diff --
    
    This doesn't seem right, there is no overlap between the calculation of dcg and max_dcg. The question asked here should be if the label at predicted(i) is "good". When treating the labels as binary relevant/not relevant I suppose that might use a threshold, but better would be to move away from a binary dcg and use the full equation from the docblock. I understand though that you are not looking to make major updates to the code from mllib, so it would probably be reasonable for someone to fix this in a followup.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org