You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2020/10/05 19:45:31 UTC

[GitHub] [spark] sunchao commented on a change in pull request #29942: [SPARK-33007][SQL] Simplify named_struct + get struct field + from_json expression chain

sunchao commented on a change in pull request #29942:
URL: https://github.com/apache/spark/pull/29942#discussion_r499830227



##########
File path: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeJsonExprsSuite.scala
##########
@@ -199,4 +199,51 @@ class OptimizeJsonExprsSuite extends PlanTest with ExpressionEvalHelper {
         JsonToStructs(prunedSchema2, options, 'json), field2, 0, 1, false).as("b")).analyze
     comparePlans(optimized2, expected2)
   }
+
+  test("SPARK-33007: simplify named_struct + from_json") {
+    val options = Map.empty[String, String]
+    val schema = StructType.fromDDL("a int, b int, c long, d string")
+
+    val query1 = testRelation2
+      .select(namedStruct(
+        "a", GetStructField(JsonToStructs(schema, options, 'json), 0),
+        "b", GetStructField(JsonToStructs(schema, options, 'json), 1)).as("struct"))
+    val optimized1 = Optimizer.execute(query1.analyze)
+
+    val prunedSchema1 = StructType.fromDDL("a int, b int")
+    val nullStruct = namedStruct("a", Literal(null, IntegerType), "b", Literal(null, IntegerType))
+    val expected1 = testRelation2
+      .select(
+        If(IsNull('json),
+          nullStruct,
+          KnownNotNull(JsonToStructs(prunedSchema1, options, 'json))).as("struct")).analyze
+    comparePlans(optimized1, expected1)
+
+    // Skip it if `namedStruct` aliases field name.
+    val field1 = StructType.fromDDL("a int")
+    val field2 = StructType.fromDDL("b int")
+    val query2 = testRelation2
+      .select(namedStruct(
+        "a1", GetStructField(JsonToStructs(schema, options, 'json), 0),
+        "b", GetStructField(JsonToStructs(schema, options, 'json), 1)).as("struct"))
+    val optimized2 = Optimizer.execute(query2.analyze)

Review comment:
       This seems a bit repetitive - perhaps we can create a util method for the comparison? We could test evaluation in that method too.

##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeJsonExprs.scala
##########
@@ -28,10 +28,45 @@ import org.apache.spark.sql.types.{ArrayType, StructType}
  * The optimization includes:
  * 1. JsonToStructs(StructsToJson(child)) => child.
  * 2. Prune unnecessary columns from GetStructField/GetArrayStructFields + JsonToStructs.
+ * 3. CreateNamedStruct(JsonToStructs(json).col1, JsonToStructs(json).col2, ...) =>
+ *      CreateNamedStruct(JsonToStructs(json)) if JsonToStructs(json) is shared among all
+ *      fields of CreateNamedStruct.
  */
 object OptimizeJsonExprs extends Rule[LogicalPlan] {
   override def apply(plan: LogicalPlan): LogicalPlan = plan transform {
     case p => p.transformExpressions {
+
+      case c: CreateNamedStruct
+          // If we create struct from various fields of the same `JsonToStructs`.
+          if c.valExprs.forall { v =>
+            v.isInstanceOf[GetStructField] &&
+              v.asInstanceOf[GetStructField].child.isInstanceOf[JsonToStructs] &&
+              v.children.head.semanticEquals(c.valExprs.head.children.head)
+          } =>
+        val jsonToStructs = c.valExprs.map(_.children.head)
+        val sameFieldName = c.names.zip(c.valExprs).forall {
+          case (name, valExpr: GetStructField) =>
+            name.toString == valExpr.childSchema(valExpr.ordinal).name
+          case _ => false
+        }
+
+        // Although `CreateNamedStruct` allows duplicated field names, e.g. "a int, a int",
+        // `JsonToStructs` does not support parsing json with duplicated field names.
+        val duplicateFields = c.names.map(_.toString).distinct.length != c.names.length
+
+        // If we create struct from various fields of the same `JsonToStructs` and we don't
+        // alias field names and there is not duplicated fields in the struct.

Review comment:
       nit: "there is not duplicated fields" -> "there is no duplicated field"

##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeJsonExprs.scala
##########
@@ -28,10 +28,45 @@ import org.apache.spark.sql.types.{ArrayType, StructType}
  * The optimization includes:
  * 1. JsonToStructs(StructsToJson(child)) => child.
  * 2. Prune unnecessary columns from GetStructField/GetArrayStructFields + JsonToStructs.
+ * 3. CreateNamedStruct(JsonToStructs(json).col1, JsonToStructs(json).col2, ...) =>
+ *      CreateNamedStruct(JsonToStructs(json)) if JsonToStructs(json) is shared among all
+ *      fields of CreateNamedStruct.

Review comment:
       For a fresh eye with no context this is still a bit confusing - does the list `col1`, `col2`, etc. have to cover all columns in the `json` struct?

##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeJsonExprs.scala
##########
@@ -28,10 +28,43 @@ import org.apache.spark.sql.types.{ArrayType, StructType}
  * The optimization includes:
  * 1. JsonToStructs(StructsToJson(child)) => child.
  * 2. Prune unnecessary columns from GetStructField/GetArrayStructFields + JsonToStructs.
+ * 3. struct(from_json.col1, from_json.col2, from_json.col3...) => struct(from_json)

Review comment:
       Perhaps explain a little more about what this does? Without any context, I'm assuming `col1`, `col2`, `col3`, etc. are all the columns of `from_json`?

##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeJsonExprs.scala
##########
@@ -28,10 +28,36 @@ import org.apache.spark.sql.types.{ArrayType, StructType}
  * The optimization includes:
  * 1. JsonToStructs(StructsToJson(child)) => child.
  * 2. Prune unnecessary columns from GetStructField/GetArrayStructFields + JsonToStructs.
+ * 3  struct(from_json.col1, from_json.col2, from_json.col3...) => struct(from_json)

Review comment:
       Perhaps explain a bit more about what this does? With no context, I'm assuming `from_json` contains all columns `col1`, `col2`, `col3`, etc.?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org