You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ma...@apache.org on 2022/08/01 10:02:26 UTC
[flink] branch master updated: [FLINK-26270][Formats][CSV] Flink SQL write data to kafka by CSV format , whether decimal type was converted to scientific notation. This closes #20127
This is an automated email from the ASF dual-hosted git repository.
martijnvisser pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 09747f999e5 [FLINK-26270][Formats][CSV] Flink SQL write data to kafka by CSV format , whether decimal type was converted to scientific notation. This closes #20127
09747f999e5 is described below
commit 09747f999e54b7921e8c12c944b941b0777be48f
Author: fengjiankun <fe...@360.cn>
AuthorDate: Fri Jul 1 17:12:20 2022 +0800
[FLINK-26270][Formats][CSV] Flink SQL write data to kafka by CSV format , whether decimal type was converted to scientific notation. This closes #20127
---
.../docs/connectors/table/formats/csv.md | 7 +++
docs/content/docs/connectors/table/formats/csv.md | 8 +++
.../org/apache/flink/formats/csv/CsvCommons.java | 3 ++
.../apache/flink/formats/csv/CsvFormatFactory.java | 5 ++
.../apache/flink/formats/csv/CsvFormatOptions.java | 7 +++
.../formats/csv/CsvRowDataSerializationSchema.java | 15 ++++--
.../flink/formats/csv/CsvFormatFactoryTest.java | 58 ++++++++++++++++++++++
7 files changed, 100 insertions(+), 3 deletions(-)
diff --git a/docs/content.zh/docs/connectors/table/formats/csv.md b/docs/content.zh/docs/connectors/table/formats/csv.md
index 417af014a47..0c93e8e188b 100644
--- a/docs/content.zh/docs/connectors/table/formats/csv.md
+++ b/docs/content.zh/docs/connectors/table/formats/csv.md
@@ -141,6 +141,13 @@ Format 参数
<td>String</td>
<td>是否将 "null" 字符串转化为 null 值。</td>
</tr>
+ <tr>
+ <td><h5>csv.write-bigdecimal-in-scientific-notation</h5></td>
+ <td>可选</td>
+ <td style="word-wrap: break-word;">true</td>
+ <td>Boolean</td>
+ <td>设置将 Bigdecimal 类型的数据表示为科学计数法(默认为true,即需要转为科学计数法),例如一个BigDecimal的值为100000,设置true,结果为 '1E+5';设置为false,结果为 100000。注意:只有当值不等于0且是10的倍数才会转为科学计数法</td>
+ </tr>
</tbody>
</table>
diff --git a/docs/content/docs/connectors/table/formats/csv.md b/docs/content/docs/connectors/table/formats/csv.md
index e7cc54999a8..27389cc528e 100644
--- a/docs/content/docs/connectors/table/formats/csv.md
+++ b/docs/content/docs/connectors/table/formats/csv.md
@@ -152,6 +152,14 @@ Format Options
<td>String</td>
<td>Null literal string that is interpreted as a null value (disabled by default).</td>
</tr>
+ <tr>
+ <td><h5>csv.write-bigdecimal-in-scientific-notation</h5></td>
+ <td>optional</td>
+ <td>yes</td>
+ <td style="word-wrap: break-word;">true</td>
+ <td>Boolean</td>
+ <td>Enables representation of the BigDecimal data type in scientific notation (default is true). For example, 100000 is encoded as '1E+5' by default, and will be written as 100000 if this option is set to false. Note: only values that are non-zero and a multiple of 10 are converted to scientific notation.</td>
+ </tr>
</tbody>
</table>
diff --git a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvCommons.java b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvCommons.java
index 48825748f82..7d15bfb985b 100644
--- a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvCommons.java
+++ b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvCommons.java
@@ -35,6 +35,7 @@ import static org.apache.flink.formats.csv.CsvFormatOptions.FIELD_DELIMITER;
import static org.apache.flink.formats.csv.CsvFormatOptions.IGNORE_PARSE_ERRORS;
import static org.apache.flink.formats.csv.CsvFormatOptions.NULL_LITERAL;
import static org.apache.flink.formats.csv.CsvFormatOptions.QUOTE_CHARACTER;
+import static org.apache.flink.formats.csv.CsvFormatOptions.WRITE_BIGDECIMAL_IN_SCIENTIFIC_NOTATION;
/** A class with common CSV format constants and utility methods. */
class CsvCommons {
@@ -100,6 +101,7 @@ class CsvCommons {
options.add(ARRAY_ELEMENT_DELIMITER);
options.add(ESCAPE_CHARACTER);
options.add(NULL_LITERAL);
+ options.add(WRITE_BIGDECIMAL_IN_SCIENTIFIC_NOTATION);
return options;
}
@@ -112,6 +114,7 @@ class CsvCommons {
options.add(ARRAY_ELEMENT_DELIMITER);
options.add(ESCAPE_CHARACTER);
options.add(NULL_LITERAL);
+ options.add(WRITE_BIGDECIMAL_IN_SCIENTIFIC_NOTATION);
return options;
}
}
diff --git a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvFormatFactory.java b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvFormatFactory.java
index 49b16eda125..2db1a9f695c 100644
--- a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvFormatFactory.java
+++ b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvFormatFactory.java
@@ -52,6 +52,7 @@ import static org.apache.flink.formats.csv.CsvFormatOptions.FIELD_DELIMITER;
import static org.apache.flink.formats.csv.CsvFormatOptions.IGNORE_PARSE_ERRORS;
import static org.apache.flink.formats.csv.CsvFormatOptions.NULL_LITERAL;
import static org.apache.flink.formats.csv.CsvFormatOptions.QUOTE_CHARACTER;
+import static org.apache.flink.formats.csv.CsvFormatOptions.WRITE_BIGDECIMAL_IN_SCIENTIFIC_NOTATION;
/**
* Format factory for providing configured instances of CSV to RowData {@link SerializationSchema}
@@ -206,5 +207,9 @@ public final class CsvFormatFactory
.ifPresent(schemaBuilder::setEscapeCharacter);
formatOptions.getOptional(NULL_LITERAL).ifPresent(schemaBuilder::setNullLiteral);
+
+ formatOptions
+ .getOptional(WRITE_BIGDECIMAL_IN_SCIENTIFIC_NOTATION)
+ .ifPresent(schemaBuilder::setWriteBigDecimalInScientificNotation);
}
}
diff --git a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvFormatOptions.java b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvFormatOptions.java
index 459e7feef4f..c9beae1ca7e 100644
--- a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvFormatOptions.java
+++ b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvFormatOptions.java
@@ -87,5 +87,12 @@ public class CsvFormatOptions {
"Optional null literal string that is interpreted as a\n"
+ "null value (disabled by default)");
+ public static final ConfigOption<Boolean> WRITE_BIGDECIMAL_IN_SCIENTIFIC_NOTATION =
+ ConfigOptions.key("write-bigdecimal-in-scientific-notation")
+ .booleanType()
+ .defaultValue(true)
+ .withDescription(
+ "Enables representation of the BigDecimal data type in scientific notation (default is true). For example, 100000 is encoded as '1E+5' by default, and will be written as 100000 if this option is set to false. Note: only values that are non-zero and a multiple of 10 are converted to scientific notation.");
+
private CsvFormatOptions() {}
}
diff --git a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDataSerializationSchema.java b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDataSerializationSchema.java
index 16d7c966a52..25673fcc3d8 100644
--- a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDataSerializationSchema.java
+++ b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDataSerializationSchema.java
@@ -24,6 +24,7 @@ import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.Preconditions;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectWriter;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
@@ -70,10 +71,11 @@ public final class CsvRowDataSerializationSchema implements SerializationSchema<
.RowDataToCsvFormatConverterContext
converterContext;
- private CsvRowDataSerializationSchema(RowType rowType, CsvSchema csvSchema) {
+ private CsvRowDataSerializationSchema(
+ RowType rowType, CsvSchema csvSchema, CsvMapper csvMapper) {
this.rowType = rowType;
this.runtimeConverter = RowDataToCsvConverters.createRowConverter(rowType);
- this.csvMapper = new CsvMapper();
+ this.csvMapper = csvMapper;
this.csvSchema = csvSchema.withLineSeparator("");
this.objectWriter = csvMapper.writer(this.csvSchema);
}
@@ -84,6 +86,7 @@ public final class CsvRowDataSerializationSchema implements SerializationSchema<
private final RowType rowType;
private CsvSchema csvSchema;
+ private CsvMapper csvMapper;
/**
* Creates a {@link CsvRowDataSerializationSchema} expecting the given {@link RowType}.
@@ -95,6 +98,7 @@ public final class CsvRowDataSerializationSchema implements SerializationSchema<
this.rowType = rowType;
this.csvSchema = CsvRowSchemaConverter.convert(rowType);
+ this.csvMapper = new CsvMapper();
}
public Builder setFieldDelimiter(char c) {
@@ -128,8 +132,13 @@ public final class CsvRowDataSerializationSchema implements SerializationSchema<
return this;
}
+ public void setWriteBigDecimalInScientificNotation(boolean isScientificNotation) {
+ this.csvMapper.configure(
+ JsonGenerator.Feature.WRITE_BIGDECIMAL_AS_PLAIN, !isScientificNotation);
+ }
+
public CsvRowDataSerializationSchema build() {
- return new CsvRowDataSerializationSchema(rowType, csvSchema);
+ return new CsvRowDataSerializationSchema(rowType, csvSchema, csvMapper);
}
}
diff --git a/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvFormatFactoryTest.java b/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvFormatFactoryTest.java
index 6edde34a947..468cc8ff959 100644
--- a/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvFormatFactoryTest.java
+++ b/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvFormatFactoryTest.java
@@ -20,7 +20,10 @@ package org.apache.flink.formats.csv;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.connector.Projection;
import org.apache.flink.table.connector.format.ProjectableDecodingFormat;
import org.apache.flink.table.connector.sink.DynamicTableSink;
@@ -37,6 +40,7 @@ import org.junit.Test;
import org.junit.rules.ExpectedException;
import java.io.IOException;
+import java.math.BigDecimal;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
@@ -45,6 +49,7 @@ import java.util.Map;
import java.util.function.Consumer;
import static org.apache.flink.core.testutils.FlinkMatchers.containsCause;
+import static org.apache.flink.table.data.DecimalData.fromBigDecimal;
import static org.apache.flink.table.data.StringData.fromString;
import static org.apache.flink.table.factories.utils.FactoryMocks.PHYSICAL_DATA_TYPE;
import static org.apache.flink.table.factories.utils.FactoryMocks.PHYSICAL_TYPE;
@@ -230,6 +235,58 @@ public class CsvFormatFactoryTest extends TestLogger {
createTableSink(SCHEMA, options);
}
+ @Test
+ public void testSerializationWithWriteBigDecimalInScientificNotation() {
+ final Map<String, String> options =
+ getModifiedOptions(
+ opts -> opts.put("csv.write-bigdecimal-in-scientific-notation", "true"));
+
+ ResolvedSchema schema =
+ ResolvedSchema.of(
+ Column.physical("a", DataTypes.STRING()),
+ Column.physical("b", DataTypes.DECIMAL(10, 3)),
+ Column.physical("c", DataTypes.BOOLEAN()));
+ final DynamicTableSink actualSink = createTableSink(schema, options);
+ assertThat(actualSink).isInstanceOf(TestDynamicTableFactory.DynamicTableSinkMock.class);
+ TestDynamicTableFactory.DynamicTableSinkMock sinkMock =
+ (TestDynamicTableFactory.DynamicTableSinkMock) actualSink;
+
+ SerializationSchema<RowData> runtimeEncoder =
+ sinkMock.valueFormat.createRuntimeEncoder(null, schema.toPhysicalRowDataType());
+
+ RowData rowData =
+ GenericRowData.of(
+ fromString("abc"), fromBigDecimal(new BigDecimal("100000"), 10, 3), false);
+ byte[] bytes = runtimeEncoder.serialize(rowData);
+ assertThat(new String(bytes)).isEqualTo("abc;'1E+5';false");
+ }
+
+ @Test
+ public void testSerializationWithNotWriteBigDecimalInScientificNotation() {
+ final Map<String, String> options =
+ getModifiedOptions(
+ opts -> opts.put("csv.write-bigdecimal-in-scientific-notation", "false"));
+
+ ResolvedSchema schema =
+ ResolvedSchema.of(
+ Column.physical("a", DataTypes.STRING()),
+ Column.physical("b", DataTypes.DECIMAL(10, 3)),
+ Column.physical("c", DataTypes.BOOLEAN()));
+ final DynamicTableSink actualSink = createTableSink(schema, options);
+ assertThat(actualSink).isInstanceOf(TestDynamicTableFactory.DynamicTableSinkMock.class);
+ TestDynamicTableFactory.DynamicTableSinkMock sinkMock =
+ (TestDynamicTableFactory.DynamicTableSinkMock) actualSink;
+
+ SerializationSchema<RowData> runtimeEncoder =
+ sinkMock.valueFormat.createRuntimeEncoder(null, schema.toPhysicalRowDataType());
+
+ RowData rowData =
+ GenericRowData.of(
+ fromString("abc"), fromBigDecimal(new BigDecimal("100000"), 10, 3), false);
+ byte[] bytes = runtimeEncoder.serialize(rowData);
+ assertThat(new String(bytes)).isEqualTo("abc;'100000';false");
+ }
+
@Test
public void testProjectionPushdown() throws IOException {
final Map<String, String> options = getAllOptions();
@@ -311,6 +368,7 @@ public class CsvFormatFactoryTest extends TestLogger {
options.put("csv.array-element-delimiter", "|");
options.put("csv.escape-character", "\\");
options.put("csv.null-literal", "n/a");
+ options.put("csv.write-bigdecimal-in-scientific-notation", "true");
return options;
}