Posted to user@spark.apache.org by S <sh...@gmail.com> on 2021/08/25 08:44:48 UTC

Spark Structured Streaming Continuous Trigger on multiple sinks

Hello,

I have a structured streaming job that needs to be able to write to
multiple sinks. We are using the Continuous trigger and not the
Microbatch trigger.

1. When we use the foreach method (see the sketch after this list):
   dataset1.writeStream.foreach(kafka ForeachWriter logic).trigger(ContinuousMode).start().awaitTermination()
   dataset1.writeStream.foreach(mongo ForeachWriter logic).trigger(ContinuousMode).start().awaitTermination()
   The first statement blocks the second one for obvious reasons, so this
   does not serve our purpose.
2. The next step for this problem would be to use foreachBatch. That is
   not supported in continuous mode.
3. The next step was to use something like this for both sinks:
   dataset1.writeStream.format("kafka").trigger(ContinuousMode).start().awaitTermination()
   dataset1.writeStream.format("mongo").trigger(ContinuousMode).start().awaitTermination()
   This does not work either: only the first query runs, and the second
   never starts.
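
Concretely, attempts 1 and 3 both reduce to the pattern below (a rough
Scala sketch; kafkaWriter and mongoWriter stand in for our ForeachWriter
implementations, and the 1-second interval is only illustrative):

    import org.apache.spark.sql.streaming.Trigger

    dataset1.writeStream
      .foreach(kafkaWriter)                     // our Kafka ForeachWriter[Row]
      .trigger(Trigger.Continuous("1 second"))
      .start()
      .awaitTermination()                       // never returns while the query runs...

    dataset1.writeStream
      .foreach(mongoWriter)                     // our Mongo ForeachWriter[Row]
      .trigger(Trigger.Continuous("1 second"))
      .start()                                  // ...so this second query is never started
      .awaitTermination()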

Is there any way to write to multiple sinks in continuous trigger mode
using Structured Streaming?

-- 

Best Regards,

Sheel Pancholi

Re: Spark Structured Streaming Continuous Trigger on multiple sinks

Posted by Alex Ott <al...@gmail.com>.
Just don't call .awaitTermination(), because it blocks execution of the
next line of code. You can assign the result of .start() to a variable,
or put the query handles into a list/array.

And to wait until one of the streams finishes, use
spark.streams.awaitAnyTermination() or something like it
(https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#managing-streaming-queries)
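
Something like this (a rough sketch; the writer instances and the
1-second checkpoint interval are placeholders, not taken from your code):

    import org.apache.spark.sql.streaming.Trigger

    // Start both queries without blocking; keep the handles.
    val kafkaQuery = dataset1.writeStream
      .foreach(kafkaWriter)                     // your Kafka ForeachWriter
      .trigger(Trigger.Continuous("1 second"))  // placeholder interval
      .start()

    val mongoQuery = dataset1.writeStream
      .foreach(mongoWriter)                     // your Mongo ForeachWriter
      .trigger(Trigger.Continuous("1 second"))
      .start()

    // Blocks until any one of the active queries terminates.
    spark.streams.awaitAnyTermination()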

S  at "Wed, 25 Aug 2021 14:14:48 +0530" wrote:
 S> Hello,

 S> I have a structured streaming job that needs to be able to write to multiple sinks. We are using Continuous Trigger and not Microbatch Trigger. 

 S> 1. When we use the foreach method using:
 S> dataset1.writeStream.foreach(kafka ForEachWriter logic).trigger(ContinuousMode).start().awaitTermination() 
 S> dataset1.writeStream.foreach(mongo ForEachWriter logic).trigger(ContinuousMode).start().awaitTermination() 
 S> The first statement blocks the second one for obvious reasons. So this does not serve our purpose.
 S> 2. The next step for this problem would be to use the foreachbatch. That is not supported in the ContinuousMode.
 S> 3. The next step was to use something like this 
 S> dataset1.writeStream.format("kafka").trigger(ContinuousMode).start().awaitTermination() 
 S> dataset1.writeStream.format("mongo").trigger(ContinuousMode).start().awaitTermination()
 S> for both the sinks. This does not work either. Only the 1st query works. The second one does not.

 S> Is there any solution to the problem of being able to write to multiple sinks in Continuous Trigger Mode using Structured Streaming?



-- 
With best wishes,                    Alex Ott
http://alexott.net/
Twitter: alexott_en (English), alexott (Russian)
