You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by do...@apache.org on 2020/03/26 20:16:23 UTC
[spark] branch master updated: [SPARK-31238][SQL] Rebase dates
to/from Julian calendar in write/read for ORC datasource
This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new d72ec85 [SPARK-31238][SQL] Rebase dates to/from Julian calendar in write/read for ORC datasource
d72ec85 is described below
commit d72ec8574113f9a7e87f3d7ec56c8447267b0506
Author: Maxim Gekk <ma...@gmail.com>
AuthorDate: Thu Mar 26 13:14:28 2020 -0700
[SPARK-31238][SQL] Rebase dates to/from Julian calendar in write/read for ORC datasource
### What changes were proposed in this pull request?
This PR (SPARK-31238) makes the following changes:
1. Modified the ORC Vectorized Reader, in particular `OrcColumnVector` v1.2 and v2.3. After the changes, it uses `DateTimeUtils.rebaseJulianToGregorianDays()` added by https://github.com/apache/spark/pull/27915 . The method rebases days from the hybrid calendar (Julian + Gregorian) to the Proleptic Gregorian calendar. It builds a local date in the original calendar, extracts date fields `year`, `month` and `day` from the local date, and builds another local date in the target calend [...]
2. Introduced rebasing of dates while saving ORC files. In particular, I modified `OrcShimUtils.getDateWritable` v1.2 and v2.3 to return `DaysWritable` instead of Hive's `DateWritable`. The `DaysWritable` class was added by the PR https://github.com/apache/spark/pull/27890 (and fixed by https://github.com/apache/spark/pull/27962). I moved `DaysWritable` from `sql/hive` to `sql/core` to re-use it in the ORC datasource.
### Why are the changes needed?
The changes are needed for backward compatibility with Spark 2.4 and earlier versions. They allow users to read dates/timestamps saved by previous versions and get the same result.
### Does this PR introduce any user-facing change?
Yes. Before the changes, loading the date `1200-01-01` saved by Spark 2.4.5 returns the following:
```scala
scala> spark.read.orc("/Users/maxim/tmp/before_1582/2_4_5_date_orc").show(false)
+----------+
|dt |
+----------+
|1200-01-08|
+----------+
```
After the changes:
```scala
scala> spark.read.orc("/Users/maxim/tmp/before_1582/2_4_5_date_orc").show(false)
+----------+
|dt |
+----------+
|1200-01-01|
+----------+
```
### How was this patch tested?
- By running `OrcSourceSuite` and `HiveOrcSourceSuite`.
- Added a new test `SPARK-31238: compatibility with Spark 2.4 in reading dates` to `OrcSuite`, which reads an ORC file saved by Spark 2.4.5 via the commands:
```shell
$ export TZ="America/Los_Angeles"
```
```scala
scala> sql("select cast('1200-01-01' as date) dt").write.mode("overwrite").orc("/Users/maxim/tmp/before_1582/2_4_5_date_orc")
scala> spark.read.orc("/Users/maxim/tmp/before_1582/2_4_5_date_orc").show(false)
+----------+
|dt |
+----------+
|1200-01-01|
+----------+
```
- Added a round-trip test `SPARK-31238: rebasing dates in write`. Because the test `SPARK-31238: compatibility with Spark 2.4 in reading dates` already confirms rebasing on read, a successful write-then-read round trip also verifies rebasing on write.
Closes #28016 from MaxGekk/rebase-date-orc.
Authored-by: Maxim Gekk <ma...@gmail.com>
Signed-off-by: Dongjoon Hyun <do...@apache.org>
---
.../sql/execution/datasources}/DaysWritable.scala | 10 ++++++--
.../test-data/before_1582_date_v2_4.snappy.orc | Bin 0 -> 201 bytes
.../execution/datasources/orc/OrcSourceSuite.scala | 28 ++++++++++++++++++++-
.../sql/execution/datasources/orc/OrcTest.scala | 5 ++++
.../execution/datasources/orc/OrcColumnVector.java | 15 ++++++++++-
.../execution/datasources/orc}/DaysWritable.scala | 17 ++++++++++---
.../execution/datasources/orc/OrcShimUtils.scala | 4 +--
.../execution/datasources/orc/OrcColumnVector.java | 15 ++++++++++-
.../execution/datasources/orc/OrcShimUtils.scala | 5 ++--
.../org/apache/spark/sql/hive/HiveInspectors.scala | 1 +
10 files changed, 88 insertions(+), 12 deletions(-)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/DaysWritable.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DaysWritable.scala
similarity index 92%
copy from sql/hive/src/main/scala/org/apache/spark/sql/hive/DaysWritable.scala
copy to sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DaysWritable.scala
index 1eec8d7..00b710f 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/DaysWritable.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DaysWritable.scala
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.spark.sql.hive
+package org.apache.spark.sql.execution.datasources
import java.io.{DataInput, DataOutput, IOException}
import java.sql.Date
@@ -35,11 +35,12 @@ import org.apache.spark.sql.catalyst.util.DateTimeUtils.{rebaseGregorianToJulian
* @param julianDays The number of days since the epoch 1970-01-01 in
* Julian calendar.
*/
-private[hive] class DaysWritable(
+class DaysWritable(
var gregorianDays: Int,
var julianDays: Int)
extends DateWritable {
+ def this() = this(0, 0)
def this(gregorianDays: Int) =
this(gregorianDays, rebaseGregorianToJulianDays(gregorianDays))
def this(dateWritable: DateWritable) = {
@@ -55,6 +56,11 @@ private[hive] class DaysWritable(
override def getDays: Int = julianDays
override def get(): Date = new Date(DateWritable.daysToMillis(julianDays))
+ override def set(d: Int): Unit = {
+ gregorianDays = d
+ julianDays = rebaseGregorianToJulianDays(d)
+ }
+
@throws[IOException]
override def write(out: DataOutput): Unit = {
WritableUtils.writeVInt(out, julianDays)
diff --git a/sql/core/src/test/resources/test-data/before_1582_date_v2_4.snappy.orc b/sql/core/src/test/resources/test-data/before_1582_date_v2_4.snappy.orc
new file mode 100644
index 0000000..ebe0174
Binary files /dev/null and b/sql/core/src/test/resources/test-data/before_1582_date_v2_4.snappy.orc differ
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSourceSuite.scala
index 1e27593..b5e002f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSourceSuite.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.datasources.orc
import java.io.File
import java.nio.charset.StandardCharsets.UTF_8
-import java.sql.Timestamp
+import java.sql.{Date, Timestamp}
import java.util.Locale
import org.apache.hadoop.conf.Configuration
@@ -482,6 +482,32 @@ abstract class OrcSuite extends OrcTest with BeforeAndAfterAll {
}
}
}
+
+ test("SPARK-31238: compatibility with Spark 2.4 in reading dates") {
+ Seq(false, true).foreach { vectorized =>
+ withSQLConf(SQLConf.ORC_VECTORIZED_READER_ENABLED.key -> vectorized.toString) {
+ checkAnswer(
+ readResourceOrcFile("test-data/before_1582_date_v2_4.snappy.orc"),
+ Row(java.sql.Date.valueOf("1200-01-01")))
+ }
+ }
+ }
+
+ test("SPARK-31238: rebasing dates in write") {
+ withTempPath { dir =>
+ val path = dir.getAbsolutePath
+ Seq("1001-01-01").toDF("dateS")
+ .select($"dateS".cast("date").as("date"))
+ .write
+ .orc(path)
+
+ Seq(false, true).foreach { vectorized =>
+ withSQLConf(SQLConf.ORC_VECTORIZED_READER_ENABLED.key -> vectorized.toString) {
+ checkAnswer(spark.read.orc(path), Row(Date.valueOf("1001-01-01")))
+ }
+ }
+ }
+ }
}
class OrcSourceSuite extends OrcSuite with SharedSparkSession {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcTest.scala
index 388744b..16772fe 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcTest.scala
@@ -133,4 +133,9 @@ abstract class OrcTest extends QueryTest with FileBasedDataSourceTest with Befor
throw new AnalysisException("Can not match OrcTable in the query.")
}
}
+
+ protected def readResourceOrcFile(name: String): DataFrame = {
+ val url = Thread.currentThread().getContextClassLoader.getResource(name)
+ spark.read.orc(url.toString)
+ }
}
diff --git a/sql/core/v1.2/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnVector.java b/sql/core/v1.2/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnVector.java
index 0dfed76..5dc3f37 100644
--- a/sql/core/v1.2/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnVector.java
+++ b/sql/core/v1.2/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnVector.java
@@ -23,6 +23,7 @@ import org.apache.orc.storage.ql.exec.vector.*;
import org.apache.spark.sql.catalyst.util.DateTimeUtils;
import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.DateType;
import org.apache.spark.sql.types.Decimal;
import org.apache.spark.sql.types.TimestampType;
import org.apache.spark.sql.vectorized.ColumnarArray;
@@ -42,6 +43,7 @@ public class OrcColumnVector extends org.apache.spark.sql.vectorized.ColumnVecto
private DecimalColumnVector decimalData;
private TimestampColumnVector timestampData;
private final boolean isTimestamp;
+ private final boolean isDate;
private int batchSize;
@@ -54,6 +56,12 @@ public class OrcColumnVector extends org.apache.spark.sql.vectorized.ColumnVecto
isTimestamp = false;
}
+ if (type instanceof DateType) {
+ isDate = true;
+ } else {
+ isDate = false;
+ }
+
baseData = vector;
if (vector instanceof LongColumnVector) {
longData = (LongColumnVector) vector;
@@ -130,7 +138,12 @@ public class OrcColumnVector extends org.apache.spark.sql.vectorized.ColumnVecto
@Override
public int getInt(int rowId) {
- return (int) longData.vector[getRowIndex(rowId)];
+ int value = (int) longData.vector[getRowIndex(rowId)];
+ if (isDate) {
+ return DateTimeUtils.rebaseJulianToGregorianDays(value);
+ } else {
+ return value;
+ }
}
@Override
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/DaysWritable.scala b/sql/core/v1.2/src/main/scala/org/apache/spark/sql/execution/datasources/orc/DaysWritable.scala
similarity index 81%
rename from sql/hive/src/main/scala/org/apache/spark/sql/hive/DaysWritable.scala
rename to sql/core/v1.2/src/main/scala/org/apache/spark/sql/execution/datasources/orc/DaysWritable.scala
index 1eec8d7..4934a96 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/DaysWritable.scala
+++ b/sql/core/v1.2/src/main/scala/org/apache/spark/sql/execution/datasources/orc/DaysWritable.scala
@@ -15,13 +15,13 @@
* limitations under the License.
*/
-package org.apache.spark.sql.hive
+package org.apache.spark.sql.execution.datasources.orc
import java.io.{DataInput, DataOutput, IOException}
import java.sql.Date
-import org.apache.hadoop.hive.serde2.io.DateWritable
import org.apache.hadoop.io.WritableUtils
+import org.apache.orc.storage.serde2.io.DateWritable
import org.apache.spark.sql.catalyst.util.DateTimeUtils.{rebaseGregorianToJulianDays, rebaseJulianToGregorianDays}
@@ -30,16 +30,22 @@ import org.apache.spark.sql.catalyst.util.DateTimeUtils.{rebaseGregorianToJulian
* via conversion to local date in Julian calendar for dates before 1582-10-15
* in read/write for backward compatibility with Spark 2.4 and earlier versions.
*
+ * This is a clone of `org.apache.spark.sql.execution.datasources.DaysWritable`.
+ * The class is cloned because Hive ORC v1.2 uses different `DateWritable`:
+ * - v1.2: `org.apache.orc.storage.serde2.io.DateWritable`
+ * - v2.3 and `HiveInspectors`: `org.apache.hadoop.hive.serde2.io.DateWritable`
+ *
* @param gregorianDays The number of days since the epoch 1970-01-01 in
* Gregorian calendar.
* @param julianDays The number of days since the epoch 1970-01-01 in
* Julian calendar.
*/
-private[hive] class DaysWritable(
+class DaysWritable(
var gregorianDays: Int,
var julianDays: Int)
extends DateWritable {
+ def this() = this(0, 0)
def this(gregorianDays: Int) =
this(gregorianDays, rebaseGregorianToJulianDays(gregorianDays))
def this(dateWritable: DateWritable) = {
@@ -55,6 +61,11 @@ private[hive] class DaysWritable(
override def getDays: Int = julianDays
override def get(): Date = new Date(DateWritable.daysToMillis(julianDays))
+ override def set(d: Int): Unit = {
+ gregorianDays = d
+ julianDays = rebaseGregorianToJulianDays(d)
+ }
+
@throws[IOException]
override def write(out: DataOutput): Unit = {
WritableUtils.writeVInt(out, julianDays)
diff --git a/sql/core/v1.2/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcShimUtils.scala b/sql/core/v1.2/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcShimUtils.scala
index 68503ab..ece5280 100644
--- a/sql/core/v1.2/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcShimUtils.scala
+++ b/sql/core/v1.2/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcShimUtils.scala
@@ -47,13 +47,13 @@ private[sql] object OrcShimUtils {
def getDateWritable(reuseObj: Boolean): (SpecializedGetters, Int) => DateWritable = {
if (reuseObj) {
- val result = new DateWritable()
+ val result = new DaysWritable()
(getter, ordinal) =>
result.set(getter.getInt(ordinal))
result
} else {
(getter: SpecializedGetters, ordinal: Int) =>
- new DateWritable(getter.getInt(ordinal))
+ new DaysWritable(getter.getInt(ordinal))
}
}
diff --git a/sql/core/v2.3/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnVector.java b/sql/core/v2.3/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnVector.java
index 35447fe..7be4a6f 100644
--- a/sql/core/v2.3/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnVector.java
+++ b/sql/core/v2.3/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnVector.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.hive.ql.exec.vector.*;
import org.apache.spark.sql.catalyst.util.DateTimeUtils;
import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.DateType;
import org.apache.spark.sql.types.Decimal;
import org.apache.spark.sql.types.TimestampType;
import org.apache.spark.sql.vectorized.ColumnarArray;
@@ -42,6 +43,7 @@ public class OrcColumnVector extends org.apache.spark.sql.vectorized.ColumnVecto
private DecimalColumnVector decimalData;
private TimestampColumnVector timestampData;
private final boolean isTimestamp;
+ private final boolean isDate;
private int batchSize;
@@ -54,6 +56,12 @@ public class OrcColumnVector extends org.apache.spark.sql.vectorized.ColumnVecto
isTimestamp = false;
}
+ if (type instanceof DateType) {
+ isDate = true;
+ } else {
+ isDate = false;
+ }
+
baseData = vector;
if (vector instanceof LongColumnVector) {
longData = (LongColumnVector) vector;
@@ -130,7 +138,12 @@ public class OrcColumnVector extends org.apache.spark.sql.vectorized.ColumnVecto
@Override
public int getInt(int rowId) {
- return (int) longData.vector[getRowIndex(rowId)];
+ int value = (int) longData.vector[getRowIndex(rowId)];
+ if (isDate) {
+ return DateTimeUtils.rebaseJulianToGregorianDays(value);
+ } else {
+ return value;
+ }
}
@Override
diff --git a/sql/core/v2.3/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcShimUtils.scala b/sql/core/v2.3/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcShimUtils.scala
index c32f024..5666d31 100644
--- a/sql/core/v2.3/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcShimUtils.scala
+++ b/sql/core/v2.3/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcShimUtils.scala
@@ -26,6 +26,7 @@ import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf.{Operator => OrcOperator}
import org.apache.hadoop.hive.serde2.io.{DateWritable, HiveDecimalWritable}
import org.apache.spark.sql.catalyst.expressions.SpecializedGetters
+import org.apache.spark.sql.execution.datasources.DaysWritable
import org.apache.spark.sql.types.Decimal
/**
@@ -47,13 +48,13 @@ private[sql] object OrcShimUtils {
def getDateWritable(reuseObj: Boolean): (SpecializedGetters, Int) => DateWritable = {
if (reuseObj) {
- val result = new DateWritable()
+ val result = new DaysWritable()
(getter, ordinal) =>
result.set(getter.getInt(ordinal))
result
} else {
(getter: SpecializedGetters, ordinal: Int) =>
- new DateWritable(getter.getInt(ordinal))
+ new DaysWritable(getter.getInt(ordinal))
}
}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala
index e3e9a31..16e9014 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala
@@ -32,6 +32,7 @@ import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.util._
+import org.apache.spark.sql.execution.datasources.DaysWritable
import org.apache.spark.sql.types
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org