You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by xi...@apache.org on 2023/10/17 10:41:04 UTC

[pulsar] branch branch-3.0 updated: [feat][sql] Support UUID for json and avro (#21267)

This is an automated email from the ASF dual-hosted git repository.

xiangying pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new d7ca04bf5e9 [feat][sql] Support UUID for json and avro (#21267)
d7ca04bf5e9 is described below

commit d7ca04bf5e94e660829ccf1efb0e9e0964f9120c
Author: Xiangying Meng <55...@users.noreply.github.com>
AuthorDate: Mon Oct 9 12:07:01 2023 +0800

    [feat][sql] Support UUID for json and avro (#21267)
    
    ### Motivation
    As described in https://pulsar.apache.org/docs/3.1.x/sql-overview/, Pulsar SQL is based on [Trino (formerly Presto SQL)](https://trino.io/), which supports the UUID type. Currently, however, a UUID field in an Avro or JSON schema is interpreted as VARCHAR.
    
    ### Modifications
    
    Support decoding UUID from an AVRO or JSON schema.
    
    (cherry picked from commit 8c7094328e03b11bf57e8f9d1022047961b75481)
---
 .../pulsar/sql/presto/decoder/avro/PulsarAvroColumnDecoder.java  | 9 ++++++++-
 .../sql/presto/decoder/avro/PulsarAvroRowDecoderFactory.java     | 5 +++++
 .../pulsar/sql/presto/decoder/json/PulsarJsonFieldDecoder.java   | 4 +++-
 .../sql/presto/decoder/json/PulsarJsonRowDecoderFactory.java     | 5 +++++
 .../org/apache/pulsar/sql/presto/decoder/DecoderTestMessage.java | 4 ++++
 .../apache/pulsar/sql/presto/decoder/avro/TestAvroDecoder.java   | 7 +++++++
 .../apache/pulsar/sql/presto/decoder/json/TestJsonDecoder.java   | 8 ++++++++
 7 files changed, 40 insertions(+), 2 deletions(-)

diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/decoder/avro/PulsarAvroColumnDecoder.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/decoder/avro/PulsarAvroColumnDecoder.java
index 73081f8948a..1672d5f1448 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/decoder/avro/PulsarAvroColumnDecoder.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/decoder/avro/PulsarAvroColumnDecoder.java
@@ -54,6 +54,7 @@ import io.trino.spi.type.TimestampType;
 import io.trino.spi.type.Timestamps;
 import io.trino.spi.type.TinyintType;
 import io.trino.spi.type.Type;
+import io.trino.spi.type.UuidType;
 import io.trino.spi.type.VarbinaryType;
 import io.trino.spi.type.VarcharType;
 import java.math.BigInteger;
@@ -61,6 +62,7 @@ import java.nio.ByteBuffer;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.UUID;
 import org.apache.avro.generic.GenericEnumSymbol;
 import org.apache.avro.generic.GenericFixed;
 import org.apache.avro.generic.GenericRecord;
@@ -87,7 +89,8 @@ public class PulsarAvroColumnDecoder {
             TimestampType.TIMESTAMP_MILLIS,
             DateType.DATE,
             TimeType.TIME_MILLIS,
-            VarbinaryType.VARBINARY);
+            VarbinaryType.VARBINARY,
+            UuidType.UUID);
 
     private final Type columnType;
     private final String columnMapping;
@@ -255,6 +258,10 @@ public class PulsarAvroColumnDecoder {
             }
         }
 
+        if (type instanceof UuidType) {
+            return UuidType.javaUuidToTrinoUuid(UUID.fromString(value.toString()));
+        }
+
         throw new TrinoException(DECODER_CONVERSION_NOT_SUPPORTED,
                 format("cannot decode object of '%s' as '%s' for column '%s'",
                         value.getClass(), type, columnName));
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/decoder/avro/PulsarAvroRowDecoderFactory.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/decoder/avro/PulsarAvroRowDecoderFactory.java
index 3072bf9441b..e6eb6b7f2f9 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/decoder/avro/PulsarAvroRowDecoderFactory.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/decoder/avro/PulsarAvroRowDecoderFactory.java
@@ -44,6 +44,7 @@ import io.trino.spi.type.Type;
 import io.trino.spi.type.TypeManager;
 import io.trino.spi.type.TypeSignature;
 import io.trino.spi.type.TypeSignatureParameter;
+import io.trino.spi.type.UuidType;
 import io.trino.spi.type.VarbinaryType;
 import io.trino.spi.type.VarcharType;
 import java.util.List;
@@ -121,6 +122,10 @@ public class PulsarAvroRowDecoderFactory implements PulsarRowDecoderFactory {
         LogicalType logicalType  = schema.getLogicalType();
         switch (type) {
             case STRING:
+                if (logicalType != null && logicalType.equals(LogicalTypes.uuid())) {
+                    return UuidType.UUID;
+                }
+                return createUnboundedVarcharType();
             case ENUM:
                 return createUnboundedVarcharType();
             case NULL:
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/decoder/json/PulsarJsonFieldDecoder.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/decoder/json/PulsarJsonFieldDecoder.java
index 905e3bd6bec..8e744e3b122 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/decoder/json/PulsarJsonFieldDecoder.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/decoder/json/PulsarJsonFieldDecoder.java
@@ -58,6 +58,7 @@ import io.trino.spi.type.TimestampType;
 import io.trino.spi.type.Timestamps;
 import io.trino.spi.type.TinyintType;
 import io.trino.spi.type.Type;
+import io.trino.spi.type.UuidType;
 import io.trino.spi.type.VarbinaryType;
 import io.trino.spi.type.VarcharType;
 import java.util.Iterator;
@@ -126,7 +127,8 @@ public class PulsarJsonFieldDecoder
                 TimestampType.TIMESTAMP_MILLIS,
                 DateType.DATE,
                 TimeType.TIME_MILLIS,
-                RealType.REAL
+                RealType.REAL,
+                UuidType.UUID
         ).contains(type)) {
             return true;
         }
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/decoder/json/PulsarJsonRowDecoderFactory.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/decoder/json/PulsarJsonRowDecoderFactory.java
index 0d5cc2d262d..737eb608d82 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/decoder/json/PulsarJsonRowDecoderFactory.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/decoder/json/PulsarJsonRowDecoderFactory.java
@@ -44,6 +44,7 @@ import io.trino.spi.type.Type;
 import io.trino.spi.type.TypeManager;
 import io.trino.spi.type.TypeSignature;
 import io.trino.spi.type.TypeSignatureParameter;
+import io.trino.spi.type.UuidType;
 import io.trino.spi.type.VarbinaryType;
 import io.trino.spi.type.VarcharType;
 import java.util.List;
@@ -121,6 +122,10 @@ public class PulsarJsonRowDecoderFactory implements PulsarRowDecoderFactory {
         LogicalType logicalType  = schema.getLogicalType();
         switch (type) {
             case STRING:
+                if (logicalType != null && logicalType.equals(LogicalTypes.uuid())) {
+                    return UuidType.UUID;
+                }
+                return createUnboundedVarcharType();
             case ENUM:
                 return createUnboundedVarcharType();
             case NULL:
diff --git a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/DecoderTestMessage.java b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/DecoderTestMessage.java
index 4561282c671..0dec76b3d4d 100644
--- a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/DecoderTestMessage.java
+++ b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/DecoderTestMessage.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.sql.presto.decoder;
 
 import java.math.BigDecimal;
+import java.util.UUID;
 import lombok.Data;
 
 import java.util.List;
@@ -55,6 +56,9 @@ public class DecoderTestMessage {
     public Map<String, Long> mapField;
     public CompositeRow compositeRow;
 
+    @org.apache.avro.reflect.AvroSchema("{\"type\":\"string\",\"logicalType\":\"uuid\"}")
+    public UUID uuidField;
+
     public static class TestRow {
         public String stringField;
         public int intField;
diff --git a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/avro/TestAvroDecoder.java b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/avro/TestAvroDecoder.java
index c4e7009b946..5f9df96619b 100644
--- a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/avro/TestAvroDecoder.java
+++ b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/avro/TestAvroDecoder.java
@@ -44,6 +44,7 @@ import io.trino.spi.type.StandardTypes;
 import io.trino.spi.type.Timestamps;
 import io.trino.spi.type.Type;
 import io.trino.spi.type.TypeSignatureParameter;
+import io.trino.spi.type.UuidType;
 import io.trino.spi.type.VarcharType;
 import java.math.BigDecimal;
 import java.time.LocalDate;
@@ -55,6 +56,7 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.UUID;
 import org.apache.pulsar.client.impl.schema.AvroSchema;
 import org.apache.pulsar.client.impl.schema.generic.GenericAvroRecord;
 import org.apache.pulsar.client.impl.schema.generic.GenericAvroSchema;
@@ -90,6 +92,7 @@ public class TestAvroDecoder extends AbstractDecoderTester {
         message.longField = 222L;
         message.timestampField = System.currentTimeMillis();
         message.enumField = DecoderTestMessage.TestEnum.TEST_ENUM_1;
+        message.uuidField = UUID.randomUUID();
 
         LocalTime now = LocalTime.now(ZoneId.systemDefault());
         message.timeField = now.toSecondOfDay() * 1000;
@@ -137,6 +140,10 @@ public class TestAvroDecoder extends AbstractDecoderTester {
         PulsarColumnHandle timeFieldColumnHandle = new PulsarColumnHandle(getPulsarConnectorId().toString(),
                 "timeField", TIME_MILLIS, false, false, "timeField", null, null, PulsarColumnHandle.HandleKeyValueType.NONE);
         checkValue(decodedRow, timeFieldColumnHandle, (long) message.timeField * Timestamps.PICOSECONDS_PER_MILLISECOND);
+
+        PulsarColumnHandle uuidHandle = new PulsarColumnHandle(getPulsarConnectorId().toString(),
+                "uuidField", UuidType.UUID, false, false, "uuidField", null, null, PulsarColumnHandle.HandleKeyValueType.NONE);
+        checkValue(decodedRow, uuidHandle, UuidType.javaUuidToTrinoUuid(message.uuidField));
     }
 
     @Test
diff --git a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/json/TestJsonDecoder.java b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/json/TestJsonDecoder.java
index 4afad9b318f..32e71a53444 100644
--- a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/json/TestJsonDecoder.java
+++ b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/json/TestJsonDecoder.java
@@ -44,6 +44,7 @@ import io.trino.spi.type.StandardTypes;
 import io.trino.spi.type.Timestamps;
 import io.trino.spi.type.Type;
 import io.trino.spi.type.TypeSignatureParameter;
+import io.trino.spi.type.UuidType;
 import io.trino.spi.type.VarcharType;
 import java.math.BigDecimal;
 import java.time.LocalDate;
@@ -55,6 +56,7 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.UUID;
 import org.apache.pulsar.client.impl.schema.JSONSchema;
 import org.apache.pulsar.client.impl.schema.generic.GenericJsonRecord;
 import org.apache.pulsar.client.impl.schema.generic.GenericJsonSchema;
@@ -98,6 +100,8 @@ public class TestJsonDecoder extends AbstractDecoderTester {
         LocalDate epoch = LocalDate.ofEpochDay(0);
         message.dateField = Math.toIntExact(ChronoUnit.DAYS.between(epoch, localDate));
 
+        message.uuidField = UUID.randomUUID();
+
         ByteBuf payload = io.netty.buffer.Unpooled
                 .copiedBuffer(schema.encode(message));
         Map<DecoderColumnHandle, FieldValueProvider> decodedRow = pulsarRowDecoder.decodeRow(payload).get();
@@ -137,6 +141,10 @@ public class TestJsonDecoder extends AbstractDecoderTester {
         PulsarColumnHandle timeFieldColumnHandle = new PulsarColumnHandle(getPulsarConnectorId().toString(),
                 "timeField", TIME_MILLIS, false, false, "timeField", null, null, PulsarColumnHandle.HandleKeyValueType.NONE);
         checkValue(decodedRow, timeFieldColumnHandle, (long) message.timeField * Timestamps.PICOSECONDS_PER_MILLISECOND);
+
+        PulsarColumnHandle uuidHandle = new PulsarColumnHandle(getPulsarConnectorId().toString(),
+                "uuidField", UuidType.UUID, false, false, "uuidField", null, null, PulsarColumnHandle.HandleKeyValueType.NONE);
+        checkValue(decodedRow, uuidHandle, message.uuidField.toString());
     }
 
     @Test