You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@spark.apache.org by Mich Talebzadeh <mi...@gmail.com> on 2023/03/07 17:25:19 UTC

Re: [Spark Structured Streaming] Could we apply new options of readStream/writeStream without stopping spark application (zero downtime)?

hm interesting proposition. I guess you mean altering one of following
parameters in flight


          streamingDataFrame = self.spark \
                .readStream \
                .format("kafka") \
                .option("kafka.bootstrap.servers",
config['MDVariables']['bootstrapServers'],) \
                .option("schema.registry.url",
config['MDVariables']['schemaRegistryURL']) \
                .option("group.id", config['common']['appName']) \
                .option("zookeeper.connection.timeout.ms",
config['MDVariables']['zookeeperConnectionTimeoutMs']) \
                .option("rebalance.backoff.ms",
config['MDVariables']['rebalanceBackoffMS']) \
                .option("zookeeper.session.timeout.ms",
config['MDVariables']['zookeeperSessionTimeOutMs']) \
                .option("auto.commit.interval.ms",
config['MDVariables']['autoCommitIntervalMS']) \
                .option("subscribe", config['MDVariables']['topic']) \
                .option("failOnDataLoss", "false") \
                .option("includeHeaders", "true") \
                .option("startingOffsets", "latest") \
                .load() \
                .select(from_json(col("value").cast("string"),
schema).alias("parsed_value"))

Ok, one secure way of doing it though shutting down the streaming process
gracefully without loss of data that impacts consumers. The other method
implies inflight changes as suggested by the topic with zeio interruptions.
Interestingly one of our clients requested a similar solution. As solutions
architect /engineering manager I should come back with few options. I am on
the case so to speak. There is a considerable interest in Spark Structured
Streaming across the board, especially in trading systems.

HTH


   view my Linkedin profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>


 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Thu, 16 Feb 2023 at 04:12, hueiyuan su <hu...@gmail.com> wrote:

> *Component*: Spark Structured Streaming
> *Level*: Advanced
> *Scenario*: How-to
>
> -------------------------
> *Problems Description*
> I would like to confirm could we directly apply new options of
> readStream/writeStream without stopping current running spark structured
> streaming applications? For example, if we just want to adjust throughput
> properties of readStream with kafka. Do we have method can just adjust it
> without stopping application? If you have any ideas, please let me know. I
> will be appreciate it and your answer.
>
>
> --
> Best Regards,
>
> Mars Su
> *Phone*: 0988-661-013
> *Email*: hueiyuansu@gmail.com
>

Re: [Spark Structured Streaming] Could we apply new options of readStream/writeStream without stopping spark application (zero downtime)?

Posted by hueiyuan su <hu...@gmail.com>.
Dear Mich,

Sure, that is a good idea. If we have a pause() function, we can
temporarily stop streaming and adjust configuration, maybe from environment
variable.
Once these parameters are adjust, we can restart the streaming to apply the
newest parameter without stop spark streaming application.

Mich Talebzadeh <mi...@gmail.com> 於 2023年3月10日 週五 上午12:26寫道:

> most probably we will require an  additional method pause()
>
>
> https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.streaming.StreamingQuery.html
>
> to allow us to pause (as opposed to stop()) the streaming process and
> resume after changing the parameters. The state of streaming needs to be
> preserved.
>
> HTH
>
>
>
>    view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Tue, 7 Mar 2023 at 17:25, Mich Talebzadeh <mi...@gmail.com>
> wrote:
>
>> hm interesting proposition. I guess you mean altering one of following
>> parameters in flight
>>
>>
>>           streamingDataFrame = self.spark \
>>                 .readStream \
>>                 .format("kafka") \
>>                 .option("kafka.bootstrap.servers",
>> config['MDVariables']['bootstrapServers'],) \
>>                 .option("schema.registry.url",
>> config['MDVariables']['schemaRegistryURL']) \
>>                 .option("group.id", config['common']['appName']) \
>>                 .option("zookeeper.connection.timeout.ms",
>> config['MDVariables']['zookeeperConnectionTimeoutMs']) \
>>                 .option("rebalance.backoff.ms",
>> config['MDVariables']['rebalanceBackoffMS']) \
>>                 .option("zookeeper.session.timeout.ms",
>> config['MDVariables']['zookeeperSessionTimeOutMs']) \
>>                 .option("auto.commit.interval.ms",
>> config['MDVariables']['autoCommitIntervalMS']) \
>>                 .option("subscribe", config['MDVariables']['topic']) \
>>                 .option("failOnDataLoss", "false") \
>>                 .option("includeHeaders", "true") \
>>                 .option("startingOffsets", "latest") \
>>                 .load() \
>>                 .select(from_json(col("value").cast("string"),
>> schema).alias("parsed_value"))
>>
>> Ok, one secure way of doing it though shutting down the streaming process
>> gracefully without loss of data that impacts consumers. The other method
>> implies inflight changes as suggested by the topic with zeio interruptions.
>> Interestingly one of our clients requested a similar solution. As solutions
>> architect /engineering manager I should come back with few options. I am on
>> the case so to speak. There is a considerable interest in Spark Structured
>> Streaming across the board, especially in trading systems.
>>
>> HTH
>>
>>
>>    view my Linkedin profile
>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>
>>
>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>
>>
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>>
>> On Thu, 16 Feb 2023 at 04:12, hueiyuan su <hu...@gmail.com> wrote:
>>
>>> *Component*: Spark Structured Streaming
>>> *Level*: Advanced
>>> *Scenario*: How-to
>>>
>>> -------------------------
>>> *Problems Description*
>>> I would like to confirm could we directly apply new options of
>>> readStream/writeStream without stopping current running spark structured
>>> streaming applications? For example, if we just want to adjust throughput
>>> properties of readStream with kafka. Do we have method can just adjust it
>>> without stopping application? If you have any ideas, please let me know. I
>>> will be appreciate it and your answer.
>>>
>>>
>>> --
>>> Best Regards,
>>>
>>> Mars Su
>>> *Phone*: 0988-661-013
>>> *Email*: hueiyuansu@gmail.com
>>>
>>

-- 
Best Regards,

Mars Su
*Phone*: 0988-661-013
*Email*: hueiyuansu@gmail.com

Re: [Spark Structured Streaming] Could we apply new options of readStream/writeStream without stopping spark application (zero downtime)?

Posted by Mich Talebzadeh <mi...@gmail.com>.
most probably we will require an  additional method pause()

https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.streaming.StreamingQuery.html

to allow us to pause (as opposed to stop()) the streaming process and
resume after changing the parameters. The state of streaming needs to be
preserved.

HTH



   view my Linkedin profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>


 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Tue, 7 Mar 2023 at 17:25, Mich Talebzadeh <mi...@gmail.com>
wrote:

> hm interesting proposition. I guess you mean altering one of following
> parameters in flight
>
>
>           streamingDataFrame = self.spark \
>                 .readStream \
>                 .format("kafka") \
>                 .option("kafka.bootstrap.servers",
> config['MDVariables']['bootstrapServers'],) \
>                 .option("schema.registry.url",
> config['MDVariables']['schemaRegistryURL']) \
>                 .option("group.id", config['common']['appName']) \
>                 .option("zookeeper.connection.timeout.ms",
> config['MDVariables']['zookeeperConnectionTimeoutMs']) \
>                 .option("rebalance.backoff.ms",
> config['MDVariables']['rebalanceBackoffMS']) \
>                 .option("zookeeper.session.timeout.ms",
> config['MDVariables']['zookeeperSessionTimeOutMs']) \
>                 .option("auto.commit.interval.ms",
> config['MDVariables']['autoCommitIntervalMS']) \
>                 .option("subscribe", config['MDVariables']['topic']) \
>                 .option("failOnDataLoss", "false") \
>                 .option("includeHeaders", "true") \
>                 .option("startingOffsets", "latest") \
>                 .load() \
>                 .select(from_json(col("value").cast("string"),
> schema).alias("parsed_value"))
>
> Ok, one secure way of doing it though shutting down the streaming process
> gracefully without loss of data that impacts consumers. The other method
> implies inflight changes as suggested by the topic with zeio interruptions.
> Interestingly one of our clients requested a similar solution. As solutions
> architect /engineering manager I should come back with few options. I am on
> the case so to speak. There is a considerable interest in Spark Structured
> Streaming across the board, especially in trading systems.
>
> HTH
>
>
>    view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Thu, 16 Feb 2023 at 04:12, hueiyuan su <hu...@gmail.com> wrote:
>
>> *Component*: Spark Structured Streaming
>> *Level*: Advanced
>> *Scenario*: How-to
>>
>> -------------------------
>> *Problems Description*
>> I would like to confirm could we directly apply new options of
>> readStream/writeStream without stopping current running spark structured
>> streaming applications? For example, if we just want to adjust throughput
>> properties of readStream with kafka. Do we have method can just adjust it
>> without stopping application? If you have any ideas, please let me know. I
>> will be appreciate it and your answer.
>>
>>
>> --
>> Best Regards,
>>
>> Mars Su
>> *Phone*: 0988-661-013
>> *Email*: hueiyuansu@gmail.com
>>
>