Posted to issues@spark.apache.org by "Calvin Pietersen (Jira)" <ji...@apache.org> on 2022/08/24 00:38:00 UTC
[jira] [Updated] (SPARK-40200) unpersist cascades with Kryo, MEMORY_AND_DISK_SER and monotonically_increasing_id
[ https://issues.apache.org/jira/browse/SPARK-40200?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Calvin Pietersen updated SPARK-40200:
-------------------------------------
Description:
Unpersisting a parent dataset that has a column from `monotonically_increasing_id` cascades to a child dataset when
* it is joined with another dataset
* Kryo serialization is enabled (see the config sketch just below)
* the storage level is MEMORY_AND_DISK_SER
* not all rows join (it is unclear why this matters, but the cascade does not reproduce when every row joins)
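
For completeness, a minimal sketch of how Kryo might be enabled for the repro below; the original report does not show its configuration, so the builder settings here are assumptions:

```
import org.apache.spark.sql.SparkSession

// Hypothetical session setup (not included in the report): "spark.serializer"
// is the standard setting that switches Spark's internal serializer to Kryo.
val spark = SparkSession.builder()
  .appName("SPARK-40200-repro") // assumed app name
  .master("local[*]")           // assumed master
  .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .getOrCreate()
val sc = spark.sparkContext
```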
```
import org.apache.spark.sql.functions.monotonically_increasing_id
import org.apache.spark.storage.StorageLevel
import spark.implicits._ // needed for encoders when not in spark-shell

case class a(value: String, id: Long)

val storageLevel = StorageLevel.MEMORY_AND_DISK_SER // cascades
//val storageLevel = StorageLevel.MEMORY_ONLY // doesn't cascade

// Counts how many times the map function below runs; a second count on a
// cached child should not re-run it.
val acc = sc.longAccumulator("acc")

val parent1DS = spark
  .createDataset(Seq("a", "b", "c"))
  .withColumn("id", monotonically_increasing_id)
  .as[a]
  .persist(storageLevel)

val parent2DS = spark
  .createDataset(Seq(1, 2, 3)) // 0,1,2 doesn't cascade (then all rows join)
  .persist(storageLevel)

val childDS = parent1DS
  .joinWith(parent2DS, parent1DS("id") === parent2DS("value"))
  .map(i => {
    acc.add(1)
    i
  })
  .persist(storageLevel)

childDS.count    // materializes the cache; the map runs once per joined row
parent1DS.unpersist()
childDS.count    // should be served from childDS's own cache
acc.value should be(2) // ScalaTest matcher; fails when the unpersist cascades
```
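
One way to observe the cascade directly (an illustrative check, not part of the original report): `Dataset.storageLevel` reports the level a dataset's plan is currently cached at, and drops to `StorageLevel.NONE` if the child's cache is invalidated by the parent's unpersist:

```
// Illustrative check, not from the original report.
println(childDS.storageLevel) // MEMORY_AND_DISK_SER while the child is cached
parent1DS.unpersist()
println(childDS.storageLevel) // NONE here would confirm the cascade
```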
> unpersist cascades with Kryo, MEMORY_AND_DISK_SER and monotonically_increasing_id
> ---------------------------------------------------------------------------------
>
> Key: SPARK-40200
> URL: https://issues.apache.org/jira/browse/SPARK-40200
> Project: Spark
> Issue Type: Bug
> Components: Spark Core
> Affects Versions: 3.3.0
> Environment: spark-3.3.0
> Reporter: Calvin Pietersen
> Priority: Major
>