Posted to reviews@spark.apache.org by cloud-fan <gi...@git.apache.org> on 2018/11/23 05:33:46 UTC

[GitHub] spark pull request #23124: [SPARK-25829][SQL] remove duplicated map keys wit...

GitHub user cloud-fan opened a pull request:

    https://github.com/apache/spark/pull/23124

    [SPARK-25829][SQL] remove duplicated map keys with last wins policy

    ## What changes were proposed in this pull request?
    
    Currently, duplicated map keys are not handled consistently. For example, map lookup respects the duplicated key that appears first, `Dataset.collect` only keeps the duplicated key that appears last, and `MapKeys` returns duplicated keys.
    
    This PR proposes to remove duplicated map keys with a last-wins policy, following Java/Scala and Presto. It only applies to built-in functions, as users can create maps with duplicated keys via private APIs anyway.
    
    For other places:
    1. Data source v1 doesn't have this problem, as users need to provide a Java/Scala map, which can't have duplicated keys.
    2. Data source v2 may have this problem. I've added a note to `ArrayBasedMapData` asking the caller to take care of duplicated keys. In the future we should enforce it in the stable data APIs for data source v2.
    3. UDFs don't have this problem, as users need to provide a Java/Scala map, same as data source v1.
    4. File formats: I checked all of them, and only Parquet does not enforce it. For backward compatibility I change nothing but leave a note saying that the behavior is undefined if users write maps with duplicated keys to Parquet files. Maybe we can add a config and fail by default if Parquet files contain maps with duplicated keys; this can be done in a follow-up.
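
    As a rough illustration of the new semantics (a sketch, not output copied from this PR's tests; exact result formatting may differ):

    ```scala
    // duplicated key 1: with this PR the last value wins, as in Java/Scala and Presto
    spark.sql("SELECT map(1, 'a', 1, 'b')").collect()
    // expected: one row containing a single-entry map, Map(1 -> "b")
    // previously the behavior was inconsistent: map lookup saw 'a' (first wins),
    // while Dataset.collect kept 'b' (last wins)
    ```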
    
    ## How was this patch tested?
    
    Updated existing tests and added new tests.


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/cloud-fan/spark map

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/23124.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #23124
    
----
commit cbcd5d7a937f8120ef8527f1f26150ed93f1de0a
Author: Wenchen Fan <we...@...>
Date:   2018-11-15T02:49:22Z

    remove duplicated map keys with last wins policy

----


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #23124: [SPARK-25829][SQL] remove duplicated map keys with last ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/23124
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/99312/
    Test FAILed.


---



[GitHub] spark issue #23124: [SPARK-25829][SQL] remove duplicated map keys with last ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/23124
  
    Merged build finished. Test FAILed.


---



[GitHub] spark issue #23124: [SPARK-25829][SQL] remove duplicated map keys with last ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/23124
  
    **[Test build #99325 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/99325/testReport)** for PR 23124 at commit [`b7073b2`](https://github.com/apache/spark/commit/b7073b29b0d4316dc783f0b5d7d79b134d7558c4).


---



[GitHub] spark pull request #23124: [SPARK-25829][SQL] remove duplicated map keys wit...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/23124#discussion_r236282401
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ArrayBasedMapBuilder.scala ---
    @@ -0,0 +1,118 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.catalyst.util
    +
    +import scala.collection.mutable
    +
    +import org.apache.spark.sql.catalyst.InternalRow
    +import org.apache.spark.sql.types.{AtomicType, CalendarIntervalType, DataType, MapType}
    +
    +/**
    + * A builder of [[ArrayBasedMapData]], which fails if a null map key is detected, and removes
    + * duplicated map keys w.r.t. the last wins policy.
    + */
    +class ArrayBasedMapBuilder(keyType: DataType, valueType: DataType) extends Serializable {
    +  assert(!keyType.existsRecursively(_.isInstanceOf[MapType]), "key of map cannot be/contain map")
    +
    +  private lazy val keyToIndex = keyType match {
    +    case _: AtomicType | _: CalendarIntervalType => mutable.HashMap.empty[Any, Int]
    +    case _ =>
    +      // for complex types, use interpreted ordering to be able to compare unsafe data with safe
    +      // data, e.g. UnsafeRow vs GenericInternalRow.
    +      mutable.TreeMap.empty[Any, Int](TypeUtils.getInterpretedOrdering(keyType))
    +  }
    +
    +  // TODO: specialize it
    +  private lazy val keys = mutable.ArrayBuffer.empty[Any]
    +  private lazy val values = mutable.ArrayBuffer.empty[Any]
    +
    +  private lazy val keyGetter = InternalRow.getAccessor(keyType)
    +  private lazy val valueGetter = InternalRow.getAccessor(valueType)
    +
    +  def reset(): Unit = {
    +    keyToIndex.clear()
    +    keys.clear()
    +    values.clear()
    +  }
    +
    +  def put(key: Any, value: Any): Unit = {
    +    if (key == null) {
    +      throw new RuntimeException("Cannot use null as map key.")
    +    }
    +
    +    val maybeExistingIdx = keyToIndex.get(key)
    +    if (maybeExistingIdx.isDefined) {
    +      // Overwrite the previous value, as the policy is last wins.
    +      values(maybeExistingIdx.get) = value
    +    } else {
    +      keyToIndex.put(key, values.length)
    +      keys.append(key)
    +      values.append(value)
    +    }
    +  }
    +
    +  // write a 2-field row, the first field is key and the second field is value.
    +  def put(entry: InternalRow): Unit = {
    +    if (entry.isNullAt(0)) {
    +      throw new RuntimeException("Cannot use null as map key.")
    +    }
    +    put(keyGetter(entry, 0), valueGetter(entry, 1))
    +  }
    +
    +  def putAll(keyArray: Array[Any], valueArray: Array[Any]): Unit = {
    +    if (keyArray.length != valueArray.length) {
    +      throw new RuntimeException(
    +        "The key array and value array of MapData must have the same length.")
    +    }
    +
    +    var i = 0
    +    while (i < keyArray.length) {
    +      put(keyArray(i), valueArray(i))
    +      i += 1
    +    }
    +  }
    +
    +  def putAll(keyArray: ArrayData, valueArray: ArrayData): Unit = {
    +    if (keyArray.numElements() != valueArray.numElements()) {
    +      throw new RuntimeException(
    +        "The key array and value array of MapData must have the same length.")
    +    }
    +
    +    var i = 0
    +    while (i < keyArray.numElements()) {
    +      put(keyGetter(keyArray, i), valueGetter(valueArray, i))
    +      i += 1
    +    }
    +  }
    +
    +  def build(): ArrayBasedMapData = {
    +    new ArrayBasedMapData(new GenericArrayData(keys.toArray), new GenericArrayData(values.toArray))
    +  }
    +
    +  def from(keyArray: ArrayData, valueArray: ArrayData): ArrayBasedMapData = {
    +    assert(keyToIndex.isEmpty, "'from' can only be called with a fresh GenericMapBuilder.")
    +    putAll(keyArray, valueArray)
    --- End diff --
    
    No, we can't, as we still need to detect duplicated keys: even with no null keys, e.g. keys `[1, 1]` and values `[2, 3]` must still collapse to `Map(1 -> 3)`.


---



[GitHub] spark issue #23124: [SPARK-25829][SQL] remove duplicated map keys with last ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/23124
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/5445/
    Test PASSed.


---



[GitHub] spark pull request #23124: [SPARK-25829][SQL] remove duplicated map keys wit...

Posted by ueshin <gi...@git.apache.org>.
Github user ueshin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/23124#discussion_r236955791
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ArrayBasedMapBuilder.scala ---
    @@ -0,0 +1,118 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.catalyst.util
    +
    +import scala.collection.mutable
    +
    +import org.apache.spark.sql.catalyst.InternalRow
    +import org.apache.spark.sql.types._
    +
    +/**
    + * A builder of [[ArrayBasedMapData]], which fails if a null map key is detected, and removes
    + * duplicated map keys w.r.t. the last wins policy.
    + */
    +class ArrayBasedMapBuilder(keyType: DataType, valueType: DataType) extends Serializable {
    +  assert(!keyType.existsRecursively(_.isInstanceOf[MapType]), "key of map cannot be/contain map")
    +  assert(keyType != NullType, "map key cannot be null type.")
    +
    +  private lazy val keyToIndex = keyType match {
    +    case _: AtomicType | _: CalendarIntervalType => mutable.HashMap.empty[Any, Int]
    --- End diff --
    
    We need to exempt `BinaryType` from `AtomicType` here.
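
    A sketch of one way to do that (illustrative only; binary data is `Array[Byte]`, whose `equals`/`hashCode` are identity-based, so it can't be a `HashMap` key):

    ```scala
    private lazy val keyToIndex = keyType match {
      // Array[Byte] uses reference equality, so route binary keys to the
      // ordering-based TreeMap instead of the HashMap.
      case BinaryType =>
        mutable.TreeMap.empty[Any, Int](TypeUtils.getInterpretedOrdering(keyType))
      case _: AtomicType | _: CalendarIntervalType => mutable.HashMap.empty[Any, Int]
      case _ =>
        mutable.TreeMap.empty[Any, Int](TypeUtils.getInterpretedOrdering(keyType))
    }
    ```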


---



[GitHub] spark pull request #23124: [SPARK-25829][SQL] remove duplicated map keys wit...

Posted by ueshin <gi...@git.apache.org>.
Github user ueshin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/23124#discussion_r236958252
  
    --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/ArrayBasedMapBuilderSuite.scala ---
    @@ -0,0 +1,91 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.catalyst.util
    +
    +import org.apache.spark.SparkFunSuite
    +import org.apache.spark.sql.catalyst.InternalRow
    +import org.apache.spark.sql.catalyst.expressions.{UnsafeArrayData, UnsafeRow}
    +import org.apache.spark.sql.types.{ArrayType, IntegerType, StructType}
    +import org.apache.spark.unsafe.Platform
    +
    +class ArrayBasedMapBuilderSuite extends SparkFunSuite {
    +
    +  test("basic") {
    +    val builder = new ArrayBasedMapBuilder(IntegerType, IntegerType)
    +    builder.put(1, 1)
    +    builder.put(InternalRow(2, 2))
    +    builder.putAll(new GenericArrayData(Seq(3)), new GenericArrayData(Seq(3)))
    +    val map = builder.build()
    +    assert(map.numElements() == 3)
    +    assert(ArrayBasedMapData.toScalaMap(map) == Map(1 -> 1, 2 -> 2, 3 -> 3))
    +  }
    +
    +  test("fail with null key") {
    +    val builder = new ArrayBasedMapBuilder(IntegerType, IntegerType)
    +    builder.put(1, null) // null value is OK
    +    val e = intercept[RuntimeException](builder.put(null, 1))
    +    assert(e.getMessage.contains("Cannot use null as map key"))
    +  }
    +
    +  test("remove duplicated keys with last wins policy") {
    +    val builder = new ArrayBasedMapBuilder(IntegerType, IntegerType)
    +    builder.put(1, 1)
    +    builder.put(2, 2)
    +    builder.put(1, 2)
    +    val map = builder.build()
    +    assert(map.numElements() == 2)
    +    assert(ArrayBasedMapData.toScalaMap(map) == Map(1 -> 2, 2 -> 2))
    +  }
    +
    +  test("struct type key") {
    +    val builder = new ArrayBasedMapBuilder(new StructType().add("i", "int"), IntegerType)
    +    builder.put(InternalRow(1), 1)
    +    builder.put(InternalRow(2), 2)
    +    val unsafeRow = {
    +      val row = new UnsafeRow(1)
    +      val bytes = new Array[Byte](16)
    +      row.pointTo(bytes, 16)
    +      row.setInt(0, 1)
    +      row
    +    }
    +    builder.put(unsafeRow, 3)
    +    val map = builder.build()
    +    assert(map.numElements() == 2)
    +    assert(ArrayBasedMapData.toScalaMap(map) == Map(InternalRow(1) -> 3, InternalRow(2) -> 2))
    +  }
    +
    +  test("array type key") {
    +    val builder = new ArrayBasedMapBuilder(ArrayType(IntegerType), IntegerType)
    +    builder.put(new GenericArrayData(Seq(1, 1)), 1)
    +    builder.put(new GenericArrayData(Seq(2, 2)), 2)
    +    val unsafeArray = {
    +      val array = new UnsafeArrayData()
    +      val bytes = new Array[Byte](24)
    +      Platform.putLong(bytes, Platform.BYTE_ARRAY_OFFSET, 2)
    +      array.pointTo(bytes, Platform.BYTE_ARRAY_OFFSET, 24)
    +      array.setInt(0, 1)
    +      array.setInt(1, 1)
    +      array
    +    }
    +    builder.put(unsafeArray, 3)
    +    val map = builder.build()
    +    assert(map.numElements() == 2)
    +    assert(ArrayBasedMapData.toScalaMap(map) ==
    +      Map(new GenericArrayData(Seq(1, 1)) -> 3, new GenericArrayData(Seq(2, 2)) -> 2))
    +  }
    --- End diff --
    
    Should we have a binary type key test as well?
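
    A sketch of such a test (illustrative; it assumes the `BinaryType` exemption suggested above, so that equal byte arrays in different array instances are deduplicated):

    ```scala
    test("binary type key") {
      val builder = new ArrayBasedMapBuilder(BinaryType, IntegerType)
      builder.put(Array(1.toByte), 1)
      builder.put(Array(2.toByte), 2)
      // same bytes, different Array[Byte] instance: should overwrite the first entry
      builder.put(Array(1.toByte), 3)
      val map = builder.build()
      assert(map.numElements() == 2)
    }
    ```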


---



[GitHub] spark pull request #23124: [SPARK-25829][SQL] remove duplicated map keys wit...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/23124#discussion_r236112390
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala ---
    @@ -646,34 +633,35 @@ case class MapConcat(children: Seq[Expression]) extends ComplexTypeMergingExpres
     
         val mapMerge =
           s"""
    -        |${ev.isNull} = $hasNullName;
    -        |if (!${ev.isNull}) {
    -        |  $arrayDataClass[] $keyArgsName = new $arrayDataClass[${mapCodes.size}];
    -        |  $arrayDataClass[] $valArgsName = new $arrayDataClass[${mapCodes.size}];
    -        |  long $numElementsName = 0;
    -        |  for (int $idxName = 0; $idxName < $argsName.length; $idxName++) {
    -        |    $keyArgsName[$idxName] = $argsName[$idxName].keyArray();
    -        |    $valArgsName[$idxName] = $argsName[$idxName].valueArray();
    -        |    $numElementsName += $argsName[$idxName].numElements();
    -        |  }
    -        |  if ($numElementsName > ${ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH}) {
    -        |    throw new RuntimeException("Unsuccessful attempt to concat maps with " +
    -        |       $numElementsName + " elements due to exceeding the map size limit " +
    -        |       "${ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH}.");
    -        |  }
    -        |  $arrayDataClass $finKeysName = $keyConcat($keyArgsName,
    -        |    (int) $numElementsName);
    -        |  $arrayDataClass $finValsName = $valueConcat($valArgsName,
    -        |    (int) $numElementsName);
    -        |  ${ev.value} = new $arrayBasedMapDataClass($finKeysName, $finValsName);
    +        |ArrayData[] $keyArgsName = new ArrayData[${mapCodes.size}];
    +        |ArrayData[] $valArgsName = new ArrayData[${mapCodes.size}];
    +        |long $numElementsName = 0;
    +        |for (int $idxName = 0; $idxName < $argsName.length; $idxName++) {
    +        |  $keyArgsName[$idxName] = $argsName[$idxName].keyArray();
    +        |  $valArgsName[$idxName] = $argsName[$idxName].valueArray();
    +        |  $numElementsName += $argsName[$idxName].numElements();
             |}
    +        |if ($numElementsName > ${ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH}) {
    --- End diff --
    
    Yup. I actually did what you proposed at first, then realized it's different from before and may introduce a perf regression. We can investigate it in a follow-up.


---



[GitHub] spark pull request #23124: [SPARK-25829][SQL] remove duplicated map keys wit...

Posted by dongjoon-hyun <gi...@git.apache.org>.
Github user dongjoon-hyun commented on a diff in the pull request:

    https://github.com/apache/spark/pull/23124#discussion_r235852779
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ArrayBasedMapBuilder.scala ---
    @@ -0,0 +1,118 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.catalyst.util
    +
    +import scala.collection.mutable
    +
    +import org.apache.spark.sql.catalyst.InternalRow
    +import org.apache.spark.sql.types.{AtomicType, CalendarIntervalType, DataType, MapType}
    +
    +/**
    + * A builder of [[ArrayBasedMapData]], which fails if a null map key is detected, and removes
    + * duplicated map keys w.r.t. the last wins policy.
    + */
    +class ArrayBasedMapBuilder(keyType: DataType, valueType: DataType) extends Serializable {
    +  assert(!keyType.existsRecursively(_.isInstanceOf[MapType]), "key of map cannot be/contain map")
    +
    +  private lazy val keyToIndex = keyType match {
    +    case _: AtomicType | _: CalendarIntervalType => mutable.HashMap.empty[Any, Int]
    +    case _ =>
    +      // for complex types, use interpreted ordering to be able to compare unsafe data with safe
    +      // data, e.g. UnsafeRow vs GenericInternalRow.
    +      mutable.TreeMap.empty[Any, Int](TypeUtils.getInterpretedOrdering(keyType))
    --- End diff --
    
    ```scala
    scala> sql("select map(null,2)")
    res1: org.apache.spark.sql.DataFrame = [map(NULL, 2): map<null,int>]
    
    scala> sql("select map(null,2)").collect
    scala.MatchError: NullType (of class org.apache.spark.sql.types.NullType$)
      at org.apache.spark.sql.catalyst.util.TypeUtils$.getInterpretedOrdering(TypeUtils.scala:67)
      at org.apache.spark.sql.catalyst.util.ArrayBasedMapBuilder.keyToIndex$lzycompute(ArrayBasedMapBuilder.scala:37)
    ```


---



[GitHub] spark pull request #23124: [SPARK-25829][SQL] remove duplicated map keys wit...

Posted by dongjoon-hyun <gi...@git.apache.org>.
Github user dongjoon-hyun commented on a diff in the pull request:

    https://github.com/apache/spark/pull/23124#discussion_r235851798
  
    --- Diff: docs/sql-migration-guide-upgrade.md ---
    @@ -19,6 +19,8 @@ displayTitle: Spark SQL Upgrading Guide
     
       - In Spark version 2.4 and earlier, users can create map values with map type key via built-in function like `CreateMap`, `MapFromArrays`, etc. Since Spark 3.0, it's not allowed to create map values with map type key with these built-in functions. Users can still read map values with map type key from data source or Java/Scala collections, though they are not very useful.
     
    +  - In Spark version 2.4 and earlier, users can create a map with duplicated keys via built-in functions like `CreateMap`, `StringToMap`, etc. The behavior of map with duplicated keys is undefined, e.g. map look up respects the duplicated key appears first, `Dataset.collect` only keeps the duplicated key appears last, `MapKeys` returns duplicated keys, etc. Since Spark 3.0, these built-in functions will remove duplicated map keys with last wins policy.
    --- End diff --
    
    Can we merge this with the above sentence at line 20? They are different, but very strongly related.


---



[GitHub] spark issue #23124: [SPARK-25829][SQL] remove duplicated map keys with last ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/23124
  
    **[Test build #99368 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/99368/testReport)** for PR 23124 at commit [`72c771e`](https://github.com/apache/spark/commit/72c771e691ae1071742bde5a612593d38f147ff5).


---



[GitHub] spark pull request #23124: [SPARK-25829][SQL] remove duplicated map keys wit...

Posted by kiszk <gi...@git.apache.org>.
Github user kiszk commented on a diff in the pull request:

    https://github.com/apache/spark/pull/23124#discussion_r235947044
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ArrayBasedMapBuilder.scala ---
    @@ -0,0 +1,118 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.catalyst.util
    +
    +import scala.collection.mutable
    +
    +import org.apache.spark.sql.catalyst.InternalRow
    +import org.apache.spark.sql.types.{AtomicType, CalendarIntervalType, DataType, MapType}
    +
    +/**
    + * A builder of [[ArrayBasedMapData]], which fails if a null map key is detected, and removes
    + * duplicated map keys w.r.t. the last wins policy.
    + */
    +class ArrayBasedMapBuilder(keyType: DataType, valueType: DataType) extends Serializable {
    +  assert(!keyType.existsRecursively(_.isInstanceOf[MapType]), "key of map cannot be/contain map")
    +
    +  private lazy val keyToIndex = keyType match {
    +    case _: AtomicType | _: CalendarIntervalType => mutable.HashMap.empty[Any, Int]
    +    case _ =>
    +      // for complex types, use interpreted ordering to be able to compare unsafe data with safe
    +      // data, e.g. UnsafeRow vs GenericInternalRow.
    +      mutable.TreeMap.empty[Any, Int](TypeUtils.getInterpretedOrdering(keyType))
    +  }
    +
    +  // TODO: specialize it
    +  private lazy val keys = mutable.ArrayBuffer.empty[Any]
    +  private lazy val values = mutable.ArrayBuffer.empty[Any]
    +
    +  private lazy val keyGetter = InternalRow.getAccessor(keyType)
    +  private lazy val valueGetter = InternalRow.getAccessor(valueType)
    +
    +  def reset(): Unit = {
    +    keyToIndex.clear()
    +    keys.clear()
    +    values.clear()
    +  }
    +
    +  def put(key: Any, value: Any): Unit = {
    +    if (key == null) {
    +      throw new RuntimeException("Cannot use null as map key.")
    +    }
    +
    +    val maybeExistingIdx = keyToIndex.get(key)
    +    if (maybeExistingIdx.isDefined) {
    +      // Overwrite the previous value, as the policy is last wins.
    +      values(maybeExistingIdx.get) = value
    +    } else {
    +      keyToIndex.put(key, values.length)
    +      keys.append(key)
    +      values.append(value)
    +    }
    +  }
    +
    +  // write a 2-field row, the first field is key and the second field is value.
    +  def put(entry: InternalRow): Unit = {
    +    if (entry.isNullAt(0)) {
    +      throw new RuntimeException("Cannot use null as map key.")
    +    }
    +    put(keyGetter(entry, 0), valueGetter(entry, 1))
    +  }
    +
    +  def putAll(keyArray: Array[Any], valueArray: Array[Any]): Unit = {
    +    if (keyArray.length != valueArray.length) {
    +      throw new RuntimeException(
    +        "The key array and value array of MapData must have the same length.")
    +    }
    +
    +    var i = 0
    +    while (i < keyArray.length) {
    +      put(keyArray(i), valueArray(i))
    +      i += 1
    +    }
    +  }
    +
    +  def putAll(keyArray: ArrayData, valueArray: ArrayData): Unit = {
    +    if (keyArray.numElements() != valueArray.numElements()) {
    +      throw new RuntimeException(
    +        "The key array and value array of MapData must have the same length.")
    +    }
    +
    +    var i = 0
    +    while (i < keyArray.numElements()) {
    +      put(keyGetter(keyArray, i), valueGetter(valueArray, i))
    +      i += 1
    +    }
    +  }
    +
    +  def build(): ArrayBasedMapData = {
    +    new ArrayBasedMapData(new GenericArrayData(keys.toArray), new GenericArrayData(values.toArray))
    +  }
    +
    +  def from(keyArray: ArrayData, valueArray: ArrayData): ArrayBasedMapData = {
    +    assert(keyToIndex.isEmpty, "'from' can only be called with a fresh GenericMapBuilder.")
    +    putAll(keyArray, valueArray)
    --- End diff --
    
    Can we call `new ArrayBasedMapData(keyArray, valueArray)` without calling `putAll(keyArray, valueArray)` if `keyArray.asInstanceOf[ArrayData].containsNull` is false?
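
    For reference, a sketch of that shortcut (hypothetical; as the author replies elsewhere in this thread, it doesn't work, because duplicated keys would go undetected):

    ```scala
    if (!keyArray.containsNull) {
      // skips the per-element null check, but also skips duplicate-key detection
      new ArrayBasedMapData(keyArray, valueArray)
    } else {
      putAll(keyArray, valueArray)
      build() // assumed continuation of `from` beyond the truncated diff
    }
    ```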


---



[GitHub] spark issue #23124: [SPARK-25829][SQL] remove duplicated map keys with last ...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on the issue:

    https://github.com/apache/spark/pull/23124
  
    retest this please


---



[GitHub] spark issue #23124: [SPARK-25829][SQL] remove duplicated map keys with last ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/23124
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/5434/
    Test PASSed.


---



[GitHub] spark pull request #23124: [SPARK-25829][SQL] remove duplicated map keys wit...

Posted by bersprockets <gi...@git.apache.org>.
Github user bersprockets commented on a diff in the pull request:

    https://github.com/apache/spark/pull/23124#discussion_r236952729
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ArrayBasedMapBuilder.scala ---
    @@ -0,0 +1,118 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.catalyst.util
    +
    +import scala.collection.mutable
    +
    +import org.apache.spark.sql.catalyst.InternalRow
    +import org.apache.spark.sql.types._
    +
    +/**
    + * A builder of [[ArrayBasedMapData]], which fails if a null map key is detected, and removes
    + * duplicated map keys w.r.t. the last wins policy.
    + */
    +class ArrayBasedMapBuilder(keyType: DataType, valueType: DataType) extends Serializable {
    +  assert(!keyType.existsRecursively(_.isInstanceOf[MapType]), "key of map cannot be/contain map")
    +  assert(keyType != NullType, "map key cannot be null type.")
    +
    +  private lazy val keyToIndex = keyType match {
    +    case _: AtomicType | _: CalendarIntervalType => mutable.HashMap.empty[Any, Int]
    --- End diff --
    
    FYI: I had a test lying around from when I worked on map_concat. With this PR:
    
    - map_concat of two small maps (20 string keys per map, no dups) for 2M rows is about 17% slower.
    - map_concat of two big maps (500 string keys per map, no dups) for 1M rows is about 25% slower.
    
    The baseline code is the same branch as the PR, but without the 4 commits.
    
    Some cost makes sense, as we're checking for dups, but it's odd that the overhead grows disproportionately as the size of the maps grows.
    
    
    I remember that at one time, mutable.HashMap had some performance issues (rumor has it, anyway). So as a test, I modified ArrayBasedMapBuilder.scala to use java.util.HashMap instead. After that:
    
    - map_concat of two small maps (20 string keys per map, no dups) for 2M rows is about 12% slower.
    - map_concat of two big maps (500 string keys per map, no dups) for 1M rows is about 15% slower.
    
    It's a little more proportionate. I don't know if switching HashMap implementations would have some negative consequences.
    
    Also, my test is a dumb benchmark that uses System.currentTimeMillis while concatenating simple [String, Integer] maps.
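
    A rough sketch of the swap I tried (illustrative only; `java.util.HashMap` returns null for a missing key, so the `Option`-based lookup has to change, e.g. to `getOrDefault`):

    ```scala
    import java.util.{HashMap => JavaHashMap}

    // hypothetical variant of the atomic-type branch only; complex types
    // would still need the ordering-based TreeMap
    private lazy val keyToIndex = new JavaHashMap[Any, Int]()

    def put(key: Any, value: Any): Unit = {
      if (key == null) {
        throw new RuntimeException("Cannot use null as map key.")
      }
      val existingIdx = keyToIndex.getOrDefault(key, -1)
      if (existingIdx >= 0) {
        // last wins: overwrite the previous value
        values(existingIdx) = value
      } else {
        keyToIndex.put(key, values.length)
        keys.append(key)
        values.append(value)
      }
    }
    ```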
    
    



---



[GitHub] spark issue #23124: [SPARK-25829][SQL] remove duplicated map keys with last ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/23124
  
    **[Test build #99209 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/99209/testReport)** for PR 23124 at commit [`cbcd5d7`](https://github.com/apache/spark/commit/cbcd5d7a937f8120ef8527f1f26150ed93f1de0a).


---



[GitHub] spark issue #23124: [SPARK-25829][SQL] remove duplicated map keys with last ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/23124
  
    **[Test build #99312 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/99312/testReport)** for PR 23124 at commit [`abd0ec5`](https://github.com/apache/spark/commit/abd0ec543a944fa02320337f4fab7fff6ffe9667).


---



[GitHub] spark pull request #23124: [SPARK-25829][SQL] remove duplicated map keys wit...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/23124#discussion_r235870945
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ArrayBasedMapBuilder.scala ---
    @@ -0,0 +1,118 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.catalyst.util
    +
    +import scala.collection.mutable
    +
    +import org.apache.spark.sql.catalyst.InternalRow
    +import org.apache.spark.sql.types.{AtomicType, CalendarIntervalType, DataType, MapType}
    +
    +/**
    + * A builder of [[ArrayBasedMapData]], which fails if a null map key is detected, and removes
    + * duplicated map keys w.r.t. the last wins policy.
    + */
    +class ArrayBasedMapBuilder(keyType: DataType, valueType: DataType) extends Serializable {
    +  assert(!keyType.existsRecursively(_.isInstanceOf[MapType]), "key of map cannot be/contain map")
    +
    +  private lazy val keyToIndex = keyType match {
    +    case _: AtomicType | _: CalendarIntervalType => mutable.HashMap.empty[Any, Int]
    +    case _ =>
    +      // for complex types, use interpreted ordering to be able to compare unsafe data with safe
    +      // data, e.g. UnsafeRow vs GenericInternalRow.
    +      mutable.TreeMap.empty[Any, Int](TypeUtils.getInterpretedOrdering(keyType))
    +  }
    +
    +  // TODO: specialize it
    +  private lazy val keys = mutable.ArrayBuffer.empty[Any]
    +  private lazy val values = mutable.ArrayBuffer.empty[Any]
    +
    +  private lazy val keyGetter = InternalRow.getAccessor(keyType)
    +  private lazy val valueGetter = InternalRow.getAccessor(valueType)
    +
    +  def reset(): Unit = {
    +    keyToIndex.clear()
    +    keys.clear()
    +    values.clear()
    +  }
    +
    +  def put(key: Any, value: Any): Unit = {
    +    if (key == null) {
    +      throw new RuntimeException("Cannot use null as map key.")
    +    }
    +
    +    val maybeExistingIdx = keyToIndex.get(key)
    +    if (maybeExistingIdx.isDefined) {
    +      // Overwrite the previous value, as the policy is last wins.
    +      values(maybeExistingIdx.get) = value
    +    } else {
    +      keyToIndex.put(key, values.length)
    +      keys.append(key)
    +      values.append(value)
    +    }
    +  }
    +
    +  // write a 2-field row, the first field is key and the second field is value.
    +  def put(entry: InternalRow): Unit = {
    +    if (entry.isNullAt(0)) {
    +      throw new RuntimeException("Cannot use null as map key.")
    +    }
    +    put(keyGetter(entry, 0), valueGetter(entry, 1))
    +  }
    +
    +  def putAll(keyArray: Array[Any], valueArray: Array[Any]): Unit = {
    +    if (keyArray.length != valueArray.length) {
    +      throw new RuntimeException(
    +        "The key array and value array of MapData must have the same length.")
    +    }
    +
    +    var i = 0
    +    while (i < keyArray.length) {
    +      put(keyArray(i), valueArray(i))
    +      i += 1
    +    }
    +  }
    +
    +  def putAll(keyArray: ArrayData, valueArray: ArrayData): Unit = {
    +    if (keyArray.numElements() != valueArray.numElements()) {
    +      throw new RuntimeException(
    +        "The key array and value array of MapData must have the same length.")
    +    }
    +
    +    var i = 0
    +    while (i < keyArray.numElements()) {
    +      put(keyGetter(keyArray, i), valueGetter(valueArray, i))
    +      i += 1
    +    }
    +  }
    +
    +  def build(): ArrayBasedMapData = {
    +    new ArrayBasedMapData(new GenericArrayData(keys.toArray), new GenericArrayData(values.toArray))
    +  }
    +
    +  def from(keyArray: ArrayData, valueArray: ArrayData): ArrayBasedMapData = {
    +    assert(keyToIndex.isEmpty, "'from' can only be called with a fresh GenericMapBuilder.")
    --- End diff --
    
    `GenericMapBuilder`? The class is named `ArrayBasedMapBuilder`; this message looks stale.


---



[GitHub] spark issue #23124: [SPARK-25829][SQL] remove duplicated map keys with last ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/23124
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/99209/
    Test FAILed.


---



[GitHub] spark pull request #23124: [SPARK-25829][SQL] remove duplicated map keys wit...

Posted by dongjoon-hyun <gi...@git.apache.org>.
Github user dongjoon-hyun commented on a diff in the pull request:

    https://github.com/apache/spark/pull/23124#discussion_r235999222
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ArrayBasedMapBuilder.scala ---
    @@ -0,0 +1,118 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.catalyst.util
    +
    +import scala.collection.mutable
    +
    +import org.apache.spark.sql.catalyst.InternalRow
    +import org.apache.spark.sql.types.{AtomicType, CalendarIntervalType, DataType, MapType}
    +
    +/**
    + * A builder of [[ArrayBasedMapData]], which fails if a null map key is detected, and removes
    + * duplicated map keys w.r.t. the last wins policy.
    + */
    +class ArrayBasedMapBuilder(keyType: DataType, valueType: DataType) extends Serializable {
    +  assert(!keyType.existsRecursively(_.isInstanceOf[MapType]), "key of map cannot be/contain map")
    +
    +  private lazy val keyToIndex = keyType match {
    +    case _: AtomicType | _: CalendarIntervalType => mutable.HashMap.empty[Any, Int]
    +    case _ =>
    +      // for complex types, use interpreted ordering to be able to compare unsafe data with safe
    +      // data, e.g. UnsafeRow vs GenericInternalRow.
    +      mutable.TreeMap.empty[Any, Int](TypeUtils.getInterpretedOrdering(keyType))
    --- End diff --
    
    After merging this PR, I'll check again and file a JIRA for that.


---



[GitHub] spark pull request #23124: [SPARK-25829][SQL] remove duplicated map keys wit...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/23124#discussion_r236112177
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala ---
    @@ -546,33 +546,29 @@ case class MapConcat(children: Seq[Expression]) extends ComplexTypeMergingExpres
     
       override def nullable: Boolean = children.exists(_.nullable)
     
    +  private lazy val mapBuilder = new ArrayBasedMapBuilder(dataType.keyType, dataType.valueType)
    +
       override def eval(input: InternalRow): Any = {
    -    val maps = children.map(_.eval(input))
    +    val maps = children.map(_.eval(input).asInstanceOf[MapData]).toArray
    --- End diff --
    
    In Scala, a while loop is faster than `foreach`. If you look at `Expression.eval` implementations, we use while loops a lot, even when `foreach` would produce simpler code.
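
    For illustration, a sketch of the two styles (not the exact PR code):

    ```scala
    // while loop: no closure allocation or Function1 dispatch per element
    var i = 0
    while (i < maps.length) {
      mapBuilder.putAll(maps(i).keyArray(), maps(i).valueArray())
      i += 1
    }

    // the simpler but slower alternative:
    // maps.foreach(m => mapBuilder.putAll(m.keyArray(), m.valueArray()))
    ```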


---



[GitHub] spark issue #23124: [SPARK-25829][SQL] remove duplicated map keys with last ...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on the issue:

    https://github.com/apache/spark/pull/23124
  
    retest this please.


---



[GitHub] spark pull request #23124: [SPARK-25829][SQL] remove duplicated map keys wit...

Posted by gatorsmile <gi...@git.apache.org>.
Github user gatorsmile commented on a diff in the pull request:

    https://github.com/apache/spark/pull/23124#discussion_r240104815
  
    --- Diff: docs/sql-migration-guide-upgrade.md ---
    @@ -27,6 +27,8 @@ displayTitle: Spark SQL Upgrading Guide
     
       - In Spark version 2.4 and earlier, float/double -0.0 is semantically equal to 0.0, but users can still distinguish them via `Dataset.show`, `Dataset.collect` etc. Since Spark 3.0, float/double -0.0 is replaced by 0.0 internally, and users can't distinguish them any more.
     
    +  - In Spark version 2.4 and earlier, users can create a map with duplicated keys via built-in functions like `CreateMap`, `StringToMap`, etc. The behavior of map with duplicated keys is undefined, e.g. map look up respects the duplicated key appears first, `Dataset.collect` only keeps the duplicated key appears last, `MapKeys` returns duplicated keys, etc. Since Spark 3.0, these built-in functions will remove duplicated map keys with last wins policy. Users may still read map values with duplicated keys from data sources which do not enforce it (e.g. Parquet), the behavior will be udefined.
    --- End diff --
    
    A few typos. How about?
    
    ```
    In Spark version 2.4 and earlier, users can create a map with duplicate keys via built-in functions like `CreateMap` and `StringToMap`. The behavior of map with duplicate keys is undefined. For example, the map lookup respects the duplicate key that appears first, `Dataset.collect` only keeps the duplicate key that appears last, and `MapKeys` returns duplicate keys. Since Spark 3.0, these built-in functions will remove duplicate map keys using the last-one-wins policy. Users may still read map values with duplicate keys from the data sources that do not enforce it (e.g. Parquet), but the behavior will be undefined.
    ```


---



[GitHub] spark issue #23124: [SPARK-25829][SQL] remove duplicated map keys with last ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/23124
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/99357/
    Test FAILed.


---



[GitHub] spark issue #23124: [SPARK-25829][SQL] remove duplicated map keys with last ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/23124
  
    Merged build finished. Test PASSed.


---



[GitHub] spark pull request #23124: [SPARK-25829][SQL] remove duplicated map keys wit...

Posted by kiszk <gi...@git.apache.org>.
Github user kiszk commented on a diff in the pull request:

    https://github.com/apache/spark/pull/23124#discussion_r235952965
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ArrayBasedMapBuilder.scala ---
    @@ -0,0 +1,118 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.catalyst.util
    +
    +import scala.collection.mutable
    +
    +import org.apache.spark.sql.catalyst.InternalRow
    +import org.apache.spark.sql.types.{AtomicType, CalendarIntervalType, DataType, MapType}
    +
    +/**
    + * A builder of [[ArrayBasedMapData]], which fails if a null map key is detected, and removes
    + * duplicated map keys w.r.t. the last wins policy.
    + */
    +class ArrayBasedMapBuilder(keyType: DataType, valueType: DataType) extends Serializable {
    +  assert(!keyType.existsRecursively(_.isInstanceOf[MapType]), "key of map cannot be/contain map")
    +
    +  private lazy val keyToIndex = keyType match {
    +    case _: AtomicType | _: CalendarIntervalType => mutable.HashMap.empty[Any, Int]
    +    case _ =>
    +      // for complex types, use interpreted ordering to be able to compare unsafe data with safe
    +      // data, e.g. UnsafeRow vs GenericInternalRow.
    +      mutable.TreeMap.empty[Any, Int](TypeUtils.getInterpretedOrdering(keyType))
    +  }
    +
    +  // TODO: specialize it
    +  private lazy val keys = mutable.ArrayBuffer.empty[Any]
    +  private lazy val values = mutable.ArrayBuffer.empty[Any]
    +
    +  private lazy val keyGetter = InternalRow.getAccessor(keyType)
    +  private lazy val valueGetter = InternalRow.getAccessor(valueType)
    +
    +  def reset(): Unit = {
    +    keyToIndex.clear()
    +    keys.clear()
    +    values.clear()
    +  }
    +
    +  def put(key: Any, value: Any): Unit = {
    +    if (key == null) {
    +      throw new RuntimeException("Cannot use null as map key.")
    +    }
    +
    +    val maybeExistingIdx = keyToIndex.get(key)
    +    if (maybeExistingIdx.isDefined) {
    +      // Overwrite the previous value, as the policy is last wins.
    +      values(maybeExistingIdx.get) = value
    +    } else {
    +      keyToIndex.put(key, values.length)
    +      keys.append(key)
    +      values.append(value)
    +    }
    +  }
    +
    +  // write a 2-field row, the first field is key and the second field is value.
    +  def put(entry: InternalRow): Unit = {
    +    if (entry.isNullAt(0)) {
    +      throw new RuntimeException("Cannot use null as map key.")
    +    }
    +    put(keyGetter(entry, 0), valueGetter(entry, 1))
    +  }
    +
    +  def putAll(keyArray: Array[Any], valueArray: Array[Any]): Unit = {
    +    if (keyArray.length != valueArray.length) {
    +      throw new RuntimeException(
    +        "The key array and value array of MapData must have the same length.")
    +    }
    +
    +    var i = 0
    +    while (i < keyArray.length) {
    +      put(keyArray(i), valueArray(i))
    +      i += 1
    +    }
    +  }
    +
    +  def putAll(keyArray: ArrayData, valueArray: ArrayData): Unit = {
    +    if (keyArray.numElements() != valueArray.numElements()) {
    +      throw new RuntimeException(
    +        "The key array and value array of MapData must have the same length.")
    +    }
    +
    +    var i = 0
    +    while (i < keyArray.numElements()) {
    +      put(keyGetter(keyArray, i), valueGetter(valueArray, i))
    +      i += 1
    +    }
    +  }
    +
    +  def build(): ArrayBasedMapData = {
    +    new ArrayBasedMapData(new GenericArrayData(keys.toArray), new GenericArrayData(values.toArray))
    --- End diff --
    
    
    
    Is it better to call reset() after calling new ArrayBasedMapData, to reduce memory consumption in the Java heap?
    
    On the caller side, the ArrayBasedMapBuilder is not released. Therefore, until reset() is called the next time, each ArrayBasedMapBuilder keeps unused data in keys, values, and keyToIndex, which consumes Java heap unexpectedly.
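
    A minimal sketch of that suggestion (illustrative):

    ```scala
    def build(): ArrayBasedMapData = {
      val map = new ArrayBasedMapData(
        new GenericArrayData(keys.toArray), new GenericArrayData(values.toArray))
      // drop references so a long-lived builder doesn't pin stale
      // keys/values/keyToIndex entries on the heap until the next reset()
      reset()
      map
    }
    ```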



---



[GitHub] spark issue #23124: [SPARK-25829][SQL] remove duplicated map keys with last ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/23124
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/5303/
    Test PASSed.


---



[GitHub] spark issue #23124: [SPARK-25829][SQL] remove duplicated map keys with last ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/23124
  
    **[Test build #99368 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/99368/testReport)** for PR 23124 at commit [`72c771e`](https://github.com/apache/spark/commit/72c771e691ae1071742bde5a612593d38f147ff5).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #23124: [SPARK-25829][SQL] remove duplicated map keys with last ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/23124
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #23124: [SPARK-25829][SQL] remove duplicated map keys with last ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/23124
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/99368/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #23124: [SPARK-25829][SQL] remove duplicated map keys wit...

Posted by kiszk <gi...@git.apache.org>.
Github user kiszk commented on a diff in the pull request:

    https://github.com/apache/spark/pull/23124#discussion_r236376102
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala ---
    @@ -89,7 +89,7 @@ class DataFrameFunctionsSuite extends QueryTest with SharedSQLContext {
         val msg1 = intercept[Exception] {
           df5.select(map_from_arrays($"k", $"v")).collect
         }.getMessage
    -    assert(msg1.contains("Cannot use null as map key!"))
    +    assert(msg1.contains("Cannot use null as map key"))
    --- End diff --
    
    The message at line 98 is also changed now.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #23124: [SPARK-25829][SQL] remove duplicated map keys with last ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/23124
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #23124: [SPARK-25829][SQL] remove duplicated map keys with last ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/23124
  
    **[Test build #99362 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/99362/testReport)** for PR 23124 at commit [`6dff654`](https://github.com/apache/spark/commit/6dff6545f272e0d5117ac17fdc27b686573c5626).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #23124: [SPARK-25829][SQL] remove duplicated map keys wit...

Posted by mgaido91 <gi...@git.apache.org>.
Github user mgaido91 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/23124#discussion_r235865179
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala ---
    @@ -546,33 +546,29 @@ case class MapConcat(children: Seq[Expression]) extends ComplexTypeMergingExpres
     
       override def nullable: Boolean = children.exists(_.nullable)
     
    +  private lazy val mapBuilder = new ArrayBasedMapBuilder(dataType.keyType, dataType.valueType)
    +
       override def eval(input: InternalRow): Any = {
    -    val maps = children.map(_.eval(input))
    +    val maps = children.map(_.eval(input).asInstanceOf[MapData]).toArray
    --- End diff --
    
    why do we need `toArray` here?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #23124: [SPARK-25829][SQL] remove duplicated map keys with last ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/23124
  
    Merged build finished. Test FAILed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #23124: [SPARK-25829][SQL] remove duplicated map keys with last ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/23124
  
    **[Test build #99357 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/99357/testReport)** for PR 23124 at commit [`6dff654`](https://github.com/apache/spark/commit/6dff6545f272e0d5117ac17fdc27b686573c5626).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #23124: [SPARK-25829][SQL] remove duplicated map keys with last ...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on the issue:

    https://github.com/apache/spark/pull/23124
  
    If it's not too verbose, we can update the `ExpressionDescription` of the built-in expressions to document the last-wins policy.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #23124: [SPARK-25829][SQL] remove duplicated map keys with last ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/23124
  
    Merged build finished. Test FAILed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #23124: [SPARK-25829][SQL] remove duplicated map keys with last ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/23124
  
    **[Test build #99310 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/99310/testReport)** for PR 23124 at commit [`abd0ec5`](https://github.com/apache/spark/commit/abd0ec543a944fa02320337f4fab7fff6ffe9667).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #23124: [SPARK-25829][SQL] remove duplicated map keys wit...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/23124#discussion_r235928588
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala ---
    @@ -646,34 +633,35 @@ case class MapConcat(children: Seq[Expression]) extends ComplexTypeMergingExpres
     
         val mapMerge =
           s"""
    -        |${ev.isNull} = $hasNullName;
    -        |if (!${ev.isNull}) {
    -        |  $arrayDataClass[] $keyArgsName = new $arrayDataClass[${mapCodes.size}];
    -        |  $arrayDataClass[] $valArgsName = new $arrayDataClass[${mapCodes.size}];
    -        |  long $numElementsName = 0;
    -        |  for (int $idxName = 0; $idxName < $argsName.length; $idxName++) {
    -        |    $keyArgsName[$idxName] = $argsName[$idxName].keyArray();
    -        |    $valArgsName[$idxName] = $argsName[$idxName].valueArray();
    -        |    $numElementsName += $argsName[$idxName].numElements();
    -        |  }
    -        |  if ($numElementsName > ${ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH}) {
    -        |    throw new RuntimeException("Unsuccessful attempt to concat maps with " +
    -        |       $numElementsName + " elements due to exceeding the map size limit " +
    -        |       "${ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH}.");
    -        |  }
    -        |  $arrayDataClass $finKeysName = $keyConcat($keyArgsName,
    -        |    (int) $numElementsName);
    -        |  $arrayDataClass $finValsName = $valueConcat($valArgsName,
    -        |    (int) $numElementsName);
    -        |  ${ev.value} = new $arrayBasedMapDataClass($finKeysName, $finValsName);
    +        |ArrayData[] $keyArgsName = new ArrayData[${mapCodes.size}];
    +        |ArrayData[] $valArgsName = new ArrayData[${mapCodes.size}];
    +        |long $numElementsName = 0;
    +        |for (int $idxName = 0; $idxName < $argsName.length; $idxName++) {
    +        |  $keyArgsName[$idxName] = $argsName[$idxName].keyArray();
    +        |  $valArgsName[$idxName] = $argsName[$idxName].valueArray();
    +        |  $numElementsName += $argsName[$idxName].numElements();
             |}
    +        |if ($numElementsName > ${ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH}) {
    --- End diff --
    
    This check is done before the `putAll`, so that it can fail fast. I think it's fine to ignore duplicated keys here, to make it more conservative.
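
    In caller terms, the shape is roughly the following (a simplified Scala sketch of the generated Java above, not the PR's code; `maps` and `mapBuilder` are taken from the surrounding diff):

        // Fail fast on the raw total before building anything, accepting that
        // de-duplication may shrink the final map to below the limit.
        var total = 0L
        maps.foreach(m => total += m.numElements())
        if (total > ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH) {
          throw new RuntimeException(s"Unsuccessful attempt to concat maps with " +
            s"$total elements due to exceeding the map size limit.")
        }
        maps.foreach(m => mapBuilder.putAll(m.keyArray(), m.valueArray()))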


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #23124: [SPARK-25829][SQL] remove duplicated map keys with last ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/23124
  
    Merged build finished. Test FAILed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #23124: [SPARK-25829][SQL] remove duplicated map keys wit...

Posted by kiszk <gi...@git.apache.org>.
Github user kiszk commented on a diff in the pull request:

    https://github.com/apache/spark/pull/23124#discussion_r235950148
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ArrayBasedMapBuilder.scala ---
    @@ -0,0 +1,118 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.catalyst.util
    +
    +import scala.collection.mutable
    +
    +import org.apache.spark.sql.catalyst.InternalRow
    +import org.apache.spark.sql.types.{AtomicType, CalendarIntervalType, DataType, MapType}
    +
    +/**
    + * A builder of [[ArrayBasedMapData]], which fails if a null map key is detected, and removes
    + * duplicated map keys w.r.t. the last wins policy.
    + */
    +class ArrayBasedMapBuilder(keyType: DataType, valueType: DataType) extends Serializable {
    +  assert(!keyType.existsRecursively(_.isInstanceOf[MapType]), "key of map cannot be/contain map")
    +
    +  private lazy val keyToIndex = keyType match {
    +    case _: AtomicType | _: CalendarIntervalType => mutable.HashMap.empty[Any, Int]
    +    case _ =>
    +      // for complex types, use interpreted ordering to be able to compare unsafe data with safe
    +      // data, e.g. UnsafeRow vs GenericInternalRow.
    +      mutable.TreeMap.empty[Any, Int](TypeUtils.getInterpretedOrdering(keyType))
    +  }
    +
    +  // TODO: specialize it
    +  private lazy val keys = mutable.ArrayBuffer.empty[Any]
    +  private lazy val values = mutable.ArrayBuffer.empty[Any]
    +
    +  private lazy val keyGetter = InternalRow.getAccessor(keyType)
    +  private lazy val valueGetter = InternalRow.getAccessor(valueType)
    +
    +  def reset(): Unit = {
    +    keyToIndex.clear()
    +    keys.clear()
    +    values.clear()
    +  }
    +
    +  def put(key: Any, value: Any): Unit = {
    +    if (key == null) {
    +      throw new RuntimeException("Cannot use null as map key.")
    +    }
    +
    +    val maybeExistingIdx = keyToIndex.get(key)
    +    if (maybeExistingIdx.isDefined) {
    +      // Overwrite the previous value, as the policy is last wins.
    +      values(maybeExistingIdx.get) = value
    +    } else {
    +      keyToIndex.put(key, values.length)
    +      keys.append(key)
    +      values.append(value)
    +    }
    +  }
    +
    +  // write a 2-field row, the first field is key and the second field is value.
    +  def put(entry: InternalRow): Unit = {
    +    if (entry.isNullAt(0)) {
    +      throw new RuntimeException("Cannot use null as map key.")
    +    }
    +    put(keyGetter(entry, 0), valueGetter(entry, 1))
    +  }
    +
    +  def putAll(keyArray: Array[Any], valueArray: Array[Any]): Unit = {
    +    if (keyArray.length != valueArray.length) {
    +      throw new RuntimeException(
    +        "The key array and value array of MapData must have the same length.")
    +    }
    +
    +    var i = 0
    +    while (i < keyArray.length) {
    +      put(keyArray(i), valueArray(i))
    +      i += 1
    +    }
    +  }
    +
    +  def putAll(keyArray: ArrayData, valueArray: ArrayData): Unit = {
    +    if (keyArray.numElements() != valueArray.numElements()) {
    +      throw new RuntimeException(
    +        "The key array and value array of MapData must have the same length.")
    +    }
    +
    +    var i = 0
    +    while (i < keyArray.numElements()) {
    +      put(keyGetter(keyArray, i), valueGetter(valueArray, i))
    +      i += 1
    +    }
    +  }
    +
    +  def build(): ArrayBasedMapData = {
    +    new ArrayBasedMapData(new GenericArrayData(keys.toArray), new GenericArrayData(values.toArray))
    +  }
    --- End diff --
    
    Is it better to call `reset()` after calling `new ArrayBasedMapData`, to reduce memory consumption?
    
    On the caller side, the `ArrayBasedMapBuilder` is not released. Therefore, until `reset()` is called the next time, each `ArrayBasedMapBuilder` keeps unused data in `keys`, `values`, and `keyToIndex`, which consumes Java heap unexpectedly.
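
    A minimal sketch of that suggestion against the quoted `build()` (illustrative only; whether the copy-then-clear trade-off is worth it is exactly the question here):

        def build(): ArrayBasedMapData = {
          val map = new ArrayBasedMapData(
            new GenericArrayData(keys.toArray), new GenericArrayData(values.toArray))
          // keys.toArray / values.toArray already copied the data, so the buffered
          // entries can be dropped eagerly instead of waiting for the next reset().
          reset()
          map
        }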


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #23124: [SPARK-25829][SQL] remove duplicated map keys wit...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/23124#discussion_r236275822
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala ---
    @@ -546,33 +546,29 @@ case class MapConcat(children: Seq[Expression]) extends ComplexTypeMergingExpres
     
       override def nullable: Boolean = children.exists(_.nullable)
     
    +  private lazy val mapBuilder = new ArrayBasedMapBuilder(dataType.keyType, dataType.valueType)
    +
       override def eval(input: InternalRow): Any = {
    -    val maps = children.map(_.eval(input))
    +    val maps = children.map(_.eval(input).asInstanceOf[MapData]).toArray
    --- End diff --
    
    `toArray` is O(N), but we do it only once. If accessing by index were O(N), the total time complexity would be O(N^2).
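
    A self-contained illustration of the complexity concern (generic Scala, not the actual Spark code path): after the one-time O(N) copy, every indexed access in the `while` loop is O(1).

        object IndexedAccess {
          def main(args: Array[String]): Unit = {
            val n = 10000
            val asList: Seq[Int] = List.fill(n)(1) // linear Seq: apply(i) walks i nodes
            val asArray = asList.toArray           // one O(N) copy up front

            var i = 0
            var s1 = 0
            while (i < n) { s1 += asList(i); i += 1 }  // O(N^2) overall
            var j = 0
            var s2 = 0
            while (j < n) { s2 += asArray(j); j += 1 } // O(N) overall
            assert(s1 == s2)
          }
        }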


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #23124: [SPARK-25829][SQL] remove duplicated map keys wit...

Posted by mgaido91 <gi...@git.apache.org>.
Github user mgaido91 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/23124#discussion_r235866111
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala ---
    @@ -646,34 +633,35 @@ case class MapConcat(children: Seq[Expression]) extends ComplexTypeMergingExpres
     
         val mapMerge =
           s"""
    -        |${ev.isNull} = $hasNullName;
    -        |if (!${ev.isNull}) {
    -        |  $arrayDataClass[] $keyArgsName = new $arrayDataClass[${mapCodes.size}];
    -        |  $arrayDataClass[] $valArgsName = new $arrayDataClass[${mapCodes.size}];
    -        |  long $numElementsName = 0;
    -        |  for (int $idxName = 0; $idxName < $argsName.length; $idxName++) {
    -        |    $keyArgsName[$idxName] = $argsName[$idxName].keyArray();
    -        |    $valArgsName[$idxName] = $argsName[$idxName].valueArray();
    -        |    $numElementsName += $argsName[$idxName].numElements();
    -        |  }
    -        |  if ($numElementsName > ${ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH}) {
    -        |    throw new RuntimeException("Unsuccessful attempt to concat maps with " +
    -        |       $numElementsName + " elements due to exceeding the map size limit " +
    -        |       "${ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH}.");
    -        |  }
    -        |  $arrayDataClass $finKeysName = $keyConcat($keyArgsName,
    -        |    (int) $numElementsName);
    -        |  $arrayDataClass $finValsName = $valueConcat($valArgsName,
    -        |    (int) $numElementsName);
    -        |  ${ev.value} = new $arrayBasedMapDataClass($finKeysName, $finValsName);
    +        |ArrayData[] $keyArgsName = new ArrayData[${mapCodes.size}];
    +        |ArrayData[] $valArgsName = new ArrayData[${mapCodes.size}];
    +        |long $numElementsName = 0;
    +        |for (int $idxName = 0; $idxName < $argsName.length; $idxName++) {
    +        |  $keyArgsName[$idxName] = $argsName[$idxName].keyArray();
    +        |  $valArgsName[$idxName] = $argsName[$idxName].valueArray();
    +        |  $numElementsName += $argsName[$idxName].numElements();
             |}
    +        |if ($numElementsName > ${ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH}) {
    --- End diff --
    
    this check is not really correct, as it doesn't account for duplicates IIUC. I think we can change this behavior by using `putAll` and checking the size inside the loop.
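
    A minimal sketch of that alternative on a simplified builder (the names and the tiny limit are placeholders, not Spark's actual classes or constants):

        import scala.collection.mutable

        object SizeCheckedPut {
          val MaxEntries = 4 // stand-in for ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH

          def main(args: Array[String]): Unit = {
            val keyToIndex = mutable.HashMap.empty[Int, Int]
            val keys = mutable.ArrayBuffer.empty[Int]
            val values = mutable.ArrayBuffer.empty[Int]

            def put(key: Int, value: Int): Unit = keyToIndex.get(key) match {
              case Some(idx) => values(idx) = value // duplicate: overwrite, size unchanged
              case None =>
                // check the de-duplicated size as we go, instead of the raw input size
                if (keys.length + 1 > MaxEntries) {
                  throw new RuntimeException(s"map exceeds size limit $MaxEntries")
                }
                keyToIndex.put(key, keys.length)
                keys += key
                values += value
            }

            (1 to 8).foreach(i => put(i % 4, i)) // 8 raw entries collapse to 4 keys
            println(keys.zip(values).toMap)      // Map(1 -> 5, 2 -> 6, 3 -> 7, 0 -> 8)
          }
        }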


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #23124: [SPARK-25829][SQL] remove duplicated map keys with last ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/23124
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/5393/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #23124: [SPARK-25829][SQL] remove duplicated map keys wit...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/23124#discussion_r235929748
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ArrayBasedMapBuilder.scala ---
    @@ -0,0 +1,118 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.catalyst.util
    +
    +import scala.collection.mutable
    +
    +import org.apache.spark.sql.catalyst.InternalRow
    +import org.apache.spark.sql.types.{AtomicType, CalendarIntervalType, DataType, MapType}
    +
    +/**
    + * A builder of [[ArrayBasedMapData]], which fails if a null map key is detected, and removes
    + * duplicated map keys w.r.t. the last wins policy.
    + */
    +class ArrayBasedMapBuilder(keyType: DataType, valueType: DataType) extends Serializable {
    +  assert(!keyType.existsRecursively(_.isInstanceOf[MapType]), "key of map cannot be/contain map")
    +
    +  private lazy val keyToIndex = keyType match {
    +    case _: AtomicType | _: CalendarIntervalType => mutable.HashMap.empty[Any, Int]
    +    case _ =>
    +      // for complex types, use interpreted ordering to be able to compare unsafe data with safe
    +      // data, e.g. UnsafeRow vs GenericInternalRow.
    +      mutable.TreeMap.empty[Any, Int](TypeUtils.getInterpretedOrdering(keyType))
    --- End diff --
    
    I think we should fail it at the analyzer phase, and other map-producing functions should do so as well. Can you create a JIRA for it? thanks!


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #23124: [SPARK-25829][SQL] remove duplicated map keys with last ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/23124
  
    Merged build finished. Test FAILed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #23124: [SPARK-25829][SQL] remove duplicated map keys wit...

Posted by mgaido91 <gi...@git.apache.org>.
Github user mgaido91 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/23124#discussion_r236171035
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala ---
    @@ -646,34 +633,35 @@ case class MapConcat(children: Seq[Expression]) extends ComplexTypeMergingExpres
     
         val mapMerge =
           s"""
    -        |${ev.isNull} = $hasNullName;
    -        |if (!${ev.isNull}) {
    -        |  $arrayDataClass[] $keyArgsName = new $arrayDataClass[${mapCodes.size}];
    -        |  $arrayDataClass[] $valArgsName = new $arrayDataClass[${mapCodes.size}];
    -        |  long $numElementsName = 0;
    -        |  for (int $idxName = 0; $idxName < $argsName.length; $idxName++) {
    -        |    $keyArgsName[$idxName] = $argsName[$idxName].keyArray();
    -        |    $valArgsName[$idxName] = $argsName[$idxName].valueArray();
    -        |    $numElementsName += $argsName[$idxName].numElements();
    -        |  }
    -        |  if ($numElementsName > ${ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH}) {
    -        |    throw new RuntimeException("Unsuccessful attempt to concat maps with " +
    -        |       $numElementsName + " elements due to exceeding the map size limit " +
    -        |       "${ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH}.");
    -        |  }
    -        |  $arrayDataClass $finKeysName = $keyConcat($keyArgsName,
    -        |    (int) $numElementsName);
    -        |  $arrayDataClass $finValsName = $valueConcat($valArgsName,
    -        |    (int) $numElementsName);
    -        |  ${ev.value} = new $arrayBasedMapDataClass($finKeysName, $finValsName);
    +        |ArrayData[] $keyArgsName = new ArrayData[${mapCodes.size}];
    +        |ArrayData[] $valArgsName = new ArrayData[${mapCodes.size}];
    +        |long $numElementsName = 0;
    +        |for (int $idxName = 0; $idxName < $argsName.length; $idxName++) {
    +        |  $keyArgsName[$idxName] = $argsName[$idxName].keyArray();
    +        |  $valArgsName[$idxName] = $argsName[$idxName].valueArray();
    +        |  $numElementsName += $argsName[$idxName].numElements();
             |}
    +        |if ($numElementsName > ${ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH}) {
    --- End diff --
    
    I see, I agree with doing it in a follow-up, thanks.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #23124: [SPARK-25829][SQL] remove duplicated map keys wit...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/23124#discussion_r235927895
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala ---
    @@ -546,33 +546,29 @@ case class MapConcat(children: Seq[Expression]) extends ComplexTypeMergingExpres
     
       override def nullable: Boolean = children.exists(_.nullable)
     
    +  private lazy val mapBuilder = new ArrayBasedMapBuilder(dataType.keyType, dataType.valueType)
    +
       override def eval(input: InternalRow): Any = {
    -    val maps = children.map(_.eval(input))
    +    val maps = children.map(_.eval(input).asInstanceOf[MapData]).toArray
    --- End diff --
    
    I need to access it by index below, so I turn it into an array to guarantee that the access is O(1).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #23124: [SPARK-25829][SQL] remove duplicated map keys wit...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/23124


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #23124: [SPARK-25829][SQL] remove duplicated map keys with last ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/23124
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/5359/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #23124: [SPARK-25829][SQL] remove duplicated map keys wit...

Posted by dongjoon-hyun <gi...@git.apache.org>.
Github user dongjoon-hyun commented on a diff in the pull request:

    https://github.com/apache/spark/pull/23124#discussion_r235851554
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ArrayBasedMapBuilder.scala ---
    @@ -0,0 +1,118 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.catalyst.util
    +
    +import scala.collection.mutable
    +
    +import org.apache.spark.sql.catalyst.InternalRow
    +import org.apache.spark.sql.types.{AtomicType, CalendarIntervalType, DataType, MapType}
    +
    +/**
    + * A builder of [[ArrayBasedMapData]], which fails if a null map key is detected, and removes
    + * duplicated map keys w.r.t. the last wins policy.
    + */
    +class ArrayBasedMapBuilder(keyType: DataType, valueType: DataType) extends Serializable {
    +  assert(!keyType.existsRecursively(_.isInstanceOf[MapType]), "key of map cannot be/contain map")
    --- End diff --
    
    Shall we add an assert to prevent `NullType` here, too?
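
    For reference, a sketch of what that could look like next to the existing assert (hypothetical, not in the PR as posted; `NullType` would need to be imported):

        assert(!keyType.existsRecursively(_.isInstanceOf[NullType]),
          "key of map cannot be/contain null type")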


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #23124: [SPARK-25829][SQL] remove duplicated map keys wit...

Posted by ueshin <gi...@git.apache.org>.
Github user ueshin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/23124#discussion_r236973308
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ArrayBasedMapData.scala ---
    @@ -19,6 +19,12 @@ package org.apache.spark.sql.catalyst.util
     
     import java.util.{Map => JavaMap}
     
    +/**
    + * A simple `MapData` implementation which is backed by 2 arrays.
    + *
    + * Note that, user is responsible to guarantee that the key array does not have duplicated
    + * elements, otherwise the behavior is undefined.
    --- End diff --
    
    nit: we might need to add the same note to the 3rd and 4th `ArrayBasedMapData.apply()` methods.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #23124: [SPARK-25829][SQL] remove duplicated map keys with last ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/23124
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #23124: [SPARK-25829][SQL] remove duplicated map keys with last ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/23124
  
    Merged build finished. Test FAILed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #23124: [SPARK-25829][SQL] remove duplicated map keys with last ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/23124
  
    **[Test build #99325 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/99325/testReport)** for PR 23124 at commit [`b7073b2`](https://github.com/apache/spark/commit/b7073b29b0d4316dc783f0b5d7d79b134d7558c4).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #23124: [SPARK-25829][SQL] remove duplicated map keys with last ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/23124
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/99325/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #23124: [SPARK-25829][SQL] remove duplicated map keys wit...

Posted by kiszk <gi...@git.apache.org>.
Github user kiszk commented on a diff in the pull request:

    https://github.com/apache/spark/pull/23124#discussion_r235943290
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala ---
    @@ -751,171 +739,46 @@ case class MapFromEntries(child: Expression) extends UnaryExpression {
           s"${child.dataType.catalogString} type. $prettyName accepts only arrays of pair structs.")
       }
     
    +  private lazy val mapBuilder = new ArrayBasedMapBuilder(dataType.keyType, dataType.valueType)
    +
       override protected def nullSafeEval(input: Any): Any = {
    -    val arrayData = input.asInstanceOf[ArrayData]
    -    val numEntries = arrayData.numElements()
    +    val entries = input.asInstanceOf[ArrayData]
    +    val numEntries = entries.numElements()
         var i = 0
    -    if(nullEntries) {
    +    if (nullEntries) {
           while (i < numEntries) {
    -        if (arrayData.isNullAt(i)) return null
    +        if (entries.isNullAt(i)) return null
             i += 1
           }
         }
    -    val keyArray = new Array[AnyRef](numEntries)
    -    val valueArray = new Array[AnyRef](numEntries)
    +
    +    mapBuilder.reset()
         i = 0
         while (i < numEntries) {
    -      val entry = arrayData.getStruct(i, 2)
    -      val key = entry.get(0, dataType.keyType)
    -      if (key == null) {
    -        throw new RuntimeException("The first field from a struct (key) can't be null.")
    -      }
    -      keyArray.update(i, key)
    -      val value = entry.get(1, dataType.valueType)
    -      valueArray.update(i, value)
    +      mapBuilder.put(entries.getStruct(i, 2))
           i += 1
         }
    -    ArrayBasedMapData(keyArray, valueArray)
    +    mapBuilder.build()
       }
     
       override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
         nullSafeCodeGen(ctx, ev, c => {
           val numEntries = ctx.freshName("numEntries")
    -      val isKeyPrimitive = CodeGenerator.isPrimitiveType(dataType.keyType)
    -      val isValuePrimitive = CodeGenerator.isPrimitiveType(dataType.valueType)
    -      val code = if (isKeyPrimitive && isValuePrimitive) {
    -        genCodeForPrimitiveElements(ctx, c, ev.value, numEntries)
    --- End diff --
    
    This change allows us to focus on optimizing `ArrayBasedMapBuilder`.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #23124: [SPARK-25829][SQL] remove duplicated map keys with last ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/23124
  
    **[Test build #99209 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/99209/testReport)** for PR 23124 at commit [`cbcd5d7`](https://github.com/apache/spark/commit/cbcd5d7a937f8120ef8527f1f26150ed93f1de0a).
     * This patch **fails due to an unknown error code, -9**.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `class ArrayBasedMapBuilder(keyType: DataType, valueType: DataType) extends Serializable `


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #23124: [SPARK-25829][SQL] remove duplicated map keys with last ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/23124
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #23124: [SPARK-25829][SQL] remove duplicated map keys with last ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/23124
  
    **[Test build #99310 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/99310/testReport)** for PR 23124 at commit [`abd0ec5`](https://github.com/apache/spark/commit/abd0ec543a944fa02320337f4fab7fff6ffe9667).
     * This patch **fails due to an unknown error code, -9**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #23124: [SPARK-25829][SQL] remove duplicated map keys wit...

Posted by mgaido91 <gi...@git.apache.org>.
Github user mgaido91 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/23124#discussion_r236170759
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala ---
    @@ -546,33 +546,29 @@ case class MapConcat(children: Seq[Expression]) extends ComplexTypeMergingExpres
     
       override def nullable: Boolean = children.exists(_.nullable)
     
    +  private lazy val mapBuilder = new ArrayBasedMapBuilder(dataType.keyType, dataType.valueType)
    +
       override def eval(input: InternalRow): Any = {
    -    val maps = children.map(_.eval(input))
    +    val maps = children.map(_.eval(input).asInstanceOf[MapData]).toArray
    --- End diff --
    
    Yes, but `toArray` may require an extra O(N) operation for the copy, so I am not sure the difference between `while` and `foreach` is significant enough to offset the overhead of the copy...


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #23124: [SPARK-25829][SQL] remove duplicated map keys wit...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/23124#discussion_r235849825
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala ---
    @@ -751,171 +739,46 @@ case class MapFromEntries(child: Expression) extends UnaryExpression {
           s"${child.dataType.catalogString} type. $prettyName accepts only arrays of pair structs.")
       }
     
    +  private lazy val mapBuilder = new ArrayBasedMapBuilder(dataType.keyType, dataType.valueType)
    +
       override protected def nullSafeEval(input: Any): Any = {
    -    val arrayData = input.asInstanceOf[ArrayData]
    -    val numEntries = arrayData.numElements()
    +    val entries = input.asInstanceOf[ArrayData]
    +    val numEntries = entries.numElements()
         var i = 0
    -    if(nullEntries) {
    +    if (nullEntries) {
           while (i < numEntries) {
    -        if (arrayData.isNullAt(i)) return null
    +        if (entries.isNullAt(i)) return null
             i += 1
           }
         }
    -    val keyArray = new Array[AnyRef](numEntries)
    -    val valueArray = new Array[AnyRef](numEntries)
    +
    +    mapBuilder.reset()
         i = 0
         while (i < numEntries) {
    -      val entry = arrayData.getStruct(i, 2)
    -      val key = entry.get(0, dataType.keyType)
    -      if (key == null) {
    -        throw new RuntimeException("The first field from a struct (key) can't be null.")
    -      }
    -      keyArray.update(i, key)
    -      val value = entry.get(1, dataType.valueType)
    -      valueArray.update(i, value)
    +      mapBuilder.put(entries.getStruct(i, 2))
           i += 1
         }
    -    ArrayBasedMapData(keyArray, valueArray)
    +    mapBuilder.build()
       }
     
       override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
         nullSafeCodeGen(ctx, ev, c => {
           val numEntries = ctx.freshName("numEntries")
    -      val isKeyPrimitive = CodeGenerator.isPrimitiveType(dataType.keyType)
    -      val isValuePrimitive = CodeGenerator.isPrimitiveType(dataType.valueType)
    -      val code = if (isKeyPrimitive && isValuePrimitive) {
    -        genCodeForPrimitiveElements(ctx, c, ev.value, numEntries)
    --- End diff --
    
    since we need to check for duplicated map keys, it's not possible to apply this trick anymore, as we need to overwrite values if the key has appeared before.
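
    A small self-contained sketch of why pre-sized sequential writes break once duplicates must be folded (plain Scala, not the generated code):

        object LastWins {
          def main(args: Array[String]): Unit = {
            // 3 input entries, but only 2 output entries once the duplicate key folds
            val entries = Seq((1, "a"), (2, "b"), (1, "c"))
            val deduped = entries.foldLeft(Vector.empty[(Int, String)]) {
              case (acc, (k, v)) => acc.indexWhere(_._1 == k) match {
                case -1  => acc :+ ((k, v))
                case idx => acc.updated(idx, (k, v)) // last wins: overwrite in place
              }
            }
            println(deduped) // Vector((1,c), (2,b)) -- output size != input size
          }
        }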


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #23124: [SPARK-25829][SQL] remove duplicated map keys with last ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/23124
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/99362/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #23124: [SPARK-25829][SQL] remove duplicated map keys with last ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/23124
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #23124: [SPARK-25829][SQL] remove duplicated map keys with last ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/23124
  
    **[Test build #99304 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/99304/testReport)** for PR 23124 at commit [`0c77c41`](https://github.com/apache/spark/commit/0c77c4173606edb09d4d2c7a721dee48c976233f).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #23124: [SPARK-25829][SQL] remove duplicated map keys with last ...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on the issue:

    https://github.com/apache/spark/pull/23124
  
    cc @dongjoon-hyun @gatorsmile @viirya @kiszk @mgaido91 


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #23124: [SPARK-25829][SQL] remove duplicated map keys wit...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/23124#discussion_r236112256
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala ---
    @@ -546,33 +546,29 @@ case class MapConcat(children: Seq[Expression]) extends ComplexTypeMergingExpres
     
       override def nullable: Boolean = children.exists(_.nullable)
     
    +  private lazy val mapBuilder = new ArrayBasedMapBuilder(dataType.keyType, dataType.valueType)
    +
       override def eval(input: InternalRow): Any = {
    -    val maps = children.map(_.eval(input))
    +    val maps = children.map(_.eval(input).asInstanceOf[MapData]).toArray
    --- End diff --
    
    BTW, if it's not true anymore with Scala 2.12, we should update them all together with a benchmark, instead of only updating this single one.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #23124: [SPARK-25829][SQL] remove duplicated map keys wit...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/23124#discussion_r235928954
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ArrayBasedMapBuilder.scala ---
    @@ -0,0 +1,118 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.catalyst.util
    +
    +import scala.collection.mutable
    +
    +import org.apache.spark.sql.catalyst.InternalRow
    +import org.apache.spark.sql.types.{AtomicType, CalendarIntervalType, DataType, MapType}
    +
    +/**
    + * A builder of [[ArrayBasedMapData]], which fails if a null map key is detected, and removes
    + * duplicated map keys w.r.t. the last wins policy.
    + */
    +class ArrayBasedMapBuilder(keyType: DataType, valueType: DataType) extends Serializable {
    +  assert(!keyType.existsRecursively(_.isInstanceOf[MapType]), "key of map cannot be/contain map")
    +
    +  private lazy val keyToIndex = keyType match {
    +    case _: AtomicType | _: CalendarIntervalType => mutable.HashMap.empty[Any, Int]
    +    case _ =>
    +      // for complex types, use interpreted ordering to be able to compare unsafe data with safe
    +      // data, e.g. UnsafeRow vs GenericInternalRow.
    +      mutable.TreeMap.empty[Any, Int](TypeUtils.getInterpretedOrdering(keyType))
    +  }
    +
    +  // TODO: specialize it
    +  private lazy val keys = mutable.ArrayBuffer.empty[Any]
    +  private lazy val values = mutable.ArrayBuffer.empty[Any]
    +
    +  private lazy val keyGetter = InternalRow.getAccessor(keyType)
    +  private lazy val valueGetter = InternalRow.getAccessor(valueType)
    +
    +  def reset(): Unit = {
    +    keyToIndex.clear()
    +    keys.clear()
    +    values.clear()
    +  }
    +
    +  def put(key: Any, value: Any): Unit = {
    +    if (key == null) {
    +      throw new RuntimeException("Cannot use null as map key.")
    +    }
    +
    +    val maybeExistingIdx = keyToIndex.get(key)
    +    if (maybeExistingIdx.isDefined) {
    +      // Overwrite the previous value, as the policy is last wins.
    +      values(maybeExistingIdx.get) = value
    +    } else {
    +      keyToIndex.put(key, values.length)
    +      keys.append(key)
    +      values.append(value)
    +    }
    +  }
    +
    +  // write a 2-field row, the first field is key and the second field is value.
    +  def put(entry: InternalRow): Unit = {
    +    if (entry.isNullAt(0)) {
    --- End diff --
    
    There are 2 `put` methods that have this null check, and the other `put` methods all go through them.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #23124: [SPARK-25829][SQL] remove duplicated map keys with last ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/23124
  
    **[Test build #99304 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/99304/testReport)** for PR 23124 at commit [`0c77c41`](https://github.com/apache/spark/commit/0c77c4173606edb09d4d2c7a721dee48c976233f).
     * This patch **fails PySpark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #23124: [SPARK-25829][SQL] remove duplicated map keys with last ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/23124
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #23124: [SPARK-25829][SQL] remove duplicated map keys with last ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/23124
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/5439/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #23124: [SPARK-25829][SQL] remove duplicated map keys with last ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/23124
  
    Merged build finished. Test FAILed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #23124: [SPARK-25829][SQL] remove duplicated map keys with last ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/23124
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #23124: [SPARK-25829][SQL] remove duplicated map keys with last ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/23124
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #23124: [SPARK-25829][SQL] remove duplicated map keys wit...

Posted by mgaido91 <gi...@git.apache.org>.
Github user mgaido91 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/23124#discussion_r235931588
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ArrayBasedMapBuilder.scala ---
    @@ -0,0 +1,118 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.catalyst.util
    +
    +import scala.collection.mutable
    +
    +import org.apache.spark.sql.catalyst.InternalRow
    +import org.apache.spark.sql.types.{AtomicType, CalendarIntervalType, DataType, MapType}
    +
    +/**
    + * A builder of [[ArrayBasedMapData]], which fails if a null map key is detected, and removes
    + * duplicated map keys w.r.t. the last wins policy.
    + */
    +class ArrayBasedMapBuilder(keyType: DataType, valueType: DataType) extends Serializable {
    +  assert(!keyType.existsRecursively(_.isInstanceOf[MapType]), "key of map cannot be/contain map")
    +
    +  private lazy val keyToIndex = keyType match {
    +    case _: AtomicType | _: CalendarIntervalType => mutable.HashMap.empty[Any, Int]
    +    case _ =>
    +      // for complex types, use interpreted ordering to be able to compare unsafe data with safe
    +      // data, e.g. UnsafeRow vs GenericInternalRow.
    +      mutable.TreeMap.empty[Any, Int](TypeUtils.getInterpretedOrdering(keyType))
    +  }
    +
    +  // TODO: specialize it
    +  private lazy val keys = mutable.ArrayBuffer.empty[Any]
    +  private lazy val values = mutable.ArrayBuffer.empty[Any]
    +
    +  private lazy val keyGetter = InternalRow.getAccessor(keyType)
    +  private lazy val valueGetter = InternalRow.getAccessor(valueType)
    +
    +  def reset(): Unit = {
    +    keyToIndex.clear()
    +    keys.clear()
    +    values.clear()
    +  }
    +
    +  def put(key: Any, value: Any): Unit = {
    +    if (key == null) {
    +      throw new RuntimeException("Cannot use null as map key.")
    +    }
    +
    +    val maybeExistingIdx = keyToIndex.get(key)
    +    if (maybeExistingIdx.isDefined) {
    +      // Overwrite the previous value, as the policy is last wins.
    +      values(maybeExistingIdx.get) = value
    +    } else {
    +      keyToIndex.put(key, values.length)
    +      keys.append(key)
    +      values.append(value)
    +    }
    +  }
    +
    +  // write a 2-field row, the first field is key and the second field is value.
    +  def put(entry: InternalRow): Unit = {
    +    if (entry.isNullAt(0)) {
    --- End diff --
    
    Oh I see now, I missed it, thanks.


---



[GitHub] spark pull request #23124: [SPARK-25829][SQL] remove duplicated map keys wit...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/23124#discussion_r235929210
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ArrayBasedMapBuilder.scala ---
    @@ -0,0 +1,118 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.catalyst.util
    +
    +import scala.collection.mutable
    +
    +import org.apache.spark.sql.catalyst.InternalRow
    +import org.apache.spark.sql.types.{AtomicType, CalendarIntervalType, DataType, MapType}
    +
    +/**
    + * A builder of [[ArrayBasedMapData]], which fails if a null map key is detected, and removes
    + * duplicated map keys w.r.t. the last wins policy.
    + */
    +class ArrayBasedMapBuilder(keyType: DataType, valueType: DataType) extends Serializable {
    +  assert(!keyType.existsRecursively(_.isInstanceOf[MapType]), "key of map cannot be/contain map")
    +
    +  private lazy val keyToIndex = keyType match {
    +    case _: AtomicType | _: CalendarIntervalType => mutable.HashMap.empty[Any, Int]
    +    case _ =>
    +      // for complex types, use interpreted ordering to be able to compare unsafe data with safe
    +      // data, e.g. UnsafeRow vs GenericInternalRow.
    +      mutable.TreeMap.empty[Any, Int](TypeUtils.getInterpretedOrdering(keyType))
    +  }
    +
    +  // TODO: specialize it
    +  private lazy val keys = mutable.ArrayBuffer.empty[Any]
    +  private lazy val values = mutable.ArrayBuffer.empty[Any]
    +
    +  private lazy val keyGetter = InternalRow.getAccessor(keyType)
    +  private lazy val valueGetter = InternalRow.getAccessor(valueType)
    +
    +  def reset(): Unit = {
    +    keyToIndex.clear()
    +    keys.clear()
    +    values.clear()
    +  }
    +
    +  def put(key: Any, value: Any): Unit = {
    +    if (key == null) {
    +      throw new RuntimeException("Cannot use null as map key.")
    +    }
    +
    +    val maybeExistingIdx = keyToIndex.get(key)
    +    if (maybeExistingIdx.isDefined) {
    +      // Overwrite the previous value, as the policy is last wins.
    +      values(maybeExistingIdx.get) = value
    +    } else {
    +      keyToIndex.put(key, values.length)
    +      keys.append(key)
    +      values.append(value)
    +    }
    +  }
    +
    +  // write a 2-field row, the first field is key and the second field is value.
    +  def put(entry: InternalRow): Unit = {
    +    if (entry.isNullAt(0)) {
    +      throw new RuntimeException("Cannot use null as map key.")
    +    }
    +    put(keyGetter(entry, 0), valueGetter(entry, 1))
    +  }
    +
    +  def putAll(keyArray: Array[Any], valueArray: Array[Any]): Unit = {
    --- End diff --
    
    ah good catch!


---



[GitHub] spark pull request #23124: [SPARK-25829][SQL] remove duplicated map keys wit...

Posted by dongjoon-hyun <gi...@git.apache.org>.
Github user dongjoon-hyun commented on a diff in the pull request:

    https://github.com/apache/spark/pull/23124#discussion_r235851923
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala ---
    @@ -558,8 +558,11 @@ private[parquet] class ParquetRowConverter(
     
         override def getConverter(fieldIndex: Int): Converter = keyValueConverter
     
    -    override def end(): Unit =
    +    override def end(): Unit = {
    +      // The parquet map may contain null or duplicated map keys. When it happens, the behavior is
    +      // undefined.
    --- End diff --
    
    What about creating a Spark JIRA issue for this and embedding that ID here?


---



[GitHub] spark issue #23124: [SPARK-25829][SQL] remove duplicated map keys with last ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/23124
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/99304/
    Test FAILed.


---



[GitHub] spark pull request #23124: [SPARK-25829][SQL] remove duplicated map keys wit...

Posted by mgaido91 <gi...@git.apache.org>.
Github user mgaido91 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/23124#discussion_r235867070
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ArrayBasedMapBuilder.scala ---
    @@ -0,0 +1,118 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.catalyst.util
    +
    +import scala.collection.mutable
    +
    +import org.apache.spark.sql.catalyst.InternalRow
    +import org.apache.spark.sql.types.{AtomicType, CalendarIntervalType, DataType, MapType}
    +
    +/**
    + * A builder of [[ArrayBasedMapData]], which fails if a null map key is detected, and removes
    + * duplicated map keys w.r.t. the last wins policy.
    + */
    +class ArrayBasedMapBuilder(keyType: DataType, valueType: DataType) extends Serializable {
    +  assert(!keyType.existsRecursively(_.isInstanceOf[MapType]), "key of map cannot be/contain map")
    +
    +  private lazy val keyToIndex = keyType match {
    +    case _: AtomicType | _: CalendarIntervalType => mutable.HashMap.empty[Any, Int]
    +    case _ =>
    +      // for complex types, use interpreted ordering to be able to compare unsafe data with safe
    +      // data, e.g. UnsafeRow vs GenericInternalRow.
    +      mutable.TreeMap.empty[Any, Int](TypeUtils.getInterpretedOrdering(keyType))
    +  }
    +
    +  // TODO: specialize it
    +  private lazy val keys = mutable.ArrayBuffer.empty[Any]
    +  private lazy val values = mutable.ArrayBuffer.empty[Any]
    +
    +  private lazy val keyGetter = InternalRow.getAccessor(keyType)
    +  private lazy val valueGetter = InternalRow.getAccessor(valueType)
    +
    +  def reset(): Unit = {
    +    keyToIndex.clear()
    +    keys.clear()
    +    values.clear()
    +  }
    +
    +  def put(key: Any, value: Any): Unit = {
    +    if (key == null) {
    +      throw new RuntimeException("Cannot use null as map key.")
    +    }
    +
    +    val maybeExistingIdx = keyToIndex.get(key)
    +    if (maybeExistingIdx.isDefined) {
    +      // Overwrite the previous value, as the policy is last wins.
    +      values(maybeExistingIdx.get) = value
    +    } else {
    +      keyToIndex.put(key, values.length)
    +      keys.append(key)
    +      values.append(value)
    +    }
    +  }
    +
    +  // write a 2-field row, the first field is key and the second field is value.
    +  def put(entry: InternalRow): Unit = {
    +    if (entry.isNullAt(0)) {
    --- End diff --
    
    this null check is done only here and not in the other `put`/`putAll` overloads... I think we should be consistent and either always check it or never do it.


---



[GitHub] spark issue #23124: [SPARK-25829][SQL] remove duplicated map keys with last ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/23124
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/5405/
    Test PASSed.


---



[GitHub] spark issue #23124: [SPARK-25829][SQL] remove duplicated map keys with last ...

Posted by dongjoon-hyun <gi...@git.apache.org>.
Github user dongjoon-hyun commented on the issue:

    https://github.com/apache/spark/pull/23124
  
    Thank you so much, @cloud-fan !


---



[GitHub] spark pull request #23124: [SPARK-25829][SQL] remove duplicated map keys wit...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/23124#discussion_r236274428
  
    --- Diff: docs/sql-migration-guide-upgrade.md ---
    @@ -19,6 +19,8 @@ displayTitle: Spark SQL Upgrading Guide
     
      - In Spark version 2.4 and earlier, users can create map values with map type key via built-in functions like `CreateMap`, `MapFromArrays`, etc. Since Spark 3.0, it's not allowed to create map values with map type key with these built-in functions. Users can still read map values with map type key from data sources or Java/Scala collections, though they are not very useful.
     
    +  - In Spark version 2.4 and earlier, users can create a map with duplicated keys via built-in functions like `CreateMap`, `StringToMap`, etc. The behavior of a map with duplicated keys is undefined, e.g. map lookup respects the duplicated key that appears first, `Dataset.collect` only keeps the duplicated key that appears last, `MapKeys` returns duplicated keys, etc. Since Spark 3.0, these built-in functions will remove duplicated map keys with the last-wins policy.
    --- End diff --
    
    They are related, but they are not the same. For example, we don't support map type as a map key, because we can't check equality of map types correctly. This is just a current implementation limitation, and we may relax it in the future.
    
    Duplicated map keys are a real problem and we will never allow them.
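    
    For reference, a quick illustration of the new last-wins behavior (a sketch; exact output formatting may differ):
    
        // Spark 3.0: built-in functions deduplicate map keys, last wins.
        spark.sql("SELECT map(1, 'a', 1, 'b')").collect()
        // 2.4 and earlier: undefined (map lookup saw 'a', Dataset.collect kept 'b')
        // 3.0: Array(Row(Map(1 -> "b")))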


---



[GitHub] spark issue #23124: [SPARK-25829][SQL] remove duplicated map keys with last ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/23124
  
    **[Test build #99276 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/99276/testReport)** for PR 23124 at commit [`b2bfb33`](https://github.com/apache/spark/commit/b2bfb3353f89be03fc6b12e4e8dd0899e3510a09).


---



[GitHub] spark pull request #23124: [SPARK-25829][SQL] remove duplicated map keys wit...

Posted by dongjoon-hyun <gi...@git.apache.org>.
Github user dongjoon-hyun commented on a diff in the pull request:

    https://github.com/apache/spark/pull/23124#discussion_r235999040
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ArrayBasedMapBuilder.scala ---
    @@ -0,0 +1,118 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.catalyst.util
    +
    +import scala.collection.mutable
    +
    +import org.apache.spark.sql.catalyst.InternalRow
    +import org.apache.spark.sql.types.{AtomicType, CalendarIntervalType, DataType, MapType}
    +
    +/**
    + * A builder of [[ArrayBasedMapData]], which fails if a null map key is detected, and removes
    + * duplicated map keys w.r.t. the last wins policy.
    + */
    +class ArrayBasedMapBuilder(keyType: DataType, valueType: DataType) extends Serializable {
    +  assert(!keyType.existsRecursively(_.isInstanceOf[MapType]), "key of map cannot be/contain map")
    +
    +  private lazy val keyToIndex = keyType match {
    +    case _: AtomicType | _: CalendarIntervalType => mutable.HashMap.empty[Any, Int]
    +    case _ =>
    +      // for complex types, use interpreted ordering to be able to compare unsafe data with safe
    +      // data, e.g. UnsafeRow vs GenericInternalRow.
    +      mutable.TreeMap.empty[Any, Int](TypeUtils.getInterpretedOrdering(keyType))
    --- End diff --
    
    Sure.


---



[GitHub] spark issue #23124: [SPARK-25829][SQL] remove duplicated map keys with last ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/23124
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/99310/
    Test FAILed.


---



[GitHub] spark pull request #23124: [SPARK-25829][SQL] remove duplicated map keys wit...

Posted by mgaido91 <gi...@git.apache.org>.
Github user mgaido91 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/23124#discussion_r235931894
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala ---
    @@ -646,34 +633,35 @@ case class MapConcat(children: Seq[Expression]) extends ComplexTypeMergingExpres
     
         val mapMerge =
           s"""
    -        |${ev.isNull} = $hasNullName;
    -        |if (!${ev.isNull}) {
    -        |  $arrayDataClass[] $keyArgsName = new $arrayDataClass[${mapCodes.size}];
    -        |  $arrayDataClass[] $valArgsName = new $arrayDataClass[${mapCodes.size}];
    -        |  long $numElementsName = 0;
    -        |  for (int $idxName = 0; $idxName < $argsName.length; $idxName++) {
    -        |    $keyArgsName[$idxName] = $argsName[$idxName].keyArray();
    -        |    $valArgsName[$idxName] = $argsName[$idxName].valueArray();
    -        |    $numElementsName += $argsName[$idxName].numElements();
    -        |  }
    -        |  if ($numElementsName > ${ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH}) {
    -        |    throw new RuntimeException("Unsuccessful attempt to concat maps with " +
    -        |       $numElementsName + " elements due to exceeding the map size limit " +
    -        |       "${ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH}.");
    -        |  }
    -        |  $arrayDataClass $finKeysName = $keyConcat($keyArgsName,
    -        |    (int) $numElementsName);
    -        |  $arrayDataClass $finValsName = $valueConcat($valArgsName,
    -        |    (int) $numElementsName);
    -        |  ${ev.value} = new $arrayBasedMapDataClass($finKeysName, $finValsName);
    +        |ArrayData[] $keyArgsName = new ArrayData[${mapCodes.size}];
    +        |ArrayData[] $valArgsName = new ArrayData[${mapCodes.size}];
    +        |long $numElementsName = 0;
    +        |for (int $idxName = 0; $idxName < $argsName.length; $idxName++) {
    +        |  $keyArgsName[$idxName] = $argsName[$idxName].keyArray();
    +        |  $valArgsName[$idxName] = $argsName[$idxName].valueArray();
    +        |  $numElementsName += $argsName[$idxName].numElements();
             |}
    +        |if ($numElementsName > ${ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH}) {
    --- End diff --
    
    yes, but we could do the `putAll` first and fail only if we actually reach the limit. We can maybe do that in a follow-up, though, as it is not introducing any regression.
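    
    Roughly, the follow-up could look like this (a sketch; `size` on the builder is hypothetical and not part of this diff):
    
        // Dedup first, then check the limit against the deduplicated count,
        // so inputs that only exceed the limit before dedup still succeed.
        maps.foreach(map => mapBuilder.putAll(map.keyArray(), map.valueArray()))
        if (mapBuilder.size > ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH) {
          throw new RuntimeException("Unsuccessful attempt to concat maps with " +
            mapBuilder.size + " elements due to exceeding the map size limit " +
            ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH + ".")
        }
        mapBuilder.build()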


---



[GitHub] spark issue #23124: [SPARK-25829][SQL] remove duplicated map keys with last ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/23124
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/5395/
    Test PASSed.


---



[GitHub] spark issue #23124: [SPARK-25829][SQL] remove duplicated map keys with last ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/23124
  
    **[Test build #99276 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/99276/testReport)** for PR 23124 at commit [`b2bfb33`](https://github.com/apache/spark/commit/b2bfb3353f89be03fc6b12e4e8dd0899e3510a09).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark pull request #23124: [SPARK-25829][SQL] remove duplicated map keys wit...

Posted by mgaido91 <gi...@git.apache.org>.
Github user mgaido91 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/23124#discussion_r235932502
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala ---
    @@ -546,33 +546,29 @@ case class MapConcat(children: Seq[Expression]) extends ComplexTypeMergingExpres
     
       override def nullable: Boolean = children.exists(_.nullable)
     
    +  private lazy val mapBuilder = new ArrayBasedMapBuilder(dataType.keyType, dataType.valueType)
    +
       override def eval(input: InternalRow): Any = {
    -    val maps = children.map(_.eval(input))
    +    val maps = children.map(_.eval(input).asInstanceOf[MapData]).toArray
    --- End diff --
    
    well, my understanding is that we could use a `maps.foreach` instead of accessing them by index. I don't see the index as significant at all, but maybe I am missing something...
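    
    i.e. something along these lines (a sketch; null handling elided):
    
        mapBuilder.reset()  // the builder is reused across rows
        val maps = children.map(_.eval(input).asInstanceOf[MapData])
        maps.foreach(map => mapBuilder.putAll(map.keyArray(), map.valueArray()))
        mapBuilder.build()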


---



[GitHub] spark issue #23124: [SPARK-25829][SQL] remove duplicated map keys with last ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/23124
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #23124: [SPARK-25829][SQL] remove duplicated map keys with last ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/23124
  
    Merged build finished. Test PASSed.


---



[GitHub] spark pull request #23124: [SPARK-25829][SQL] remove duplicated map keys wit...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/23124#discussion_r235879585
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ArrayBasedMapBuilder.scala ---
    @@ -0,0 +1,118 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.catalyst.util
    +
    +import scala.collection.mutable
    +
    +import org.apache.spark.sql.catalyst.InternalRow
    +import org.apache.spark.sql.types.{AtomicType, CalendarIntervalType, DataType, MapType}
    +
    +/**
    + * A builder of [[ArrayBasedMapData]], which fails if a null map key is detected, and removes
    + * duplicated map keys w.r.t. the last wins policy.
    + */
    +class ArrayBasedMapBuilder(keyType: DataType, valueType: DataType) extends Serializable {
    +  assert(!keyType.existsRecursively(_.isInstanceOf[MapType]), "key of map cannot be/contain map")
    +
    +  private lazy val keyToIndex = keyType match {
    +    case _: AtomicType | _: CalendarIntervalType => mutable.HashMap.empty[Any, Int]
    +    case _ =>
    +      // for complex types, use interpreted ordering to be able to compare unsafe data with safe
    +      // data, e.g. UnsafeRow vs GenericInternalRow.
    +      mutable.TreeMap.empty[Any, Int](TypeUtils.getInterpretedOrdering(keyType))
    +  }
    +
    +  // TODO: specialize it
    +  private lazy val keys = mutable.ArrayBuffer.empty[Any]
    +  private lazy val values = mutable.ArrayBuffer.empty[Any]
    +
    +  private lazy val keyGetter = InternalRow.getAccessor(keyType)
    +  private lazy val valueGetter = InternalRow.getAccessor(valueType)
    +
    +  def reset(): Unit = {
    +    keyToIndex.clear()
    +    keys.clear()
    +    values.clear()
    +  }
    +
    +  def put(key: Any, value: Any): Unit = {
    +    if (key == null) {
    +      throw new RuntimeException("Cannot use null as map key.")
    +    }
    +
    +    val maybeExistingIdx = keyToIndex.get(key)
    +    if (maybeExistingIdx.isDefined) {
    +      // Overwrite the previous value, as the policy is last wins.
    +      values(maybeExistingIdx.get) = value
    +    } else {
    +      keyToIndex.put(key, values.length)
    +      keys.append(key)
    +      values.append(value)
    +    }
    +  }
    +
    +  // write a 2-field row, the first field is key and the second field is value.
    +  def put(entry: InternalRow): Unit = {
    +    if (entry.isNullAt(0)) {
    +      throw new RuntimeException("Cannot use null as map key.")
    +    }
    +    put(keyGetter(entry, 0), valueGetter(entry, 1))
    +  }
    +
    +  def putAll(keyArray: Array[Any], valueArray: Array[Any]): Unit = {
    --- End diff --
    
    Is this method used anywhere? Looks like only the other `putAll` below is used.


---



[GitHub] spark issue #23124: [SPARK-25829][SQL] remove duplicated map keys with last ...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on the issue:

    https://github.com/apache/spark/pull/23124
  
    thanks, merging to master!


---



[GitHub] spark pull request #23124: [SPARK-25829][SQL] remove duplicated map keys wit...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/23124#discussion_r236962499
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ArrayBasedMapBuilder.scala ---
    @@ -0,0 +1,118 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.catalyst.util
    +
    +import scala.collection.mutable
    +
    +import org.apache.spark.sql.catalyst.InternalRow
    +import org.apache.spark.sql.types._
    +
    +/**
    + * A builder of [[ArrayBasedMapData]], which fails if a null map key is detected, and removes
    + * duplicated map keys w.r.t. the last wins policy.
    + */
    +class ArrayBasedMapBuilder(keyType: DataType, valueType: DataType) extends Serializable {
    +  assert(!keyType.existsRecursively(_.isInstanceOf[MapType]), "key of map cannot be/contain map")
    +  assert(keyType != NullType, "map key cannot be null type.")
    +
    +  private lazy val keyToIndex = keyType match {
    +    case _: AtomicType | _: CalendarIntervalType => mutable.HashMap.empty[Any, Int]
    --- End diff --
    
    I think for performance-critical code paths we should prefer Java collections. Thanks for pointing it out!
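    
    A sketch of that swap (assuming `java.util.HashMap`/`TreeMap`; since `scala.math.Ordering` implements `java.util.Comparator`, the interpreted ordering plugs in directly):
    
        import java.util.{HashMap => JavaHashMap, TreeMap => JavaTreeMap}
    
        private lazy val keyToIndex: java.util.Map[Any, Int] = keyType match {
          // Java HashMap has lower per-operation overhead than Scala's mutable.HashMap.
          case _: AtomicType | _: CalendarIntervalType => new JavaHashMap[Any, Int]()
          case _ =>
            // Ordering[Any] is a java.util.Comparator[Any], so TreeMap can use it as-is.
            new JavaTreeMap[Any, Int](TypeUtils.getInterpretedOrdering(keyType))
        }
    
    Lookups then switch from Scala's `Option`-returning `get` to `containsKey`/`get`, where `get` returns null for a missing key.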


---



[GitHub] spark pull request #23124: [SPARK-25829][SQL] remove duplicated map keys wit...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/23124#discussion_r235849697
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/InternalRow.scala ---
    @@ -125,22 +125,36 @@ object InternalRow {
        * actually takes a `SpecializedGetters` input because it can be generalized to other classes
        * that implements `SpecializedGetters` (e.g., `ArrayData`) too.
        */
    -  def getAccessor(dataType: DataType): (SpecializedGetters, Int) => Any = dataType match {
    -    case BooleanType => (input, ordinal) => input.getBoolean(ordinal)
    -    case ByteType => (input, ordinal) => input.getByte(ordinal)
    -    case ShortType => (input, ordinal) => input.getShort(ordinal)
    -    case IntegerType | DateType => (input, ordinal) => input.getInt(ordinal)
    -    case LongType | TimestampType => (input, ordinal) => input.getLong(ordinal)
    -    case FloatType => (input, ordinal) => input.getFloat(ordinal)
    -    case DoubleType => (input, ordinal) => input.getDouble(ordinal)
    -    case StringType => (input, ordinal) => input.getUTF8String(ordinal)
    -    case BinaryType => (input, ordinal) => input.getBinary(ordinal)
    -    case CalendarIntervalType => (input, ordinal) => input.getInterval(ordinal)
    -    case t: DecimalType => (input, ordinal) => input.getDecimal(ordinal, t.precision, t.scale)
    -    case t: StructType => (input, ordinal) => input.getStruct(ordinal, t.size)
    -    case _: ArrayType => (input, ordinal) => input.getArray(ordinal)
    -    case _: MapType => (input, ordinal) => input.getMap(ordinal)
    -    case u: UserDefinedType[_] => getAccessor(u.sqlType)
    -    case _ => (input, ordinal) => input.get(ordinal, dataType)
    +  def getAccessor(dt: DataType, nullable: Boolean = true): (SpecializedGetters, Int) => Any = {
    --- End diff --
    
    I can move it to a new PR if others think it's necessary. It's a little dangerous to ask the caller side to take care of null values.
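    
    For context, the nullable guard presumably wraps the type-specialized getter roughly like this (a sketch, not the exact patch):
    
        def getAccessor(dt: DataType, nullable: Boolean = true): (SpecializedGetters, Int) => Any = {
          val getValue: (SpecializedGetters, Int) => Any = dt match {
            case BooleanType => (input, ordinal) => input.getBoolean(ordinal)
            // ... remaining type-specialized cases unchanged ...
            case _ => (input, ordinal) => input.get(ordinal, dt)
          }
          if (nullable) {
            // Centralize the null check instead of relying on every caller to do it.
            (input, ordinal) => if (input.isNullAt(ordinal)) null else getValue(input, ordinal)
          } else {
            getValue
          }
        }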


---



[GitHub] spark issue #23124: [SPARK-25829][SQL] remove duplicated map keys with last ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/23124
  
    **[Test build #99213 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/99213/testReport)** for PR 23124 at commit [`cbcd5d7`](https://github.com/apache/spark/commit/cbcd5d7a937f8120ef8527f1f26150ed93f1de0a).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `class ArrayBasedMapBuilder(keyType: DataType, valueType: DataType) extends Serializable `


---



[GitHub] spark issue #23124: [SPARK-25829][SQL] remove duplicated map keys with last ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/23124
  
    **[Test build #99213 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/99213/testReport)** for PR 23124 at commit [`cbcd5d7`](https://github.com/apache/spark/commit/cbcd5d7a937f8120ef8527f1f26150ed93f1de0a).


---



[GitHub] spark issue #23124: [SPARK-25829][SQL] remove duplicated map keys with last ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/23124
  
    **[Test build #99357 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/99357/testReport)** for PR 23124 at commit [`6dff654`](https://github.com/apache/spark/commit/6dff6545f272e0d5117ac17fdc27b686573c5626).
     * This patch **fails due to an unknown error code, -9**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #23124: [SPARK-25829][SQL] remove duplicated map keys with last ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/23124
  
    **[Test build #99362 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/99362/testReport)** for PR 23124 at commit [`6dff654`](https://github.com/apache/spark/commit/6dff6545f272e0d5117ac17fdc27b686573c5626).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #23124: [SPARK-25829][SQL] remove duplicated map keys with last ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/23124
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #23124: [SPARK-25829][SQL] remove duplicated map keys with last ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/23124
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/5387/
    Test PASSed.


---



[GitHub] spark issue #23124: [SPARK-25829][SQL] remove duplicated map keys with last ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/23124
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/99276/
    Test FAILed.


---



[GitHub] spark issue #23124: [SPARK-25829][SQL] remove duplicated map keys with last ...

Posted by mgaido91 <gi...@git.apache.org>.
Github user mgaido91 commented on the issue:

    https://github.com/apache/spark/pull/23124
  
    LGTM too


---



[GitHub] spark pull request #23124: [SPARK-25829][SQL] remove duplicated map keys wit...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/23124#discussion_r236949897
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala ---
    @@ -558,8 +558,11 @@ private[parquet] class ParquetRowConverter(
     
         override def getConverter(fieldIndex: Int): Converter = keyValueConverter
     
    -    override def end(): Unit =
    +    override def end(): Unit = {
    +      // The parquet map may contain null or duplicated map keys. When it happens, the behavior is
    +      // undefined.
    --- End diff --
    
    done


---



[GitHub] spark pull request #23124: [SPARK-25829][SQL] remove duplicated map keys wit...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/23124#discussion_r235872502
  
    --- Diff: docs/sql-migration-guide-upgrade.md ---
    @@ -19,6 +19,8 @@ displayTitle: Spark SQL Upgrading Guide
     
      - In Spark version 2.4 and earlier, users can create map values with map type key via built-in functions like `CreateMap`, `MapFromArrays`, etc. Since Spark 3.0, it's not allowed to create map values with map type key with these built-in functions. Users can still read map values with map type key from data sources or Java/Scala collections, though they are not very useful.
     
    +  - In Spark version 2.4 and earlier, users can create a map with duplicated keys via built-in functions like `CreateMap`, `StringToMap`, etc. The behavior of a map with duplicated keys is undefined, e.g. map lookup respects the duplicated key that appears first, `Dataset.collect` only keeps the duplicated key that appears last, `MapKeys` returns duplicated keys, etc. Since Spark 3.0, these built-in functions will remove duplicated map keys with the last-wins policy.
    --- End diff --
    
    Similar to the above, shall we also mention that maps read from data sources can still have duplicated keys?


---



[GitHub] spark pull request #23124: [SPARK-25829][SQL] remove duplicated map keys wit...

Posted by kiszk <gi...@git.apache.org>.
Github user kiszk commented on a diff in the pull request:

    https://github.com/apache/spark/pull/23124#discussion_r236284636
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ArrayBasedMapBuilder.scala ---
    @@ -0,0 +1,118 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.catalyst.util
    +
    +import scala.collection.mutable
    +
    +import org.apache.spark.sql.catalyst.InternalRow
    +import org.apache.spark.sql.types.{AtomicType, CalendarIntervalType, DataType, MapType}
    +
    +/**
    + * A builder of [[ArrayBasedMapData]], which fails if a null map key is detected, and removes
    + * duplicated map keys w.r.t. the last wins policy.
    + */
    +class ArrayBasedMapBuilder(keyType: DataType, valueType: DataType) extends Serializable {
    +  assert(!keyType.existsRecursively(_.isInstanceOf[MapType]), "key of map cannot be/contain map")
    +
    +  private lazy val keyToIndex = keyType match {
    +    case _: AtomicType | _: CalendarIntervalType => mutable.HashMap.empty[Any, Int]
    +    case _ =>
    +      // for complex types, use interpreted ordering to be able to compare unsafe data with safe
    +      // data, e.g. UnsafeRow vs GenericInternalRow.
    +      mutable.TreeMap.empty[Any, Int](TypeUtils.getInterpretedOrdering(keyType))
    +  }
    +
    +  // TODO: specialize it
    +  private lazy val keys = mutable.ArrayBuffer.empty[Any]
    +  private lazy val values = mutable.ArrayBuffer.empty[Any]
    +
    +  private lazy val keyGetter = InternalRow.getAccessor(keyType)
    +  private lazy val valueGetter = InternalRow.getAccessor(valueType)
    +
    +  def reset(): Unit = {
    +    keyToIndex.clear()
    +    keys.clear()
    +    values.clear()
    +  }
    +
    +  def put(key: Any, value: Any): Unit = {
    +    if (key == null) {
    +      throw new RuntimeException("Cannot use null as map key.")
    +    }
    +
    +    val maybeExistingIdx = keyToIndex.get(key)
    +    if (maybeExistingIdx.isDefined) {
    +      // Overwrite the previous value, as the policy is last wins.
    +      values(maybeExistingIdx.get) = value
    +    } else {
    +      keyToIndex.put(key, values.length)
    +      keys.append(key)
    +      values.append(value)
    +    }
    +  }
    +
    +  // write a 2-field row, the first field is key and the second field is value.
    +  def put(entry: InternalRow): Unit = {
    +    if (entry.isNullAt(0)) {
    +      throw new RuntimeException("Cannot use null as map key.")
    +    }
    +    put(keyGetter(entry, 0), valueGetter(entry, 1))
    +  }
    +
    +  def putAll(keyArray: Array[Any], valueArray: Array[Any]): Unit = {
    +    if (keyArray.length != valueArray.length) {
    +      throw new RuntimeException(
    +        "The key array and value array of MapData must have the same length.")
    +    }
    +
    +    var i = 0
    +    while (i < keyArray.length) {
    +      put(keyArray(i), valueArray(i))
    +      i += 1
    +    }
    +  }
    +
    +  def putAll(keyArray: ArrayData, valueArray: ArrayData): Unit = {
    +    if (keyArray.numElements() != valueArray.numElements()) {
    +      throw new RuntimeException(
    +        "The key array and value array of MapData must have the same length.")
    +    }
    +
    +    var i = 0
    +    while (i < keyArray.numElements()) {
    +      put(keyGetter(keyArray, i), valueGetter(valueArray, i))
    +      i += 1
    +    }
    +  }
    +
    +  def build(): ArrayBasedMapData = {
    +    new ArrayBasedMapData(new GenericArrayData(keys.toArray), new GenericArrayData(values.toArray))
    +  }
    +
    +  def from(keyArray: ArrayData, valueArray: ArrayData): ArrayBasedMapData = {
    +    assert(keyToIndex.isEmpty, "'from' can only be called with a fresh ArrayBasedMapBuilder.")
    +    putAll(keyArray, valueArray)
    --- End diff --
    
    Ah, you are right.


---



[GitHub] spark issue #23124: [SPARK-25829][SQL] remove duplicated map keys with last ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/23124
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/5305/
    Test PASSed.


---



[GitHub] spark issue #23124: [SPARK-25829][SQL] remove duplicated map keys with last ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/23124
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/99213/
    Test FAILed.


---



[GitHub] spark issue #23124: [SPARK-25829][SQL] remove duplicated map keys with last ...

Posted by kiszk <gi...@git.apache.org>.
Github user kiszk commented on the issue:

    https://github.com/apache/spark/pull/23124
  
    retest this please


---



[GitHub] spark issue #23124: [SPARK-25829][SQL] remove duplicated map keys with last ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/23124
  
    Merged build finished. Test PASSed.


---



[GitHub] spark pull request #23124: [SPARK-25829][SQL] remove duplicated map keys wit...

Posted by kiszk <gi...@git.apache.org>.
Github user kiszk commented on a diff in the pull request:

    https://github.com/apache/spark/pull/23124#discussion_r235950666
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ArrayBasedMapBuilder.scala ---
    @@ -0,0 +1,118 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.catalyst.util
    +
    +import scala.collection.mutable
    +
    +import org.apache.spark.sql.catalyst.InternalRow
    +import org.apache.spark.sql.types.{AtomicType, CalendarIntervalType, DataType, MapType}
    +
    +/**
    + * A builder of [[ArrayBasedMapData]], which fails if a null map key is detected, and removes
    + * duplicated map keys w.r.t. the last wins policy.
    + */
    +class ArrayBasedMapBuilder(keyType: DataType, valueType: DataType) extends Serializable {
    +  assert(!keyType.existsRecursively(_.isInstanceOf[MapType]), "key of map cannot be/contain map")
    +
    +  private lazy val keyToIndex = keyType match {
    +    case _: AtomicType | _: CalendarIntervalType => mutable.HashMap.empty[Any, Int]
    +    case _ =>
    +      // for complex types, use interpreted ordering to be able to compare unsafe data with safe
    +      // data, e.g. UnsafeRow vs GenericInternalRow.
    +      mutable.TreeMap.empty[Any, Int](TypeUtils.getInterpretedOrdering(keyType))
    +  }
    +
    +  // TODO: specialize it
    +  private lazy val keys = mutable.ArrayBuffer.empty[Any]
    +  private lazy val values = mutable.ArrayBuffer.empty[Any]
    +
    +  private lazy val keyGetter = InternalRow.getAccessor(keyType)
    +  private lazy val valueGetter = InternalRow.getAccessor(valueType)
    +
    +  def reset(): Unit = {
    +    keyToIndex.clear()
    +    keys.clear()
    +    values.clear()
    +  }
    +
    +  def put(key: Any, value: Any): Unit = {
    +    if (key == null) {
    +      throw new RuntimeException("Cannot use null as map key.")
    +    }
    +
    +    val maybeExistingIdx = keyToIndex.get(key)
    +    if (maybeExistingIdx.isDefined) {
    +      // Overwrite the previous value, as the policy is last wins.
    +      values(maybeExistingIdx.get) = value
    +    } else {
    +      keyToIndex.put(key, values.length)
    +      keys.append(key)
    +      values.append(value)
    +    }
    +  }
    +
    +  // write a 2-field row, the first field is key and the second field is value.
    +  def put(entry: InternalRow): Unit = {
    +    if (entry.isNullAt(0)) {
    +      throw new RuntimeException("Cannot use null as map key.")
    +    }
    +    put(keyGetter(entry, 0), valueGetter(entry, 1))
    +  }
    +
    +  def putAll(keyArray: Array[Any], valueArray: Array[Any]): Unit = {
    +    if (keyArray.length != valueArray.length) {
    +      throw new RuntimeException(
    +        "The key array and value array of MapData must have the same length.")
    +    }
    +
    +    var i = 0
    +    while (i < keyArray.length) {
    +      put(keyArray(i), valueArray(i))
    +      i += 1
    +    }
    +  }
    +
    +  def putAll(keyArray: ArrayData, valueArray: ArrayData): Unit = {
    +    if (keyArray.numElements() != valueArray.numElements()) {
    +      throw new RuntimeException(
    +        "The key array and value array of MapData must have the same length.")
    +    }
    +
    +    var i = 0
    +    while (i < keyArray.numElements()) {
    +      put(keyGetter(keyArray, i), valueGetter(valueArray, i))
    +      i += 1
    +    }
    +  }
    +
    +  def build(): ArrayBasedMapData = {
    +    new ArrayBasedMapData(new GenericArrayData(keys.toArray), new GenericArrayData(values.toArray))
    +  }
    +
    +  def from(keyArray: ArrayData, valueArray: ArrayData): ArrayBasedMapData = {
    +    assert(keyToIndex.isEmpty, "'from' can only be called with a fresh ArrayBasedMapBuilder.")
    +    putAll(keyArray, valueArray)
    +    if (keyToIndex.size == keyArray.numElements()) {
    +      // If there are no duplicated map keys, create the MapData with the input key and value arrays,
    +      // as they might already be in unsafe format and are more efficient.
    +      new ArrayBasedMapData(keyArray, valueArray)
    --- End diff --
    
    ditto in `build`


---



[GitHub] spark issue #23124: [SPARK-25829][SQL] remove duplicated map keys with last ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/23124
  
    **[Test build #99312 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/99312/testReport)** for PR 23124 at commit [`abd0ec5`](https://github.com/apache/spark/commit/abd0ec543a944fa02320337f4fab7fff6ffe9667).
     * This patch **fails PySpark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---
