You are viewing a plain text version of this content. The canonical version is in the Apache mailing-list archive (the original hyperlink was not preserved in this plain-text copy).
Posted to commits@flink.apache.org by ch...@apache.org on 2022/08/08 17:08:21 UTC
[flink] 02/04: [FLINK-28621][formats] Initialize mappers in open()
This is an automated email from the ASF dual-hosted git repository.
chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 9a967c010a58e0b2277516068256ef45ee711edc
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Thu Jul 28 16:09:43 2022 +0200
[FLINK-28621][formats] Initialize mappers in open()
---
.../JSONKeyValueDeserializationSchema.java | 9 ++++--
.../csv/CsvRowDataDeserializationSchema.java | 8 +++--
.../formats/csv/CsvRowDataSerializationSchema.java | 37 ++++++++++++++++------
.../formats/csv/CsvRowDeserializationSchema.java | 8 +++--
.../formats/csv/CsvRowSerializationSchema.java | 10 ++++--
.../json/JsonRowDataDeserializationSchema.java | 17 +++++++---
.../json/JsonRowDataSerializationSchema.java | 12 +++++--
.../formats/json/JsonRowDeserializationSchema.java | 12 +++++--
.../formats/json/JsonRowSerializationSchema.java | 7 +++-
9 files changed, 90 insertions(+), 30 deletions(-)
diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/util/serialization/JSONKeyValueDeserializationSchema.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/util/serialization/JSONKeyValueDeserializationSchema.java
index cebd40aac26..86817433a6f 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/util/serialization/JSONKeyValueDeserializationSchema.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/util/serialization/JSONKeyValueDeserializationSchema.java
@@ -18,6 +18,7 @@
package org.apache.flink.streaming.util.serialization;
import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
@@ -52,11 +53,13 @@ public class JSONKeyValueDeserializationSchema implements KafkaDeserializationSc
this.includeMetadata = includeMetadata;
}
+ @Override
+ public void open(DeserializationSchema.InitializationContext context) throws Exception {
+ mapper = new ObjectMapper();
+ }
+
@Override
public ObjectNode deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
- if (mapper == null) {
- mapper = new ObjectMapper();
- }
ObjectNode node = mapper.createObjectNode();
if (record.key() != null) {
node.set("key", mapper.readValue(record.key(), JsonNode.class));
diff --git a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDataDeserializationSchema.java b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDataDeserializationSchema.java
index f5830f31f83..0c3f68e97f8 100644
--- a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDataDeserializationSchema.java
+++ b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDataDeserializationSchema.java
@@ -59,7 +59,7 @@ public final class CsvRowDataDeserializationSchema implements DeserializationSch
private final CsvSchema csvSchema;
/** Object reader used to read rows. It is configured by {@link CsvSchema}. */
- private final ObjectReader objectReader;
+ private transient ObjectReader objectReader;
/** Flag indicating whether to ignore invalid fields/rows (default: throw an exception). */
private final boolean ignoreParseErrors;
@@ -72,10 +72,14 @@ public final class CsvRowDataDeserializationSchema implements DeserializationSch
this.resultTypeInfo = resultTypeInfo;
this.runtimeConverter = runtimeConverter;
this.csvSchema = csvSchema;
- this.objectReader = new CsvMapper().readerFor(JsonNode.class).with(csvSchema);
this.ignoreParseErrors = ignoreParseErrors;
}
+ @Override
+ public void open(InitializationContext context) {
+ this.objectReader = new CsvMapper().readerFor(JsonNode.class).with(csvSchema);
+ }
+
/** A builder for creating a {@link CsvRowDataDeserializationSchema}. */
@Internal
public static class Builder {
diff --git a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDataSerializationSchema.java b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDataSerializationSchema.java
index 25673fcc3d8..f7fd810e691 100644
--- a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDataSerializationSchema.java
+++ b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDataSerializationSchema.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.function.SerializableSupplier;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
@@ -54,14 +55,16 @@ public final class CsvRowDataSerializationSchema implements SerializationSchema<
/** Runtime instance that performs the actual work. */
private final RowDataToCsvConverters.RowDataToCsvConverter runtimeConverter;
+ private final SerializableSupplier<CsvMapper> csvMapperSuppler;
+
/** CsvMapper used to write {@link JsonNode} into bytes. */
- private final CsvMapper csvMapper;
+ private transient CsvMapper csvMapper;
/** Schema describing the input CSV data. */
private final CsvSchema csvSchema;
/** Object writer used to write rows. It is configured by {@link CsvSchema}. */
- private final ObjectWriter objectWriter;
+ private transient ObjectWriter objectWriter;
/** Reusable object node. */
private transient ObjectNode root;
@@ -72,11 +75,18 @@ public final class CsvRowDataSerializationSchema implements SerializationSchema<
converterContext;
private CsvRowDataSerializationSchema(
- RowType rowType, CsvSchema csvSchema, CsvMapper csvMapper) {
+ RowType rowType,
+ CsvSchema csvSchema,
+ SerializableSupplier<CsvMapper> csvMapperSupplier) {
this.rowType = rowType;
this.runtimeConverter = RowDataToCsvConverters.createRowConverter(rowType);
- this.csvMapper = csvMapper;
this.csvSchema = csvSchema.withLineSeparator("");
+ this.csvMapperSuppler = csvMapperSupplier;
+ }
+
+ @Override
+ public void open(InitializationContext context) throws Exception {
+ this.csvMapper = csvMapperSuppler.get();
this.objectWriter = csvMapper.writer(this.csvSchema);
}
@@ -86,7 +96,7 @@ public final class CsvRowDataSerializationSchema implements SerializationSchema<
private final RowType rowType;
private CsvSchema csvSchema;
- private CsvMapper csvMapper;
+ private boolean isScientificNotation;
/**
* Creates a {@link CsvRowDataSerializationSchema} expecting the given {@link RowType}.
@@ -98,7 +108,6 @@ public final class CsvRowDataSerializationSchema implements SerializationSchema<
this.rowType = rowType;
this.csvSchema = CsvRowSchemaConverter.convert(rowType);
- this.csvMapper = new CsvMapper();
}
public Builder setFieldDelimiter(char c) {
@@ -133,12 +142,22 @@ public final class CsvRowDataSerializationSchema implements SerializationSchema<
}
public void setWriteBigDecimalInScientificNotation(boolean isScientificNotation) {
- this.csvMapper.configure(
- JsonGenerator.Feature.WRITE_BIGDECIMAL_AS_PLAIN, !isScientificNotation);
+ this.isScientificNotation = isScientificNotation;
}
public CsvRowDataSerializationSchema build() {
- return new CsvRowDataSerializationSchema(rowType, csvSchema, csvMapper);
+ // assign to local variable to avoid reference to non-serializable builder
+ final boolean isScientificNotation = this.isScientificNotation;
+ return new CsvRowDataSerializationSchema(
+ rowType,
+ csvSchema,
+ () -> {
+ final CsvMapper csvMapper = new CsvMapper();
+ csvMapper.configure(
+ JsonGenerator.Feature.WRITE_BIGDECIMAL_AS_PLAIN,
+ !isScientificNotation);
+ return csvMapper;
+ });
}
}
diff --git a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDeserializationSchema.java b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDeserializationSchema.java
index 17ab683acb3..24153b4de46 100644
--- a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDeserializationSchema.java
+++ b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDeserializationSchema.java
@@ -78,7 +78,7 @@ public final class CsvRowDeserializationSchema implements DeserializationSchema<
private final CsvSchema csvSchema;
/** Object reader used to read rows. It is configured by {@link CsvSchema}. */
- private final ObjectReader objectReader;
+ private transient ObjectReader objectReader;
/** Flag indicating whether to ignore invalid fields/rows (default: throw an exception). */
private final boolean ignoreParseErrors;
@@ -88,10 +88,14 @@ public final class CsvRowDeserializationSchema implements DeserializationSchema<
this.typeInfo = typeInfo;
this.runtimeConverter = createRowRuntimeConverter(typeInfo, ignoreParseErrors, true);
this.csvSchema = csvSchema;
- this.objectReader = new CsvMapper().readerFor(JsonNode.class).with(csvSchema);
this.ignoreParseErrors = ignoreParseErrors;
}
+ @Override
+ public void open(InitializationContext context) throws Exception {
+ objectReader = new CsvMapper().readerFor(JsonNode.class).with(csvSchema);
+ }
+
/** A builder for creating a {@link CsvRowDeserializationSchema}. */
@PublicEvolving
public static class Builder {
diff --git a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowSerializationSchema.java b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowSerializationSchema.java
index 279c435e255..eca56958d96 100644
--- a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowSerializationSchema.java
+++ b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowSerializationSchema.java
@@ -74,13 +74,13 @@ public final class CsvRowSerializationSchema implements SerializationSchema<Row>
private final RuntimeConverter runtimeConverter;
/** CsvMapper used to write {@link JsonNode} into bytes. */
- private final CsvMapper csvMapper;
+ private transient CsvMapper csvMapper;
/** Schema describing the input CSV data. */
private final CsvSchema csvSchema;
/** Object writer used to write rows. It is configured by {@link CsvSchema}. */
- private final ObjectWriter objectWriter;
+ private transient ObjectWriter objectWriter;
/** Reusable object node. */
private transient ObjectNode root;
@@ -88,8 +88,12 @@ public final class CsvRowSerializationSchema implements SerializationSchema<Row>
private CsvRowSerializationSchema(RowTypeInfo typeInfo, CsvSchema csvSchema) {
this.typeInfo = typeInfo;
this.runtimeConverter = createRowRuntimeConverter(typeInfo, true);
- this.csvMapper = new CsvMapper();
this.csvSchema = csvSchema;
+ }
+
+ @Override
+ public void open(InitializationContext context) throws Exception {
+ this.csvMapper = new CsvMapper();
this.objectWriter = csvMapper.writer(csvSchema);
}
diff --git a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDataDeserializationSchema.java b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDataDeserializationSchema.java
index 83d1b2dc3e8..805b299c11c 100644
--- a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDataDeserializationSchema.java
+++ b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDataDeserializationSchema.java
@@ -67,11 +67,13 @@ public class JsonRowDataDeserializationSchema implements DeserializationSchema<R
private final JsonToRowDataConverters.JsonToRowDataConverter runtimeConverter;
/** Object mapper for parsing the JSON. */
- private final ObjectMapper objectMapper = new ObjectMapper();
+ private transient ObjectMapper objectMapper;
/** Timestamp format specification which is used to parse timestamp. */
private final TimestampFormat timestampFormat;
+ private final boolean hasDecimalType;
+
public JsonRowDataDeserializationSchema(
RowType rowType,
TypeInformation<RowData> resultTypeInfo,
@@ -89,12 +91,19 @@ public class JsonRowDataDeserializationSchema implements DeserializationSchema<R
new JsonToRowDataConverters(failOnMissingField, ignoreParseErrors, timestampFormat)
.createConverter(checkNotNull(rowType));
this.timestampFormat = timestampFormat;
- boolean hasDecimalType =
- LogicalTypeChecks.hasNested(rowType, t -> t instanceof DecimalType);
+ this.hasDecimalType = LogicalTypeChecks.hasNested(rowType, t -> t instanceof DecimalType);
+ }
+
+ @Override
+ public void open(InitializationContext context) throws Exception {
+ objectMapper =
+ new ObjectMapper()
+ .configure(
+ JsonReadFeature.ALLOW_UNESCAPED_CONTROL_CHARS.mappedFeature(),
+ true);
if (hasDecimalType) {
objectMapper.enable(DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS);
}
- objectMapper.configure(JsonReadFeature.ALLOW_UNESCAPED_CONTROL_CHARS.mappedFeature(), true);
}
@Override
diff --git a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDataSerializationSchema.java b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDataSerializationSchema.java
index 1b77aab1cf4..6a8c619eeac 100644
--- a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDataSerializationSchema.java
+++ b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDataSerializationSchema.java
@@ -50,7 +50,7 @@ public class JsonRowDataSerializationSchema implements SerializationSchema<RowDa
private final RowDataToJsonConverters.RowDataToJsonConverter runtimeConverter;
/** Object mapper that is used to create output JSON objects. */
- private final ObjectMapper mapper = new ObjectMapper();
+ private transient ObjectMapper mapper;
/** Reusable object node. */
private transient ObjectNode node;
@@ -81,9 +81,15 @@ public class JsonRowDataSerializationSchema implements SerializationSchema<RowDa
this.runtimeConverter =
new RowDataToJsonConverters(timestampFormat, mapNullKeyMode, mapNullKeyLiteral)
.createConverter(rowType);
+ }
- mapper.configure(
- JsonGenerator.Feature.WRITE_BIGDECIMAL_AS_PLAIN, encodeDecimalAsPlainNumber);
+ @Override
+ public void open(InitializationContext context) throws Exception {
+ mapper =
+ new ObjectMapper()
+ .configure(
+ JsonGenerator.Feature.WRITE_BIGDECIMAL_AS_PLAIN,
+ encodeDecimalAsPlainNumber);
}
@Override
diff --git a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDeserializationSchema.java b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDeserializationSchema.java
index a337a78afed..b2b7e6dada5 100644
--- a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDeserializationSchema.java
+++ b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDeserializationSchema.java
@@ -93,8 +93,10 @@ public class JsonRowDeserializationSchema implements DeserializationSchema<Row>
private boolean failOnMissingField;
+ private final boolean hasDecimalType;
+
/** Object mapper for parsing the JSON. */
- private final ObjectMapper objectMapper = new ObjectMapper();
+ private transient ObjectMapper objectMapper;
private DeserializationRuntimeConverter runtimeConverter;
@@ -114,8 +116,12 @@ public class JsonRowDeserializationSchema implements DeserializationSchema<Row>
this.runtimeConverter = createConverter(this.typeInfo);
this.ignoreParseErrors = ignoreParseErrors;
RowType rowType = (RowType) fromLegacyInfoToDataType(this.typeInfo).getLogicalType();
- boolean hasDecimalType =
- LogicalTypeChecks.hasNested(rowType, t -> t.getTypeRoot().equals(DECIMAL));
+ hasDecimalType = LogicalTypeChecks.hasNested(rowType, t -> t.getTypeRoot().equals(DECIMAL));
+ }
+
+ @Override
+ public void open(InitializationContext context) throws Exception {
+ objectMapper = new ObjectMapper();
if (hasDecimalType) {
objectMapper.enable(DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS);
}
diff --git a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowSerializationSchema.java b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowSerializationSchema.java
index 3dd8be1fd37..e789307967e 100644
--- a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowSerializationSchema.java
+++ b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowSerializationSchema.java
@@ -79,7 +79,7 @@ public class JsonRowSerializationSchema implements SerializationSchema<Row> {
private final RowTypeInfo typeInfo;
/** Object mapper that is used to create output JSON objects. */
- private final ObjectMapper mapper = new ObjectMapper();
+ private transient ObjectMapper mapper;
private final SerializationRuntimeConverter runtimeConverter;
@@ -94,6 +94,11 @@ public class JsonRowSerializationSchema implements SerializationSchema<Row> {
this.runtimeConverter = createConverter(typeInfo);
}
+ @Override
+ public void open(InitializationContext context) throws Exception {
+ mapper = new ObjectMapper();
+ }
+
/** Builder for {@link JsonRowSerializationSchema}. */
@PublicEvolving
public static class Builder {