Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2022/12/22 13:39:56 UTC

[GitHub] [spark] beliefer opened a new pull request, #39182: [SPARK-41440][CONNECT][PYTHON] Create a new proto message for `RandomSplit`

beliefer opened a new pull request, #39182:
URL: https://github.com/apache/spark/pull/39182

   ### What changes were proposed in this pull request?
   https://github.com/apache/spark/pull/39017 supported `DataFrame.randomSplit`.
   
   But it has two issues.
   https://github.com/apache/spark/pull/39017 couples Sample on the server with RandomSplit.
   
   https://github.com/apache/spark/pull/39017 also makes Sample cache the plan.
   
   ### Why are the changes needed?
   This PR decouples Sample from RandomSplit.
   This PR also avoids the cache operator for Sample.
   
   ### Does this PR introduce _any_ user-facing change?
   'No'.
   New feature.
   
   
   ### How was this patch tested?
   Tests updated.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] amaliujia commented on pull request #39182: [SPARK-41440][CONNECT][PYTHON] Create a new proto message for `RandomSplit`

Posted by GitBox <gi...@apache.org>.
amaliujia commented on PR #39182:
URL: https://github.com/apache/spark/pull/39182#issuecomment-1363211317

   cc @zhengruifeng @cloud-fan 




[GitHub] [spark] beliefer commented on a diff in pull request #39182: [SPARK-41440][CONNECT][PYTHON] Create a new proto message for `RandomSplit`

Posted by GitBox <gi...@apache.org>.
beliefer commented on code in PR #39182:
URL: https://github.com/apache/spark/pull/39182#discussion_r1056347536


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -126,39 +128,37 @@ class SparkConnectPlanner(session: SparkSession) {
     SubqueryAlias(aliasIdentifier, transformRelation(alias.getInput))
   }
 
+  private def transformDeterministicOrder(rel: proto.DeterministicOrder): LogicalPlan = {
+    val input = Dataset.ofRows(session, transformRelation(rel.getInput))
+    // It is possible that the underlying dataframe doesn't guarantee the ordering of rows in its
+    // constituent partitions each time a split is materialized which could result in
+    // overlapping splits. To prevent this, we explicitly sort each input partition to make the
+    // ordering deterministic. Note that MapTypes cannot be sorted and are explicitly pruned out
+    // from the sort order.
+    val sortOrder = input.logicalPlan.output
+      .filter(attr => RowOrdering.isOrderable(attr.dataType))
+      .map(SortOrder(_, Ascending))
+    if (sortOrder.nonEmpty) {
+      Sort(sortOrder, global = false, input.logicalPlan)
+    } else {
+      input.cache()
+      input.logicalPlan
+    }
+  }
+
   /**
    * All fields of [[proto.Sample]] are optional. However, given those are proto primitive types,
    * we cannot differentiate if the field is not or set when the field's value equals to the type
    * default value. In the future if this ever become a problem, one solution could be that to
    * wrap such fields into proto messages.
    */
   private def transformSample(rel: proto.Sample): LogicalPlan = {
-    val input = Dataset.ofRows(session, transformRelation(rel.getInput))
-    val plan = if (rel.getForceStableSort) {

Review Comment:
   When using DeterministicOrder, if we can apply a sort, we do not need the cache; otherwise, we need the cache.





[GitHub] [spark] beliefer commented on pull request #39182: [SPARK-41440][CONNECT][PYTHON] Create a new proto message for `RandomSplit`

Posted by GitBox <gi...@apache.org>.
beliefer commented on PR #39182:
URL: https://github.com/apache/spark/pull/39182#issuecomment-1365734322

   This PR is replaced by https://github.com/apache/spark/pull/39240




[GitHub] [spark] cloud-fan commented on a diff in pull request #39182: [SPARK-41440][CONNECT][PYTHON] Create a new proto message for `RandomSplit`

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on code in PR #39182:
URL: https://github.com/apache/spark/pull/39182#discussion_r1056156387


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -126,39 +128,37 @@ class SparkConnectPlanner(session: SparkSession) {
     SubqueryAlias(aliasIdentifier, transformRelation(alias.getInput))
   }
 
+  private def transformDeterministicOrder(rel: proto.DeterministicOrder): LogicalPlan = {
+    val input = Dataset.ofRows(session, transformRelation(rel.getInput))
+    // It is possible that the underlying dataframe doesn't guarantee the ordering of rows in its
+    // constituent partitions each time a split is materialized which could result in
+    // overlapping splits. To prevent this, we explicitly sort each input partition to make the
+    // ordering deterministic. Note that MapTypes cannot be sorted and are explicitly pruned out
+    // from the sort order.
+    val sortOrder = input.logicalPlan.output
+      .filter(attr => RowOrdering.isOrderable(attr.dataType))
+      .map(SortOrder(_, Ascending))
+    if (sortOrder.nonEmpty) {
+      Sort(sortOrder, global = false, input.logicalPlan)
+    } else {
+      input.cache()
+      input.logicalPlan
+    }
+  }
+
   /**
    * All fields of [[proto.Sample]] are optional. However, given those are proto primitive types,
    * we cannot differentiate if the field is not or set when the field's value equals to the type
    * default value. In the future if this ever become a problem, one solution could be that to
    * wrap such fields into proto messages.
    */
   private def transformSample(rel: proto.Sample): LogicalPlan = {
-    val input = Dataset.ofRows(session, transformRelation(rel.getInput))
-    val plan = if (rel.getForceStableSort) {

Review Comment:
   then how do we choose between sort and cache?





[GitHub] [spark] cloud-fan commented on a diff in pull request #39182: [SPARK-41440][CONNECT][PYTHON] Create a new proto message for `RandomSplit`

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on code in PR #39182:
URL: https://github.com/apache/spark/pull/39182#discussion_r1057418565


##########
connector/connect/common/src/main/protobuf/spark/connect/relations.proto:
##########
@@ -378,10 +379,6 @@ message Sample {
 
   // (Optional) The random seed.
   optional int64 seed = 5;
-
-  // (Optional) Explicitly sort the underlying plan to make the ordering deterministic.
-  // This flag is only used to randomly splits DataFrame with the provided weights.
-  optional bool force_stable_sort = 6;

Review Comment:
   I think this PR makes more changes than simply adding a new proto message. It makes me think that we don't really need a new proto message.
   
   We can just have a `deterministic_order` bool flag in the `Sample` message. If it's false, then it's a normal sample. If it's true, we try to sort first; if we can't sort, we cache the dataframe.
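
The decision described in this comment can be sketched in plain Python. This is only an illustration, not the planner code: `plan_sample`, `SamplePlan`, and the step names are hypothetical, and the sketch assumes that map-typed columns are the only non-orderable ones.

```python
from dataclasses import dataclass
from typing import List, Tuple

# Assumption for this sketch: only map-typed columns cannot be sorted.
NON_ORDERABLE_TYPES = {"map"}


@dataclass
class SamplePlan:
    # Hypothetical list of operators applied to the input, in order.
    steps: List[str]


def plan_sample(columns: List[Tuple[str, str]], deterministic_order: bool) -> SamplePlan:
    """Pick the operators to run for a sample, given (name, type) columns."""
    if not deterministic_order:
        # Normal sample: no extra work on the input.
        return SamplePlan(["sample"])
    sortable = [name for name, dtype in columns if dtype not in NON_ORDERABLE_TYPES]
    if sortable:
        # At least one orderable column: sort each partition, no cache needed.
        return SamplePlan([f"sort_within_partitions({', '.join(sortable)})", "sample"])
    # Nothing can be sorted: fall back to caching the input.
    return SamplePlan(["cache", "sample"])
```

With columns `[("id", "long"), ("tags", "map")]`, the flag set to true yields a per-partition sort on `id` followed by the sample; with only `("tags", "map")`, it yields cache followed by the sample.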





[GitHub] [spark] beliefer commented on a diff in pull request #39182: [SPARK-41440][CONNECT][PYTHON] Create a new proto message for `RandomSplit`

Posted by GitBox <gi...@apache.org>.
beliefer commented on code in PR #39182:
URL: https://github.com/apache/spark/pull/39182#discussion_r1056127197


##########
connector/connect/common/src/main/protobuf/spark/connect/relations.proto:
##########
@@ -598,3 +595,10 @@ message ToSchema {
   // The Sever side will update the dataframe with this schema.
   DataType schema = 2;
 }
+
+// Explicitly sort the underlying plan to make the ordering deterministic for Sample.
+// Note: this message is only used to randomly splits DataFrame with the provided weights.
+message DeterministicOrder {

Review Comment:
   `DeterministicOrder` is used only for explicit ordering. It signals that the underlying dataframe doesn't guarantee the ordering of rows.





[GitHub] [spark] beliefer commented on a diff in pull request #39182: [SPARK-41440][CONNECT][PYTHON] Create a new proto message for `RandomSplit`

Posted by GitBox <gi...@apache.org>.
beliefer commented on code in PR #39182:
URL: https://github.com/apache/spark/pull/39182#discussion_r1057544972


##########
connector/connect/common/src/main/protobuf/spark/connect/relations.proto:
##########
@@ -378,10 +379,6 @@ message Sample {
 
   // (Optional) The random seed.
   optional int64 seed = 5;
-
-  // (Optional) Explicitly sort the underlying plan to make the ordering deterministic.
-  // This flag is only used to randomly splits DataFrame with the provided weights.
-  optional bool force_stable_sort = 6;

Review Comment:
   Please see https://github.com/apache/spark/pull/39240





[GitHub] [spark] beliefer commented on a diff in pull request #39182: [SPARK-41440][CONNECT][PYTHON] Create a new proto message for `RandomSplit`

Posted by GitBox <gi...@apache.org>.
beliefer commented on code in PR #39182:
URL: https://github.com/apache/spark/pull/39182#discussion_r1057505509


##########
connector/connect/common/src/main/protobuf/spark/connect/relations.proto:
##########
@@ -378,10 +379,6 @@ message Sample {
 
   // (Optional) The random seed.
   optional int64 seed = 5;
-
-  // (Optional) Explicitly sort the underlying plan to make the ordering deterministic.
-  // This flag is only used to randomly splits DataFrame with the provided weights.
-  optional bool force_stable_sort = 6;

Review Comment:
   OK







[GitHub] [spark] zhengruifeng commented on a diff in pull request #39182: [SPARK-41440][CONNECT][PYTHON] Create a new proto message for `RandomSplit`

Posted by GitBox <gi...@apache.org>.
zhengruifeng commented on code in PR #39182:
URL: https://github.com/apache/spark/pull/39182#discussion_r1057499120


##########
connector/connect/common/src/main/protobuf/spark/connect/relations.proto:
##########
@@ -378,10 +379,6 @@ message Sample {
 
   // (Optional) The random seed.
   optional int64 seed = 5;
-
-  // (Optional) Explicitly sort the underlying plan to make the ordering deterministic.
-  // This flag is only used to randomly splits DataFrame with the provided weights.
-  optional bool force_stable_sort = 6;

Review Comment:
   OK. @beliefer, let's follow @cloud-fan's suggestion.





[GitHub] [spark] beliefer closed pull request #39182: [SPARK-41440][CONNECT][PYTHON] Create a new proto message for `RandomSplit`

Posted by GitBox <gi...@apache.org>.
beliefer closed pull request #39182: [SPARK-41440][CONNECT][PYTHON] Create a new proto message for `RandomSplit`
URL: https://github.com/apache/spark/pull/39182




[GitHub] [spark] beliefer commented on a diff in pull request #39182: [SPARK-41440][CONNECT][PYTHON] Create a new proto message for `RandomSplit`

Posted by GitBox <gi...@apache.org>.
beliefer commented on code in PR #39182:
URL: https://github.com/apache/spark/pull/39182#discussion_r1056347536


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -126,39 +128,37 @@ class SparkConnectPlanner(session: SparkSession) {
     SubqueryAlias(aliasIdentifier, transformRelation(alias.getInput))
   }
 
+  private def transformDeterministicOrder(rel: proto.DeterministicOrder): LogicalPlan = {
+    val input = Dataset.ofRows(session, transformRelation(rel.getInput))
+    // It is possible that the underlying dataframe doesn't guarantee the ordering of rows in its
+    // constituent partitions each time a split is materialized which could result in
+    // overlapping splits. To prevent this, we explicitly sort each input partition to make the
+    // ordering deterministic. Note that MapTypes cannot be sorted and are explicitly pruned out
+    // from the sort order.
+    val sortOrder = input.logicalPlan.output
+      .filter(attr => RowOrdering.isOrderable(attr.dataType))
+      .map(SortOrder(_, Ascending))
+    if (sortOrder.nonEmpty) {
+      Sort(sortOrder, global = false, input.logicalPlan)
+    } else {
+      input.cache()
+      input.logicalPlan
+    }
+  }
+
   /**
    * All fields of [[proto.Sample]] are optional. However, given those are proto primitive types,
    * we cannot differentiate if the field is not or set when the field's value equals to the type
    * default value. In the future if this ever become a problem, one solution could be that to
    * wrap such fields into proto messages.
    */
   private def transformSample(rel: proto.Sample): LogicalPlan = {
-    val input = Dataset.ofRows(session, transformRelation(rel.getInput))
-    val plan = if (rel.getForceStableSort) {

Review Comment:
   When using `DeterministicOrder`, if a sort can be applied, no cache is needed; otherwise, a cache is needed.








[GitHub] [spark] beliefer commented on a diff in pull request #39182: [SPARK-41440][CONNECT][PYTHON] Create a new proto message for `RandomSplit`

Posted by GitBox <gi...@apache.org>.
beliefer commented on code in PR #39182:
URL: https://github.com/apache/spark/pull/39182#discussion_r1056037625


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -126,39 +128,37 @@ class SparkConnectPlanner(session: SparkSession) {
     SubqueryAlias(aliasIdentifier, transformRelation(alias.getInput))
   }
 
+  private def transformDeterministicOrder(rel: proto.DeterministicOrder): LogicalPlan = {
+    val input = Dataset.ofRows(session, transformRelation(rel.getInput))
+    // It is possible that the underlying dataframe doesn't guarantee the ordering of rows in its
+    // constituent partitions each time a split is materialized which could result in
+    // overlapping splits. To prevent this, we explicitly sort each input partition to make the
+    // ordering deterministic. Note that MapTypes cannot be sorted and are explicitly pruned out
+    // from the sort order.
+    val sortOrder = input.logicalPlan.output
+      .filter(attr => RowOrdering.isOrderable(attr.dataType))
+      .map(SortOrder(_, Ascending))
+    if (sortOrder.nonEmpty) {
+      Sort(sortOrder, global = false, input.logicalPlan)
+    } else {
+      input.cache()
+      input.logicalPlan
+    }
+  }
+
   /**
    * All fields of [[proto.Sample]] are optional. However, given those are proto primitive types,
    * we cannot differentiate if the field is not or set when the field's value equals to the type
    * default value. In the future if this ever become a problem, one solution could be that to
    * wrap such fields into proto messages.
    */
   private def transformSample(rel: proto.Sample): LogicalPlan = {
-    val input = Dataset.ofRows(session, transformRelation(rel.getInput))
-    val plan = if (rel.getForceStableSort) {

Review Comment:
   We use the `DeterministicOrder` message instead of `force stable sort`.





[GitHub] [spark] cloud-fan commented on a diff in pull request #39182: [SPARK-41440][CONNECT][PYTHON] Create a new proto message for `RandomSplit`

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on code in PR #39182:
URL: https://github.com/apache/spark/pull/39182#discussion_r1056014283


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -126,39 +128,37 @@ class SparkConnectPlanner(session: SparkSession) {
     SubqueryAlias(aliasIdentifier, transformRelation(alias.getInput))
   }
 
+  private def transformDeterministicOrder(rel: proto.DeterministicOrder): LogicalPlan = {
+    val input = Dataset.ofRows(session, transformRelation(rel.getInput))
+    // It is possible that the underlying dataframe doesn't guarantee the ordering of rows in its
+    // constituent partitions each time a split is materialized which could result in
+    // overlapping splits. To prevent this, we explicitly sort each input partition to make the
+    // ordering deterministic. Note that MapTypes cannot be sorted and are explicitly pruned out
+    // from the sort order.
+    val sortOrder = input.logicalPlan.output
+      .filter(attr => RowOrdering.isOrderable(attr.dataType))
+      .map(SortOrder(_, Ascending))
+    if (sortOrder.nonEmpty) {
+      Sort(sortOrder, global = false, input.logicalPlan)
+    } else {
+      input.cache()
+      input.logicalPlan
+    }
+  }
+
   /**
    * All fields of [[proto.Sample]] are optional. However, given those are proto primitive types,
    * we cannot differentiate if the field is not or set when the field's value equals to the type
    * default value. In the future if this ever become a problem, one solution could be that to
    * wrap such fields into proto messages.
    */
   private def transformSample(rel: proto.Sample): LogicalPlan = {
-    val input = Dataset.ofRows(session, transformRelation(rel.getInput))
-    val plan = if (rel.getForceStableSort) {

Review Comment:
   we don't need the `force stable sort` anymore?





[GitHub] [spark] amaliujia commented on a diff in pull request #39182: [SPARK-41440][CONNECT][PYTHON] Create a new proto message for `RandomSplit`

Posted by GitBox <gi...@apache.org>.
amaliujia commented on code in PR #39182:
URL: https://github.com/apache/spark/pull/39182#discussion_r1056016683


##########
connector/connect/common/src/main/protobuf/spark/connect/relations.proto:
##########
@@ -598,3 +595,10 @@ message ToSchema {
   // The Sever side will update the dataframe with this schema.
   DataType schema = 2;
 }
+
+// Explicitly sort the underlying plan to make the ordering deterministic for Sample.
+// Note: this message is only used to randomly splits DataFrame with the provided weights.
+message DeterministicOrder {

Review Comment:
   Maybe also document that when there is no explicit ordering, the input plan is cached?





[GitHub] [spark] amaliujia commented on a diff in pull request #39182: [SPARK-41440][CONNECT][PYTHON] Create a new proto message for `RandomSplit`

Posted by GitBox <gi...@apache.org>.
amaliujia commented on code in PR #39182:
URL: https://github.com/apache/spark/pull/39182#discussion_r1056714950


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -126,39 +128,37 @@ class SparkConnectPlanner(session: SparkSession) {
     SubqueryAlias(aliasIdentifier, transformRelation(alias.getInput))
   }
 
+  private def transformDeterministicOrder(rel: proto.DeterministicOrder): LogicalPlan = {
+    val input = Dataset.ofRows(session, transformRelation(rel.getInput))
+    // It is possible that the underlying dataframe doesn't guarantee the ordering of rows in its
+    // constituent partitions each time a split is materialized which could result in
+    // overlapping splits. To prevent this, we explicitly sort each input partition to make the
+    // ordering deterministic. Note that MapTypes cannot be sorted and are explicitly pruned out
+    // from the sort order.
+    val sortOrder = input.logicalPlan.output
+      .filter(attr => RowOrdering.isOrderable(attr.dataType))
+      .map(SortOrder(_, Ascending))
+    if (sortOrder.nonEmpty) {
+      Sort(sortOrder, global = false, input.logicalPlan)
+    } else {
+      input.cache()
+      input.logicalPlan
+    }
+  }
+
   /**
    * All fields of [[proto.Sample]] are optional. However, given those are proto primitive types,
    * we cannot differentiate if the field is not or set when the field's value equals to the type
    * default value. In the future if this ever become a problem, one solution could be that to
    * wrap such fields into proto messages.
    */
   private def transformSample(rel: proto.Sample): LogicalPlan = {
-    val input = Dataset.ofRows(session, transformRelation(rel.getInput))
-    val plan = if (rel.getForceStableSort) {

Review Comment:
   Yeah, the difference is whether any sort order is available. I am guessing that is because, if there is a sort order, we can always sort to guarantee the output, so there is no need to cache.
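
Why a sort makes the splits stable can be shown with a toy model in plain Python. It assumes each split re-reads the rows, draws one seeded random number per row in arrival order, and keeps the rows whose draw falls in that split's weight range. `split_partition` and the sample data are hypothetical and only mimic the idea described in the code comment of the diff above.

```python
import random
from typing import List


def split_partition(rows: List[int], lower: float, upper: float, seed: int) -> List[int]:
    """Keep rows whose seeded draw, taken in arrival order, falls in [lower, upper)."""
    rng = random.Random(seed)
    return [row for row in rows if lower <= rng.random() < upper]


# The same rows, arriving in a different order on each materialization.
rows_first_pass = [3, 1, 4, 5, 9, 2, 6, 8, 7, 0]
rows_second_pass = list(reversed(rows_first_pass))

# Without a sort, a row gets a different draw on each pass, so the two
# splits are not guaranteed to be disjoint or to cover every row.
a_unsorted = split_partition(rows_first_pass, 0.0, 0.5, seed=42)
b_unsorted = split_partition(rows_second_pass, 0.5, 1.0, seed=42)

# With a sort, every pass sees the same order, so each row always gets the
# same draw and lands in exactly one split.
a_sorted = split_partition(sorted(rows_first_pass), 0.0, 0.5, seed=42)
b_sorted = split_partition(sorted(rows_second_pass), 0.5, 1.0, seed=42)

assert set(a_sorted).isdisjoint(b_sorted)
assert sorted(a_sorted + b_sorted) == sorted(rows_first_pass)
```

Caching reaches the same goal differently: it pins one materialization of the rows, so every split sees the same order even when no column can be sorted.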


