You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by do...@apache.org on 2020/03/26 20:17:54 UTC

[spark] branch branch-3.0 updated: [SPARK-31238][SQL] Rebase dates to/from Julian calendar in write/read for ORC datasource

This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 78cc2ef  [SPARK-31238][SQL] Rebase dates to/from Julian calendar in write/read for ORC datasource
78cc2ef is described below

commit 78cc2ef5b663d6d605e3d4febc6fb99e20b7f165
Author: Maxim Gekk <ma...@gmail.com>
AuthorDate: Thu Mar 26 13:14:28 2020 -0700

    [SPARK-31238][SQL] Rebase dates to/from Julian calendar in write/read for ORC datasource
    
    ### What changes were proposed in this pull request?
    
    This PR (SPARK-31238) makes the following changes:
    1. Modified the ORC vectorized reader, in particular `OrcColumnVector` v1.2 and v2.3. After the changes, it uses `DateTimeUtils.rebaseJulianToGregorianDays()` added by https://github.com/apache/spark/pull/27915 . The method rebases days from the hybrid calendar (Julian + Gregorian) to the Proleptic Gregorian calendar. It builds a local date in the original calendar, extracts the date fields `year`, `month` and `day` from the local date, and builds another local date in the target calend [...]
    2. Introduced rebasing of dates while saving ORC files. In particular, I modified `OrcShimUtils.getDateWritable` v1.2 and v2.3 to return `DaysWritable` instead of Hive's `DateWritable`. The `DaysWritable` class was added by the PR https://github.com/apache/spark/pull/27890 (and fixed by https://github.com/apache/spark/pull/27962). I moved `DaysWritable` from `sql/hive` to `sql/core` to re-use it in the ORC datasource.
    
    ### Why are the changes needed?
    For backward compatibility with Spark 2.4 and earlier versions. The changes allow users to read dates/timestamps saved by previous versions and get the same result.
    
    ### Does this PR introduce any user-facing change?
    Yes. Before the changes, loading the date `1200-01-01` saved by Spark 2.4.5 returns the following:
    ```scala
    scala> spark.read.orc("/Users/maxim/tmp/before_1582/2_4_5_date_orc").show(false)
    +----------+
    |dt        |
    +----------+
    |1200-01-08|
    +----------+
    ```
    After the changes:
    ```scala
    scala> spark.read.orc("/Users/maxim/tmp/before_1582/2_4_5_date_orc").show(false)
    +----------+
    |dt        |
    +----------+
    |1200-01-01|
    +----------+
    ```
    
    ### How was this patch tested?
    - By running `OrcSourceSuite` and `HiveOrcSourceSuite`.
    - Added a new test, `SPARK-31238: compatibility with Spark 2.4 in reading dates`, to `OrcSuite`. It reads an ORC file saved by Spark 2.4.5 via the commands:
    ```shell
    $ export TZ="America/Los_Angeles"
    ```
    ```scala
    scala> sql("select cast('1200-01-01' as date) dt").write.mode("overwrite").orc("/Users/maxim/tmp/before_1582/2_4_5_date_orc")
    scala> spark.read.orc("/Users/maxim/tmp/before_1582/2_4_5_date_orc").show(false)
    +----------+
    |dt        |
    +----------+
    |1200-01-01|
    +----------+
    ```
    - Added a round-trip test, `SPARK-31238: rebasing dates in write`. Because the test `SPARK-31238: compatibility with Spark 2.4 in reading dates` already confirms rebasing on read, a successful round trip confirms rebasing on write.
    
    Closes #28016 from MaxGekk/rebase-date-orc.
    
    Authored-by: Maxim Gekk <ma...@gmail.com>
    Signed-off-by: Dongjoon Hyun <do...@apache.org>
    (cherry picked from commit d72ec8574113f9a7e87f3d7ec56c8447267b0506)
    Signed-off-by: Dongjoon Hyun <do...@apache.org>
---
 .../sql/execution/datasources}/DaysWritable.scala  |  10 ++++++--
 .../test-data/before_1582_date_v2_4.snappy.orc     | Bin 0 -> 201 bytes
 .../execution/datasources/orc/OrcSourceSuite.scala |  28 ++++++++++++++++++++-
 .../sql/execution/datasources/orc/OrcTest.scala    |   5 ++++
 .../execution/datasources/orc/OrcColumnVector.java |  15 ++++++++++-
 .../execution/datasources/orc}/DaysWritable.scala  |  17 ++++++++++---
 .../execution/datasources/orc/OrcShimUtils.scala   |   4 +--
 .../execution/datasources/orc/OrcColumnVector.java |  15 ++++++++++-
 .../execution/datasources/orc/OrcShimUtils.scala   |   5 ++--
 .../org/apache/spark/sql/hive/HiveInspectors.scala |   1 +
 10 files changed, 88 insertions(+), 12 deletions(-)

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/DaysWritable.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DaysWritable.scala
similarity index 92%
copy from sql/hive/src/main/scala/org/apache/spark/sql/hive/DaysWritable.scala
copy to sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DaysWritable.scala
index 1eec8d7..00b710f 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/DaysWritable.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DaysWritable.scala
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.spark.sql.hive
+package org.apache.spark.sql.execution.datasources
 
 import java.io.{DataInput, DataOutput, IOException}
 import java.sql.Date
@@ -35,11 +35,12 @@ import org.apache.spark.sql.catalyst.util.DateTimeUtils.{rebaseGregorianToJulian
  * @param julianDays The number of days since the epoch 1970-01-01 in
  *                   Julian calendar.
  */
-private[hive] class DaysWritable(
+class DaysWritable(
     var gregorianDays: Int,
     var julianDays: Int)
   extends DateWritable {
 
+  def this() = this(0, 0)
   def this(gregorianDays: Int) =
     this(gregorianDays, rebaseGregorianToJulianDays(gregorianDays))
   def this(dateWritable: DateWritable) = {
@@ -55,6 +56,11 @@ private[hive] class DaysWritable(
   override def getDays: Int = julianDays
   override def get(): Date = new Date(DateWritable.daysToMillis(julianDays))
 
+  override def set(d: Int): Unit = {
+    gregorianDays = d
+    julianDays = rebaseGregorianToJulianDays(d)
+  }
+
   @throws[IOException]
   override def write(out: DataOutput): Unit = {
     WritableUtils.writeVInt(out, julianDays)
diff --git a/sql/core/src/test/resources/test-data/before_1582_date_v2_4.snappy.orc b/sql/core/src/test/resources/test-data/before_1582_date_v2_4.snappy.orc
new file mode 100644
index 0000000..ebe0174
Binary files /dev/null and b/sql/core/src/test/resources/test-data/before_1582_date_v2_4.snappy.orc differ
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSourceSuite.scala
index 1e27593..b5e002f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSourceSuite.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.datasources.orc
 
 import java.io.File
 import java.nio.charset.StandardCharsets.UTF_8
-import java.sql.Timestamp
+import java.sql.{Date, Timestamp}
 import java.util.Locale
 
 import org.apache.hadoop.conf.Configuration
@@ -482,6 +482,32 @@ abstract class OrcSuite extends OrcTest with BeforeAndAfterAll {
       }
     }
   }
+
+  test("SPARK-31238: compatibility with Spark 2.4 in reading dates") {
+    Seq(false, true).foreach { vectorized =>
+      withSQLConf(SQLConf.ORC_VECTORIZED_READER_ENABLED.key -> vectorized.toString) {
+        checkAnswer(
+          readResourceOrcFile("test-data/before_1582_date_v2_4.snappy.orc"),
+          Row(java.sql.Date.valueOf("1200-01-01")))
+      }
+    }
+  }
+
+  test("SPARK-31238: rebasing dates in write") {
+    withTempPath { dir =>
+      val path = dir.getAbsolutePath
+      Seq("1001-01-01").toDF("dateS")
+        .select($"dateS".cast("date").as("date"))
+        .write
+        .orc(path)
+
+      Seq(false, true).foreach { vectorized =>
+        withSQLConf(SQLConf.ORC_VECTORIZED_READER_ENABLED.key -> vectorized.toString) {
+          checkAnswer(spark.read.orc(path), Row(Date.valueOf("1001-01-01")))
+        }
+      }
+    }
+  }
 }
 
 class OrcSourceSuite extends OrcSuite with SharedSparkSession {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcTest.scala
index 388744b..16772fe 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcTest.scala
@@ -133,4 +133,9 @@ abstract class OrcTest extends QueryTest with FileBasedDataSourceTest with Befor
         throw new AnalysisException("Can not match OrcTable in the query.")
     }
   }
+
+  protected def readResourceOrcFile(name: String): DataFrame = {
+    val url = Thread.currentThread().getContextClassLoader.getResource(name)
+    spark.read.orc(url.toString)
+  }
 }
diff --git a/sql/core/v1.2/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnVector.java b/sql/core/v1.2/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnVector.java
index 0dfed76..5dc3f37 100644
--- a/sql/core/v1.2/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnVector.java
+++ b/sql/core/v1.2/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnVector.java
@@ -23,6 +23,7 @@ import org.apache.orc.storage.ql.exec.vector.*;
 
 import org.apache.spark.sql.catalyst.util.DateTimeUtils;
 import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.DateType;
 import org.apache.spark.sql.types.Decimal;
 import org.apache.spark.sql.types.TimestampType;
 import org.apache.spark.sql.vectorized.ColumnarArray;
@@ -42,6 +43,7 @@ public class OrcColumnVector extends org.apache.spark.sql.vectorized.ColumnVecto
   private DecimalColumnVector decimalData;
   private TimestampColumnVector timestampData;
   private final boolean isTimestamp;
+  private final boolean isDate;
 
   private int batchSize;
 
@@ -54,6 +56,12 @@ public class OrcColumnVector extends org.apache.spark.sql.vectorized.ColumnVecto
       isTimestamp = false;
     }
 
+    if (type instanceof DateType) {
+      isDate = true;
+    } else {
+      isDate = false;
+    }
+
     baseData = vector;
     if (vector instanceof LongColumnVector) {
       longData = (LongColumnVector) vector;
@@ -130,7 +138,12 @@ public class OrcColumnVector extends org.apache.spark.sql.vectorized.ColumnVecto
 
   @Override
   public int getInt(int rowId) {
-    return (int) longData.vector[getRowIndex(rowId)];
+    int value = (int) longData.vector[getRowIndex(rowId)];
+    if (isDate) {
+      return DateTimeUtils.rebaseJulianToGregorianDays(value);
+    } else {
+      return value;
+    }
   }
 
   @Override
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/DaysWritable.scala b/sql/core/v1.2/src/main/scala/org/apache/spark/sql/execution/datasources/orc/DaysWritable.scala
similarity index 81%
rename from sql/hive/src/main/scala/org/apache/spark/sql/hive/DaysWritable.scala
rename to sql/core/v1.2/src/main/scala/org/apache/spark/sql/execution/datasources/orc/DaysWritable.scala
index 1eec8d7..4934a96 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/DaysWritable.scala
+++ b/sql/core/v1.2/src/main/scala/org/apache/spark/sql/execution/datasources/orc/DaysWritable.scala
@@ -15,13 +15,13 @@
  * limitations under the License.
  */
 
-package org.apache.spark.sql.hive
+package org.apache.spark.sql.execution.datasources.orc
 
 import java.io.{DataInput, DataOutput, IOException}
 import java.sql.Date
 
-import org.apache.hadoop.hive.serde2.io.DateWritable
 import org.apache.hadoop.io.WritableUtils
+import org.apache.orc.storage.serde2.io.DateWritable
 
 import org.apache.spark.sql.catalyst.util.DateTimeUtils.{rebaseGregorianToJulianDays, rebaseJulianToGregorianDays}
 
@@ -30,16 +30,22 @@ import org.apache.spark.sql.catalyst.util.DateTimeUtils.{rebaseGregorianToJulian
  * via conversion to local date in Julian calendar for dates before 1582-10-15
  * in read/write for backward compatibility with Spark 2.4 and earlier versions.
  *
+ * This is a clone of `org.apache.spark.sql.execution.datasources.DaysWritable`.
+ * The class is cloned because Hive ORC v1.2 uses different `DateWritable`:
+ *   - v1.2: `org.apache.orc.storage.serde2.io.DateWritable`
+ *   - v2.3 and `HiveInspectors`: `org.apache.hadoop.hive.serde2.io.DateWritable`
+ *
  * @param gregorianDays The number of days since the epoch 1970-01-01 in
  *                      Gregorian calendar.
  * @param julianDays The number of days since the epoch 1970-01-01 in
  *                   Julian calendar.
  */
-private[hive] class DaysWritable(
+class DaysWritable(
     var gregorianDays: Int,
     var julianDays: Int)
   extends DateWritable {
 
+  def this() = this(0, 0)
   def this(gregorianDays: Int) =
     this(gregorianDays, rebaseGregorianToJulianDays(gregorianDays))
   def this(dateWritable: DateWritable) = {
@@ -55,6 +61,11 @@ private[hive] class DaysWritable(
   override def getDays: Int = julianDays
   override def get(): Date = new Date(DateWritable.daysToMillis(julianDays))
 
+  override def set(d: Int): Unit = {
+    gregorianDays = d
+    julianDays = rebaseGregorianToJulianDays(d)
+  }
+
   @throws[IOException]
   override def write(out: DataOutput): Unit = {
     WritableUtils.writeVInt(out, julianDays)
diff --git a/sql/core/v1.2/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcShimUtils.scala b/sql/core/v1.2/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcShimUtils.scala
index 68503ab..ece5280 100644
--- a/sql/core/v1.2/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcShimUtils.scala
+++ b/sql/core/v1.2/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcShimUtils.scala
@@ -47,13 +47,13 @@ private[sql] object OrcShimUtils {
 
   def getDateWritable(reuseObj: Boolean): (SpecializedGetters, Int) => DateWritable = {
     if (reuseObj) {
-      val result = new DateWritable()
+      val result = new DaysWritable()
       (getter, ordinal) =>
         result.set(getter.getInt(ordinal))
         result
     } else {
       (getter: SpecializedGetters, ordinal: Int) =>
-        new DateWritable(getter.getInt(ordinal))
+        new DaysWritable(getter.getInt(ordinal))
     }
   }
 
diff --git a/sql/core/v2.3/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnVector.java b/sql/core/v2.3/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnVector.java
index 35447fe..7be4a6f 100644
--- a/sql/core/v2.3/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnVector.java
+++ b/sql/core/v2.3/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnVector.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.hive.ql.exec.vector.*;
 
 import org.apache.spark.sql.catalyst.util.DateTimeUtils;
 import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.DateType;
 import org.apache.spark.sql.types.Decimal;
 import org.apache.spark.sql.types.TimestampType;
 import org.apache.spark.sql.vectorized.ColumnarArray;
@@ -42,6 +43,7 @@ public class OrcColumnVector extends org.apache.spark.sql.vectorized.ColumnVecto
   private DecimalColumnVector decimalData;
   private TimestampColumnVector timestampData;
   private final boolean isTimestamp;
+  private final boolean isDate;
 
   private int batchSize;
 
@@ -54,6 +56,12 @@ public class OrcColumnVector extends org.apache.spark.sql.vectorized.ColumnVecto
       isTimestamp = false;
     }
 
+    if (type instanceof DateType) {
+      isDate = true;
+    } else {
+      isDate = false;
+    }
+
     baseData = vector;
     if (vector instanceof LongColumnVector) {
       longData = (LongColumnVector) vector;
@@ -130,7 +138,12 @@ public class OrcColumnVector extends org.apache.spark.sql.vectorized.ColumnVecto
 
   @Override
   public int getInt(int rowId) {
-    return (int) longData.vector[getRowIndex(rowId)];
+    int value = (int) longData.vector[getRowIndex(rowId)];
+    if (isDate) {
+      return DateTimeUtils.rebaseJulianToGregorianDays(value);
+    } else {
+      return value;
+    }
   }
 
   @Override
diff --git a/sql/core/v2.3/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcShimUtils.scala b/sql/core/v2.3/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcShimUtils.scala
index c32f024..5666d31 100644
--- a/sql/core/v2.3/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcShimUtils.scala
+++ b/sql/core/v2.3/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcShimUtils.scala
@@ -26,6 +26,7 @@ import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf.{Operator => OrcOperator}
 import org.apache.hadoop.hive.serde2.io.{DateWritable, HiveDecimalWritable}
 
 import org.apache.spark.sql.catalyst.expressions.SpecializedGetters
+import org.apache.spark.sql.execution.datasources.DaysWritable
 import org.apache.spark.sql.types.Decimal
 
 /**
@@ -47,13 +48,13 @@ private[sql] object OrcShimUtils {
 
   def getDateWritable(reuseObj: Boolean): (SpecializedGetters, Int) => DateWritable = {
     if (reuseObj) {
-      val result = new DateWritable()
+      val result = new DaysWritable()
       (getter, ordinal) =>
         result.set(getter.getInt(ordinal))
         result
     } else {
       (getter: SpecializedGetters, ordinal: Int) =>
-        new DateWritable(getter.getInt(ordinal))
+        new DaysWritable(getter.getInt(ordinal))
     }
   }
 
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala
index e3e9a31..16e9014 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala
@@ -32,6 +32,7 @@ import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.util._
+import org.apache.spark.sql.execution.datasources.DaysWritable
 import org.apache.spark.sql.types
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org