Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2020/10/05 01:20:33 UTC

[GitHub] [spark] maropu commented on a change in pull request #29942: [SPARK-33007][SQL] Simplify named_struct + get struct field + from_json expression chain

maropu commented on a change in pull request #29942:
URL: https://github.com/apache/spark/pull/29942#discussion_r499308242



##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeJsonExprs.scala
##########
@@ -28,10 +28,36 @@ import org.apache.spark.sql.types.{ArrayType, StructType}
  * The optimization includes:
  * 1. JsonToStructs(StructsToJson(child)) => child.
  * 2. Prune unnecessary columns from GetStructField/GetArrayStructFields + JsonToStructs.
+ * 3. struct(from_json.col1, from_json.col2, from_json.col3...) => struct(from_json)
  */
 object OptimizeJsonExprs extends Rule[LogicalPlan] {
   override def apply(plan: LogicalPlan): LogicalPlan = plan transform {
     case p => p.transformExpressions {
+
+      case c: CreateNamedStruct
+        if c.valExprs.forall(v => v.isInstanceOf[GetStructField] &&
+          v.asInstanceOf[GetStructField].child.isInstanceOf[JsonToStructs]) =>
+        val jsonToStructs = c.valExprs.map(_.children(0))

Review comment:
       nit: `_.children(0)` -> `_.children.head`, as suggested by my IDE.
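
For reference, the suggested change would read roughly as follows (a one-line sketch; the surrounding names come from the quoted diff):

```
val jsonToStructs = c.valExprs.map(_.children.head)
```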

##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeJsonExprs.scala
##########
@@ -28,10 +28,36 @@ import org.apache.spark.sql.types.{ArrayType, StructType}
  * The optimization includes:
  * 1. JsonToStructs(StructsToJson(child)) => child.
  * 2. Prune unnecessary columns from GetStructField/GetArrayStructFields + JsonToStructs.
+ * 3. struct(from_json.col1, from_json.col2, from_json.col3...) => struct(from_json)
  */
 object OptimizeJsonExprs extends Rule[LogicalPlan] {
   override def apply(plan: LogicalPlan): LogicalPlan = plan transform {
     case p => p.transformExpressions {
+
+      case c: CreateNamedStruct
+        if c.valExprs.forall(v => v.isInstanceOf[GetStructField] &&
+          v.asInstanceOf[GetStructField].child.isInstanceOf[JsonToStructs]) =>
+        val jsonToStructs = c.valExprs.map(_.children(0))
+        val semanticEqual = jsonToStructs.tail.forall(jsonToStructs.head.semanticEquals(_))

Review comment:
       Can this check be merged with L39? https://github.com/apache/spark/pull/29942/files#diff-f9d27e3c9c32aaf07bb038c779309414R39
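
For reference, a minimal sketch of what merging the semantic-equality check into the case guard might look like (identifiers taken from the quoted diff; untested and not part of the thread):

```
case c: CreateNamedStruct if c.valExprs.forall {
    case g: GetStructField => g.child.isInstanceOf[JsonToStructs] &&
      g.child.semanticEquals(c.valExprs.head.children.head)
    case _ => false
  } =>
  // the separate `semanticEqual` flag is then no longer needed in the rule body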

##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeJsonExprs.scala
##########
@@ -28,10 +28,36 @@ import org.apache.spark.sql.types.{ArrayType, StructType}
  * The optimization includes:
  * 1. JsonToStructs(StructsToJson(child)) => child.
  * 2. Prune unnecessary columns from GetStructField/GetArrayStructFields + JsonToStructs.
+ * 3. struct(from_json.col1, from_json.col2, from_json.col3...) => struct(from_json)
  */
 object OptimizeJsonExprs extends Rule[LogicalPlan] {
   override def apply(plan: LogicalPlan): LogicalPlan = plan transform {
     case p => p.transformExpressions {
+
+      case c: CreateNamedStruct
+        if c.valExprs.forall(v => v.isInstanceOf[GetStructField] &&
+          v.asInstanceOf[GetStructField].child.isInstanceOf[JsonToStructs]) =>
+        val jsonToStructs = c.valExprs.map(_.children(0))
+        val semanticEqual = jsonToStructs.tail.forall(jsonToStructs.head.semanticEquals(_))
+        val sameFieldName = c.names.zip(c.valExprs).forall {
+          case (name, valExpr: GetStructField) =>
+            name.toString == valExpr.childSchema(valExpr.ordinal).name
+          case (_, _) => false
+        }
+
+        // If we create struct from various fields of the same `JsonToStructs` and we don't
+        // alias field names.
+        if (semanticEqual && sameFieldName) {
+          val fromJson = jsonToStructs.head.asInstanceOf[JsonToStructs].copy(schema = c.dataType)
+          val nullFields = c.children.grouped(2).map {
+            case Seq(name, value) => Seq(name, Literal(null, value.dataType))
+          }.flatten.toSeq
+
+          If(IsNull(fromJson.child), c.copy(children = nullFields), KnownNotNull(fromJson))

Review comment:
       Is this related to this optimization? This looks more general to me.
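
For context, a minimal spark-shell illustration of the NULL-input case that the `If(IsNull(...))` branch appears to handle; the expected results in the comments are assumptions based on `from_json` null semantics, not output taken from the thread:

```
// Building a struct from fields of a NULL from_json yields a non-NULL struct
// whose fields are NULL, while from_json itself is NULL for NULL input.
// The If(IsNull(...)) wrapper in the diff appears to preserve the former
// behavior after rewriting to struct(from_json).
sql("SELECT named_struct('a', from_json(CAST(NULL AS STRING), 'a INT').a) AS s").show()
// expected (assumed): s = {NULL}, i.e. a non-NULL struct with a NULL field
sql("SELECT from_json(CAST(NULL AS STRING), 'a INT') AS s").show()
// expected (assumed): s = NULL
```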

##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeJsonExprs.scala
##########
@@ -28,10 +28,36 @@ import org.apache.spark.sql.types.{ArrayType, StructType}
  * The optimization includes:
  * 1. JsonToStructs(StructsToJson(child)) => child.
  * 2. Prune unnecessary columns from GetStructField/GetArrayStructFields + JsonToStructs.
+ * 3. struct(from_json.col1, from_json.col2, from_json.col3...) => struct(from_json)
  */
 object OptimizeJsonExprs extends Rule[LogicalPlan] {
   override def apply(plan: LogicalPlan): LogicalPlan = plan transform {
     case p => p.transformExpressions {
+
+      case c: CreateNamedStruct
+        if c.valExprs.forall(v => v.isInstanceOf[GetStructField] &&
+          v.asInstanceOf[GetStructField].child.isInstanceOf[JsonToStructs]) =>
+        val jsonToStructs = c.valExprs.map(_.children(0))
+        val semanticEqual = jsonToStructs.tail.forall(jsonToStructs.head.semanticEquals(_))
+        val sameFieldName = c.names.zip(c.valExprs).forall {
+          case (name, valExpr: GetStructField) =>
+            name.toString == valExpr.childSchema(valExpr.ordinal).name
+          case (_, _) => false
+        }
+
+        // If we create struct from various fields of the same `JsonToStructs` and we don't
+        // alias field names.
+        if (semanticEqual && sameFieldName) {
+          val fromJson = jsonToStructs.head.asInstanceOf[JsonToStructs].copy(schema = c.dataType)
+          val nullFields = c.children.grouped(2).map {

Review comment:
       `map` -> `flatMap`
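
For reference, a sketch of the suggested `flatMap` form (identifiers as in the quoted diff; the trailing `.flatten` then becomes unnecessary):

```
val nullFields = c.children.grouped(2).flatMap {
  case Seq(name, value) => Seq(name, Literal(null, value.dataType))
}.toSeq
```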

##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeJsonExprs.scala
##########
@@ -28,10 +28,36 @@ import org.apache.spark.sql.types.{ArrayType, StructType}
  * The optimization includes:
  * 1. JsonToStructs(StructsToJson(child)) => child.
  * 2. Prune unnecessary columns from GetStructField/GetArrayStructFields + JsonToStructs.
+ * 3. struct(from_json.col1, from_json.col2, from_json.col3...) => struct(from_json)
  */
 object OptimizeJsonExprs extends Rule[LogicalPlan] {
   override def apply(plan: LogicalPlan): LogicalPlan = plan transform {
     case p => p.transformExpressions {
+
+      case c: CreateNamedStruct
+        if c.valExprs.forall(v => v.isInstanceOf[GetStructField] &&
+          v.asInstanceOf[GetStructField].child.isInstanceOf[JsonToStructs]) =>
+        val jsonToStructs = c.valExprs.map(_.children(0))
+        val semanticEqual = jsonToStructs.tail.forall(jsonToStructs.head.semanticEquals(_))
+        val sameFieldName = c.names.zip(c.valExprs).forall {
+          case (name, valExpr: GetStructField) =>
+            name.toString == valExpr.childSchema(valExpr.ordinal).name

Review comment:
       Does this work correctly if multiple values refer to the same ordinal?
   ```
   scala> sql("""SELECT from_json('{"a":1, "b":0.8}', 'a INT, b DOUBLE') v""").createOrReplaceTempView("t")
   scala> sql("select named_struct('a', t.v.a, 'a', t.v.a) from t").explain(true)
   == Parsed Logical Plan ==
   'Project [unresolvedalias('named_struct(a, 't.v.a, a, 't.v.a), None)]
   +- 'UnresolvedRelation [t], [], false
   
   == Analyzed Logical Plan ==
   named_struct(a, v.a AS `a`, a, v.a AS `a`): struct<a:int,a:int>
   Project [named_struct(a, v#128.a, a, v#128.a) AS named_struct(a, v.a AS `a`, a, v.a AS `a`)#133]
   +- SubqueryAlias t
      +- Project [from_json(StructField(a,IntegerType,true), StructField(b,DoubleType,true), {"a":1, "b":0.8}, Some(Asia/Tokyo)) AS v#128]
         +- OneRowRelation
   
   == Optimized Logical Plan ==
   Project [[1,1] AS named_struct(a, v.a AS `a`, a, v.a AS `a`)#133]
   +- OneRowRelation
   
   == Physical Plan ==
   *(1) Project [[1,1] AS named_struct(a, v.a AS `a`, a, v.a AS `a`)#133]
   +- *(1) Scan OneRowRelation[]
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org