Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2022/02/09 06:54:11 UTC

[GitHub] [spark] LeeeeLiu opened a new pull request #35458: [SPARK-38033] The structured streaming processing cannot be started b…

LeeeeLiu opened a new pull request #35458:
URL: https://github.com/apache/spark/pull/35458


   <!--
   Thanks for sending a pull request!  Here are some tips for you:
     1. If this is your first time, please read our contributor guidelines: https://spark.apache.org/contributing.html
     2. Ensure you have added or run the appropriate tests for your PR: https://spark.apache.org/developer-tools.html
     3. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP][SPARK-XXXX] Your PR title ...'.
     4. Be sure to keep the PR description updated to reflect all changes.
     5. Please write your PR title to summarize what this PR proposes.
     6. If possible, provide a concise example to reproduce the issue for a faster review.
     7. If you want to add a new configuration, please read the guideline first for naming configurations in
        'core/src/main/scala/org/apache/spark/internal/config/ConfigEntry.scala'.
     8. If you want to add or modify an error type or message, please read the guideline first in
        'core/src/main/resources/error/README.md'.
   -->
   
   ### What changes were proposed in this pull request?
   
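   This PR modifies the `populateStartOffsets` method of `org.apache.spark.sql.execution.streaming.MicroBatchExecution` so that it logs an explanatory error message, with recovery instructions, when the offset log entry for batch `latestBatchId - 1` is missing.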
   
   ### Why are the changes needed?
   
   In some unexpected cases, the commit log and the offset log become inconsistent, and offsets are not written to HDFS contiguously, as follows:
               commits
               /tmp/streaming_xxxxxxxx/commits/113256
               /tmp/streaming_xxxxxxxx/commits/113257
   
               offsets
               /tmp/streaming_xxxxxxxx/offsets/113257
               /tmp/streaming_xxxxxxxx/offsets/113259
              
   When the streaming query is restarted, batch ${latestBatchId - 1} is 113258, but offset 113258 doesn't exist, so an exception is thrown and the query cannot start. Spark doesn't need to repair the checkpoint itself, but as an improvement it could do some simple analysis and give a better error message.
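   A minimal sketch of that "simple analysis", assuming the batch IDs have already been read from the `offsets/` and `commits/` directories. `CheckpointGapCheck`, `missingSecondLatestOffset` and `describe` are hypothetical names for illustration, not Spark APIs; the check only mirrors the `latestBatchId != 0` guard seen in `populateStartOffsets`.

   ```scala
   // Hypothetical helper, not part of Spark or this PR.
   object CheckpointGapCheck {
     /** Some(id) when offset file `latestBatchId - 1` is absent, else None. */
     def missingSecondLatestOffset(offsetIds: Set[Long]): Option[Long] =
       if (offsetIds.isEmpty) None
       else {
         val latestBatchId = offsetIds.max
         // Same guard as populateStartOffsets: batch 0 has no predecessor to load.
         if (latestBatchId != 0 && !offsetIds.contains(latestBatchId - 1)) Some(latestBatchId - 1)
         else None
       }

     /** A short diagnostic in the spirit of the proposed error message. */
     def describe(offsetIds: Set[Long], commitIds: Set[Long]): Option[String] =
       missingSecondLatestOffset(offsetIds).map { missing =>
         val latestCommit = if (commitIds.isEmpty) "none" else commitIds.max.toString
         s"Offset $missing is missing: latest offset is ${missing + 1}, latest commit is $latestCommit."
       }
   }
   ```

   With the layout above, `missingSecondLatestOffset(Set(113257L, 113259L))` returns `Some(113258L)`.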
   
   ### Does this PR introduce _any_ user-facing change?
   
   Yes.
   When offset ${latestBatchId - 1} is missing, an error message is logged before the exception is thrown.
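   A sketch of the log-then-throw pattern the change relies on, assuming `offsetLookup` stands in for `offsetLog.get` and `log` for `logError`. The object name, both parameters and the exception type are assumptions for illustration. Because the `getOrElse` default is evaluated lazily, the message is logged only when the offset is actually missing.

   ```scala
   // Hypothetical stand-in for the lookup in populateStartOffsets.
   object LogThenThrow {
     def secondLatestOffsets(
         offsetLookup: Long => Option[String],
         latestBatchId: Long,
         log: String => Unit): String =
       offsetLookup(latestBatchId - 1).getOrElse {
         // Evaluated only when the lookup returns None.
         log(s"Please check the checkpoint, batch ${latestBatchId - 1} doesn't exist.")
         throw new IllegalStateException(s"batch ${latestBatchId - 1} doesn't exist")
       }
   }
   ```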
   
   ### How was this patch tested?
   
   Existing tests; this change only adds an error message.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] HyukjinKwon commented on pull request #35458: [SPARK-38033][SS] The structured streaming processing cannot be started b…

Posted by GitBox <gi...@apache.org>.
HyukjinKwon commented on pull request #35458:
URL: https://github.com/apache/spark/pull/35458#issuecomment-1037616396


   Oh wait, why is this targeting branch-3.1 only? Is master branch fine?




[GitHub] [spark] LeeeeLiu commented on pull request #35458: [SPARK-38033][SS] The structured streaming processing cannot be started b…

Posted by GitBox <gi...@apache.org>.
LeeeeLiu commented on pull request #35458:
URL: https://github.com/apache/spark/pull/35458#issuecomment-1037754447


   > Oh wait, why is this targeting branch-3.1 only? Is master branch fine?
   
   Oh, I made the modification based on branch-3.1; master is OK. Do you need me to do anything else?




[GitHub] [spark] HeartSaVioR commented on pull request #35458: [SPARK-38033][SS] The structured streaming processing cannot be started b…

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on pull request #35458:
URL: https://github.com/apache/spark/pull/35458#issuecomment-1038592030


   It would be nice if we could try to reproduce it manually and see whether the log message is correctly logged. For sure, we need to try it with the master branch first, then 3.2 and 3.1.
   
   @LeeeeLiu We put every change into the master branch first, unless the fix is specific to an older version. Could you please try to reproduce the issue with the master branch, and change this PR to target the master branch if the issue persists there as well? Thanks in advance!
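   To help with a manual reproduction, here is a hypothetical Scala 2.13 sketch (not part of this PR) that creates a throwaway directory with the same `offsets/` and `commits/` file names as in the PR description and lists the batch IDs in each. It only mimics file names; real offset and commit files have contents that Spark parses, so this is useful for exercising a gap check, not for starting an actual query. `FakeCheckpoint` and its methods are illustrative names.

   ```scala
   import java.nio.file.{Files, Path}
   import scala.jdk.CollectionConverters._

   object FakeCheckpoint {
     /** Creates root/offsets/<id> and root/commits/<id> as empty files. */
     def create(offsetIds: Seq[Long], commitIds: Seq[Long]): Path = {
       val root = Files.createTempDirectory("streaming_")
       for ((dir, ids) <- Seq("offsets" -> offsetIds, "commits" -> commitIds)) {
         val sub = Files.createDirectories(root.resolve(dir))
         ids.foreach(id => Files.createFile(sub.resolve(id.toString)))
       }
       root
     }

     /** Batch IDs in a log directory; non-numeric names (e.g. .crc files) are skipped. */
     def batchIds(root: Path, dir: String): Set[Long] = {
       val stream = Files.list(root.resolve(dir))
       try {
         stream.iterator().asScala
           .map(_.getFileName.toString)
           .flatMap(_.toLongOption)
           .toSet
       } finally stream.close()
     }
   }
   ```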




[GitHub] [spark] LeeeeLiu commented on pull request #35458: [SPARK-38033][SS] The structured streaming processing cannot be started b…

Posted by GitBox <gi...@apache.org>.
LeeeeLiu commented on pull request #35458:
URL: https://github.com/apache/spark/pull/35458#issuecomment-1035915174


   @HyukjinKwon, @HeartSaVioR It has been revised. Could you please verify this patch?




[GitHub] [spark] HeartSaVioR closed pull request #35458: [SPARK-38033][SS] The structured streaming processing cannot be started b…

Posted by GitBox <gi...@apache.org>.
HeartSaVioR closed pull request #35458:
URL: https://github.com/apache/spark/pull/35458


   




[GitHub] [spark] LeeeeLiu commented on pull request #35458: [SPARK-38033][SS] The structured streaming processing cannot be started b…

Posted by GitBox <gi...@apache.org>.
LeeeeLiu commented on pull request #35458:
URL: https://github.com/apache/spark/pull/35458#issuecomment-1038702189


   > It would be nice if we could try to reproduce it manually and see whether the log message is correctly logged. For sure, we need to try it with the master branch first, then 3.2 and 3.1.
   > 
   > @LeeeeLiu We put every change into the master branch first, unless the fix is specific to an older version. Could you please try to reproduce the issue with the master branch, and change this PR to target the master branch if the issue persists there as well? Thanks in advance!
   
   This is my first time submitting a PR, so some things are not clear to me. Thank you for your answer. I will verify this on the master branch and try to write a test case that outputs these logs.




[GitHub] [spark] HeartSaVioR commented on a change in pull request #35458: [SPARK-38033][SS] The structured streaming processing cannot be started b…

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on a change in pull request #35458:
URL: https://github.com/apache/spark/pull/35458#discussion_r803220341



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
##########
@@ -282,7 +282,27 @@ class MicroBatchExecution(
         /* Initialize committed offsets to a committed batch, which at this
          * is the second latest batch id in the offset log. */
         if (latestBatchId != 0) {
+          /* SPARK-38033: In some unexpected cases, commit and offset are inconsistent,
+            * and offset is not written into HDFS continuously, as follows:
+            *
+            * commits
+            * /tmp/streaming_xxxxxxxx/commits/113256
+            * /tmp/streaming_xxxxxxxx/commits/113257
+            * offsets
+            * /tmp/streaming_xxxxxxxx/offsets/113257
+            * /tmp/streaming_xxxxxxxx/offsets/113259
+            *
+            * When we start the streaming program, batch ${latestBatchId - 1} is 113258,
+            * but offsets 113258 doesn't exist, an exception will be thrown,resulting in
+            * the program cannot be started. As an improvement, we could probably do some
+            * simply analysis and give better error message. */
           val secondLatestOffsets = offsetLog.get(latestBatchId - 1).getOrElse {
+            logError(s"Please check the checkpoint, batch ${latestBatchId - 1} doesn't exist. " +
+              s"If the latest offset is $latestBatchId, the latest commit is ${latestBatchId - 2}" +
+              s" and offset ${latestBatchId - 1} doesn't exist. You can try to remove the offset" +
+              s" $latestBatchId and start over. If your query aims end-to-end exactly once" +
+              s" semantic, and you can also remove the output from the batch ${latestBatchId - 1}" +

Review comment:
       It is not something merely "good to have" for fulfilling the fault-tolerance semantics; it is a requirement.
   
   ```suggestion
                 s" semantic, you need to also remove the output from the batch ${latestBatchId - 1}" +
   ```


Review comment:
       That said, we also need to remove "if possible" from the line below. If users can't do this, the end-to-end guarantee becomes at-least-once.
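   For illustration, a sketch of the message with both review suggestions applied ("you need to also remove" and no "if possible"), leaving the rest of the wording as in the diff. `MissingOffsetMessage` is a hypothetical wrapper; in the PR the string is built inline inside `logError`. The last fragment has no placeholders, so it is a plain literal rather than an `s` literal.

   ```scala
   // Hypothetical wrapper around the revised error text.
   object MissingOffsetMessage {
     def apply(latestBatchId: Long): String =
       s"Please check the checkpoint, batch ${latestBatchId - 1} doesn't exist. " +
         s"If the latest offset is $latestBatchId, the latest commit is ${latestBatchId - 2}" +
         s" and offset ${latestBatchId - 1} doesn't exist. You can try to remove the offset" +
         s" $latestBatchId and start over. If your query aims end-to-end exactly once" +
         s" semantic, you need to also remove the output from the batch ${latestBatchId - 1}" +
         " manually before starting."
   }
   ```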






[GitHub] [spark] HyukjinKwon commented on pull request #35458: [SPARK-38033][SS] The structured streaming processing cannot be started b…

Posted by GitBox <gi...@apache.org>.
HyukjinKwon commented on pull request #35458:
URL: https://github.com/apache/spark/pull/35458#issuecomment-1034362131


   cc @HeartSaVioR FYI




[GitHub] [spark] HeartSaVioR commented on pull request #35458: [SPARK-38033][SS] The structured streaming processing cannot be started b…

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on pull request #35458:
URL: https://github.com/apache/spark/pull/35458#issuecomment-1040913222


   Let me close this one since we have a PR for master branch.




[GitHub] [spark] LeeeeLiu commented on a change in pull request #35458: [SPARK-38033][SS] The structured streaming processing cannot be started b…

Posted by GitBox <gi...@apache.org>.
LeeeeLiu commented on a change in pull request #35458:
URL: https://github.com/apache/spark/pull/35458#discussion_r803258924



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
##########
@@ -282,7 +282,27 @@ class MicroBatchExecution(
         /* Initialize committed offsets to a committed batch, which at this
          * is the second latest batch id in the offset log. */
         if (latestBatchId != 0) {
+          /* SPARK-38033: In some unexpected cases, commit and offset are inconsistent,
+            * and offset is not written into HDFS continuously, as follows:
+            *
+            * commits
+            * /tmp/streaming_xxxxxxxx/commits/113256
+            * /tmp/streaming_xxxxxxxx/commits/113257
+            * offsets
+            * /tmp/streaming_xxxxxxxx/offsets/113257
+            * /tmp/streaming_xxxxxxxx/offsets/113259
+            *
+            * When we start the streaming program, batch ${latestBatchId - 1} is 113258,
+            * but offsets 113258 doesn't exist, an exception will be thrown,resulting in
+            * the program cannot be started. As an improvement, we could probably do some
+            * simply analysis and give better error message. */
           val secondLatestOffsets = offsetLog.get(latestBatchId - 1).getOrElse {
+            logError(s"Please check the checkpoint, batch ${latestBatchId - 1} doesn't exist. " +
+              s"If the latest offset is $latestBatchId, the latest commit is ${latestBatchId - 2}" +
+              s" and offset ${latestBatchId - 1} doesn't exist. You can try to remove the offset" +
+              s" $latestBatchId and start over. If your query aims end-to-end exactly once" +
+              s" semantic, and you can also remove the output from the batch ${latestBatchId - 1}" +

Review comment:
       Yes, I got it; this is more accurate. Thank you for your suggestion.






[GitHub] [spark] LeeeeLiu commented on a change in pull request #35458: [SPARK-38033] The structured streaming processing cannot be started b…

Posted by GitBox <gi...@apache.org>.
LeeeeLiu commented on a change in pull request #35458:
URL: https://github.com/apache/spark/pull/35458#discussion_r802336617



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
##########
@@ -282,7 +282,27 @@ class MicroBatchExecution(
         /* Initialize committed offsets to a committed batch, which at this
          * is the second latest batch id in the offset log. */
         if (latestBatchId != 0) {
+          /* SPARK-38033: In some unexpected cases, commit and offset are inconsistent,
+            * and offset is not written into HDFS continuously, as follows:
+            *
+            * commits
+            * /tmp/streaming_xxxxxxxx/commits/113256
+            * /tmp/streaming_xxxxxxxx/commits/113257
+            * offsets
+            * /tmp/streaming_xxxxxxxx/offsets/113257
+            * /tmp/streaming_xxxxxxxx/offsets/113259
+            *
+            * When we start the streaming program, batch ${latestBatchId - 1} is 113258,
+            * but offsets 113258 doesn't exist, an exception will be thrown,resulting in
+            * the program cannot be started. As an improvement, we could probably do some
+            * simply analysis and give better error message. */
           val secondLatestOffsets = offsetLog.get(latestBatchId - 1).getOrElse {
+            logError(s"Please check the checkpoint, batch ${latestBatchId - 1} doesn't exist. " +
+              s"If the latest offset is $latestBatchId, the latest commit is ${latestBatchId - 2}" +
+              s" and offset ${latestBatchId - 1} doesn't exist. You can try to remove the offset" +
+              s" $latestBatchId and start over. If your query aims end-to-end exactly once" +
+              s" semantic, and you can may want to also remove the output from the batch" +

Review comment:
       Oh, sorry, English is not my native language. Let me change it.






[GitHub] [spark] martin-g commented on a change in pull request #35458: [SPARK-38033] The structured streaming processing cannot be started b…

Posted by GitBox <gi...@apache.org>.
martin-g commented on a change in pull request #35458:
URL: https://github.com/apache/spark/pull/35458#discussion_r802331141



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
##########
@@ -282,7 +282,27 @@ class MicroBatchExecution(
         /* Initialize committed offsets to a committed batch, which at this
          * is the second latest batch id in the offset log. */
         if (latestBatchId != 0) {
+          /* SPARK-38033: In some unexpected cases, commit and offset are inconsistent,
+            * and offset is not written into HDFS continuously, as follows:
+            *
+            * commits
+            * /tmp/streaming_xxxxxxxx/commits/113256
+            * /tmp/streaming_xxxxxxxx/commits/113257
+            * offsets
+            * /tmp/streaming_xxxxxxxx/offsets/113257
+            * /tmp/streaming_xxxxxxxx/offsets/113259
+            *
+            * When we start the streaming program, batch ${latestBatchId - 1} is 113258,
+            * but offsets 113258 doesn't exist, an exception will be thrown,resulting in
+            * the program cannot be started. As an improvement, we could probably do some
+            * simply analysis and give better error message. */
           val secondLatestOffsets = offsetLog.get(latestBatchId - 1).getOrElse {
+            logError(s"Please check the checkpoint, batch ${latestBatchId - 1} doesn't exist. " +
+              s"If the latest offset is $latestBatchId, the latest commit is ${latestBatchId - 2}" +
+              s" and offset ${latestBatchId - 1} doesn't exist. You can try to remove the offset" +
+              s" $latestBatchId and start over. If your query aims end-to-end exactly once" +
+              s" semantic, and you can may want to also remove the output from the batch" +

Review comment:
       `... you can may want to also remove ...` 
   I am not a native English speaker, but this part of the sentence sounds wrong to me.






[GitHub] [spark] LeeeeLiu commented on a change in pull request #35458: [SPARK-38033][SS] The structured streaming processing cannot be started b…

Posted by GitBox <gi...@apache.org>.
LeeeeLiu commented on a change in pull request #35458:
URL: https://github.com/apache/spark/pull/35458#discussion_r803255133



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
##########
@@ -282,7 +282,27 @@ class MicroBatchExecution(
         /* Initialize committed offsets to a committed batch, which at this
          * is the second latest batch id in the offset log. */
         if (latestBatchId != 0) {
+          /* SPARK-38033: In some unexpected cases, commit and offset are inconsistent,
+            * and offset is not written into HDFS continuously, as follows:
+            *
+            * commits
+            * /tmp/streaming_xxxxxxxx/commits/113256
+            * /tmp/streaming_xxxxxxxx/commits/113257
+            * offsets
+            * /tmp/streaming_xxxxxxxx/offsets/113257
+            * /tmp/streaming_xxxxxxxx/offsets/113259
+            *
+            * When we start the streaming program, batch ${latestBatchId - 1} is 113258,
+            * but offsets 113258 doesn't exist, an exception will be thrown,resulting in
+            * the program cannot be started. As an improvement, we could probably do some
+            * simply analysis and give better error message. */
           val secondLatestOffsets = offsetLog.get(latestBatchId - 1).getOrElse {
+            logError(s"Please check the checkpoint, batch ${latestBatchId - 1} doesn't exist. " +
+              s"If the latest offset is $latestBatchId, the latest commit is ${latestBatchId - 2}" +
+              s" and offset ${latestBatchId - 1} doesn't exist. You can try to remove the offset" +
+              s" $latestBatchId and start over. If your query aims end-to-end exactly once" +
+              s" semantic, and you can also remove the output from the batch ${latestBatchId - 1}" +
+              s" manually if possible before starting.")

Review comment:
       I got it. Thank you for your suggestion.






[GitHub] [spark] AmplabJenkins commented on pull request #35458: [SPARK-38033][SS] The structured streaming processing cannot be started b…

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #35458:
URL: https://github.com/apache/spark/pull/35458#issuecomment-1034554487


   Can one of the admins verify this patch?




[GitHub] [spark] HyukjinKwon commented on a change in pull request #35458: [SPARK-38033][SS] The structured streaming processing cannot be started b…

Posted by GitBox <gi...@apache.org>.
HyukjinKwon commented on a change in pull request #35458:
URL: https://github.com/apache/spark/pull/35458#discussion_r803208343



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
##########
@@ -282,7 +282,27 @@ class MicroBatchExecution(
         /* Initialize committed offsets to a committed batch, which at this
          * is the second latest batch id in the offset log. */
         if (latestBatchId != 0) {
+          /* SPARK-38033: In some unexpected cases, commit and offset are inconsistent,
+            * and offset is not written into HDFS continuously, as follows:
+            *
+            * commits
+            * /tmp/streaming_xxxxxxxx/commits/113256
+            * /tmp/streaming_xxxxxxxx/commits/113257
+            * offsets
+            * /tmp/streaming_xxxxxxxx/offsets/113257
+            * /tmp/streaming_xxxxxxxx/offsets/113259
+            *
+            * When we start the streaming program, batch ${latestBatchId - 1} is 113258,
+            * but offsets 113258 doesn't exist, an exception will be thrown,resulting in
+            * the program cannot be started. As an improvement, we could probably do some
+            * simply analysis and give better error message. */
           val secondLatestOffsets = offsetLog.get(latestBatchId - 1).getOrElse {
+            logError(s"Please check the checkpoint, batch ${latestBatchId - 1} doesn't exist. " +
+              s"If the latest offset is $latestBatchId, the latest commit is ${latestBatchId - 2}" +
+              s" and offset ${latestBatchId - 1} doesn't exist. You can try to remove the offset" +
+              s" $latestBatchId and start over. If your query aims end-to-end exactly once" +
+              s" semantic, and you can also remove the output from the batch ${latestBatchId - 1}" +
+              s" manually if possible before starting.")

Review comment:
       ```suggestion
                 " manually if possible before starting.")
   ```
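   The suggestion works because the `s` interpolator only changes a literal that contains `$name` or `${...}` placeholders; without any, it produces the same string as a plain literal. A minimal check (the object and value names are illustrative only):

   ```scala
   object InterpolatorCheck {
     val latestBatchId: Long = 113259L
     // No placeholders: the s-prefix has no effect on the result.
     val withS: String = s" manually if possible before starting."
     val plain: String = " manually if possible before starting."
     // With a placeholder, the s-prefix is what substitutes the value.
     val withPlaceholder: String = s" output from the batch ${latestBatchId - 1}"
   }
   ```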





