You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by do...@apache.org on 2019/03/24 06:13:47 UTC

[spark] branch master updated: [SPARK-27241][SQL] Support map_keys and map_values in SelectedField

This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 6f18ac9  [SPARK-27241][SQL] Support map_keys and map_values in SelectedField
6f18ac9 is described below

commit 6f18ac9e99a330d4df1826c5beae11d883a0fceb
Author: Liang-Chi Hsieh <vi...@gmail.com>
AuthorDate: Sat Mar 23 23:13:31 2019 -0700

    [SPARK-27241][SQL] Support map_keys and map_values in SelectedField
    
    ## What changes were proposed in this pull request?
    
    `SelectedField` currently doesn't support `map_keys` and `map_values`. When a map key or value is a complex struct, we should be able to prune unnecessary fields from the keys/values. This PR adds `map_keys` and `map_values` support to `SelectedField`.
    
    ## How was this patch tested?
    
    Added tests.
    
    Closes #24179 from viirya/SPARK-27241.
    
    Authored-by: Liang-Chi Hsieh <vi...@gmail.com>
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
 .../sql/catalyst/expressions/SelectedField.scala   |  22 +++++
 .../catalyst/expressions/SelectedFieldSuite.scala  | 103 ++++++++++++++++++++-
 2 files changed, 121 insertions(+), 4 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SelectedField.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SelectedField.scala
index 38a0481..7ba3d30 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SelectedField.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SelectedField.scala
@@ -97,6 +97,28 @@ object SelectedField {
         val MapType(keyType, _, valueContainsNull) = child.dataType
         val opt = dataTypeOpt.map(dt => MapType(keyType, dt, valueContainsNull))
         selectField(child, opt)
+      case MapValues(child) =>
+        val MapType(keyType, _, valueContainsNull) = child.dataType
+        // MapValues does not select a field from a struct (i.e. prune the struct) so it can't be
+        // the top-level extractor. However it can be part of an extractor chain.
+        val opt = dataTypeOpt.map {
+          case ArrayType(dataType, _) => MapType(keyType, dataType, valueContainsNull)
+          case x =>
+            // This should not happen.
+            throw new AnalysisException(s"DataType '$x' is not supported by MapValues.")
+        }
+        selectField(child, opt)
+      case MapKeys(child) =>
+        val MapType(_, valueType, valueContainsNull) = child.dataType
+        // MapKeys does not select a field from a struct (i.e. prune the struct) so it can't be
+        // the top-level extractor. However it can be part of an extractor chain.
+        val opt = dataTypeOpt.map {
+          case ArrayType(dataType, _) => MapType(dataType, valueType, valueContainsNull)
+          case x =>
+            // This should not happen.
+            throw new AnalysisException(s"DataType '$x' is not supported by MapKeys.")
+        }
+        selectField(child, opt)
       case GetArrayItem(child, _) =>
         // GetArrayItem does not select a field from a struct (i.e. prune the struct) so it can't be
         // the top-level extractor. However it can be part of an extractor chain.
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/SelectedFieldSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/SelectedFieldSuite.scala
index 7cfe4bf..6a3cc21 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/SelectedFieldSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/SelectedFieldSuite.scala
@@ -17,16 +17,15 @@
 
 package org.apache.spark.sql.catalyst.expressions
 
-import org.scalatest.BeforeAndAfterAll
 import org.scalatest.exceptions.TestFailedException
 
-import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.catalyst.analysis.AnalysisTest
 import org.apache.spark.sql.catalyst.dsl.plans._
 import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
 import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
 import org.apache.spark.sql.types._
 
-class SelectedFieldSuite extends SparkFunSuite with BeforeAndAfterAll {
+class SelectedFieldSuite extends AnalysisTest {
   private val ignoredField = StructField("col1", StringType, nullable = false)
 
   // The test schema as a tree string, i.e. `schema.treeString`
@@ -317,6 +316,18 @@ class SelectedFieldSuite extends SparkFunSuite with BeforeAndAfterAll {
         StructField("subfield1", IntegerType) :: Nil)) :: Nil), valueContainsNull = false)))
   }
 
+  testSelect(arrayOfStruct, "map_values(col5[0]).field1.subfield1 as foo") {
+    StructField("col5", ArrayType(MapType(StringType, StructType(
+      StructField("field1", StructType(
+        StructField("subfield1", IntegerType) :: Nil)) :: Nil), valueContainsNull = false)))
+  }
+
+  testSelect(arrayOfStruct, "map_values(col5[0]).field1.subfield2 as foo") {
+    StructField("col5", ArrayType(MapType(StringType, StructType(
+      StructField("field1", StructType(
+        StructField("subfield2", IntegerType) :: Nil)) :: Nil), valueContainsNull = false)))
+  }
+
   //  |-- col1: string (nullable = false)
   //  |-- col6: map (nullable = true)
   //  |    |-- key: string
@@ -394,6 +405,90 @@ class SelectedFieldSuite extends SparkFunSuite with BeforeAndAfterAll {
         :: Nil)))
   }
 
+  //  |-- col1: string (nullable = false)
+  //  |-- col2: map (nullable = true)
+  //  |    |-- key: struct
+  //  |    |     |-- field1: string (nullable = true)
+  //  |    |     |-- field2: integer (nullable = true)
+  //  |    |-- value: array (valueContainsNull = true)
+  //  |    |    |-- element: struct (containsNull = false)
+  //  |    |    |    |-- field3: struct (nullable = true)
+  //  |    |    |    |    |-- subfield1: integer (nullable = true)
+  //  |    |    |    |    |-- subfield2: integer (nullable = true)
+  private val mapWithStructKey = StructType(Array(ignoredField,
+    StructField("col2", MapType(
+      StructType(
+        StructField("field1", StringType) ::
+        StructField("field2", IntegerType) :: Nil),
+      ArrayType(StructType(
+        StructField("field3", StructType(
+          StructField("subfield1", IntegerType) ::
+          StructField("subfield2", IntegerType) :: Nil)) :: Nil), containsNull = false)))))
+
+  testSelect(mapWithStructKey, "map_keys(col2).field1 as foo") {
+    StructField("col2", MapType(
+      StructType(StructField("field1", StringType) :: Nil),
+      ArrayType(StructType(
+        StructField("field3", StructType(
+          StructField("subfield1", IntegerType) ::
+          StructField("subfield2", IntegerType) :: Nil)) :: Nil), containsNull = false)))
+  }
+
+  testSelect(mapWithStructKey, "map_keys(col2).field2 as foo") {
+    StructField("col2", MapType(
+      StructType(StructField("field2", IntegerType) :: Nil),
+      ArrayType(StructType(
+        StructField("field3", StructType(
+          StructField("subfield1", IntegerType) ::
+          StructField("subfield2", IntegerType) :: Nil)) :: Nil), containsNull = false)))
+  }
+
+  //  |-- col1: string (nullable = false)
+  //  |-- col2: map (nullable = true)
+  //  |    |-- key: array
+  //  |    |     |-- element: struct (containsNull = false)
+  //  |    |     |     |-- field1: string (nullable = true)
+  //  |    |     |     |-- field2: struct (nullable = true)
+  //  |    |     |     |     |-- subfield1: integer (nullable = true)
+  //  |    |     |     |     |-- subfield2: long (nullable = true)
+  //  |    |-- value: array (valueContainsNull = true)
+  //  |    |    |-- element: struct (containsNull = false)
+  //  |    |    |    |-- field3: struct (nullable = true)
+  //  |    |    |    |    |-- subfield3: integer (nullable = true)
+  //  |    |    |    |    |-- subfield4: integer (nullable = true)
+  private val mapWithArrayOfStructKey = StructType(Array(ignoredField,
+    StructField("col2", MapType(
+      ArrayType(StructType(
+        StructField("field1", StringType) ::
+        StructField("field2", StructType(
+          StructField("subfield1", IntegerType) ::
+          StructField("subfield2", LongType) :: Nil)) :: Nil), containsNull = false),
+      ArrayType(StructType(
+        StructField("field3", StructType(
+          StructField("subfield3", IntegerType) ::
+          StructField("subfield4", IntegerType) :: Nil)) :: Nil), containsNull = false)))))
+
+  testSelect(mapWithArrayOfStructKey, "map_keys(col2)[0].field1 as foo") {
+    StructField("col2", MapType(
+      ArrayType(StructType(
+        StructField("field1", StringType) :: Nil), containsNull = false),
+      ArrayType(StructType(
+        StructField("field3", StructType(
+          StructField("subfield3", IntegerType) ::
+          StructField("subfield4", IntegerType) :: Nil)) :: Nil), containsNull = false)))
+  }
+
+  testSelect(mapWithArrayOfStructKey, "map_keys(col2)[0].field2.subfield1 as foo") {
+    StructField("col2", MapType(
+      ArrayType(StructType(
+        StructField("field2", StructType(
+          StructField("subfield1", IntegerType) :: Nil)) :: Nil), containsNull = false),
+      ArrayType(StructType(
+        StructField("field3", StructType(
+          StructField("subfield3", IntegerType) ::
+          StructField("subfield4", IntegerType) :: Nil)) :: Nil), containsNull = false)))
+  }
+
   def assertResult(expected: StructField)(actual: StructField)(selectExpr: String): Unit = {
     try {
       super.assertResult(expected)(actual)
@@ -439,7 +534,7 @@ class SelectedFieldSuite extends SparkFunSuite with BeforeAndAfterAll {
   private def unapplySelect(expr: String, relation: LocalRelation) = {
     val parsedExpr = parseAsCatalystExpression(Seq(expr)).head
     val select = relation.select(parsedExpr)
-    val analyzed = select.analyze
+    val analyzed = caseSensitiveAnalyzer.execute(select)
     SelectedField.unapply(analyzed.expressions.head)
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org