You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by ja...@apache.org on 2023/04/11 14:33:23 UTC

[iceberg] branch master updated: Arrow: Support vectorized read of INT96 timestamp in imported data (#6962)

This is an automated email from the ASF dual-hosted git repository.

jackye pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 0123862618 Arrow: Support vectorized read of INT96 timestamp in imported data (#6962)
0123862618 is described below

commit 012386261841c1eb50cdc01d13ae146ad40ae011
Author: ChenliangLu <31...@users.noreply.github.com>
AuthorDate: Tue Apr 11 22:33:16 2023 +0800

    Arrow: Support vectorized read of INT96 timestamp in imported data (#6962)
---
 .../GenericArrowVectorAccessorFactory.java         |  30 ++++
 .../arrow/vectorized/VectorizedArrowReader.java    |  17 ++
 .../parquet/VectorizedColumnIterator.java          |  18 ++
 ...orizedDictionaryEncodedParquetValuesReader.java |  17 ++
 .../vectorized/parquet/VectorizedPageIterator.java |  30 ++++
 .../VectorizedParquetDefinitionLevelReader.java    |  51 ++++++
 .../org/apache/iceberg/parquet/ParquetUtil.java    |  16 ++
 .../iceberg/spark/data/SparkParquetReaders.java    |   9 +-
 .../iceberg/spark/data/SparkParquetReaders.java    |   9 +-
 .../iceberg/spark/data/SparkParquetReaders.java    |   9 +-
 .../iceberg/spark/data/SparkParquetReaders.java    |   9 +-
 .../source/TestIcebergSourceHadoopTables.java      |   5 +
 .../spark/source/TestIcebergSourceHiveTables.java  |  19 ++-
 .../spark/source/TestIcebergSourceTablesBase.java  | 190 ++++++++++++++-------
 14 files changed, 334 insertions(+), 95 deletions(-)

diff --git a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/GenericArrowVectorAccessorFactory.java b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/GenericArrowVectorAccessorFactory.java
index 2883b7c11b..e9305e399c 100644
--- a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/GenericArrowVectorAccessorFactory.java
+++ b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/GenericArrowVectorAccessorFactory.java
@@ -22,6 +22,7 @@ import java.lang.reflect.Array;
 import java.math.BigDecimal;
 import java.math.BigInteger;
 import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
 import java.util.function.IntFunction;
 import java.util.function.Supplier;
 import java.util.stream.IntStream;
@@ -43,6 +44,7 @@ import org.apache.arrow.vector.VarCharVector;
 import org.apache.arrow.vector.complex.ListVector;
 import org.apache.arrow.vector.complex.StructVector;
 import org.apache.arrow.vector.util.DecimalUtility;
+import org.apache.iceberg.parquet.ParquetUtil;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.parquet.column.ColumnDescriptor;
 import org.apache.parquet.column.Dictionary;
@@ -159,6 +161,11 @@ public class GenericArrowVectorAccessorFactory<
           return new DictionaryFloatAccessor<>((IntVector) vector, dictionary);
         case INT64:
           return new DictionaryLongAccessor<>((IntVector) vector, dictionary);
+        case INT96:
+          // Impala & Spark used to write timestamps as INT96 by default. For backwards
+          // compatibility we try to read INT96 as timestamps. But INT96 is not recommended
+          // and deprecated (see https://issues.apache.org/jira/browse/PARQUET-323)
+          return new DictionaryTimestampInt96Accessor<>((IntVector) vector, dictionary);
         case DOUBLE:
           return new DictionaryDoubleAccessor<>((IntVector) vector, dictionary);
         default:
@@ -455,6 +462,29 @@ public class GenericArrowVectorAccessorFactory<
     }
   }
 
+  private static class DictionaryTimestampInt96Accessor<
+          DecimalT, Utf8StringT, ArrayT, ChildVectorT extends AutoCloseable>
+      extends ArrowVectorAccessor<DecimalT, Utf8StringT, ArrayT, ChildVectorT> {
+    private final IntVector offsetVector;
+    private final Dictionary dictionary;
+
+    DictionaryTimestampInt96Accessor(IntVector vector, Dictionary dictionary) {
+      super(vector);
+      this.offsetVector = vector;
+      this.dictionary = dictionary;
+    }
+
+    @Override
+    public final long getLong(int rowId) {
+      ByteBuffer byteBuffer =
+          dictionary
+              .decodeToBinary(offsetVector.get(rowId))
+              .toByteBuffer()
+              .order(ByteOrder.LITTLE_ENDIAN);
+      return ParquetUtil.extractTimestampInt96(byteBuffer);
+    }
+  }
+
   private static class DateAccessor<
           DecimalT, Utf8StringT, ArrayT, ChildVectorT extends AutoCloseable>
       extends ArrowVectorAccessor<DecimalT, Utf8StringT, ArrayT, ChildVectorT> {
diff --git a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorizedArrowReader.java b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorizedArrowReader.java
index 0fcb35fbd1..9c52f14381 100644
--- a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorizedArrowReader.java
+++ b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorizedArrowReader.java
@@ -113,6 +113,7 @@ public class VectorizedArrowReader implements VectorizedReader<VectorHolder> {
     FLOAT,
     DOUBLE,
     TIMESTAMP_MILLIS,
+    TIMESTAMP_INT96,
     TIME_MICROS,
     UUID,
     DICTIONARY
@@ -175,6 +176,11 @@ public class VectorizedArrowReader implements VectorizedReader<VectorHolder> {
                 .timestampMillisBatchReader()
                 .nextBatch(vec, typeWidth, nullabilityHolder);
             break;
+          case TIMESTAMP_INT96:
+            vectorizedColumnIterator
+                .timestampInt96BatchReader()
+                .nextBatch(vec, typeWidth, nullabilityHolder);
+            break;
           case UUID:
           case FIXED_WIDTH_BINARY:
           case FIXED_LENGTH_DECIMAL:
@@ -366,6 +372,17 @@ public class VectorizedArrowReader implements VectorizedReader<VectorHolder> {
         this.readType = ReadType.INT;
         this.typeWidth = (int) IntVector.TYPE_WIDTH;
         break;
+      case INT96:
+        // Impala & Spark used to write timestamps as INT96 by default. For backwards
+        // compatibility we try to read INT96 as timestamps. But INT96 is not recommended
+        // and deprecated (see https://issues.apache.org/jira/browse/PARQUET-323)
+        int length = BigIntVector.TYPE_WIDTH;
+        this.readType = ReadType.TIMESTAMP_INT96;
+        this.vec = arrowField.createVector(rootAlloc);
+        vec.setInitialCapacity(batchSize * length);
+        vec.allocateNew();
+        this.typeWidth = length;
+        break;
       case FLOAT:
         Field floatField =
             new Field(
diff --git a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedColumnIterator.java b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedColumnIterator.java
index 55af9b5361..73e332058e 100644
--- a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedColumnIterator.java
+++ b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedColumnIterator.java
@@ -144,6 +144,20 @@ public class VectorizedColumnIterator extends BaseColumnIterator {
     }
   }
 
+  public class TimestampInt96BatchReader extends BatchReader {
+    @Override
+    protected int nextBatchOf(
+        final FieldVector vector,
+        final int expectedBatchSize,
+        final int numValsInVector,
+        final int typeWidth,
+        NullabilityHolder holder) {
+      return vectorizedPageIterator
+          .timestampInt96PageReader()
+          .nextBatch(vector, expectedBatchSize, numValsInVector, typeWidth, holder);
+    }
+  }
+
   public class FloatBatchReader extends BatchReader {
     @Override
     protected int nextBatchOf(
@@ -292,6 +306,10 @@ public class VectorizedColumnIterator extends BaseColumnIterator {
     return new TimestampMillisBatchReader();
   }
 
+  public TimestampInt96BatchReader timestampInt96BatchReader() {
+    return new TimestampInt96BatchReader();
+  }
+
   public FloatBatchReader floatBatchReader() {
     return new FloatBatchReader();
   }
diff --git a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedDictionaryEncodedParquetValuesReader.java b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedDictionaryEncodedParquetValuesReader.java
index 96071fe75c..55f1d3fd79 100644
--- a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedDictionaryEncodedParquetValuesReader.java
+++ b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedDictionaryEncodedParquetValuesReader.java
@@ -19,6 +19,7 @@
 package org.apache.iceberg.arrow.vectorized.parquet;
 
 import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
 import org.apache.arrow.vector.BaseVariableWidthVector;
 import org.apache.arrow.vector.BitVectorHelper;
 import org.apache.arrow.vector.DecimalVector;
@@ -26,6 +27,7 @@ import org.apache.arrow.vector.FieldVector;
 import org.apache.arrow.vector.FixedSizeBinaryVector;
 import org.apache.arrow.vector.IntVector;
 import org.apache.iceberg.arrow.vectorized.NullabilityHolder;
+import org.apache.iceberg.parquet.ParquetUtil;
 import org.apache.parquet.column.Dictionary;
 
 /**
@@ -105,6 +107,17 @@ public class VectorizedDictionaryEncodedParquetValuesReader
     }
   }
 
+  class TimestampInt96DictEncodedReader extends BaseDictEncodedReader {
+    @Override
+    protected void nextVal(
+        FieldVector vector, Dictionary dict, int idx, int currentVal, int typeWidth) {
+      ByteBuffer buffer =
+          dict.decodeToBinary(currentVal).toByteBuffer().order(ByteOrder.LITTLE_ENDIAN);
+      long timestampInt96 = ParquetUtil.extractTimestampInt96(buffer);
+      vector.getDataBuffer().setLong((long) idx * typeWidth, timestampInt96); // byte offset
+    }
+  }
+
   class IntegerDictEncodedReader extends BaseDictEncodedReader {
     @Override
     protected void nextVal(
@@ -200,6 +213,10 @@ public class VectorizedDictionaryEncodedParquetValuesReader
     return new TimestampMillisDictEncodedReader();
   }
 
+  public TimestampInt96DictEncodedReader timestampInt96DictEncodedReader() {
+    return new TimestampInt96DictEncodedReader();
+  }
+
   public IntegerDictEncodedReader integerDictEncodedReader() {
     return new IntegerDictEncodedReader();
   }
diff --git a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedPageIterator.java b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedPageIterator.java
index f6b27c852f..0f5b297711 100644
--- a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedPageIterator.java
+++ b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedPageIterator.java
@@ -283,6 +283,32 @@ public class VectorizedPageIterator extends BasePageIterator {
     }
   }
 
+  /** Method for reading a batch of values of TimestampInt96 data type. */
+  class TimestampInt96PageReader extends BagePageReader {
+    @Override
+    protected void nextVal(
+        FieldVector vector, int batchSize, int numVals, int typeWidth, NullabilityHolder holder) {
+      vectorizedDefinitionLevelReader
+          .timestampInt96Reader()
+          .nextBatch(vector, numVals, typeWidth, batchSize, holder, plainValuesReader);
+    }
+
+    @Override
+    protected void nextDictEncodedVal(
+        FieldVector vector, int batchSize, int numVals, int typeWidth, NullabilityHolder holder) {
+      vectorizedDefinitionLevelReader
+          .timestampInt96Reader()
+          .nextDictEncodedBatch(
+              vector,
+              numVals,
+              typeWidth,
+              batchSize,
+              holder,
+              dictionaryEncodedValuesReader,
+              dictionary);
+    }
+  }
+
   /** Method for reading a batch of values of FLOAT data type. */
   class FloatPageReader extends BagePageReader {
 
@@ -547,6 +573,10 @@ public class VectorizedPageIterator extends BasePageIterator {
     return new TimestampMillisPageReader();
   }
 
+  TimestampInt96PageReader timestampInt96PageReader() {
+    return new TimestampInt96PageReader();
+  }
+
   FloatPageReader floatPageReader() {
     return new FloatPageReader();
   }
diff --git a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedParquetDefinitionLevelReader.java b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedParquetDefinitionLevelReader.java
index a036aee9e6..20d7f80497 100644
--- a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedParquetDefinitionLevelReader.java
+++ b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedParquetDefinitionLevelReader.java
@@ -19,6 +19,7 @@
 package org.apache.iceberg.arrow.vectorized.parquet;
 
 import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
 import org.apache.arrow.memory.ArrowBuf;
 import org.apache.arrow.vector.BaseVariableWidthVector;
 import org.apache.arrow.vector.BitVector;
@@ -29,6 +30,7 @@ import org.apache.arrow.vector.FixedSizeBinaryVector;
 import org.apache.arrow.vector.IntVector;
 import org.apache.arrow.vector.VarBinaryVector;
 import org.apache.iceberg.arrow.vectorized.NullabilityHolder;
+import org.apache.iceberg.parquet.ParquetUtil;
 import org.apache.iceberg.parquet.ValuesAsBytesReader;
 import org.apache.parquet.column.Dictionary;
 
@@ -447,6 +449,51 @@ public final class VectorizedParquetDefinitionLevelReader
     }
   }
 
+  class TimestampInt96Reader extends BaseReader {
+    @Override
+    protected void nextVal(
+        FieldVector vector,
+        int idx,
+        ValuesAsBytesReader valuesReader,
+        int typeWidth,
+        byte[] byteArray) {
+      // 8 bytes (time of day nanos) + 4 bytes(julianDay) = 12 bytes
+      ByteBuffer buffer = valuesReader.getBuffer(12).order(ByteOrder.LITTLE_ENDIAN);
+      long timestampInt96 = ParquetUtil.extractTimestampInt96(buffer);
+      vector.getDataBuffer().setLong((long) idx * typeWidth, timestampInt96);
+    }
+
+    @Override
+    protected void nextDictEncodedVal(
+        FieldVector vector,
+        int idx,
+        VectorizedDictionaryEncodedParquetValuesReader reader,
+        int numValuesToRead,
+        Dictionary dict,
+        NullabilityHolder nullabilityHolder,
+        int typeWidth,
+        Mode mode) {
+      switch (mode) {
+        case RLE:
+          reader
+              .timestampInt96DictEncodedReader()
+              .nextBatch(vector, idx, numValuesToRead, dict, nullabilityHolder, typeWidth);
+          break;
+        case PACKED:
+          ByteBuffer buffer =
+              dict.decodeToBinary(reader.readInteger())
+                  .toByteBuffer()
+                  .order(ByteOrder.LITTLE_ENDIAN);
+          long timestampInt96 = ParquetUtil.extractTimestampInt96(buffer);
+          vector.getDataBuffer().setLong((long) idx * typeWidth, timestampInt96); // byte offset
+          break;
+        default:
+          throw new UnsupportedOperationException(
+              "Unsupported mode for timestamp int96 reader: " + mode);
+      }
+    }
+  }
+
   class FixedWidthBinaryReader extends BaseReader {
     @Override
     protected void nextVal(
@@ -777,6 +824,10 @@ public final class VectorizedParquetDefinitionLevelReader
     return new TimestampMillisReader();
   }
 
+  TimestampInt96Reader timestampInt96Reader() {
+    return new TimestampInt96Reader();
+  }
+
   FixedWidthBinaryReader fixedWidthBinaryReader() {
     return new FixedWidthBinaryReader();
   }
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetUtil.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetUtil.java
index b6d57d073e..a879fc5f51 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetUtil.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetUtil.java
@@ -25,6 +25,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
@@ -68,6 +69,8 @@ public class ParquetUtil {
   // not meant to be instantiated
   private ParquetUtil() {}
 
+  private static final long UNIX_EPOCH_JULIAN = 2_440_588L;
+
   public static Metrics fileMetrics(InputFile file, MetricsConfig metricsConfig) {
     return fileMetrics(file, metricsConfig, null);
   }
@@ -403,4 +406,17 @@ public class ParquetUtil {
     }
     return primitiveType.getPrimitiveTypeName() == PrimitiveType.PrimitiveTypeName.INT32;
   }
+
+  /**
+   * Method to read timestamp (parquet Int96) from bytebuffer. Read 12 bytes in byteBuffer: 8 bytes
+   * (time of day nanos) + 4 bytes(julianDay)
+   */
+  public static long extractTimestampInt96(ByteBuffer buffer) {
+    // 8 bytes (time of day nanos)
+    long timeOfDayNanos = buffer.getLong();
+    // 4 bytes(julianDay)
+    int julianDay = buffer.getInt();
+    return TimeUnit.DAYS.toMicros(julianDay - UNIX_EPOCH_JULIAN)
+        + TimeUnit.NANOSECONDS.toMicros(timeOfDayNanos);
+  }
 }
diff --git a/spark/v2.4/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java b/spark/v2.4/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java
index 836ac46d19..bba68684a3 100644
--- a/spark/v2.4/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java
+++ b/spark/v2.4/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java
@@ -25,10 +25,10 @@ import java.nio.ByteOrder;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.TimeUnit;
 import org.apache.iceberg.MetadataColumns;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.parquet.ParquetSchemaUtil;
+import org.apache.iceberg.parquet.ParquetUtil;
 import org.apache.iceberg.parquet.ParquetValueReader;
 import org.apache.iceberg.parquet.ParquetValueReaders;
 import org.apache.iceberg.parquet.ParquetValueReaders.FloatAsDoubleReader;
@@ -377,7 +377,6 @@ public class SparkParquetReaders {
   }
 
   private static class TimestampInt96Reader extends UnboxedReader<Long> {
-    private static final long UNIX_EPOCH_JULIAN = 2_440_588L;
 
     TimestampInt96Reader(ColumnDescriptor desc) {
       super(desc);
@@ -392,11 +391,7 @@ public class SparkParquetReaders {
     public long readLong() {
       final ByteBuffer byteBuffer =
           column.nextBinary().toByteBuffer().order(ByteOrder.LITTLE_ENDIAN);
-      final long timeOfDayNanos = byteBuffer.getLong();
-      final int julianDay = byteBuffer.getInt();
-
-      return TimeUnit.DAYS.toMicros(julianDay - UNIX_EPOCH_JULIAN)
-          + TimeUnit.NANOSECONDS.toMicros(timeOfDayNanos);
+      return ParquetUtil.extractTimestampInt96(byteBuffer);
     }
   }
 
diff --git a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java
index 5dfb51e14b..940e7934ec 100644
--- a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java
+++ b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java
@@ -25,10 +25,10 @@ import java.nio.ByteOrder;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.TimeUnit;
 import org.apache.iceberg.MetadataColumns;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.parquet.ParquetSchemaUtil;
+import org.apache.iceberg.parquet.ParquetUtil;
 import org.apache.iceberg.parquet.ParquetValueReader;
 import org.apache.iceberg.parquet.ParquetValueReaders;
 import org.apache.iceberg.parquet.ParquetValueReaders.FloatAsDoubleReader;
@@ -378,7 +378,6 @@ public class SparkParquetReaders {
   }
 
   private static class TimestampInt96Reader extends UnboxedReader<Long> {
-    private static final long UNIX_EPOCH_JULIAN = 2_440_588L;
 
     TimestampInt96Reader(ColumnDescriptor desc) {
       super(desc);
@@ -393,11 +392,7 @@ public class SparkParquetReaders {
     public long readLong() {
       final ByteBuffer byteBuffer =
           column.nextBinary().toByteBuffer().order(ByteOrder.LITTLE_ENDIAN);
-      final long timeOfDayNanos = byteBuffer.getLong();
-      final int julianDay = byteBuffer.getInt();
-
-      return TimeUnit.DAYS.toMicros(julianDay - UNIX_EPOCH_JULIAN)
-          + TimeUnit.NANOSECONDS.toMicros(timeOfDayNanos);
+      return ParquetUtil.extractTimestampInt96(byteBuffer);
     }
   }
 
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java
index 76e780f7a0..59f81de6ae 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java
@@ -25,10 +25,10 @@ import java.nio.ByteOrder;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.TimeUnit;
 import org.apache.iceberg.MetadataColumns;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.parquet.ParquetSchemaUtil;
+import org.apache.iceberg.parquet.ParquetUtil;
 import org.apache.iceberg.parquet.ParquetValueReader;
 import org.apache.iceberg.parquet.ParquetValueReaders;
 import org.apache.iceberg.parquet.ParquetValueReaders.FloatAsDoubleReader;
@@ -377,7 +377,6 @@ public class SparkParquetReaders {
   }
 
   private static class TimestampInt96Reader extends UnboxedReader<Long> {
-    private static final long UNIX_EPOCH_JULIAN = 2_440_588L;
 
     TimestampInt96Reader(ColumnDescriptor desc) {
       super(desc);
@@ -392,11 +391,7 @@ public class SparkParquetReaders {
     public long readLong() {
       final ByteBuffer byteBuffer =
           column.nextBinary().toByteBuffer().order(ByteOrder.LITTLE_ENDIAN);
-      final long timeOfDayNanos = byteBuffer.getLong();
-      final int julianDay = byteBuffer.getInt();
-
-      return TimeUnit.DAYS.toMicros(julianDay - UNIX_EPOCH_JULIAN)
-          + TimeUnit.NANOSECONDS.toMicros(timeOfDayNanos);
+      return ParquetUtil.extractTimestampInt96(byteBuffer);
     }
   }
 
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java
index 76e780f7a0..59f81de6ae 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java
@@ -25,10 +25,10 @@ import java.nio.ByteOrder;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.TimeUnit;
 import org.apache.iceberg.MetadataColumns;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.parquet.ParquetSchemaUtil;
+import org.apache.iceberg.parquet.ParquetUtil;
 import org.apache.iceberg.parquet.ParquetValueReader;
 import org.apache.iceberg.parquet.ParquetValueReaders;
 import org.apache.iceberg.parquet.ParquetValueReaders.FloatAsDoubleReader;
@@ -377,7 +377,6 @@ public class SparkParquetReaders {
   }
 
   private static class TimestampInt96Reader extends UnboxedReader<Long> {
-    private static final long UNIX_EPOCH_JULIAN = 2_440_588L;
 
     TimestampInt96Reader(ColumnDescriptor desc) {
       super(desc);
@@ -392,11 +391,7 @@ public class SparkParquetReaders {
     public long readLong() {
       final ByteBuffer byteBuffer =
           column.nextBinary().toByteBuffer().order(ByteOrder.LITTLE_ENDIAN);
-      final long timeOfDayNanos = byteBuffer.getLong();
-      final int julianDay = byteBuffer.getInt();
-
-      return TimeUnit.DAYS.toMicros(julianDay - UNIX_EPOCH_JULIAN)
-          + TimeUnit.NANOSECONDS.toMicros(timeOfDayNanos);
+      return ParquetUtil.extractTimestampInt96(byteBuffer);
     }
   }
 
diff --git a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceHadoopTables.java b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceHadoopTables.java
index b55ba0e219..9bd7220b90 100644
--- a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceHadoopTables.java
+++ b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceHadoopTables.java
@@ -50,6 +50,11 @@ public class TestIcebergSourceHadoopTables extends TestIcebergSourceTablesBase {
     return TABLES.create(schema, spec, tableLocation);
   }
 
+  @Override
+  public void dropTable(TableIdentifier ident) {
+    TABLES.dropTable(tableLocation);
+  }
+
   @Override
   public Table loadTable(TableIdentifier ident, String entriesSuffix) {
     return TABLES.load(loadLocation(ident, entriesSuffix));
diff --git a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceHiveTables.java b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceHiveTables.java
index de26f5f82c..6292a2c1a8 100644
--- a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceHiveTables.java
+++ b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceHiveTables.java
@@ -43,11 +43,11 @@ public class TestIcebergSourceHiveTables extends TestIcebergSourceTablesBase {
 
   @After
   public void dropTable() throws IOException {
-    Table table = catalog.loadTable(currentIdentifier);
-    Path tablePath = new Path(table.location());
-    FileSystem fs = tablePath.getFileSystem(spark.sessionState().newHadoopConf());
-    fs.delete(tablePath, true);
-    catalog.dropTable(currentIdentifier, false);
+    if (!catalog.tableExists(currentIdentifier)) {
+      return;
+    }
+
+    dropTable(currentIdentifier);
   }
 
   @Override
@@ -56,6 +56,15 @@ public class TestIcebergSourceHiveTables extends TestIcebergSourceTablesBase {
     return TestIcebergSourceHiveTables.catalog.createTable(ident, schema, spec);
   }
 
+  @Override
+  public void dropTable(TableIdentifier ident) throws IOException {
+    Table table = catalog.loadTable(ident);
+    Path tablePath = new Path(table.location());
+    FileSystem fs = tablePath.getFileSystem(spark.sessionState().newHadoopConf());
+    fs.delete(tablePath, true);
+    catalog.dropTable(ident, false);
+  }
+
   @Override
   public Table loadTable(TableIdentifier ident, String entriesSuffix) {
     TableIdentifier identifier =
diff --git a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
index 123c06b7e5..fc023cdfdb 100644
--- a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
+++ b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
@@ -23,8 +23,11 @@ import static org.apache.iceberg.ManifestContent.DELETES;
 import static org.apache.iceberg.types.Types.NestedField.optional;
 import static org.apache.iceberg.types.Types.NestedField.required;
 
+import java.io.File;
 import java.io.IOException;
 import java.io.UncheckedIOException;
+import java.time.LocalDateTime;
+import java.time.ZoneOffset;
 import java.util.Comparator;
 import java.util.List;
 import java.util.StringJoiner;
@@ -72,13 +75,18 @@ import org.apache.iceberg.spark.data.TestHelpers;
 import org.apache.iceberg.types.Types;
 import org.apache.iceberg.util.Pair;
 import org.apache.spark.SparkException;
+import org.apache.spark.sql.Column;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Encoders;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.RowFactory;
 import org.apache.spark.sql.SaveMode;
 import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.functions;
+import org.apache.spark.sql.internal.SQLConf;
 import org.apache.spark.sql.types.StructType;
+import org.assertj.core.api.Assertions;
+import org.junit.After;
 import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;
@@ -113,6 +121,13 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
 
   public abstract String loadLocation(TableIdentifier ident);
 
+  public abstract void dropTable(TableIdentifier ident) throws IOException;
+
+  @After
+  public void removeTable() {
+    spark.sql("DROP TABLE IF EXISTS parquet_table");
+  }
+
   @Test
   public synchronized void testTablesSupport() {
     TableIdentifier tableIdentifier = TableIdentifier.of("db", "table");
@@ -472,8 +487,6 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
 
   @Test
   public void testFilesTableWithSnapshotIdInheritance() throws Exception {
-    spark.sql("DROP TABLE IF EXISTS parquet_table");
-
     TableIdentifier tableIdentifier = TableIdentifier.of("db", "files_inheritance_test");
     Table table = createTable(tableIdentifier, SCHEMA, SPEC);
     table.updateProperties().set(TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED, "true").commit();
@@ -496,45 +509,39 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
 
     table.updateProperties().set(TableProperties.DEFAULT_NAME_MAPPING, mappingJson).commit();
 
-    try {
-      String stagingLocation = table.location() + "/metadata";
-      SparkTableUtil.importSparkTable(
-          spark,
-          new org.apache.spark.sql.catalyst.TableIdentifier("parquet_table"),
-          table,
-          stagingLocation);
-
-      Dataset<Row> filesTableDs =
-          spark.read().format("iceberg").load(loadLocation(tableIdentifier, "files"));
-      List<Row> actual = TestHelpers.selectNonDerived(filesTableDs).collectAsList();
-
-      List<GenericData.Record> expected = Lists.newArrayList();
-      for (ManifestFile manifest : table.currentSnapshot().dataManifests(table.io())) {
-        InputFile in = table.io().newInputFile(manifest.path());
-        try (CloseableIterable<GenericData.Record> rows =
-            Avro.read(in).project(entriesTable.schema()).build()) {
-          for (GenericData.Record record : rows) {
-            GenericData.Record file = (GenericData.Record) record.get("data_file");
-            TestHelpers.asMetadataRecord(file);
-            expected.add(file);
-          }
+    String stagingLocation = table.location() + "/metadata";
+    SparkTableUtil.importSparkTable(
+        spark,
+        new org.apache.spark.sql.catalyst.TableIdentifier("parquet_table"),
+        table,
+        stagingLocation);
+
+    Dataset<Row> filesTableDs =
+        spark.read().format("iceberg").load(loadLocation(tableIdentifier, "files"));
+    List<Row> actual = TestHelpers.selectNonDerived(filesTableDs).collectAsList();
+
+    List<GenericData.Record> expected = Lists.newArrayList();
+    for (ManifestFile manifest : table.currentSnapshot().dataManifests(table.io())) {
+      InputFile in = table.io().newInputFile(manifest.path());
+      try (CloseableIterable<GenericData.Record> rows =
+          Avro.read(in).project(entriesTable.schema()).build()) {
+        for (GenericData.Record record : rows) {
+          GenericData.Record file = (GenericData.Record) record.get("data_file");
+          TestHelpers.asMetadataRecord(file);
+          expected.add(file);
         }
       }
-
-      Types.StructType struct = TestHelpers.nonDerivedSchema(filesTableDs);
-      Assert.assertEquals("Files table should have one row", 2, expected.size());
-      Assert.assertEquals("Actual results should have one row", 2, actual.size());
-      TestHelpers.assertEqualsSafe(struct, expected.get(0), actual.get(0));
-      TestHelpers.assertEqualsSafe(struct, expected.get(1), actual.get(1));
-    } finally {
-      spark.sql("DROP TABLE parquet_table");
     }
+
+    Types.StructType struct = TestHelpers.nonDerivedSchema(filesTableDs);
+    Assert.assertEquals("Files table should have one row", 2, expected.size());
+    Assert.assertEquals("Actual results should have one row", 2, actual.size());
+    TestHelpers.assertEqualsSafe(struct, expected.get(0), actual.get(0));
+    TestHelpers.assertEqualsSafe(struct, expected.get(1), actual.get(1));
   }
 
   @Test
   public void testEntriesTableWithSnapshotIdInheritance() throws Exception {
-    spark.sql("DROP TABLE IF EXISTS parquet_table");
-
     TableIdentifier tableIdentifier = TableIdentifier.of("db", "entries_inheritance_test");
     PartitionSpec spec = SPEC;
     Table table = createTable(tableIdentifier, SCHEMA, spec);
@@ -553,34 +560,30 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
     Dataset<Row> inputDF = spark.createDataFrame(records, SimpleRecord.class);
     inputDF.select("data", "id").write().mode("overwrite").insertInto("parquet_table");
 
-    try {
-      String stagingLocation = table.location() + "/metadata";
-      SparkTableUtil.importSparkTable(
-          spark,
-          new org.apache.spark.sql.catalyst.TableIdentifier("parquet_table"),
-          table,
-          stagingLocation);
-
-      List<Row> actual =
-          spark
-              .read()
-              .format("iceberg")
-              .load(loadLocation(tableIdentifier, "entries"))
-              .select("sequence_number", "snapshot_id", "data_file")
-              .collectAsList();
-
-      table.refresh();
-
-      long snapshotId = table.currentSnapshot().snapshotId();
-
-      Assert.assertEquals("Entries table should have 2 rows", 2, actual.size());
-      Assert.assertEquals("Sequence number must match", 0, actual.get(0).getLong(0));
-      Assert.assertEquals("Snapshot id must match", snapshotId, actual.get(0).getLong(1));
-      Assert.assertEquals("Sequence number must match", 0, actual.get(1).getLong(0));
-      Assert.assertEquals("Snapshot id must match", snapshotId, actual.get(1).getLong(1));
-    } finally {
-      spark.sql("DROP TABLE parquet_table");
-    }
+    String stagingLocation = table.location() + "/metadata";
+    SparkTableUtil.importSparkTable(
+        spark,
+        new org.apache.spark.sql.catalyst.TableIdentifier("parquet_table"),
+        table,
+        stagingLocation);
+    // Read the "entries" metadata table, keeping only the three selected columns.
+    List<Row> actual =
+        spark
+            .read()
+            .format("iceberg")
+            .load(loadLocation(tableIdentifier, "entries"))
+            .select("sequence_number", "snapshot_id", "data_file")
+            .collectAsList();
+    // Refresh the table handle before asking it for the current snapshot.
+    table.refresh();
+
+    long snapshotId = table.currentSnapshot().snapshotId();
+    // Expect two entries, each with sequence number 0 and the current snapshot's id.
+    Assert.assertEquals("Entries table should have 2 rows", 2, actual.size());
+    Assert.assertEquals("Sequence number must match", 0, actual.get(0).getLong(0));
+    Assert.assertEquals("Snapshot id must match", snapshotId, actual.get(0).getLong(1));
+    Assert.assertEquals("Sequence number must match", 0, actual.get(1).getLong(0));
+    Assert.assertEquals("Snapshot id must match", snapshotId, actual.get(1).getLong(1));
   }
 
   @Test
@@ -1766,6 +1769,69 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
     }
   }
 
+  @Test
+  public void testTableWithInt96Timestamp() throws IOException {
+    File parquetTableDir = temp.newFolder("table_timestamp_int96");
+    String parquetTableLocation = parquetTableDir.toURI().toString();
+    Schema schema =
+        new Schema(
+            optional(1, "id", Types.LongType.get()),
+            optional(2, "tmp_col", Types.TimestampType.withZone()));
+    spark.conf().set(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE().key(), "INT96");
+    // NOTE(review): the INT96 setting above is never reset in this method; confirm that is intended.
+    LocalDateTime start = LocalDateTime.of(2000, 1, 31, 0, 0, 0);
+    LocalDateTime end = LocalDateTime.of(2100, 1, 1, 0, 0, 0);
+    long startSec = start.toEpochSecond(ZoneOffset.UTC);
+    long endSec = end.toEpochSecond(ZoneOffset.UTC);
+    Column idColumn = functions.expr("id");
+    Column secondsColumn =
+        functions.expr("(id % " + (endSec - startSec) + " + " + startSec + ")").as("seconds");
+    Column timestampColumn = functions.expr("cast( seconds as timestamp) as tmp_col");
+    // Exercise all four combinations of the dictionary option and the vectorization property.
+    for (Boolean useDict : new Boolean[] {true, false}) {
+      for (Boolean useVectorization : new Boolean[] {true, false}) {
+        spark.sql("DROP TABLE IF EXISTS parquet_table");
+        spark
+            .range(0, 5000, 100, 1)
+            .select(idColumn, secondsColumn)
+            .select(idColumn, timestampColumn)
+            .write()
+            .format("parquet")
+            .option("parquet.enable.dictionary", useDict)
+            .mode("overwrite")
+            .option("path", parquetTableLocation)
+            .saveAsTable("parquet_table");
+        TableIdentifier tableIdentifier = TableIdentifier.of("db", "table_with_timestamp_int96");
+        Table table = createTable(tableIdentifier, schema, PartitionSpec.unpartitioned());
+        table
+            .updateProperties()
+            .set(TableProperties.PARQUET_VECTORIZATION_ENABLED, useVectorization.toString())
+            .commit();
+        // Import the Spark table "parquet_table" into the newly created Iceberg table.
+        String stagingLocation = table.location() + "/metadata";
+        SparkTableUtil.importSparkTable(
+            spark,
+            new org.apache.spark.sql.catalyst.TableIdentifier("parquet_table"),
+            table,
+            stagingLocation);
+        // The Iceberg read of tmp_col must return the same rows, in any order, as reading
+        // parquet_table through Spark directly.
+        List<Row> expected = spark.table("parquet_table").select("tmp_col").collectAsList();
+        List<Row> actual =
+            spark
+                .read()
+                .format("iceberg")
+                .load(loadLocation(tableIdentifier))
+                .select("tmp_col")
+                .collectAsList();
+        Assertions.assertThat(actual)
+            .as("Rows must match")
+            .containsExactlyInAnyOrderElementsOf(expected);
+        dropTable(tableIdentifier);
+      }
+    }
+  }
+
   private GenericData.Record manifestRecord(
       Table manifestTable, Long referenceSnapshotId, ManifestFile manifest) {
     GenericRecordBuilder builder =