You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2022/08/08 17:08:19 UTC

[flink] branch master updated (72405361610 -> 328007f0b9a)

This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


    from 72405361610 [FLINK-28663][runtime] Allow multiple downstream consumer job vertices sharing the same intermediate dataset at scheduler side
     new d442384309a [hotfix][csv][tests] Open schemas
     new 9a967c010a5 [FLINK-28621][formats] Initialize mappers in open()
     new c0bf0ac3fb1 [FLINK-28621][core] Add central Jackson mapper factory methods
     new 328007f0b9a [FLINK-28621] Enable Date/Time&Optional support for all mappers

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../sink/testutils/KinesisFirehoseTestUtils.java   |   6 +-
 .../kinesis/sink/examples/SinkIntoKinesis.java     |   9 +-
 .../JSONKeyValueDeserializationSchema.java         |  10 +-
 .../KafkaRecordDeserializationSchemaTest.java      |  12 +-
 .../JSONKeyValueDeserializationSchemaTest.java     |  31 +++--
 flink-core/pom.xml                                 |  11 +-
 .../flink/util/jackson/JacksonMapperFactory.java   |  52 ++++++++
 .../util/jackson/JacksonMapperFactoryTest.java     | 139 +++++++++++++++++++++
 .../flink/docs/rest/OpenApiSpecGenerator.java      |   4 +-
 .../flink/docs/rest/RestAPIDocGenerator.java       |   3 +-
 .../table/test/KinesisFirehoseTableITTest.java     |  14 ++-
 .../table/test/KinesisStreamsTableApiIT.java       |  11 +-
 .../flink/tests/util/flink/FlinkDistribution.java  |   3 +-
 .../kinesis/test/KinesisTableApiITCase.java        |  11 +-
 .../flink/streaming/kinesis/test/model/Order.java  |   2 +-
 .../apache/flink/formats/csv/CsvBulkWriter.java    |   3 +-
 .../flink/formats/csv/CsvFileFormatFactory.java    |   5 +-
 .../apache/flink/formats/csv/CsvReaderFormat.java  |   5 +-
 .../csv/CsvRowDataDeserializationSchema.java       |  11 +-
 .../formats/csv/CsvRowDataSerializationSchema.java |  38 ++++--
 .../formats/csv/CsvRowDeserializationSchema.java   |  11 +-
 .../formats/csv/CsvRowSerializationSchema.java     |  11 +-
 .../flink/formats/csv/RowCsvInputFormat.java       |   4 +-
 .../flink/formats/csv/CsvFormatFactoryTest.java    |   2 +
 .../flink/formats/csv/DataStreamCsvITCase.java     |  10 +-
 .../formats/json/JsonDeserializationSchema.java    |   5 +-
 .../json/JsonRowDataDeserializationSchema.java     |  18 ++-
 .../json/JsonRowDataSerializationSchema.java       |  13 +-
 .../formats/json/JsonRowDeserializationSchema.java |  13 +-
 .../flink/formats/json/JsonRowSchemaConverter.java |   3 +-
 .../formats/json/JsonRowSerializationSchema.java   |   8 +-
 .../json/JsonNodeDeserializationSchemaTest.java    |   3 +-
 .../formats/json/JsonRowDataSerDeSchemaTest.java   |  37 +++---
 .../json/JsonRowDeserializationSchemaTest.java     |  21 ++--
 .../flink/python/metric/FlinkMetricContainer.java  |   5 +-
 .../runtime/webmonitor/history/HistoryServer.java  |   3 +-
 .../history/HistoryServerArchiveFetcher.java       |   3 +-
 .../rest/compatibility/CompatibilityRoutines.java  |   3 +-
 .../rest/compatibility/RestAPIStabilityTest.java   |   3 +-
 .../runtime/webmonitor/WebFrontendITCase.java      |  17 ++-
 .../webmonitor/history/HistoryServerTest.java      |   4 +-
 .../highavailability/FileSystemJobResultStore.java |   3 +-
 .../flink/runtime/history/FsJobArchivist.java      |   3 +-
 .../flink/runtime/rest/util/RestMapperUtils.java   |   4 +-
 .../FileSystemJobResultStoreTestInternal.java      |   3 +-
 .../jobgraph/jsonplan/JsonGeneratorTest.java       |   3 +-
 .../taskmanager/TaskManagerDetailsHandlerTest.java |   3 +-
 .../messages/json/JobResultDeserializerTest.java   |   3 +-
 .../json/SerializedThrowableSerializerTest.java    |   5 +-
 .../json/SerializedValueSerializerTest.java        |   3 +-
 .../flink/streaming/api/graph/JSONGenerator.java   |   3 +-
 .../table/connector/source/CompactPartitions.java  |   3 +-
 .../plan/nodes/exec/serde/JsonSerdeUtil.java       |   9 +-
 .../nodes/exec/serde/LogicalTypeJsonSerdeTest.java |   6 +-
 .../nodes/exec/serde/PartitionSpecSerdeTest.java   |  13 +-
 .../exec/serde/RankProcessStrategySerdeTest.java   |   3 +-
 .../plan/nodes/exec/serde/RankRangeSerdeTest.java  |   3 +-
 .../plan/nodes/exec/serde/RankTypeSerdeTest.java   |   3 +-
 .../plan/nodes/exec/serde/SortSpecSerdeTest.java   |  12 +-
 .../flink/table/planner/utils/JsonTestUtils.java   |   4 +-
 .../flink/table/planner/utils/TableTestBase.scala  |   7 +-
 .../jsonplan/JsonJobGraphGenerationTest.java       |   6 +-
 62 files changed, 496 insertions(+), 183 deletions(-)
 create mode 100644 flink-core/src/main/java/org/apache/flink/util/jackson/JacksonMapperFactory.java
 create mode 100644 flink-core/src/test/java/org/apache/flink/util/jackson/JacksonMapperFactoryTest.java


[flink] 02/04: [FLINK-28621][formats] Initialize mappers in open()

Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 9a967c010a58e0b2277516068256ef45ee711edc
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Thu Jul 28 16:09:43 2022 +0200

    [FLINK-28621][formats] Initialize mappers in open()
---
 .../JSONKeyValueDeserializationSchema.java         |  9 ++++--
 .../csv/CsvRowDataDeserializationSchema.java       |  8 +++--
 .../formats/csv/CsvRowDataSerializationSchema.java | 37 ++++++++++++++++------
 .../formats/csv/CsvRowDeserializationSchema.java   |  8 +++--
 .../formats/csv/CsvRowSerializationSchema.java     | 10 ++++--
 .../json/JsonRowDataDeserializationSchema.java     | 17 +++++++---
 .../json/JsonRowDataSerializationSchema.java       | 12 +++++--
 .../formats/json/JsonRowDeserializationSchema.java | 12 +++++--
 .../formats/json/JsonRowSerializationSchema.java   |  7 +++-
 9 files changed, 90 insertions(+), 30 deletions(-)

diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/util/serialization/JSONKeyValueDeserializationSchema.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/util/serialization/JSONKeyValueDeserializationSchema.java
index cebd40aac26..86817433a6f 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/util/serialization/JSONKeyValueDeserializationSchema.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/util/serialization/JSONKeyValueDeserializationSchema.java
@@ -18,6 +18,7 @@
 package org.apache.flink.streaming.util.serialization;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
 
@@ -52,11 +53,13 @@ public class JSONKeyValueDeserializationSchema implements KafkaDeserializationSc
         this.includeMetadata = includeMetadata;
     }
 
+    @Override
+    public void open(DeserializationSchema.InitializationContext context) throws Exception {
+        mapper = new ObjectMapper();
+    }
+
     @Override
     public ObjectNode deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
-        if (mapper == null) {
-            mapper = new ObjectMapper();
-        }
         ObjectNode node = mapper.createObjectNode();
         if (record.key() != null) {
             node.set("key", mapper.readValue(record.key(), JsonNode.class));
diff --git a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDataDeserializationSchema.java b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDataDeserializationSchema.java
index f5830f31f83..0c3f68e97f8 100644
--- a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDataDeserializationSchema.java
+++ b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDataDeserializationSchema.java
@@ -59,7 +59,7 @@ public final class CsvRowDataDeserializationSchema implements DeserializationSch
     private final CsvSchema csvSchema;
 
     /** Object reader used to read rows. It is configured by {@link CsvSchema}. */
-    private final ObjectReader objectReader;
+    private transient ObjectReader objectReader;
 
     /** Flag indicating whether to ignore invalid fields/rows (default: throw an exception). */
     private final boolean ignoreParseErrors;
@@ -72,10 +72,14 @@ public final class CsvRowDataDeserializationSchema implements DeserializationSch
         this.resultTypeInfo = resultTypeInfo;
         this.runtimeConverter = runtimeConverter;
         this.csvSchema = csvSchema;
-        this.objectReader = new CsvMapper().readerFor(JsonNode.class).with(csvSchema);
         this.ignoreParseErrors = ignoreParseErrors;
     }
 
+    @Override
+    public void open(InitializationContext context) {
+        this.objectReader = new CsvMapper().readerFor(JsonNode.class).with(csvSchema);
+    }
+
     /** A builder for creating a {@link CsvRowDataDeserializationSchema}. */
     @Internal
     public static class Builder {
diff --git a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDataSerializationSchema.java b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDataSerializationSchema.java
index 25673fcc3d8..f7fd810e691 100644
--- a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDataSerializationSchema.java
+++ b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDataSerializationSchema.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.function.SerializableSupplier;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
@@ -54,14 +55,16 @@ public final class CsvRowDataSerializationSchema implements SerializationSchema<
     /** Runtime instance that performs the actual work. */
     private final RowDataToCsvConverters.RowDataToCsvConverter runtimeConverter;
 
+    private final SerializableSupplier<CsvMapper> csvMapperSuppler;
+
     /** CsvMapper used to write {@link JsonNode} into bytes. */
-    private final CsvMapper csvMapper;
+    private transient CsvMapper csvMapper;
 
     /** Schema describing the input CSV data. */
     private final CsvSchema csvSchema;
 
     /** Object writer used to write rows. It is configured by {@link CsvSchema}. */
-    private final ObjectWriter objectWriter;
+    private transient ObjectWriter objectWriter;
 
     /** Reusable object node. */
     private transient ObjectNode root;
@@ -72,11 +75,18 @@ public final class CsvRowDataSerializationSchema implements SerializationSchema<
             converterContext;
 
     private CsvRowDataSerializationSchema(
-            RowType rowType, CsvSchema csvSchema, CsvMapper csvMapper) {
+            RowType rowType,
+            CsvSchema csvSchema,
+            SerializableSupplier<CsvMapper> csvMapperSupplier) {
         this.rowType = rowType;
         this.runtimeConverter = RowDataToCsvConverters.createRowConverter(rowType);
-        this.csvMapper = csvMapper;
         this.csvSchema = csvSchema.withLineSeparator("");
+        this.csvMapperSuppler = csvMapperSupplier;
+    }
+
+    @Override
+    public void open(InitializationContext context) throws Exception {
+        this.csvMapper = csvMapperSuppler.get();
         this.objectWriter = csvMapper.writer(this.csvSchema);
     }
 
@@ -86,7 +96,7 @@ public final class CsvRowDataSerializationSchema implements SerializationSchema<
 
         private final RowType rowType;
         private CsvSchema csvSchema;
-        private CsvMapper csvMapper;
+        private boolean isScientificNotation;
 
         /**
          * Creates a {@link CsvRowDataSerializationSchema} expecting the given {@link RowType}.
@@ -98,7 +108,6 @@ public final class CsvRowDataSerializationSchema implements SerializationSchema<
 
             this.rowType = rowType;
             this.csvSchema = CsvRowSchemaConverter.convert(rowType);
-            this.csvMapper = new CsvMapper();
         }
 
         public Builder setFieldDelimiter(char c) {
@@ -133,12 +142,22 @@ public final class CsvRowDataSerializationSchema implements SerializationSchema<
         }
 
         public void setWriteBigDecimalInScientificNotation(boolean isScientificNotation) {
-            this.csvMapper.configure(
-                    JsonGenerator.Feature.WRITE_BIGDECIMAL_AS_PLAIN, !isScientificNotation);
+            this.isScientificNotation = isScientificNotation;
         }
 
         public CsvRowDataSerializationSchema build() {
-            return new CsvRowDataSerializationSchema(rowType, csvSchema, csvMapper);
+            // assign to local variable to avoid reference to non-serializable builder
+            final boolean isScientificNotation = this.isScientificNotation;
+            return new CsvRowDataSerializationSchema(
+                    rowType,
+                    csvSchema,
+                    () -> {
+                        final CsvMapper csvMapper = new CsvMapper();
+                        csvMapper.configure(
+                                JsonGenerator.Feature.WRITE_BIGDECIMAL_AS_PLAIN,
+                                !isScientificNotation);
+                        return csvMapper;
+                    });
         }
     }
 
diff --git a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDeserializationSchema.java b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDeserializationSchema.java
index 17ab683acb3..24153b4de46 100644
--- a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDeserializationSchema.java
+++ b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDeserializationSchema.java
@@ -78,7 +78,7 @@ public final class CsvRowDeserializationSchema implements DeserializationSchema<
     private final CsvSchema csvSchema;
 
     /** Object reader used to read rows. It is configured by {@link CsvSchema}. */
-    private final ObjectReader objectReader;
+    private transient ObjectReader objectReader;
 
     /** Flag indicating whether to ignore invalid fields/rows (default: throw an exception). */
     private final boolean ignoreParseErrors;
@@ -88,10 +88,14 @@ public final class CsvRowDeserializationSchema implements DeserializationSchema<
         this.typeInfo = typeInfo;
         this.runtimeConverter = createRowRuntimeConverter(typeInfo, ignoreParseErrors, true);
         this.csvSchema = csvSchema;
-        this.objectReader = new CsvMapper().readerFor(JsonNode.class).with(csvSchema);
         this.ignoreParseErrors = ignoreParseErrors;
     }
 
+    @Override
+    public void open(InitializationContext context) throws Exception {
+        objectReader = new CsvMapper().readerFor(JsonNode.class).with(csvSchema);
+    }
+
     /** A builder for creating a {@link CsvRowDeserializationSchema}. */
     @PublicEvolving
     public static class Builder {
diff --git a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowSerializationSchema.java b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowSerializationSchema.java
index 279c435e255..eca56958d96 100644
--- a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowSerializationSchema.java
+++ b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowSerializationSchema.java
@@ -74,13 +74,13 @@ public final class CsvRowSerializationSchema implements SerializationSchema<Row>
     private final RuntimeConverter runtimeConverter;
 
     /** CsvMapper used to write {@link JsonNode} into bytes. */
-    private final CsvMapper csvMapper;
+    private transient CsvMapper csvMapper;
 
     /** Schema describing the input CSV data. */
     private final CsvSchema csvSchema;
 
     /** Object writer used to write rows. It is configured by {@link CsvSchema}. */
-    private final ObjectWriter objectWriter;
+    private transient ObjectWriter objectWriter;
 
     /** Reusable object node. */
     private transient ObjectNode root;
@@ -88,8 +88,12 @@ public final class CsvRowSerializationSchema implements SerializationSchema<Row>
     private CsvRowSerializationSchema(RowTypeInfo typeInfo, CsvSchema csvSchema) {
         this.typeInfo = typeInfo;
         this.runtimeConverter = createRowRuntimeConverter(typeInfo, true);
-        this.csvMapper = new CsvMapper();
         this.csvSchema = csvSchema;
+    }
+
+    @Override
+    public void open(InitializationContext context) throws Exception {
+        this.csvMapper = new CsvMapper();
         this.objectWriter = csvMapper.writer(csvSchema);
     }
 
diff --git a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDataDeserializationSchema.java b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDataDeserializationSchema.java
index 83d1b2dc3e8..805b299c11c 100644
--- a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDataDeserializationSchema.java
+++ b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDataDeserializationSchema.java
@@ -67,11 +67,13 @@ public class JsonRowDataDeserializationSchema implements DeserializationSchema<R
     private final JsonToRowDataConverters.JsonToRowDataConverter runtimeConverter;
 
     /** Object mapper for parsing the JSON. */
-    private final ObjectMapper objectMapper = new ObjectMapper();
+    private transient ObjectMapper objectMapper;
 
     /** Timestamp format specification which is used to parse timestamp. */
     private final TimestampFormat timestampFormat;
 
+    private final boolean hasDecimalType;
+
     public JsonRowDataDeserializationSchema(
             RowType rowType,
             TypeInformation<RowData> resultTypeInfo,
@@ -89,12 +91,19 @@ public class JsonRowDataDeserializationSchema implements DeserializationSchema<R
                 new JsonToRowDataConverters(failOnMissingField, ignoreParseErrors, timestampFormat)
                         .createConverter(checkNotNull(rowType));
         this.timestampFormat = timestampFormat;
-        boolean hasDecimalType =
-                LogicalTypeChecks.hasNested(rowType, t -> t instanceof DecimalType);
+        this.hasDecimalType = LogicalTypeChecks.hasNested(rowType, t -> t instanceof DecimalType);
+    }
+
+    @Override
+    public void open(InitializationContext context) throws Exception {
+        objectMapper =
+                new ObjectMapper()
+                        .configure(
+                                JsonReadFeature.ALLOW_UNESCAPED_CONTROL_CHARS.mappedFeature(),
+                                true);
         if (hasDecimalType) {
             objectMapper.enable(DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS);
         }
-        objectMapper.configure(JsonReadFeature.ALLOW_UNESCAPED_CONTROL_CHARS.mappedFeature(), true);
     }
 
     @Override
diff --git a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDataSerializationSchema.java b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDataSerializationSchema.java
index 1b77aab1cf4..6a8c619eeac 100644
--- a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDataSerializationSchema.java
+++ b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDataSerializationSchema.java
@@ -50,7 +50,7 @@ public class JsonRowDataSerializationSchema implements SerializationSchema<RowDa
     private final RowDataToJsonConverters.RowDataToJsonConverter runtimeConverter;
 
     /** Object mapper that is used to create output JSON objects. */
-    private final ObjectMapper mapper = new ObjectMapper();
+    private transient ObjectMapper mapper;
 
     /** Reusable object node. */
     private transient ObjectNode node;
@@ -81,9 +81,15 @@ public class JsonRowDataSerializationSchema implements SerializationSchema<RowDa
         this.runtimeConverter =
                 new RowDataToJsonConverters(timestampFormat, mapNullKeyMode, mapNullKeyLiteral)
                         .createConverter(rowType);
+    }
 
-        mapper.configure(
-                JsonGenerator.Feature.WRITE_BIGDECIMAL_AS_PLAIN, encodeDecimalAsPlainNumber);
+    @Override
+    public void open(InitializationContext context) throws Exception {
+        mapper =
+                new ObjectMapper()
+                        .configure(
+                                JsonGenerator.Feature.WRITE_BIGDECIMAL_AS_PLAIN,
+                                encodeDecimalAsPlainNumber);
     }
 
     @Override
diff --git a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDeserializationSchema.java b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDeserializationSchema.java
index a337a78afed..b2b7e6dada5 100644
--- a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDeserializationSchema.java
+++ b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDeserializationSchema.java
@@ -93,8 +93,10 @@ public class JsonRowDeserializationSchema implements DeserializationSchema<Row>
 
     private boolean failOnMissingField;
 
+    private final boolean hasDecimalType;
+
     /** Object mapper for parsing the JSON. */
-    private final ObjectMapper objectMapper = new ObjectMapper();
+    private transient ObjectMapper objectMapper;
 
     private DeserializationRuntimeConverter runtimeConverter;
 
@@ -114,8 +116,12 @@ public class JsonRowDeserializationSchema implements DeserializationSchema<Row>
         this.runtimeConverter = createConverter(this.typeInfo);
         this.ignoreParseErrors = ignoreParseErrors;
         RowType rowType = (RowType) fromLegacyInfoToDataType(this.typeInfo).getLogicalType();
-        boolean hasDecimalType =
-                LogicalTypeChecks.hasNested(rowType, t -> t.getTypeRoot().equals(DECIMAL));
+        hasDecimalType = LogicalTypeChecks.hasNested(rowType, t -> t.getTypeRoot().equals(DECIMAL));
+    }
+
+    @Override
+    public void open(InitializationContext context) throws Exception {
+        objectMapper = new ObjectMapper();
         if (hasDecimalType) {
             objectMapper.enable(DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS);
         }
diff --git a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowSerializationSchema.java b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowSerializationSchema.java
index 3dd8be1fd37..e789307967e 100644
--- a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowSerializationSchema.java
+++ b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowSerializationSchema.java
@@ -79,7 +79,7 @@ public class JsonRowSerializationSchema implements SerializationSchema<Row> {
     private final RowTypeInfo typeInfo;
 
     /** Object mapper that is used to create output JSON objects. */
-    private final ObjectMapper mapper = new ObjectMapper();
+    private transient ObjectMapper mapper;
 
     private final SerializationRuntimeConverter runtimeConverter;
 
@@ -94,6 +94,11 @@ public class JsonRowSerializationSchema implements SerializationSchema<Row> {
         this.runtimeConverter = createConverter(typeInfo);
     }
 
+    @Override
+    public void open(InitializationContext context) throws Exception {
+        mapper = new ObjectMapper();
+    }
+
     /** Builder for {@link JsonRowSerializationSchema}. */
     @PublicEvolving
     public static class Builder {


[flink] 01/04: [hotfix][csv][tests] Open schemas

Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit d442384309a1afee3a15aa8a3a47c352e2f8d695
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Sun Aug 7 17:22:42 2022 +0200

    [hotfix][csv][tests] Open schemas
---
 .../test/java/org/apache/flink/formats/csv/CsvFormatFactoryTest.java    | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvFormatFactoryTest.java b/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvFormatFactoryTest.java
index 22e84be6de7..ba4941099cf 100644
--- a/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvFormatFactoryTest.java
+++ b/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvFormatFactoryTest.java
@@ -256,6 +256,7 @@ public class CsvFormatFactoryTest extends TestLogger {
 
         SerializationSchema<RowData> runtimeEncoder =
                 sinkMock.valueFormat.createRuntimeEncoder(null, schema.toPhysicalRowDataType());
+        open(runtimeEncoder);
 
         RowData rowData =
                 GenericRowData.of(
@@ -282,6 +283,7 @@ public class CsvFormatFactoryTest extends TestLogger {
 
         SerializationSchema<RowData> runtimeEncoder =
                 sinkMock.valueFormat.createRuntimeEncoder(null, schema.toPhysicalRowDataType());
+        open(runtimeEncoder);
 
         RowData rowData =
                 GenericRowData.of(


[flink] 04/04: [FLINK-28621] Enable Date/Time&Optional support for all mappers

Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 328007f0b9a3e4da31b20e75b94d9c339b168af0
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Fri Aug 5 13:45:18 2022 +0200

    [FLINK-28621] Enable Date/Time&Optional support for all mappers
---
 .../flink/util/jackson/JacksonMapperFactory.java   | 12 +++
 .../util/jackson/JacksonMapperFactoryTest.java     | 95 ++++++++++++++++++++++
 .../plan/nodes/exec/serde/JsonSerdeUtil.java       |  6 --
 3 files changed, 107 insertions(+), 6 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/util/jackson/JacksonMapperFactory.java b/flink-core/src/main/java/org/apache/flink/util/jackson/JacksonMapperFactory.java
index 2c0b57f0471..e43451f92b5 100644
--- a/flink-core/src/main/java/org/apache/flink/util/jackson/JacksonMapperFactory.java
+++ b/flink-core/src/main/java/org/apache/flink/util/jackson/JacksonMapperFactory.java
@@ -20,7 +20,10 @@ package org.apache.flink.util.jackson;
 import org.apache.flink.annotation.Experimental;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.SerializationFeature;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvMapper;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.datatype.jdk8.Jdk8Module;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
 
 /** Factory for Jackson mappers. */
 @Experimental
@@ -28,13 +31,22 @@ public final class JacksonMapperFactory {
 
     public static ObjectMapper createObjectMapper() {
         final ObjectMapper objectMapper = new ObjectMapper();
+        registerModules(objectMapper);
         return objectMapper;
     }
 
     public static CsvMapper createCsvMapper() {
         final CsvMapper csvMapper = new CsvMapper();
+        registerModules(csvMapper);
         return csvMapper;
     }
 
+    private static void registerModules(ObjectMapper mapper) {
+        mapper.registerModule(new JavaTimeModule())
+                .registerModule(new Jdk8Module().configureAbsentsAsNulls(true))
+                .disable(SerializationFeature.WRITE_DURATIONS_AS_TIMESTAMPS)
+                .disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
+    }
+
     private JacksonMapperFactory() {}
 }
diff --git a/flink-core/src/test/java/org/apache/flink/util/jackson/JacksonMapperFactoryTest.java b/flink-core/src/test/java/org/apache/flink/util/jackson/JacksonMapperFactoryTest.java
index 61f2057ca82..c624b920549 100644
--- a/flink-core/src/test/java/org/apache/flink/util/jackson/JacksonMapperFactoryTest.java
+++ b/flink-core/src/test/java/org/apache/flink/util/jackson/JacksonMapperFactoryTest.java
@@ -17,11 +17,19 @@
 
 package org.apache.flink.util.jackson;
 
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectReader;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectWriter;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvMapper;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvParser;
 
 import org.junit.jupiter.api.Test;
 
+import java.time.Instant;
+import java.util.Optional;
+
 import static org.assertj.core.api.Assertions.assertThat;
 
 class JacksonMapperFactoryTest {
@@ -41,4 +49,91 @@ class JacksonMapperFactoryTest {
 
         assertThat(mapper1).isNotSameAs(mapper2);
     }
+
+    @Test
+    void testObjectMapperOptionalSupportedEnabled() throws Exception {
+        final ObjectMapper mapper = JacksonMapperFactory.createObjectMapper();
+
+        assertThat(mapper.writeValueAsString(new TypeWithOptional(Optional.of("value"))))
+                .isEqualTo("{\"data\":\"value\"}");
+        assertThat(mapper.writeValueAsString(new TypeWithOptional(Optional.empty())))
+                .isEqualTo("{\"data\":null}");
+
+        assertThat(mapper.readValue("{\"data\":\"value\"}", TypeWithOptional.class).data)
+                .contains("value");
+        assertThat(mapper.readValue("{\"data\":null}", TypeWithOptional.class).data).isEmpty();
+        assertThat(mapper.readValue("{}", TypeWithOptional.class).data).isEmpty();
+    }
+
+    @Test
+    void testCsvMapperOptionalSupportedEnabled() throws Exception {
+        final CsvMapper mapper =
+                JacksonMapperFactory.createCsvMapper()
+                        // ensures symmetric read/write behavior for empty optionals/strings
+                        // ensures:   Optional.empty() --write--> "" --read--> Optional.empty()
+                        // otherwise: Optional.empty() --write--> "" --read--> Optional("")
+                        // we should consider enabling this by default, but it unfortunately
+                        // also affects String parsing without Optionals (i.e., prior code)
+                        .enable(CsvParser.Feature.EMPTY_STRING_AS_NULL);
+
+        final ObjectWriter writer = mapper.writerWithSchemaFor(TypeWithOptional.class);
+
+        assertThat(writer.writeValueAsString(new TypeWithOptional(Optional.of("value"))))
+                .isEqualTo("value\n");
+        assertThat(writer.writeValueAsString(new TypeWithOptional(Optional.empty())))
+                .isEqualTo("\n");
+
+        final ObjectReader reader = mapper.readerWithSchemaFor(TypeWithOptional.class);
+
+        assertThat(reader.readValue("value\n", TypeWithOptional.class).data).contains("value");
+        assertThat(reader.readValue("null\n", TypeWithOptional.class).data).contains("null");
+        assertThat(reader.readValue("\n", TypeWithOptional.class).data).isEmpty();
+    }
+
+    @Test
+    void testObjectMapperDateTimeSupportedEnabled() throws Exception {
+        final ObjectMapper mapper = JacksonMapperFactory.createObjectMapper();
+
+        final String instantString = "2022-08-07T12:00:33.107787800Z";
+        final Instant instant = Instant.parse(instantString);
+        final String instantJson = String.format("{\"data\":\"%s\"}", instantString);
+
+        assertThat(mapper.writeValueAsString(new TypeWithInstant(instant))).isEqualTo(instantJson);
+        assertThat(mapper.readValue(instantJson, TypeWithInstant.class).data).isEqualTo(instant);
+    }
+
+    @Test
+    void testCsvMapperDateTimeSupportedEnabled() throws Exception {
+        final CsvMapper mapper = JacksonMapperFactory.createCsvMapper();
+
+        final String instantString = "2022-08-07T12:00:33.107787800Z";
+        final Instant instant = Instant.parse(instantString);
+        final String instantCsv = String.format("\"%s\"\n", instantString);
+
+        final ObjectWriter writer = mapper.writerWithSchemaFor(TypeWithInstant.class);
+
+        assertThat(writer.writeValueAsString(new TypeWithInstant(instant))).isEqualTo(instantCsv);
+
+        final ObjectReader reader = mapper.readerWithSchemaFor(TypeWithInstant.class);
+
+        assertThat(reader.readValue(instantCsv, TypeWithInstant.class).data).isEqualTo(instant);
+    }
+
+    public static class TypeWithOptional {
+        public Optional<String> data;
+
+        @JsonCreator
+        public TypeWithOptional(@JsonProperty("data") Optional<String> data) {
+            this.data = data;
+        }
+    }
+
+    public static class TypeWithInstant {
+        public Instant data;
+
+        @JsonCreator
+        public TypeWithInstant(@JsonProperty("data") Instant data) {
+            this.data = data;
+        }
+    }
 }
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/JsonSerdeUtil.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/JsonSerdeUtil.java
index 46fd71036b2..13c36e40029 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/JsonSerdeUtil.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/JsonSerdeUtil.java
@@ -54,14 +54,11 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.Module;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectReader;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectWriter;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.SerializationFeature;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.SerializerProvider;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.StdDeserializer;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.jsontype.NamedType;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.module.SimpleModule;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.datatype.jdk8.Jdk8Module;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
 
 import org.apache.calcite.rel.core.AggregateCall;
 import org.apache.calcite.rel.type.RelDataType;
@@ -109,9 +106,6 @@ public class JsonSerdeUtil {
                         .getTypeFactory()
                         .withClassLoader(JsonSerdeUtil.class.getClassLoader()));
         OBJECT_MAPPER_INSTANCE.configure(MapperFeature.USE_GETTERS_AS_SETTERS, false);
-        OBJECT_MAPPER_INSTANCE.configure(SerializationFeature.WRITE_DURATIONS_AS_TIMESTAMPS, false);
-        OBJECT_MAPPER_INSTANCE.registerModule(new Jdk8Module().configureAbsentsAsNulls(true));
-        OBJECT_MAPPER_INSTANCE.registerModule(new JavaTimeModule());
         OBJECT_MAPPER_INSTANCE.registerModule(createFlinkTableJacksonModule());
     }
 


[flink] 03/04: [FLINK-28621][core] Add central Jackson mapper factory methods

Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit c0bf0ac3fb1fe4814bff09807ed2040bb13da052
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Thu Jul 28 16:02:26 2022 +0200

    [FLINK-28621][core] Add central Jackson mapper factory methods
---
 .../sink/testutils/KinesisFirehoseTestUtils.java   |  6 ++--
 .../kinesis/sink/examples/SinkIntoKinesis.java     |  9 +++--
 .../JSONKeyValueDeserializationSchema.java         |  3 +-
 .../KafkaRecordDeserializationSchemaTest.java      | 12 ++++---
 .../JSONKeyValueDeserializationSchemaTest.java     | 31 ++++++++---------
 flink-core/pom.xml                                 | 11 +++---
 .../flink/util/jackson/JacksonMapperFactory.java   | 40 ++++++++++++++++++++++
 .../util/jackson/JacksonMapperFactoryTest.java     | 37 +++++++++-----------
 .../flink/docs/rest/OpenApiSpecGenerator.java      |  4 ++-
 .../flink/docs/rest/RestAPIDocGenerator.java       |  3 +-
 .../table/test/KinesisFirehoseTableITTest.java     | 14 +++++---
 .../table/test/KinesisStreamsTableApiIT.java       | 11 +++---
 .../flink/tests/util/flink/FlinkDistribution.java  |  3 +-
 .../kinesis/test/KinesisTableApiITCase.java        | 11 +++---
 .../flink/streaming/kinesis/test/model/Order.java  |  2 +-
 .../apache/flink/formats/csv/CsvBulkWriter.java    |  3 +-
 .../flink/formats/csv/CsvFileFormatFactory.java    |  5 +--
 .../apache/flink/formats/csv/CsvReaderFormat.java  |  5 +--
 .../csv/CsvRowDataDeserializationSchema.java       |  5 +--
 .../formats/csv/CsvRowDataSerializationSchema.java |  3 +-
 .../formats/csv/CsvRowDeserializationSchema.java   |  5 +--
 .../formats/csv/CsvRowSerializationSchema.java     |  3 +-
 .../flink/formats/csv/RowCsvInputFormat.java       |  4 +--
 .../flink/formats/csv/DataStreamCsvITCase.java     | 10 +++---
 .../formats/json/JsonDeserializationSchema.java    |  5 +--
 .../json/JsonRowDataDeserializationSchema.java     |  3 +-
 .../json/JsonRowDataSerializationSchema.java       |  3 +-
 .../formats/json/JsonRowDeserializationSchema.java |  3 +-
 .../flink/formats/json/JsonRowSchemaConverter.java |  3 +-
 .../formats/json/JsonRowSerializationSchema.java   |  3 +-
 .../json/JsonNodeDeserializationSchemaTest.java    |  3 +-
 .../formats/json/JsonRowDataSerDeSchemaTest.java   | 37 +++++++++-----------
 .../json/JsonRowDeserializationSchemaTest.java     | 21 +++++-------
 .../flink/python/metric/FlinkMetricContainer.java  |  5 ++-
 .../runtime/webmonitor/history/HistoryServer.java  |  3 +-
 .../history/HistoryServerArchiveFetcher.java       |  3 +-
 .../rest/compatibility/CompatibilityRoutines.java  |  3 +-
 .../rest/compatibility/RestAPIStabilityTest.java   |  3 +-
 .../runtime/webmonitor/WebFrontendITCase.java      | 17 ++++-----
 .../webmonitor/history/HistoryServerTest.java      |  4 ++-
 .../highavailability/FileSystemJobResultStore.java |  3 +-
 .../flink/runtime/history/FsJobArchivist.java      |  3 +-
 .../flink/runtime/rest/util/RestMapperUtils.java   |  4 ++-
 .../FileSystemJobResultStoreTestInternal.java      |  3 +-
 .../jobgraph/jsonplan/JsonGeneratorTest.java       |  3 +-
 .../taskmanager/TaskManagerDetailsHandlerTest.java |  3 +-
 .../messages/json/JobResultDeserializerTest.java   |  3 +-
 .../json/SerializedThrowableSerializerTest.java    |  5 +--
 .../json/SerializedValueSerializerTest.java        |  3 +-
 .../flink/streaming/api/graph/JSONGenerator.java   |  3 +-
 .../table/connector/source/CompactPartitions.java  |  3 +-
 .../plan/nodes/exec/serde/JsonSerdeUtil.java       |  3 +-
 .../nodes/exec/serde/LogicalTypeJsonSerdeTest.java |  6 ++--
 .../nodes/exec/serde/PartitionSpecSerdeTest.java   | 13 ++++---
 .../exec/serde/RankProcessStrategySerdeTest.java   |  3 +-
 .../plan/nodes/exec/serde/RankRangeSerdeTest.java  |  3 +-
 .../plan/nodes/exec/serde/RankTypeSerdeTest.java   |  3 +-
 .../plan/nodes/exec/serde/SortSpecSerdeTest.java   | 12 ++++---
 .../flink/table/planner/utils/JsonTestUtils.java   |  4 ++-
 .../flink/table/planner/utils/TableTestBase.scala  |  7 ++--
 .../jsonplan/JsonJobGraphGenerationTest.java       |  6 ++--
 61 files changed, 278 insertions(+), 177 deletions(-)

diff --git a/flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/testutils/KinesisFirehoseTestUtils.java b/flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/testutils/KinesisFirehoseTestUtils.java
index 129c4a43e7f..f4dee623b64 100644
--- a/flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/testutils/KinesisFirehoseTestUtils.java
+++ b/flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/testutils/KinesisFirehoseTestUtils.java
@@ -20,6 +20,7 @@ package org.apache.flink.connector.firehose.sink.testutils;
 import org.apache.flink.connector.aws.testutils.AWSServicesTestUtils;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.util.jackson.JacksonMapperFactory;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
@@ -39,7 +40,7 @@ import java.util.List;
  */
 public class KinesisFirehoseTestUtils {
 
-    private static final ObjectMapper MAPPER = new ObjectMapper();
+    private static final ObjectMapper MAPPER = JacksonMapperFactory.createObjectMapper();
 
     public static FirehoseClient createFirehoseClient(String endpoint, SdkHttpClient httpClient) {
         return AWSServicesTestUtils.createAwsSyncClient(
@@ -68,11 +69,10 @@ public class KinesisFirehoseTestUtils {
 
     public static DataStream<String> getSampleDataGenerator(
             StreamExecutionEnvironment env, int endValue) {
-        ObjectMapper mapper = new ObjectMapper();
         return env.fromSequence(1, endValue)
                 .map(Object::toString)
                 .returns(String.class)
-                .map(data -> mapper.writeValueAsString(ImmutableMap.of("data", data)));
+                .map(data -> MAPPER.writeValueAsString(ImmutableMap.of("data", data)));
     }
 
     public static List<String> getSampleData(int endValue) throws JsonProcessingException {
diff --git a/flink-connectors/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/sink/examples/SinkIntoKinesis.java b/flink-connectors/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/sink/examples/SinkIntoKinesis.java
index 362e25fda77..c861f44fbdf 100644
--- a/flink-connectors/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/sink/examples/SinkIntoKinesis.java
+++ b/flink-connectors/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/sink/examples/SinkIntoKinesis.java
@@ -22,6 +22,7 @@ import org.apache.flink.connector.aws.config.AWSConfigConstants;
 import org.apache.flink.connector.kinesis.sink.KinesisStreamsSink;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.util.jackson.JacksonMapperFactory;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
 
@@ -39,8 +40,9 @@ import java.util.Properties;
  */
 public class SinkIntoKinesis {
 
+    private static final ObjectMapper OBJECT_MAPPER = JacksonMapperFactory.createObjectMapper();
+
     public static void main(String[] args) throws Exception {
-        ObjectMapper mapper = new ObjectMapper();
         final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
         env.enableCheckpointing(10_000);
 
@@ -48,7 +50,10 @@ public class SinkIntoKinesis {
                 env.fromSequence(1, 10_000_000L)
                         .map(Object::toString)
                         .returns(String.class)
-                        .map(data -> mapper.writeValueAsString(ImmutableMap.of("data", data)));
+                        .map(
+                                data ->
+                                        OBJECT_MAPPER.writeValueAsString(
+                                                ImmutableMap.of("data", data)));
 
         Properties sinkProperties = new Properties();
         sinkProperties.put(AWSConfigConstants.AWS_REGION, "your-region-here");
diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/util/serialization/JSONKeyValueDeserializationSchema.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/util/serialization/JSONKeyValueDeserializationSchema.java
index 86817433a6f..e2b428eec1e 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/util/serialization/JSONKeyValueDeserializationSchema.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/util/serialization/JSONKeyValueDeserializationSchema.java
@@ -21,6 +21,7 @@ import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.serialization.DeserializationSchema;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
+import org.apache.flink.util.jackson.JacksonMapperFactory;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
@@ -55,7 +56,7 @@ public class JSONKeyValueDeserializationSchema implements KafkaDeserializationSc
 
     @Override
     public void open(DeserializationSchema.InitializationContext context) throws Exception {
-        mapper = new ObjectMapper();
+        mapper = JacksonMapperFactory.createObjectMapper();
     }
 
     @Override
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/deserializer/KafkaRecordDeserializationSchemaTest.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/deserializer/KafkaRecordDeserializationSchemaTest.java
index c2e735a0c13..8766719a0c1 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/deserializer/KafkaRecordDeserializationSchemaTest.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/deserializer/KafkaRecordDeserializationSchemaTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.connector.testutils.source.deserialization.TestingDeseri
 import org.apache.flink.formats.json.JsonDeserializationSchema;
 import org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema;
 import org.apache.flink.util.Collector;
+import org.apache.flink.util.jackson.JacksonMapperFactory;
 
 import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableMap;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
@@ -46,6 +47,8 @@ import static org.assertj.core.api.Assertions.assertThat;
 /** Unit tests for KafkaRecordDeserializationSchema. */
 public class KafkaRecordDeserializationSchemaTest {
 
+    private static final ObjectMapper OBJECT_MAPPER = JacksonMapperFactory.createObjectMapper();
+
     private static Map<String, ?> configurableConfiguration;
     private static Map<String, ?> configuration;
     private static boolean isKeyDeserializer;
@@ -135,14 +138,13 @@ public class KafkaRecordDeserializationSchemaTest {
     }
 
     private ConsumerRecord<byte[], byte[]> getConsumerRecord() throws JsonProcessingException {
-        ObjectMapper mapper = new ObjectMapper();
-        ObjectNode initialKey = mapper.createObjectNode();
+        ObjectNode initialKey = OBJECT_MAPPER.createObjectNode();
         initialKey.put("index", 4);
-        byte[] serializedKey = mapper.writeValueAsBytes(initialKey);
+        byte[] serializedKey = OBJECT_MAPPER.writeValueAsBytes(initialKey);
 
-        ObjectNode initialValue = mapper.createObjectNode();
+        ObjectNode initialValue = OBJECT_MAPPER.createObjectNode();
         initialValue.put("word", "world");
-        byte[] serializedValue = mapper.writeValueAsBytes(initialValue);
+        byte[] serializedValue = OBJECT_MAPPER.writeValueAsBytes(initialValue);
 
         return new ConsumerRecord<>("topic#1", 3, 4L, serializedKey, serializedValue);
     }
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/JSONKeyValueDeserializationSchemaTest.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/JSONKeyValueDeserializationSchemaTest.java
index 3a052cee2af..ddbcf1c94c8 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/JSONKeyValueDeserializationSchemaTest.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/JSONKeyValueDeserializationSchemaTest.java
@@ -19,6 +19,7 @@ package org.apache.flink.streaming.connectors.kafka;
 
 import org.apache.flink.connector.testutils.formats.DummyInitializationContext;
 import org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema;
+import org.apache.flink.util.jackson.JacksonMapperFactory;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
@@ -31,16 +32,17 @@ import static org.assertj.core.api.Assertions.assertThat;
 /** Tests for the{@link JSONKeyValueDeserializationSchema}. */
 public class JSONKeyValueDeserializationSchemaTest {
 
+    private static final ObjectMapper OBJECT_MAPPER = JacksonMapperFactory.createObjectMapper();
+
     @Test
     public void testDeserializeWithoutMetadata() throws Exception {
-        ObjectMapper mapper = new ObjectMapper();
-        ObjectNode initialKey = mapper.createObjectNode();
+        ObjectNode initialKey = OBJECT_MAPPER.createObjectNode();
         initialKey.put("index", 4);
-        byte[] serializedKey = mapper.writeValueAsBytes(initialKey);
+        byte[] serializedKey = OBJECT_MAPPER.writeValueAsBytes(initialKey);
 
-        ObjectNode initialValue = mapper.createObjectNode();
+        ObjectNode initialValue = OBJECT_MAPPER.createObjectNode();
         initialValue.put("word", "world");
-        byte[] serializedValue = mapper.writeValueAsBytes(initialValue);
+        byte[] serializedValue = OBJECT_MAPPER.writeValueAsBytes(initialValue);
 
         JSONKeyValueDeserializationSchema schema = new JSONKeyValueDeserializationSchema(false);
         schema.open(new DummyInitializationContext());
@@ -54,12 +56,11 @@ public class JSONKeyValueDeserializationSchemaTest {
 
     @Test
     public void testDeserializeWithoutKey() throws Exception {
-        ObjectMapper mapper = new ObjectMapper();
         byte[] serializedKey = null;
 
-        ObjectNode initialValue = mapper.createObjectNode();
+        ObjectNode initialValue = OBJECT_MAPPER.createObjectNode();
         initialValue.put("word", "world");
-        byte[] serializedValue = mapper.writeValueAsBytes(initialValue);
+        byte[] serializedValue = OBJECT_MAPPER.writeValueAsBytes(initialValue);
 
         JSONKeyValueDeserializationSchema schema = new JSONKeyValueDeserializationSchema(false);
         schema.open(new DummyInitializationContext());
@@ -88,10 +89,9 @@ public class JSONKeyValueDeserializationSchemaTest {
 
     @Test
     public void testDeserializeWithoutValue() throws Exception {
-        ObjectMapper mapper = new ObjectMapper();
-        ObjectNode initialKey = mapper.createObjectNode();
+        ObjectNode initialKey = OBJECT_MAPPER.createObjectNode();
         initialKey.put("index", 4);
-        byte[] serializedKey = mapper.writeValueAsBytes(initialKey);
+        byte[] serializedKey = OBJECT_MAPPER.writeValueAsBytes(initialKey);
 
         byte[] serializedValue = null;
 
@@ -107,14 +107,13 @@ public class JSONKeyValueDeserializationSchemaTest {
 
     @Test
     public void testDeserializeWithMetadata() throws Exception {
-        ObjectMapper mapper = new ObjectMapper();
-        ObjectNode initialKey = mapper.createObjectNode();
+        ObjectNode initialKey = OBJECT_MAPPER.createObjectNode();
         initialKey.put("index", 4);
-        byte[] serializedKey = mapper.writeValueAsBytes(initialKey);
+        byte[] serializedKey = OBJECT_MAPPER.writeValueAsBytes(initialKey);
 
-        ObjectNode initialValue = mapper.createObjectNode();
+        ObjectNode initialValue = OBJECT_MAPPER.createObjectNode();
         initialValue.put("word", "world");
-        byte[] serializedValue = mapper.writeValueAsBytes(initialValue);
+        byte[] serializedValue = OBJECT_MAPPER.writeValueAsBytes(initialValue);
 
         JSONKeyValueDeserializationSchema schema = new JSONKeyValueDeserializationSchema(true);
         schema.open(new DummyInitializationContext());
diff --git a/flink-core/pom.xml b/flink-core/pom.xml
index ef3061f1ac5..45be24efc05 100644
--- a/flink-core/pom.xml
+++ b/flink-core/pom.xml
@@ -51,6 +51,11 @@ under the License.
 			<artifactId>flink-shaded-asm-9</artifactId>
 		</dependency>
 
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-shaded-jackson</artifactId>
+		</dependency>
+
 		<!-- standard utilities -->
 		<dependency>
 			<groupId>org.apache.commons</groupId>
@@ -114,12 +119,6 @@ under the License.
 			<scope>test</scope>
 		</dependency>
 
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-shaded-jackson</artifactId>
-			<scope>test</scope>
-		</dependency>
-
 		<dependency>
 			<groupId>org.projectlombok</groupId>
 			<artifactId>lombok</artifactId>
diff --git a/flink-core/src/main/java/org/apache/flink/util/jackson/JacksonMapperFactory.java b/flink-core/src/main/java/org/apache/flink/util/jackson/JacksonMapperFactory.java
new file mode 100644
index 00000000000..2c0b57f0471
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/util/jackson/JacksonMapperFactory.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util.jackson;
+
+import org.apache.flink.annotation.Experimental;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvMapper;
+
+/** Factory for Jackson mappers. */
+@Experimental
+public final class JacksonMapperFactory {
+
+    public static ObjectMapper createObjectMapper() {
+        final ObjectMapper objectMapper = new ObjectMapper();
+        return objectMapper;
+    }
+
+    public static CsvMapper createCsvMapper() {
+        final CsvMapper csvMapper = new CsvMapper();
+        return csvMapper;
+    }
+
+    private JacksonMapperFactory() {}
+}
diff --git a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonNodeDeserializationSchemaTest.java b/flink-core/src/test/java/org/apache/flink/util/jackson/JacksonMapperFactoryTest.java
similarity index 52%
copy from flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonNodeDeserializationSchemaTest.java
copy to flink-core/src/test/java/org/apache/flink/util/jackson/JacksonMapperFactoryTest.java
index 90751525feb..61f2057ca82 100644
--- a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonNodeDeserializationSchemaTest.java
+++ b/flink-core/src/test/java/org/apache/flink/util/jackson/JacksonMapperFactoryTest.java
@@ -15,35 +15,30 @@
  * limitations under the License.
  */
 
-package org.apache.flink.formats.json;
-
-import org.apache.flink.connector.testutils.formats.DummyInitializationContext;
+package org.apache.flink.util.jackson;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvMapper;
 
 import org.junit.jupiter.api.Test;
 
-import java.io.IOException;
-
 import static org.assertj.core.api.Assertions.assertThat;
 
-/** Tests for the {@link JsonNodeDeserializationSchema}. */
-@SuppressWarnings("deprecation")
-class JsonNodeDeserializationSchemaTest {
+class JacksonMapperFactoryTest {
 
     @Test
-    void testDeserialize() throws IOException {
-        ObjectMapper mapper = new ObjectMapper();
-        ObjectNode initialValue = mapper.createObjectNode();
-        initialValue.put("key", 4).put("value", "world");
-        byte[] serializedValue = mapper.writeValueAsBytes(initialValue);
-
-        JsonNodeDeserializationSchema schema = new JsonNodeDeserializationSchema();
-        schema.open(new DummyInitializationContext());
-        ObjectNode deserializedValue = schema.deserialize(serializedValue);
-
-        assertThat(deserializedValue.get("key").asInt()).isEqualTo(4);
-        assertThat(deserializedValue.get("value").asText()).isEqualTo("world");
+    void testCreateObjectMapperReturnDistinctMappers() {
+        final ObjectMapper mapper1 = JacksonMapperFactory.createObjectMapper();
+        final ObjectMapper mapper2 = JacksonMapperFactory.createObjectMapper();
+
+        assertThat(mapper1).isNotSameAs(mapper2);
+    }
+
+    @Test
+    void testCreateCsvMapperReturnDistinctMappers() {
+        final CsvMapper mapper1 = JacksonMapperFactory.createCsvMapper();
+        final CsvMapper mapper2 = JacksonMapperFactory.createCsvMapper();
+
+        assertThat(mapper1).isNotSameAs(mapper2);
     }
 }
diff --git a/flink-docs/src/main/java/org/apache/flink/docs/rest/OpenApiSpecGenerator.java b/flink-docs/src/main/java/org/apache/flink/docs/rest/OpenApiSpecGenerator.java
index 0265ff2c957..fe17e807f8f 100644
--- a/flink-docs/src/main/java/org/apache/flink/docs/rest/OpenApiSpecGenerator.java
+++ b/flink-docs/src/main/java/org/apache/flink/docs/rest/OpenApiSpecGenerator.java
@@ -44,6 +44,7 @@ import org.apache.flink.runtime.util.EnvironmentInformation;
 import org.apache.flink.runtime.webmonitor.handlers.JarUploadHeaders;
 import org.apache.flink.util.ConfigurationException;
 import org.apache.flink.util.SerializedThrowable;
+import org.apache.flink.util.jackson.JacksonMapperFactory;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.SerializationFeature;
@@ -100,7 +101,8 @@ public class OpenApiSpecGenerator {
     static {
         ModelResolver.enumsAsRef = true;
         final ObjectMapper mapper =
-                new ObjectMapper().configure(SerializationFeature.ORDER_MAP_ENTRIES_BY_KEYS, true);
+                JacksonMapperFactory.createObjectMapper()
+                        .configure(SerializationFeature.ORDER_MAP_ENTRIES_BY_KEYS, true);
         modelConverterContext =
                 new ModelConverterContextImpl(Collections.singletonList(new ModelResolver(mapper)));
     }
diff --git a/flink-docs/src/main/java/org/apache/flink/docs/rest/RestAPIDocGenerator.java b/flink-docs/src/main/java/org/apache/flink/docs/rest/RestAPIDocGenerator.java
index 489bc6415ed..a21ec24cc3e 100644
--- a/flink-docs/src/main/java/org/apache/flink/docs/rest/RestAPIDocGenerator.java
+++ b/flink-docs/src/main/java/org/apache/flink/docs/rest/RestAPIDocGenerator.java
@@ -32,6 +32,7 @@ import org.apache.flink.runtime.rest.util.DocumentingDispatcherRestEndpoint;
 import org.apache.flink.runtime.rest.util.DocumentingRestEndpoint;
 import org.apache.flink.runtime.rest.versioning.RuntimeRestAPIVersion;
 import org.apache.flink.util.ConfigurationException;
+import org.apache.flink.util.jackson.JacksonMapperFactory;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.SerializableString;
@@ -99,7 +100,7 @@ public class RestAPIDocGenerator {
     private static final JsonSchemaGenerator schemaGen;
 
     static {
-        mapper = new ObjectMapper();
+        mapper = JacksonMapperFactory.createObjectMapper();
         mapper.getFactory().setCharacterEscapes(new HTMLCharacterEscapes());
         mapper.configure(SerializationFeature.ORDER_MAP_ENTRIES_BY_KEYS, true);
         schemaGen = new JsonSchemaGenerator(mapper);
diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/table/test/KinesisFirehoseTableITTest.java b/flink-end-to-end-tests/flink-end-to-end-tests-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/table/test/KinesisFirehoseTableITTest.java
index 172693c5ac6..ce67a707aec 100644
--- a/flink-end-to-end-tests/flink-end-to-end-tests-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/table/test/KinesisFirehoseTableITTest.java
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/table/test/KinesisFirehoseTableITTest.java
@@ -26,10 +26,13 @@ import org.apache.flink.test.util.SQLJobSubmission;
 import org.apache.flink.tests.util.TestUtils;
 import org.apache.flink.util.DockerImageVersions;
 import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.jackson.JacksonMapperFactory;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
 
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
 import org.assertj.core.api.Assertions;
 import org.junit.After;
 import org.junit.AfterClass;
@@ -79,6 +82,8 @@ public class KinesisFirehoseTableITTest extends TestLogger {
     private static final String BUCKET_NAME = "s3-firehose";
     private static final String STREAM_NAME = "s3-stream";
 
+    private static final ObjectMapper OBJECT_MAPPER = JacksonMapperFactory.createObjectMapper();
+
     private final Path sqlConnectorFirehoseJar = TestUtils.getResource(".*firehose.jar");
 
     private SdkHttpClient httpClient;
@@ -207,7 +212,7 @@ public class KinesisFirehoseTableITTest extends TestLogger {
 
     private <T> T fromJson(final String json, final Class<T> type) {
         try {
-            return new ObjectMapper().readValue(json, type);
+            return OBJECT_MAPPER.readValue(json, type);
         } catch (JsonProcessingException e) {
             throw new RuntimeException(String.format("Failed to deserialize json: %s", json), e);
         }
@@ -218,6 +223,7 @@ public class KinesisFirehoseTableITTest extends TestLogger {
         private final String code;
         private final int quantity;
 
+        @JsonCreator
         public Order(
                 @JsonProperty("code") final String code, @JsonProperty("quantity") int quantity) {
             this.code = code;
diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/table/test/KinesisStreamsTableApiIT.java b/flink-end-to-end-tests/flink-end-to-end-tests-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/table/test/KinesisStreamsTableApiIT.java
index 1f415c4bcec..d52ca2cb83d 100644
--- a/flink-end-to-end-tests/flink-end-to-end-tests-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/table/test/KinesisStreamsTableApiIT.java
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/table/test/KinesisStreamsTableApiIT.java
@@ -27,12 +27,13 @@ import org.apache.flink.connectors.kinesis.testutils.KinesaliteContainer;
 import org.apache.flink.test.util.SQLJobSubmission;
 import org.apache.flink.tests.util.TestUtils;
 import org.apache.flink.util.DockerImageVersions;
+import org.apache.flink.util.jackson.JacksonMapperFactory;
 
 import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableList;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
 
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
 import org.assertj.core.api.Assertions;
 import org.junit.After;
 import org.junit.AfterClass;
@@ -78,6 +79,8 @@ public class KinesisStreamsTableApiIT {
     private static final String ORDERS_STREAM = "orders";
     private static final String INTER_CONTAINER_KINESALITE_ALIAS = "kinesalite";
     private static final String DEFAULT_FIRST_SHARD_NAME = "shardId-000000000000";
+    private static final ObjectMapper OBJECT_MAPPER = JacksonMapperFactory.createObjectMapper();
+
     private SdkHttpClient httpClient;
     private KinesisClient kinesisClient;
 
@@ -202,7 +205,7 @@ public class KinesisStreamsTableApiIT {
 
     private <T> T fromJson(final String json, final Class<T> type) {
         try {
-            return new ObjectMapper().readValue(json, type);
+            return OBJECT_MAPPER.readValue(json, type);
         } catch (JsonProcessingException e) {
             throw new RuntimeException("Test Failure.", e);
         }
diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/FlinkDistribution.java b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/FlinkDistribution.java
index e9b939adf4d..05f11f820b8 100644
--- a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/FlinkDistribution.java
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/FlinkDistribution.java
@@ -29,6 +29,7 @@ import org.apache.flink.tests.util.AutoClosableProcess;
 import org.apache.flink.tests.util.TestUtils;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.function.FutureTaskWithException;
+import org.apache.flink.util.jackson.JacksonMapperFactory;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
@@ -75,7 +76,7 @@ public final class FlinkDistribution {
 
     private static final Logger LOG = LoggerFactory.getLogger(FlinkDistribution.class);
 
-    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+    private static final ObjectMapper OBJECT_MAPPER = JacksonMapperFactory.createObjectMapper();
 
     private static final Pattern ROOT_LOGGER_PATTERN = Pattern.compile("(rootLogger.level =).*");
     private static final String HIVE_DRIVER = "org.apache.hive.jdbc.HiveDriver";
diff --git a/flink-end-to-end-tests/flink-streaming-kinesis-test/src/test/java/org/apache/flink/streaming/kinesis/test/KinesisTableApiITCase.java b/flink-end-to-end-tests/flink-streaming-kinesis-test/src/test/java/org/apache/flink/streaming/kinesis/test/KinesisTableApiITCase.java
index 4cd75934ed7..b637108e5db 100644
--- a/flink-end-to-end-tests/flink-streaming-kinesis-test/src/test/java/org/apache/flink/streaming/kinesis/test/KinesisTableApiITCase.java
+++ b/flink-end-to-end-tests/flink-streaming-kinesis-test/src/test/java/org/apache/flink/streaming/kinesis/test/KinesisTableApiITCase.java
@@ -28,11 +28,12 @@ import org.apache.flink.test.util.SQLJobSubmission;
 import org.apache.flink.tests.util.TestUtils;
 import org.apache.flink.util.DockerImageVersions;
 import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.jackson.JacksonMapperFactory;
 
 import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableList;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
 
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
 import org.hamcrest.Matchers;
 import org.junit.After;
 import org.junit.AfterClass;
@@ -67,6 +68,8 @@ public class KinesisTableApiITCase extends TestLogger {
     private static final String LARGE_ORDERS_STREAM = "large_orders";
     private static final String INTER_CONTAINER_KINESALITE_ALIAS = "kinesalite";
 
+    private static final ObjectMapper OBJECT_MAPPER = JacksonMapperFactory.createObjectMapper();
+
     private final Path sqlConnectorKinesisJar = TestUtils.getResource(".*kinesis.jar");
     private static final Network network = Network.newNetwork();
 
@@ -166,7 +169,7 @@ public class KinesisTableApiITCase extends TestLogger {
 
     private <T> String toJson(final T object) {
         try {
-            return new ObjectMapper().writeValueAsString(object);
+            return OBJECT_MAPPER.writeValueAsString(object);
         } catch (JsonProcessingException e) {
             throw new RuntimeException("Test Failure.", e);
         }
@@ -174,7 +177,7 @@ public class KinesisTableApiITCase extends TestLogger {
 
     private <T> T fromJson(final String json, final Class<T> type) {
         try {
-            return new ObjectMapper().readValue(json, type);
+            return OBJECT_MAPPER.readValue(json, type);
         } catch (JsonProcessingException e) {
             throw new RuntimeException("Test Failure.", e);
         }
diff --git a/flink-end-to-end-tests/flink-streaming-kinesis-test/src/test/java/org/apache/flink/streaming/kinesis/test/model/Order.java b/flink-end-to-end-tests/flink-streaming-kinesis-test/src/test/java/org/apache/flink/streaming/kinesis/test/model/Order.java
index 15c158c4c8f..58cec308eae 100644
--- a/flink-end-to-end-tests/flink-streaming-kinesis-test/src/test/java/org/apache/flink/streaming/kinesis/test/model/Order.java
+++ b/flink-end-to-end-tests/flink-streaming-kinesis-test/src/test/java/org/apache/flink/streaming/kinesis/test/model/Order.java
@@ -17,7 +17,7 @@
 
 package org.apache.flink.streaming.kinesis.test.model;
 
-import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
 
 import java.util.Objects;
 
diff --git a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvBulkWriter.java b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvBulkWriter.java
index 69b3f9bce28..f9a8e01bb07 100644
--- a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvBulkWriter.java
+++ b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvBulkWriter.java
@@ -21,6 +21,7 @@ package org.apache.flink.formats.csv;
 import org.apache.flink.api.common.serialization.BulkWriter;
 import org.apache.flink.core.fs.FSDataOutputStream;
 import org.apache.flink.formats.common.Converter;
+import org.apache.flink.util.jackson.JacksonMapperFactory;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectWriter;
@@ -89,7 +90,7 @@ class CsvBulkWriter<T, R, C> implements BulkWriter<T> {
      */
     static <T> CsvBulkWriter<T, T, Void> forPojo(Class<T> pojoClass, FSDataOutputStream stream) {
         final Converter<T, T, Void> converter = (value, context) -> value;
-        final CsvMapper csvMapper = new CsvMapper();
+        final CsvMapper csvMapper = JacksonMapperFactory.createCsvMapper();
         final CsvSchema schema = csvMapper.schemaFor(pojoClass).withoutQuoteChar();
         return new CsvBulkWriter<>(csvMapper, schema, converter, null, stream);
     }
diff --git a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvFileFormatFactory.java b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvFileFormatFactory.java
index b0a0b27d5e7..f977acd5b4f 100644
--- a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvFileFormatFactory.java
+++ b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvFileFormatFactory.java
@@ -46,6 +46,7 @@ import org.apache.flink.table.factories.DynamicTableFactory;
 import org.apache.flink.table.plan.stats.TableStats;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.jackson.JacksonMapperFactory;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
@@ -132,7 +133,7 @@ public class CsvFileFormatFactory implements BulkReaderFormatFactory, BulkWriter
                                     .createRowConverter(projectedRowType, true);
             CsvReaderFormat<RowData> csvReaderFormat =
                     new CsvReaderFormat<>(
-                            () -> new CsvMapper(),
+                            () -> JacksonMapperFactory.createCsvMapper(),
                             ignored -> schema,
                             JsonNode.class,
                             converter,
@@ -178,7 +179,7 @@ public class CsvFileFormatFactory implements BulkReaderFormatFactory, BulkWriter
         final RowDataToCsvConverter converter = RowDataToCsvConverters.createRowConverter(rowType);
 
         return out -> {
-            final CsvMapper mapper = new CsvMapper();
+            final CsvMapper mapper = JacksonMapperFactory.createCsvMapper();
             final ObjectNode container = mapper.createObjectNode();
 
             final RowDataToCsvConverter.RowDataToCsvFormatConverterContext converterContext =
diff --git a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvReaderFormat.java b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvReaderFormat.java
index c2d14008cb9..02f3afbe43e 100644
--- a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvReaderFormat.java
+++ b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvReaderFormat.java
@@ -27,6 +27,7 @@ import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.formats.common.Converter;
 import org.apache.flink.util.function.SerializableFunction;
 import org.apache.flink.util.function.SerializableSupplier;
+import org.apache.flink.util.jackson.JacksonMapperFactory;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.MappingIterator;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvMapper;
@@ -108,7 +109,7 @@ public class CsvReaderFormat<T> extends SimpleStreamFormat<T> {
      */
     public static <T> CsvReaderFormat<T> forSchema(
             CsvSchema schema, TypeInformation<T> typeInformation) {
-        return forSchema(() -> new CsvMapper(), ignored -> schema, typeInformation);
+        return forSchema(JacksonMapperFactory::createCsvMapper, ignored -> schema, typeInformation);
     }
 
     /**
@@ -163,7 +164,7 @@ public class CsvReaderFormat<T> extends SimpleStreamFormat<T> {
      */
     public static <T> CsvReaderFormat<T> forPojo(Class<T> pojoType) {
         return forSchema(
-                () -> new CsvMapper(),
+                () -> JacksonMapperFactory.createCsvMapper(),
                 mapper -> mapper.schemaFor(pojoType).withoutQuoteChar(),
                 TypeInformation.of(pojoType));
     }
diff --git a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDataDeserializationSchema.java b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDataDeserializationSchema.java
index 0c3f68e97f8..b77312d41b8 100644
--- a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDataDeserializationSchema.java
+++ b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDataDeserializationSchema.java
@@ -24,10 +24,10 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.jackson.JacksonMapperFactory;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectReader;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvMapper;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvSchema;
 
 import javax.annotation.Nullable;
@@ -77,7 +77,8 @@ public final class CsvRowDataDeserializationSchema implements DeserializationSch
 
     @Override
     public void open(InitializationContext context) {
-        this.objectReader = new CsvMapper().readerFor(JsonNode.class).with(csvSchema);
+        this.objectReader =
+                JacksonMapperFactory.createCsvMapper().readerFor(JsonNode.class).with(csvSchema);
     }
 
     /** A builder for creating a {@link CsvRowDataDeserializationSchema}. */
diff --git a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDataSerializationSchema.java b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDataSerializationSchema.java
index f7fd810e691..737b74eda20 100644
--- a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDataSerializationSchema.java
+++ b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDataSerializationSchema.java
@@ -24,6 +24,7 @@ import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.function.SerializableSupplier;
+import org.apache.flink.util.jackson.JacksonMapperFactory;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
@@ -152,7 +153,7 @@ public final class CsvRowDataSerializationSchema implements SerializationSchema<
                     rowType,
                     csvSchema,
                     () -> {
-                        final CsvMapper csvMapper = new CsvMapper();
+                        final CsvMapper csvMapper = JacksonMapperFactory.createCsvMapper();
                         csvMapper.configure(
                                 JsonGenerator.Feature.WRITE_BIGDECIMAL_AS_PLAIN,
                                 !isScientificNotation);
diff --git a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDeserializationSchema.java b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDeserializationSchema.java
index 24153b4de46..b2a91a9fb34 100644
--- a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDeserializationSchema.java
+++ b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDeserializationSchema.java
@@ -28,10 +28,10 @@ import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.jackson.JacksonMapperFactory;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectReader;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvMapper;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvSchema;
 
 import java.io.IOException;
@@ -93,7 +93,8 @@ public final class CsvRowDeserializationSchema implements DeserializationSchema<
 
     @Override
     public void open(InitializationContext context) throws Exception {
-        objectReader = new CsvMapper().readerFor(JsonNode.class).with(csvSchema);
+        objectReader =
+                JacksonMapperFactory.createCsvMapper().readerFor(JsonNode.class).with(csvSchema);
     }
 
     /** A builder for creating a {@link CsvRowDeserializationSchema}. */
diff --git a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowSerializationSchema.java b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowSerializationSchema.java
index eca56958d96..53f4edf16e3 100644
--- a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowSerializationSchema.java
+++ b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowSerializationSchema.java
@@ -28,6 +28,7 @@ import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.jackson.JacksonMapperFactory;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectWriter;
@@ -93,7 +94,7 @@ public final class CsvRowSerializationSchema implements SerializationSchema<Row>
 
     @Override
     public void open(InitializationContext context) throws Exception {
-        this.csvMapper = new CsvMapper();
+        this.csvMapper = JacksonMapperFactory.createCsvMapper();
         this.objectWriter = csvMapper.writer(csvSchema);
     }
 
diff --git a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/RowCsvInputFormat.java b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/RowCsvInputFormat.java
index 75a22cb2dd6..53634c5cc83 100644
--- a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/RowCsvInputFormat.java
+++ b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/RowCsvInputFormat.java
@@ -24,10 +24,10 @@ import org.apache.flink.core.fs.FileInputSplit;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.formats.csv.CsvRowDeserializationSchema.RuntimeConverter;
 import org.apache.flink.types.Row;
+import org.apache.flink.util.jackson.JacksonMapperFactory;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.MappingIterator;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvMapper;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvSchema;
 
 import java.io.IOException;
@@ -90,7 +90,7 @@ public class RowCsvInputFormat extends AbstractCsvInputFormat<Row> {
         super.open(split);
         prepareConverter();
         this.iterator =
-                new CsvMapper()
+                JacksonMapperFactory.createCsvMapper()
                         .readerFor(JsonNode.class)
                         .with(csvSchema)
                         .readValues(csvInputStream);
diff --git a/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/DataStreamCsvITCase.java b/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/DataStreamCsvITCase.java
index ea073432116..363a17afdef 100644
--- a/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/DataStreamCsvITCase.java
+++ b/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/DataStreamCsvITCase.java
@@ -37,6 +37,7 @@ import org.apache.flink.streaming.api.operators.collect.ClientAndIterator;
 import org.apache.flink.test.junit5.MiniClusterExtension;
 import org.apache.flink.util.TestLoggerExtension;
 import org.apache.flink.util.function.FunctionWithException;
+import org.apache.flink.util.jackson.JacksonMapperFactory;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonPropertyOrder;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvMapper;
@@ -74,6 +75,8 @@ import static org.assertj.core.api.Assertions.assertThat;
 @ExtendWith({TestLoggerExtension.class})
 public class DataStreamCsvITCase {
 
+    private static final CsvMapper CSV_MAPPER = JacksonMapperFactory.createCsvMapper();
+
     private static final int PARALLELISM = 4;
 
     @TempDir File outDir;
@@ -163,7 +166,7 @@ public class DataStreamCsvITCase {
 
         final CsvReaderFormat<CityPojo> csvFormat =
                 CsvReaderFormat.forSchema(
-                        () -> new CsvMapper(),
+                        () -> CSV_MAPPER,
                         mapper ->
                                 mapper.schemaFor(CityPojo.class)
                                         .withoutQuoteChar()
@@ -229,9 +232,8 @@ public class DataStreamCsvITCase {
 
     private static <T> BulkWriter.Factory<T> factoryForPojo(Class<T> pojoClass) {
         final Converter<T, T, Void> converter = (value, context) -> value;
-        final CsvMapper csvMapper = new CsvMapper();
-        final CsvSchema schema = csvMapper.schemaFor(pojoClass).withoutQuoteChar();
-        return (out) -> new CsvBulkWriter<>(csvMapper, schema, converter, null, out);
+        final CsvSchema schema = CSV_MAPPER.schemaFor(pojoClass).withoutQuoteChar();
+        return (out) -> new CsvBulkWriter<>(CSV_MAPPER, schema, converter, null, out);
     }
 
     private static Map<File, String> getFileContentByPath(File directory) throws IOException {
diff --git a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonDeserializationSchema.java b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonDeserializationSchema.java
index fd28712d56a..cc244b0a0c2 100644
--- a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonDeserializationSchema.java
+++ b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonDeserializationSchema.java
@@ -21,6 +21,7 @@ import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.serialization.AbstractDeserializationSchema;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.util.function.SerializableSupplier;
+import org.apache.flink.util.jackson.JacksonMapperFactory;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
 
@@ -37,11 +38,11 @@ public class JsonDeserializationSchema<T> extends AbstractDeserializationSchema<
     protected transient ObjectMapper mapper;
 
     public JsonDeserializationSchema(Class<T> clazz) {
-        this(clazz, () -> new ObjectMapper());
+        this(clazz, JacksonMapperFactory::createObjectMapper);
     }
 
     public JsonDeserializationSchema(TypeInformation<T> typeInformation) {
-        this(typeInformation, () -> new ObjectMapper());
+        this(typeInformation, JacksonMapperFactory::createObjectMapper);
     }
 
     public JsonDeserializationSchema(
diff --git a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDataDeserializationSchema.java b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDataDeserializationSchema.java
index 805b299c11c..9a57bac203b 100644
--- a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDataDeserializationSchema.java
+++ b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDataDeserializationSchema.java
@@ -26,6 +26,7 @@ import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.types.logical.DecimalType;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.table.types.logical.utils.LogicalTypeChecks;
+import org.apache.flink.util.jackson.JacksonMapperFactory;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.json.JsonReadFeature;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationFeature;
@@ -97,7 +98,7 @@ public class JsonRowDataDeserializationSchema implements DeserializationSchema<R
     @Override
     public void open(InitializationContext context) throws Exception {
         objectMapper =
-                new ObjectMapper()
+                JacksonMapperFactory.createObjectMapper()
                         .configure(
                                 JsonReadFeature.ALLOW_UNESCAPED_CONTROL_CHARS.mappedFeature(),
                                 true);
diff --git a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDataSerializationSchema.java b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDataSerializationSchema.java
index 6a8c619eeac..c8b7f73b64d 100644
--- a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDataSerializationSchema.java
+++ b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDataSerializationSchema.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.formats.common.TimestampFormat;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.jackson.JacksonMapperFactory;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
@@ -86,7 +87,7 @@ public class JsonRowDataSerializationSchema implements SerializationSchema<RowDa
     @Override
     public void open(InitializationContext context) throws Exception {
         mapper =
-                new ObjectMapper()
+                JacksonMapperFactory.createObjectMapper()
                         .configure(
                                 JsonGenerator.Feature.WRITE_BIGDECIMAL_AS_PLAIN,
                                 encodeDecimalAsPlainNumber);
diff --git a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDeserializationSchema.java b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDeserializationSchema.java
index b2b7e6dada5..dd4a9bb9f99 100644
--- a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDeserializationSchema.java
+++ b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDeserializationSchema.java
@@ -30,6 +30,7 @@ import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.table.types.logical.utils.LogicalTypeChecks;
 import org.apache.flink.types.Row;
+import org.apache.flink.util.jackson.JacksonMapperFactory;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationFeature;
@@ -121,7 +122,7 @@ public class JsonRowDeserializationSchema implements DeserializationSchema<Row>
 
     @Override
     public void open(InitializationContext context) throws Exception {
-        objectMapper = new ObjectMapper();
+        objectMapper = JacksonMapperFactory.createObjectMapper();
         if (hasDecimalType) {
             objectMapper.enable(DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS);
         }
diff --git a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowSchemaConverter.java b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowSchemaConverter.java
index fe412320456..d7761851ee4 100644
--- a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowSchemaConverter.java
+++ b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowSchemaConverter.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeinfo.Types;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.jackson.JacksonMapperFactory;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
@@ -97,7 +98,7 @@ public final class JsonRowSchemaConverter {
     @SuppressWarnings("unchecked")
     public static <T> TypeInformation<T> convert(String jsonSchema) {
         Preconditions.checkNotNull(jsonSchema, "JSON schema");
-        final ObjectMapper mapper = new ObjectMapper();
+        final ObjectMapper mapper = JacksonMapperFactory.createObjectMapper();
         mapper.getFactory()
                 .enable(JsonParser.Feature.ALLOW_COMMENTS)
                 .enable(JsonParser.Feature.ALLOW_UNQUOTED_FIELD_NAMES)
diff --git a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowSerializationSchema.java b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowSerializationSchema.java
index e789307967e..f185d211bf9 100644
--- a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowSerializationSchema.java
+++ b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowSerializationSchema.java
@@ -28,6 +28,7 @@ import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.WrappingRuntimeException;
+import org.apache.flink.util.jackson.JacksonMapperFactory;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
@@ -96,7 +97,7 @@ public class JsonRowSerializationSchema implements SerializationSchema<Row> {
 
     @Override
     public void open(InitializationContext context) throws Exception {
-        mapper = new ObjectMapper();
+        mapper = JacksonMapperFactory.createObjectMapper();
     }
 
     /** Builder for {@link JsonRowSerializationSchema}. */
diff --git a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonNodeDeserializationSchemaTest.java b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonNodeDeserializationSchemaTest.java
index 90751525feb..e6b2a3e05cb 100644
--- a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonNodeDeserializationSchemaTest.java
+++ b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonNodeDeserializationSchemaTest.java
@@ -18,6 +18,7 @@
 package org.apache.flink.formats.json;
 
 import org.apache.flink.connector.testutils.formats.DummyInitializationContext;
+import org.apache.flink.util.jackson.JacksonMapperFactory;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
@@ -34,7 +35,7 @@ class JsonNodeDeserializationSchemaTest {
 
     @Test
     void testDeserialize() throws IOException {
-        ObjectMapper mapper = new ObjectMapper();
+        ObjectMapper mapper = JacksonMapperFactory.createObjectMapper();
         ObjectNode initialValue = mapper.createObjectNode();
         initialValue.put("key", 4).put("value", "world");
         byte[] serializedValue = mapper.writeValueAsBytes(initialValue);
diff --git a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowDataSerDeSchemaTest.java b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowDataSerDeSchemaTest.java
index 888be3b9fe7..883c3f09134 100644
--- a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowDataSerDeSchemaTest.java
+++ b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowDataSerDeSchemaTest.java
@@ -31,6 +31,7 @@ import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.types.Row;
+import org.apache.flink.util.jackson.JacksonMapperFactory;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ArrayNode;
@@ -83,6 +84,8 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy;
  */
 class JsonRowDataSerDeSchemaTest {
 
+    private static final ObjectMapper OBJECT_MAPPER = JacksonMapperFactory.createObjectMapper();
+
     @Test
     void testSerDe() throws Exception {
         byte tinyint = 'c';
@@ -115,11 +118,10 @@ class JsonRowDataSerDeSchemaTest {
         innerMap.put("key", 234);
         nestedMap.put("inner_map", innerMap);
 
-        ObjectMapper objectMapper = new ObjectMapper();
-        ArrayNode doubleNode = objectMapper.createArrayNode().add(1.1D).add(2.2D).add(3.3D);
+        ArrayNode doubleNode = OBJECT_MAPPER.createArrayNode().add(1.1D).add(2.2D).add(3.3D);
 
         // Root
-        ObjectNode root = objectMapper.createObjectNode();
+        ObjectNode root = OBJECT_MAPPER.createObjectNode();
         root.put("bool", true);
         root.put("tinyint", tinyint);
         root.put("smallint", smallint);
@@ -139,7 +141,7 @@ class JsonRowDataSerDeSchemaTest {
         root.putObject("multiSet").put("element", 2);
         root.putObject("map2map").putObject("inner_map").put("key", 234);
 
-        byte[] serializedJson = objectMapper.writeValueAsBytes(root);
+        byte[] serializedJson = OBJECT_MAPPER.writeValueAsBytes(root);
 
         DataType dataType =
                 ROW(
@@ -220,8 +222,7 @@ class JsonRowDataSerDeSchemaTest {
         double doubleValue = random.nextDouble();
         float floatValue = random.nextFloat();
 
-        ObjectMapper objectMapper = new ObjectMapper();
-        ObjectNode root = objectMapper.createObjectNode();
+        ObjectNode root = OBJECT_MAPPER.createObjectNode();
         root.put("bool", String.valueOf(bool));
         root.put("int", String.valueOf(integer));
         root.put("bigint", String.valueOf(bigint));
@@ -230,7 +231,7 @@ class JsonRowDataSerDeSchemaTest {
         root.put("float1", String.valueOf(floatValue));
         root.put("float2", new BigDecimal(floatValue));
 
-        byte[] serializedJson = objectMapper.writeValueAsBytes(root);
+        byte[] serializedJson = OBJECT_MAPPER.writeValueAsBytes(root);
 
         DataType dataType =
                 ROW(
@@ -296,11 +297,9 @@ class JsonRowDataSerDeSchemaTest {
                         true);
         open(serializationSchema);
 
-        ObjectMapper objectMapper = new ObjectMapper();
-
         // the first row
         {
-            ObjectNode root = objectMapper.createObjectNode();
+            ObjectNode root = OBJECT_MAPPER.createObjectNode();
             root.put("f1", 1);
             root.put("f2", true);
             root.put("f3", "str");
@@ -312,7 +311,7 @@ class JsonRowDataSerDeSchemaTest {
             ObjectNode row = root.putObject("f6");
             row.put("f1", "this is row1");
             row.put("f2", 12);
-            byte[] serializedJson = objectMapper.writeValueAsBytes(root);
+            byte[] serializedJson = OBJECT_MAPPER.writeValueAsBytes(root);
             RowData rowData = deserializationSchema.deserialize(serializedJson);
             byte[] actual = serializationSchema.serialize(rowData);
             assertThat(serializedJson).containsExactly(actual);
@@ -320,7 +319,7 @@ class JsonRowDataSerDeSchemaTest {
 
         // the second row
         {
-            ObjectNode root = objectMapper.createObjectNode();
+            ObjectNode root = OBJECT_MAPPER.createObjectNode();
             root.put("f1", 10);
             root.put("f2", false);
             root.put("f3", "newStr");
@@ -332,7 +331,7 @@ class JsonRowDataSerDeSchemaTest {
             ObjectNode row = root.putObject("f6");
             row.put("f1", "this is row2");
             row.putNull("f2");
-            byte[] serializedJson = objectMapper.writeValueAsBytes(root);
+            byte[] serializedJson = OBJECT_MAPPER.writeValueAsBytes(root);
             RowData rowData = deserializationSchema.deserialize(serializedJson);
             byte[] actual = serializationSchema.serialize(rowData);
             assertThat(serializedJson).containsExactly(actual);
@@ -419,12 +418,10 @@ class JsonRowDataSerDeSchemaTest {
 
     @Test
     void testDeserializationMissingField() throws Exception {
-        ObjectMapper objectMapper = new ObjectMapper();
-
         // Root
-        ObjectNode root = objectMapper.createObjectNode();
+        ObjectNode root = OBJECT_MAPPER.createObjectNode();
         root.put("id", 123123123);
-        byte[] serializedJson = objectMapper.writeValueAsBytes(root);
+        byte[] serializedJson = OBJECT_MAPPER.writeValueAsBytes(root);
 
         DataType dataType = ROW(FIELD("name", STRING()));
         RowType schema = (RowType) dataType.getLogicalType();
@@ -504,14 +501,12 @@ class JsonRowDataSerDeSchemaTest {
                         true);
         open(serializationSchema);
 
-        ObjectMapper objectMapper = new ObjectMapper();
-
-        ObjectNode root = objectMapper.createObjectNode();
+        ObjectNode root = OBJECT_MAPPER.createObjectNode();
         root.put("timestamp3", "1990-10-14 12:12:43.123");
         root.put("timestamp9", "1990-10-14 12:12:43.123456789");
         root.put("timestamp_with_local_timezone3", "1990-10-14 12:12:43.123Z");
         root.put("timestamp_with_local_timezone9", "1990-10-14 12:12:43.123456789Z");
-        byte[] serializedJson = objectMapper.writeValueAsBytes(root);
+        byte[] serializedJson = OBJECT_MAPPER.writeValueAsBytes(root);
         RowData rowData = deserializationSchema.deserialize(serializedJson);
         byte[] actual = serializationSchema.serialize(rowData);
         assertThat(serializedJson).containsExactly(actual);
diff --git a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowDeserializationSchemaTest.java b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowDeserializationSchemaTest.java
index deee53ae011..81e370c0fd3 100644
--- a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowDeserializationSchemaTest.java
+++ b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowDeserializationSchemaTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.formats.json;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeinfo.Types;
 import org.apache.flink.types.Row;
+import org.apache.flink.util.jackson.JacksonMapperFactory;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
@@ -52,6 +53,8 @@ import static org.junit.internal.matchers.ThrowableMessageMatcher.hasMessage;
 /** Tests for the {@link JsonRowDeserializationSchema}. */
 public class JsonRowDeserializationSchemaTest {
 
+    private static final ObjectMapper OBJECT_MAPPER = JacksonMapperFactory.createObjectMapper();
+
     @Rule public ExpectedException thrown = ExpectedException.none();
 
     /** Tests simple deserialization using type information. */
@@ -73,10 +76,8 @@ public class JsonRowDeserializationSchemaTest {
         innerMap.put("key", 234);
         nestedMap.put("inner_map", innerMap);
 
-        ObjectMapper objectMapper = new ObjectMapper();
-
         // Root
-        ObjectNode root = objectMapper.createObjectNode();
+        ObjectNode root = OBJECT_MAPPER.createObjectNode();
         root.put("id", id);
         root.put("name", name);
         root.put("bytes", bytes);
@@ -89,7 +90,7 @@ public class JsonRowDeserializationSchemaTest {
         root.putObject("map").put("flink", 123);
         root.putObject("map2map").putObject("inner_map").put("key", 234);
 
-        byte[] serializedJson = objectMapper.writeValueAsBytes(root);
+        byte[] serializedJson = OBJECT_MAPPER.writeValueAsBytes(root);
 
         JsonRowDeserializationSchema deserializationSchema =
                 new JsonRowDeserializationSchema.Builder(
@@ -149,10 +150,8 @@ public class JsonRowDeserializationSchemaTest {
                 };
         final String[] strings = new String[] {"one", "two", "three"};
 
-        final ObjectMapper objectMapper = new ObjectMapper();
-
         // Root
-        ObjectNode root = objectMapper.createObjectNode();
+        ObjectNode root = OBJECT_MAPPER.createObjectNode();
         root.put("id", id.longValue());
         root.putNull("idOrNull");
         root.put("name", name);
@@ -164,7 +163,7 @@ public class JsonRowDeserializationSchemaTest {
         root.putArray("strings").add("one").add("two").add("three");
         root.putObject("nested").put("booleanField", true).put("decimalField", 12);
 
-        final byte[] serializedJson = objectMapper.writeValueAsBytes(root);
+        final byte[] serializedJson = OBJECT_MAPPER.writeValueAsBytes(root);
 
         JsonRowDeserializationSchema deserializationSchema =
                 new JsonRowDeserializationSchema.Builder(
@@ -212,12 +211,10 @@ public class JsonRowDeserializationSchemaTest {
     /** Tests deserialization with non-existing field name. */
     @Test
     public void testMissingNode() throws Exception {
-        ObjectMapper objectMapper = new ObjectMapper();
-
         // Root
-        ObjectNode root = objectMapper.createObjectNode();
+        ObjectNode root = OBJECT_MAPPER.createObjectNode();
         root.put("id", 123123123);
-        byte[] serializedJson = objectMapper.writeValueAsBytes(root);
+        byte[] serializedJson = OBJECT_MAPPER.writeValueAsBytes(root);
 
         TypeInformation<Row> rowTypeInformation =
                 Types.ROW_NAMED(new String[] {"name"}, Types.STRING);
diff --git a/flink-python/src/main/java/org/apache/flink/python/metric/FlinkMetricContainer.java b/flink-python/src/main/java/org/apache/flink/python/metric/FlinkMetricContainer.java
index a184c39c621..df2f84a283b 100644
--- a/flink-python/src/main/java/org/apache/flink/python/metric/FlinkMetricContainer.java
+++ b/flink-python/src/main/java/org/apache/flink/python/metric/FlinkMetricContainer.java
@@ -26,6 +26,7 @@ import org.apache.flink.metrics.Gauge;
 import org.apache.flink.metrics.Meter;
 import org.apache.flink.metrics.MeterView;
 import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.util.jackson.JacksonMapperFactory;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
@@ -56,6 +57,8 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
 @Internal
 public final class FlinkMetricContainer {
 
+    private static final ObjectMapper OBJECT_MAPPER = JacksonMapperFactory.createObjectMapper();
+
     private static final String METRIC_KEY_SEPARATOR =
             GlobalConfiguration.loadConfiguration().getString(MetricOptions.SCOPE_DELIMITER);
 
@@ -206,7 +209,7 @@ public final class FlinkMetricContainer {
     static ArrayList getNameSpaceArray(MetricKey metricKey) {
         MetricName metricName = metricKey.metricName();
         try {
-            return new ObjectMapper().readValue(metricName.getNamespace(), ArrayList.class);
+            return OBJECT_MAPPER.readValue(metricName.getNamespace(), ArrayList.class);
         } catch (JsonProcessingException e) {
             throw new RuntimeException(
                     String.format("Parse namespace[%s] error. ", metricName.getNamespace()), e);
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java
index c7c779c4e93..fff6d42306f 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java
@@ -49,6 +49,7 @@ import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.ShutdownHookUtil;
 import org.apache.flink.util.concurrent.ExecutorThreadFactory;
+import org.apache.flink.util.jackson.JacksonMapperFactory;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
 
@@ -100,7 +101,7 @@ import java.util.function.Consumer;
 public class HistoryServer {
 
     private static final Logger LOG = LoggerFactory.getLogger(HistoryServer.class);
-    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+    private static final ObjectMapper OBJECT_MAPPER = JacksonMapperFactory.createObjectMapper();
 
     private final Configuration config;
 
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java
index 8e5aee9c633..a8d782354a0 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java
@@ -30,6 +30,7 @@ import org.apache.flink.runtime.messages.webmonitor.JobDetails;
 import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
 import org.apache.flink.runtime.rest.messages.JobsOverviewHeaders;
 import org.apache.flink.util.FileUtils;
+import org.apache.flink.util.jackson.JacksonMapperFactory;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonFactory;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
@@ -104,7 +105,7 @@ class HistoryServerArchiveFetcher {
     private static final Logger LOG = LoggerFactory.getLogger(HistoryServerArchiveFetcher.class);
 
     private static final JsonFactory jacksonFactory = new JsonFactory();
-    private static final ObjectMapper mapper = new ObjectMapper();
+    private static final ObjectMapper mapper = JacksonMapperFactory.createObjectMapper();
 
     private static final String JSON_FILE_ENDING = ".json";
 
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/rest/compatibility/CompatibilityRoutines.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/rest/compatibility/CompatibilityRoutines.java
index 1bf3b8f99cb..20bbc04501c 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/rest/compatibility/CompatibilityRoutines.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/rest/compatibility/CompatibilityRoutines.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.rest.compatibility;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
 import org.apache.flink.runtime.rest.messages.UntypedResponseMessageHeaders;
+import org.apache.flink.util.jackson.JacksonMapperFactory;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
@@ -137,7 +138,7 @@ enum CompatibilityRoutines {
                             REQUEST_ROUTINE,
                             RESPONSE_ROUTINE));
 
-    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+    private static final ObjectMapper OBJECT_MAPPER = JacksonMapperFactory.createObjectMapper();
     private static final JsonSchemaGenerator SCHEMA_GENERATOR =
             new JsonSchemaGenerator(OBJECT_MAPPER);
 
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/rest/compatibility/RestAPIStabilityTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/rest/compatibility/RestAPIStabilityTest.java
index 0dd5704b1dd..fec3204ebea 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/rest/compatibility/RestAPIStabilityTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/rest/compatibility/RestAPIStabilityTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.runtime.rest.util.DocumentingDispatcherRestEndpoint;
 import org.apache.flink.runtime.rest.util.DocumentingRestEndpoint;
 import org.apache.flink.runtime.rest.versioning.RuntimeRestAPIVersion;
 import org.apache.flink.util.ConfigurationException;
+import org.apache.flink.util.jackson.JacksonMapperFactory;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.util.DefaultIndenter;
@@ -56,7 +57,7 @@ final class RestAPIStabilityTest {
 
     private static final String SNAPSHOT_RESOURCE_PATTERN = "rest_api_%s.snapshot";
 
-    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+    private static final ObjectMapper OBJECT_MAPPER = JacksonMapperFactory.createObjectMapper();
 
     private static class StableRestApiVersionProvider implements ArgumentsProvider {
 
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java
index d23a0cc1898..b677a094d54 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java
@@ -38,6 +38,7 @@ import org.apache.flink.runtime.webmonitor.testutils.HttpTestClient;
 import org.apache.flink.test.junit5.InjectClusterClient;
 import org.apache.flink.test.junit5.InjectClusterRESTAddress;
 import org.apache.flink.test.junit5.MiniClusterExtension;
+import org.apache.flink.util.jackson.JacksonMapperFactory;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
@@ -81,6 +82,7 @@ class WebFrontendITCase {
     private static final int NUM_TASK_MANAGERS = 2;
     private static final int NUM_SLOTS = 4;
 
+    private static final ObjectMapper OBJECT_MAPPER = JacksonMapperFactory.createObjectMapper();
     private static final Configuration CLUSTER_CONFIGURATION = getClusterConfiguration();
 
     @RegisterExtension
@@ -170,8 +172,7 @@ class WebFrontendITCase {
     void getNumberOfTaskManagers(@InjectClusterRESTAddress URI restAddress) throws Exception {
         String json = getFromHTTP("http://localhost:" + restAddress.getPort() + "/taskmanagers/");
 
-        ObjectMapper mapper = new ObjectMapper();
-        JsonNode response = mapper.readTree(json);
+        JsonNode response = OBJECT_MAPPER.readTree(json);
         ArrayNode taskManagers = (ArrayNode) response.get("taskmanagers");
 
         assertThat(taskManagers).hasSize(NUM_TASK_MANAGERS);
@@ -181,8 +182,7 @@ class WebFrontendITCase {
     void getTaskManagers(@InjectClusterRESTAddress URI restAddress) throws Exception {
         String json = getFromHTTP("http://localhost:" + restAddress.getPort() + "/taskmanagers/");
 
-        ObjectMapper mapper = new ObjectMapper();
-        JsonNode parsed = mapper.readTree(json);
+        JsonNode parsed = OBJECT_MAPPER.readTree(json);
         ArrayNode taskManagers = (ArrayNode) parsed.get("taskmanagers");
 
         assertThat(taskManagers).hasSize(NUM_TASK_MANAGERS);
@@ -231,8 +231,7 @@ class WebFrontendITCase {
             throws Exception {
         String json = getFromHTTP("http://localhost:" + restAddress.getPort() + "/taskmanagers/");
 
-        ObjectMapper mapper = new ObjectMapper();
-        JsonNode parsed = mapper.readTree(json);
+        JsonNode parsed = OBJECT_MAPPER.readTree(json);
         ArrayNode taskManagers = (ArrayNode) parsed.get("taskmanagers");
         JsonNode taskManager = taskManagers.get(0);
         String id = taskManager.get("id").asText();
@@ -278,8 +277,7 @@ class WebFrontendITCase {
     private static Map<String, String> fromKeyValueJsonArray(String jsonString) {
         try {
             Map<String, String> map = new HashMap<>();
-            ObjectMapper m = new ObjectMapper();
-            ArrayNode array = (ArrayNode) m.readTree(jsonString);
+            ArrayNode array = (ArrayNode) OBJECT_MAPPER.readTree(jsonString);
 
             Iterator<JsonNode> elements = array.elements();
             while (elements.hasNext()) {
@@ -394,8 +392,7 @@ class WebFrontendITCase {
 
         String json = getFromHTTP("http://localhost:" + restAddress.getPort() + "/jobs/overview");
 
-        ObjectMapper mapper = new ObjectMapper();
-        JsonNode parsed = mapper.readTree(json);
+        JsonNode parsed = OBJECT_MAPPER.readTree(json);
         ArrayNode jsonJobs = (ArrayNode) parsed.get("jobs");
         assertThat(jsonJobs.size()).isEqualTo(1);
         assertThat(jsonJobs.get(0).get("duration").asInt()).isGreaterThan(0);
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java
index 4c5f7ea2645..8ec80e33358 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java
@@ -37,6 +37,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
 import org.apache.flink.test.util.MiniClusterWithClientResource;
 import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.jackson.JacksonMapperFactory;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonFactory;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
@@ -78,7 +79,8 @@ class HistoryServerTest {
                     .enable(JsonGenerator.Feature.AUTO_CLOSE_TARGET)
                     .disable(JsonGenerator.Feature.AUTO_CLOSE_JSON_CONTENT);
     private static final ObjectMapper OBJECT_MAPPER =
-            new ObjectMapper().enable(DeserializationFeature.FAIL_ON_MISSING_CREATOR_PROPERTIES);
+            JacksonMapperFactory.createObjectMapper()
+                    .enable(DeserializationFeature.FAIL_ON_MISSING_CREATOR_PROPERTIES);
 
     private MiniClusterWithClientResource cluster;
     private File jmDirectory;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/FileSystemJobResultStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/FileSystemJobResultStore.java
index 010ce77e74c..986b7a03343 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/FileSystemJobResultStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/FileSystemJobResultStore.java
@@ -30,6 +30,7 @@ import org.apache.flink.runtime.rest.messages.json.JobResultDeserializer;
 import org.apache.flink.runtime.rest.messages.json.JobResultSerializer;
 import org.apache.flink.runtime.util.NonClosingOutputStreamDecorator;
 import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.jackson.JacksonMapperFactory;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore;
@@ -66,7 +67,7 @@ public class FileSystemJobResultStore extends AbstractThreadsafeJobResultStore {
         return filename.endsWith(FILE_EXTENSION);
     }
 
-    private final ObjectMapper mapper = new ObjectMapper();
+    private final ObjectMapper mapper = JacksonMapperFactory.createObjectMapper();
 
     private final FileSystem fileSystem;
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/history/FsJobArchivist.java b/flink-runtime/src/main/java/org/apache/flink/runtime/history/FsJobArchivist.java
index 67c9c2b7655..65c76fdefaa 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/history/FsJobArchivist.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/history/FsJobArchivist.java
@@ -26,6 +26,7 @@ import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
 import org.apache.flink.util.IOUtils;
+import org.apache.flink.util.jackson.JacksonMapperFactory;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonEncoding;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonFactory;
@@ -47,7 +48,7 @@ public class FsJobArchivist {
 
     private static final Logger LOG = LoggerFactory.getLogger(FsJobArchivist.class);
     private static final JsonFactory jacksonFactory = new JsonFactory();
-    private static final ObjectMapper mapper = new ObjectMapper();
+    private static final ObjectMapper mapper = JacksonMapperFactory.createObjectMapper();
 
     private static final String ARCHIVE = "archive";
     private static final String PATH = "path";
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/util/RestMapperUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/util/RestMapperUtils.java
index 9a6bfba2f6e..eb42f41fabe 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/util/RestMapperUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/util/RestMapperUtils.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.runtime.rest.util;
 
+import org.apache.flink.util.jackson.JacksonMapperFactory;
+
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationFeature;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.SerializationFeature;
@@ -27,7 +29,7 @@ public class RestMapperUtils {
     private static final ObjectMapper objectMapper;
 
     static {
-        objectMapper = new ObjectMapper();
+        objectMapper = JacksonMapperFactory.createObjectMapper();
         objectMapper.enable(
                 DeserializationFeature.FAIL_ON_IGNORED_PROPERTIES,
                 DeserializationFeature.FAIL_ON_NULL_FOR_PRIMITIVES,
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/FileSystemJobResultStoreTestInternal.java b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/FileSystemJobResultStoreTestInternal.java
index 20d6f8a4c29..c952e5d4342 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/FileSystemJobResultStoreTestInternal.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/FileSystemJobResultStoreTestInternal.java
@@ -23,6 +23,7 @@ import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.jobmaster.JobResult;
 import org.apache.flink.util.FileUtils;
 import org.apache.flink.util.TestLoggerExtension;
+import org.apache.flink.util.jackson.JacksonMapperFactory;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
 
@@ -45,7 +46,7 @@ import static org.assertj.core.api.Assertions.assertThat;
 @ExtendWith(TestLoggerExtension.class)
 public class FileSystemJobResultStoreTestInternal {
 
-    private static final ObjectMapper MAPPER = new ObjectMapper();
+    private static final ObjectMapper MAPPER = JacksonMapperFactory.createObjectMapper();
 
     private FileSystemJobResultStore fileSystemJobResultStore;
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/jsonplan/JsonGeneratorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/jsonplan/JsonGeneratorTest.java
index a1a37b1d2cc..a7a3b942e2b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/jsonplan/JsonGeneratorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/jsonplan/JsonGeneratorTest.java
@@ -25,6 +25,7 @@ import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.operators.testutils.DummyInvokable;
+import org.apache.flink.util.jackson.JacksonMapperFactory;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
@@ -96,7 +97,7 @@ public class JsonGeneratorTest {
             assertNotNull(plan);
 
             // validate the produced JSON
-            ObjectMapper m = new ObjectMapper();
+            ObjectMapper m = JacksonMapperFactory.createObjectMapper();
             JsonNode rootNode = m.readTree(plan);
 
             // core fields
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/taskmanager/TaskManagerDetailsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/taskmanager/TaskManagerDetailsHandlerTest.java
index 5e2e14864d8..6d1b58fa3d8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/taskmanager/TaskManagerDetailsHandlerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/taskmanager/TaskManagerDetailsHandlerTest.java
@@ -39,6 +39,7 @@ import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerInfo;
 import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerMetricsInfo;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorMemoryConfiguration;
 import org.apache.flink.testutils.TestingUtils;
+import org.apache.flink.util.jackson.JacksonMapperFactory;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
@@ -118,7 +119,7 @@ class TaskManagerDetailsHandlerTest {
                         20L,
                         Collections.emptyList());
 
-        ObjectMapper objectMapper = new ObjectMapper();
+        ObjectMapper objectMapper = JacksonMapperFactory.createObjectMapper();
         String actualJson = objectMapper.writeValueAsString(actual);
         String expectedJson = objectMapper.writeValueAsString(expected);
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/json/JobResultDeserializerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/json/JobResultDeserializerTest.java
index 962b02c99f8..c9752f9fc34 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/json/JobResultDeserializerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/json/JobResultDeserializerTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.rest.messages.json;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.jobmaster.JobResult;
 import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.jackson.JacksonMapperFactory;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonMappingException;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
@@ -43,7 +44,7 @@ public class JobResultDeserializerTest extends TestLogger {
         final SimpleModule simpleModule = new SimpleModule();
         simpleModule.addDeserializer(JobResult.class, new JobResultDeserializer());
 
-        objectMapper = new ObjectMapper();
+        objectMapper = JacksonMapperFactory.createObjectMapper();
         objectMapper.registerModule(simpleModule);
     }
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/json/SerializedThrowableSerializerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/json/SerializedThrowableSerializerTest.java
index ac66aa314c4..b167fa7fbcb 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/json/SerializedThrowableSerializerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/json/SerializedThrowableSerializerTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.rest.messages.json;
 
 import org.apache.flink.util.SerializedThrowable;
 import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.jackson.JacksonMapperFactory;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.module.SimpleModule;
@@ -33,7 +34,7 @@ import static org.junit.Assert.assertTrue;
 /** Tests for {@link SerializedThrowableSerializer} and {@link SerializedThrowableDeserializer}. */
 public class SerializedThrowableSerializerTest extends TestLogger {
 
-    private ObjectMapper objectMapper = new ObjectMapper();
+    private ObjectMapper objectMapper;
 
     @Before
     public void setUp() {
@@ -42,7 +43,7 @@ public class SerializedThrowableSerializerTest extends TestLogger {
                 SerializedThrowable.class, new SerializedThrowableDeserializer());
         simpleModule.addSerializer(SerializedThrowable.class, new SerializedThrowableSerializer());
 
-        objectMapper = new ObjectMapper();
+        objectMapper = JacksonMapperFactory.createObjectMapper();
         objectMapper.registerModule(simpleModule);
     }
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/json/SerializedValueSerializerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/json/SerializedValueSerializerTest.java
index ee3d57c00b4..6e6ec96cc64 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/json/SerializedValueSerializerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/json/SerializedValueSerializerTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.rest.messages.json;
 
 import org.apache.flink.util.SerializedValue;
 import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.jackson.JacksonMapperFactory;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.type.TypeReference;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JavaType;
@@ -40,7 +41,7 @@ public class SerializedValueSerializerTest extends TestLogger {
 
     @Before
     public void setUp() {
-        objectMapper = new ObjectMapper();
+        objectMapper = JacksonMapperFactory.createObjectMapper();
         final SimpleModule simpleModule = new SimpleModule();
         final JavaType serializedValueWildcardType =
                 objectMapper
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/JSONGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/JSONGenerator.java
index 7047fd01e4a..b39243f27eb 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/JSONGenerator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/JSONGenerator.java
@@ -18,6 +18,7 @@
 package org.apache.flink.streaming.api.graph;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.util.jackson.JacksonMapperFactory;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ArrayNode;
@@ -45,7 +46,7 @@ public class JSONGenerator {
     public static final String PARALLELISM = "parallelism";
 
     private StreamGraph streamGraph;
-    private final ObjectMapper mapper = new ObjectMapper();
+    private final ObjectMapper mapper = JacksonMapperFactory.createObjectMapper();
 
     public JSONGenerator(StreamGraph streamGraph) {
         this.streamGraph = streamGraph;
diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/connector/source/CompactPartitions.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/connector/source/CompactPartitions.java
index 333a3480b1a..57ce1970720 100644
--- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/connector/source/CompactPartitions.java
+++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/connector/source/CompactPartitions.java
@@ -20,6 +20,7 @@ package org.apache.flink.table.connector.source;
 
 import org.apache.flink.table.catalog.CatalogPartitionSpec;
 import org.apache.flink.table.factories.DynamicTableFactory;
+import org.apache.flink.util.jackson.JacksonMapperFactory;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser;
@@ -50,7 +51,7 @@ public class CompactPartitions implements Serializable {
 
     private static final long serialVersionUID = 1L;
     private static final String FIELD_NAME_COMPACT_PARTITIONS = "compact-partitions";
-    private static final ObjectMapper MAPPER = new ObjectMapper();
+    private static final ObjectMapper MAPPER = JacksonMapperFactory.createObjectMapper();
 
     private final List<CompactPartition> compactPartitions;
 
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/JsonSerdeUtil.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/JsonSerdeUtil.java
index 8a601601d3f..46fd71036b2 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/JsonSerdeUtil.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/JsonSerdeUtil.java
@@ -40,6 +40,7 @@ import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.extraction.ExtractionUtils;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.jackson.JacksonMapperFactory;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
@@ -100,7 +101,7 @@ public class JsonSerdeUtil {
     private static final ObjectMapper OBJECT_MAPPER_INSTANCE;
 
     static {
-        OBJECT_MAPPER_INSTANCE = new ObjectMapper();
+        OBJECT_MAPPER_INSTANCE = JacksonMapperFactory.createObjectMapper();
 
         OBJECT_MAPPER_INSTANCE.setTypeFactory(
                 // Make sure to register the classloader of the planner
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/LogicalTypeJsonSerdeTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/LogicalTypeJsonSerdeTest.java
index d27dbb5abe3..d571d753885 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/LogicalTypeJsonSerdeTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/LogicalTypeJsonSerdeTest.java
@@ -64,9 +64,9 @@ import org.apache.flink.table.types.logical.ZonedTimestampType;
 import org.apache.flink.table.types.utils.DataTypeFactoryMock;
 import org.apache.flink.table.utils.CatalogManagerMocks;
 import org.apache.flink.types.Row;
+import org.apache.flink.util.jackson.JacksonMapperFactory;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
 
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.parallel.Execution;
@@ -139,8 +139,8 @@ public class LogicalTypeJsonSerdeTest {
         // maximum plan content
         tableConfig.set(TableConfigOptions.PLAN_COMPILE_CATALOG_OBJECTS, ALL);
         final String maximumJson = toJson(serdeContext, STRUCTURED_TYPE);
-        final ObjectMapper mapper = new ObjectMapper();
-        final JsonNode maximumJsonNode = mapper.readTree(maximumJson);
+        final JsonNode maximumJsonNode =
+                JacksonMapperFactory.createObjectMapper().readTree(maximumJson);
         assertThat(maximumJsonNode.get(LogicalTypeJsonSerializer.FIELD_NAME_ATTRIBUTES))
                 .isNotNull();
         assertThat(maximumJsonNode.get(LogicalTypeJsonSerializer.FIELD_NAME_DESCRIPTION).asText())
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/PartitionSpecSerdeTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/PartitionSpecSerdeTest.java
index 159ef2c6ab6..b35f3b808fd 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/PartitionSpecSerdeTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/PartitionSpecSerdeTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.table.planner.plan.nodes.exec.serde;
 
 import org.apache.flink.table.planner.plan.nodes.exec.spec.PartitionSpec;
+import org.apache.flink.util.jackson.JacksonMapperFactory;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
@@ -30,20 +31,22 @@ import static org.assertj.core.api.Assertions.assertThat;
 /** Test PartitionSpec json ser/de. */
 public class PartitionSpecSerdeTest {
 
+    private static final ObjectMapper OBJECT_MAPPER = JacksonMapperFactory.createObjectMapper();
+
     @Test
     public void testPartitionSpec() throws JsonProcessingException {
         PartitionSpec spec = new PartitionSpec(new int[] {1, 2, 3});
-        ObjectMapper mapper = new ObjectMapper();
-        assertThat(mapper.readValue(mapper.writeValueAsString(spec), PartitionSpec.class))
+        assertThat(
+                        OBJECT_MAPPER.readValue(
+                                OBJECT_MAPPER.writeValueAsString(spec), PartitionSpec.class))
                 .isEqualTo(spec);
     }
 
     @Test
     public void testAllInOne() throws JsonProcessingException {
-        ObjectMapper mapper = new ObjectMapper();
         assertThat(
-                        mapper.readValue(
-                                mapper.writeValueAsString(PartitionSpec.ALL_IN_ONE),
+                        OBJECT_MAPPER.readValue(
+                                OBJECT_MAPPER.writeValueAsString(PartitionSpec.ALL_IN_ONE),
                                 PartitionSpec.class))
                 .isEqualTo(PartitionSpec.ALL_IN_ONE);
     }
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RankProcessStrategySerdeTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RankProcessStrategySerdeTest.java
index 82c1b3849fa..69c9e553d77 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RankProcessStrategySerdeTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RankProcessStrategySerdeTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.table.planner.plan.nodes.exec.serde;
 
 import org.apache.flink.table.planner.plan.utils.RankProcessStrategy;
+import org.apache.flink.util.jackson.JacksonMapperFactory;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
@@ -32,7 +33,7 @@ public class RankProcessStrategySerdeTest {
 
     @Test
     public void testRankRange() throws JsonProcessingException {
-        ObjectMapper mapper = new ObjectMapper();
+        ObjectMapper mapper = JacksonMapperFactory.createObjectMapper();
         RankProcessStrategy[] strategies =
                 new RankProcessStrategy[] {
                     RankProcessStrategy.UNDEFINED_STRATEGY,
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RankRangeSerdeTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RankRangeSerdeTest.java
index 90abd497f54..bc5e6a246ba 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RankRangeSerdeTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RankRangeSerdeTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.table.runtime.operators.rank.ConstantRankRange;
 import org.apache.flink.table.runtime.operators.rank.ConstantRankRangeWithoutEnd;
 import org.apache.flink.table.runtime.operators.rank.RankRange;
 import org.apache.flink.table.runtime.operators.rank.VariableRankRange;
+import org.apache.flink.util.jackson.JacksonMapperFactory;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
@@ -35,7 +36,7 @@ public class RankRangeSerdeTest {
 
     @Test
     public void testRankRange() throws JsonProcessingException {
-        ObjectMapper mapper = new ObjectMapper();
+        ObjectMapper mapper = JacksonMapperFactory.createObjectMapper();
         RankRange[] ranges =
                 new RankRange[] {
                     new ConstantRankRange(1, 2),
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RankTypeSerdeTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RankTypeSerdeTest.java
index e1daa9d03e3..04fa1552512 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RankTypeSerdeTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RankTypeSerdeTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.table.planner.plan.nodes.exec.serde;
 
 import org.apache.flink.table.runtime.operators.rank.RankType;
+import org.apache.flink.util.jackson.JacksonMapperFactory;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
@@ -32,7 +33,7 @@ public class RankTypeSerdeTest {
 
     @Test
     public void testRankType() throws JsonProcessingException {
-        ObjectMapper mapper = new ObjectMapper();
+        ObjectMapper mapper = JacksonMapperFactory.createObjectMapper();
         for (RankType type : RankType.values()) {
             RankType result = mapper.readValue(mapper.writeValueAsString(type), RankType.class);
             assertThat(result).isEqualTo(type);
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/SortSpecSerdeTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/SortSpecSerdeTest.java
index 2b5600980dd..af2190203b5 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/SortSpecSerdeTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/SortSpecSerdeTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.table.planner.plan.nodes.exec.serde;
 
 import org.apache.flink.table.planner.plan.nodes.exec.spec.SortSpec;
+import org.apache.flink.util.jackson.JacksonMapperFactory;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
@@ -29,6 +30,7 @@ import static org.assertj.core.api.Assertions.assertThat;
 
 /** Test SortSpec json ser/de. */
 public class SortSpecSerdeTest {
+    private static final ObjectMapper OBJECT_MAPPER = JacksonMapperFactory.createObjectMapper();
 
     @Test
     public void testSortSpec() throws JsonProcessingException {
@@ -39,15 +41,17 @@ public class SortSpecSerdeTest {
                         .addField(3, false, true)
                         .addField(4, false, false)
                         .build();
-        ObjectMapper mapper = new ObjectMapper();
-        assertThat(mapper.readValue(mapper.writeValueAsString(sortSpec), SortSpec.class))
+        assertThat(
+                        OBJECT_MAPPER.readValue(
+                                OBJECT_MAPPER.writeValueAsString(sortSpec), SortSpec.class))
                 .isEqualTo(sortSpec);
     }
 
     @Test
     public void testAny() throws JsonProcessingException {
-        ObjectMapper mapper = new ObjectMapper();
-        assertThat(mapper.readValue(mapper.writeValueAsString(SortSpec.ANY), SortSpec.class))
+        assertThat(
+                        OBJECT_MAPPER.readValue(
+                                OBJECT_MAPPER.writeValueAsString(SortSpec.ANY), SortSpec.class))
                 .isEqualTo(SortSpec.ANY);
     }
 }
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/utils/JsonTestUtils.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/utils/JsonTestUtils.java
index 682f3b3e656..6aa9f96c7d0 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/utils/JsonTestUtils.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/utils/JsonTestUtils.java
@@ -19,6 +19,7 @@
 package org.apache.flink.table.planner.utils;
 
 import org.apache.flink.FlinkVersion;
+import org.apache.flink.util.jackson.JacksonMapperFactory;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
@@ -29,7 +30,8 @@ import java.io.IOException;
 /** This class contains a collection of generic utilities to deal with JSON in tests. */
 public final class JsonTestUtils {
 
-    private static final ObjectMapper OBJECT_MAPPER_INSTANCE = new ObjectMapper();
+    private static final ObjectMapper OBJECT_MAPPER_INSTANCE =
+            JacksonMapperFactory.createObjectMapper();
 
     private JsonTestUtils() {}
 
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala
index fd2fd085519..35499ba9439 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala
@@ -66,6 +66,7 @@ import org.apache.flink.table.types.utils.TypeConversions
 import org.apache.flink.table.typeutils.FieldInfoUtils
 import org.apache.flink.types.Row
 import org.apache.flink.util.{FlinkUserCodeClassLoaders, MutableURLClassLoader}
+import org.apache.flink.util.jackson.JacksonMapperFactory
 
 import _root_.java.math.{BigDecimal => JBigDecimal}
 import _root_.java.util
@@ -1612,6 +1613,8 @@ object PlanKind extends Enumeration {
 
 object TableTestUtil {
 
+  private val objectMapper = JacksonMapperFactory.createObjectMapper()
+
   val STREAM_SETTING: EnvironmentSettings =
     EnvironmentSettings.newInstance().inStreamingMode().build()
   val BATCH_SETTING: EnvironmentSettings = EnvironmentSettings.newInstance().inBatchMode().build()
@@ -1706,14 +1709,14 @@ object TableTestUtil {
 
   @throws[IOException]
   def getFormattedJson(json: String): String = {
-    val parser = new ObjectMapper().getFactory.createParser(json)
+    val parser = objectMapper.getFactory.createParser(json)
     val jsonNode: JsonNode = parser.readValueAsTree[JsonNode]
     jsonNode.toString
   }
 
   @throws[IOException]
   def getPrettyJson(json: String): String = {
-    val parser = new ObjectMapper().getFactory.createParser(json)
+    val parser = objectMapper.getFactory.createParser(json)
     val jsonNode: JsonNode = parser.readValueAsTree[JsonNode]
     jsonNode.toPrettyString
   }
diff --git a/flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/JsonJobGraphGenerationTest.java b/flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/JsonJobGraphGenerationTest.java
index 1bbf8d35c42..3e7a589f779 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/JsonJobGraphGenerationTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/JsonJobGraphGenerationTest.java
@@ -32,6 +32,7 @@ import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.jsonplan.JsonPlanGenerator;
+import org.apache.flink.util.jackson.JacksonMapperFactory;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonFactory;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser;
@@ -245,6 +246,8 @@ public class JsonJobGraphGenerationTest {
 
     private static class GenericValidator implements JsonValidator {
 
+        private static final ObjectMapper OBJECT_MAPPER = JacksonMapperFactory.createObjectMapper();
+
         private final int expectedParallelism;
         private final int numNodes;
 
@@ -258,8 +261,7 @@ public class JsonJobGraphGenerationTest {
             final Map<String, JsonNode> idToNode = new HashMap<>();
 
             // validate the produced JSON
-            ObjectMapper m = new ObjectMapper();
-            JsonNode rootNode = m.readTree(json);
+            JsonNode rootNode = OBJECT_MAPPER.readTree(json);
 
             JsonNode idField = rootNode.get("jid");
             JsonNode nameField = rootNode.get("name");