Posted to user@spark.apache.org by vibhatha <vi...@gmail.com> on 2019/10/21 21:15:09 UTC
Streaming Reduction on a stream of Arrays
Hi,
I have an issue with reduce in streaming: I don't get a reduced stream when I
use a custom object. The code snippet I used to test this is at the end of
this mail. The reduction clearly works for a simple sequence of integers, but
what I really want to do is send one array at a time to the queue and have
them processed (reduced element-wise) in parallel.
For instance, I send the following set of sequences:
DataObject(t1, Seq(0,1,2,3))
DataObject(t2, Seq(0,1,2,4))
DataObject(t3, Seq(0,1,2,5))
DataObject(t4, Seq(0,1,2,1))
.
.
.
DataObject(t5, Seq(0,1,2,2))
Let's say the mini-batch interval is 1 millisecond and that batch gets these 3
elements:
DataObject(t2, Seq(0,1,2,4))
DataObject(t3, Seq(0,1,2,5))
DataObject(t4, Seq(0,1,2,1))
After reduction, what I expect is:
DataObject(t2, Seq(0,3,6,10))
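To make the expected combine concrete, here is a tiny standalone sketch (plain
Scala collections, no Spark) of the element-wise reduction I have in mind; the
t2/t3/t4 timestamps are just placeholder values:

// Expected per-batch combine: keep the earliest timestamp,
// sum the sequences element-wise.
val batch = Seq(
  (2L, Seq(0, 1, 2, 4)), // DataObject(t2, ...)
  (3L, Seq(0, 1, 2, 5)), // DataObject(t3, ...)
  (4L, Seq(0, 1, 2, 1))  // DataObject(t4, ...)
)
val reduced = batch.reduce { (a, b) =>
  (Math.min(a._1, b._1), a._2.zip(b._2).map { case (x, y) => x + y })
}
// reduced == (2L, Seq(0, 3, 6, 10))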
But with my code I always get back the non-reduced values. I am new to Spark
Streaming, though I have used batch processing for a while now, so there is
probably something wrong with my code. Could I get some input on what is
missing?
Thank you.
------------------------------------------------------------------------------------
Code
------------------------------------------------------------------------------------
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Milliseconds, Seconds, StreamingContext}
import org.apache.spark.SparkConf
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

class Reduce(paralelizm: Int, size: Int, iterations: Int, batchSize: Int,
             queueDelay: Int, logPath: String) extends Serializable {

  def execute(): Unit = {
    val conf = new SparkConf().setAppName("sparkperformance_reduce")
    val ssc = new StreamingContext(conf, Seconds(batchSize))
    ssc.sparkContext.setLogLevel("ERROR")
    // do work start
    //var status = streamSeq(ssc, paralelizm, size, iterations, queueDelay)
    streamDataObject(ssc, paralelizm, size, iterations, queueDelay)
    //streamQ(ssc, iterations, paralelizm)
    // do work stop
    ssc.start()
    ssc.awaitTermination()
  }

  // Works as expected: reduces a stream of plain Ints within each batch.
  def streamSeq(ssc: StreamingContext, parallelizm: Int, size: Int,
                iterations: Int, queueDelay: Int): Boolean = {
    var status = false
    val rddQueue = new mutable.Queue[RDD[Int]]()
    val inputStream = ssc.queueStream(rddQueue)
    val result = inputStream.reduce((x, y) => x + y)
    result.print()
    for (i <- 1 to iterations) {
      rddQueue += ssc.sparkContext.makeRDD(1 to size * parallelizm, parallelizm)
      if (i == iterations) {
        status = true
      }
      Thread.sleep(queueDelay)
    }
    status
  }

  // Does not work as expected: the DataObjects always come out non-reduced.
  def streamDataObject(ssc: StreamingContext, parallelizm: Int, size: Int,
                       iterations: Int, queueDelay: Int): Unit = {
    val rddQueue = new mutable.Queue[RDD[DataObject]]
    val inputStream = ssc.queueStream(rddQueue)
    val reduceResult = inputStream.reduce(update)
    reduceResult.foreachRDD(rdd => rdd.foreach(p => println(p.x + ":" + p.y)))
    //inputStream.count().print()
    reduceResult.count().print()
    for (i <- 1 to iterations) {
      val dataObject = new DataObject(System.currentTimeMillis(), 1 to size * parallelizm)
      val seq1: Seq[DataObject] = Seq[DataObject](dataObject)
      rddQueue += ssc.sparkContext.makeRDD(seq1)
      Thread.sleep(queueDelay)
    }
  }

  // Combine two DataObjects: keep the earlier timestamp, sum the sequences element-wise.
  def update(a: DataObject, b: DataObject): DataObject = {
    a.y = Math.min(a.y, b.y)
    a.x = a.x.zip(b.x).map { case (x, y) => x + y }
    a
  }

  def printDataObject(dataObject: DataObject): Unit = {
    println(dataObject.y + ":::" + dataObject.x)
  }

  class DataObject(yc: Long, xc: Seq[Int]) extends Serializable {
    var y: Long = yc
    var x: Seq[Int] = xc

    def update(dataObject: DataObject): DataObject = {
      this.y = Math.min(y, dataObject.y)
      this.x = 100 to 116
      this
    }
  }
}
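For completeness, this is roughly how I drive the class; the constructor
arguments below are just example values and the log path is a placeholder:

object ReduceApp {
  def main(args: Array[String]): Unit = {
    // Example values only: 4-way parallelism, payload size 4, 10 queued RDDs,
    // 1-second batch interval, 100 ms delay between queue pushes.
    new Reduce(paralelizm = 4, size = 4, iterations = 10, batchSize = 1,
      queueDelay = 100, logPath = "/tmp/reduce.log").execute()
  }
}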