You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/05/26 00:24:23 UTC

[pulsar] branch master updated: [fix][sql] Fix the decimal type error convert in json schema (#15687)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 0c6e2ca24fd [fix][sql] Fix the decimal type error convert in json schema (#15687)
0c6e2ca24fd is described below

commit 0c6e2ca24fd368d11a769233bb2041f6cc4a8374
Author: Baodi Shi <wu...@icloud.com>
AuthorDate: Thu May 26 08:24:14 2022 +0800

    [fix][sql] Fix the decimal type error convert in json schema (#15687)
    
    ### Motivation
    
    In the current SQL implementation, when a `JSON` schema is used and a decimal field is queried, the following two errors occur:
    
    1. The data type is displayed as varchar.
    2. Precision is lost because the value is displayed in scientific notation.
    
    ```
    presto> select bigdecimal, typeof(bigdecimal) as devimal_type from pulsar."public/default".test_avro2;
          bigdecimal       | devimal_type
    -----------------------+--------------
     1.2345678912345678E36 | varchar
     1.2345678912345678E36 | varchar
    (2 rows)
    ```
    The original data is: `1234567891234567891234567891234567.89`
    
    ### Modifications
    - When reading the jsonNode, parse floating-point numbers as `BIG_DECIMAL` (via `DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS`) instead of float and double.
    - `PulsarJsonFieldDecoder` adds handling for Decimal types.
---
 .../impl/schema/generic/GenericJsonReader.java     | 16 ++++++++--------
 .../decoder/json/PulsarJsonFieldDecoder.java       | 22 ++++++++++++++++++++++
 .../decoder/json/PulsarJsonRowDecoderFactory.java  | 11 +++++++----
 .../sql/presto/decoder/avro/TestAvroDecoder.java   | 13 +++++++++++--
 .../sql/presto/decoder/json/TestJsonDecoder.java   | 20 ++++++++++++++++++++
 5 files changed, 68 insertions(+), 14 deletions(-)

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericJsonReader.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericJsonReader.java
index 3284d427a29..1a95e9be152 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericJsonReader.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericJsonReader.java
@@ -19,8 +19,10 @@
 package org.apache.pulsar.client.impl.schema.generic;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
+import com.fasterxml.jackson.databind.DeserializationFeature;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.ObjectReader;
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.List;
@@ -34,16 +36,13 @@ import org.slf4j.LoggerFactory;
 
 public class GenericJsonReader implements SchemaReader<GenericRecord> {
 
-    private final ObjectMapper objectMapper;
+    private final ObjectReader objectReader;
     private final byte[] schemaVersion;
     private final List<Field> fields;
     private SchemaInfo schemaInfo;
 
     public GenericJsonReader(List<Field> fields, SchemaInfo schemaInfo){
-        this.fields = fields;
-        this.schemaVersion = null;
-        this.objectMapper = new ObjectMapper();
-        this.schemaInfo = schemaInfo;
+        this(null, fields, schemaInfo);
     }
 
     public GenericJsonReader(List<Field> fields){
@@ -55,16 +54,17 @@ public class GenericJsonReader implements SchemaReader<GenericRecord> {
     }
 
     public GenericJsonReader(byte[] schemaVersion, List<Field> fields, SchemaInfo schemaInfo){
-        this.objectMapper = new ObjectMapper();
         this.fields = fields;
         this.schemaVersion = schemaVersion;
         this.schemaInfo = schemaInfo;
+        ObjectMapper objectMapper = new ObjectMapper();
+        this.objectReader = objectMapper.reader().with(DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS);
     }
 
     @Override
     public GenericJsonRecord read(byte[] bytes, int offset, int length) {
         try {
-            JsonNode jn = objectMapper.readTree(new String(bytes, offset, length, UTF_8));
+            JsonNode jn = objectReader.readTree(new String(bytes, offset, length, UTF_8));
             return new GenericJsonRecord(schemaVersion, fields, jn, schemaInfo);
         } catch (IOException ioe) {
             throw new SchemaSerializationException(ioe);
@@ -74,7 +74,7 @@ public class GenericJsonReader implements SchemaReader<GenericRecord> {
     @Override
     public GenericRecord read(InputStream inputStream) {
         try {
-            JsonNode jn = objectMapper.readTree(inputStream);
+            JsonNode jn = objectReader.readTree(inputStream);
             return new GenericJsonRecord(schemaVersion, fields, jn, schemaInfo);
         } catch (IOException ioe) {
             throw new SchemaSerializationException(ioe);
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/decoder/json/PulsarJsonFieldDecoder.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/decoder/json/PulsarJsonFieldDecoder.java
index 5c9dc0c5459..fcd2550f9ba 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/decoder/json/PulsarJsonFieldDecoder.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/decoder/json/PulsarJsonFieldDecoder.java
@@ -57,6 +57,8 @@ import io.prestosql.spi.type.ArrayType;
 import io.prestosql.spi.type.BigintType;
 import io.prestosql.spi.type.BooleanType;
 import io.prestosql.spi.type.DateType;
+import io.prestosql.spi.type.DecimalType;
+import io.prestosql.spi.type.Decimals;
 import io.prestosql.spi.type.DoubleType;
 import io.prestosql.spi.type.IntegerType;
 import io.prestosql.spi.type.MapType;
@@ -69,6 +71,7 @@ import io.prestosql.spi.type.TinyintType;
 import io.prestosql.spi.type.Type;
 import io.prestosql.spi.type.VarbinaryType;
 import io.prestosql.spi.type.VarcharType;
+import java.math.BigInteger;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -118,6 +121,9 @@ public class PulsarJsonFieldDecoder
     }
 
     private boolean isSupportedType(Type type) {
+        if (type instanceof DecimalType) {
+            return true;
+        }
         if (isVarcharType(type)) {
             return true;
         }
@@ -226,6 +232,13 @@ public class PulsarJsonFieldDecoder
                     return floatToIntBits((Float) parseFloat(value.asText()));
                 }
 
+                // If it is decimalType, need to eliminate the decimal point,
+                // and give it to presto to set the decimal point
+                if (type instanceof DecimalType) {
+                    String decimalLong = value.asText().replace(".", "");
+                    return Long.valueOf(decimalLong);
+                }
+
                 long longValue;
                 if (value.isIntegralNumber() && !value.isBigInteger()) {
                     longValue = value.longValue();
@@ -265,6 +278,15 @@ public class PulsarJsonFieldDecoder
 
         private static Slice getSlice(JsonNode value, Type type, String columnName) {
             String textValue = value.isValueNode() ? value.asText() : value.toString();
+
+            // If it is decimalType, need to eliminate the decimal point,
+            // and give it to presto to set the decimal point
+            if (type instanceof DecimalType) {
+                textValue = textValue.replace(".", "");
+                BigInteger bigInteger = new BigInteger(textValue);
+                return Decimals.encodeUnscaledValue(bigInteger);
+            }
+
             Slice slice = utf8Slice(textValue);
             if (isVarcharType(type)) {
                 slice = truncateToLength(slice, type);
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/decoder/json/PulsarJsonRowDecoderFactory.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/decoder/json/PulsarJsonRowDecoderFactory.java
index b8091cf2592..fef4b39d59e 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/decoder/json/PulsarJsonRowDecoderFactory.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/decoder/json/PulsarJsonRowDecoderFactory.java
@@ -33,6 +33,7 @@ import io.prestosql.spi.connector.ColumnMetadata;
 import io.prestosql.spi.type.ArrayType;
 import io.prestosql.spi.type.BigintType;
 import io.prestosql.spi.type.BooleanType;
+import io.prestosql.spi.type.DecimalType;
 import io.prestosql.spi.type.DoubleType;
 import io.prestosql.spi.type.IntegerType;
 import io.prestosql.spi.type.RealType;
@@ -128,11 +129,13 @@ public class PulsarJsonRowDecoderFactory implements PulsarRowDecoderFactory {
                                 + "please check the schema or report the bug.", fieldname));
             case FIXED:
             case BYTES:
-                // In the current implementation, since JsonSchema is generated by Avro,
-                // there may exist LogicalTypes.Decimal.
-                // Mapping decimalType with varcharType in JsonSchema.
+                //  When the precision <= 0, throw Exception.
+                //  When the precision > 0 and <= 18, use ShortDecimalType. and mapping Long
+                //  When the precision > 18 and <= 36, use LongDecimalType. and mapping Slice
+                //  When the precision > 36, throw Exception.
                 if (logicalType instanceof LogicalTypes.Decimal) {
-                    return createUnboundedVarcharType();
+                    LogicalTypes.Decimal decimal = (LogicalTypes.Decimal) logicalType;
+                    return DecimalType.createDecimalType(decimal.getPrecision(), decimal.getScale());
                 }
                 return VarbinaryType.VARBINARY;
             case INT:
diff --git a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/avro/TestAvroDecoder.java b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/avro/TestAvroDecoder.java
index 7b270c7995b..2478300dcaa 100644
--- a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/avro/TestAvroDecoder.java
+++ b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/avro/TestAvroDecoder.java
@@ -89,8 +89,6 @@ public class TestAvroDecoder extends AbstractDecoderTester {
         message.longField = 222L;
         message.timestampField = System.currentTimeMillis();
         message.enumField = DecoderTestMessage.TestEnum.TEST_ENUM_1;
-        message.decimalField = BigDecimal.valueOf(2233, 2);
-        message.longDecimalField = new BigDecimal("1234567891234567891234567891.23");
 
         LocalTime now = LocalTime.now(ZoneId.systemDefault());
         message.timeField = now.toSecondOfDay() * 1000;
@@ -130,6 +128,17 @@ public class TestAvroDecoder extends AbstractDecoderTester {
         PulsarColumnHandle enumFieldColumnHandle = new PulsarColumnHandle(getPulsarConnectorId().toString(),
                 "enumField", VARCHAR, false, false, "enumField", null, null, PulsarColumnHandle.HandleKeyValueType.NONE);
         checkValue(decodedRow, enumFieldColumnHandle, message.enumField.toString());
+    }
+
+    @Test
+    public void testDecimal() {
+        DecoderTestMessage message = new DecoderTestMessage();
+        message.decimalField = BigDecimal.valueOf(2233, 2);
+        message.longDecimalField = new BigDecimal("1234567891234567891234567891.23");
+
+        ByteBuf payload = io.netty.buffer.Unpooled
+                .copiedBuffer(schema.encode(message));
+        Map<DecoderColumnHandle, FieldValueProvider> decodedRow = pulsarRowDecoder.decodeRow(payload).get();
 
         PulsarColumnHandle decimalFieldColumnHandle = new PulsarColumnHandle(getPulsarConnectorId().toString(),
                 "decimalField", DecimalType.createDecimalType(4, 2), false, false, "decimalField", null, null, PulsarColumnHandle.HandleKeyValueType.NONE);
diff --git a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/json/TestJsonDecoder.java b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/json/TestJsonDecoder.java
index 2a22b58a03f..0b8a8f84eda 100644
--- a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/json/TestJsonDecoder.java
+++ b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/json/TestJsonDecoder.java
@@ -24,6 +24,7 @@ import io.prestosql.decoder.DecoderColumnHandle;
 import io.prestosql.decoder.FieldValueProvider;
 import io.prestosql.spi.PrestoException;
 import io.prestosql.spi.type.*;
+import java.math.BigDecimal;
 import org.apache.pulsar.client.impl.schema.JSONSchema;
 import org.apache.pulsar.client.impl.schema.generic.GenericJsonRecord;
 import org.apache.pulsar.client.impl.schema.generic.GenericJsonSchema;
@@ -119,6 +120,25 @@ public class TestJsonDecoder extends AbstractDecoderTester {
 
     }
 
+    @Test
+    public void testDecimal() {
+        DecoderTestMessage message = new DecoderTestMessage();
+        message.decimalField = BigDecimal.valueOf(2233, 2);
+        message.longDecimalField = new BigDecimal("1234567891234567891234567891.23");
+
+        ByteBuf payload = io.netty.buffer.Unpooled
+                .copiedBuffer(schema.encode(message));
+        Map<DecoderColumnHandle, FieldValueProvider> decodedRow = pulsarRowDecoder.decodeRow(payload).get();
+
+        PulsarColumnHandle decimalFieldColumnHandle = new PulsarColumnHandle(getPulsarConnectorId().toString(),
+                "decimalField", DecimalType.createDecimalType(4, 2), false, false, "decimalField", null, null, PulsarColumnHandle.HandleKeyValueType.NONE);
+        checkValue(decodedRow, decimalFieldColumnHandle, message.decimalField);
+
+        PulsarColumnHandle longDecimalFieldColumnHandle = new PulsarColumnHandle(getPulsarConnectorId().toString(),
+                "longDecimalField", DecimalType.createDecimalType(30, 2), false, false, "longDecimalField", null, null, PulsarColumnHandle.HandleKeyValueType.NONE);
+        checkValue(decodedRow, longDecimalFieldColumnHandle, message.longDecimalField);
+    }
+
     @Test
     public void testArray() {
         DecoderTestMessage message = new DecoderTestMessage();