Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2022/12/10 11:58:52 UTC

[GitHub] [spark] beliefer opened a new pull request, #39017: [SPARK-41440][CONNECT][PYTHON] Implement `DataFrame.randomSplit`

beliefer opened a new pull request, #39017:
URL: https://github.com/apache/spark/pull/39017

   ### What changes were proposed in this pull request?
   Implement `DataFrame.randomSplit` with a proto message.
   
   Implement `DataFrame.randomSplit` for the Scala API.
   Implement `DataFrame.randomSplit` for the Python API.
   
   
   ### Why are the changes needed?
   For Connect API coverage.
   
   
   ### Does this PR introduce _any_ user-facing change?
   'No'. New API
   
   
   ### How was this patch tested?
   New test cases.
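   
   For context, a minimal usage sketch of the new API (the `spark` session object and data are assumed for illustration, not part of this PR):
   
   ```python
   # Split a DataFrame 80/20 with a fixed seed.
   df = spark.range(100)
   train, test = df.randomSplit([0.8, 0.2], seed=42)
   # The splits are disjoint and together cover the input.
   assert train.count() + test.count() == 100
   ```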
   




[GitHub] [spark] cloud-fan commented on a diff in pull request #39017: [SPARK-41440][CONNECT][PYTHON] Implement `DataFrame.randomSplit`

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on code in PR #39017:
URL: https://github.com/apache/spark/pull/39017#discussion_r1053961520


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -132,12 +132,32 @@ class SparkConnectPlanner(session: SparkSession) {
    * wrap such fields into proto messages.
    */
   private def transformSample(rel: proto.Sample): LogicalPlan = {
+    val input = Dataset.ofRows(session, transformRelation(rel.getInput))
+    val plan = if (rel.getForceStableSort) {
+      // It is possible that the underlying dataframe doesn't guarantee the ordering of rows in its
+      // constituent partitions each time a split is materialized which could result in
+      // overlapping splits. To prevent this, we explicitly sort each input partition to make the
+      // ordering deterministic. Note that MapTypes cannot be sorted and are explicitly pruned out
+      // from the sort order.
+      val sortOrder = input.logicalPlan.output
+        .filter(attr => RowOrdering.isOrderable(attr.dataType))
+        .map(SortOrder(_, Ascending))
+      if (sortOrder.nonEmpty) {
+        Sort(sortOrder, global = false, input.logicalPlan)
+      } else {
+        input.logicalPlan
+      }
+    } else {
+      input.cache()

Review Comment:
   This code path is for `df.sample` as well, does it need cache too?





[GitHub] [spark] zhengruifeng commented on a diff in pull request #39017: [SPARK-41440][CONNECT][PYTHON] Implement `DataFrame.randomSplit`

Posted by GitBox <gi...@apache.org>.
zhengruifeng commented on code in PR #39017:
URL: https://github.com/apache/spark/pull/39017#discussion_r1053990188


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -132,12 +132,32 @@ class SparkConnectPlanner(session: SparkSession) {
    * wrap such fields into proto messages.
    */
   private def transformSample(rel: proto.Sample): LogicalPlan = {
+    val input = Dataset.ofRows(session, transformRelation(rel.getInput))
+    val plan = if (rel.getForceStableSort) {
+      // It is possible that the underlying dataframe doesn't guarantee the ordering of rows in its
+      // constituent partitions each time a split is materialized which could result in
+      // overlapping splits. To prevent this, we explicitly sort each input partition to make the
+      // ordering deterministic. Note that MapTypes cannot be sorted and are explicitly pruned out
+      // from the sort order.
+      val sortOrder = input.logicalPlan.output
+        .filter(attr => RowOrdering.isOrderable(attr.dataType))
+        .map(SortOrder(_, Ascending))
+      if (sortOrder.nonEmpty) {
+        Sort(sortOrder, global = false, input.logicalPlan)
+      } else {
+        input.logicalPlan
+      }
+    } else {
+      input.cache()

Review Comment:
   ok, let me have a try





[GitHub] [spark] HyukjinKwon closed pull request #39017: [SPARK-41440][CONNECT][PYTHON] Implement `DataFrame.randomSplit`

Posted by GitBox <gi...@apache.org>.
HyukjinKwon closed pull request #39017: [SPARK-41440][CONNECT][PYTHON] Implement `DataFrame.randomSplit`
URL: https://github.com/apache/spark/pull/39017




[GitHub] [spark] beliefer commented on pull request #39017: [SPARK-41440][CONNECT][PYTHON] Implement `DataFrame.randomSplit`

Posted by GitBox <gi...@apache.org>.
beliefer commented on PR #39017:
URL: https://github.com/apache/spark/pull/39017#issuecomment-1345491299

   ping @zhengruifeng @amaliujia @HyukjinKwon 




[GitHub] [spark] beliefer commented on a diff in pull request #39017: [SPARK-41440][CONNECT][PYTHON] Implement `DataFrame.randomSplit`

Posted by GitBox <gi...@apache.org>.
beliefer commented on code in PR #39017:
URL: https://github.com/apache/spark/pull/39017#discussion_r1046787533


##########
python/pyspark/sql/connect/dataframe.py:
##########
@@ -875,6 +877,60 @@ def to_jcols(
 
     melt = unpivot
 
+    def randomSplit(
+        self,
+        weights: List[float],
+        seed: Optional[int] = None,
+    ) -> List["DataFrame"]:
+        """Randomly splits this :class:`DataFrame` with the provided weights.
+
+        .. versionadded:: 3.4.0
+
+        Parameters
+        ----------
+        weights : list
+            list of doubles as weights with which to split the :class:`DataFrame`.
+            Weights will be normalized if they don't sum up to 1.0.
+        seed : int, optional
+            The seed for sampling.
+
+        Returns
+        -------
+        list
+            List of DataFrames.
+        """
+        for w in weights:
+            if w < 0.0:
+                raise ValueError("Weights must be positive. Found weight value: %s" % w)
+        seed = seed if seed is not None else random.randint(0, sys.maxsize)
+        total = sum(weights)
+        if total <= 0:
+            raise ValueError("Sum of weights must be positive, but got: %s" % w)
+        proportions = list(map(lambda x: x / total, weights))
+        normalizedCumWeights = [0.0]
+        for v in proportions:
+            normalizedCumWeights.append(normalizedCumWeights[-1] + v)
+        j = 1
+        length = len(normalizedCumWeights)
+        result = []
+        while j < length:
+            lowerBound = normalizedCumWeights[j - 1]
+            upperBound = normalizedCumWeights[j]
+            samplePlan = DataFrame.withPlan(
+                plan.Sample(
+                    child=self._plan,
+                    lower_bound=lowerBound,
+                    upper_bound=upperBound,
+                    with_replacement=False,
+                    seed=int(seed),
+                ),
+                session=self._session,
+            )
+            result.append(samplePlan)
+            j += 1
+
+        return result

Review Comment:
   `randomSplit` on Dataset returns multiple Datasets. This method just follows that behavior.
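
   For illustration, a standalone sketch (plain Python, not taken from the PR) of how the loop above turns weights into per-split sampling bounds:

   ```python
   # Weights [1, 2, 3] are normalized and accumulated into [lower, upper) bounds,
   # one pair per returned DataFrame.
   weights = [1.0, 2.0, 3.0]
   total = sum(weights)                       # 6.0
   bounds = [0.0]
   for w in weights:
       bounds.append(bounds[-1] + w / total)  # [0.0, 1/6, 1/2, 1.0]
   splits = list(zip(bounds, bounds[1:]))
   print(splits)  # [(0.0, 0.166...), (0.166..., 0.5), (0.5, 1.0)]
   ```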





[GitHub] [spark] beliefer commented on pull request #39017: [SPARK-41440][CONNECT][PYTHON] Implement `DataFrame.randomSplit`

Posted by GitBox <gi...@apache.org>.
beliefer commented on PR #39017:
URL: https://github.com/apache/spark/pull/39017#issuecomment-1352686464

   > and then the python client side can generate all the sampled splits from this one
   
   ping @HyukjinKwon 




[GitHub] [spark] zhengruifeng commented on a diff in pull request #39017: [SPARK-41440][CONNECT][PYTHON] Implement `DataFrame.randomSplit`

Posted by GitBox <gi...@apache.org>.
zhengruifeng commented on code in PR #39017:
URL: https://github.com/apache/spark/pull/39017#discussion_r1047998212


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -132,12 +132,31 @@ class SparkConnectPlanner(session: SparkSession) {
    * wrap such fields into proto messages.
    */
   private def transformSample(rel: proto.Sample): LogicalPlan = {
+    val logicalPlan = transformRelation(rel.getInput)
+    val plan = if (rel.getForceStableSort) {
+      // It is possible that the underlying dataframe doesn't guarantee the ordering of rows in its
+      // constituent partitions each time a split is materialized which could result in
+      // overlapping splits. To prevent this, we explicitly sort each input partition to make the
+      // ordering deterministic. Note that MapTypes cannot be sorted and are explicitly pruned out
+      // from the sort order.
+      val sortOrder = logicalPlan.output
+        .filter(attr => RowOrdering.isOrderable(attr.dataType))
+        .map(SortOrder(_, Ascending))
+      if (sortOrder.nonEmpty) {
+        Sort(sortOrder, global = false, logicalPlan)
+      } else {
+        logicalPlan

Review Comment:
   not sure whether this works:
   
   ```
       val input = Dataset.ofRows(session, transformRelation(rel.getInput))
       val dataset = if (...) { input.sort(...) } else { input.cache() }
       dataset.logicalPlan
   ```





[GitHub] [spark] cloud-fan commented on a diff in pull request #39017: [SPARK-41440][CONNECT][PYTHON] Implement `DataFrame.randomSplit`

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on code in PR #39017:
URL: https://github.com/apache/spark/pull/39017#discussion_r1054345074


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -132,12 +132,32 @@ class SparkConnectPlanner(session: SparkSession) {
    * wrap such fields into proto messages.
    */
   private def transformSample(rel: proto.Sample): LogicalPlan = {
+    val input = Dataset.ofRows(session, transformRelation(rel.getInput))
+    val plan = if (rel.getForceStableSort) {
+      // It is possible that the underlying dataframe doesn't guarantee the ordering of rows in its
+      // constituent partitions each time a split is materialized which could result in
+      // overlapping splits. To prevent this, we explicitly sort each input partition to make the
+      // ordering deterministic. Note that MapTypes cannot be sorted and are explicitly pruned out
+      // from the sort order.
+      val sortOrder = input.logicalPlan.output
+        .filter(attr => RowOrdering.isOrderable(attr.dataType))
+        .map(SortOrder(_, Ascending))
+      if (sortOrder.nonEmpty) {
+        Sort(sortOrder, global = false, input.logicalPlan)
+      } else {
+        input.logicalPlan
+      }
+    } else {
+      input.cache()

Review Comment:
   Then we need a new proto message for randomSplit rather than reusing Sample, to avoid adding an extra cache for `df.sample`.





[GitHub] [spark] beliefer commented on a diff in pull request #39017: [SPARK-41440][CONNECT][PYTHON] Implement `DataFrame.randomSplit`

Posted by GitBox <gi...@apache.org>.
beliefer commented on code in PR #39017:
URL: https://github.com/apache/spark/pull/39017#discussion_r1055470653


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -132,12 +132,32 @@ class SparkConnectPlanner(session: SparkSession) {
    * wrap such fields into proto messages.
    */
   private def transformSample(rel: proto.Sample): LogicalPlan = {
+    val input = Dataset.ofRows(session, transformRelation(rel.getInput))
+    val plan = if (rel.getForceStableSort) {
+      // It is possible that the underlying dataframe doesn't guarantee the ordering of rows in its
+      // constituent partitions each time a split is materialized which could result in
+      // overlapping splits. To prevent this, we explicitly sort each input partition to make the
+      // ordering deterministic. Note that MapTypes cannot be sorted and are explicitly pruned out
+      // from the sort order.
+      val sortOrder = input.logicalPlan.output
+        .filter(attr => RowOrdering.isOrderable(attr.dataType))
+        .map(SortOrder(_, Ascending))
+      if (sortOrder.nonEmpty) {
+        Sort(sortOrder, global = false, input.logicalPlan)
+      } else {
+        input.logicalPlan
+      }
+    } else {
+      input.cache()

Review Comment:
   I submitted another PR; see https://github.com/apache/spark/pull/39182





[GitHub] [spark] zhengruifeng commented on a diff in pull request #39017: [SPARK-41440][CONNECT][PYTHON] Implement `DataFrame.randomSplit`

Posted by GitBox <gi...@apache.org>.
zhengruifeng commented on code in PR #39017:
URL: https://github.com/apache/spark/pull/39017#discussion_r1054993058


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -132,12 +132,32 @@ class SparkConnectPlanner(session: SparkSession) {
    * wrap such fields into proto messages.
    */
   private def transformSample(rel: proto.Sample): LogicalPlan = {
+    val input = Dataset.ofRows(session, transformRelation(rel.getInput))
+    val plan = if (rel.getForceStableSort) {
+      // It is possible that the underlying dataframe doesn't guarantee the ordering of rows in its
+      // constituent partitions each time a split is materialized which could result in
+      // overlapping splits. To prevent this, we explicitly sort each input partition to make the
+      // ordering deterministic. Note that MapTypes cannot be sorted and are explicitly pruned out
+      // from the sort order.
+      val sortOrder = input.logicalPlan.output
+        .filter(attr => RowOrdering.isOrderable(attr.dataType))
+        .map(SortOrder(_, Ascending))
+      if (sortOrder.nonEmpty) {
+        Sort(sortOrder, global = false, input.logicalPlan)
+      } else {
+        input.logicalPlan
+      }
+    } else {
+      input.cache()

Review Comment:
   OK, please just fix the cache for Sample, @beliefer 





[GitHub] [spark] beliefer commented on a diff in pull request #39017: [SPARK-41440][CONNECT][PYTHON] Implement `DataFrame.randomSplit`

Posted by GitBox <gi...@apache.org>.
beliefer commented on code in PR #39017:
URL: https://github.com/apache/spark/pull/39017#discussion_r1047909144


##########
python/pyspark/sql/connect/dataframe.py:
##########
@@ -875,6 +877,60 @@ def to_jcols(
 
     melt = unpivot
 
+    def randomSplit(
+        self,
+        weights: List[float],
+        seed: Optional[int] = None,
+    ) -> List["DataFrame"]:
+        """Randomly splits this :class:`DataFrame` with the provided weights.
+
+        .. versionadded:: 3.4.0
+
+        Parameters
+        ----------
+        weights : list
+            list of doubles as weights with which to split the :class:`DataFrame`.
+            Weights will be normalized if they don't sum up to 1.0.
+        seed : int, optional
+            The seed for sampling.
+
+        Returns
+        -------
+        list
+            List of DataFrames.
+        """
+        for w in weights:
+            if w < 0.0:
+                raise ValueError("Weights must be positive. Found weight value: %s" % w)
+        seed = seed if seed is not None else random.randint(0, sys.maxsize)
+        total = sum(weights)
+        if total <= 0:
+            raise ValueError("Sum of weights must be positive, but got: %s" % w)
+        proportions = list(map(lambda x: x / total, weights))
+        normalizedCumWeights = [0.0]
+        for v in proportions:
+            normalizedCumWeights.append(normalizedCumWeights[-1] + v)
+        j = 1
+        length = len(normalizedCumWeights)
+        result = []
+        while j < length:
+            lowerBound = normalizedCumWeights[j - 1]
+            upperBound = normalizedCumWeights[j]
+            samplePlan = DataFrame.withPlan(
+                plan.Sample(
+                    child=self._plan,
+                    lower_bound=lowerBound,
+                    upper_bound=upperBound,
+                    with_replacement=False,
+                    seed=int(seed),
+                ),
+                session=self._session,
+            )
+            result.append(samplePlan)
+            j += 1
+
+        return result

Review Comment:
   `forceStableSort` looks more reasonable.





[GitHub] [spark] grundprinzip commented on a diff in pull request #39017: [SPARK-41440][CONNECT][PYTHON] Implement `DataFrame.randomSplit`

Posted by GitBox <gi...@apache.org>.
grundprinzip commented on code in PR #39017:
URL: https://github.com/apache/spark/pull/39017#discussion_r1049953432


##########
connector/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectProtoSuite.scala:
##########
@@ -588,6 +588,22 @@ class SparkConnectProtoSuite extends PlanTest with SparkConnectPlanTest {
     comparePlans(connectPlan1, sparkPlan1)
   }
 
+  test("Test RandomSplit") {
+    val splitRelations0 = connectTestRelation.randomSplit(Array[Double](1, 2, 3), 1)

Review Comment:
   If I'm not mistaken, all of the tests exercise the code path with `force_stable_sort`; it would be good to test without it as well, to make sure the else branch is executed.



##########
python/pyspark/sql/tests/connect/test_connect_plan_only.py:
##########
@@ -228,6 +229,41 @@ def test_melt(self):
         self.assertEqual(plan.root.unpivot.variable_column_name, "variable")
         self.assertEqual(plan.root.unpivot.value_column_name, "value")
 
+    def test_random_split(self):

Review Comment:
   ```suggestion
       def test_random_split(self):
           # SPARK-41440: Support for Random split.
   ```



##########
python/pyspark/sql/tests/connect/test_connect_basic.py:
##########
@@ -806,6 +806,20 @@ def test_unpivot(self):
             .toPandas(),
         )
 
+    def test_random_split(self):

Review Comment:
   ```suggestion
       def test_random_split(self):
           # SPARK-41440: Support for Random Split.
   ```





[GitHub] [spark] beliefer commented on a diff in pull request #39017: [SPARK-41440][CONNECT][PYTHON] Implement `DataFrame.randomSplit`

Posted by GitBox <gi...@apache.org>.
beliefer commented on code in PR #39017:
URL: https://github.com/apache/spark/pull/39017#discussion_r1047996591


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -132,12 +132,31 @@ class SparkConnectPlanner(session: SparkSession) {
    * wrap such fields into proto messages.
    */
   private def transformSample(rel: proto.Sample): LogicalPlan = {
+    val logicalPlan = transformRelation(rel.getInput)
+    val plan = if (rel.getForceStableSort) {
+      // It is possible that the underlying dataframe doesn't guarantee the ordering of rows in its
+      // constituent partitions each time a split is materialized which could result in
+      // overlapping splits. To prevent this, we explicitly sort each input partition to make the
+      // ordering deterministic. Note that MapTypes cannot be sorted and are explicitly pruned out
+      // from the sort order.
+      val sortOrder = logicalPlan.output
+        .filter(attr => RowOrdering.isOrderable(attr.dataType))
+        .map(SortOrder(_, Ascending))
+      if (sortOrder.nonEmpty) {
+        Sort(sortOrder, global = false, logicalPlan)
+      } else {
+        logicalPlan

Review Comment:
   Yes. This is hard to call here.





[GitHub] [spark] zhengruifeng commented on pull request #39017: [SPARK-41440][CONNECT][PYTHON] Implement `DataFrame.randomSplit`

Posted by GitBox <gi...@apache.org>.
zhengruifeng commented on PR #39017:
URL: https://github.com/apache/spark/pull/39017#issuecomment-1350371627

   I personally prefer adding a separate proto message to create a sorted or cached dataframe [here](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala#L2560-L2574), without touching `Sample`,
   
   and then the python client side can generate all the sampled splits from this one.
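
   As a toy model of that idea (plain Python, not Spark code): once the input is deterministic, every row gets exactly one uniform draw per seed, so the splits derived from that single input are disjoint and covering. If the row order could change between materializations, the draws would attach to different rows and the splits could overlap.

   ```python
   import random

   def split_indices(n_rows, bounds, seed):
       # One uniform draw per row; a row belongs to the split whose
       # [lower, upper) interval contains its draw.
       rng = random.Random(seed)
       draws = [rng.random() for _ in range(n_rows)]
       return [
           [i for i, x in enumerate(draws) if lo <= x < hi]
           for lo, hi in zip(bounds, bounds[1:])
       ]

   splits = split_indices(10, [0.0, 0.5, 1.0], seed=7)
   # Disjoint and covering: every row index appears in exactly one split.
   assert sorted(i for s in splits for i in s) == list(range(10))
   ```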




[GitHub] [spark] amaliujia commented on a diff in pull request #39017: [SPARK-41440][CONNECT][PYTHON] Implement `DataFrame.randomSplit`

Posted by GitBox <gi...@apache.org>.
amaliujia commented on code in PR #39017:
URL: https://github.com/apache/spark/pull/39017#discussion_r1049950541


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -132,12 +132,31 @@ class SparkConnectPlanner(session: SparkSession) {
    * wrap such fields into proto messages.
    */
   private def transformSample(rel: proto.Sample): LogicalPlan = {
+    val logicalPlan = transformRelation(rel.getInput)
+    val plan = if (rel.getForceStableSort) {
+      // It is possible that the underlying dataframe doesn't guarantee the ordering of rows in its
+      // constituent partitions each time a split is materialized which could result in
+      // overlapping splits. To prevent this, we explicitly sort each input partition to make the
+      // ordering deterministic. Note that MapTypes cannot be sorted and are explicitly pruned out
+      // from the sort order.
+      val sortOrder = logicalPlan.output
+        .filter(attr => RowOrdering.isOrderable(attr.dataType))
+        .map(SortOrder(_, Ascending))
+      if (sortOrder.nonEmpty) {
+        Sort(sortOrder, global = false, logicalPlan)
+      } else {
+        logicalPlan

Review Comment:
   ah good catch





[GitHub] [spark] beliefer commented on a diff in pull request #39017: [SPARK-41440][CONNECT][PYTHON] Implement `DataFrame.randomSplit`

Posted by GitBox <gi...@apache.org>.
beliefer commented on code in PR #39017:
URL: https://github.com/apache/spark/pull/39017#discussion_r1050417409


##########
connector/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectProtoSuite.scala:
##########
@@ -588,6 +588,22 @@ class SparkConnectProtoSuite extends PlanTest with SparkConnectPlanTest {
     comparePlans(connectPlan1, sparkPlan1)
   }
 
+  test("Test RandomSplit") {
+    val splitRelations0 = connectTestRelation.randomSplit(Array[Double](1, 2, 3), 1)

Review Comment:
   Yes. We only need to test `force_stable_sort` being true here.





[GitHub] [spark] zhengruifeng commented on a diff in pull request #39017: [SPARK-41440][CONNECT][PYTHON] Implement `DataFrame.randomSplit`

Posted by GitBox <gi...@apache.org>.
zhengruifeng commented on code in PR #39017:
URL: https://github.com/apache/spark/pull/39017#discussion_r1053970824


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -132,12 +132,32 @@ class SparkConnectPlanner(session: SparkSession) {
    * wrap such fields into proto messages.
    */
   private def transformSample(rel: proto.Sample): LogicalPlan = {
+    val input = Dataset.ofRows(session, transformRelation(rel.getInput))
+    val plan = if (rel.getForceStableSort) {
+      // It is possible that the underlying dataframe doesn't guarantee the ordering of rows in its
+      // constituent partitions each time a split is materialized which could result in
+      // overlapping splits. To prevent this, we explicitly sort each input partition to make the
+      // ordering deterministic. Note that MapTypes cannot be sorted and are explicitly pruned out
+      // from the sort order.
+      val sortOrder = input.logicalPlan.output
+        .filter(attr => RowOrdering.isOrderable(attr.dataType))
+        .map(SortOrder(_, Ascending))
+      if (sortOrder.nonEmpty) {
+        Sort(sortOrder, global = false, input.logicalPlan)
+      } else {
+        input.logicalPlan
+      }
+    } else {
+      input.cache()

Review Comment:
   > This code path is for df.sample as well, does it need cache too?
   
   I think the original `Sample` is wrongly cached here.





[GitHub] [spark] grundprinzip commented on a diff in pull request #39017: [SPARK-41440][CONNECT][PYTHON] Implement `DataFrame.randomSplit`

Posted by GitBox <gi...@apache.org>.
grundprinzip commented on code in PR #39017:
URL: https://github.com/apache/spark/pull/39017#discussion_r1054169096


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -132,12 +132,32 @@ class SparkConnectPlanner(session: SparkSession) {
    * wrap such fields into proto messages.
    */
   private def transformSample(rel: proto.Sample): LogicalPlan = {
+    val input = Dataset.ofRows(session, transformRelation(rel.getInput))
+    val plan = if (rel.getForceStableSort) {
+      // It is possible that the underlying dataframe doesn't guarantee the ordering of rows in its
+      // constituent partitions each time a split is materialized which could result in
+      // overlapping splits. To prevent this, we explicitly sort each input partition to make the
+      // ordering deterministic. Note that MapTypes cannot be sorted and are explicitly pruned out
+      // from the sort order.
+      val sortOrder = input.logicalPlan.output
+        .filter(attr => RowOrdering.isOrderable(attr.dataType))
+        .map(SortOrder(_, Ascending))
+      if (sortOrder.nonEmpty) {
+        Sort(sortOrder, global = false, input.logicalPlan)
+      } else {
+        input.logicalPlan
+      }
+    } else {
+      input.cache()

Review Comment:
   @zhengruifeng what is the purpose of the new proto message? The deterministic part is handled inside the planner. Exposing it as a new message suggests it is used beyond the plan parameter?





[GitHub] [spark] beliefer commented on pull request #39017: [SPARK-41440][CONNECT][PYTHON] Implement `DataFrame.randomSplit`

Posted by GitBox <gi...@apache.org>.
beliefer commented on PR #39017:
URL: https://github.com/apache/spark/pull/39017#issuecomment-1350542839

   > and then the python client side can generate all the sampled splits from this one.
   
   cc @cloud-fan @HyukjinKwon 




[GitHub] [spark] grundprinzip commented on a diff in pull request #39017: [SPARK-41440][CONNECT][PYTHON] Implement `DataFrame.randomSplit`

Posted by GitBox <gi...@apache.org>.
grundprinzip commented on code in PR #39017:
URL: https://github.com/apache/spark/pull/39017#discussion_r1047197796


##########
python/pyspark/sql/connect/dataframe.py:
##########
@@ -875,6 +877,60 @@ def to_jcols(
 
     melt = unpivot
 
+    def randomSplit(
+        self,
+        weights: List[float],
+        seed: Optional[int] = None,
+    ) -> List["DataFrame"]:
+        """Randomly splits this :class:`DataFrame` with the provided weights.
+
+        .. versionadded:: 3.4.0
+
+        Parameters
+        ----------
+        weights : list
+            list of doubles as weights with which to split the :class:`DataFrame`.
+            Weights will be normalized if they don't sum up to 1.0.
+        seed : int, optional
+            The seed for sampling.
+
+        Returns
+        -------
+        list
+            List of DataFrames.
+        """
+        for w in weights:
+            if w < 0.0:
+                raise ValueError("Weights must be positive. Found weight value: %s" % w)
+        seed = seed if seed is not None else random.randint(0, sys.maxsize)
+        total = sum(weights)
+        if total <= 0:
+            raise ValueError("Sum of weights must be positive, but got: %s" % w)
+        proportions = list(map(lambda x: x / total, weights))
+        normalizedCumWeights = [0.0]
+        for v in proportions:
+            normalizedCumWeights.append(normalizedCumWeights[-1] + v)
+        j = 1
+        length = len(normalizedCumWeights)
+        result = []
+        while j < length:
+            lowerBound = normalizedCumWeights[j - 1]
+            upperBound = normalizedCumWeights[j]
+            samplePlan = DataFrame.withPlan(
+                plan.Sample(
+                    child=self._plan,
+                    lower_bound=lowerBound,
+                    upper_bound=upperBound,
+                    with_replacement=False,
+                    seed=int(seed),
+                ),
+                session=self._session,
+            )
+            result.append(samplePlan)
+            j += 1
+
+        return result

Review Comment:
   Yeah, I think this one is interesting because it's one of the few cases where the dataset actually implements client-side logic.
   
   The approach might be generally good, but I would remove the `RandomSplit` method and add a flag to `Sample` called `forceStableSort` or similar to indicate what the intent is.
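
   To make the two message shapes concrete, a toy model (plain Python stand-ins, not the real protobuf classes; the flag name mirrors the suggestion above):

   ```python
   from dataclasses import dataclass
   from typing import Optional

   @dataclass
   class Relation:  # stand-in for spark.connect.Relation
       name: str

   @dataclass
   class Sample:  # stand-in for the Sample message, with the suggested flag
       input: Relation
       lower_bound: float
       upper_bound: float
       with_replacement: bool
       seed: Optional[int]
       force_stable_sort: bool = False

   # randomSplit would then emit one plain Sample per split with the flag set,
   # instead of wrapping the input in a dedicated RandomSplit relation.
   s = Sample(Relation("read"), 0.0, 0.5, False, 42, force_stable_sort=True)
   print(s)
   ```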





[GitHub] [spark] grundprinzip commented on a diff in pull request #39017: [SPARK-41440][CONNECT][PYTHON] Implement `DataFrame.randomSplit`

Posted by GitBox <gi...@apache.org>.
grundprinzip commented on code in PR #39017:
URL: https://github.com/apache/spark/pull/39017#discussion_r1047188863


##########
connector/connect/common/src/main/protobuf/spark/connect/relations.proto:
##########
@@ -601,3 +602,10 @@ message Unpivot {
   // (Required) Name of the value column.
   string value_column_name = 5;
 }
+
+// Randomly splits this Dataset with the provided weights.
+// Note: this message is just a wrapper for input relation.
+message RandomSplit {
+  // (Required) The input relation.
+  Relation input = 1;

Review Comment:
   I might be missing some context, please help me :)
   
   It looks like you want to express that RandomSplit can never occur outside of Sample. So the wrapping would be something like
   
   ```
   Sample(RandomSplit(input))
   ```
   
   In the current approach to the Relations, all relations are more or less valid top-level objects that can be used independently. I feel that in this case RandomSplit should rather be an element of the Sample message.





[GitHub] [spark] zhengruifeng commented on a diff in pull request #39017: [SPARK-41440][CONNECT][PYTHON] Implement `DataFrame.randomSplit`

Posted by GitBox <gi...@apache.org>.
zhengruifeng commented on code in PR #39017:
URL: https://github.com/apache/spark/pull/39017#discussion_r1053969587


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -132,12 +132,32 @@ class SparkConnectPlanner(session: SparkSession) {
    * wrap such fields into proto messages.
    */
   private def transformSample(rel: proto.Sample): LogicalPlan = {
+    val input = Dataset.ofRows(session, transformRelation(rel.getInput))
+    val plan = if (rel.getForceStableSort) {
+      // It is possible that the underlying dataframe doesn't guarantee the ordering of rows in its
+      // constituent partitions each time a split is materialized which could result in
+      // overlapping splits. To prevent this, we explicitly sort each input partition to make the
+      // ordering deterministic. Note that MapTypes cannot be sorted and are explicitly pruned out
+      // from the sort order.
+      val sortOrder = input.logicalPlan.output
+        .filter(attr => RowOrdering.isOrderable(attr.dataType))
+        .map(SortOrder(_, Ascending))
+      if (sortOrder.nonEmpty) {
+        Sort(sortOrder, global = false, input.logicalPlan)
+      } else {
+        input.logicalPlan
+      }
+    } else {
+      input.cache()

Review Comment:
   I still feel like we should introduce a new proto which only tries to make the input dataset deterministic (by sorting or caching), to avoid mixing RandomSplit with Sample.





[GitHub] [spark] beliefer commented on a diff in pull request #39017: [SPARK-41440][CONNECT][PYTHON] Implement `DataFrame.randomSplit`

Posted by GitBox <gi...@apache.org>.
beliefer commented on code in PR #39017:
URL: https://github.com/apache/spark/pull/39017#discussion_r1054166606


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -132,12 +132,32 @@ class SparkConnectPlanner(session: SparkSession) {
    * wrap such fields into proto messages.
    */
   private def transformSample(rel: proto.Sample): LogicalPlan = {
+    val input = Dataset.ofRows(session, transformRelation(rel.getInput))
+    val plan = if (rel.getForceStableSort) {
+      // It is possible that the underlying dataframe doesn't guarantee the ordering of rows in its
+      // constituent partitions each time a split is materialized which could result in
+      // overlapping splits. To prevent this, we explicitly sort each input partition to make the
+      // ordering deterministic. Note that MapTypes cannot be sorted and are explicitly pruned out
+      // from the sort order.
+      val sortOrder = input.logicalPlan.output
+        .filter(attr => RowOrdering.isOrderable(attr.dataType))
+        .map(SortOrder(_, Ascending))
+      if (sortOrder.nonEmpty) {
+        Sort(sortOrder, global = false, input.logicalPlan)
+      } else {
+        input.logicalPlan
+      }
+    } else {
+      input.cache()

Review Comment:
   Yes. We should avoid caching the plan for sample.
   @zhengruifeng, will you fix this issue, or should I try to fix it?





[GitHub] [spark] beliefer commented on pull request #39017: [SPARK-41440][CONNECT][PYTHON] Implement `DataFrame.randomSplit`

Posted by GitBox <gi...@apache.org>.
beliefer commented on PR #39017:
URL: https://github.com/apache/spark/pull/39017#issuecomment-1360811101

   @HyukjinKwon @amaliujia @grundprinzip @zhengruifeng @cloud-fan Thank you all!




[GitHub] [spark] zhengruifeng commented on a diff in pull request #39017: [SPARK-41440][CONNECT][PYTHON] Implement `DataFrame.randomSplit`

Posted by GitBox <gi...@apache.org>.
zhengruifeng commented on code in PR #39017:
URL: https://github.com/apache/spark/pull/39017#discussion_r1053969587


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -132,12 +132,32 @@ class SparkConnectPlanner(session: SparkSession) {
    * wrap such fields into proto messages.
    */
   private def transformSample(rel: proto.Sample): LogicalPlan = {
+    val input = Dataset.ofRows(session, transformRelation(rel.getInput))
+    val plan = if (rel.getForceStableSort) {
+      // It is possible that the underlying dataframe doesn't guarantee the ordering of rows in its
+      // constituent partitions each time a split is materialized which could result in
+      // overlapping splits. To prevent this, we explicitly sort each input partition to make the
+      // ordering deterministic. Note that MapTypes cannot be sorted and are explicitly pruned out
+      // from the sort order.
+      val sortOrder = input.logicalPlan.output
+        .filter(attr => RowOrdering.isOrderable(attr.dataType))
+        .map(SortOrder(_, Ascending))
+      if (sortOrder.nonEmpty) {
+        Sort(sortOrder, global = false, input.logicalPlan)
+      } else {
+        input.logicalPlan
+      }
+    } else {
+      input.cache()

Review Comment:
   I still feel like we should introduce a new proto which only tries to make the input dataset deterministic (by sorting, caching, or just skipping if the input is already deterministic, and so on), to avoid mixing RandomSplit with Sample.





[GitHub] [spark] amaliujia commented on a diff in pull request #39017: [SPARK-41440][CONNECT][PYTHON] Implement `DataFrame.randomSplit`

Posted by GitBox <gi...@apache.org>.
amaliujia commented on code in PR #39017:
URL: https://github.com/apache/spark/pull/39017#discussion_r1053960737


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -132,12 +132,32 @@ class SparkConnectPlanner(session: SparkSession) {
    * wrap such fields into proto messages.
    */
   private def transformSample(rel: proto.Sample): LogicalPlan = {
+    val input = Dataset.ofRows(session, transformRelation(rel.getInput))
+    val plan = if (rel.getForceStableSort) {
+      // It is possible that the underlying dataframe doesn't guarantee the ordering of rows in its
+      // constituent partitions each time a split is materialized which could result in
+      // overlapping splits. To prevent this, we explicitly sort each input partition to make the
+      // ordering deterministic. Note that MapTypes cannot be sorted and are explicitly pruned out
+      // from the sort order.
+      val sortOrder = input.logicalPlan.output
+        .filter(attr => RowOrdering.isOrderable(attr.dataType))
+        .map(SortOrder(_, Ascending))
+      if (sortOrder.nonEmpty) {
+        Sort(sortOrder, global = false, input.logicalPlan)
+      } else {
+        input.logicalPlan
+      }
+    } else {
+      input.cache()

Review Comment:
   This follows the existing implementation: https://github.com/apache/spark/blob/e23983a32df2bdb2c25673bb4f2a99b3c319ad23/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala#L2572





[GitHub] [spark] cloud-fan commented on a diff in pull request #39017: [SPARK-41440][CONNECT][PYTHON] Implement `DataFrame.randomSplit`

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on code in PR #39017:
URL: https://github.com/apache/spark/pull/39017#discussion_r1053987005


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -132,12 +132,32 @@ class SparkConnectPlanner(session: SparkSession) {
    * wrap such fields into proto messages.
    */
   private def transformSample(rel: proto.Sample): LogicalPlan = {
+    val input = Dataset.ofRows(session, transformRelation(rel.getInput))
+    val plan = if (rel.getForceStableSort) {
+      // It is possible that the underlying dataframe doesn't guarantee the ordering of rows in its
+      // constituent partitions each time a split is materialized which could result in
+      // overlapping splits. To prevent this, we explicitly sort each input partition to make the
+      // ordering deterministic. Note that MapTypes cannot be sorted and are explicitly pruned out
+      // from the sort order.
+      val sortOrder = input.logicalPlan.output
+        .filter(attr => RowOrdering.isOrderable(attr.dataType))
+        .map(SortOrder(_, Ascending))
+      if (sortOrder.nonEmpty) {
+        Sort(sortOrder, global = false, input.logicalPlan)
+      } else {
+        input.logicalPlan
+      }
+    } else {
+      input.cache()

Review Comment:
   @zhengruifeng can you add such a proto message?





[GitHub] [spark] cloud-fan commented on pull request #39017: [SPARK-41440][CONNECT][PYTHON] Implement `DataFrame.randomSplit`

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on PR #39017:
URL: https://github.com/apache/spark/pull/39017#issuecomment-1352661404

   > and then the python client side can generate all the sampled splits from this one.
   
   SGTM




[GitHub] [spark] beliefer commented on a diff in pull request #39017: [SPARK-41440][CONNECT][PYTHON] Implement `DataFrame.randomSplit`

Posted by GitBox <gi...@apache.org>.
beliefer commented on code in PR #39017:
URL: https://github.com/apache/spark/pull/39017#discussion_r1050418759


##########
python/pyspark/sql/connect/dataframe.py:
##########
@@ -875,6 +877,60 @@ def to_jcols(
 
     melt = unpivot
 
+    def randomSplit(
+        self,
+        weights: List[float],
+        seed: Optional[int] = None,
+    ) -> List["DataFrame"]:
+        """Randomly splits this :class:`DataFrame` with the provided weights.
+
+        .. versionadded:: 3.4.0
+
+        Parameters
+        ----------
+        weights : list
+            list of doubles as weights with which to split the :class:`DataFrame`.
+            Weights will be normalized if they don't sum up to 1.0.
+        seed : int, optional
+            The seed for sampling.
+
+        Returns
+        -------
+        list
+            List of DataFrames.
+        """
+        for w in weights:
+            if w < 0.0:
+                raise ValueError("Weights must be positive. Found weight value: %s" % w)

Review Comment:
   It is more lightweight to check at the client.





[GitHub] [spark] beliefer commented on a diff in pull request #39017: [SPARK-41440][CONNECT][PYTHON] Implement `DataFrame.randomSplit`

Posted by GitBox <gi...@apache.org>.
beliefer commented on code in PR #39017:
URL: https://github.com/apache/spark/pull/39017#discussion_r1046585027


##########
connector/connect/common/src/main/protobuf/spark/connect/relations.proto:
##########
@@ -601,3 +602,10 @@ message Unpivot {
   // (Required) Name of the value column.
   string value_column_name = 5;
 }
+
+// Randomly splits this Dataset with the provided weights.
+// Note: this message is just a wrapper for input relation.
+message RandomSplit {
+  // (Required) The input relation.
+  Relation input = 1;

Review Comment:
   This approach is also possible, but it seems somewhat coupled.





[GitHub] [spark] beliefer commented on a diff in pull request #39017: [SPARK-41440][CONNECT][PYTHON] Implement `DataFrame.randomSplit`

Posted by GitBox <gi...@apache.org>.
beliefer commented on code in PR #39017:
URL: https://github.com/apache/spark/pull/39017#discussion_r1046733400


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -132,12 +132,31 @@ class SparkConnectPlanner(session: SparkSession) {
    * wrap such fields into proto messages.
    */
   private def transformSample(rel: proto.Sample): LogicalPlan = {
+    val plan = if (rel.getInput.hasRandomSplit) {

Review Comment:
   Because we need to construct the child plan of `Sample`. The child plan is easy to build in `SparkConnectPlanner`, but hard to build in Connect's `dataframe`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] zhengruifeng commented on a diff in pull request #39017: [SPARK-41440][CONNECT][PYTHON] Implement `DataFrame.randomSplit`

Posted by GitBox <gi...@apache.org>.
zhengruifeng commented on code in PR #39017:
URL: https://github.com/apache/spark/pull/39017#discussion_r1047994969


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -132,12 +132,31 @@ class SparkConnectPlanner(session: SparkSession) {
    * wrap such fields into proto messages.
    */
   private def transformSample(rel: proto.Sample): LogicalPlan = {
+    val logicalPlan = transformRelation(rel.getInput)
+    val plan = if (rel.getForceStableSort) {
+      // It is possible that the underlying dataframe doesn't guarantee the ordering of rows in its
+      // constituent partitions each time a split is materialized which could result in
+      // overlapping splits. To prevent this, we explicitly sort each input partition to make the
+      // ordering deterministic. Note that MapTypes cannot be sorted and are explicitly pruned out
+      // from the sort order.
+      val sortOrder = logicalPlan.output
+        .filter(attr => RowOrdering.isOrderable(attr.dataType))
+        .map(SortOrder(_, Ascending))
+      if (sortOrder.nonEmpty) {
+        Sort(sortOrder, global = false, logicalPlan)
+      } else {
+        logicalPlan

Review Comment:
   `Dataset.randomSplit` already does the caching: https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala#L2570-L2572
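
   Why a stable row order (or caching) matters, as a conceptual sketch (not
   Spark code): each split samples the same input with the same seed but a
   different [lower, upper) slice of the per-row random draw, so the splits
   are disjoint only if every materialization sees the rows in the same
   order.

       import random

       rows = ["a", "b", "c", "d", "e"]

       def split(rows, lower, upper, seed):
           # One draw per row, in row order; a row is kept when its draw
           # falls inside this split's [lower, upper) slice.
           rng = random.Random(seed)
           return [r for r in rows if lower <= rng.random() < upper]

       left = split(rows, 0.0, 0.4, seed=42)
       right = split(rows, 0.4, 1.0, seed=42)
       # With a fixed row order the slices partition the rows exactly.
       assert sorted(left + right) == sorted(rows)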



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] HyukjinKwon commented on pull request #39017: [SPARK-41440][CONNECT][PYTHON] Implement `DataFrame.randomSplit`

Posted by GitBox <gi...@apache.org>.
HyukjinKwon commented on PR #39017:
URL: https://github.com/apache/spark/pull/39017#issuecomment-1360730282

   Merged to master.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] zhengruifeng commented on a diff in pull request #39017: [SPARK-41440][CONNECT][PYTHON] Implement `DataFrame.randomSplit`

Posted by GitBox <gi...@apache.org>.
zhengruifeng commented on code in PR #39017:
URL: https://github.com/apache/spark/pull/39017#discussion_r1054055800


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -132,12 +132,32 @@ class SparkConnectPlanner(session: SparkSession) {
    * wrap such fields into proto messages.
    */
   private def transformSample(rel: proto.Sample): LogicalPlan = {
+    val input = Dataset.ofRows(session, transformRelation(rel.getInput))
+    val plan = if (rel.getForceStableSort) {
+      // It is possible that the underlying dataframe doesn't guarantee the ordering of rows in its
+      // constituent partitions each time a split is materialized which could result in
+      // overlapping splits. To prevent this, we explicitly sort each input partition to make the
+      // ordering deterministic. Note that MapTypes cannot be sorted and are explicitly pruned out
+      // from the sort order.
+      val sortOrder = input.logicalPlan.output
+        .filter(attr => RowOrdering.isOrderable(attr.dataType))
+        .map(SortOrder(_, Ascending))
+      if (sortOrder.nonEmpty) {
+        Sort(sortOrder, global = false, input.logicalPlan)
+      } else {
+        input.logicalPlan
+      }
+    } else {
+      input.cache()

Review Comment:
   https://github.com/apache/spark/pull/39151  



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] grundprinzip commented on pull request #39017: [SPARK-41440][CONNECT][PYTHON] Implement `DataFrame.randomSplit`

Posted by GitBox <gi...@apache.org>.
grundprinzip commented on PR #39017:
URL: https://github.com/apache/spark/pull/39017#issuecomment-1358100507

   @beliefer please have a look at the merge conflict.
   
   @HyukjinKwon @zhengruifeng please take a look.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] beliefer commented on a diff in pull request #39017: [SPARK-41440][CONNECT][PYTHON] Implement `DataFrame.randomSplit`

Posted by GitBox <gi...@apache.org>.
beliefer commented on code in PR #39017:
URL: https://github.com/apache/spark/pull/39017#discussion_r1046787533


##########
python/pyspark/sql/connect/dataframe.py:
##########
@@ -875,6 +877,60 @@ def to_jcols(
 
     melt = unpivot
 
+    def randomSplit(
+        self,
+        weights: List[float],
+        seed: Optional[int] = None,
+    ) -> List["DataFrame"]:
+        """Randomly splits this :class:`DataFrame` with the provided weights.
+
+        .. versionadded:: 3.4.0
+
+        Parameters
+        ----------
+        weights : list
+            list of doubles as weights with which to split the :class:`DataFrame`.
+            Weights will be normalized if they don't sum up to 1.0.
+        seed : int, optional
+            The seed for sampling.
+
+        Returns
+        -------
+        list
+            List of DataFrames.
+        """
+        for w in weights:
+            if w < 0.0:
+                raise ValueError("Weights must be positive. Found weight value: %s" % w)
+        seed = seed if seed is not None else random.randint(0, sys.maxsize)
+        total = sum(weights)
+        if total <= 0:
+            raise ValueError("Sum of weights must be positive, but got: %s" % w)
+        proportions = list(map(lambda x: x / total, weights))
+        normalizedCumWeights = [0.0]
+        for v in proportions:
+            normalizedCumWeights.append(normalizedCumWeights[-1] + v)
+        j = 1
+        length = len(normalizedCumWeights)
+        result = []
+        while j < length:
+            lowerBound = normalizedCumWeights[j - 1]
+            upperBound = normalizedCumWeights[j]
+            samplePlan = DataFrame.withPlan(
+                plan.Sample(
+                    child=self._plan,
+                    lower_bound=lowerBound,
+                    upper_bound=upperBound,
+                    with_replacement=False,
+                    seed=int(seed),
+                ),
+                session=self._session,
+            )
+            result.append(samplePlan)
+            j += 1
+
+        return result

Review Comment:
   `randomSplit` on `Dataset` returns multiple `Dataset`s, and this method just follows that behavior.
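
   For reference, typical usage looks like the following sketch (the
   DataFrame and the 3:7 split are illustrative):

       # Weights are normalized, so [3.0, 7.0] behaves like [0.3, 0.7].
       train, test = df.randomSplit([3.0, 7.0], seed=42)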



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] cloud-fan commented on a diff in pull request #39017: [SPARK-41440][CONNECT][PYTHON] Implement `DataFrame.randomSplit`

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on code in PR #39017:
URL: https://github.com/apache/spark/pull/39017#discussion_r1053959669


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -132,12 +132,32 @@ class SparkConnectPlanner(session: SparkSession) {
    * wrap such fields into proto messages.
    */
   private def transformSample(rel: proto.Sample): LogicalPlan = {
+    val input = Dataset.ofRows(session, transformRelation(rel.getInput))
+    val plan = if (rel.getForceStableSort) {
+      // It is possible that the underlying dataframe doesn't guarantee the ordering of rows in its
+      // constituent partitions each time a split is materialized which could result in
+      // overlapping splits. To prevent this, we explicitly sort each input partition to make the
+      // ordering deterministic. Note that MapTypes cannot be sorted and are explicitly pruned out
+      // from the sort order.
+      val sortOrder = input.logicalPlan.output
+        .filter(attr => RowOrdering.isOrderable(attr.dataType))
+        .map(SortOrder(_, Ascending))
+      if (sortOrder.nonEmpty) {
+        Sort(sortOrder, global = false, input.logicalPlan)
+      } else {
+        input.logicalPlan
+      }
+    } else {
+      input.cache()

Review Comment:
   why do we cache here?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] cloud-fan commented on a diff in pull request #39017: [SPARK-41440][CONNECT][PYTHON] Implement `DataFrame.randomSplit`

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on code in PR #39017:
URL: https://github.com/apache/spark/pull/39017#discussion_r1046738676


##########
python/pyspark/sql/connect/dataframe.py:
##########
@@ -875,6 +877,60 @@ def to_jcols(
 
     melt = unpivot
 
+    def randomSplit(
+        self,
+        weights: List[float],
+        seed: Optional[int] = None,
+    ) -> List["DataFrame"]:
+        """Randomly splits this :class:`DataFrame` with the provided weights.
+
+        .. versionadded:: 3.4.0
+
+        Parameters
+        ----------
+        weights : list
+            list of doubles as weights with which to split the :class:`DataFrame`.
+            Weights will be normalized if they don't sum up to 1.0.
+        seed : int, optional
+            The seed for sampling.
+
+        Returns
+        -------
+        list
+            List of DataFrames.
+        """
+        for w in weights:
+            if w < 0.0:
+                raise ValueError("Weights must be positive. Found weight value: %s" % w)
+        seed = seed if seed is not None else random.randint(0, sys.maxsize)
+        total = sum(weights)
+        if total <= 0:
+            raise ValueError("Sum of weights must be positive, but got: %s" % w)
+        proportions = list(map(lambda x: x / total, weights))
+        normalizedCumWeights = [0.0]
+        for v in proportions:
+            normalizedCumWeights.append(normalizedCumWeights[-1] + v)
+        j = 1
+        length = len(normalizedCumWeights)
+        result = []
+        while j < length:
+            lowerBound = normalizedCumWeights[j - 1]
+            upperBound = normalizedCumWeights[j]
+            samplePlan = DataFrame.withPlan(
+                plan.Sample(
+                    child=self._plan,
+                    lower_bound=lowerBound,
+                    upper_bound=upperBound,
+                    with_replacement=False,
+                    seed=int(seed),
+                ),
+                session=self._session,
+            )
+            result.append(samplePlan)
+            j += 1
+
+        return result

Review Comment:
   It seems the proto message is not well designed; the client code shouldn't be this complicated.
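
   For context, the bulk of that client-side logic is plain cumulative
   weight normalization; a minimal standalone sketch (the helper name is
   hypothetical):

       def cumulative_bounds(weights):
           # Normalize, then accumulate into adjacent [lower, upper) pairs,
           # e.g. [1, 2, 1] -> [(0.0, 0.25), (0.25, 0.75), (0.75, 1.0)].
           total = sum(weights)
           bounds, acc = [], 0.0
           for w in weights:
               nxt = acc + w / total
               bounds.append((acc, nxt))
               acc = nxt
           return bounds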



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] amaliujia commented on pull request #39017: [SPARK-41440][CONNECT][PYTHON] Implement `DataFrame.randomSplit`

Posted by GitBox <gi...@apache.org>.
amaliujia commented on PR #39017:
URL: https://github.com/apache/spark/pull/39017#issuecomment-1350353222

   The current implementation looks pretty good.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] cloud-fan commented on a diff in pull request #39017: [SPARK-41440][CONNECT][PYTHON] Implement `DataFrame.randomSplit`

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on code in PR #39017:
URL: https://github.com/apache/spark/pull/39017#discussion_r1046606153


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -132,12 +132,31 @@ class SparkConnectPlanner(session: SparkSession) {
    * wrap such fields into proto messages.
    */
   private def transformSample(rel: proto.Sample): LogicalPlan = {
+    val plan = if (rel.getInput.hasRandomSplit) {

Review Comment:
   I'm a bit confused. This function matches `Sample` but checks `RandomSplit`. Aren't they different proto messages?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] amaliujia commented on a diff in pull request #39017: [SPARK-41440][CONNECT][PYTHON] Implement `DataFrame.randomSplit`

Posted by GitBox <gi...@apache.org>.
amaliujia commented on code in PR #39017:
URL: https://github.com/apache/spark/pull/39017#discussion_r1046358161


##########
connector/connect/common/src/main/protobuf/spark/connect/relations.proto:
##########
@@ -601,3 +602,10 @@ message Unpivot {
   // (Required) Name of the value column.
   string value_column_name = 5;
 }
+
+// Randomly splits this Dataset with the provided weights.
+// Note: this message is just a wrapper for input relation.
+message RandomSplit {
+  // (Required) The input relation.
+  Relation input = 1;

Review Comment:
   Hmm, I don't think you need this message. Maybe you only need to add a `bool` to the `Sample` message to indicate that this is a random-split case?
   
   cc @zhengruifeng @cloud-fan 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org