Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2022/12/07 12:56:29 UTC

[GitHub] [spark] zhengruifeng commented on a diff in pull request #38867: [WIP] [SPARK-41234][SQL][PYTHON] Add functionality for array_insert

zhengruifeng commented on code in PR #38867:
URL: https://github.com/apache/spark/pull/38867#discussion_r1042161861


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala:
##########
@@ -4600,3 +4600,153 @@ case class ArrayExcept(left: Expression, right: Expression) extends ArrayBinaryL
   override protected def withNewChildrenInternal(
     newLeft: Expression, newRight: Expression): ArrayExcept = copy(left = newLeft, right = newRight)
 }
+
+@ExpressionDescription(
+  usage = "_FUNC_(x, pos, val) - Places val into index pos of array x (array indices start at 1)",
+  examples = """
+    Examples:
+      > SELECT _FUNC_(array(1, 2, 3, 4), 5, 5);
+       [1,2,3,4,5]
+      > SELECT _FUNC_(array(5, 3, 2, 1), 2, 4);
+       [5,4,3,2,1]
+  """,
+  group = "array_funcs",
+  since = "3.4.0")
+case class ArrayInsert(srcArrayExpr: Expression, posExpr: Expression, itemExpr: Expression)
+  extends TernaryExpression with ImplicitCastInputTypes with ComplexTypeMergingExpression
+    with SupportQueryContext with QueryErrorsBase{
+
+  override def inputTypes: Seq[AbstractDataType] = {
+    (srcArrayExpr.dataType, posExpr.dataType, itemExpr.dataType) match {
+      case (ArrayType(e1, hasNull), e2: IntegralType, e3) if (e2 != LongType) =>
+        TypeCoercion.findTightestCommonType(e1, e3) match {
+          case Some(dt) => Seq(ArrayType(dt, hasNull), IntegerType, dt)
+          case _ => Seq.empty
+        }
+      case (e1, e2, e3) => Seq.empty
+    }
+    Seq.empty
+  }
+
+  override def checkInputDataTypes(): TypeCheckResult = {

Review Comment:
   The `posExpr` should be `IntegerType`; as for `srcArrayExpr` and `itemExpr`, I think you can refer to `ArrayRemove`.
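   
   (Note, too, that the trailing `Seq.empty` after the match means the whole match result is discarded, so `inputTypes` as written always returns `Seq.empty`.) A hypothetical sketch of the `ArrayRemove`-style coercion, with the position pinned to `IntegerType` (a sketch only, not the merged implementation):
   
   ```scala
   // Coerce the array element type and the item type to their tightest
   // common type, ArrayRemove-style, and pin pos to IntegerType. The
   // trailing Seq.empty is dropped so the match result is actually returned.
   override def inputTypes: Seq[AbstractDataType] = {
     (srcArrayExpr.dataType, posExpr.dataType, itemExpr.dataType) match {
       case (ArrayType(e1, hasNull), _, e3) =>
         TypeCoercion.findTightestCommonType(e1, e3) match {
           case Some(dt) => Seq(ArrayType(dt, hasNull), IntegerType, dt)
           case _ => Seq.empty
         }
       case _ => Seq.empty
     }
   }
   ```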



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala:
##########
@@ -4600,3 +4600,153 @@ case class ArrayExcept(left: Expression, right: Expression) extends ArrayBinaryL
   override protected def withNewChildrenInternal(
     newLeft: Expression, newRight: Expression): ArrayExcept = copy(left = newLeft, right = newRight)
 }
+
+@ExpressionDescription(
+  usage = "_FUNC_(x, pos, val) - Places val into index pos of array x (array indices start at 1)",
+  examples = """
+    Examples:
+      > SELECT _FUNC_(array(1, 2, 3, 4), 5, 5);
+       [1,2,3,4,5]
+      > SELECT _FUNC_(array(5, 3, 2, 1), 2, 4);
+       [5,4,3,2,1]
+  """,
+  group = "array_funcs",
+  since = "3.4.0")
+case class ArrayInsert(srcArrayExpr: Expression, posExpr: Expression, itemExpr: Expression)
+  extends TernaryExpression with ImplicitCastInputTypes with ComplexTypeMergingExpression
+    with SupportQueryContext with QueryErrorsBase{
+
+  override def inputTypes: Seq[AbstractDataType] = {
+    (srcArrayExpr.dataType, posExpr.dataType, itemExpr.dataType) match {
+      case (ArrayType(e1, hasNull), e2: IntegralType, e3) if (e2 != LongType) =>
+        TypeCoercion.findTightestCommonType(e1, e3) match {
+          case Some(dt) => Seq(ArrayType(dt, hasNull), IntegerType, dt)
+          case _ => Seq.empty
+        }
+      case (e1, e2, e3) => Seq.empty
+    }
+    Seq.empty
+  }
+
+  override def checkInputDataTypes(): TypeCheckResult = {
+    (first.dataType, second.dataType, third.dataType) match {
+      case (_: ArrayType, e2, e3) if e2 != IntegerType =>
+        DataTypeMismatch(
+          errorSubClass = "UNEXPECTED_INPUT_TYPE",
+          messageParameters = Map(
+            "paramIndex" -> "2",
+            "requiredType" -> toSQLType(IntegerType),
+            "inputSql" -> toSQLExpr(second),
+            "inputType" -> toSQLType(second.dataType))
+        )
+      case _ => TypeCheckResult.TypeCheckSuccess
+    }
+  }
+
+  override def eval(input: InternalRow): Any = {
+    val arr = first.eval(input)
+    val pos = second.eval(input)
+    val item = third.eval(input)
+
+    if (arr == null || pos == null) {
+      null
+    } else {
+      nullSafeEval(arr, pos, item)
+    }
+  }
+
+  override def nullSafeEval(arr: Any, pos: Any, item: Any): Any = {
+    val baseArr = arr.asInstanceOf[ArrayData]
+    val posInt = pos.asInstanceOf[Int]
+    val arrayElementType = dataType.asInstanceOf[ArrayType].elementType
+    val newArrayLength = baseArr.numElements() + 1
+
+    if (newArrayLength  > ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH) {
+      throw QueryExecutionErrors.concatArraysWithElementsExceedLimitError(newArrayLength)
+    }
+    val validatedPosInt = this.getValidPosIndexOrThrow(posInt, baseArr.numElements())
+
+    if (validatedPosInt < 0 || validatedPosInt > baseArr.numElements()) {
+      null
+    } else {
+      val newArray = new Array[Any](newArrayLength)
+      baseArr.foreach(arrayElementType, (i, v) =>
+        if (i >= validatedPosInt) {
+          newArray(i + 1) = v
+        } else {
+          newArray(i) = v
+        }
+      )
+      newArray(validatedPosInt) = item
+
+      new GenericArrayData(newArray)
+    }
+  }
+
+  private def getValidPosIndexOrThrow(pos: Int, numArrayElements: Int): Int = {
+    val validIdx = if (pos == 0) {
+      throw QueryExecutionErrors.elementAtByIndexZeroError(getContextOrNull())

Review Comment:
   A position of 0 should be acceptable here.
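   
   With Snowflake-style semantics, `pos = 0` simply prepends rather than raising an error. A minimal standalone sketch of that behavior (hypothetical helper, not the PR's code):
   
   ```scala
   // Insert semantics that accept pos = 0 (prepend) instead of raising
   // elementAtByIndexZeroError; negative pos counts back from the end.
   def insertAt[T](arr: Seq[T], pos: Int, item: T): Seq[T] = {
     val idx = if (pos >= 0) pos.min(arr.length) else (arr.length + pos).max(0)
     (arr.take(idx) :+ item) ++ arr.drop(idx)
   }
   
   insertAt(Seq(1, 2, 3), 0, 9)  // Seq(9, 1, 2, 3) -- no error for pos = 0
   ```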



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala:
##########
@@ -4600,3 +4600,153 @@ case class ArrayExcept(left: Expression, right: Expression) extends ArrayBinaryL
   override protected def withNewChildrenInternal(
     newLeft: Expression, newRight: Expression): ArrayExcept = copy(left = newLeft, right = newRight)
 }
+
+@ExpressionDescription(
+  usage = "_FUNC_(x, pos, val) - Places val into index pos of array x (array indices start at 1)",
+  examples = """
+    Examples:
+      > SELECT _FUNC_(array(1, 2, 3, 4), 5, 5);
+       [1,2,3,4,5]
+      > SELECT _FUNC_(array(5, 3, 2, 1), 2, 4);
+       [5,4,3,2,1]
+  """,
+  group = "array_funcs",
+  since = "3.4.0")
+case class ArrayInsert(srcArrayExpr: Expression, posExpr: Expression, itemExpr: Expression)
+  extends TernaryExpression with ImplicitCastInputTypes with ComplexTypeMergingExpression
+    with SupportQueryContext with QueryErrorsBase{
+
+  override def inputTypes: Seq[AbstractDataType] = {
+    (srcArrayExpr.dataType, posExpr.dataType, itemExpr.dataType) match {
+      case (ArrayType(e1, hasNull), e2: IntegralType, e3) if (e2 != LongType) =>
+        TypeCoercion.findTightestCommonType(e1, e3) match {
+          case Some(dt) => Seq(ArrayType(dt, hasNull), IntegerType, dt)
+          case _ => Seq.empty
+        }
+      case (e1, e2, e3) => Seq.empty
+    }
+    Seq.empty
+  }
+
+  override def checkInputDataTypes(): TypeCheckResult = {
+    (first.dataType, second.dataType, third.dataType) match {
+      case (_: ArrayType, e2, e3) if e2 != IntegerType =>
+        DataTypeMismatch(
+          errorSubClass = "UNEXPECTED_INPUT_TYPE",
+          messageParameters = Map(
+            "paramIndex" -> "2",
+            "requiredType" -> toSQLType(IntegerType),
+            "inputSql" -> toSQLExpr(second),
+            "inputType" -> toSQLType(second.dataType))
+        )
+      case _ => TypeCheckResult.TypeCheckSuccess
+    }
+  }
+
+  override def eval(input: InternalRow): Any = {
+    val arr = first.eval(input)
+    val pos = second.eval(input)
+    val item = third.eval(input)
+
+    if (arr == null || pos == null) {
+      null
+    } else {
+      nullSafeEval(arr, pos, item)
+    }
+  }
+
+  override def nullSafeEval(arr: Any, pos: Any, item: Any): Any = {
+    val baseArr = arr.asInstanceOf[ArrayData]
+    val posInt = pos.asInstanceOf[Int]
+    val arrayElementType = dataType.asInstanceOf[ArrayType].elementType
+    val newArrayLength = baseArr.numElements() + 1
+
+    if (newArrayLength  > ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH) {
+      throw QueryExecutionErrors.concatArraysWithElementsExceedLimitError(newArrayLength)
+    }
+    val validatedPosInt = this.getValidPosIndexOrThrow(posInt, baseArr.numElements())
+
+    if (validatedPosInt < 0 || validatedPosInt > baseArr.numElements()) {
+      null
+    } else {
+      val newArray = new Array[Any](newArrayLength)
+      baseArr.foreach(arrayElementType, (i, v) =>
+        if (i >= validatedPosInt) {
+          newArray(i + 1) = v
+        } else {
+          newArray(i) = v
+        }
+      )
+      newArray(validatedPosInt) = item
+
+      new GenericArrayData(newArray)
+    }
+  }
+
+  private def getValidPosIndexOrThrow(pos: Int, numArrayElements: Int): Int = {

Review Comment:
   Since this method is only used once, I don't think we need it; its body can just be inlined at the call site.
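   
   For example, the normalization could live at the single call site, something like this sketch (assuming the helper only maps negative positions to an offset from the end; the full helper body is cut off above):
   
   ```scala
   // Hypothetical inlined form of the single-use helper:
   val validatedPosInt =
     if (posInt < 0) baseArr.numElements() + posInt else posInt
   ```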



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala:
##########
@@ -4600,3 +4600,153 @@ case class ArrayExcept(left: Expression, right: Expression) extends ArrayBinaryL
   override protected def withNewChildrenInternal(
     newLeft: Expression, newRight: Expression): ArrayExcept = copy(left = newLeft, right = newRight)
 }
+
+@ExpressionDescription(
+  usage = "_FUNC_(x, pos, val) - Places val into index pos of array x (array indices start at 1)",
+  examples = """
+    Examples:
+      > SELECT _FUNC_(array(1, 2, 3, 4), 5, 5);
+       [1,2,3,4,5]
+      > SELECT _FUNC_(array(5, 3, 2, 1), 2, 4);
+       [5,4,3,2,1]
+  """,
+  group = "array_funcs",
+  since = "3.4.0")
+case class ArrayInsert(srcArrayExpr: Expression, posExpr: Expression, itemExpr: Expression)
+  extends TernaryExpression with ImplicitCastInputTypes with ComplexTypeMergingExpression
+    with SupportQueryContext with QueryErrorsBase{
+
+  override def inputTypes: Seq[AbstractDataType] = {
+    (srcArrayExpr.dataType, posExpr.dataType, itemExpr.dataType) match {
+      case (ArrayType(e1, hasNull), e2: IntegralType, e3) if (e2 != LongType) =>
+        TypeCoercion.findTightestCommonType(e1, e3) match {
+          case Some(dt) => Seq(ArrayType(dt, hasNull), IntegerType, dt)
+          case _ => Seq.empty
+        }
+      case (e1, e2, e3) => Seq.empty
+    }
+    Seq.empty
+  }
+
+  override def checkInputDataTypes(): TypeCheckResult = {
+    (first.dataType, second.dataType, third.dataType) match {
+      case (_: ArrayType, e2, e3) if e2 != IntegerType =>
+        DataTypeMismatch(
+          errorSubClass = "UNEXPECTED_INPUT_TYPE",
+          messageParameters = Map(
+            "paramIndex" -> "2",
+            "requiredType" -> toSQLType(IntegerType),
+            "inputSql" -> toSQLExpr(second),
+            "inputType" -> toSQLType(second.dataType))
+        )
+      case _ => TypeCheckResult.TypeCheckSuccess
+    }
+  }
+
+  override def eval(input: InternalRow): Any = {
+    val arr = first.eval(input)
+    val pos = second.eval(input)
+    val item = third.eval(input)
+
+    if (arr == null || pos == null) {
+      null
+    } else {
+      nullSafeEval(arr, pos, item)
+    }
+  }
+
+  override def nullSafeEval(arr: Any, pos: Any, item: Any): Any = {
+    val baseArr = arr.asInstanceOf[ArrayData]
+    val posInt = pos.asInstanceOf[Int]
+    val arrayElementType = dataType.asInstanceOf[ArrayType].elementType
+    val newArrayLength = baseArr.numElements() + 1

Review Comment:
   This is not always the case; see the second example in https://docs.snowflake.com/en/sql-reference/functions/array_insert.html, where inserting past the end of the array yields a result longer than `numElements() + 1`.
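   
   In that example the insert position lies past the end and the gap is padded with NULLs, so the result length is `max(numElements() + 1, pos + 1)` rather than always `numElements() + 1`. A standalone sketch of that behavior (hypothetical helper, 0-based positions):
   
   ```scala
   // Snowflake-style padding: a pos past the end fills the gap with nulls,
   // so the result can be longer than arr.length + 1.
   def insertPadded(arr: Seq[Any], pos: Int, item: Any): Seq[Any] = {
     if (pos <= arr.length) (arr.take(pos) :+ item) ++ arr.drop(pos)
     else (arr ++ Seq.fill(pos - arr.length)(null)) :+ item
   }
   
   insertPadded(Seq(0, 1, 2, 3), 5, "hello")
   // Seq(0, 1, 2, 3, null, hello) -- length 6, not arr.length + 1
   ```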



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

