You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by ji...@apache.org on 2022/04/15 03:01:50 UTC
[incubator-doris-flink-connector] branch branch-for-flink-before-1.13 updated: [Bug-1.13] Fix row type decimal convert bug (#27)
This is an automated email from the ASF dual-hosted git repository.
jiafengzheng pushed a commit to branch branch-for-flink-before-1.13
in repository https://gitbox.apache.org/repos/asf/incubator-doris-flink-connector.git
The following commit(s) were added to refs/heads/branch-for-flink-before-1.13 by this push:
new 2ef45b8 [Bug-1.13] Fix row type decimal convert bug (#27)
2ef45b8 is described below
commit 2ef45b84b96405715cef09aec048573fd018bd19
Author: aiwenmo <32...@users.noreply.github.com>
AuthorDate: Fri Apr 15 11:01:45 2022 +0800
[Bug-1.13] Fix row type decimal convert bug (#27)
* [Bug-1.13] Fix row type decimal convert bug
---
.../apache/doris/flink/serialization/RowBatch.java | 6 +-
.../doris/flink/table/DorisDynamicTableSource.java | 5 +-
.../doris/flink/table/DorisRowDataInputFormat.java | 46 +++++-
.../doris/flink/serialization/TestRowBatch.java | 160 +++++++++++----------
4 files changed, 128 insertions(+), 89 deletions(-)
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/serialization/RowBatch.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/serialization/RowBatch.java
index 3337637..827ec81 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/serialization/RowBatch.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/serialization/RowBatch.java
@@ -37,8 +37,6 @@ import org.apache.doris.flink.exception.DorisException;
import org.apache.doris.flink.rest.models.Schema;
import org.apache.doris.thrift.TScanBatchResult;
-import org.apache.flink.table.data.DecimalData;
-import org.apache.flink.table.data.StringData;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -243,7 +241,7 @@ public class RowBatch {
continue;
}
BigDecimal value = decimalVector.getObject(rowIndex).stripTrailingZeros();
- addValueToRow(rowIndex, DecimalData.fromBigDecimal(value, value.precision(), value.scale()));
+ addValueToRow(rowIndex, value);
}
break;
case "DATE":
@@ -261,7 +259,7 @@ public class RowBatch {
continue;
}
String value = new String(varCharVector.get(rowIndex));
- addValueToRow(rowIndex, StringData.fromString(value));
+ addValueToRow(rowIndex, value);
}
break;
default:
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java
index 0262677..689aa47 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java
@@ -33,6 +33,8 @@ import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown;
import org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown;
import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -80,7 +82,8 @@ public final class DorisDynamicTableSource implements ScanTableSource, LookupTab
.setPassword(options.getPassword())
.setTableIdentifier(options.getTableIdentifier())
.setPartitions(dorisPartitions)
- .setReadOptions(readOptions);
+ .setReadOptions(readOptions)
+ .setRowType((RowType) physicalSchema.toRowDataType().getLogicalType());
return InputFormatProvider.of(builder.build());
}
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisRowDataInputFormat.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisRowDataInputFormat.java
index c75a88f..eeb63ba 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisRowDataInputFormat.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisRowDataInputFormat.java
@@ -14,6 +14,7 @@
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
+
package org.apache.doris.flink.table;
import org.apache.doris.flink.cfg.DorisOptions;
@@ -29,16 +30,23 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.io.InputSplitAssigner;
+import org.apache.flink.table.data.DecimalData;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
import java.io.IOException;
+import java.math.BigDecimal;
import java.sql.PreparedStatement;
import java.util.ArrayList;
import java.util.List;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
/**
* InputFormat for {@link DorisDynamicTableSource}.
*/
@@ -56,10 +64,13 @@ public class DorisRowDataInputFormat extends RichInputFormat<RowData, DorisTable
private ScalaValueReader scalaValueReader;
private transient boolean hasNext;
- public DorisRowDataInputFormat(DorisOptions options, List<PartitionDefinition> dorisPartitions, DorisReadOptions readOptions) {
+ private RowType rowType;
+
+ public DorisRowDataInputFormat(DorisOptions options, List<PartitionDefinition> dorisPartitions, DorisReadOptions readOptions, RowType rowType) {
this.options = options;
this.dorisPartitions = dorisPartitions;
this.readOptions = readOptions;
+ this.rowType = rowType;
}
@Override
@@ -136,15 +147,30 @@ public class DorisRowDataInputFormat extends RichInputFormat<RowData, DorisTable
return null;
}
List next = (List) scalaValueReader.next();
- GenericRowData genericRowData = new GenericRowData(next.size());
- for (int i = 0; i < next.size(); i++) {
- genericRowData.setField(i, next.get(i));
+ GenericRowData genericRowData = new GenericRowData(rowType.getFieldCount());
+ for (int i = 0; i < next.size() && i < rowType.getFieldCount(); i++) {
+ Object value = deserialize(rowType.getTypeAt(i), next.get(i));
+ genericRowData.setField(i, value);
}
//update hasNext after we've read the record
hasNext = scalaValueReader.hasNext();
return genericRowData;
}
+ private Object deserialize(LogicalType type, Object val) {
+ switch (type.getTypeRoot()) {
+ case DECIMAL:
+ final DecimalType decimalType = ((DecimalType) type);
+ final int precision = decimalType.getPrecision();
+ final int scala = decimalType.getScale();
+ return DecimalData.fromBigDecimal((BigDecimal) val, precision, scala);
+ case VARCHAR:
+ return StringData.fromString((String) val);
+ default:
+ return val;
+ }
+ }
+
@Override
public BaseStatistics getStatistics(BaseStatistics cachedStatistics) throws IOException {
return cachedStatistics;
@@ -182,6 +208,7 @@ public class DorisRowDataInputFormat extends RichInputFormat<RowData, DorisTable
private DorisOptions.Builder optionsBuilder;
private List<PartitionDefinition> partitions;
private DorisReadOptions readOptions;
+ private RowType rowType;
public Builder() {
@@ -218,9 +245,14 @@ public class DorisRowDataInputFormat extends RichInputFormat<RowData, DorisTable
return this;
}
+ public Builder setRowType(RowType rowType) {
+ this.rowType = rowType;
+ return this;
+ }
+
public DorisRowDataInputFormat build() {
return new DorisRowDataInputFormat(
- optionsBuilder.build(), partitions, readOptions
+ optionsBuilder.build(), partitions, readOptions, rowType
);
}
}
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/serialization/TestRowBatch.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/serialization/TestRowBatch.java
index 0f45aaa..f2bf878 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/serialization/TestRowBatch.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/serialization/TestRowBatch.java
@@ -44,7 +44,7 @@ import org.apache.doris.thrift.TStatusCode;
import org.apache.flink.shaded.guava18.com.google.common.collect.ImmutableList;
import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
import org.apache.flink.table.data.DecimalData;
-import org.apache.flink.table.data.StringData;
+
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
@@ -79,23 +79,23 @@ public class TestRowBatch {
childrenBuilder.add(new Field("k8", FieldType.nullable(new ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE)), null));
childrenBuilder.add(new Field("k10", FieldType.nullable(new ArrowType.Utf8()), null));
childrenBuilder.add(new Field("k11", FieldType.nullable(new ArrowType.Utf8()), null));
- childrenBuilder.add(new Field("k5", FieldType.nullable(new ArrowType.Decimal(9,2)), null));
+ childrenBuilder.add(new Field("k5", FieldType.nullable(new ArrowType.Decimal(9, 2)), null));
childrenBuilder.add(new Field("k6", FieldType.nullable(new ArrowType.Utf8()), null));
VectorSchemaRoot root = VectorSchemaRoot.create(
- new org.apache.arrow.vector.types.pojo.Schema(childrenBuilder.build(), null),
- new RootAllocator(Integer.MAX_VALUE));
+ new org.apache.arrow.vector.types.pojo.Schema(childrenBuilder.build(), null),
+ new RootAllocator(Integer.MAX_VALUE));
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
ArrowStreamWriter arrowStreamWriter = new ArrowStreamWriter(
- root,
- new DictionaryProvider.MapDictionaryProvider(),
- outputStream);
+ root,
+ new DictionaryProvider.MapDictionaryProvider(),
+ outputStream);
arrowStreamWriter.start();
root.setRowCount(3);
FieldVector vector = root.getVector("k0");
- BitVector bitVector = (BitVector)vector;
+ BitVector bitVector = (BitVector) vector;
bitVector.setInitialCapacity(3);
bitVector.allocateNew(3);
bitVector.setSafe(0, 1);
@@ -104,7 +104,7 @@ public class TestRowBatch {
vector.setValueCount(3);
vector = root.getVector("k1");
- TinyIntVector tinyIntVector = (TinyIntVector)vector;
+ TinyIntVector tinyIntVector = (TinyIntVector) vector;
tinyIntVector.setInitialCapacity(3);
tinyIntVector.allocateNew(3);
tinyIntVector.setSafe(0, 1);
@@ -113,7 +113,7 @@ public class TestRowBatch {
vector.setValueCount(3);
vector = root.getVector("k2");
- SmallIntVector smallIntVector = (SmallIntVector)vector;
+ SmallIntVector smallIntVector = (SmallIntVector) vector;
smallIntVector.setInitialCapacity(3);
smallIntVector.allocateNew(3);
smallIntVector.setSafe(0, 1);
@@ -122,7 +122,7 @@ public class TestRowBatch {
vector.setValueCount(3);
vector = root.getVector("k3");
- IntVector intVector = (IntVector)vector;
+ IntVector intVector = (IntVector) vector;
intVector.setInitialCapacity(3);
intVector.allocateNew(3);
intVector.setSafe(0, 1);
@@ -131,7 +131,7 @@ public class TestRowBatch {
vector.setValueCount(3);
vector = root.getVector("k4");
- BigIntVector bigIntVector = (BigIntVector)vector;
+ BigIntVector bigIntVector = (BigIntVector) vector;
bigIntVector.setInitialCapacity(3);
bigIntVector.allocateNew(3);
bigIntVector.setSafe(0, 1);
@@ -140,7 +140,7 @@ public class TestRowBatch {
vector.setValueCount(3);
vector = root.getVector("k5");
- DecimalVector decimalVector = (DecimalVector)vector;
+ DecimalVector decimalVector = (DecimalVector) vector;
decimalVector.setInitialCapacity(3);
decimalVector.allocateNew();
decimalVector.setIndexDefined(0);
@@ -152,7 +152,7 @@ public class TestRowBatch {
vector.setValueCount(3);
vector = root.getVector("k6");
- VarCharVector charVector = (VarCharVector)vector;
+ VarCharVector charVector = (VarCharVector) vector;
charVector.setInitialCapacity(3);
charVector.allocateNew();
charVector.setIndexDefined(0);
@@ -167,7 +167,7 @@ public class TestRowBatch {
vector.setValueCount(3);
vector = root.getVector("k8");
- Float8Vector float8Vector = (Float8Vector)vector;
+ Float8Vector float8Vector = (Float8Vector) vector;
float8Vector.setInitialCapacity(3);
float8Vector.allocateNew(3);
float8Vector.setSafe(0, 1.1);
@@ -176,7 +176,7 @@ public class TestRowBatch {
vector.setValueCount(3);
vector = root.getVector("k9");
- Float4Vector float4Vector = (Float4Vector)vector;
+ Float4Vector float4Vector = (Float4Vector) vector;
float4Vector.setInitialCapacity(3);
float4Vector.allocateNew(3);
float4Vector.setSafe(0, 1.1f);
@@ -185,7 +185,7 @@ public class TestRowBatch {
vector.setValueCount(3);
vector = root.getVector("k10");
- VarCharVector datecharVector = (VarCharVector)vector;
+ VarCharVector datecharVector = (VarCharVector) vector;
datecharVector.setInitialCapacity(3);
datecharVector.allocateNew();
datecharVector.setIndexDefined(0);
@@ -200,7 +200,7 @@ public class TestRowBatch {
vector.setValueCount(3);
vector = root.getVector("k11");
- VarCharVector timecharVector = (VarCharVector)vector;
+ VarCharVector timecharVector = (VarCharVector) vector;
timecharVector.setInitialCapacity(3);
timecharVector.allocateNew();
timecharVector.setIndexDefined(0);
@@ -227,71 +227,74 @@ public class TestRowBatch {
scanBatchResult.setRows(outputStream.toByteArray());
String schemaStr = "{\"properties\":[{\"type\":\"BOOLEAN\",\"name\":\"k0\",\"comment\":\"\"},"
- + "{\"type\":\"TINYINT\",\"name\":\"k1\",\"comment\":\"\"},{\"type\":\"SMALLINT\",\"name\":\"k2\","
- + "\"comment\":\"\"},{\"type\":\"INT\",\"name\":\"k3\",\"comment\":\"\"},{\"type\":\"BIGINT\","
- + "\"name\":\"k4\",\"comment\":\"\"},{\"type\":\"FLOAT\",\"name\":\"k9\",\"comment\":\"\"},"
- + "{\"type\":\"DOUBLE\",\"name\":\"k8\",\"comment\":\"\"},{\"type\":\"DATE\",\"name\":\"k10\","
- + "\"comment\":\"\"},{\"type\":\"DATETIME\",\"name\":\"k11\",\"comment\":\"\"},"
- + "{\"name\":\"k5\",\"scale\":\"0\",\"comment\":\"\","
- + "\"type\":\"DECIMAL\",\"precision\":\"9\",\"aggregation_type\":\"\"},{\"type\":\"CHAR\",\"name\":\"k6\",\"comment\":\"\",\"aggregation_type\":\"REPLACE_IF_NOT_NULL\"}],"
- + "\"status\":200}";
+ + "{\"type\":\"TINYINT\",\"name\":\"k1\",\"comment\":\"\"},{\"type\":\"SMALLINT\",\"name\":\"k2\","
+ + "\"comment\":\"\"},{\"type\":\"INT\",\"name\":\"k3\",\"comment\":\"\"},{\"type\":\"BIGINT\","
+ + "\"name\":\"k4\",\"comment\":\"\"},{\"type\":\"FLOAT\",\"name\":\"k9\",\"comment\":\"\"},"
+ + "{\"type\":\"DOUBLE\",\"name\":\"k8\",\"comment\":\"\"},{\"type\":\"DATE\",\"name\":\"k10\","
+ + "\"comment\":\"\"},{\"type\":\"DATETIME\",\"name\":\"k11\",\"comment\":\"\"},"
+ + "{\"name\":\"k5\",\"scale\":\"0\",\"comment\":\"\","
+ + "\"type\":\"DECIMAL\",\"precision\":\"9\",\"aggregation_type\":\"\"},{\"type\":\"CHAR\",\"name\":\"k6\",\"comment\":\"\",\"aggregation_type\":\"REPLACE_IF_NOT_NULL\"}],"
+ + "\"status\":200}";
Schema schema = RestService.parseSchema(schemaStr, logger);
RowBatch rowBatch = new RowBatch(scanBatchResult, schema).readArrow();
List<Object> expectedRow1 = Lists.newArrayList(
- Boolean.TRUE,
- (byte) 1,
- (short) 1,
- 1,
- 1L,
- (float) 1.1,
- (double) 1.1,
- StringData.fromString("2008-08-08"),
- StringData.fromString("2008-08-08 00:00:00"),
- DecimalData.fromBigDecimal(new BigDecimal(12.34), 4, 2),
- StringData.fromString("char1")
+ Boolean.TRUE,
+ (byte) 1,
+ (short) 1,
+ 1,
+ 1L,
+ (float) 1.1,
+ (double) 1.1,
+ "2008-08-08",
+ "2008-08-08 00:00:00",
+ DecimalData.fromBigDecimal(new BigDecimal(12.34), 4, 2),
+ "char1"
);
List<Object> expectedRow2 = Arrays.asList(
- Boolean.FALSE,
- (byte) 2,
- (short) 2,
- null,
- 2L,
- (float) 2.2,
- (double) 2.2,
- StringData.fromString("1900-08-08"),
- StringData.fromString("1900-08-08 00:00:00"),
- DecimalData.fromBigDecimal(new BigDecimal(88.88), 4, 2),
- StringData.fromString("char2")
+ Boolean.FALSE,
+ (byte) 2,
+ (short) 2,
+ null,
+ 2L,
+ (float) 2.2,
+ (double) 2.2,
+ "1900-08-08",
+ "1900-08-08 00:00:00",
+ DecimalData.fromBigDecimal(new BigDecimal(88.88), 4, 2),
+ "char2"
);
List<Object> expectedRow3 = Arrays.asList(
- Boolean.TRUE,
- (byte) 3,
- (short) 3,
- 3,
- 3L,
- (float) 3.3,
- (double) 3.3,
- StringData.fromString("2100-08-08"),
- StringData.fromString("2100-08-08 00:00:00"),
- DecimalData.fromBigDecimal(new BigDecimal(10.22), 4, 2),
- StringData.fromString("char3")
+ Boolean.TRUE,
+ (byte) 3,
+ (short) 3,
+ 3,
+ 3L,
+ (float) 3.3,
+ (double) 3.3,
+ "2100-08-08",
+ "2100-08-08 00:00:00",
+ DecimalData.fromBigDecimal(new BigDecimal(10.22), 4, 2),
+ "char3"
);
Assert.assertTrue(rowBatch.hasNext());
List<Object> actualRow1 = rowBatch.next();
+ actualRow1.set(9, DecimalData.fromBigDecimal((BigDecimal) actualRow1.get(9), 4, 2));
Assert.assertEquals(expectedRow1, actualRow1);
Assert.assertTrue(rowBatch.hasNext());
List<Object> actualRow2 = rowBatch.next();
+ actualRow2.set(9, DecimalData.fromBigDecimal((BigDecimal) actualRow2.get(9), 4, 2));
Assert.assertEquals(expectedRow2, actualRow2);
Assert.assertTrue(rowBatch.hasNext());
List<Object> actualRow3 = rowBatch.next();
+ actualRow3.set(9, DecimalData.fromBigDecimal((BigDecimal) actualRow3.get(9), 4, 2));
Assert.assertEquals(expectedRow3, actualRow3);
Assert.assertFalse(rowBatch.hasNext());
@@ -310,13 +313,13 @@ public class TestRowBatch {
childrenBuilder.add(new Field("k7", FieldType.nullable(new ArrowType.Binary()), null));
VectorSchemaRoot root = VectorSchemaRoot.create(
- new org.apache.arrow.vector.types.pojo.Schema(childrenBuilder.build(), null),
- new RootAllocator(Integer.MAX_VALUE));
+ new org.apache.arrow.vector.types.pojo.Schema(childrenBuilder.build(), null),
+ new RootAllocator(Integer.MAX_VALUE));
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
ArrowStreamWriter arrowStreamWriter = new ArrowStreamWriter(
- root,
- new DictionaryProvider.MapDictionaryProvider(),
- outputStream);
+ root,
+ new DictionaryProvider.MapDictionaryProvider(),
+ outputStream);
arrowStreamWriter.start();
root.setRowCount(3);
@@ -356,15 +359,15 @@ public class TestRowBatch {
Assert.assertTrue(rowBatch.hasNext());
List<Object> actualRow0 = rowBatch.next();
- Assert.assertArrayEquals(binaryRow0, (byte[])actualRow0.get(0));
+ Assert.assertArrayEquals(binaryRow0, (byte[]) actualRow0.get(0));
Assert.assertTrue(rowBatch.hasNext());
List<Object> actualRow1 = rowBatch.next();
- Assert.assertArrayEquals(binaryRow1, (byte[])actualRow1.get(0));
+ Assert.assertArrayEquals(binaryRow1, (byte[]) actualRow1.get(0));
Assert.assertTrue(rowBatch.hasNext());
List<Object> actualRow2 = rowBatch.next();
- Assert.assertArrayEquals(binaryRow2, (byte[])actualRow2.get(0));
+ Assert.assertArrayEquals(binaryRow2, (byte[]) actualRow2.get(0));
Assert.assertFalse(rowBatch.hasNext());
thrown.expect(NoSuchElementException.class);
@@ -378,13 +381,13 @@ public class TestRowBatch {
childrenBuilder.add(new Field("k7", FieldType.nullable(new ArrowType.Decimal(27, 9)), null));
VectorSchemaRoot root = VectorSchemaRoot.create(
- new org.apache.arrow.vector.types.pojo.Schema(childrenBuilder.build(), null),
- new RootAllocator(Integer.MAX_VALUE));
+ new org.apache.arrow.vector.types.pojo.Schema(childrenBuilder.build(), null),
+ new RootAllocator(Integer.MAX_VALUE));
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
ArrowStreamWriter arrowStreamWriter = new ArrowStreamWriter(
- root,
- new DictionaryProvider.MapDictionaryProvider(),
- outputStream);
+ root,
+ new DictionaryProvider.MapDictionaryProvider(),
+ outputStream);
arrowStreamWriter.start();
root.setRowCount(3);
@@ -411,8 +414,8 @@ public class TestRowBatch {
scanBatchResult.setRows(outputStream.toByteArray());
String schemaStr = "{\"properties\":[{\"type\":\"DECIMALV2\",\"scale\": 0,"
- + "\"precision\": 9, \"name\":\"k7\",\"comment\":\"\"}], "
- + "\"status\":200}";
+ + "\"precision\": 9, \"name\":\"k7\",\"comment\":\"\"}], "
+ + "\"status\":200}";
Schema schema = RestService.parseSchema(schemaStr, logger);
@@ -420,16 +423,19 @@ public class TestRowBatch {
Assert.assertTrue(rowBatch.hasNext());
List<Object> actualRow0 = rowBatch.next();
- Assert.assertEquals(DecimalData.fromBigDecimal(new BigDecimal(12.340000000), 11, 9), actualRow0.get(0));
+ Assert.assertEquals(DecimalData.fromBigDecimal(new BigDecimal(12.340000000), 11, 9),
+ DecimalData.fromBigDecimal((BigDecimal) actualRow0.get(0), 11, 9));
Assert.assertTrue(rowBatch.hasNext());
List<Object> actualRow1 = rowBatch.next();
- Assert.assertEquals(DecimalData.fromBigDecimal(new BigDecimal(88.880000000), 11, 9), actualRow1.get(0));
+ Assert.assertEquals(DecimalData.fromBigDecimal(new BigDecimal(88.880000000), 11, 9),
+ DecimalData.fromBigDecimal((BigDecimal) actualRow1.get(0), 11, 9));
Assert.assertTrue(rowBatch.hasNext());
List<Object> actualRow2 = rowBatch.next();
- Assert.assertEquals(DecimalData.fromBigDecimal(new BigDecimal(10.000000000),11, 9), actualRow2.get(0));
+ Assert.assertEquals(DecimalData.fromBigDecimal(new BigDecimal(10.000000000), 11, 9),
+ DecimalData.fromBigDecimal((BigDecimal) actualRow2.get(0), 11, 9));
Assert.assertFalse(rowBatch.hasNext());
thrown.expect(NoSuchElementException.class);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org