Posted to user@spark.apache.org by karan alang <ka...@gmail.com> on 2022/02/25 22:30:25 UTC

StructuredStreaming error - pyspark.sql.utils.StreamingQueryException: batch 44 doesn't exist

Hello All,
I'm running a StructuredStreaming program on GCP Dataproc, which reads data
from Kafka, does some processing and puts processed data back into Kafka.
The program was running fine, when I killed it (to make minor changes), and
then re-started it.

It is giving me the error - pyspark.sql.utils.StreamingQueryException:
batch 44 doesn't exist

Here is the error:

22/02/25 22:14:08 ERROR
org.apache.spark.sql.execution.streaming.MicroBatchExecution: Query
[id = 0b73937f-1140-40fe-b201-cf4b7443b599, runId =
43c9242d-993a-4b9a-be2b-04d8c9e05b06] terminated with error
java.lang.IllegalStateException: batch 44 doesn't exist
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$populateStartOffsets$1(MicroBatchExecution.scala:286)
    at scala.Option.getOrElse(Option.scala:189)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.populateStartOffsets(MicroBatchExecution.scala:286)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:197)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:357)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:355)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:68)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:194)
    at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:57)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:188)
    at org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:334)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:317)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:244)
Traceback (most recent call last):
  File "/tmp/0149aedd804c42f288718e013fb16f9c/StructuredStreaming_GCP_Versa_Sase_gcloud.py",
line 609, in <module>
    query.awaitTermination()
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/streaming.py",
line 101, in awaitTermination
  File "/usr/lib/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py",
line 1304, in __call__
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py",
line 117, in deco
pyspark.sql.utils.StreamingQueryException: batch 44 doesn't exist


Question: what is the cause of this error, and how can I debug or fix it?
Also, I notice that the checkpoint location occasionally gets corrupted when
I do multiple restarts. After checkpoint corruption, the query does not
return any records.

For the above issue (as well as when the checkpoint was corrupted), when I
cleared the checkpoint location and restarted the program, it went through
fine.

Please note: in readStream, I've set failOnDataLoss=false

Additional details are in stackoverflow :

https://stackoverflow.com/questions/71272328/structuredstreaming-error-pyspark-sql-utils-streamingqueryexception-batch-44

any input on this ?

tia!

Re: StructuredStreaming error - pyspark.sql.utils.StreamingQueryException: batch 44 doesn't exist

Posted by Gourav Sengupta <go...@gmail.com>.
Hi,

Maybe the purpose of the article is different, but:
instead of:  sources (trail files) --> kafka --> flume --> write to cloud
storage -->> SSS
a much simpler solution is:  sources (trail files) -->  write to cloud
storage -->> SSS

Putting in additional components and hops sounds a bit difficult for me to
understand.

Regards,
Gourav

On Sat, Feb 26, 2022 at 5:12 PM Mich Talebzadeh <mi...@gmail.com>
wrote:

> Besides, is the structure of your checkpoint as in this article of mine?
>
> Processing Change Data Capture with Spark Structured Streaming
> <https://www.linkedin.com/pulse/processing-change-data-capture-spark-structured-talebzadeh-ph-d-/?trackingId=kPyvLBnnSjWPUR8vIq%2FAQw%3D%3D>
>
> Section on "The concept of checkpointing and its value with trigger once"
>
>
> see also
> https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#recovering-from-failures-with-checkpointing
>
>
> You can try clearing the contents of the checkpoint directory, running
> your Spark job for a while, killing it (e.g. with Ctrl-C), and then
> checking the entries under the sources sub-directory
>
>
> HTH
>
>
>    view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Sat, 26 Feb 2022 at 10:44, Mich Talebzadeh <mi...@gmail.com>
> wrote:
>
>> Check the thread I forwarded on how to gracefully shutdown spark
>> structured streaming
>>
>> HTH
>>

Re: StructuredStreaming error - pyspark.sql.utils.StreamingQueryException: batch 44 doesn't exist

Posted by Mich Talebzadeh <mi...@gmail.com>.
Besides, is the structure of your checkpoint as in this article of mine?

Processing Change Data Capture with Spark Structured Streaming
<https://www.linkedin.com/pulse/processing-change-data-capture-spark-structured-talebzadeh-ph-d-/?trackingId=kPyvLBnnSjWPUR8vIq%2FAQw%3D%3D>

Section on "The concept of checkpointing and its value with trigger once"


see also
https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#recovering-from-failures-with-checkpointing


You can try clearing the contents of the checkpoint directory, running your
Spark job for a while, killing it (e.g. with Ctrl-C), and then checking the
entries under the sources sub-directory
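
A check along those lines can be sketched as follows. This is only a sketch under assumptions: the checkpoint has been copied to a local directory (for a gs:// location the objects would have to be listed with a storage client instead), and the offsets and commits sub-directories hold one file per batch, named by its batch id. It looks at offsets and commits rather than sources because, as far as I can tell, the "batch N doesn't exist" message is raised when the offsets log has no entry for the batch just before the latest one. The names batch_ids and inspect_checkpoint are made up for illustration.

```python
import os


def batch_ids(log_dir):
    """Return the sorted batch ids found as purely numeric file names in log_dir."""
    if not os.path.isdir(log_dir):
        return []
    # Hidden .crc files and temporary files are skipped by the isdigit() filter.
    return sorted(int(name) for name in os.listdir(log_dir) if name.isdigit())


def inspect_checkpoint(checkpoint_dir):
    """Summarize the offsets/ and commits/ logs of a local checkpoint copy."""
    offsets = batch_ids(os.path.join(checkpoint_dir, "offsets"))
    commits = batch_ids(os.path.join(checkpoint_dir, "commits"))
    missing = []
    if offsets:
        present = set(offsets)
        # Old entries may have been purged, so start from the oldest one present.
        missing = [b for b in range(offsets[0], offsets[-1] + 1) if b not in present]
    return {
        "latest_offset": offsets[-1] if offsets else None,
        "latest_commit": commits[-1] if commits else None,
        "missing_offsets": missing,
    }
```

A non-empty missing_offsets list, or a latest commit that is far behind the latest offset, would be the first thing to look at after an abrupt kill.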


HTH


On Sat, 26 Feb 2022 at 10:44, Mich Talebzadeh <mi...@gmail.com>
wrote:

> Check the thread I forwarded on how to gracefully shutdown spark
> structured streaming
>
> HTH
>

Re: StructuredStreaming error - pyspark.sql.utils.StreamingQueryException: batch 44 doesn't exist

Posted by Mich Talebzadeh <mi...@gmail.com>.
I checked this process of gracefully terminating the streaming query for a
topic when a flag is set to terminate it. In this case the topic is called
md (market data). Two batches are shown below, after which the termination
flag is set:

Topic market data => md, batchId is 236, at 2022-03-01 20:52:00.099259
+------------------------------------+------+-------------------+------+
|rowkey                              |ticker|timeissued         |price |
+------------------------------------+------+-------------------+------+
|edb5e84d-630d-43e4-9e83-9a8e3cd5adef|IBM   |2022-03-01 20:52:01|191.95|
|32351db1-b241-4e87-9d0b-48b81c0e2b85|MSFT  |2022-03-01 20:52:01|27.1  |
|43df704d-7b1d-4adb-8658-643d5a1fb877|BP    |2022-03-01 20:52:02|584.75|
|03e1cdbb-b65f-4306-8e0f-2c383e587e5b|SAP   |2022-03-01 20:52:02|34.46 |
|423b39f1-81f6-456d-b651-2b1bd928036e|MRW   |2022-03-01 20:52:01|235.9 |
|5bb795c8-bc55-4c7c-8ea4-5aac9e6225ef|SBRY  |2022-03-01 20:52:02|396.65|
|99b77350-f0f1-47b3-bb23-2679e6b3be9f|TSCO  |2022-03-01 20:52:02|207.04|
|e7be9fc2-4030-48aa-90db-3ffa68d70636|VOD   |2022-03-01 20:52:02|204.32|
|7e577a26-2ffe-4313-b8f1-17212a78bace|MKS   |2022-03-01 20:52:02|609.5 |
|f3548462-99ee-4551-a284-b6fcc0e1c43b|ORCL  |2022-03-01 20:52:01|32.14 |
+------------------------------------+------+-------------------+------+

Topic market data => md, batchId is 237, at 2022-03-01 20:52:30.080247
+------------------------------------+------+-------------------+------+
|rowkey                              |ticker|timeissued         |price |
+------------------------------------+------+-------------------+------+
|9289d0da-6b6c-48a3-9de8-ade18ecb62b1|VOD   |2022-03-01 20:52:34|238.85|
|cb990ffd-cbeb-4805-a451-bacde22a9a4a|ORCL  |2022-03-01 20:52:34|41.04 |
|7aa818f8-7b48-4941-a663-a881529605f6|BP    |2022-03-01 20:52:34|456.7 |
|03783b5e-f98e-453f-b8b1-9c88d9053bf8|IBM   |2022-03-01 20:52:33|181.07|
|85581bd3-04e8-4598-9ac2-10bb9af4ca4e|MSFT  |2022-03-01 20:52:34|28.14 |
|7113ce60-9721-499a-9d57-c8eea7619aae|TSCO  |2022-03-01 20:52:34|362.3 |
|e424782b-7767-4fa6-b090-d31d01010783|MKS   |2022-03-01 20:52:34|594.5 |
|c158aaa3-4d46-4f35-9927-3c3fe52e8025|MRW   |2022-03-01 20:52:34|273.47|
|d53933ca-07dc-4069-b303-e3ed95590484|SBRY  |2022-03-01 20:52:34|278.05|
|25fde8d8-7740-47b1-a3a3-d2e77aacbb1f|SAP   |2022-03-01 20:52:34|33.25 |
+------------------------------------+------+-------------------+------+

Request terminating streaming process for topic md at 2022-03-01
20:52:39.654067

When this streaming job is terminated in an orderly way, there is no
corruption. The logic is shown below:


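from datetime import datetime

from pyspark.sql.functions import col

# `config` and the helper module `s` are defined elsewhere in the application.
def sendToControl(dfnewtopic, batchId):
    # Called with a micro-batch DataFrame from the control topic and its batch id.
    if len(dfnewtopic.take(1)) > 0:
        queue = dfnewtopic.select(col("queue")).collect()[0][0]
        status = dfnewtopic.select(col("status")).collect()[0][0]
        # A status of 'false' for the md topic is the request to terminate.
        if queue == config['MDVariables']['topic'] and status == 'false':
            spark_session = s.spark_session(config['common']['appName'])
            # Find the active streaming query named after the topic and stop it.
            for e in spark_session.streams.active:
                if e.name == config['MDVariables']['topic']:
                    print(f"Request terminating streaming process for topic {e.name} at {datetime.now()}")
                    e.stop()
    else:
        print("DataFrame newtopic is empty")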
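
For a job that has no control topic, a similar orderly stop can be driven from the main thread instead of calling awaitTermination() with no timeout. The following is a rough sketch, not a tested recipe for Dataproc: run_until_flagged and should_stop are hypothetical names, and should_stop can be any zero-argument callable (for example one that checks whether a marker file exists). It relies only on awaitTermination(timeout) returning whether the query has terminated, and on stop(). Whether it avoids the checkpoint problem in a given setup would still need to be verified.

```python
def run_until_flagged(query, should_stop, poll_seconds=10):
    """Block until the query ends on its own or should_stop() returns True.

    `query` is expected to behave like a PySpark StreamingQuery:
    awaitTermination(timeout) returns True once the query has terminated,
    and stop() shuts it down.
    """
    while True:
        # Wait up to poll_seconds for the query to finish by itself.
        if query.awaitTermination(poll_seconds):
            return "terminated"
        # Otherwise check the stop request and shut down through the API.
        if should_stop():
            query.stop()
            return "stopped"
```

In the driver script this would replace the bare query.awaitTermination() call, e.g. run_until_flagged(query, lambda: os.path.exists("/tmp/stop_md")), where the marker path is purely illustrative.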


HTH


On Sat, 26 Feb 2022 at 10:44, Mich Talebzadeh <mi...@gmail.com>
wrote:

> Check the thread I forwarded on how to gracefully shutdown spark
> structured streaming
>
> HTH
>

Re: StructuredStreaming error - pyspark.sql.utils.StreamingQueryException: batch 44 doesn't exist

Posted by karan alang <ka...@gmail.com>.
Hi Mich,
Thanks. I'll check the thread you forwarded and get back to you.

regds,
Karan Alang

On Sat, Feb 26, 2022 at 2:44 AM Mich Talebzadeh <mi...@gmail.com>
wrote:

> Check the thread I forwarded on how to gracefully shutdown spark
> structured streaming
>
> HTH
>

Re: StructuredStreaming error - pyspark.sql.utils.StreamingQueryException: batch 44 doesn't exist

Posted by Gourav Sengupta <go...@gmail.com>.
Hi Karan,

If you are running with at-least-once semantics, you can restart the failed
job with a new checkpoint location; you will end up with duplicates in your
target, but the job will run fine.

Since you are using stateful operations, if your keys are too large to
manage in state, try RocksDB. It was introduced by Tathagata Das a few years
ago in the Databricks version and has now been made available in the open
source version; it works really well.
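
As a sketch of what enabling it could look like: the configuration key and provider class below are the ones documented for open source Spark 3.2 and later, so this would not apply to a 3.1.2 cluster. The helper apply_conf and the dictionary name are made up for illustration; the helper only assumes a builder object with a config(key, value) method, as SparkSession.builder has.

```python
# State store provider class documented for open source Spark 3.2 and later.
ROCKSDB_PROVIDER = (
    "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider"
)

STATE_STORE_CONF = {
    "spark.sql.streaming.stateStore.providerClass": ROCKSDB_PROVIDER,
}


def apply_conf(builder, conf):
    """Apply each key/value to a SparkSession.builder-like object and return it."""
    for key, value in conf.items():
        builder = builder.config(key, value)
    return builder
```

With PySpark installed, usage would be along the lines of apply_conf(SparkSession.builder.appName("my-app"), STATE_STORE_CONF).getOrCreate(), where the application name is a placeholder.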

Let me know how things go, and what was your final solution.


Regards,
Gourav Sengupta

On Mon, Feb 28, 2022 at 6:02 AM karan alang <ka...@gmail.com> wrote:

> Hi Gourav,
>
> Pls see my responses below :
>
> Can you please let us know:
> 1. the SPARK version, and the kind of streaming query that you are
> running?
>
> KA : Apache Spark 3.1.2 - on Dataproc using Ubuntu 18.04 (the highest
> Spark version supported on Dataproc is 3.1.2),
>
> 2. whether you are using at-least-once, at-most-once, or exactly-once semantics?
>
> KA : default value - at-least once delivery semantics
> (per my understanding, i don't believe delivery semantics is related to
> the issue, though)
>
> 3. any additional details that you can provide, regarding the storage
> duration in Kafka, etc?
>
> KA : storage duration - 1 day ..
> However, as I mentioned in the stackoverflow ticket, on readStream ->
> "failOnDataLoss" = "false", so the log retention should not cause this
> issue.
>
> 4. are you running stateful or stateless operations? If you are using
> stateful operations and SPARK 3.2 try to use RocksDB which is now natively
> integrated with SPARK :)
>
> KA : Stateful - since i'm using windowing+watermark in the aggregation
> queries.
>
> Also, thnx - will check the links you provided.
>
> regds,
> Karan Alang
>
> On Sat, Feb 26, 2022 at 3:31 AM Gourav Sengupta <go...@gmail.com>
> wrote:
>
>> Hi,
>>
>> Can you please let us know:
>> 1. the SPARK version, and the kind of streaming query that you are
>> running?
>> 2. whether you are using at-least-once, at-most-once, or exactly-once
>> semantics?
>> 3. any additional details that you can provide, regarding the storage
>> duration in Kafka, etc?
>> 4. are you running stateful or stateless operations? If you are using
>> stateful operations and SPARK 3.2 try to use RocksDB which is now natively
>> integrated with SPARK :)
>>
>> Besides the mail sent by Mich, the following are useful:
>> 1.
>> https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#managing-streaming-queries
>> (see the stop operation, and awaitTermination... operation)
>> 2. Try to always ensure that you are doing exception handling based on
>> the option mentioned in the above link, long running streaming programmes
>> in distributed systems do have issues, and handling exceptions is important
>> 3. There is another thing which I do, and it is around reading the
>> streaming metrics and pushing them for logging, that helps me to know in
>> long running system whether there are any performance issues or not (
>> https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#reading-metrics-interactively
>> or
>> https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#reporting-metrics-programmatically-using-asynchronous-apis)
>> . The following is an interesting reading on the kind of metrics to look
>> out for and the way to interpret them (
>> https://docs.databricks.com/spark/latest/rdd-streaming/debugging-streaming-applications.html
>> )
>>
>>
>> Regards,
>> Gourav
>>
>>
>> On Sat, Feb 26, 2022 at 10:45 AM Mich Talebzadeh <
>> mich.talebzadeh@gmail.com> wrote:
>>
>>> Check the thread I forwarded on how to gracefully shutdown spark
>>> structured streaming
>>>
>>> HTH
>>>
>>> On Fri, 25 Feb 2022 at 22:31, karan alang <ka...@gmail.com> wrote:
>>>
>>>> Hello All,
>>>> I'm running a StructuredStreaming program on GCP Dataproc, which reads
>>>> data from Kafka, does some processing and puts processed data back into
>>>> Kafka. The program was running fine, when I killed it (to make minor
>>>> changes), and then re-started it.
>>>>
>>>> It is giving me the error -
>>>> pyspark.sql.utils.StreamingQueryExceptionace: batch 44 doesn't exist
>>>>
>>>> Here is the error:
>>>>
>>>> 22/02/25 22:14:08 ERROR org.apache.spark.sql.execution.streaming.MicroBatchExecution: Query [id = 0b73937f-1140-40fe-b201-cf4b7443b599, runId = 43c9242d-993a-4b9a-be2b-04d8c9e05b06] terminated with error
>>>> java.lang.IllegalStateException: batch 44 doesn't exist
>>>>     at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$populateStartOffsets$1(MicroBatchExecution.scala:286)
>>>>     at scala.Option.getOrElse(Option.scala:189)
>>>>     at org.apache.spark.sql.execution.streaming.MicroBatchExecution.populateStartOffsets(MicroBatchExecution.scala:286)
>>>>     at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:197)
>>>>     at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
>>>>     at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:357)
>>>>     at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:355)
>>>>     at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:68)
>>>>     at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:194)
>>>>     at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:57)
>>>>     at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:188)
>>>>     at org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:334)
>>>>     at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
>>>>     at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
>>>>     at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:317)
>>>>     at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:244)
>>>> Traceback (most recent call last):
>>>>   File "/tmp/0149aedd804c42f288718e013fb16f9c/StructuredStreaming_GCP_Versa_Sase_gcloud.py", line 609, in <module>
>>>>     query.awaitTermination()
>>>>   File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/streaming.py", line 101, in awaitTermination
>>>>   File "/usr/lib/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1304, in __call__
>>>>   File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 117, in deco
>>>> pyspark.sql.utils.StreamingQueryException: batch 44 doesn't exist
>>>>
>>>>
>>>> Question - what is the cause of this error and how to debug/fix ? Also,
>>>> I notice that the checkpoint location gets corrupted occasionally, when I
>>>> do multiple restarts. After checkpoint corruption, it does not return any
>>>> records
>>>>
>>>> For the above issue(as well as when the checkpoint was corrupted), when
>>>> i cleared the checkpoint location and re-started the program, it went t
>>>> rhough fine.
>>>>
>>>> Pls note: while doing readStream, i've enabled failOnDataLoss=false
>>>>
>>>> Additional details are in stackoverflow :
>>>>
>>>>
>>>> https://stackoverflow.com/questions/71272328/structuredstreaming-error-pyspark-sql-utils-streamingqueryexception-batch-44
>>>>
>>>> any input on this ?
>>>>
>>>> tia!
>>>>
>>>>
>>>> --
>>>
>>>
>>>
>>>    view my Linkedin profile
>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>
>>>
>>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>>
>>>
>>>
>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>> any loss, damage or destruction of data or any other property which may
>>> arise from relying on this email's technical content is explicitly
>>> disclaimed. The author will in no case be liable for any monetary damages
>>> arising from such loss, damage or destruction.
>>>
>>>
>>>
>>

Re: StructuredStreaming error - pyspark.sql.utils.StreamingQueryException: batch 44 doesn't exist

Posted by karan alang <ka...@gmail.com>.
Hi Gourav,

Please see my responses below:

Can you please let us know:
1. the SPARK version, and the kind of streaming query that you are running?

KA: Apache Spark 3.1.2 on Dataproc, using Ubuntu 18.04 (the highest Spark
version supported on Dataproc is 3.1.2).

2. whether you are using at-least-once, at-most-once, or exactly-once
semantics?

KA: The default, at-least-once delivery semantics
(as I understand it, delivery semantics are not related to this issue,
though).

3. any additional details that you can provide, regarding the storage
duration in Kafka, etc?

KA: The retention period is 1 day.
However, as I mentioned in the Stack Overflow question, readStream is
configured with "failOnDataLoss" = "false", so log retention should not
cause this issue.
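
The stack trace in the original post fails in populateStartOffsets, which suggests that the missing batch is being looked up in the query's checkpoint rather than in Kafka. One way to check this is to look for gaps in the checkpoint's batch logs. The following is only a minimal sketch: it assumes the usual checkpoint layout (an offsets/ and a commits/ subdirectory, each holding one file per batch id) and assumes the checkpoint has been copied to a locally readable directory. The function names are made up for illustration.

```python
import os


def batch_ids(log_dir):
    """Return the sorted batch ids that have a file in a checkpoint log directory."""
    if not os.path.isdir(log_dir):
        return []
    # Each batch is stored in a file named after its integer id; anything
    # else (hidden temp files, ".crc" sidecar files) is ignored.
    return sorted(int(name) for name in os.listdir(log_dir) if name.isdigit())


def inspect_checkpoint(checkpoint_dir):
    """Summarise the offsets/ and commits/ logs of a locally readable checkpoint."""
    offsets = batch_ids(os.path.join(checkpoint_dir, "offsets"))
    commits = batch_ids(os.path.join(checkpoint_dir, "commits"))
    missing = []
    if offsets:
        present = set(offsets)
        # Older batches may have been purged, so only look for holes between
        # the oldest and the newest retained offset file.
        missing = [b for b in range(offsets[0], offsets[-1] + 1) if b not in present]
    return {
        "latest_offset_batch": offsets[-1] if offsets else None,
        "latest_commit_batch": commits[-1] if commits else None,
        "missing_offset_batches": missing,
    }
```

If the report lists a missing offset batch (for example 44 while 45 is present), the checkpoint itself is incomplete, and the failOnDataLoss setting on the Kafka source would not be expected to help.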

4. are you running stateful or stateless operations? If you are using
stateful operations on Spark 3.2, try RocksDB, which is now natively
integrated with Spark :)

KA: Stateful, since I'm using windowing and watermarks in the aggregation
queries.

Also, thanks, I will check the links you provided.

Regards,
Karan Alang

On Sat, Feb 26, 2022 at 3:31 AM Gourav Sengupta <go...@gmail.com>
wrote:

> Hi,
>
> Can you please let us know:
> 1. the SPARK version, and the kind of streaming query that you are
> running?
> 2. whether you are using at-least-once, at-most-once, or exactly-once
> semantics?
> 3. any additional details that you can provide, regarding the storage
> duration in Kafka, etc?
> 4. are you running stateful or stateless operations? If you are using
> stateful operations on Spark 3.2, try RocksDB, which is now natively
> integrated with Spark :)
>
> Besides the mail sent by Mich, the following are useful:
> 1.
> https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#managing-streaming-queries
> (see the stop operation, and awaitTermination... operation)
> 2. Always make sure that you handle exceptions using the options described
> in the link above. Long-running streaming programmes in distributed
> systems do run into issues, so handling exceptions is important.
> 3. Another thing I do is read the streaming metrics and push them to the
> logs, which helps me see whether a long-running system has any
> performance issues (
> https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#reading-metrics-interactively
> or
> https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#reporting-metrics-programmatically-using-asynchronous-apis)
> . The following is an interesting read on the kinds of metrics to look
> out for and how to interpret them (
> https://docs.databricks.com/spark/latest/rdd-streaming/debugging-streaming-applications.html
> )
>
>
> Regards,
> Gourav
>
>
> On Sat, Feb 26, 2022 at 10:45 AM Mich Talebzadeh <
> mich.talebzadeh@gmail.com> wrote:
>
>> Check the thread I forwarded on how to gracefully shut down Spark
>> Structured Streaming
>>
>> HTH

Re: StructuredStreaming error - pyspark.sql.utils.StreamingQueryException: batch 44 doesn't exist

Posted by Gourav Sengupta <go...@gmail.com>.
Hi,

Can you please let us know:
1. the SPARK version, and the kind of streaming query that you are running?
2. whether you are using at-least-once, at-most-once, or exactly-once
semantics?
3. any additional details that you can provide, regarding the storage
duration in Kafka, etc?
4. are you running stateful or stateless operations? If you are using
stateful operations on Spark 3.2, try RocksDB, which is now natively
integrated with Spark :)
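
For point 4, the RocksDB state store is selected through a single Spark configuration entry. The following is a small sketch, not a definitive recipe: the helper name state_store_conf is made up for illustration, and the version check simply encodes the "Spark 3.2" condition mentioned above.

```python
# State store provider class shipped with Spark 3.2 and later.
ROCKSDB_PROVIDER = (
    "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider"
)


def state_store_conf(spark_version):
    """Return the extra Spark conf needed to use RocksDB, or {} if unsupported."""
    major, minor = (int(part) for part in spark_version.split(".")[:2])
    if (major, minor) < (3, 2):
        # Older releases (for example 3.1.2) keep the default state store.
        return {}
    return {"spark.sql.streaming.stateStore.providerClass": ROCKSDB_PROVIDER}
```

Each returned entry can be passed to SparkSession.builder.config(key, value) before the session is created. On Spark 3.1.2 the helper returns an empty dict, so nothing changes.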

Besides the mail sent by Mich, the following are useful:
1.
https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#managing-streaming-queries
(see the stop operation, and awaitTermination... operation)
2. Always make sure that you handle exceptions using the options described
in the link above. Long-running streaming programmes in distributed
systems do run into issues, so handling exceptions is important.
3. Another thing I do is read the streaming metrics and push them to the
logs, which helps me see whether a long-running system has any
performance issues (
https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#reading-metrics-interactively
or
https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#reporting-metrics-programmatically-using-asynchronous-apis)
. The following is an interesting read on the kinds of metrics to look
out for and how to interpret them (
https://docs.databricks.com/spark/latest/rdd-streaming/debugging-streaming-applications.html
)
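
Points 2 and 3 can be combined into one small monitoring loop. This is a sketch under assumptions, not the exact approach used in any production job: monitor_query and the logger name are made up for illustration, and the only things it relies on are the awaitTermination(timeout), lastProgress, isActive and stop() members of a PySpark StreamingQuery.

```python
import json
import logging

logger = logging.getLogger("stream_monitor")

# Fields of the progress report that are worth logging on every poll.
PROGRESS_KEYS = (
    "batchId",
    "numInputRows",
    "inputRowsPerSecond",
    "processedRowsPerSecond",
    "durationMs",
)


def monitor_query(query, poll_seconds=30):
    """Block until `query` terminates, logging metrics for each new batch.

    Returns the id of the last batch whose progress was logged (or None).
    """
    last_batch_id = None
    try:
        # awaitTermination(timeout) returns True once the query has ended and
        # raises the query's exception if it terminated with an error.
        while not query.awaitTermination(poll_seconds):
            progress = query.lastProgress  # None until the first batch finishes
            if progress and progress.get("batchId") != last_batch_id:
                last_batch_id = progress.get("batchId")
                summary = {key: progress.get(key) for key in PROGRESS_KEYS}
                logger.info("streaming progress: %s", json.dumps(summary))
    except Exception:
        # Log with the stack trace, then let the caller decide what to do.
        logger.exception("streaming query terminated with an error")
        raise
    finally:
        if query.isActive:
            query.stop()
    return last_batch_id
```

Calling monitor_query(query) in place of a bare query.awaitTermination() keeps the same blocking behaviour while leaving a trail of per-batch metrics in the driver log.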


Regards,
Gourav


On Sat, Feb 26, 2022 at 10:45 AM Mich Talebzadeh <mi...@gmail.com>
wrote:

> Check the thread I forwarded on how to gracefully shut down Spark
> Structured Streaming
>
> HTH

Re: StructuredStreaming error - pyspark.sql.utils.StreamingQueryException: batch 44 doesn't exist

Posted by Mich Talebzadeh <mi...@gmail.com>.
Check the thread I forwarded on how to gracefully shut down Spark Structured
Streaming
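
The forwarded thread is not reproduced here, so the following is not necessarily what it describes. It is a sketch of one common pattern, assuming a marker file is used to request a stop: run_until_stop_requested, stop_marker and settle_seconds are hypothetical names, and the loop relies only on the awaitTermination(timeout), status, isActive and stop() members of a PySpark StreamingQuery. It is best-effort, since a new batch can still start between the status check and the stop() call.

```python
import os
import time


def run_until_stop_requested(query, stop_marker, poll_seconds=10, settle_seconds=1):
    """Wait on `query` and stop it once the file `stop_marker` exists.

    Returns True if this function stopped the query, False if the query
    terminated by itself.
    """
    while not query.awaitTermination(poll_seconds):
        if not os.path.exists(stop_marker):
            continue
        # A stop was requested: wait until no micro-batch is being processed,
        # so the stop does not land in the middle of a batch.
        while query.isActive and query.status["isTriggerActive"]:
            time.sleep(settle_seconds)
        query.stop()
        return True
    return False
```

Creating the marker file then replaces killing the driver process when the job has to be redeployed.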

HTH




   view my Linkedin profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>


 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.

Re: StructuredStreaming error - pyspark.sql.utils.StreamingQueryException: batch 44 doesn't exist

Posted by karan alang <ka...@gmail.com>.
Hi Gabor,
I just responded to your comment on Stack Overflow.

Regards,
Karan Alang

On Sat, Feb 26, 2022 at 3:06 PM Gabor Somogyi <ga...@gmail.com>
wrote:

> Hi Karan,
>
> Please have a look at the Stack Overflow comment I left 2 days ago 😉
>
> G

Re: StructuredStreaming error - pyspark.sql.utils.StreamingQueryException: batch 44 doesn't exist

Posted by Gabor Somogyi <ga...@gmail.com>.
Hi Karan,

Please have a look at the Stack Overflow comment I left 2 days ago 😉

G
