Posted to user@spark.apache.org by vibhatha <vi...@gmail.com> on 2019/10/21 21:15:09 UTC

Streaming Reduction on a stream of Arrays

Hi, 

I have an issue with reduce in streaming: I don't get a reduced stream when I
use a custom object.

Here is the code snippet that I used to test this. 

The issue is that the reduction clearly works for a simple sequence of
integers, but what I really want to do is send arrays, adding one array at a
time to the queue, and process them in parallel.

For instance, I send the following sequence of objects:

DataObject(t1, Seq(0,1,2,3))
DataObject(t2, Seq(0,1,2,4))
DataObject(t3, Seq(0,1,2,5))
DataObject(t4, Seq(0,1,2,1))
.
.
.
DataObject(t5, Seq(0,1,2,2))

Let's say the micro-batch interval is 1 millisecond and it gets 3 elements in
that batch:

DataObject(t2, Seq(0,1,2,4))
DataObject(t3, Seq(0,1,2,5))
DataObject(t4, Seq(0,1,2,1))

After reduction what I expect is 

DataObject(t2, Seq(0,3,6,10))
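
To make the intended semantics concrete, here is a minimal standalone sketch
(using a hypothetical immutable case class, separate from the code below) of
what I expect each micro-batch reduce to compute: the element-wise sum of the
sequences and the minimum of the timestamps.

  case class Data(t: Long, xs: Seq[Int])

  def combine(a: Data, b: Data): Data =
    Data(math.min(a.t, b.t), a.xs.zip(b.xs).map { case (x, y) => x + y })

  // combine(combine(Data(2, Seq(0, 1, 2, 4)), Data(3, Seq(0, 1, 2, 5))),
  //         Data(4, Seq(0, 1, 2, 1)))
  // == Data(2, Seq(0, 3, 6, 10))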


But in my code, I always get a non-reduced value. I am new to Spark
Streaming, though I have used batch processing for a while now, so there must
be an issue with my code.

I need some input to figure out what is missing.

Thank you. 


------------------------------------------------------------------------------------
Code
------------------------------------------------------------------------------------

import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Milliseconds, Seconds, StreamingContext}
import org.apache.spark.SparkConf

import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

class Reduce(paralelizm: Int, size: Int, iterations: Int, batchSize: Int,
             queueDelay: Int, logPath: String) extends Serializable {

  def execute(): Unit = {
    val conf = new SparkConf().setAppName("sparkperformance_reduce")
    val ssc = new StreamingContext(conf, Seconds(batchSize))
    ssc.sparkContext.setLogLevel("ERROR")
    

    // do work start
    //var status = streamSeq(ssc, paralelizm, size, iterations, queueDelay)
    streamDataObject(ssc, paralelizm, size, iterations, queueDelay)
    //streamQ(ssc, iterations, paralelizm)
    // do work stop
    
    ssc.start()
    ssc.awaitTermination()
  }

  def streamSeq(ssc: StreamingContext, parallelizm: Int, size: Int,
                iterations: Int, queueDelay: Int): Boolean = {
    var status = false
    val rddQueu = new mutable.Queue[RDD[Int]]()
    val inputStream = ssc.queueStream(rddQueu)
    val result = inputStream.reduce((x, y) => x + y)
    result.print()
    for (i <- 1 to iterations) {
      rddQueu += ssc.sparkContext.makeRDD(1 to size * parallelizm, parallelizm)
      if (i == iterations) {
        status = true
      }
      Thread.sleep(queueDelay)
    }
    status
  }

  def streamDataObject(ssc: StreamingContext, parallelizm: Int, size: Int,
                       iterations: Int, queueDelay: Int): Unit = {

    val rddQueue = new mutable.Queue[RDD[DataObject]]
    val inputStream = ssc.queueStream(rddQueue)
    val reduceResult  = inputStream.reduce(update)
    reduceResult.foreachRDD(rdd => rdd.foreach(p => println(p.x + ":" + p.y)))
    //inputStream.count().print()
    reduceResult.count().print()

    var count = 0
    for (i <- 1 to iterations) {
      val dataObject = new DataObject(System.currentTimeMillis(), 1 to size * parallelizm)
      val seq1: Seq[DataObject] = Seq[DataObject](dataObject)
      rddQueue += ssc.sparkContext.makeRDD(seq1)
      Thread.sleep(queueDelay)
    }

  }

  def update(a: DataObject, b: DataObject): DataObject = {
    // Combine two elements: keep the smaller timestamp and the element-wise
    // sum of the sequences. Note: this mutates the left argument in place.
    a.y = Math.min(a.y, b.y)
    a.x = a.x.zip(b.x).map { case (x, y) => x + y }
    a // return the combined object
  }

  def printDataObject(dataObject: DataObject): Unit = {
    println(dataObject.y + ":::" + dataObject.x)
  }


  class DataObject(yc: Long, xc: Seq[Int]) extends Serializable {
    var y: Long = yc
    var x: Seq[Int] = xc

    def update(dataObject: DataObject): DataObject = {
      this.y = Math.min(y, dataObject.y)
      this.x = 100 to 116
      this
    }
  }

}
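
For completeness, this is roughly how I drive the class; the argument values
below are placeholders for illustration, the real job is submitted with my
cluster settings:

object ReduceApp {
  def main(args: Array[String]): Unit = {
    // placeholder arguments, not the actual run configuration
    new Reduce(paralelizm = 4, size = 4, iterations = 100, batchSize = 1,
      queueDelay = 1000, logPath = "/tmp/reduce.log").execute()
  }
}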



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

---------------------------------------------------------------------
To unsubscribe e-mail: user-unsubscribe@spark.apache.org