You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by kk...@apache.org on 2020/10/06 01:23:53 UTC
[kafka] branch 2.2 updated: KAFKA-10439: Connect's Values to parse
BigInteger as Decimal with zero scale. (#9320)
This is an automated email from the ASF dual-hosted git repository.
kkarantasis pushed a commit to branch 2.2
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.2 by this push:
new 5b94edd KAFKA-10439: Connect's Values to parse BigInteger as Decimal with zero scale. (#9320)
5b94edd is described below
commit 5b94edd4dfef71128eec3b5e6e63f89278efe572
Author: Alex Diachenko <sa...@gmail.com>
AuthorDate: Mon Oct 5 17:24:44 2020 -0700
KAFKA-10439: Connect's Values to parse BigInteger as Decimal with zero scale. (#9320)
The `org.apache.kafka.connect.data.Values#parse` method parses integers that are larger than `Long.MAX_VALUE` as `double` with `Schema.FLOAT64_SCHEMA`.
That means we are losing precision for these larger integers.
For example:
`SchemaAndValue schemaAndValue = Values.parseString("9223372036854775808");`
returns:
`SchemaAndValue{schema=Schema{FLOAT64}, value=9.223372036854776E18}`
Also, this method parses values that fit in `FLOAT32` as `FLOAT64`.
This PR changes the parsing logic to use `FLOAT32`/`FLOAT64` only for numbers that have a fractional part (`decimal.scale() != 0`), and to use an arbitrary-precision `org.apache.kafka.connect.data.Decimal` otherwise.
Also, it updates the method to parse numbers that can be represented as `float` as `FLOAT32` rather than `FLOAT64`.
Added unit tests that cover parsing `BigInteger`, `Byte`, `Short`, `Integer`, `Long`, `Float`, `Double` types.
Reviewers: Konstantine Karantasis <k....@gmail.com>
---
.../java/org/apache/kafka/connect/data/Values.java | 8 +-
.../org/apache/kafka/connect/data/ValuesTest.java | 128 +++++++++++++++++++++
2 files changed, 135 insertions(+), 1 deletion(-)
diff --git a/connect/api/src/main/java/org/apache/kafka/connect/data/Values.java b/connect/api/src/main/java/org/apache/kafka/connect/data/Values.java
index 93c320a..4a50d42 100644
--- a/connect/api/src/main/java/org/apache/kafka/connect/data/Values.java
+++ b/connect/api/src/main/java/org/apache/kafka/connect/data/Values.java
@@ -891,8 +891,14 @@ public class Values {
} catch (ArithmeticException e) {
// continue
}
+ float fValue = decimal.floatValue();
+ if (fValue != Float.NEGATIVE_INFINITY && fValue != Float.POSITIVE_INFINITY
+ && decimal.scale() != 0) {
+ return new SchemaAndValue(Schema.FLOAT32_SCHEMA, fValue);
+ }
double dValue = decimal.doubleValue();
- if (dValue != Double.NEGATIVE_INFINITY && dValue != Double.POSITIVE_INFINITY) {
+ if (dValue != Double.NEGATIVE_INFINITY && dValue != Double.POSITIVE_INFINITY
+ && decimal.scale() != 0) {
return new SchemaAndValue(Schema.FLOAT64_SCHEMA, dValue);
}
Schema schema = Decimal.schema(decimal.scale());
diff --git a/connect/api/src/test/java/org/apache/kafka/connect/data/ValuesTest.java b/connect/api/src/test/java/org/apache/kafka/connect/data/ValuesTest.java
index a5909f3..2d339cf 100644
--- a/connect/api/src/test/java/org/apache/kafka/connect/data/ValuesTest.java
+++ b/connect/api/src/test/java/org/apache/kafka/connect/data/ValuesTest.java
@@ -21,6 +21,8 @@ import org.apache.kafka.connect.data.Values.Parser;
import org.apache.kafka.connect.errors.DataException;
import org.junit.Test;
+import java.math.BigDecimal;
+import java.math.BigInteger;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -571,6 +573,132 @@ public class ValuesTest {
public void canConsume() {
}
+ @Test
+ public void shouldParseBigIntegerAsDecimalWithZeroScale() {
+ BigInteger value = BigInteger.valueOf(Long.MAX_VALUE).add(new BigInteger("1"));
+ SchemaAndValue schemaAndValue = Values.parseString(
+ String.valueOf(value)
+ );
+ assertEquals(Decimal.schema(0), schemaAndValue.schema());
+ assertTrue(schemaAndValue.value() instanceof BigDecimal);
+ assertEquals(value, ((BigDecimal) schemaAndValue.value()).unscaledValue());
+ value = BigInteger.valueOf(Long.MIN_VALUE).subtract(new BigInteger("1"));
+ schemaAndValue = Values.parseString(
+ String.valueOf(value)
+ );
+ assertEquals(Decimal.schema(0), schemaAndValue.schema());
+ assertTrue(schemaAndValue.value() instanceof BigDecimal);
+ assertEquals(value, ((BigDecimal) schemaAndValue.value()).unscaledValue());
+ }
+
+ @Test
+ public void shouldParseByteAsInt8() {
+ Byte value = Byte.MAX_VALUE;
+ SchemaAndValue schemaAndValue = Values.parseString(
+ String.valueOf(value)
+ );
+ assertEquals(Schema.INT8_SCHEMA, schemaAndValue.schema());
+ assertTrue(schemaAndValue.value() instanceof Byte);
+ assertEquals(value.byteValue(), ((Byte) schemaAndValue.value()).byteValue());
+ value = Byte.MIN_VALUE;
+ schemaAndValue = Values.parseString(
+ String.valueOf(value)
+ );
+ assertEquals(Schema.INT8_SCHEMA, schemaAndValue.schema());
+ assertTrue(schemaAndValue.value() instanceof Byte);
+ assertEquals(value.byteValue(), ((Byte) schemaAndValue.value()).byteValue());
+ }
+
+ @Test
+ public void shouldParseShortAsInt16() {
+ Short value = Short.MAX_VALUE;
+ SchemaAndValue schemaAndValue = Values.parseString(
+ String.valueOf(value)
+ );
+ assertEquals(Schema.INT16_SCHEMA, schemaAndValue.schema());
+ assertTrue(schemaAndValue.value() instanceof Short);
+ assertEquals(value.shortValue(), ((Short) schemaAndValue.value()).shortValue());
+ value = Short.MIN_VALUE;
+ schemaAndValue = Values.parseString(
+ String.valueOf(value)
+ );
+ assertEquals(Schema.INT16_SCHEMA, schemaAndValue.schema());
+ assertTrue(schemaAndValue.value() instanceof Short);
+ assertEquals(value.shortValue(), ((Short) schemaAndValue.value()).shortValue());
+ }
+
+ @Test
+ public void shouldParseIntegerAsInt32() {
+ Integer value = Integer.MAX_VALUE;
+ SchemaAndValue schemaAndValue = Values.parseString(
+ String.valueOf(value)
+ );
+ assertEquals(Schema.INT32_SCHEMA, schemaAndValue.schema());
+ assertTrue(schemaAndValue.value() instanceof Integer);
+ assertEquals(value.intValue(), ((Integer) schemaAndValue.value()).intValue());
+ value = Integer.MIN_VALUE;
+ schemaAndValue = Values.parseString(
+ String.valueOf(value)
+ );
+ assertEquals(Schema.INT32_SCHEMA, schemaAndValue.schema());
+ assertTrue(schemaAndValue.value() instanceof Integer);
+ assertEquals(value.intValue(), ((Integer) schemaAndValue.value()).intValue());
+ }
+
+ @Test
+ public void shouldParseLongAsInt64() {
+ Long value = Long.MAX_VALUE;
+ SchemaAndValue schemaAndValue = Values.parseString(
+ String.valueOf(value)
+ );
+ assertEquals(Schema.INT64_SCHEMA, schemaAndValue.schema());
+ assertTrue(schemaAndValue.value() instanceof Long);
+ assertEquals(value.longValue(), ((Long) schemaAndValue.value()).longValue());
+ value = Long.MIN_VALUE;
+ schemaAndValue = Values.parseString(
+ String.valueOf(value)
+ );
+ assertEquals(Schema.INT64_SCHEMA, schemaAndValue.schema());
+ assertTrue(schemaAndValue.value() instanceof Long);
+ assertEquals(value.longValue(), ((Long) schemaAndValue.value()).longValue());
+ }
+
+ @Test
+ public void shouldParseFloatAsFloat32() {
+ Float value = Float.MAX_VALUE;
+ SchemaAndValue schemaAndValue = Values.parseString(
+ String.valueOf(value)
+ );
+ assertEquals(Schema.FLOAT32_SCHEMA, schemaAndValue.schema());
+ assertTrue(schemaAndValue.value() instanceof Float);
+ assertEquals(value.floatValue(), ((Float) schemaAndValue.value()).floatValue(), 0);
+ value = -Float.MAX_VALUE;
+ schemaAndValue = Values.parseString(
+ String.valueOf(value)
+ );
+ assertEquals(Schema.FLOAT32_SCHEMA, schemaAndValue.schema());
+ assertTrue(schemaAndValue.value() instanceof Float);
+ assertEquals(value.floatValue(), ((Float) schemaAndValue.value()).floatValue(), 0);
+ }
+
+ @Test
+ public void shouldParseDoubleAsFloat64() {
+ Double value = Double.MAX_VALUE;
+ SchemaAndValue schemaAndValue = Values.parseString(
+ String.valueOf(value)
+ );
+ assertEquals(Schema.FLOAT64_SCHEMA, schemaAndValue.schema());
+ assertTrue(schemaAndValue.value() instanceof Double);
+ assertEquals(value.doubleValue(), ((Double) schemaAndValue.value()).doubleValue(), 0);
+ value = -Double.MAX_VALUE;
+ schemaAndValue = Values.parseString(
+ String.valueOf(value)
+ );
+ assertEquals(Schema.FLOAT64_SCHEMA, schemaAndValue.schema());
+ assertTrue(schemaAndValue.value() instanceof Double);
+ assertEquals(value.doubleValue(), ((Double) schemaAndValue.value()).doubleValue(), 0);
+ }
+
protected void assertParsed(String input) {
assertParsed(input, input);
}