You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by "hvanhovell (via GitHub)" <gi...@apache.org> on 2023/02/22 20:47:50 UTC

[GitHub] [spark] hvanhovell opened a new pull request, #40130: [SPARK-42531][CONNECT] Scala Client Add Collections Functions

hvanhovell opened a new pull request, #40130:
URL: https://github.com/apache/spark/pull/40130

   ### What changes were proposed in this pull request?
   This PR adds all the collections functions to `functions.scala` for Scala client. This is the last PR large functions PR, there are a few functions missing, these will be added later.
   
   ### Why are the changes needed?
   We want the Scala client to have API parity with the existing API
   
   ### Does this PR introduce _any_ user-facing change?
   Yes, it adds functions to the Spark Connect Scala Client.
   
   ### How was this patch tested?
   Added tests to `PlanGenerationTestSuite` and to `ProtoToPlanTestSuite`. I have added a few tests to `ClientE2ETestSuite` for lambda functions (to make sure name scoping works) and the array shuffle function (non-deterministic, hard to test with golden files).


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] hvanhovell closed pull request #40130: [SPARK-42531][CONNECT] Scala Client Add Collections Functions

Posted by "hvanhovell (via GitHub)" <gi...@apache.org>.
hvanhovell closed pull request #40130: [SPARK-42531][CONNECT] Scala Client Add Collections Functions
URL: https://github.com/apache/spark/pull/40130


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] LuciferYang commented on pull request #40130: [SPARK-42531][CONNECT] Scala Client Add Collections Functions

Posted by "LuciferYang (via GitHub)" <gi...@apache.org>.
LuciferYang commented on PR #40130:
URL: https://github.com/apache/spark/pull/40130#issuecomment-1441434091

   Checked the new test with Scala 2.13, all passed


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] hvanhovell commented on pull request #40130: [SPARK-42531][CONNECT] Scala Client Add Collections Functions

Posted by "hvanhovell (via GitHub)" <gi...@apache.org>.
hvanhovell commented on PR #40130:
URL: https://github.com/apache/spark/pull/40130#issuecomment-1440776736

   cc @LuciferYang 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] hvanhovell commented on a diff in pull request #40130: [SPARK-42531][CONNECT] Scala Client Add Collections Functions

Posted by "hvanhovell (via GitHub)" <gi...@apache.org>.
hvanhovell commented on code in PR #40130:
URL: https://github.com/apache/spark/pull/40130#discussion_r1115666233


##########
connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala:
##########
@@ -336,4 +336,31 @@ class ClientE2ETestSuite extends RemoteSparkSession {
     checkSample(datasets.get(2), 3.0 / 10.0, 6.0 / 10.0, 9L)
     checkSample(datasets.get(3), 6.0 / 10.0, 1.0, 9L)
   }
+
+  test("lambda functions") {
+    // This test is mostly to validate lambda variables are properly resolved.
+    import org.apache.spark.sql.functions._

Review Comment:
   I guess we can. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] hvanhovell commented on pull request #40130: [SPARK-42531][CONNECT] Scala Client Add Collections Functions

Posted by "hvanhovell (via GitHub)" <gi...@apache.org>.
hvanhovell commented on PR #40130:
URL: https://github.com/apache/spark/pull/40130#issuecomment-1440776529

   For the reviewer I have manually tested this with scala-2.13.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] LuciferYang commented on a diff in pull request #40130: [SPARK-42531][CONNECT] Scala Client Add Collections Functions

Posted by "LuciferYang (via GitHub)" <gi...@apache.org>.
LuciferYang commented on code in PR #40130:
URL: https://github.com/apache/spark/pull/40130#discussion_r1115344944


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala:
##########
@@ -3845,6 +3849,1213 @@ object functions {
    */
   def timestamp_seconds(e: Column): Column = Column.fn("timestamp_seconds", e)
 
+  //////////////////////////////////////////////////////////////////////////////////////////////
+  // Collection functions
+  //////////////////////////////////////////////////////////////////////////////////////////////
+
+  /**
+   * Returns null if the array is null, true if the array contains `value`, and false otherwise.
+   * @group collection_funcs
+   * @since 3.4.0
+   */
+  def array_contains(column: Column, value: Any): Column =
+    Column.fn("array_contains", column, lit(value))
+
+  /**
+   * Returns an ARRAY containing all elements from the source ARRAY as well as the new element.
+   * The new element/column is located at end of the ARRAY.
+   *
+   * @group collection_funcs
+   * @since 3.4.0
+   */
+  def array_append(column: Column, element: Any): Column =
+    Column.fn("array_append", column, lit(element))
+
+  /**
+   * Returns `true` if `a1` and `a2` have at least one non-null element in common. If not and both
+   * the arrays are non-empty and any of them contains a `null`, it returns `null`. It returns
+   * `false` otherwise.
+   * @group collection_funcs
+   * @since 3.4.0
+   */
+  def arrays_overlap(a1: Column, a2: Column): Column = Column.fn("arrays_overlap", a1, a2)
+
+  /**
+   * Returns an array containing all the elements in `x` from index `start` (or starting from the
+   * end if `start` is negative) with the specified `length`.
+   *
+   * @param x
+   *   the array column to be sliced
+   * @param start
+   *   the starting index
+   * @param length
+   *   the length of the slice
+   *
+   * @group collection_funcs
+   * @since 2.4.0

Review Comment:
   `@since`



##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala:
##########
@@ -3845,6 +3849,1213 @@ object functions {
    */
   def timestamp_seconds(e: Column): Column = Column.fn("timestamp_seconds", e)
 
+  //////////////////////////////////////////////////////////////////////////////////////////////
+  // Collection functions
+  //////////////////////////////////////////////////////////////////////////////////////////////
+
+  /**
+   * Returns null if the array is null, true if the array contains `value`, and false otherwise.
+   * @group collection_funcs
+   * @since 3.4.0
+   */
+  def array_contains(column: Column, value: Any): Column =
+    Column.fn("array_contains", column, lit(value))
+
+  /**
+   * Returns an ARRAY containing all elements from the source ARRAY as well as the new element.
+   * The new element/column is located at end of the ARRAY.
+   *
+   * @group collection_funcs
+   * @since 3.4.0
+   */
+  def array_append(column: Column, element: Any): Column =
+    Column.fn("array_append", column, lit(element))
+
+  /**
+   * Returns `true` if `a1` and `a2` have at least one non-null element in common. If not and both
+   * the arrays are non-empty and any of them contains a `null`, it returns `null`. It returns
+   * `false` otherwise.
+   * @group collection_funcs
+   * @since 3.4.0
+   */
+  def arrays_overlap(a1: Column, a2: Column): Column = Column.fn("arrays_overlap", a1, a2)
+
+  /**
+   * Returns an array containing all the elements in `x` from index `start` (or starting from the
+   * end if `start` is negative) with the specified `length`.
+   *
+   * @param x
+   *   the array column to be sliced
+   * @param start
+   *   the starting index
+   * @param length
+   *   the length of the slice
+   *
+   * @group collection_funcs
+   * @since 2.4.0
+   */
+  def slice(x: Column, start: Int, length: Int): Column =
+    slice(x, lit(start), lit(length))
+
+  /**
+   * Returns an array containing all the elements in `x` from index `start` (or starting from the
+   * end if `start` is negative) with the specified `length`.
+   *
+   * @param x
+   *   the array column to be sliced
+   * @param start
+   *   the starting index
+   * @param length
+   *   the length of the slice
+   *
+   * @group collection_funcs
+   * @since 3.4.0
+   */
+  def slice(x: Column, start: Column, length: Column): Column =
+    Column.fn("slice", x, start, length)
+
+  /**
+   * Concatenates the elements of `column` using the `delimiter`. Null values are replaced with
+   * `nullReplacement`.
+   * @group collection_funcs
+   * @since 3.4.0
+   */
+  def array_join(column: Column, delimiter: String, nullReplacement: String): Column =
+    Column.fn("array_join", column, lit(delimiter), lit(nullReplacement))
+
+  /**
+   * Concatenates the elements of `column` using the `delimiter`.
+   * @group collection_funcs
+   * @since 3.4.0
+   */
+  def array_join(column: Column, delimiter: String): Column =
+    Column.fn("array_join", column, lit(delimiter))
+
+  /**
+   * Concatenates multiple input columns together into a single column. The function works with
+   * strings, binary and compatible array columns.
+   *
+   * @group collection_funcs
+   * @since 3.4.0
+   */
+  @scala.annotation.varargs
+  def concat(exprs: Column*): Column = Column.fn("concat", exprs: _*)
+
+  /**
+   * Locates the position of the first occurrence of the value in the given array as long. Returns
+   * null if either of the arguments are null.
+   *
+   * @note
+   *   The position is not zero based, but 1 based index. Returns 0 if value could not be found in
+   *   array.
+   *
+   * @group collection_funcs
+   * @since 3.4.0
+   */
+  def array_position(column: Column, value: Any): Column =
+    Column.fn("array_position", column, lit(value))
+
+  /**
+   * Returns element of array at given index in value if column is array. Returns value for the
+   * given key in value if column is map.
+   *
+   * @group collection_funcs
+   * @since 3.4.0
+   */
+  def element_at(column: Column, value: Any): Column = Column.fn("element_at", column, lit(value))
+
+  /**
+   * Returns element of array at given (0-based) index. If the index points outside of the array
+   * boundaries, then this function returns NULL.
+   *
+   * @group collection_funcs
+   * @since 3.4.0
+   */
+  def get(column: Column, index: Column): Column = Column.fn("get", column, index)
+
+  /**
+   * Sorts the input array in ascending order. The elements of the input array must be orderable.
+   * NaN is greater than any non-NaN elements for double/float type. Null elements will be placed
+   * at the end of the returned array.
+   *
+   * @group collection_funcs
+   * @since 3.4.0
+   */
+  def array_sort(e: Column): Column = Column.fn("array_sort", e)
+
+  /**
+   * Sorts the input array based on the given comparator function. The comparator will take two
+   * arguments representing two elements of the array. It returns a negative integer, 0, or a
+   * positive integer as the first element is less than, equal to, or greater than the second
+   * element. If the comparator function returns null, the function will fail and raise an error.
+   *
+   * @group collection_funcs
+   * @since 3.4.0
+   */
+  def array_sort(e: Column, comparator: (Column, Column) => Column): Column =
+    Column.fn("array_sort", e, createLambda(comparator))
+
+  /**
+   * Remove all elements that equal to element from the given array.
+   *
+   * @group collection_funcs
+   * @since 3.4.0
+   */
+  def array_remove(column: Column, element: Any): Column =
+    Column.fn("array_remove", column, lit(element))
+
+  /**
+   * Remove all null elements from the given array.
+   *
+   * @group collection_funcs
+   * @since 3.4.0
+   */
+  def array_compact(column: Column): Column = Column.fn("array_compact", column)
+
+  /**
+   * Removes duplicate values from the array.
+   * @group collection_funcs
+   * @since 3.4.0
+   */
+  def array_distinct(e: Column): Column = Column.fn("array_distinct", e)
+
+  /**
+   * Returns an array of the elements in the intersection of the given two arrays, without
+   * duplicates.
+   *
+   * @group collection_funcs
+   * @since 3.4.0
+   */
+  def array_intersect(col1: Column, col2: Column): Column =
+    Column.fn("array_intersect", col1, col2)
+
+  /**
+   * Adds an item into a given array at a specified position
+   *
+   * @group collection_funcs
+   * @since 3.4.0
+   */
+  def array_insert(arr: Column, pos: Column, value: Column): Column =
+    Column.fn("array_insert", arr, pos, value)
+
+  /**
+   * Returns an array of the elements in the union of the given two arrays, without duplicates.
+   *
+   * @group collection_funcs
+   * @since 3.4.0
+   */
+  def array_union(col1: Column, col2: Column): Column =
+    Column.fn("array_union", col1, col2)
+
+  /**
+   * Returns an array of the elements in the first array but not in the second array, without
+   * duplicates. The order of elements in the result is not determined
+   *
+   * @group collection_funcs
+   * @since 3.4.0
+   */
+  def array_except(col1: Column, col2: Column): Column =
+    Column.fn("array_except", col1, col2)
+
+  private def newLambdaVariable(name: String): proto.Expression.UnresolvedNamedLambdaVariable = {
+    proto.Expression.UnresolvedNamedLambdaVariable
+      .newBuilder()
+      .addNameParts(name)
+      .build()
+  }
+
+  private def toLambdaVariableColumn(
+      v: proto.Expression.UnresolvedNamedLambdaVariable): Column = {
+    Column(_.setUnresolvedNamedLambdaVariable(v))
+  }
+
+  private def createLambda(f: Column => Column): Column = Column { builder =>
+    val x = newLambdaVariable("x")

Review Comment:
   Can we always use `x`? 
   Why not `UnresolvedNamedLambdaVariable.freshVarName("x")`?
   
   
   
   



##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala:
##########
@@ -3845,6 +3849,1213 @@ object functions {
    */
   def timestamp_seconds(e: Column): Column = Column.fn("timestamp_seconds", e)
 
+  //////////////////////////////////////////////////////////////////////////////////////////////
+  // Collection functions
+  //////////////////////////////////////////////////////////////////////////////////////////////
+
+  /**
+   * Returns null if the array is null, true if the array contains `value`, and false otherwise.
+   * @group collection_funcs
+   * @since 3.4.0
+   */
+  def array_contains(column: Column, value: Any): Column =
+    Column.fn("array_contains", column, lit(value))
+
+  /**
+   * Returns an ARRAY containing all elements from the source ARRAY as well as the new element.
+   * The new element/column is located at end of the ARRAY.
+   *
+   * @group collection_funcs
+   * @since 3.4.0
+   */
+  def array_append(column: Column, element: Any): Column =
+    Column.fn("array_append", column, lit(element))
+
+  /**
+   * Returns `true` if `a1` and `a2` have at least one non-null element in common. If not and both
+   * the arrays are non-empty and any of them contains a `null`, it returns `null`. It returns
+   * `false` otherwise.
+   * @group collection_funcs
+   * @since 3.4.0
+   */
+  def arrays_overlap(a1: Column, a2: Column): Column = Column.fn("arrays_overlap", a1, a2)
+
+  /**
+   * Returns an array containing all the elements in `x` from index `start` (or starting from the
+   * end if `start` is negative) with the specified `length`.
+   *
+   * @param x
+   *   the array column to be sliced
+   * @param start
+   *   the starting index
+   * @param length
+   *   the length of the slice
+   *
+   * @group collection_funcs
+   * @since 2.4.0
+   */
+  def slice(x: Column, start: Int, length: Int): Column =
+    slice(x, lit(start), lit(length))
+
+  /**
+   * Returns an array containing all the elements in `x` from index `start` (or starting from the
+   * end if `start` is negative) with the specified `length`.
+   *
+   * @param x
+   *   the array column to be sliced
+   * @param start
+   *   the starting index
+   * @param length
+   *   the length of the slice
+   *
+   * @group collection_funcs
+   * @since 3.4.0
+   */
+  def slice(x: Column, start: Column, length: Column): Column =
+    Column.fn("slice", x, start, length)
+
+  /**
+   * Concatenates the elements of `column` using the `delimiter`. Null values are replaced with
+   * `nullReplacement`.
+   * @group collection_funcs
+   * @since 3.4.0
+   */
+  def array_join(column: Column, delimiter: String, nullReplacement: String): Column =
+    Column.fn("array_join", column, lit(delimiter), lit(nullReplacement))
+
+  /**
+   * Concatenates the elements of `column` using the `delimiter`.
+   * @group collection_funcs
+   * @since 3.4.0
+   */
+  def array_join(column: Column, delimiter: String): Column =
+    Column.fn("array_join", column, lit(delimiter))
+
+  /**
+   * Concatenates multiple input columns together into a single column. The function works with
+   * strings, binary and compatible array columns.
+   *
+   * @group collection_funcs
+   * @since 3.4.0
+   */
+  @scala.annotation.varargs
+  def concat(exprs: Column*): Column = Column.fn("concat", exprs: _*)
+
+  /**
+   * Locates the position of the first occurrence of the value in the given array as long. Returns
+   * null if either of the arguments are null.
+   *
+   * @note
+   *   The position is not zero based, but 1 based index. Returns 0 if value could not be found in
+   *   array.
+   *
+   * @group collection_funcs
+   * @since 3.4.0
+   */
+  def array_position(column: Column, value: Any): Column =
+    Column.fn("array_position", column, lit(value))
+
+  /**
+   * Returns element of array at given index in value if column is array. Returns value for the
+   * given key in value if column is map.
+   *
+   * @group collection_funcs
+   * @since 3.4.0
+   */
+  def element_at(column: Column, value: Any): Column = Column.fn("element_at", column, lit(value))
+
+  /**
+   * Returns element of array at given (0-based) index. If the index points outside of the array
+   * boundaries, then this function returns NULL.
+   *
+   * @group collection_funcs
+   * @since 3.4.0
+   */
+  def get(column: Column, index: Column): Column = Column.fn("get", column, index)
+
+  /**
+   * Sorts the input array in ascending order. The elements of the input array must be orderable.
+   * NaN is greater than any non-NaN elements for double/float type. Null elements will be placed
+   * at the end of the returned array.
+   *
+   * @group collection_funcs
+   * @since 3.4.0
+   */
+  def array_sort(e: Column): Column = Column.fn("array_sort", e)
+
+  /**
+   * Sorts the input array based on the given comparator function. The comparator will take two
+   * arguments representing two elements of the array. It returns a negative integer, 0, or a
+   * positive integer as the first element is less than, equal to, or greater than the second
+   * element. If the comparator function returns null, the function will fail and raise an error.
+   *
+   * @group collection_funcs
+   * @since 3.4.0
+   */
+  def array_sort(e: Column, comparator: (Column, Column) => Column): Column =
+    Column.fn("array_sort", e, createLambda(comparator))
+
+  /**
+   * Remove all elements that equal to element from the given array.
+   *
+   * @group collection_funcs
+   * @since 3.4.0
+   */
+  def array_remove(column: Column, element: Any): Column =
+    Column.fn("array_remove", column, lit(element))
+
+  /**
+   * Remove all null elements from the given array.
+   *
+   * @group collection_funcs
+   * @since 3.4.0
+   */
+  def array_compact(column: Column): Column = Column.fn("array_compact", column)
+
+  /**
+   * Removes duplicate values from the array.
+   * @group collection_funcs
+   * @since 3.4.0
+   */
+  def array_distinct(e: Column): Column = Column.fn("array_distinct", e)
+
+  /**
+   * Returns an array of the elements in the intersection of the given two arrays, without
+   * duplicates.
+   *
+   * @group collection_funcs
+   * @since 3.4.0
+   */
+  def array_intersect(col1: Column, col2: Column): Column =
+    Column.fn("array_intersect", col1, col2)
+
+  /**
+   * Adds an item into a given array at a specified position
+   *
+   * @group collection_funcs
+   * @since 3.4.0
+   */
+  def array_insert(arr: Column, pos: Column, value: Column): Column =
+    Column.fn("array_insert", arr, pos, value)
+
+  /**
+   * Returns an array of the elements in the union of the given two arrays, without duplicates.
+   *
+   * @group collection_funcs
+   * @since 3.4.0
+   */
+  def array_union(col1: Column, col2: Column): Column =
+    Column.fn("array_union", col1, col2)
+
+  /**
+   * Returns an array of the elements in the first array but not in the second array, without
+   * duplicates. The order of elements in the result is not determined
+   *
+   * @group collection_funcs
+   * @since 3.4.0
+   */
+  def array_except(col1: Column, col2: Column): Column =
+    Column.fn("array_except", col1, col2)
+
+  private def newLambdaVariable(name: String): proto.Expression.UnresolvedNamedLambdaVariable = {
+    proto.Expression.UnresolvedNamedLambdaVariable
+      .newBuilder()
+      .addNameParts(name)
+      .build()
+  }
+
+  private def toLambdaVariableColumn(
+      v: proto.Expression.UnresolvedNamedLambdaVariable): Column = {
+    Column(_.setUnresolvedNamedLambdaVariable(v))
+  }
+
+  private def createLambda(f: Column => Column): Column = Column { builder =>
+    val x = newLambdaVariable("x")
+    val function = f(toLambdaVariableColumn(x))

Review Comment:
   Yes, I know where I am wrong, thanks
   
   



##########
connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala:
##########
@@ -336,4 +336,31 @@ class ClientE2ETestSuite extends RemoteSparkSession {
     checkSample(datasets.get(2), 3.0 / 10.0, 6.0 / 10.0, 9L)
     checkSample(datasets.get(3), 6.0 / 10.0, 1.0, 9L)
   }
+
+  test("lambda functions") {
+    // This test is mostly to validate lambda variables are properly resolved.
+    import org.apache.spark.sql.functions._

Review Comment:
   should we move this import  be put together with others?
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] LuciferYang commented on a diff in pull request #40130: [SPARK-42531][CONNECT] Scala Client Add Collections Functions

Posted by "LuciferYang (via GitHub)" <gi...@apache.org>.
LuciferYang commented on code in PR #40130:
URL: https://github.com/apache/spark/pull/40130#discussion_r1115386556


##########
connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala:
##########
@@ -336,4 +336,31 @@ class ClientE2ETestSuite extends RemoteSparkSession {
     checkSample(datasets.get(2), 3.0 / 10.0, 6.0 / 10.0, 9L)
     checkSample(datasets.get(3), 6.0 / 10.0, 1.0, 9L)
   }
+
+  test("lambda functions") {
+    // This test is mostly to validate lambda variables are properly resolved.
+    import org.apache.spark.sql.functions._

Review Comment:
   should we put this import on the top like other imports?
   
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] hvanhovell commented on a diff in pull request #40130: [SPARK-42531][CONNECT] Scala Client Add Collections Functions

Posted by "hvanhovell (via GitHub)" <gi...@apache.org>.
hvanhovell commented on code in PR #40130:
URL: https://github.com/apache/spark/pull/40130#discussion_r1115665673


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala:
##########
@@ -3845,6 +3849,1213 @@ object functions {
    */
   def timestamp_seconds(e: Column): Column = Column.fn("timestamp_seconds", e)
 
+  //////////////////////////////////////////////////////////////////////////////////////////////
+  // Collection functions
+  //////////////////////////////////////////////////////////////////////////////////////////////
+
+  /**
+   * Returns null if the array is null, true if the array contains `value`, and false otherwise.
+   * @group collection_funcs
+   * @since 3.4.0
+   */
+  def array_contains(column: Column, value: Any): Column =
+    Column.fn("array_contains", column, lit(value))
+
+  /**
+   * Returns an ARRAY containing all elements from the source ARRAY as well as the new element.
+   * The new element/column is located at end of the ARRAY.
+   *
+   * @group collection_funcs
+   * @since 3.4.0
+   */
+  def array_append(column: Column, element: Any): Column =
+    Column.fn("array_append", column, lit(element))
+
+  /**
+   * Returns `true` if `a1` and `a2` have at least one non-null element in common. If not and both
+   * the arrays are non-empty and any of them contains a `null`, it returns `null`. It returns
+   * `false` otherwise.
+   * @group collection_funcs
+   * @since 3.4.0
+   */
+  def arrays_overlap(a1: Column, a2: Column): Column = Column.fn("arrays_overlap", a1, a2)
+
+  /**
+   * Returns an array containing all the elements in `x` from index `start` (or starting from the
+   * end if `start` is negative) with the specified `length`.
+   *
+   * @param x
+   *   the array column to be sliced
+   * @param start
+   *   the starting index
+   * @param length
+   *   the length of the slice
+   *
+   * @group collection_funcs
+   * @since 2.4.0
+   */
+  def slice(x: Column, start: Int, length: Int): Column =
+    slice(x, lit(start), lit(length))
+
+  /**
+   * Returns an array containing all the elements in `x` from index `start` (or starting from the
+   * end if `start` is negative) with the specified `length`.
+   *
+   * @param x
+   *   the array column to be sliced
+   * @param start
+   *   the starting index
+   * @param length
+   *   the length of the slice
+   *
+   * @group collection_funcs
+   * @since 3.4.0
+   */
+  def slice(x: Column, start: Column, length: Column): Column =
+    Column.fn("slice", x, start, length)
+
+  /**
+   * Concatenates the elements of `column` using the `delimiter`. Null values are replaced with
+   * `nullReplacement`.
+   * @group collection_funcs
+   * @since 3.4.0
+   */
+  def array_join(column: Column, delimiter: String, nullReplacement: String): Column =
+    Column.fn("array_join", column, lit(delimiter), lit(nullReplacement))
+
+  /**
+   * Concatenates the elements of `column` using the `delimiter`.
+   * @group collection_funcs
+   * @since 3.4.0
+   */
+  def array_join(column: Column, delimiter: String): Column =
+    Column.fn("array_join", column, lit(delimiter))
+
+  /**
+   * Concatenates multiple input columns together into a single column. The function works with
+   * strings, binary and compatible array columns.
+   *
+   * @group collection_funcs
+   * @since 3.4.0
+   */
+  @scala.annotation.varargs
+  def concat(exprs: Column*): Column = Column.fn("concat", exprs: _*)
+
+  /**
+   * Locates the position of the first occurrence of the value in the given array as long. Returns
+   * null if either of the arguments are null.
+   *
+   * @note
+   *   The position is not zero based, but 1 based index. Returns 0 if value could not be found in
+   *   array.
+   *
+   * @group collection_funcs
+   * @since 3.4.0
+   */
+  def array_position(column: Column, value: Any): Column =
+    Column.fn("array_position", column, lit(value))
+
+  /**
+   * Returns element of array at given index in value if column is array. Returns value for the
+   * given key in value if column is map.
+   *
+   * @group collection_funcs
+   * @since 3.4.0
+   */
+  def element_at(column: Column, value: Any): Column = Column.fn("element_at", column, lit(value))
+
+  /**
+   * Returns element of array at given (0-based) index. If the index points outside of the array
+   * boundaries, then this function returns NULL.
+   *
+   * @group collection_funcs
+   * @since 3.4.0
+   */
+  def get(column: Column, index: Column): Column = Column.fn("get", column, index)
+
+  /**
+   * Sorts the input array in ascending order. The elements of the input array must be orderable.
+   * NaN is greater than any non-NaN elements for double/float type. Null elements will be placed
+   * at the end of the returned array.
+   *
+   * @group collection_funcs
+   * @since 3.4.0
+   */
+  def array_sort(e: Column): Column = Column.fn("array_sort", e)
+
+  /**
+   * Sorts the input array based on the given comparator function. The comparator will take two
+   * arguments representing two elements of the array. It returns a negative integer, 0, or a
+   * positive integer as the first element is less than, equal to, or greater than the second
+   * element. If the comparator function returns null, the function will fail and raise an error.
+   *
+   * @group collection_funcs
+   * @since 3.4.0
+   */
+  def array_sort(e: Column, comparator: (Column, Column) => Column): Column =
+    Column.fn("array_sort", e, createLambda(comparator))
+
+  /**
+   * Remove all elements that equal to element from the given array.
+   *
+   * @group collection_funcs
+   * @since 3.4.0
+   */
+  def array_remove(column: Column, element: Any): Column =
+    Column.fn("array_remove", column, lit(element))
+
+  /**
+   * Remove all null elements from the given array.
+   *
+   * @group collection_funcs
+   * @since 3.4.0
+   */
+  def array_compact(column: Column): Column = Column.fn("array_compact", column)
+
+  /**
+   * Removes duplicate values from the array.
+   * @group collection_funcs
+   * @since 3.4.0
+   */
+  def array_distinct(e: Column): Column = Column.fn("array_distinct", e)
+
+  /**
+   * Returns an array of the elements in the intersection of the given two arrays, without
+   * duplicates.
+   *
+   * @group collection_funcs
+   * @since 3.4.0
+   */
+  def array_intersect(col1: Column, col2: Column): Column =
+    Column.fn("array_intersect", col1, col2)
+
+  /**
+   * Adds an item into a given array at a specified position
+   *
+   * @group collection_funcs
+   * @since 3.4.0
+   */
+  def array_insert(arr: Column, pos: Column, value: Column): Column =
+    Column.fn("array_insert", arr, pos, value)
+
+  /**
+   * Returns an array of the elements in the union of the given two arrays, without duplicates.
+   *
+   * @group collection_funcs
+   * @since 3.4.0
+   */
+  def array_union(col1: Column, col2: Column): Column =
+    Column.fn("array_union", col1, col2)
+
+  /**
+   * Returns an array of the elements in the first array but not in the second array, without
+   * duplicates. The order of elements in the result is not determined
+   *
+   * @group collection_funcs
+   * @since 3.4.0
+   */
+  def array_except(col1: Column, col2: Column): Column =
+    Column.fn("array_except", col1, col2)
+
+  private def newLambdaVariable(name: String): proto.Expression.UnresolvedNamedLambdaVariable = {
+    proto.Expression.UnresolvedNamedLambdaVariable
+      .newBuilder()
+      .addNameParts(name)
+      .build()
+  }
+
+  private def toLambdaVariableColumn(
+      v: proto.Expression.UnresolvedNamedLambdaVariable): Column = {
+    Column(_.setUnresolvedNamedLambdaVariable(v))
+  }
+
+  private def createLambda(f: Column => Column): Column = Column { builder =>
+    val x = newLambdaVariable("x")

Review Comment:
   Well first of we don't have that methods in connect :)... Secondly, and more importantly, the analyzer will resolve the named lambda variables and as part of that will make sure we use the correct variable. The client does not have to take care of this detail.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] LuciferYang commented on a diff in pull request #40130: [SPARK-42531][CONNECT] Scala Client Add Collections Functions

Posted by "LuciferYang (via GitHub)" <gi...@apache.org>.
LuciferYang commented on code in PR #40130:
URL: https://github.com/apache/spark/pull/40130#discussion_r1115721090


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala:
##########
@@ -3845,6 +3849,1213 @@ object functions {
    */
   def timestamp_seconds(e: Column): Column = Column.fn("timestamp_seconds", e)
 
+  //////////////////////////////////////////////////////////////////////////////////////////////
+  // Collection functions
+  //////////////////////////////////////////////////////////////////////////////////////////////
+
+  /**
+   * Returns null if the array is null, true if the array contains `value`, and false otherwise.
+   * @group collection_funcs
+   * @since 3.4.0
+   */
+  def array_contains(column: Column, value: Any): Column =
+    Column.fn("array_contains", column, lit(value))
+
+  /**
+   * Returns an ARRAY containing all elements from the source ARRAY as well as the new element.
+   * The new element/column is located at end of the ARRAY.
+   *
+   * @group collection_funcs
+   * @since 3.4.0
+   */
+  def array_append(column: Column, element: Any): Column =
+    Column.fn("array_append", column, lit(element))
+
+  /**
+   * Returns `true` if `a1` and `a2` have at least one non-null element in common. If not and both
+   * the arrays are non-empty and any of them contains a `null`, it returns `null`. It returns
+   * `false` otherwise.
+   * @group collection_funcs
+   * @since 3.4.0
+   */
+  def arrays_overlap(a1: Column, a2: Column): Column = Column.fn("arrays_overlap", a1, a2)
+
+  /**
+   * Returns an array containing all the elements in `x` from index `start` (or starting from the
+   * end if `start` is negative) with the specified `length`.
+   *
+   * @param x
+   *   the array column to be sliced
+   * @param start
+   *   the starting index
+   * @param length
+   *   the length of the slice
+   *
+   * @group collection_funcs
+   * @since 2.4.0
+   */
+  def slice(x: Column, start: Int, length: Int): Column =
+    slice(x, lit(start), lit(length))
+
+  /**
+   * Returns an array containing all the elements in `x` from index `start` (or starting from the
+   * end if `start` is negative) with the specified `length`.
+   *
+   * @param x
+   *   the array column to be sliced
+   * @param start
+   *   the starting index
+   * @param length
+   *   the length of the slice
+   *
+   * @group collection_funcs
+   * @since 3.4.0
+   */
+  def slice(x: Column, start: Column, length: Column): Column =
+    Column.fn("slice", x, start, length)
+
+  /**
+   * Concatenates the elements of `column` using the `delimiter`. Null values are replaced with
+   * `nullReplacement`.
+   * @group collection_funcs
+   * @since 3.4.0
+   */
+  def array_join(column: Column, delimiter: String, nullReplacement: String): Column =
+    Column.fn("array_join", column, lit(delimiter), lit(nullReplacement))
+
+  /**
+   * Concatenates the elements of `column` using the `delimiter`.
+   * @group collection_funcs
+   * @since 3.4.0
+   */
+  def array_join(column: Column, delimiter: String): Column =
+    Column.fn("array_join", column, lit(delimiter))
+
+  /**
+   * Concatenates multiple input columns together into a single column. The function works with
+   * strings, binary and compatible array columns.
+   *
+   * @group collection_funcs
+   * @since 3.4.0
+   */
+  @scala.annotation.varargs
+  def concat(exprs: Column*): Column = Column.fn("concat", exprs: _*)
+
+  /**
+   * Locates the position of the first occurrence of the value in the given array as long. Returns
+   * null if either of the arguments are null.
+   *
+   * @note
+   *   The position is not zero based, but 1 based index. Returns 0 if value could not be found in
+   *   array.
+   *
+   * @group collection_funcs
+   * @since 3.4.0
+   */
+  def array_position(column: Column, value: Any): Column =
+    Column.fn("array_position", column, lit(value))
+
+  /**
+   * Returns element of array at given index in value if column is array. Returns value for the
+   * given key in value if column is map.
+   *
+   * @group collection_funcs
+   * @since 3.4.0
+   */
+  def element_at(column: Column, value: Any): Column = Column.fn("element_at", column, lit(value))
+
+  /**
+   * Returns element of array at given (0-based) index. If the index points outside of the array
+   * boundaries, then this function returns NULL.
+   *
+   * @group collection_funcs
+   * @since 3.4.0
+   */
+  def get(column: Column, index: Column): Column = Column.fn("get", column, index)
+
+  /**
+   * Sorts the input array in ascending order. The elements of the input array must be orderable.
+   * NaN is greater than any non-NaN elements for double/float type. Null elements will be placed
+   * at the end of the returned array.
+   *
+   * @group collection_funcs
+   * @since 3.4.0
+   */
+  def array_sort(e: Column): Column = Column.fn("array_sort", e)
+
+  /**
+   * Sorts the input array based on the given comparator function. The comparator will take two
+   * arguments representing two elements of the array. It returns a negative integer, 0, or a
+   * positive integer as the first element is less than, equal to, or greater than the second
+   * element. If the comparator function returns null, the function will fail and raise an error.
+   *
+   * @group collection_funcs
+   * @since 3.4.0
+   */
+  def array_sort(e: Column, comparator: (Column, Column) => Column): Column =
+    Column.fn("array_sort", e, createLambda(comparator))
+
+  /**
+   * Remove all elements that equal to element from the given array.
+   *
+   * @group collection_funcs
+   * @since 3.4.0
+   */
+  def array_remove(column: Column, element: Any): Column =
+    Column.fn("array_remove", column, lit(element))
+
+  /**
+   * Remove all null elements from the given array.
+   *
+   * @group collection_funcs
+   * @since 3.4.0
+   */
+  def array_compact(column: Column): Column = Column.fn("array_compact", column)
+
+  /**
+   * Removes duplicate values from the array.
+   * @group collection_funcs
+   * @since 3.4.0
+   */
+  def array_distinct(e: Column): Column = Column.fn("array_distinct", e)
+
+  /**
+   * Returns an array of the elements in the intersection of the given two arrays, without
+   * duplicates.
+   *
+   * @group collection_funcs
+   * @since 3.4.0
+   */
+  def array_intersect(col1: Column, col2: Column): Column =
+    Column.fn("array_intersect", col1, col2)
+
+  /**
+   * Adds an item into a given array at a specified position
+   *
+   * @group collection_funcs
+   * @since 3.4.0
+   */
+  def array_insert(arr: Column, pos: Column, value: Column): Column =
+    Column.fn("array_insert", arr, pos, value)
+
+  /**
+   * Returns an array of the elements in the union of the given two arrays, without duplicates.
+   *
+   * @group collection_funcs
+   * @since 3.4.0
+   */
+  def array_union(col1: Column, col2: Column): Column =
+    Column.fn("array_union", col1, col2)
+
+  /**
+   * Returns an array of the elements in the first array but not in the second array, without
+   * duplicates. The order of elements in the result is not determined
+   *
+   * @group collection_funcs
+   * @since 3.4.0
+   */
+  def array_except(col1: Column, col2: Column): Column =
+    Column.fn("array_except", col1, col2)
+
+  private def newLambdaVariable(name: String): proto.Expression.UnresolvedNamedLambdaVariable = {
+    proto.Expression.UnresolvedNamedLambdaVariable
+      .newBuilder()
+      .addNameParts(name)
+      .build()
+  }
+
+  private def toLambdaVariableColumn(
+      v: proto.Expression.UnresolvedNamedLambdaVariable): Column = {
+    Column(_.setUnresolvedNamedLambdaVariable(v))
+  }
+
+  private def createLambda(f: Column => Column): Column = Column { builder =>
+    val x = newLambdaVariable("x")

Review Comment:
   Thanks very much for your explanation
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] hvanhovell commented on a diff in pull request #40130: [SPARK-42531][CONNECT] Scala Client Add Collections Functions

Posted by "hvanhovell (via GitHub)" <gi...@apache.org>.
hvanhovell commented on code in PR #40130:
URL: https://github.com/apache/spark/pull/40130#discussion_r1115666566


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala:
##########
@@ -3845,6 +3849,1213 @@ object functions {
    */
   def timestamp_seconds(e: Column): Column = Column.fn("timestamp_seconds", e)
 
+  //////////////////////////////////////////////////////////////////////////////////////////////
+  // Collection functions
+  //////////////////////////////////////////////////////////////////////////////////////////////
+
+  /**
+   * Returns null if the array is null, true if the array contains `value`, and false otherwise.
+   * @group collection_funcs
+   * @since 3.4.0
+   */
+  def array_contains(column: Column, value: Any): Column =
+    Column.fn("array_contains", column, lit(value))
+
+  /**
+   * Returns an ARRAY containing all elements from the source ARRAY as well as the new element.
+   * The new element/column is located at end of the ARRAY.
+   *
+   * @group collection_funcs
+   * @since 3.4.0
+   */
+  def array_append(column: Column, element: Any): Column =
+    Column.fn("array_append", column, lit(element))
+
+  /**
+   * Returns `true` if `a1` and `a2` have at least one non-null element in common. If not and both
+   * the arrays are non-empty and any of them contains a `null`, it returns `null`. It returns
+   * `false` otherwise.
+   * @group collection_funcs
+   * @since 3.4.0
+   */
+  def arrays_overlap(a1: Column, a2: Column): Column = Column.fn("arrays_overlap", a1, a2)
+
+  /**
+   * Returns an array containing all the elements in `x` from index `start` (or starting from the
+   * end if `start` is negative) with the specified `length`.
+   *
+   * @param x
+   *   the array column to be sliced
+   * @param start
+   *   the starting index
+   * @param length
+   *   the length of the slice
+   *
+   * @group collection_funcs
+   * @since 2.4.0

Review Comment:
   arrrgghhh.... missed that one



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org