You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by mn-mikke <gi...@git.apache.org> on 2018/08/06 23:50:21 UTC

[GitHub] spark pull request #22017: [SPARK-23938][SQL] Add map_zip_with function

GitHub user mn-mikke opened a pull request:

    https://github.com/apache/spark/pull/22017

    [SPARK-23938][SQL] Add map_zip_with function

    ## What changes were proposed in this pull request?
    
    This PR adds a new SQL function called ```map_zip_with```. It merges the two given maps into a single map by applying function to the pair of values with the same key. 
    
    ## How was this patch tested?
    
    Added new tests into:
    - DataFrameFunctionsSuite.scala
    - HigherOrderFunctionsSuite.scala


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/mn-mikke/spark SPARK-23938

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/22017.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #22017
    
----
commit ef56011f03d8bae4634e5d3108e4d6502482383c
Author: Marek Novotny <mn...@...>
Date:   2018-08-06T23:42:45Z

    [SPARK-23938][SQL] Add map_zip_with function

----


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22017: [SPARK-23938][SQL] Add map_zip_with function

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22017
  
    **[Test build #94685 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94685/testReport)** for PR 22017 at commit [`595161f`](https://github.com/apache/spark/commit/595161fefbf55711b76530a9e53aff73491febd6).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22017: [SPARK-23938][SQL] Add map_zip_with function

Posted by mn-mikke <gi...@git.apache.org>.
Github user mn-mikke commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22017#discussion_r208675350
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala ---
    @@ -442,3 +442,191 @@ case class ArrayAggregate(
     
       override def prettyName: String = "aggregate"
     }
    +
    +/**
    + * Merges two given maps into a single map by applying function to the pair of values with
    + * the same key.
    + */
    +@ExpressionDescription(
    +  usage =
    +    """
    +      _FUNC_(map1, map2, function) - Merges two given maps into a single map by applying
    +      function to the pair of values with the same key. For keys only presented in one map,
    +      NULL will be passed as the value for the missing key. If an input map contains duplicated
    +      keys, only the first entry of the duplicated key is passed into the lambda function.
    +    """,
    +  examples = """
    +    Examples:
    +      > SELECT _FUNC_(map(1, 'a', 2, 'b'), map(1, 'x', 2, 'y'), (k, v1, v2) -> concat(v1, v2));
    +       {1:"ax",2:"by"}
    +  """,
    +  since = "2.4.0")
    +case class MapZipWith(left: Expression, right: Expression, function: Expression)
    +  extends HigherOrderFunction with CodegenFallback {
    +
    +  @transient lazy val functionForEval: Expression = functionsForEval.head
    +
    +  @transient lazy val (keyType, leftValueType, _) =
    +    HigherOrderFunction.mapKeyValueArgumentType(left.dataType)
    +
    +  @transient lazy val (_, rightValueType, _) =
    +    HigherOrderFunction.mapKeyValueArgumentType(right.dataType)
    +
    +  @transient lazy val ordering = TypeUtils.getInterpretedOrdering(keyType)
    +
    +  override def inputs: Seq[Expression] = left :: right :: Nil
    +
    +  override def functions: Seq[Expression] = function :: Nil
    +
    +  override def nullable: Boolean = left.nullable || right.nullable
    +
    +  override def dataType: DataType = MapType(keyType, function.dataType, function.nullable)
    +
    +  override def checkInputDataTypes(): TypeCheckResult = {
    +    (left.dataType, right.dataType) match {
    +      case (MapType(k1, _, _), MapType(k2, _, _)) if k1.sameType(k2) =>
    +        TypeUtils.checkForOrderingExpr(k1, s"function $prettyName")
    +      case _ => TypeCheckResult.TypeCheckFailure(s"The input to function $prettyName should have " +
    +        s"been two ${MapType.simpleString}s with the same key type, but it's " +
    +        s"[${left.dataType.catalogString}, ${right.dataType.catalogString}].")
    +    }
    +  }
    +
    +  override def bind(f: (Expression, Seq[(DataType, Boolean)]) => LambdaFunction): MapZipWith = {
    +    val arguments = Seq((keyType, false), (leftValueType, true), (rightValueType, true))
    +    copy(function = f(function, arguments))
    +  }
    +
    +  override def eval(input: InternalRow): Any = {
    +    val value1 = left.eval(input)
    +    if (value1 == null) {
    +      null
    +    } else {
    +      val value2 = right.eval(input)
    +      if (value2 == null) {
    +        null
    +      } else {
    +        nullSafeEval(input, value1, value2)
    +      }
    +    }
    +  }
    +
    +  @transient lazy val LambdaFunction(_, Seq(
    +    keyVar: NamedLambdaVariable,
    +    value1Var: NamedLambdaVariable,
    +    value2Var: NamedLambdaVariable),
    +    _) = function
    +
    +  private def keyTypeSupportsEquals = keyType match {
    +    case BinaryType => false
    +    case _: AtomicType => true
    +    case _ => false
    +  }
    +
    +  @transient private lazy val getKeysWithValueIndexes:
    +      (ArrayData, ArrayData) => Seq[(Any, Array[Option[Int]])] = {
    +    if (keyTypeSupportsEquals) {
    +      getKeysWithIndexesFast
    +    } else {
    +      getKeysWithIndexesBruteForce
    +    }
    +  }
    +
    +  private def assertSizeOfArrayBuffer(size: Int): Unit = {
    +    if (size > ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH) {
    +      throw new RuntimeException(s"Unsuccessful try to zip maps with $size " +
    +        s"unique keys due to exceeding the array size limit " +
    +        s"${ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH}.")
    +    }
    +  }
    +
    +  private def getKeysWithIndexesFast(keys1: ArrayData, keys2: ArrayData) = {
    +    val arrayBuffer = new mutable.ArrayBuffer[(Any, Array[Option[Int]])]
    --- End diff --
    
    This is valid argument, it's a rare edge case. The last question before I change it. WDYT about performance a mutable array vs. ```oldTuple.copy(_2 = newValue)```?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22017: [SPARK-23938][SQL] Add map_zip_with function

Posted by ueshin <gi...@git.apache.org>.
Github user ueshin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22017#discussion_r208181653
  
    --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/HigherOrderFunctionsSuite.scala ---
    @@ -44,6 +44,21 @@ class HigherOrderFunctionsSuite extends SparkFunSuite with ExpressionEvalHelper
         LambdaFunction(function, Seq(lv1, lv2))
       }
     
    +  private def createLambda(
    +    dt1: DataType,
    +    nullable1: Boolean,
    +    dt2: DataType,
    +    nullable2: Boolean,
    +    dt3: DataType,
    +    nullable3: Boolean,
    +    f: (Expression, Expression, Expression) => Expression): Expression = {
    --- End diff --
    
    nit: indent


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22017: [SPARK-23938][SQL] Add map_zip_with function

Posted by mgaido91 <gi...@git.apache.org>.
Github user mgaido91 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22017#discussion_r208188031
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala ---
    @@ -2071,6 +2071,67 @@ class DataFrameFunctionsSuite extends QueryTest with SharedSQLContext {
         assert(ex4.getMessage.contains("data type mismatch: argument 3 requires int type"))
       }
     
    +  test("map_zip_with function - map of primitive types") {
    +    val df = Seq(
    +      (Map(8 -> 6L, 3 -> 5L, 6 -> 2L), Map[Integer, Integer]((6, 4), (8, 2), (3, 2))),
    +      (Map(10 -> 6L, 8 -> 3L), Map[Integer, Integer]((8, 4), (4, null))),
    +      (Map.empty[Int, Long], Map[Integer, Integer]((5, 1))),
    +      (Map(5 -> 1L), null)
    +    ).toDF("m1", "m2")
    +
    +    checkAnswer(df.selectExpr("map_zip_with(m1, m2, (k, v1, v2) -> k == v1 + v2)"),
    +      Seq(
    +        Row(Map(8 -> true, 3 -> false, 6 -> true)),
    +        Row(Map(10 -> null, 8 -> false, 4 -> null)),
    +        Row(Map(5 -> null)),
    +        Row(null)))
    +  }
    +
    +  test("map_zip_with function - map of complex types") {
    +    val df = Seq(
    +      (Map("z" -> "a", "y" -> "b", "x" -> "c"), Map("x" -> "a", "z" -> "c")),
    +      (Map("b" -> "a", "c" -> "d"), Map("c" -> "a", "b" -> null, "d" -> "k")),
    +      (Map("a" -> "d"), Map.empty[String, String]),
    +      (Map("a" -> "d"), null)
    +    ).toDF("m1", "m2")
    +
    +    checkAnswer(df.selectExpr("map_zip_with(m1, m2, (k, v1, v2) -> (v1, v2))"),
    +      Seq(
    +        Row(Map("z" -> Row("a", "c"), "y" -> Row("b", null), "x" -> Row("c", "a"))),
    +        Row(Map("b" -> Row("a", null), "c" -> Row("d", "a"), "d" -> Row(null, "k"))),
    +        Row(Map("a" -> Row("d", null))),
    +        Row(null)))
    +  }
    +
    +  test("map_zip_with function - invalid")
    +  {
    --- End diff --
    
    nit: please move the brace at the end of the previous line (there are other place where this should be done, please update them too)


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22017: [SPARK-23938][SQL] Add map_zip_with function

Posted by ueshin <gi...@git.apache.org>.
Github user ueshin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22017#discussion_r209820737
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala ---
    @@ -2238,6 +2238,70 @@ class DataFrameFunctionsSuite extends QueryTest with SharedSQLContext {
         assert(ex5.getMessage.contains("cannot resolve '`a`'"))
       }
     
    +  test("map_zip_with function - map of primitive types") {
    +    val df = Seq(
    +      (Map(8 -> 6L, 3 -> 5L, 6 -> 2L), Map[Integer, Integer]((6, 4), (8, 2), (3, 2))),
    +      (Map(10 -> 6L, 8 -> 3L), Map[Integer, Integer]((8, 4), (4, null))),
    +      (Map.empty[Int, Long], Map[Integer, Integer]((5, 1))),
    +      (Map(5 -> 1L), null)
    +    ).toDF("m1", "m2")
    +
    +    checkAnswer(df.selectExpr("map_zip_with(m1, m2, (k, v1, v2) -> k == v1 + v2)"),
    +      Seq(
    +        Row(Map(8 -> true, 3 -> false, 6 -> true)),
    +        Row(Map(10 -> null, 8 -> false, 4 -> null)),
    +        Row(Map(5 -> null)),
    +        Row(null)))
    +  }
    +
    +  test("map_zip_with function - map of complex types") {
    --- End diff --
    
    nit: `non-primitive` instead of `complex`?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22017: [SPARK-23938][SQL] Add map_zip_with function

Posted by mgaido91 <gi...@git.apache.org>.
Github user mgaido91 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22017#discussion_r208183356
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala ---
    @@ -3667,10 +3667,8 @@ abstract class ArraySetLike extends BinaryArrayExpressionWithImplicitCast {
       @transient protected lazy val ordering: Ordering[Any] =
         TypeUtils.getInterpretedOrdering(elementType)
     
    -  @transient protected lazy val elementTypeSupportEquals = elementType match {
    -    case BinaryType => false
    -    case _: AtomicType => true
    -    case _ => false
    +  @transient protected lazy val elementTypeSupportEquals = {
    --- End diff --
    
    nit: I think we can avoid the braces


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22017: [SPARK-23938][SQL] Add map_zip_with function

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22017
  
    **[Test build #94458 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94458/testReport)** for PR 22017 at commit [`38ce4e7`](https://github.com/apache/spark/commit/38ce4e72209d2f21cdb0993f89799d563e9ecd97).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22017: [SPARK-23938][SQL] Add map_zip_with function

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22017
  
    **[Test build #94508 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94508/testReport)** for PR 22017 at commit [`5d2a78e`](https://github.com/apache/spark/commit/5d2a78ef09edb425593eb90b9bab616d2cd626ab).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22017: [SPARK-23938][SQL] Add map_zip_with function

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22017
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22017: [SPARK-23938][SQL] Add map_zip_with function

Posted by mgaido91 <gi...@git.apache.org>.
Github user mgaido91 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22017#discussion_r208210338
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala ---
    @@ -365,3 +364,101 @@ case class ArrayAggregate(
     
       override def prettyName: String = "aggregate"
     }
    +
    +/**
    + * Merges two given maps into a single map by applying function to the pair of values with
    + * the same key.
    + */
    +@ExpressionDescription(
    +  usage =
    +    """
    +      _FUNC_(map1, map2, function) - Merges two given maps into a single map by applying
    +      function to the pair of values with the same key. For keys only presented in one map,
    +      NULL will be passed as the value for the missing key. If an input map contains duplicated
    +      keys, only the first entry of the duplicated key is passed into the lambda function.
    +    """,
    +  examples = """
    +    Examples:
    +      > SELECT _FUNC_(map(1, 'a', 2, 'b'), map(1, 'x', 2, 'y'), (k, v1, v2) -> concat(v1, v2));
    +       {1:"ax",2:"by"}
    +  """,
    +  since = "2.4.0")
    +case class MapZipWith(left: Expression, right: Expression, function: Expression)
    +  extends HigherOrderFunction with CodegenFallback {
    +
    +  @transient lazy val functionForEval: Expression = functionsForEval.head
    +
    +  @transient lazy val MapType(keyType, leftValueType, _) = getMapType(left)
    +
    +  @transient lazy val MapType(_, rightValueType, _) = getMapType(right)
    +
    +  @transient lazy val arrayDataUnion = new ArrayDataUnion(keyType)
    +
    +  @transient lazy val ordering = TypeUtils.getInterpretedOrdering(keyType)
    +
    +  override def inputs: Seq[Expression] = left :: right :: Nil
    +
    +  override def functions: Seq[Expression] = function :: Nil
    +
    +  override def nullable: Boolean = left.nullable || right.nullable
    +
    +  override def dataType: DataType = MapType(keyType, function.dataType, function.nullable)
    +
    +  override def checkInputDataTypes(): TypeCheckResult = {
    +    (left.dataType, right.dataType) match {
    +      case (MapType(k1, _, _), MapType(k2, _, _)) if k1.sameType(k2) =>
    +        TypeUtils.checkForOrderingExpr(k1, s"function $prettyName")
    +      case _ => TypeCheckResult.TypeCheckFailure(s"The input to function $prettyName should have " +
    +        s"been two ${MapType.simpleString}s with the same key type, but it's " +
    +        s"[${left.dataType.catalogString}, ${right.dataType.catalogString}].")
    +    }
    +  }
    +
    +  private def getMapType(expr: Expression) = expr.dataType match {
    +    case m: MapType => m
    +    case _ => MapType.defaultConcreteType
    +  }
    +
    +  override def bind(f: (Expression, Seq[(DataType, Boolean)]) => LambdaFunction): MapZipWith = {
    +    val arguments = Seq((keyType, false), (leftValueType, true), (rightValueType, true))
    +    copy(function = f(function, arguments))
    +  }
    +
    +  override def eval(input: InternalRow): Any = {
    +    val value1 = left.eval(input)
    +    if (value1 == null) {
    +      null
    +    } else {
    +      val value2 = right.eval(input)
    +      if (value2 == null) {
    +        null
    +      } else {
    +        nullSafeEval(input, value1, value2)
    +      }
    +    }
    +  }
    +
    +  @transient lazy val LambdaFunction(_, Seq(
    +    keyVar: NamedLambdaVariable,
    +    value1Var: NamedLambdaVariable,
    +    value2Var: NamedLambdaVariable),
    +    _) = function
    +
    +  private def nullSafeEval(inputRow: InternalRow, value1: Any, value2: Any): Any = {
    +    val mapData1 = value1.asInstanceOf[MapData]
    +    val mapData2 = value2.asInstanceOf[MapData]
    +    val keys = arrayDataUnion(mapData1.keyArray(), mapData2.keyArray())
    +    val values = new GenericArrayData(new Array[Any](keys.numElements()))
    +    keys.foreach(keyType, (idx: Int, key: Any) => {
    +      val v1 = GetMapValueUtil.getValueEval(mapData1, key, keyType, leftValueType, ordering)
    --- End diff --
    
    I think there is no plan to have a different map implementation and anyway there is a lot of code which depends on having the array based version of MapData. Regarding the duplicated code, to be honest, I think that avoiding the refactoring introduced by that would also make this PR cleaner...


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22017: [SPARK-23938][SQL] Add map_zip_with function

Posted by mn-mikke <gi...@git.apache.org>.
Github user mn-mikke commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22017#discussion_r208204796
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala ---
    @@ -365,3 +364,101 @@ case class ArrayAggregate(
     
       override def prettyName: String = "aggregate"
     }
    +
    +/**
    + * Merges two given maps into a single map by applying function to the pair of values with
    + * the same key.
    + */
    +@ExpressionDescription(
    +  usage =
    +    """
    +      _FUNC_(map1, map2, function) - Merges two given maps into a single map by applying
    +      function to the pair of values with the same key. For keys only presented in one map,
    +      NULL will be passed as the value for the missing key. If an input map contains duplicated
    +      keys, only the first entry of the duplicated key is passed into the lambda function.
    +    """,
    +  examples = """
    +    Examples:
    +      > SELECT _FUNC_(map(1, 'a', 2, 'b'), map(1, 'x', 2, 'y'), (k, v1, v2) -> concat(v1, v2));
    +       {1:"ax",2:"by"}
    +  """,
    +  since = "2.4.0")
    +case class MapZipWith(left: Expression, right: Expression, function: Expression)
    +  extends HigherOrderFunction with CodegenFallback {
    +
    +  @transient lazy val functionForEval: Expression = functionsForEval.head
    +
    +  @transient lazy val MapType(keyType, leftValueType, _) = getMapType(left)
    +
    +  @transient lazy val MapType(_, rightValueType, _) = getMapType(right)
    +
    +  @transient lazy val arrayDataUnion = new ArrayDataUnion(keyType)
    +
    +  @transient lazy val ordering = TypeUtils.getInterpretedOrdering(keyType)
    +
    +  override def inputs: Seq[Expression] = left :: right :: Nil
    +
    +  override def functions: Seq[Expression] = function :: Nil
    +
    +  override def nullable: Boolean = left.nullable || right.nullable
    +
    +  override def dataType: DataType = MapType(keyType, function.dataType, function.nullable)
    +
    +  override def checkInputDataTypes(): TypeCheckResult = {
    +    (left.dataType, right.dataType) match {
    +      case (MapType(k1, _, _), MapType(k2, _, _)) if k1.sameType(k2) =>
    +        TypeUtils.checkForOrderingExpr(k1, s"function $prettyName")
    +      case _ => TypeCheckResult.TypeCheckFailure(s"The input to function $prettyName should have " +
    +        s"been two ${MapType.simpleString}s with the same key type, but it's " +
    +        s"[${left.dataType.catalogString}, ${right.dataType.catalogString}].")
    +    }
    +  }
    +
    +  private def getMapType(expr: Expression) = expr.dataType match {
    +    case m: MapType => m
    +    case _ => MapType.defaultConcreteType
    +  }
    +
    +  override def bind(f: (Expression, Seq[(DataType, Boolean)]) => LambdaFunction): MapZipWith = {
    +    val arguments = Seq((keyType, false), (leftValueType, true), (rightValueType, true))
    +    copy(function = f(function, arguments))
    +  }
    +
    +  override def eval(input: InternalRow): Any = {
    +    val value1 = left.eval(input)
    +    if (value1 == null) {
    +      null
    +    } else {
    +      val value2 = right.eval(input)
    +      if (value2 == null) {
    +        null
    +      } else {
    +        nullSafeEval(input, value1, value2)
    +      }
    +    }
    +  }
    +
    +  @transient lazy val LambdaFunction(_, Seq(
    +    keyVar: NamedLambdaVariable,
    +    value1Var: NamedLambdaVariable,
    +    value2Var: NamedLambdaVariable),
    +    _) = function
    +
    +  private def nullSafeEval(inputRow: InternalRow, value1: Any, value2: Any): Any = {
    +    val mapData1 = value1.asInstanceOf[MapData]
    +    val mapData2 = value2.asInstanceOf[MapData]
    +    val keys = arrayDataUnion(mapData1.keyArray(), mapData2.keyArray())
    +    val values = new GenericArrayData(new Array[Any](keys.numElements()))
    +    keys.foreach(keyType, (idx: Int, key: Any) => {
    +      val v1 = GetMapValueUtil.getValueEval(mapData1, key, keyType, leftValueType, ordering)
    --- End diff --
    
    Thanks for mentioning this! I'm not happy with the current complexity either. I've assumed that the implementation of maps will change into something with O(1) element access in future. By then, the complexity would be O(N) for types supporting equals as well and we would safe a portion of duplicated code.
    
    If you think that maps will remain like this for a long time, really like your suggestion with indexes.
    
    @ueshin What's your view on that?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22017: [SPARK-23938][SQL] Add map_zip_with function

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22017
  
    **[Test build #94508 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94508/testReport)** for PR 22017 at commit [`5d2a78e`](https://github.com/apache/spark/commit/5d2a78ef09edb425593eb90b9bab616d2cd626ab).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22017: [SPARK-23938][SQL] Add map_zip_with function

Posted by ueshin <gi...@git.apache.org>.
Github user ueshin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22017#discussion_r208874195
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala ---
    @@ -442,3 +442,184 @@ case class ArrayAggregate(
     
       override def prettyName: String = "aggregate"
     }
    +
    +/**
    + * Merges two given maps into a single map by applying function to the pair of values with
    + * the same key.
    + */
    +@ExpressionDescription(
    +  usage =
    +    """
    +      _FUNC_(map1, map2, function) - Merges two given maps into a single map by applying
    +      function to the pair of values with the same key. For keys only presented in one map,
    +      NULL will be passed as the value for the missing key. If an input map contains duplicated
    +      keys, only the first entry of the duplicated key is passed into the lambda function.
    +    """,
    +  examples = """
    +    Examples:
    +      > SELECT _FUNC_(map(1, 'a', 2, 'b'), map(1, 'x', 2, 'y'), (k, v1, v2) -> concat(v1, v2));
    +       {1:"ax",2:"by"}
    +  """,
    +  since = "2.4.0")
    +case class MapZipWith(left: Expression, right: Expression, function: Expression)
    +  extends HigherOrderFunction with CodegenFallback {
    +
    +  @transient lazy val functionForEval: Expression = functionsForEval.head
    +
    +  @transient lazy val (keyType, leftValueType, _) =
    --- End diff --
    
    Sounds good. Thanks!


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22017: [SPARK-23938][SQL] Add map_zip_with function

Posted by ueshin <gi...@git.apache.org>.
Github user ueshin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22017#discussion_r209820348
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala ---
    @@ -496,3 +496,195 @@ case class ArrayAggregate(
     
       override def prettyName: String = "aggregate"
     }
    +
    +/**
    + * Merges two given maps into a single map by applying function to the pair of values with
    + * the same key.
    + */
    +@ExpressionDescription(
    +  usage =
    +    """
    +      _FUNC_(map1, map2, function) - Merges two given maps into a single map by applying
    +      function to the pair of values with the same key. For keys only presented in one map,
    +      NULL will be passed as the value for the missing key. If an input map contains duplicated
    +      keys, only the first entry of the duplicated key is passed into the lambda function.
    +    """,
    +  examples = """
    +    Examples:
    +      > SELECT _FUNC_(map(1, 'a', 2, 'b'), map(1, 'x', 2, 'y'), (k, v1, v2) -> concat(v1, v2));
    +       {1:"ax",2:"by"}
    +  """,
    +  since = "2.4.0")
    +case class MapZipWith(left: Expression, right: Expression, function: Expression)
    +  extends HigherOrderFunction with CodegenFallback {
    +
    +  @transient lazy val functionForEval: Expression = functionsForEval.head
    +
    +  @transient lazy val MapType(leftKeyType, leftValueType, leftValueContainsNull) = left.dataType
    +
    +  @transient lazy val MapType(rightKeyType, rightValueType, rightValueContainsNull) = right.dataType
    +
    +  @transient lazy val keyType =
    +    TypeCoercion.findCommonTypeDifferentOnlyInNullFlags(leftKeyType, rightKeyType).get
    +
    +  @transient lazy val ordering = TypeUtils.getInterpretedOrdering(keyType)
    +
    +  override def arguments: Seq[Expression] = left :: right :: Nil
    +
    +  override def argumentTypes: Seq[AbstractDataType] = MapType :: MapType :: Nil
    +
    +  override def functions: Seq[Expression] = function :: Nil
    +
    +  override def functionTypes: Seq[AbstractDataType] = AnyDataType :: Nil
    +
    +  override def nullable: Boolean = left.nullable || right.nullable
    +
    +  override def dataType: DataType = MapType(keyType, function.dataType, function.nullable)
    +
    +  override def bind(f: (Expression, Seq[(DataType, Boolean)]) => LambdaFunction): MapZipWith = {
    +    val arguments = Seq((keyType, false), (leftValueType, true), (rightValueType, true))
    +    copy(function = f(function, arguments))
    +  }
    +
    +  override def checkArgumentDataTypes(): TypeCheckResult = {
    +    super.checkArgumentDataTypes() match {
    +      case TypeCheckResult.TypeCheckSuccess =>
    +        if (leftKeyType.sameType(rightKeyType)) {
    +          TypeUtils.checkForOrderingExpr(leftKeyType, s"function $prettyName")
    +        } else {
    +          TypeCheckResult.TypeCheckFailure(s"The input to function $prettyName should have " +
    +            s"been two ${MapType.simpleString}s with compatible key types, but the key types are " +
    +            s"[${leftKeyType.catalogString}, ${rightKeyType.catalogString}].")
    +        }
    +      case failure => failure
    +    }
    +  }
    +
    +  // Nothing to check since the data type of the lambda function can be anything.
    +  override def checkInputDataTypes(): TypeCheckResult = TypeCheckResult.TypeCheckSuccess
    --- End diff --
    
    I'd call `checkArgumentDataTypes()` here again.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22017: [SPARK-23938][SQL] Add map_zip_with function

Posted by mn-mikke <gi...@git.apache.org>.
Github user mn-mikke commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22017#discussion_r208211250
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala ---
    @@ -365,3 +364,101 @@ case class ArrayAggregate(
     
       override def prettyName: String = "aggregate"
     }
    +
    +/**
    + * Merges two given maps into a single map by applying function to the pair of values with
    + * the same key.
    + */
    +@ExpressionDescription(
    +  usage =
    +    """
    +      _FUNC_(map1, map2, function) - Merges two given maps into a single map by applying
    +      function to the pair of values with the same key. For keys only presented in one map,
    +      NULL will be passed as the value for the missing key. If an input map contains duplicated
    +      keys, only the first entry of the duplicated key is passed into the lambda function.
    +    """,
    +  examples = """
    +    Examples:
    +      > SELECT _FUNC_(map(1, 'a', 2, 'b'), map(1, 'x', 2, 'y'), (k, v1, v2) -> concat(v1, v2));
    +       {1:"ax",2:"by"}
    +  """,
    +  since = "2.4.0")
    +case class MapZipWith(left: Expression, right: Expression, function: Expression)
    +  extends HigherOrderFunction with CodegenFallback {
    +
    +  @transient lazy val functionForEval: Expression = functionsForEval.head
    +
    +  @transient lazy val MapType(keyType, leftValueType, _) = getMapType(left)
    +
    +  @transient lazy val MapType(_, rightValueType, _) = getMapType(right)
    +
    +  @transient lazy val arrayDataUnion = new ArrayDataUnion(keyType)
    +
    +  @transient lazy val ordering = TypeUtils.getInterpretedOrdering(keyType)
    +
    +  override def inputs: Seq[Expression] = left :: right :: Nil
    +
    +  override def functions: Seq[Expression] = function :: Nil
    +
    +  override def nullable: Boolean = left.nullable || right.nullable
    +
    +  override def dataType: DataType = MapType(keyType, function.dataType, function.nullable)
    +
    +  override def checkInputDataTypes(): TypeCheckResult = {
    +    (left.dataType, right.dataType) match {
    +      case (MapType(k1, _, _), MapType(k2, _, _)) if k1.sameType(k2) =>
    +        TypeUtils.checkForOrderingExpr(k1, s"function $prettyName")
    +      case _ => TypeCheckResult.TypeCheckFailure(s"The input to function $prettyName should have " +
    +        s"been two ${MapType.simpleString}s with the same key type, but it's " +
    +        s"[${left.dataType.catalogString}, ${right.dataType.catalogString}].")
    +    }
    +  }
    +
    +  private def getMapType(expr: Expression) = expr.dataType match {
    +    case m: MapType => m
    +    case _ => MapType.defaultConcreteType
    +  }
    +
    +  override def bind(f: (Expression, Seq[(DataType, Boolean)]) => LambdaFunction): MapZipWith = {
    +    val arguments = Seq((keyType, false), (leftValueType, true), (rightValueType, true))
    +    copy(function = f(function, arguments))
    +  }
    +
    +  override def eval(input: InternalRow): Any = {
    +    val value1 = left.eval(input)
    +    if (value1 == null) {
    +      null
    +    } else {
    +      val value2 = right.eval(input)
    +      if (value2 == null) {
    +        null
    +      } else {
    +        nullSafeEval(input, value1, value2)
    +      }
    +    }
    +  }
    +
    +  @transient lazy val LambdaFunction(_, Seq(
    +    keyVar: NamedLambdaVariable,
    +    value1Var: NamedLambdaVariable,
    +    value2Var: NamedLambdaVariable),
    +    _) = function
    +
    +  private def nullSafeEval(inputRow: InternalRow, value1: Any, value2: Any): Any = {
    +    val mapData1 = value1.asInstanceOf[MapData]
    +    val mapData2 = value2.asInstanceOf[MapData]
    +    val keys = arrayDataUnion(mapData1.keyArray(), mapData2.keyArray())
    +    val values = new GenericArrayData(new Array[Any](keys.numElements()))
    +    keys.foreach(keyType, (idx: Int, key: Any) => {
    +      val v1 = GetMapValueUtil.getValueEval(mapData1, key, keyType, leftValueType, ordering)
    --- End diff --
    
    Ok, I will change it. Thanks a lot!


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22017: [SPARK-23938][SQL] Add map_zip_with function

Posted by ueshin <gi...@git.apache.org>.
Github user ueshin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22017#discussion_r208803179
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala ---
    @@ -442,3 +442,191 @@ case class ArrayAggregate(
     
       override def prettyName: String = "aggregate"
     }
    +
    +/**
    + * Merges two given maps into a single map by applying function to the pair of values with
    + * the same key.
    + */
    +@ExpressionDescription(
    +  usage =
    +    """
    +      _FUNC_(map1, map2, function) - Merges two given maps into a single map by applying
    +      function to the pair of values with the same key. For keys only presented in one map,
    +      NULL will be passed as the value for the missing key. If an input map contains duplicated
    +      keys, only the first entry of the duplicated key is passed into the lambda function.
    +    """,
    +  examples = """
    +    Examples:
    +      > SELECT _FUNC_(map(1, 'a', 2, 'b'), map(1, 'x', 2, 'y'), (k, v1, v2) -> concat(v1, v2));
    +       {1:"ax",2:"by"}
    +  """,
    +  since = "2.4.0")
    +case class MapZipWith(left: Expression, right: Expression, function: Expression)
    +  extends HigherOrderFunction with CodegenFallback {
    +
    +  @transient lazy val functionForEval: Expression = functionsForEval.head
    +
    +  @transient lazy val (keyType, leftValueType, _) =
    +    HigherOrderFunction.mapKeyValueArgumentType(left.dataType)
    +
    +  @transient lazy val (_, rightValueType, _) =
    +    HigherOrderFunction.mapKeyValueArgumentType(right.dataType)
    +
    +  @transient lazy val ordering = TypeUtils.getInterpretedOrdering(keyType)
    +
    +  override def inputs: Seq[Expression] = left :: right :: Nil
    +
    +  override def functions: Seq[Expression] = function :: Nil
    +
    +  override def nullable: Boolean = left.nullable || right.nullable
    +
    +  override def dataType: DataType = MapType(keyType, function.dataType, function.nullable)
    +
    +  override def checkInputDataTypes(): TypeCheckResult = {
    +    (left.dataType, right.dataType) match {
    +      case (MapType(k1, _, _), MapType(k2, _, _)) if k1.sameType(k2) =>
    +        TypeUtils.checkForOrderingExpr(k1, s"function $prettyName")
    +      case _ => TypeCheckResult.TypeCheckFailure(s"The input to function $prettyName should have " +
    +        s"been two ${MapType.simpleString}s with the same key type, but it's " +
    +        s"[${left.dataType.catalogString}, ${right.dataType.catalogString}].")
    +    }
    +  }
    +
    +  override def bind(f: (Expression, Seq[(DataType, Boolean)]) => LambdaFunction): MapZipWith = {
    +    val arguments = Seq((keyType, false), (leftValueType, true), (rightValueType, true))
    +    copy(function = f(function, arguments))
    +  }
    +
    +  override def eval(input: InternalRow): Any = {
    +    val value1 = left.eval(input)
    +    if (value1 == null) {
    +      null
    +    } else {
    +      val value2 = right.eval(input)
    +      if (value2 == null) {
    +        null
    +      } else {
    +        nullSafeEval(input, value1, value2)
    +      }
    +    }
    +  }
    +
    +  @transient lazy val LambdaFunction(_, Seq(
    +    keyVar: NamedLambdaVariable,
    +    value1Var: NamedLambdaVariable,
    +    value2Var: NamedLambdaVariable),
    +    _) = function
    +
    +  private def keyTypeSupportsEquals = keyType match {
    +    case BinaryType => false
    +    case _: AtomicType => true
    +    case _ => false
    +  }
    +
    +  @transient private lazy val getKeysWithValueIndexes:
    +      (ArrayData, ArrayData) => Seq[(Any, Array[Option[Int]])] = {
    +    if (keyTypeSupportsEquals) {
    +      getKeysWithIndexesFast
    +    } else {
    +      getKeysWithIndexesBruteForce
    +    }
    +  }
    +
    +  private def assertSizeOfArrayBuffer(size: Int): Unit = {
    +    if (size > ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH) {
    +      throw new RuntimeException(s"Unsuccessful try to zip maps with $size " +
    +        s"unique keys due to exceeding the array size limit " +
    +        s"${ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH}.")
    +    }
    +  }
    +
    +  private def getKeysWithIndexesFast(keys1: ArrayData, keys2: ArrayData) = {
    +    val arrayBuffer = new mutable.ArrayBuffer[(Any, Array[Option[Int]])]
    --- End diff --
    
    I generally prefer the cleaner solution, but actually I'd prefer the previous approach in this case for 2 reasons:
    
    - We shouldn't ignore 20% of performance difference.
    - I'm not sure we can modify the comparison of `MapType` in `ExpressionEvalHelper` here. We might need another pr to make sure the modification is valid.
    
    We still need to add comments what the arrays are for and the reason, though.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22017: [SPARK-23938][SQL] Add map_zip_with function

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22017
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/94570/
    Test FAILed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22017: [SPARK-23938][SQL] Add map_zip_with function

Posted by ueshin <gi...@git.apache.org>.
Github user ueshin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22017#discussion_r209307767
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala ---
    @@ -442,3 +442,186 @@ case class ArrayAggregate(
     
       override def prettyName: String = "aggregate"
     }
    +
    +/**
    + * Merges two given maps into a single map by applying function to the pair of values with
    + * the same key.
    + */
    +@ExpressionDescription(
    +  usage =
    +    """
    +      _FUNC_(map1, map2, function) - Merges two given maps into a single map by applying
    +      function to the pair of values with the same key. For keys only presented in one map,
    +      NULL will be passed as the value for the missing key. If an input map contains duplicated
    +      keys, only the first entry of the duplicated key is passed into the lambda function.
    +    """,
    +  examples = """
    +    Examples:
    +      > SELECT _FUNC_(map(1, 'a', 2, 'b'), map(1, 'x', 2, 'y'), (k, v1, v2) -> concat(v1, v2));
    +       {1:"ax",2:"by"}
    +  """,
    +  since = "2.4.0")
    +case class MapZipWith(left: Expression, right: Expression, function: Expression)
    +  extends HigherOrderFunction with CodegenFallback {
    +
    +  @transient lazy val functionForEval: Expression = functionsForEval.head
    +
    +  @transient lazy val (leftKeyType, leftValueType, leftValueContainsNull) =
    +    HigherOrderFunction.mapKeyValueArgumentType(left.dataType)
    +
    +  @transient lazy val (rightKeyType, rightValueType, rightValueContainsNull) =
    +    HigherOrderFunction.mapKeyValueArgumentType(right.dataType)
    +
    +  @transient lazy val keyType =
    +    TypeCoercion.findTightestCommonType(leftKeyType, rightKeyType).getOrElse(NullType)
    --- End diff --
    
    Yeah, the current analysis rules order might cause some problem. Let me think about it for a while.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22017: [SPARK-23938][SQL] Add map_zip_with function

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/22017


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22017: [SPARK-23938][SQL] Add map_zip_with function

Posted by mgaido91 <gi...@git.apache.org>.
Github user mgaido91 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22017#discussion_r209880291
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala ---
    @@ -496,3 +496,194 @@ case class ArrayAggregate(
     
       override def prettyName: String = "aggregate"
     }
    +
    +/**
    + * Merges two given maps into a single map by applying function to the pair of values with
    + * the same key.
    + */
    +@ExpressionDescription(
    +  usage =
    +    """
    +      _FUNC_(map1, map2, function) - Merges two given maps into a single map by applying
    +      function to the pair of values with the same key. For keys only presented in one map,
    +      NULL will be passed as the value for the missing key. If an input map contains duplicated
    +      keys, only the first entry of the duplicated key is passed into the lambda function.
    +    """,
    +  examples = """
    +    Examples:
    +      > SELECT _FUNC_(map(1, 'a', 2, 'b'), map(1, 'x', 2, 'y'), (k, v1, v2) -> concat(v1, v2));
    +       {1:"ax",2:"by"}
    +  """,
    +  since = "2.4.0")
    +case class MapZipWith(left: Expression, right: Expression, function: Expression)
    +  extends HigherOrderFunction with CodegenFallback {
    +
    +  def functionForEval: Expression = functionsForEval.head
    +
    +  @transient lazy val MapType(leftKeyType, leftValueType, leftValueContainsNull) = left.dataType
    +
    +  @transient lazy val MapType(rightKeyType, rightValueType, rightValueContainsNull) = right.dataType
    +
    +  @transient lazy val keyType =
    +    TypeCoercion.findCommonTypeDifferentOnlyInNullFlags(leftKeyType, rightKeyType).get
    --- End diff --
    
    I see, thanks


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22017: [SPARK-23938][SQL] Add map_zip_with function

Posted by mgaido91 <gi...@git.apache.org>.
Github user mgaido91 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22017#discussion_r208187682
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala ---
    @@ -365,3 +364,101 @@ case class ArrayAggregate(
     
       override def prettyName: String = "aggregate"
     }
    +
    +/**
    + * Merges two given maps into a single map by applying function to the pair of values with
    + * the same key.
    + */
    +@ExpressionDescription(
    +  usage =
    +    """
    +      _FUNC_(map1, map2, function) - Merges two given maps into a single map by applying
    +      function to the pair of values with the same key. For keys only presented in one map,
    +      NULL will be passed as the value for the missing key. If an input map contains duplicated
    +      keys, only the first entry of the duplicated key is passed into the lambda function.
    +    """,
    +  examples = """
    +    Examples:
    +      > SELECT _FUNC_(map(1, 'a', 2, 'b'), map(1, 'x', 2, 'y'), (k, v1, v2) -> concat(v1, v2));
    +       {1:"ax",2:"by"}
    +  """,
    +  since = "2.4.0")
    +case class MapZipWith(left: Expression, right: Expression, function: Expression)
    +  extends HigherOrderFunction with CodegenFallback {
    +
    +  @transient lazy val functionForEval: Expression = functionsForEval.head
    +
    +  @transient lazy val MapType(keyType, leftValueType, _) = getMapType(left)
    +
    +  @transient lazy val MapType(_, rightValueType, _) = getMapType(right)
    +
    +  @transient lazy val arrayDataUnion = new ArrayDataUnion(keyType)
    +
    +  @transient lazy val ordering = TypeUtils.getInterpretedOrdering(keyType)
    +
    +  override def inputs: Seq[Expression] = left :: right :: Nil
    +
    +  override def functions: Seq[Expression] = function :: Nil
    +
    +  override def nullable: Boolean = left.nullable || right.nullable
    +
    +  override def dataType: DataType = MapType(keyType, function.dataType, function.nullable)
    +
    +  override def checkInputDataTypes(): TypeCheckResult = {
    +    (left.dataType, right.dataType) match {
    +      case (MapType(k1, _, _), MapType(k2, _, _)) if k1.sameType(k2) =>
    +        TypeUtils.checkForOrderingExpr(k1, s"function $prettyName")
    +      case _ => TypeCheckResult.TypeCheckFailure(s"The input to function $prettyName should have " +
    +        s"been two ${MapType.simpleString}s with the same key type, but it's " +
    +        s"[${left.dataType.catalogString}, ${right.dataType.catalogString}].")
    +    }
    +  }
    +
    +  private def getMapType(expr: Expression) = expr.dataType match {
    +    case m: MapType => m
    +    case _ => MapType.defaultConcreteType
    +  }
    +
    +  override def bind(f: (Expression, Seq[(DataType, Boolean)]) => LambdaFunction): MapZipWith = {
    +    val arguments = Seq((keyType, false), (leftValueType, true), (rightValueType, true))
    +    copy(function = f(function, arguments))
    +  }
    +
    +  override def eval(input: InternalRow): Any = {
    +    val value1 = left.eval(input)
    +    if (value1 == null) {
    +      null
    +    } else {
    +      val value2 = right.eval(input)
    +      if (value2 == null) {
    +        null
    +      } else {
    +        nullSafeEval(input, value1, value2)
    +      }
    +    }
    +  }
    +
    +  @transient lazy val LambdaFunction(_, Seq(
    +    keyVar: NamedLambdaVariable,
    +    value1Var: NamedLambdaVariable,
    +    value2Var: NamedLambdaVariable),
    +    _) = function
    +
    +  private def nullSafeEval(inputRow: InternalRow, value1: Any, value2: Any): Any = {
    +    val mapData1 = value1.asInstanceOf[MapData]
    +    val mapData2 = value2.asInstanceOf[MapData]
    +    val keys = arrayDataUnion(mapData1.keyArray(), mapData2.keyArray())
    +    val values = new GenericArrayData(new Array[Any](keys.numElements()))
    +    keys.foreach(keyType, (idx: Int, key: Any) => {
    +      val v1 = GetMapValueUtil.getValueEval(mapData1, key, keyType, leftValueType, ordering)
    --- End diff --
    
    This approach is very inefficient. The computational complexity is very high (N^2 in the size of the biggest map). I think here can implement something more efficient avoid also the changes for the code refactoring. I'd propose to get also the index where a key has been found in each map, so that we can access the values by index. In this way the overall complexity would be O(N).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22017: [SPARK-23938][SQL] Add map_zip_with function

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22017
  
    **[Test build #94570 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94570/testReport)** for PR 22017 at commit [`595161f`](https://github.com/apache/spark/commit/595161fefbf55711b76530a9e53aff73491febd6).
     * This patch **fails SparkR unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22017: [SPARK-23938][SQL] Add map_zip_with function

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22017
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/94685/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22017: [SPARK-23938][SQL] Add map_zip_with function

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22017
  
    **[Test build #94526 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94526/testReport)** for PR 22017 at commit [`3c849cb`](https://github.com/apache/spark/commit/3c849cbe70922bd22029b41f2558100dfbc16d9e).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22017: [SPARK-23938][SQL] Add map_zip_with function

Posted by ueshin <gi...@git.apache.org>.
Github user ueshin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22017#discussion_r209308055
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala ---
    @@ -231,6 +231,15 @@ object TypeCoercion {
           })
       }
     
    +  /**
    +   * Similar to [[findTightestCommonType]] but with string promotion.
    +   */
    +  def findWiderTypeForTwoExceptDecimals(t1: DataType, t2: DataType): Option[DataType] = {
    --- End diff --
    
    Why except Decimals?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22017: [SPARK-23938][SQL] Add map_zip_with function

Posted by ueshin <gi...@git.apache.org>.
Github user ueshin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22017#discussion_r209420696
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala ---
    @@ -442,3 +442,186 @@ case class ArrayAggregate(
     
       override def prettyName: String = "aggregate"
     }
    +
    +/**
    + * Merges two given maps into a single map by applying function to the pair of values with
    + * the same key.
    + */
    +@ExpressionDescription(
    +  usage =
    +    """
    +      _FUNC_(map1, map2, function) - Merges two given maps into a single map by applying
    +      function to the pair of values with the same key. For keys only presented in one map,
    +      NULL will be passed as the value for the missing key. If an input map contains duplicated
    +      keys, only the first entry of the duplicated key is passed into the lambda function.
    +    """,
    +  examples = """
    +    Examples:
    +      > SELECT _FUNC_(map(1, 'a', 2, 'b'), map(1, 'x', 2, 'y'), (k, v1, v2) -> concat(v1, v2));
    +       {1:"ax",2:"by"}
    +  """,
    +  since = "2.4.0")
    +case class MapZipWith(left: Expression, right: Expression, function: Expression)
    +  extends HigherOrderFunction with CodegenFallback {
    +
    +  @transient lazy val functionForEval: Expression = functionsForEval.head
    +
    +  @transient lazy val (leftKeyType, leftValueType, leftValueContainsNull) =
    +    HigherOrderFunction.mapKeyValueArgumentType(left.dataType)
    +
    +  @transient lazy val (rightKeyType, rightValueType, rightValueContainsNull) =
    +    HigherOrderFunction.mapKeyValueArgumentType(right.dataType)
    +
    +  @transient lazy val keyType =
    +    TypeCoercion.findTightestCommonType(leftKeyType, rightKeyType).getOrElse(NullType)
    --- End diff --
    
    I submitted a pr to fix analysis rules order to fix argument types before `bind` #22075.
    Btw, we should use `findCommonTypeDifferentOnlyInNullFlags` for this. Sorry for confusing you.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22017: [SPARK-23938][SQL] Add map_zip_with function

Posted by ueshin <gi...@git.apache.org>.
Github user ueshin commented on the issue:

    https://github.com/apache/spark/pull/22017
  
    LGTM.
    @mgaido91 Do you have any other comments on this?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22017: [SPARK-23938][SQL] Add map_zip_with function

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22017
  
    Can one of the admins verify this patch?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22017: [SPARK-23938][SQL] Add map_zip_with function

Posted by mn-mikke <gi...@git.apache.org>.
Github user mn-mikke commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22017#discussion_r209876913
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala ---
    @@ -496,3 +496,194 @@ case class ArrayAggregate(
     
       override def prettyName: String = "aggregate"
     }
    +
    +/**
    + * Merges two given maps into a single map by applying function to the pair of values with
    + * the same key.
    + */
    +@ExpressionDescription(
    +  usage =
    +    """
    +      _FUNC_(map1, map2, function) - Merges two given maps into a single map by applying
    +      function to the pair of values with the same key. For keys only presented in one map,
    +      NULL will be passed as the value for the missing key. If an input map contains duplicated
    +      keys, only the first entry of the duplicated key is passed into the lambda function.
    +    """,
    +  examples = """
    +    Examples:
    +      > SELECT _FUNC_(map(1, 'a', 2, 'b'), map(1, 'x', 2, 'y'), (k, v1, v2) -> concat(v1, v2));
    +       {1:"ax",2:"by"}
    +  """,
    +  since = "2.4.0")
    +case class MapZipWith(left: Expression, right: Expression, function: Expression)
    +  extends HigherOrderFunction with CodegenFallback {
    +
    +  def functionForEval: Expression = functionsForEval.head
    +
    +  @transient lazy val MapType(leftKeyType, leftValueType, leftValueContainsNull) = left.dataType
    +
    +  @transient lazy val MapType(rightKeyType, rightValueType, rightValueContainsNull) = right.dataType
    +
    +  @transient lazy val keyType =
    +    TypeCoercion.findCommonTypeDifferentOnlyInNullFlags(leftKeyType, rightKeyType).get
    --- End diff --
    
    If ```leftKeyType``` is ```ArrayType(IntegerType, false)``` and ```rightKeyType``` is ```ArrayType(IntegerType, true)``` for instance, the coercion rule is not executed ```leftKeyType.sameType(rightKeyType) == true```.
    
    An array with nulls seems to be a valid key.:
    ```
    scala> spark.range(1).selectExpr("map(array(1, 2, null), 12)").show()
    +---------------------------------------+
    |map(array(1, 2, CAST(NULL AS INT)), 12)|
    +---------------------------------------+
    |                        [[1, 2,] -> 12]|
    +---------------------------------------+
    ```


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22017: [SPARK-23938][SQL] Add map_zip_with function

Posted by ueshin <gi...@git.apache.org>.
Github user ueshin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22017#discussion_r209422954
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala ---
    @@ -231,6 +231,15 @@ object TypeCoercion {
           })
       }
     
    +  /**
    +   * Similar to [[findTightestCommonType]] but with string promotion.
    +   */
    +  def findWiderTypeForTwoExceptDecimals(t1: DataType, t2: DataType): Option[DataType] = {
    --- End diff --
    
    ok, I see that this is a matter of `findTypeForComplex`. I'll submit another pr later. Maybe we can go back to `findWiderTypeForTwo` in `TypeCoercion` and `findCommonTypeDifferentOnlyInNullFlag` for `keyType`.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22017: [SPARK-23938][SQL] Add map_zip_with function

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22017
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/94585/
    Test FAILed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22017: [SPARK-23938][SQL] Add map_zip_with function

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22017
  
    **[Test build #94570 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94570/testReport)** for PR 22017 at commit [`595161f`](https://github.com/apache/spark/commit/595161fefbf55711b76530a9e53aff73491febd6).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22017: [SPARK-23938][SQL] Add map_zip_with function

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22017
  
    **[Test build #94393 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94393/testReport)** for PR 22017 at commit [`89a3da4`](https://github.com/apache/spark/commit/89a3da4e292690b78fbb41deef4104be3f843c1b).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22017: [SPARK-23938][SQL] Add map_zip_with function

Posted by mgaido91 <gi...@git.apache.org>.
Github user mgaido91 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22017#discussion_r208662014
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala ---
    @@ -442,3 +442,191 @@ case class ArrayAggregate(
     
       override def prettyName: String = "aggregate"
     }
    +
    +/**
    + * Merges two given maps into a single map by applying function to the pair of values with
    + * the same key.
    + */
    +@ExpressionDescription(
    +  usage =
    +    """
    +      _FUNC_(map1, map2, function) - Merges two given maps into a single map by applying
    +      function to the pair of values with the same key. For keys only presented in one map,
    +      NULL will be passed as the value for the missing key. If an input map contains duplicated
    +      keys, only the first entry of the duplicated key is passed into the lambda function.
    +    """,
    +  examples = """
    +    Examples:
    +      > SELECT _FUNC_(map(1, 'a', 2, 'b'), map(1, 'x', 2, 'y'), (k, v1, v2) -> concat(v1, v2));
    +       {1:"ax",2:"by"}
    +  """,
    +  since = "2.4.0")
    +case class MapZipWith(left: Expression, right: Expression, function: Expression)
    +  extends HigherOrderFunction with CodegenFallback {
    +
    +  @transient lazy val functionForEval: Expression = functionsForEval.head
    +
    +  @transient lazy val (keyType, leftValueType, _) =
    +    HigherOrderFunction.mapKeyValueArgumentType(left.dataType)
    +
    +  @transient lazy val (_, rightValueType, _) =
    +    HigherOrderFunction.mapKeyValueArgumentType(right.dataType)
    +
    +  @transient lazy val ordering = TypeUtils.getInterpretedOrdering(keyType)
    +
    +  override def inputs: Seq[Expression] = left :: right :: Nil
    +
    +  override def functions: Seq[Expression] = function :: Nil
    +
    +  override def nullable: Boolean = left.nullable || right.nullable
    +
    +  override def dataType: DataType = MapType(keyType, function.dataType, function.nullable)
    +
    +  override def checkInputDataTypes(): TypeCheckResult = {
    +    (left.dataType, right.dataType) match {
    +      case (MapType(k1, _, _), MapType(k2, _, _)) if k1.sameType(k2) =>
    +        TypeUtils.checkForOrderingExpr(k1, s"function $prettyName")
    +      case _ => TypeCheckResult.TypeCheckFailure(s"The input to function $prettyName should have " +
    +        s"been two ${MapType.simpleString}s with the same key type, but it's " +
    +        s"[${left.dataType.catalogString}, ${right.dataType.catalogString}].")
    +    }
    +  }
    +
    +  override def bind(f: (Expression, Seq[(DataType, Boolean)]) => LambdaFunction): MapZipWith = {
    +    val arguments = Seq((keyType, false), (leftValueType, true), (rightValueType, true))
    +    copy(function = f(function, arguments))
    +  }
    +
    +  override def eval(input: InternalRow): Any = {
    +    val value1 = left.eval(input)
    +    if (value1 == null) {
    +      null
    +    } else {
    +      val value2 = right.eval(input)
    +      if (value2 == null) {
    +        null
    +      } else {
    +        nullSafeEval(input, value1, value2)
    +      }
    +    }
    +  }
    +
    +  @transient lazy val LambdaFunction(_, Seq(
    +    keyVar: NamedLambdaVariable,
    +    value1Var: NamedLambdaVariable,
    +    value2Var: NamedLambdaVariable),
    +    _) = function
    +
    +  private def keyTypeSupportsEquals = keyType match {
    +    case BinaryType => false
    +    case _: AtomicType => true
    +    case _ => false
    +  }
    +
    +  @transient private lazy val getKeysWithValueIndexes:
    +      (ArrayData, ArrayData) => Seq[(Any, Array[Option[Int]])] = {
    +    if (keyTypeSupportsEquals) {
    +      getKeysWithIndexesFast
    +    } else {
    +      getKeysWithIndexesBruteForce
    +    }
    +  }
    +
    +  private def assertSizeOfArrayBuffer(size: Int): Unit = {
    +    if (size > ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH) {
    +      throw new RuntimeException(s"Unsuccessful try to zip maps with $size " +
    +        s"unique keys due to exceeding the array size limit " +
    +        s"${ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH}.")
    +    }
    +  }
    +
    +  private def getKeysWithIndexesFast(keys1: ArrayData, keys2: ArrayData) = {
    +    val arrayBuffer = new mutable.ArrayBuffer[(Any, Array[Option[Int]])]
    --- End diff --
    
    we can the same check, I don't think it is really needed as having duplicated keys is anyway a bad situation itself which shouldn't happen.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22017: [SPARK-23938][SQL] Add map_zip_with function

Posted by mn-mikke <gi...@git.apache.org>.
Github user mn-mikke commented on the issue:

    https://github.com/apache/spark/pull/22017
  
    retest this please


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22017: [SPARK-23938][SQL] Add map_zip_with function

Posted by ueshin <gi...@git.apache.org>.
Github user ueshin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22017#discussion_r209421292
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala ---
    @@ -231,6 +231,15 @@ object TypeCoercion {
           })
       }
     
    +  /**
    +   * Similar to [[findTightestCommonType]] but with string promotion.
    +   */
    +  def findWiderTypeForTwoExceptDecimals(t1: DataType, t2: DataType): Option[DataType] = {
    --- End diff --
    
    On second thoughts, do we really need those? Seems like the current coercions rules don't contain possibly cast to null?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22017: [SPARK-23938][SQL] Add map_zip_with function

Posted by mgaido91 <gi...@git.apache.org>.
Github user mgaido91 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22017#discussion_r208538187
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala ---
    @@ -442,3 +442,191 @@ case class ArrayAggregate(
     
       override def prettyName: String = "aggregate"
     }
    +
    +/**
    + * Merges two given maps into a single map by applying function to the pair of values with
    + * the same key.
    + */
    +@ExpressionDescription(
    +  usage =
    +    """
    +      _FUNC_(map1, map2, function) - Merges two given maps into a single map by applying
    +      function to the pair of values with the same key. For keys only presented in one map,
    +      NULL will be passed as the value for the missing key. If an input map contains duplicated
    +      keys, only the first entry of the duplicated key is passed into the lambda function.
    +    """,
    +  examples = """
    +    Examples:
    +      > SELECT _FUNC_(map(1, 'a', 2, 'b'), map(1, 'x', 2, 'y'), (k, v1, v2) -> concat(v1, v2));
    +       {1:"ax",2:"by"}
    +  """,
    +  since = "2.4.0")
    +case class MapZipWith(left: Expression, right: Expression, function: Expression)
    +  extends HigherOrderFunction with CodegenFallback {
    +
    +  @transient lazy val functionForEval: Expression = functionsForEval.head
    +
    +  @transient lazy val (keyType, leftValueType, _) =
    +    HigherOrderFunction.mapKeyValueArgumentType(left.dataType)
    +
    +  @transient lazy val (_, rightValueType, _) =
    +    HigherOrderFunction.mapKeyValueArgumentType(right.dataType)
    +
    +  @transient lazy val ordering = TypeUtils.getInterpretedOrdering(keyType)
    +
    +  override def inputs: Seq[Expression] = left :: right :: Nil
    +
    +  override def functions: Seq[Expression] = function :: Nil
    +
    +  override def nullable: Boolean = left.nullable || right.nullable
    +
    +  override def dataType: DataType = MapType(keyType, function.dataType, function.nullable)
    +
    +  override def checkInputDataTypes(): TypeCheckResult = {
    +    (left.dataType, right.dataType) match {
    +      case (MapType(k1, _, _), MapType(k2, _, _)) if k1.sameType(k2) =>
    +        TypeUtils.checkForOrderingExpr(k1, s"function $prettyName")
    +      case _ => TypeCheckResult.TypeCheckFailure(s"The input to function $prettyName should have " +
    +        s"been two ${MapType.simpleString}s with the same key type, but it's " +
    +        s"[${left.dataType.catalogString}, ${right.dataType.catalogString}].")
    +    }
    +  }
    +
    +  override def bind(f: (Expression, Seq[(DataType, Boolean)]) => LambdaFunction): MapZipWith = {
    +    val arguments = Seq((keyType, false), (leftValueType, true), (rightValueType, true))
    +    copy(function = f(function, arguments))
    +  }
    +
    +  override def eval(input: InternalRow): Any = {
    +    val value1 = left.eval(input)
    +    if (value1 == null) {
    +      null
    +    } else {
    +      val value2 = right.eval(input)
    +      if (value2 == null) {
    +        null
    +      } else {
    +        nullSafeEval(input, value1, value2)
    +      }
    +    }
    +  }
    +
    +  @transient lazy val LambdaFunction(_, Seq(
    +    keyVar: NamedLambdaVariable,
    +    value1Var: NamedLambdaVariable,
    +    value2Var: NamedLambdaVariable),
    +    _) = function
    +
    +  private def keyTypeSupportsEquals = keyType match {
    +    case BinaryType => false
    +    case _: AtomicType => true
    +    case _ => false
    +  }
    +
    +  @transient private lazy val getKeysWithValueIndexes:
    +      (ArrayData, ArrayData) => Seq[(Any, Array[Option[Int]])] = {
    +    if (keyTypeSupportsEquals) {
    +      getKeysWithIndexesFast
    +    } else {
    +      getKeysWithIndexesBruteForce
    +    }
    +  }
    +
    +  private def assertSizeOfArrayBuffer(size: Int): Unit = {
    +    if (size > ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH) {
    +      throw new RuntimeException(s"Unsuccessful try to zip maps with $size " +
    +        s"unique keys due to exceeding the array size limit " +
    +        s"${ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH}.")
    +    }
    +  }
    +
    +  private def getKeysWithIndexesFast(keys1: ArrayData, keys2: ArrayData) = {
    +    val arrayBuffer = new mutable.ArrayBuffer[(Any, Array[Option[Int]])]
    --- End diff --
    
    > If we changed it to (Option[Int], Option[Int]), wouldn't we need two similar i loops instead of one?
    I really don't think so, it would be the same as now I think
    
    well, maybe we can fix map comparison in tests... :)


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22017: [SPARK-23938][SQL] Add map_zip_with function

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22017
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22017: [SPARK-23938][SQL] Add map_zip_with function

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22017
  
    **[Test build #94737 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94737/testReport)** for PR 22017 at commit [`bcd4e0f`](https://github.com/apache/spark/commit/bcd4e0f3956f622d99e5fcf4a98c155b92dbeb9a).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22017: [SPARK-23938][SQL] Add map_zip_with function

Posted by mgaido91 <gi...@git.apache.org>.
Github user mgaido91 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22017#discussion_r209160027
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala ---
    @@ -442,3 +442,186 @@ case class ArrayAggregate(
     
       override def prettyName: String = "aggregate"
     }
    +
    +/**
    + * Merges two given maps into a single map by applying function to the pair of values with
    + * the same key.
    + */
    +@ExpressionDescription(
    +  usage =
    +    """
    +      _FUNC_(map1, map2, function) - Merges two given maps into a single map by applying
    +      function to the pair of values with the same key. For keys only presented in one map,
    +      NULL will be passed as the value for the missing key. If an input map contains duplicated
    +      keys, only the first entry of the duplicated key is passed into the lambda function.
    +    """,
    +  examples = """
    +    Examples:
    +      > SELECT _FUNC_(map(1, 'a', 2, 'b'), map(1, 'x', 2, 'y'), (k, v1, v2) -> concat(v1, v2));
    +       {1:"ax",2:"by"}
    +  """,
    +  since = "2.4.0")
    +case class MapZipWith(left: Expression, right: Expression, function: Expression)
    +  extends HigherOrderFunction with CodegenFallback {
    +
    +  @transient lazy val functionForEval: Expression = functionsForEval.head
    +
    +  @transient lazy val (leftKeyType, leftValueType, leftValueContainsNull) =
    +    HigherOrderFunction.mapKeyValueArgumentType(left.dataType)
    +
    +  @transient lazy val (rightKeyType, rightValueType, rightValueContainsNull) =
    +    HigherOrderFunction.mapKeyValueArgumentType(right.dataType)
    +
    +  @transient lazy val keyType =
    +    TypeCoercion.findTightestCommonType(leftKeyType, rightKeyType).getOrElse(NullType)
    --- End diff --
    
    why do we need this? We are enforcing that the two maps have the same key type, can't we just get one?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22017: [SPARK-23938][SQL] Add map_zip_with function

Posted by mgaido91 <gi...@git.apache.org>.
Github user mgaido91 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22017#discussion_r208878022
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala ---
    @@ -442,3 +442,191 @@ case class ArrayAggregate(
     
       override def prettyName: String = "aggregate"
     }
    +
    +/**
    + * Merges two given maps into a single map by applying function to the pair of values with
    + * the same key.
    + */
    +@ExpressionDescription(
    +  usage =
    +    """
    +      _FUNC_(map1, map2, function) - Merges two given maps into a single map by applying
    +      function to the pair of values with the same key. For keys only presented in one map,
    +      NULL will be passed as the value for the missing key. If an input map contains duplicated
    +      keys, only the first entry of the duplicated key is passed into the lambda function.
    +    """,
    +  examples = """
    +    Examples:
    +      > SELECT _FUNC_(map(1, 'a', 2, 'b'), map(1, 'x', 2, 'y'), (k, v1, v2) -> concat(v1, v2));
    +       {1:"ax",2:"by"}
    +  """,
    +  since = "2.4.0")
    +case class MapZipWith(left: Expression, right: Expression, function: Expression)
    +  extends HigherOrderFunction with CodegenFallback {
    +
    +  @transient lazy val functionForEval: Expression = functionsForEval.head
    +
    +  @transient lazy val (keyType, leftValueType, _) =
    +    HigherOrderFunction.mapKeyValueArgumentType(left.dataType)
    +
    +  @transient lazy val (_, rightValueType, _) =
    +    HigherOrderFunction.mapKeyValueArgumentType(right.dataType)
    +
    +  @transient lazy val ordering = TypeUtils.getInterpretedOrdering(keyType)
    +
    +  override def inputs: Seq[Expression] = left :: right :: Nil
    +
    +  override def functions: Seq[Expression] = function :: Nil
    +
    +  override def nullable: Boolean = left.nullable || right.nullable
    +
    +  override def dataType: DataType = MapType(keyType, function.dataType, function.nullable)
    +
    +  override def checkInputDataTypes(): TypeCheckResult = {
    +    (left.dataType, right.dataType) match {
    +      case (MapType(k1, _, _), MapType(k2, _, _)) if k1.sameType(k2) =>
    +        TypeUtils.checkForOrderingExpr(k1, s"function $prettyName")
    +      case _ => TypeCheckResult.TypeCheckFailure(s"The input to function $prettyName should have " +
    +        s"been two ${MapType.simpleString}s with the same key type, but it's " +
    +        s"[${left.dataType.catalogString}, ${right.dataType.catalogString}].")
    +    }
    +  }
    +
    +  override def bind(f: (Expression, Seq[(DataType, Boolean)]) => LambdaFunction): MapZipWith = {
    +    val arguments = Seq((keyType, false), (leftValueType, true), (rightValueType, true))
    +    copy(function = f(function, arguments))
    +  }
    +
    +  override def eval(input: InternalRow): Any = {
    +    val value1 = left.eval(input)
    +    if (value1 == null) {
    +      null
    +    } else {
    +      val value2 = right.eval(input)
    +      if (value2 == null) {
    +        null
    +      } else {
    +        nullSafeEval(input, value1, value2)
    +      }
    +    }
    +  }
    +
    +  @transient lazy val LambdaFunction(_, Seq(
    +    keyVar: NamedLambdaVariable,
    +    value1Var: NamedLambdaVariable,
    +    value2Var: NamedLambdaVariable),
    +    _) = function
    +
    +  private def keyTypeSupportsEquals = keyType match {
    +    case BinaryType => false
    +    case _: AtomicType => true
    +    case _ => false
    +  }
    +
    +  @transient private lazy val getKeysWithValueIndexes:
    +      (ArrayData, ArrayData) => Seq[(Any, Array[Option[Int]])] = {
    +    if (keyTypeSupportsEquals) {
    +      getKeysWithIndexesFast
    +    } else {
    +      getKeysWithIndexesBruteForce
    +    }
    +  }
    +
    +  private def assertSizeOfArrayBuffer(size: Int): Unit = {
    +    if (size > ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH) {
    +      throw new RuntimeException(s"Unsuccessful try to zip maps with $size " +
    +        s"unique keys due to exceeding the array size limit " +
    +        s"${ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH}.")
    +    }
    +  }
    +
    +  private def getKeysWithIndexesFast(keys1: ArrayData, keys2: ArrayData) = {
    +    val arrayBuffer = new mutable.ArrayBuffer[(Any, Array[Option[Int]])]
    --- End diff --
    
    @mn-mikke we can use `LinkedHashMap` in order to preserve key order.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22017: [SPARK-23938][SQL] Add map_zip_with function

Posted by ueshin <gi...@git.apache.org>.
Github user ueshin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22017#discussion_r208803239
  
    --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala ---
    @@ -225,7 +264,9 @@ trait ExpressionEvalHelper extends GeneratorDrivenPropertyChecks with PlanTestBa
               val lit = InternalRow(expected, expected)
               val expectedRow =
                 UnsafeProjection.create(Array(expression.dataType, expression.dataType)).apply(lit)
    -          if (unsafeRow != expectedRow) {
    +          val field = StructField("field", expression.dataType)
    +          val dataType = StructType(field :: field :: Nil)
    +          if (!checkResult(unsafeRow, expectedRow, dataType)) {
    --- End diff --
    
    What's this for?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22017: [SPARK-23938][SQL] Add map_zip_with function

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22017
  
    Merged build finished. Test FAILed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22017: [SPARK-23938][SQL] Add map_zip_with function

Posted by mn-mikke <gi...@git.apache.org>.
Github user mn-mikke commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22017#discussion_r208605882
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala ---
    @@ -442,3 +442,191 @@ case class ArrayAggregate(
     
       override def prettyName: String = "aggregate"
     }
    +
    +/**
    + * Merges two given maps into a single map by applying function to the pair of values with
    + * the same key.
    + */
    +@ExpressionDescription(
    +  usage =
    +    """
    +      _FUNC_(map1, map2, function) - Merges two given maps into a single map by applying
    +      function to the pair of values with the same key. For keys only presented in one map,
    +      NULL will be passed as the value for the missing key. If an input map contains duplicated
    +      keys, only the first entry of the duplicated key is passed into the lambda function.
    +    """,
    +  examples = """
    +    Examples:
    +      > SELECT _FUNC_(map(1, 'a', 2, 'b'), map(1, 'x', 2, 'y'), (k, v1, v2) -> concat(v1, v2));
    +       {1:"ax",2:"by"}
    +  """,
    +  since = "2.4.0")
    +case class MapZipWith(left: Expression, right: Expression, function: Expression)
    +  extends HigherOrderFunction with CodegenFallback {
    +
    +  @transient lazy val functionForEval: Expression = functionsForEval.head
    +
    +  @transient lazy val (keyType, leftValueType, _) =
    +    HigherOrderFunction.mapKeyValueArgumentType(left.dataType)
    +
    +  @transient lazy val (_, rightValueType, _) =
    +    HigherOrderFunction.mapKeyValueArgumentType(right.dataType)
    +
    +  @transient lazy val ordering = TypeUtils.getInterpretedOrdering(keyType)
    +
    +  override def inputs: Seq[Expression] = left :: right :: Nil
    +
    +  override def functions: Seq[Expression] = function :: Nil
    +
    +  override def nullable: Boolean = left.nullable || right.nullable
    +
    +  override def dataType: DataType = MapType(keyType, function.dataType, function.nullable)
    +
    +  override def checkInputDataTypes(): TypeCheckResult = {
    +    (left.dataType, right.dataType) match {
    +      case (MapType(k1, _, _), MapType(k2, _, _)) if k1.sameType(k2) =>
    +        TypeUtils.checkForOrderingExpr(k1, s"function $prettyName")
    +      case _ => TypeCheckResult.TypeCheckFailure(s"The input to function $prettyName should have " +
    +        s"been two ${MapType.simpleString}s with the same key type, but it's " +
    +        s"[${left.dataType.catalogString}, ${right.dataType.catalogString}].")
    +    }
    +  }
    +
    +  override def bind(f: (Expression, Seq[(DataType, Boolean)]) => LambdaFunction): MapZipWith = {
    +    val arguments = Seq((keyType, false), (leftValueType, true), (rightValueType, true))
    +    copy(function = f(function, arguments))
    +  }
    +
    +  override def eval(input: InternalRow): Any = {
    +    val value1 = left.eval(input)
    +    if (value1 == null) {
    +      null
    +    } else {
    +      val value2 = right.eval(input)
    +      if (value2 == null) {
    +        null
    +      } else {
    +        nullSafeEval(input, value1, value2)
    +      }
    +    }
    +  }
    +
    +  @transient lazy val LambdaFunction(_, Seq(
    +    keyVar: NamedLambdaVariable,
    +    value1Var: NamedLambdaVariable,
    +    value2Var: NamedLambdaVariable),
    +    _) = function
    +
    +  private def keyTypeSupportsEquals = keyType match {
    +    case BinaryType => false
    +    case _: AtomicType => true
    +    case _ => false
    +  }
    +
    +  @transient private lazy val getKeysWithValueIndexes:
    +      (ArrayData, ArrayData) => Seq[(Any, Array[Option[Int]])] = {
    +    if (keyTypeSupportsEquals) {
    +      getKeysWithIndexesFast
    +    } else {
    +      getKeysWithIndexesBruteForce
    +    }
    +  }
    +
    +  private def assertSizeOfArrayBuffer(size: Int): Unit = {
    +    if (size > ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH) {
    +      throw new RuntimeException(s"Unsuccessful try to zip maps with $size " +
    +        s"unique keys due to exceeding the array size limit " +
    +        s"${ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH}.")
    +    }
    +  }
    +
    +  private def getKeysWithIndexesFast(keys1: ArrayData, keys2: ArrayData) = {
    +    val arrayBuffer = new mutable.ArrayBuffer[(Any, Array[Option[Int]])]
    --- End diff --
    
    > you don't need to check neither whether the key is there nor the size of the output array, you just need to add them.
    
    What about duplicated keys? They can be created with other map functions.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22017: [SPARK-23938][SQL] Add map_zip_with function

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22017
  
    Merged build finished. Test FAILed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22017: [SPARK-23938][SQL] Add map_zip_with function

Posted by ueshin <gi...@git.apache.org>.
Github user ueshin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22017#discussion_r208181599
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala ---
    @@ -365,3 +364,101 @@ case class ArrayAggregate(
     
       override def prettyName: String = "aggregate"
     }
    +
    +/**
    + * Merges two given maps into a single map by applying function to the pair of values with
    + * the same key.
    + */
    +@ExpressionDescription(
    +  usage =
    +    """
    +      _FUNC_(map1, map2, function) - Merges two given maps into a single map by applying
    +      function to the pair of values with the same key. For keys only presented in one map,
    +      NULL will be passed as the value for the missing key. If an input map contains duplicated
    +      keys, only the first entry of the duplicated key is passed into the lambda function.
    +    """,
    +  examples = """
    +    Examples:
    +      > SELECT _FUNC_(map(1, 'a', 2, 'b'), map(1, 'x', 2, 'y'), (k, v1, v2) -> concat(v1, v2));
    +       {1:"ax",2:"by"}
    +  """,
    +  since = "2.4.0")
    +case class MapZipWith(left: Expression, right: Expression, function: Expression)
    +  extends HigherOrderFunction with CodegenFallback {
    +
    +  @transient lazy val functionForEval: Expression = functionsForEval.head
    +
    +  @transient lazy val MapType(keyType, leftValueType, _) = getMapType(left)
    +
    +  @transient lazy val MapType(_, rightValueType, _) = getMapType(right)
    +
    +  @transient lazy val arrayDataUnion = new ArrayDataUnion(keyType)
    +
    +  @transient lazy val ordering = TypeUtils.getInterpretedOrdering(keyType)
    +
    +  override def inputs: Seq[Expression] = left :: right :: Nil
    +
    +  override def functions: Seq[Expression] = function :: Nil
    +
    +  override def nullable: Boolean = left.nullable || right.nullable
    +
    +  override def dataType: DataType = MapType(keyType, function.dataType, function.nullable)
    +
    +  override def checkInputDataTypes(): TypeCheckResult = {
    +    (left.dataType, right.dataType) match {
    +      case (MapType(k1, _, _), MapType(k2, _, _)) if k1.sameType(k2) =>
    +        TypeUtils.checkForOrderingExpr(k1, s"function $prettyName")
    +      case _ => TypeCheckResult.TypeCheckFailure(s"The input to function $prettyName should have " +
    +        s"been two ${MapType.simpleString}s with the same key type, but it's " +
    +        s"[${left.dataType.catalogString}, ${right.dataType.catalogString}].")
    +    }
    +  }
    +
    +  private def getMapType(expr: Expression) = expr.dataType match {
    --- End diff --
    
    I'd like you to use the same util method. I suggested to introduce `object HigherOrderFunction` at https://github.com/apache/spark/pull/21986#discussion_r208170769.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22017: [SPARK-23938][SQL] Add map_zip_with function

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22017
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/94737/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22017: [SPARK-23938][SQL] Add map_zip_with function

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22017
  
    **[Test build #94325 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94325/testReport)** for PR 22017 at commit [`ef56011`](https://github.com/apache/spark/commit/ef56011f03d8bae4634e5d3108e4d6502482383c).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `class ArrayDataMerger(elementType: DataType) `
      * `case class ArrayUnion(left: Expression, right: Expression) extends ArraySetLike`
      * `abstract class GetMapValueUtil extends BinaryExpression with ImplicitCastInputTypes `
      * `case class MapZipWith(left: Expression, right: Expression, function: Expression)`


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22017: [SPARK-23938][SQL] Add map_zip_with function

Posted by mn-mikke <gi...@git.apache.org>.
Github user mn-mikke commented on the issue:

    https://github.com/apache/spark/pull/22017
  
    cc @ueshin @mgaido91 @hvanhovell 


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22017: [SPARK-23938][SQL] Add map_zip_with function

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22017
  
    **[Test build #94526 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94526/testReport)** for PR 22017 at commit [`3c849cb`](https://github.com/apache/spark/commit/3c849cbe70922bd22029b41f2558100dfbc16d9e).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22017: [SPARK-23938][SQL] Add map_zip_with function

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22017
  
    **[Test build #94585 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94585/testReport)** for PR 22017 at commit [`595161f`](https://github.com/apache/spark/commit/595161fefbf55711b76530a9e53aff73491febd6).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22017: [SPARK-23938][SQL] Add map_zip_with function

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22017
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22017: [SPARK-23938][SQL] Add map_zip_with function

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22017
  
    **[Test build #94458 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94458/testReport)** for PR 22017 at commit [`38ce4e7`](https://github.com/apache/spark/commit/38ce4e72209d2f21cdb0993f89799d563e9ecd97).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22017: [SPARK-23938][SQL] Add map_zip_with function

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22017
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/94458/
    Test FAILed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22017: [SPARK-23938][SQL] Add map_zip_with function

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22017
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/94712/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22017: [SPARK-23938][SQL] Add map_zip_with function

Posted by mn-mikke <gi...@git.apache.org>.
Github user mn-mikke commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22017#discussion_r208871779
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala ---
    @@ -442,3 +442,184 @@ case class ArrayAggregate(
     
       override def prettyName: String = "aggregate"
     }
    +
    +/**
    + * Merges two given maps into a single map by applying function to the pair of values with
    + * the same key.
    + */
    +@ExpressionDescription(
    +  usage =
    +    """
    +      _FUNC_(map1, map2, function) - Merges two given maps into a single map by applying
    +      function to the pair of values with the same key. For keys only presented in one map,
    +      NULL will be passed as the value for the missing key. If an input map contains duplicated
    +      keys, only the first entry of the duplicated key is passed into the lambda function.
    +    """,
    +  examples = """
    +    Examples:
    +      > SELECT _FUNC_(map(1, 'a', 2, 'b'), map(1, 'x', 2, 'y'), (k, v1, v2) -> concat(v1, v2));
    +       {1:"ax",2:"by"}
    +  """,
    +  since = "2.4.0")
    +case class MapZipWith(left: Expression, right: Expression, function: Expression)
    +  extends HigherOrderFunction with CodegenFallback {
    +
    +  @transient lazy val functionForEval: Expression = functionsForEval.head
    +
    +  @transient lazy val (keyType, leftValueType, _) =
    --- End diff --
    
    You are right, thanks!
    
    WDYT about introducing a coercion rule handling different key types? For cases like (```IntType```, ```LongType```) might be handy.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22017: [SPARK-23938][SQL] Add map_zip_with function

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22017
  
    **[Test build #94737 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94737/testReport)** for PR 22017 at commit [`bcd4e0f`](https://github.com/apache/spark/commit/bcd4e0f3956f622d99e5fcf4a98c155b92dbeb9a).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22017: [SPARK-23938][SQL] Add map_zip_with function

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22017
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/94362/
    Test FAILed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22017: [SPARK-23938][SQL] Add map_zip_with function

Posted by ueshin <gi...@git.apache.org>.
Github user ueshin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22017#discussion_r209420042
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala ---
    @@ -231,6 +231,15 @@ object TypeCoercion {
           })
       }
     
    +  /**
    +   * Similar to [[findTightestCommonType]] but with string promotion.
    +   */
    +  def findWiderTypeForTwoExceptDecimals(t1: DataType, t2: DataType): Option[DataType] = {
    --- End diff --
    
    Ah, I see, good catch! But it led me to another issue. We can't choose those types possibly to be null as a map key. Instead of adding the method, how about modifying `findTypeForComplex` as something like:
    
    ```scala
    private def findTypeForComplex(
          t1: DataType,
          t2: DataType,
          findTypeFunc: (DataType, DataType) => Option[DataType]): Option[DataType] = (t1, t2) match {
      ...
        case (MapType(kt1, vt1, valueContainsNull1), MapType(kt2, vt2, valueContainsNull2)) =>
          findTypeFunc(kt1, kt2)
            .filter(kt => !Cast.forceNullable(kt1, kt) && !Cast.forceNullable(kt2, kt))
            .flatMap { kt =>
              findTypeFunc(vt1, vt2).map { vt =>
                MapType(kt, vt, valueContainsNull1 || valueContainsNull2)
              }
          }
      ...
    }
    ```
    
    We might need to have another pr to discuss this.
    
    cc @cloud-fan @gatorsmile 


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22017: [SPARK-23938][SQL] Add map_zip_with function

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22017
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22017: [SPARK-23938][SQL] Add map_zip_with function

Posted by mn-mikke <gi...@git.apache.org>.
Github user mn-mikke commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22017#discussion_r208527423
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala ---
    @@ -442,3 +442,191 @@ case class ArrayAggregate(
     
       override def prettyName: String = "aggregate"
     }
    +
    +/**
    + * Merges two given maps into a single map by applying function to the pair of values with
    + * the same key.
    + */
    +@ExpressionDescription(
    +  usage =
    +    """
    +      _FUNC_(map1, map2, function) - Merges two given maps into a single map by applying
    +      function to the pair of values with the same key. For keys only presented in one map,
    +      NULL will be passed as the value for the missing key. If an input map contains duplicated
    +      keys, only the first entry of the duplicated key is passed into the lambda function.
    +    """,
    +  examples = """
    +    Examples:
    +      > SELECT _FUNC_(map(1, 'a', 2, 'b'), map(1, 'x', 2, 'y'), (k, v1, v2) -> concat(v1, v2));
    +       {1:"ax",2:"by"}
    +  """,
    +  since = "2.4.0")
    +case class MapZipWith(left: Expression, right: Expression, function: Expression)
    +  extends HigherOrderFunction with CodegenFallback {
    +
    +  @transient lazy val functionForEval: Expression = functionsForEval.head
    +
    +  @transient lazy val (keyType, leftValueType, _) =
    +    HigherOrderFunction.mapKeyValueArgumentType(left.dataType)
    +
    +  @transient lazy val (_, rightValueType, _) =
    +    HigherOrderFunction.mapKeyValueArgumentType(right.dataType)
    +
    +  @transient lazy val ordering = TypeUtils.getInterpretedOrdering(keyType)
    +
    +  override def inputs: Seq[Expression] = left :: right :: Nil
    +
    +  override def functions: Seq[Expression] = function :: Nil
    +
    +  override def nullable: Boolean = left.nullable || right.nullable
    +
    +  override def dataType: DataType = MapType(keyType, function.dataType, function.nullable)
    +
    +  override def checkInputDataTypes(): TypeCheckResult = {
    +    (left.dataType, right.dataType) match {
    +      case (MapType(k1, _, _), MapType(k2, _, _)) if k1.sameType(k2) =>
    +        TypeUtils.checkForOrderingExpr(k1, s"function $prettyName")
    +      case _ => TypeCheckResult.TypeCheckFailure(s"The input to function $prettyName should have " +
    +        s"been two ${MapType.simpleString}s with the same key type, but it's " +
    +        s"[${left.dataType.catalogString}, ${right.dataType.catalogString}].")
    +    }
    +  }
    +
    +  override def bind(f: (Expression, Seq[(DataType, Boolean)]) => LambdaFunction): MapZipWith = {
    +    val arguments = Seq((keyType, false), (leftValueType, true), (rightValueType, true))
    +    copy(function = f(function, arguments))
    +  }
    +
    +  override def eval(input: InternalRow): Any = {
    +    val value1 = left.eval(input)
    +    if (value1 == null) {
    +      null
    +    } else {
    +      val value2 = right.eval(input)
    +      if (value2 == null) {
    +        null
    +      } else {
    +        nullSafeEval(input, value1, value2)
    +      }
    +    }
    +  }
    +
    +  @transient lazy val LambdaFunction(_, Seq(
    +    keyVar: NamedLambdaVariable,
    +    value1Var: NamedLambdaVariable,
    +    value2Var: NamedLambdaVariable),
    +    _) = function
    +
    +  private def keyTypeSupportsEquals = keyType match {
    +    case BinaryType => false
    +    case _: AtomicType => true
    +    case _ => false
    +  }
    +
    +  @transient private lazy val getKeysWithValueIndexes:
    +      (ArrayData, ArrayData) => Seq[(Any, Array[Option[Int]])] = {
    +    if (keyTypeSupportsEquals) {
    +      getKeysWithIndexesFast
    +    } else {
    +      getKeysWithIndexesBruteForce
    +    }
    +  }
    +
    +  private def assertSizeOfArrayBuffer(size: Int): Unit = {
    +    if (size > ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH) {
    +      throw new RuntimeException(s"Unsuccessful try to zip maps with $size " +
    +        s"unique keys due to exceeding the array size limit " +
    +        s"${ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH}.")
    +    }
    +  }
    +
    +  private def getKeysWithIndexesFast(keys1: ArrayData, keys2: ArrayData) = {
    +    val arrayBuffer = new mutable.ArrayBuffer[(Any, Array[Option[Int]])]
    +    val hashMap = new mutable.OpenHashMap[Any, Array[Option[Int]]]
    +    val keys = Array(keys1, keys2)
    +    var z = 0
    +    while(z < 2) {
    +      var i = 0
    +      val array = keys(z)
    +      while (i < array.numElements()) {
    +        val key = array.get(i, keyType)
    +        hashMap.get(key) match {
    +          case Some(indexes) =>
    +            if (indexes(z).isEmpty) indexes(z) = Some(i)
    +          case None =>
    +            assertSizeOfArrayBuffer(arrayBuffer.size)
    +            val indexes = Array[Option[Int]](None, None)
    +            indexes(z) = Some(i)
    +            hashMap.put(key, indexes)
    +            arrayBuffer += Tuple2(key, indexes)
    +        }
    +        i += 1
    +      }
    +      z += 1
    +    }
    +    arrayBuffer
    +  }
    +
    +  private def getKeysWithIndexesBruteForce(keys1: ArrayData, keys2: ArrayData) = {
    +    val arrayBuffer = new mutable.ArrayBuffer[(Any, Array[Option[Int]])]
    +    val keys = Array(keys1, keys2)
    +    var z = 0
    +    while(z < 2) {
    +      var i = 0
    +      val array = keys(z)
    +      while (i < array.numElements()) {
    +        val key = array.get(i, keyType)
    +        var found = false
    +        var j = 0
    +        while (!found && j < arrayBuffer.size) {
    +          val (bufferKey, indexes) = arrayBuffer(j)
    +          if (ordering.equiv(bufferKey, key)) {
    +            found = true
    +            if(indexes(z).isEmpty) indexes(z) = Some(i)
    +          }
    +          j += 1
    +        }
    +        if (!found) {
    +          assertSizeOfArrayBuffer(arrayBuffer.size)
    --- End diff --
    
    The purpose of this line is to avoid ```OutOfMemoryError``` exception when max array size is exceeded and throw something more accurate. Maybe I'm missing something, but wouldn't we break it we checked this only once at the end? The max size could be exceeded in any iteration.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22017: [SPARK-23938][SQL] Add map_zip_with function

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22017
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22017: [SPARK-23938][SQL] Add map_zip_with function

Posted by mgaido91 <gi...@git.apache.org>.
Github user mgaido91 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22017#discussion_r208635800
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala ---
    @@ -442,3 +442,191 @@ case class ArrayAggregate(
     
       override def prettyName: String = "aggregate"
     }
    +
    +/**
    + * Merges two given maps into a single map by applying function to the pair of values with
    + * the same key.
    + */
    +@ExpressionDescription(
    +  usage =
    +    """
    +      _FUNC_(map1, map2, function) - Merges two given maps into a single map by applying
    +      function to the pair of values with the same key. For keys only presented in one map,
    +      NULL will be passed as the value for the missing key. If an input map contains duplicated
    +      keys, only the first entry of the duplicated key is passed into the lambda function.
    +    """,
    +  examples = """
    +    Examples:
    +      > SELECT _FUNC_(map(1, 'a', 2, 'b'), map(1, 'x', 2, 'y'), (k, v1, v2) -> concat(v1, v2));
    +       {1:"ax",2:"by"}
    +  """,
    +  since = "2.4.0")
    +case class MapZipWith(left: Expression, right: Expression, function: Expression)
    +  extends HigherOrderFunction with CodegenFallback {
    +
    +  @transient lazy val functionForEval: Expression = functionsForEval.head
    +
    +  @transient lazy val (keyType, leftValueType, _) =
    +    HigherOrderFunction.mapKeyValueArgumentType(left.dataType)
    +
    +  @transient lazy val (_, rightValueType, _) =
    +    HigherOrderFunction.mapKeyValueArgumentType(right.dataType)
    +
    +  @transient lazy val ordering = TypeUtils.getInterpretedOrdering(keyType)
    +
    +  override def inputs: Seq[Expression] = left :: right :: Nil
    +
    +  override def functions: Seq[Expression] = function :: Nil
    +
    +  override def nullable: Boolean = left.nullable || right.nullable
    +
    +  override def dataType: DataType = MapType(keyType, function.dataType, function.nullable)
    +
    +  override def checkInputDataTypes(): TypeCheckResult = {
    +    (left.dataType, right.dataType) match {
    +      case (MapType(k1, _, _), MapType(k2, _, _)) if k1.sameType(k2) =>
    +        TypeUtils.checkForOrderingExpr(k1, s"function $prettyName")
    +      case _ => TypeCheckResult.TypeCheckFailure(s"The input to function $prettyName should have " +
    +        s"been two ${MapType.simpleString}s with the same key type, but it's " +
    +        s"[${left.dataType.catalogString}, ${right.dataType.catalogString}].")
    +    }
    +  }
    +
    +  override def bind(f: (Expression, Seq[(DataType, Boolean)]) => LambdaFunction): MapZipWith = {
    +    val arguments = Seq((keyType, false), (leftValueType, true), (rightValueType, true))
    +    copy(function = f(function, arguments))
    +  }
    +
    +  override def eval(input: InternalRow): Any = {
    +    val value1 = left.eval(input)
    +    if (value1 == null) {
    +      null
    +    } else {
    +      val value2 = right.eval(input)
    +      if (value2 == null) {
    +        null
    +      } else {
    +        nullSafeEval(input, value1, value2)
    +      }
    +    }
    +  }
    +
    +  @transient lazy val LambdaFunction(_, Seq(
    +    keyVar: NamedLambdaVariable,
    +    value1Var: NamedLambdaVariable,
    +    value2Var: NamedLambdaVariable),
    +    _) = function
    +
    +  private def keyTypeSupportsEquals = keyType match {
    +    case BinaryType => false
    +    case _: AtomicType => true
    +    case _ => false
    +  }
    +
    +  @transient private lazy val getKeysWithValueIndexes:
    +      (ArrayData, ArrayData) => Seq[(Any, Array[Option[Int]])] = {
    +    if (keyTypeSupportsEquals) {
    +      getKeysWithIndexesFast
    +    } else {
    +      getKeysWithIndexesBruteForce
    +    }
    +  }
    +
    +  private def assertSizeOfArrayBuffer(size: Int): Unit = {
    +    if (size > ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH) {
    +      throw new RuntimeException(s"Unsuccessful try to zip maps with $size " +
    +        s"unique keys due to exceeding the array size limit " +
    +        s"${ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH}.")
    +    }
    +  }
    +
    +  private def getKeysWithIndexesFast(keys1: ArrayData, keys2: ArrayData) = {
    +    val arrayBuffer = new mutable.ArrayBuffer[(Any, Array[Option[Int]])]
    --- End diff --
    
    That is a problem anyway also with the current implementation, as you would pick only one value in that case. The same if you add it without checking.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22017: [SPARK-23938][SQL] Add map_zip_with function

Posted by mn-mikke <gi...@git.apache.org>.
Github user mn-mikke commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22017#discussion_r209188342
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala ---
    @@ -442,3 +442,186 @@ case class ArrayAggregate(
     
       override def prettyName: String = "aggregate"
     }
    +
    +/**
    + * Merges two given maps into a single map by applying function to the pair of values with
    + * the same key.
    + */
    +@ExpressionDescription(
    +  usage =
    +    """
    +      _FUNC_(map1, map2, function) - Merges two given maps into a single map by applying
    +      function to the pair of values with the same key. For keys only presented in one map,
    +      NULL will be passed as the value for the missing key. If an input map contains duplicated
    +      keys, only the first entry of the duplicated key is passed into the lambda function.
    +    """,
    +  examples = """
    +    Examples:
    +      > SELECT _FUNC_(map(1, 'a', 2, 'b'), map(1, 'x', 2, 'y'), (k, v1, v2) -> concat(v1, v2));
    +       {1:"ax",2:"by"}
    +  """,
    +  since = "2.4.0")
    +case class MapZipWith(left: Expression, right: Expression, function: Expression)
    +  extends HigherOrderFunction with CodegenFallback {
    +
    +  @transient lazy val functionForEval: Expression = functionsForEval.head
    +
    +  @transient lazy val (leftKeyType, leftValueType, leftValueContainsNull) =
    +    HigherOrderFunction.mapKeyValueArgumentType(left.dataType)
    +
    +  @transient lazy val (rightKeyType, rightValueType, rightValueContainsNull) =
    +    HigherOrderFunction.mapKeyValueArgumentType(right.dataType)
    +
    +  @transient lazy val keyType =
    +    TypeCoercion.findTightestCommonType(leftKeyType, rightKeyType).getOrElse(NullType)
    --- End diff --
    
    Even though there is a coercion rule for unification of key types. The key types may differ in nullability flags if they are complex. In theory, we could use ```==``` and ```findTightestCommonType``` in the coercion rule  since there is no codegen to be optimized for ```null``` checks. But unfortunatelly, ```bind``` gets called once before execution of coercion rules, so ```findTightestCommonType``` is important for setting up a correct input type for lamda function.
    
    Maybe, we could play with order of analysis rules, but I'm not sure about all the consequences. @ueshin could shad some light on analysis rules ordering?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22017: [SPARK-23938][SQL] Add map_zip_with function

Posted by mgaido91 <gi...@git.apache.org>.
Github user mgaido91 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22017#discussion_r208507568
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala ---
    @@ -442,3 +442,191 @@ case class ArrayAggregate(
     
       override def prettyName: String = "aggregate"
     }
    +
    +/**
    + * Merges two given maps into a single map by applying function to the pair of values with
    + * the same key.
    + */
    +@ExpressionDescription(
    +  usage =
    +    """
    +      _FUNC_(map1, map2, function) - Merges two given maps into a single map by applying
    +      function to the pair of values with the same key. For keys only presented in one map,
    +      NULL will be passed as the value for the missing key. If an input map contains duplicated
    +      keys, only the first entry of the duplicated key is passed into the lambda function.
    +    """,
    +  examples = """
    +    Examples:
    +      > SELECT _FUNC_(map(1, 'a', 2, 'b'), map(1, 'x', 2, 'y'), (k, v1, v2) -> concat(v1, v2));
    +       {1:"ax",2:"by"}
    +  """,
    +  since = "2.4.0")
    +case class MapZipWith(left: Expression, right: Expression, function: Expression)
    +  extends HigherOrderFunction with CodegenFallback {
    +
    +  @transient lazy val functionForEval: Expression = functionsForEval.head
    +
    +  @transient lazy val (keyType, leftValueType, _) =
    +    HigherOrderFunction.mapKeyValueArgumentType(left.dataType)
    +
    +  @transient lazy val (_, rightValueType, _) =
    +    HigherOrderFunction.mapKeyValueArgumentType(right.dataType)
    +
    +  @transient lazy val ordering = TypeUtils.getInterpretedOrdering(keyType)
    +
    +  override def inputs: Seq[Expression] = left :: right :: Nil
    +
    +  override def functions: Seq[Expression] = function :: Nil
    +
    +  override def nullable: Boolean = left.nullable || right.nullable
    +
    +  override def dataType: DataType = MapType(keyType, function.dataType, function.nullable)
    +
    +  override def checkInputDataTypes(): TypeCheckResult = {
    +    (left.dataType, right.dataType) match {
    +      case (MapType(k1, _, _), MapType(k2, _, _)) if k1.sameType(k2) =>
    +        TypeUtils.checkForOrderingExpr(k1, s"function $prettyName")
    +      case _ => TypeCheckResult.TypeCheckFailure(s"The input to function $prettyName should have " +
    +        s"been two ${MapType.simpleString}s with the same key type, but it's " +
    +        s"[${left.dataType.catalogString}, ${right.dataType.catalogString}].")
    +    }
    +  }
    +
    +  override def bind(f: (Expression, Seq[(DataType, Boolean)]) => LambdaFunction): MapZipWith = {
    +    val arguments = Seq((keyType, false), (leftValueType, true), (rightValueType, true))
    +    copy(function = f(function, arguments))
    +  }
    +
    +  override def eval(input: InternalRow): Any = {
    +    val value1 = left.eval(input)
    +    if (value1 == null) {
    +      null
    +    } else {
    +      val value2 = right.eval(input)
    +      if (value2 == null) {
    +        null
    +      } else {
    +        nullSafeEval(input, value1, value2)
    +      }
    +    }
    +  }
    +
    +  @transient lazy val LambdaFunction(_, Seq(
    +    keyVar: NamedLambdaVariable,
    +    value1Var: NamedLambdaVariable,
    +    value2Var: NamedLambdaVariable),
    +    _) = function
    +
    +  private def keyTypeSupportsEquals = keyType match {
    +    case BinaryType => false
    +    case _: AtomicType => true
    +    case _ => false
    +  }
    +
    +  @transient private lazy val getKeysWithValueIndexes:
    +      (ArrayData, ArrayData) => Seq[(Any, Array[Option[Int]])] = {
    +    if (keyTypeSupportsEquals) {
    +      getKeysWithIndexesFast
    +    } else {
    +      getKeysWithIndexesBruteForce
    +    }
    +  }
    +
    +  private def assertSizeOfArrayBuffer(size: Int): Unit = {
    +    if (size > ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH) {
    +      throw new RuntimeException(s"Unsuccessful try to zip maps with $size " +
    +        s"unique keys due to exceeding the array size limit " +
    +        s"${ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH}.")
    +    }
    +  }
    +
    +  private def getKeysWithIndexesFast(keys1: ArrayData, keys2: ArrayData) = {
    +    val arrayBuffer = new mutable.ArrayBuffer[(Any, Array[Option[Int]])]
    +    val hashMap = new mutable.OpenHashMap[Any, Array[Option[Int]]]
    +    val keys = Array(keys1, keys2)
    +    var z = 0
    +    while(z < 2) {
    +      var i = 0
    +      val array = keys(z)
    +      while (i < array.numElements()) {
    +        val key = array.get(i, keyType)
    +        hashMap.get(key) match {
    +          case Some(indexes) =>
    +            if (indexes(z).isEmpty) indexes(z) = Some(i)
    +          case None =>
    +            assertSizeOfArrayBuffer(arrayBuffer.size)
    +            val indexes = Array[Option[Int]](None, None)
    +            indexes(z) = Some(i)
    +            hashMap.put(key, indexes)
    +            arrayBuffer += Tuple2(key, indexes)
    +        }
    +        i += 1
    +      }
    +      z += 1
    +    }
    +    arrayBuffer
    +  }
    +
    +  private def getKeysWithIndexesBruteForce(keys1: ArrayData, keys2: ArrayData) = {
    +    val arrayBuffer = new mutable.ArrayBuffer[(Any, Array[Option[Int]])]
    +    val keys = Array(keys1, keys2)
    +    var z = 0
    +    while(z < 2) {
    +      var i = 0
    +      val array = keys(z)
    +      while (i < array.numElements()) {
    +        val key = array.get(i, keyType)
    +        var found = false
    +        var j = 0
    +        while (!found && j < arrayBuffer.size) {
    +          val (bufferKey, indexes) = arrayBuffer(j)
    +          if (ordering.equiv(bufferKey, key)) {
    +            found = true
    +            if(indexes(z).isEmpty) indexes(z) = Some(i)
    +          }
    +          j += 1
    +        }
    +        if (!found) {
    +          assertSizeOfArrayBuffer(arrayBuffer.size)
    +          val indexes = Array[Option[Int]](None, None)
    +          indexes(z) = Some(i)
    +          arrayBuffer += Tuple2(key, indexes)
    +        }
    +        i += 1
    +      }
    +      z += 1
    +    }
    +    arrayBuffer
    +  }
    +
    +  private def getValue(valueData: ArrayData, eType: DataType, index: Option[Int]) = index match {
    --- End diff --
    
    do we really need this? It can be `index.map(valueData.get(_, eType)).getOrElse(null)` and we are using it only in one place (twice, but in the same place)...


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22017: [SPARK-23938][SQL] Add map_zip_with function

Posted by ueshin <gi...@git.apache.org>.
Github user ueshin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22017#discussion_r209536685
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala ---
    @@ -442,3 +442,186 @@ case class ArrayAggregate(
     
       override def prettyName: String = "aggregate"
     }
    +
    +/**
    + * Merges two given maps into a single map by applying function to the pair of values with
    + * the same key.
    + */
    +@ExpressionDescription(
    +  usage =
    +    """
    +      _FUNC_(map1, map2, function) - Merges two given maps into a single map by applying
    +      function to the pair of values with the same key. For keys only presented in one map,
    +      NULL will be passed as the value for the missing key. If an input map contains duplicated
    +      keys, only the first entry of the duplicated key is passed into the lambda function.
    +    """,
    +  examples = """
    +    Examples:
    +      > SELECT _FUNC_(map(1, 'a', 2, 'b'), map(1, 'x', 2, 'y'), (k, v1, v2) -> concat(v1, v2));
    +       {1:"ax",2:"by"}
    +  """,
    +  since = "2.4.0")
    +case class MapZipWith(left: Expression, right: Expression, function: Expression)
    +  extends HigherOrderFunction with CodegenFallback {
    +
    +  @transient lazy val functionForEval: Expression = functionsForEval.head
    +
    +  @transient lazy val (leftKeyType, leftValueType, leftValueContainsNull) =
    +    HigherOrderFunction.mapKeyValueArgumentType(left.dataType)
    +
    +  @transient lazy val (rightKeyType, rightValueType, rightValueContainsNull) =
    +    HigherOrderFunction.mapKeyValueArgumentType(right.dataType)
    +
    +  @transient lazy val keyType =
    +    TypeCoercion.findTightestCommonType(leftKeyType, rightKeyType).getOrElse(NullType)
    --- End diff --
    
    Good catch! Year, we need it.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22017: [SPARK-23938][SQL] Add map_zip_with function

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22017
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/94393/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22017: [SPARK-23938][SQL] Add map_zip_with function

Posted by mn-mikke <gi...@git.apache.org>.
Github user mn-mikke commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22017#discussion_r208559241
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala ---
    @@ -442,3 +442,191 @@ case class ArrayAggregate(
     
       override def prettyName: String = "aggregate"
     }
    +
    +/**
    + * Merges two given maps into a single map by applying function to the pair of values with
    + * the same key.
    + */
    +@ExpressionDescription(
    +  usage =
    +    """
    +      _FUNC_(map1, map2, function) - Merges two given maps into a single map by applying
    +      function to the pair of values with the same key. For keys only presented in one map,
    +      NULL will be passed as the value for the missing key. If an input map contains duplicated
    +      keys, only the first entry of the duplicated key is passed into the lambda function.
    +    """,
    +  examples = """
    +    Examples:
    +      > SELECT _FUNC_(map(1, 'a', 2, 'b'), map(1, 'x', 2, 'y'), (k, v1, v2) -> concat(v1, v2));
    +       {1:"ax",2:"by"}
    +  """,
    +  since = "2.4.0")
    +case class MapZipWith(left: Expression, right: Expression, function: Expression)
    +  extends HigherOrderFunction with CodegenFallback {
    +
    +  @transient lazy val functionForEval: Expression = functionsForEval.head
    +
    +  @transient lazy val (keyType, leftValueType, _) =
    +    HigherOrderFunction.mapKeyValueArgumentType(left.dataType)
    +
    +  @transient lazy val (_, rightValueType, _) =
    +    HigherOrderFunction.mapKeyValueArgumentType(right.dataType)
    +
    +  @transient lazy val ordering = TypeUtils.getInterpretedOrdering(keyType)
    +
    +  override def inputs: Seq[Expression] = left :: right :: Nil
    +
    +  override def functions: Seq[Expression] = function :: Nil
    +
    +  override def nullable: Boolean = left.nullable || right.nullable
    +
    +  override def dataType: DataType = MapType(keyType, function.dataType, function.nullable)
    +
    +  override def checkInputDataTypes(): TypeCheckResult = {
    +    (left.dataType, right.dataType) match {
    +      case (MapType(k1, _, _), MapType(k2, _, _)) if k1.sameType(k2) =>
    +        TypeUtils.checkForOrderingExpr(k1, s"function $prettyName")
    +      case _ => TypeCheckResult.TypeCheckFailure(s"The input to function $prettyName should have " +
    +        s"been two ${MapType.simpleString}s with the same key type, but it's " +
    +        s"[${left.dataType.catalogString}, ${right.dataType.catalogString}].")
    +    }
    +  }
    +
    +  override def bind(f: (Expression, Seq[(DataType, Boolean)]) => LambdaFunction): MapZipWith = {
    +    val arguments = Seq((keyType, false), (leftValueType, true), (rightValueType, true))
    +    copy(function = f(function, arguments))
    +  }
    +
    +  override def eval(input: InternalRow): Any = {
    +    val value1 = left.eval(input)
    +    if (value1 == null) {
    +      null
    +    } else {
    +      val value2 = right.eval(input)
    +      if (value2 == null) {
    +        null
    +      } else {
    +        nullSafeEval(input, value1, value2)
    +      }
    +    }
    +  }
    +
    +  @transient lazy val LambdaFunction(_, Seq(
    +    keyVar: NamedLambdaVariable,
    +    value1Var: NamedLambdaVariable,
    +    value2Var: NamedLambdaVariable),
    +    _) = function
    +
    +  private def keyTypeSupportsEquals = keyType match {
    +    case BinaryType => false
    +    case _: AtomicType => true
    +    case _ => false
    +  }
    +
    +  @transient private lazy val getKeysWithValueIndexes:
    +      (ArrayData, ArrayData) => Seq[(Any, Array[Option[Int]])] = {
    +    if (keyTypeSupportsEquals) {
    +      getKeysWithIndexesFast
    +    } else {
    +      getKeysWithIndexesBruteForce
    +    }
    +  }
    +
    +  private def assertSizeOfArrayBuffer(size: Int): Unit = {
    +    if (size > ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH) {
    +      throw new RuntimeException(s"Unsuccessful try to zip maps with $size " +
    +        s"unique keys due to exceeding the array size limit " +
    +        s"${ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH}.")
    +    }
    +  }
    +
    +  private def getKeysWithIndexesFast(keys1: ArrayData, keys2: ArrayData) = {
    +    val arrayBuffer = new mutable.ArrayBuffer[(Any, Array[Option[Int]])]
    --- End diff --
    
    Yeah, but my point is how to crate a new tuple from a old one without using ```_1```, ```_2```?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22017: [SPARK-23938][SQL] Add map_zip_with function

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22017
  
    Merged build finished. Test FAILed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22017: [SPARK-23938][SQL] Add map_zip_with function

Posted by mn-mikke <gi...@git.apache.org>.
Github user mn-mikke commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22017#discussion_r208280351
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala ---
    @@ -365,3 +364,101 @@ case class ArrayAggregate(
     
       override def prettyName: String = "aggregate"
     }
    +
    +/**
    + * Merges two given maps into a single map by applying function to the pair of values with
    + * the same key.
    + */
    +@ExpressionDescription(
    +  usage =
    +    """
    +      _FUNC_(map1, map2, function) - Merges two given maps into a single map by applying
    +      function to the pair of values with the same key. For keys only presented in one map,
    +      NULL will be passed as the value for the missing key. If an input map contains duplicated
    +      keys, only the first entry of the duplicated key is passed into the lambda function.
    +    """,
    +  examples = """
    +    Examples:
    +      > SELECT _FUNC_(map(1, 'a', 2, 'b'), map(1, 'x', 2, 'y'), (k, v1, v2) -> concat(v1, v2));
    +       {1:"ax",2:"by"}
    +  """,
    +  since = "2.4.0")
    +case class MapZipWith(left: Expression, right: Expression, function: Expression)
    +  extends HigherOrderFunction with CodegenFallback {
    +
    +  @transient lazy val functionForEval: Expression = functionsForEval.head
    +
    +  @transient lazy val MapType(keyType, leftValueType, _) = getMapType(left)
    +
    +  @transient lazy val MapType(_, rightValueType, _) = getMapType(right)
    +
    +  @transient lazy val arrayDataUnion = new ArrayDataUnion(keyType)
    +
    +  @transient lazy val ordering = TypeUtils.getInterpretedOrdering(keyType)
    +
    +  override def inputs: Seq[Expression] = left :: right :: Nil
    +
    +  override def functions: Seq[Expression] = function :: Nil
    +
    +  override def nullable: Boolean = left.nullable || right.nullable
    --- End diff --
    
    ```nullable``` flag is rather related to the cases when the whole map is ```null```. The case that you are referring to is handled by ```valueContainsNull``` flag of ```MapType``` (see the line [423](https://github.com/apache/spark/pull/22017/files/ec583eb29ba6fdb79d0b85cbecb3f709e6648b25#diff-ef52827ed9b41efc1fbd056a06ef7c6aR423)).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22017: [SPARK-23938][SQL] Add map_zip_with function

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22017
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22017: [SPARK-23938][SQL] Add map_zip_with function

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22017
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/94394/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22017: [SPARK-23938][SQL] Add map_zip_with function

Posted by mn-mikke <gi...@git.apache.org>.
Github user mn-mikke commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22017#discussion_r208550479
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala ---
    @@ -442,3 +442,191 @@ case class ArrayAggregate(
     
       override def prettyName: String = "aggregate"
     }
    +
    +/**
    + * Merges two given maps into a single map by applying function to the pair of values with
    + * the same key.
    + */
    +@ExpressionDescription(
    +  usage =
    +    """
    +      _FUNC_(map1, map2, function) - Merges two given maps into a single map by applying
    +      function to the pair of values with the same key. For keys only presented in one map,
    +      NULL will be passed as the value for the missing key. If an input map contains duplicated
    +      keys, only the first entry of the duplicated key is passed into the lambda function.
    +    """,
    +  examples = """
    +    Examples:
    +      > SELECT _FUNC_(map(1, 'a', 2, 'b'), map(1, 'x', 2, 'y'), (k, v1, v2) -> concat(v1, v2));
    +       {1:"ax",2:"by"}
    +  """,
    +  since = "2.4.0")
    +case class MapZipWith(left: Expression, right: Expression, function: Expression)
    +  extends HigherOrderFunction with CodegenFallback {
    +
    +  @transient lazy val functionForEval: Expression = functionsForEval.head
    +
    +  @transient lazy val (keyType, leftValueType, _) =
    +    HigherOrderFunction.mapKeyValueArgumentType(left.dataType)
    +
    +  @transient lazy val (_, rightValueType, _) =
    +    HigherOrderFunction.mapKeyValueArgumentType(right.dataType)
    +
    +  @transient lazy val ordering = TypeUtils.getInterpretedOrdering(keyType)
    +
    +  override def inputs: Seq[Expression] = left :: right :: Nil
    +
    +  override def functions: Seq[Expression] = function :: Nil
    +
    +  override def nullable: Boolean = left.nullable || right.nullable
    +
    +  override def dataType: DataType = MapType(keyType, function.dataType, function.nullable)
    +
    +  override def checkInputDataTypes(): TypeCheckResult = {
    +    (left.dataType, right.dataType) match {
    +      case (MapType(k1, _, _), MapType(k2, _, _)) if k1.sameType(k2) =>
    +        TypeUtils.checkForOrderingExpr(k1, s"function $prettyName")
    +      case _ => TypeCheckResult.TypeCheckFailure(s"The input to function $prettyName should have " +
    +        s"been two ${MapType.simpleString}s with the same key type, but it's " +
    +        s"[${left.dataType.catalogString}, ${right.dataType.catalogString}].")
    +    }
    +  }
    +
    +  override def bind(f: (Expression, Seq[(DataType, Boolean)]) => LambdaFunction): MapZipWith = {
    +    val arguments = Seq((keyType, false), (leftValueType, true), (rightValueType, true))
    +    copy(function = f(function, arguments))
    +  }
    +
    +  override def eval(input: InternalRow): Any = {
    +    val value1 = left.eval(input)
    +    if (value1 == null) {
    +      null
    +    } else {
    +      val value2 = right.eval(input)
    +      if (value2 == null) {
    +        null
    +      } else {
    +        nullSafeEval(input, value1, value2)
    +      }
    +    }
    +  }
    +
    +  @transient lazy val LambdaFunction(_, Seq(
    +    keyVar: NamedLambdaVariable,
    +    value1Var: NamedLambdaVariable,
    +    value2Var: NamedLambdaVariable),
    +    _) = function
    +
    +  private def keyTypeSupportsEquals = keyType match {
    +    case BinaryType => false
    +    case _: AtomicType => true
    +    case _ => false
    +  }
    +
    +  @transient private lazy val getKeysWithValueIndexes:
    +      (ArrayData, ArrayData) => Seq[(Any, Array[Option[Int]])] = {
    +    if (keyTypeSupportsEquals) {
    +      getKeysWithIndexesFast
    +    } else {
    +      getKeysWithIndexesBruteForce
    +    }
    +  }
    +
    +  private def assertSizeOfArrayBuffer(size: Int): Unit = {
    +    if (size > ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH) {
    +      throw new RuntimeException(s"Unsuccessful try to zip maps with $size " +
    +        s"unique keys due to exceeding the array size limit " +
    +        s"${ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH}.")
    +    }
    +  }
    +
    +  private def getKeysWithIndexesFast(keys1: ArrayData, keys2: ArrayData) = {
    +    val arrayBuffer = new mutable.ArrayBuffer[(Any, Array[Option[Int]])]
    --- End diff --
    
    > I really don't think so, it would be the same as now I think
    
    Let's assume that ```indexes``` are tuple for now. ```indexes(z).isEmpty``` could replace with ```indexes.productElement(z).isEmpty```, but how to replace ```indexes(z) = Some(i)```? Since tuple is immutable, I don't see how to replace ith element with ```copy``` function. Maybe we could implement a dedicated class to hold indexes, but is it worth doing that?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22017: [SPARK-23938][SQL] Add map_zip_with function

Posted by mgaido91 <gi...@git.apache.org>.
Github user mgaido91 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22017#discussion_r208695254
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala ---
    @@ -442,3 +442,191 @@ case class ArrayAggregate(
     
       override def prettyName: String = "aggregate"
     }
    +
    +/**
    + * Merges two given maps into a single map by applying function to the pair of values with
    + * the same key.
    + */
    +@ExpressionDescription(
    +  usage =
    +    """
    +      _FUNC_(map1, map2, function) - Merges two given maps into a single map by applying
    +      function to the pair of values with the same key. For keys only presented in one map,
    +      NULL will be passed as the value for the missing key. If an input map contains duplicated
    +      keys, only the first entry of the duplicated key is passed into the lambda function.
    +    """,
    +  examples = """
    +    Examples:
    +      > SELECT _FUNC_(map(1, 'a', 2, 'b'), map(1, 'x', 2, 'y'), (k, v1, v2) -> concat(v1, v2));
    +       {1:"ax",2:"by"}
    +  """,
    +  since = "2.4.0")
    +case class MapZipWith(left: Expression, right: Expression, function: Expression)
    +  extends HigherOrderFunction with CodegenFallback {
    +
    +  @transient lazy val functionForEval: Expression = functionsForEval.head
    +
    +  @transient lazy val (keyType, leftValueType, _) =
    +    HigherOrderFunction.mapKeyValueArgumentType(left.dataType)
    +
    +  @transient lazy val (_, rightValueType, _) =
    +    HigherOrderFunction.mapKeyValueArgumentType(right.dataType)
    +
    +  @transient lazy val ordering = TypeUtils.getInterpretedOrdering(keyType)
    +
    +  override def inputs: Seq[Expression] = left :: right :: Nil
    +
    +  override def functions: Seq[Expression] = function :: Nil
    +
    +  override def nullable: Boolean = left.nullable || right.nullable
    +
    +  override def dataType: DataType = MapType(keyType, function.dataType, function.nullable)
    +
    +  override def checkInputDataTypes(): TypeCheckResult = {
    +    (left.dataType, right.dataType) match {
    +      case (MapType(k1, _, _), MapType(k2, _, _)) if k1.sameType(k2) =>
    +        TypeUtils.checkForOrderingExpr(k1, s"function $prettyName")
    +      case _ => TypeCheckResult.TypeCheckFailure(s"The input to function $prettyName should have " +
    +        s"been two ${MapType.simpleString}s with the same key type, but it's " +
    +        s"[${left.dataType.catalogString}, ${right.dataType.catalogString}].")
    +    }
    +  }
    +
    +  override def bind(f: (Expression, Seq[(DataType, Boolean)]) => LambdaFunction): MapZipWith = {
    +    val arguments = Seq((keyType, false), (leftValueType, true), (rightValueType, true))
    +    copy(function = f(function, arguments))
    +  }
    +
    +  override def eval(input: InternalRow): Any = {
    +    val value1 = left.eval(input)
    +    if (value1 == null) {
    +      null
    +    } else {
    +      val value2 = right.eval(input)
    +      if (value2 == null) {
    +        null
    +      } else {
    +        nullSafeEval(input, value1, value2)
    +      }
    +    }
    +  }
    +
    +  @transient lazy val LambdaFunction(_, Seq(
    +    keyVar: NamedLambdaVariable,
    +    value1Var: NamedLambdaVariable,
    +    value2Var: NamedLambdaVariable),
    +    _) = function
    +
    +  private def keyTypeSupportsEquals = keyType match {
    +    case BinaryType => false
    +    case _: AtomicType => true
    +    case _ => false
    +  }
    +
    +  @transient private lazy val getKeysWithValueIndexes:
    +      (ArrayData, ArrayData) => Seq[(Any, Array[Option[Int]])] = {
    +    if (keyTypeSupportsEquals) {
    +      getKeysWithIndexesFast
    +    } else {
    +      getKeysWithIndexesBruteForce
    +    }
    +  }
    +
    +  private def assertSizeOfArrayBuffer(size: Int): Unit = {
    +    if (size > ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH) {
    +      throw new RuntimeException(s"Unsuccessful try to zip maps with $size " +
    +        s"unique keys due to exceeding the array size limit " +
    +        s"${ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH}.")
    +    }
    +  }
    +
    +  private def getKeysWithIndexesFast(keys1: ArrayData, keys2: ArrayData) = {
    +    val arrayBuffer = new mutable.ArrayBuffer[(Any, Array[Option[Int]])]
    --- End diff --
    
    The array based solution is 20% faster indeed according to my benchmark, but I think it is not critical as I run the benchmark performing the operation 1.000.000 times and the absolute difference was 2 ms. So I prefer the cleaner solution (that is using tuples). @ueshin what do you think?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22017: [SPARK-23938][SQL] Add map_zip_with function

Posted by mgaido91 <gi...@git.apache.org>.
Github user mgaido91 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22017#discussion_r208602347
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala ---
    @@ -442,3 +442,191 @@ case class ArrayAggregate(
     
       override def prettyName: String = "aggregate"
     }
    +
    +/**
    + * Merges two given maps into a single map by applying function to the pair of values with
    + * the same key.
    + */
    +@ExpressionDescription(
    +  usage =
    +    """
    +      _FUNC_(map1, map2, function) - Merges two given maps into a single map by applying
    +      function to the pair of values with the same key. For keys only presented in one map,
    +      NULL will be passed as the value for the missing key. If an input map contains duplicated
    +      keys, only the first entry of the duplicated key is passed into the lambda function.
    +    """,
    +  examples = """
    +    Examples:
    +      > SELECT _FUNC_(map(1, 'a', 2, 'b'), map(1, 'x', 2, 'y'), (k, v1, v2) -> concat(v1, v2));
    +       {1:"ax",2:"by"}
    +  """,
    +  since = "2.4.0")
    +case class MapZipWith(left: Expression, right: Expression, function: Expression)
    +  extends HigherOrderFunction with CodegenFallback {
    +
    +  @transient lazy val functionForEval: Expression = functionsForEval.head
    +
    +  @transient lazy val (keyType, leftValueType, _) =
    +    HigherOrderFunction.mapKeyValueArgumentType(left.dataType)
    +
    +  @transient lazy val (_, rightValueType, _) =
    +    HigherOrderFunction.mapKeyValueArgumentType(right.dataType)
    +
    +  @transient lazy val ordering = TypeUtils.getInterpretedOrdering(keyType)
    +
    +  override def inputs: Seq[Expression] = left :: right :: Nil
    +
    +  override def functions: Seq[Expression] = function :: Nil
    +
    +  override def nullable: Boolean = left.nullable || right.nullable
    +
    +  override def dataType: DataType = MapType(keyType, function.dataType, function.nullable)
    +
    +  override def checkInputDataTypes(): TypeCheckResult = {
    +    (left.dataType, right.dataType) match {
    +      case (MapType(k1, _, _), MapType(k2, _, _)) if k1.sameType(k2) =>
    +        TypeUtils.checkForOrderingExpr(k1, s"function $prettyName")
    +      case _ => TypeCheckResult.TypeCheckFailure(s"The input to function $prettyName should have " +
    +        s"been two ${MapType.simpleString}s with the same key type, but it's " +
    +        s"[${left.dataType.catalogString}, ${right.dataType.catalogString}].")
    +    }
    +  }
    +
    +  override def bind(f: (Expression, Seq[(DataType, Boolean)]) => LambdaFunction): MapZipWith = {
    +    val arguments = Seq((keyType, false), (leftValueType, true), (rightValueType, true))
    +    copy(function = f(function, arguments))
    +  }
    +
    +  override def eval(input: InternalRow): Any = {
    +    val value1 = left.eval(input)
    +    if (value1 == null) {
    +      null
    +    } else {
    +      val value2 = right.eval(input)
    +      if (value2 == null) {
    +        null
    +      } else {
    +        nullSafeEval(input, value1, value2)
    +      }
    +    }
    +  }
    +
    +  @transient lazy val LambdaFunction(_, Seq(
    +    keyVar: NamedLambdaVariable,
    +    value1Var: NamedLambdaVariable,
    +    value2Var: NamedLambdaVariable),
    +    _) = function
    +
    +  private def keyTypeSupportsEquals = keyType match {
    +    case BinaryType => false
    +    case _: AtomicType => true
    +    case _ => false
    +  }
    +
    +  @transient private lazy val getKeysWithValueIndexes:
    +      (ArrayData, ArrayData) => Seq[(Any, Array[Option[Int]])] = {
    +    if (keyTypeSupportsEquals) {
    +      getKeysWithIndexesFast
    +    } else {
    +      getKeysWithIndexesBruteForce
    +    }
    +  }
    +
    +  private def assertSizeOfArrayBuffer(size: Int): Unit = {
    +    if (size > ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH) {
    +      throw new RuntimeException(s"Unsuccessful try to zip maps with $size " +
    +        s"unique keys due to exceeding the array size limit " +
    +        s"${ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH}.")
    +    }
    +  }
    +
    +  private def getKeysWithIndexesFast(keys1: ArrayData, keys2: ArrayData) = {
    +    val arrayBuffer = new mutable.ArrayBuffer[(Any, Array[Option[Int]])]
    --- End diff --
    
    you can change a bit approach from the current one. For the first array of keys, in particular, you don't need to check neither whether the key is there nor the size of the output array, you just need to add them. Then you can add the keys from the other one with the logic here.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22017: [SPARK-23938][SQL] Add map_zip_with function

Posted by ueshin <gi...@git.apache.org>.
Github user ueshin commented on the issue:

    https://github.com/apache/spark/pull/22017
  
    Thanks! merging to master.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22017: [SPARK-23938][SQL] Add map_zip_with function

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22017
  
    Merged build finished. Test FAILed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22017: [SPARK-23938][SQL] Add map_zip_with function

Posted by mgaido91 <gi...@git.apache.org>.
Github user mgaido91 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22017#discussion_r208506639
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala ---
    @@ -442,3 +442,191 @@ case class ArrayAggregate(
     
       override def prettyName: String = "aggregate"
     }
    +
    +/**
    + * Merges two given maps into a single map by applying function to the pair of values with
    + * the same key.
    + */
    +@ExpressionDescription(
    +  usage =
    +    """
    +      _FUNC_(map1, map2, function) - Merges two given maps into a single map by applying
    +      function to the pair of values with the same key. For keys only presented in one map,
    +      NULL will be passed as the value for the missing key. If an input map contains duplicated
    +      keys, only the first entry of the duplicated key is passed into the lambda function.
    +    """,
    +  examples = """
    +    Examples:
    +      > SELECT _FUNC_(map(1, 'a', 2, 'b'), map(1, 'x', 2, 'y'), (k, v1, v2) -> concat(v1, v2));
    +       {1:"ax",2:"by"}
    +  """,
    +  since = "2.4.0")
    +case class MapZipWith(left: Expression, right: Expression, function: Expression)
    +  extends HigherOrderFunction with CodegenFallback {
    +
    +  @transient lazy val functionForEval: Expression = functionsForEval.head
    +
    +  @transient lazy val (keyType, leftValueType, _) =
    +    HigherOrderFunction.mapKeyValueArgumentType(left.dataType)
    +
    +  @transient lazy val (_, rightValueType, _) =
    +    HigherOrderFunction.mapKeyValueArgumentType(right.dataType)
    +
    +  @transient lazy val ordering = TypeUtils.getInterpretedOrdering(keyType)
    +
    +  override def inputs: Seq[Expression] = left :: right :: Nil
    +
    +  override def functions: Seq[Expression] = function :: Nil
    +
    +  override def nullable: Boolean = left.nullable || right.nullable
    +
    +  override def dataType: DataType = MapType(keyType, function.dataType, function.nullable)
    +
    +  override def checkInputDataTypes(): TypeCheckResult = {
    +    (left.dataType, right.dataType) match {
    +      case (MapType(k1, _, _), MapType(k2, _, _)) if k1.sameType(k2) =>
    +        TypeUtils.checkForOrderingExpr(k1, s"function $prettyName")
    +      case _ => TypeCheckResult.TypeCheckFailure(s"The input to function $prettyName should have " +
    +        s"been two ${MapType.simpleString}s with the same key type, but it's " +
    +        s"[${left.dataType.catalogString}, ${right.dataType.catalogString}].")
    +    }
    +  }
    +
    +  override def bind(f: (Expression, Seq[(DataType, Boolean)]) => LambdaFunction): MapZipWith = {
    +    val arguments = Seq((keyType, false), (leftValueType, true), (rightValueType, true))
    +    copy(function = f(function, arguments))
    +  }
    +
    +  override def eval(input: InternalRow): Any = {
    +    val value1 = left.eval(input)
    +    if (value1 == null) {
    +      null
    +    } else {
    +      val value2 = right.eval(input)
    +      if (value2 == null) {
    +        null
    +      } else {
    +        nullSafeEval(input, value1, value2)
    +      }
    +    }
    +  }
    +
    +  @transient lazy val LambdaFunction(_, Seq(
    +    keyVar: NamedLambdaVariable,
    +    value1Var: NamedLambdaVariable,
    +    value2Var: NamedLambdaVariable),
    +    _) = function
    +
    +  private def keyTypeSupportsEquals = keyType match {
    +    case BinaryType => false
    +    case _: AtomicType => true
    +    case _ => false
    +  }
    +
    +  @transient private lazy val getKeysWithValueIndexes:
    +      (ArrayData, ArrayData) => Seq[(Any, Array[Option[Int]])] = {
    +    if (keyTypeSupportsEquals) {
    +      getKeysWithIndexesFast
    +    } else {
    +      getKeysWithIndexesBruteForce
    +    }
    +  }
    +
    +  private def assertSizeOfArrayBuffer(size: Int): Unit = {
    +    if (size > ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH) {
    +      throw new RuntimeException(s"Unsuccessful try to zip maps with $size " +
    +        s"unique keys due to exceeding the array size limit " +
    +        s"${ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH}.")
    +    }
    +  }
    +
    +  private def getKeysWithIndexesFast(keys1: ArrayData, keys2: ArrayData) = {
    +    val arrayBuffer = new mutable.ArrayBuffer[(Any, Array[Option[Int]])]
    +    val hashMap = new mutable.OpenHashMap[Any, Array[Option[Int]]]
    +    val keys = Array(keys1, keys2)
    +    var z = 0
    +    while(z < 2) {
    +      var i = 0
    +      val array = keys(z)
    +      while (i < array.numElements()) {
    +        val key = array.get(i, keyType)
    +        hashMap.get(key) match {
    +          case Some(indexes) =>
    +            if (indexes(z).isEmpty) indexes(z) = Some(i)
    +          case None =>
    +            assertSizeOfArrayBuffer(arrayBuffer.size)
    +            val indexes = Array[Option[Int]](None, None)
    +            indexes(z) = Some(i)
    +            hashMap.put(key, indexes)
    +            arrayBuffer += Tuple2(key, indexes)
    +        }
    +        i += 1
    +      }
    +      z += 1
    +    }
    +    arrayBuffer
    +  }
    +
    +  private def getKeysWithIndexesBruteForce(keys1: ArrayData, keys2: ArrayData) = {
    +    val arrayBuffer = new mutable.ArrayBuffer[(Any, Array[Option[Int]])]
    +    val keys = Array(keys1, keys2)
    +    var z = 0
    +    while(z < 2) {
    +      var i = 0
    +      val array = keys(z)
    +      while (i < array.numElements()) {
    +        val key = array.get(i, keyType)
    +        var found = false
    +        var j = 0
    +        while (!found && j < arrayBuffer.size) {
    +          val (bufferKey, indexes) = arrayBuffer(j)
    +          if (ordering.equiv(bufferKey, key)) {
    +            found = true
    +            if(indexes(z).isEmpty) indexes(z) = Some(i)
    +          }
    +          j += 1
    +        }
    +        if (!found) {
    +          assertSizeOfArrayBuffer(arrayBuffer.size)
    --- End diff --
    
    shall we check this only once at the end in order to avoid the overhead at each iteration?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22017: [SPARK-23938][SQL] Add map_zip_with function

Posted by mn-mikke <gi...@git.apache.org>.
Github user mn-mikke commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22017#discussion_r208941728
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala ---
    @@ -442,3 +442,191 @@ case class ArrayAggregate(
     
       override def prettyName: String = "aggregate"
     }
    +
    +/**
    + * Merges two given maps into a single map by applying function to the pair of values with
    + * the same key.
    + */
    +@ExpressionDescription(
    +  usage =
    +    """
    +      _FUNC_(map1, map2, function) - Merges two given maps into a single map by applying
    +      function to the pair of values with the same key. For keys only presented in one map,
    +      NULL will be passed as the value for the missing key. If an input map contains duplicated
    +      keys, only the first entry of the duplicated key is passed into the lambda function.
    +    """,
    +  examples = """
    +    Examples:
    +      > SELECT _FUNC_(map(1, 'a', 2, 'b'), map(1, 'x', 2, 'y'), (k, v1, v2) -> concat(v1, v2));
    +       {1:"ax",2:"by"}
    +  """,
    +  since = "2.4.0")
    +case class MapZipWith(left: Expression, right: Expression, function: Expression)
    +  extends HigherOrderFunction with CodegenFallback {
    +
    +  @transient lazy val functionForEval: Expression = functionsForEval.head
    +
    +  @transient lazy val (keyType, leftValueType, _) =
    +    HigherOrderFunction.mapKeyValueArgumentType(left.dataType)
    +
    +  @transient lazy val (_, rightValueType, _) =
    +    HigherOrderFunction.mapKeyValueArgumentType(right.dataType)
    +
    +  @transient lazy val ordering = TypeUtils.getInterpretedOrdering(keyType)
    +
    +  override def inputs: Seq[Expression] = left :: right :: Nil
    +
    +  override def functions: Seq[Expression] = function :: Nil
    +
    +  override def nullable: Boolean = left.nullable || right.nullable
    +
    +  override def dataType: DataType = MapType(keyType, function.dataType, function.nullable)
    +
    +  override def checkInputDataTypes(): TypeCheckResult = {
    +    (left.dataType, right.dataType) match {
    +      case (MapType(k1, _, _), MapType(k2, _, _)) if k1.sameType(k2) =>
    +        TypeUtils.checkForOrderingExpr(k1, s"function $prettyName")
    +      case _ => TypeCheckResult.TypeCheckFailure(s"The input to function $prettyName should have " +
    +        s"been two ${MapType.simpleString}s with the same key type, but it's " +
    +        s"[${left.dataType.catalogString}, ${right.dataType.catalogString}].")
    +    }
    +  }
    +
    +  override def bind(f: (Expression, Seq[(DataType, Boolean)]) => LambdaFunction): MapZipWith = {
    +    val arguments = Seq((keyType, false), (leftValueType, true), (rightValueType, true))
    +    copy(function = f(function, arguments))
    +  }
    +
    +  override def eval(input: InternalRow): Any = {
    +    val value1 = left.eval(input)
    +    if (value1 == null) {
    +      null
    +    } else {
    +      val value2 = right.eval(input)
    +      if (value2 == null) {
    +        null
    +      } else {
    +        nullSafeEval(input, value1, value2)
    +      }
    +    }
    +  }
    +
    +  @transient lazy val LambdaFunction(_, Seq(
    +    keyVar: NamedLambdaVariable,
    +    value1Var: NamedLambdaVariable,
    +    value2Var: NamedLambdaVariable),
    +    _) = function
    +
    +  private def keyTypeSupportsEquals = keyType match {
    +    case BinaryType => false
    +    case _: AtomicType => true
    +    case _ => false
    +  }
    +
    +  @transient private lazy val getKeysWithValueIndexes:
    +      (ArrayData, ArrayData) => Seq[(Any, Array[Option[Int]])] = {
    +    if (keyTypeSupportsEquals) {
    +      getKeysWithIndexesFast
    +    } else {
    +      getKeysWithIndexesBruteForce
    +    }
    +  }
    +
    +  private def assertSizeOfArrayBuffer(size: Int): Unit = {
    +    if (size > ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH) {
    +      throw new RuntimeException(s"Unsuccessful try to zip maps with $size " +
    +        s"unique keys due to exceeding the array size limit " +
    +        s"${ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH}.")
    +    }
    +  }
    +
    +  private def getKeysWithIndexesFast(keys1: ArrayData, keys2: ArrayData) = {
    +    val arrayBuffer = new mutable.ArrayBuffer[(Any, Array[Option[Int]])]
    --- End diff --
    
    Like this idea, thanks!


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22017: [SPARK-23938][SQL] Add map_zip_with function

Posted by mn-mikke <gi...@git.apache.org>.
Github user mn-mikke commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22017#discussion_r208647186
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala ---
    @@ -442,3 +442,191 @@ case class ArrayAggregate(
     
       override def prettyName: String = "aggregate"
     }
    +
    +/**
    + * Merges two given maps into a single map by applying function to the pair of values with
    + * the same key.
    + */
    +@ExpressionDescription(
    +  usage =
    +    """
    +      _FUNC_(map1, map2, function) - Merges two given maps into a single map by applying
    +      function to the pair of values with the same key. For keys only presented in one map,
    +      NULL will be passed as the value for the missing key. If an input map contains duplicated
    +      keys, only the first entry of the duplicated key is passed into the lambda function.
    +    """,
    +  examples = """
    +    Examples:
    +      > SELECT _FUNC_(map(1, 'a', 2, 'b'), map(1, 'x', 2, 'y'), (k, v1, v2) -> concat(v1, v2));
    +       {1:"ax",2:"by"}
    +  """,
    +  since = "2.4.0")
    +case class MapZipWith(left: Expression, right: Expression, function: Expression)
    +  extends HigherOrderFunction with CodegenFallback {
    +
    +  @transient lazy val functionForEval: Expression = functionsForEval.head
    +
    +  @transient lazy val (keyType, leftValueType, _) =
    +    HigherOrderFunction.mapKeyValueArgumentType(left.dataType)
    +
    +  @transient lazy val (_, rightValueType, _) =
    +    HigherOrderFunction.mapKeyValueArgumentType(right.dataType)
    +
    +  @transient lazy val ordering = TypeUtils.getInterpretedOrdering(keyType)
    +
    +  override def inputs: Seq[Expression] = left :: right :: Nil
    +
    +  override def functions: Seq[Expression] = function :: Nil
    +
    +  override def nullable: Boolean = left.nullable || right.nullable
    +
    +  override def dataType: DataType = MapType(keyType, function.dataType, function.nullable)
    +
    +  override def checkInputDataTypes(): TypeCheckResult = {
    +    (left.dataType, right.dataType) match {
    +      case (MapType(k1, _, _), MapType(k2, _, _)) if k1.sameType(k2) =>
    +        TypeUtils.checkForOrderingExpr(k1, s"function $prettyName")
    +      case _ => TypeCheckResult.TypeCheckFailure(s"The input to function $prettyName should have " +
    +        s"been two ${MapType.simpleString}s with the same key type, but it's " +
    +        s"[${left.dataType.catalogString}, ${right.dataType.catalogString}].")
    +    }
    +  }
    +
    +  override def bind(f: (Expression, Seq[(DataType, Boolean)]) => LambdaFunction): MapZipWith = {
    +    val arguments = Seq((keyType, false), (leftValueType, true), (rightValueType, true))
    +    copy(function = f(function, arguments))
    +  }
    +
    +  override def eval(input: InternalRow): Any = {
    +    val value1 = left.eval(input)
    +    if (value1 == null) {
    +      null
    +    } else {
    +      val value2 = right.eval(input)
    +      if (value2 == null) {
    +        null
    +      } else {
    +        nullSafeEval(input, value1, value2)
    +      }
    +    }
    +  }
    +
    +  @transient lazy val LambdaFunction(_, Seq(
    +    keyVar: NamedLambdaVariable,
    +    value1Var: NamedLambdaVariable,
    +    value2Var: NamedLambdaVariable),
    +    _) = function
    +
    +  private def keyTypeSupportsEquals = keyType match {
    +    case BinaryType => false
    +    case _: AtomicType => true
    +    case _ => false
    +  }
    +
    +  @transient private lazy val getKeysWithValueIndexes:
    +      (ArrayData, ArrayData) => Seq[(Any, Array[Option[Int]])] = {
    +    if (keyTypeSupportsEquals) {
    +      getKeysWithIndexesFast
    +    } else {
    +      getKeysWithIndexesBruteForce
    +    }
    +  }
    +
    +  private def assertSizeOfArrayBuffer(size: Int): Unit = {
    +    if (size > ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH) {
    +      throw new RuntimeException(s"Unsuccessful try to zip maps with $size " +
    +        s"unique keys due to exceeding the array size limit " +
    +        s"${ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH}.")
    +    }
    +  }
    +
    +  private def getKeysWithIndexesFast(keys1: ArrayData, keys2: ArrayData) = {
    +    val arrayBuffer = new mutable.ArrayBuffer[(Any, Array[Option[Int]])]
    --- End diff --
    
    ```indexes(z).isEmpty``` ensures that we insert always the the first occurrence of the key, which follows behavior of ```GetMapValue```. If we didn't perform such a check the last occurrence would ended up in result.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22017: [SPARK-23938][SQL] Add map_zip_with function

Posted by mn-mikke <gi...@git.apache.org>.
Github user mn-mikke commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22017#discussion_r209515431
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala ---
    @@ -231,6 +231,15 @@ object TypeCoercion {
           })
       }
     
    +  /**
    +   * Similar to [[findTightestCommonType]] but with string promotion.
    +   */
    +  def findWiderTypeForTwoExceptDecimals(t1: DataType, t2: DataType): Option[DataType] = {
    --- End diff --
    
    Thanks for both your PRs! I will submit changes once they get in.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22017: [SPARK-23938][SQL] Add map_zip_with function

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22017
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/94508/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22017: [SPARK-23938][SQL] Add map_zip_with function

Posted by mgaido91 <gi...@git.apache.org>.
Github user mgaido91 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22017#discussion_r208538681
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala ---
    @@ -442,3 +442,191 @@ case class ArrayAggregate(
     
       override def prettyName: String = "aggregate"
     }
    +
    +/**
    + * Merges two given maps into a single map by applying function to the pair of values with
    + * the same key.
    + */
    +@ExpressionDescription(
    +  usage =
    +    """
    +      _FUNC_(map1, map2, function) - Merges two given maps into a single map by applying
    +      function to the pair of values with the same key. For keys only presented in one map,
    +      NULL will be passed as the value for the missing key. If an input map contains duplicated
    +      keys, only the first entry of the duplicated key is passed into the lambda function.
    +    """,
    +  examples = """
    +    Examples:
    +      > SELECT _FUNC_(map(1, 'a', 2, 'b'), map(1, 'x', 2, 'y'), (k, v1, v2) -> concat(v1, v2));
    +       {1:"ax",2:"by"}
    +  """,
    +  since = "2.4.0")
    +case class MapZipWith(left: Expression, right: Expression, function: Expression)
    +  extends HigherOrderFunction with CodegenFallback {
    +
    +  @transient lazy val functionForEval: Expression = functionsForEval.head
    +
    +  @transient lazy val (keyType, leftValueType, _) =
    +    HigherOrderFunction.mapKeyValueArgumentType(left.dataType)
    +
    +  @transient lazy val (_, rightValueType, _) =
    +    HigherOrderFunction.mapKeyValueArgumentType(right.dataType)
    +
    +  @transient lazy val ordering = TypeUtils.getInterpretedOrdering(keyType)
    +
    +  override def inputs: Seq[Expression] = left :: right :: Nil
    +
    +  override def functions: Seq[Expression] = function :: Nil
    +
    +  override def nullable: Boolean = left.nullable || right.nullable
    +
    +  override def dataType: DataType = MapType(keyType, function.dataType, function.nullable)
    +
    +  override def checkInputDataTypes(): TypeCheckResult = {
    +    (left.dataType, right.dataType) match {
    +      case (MapType(k1, _, _), MapType(k2, _, _)) if k1.sameType(k2) =>
    +        TypeUtils.checkForOrderingExpr(k1, s"function $prettyName")
    +      case _ => TypeCheckResult.TypeCheckFailure(s"The input to function $prettyName should have " +
    +        s"been two ${MapType.simpleString}s with the same key type, but it's " +
    +        s"[${left.dataType.catalogString}, ${right.dataType.catalogString}].")
    +    }
    +  }
    +
    +  override def bind(f: (Expression, Seq[(DataType, Boolean)]) => LambdaFunction): MapZipWith = {
    +    val arguments = Seq((keyType, false), (leftValueType, true), (rightValueType, true))
    +    copy(function = f(function, arguments))
    +  }
    +
    +  override def eval(input: InternalRow): Any = {
    +    val value1 = left.eval(input)
    +    if (value1 == null) {
    +      null
    +    } else {
    +      val value2 = right.eval(input)
    +      if (value2 == null) {
    +        null
    +      } else {
    +        nullSafeEval(input, value1, value2)
    +      }
    +    }
    +  }
    +
    +  @transient lazy val LambdaFunction(_, Seq(
    +    keyVar: NamedLambdaVariable,
    +    value1Var: NamedLambdaVariable,
    +    value2Var: NamedLambdaVariable),
    +    _) = function
    +
    +  private def keyTypeSupportsEquals = keyType match {
    +    case BinaryType => false
    +    case _: AtomicType => true
    +    case _ => false
    +  }
    +
    +  @transient private lazy val getKeysWithValueIndexes:
    +      (ArrayData, ArrayData) => Seq[(Any, Array[Option[Int]])] = {
    +    if (keyTypeSupportsEquals) {
    +      getKeysWithIndexesFast
    +    } else {
    +      getKeysWithIndexesBruteForce
    +    }
    +  }
    +
    +  private def assertSizeOfArrayBuffer(size: Int): Unit = {
    +    if (size > ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH) {
    +      throw new RuntimeException(s"Unsuccessful try to zip maps with $size " +
    +        s"unique keys due to exceeding the array size limit " +
    +        s"${ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH}.")
    +    }
    +  }
    +
    +  private def getKeysWithIndexesFast(keys1: ArrayData, keys2: ArrayData) = {
    +    val arrayBuffer = new mutable.ArrayBuffer[(Any, Array[Option[Int]])]
    +    val hashMap = new mutable.OpenHashMap[Any, Array[Option[Int]]]
    +    val keys = Array(keys1, keys2)
    +    var z = 0
    +    while(z < 2) {
    +      var i = 0
    +      val array = keys(z)
    +      while (i < array.numElements()) {
    +        val key = array.get(i, keyType)
    +        hashMap.get(key) match {
    +          case Some(indexes) =>
    +            if (indexes(z).isEmpty) indexes(z) = Some(i)
    +          case None =>
    +            assertSizeOfArrayBuffer(arrayBuffer.size)
    +            val indexes = Array[Option[Int]](None, None)
    +            indexes(z) = Some(i)
    +            hashMap.put(key, indexes)
    +            arrayBuffer += Tuple2(key, indexes)
    +        }
    +        i += 1
    +      }
    +      z += 1
    +    }
    +    arrayBuffer
    +  }
    +
    +  private def getKeysWithIndexesBruteForce(keys1: ArrayData, keys2: ArrayData) = {
    +    val arrayBuffer = new mutable.ArrayBuffer[(Any, Array[Option[Int]])]
    +    val keys = Array(keys1, keys2)
    +    var z = 0
    +    while(z < 2) {
    +      var i = 0
    +      val array = keys(z)
    +      while (i < array.numElements()) {
    +        val key = array.get(i, keyType)
    +        var found = false
    +        var j = 0
    +        while (!found && j < arrayBuffer.size) {
    +          val (bufferKey, indexes) = arrayBuffer(j)
    +          if (ordering.equiv(bufferKey, key)) {
    +            found = true
    +            if(indexes(z).isEmpty) indexes(z) = Some(i)
    +          }
    +          j += 1
    +        }
    +        if (!found) {
    +          assertSizeOfArrayBuffer(arrayBuffer.size)
    --- End diff --
    
    I see, because you are using an ArrayBuffer....makes sense, thanks


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22017: [SPARK-23938][SQL] Add map_zip_with function

Posted by ueshin <gi...@git.apache.org>.
Github user ueshin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22017#discussion_r208803217
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala ---
    @@ -442,3 +442,184 @@ case class ArrayAggregate(
     
       override def prettyName: String = "aggregate"
     }
    +
    +/**
    + * Merges two given maps into a single map by applying function to the pair of values with
    + * the same key.
    + */
    +@ExpressionDescription(
    +  usage =
    +    """
    +      _FUNC_(map1, map2, function) - Merges two given maps into a single map by applying
    +      function to the pair of values with the same key. For keys only presented in one map,
    +      NULL will be passed as the value for the missing key. If an input map contains duplicated
    +      keys, only the first entry of the duplicated key is passed into the lambda function.
    +    """,
    +  examples = """
    +    Examples:
    +      > SELECT _FUNC_(map(1, 'a', 2, 'b'), map(1, 'x', 2, 'y'), (k, v1, v2) -> concat(v1, v2));
    +       {1:"ax",2:"by"}
    +  """,
    +  since = "2.4.0")
    +case class MapZipWith(left: Expression, right: Expression, function: Expression)
    +  extends HigherOrderFunction with CodegenFallback {
    +
    +  @transient lazy val functionForEval: Expression = functionsForEval.head
    +
    +  @transient lazy val (keyType, leftValueType, _) =
    +    HigherOrderFunction.mapKeyValueArgumentType(left.dataType)
    +
    +  @transient lazy val (_, rightValueType, _) =
    +    HigherOrderFunction.mapKeyValueArgumentType(right.dataType)
    +
    +  @transient lazy val ordering = TypeUtils.getInterpretedOrdering(keyType)
    +
    +  override def inputs: Seq[Expression] = left :: right :: Nil
    +
    +  override def functions: Seq[Expression] = function :: Nil
    +
    +  override def nullable: Boolean = left.nullable || right.nullable
    +
    +  override def dataType: DataType = MapType(keyType, function.dataType, function.nullable)
    +
    +  override def checkInputDataTypes(): TypeCheckResult = {
    +    (left.dataType, right.dataType) match {
    +      case (MapType(k1, _, _), MapType(k2, _, _)) if k1.sameType(k2) =>
    +        TypeUtils.checkForOrderingExpr(k1, s"function $prettyName")
    +      case _ => TypeCheckResult.TypeCheckFailure(s"The input to function $prettyName should have " +
    +        s"been two ${MapType.simpleString}s with the same key type, but it's " +
    +        s"[${left.dataType.catalogString}, ${right.dataType.catalogString}].")
    +    }
    +  }
    +
    +  override def bind(f: (Expression, Seq[(DataType, Boolean)]) => LambdaFunction): MapZipWith = {
    +    val arguments = Seq((keyType, false), (leftValueType, true), (rightValueType, true))
    +    copy(function = f(function, arguments))
    +  }
    +
    +  override def eval(input: InternalRow): Any = {
    +    val value1 = left.eval(input)
    +    if (value1 == null) {
    +      null
    +    } else {
    +      val value2 = right.eval(input)
    +      if (value2 == null) {
    +        null
    +      } else {
    +        nullSafeEval(input, value1, value2)
    +      }
    +    }
    +  }
    +
    +  @transient lazy val LambdaFunction(_, Seq(
    +    keyVar: NamedLambdaVariable,
    +    value1Var: NamedLambdaVariable,
    +    value2Var: NamedLambdaVariable),
    +    _) = function
    +
    +  private def keyTypeSupportsEquals = keyType match {
    +    case BinaryType => false
    +    case _: AtomicType => true
    +    case _ => false
    +  }
    +
    +  @transient private lazy val getKeysWithValueIndexes:
    +      (ArrayData, ArrayData) => mutable.Iterable[(Any, (Option[Int], Option[Int]))] = {
    +    if (keyTypeSupportsEquals) {
    +      getKeysWithIndexesFast
    +    } else {
    +      getKeysWithIndexesBruteForce
    +    }
    +  }
    +
    +  private def assertSizeOfArrayBuffer(size: Int): Unit = {
    +    if (size > ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH) {
    +      throw new RuntimeException(s"Unsuccessful try to zip maps with $size " +
    +        s"unique keys due to exceeding the array size limit " +
    +        s"${ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH}.")
    +    }
    +  }
    +
    +  private def getKeysWithIndexesFast(keys1: ArrayData, keys2: ArrayData) = {
    +    val hashMap = new mutable.OpenHashMap[Any, (Option[Int], Option[Int])]
    +    var i = 0
    +    while (i < keys1.numElements) {
    +      val key = keys1.get(i, keyType)
    +      if(!hashMap.contains(key)) hashMap.put(key, (Some(i), None))
    --- End diff --
    
    nit: let's use brackets:
    
    ```scala
    if (!hashMap.contains(key)) {
      hashMap.put(key, (Some(i), None))
    }
    ```


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22017: [SPARK-23938][SQL] Add map_zip_with function

Posted by mn-mikke <gi...@git.apache.org>.
Github user mn-mikke commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22017#discussion_r208868838
  
    --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala ---
    @@ -225,7 +264,9 @@ trait ExpressionEvalHelper extends GeneratorDrivenPropertyChecks with PlanTestBa
               val lit = InternalRow(expected, expected)
               val expectedRow =
                 UnsafeProjection.create(Array(expression.dataType, expression.dataType)).apply(lit)
    -          if (unsafeRow != expectedRow) {
    +          val field = StructField("field", expression.dataType)
    +          val dataType = StructType(field :: field :: Nil)
    +          if (!checkResult(unsafeRow, expectedRow, dataType)) {
    --- End diff --
    
    ```UnsafeRow```s are compared based on equality of backing arrays. This approach doesn't work well when ignoring order in unsafe representation of maps.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22017: [SPARK-23938][SQL] Add map_zip_with function

Posted by ueshin <gi...@git.apache.org>.
Github user ueshin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22017#discussion_r208803198
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala ---
    @@ -442,3 +442,184 @@ case class ArrayAggregate(
     
       override def prettyName: String = "aggregate"
     }
    +
    +/**
    + * Merges two given maps into a single map by applying function to the pair of values with
    + * the same key.
    + */
    +@ExpressionDescription(
    +  usage =
    +    """
    +      _FUNC_(map1, map2, function) - Merges two given maps into a single map by applying
    +      function to the pair of values with the same key. For keys only presented in one map,
    +      NULL will be passed as the value for the missing key. If an input map contains duplicated
    +      keys, only the first entry of the duplicated key is passed into the lambda function.
    +    """,
    +  examples = """
    +    Examples:
    +      > SELECT _FUNC_(map(1, 'a', 2, 'b'), map(1, 'x', 2, 'y'), (k, v1, v2) -> concat(v1, v2));
    +       {1:"ax",2:"by"}
    +  """,
    +  since = "2.4.0")
    +case class MapZipWith(left: Expression, right: Expression, function: Expression)
    +  extends HigherOrderFunction with CodegenFallback {
    +
    +  @transient lazy val functionForEval: Expression = functionsForEval.head
    +
    +  @transient lazy val (keyType, leftValueType, _) =
    --- End diff --
    
    `keyType` should be `TypeCoercion.findTightestCommonType(leftKeyType, rightKeyType)`?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22017: [SPARK-23938][SQL] Add map_zip_with function

Posted by xuanyuanking <gi...@git.apache.org>.
Github user xuanyuanking commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22017#discussion_r208257687
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypeExtractors.scala ---
    @@ -267,22 +267,23 @@ case class GetArrayItem(child: Expression, ordinal: Expression)
       }
     }
     
    -/**
    - * Common base class for [[GetMapValue]] and [[ElementAt]].
    - */
    -
    -abstract class GetMapValueUtil extends BinaryExpression with ImplicitCastInputTypes {
    +object GetMapValueUtil
    +{
    --- End diff --
    
    nit: brace should in previous line. 


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22017: [SPARK-23938][SQL] Add map_zip_with function

Posted by ueshin <gi...@git.apache.org>.
Github user ueshin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22017#discussion_r209518997
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala ---
    @@ -442,3 +442,186 @@ case class ArrayAggregate(
     
       override def prettyName: String = "aggregate"
     }
    +
    +/**
    + * Merges two given maps into a single map by applying function to the pair of values with
    + * the same key.
    + */
    +@ExpressionDescription(
    +  usage =
    +    """
    +      _FUNC_(map1, map2, function) - Merges two given maps into a single map by applying
    +      function to the pair of values with the same key. For keys only presented in one map,
    +      NULL will be passed as the value for the missing key. If an input map contains duplicated
    +      keys, only the first entry of the duplicated key is passed into the lambda function.
    +    """,
    +  examples = """
    +    Examples:
    +      > SELECT _FUNC_(map(1, 'a', 2, 'b'), map(1, 'x', 2, 'y'), (k, v1, v2) -> concat(v1, v2));
    +       {1:"ax",2:"by"}
    +  """,
    +  since = "2.4.0")
    +case class MapZipWith(left: Expression, right: Expression, function: Expression)
    +  extends HigherOrderFunction with CodegenFallback {
    +
    +  @transient lazy val functionForEval: Expression = functionsForEval.head
    +
    +  @transient lazy val (leftKeyType, leftValueType, leftValueContainsNull) =
    +    HigherOrderFunction.mapKeyValueArgumentType(left.dataType)
    +
    +  @transient lazy val (rightKeyType, rightValueType, rightValueContainsNull) =
    +    HigherOrderFunction.mapKeyValueArgumentType(right.dataType)
    +
    +  @transient lazy val keyType =
    +    TypeCoercion.findTightestCommonType(leftKeyType, rightKeyType).getOrElse(NullType)
    --- End diff --
    
    After #22075, `checkArgumentDataType()` introduced in it will be executed before `bind`, so the key types should be "sameType" and we will be able to use `findCommonTypeDifferentOnlyInNullFlags`. We still need `checkInputDataTypes` to be executed after `bind` to check the whole data types are valid.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22017: [SPARK-23938][SQL] Add map_zip_with function

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22017
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22017: [SPARK-23938][SQL] Add map_zip_with function

Posted by mgaido91 <gi...@git.apache.org>.
Github user mgaido91 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22017#discussion_r208557315
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala ---
    @@ -442,3 +442,191 @@ case class ArrayAggregate(
     
       override def prettyName: String = "aggregate"
     }
    +
    +/**
    + * Merges two given maps into a single map by applying function to the pair of values with
    + * the same key.
    + */
    +@ExpressionDescription(
    +  usage =
    +    """
    +      _FUNC_(map1, map2, function) - Merges two given maps into a single map by applying
    +      function to the pair of values with the same key. For keys only presented in one map,
    +      NULL will be passed as the value for the missing key. If an input map contains duplicated
    +      keys, only the first entry of the duplicated key is passed into the lambda function.
    +    """,
    +  examples = """
    +    Examples:
    +      > SELECT _FUNC_(map(1, 'a', 2, 'b'), map(1, 'x', 2, 'y'), (k, v1, v2) -> concat(v1, v2));
    +       {1:"ax",2:"by"}
    +  """,
    +  since = "2.4.0")
    +case class MapZipWith(left: Expression, right: Expression, function: Expression)
    +  extends HigherOrderFunction with CodegenFallback {
    +
    +  @transient lazy val functionForEval: Expression = functionsForEval.head
    +
    +  @transient lazy val (keyType, leftValueType, _) =
    +    HigherOrderFunction.mapKeyValueArgumentType(left.dataType)
    +
    +  @transient lazy val (_, rightValueType, _) =
    +    HigherOrderFunction.mapKeyValueArgumentType(right.dataType)
    +
    +  @transient lazy val ordering = TypeUtils.getInterpretedOrdering(keyType)
    +
    +  override def inputs: Seq[Expression] = left :: right :: Nil
    +
    +  override def functions: Seq[Expression] = function :: Nil
    +
    +  override def nullable: Boolean = left.nullable || right.nullable
    +
    +  override def dataType: DataType = MapType(keyType, function.dataType, function.nullable)
    +
    +  override def checkInputDataTypes(): TypeCheckResult = {
    +    (left.dataType, right.dataType) match {
    +      case (MapType(k1, _, _), MapType(k2, _, _)) if k1.sameType(k2) =>
    +        TypeUtils.checkForOrderingExpr(k1, s"function $prettyName")
    +      case _ => TypeCheckResult.TypeCheckFailure(s"The input to function $prettyName should have " +
    +        s"been two ${MapType.simpleString}s with the same key type, but it's " +
    +        s"[${left.dataType.catalogString}, ${right.dataType.catalogString}].")
    +    }
    +  }
    +
    +  override def bind(f: (Expression, Seq[(DataType, Boolean)]) => LambdaFunction): MapZipWith = {
    +    val arguments = Seq((keyType, false), (leftValueType, true), (rightValueType, true))
    +    copy(function = f(function, arguments))
    +  }
    +
    +  override def eval(input: InternalRow): Any = {
    +    val value1 = left.eval(input)
    +    if (value1 == null) {
    +      null
    +    } else {
    +      val value2 = right.eval(input)
    +      if (value2 == null) {
    +        null
    +      } else {
    +        nullSafeEval(input, value1, value2)
    +      }
    +    }
    +  }
    +
    +  @transient lazy val LambdaFunction(_, Seq(
    +    keyVar: NamedLambdaVariable,
    +    value1Var: NamedLambdaVariable,
    +    value2Var: NamedLambdaVariable),
    +    _) = function
    +
    +  private def keyTypeSupportsEquals = keyType match {
    +    case BinaryType => false
    +    case _: AtomicType => true
    +    case _ => false
    +  }
    +
    +  @transient private lazy val getKeysWithValueIndexes:
    +      (ArrayData, ArrayData) => Seq[(Any, Array[Option[Int]])] = {
    +    if (keyTypeSupportsEquals) {
    +      getKeysWithIndexesFast
    +    } else {
    +      getKeysWithIndexesBruteForce
    +    }
    +  }
    +
    +  private def assertSizeOfArrayBuffer(size: Int): Unit = {
    +    if (size > ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH) {
    +      throw new RuntimeException(s"Unsuccessful try to zip maps with $size " +
    +        s"unique keys due to exceeding the array size limit " +
    +        s"${ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH}.")
    +    }
    +  }
    +
    +  private def getKeysWithIndexesFast(keys1: ArrayData, keys2: ArrayData) = {
    +    val arrayBuffer = new mutable.ArrayBuffer[(Any, Array[Option[Int]])]
    --- End diff --
    
    since the HashMap is mutable, you can just: `hashMap += key -> newTuple`


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22017: [SPARK-23938][SQL] Add map_zip_with function

Posted by mn-mikke <gi...@git.apache.org>.
Github user mn-mikke commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22017#discussion_r209311502
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala ---
    @@ -231,6 +231,15 @@ object TypeCoercion {
           })
       }
     
    +  /**
    +   * Similar to [[findTightestCommonType]] but with string promotion.
    +   */
    +  def findWiderTypeForTwoExceptDecimals(t1: DataType, t2: DataType): Option[DataType] = {
    --- End diff --
    
    If we have maps with decimals of different precision as keys. ```Cast``` will fail in analysis phase since it can't cast a key to nullable (potential lost of precision). IMHO, the type mismatch exception from this function will be more accurate. WDYT?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22017: [SPARK-23938][SQL] Add map_zip_with function

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22017
  
    **[Test build #94362 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94362/testReport)** for PR 22017 at commit [`ec583eb`](https://github.com/apache/spark/commit/ec583eb29ba6fdb79d0b85cbecb3f709e6648b25).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `class ArrayDataUnion(elementType: DataType) extends ((ArrayData, ArrayData) => ArrayData) `
      * `case class ArrayUnion(left: Expression, right: Expression) extends ArraySetLike`


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22017: [SPARK-23938][SQL] Add map_zip_with function

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22017
  
    **[Test build #94712 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94712/testReport)** for PR 22017 at commit [`2b7e9e5`](https://github.com/apache/spark/commit/2b7e9e59a69d991eba24ca86b8df5fe54b6e077f).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22017: [SPARK-23938][SQL] Add map_zip_with function

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22017
  
    **[Test build #94712 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94712/testReport)** for PR 22017 at commit [`2b7e9e5`](https://github.com/apache/spark/commit/2b7e9e59a69d991eba24ca86b8df5fe54b6e077f).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22017: [SPARK-23938][SQL] Add map_zip_with function

Posted by xuanyuanking <gi...@git.apache.org>.
Github user xuanyuanking commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22017#discussion_r208260664
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala ---
    @@ -365,3 +364,101 @@ case class ArrayAggregate(
     
       override def prettyName: String = "aggregate"
     }
    +
    +/**
    + * Merges two given maps into a single map by applying function to the pair of values with
    + * the same key.
    + */
    +@ExpressionDescription(
    +  usage =
    +    """
    +      _FUNC_(map1, map2, function) - Merges two given maps into a single map by applying
    +      function to the pair of values with the same key. For keys only presented in one map,
    +      NULL will be passed as the value for the missing key. If an input map contains duplicated
    +      keys, only the first entry of the duplicated key is passed into the lambda function.
    +    """,
    +  examples = """
    +    Examples:
    +      > SELECT _FUNC_(map(1, 'a', 2, 'b'), map(1, 'x', 2, 'y'), (k, v1, v2) -> concat(v1, v2));
    +       {1:"ax",2:"by"}
    +  """,
    +  since = "2.4.0")
    +case class MapZipWith(left: Expression, right: Expression, function: Expression)
    +  extends HigherOrderFunction with CodegenFallback {
    +
    +  @transient lazy val functionForEval: Expression = functionsForEval.head
    +
    +  @transient lazy val MapType(keyType, leftValueType, _) = getMapType(left)
    +
    +  @transient lazy val MapType(_, rightValueType, _) = getMapType(right)
    +
    +  @transient lazy val arrayDataUnion = new ArrayDataUnion(keyType)
    +
    +  @transient lazy val ordering = TypeUtils.getInterpretedOrdering(keyType)
    +
    +  override def inputs: Seq[Expression] = left :: right :: Nil
    +
    +  override def functions: Seq[Expression] = function :: Nil
    +
    +  override def nullable: Boolean = left.nullable || right.nullable
    --- End diff --
    
    `left.nullable && right.nullable`? Because if one side is empty map, NULL will be passed as the value for each key in other side.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22017: [SPARK-23938][SQL] Add map_zip_with function

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22017
  
    **[Test build #94685 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94685/testReport)** for PR 22017 at commit [`595161f`](https://github.com/apache/spark/commit/595161fefbf55711b76530a9e53aff73491febd6).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22017: [SPARK-23938][SQL] Add map_zip_with function

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22017
  
    **[Test build #94394 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94394/testReport)** for PR 22017 at commit [`12ad8b2`](https://github.com/apache/spark/commit/12ad8b2248b7acb4a04289ca8da439ecb63206a9).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22017: [SPARK-23938][SQL] Add map_zip_with function

Posted by mn-mikke <gi...@git.apache.org>.
Github user mn-mikke commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22017#discussion_r209533017
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala ---
    @@ -442,3 +442,186 @@ case class ArrayAggregate(
     
       override def prettyName: String = "aggregate"
     }
    +
    +/**
    + * Merges two given maps into a single map by applying function to the pair of values with
    + * the same key.
    + */
    +@ExpressionDescription(
    +  usage =
    +    """
    +      _FUNC_(map1, map2, function) - Merges two given maps into a single map by applying
    +      function to the pair of values with the same key. For keys only presented in one map,
    +      NULL will be passed as the value for the missing key. If an input map contains duplicated
    +      keys, only the first entry of the duplicated key is passed into the lambda function.
    +    """,
    +  examples = """
    +    Examples:
    +      > SELECT _FUNC_(map(1, 'a', 2, 'b'), map(1, 'x', 2, 'y'), (k, v1, v2) -> concat(v1, v2));
    +       {1:"ax",2:"by"}
    +  """,
    +  since = "2.4.0")
    +case class MapZipWith(left: Expression, right: Expression, function: Expression)
    +  extends HigherOrderFunction with CodegenFallback {
    +
    +  @transient lazy val functionForEval: Expression = functionsForEval.head
    +
    +  @transient lazy val (leftKeyType, leftValueType, leftValueContainsNull) =
    +    HigherOrderFunction.mapKeyValueArgumentType(left.dataType)
    +
    +  @transient lazy val (rightKeyType, rightValueType, rightValueContainsNull) =
    +    HigherOrderFunction.mapKeyValueArgumentType(right.dataType)
    +
    +  @transient lazy val keyType =
    +    TypeCoercion.findTightestCommonType(leftKeyType, rightKeyType).getOrElse(NullType)
    --- End diff --
    
    Oh, I see. We also need to check the output data type of lambda functions for the expressions like ```ArrayFilter```.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22017: [SPARK-23938][SQL] Add map_zip_with function

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22017
  
    Can one of the admins verify this patch?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22017: [SPARK-23938][SQL] Add map_zip_with function

Posted by mn-mikke <gi...@git.apache.org>.
Github user mn-mikke commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22017#discussion_r208519941
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala ---
    @@ -442,3 +442,191 @@ case class ArrayAggregate(
     
       override def prettyName: String = "aggregate"
     }
    +
    +/**
    + * Merges two given maps into a single map by applying function to the pair of values with
    + * the same key.
    + */
    +@ExpressionDescription(
    +  usage =
    +    """
    +      _FUNC_(map1, map2, function) - Merges two given maps into a single map by applying
    +      function to the pair of values with the same key. For keys only presented in one map,
    +      NULL will be passed as the value for the missing key. If an input map contains duplicated
    +      keys, only the first entry of the duplicated key is passed into the lambda function.
    +    """,
    +  examples = """
    +    Examples:
    +      > SELECT _FUNC_(map(1, 'a', 2, 'b'), map(1, 'x', 2, 'y'), (k, v1, v2) -> concat(v1, v2));
    +       {1:"ax",2:"by"}
    +  """,
    +  since = "2.4.0")
    +case class MapZipWith(left: Expression, right: Expression, function: Expression)
    +  extends HigherOrderFunction with CodegenFallback {
    +
    +  @transient lazy val functionForEval: Expression = functionsForEval.head
    +
    +  @transient lazy val (keyType, leftValueType, _) =
    +    HigherOrderFunction.mapKeyValueArgumentType(left.dataType)
    +
    +  @transient lazy val (_, rightValueType, _) =
    +    HigherOrderFunction.mapKeyValueArgumentType(right.dataType)
    +
    +  @transient lazy val ordering = TypeUtils.getInterpretedOrdering(keyType)
    +
    +  override def inputs: Seq[Expression] = left :: right :: Nil
    +
    +  override def functions: Seq[Expression] = function :: Nil
    +
    +  override def nullable: Boolean = left.nullable || right.nullable
    +
    +  override def dataType: DataType = MapType(keyType, function.dataType, function.nullable)
    +
    +  override def checkInputDataTypes(): TypeCheckResult = {
    +    (left.dataType, right.dataType) match {
    +      case (MapType(k1, _, _), MapType(k2, _, _)) if k1.sameType(k2) =>
    +        TypeUtils.checkForOrderingExpr(k1, s"function $prettyName")
    +      case _ => TypeCheckResult.TypeCheckFailure(s"The input to function $prettyName should have " +
    +        s"been two ${MapType.simpleString}s with the same key type, but it's " +
    +        s"[${left.dataType.catalogString}, ${right.dataType.catalogString}].")
    +    }
    +  }
    +
    +  override def bind(f: (Expression, Seq[(DataType, Boolean)]) => LambdaFunction): MapZipWith = {
    +    val arguments = Seq((keyType, false), (leftValueType, true), (rightValueType, true))
    +    copy(function = f(function, arguments))
    +  }
    +
    +  override def eval(input: InternalRow): Any = {
    +    val value1 = left.eval(input)
    +    if (value1 == null) {
    +      null
    +    } else {
    +      val value2 = right.eval(input)
    +      if (value2 == null) {
    +        null
    +      } else {
    +        nullSafeEval(input, value1, value2)
    +      }
    +    }
    +  }
    +
    +  @transient lazy val LambdaFunction(_, Seq(
    +    keyVar: NamedLambdaVariable,
    +    value1Var: NamedLambdaVariable,
    +    value2Var: NamedLambdaVariable),
    +    _) = function
    +
    +  private def keyTypeSupportsEquals = keyType match {
    +    case BinaryType => false
    +    case _: AtomicType => true
    +    case _ => false
    +  }
    +
    +  @transient private lazy val getKeysWithValueIndexes:
    +      (ArrayData, ArrayData) => Seq[(Any, Array[Option[Int]])] = {
    +    if (keyTypeSupportsEquals) {
    +      getKeysWithIndexesFast
    +    } else {
    +      getKeysWithIndexesBruteForce
    +    }
    +  }
    +
    +  private def assertSizeOfArrayBuffer(size: Int): Unit = {
    +    if (size > ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH) {
    +      throw new RuntimeException(s"Unsuccessful try to zip maps with $size " +
    +        s"unique keys due to exceeding the array size limit " +
    +        s"${ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH}.")
    +    }
    +  }
    +
    +  private def getKeysWithIndexesFast(keys1: ArrayData, keys2: ArrayData) = {
    +    val arrayBuffer = new mutable.ArrayBuffer[(Any, Array[Option[Int]])]
    --- End diff --
    
    If we changed it to ```(Option[Int], Option[Int])```, wouldn't we need two similar ```i``` loops instead of one?
    
    My motivation for using also the ```ArrayBuffer``` is preserve the order of keys. A random order would break map comparison in tests. Maybe you will come with idea how to compare maps in tests better :-)


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22017: [SPARK-23938][SQL] Add map_zip_with function

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22017
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22017: [SPARK-23938][SQL] Add map_zip_with function

Posted by ueshin <gi...@git.apache.org>.
Github user ueshin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22017#discussion_r209485487
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala ---
    @@ -231,6 +231,15 @@ object TypeCoercion {
           })
       }
     
    +  /**
    +   * Similar to [[findTightestCommonType]] but with string promotion.
    +   */
    +  def findWiderTypeForTwoExceptDecimals(t1: DataType, t2: DataType): Option[DataType] = {
    --- End diff --
    
    I submitted a pr #22086.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22017: [SPARK-23938][SQL] Add map_zip_with function

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22017
  
    **[Test build #94362 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94362/testReport)** for PR 22017 at commit [`ec583eb`](https://github.com/apache/spark/commit/ec583eb29ba6fdb79d0b85cbecb3f709e6648b25).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22017: [SPARK-23938][SQL] Add map_zip_with function

Posted by mn-mikke <gi...@git.apache.org>.
Github user mn-mikke commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22017#discussion_r208872928
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala ---
    @@ -442,3 +442,191 @@ case class ArrayAggregate(
     
       override def prettyName: String = "aggregate"
     }
    +
    +/**
    + * Merges two given maps into a single map by applying function to the pair of values with
    + * the same key.
    + */
    +@ExpressionDescription(
    +  usage =
    +    """
    +      _FUNC_(map1, map2, function) - Merges two given maps into a single map by applying
    +      function to the pair of values with the same key. For keys only presented in one map,
    +      NULL will be passed as the value for the missing key. If an input map contains duplicated
    +      keys, only the first entry of the duplicated key is passed into the lambda function.
    +    """,
    +  examples = """
    +    Examples:
    +      > SELECT _FUNC_(map(1, 'a', 2, 'b'), map(1, 'x', 2, 'y'), (k, v1, v2) -> concat(v1, v2));
    +       {1:"ax",2:"by"}
    +  """,
    +  since = "2.4.0")
    +case class MapZipWith(left: Expression, right: Expression, function: Expression)
    +  extends HigherOrderFunction with CodegenFallback {
    +
    +  @transient lazy val functionForEval: Expression = functionsForEval.head
    +
    +  @transient lazy val (keyType, leftValueType, _) =
    +    HigherOrderFunction.mapKeyValueArgumentType(left.dataType)
    +
    +  @transient lazy val (_, rightValueType, _) =
    +    HigherOrderFunction.mapKeyValueArgumentType(right.dataType)
    +
    +  @transient lazy val ordering = TypeUtils.getInterpretedOrdering(keyType)
    +
    +  override def inputs: Seq[Expression] = left :: right :: Nil
    +
    +  override def functions: Seq[Expression] = function :: Nil
    +
    +  override def nullable: Boolean = left.nullable || right.nullable
    +
    +  override def dataType: DataType = MapType(keyType, function.dataType, function.nullable)
    +
    +  override def checkInputDataTypes(): TypeCheckResult = {
    +    (left.dataType, right.dataType) match {
    +      case (MapType(k1, _, _), MapType(k2, _, _)) if k1.sameType(k2) =>
    +        TypeUtils.checkForOrderingExpr(k1, s"function $prettyName")
    +      case _ => TypeCheckResult.TypeCheckFailure(s"The input to function $prettyName should have " +
    +        s"been two ${MapType.simpleString}s with the same key type, but it's " +
    +        s"[${left.dataType.catalogString}, ${right.dataType.catalogString}].")
    +    }
    +  }
    +
    +  override def bind(f: (Expression, Seq[(DataType, Boolean)]) => LambdaFunction): MapZipWith = {
    +    val arguments = Seq((keyType, false), (leftValueType, true), (rightValueType, true))
    +    copy(function = f(function, arguments))
    +  }
    +
    +  override def eval(input: InternalRow): Any = {
    +    val value1 = left.eval(input)
    +    if (value1 == null) {
    +      null
    +    } else {
    +      val value2 = right.eval(input)
    +      if (value2 == null) {
    +        null
    +      } else {
    +        nullSafeEval(input, value1, value2)
    +      }
    +    }
    +  }
    +
    +  @transient lazy val LambdaFunction(_, Seq(
    +    keyVar: NamedLambdaVariable,
    +    value1Var: NamedLambdaVariable,
    +    value2Var: NamedLambdaVariable),
    +    _) = function
    +
    +  private def keyTypeSupportsEquals = keyType match {
    +    case BinaryType => false
    +    case _: AtomicType => true
    +    case _ => false
    +  }
    +
    +  @transient private lazy val getKeysWithValueIndexes:
    +      (ArrayData, ArrayData) => Seq[(Any, Array[Option[Int]])] = {
    +    if (keyTypeSupportsEquals) {
    +      getKeysWithIndexesFast
    +    } else {
    +      getKeysWithIndexesBruteForce
    +    }
    +  }
    +
    +  private def assertSizeOfArrayBuffer(size: Int): Unit = {
    +    if (size > ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH) {
    +      throw new RuntimeException(s"Unsuccessful try to zip maps with $size " +
    +        s"unique keys due to exceeding the array size limit " +
    +        s"${ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH}.")
    +    }
    +  }
    +
    +  private def getKeysWithIndexesFast(keys1: ArrayData, keys2: ArrayData) = {
    +    val arrayBuffer = new mutable.ArrayBuffer[(Any, Array[Option[Int]])]
    --- End diff --
    
    @mgaido91 Are you comfortable with reverting back to the previous version?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22017: [SPARK-23938][SQL] Add map_zip_with function

Posted by ueshin <gi...@git.apache.org>.
Github user ueshin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22017#discussion_r209816384
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala ---
    @@ -496,3 +496,195 @@ case class ArrayAggregate(
     
       override def prettyName: String = "aggregate"
     }
    +
    +/**
    + * Merges two given maps into a single map by applying function to the pair of values with
    + * the same key.
    + */
    +@ExpressionDescription(
    +  usage =
    +    """
    +      _FUNC_(map1, map2, function) - Merges two given maps into a single map by applying
    +      function to the pair of values with the same key. For keys only presented in one map,
    +      NULL will be passed as the value for the missing key. If an input map contains duplicated
    +      keys, only the first entry of the duplicated key is passed into the lambda function.
    +    """,
    +  examples = """
    +    Examples:
    +      > SELECT _FUNC_(map(1, 'a', 2, 'b'), map(1, 'x', 2, 'y'), (k, v1, v2) -> concat(v1, v2));
    +       {1:"ax",2:"by"}
    +  """,
    +  since = "2.4.0")
    +case class MapZipWith(left: Expression, right: Expression, function: Expression)
    +  extends HigherOrderFunction with CodegenFallback {
    +
    +  @transient lazy val functionForEval: Expression = functionsForEval.head
    --- End diff --
    
    nit: shall we use `def` here to follow the comment https://github.com/apache/spark/pull/21954#discussion_r208266333?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22017: [SPARK-23938][SQL] Add map_zip_with function

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22017
  
    **[Test build #94325 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94325/testReport)** for PR 22017 at commit [`ef56011`](https://github.com/apache/spark/commit/ef56011f03d8bae4634e5d3108e4d6502482383c).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22017: [SPARK-23938][SQL] Add map_zip_with function

Posted by mgaido91 <gi...@git.apache.org>.
Github user mgaido91 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22017#discussion_r209862506
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala ---
    @@ -496,3 +496,194 @@ case class ArrayAggregate(
     
       override def prettyName: String = "aggregate"
     }
    +
    +/**
    + * Merges two given maps into a single map by applying function to the pair of values with
    + * the same key.
    + */
    +@ExpressionDescription(
    +  usage =
    +    """
    +      _FUNC_(map1, map2, function) - Merges two given maps into a single map by applying
    +      function to the pair of values with the same key. For keys only presented in one map,
    +      NULL will be passed as the value for the missing key. If an input map contains duplicated
    +      keys, only the first entry of the duplicated key is passed into the lambda function.
    +    """,
    +  examples = """
    +    Examples:
    +      > SELECT _FUNC_(map(1, 'a', 2, 'b'), map(1, 'x', 2, 'y'), (k, v1, v2) -> concat(v1, v2));
    +       {1:"ax",2:"by"}
    +  """,
    +  since = "2.4.0")
    +case class MapZipWith(left: Expression, right: Expression, function: Expression)
    +  extends HigherOrderFunction with CodegenFallback {
    +
    +  def functionForEval: Expression = functionsForEval.head
    +
    +  @transient lazy val MapType(leftKeyType, leftValueType, leftValueContainsNull) = left.dataType
    +
    +  @transient lazy val MapType(rightKeyType, rightValueType, rightValueContainsNull) = right.dataType
    +
    +  @transient lazy val keyType =
    +    TypeCoercion.findCommonTypeDifferentOnlyInNullFlags(leftKeyType, rightKeyType).get
    --- End diff --
    
    shouldn't the null flag be false for both them?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22017: [SPARK-23938][SQL] Add map_zip_with function

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22017
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/94325/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22017: [SPARK-23938][SQL] Add map_zip_with function

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22017
  
    **[Test build #94393 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94393/testReport)** for PR 22017 at commit [`89a3da4`](https://github.com/apache/spark/commit/89a3da4e292690b78fbb41deef4104be3f843c1b).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22017: [SPARK-23938][SQL] Add map_zip_with function

Posted by mgaido91 <gi...@git.apache.org>.
Github user mgaido91 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22017#discussion_r208504963
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala ---
    @@ -442,3 +442,191 @@ case class ArrayAggregate(
     
       override def prettyName: String = "aggregate"
     }
    +
    +/**
    + * Merges two given maps into a single map by applying function to the pair of values with
    + * the same key.
    + */
    +@ExpressionDescription(
    +  usage =
    +    """
    +      _FUNC_(map1, map2, function) - Merges two given maps into a single map by applying
    +      function to the pair of values with the same key. For keys only presented in one map,
    +      NULL will be passed as the value for the missing key. If an input map contains duplicated
    +      keys, only the first entry of the duplicated key is passed into the lambda function.
    +    """,
    +  examples = """
    +    Examples:
    +      > SELECT _FUNC_(map(1, 'a', 2, 'b'), map(1, 'x', 2, 'y'), (k, v1, v2) -> concat(v1, v2));
    +       {1:"ax",2:"by"}
    +  """,
    +  since = "2.4.0")
    +case class MapZipWith(left: Expression, right: Expression, function: Expression)
    +  extends HigherOrderFunction with CodegenFallback {
    +
    +  @transient lazy val functionForEval: Expression = functionsForEval.head
    +
    +  @transient lazy val (keyType, leftValueType, _) =
    +    HigherOrderFunction.mapKeyValueArgumentType(left.dataType)
    +
    +  @transient lazy val (_, rightValueType, _) =
    +    HigherOrderFunction.mapKeyValueArgumentType(right.dataType)
    +
    +  @transient lazy val ordering = TypeUtils.getInterpretedOrdering(keyType)
    +
    +  override def inputs: Seq[Expression] = left :: right :: Nil
    +
    +  override def functions: Seq[Expression] = function :: Nil
    +
    +  override def nullable: Boolean = left.nullable || right.nullable
    +
    +  override def dataType: DataType = MapType(keyType, function.dataType, function.nullable)
    +
    +  override def checkInputDataTypes(): TypeCheckResult = {
    +    (left.dataType, right.dataType) match {
    +      case (MapType(k1, _, _), MapType(k2, _, _)) if k1.sameType(k2) =>
    +        TypeUtils.checkForOrderingExpr(k1, s"function $prettyName")
    +      case _ => TypeCheckResult.TypeCheckFailure(s"The input to function $prettyName should have " +
    +        s"been two ${MapType.simpleString}s with the same key type, but it's " +
    +        s"[${left.dataType.catalogString}, ${right.dataType.catalogString}].")
    +    }
    +  }
    +
    +  override def bind(f: (Expression, Seq[(DataType, Boolean)]) => LambdaFunction): MapZipWith = {
    +    val arguments = Seq((keyType, false), (leftValueType, true), (rightValueType, true))
    +    copy(function = f(function, arguments))
    +  }
    +
    +  override def eval(input: InternalRow): Any = {
    +    val value1 = left.eval(input)
    +    if (value1 == null) {
    +      null
    +    } else {
    +      val value2 = right.eval(input)
    +      if (value2 == null) {
    +        null
    +      } else {
    +        nullSafeEval(input, value1, value2)
    +      }
    +    }
    +  }
    +
    +  @transient lazy val LambdaFunction(_, Seq(
    +    keyVar: NamedLambdaVariable,
    +    value1Var: NamedLambdaVariable,
    +    value2Var: NamedLambdaVariable),
    +    _) = function
    +
    +  private def keyTypeSupportsEquals = keyType match {
    +    case BinaryType => false
    +    case _: AtomicType => true
    +    case _ => false
    +  }
    +
    +  @transient private lazy val getKeysWithValueIndexes:
    +      (ArrayData, ArrayData) => Seq[(Any, Array[Option[Int]])] = {
    +    if (keyTypeSupportsEquals) {
    +      getKeysWithIndexesFast
    +    } else {
    +      getKeysWithIndexesBruteForce
    +    }
    +  }
    +
    +  private def assertSizeOfArrayBuffer(size: Int): Unit = {
    +    if (size > ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH) {
    +      throw new RuntimeException(s"Unsuccessful try to zip maps with $size " +
    +        s"unique keys due to exceeding the array size limit " +
    +        s"${ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH}.")
    +    }
    +  }
    +
    +  private def getKeysWithIndexesFast(keys1: ArrayData, keys2: ArrayData) = {
    +    val arrayBuffer = new mutable.ArrayBuffer[(Any, Array[Option[Int]])]
    +    val hashMap = new mutable.OpenHashMap[Any, Array[Option[Int]]]
    +    val keys = Array(keys1, keys2)
    --- End diff --
    
    maybe better to do something like `for (arr <- Seq(keys1, keys2))`?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22017: [SPARK-23938][SQL] Add map_zip_with function

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22017
  
    **[Test build #94394 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94394/testReport)** for PR 22017 at commit [`12ad8b2`](https://github.com/apache/spark/commit/12ad8b2248b7acb4a04289ca8da439ecb63206a9).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22017: [SPARK-23938][SQL] Add map_zip_with function

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22017
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/94526/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22017: [SPARK-23938][SQL] Add map_zip_with function

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22017
  
    **[Test build #94585 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94585/testReport)** for PR 22017 at commit [`595161f`](https://github.com/apache/spark/commit/595161fefbf55711b76530a9e53aff73491febd6).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22017: [SPARK-23938][SQL] Add map_zip_with function

Posted by mn-mikke <gi...@git.apache.org>.
Github user mn-mikke commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22017#discussion_r209514957
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala ---
    @@ -442,3 +442,186 @@ case class ArrayAggregate(
     
       override def prettyName: String = "aggregate"
     }
    +
    +/**
    + * Merges two given maps into a single map by applying function to the pair of values with
    + * the same key.
    + */
    +@ExpressionDescription(
    +  usage =
    +    """
    +      _FUNC_(map1, map2, function) - Merges two given maps into a single map by applying
    +      function to the pair of values with the same key. For keys only presented in one map,
    +      NULL will be passed as the value for the missing key. If an input map contains duplicated
    +      keys, only the first entry of the duplicated key is passed into the lambda function.
    +    """,
    +  examples = """
    +    Examples:
    +      > SELECT _FUNC_(map(1, 'a', 2, 'b'), map(1, 'x', 2, 'y'), (k, v1, v2) -> concat(v1, v2));
    +       {1:"ax",2:"by"}
    +  """,
    +  since = "2.4.0")
    +case class MapZipWith(left: Expression, right: Expression, function: Expression)
    +  extends HigherOrderFunction with CodegenFallback {
    +
    +  @transient lazy val functionForEval: Expression = functionsForEval.head
    +
    +  @transient lazy val (leftKeyType, leftValueType, leftValueContainsNull) =
    +    HigherOrderFunction.mapKeyValueArgumentType(left.dataType)
    +
    +  @transient lazy val (rightKeyType, rightValueType, rightValueContainsNull) =
    +    HigherOrderFunction.mapKeyValueArgumentType(right.dataType)
    +
    +  @transient lazy val keyType =
    +    TypeCoercion.findTightestCommonType(leftKeyType, rightKeyType).getOrElse(NullType)
    --- End diff --
    
    IMHO, if ```checkInputDataTypes``` was executed before ```bind```, ```findTightestCommonType``` could play the same role. But yeah, ```findCommonTypeDifferentOnlyInNullFlags``` will be semantically more accurate. Thanks!


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22017: [SPARK-23938][SQL] Add map_zip_with function

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on the issue:

    https://github.com/apache/spark/pull/22017
  
    retest this please


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22017: [SPARK-23938][SQL] Add map_zip_with function

Posted by mgaido91 <gi...@git.apache.org>.
Github user mgaido91 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22017#discussion_r208503576
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala ---
    @@ -442,3 +442,191 @@ case class ArrayAggregate(
     
       override def prettyName: String = "aggregate"
     }
    +
    +/**
    + * Merges two given maps into a single map by applying function to the pair of values with
    + * the same key.
    + */
    +@ExpressionDescription(
    +  usage =
    +    """
    +      _FUNC_(map1, map2, function) - Merges two given maps into a single map by applying
    +      function to the pair of values with the same key. For keys only presented in one map,
    +      NULL will be passed as the value for the missing key. If an input map contains duplicated
    +      keys, only the first entry of the duplicated key is passed into the lambda function.
    +    """,
    +  examples = """
    +    Examples:
    +      > SELECT _FUNC_(map(1, 'a', 2, 'b'), map(1, 'x', 2, 'y'), (k, v1, v2) -> concat(v1, v2));
    +       {1:"ax",2:"by"}
    +  """,
    +  since = "2.4.0")
    +case class MapZipWith(left: Expression, right: Expression, function: Expression)
    +  extends HigherOrderFunction with CodegenFallback {
    +
    +  @transient lazy val functionForEval: Expression = functionsForEval.head
    +
    +  @transient lazy val (keyType, leftValueType, _) =
    +    HigherOrderFunction.mapKeyValueArgumentType(left.dataType)
    +
    +  @transient lazy val (_, rightValueType, _) =
    +    HigherOrderFunction.mapKeyValueArgumentType(right.dataType)
    +
    +  @transient lazy val ordering = TypeUtils.getInterpretedOrdering(keyType)
    +
    +  override def inputs: Seq[Expression] = left :: right :: Nil
    +
    +  override def functions: Seq[Expression] = function :: Nil
    +
    +  override def nullable: Boolean = left.nullable || right.nullable
    +
    +  override def dataType: DataType = MapType(keyType, function.dataType, function.nullable)
    +
    +  override def checkInputDataTypes(): TypeCheckResult = {
    +    (left.dataType, right.dataType) match {
    +      case (MapType(k1, _, _), MapType(k2, _, _)) if k1.sameType(k2) =>
    +        TypeUtils.checkForOrderingExpr(k1, s"function $prettyName")
    +      case _ => TypeCheckResult.TypeCheckFailure(s"The input to function $prettyName should have " +
    +        s"been two ${MapType.simpleString}s with the same key type, but it's " +
    +        s"[${left.dataType.catalogString}, ${right.dataType.catalogString}].")
    +    }
    +  }
    +
    +  override def bind(f: (Expression, Seq[(DataType, Boolean)]) => LambdaFunction): MapZipWith = {
    +    val arguments = Seq((keyType, false), (leftValueType, true), (rightValueType, true))
    +    copy(function = f(function, arguments))
    +  }
    +
    +  override def eval(input: InternalRow): Any = {
    +    val value1 = left.eval(input)
    +    if (value1 == null) {
    +      null
    +    } else {
    +      val value2 = right.eval(input)
    +      if (value2 == null) {
    +        null
    +      } else {
    +        nullSafeEval(input, value1, value2)
    +      }
    +    }
    +  }
    +
    +  @transient lazy val LambdaFunction(_, Seq(
    +    keyVar: NamedLambdaVariable,
    +    value1Var: NamedLambdaVariable,
    +    value2Var: NamedLambdaVariable),
    +    _) = function
    +
    +  private def keyTypeSupportsEquals = keyType match {
    +    case BinaryType => false
    +    case _: AtomicType => true
    +    case _ => false
    +  }
    +
    +  @transient private lazy val getKeysWithValueIndexes:
    +      (ArrayData, ArrayData) => Seq[(Any, Array[Option[Int]])] = {
    +    if (keyTypeSupportsEquals) {
    +      getKeysWithIndexesFast
    +    } else {
    +      getKeysWithIndexesBruteForce
    +    }
    +  }
    +
    +  private def assertSizeOfArrayBuffer(size: Int): Unit = {
    +    if (size > ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH) {
    +      throw new RuntimeException(s"Unsuccessful try to zip maps with $size " +
    +        s"unique keys due to exceeding the array size limit " +
    +        s"${ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH}.")
    +    }
    +  }
    +
    +  private def getKeysWithIndexesFast(keys1: ArrayData, keys2: ArrayData) = {
    +    val arrayBuffer = new mutable.ArrayBuffer[(Any, Array[Option[Int]])]
    --- End diff --
    
    why `Array[Option[Int]]` instead of `(Option[Int], Option[Int])`? Moreover, I can't understand why we need this at all. As we have the `HashMap`, we can just add there the indexes and return it as an array..


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org