Posted to commits@iceberg.apache.org by ja...@apache.org on 2023/04/11 14:33:23 UTC
[iceberg] branch master updated: Arrow: Support vectorized read of INT96 timestamp in imported data (#6962)
This is an automated email from the ASF dual-hosted git repository.
jackye pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 0123862618 Arrow: Support vectorized read of INT96 timestamp in imported data (#6962)
0123862618 is described below
commit 012386261841c1eb50cdc01d13ae146ad40ae011
Author: ChenliangLu <31...@users.noreply.github.com>
AuthorDate: Tue Apr 11 22:33:16 2023 +0800
Arrow: Support vectorized read of INT96 timestamp in imported data (#6962)
---
.../GenericArrowVectorAccessorFactory.java | 30 ++++
.../arrow/vectorized/VectorizedArrowReader.java | 17 ++
.../parquet/VectorizedColumnIterator.java | 18 ++
...orizedDictionaryEncodedParquetValuesReader.java | 17 ++
.../vectorized/parquet/VectorizedPageIterator.java | 30 ++++
.../VectorizedParquetDefinitionLevelReader.java | 51 ++++++
.../org/apache/iceberg/parquet/ParquetUtil.java | 16 ++
.../iceberg/spark/data/SparkParquetReaders.java | 9 +-
.../iceberg/spark/data/SparkParquetReaders.java | 9 +-
.../iceberg/spark/data/SparkParquetReaders.java | 9 +-
.../iceberg/spark/data/SparkParquetReaders.java | 9 +-
.../source/TestIcebergSourceHadoopTables.java | 5 +
.../spark/source/TestIcebergSourceHiveTables.java | 19 ++-
.../spark/source/TestIcebergSourceTablesBase.java | 190 ++++++++++++++-------
14 files changed, 334 insertions(+), 95 deletions(-)
diff --git a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/GenericArrowVectorAccessorFactory.java b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/GenericArrowVectorAccessorFactory.java
index 2883b7c11b..e9305e399c 100644
--- a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/GenericArrowVectorAccessorFactory.java
+++ b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/GenericArrowVectorAccessorFactory.java
@@ -22,6 +22,7 @@ import java.lang.reflect.Array;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
import java.util.function.IntFunction;
import java.util.function.Supplier;
import java.util.stream.IntStream;
@@ -43,6 +44,7 @@ import org.apache.arrow.vector.VarCharVector;
import org.apache.arrow.vector.complex.ListVector;
import org.apache.arrow.vector.complex.StructVector;
import org.apache.arrow.vector.util.DecimalUtility;
+import org.apache.iceberg.parquet.ParquetUtil;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.column.Dictionary;
@@ -159,6 +161,11 @@ public class GenericArrowVectorAccessorFactory<
return new DictionaryFloatAccessor<>((IntVector) vector, dictionary);
case INT64:
return new DictionaryLongAccessor<>((IntVector) vector, dictionary);
+ case INT96:
+ // Impala & Spark used to write timestamps as INT96 by default. For backwards
+ // compatibility we try to read INT96 as timestamps. But INT96 is not recommended
+ // and deprecated (see https://issues.apache.org/jira/browse/PARQUET-323)
+ return new DictionaryTimestampInt96Accessor<>((IntVector) vector, dictionary);
case DOUBLE:
return new DictionaryDoubleAccessor<>((IntVector) vector, dictionary);
default:
@@ -455,6 +462,29 @@ public class GenericArrowVectorAccessorFactory<
}
}
+ private static class DictionaryTimestampInt96Accessor<
+ DecimalT, Utf8StringT, ArrayT, ChildVectorT extends AutoCloseable>
+ extends ArrowVectorAccessor<DecimalT, Utf8StringT, ArrayT, ChildVectorT> {
+ private final IntVector offsetVector;
+ private final Dictionary dictionary;
+
+ DictionaryTimestampInt96Accessor(IntVector vector, Dictionary dictionary) {
+ super(vector);
+ this.offsetVector = vector;
+ this.dictionary = dictionary;
+ }
+
+ @Override
+ public final long getLong(int rowId) {
+ ByteBuffer byteBuffer =
+ dictionary
+ .decodeToBinary(offsetVector.get(rowId))
+ .toByteBuffer()
+ .order(ByteOrder.LITTLE_ENDIAN);
+ return ParquetUtil.extractTimestampInt96(byteBuffer);
+ }
+ }
+
private static class DateAccessor<
DecimalT, Utf8StringT, ArrayT, ChildVectorT extends AutoCloseable>
extends ArrowVectorAccessor<DecimalT, Utf8StringT, ArrayT, ChildVectorT> {
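A note on the dictionary accessor above: every getLong call decodes the binary dictionary entry and re-runs the INT96 conversion. A hypothetical alternative (not part of this patch) is to convert each dictionary entry to epoch micros once, so per-row access becomes a plain array lookup. A minimal sketch, assuming a Dictionary decoded from a Parquet data page:

import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import org.apache.iceberg.parquet.ParquetUtil;
import org.apache.parquet.column.Dictionary;

class Int96DictionarySketch {
  // Hypothetical helper: eagerly convert all INT96 dictionary entries to
  // epoch micros, indexed by dictionary id.
  static long[] decodeDictionaryToMicros(Dictionary dictionary) {
    long[] micros = new long[dictionary.getMaxId() + 1];
    for (int id = 0; id <= dictionary.getMaxId(); id++) {
      ByteBuffer buffer =
          dictionary.decodeToBinary(id).toByteBuffer().order(ByteOrder.LITTLE_ENDIAN);
      micros[id] = ParquetUtil.extractTimestampInt96(buffer);
    }
    return micros;
  }
}

This trades one pass over the dictionary for cheaper per-row reads, which pays off when the row count is much larger than the dictionary size.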
diff --git a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorizedArrowReader.java b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorizedArrowReader.java
index 0fcb35fbd1..9c52f14381 100644
--- a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorizedArrowReader.java
+++ b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorizedArrowReader.java
@@ -113,6 +113,7 @@ public class VectorizedArrowReader implements VectorizedReader<VectorHolder> {
FLOAT,
DOUBLE,
TIMESTAMP_MILLIS,
+ TIMESTAMP_INT96,
TIME_MICROS,
UUID,
DICTIONARY
@@ -175,6 +176,11 @@ public class VectorizedArrowReader implements VectorizedReader<VectorHolder> {
.timestampMillisBatchReader()
.nextBatch(vec, typeWidth, nullabilityHolder);
break;
+ case TIMESTAMP_INT96:
+ vectorizedColumnIterator
+ .timestampInt96BatchReader()
+ .nextBatch(vec, typeWidth, nullabilityHolder);
+ break;
case UUID:
case FIXED_WIDTH_BINARY:
case FIXED_LENGTH_DECIMAL:
@@ -366,6 +372,17 @@ public class VectorizedArrowReader implements VectorizedReader<VectorHolder> {
this.readType = ReadType.INT;
this.typeWidth = (int) IntVector.TYPE_WIDTH;
break;
+ case INT96:
+ // Impala & Spark used to write timestamps as INT96 by default. For backwards
+ // compatibility we try to read INT96 as timestamps. But INT96 is not recommended
+ // and deprecated (see https://issues.apache.org/jira/browse/PARQUET-323)
+ int length = BigIntVector.TYPE_WIDTH;
+ this.readType = ReadType.TIMESTAMP_INT96;
+ this.vec = arrowField.createVector(rootAlloc);
+ vec.setInitialCapacity(batchSize * length);
+ vec.allocateNew();
+ this.typeWidth = length;
+ break;
case FLOAT:
Field floatField =
new Field(
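A note on the INT96 case above: although the on-disk values are 12 bytes, the vector is sized with 8-byte slots (BigIntVector.TYPE_WIDTH) because each value is converted to microseconds since the Unix epoch before it is stored. A minimal standalone sketch of the resulting layout (names and values are illustrative, not from the patch):

import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.BigIntVector;

public class Int96VectorSketch {
  public static void main(String[] args) {
    try (RootAllocator allocator = new RootAllocator();
        BigIntVector vec = new BigIntVector("tmp_col", allocator)) {
      vec.setInitialCapacity(1024); // one 8-byte slot per row
      vec.allocateNew();
      vec.setSafe(0, 86_401_000_000L); // 1970-01-02T00:00:01Z as epoch micros
      vec.setValueCount(1);
    }
  }
}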
diff --git a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedColumnIterator.java b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedColumnIterator.java
index 55af9b5361..73e332058e 100644
--- a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedColumnIterator.java
+++ b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedColumnIterator.java
@@ -144,6 +144,20 @@ public class VectorizedColumnIterator extends BaseColumnIterator {
}
}
+ public class TimestampInt96BatchReader extends BatchReader {
+ @Override
+ protected int nextBatchOf(
+ final FieldVector vector,
+ final int expectedBatchSize,
+ final int numValsInVector,
+ final int typeWidth,
+ NullabilityHolder holder) {
+ return vectorizedPageIterator
+ .timestampInt96PageReader()
+ .nextBatch(vector, expectedBatchSize, numValsInVector, typeWidth, holder);
+ }
+ }
+
public class FloatBatchReader extends BatchReader {
@Override
protected int nextBatchOf(
@@ -292,6 +306,10 @@ public class VectorizedColumnIterator extends BaseColumnIterator {
return new TimestampMillisBatchReader();
}
+ public TimestampInt96BatchReader timestampInt96BatchReader() {
+ return new TimestampInt96BatchReader();
+ }
+
public FloatBatchReader floatBatchReader() {
return new FloatBatchReader();
}
diff --git a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedDictionaryEncodedParquetValuesReader.java b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedDictionaryEncodedParquetValuesReader.java
index 96071fe75c..55f1d3fd79 100644
--- a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedDictionaryEncodedParquetValuesReader.java
+++ b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedDictionaryEncodedParquetValuesReader.java
@@ -19,6 +19,7 @@
package org.apache.iceberg.arrow.vectorized.parquet;
import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
import org.apache.arrow.vector.BaseVariableWidthVector;
import org.apache.arrow.vector.BitVectorHelper;
import org.apache.arrow.vector.DecimalVector;
@@ -26,6 +27,7 @@ import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.FixedSizeBinaryVector;
import org.apache.arrow.vector.IntVector;
import org.apache.iceberg.arrow.vectorized.NullabilityHolder;
+import org.apache.iceberg.parquet.ParquetUtil;
import org.apache.parquet.column.Dictionary;
/**
@@ -105,6 +107,17 @@ public class VectorizedDictionaryEncodedParquetValuesReader
}
}
+ class TimestampInt96DictEncodedReader extends BaseDictEncodedReader {
+ @Override
+ protected void nextVal(
+ FieldVector vector, Dictionary dict, int idx, int currentVal, int typeWidth) {
+ ByteBuffer buffer =
+ dict.decodeToBinary(currentVal).toByteBuffer().order(ByteOrder.LITTLE_ENDIAN);
+ long timestampInt96 = ParquetUtil.extractTimestampInt96(buffer);
+ vector.getDataBuffer().setLong(idx, timestampInt96);
+ }
+ }
+
class IntegerDictEncodedReader extends BaseDictEncodedReader {
@Override
protected void nextVal(
@@ -200,6 +213,10 @@ public class VectorizedDictionaryEncodedParquetValuesReader
return new TimestampMillisDictEncodedReader();
}
+ public TimestampInt96DictEncodedReader timestampInt96DictEncodedReader() {
+ return new TimestampInt96DictEncodedReader();
+ }
+
public IntegerDictEncodedReader integerDictEncodedReader() {
return new IntegerDictEncodedReader();
}
diff --git a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedPageIterator.java b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedPageIterator.java
index f6b27c852f..0f5b297711 100644
--- a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedPageIterator.java
+++ b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedPageIterator.java
@@ -283,6 +283,32 @@ public class VectorizedPageIterator extends BasePageIterator {
}
}
+ /** Method for reading a batch of values of TimestampInt96 data type. */
+ class TimestampInt96PageReader extends BagePageReader {
+ @Override
+ protected void nextVal(
+ FieldVector vector, int batchSize, int numVals, int typeWidth, NullabilityHolder holder) {
+ vectorizedDefinitionLevelReader
+ .timestampInt96Reader()
+ .nextBatch(vector, numVals, typeWidth, batchSize, holder, plainValuesReader);
+ }
+
+ @Override
+ protected void nextDictEncodedVal(
+ FieldVector vector, int batchSize, int numVals, int typeWidth, NullabilityHolder holder) {
+ vectorizedDefinitionLevelReader
+ .timestampInt96Reader()
+ .nextDictEncodedBatch(
+ vector,
+ numVals,
+ typeWidth,
+ batchSize,
+ holder,
+ dictionaryEncodedValuesReader,
+ dictionary);
+ }
+ }
+
/** Method for reading a batch of values of FLOAT data type. */
class FloatPageReader extends BagePageReader {
@@ -547,6 +573,10 @@ public class VectorizedPageIterator extends BasePageIterator {
return new TimestampMillisPageReader();
}
+ TimestampInt96PageReader timestampInt96PageReader() {
+ return new TimestampInt96PageReader();
+ }
+
FloatPageReader floatPageReader() {
return new FloatPageReader();
}
diff --git a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedParquetDefinitionLevelReader.java b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedParquetDefinitionLevelReader.java
index a036aee9e6..20d7f80497 100644
--- a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedParquetDefinitionLevelReader.java
+++ b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedParquetDefinitionLevelReader.java
@@ -19,6 +19,7 @@
package org.apache.iceberg.arrow.vectorized.parquet;
import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
import org.apache.arrow.memory.ArrowBuf;
import org.apache.arrow.vector.BaseVariableWidthVector;
import org.apache.arrow.vector.BitVector;
@@ -29,6 +30,7 @@ import org.apache.arrow.vector.FixedSizeBinaryVector;
import org.apache.arrow.vector.IntVector;
import org.apache.arrow.vector.VarBinaryVector;
import org.apache.iceberg.arrow.vectorized.NullabilityHolder;
+import org.apache.iceberg.parquet.ParquetUtil;
import org.apache.iceberg.parquet.ValuesAsBytesReader;
import org.apache.parquet.column.Dictionary;
@@ -447,6 +449,51 @@ public final class VectorizedParquetDefinitionLevelReader
}
}
+ class TimestampInt96Reader extends BaseReader {
+ @Override
+ protected void nextVal(
+ FieldVector vector,
+ int idx,
+ ValuesAsBytesReader valuesReader,
+ int typeWidth,
+ byte[] byteArray) {
+ // 8 bytes (time of day nanos) + 4 bytes (julian day) = 12 bytes
+ ByteBuffer buffer = valuesReader.getBuffer(12).order(ByteOrder.LITTLE_ENDIAN);
+ long timestampInt96 = ParquetUtil.extractTimestampInt96(buffer);
+ vector.getDataBuffer().setLong((long) idx * typeWidth, timestampInt96);
+ }
+
+ @Override
+ protected void nextDictEncodedVal(
+ FieldVector vector,
+ int idx,
+ VectorizedDictionaryEncodedParquetValuesReader reader,
+ int numValuesToRead,
+ Dictionary dict,
+ NullabilityHolder nullabilityHolder,
+ int typeWidth,
+ Mode mode) {
+ switch (mode) {
+ case RLE:
+ reader
+ .timestampInt96DictEncodedReader()
+ .nextBatch(vector, idx, numValuesToRead, dict, nullabilityHolder, typeWidth);
+ break;
+ case PACKED:
+ ByteBuffer buffer =
+ dict.decodeToBinary(reader.readInteger())
+ .toByteBuffer()
+ .order(ByteOrder.LITTLE_ENDIAN);
+ long timestampInt96 = ParquetUtil.extractTimestampInt96(buffer);
+ vector.getDataBuffer().setLong(idx, timestampInt96);
+ break;
+ default:
+ throw new UnsupportedOperationException(
+ "Unsupported mode for timestamp int96 reader: " + mode);
+ }
+ }
+ }
+
class FixedWidthBinaryReader extends BaseReader {
@Override
protected void nextVal(
@@ -777,6 +824,10 @@ public final class VectorizedParquetDefinitionLevelReader
return new TimestampMillisReader();
}
+ TimestampInt96Reader timestampInt96Reader() {
+ return new TimestampInt96Reader();
+ }
+
FixedWidthBinaryReader fixedWidthBinaryReader() {
return new FixedWidthBinaryReader();
}
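In the plain-encoded path above, nextVal consumes 12 little-endian bytes per value and writes the converted 8-byte long at byte offset idx * typeWidth in the Arrow data buffer. A self-contained sketch of that write, with hand-picked values (not taken from the patch):

import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.BigIntVector;
import org.apache.iceberg.parquet.ParquetUtil;

public class Int96WriteSketch {
  public static void main(String[] args) {
    ByteBuffer raw = ByteBuffer.allocate(12).order(ByteOrder.LITTLE_ENDIAN);
    raw.putLong(0L); // time of day nanos
    raw.putInt(2_440_589); // julian day: one day after the Unix epoch
    raw.flip();

    try (RootAllocator allocator = new RootAllocator();
        BigIntVector vec = new BigIntVector("ts", allocator)) {
      vec.allocateNew(4);
      int idx = 2;
      long micros = ParquetUtil.extractTimestampInt96(raw); // 86_400_000_000
      vec.getDataBuffer().setLong((long) idx * BigIntVector.TYPE_WIDTH, micros);
      vec.setValueCount(4);
    }
  }
}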
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetUtil.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetUtil.java
index b6d57d073e..a879fc5f51 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetUtil.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetUtil.java
@@ -25,6 +25,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -68,6 +69,8 @@ public class ParquetUtil {
// not meant to be instantiated
private ParquetUtil() {}
+ private static final long UNIX_EPOCH_JULIAN = 2_440_588L;
+
public static Metrics fileMetrics(InputFile file, MetricsConfig metricsConfig) {
return fileMetrics(file, metricsConfig, null);
}
@@ -403,4 +406,17 @@ public class ParquetUtil {
}
return primitiveType.getPrimitiveTypeName() == PrimitiveType.PrimitiveTypeName.INT32;
}
+
+ /**
+ * Reads a timestamp stored as a Parquet INT96 from a ByteBuffer. Consumes 12 bytes: 8 bytes
+ * (time of day nanos) followed by 4 bytes (julian day).
+ */
+ public static long extractTimestampInt96(ByteBuffer buffer) {
+ // 8 bytes (time of day nanos)
+ long timeOfDayNanos = buffer.getLong();
+ // 4 bytes (julian day)
+ int julianDay = buffer.getInt();
+ return TimeUnit.DAYS.toMicros(julianDay - UNIX_EPOCH_JULIAN)
+ + TimeUnit.NANOSECONDS.toMicros(timeOfDayNanos);
+ }
}
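A worked example of the arithmetic in extractTimestampInt96, as a standalone sketch (the buffer contents are hand-picked for illustration):

import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.concurrent.TimeUnit;

public class ExtractInt96Example {
  private static final long UNIX_EPOCH_JULIAN = 2_440_588L;

  public static void main(String[] args) {
    // Encode 1970-01-02T00:00:01Z as INT96: 1 second of nanos, epoch day + 1.
    ByteBuffer buffer = ByteBuffer.allocate(12).order(ByteOrder.LITTLE_ENDIAN);
    buffer.putLong(TimeUnit.SECONDS.toNanos(1)); // 8 bytes: time of day nanos
    buffer.putInt((int) UNIX_EPOCH_JULIAN + 1); // 4 bytes: julian day
    buffer.flip();

    long timeOfDayNanos = buffer.getLong();
    int julianDay = buffer.getInt();
    long micros =
        TimeUnit.DAYS.toMicros(julianDay - UNIX_EPOCH_JULIAN)
            + TimeUnit.NANOSECONDS.toMicros(timeOfDayNanos);
    // 86_400_000_000 + 1_000_000 = 86_401_000_000 micros since the epoch
    System.out.println(micros);
  }
}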
diff --git a/spark/v2.4/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java b/spark/v2.4/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java
index 836ac46d19..bba68684a3 100644
--- a/spark/v2.4/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java
+++ b/spark/v2.4/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java
@@ -25,10 +25,10 @@ import java.nio.ByteOrder;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.TimeUnit;
import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.Schema;
import org.apache.iceberg.parquet.ParquetSchemaUtil;
+import org.apache.iceberg.parquet.ParquetUtil;
import org.apache.iceberg.parquet.ParquetValueReader;
import org.apache.iceberg.parquet.ParquetValueReaders;
import org.apache.iceberg.parquet.ParquetValueReaders.FloatAsDoubleReader;
@@ -377,7 +377,6 @@ public class SparkParquetReaders {
}
private static class TimestampInt96Reader extends UnboxedReader<Long> {
- private static final long UNIX_EPOCH_JULIAN = 2_440_588L;
TimestampInt96Reader(ColumnDescriptor desc) {
super(desc);
@@ -392,11 +391,7 @@ public class SparkParquetReaders {
public long readLong() {
final ByteBuffer byteBuffer =
column.nextBinary().toByteBuffer().order(ByteOrder.LITTLE_ENDIAN);
- final long timeOfDayNanos = byteBuffer.getLong();
- final int julianDay = byteBuffer.getInt();
-
- return TimeUnit.DAYS.toMicros(julianDay - UNIX_EPOCH_JULIAN)
- + TimeUnit.NANOSECONDS.toMicros(timeOfDayNanos);
+ return ParquetUtil.extractTimestampInt96(byteBuffer);
}
}
diff --git a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java
index 5dfb51e14b..940e7934ec 100644
--- a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java
+++ b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java
@@ -25,10 +25,10 @@ import java.nio.ByteOrder;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.TimeUnit;
import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.Schema;
import org.apache.iceberg.parquet.ParquetSchemaUtil;
+import org.apache.iceberg.parquet.ParquetUtil;
import org.apache.iceberg.parquet.ParquetValueReader;
import org.apache.iceberg.parquet.ParquetValueReaders;
import org.apache.iceberg.parquet.ParquetValueReaders.FloatAsDoubleReader;
@@ -378,7 +378,6 @@ public class SparkParquetReaders {
}
private static class TimestampInt96Reader extends UnboxedReader<Long> {
- private static final long UNIX_EPOCH_JULIAN = 2_440_588L;
TimestampInt96Reader(ColumnDescriptor desc) {
super(desc);
@@ -393,11 +392,7 @@ public class SparkParquetReaders {
public long readLong() {
final ByteBuffer byteBuffer =
column.nextBinary().toByteBuffer().order(ByteOrder.LITTLE_ENDIAN);
- final long timeOfDayNanos = byteBuffer.getLong();
- final int julianDay = byteBuffer.getInt();
-
- return TimeUnit.DAYS.toMicros(julianDay - UNIX_EPOCH_JULIAN)
- + TimeUnit.NANOSECONDS.toMicros(timeOfDayNanos);
+ return ParquetUtil.extractTimestampInt96(byteBuffer);
}
}
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java
index 76e780f7a0..59f81de6ae 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java
@@ -25,10 +25,10 @@ import java.nio.ByteOrder;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.TimeUnit;
import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.Schema;
import org.apache.iceberg.parquet.ParquetSchemaUtil;
+import org.apache.iceberg.parquet.ParquetUtil;
import org.apache.iceberg.parquet.ParquetValueReader;
import org.apache.iceberg.parquet.ParquetValueReaders;
import org.apache.iceberg.parquet.ParquetValueReaders.FloatAsDoubleReader;
@@ -377,7 +377,6 @@ public class SparkParquetReaders {
}
private static class TimestampInt96Reader extends UnboxedReader<Long> {
- private static final long UNIX_EPOCH_JULIAN = 2_440_588L;
TimestampInt96Reader(ColumnDescriptor desc) {
super(desc);
@@ -392,11 +391,7 @@ public class SparkParquetReaders {
public long readLong() {
final ByteBuffer byteBuffer =
column.nextBinary().toByteBuffer().order(ByteOrder.LITTLE_ENDIAN);
- final long timeOfDayNanos = byteBuffer.getLong();
- final int julianDay = byteBuffer.getInt();
-
- return TimeUnit.DAYS.toMicros(julianDay - UNIX_EPOCH_JULIAN)
- + TimeUnit.NANOSECONDS.toMicros(timeOfDayNanos);
+ return ParquetUtil.extractTimestampInt96(byteBuffer);
}
}
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java
index 76e780f7a0..59f81de6ae 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java
@@ -25,10 +25,10 @@ import java.nio.ByteOrder;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.TimeUnit;
import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.Schema;
import org.apache.iceberg.parquet.ParquetSchemaUtil;
+import org.apache.iceberg.parquet.ParquetUtil;
import org.apache.iceberg.parquet.ParquetValueReader;
import org.apache.iceberg.parquet.ParquetValueReaders;
import org.apache.iceberg.parquet.ParquetValueReaders.FloatAsDoubleReader;
@@ -377,7 +377,6 @@ public class SparkParquetReaders {
}
private static class TimestampInt96Reader extends UnboxedReader<Long> {
- private static final long UNIX_EPOCH_JULIAN = 2_440_588L;
TimestampInt96Reader(ColumnDescriptor desc) {
super(desc);
@@ -392,11 +391,7 @@ public class SparkParquetReaders {
public long readLong() {
final ByteBuffer byteBuffer =
column.nextBinary().toByteBuffer().order(ByteOrder.LITTLE_ENDIAN);
- final long timeOfDayNanos = byteBuffer.getLong();
- final int julianDay = byteBuffer.getInt();
-
- return TimeUnit.DAYS.toMicros(julianDay - UNIX_EPOCH_JULIAN)
- + TimeUnit.NANOSECONDS.toMicros(timeOfDayNanos);
+ return ParquetUtil.extractTimestampInt96(byteBuffer);
}
}
diff --git a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceHadoopTables.java b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceHadoopTables.java
index b55ba0e219..9bd7220b90 100644
--- a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceHadoopTables.java
+++ b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceHadoopTables.java
@@ -50,6 +50,11 @@ public class TestIcebergSourceHadoopTables extends TestIcebergSourceTablesBase {
return TABLES.create(schema, spec, tableLocation);
}
+ @Override
+ public void dropTable(TableIdentifier ident) {
+ TABLES.dropTable(tableLocation);
+ }
+
@Override
public Table loadTable(TableIdentifier ident, String entriesSuffix) {
return TABLES.load(loadLocation(ident, entriesSuffix));
diff --git a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceHiveTables.java b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceHiveTables.java
index de26f5f82c..6292a2c1a8 100644
--- a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceHiveTables.java
+++ b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceHiveTables.java
@@ -43,11 +43,11 @@ public class TestIcebergSourceHiveTables extends TestIcebergSourceTablesBase {
@After
public void dropTable() throws IOException {
- Table table = catalog.loadTable(currentIdentifier);
- Path tablePath = new Path(table.location());
- FileSystem fs = tablePath.getFileSystem(spark.sessionState().newHadoopConf());
- fs.delete(tablePath, true);
- catalog.dropTable(currentIdentifier, false);
+ if (!catalog.tableExists(currentIdentifier)) {
+ return;
+ }
+
+ dropTable(currentIdentifier);
}
@Override
@@ -56,6 +56,15 @@ public class TestIcebergSourceHiveTables extends TestIcebergSourceTablesBase {
return TestIcebergSourceHiveTables.catalog.createTable(ident, schema, spec);
}
+ @Override
+ public void dropTable(TableIdentifier ident) throws IOException {
+ Table table = catalog.loadTable(ident);
+ Path tablePath = new Path(table.location());
+ FileSystem fs = tablePath.getFileSystem(spark.sessionState().newHadoopConf());
+ fs.delete(tablePath, true);
+ catalog.dropTable(ident, false);
+ }
+
@Override
public Table loadTable(TableIdentifier ident, String entriesSuffix) {
TableIdentifier identifier =
diff --git a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
index 123c06b7e5..fc023cdfdb 100644
--- a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
+++ b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
@@ -23,8 +23,11 @@ import static org.apache.iceberg.ManifestContent.DELETES;
import static org.apache.iceberg.types.Types.NestedField.optional;
import static org.apache.iceberg.types.Types.NestedField.required;
+import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
+import java.time.LocalDateTime;
+import java.time.ZoneOffset;
import java.util.Comparator;
import java.util.List;
import java.util.StringJoiner;
@@ -72,13 +75,18 @@ import org.apache.iceberg.spark.data.TestHelpers;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.Pair;
import org.apache.spark.SparkException;
+import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.functions;
+import org.apache.spark.sql.internal.SQLConf;
import org.apache.spark.sql.types.StructType;
+import org.assertj.core.api.Assertions;
+import org.junit.After;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
@@ -113,6 +121,13 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
public abstract String loadLocation(TableIdentifier ident);
+ public abstract void dropTable(TableIdentifier ident) throws IOException;
+
+ @After
+ public void removeTable() {
+ spark.sql("DROP TABLE IF EXISTS parquet_table");
+ }
+
@Test
public synchronized void testTablesSupport() {
TableIdentifier tableIdentifier = TableIdentifier.of("db", "table");
@@ -472,8 +487,6 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
@Test
public void testFilesTableWithSnapshotIdInheritance() throws Exception {
- spark.sql("DROP TABLE IF EXISTS parquet_table");
-
TableIdentifier tableIdentifier = TableIdentifier.of("db", "files_inheritance_test");
Table table = createTable(tableIdentifier, SCHEMA, SPEC);
table.updateProperties().set(TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED, "true").commit();
@@ -496,45 +509,39 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
table.updateProperties().set(TableProperties.DEFAULT_NAME_MAPPING, mappingJson).commit();
- try {
- String stagingLocation = table.location() + "/metadata";
- SparkTableUtil.importSparkTable(
- spark,
- new org.apache.spark.sql.catalyst.TableIdentifier("parquet_table"),
- table,
- stagingLocation);
-
- Dataset<Row> filesTableDs =
- spark.read().format("iceberg").load(loadLocation(tableIdentifier, "files"));
- List<Row> actual = TestHelpers.selectNonDerived(filesTableDs).collectAsList();
-
- List<GenericData.Record> expected = Lists.newArrayList();
- for (ManifestFile manifest : table.currentSnapshot().dataManifests(table.io())) {
- InputFile in = table.io().newInputFile(manifest.path());
- try (CloseableIterable<GenericData.Record> rows =
- Avro.read(in).project(entriesTable.schema()).build()) {
- for (GenericData.Record record : rows) {
- GenericData.Record file = (GenericData.Record) record.get("data_file");
- TestHelpers.asMetadataRecord(file);
- expected.add(file);
- }
+ String stagingLocation = table.location() + "/metadata";
+ SparkTableUtil.importSparkTable(
+ spark,
+ new org.apache.spark.sql.catalyst.TableIdentifier("parquet_table"),
+ table,
+ stagingLocation);
+
+ Dataset<Row> filesTableDs =
+ spark.read().format("iceberg").load(loadLocation(tableIdentifier, "files"));
+ List<Row> actual = TestHelpers.selectNonDerived(filesTableDs).collectAsList();
+
+ List<GenericData.Record> expected = Lists.newArrayList();
+ for (ManifestFile manifest : table.currentSnapshot().dataManifests(table.io())) {
+ InputFile in = table.io().newInputFile(manifest.path());
+ try (CloseableIterable<GenericData.Record> rows =
+ Avro.read(in).project(entriesTable.schema()).build()) {
+ for (GenericData.Record record : rows) {
+ GenericData.Record file = (GenericData.Record) record.get("data_file");
+ TestHelpers.asMetadataRecord(file);
+ expected.add(file);
}
}
-
- Types.StructType struct = TestHelpers.nonDerivedSchema(filesTableDs);
- Assert.assertEquals("Files table should have one row", 2, expected.size());
- Assert.assertEquals("Actual results should have one row", 2, actual.size());
- TestHelpers.assertEqualsSafe(struct, expected.get(0), actual.get(0));
- TestHelpers.assertEqualsSafe(struct, expected.get(1), actual.get(1));
- } finally {
- spark.sql("DROP TABLE parquet_table");
}
+
+ Types.StructType struct = TestHelpers.nonDerivedSchema(filesTableDs);
+ Assert.assertEquals("Files table should have one row", 2, expected.size());
+ Assert.assertEquals("Actual results should have one row", 2, actual.size());
+ TestHelpers.assertEqualsSafe(struct, expected.get(0), actual.get(0));
+ TestHelpers.assertEqualsSafe(struct, expected.get(1), actual.get(1));
}
@Test
public void testEntriesTableWithSnapshotIdInheritance() throws Exception {
- spark.sql("DROP TABLE IF EXISTS parquet_table");
-
TableIdentifier tableIdentifier = TableIdentifier.of("db", "entries_inheritance_test");
PartitionSpec spec = SPEC;
Table table = createTable(tableIdentifier, SCHEMA, spec);
@@ -553,34 +560,30 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
Dataset<Row> inputDF = spark.createDataFrame(records, SimpleRecord.class);
inputDF.select("data", "id").write().mode("overwrite").insertInto("parquet_table");
- try {
- String stagingLocation = table.location() + "/metadata";
- SparkTableUtil.importSparkTable(
- spark,
- new org.apache.spark.sql.catalyst.TableIdentifier("parquet_table"),
- table,
- stagingLocation);
-
- List<Row> actual =
- spark
- .read()
- .format("iceberg")
- .load(loadLocation(tableIdentifier, "entries"))
- .select("sequence_number", "snapshot_id", "data_file")
- .collectAsList();
-
- table.refresh();
-
- long snapshotId = table.currentSnapshot().snapshotId();
-
- Assert.assertEquals("Entries table should have 2 rows", 2, actual.size());
- Assert.assertEquals("Sequence number must match", 0, actual.get(0).getLong(0));
- Assert.assertEquals("Snapshot id must match", snapshotId, actual.get(0).getLong(1));
- Assert.assertEquals("Sequence number must match", 0, actual.get(1).getLong(0));
- Assert.assertEquals("Snapshot id must match", snapshotId, actual.get(1).getLong(1));
- } finally {
- spark.sql("DROP TABLE parquet_table");
- }
+ String stagingLocation = table.location() + "/metadata";
+ SparkTableUtil.importSparkTable(
+ spark,
+ new org.apache.spark.sql.catalyst.TableIdentifier("parquet_table"),
+ table,
+ stagingLocation);
+
+ List<Row> actual =
+ spark
+ .read()
+ .format("iceberg")
+ .load(loadLocation(tableIdentifier, "entries"))
+ .select("sequence_number", "snapshot_id", "data_file")
+ .collectAsList();
+
+ table.refresh();
+
+ long snapshotId = table.currentSnapshot().snapshotId();
+
+ Assert.assertEquals("Entries table should have 2 rows", 2, actual.size());
+ Assert.assertEquals("Sequence number must match", 0, actual.get(0).getLong(0));
+ Assert.assertEquals("Snapshot id must match", snapshotId, actual.get(0).getLong(1));
+ Assert.assertEquals("Sequence number must match", 0, actual.get(1).getLong(0));
+ Assert.assertEquals("Snapshot id must match", snapshotId, actual.get(1).getLong(1));
}
@Test
@@ -1766,6 +1769,69 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
}
}
+ @Test
+ public void testTableWithInt96Timestamp() throws IOException {
+ File parquetTableDir = temp.newFolder("table_timestamp_int96");
+ String parquetTableLocation = parquetTableDir.toURI().toString();
+ Schema schema =
+ new Schema(
+ optional(1, "id", Types.LongType.get()),
+ optional(2, "tmp_col", Types.TimestampType.withZone()));
+ spark.conf().set(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE().key(), "INT96");
+
+ LocalDateTime start = LocalDateTime.of(2000, 1, 31, 0, 0, 0);
+ LocalDateTime end = LocalDateTime.of(2100, 1, 1, 0, 0, 0);
+ long startSec = start.toEpochSecond(ZoneOffset.UTC);
+ long endSec = end.toEpochSecond(ZoneOffset.UTC);
+ Column idColumn = functions.expr("id");
+ Column secondsColumn =
+ functions.expr("(id % " + (endSec - startSec) + " + " + startSec + ")").as("seconds");
+ Column timestampColumn = functions.expr("cast(seconds as timestamp) as tmp_col");
+
+ for (Boolean useDict : new Boolean[] {true, false}) {
+ for (Boolean useVectorization : new Boolean[] {true, false}) {
+ spark.sql("DROP TABLE IF EXISTS parquet_table");
+ spark
+ .range(0, 5000, 100, 1)
+ .select(idColumn, secondsColumn)
+ .select(idColumn, timestampColumn)
+ .write()
+ .format("parquet")
+ .option("parquet.enable.dictionary", useDict)
+ .mode("overwrite")
+ .option("path", parquetTableLocation)
+ .saveAsTable("parquet_table");
+ TableIdentifier tableIdentifier = TableIdentifier.of("db", "table_with_timestamp_int96");
+ Table table = createTable(tableIdentifier, schema, PartitionSpec.unpartitioned());
+ table
+ .updateProperties()
+ .set(TableProperties.PARQUET_VECTORIZATION_ENABLED, useVectorization.toString())
+ .commit();
+
+ String stagingLocation = table.location() + "/metadata";
+ SparkTableUtil.importSparkTable(
+ spark,
+ new org.apache.spark.sql.catalyst.TableIdentifier("parquet_table"),
+ table,
+ stagingLocation);
+
+ // validate we get the expected results back
+ List<Row> expected = spark.table("parquet_table").select("tmp_col").collectAsList();
+ List<Row> actual =
+ spark
+ .read()
+ .format("iceberg")
+ .load(loadLocation(tableIdentifier))
+ .select("tmp_col")
+ .collectAsList();
+ Assertions.assertThat(actual)
+ .as("Rows must match")
+ .containsExactlyInAnyOrderElementsOf(expected);
+ dropTable(tableIdentifier);
+ }
+ }
+ }
+
private GenericData.Record manifestRecord(
Table manifestTable, Long referenceSnapshotId, ManifestFile manifest) {
GenericRecordBuilder builder =
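For readers reproducing the INT96 scenario outside the test suite: the essential step is Spark's spark.sql.parquet.outputTimestampType setting, which is the SQLConf key the new test sets. A minimal sketch, assuming a live SparkSession named spark and Spark 3.1+ for timestamp_seconds:

// Write a Parquet table whose timestamps are physically stored as INT96,
// then import it into an Iceberg table to exercise the new read path.
spark.conf().set("spark.sql.parquet.outputTimestampType", "INT96");
spark
    .range(0, 10)
    .selectExpr("id", "timestamp_seconds(id) as tmp_col")
    .write()
    .format("parquet")
    .mode("overwrite")
    .saveAsTable("parquet_table");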