Posted to issues@spark.apache.org by "Murugesan Guruswamy (JIRA)" <ji...@apache.org> on 2016/07/25 15:48:20 UTC

[jira] [Commented] (SPARK-10069) Python's ReduceByKeyAndWindow DStream Keeps Growing

    [ https://issues.apache.org/jira/browse/SPARK-10069?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15392138#comment-15392138 ] 

Murugesan Guruswamy commented on SPARK-10069:
---------------------------------------------

I am seeing the same problem: the window keeps growing when I call reduceByKeyAndWindow on a windowed stream in YARN cluster mode on EMR. Has this been fixed in PySpark?
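
For what it's worth, the growth matches how an invertible windowed reduce keeps state: once invFunc brings a key's count back down to zero, the (key, 0) pair stays in the window state unless it is explicitly dropped. reduceByKeyAndWindow takes an optional filterFunc argument; only pairs for which it returns true are retained. Below is a minimal sketch of that workaround applied to the repro pipeline from the issue, assuming a zero count means the key has expired (the predicate is an assumption about this particular workload, not a general rule):

{code}
ds = ssc.textFileStream('input') \
    .map(lambda event: (event, 1)) \
    .reduceByKeyAndWindow(
        func=lambda count1, count2: count1 + count2,
        invFunc=lambda count1, count2: count1 - count2,
        windowDuration=10,
        slideDuration=2,
        # Assumption: a count of zero means the key has left the window,
        # so drop the pair instead of keeping (key, 0) in state forever.
        filterFunc=lambda kv: kv[1] > 0)
{code}

If that stops the state from growing, the retained zero-count pairs are the cause, which would make this expected behavior rather than a PySpark-specific leak.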

> Python's ReduceByKeyAndWindow DStream Keeps Growing
> ---------------------------------------------------
>
>                 Key: SPARK-10069
>                 URL: https://issues.apache.org/jira/browse/SPARK-10069
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark
>    Affects Versions: 1.4.1
>            Reporter: Asim Jalis
>
> When I use reduceByKeyAndWindow with func and invFunc (in PySpark), the size of the window keeps growing. I am appending the code that reproduces this issue. It prints the count() of the DStream, which grows by 10 elements every batch.
> Is this a bug in the Python port of the Scala implementation, or is this expected behavior?
> Here is the code that reproduces this issue.
> {code}
> from pyspark import SparkContext
> from pyspark.streaming import StreamingContext
> print 'Initializing ssc'
> ssc = StreamingContext(SparkContext(), batchDuration=1)
> # Checkpointing is required for windowed reduces that use an inverse function.
> ssc.checkpoint('ckpt')
> # Count occurrences of each line over a 10-second window that slides every
> # 2 seconds; invFunc subtracts the counts of batches that leave the window.
> ds = ssc.textFileStream('input') \
>     .map(lambda event: (event, 1)) \
>     .reduceByKeyAndWindow(
>         func=lambda count1, count2: count1 + count2,
>         invFunc=lambda count1, count2: count1 - count2,
>         windowDuration=10,
>         slideDuration=2)
> ds.pprint()
> ds.count().pprint()
> print 'Starting ssc'
> ssc.start()
> import itertools
> import time
> from distutils import dir_util 
> # Write one element per line to the given file.
> def batch_write(batch_data, batch_file_path):
>     with open(batch_file_path, 'w') as batch_file:
>         for element in batch_data:
>             line = str(element) + "\n"
>             batch_file.write(line)
> def xrange_write(batch_size=5, batch_dir='input', batch_duration=1):
>     '''Every batch_duration seconds, write a file containing batch_size
>     numbers, forever. Start at 0 and keep incrementing. Intended for
>     testing Spark Streaming code.'''
>     dir_util.mkpath(batch_dir)  # create the output directory if needed
>     for i in itertools.count():
>         start = batch_size * i
>         stop = batch_size * (i + 1)
>         batch_data = xrange(start, stop)
>         file_path = batch_dir + '/' + str(i)
>         batch_write(batch_data, file_path)
>         time.sleep(batch_duration)
> print 'Feeding data to app'
> xrange_write()
>  
> ssc.awaitTermination()
> {code}


