Posted to commits@iceberg.apache.org by et...@apache.org on 2023/04/26 07:13:38 UTC

[iceberg] 03/03: address review feedback

This is an automated email from the ASF dual-hosted git repository.

etudenhoefner pushed a commit to branch spark-uuid-read-write-support-3.4
in repository https://gitbox.apache.org/repos/asf/iceberg.git

commit 7fcc85b044d70e382443275ad0ee090c79d9193b
Author: Eduard Tudenhoefner <et...@gmail.com>
AuthorDate: Wed Apr 26 09:12:09 2023 +0200

    address review feedback
---
 api/src/main/java/org/apache/iceberg/util/UUIDUtil.java  | 12 +++++++++++-
 .../apache/iceberg/spark/data/SparkOrcValueReaders.java  | 16 ++--------------
 .../org/apache/iceberg/spark/data/SparkOrcWriter.java    |  3 ++-
 .../apache/iceberg/spark/data/SparkParquetWriters.java   | 15 ++++++---------
 4 files changed, 21 insertions(+), 25 deletions(-)

diff --git a/api/src/main/java/org/apache/iceberg/util/UUIDUtil.java b/api/src/main/java/org/apache/iceberg/util/UUIDUtil.java
index 4cedb5bd22..b72feec00b 100644
--- a/api/src/main/java/org/apache/iceberg/util/UUIDUtil.java
+++ b/api/src/main/java/org/apache/iceberg/util/UUIDUtil.java
@@ -62,7 +62,17 @@ public class UUIDUtil {
   }
 
   public static ByteBuffer convertToByteBuffer(UUID value) {
-    ByteBuffer buffer = ByteBuffer.allocate(16);
+    return convertToByteBuffer(value, null);
+  }
+
+  public static ByteBuffer convertToByteBuffer(UUID value, ByteBuffer reuse) {
+    ByteBuffer buffer;
+    if (reuse != null) {
+      buffer = reuse;
+    } else {
+      buffer = ByteBuffer.allocate(16);
+    }
+
     buffer.order(ByteOrder.BIG_ENDIAN);
     buffer.putLong(0, value.getMostSignificantBits());
     buffer.putLong(8, value.getLeastSignificantBits());
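
For readers of the patch, a minimal usage sketch of the new overload. The class
name and ThreadLocal holder below are illustrative, not part of the commit; the
sketch assumes the method returns the filled buffer, as the delegating
single-argument overload implies.

    import java.nio.ByteBuffer;
    import java.util.UUID;
    import org.apache.iceberg.util.UUIDUtil;

    public class UUIDBufferReuseSketch {
      // Illustrative per-thread 16-byte scratch buffer.
      private static final ThreadLocal<ByteBuffer> REUSED =
          ThreadLocal.withInitial(() -> ByteBuffer.allocate(16));

      public static void main(String[] args) {
        UUID uuid = UUID.randomUUID();

        // Original single-argument path: allocates a fresh buffer per call.
        ByteBuffer fresh = UUIDUtil.convertToByteBuffer(uuid);

        // New two-argument path: refills the caller-supplied buffer. The
        // absolute putLong(index, value) calls never move the position,
        // so the buffer needs no rewind() between calls.
        ByteBuffer reused = UUIDUtil.convertToByteBuffer(uuid, REUSED.get());

        System.out.println(fresh.equals(reused)); // true: same 16 bytes
      }
    }
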
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcValueReaders.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcValueReaders.java
index 2bc5ef96a3..670537fbf8 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcValueReaders.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcValueReaders.java
@@ -20,7 +20,6 @@ package org.apache.iceberg.spark.data;
 
 import java.math.BigDecimal;
 import java.nio.ByteBuffer;
-import java.nio.ByteOrder;
 import java.util.List;
 import java.util.Map;
 import org.apache.iceberg.orc.OrcValueReader;
@@ -178,14 +177,6 @@ public class SparkOrcValueReaders {
   }
 
   private static class UUIDReader implements OrcValueReader<UTF8String> {
-    private static final ThreadLocal<ByteBuffer> BUFFER =
-        ThreadLocal.withInitial(
-            () -> {
-              ByteBuffer buffer = ByteBuffer.allocate(16);
-              buffer.order(ByteOrder.BIG_ENDIAN);
-              return buffer;
-            });
-
     private static final UUIDReader INSTANCE = new UUIDReader();
 
     private UUIDReader() {}
@@ -193,11 +184,8 @@ public class SparkOrcValueReaders {
     @Override
     public UTF8String nonNullRead(ColumnVector vector, int row) {
       BytesColumnVector bytesVector = (BytesColumnVector) vector;
-      ByteBuffer buffer = BUFFER.get();
-      buffer.rewind();
-      buffer.put(bytesVector.vector[row], bytesVector.start[row], bytesVector.length[row]);
-      buffer.rewind();
-
+      ByteBuffer buffer =
+          ByteBuffer.wrap(bytesVector.vector[row], bytesVector.start[row], bytesVector.length[row]);
       return UTF8String.fromString(UUIDUtil.convert(buffer).toString());
     }
   }
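
The reader now wraps the vector's backing array in place instead of copying the
bytes into a per-thread buffer. A self-contained sketch of the wrap idea
follows; the commit decodes through UUIDUtil.convert, which is inlined here for
illustration, and the sample bytes are hypothetical.

    import java.nio.ByteBuffer;
    import java.util.UUID;

    public class WrapInsteadOfCopySketch {
      public static void main(String[] args) {
        // Stand-in for BytesColumnVector storage: a byte[] holding one
        // big-endian UUID at offset 0.
        byte[] backing = new byte[16];
        ByteBuffer.wrap(backing)
            .putLong(0x0123456789ABCDEFL)
            .putLong(0x0FEDCBA987654321L);

        // Zero-copy: wrap(array, offset, length) views the slice in place;
        // nothing is duplicated and no shared mutable buffer is needed.
        ByteBuffer view = ByteBuffer.wrap(backing, 0, 16);
        UUID uuid = new UUID(view.getLong(), view.getLong());
        System.out.println(uuid); // 01234567-89ab-cdef-0fed-cba987654321
      }
    }
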
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcWriter.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcWriter.java
index c5477fac08..6b799e677b 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcWriter.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcWriter.java
@@ -178,7 +178,8 @@ public class SparkOrcWriter implements OrcRowWriter<InternalRow> {
       case BINARY:
         if (ORCSchemaUtil.BinaryType.UUID
             .toString()
-            .equals(fieldType.getAttributeValue(ORCSchemaUtil.ICEBERG_BINARY_TYPE_ATTRIBUTE))) {
+            .equalsIgnoreCase(
+                fieldType.getAttributeValue(ORCSchemaUtil.ICEBERG_BINARY_TYPE_ATTRIBUTE))) {
           fieldGetter = SpecializedGetters::getUTF8String;
         } else {
           fieldGetter = SpecializedGetters::getBinary;
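
The switch from equals to equalsIgnoreCase makes the UUID attribute match
tolerant of case differences in file metadata. A tiny illustration, assuming
(as for any enum's default toString) that ORCSchemaUtil.BinaryType.UUID
stringifies to "UUID":

    public class AttributeMatchSketch {
      public static void main(String[] args) {
        String expected = "UUID"; // assumed value of BinaryType.UUID.toString()

        System.out.println(expected.equals("uuid"));           // false: strict match misses it
        System.out.println(expected.equalsIgnoreCase("uuid")); // true

        // Calling the comparison on the constant also stays null-safe when
        // the attribute is absent: both return false for a null argument.
        System.out.println(expected.equalsIgnoreCase(null));   // false
      }
    }
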
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetWriters.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetWriters.java
index c1abec96cd..af6f65a089 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetWriters.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetWriters.java
@@ -35,6 +35,7 @@ import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.types.TypeUtil;
 import org.apache.iceberg.util.DecimalUtil;
+import org.apache.iceberg.util.UUIDUtil;
 import org.apache.parquet.column.ColumnDescriptor;
 import org.apache.parquet.io.api.Binary;
 import org.apache.parquet.schema.GroupType;
@@ -213,6 +214,10 @@ public class SparkParquetWriters {
     return new UTF8StringWriter(desc);
   }
 
+  private static PrimitiveWriter<UTF8String> uuids(ColumnDescriptor desc) {
+    return new UUIDWriter(desc);
+  }
+
   private static PrimitiveWriter<Decimal> decimalAsInteger(
       ColumnDescriptor desc, int precision, int scale) {
     return new IntegerDecimalWriter(desc, precision, scale);
@@ -323,10 +328,6 @@ public class SparkParquetWriters {
     }
   }
 
-  private static PrimitiveWriter<UTF8String> uuids(ColumnDescriptor desc) {
-    return new UUIDWriter(desc);
-  }
-
   private static class UUIDWriter extends PrimitiveWriter<UTF8String> {
     private static final ThreadLocal<ByteBuffer> BUFFER =
         ThreadLocal.withInitial(
@@ -343,11 +344,7 @@ public class SparkParquetWriters {
     @Override
     public void write(int repetitionLevel, UTF8String string) {
       UUID uuid = UUID.fromString(string.toString());
-      ByteBuffer buffer = BUFFER.get();
-      buffer.rewind();
-      buffer.putLong(uuid.getMostSignificantBits());
-      buffer.putLong(uuid.getLeastSignificantBits());
-      buffer.rewind();
+      ByteBuffer buffer = UUIDUtil.convertToByteBuffer(uuid, BUFFER.get());
       column.writeBinary(repetitionLevel, Binary.fromReusedByteBuffer(buffer));
     }
   }
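
Putting the Parquet-side change together: the writer keeps its per-thread
scratch buffer but delegates the byte packing to the new UUIDUtil overload. A
condensed sketch of the resulting write path; the String-taking helper and
class name are illustrative (the real writer takes a UTF8String and writes
through a column writer).

    import java.nio.ByteBuffer;
    import java.util.UUID;
    import org.apache.iceberg.util.UUIDUtil;
    import org.apache.parquet.io.api.Binary;

    public class UUIDWriteSketch {
      // Mirrors the writer's per-thread 16-byte scratch buffer.
      private static final ThreadLocal<ByteBuffer> BUFFER =
          ThreadLocal.withInitial(() -> ByteBuffer.allocate(16));

      static Binary toBinary(String uuidString) {
        UUID uuid = UUID.fromString(uuidString);
        // convertToByteBuffer fills the reused buffer with absolute puts,
        // so the rewind() bookkeeping the old code needed is gone.
        ByteBuffer buffer = UUIDUtil.convertToByteBuffer(uuid, BUFFER.get());
        // fromReusedByteBuffer tells Parquet the caller may overwrite these
        // bytes, so Parquet copies them rather than keeping a reference.
        return Binary.fromReusedByteBuffer(buffer);
      }

      public static void main(String[] args) {
        System.out.println(toBinary(UUID.randomUUID().toString()).length()); // 16
      }
    }
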