Posted to commits@spark.apache.org by da...@apache.org on 2016/04/03 08:12:10 UTC

spark git commit: [SPARK-14231] [SQL] JSON data source infers floating-point values as a double when they do not fit in a decimal

Repository: spark
Updated Branches:
  refs/heads/master 7be462050 -> 2262a9335


[SPARK-14231] [SQL] JSON data source infers floating-point values as a double when they do not fit in a decimal

## What changes were proposed in this pull request?

https://issues.apache.org/jira/browse/SPARK-14231

Currently, the JSON data source supports inferring `DecimalType` for big numbers, and it provides a `floatAsBigDecimal` option for reading floating-point values as `DecimalType`.

However, Spark's `DecimalType` has two restrictions:

1. The precision cannot be greater than 38.
2. The scale cannot be greater than the precision.

Currently, neither restriction is handled during schema inference, as the sketch below illustrates.
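
To see how inference trips over both, consider the precision and scale that `java.math.BigDecimal` (which the Jackson parser hands back) reports for the problem values; a small illustrative sketch:

```scala
// A 39-digit integer: precision 39 violates restriction 1.
val big = new java.math.BigDecimal("1" + "0" * 38)
big.precision()  // 39
big.scale()      // 0

// 0.01: precision 1 but scale 2, so the inferred DecimalType(1, 2)
// violates restriction 2 (scale greater than precision).
val small = new java.math.BigDecimal("0.01")
small.precision()  // 1
small.scale()      // 2
```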

This PR handles these cases by inferring such values as `DoubleType`. Also, the option was renamed from `floatAsBigDecimal` to `prefersDecimal`, as suggested [here](https://issues.apache.org/jira/browse/SPARK-14231?focusedCommentId=15215579&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-15215579).

So, the code below:

```scala
import org.apache.spark.rdd.RDD

// `sqlContext` here is the ambient SQLContext (e.g. in a test suite or spark-shell).
def doubleRecords: RDD[String] =
  sqlContext.sparkContext.parallelize(
    s"""{"a": 1${"0" * 38}, "b": 0.01}""" ::
    s"""{"a": 2${"0" * 38}, "b": 0.02}""" :: Nil)

val jsonDF = sqlContext.read
  .option("prefersDecimal", "true")
  .json(doubleRecords)
jsonDF.printSchema()
```

produces the following:

- **Before**

```
org.apache.spark.sql.AnalysisException: Decimal scale (2) cannot be greater than precision (1).;
	at org.apache.spark.sql.types.DecimalType.<init>(DecimalType.scala:44)
	at org.apache.spark.sql.execution.datasources.json.InferSchema$.org$apache$spark$sql$execution$datasources$json$InferSchema$$inferField(InferSchema.scala:144)
	at org.apache.spark.sql.execution.datasources.json.InferSchema$.org$apache$spark$sql$execution$datasources$json$InferSchema$$inferField(InferSchema.scala:108)
	at
...
```

- **After**

```
root
 |-- a: double (nullable = true)
 |-- b: double (nullable = true)
```
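
In other words, the new rule widens the precision to at least the scale, and falls back to double when even the widened precision exceeds 38. A minimal sketch of that rule (a paraphrase of the `InferSchema` change in the diff below, not a verbatim excerpt):

```scala
import org.apache.spark.sql.types.{DataType, DecimalType, DoubleType}

// For a floating-point or big-number token parsed as a java.math.BigDecimal:
def inferDecimal(v: java.math.BigDecimal): DataType = {
  val precision = math.max(v.precision(), v.scale())
  if (precision <= DecimalType.MAX_PRECISION) DecimalType(precision, v.scale())
  else DoubleType  // does not fit in a decimal
}
```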

## How was this patch tested?

Unit tests were added, and `./dev/run_tests` was run for coding style checks.

Author: hyukjinkwon <gu...@gmail.com>

Closes #12030 from HyukjinKwon/SPARK-14231.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2262a933
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2262a933
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2262a933

Branch: refs/heads/master
Commit: 2262a93358c2f6d4cfb73645c4ebc963c5640ec8
Parents: 7be4620
Author: hyukjinkwon <gu...@gmail.com>
Authored: Sat Apr 2 23:12:04 2016 -0700
Committer: Davies Liu <da...@gmail.com>
Committed: Sat Apr 2 23:12:04 2016 -0700

----------------------------------------------------------------------
 python/pyspark/sql/readwriter.py                |  4 +-
 .../org/apache/spark/sql/DataFrameReader.scala  |  4 +-
 .../datasources/json/InferSchema.scala          | 17 ++++---
 .../datasources/json/JSONOptions.scala          |  4 +-
 .../execution/datasources/json/JsonSuite.scala  | 48 +++++++++++++++++++-
 .../datasources/json/TestJsonData.scala         |  8 ++++
 6 files changed, 71 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/2262a933/python/pyspark/sql/readwriter.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py
index cca57a3..0cef37e 100644
--- a/python/pyspark/sql/readwriter.py
+++ b/python/pyspark/sql/readwriter.py
@@ -152,8 +152,8 @@ class DataFrameReader(object):
         You can set the following JSON-specific options to deal with non-standard JSON files:
             * ``primitivesAsString`` (default ``false``): infers all primitive values as a string \
                 type
-            * `floatAsBigDecimal` (default `false`): infers all floating-point values as a decimal \
-                type
+            * `prefersDecimal` (default `false`): infers all floating-point values as a decimal \
+                type. If the values do not fit in decimal, then it infers them as doubles.
             * ``allowComments`` (default ``false``): ignores Java/C++ style comment in JSON records
             * ``allowUnquotedFieldNames`` (default ``false``): allows unquoted JSON field names
             * ``allowSingleQuotes`` (default ``true``): allows single quotes in addition to double \

http://git-wip-us.apache.org/repos/asf/spark/blob/2262a933/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index 704535a..a5a6e01 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -315,8 +315,8 @@ class DataFrameReader private[sql](sqlContext: SQLContext) extends Logging {
    *
    * You can set the following JSON-specific options to deal with non-standard JSON files:
    * <li>`primitivesAsString` (default `false`): infers all primitive values as a string type</li>
-   * <li>`floatAsBigDecimal` (default `false`): infers all floating-point values as a decimal
-   * type</li>
+   * <li>`prefersDecimal` (default `false`): infers all floating-point values as a decimal
+   * type. If the values do not fit in decimal, then it infers them as doubles.</li>
    * <li>`allowComments` (default `false`): ignores Java/C++ style comment in JSON records</li>
    * <li>`allowUnquotedFieldNames` (default `false`): allows unquoted JSON field names</li>
    * <li>`allowSingleQuotes` (default `true`): allows single quotes in addition to double quotes

http://git-wip-us.apache.org/repos/asf/spark/blob/2262a933/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/InferSchema.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/InferSchema.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/InferSchema.scala
index 945ed2c..4a34f36 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/InferSchema.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/InferSchema.scala
@@ -25,7 +25,6 @@ import org.apache.spark.sql.execution.datasources.json.JacksonUtils.nextUntil
 import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils
 
-
 private[sql] object InferSchema {
 
   /**
@@ -135,14 +134,20 @@ private[sql] object InferSchema {
           // when we see a Java BigInteger, we use DecimalType.
           case BIG_INTEGER | BIG_DECIMAL =>
             val v = parser.getDecimalValue
-            DecimalType(v.precision(), v.scale())
-          case FLOAT | DOUBLE =>
-            if (configOptions.floatAsBigDecimal) {
-              val v = parser.getDecimalValue
-              DecimalType(v.precision(), v.scale())
+            if (Math.max(v.precision(), v.scale()) <= DecimalType.MAX_PRECISION) {
+              DecimalType(Math.max(v.precision(), v.scale()), v.scale())
+            } else {
+              DoubleType
+            }
+          case FLOAT | DOUBLE if configOptions.prefersDecimal =>
+            val v = parser.getDecimalValue
+            if (Math.max(v.precision(), v.scale()) <= DecimalType.MAX_PRECISION) {
+              DecimalType(Math.max(v.precision(), v.scale()), v.scale())
             } else {
               DoubleType
             }
+          case FLOAT | DOUBLE =>
+            DoubleType
         }
 
       case VALUE_TRUE | VALUE_FALSE => BooleanType
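
A note on the `Math.max(v.precision(), v.scale())` above: `java.math.BigDecimal` can report a precision smaller than the scale (for `0.01`, precision is 1 and scale is 2), so using the raw precision would rebuild the invalid `DecimalType(1, 2)`. Widening the precision to the scale yields the valid `DecimalType(2, 2)` asserted in the tests below.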

http://git-wip-us.apache.org/repos/asf/spark/blob/2262a933/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONOptions.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONOptions.scala
index c0ad9ef..66f1126 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONOptions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONOptions.scala
@@ -35,8 +35,8 @@ private[sql] class JSONOptions(
     parameters.get("samplingRatio").map(_.toDouble).getOrElse(1.0)
   val primitivesAsString =
     parameters.get("primitivesAsString").map(_.toBoolean).getOrElse(false)
-  val floatAsBigDecimal =
-    parameters.get("floatAsBigDecimal").map(_.toBoolean).getOrElse(false)
+  val prefersDecimal =
+    parameters.get("prefersDecimal").map(_.toBoolean).getOrElse(false)
   val allowComments =
     parameters.get("allowComments").map(_.toBoolean).getOrElse(false)
   val allowUnquotedFieldNames =

http://git-wip-us.apache.org/repos/asf/spark/blob/2262a933/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
index c108d81..421862c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
@@ -745,8 +745,8 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
     )
   }
 
-  test("Loading a JSON dataset floatAsBigDecimal returns schema with float types as BigDecimal") {
-    val jsonDF = sqlContext.read.option("floatAsBigDecimal", "true").json(primitiveFieldAndType)
+  test("Loading a JSON dataset prefersDecimal returns schema with float types as BigDecimal") {
+    val jsonDF = sqlContext.read.option("prefersDecimal", "true").json(primitiveFieldAndType)
 
     val expectedSchema = StructType(
       StructField("bigInteger", DecimalType(20, 0), true) ::
@@ -773,6 +773,50 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
     )
   }
 
+  test("Infer big integers correctly even when it does not fit in decimal") {
+    val jsonDF = sqlContext.read
+      .json(bigIntegerRecords)
+
+    // The value in `a` field will be a double as it does not fit in decimal. For `b` field,
+    // it will be a decimal as `92233720368547758070`.
+    val expectedSchema = StructType(
+      StructField("a", DoubleType, true) ::
+      StructField("b", DecimalType(20, 0), true) :: Nil)
+
+    assert(expectedSchema === jsonDF.schema)
+    checkAnswer(jsonDF, Row(1.0E38D, BigDecimal("92233720368547758070")))
+  }
+
+  test("Infer floating-point values correctly even when it does not fit in decimal") {
+    val jsonDF = sqlContext.read
+      .option("prefersDecimal", "true")
+      .json(floatingValueRecords)
+
+    // The value in `a` field will be a double as it does not fit in decimal. For `b` field,
+    // it will be a decimal as `0.01` by having a precision equal to the scale.
+    val expectedSchema = StructType(
+      StructField("a", DoubleType, true) ::
+      StructField("b", DecimalType(2, 2), true):: Nil)
+
+    assert(expectedSchema === jsonDF.schema)
+    checkAnswer(jsonDF, Row(1.0E-39D, BigDecimal(0.01)))
+
+    val mergedJsonDF = sqlContext.read
+      .option("prefersDecimal", "true")
+      .json(floatingValueRecords ++ bigIntegerRecords)
+
+    val expectedMergedSchema = StructType(
+      StructField("a", DoubleType, true) ::
+      StructField("b", DecimalType(22, 2), true):: Nil)
+
+    assert(expectedMergedSchema === mergedJsonDF.schema)
+    checkAnswer(
+      mergedJsonDF,
+      Row(1.0E-39D, BigDecimal(0.01)) ::
+      Row(1.0E38D, BigDecimal("92233720368547758070")) :: Nil
+    )
+  }
+
   test("Loading a JSON dataset from a text file with SQL") {
     val dir = Utils.createTempDir()
     dir.delete()
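
The `DecimalType(22, 2)` in the merged schema above comes from combining `DecimalType(2, 2)` with `DecimalType(20, 0)`: the merged type keeps the larger scale (2) plus enough precision for the larger integral part (20 digits). A rough sketch of that arithmetic, assuming a merge rule along the lines of `InferSchema.compatibleType` (an illustration, not an excerpt):

```scala
import org.apache.spark.sql.types.{DataType, DecimalType, DoubleType}

// Hypothetical decimal-merge arithmetic, for illustration only.
def mergeDecimals(t1: DecimalType, t2: DecimalType): DataType = {
  val scale = math.max(t1.scale, t2.scale)                                   // max(2, 0) = 2
  val integral = math.max(t1.precision - t1.scale, t2.precision - t2.scale)  // max(0, 20) = 20
  if (integral + scale <= DecimalType.MAX_PRECISION) DecimalType(integral + scale, scale)
  else DoubleType
}

mergeDecimals(DecimalType(2, 2), DecimalType(20, 0))  // DecimalType(22, 2)
```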

http://git-wip-us.apache.org/repos/asf/spark/blob/2262a933/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/TestJsonData.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/TestJsonData.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/TestJsonData.scala
index b2eff81..2873c6a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/TestJsonData.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/TestJsonData.scala
@@ -214,6 +214,14 @@ private[json] trait TestJsonData {
       """{"a": {"b": 1}}""" ::
       """{"a": []}""" :: Nil)
 
+  def floatingValueRecords: RDD[String] =
+    sqlContext.sparkContext.parallelize(
+      s"""{"a": 0.${"0" * 38}1, "b": 0.01}""" :: Nil)
+
+  def bigIntegerRecords: RDD[String] =
+    sqlContext.sparkContext.parallelize(
+      s"""{"a": 1${"0" * 38}, "b": 92233720368547758070}""" :: Nil)
+
   lazy val singleRow: RDD[String] = sqlContext.sparkContext.parallelize("""{"a":123}""" :: Nil)
 
   def empty: RDD[String] = sqlContext.sparkContext.parallelize(Seq[String]())
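
For readability, the interpolated records above expand as follows; the long zero runs push `a` outside decimal's limits in both fixtures, while `b` still fits:

```scala
s"""{"a": 0.${"0" * 38}1, "b": 0.01}"""                // a = 1e-39: scale 39 > 38, inferred as double
s"""{"a": 1${"0" * 38}, "b": 92233720368547758070}"""  // a = 1e38: precision 39 > 38, inferred as double
```

`b` remains a decimal in both cases: `0.01` becomes `DecimalType(2, 2)` (with `prefersDecimal` enabled) and the 20-digit `92233720368547758070` becomes `DecimalType(20, 0)`.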

