You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/05/26 00:24:23 UTC
[pulsar] branch master updated: [fix][sql] Fix the decimal type error convert in json schema (#15687)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 0c6e2ca24fd [fix][sql] Fix the decimal type error convert in json schema (#15687)
0c6e2ca24fd is described below
commit 0c6e2ca24fd368d11a769233bb2041f6cc4a8374
Author: Baodi Shi <wu...@icloud.com>
AuthorDate: Thu May 26 08:24:14 2022 +0800
[fix][sql] Fix the decimal type error convert in json schema (#15687)
### Motivation
In the current SQL implementation, when a topic uses a `JSON` schema and a decimal field is queried, two errors occur:
1. The data type is displayed as varchar.
2. Precision is lost because the value is displayed in scientific notation.
```
presto> select bigdecimal, typeof(bigdecimal) as devimal_type from pulsar."public/default".test_avro2;
bigdecimal | devimal_type
-----------------------+--------------
1.2345678912345678E36 | varchar
1.2345678912345678E36 | varchar
(2 rows)
```
The original data is: `1234567891234567891234567891234567.89`
### Modifications
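- When parsing into a `JsonNode`, enable `DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS` so that floating-point values are read as `BigDecimal` instead of `float`/`double`.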
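- `PulsarJsonFieldDecoder` adds handling for decimal types, and `PulsarJsonRowDecoderFactory` now maps Avro `decimal` logical types to Presto `DecimalType` instead of `varchar`.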
---
.../impl/schema/generic/GenericJsonReader.java | 16 ++++++++--------
.../decoder/json/PulsarJsonFieldDecoder.java | 22 ++++++++++++++++++++++
.../decoder/json/PulsarJsonRowDecoderFactory.java | 11 +++++++----
.../sql/presto/decoder/avro/TestAvroDecoder.java | 13 +++++++++++--
.../sql/presto/decoder/json/TestJsonDecoder.java | 20 ++++++++++++++++++++
5 files changed, 68 insertions(+), 14 deletions(-)
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericJsonReader.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericJsonReader.java
index 3284d427a29..1a95e9be152 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericJsonReader.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericJsonReader.java
@@ -19,8 +19,10 @@
package org.apache.pulsar.client.impl.schema.generic;
import static java.nio.charset.StandardCharsets.UTF_8;
+import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.ObjectReader;
import java.io.IOException;
import java.io.InputStream;
import java.util.List;
@@ -34,16 +36,13 @@ import org.slf4j.LoggerFactory;
public class GenericJsonReader implements SchemaReader<GenericRecord> {
- private final ObjectMapper objectMapper;
+ private final ObjectReader objectReader;
private final byte[] schemaVersion;
private final List<Field> fields;
private SchemaInfo schemaInfo;
public GenericJsonReader(List<Field> fields, SchemaInfo schemaInfo){
- this.fields = fields;
- this.schemaVersion = null;
- this.objectMapper = new ObjectMapper();
- this.schemaInfo = schemaInfo;
+ this(null, fields, schemaInfo);
}
public GenericJsonReader(List<Field> fields){
@@ -55,16 +54,17 @@ public class GenericJsonReader implements SchemaReader<GenericRecord> {
}
public GenericJsonReader(byte[] schemaVersion, List<Field> fields, SchemaInfo schemaInfo){
- this.objectMapper = new ObjectMapper();
this.fields = fields;
this.schemaVersion = schemaVersion;
this.schemaInfo = schemaInfo;
+ ObjectMapper objectMapper = new ObjectMapper();
+ this.objectReader = objectMapper.reader().with(DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS);
}
@Override
public GenericJsonRecord read(byte[] bytes, int offset, int length) {
try {
- JsonNode jn = objectMapper.readTree(new String(bytes, offset, length, UTF_8));
+ JsonNode jn = objectReader.readTree(new String(bytes, offset, length, UTF_8));
return new GenericJsonRecord(schemaVersion, fields, jn, schemaInfo);
} catch (IOException ioe) {
throw new SchemaSerializationException(ioe);
@@ -74,7 +74,7 @@ public class GenericJsonReader implements SchemaReader<GenericRecord> {
@Override
public GenericRecord read(InputStream inputStream) {
try {
- JsonNode jn = objectMapper.readTree(inputStream);
+ JsonNode jn = objectReader.readTree(inputStream);
return new GenericJsonRecord(schemaVersion, fields, jn, schemaInfo);
} catch (IOException ioe) {
throw new SchemaSerializationException(ioe);
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/decoder/json/PulsarJsonFieldDecoder.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/decoder/json/PulsarJsonFieldDecoder.java
index 5c9dc0c5459..fcd2550f9ba 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/decoder/json/PulsarJsonFieldDecoder.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/decoder/json/PulsarJsonFieldDecoder.java
@@ -57,6 +57,8 @@ import io.prestosql.spi.type.ArrayType;
import io.prestosql.spi.type.BigintType;
import io.prestosql.spi.type.BooleanType;
import io.prestosql.spi.type.DateType;
+import io.prestosql.spi.type.DecimalType;
+import io.prestosql.spi.type.Decimals;
import io.prestosql.spi.type.DoubleType;
import io.prestosql.spi.type.IntegerType;
import io.prestosql.spi.type.MapType;
@@ -69,6 +71,7 @@ import io.prestosql.spi.type.TinyintType;
import io.prestosql.spi.type.Type;
import io.prestosql.spi.type.VarbinaryType;
import io.prestosql.spi.type.VarcharType;
+import java.math.BigInteger;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -118,6 +121,9 @@ public class PulsarJsonFieldDecoder
}
private boolean isSupportedType(Type type) {
+ if (type instanceof DecimalType) {
+ return true;
+ }
if (isVarcharType(type)) {
return true;
}
@@ -226,6 +232,13 @@ public class PulsarJsonFieldDecoder
return floatToIntBits((Float) parseFloat(value.asText()));
}
+ // If it is decimalType, need to eliminate the decimal point,
+ // and give it to presto to set the decimal point
+ if (type instanceof DecimalType) {
+ String decimalLong = value.asText().replace(".", "");
+ return Long.valueOf(decimalLong);
+ }
+
long longValue;
if (value.isIntegralNumber() && !value.isBigInteger()) {
longValue = value.longValue();
@@ -265,6 +278,15 @@ public class PulsarJsonFieldDecoder
private static Slice getSlice(JsonNode value, Type type, String columnName) {
String textValue = value.isValueNode() ? value.asText() : value.toString();
+
+ // If it is decimalType, need to eliminate the decimal point,
+ // and give it to presto to set the decimal point
+ if (type instanceof DecimalType) {
+ textValue = textValue.replace(".", "");
+ BigInteger bigInteger = new BigInteger(textValue);
+ return Decimals.encodeUnscaledValue(bigInteger);
+ }
+
Slice slice = utf8Slice(textValue);
if (isVarcharType(type)) {
slice = truncateToLength(slice, type);
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/decoder/json/PulsarJsonRowDecoderFactory.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/decoder/json/PulsarJsonRowDecoderFactory.java
index b8091cf2592..fef4b39d59e 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/decoder/json/PulsarJsonRowDecoderFactory.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/decoder/json/PulsarJsonRowDecoderFactory.java
@@ -33,6 +33,7 @@ import io.prestosql.spi.connector.ColumnMetadata;
import io.prestosql.spi.type.ArrayType;
import io.prestosql.spi.type.BigintType;
import io.prestosql.spi.type.BooleanType;
+import io.prestosql.spi.type.DecimalType;
import io.prestosql.spi.type.DoubleType;
import io.prestosql.spi.type.IntegerType;
import io.prestosql.spi.type.RealType;
@@ -128,11 +129,13 @@ public class PulsarJsonRowDecoderFactory implements PulsarRowDecoderFactory {
+ "please check the schema or report the bug.", fieldname));
case FIXED:
case BYTES:
- // In the current implementation, since JsonSchema is generated by Avro,
- // there may exist LogicalTypes.Decimal.
- // Mapping decimalType with varcharType in JsonSchema.
+ // When the precision <= 0, throw Exception.
+ // When the precision > 0 and <= 18, use ShortDecimalType. and mapping Long
+ // When the precision > 18 and <= 36, use LongDecimalType. and mapping Slice
+ // When the precision > 36, throw Exception.
if (logicalType instanceof LogicalTypes.Decimal) {
- return createUnboundedVarcharType();
+ LogicalTypes.Decimal decimal = (LogicalTypes.Decimal) logicalType;
+ return DecimalType.createDecimalType(decimal.getPrecision(), decimal.getScale());
}
return VarbinaryType.VARBINARY;
case INT:
diff --git a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/avro/TestAvroDecoder.java b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/avro/TestAvroDecoder.java
index 7b270c7995b..2478300dcaa 100644
--- a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/avro/TestAvroDecoder.java
+++ b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/avro/TestAvroDecoder.java
@@ -89,8 +89,6 @@ public class TestAvroDecoder extends AbstractDecoderTester {
message.longField = 222L;
message.timestampField = System.currentTimeMillis();
message.enumField = DecoderTestMessage.TestEnum.TEST_ENUM_1;
- message.decimalField = BigDecimal.valueOf(2233, 2);
- message.longDecimalField = new BigDecimal("1234567891234567891234567891.23");
LocalTime now = LocalTime.now(ZoneId.systemDefault());
message.timeField = now.toSecondOfDay() * 1000;
@@ -130,6 +128,17 @@ public class TestAvroDecoder extends AbstractDecoderTester {
PulsarColumnHandle enumFieldColumnHandle = new PulsarColumnHandle(getPulsarConnectorId().toString(),
"enumField", VARCHAR, false, false, "enumField", null, null, PulsarColumnHandle.HandleKeyValueType.NONE);
checkValue(decodedRow, enumFieldColumnHandle, message.enumField.toString());
+ }
+
+ @Test
+ public void testDecimal() {
+ DecoderTestMessage message = new DecoderTestMessage();
+ message.decimalField = BigDecimal.valueOf(2233, 2);
+ message.longDecimalField = new BigDecimal("1234567891234567891234567891.23");
+
+ ByteBuf payload = io.netty.buffer.Unpooled
+ .copiedBuffer(schema.encode(message));
+ Map<DecoderColumnHandle, FieldValueProvider> decodedRow = pulsarRowDecoder.decodeRow(payload).get();
PulsarColumnHandle decimalFieldColumnHandle = new PulsarColumnHandle(getPulsarConnectorId().toString(),
"decimalField", DecimalType.createDecimalType(4, 2), false, false, "decimalField", null, null, PulsarColumnHandle.HandleKeyValueType.NONE);
diff --git a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/json/TestJsonDecoder.java b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/json/TestJsonDecoder.java
index 2a22b58a03f..0b8a8f84eda 100644
--- a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/json/TestJsonDecoder.java
+++ b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/json/TestJsonDecoder.java
@@ -24,6 +24,7 @@ import io.prestosql.decoder.DecoderColumnHandle;
import io.prestosql.decoder.FieldValueProvider;
import io.prestosql.spi.PrestoException;
import io.prestosql.spi.type.*;
+import java.math.BigDecimal;
import org.apache.pulsar.client.impl.schema.JSONSchema;
import org.apache.pulsar.client.impl.schema.generic.GenericJsonRecord;
import org.apache.pulsar.client.impl.schema.generic.GenericJsonSchema;
@@ -119,6 +120,25 @@ public class TestJsonDecoder extends AbstractDecoderTester {
}
+ @Test
+ public void testDecimal() {
+ DecoderTestMessage message = new DecoderTestMessage();
+ message.decimalField = BigDecimal.valueOf(2233, 2);
+ message.longDecimalField = new BigDecimal("1234567891234567891234567891.23");
+
+ ByteBuf payload = io.netty.buffer.Unpooled
+ .copiedBuffer(schema.encode(message));
+ Map<DecoderColumnHandle, FieldValueProvider> decodedRow = pulsarRowDecoder.decodeRow(payload).get();
+
+ PulsarColumnHandle decimalFieldColumnHandle = new PulsarColumnHandle(getPulsarConnectorId().toString(),
+ "decimalField", DecimalType.createDecimalType(4, 2), false, false, "decimalField", null, null, PulsarColumnHandle.HandleKeyValueType.NONE);
+ checkValue(decodedRow, decimalFieldColumnHandle, message.decimalField);
+
+ PulsarColumnHandle longDecimalFieldColumnHandle = new PulsarColumnHandle(getPulsarConnectorId().toString(),
+ "longDecimalField", DecimalType.createDecimalType(30, 2), false, false, "longDecimalField", null, null, PulsarColumnHandle.HandleKeyValueType.NONE);
+ checkValue(decodedRow, longDecimalFieldColumnHandle, message.longDecimalField);
+ }
+
@Test
public void testArray() {
DecoderTestMessage message = new DecoderTestMessage();