Posted to issues@spark.apache.org by "Hyukjin Kwon (JIRA)" <ji...@apache.org> on 2019/06/03 01:29:00 UTC

[jira] [Commented] (SPARK-27895) Spark streaming - RDD filter is always refreshing providing updated filtered items

    [ https://issues.apache.org/jira/browse/SPARK-27895?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16854153#comment-16854153 ] 

Hyukjin Kwon commented on SPARK-27895:
--------------------------------------

Do you mean the results are not deterministic because of the random operation?

> Spark streaming - RDD filter is always refreshing providing updated filtered items
> ----------------------------------------------------------------------------------
>
>                 Key: SPARK-27895
>                 URL: https://issues.apache.org/jira/browse/SPARK-27895
>             Project: Spark
>          Issue Type: Bug
>          Components: DStreams
>    Affects Versions: 2.4.0, 2.4.2, 2.4.3
>         Environment: Intellij, running local in windows10 laptop.
>  
>            Reporter: Ilias Karalis
>            Priority: Major
>
> Spark streaming: 2.4.x
> Scala: 2.11.11
>  
> foreachRDD of DStream,
> when a filter is applied to the RDD, the filter keeps re-running, producing new results continuously until the next batch is processed. The same then occurs for the new batch.
> With the same code, if we instead call rdd.collect() and run the filter on the resulting collection, we get the results only once, and they remain stable until the next batch comes in.
> The filter function is based on random probability (reservoir sampling).
>  
> val toSampleRDD: RDD[(Long, Long)] = inputRdd.filter(x => chooseX(x))
>  
> def chooseX(x: (Long, Long)): Boolean = {
>   val r = scala.util.Random
>   val p = r.nextFloat()
>   edgeTotalCounter += 1
>   if (p < (sampleLength.toFloat / edgeTotalCounter.toFloat)) {
>     edgeLocalRDDCounter += 1
>     println("Edge " + x + " has been selected and is number : " + edgeLocalRDDCounter + ".")
>     true
>   }
>   else
>     false
> }
>  
> edgeLocalRDDCounter counts selected edges from inputRDD.
> Strangely, the counter first increases from 1 to y; then the filter unexpectedly runs again and the counter increases from y+1 to z. On each subsequent unexpected run of the filter, the counter again starts from y+1. Each run of the filter produces different results and selects a different number of edges.
> toSampleRDD always changes according to the newly produced results.
> When a new batch comes in, the same behavior starts over for that batch.
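The reported behavior is consistent with Spark's lazy evaluation: an RDD transformation such as filter is not computed once and stored, but re-executed every time an action traverses the RDD (unless it is cached), so a non-deterministic predicate yields different elements on each traversal. A minimal pure-Scala analogue (no Spark; names are hypothetical) shows the same effect with a lazy collection view:

```scala
import scala.util.Random

object LazyFilterDemo {
  // Non-deterministic predicate, like chooseX in the report:
  // every call draws a fresh random number.
  def choose(x: Int): Boolean = Random.nextFloat() < 0.5f

  def main(args: Array[String]): Unit = {
    val data = (1 to 1000).toVector

    // A view is lazy: the predicate re-runs on every traversal,
    // so two traversals usually select different elements --
    // analogous to re-evaluating an uncached RDD on each action.
    val lazySample = data.view.filter(choose)
    val first  = lazySample.toVector
    val second = lazySample.toVector
    println(s"first pass kept ${first.size}, second pass kept ${second.size}")

    // Materializing the result once (analogous to collect(), or to
    // caching/persisting the RDD) fixes the sample: the strict Vector
    // no longer changes between reads.
    val fixedSample = data.filter(choose)
    println(s"fixed sample size: ${fixedSample.size}")
  }
}
```

Under this reading, calling `toSampleRDD.cache()` (or collecting once) before reuse would be expected to stabilize the selected edges.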



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org