Posted to user@spark.apache.org by ji...@xtronica.no on 2021/05/28 00:49:53 UTC

mqtt module

Hi Amit,

Do you have any idea where to find the MQTT module? Is it supposed to be under pyspark.streaming? I could not find it in the latest version, 3.1.1. I need to connect Structured Streaming to an MQTT source.

I would appreciate any help with this.

Regards,

Jian Xu

From: Amit Joshi <ma...@gmail.com> 
Sent: Friday, May 21, 2021 9:38 PM
To: jianxu@xtronica.no
Cc: spark-user <us...@spark.apache.org>
Subject: Re: multiple query with structured streaming in spark does not work

Hi Jian,

I found this link that could be useful.

https://spark.apache.org/docs/latest/job-scheduling.html#scheduling-within-an-application

By the way, you can first try giving the application enough resources to run both jobs without defining the scheduler.

I mean run the queries with the default scheduler, but provide enough memory in the Spark cluster to run both.

Regards

Amit Joshi

On Sat, May 22, 2021 at 5:41 AM <jianxu@xtronica.no <ma...@xtronica.no> > wrote:

Hi Amit,

Thank you for your prompt reply and kind help. I wonder how to set the scheduler to FAIR mode in Python. The following code does not seem to work:

from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession

conf = SparkConf().setMaster("local").setAppName("HSMSTest1")
sc = SparkContext(conf=conf)
sc.setLocalProperty('spark.scheduler.mode', 'FAIR')
spark = SparkSession.builder.appName("HSMSStructedStreaming1").getOrCreate()

By the way, I am using nc -lk 9999 to feed the input stream. Could the problem be that the input stream can only be consumed by one query, as mentioned in the post below?

https://stackoverflow.com/questions/45618489/executing-separate-streaming-queries-in-spark-structured-streaming
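
One way to test this hypothesis without Spark is to replace nc with a small server that sends every input line to all connected clients. The sketch below assumes that each streaming query opens its own connection to the socket source and that the nc in use serves only one client at a time, which depends on the nc implementation. LineBroadcaster and serve_stdin are hypothetical names, not part of Spark or nc.

```python
import socket
import sys
import threading


class LineBroadcaster:
    """Tiny TCP server that sends every published line to all connected clients."""

    def __init__(self, host="localhost", port=9999):
        self._server = socket.create_server((host, port))
        self._clients = []
        self._lock = threading.Lock()
        # Accept clients in the background so publish() never blocks on accept().
        threading.Thread(target=self._accept_loop, daemon=True).start()

    @property
    def port(self):
        """Port the server is actually bound to (useful when port=0 was requested)."""
        return self._server.getsockname()[1]

    def _accept_loop(self):
        while True:
            try:
                conn, _ = self._server.accept()
            except OSError:
                return  # the server socket was closed
            with self._lock:
                self._clients.append(conn)

    def client_count(self):
        with self._lock:
            return len(self._clients)

    def publish(self, line):
        """Send one newline-terminated line to every connected client."""
        data = (line.rstrip("\n") + "\n").encode("utf-8")
        with self._lock:
            for conn in list(self._clients):
                try:
                    conn.sendall(data)
                except OSError:
                    # Drop clients that have disconnected.
                    self._clients.remove(conn)
                    conn.close()

    def close(self):
        self._server.close()
        with self._lock:
            for conn in self._clients:
                conn.close()
            self._clients.clear()


def serve_stdin(host="localhost", port=9999):
    """Broadcast each line typed on standard input, as a stand-in for `nc -lk 9999`."""
    server = LineBroadcaster(host, port)
    try:
        for line in sys.stdin:
            server.publish(line)
    finally:
        server.close()
```

Running serve_stdin() in place of nc -lk 9999 and starting both queries would show whether each one now receives the typed lines. If so, the single-consumer socket was likely the cause rather than the scheduler.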

I would appreciate your further help and support.

Best Regards,

Jian Xu

From: Amit Joshi <mailtojoshiamit@gmail.com <ma...@gmail.com> > 
Sent: Friday, May 21, 2021 12:52 PM
To: jianxu@xtronica.no <ma...@xtronica.no> 
Cc: user@spark.apache.org <ma...@spark.apache.org> 
Subject: Re: multiple query with structured streaming in spark does not work

Hi Jian,

You have to use the same Spark session to run all the queries, and use the following to wait for termination:

q1 = writestream.start()
q2 = writestream2.start()
spark.streams.awaitAnyTermination()

And also set the scheduler in the spark config to FAIR scheduler.
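
A minimal sketch of one way to do this in PySpark. The scheduler mode is an application-wide setting that is read when the SparkContext starts, so it goes into the configuration used to build the session; setLocalProperty is meant for per-thread properties such as spark.scheduler.pool. FAIR_SETTINGS and build_session are hypothetical names, and local[4] and the application name are assumed values.

```python
# Application-level Spark settings as plain key/value pairs.
FAIR_SETTINGS = {
    "spark.master": "local[4]",      # assumption: several local threads
    "spark.app.name": "HSMSTest1",   # assumption: any application name works
    "spark.scheduler.mode": "FAIR",  # the default mode is FIFO
}


def build_session(settings=None):
    """Build (or reuse) a SparkSession with the given settings applied."""
    # Imported lazily so the settings can be inspected without Spark installed.
    from pyspark.sql import SparkSession

    builder = SparkSession.builder
    for key, value in (settings or FAIR_SETTINGS).items():
        builder = builder.config(key, value)
    # Note: if a SparkContext is already running, getOrCreate() reuses it and
    # the scheduler mode of that existing context does not change.
    return builder.getOrCreate()
```

All queries would then be started from the session returned by build_session(). The job scheduling page linked in this thread also describes assigning work to a named pool with setLocalProperty("spark.scheduler.pool", ...); whether that is needed here is untested.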

Regards

Amit Joshi



On Saturday, May 22, 2021, <jianxu@xtronica.no <ma...@xtronica.no> > wrote:

Hi there,

I am new to Spark. We are using Spark to develop our app for streaming sensor readings.

I am having trouble getting two Structured Streaming queries to work concurrently.

The code follows. Only one of the queries works. I wonder if there is any way to get both working. I would appreciate help from the team.

Regards,

Jian Xu

from datetime import datetime

from pyspark.sql.functions import split, window
from pyspark.sql.types import ArrayType, FloatType

hostName = 'localhost'
portNumber = 9999
wSize = '10 seconds'
sSize = '2 seconds'

def wnq_fb_func(batch_df, batch_id):
    # Called once per micro-batch of the windowed query.
    print("batch is processed from time:{}".format(datetime.now()))
    print(batch_df.collect())
    batch_df.show(10, False, False)

# Assumes `spark` is an existing SparkSession.
lines = spark.readStream.format('socket').option('host', hostName).option('port', portNumber).option('includeTimestamp', True).load()

nSensors = 3

# Split each comma-separated line into an array of floats, one column per sensor.
scols = split(lines.value, ',').cast(ArrayType(FloatType()))
sensorCols = []
for i in range(nSensors):
    sensorCols.append(scols.getItem(i).alias('sensor' + str(i)))

nlines = lines.select(lines.timestamp, lines.value, *sensorCols)
nlines.printSchema()

wnlines = nlines.select(window(nlines.timestamp, wSize, sSize).alias('TimeWindow'), *lines.columns)
wnquery = wnlines.writeStream.trigger(processingTime=sSize)\
    .outputMode('append').foreachBatch(wnq_fb_func).start()

nquery = nlines.writeStream.outputMode('append').format('console').start()
nquery.awaitTermination()
wnquery.awaitTermination()