Posted to issues@spark.apache.org by "Asim Jalis (JIRA)" <ji...@apache.org> on 2015/08/18 01:12:45 UTC
[jira] [Created] (SPARK-10069) Python's ReduceByKeyAndWindow DStream Keeps Growing
Asim Jalis created SPARK-10069:
----------------------------------
Summary: Python's ReduceByKeyAndWindow DStream Keeps Growing
Key: SPARK-10069
URL: https://issues.apache.org/jira/browse/SPARK-10069
Project: Spark
Issue Type: Bug
Components: PySpark
Affects Versions: 1.4.1
Reporter: Asim Jalis
When I use reduceByKeyAndWindow with func and invFunc (in PySpark), the size of the window keeps growing. I am appending the code that reproduces this issue. It prints the count() of the dstream, which goes up by 10 elements on every batch.
Is this a bug in the Python port of the Scala code, or is this expected behavior?
Here is the code that reproduces this issue.
{code}
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

print 'Initializing ssc'
ssc = StreamingContext(SparkContext(), batchDuration=1)
ssc.checkpoint('ckpt')

ds = ssc.textFileStream('input') \
    .map(lambda event: (event, 1)) \
    .reduceByKeyAndWindow(
        func=lambda count1, count2: count1 + count2,
        invFunc=lambda count1, count2: count1 - count2,
        windowDuration=10,
        slideDuration=2)

ds.pprint()
ds.count().pprint()

print 'Starting ssc'
ssc.start()

import itertools
import time
from distutils import dir_util

def batch_write(batch_data, batch_file_path):
    with open(batch_file_path, 'w') as batch_file:
        for element in batch_data:
            batch_file.write(str(element) + "\n")

def xrange_write(batch_size=5, batch_dir='input', batch_duration=1):
    '''Every batch_duration write a file with batch_size numbers,
    forever. Start at 0 and keep incrementing. Intended for testing
    Spark Streaming code.'''
    dir_util.mkpath(batch_dir)
    for i in itertools.count():
        batch_data = xrange(batch_size * i, batch_size * (i + 1))
        batch_write(batch_data, batch_dir + '/' + str(i))
        time.sleep(batch_duration)

print 'Feeding data to app'
xrange_write()
ssc.awaitTermination()
{code}
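For what it's worth, the growth is consistent with the inverse-reduce path retaining keys whose count has dropped to zero: invFunc subtracts the counts of batches leaving the window but does not evict the keys, so count() keeps rising even though the live window only holds the recent keys. The sketch below is plain Python (not Spark; window_step and the batch layout are mine, made up to mirror the reproduction above) and shows how a filter predicate, analogous to the filterFunc parameter that reduceByKeyAndWindow also accepts, drops the stale keys:

```python
def window_step(state, entering, leaving, filter_func=None):
    """One slide of an incremental reduce-by-key-and-window:
    add counts for keys entering the window, subtract counts
    for keys leaving it, then optionally filter the state."""
    for key in entering:
        state[key] = state.get(key, 0) + 1
    for key in leaving:
        state[key] = state.get(key, 0) - 1
    if filter_func is not None:
        state = {k: v for k, v in state.items() if filter_func(k, v)}
    return state

# Six batches of five distinct keys each, like the textFileStream feeder above.
batches = [list(range(i * 5, (i + 1) * 5)) for i in range(6)]
window = 3  # window length, in batches

state = {}
for i, batch in enumerate(batches):
    leaving = batches[i - window] if i >= window else []
    state = window_step(state, batch, leaving)
# Without a filter, keys subtracted down to zero stay in the state,
# so it holds all 30 keys ever seen, not the 15 in the live window.
print(len(state))  # 30

state2 = {}
for i, batch in enumerate(batches):
    leaving = batches[i - window] if i >= window else []
    state2 = window_step(state2, batch, leaving, lambda k, v: v > 0)
# With a filter that drops zero counts, only the live window's keys remain.
print(len(state2))  # 15
```

If this is the cause, passing filterFunc=lambda kv: kv[1] > 0 to reduceByKeyAndWindow in the reproduction might keep the state bounded, though the growth without it still looks like a leak worth confirming.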
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)