You are viewing a plain-text version of this content. The canonical link to it (the "here" hyperlink in the original) is not preserved in this plain-text version.
Posted to commits@flink.apache.org by ja...@apache.org on 2021/02/02 04:04:50 UTC
[flink] branch master updated: [FLINK-21207][csv] Fix 'csv.disable-quote-character' = 'true' can not take effect in source table
This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 03871e1 [FLINK-21207][csv] Fix 'csv.disable-quote-character' = 'true' can not take effect in source table
03871e1 is described below
commit 03871e142746832a64c84756e928da0c3bdff6a2
Author: sharkdtu(涂小刚) <sh...@tencent.com>
AuthorDate: Fri Jan 29 22:58:11 2021 +0800
[FLINK-21207][csv] Fix 'csv.disable-quote-character' = 'true' can not take effect in source table
This closes #14813
---
.../org/apache/flink/formats/csv/CsvFormatFactory.java | 12 ++++++++----
.../flink/formats/csv/CsvRowDataDeserializationSchema.java | 5 +++++
.../org/apache/flink/formats/csv/CsvFormatFactoryTest.java | 14 ++++++++++++++
.../flink/formats/csv/CsvRowDataSerDeSchemaTest.java | 9 +++++++++
4 files changed, 36 insertions(+), 4 deletions(-)
diff --git a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvFormatFactory.java b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvFormatFactory.java
index 6f9556a..4908c2d 100644
--- a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvFormatFactory.java
+++ b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvFormatFactory.java
@@ -193,10 +193,14 @@ public final class CsvFormatFactory
.map(delimiter -> StringEscapeUtils.unescapeJava(delimiter).charAt(0))
.ifPresent(schemaBuilder::setFieldDelimiter);
- formatOptions
- .getOptional(QUOTE_CHARACTER)
- .map(quote -> quote.charAt(0))
- .ifPresent(schemaBuilder::setQuoteCharacter);
+ if (formatOptions.get(DISABLE_QUOTE_CHARACTER)) {
+ schemaBuilder.disableQuoteCharacter();
+ } else {
+ formatOptions
+ .getOptional(QUOTE_CHARACTER)
+ .map(quote -> quote.charAt(0))
+ .ifPresent(schemaBuilder::setQuoteCharacter);
+ }
formatOptions.getOptional(ALLOW_COMMENTS).ifPresent(schemaBuilder::setAllowComments);
diff --git a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDataDeserializationSchema.java b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDataDeserializationSchema.java
index aab4dd5..84f3ef5 100644
--- a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDataDeserializationSchema.java
+++ b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDataDeserializationSchema.java
@@ -114,6 +114,11 @@ public final class CsvRowDataDeserializationSchema implements DeserializationSch
return this;
}
+ public Builder disableQuoteCharacter() {
+ this.csvSchema = this.csvSchema.rebuild().disableQuoteChar().build();
+ return this;
+ }
+
public Builder setQuoteCharacter(char c) {
this.csvSchema = this.csvSchema.rebuild().setQuoteChar(c).build();
return this;
diff --git a/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvFormatFactoryTest.java b/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvFormatFactoryTest.java
index 1514949..857ea29 100644
--- a/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvFormatFactoryTest.java
+++ b/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvFormatFactoryTest.java
@@ -100,6 +100,20 @@ public class CsvFormatFactoryTest extends TestLogger {
opts.remove("csv.quote-character");
});
+ final CsvRowDataDeserializationSchema expectedDeser =
+ new CsvRowDataDeserializationSchema.Builder(ROW_TYPE, InternalTypeInfo.of(ROW_TYPE))
+ .setFieldDelimiter(';')
+ .setAllowComments(true)
+ .setIgnoreParseErrors(true)
+ .setArrayElementDelimiter("|")
+ .setEscapeCharacter('\\')
+ .setNullLiteral("n/a")
+ .disableQuoteCharacter()
+ .build();
+ DeserializationSchema<RowData> actualDeser = createDeserializationSchema(options);
+
+ assertEquals(expectedDeser, actualDeser);
+
final CsvRowDataSerializationSchema expectedSer =
new CsvRowDataSerializationSchema.Builder(ROW_TYPE)
.setFieldDelimiter(';')
diff --git a/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvRowDataSerDeSchemaTest.java b/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvRowDataSerDeSchemaTest.java
index 619b9dc..0fc2df2 100644
--- a/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvRowDataSerDeSchemaTest.java
+++ b/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvRowDataSerDeSchemaTest.java
@@ -299,6 +299,15 @@ public class CsvRowDataSerDeSchemaTest {
testSerDeConsistency(nullRow, serSchemaBuilder, deserSchemaBuilder);
}
+ @Test
+ public void testDeserializationWithDisableQuoteCharacter() throws Exception {
+ Consumer<CsvRowDataDeserializationSchema.Builder> deserConfig =
+ (deserSchemaBuilder) ->
+ deserSchemaBuilder.disableQuoteCharacter().setFieldDelimiter(',');
+
+ testFieldDeserialization(STRING(), "\"abc", "\"abc", deserConfig, ",");
+ }
+
private void testNullableField(DataType fieldType, String string, Object value)
throws Exception {
testField(