You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by gu...@apache.org on 2020/03/05 11:49:27 UTC

[spark] branch branch-3.0 updated: [SPARK-31019][SQL] make it clear that people can deduplicate map keys

This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new daa140d  [SPARK-31019][SQL] make it clear that people can deduplicate map keys
daa140d is described below

commit daa140df726f8521ae7bbaf4586277ae4f4aea7c
Author: Wenchen Fan <we...@databricks.com>
AuthorDate: Thu Mar 5 20:43:52 2020 +0900

    [SPARK-31019][SQL] make it clear that people can deduplicate map keys
    
    rename the config and make it non-internal.
    
    Now we fail the query if duplicated map keys are detected, and provide a legacy config to deduplicate it. However, we must provide a way to get users out of this situation, instead of just rejecting to run the query. This exit strategy should always be there, while legacy config indicates that it may be removed someday.
    
    no, just rename a config which was added in 3.0
    
    add more tests for the fail behavior.
    
    Closes #27772 from cloud-fan/map.
    
    Authored-by: Wenchen Fan <we...@databricks.com>
    Signed-off-by: HyukjinKwon <gu...@apache.org>
---
 docs/sql-migration-guide.md                        |  2 +-
 .../sql/catalyst/util/ArrayBasedMapBuilder.scala   | 20 +++---
 .../org/apache/spark/sql/internal/SQLConf.scala    | 25 +++++---
 .../expressions/CollectionExpressionsSuite.scala   | 14 ++++-
 .../catalyst/expressions/ComplexTypeSuite.scala    | 15 ++++-
 .../expressions/HigherOrderFunctionsSuite.scala    |  5 +-
 .../catalyst/util/ArrayBasedMapBuilderSuite.scala  | 71 ++++++++++++++--------
 .../apache/spark/sql/DataFrameFunctionsSuite.scala | 13 +++-
 8 files changed, 112 insertions(+), 53 deletions(-)

diff --git a/docs/sql-migration-guide.md b/docs/sql-migration-guide.md
index 0050061..6c73038 100644
--- a/docs/sql-migration-guide.md
+++ b/docs/sql-migration-guide.md
@@ -49,7 +49,7 @@ license: |
 
   - In Spark version 2.4 and earlier, float/double -0.0 is semantically equal to 0.0, but -0.0 and 0.0 are considered as different values when used in aggregate grouping keys, window partition keys and join keys. Since Spark 3.0, this bug is fixed. For example, `Seq(-0.0, 0.0).toDF("d").groupBy("d").count()` returns `[(0.0, 2)]` in Spark 3.0, and `[(0.0, 1), (-0.0, 1)]` in Spark 2.4 and earlier.
 
-  - In Spark version 2.4 and earlier, users can create a map with duplicated keys via built-in functions like `CreateMap`, `StringToMap`, etc. The behavior of map with duplicated keys is undefined, e.g. map look up respects the duplicated key appears first, `Dataset.collect` only keeps the duplicated key appears last, `MapKeys` returns duplicated keys, etc. Since Spark 3.0, new config `spark.sql.legacy.allowDuplicatedMapKeys` was added, with the default value `false`, Spark will throw Ru [...]
+  - In Spark version 2.4 and earlier, users can create a map with duplicated keys via built-in functions like `CreateMap`, `StringToMap`, etc. The behavior of map with duplicated keys is undefined, e.g. map look up respects the duplicated key appears first, `Dataset.collect` only keeps the duplicated key appears last, `MapKeys` returns duplicated keys, etc. Since Spark 3.0, Spark will throw RuntimeException while duplicated keys are found. Users can set `spark.sql.mapKeyDedupPolicy` to L [...]
 
   - In Spark version 2.4 and earlier, partition column value is converted as null if it can't be casted to corresponding user provided schema. Since 3.0, partition column value is validated with user provided schema. An exception is thrown if the validation fails. You can disable such validation by setting `spark.sql.sources.validatePartitionColumns` to `false`.
 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ArrayBasedMapBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ArrayBasedMapBuilder.scala
index 40e75b5..0185b57 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ArrayBasedMapBuilder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ArrayBasedMapBuilder.scala
@@ -21,7 +21,6 @@ import scala.collection.mutable
 
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.internal.SQLConf.LEGACY_ALLOW_DUPLICATED_MAP_KEY
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.array.ByteArrayMethods
 
@@ -49,8 +48,7 @@ class ArrayBasedMapBuilder(keyType: DataType, valueType: DataType) extends Seria
   private lazy val keyGetter = InternalRow.getAccessor(keyType)
   private lazy val valueGetter = InternalRow.getAccessor(valueType)
 
-  private val allowDuplicatedMapKey =
-    SQLConf.get.getConf(LEGACY_ALLOW_DUPLICATED_MAP_KEY)
+  private val mapKeyDedupPolicy = SQLConf.get.getConf(SQLConf.MAP_KEY_DEDUP_POLICY)
 
   def put(key: Any, value: Any): Unit = {
     if (key == null) {
@@ -67,13 +65,17 @@ class ArrayBasedMapBuilder(keyType: DataType, valueType: DataType) extends Seria
       keys.append(key)
       values.append(value)
     } else {
-      if (!allowDuplicatedMapKey) {
-        throw new RuntimeException(s"Duplicate map key $key was founded, please check the input " +
-          "data. If you want to remove the duplicated keys with last-win policy, you can set " +
-          s"${LEGACY_ALLOW_DUPLICATED_MAP_KEY.key} to true.")
+      if (mapKeyDedupPolicy == SQLConf.MapKeyDedupPolicy.EXCEPTION.toString) {
+        throw new RuntimeException(s"Duplicate map key $key was found, please check the input " +
+          "data. If you want to remove the duplicated keys, you can set " +
+          s"${SQLConf.MAP_KEY_DEDUP_POLICY.key} to ${SQLConf.MapKeyDedupPolicy.LAST_WIN} so that " +
+          "the key inserted at last takes precedence.")
+      } else if (mapKeyDedupPolicy == SQLConf.MapKeyDedupPolicy.LAST_WIN.toString) {
+        // Overwrite the previous value, as the policy is last wins.
+        values(index) = value
+      } else {
+        throw new IllegalStateException("Unknown map key dedup policy: " + mapKeyDedupPolicy)
       }
-      // Overwrite the previous value, as the policy is last wins.
-      values(index) = value
     }
   }
 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index e39d066..68a89b2 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -2095,6 +2095,21 @@ object SQLConf {
       .stringConf
       .createOptional
 
+  object MapKeyDedupPolicy extends Enumeration {
+    val EXCEPTION, LAST_WIN = Value
+  }
+
+  val MAP_KEY_DEDUP_POLICY = buildConf("spark.sql.mapKeyDedupPolicy")
+    .doc("The policy to deduplicate map keys in builtin function: CreateMap, MapFromArrays, " +
+      "MapFromEntries, StringToMap, MapConcat and TransformKeys. When EXCEPTION, the query " +
+      "fails if duplicated map keys are detected. When LAST_WIN, the map key that is inserted " +
+      "at last takes precedence.")
+    .version("3.0.0")
+    .stringConf
+    .transform(_.toUpperCase(Locale.ROOT))
+    .checkValues(MapKeyDedupPolicy.values.map(_.toString))
+    .createWithDefault(MapKeyDedupPolicy.EXCEPTION.toString)
+
   val LEGACY_LOOSE_UPCAST = buildConf("spark.sql.legacy.doLooseUpcast")
     .internal()
     .doc("When true, the upcast will be loose and allows string to atomic types.")
@@ -2202,16 +2217,6 @@ object SQLConf {
     .booleanConf
     .createWithDefault(false)
 
-  val LEGACY_ALLOW_DUPLICATED_MAP_KEY =
-    buildConf("spark.sql.legacy.allowDuplicatedMapKeys")
-      .doc("When true, use last wins policy to remove duplicated map keys in built-in functions, " +
-        "this config takes effect in below build-in functions: CreateMap, MapFromArrays, " +
-        "MapFromEntries, StringToMap, MapConcat and TransformKeys. Otherwise, if this is false, " +
-        "which is the default, Spark will throw an exception when duplicated map keys are " +
-        "detected.")
-      .booleanConf
-      .createWithDefault(false)
-
   val LEGACY_ALLOW_HASH_ON_MAPTYPE = buildConf("spark.sql.legacy.allowHashOnMapType")
     .doc("When set to true, hash expressions can be applied on elements of MapType. Otherwise, " +
       "an analysis exception will be thrown.")
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CollectionExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CollectionExpressionsSuite.scala
index 01df667..3cfc66f 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CollectionExpressionsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CollectionExpressionsSuite.scala
@@ -139,7 +139,9 @@ class CollectionExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper
       MapType(IntegerType, IntegerType, valueContainsNull = true))
     val mNull = Literal.create(null, MapType(StringType, StringType))
 
-    withSQLConf(SQLConf.LEGACY_ALLOW_DUPLICATED_MAP_KEY.key -> "true") {
+    checkExceptionInExpression[RuntimeException](
+      MapConcat(Seq(m0, m1)), "Duplicate map key")
+    withSQLConf(SQLConf.MAP_KEY_DEDUP_POLICY.key -> SQLConf.MapKeyDedupPolicy.LAST_WIN.toString) {
       // overlapping maps should remove duplicated map keys w.r.t. last win policy.
       checkEvaluation(MapConcat(Seq(m0, m1)), create_map("a" -> "4", "b" -> "2", "c" -> "3"))
     }
@@ -274,7 +276,10 @@ class CollectionExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper
     checkEvaluation(MapFromEntries(ai1), create_map(1 -> null, 2 -> 20, 3 -> null))
     checkEvaluation(MapFromEntries(ai2), Map.empty)
     checkEvaluation(MapFromEntries(ai3), null)
-    withSQLConf(SQLConf.LEGACY_ALLOW_DUPLICATED_MAP_KEY.key -> "true") {
+
+    checkExceptionInExpression[RuntimeException](
+      MapFromEntries(ai4), "Duplicate map key")
+    withSQLConf(SQLConf.MAP_KEY_DEDUP_POLICY.key -> SQLConf.MapKeyDedupPolicy.LAST_WIN.toString) {
       // Duplicated map keys will be removed w.r.t. the last wins policy.
       checkEvaluation(MapFromEntries(ai4), create_map(1 -> 20))
     }
@@ -298,7 +303,10 @@ class CollectionExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper
     checkEvaluation(MapFromEntries(as1), create_map("a" -> null, "b" -> "bb", "c" -> null))
     checkEvaluation(MapFromEntries(as2), Map.empty)
     checkEvaluation(MapFromEntries(as3), null)
-    withSQLConf(SQLConf.LEGACY_ALLOW_DUPLICATED_MAP_KEY.key -> "true") {
+
+    checkExceptionInExpression[RuntimeException](
+      MapFromEntries(as4), "Duplicate map key")
+    withSQLConf(SQLConf.MAP_KEY_DEDUP_POLICY.key -> SQLConf.MapKeyDedupPolicy.LAST_WIN.toString) {
       // Duplicated map keys will be removed w.r.t. the last wins policy.
       checkEvaluation(MapFromEntries(as4), create_map("a" -> "bb"))
     }
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ComplexTypeSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ComplexTypeSuite.scala
index 2c1e0c8..3df7d02 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ComplexTypeSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ComplexTypeSuite.scala
@@ -217,7 +217,9 @@ class ComplexTypeSuite extends SparkFunSuite with ExpressionEvalHelper {
       CreateMap(interlace(strWithNull, intSeq.map(Literal(_)))),
       "Cannot use null as map key")
 
-    withSQLConf(SQLConf.LEGACY_ALLOW_DUPLICATED_MAP_KEY.key -> "true") {
+    checkExceptionInExpression[RuntimeException](
+      CreateMap(Seq(Literal(1), Literal(2), Literal(1), Literal(3))), "Duplicate map key")
+    withSQLConf(SQLConf.MAP_KEY_DEDUP_POLICY.key -> SQLConf.MapKeyDedupPolicy.LAST_WIN.toString) {
       // Duplicated map keys will be removed w.r.t. the last wins policy.
       checkEvaluation(
         CreateMap(Seq(Literal(1), Literal(2), Literal(1), Literal(3))),
@@ -284,7 +286,12 @@ class ComplexTypeSuite extends SparkFunSuite with ExpressionEvalHelper {
       MapFromArrays(intWithNullArray, strArray),
       "Cannot use null as map key")
 
-    withSQLConf(SQLConf.LEGACY_ALLOW_DUPLICATED_MAP_KEY.key -> "true") {
+    checkExceptionInExpression[RuntimeException](
+      MapFromArrays(
+        Literal.create(Seq(1, 1), ArrayType(IntegerType)),
+        Literal.create(Seq(2, 3), ArrayType(IntegerType))),
+      "Duplicate map key")
+    withSQLConf(SQLConf.MAP_KEY_DEDUP_POLICY.key -> SQLConf.MapKeyDedupPolicy.LAST_WIN.toString) {
       // Duplicated map keys will be removed w.r.t. the last wins policy.
       checkEvaluation(
         MapFromArrays(
@@ -404,7 +411,9 @@ class ComplexTypeSuite extends SparkFunSuite with ExpressionEvalHelper {
     val m5 = Map("a" -> null)
     checkEvaluation(new StringToMap(s5), m5)
 
-    withSQLConf(SQLConf.LEGACY_ALLOW_DUPLICATED_MAP_KEY.key -> "true") {
+    checkExceptionInExpression[RuntimeException](
+      new StringToMap(Literal("a:1,b:2,a:3")), "Duplicate map key")
+    withSQLConf(SQLConf.MAP_KEY_DEDUP_POLICY.key -> SQLConf.MapKeyDedupPolicy.LAST_WIN.toString) {
       // Duplicated map keys will be removed w.r.t. the last wins policy.
       checkEvaluation(
         new StringToMap(Literal("a:1,b:2,a:3")),
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/HigherOrderFunctionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/HigherOrderFunctionsSuite.scala
index b343853..c07f06e 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/HigherOrderFunctionsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/HigherOrderFunctionsSuite.scala
@@ -465,7 +465,10 @@ class HigherOrderFunctionsSuite extends SparkFunSuite with ExpressionEvalHelper
     checkEvaluation(
       transformKeys(transformKeys(ai0, plusOne), plusValue),
       create_map(3 -> 1, 5 -> 2, 7 -> 3, 9 -> 4))
-    withSQLConf(SQLConf.LEGACY_ALLOW_DUPLICATED_MAP_KEY.key -> "true") {
+
+    checkExceptionInExpression[RuntimeException](
+      transformKeys(ai0, modKey), "Duplicate map key")
+    withSQLConf(SQLConf.MAP_KEY_DEDUP_POLICY.key -> SQLConf.MapKeyDedupPolicy.LAST_WIN.toString) {
       // Duplicated map keys will be removed w.r.t. the last wins policy.
       checkEvaluation(transformKeys(ai0, modKey), create_map(1 -> 4, 2 -> 2, 0 -> 3))
     }
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/ArrayBasedMapBuilderSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/ArrayBasedMapBuilderSuite.scala
index 87bbdb7..6e07cd5 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/ArrayBasedMapBuilderSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/ArrayBasedMapBuilderSuite.scala
@@ -48,11 +48,11 @@ class ArrayBasedMapBuilderSuite extends SparkFunSuite with SQLHelper {
     val builder = new ArrayBasedMapBuilder(IntegerType, IntegerType)
     builder.put(1, 1)
     val e = intercept[RuntimeException](builder.put(1, 2))
-    assert(e.getMessage.contains("Duplicate map key 1 was founded"))
+    assert(e.getMessage.contains("Duplicate map key 1 was found"))
   }
 
   test("remove duplicated keys with last wins policy") {
-    withSQLConf(SQLConf.LEGACY_ALLOW_DUPLICATED_MAP_KEY.key -> "true") {
+    withSQLConf(SQLConf.MAP_KEY_DEDUP_POLICY.key -> SQLConf.MapKeyDedupPolicy.LAST_WIN.toString) {
       val builder = new ArrayBasedMapBuilder(IntegerType, IntegerType)
       builder.put(1, 1)
       builder.put(2, 2)
@@ -63,8 +63,15 @@ class ArrayBasedMapBuilderSuite extends SparkFunSuite with SQLHelper {
     }
   }
 
-  test("binary type key") {
-    withSQLConf(SQLConf.LEGACY_ALLOW_DUPLICATED_MAP_KEY.key -> "true") {
+  test("binary type key with duplication") {
+    val builder = new ArrayBasedMapBuilder(BinaryType, IntegerType)
+    builder.put(Array(1.toByte), 1)
+    builder.put(Array(2.toByte), 2)
+    val e = intercept[RuntimeException](builder.put(Array(1.toByte), 3))
+    // By default duplicated map key fails the query.
+    assert(e.getMessage.contains("Duplicate map key"))
+
+    withSQLConf(SQLConf.MAP_KEY_DEDUP_POLICY.key -> SQLConf.MapKeyDedupPolicy.LAST_WIN.toString) {
       val builder = new ArrayBasedMapBuilder(BinaryType, IntegerType)
       builder.put(Array(1.toByte), 1)
       builder.put(Array(2.toByte), 2)
@@ -79,18 +86,26 @@ class ArrayBasedMapBuilderSuite extends SparkFunSuite with SQLHelper {
     }
   }
 
-  test("struct type key") {
-    withSQLConf(SQLConf.LEGACY_ALLOW_DUPLICATED_MAP_KEY.key -> "true") {
+  test("struct type key with duplication") {
+    val unsafeRow = {
+      val row = new UnsafeRow(1)
+      val bytes = new Array[Byte](16)
+      row.pointTo(bytes, 16)
+      row.setInt(0, 1)
+      row
+    }
+
+    val builder = new ArrayBasedMapBuilder(new StructType().add("i", "int"), IntegerType)
+    builder.put(InternalRow(1), 1)
+    builder.put(InternalRow(2), 2)
+    val e = intercept[RuntimeException](builder.put(unsafeRow, 3))
+    // By default duplicated map key fails the query.
+    assert(e.getMessage.contains("Duplicate map key"))
+
+    withSQLConf(SQLConf.MAP_KEY_DEDUP_POLICY.key -> SQLConf.MapKeyDedupPolicy.LAST_WIN.toString) {
       val builder = new ArrayBasedMapBuilder(new StructType().add("i", "int"), IntegerType)
       builder.put(InternalRow(1), 1)
       builder.put(InternalRow(2), 2)
-      val unsafeRow = {
-        val row = new UnsafeRow(1)
-        val bytes = new Array[Byte](16)
-        row.pointTo(bytes, 16)
-        row.setInt(0, 1)
-        row
-      }
       builder.put(unsafeRow, 3)
       val map = builder.build()
       assert(map.numElements() == 2)
@@ -98,20 +113,28 @@ class ArrayBasedMapBuilderSuite extends SparkFunSuite with SQLHelper {
     }
   }
 
-  test("array type key") {
-    withSQLConf(SQLConf.LEGACY_ALLOW_DUPLICATED_MAP_KEY.key -> "true") {
+  test("array type key with duplication") {
+    val unsafeArray = {
+      val array = new UnsafeArrayData()
+      val bytes = new Array[Byte](24)
+      Platform.putLong(bytes, Platform.BYTE_ARRAY_OFFSET, 2)
+      array.pointTo(bytes, Platform.BYTE_ARRAY_OFFSET, 24)
+      array.setInt(0, 1)
+      array.setInt(1, 1)
+      array
+    }
+
+    val builder = new ArrayBasedMapBuilder(ArrayType(IntegerType), IntegerType)
+    builder.put(new GenericArrayData(Seq(1, 1)), 1)
+    builder.put(new GenericArrayData(Seq(2, 2)), 2)
+    val e = intercept[RuntimeException](builder.put(unsafeArray, 3))
+    // By default duplicated map key fails the query.
+    assert(e.getMessage.contains("Duplicate map key"))
+
+    withSQLConf(SQLConf.MAP_KEY_DEDUP_POLICY.key -> SQLConf.MapKeyDedupPolicy.LAST_WIN.toString) {
       val builder = new ArrayBasedMapBuilder(ArrayType(IntegerType), IntegerType)
       builder.put(new GenericArrayData(Seq(1, 1)), 1)
       builder.put(new GenericArrayData(Seq(2, 2)), 2)
-      val unsafeArray = {
-        val array = new UnsafeArrayData()
-        val bytes = new Array[Byte](24)
-        Platform.putLong(bytes, Platform.BYTE_ARRAY_OFFSET, 2)
-        array.pointTo(bytes, Platform.BYTE_ARRAY_OFFSET, 24)
-        array.setInt(0, 1)
-        array.setInt(1, 1)
-        array
-      }
       builder.put(unsafeArray, 3)
       val map = builder.build()
       assert(map.numElements() == 2)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala
index b4b9a48..a613c33 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala
@@ -23,6 +23,7 @@ import java.util.TimeZone
 
 import scala.util.Random
 
+import org.apache.spark.SparkException
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
@@ -651,7 +652,9 @@ class DataFrameFunctionsSuite extends QueryTest with SharedSparkSession {
       Row(null)
     )
 
-    withSQLConf(SQLConf.LEGACY_ALLOW_DUPLICATED_MAP_KEY.key -> "true") {
+    intercept[SparkException](df1.selectExpr("map_concat(map1, map2)").collect())
+    intercept[SparkException](df1.select(map_concat($"map1", $"map2")).collect())
+    withSQLConf(SQLConf.MAP_KEY_DEDUP_POLICY.key -> SQLConf.MapKeyDedupPolicy.LAST_WIN.toString) {
       checkAnswer(df1.selectExpr("map_concat(map1, map2)"), expected1a)
       checkAnswer(df1.select(map_concat($"map1", $"map2")), expected1a)
     }
@@ -3070,7 +3073,13 @@ class DataFrameFunctionsSuite extends QueryTest with SharedSparkSession {
       checkAnswer(dfExample2.select(transform_keys(col("j"), (k, v) => k + v)),
         Seq(Row(Map(2.0 -> 1.0, 3.4 -> 1.4, 4.7 -> 1.7))))
 
-      withSQLConf(SQLConf.LEGACY_ALLOW_DUPLICATED_MAP_KEY.key -> "true") {
+      intercept[SparkException] {
+        dfExample3.selectExpr("transform_keys(x, (k, v) ->  k % 2 = 0 OR v)").collect()
+      }
+      intercept[SparkException] {
+        dfExample3.select(transform_keys(col("x"), (k, v) => k % 2 === 0 || v)).collect()
+      }
+      withSQLConf(SQLConf.MAP_KEY_DEDUP_POLICY.key -> SQLConf.MapKeyDedupPolicy.LAST_WIN.toString) {
         checkAnswer(dfExample3.selectExpr("transform_keys(x, (k, v) ->  k % 2 = 0 OR v)"),
           Seq(Row(Map(true -> true, true -> false))))
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org