You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@spark.apache.org by Nipuna Shantha <ni...@gmail.com> on 2023/05/23 18:13:32 UTC
Incremental Value dependents on another column of Data frame Spark
Hi all,
This is the sample set of data that I used for this task
[image: image.png]
My expected output is as below
[image: image.png]
My scenario is if Type is M01 the count should be 0 and if Type is M02 it
should be incremented from 1 or 0 until the sequence of M02 is finished.
Imagine this as a partition so row numbers cannot jumble. So can you guys
suggest a method to this scenario. Also for your concern this dataset is
really large; it has around 100000000 records and I am using spark with
scala
Thank You,
Best Regards
<https://www.avast.com/sig-email?utm_medium=email&utm_source=link&utm_campaign=sig-email&utm_content=webmail>
Virus-free.www.avast.com
<https://www.avast.com/sig-email?utm_medium=email&utm_source=link&utm_campaign=sig-email&utm_content=webmail>
<#DAB4FAD8-2DD7-40BB-A1B8-4E2AA1F9FDF2>
Re: Incremental Value dependents on another column of Data frame Spark
Posted by Enrico Minack <in...@enrico.minack.dev>.
Hi,
given your dataset:
val df=Seq(
(1, 20230523, "M01"), (2, 20230523, "M01"), (3, 20230523, "M01"), (4, 20230523, "M02"), (5, 20230523, "M02"), (6, 20230523, "M02"), (7, 20230523, "M01"), (8, 20230523, "M01"), (9, 20230523, "M02"), (10, 20230523, "M02"), (11, 20230523, "M02"), (12, 20230523, "M01"), (13, 20230523, "M02")
).toDF("Row", "Date", "Type")
the simplest you can get with column functions is this:
import org.apache.spark.sql.expressions.Window
val dateWindow=Window.partitionBy("Date").orderBy("Row")
val batchWindow=Window.partitionBy("Date", "batch").orderBy("Row")
df.withColumn("inc", when($"Type" =!=lag($"Type", 1).over(dateWindow), lit(1)).otherwise(lit(0)))
.withColumn("batch", sum($"inc").over(dateWindow))
.withColumn("count", when($"Type" ==="M01", lit(0)).otherwise(row_number().over(batchWindow)))
.show
This creates:
+---+--------+----+---+-----+-----+
|Row| Date|Type|inc|batch|count|
+---+--------+----+---+-----+-----+
| 1|20230523| M01| 0| 0| 0|
| 2|20230523| M01| 0| 0| 0|
| 3|20230523| M01| 0| 0| 0|
| 4|20230523| M02| 1| 1| 1|
| 5|20230523| M02| 0| 1| 2|
| 6|20230523| M02| 0| 1| 3|
| 7|20230523| M01| 1| 2| 0|
| 8|20230523| M01| 0| 2| 0|
| 9|20230523| M02| 1| 3| 1|
| 10|20230523| M02| 0| 3| 2|
| 11|20230523| M02| 0| 3| 3|
| 12|20230523| M01| 1| 4| 0|
| 13|20230523| M02| 1| 5| 1|
+---+--------+----+---+-----+-----+
Column "inc" is used to split the partition into batches of same
consecutive types. Column "batch" gives rows of those batches a unique
ids. For each of those batch, we can use row_number to create count, but
for "M01" we set "count" to 0.
The query plan does not look too bad, no extra shuffle involved.
Raghavendra proposed to iterate the partitions. I presume that you
partition by "Date" and order within partition by "Row", which puts
multiple dates into one partition. Even if you have one date per
partition, AQE might coalesce partitions into bigger ones. This can can
get you into trouble when a date starts with "M02".
You could group your dataset by "Date" and process the individual sorted
groups (requires Spark 3.4.0). This way, you still partition by "Date"
but process only individual dates, as proposed by Raghavendra:
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.IntegerType
import org.apache.spark.sql.catalyst.encoders.RowEncoder val groups =df.groupByKey(_.getInt(1))
groups.flatMapSortedGroups($"Row") {case (_:Int, rows:Iterator[Row]) =>
var count =0 rows.map {row =>
if (row.getString(2) =="M01") {
count =0 }else {
count +=1 }
Row.fromSeq(row.toSeq :+count)
}
}(RowEncoder(df.schema.add("count", IntegerType))).show
Cheers,
Enrico
Am 23.05.23 um 20:13 schrieb Nipuna Shantha:
> Hi all,
>
> This is the sample set of data that I used for this task
>
> image.png
>
> My expected output is as below
>
> image.png
>
> My scenario is if Type is M01 the count should be 0 and if Type is M02
> it should be incremented from 1 or 0 until the sequence of M02 is
> finished. Imagine this as a partition so row numbers cannot jumble. So
> can you guys suggest a method to this scenario. Also for your concern
> this dataset is really large; it has around 100000000 records and I am
> using spark with scala
>
> Thank You,
> Best Regards
>
> <https://www.avast.com/sig-email?utm_medium=email&utm_source=link&utm_campaign=sig-email&utm_content=webmail>
> Virus-free.www.avast.com
> <https://www.avast.com/sig-email?utm_medium=email&utm_source=link&utm_campaign=sig-email&utm_content=webmail>
>
>
Re: Incremental Value dependents on another column of Data frame Spark
Posted by Raghavendra Ganesh <ra...@gmail.com>.
Given, you are already stating the above can be imagined as a partition, I
can think of mapPartitions iterator.
val inputSchema = inputDf.schema
val outputRdd = inputDf.rdd.mapPartitions(rows => new SomeClass(rows))
val outputDf = sparkSession.createDataFrame(outputRdd,
inputSchema.add("counter", IntegerType))
}
class SomeClass(rows: Iterator[Row]) extends Iterator[Row] {
var counter: Int = 0
override def hasNext: Boolean = rows.hasNext
override def next(): Row = {
val row = rows.next()
val rowType:String = row.getAs[String]("Type")
if(rowType == "M01")
counter = 0
else
counter += 1
Row.fromSeq(row.toSeq ++ Seq(counter))
}
}
--
Raghavendra
On Tue, May 23, 2023 at 11:44 PM Nipuna Shantha <ni...@gmail.com>
wrote:
> Hi all,
>
> This is the sample set of data that I used for this task
>
> [image: image.png]
>
> My expected output is as below
>
> [image: image.png]
>
> My scenario is if Type is M01 the count should be 0 and if Type is M02 it
> should be incremented from 1 or 0 until the sequence of M02 is finished.
> Imagine this as a partition so row numbers cannot jumble. So can you guys
> suggest a method to this scenario. Also for your concern this dataset is
> really large; it has around 100000000 records and I am using spark with
> scala
>
> Thank You,
> Best Regards
>
>
> <https://www.avast.com/sig-email?utm_medium=email&utm_source=link&utm_campaign=sig-email&utm_content=webmail>
> Virus-free.www.avast.com
> <https://www.avast.com/sig-email?utm_medium=email&utm_source=link&utm_campaign=sig-email&utm_content=webmail>
> <#m_4627475067266622656_DAB4FAD8-2DD7-40BB-A1B8-4E2AA1F9FDF2>
>