Posted to commits@spark.apache.org by sr...@apache.org on 2018/09/01 23:08:02 UTC

spark git commit: [SPARK-10697][ML] Add lift to Association rules

Repository: spark
Updated Branches:
  refs/heads/master 6ad8d4c37 -> a3dccd24c


[SPARK-10697][ML] Add lift to Association rules

## What changes were proposed in this pull request?

This PR adds the lift measure to association rules: for a rule X => Y, lift = confidence(X => Y) / support(Y), i.e. how much more often the antecedent and consequent occur together than would be expected if they were independent (a lift of 1 indicates independence).
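
As a quick illustration of the new output, here is a minimal, hedged sketch of reading the `lift` column (assuming a running `SparkSession` named `spark`; the data and thresholds are invented, but the calls match the `FPGrowth` API touched by this change):

```scala
import org.apache.spark.ml.fpm.FPGrowth
import spark.implicits._

val dataset = spark.createDataset(Seq(
  "1 2 5",
  "1 2 3 5",
  "1 2")
).map(_.split(" ")).toDF("items")

val model = new FPGrowth()
  .setItemsCol("items")
  .setMinSupport(0.5)
  .setMinConfidence(0.6)
  .fit(dataset)

// associationRules now has four columns:
// antecedent, consequent, confidence and lift.
model.associationRules.show()
```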

## How was this patch tested?

Existing and modified unit tests.

Closes #22236 from mgaido91/SPARK-10697.

Authored-by: Marco Gaido <ma...@gmail.com>
Signed-off-by: Sean Owen <se...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a3dccd24
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a3dccd24
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a3dccd24

Branch: refs/heads/master
Commit: a3dccd24c2e932b90e647e678f351f5b5568305b
Parents: 6ad8d4c
Author: Marco Gaido <ma...@gmail.com>
Authored: Sat Sep 1 18:07:58 2018 -0500
Committer: Sean Owen <se...@databricks.com>
Committed: Sat Sep 1 18:07:58 2018 -0500

----------------------------------------------------------------------
 R/pkg/R/mllib_fpm.R                             |  5 +-
 R/pkg/tests/fulltests/test_mllib_fpm.R          |  3 +-
 .../org/apache/spark/ml/fpm/FPGrowth.scala      | 61 +++++++++++++++-----
 .../spark/mllib/fpm/AssociationRules.scala      | 37 ++++++++++--
 .../org/apache/spark/mllib/fpm/FPGrowth.scala   | 25 +++++---
 .../org/apache/spark/ml/fpm/FPGrowthSuite.scala |  6 +-
 project/MimaExcludes.scala                      |  4 ++
 python/pyspark/ml/fpm.py                        |  3 +-
 python/pyspark/ml/tests.py                      |  4 +-
 9 files changed, 108 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/a3dccd24/R/pkg/R/mllib_fpm.R
----------------------------------------------------------------------
diff --git a/R/pkg/R/mllib_fpm.R b/R/pkg/R/mllib_fpm.R
index e239490..4ad34fe 100644
--- a/R/pkg/R/mllib_fpm.R
+++ b/R/pkg/R/mllib_fpm.R
@@ -116,10 +116,11 @@ setMethod("spark.freqItemsets", signature(object = "FPGrowthModel"),
 # Get association rules.
 
 #' @return A \code{SparkDataFrame} with association rules.
-#'         The \code{SparkDataFrame} contains three columns:
+#'         The \code{SparkDataFrame} contains four columns:
 #'         \code{antecedent} (an array of the same type as the input column),
 #'         \code{consequent} (an array of the same type as the input column),
-#'         and \code{condfidence} (confidence).
+#'         \code{confidence} (confidence for the rule)
+#'         and \code{lift} (lift for the rule).
 #' @rdname spark.fpGrowth
 #' @aliases associationRules,FPGrowthModel-method
 #' @note spark.associationRules(FPGrowthModel) since 2.2.0

http://git-wip-us.apache.org/repos/asf/spark/blob/a3dccd24/R/pkg/tests/fulltests/test_mllib_fpm.R
----------------------------------------------------------------------
diff --git a/R/pkg/tests/fulltests/test_mllib_fpm.R b/R/pkg/tests/fulltests/test_mllib_fpm.R
index 69dda52..d80f66a 100644
--- a/R/pkg/tests/fulltests/test_mllib_fpm.R
+++ b/R/pkg/tests/fulltests/test_mllib_fpm.R
@@ -44,7 +44,8 @@ test_that("spark.fpGrowth", {
   expected_association_rules <- data.frame(
     antecedent = I(list(list("2"), list("3"))),
     consequent = I(list(list("1"), list("1"))),
-    confidence = c(1, 1)
+    confidence = c(1, 1),
+    lift = c(1, 1)
   )
 
   expect_equivalent(expected_association_rules, collect(spark.associationRules(model)))

http://git-wip-us.apache.org/repos/asf/spark/blob/a3dccd24/mllib/src/main/scala/org/apache/spark/ml/fpm/FPGrowth.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/ml/fpm/FPGrowth.scala b/mllib/src/main/scala/org/apache/spark/ml/fpm/FPGrowth.scala
index 85c483c..840a89b 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/fpm/FPGrowth.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/fpm/FPGrowth.scala
@@ -20,6 +20,8 @@ package org.apache.spark.ml.fpm
 import scala.reflect.ClassTag
 
 import org.apache.hadoop.fs.Path
+import org.json4s.{DefaultFormats, JObject}
+import org.json4s.JsonDSL._
 
 import org.apache.spark.annotation.{Experimental, Since}
 import org.apache.spark.ml.{Estimator, Model}
@@ -34,6 +36,7 @@ import org.apache.spark.sql._
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.types._
 import org.apache.spark.storage.StorageLevel
+import org.apache.spark.util.VersionUtils
 
 /**
  * Common params for FPGrowth and FPGrowthModel
@@ -175,7 +178,8 @@ class FPGrowth @Since("2.2.0") (
     if (handlePersistence) {
       items.persist(StorageLevel.MEMORY_AND_DISK)
     }
-
+    val inputRowCount = items.count()
+    instr.logNumExamples(inputRowCount)
     val parentModel = mllibFP.run(items)
     val rows = parentModel.freqItemsets.map(f => Row(f.items, f.freq))
     val schema = StructType(Seq(
@@ -187,7 +191,8 @@ class FPGrowth @Since("2.2.0") (
       items.unpersist()
     }
 
-    copyValues(new FPGrowthModel(uid, frequentItems)).setParent(this)
+    copyValues(new FPGrowthModel(uid, frequentItems, parentModel.itemSupport, inputRowCount))
+      .setParent(this)
   }
 
   @Since("2.2.0")
@@ -217,7 +222,9 @@ object FPGrowth extends DefaultParamsReadable[FPGrowth] {
 @Experimental
 class FPGrowthModel private[ml] (
     @Since("2.2.0") override val uid: String,
-    @Since("2.2.0") @transient val freqItemsets: DataFrame)
+    @Since("2.2.0") @transient val freqItemsets: DataFrame,
+    private val itemSupport: scala.collection.Map[Any, Double],
+    private val numTrainingRecords: Long)
   extends Model[FPGrowthModel] with FPGrowthParams with MLWritable {
 
   /** @group setParam */
@@ -241,9 +248,9 @@ class FPGrowthModel private[ml] (
   @transient private var _cachedRules: DataFrame = _
 
   /**
-   * Get association rules fitted using the minConfidence. Returns a dataframe
-   * with three fields, "antecedent", "consequent" and "confidence", where "antecedent" and
-   * "consequent" are Array[T] and "confidence" is Double.
+   * Get association rules fitted using the minConfidence. Returns a dataframe with four fields,
+   * "antecedent", "consequent", "confidence" and "lift", where "antecedent" and "consequent" are
+   * Array[T], whereas "confidence" and "lift" are Double.
    */
   @Since("2.2.0")
   @transient def associationRules: DataFrame = {
@@ -251,7 +258,7 @@ class FPGrowthModel private[ml] (
       _cachedRules
     } else {
       _cachedRules = AssociationRules
-        .getAssociationRulesFromFP(freqItemsets, "items", "freq", $(minConfidence))
+        .getAssociationRulesFromFP(freqItemsets, "items", "freq", $(minConfidence), itemSupport)
       _cachedMinConf = $(minConfidence)
       _cachedRules
     }
@@ -301,7 +308,7 @@ class FPGrowthModel private[ml] (
 
   @Since("2.2.0")
   override def copy(extra: ParamMap): FPGrowthModel = {
-    val copied = new FPGrowthModel(uid, freqItemsets)
+    val copied = new FPGrowthModel(uid, freqItemsets, itemSupport, numTrainingRecords)
     copyValues(copied, extra).setParent(this.parent)
   }
 
@@ -323,7 +330,8 @@ object FPGrowthModel extends MLReadable[FPGrowthModel] {
   class FPGrowthModelWriter(instance: FPGrowthModel) extends MLWriter {
 
     override protected def saveImpl(path: String): Unit = {
-      DefaultParamsWriter.saveMetadata(instance, path, sc)
+      val extraMetadata: JObject = Map("numTrainingRecords" -> instance.numTrainingRecords)
+      DefaultParamsWriter.saveMetadata(instance, path, sc, extraMetadata = Some(extraMetadata))
       val dataPath = new Path(path, "data").toString
       instance.freqItemsets.write.parquet(dataPath)
     }
@@ -335,10 +343,28 @@ object FPGrowthModel extends MLReadable[FPGrowthModel] {
     private val className = classOf[FPGrowthModel].getName
 
     override def load(path: String): FPGrowthModel = {
+      implicit val format = DefaultFormats
       val metadata = DefaultParamsReader.loadMetadata(path, sc, className)
+      val (major, minor) = VersionUtils.majorMinorVersion(metadata.sparkVersion)
+      val numTrainingRecords = if (major.toInt < 2 || (major.toInt == 2 && minor.toInt < 4)) {
+        // 2.3 and before don't store the count
+        0L
+      } else {
+        // 2.4+
+        (metadata.metadata \ "numTrainingRecords").extract[Long]
+      }
       val dataPath = new Path(path, "data").toString
       val frequentItems = sparkSession.read.parquet(dataPath)
-      val model = new FPGrowthModel(metadata.uid, frequentItems)
+      val itemSupport = if (numTrainingRecords == 0L) {
+        Map.empty[Any, Double]
+      } else {
+        frequentItems.rdd.flatMap {
+            case Row(items: Seq[_], count: Long) if items.length == 1 =>
+              Some(items.head -> count.toDouble / numTrainingRecords)
+            case _ => None
+          }.collectAsMap()
+      }
+      val model = new FPGrowthModel(metadata.uid, frequentItems, itemSupport, numTrainingRecords)
       metadata.getAndSetParams(model)
       model
     }
@@ -354,27 +380,30 @@ private[fpm] object AssociationRules {
    * @param itemsCol column name for frequent itemsets
    * @param freqCol column name for appearance count of the frequent itemsets
    * @param minConfidence minimum confidence for generating the association rules
-   * @return a DataFrame("antecedent"[Array], "consequent"[Array], "confidence"[Double])
-   *         containing the association rules.
+   * @param itemSupport map from each item to its support
+   * @return a DataFrame("antecedent"[Array], "consequent"[Array], "confidence"[Double],
+   *         "lift"[Double]) containing the association rules.
    */
   def getAssociationRulesFromFP[T: ClassTag](
         dataset: Dataset[_],
         itemsCol: String,
         freqCol: String,
-        minConfidence: Double): DataFrame = {
+        minConfidence: Double,
+        itemSupport: scala.collection.Map[T, Double]): DataFrame = {
 
     val freqItemSetRdd = dataset.select(itemsCol, freqCol).rdd
       .map(row => new FreqItemset(row.getSeq[T](0).toArray, row.getLong(1)))
     val rows = new MLlibAssociationRules()
       .setMinConfidence(minConfidence)
-      .run(freqItemSetRdd)
-      .map(r => Row(r.antecedent, r.consequent, r.confidence))
+      .run(freqItemSetRdd, itemSupport)
+      .map(r => Row(r.antecedent, r.consequent, r.confidence, r.lift.orNull))
 
     val dt = dataset.schema(itemsCol).dataType
     val schema = StructType(Seq(
       StructField("antecedent", dt, nullable = false),
       StructField("consequent", dt, nullable = false),
-      StructField("confidence", DoubleType, nullable = false)))
+      StructField("confidence", DoubleType, nullable = false),
+      StructField("lift", DoubleType)))
     val rules = dataset.sparkSession.createDataFrame(rows, schema)
     rules
   }
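
Note that `lift` is the one nullable column in the schema above: when a model saved by Spark 2.3 or earlier is loaded, `numTrainingRecords` defaults to 0, `itemSupport` stays empty, `r.lift` is `None`, and `r.lift.orNull` leaves the column null. A sketch of that behavior (the model path is hypothetical):

```scala
import org.apache.spark.ml.fpm.FPGrowthModel

// Hypothetical path to a model persisted with Spark 2.3.
val oldModel = FPGrowthModel.load("/models/fpgrowth-2.3")

// "confidence" is still populated, but "lift" is null for every rule.
oldModel.associationRules
  .select("antecedent", "consequent", "confidence", "lift")
  .show()
```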

http://git-wip-us.apache.org/repos/asf/spark/blob/a3dccd24/mllib/src/main/scala/org/apache/spark/mllib/fpm/AssociationRules.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/fpm/AssociationRules.scala b/mllib/src/main/scala/org/apache/spark/mllib/fpm/AssociationRules.scala
index acb83ac..43d256b 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/fpm/AssociationRules.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/fpm/AssociationRules.scala
@@ -56,11 +56,24 @@ class AssociationRules private[fpm] (
   /**
    * Computes the association rules with confidence above `minConfidence`.
    * @param freqItemsets frequent itemset model obtained from [[FPGrowth]]
-   * @return a `Set[Rule[Item]]` containing the association rules.
+   * @return an `RDD[Rule[Item]]` containing the association rules.
    *
    */
   @Since("1.5.0")
   def run[Item: ClassTag](freqItemsets: RDD[FreqItemset[Item]]): RDD[Rule[Item]] = {
+    run(freqItemsets, Map.empty[Item, Double])
+  }
+
+  /**
+   * Computes the association rules with confidence above `minConfidence`.
+   * @param freqItemsets frequent itemset model obtained from [[FPGrowth]]
+   * @param itemSupport map from each item to its support
+   * @return an `RDD[Rule[Item]]` containing the association rules. These rules will also
+   *         be able to compute the lift metric.
+   */
+  @Since("2.4.0")
+  def run[Item: ClassTag](freqItemsets: RDD[FreqItemset[Item]],
+      itemSupport: scala.collection.Map[Item, Double]): RDD[Rule[Item]] = {
     // For candidate rule X => Y, generate (X, (Y, freq(X union Y)))
     val candidates = freqItemsets.flatMap { itemset =>
       val items = itemset.items
@@ -76,8 +89,13 @@ class AssociationRules private[fpm] (
     // Join to get (X, ((Y, freq(X union Y)), freq(X))), generate rules, and filter by confidence
     candidates.join(freqItemsets.map(x => (x.items.toSeq, x.freq)))
       .map { case (antecendent, ((consequent, freqUnion), freqAntecedent)) =>
-      new Rule(antecendent.toArray, consequent.toArray, freqUnion, freqAntecedent)
-    }.filter(_.confidence >= minConfidence)
+        new Rule(antecendent.toArray,
+          consequent.toArray,
+          freqUnion,
+          freqAntecedent,
+          // the consequent always contains exactly one element
+          itemSupport.get(consequent.head))
+      }.filter(_.confidence >= minConfidence)
   }
 
   /**
@@ -107,14 +125,21 @@ object AssociationRules {
       @Since("1.5.0") val antecedent: Array[Item],
       @Since("1.5.0") val consequent: Array[Item],
       freqUnion: Double,
-      freqAntecedent: Double) extends Serializable {
+      freqAntecedent: Double,
+      freqConsequent: Option[Double]) extends Serializable {
 
     /**
      * Returns the confidence of the rule.
      *
      */
     @Since("1.5.0")
-    def confidence: Double = freqUnion.toDouble / freqAntecedent
+    def confidence: Double = freqUnion / freqAntecedent
+
+    /**
+     * Returns the lift of the rule.
+     */
+    @Since("2.4.0")
+    def lift: Option[Double] = freqConsequent.map(fCons => confidence / fCons)
 
     require(antecedent.toSet.intersect(consequent.toSet).isEmpty, {
       val sharedItems = antecedent.toSet.intersect(consequent.toSet)
@@ -142,7 +167,7 @@ object AssociationRules {
 
     override def toString: String = {
       s"${antecedent.mkString("{", ",", "}")} => " +
-        s"${consequent.mkString("{", ",", "}")}: ${confidence}"
+        s"${consequent.mkString("{", ",", "}")}: (confidence: $confidence; lift: $lift)"
     }
   }
 }
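
The arithmetic in `Rule` is small enough to check by hand. A standalone sketch of the same computation, using plain Scala values rather than the Spark classes (the numbers are invented):

```scala
// Inputs for a candidate rule X => Y, as handed to the Rule constructor above.
val freqUnion = 3.0             // count of transactions containing X union Y
val freqAntecedent = 4.0        // count of transactions containing X
val freqConsequent = Some(0.75) // support of Y, when itemSupport knows it

val confidence = freqUnion / freqAntecedent   // P(Y | X) = 0.75
val lift = freqConsequent.map(confidence / _) // Some(1.0): X and Y look independent

// With an empty itemSupport map, freqConsequent is None, and so is lift.
```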

http://git-wip-us.apache.org/repos/asf/spark/blob/a3dccd24/mllib/src/main/scala/org/apache/spark/mllib/fpm/FPGrowth.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/fpm/FPGrowth.scala b/mllib/src/main/scala/org/apache/spark/mllib/fpm/FPGrowth.scala
index 4f2b7e6..3a1bc35 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/fpm/FPGrowth.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/fpm/FPGrowth.scala
@@ -48,9 +48,14 @@ import org.apache.spark.storage.StorageLevel
  * @tparam Item item type
  */
 @Since("1.3.0")
-class FPGrowthModel[Item: ClassTag] @Since("1.3.0") (
-    @Since("1.3.0") val freqItemsets: RDD[FreqItemset[Item]])
+class FPGrowthModel[Item: ClassTag] @Since("2.4.0") (
+    @Since("1.3.0") val freqItemsets: RDD[FreqItemset[Item]],
+    @Since("2.4.0") val itemSupport: Map[Item, Double])
   extends Saveable with Serializable {
+
+  @Since("1.3.0")
+  def this(freqItemsets: RDD[FreqItemset[Item]]) = this(freqItemsets, Map.empty)
+
   /**
    * Generates association rules for the `Item`s in [[freqItemsets]].
    * @param confidence minimal confidence of the rules produced
@@ -58,7 +63,7 @@ class FPGrowthModel[Item: ClassTag] @Since("1.3.0") (
   @Since("1.5.0")
   def generateAssociationRules(confidence: Double): RDD[AssociationRules.Rule[Item]] = {
     val associationRules = new AssociationRules(confidence)
-    associationRules.run(freqItemsets)
+    associationRules.run(freqItemsets, itemSupport)
   }
 
   /**
@@ -213,9 +218,12 @@ class FPGrowth private[spark] (
     val minCount = math.ceil(minSupport * count).toLong
     val numParts = if (numPartitions > 0) numPartitions else data.partitions.length
     val partitioner = new HashPartitioner(numParts)
-    val freqItems = genFreqItems(data, minCount, partitioner)
-    val freqItemsets = genFreqItemsets(data, minCount, freqItems, partitioner)
-    new FPGrowthModel(freqItemsets)
+    val freqItemsCount = genFreqItems(data, minCount, partitioner)
+    val freqItemsets = genFreqItemsets(data, minCount, freqItemsCount.map(_._1), partitioner)
+    val itemSupport = freqItemsCount.map {
+      case (item, cnt) => item -> cnt.toDouble / count
+    }.toMap
+    new FPGrowthModel(freqItemsets, itemSupport)
   }
 
   /**
@@ -231,12 +239,12 @@ class FPGrowth private[spark] (
    * Generates frequent items by filtering the input data using minimal support level.
    * @param minCount minimum count for frequent itemsets
    * @param partitioner partitioner used to distribute items
-   * @return array of frequent pattern ordered by their frequencies
+   * @return array of frequent patterns and their frequencies ordered by their frequencies
    */
   private def genFreqItems[Item: ClassTag](
       data: RDD[Array[Item]],
       minCount: Long,
-      partitioner: Partitioner): Array[Item] = {
+      partitioner: Partitioner): Array[(Item, Long)] = {
     data.flatMap { t =>
       val uniq = t.toSet
       if (t.length != uniq.size) {
@@ -248,7 +256,6 @@ class FPGrowth private[spark] (
       .filter(_._2 >= minCount)
       .collect()
       .sortBy(-_._2)
-      .map(_._1)
   }
 
   /**
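
The change above threads per-item counts out of `genFreqItems` so that `run` can turn them into supports. A tiny standalone sketch of that derivation (plain Scala with invented counts, mirroring the `itemSupport` map built in `run`):

```scala
// (item, count) pairs as genFreqItems now returns them, plus the total
// number of transactions, as in run() above.
val freqItemsCount = Array(("1", 4L), ("2", 3L))
val count = 4L

val itemSupport = freqItemsCount.map {
  case (item, cnt) => item -> cnt.toDouble / count
}.toMap
// itemSupport == Map("1" -> 1.0, "2" -> 0.75)
```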

http://git-wip-us.apache.org/repos/asf/spark/blob/a3dccd24/mllib/src/test/scala/org/apache/spark/ml/fpm/FPGrowthSuite.scala
----------------------------------------------------------------------
diff --git a/mllib/src/test/scala/org/apache/spark/ml/fpm/FPGrowthSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/fpm/FPGrowthSuite.scala
index 87f8b90..b75526a 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/fpm/FPGrowthSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/fpm/FPGrowthSuite.scala
@@ -39,9 +39,9 @@ class FPGrowthSuite extends SparkFunSuite with MLlibTestSparkContext with Defaul
       val model = new FPGrowth().setMinSupport(0.5).fit(data)
       val generatedRules = model.setMinConfidence(0.5).associationRules
       val expectedRules = spark.createDataFrame(Seq(
-        (Array("2"), Array("1"), 1.0),
-        (Array("1"), Array("2"), 0.75)
-      )).toDF("antecedent", "consequent", "confidence")
+        (Array("2"), Array("1"), 1.0, 1.0),
+        (Array("1"), Array("2"), 0.75, 1.0)
+      )).toDF("antecedent", "consequent", "confidence", "lift")
         .withColumn("antecedent", col("antecedent").cast(ArrayType(dt)))
         .withColumn("consequent", col("consequent").cast(ArrayType(dt)))
       assert(expectedRules.sort("antecedent").rdd.collect().sameElements(
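
The expected lift of 1.0 in both rows can be sanity-checked from the rules themselves: since lift = confidence / support(consequent), the consequent supports implied here are 1.0 for item "1" and 0.75 for item "2" (a deduction from the expected values above, not a quote of the suite's data):

```scala
// Supports implied by the expected rules, given lift == confidence / support(Y).
val support = Map("1" -> 1.0, "2" -> 0.75)

assert(1.0  / support("1") == 1.0) // lift of {2} => {1}
assert(0.75 / support("2") == 1.0) // lift of {1} => {2}
```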

http://git-wip-us.apache.org/repos/asf/spark/blob/a3dccd24/project/MimaExcludes.scala
----------------------------------------------------------------------
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 4f250c9..62f8b1a 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -36,6 +36,10 @@ object MimaExcludes {
 
   // Exclude rules for 2.4.x
   lazy val v24excludes = v23excludes ++ Seq(
+    // [SPARK-10697][ML] Add lift to Association rules
+    ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.ml.fpm.FPGrowthModel.this"),
+    ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.mllib.fpm.AssociationRules#Rule.this"),
+
     // [SPARK-25044] Address translation of LMF closure primitive args to Object in Scala 2.12
     ProblemFilters.exclude[MissingTypesProblem]("org.apache.spark.sql.expressions.UserDefinedFunction$"),
     ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.expressions.UserDefinedFunction.apply"),

http://git-wip-us.apache.org/repos/asf/spark/blob/a3dccd24/python/pyspark/ml/fpm.py
----------------------------------------------------------------------
diff --git a/python/pyspark/ml/fpm.py b/python/pyspark/ml/fpm.py
index f939442..c2b29b7 100644
--- a/python/pyspark/ml/fpm.py
+++ b/python/pyspark/ml/fpm.py
@@ -145,10 +145,11 @@ class FPGrowthModel(JavaModel, JavaMLWritable, JavaMLReadable):
     @since("2.2.0")
     def associationRules(self):
         """
-        DataFrame with three columns:
+        DataFrame with four columns:
         * `antecedent`  - Array of the same type as the input column.
         * `consequent`  - Array of the same type as the input column.
         * `confidence`  - Confidence for the rule (`DoubleType`).
+        * `lift`        - Lift for the rule (`DoubleType`).
         """
         return self._call_java("associationRules")
 

http://git-wip-us.apache.org/repos/asf/spark/blob/a3dccd24/python/pyspark/ml/tests.py
----------------------------------------------------------------------
diff --git a/python/pyspark/ml/tests.py b/python/pyspark/ml/tests.py
index 5c87d1d..625d992 100755
--- a/python/pyspark/ml/tests.py
+++ b/python/pyspark/ml/tests.py
@@ -2158,8 +2158,8 @@ class FPGrowthTests(SparkSessionTestCase):
         fpm = fp.fit(self.data)
 
         expected_association_rules = self.spark.createDataFrame(
-            [([3], [1], 1.0), ([2], [1], 1.0)],
-            ["antecedent", "consequent", "confidence"]
+            [([3], [1], 1.0, 1.0), ([2], [1], 1.0, 1.0)],
+            ["antecedent", "consequent", "confidence", "lift"]
         )
         actual_association_rules = fpm.associationRules
 

