Posted to issues@spark.apache.org by "Ilias Karalis (JIRA)" <ji...@apache.org> on 2019/05/31 07:17:00 UTC
[jira] [Created] (SPARK-27895) Spark streaming - RDD filter is always refreshing providing updated filtered items
Ilias Karalis created SPARK-27895:
-------------------------------------
Summary: Spark streaming - RDD filter is always refreshing providing updated filtered items
Key: SPARK-27895
URL: https://issues.apache.org/jira/browse/SPARK-27895
Project: Spark
Issue Type: Bug
Components: DStreams
Affects Versions: 2.4.3, 2.4.2, 2.4.0
Environment: IntelliJ, running locally on a Windows 10 laptop.
Reporter: Ilias Karalis
Spark streaming: 2.4.x
Scala: 2.11.11
Inside foreachRDD of a DStream, if filter is applied to the RDD, the filter keeps re-running and continuously produces new results until the next batch is processed. The same happens again for each new batch.
With the same code, if we call rdd.collect() and then apply the filter to the collected array, we get results only once, and they remain stable until a new batch arrives.
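For comparison, here is a minimal Spark-free sketch of the two variants described above. LazyFiltered is a hypothetical stand-in for an uncached RDD: it stores nothing, so each collect() re-runs the filter over the source. The Spark programming guide notes that a transformed RDD may be recomputed each time an action runs on it unless it is persisted; this sketch only models that idea and uses no Spark APIs. With a random predicate, every recomputation draws new numbers, while collecting once and reusing the local copy, as in the rdd.collect() variant, keeps one fixed result.

```scala
import scala.util.Random

// Hypothetical stand-in for an uncached RDD: nothing is stored, so every
// collect() ("action") re-runs the filter over the whole source.
final class LazyFiltered[A](source: Seq[A], pred: A => Boolean) {
  def collect(): List[A] = source.filter(pred).toList
}

object LazyFilterDemo {
  /** Builds a random keep/drop predicate plus a way to read how often it ran. */
  private def countingRandomPredicate(seed: Long): (Int => Boolean, () => Int) = {
    val rnd = new Random(seed)
    var calls = 0
    val pred: Int => Boolean = { _ =>
      calls += 1
      rnd.nextFloat() < 0.5f // random decision, in the spirit of chooseX
    }
    (pred, () => calls)
  }

  /** Two actions on the lazy data: the predicate runs again for the second one. */
  def actTwice(seed: Long): (List[Int], List[Int], Int) = {
    val (pred, calls) = countingRandomPredicate(seed)
    val sampled = new LazyFiltered(1 to 10, pred)
    val first = sampled.collect()
    val second = sampled.collect() // fresh random draws, possibly a different subset
    (first, second, calls())
  }

  /** One action, then two reads of the local copy, as in the rdd.collect() variant. */
  def collectOnceReuse(seed: Long): (List[Int], List[Int], Int) = {
    val (pred, calls) = countingRandomPredicate(seed)
    val snapshot = new LazyFiltered(1 to 10, pred).collect() // the only predicate run
    // Reading an already-built List never calls the predicate again.
    (snapshot, snapshot, calls())
  }

  def main(args: Array[String]): Unit = {
    val (a1, a2, lazyCalls) = actTwice(42L)
    // lazyCalls is 20: the predicate ran once per element for each action
    println(s"two actions:  $a1 vs $a2, predicate calls = $lazyCalls")
    val (b1, b2, snapCalls) = collectOnceReuse(42L)
    // snapCalls is 10: the predicate ran once per element, for the single action
    println(s"collect once: $b1 vs $b2, predicate calls = $snapCalls")
  }
}
```

The design choice here is simply where the result lives: LazyFiltered keeps only the recipe, so its output is only as stable as its predicate, whereas the collected List is a fixed value.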
The filter function is based on random probability (reservoir sampling):
import org.apache.spark.rdd.RDD

// inputRdd, sampleLength, edgeTotalCounter and edgeLocalRDDCounter are defined elsewhere in the reporter's code (not shown).
val toSampleRDD: RDD[(Long, Long)] = inputRdd.filter(x => chooseX(x))

def chooseX(x: (Long, Long)): Boolean = {
  val r = scala.util.Random
  val p = r.nextFloat()
  edgeTotalCounter += 1

  if (p < (sampleLength.toFloat / edgeTotalCounter.toFloat)) {
    edgeLocalRDDCounter += 1
    println("Edge " + x + " has been selected and is number : " + edgeLocalRDDCounter + ".")
    true
  }
  else
    false
}
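The acceptance test in chooseX, p < sampleLength / edgeTotalCounter, matches the step of classic reservoir sampling (Algorithm R) in which the n-th item is kept with probability k/n. A full reservoir also evicts a uniformly chosen existing entry so that exactly k items remain. The following is a minimal, single-machine sketch of Algorithm R for reference only; ReservoirSampling and sample are illustrative names, not part of Spark or of the reporter's code, and the Int counter limits it to streams of at most Int.MaxValue items.

```scala
import scala.collection.mutable.ArrayBuffer
import scala.util.Random

object ReservoirSampling {
  /** Algorithm R: one pass over a stream of unknown length, O(k) memory,
    * returns a uniform random sample of min(k, streamLength) items. */
  def sample[A](items: Iterator[A], k: Int, rnd: Random): Vector[A] = {
    require(k >= 0, "k must be non-negative")
    val reservoir = ArrayBuffer.empty[A]
    var n = 0 // number of items seen so far
    for (x <- items) {
      n += 1
      if (reservoir.size < k) {
        reservoir += x // fill the reservoir with the first k items
      } else {
        // j is uniform in [0, n), so j < k holds with probability k/n:
        // the same acceptance probability as the test in chooseX.
        val j = rnd.nextInt(n)
        if (j < k) reservoir(j) = x // evict the uniformly chosen slot j
      }
    }
    reservoir.toVector
  }

  def main(args: Array[String]): Unit = {
    // Hypothetical (Long, Long) edges standing in for the RDD contents.
    val edges = (1L to 1000L).iterator.map(i => (i, i + 1))
    println(sample(edges, 5, new Random(7)))
  }
}
```

Because the reservoir keeps state across items, it is naturally a single sequential pass; applying the k/n test alone inside a filter, as chooseX does, selects items but never removes earlier ones.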
edgeLocalRDDCounter counts the edges selected from inputRdd.
What is strange is that the counter first increases from 1 to y; then the filter unexpectedly runs again and the counter increases again, from y+1 to z. After that, every time the filter unexpectedly runs again, it produces results for which the counter starts from y+1. Each run gives different results and selects a different number of edges.
toSampleRDD changes every time to reflect the newly produced results.
When a new batch arrives, the same behavior starts again for that batch.
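Because chooseX draws a fresh random number and updates edgeTotalCounter on every call, each re-evaluation of the filter can select a different set of edges. One way to make a random filter give the same answer each time it is re-evaluated is to derive each decision only from the element and a fixed seed. This is a minimal sketch under stated assumptions, not part of the original report: it uses a fixed sampling fraction, so it is plain Bernoulli sampling rather than reservoir sampling, and StableRandomFilter, keep, fraction and seed are hypothetical names, not Spark APIs.

```scala
import scala.util.Random

object StableRandomFilter {
  /** Hypothetical keep/drop decision that depends only on the element and a
    * fixed seed, so re-running the filter on the same data selects the same
    * items. Fixed-fraction Bernoulli sampling, not reservoir sampling. */
  def keep(x: (Long, Long), fraction: Double, seed: Long): Boolean = {
    // Seed a per-element generator from the element's hash: same element,
    // same seed, same draw, no matter how often or in which order it runs.
    val rnd = new Random(seed ^ x.hashCode.toLong)
    rnd.nextDouble() < fraction
  }

  def main(args: Array[String]): Unit = {
    val edges = (1L to 20L).map(i => (i, i * 2)) // hypothetical input edges
    val run1 = edges.filter(e => keep(e, 0.3, 42L))
    val run2 = edges.filter(e => keep(e, 0.3, 42L)) // simulated recomputation
    println(s"run1 = $run1")
    println(s"same selection on recomputation: ${run1 == run2}")
  }
}
```

The trade-off is that a fixed fraction does not bound the sample size the way a reservoir does; in exchange, keep is a pure function of its inputs, so no shared counter has to survive between evaluations.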
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)