You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by "viirya (via GitHub)" <gi...@apache.org> on 2023/09/19 07:04:47 UTC

[GitHub] [spark] viirya commented on a diff in pull request #42940: [SPARK-45178][SS] Fallback to execute a single batch for Trigger.AvailableNow with unsupported sources rather than using wrapper

viirya commented on code in PR #42940:
URL: https://github.com/apache/spark/pull/42940#discussion_r1329632673


##########
docs/ss-migration-guide.md:
##########
@@ -26,6 +26,10 @@ Note that this migration guide describes the items specific to Structured Stream
 Many items of SQL migration can be applied when migrating Structured Streaming to higher versions.
 Please refer [Migration Guide: SQL, Datasets and DataFrame](sql-migration-guide.html).
 
+## Upgrading from Structured Streaming 3.5 to 4.0
+
+- Since Spark 4.0, Spark falls back to single batch execution if any source in the query does not support `Trigger.AvailableNow`. This is to avoid any possible correctness, duplication, and dataloss issue due to incompatibility between source and wrapper implementation. (See [SPARK-45178](https://issues.apache.org/jira/browse/SPARK-45178) for more details.)

Review Comment:
   If this is about correctness, do you plan to backport this to 3.4/3.5 branch? 



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AvailableNowDataStreamWrapper.scala:
##########
@@ -28,6 +28,12 @@ import org.apache.spark.sql.connector.read.streaming
 class AvailableNowDataStreamWrapper(val delegate: SparkDataStream)
   extends SparkDataStream with SupportsTriggerAvailableNow with Logging {
 
+  // See SPARK-45178 for more details.
+  logWarning(s"Activating the wrapper implementation of Trigger.AvailableNow for source " +
+    s"[$delegate]. Note that this might introduce possibility of deduplication, dataloss, " +
+    s"correctness issue. Enable the config with extreme care. We strongly recommend to contact " +
+    "the data source developer to support Trigger.AvailableNow.")

Review Comment:
   ```suggestion
     logWarning("Activating the wrapper implementation of Trigger.AvailableNow for source " +
       s"[$delegate]. Note that this might introduce possibility of deduplication, dataloss, " +
       "correctness issue. Enable the config with extreme care. We strongly recommend to contact " +
       "the data source developer to support Trigger.AvailableNow.")
   ```



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala:
##########
@@ -201,7 +207,15 @@ case class MemoryStream[A : Encoder](
 
   override def initialOffset: OffsetV2 = LongOffset(-1)
 
+  override def prepareForTriggerAvailableNow(): Unit = synchronized {
+    availableNowEndOffset = latestOffset(initialOffset, ReadLimit.allAvailable())
+  }
+
   override def latestOffset(): OffsetV2 = {
+    throw new IllegalStateException("Should not reach here!")
+  }

Review Comment:
   Not sure why this cannot be called. Cannot `MemoryStream` be used as a `MicroBatchStream` after this change?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala:
##########
@@ -52,11 +52,46 @@ class MicroBatchExecution(
 
   @volatile protected var sources: Seq[SparkDataStream] = Seq.empty
 
-  protected val triggerExecutor: TriggerExecutor = trigger match {
-    case t: ProcessingTimeTrigger => ProcessingTimeExecutor(t, triggerClock)
-    case OneTimeTrigger => SingleBatchExecutor()
-    case AvailableNowTrigger => MultiBatchExecutor()
-    case _ => throw new IllegalStateException(s"Unknown type of trigger: $trigger")
+  @volatile protected[sql] var triggerExecutor: TriggerExecutor = _
+
+  protected def getTrigger(): TriggerExecutor = {
+    assert(sources.nonEmpty, "sources should have been retrieved from the plan!")
+    trigger match {
+      case t: ProcessingTimeTrigger => ProcessingTimeExecutor(t, triggerClock)
+      case OneTimeTrigger => SingleBatchExecutor()
+      case AvailableNowTrigger =>
+        // When the flag is enabled, Spark will wrap sources which does not support

Review Comment:
   ```suggestion
           // When the flag is enabled, Spark will wrap sources which do not support
   ```



##########
docs/ss-migration-guide.md:
##########
@@ -26,6 +26,10 @@ Note that this migration guide describes the items specific to Structured Stream
 Many items of SQL migration can be applied when migrating Structured Streaming to higher versions.
 Please refer [Migration Guide: SQL, Datasets and DataFrame](sql-migration-guide.html).
 
+## Upgrading from Structured Streaming 3.5 to 4.0
+
+- Since Spark 4.0, Spark falls back to single batch execution if any source in the query does not support `Trigger.AvailableNow`. This is to avoid any possible correctness, duplication, and dataloss issue due to incompatibility between source and wrapper implementation. (See [SPARK-45178](https://issues.apache.org/jira/browse/SPARK-45178) for more details.)

Review Comment:
   For "single batch execution", isn't it deprecated now? Do we guarantee that it could be used in future version?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org