Posted to reviews@spark.apache.org by "HeartSaVioR (via GitHub)" <gi...@apache.org> on 2023/09/15 05:22:53 UTC

[GitHub] [spark] HeartSaVioR opened a new pull request, #42940: [SPARK-45178][SS] Fallback to execute a single batch for Trigger.AvailableNow with unsupported sources rather than using wrapper

HeartSaVioR opened a new pull request, #42940:
URL: https://github.com/apache/spark/pull/42940

   ### What changes were proposed in this pull request?
   
   This PR proposes to change the behavior when a user runs a streaming query with Trigger.AvailableNow and the query has any source that does not support Trigger.AvailableNow. Instead of using the wrapper implementation, this PR proposes to fall back to executing a single batch (a.k.a. Trigger.Once). 
   
   This PR introduces a new flag `spark.sql.streaming.triggerAvailableNowWrapper.enabled` to retain the previous (wrapper-based) behavior for advanced users. The flag is marked as internal since it is really only meant for the rare users who are concerned about the behavioral change.
   
   Minor details are as follows:
   
   * This PR does not go through Trigger.Once, hence users won't see the deprecation warning for Trigger.Once. 
   * This PR logs a warning naming the source(s) that do not support Trigger.AvailableNow, so that users can identify which source(s) prevent them from enjoying the benefits of Trigger.AvailableNow.
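   A minimal sketch of the decision described above, assuming simplified stand-in types: `DataStream`, `SupportsAvailableNow`, `PlainSource`, `AvailableNowSource`, `ExecutorKind` and `AvailableNowFallback` are hypothetical and only mirror the roles of Spark's `SparkDataStream`, `SupportsTriggerAvailableNow`, `SingleBatchExecutor` and `MultiBatchExecutor`. The warning text is illustrative, not Spark's actual log message.

```scala
// Hypothetical, simplified stand-ins for Spark's internal types (not Spark's API).
trait DataStream { def name: String }
trait SupportsAvailableNow extends DataStream

final case class PlainSource(name: String) extends DataStream
final case class AvailableNowSource(name: String) extends SupportsAvailableNow

sealed trait ExecutorKind
case object SingleBatch extends ExecutorKind // one batch, like Trigger.Once
case object MultiBatch extends ExecutorKind  // possibly many batches, like Trigger.AvailableNow

object AvailableNowFallback {
  /** Picks the executor for an AvailableNow query and returns the warnings to log. */
  def chooseExecutor(
      sources: Seq[DataStream],
      wrapperEnabled: Boolean): (ExecutorKind, Seq[String]) = {
    if (wrapperEnabled) {
      // Opt-in legacy path: unsupported sources get wrapped, multi-batch is kept.
      (MultiBatch, Seq.empty[String])
    } else {
      // Collect every unsupported source so that each one can be reported.
      val unsupported = sources.distinct.filterNot(_.isInstanceOf[SupportsAvailableNow])
      if (unsupported.isEmpty) {
        (MultiBatch, Seq.empty[String])
      } else {
        val warnings = unsupported.map { s =>
          s"source [${s.name}] does not support Trigger.AvailableNow. " +
            "Falling back to single batch execution."
        }
        (SingleBatch, warnings)
      }
    }
  }
}
```

   Collecting the unsupported sources with `filterNot` before deciding, rather than logging inside a `forall`, reports every offending source, since `forall` stops at the first `false`.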
   
   ### Why are the changes needed?
   
   We have observed a data duplication issue with a 3rd party data source when it is used with Trigger.AvailableNow. The source does not support Trigger.AvailableNow, and unfortunately it also does not play well with the wrapper implementation.
   
   We care more about possible correctness issues than about broader coverage of Trigger.AvailableNow, hence we want to stop using the wrapper implementation by default. We also care about not breaking existing queries, so we fall back to single batch execution rather than failing the query.
   
   ### Does this PR introduce _any_ user-facing change?
   
   Yes, this introduces a behavioral change for streaming queries with Trigger.AvailableNow that contain any source not supporting Trigger.AvailableNow.
   
   ### How was this patch tested?
   
   Modified UT.
   
   ### Was this patch authored or co-authored using generative AI tooling?
   
   No.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] HeartSaVioR commented on pull request #42940: [SPARK-45178][SS] Fallback to execute a single batch for Trigger.AvailableNow with unsupported sources rather than using wrapper

Posted by "HeartSaVioR (via GitHub)" <gi...@apache.org>.
HeartSaVioR commented on PR #42940:
URL: https://github.com/apache/spark/pull/42940#issuecomment-1726601027

   Ah I commented too late :) Thanks for your support!




[GitHub] [spark] HeartSaVioR closed pull request #42940: [SPARK-45178][SS] Fallback to execute a single batch for Trigger.AvailableNow with unsupported sources rather than using wrapper

Posted by "HeartSaVioR (via GitHub)" <gi...@apache.org>.
HeartSaVioR closed pull request #42940: [SPARK-45178][SS] Fallback to execute a single batch for Trigger.AvailableNow with unsupported sources rather than using wrapper
URL: https://github.com/apache/spark/pull/42940






[GitHub] [spark] anishshri-db commented on a diff in pull request #42940: [SPARK-45178][SS] Fallback to execute a single batch for Trigger.AvailableNow with unsupported sources rather than using wrapper

Posted by "anishshri-db (via GitHub)" <gi...@apache.org>.
anishshri-db commented on code in PR #42940:
URL: https://github.com/apache/spark/pull/42940#discussion_r1329420413


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AvailableNowDataStreamWrapper.scala:
##########
@@ -28,6 +28,12 @@ import org.apache.spark.sql.connector.read.streaming
 class AvailableNowDataStreamWrapper(val delegate: SparkDataStream)
   extends SparkDataStream with SupportsTriggerAvailableNow with Logging {
 
+  // See SPARK-45178 for more details.
+  logWarning(s"Activating the wrapper implementation of Trigger.AvailableNow for source " +
+    s"[$delegate]. Note that this might introduce possibility of deduplication, dataloss, " +
+    s"correctness issue. Enable the config with extreme care. We strongly recommend to contact " +
+    "with data source developer to support Trigger.AvailableNow.")

Review Comment:
   ok sounds good





[GitHub] [spark] HeartSaVioR commented on a diff in pull request #42940: [SPARK-45178][SS] Fallback to execute a single batch for Trigger.AvailableNow with unsupported sources rather than using wrapper

Posted by "HeartSaVioR (via GitHub)" <gi...@apache.org>.
HeartSaVioR commented on code in PR #42940:
URL: https://github.com/apache/spark/pull/42940#discussion_r1329424143


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala:
##########
@@ -201,7 +207,15 @@ case class MemoryStream[A : Encoder](
 
   override def initialOffset: OffsetV2 = LongOffset(-1)
 
+  override def prepareForTriggerAvailableNow(): Unit = synchronized {

Review Comment:
   While I agree this is a side effect, it also substantially increases test coverage. We test with three data sources in TriggerAvailableNowSuite, and none of the three supports Trigger.AvailableNow, so that suite is basically missing the case where the data source supports Trigger.AvailableNow. We have missed that for a long time, and I'd like to handle it as well while we are here.
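   A minimal sketch of what native Trigger.AvailableNow support means for an in-memory source: pin the end offset when the query starts, then process up to it in possibly several size-limited batches. `ToyMemoryStream` and `ToyAvailableNowRun` are hypothetical; `prepareForTriggerAvailableNow` borrows its name from the diff above, but the offsets are plain `Int`s and the signatures are simplified, not Spark's.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical toy stream; offsets are plain Ints (row counts), not Spark offsets.
final class ToyMemoryStream {
  private val rows = ArrayBuffer.empty[Int]
  private var pinnedEnd: Option[Int] = None

  def addData(xs: Int*): Unit = synchronized { rows ++= xs }

  // Called once when the query starts: everything present now is "available now".
  def prepareForTriggerAvailableNow(): Unit = synchronized {
    pinnedEnd = Some(rows.size)
  }

  // End offset for the next batch: at most maxRows past startOffset, never past the pin.
  def latestOffset(startOffset: Int, maxRows: Int): Int = synchronized {
    val cap = pinnedEnd.getOrElse(rows.size)
    math.min(startOffset + maxRows, cap)
  }

  def getBatch(startOffset: Int, endOffset: Int): List[Int] = synchronized {
    rows.slice(startOffset, endOffset).toList
  }
}

object ToyAvailableNowRun {
  // Pins the end offset, then runs size-limited batches until it is reached.
  def run(stream: ToyMemoryStream, maxRows: Int): List[List[Int]] = {
    stream.prepareForTriggerAvailableNow()
    val batches = ArrayBuffer.empty[List[Int]]
    var startOffset = 0
    var upTo = stream.latestOffset(startOffset, maxRows)
    while (upTo > startOffset) {
      batches += stream.getBatch(startOffset, upTo)
      startOffset = upTo
      upTo = stream.latestOffset(startOffset, maxRows)
    }
    batches.toList
  }
}
```

   Rows added after `prepareForTriggerAvailableNow()` stay beyond the pinned offset and are left for the next run, while the rows present at start may still be split across several batches.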





[GitHub] [spark] anishshri-db commented on a diff in pull request #42940: [SPARK-45178][SS] Fallback to execute a single batch for Trigger.AvailableNow with unsupported sources rather than using wrapper

Posted by "anishshri-db (via GitHub)" <gi...@apache.org>.
anishshri-db commented on code in PR #42940:
URL: https://github.com/apache/spark/pull/42940#discussion_r1329417880


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala:
##########
@@ -52,11 +52,40 @@ class MicroBatchExecution(
 
   @volatile protected var sources: Seq[SparkDataStream] = Seq.empty
 
-  protected val triggerExecutor: TriggerExecutor = trigger match {
-    case t: ProcessingTimeTrigger => ProcessingTimeExecutor(t, triggerClock)
-    case OneTimeTrigger => SingleBatchExecutor()
-    case AvailableNowTrigger => MultiBatchExecutor()
-    case _ => throw new IllegalStateException(s"Unknown type of trigger: $trigger")
+  @volatile protected[sql] var triggerExecutor: TriggerExecutor = _
+
+  protected def getTrigger(): TriggerExecutor = {
+    assert(sources.nonEmpty, "sources should have been retrieved from the plan!")
+    trigger match {
+      case t: ProcessingTimeTrigger => ProcessingTimeExecutor(t, triggerClock)
+      case OneTimeTrigger => SingleBatchExecutor()
+      case AvailableNowTrigger =>
+        // See SPARK-45178 for more details.
+        if (sparkSession.sqlContext.conf.getConf(

Review Comment:
   Not sure I'm reading this correctly. If the flag is enabled, we use multi-batch directly? Shouldn't this be the case if the flag is disabled?





[GitHub] [spark] HeartSaVioR commented on a diff in pull request #42940: [SPARK-45178][SS] Fallback to execute a single batch for Trigger.AvailableNow with unsupported sources rather than using wrapper

Posted by "HeartSaVioR (via GitHub)" <gi...@apache.org>.
HeartSaVioR commented on code in PR #42940:
URL: https://github.com/apache/spark/pull/42940#discussion_r1329418792


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AvailableNowDataStreamWrapper.scala:
##########
@@ -28,6 +28,12 @@ import org.apache.spark.sql.connector.read.streaming
 class AvailableNowDataStreamWrapper(val delegate: SparkDataStream)
   extends SparkDataStream with SupportsTriggerAvailableNow with Logging {
 
+  // See SPARK-45178 for more details.
+  logWarning(s"Activating the wrapper implementation of Trigger.AvailableNow for source " +
+    s"[$delegate]. Note that this might introduce possibility of deduplication, dataloss, " +
+    s"correctness issue. Enable the config with extreme care. We strongly recommend to contact " +
+    "with data source developer to support Trigger.AvailableNow.")

Review Comment:
   Maybe I'd remove "Spark" here: they need to contact the 3rd party data source developer, not the Spark community.





[GitHub] [spark] HeartSaVioR commented on pull request #42940: [SPARK-45178][SS] Fallback to execute a single batch for Trigger.AvailableNow with unsupported sources rather than using wrapper

Posted by "HeartSaVioR (via GitHub)" <gi...@apache.org>.
HeartSaVioR commented on PR #42940:
URL: https://github.com/apache/spark/pull/42940#issuecomment-1726773266

   Thanks, merging to master.
   (I'll probably raise a discussion on dev@ about backporting this to 3.5/3.4.)




[GitHub] [spark] anishshri-db commented on a diff in pull request #42940: [SPARK-45178][SS] Fallback to execute a single batch for Trigger.AvailableNow with unsupported sources rather than using wrapper

Posted by "anishshri-db (via GitHub)" <gi...@apache.org>.
anishshri-db commented on code in PR #42940:
URL: https://github.com/apache/spark/pull/42940#discussion_r1329378160


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala:
##########
@@ -52,11 +52,40 @@ class MicroBatchExecution(
 
   @volatile protected var sources: Seq[SparkDataStream] = Seq.empty
 
-  protected val triggerExecutor: TriggerExecutor = trigger match {
-    case t: ProcessingTimeTrigger => ProcessingTimeExecutor(t, triggerClock)
-    case OneTimeTrigger => SingleBatchExecutor()
-    case AvailableNowTrigger => MultiBatchExecutor()
-    case _ => throw new IllegalStateException(s"Unknown type of trigger: $trigger")
+  @volatile protected[sql] var triggerExecutor: TriggerExecutor = _
+
+  protected def getTrigger(): TriggerExecutor = {
+    assert(sources.nonEmpty, "sources should have been retrieved from the plan!")
+    trigger match {
+      case t: ProcessingTimeTrigger => ProcessingTimeExecutor(t, triggerClock)
+      case OneTimeTrigger => SingleBatchExecutor()
+      case AvailableNowTrigger =>
+        // See SPARK-45178 for more details.
+        if (sparkSession.sqlContext.conf.getConf(
+            SQLConf.STREAMING_TRIGGER_AVAILABLE_NOW_WRAPPER_ENABLED)) {
+          logInfo("Configured to use the wrapper of Trigger.AvailableNow.")
+          MultiBatchExecutor()
+        } else {
+          val supportsTriggerAvailableNow = sources.distinct.forall { src =>
+            val supports = src.isInstanceOf[SupportsTriggerAvailableNow]
+            if (!supports) {
+              logWarning(s"source [$src] does not support Trigger.AvailableNow. Failing back to " +

Review Comment:
   Nit: typo? `Falling back to`?





[GitHub] [spark] HeartSaVioR commented on pull request #42940: [SPARK-45178][SS] Fallback to execute a single batch for Trigger.AvailableNow with unsupported sources rather than using wrapper

Posted by "HeartSaVioR (via GitHub)" <gi...@apache.org>.
HeartSaVioR commented on PR #42940:
URL: https://github.com/apache/spark/pull/42940#issuecomment-1726572679

   Looks like a Python-side environment issue. I'll rebase and see whether it has already been fixed in the latest master.




[GitHub] [spark] HeartSaVioR commented on pull request #42940: [SPARK-45178][SS] Fallback to execute a single batch for Trigger.AvailableNow with unsupported sources rather than using wrapper

Posted by "HeartSaVioR (via GitHub)" <gi...@apache.org>.
HeartSaVioR commented on PR #42940:
URL: https://github.com/apache/spark/pull/42940#issuecomment-1720694800

   cc. @zsxwing @brkyvz @viirya @anishshri-db Mind taking a look? Thanks!




[GitHub] [spark] anishshri-db commented on a diff in pull request #42940: [SPARK-45178][SS] Fallback to execute a single batch for Trigger.AvailableNow with unsupported sources rather than using wrapper

Posted by "anishshri-db (via GitHub)" <gi...@apache.org>.
anishshri-db commented on code in PR #42940:
URL: https://github.com/apache/spark/pull/42940#discussion_r1329377950


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala:
##########
@@ -52,11 +52,40 @@ class MicroBatchExecution(
 
   @volatile protected var sources: Seq[SparkDataStream] = Seq.empty
 
-  protected val triggerExecutor: TriggerExecutor = trigger match {
-    case t: ProcessingTimeTrigger => ProcessingTimeExecutor(t, triggerClock)
-    case OneTimeTrigger => SingleBatchExecutor()
-    case AvailableNowTrigger => MultiBatchExecutor()
-    case _ => throw new IllegalStateException(s"Unknown type of trigger: $trigger")
+  @volatile protected[sql] var triggerExecutor: TriggerExecutor = _
+
+  protected def getTrigger(): TriggerExecutor = {
+    assert(sources.nonEmpty, "sources should have been retrieved from the plan!")
+    trigger match {
+      case t: ProcessingTimeTrigger => ProcessingTimeExecutor(t, triggerClock)
+      case OneTimeTrigger => SingleBatchExecutor()
+      case AvailableNowTrigger =>
+        // See SPARK-45178 for more details.
+        if (sparkSession.sqlContext.conf.getConf(
+            SQLConf.STREAMING_TRIGGER_AVAILABLE_NOW_WRAPPER_ENABLED)) {
+          logInfo("Configured to use the wrapper of Trigger.AvailableNow.")

Review Comment:
   Can we log some more query-specific info here? Or don't we have more info here?





[GitHub] [spark] anishshri-db commented on a diff in pull request #42940: [SPARK-45178][SS] Fallback to execute a single batch for Trigger.AvailableNow with unsupported sources rather than using wrapper

Posted by "anishshri-db (via GitHub)" <gi...@apache.org>.
anishshri-db commented on code in PR #42940:
URL: https://github.com/apache/spark/pull/42940#discussion_r1329317649


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala:
##########
@@ -201,7 +207,15 @@ case class MemoryStream[A : Encoder](
 
   override def initialOffset: OffsetV2 = LongOffset(-1)
 
+  override def prepareForTriggerAvailableNow(): Unit = synchronized {

Review Comment:
   This is not strictly related to this change, right? Basically we are trying to add Trigger.AvailableNow support to the memory source here? Should we move that to a separate PR?





[GitHub] [spark] HeartSaVioR commented on a diff in pull request #42940: [SPARK-45178][SS] Fallback to execute a single batch for Trigger.AvailableNow with unsupported sources rather than using wrapper

Posted by "HeartSaVioR (via GitHub)" <gi...@apache.org>.
HeartSaVioR commented on code in PR #42940:
URL: https://github.com/apache/spark/pull/42940#discussion_r1329417758


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala:
##########
@@ -2180,6 +2180,17 @@ object SQLConf {
       .booleanConf
       .createWithDefault(true)
 
+  val STREAMING_TRIGGER_AVAILABLE_NOW_WRAPPER_ENABLED =
+    buildConf("spark.sql.streaming.triggerAvailableNowWrapper.enabled")
+      .internal()
+      .doc("Whether to use the wrapper implementation of Trigger.AvailableNow if the source " +
+        "does not support Trigger.AvailableNow. Enabling this allows the benefits of " +
+        "Trigger.AvailableNow with sources which don't support it, but some sources " +
+        "may show unexpected behavior including duplication, data loss, etc. So use with " +

Review Comment:
   I'm enumerating the possibilities: what we actually observed is duplication, but suppose a data source does not rely on Spark's offset management and instead tracks offsets itself; then an additional call might end up skipping some data that should have been provided.
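   A toy model of the skipping scenario described above, assuming (for illustration only) that the wrapper issues one extra latest-offset call up front to pin the end offset. `SelfTrackingSource` and `WrapperSkipDemo` are hypothetical; the method names only loosely echo Spark's source APIs, and real sources and the real wrapper may behave differently.

```scala
// Hypothetical source that tracks its own progress instead of trusting the
// start/end offsets the engine passes in. NOT a real Spark source.
final class SelfTrackingSource(records: Vector[String]) {
  private var batchStart = 0
  private var lastEnd = 0

  // Side effect: every call moves the source's private notion of "new data".
  def latestOffset(): Int = {
    batchStart = lastEnd
    lastEnd = records.size
    lastEnd
  }

  // Ignores the engine's offsets and serves what the source itself considers new.
  def getBatch(): Vector[String] = records.slice(batchStart, lastEnd)
}

object WrapperSkipDemo {
  // Normal micro-batch: one latestOffset() call, then fetch the batch.
  def runDirect(src: SelfTrackingSource): Vector[String] = {
    src.latestOffset()
    src.getBatch()
  }

  // Assumed wrapper-like flow: one extra latestOffset() call up front to pin
  // the end offset, then the regular per-batch call.
  def runWithExtraCall(src: SelfTrackingSource): Vector[String] = {
    src.latestOffset() // extra "prepare" call
    src.latestOffset() // regular per-batch call
    src.getBatch()
  }
}
```

   With a single call per batch the source serves all of its records; with the extra call, its private cursor has already moved past them, so the batch comes back empty and those records are silently skipped.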





[GitHub] [spark] HeartSaVioR commented on a diff in pull request #42940: [SPARK-45178][SS] Fallback to execute a single batch for Trigger.AvailableNow with unsupported sources rather than using wrapper

Posted by "HeartSaVioR (via GitHub)" <gi...@apache.org>.
HeartSaVioR commented on code in PR #42940:
URL: https://github.com/apache/spark/pull/42940#discussion_r1329947013


##########
docs/ss-migration-guide.md:
##########
@@ -26,6 +26,10 @@ Note that this migration guide describes the items specific to Structured Stream
 Many items of SQL migration can be applied when migrating Structured Streaming to higher versions.
 Please refer [Migration Guide: SQL, Datasets and DataFrame](sql-migration-guide.html).
 
+## Upgrading from Structured Streaming 3.5 to 4.0
+
+- Since Spark 4.0, Spark falls back to single batch execution if any source in the query does not support `Trigger.AvailableNow`. This is to avoid any possible correctness, duplication, and dataloss issue due to incompatibility between source and wrapper implementation. (See [SPARK-45178](https://issues.apache.org/jira/browse/SPARK-45178) for more details.)

Review Comment:
   I intentionally avoid saying it is Trigger.Once. We deprecated it for good reason, and I'd say it is still worth telling users that they should use Trigger.AvailableNow. We just fall back to Trigger.Once for technical reasons, unfortunately.
   
   Ideally, we would still persuade 3rd parties to implement Trigger.AvailableNow and eventually remove Trigger.Once altogether (I know, it won't happen; we rarely remove public APIs), but I also see that several data source projects have had no updates for a couple of years, which is unfortunate. Maybe we shouldn't introduce fallback logic and should instead not support such sources, so that 3rd parties would realize the necessity. My bad.





[GitHub] [spark] HeartSaVioR commented on pull request #42940: [SPARK-45178][SS] Fallback to execute a single batch for Trigger.AvailableNow with unsupported sources rather than using wrapper

Posted by "HeartSaVioR (via GitHub)" <gi...@apache.org>.
HeartSaVioR commented on PR #42940:
URL: https://github.com/apache/spark/pull/42940#issuecomment-1726769354

   https://github.com/HeartSaVioR/spark/runs/16947355981
   Looks like the test is flaky rather than consistently failing; it passed before rebasing and also passed locally.
   
   https://github.com/HeartSaVioR/spark/actions/runs/6234669032
   This was the CI before rebasing.




[GitHub] [spark] HeartSaVioR commented on a diff in pull request #42940: [SPARK-45178][SS] Fallback to execute a single batch for Trigger.AvailableNow with unsupported sources rather than using wrapper

Posted by "HeartSaVioR (via GitHub)" <gi...@apache.org>.
HeartSaVioR commented on code in PR #42940:
URL: https://github.com/apache/spark/pull/42940#discussion_r1329427441


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala:
##########
@@ -52,11 +52,40 @@ class MicroBatchExecution(
 
   @volatile protected var sources: Seq[SparkDataStream] = Seq.empty
 
-  protected val triggerExecutor: TriggerExecutor = trigger match {
-    case t: ProcessingTimeTrigger => ProcessingTimeExecutor(t, triggerClock)
-    case OneTimeTrigger => SingleBatchExecutor()
-    case AvailableNowTrigger => MultiBatchExecutor()
-    case _ => throw new IllegalStateException(s"Unknown type of trigger: $trigger")
+  @volatile protected[sql] var triggerExecutor: TriggerExecutor = _
+
+  protected def getTrigger(): TriggerExecutor = {
+    assert(sources.nonEmpty, "sources should have been retrieved from the plan!")
+    trigger match {
+      case t: ProcessingTimeTrigger => ProcessingTimeExecutor(t, triggerClock)
+      case OneTimeTrigger => SingleBatchExecutor()
+      case AvailableNowTrigger =>
+        // See SPARK-45178 for more details.
+        if (sparkSession.sqlContext.conf.getConf(
+            SQLConf.STREAMING_TRIGGER_AVAILABLE_NOW_WRAPPER_ENABLED)) {
+          logInfo("Configured to use the wrapper of Trigger.AvailableNow.")

Review Comment:
   Yeah, let's include at least the queryID and runID to differentiate the query. Please note that this is an INFO log indicating that the flag was turned on (which is OK); the WARN log will follow when the query actually falls back to using the wrapper (which could be problematic).
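   A small sketch of the suggested log line. In Spark, a `StreamingQuery` exposes `id` (persists across restarts from the same checkpoint) and `runId` (new on every start), both `java.util.UUID`; the `WrapperLogMessage` helper and its exact wording are hypothetical.

```scala
import java.util.UUID

// Hypothetical helper: builds the INFO message with both identifiers so that
// concurrent queries, and restarts of the same query, can be told apart.
object WrapperLogMessage {
  def wrapperConfigured(queryId: UUID, runId: UUID): String =
    s"Configured to use the wrapper of Trigger.AvailableNow for query id $queryId, " +
      s"run id $runId."
}
```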





[GitHub] [spark] anishshri-db commented on a diff in pull request #42940: [SPARK-45178][SS] Fallback to execute a single batch for Trigger.AvailableNow with unsupported sources rather than using wrapper

Posted by "anishshri-db (via GitHub)" <gi...@apache.org>.
anishshri-db commented on code in PR #42940:
URL: https://github.com/apache/spark/pull/42940#discussion_r1329316346


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AvailableNowDataStreamWrapper.scala:
##########
@@ -28,6 +28,12 @@ import org.apache.spark.sql.connector.read.streaming
 class AvailableNowDataStreamWrapper(val delegate: SparkDataStream)
   extends SparkDataStream with SupportsTriggerAvailableNow with Logging {
 
+  // See SPARK-45178 for more details.
+  logWarning(s"Activating the wrapper implementation of Trigger.AvailableNow for source " +
+    s"[$delegate]. Note that this might introduce possibility of deduplication, dataloss, " +
+    s"correctness issue. Enable the config with extreme care. We strongly recommend to contact " +
+    "with data source developer to support Trigger.AvailableNow.")

Review Comment:
   Nit: `to contact the Spark data source developer to support`




[GitHub] [spark] HeartSaVioR commented on a diff in pull request #42940: [SPARK-45178][SS] Fallback to execute a single batch for Trigger.AvailableNow with unsupported sources rather than using wrapper

Posted by "HeartSaVioR (via GitHub)" <gi...@apache.org>.
HeartSaVioR commented on code in PR #42940:
URL: https://github.com/apache/spark/pull/42940#discussion_r1329415427


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala:
##########
@@ -2180,6 +2180,17 @@ object SQLConf {
       .booleanConf
       .createWithDefault(true)
 
+  val STREAMING_TRIGGER_AVAILABLE_NOW_WRAPPER_ENABLED =
+    buildConf("spark.sql.streaming.triggerAvailableNowWrapper.enabled")
+      .internal()
+      .doc("Whether to use the wrapper implementation of Trigger.AvailableNow if the source " +
+        "does not support Trigger.AvailableNow. Enabling this allows the benefits of " +
+        "Trigger.AvailableNow with sources which don't support it, but some sources " +
+        "may show unexpected behavior including duplication, data loss, etc. So use with " +
+        "extreme care! The ideal direction is to persuade developers of source(s) to " +
+        "support Trigger.AvailableNow.")

Review Comment:
   I'd say we are going to break a bunch of 3rd party data sources then, which is what we have avoided so far.




[GitHub] [spark] viirya commented on a diff in pull request #42940: [SPARK-45178][SS] Fallback to execute a single batch for Trigger.AvailableNow with unsupported sources rather than using wrapper

Posted by "viirya (via GitHub)" <gi...@apache.org>.
viirya commented on code in PR #42940:
URL: https://github.com/apache/spark/pull/42940#discussion_r1329632673


##########
docs/ss-migration-guide.md:
##########
@@ -26,6 +26,10 @@ Note that this migration guide describes the items specific to Structured Stream
 Many items of SQL migration can be applied when migrating Structured Streaming to higher versions.
 Please refer [Migration Guide: SQL, Datasets and DataFrame](sql-migration-guide.html).
 
+## Upgrading from Structured Streaming 3.5 to 4.0
+
+- Since Spark 4.0, Spark falls back to single batch execution if any source in the query does not support `Trigger.AvailableNow`. This is to avoid any possible correctness, duplication, and dataloss issue due to incompatibility between source and wrapper implementation. (See [SPARK-45178](https://issues.apache.org/jira/browse/SPARK-45178) for more details.)

Review Comment:
   If this is about correctness, do you plan to backport this to the 3.4/3.5 branches?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AvailableNowDataStreamWrapper.scala:
##########
@@ -28,6 +28,12 @@ import org.apache.spark.sql.connector.read.streaming
 class AvailableNowDataStreamWrapper(val delegate: SparkDataStream)
   extends SparkDataStream with SupportsTriggerAvailableNow with Logging {
 
+  // See SPARK-45178 for more details.
+  logWarning(s"Activating the wrapper implementation of Trigger.AvailableNow for source " +
+    s"[$delegate]. Note that this might introduce possibility of deduplication, dataloss, " +
+    s"correctness issue. Enable the config with extreme care. We strongly recommend to contact " +
+    "the data source developer to support Trigger.AvailableNow.")

Review Comment:
   ```suggestion
     logWarning("Activating the wrapper implementation of Trigger.AvailableNow for source " +
       s"[$delegate]. Note that this might introduce possibility of deduplication, dataloss, " +
       "correctness issue. Enable the config with extreme care. We strongly recommend to contact " +
       "the data source developer to support Trigger.AvailableNow.")
   ```



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala:
##########
@@ -201,7 +207,15 @@ case class MemoryStream[A : Encoder](
 
   override def initialOffset: OffsetV2 = LongOffset(-1)
 
+  override def prepareForTriggerAvailableNow(): Unit = synchronized {
+    availableNowEndOffset = latestOffset(initialOffset, ReadLimit.allAvailable())
+  }
+
   override def latestOffset(): OffsetV2 = {
+    throw new IllegalStateException("Should not reach here!")
+  }

Review Comment:
   Not sure why this cannot be called. Cannot `MemoryStream` be used as a `MicroBatchStream` after this change?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala:
##########
@@ -52,11 +52,46 @@ class MicroBatchExecution(
 
   @volatile protected var sources: Seq[SparkDataStream] = Seq.empty
 
-  protected val triggerExecutor: TriggerExecutor = trigger match {
-    case t: ProcessingTimeTrigger => ProcessingTimeExecutor(t, triggerClock)
-    case OneTimeTrigger => SingleBatchExecutor()
-    case AvailableNowTrigger => MultiBatchExecutor()
-    case _ => throw new IllegalStateException(s"Unknown type of trigger: $trigger")
+  @volatile protected[sql] var triggerExecutor: TriggerExecutor = _
+
+  protected def getTrigger(): TriggerExecutor = {
+    assert(sources.nonEmpty, "sources should have been retrieved from the plan!")
+    trigger match {
+      case t: ProcessingTimeTrigger => ProcessingTimeExecutor(t, triggerClock)
+      case OneTimeTrigger => SingleBatchExecutor()
+      case AvailableNowTrigger =>
+        // When the flag is enabled, Spark will wrap sources which does not support

Review Comment:
   ```suggestion
           // When the flag is enabled, Spark will wrap sources which do not support
   ```



##########
docs/ss-migration-guide.md:
##########
@@ -26,6 +26,10 @@ Note that this migration guide describes the items specific to Structured Stream
 Many items of SQL migration can be applied when migrating Structured Streaming to higher versions.
 Please refer [Migration Guide: SQL, Datasets and DataFrame](sql-migration-guide.html).
 
+## Upgrading from Structured Streaming 3.5 to 4.0
+
+- Since Spark 4.0, Spark falls back to single batch execution if any source in the query does not support `Trigger.AvailableNow`. This is to avoid any possible correctness, duplication, and dataloss issue due to incompatibility between source and wrapper implementation. (See [SPARK-45178](https://issues.apache.org/jira/browse/SPARK-45178) for more details.)

Review Comment:
   For "single batch execution", isn't it deprecated now? Do we guarantee that it will remain usable in future versions?




[GitHub] [spark] HeartSaVioR commented on a diff in pull request #42940: [SPARK-45178][SS] Fallback to execute a single batch for Trigger.AvailableNow with unsupported sources rather than using wrapper

Posted by "HeartSaVioR (via GitHub)" <gi...@apache.org>.
HeartSaVioR commented on code in PR #42940:
URL: https://github.com/apache/spark/pull/42940#discussion_r1329941703


##########
docs/ss-migration-guide.md:
##########
@@ -26,6 +26,10 @@ Note that this migration guide describes the items specific to Structured Stream
 Many items of SQL migration can be applied when migrating Structured Streaming to higher versions.
 Please refer [Migration Guide: SQL, Datasets and DataFrame](sql-migration-guide.html).
 
+## Upgrading from Structured Streaming 3.5 to 4.0
+
+- Since Spark 4.0, Spark falls back to single batch execution if any source in the query does not support `Trigger.AvailableNow`. This is to avoid any possible correctness, duplication, and dataloss issue due to incompatibility between source and wrapper implementation. (See [SPARK-45178](https://issues.apache.org/jira/browse/SPARK-45178) for more details.)

Review Comment:
   Let's discuss this after the fix lands in the master branch. It is technically a behavioral change, so it is easier to introduce in a new major/minor version, but introducing it in a bugfix version may need some discussion. Maybe I can send a mail to dev@ and try to gather opinions on it.




[GitHub] [spark] HeartSaVioR commented on pull request #42940: [SPARK-45178][SS] Fallback to execute a single batch for Trigger.AvailableNow with unsupported sources rather than using wrapper

Posted by "HeartSaVioR (via GitHub)" <gi...@apache.org>.
HeartSaVioR commented on PR #42940:
URL: https://github.com/apache/spark/pull/42940#issuecomment-1728653787

   https://lists.apache.org/thread/ljronxf6bymvqjmlwpzy84gzgvnqrmoh
   ^^^ DISCUSSION thread in dev@.



[GitHub] [spark] HeartSaVioR commented on a diff in pull request #42940: [SPARK-45178][SS] Fallback to execute a single batch for Trigger.AvailableNow with unsupported sources rather than using wrapper

Posted by "HeartSaVioR (via GitHub)" <gi...@apache.org>.
HeartSaVioR commented on code in PR #42940:
URL: https://github.com/apache/spark/pull/42940#discussion_r1329424458


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala:
##########
@@ -52,11 +52,40 @@ class MicroBatchExecution(
 
   @volatile protected var sources: Seq[SparkDataStream] = Seq.empty
 
-  protected val triggerExecutor: TriggerExecutor = trigger match {
-    case t: ProcessingTimeTrigger => ProcessingTimeExecutor(t, triggerClock)
-    case OneTimeTrigger => SingleBatchExecutor()
-    case AvailableNowTrigger => MultiBatchExecutor()
-    case _ => throw new IllegalStateException(s"Unknown type of trigger: $trigger")
+  @volatile protected[sql] var triggerExecutor: TriggerExecutor = _
+
+  protected def getTrigger(): TriggerExecutor = {
+    assert(sources.nonEmpty, "sources should have been retrieved from the plan!")
+    trigger match {
+      case t: ProcessingTimeTrigger => ProcessingTimeExecutor(t, triggerClock)
+      case OneTimeTrigger => SingleBatchExecutor()
+      case AvailableNowTrigger =>
+        // See SPARK-45178 for more details.
+        if (sparkSession.sqlContext.conf.getConf(

Review Comment:
   The logic is correct, but I agree that it's very confusing. I'll add some comments to elaborate.
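A simplified model of the decision that `getTrigger()` makes for `Trigger.AvailableNow` may help readers follow the logic. The model follows the PR description and the code comment on the wrapper branch. The types `ExecutorChoice`, `StreamSource` and `TriggerSelection` are invented for this sketch and are not Spark classes. The real code works with `SparkDataStream`, `SupportsTriggerAvailableNow` and the `TriggerExecutor` implementations shown in the diff.

```scala
// Hypothetical model of the executor choice for Trigger.AvailableNow (not Spark code).
sealed trait ExecutorChoice
case object MultiBatch extends ExecutorChoice             // all sources support AvailableNow
case object MultiBatchWithWrappers extends ExecutorChoice // previous behavior, flag enabled
case object SingleBatch extends ExecutorChoice            // new default fallback

final case class StreamSource(name: String, supportsAvailableNow: Boolean)

object TriggerSelection {
  /** Returns the chosen execution mode and the warning to log, if any. */
  def forAvailableNow(
      sources: Seq[StreamSource],
      wrapperEnabled: Boolean): (ExecutorChoice, Option[String]) = {
    require(sources.nonEmpty, "sources should have been retrieved from the plan!")
    val unsupported = sources.filterNot(_.supportsAvailableNow).map(_.name)
    if (unsupported.isEmpty) {
      (MultiBatch, None)
    } else if (wrapperEnabled) {
      // Only the unsupported sources would be wrapped; name them in the warning.
      (MultiBatchWithWrappers,
        Some(s"Using the Trigger.AvailableNow wrapper for: ${unsupported.mkString(", ")}"))
    } else {
      // Fall back to a single batch instead of failing the query.
      (SingleBatch,
        Some("Falling back to single batch execution; sources without " +
          s"Trigger.AvailableNow support: ${unsupported.mkString(", ")}"))
    }
  }
}

val (choice, warning) = TriggerSelection.forAvailableNow(
  Seq(StreamSource("supportedSource", true), StreamSource("legacySource", false)),
  wrapperEnabled = false)
// choice == SingleBatch, and warning names "legacySource"
```

In this sketch, the flag only changes the outcome when at least one source lacks native support. A query whose sources all support `Trigger.AvailableNow` runs multi-batch regardless of the flag.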




[GitHub] [spark] HeartSaVioR commented on a diff in pull request #42940: [SPARK-45178][SS] Fallback to execute a single batch for Trigger.AvailableNow with unsupported sources rather than using wrapper

Posted by "HeartSaVioR (via GitHub)" <gi...@apache.org>.
HeartSaVioR commented on code in PR #42940:
URL: https://github.com/apache/spark/pull/42940#discussion_r1329948493


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala:
##########
@@ -201,7 +207,15 @@ case class MemoryStream[A : Encoder](
 
   override def initialOffset: OffsetV2 = LongOffset(-1)
 
+  override def prepareForTriggerAvailableNow(): Unit = synchronized {
+    availableNowEndOffset = latestOffset(initialOffset, ReadLimit.allAvailable())
+  }
+
   override def latestOffset(): OffsetV2 = {
+    throw new IllegalStateException("Should not reach here!")
+  }

Review Comment:
   When a data source implements AdmissionControl, this method is guaranteed never to be called; latestOffset(start, limit) supersedes it.
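The following toy model makes this contract concrete for the `MemoryStream` change under discussion. It uses simplified stand-ins (`ToyOffset`, `ToyReadLimit`, `AdmissionControlledStream`, `SupportsAvailableNow`, `ToyMemoryStream`) in place of Spark's real `SupportsAdmissionControl`, `ReadLimit` and `SupportsTriggerAvailableNow` interfaces, so it is an illustration and not Spark's implementation. The engine calls only `latestOffset(start, limit)`, so the no-arg `latestOffset()` can throw. `prepareForTriggerAvailableNow()` pins the end offset once, so records appended afterwards are excluded from the run.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-ins for Spark's offset and read-limit types (not Spark APIs).
final case class ToyOffset(value: Long)

sealed trait ToyReadLimit
case object AllAvailable extends ToyReadLimit
final case class MaxRows(rows: Long) extends ToyReadLimit

// A source with admission control: only the two-argument latestOffset is called,
// so the no-arg variant is unreachable and may throw.
trait AdmissionControlledStream {
  def initialOffset: ToyOffset
  def latestOffset(start: ToyOffset, limit: ToyReadLimit): ToyOffset
  def latestOffset(): ToyOffset =
    throw new IllegalStateException("Should not reach here!")
}

// Models native Trigger.AvailableNow support.
trait SupportsAvailableNow extends AdmissionControlledStream {
  def prepareForTriggerAvailableNow(): Unit
}

// Toy in-memory source: offsets are indexes of appended records (-1 means empty).
class ToyMemoryStream extends SupportsAvailableNow {
  private val buffer = ArrayBuffer.empty[String]
  private var availableNowEndOffset: Option[ToyOffset] = None

  def addData(rows: String*): Unit = synchronized { buffer ++= rows; () }

  override def initialOffset: ToyOffset = ToyOffset(-1)

  override def prepareForTriggerAvailableNow(): Unit = synchronized {
    // Pin the end offset once; records appended later are not part of this run.
    availableNowEndOffset = Some(latestOffset(initialOffset, AllAvailable))
  }

  override def latestOffset(start: ToyOffset, limit: ToyReadLimit): ToyOffset = synchronized {
    val current = buffer.size - 1L
    val admitted = limit match {
      case AllAvailable => current
      case MaxRows(n)   => math.min(current, start.value + n)
    }
    // Never report an offset beyond the pinned end offset, if one is set.
    ToyOffset(availableNowEndOffset.fold(admitted)(end => math.min(admitted, end.value)))
  }
}
```

Until `prepareForTriggerAvailableNow()` is called, `availableNowEndOffset` is `None`, and the toy source behaves like an ordinary admission-controlled stream.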




[GitHub] [spark] anishshri-db commented on a diff in pull request #42940: [SPARK-45178][SS] Fallback to execute a single batch for Trigger.AvailableNow with unsupported sources rather than using wrapper

Posted by "anishshri-db (via GitHub)" <gi...@apache.org>.
anishshri-db commented on code in PR #42940:
URL: https://github.com/apache/spark/pull/42940#discussion_r1329313595


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala:
##########
@@ -2180,6 +2180,17 @@ object SQLConf {
       .booleanConf
       .createWithDefault(true)
 
+  val STREAMING_TRIGGER_AVAILABLE_NOW_WRAPPER_ENABLED =
+    buildConf("spark.sql.streaming.triggerAvailableNowWrapper.enabled")
+      .internal()
+      .doc("Whether to use the wrapper implementation of Trigger.AvailableNow if the source " +
+        "does not support Trigger.AvailableNow. Enabling this allows the benefits of " +
+        "Trigger.AvailableNow with sources which don't support it, but some sources " +
+        "may show unexpected behavior including duplication, data loss, etc. So use with " +
+        "extreme care! The ideal direction is to persuade developers of source(s) to " +
+        "support Trigger.AvailableNow.")

Review Comment:
   Is there any way to expedite this? Any way to force developers to upgrade their sources to be compatible with the latest Spark requirements?



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala:
##########
@@ -2180,6 +2180,17 @@ object SQLConf {
       .booleanConf
       .createWithDefault(true)
 
+  val STREAMING_TRIGGER_AVAILABLE_NOW_WRAPPER_ENABLED =
+    buildConf("spark.sql.streaming.triggerAvailableNowWrapper.enabled")
+      .internal()
+      .doc("Whether to use the wrapper implementation of Trigger.AvailableNow if the source " +
+        "does not support Trigger.AvailableNow. Enabling this allows the benefits of " +
+        "Trigger.AvailableNow with sources which don't support it, but some sources " +
+        "may show unexpected behavior including duplication, data loss, etc. So use with " +

Review Comment:
   When would this cause data loss? Given there are so many dangers, is it worthwhile to expose this if we cannot reason about correctness? Can we perhaps limit its use to certain situations to prevent users from shooting themselves in the foot?




[GitHub] [spark] HeartSaVioR commented on pull request #42940: [SPARK-45178][SS] Fallback to execute a single batch for Trigger.AvailableNow with unsupported sources rather than using wrapper

Posted by "HeartSaVioR (via GitHub)" <gi...@apache.org>.
HeartSaVioR commented on PR #42940:
URL: https://github.com/apache/spark/pull/42940#issuecomment-1726600452

   > As we are already in the status that sources not supporting Trigger.AvailableNow can work with the wrapper instead of simply failing since Trigger.Once is deprecated, it sounds reasonable to continue making it work under single batch execution.
   >
   > One option is to stop support of it in 4.0 (as it is deprecated since 3.4), but add fallback logic for previous branches which provide the support for now (as it is related to correctness)?
   
   Removing an API would trigger a long discussion and debate. In my experience with the Spark community (you've been around longer than me :) ), I don't believe the community wants to remove an API unless there is clear proof that few users use it. I think that would probably be Spark 5.0, not 4.0.

   That said, we can't apply more aggressive alternatives. I don't think we can persuade the community to let existing queries fail when users have no way to mitigate it (the fix would require 3rd party data source developers to work on it).



[GitHub] [spark] HeartSaVioR commented on a diff in pull request #42940: [SPARK-45178][SS] Fallback to execute a single batch for Trigger.AvailableNow with unsupported sources rather than using wrapper

Posted by "HeartSaVioR (via GitHub)" <gi...@apache.org>.
HeartSaVioR commented on code in PR #42940:
URL: https://github.com/apache/spark/pull/42940#discussion_r1329416576


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala:
##########
@@ -2180,6 +2180,17 @@ object SQLConf {
       .booleanConf
       .createWithDefault(true)
 
+  val STREAMING_TRIGGER_AVAILABLE_NOW_WRAPPER_ENABLED =
+    buildConf("spark.sql.streaming.triggerAvailableNowWrapper.enabled")
+      .internal()
+      .doc("Whether to use the wrapper implementation of Trigger.AvailableNow if the source " +
+        "does not support Trigger.AvailableNow. Enabling this allows the benefits of " +
+        "Trigger.AvailableNow with sources which don't support it, but some sources " +
+        "may show unexpected behavior including duplication, data loss, etc. So use with " +

Review Comment:
   This new config is marked as "internal", which means it is not going to be part of the public documentation. We expect only very advanced users and/or operators to know about the config and to use it at their own risk. We don't even expose this config in the warning message; that's how I try to prevent users from shooting themselves in the foot.


