You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@spark.apache.org by Nipuna Shantha <ni...@gmail.com> on 2023/05/23 18:13:32 UTC

Incremental Value dependents on another column of Data frame Spark

Hi all,

This is the sample set of data that I used for this task

[image: image.png]

My expected output is as below

[image: image.png]

My scenario is if Type is M01 the count should be 0 and if Type is M02 it
should be incremented from 1 or 0 until the sequence of M02 is finished.
Imagine this as a partition so row numbers cannot jumble. So can you guys
suggest a method to this scenario. Also for your concern this dataset is
really large; it has around 100000000 records and I am using spark with
scala

Thank You,
Best Regards

<https://www.avast.com/sig-email?utm_medium=email&utm_source=link&utm_campaign=sig-email&utm_content=webmail>
Virus-free.www.avast.com
<https://www.avast.com/sig-email?utm_medium=email&utm_source=link&utm_campaign=sig-email&utm_content=webmail>
<#DAB4FAD8-2DD7-40BB-A1B8-4E2AA1F9FDF2>

Re: Incremental Value dependents on another column of Data frame Spark

Posted by Enrico Minack <in...@enrico.minack.dev>.
Hi,

given your dataset:

val df=Seq(
   (1, 20230523, "M01"), (2, 20230523, "M01"), (3, 20230523, "M01"), (4, 20230523, "M02"), (5, 20230523, "M02"), (6, 20230523, "M02"), (7, 20230523, "M01"), (8, 20230523, "M01"), (9, 20230523, "M02"), (10, 20230523, "M02"), (11, 20230523, "M02"), (12, 20230523, "M01"), (13, 20230523, "M02")
).toDF("Row", "Date", "Type")

the simplest you can get with column functions is this:

import org.apache.spark.sql.expressions.Window
val dateWindow=Window.partitionBy("Date").orderBy("Row")
val batchWindow=Window.partitionBy("Date", "batch").orderBy("Row")
df.withColumn("inc", when($"Type" =!=lag($"Type", 1).over(dateWindow), lit(1)).otherwise(lit(0)))
   .withColumn("batch", sum($"inc").over(dateWindow))
   .withColumn("count", when($"Type" ==="M01", lit(0)).otherwise(row_number().over(batchWindow)))
   .show

This creates:

+---+--------+----+---+-----+-----+
|Row|    Date|Type|inc|batch|count|
+---+--------+----+---+-----+-----+
|  1|20230523| M01|  0|    0|    0|
|  2|20230523| M01|  0|    0|    0|
|  3|20230523| M01|  0|    0|    0|
|  4|20230523| M02|  1|    1|    1|
|  5|20230523| M02|  0|    1|    2|
|  6|20230523| M02|  0|    1|    3|
|  7|20230523| M01|  1|    2|    0|
|  8|20230523| M01|  0|    2|    0|
|  9|20230523| M02|  1|    3|    1|
| 10|20230523| M02|  0|    3|    2|
| 11|20230523| M02|  0|    3|    3|
| 12|20230523| M01|  1|    4|    0|
| 13|20230523| M02|  1|    5|    1|
+---+--------+----+---+-----+-----+

Column "inc" is used to split the partition into batches of same 
consecutive types. Column "batch" gives rows of those batches a unique 
ids. For each of those batch, we can use row_number to create count, but 
for "M01" we set "count" to 0.

The query plan does not look too bad, no extra shuffle involved.


Raghavendra proposed to iterate the partitions. I presume that you 
partition by "Date" and order within partition by "Row", which puts 
multiple dates into one partition. Even if you have one date per 
partition, AQE might coalesce partitions into bigger ones. This can can 
get you into trouble when a date starts with "M02".

You could group your dataset by "Date" and process the individual sorted 
groups (requires Spark 3.4.0). This way, you still partition by "Date" 
but process only individual dates, as proposed by Raghavendra:

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.IntegerType
import org.apache.spark.sql.catalyst.encoders.RowEncoder val groups  =df.groupByKey(_.getInt(1))
groups.flatMapSortedGroups($"Row") {case (_:Int, rows:Iterator[Row]) =>
   var count =0 rows.map {row =>
     if (row.getString(2) =="M01") {
       count =0 }else {
       count +=1 }
     Row.fromSeq(row.toSeq :+count)
   }
}(RowEncoder(df.schema.add("count", IntegerType))).show

Cheers,
Enrico


Am 23.05.23 um 20:13 schrieb Nipuna Shantha:
> Hi all,
>
> This is the sample set of data that I used for this task
>
> image.png
>
> My expected output is as below
>
> image.png
>
> My scenario is if Type is M01 the count should be 0 and if Type is M02 
> it should be incremented from 1 or 0 until the sequence of M02 is 
> finished. Imagine this as a partition so row numbers cannot jumble. So 
> can you guys suggest a method to this scenario. Also for your concern 
> this dataset is really large; it has around 100000000 records and I am 
> using spark with scala
>
> Thank You,
> Best Regards
>
> <https://www.avast.com/sig-email?utm_medium=email&utm_source=link&utm_campaign=sig-email&utm_content=webmail> 
> 	Virus-free.www.avast.com 
> <https://www.avast.com/sig-email?utm_medium=email&utm_source=link&utm_campaign=sig-email&utm_content=webmail> 
>
>

Re: Incremental Value dependents on another column of Data frame Spark

Posted by Raghavendra Ganesh <ra...@gmail.com>.
Given, you are already stating the above can be imagined as a partition, I
can think of mapPartitions iterator.

  val inputSchema = inputDf.schema
  val outputRdd = inputDf.rdd.mapPartitions(rows => new SomeClass(rows))
  val outputDf = sparkSession.createDataFrame(outputRdd,
inputSchema.add("counter", IntegerType))
}

class SomeClass(rows: Iterator[Row]) extends Iterator[Row] {
  var counter: Int = 0
  override def hasNext: Boolean = rows.hasNext

  override def next(): Row = {
    val row = rows.next()
    val rowType:String = row.getAs[String]("Type")
    if(rowType == "M01")
      counter = 0
    else
      counter += 1
    Row.fromSeq(row.toSeq ++ Seq(counter))
  }
}

--
Raghavendra


On Tue, May 23, 2023 at 11:44 PM Nipuna Shantha <ni...@gmail.com>
wrote:

> Hi all,
>
> This is the sample set of data that I used for this task
>
> [image: image.png]
>
> My expected output is as below
>
> [image: image.png]
>
> My scenario is if Type is M01 the count should be 0 and if Type is M02 it
> should be incremented from 1 or 0 until the sequence of M02 is finished.
> Imagine this as a partition so row numbers cannot jumble. So can you guys
> suggest a method to this scenario. Also for your concern this dataset is
> really large; it has around 100000000 records and I am using spark with
> scala
>
> Thank You,
> Best Regards
>
>
> <https://www.avast.com/sig-email?utm_medium=email&utm_source=link&utm_campaign=sig-email&utm_content=webmail>
> Virus-free.www.avast.com
> <https://www.avast.com/sig-email?utm_medium=email&utm_source=link&utm_campaign=sig-email&utm_content=webmail>
> <#m_4627475067266622656_DAB4FAD8-2DD7-40BB-A1B8-4E2AA1F9FDF2>
>