You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by "MaxGekk (via GitHub)" <gi...@apache.org> on 2024/03/19 16:08:41 UTC

Re: [PR] [SPARK-47007][SQL][PYTHON][R][CONNECT] Add the `map_sort` function [spark]

MaxGekk commented on code in PR #45069:
URL: https://github.com/apache/spark/pull/45069#discussion_r1530666982


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala:
##########
@@ -888,6 +888,179 @@ case class MapFromEntries(child: Expression)
     copy(child = newChild)
 }
 
+@ExpressionDescription(
+  usage = """
+    _FUNC_(map[, ascendingOrder]) - Sorts the input map in ascending or descending order
+      according to the natural ordering of the map keys. The algorithm used for sorting is
+      an adaptive, stable and iterative algorithm. If the input map is empty, function
+      returns an empty map.
+  """,
+  arguments =
+    """
+    Arguments:
+      * map - The map that will be sorted.
+      * ascendingOrder - A boolean value describing the order in which the map will be sorted.
+          This can be either be ascending (true) or descending (false).
+  """,
+  examples = """
+    Examples:
+      > SELECT _FUNC_(map(3, 'c', 1, 'a', 2, 'b'), true);
+       {1:"a",2:"b",3:"c"}
+  """,
+  group = "map_funcs",
+  since = "4.0.0")
+case class MapSort(base: Expression, ascendingOrder: Expression)
+  extends BinaryExpression with NullIntolerant with QueryErrorsBase {
+
+  def this(e: Expression) = this(e, Literal(true))
+
+  val keyType: DataType = base.dataType.asInstanceOf[MapType].keyType
+  val valueType: DataType = base.dataType.asInstanceOf[MapType].valueType
+
+  override def left: Expression = base
+  override def right: Expression = ascendingOrder
+  override def dataType: DataType = base.dataType
+
+  override def checkInputDataTypes(): TypeCheckResult = base.dataType match {
+    case MapType(kt, _, _) if RowOrdering.isOrderable(kt) =>
+      ascendingOrder match {
+        case Literal(_: Boolean, BooleanType) =>
+          TypeCheckResult.TypeCheckSuccess
+        case _ =>
+          DataTypeMismatch(
+            errorSubClass = "UNEXPECTED_INPUT_TYPE",
+            messageParameters = Map(
+              "paramIndex" -> ordinalNumber(1),
+              "requiredType" -> toSQLType(BooleanType),
+              "inputSql" -> toSQLExpr(ascendingOrder),
+              "inputType" -> toSQLType(ascendingOrder.dataType))
+          )
+      }
+    case MapType(_, _, _) =>

Review Comment:
   Let's not depend on the number of parameters in `MapType`:
   ```suggestion
       case _: MapType =>
   ```
   
   See https://github.com/databricks/scala-style-guide?tab=readme-ov-file#pattern-matching



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala:
##########
@@ -888,6 +888,179 @@ case class MapFromEntries(child: Expression)
     copy(child = newChild)
 }
 
+@ExpressionDescription(
+  usage = """
+    _FUNC_(map[, ascendingOrder]) - Sorts the input map in ascending or descending order
+      according to the natural ordering of the map keys. The algorithm used for sorting is
+      an adaptive, stable and iterative algorithm. If the input map is empty, function
+      returns an empty map.
+  """,
+  arguments =
+    """
+    Arguments:
+      * map - The map that will be sorted.
+      * ascendingOrder - A boolean value describing the order in which the map will be sorted.
+          This can be either be ascending (true) or descending (false).
+  """,
+  examples = """
+    Examples:
+      > SELECT _FUNC_(map(3, 'c', 1, 'a', 2, 'b'), true);
+       {1:"a",2:"b",3:"c"}
+  """,
+  group = "map_funcs",
+  since = "4.0.0")
+case class MapSort(base: Expression, ascendingOrder: Expression)
+  extends BinaryExpression with NullIntolerant with QueryErrorsBase {
+
+  def this(e: Expression) = this(e, Literal(true))
+
+  val keyType: DataType = base.dataType.asInstanceOf[MapType].keyType
+  val valueType: DataType = base.dataType.asInstanceOf[MapType].valueType
+
+  override def left: Expression = base
+  override def right: Expression = ascendingOrder
+  override def dataType: DataType = base.dataType
+
+  override def checkInputDataTypes(): TypeCheckResult = base.dataType match {
+    case MapType(kt, _, _) if RowOrdering.isOrderable(kt) =>
+      ascendingOrder match {
+        case Literal(_: Boolean, BooleanType) =>
+          TypeCheckResult.TypeCheckSuccess
+        case _ =>
+          DataTypeMismatch(
+            errorSubClass = "UNEXPECTED_INPUT_TYPE",
+            messageParameters = Map(
+              "paramIndex" -> ordinalNumber(1),
+              "requiredType" -> toSQLType(BooleanType),
+              "inputSql" -> toSQLExpr(ascendingOrder),
+              "inputType" -> toSQLType(ascendingOrder.dataType))
+          )
+      }
+    case MapType(_, _, _) =>
+      DataTypeMismatch(
+        errorSubClass = "INVALID_ORDERING_TYPE",
+        messageParameters = Map(
+          "functionName" -> toSQLId(prettyName),
+          "dataType" -> toSQLType(base.dataType)
+        )
+      )
+    case _ =>
+      DataTypeMismatch(
+        errorSubClass = "UNEXPECTED_INPUT_TYPE",
+        messageParameters = Map(
+          "paramIndex" -> ordinalNumber(0),
+          "requiredType" -> toSQLType(MapType),
+          "inputSql" -> toSQLExpr(base),
+          "inputType" -> toSQLType(base.dataType))
+      )
+  }
+
+  override def nullSafeEval(array: Any, ascending: Any): Any = {
+    // put keys and their respective values inside a tuple and sort them
+    // according to the key ordering. Extract the new sorted k/v pairs to form a sorted map
+
+    val mapData = array.asInstanceOf[MapData]
+    val numElements = mapData.numElements()
+    val keys = mapData.keyArray()
+    val values = mapData.valueArray()
+
+    val ordering = if (ascending.asInstanceOf[Boolean]) {
+      PhysicalDataType.ordering(keyType)
+    } else {
+      PhysicalDataType.ordering(keyType).reverse
+    }
+
+    val sortedMap = Array
+      .tabulate(numElements)(i => (keys.get(i, keyType).asInstanceOf[Any],
+        values.get(i, valueType).asInstanceOf[Any]))
+      .sortBy(_._1)(ordering)
+
+    new ArrayBasedMapData(new GenericArrayData(sortedMap.map(_._1)),
+      new GenericArrayData(sortedMap.map(_._2)))
+  }
+
+  override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
+    nullSafeCodeGen(ctx, ev, (b, order) => sortCodegen(ctx, ev, b, order))
+  }
+
+  private def sortCodegen(ctx: CodegenContext, ev: ExprCode,
+      base: String, order: String): String = {
+
+    val arrayBasedMapData = classOf[ArrayBasedMapData].getName
+    val genericArrayData = classOf[GenericArrayData].getName
+
+    val numElements = ctx.freshName("numElements")
+    val keys = ctx.freshName("keys")
+    val values = ctx.freshName("values")
+    val sortArray = ctx.freshName("sortArray")
+    val i = ctx.freshName("i")
+    val o1 = ctx.freshName("o1")
+    val o1entry = ctx.freshName("o1entry")
+    val o2 = ctx.freshName("o2")
+    val o2entry = ctx.freshName("o2entry")
+    val c = ctx.freshName("c")
+    val newKeys = ctx.freshName("newKeys")
+    val newValues = ctx.freshName("newValues")
+    val originalIndex = ctx.freshName("originalIndex")

Review Comment:
   Is `originalIndex` used anywhere?



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala:
##########
@@ -888,6 +888,179 @@ case class MapFromEntries(child: Expression)
     copy(child = newChild)
 }
 
+@ExpressionDescription(
+  usage = """
+    _FUNC_(map[, ascendingOrder]) - Sorts the input map in ascending or descending order
+      according to the natural ordering of the map keys. The algorithm used for sorting is
+      an adaptive, stable and iterative algorithm. If the input map is empty, function
+      returns an empty map.
+  """,
+  arguments =
+    """
+    Arguments:
+      * map - The map that will be sorted.
+      * ascendingOrder - A boolean value describing the order in which the map will be sorted.
+          This can be either be ascending (true) or descending (false).
+  """,
+  examples = """
+    Examples:
+      > SELECT _FUNC_(map(3, 'c', 1, 'a', 2, 'b'), true);
+       {1:"a",2:"b",3:"c"}
+  """,
+  group = "map_funcs",
+  since = "4.0.0")
+case class MapSort(base: Expression, ascendingOrder: Expression)
+  extends BinaryExpression with NullIntolerant with QueryErrorsBase {
+
+  def this(e: Expression) = this(e, Literal(true))
+
+  val keyType: DataType = base.dataType.asInstanceOf[MapType].keyType
+  val valueType: DataType = base.dataType.asInstanceOf[MapType].valueType
+
+  override def left: Expression = base
+  override def right: Expression = ascendingOrder
+  override def dataType: DataType = base.dataType
+
+  override def checkInputDataTypes(): TypeCheckResult = base.dataType match {
+    case MapType(kt, _, _) if RowOrdering.isOrderable(kt) =>

Review Comment:
   ```suggestion
       case m: MapType if RowOrdering.isOrderable(m.keyType) =>
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org