You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/05/26 00:45:10 UTC
[pulsar] branch branch-2.9 updated: [fix][sql] Fix the decimal type error convert in json schema (#15687)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.9 by this push:
new fb426867c65 [fix][sql] Fix the decimal type error convert in json schema (#15687)
fb426867c65 is described below
commit fb426867c6534fc1cd2aae7c070b2b201bcc97ce
Author: Baodi Shi <wu...@icloud.com>
AuthorDate: Thu May 26 08:24:14 2022 +0800
[fix][sql] Fix the decimal type error convert in json schema (#15687)
In the current SQL implementation, when a topic uses the `JSON` schema and a decimal field is queried, two errors occur:
1. The data type is reported as varchar instead of decimal.
2. Precision is lost because the value is rendered in scientific notation.
```
presto> select bigdecimal, typeof(bigdecimal) as decimal_type from pulsar."public/default".test_avro2;
bigdecimal | decimal_type
-----------------------+--------------
1.2345678912345678E36 | varchar
1.2345678912345678E36 | varchar
(2 rows)
```
The original data is: `1234567891234567891234567891234567.89`
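- When reading the JsonNode, enable Jackson's `DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS` so that floating-point numbers are parsed as `BigDecimal` instead of float/double.
- `PulsarJsonFieldDecoder` now supports Decimal types, and `PulsarJsonRowDecoderFactory` maps Avro decimal logical types to Presto `DecimalType` instead of varchar.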
(cherry picked from commit 0c6e2ca24fd368d11a769233bb2041f6cc4a8374)
---
.../impl/schema/generic/GenericJsonReader.java | 27 ++++++++++------------
.../decoder/json/PulsarJsonFieldDecoder.java | 22 ++++++++++++++++++
.../decoder/json/PulsarJsonRowDecoderFactory.java | 11 +++++----
.../sql/presto/decoder/avro/TestAvroDecoder.java | 13 +++++++++--
.../sql/presto/decoder/json/TestJsonDecoder.java | 20 ++++++++++++++++
5 files changed, 72 insertions(+), 21 deletions(-)
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericJsonReader.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericJsonReader.java
index f0b2c86508b..1a95e9be152 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericJsonReader.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericJsonReader.java
@@ -18,35 +18,31 @@
*/
package org.apache.pulsar.client.impl.schema.generic;
+import static java.nio.charset.StandardCharsets.UTF_8;
+import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.ObjectReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.List;
import org.apache.pulsar.client.api.SchemaSerializationException;
import org.apache.pulsar.client.api.schema.Field;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.api.schema.SchemaReader;
-
import org.apache.pulsar.common.schema.SchemaInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.List;
-
-import static java.nio.charset.StandardCharsets.UTF_8;
-
public class GenericJsonReader implements SchemaReader<GenericRecord> {
- private final ObjectMapper objectMapper;
+ private final ObjectReader objectReader;
private final byte[] schemaVersion;
private final List<Field> fields;
private SchemaInfo schemaInfo;
public GenericJsonReader(List<Field> fields, SchemaInfo schemaInfo){
- this.fields = fields;
- this.schemaVersion = null;
- this.objectMapper = new ObjectMapper();
- this.schemaInfo = schemaInfo;
+ this(null, fields, schemaInfo);
}
public GenericJsonReader(List<Field> fields){
@@ -58,16 +54,17 @@ public class GenericJsonReader implements SchemaReader<GenericRecord> {
}
public GenericJsonReader(byte[] schemaVersion, List<Field> fields, SchemaInfo schemaInfo){
- this.objectMapper = new ObjectMapper();
this.fields = fields;
this.schemaVersion = schemaVersion;
this.schemaInfo = schemaInfo;
+ ObjectMapper objectMapper = new ObjectMapper();
+ this.objectReader = objectMapper.reader().with(DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS);
}
@Override
public GenericJsonRecord read(byte[] bytes, int offset, int length) {
try {
- JsonNode jn = objectMapper.readTree(new String(bytes, offset, length, UTF_8));
+ JsonNode jn = objectReader.readTree(new String(bytes, offset, length, UTF_8));
return new GenericJsonRecord(schemaVersion, fields, jn, schemaInfo);
} catch (IOException ioe) {
throw new SchemaSerializationException(ioe);
@@ -77,7 +74,7 @@ public class GenericJsonReader implements SchemaReader<GenericRecord> {
@Override
public GenericRecord read(InputStream inputStream) {
try {
- JsonNode jn = objectMapper.readTree(inputStream);
+ JsonNode jn = objectReader.readTree(inputStream);
return new GenericJsonRecord(schemaVersion, fields, jn, schemaInfo);
} catch (IOException ioe) {
throw new SchemaSerializationException(ioe);
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/decoder/json/PulsarJsonFieldDecoder.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/decoder/json/PulsarJsonFieldDecoder.java
index 960d8f42bdf..a14cd6b3ba2 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/decoder/json/PulsarJsonFieldDecoder.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/decoder/json/PulsarJsonFieldDecoder.java
@@ -58,6 +58,8 @@ import io.prestosql.spi.type.ArrayType;
import io.prestosql.spi.type.BigintType;
import io.prestosql.spi.type.BooleanType;
import io.prestosql.spi.type.DateType;
+import io.prestosql.spi.type.DecimalType;
+import io.prestosql.spi.type.Decimals;
import io.prestosql.spi.type.DoubleType;
import io.prestosql.spi.type.IntegerType;
import io.prestosql.spi.type.MapType;
@@ -70,6 +72,7 @@ import io.prestosql.spi.type.TinyintType;
import io.prestosql.spi.type.Type;
import io.prestosql.spi.type.VarbinaryType;
import io.prestosql.spi.type.VarcharType;
+import java.math.BigInteger;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -120,6 +123,9 @@ public class PulsarJsonFieldDecoder
}
private boolean isSupportedType(Type type) {
+ if (type instanceof DecimalType) {
+ return true;
+ }
if (isVarcharType(type)) {
return true;
}
@@ -228,6 +234,13 @@ public class PulsarJsonFieldDecoder
return floatToIntBits((Float) parseFloat(value.asText()));
}
+ // If it is decimalType, need to eliminate the decimal point,
+ // and give it to presto to set the decimal point
+ if (type instanceof DecimalType) {
+ String decimalLong = value.asText().replace(".", "");
+ return Long.valueOf(decimalLong);
+ }
+
long longValue;
if (value.isIntegralNumber() && !value.isBigInteger()) {
longValue = value.longValue();
@@ -267,6 +280,15 @@ public class PulsarJsonFieldDecoder
private static Slice getSlice(JsonNode value, Type type, String columnName) {
String textValue = value.isValueNode() ? value.asText() : value.toString();
+
+ // If it is decimalType, need to eliminate the decimal point,
+ // and give it to presto to set the decimal point
+ if (type instanceof DecimalType) {
+ textValue = textValue.replace(".", "");
+ BigInteger bigInteger = new BigInteger(textValue);
+ return Decimals.encodeUnscaledValue(bigInteger);
+ }
+
Slice slice = utf8Slice(textValue);
if (isVarcharType(type)) {
slice = truncateToLength(slice, type);
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/decoder/json/PulsarJsonRowDecoderFactory.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/decoder/json/PulsarJsonRowDecoderFactory.java
index 843c2c7f836..b94a963cffd 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/decoder/json/PulsarJsonRowDecoderFactory.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/decoder/json/PulsarJsonRowDecoderFactory.java
@@ -34,6 +34,7 @@ import io.prestosql.spi.connector.ColumnMetadata;
import io.prestosql.spi.type.ArrayType;
import io.prestosql.spi.type.BigintType;
import io.prestosql.spi.type.BooleanType;
+import io.prestosql.spi.type.DecimalType;
import io.prestosql.spi.type.DoubleType;
import io.prestosql.spi.type.IntegerType;
import io.prestosql.spi.type.RealType;
@@ -128,11 +129,13 @@ public class PulsarJsonRowDecoderFactory implements PulsarRowDecoderFactory {
+ "please check the schema or report the bug.", fieldname));
case FIXED:
case BYTES:
- // In the current implementation, since JsonSchema is generated by Avro,
- // there may exist LogicalTypes.Decimal.
- // Mapping decimalType with varcharType in JsonSchema.
+ // When the precision <= 0, throw Exception.
+ // When the precision > 0 and <= 18, use ShortDecimalType. and mapping Long
+ // When the precision > 18 and <= 36, use LongDecimalType. and mapping Slice
+ // When the precision > 36, throw Exception.
if (logicalType instanceof LogicalTypes.Decimal) {
- return createUnboundedVarcharType();
+ LogicalTypes.Decimal decimal = (LogicalTypes.Decimal) logicalType;
+ return DecimalType.createDecimalType(decimal.getPrecision(), decimal.getScale());
}
return VarbinaryType.VARBINARY;
case INT:
diff --git a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/avro/TestAvroDecoder.java b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/avro/TestAvroDecoder.java
index 7b270c7995b..2478300dcaa 100644
--- a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/avro/TestAvroDecoder.java
+++ b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/avro/TestAvroDecoder.java
@@ -89,8 +89,6 @@ public class TestAvroDecoder extends AbstractDecoderTester {
message.longField = 222L;
message.timestampField = System.currentTimeMillis();
message.enumField = DecoderTestMessage.TestEnum.TEST_ENUM_1;
- message.decimalField = BigDecimal.valueOf(2233, 2);
- message.longDecimalField = new BigDecimal("1234567891234567891234567891.23");
LocalTime now = LocalTime.now(ZoneId.systemDefault());
message.timeField = now.toSecondOfDay() * 1000;
@@ -130,6 +128,17 @@ public class TestAvroDecoder extends AbstractDecoderTester {
PulsarColumnHandle enumFieldColumnHandle = new PulsarColumnHandle(getPulsarConnectorId().toString(),
"enumField", VARCHAR, false, false, "enumField", null, null, PulsarColumnHandle.HandleKeyValueType.NONE);
checkValue(decodedRow, enumFieldColumnHandle, message.enumField.toString());
+ }
+
+ @Test
+ public void testDecimal() {
+ DecoderTestMessage message = new DecoderTestMessage();
+ message.decimalField = BigDecimal.valueOf(2233, 2);
+ message.longDecimalField = new BigDecimal("1234567891234567891234567891.23");
+
+ ByteBuf payload = io.netty.buffer.Unpooled
+ .copiedBuffer(schema.encode(message));
+ Map<DecoderColumnHandle, FieldValueProvider> decodedRow = pulsarRowDecoder.decodeRow(payload).get();
PulsarColumnHandle decimalFieldColumnHandle = new PulsarColumnHandle(getPulsarConnectorId().toString(),
"decimalField", DecimalType.createDecimalType(4, 2), false, false, "decimalField", null, null, PulsarColumnHandle.HandleKeyValueType.NONE);
diff --git a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/json/TestJsonDecoder.java b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/json/TestJsonDecoder.java
index 2a22b58a03f..0b8a8f84eda 100644
--- a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/json/TestJsonDecoder.java
+++ b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/json/TestJsonDecoder.java
@@ -24,6 +24,7 @@ import io.prestosql.decoder.DecoderColumnHandle;
import io.prestosql.decoder.FieldValueProvider;
import io.prestosql.spi.PrestoException;
import io.prestosql.spi.type.*;
+import java.math.BigDecimal;
import org.apache.pulsar.client.impl.schema.JSONSchema;
import org.apache.pulsar.client.impl.schema.generic.GenericJsonRecord;
import org.apache.pulsar.client.impl.schema.generic.GenericJsonSchema;
@@ -119,6 +120,25 @@ public class TestJsonDecoder extends AbstractDecoderTester {
}
+ @Test
+ public void testDecimal() {
+ DecoderTestMessage message = new DecoderTestMessage();
+ message.decimalField = BigDecimal.valueOf(2233, 2);
+ message.longDecimalField = new BigDecimal("1234567891234567891234567891.23");
+
+ ByteBuf payload = io.netty.buffer.Unpooled
+ .copiedBuffer(schema.encode(message));
+ Map<DecoderColumnHandle, FieldValueProvider> decodedRow = pulsarRowDecoder.decodeRow(payload).get();
+
+ PulsarColumnHandle decimalFieldColumnHandle = new PulsarColumnHandle(getPulsarConnectorId().toString(),
+ "decimalField", DecimalType.createDecimalType(4, 2), false, false, "decimalField", null, null, PulsarColumnHandle.HandleKeyValueType.NONE);
+ checkValue(decodedRow, decimalFieldColumnHandle, message.decimalField);
+
+ PulsarColumnHandle longDecimalFieldColumnHandle = new PulsarColumnHandle(getPulsarConnectorId().toString(),
+ "longDecimalField", DecimalType.createDecimalType(30, 2), false, false, "longDecimalField", null, null, PulsarColumnHandle.HandleKeyValueType.NONE);
+ checkValue(decodedRow, longDecimalFieldColumnHandle, message.longDecimalField);
+ }
+
@Test
public void testArray() {
DecoderTestMessage message = new DecoderTestMessage();