You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by xi...@apache.org on 2023/10/17 10:41:04 UTC
[pulsar] branch branch-3.0 updated: [feat][sql] Support UUID for json and avro (#21267)
This is an automated email from the ASF dual-hosted git repository.
xiangying pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new d7ca04bf5e9 [feat][sql] Support UUID for json and avro (#21267)
d7ca04bf5e9 is described below
commit d7ca04bf5e94e660829ccf1efb0e9e0964f9120c
Author: Xiangying Meng <55...@users.noreply.github.com>
AuthorDate: Mon Oct 9 12:07:01 2023 +0800
[feat][sql] Support UUID for json and avro (#21267)
### Motivation
As described in https://pulsar.apache.org/docs/3.1.x/sql-overview/, Pulsar SQL is based on [Trino (formerly Presto SQL)](https://trino.io/), which supports the UUID type. Currently, however, UUID fields in Avro or JSON schemas are interpreted as VARCHAR.
### Modifications
Support decoding UUID fields from Avro or JSON schemas.
(cherry picked from commit 8c7094328e03b11bf57e8f9d1022047961b75481)
---
.../pulsar/sql/presto/decoder/avro/PulsarAvroColumnDecoder.java | 9 ++++++++-
.../sql/presto/decoder/avro/PulsarAvroRowDecoderFactory.java | 5 +++++
.../pulsar/sql/presto/decoder/json/PulsarJsonFieldDecoder.java | 4 +++-
.../sql/presto/decoder/json/PulsarJsonRowDecoderFactory.java | 5 +++++
.../org/apache/pulsar/sql/presto/decoder/DecoderTestMessage.java | 4 ++++
.../apache/pulsar/sql/presto/decoder/avro/TestAvroDecoder.java | 7 +++++++
.../apache/pulsar/sql/presto/decoder/json/TestJsonDecoder.java | 8 ++++++++
7 files changed, 40 insertions(+), 2 deletions(-)
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/decoder/avro/PulsarAvroColumnDecoder.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/decoder/avro/PulsarAvroColumnDecoder.java
index 73081f8948a..1672d5f1448 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/decoder/avro/PulsarAvroColumnDecoder.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/decoder/avro/PulsarAvroColumnDecoder.java
@@ -54,6 +54,7 @@ import io.trino.spi.type.TimestampType;
import io.trino.spi.type.Timestamps;
import io.trino.spi.type.TinyintType;
import io.trino.spi.type.Type;
+import io.trino.spi.type.UuidType;
import io.trino.spi.type.VarbinaryType;
import io.trino.spi.type.VarcharType;
import java.math.BigInteger;
@@ -61,6 +62,7 @@ import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.UUID;
import org.apache.avro.generic.GenericEnumSymbol;
import org.apache.avro.generic.GenericFixed;
import org.apache.avro.generic.GenericRecord;
@@ -87,7 +89,8 @@ public class PulsarAvroColumnDecoder {
TimestampType.TIMESTAMP_MILLIS,
DateType.DATE,
TimeType.TIME_MILLIS,
- VarbinaryType.VARBINARY);
+ VarbinaryType.VARBINARY,
+ UuidType.UUID);
private final Type columnType;
private final String columnMapping;
@@ -255,6 +258,10 @@ public class PulsarAvroColumnDecoder {
}
}
+ if (type instanceof UuidType) {
+ return UuidType.javaUuidToTrinoUuid(UUID.fromString(value.toString()));
+ }
+
throw new TrinoException(DECODER_CONVERSION_NOT_SUPPORTED,
format("cannot decode object of '%s' as '%s' for column '%s'",
value.getClass(), type, columnName));
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/decoder/avro/PulsarAvroRowDecoderFactory.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/decoder/avro/PulsarAvroRowDecoderFactory.java
index 3072bf9441b..e6eb6b7f2f9 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/decoder/avro/PulsarAvroRowDecoderFactory.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/decoder/avro/PulsarAvroRowDecoderFactory.java
@@ -44,6 +44,7 @@ import io.trino.spi.type.Type;
import io.trino.spi.type.TypeManager;
import io.trino.spi.type.TypeSignature;
import io.trino.spi.type.TypeSignatureParameter;
+import io.trino.spi.type.UuidType;
import io.trino.spi.type.VarbinaryType;
import io.trino.spi.type.VarcharType;
import java.util.List;
@@ -121,6 +122,10 @@ public class PulsarAvroRowDecoderFactory implements PulsarRowDecoderFactory {
LogicalType logicalType = schema.getLogicalType();
switch (type) {
case STRING:
+ if (logicalType != null && logicalType.equals(LogicalTypes.uuid())) {
+ return UuidType.UUID;
+ }
+ return createUnboundedVarcharType();
case ENUM:
return createUnboundedVarcharType();
case NULL:
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/decoder/json/PulsarJsonFieldDecoder.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/decoder/json/PulsarJsonFieldDecoder.java
index 905e3bd6bec..8e744e3b122 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/decoder/json/PulsarJsonFieldDecoder.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/decoder/json/PulsarJsonFieldDecoder.java
@@ -58,6 +58,7 @@ import io.trino.spi.type.TimestampType;
import io.trino.spi.type.Timestamps;
import io.trino.spi.type.TinyintType;
import io.trino.spi.type.Type;
+import io.trino.spi.type.UuidType;
import io.trino.spi.type.VarbinaryType;
import io.trino.spi.type.VarcharType;
import java.util.Iterator;
@@ -126,7 +127,8 @@ public class PulsarJsonFieldDecoder
TimestampType.TIMESTAMP_MILLIS,
DateType.DATE,
TimeType.TIME_MILLIS,
- RealType.REAL
+ RealType.REAL,
+ UuidType.UUID
).contains(type)) {
return true;
}
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/decoder/json/PulsarJsonRowDecoderFactory.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/decoder/json/PulsarJsonRowDecoderFactory.java
index 0d5cc2d262d..737eb608d82 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/decoder/json/PulsarJsonRowDecoderFactory.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/decoder/json/PulsarJsonRowDecoderFactory.java
@@ -44,6 +44,7 @@ import io.trino.spi.type.Type;
import io.trino.spi.type.TypeManager;
import io.trino.spi.type.TypeSignature;
import io.trino.spi.type.TypeSignatureParameter;
+import io.trino.spi.type.UuidType;
import io.trino.spi.type.VarbinaryType;
import io.trino.spi.type.VarcharType;
import java.util.List;
@@ -121,6 +122,10 @@ public class PulsarJsonRowDecoderFactory implements PulsarRowDecoderFactory {
LogicalType logicalType = schema.getLogicalType();
switch (type) {
case STRING:
+ if (logicalType != null && logicalType.equals(LogicalTypes.uuid())) {
+ return UuidType.UUID;
+ }
+ return createUnboundedVarcharType();
case ENUM:
return createUnboundedVarcharType();
case NULL:
diff --git a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/DecoderTestMessage.java b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/DecoderTestMessage.java
index 4561282c671..0dec76b3d4d 100644
--- a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/DecoderTestMessage.java
+++ b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/DecoderTestMessage.java
@@ -19,6 +19,7 @@
package org.apache.pulsar.sql.presto.decoder;
import java.math.BigDecimal;
+import java.util.UUID;
import lombok.Data;
import java.util.List;
@@ -55,6 +56,9 @@ public class DecoderTestMessage {
public Map<String, Long> mapField;
public CompositeRow compositeRow;
+ @org.apache.avro.reflect.AvroSchema("{\"type\":\"string\",\"logicalType\":\"uuid\"}")
+ public UUID uuidField;
+
public static class TestRow {
public String stringField;
public int intField;
diff --git a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/avro/TestAvroDecoder.java b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/avro/TestAvroDecoder.java
index c4e7009b946..5f9df96619b 100644
--- a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/avro/TestAvroDecoder.java
+++ b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/avro/TestAvroDecoder.java
@@ -44,6 +44,7 @@ import io.trino.spi.type.StandardTypes;
import io.trino.spi.type.Timestamps;
import io.trino.spi.type.Type;
import io.trino.spi.type.TypeSignatureParameter;
+import io.trino.spi.type.UuidType;
import io.trino.spi.type.VarcharType;
import java.math.BigDecimal;
import java.time.LocalDate;
@@ -55,6 +56,7 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.UUID;
import org.apache.pulsar.client.impl.schema.AvroSchema;
import org.apache.pulsar.client.impl.schema.generic.GenericAvroRecord;
import org.apache.pulsar.client.impl.schema.generic.GenericAvroSchema;
@@ -90,6 +92,7 @@ public class TestAvroDecoder extends AbstractDecoderTester {
message.longField = 222L;
message.timestampField = System.currentTimeMillis();
message.enumField = DecoderTestMessage.TestEnum.TEST_ENUM_1;
+ message.uuidField = UUID.randomUUID();
LocalTime now = LocalTime.now(ZoneId.systemDefault());
message.timeField = now.toSecondOfDay() * 1000;
@@ -137,6 +140,10 @@ public class TestAvroDecoder extends AbstractDecoderTester {
PulsarColumnHandle timeFieldColumnHandle = new PulsarColumnHandle(getPulsarConnectorId().toString(),
"timeField", TIME_MILLIS, false, false, "timeField", null, null, PulsarColumnHandle.HandleKeyValueType.NONE);
checkValue(decodedRow, timeFieldColumnHandle, (long) message.timeField * Timestamps.PICOSECONDS_PER_MILLISECOND);
+
+ PulsarColumnHandle uuidHandle = new PulsarColumnHandle(getPulsarConnectorId().toString(),
+ "uuidField", UuidType.UUID, false, false, "uuidField", null, null, PulsarColumnHandle.HandleKeyValueType.NONE);
+ checkValue(decodedRow, uuidHandle, UuidType.javaUuidToTrinoUuid(message.uuidField));
}
@Test
diff --git a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/json/TestJsonDecoder.java b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/json/TestJsonDecoder.java
index 4afad9b318f..32e71a53444 100644
--- a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/json/TestJsonDecoder.java
+++ b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/json/TestJsonDecoder.java
@@ -44,6 +44,7 @@ import io.trino.spi.type.StandardTypes;
import io.trino.spi.type.Timestamps;
import io.trino.spi.type.Type;
import io.trino.spi.type.TypeSignatureParameter;
+import io.trino.spi.type.UuidType;
import io.trino.spi.type.VarcharType;
import java.math.BigDecimal;
import java.time.LocalDate;
@@ -55,6 +56,7 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.UUID;
import org.apache.pulsar.client.impl.schema.JSONSchema;
import org.apache.pulsar.client.impl.schema.generic.GenericJsonRecord;
import org.apache.pulsar.client.impl.schema.generic.GenericJsonSchema;
@@ -98,6 +100,8 @@ public class TestJsonDecoder extends AbstractDecoderTester {
LocalDate epoch = LocalDate.ofEpochDay(0);
message.dateField = Math.toIntExact(ChronoUnit.DAYS.between(epoch, localDate));
+ message.uuidField = UUID.randomUUID();
+
ByteBuf payload = io.netty.buffer.Unpooled
.copiedBuffer(schema.encode(message));
Map<DecoderColumnHandle, FieldValueProvider> decodedRow = pulsarRowDecoder.decodeRow(payload).get();
@@ -137,6 +141,10 @@ public class TestJsonDecoder extends AbstractDecoderTester {
PulsarColumnHandle timeFieldColumnHandle = new PulsarColumnHandle(getPulsarConnectorId().toString(),
"timeField", TIME_MILLIS, false, false, "timeField", null, null, PulsarColumnHandle.HandleKeyValueType.NONE);
checkValue(decodedRow, timeFieldColumnHandle, (long) message.timeField * Timestamps.PICOSECONDS_PER_MILLISECOND);
+
+ PulsarColumnHandle uuidHandle = new PulsarColumnHandle(getPulsarConnectorId().toString(),
+ "uuidField", UuidType.UUID, false, false, "uuidField", null, null, PulsarColumnHandle.HandleKeyValueType.NONE);
+ checkValue(decodedRow, uuidHandle, message.uuidField.toString());
}
@Test