Posted to user@spark.apache.org by ji...@xtronica.no on 2021/05/22 00:10:24 UTC

RE: multiple query with structured streaming in spark does not work

Hi Amit;

Thank you for your prompt reply and kind help. How do I set the scheduler to FAIR mode in Python? The following code does not seem to work:


conf = SparkConf().setMaster("local").setAppName("HSMSTest1")

sc = SparkContext(conf=conf)

sc.setLocalProperty('spark.scheduler.mode', 'FAIR')

spark = SparkSession.builder.appName("HSMSStructedStreaming1").getOrCreate()

By the way, I am using `nc -lk 9999` to feed the stream. Could the failure be because a socket input stream can only be consumed by one query, as mentioned in the post below?

https://stackoverflow.com/questions/45618489/executing-separate-streaming-queries-in-spark-structured-streaming
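The linked post's point can be sketched outside Spark with just the standard library: a one-shot stream (like a socket feed from `nc`) hands each line to only one of its competing consumers, so neither "query" is guaranteed to see the whole stream. All names below are illustrative, not from the thread:

```python
import queue
import threading

# A one-shot stream: once a line is taken, it is gone (like a socket feed).
stream = queue.Queue()
for line in ["1.0,2.0,3.0", "4.0,5.0,6.0", "7.0,8.0,9.0", "10.0,11.0,12.0"]:
    stream.put(line)

received = {"query1": [], "query2": []}

def consumer(name):
    # Each "query" pulls from the same non-replayable stream.
    while True:
        try:
            line = stream.get(timeout=0.2)
        except queue.Empty:
            return
        received[name].append(line)

t1 = threading.Thread(target=consumer, args=("query1",))
t2 = threading.Thread(target=consumer, args=("query2",))
t1.start(); t2.start()
t1.join(); t2.join()

# Every line went to exactly one consumer, never to both.
total = len(received["query1"]) + len(received["query2"])
print(total)  # 4
```

Replayable sources (files, Kafka) do not have this problem, because each query re-reads the data independently.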

Appreciate your further help/support.

Best Regards,

Jian Xu

From: Amit Joshi <ma...@gmail.com>
Sent: Friday, May 21, 2021 12:52 PM
To: jianxu@xtronica.no
Cc: user@spark.apache.org
Subject: Re: multiple query with structured streaming in spark does not work

Hi Jian,

You have to use the same Spark session to run all the queries, and use the following to wait for termination:

q1 = writeStream1.start()

q2 = writeStream2.start()

spark.streams.awaitAnyTermination()

And also set the scheduler in the Spark config to the FAIR scheduler.
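A minimal configuration sketch of that advice in PySpark, following the job-scheduling docs rather than anything specific to this thread (the pool name is hypothetical): `spark.scheduler.mode` is an application-level setting and must be in the SparkConf before the SparkContext is created, while `setLocalProperty` only sets per-thread properties such as `spark.scheduler.pool`.

```python
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession

# Application-level scheduler mode: must be set before the context exists.
conf = (SparkConf()
        .setMaster("local[*]")
        .setAppName("HSMSTest1")
        .set("spark.scheduler.mode", "FAIR"))
sc = SparkContext(conf=conf)
spark = SparkSession.builder.getOrCreate()

# Per-thread property: optionally assign this thread's jobs to a named fair pool.
sc.setLocalProperty("spark.scheduler.pool", "sensorPool")
```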

Regards
Amit Joshi




On Saturday, May 22, 2021, <jianxu@xtronica.no> wrote:

Hi There;

I am new to Spark. We are using Spark to develop our app for streaming sensor readings.

I am having trouble getting two structured streaming queries to run concurrently.

The code follows. Only one of the two queries works. Is there any way to get both running? Appreciate help from the team.

Regards,

Jian Xu

# Imports needed to run this snippet ('spark' is the SparkSession created earlier):
from datetime import datetime
from pyspark.sql.functions import split, window
from pyspark.sql.types import ArrayType, FloatType

hostName = 'localhost'
portNumber = 9999
wSize = '10 seconds'   # window size
sSize = '2 seconds'    # slide / trigger interval

def wnq_fb_func(batch_df, batch_id):
    print("batch is processed from time:{}".format(datetime.now()))
    print(batch_df.collect())
    batch_df.show(10, False, False)

lines = spark.readStream.format('socket').option('host', hostName) \
    .option('port', portNumber).option('includeTimestamp', True).load()

nSensors = 3

scols = split(lines.value, ',').cast(ArrayType(FloatType()))
sensorCols = []
for i in range(nSensors):
    sensorCols.append(scols.getItem(i).alias('sensor' + str(i)))

nlines = lines.select(lines.timestamp, lines.value, *sensorCols)
nlines.printSchema()

wnlines = nlines.select(window(nlines.timestamp, wSize, sSize).alias('TimeWindow'), *lines.columns)
wnquery = wnlines.writeStream.trigger(processingTime=sSize) \
    .outputMode('append').foreachBatch(wnq_fb_func).start()

nquery = nlines.writeStream.outputMode('append').format('console').start()

nquery.awaitTermination()
wnquery.awaitTermination()



Re: multiple query with structured streaming in spark does not work

Posted by Amit Joshi <ma...@gmail.com>.
Hi Jian,

I found this link that could be useful:
https://spark.apache.org/docs/latest/job-scheduling.html#scheduling-within-an-application

By the way, you could also try giving the jobs enough resources to run without defining the scheduler, i.e. run the queries with the default scheduler but provide enough memory in the Spark cluster to run both.


Regards
Amit Joshi




RE: multiple query with structured streaming in spark does not work

Posted by ji...@xtronica.no.
Hi Amit;

Further to my last email, I managed to set the scheduler to FAIR via:

conf = SparkConf().setMaster("local").setAppName("HSMSTest1").set("spark.scheduler.mode", "FAIR")

I can see in the web UI that the mode has changed, but the result is the same: it still does not work. It may be for the reason stated in the post. My question is how to use a socket to serve multiple queries from the same input stream, or whether that is simply not possible with the socket source at all.
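For a non-replayable source, the general pattern is to read it once and fan the data out to each consumer yourself; in Spark terms that means a single query (for example, one foreachBatch that feeds both processing paths) instead of two queries on the same socket. A rough standard-library sketch of the read-once/fan-out idea, with all names hypothetical:

```python
import queue

def fan_out(source_lines, sinks):
    # Read each line from the one-shot source exactly once,
    # then hand a copy to every downstream consumer queue.
    for line in source_lines:
        for sink in sinks:
            sink.put(line)

q_raw, q_windowed = queue.Queue(), queue.Queue()
fan_out(["1.0,2.0", "3.0,4.0"], [q_raw, q_windowed])

# Both consumers now see the complete stream.
print(q_raw.qsize(), q_windowed.qsize())  # 2 2
```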

Regards,

Jian Xu
