You are viewing a plain-text version of this content. The canonical link for it was a hyperlink on the original archive page and is not preserved in this text version.
Posted to commits@flink.apache.org by ja...@apache.org on 2021/02/02 04:04:50 UTC

[flink] branch master updated: [FLINK-21207][csv] Fix 'csv.disable-quote-character' = 'true' can not take effect in source table

This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 03871e1  [FLINK-21207][csv] Fix 'csv.disable-quote-character' = 'true' can not take effect in source table
03871e1 is described below

commit 03871e142746832a64c84756e928da0c3bdff6a2
Author: sharkdtu(涂小刚) <sh...@tencent.com>
AuthorDate: Fri Jan 29 22:58:11 2021 +0800

    [FLINK-21207][csv] Fix 'csv.disable-quote-character' = 'true' can not take effect in source table
    
    This closes #14813
---
 .../org/apache/flink/formats/csv/CsvFormatFactory.java     | 12 ++++++++----
 .../flink/formats/csv/CsvRowDataDeserializationSchema.java |  5 +++++
 .../org/apache/flink/formats/csv/CsvFormatFactoryTest.java | 14 ++++++++++++++
 .../flink/formats/csv/CsvRowDataSerDeSchemaTest.java       |  9 +++++++++
 4 files changed, 36 insertions(+), 4 deletions(-)

diff --git a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvFormatFactory.java b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvFormatFactory.java
index 6f9556a..4908c2d 100644
--- a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvFormatFactory.java
+++ b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvFormatFactory.java
@@ -193,10 +193,14 @@ public final class CsvFormatFactory
                 .map(delimiter -> StringEscapeUtils.unescapeJava(delimiter).charAt(0))
                 .ifPresent(schemaBuilder::setFieldDelimiter);
 
-        formatOptions
-                .getOptional(QUOTE_CHARACTER)
-                .map(quote -> quote.charAt(0))
-                .ifPresent(schemaBuilder::setQuoteCharacter);
+        if (formatOptions.get(DISABLE_QUOTE_CHARACTER)) {
+            schemaBuilder.disableQuoteCharacter();
+        } else {
+            formatOptions
+                    .getOptional(QUOTE_CHARACTER)
+                    .map(quote -> quote.charAt(0))
+                    .ifPresent(schemaBuilder::setQuoteCharacter);
+        }
 
         formatOptions.getOptional(ALLOW_COMMENTS).ifPresent(schemaBuilder::setAllowComments);
 
diff --git a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDataDeserializationSchema.java b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDataDeserializationSchema.java
index aab4dd5..84f3ef5 100644
--- a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDataDeserializationSchema.java
+++ b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDataDeserializationSchema.java
@@ -114,6 +114,11 @@ public final class CsvRowDataDeserializationSchema implements DeserializationSch
             return this;
         }
 
+        public Builder disableQuoteCharacter() {
+            this.csvSchema = this.csvSchema.rebuild().disableQuoteChar().build();
+            return this;
+        }
+
         public Builder setQuoteCharacter(char c) {
             this.csvSchema = this.csvSchema.rebuild().setQuoteChar(c).build();
             return this;
diff --git a/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvFormatFactoryTest.java b/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvFormatFactoryTest.java
index 1514949..857ea29 100644
--- a/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvFormatFactoryTest.java
+++ b/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvFormatFactoryTest.java
@@ -100,6 +100,20 @@ public class CsvFormatFactoryTest extends TestLogger {
                             opts.remove("csv.quote-character");
                         });
 
+        final CsvRowDataDeserializationSchema expectedDeser =
+                new CsvRowDataDeserializationSchema.Builder(ROW_TYPE, InternalTypeInfo.of(ROW_TYPE))
+                        .setFieldDelimiter(';')
+                        .setAllowComments(true)
+                        .setIgnoreParseErrors(true)
+                        .setArrayElementDelimiter("|")
+                        .setEscapeCharacter('\\')
+                        .setNullLiteral("n/a")
+                        .disableQuoteCharacter()
+                        .build();
+        DeserializationSchema<RowData> actualDeser = createDeserializationSchema(options);
+
+        assertEquals(expectedDeser, actualDeser);
+
         final CsvRowDataSerializationSchema expectedSer =
                 new CsvRowDataSerializationSchema.Builder(ROW_TYPE)
                         .setFieldDelimiter(';')
diff --git a/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvRowDataSerDeSchemaTest.java b/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvRowDataSerDeSchemaTest.java
index 619b9dc..0fc2df2 100644
--- a/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvRowDataSerDeSchemaTest.java
+++ b/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvRowDataSerDeSchemaTest.java
@@ -299,6 +299,15 @@ public class CsvRowDataSerDeSchemaTest {
         testSerDeConsistency(nullRow, serSchemaBuilder, deserSchemaBuilder);
     }
 
+    @Test
+    public void testDeserializationWithDisableQuoteCharacter() throws Exception {
+        Consumer<CsvRowDataDeserializationSchema.Builder> deserConfig =
+                (deserSchemaBuilder) ->
+                        deserSchemaBuilder.disableQuoteCharacter().setFieldDelimiter(',');
+
+        testFieldDeserialization(STRING(), "\"abc", "\"abc", deserConfig, ",");
+    }
+
     private void testNullableField(DataType fieldType, String string, Object value)
             throws Exception {
         testField(