You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/04/28 13:04:18 UTC
[pulsar] 14/26: Pulsar SQL support for Decimal data type (#15153)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 642159c8866ac13246ee112964ad79f4b0c7cf9e
Author: Baodi Shi <wu...@icloud.com>
AuthorDate: Thu Apr 21 17:26:35 2022 +0800
Pulsar SQL support for Decimal data type (#15153)
(cherry picked from commit 6b004ed6a2554ab826a00aa2a177963de3c5f44b)
---
.../presto/decoder/avro/PulsarAvroColumnDecoder.java | 19 ++++++++++++++++++-
.../decoder/avro/PulsarAvroRowDecoderFactory.java | 10 +++++++++-
.../decoder/json/PulsarJsonRowDecoderFactory.java | 6 ++++++
.../pulsar/sql/presto/TestPulsarConnector.java | 8 +++++++-
.../pulsar/sql/presto/TestPulsarRecordCursor.java | 15 +++++++++++++++
.../sql/presto/decoder/AbstractDecoderTester.java | 5 +++++
.../sql/presto/decoder/DecoderTestMessage.java | 6 +++++-
.../pulsar/sql/presto/decoder/DecoderTestUtil.java | 20 ++++++++++++++++++++
.../sql/presto/decoder/avro/TestAvroDecoder.java | 11 +++++++++++
9 files changed, 96 insertions(+), 4 deletions(-)
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/decoder/avro/PulsarAvroColumnDecoder.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/decoder/avro/PulsarAvroColumnDecoder.java
index 690daf62d2e..0c57336d213 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/decoder/avro/PulsarAvroColumnDecoder.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/decoder/avro/PulsarAvroColumnDecoder.java
@@ -40,6 +40,8 @@ import io.prestosql.spi.type.ArrayType;
import io.prestosql.spi.type.BigintType;
import io.prestosql.spi.type.BooleanType;
import io.prestosql.spi.type.DateType;
+import io.prestosql.spi.type.DecimalType;
+import io.prestosql.spi.type.Decimals;
import io.prestosql.spi.type.DoubleType;
import io.prestosql.spi.type.IntegerType;
import io.prestosql.spi.type.MapType;
@@ -53,6 +55,7 @@ import io.prestosql.spi.type.TinyintType;
import io.prestosql.spi.type.Type;
import io.prestosql.spi.type.VarbinaryType;
import io.prestosql.spi.type.VarcharType;
+import java.math.BigInteger;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
@@ -139,7 +142,7 @@ public class PulsarAvroColumnDecoder {
}
private boolean isSupportedPrimitive(Type type) {
- return type instanceof VarcharType || SUPPORTED_PRIMITIVE_TYPES.contains(type);
+ return type instanceof VarcharType || type instanceof DecimalType || SUPPORTED_PRIMITIVE_TYPES.contains(type);
}
public FieldValueProvider decodeField(GenericRecord avroRecord) {
@@ -205,6 +208,13 @@ public class PulsarAvroColumnDecoder {
return floatToIntBits((Float) value);
}
+ if (columnType instanceof DecimalType) {
+ // Copy only the readable bytes through a duplicate so the record's buffer position is not consumed.
+ byte[] bytes = new byte[((ByteBuffer) value).remaining()];
+ ((ByteBuffer) value).duplicate().get(bytes);
+ return new BigInteger(bytes).longValue();
+ }
+
throw new PrestoException(DECODER_CONVERSION_NOT_SUPPORTED,
format("cannot decode object of '%s' as '%s' for column '%s'",
value.getClass(), columnType, columnName));
@@ -234,6 +244,13 @@ public class PulsarAvroColumnDecoder {
}
}
+ // Long decimals are returned as a 128-bit (16-byte) unscaled Slice; read only position..limit, never the raw backing array.
+ if (type instanceof DecimalType) {
+ byte[] bytes = new byte[((ByteBuffer) value).remaining()];
+ ((ByteBuffer) value).duplicate().get(bytes);
+ return Decimals.encodeUnscaledValue(new BigInteger(bytes));
+ }
+
throw new PrestoException(DECODER_CONVERSION_NOT_SUPPORTED,
format("cannot decode object of '%s' as '%s' for column '%s'",
value.getClass(), type, columnName));
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/decoder/avro/PulsarAvroRowDecoderFactory.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/decoder/avro/PulsarAvroRowDecoderFactory.java
index 12352059c2d..74b0a88fcef 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/decoder/avro/PulsarAvroRowDecoderFactory.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/decoder/avro/PulsarAvroRowDecoderFactory.java
@@ -33,6 +33,7 @@ import io.prestosql.spi.connector.ColumnMetadata;
import io.prestosql.spi.type.ArrayType;
import io.prestosql.spi.type.BigintType;
import io.prestosql.spi.type.BooleanType;
+import io.prestosql.spi.type.DecimalType;
import io.prestosql.spi.type.DoubleType;
import io.prestosql.spi.type.IntegerType;
import io.prestosql.spi.type.RealType;
@@ -128,7 +129,14 @@ public class PulsarAvroRowDecoderFactory implements PulsarRowDecoderFactory {
+ "please check the schema or report the bug.", fieldname));
case FIXED:
case BYTES:
- //TODO: support decimal logicalType
+ // An Avro decimal logical type is mapped to a Presto DecimalType with the same precision and scale.
+ // Precision 1..18 yields a short decimal, which the column decoder returns as a long.
+ // Precision above 18 yields a long decimal, which the column decoder returns as a Slice.
+ // NOTE(review): out-of-range precision (<= 0, or above Presto's maximum, believed to be 38) should throw - confirm.
+ if (logicalType instanceof LogicalTypes.Decimal) {
+ LogicalTypes.Decimal decimal = (LogicalTypes.Decimal) logicalType;
+ return DecimalType.createDecimalType(decimal.getPrecision(), decimal.getScale());
+ }
return VarbinaryType.VARBINARY;
case INT:
if (logicalType == LogicalTypes.timeMillis()) {
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/decoder/json/PulsarJsonRowDecoderFactory.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/decoder/json/PulsarJsonRowDecoderFactory.java
index 330631e72a8..bb064d8909f 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/decoder/json/PulsarJsonRowDecoderFactory.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/decoder/json/PulsarJsonRowDecoderFactory.java
@@ -128,6 +128,12 @@ public class PulsarJsonRowDecoderFactory implements PulsarRowDecoderFactory {
+ "please check the schema or report the bug.", fieldname));
case FIXED:
case BYTES:
+ // In the current implementation the JSON schema is generated from an Avro schema,
+ // so a bytes/fixed field may carry LogicalTypes.Decimal.
+ // Such decimal fields are mapped to an unbounded VARCHAR column for JSON.
+ if (logicalType instanceof LogicalTypes.Decimal) {
+ return createUnboundedVarcharType();
+ }
return VarbinaryType.VARBINARY;
case INT:
if (logicalType == LogicalTypes.timeMillis()) {
diff --git a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarConnector.java b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarConnector.java
index b673fc368e6..7db32f59148 100644
--- a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarConnector.java
+++ b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarConnector.java
@@ -25,6 +25,7 @@ import io.prestosql.spi.connector.ColumnMetadata;
import io.prestosql.spi.connector.ConnectorContext;
import io.prestosql.spi.predicate.TupleDomain;
import io.prestosql.testing.TestingConnectorContext;
+import java.math.BigDecimal;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
@@ -166,6 +167,8 @@ public abstract class TestPulsarConnector {
public int time;
@org.apache.avro.reflect.AvroSchema("{ \"type\": \"int\", \"logicalType\": \"date\" }")
public int date;
+ @org.apache.avro.reflect.AvroSchema("{ \"type\": \"bytes\", \"logicalType\": \"decimal\", \"precision\": 4, \"scale\": 2 }")
+ public BigDecimal decimal;
public TestPulsarConnector.Bar bar;
public TestEnum field7;
}
@@ -253,6 +256,7 @@ public abstract class TestPulsarConnector {
fooFieldNames.add("date");
fooFieldNames.add("bar");
fooFieldNames.add("field7");
+ fooFieldNames.add("decimal");
ConnectorContext prestoConnectorContext = new TestingConnectorContext();
@@ -313,6 +317,7 @@ public abstract class TestPulsarConnector {
LocalDate epoch = LocalDate.ofEpochDay(0);
return Math.toIntExact(ChronoUnit.DAYS.between(epoch, localDate));
});
+ fooFunctions.put("decimal", integer -> BigDecimal.valueOf(1234, 2));
fooFunctions.put("bar.field1", integer -> integer % 3 == 0 ? null : integer + 1);
fooFunctions.put("bar.field2", integer -> integer % 2 == 0 ? null : String.valueOf(integer + 2));
fooFunctions.put("bar.field3", integer -> integer + 3.0f);
@@ -331,7 +336,6 @@ public abstract class TestPulsarConnector {
* @param schemaInfo
* @param handleKeyValueType
* @param includeInternalColumn
- * @param dispatchingRowDecoderFactory
* @return
*/
protected static List<PulsarColumnHandle> getColumnColumnHandles(TopicName topicName, SchemaInfo schemaInfo,
@@ -393,6 +397,7 @@ public abstract class TestPulsarConnector {
LocalDate localDate = LocalDate.now();
LocalDate epoch = LocalDate.ofEpochDay(0);
foo.date = Math.toIntExact(ChronoUnit.DAYS.between(epoch, localDate));
+ foo.decimal= BigDecimal.valueOf(count, 2);
MessageMetadata messageMetadata = new MessageMetadata()
.setProducerName("test-producer").setSequenceId(i)
@@ -609,6 +614,7 @@ public abstract class TestPulsarConnector {
foo.timestamp = (long) fooFunctions.get("timestamp").apply(count);
foo.time = (int) fooFunctions.get("time").apply(count);
foo.date = (int) fooFunctions.get("date").apply(count);
+ foo.decimal = (BigDecimal) fooFunctions.get("decimal").apply(count);
foo.bar = bar;
foo.field7 = (Foo.TestEnum) fooFunctions.get("field7").apply(count);
diff --git a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarRecordCursor.java b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarRecordCursor.java
index dbde648ee95..23dc69245f0 100644
--- a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarRecordCursor.java
+++ b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarRecordCursor.java
@@ -22,7 +22,11 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import io.airlift.log.Logger;
import io.netty.buffer.ByteBuf;
import io.prestosql.spi.predicate.TupleDomain;
+import io.prestosql.spi.type.DecimalType;
import io.prestosql.spi.type.RowType;
+import io.prestosql.spi.type.Type;
+import io.prestosql.spi.type.VarcharType;
+import java.math.BigDecimal;
import lombok.Data;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.Entry;
@@ -142,6 +146,17 @@ public class TestPulsarRecordCursor extends TestPulsarConnector {
}else if (fooColumnHandles.get(i).getName().equals("field7")) {
assertEquals(pulsarRecordCursor.getSlice(i).getBytes(), fooFunctions.get("field7").apply(count).toString().getBytes());
columnsSeen.add(fooColumnHandles.get(i).getName());
+ }else if (fooColumnHandles.get(i).getName().equals("decimal")) {
+ Type type = fooColumnHandles.get(i).getType();
+ // In JsonDecoder, decimal trans to varcharType
+ if (type instanceof VarcharType) {
+ assertEquals(new String(pulsarRecordCursor.getSlice(i).getBytes()),
+ fooFunctions.get("decimal").apply(count).toString());
+ } else {
+ DecimalType decimalType = (DecimalType) fooColumnHandles.get(i).getType();
+ assertEquals(BigDecimal.valueOf(pulsarRecordCursor.getLong(i), decimalType.getScale()), fooFunctions.get("decimal").apply(count));
+ }
+ columnsSeen.add(fooColumnHandles.get(i).getName());
} else {
if (PulsarInternalColumn.getInternalFieldsMap().containsKey(fooColumnHandles.get(i).getName())) {
columnsSeen.add(fooColumnHandles.get(i).getName());
diff --git a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/AbstractDecoderTester.java b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/AbstractDecoderTester.java
index 5cd46832516..e5ceb321aae 100644
--- a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/AbstractDecoderTester.java
+++ b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/AbstractDecoderTester.java
@@ -26,6 +26,7 @@ import io.prestosql.spi.connector.ColumnMetadata;
import io.prestosql.spi.connector.ConnectorContext;
import io.prestosql.spi.type.Type;
import io.prestosql.testing.TestingConnectorContext;
+import java.math.BigDecimal;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.schema.SchemaInfo;
@@ -102,6 +103,10 @@ public abstract class AbstractDecoderTester {
decoderTestUtil.checkValue(decodedRow, handle, value);
}
+ protected void checkValue(Map<DecoderColumnHandle, FieldValueProvider> decodedRow, DecoderColumnHandle handle, BigDecimal value) {
+ decoderTestUtil.checkValue(decodedRow, handle, value);
+ }
+
protected Block getBlock(Map<DecoderColumnHandle, FieldValueProvider> decodedRow, DecoderColumnHandle handle) {
FieldValueProvider provider = decodedRow.get(handle);
assertNotNull(provider);
diff --git a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/DecoderTestMessage.java b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/DecoderTestMessage.java
index 115f3691c00..da6d92e5158 100644
--- a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/DecoderTestMessage.java
+++ b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/DecoderTestMessage.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.sql.presto.decoder;
+import java.math.BigDecimal;
import lombok.Data;
import java.util.List;
@@ -45,6 +46,10 @@ public class DecoderTestMessage {
public int dateField;
public TestRow rowField;
public TestEnum enumField;
+ @org.apache.avro.reflect.AvroSchema("{ \"type\": \"bytes\", \"logicalType\": \"decimal\", \"precision\": 4, \"scale\": 2 }")
+ public BigDecimal decimalField;
+ @org.apache.avro.reflect.AvroSchema("{ \"type\": \"bytes\", \"logicalType\": \"decimal\", \"precision\": 30, \"scale\": 2 }")
+ public BigDecimal longDecimalField;
public List<String> arrayField;
public Map<String, Long> mapField;
@@ -62,7 +67,6 @@ public class DecoderTestMessage {
public long longField;
}
-
public static class CompositeRow {
public String stringField;
public List<NestedRow> arrayField;
diff --git a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/DecoderTestUtil.java b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/DecoderTestUtil.java
index 4c3c4a63447..496a6f061bf 100644
--- a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/DecoderTestUtil.java
+++ b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/DecoderTestUtil.java
@@ -23,11 +23,16 @@ import io.prestosql.decoder.DecoderColumnHandle;
import io.prestosql.decoder.FieldValueProvider;
import io.prestosql.spi.block.Block;
import io.prestosql.spi.type.ArrayType;
+import io.prestosql.spi.type.DecimalType;
+import io.prestosql.spi.type.Decimals;
import io.prestosql.spi.type.MapType;
import io.prestosql.spi.type.RowType;
import io.prestosql.spi.type.Type;
+import java.math.BigDecimal;
+import java.math.BigInteger;
import java.util.Map;
+import static io.prestosql.spi.type.UnscaledDecimal128Arithmetic.UNSCALED_DECIMAL_128_SLICE_LENGTH;
import static io.prestosql.testing.TestingConnectorSession.SESSION;
import static org.testng.Assert.*;
@@ -113,6 +118,21 @@ public abstract class DecoderTestUtil {
assertEquals(provider.getBoolean(), value);
}
+ public void checkValue(Map<DecoderColumnHandle, FieldValueProvider> decodedRow, DecoderColumnHandle handle, BigDecimal value) {
+ FieldValueProvider provider = decodedRow.get(handle);
+ // Check presence before the provider is dereferenced, so a missing column fails as an assertion rather than an NPE.
+ assertNotNull(provider);
+ DecimalType decimalType = (DecimalType) handle.getType();
+ BigDecimal actualDecimal;
+ if (decimalType.getFixedSize() == UNSCALED_DECIMAL_128_SLICE_LENGTH) {
+ // Long decimal: the unscaled value is carried in a Slice; otherwise it is carried in a long.
+ actualDecimal = new BigDecimal(Decimals.decodeUnscaledValue(provider.getSlice()), decimalType.getScale());
+ } else {
+ actualDecimal = BigDecimal.valueOf(provider.getLong(), decimalType.getScale());
+ }
+ assertEquals(actualDecimal, value);
+ }
+
public void checkIsNull(Map<DecoderColumnHandle, FieldValueProvider> decodedRow, DecoderColumnHandle handle) {
FieldValueProvider provider = decodedRow.get(handle);
assertNotNull(provider);
diff --git a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/avro/TestAvroDecoder.java b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/avro/TestAvroDecoder.java
index 1cfbbb4fce5..7b270c7995b 100644
--- a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/avro/TestAvroDecoder.java
+++ b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/avro/TestAvroDecoder.java
@@ -25,11 +25,13 @@ import io.prestosql.decoder.FieldValueProvider;
import io.prestosql.spi.PrestoException;
import io.prestosql.spi.type.ArrayType;
import io.prestosql.spi.type.BigintType;
+import io.prestosql.spi.type.DecimalType;
import io.prestosql.spi.type.RowType;
import io.prestosql.spi.type.StandardTypes;
import io.prestosql.spi.type.Type;
import io.prestosql.spi.type.TypeSignatureParameter;
import io.prestosql.spi.type.VarcharType;
+import java.math.BigDecimal;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
@@ -87,6 +89,8 @@ public class TestAvroDecoder extends AbstractDecoderTester {
message.longField = 222L;
message.timestampField = System.currentTimeMillis();
message.enumField = DecoderTestMessage.TestEnum.TEST_ENUM_1;
+ message.decimalField = BigDecimal.valueOf(2233, 2);
+ message.longDecimalField = new BigDecimal("1234567891234567891234567891.23");
LocalTime now = LocalTime.now(ZoneId.systemDefault());
message.timeField = now.toSecondOfDay() * 1000;
@@ -127,6 +131,13 @@ public class TestAvroDecoder extends AbstractDecoderTester {
"enumField", VARCHAR, false, false, "enumField", null, null, PulsarColumnHandle.HandleKeyValueType.NONE);
checkValue(decodedRow, enumFieldColumnHandle, message.enumField.toString());
+ PulsarColumnHandle decimalFieldColumnHandle = new PulsarColumnHandle(getPulsarConnectorId().toString(),
+ "decimalField", DecimalType.createDecimalType(4, 2), false, false, "decimalField", null, null, PulsarColumnHandle.HandleKeyValueType.NONE);
+ checkValue(decodedRow, decimalFieldColumnHandle, message.decimalField);
+
+ PulsarColumnHandle longDecimalFieldColumnHandle = new PulsarColumnHandle(getPulsarConnectorId().toString(),
+ "longDecimalField", DecimalType.createDecimalType(30, 2), false, false, "longDecimalField", null, null, PulsarColumnHandle.HandleKeyValueType.NONE);
+ checkValue(decodedRow, longDecimalFieldColumnHandle, message.longDecimalField);
}
@Test