Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2022/02/04 01:47:36 UTC

[GitHub] [spark] HeartSaVioR commented on a change in pull request #35374: [SPARK-34183][SS] DataSource V2: Required distribution and ordering in micro-batch execution

HeartSaVioR commented on a change in pull request #35374:
URL: https://github.com/apache/spark/pull/35374#discussion_r799106086



##########
File path: sql/core/src/test/scala/org/apache/spark/sql/connector/WriteDistributionAndOrderingSuite.scala
##########
@@ -712,15 +831,84 @@ class WriteDistributionAndOrderingSuite
     }
   }
 
+  private def checkMicroBatchWriteRequirements(
+      tableDistribution: Distribution,
+      tableOrdering: Array[SortOrder],
+      tableNumPartitions: Option[Int],
+      expectedWritePartitioning: physical.Partitioning,
+      expectedWriteOrdering: Seq[catalyst.expressions.SortOrder],
+      writeTransform: DataFrame => DataFrame = df => df,
+      outputMode: String = "append",
+      expectAnalysisException: Boolean = false): Unit = {
+
+    catalog.createTable(ident, schema, Array.empty, emptyProps, tableDistribution,
+      tableOrdering, tableNumPartitions)
+
+    withTempDir { checkpointDir =>
+      val inputData = MemoryStream[(Long, String)]
+      val inputDF = inputData.toDF().toDF("id", "data")
+
+      val queryDF = outputMode match {
+        case "append" =>
+          inputDF
+        case "complete" =>

Review comment:
       Let's address "update" mode as well; its semantics are ill-defined, but there are users who tolerate that because of latency requirements.
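   
   As a rough illustration, the `outputMode` match in this helper could grow an "update" branch next to "append" and "complete". A minimal sketch, assuming a `groupBy`/`max` aggregation only because update mode needs a stateful operator to exercise anything; this is not necessarily the PR's actual test code:
   
       import org.apache.spark.sql.functions.max
   
       val queryDF = outputMode match {
         case "append" =>
           inputDF
         case "complete" =>
           // complete mode requires an aggregation; max keeps the schema
           inputDF.groupBy("id").agg(max("data").as("data"))
         case "update" =>
           // update mode emits only the aggregate rows changed in this
           // micro-batch, so it also needs a stateful operator
           inputDF.groupBy("id").agg(max("data").as("data"))
       }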

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2Writes.scala
##########
@@ -77,6 +81,36 @@ object V2Writes extends Rule[LogicalPlan] with PredicateHelper {
       }
       val newQuery = DistributionAndOrderingUtils.prepareQuery(write, query, conf)
       o.copy(write = Some(write), query = newQuery)
+
+    case WriteToMicroBatchDataSource(
+        relation, table, query, queryId, writeOptions, outputMode, Some(batchId)) =>
+
+      val writeBuilder = newWriteBuilder(table, query, writeOptions, queryId)
+      val write = buildWriteForMicroBatch(table, writeBuilder, outputMode)
+      val microBatchWrite = new MicroBatchWrite(batchId, write.toStreaming)
+      val customMetrics = write.supportedCustomMetrics.toSeq
+      val newQuery = DistributionAndOrderingUtils.prepareQuery(write, query, conf)
+      WriteToDataSourceV2(relation, microBatchWrite, newQuery, customMetrics)
+  }
+
+  private def buildWriteForMicroBatch(
+      table: SupportsWrite,
+      writeBuilder: WriteBuilder,
+      outputMode: OutputMode): Write = {
+
+    outputMode match {
+      case Append =>
+        writeBuilder.build()
+      case Complete =>
+        // TODO: we should do this check earlier when we have capability API.

Review comment:
       I agree it's beyond this PR.
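   
   For context, without a capability API this check can only happen at write-building time. A sketch of the full method under assumed imports (`InternalOutputModes`, `SupportsTruncate`, `SupportsStreamingUpdateAsAppend`); the Update branch and error messages are illustrative, not necessarily what this PR merges:
   
       import org.apache.spark.sql.catalyst.streaming.InternalOutputModes.{Append, Complete, Update}
       import org.apache.spark.sql.connector.catalog.SupportsWrite
       import org.apache.spark.sql.connector.write.{SupportsTruncate, Write, WriteBuilder}
       import org.apache.spark.sql.internal.connector.SupportsStreamingUpdateAsAppend
       import org.apache.spark.sql.streaming.OutputMode
   
       private def buildWriteForMicroBatch(
           table: SupportsWrite,
           writeBuilder: WriteBuilder,
           outputMode: OutputMode): Write = {
         outputMode match {
           case Append =>
             writeBuilder.build()
           case Complete =>
             // complete mode rewrites the table on every micro-batch, so
             // the sink must be able to truncate; the check is deferred
             // to here until a capability API exists
             require(writeBuilder.isInstanceOf[SupportsTruncate],
               s"${table.name} does not support Complete mode")
             writeBuilder.asInstanceOf[SupportsTruncate].truncate().build()
           case Update =>
             // assumed: sinks opt in to update mode by handling updates
             // as appends via the SupportsStreamingUpdateAsAppend trait
             require(writeBuilder.isInstanceOf[SupportsStreamingUpdateAsAppend],
               s"${table.name} does not support Update mode")
             writeBuilder.build()
         }
       }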

##########
File path: sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/RequiresDistributionAndOrdering.java
##########
@@ -35,6 +35,11 @@
    * Spark will distribute incoming records across partitions to satisfy the required distribution
    * before passing the records to the data source table on write.
    * <p>
+   * Batch and micro-batch writes can request a particular data distribution.

Review comment:
       > We can add some validation and throw an exception for continuous execution (can be a separate PR?).
   
   +1
   If it's easy to do, I'd prefer doing it in this PR. We tend to have long gaps between PRs, even across releases.
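   
   A sketch of what such a guard could look like; where the check lives, the `UnspecifiedDistribution` test, and the message wording are all assumptions, not code from this PR:
   
       import org.apache.spark.sql.AnalysisException
       import org.apache.spark.sql.connector.distributions.UnspecifiedDistribution
       import org.apache.spark.sql.connector.write.{RequiresDistributionAndOrdering, Write}
   
       // continuous execution cannot shuffle or sort the incoming stream, so
       // fail fast if the sink declares a required distribution or ordering
       def validateContinuousWrite(write: Write, tableName: String): Unit =
         write match {
           case w: RequiresDistributionAndOrdering
               if !w.requiredDistribution.isInstanceOf[UnspecifiedDistribution] ||
                 w.requiredOrdering.nonEmpty =>
             throw new AnalysisException(s"Table $tableName requires a specific " +
               "distribution and ordering of incoming data, which is not " +
               "supported in continuous execution")
           case _ => // nothing to validate
         }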




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


