You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by sr...@apache.org on 2018/09/01 23:08:02 UTC
spark git commit: [SPARK-10697][ML] Add lift to Association rules
Repository: spark
Updated Branches:
refs/heads/master 6ad8d4c37 -> a3dccd24c
[SPARK-10697][ML] Add lift to Association rules
## What changes were proposed in this pull request?
The PR adds the lift measure to Association rules.
## How was this patch tested?
Existing and modified unit tests (UTs).
Closes #22236 from mgaido91/SPARK-10697.
Authored-by: Marco Gaido <ma...@gmail.com>
Signed-off-by: Sean Owen <se...@databricks.com>
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a3dccd24
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a3dccd24
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a3dccd24
Branch: refs/heads/master
Commit: a3dccd24c2e932b90e647e678f351f5b5568305b
Parents: 6ad8d4c
Author: Marco Gaido <ma...@gmail.com>
Authored: Sat Sep 1 18:07:58 2018 -0500
Committer: Sean Owen <se...@databricks.com>
Committed: Sat Sep 1 18:07:58 2018 -0500
----------------------------------------------------------------------
R/pkg/R/mllib_fpm.R | 5 +-
R/pkg/tests/fulltests/test_mllib_fpm.R | 3 +-
.../org/apache/spark/ml/fpm/FPGrowth.scala | 61 +++++++++++++++-----
.../spark/mllib/fpm/AssociationRules.scala | 37 ++++++++++--
.../org/apache/spark/mllib/fpm/FPGrowth.scala | 25 +++++---
.../org/apache/spark/ml/fpm/FPGrowthSuite.scala | 6 +-
project/MimaExcludes.scala | 4 ++
python/pyspark/ml/fpm.py | 3 +-
python/pyspark/ml/tests.py | 4 +-
9 files changed, 108 insertions(+), 40 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/a3dccd24/R/pkg/R/mllib_fpm.R
----------------------------------------------------------------------
diff --git a/R/pkg/R/mllib_fpm.R b/R/pkg/R/mllib_fpm.R
index e239490..4ad34fe 100644
--- a/R/pkg/R/mllib_fpm.R
+++ b/R/pkg/R/mllib_fpm.R
@@ -116,10 +116,11 @@ setMethod("spark.freqItemsets", signature(object = "FPGrowthModel"),
# Get association rules.
#' @return A \code{SparkDataFrame} with association rules.
-#' The \code{SparkDataFrame} contains three columns:
+#' The \code{SparkDataFrame} contains four columns:
#' \code{antecedent} (an array of the same type as the input column),
#' \code{consequent} (an array of the same type as the input column),
-#' and \code{condfidence} (confidence).
+#' \code{confidence} (confidence for the rule)
+#' and \code{lift} (lift for the rule)
#' @rdname spark.fpGrowth
#' @aliases associationRules,FPGrowthModel-method
#' @note spark.associationRules(FPGrowthModel) since 2.2.0
http://git-wip-us.apache.org/repos/asf/spark/blob/a3dccd24/R/pkg/tests/fulltests/test_mllib_fpm.R
----------------------------------------------------------------------
diff --git a/R/pkg/tests/fulltests/test_mllib_fpm.R b/R/pkg/tests/fulltests/test_mllib_fpm.R
index 69dda52..d80f66a 100644
--- a/R/pkg/tests/fulltests/test_mllib_fpm.R
+++ b/R/pkg/tests/fulltests/test_mllib_fpm.R
@@ -44,7 +44,8 @@ test_that("spark.fpGrowth", {
expected_association_rules <- data.frame(
antecedent = I(list(list("2"), list("3"))),
consequent = I(list(list("1"), list("1"))),
- confidence = c(1, 1)
+ confidence = c(1, 1),
+ lift = c(1, 1)
)
expect_equivalent(expected_association_rules, collect(spark.associationRules(model)))
http://git-wip-us.apache.org/repos/asf/spark/blob/a3dccd24/mllib/src/main/scala/org/apache/spark/ml/fpm/FPGrowth.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/ml/fpm/FPGrowth.scala b/mllib/src/main/scala/org/apache/spark/ml/fpm/FPGrowth.scala
index 85c483c..840a89b 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/fpm/FPGrowth.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/fpm/FPGrowth.scala
@@ -20,6 +20,8 @@ package org.apache.spark.ml.fpm
import scala.reflect.ClassTag
import org.apache.hadoop.fs.Path
+import org.json4s.{DefaultFormats, JObject}
+import org.json4s.JsonDSL._
import org.apache.spark.annotation.{Experimental, Since}
import org.apache.spark.ml.{Estimator, Model}
@@ -34,6 +36,7 @@ import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.storage.StorageLevel
+import org.apache.spark.util.VersionUtils
/**
* Common params for FPGrowth and FPGrowthModel
@@ -175,7 +178,8 @@ class FPGrowth @Since("2.2.0") (
if (handlePersistence) {
items.persist(StorageLevel.MEMORY_AND_DISK)
}
-
+ val inputRowCount = items.count()
+ instr.logNumExamples(inputRowCount)
val parentModel = mllibFP.run(items)
val rows = parentModel.freqItemsets.map(f => Row(f.items, f.freq))
val schema = StructType(Seq(
@@ -187,7 +191,8 @@ class FPGrowth @Since("2.2.0") (
items.unpersist()
}
- copyValues(new FPGrowthModel(uid, frequentItems)).setParent(this)
+ copyValues(new FPGrowthModel(uid, frequentItems, parentModel.itemSupport, inputRowCount))
+ .setParent(this)
}
@Since("2.2.0")
@@ -217,7 +222,9 @@ object FPGrowth extends DefaultParamsReadable[FPGrowth] {
@Experimental
class FPGrowthModel private[ml] (
@Since("2.2.0") override val uid: String,
- @Since("2.2.0") @transient val freqItemsets: DataFrame)
+ @Since("2.2.0") @transient val freqItemsets: DataFrame,
+ private val itemSupport: scala.collection.Map[Any, Double],
+ private val numTrainingRecords: Long)
extends Model[FPGrowthModel] with FPGrowthParams with MLWritable {
/** @group setParam */
@@ -241,9 +248,9 @@ class FPGrowthModel private[ml] (
@transient private var _cachedRules: DataFrame = _
/**
- * Get association rules fitted using the minConfidence. Returns a dataframe
- * with three fields, "antecedent", "consequent" and "confidence", where "antecedent" and
- * "consequent" are Array[T] and "confidence" is Double.
+ * Get association rules fitted using the minConfidence. Returns a dataframe with four fields,
+ * "antecedent", "consequent", "confidence" and "lift", where "antecedent" and "consequent" are
+ * Array[T], whereas "confidence" and "lift" are Double.
*/
@Since("2.2.0")
@transient def associationRules: DataFrame = {
@@ -251,7 +258,7 @@ class FPGrowthModel private[ml] (
_cachedRules
} else {
_cachedRules = AssociationRules
- .getAssociationRulesFromFP(freqItemsets, "items", "freq", $(minConfidence))
+ .getAssociationRulesFromFP(freqItemsets, "items", "freq", $(minConfidence), itemSupport)
_cachedMinConf = $(minConfidence)
_cachedRules
}
@@ -301,7 +308,7 @@ class FPGrowthModel private[ml] (
@Since("2.2.0")
override def copy(extra: ParamMap): FPGrowthModel = {
- val copied = new FPGrowthModel(uid, freqItemsets)
+ val copied = new FPGrowthModel(uid, freqItemsets, itemSupport, numTrainingRecords)
copyValues(copied, extra).setParent(this.parent)
}
@@ -323,7 +330,8 @@ object FPGrowthModel extends MLReadable[FPGrowthModel] {
class FPGrowthModelWriter(instance: FPGrowthModel) extends MLWriter {
override protected def saveImpl(path: String): Unit = {
- DefaultParamsWriter.saveMetadata(instance, path, sc)
+ val extraMetadata: JObject = Map("numTrainingRecords" -> instance.numTrainingRecords)
+ DefaultParamsWriter.saveMetadata(instance, path, sc, extraMetadata = Some(extraMetadata))
val dataPath = new Path(path, "data").toString
instance.freqItemsets.write.parquet(dataPath)
}
@@ -335,10 +343,28 @@ object FPGrowthModel extends MLReadable[FPGrowthModel] {
private val className = classOf[FPGrowthModel].getName
override def load(path: String): FPGrowthModel = {
+ implicit val format = DefaultFormats
val metadata = DefaultParamsReader.loadMetadata(path, sc, className)
+ val (major, minor) = VersionUtils.majorMinorVersion(metadata.sparkVersion)
+ val numTrainingRecords = if (major.toInt < 2 || (major.toInt == 2 && minor.toInt < 4)) {
+ // 2.3 and before don't store the count
+ 0L
+ } else {
+ // 2.4+
+ (metadata.metadata \ "numTrainingRecords").extract[Long]
+ }
val dataPath = new Path(path, "data").toString
val frequentItems = sparkSession.read.parquet(dataPath)
- val model = new FPGrowthModel(metadata.uid, frequentItems)
+ val itemSupport = if (numTrainingRecords == 0L) {
+ Map.empty[Any, Double]
+ } else {
+ frequentItems.rdd.flatMap {
+ case Row(items: Seq[_], count: Long) if items.length == 1 =>
+ Some(items.head -> count.toDouble / numTrainingRecords)
+ case _ => None
+ }.collectAsMap()
+ }
+ val model = new FPGrowthModel(metadata.uid, frequentItems, itemSupport, numTrainingRecords)
metadata.getAndSetParams(model)
model
}
@@ -354,27 +380,30 @@ private[fpm] object AssociationRules {
* @param itemsCol column name for frequent itemsets
* @param freqCol column name for appearance count of the frequent itemsets
* @param minConfidence minimum confidence for generating the association rules
- * @return a DataFrame("antecedent"[Array], "consequent"[Array], "confidence"[Double])
- * containing the association rules.
+ * @param itemSupport map containing an item and its support
+ * @return a DataFrame("antecedent"[Array], "consequent"[Array], "confidence"[Double],
+ * "lift" [Double]) containing the association rules.
*/
def getAssociationRulesFromFP[T: ClassTag](
dataset: Dataset[_],
itemsCol: String,
freqCol: String,
- minConfidence: Double): DataFrame = {
+ minConfidence: Double,
+ itemSupport: scala.collection.Map[T, Double]): DataFrame = {
val freqItemSetRdd = dataset.select(itemsCol, freqCol).rdd
.map(row => new FreqItemset(row.getSeq[T](0).toArray, row.getLong(1)))
val rows = new MLlibAssociationRules()
.setMinConfidence(minConfidence)
- .run(freqItemSetRdd)
- .map(r => Row(r.antecedent, r.consequent, r.confidence))
+ .run(freqItemSetRdd, itemSupport)
+ .map(r => Row(r.antecedent, r.consequent, r.confidence, r.lift.orNull))
val dt = dataset.schema(itemsCol).dataType
val schema = StructType(Seq(
StructField("antecedent", dt, nullable = false),
StructField("consequent", dt, nullable = false),
- StructField("confidence", DoubleType, nullable = false)))
+ StructField("confidence", DoubleType, nullable = false),
+ StructField("lift", DoubleType)))
val rules = dataset.sparkSession.createDataFrame(rows, schema)
rules
}
http://git-wip-us.apache.org/repos/asf/spark/blob/a3dccd24/mllib/src/main/scala/org/apache/spark/mllib/fpm/AssociationRules.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/fpm/AssociationRules.scala b/mllib/src/main/scala/org/apache/spark/mllib/fpm/AssociationRules.scala
index acb83ac..43d256b 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/fpm/AssociationRules.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/fpm/AssociationRules.scala
@@ -56,11 +56,24 @@ class AssociationRules private[fpm] (
/**
* Computes the association rules with confidence above `minConfidence`.
* @param freqItemsets frequent itemset model obtained from [[FPGrowth]]
- * @return a `Set[Rule[Item]]` containing the association rules.
+ * @return a `RDD[Rule[Item]]` containing the association rules.
*
*/
@Since("1.5.0")
def run[Item: ClassTag](freqItemsets: RDD[FreqItemset[Item]]): RDD[Rule[Item]] = {
+ run(freqItemsets, Map.empty[Item, Double])
+ }
+
+ /**
+ * Computes the association rules with confidence above `minConfidence`.
+ * @param freqItemsets frequent itemset model obtained from [[FPGrowth]]
+ * @param itemSupport map containing an item and its support
+ * @return an `RDD[Rule[Item]]` containing the association rules. The rules can also
+ * compute the lift metric.
+ */
+ @Since("2.4.0")
+ def run[Item: ClassTag](freqItemsets: RDD[FreqItemset[Item]],
+ itemSupport: scala.collection.Map[Item, Double]): RDD[Rule[Item]] = {
// For candidate rule X => Y, generate (X, (Y, freq(X union Y)))
val candidates = freqItemsets.flatMap { itemset =>
val items = itemset.items
@@ -76,8 +89,13 @@ class AssociationRules private[fpm] (
// Join to get (X, ((Y, freq(X union Y)), freq(X))), generate rules, and filter by confidence
candidates.join(freqItemsets.map(x => (x.items.toSeq, x.freq)))
.map { case (antecendent, ((consequent, freqUnion), freqAntecedent)) =>
- new Rule(antecendent.toArray, consequent.toArray, freqUnion, freqAntecedent)
- }.filter(_.confidence >= minConfidence)
+ new Rule(antecendent.toArray,
+ consequent.toArray,
+ freqUnion,
+ freqAntecedent,
+ // the consequent always contains exactly one element
+ itemSupport.get(consequent.head))
+ }.filter(_.confidence >= minConfidence)
}
/**
@@ -107,14 +125,21 @@ object AssociationRules {
@Since("1.5.0") val antecedent: Array[Item],
@Since("1.5.0") val consequent: Array[Item],
freqUnion: Double,
- freqAntecedent: Double) extends Serializable {
+ freqAntecedent: Double,
+ freqConsequent: Option[Double]) extends Serializable {
/**
* Returns the confidence of the rule.
*
*/
@Since("1.5.0")
- def confidence: Double = freqUnion.toDouble / freqAntecedent
+ def confidence: Double = freqUnion / freqAntecedent
+
+ /**
+ * Returns the lift of the rule.
+ */
+ @Since("2.4.0")
+ def lift: Option[Double] = freqConsequent.map(fCons => confidence / fCons)
require(antecedent.toSet.intersect(consequent.toSet).isEmpty, {
val sharedItems = antecedent.toSet.intersect(consequent.toSet)
@@ -142,7 +167,7 @@ object AssociationRules {
override def toString: String = {
s"${antecedent.mkString("{", ",", "}")} => " +
- s"${consequent.mkString("{", ",", "}")}: ${confidence}"
+ s"${consequent.mkString("{", ",", "}")}: (confidence: $confidence; lift: $lift)"
}
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/a3dccd24/mllib/src/main/scala/org/apache/spark/mllib/fpm/FPGrowth.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/fpm/FPGrowth.scala b/mllib/src/main/scala/org/apache/spark/mllib/fpm/FPGrowth.scala
index 4f2b7e6..3a1bc35 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/fpm/FPGrowth.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/fpm/FPGrowth.scala
@@ -48,9 +48,14 @@ import org.apache.spark.storage.StorageLevel
* @tparam Item item type
*/
@Since("1.3.0")
-class FPGrowthModel[Item: ClassTag] @Since("1.3.0") (
- @Since("1.3.0") val freqItemsets: RDD[FreqItemset[Item]])
+class FPGrowthModel[Item: ClassTag] @Since("2.4.0") (
+ @Since("1.3.0") val freqItemsets: RDD[FreqItemset[Item]],
+ @Since("2.4.0") val itemSupport: Map[Item, Double])
extends Saveable with Serializable {
+
+ @Since("1.3.0")
+ def this(freqItemsets: RDD[FreqItemset[Item]]) = this(freqItemsets, Map.empty)
+
/**
* Generates association rules for the `Item`s in [[freqItemsets]].
* @param confidence minimal confidence of the rules produced
@@ -58,7 +63,7 @@ class FPGrowthModel[Item: ClassTag] @Since("1.3.0") (
@Since("1.5.0")
def generateAssociationRules(confidence: Double): RDD[AssociationRules.Rule[Item]] = {
val associationRules = new AssociationRules(confidence)
- associationRules.run(freqItemsets)
+ associationRules.run(freqItemsets, itemSupport)
}
/**
@@ -213,9 +218,12 @@ class FPGrowth private[spark] (
val minCount = math.ceil(minSupport * count).toLong
val numParts = if (numPartitions > 0) numPartitions else data.partitions.length
val partitioner = new HashPartitioner(numParts)
- val freqItems = genFreqItems(data, minCount, partitioner)
- val freqItemsets = genFreqItemsets(data, minCount, freqItems, partitioner)
- new FPGrowthModel(freqItemsets)
+ val freqItemsCount = genFreqItems(data, minCount, partitioner)
+ val freqItemsets = genFreqItemsets(data, minCount, freqItemsCount.map(_._1), partitioner)
+ val itemSupport = freqItemsCount.map {
+ case (item, cnt) => item -> cnt.toDouble / count
+ }.toMap
+ new FPGrowthModel(freqItemsets, itemSupport)
}
/**
@@ -231,12 +239,12 @@ class FPGrowth private[spark] (
* Generates frequent items by filtering the input data using minimal support level.
* @param minCount minimum count for frequent itemsets
* @param partitioner partitioner used to distribute items
- * @return array of frequent pattern ordered by their frequencies
+ * @return array of frequent patterns and their frequencies ordered by their frequencies
*/
private def genFreqItems[Item: ClassTag](
data: RDD[Array[Item]],
minCount: Long,
- partitioner: Partitioner): Array[Item] = {
+ partitioner: Partitioner): Array[(Item, Long)] = {
data.flatMap { t =>
val uniq = t.toSet
if (t.length != uniq.size) {
@@ -248,7 +256,6 @@ class FPGrowth private[spark] (
.filter(_._2 >= minCount)
.collect()
.sortBy(-_._2)
- .map(_._1)
}
/**
http://git-wip-us.apache.org/repos/asf/spark/blob/a3dccd24/mllib/src/test/scala/org/apache/spark/ml/fpm/FPGrowthSuite.scala
----------------------------------------------------------------------
diff --git a/mllib/src/test/scala/org/apache/spark/ml/fpm/FPGrowthSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/fpm/FPGrowthSuite.scala
index 87f8b90..b75526a 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/fpm/FPGrowthSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/fpm/FPGrowthSuite.scala
@@ -39,9 +39,9 @@ class FPGrowthSuite extends SparkFunSuite with MLlibTestSparkContext with Defaul
val model = new FPGrowth().setMinSupport(0.5).fit(data)
val generatedRules = model.setMinConfidence(0.5).associationRules
val expectedRules = spark.createDataFrame(Seq(
- (Array("2"), Array("1"), 1.0),
- (Array("1"), Array("2"), 0.75)
- )).toDF("antecedent", "consequent", "confidence")
+ (Array("2"), Array("1"), 1.0, 1.0),
+ (Array("1"), Array("2"), 0.75, 1.0)
+ )).toDF("antecedent", "consequent", "confidence", "lift")
.withColumn("antecedent", col("antecedent").cast(ArrayType(dt)))
.withColumn("consequent", col("consequent").cast(ArrayType(dt)))
assert(expectedRules.sort("antecedent").rdd.collect().sameElements(
http://git-wip-us.apache.org/repos/asf/spark/blob/a3dccd24/project/MimaExcludes.scala
----------------------------------------------------------------------
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 4f250c9..62f8b1a 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -36,6 +36,10 @@ object MimaExcludes {
// Exclude rules for 2.4.x
lazy val v24excludes = v23excludes ++ Seq(
+ // [SPARK-10697][ML] Add lift to Association rules
+ ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.ml.fpm.FPGrowthModel.this"),
+ ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.mllib.fpm.AssociationRules#Rule.this"),
+
// [SPARK-25044] Address translation of LMF closure primitive args to Object in Scala 2.12
ProblemFilters.exclude[MissingTypesProblem]("org.apache.spark.sql.expressions.UserDefinedFunction$"),
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.expressions.UserDefinedFunction.apply"),
http://git-wip-us.apache.org/repos/asf/spark/blob/a3dccd24/python/pyspark/ml/fpm.py
----------------------------------------------------------------------
diff --git a/python/pyspark/ml/fpm.py b/python/pyspark/ml/fpm.py
index f939442..c2b29b7 100644
--- a/python/pyspark/ml/fpm.py
+++ b/python/pyspark/ml/fpm.py
@@ -145,10 +145,11 @@ class FPGrowthModel(JavaModel, JavaMLWritable, JavaMLReadable):
@since("2.2.0")
def associationRules(self):
"""
- DataFrame with three columns:
+ DataFrame with four columns:
* `antecedent` - Array of the same type as the input column.
* `consequent` - Array of the same type as the input column.
* `confidence` - Confidence for the rule (`DoubleType`).
+ * `lift` - Lift for the rule (`DoubleType`).
"""
return self._call_java("associationRules")
http://git-wip-us.apache.org/repos/asf/spark/blob/a3dccd24/python/pyspark/ml/tests.py
----------------------------------------------------------------------
diff --git a/python/pyspark/ml/tests.py b/python/pyspark/ml/tests.py
index 5c87d1d..625d992 100755
--- a/python/pyspark/ml/tests.py
+++ b/python/pyspark/ml/tests.py
@@ -2158,8 +2158,8 @@ class FPGrowthTests(SparkSessionTestCase):
fpm = fp.fit(self.data)
expected_association_rules = self.spark.createDataFrame(
- [([3], [1], 1.0), ([2], [1], 1.0)],
- ["antecedent", "consequent", "confidence"]
+ [([3], [1], 1.0, 1.0), ([2], [1], 1.0, 1.0)],
+ ["antecedent", "consequent", "confidence", "lift"]
)
actual_association_rules = fpm.associationRules
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org