You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ma...@apache.org on 2022/08/01 10:02:26 UTC

[flink] branch master updated: [FLINK-26270][Formats][CSV] Flink SQL write data to kafka by CSV format , whether decimal type was converted to scientific notation. This closes #20127

This is an automated email from the ASF dual-hosted git repository.

martijnvisser pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 09747f999e5 [FLINK-26270][Formats][CSV] Flink SQL write data to kafka by CSV format , whether decimal type was converted to scientific notation. This closes #20127
09747f999e5 is described below

commit 09747f999e54b7921e8c12c944b941b0777be48f
Author: fengjiankun <fe...@360.cn>
AuthorDate: Fri Jul 1 17:12:20 2022 +0800

    [FLINK-26270][Formats][CSV] Flink SQL write data to kafka by CSV format , whether decimal type was converted to scientific notation. This closes #20127
---
 .../docs/connectors/table/formats/csv.md           |  7 +++
 docs/content/docs/connectors/table/formats/csv.md  |  8 +++
 .../org/apache/flink/formats/csv/CsvCommons.java   |  3 ++
 .../apache/flink/formats/csv/CsvFormatFactory.java |  5 ++
 .../apache/flink/formats/csv/CsvFormatOptions.java |  7 +++
 .../formats/csv/CsvRowDataSerializationSchema.java | 15 ++++--
 .../flink/formats/csv/CsvFormatFactoryTest.java    | 58 ++++++++++++++++++++++
 7 files changed, 100 insertions(+), 3 deletions(-)

diff --git a/docs/content.zh/docs/connectors/table/formats/csv.md b/docs/content.zh/docs/connectors/table/formats/csv.md
index 417af014a47..0c93e8e188b 100644
--- a/docs/content.zh/docs/connectors/table/formats/csv.md
+++ b/docs/content.zh/docs/connectors/table/formats/csv.md
@@ -141,6 +141,13 @@ Format 参数
       <td>String</td>
       <td>是否将 "null" 字符串转化为 null 值。</td>
     </tr>
+    <tr>
+      <td><h5>csv.write-bigdecimal-in-scientific-notation</h5></td>
+      <td>可选</td>
+      <td style="word-wrap: break-word;">true</td>
+      <td>Boolean</td>
+      <td>设置将 BigDecimal 类型的数据表示为科学计数法(默认为true,即需要转为科学计数法),例如一个BigDecimal的值为100000,设置为true,结果为 '1E+5';设置为false,结果为 100000。注意:只有当值不等于0且是10的倍数才会转为科学计数法。</td>
+    </tr>
     </tbody>
 </table>
 
diff --git a/docs/content/docs/connectors/table/formats/csv.md b/docs/content/docs/connectors/table/formats/csv.md
index e7cc54999a8..27389cc528e 100644
--- a/docs/content/docs/connectors/table/formats/csv.md
+++ b/docs/content/docs/connectors/table/formats/csv.md
@@ -152,6 +152,14 @@ Format Options
       <td>String</td>
       <td>Null literal string that is interpreted as a null value (disabled by default).</td>
     </tr>
+    <tr>
+      <td><h5>csv.write-bigdecimal-in-scientific-notation</h5></td>
+      <td>optional</td>
+      <td>yes</td>
+      <td style="word-wrap: break-word;">true</td>
+      <td>Boolean</td>
+      <td>Enables representation of the BigDecimal data type in scientific notation (default is true). For example, 100000 is encoded as 1E+5 by default, and is written as 100000 if this option is set to false. Note: a value is converted to scientific notation only if it is not 0 and is a multiple of 10.</td>
+    </tr>
     </tbody>
 </table>
 
diff --git a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvCommons.java b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvCommons.java
index 48825748f82..7d15bfb985b 100644
--- a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvCommons.java
+++ b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvCommons.java
@@ -35,6 +35,7 @@ import static org.apache.flink.formats.csv.CsvFormatOptions.FIELD_DELIMITER;
 import static org.apache.flink.formats.csv.CsvFormatOptions.IGNORE_PARSE_ERRORS;
 import static org.apache.flink.formats.csv.CsvFormatOptions.NULL_LITERAL;
 import static org.apache.flink.formats.csv.CsvFormatOptions.QUOTE_CHARACTER;
+import static org.apache.flink.formats.csv.CsvFormatOptions.WRITE_BIGDECIMAL_IN_SCIENTIFIC_NOTATION;
 
 /** A class with common CSV format constants and utility methods. */
 class CsvCommons {
@@ -100,6 +101,7 @@ class CsvCommons {
         options.add(ARRAY_ELEMENT_DELIMITER);
         options.add(ESCAPE_CHARACTER);
         options.add(NULL_LITERAL);
+        options.add(WRITE_BIGDECIMAL_IN_SCIENTIFIC_NOTATION);
         return options;
     }
 
@@ -112,6 +114,7 @@ class CsvCommons {
         options.add(ARRAY_ELEMENT_DELIMITER);
         options.add(ESCAPE_CHARACTER);
         options.add(NULL_LITERAL);
+        options.add(WRITE_BIGDECIMAL_IN_SCIENTIFIC_NOTATION);
         return options;
     }
 }
diff --git a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvFormatFactory.java b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvFormatFactory.java
index 49b16eda125..2db1a9f695c 100644
--- a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvFormatFactory.java
+++ b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvFormatFactory.java
@@ -52,6 +52,7 @@ import static org.apache.flink.formats.csv.CsvFormatOptions.FIELD_DELIMITER;
 import static org.apache.flink.formats.csv.CsvFormatOptions.IGNORE_PARSE_ERRORS;
 import static org.apache.flink.formats.csv.CsvFormatOptions.NULL_LITERAL;
 import static org.apache.flink.formats.csv.CsvFormatOptions.QUOTE_CHARACTER;
+import static org.apache.flink.formats.csv.CsvFormatOptions.WRITE_BIGDECIMAL_IN_SCIENTIFIC_NOTATION;
 
 /**
  * Format factory for providing configured instances of CSV to RowData {@link SerializationSchema}
@@ -206,5 +207,9 @@ public final class CsvFormatFactory
                 .ifPresent(schemaBuilder::setEscapeCharacter);
 
         formatOptions.getOptional(NULL_LITERAL).ifPresent(schemaBuilder::setNullLiteral);
+
+        formatOptions
+                .getOptional(WRITE_BIGDECIMAL_IN_SCIENTIFIC_NOTATION)
+                .ifPresent(schemaBuilder::setWriteBigDecimalInScientificNotation);
     }
 }
diff --git a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvFormatOptions.java b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvFormatOptions.java
index 459e7feef4f..c9beae1ca7e 100644
--- a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvFormatOptions.java
+++ b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvFormatOptions.java
@@ -87,5 +87,12 @@ public class CsvFormatOptions {
                             "Optional null literal string that is interpreted as a\n"
                                     + "null value (disabled by default)");
 
+    public static final ConfigOption<Boolean> WRITE_BIGDECIMAL_IN_SCIENTIFIC_NOTATION =
+            ConfigOptions.key("write-bigdecimal-in-scientific-notation")
+                    .booleanType()
+                    .defaultValue(true)
+                    .withDescription(
+                            "Enables representation of BigDecimal data type in scientific notation (default is true). For example, 100000 is encoded as 1E+5 by default, and will be written as 100000 if set this option to false. Note: Only when the value is not 0 and a multiple of 10 is converted to scientific notation.");
+
     private CsvFormatOptions() {}
 }
diff --git a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDataSerializationSchema.java b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDataSerializationSchema.java
index 16d7c966a52..25673fcc3d8 100644
--- a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDataSerializationSchema.java
+++ b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDataSerializationSchema.java
@@ -24,6 +24,7 @@ import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.util.Preconditions;
 
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectWriter;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
@@ -70,10 +71,11 @@ public final class CsvRowDataSerializationSchema implements SerializationSchema<
                     .RowDataToCsvFormatConverterContext
             converterContext;
 
-    private CsvRowDataSerializationSchema(RowType rowType, CsvSchema csvSchema) {
+    private CsvRowDataSerializationSchema(
+            RowType rowType, CsvSchema csvSchema, CsvMapper csvMapper) {
         this.rowType = rowType;
         this.runtimeConverter = RowDataToCsvConverters.createRowConverter(rowType);
-        this.csvMapper = new CsvMapper();
+        this.csvMapper = csvMapper;
         this.csvSchema = csvSchema.withLineSeparator("");
         this.objectWriter = csvMapper.writer(this.csvSchema);
     }
@@ -84,6 +86,7 @@ public final class CsvRowDataSerializationSchema implements SerializationSchema<
 
         private final RowType rowType;
         private CsvSchema csvSchema;
+        private CsvMapper csvMapper;
 
         /**
          * Creates a {@link CsvRowDataSerializationSchema} expecting the given {@link RowType}.
@@ -95,6 +98,7 @@ public final class CsvRowDataSerializationSchema implements SerializationSchema<
 
             this.rowType = rowType;
             this.csvSchema = CsvRowSchemaConverter.convert(rowType);
+            this.csvMapper = new CsvMapper();
         }
 
         public Builder setFieldDelimiter(char c) {
@@ -128,8 +132,13 @@ public final class CsvRowDataSerializationSchema implements SerializationSchema<
             return this;
         }
 
+        public void setWriteBigDecimalInScientificNotation(boolean isScientificNotation) {
+            this.csvMapper.configure(
+                    JsonGenerator.Feature.WRITE_BIGDECIMAL_AS_PLAIN, !isScientificNotation);
+        }
+
         public CsvRowDataSerializationSchema build() {
-            return new CsvRowDataSerializationSchema(rowType, csvSchema);
+            return new CsvRowDataSerializationSchema(rowType, csvSchema, csvMapper);
         }
     }
 
diff --git a/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvFormatFactoryTest.java b/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvFormatFactoryTest.java
index 6edde34a947..468cc8ff959 100644
--- a/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvFormatFactoryTest.java
+++ b/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvFormatFactoryTest.java
@@ -20,7 +20,10 @@ package org.apache.flink.formats.csv;
 
 import org.apache.flink.api.common.serialization.DeserializationSchema;
 import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ResolvedSchema;
 import org.apache.flink.table.connector.Projection;
 import org.apache.flink.table.connector.format.ProjectableDecodingFormat;
 import org.apache.flink.table.connector.sink.DynamicTableSink;
@@ -37,6 +40,7 @@ import org.junit.Test;
 import org.junit.rules.ExpectedException;
 
 import java.io.IOException;
+import java.math.BigDecimal;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
@@ -45,6 +49,7 @@ import java.util.Map;
 import java.util.function.Consumer;
 
 import static org.apache.flink.core.testutils.FlinkMatchers.containsCause;
+import static org.apache.flink.table.data.DecimalData.fromBigDecimal;
 import static org.apache.flink.table.data.StringData.fromString;
 import static org.apache.flink.table.factories.utils.FactoryMocks.PHYSICAL_DATA_TYPE;
 import static org.apache.flink.table.factories.utils.FactoryMocks.PHYSICAL_TYPE;
@@ -230,6 +235,58 @@ public class CsvFormatFactoryTest extends TestLogger {
         createTableSink(SCHEMA, options);
     }
 
+    @Test
+    public void testSerializationWithWriteBigDecimalInScientificNotation() {
+        final Map<String, String> options =
+                getModifiedOptions(
+                        opts -> opts.put("csv.write-bigdecimal-in-scientific-notation", "true"));
+
+        ResolvedSchema schema =
+                ResolvedSchema.of(
+                        Column.physical("a", DataTypes.STRING()),
+                        Column.physical("b", DataTypes.DECIMAL(10, 3)),
+                        Column.physical("c", DataTypes.BOOLEAN()));
+        final DynamicTableSink actualSink = createTableSink(schema, options);
+        assertThat(actualSink).isInstanceOf(TestDynamicTableFactory.DynamicTableSinkMock.class);
+        TestDynamicTableFactory.DynamicTableSinkMock sinkMock =
+                (TestDynamicTableFactory.DynamicTableSinkMock) actualSink;
+
+        SerializationSchema<RowData> runtimeEncoder =
+                sinkMock.valueFormat.createRuntimeEncoder(null, schema.toPhysicalRowDataType());
+
+        RowData rowData =
+                GenericRowData.of(
+                        fromString("abc"), fromBigDecimal(new BigDecimal("100000"), 10, 3), false);
+        byte[] bytes = runtimeEncoder.serialize(rowData);
+        assertThat(new String(bytes)).isEqualTo("abc;'1E+5';false");
+    }
+
+    @Test
+    public void testSerializationWithNotWriteBigDecimalInScientificNotation() {
+        final Map<String, String> options =
+                getModifiedOptions(
+                        opts -> opts.put("csv.write-bigdecimal-in-scientific-notation", "false"));
+
+        ResolvedSchema schema =
+                ResolvedSchema.of(
+                        Column.physical("a", DataTypes.STRING()),
+                        Column.physical("b", DataTypes.DECIMAL(10, 3)),
+                        Column.physical("c", DataTypes.BOOLEAN()));
+        final DynamicTableSink actualSink = createTableSink(schema, options);
+        assertThat(actualSink).isInstanceOf(TestDynamicTableFactory.DynamicTableSinkMock.class);
+        TestDynamicTableFactory.DynamicTableSinkMock sinkMock =
+                (TestDynamicTableFactory.DynamicTableSinkMock) actualSink;
+
+        SerializationSchema<RowData> runtimeEncoder =
+                sinkMock.valueFormat.createRuntimeEncoder(null, schema.toPhysicalRowDataType());
+
+        RowData rowData =
+                GenericRowData.of(
+                        fromString("abc"), fromBigDecimal(new BigDecimal("100000"), 10, 3), false);
+        byte[] bytes = runtimeEncoder.serialize(rowData);
+        assertThat(new String(bytes)).isEqualTo("abc;'100000';false");
+    }
+
     @Test
     public void testProjectionPushdown() throws IOException {
         final Map<String, String> options = getAllOptions();
@@ -311,6 +368,7 @@ public class CsvFormatFactoryTest extends TestLogger {
         options.put("csv.array-element-delimiter", "|");
         options.put("csv.escape-character", "\\");
         options.put("csv.null-literal", "n/a");
+        options.put("csv.write-bigdecimal-in-scientific-notation", "true");
         return options;
     }