You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by "Daniel-Davies (via GitHub)" <gi...@apache.org> on 2023/02/01 15:51:25 UTC

[GitHub] [spark] Daniel-Davies commented on a diff in pull request #38867: [SPARK-41234][SQL][PYTHON] Add `array_insert` function

Daniel-Davies commented on code in PR #38867:
URL: https://github.com/apache/spark/pull/38867#discussion_r1092124391


##########
sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala:
##########
@@ -3106,6 +3106,50 @@ class DataFrameFunctionsSuite extends QueryTest with SharedSparkSession {
     )
   }
 
+  test("array_insert functions") {
+    val fiveShort: Short = 5
+
+    val df1 = Seq((Array[Int](3, 2, 5, 1, 2), 5, 3)).toDF("a", "b", "c")
+    val df2 = Seq((Array[Short](1, 2, 3, 4), 4, fiveShort)).toDF("a", "b", "c")
+    val df3 = Seq((Array[Double](3.0, 2.0, 5.0, 1.0, 2.0), 1, 3.0)).toDF("a", "b", "c")
+    val df4 = Seq((Array[Boolean](true, false), 2, false)).toDF("a", "b", "c")
+    val df5 = Seq((Array[String]("a", "b", "c"), 0, "d")).toDF("a", "b", "c")
+    val df6 = Seq((Array[String]("a", null, "b", "c"), 4, "d")).toDF("a", "b", "c")
+    val df7 = Seq((Array[Integer](3, 2, 5, null, 1, 2), 5, 3)).toDF("a", "b", "c")
+
+    checkAnswer(df1.selectExpr("array_insert(a, b, c)"), Seq(Row(Seq(3, 2, 5, 1, 2, 3))))
+    checkAnswer(df2.selectExpr("array_insert(a, b, c)"), Seq(Row(Seq[Short](1, 2, 3, 4, 5))))
+    checkAnswer(
+      df3.selectExpr("array_insert(a, b, c)"),
+      Seq(Row(Seq[Double](3.0, 3.0, 2.0, 5.0, 1.0, 2.0)))
+    )
+    checkAnswer(df4.selectExpr("array_insert(a, b, c)"), Seq(Row(Seq(true, false, false))))
+    checkAnswer(df5.selectExpr("array_insert(a, b, c)"), Seq(Row(Seq("d", "a", "b", "c"))))
+
+    // null checks
+    checkAnswer(df6.selectExpr("array_insert(a, b, c)"), Seq(Row(Seq("a", null, "b", "c", "d"))))
+    checkAnswer(df5.select(
+      array_insert(col("a"), col("b"), lit(null).cast("string"))),
+      Seq(Row(Seq(null, "a", "b", "c")))
+    )
+    checkAnswer(df6.select(
+      array_insert(col("a"), col("b"), lit(null).cast("string"))),
+      Seq(Row(Seq("a", null, "b", "c", null)))
+    )
+    checkAnswer(
+      df5.select(array_insert(col("a"), lit(null).cast("integer"), col("c"))),
+      Seq(Row(null))
+    )
+    checkAnswer(
+      df5.select(array_insert(lit(null).cast("array<string>"), col("b"), col("c"))),
+      Seq(Row(null))
+    )
+    checkAnswer(
+      df7.selectExpr("array_insert(a, 7, c)"), Seq(Row(Seq(3, 2, 5, null, 1, 2, null, 3))))

Review Comment:
   It seems that, because the array was specified as type `Array[Int]`, null entries are not permitted. Changing this to `Array[Integer]` seems to fix this. However, I still don't know why this worked a few weeks ago.



##########
sql/core/src/test/resources/sql-tests/results/array.sql.out:
##########
@@ -431,6 +431,104 @@ struct<get(array(1, 2, 3), -1):int>
 NULL
 
 
+-- !query
+select array_insert(array(1, 2, 3), 3, 4)
+-- !query schema
+struct<array_insert(array(1, 2, 3), 3, 4):array<int>>
+-- !query output
+[1,2,4,3]
+
+
+-- !query
+select array_insert(array(2, 3, 4), 0, 1)

Review Comment:
   No problem at all :)
   
   For pos=0, this was to implement the behaviour specified [here](https://github.com/apache/spark/pull/38867#discussion_r1041949049). However, the other spark functions (elementAt, slice) throw an exception i.e. see [slice](https://github.com/apache/spark/blob/c13fea90595cb1489046135c45c92d3bacb85818/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala#L1662). What would your preference be? Maybe we should throw an exception, considering we are no longer implementing the snowflake behaviour?



##########
python/pyspark/sql/functions.py:
##########
@@ -7679,6 +7679,41 @@ def array_distinct(col: "ColumnOrName") -> Column:
     return _invoke_function_over_columns("array_distinct", col)
 
 
+@try_remote_functions
+def array_insert(arr: "ColumnOrName", pos: "ColumnOrName", value: "ColumnOrName") -> Column:
+    """
+    Collection function: adds an item into a given array at a specified position. A position
+    specified beyond the size of the current array (plus additional element)
+    is extended with 'null' elements.
+
+    .. versionadded:: 3.4.0
+
+    Parameters
+    ----------
+    arr : :class:`~pyspark.sql.Column` or str
+        name of column containing an array
+    pos : :class:`~pyspark.sql.Column` or str
+        name of Numeric type column indicating position of insertion (starting index 1)

Review Comment:
   Of course, will make sure to do so throughout!



##########
sql/core/src/test/resources/sql-tests/results/array.sql.out:
##########
@@ -431,6 +431,104 @@ struct<get(array(1, 2, 3), -1):int>
 NULL
 
 
+-- !query
+select array_insert(array(1, 2, 3), 3, 4)
+-- !query schema
+struct<array_insert(array(1, 2, 3), 3, 4):array<int>>
+-- !query output
+[1,2,4,3]
+
+
+-- !query
+select array_insert(array(2, 3, 4), 0, 1)

Review Comment:
   CC @LuciferYang for opinion also, as original commenter, and @cloud-fan for any additional context



##########
sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala:
##########
@@ -3106,6 +3106,50 @@ class DataFrameFunctionsSuite extends QueryTest with SharedSparkSession {
     )
   }
 
+  test("array_insert functions") {
+    val fiveShort: Short = 5
+
+    val df1 = Seq((Array[Int](3, 2, 5, 1, 2), 5, 3)).toDF("a", "b", "c")
+    val df2 = Seq((Array[Short](1, 2, 3, 4), 4, fiveShort)).toDF("a", "b", "c")
+    val df3 = Seq((Array[Double](3.0, 2.0, 5.0, 1.0, 2.0), 1, 3.0)).toDF("a", "b", "c")
+    val df4 = Seq((Array[Boolean](true, false), 2, false)).toDF("a", "b", "c")
+    val df5 = Seq((Array[String]("a", "b", "c"), 0, "d")).toDF("a", "b", "c")
+    val df6 = Seq((Array[String]("a", null, "b", "c"), 4, "d")).toDF("a", "b", "c")
+    val df7 = Seq((Array[Integer](3, 2, 5, null, 1, 2), 5, 3)).toDF("a", "b", "c")
+
+    checkAnswer(df1.selectExpr("array_insert(a, b, c)"), Seq(Row(Seq(3, 2, 5, 1, 2, 3))))
+    checkAnswer(df2.selectExpr("array_insert(a, b, c)"), Seq(Row(Seq[Short](1, 2, 3, 4, 5))))
+    checkAnswer(
+      df3.selectExpr("array_insert(a, b, c)"),
+      Seq(Row(Seq[Double](3.0, 3.0, 2.0, 5.0, 1.0, 2.0)))
+    )
+    checkAnswer(df4.selectExpr("array_insert(a, b, c)"), Seq(Row(Seq(true, false, false))))
+    checkAnswer(df5.selectExpr("array_insert(a, b, c)"), Seq(Row(Seq("d", "a", "b", "c"))))
+
+    // null checks
+    checkAnswer(df6.selectExpr("array_insert(a, b, c)"), Seq(Row(Seq("a", null, "b", "c", "d"))))
+    checkAnswer(df5.select(
+      array_insert(col("a"), col("b"), lit(null).cast("string"))),
+      Seq(Row(Seq(null, "a", "b", "c")))
+    )
+    checkAnswer(df6.select(
+      array_insert(col("a"), col("b"), lit(null).cast("string"))),
+      Seq(Row(Seq("a", null, "b", "c", null)))
+    )
+    checkAnswer(
+      df5.select(array_insert(col("a"), lit(null).cast("integer"), col("c"))),
+      Seq(Row(null))
+    )
+    checkAnswer(
+      df5.select(array_insert(lit(null).cast("array<string>"), col("b"), col("c"))),
+      Seq(Row(null))
+    )
+    checkAnswer(
+      df7.selectExpr("array_insert(a, 7, c)"), Seq(Row(Seq(3, 2, 5, null, 1, 2, null, 3))))

Review Comment:
   This test has started behaving differently in the last few weeks; previously the below worked:
   
   `checkAnswer(df1.selectExpr("array_insert(a, 6, c)"), Seq(Row(Seq(3, 2, 5, 1, 2, null, 3))))`
   
   Now, it fails, as the result ends up being `Seq(3, 2, 5, 1, 2, 0, 3)`- i.e. the nulls are replaced with a 0. I've run the test on my logic from a few weeks ago, and it still fails, so something has changed in the test suite in the last 3 weeks.
   
   I'm not sure how to specify that the Array can have null elements in the scala syntax, so for now, I've just added a null to the test array (i.e. `(3, 2, 5, 1, 2)` => `(3, 2, 5, null, 1, 2)`).
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org