Posted to user@spark.apache.org by amit kumar singh <am...@gmail.com> on 2018/05/05 16:20:34 UTC

Help needed with performance improvement of Spark Structured Streaming

Hi Community,

I have a use case where I need to call a stored procedure from Structured
Streaming.

I am able to send Kafka messages and call the stored procedure, but since
the foreach sink executes the stored procedure once per message, I want to
combine all the messages into a single DataFrame and then call the stored
procedure once.

Is it possible to do this?


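Current code (df is the streaming DataFrame read from Kafka; assumes
import spark.implicits._, org.apache.spark.sql.functions._ and
org.apache.spark.sql.streaming.Trigger):

df.select('value cast "string" as 'value, 'topic cast "string" as 'topic)
  .groupBy('topic)
  // concat_ws joins columns within one row; collect_list gathers values across rows
  .agg(concat_ws(",", collect_list('value)) as 'value1)
  .select('value1)
  .coalesce(1)
  .as[String]
  .writeStream
  // a streaming aggregation without a watermark cannot use the default append mode;
  // in update mode the per-topic state (and value1) keeps growing across triggers
  .outputMode("update")
  .trigger(Trigger.ProcessingTime("60 seconds"))
  .option("checkpointLocation", checkpointUrl)
  .foreach(new SimpleSqlServerSink(jdbcUrl, connectionProperties))
  .start()
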
Thanks,
Rohit
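
One way to get a single stored-procedure call per trigger, on Spark versions that only offer foreach, is to buffer rows inside a ForeachWriter and make the call in close(). This is a minimal sketch, not the poster's SimpleSqlServerSink (whose code is not shown): the class BatchingSqlServerSink, the payload helper, and the procedure dbo.usp_ProcessMessages with a single string parameter are all hypothetical. Spark calls open() once per partition per trigger, process() for every row, and close() at the end, so the whole partition is held in memory until close() runs.

```scala
import java.sql.DriverManager
import java.util.Properties
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.sql.ForeachWriter

// Hypothetical batching sink: for each partition in each trigger, Spark calls
// open() once, process() per row, then close(). Rows are buffered and the
// stored procedure is called a single time in close().
class BatchingSqlServerSink(jdbcUrl: String, connectionProperties: Properties)
    extends ForeachWriter[String] {

  private val buffer = ArrayBuffer.empty[String]

  override def open(partitionId: Long, version: Long): Boolean = {
    buffer.clear()
    true // true tells Spark to process this partition
  }

  override def process(value: String): Unit = buffer += value

  // All buffered messages joined with ",", like concat_ws(",", ...)
  def payload: String = buffer.mkString(",")

  override def close(errorOrNull: Throwable): Unit = {
    // Skip the database call if the task failed or received no rows
    if (errorOrNull == null && buffer.nonEmpty) {
      val conn = DriverManager.getConnection(jdbcUrl, connectionProperties)
      try {
        // dbo.usp_ProcessMessages and its single string parameter are assumptions
        val stmt = conn.prepareCall("{call dbo.usp_ProcessMessages(?)}")
        try {
          stmt.setString(1, payload)
          stmt.execute()
        } finally stmt.close()
      } finally conn.close()
    }
  }
}
```

If the stream is reduced to one partition with .coalesce(1) before .writeStream, close(), and therefore the stored procedure, runs once per 60-second trigger; without it, expect one call per partition.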

Re: Help needed with performance improvement of Spark Structured Streaming

Posted by amit kumar singh <am...@gmail.com>.
Hi team,

Any help with this?
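
In Spark 2.4 and later, DataStreamWriter.foreachBatch hands each trigger's rows to a function as an ordinary DataFrame, so the per-topic grouping can use batch operations and the procedure can be called from the driver. This is a sketch under stated assumptions: the Kafka bootstrap servers, topics, JDBC URL and checkpoint path are passed as arguments, combinePerTopic is an illustrative helper, and dbo.usp_ProcessMessages(topic, messages) is a hypothetical procedure.

```scala
import java.sql.DriverManager
import java.util.Properties
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, collect_list, concat_ws}
import org.apache.spark.sql.streaming.Trigger

object StoredProcPerBatch {
  // One entry per topic: (topic, all values joined with ",").
  // collect_list gathers values across rows; concat_ws joins the resulting array.
  def combinePerTopic(batch: DataFrame): Array[(String, String)] =
    batch.groupBy(col("topic"))
      .agg(concat_ws(",", collect_list(col("value"))).as("value1"))
      .collect()
      .map(r => (r.getString(0), r.getString(1)))

  def main(args: Array[String]): Unit = {
    // Hypothetical argument layout
    val Array(bootstrapServers, topics, jdbcUrl, checkpointUrl) = args
    val connectionProperties = new Properties()
    val spark = SparkSession.builder.appName("kafka-to-sqlserver").getOrCreate()

    val messages = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", bootstrapServers)
      .option("subscribe", topics)
      .load()
      .select(col("topic"), col("value").cast("string").as("value"))

    // Runs on the driver once per trigger, with only that trigger's rows
    val writeBatch: (DataFrame, Long) => Unit = (batch, batchId) => {
      val rows = combinePerTopic(batch)
      if (rows.nonEmpty) {
        val conn = DriverManager.getConnection(jdbcUrl, connectionProperties)
        try {
          // Hypothetical procedure taking (topic, joined messages)
          val stmt = conn.prepareCall("{call dbo.usp_ProcessMessages(?, ?)}")
          try rows.foreach { case (topic, value1) =>
            stmt.setString(1, topic)
            stmt.setString(2, value1)
            stmt.execute()
          } finally stmt.close()
        } finally conn.close()
      }
    }

    messages.writeStream
      .trigger(Trigger.ProcessingTime("60 seconds"))
      .option("checkpointLocation", checkpointUrl)
      .foreachBatch(writeBatch)
      .start()
      .awaitTermination()
  }
}
```

Because each call only sees the current trigger's rows, value1 does not accumulate across triggers the way a streaming groupBy does. foreachBatch gives at-least-once delivery, so if the procedure is not idempotent, the batchId can be used to skip a batch that was already written.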


