Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2022/12/21 07:09:11 UTC

[GitHub] [spark] zhengruifeng opened a new pull request, #39151: [WIP] Introduce a proto message to make plan deterministic

zhengruifeng opened a new pull request, #39151:
URL: https://github.com/apache/spark/pull/39151

   ### What changes were proposed in this pull request?
   
   Introduce a proto message to make plan deterministic
   
   
   ### Why are the changes needed?
   
   To make randomSplit clearer
   
   
   ### Does this PR introduce _any_ user-facing change?
   No
   
   
   ### How was this patch tested?
   ...


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] zhengruifeng commented on pull request #39151: [WIP] Introduce a proto message to make plan deterministic

Posted by GitBox <gi...@apache.org>.
zhengruifeng commented on PR #39151:
URL: https://github.com/apache/spark/pull/39151#issuecomment-1360942354

   cc @cloud-fan 




[GitHub] [spark] zhengruifeng commented on a diff in pull request #39151: [WIP] Introduce a proto message to make plan deterministic

Posted by GitBox <gi...@apache.org>.
zhengruifeng commented on code in PR #39151:
URL: https://github.com/apache/spark/pull/39151#discussion_r1054058148


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -361,6 +363,38 @@ class SparkConnectPlanner(session: SparkSession) {
     }
   }
 
+  private def transformDeterminize(rel: proto.Determinize): LogicalPlan = {
+    if (!rel.hasInput) {
+      throw InvalidPlanInput("Determinize needs a plan input")
+    }
+
+    val input = transformRelation(rel.getInput)
+    if (input.deterministic) {

Review Comment:
   Can we rely on this `deterministic` method?





[GitHub] [spark] zhengruifeng commented on a diff in pull request #39151: [WIP] Introduce a proto message to make plan deterministic

Posted by GitBox <gi...@apache.org>.
zhengruifeng commented on code in PR #39151:
URL: https://github.com/apache/spark/pull/39151#discussion_r1054049998


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -361,6 +363,38 @@ class SparkConnectPlanner(session: SparkSession) {
     }
   }
 
+  private def transformDeterminize(rel: proto.Determinize): LogicalPlan = {
+    if (!rel.hasInput) {
+      throw InvalidPlanInput("Determinize needs a plan input")
+    }
+
+    val input = transformRelation(rel.getInput)
+    if (input.deterministic) {
+      return input
+    }
+
+    val dataset = Dataset.ofRows(session, input)
+    if (dataset.logicalPlan.deterministic || dataset.storageLevel != StorageLevel.NONE) {
+      return dataset.logicalPlan
+    }
+
+    // It is possible that the underlying dataframe doesn't guarantee the ordering of rows in its
+    // constituent partitions each time a split is materialized which could result in
+    // overlapping splits. To prevent this, we explicitly sort each input partition to make the
+    // ordering deterministic. Note that MapTypes cannot be sorted and are explicitly pruned out
+    // from the sort order.
+    val sortOrder = input.output
+      .filter(attr => RowOrdering.isOrderable(attr.dataType))
+      .map(SortOrder(_, Ascending))
+    if (sortOrder.nonEmpty) {
+      Sort(sortOrder, global = false, input)

Review Comment:
   I doubt that the sort here makes the dataset deterministic; maybe we should apply this sorter instead: https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala#L338-L369
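
   To make the concern concrete, here is a toy model in plain Scala, not Spark code. It assumes that a split keeps the i-th row of a partition when the i-th draw of a seeded generator falls inside the split's range, so two splits only stay disjoint if the partition yields its rows in the same order every time it is recomputed. The names `PartitionSortSketch`, `sample` and `split` are hypothetical and exist only for this illustration.

   ```scala
   import scala.util.Random

   object PartitionSortSketch {
     // Toy stand-in for seeded sampling of one partition (an assumption, not Spark's
     // implementation): the i-th row is paired with the i-th draw of a generator
     // seeded with `seed` and kept when that draw falls in [lower, upper).
     def sample(rows: Seq[Int], lower: Double, upper: Double, seed: Long): Seq[Int] = {
       val rng = new Random(seed)
       rows.filter { _ =>
         val draw = rng.nextDouble()
         draw >= lower && draw < upper
       }
     }

     // Builds two complementary splits. `materialize` models recomputing the
     // partition, which may return the same rows in a different order each time.
     def split(materialize: () => Seq[Int], seed: Long): (Seq[Int], Seq[Int]) = {
       val first = sample(materialize(), 0.0, 0.5, seed)
       val second = sample(materialize(), 0.5, 1.0, seed)
       (first, second)
     }

     def main(args: Array[String]): Unit = {
       val rows = (0 until 20).toVector
       val shuffler = new Random()
       // Row order changes on every materialization.
       val unstable = () => shuffler.shuffle(rows)
       // Sorting after each materialization restores a fixed order.
       val stable = () => unstable().sorted

       val (a, b) = split(stable, seed = 42L)
       println(s"sorted: overlap=${a.intersect(b).size}, covered=${(a ++ b).distinct.size}")

       val (c, d) = split(unstable, seed = 42L)
       println(s"unsorted: overlap=${c.intersect(d).size}, covered=${(c ++ d).distinct.size}")
     }
   }
   ```

   With the sorted input, the two splits never overlap and together cover all 20 rows; with the unstable input they may overlap or drop rows. Whether a local `Sort` on the orderable columns gives the same guarantee in Spark is the open question: for example, rows that tie on every orderable column could still change their relative order between materializations.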





[GitHub] [spark] zhengruifeng closed pull request #39151: [WIP] Introduce a proto message to make plan deterministic

Posted by GitBox <gi...@apache.org>.
zhengruifeng closed pull request #39151: [WIP] Introduce a proto message to make plan deterministic
URL: https://github.com/apache/spark/pull/39151




[GitHub] [spark] zhengruifeng commented on pull request #39151: [WIP] Introduce a proto message to make plan deterministic

Posted by GitBox <gi...@apache.org>.
zhengruifeng commented on PR #39151:
URL: https://github.com/apache/spark/pull/39151#issuecomment-1360941255

   Quick tests:
   
   1. cache (after making some private members public)
   
   ```
   
   scala> val df = spark.range(0, 100)
   df: org.apache.spark.sql.Dataset[Long] = [id: bigint]
   
   scala> df.cache()
   res0: df.type = [id: bigint]
   
   scala> val plan = df.logicalPlan
   plan: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
   Range (0, 100, step=1, splits=Some(10))
   
   scala> import org.apache.spark.sql.{Column, Dataset, SparkSession}
   import org.apache.spark.sql.{Column, Dataset, SparkSession}
   
   scala> val df2 = Dataset.ofRows(spark, plan)
   df2: org.apache.spark.sql.DataFrame = [id: bigint]
   
   scala> df.storageLevel
   res1: org.apache.spark.storage.StorageLevel = StorageLevel(disk, memory, deserialized, 1 replicas)
   
   scala> df2.storageLevel
   res2: org.apache.spark.storage.StorageLevel = StorageLevel(disk, memory, deserialized, 1 replicas)
   
   scala>  df == df2
   res3: Boolean = false
   
   scala> df2.explain
   == Physical Plan ==
   InMemoryTableScan [id#0L]
      +- InMemoryRelation [id#0L], StorageLevel(disk, memory, deserialized, 1 replicas)
            +- *(1) Range (0, 100, step=1, splits=10)
   
   
   
   scala> df.explain
   == Physical Plan ==
   InMemoryTableScan [id#0L]
      +- InMemoryRelation [id#0L], StorageLevel(disk, memory, deserialized, 1 replicas)
            +- *(1) Range (0, 100, step=1, splits=10)
   
   ```
   
   2. relation
   
   ```
   scala> val df = spark.range(0, 100)
   df: org.apache.spark.sql.Dataset[Long] = [id: bigint]
   
   scala> df.write.parquet("/tmp/1.pq")
                                                                                   
   scala> val df2 = spark.read.parquet("/tmp/1.pq")
   df2: org.apache.spark.sql.DataFrame = [id: bigint]
   
   scala> df2.queryExecution.logical.deterministic
   res2: Boolean = true
   
   ```

