You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by "LuciferYang (via GitHub)" <gi...@apache.org> on 2023/02/23 09:09:30 UTC

[GitHub] [spark] LuciferYang commented on a diff in pull request #40130: [SPARK-42531][CONNECT] Scala Client Add Collections Functions

LuciferYang commented on code in PR #40130:
URL: https://github.com/apache/spark/pull/40130#discussion_r1115344944


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala:
##########
@@ -3845,6 +3849,1213 @@ object functions {
    */
   def timestamp_seconds(e: Column): Column = Column.fn("timestamp_seconds", e)
 
+  //////////////////////////////////////////////////////////////////////////////////////////////
+  // Collection functions
+  //////////////////////////////////////////////////////////////////////////////////////////////
+
+  /**
+   * Returns null if the array is null, true if the array contains `value`, and false otherwise.
+   * @group collection_funcs
+   * @since 3.4.0
+   */
+  def array_contains(column: Column, value: Any): Column =
+    Column.fn("array_contains", column, lit(value))
+
+  /**
+   * Returns an ARRAY containing all elements from the source ARRAY as well as the new element.
+   * The new element/column is located at end of the ARRAY.
+   *
+   * @group collection_funcs
+   * @since 3.4.0
+   */
+  def array_append(column: Column, element: Any): Column =
+    Column.fn("array_append", column, lit(element))
+
+  /**
+   * Returns `true` if `a1` and `a2` have at least one non-null element in common. If not and both
+   * the arrays are non-empty and any of them contains a `null`, it returns `null`. It returns
+   * `false` otherwise.
+   * @group collection_funcs
+   * @since 3.4.0
+   */
+  def arrays_overlap(a1: Column, a2: Column): Column = Column.fn("arrays_overlap", a1, a2)
+
+  /**
+   * Returns an array containing all the elements in `x` from index `start` (or starting from the
+   * end if `start` is negative) with the specified `length`.
+   *
+   * @param x
+   *   the array column to be sliced
+   * @param start
+   *   the starting index
+   * @param length
+   *   the length of the slice
+   *
+   * @group collection_funcs
+   * @since 2.4.0

Review Comment:
   `@since`



##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala:
##########
@@ -3845,6 +3849,1213 @@ object functions {
    */
   def timestamp_seconds(e: Column): Column = Column.fn("timestamp_seconds", e)
 
+  //////////////////////////////////////////////////////////////////////////////////////////////
+  // Collection functions
+  //////////////////////////////////////////////////////////////////////////////////////////////
+
+  /**
+   * Returns null if the array is null, true if the array contains `value`, and false otherwise.
+   * @group collection_funcs
+   * @since 3.4.0
+   */
+  def array_contains(column: Column, value: Any): Column =
+    Column.fn("array_contains", column, lit(value))
+
+  /**
+   * Returns an ARRAY containing all elements from the source ARRAY as well as the new element.
+   * The new element/column is located at end of the ARRAY.
+   *
+   * @group collection_funcs
+   * @since 3.4.0
+   */
+  def array_append(column: Column, element: Any): Column =
+    Column.fn("array_append", column, lit(element))
+
+  /**
+   * Returns `true` if `a1` and `a2` have at least one non-null element in common. If not and both
+   * the arrays are non-empty and any of them contains a `null`, it returns `null`. It returns
+   * `false` otherwise.
+   * @group collection_funcs
+   * @since 3.4.0
+   */
+  def arrays_overlap(a1: Column, a2: Column): Column = Column.fn("arrays_overlap", a1, a2)
+
+  /**
+   * Returns an array containing all the elements in `x` from index `start` (or starting from the
+   * end if `start` is negative) with the specified `length`.
+   *
+   * @param x
+   *   the array column to be sliced
+   * @param start
+   *   the starting index
+   * @param length
+   *   the length of the slice
+   *
+   * @group collection_funcs
+   * @since 2.4.0
+   */
+  def slice(x: Column, start: Int, length: Int): Column =
+    slice(x, lit(start), lit(length))
+
+  /**
+   * Returns an array containing all the elements in `x` from index `start` (or starting from the
+   * end if `start` is negative) with the specified `length`.
+   *
+   * @param x
+   *   the array column to be sliced
+   * @param start
+   *   the starting index
+   * @param length
+   *   the length of the slice
+   *
+   * @group collection_funcs
+   * @since 3.4.0
+   */
+  def slice(x: Column, start: Column, length: Column): Column =
+    Column.fn("slice", x, start, length)
+
+  /**
+   * Concatenates the elements of `column` using the `delimiter`. Null values are replaced with
+   * `nullReplacement`.
+   * @group collection_funcs
+   * @since 3.4.0
+   */
+  def array_join(column: Column, delimiter: String, nullReplacement: String): Column =
+    Column.fn("array_join", column, lit(delimiter), lit(nullReplacement))
+
+  /**
+   * Concatenates the elements of `column` using the `delimiter`.
+   * @group collection_funcs
+   * @since 3.4.0
+   */
+  def array_join(column: Column, delimiter: String): Column =
+    Column.fn("array_join", column, lit(delimiter))
+
+  /**
+   * Concatenates multiple input columns together into a single column. The function works with
+   * strings, binary and compatible array columns.
+   *
+   * @group collection_funcs
+   * @since 3.4.0
+   */
+  @scala.annotation.varargs
+  def concat(exprs: Column*): Column = Column.fn("concat", exprs: _*)
+
+  /**
+   * Locates the position of the first occurrence of the value in the given array as long. Returns
+   * null if either of the arguments are null.
+   *
+   * @note
+   *   The position is not zero based, but 1 based index. Returns 0 if value could not be found in
+   *   array.
+   *
+   * @group collection_funcs
+   * @since 3.4.0
+   */
+  def array_position(column: Column, value: Any): Column =
+    Column.fn("array_position", column, lit(value))
+
+  /**
+   * Returns element of array at given index in value if column is array. Returns value for the
+   * given key in value if column is map.
+   *
+   * @group collection_funcs
+   * @since 3.4.0
+   */
+  def element_at(column: Column, value: Any): Column = Column.fn("element_at", column, lit(value))
+
+  /**
+   * Returns element of array at given (0-based) index. If the index points outside of the array
+   * boundaries, then this function returns NULL.
+   *
+   * @group collection_funcs
+   * @since 3.4.0
+   */
+  def get(column: Column, index: Column): Column = Column.fn("get", column, index)
+
+  /**
+   * Sorts the input array in ascending order. The elements of the input array must be orderable.
+   * NaN is greater than any non-NaN elements for double/float type. Null elements will be placed
+   * at the end of the returned array.
+   *
+   * @group collection_funcs
+   * @since 3.4.0
+   */
+  def array_sort(e: Column): Column = Column.fn("array_sort", e)
+
+  /**
+   * Sorts the input array based on the given comparator function. The comparator will take two
+   * arguments representing two elements of the array. It returns a negative integer, 0, or a
+   * positive integer as the first element is less than, equal to, or greater than the second
+   * element. If the comparator function returns null, the function will fail and raise an error.
+   *
+   * @group collection_funcs
+   * @since 3.4.0
+   */
+  def array_sort(e: Column, comparator: (Column, Column) => Column): Column =
+    Column.fn("array_sort", e, createLambda(comparator))
+
+  /**
+   * Remove all elements that equal to element from the given array.
+   *
+   * @group collection_funcs
+   * @since 3.4.0
+   */
+  def array_remove(column: Column, element: Any): Column =
+    Column.fn("array_remove", column, lit(element))
+
+  /**
+   * Remove all null elements from the given array.
+   *
+   * @group collection_funcs
+   * @since 3.4.0
+   */
+  def array_compact(column: Column): Column = Column.fn("array_compact", column)
+
+  /**
+   * Removes duplicate values from the array.
+   * @group collection_funcs
+   * @since 3.4.0
+   */
+  def array_distinct(e: Column): Column = Column.fn("array_distinct", e)
+
+  /**
+   * Returns an array of the elements in the intersection of the given two arrays, without
+   * duplicates.
+   *
+   * @group collection_funcs
+   * @since 3.4.0
+   */
+  def array_intersect(col1: Column, col2: Column): Column =
+    Column.fn("array_intersect", col1, col2)
+
+  /**
+   * Adds an item into a given array at a specified position
+   *
+   * @group collection_funcs
+   * @since 3.4.0
+   */
+  def array_insert(arr: Column, pos: Column, value: Column): Column =
+    Column.fn("array_insert", arr, pos, value)
+
+  /**
+   * Returns an array of the elements in the union of the given two arrays, without duplicates.
+   *
+   * @group collection_funcs
+   * @since 3.4.0
+   */
+  def array_union(col1: Column, col2: Column): Column =
+    Column.fn("array_union", col1, col2)
+
+  /**
+   * Returns an array of the elements in the first array but not in the second array, without
+   * duplicates. The order of elements in the result is not determined
+   *
+   * @group collection_funcs
+   * @since 3.4.0
+   */
+  def array_except(col1: Column, col2: Column): Column =
+    Column.fn("array_except", col1, col2)
+
+  private def newLambdaVariable(name: String): proto.Expression.UnresolvedNamedLambdaVariable = {
+    proto.Expression.UnresolvedNamedLambdaVariable
+      .newBuilder()
+      .addNameParts(name)
+      .build()
+  }
+
+  private def toLambdaVariableColumn(
+      v: proto.Expression.UnresolvedNamedLambdaVariable): Column = {
+    Column(_.setUnresolvedNamedLambdaVariable(v))
+  }
+
+  private def createLambda(f: Column => Column): Column = Column { builder =>
+    val x = newLambdaVariable("x")

Review Comment:
   Can we always use `x`? 
   Why not `UnresolvedNamedLambdaVariable.freshVarName("x")`?
   
   
   
   



##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala:
##########
@@ -3845,6 +3849,1213 @@ object functions {
    */
   def timestamp_seconds(e: Column): Column = Column.fn("timestamp_seconds", e)
 
+  //////////////////////////////////////////////////////////////////////////////////////////////
+  // Collection functions
+  //////////////////////////////////////////////////////////////////////////////////////////////
+
+  /**
+   * Returns null if the array is null, true if the array contains `value`, and false otherwise.
+   * @group collection_funcs
+   * @since 3.4.0
+   */
+  def array_contains(column: Column, value: Any): Column =
+    Column.fn("array_contains", column, lit(value))
+
+  /**
+   * Returns an ARRAY containing all elements from the source ARRAY as well as the new element.
+   * The new element/column is located at end of the ARRAY.
+   *
+   * @group collection_funcs
+   * @since 3.4.0
+   */
+  def array_append(column: Column, element: Any): Column =
+    Column.fn("array_append", column, lit(element))
+
+  /**
+   * Returns `true` if `a1` and `a2` have at least one non-null element in common. If not and both
+   * the arrays are non-empty and any of them contains a `null`, it returns `null`. It returns
+   * `false` otherwise.
+   * @group collection_funcs
+   * @since 3.4.0
+   */
+  def arrays_overlap(a1: Column, a2: Column): Column = Column.fn("arrays_overlap", a1, a2)
+
+  /**
+   * Returns an array containing all the elements in `x` from index `start` (or starting from the
+   * end if `start` is negative) with the specified `length`.
+   *
+   * @param x
+   *   the array column to be sliced
+   * @param start
+   *   the starting index
+   * @param length
+   *   the length of the slice
+   *
+   * @group collection_funcs
+   * @since 2.4.0
+   */
+  def slice(x: Column, start: Int, length: Int): Column =
+    slice(x, lit(start), lit(length))
+
+  /**
+   * Returns an array containing all the elements in `x` from index `start` (or starting from the
+   * end if `start` is negative) with the specified `length`.
+   *
+   * @param x
+   *   the array column to be sliced
+   * @param start
+   *   the starting index
+   * @param length
+   *   the length of the slice
+   *
+   * @group collection_funcs
+   * @since 3.4.0
+   */
+  def slice(x: Column, start: Column, length: Column): Column =
+    Column.fn("slice", x, start, length)
+
+  /**
+   * Concatenates the elements of `column` using the `delimiter`. Null values are replaced with
+   * `nullReplacement`.
+   * @group collection_funcs
+   * @since 3.4.0
+   */
+  def array_join(column: Column, delimiter: String, nullReplacement: String): Column =
+    Column.fn("array_join", column, lit(delimiter), lit(nullReplacement))
+
+  /**
+   * Concatenates the elements of `column` using the `delimiter`.
+   * @group collection_funcs
+   * @since 3.4.0
+   */
+  def array_join(column: Column, delimiter: String): Column =
+    Column.fn("array_join", column, lit(delimiter))
+
+  /**
+   * Concatenates multiple input columns together into a single column. The function works with
+   * strings, binary and compatible array columns.
+   *
+   * @group collection_funcs
+   * @since 3.4.0
+   */
+  @scala.annotation.varargs
+  def concat(exprs: Column*): Column = Column.fn("concat", exprs: _*)
+
+  /**
+   * Locates the position of the first occurrence of the value in the given array as long. Returns
+   * null if either of the arguments are null.
+   *
+   * @note
+   *   The position is not zero based, but 1 based index. Returns 0 if value could not be found in
+   *   array.
+   *
+   * @group collection_funcs
+   * @since 3.4.0
+   */
+  def array_position(column: Column, value: Any): Column =
+    Column.fn("array_position", column, lit(value))
+
+  /**
+   * Returns element of array at given index in value if column is array. Returns value for the
+   * given key in value if column is map.
+   *
+   * @group collection_funcs
+   * @since 3.4.0
+   */
+  def element_at(column: Column, value: Any): Column = Column.fn("element_at", column, lit(value))
+
+  /**
+   * Returns element of array at given (0-based) index. If the index points outside of the array
+   * boundaries, then this function returns NULL.
+   *
+   * @group collection_funcs
+   * @since 3.4.0
+   */
+  def get(column: Column, index: Column): Column = Column.fn("get", column, index)
+
+  /**
+   * Sorts the input array in ascending order. The elements of the input array must be orderable.
+   * NaN is greater than any non-NaN elements for double/float type. Null elements will be placed
+   * at the end of the returned array.
+   *
+   * @group collection_funcs
+   * @since 3.4.0
+   */
+  def array_sort(e: Column): Column = Column.fn("array_sort", e)
+
+  /**
+   * Sorts the input array based on the given comparator function. The comparator will take two
+   * arguments representing two elements of the array. It returns a negative integer, 0, or a
+   * positive integer as the first element is less than, equal to, or greater than the second
+   * element. If the comparator function returns null, the function will fail and raise an error.
+   *
+   * @group collection_funcs
+   * @since 3.4.0
+   */
+  def array_sort(e: Column, comparator: (Column, Column) => Column): Column =
+    Column.fn("array_sort", e, createLambda(comparator))
+
+  /**
+   * Remove all elements that equal to element from the given array.
+   *
+   * @group collection_funcs
+   * @since 3.4.0
+   */
+  def array_remove(column: Column, element: Any): Column =
+    Column.fn("array_remove", column, lit(element))
+
+  /**
+   * Remove all null elements from the given array.
+   *
+   * @group collection_funcs
+   * @since 3.4.0
+   */
+  def array_compact(column: Column): Column = Column.fn("array_compact", column)
+
+  /**
+   * Removes duplicate values from the array.
+   * @group collection_funcs
+   * @since 3.4.0
+   */
+  def array_distinct(e: Column): Column = Column.fn("array_distinct", e)
+
+  /**
+   * Returns an array of the elements in the intersection of the given two arrays, without
+   * duplicates.
+   *
+   * @group collection_funcs
+   * @since 3.4.0
+   */
+  def array_intersect(col1: Column, col2: Column): Column =
+    Column.fn("array_intersect", col1, col2)
+
+  /**
+   * Adds an item into a given array at a specified position
+   *
+   * @group collection_funcs
+   * @since 3.4.0
+   */
+  def array_insert(arr: Column, pos: Column, value: Column): Column =
+    Column.fn("array_insert", arr, pos, value)
+
+  /**
+   * Returns an array of the elements in the union of the given two arrays, without duplicates.
+   *
+   * @group collection_funcs
+   * @since 3.4.0
+   */
+  def array_union(col1: Column, col2: Column): Column =
+    Column.fn("array_union", col1, col2)
+
+  /**
+   * Returns an array of the elements in the first array but not in the second array, without
+   * duplicates. The order of elements in the result is not determined
+   *
+   * @group collection_funcs
+   * @since 3.4.0
+   */
+  def array_except(col1: Column, col2: Column): Column =
+    Column.fn("array_except", col1, col2)
+
+  private def newLambdaVariable(name: String): proto.Expression.UnresolvedNamedLambdaVariable = {
+    proto.Expression.UnresolvedNamedLambdaVariable
+      .newBuilder()
+      .addNameParts(name)
+      .build()
+  }
+
+  private def toLambdaVariableColumn(
+      v: proto.Expression.UnresolvedNamedLambdaVariable): Column = {
+    Column(_.setUnresolvedNamedLambdaVariable(v))
+  }
+
+  private def createLambda(f: Column => Column): Column = Column { builder =>
+    val x = newLambdaVariable("x")
+    val function = f(toLambdaVariableColumn(x))

Review Comment:
   Yes, I know where I am wrong, thanks
   
   



##########
connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala:
##########
@@ -336,4 +336,31 @@ class ClientE2ETestSuite extends RemoteSparkSession {
     checkSample(datasets.get(2), 3.0 / 10.0, 6.0 / 10.0, 9L)
     checkSample(datasets.get(3), 6.0 / 10.0, 1.0, 9L)
   }
+
+  test("lambda functions") {
+    // This test is mostly to validate lambda variables are properly resolved.
+    import org.apache.spark.sql.functions._

Review Comment:
   should we move this import  be put together with others?
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org