You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/04/28 13:04:18 UTC

[pulsar] 14/26: Pulsar SQL support for Decimal data type (#15153)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 642159c8866ac13246ee112964ad79f4b0c7cf9e
Author: Baodi Shi <wu...@icloud.com>
AuthorDate: Thu Apr 21 17:26:35 2022 +0800

    Pulsar SQL support for Decimal data type (#15153)
    
    (cherry picked from commit 6b004ed6a2554ab826a00aa2a177963de3c5f44b)
---
 .../presto/decoder/avro/PulsarAvroColumnDecoder.java | 19 ++++++++++++++++++-
 .../decoder/avro/PulsarAvroRowDecoderFactory.java    | 10 +++++++++-
 .../decoder/json/PulsarJsonRowDecoderFactory.java    |  6 ++++++
 .../pulsar/sql/presto/TestPulsarConnector.java       |  8 +++++++-
 .../pulsar/sql/presto/TestPulsarRecordCursor.java    | 15 +++++++++++++++
 .../sql/presto/decoder/AbstractDecoderTester.java    |  5 +++++
 .../sql/presto/decoder/DecoderTestMessage.java       |  6 +++++-
 .../pulsar/sql/presto/decoder/DecoderTestUtil.java   | 20 ++++++++++++++++++++
 .../sql/presto/decoder/avro/TestAvroDecoder.java     | 11 +++++++++++
 9 files changed, 96 insertions(+), 4 deletions(-)

diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/decoder/avro/PulsarAvroColumnDecoder.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/decoder/avro/PulsarAvroColumnDecoder.java
index 690daf62d2e..0c57336d213 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/decoder/avro/PulsarAvroColumnDecoder.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/decoder/avro/PulsarAvroColumnDecoder.java
@@ -40,6 +40,8 @@ import io.prestosql.spi.type.ArrayType;
 import io.prestosql.spi.type.BigintType;
 import io.prestosql.spi.type.BooleanType;
 import io.prestosql.spi.type.DateType;
+import io.prestosql.spi.type.DecimalType;
+import io.prestosql.spi.type.Decimals;
 import io.prestosql.spi.type.DoubleType;
 import io.prestosql.spi.type.IntegerType;
 import io.prestosql.spi.type.MapType;
@@ -53,6 +55,7 @@ import io.prestosql.spi.type.TinyintType;
 import io.prestosql.spi.type.Type;
 import io.prestosql.spi.type.VarbinaryType;
 import io.prestosql.spi.type.VarcharType;
+import java.math.BigInteger;
 import java.nio.ByteBuffer;
 import java.util.List;
 import java.util.Map;
@@ -139,7 +142,7 @@ public class PulsarAvroColumnDecoder {
     }
 
     private boolean isSupportedPrimitive(Type type) {
-        return type instanceof VarcharType || SUPPORTED_PRIMITIVE_TYPES.contains(type);
+        return type instanceof VarcharType || type instanceof DecimalType || SUPPORTED_PRIMITIVE_TYPES.contains(type);
     }
 
     public FieldValueProvider decodeField(GenericRecord avroRecord) {
@@ -205,6 +208,13 @@ public class PulsarAvroColumnDecoder {
                 return floatToIntBits((Float) value);
             }
 
+            if (columnType instanceof DecimalType) {
+                ByteBuffer buffer = ((ByteBuffer) value).duplicate(); // keep the record's buffer position intact
+                byte[] bytes = new byte[buffer.remaining()];
+                buffer.get(bytes);
+                return new BigInteger(bytes).longValue();
+            }
+
             throw new PrestoException(DECODER_CONVERSION_NOT_SUPPORTED,
                     format("cannot decode object of '%s' as '%s' for column '%s'",
                             value.getClass(), columnType, columnName));
@@ -234,6 +244,13 @@ public class PulsarAvroColumnDecoder {
             }
         }
 
+        // Long decimals are returned as a 128-bit (16-byte) unscaled Slice; copy only position..limit.
+        if (type instanceof DecimalType) {
+            byte[] bytes = new byte[((ByteBuffer) value).remaining()];
+            ((ByteBuffer) value).duplicate().get(bytes);
+            return Decimals.encodeUnscaledValue(new BigInteger(bytes));
+        }
+
         throw new PrestoException(DECODER_CONVERSION_NOT_SUPPORTED,
                 format("cannot decode object of '%s' as '%s' for column '%s'",
                         value.getClass(), type, columnName));
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/decoder/avro/PulsarAvroRowDecoderFactory.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/decoder/avro/PulsarAvroRowDecoderFactory.java
index 12352059c2d..74b0a88fcef 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/decoder/avro/PulsarAvroRowDecoderFactory.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/decoder/avro/PulsarAvroRowDecoderFactory.java
@@ -33,6 +33,7 @@ import io.prestosql.spi.connector.ColumnMetadata;
 import io.prestosql.spi.type.ArrayType;
 import io.prestosql.spi.type.BigintType;
 import io.prestosql.spi.type.BooleanType;
+import io.prestosql.spi.type.DecimalType;
 import io.prestosql.spi.type.DoubleType;
 import io.prestosql.spi.type.IntegerType;
 import io.prestosql.spi.type.RealType;
@@ -128,7 +129,14 @@ public class PulsarAvroRowDecoderFactory implements PulsarRowDecoderFactory {
                                 + "please check the schema or report the bug.", fieldname));
             case FIXED:
             case BYTES:
-                //TODO: support decimal logicalType
+                //  When the precision <= 0, createDecimalType throws an exception.
+                //  When the precision > 0 and <= 18, ShortDecimalType is used and the value maps to a long.
+                //  When the precision > 18 and <= 38, LongDecimalType is used and the value maps to a Slice.
+                //  When the precision > 38, createDecimalType throws an exception.
+                if (logicalType instanceof LogicalTypes.Decimal) {
+                    LogicalTypes.Decimal decimal = (LogicalTypes.Decimal) logicalType;
+                    return DecimalType.createDecimalType(decimal.getPrecision(), decimal.getScale());
+                }
                 return VarbinaryType.VARBINARY;
             case INT:
                 if (logicalType == LogicalTypes.timeMillis()) {
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/decoder/json/PulsarJsonRowDecoderFactory.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/decoder/json/PulsarJsonRowDecoderFactory.java
index 330631e72a8..bb064d8909f 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/decoder/json/PulsarJsonRowDecoderFactory.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/decoder/json/PulsarJsonRowDecoderFactory.java
@@ -128,6 +128,12 @@ public class PulsarJsonRowDecoderFactory implements PulsarRowDecoderFactory {
                                 + "please check the schema or report the bug.", fieldname));
             case FIXED:
             case BYTES:
+                // In the current implementation the JSON schema is generated by Avro,
+                // so a LogicalTypes.Decimal may appear here.
+                // For JSON schemas, decimal is mapped to an unbounded VARCHAR.
+                if (logicalType instanceof LogicalTypes.Decimal) {
+                    return createUnboundedVarcharType();
+                }
                 return VarbinaryType.VARBINARY;
             case INT:
                 if (logicalType == LogicalTypes.timeMillis()) {
diff --git a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarConnector.java b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarConnector.java
index b673fc368e6..7db32f59148 100644
--- a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarConnector.java
+++ b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarConnector.java
@@ -25,6 +25,7 @@ import io.prestosql.spi.connector.ColumnMetadata;
 import io.prestosql.spi.connector.ConnectorContext;
 import io.prestosql.spi.predicate.TupleDomain;
 import io.prestosql.testing.TestingConnectorContext;
+import java.math.BigDecimal;
 import org.apache.bookkeeper.mledger.AsyncCallbacks;
 import org.apache.bookkeeper.mledger.Entry;
 import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
@@ -166,6 +167,8 @@ public abstract class TestPulsarConnector {
         public int time;
         @org.apache.avro.reflect.AvroSchema("{ \"type\": \"int\", \"logicalType\": \"date\" }")
         public int date;
+        @org.apache.avro.reflect.AvroSchema("{ \"type\": \"bytes\", \"logicalType\": \"decimal\", \"precision\": 4, \"scale\": 2 }")
+        public BigDecimal decimal;
         public TestPulsarConnector.Bar bar;
         public TestEnum field7;
     }
@@ -253,6 +256,7 @@ public abstract class TestPulsarConnector {
             fooFieldNames.add("date");
             fooFieldNames.add("bar");
             fooFieldNames.add("field7");
+            fooFieldNames.add("decimal");
 
 
             ConnectorContext prestoConnectorContext = new TestingConnectorContext();
@@ -313,6 +317,7 @@ public abstract class TestPulsarConnector {
                 LocalDate epoch = LocalDate.ofEpochDay(0);
                 return Math.toIntExact(ChronoUnit.DAYS.between(epoch, localDate));
             });
+            fooFunctions.put("decimal", integer -> BigDecimal.valueOf(1234, 2));
             fooFunctions.put("bar.field1", integer -> integer % 3 == 0 ? null : integer + 1);
             fooFunctions.put("bar.field2", integer -> integer % 2 == 0 ? null : String.valueOf(integer + 2));
             fooFunctions.put("bar.field3", integer -> integer + 3.0f);
@@ -331,7 +336,6 @@ public abstract class TestPulsarConnector {
      * @param schemaInfo
      * @param handleKeyValueType
      * @param includeInternalColumn
-     * @param dispatchingRowDecoderFactory
      * @return
      */
     protected static List<PulsarColumnHandle> getColumnColumnHandles(TopicName topicName, SchemaInfo schemaInfo,
@@ -393,6 +397,7 @@ public abstract class TestPulsarConnector {
             LocalDate localDate = LocalDate.now();
             LocalDate epoch = LocalDate.ofEpochDay(0);
             foo.date = Math.toIntExact(ChronoUnit.DAYS.between(epoch, localDate));
+            foo.decimal= BigDecimal.valueOf(count, 2);
 
             MessageMetadata messageMetadata = new MessageMetadata()
                     .setProducerName("test-producer").setSequenceId(i)
@@ -609,6 +614,7 @@ public abstract class TestPulsarConnector {
                                     foo.timestamp = (long) fooFunctions.get("timestamp").apply(count);
                                     foo.time = (int) fooFunctions.get("time").apply(count);
                                     foo.date = (int) fooFunctions.get("date").apply(count);
+                                    foo.decimal = (BigDecimal) fooFunctions.get("decimal").apply(count);
                                     foo.bar = bar;
                                     foo.field7 = (Foo.TestEnum) fooFunctions.get("field7").apply(count);
 
diff --git a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarRecordCursor.java b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarRecordCursor.java
index dbde648ee95..23dc69245f0 100644
--- a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarRecordCursor.java
+++ b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarRecordCursor.java
@@ -22,7 +22,11 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import io.airlift.log.Logger;
 import io.netty.buffer.ByteBuf;
 import io.prestosql.spi.predicate.TupleDomain;
+import io.prestosql.spi.type.DecimalType;
 import io.prestosql.spi.type.RowType;
+import io.prestosql.spi.type.Type;
+import io.prestosql.spi.type.VarcharType;
+import java.math.BigDecimal;
 import lombok.Data;
 import org.apache.bookkeeper.mledger.AsyncCallbacks;
 import org.apache.bookkeeper.mledger.Entry;
@@ -142,6 +146,17 @@ public class TestPulsarRecordCursor extends TestPulsarConnector {
                         }else if (fooColumnHandles.get(i).getName().equals("field7")) {
                             assertEquals(pulsarRecordCursor.getSlice(i).getBytes(), fooFunctions.get("field7").apply(count).toString().getBytes());
                             columnsSeen.add(fooColumnHandles.get(i).getName());
+                        }else if (fooColumnHandles.get(i).getName().equals("decimal")) {
+                            Type type = fooColumnHandles.get(i).getType();
+                            // With the JSON decoder, decimal is mapped to VarcharType
+                            if (type instanceof VarcharType) {
+                                assertEquals(new String(pulsarRecordCursor.getSlice(i).getBytes()),
+                                        fooFunctions.get("decimal").apply(count).toString());
+                            } else {
+                                DecimalType decimalType = (DecimalType) fooColumnHandles.get(i).getType();
+                                assertEquals(BigDecimal.valueOf(pulsarRecordCursor.getLong(i), decimalType.getScale()), fooFunctions.get("decimal").apply(count));
+                            }
+                            columnsSeen.add(fooColumnHandles.get(i).getName());
                         } else {
                             if (PulsarInternalColumn.getInternalFieldsMap().containsKey(fooColumnHandles.get(i).getName())) {
                                 columnsSeen.add(fooColumnHandles.get(i).getName());
diff --git a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/AbstractDecoderTester.java b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/AbstractDecoderTester.java
index 5cd46832516..e5ceb321aae 100644
--- a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/AbstractDecoderTester.java
+++ b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/AbstractDecoderTester.java
@@ -26,6 +26,7 @@ import io.prestosql.spi.connector.ColumnMetadata;
 import io.prestosql.spi.connector.ConnectorContext;
 import io.prestosql.spi.type.Type;
 import io.prestosql.testing.TestingConnectorContext;
+import java.math.BigDecimal;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.schema.SchemaInfo;
@@ -102,6 +103,10 @@ public abstract class AbstractDecoderTester {
         decoderTestUtil.checkValue(decodedRow, handle, value);
     }
 
+    protected void checkValue(Map<DecoderColumnHandle, FieldValueProvider> decodedRow, DecoderColumnHandle handle, BigDecimal value) {
+        decoderTestUtil.checkValue(decodedRow, handle, value); // BigDecimal overload; delegates to decoderTestUtil
+    }
+
     protected Block getBlock(Map<DecoderColumnHandle, FieldValueProvider> decodedRow, DecoderColumnHandle handle) {
         FieldValueProvider provider = decodedRow.get(handle);
         assertNotNull(provider);
diff --git a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/DecoderTestMessage.java b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/DecoderTestMessage.java
index 115f3691c00..da6d92e5158 100644
--- a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/DecoderTestMessage.java
+++ b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/DecoderTestMessage.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.sql.presto.decoder;
 
+import java.math.BigDecimal;
 import lombok.Data;
 
 import java.util.List;
@@ -45,6 +46,10 @@ public class DecoderTestMessage {
     public int dateField;
     public TestRow rowField;
     public TestEnum enumField;
+    @org.apache.avro.reflect.AvroSchema("{ \"type\": \"bytes\", \"logicalType\": \"decimal\", \"precision\": 4, \"scale\": 2 }")
+    public BigDecimal decimalField;
+    @org.apache.avro.reflect.AvroSchema("{ \"type\": \"bytes\", \"logicalType\": \"decimal\", \"precision\": 30, \"scale\": 2 }")
+    public BigDecimal longDecimalField;
 
     public List<String> arrayField;
     public Map<String, Long> mapField;
@@ -62,7 +67,6 @@ public class DecoderTestMessage {
         public long longField;
     }
 
-
     public static class CompositeRow {
         public String stringField;
         public List<NestedRow> arrayField;
diff --git a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/DecoderTestUtil.java b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/DecoderTestUtil.java
index 4c3c4a63447..496a6f061bf 100644
--- a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/DecoderTestUtil.java
+++ b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/DecoderTestUtil.java
@@ -23,11 +23,16 @@ import io.prestosql.decoder.DecoderColumnHandle;
 import io.prestosql.decoder.FieldValueProvider;
 import io.prestosql.spi.block.Block;
 import io.prestosql.spi.type.ArrayType;
+import io.prestosql.spi.type.DecimalType;
+import io.prestosql.spi.type.Decimals;
 import io.prestosql.spi.type.MapType;
 import io.prestosql.spi.type.RowType;
 import io.prestosql.spi.type.Type;
+import java.math.BigDecimal;
+import java.math.BigInteger;
 import java.util.Map;
 
+import static io.prestosql.spi.type.UnscaledDecimal128Arithmetic.UNSCALED_DECIMAL_128_SLICE_LENGTH;
 import static io.prestosql.testing.TestingConnectorSession.SESSION;
 import static org.testng.Assert.*;
 
@@ -113,6 +118,21 @@ public abstract class DecoderTestUtil {
         assertEquals(provider.getBoolean(), value);
     }
 
+    /** Asserts that the decimal column for {@code handle} decodes to {@code value}. */
+    public void checkValue(Map<DecoderColumnHandle, FieldValueProvider> decodedRow, DecoderColumnHandle handle, BigDecimal value) {
+        FieldValueProvider provider = decodedRow.get(handle);
+        assertNotNull(provider); // checked before use so a missing column fails as an assertion, not an NPE
+        DecimalType decimalType = (DecimalType) handle.getType();
+        BigDecimal actualDecimal;
+        if (decimalType.getFixedSize() == UNSCALED_DECIMAL_128_SLICE_LENGTH) {
+            BigInteger unscaled = Decimals.decodeUnscaledValue(provider.getSlice());
+            actualDecimal = new BigDecimal(unscaled, decimalType.getScale());
+        } else {
+            actualDecimal = BigDecimal.valueOf(provider.getLong(), decimalType.getScale());
+        }
+        assertEquals(actualDecimal, value);
+    }
+
     public void checkIsNull(Map<DecoderColumnHandle, FieldValueProvider> decodedRow, DecoderColumnHandle handle) {
         FieldValueProvider provider = decodedRow.get(handle);
         assertNotNull(provider);
diff --git a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/avro/TestAvroDecoder.java b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/avro/TestAvroDecoder.java
index 1cfbbb4fce5..7b270c7995b 100644
--- a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/avro/TestAvroDecoder.java
+++ b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/avro/TestAvroDecoder.java
@@ -25,11 +25,13 @@ import io.prestosql.decoder.FieldValueProvider;
 import io.prestosql.spi.PrestoException;
 import io.prestosql.spi.type.ArrayType;
 import io.prestosql.spi.type.BigintType;
+import io.prestosql.spi.type.DecimalType;
 import io.prestosql.spi.type.RowType;
 import io.prestosql.spi.type.StandardTypes;
 import io.prestosql.spi.type.Type;
 import io.prestosql.spi.type.TypeSignatureParameter;
 import io.prestosql.spi.type.VarcharType;
+import java.math.BigDecimal;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -87,6 +89,8 @@ public class TestAvroDecoder extends AbstractDecoderTester {
         message.longField = 222L;
         message.timestampField = System.currentTimeMillis();
         message.enumField = DecoderTestMessage.TestEnum.TEST_ENUM_1;
+        message.decimalField = BigDecimal.valueOf(2233, 2);
+        message.longDecimalField = new BigDecimal("1234567891234567891234567891.23");
 
         LocalTime now = LocalTime.now(ZoneId.systemDefault());
         message.timeField = now.toSecondOfDay() * 1000;
@@ -127,6 +131,13 @@ public class TestAvroDecoder extends AbstractDecoderTester {
                 "enumField", VARCHAR, false, false, "enumField", null, null, PulsarColumnHandle.HandleKeyValueType.NONE);
         checkValue(decodedRow, enumFieldColumnHandle, message.enumField.toString());
 
+        PulsarColumnHandle decimalFieldColumnHandle = new PulsarColumnHandle(getPulsarConnectorId().toString(),
+                "decimalField", DecimalType.createDecimalType(4, 2), false, false, "decimalField", null, null, PulsarColumnHandle.HandleKeyValueType.NONE);
+        checkValue(decodedRow, decimalFieldColumnHandle, message.decimalField);
+
+        PulsarColumnHandle longDecimalFieldColumnHandle = new PulsarColumnHandle(getPulsarConnectorId().toString(),
+                "longDecimalField", DecimalType.createDecimalType(30, 2), false, false, "longDecimalField", null, null, PulsarColumnHandle.HandleKeyValueType.NONE);
+        checkValue(decodedRow, longDecimalFieldColumnHandle, message.longDecimalField);
     }
 
     @Test