Posted to commits@spark.apache.org by da...@apache.org on 2016/04/03 08:12:10 UTC
spark git commit: [SPARK-14231] [SQL] JSON data source infers floating-point values as a double when they do not fit in a decimal
Repository: spark
Updated Branches:
refs/heads/master 7be462050 -> 2262a9335
[SPARK-14231] [SQL] JSON data source infers floating-point values as a double when they do not fit in a decimal
## What changes were proposed in this pull request?
https://issues.apache.org/jira/browse/SPARK-14231
Currently, the JSON data source can infer `DecimalType` for big numbers, and the `floatAsBigDecimal` option reads floating-point values as `DecimalType`.
However, Spark's `DecimalType` has a few restrictions:
1. The precision cannot be greater than 38.
2. The scale cannot be greater than the precision.
Currently, neither restriction is handled during schema inference.
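Both restrictions can be checked directly on `java.math.BigDecimal`, which is the type Jackson's `getDecimalValue` returns. The sketch below is illustrative only: it works on literal strings rather than a live JSON parser, and the object `DecimalRestrictions` and helper `violations` are hypothetical names, not Spark APIs.

```scala
import java.math.BigDecimal

object DecimalRestrictions {
  // Same limit as Spark's DecimalType.MAX_PRECISION.
  val MaxPrecision = 38

  // Hypothetical helper: lists which DecimalType restrictions a literal would break
  // if it were turned into DecimalType(v.precision, v.scale) unchanged.
  def violations(literal: String): List[String] = {
    val v = new BigDecimal(literal)
    val tooPrecise =
      if (v.precision() > MaxPrecision) List(s"precision ${v.precision()} > $MaxPrecision")
      else Nil
    val scaleTooBig =
      if (v.scale() > v.precision()) List(s"scale ${v.scale()} > precision ${v.precision()}")
      else Nil
    tooPrecise ++ scaleTooBig
  }

  def main(args: Array[String]): Unit = {
    // `0.01` has unscaled value 1, so its precision is 1 and its scale is 2.
    println(violations("0.01"))         // List(scale 2 > precision 1)
    // `1` followed by 38 zeros has 39 significant digits.
    println(violations("1" + "0" * 38)) // List(precision 39 > 38)
    println(violations("1.5"))          // List()
  }
}
```

The first case is exactly the `Decimal scale (2) cannot be greater than precision (1)` failure shown in the **Before** output.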
This PR handles these cases by inferring such values as `DoubleType`. It also renames the option from `floatAsBigDecimal` to `prefersDecimal`, as suggested [here](https://issues.apache.org/jira/browse/SPARK-14231?focusedCommentId=15215579&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-15215579).
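The inference rule added to `InferSchema.scala` in this commit widens the precision to at least the scale and falls back to a double when the result would exceed 38 digits. A minimal sketch of that rule outside Spark, assuming literal strings instead of Jackson tokens; `DecimalT` and `DoubleT` are hypothetical stand-ins for Spark's `DecimalType` and `DoubleType`:

```scala
import java.math.BigDecimal

object DecimalInferenceSketch {
  // Same limit as Spark's DecimalType.MAX_PRECISION.
  val MaxPrecision = 38

  // Hypothetical stand-ins for the inferred Spark data types.
  sealed trait Inferred
  final case class DecimalT(precision: Int, scale: Int) extends Inferred
  case object DoubleT extends Inferred

  // Mirrors the new check: use max(precision, scale) as the precision so that the
  // scale never exceeds it, and infer a double if that precision is above 38.
  def inferNumber(literal: String): Inferred = {
    val v = new BigDecimal(literal)
    val precision = math.max(v.precision(), v.scale())
    if (precision <= MaxPrecision) DecimalT(precision, v.scale()) else DoubleT
  }

  def main(args: Array[String]): Unit = {
    println(inferNumber("0.01"))                 // DecimalT(2,2)
    println(inferNumber("92233720368547758070")) // DecimalT(20,0)
    println(inferNumber("1" + "0" * 38))         // DoubleT
    println(inferNumber("0." + "0" * 38 + "1"))  // DoubleT
  }
}
```

These four inputs correspond to the values used by `floatingValueRecords` and `bigIntegerRecords` in the tests of this commit.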
For example, the following code:
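```scala
import org.apache.spark.rdd.RDD

// `a` holds 39-digit integers (too many for DecimalType); `b` holds values such as
// 0.01 whose scale (2) is greater than their precision (1).
def doubleRecords: RDD[String] =
  sqlContext.sparkContext.parallelize(
    s"""{"a": 1${"0" * 38}, "b": 0.01}""" ::
    s"""{"a": 2${"0" * 38}, "b": 0.02}""" :: Nil)

val jsonDF = sqlContext.read
  .option("prefersDecimal", "true")
  .json(doubleRecords)
jsonDF.printSchema()
```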
produces the following:
- **Before**
```
org.apache.spark.sql.AnalysisException: Decimal scale (2) cannot be greater than precision (1).;
at org.apache.spark.sql.types.DecimalType.<init>(DecimalType.scala:44)
at org.apache.spark.sql.execution.datasources.json.InferSchema$.org$apache$spark$sql$execution$datasources$json$InferSchema$$inferField(InferSchema.scala:144)
at org.apache.spark.sql.execution.datasources.json.InferSchema$.org$apache$spark$sql$execution$datasources$json$InferSchema$$inferField(InferSchema.scala:108)
...
```
- **After**
```
root
|-- a: double (nullable = true)
|-- b: decimal(2,2) (nullable = true)
```
## How was this patch tested?
Added unit tests in `JsonSuite`, and ran `./dev/run_tests` for code style checks.
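The merged-schema test expects field `b` to become `DecimalType(22, 2)` when `decimal(2,2)` (from `floatingValueRecords`) meets `decimal(20,0)` (from `bigIntegerRecords`). A sketch of how that number can arise, assuming the merge keeps the larger scale and the larger count of integer digits; the overflow fallback to a double is also an assumption, and `DecimalT`, `DoubleT` and `widen` are hypothetical names rather than Spark code:

```scala
object DecimalWideningSketch {
  // Same limit as Spark's DecimalType.MAX_PRECISION.
  val MaxPrecision = 38

  // Hypothetical stand-ins for the inferred Spark data types.
  sealed trait Inferred
  final case class DecimalT(precision: Int, scale: Int) extends Inferred
  case object DoubleT extends Inferred

  // Assumed widening rule: digits after the point = larger scale,
  // digits before the point = larger (precision - scale).
  def widen(a: DecimalT, b: DecimalT): Inferred = {
    val scale = math.max(a.scale, b.scale)
    val range = math.max(a.precision - a.scale, b.precision - b.scale)
    // Assumption: fall back to a double if the widened decimal needs more than 38 digits.
    if (range + scale > MaxPrecision) DoubleT else DecimalT(range + scale, scale)
  }

  def main(args: Array[String]): Unit = {
    println(widen(DecimalT(2, 2), DecimalT(20, 0))) // DecimalT(22,2)
  }
}
```

Here the integer part needs 20 digits and the fractional part 2, giving a precision of 22.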
Author: hyukjinkwon <gu...@gmail.com>
Closes #12030 from HyukjinKwon/SPARK-14231.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2262a933
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2262a933
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2262a933
Branch: refs/heads/master
Commit: 2262a93358c2f6d4cfb73645c4ebc963c5640ec8
Parents: 7be4620
Author: hyukjinkwon <gu...@gmail.com>
Authored: Sat Apr 2 23:12:04 2016 -0700
Committer: Davies Liu <da...@gmail.com>
Committed: Sat Apr 2 23:12:04 2016 -0700
----------------------------------------------------------------------
python/pyspark/sql/readwriter.py | 4 +-
.../org/apache/spark/sql/DataFrameReader.scala | 4 +-
.../datasources/json/InferSchema.scala | 17 ++++---
.../datasources/json/JSONOptions.scala | 4 +-
.../execution/datasources/json/JsonSuite.scala | 48 +++++++++++++++++++-
.../datasources/json/TestJsonData.scala | 8 ++++
6 files changed, 71 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/2262a933/python/pyspark/sql/readwriter.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py
index cca57a3..0cef37e 100644
--- a/python/pyspark/sql/readwriter.py
+++ b/python/pyspark/sql/readwriter.py
@@ -152,8 +152,8 @@ class DataFrameReader(object):
You can set the following JSON-specific options to deal with non-standard JSON files:
* ``primitivesAsString`` (default ``false``): infers all primitive values as a string \
type
- * `floatAsBigDecimal` (default `false`): infers all floating-point values as a decimal \
- type
+ * `prefersDecimal` (default `false`): infers all floating-point values as a decimal \
+ type. If the values do not fit in decimal, then it infers them as doubles.
* ``allowComments`` (default ``false``): ignores Java/C++ style comment in JSON records
* ``allowUnquotedFieldNames`` (default ``false``): allows unquoted JSON field names
* ``allowSingleQuotes`` (default ``true``): allows single quotes in addition to double \
http://git-wip-us.apache.org/repos/asf/spark/blob/2262a933/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index 704535a..a5a6e01 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -315,8 +315,8 @@ class DataFrameReader private[sql](sqlContext: SQLContext) extends Logging {
*
* You can set the following JSON-specific options to deal with non-standard JSON files:
* <li>`primitivesAsString` (default `false`): infers all primitive values as a string type</li>
- * <li>`floatAsBigDecimal` (default `false`): infers all floating-point values as a decimal
- * type</li>
+ * <li>`prefersDecimal` (default `false`): infers all floating-point values as a decimal
+ * type. If the values do not fit in decimal, then it infers them as doubles.</li>
* <li>`allowComments` (default `false`): ignores Java/C++ style comment in JSON records</li>
* <li>`allowUnquotedFieldNames` (default `false`): allows unquoted JSON field names</li>
* <li>`allowSingleQuotes` (default `true`): allows single quotes in addition to double quotes
http://git-wip-us.apache.org/repos/asf/spark/blob/2262a933/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/InferSchema.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/InferSchema.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/InferSchema.scala
index 945ed2c..4a34f36 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/InferSchema.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/InferSchema.scala
@@ -25,7 +25,6 @@ import org.apache.spark.sql.execution.datasources.json.JacksonUtils.nextUntil
import org.apache.spark.sql.types._
import org.apache.spark.util.Utils
-
private[sql] object InferSchema {
/**
@@ -135,14 +134,20 @@ private[sql] object InferSchema {
// when we see a Java BigInteger, we use DecimalType.
case BIG_INTEGER | BIG_DECIMAL =>
val v = parser.getDecimalValue
- DecimalType(v.precision(), v.scale())
- case FLOAT | DOUBLE =>
- if (configOptions.floatAsBigDecimal) {
- val v = parser.getDecimalValue
- DecimalType(v.precision(), v.scale())
+ if (Math.max(v.precision(), v.scale()) <= DecimalType.MAX_PRECISION) {
+ DecimalType(Math.max(v.precision(), v.scale()), v.scale())
+ } else {
+ DoubleType
+ }
+ case FLOAT | DOUBLE if configOptions.prefersDecimal =>
+ val v = parser.getDecimalValue
+ if (Math.max(v.precision(), v.scale()) <= DecimalType.MAX_PRECISION) {
+ DecimalType(Math.max(v.precision(), v.scale()), v.scale())
} else {
DoubleType
}
+ case FLOAT | DOUBLE =>
+ DoubleType
}
case VALUE_TRUE | VALUE_FALSE => BooleanType
http://git-wip-us.apache.org/repos/asf/spark/blob/2262a933/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONOptions.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONOptions.scala
index c0ad9ef..66f1126 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONOptions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONOptions.scala
@@ -35,8 +35,8 @@ private[sql] class JSONOptions(
parameters.get("samplingRatio").map(_.toDouble).getOrElse(1.0)
val primitivesAsString =
parameters.get("primitivesAsString").map(_.toBoolean).getOrElse(false)
- val floatAsBigDecimal =
- parameters.get("floatAsBigDecimal").map(_.toBoolean).getOrElse(false)
+ val prefersDecimal =
+ parameters.get("prefersDecimal").map(_.toBoolean).getOrElse(false)
val allowComments =
parameters.get("allowComments").map(_.toBoolean).getOrElse(false)
val allowUnquotedFieldNames =
http://git-wip-us.apache.org/repos/asf/spark/blob/2262a933/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
index c108d81..421862c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
@@ -745,8 +745,8 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
)
}
- test("Loading a JSON dataset floatAsBigDecimal returns schema with float types as BigDecimal") {
- val jsonDF = sqlContext.read.option("floatAsBigDecimal", "true").json(primitiveFieldAndType)
+ test("Loading a JSON dataset prefersDecimal returns schema with float types as BigDecimal") {
+ val jsonDF = sqlContext.read.option("prefersDecimal", "true").json(primitiveFieldAndType)
val expectedSchema = StructType(
StructField("bigInteger", DecimalType(20, 0), true) ::
@@ -773,6 +773,50 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
)
}
+ test("Infer big integers correctly even when it does not fit in decimal") {
+ val jsonDF = sqlContext.read
+ .json(bigIntegerRecords)
+
+ // The value in `a` field will be a double as it does not fit in decimal. For `b` field,
+ // it will be a decimal as `92233720368547758070`.
+ val expectedSchema = StructType(
+ StructField("a", DoubleType, true) ::
+ StructField("b", DecimalType(20, 0), true) :: Nil)
+
+ assert(expectedSchema === jsonDF.schema)
+ checkAnswer(jsonDF, Row(1.0E38D, BigDecimal("92233720368547758070")))
+ }
+
+ test("Infer floating-point values correctly even when it does not fit in decimal") {
+ val jsonDF = sqlContext.read
+ .option("prefersDecimal", "true")
+ .json(floatingValueRecords)
+
+ // The value in `a` field will be a double as it does not fit in decimal. For `b` field,
+ // it will be a decimal as `0.01` by having a precision equal to the scale.
+ val expectedSchema = StructType(
+ StructField("a", DoubleType, true) ::
+ StructField("b", DecimalType(2, 2), true):: Nil)
+
+ assert(expectedSchema === jsonDF.schema)
+ checkAnswer(jsonDF, Row(1.0E-39D, BigDecimal(0.01)))
+
+ val mergedJsonDF = sqlContext.read
+ .option("prefersDecimal", "true")
+ .json(floatingValueRecords ++ bigIntegerRecords)
+
+ val expectedMergedSchema = StructType(
+ StructField("a", DoubleType, true) ::
+ StructField("b", DecimalType(22, 2), true):: Nil)
+
+ assert(expectedMergedSchema === mergedJsonDF.schema)
+ checkAnswer(
+ mergedJsonDF,
+ Row(1.0E-39D, BigDecimal(0.01)) ::
+ Row(1.0E38D, BigDecimal("92233720368547758070")) :: Nil
+ )
+ }
+
test("Loading a JSON dataset from a text file with SQL") {
val dir = Utils.createTempDir()
dir.delete()
http://git-wip-us.apache.org/repos/asf/spark/blob/2262a933/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/TestJsonData.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/TestJsonData.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/TestJsonData.scala
index b2eff81..2873c6a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/TestJsonData.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/TestJsonData.scala
@@ -214,6 +214,14 @@ private[json] trait TestJsonData {
"""{"a": {"b": 1}}""" ::
"""{"a": []}""" :: Nil)
+ def floatingValueRecords: RDD[String] =
+ sqlContext.sparkContext.parallelize(
+ s"""{"a": 0.${"0" * 38}1, "b": 0.01}""" :: Nil)
+
+ def bigIntegerRecords: RDD[String] =
+ sqlContext.sparkContext.parallelize(
+ s"""{"a": 1${"0" * 38}, "b": 92233720368547758070}""" :: Nil)
+
lazy val singleRow: RDD[String] = sqlContext.sparkContext.parallelize("""{"a":123}""" :: Nil)
def empty: RDD[String] = sqlContext.sparkContext.parallelize(Seq[String]())