Posted to user@spark.apache.org by ji...@xtronica.no on 2021/05/21 19:08:57 UTC
multiple query with structured streaming in spark does not work
Hi there,

I am new to Spark. We are using Spark to develop an app for streaming sensor readings.

I am having trouble getting two Structured Streaming queries to run concurrently.

The code follows. Only one of the two queries works at a time. Is there any way to get both running? I would appreciate any help from the team.

Regards,

Jian Xu

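from datetime import datetime

from pyspark.sql import SparkSession
from pyspark.sql.functions import split, window
from pyspark.sql.types import ArrayType, FloatType

# Reuses the existing session when run inside the pyspark shell
spark = SparkSession.builder.appName('sensor-stream').getOrCreate()

hostName = 'localhost'
portNumber = 9999
wSize = '10 seconds'  # window length
sSize = '2 seconds'   # slide interval, also used as the trigger interval

def wnq_fb_func(batch_df, batch_id):
    # Called by foreachBatch once per micro-batch
    print("batch is processed from time:{}".format(datetime.now()))
    print(batch_df.collect())
    batch_df.show(10, False, False)

lines = spark.readStream.format('socket') \
    .option('host', hostName) \
    .option('port', portNumber) \
    .option('includeTimestamp', True) \
    .load()

nSensors = 3

# Split each comma-separated line and cast the parts to floats
scols = split(lines.value, ',').cast(ArrayType(FloatType()))
sensorCols = []
for i in range(nSensors):
    sensorCols.append(scols.getItem(i).alias('sensor' + str(i)))
nlines = lines.select(lines.timestamp, lines.value, *sensorCols)
nlines.printSchema()

# Sliding windows of wSize, advancing every sSize
wnlines = nlines.select(window(nlines.timestamp, wSize, sSize).alias('TimeWindow'), *lines.columns)
wnquery = wnlines.writeStream.trigger(processingTime=sSize) \
    .outputMode('append').foreachBatch(wnq_fb_func).start()

nquery = nlines.writeStream.outputMode('append').format('console').start()
nquery.awaitTermination()
wnquery.awaitTermination()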
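The split/cast/getItem step above turns a line such as 1.5,2.0,3.25 into the columns sensor0, sensor1 and sensor2. As a quick way to check input lines without a cluster, here is a minimal pure-Python sketch of what that select yields per line. parse_sensor_line is a hypothetical helper, and it assumes Spark's default non-ANSI mode, where a field that cannot be cast to float and an index past the end of the array both come back as null (None here). Python floats are 64-bit while FloatType is 32-bit, so values may differ in the last digits.

```python
def parse_sensor_line(value, n_sensors=3):
    """Mirror of split(value, ',').cast(array<float>).getItem(i) for i < n_sensors.

    Assumption: non-ANSI Spark semantics, i.e. bad casts and out-of-range
    indices become None instead of raising an error.
    """
    fields = value.split(",")
    row = {}
    for i in range(n_sensors):
        try:
            row["sensor" + str(i)] = float(fields[i])
        except (IndexError, ValueError):
            # Missing field (IndexError) or unparseable text (ValueError) -> null
            row["sensor" + str(i)] = None
    return row


print(parse_sensor_line("1.5,abc"))
# {'sensor0': 1.5, 'sensor1': None, 'sensor2': None}
```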
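With wSize = '10 seconds' and sSize = '2 seconds', window() assigns each reading to every sliding window that contains it, so every row reaches wnq_fb_func in 10 / 2 = 5 windows. A plain-Python sketch of that assignment follows; sliding_windows is a hypothetical helper, timestamps are epoch seconds, and it assumes Spark's default alignment of windows to the Unix epoch (startTime of 0) with half-open [start, end) intervals.

```python
def sliding_windows(ts, window_s=10, slide_s=2):
    """Return the [start, end) windows (epoch seconds) that contain ts."""
    # The latest window containing ts starts at the last slide boundary <= ts.
    start = ts - (ts % slide_s)
    result = []
    # Step back one slide at a time while the window still covers ts.
    while start > ts - window_s:
        result.append((start, start + window_s))
        start -= slide_s
    return sorted(result)


print(sliding_windows(13))
# [(4, 14), (6, 16), (8, 18), (10, 20), (12, 22)]
```

Because the end is exclusive, a reading at exactly t = 10 falls in [2, 12) through [10, 20) but not in [0, 10).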
Re: multiple query with structured streaming in spark does not work
Posted by Amit Joshi <ma...@gmail.com>.
Hi Jian,
You have to use the same Spark session to run all the queries.
Then use the following to wait for termination:
q1 = writeStream1.start()
q2 = writeStream2.start()
spark.streams.awaitAnyTermination()
Also set the scheduler mode in the Spark config to FAIR (spark.scheduler.mode=FAIR).
Regards
Amit Joshi
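Putting that advice together, here is a minimal PySpark sketch, not a definitive implementation: one shared session, FAIR scheduling, each query started under its own scheduler pool, and spark.streams.awaitAnyTermination() instead of blocking on a single query. It assumes the built-in rate test source (columns timestamp and value) in place of the socket source so that it runs without an external server; run_two_queries, the pool names and the 20-second timeout are hypothetical choices. spark.scheduler.mode is read when the SparkContext is created, so inside an already running pyspark shell it has to be passed at launch instead (for example with --conf spark.scheduler.mode=FAIR).

```python
def run_two_queries(run_seconds=20):
    """Run two streaming queries concurrently on one SparkSession.

    PySpark is imported inside the function so this module can be loaded
    without Spark installed.
    """
    from pyspark.sql import SparkSession
    from pyspark.sql.functions import window

    # One shared session; FAIR mode lets jobs from different pools share resources.
    spark = (SparkSession.builder
             .appName("two-queries-sketch")
             .config("spark.scheduler.mode", "FAIR")
             .getOrCreate())

    # Built-in test source producing (timestamp, value) rows.
    src = spark.readStream.format("rate").option("rowsPerSecond", 5).load()

    def show_batch(batch_df, batch_id):
        print("batch", batch_id)
        batch_df.show(10, truncate=False)

    windowed = src.select(
        window(src.timestamp, "10 seconds", "2 seconds").alias("TimeWindow"),
        *src.columns)

    # The pool set on this thread when a query starts is used for that query's jobs.
    sc = spark.sparkContext
    sc.setLocalProperty("spark.scheduler.pool", "pool1")
    q1 = (windowed.writeStream.trigger(processingTime="2 seconds")
          .outputMode("append").foreachBatch(show_batch).start())
    sc.setLocalProperty("spark.scheduler.pool", "pool2")
    q2 = src.writeStream.outputMode("append").format("console").start()

    # Returns when either query terminates, or after run_seconds (timeout in seconds).
    spark.streams.awaitAnyTermination(run_seconds)
    for q in (q1, q2):
        q.stop()
```

Call run_two_queries() from a script submitted with spark-submit; both the console output and the foreachBatch prints should appear interleaved while the queries run.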