Posted to commits@spark.apache.org by ge...@apache.org on 2021/11/18 02:47:05 UTC

[spark] branch master updated: [SPARK-36346][SQL] Support TimestampNTZ type in Orc file source

This is an automated email from the ASF dual-hosted git repository.

gengliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 6f6a066  [SPARK-36346][SQL] Support TimestampNTZ type in Orc file source
6f6a066 is described below

commit 6f6a066dff7e1ceceb80ab5c3809d5e6fb1474e2
Author: Jiaan Geng <be...@163.com>
AuthorDate: Thu Nov 18 10:46:15 2021 +0800

    [SPARK-36346][SQL] Support TimestampNTZ type in Orc file source
    
    ### What changes were proposed in this pull request?
    As per https://orc.apache.org/docs/types.html, ORC includes two different forms of timestamps from the SQL world:
    
    `Timestamp` is a date and time without a time zone, which does not change based on the time zone of the reader.
    `Timestamp with local time zone` is a fixed instant in time, which does change based on the time zone of the reader.
    
    So the timestamp types map between Spark and ORC as follows (a short sketch of the two semantics follows the list):
    - `TIMESTAMP_LTZ` => `Timestamp with local time zone`.
    - `TIMESTAMP_NTZ` => `Timestamp`
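
    To make the distinction concrete, here is a small JVM-level sketch of the two semantics (plain java.time code; the values are illustrative and this snippet is not part of the patch):

    ```scala
    import java.time.{Instant, LocalDateTime, ZoneId}

    // TIMESTAMP_LTZ semantics: a fixed instant; its wall-clock reading depends
    // on the reader's time zone.
    val instant = Instant.parse("2021-11-18T02:46:15Z")
    println(instant.atZone(ZoneId.of("UTC")).toLocalDateTime)           // 2021-11-18T02:46:15
    println(instant.atZone(ZoneId.of("Asia/Shanghai")).toLocalDateTime) // 2021-11-18T10:46:15

    // TIMESTAMP_NTZ semantics: a wall-clock value; it reads the same in every
    // time zone.
    val wallClock = LocalDateTime.of(2021, 11, 18, 10, 46, 15)
    println(wallClock)                                                   // 2021-11-18T10:46:15
    ```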
    
    Unfortunately, in Spark 3.1 and prior, Spark mistakenly treated ORC's `Timestamp` as `TIMESTAMP_LTZ`.
    This behavior is not correct, but to keep backward compatibility we do not change it now.
    Since 3.2, with the support of the timestamp without time zone type:
    
    - For the ORC writer, we map both Spark's `TimestampLTZ` and `TimestampNTZ` to ORC's `Timestamp` (the NTZ form) for backward compatibility. In addition, the writer records the Spark column type in ORC's type information under the key "spark.sql.catalyst.type".
    - For the ORC reader, if the type information contains the key "spark.sql.catalyst.type", infer the data type from its value.
    Otherwise, given an ORC `TimestampLTZ` column, we infer Spark's `TimestampType`; given an ORC `TimestampNTZ` column, the inferred result is also `TimestampType` (`TimestampLTZ`), so that the behavior is consistent with older versions of Spark. A sketch of this attribute mechanism follows.
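
    As an illustration, a minimal sketch of the attribute mechanism against the ORC `TypeDescription` API (the attribute key and type names come from this patch; the variable names are illustrative only):

    ```scala
    import org.apache.orc.TypeDescription

    // Key under which the writer records the original Catalyst type (see OrcUtils below).
    val CATALYST_TYPE_ATTRIBUTE_NAME = "spark.sql.catalyst.type"

    // Writer side: a TimestampNTZ column becomes an ORC TIMESTAMP, tagged with its
    // Catalyst type name.
    val ntzField = new TypeDescription(TypeDescription.Category.TIMESTAMP)
    ntzField.setAttribute(CATALYST_TYPE_ATTRIBUTE_NAME, "timestamp_ntz")

    // Reader side: if the attribute is present, infer the Catalyst type from it;
    // otherwise fall back to "timestamp" (TIMESTAMP_LTZ) for backward compatibility.
    val inferredTypeName =
      Option(ntzField.getAttributeValue(CATALYST_TYPE_ATTRIBUTE_NAME)).getOrElse("timestamp")
    ```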
    
    ### Why are the changes needed?
    Map Spark's `TimestampNTZ` type to ORC's `Timestamp` type.
    
    ### Does this PR introduce _any_ user-facing change?
    'Yes'.
    Spark can now write the `TimestampNTZ` type as ORC's `Timestamp` and read it back as `TimestampNTZ`.
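
    For example, a round trip through ORC now preserves `TimestampNTZ` (a hedged sketch for spark-shell; the path is illustrative and it assumes the `LocalDateTime` encoder for `TimestampNTZ`):

    ```scala
    import java.time.LocalDateTime
    import spark.implicits._

    val df = Seq(LocalDateTime.of(2021, 11, 18, 10, 46, 15)).toDF("ts_ntz")
    df.write.mode("overwrite").orc("/tmp/spark-36346-ntz")

    // The Catalyst type was recorded in the ORC type attributes at write time,
    // so the column comes back as timestamp_ntz rather than timestamp.
    spark.read.orc("/tmp/spark-36346-ntz").printSchema()
    // root
    //  |-- ts_ntz: timestamp_ntz (nullable = true)
    ```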
    
    ### How was this patch tested?
    New tests.
    
    Closes #33588 from beliefer/SPARK-36346.
    
    Lead-authored-by: Jiaan Geng <be...@163.com>
    Co-authored-by: gengjiaan <ge...@360.cn>
    Signed-off-by: Gengliang Wang <ge...@apache.org>
---
 .../spark/sql/errors/QueryExecutionErrors.scala    |  4 ++
 .../datasources/orc/OrcAtomicColumnVector.java     | 10 ++++
 .../datasources/orc/OrcDeserializer.scala          |  3 ++
 .../execution/datasources/orc/OrcFileFormat.scala  | 18 -------
 .../execution/datasources/orc/OrcSerializer.scala  |  5 +-
 .../sql/execution/datasources/orc/OrcUtils.scala   | 44 +++++++++++++--
 .../execution/datasources/v2/orc/OrcWrite.scala    |  4 +-
 .../spark/sql/FileBasedDataSourceSuite.scala       |  2 +-
 .../execution/datasources/orc/OrcQuerySuite.scala  | 62 ++++++++++++++++++++++
 .../sql/execution/datasources/orc/OrcTest.scala    |  7 +++
 10 files changed, 132 insertions(+), 27 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
index d7cd8e1..ba3dd52 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
@@ -1889,5 +1889,9 @@ object QueryExecutionErrors {
   def hiveTableWithAnsiIntervalsError(tableName: String): Throwable = {
     new UnsupportedOperationException(s"Hive table $tableName with ANSI intervals is not supported")
   }
+
+  def cannotConvertOrcTimestampToTimestampNTZError(): Throwable = {
+    new RuntimeException("Unable to convert timestamp of Orc to data type 'timestamp_ntz'")
+  }
 }
 
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcAtomicColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcAtomicColumnVector.java
index c2d8334..b4f7b99 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcAtomicColumnVector.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcAtomicColumnVector.java
@@ -27,6 +27,7 @@ import org.apache.spark.sql.types.DataType;
 import org.apache.spark.sql.types.DateType;
 import org.apache.spark.sql.types.Decimal;
 import org.apache.spark.sql.types.TimestampType;
+import org.apache.spark.sql.types.TimestampNTZType;
 import org.apache.spark.sql.vectorized.ColumnarArray;
 import org.apache.spark.sql.vectorized.ColumnarMap;
 import org.apache.spark.unsafe.types.UTF8String;
@@ -36,6 +37,7 @@ import org.apache.spark.unsafe.types.UTF8String;
  */
 public class OrcAtomicColumnVector extends OrcColumnVector {
   private final boolean isTimestamp;
+  private final boolean isTimestampNTZ;
   private final boolean isDate;
 
   // Column vector for each type. Only 1 is populated for any type.
@@ -54,6 +56,12 @@ public class OrcAtomicColumnVector extends OrcColumnVector {
       isTimestamp = false;
     }
 
+    if (type instanceof TimestampNTZType) {
+      isTimestampNTZ = true;
+    } else {
+      isTimestampNTZ = false;
+    }
+
     if (type instanceof DateType) {
       isDate = true;
     } else {
@@ -105,6 +113,8 @@ public class OrcAtomicColumnVector extends OrcColumnVector {
     int index = getRowIndex(rowId);
     if (isTimestamp) {
       return DateTimeUtils.fromJavaTimestamp(timestampData.asScratchTimestamp(index));
+    } else if (isTimestampNTZ) {
+      return OrcUtils.fromOrcNTZ(timestampData.asScratchTimestamp(index));
     } else {
       return longData.vector[index];
     }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcDeserializer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcDeserializer.scala
index 9140833..7ab556e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcDeserializer.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcDeserializer.scala
@@ -129,6 +129,9 @@ class OrcDeserializer(
       case TimestampType => (ordinal, value) =>
         updater.setLong(ordinal, DateTimeUtils.fromJavaTimestamp(value.asInstanceOf[OrcTimestamp]))
 
+      case TimestampNTZType => (ordinal, value) =>
+        updater.setLong(ordinal, OrcUtils.fromOrcNTZ(value.asInstanceOf[OrcTimestamp]))
+
       case DecimalType.Fixed(precision, scale) => (ordinal, value) =>
         val v = OrcShimUtils.getDecimal(value)
         v.changePrecision(precision, scale)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala
index 26af2c3..ce851c5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala
@@ -42,24 +42,6 @@ import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types._
 import org.apache.spark.util.{SerializableConfiguration, Utils}
 
-private[sql] object OrcFileFormat {
-
-  def getQuotedSchemaString(dataType: DataType): String = dataType match {
-    case _: DayTimeIntervalType => LongType.catalogString
-    case _: YearMonthIntervalType => IntegerType.catalogString
-    case _: AtomicType => dataType.catalogString
-    case StructType(fields) =>
-      fields.map(f => s"`${f.name}`:${getQuotedSchemaString(f.dataType)}")
-        .mkString("struct<", ",", ">")
-    case ArrayType(elementType, _) =>
-      s"array<${getQuotedSchemaString(elementType)}>"
-    case MapType(keyType, valueType, _) =>
-      s"map<${getQuotedSchemaString(keyType)},${getQuotedSchemaString(valueType)}>"
-    case _ => // UDT and others
-      dataType.catalogString
-  }
-}
-
 /**
  * New ORC File Format based on Apache ORC.
  */
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcSerializer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcSerializer.scala
index 9a1eb8a..edd5052 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcSerializer.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcSerializer.scala
@@ -18,7 +18,6 @@
 package org.apache.spark.sql.execution.datasources.orc
 
 import org.apache.hadoop.io._
-import org.apache.orc.TypeDescription
 import org.apache.orc.mapred.{OrcList, OrcMap, OrcStruct, OrcTimestamp}
 
 import org.apache.spark.sql.catalyst.InternalRow
@@ -148,6 +147,8 @@ class OrcSerializer(dataSchema: StructType) {
       result.setNanos(ts.getNanos)
       result
 
+    case TimestampNTZType => (getter, ordinal) => OrcUtils.toOrcNTZ(getter.getLong(ordinal))
+
     case DecimalType.Fixed(precision, scale) =>
       OrcShimUtils.getHiveDecimalWritable(precision, scale)
 
@@ -214,6 +215,6 @@ class OrcSerializer(dataSchema: StructType) {
    * Return a Orc value object for the given Spark schema.
    */
   private def createOrcValue(dataType: DataType) = {
-    OrcStruct.createValue(TypeDescription.fromString(OrcFileFormat.getQuotedSchemaString(dataType)))
+    OrcStruct.createValue(OrcUtils.orcTypeDescription(dataType))
   }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala
index 17aab36..d1b7e8d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.execution.datasources.orc
 
 import java.nio.charset.StandardCharsets.UTF_8
+import java.sql.Timestamp
 import java.util.Locale
 
 import scala.collection.JavaConverters._
@@ -28,6 +29,7 @@ import org.apache.hadoop.fs.{FileStatus, Path}
 import org.apache.hadoop.hive.serde2.io.DateWritable
 import org.apache.hadoop.io.{BooleanWritable, ByteWritable, DoubleWritable, FloatWritable, IntWritable, LongWritable, ShortWritable, WritableComparable}
 import org.apache.orc.{BooleanColumnStatistics, ColumnStatistics, DateColumnStatistics, DoubleColumnStatistics, IntegerColumnStatistics, OrcConf, OrcFile, Reader, TypeDescription, Writer}
+import org.apache.orc.mapred.OrcTimestamp
 
 import org.apache.spark.{SPARK_VERSION_SHORT, SparkException}
 import org.apache.spark.deploy.SparkHadoopUtil
@@ -37,7 +39,8 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis.caseSensitiveResolution
 import org.apache.spark.sql.catalyst.expressions.JoinedRow
 import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
-import org.apache.spark.sql.catalyst.util.{quoteIdentifier, CharVarcharUtils}
+import org.apache.spark.sql.catalyst.util.{quoteIdentifier, CharVarcharUtils, DateTimeUtils}
+import org.apache.spark.sql.catalyst.util.DateTimeConstants._
 import org.apache.spark.sql.connector.expressions.aggregate.{Aggregation, Count, CountStar, Max, Min}
 import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.execution.datasources.{AggregatePushDownUtils, SchemaMergeUtils}
@@ -197,7 +200,18 @@ object OrcUtils extends Logging {
       requiredSchema: StructType,
       reader: Reader,
       conf: Configuration): Option[(Array[Int], Boolean)] = {
-    val orcFieldNames = reader.getSchema.getFieldNames.asScala
+    def checkTimestampCompatibility(orcCatalystSchema: StructType, dataSchema: StructType): Unit = {
+      orcCatalystSchema.fields.map(_.dataType).zip(dataSchema.fields.map(_.dataType)).foreach {
+        case (TimestampType, TimestampNTZType) =>
+          throw QueryExecutionErrors.cannotConvertOrcTimestampToTimestampNTZError()
+        case (t1: StructType, t2: StructType) => checkTimestampCompatibility(t1, t2)
+        case _ =>
+      }
+    }
+
+    val orcSchema = reader.getSchema
+    checkTimestampCompatibility(toCatalystSchema(orcSchema), dataSchema)
+    val orcFieldNames = orcSchema.getFieldNames.asScala
     val forcePositionalEvolution = OrcConf.FORCE_POSITIONAL_EVOLUTION.getBoolean(conf)
     if (orcFieldNames.isEmpty) {
       // SPARK-8501: Some old empty ORC files always have an empty schema stored in their footer.
@@ -278,6 +292,7 @@ object OrcUtils extends Logging {
       s"array<${orcTypeDescriptionString(a.elementType)}>"
     case m: MapType =>
       s"map<${orcTypeDescriptionString(m.keyType)},${orcTypeDescriptionString(m.valueType)}>"
+    case TimestampNTZType => TypeDescription.Category.TIMESTAMP.getName
     case _: DayTimeIntervalType => LongType.catalogString
     case _: YearMonthIntervalType => IntegerType.catalogString
     case _ => dt.catalogString
@@ -287,15 +302,23 @@ object OrcUtils extends Logging {
     def getInnerTypeDecription(dt: DataType): Option[TypeDescription] = {
       dt match {
         case y: YearMonthIntervalType =>
-          val typeDesc = orcTypeDescription(IntegerType)
+          val typeDesc = new TypeDescription(TypeDescription.Category.INT)
           typeDesc.setAttribute(
             CATALYST_TYPE_ATTRIBUTE_NAME, y.typeName)
           Some(typeDesc)
         case d: DayTimeIntervalType =>
-          val typeDesc = orcTypeDescription(LongType)
+          val typeDesc = new TypeDescription(TypeDescription.Category.LONG)
           typeDesc.setAttribute(
             CATALYST_TYPE_ATTRIBUTE_NAME, d.typeName)
           Some(typeDesc)
+        case n: TimestampNTZType =>
+          val typeDesc = new TypeDescription(TypeDescription.Category.TIMESTAMP)
+          typeDesc.setAttribute(CATALYST_TYPE_ATTRIBUTE_NAME, n.typeName)
+          Some(typeDesc)
+        case t: TimestampType =>
+          val typeDesc = new TypeDescription(TypeDescription.Category.TIMESTAMP)
+          typeDesc.setAttribute(CATALYST_TYPE_ATTRIBUTE_NAME, t.typeName)
+          Some(typeDesc)
         case _ => None
       }
     }
@@ -506,4 +529,17 @@ object OrcUtils extends Logging {
       resultRow
     }
   }
+
+  def fromOrcNTZ(ts: Timestamp): Long = {
+    DateTimeUtils.millisToMicros(ts.getTime) +
+      (ts.getNanos / NANOS_PER_MICROS) % MICROS_PER_MILLIS
+  }
+
+  def toOrcNTZ(micros: Long): OrcTimestamp = {
+    val seconds = Math.floorDiv(micros, MICROS_PER_SECOND)
+    val nanos = (micros - seconds * MICROS_PER_SECOND) * NANOS_PER_MICROS
+    val result = new OrcTimestamp(seconds * MILLIS_PER_SECOND)
+    result.setNanos(nanos.toInt)
+    result
+  }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcWrite.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcWrite.scala
index 286e871..1ac9266e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcWrite.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcWrite.scala
@@ -23,7 +23,7 @@ import org.apache.orc.mapred.OrcStruct
 
 import org.apache.spark.sql.connector.write.LogicalWriteInfo
 import org.apache.spark.sql.execution.datasources.{OutputWriter, OutputWriterFactory}
-import org.apache.spark.sql.execution.datasources.orc.{OrcFileFormat, OrcOptions, OrcOutputWriter, OrcUtils}
+import org.apache.spark.sql.execution.datasources.orc.{OrcOptions, OrcOutputWriter, OrcUtils}
 import org.apache.spark.sql.execution.datasources.v2.FileWrite
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
@@ -43,7 +43,7 @@ case class OrcWrite(
 
     val conf = job.getConfiguration
 
-    conf.set(MAPRED_OUTPUT_SCHEMA.getAttribute, OrcFileFormat.getQuotedSchemaString(dataSchema))
+    conf.set(MAPRED_OUTPUT_SCHEMA.getAttribute, OrcUtils.orcTypeDescriptionString(dataSchema))
 
     conf.set(COMPRESS.getAttribute, orcOptions.compressionCodec)
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
index 3f2f12d..5180908 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
@@ -697,7 +697,7 @@ class FileBasedDataSourceSuite extends QueryTest
   test("SPARK-22790,SPARK-27668: spark.sql.sources.compressionFactor takes effect") {
     Seq(1.0, 0.5).foreach { compressionFactor =>
       withSQLConf(SQLConf.FILE_COMPRESSION_FACTOR.key -> compressionFactor.toString,
-        SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "350") {
+        SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "457") {
         withTempPath { workDir =>
           // the file size is 504 bytes
           val workDirPath = workDir.getAbsolutePath
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala
index e4c33e96f..2d6978a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.execution.datasources.orc
 import java.io.File
 import java.nio.charset.StandardCharsets
 import java.sql.Timestamp
+import java.time.{LocalDateTime, ZoneOffset}
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
@@ -768,6 +769,67 @@ abstract class OrcQuerySuite extends OrcQueryTest with SharedSparkSession {
       }
     }
   }
+
+  test("Read/write all timestamp types") {
+    val data = (0 to 255).map { i =>
+      (new Timestamp(i), LocalDateTime.of(2019, 3, 21, 0, 2, 3, 456000000 + i))
+    } :+ (null, null)
+
+    withOrcFile(data) { file =>
+      withAllOrcReaders {
+        checkAnswer(spark.read.orc(file), data.toDF().collect())
+      }
+    }
+  }
+
+  test("SPARK-36346: can't read TimestampLTZ as TimestampNTZ") {
+    val data = (1 to 10).map { i =>
+      val ts = new Timestamp(i)
+      Row(ts)
+    }
+    val answer = (1 to 10).map { i =>
+      // The second parameter is `nanoOfSecond`, while java.sql.Timestamp accepts milliseconds
+      // as input. So here we multiply the `nanoOfSecond` by NANOS_PER_MILLIS
+      val ts = LocalDateTime.ofEpochSecond(0, i * 1000000, ZoneOffset.UTC)
+      Row(ts)
+    }
+    val actualSchema = StructType(Seq(StructField("time", TimestampType, false)))
+    val providedSchema = StructType(Seq(StructField("time", TimestampNTZType, false)))
+
+    withTempPath { file =>
+      val df = spark.createDataFrame(sparkContext.parallelize(data), actualSchema)
+      df.write.orc(file.getCanonicalPath)
+      withAllOrcReaders {
+        val msg = intercept[SparkException] {
+          spark.read.schema(providedSchema).orc(file.getCanonicalPath).collect()
+        }.getMessage
+        assert(msg.contains("Unable to convert timestamp of Orc to data type 'timestamp_ntz'"))
+      }
+    }
+  }
+
+  test("SPARK-36346: read TimestampNTZ as TimestampLTZ") {
+    val data = (1 to 10).map { i =>
+      // The second parameter is `nanoOfSecond`, while java.sql.Timestamp accepts milliseconds
+      // as input. So here we multiply the `nanoOfSecond` by NANOS_PER_MILLIS
+      val ts = LocalDateTime.ofEpochSecond(0, i * 1000000, ZoneOffset.UTC)
+      Row(ts)
+    }
+    val answer = (1 to 10).map { i =>
+      val ts = new java.sql.Timestamp(i)
+      Row(ts)
+    }
+    val actualSchema = StructType(Seq(StructField("time", TimestampNTZType, false)))
+    val providedSchema = StructType(Seq(StructField("time", TimestampType, false)))
+
+    withTempPath { file =>
+      val df = spark.createDataFrame(sparkContext.parallelize(data), actualSchema)
+      df.write.orc(file.getCanonicalPath)
+      withAllOrcReaders {
+        checkAnswer(spark.read.schema(providedSchema).orc(file.getCanonicalPath), answer)
+      }
+    }
+  }
 }
 
 class OrcV1QuerySuite extends OrcQuerySuite {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcTest.scala
index 4243318..cd87374 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcTest.scala
@@ -143,6 +143,13 @@ abstract class OrcTest extends QueryTest with FileBasedDataSourceTest with Befor
     spark.read.orc(file.getAbsolutePath)
   }
 
+  def withAllOrcReaders(code: => Unit): Unit = {
+    // test the row-based reader
+    withSQLConf(SQLConf.ORC_VECTORIZED_READER_ENABLED.key -> "false")(code)
+    // test the vectorized reader
+    withSQLConf(SQLConf.ORC_VECTORIZED_READER_ENABLED.key -> "true")(code)
+  }
+
   /**
    * Takes a sequence of products `data` to generate multi-level nested
    * dataframes as new test data. It tests both non-nested and nested dataframes
