Posted to reviews@spark.apache.org by "zhengruifeng (via GitHub)" <gi...@apache.org> on 2023/03/03 05:49:34 UTC

[GitHub] [spark] zhengruifeng opened a new pull request, #40263: [SPARK-42659][ML] Reimplement `FPGrowthModel.transform` with dataframe operations

zhengruifeng opened a new pull request, #40263:
URL: https://github.com/apache/spark/pull/40263

   ### What changes were proposed in this pull request?
   Reimplement `FPGrowthModel.transform` with dataframe operations
   
   
   ### Why are the changes needed?
   delay the `collect()` of the model dataframe `associationRules`
   
   
   ### Does this PR introduce _any_ user-facing change?
   no
   
   
   ### How was this patch tested?
   existing UT




[GitHub] [spark] srowen commented on pull request #40263: [SPARK-42659][ML] Reimplement `FPGrowthModel.transform` with dataframe operations

Posted by "srowen (via GitHub)" <gi...@apache.org>.
srowen commented on PR #40263:
URL: https://github.com/apache/spark/pull/40263#issuecomment-1473071461

   If it's faster and gives the right answers, sure




[GitHub] [spark] srowen commented on pull request #40263: [SPARK-42659][ML] Reimplement `FPGrowthModel.transform` with dataframe operations

Posted by "srowen (via GitHub)" <gi...@apache.org>.
srowen commented on PR #40263:
URL: https://github.com/apache/spark/pull/40263#issuecomment-1466280043

   So this seems slower on a medium-sized data set. I don't know if delaying the collect() matters much; the overall execution time matters. I'm worried that this gets much slower on 1M or 10M records. Does this buy us other benefits, like, is this necessary to support "Safe Spark" or something?




[GitHub] [spark] zhengruifeng commented on pull request #40263: [SPARK-42659][ML] Reimplement `FPGrowthModel.transform` with dataframe operations

Posted by "zhengruifeng (via GitHub)" <gi...@apache.org>.
zhengruifeng commented on PR #40263:
URL: https://github.com/apache/spark/pull/40263#issuecomment-1473064231

   @srowen if the latest performance test looks fine, then I'd like to ask the SQL folks whether we can add a subquery method to the DataFrame APIs.




[GitHub] [spark] zhengruifeng commented on pull request #40263: [SPARK-42659][ML] Reimplement `FPGrowthModel.transform` with dataframe operations

Posted by "zhengruifeng (via GitHub)" <gi...@apache.org>.
zhengruifeng commented on PR #40263:
URL: https://github.com/apache/spark/pull/40263#issuecomment-1465662299

   I did a quick test with the dataset `T10I4D100K` from http://fimi.uantwerpen.be/data/
   
   fit:
   ```
   scala> val df = sc.textFile("/Users/ruifeng.zheng/.dev/data/T10I4D100K.dat").map(_.split(" ")).toDF("items")
   df: org.apache.spark.sql.DataFrame = [items: array<string>]
   
   scala> df.count
   res16: Long = 100000
   
   scala> val model = new FPGrowth().setMinSupport(0.01).setMinConfidence(0.01).fit(df)
   model: org.apache.spark.ml.fpm.FPGrowthModel = FPGrowthModel: uid=fpgrowth_92901252345a, numTrainingRecords=100000
   
   scala> model.freqItemsets.count
   res17: Long = 385                                                               
   
   scala> model.associationRules.count
   res18: Long = 21                                                                
   
   scala> model.save("/tmp/fpm.model")
   ```
   
   
   transformation:
   ```
   import org.apache.spark.ml.fpm._
   val df = sc.textFile("/Users/ruifeng.zheng/.dev/data/T10I4D100K.dat").map(_.split(" ")).toDF("items")
   df.cache()
   df.count()
   
   val model = FPGrowthModel.load("/tmp/fpm.model")
   model.transform(df).explain("extended")
   Seq.range(0, 100).foreach{i => model.transform(df).count()} // warms up
   val start = System.currentTimeMillis; Seq.range(0, 100).foreach{i => model.transform(df).count()}; val end = System.currentTimeMillis; end - start
   val start = System.currentTimeMillis; Seq.range(0, 100).foreach{i => model.transform(df).schema}; val end = System.currentTimeMillis; end - start
   ```
   
   master:
   ```
   scala> val model = FPGrowthModel.load("/tmp/fpm.model")
   model: org.apache.spark.ml.fpm.FPGrowthModel = FPGrowthModel: uid=fpgrowth_92901252345a, numTrainingRecords=100000
   
   scala> model.transform(df).explain("extended")
   == Parsed Logical Plan ==
   'Project [items#5, UDF('items) AS prediction#70]
   +- Project [value#2 AS items#5]
      +- SerializeFromObject [mapobjects(lambdavariable(MapObject, ObjectType(class java.lang.Object), true, 1), staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(lambdavariable(MapObject, ObjectType(class java.lang.Object), true, 1), StringType, ObjectType(class java.lang.String)), true, false, true), input[0, [Ljava.lang.String;, true], None) AS value#2]
         +- ExternalRDD [obj#1]
   
   == Analyzed Logical Plan ==
   items: array<string>, prediction: array<string>
   Project [items#5, UDF(items#5) AS prediction#70]
   +- Project [value#2 AS items#5]
      +- SerializeFromObject [mapobjects(lambdavariable(MapObject, ObjectType(class java.lang.Object), true, 1), staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(lambdavariable(MapObject, ObjectType(class java.lang.Object), true, 1), StringType, ObjectType(class java.lang.String)), true, false, true), input[0, [Ljava.lang.String;, true], None) AS value#2]
         +- ExternalRDD [obj#1]
   
   == Optimized Logical Plan ==
   Project [items#5, UDF(items#5) AS prediction#70]
   +- InMemoryRelation [items#5], StorageLevel(disk, memory, deserialized, 1 replicas)
         +- *(1) Project [value#2 AS items#5]
            +- *(1) SerializeFromObject [mapobjects(lambdavariable(MapObject, ObjectType(class java.lang.Object), true, -1), staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(lambdavariable(MapObject, ObjectType(class java.lang.Object), true, -1), StringType, ObjectType(class java.lang.String)), true, false, true), input[0, [Ljava.lang.String;, true], None) AS value#2]
               +- Scan[obj#1]
   
   == Physical Plan ==
   *(1) Project [items#5, UDF(items#5) AS prediction#70]
   +- InMemoryTableScan [items#5]
         +- InMemoryRelation [items#5], StorageLevel(disk, memory, deserialized, 1 replicas)
               +- *(1) Project [value#2 AS items#5]
                  +- *(1) SerializeFromObject [mapobjects(lambdavariable(MapObject, ObjectType(class java.lang.Object), true, -1), staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(lambdavariable(MapObject, ObjectType(class java.lang.Object), true, -1), StringType, ObjectType(class java.lang.String)), true, false, true), input[0, [Ljava.lang.String;, true], None) AS value#2]
                     +- Scan[obj#1]
   
   
   scala> Seq.range(0, 100).foreach{i => model.transform(df).count()} // warms up
   
   scala> val start = System.currentTimeMillis; Seq.range(0, 100).foreach{i => model.transform(df).count()}; val end = System.currentTimeMillis; end - start
   start: Long = 1678692855532
   end: Long = 1678692860098
   res4: Long = 4566
   
   scala> val start = System.currentTimeMillis; Seq.range(0, 100).foreach{i => model.transform(df).schema}; val end = System.currentTimeMillis; end - start
   start: Long = 1678692860277
   end: Long = 1678692862372
   res5: Long = 2095
   ```
   
   this PR:
   ```
   scala> model.transform(df).explain("extended")
   == Parsed Logical Plan ==
   'Project [items#5, CASE WHEN NOT isnull('items) THEN aggregate('prediction, cast(array() as array<string>), lambdafunction(CASE WHEN forall(lambda 'y_1[antecedent], lambdafunction(array_contains('items, lambda 'x_2), lambda 'x_2, false)) THEN array_union(lambda 'x_0, array_except(lambda 'y_1[consequent], 'items)) ELSE lambda 'x_0 END, lambda 'x_0, lambda 'y_1, false), lambdafunction(lambda 'x_3, lambda 'x_3, false)) ELSE cast(array() as array<string>) END AS prediction#72]
   +- Join Cross
      :- Project [value#2 AS items#5]
      :  +- SerializeFromObject [mapobjects(lambdavariable(MapObject, ObjectType(class java.lang.Object), true, 1), staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(lambdavariable(MapObject, ObjectType(class java.lang.Object), true, 1), StringType, ObjectType(class java.lang.String)), true, false, true), input[0, [Ljava.lang.String;, true], None) AS value#2]
      :     +- ExternalRDD [obj#1]
      +- ResolvedHint (strategy=broadcast)
         +- Aggregate [collect_list(struct(antecedent, antecedent#57, consequent, consequent#58), 0, 0) AS prediction#68]
            +- Filter (NOT isnull(antecedent#57) AND NOT isnull(consequent#58))
               +- LogicalRDD [antecedent#57, consequent#58, confidence#59, lift#60, support#61], false
   
   == Analyzed Logical Plan ==
   items: array<string>, prediction: array<string>
   Project [items#5, CASE WHEN NOT isnull(items#5) THEN aggregate(prediction#68, cast(array() as array<string>), lambdafunction(CASE WHEN forall(lambda y_1#74.antecedent, lambdafunction(array_contains(items#5, lambda x_2#76), lambda x_2#76, false)) THEN array_union(lambda x_0#73, array_except(lambda y_1#74.consequent, items#5)) ELSE lambda x_0#73 END, lambda x_0#73, lambda y_1#74, false), lambdafunction(lambda x_3#75, lambda x_3#75, false)) ELSE cast(array() as array<string>) END AS prediction#72]
   +- Join Cross
      :- Project [value#2 AS items#5]
      :  +- SerializeFromObject [mapobjects(lambdavariable(MapObject, ObjectType(class java.lang.Object), true, 1), staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(lambdavariable(MapObject, ObjectType(class java.lang.Object), true, 1), StringType, ObjectType(class java.lang.String)), true, false, true), input[0, [Ljava.lang.String;, true], None) AS value#2]
      :     +- ExternalRDD [obj#1]
      +- ResolvedHint (strategy=broadcast)
         +- Aggregate [collect_list(struct(antecedent, antecedent#57, consequent, consequent#58), 0, 0) AS prediction#68]
            +- Filter (NOT isnull(antecedent#57) AND NOT isnull(consequent#58))
               +- LogicalRDD [antecedent#57, consequent#58, confidence#59, lift#60, support#61], false
   
   == Optimized Logical Plan ==
   Project [items#5, CASE WHEN isnotnull(items#5) THEN aggregate(prediction#68, [], lambdafunction(CASE WHEN forall(lambda y_1#74.antecedent, lambdafunction(array_contains(items#5, lambda x_2#76), lambda x_2#76, false)) THEN array_union(lambda x_0#73, array_except(lambda y_1#74.consequent, items#5)) ELSE lambda x_0#73 END, lambda x_0#73, lambda y_1#74, false), lambdafunction(lambda x_3#75, lambda x_3#75, false)) ELSE [] END AS prediction#72]
   +- Join Cross, rightHint=(strategy=broadcast)
      :- InMemoryRelation [items#5], StorageLevel(disk, memory, deserialized, 1 replicas)
      :     +- *(1) Project [value#2 AS items#5]
      :        +- *(1) SerializeFromObject [mapobjects(lambdavariable(MapObject, ObjectType(class java.lang.Object), true, -1), staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(lambdavariable(MapObject, ObjectType(class java.lang.Object), true, -1), StringType, ObjectType(class java.lang.String)), true, false, true), input[0, [Ljava.lang.String;, true], None) AS value#2]
      :           +- Scan[obj#1]
      +- Aggregate [collect_list(struct(antecedent, antecedent#57, consequent, consequent#58), 0, 0) AS prediction#68]
         +- Project [antecedent#57, consequent#58]
            +- LogicalRDD [antecedent#57, consequent#58, confidence#59, lift#60, support#61], false
   
   == Physical Plan ==
   AdaptiveSparkPlan isFinalPlan=false
   +- Project [items#5, CASE WHEN isnotnull(items#5) THEN aggregate(prediction#68, [], lambdafunction(CASE WHEN forall(lambda y_1#74.antecedent, lambdafunction(array_contains(items#5, lambda x_2#76), lambda x_2#76, false)) THEN array_union(lambda x_0#73, array_except(lambda y_1#74.consequent, items#5)) ELSE lambda x_0#73 END, lambda x_0#73, lambda y_1#74, false), lambdafunction(lambda x_3#75, lambda x_3#75, false)) ELSE [] END AS prediction#72]
      +- BroadcastNestedLoopJoin BuildRight, Cross
         :- InMemoryTableScan [items#5]
         :     +- InMemoryRelation [items#5], StorageLevel(disk, memory, deserialized, 1 replicas)
         :           +- *(1) Project [value#2 AS items#5]
         :              +- *(1) SerializeFromObject [mapobjects(lambdavariable(MapObject, ObjectType(class java.lang.Object), true, -1), staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(lambdavariable(MapObject, ObjectType(class java.lang.Object), true, -1), StringType, ObjectType(class java.lang.String)), true, false, true), input[0, [Ljava.lang.String;, true], None) AS value#2]
         :                 +- Scan[obj#1]
         +- BroadcastExchange IdentityBroadcastMode, [plan_id=117]
            +- ObjectHashAggregate(keys=[], functions=[collect_list(struct(antecedent, antecedent#57, consequent, consequent#58), 0, 0)], output=[prediction#68])
               +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=114]
                  +- ObjectHashAggregate(keys=[], functions=[partial_collect_list(struct(antecedent, antecedent#57, consequent, consequent#58), 0, 0)], output=[buf#95])
                     +- Project [antecedent#57, consequent#58]
                        +- Scan ExistingRDD[antecedent#57,consequent#58,confidence#59,lift#60,support#61]
   
   
   scala> Seq.range(0, 100).foreach{i => model.transform(df).count()} // warms up
   
   scala> val start = System.currentTimeMillis; Seq.range(0, 100).foreach{i => model.transform(df).count()}; val end = System.currentTimeMillis; end - start
   start: Long = 1678693708534
   end: Long = 1678693713436
   res6: Long = 4902
   
   scala> val start = System.currentTimeMillis; Seq.range(0, 100).foreach{i => model.transform(df).schema}; val end = System.currentTimeMillis; end - start
   start: Long = 1678693713596
   end: Long = 1678693713807
   res7: Long = 211
   ```
   
   
   the transformation is a bit slower (4566 ms -> 4902 ms), but when we need to analyze the dataframe it is much faster (2095 ms -> 211 ms), since the `collect` execution is delayed.




[GitHub] [spark] zhengruifeng commented on a diff in pull request #40263: [SPARK-42659][ML] Reimplement `FPGrowthModel.transform` with dataframe operations

Posted by "zhengruifeng (via GitHub)" <gi...@apache.org>.
zhengruifeng commented on code in PR #40263:
URL: https://github.com/apache/spark/pull/40263#discussion_r1125383324


##########
mllib/src/main/scala/org/apache/spark/ml/fpm/FPGrowth.scala:
##########
@@ -275,29 +274,38 @@ class FPGrowthModel private[ml] (
   @Since("2.2.0")
   override def transform(dataset: Dataset[_]): DataFrame = {
     transformSchema(dataset.schema, logging = true)
-    genericTransform(dataset)
-  }
-
-  private def genericTransform(dataset: Dataset[_]): DataFrame = {
-    val rules: Array[(Seq[Any], Seq[Any])] = associationRules.select("antecedent", "consequent")
-      .rdd.map(r => (r.getSeq(0), r.getSeq(1)))
-      .collect().asInstanceOf[Array[(Seq[Any], Seq[Any])]]
-    val brRules = dataset.sparkSession.sparkContext.broadcast(rules)
-
-    val dt = dataset.schema($(itemsCol)).dataType
-    // For each rule, examine the input items and summarize the consequents
-    val predictUDF = SparkUserDefinedFunction((items: Seq[Any]) => {
-      if (items != null) {
-        val itemset = items.toSet
-        brRules.value.filter(_._1.forall(itemset.contains))
-          .flatMap(_._2.filter(!itemset.contains(_))).distinct
-      } else {
-        Seq.empty
-      }},
-      dt,
-      Nil
+    val arrayType = associationRules.schema("consequent").dataType
+
+    dataset.crossJoin(
+      broadcast(
+        associationRules
+          .where(not(isnull(col("antecedent"))) &&
+            not(isnull(col("consequent"))))
+          .select(
+            collect_list(
+              struct("antecedent", "consequent")
+            ).as($(predictionCol))
+          )
+      )
+    ).withColumn(
+      $(predictionCol),
+      when(not(isnull(col($(itemsCol)))),
+        array_sort(
+          array_distinct(
+            aggregate(
+              col($(predictionCol)),
+              array().cast(arrayType),
+              (r, s) => when(
+                forall(s.getField("antecedent"),
+                  c => array_contains(col($(itemsCol)), c)),
+                array_union(r,
+                  array_except(s.getField("consequent"), col($(itemsCol))))
+              ).otherwise(r)
+            )
+          )
+        )
+      ).otherwise(array().cast(arrayType))

Review Comment:
   Actually, the main goals are:
   1. try to avoid eager actions in the transformation (see the sketch below), like what we did in https://github.com/apache/spark/commit/6a0713a141fa98d83029d8388508cbbc40fd554e;
   2. try to avoid collecting the DF `associationRules` to the driver; I feel such a collection will be a potential problem in Spark Connect, where there are many Spark Sessions in the driver.
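   
   To make the first goal concrete, here is a minimal, hypothetical sketch (not the actual Spark code; the helper names are made up for illustration): an eager `collect()` inside `transform` runs a Spark job on every call, even when the caller only needs the output schema, while a pure-DataFrame variant only builds a plan until an action is run.
   
   ```scala
   import org.apache.spark.sql.DataFrame
   import org.apache.spark.sql.functions._
   
   // Eager: collect() launches a job every time transform() is called,
   // even if the caller only inspects the output schema.
   def eagerTransform(df: DataFrame, rules: DataFrame): DataFrame = {
     val collected = rules.select("antecedent", "consequent").collect()
     val bc = df.sparkSession.sparkContext.broadcast(collected)
     df.withColumn("numRules", lit(bc.value.length)) // placeholder use of the rules
   }
   
   // Lazy: the rules stay inside the plan; nothing executes until an action
   // (count, show, write, ...) is called on the result.
   def lazyTransform(df: DataFrame, rules: DataFrame): DataFrame = {
     df.crossJoin(broadcast(rules.agg(count(lit(1)).as("numRules"))))
   }
   ```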





[GitHub] [spark] zhengruifeng closed pull request #40263: [SPARK-42659][ML] Reimplement `FPGrowthModel.transform` with dataframe operations

Posted by "zhengruifeng (via GitHub)" <gi...@apache.org>.
zhengruifeng closed pull request #40263: [SPARK-42659][ML] Reimplement `FPGrowthModel.transform` with dataframe operations
URL: https://github.com/apache/spark/pull/40263




[GitHub] [spark] srowen commented on a diff in pull request #40263: [SPARK-42659][ML] Reimplement `FPGrowthModel.transform` with dataframe operations

Posted by "srowen (via GitHub)" <gi...@apache.org>.
srowen commented on code in PR #40263:
URL: https://github.com/apache/spark/pull/40263#discussion_r1124505521


##########
mllib/src/main/scala/org/apache/spark/ml/fpm/FPGrowth.scala:
##########
@@ -275,29 +274,38 @@ class FPGrowthModel private[ml] (
   @Since("2.2.0")
   override def transform(dataset: Dataset[_]): DataFrame = {
     transformSchema(dataset.schema, logging = true)
-    genericTransform(dataset)
-  }
-
-  private def genericTransform(dataset: Dataset[_]): DataFrame = {
-    val rules: Array[(Seq[Any], Seq[Any])] = associationRules.select("antecedent", "consequent")
-      .rdd.map(r => (r.getSeq(0), r.getSeq(1)))
-      .collect().asInstanceOf[Array[(Seq[Any], Seq[Any])]]
-    val brRules = dataset.sparkSession.sparkContext.broadcast(rules)
-
-    val dt = dataset.schema($(itemsCol)).dataType
-    // For each rule, examine the input items and summarize the consequents
-    val predictUDF = SparkUserDefinedFunction((items: Seq[Any]) => {
-      if (items != null) {
-        val itemset = items.toSet
-        brRules.value.filter(_._1.forall(itemset.contains))
-          .flatMap(_._2.filter(!itemset.contains(_))).distinct
-      } else {
-        Seq.empty
-      }},
-      dt,
-      Nil
+    val arrayType = associationRules.schema("consequent").dataType
+
+    dataset.crossJoin(
+      broadcast(
+        associationRules
+          .where(not(isnull(col("antecedent"))) &&
+            not(isnull(col("consequent"))))
+          .select(
+            collect_list(
+              struct("antecedent", "consequent")
+            ).as($(predictionCol))
+          )
+      )
+    ).withColumn(
+      $(predictionCol),
+      when(not(isnull(col($(itemsCol)))),
+        array_sort(
+          array_distinct(
+            aggregate(
+              col($(predictionCol)),
+              array().cast(arrayType),
+              (r, s) => when(
+                forall(s.getField("antecedent"),
+                  c => array_contains(col($(itemsCol)), c)),
+                array_union(r,
+                  array_except(s.getField("consequent"), col($(itemsCol))))
+              ).otherwise(r)
+            )
+          )
+        )
+      ).otherwise(array().cast(arrayType))

Review Comment:
   Agreed, a join could be a lot slower





[GitHub] [spark] zhengruifeng commented on a diff in pull request #40263: [SPARK-42659][ML] Reimplement `FPGrowthModel.transform` with dataframe operations

Posted by "zhengruifeng (via GitHub)" <gi...@apache.org>.
zhengruifeng commented on code in PR #40263:
URL: https://github.com/apache/spark/pull/40263#discussion_r1125384107


##########
mllib/src/main/scala/org/apache/spark/ml/fpm/FPGrowth.scala:
##########
@@ -275,29 +274,38 @@ class FPGrowthModel private[ml] (
   @Since("2.2.0")
   override def transform(dataset: Dataset[_]): DataFrame = {
     transformSchema(dataset.schema, logging = true)
-    genericTransform(dataset)
-  }
-
-  private def genericTransform(dataset: Dataset[_]): DataFrame = {
-    val rules: Array[(Seq[Any], Seq[Any])] = associationRules.select("antecedent", "consequent")
-      .rdd.map(r => (r.getSeq(0), r.getSeq(1)))
-      .collect().asInstanceOf[Array[(Seq[Any], Seq[Any])]]
-    val brRules = dataset.sparkSession.sparkContext.broadcast(rules)
-
-    val dt = dataset.schema($(itemsCol)).dataType
-    // For each rule, examine the input items and summarize the consequents
-    val predictUDF = SparkUserDefinedFunction((items: Seq[Any]) => {
-      if (items != null) {
-        val itemset = items.toSet
-        brRules.value.filter(_._1.forall(itemset.contains))
-          .flatMap(_._2.filter(!itemset.contains(_))).distinct
-      } else {
-        Seq.empty
-      }},
-      dt,
-      Nil
+    val arrayType = associationRules.schema("consequent").dataType
+
+    dataset.crossJoin(
+      broadcast(
+        associationRules
+          .where(not(isnull(col("antecedent"))) &&
+            not(isnull(col("consequent"))))
+          .select(
+            collect_list(
+              struct("antecedent", "consequent")
+            ).as($(predictionCol))
+          )
+      )
+    ).withColumn(
+      $(predictionCol),
+      when(not(isnull(col($(itemsCol)))),
+        array_sort(
+          array_distinct(
+            aggregate(
+              col($(predictionCol)),
+              array().cast(arrayType),
+              (r, s) => when(
+                forall(s.getField("antecedent"),
+                  c => array_contains(col($(itemsCol)), c)),
+                array_union(r,
+                  array_except(s.getField("consequent"), col($(itemsCol))))
+              ).otherwise(r)
+            )
+          )
+        )
+      ).otherwise(array().cast(arrayType))

Review Comment:
   > It looks more complex than the spark UDF code before
   
   I think I can simplify it a bit.
   
   > a join could be a lot slower
   
   Since it is a BroadcastHashJoin, I think it will not be very bad.
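   
   One way to verify is to check which join strategy the physical plan actually uses:
   
   ```scala
   // Check which join the broadcast cross join compiles to
   // (e.g. BroadcastHashJoin vs BroadcastNestedLoopJoin) in the physical plan.
   model.transform(df).explain()
   ```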





[GitHub] [spark] srowen commented on a diff in pull request #40263: [SPARK-42659][ML] Reimplement `FPGrowthModel.transform` with dataframe operations

Posted by "srowen (via GitHub)" <gi...@apache.org>.
srowen commented on code in PR #40263:
URL: https://github.com/apache/spark/pull/40263#discussion_r1125476948


##########
mllib/src/main/scala/org/apache/spark/ml/fpm/FPGrowth.scala:
##########
@@ -275,29 +274,38 @@ class FPGrowthModel private[ml] (
   @Since("2.2.0")
   override def transform(dataset: Dataset[_]): DataFrame = {
     transformSchema(dataset.schema, logging = true)
-    genericTransform(dataset)
-  }
-
-  private def genericTransform(dataset: Dataset[_]): DataFrame = {
-    val rules: Array[(Seq[Any], Seq[Any])] = associationRules.select("antecedent", "consequent")
-      .rdd.map(r => (r.getSeq(0), r.getSeq(1)))
-      .collect().asInstanceOf[Array[(Seq[Any], Seq[Any])]]
-    val brRules = dataset.sparkSession.sparkContext.broadcast(rules)
-
-    val dt = dataset.schema($(itemsCol)).dataType
-    // For each rule, examine the input items and summarize the consequents
-    val predictUDF = SparkUserDefinedFunction((items: Seq[Any]) => {
-      if (items != null) {
-        val itemset = items.toSet
-        brRules.value.filter(_._1.forall(itemset.contains))
-          .flatMap(_._2.filter(!itemset.contains(_))).distinct
-      } else {
-        Seq.empty
-      }},
-      dt,
-      Nil
+    val arrayType = associationRules.schema("consequent").dataType
+
+    dataset.crossJoin(
+      broadcast(
+        associationRules
+          .where(not(isnull(col("antecedent"))) &&
+            not(isnull(col("consequent"))))
+          .select(
+            collect_list(
+              struct("antecedent", "consequent")
+            ).as($(predictionCol))
+          )
+      )
+    ).withColumn(
+      $(predictionCol),
+      when(not(isnull(col($(itemsCol)))),
+        array_sort(
+          array_distinct(
+            aggregate(
+              col($(predictionCol)),
+              array().cast(arrayType),
+              (r, s) => when(
+                forall(s.getField("antecedent"),
+                  c => array_contains(col($(itemsCol)), c)),
+                array_union(r,
+                  array_except(s.getField("consequent"), col($(itemsCol))))
+              ).otherwise(r)
+            )
+          )
+        )
+      ).otherwise(array().cast(arrayType))

Review Comment:
   A quick benchmark on medium-sized data would help, for sure.





[GitHub] [spark] srowen commented on pull request #40263: [SPARK-42659][ML] Reimplement `FPGrowthModel.transform` with dataframe operations

Posted by "srowen (via GitHub)" <gi...@apache.org>.
srowen commented on PR #40263:
URL: https://github.com/apache/spark/pull/40263#issuecomment-1477246959

   I don't know enough to say whether it's worth a new method. Can we start with the change that needs no new API? Is it a big enough win?




[GitHub] [spark] WeichenXu123 commented on a diff in pull request #40263: [SPARK-42659][ML] Reimplement `FPGrowthModel.transform` with dataframe operations

Posted by "WeichenXu123 (via GitHub)" <gi...@apache.org>.
WeichenXu123 commented on code in PR #40263:
URL: https://github.com/apache/spark/pull/40263#discussion_r1124253537


##########
mllib/src/main/scala/org/apache/spark/ml/fpm/FPGrowth.scala:
##########
@@ -275,29 +274,38 @@ class FPGrowthModel private[ml] (
   @Since("2.2.0")
   override def transform(dataset: Dataset[_]): DataFrame = {
     transformSchema(dataset.schema, logging = true)
-    genericTransform(dataset)
-  }
-
-  private def genericTransform(dataset: Dataset[_]): DataFrame = {
-    val rules: Array[(Seq[Any], Seq[Any])] = associationRules.select("antecedent", "consequent")
-      .rdd.map(r => (r.getSeq(0), r.getSeq(1)))
-      .collect().asInstanceOf[Array[(Seq[Any], Seq[Any])]]
-    val brRules = dataset.sparkSession.sparkContext.broadcast(rules)
-
-    val dt = dataset.schema($(itemsCol)).dataType
-    // For each rule, examine the input items and summarize the consequents
-    val predictUDF = SparkUserDefinedFunction((items: Seq[Any]) => {
-      if (items != null) {
-        val itemset = items.toSet
-        brRules.value.filter(_._1.forall(itemset.contains))
-          .flatMap(_._2.filter(!itemset.contains(_))).distinct
-      } else {
-        Seq.empty
-      }},
-      dt,
-      Nil
+    val arrayType = associationRules.schema("consequent").dataType
+
+    dataset.crossJoin(
+      broadcast(
+        associationRules
+          .where(not(isnull(col("antecedent"))) &&
+            not(isnull(col("consequent"))))
+          .select(
+            collect_list(
+              struct("antecedent", "consequent")
+            ).as($(predictionCol))
+          )
+      )
+    ).withColumn(
+      $(predictionCol),
+      when(not(isnull(col($(itemsCol)))),
+        array_sort(
+          array_distinct(
+            aggregate(
+              col($(predictionCol)),
+              array().cast(arrayType),
+              (r, s) => when(
+                forall(s.getField("antecedent"),
+                  c => array_contains(col($(itemsCol)), c)),
+                array_union(r,
+                  array_except(s.getField("consequent"), col($(itemsCol))))
+              ).otherwise(r)
+            )
+          )
+        )
+      ).otherwise(array().cast(arrayType))

Review Comment:
   Does this change improve performance?
   It looks more complex than the Spark UDF code before.





[GitHub] [spark] zhengruifeng commented on pull request #40263: [SPARK-42659][ML] Reimplement `FPGrowthModel.transform` with dataframe operations

Posted by "zhengruifeng (via GitHub)" <gi...@apache.org>.
zhengruifeng commented on PR #40263:
URL: https://github.com/apache/spark/pull/40263#issuecomment-1467821846

   yes, the `BroadcastNestedLoopJoin` is slower.
   
   Then I tried a subquery, and it is faster in both execution and analysis, but I have to create a temp view and write the SQL query, see https://github.com/apache/spark/pull/40263/commits/63595ba03d9f18fe0b43bfb09f974ea50cb2c651
   
   `model.transform(df).count()`: 4566 -> 4902
   `model.transform(df).schema`: 2095 -> 298
   
   So I'm trying to add a new method `Dataset.withScalarSubquery` in https://github.com/apache/spark/pull/40263/commits/c41ac094eb40520948d95108a78431694a33772d
   
   I'm not sure whether this is the correct way to support `ScalarSubquery` in the DataFrame APIs, but it is actually a pain point for me.
   
   ```
   scala> model.transform(df).explain("extended")
   == Parsed Logical Plan ==
   'Project [items#5, CASE WHEN NOT isnull('items) THEN aggregate('prediction, cast(array() as array<string>), lambdafunction(CASE WHEN forall(lambda 'y_1[antecedent], lambdafunction(array_contains('items, lambda 'x_2), lambda 'x_2, false)) THEN array_union(lambda 'x_0, array_except(lambda 'y_1[consequent], 'items)) ELSE lambda 'x_0 END, lambda 'x_0, lambda 'y_1, false), lambdafunction(lambda 'x_3, lambda 'x_3, false)) ELSE cast(array() as array<string>) END AS prediction#74]
   +- Project [items#5, scalar-subquery#70 [] AS prediction#71]
      :  +- Aggregate [collect_list(struct(antecedent, antecedent#57, consequent, consequent#58), 0, 0) AS collect_list(struct(antecedent, consequent))#68]
      :     +- Filter (NOT isnull(antecedent#57) AND NOT isnull(consequent#58))
      :        +- LogicalRDD [antecedent#57, consequent#58, confidence#59, lift#60, support#61], false
      +- Project [value#2 AS items#5]
         +- SerializeFromObject [mapobjects(lambdavariable(MapObject, ObjectType(class java.lang.Object), true, 1), staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(lambdavariable(MapObject, ObjectType(class java.lang.Object), true, 1), StringType, ObjectType(class java.lang.String)), true, false, true), input[0, [Ljava.lang.String;, true], None) AS value#2]
            +- ExternalRDD [obj#1]
   
   == Analyzed Logical Plan ==
   items: array<string>, prediction: array<string>
   Project [items#5, CASE WHEN NOT isnull(items#5) THEN aggregate(prediction#71, cast(array() as array<string>), lambdafunction(CASE WHEN forall(lambda y_1#76.antecedent, lambdafunction(array_contains(items#5, lambda x_2#78), lambda x_2#78, false)) THEN array_union(lambda x_0#75, array_except(lambda y_1#76.consequent, items#5)) ELSE lambda x_0#75 END, lambda x_0#75, lambda y_1#76, false), lambdafunction(lambda x_3#77, lambda x_3#77, false)) ELSE cast(array() as array<string>) END AS prediction#74]
   +- Project [items#5, scalar-subquery#70 [] AS prediction#71]
      :  +- Aggregate [collect_list(struct(antecedent, antecedent#57, consequent, consequent#58), 0, 0) AS collect_list(struct(antecedent, consequent))#68]
      :     +- Filter (NOT isnull(antecedent#57) AND NOT isnull(consequent#58))
      :        +- LogicalRDD [antecedent#57, consequent#58, confidence#59, lift#60, support#61], false
      +- Project [value#2 AS items#5]
         +- SerializeFromObject [mapobjects(lambdavariable(MapObject, ObjectType(class java.lang.Object), true, 1), staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(lambdavariable(MapObject, ObjectType(class java.lang.Object), true, 1), StringType, ObjectType(class java.lang.String)), true, false, true), input[0, [Ljava.lang.String;, true], None) AS value#2]
            +- ExternalRDD [obj#1]
   
   == Optimized Logical Plan ==
   Project [items#5, CASE WHEN isnotnull(items#5) THEN aggregate(scalar-subquery#70 [], [], lambdafunction(CASE WHEN forall(lambda y_1#76.antecedent, lambdafunction(array_contains(items#5, lambda x_2#78), lambda x_2#78, false)) THEN array_union(lambda x_0#75, array_except(lambda y_1#76.consequent, items#5)) ELSE lambda x_0#75 END, lambda x_0#75, lambda y_1#76, false), lambdafunction(lambda x_3#77, lambda x_3#77, false)) ELSE [] END AS prediction#74]
   :  +- Aggregate [collect_list(struct(antecedent, antecedent#57, consequent, consequent#58), 0, 0) AS collect_list(struct(antecedent, consequent))#68]
   :     +- Project [antecedent#57, consequent#58]
   :        +- LogicalRDD [antecedent#57, consequent#58, confidence#59, lift#60, support#61], false
   +- InMemoryRelation [items#5], StorageLevel(disk, memory, deserialized, 1 replicas)
         +- *(1) Project [value#2 AS items#5]
            +- *(1) SerializeFromObject [mapobjects(lambdavariable(MapObject, ObjectType(class java.lang.Object), true, -1), staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(lambdavariable(MapObject, ObjectType(class java.lang.Object), true, -1), StringType, ObjectType(class java.lang.String)), true, false, true), input[0, [Ljava.lang.String;, true], None) AS value#2]
               +- Scan[obj#1]
   
   == Physical Plan ==
   AdaptiveSparkPlan isFinalPlan=false
   +- Project [items#5, CASE WHEN isnotnull(items#5) THEN aggregate(Subquery subquery#70, [id=#105], [], lambdafunction(CASE WHEN forall(lambda y_1#76.antecedent, lambdafunction(array_contains(items#5, lambda x_2#78), lambda x_2#78, false)) THEN array_union(lambda x_0#75, array_except(lambda y_1#76.consequent, items#5)) ELSE lambda x_0#75 END, lambda x_0#75, lambda y_1#76, false), lambdafunction(lambda x_3#77, lambda x_3#77, false)) ELSE [] END AS prediction#74]
      :  +- Subquery subquery#70, [id=#105]
      :     +- AdaptiveSparkPlan isFinalPlan=false
      :        +- ObjectHashAggregate(keys=[], functions=[collect_list(struct(antecedent, antecedent#57, consequent, consequent#58), 0, 0)], output=[collect_list(struct(antecedent, consequent))#68])
      :           +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=103]
      :              +- ObjectHashAggregate(keys=[], functions=[partial_collect_list(struct(antecedent, antecedent#57, consequent, consequent#58), 0, 0)], output=[buf#97])
      :                 +- Project [antecedent#57, consequent#58]
      :                    +- Scan ExistingRDD[antecedent#57,consequent#58,confidence#59,lift#60,support#61]
      +- InMemoryTableScan [items#5]
            +- InMemoryRelation [items#5], StorageLevel(disk, memory, deserialized, 1 replicas)
                  +- *(1) Project [value#2 AS items#5]
                     +- *(1) SerializeFromObject [mapobjects(lambdavariable(MapObject, ObjectType(class java.lang.Object), true, -1), staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(lambdavariable(MapObject, ObjectType(class java.lang.Object), true, -1), StringType, ObjectType(class java.lang.String)), true, false, true), input[0, [Ljava.lang.String;, true], None) AS value#2]
                        +- Scan[obj#1]
   
   
   scala> Seq.range(0, 100).foreach{i => model.transform(df).count()} // warms up
   
   scala> val start = System.currentTimeMillis; Seq.range(0, 100).foreach{i => model.transform(df).count()}; val end = System.currentTimeMillis; end - start
   start: Long = 1678788928604
   end: Long = 1678788931650
   res4: Long = 3046
   
   scala> val start = System.currentTimeMillis; Seq.range(0, 100).foreach{i => model.transform(df).schema}; val end = System.currentTimeMillis; end - start
   start: Long = 1678788931785
   end: Long = 1678788932083
   res5: Long = 298
   ```
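   
   For reference, a minimal sketch of what the temp-view variant (option 1, the first linked commit) might look like; `assoc_rules` and `input_items` are hypothetical view names, and the uncorrelated scalar subquery in the SELECT list replaces the broadcast cross join:
   
   ```scala
   // Sketch of the temp-view + spark.sql approach (hypothetical view names).
   // The scalar subquery is uncorrelated, so it is evaluated once and its
   // single value (the collected rules array) is reused for every input row.
   model.associationRules.createOrReplaceTempView("assoc_rules")
   df.createOrReplaceTempView("input_items")
   
   val withRules = spark.sql("""
     SELECT items,
            (SELECT collect_list(struct(antecedent, consequent))
             FROM assoc_rules
             WHERE antecedent IS NOT NULL AND consequent IS NOT NULL) AS prediction
     FROM input_items
   """)
   ```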




[GitHub] [spark] zhengruifeng commented on pull request #40263: [SPARK-42659][ML] Reimplement `FPGrowthModel.transform` with dataframe operations

Posted by "zhengruifeng (via GitHub)" <gi...@apache.org>.
zhengruifeng commented on PR #40263:
URL: https://github.com/apache/spark/pull/40263#issuecomment-1477193966

   TL;DR: I want to apply a scalar subquery to optimize `FPGrowthModel.transform`; there are two options:
   
   1. create temp views and use `spark.sql`, see https://github.com/apache/spark/commit/63595ba03d9f18fe0b43bfb09f974ea50cb2c651;
   
   2. add `private[spark] def withScalarSubquery(colName: String, subquery: Dataset[_]): DataFrame`, which seems much more convenient, but I'm not sure whether it is a proper way (a hypothetical usage sketch follows below).
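   
   For illustration, a hypothetical usage of option 2 (the method only exists in the linked commit, not in the public Spark API; `associationRules` and `dataset` refer to the values already in scope in `FPGrowthModel.transform`):
   
   ```scala
   import org.apache.spark.sql.functions._
   
   // Hypothetical: assumes the proposed Dataset.withScalarSubquery(colName, subquery)
   // from the linked commit; not part of the public Spark API.
   val rulesAgg = associationRules
     .where(not(isnull(col("antecedent"))) && not(isnull(col("consequent"))))
     .select(collect_list(struct("antecedent", "consequent")).as("prediction"))
   
   val out = dataset.withScalarSubquery("prediction", rulesAgg)
   ```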
   
   cc @cloud-fan @HyukjinKwon 




[GitHub] [spark] zhengruifeng commented on pull request #40263: [SPARK-42659][ML] Reimplement `FPGrowthModel.transform` with dataframe operations

Posted by "zhengruifeng (via GitHub)" <gi...@apache.org>.
zhengruifeng commented on PR #40263:
URL: https://github.com/apache/spark/pull/40263#issuecomment-1478764552

   @srowen sounds reasonable

