Posted to commits@spark.apache.org by li...@apache.org on 2018/07/12 00:30:47 UTC

spark git commit: [SPARK-24782][SQL] Simplify conf retrieval in SQL expressions

Repository: spark
Updated Branches:
  refs/heads/master ff7f6ef75 -> e008ad175


[SPARK-24782][SQL] Simplify conf retrieval in SQL expressions

## What changes were proposed in this pull request?

The PR simplifies config retrieval in the `size` and `from_json` expressions: since SPARK-24250 the active `SQLConf` is accessible from tasks as well, so the values can be captured at expression construction time instead of being threaded through extra constructor parameters.
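
A minimal, self-contained sketch of the pattern (the `Conf` object below is a hypothetical stand-in for `SQLConf.get`; the actual change lands in `Size` and `JsonToStructs`, see the diff):

```scala
// Hypothetical stand-in for SQLConf.get: since SPARK-24250 the active conf
// is readable from tasks too, so an expression can capture it directly.
object Conf {
  var legacySizeOfNull: Boolean = true
}

// Before: the caller had to read the conf and thread it through an extra
// constructor parameter plus an auxiliary constructor.
case class SizeBefore(child: Seq[Int], legacySizeOfNull: Boolean) {
  def this(child: Seq[Int]) = this(child, Conf.legacySizeOfNull)
}

// After: the expression captures the conf value itself when constructed,
// so the extra parameter and the auxiliary constructor disappear.
case class SizeAfter(child: Seq[Int]) {
  val legacySizeOfNull: Boolean = Conf.legacySizeOfNull
}

object Demo extends App {
  println(new SizeBefore(Seq(1, 2, 3)).legacySizeOfNull) // true
  println(SizeAfter(Seq(1, 2)).legacySizeOfNull)         // true
}
```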

## How was this patch tested?

Existing UTs, adapted to set `SQLConf.LEGACY_SIZE_OF_NULL` and `SQLConf.FROM_JSON_FORCE_NULLABLE_SCHEMA` through `withSQLConf` instead of constructor parameters.
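
For reference, a rough sketch of what a `withSQLConf`-style helper does, written against a hypothetical mutable conf store (the key string is illustrative; the real suites use `SQLConf.LEGACY_SIZE_OF_NULL.key`):

```scala
import scala.collection.mutable

object WithConfSketch extends App {
  // Hypothetical conf store standing in for SQLConf.
  val settings: mutable.Map[String, String] = mutable.Map.empty

  // Mirrors the withSQLConf pattern: save the current values, apply the
  // overrides, run the body, then restore whatever was there before.
  def withConf[T](pairs: (String, String)*)(body: => T): T = {
    val saved = pairs.map { case (k, _) => k -> settings.get(k) }
    pairs.foreach { case (k, v) => settings(k) = v }
    try body
    finally saved.foreach {
      case (k, Some(v)) => settings(k) = v
      case (k, None)    => settings.remove(k)
    }
  }

  // An expression constructed inside the block would capture the override.
  withConf("spark.sql.legacy.sizeOfNull" -> "true") {
    assert(settings("spark.sql.legacy.sizeOfNull") == "true")
  }
  assert(!settings.contains("spark.sql.legacy.sizeOfNull")) // restored
}
```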

Author: Marco Gaido <ma...@gmail.com>

Closes #21736 from mgaido91/SPARK-24605_followup.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e008ad17
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e008ad17
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e008ad17

Branch: refs/heads/master
Commit: e008ad175256a3192fdcbd2c4793044d52f46d57
Parents: ff7f6ef
Author: Marco Gaido <ma...@gmail.com>
Authored: Wed Jul 11 17:30:43 2018 -0700
Committer: Xiao Li <ga...@gmail.com>
Committed: Wed Jul 11 17:30:43 2018 -0700

----------------------------------------------------------------------
 .../expressions/collectionOperations.scala      | 10 +--
 .../catalyst/expressions/jsonExpressions.scala  | 16 ++---
 .../spark/sql/catalyst/plans/QueryPlan.scala    |  2 -
 .../CollectionExpressionsSuite.scala            | 27 ++++----
 .../expressions/JsonExpressionsSuite.scala      | 65 ++++++++++----------
 .../scala/org/apache/spark/sql/functions.scala  |  4 +-
 6 files changed, 57 insertions(+), 67 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/e008ad17/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
index 879603b..e217d37 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
@@ -89,15 +89,9 @@ trait BinaryArrayExpressionWithImplicitCast extends BinaryExpression
       > SELECT _FUNC_(NULL);
        -1
   """)
-case class Size(
-    child: Expression,
-    legacySizeOfNull: Boolean)
-  extends UnaryExpression with ExpectsInputTypes {
+case class Size(child: Expression) extends UnaryExpression with ExpectsInputTypes {
 
-  def this(child: Expression) =
-    this(
-      child,
-      legacySizeOfNull = SQLConf.get.getConf(SQLConf.LEGACY_SIZE_OF_NULL))
+  val legacySizeOfNull = SQLConf.get.legacySizeOfNull
 
   override def dataType: DataType = IntegerType
   override def inputTypes: Seq[AbstractDataType] = Seq(TypeCollection(ArrayType, MapType))

http://git-wip-us.apache.org/repos/asf/spark/blob/e008ad17/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
index 8cd8605..63943b1 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
@@ -514,10 +514,11 @@ case class JsonToStructs(
     schema: DataType,
     options: Map[String, String],
     child: Expression,
-    timeZoneId: Option[String],
-    forceNullableSchema: Boolean)
+    timeZoneId: Option[String] = None)
   extends UnaryExpression with TimeZoneAwareExpression with CodegenFallback with ExpectsInputTypes {
 
+  val forceNullableSchema = SQLConf.get.getConf(SQLConf.FROM_JSON_FORCE_NULLABLE_SCHEMA)
+
   // The JSON input data might be missing certain fields. We force the nullability
   // of the user-provided schema to avoid data corruptions. In particular, the parquet-mr encoder
   // can generate incorrect files if values are missing in columns declared as non-nullable.
@@ -531,8 +532,7 @@ case class JsonToStructs(
       schema = JsonExprUtils.evalSchemaExpr(schema),
       options = options,
       child = child,
-      timeZoneId = None,
-      forceNullableSchema = SQLConf.get.getConf(SQLConf.FROM_JSON_FORCE_NULLABLE_SCHEMA))
+      timeZoneId = None)
 
   def this(child: Expression, schema: Expression) = this(child, schema, Map.empty[String, String])
 
@@ -541,13 +541,7 @@ case class JsonToStructs(
       schema = JsonExprUtils.evalSchemaExpr(schema),
       options = JsonExprUtils.convertToMapData(options),
       child = child,
-      timeZoneId = None,
-      forceNullableSchema = SQLConf.get.getConf(SQLConf.FROM_JSON_FORCE_NULLABLE_SCHEMA))
-
-  // Used in `org.apache.spark.sql.functions`
-  def this(schema: DataType, options: Map[String, String], child: Expression) =
-    this(schema, options, child, timeZoneId = None,
-      forceNullableSchema = SQLConf.get.getConf(SQLConf.FROM_JSON_FORCE_NULLABLE_SCHEMA))
+      timeZoneId = None)
 
   override def checkInputDataTypes(): TypeCheckResult = nullableSchema match {
     case _: StructType | ArrayType(_: StructType, _) | _: MapType =>

http://git-wip-us.apache.org/repos/asf/spark/blob/e008ad17/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
index e431c95..4b4722b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
@@ -27,8 +27,6 @@ abstract class QueryPlan[PlanType <: QueryPlan[PlanType]] extends TreeNode[PlanT
 
   /**
    * The active config object within the current scope.
-   * Note that if you want to refer config values during execution, you have to capture them
-   * in Driver and use the captured values in Executors.
    * See [[SQLConf.get]] for more information.
    */
   def conf: SQLConf = SQLConf.get

http://git-wip-us.apache.org/repos/asf/spark/blob/e008ad17/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CollectionExpressionsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CollectionExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CollectionExpressionsSuite.scala
index 173c98a..a838a2e 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CollectionExpressionsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CollectionExpressionsSuite.scala
@@ -24,43 +24,48 @@ import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.util.DateTimeTestUtils
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.array.ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH
 import org.apache.spark.unsafe.types.CalendarInterval
 
 class CollectionExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
 
-  def testSize(legacySizeOfNull: Boolean, sizeOfNull: Any): Unit = {
+  def testSize(sizeOfNull: Any): Unit = {
     val a0 = Literal.create(Seq(1, 2, 3), ArrayType(IntegerType))
     val a1 = Literal.create(Seq[Integer](), ArrayType(IntegerType))
     val a2 = Literal.create(Seq(1, 2), ArrayType(IntegerType))
 
-    checkEvaluation(Size(a0, legacySizeOfNull), 3)
-    checkEvaluation(Size(a1, legacySizeOfNull), 0)
-    checkEvaluation(Size(a2, legacySizeOfNull), 2)
+    checkEvaluation(Size(a0), 3)
+    checkEvaluation(Size(a1), 0)
+    checkEvaluation(Size(a2), 2)
 
     val m0 = Literal.create(Map("a" -> "a", "b" -> "b"), MapType(StringType, StringType))
     val m1 = Literal.create(Map[String, String](), MapType(StringType, StringType))
     val m2 = Literal.create(Map("a" -> "a"), MapType(StringType, StringType))
 
-    checkEvaluation(Size(m0, legacySizeOfNull), 2)
-    checkEvaluation(Size(m1, legacySizeOfNull), 0)
-    checkEvaluation(Size(m2, legacySizeOfNull), 1)
+    checkEvaluation(Size(m0), 2)
+    checkEvaluation(Size(m1), 0)
+    checkEvaluation(Size(m2), 1)
 
     checkEvaluation(
-      Size(Literal.create(null, MapType(StringType, StringType)), legacySizeOfNull),
+      Size(Literal.create(null, MapType(StringType, StringType))),
       expected = sizeOfNull)
     checkEvaluation(
-      Size(Literal.create(null, ArrayType(StringType)), legacySizeOfNull),
+      Size(Literal.create(null, ArrayType(StringType))),
       expected = sizeOfNull)
   }
 
   test("Array and Map Size - legacy") {
-    testSize(legacySizeOfNull = true, sizeOfNull = -1)
+    withSQLConf(SQLConf.LEGACY_SIZE_OF_NULL.key -> "true") {
+      testSize(sizeOfNull = -1)
+    }
   }
 
   test("Array and Map Size") {
-    testSize(legacySizeOfNull = false, sizeOfNull = null)
+    withSQLConf(SQLConf.LEGACY_SIZE_OF_NULL.key -> "false") {
+      testSize(sizeOfNull = null)
+    }
   }
 
   test("MapKeys/MapValues") {

http://git-wip-us.apache.org/repos/asf/spark/blob/e008ad17/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala
index 52203b9..04f1c8c 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala
@@ -392,7 +392,7 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper with
     val jsonData = """{"a": 1}"""
     val schema = StructType(StructField("a", IntegerType) :: Nil)
     checkEvaluation(
-      JsonToStructs(schema, Map.empty, Literal(jsonData), gmtId, true),
+      JsonToStructs(schema, Map.empty, Literal(jsonData), gmtId),
       InternalRow(1)
     )
   }
@@ -401,13 +401,13 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper with
     val jsonData = """{"a" 1}"""
     val schema = StructType(StructField("a", IntegerType) :: Nil)
     checkEvaluation(
-      JsonToStructs(schema, Map.empty, Literal(jsonData), gmtId, true),
+      JsonToStructs(schema, Map.empty, Literal(jsonData), gmtId),
       null
     )
 
     // Other modes should still return `null`.
     checkEvaluation(
-      JsonToStructs(schema, Map("mode" -> PermissiveMode.name), Literal(jsonData), gmtId, true),
+      JsonToStructs(schema, Map("mode" -> PermissiveMode.name), Literal(jsonData), gmtId),
       null
     )
   }
@@ -416,62 +416,62 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper with
     val input = """[{"a": 1}, {"a": 2}]"""
     val schema = ArrayType(StructType(StructField("a", IntegerType) :: Nil))
     val output = InternalRow(1) :: InternalRow(2) :: Nil
-    checkEvaluation(JsonToStructs(schema, Map.empty, Literal(input), gmtId, true), output)
+    checkEvaluation(JsonToStructs(schema, Map.empty, Literal(input), gmtId), output)
   }
 
   test("from_json - input=object, schema=array, output=array of single row") {
     val input = """{"a": 1}"""
     val schema = ArrayType(StructType(StructField("a", IntegerType) :: Nil))
     val output = InternalRow(1) :: Nil
-    checkEvaluation(JsonToStructs(schema, Map.empty, Literal(input), gmtId, true), output)
+    checkEvaluation(JsonToStructs(schema, Map.empty, Literal(input), gmtId), output)
   }
 
   test("from_json - input=empty array, schema=array, output=empty array") {
     val input = "[ ]"
     val schema = ArrayType(StructType(StructField("a", IntegerType) :: Nil))
     val output = Nil
-    checkEvaluation(JsonToStructs(schema, Map.empty, Literal(input), gmtId, true), output)
+    checkEvaluation(JsonToStructs(schema, Map.empty, Literal(input), gmtId), output)
   }
 
   test("from_json - input=empty object, schema=array, output=array of single row with null") {
     val input = "{ }"
     val schema = ArrayType(StructType(StructField("a", IntegerType) :: Nil))
     val output = InternalRow(null) :: Nil
-    checkEvaluation(JsonToStructs(schema, Map.empty, Literal(input), gmtId, true), output)
+    checkEvaluation(JsonToStructs(schema, Map.empty, Literal(input), gmtId), output)
   }
 
   test("from_json - input=array of single object, schema=struct, output=single row") {
     val input = """[{"a": 1}]"""
     val schema = StructType(StructField("a", IntegerType) :: Nil)
     val output = InternalRow(1)
-    checkEvaluation(JsonToStructs(schema, Map.empty, Literal(input), gmtId, true), output)
+    checkEvaluation(JsonToStructs(schema, Map.empty, Literal(input), gmtId), output)
   }
 
   test("from_json - input=array, schema=struct, output=null") {
     val input = """[{"a": 1}, {"a": 2}]"""
     val schema = StructType(StructField("a", IntegerType) :: Nil)
     val output = null
-    checkEvaluation(JsonToStructs(schema, Map.empty, Literal(input), gmtId, true), output)
+    checkEvaluation(JsonToStructs(schema, Map.empty, Literal(input), gmtId), output)
   }
 
   test("from_json - input=empty array, schema=struct, output=null") {
     val input = """[]"""
     val schema = StructType(StructField("a", IntegerType) :: Nil)
     val output = null
-    checkEvaluation(JsonToStructs(schema, Map.empty, Literal(input), gmtId, true), output)
+    checkEvaluation(JsonToStructs(schema, Map.empty, Literal(input), gmtId), output)
   }
 
   test("from_json - input=empty object, schema=struct, output=single row with null") {
     val input = """{  }"""
     val schema = StructType(StructField("a", IntegerType) :: Nil)
     val output = InternalRow(null)
-    checkEvaluation(JsonToStructs(schema, Map.empty, Literal(input), gmtId, true), output)
+    checkEvaluation(JsonToStructs(schema, Map.empty, Literal(input), gmtId), output)
   }
 
   test("from_json null input column") {
     val schema = StructType(StructField("a", IntegerType) :: Nil)
     checkEvaluation(
-      JsonToStructs(schema, Map.empty, Literal.create(null, StringType), gmtId, true),
+      JsonToStructs(schema, Map.empty, Literal.create(null, StringType), gmtId),
       null
     )
   }
@@ -479,7 +479,7 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper with
   test("SPARK-20549: from_json bad UTF-8") {
     val schema = StructType(StructField("a", IntegerType) :: Nil)
     checkEvaluation(
-      JsonToStructs(schema, Map.empty, Literal(badJson), gmtId, true),
+      JsonToStructs(schema, Map.empty, Literal(badJson), gmtId),
       null)
   }
 
@@ -491,14 +491,14 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper with
     c.set(2016, 0, 1, 0, 0, 0)
     c.set(Calendar.MILLISECOND, 123)
     checkEvaluation(
-      JsonToStructs(schema, Map.empty, Literal(jsonData1), gmtId, true),
+      JsonToStructs(schema, Map.empty, Literal(jsonData1), gmtId),
       InternalRow(c.getTimeInMillis * 1000L)
     )
     // The result doesn't change because the json string includes timezone string ("Z" here),
     // which means the string represents the timestamp string in the timezone regardless of
     // the timeZoneId parameter.
     checkEvaluation(
-      JsonToStructs(schema, Map.empty, Literal(jsonData1), Option("PST"), true),
+      JsonToStructs(schema, Map.empty, Literal(jsonData1), Option("PST")),
       InternalRow(c.getTimeInMillis * 1000L)
     )
 
@@ -512,8 +512,7 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper with
           schema,
           Map("timestampFormat" -> "yyyy-MM-dd'T'HH:mm:ss"),
           Literal(jsonData2),
-          Option(tz.getID),
-          true),
+          Option(tz.getID)),
         InternalRow(c.getTimeInMillis * 1000L)
       )
       checkEvaluation(
@@ -522,8 +521,7 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper with
           Map("timestampFormat" -> "yyyy-MM-dd'T'HH:mm:ss",
             DateTimeUtils.TIMEZONE_OPTION -> tz.getID),
           Literal(jsonData2),
-          gmtId,
-          true),
+          gmtId),
         InternalRow(c.getTimeInMillis * 1000L)
       )
     }
@@ -532,7 +530,7 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper with
   test("SPARK-19543: from_json empty input column") {
     val schema = StructType(StructField("a", IntegerType) :: Nil)
     checkEvaluation(
-      JsonToStructs(schema, Map.empty, Literal.create(" ", StringType), gmtId, true),
+      JsonToStructs(schema, Map.empty, Literal.create(" ", StringType), gmtId),
       null
     )
   }
@@ -687,23 +685,24 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper with
 
   test("from_json missing fields") {
     for (forceJsonNullableSchema <- Seq(false, true)) {
-      val input =
-        """{
+      withSQLConf(SQLConf.FROM_JSON_FORCE_NULLABLE_SCHEMA.key -> forceJsonNullableSchema.toString) {
+        val input =
+          """{
           |  "a": 1,
           |  "c": "foo"
           |}
           |""".stripMargin
-      val jsonSchema = new StructType()
-        .add("a", LongType, nullable = false)
-        .add("b", StringType, nullable = false)
-        .add("c", StringType, nullable = false)
-      val output = InternalRow(1L, null, UTF8String.fromString("foo"))
-      val expr = JsonToStructs(
-        jsonSchema, Map.empty, Literal.create(input, StringType), gmtId, forceJsonNullableSchema)
-      checkEvaluation(expr, output)
-      val schema = expr.dataType
-      val schemaToCompare = if (forceJsonNullableSchema) jsonSchema.asNullable else jsonSchema
-      assert(schemaToCompare == schema)
+        val jsonSchema = new StructType()
+          .add("a", LongType, nullable = false)
+          .add("b", StringType, nullable = false)
+          .add("c", StringType, nullable = false)
+        val output = InternalRow(1L, null, UTF8String.fromString("foo"))
+        val expr = JsonToStructs(jsonSchema, Map.empty, Literal.create(input, StringType), gmtId)
+        checkEvaluation(expr, output)
+        val schema = expr.dataType
+        val schemaToCompare = if (forceJsonNullableSchema) jsonSchema.asNullable else jsonSchema
+        assert(schemaToCompare == schema)
+      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/e008ad17/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
index 89dbba1..6b956dd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
@@ -3304,7 +3304,7 @@ object functions {
    * @since 2.2.0
    */
   def from_json(e: Column, schema: DataType, options: Map[String, String]): Column = withExpr {
-    new JsonToStructs(schema, options, e.expr)
+    JsonToStructs(schema, options, e.expr)
   }
 
   /**
@@ -3495,7 +3495,7 @@ object functions {
    * @group collection_funcs
    * @since 1.5.0
    */
-  def size(e: Column): Column = withExpr { new Size(e.expr) }
+  def size(e: Column): Column = withExpr { Size(e.expr) }
 
   /**
    * Sorts the input array for the given column in ascending order,

