You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Asim Jalis (JIRA)" <ji...@apache.org> on 2015/08/18 01:12:45 UTC

[jira] [Created] (SPARK-10069) Python's ReduceByKeyAndWindow DStream Keeps Growing

Asim Jalis created SPARK-10069:
----------------------------------

             Summary: Python's ReduceByKeyAndWindow DStream Keeps Growing
                 Key: SPARK-10069
                 URL: https://issues.apache.org/jira/browse/SPARK-10069
             Project: Spark
          Issue Type: Bug
          Components: PySpark
    Affects Versions: 1.4.1
            Reporter: Asim Jalis


When I use reduceByKeyAndWindow with func and invFunc (in PySpark) the size of the window keeps growing. I am appending the code that reproduces this issue. This prints out the count() of the dstream which goes up every batch by 10 elements. 

Is this a bug in the Python version of Scala or is this expected behavior?

Here is the code that reproduces this issue.

{code}
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pprint import pprint

print 'Initializing ssc'
ssc = StreamingContext(SparkContext(), batchDuration=1)
ssc.checkpoint('ckpt')

ds = ssc.textFileStream('input') \
    .map(lambda event: (event,1)) \
    .reduceByKeyAndWindow(
        func=lambda count1,count2: count1+count2,
        invFunc=lambda count1,count2: count1-count2,
        windowDuration=10,
        slideDuration=2)

ds.pprint()
ds.count().pprint()

print 'Starting ssc'
ssc.start()

import itertools
import time
import random

from distutils import dir_util 

def batch_write(batch_data, batch_file_path):
    with open(batch_file_path,'w') as batch_file: 
        for element in batch_data:
            line = str(element) + "\n"
            batch_file.write(line)

def xrange_write(
        batch_size = 5,
        batch_dir = 'input',
        batch_duration = 1):
    '''Every batch_duration write a file with batch_size numbers,
    forever. Start at 0 and keep incrementing. Intended for testing
    Spark Streaming code.'''

    dir_util.mkpath('./input')
    for i in itertools.count():
        min = batch_size * i 
        max = batch_size * (i + 1)
        batch_data = xrange(min,max)
        file_path = batch_dir + '/' + str(i)
        batch_write(batch_data, file_path)
        time.sleep(batch_duration)

print 'Feeding data to app'
xrange_write()
 
ssc.awaitTermination()
{code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org