Posted to user@spark.apache.org by Shaji U <sh...@hotmail.com> on 2020/01/10 16:04:10 UTC

FW: [Spark Structured Streaming] Getting all the data in flatMapGroup

I have a scenario in which we need to calculate 'charges' for a stream of events, with the following details:



1. Event contains eventTime, facet, units

2. A free quantity is specified per facet; it needs to offset the earliest events based on eventTime

3. Prices are also specified per facet

4. Events that arrive within a single minute can be considered equivalent (to reduce state maintenance), and the free units need to be distributed proportionally across them (a small worked example follows)
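
To make that concrete with made-up numbers: say facet F1 has 100 free units and a price of 30.0 per unit, and the per-minute aggregation produces a block of 60 units in minute 1 and a block of 70 units in minute 2. Minute 1 should be fully covered by the free quota (charge 0), and minute 2 should consume the remaining 40 free units and be charged for 30 units, i.e. 30 * 30.0 = 900.0.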



I was hoping to make this work in the following manner using Spark Structured Streaming:



1. Aggregate events per facet at a one-minute level using the window function

2. Join with the price and free quantity

3. Group by facet

4. flatMapGroups by facet to sort the aggregated blocks by window start time and apply the free units and prices



What I am noticing is that the output of #4 contains only the aggregated windows for which new events came in, not all the aggregated windows since the watermark.

**Qn: How can I fix this code to get all aggregations since the watermark or from a previous wh?**



Could someone help? Sample code is below…

Thanks



---------------

package test



import java.sql.Timestamp

import java.util.UUID

import org.apache.spark.sql.{Row, SparkSession}

import org.apache.spark.sql.functions.{sum, udf, window}

import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, OutputMode, Trigger}




case class UsageEvent(Id: String, facetId: String, Units: Double, timeStamp:Timestamp)

case class FacetPricePoints(facetId: String, Price: Double, FreeUnits: Double)

case class UsageBlock(facetId: String, start:Timestamp, Units: Double)

case class UsageBlockWithPrice(facetId: String, start:Timestamp, Units: Double, Price: Double, FreeUnits: Double)

case class UsageBlockWithCharge(facetId: String, start:Timestamp, Units: Double, Price: Double, FreeUnits: Double, ChargedUnits: Double, Charge: Double)



object TestProcessing {

  def getUsageEventStream(ts: Timestamp, units: String) : UsageEvent = {UsageEvent(UUID.randomUUID().toString, "F1", units.toInt % 20, ts)}

  implicit def ordered: Ordering[Timestamp] = new Ordering[Timestamp] {def compare(x: Timestamp, y: Timestamp): Int = x compareTo y}

  def ChargeUsageBlock(Key: String, Value: Iterator[UsageBlockWithPrice]) : Iterator[UsageBlockWithCharge] =

  {
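    // Sort the minute blocks by start time and consume the free quota in FIFO order,
    // charging the remainder of each block at the facet's price.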

    val usageBlocks = Value.toList.sortBy(ub => ub.start)

    var freeUnits = 0.0

    var freeUnitsSet = false

    var newUe = for (ue <- usageBlocks)

      yield {

        freeUnits = if (!freeUnitsSet) ue.FreeUnits else freeUnits

        freeUnitsSet = true

        val freeUnitsInBlock = if (freeUnits > ue.Units) ue.Units else freeUnits

        val chargedUnits = ue.Units - freeUnitsInBlock

        freeUnits -= freeUnitsInBlock // todo: need to specify precision and rounding

        UsageBlockWithCharge(ue.facetId, ue.start, ue.Units, ue.Price, freeUnitsInBlock, chargedUnits, chargedUnits * ue.Price)

      }

    newUe.iterator

  }

  def main(args: Array[String]): Unit = {

    val spark = SparkSession.builder.appName("Test").getOrCreate()

    val stream = spark.readStream.format("rate").option("rowsPerSecond", 1).load()

    import spark.implicits._

    val prices = List(FacetPricePoints("F1", 30.0, 100.0)).toDF() // static per-facet price and free quota



    val getUsageEventStreamUDF = udf((ts: Timestamp, units: String) => getUsageEventStream(ts, units)) // .where($"Units" < 2).

    val usageEventsRaw = stream.withColumn("Usage", getUsageEventStreamUDF(stream("timestamp"), stream("value"))).select("Usage.*").as[UsageEvent].dropDuplicates("Id").withWatermark("timeStamp", "1 hour")

    val aggUsage = usageEventsRaw.groupBy($"facetId", window($"timeStamp", "1 minute")).agg(sum($"Units") as "Units").selectExpr("facetId", "window.start", "Units").as[UsageBlock]

    val fifoRate = (Key: String, Value: Iterator[UsageBlockWithPrice]) => { ChargeUsageBlock(Key, Value) }

    // join the minute blocks with the per-facet price/free quota, then charge per facet
    val aggUsageCharge = aggUsage.joinWith(prices, prices.col("facetId") === aggUsage.col("facetId")).select("_1.*", "_2.Price", "_2.FreeUnits").as[UsageBlockWithPrice].groupByKey(x => x.facetId).flatMapGroups(fifoRate).withWatermark("start", "1 hour")

    val fin = aggUsageCharge.writeStream.trigger(Trigger.ProcessingTime("10 seconds")).outputMode(OutputMode.Update).format("console").start()

                // this applies freeUnits for every minute instead of just applying it once

    fin.awaitTermination()


  }

}






RE: [Spark Structured Streaming] Getting all the data in flatMapGroup

Posted by Shaji U <sh...@hotmail.com>.
Let me rephrase the question more succinctly.

I have a

  1.  An aggregation (sum) with a groupBy on a column (facetId) and an event-time window (1 minute), followed by
  2.  a groupByKey on the same column and flatMapGroups

I am expecting the flatMapGroups function to be called with all the time windows whenever anything changes, but I am seeing flatMapGroups called only with the changed windows.

Is that expected? If so, is there a way to work around it, short of writing a custom aggregation function myself?
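
The only alternative I can think of is to keep the remaining free quota in explicit per-facet state with flatMapGroupsWithState applied directly to the events, along the lines of the rough sketch below, but I was hoping to avoid managing that state by hand. In the sketch, the price and free-quota lookups are hard-coded stand-ins for the FacetPricePoints join, the per-minute bucketing and proportional distribution are left out, and events are only ordered within a single trigger.

import java.sql.Timestamp
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, OutputMode}

case class FacetState(remainingFreeUnits: Double)
case class ChargedEvent(facetId: String, timeStamp: Timestamp, units: Double, freeUnits: Double, chargedUnits: Double, charge: Double)

object FifoChargeSketch {
  // Hard-coded stand-ins for the per-facet price and free quota (the real job joins with FacetPricePoints).
  def priceOf(facetId: String): Double = 30.0
  def freeQuotaOf(facetId: String): Double = 100.0

  def chargeFacet(facetId: String, events: Iterator[UsageEvent], state: GroupState[FacetState]): Iterator[ChargedEvent] = {
    // Remaining free units for this facet, carried over from earlier triggers.
    var remaining = state.getOption.map(_.remainingFreeUnits).getOrElse(freeQuotaOf(facetId))
    val price = priceOf(facetId)
    // Only this trigger's events can be sorted here; ordering across triggers relies on events arriving roughly in order.
    val charged = events.toSeq.sortBy(_.timeStamp.getTime).map { e =>
      val free = math.min(remaining, e.Units)
      remaining -= free
      val chargedUnits = e.Units - free
      ChargedEvent(facetId, e.timeStamp, e.Units, free, chargedUnits, chargedUnits * price)
    }
    state.update(FacetState(remaining)) // persist the remaining quota for later triggers
    charged.iterator
  }

  // events: the parsed Dataset[UsageEvent] (e.g. usageEventsRaw above), before any windowed aggregation
  def charges(events: Dataset[UsageEvent]): Dataset[ChargedEvent] = {
    import events.sparkSession.implicits._
    events
      .groupByKey(_.facetId)
      .flatMapGroupsWithState[FacetState, ChargedEvent](OutputMode.Update, GroupStateTimeout.NoTimeout)(chargeFacet)
  }
}

The output of charges(...) would then be written with the query in Update output mode, like the console sink above.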

Thanks,
-Shaji
