Posted to reviews@spark.apache.org by "zhengruifeng (via GitHub)" <gi...@apache.org> on 2023/03/03 05:49:34 UTC
[GitHub] [spark] zhengruifeng opened a new pull request, #40263: [SPARK-42659][ML] Reimplement `FPGrowthModel.transform` with dataframe operations
zhengruifeng opened a new pull request, #40263:
URL: https://github.com/apache/spark/pull/40263
### What changes were proposed in this pull request?
Reimplement `FPGrowthModel.transform` with dataframe operations
### Why are the changes needed?
Delay the `collect()` of the model DataFrame `associationRules`.
### Does this PR introduce _any_ user-facing change?
no
### How was this patch tested?
existing UT
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org
[GitHub] [spark] srowen commented on pull request #40263: [SPARK-42659][ML] Reimplement `FPGrowthModel.transform` with dataframe operations
Posted by "srowen (via GitHub)" <gi...@apache.org>.
srowen commented on PR #40263:
URL: https://github.com/apache/spark/pull/40263#issuecomment-1473071461
If it's faster and gives the right answers, sure
[GitHub] [spark] srowen commented on pull request #40263: [SPARK-42659][ML] Reimplement `FPGrowthModel.transform` with dataframe operations
Posted by "srowen (via GitHub)" <gi...@apache.org>.
srowen commented on PR #40263:
URL: https://github.com/apache/spark/pull/40263#issuecomment-1466280043
So this seems slower on a medium-sized data set. I don't know if delaying the collect() matters much; the overall execution time matters. I'm worried that this gets much slower on 1M or 10M records. Does this buy us other benefits, like, is this necessary to support "Safe Spark" or something?
[GitHub] [spark] zhengruifeng commented on pull request #40263: [SPARK-42659][ML] Reimplement `FPGrowthModel.transform` with dataframe operations
Posted by "zhengruifeng (via GitHub)" <gi...@apache.org>.
zhengruifeng commented on PR #40263:
URL: https://github.com/apache/spark/pull/40263#issuecomment-1473064231
@srowen if the latest performance test seems fine, then I'd ask the SQL guys whether we can have a subquery method in DataFrame APIs.
[GitHub] [spark] zhengruifeng commented on pull request #40263: [SPARK-42659][ML] Reimplement `FPGrowthModel.transform` with dataframe operations
Posted by "zhengruifeng (via GitHub)" <gi...@apache.org>.
zhengruifeng commented on PR #40263:
URL: https://github.com/apache/spark/pull/40263#issuecomment-1465662299
I did a quick test with dataset `T10I4D100K` in http://fimi.uantwerpen.be/data/
fit:
```
scala> val df = sc.textFile("/Users/ruifeng.zheng/.dev/data/T10I4D100K.dat").map(_.split(" ")).toDF("items")
df: org.apache.spark.sql.DataFrame = [items: array<string>]
scala> df.count
res16: Long = 100000
scala> val model = new FPGrowth().setMinSupport(0.01).setMinConfidence(0.01).fit(df)
model: org.apache.spark.ml.fpm.FPGrowthModel = FPGrowthModel: uid=fpgrowth_92901252345a, numTrainingRecords=100000
scala> model.freqItemsets.count
res17: Long = 385
scala> model.associationRules.count
res18: Long = 21
scala> model.save("/tmp/fpm.model")
```
transformation:
```
import org.apache.spark.ml.fpm._
val df = sc.textFile("/Users/ruifeng.zheng/.dev/data/T10I4D100K.dat").map(_.split(" ")).toDF("items")
df.cache()
df.count()
val model = FPGrowthModel.load("/tmp/fpm.model")
model.transform(df).explain("extended")
Seq.range(0, 100).foreach{i => model.transform(df).count()} // warms up
val start = System.currentTimeMillis; Seq.range(0, 100).foreach{i => model.transform(df).count()}; val end = System.currentTimeMillis; end - start
val start = System.currentTimeMillis; Seq.range(0, 100).foreach{i => model.transform(df).schema}; val end = System.currentTimeMillis; end - start
```
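The timing one-liners in the script above could be wrapped in a small helper. This is only a sketch: `Bench` and `timeMs` are hypothetical names, not part of Spark or of this PR, and wall-clock timing in a REPL is noisy.

```scala
object Bench {
  /** Runs `body` exactly `n` times and returns the elapsed wall-clock time in milliseconds. */
  def timeMs(n: Int)(body: => Any): Long = {
    val start = System.nanoTime()
    var i = 0
    while (i < n) {
      body // by-name parameter: re-evaluated on every iteration
      i += 1
    }
    (System.nanoTime() - start) / 1000000L
  }
}
```

With the `model` and `df` from the script, the two measurements would then read `Bench.timeMs(100)(model.transform(df).count())` and `Bench.timeMs(100)(model.transform(df).schema)`.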
master:
```
scala> val model = FPGrowthModel.load("/tmp/fpm.model")
model: org.apache.spark.ml.fpm.FPGrowthModel = FPGrowthModel: uid=fpgrowth_92901252345a, numTrainingRecords=100000
scala> model.transform(df).explain("extended")
== Parsed Logical Plan ==
'Project [items#5, UDF('items) AS prediction#70]
+- Project [value#2 AS items#5]
+- SerializeFromObject [mapobjects(lambdavariable(MapObject, ObjectType(class java.lang.Object), true, 1), staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(lambdavariable(MapObject, ObjectType(class java.lang.Object), true, 1), StringType, ObjectType(class java.lang.String)), true, false, true), input[0, [Ljava.lang.String;, true], None) AS value#2]
+- ExternalRDD [obj#1]
== Analyzed Logical Plan ==
items: array<string>, prediction: array<string>
Project [items#5, UDF(items#5) AS prediction#70]
+- Project [value#2 AS items#5]
+- SerializeFromObject [mapobjects(lambdavariable(MapObject, ObjectType(class java.lang.Object), true, 1), staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(lambdavariable(MapObject, ObjectType(class java.lang.Object), true, 1), StringType, ObjectType(class java.lang.String)), true, false, true), input[0, [Ljava.lang.String;, true], None) AS value#2]
+- ExternalRDD [obj#1]
== Optimized Logical Plan ==
Project [items#5, UDF(items#5) AS prediction#70]
+- InMemoryRelation [items#5], StorageLevel(disk, memory, deserialized, 1 replicas)
+- *(1) Project [value#2 AS items#5]
+- *(1) SerializeFromObject [mapobjects(lambdavariable(MapObject, ObjectType(class java.lang.Object), true, -1), staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(lambdavariable(MapObject, ObjectType(class java.lang.Object), true, -1), StringType, ObjectType(class java.lang.String)), true, false, true), input[0, [Ljava.lang.String;, true], None) AS value#2]
+- Scan[obj#1]
== Physical Plan ==
*(1) Project [items#5, UDF(items#5) AS prediction#70]
+- InMemoryTableScan [items#5]
+- InMemoryRelation [items#5], StorageLevel(disk, memory, deserialized, 1 replicas)
+- *(1) Project [value#2 AS items#5]
+- *(1) SerializeFromObject [mapobjects(lambdavariable(MapObject, ObjectType(class java.lang.Object), true, -1), staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(lambdavariable(MapObject, ObjectType(class java.lang.Object), true, -1), StringType, ObjectType(class java.lang.String)), true, false, true), input[0, [Ljava.lang.String;, true], None) AS value#2]
+- Scan[obj#1]
scala> Seq.range(0, 100).foreach{i => model.transform(df).count()} // warms up
scala> val start = System.currentTimeMillis; Seq.range(0, 100).foreach{i => model.transform(df).count()}; val end = System.currentTimeMillis; end - start
start: Long = 1678692855532
end: Long = 1678692860098
res4: Long = 4566
scala> val start = System.currentTimeMillis; Seq.range(0, 100).foreach{i => model.transform(df).schema}; val end = System.currentTimeMillis; end - start
start: Long = 1678692860277
end: Long = 1678692862372
res5: Long = 2095
```
this PR:
```
scala> model.transform(df).explain("extended")
== Parsed Logical Plan ==
'Project [items#5, CASE WHEN NOT isnull('items) THEN aggregate('prediction, cast(array() as array<string>), lambdafunction(CASE WHEN forall(lambda 'y_1[antecedent], lambdafunction(array_contains('items, lambda 'x_2), lambda 'x_2, false)) THEN array_union(lambda 'x_0, array_except(lambda 'y_1[consequent], 'items)) ELSE lambda 'x_0 END, lambda 'x_0, lambda 'y_1, false), lambdafunction(lambda 'x_3, lambda 'x_3, false)) ELSE cast(array() as array<string>) END AS prediction#72]
+- Join Cross
:- Project [value#2 AS items#5]
: +- SerializeFromObject [mapobjects(lambdavariable(MapObject, ObjectType(class java.lang.Object), true, 1), staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(lambdavariable(MapObject, ObjectType(class java.lang.Object), true, 1), StringType, ObjectType(class java.lang.String)), true, false, true), input[0, [Ljava.lang.String;, true], None) AS value#2]
: +- ExternalRDD [obj#1]
+- ResolvedHint (strategy=broadcast)
+- Aggregate [collect_list(struct(antecedent, antecedent#57, consequent, consequent#58), 0, 0) AS prediction#68]
+- Filter (NOT isnull(antecedent#57) AND NOT isnull(consequent#58))
+- LogicalRDD [antecedent#57, consequent#58, confidence#59, lift#60, support#61], false
== Analyzed Logical Plan ==
items: array<string>, prediction: array<string>
Project [items#5, CASE WHEN NOT isnull(items#5) THEN aggregate(prediction#68, cast(array() as array<string>), lambdafunction(CASE WHEN forall(lambda y_1#74.antecedent, lambdafunction(array_contains(items#5, lambda x_2#76), lambda x_2#76, false)) THEN array_union(lambda x_0#73, array_except(lambda y_1#74.consequent, items#5)) ELSE lambda x_0#73 END, lambda x_0#73, lambda y_1#74, false), lambdafunction(lambda x_3#75, lambda x_3#75, false)) ELSE cast(array() as array<string>) END AS prediction#72]
+- Join Cross
:- Project [value#2 AS items#5]
: +- SerializeFromObject [mapobjects(lambdavariable(MapObject, ObjectType(class java.lang.Object), true, 1), staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(lambdavariable(MapObject, ObjectType(class java.lang.Object), true, 1), StringType, ObjectType(class java.lang.String)), true, false, true), input[0, [Ljava.lang.String;, true], None) AS value#2]
: +- ExternalRDD [obj#1]
+- ResolvedHint (strategy=broadcast)
+- Aggregate [collect_list(struct(antecedent, antecedent#57, consequent, consequent#58), 0, 0) AS prediction#68]
+- Filter (NOT isnull(antecedent#57) AND NOT isnull(consequent#58))
+- LogicalRDD [antecedent#57, consequent#58, confidence#59, lift#60, support#61], false
== Optimized Logical Plan ==
Project [items#5, CASE WHEN isnotnull(items#5) THEN aggregate(prediction#68, [], lambdafunction(CASE WHEN forall(lambda y_1#74.antecedent, lambdafunction(array_contains(items#5, lambda x_2#76), lambda x_2#76, false)) THEN array_union(lambda x_0#73, array_except(lambda y_1#74.consequent, items#5)) ELSE lambda x_0#73 END, lambda x_0#73, lambda y_1#74, false), lambdafunction(lambda x_3#75, lambda x_3#75, false)) ELSE [] END AS prediction#72]
+- Join Cross, rightHint=(strategy=broadcast)
:- InMemoryRelation [items#5], StorageLevel(disk, memory, deserialized, 1 replicas)
: +- *(1) Project [value#2 AS items#5]
: +- *(1) SerializeFromObject [mapobjects(lambdavariable(MapObject, ObjectType(class java.lang.Object), true, -1), staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(lambdavariable(MapObject, ObjectType(class java.lang.Object), true, -1), StringType, ObjectType(class java.lang.String)), true, false, true), input[0, [Ljava.lang.String;, true], None) AS value#2]
: +- Scan[obj#1]
+- Aggregate [collect_list(struct(antecedent, antecedent#57, consequent, consequent#58), 0, 0) AS prediction#68]
+- Project [antecedent#57, consequent#58]
+- LogicalRDD [antecedent#57, consequent#58, confidence#59, lift#60, support#61], false
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project [items#5, CASE WHEN isnotnull(items#5) THEN aggregate(prediction#68, [], lambdafunction(CASE WHEN forall(lambda y_1#74.antecedent, lambdafunction(array_contains(items#5, lambda x_2#76), lambda x_2#76, false)) THEN array_union(lambda x_0#73, array_except(lambda y_1#74.consequent, items#5)) ELSE lambda x_0#73 END, lambda x_0#73, lambda y_1#74, false), lambdafunction(lambda x_3#75, lambda x_3#75, false)) ELSE [] END AS prediction#72]
+- BroadcastNestedLoopJoin BuildRight, Cross
:- InMemoryTableScan [items#5]
: +- InMemoryRelation [items#5], StorageLevel(disk, memory, deserialized, 1 replicas)
: +- *(1) Project [value#2 AS items#5]
: +- *(1) SerializeFromObject [mapobjects(lambdavariable(MapObject, ObjectType(class java.lang.Object), true, -1), staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(lambdavariable(MapObject, ObjectType(class java.lang.Object), true, -1), StringType, ObjectType(class java.lang.String)), true, false, true), input[0, [Ljava.lang.String;, true], None) AS value#2]
: +- Scan[obj#1]
+- BroadcastExchange IdentityBroadcastMode, [plan_id=117]
+- ObjectHashAggregate(keys=[], functions=[collect_list(struct(antecedent, antecedent#57, consequent, consequent#58), 0, 0)], output=[prediction#68])
+- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=114]
+- ObjectHashAggregate(keys=[], functions=[partial_collect_list(struct(antecedent, antecedent#57, consequent, consequent#58), 0, 0)], output=[buf#95])
+- Project [antecedent#57, consequent#58]
+- Scan ExistingRDD[antecedent#57,consequent#58,confidence#59,lift#60,support#61]
scala> Seq.range(0, 100).foreach{i => model.transform(df).count()} // warms up
scala> val start = System.currentTimeMillis; Seq.range(0, 100).foreach{i => model.transform(df).count()}; val end = System.currentTimeMillis; end - start
start: Long = 1678693708534
end: Long = 1678693713436
res6: Long = 4902
scala> val start = System.currentTimeMillis; Seq.range(0, 100).foreach{i => model.transform(df).schema}; val end = System.currentTimeMillis; end - start
start: Long = 1678693713596
end: Long = 1678693713807
res7: Long = 211
```
The transformation is a bit slower (4566 ms -> 4902 ms for 100 `count()` runs), but analyzing the DataFrame is much faster (2095 ms -> 211 ms for 100 `.schema` calls), since the `collect` execution is delayed.
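As a plain-Scala analogy (no Spark involved) of why delaying the `collect` makes `.schema` cheap: the sketch below forces an expensive load either while the "plan" is built or only when the rules are first read. `DeferredCollect`, `EagerPlan` and `DeferredPlan` are hypothetical names used for illustration only; they do not exist in Spark.

```scala
object DeferredCollect {
  type Rule = (Seq[String], Seq[String]) // (antecedent, consequent)

  /** Eager variant: the by-name `collectRules` is forced while the plan is constructed. */
  final class EagerPlan(collectRules: => Seq[Rule]) {
    val rules: Seq[Rule] = collectRules
    val schema: Seq[String] = Seq("items", "prediction")
  }

  /** Deferred variant: `collectRules` is only forced when `rules` is first read. */
  final class DeferredPlan(collectRules: => Seq[Rule]) {
    lazy val rules: Seq[Rule] = collectRules
    val schema: Seq[String] = Seq("items", "prediction")
  }
}
```

Reading `schema` on an `EagerPlan` has already paid for the load, while on a `DeferredPlan` it has not; this mirrors the `.schema` timings above only loosely.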
[GitHub] [spark] zhengruifeng commented on a diff in pull request #40263: [SPARK-42659][ML] Reimplement `FPGrowthModel.transform` with dataframe operations
Posted by "zhengruifeng (via GitHub)" <gi...@apache.org>.
zhengruifeng commented on code in PR #40263:
URL: https://github.com/apache/spark/pull/40263#discussion_r1125383324
##########
mllib/src/main/scala/org/apache/spark/ml/fpm/FPGrowth.scala:
##########
@@ -275,29 +274,38 @@ class FPGrowthModel private[ml] (
@Since("2.2.0")
override def transform(dataset: Dataset[_]): DataFrame = {
transformSchema(dataset.schema, logging = true)
- genericTransform(dataset)
- }
-
- private def genericTransform(dataset: Dataset[_]): DataFrame = {
- val rules: Array[(Seq[Any], Seq[Any])] = associationRules.select("antecedent", "consequent")
- .rdd.map(r => (r.getSeq(0), r.getSeq(1)))
- .collect().asInstanceOf[Array[(Seq[Any], Seq[Any])]]
- val brRules = dataset.sparkSession.sparkContext.broadcast(rules)
-
- val dt = dataset.schema($(itemsCol)).dataType
- // For each rule, examine the input items and summarize the consequents
- val predictUDF = SparkUserDefinedFunction((items: Seq[Any]) => {
- if (items != null) {
- val itemset = items.toSet
- brRules.value.filter(_._1.forall(itemset.contains))
- .flatMap(_._2.filter(!itemset.contains(_))).distinct
- } else {
- Seq.empty
- }},
- dt,
- Nil
+ val arrayType = associationRules.schema("consequent").dataType
+
+ dataset.crossJoin(
+ broadcast(
+ associationRules
+ .where(not(isnull(col("antecedent"))) &&
+ not(isnull(col("consequent"))))
+ .select(
+ collect_list(
+ struct("antecedent", "consequent")
+ ).as($(predictionCol))
+ )
+ )
+ ).withColumn(
+ $(predictionCol),
+ when(not(isnull(col($(itemsCol)))),
+ array_sort(
+ array_distinct(
+ aggregate(
+ col($(predictionCol)),
+ array().cast(arrayType),
+ (r, s) => when(
+ forall(s.getField("antecedent"),
+ c => array_contains(col($(itemsCol)), c)),
+ array_union(r,
+ array_except(s.getField("consequent"), col($(itemsCol))))
+ ).otherwise(r)
+ )
+ )
+ )
+ ).otherwise(array().cast(arrayType))
Review Comment:
Actually, the main goals are:
1, avoid an eager action in the transformation, like what we did in https://github.com/apache/spark/commit/6a0713a141fa98d83029d8388508cbbc40fd554e
2, avoid collecting the DF `associationRules` to the driver; I feel such a collection could be a problem in Spark Connect, where there are many Spark Sessions in the driver.
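For readers comparing the two sides of the diff, the per-row logic can be sketched in plain Scala collections. This is an illustration under assumptions, not the PR's code: `RulePrediction` and both method names are hypothetical, items are assumed to be strings, and the `array_sort` that the diff wraps around the result is left out.

```scala
object RulePrediction {
  type Rule = (Seq[String], Seq[String]) // (antecedent, consequent)

  /** Mirrors the removed UDF body: keep matching rules, flatten new consequents, dedupe. */
  def predictUdfStyle(rules: Seq[Rule], items: Seq[String]): Seq[String] =
    if (items == null) Seq.empty
    else {
      val itemset = items.toSet
      rules
        .filter { case (antecedent, _) => antecedent.forall(a => itemset.contains(a)) }
        .flatMap { case (_, consequent) => consequent.filter(c => !itemset.contains(c)) }
        .distinct
    }

  /** Mirrors the `aggregate` expression: fold over the rules, unioning new consequents. */
  def predictFoldStyle(rules: Seq[Rule], items: Seq[String]): Seq[String] =
    if (items == null) Seq.empty
    else rules.foldLeft(Seq.empty[String]) { case (acc, (antecedent, consequent)) =>
      if (antecedent.forall(a => items.contains(a))) {
        // stands in for array_union(acc, array_except(consequent, items))
        (acc ++ consequent.filterNot(c => items.contains(c))).distinct
      } else acc
    }
}
```

Both functions return the consequents of every rule whose antecedent is contained in `items`, excluding items already present, in first-occurrence order.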
[GitHub] [spark] zhengruifeng closed pull request #40263: [SPARK-42659][ML] Reimplement `FPGrowthModel.transform` with dataframe operations
Posted by "zhengruifeng (via GitHub)" <gi...@apache.org>.
zhengruifeng closed pull request #40263: [SPARK-42659][ML] Reimplement `FPGrowthModel.transform` with dataframe operations
URL: https://github.com/apache/spark/pull/40263
[GitHub] [spark] srowen commented on a diff in pull request #40263: [SPARK-42659][ML] Reimplement `FPGrowthModel.transform` with dataframe operations
Posted by "srowen (via GitHub)" <gi...@apache.org>.
srowen commented on code in PR #40263:
URL: https://github.com/apache/spark/pull/40263#discussion_r1124505521
##########
mllib/src/main/scala/org/apache/spark/ml/fpm/FPGrowth.scala:
##########
Review Comment:
Agreed, a join could be a lot slower
[GitHub] [spark] zhengruifeng commented on a diff in pull request #40263: [SPARK-42659][ML] Reimplement `FPGrowthModel.transform` with dataframe operations
Posted by "zhengruifeng (via GitHub)" <gi...@apache.org>.
zhengruifeng commented on code in PR #40263:
URL: https://github.com/apache/spark/pull/40263#discussion_r1125384107
##########
mllib/src/main/scala/org/apache/spark/ml/fpm/FPGrowth.scala:
##########
Review Comment:
> It looks more complex than the spark UDF code before
I think I can simplify it a bit.
> a join could be a lot slower
Since it is a broadcast join (`BroadcastNestedLoopJoin` in the plan), I think it will not be very bad.
[GitHub] [spark] srowen commented on a diff in pull request #40263: [SPARK-42659][ML] Reimplement `FPGrowthModel.transform` with dataframe operations
Posted by "srowen (via GitHub)" <gi...@apache.org>.
srowen commented on code in PR #40263:
URL: https://github.com/apache/spark/pull/40263#discussion_r1125476948
##########
mllib/src/main/scala/org/apache/spark/ml/fpm/FPGrowth.scala:
##########
Review Comment:
A quick benchmark on medium-sized data would help, for sure.
[GitHub] [spark] srowen commented on pull request #40263: [SPARK-42659][ML] Reimplement `FPGrowthModel.transform` with dataframe operations
Posted by "srowen (via GitHub)" <gi...@apache.org>.
srowen commented on PR #40263:
URL: https://github.com/apache/spark/pull/40263#issuecomment-1477246959
I don't know enough to say whether it's worth a new method. Can we start with the change that needs no new API? Is it a big enough win?
[GitHub] [spark] WeichenXu123 commented on a diff in pull request #40263: [SPARK-42659][ML] Reimplement `FPGrowthModel.transform` with dataframe operations
Posted by "WeichenXu123 (via GitHub)" <gi...@apache.org>.
WeichenXu123 commented on code in PR #40263:
URL: https://github.com/apache/spark/pull/40263#discussion_r1124253537
##########
mllib/src/main/scala/org/apache/spark/ml/fpm/FPGrowth.scala:
##########
Review Comment:
Does this change improve performance?
It looks more complex than the previous Spark UDF code.
[GitHub] [spark] zhengruifeng commented on pull request #40263: [SPARK-42659][ML] Reimplement `FPGrowthModel.transform` with dataframe operations
Posted by "zhengruifeng (via GitHub)" <gi...@apache.org>.
zhengruifeng commented on PR #40263:
URL: https://github.com/apache/spark/pull/40263#issuecomment-1467821846
Yes, the `BroadcastNestedLoopJoin` is slower.
Then I tried a scalar subquery, and it's faster in both execution and analysis, but I have to create a temp view and write the SQL query; see https://github.com/apache/spark/pull/40263/commits/63595ba03d9f18fe0b43bfb09f974ea50cb2c651
`model.transform(df).count()`: 4566 -> 4902
`model.transform(df).schema`: 2095 -> 298
So I'm trying to add a new method `Dataset.withScalarSubquery` in https://github.com/apache/spark/pull/40263/commits/c41ac094eb40520948d95108a78431694a33772d
I'm not sure whether this is the correct way to support `ScalarSubquery` in the DataFrame APIs, but the lack of it is a real pain point for me.
```
scala> model.transform(df).explain("extended")
== Parsed Logical Plan ==
'Project [items#5, CASE WHEN NOT isnull('items) THEN aggregate('prediction, cast(array() as array<string>), lambdafunction(CASE WHEN forall(lambda 'y_1[antecedent], lambdafunction(array_contains('items, lambda 'x_2), lambda 'x_2, false)) THEN array_union(lambda 'x_0, array_except(lambda 'y_1[consequent], 'items)) ELSE lambda 'x_0 END, lambda 'x_0, lambda 'y_1, false), lambdafunction(lambda 'x_3, lambda 'x_3, false)) ELSE cast(array() as array<string>) END AS prediction#74]
+- Project [items#5, scalar-subquery#70 [] AS prediction#71]
: +- Aggregate [collect_list(struct(antecedent, antecedent#57, consequent, consequent#58), 0, 0) AS collect_list(struct(antecedent, consequent))#68]
: +- Filter (NOT isnull(antecedent#57) AND NOT isnull(consequent#58))
: +- LogicalRDD [antecedent#57, consequent#58, confidence#59, lift#60, support#61], false
+- Project [value#2 AS items#5]
+- SerializeFromObject [mapobjects(lambdavariable(MapObject, ObjectType(class java.lang.Object), true, 1), staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(lambdavariable(MapObject, ObjectType(class java.lang.Object), true, 1), StringType, ObjectType(class java.lang.String)), true, false, true), input[0, [Ljava.lang.String;, true], None) AS value#2]
+- ExternalRDD [obj#1]
== Analyzed Logical Plan ==
items: array<string>, prediction: array<string>
Project [items#5, CASE WHEN NOT isnull(items#5) THEN aggregate(prediction#71, cast(array() as array<string>), lambdafunction(CASE WHEN forall(lambda y_1#76.antecedent, lambdafunction(array_contains(items#5, lambda x_2#78), lambda x_2#78, false)) THEN array_union(lambda x_0#75, array_except(lambda y_1#76.consequent, items#5)) ELSE lambda x_0#75 END, lambda x_0#75, lambda y_1#76, false), lambdafunction(lambda x_3#77, lambda x_3#77, false)) ELSE cast(array() as array<string>) END AS prediction#74]
+- Project [items#5, scalar-subquery#70 [] AS prediction#71]
: +- Aggregate [collect_list(struct(antecedent, antecedent#57, consequent, consequent#58), 0, 0) AS collect_list(struct(antecedent, consequent))#68]
: +- Filter (NOT isnull(antecedent#57) AND NOT isnull(consequent#58))
: +- LogicalRDD [antecedent#57, consequent#58, confidence#59, lift#60, support#61], false
+- Project [value#2 AS items#5]
+- SerializeFromObject [mapobjects(lambdavariable(MapObject, ObjectType(class java.lang.Object), true, 1), staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(lambdavariable(MapObject, ObjectType(class java.lang.Object), true, 1), StringType, ObjectType(class java.lang.String)), true, false, true), input[0, [Ljava.lang.String;, true], None) AS value#2]
+- ExternalRDD [obj#1]
== Optimized Logical Plan ==
Project [items#5, CASE WHEN isnotnull(items#5) THEN aggregate(scalar-subquery#70 [], [], lambdafunction(CASE WHEN forall(lambda y_1#76.antecedent, lambdafunction(array_contains(items#5, lambda x_2#78), lambda x_2#78, false)) THEN array_union(lambda x_0#75, array_except(lambda y_1#76.consequent, items#5)) ELSE lambda x_0#75 END, lambda x_0#75, lambda y_1#76, false), lambdafunction(lambda x_3#77, lambda x_3#77, false)) ELSE [] END AS prediction#74]
: +- Aggregate [collect_list(struct(antecedent, antecedent#57, consequent, consequent#58), 0, 0) AS collect_list(struct(antecedent, consequent))#68]
: +- Project [antecedent#57, consequent#58]
: +- LogicalRDD [antecedent#57, consequent#58, confidence#59, lift#60, support#61], false
+- InMemoryRelation [items#5], StorageLevel(disk, memory, deserialized, 1 replicas)
+- *(1) Project [value#2 AS items#5]
+- *(1) SerializeFromObject [mapobjects(lambdavariable(MapObject, ObjectType(class java.lang.Object), true, -1), staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(lambdavariable(MapObject, ObjectType(class java.lang.Object), true, -1), StringType, ObjectType(class java.lang.String)), true, false, true), input[0, [Ljava.lang.String;, true], None) AS value#2]
+- Scan[obj#1]
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project [items#5, CASE WHEN isnotnull(items#5) THEN aggregate(Subquery subquery#70, [id=#105], [], lambdafunction(CASE WHEN forall(lambda y_1#76.antecedent, lambdafunction(array_contains(items#5, lambda x_2#78), lambda x_2#78, false)) THEN array_union(lambda x_0#75, array_except(lambda y_1#76.consequent, items#5)) ELSE lambda x_0#75 END, lambda x_0#75, lambda y_1#76, false), lambdafunction(lambda x_3#77, lambda x_3#77, false)) ELSE [] END AS prediction#74]
: +- Subquery subquery#70, [id=#105]
: +- AdaptiveSparkPlan isFinalPlan=false
: +- ObjectHashAggregate(keys=[], functions=[collect_list(struct(antecedent, antecedent#57, consequent, consequent#58), 0, 0)], output=[collect_list(struct(antecedent, consequent))#68])
: +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=103]
: +- ObjectHashAggregate(keys=[], functions=[partial_collect_list(struct(antecedent, antecedent#57, consequent, consequent#58), 0, 0)], output=[buf#97])
: +- Project [antecedent#57, consequent#58]
: +- Scan ExistingRDD[antecedent#57,consequent#58,confidence#59,lift#60,support#61]
+- InMemoryTableScan [items#5]
+- InMemoryRelation [items#5], StorageLevel(disk, memory, deserialized, 1 replicas)
+- *(1) Project [value#2 AS items#5]
+- *(1) SerializeFromObject [mapobjects(lambdavariable(MapObject, ObjectType(class java.lang.Object), true, -1), staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(lambdavariable(MapObject, ObjectType(class java.lang.Object), true, -1), StringType, ObjectType(class java.lang.String)), true, false, true), input[0, [Ljava.lang.String;, true], None) AS value#2]
+- Scan[obj#1]
scala> Seq.range(0, 100).foreach{i => model.transform(df).count()} // warms up
scala> val start = System.currentTimeMillis; Seq.range(0, 100).foreach{i => model.transform(df).count()}; val end = System.currentTimeMillis; end - start
start: Long = 1678788928604
end: Long = 1678788931650
res4: Long = 3046
scala> val start = System.currentTimeMillis; Seq.range(0, 100).foreach{i => model.transform(df).schema}; val end = System.currentTimeMillis; end - start
start: Long = 1678788931785
end: Long = 1678788932083
res5: Long = 298
```
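The ad-hoc REPL timing above could be wrapped in a small helper so that warm-up and measurement use the same loop. This is only a sketch: `TimingSketch` and `timeMillis` are hypothetical names, and it uses `System.nanoTime` rather than `System.currentTimeMillis`.

```scala
object TimingSketch {
  /** Runs `body` `warmup` times untimed, then `runs` times timed.
    * Returns the elapsed wall-clock time of the timed runs in milliseconds.
    */
  def timeMillis(warmup: Int, runs: Int)(body: => Unit): Long = {
    (0 until warmup).foreach(_ => body) // warm-up, not measured
    val start = System.nanoTime()
    (0 until runs).foreach(_ => body)   // measured runs
    (System.nanoTime() - start) / 1000000L
  }
}

// Intended use in spark-shell, assuming `model` and `df` are already defined:
//   TimingSketch.timeMillis(100, 100) { model.transform(df).count() }
//   TimingSketch.timeMillis(100, 100) { model.transform(df).schema }
```

The first call exercises execution; the second only forces analysis, which is what the `.schema` measurement above isolates.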
[GitHub] [spark] zhengruifeng commented on pull request #40263: [SPARK-42659][ML] Reimplement `FPGrowthModel.transform` with dataframe operations
Posted by "zhengruifeng (via GitHub)" <gi...@apache.org>.
zhengruifeng commented on PR #40263:
URL: https://github.com/apache/spark/pull/40263#issuecomment-1477193966
TL;DR: I want to use a scalar subquery to optimize `FPGrowthModel.transform`. There are two options:
1. Create temp views and use `spark.sql`; see https://github.com/apache/spark/commit/63595ba03d9f18fe0b43bfb09f974ea50cb2c651
2. Add `private[spark] def withScalarSubquery(colName: String, subquery: Dataset[_]): DataFrame`. This seems much more convenient, but I'm not sure whether it is the proper way.
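To make option 1 concrete, here is a rough sketch of attaching the collected rules to every row through an uncorrelated scalar subquery over temp views. It is not the code from the linked commit: the view names `fp_rules` and `fp_dataset`, the toy data, and the local `SparkSession` setup are all assumptions for illustration.

```scala
import org.apache.spark.sql.SparkSession

object ScalarSubquerySketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("scalar-subquery-sketch")
      .getOrCreate()
    import spark.implicits._

    // Toy stand-ins for model.associationRules and the input dataset.
    val rules = Seq(
      (Seq("a"), Seq("b")),
      (Seq("a", "b"), Seq("c"))
    ).toDF("antecedent", "consequent")
    val dataset = Seq(Seq("a"), Seq("a", "b")).toDF("items")

    // Hypothetical view names; a real implementation would need unique ones.
    rules.createOrReplaceTempView("fp_rules")
    dataset.createOrReplaceTempView("fp_dataset")

    // The subquery returns a single array of (antecedent, consequent) structs,
    // which is attached to every row without a join.
    val withRules = spark.sql(
      """SELECT items,
        |       (SELECT collect_list(struct(antecedent, consequent))
        |        FROM fp_rules
        |        WHERE antecedent IS NOT NULL AND consequent IS NOT NULL) AS prediction
        |FROM fp_dataset""".stripMargin)

    withRules.printSchema()
    spark.stop()
  }
}
```

The drawback noted above is visible here: the DataFrames have to be registered under names and the query written as a SQL string, which is what a `withScalarSubquery` method would avoid.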
cc @cloud-fan @HyukjinKwon
[GitHub] [spark] zhengruifeng commented on pull request #40263: [SPARK-42659][ML] Reimplement `FPGrowthModel.transform` with dataframe operations
Posted by "zhengruifeng (via GitHub)" <gi...@apache.org>.
zhengruifeng commented on PR #40263:
URL: https://github.com/apache/spark/pull/40263#issuecomment-1478764552
@srowen sounds reasonable