You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by sr...@apache.org on 2018/02/13 12:20:37 UTC

spark git commit: [SPARK-23318][ML] FP-growth: WARN FPGrowth: Input data is not cached

Repository: spark
Updated Branches:
  refs/heads/master 407f67249 -> 9dae71516


[SPARK-23318][ML] FP-growth: WARN FPGrowth: Input data is not cached

## What changes were proposed in this pull request?

Cache the RDD of items in ml.FPGrowth before passing it to mllib.FPGrowth. Cache only when the user did not cache the input dataset of transactions. This fixes the warning about uncached data emerging from mllib.FPGrowth.

## How was this patch tested?

Manually:
1. Run ml.FPGrowthExample - warning is there
2. Apply the fix
3. Run ml.FPGrowthExample again - no warning anymore

Author: Arseniy Tashoyan <ta...@gmail.com>

Closes #20578 from tashoyan/SPARK-23318.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9dae7151
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9dae7151
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9dae7151

Branch: refs/heads/master
Commit: 9dae715168a8e72e318ab231c34a1069bfa342a6
Parents: 407f672
Author: Arseniy Tashoyan <ta...@gmail.com>
Authored: Tue Feb 13 06:20:34 2018 -0600
Committer: Sean Owen <so...@cloudera.com>
Committed: Tue Feb 13 06:20:34 2018 -0600

----------------------------------------------------------------------
 .../scala/org/apache/spark/ml/fpm/FPGrowth.scala     | 15 ++++++++++++++-
 1 file changed, 14 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/9dae7151/mllib/src/main/scala/org/apache/spark/ml/fpm/FPGrowth.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/ml/fpm/FPGrowth.scala b/mllib/src/main/scala/org/apache/spark/ml/fpm/FPGrowth.scala
index aa7871d..3d041fc 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/fpm/FPGrowth.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/fpm/FPGrowth.scala
@@ -32,6 +32,7 @@ import org.apache.spark.mllib.fpm.FPGrowth.FreqItemset
 import org.apache.spark.sql._
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.types._
+import org.apache.spark.storage.StorageLevel
 
 /**
  * Common params for FPGrowth and FPGrowthModel
@@ -158,18 +159,30 @@ class FPGrowth @Since("2.2.0") (
   }
 
   private def genericFit[T: ClassTag](dataset: Dataset[_]): FPGrowthModel = {
+    val handlePersistence = dataset.storageLevel == StorageLevel.NONE
+
     val data = dataset.select($(itemsCol))
-    val items = data.where(col($(itemsCol)).isNotNull).rdd.map(r => r.getSeq[T](0).toArray)
+    val items = data.where(col($(itemsCol)).isNotNull).rdd.map(r => r.getSeq[Any](0).toArray)
     val mllibFP = new MLlibFPGrowth().setMinSupport($(minSupport))
     if (isSet(numPartitions)) {
       mllibFP.setNumPartitions($(numPartitions))
     }
+
+    if (handlePersistence) {
+      items.persist(StorageLevel.MEMORY_AND_DISK)
+    }
+
     val parentModel = mllibFP.run(items)
     val rows = parentModel.freqItemsets.map(f => Row(f.items, f.freq))
     val schema = StructType(Seq(
       StructField("items", dataset.schema($(itemsCol)).dataType, nullable = false),
       StructField("freq", LongType, nullable = false)))
     val frequentItems = dataset.sparkSession.createDataFrame(rows, schema)
+
+    if (handlePersistence) {
+      items.unpersist()
+    }
+
     copyValues(new FPGrowthModel(uid, frequentItems)).setParent(this)
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org