Posted to commits@iceberg.apache.org by et...@apache.org on 2023/04/26 07:13:35 UTC

[iceberg] branch spark-uuid-read-write-support-3.4 created (now 7fcc85b044)

This is an automated email from the ASF dual-hosted git repository.

etudenhoefner pushed a change to branch spark-uuid-read-write-support-3.4
in repository https://gitbox.apache.org/repos/asf/iceberg.git


      at 7fcc85b044 address review feedback

This branch includes the following new commits:

     new 07ddc3c777 Spark: Add read/write support for UUIDs
     new ced5a0737f Add vectorization support for UTF8String
     new 7fcc85b044 address review feedback

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.



[iceberg] 02/03: Add vectorization support for UTF8String

Posted by et...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

etudenhoefner pushed a commit to branch spark-uuid-read-write-support-3.4
in repository https://gitbox.apache.org/repos/asf/iceberg.git

commit ced5a0737fa608645f5ae386b387944912cb697d
Author: Eduard Tudenhoefner <et...@gmail.com>
AuthorDate: Mon Apr 24 17:17:08 2023 +0200

    Add vectorization support for UTF8String
---
 .../java/org/apache/iceberg/util/RandomUtil.java   |  4 ++++
 .../GenericArrowVectorAccessorFactory.java         | 27 +++++++++++++++++++++-
 .../vectorized/ArrowVectorAccessorFactory.java     |  7 ++++++
 3 files changed, 37 insertions(+), 1 deletion(-)

diff --git a/api/src/test/java/org/apache/iceberg/util/RandomUtil.java b/api/src/test/java/org/apache/iceberg/util/RandomUtil.java
index a84dc4d8f8..9131e61661 100644
--- a/api/src/test/java/org/apache/iceberg/util/RandomUtil.java
+++ b/api/src/test/java/org/apache/iceberg/util/RandomUtil.java
@@ -182,6 +182,10 @@ public class RandomUtil {
         BigInteger unscaled = new BigInteger(String.valueOf(value + 1));
         BigDecimal bd = new BigDecimal(unscaled, type.scale());
         return negate(value) ? bd.negate() : bd;
+      case UUID:
+        byte[] uuidBytes = new byte[16];
+        random.nextBytes(uuidBytes);
+        return uuidBytes;
       default:
         throw new IllegalArgumentException(
             "Cannot generate random value for unknown type: " + primitive);
diff --git a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/GenericArrowVectorAccessorFactory.java b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/GenericArrowVectorAccessorFactory.java
index e9305e399c..a988516bc6 100644
--- a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/GenericArrowVectorAccessorFactory.java
+++ b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/GenericArrowVectorAccessorFactory.java
@@ -218,7 +218,8 @@ public class GenericArrowVectorAccessorFactory<
         return new FixedSizeBinaryBackedDecimalAccessor<>(
             (FixedSizeBinaryVector) vector, decimalFactorySupplier.get());
       }
-      return new FixedSizeBinaryAccessor<>((FixedSizeBinaryVector) vector);
+      return new FixedSizeBinaryAccessor<>(
+          (FixedSizeBinaryVector) vector, stringFactorySupplier.get());
     }
     throw new UnsupportedOperationException("Unsupported vector: " + vector.getClass());
   }
@@ -558,16 +559,32 @@ public class GenericArrowVectorAccessorFactory<
       extends ArrowVectorAccessor<DecimalT, Utf8StringT, ArrayT, ChildVectorT> {
 
     private final FixedSizeBinaryVector vector;
+    private final StringFactory<Utf8StringT> stringFactory;
 
     FixedSizeBinaryAccessor(FixedSizeBinaryVector vector) {
       super(vector);
       this.vector = vector;
+      this.stringFactory = null;
+    }
+
+    FixedSizeBinaryAccessor(
+        FixedSizeBinaryVector vector, StringFactory<Utf8StringT> stringFactory) {
+      super(vector);
+      this.vector = vector;
+      this.stringFactory = stringFactory;
     }
 
     @Override
     public byte[] getBinary(int rowId) {
       return vector.get(rowId);
     }
+
+    @Override
+    public Utf8StringT getUTF8String(int rowId) {
+      return null == stringFactory
+          ? super.getUTF8String(rowId)
+          : stringFactory.ofRow(vector, rowId);
+    }
   }
 
   private static class ArrayAccessor<
@@ -794,6 +811,14 @@ public class GenericArrowVectorAccessorFactory<
     /** Create a UTF8 String from the row value in the arrow vector. */
     Utf8StringT ofRow(VarCharVector vector, int rowId);
 
+    /** Create a UTF8 String from the row value in the FixedSizeBinaryVector vector. */
+    default Utf8StringT ofRow(FixedSizeBinaryVector vector, int rowId) {
+      throw new UnsupportedOperationException(
+          String.format(
+              "Creating %s from a FixedSizeBinaryVector is not supported",
+              getGenericClass().getSimpleName()));
+    }
+
     /** Create a UTF8 String from the byte array. */
     Utf8StringT ofBytes(byte[] bytes);
 
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ArrowVectorAccessorFactory.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ArrowVectorAccessorFactory.java
index e32ebcb02b..29e938bb09 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ArrowVectorAccessorFactory.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ArrowVectorAccessorFactory.java
@@ -21,10 +21,12 @@ package org.apache.iceberg.spark.data.vectorized;
 import java.math.BigDecimal;
 import java.nio.ByteBuffer;
 import org.apache.arrow.memory.ArrowBuf;
+import org.apache.arrow.vector.FixedSizeBinaryVector;
 import org.apache.arrow.vector.ValueVector;
 import org.apache.arrow.vector.VarCharVector;
 import org.apache.arrow.vector.complex.ListVector;
 import org.apache.iceberg.arrow.vectorized.GenericArrowVectorAccessorFactory;
+import org.apache.iceberg.util.UUIDUtil;
 import org.apache.spark.sql.types.Decimal;
 import org.apache.spark.sql.vectorized.ArrowColumnVector;
 import org.apache.spark.sql.vectorized.ColumnarArray;
@@ -74,6 +76,11 @@ final class ArrowVectorAccessorFactory
           null, vector.getDataBuffer().memoryAddress() + start, end - start);
     }
 
+    @Override
+    public UTF8String ofRow(FixedSizeBinaryVector vector, int rowId) {
+      return UTF8String.fromString(UUIDUtil.convert(vector.get(rowId)).toString());
+    }
+
     @Override
     public UTF8String ofBytes(byte[] bytes) {
       return UTF8String.fromBytes(bytes);


[iceberg] 01/03: Spark: Add read/write support for UUIDs

Posted by et...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

etudenhoefner pushed a commit to branch spark-uuid-read-write-support-3.4
in repository https://gitbox.apache.org/repos/asf/iceberg.git

commit 07ddc3c777b7dccda24098a9918e3746a27bc500
Author: Eduard Tudenhoefner <et...@gmail.com>
AuthorDate: Fri Apr 21 09:18:32 2023 +0200

    Spark: Add read/write support for UUIDs
---
 .../apache/iceberg/spark/data/SparkAvroWriter.java |  2 +-
 .../apache/iceberg/spark/data/SparkOrcReader.java  |  3 ++
 .../iceberg/spark/data/SparkOrcValueReaders.java   | 32 +++++++++++++++++++
 .../iceberg/spark/data/SparkOrcValueWriters.java   | 17 ++++++++++
 .../apache/iceberg/spark/data/SparkOrcWriter.java  | 11 ++++++-
 .../iceberg/spark/data/SparkParquetReaders.java    | 17 ++++++++++
 .../iceberg/spark/data/SparkParquetWriters.java    | 36 ++++++++++++++++++++++
 .../data/vectorized/VectorizedSparkOrcReaders.java |  5 ++-
 .../apache/iceberg/spark/data/AvroDataTest.java    |  2 +-
 .../org/apache/iceberg/spark/data/RandomData.java  |  2 ++
 .../iceberg/spark/data/TestSparkParquetWriter.java |  2 +-
 11 files changed, 124 insertions(+), 5 deletions(-)

diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/SparkAvroWriter.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/SparkAvroWriter.java
index 15465568c2..04dfd46a18 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/SparkAvroWriter.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/SparkAvroWriter.java
@@ -126,7 +126,7 @@ public class SparkAvroWriter implements MetricsAwareDatumWriter<InternalRow> {
             return SparkValueWriters.decimal(decimal.getPrecision(), decimal.getScale());
 
           case "uuid":
-            return ValueWriters.uuids();
+            return SparkValueWriters.uuids();
 
           default:
             throw new IllegalArgumentException("Unsupported logical type: " + logicalType);
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcReader.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcReader.java
index 78db137054..c20be44f67 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcReader.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcReader.java
@@ -123,6 +123,9 @@ public class SparkOrcReader implements OrcRowReader<InternalRow> {
         case STRING:
           return SparkOrcValueReaders.utf8String();
         case BINARY:
+          if (Type.TypeID.UUID == iPrimitive.typeId()) {
+            return SparkOrcValueReaders.uuids();
+          }
           return OrcValueReaders.bytes();
         default:
           throw new IllegalArgumentException("Unhandled type " + primitive);
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcValueReaders.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcValueReaders.java
index 9e9b3e53bb..2bc5ef96a3 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcValueReaders.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcValueReaders.java
@@ -19,6 +19,8 @@
 package org.apache.iceberg.spark.data;
 
 import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
 import java.util.List;
 import java.util.Map;
 import org.apache.iceberg.orc.OrcValueReader;
@@ -26,6 +28,7 @@ import org.apache.iceberg.orc.OrcValueReaders;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.UUIDUtil;
 import org.apache.orc.storage.ql.exec.vector.BytesColumnVector;
 import org.apache.orc.storage.ql.exec.vector.ColumnVector;
 import org.apache.orc.storage.ql.exec.vector.DecimalColumnVector;
@@ -49,6 +52,10 @@ public class SparkOrcValueReaders {
     return StringReader.INSTANCE;
   }
 
+  public static OrcValueReader<UTF8String> uuids() {
+    return UUIDReader.INSTANCE;
+  }
+
   public static OrcValueReader<Long> timestampTzs() {
     return TimestampTzReader.INSTANCE;
   }
@@ -170,6 +177,31 @@ public class SparkOrcValueReaders {
     }
   }
 
+  private static class UUIDReader implements OrcValueReader<UTF8String> {
+    private static final ThreadLocal<ByteBuffer> BUFFER =
+        ThreadLocal.withInitial(
+            () -> {
+              ByteBuffer buffer = ByteBuffer.allocate(16);
+              buffer.order(ByteOrder.BIG_ENDIAN);
+              return buffer;
+            });
+
+    private static final UUIDReader INSTANCE = new UUIDReader();
+
+    private UUIDReader() {}
+
+    @Override
+    public UTF8String nonNullRead(ColumnVector vector, int row) {
+      BytesColumnVector bytesVector = (BytesColumnVector) vector;
+      ByteBuffer buffer = BUFFER.get();
+      buffer.rewind();
+      buffer.put(bytesVector.vector[row], bytesVector.start[row], bytesVector.length[row]);
+      buffer.rewind();
+
+      return UTF8String.fromString(UUIDUtil.convert(buffer).toString());
+    }
+  }
+
   private static class TimestampTzReader implements OrcValueReader<Long> {
     private static final TimestampTzReader INSTANCE = new TimestampTzReader();
 
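
This reader copies each row's slice out of the BytesColumnVector's shared
backing storage into a per-thread 16-byte scratch buffer before decoding.
A JDK-only sketch of that copy-then-decode step, with plain arrays standing
in for the vector's (vector, start, length) fields and assuming
UUIDUtil.convert reads two big-endian longs; commit 03/03 below later drops
the copy in favor of ByteBuffer.wrap:

    import java.nio.ByteBuffer;
    import java.util.Random;
    import java.util.UUID;

    public class OrcUuidReadSketch {
      public static void main(String[] args) {
        // Stand-ins for BytesColumnVector.vector/start/length: each row is
        // a (start, length) slice into one shared backing array.
        byte[] backing = new byte[32];
        new Random(7).nextBytes(backing);
        int[] start = {0, 16};
        int[] length = {16, 16};
        int row = 1;

        // Copy the slice into a scratch buffer, rewind, then decode two
        // big-endian longs into a UUID.
        ByteBuffer buffer = ByteBuffer.allocate(16);
        buffer.put(backing, start[row], length[row]);
        buffer.rewind();
        System.out.println(new UUID(buffer.getLong(), buffer.getLong()));
      }
    }
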
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcValueWriters.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcValueWriters.java
index 780090f991..9a4f1b5b48 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcValueWriters.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcValueWriters.java
@@ -18,10 +18,13 @@
  */
 package org.apache.iceberg.spark.data;
 
+import java.nio.ByteBuffer;
 import java.util.List;
+import java.util.UUID;
 import java.util.stream.Stream;
 import org.apache.iceberg.FieldMetrics;
 import org.apache.iceberg.orc.OrcValueWriter;
+import org.apache.iceberg.util.UUIDUtil;
 import org.apache.orc.TypeDescription;
 import org.apache.orc.storage.common.type.HiveDecimal;
 import org.apache.orc.storage.ql.exec.vector.BytesColumnVector;
@@ -42,6 +45,10 @@ class SparkOrcValueWriters {
     return StringWriter.INSTANCE;
   }
 
+  static OrcValueWriter<?> uuids() {
+    return UUIDWriter.INSTANCE;
+  }
+
   static OrcValueWriter<?> timestampTz() {
     return TimestampTzWriter.INSTANCE;
   }
@@ -73,6 +80,16 @@ class SparkOrcValueWriters {
     }
   }
 
+  private static class UUIDWriter implements OrcValueWriter<UTF8String> {
+    private static final UUIDWriter INSTANCE = new UUIDWriter();
+
+    @Override
+    public void nonNullWrite(int rowId, UTF8String data, ColumnVector output) {
+      ByteBuffer buffer = UUIDUtil.convertToByteBuffer(UUID.fromString(data.toString()));
+      ((BytesColumnVector) output).setRef(rowId, buffer.array(), 0, buffer.array().length);
+    }
+  }
+
   private static class TimestampTzWriter implements OrcValueWriter<Long> {
     private static final TimestampTzWriter INSTANCE = new TimestampTzWriter();
 
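
On the write side the UTF8String is parsed back into a java.util.UUID and
re-encoded as 16 big-endian bytes; setRef then points the ORC vector at
that array without copying, which appears safe here because
convertToByteBuffer allocates a fresh array per value. A JDK-only sketch of
the encode step:

    import java.nio.ByteBuffer;
    import java.util.UUID;

    public class OrcUuidWriteSketch {
      public static void main(String[] args) {
        // The writer receives Spark's string representation.
        UUID uuid = UUID.fromString("123e4567-e89b-12d3-a456-426614174000");

        // Fresh 16-byte buffer per value, big-endian, msb at offset 0 and
        // lsb at offset 8 (mirroring UUIDUtil.convertToByteBuffer).
        ByteBuffer buffer = ByteBuffer.allocate(16);
        buffer.putLong(0, uuid.getMostSignificantBits());
        buffer.putLong(8, uuid.getLeastSignificantBits());

        // buffer.array() is what setRef would reference, zero-copy.
        System.out.println(buffer.array().length); // 16
      }
    }
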
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcWriter.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcWriter.java
index 60868b8700..c5477fac08 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcWriter.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcWriter.java
@@ -111,6 +111,9 @@ public class SparkOrcWriter implements OrcRowWriter<InternalRow> {
         case DOUBLE:
           return GenericOrcWriters.doubles(ORCSchemaUtil.fieldId(primitive));
         case BINARY:
+          if (Type.TypeID.UUID == iPrimitive.typeId()) {
+            return SparkOrcValueWriters.uuids();
+          }
           return GenericOrcWriters.byteArrays();
         case STRING:
         case CHAR:
@@ -173,7 +176,13 @@ public class SparkOrcWriter implements OrcRowWriter<InternalRow> {
         fieldGetter = SpecializedGetters::getDouble;
         break;
       case BINARY:
-        fieldGetter = SpecializedGetters::getBinary;
+        if (ORCSchemaUtil.BinaryType.UUID
+            .toString()
+            .equals(fieldType.getAttributeValue(ORCSchemaUtil.ICEBERG_BINARY_TYPE_ATTRIBUTE))) {
+          fieldGetter = SpecializedGetters::getUTF8String;
+        } else {
+          fieldGetter = SpecializedGetters::getBinary;
+        }
         // getBinary always makes a copy, so we don't need to worry about it
         // being changed behind our back.
         break;
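
The getter is bound once per field while the writer is built, so the
per-row write path carries no branch. A rough stand-in for the dispatch
above, with illustrative types (the real code binds
SpecializedGetters::getUTF8String or SpecializedGetters::getBinary based on
the ORC attribute written by ORCSchemaUtil):

    import java.nio.charset.StandardCharsets;
    import java.util.function.Function;

    public class FieldGetterSketch {
      public static void main(String[] args) {
        // Attribute value recorded on the ORC type for UUID columns.
        String binaryTypeAttribute = "UUID";

        // Chosen once, outside the row loop.
        Function<String, Object> fieldGetter =
            "UUID".equalsIgnoreCase(binaryTypeAttribute)
                ? value -> value                                   // string path
                : value -> value.getBytes(StandardCharsets.UTF_8); // binary path

        System.out.println(fieldGetter.apply("123e4567-e89b-12d3-a456-426614174000"));
      }
    }
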
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java
index 59f81de6ae..af16d9bbc2 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java
@@ -46,6 +46,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.types.Type.TypeID;
 import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.UUIDUtil;
 import org.apache.parquet.column.ColumnDescriptor;
 import org.apache.parquet.io.api.Binary;
 import org.apache.parquet.schema.GroupType;
@@ -232,6 +233,7 @@ public class SparkParquetReaders {
     }
 
     @Override
+    @SuppressWarnings("checkstyle:CyclomaticComplexity")
     public ParquetValueReader<?> primitive(
         org.apache.iceberg.types.Type.PrimitiveType expected, PrimitiveType primitive) {
       ColumnDescriptor desc = type.getColumnDescription(currentPath());
@@ -282,6 +284,9 @@ public class SparkParquetReaders {
       switch (primitive.getPrimitiveTypeName()) {
         case FIXED_LEN_BYTE_ARRAY:
         case BINARY:
+          if (expected != null && expected.typeId() == TypeID.UUID) {
+            return new UUIDReader(desc);
+          }
           return new ParquetValueReaders.ByteArrayReader(desc);
         case INT32:
           if (expected != null && expected.typeId() == TypeID.LONG) {
@@ -413,6 +418,18 @@ public class SparkParquetReaders {
     }
   }
 
+  private static class UUIDReader extends PrimitiveReader<UTF8String> {
+    UUIDReader(ColumnDescriptor desc) {
+      super(desc);
+    }
+
+    @Override
+    @SuppressWarnings("ByteBufferBackingArray")
+    public UTF8String read(UTF8String ignored) {
+      return UTF8String.fromString(UUIDUtil.convert(column.nextBinary().toByteBuffer()).toString());
+    }
+  }
+
   private static class ArrayReader<E> extends RepeatedReader<ArrayData, ReusableArrayData, E> {
     private int readPos = 0;
     private int writePos = 0;
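
For Parquet, the fixed 16 bytes come back as a Binary whose toByteBuffer()
view is decoded straight into a UUID string. A JDK-only sketch of that
decode, assuming the same two-big-endian-longs layout:

    import java.nio.ByteBuffer;
    import java.util.UUID;

    public class ParquetUuidReadSketch {
      public static void main(String[] args) {
        // Stand-in for column.nextBinary().toByteBuffer(): 16 stored bytes.
        ByteBuffer value = ByteBuffer.allocate(16);
        value.putLong(0, 0x123e4567e89b12d3L);
        value.putLong(8, 0xa456426614174000L);

        // Absolute reads leave the buffer position untouched.
        UUID uuid = new UUID(value.getLong(0), value.getLong(8));
        System.out.println(uuid); // 123e4567-e89b-12d3-a456-426614174000
      }
    }
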
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetWriters.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetWriters.java
index 3637fa4a26..c1abec96cd 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetWriters.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetWriters.java
@@ -18,10 +18,13 @@
  */
 package org.apache.iceberg.spark.data;
 
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.NoSuchElementException;
+import java.util.UUID;
 import org.apache.iceberg.parquet.ParquetValueReaders.ReusableEntry;
 import org.apache.iceberg.parquet.ParquetValueWriter;
 import org.apache.iceberg.parquet.ParquetValueWriters;
@@ -35,6 +38,7 @@ import org.apache.iceberg.util.DecimalUtil;
 import org.apache.parquet.column.ColumnDescriptor;
 import org.apache.parquet.io.api.Binary;
 import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.LogicalTypeAnnotation;
 import org.apache.parquet.schema.LogicalTypeAnnotation.DecimalLogicalTypeAnnotation;
 import org.apache.parquet.schema.MessageType;
 import org.apache.parquet.schema.PrimitiveType;
@@ -176,6 +180,9 @@ public class SparkParquetWriters {
       switch (primitive.getPrimitiveTypeName()) {
         case FIXED_LEN_BYTE_ARRAY:
         case BINARY:
+          if (LogicalTypeAnnotation.uuidType().equals(primitive.getLogicalTypeAnnotation())) {
+            return uuids(desc);
+          }
           return byteArrays(desc);
         case BOOLEAN:
           return ParquetValueWriters.booleans(desc);
@@ -316,6 +323,35 @@ public class SparkParquetWriters {
     }
   }
 
+  private static PrimitiveWriter<UTF8String> uuids(ColumnDescriptor desc) {
+    return new UUIDWriter(desc);
+  }
+
+  private static class UUIDWriter extends PrimitiveWriter<UTF8String> {
+    private static final ThreadLocal<ByteBuffer> BUFFER =
+        ThreadLocal.withInitial(
+            () -> {
+              ByteBuffer buffer = ByteBuffer.allocate(16);
+              buffer.order(ByteOrder.BIG_ENDIAN);
+              return buffer;
+            });
+
+    private UUIDWriter(ColumnDescriptor desc) {
+      super(desc);
+    }
+
+    @Override
+    public void write(int repetitionLevel, UTF8String string) {
+      UUID uuid = UUID.fromString(string.toString());
+      ByteBuffer buffer = BUFFER.get();
+      buffer.rewind();
+      buffer.putLong(uuid.getMostSignificantBits());
+      buffer.putLong(uuid.getLeastSignificantBits());
+      buffer.rewind();
+      column.writeBinary(repetitionLevel, Binary.fromReusedByteBuffer(buffer));
+    }
+  }
+
   private static class ByteArrayWriter extends PrimitiveWriter<byte[]> {
     private ByteArrayWriter(ColumnDescriptor desc) {
       super(desc);
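
This writer reuses one 16-byte scratch buffer per thread;
Binary.fromReusedByteBuffer signals that the backing bytes may change after
the call, so Parquet knows to copy them if it must retain the value. A
JDK-only sketch of the relative-put-and-rewind dance, which commit 03/03
below replaces with absolute puts via UUIDUtil.convertToByteBuffer:

    import java.nio.ByteBuffer;
    import java.util.UUID;

    public class ParquetUuidWriteSketch {
      private static final ThreadLocal<ByteBuffer> BUFFER =
          ThreadLocal.withInitial(() -> ByteBuffer.allocate(16)); // big-endian by default

      public static void main(String[] args) {
        UUID uuid = UUID.fromString("123e4567-e89b-12d3-a456-426614174000");

        // Relative puts advance the position, hence the rewind before and after.
        ByteBuffer buffer = BUFFER.get();
        buffer.rewind();
        buffer.putLong(uuid.getMostSignificantBits());
        buffer.putLong(uuid.getLeastSignificantBits());
        buffer.rewind();

        // column.writeBinary(repetitionLevel, Binary.fromReusedByteBuffer(buffer))
        // would hand these bytes to Parquet in the real writer.
        System.out.println(new UUID(buffer.getLong(0), buffer.getLong(8)));
      }
    }
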
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkOrcReaders.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkOrcReaders.java
index b2d8bd14be..c030311232 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkOrcReaders.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkOrcReaders.java
@@ -155,7 +155,10 @@ public class VectorizedSparkOrcReaders {
           primitiveValueReader = SparkOrcValueReaders.utf8String();
           break;
         case BINARY:
-          primitiveValueReader = OrcValueReaders.bytes();
+          primitiveValueReader =
+              Type.TypeID.UUID == iPrimitive.typeId()
+                  ? SparkOrcValueReaders.uuids()
+                  : OrcValueReaders.bytes();
           break;
         default:
           throw new IllegalArgumentException("Unhandled type " + primitive);
diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/data/AvroDataTest.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/data/AvroDataTest.java
index 5fd137c536..db0d7336f1 100644
--- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/data/AvroDataTest.java
+++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/data/AvroDataTest.java
@@ -56,7 +56,7 @@ public abstract class AvroDataTest {
           optional(107, "date", Types.DateType.get()),
           required(108, "ts", Types.TimestampType.withZone()),
           required(110, "s", Types.StringType.get()),
-          // required(111, "uuid", Types.UUIDType.get()),
+          required(111, "uuid", Types.UUIDType.get()),
           required(112, "fixed", Types.FixedType.ofLength(7)),
           optional(113, "bytes", Types.BinaryType.get()),
           required(114, "dec_9_0", Types.DecimalType.of(9, 0)), // int encoded
diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/data/RandomData.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/data/RandomData.java
index 1c95df8ced..478afcf09a 100644
--- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/data/RandomData.java
+++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/data/RandomData.java
@@ -329,6 +329,8 @@ public class RandomData {
           return UTF8String.fromString((String) obj);
         case DECIMAL:
           return Decimal.apply((BigDecimal) obj);
+        case UUID:
+          return UTF8String.fromString(UUID.nameUUIDFromBytes((byte[]) obj).toString());
         default:
           return obj;
       }
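
One subtlety worth noting: UUID.nameUUIDFromBytes does not reinterpret the
16 random bytes as a UUID; it derives a deterministic version-3 (MD5,
name-based) UUID from them, so generated rows are reproducible for a given
seed. For example:

    import java.util.UUID;

    public class NameUuidDemo {
      public static void main(String[] args) {
        byte[] bytes = new byte[16]; // all zeros
        // Version-3 (MD5 name-based) UUID derived from the input bytes.
        System.out.println(UUID.nameUUIDFromBytes(bytes));
        // Deterministic: the second call prints the identical UUID.
        System.out.println(UUID.nameUUIDFromBytes(bytes));
      }
    }
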
diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetWriter.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetWriter.java
index 261fb8838a..467d8a27a2 100644
--- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetWriter.java
+++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetWriter.java
@@ -79,7 +79,7 @@ public class TestSparkParquetWriter {
                   Types.StringType.get(),
                   Types.StructType.of(
                       optional(22, "jumpy", Types.DoubleType.get()),
-                      required(23, "koala", Types.IntegerType.get()),
+                      required(23, "koala", Types.UUIDType.get()),
                       required(24, "couch rope", Types.IntegerType.get())))),
           optional(2, "slide", Types.StringType.get()));
 


[iceberg] 03/03: address review feedback

Posted by et...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

etudenhoefner pushed a commit to branch spark-uuid-read-write-support-3.4
in repository https://gitbox.apache.org/repos/asf/iceberg.git

commit 7fcc85b044d70e382443275ad0ee090c79d9193b
Author: Eduard Tudenhoefner <et...@gmail.com>
AuthorDate: Wed Apr 26 09:12:09 2023 +0200

    address review feedback
---
 api/src/main/java/org/apache/iceberg/util/UUIDUtil.java  | 12 +++++++++++-
 .../apache/iceberg/spark/data/SparkOrcValueReaders.java  | 16 ++--------------
 .../org/apache/iceberg/spark/data/SparkOrcWriter.java    |  3 ++-
 .../apache/iceberg/spark/data/SparkParquetWriters.java   | 15 ++++++---------
 4 files changed, 21 insertions(+), 25 deletions(-)

diff --git a/api/src/main/java/org/apache/iceberg/util/UUIDUtil.java b/api/src/main/java/org/apache/iceberg/util/UUIDUtil.java
index 4cedb5bd22..b72feec00b 100644
--- a/api/src/main/java/org/apache/iceberg/util/UUIDUtil.java
+++ b/api/src/main/java/org/apache/iceberg/util/UUIDUtil.java
@@ -62,7 +62,17 @@ public class UUIDUtil {
   }
 
   public static ByteBuffer convertToByteBuffer(UUID value) {
-    ByteBuffer buffer = ByteBuffer.allocate(16);
+    return convertToByteBuffer(value, null);
+  }
+
+  public static ByteBuffer convertToByteBuffer(UUID value, ByteBuffer reuse) {
+    ByteBuffer buffer;
+    if (reuse != null) {
+      buffer = reuse;
+    } else {
+      buffer = ByteBuffer.allocate(16);
+    }
+
     buffer.order(ByteOrder.BIG_ENDIAN);
     buffer.putLong(0, value.getMostSignificantBits());
     buffer.putLong(8, value.getLeastSignificantBits());
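
Because putLong(index, value) is an absolute write, the method leaves the
buffer's position and limit untouched, so callers never need to rewind; the
new reuse parameter lets hot paths pass in a per-thread scratch buffer. A
usage sketch mirroring the body shown above:

    import java.nio.ByteBuffer;
    import java.nio.ByteOrder;
    import java.util.UUID;

    public class ConvertToByteBufferDemo {
      // Mirrors the patched method, for illustration only.
      static ByteBuffer convertToByteBuffer(UUID value, ByteBuffer reuse) {
        ByteBuffer buffer = (reuse != null) ? reuse : ByteBuffer.allocate(16);
        buffer.order(ByteOrder.BIG_ENDIAN);
        buffer.putLong(0, value.getMostSignificantBits());  // absolute puts:
        buffer.putLong(8, value.getLeastSignificantBits()); // no rewind needed
        return buffer;
      }

      public static void main(String[] args) {
        ByteBuffer scratch = ByteBuffer.allocate(16);
        ByteBuffer out = convertToByteBuffer(UUID.randomUUID(), scratch);
        System.out.println(out == scratch); // true: scratch reused, no allocation
      }
    }
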
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcValueReaders.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcValueReaders.java
index 2bc5ef96a3..670537fbf8 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcValueReaders.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcValueReaders.java
@@ -20,7 +20,6 @@ package org.apache.iceberg.spark.data;
 
 import java.math.BigDecimal;
 import java.nio.ByteBuffer;
-import java.nio.ByteOrder;
 import java.util.List;
 import java.util.Map;
 import org.apache.iceberg.orc.OrcValueReader;
@@ -178,14 +177,6 @@ public class SparkOrcValueReaders {
   }
 
   private static class UUIDReader implements OrcValueReader<UTF8String> {
-    private static final ThreadLocal<ByteBuffer> BUFFER =
-        ThreadLocal.withInitial(
-            () -> {
-              ByteBuffer buffer = ByteBuffer.allocate(16);
-              buffer.order(ByteOrder.BIG_ENDIAN);
-              return buffer;
-            });
-
     private static final UUIDReader INSTANCE = new UUIDReader();
 
     private UUIDReader() {}
@@ -193,11 +184,8 @@ public class SparkOrcValueReaders {
     @Override
     public UTF8String nonNullRead(ColumnVector vector, int row) {
       BytesColumnVector bytesVector = (BytesColumnVector) vector;
-      ByteBuffer buffer = BUFFER.get();
-      buffer.rewind();
-      buffer.put(bytesVector.vector[row], bytesVector.start[row], bytesVector.length[row]);
-      buffer.rewind();
-
+      ByteBuffer buffer =
+          ByteBuffer.wrap(bytesVector.vector[row], bytesVector.start[row], bytesVector.length[row]);
       return UTF8String.fromString(UUIDUtil.convert(buffer).toString());
     }
   }
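
The review feedback replaces the thread-local copy with ByteBuffer.wrap over
the vector's backing array: wrap creates a view positioned at the row's
slice without copying, which is safe here because UUIDUtil.convert only
reads from it. A JDK-only illustration:

    import java.nio.ByteBuffer;
    import java.util.UUID;

    public class WrapNoCopyDemo {
      public static void main(String[] args) {
        byte[] backing = new byte[32]; // shared, BytesColumnVector-style array
        int start = 16;
        int length = 16;

        // position == start, limit == start + length; zero bytes copied.
        ByteBuffer view = ByteBuffer.wrap(backing, start, length);
        // Relative reads begin at the slice; all-zero bytes give the nil UUID.
        System.out.println(new UUID(view.getLong(), view.getLong()));
        // -> 00000000-0000-0000-0000-000000000000
      }
    }
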
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcWriter.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcWriter.java
index c5477fac08..6b799e677b 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcWriter.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcWriter.java
@@ -178,7 +178,8 @@ public class SparkOrcWriter implements OrcRowWriter<InternalRow> {
       case BINARY:
         if (ORCSchemaUtil.BinaryType.UUID
             .toString()
-            .equals(fieldType.getAttributeValue(ORCSchemaUtil.ICEBERG_BINARY_TYPE_ATTRIBUTE))) {
+            .equalsIgnoreCase(
+                fieldType.getAttributeValue(ORCSchemaUtil.ICEBERG_BINARY_TYPE_ATTRIBUTE))) {
           fieldGetter = SpecializedGetters::getUTF8String;
         } else {
           fieldGetter = SpecializedGetters::getBinary;
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetWriters.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetWriters.java
index c1abec96cd..af6f65a089 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetWriters.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetWriters.java
@@ -35,6 +35,7 @@ import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.types.TypeUtil;
 import org.apache.iceberg.util.DecimalUtil;
+import org.apache.iceberg.util.UUIDUtil;
 import org.apache.parquet.column.ColumnDescriptor;
 import org.apache.parquet.io.api.Binary;
 import org.apache.parquet.schema.GroupType;
@@ -213,6 +214,10 @@ public class SparkParquetWriters {
     return new UTF8StringWriter(desc);
   }
 
+  private static PrimitiveWriter<UTF8String> uuids(ColumnDescriptor desc) {
+    return new UUIDWriter(desc);
+  }
+
   private static PrimitiveWriter<Decimal> decimalAsInteger(
       ColumnDescriptor desc, int precision, int scale) {
     return new IntegerDecimalWriter(desc, precision, scale);
@@ -323,10 +328,6 @@ public class SparkParquetWriters {
     }
   }
 
-  private static PrimitiveWriter<UTF8String> uuids(ColumnDescriptor desc) {
-    return new UUIDWriter(desc);
-  }
-
   private static class UUIDWriter extends PrimitiveWriter<UTF8String> {
     private static final ThreadLocal<ByteBuffer> BUFFER =
         ThreadLocal.withInitial(
@@ -343,11 +344,7 @@ public class SparkParquetWriters {
     @Override
     public void write(int repetitionLevel, UTF8String string) {
       UUID uuid = UUID.fromString(string.toString());
-      ByteBuffer buffer = BUFFER.get();
-      buffer.rewind();
-      buffer.putLong(uuid.getMostSignificantBits());
-      buffer.putLong(uuid.getLeastSignificantBits());
-      buffer.rewind();
+      ByteBuffer buffer = UUIDUtil.convertToByteBuffer(uuid, BUFFER.get());
       column.writeBinary(repetitionLevel, Binary.fromReusedByteBuffer(buffer));
     }
   }
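
Taken together, the write path is now: Spark UTF8String -> UUID.fromString
-> 16 big-endian bytes in a reused scratch buffer -> Parquet Binary, with
the read path reversing it. A JDK-only round-trip sketch under those
assumptions:

    import java.nio.ByteBuffer;
    import java.util.UUID;

    public class UuidRoundTripDemo {
      public static void main(String[] args) {
        String written = "123e4567-e89b-12d3-a456-426614174000";

        // Write side: string -> UUID -> 16 big-endian bytes (reused buffer).
        ByteBuffer scratch = ByteBuffer.allocate(16);
        UUID uuid = UUID.fromString(written);
        scratch.putLong(0, uuid.getMostSignificantBits());
        scratch.putLong(8, uuid.getLeastSignificantBits());

        // Read side: 16 bytes -> UUID -> canonical lowercase string.
        String read = new UUID(scratch.getLong(0), scratch.getLong(8)).toString();
        System.out.println(written.equals(read)); // true
      }
    }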