You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2022/08/08 17:08:22 UTC

[flink] 03/04: [FLINK-28621][core] Add central Jackson mapper factory methods

This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit c0bf0ac3fb1fe4814bff09807ed2040bb13da052
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Thu Jul 28 16:02:26 2022 +0200

    [FLINK-28621][core] Add central Jackson mapper factory methods
---
 .../sink/testutils/KinesisFirehoseTestUtils.java   |  6 ++--
 .../kinesis/sink/examples/SinkIntoKinesis.java     |  9 +++--
 .../JSONKeyValueDeserializationSchema.java         |  3 +-
 .../KafkaRecordDeserializationSchemaTest.java      | 12 ++++---
 .../JSONKeyValueDeserializationSchemaTest.java     | 31 ++++++++---------
 flink-core/pom.xml                                 | 11 +++---
 .../flink/util/jackson/JacksonMapperFactory.java   | 40 ++++++++++++++++++++++
 .../util/jackson/JacksonMapperFactoryTest.java     | 37 +++++++++-----------
 .../flink/docs/rest/OpenApiSpecGenerator.java      |  4 ++-
 .../flink/docs/rest/RestAPIDocGenerator.java       |  3 +-
 .../table/test/KinesisFirehoseTableITTest.java     | 14 +++++---
 .../table/test/KinesisStreamsTableApiIT.java       | 11 +++---
 .../flink/tests/util/flink/FlinkDistribution.java  |  3 +-
 .../kinesis/test/KinesisTableApiITCase.java        | 11 +++---
 .../flink/streaming/kinesis/test/model/Order.java  |  2 +-
 .../apache/flink/formats/csv/CsvBulkWriter.java    |  3 +-
 .../flink/formats/csv/CsvFileFormatFactory.java    |  5 +--
 .../apache/flink/formats/csv/CsvReaderFormat.java  |  5 +--
 .../csv/CsvRowDataDeserializationSchema.java       |  5 +--
 .../formats/csv/CsvRowDataSerializationSchema.java |  3 +-
 .../formats/csv/CsvRowDeserializationSchema.java   |  5 +--
 .../formats/csv/CsvRowSerializationSchema.java     |  3 +-
 .../flink/formats/csv/RowCsvInputFormat.java       |  4 +--
 .../flink/formats/csv/DataStreamCsvITCase.java     | 10 +++---
 .../formats/json/JsonDeserializationSchema.java    |  5 +--
 .../json/JsonRowDataDeserializationSchema.java     |  3 +-
 .../json/JsonRowDataSerializationSchema.java       |  3 +-
 .../formats/json/JsonRowDeserializationSchema.java |  3 +-
 .../flink/formats/json/JsonRowSchemaConverter.java |  3 +-
 .../formats/json/JsonRowSerializationSchema.java   |  3 +-
 .../json/JsonNodeDeserializationSchemaTest.java    |  3 +-
 .../formats/json/JsonRowDataSerDeSchemaTest.java   | 37 +++++++++-----------
 .../json/JsonRowDeserializationSchemaTest.java     | 21 +++++-------
 .../flink/python/metric/FlinkMetricContainer.java  |  5 ++-
 .../runtime/webmonitor/history/HistoryServer.java  |  3 +-
 .../history/HistoryServerArchiveFetcher.java       |  3 +-
 .../rest/compatibility/CompatibilityRoutines.java  |  3 +-
 .../rest/compatibility/RestAPIStabilityTest.java   |  3 +-
 .../runtime/webmonitor/WebFrontendITCase.java      | 17 ++++-----
 .../webmonitor/history/HistoryServerTest.java      |  4 ++-
 .../highavailability/FileSystemJobResultStore.java |  3 +-
 .../flink/runtime/history/FsJobArchivist.java      |  3 +-
 .../flink/runtime/rest/util/RestMapperUtils.java   |  4 ++-
 .../FileSystemJobResultStoreTestInternal.java      |  3 +-
 .../jobgraph/jsonplan/JsonGeneratorTest.java       |  3 +-
 .../taskmanager/TaskManagerDetailsHandlerTest.java |  3 +-
 .../messages/json/JobResultDeserializerTest.java   |  3 +-
 .../json/SerializedThrowableSerializerTest.java    |  5 +--
 .../json/SerializedValueSerializerTest.java        |  3 +-
 .../flink/streaming/api/graph/JSONGenerator.java   |  3 +-
 .../table/connector/source/CompactPartitions.java  |  3 +-
 .../plan/nodes/exec/serde/JsonSerdeUtil.java       |  3 +-
 .../nodes/exec/serde/LogicalTypeJsonSerdeTest.java |  6 ++--
 .../nodes/exec/serde/PartitionSpecSerdeTest.java   | 13 ++++---
 .../exec/serde/RankProcessStrategySerdeTest.java   |  3 +-
 .../plan/nodes/exec/serde/RankRangeSerdeTest.java  |  3 +-
 .../plan/nodes/exec/serde/RankTypeSerdeTest.java   |  3 +-
 .../plan/nodes/exec/serde/SortSpecSerdeTest.java   | 12 ++++---
 .../flink/table/planner/utils/JsonTestUtils.java   |  4 ++-
 .../flink/table/planner/utils/TableTestBase.scala  |  7 ++--
 .../jsonplan/JsonJobGraphGenerationTest.java       |  6 ++--
 61 files changed, 278 insertions(+), 177 deletions(-)

diff --git a/flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/testutils/KinesisFirehoseTestUtils.java b/flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/testutils/KinesisFirehoseTestUtils.java
index 129c4a43e7f..f4dee623b64 100644
--- a/flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/testutils/KinesisFirehoseTestUtils.java
+++ b/flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/testutils/KinesisFirehoseTestUtils.java
@@ -20,6 +20,7 @@ package org.apache.flink.connector.firehose.sink.testutils;
 import org.apache.flink.connector.aws.testutils.AWSServicesTestUtils;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.util.jackson.JacksonMapperFactory;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
@@ -39,7 +40,7 @@ import java.util.List;
  */
 public class KinesisFirehoseTestUtils {
 
-    private static final ObjectMapper MAPPER = new ObjectMapper();
+    private static final ObjectMapper MAPPER = JacksonMapperFactory.createObjectMapper();
 
     public static FirehoseClient createFirehoseClient(String endpoint, SdkHttpClient httpClient) {
         return AWSServicesTestUtils.createAwsSyncClient(
@@ -68,11 +69,10 @@ public class KinesisFirehoseTestUtils {
 
     public static DataStream<String> getSampleDataGenerator(
             StreamExecutionEnvironment env, int endValue) {
-        ObjectMapper mapper = new ObjectMapper();
         return env.fromSequence(1, endValue)
                 .map(Object::toString)
                 .returns(String.class)
-                .map(data -> mapper.writeValueAsString(ImmutableMap.of("data", data)));
+                .map(data -> MAPPER.writeValueAsString(ImmutableMap.of("data", data)));
     }
 
     public static List<String> getSampleData(int endValue) throws JsonProcessingException {
diff --git a/flink-connectors/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/sink/examples/SinkIntoKinesis.java b/flink-connectors/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/sink/examples/SinkIntoKinesis.java
index 362e25fda77..c861f44fbdf 100644
--- a/flink-connectors/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/sink/examples/SinkIntoKinesis.java
+++ b/flink-connectors/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/sink/examples/SinkIntoKinesis.java
@@ -22,6 +22,7 @@ import org.apache.flink.connector.aws.config.AWSConfigConstants;
 import org.apache.flink.connector.kinesis.sink.KinesisStreamsSink;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.util.jackson.JacksonMapperFactory;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
 
@@ -39,8 +40,9 @@ import java.util.Properties;
  */
 public class SinkIntoKinesis {
 
+    private static final ObjectMapper OBJECT_MAPPER = JacksonMapperFactory.createObjectMapper();
+
     public static void main(String[] args) throws Exception {
-        ObjectMapper mapper = new ObjectMapper();
         final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
         env.enableCheckpointing(10_000);
 
@@ -48,7 +50,10 @@ public class SinkIntoKinesis {
                 env.fromSequence(1, 10_000_000L)
                         .map(Object::toString)
                         .returns(String.class)
-                        .map(data -> mapper.writeValueAsString(ImmutableMap.of("data", data)));
+                        .map(
+                                data ->
+                                        OBJECT_MAPPER.writeValueAsString(
+                                                ImmutableMap.of("data", data)));
 
         Properties sinkProperties = new Properties();
         sinkProperties.put(AWSConfigConstants.AWS_REGION, "your-region-here");
diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/util/serialization/JSONKeyValueDeserializationSchema.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/util/serialization/JSONKeyValueDeserializationSchema.java
index 86817433a6f..e2b428eec1e 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/util/serialization/JSONKeyValueDeserializationSchema.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/util/serialization/JSONKeyValueDeserializationSchema.java
@@ -21,6 +21,7 @@ import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.serialization.DeserializationSchema;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
+import org.apache.flink.util.jackson.JacksonMapperFactory;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
@@ -55,7 +56,7 @@ public class JSONKeyValueDeserializationSchema implements KafkaDeserializationSc
 
     @Override
     public void open(DeserializationSchema.InitializationContext context) throws Exception {
-        mapper = new ObjectMapper();
+        mapper = JacksonMapperFactory.createObjectMapper();
     }
 
     @Override
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/deserializer/KafkaRecordDeserializationSchemaTest.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/deserializer/KafkaRecordDeserializationSchemaTest.java
index c2e735a0c13..8766719a0c1 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/deserializer/KafkaRecordDeserializationSchemaTest.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/deserializer/KafkaRecordDeserializationSchemaTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.connector.testutils.source.deserialization.TestingDeseri
 import org.apache.flink.formats.json.JsonDeserializationSchema;
 import org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema;
 import org.apache.flink.util.Collector;
+import org.apache.flink.util.jackson.JacksonMapperFactory;
 
 import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableMap;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
@@ -46,6 +47,8 @@ import static org.assertj.core.api.Assertions.assertThat;
 /** Unit tests for KafkaRecordDeserializationSchema. */
 public class KafkaRecordDeserializationSchemaTest {
 
+    private static final ObjectMapper OBJECT_MAPPER = JacksonMapperFactory.createObjectMapper();
+
     private static Map<String, ?> configurableConfiguration;
     private static Map<String, ?> configuration;
     private static boolean isKeyDeserializer;
@@ -135,14 +138,13 @@ public class KafkaRecordDeserializationSchemaTest {
     }
 
     private ConsumerRecord<byte[], byte[]> getConsumerRecord() throws JsonProcessingException {
-        ObjectMapper mapper = new ObjectMapper();
-        ObjectNode initialKey = mapper.createObjectNode();
+        ObjectNode initialKey = OBJECT_MAPPER.createObjectNode();
         initialKey.put("index", 4);
-        byte[] serializedKey = mapper.writeValueAsBytes(initialKey);
+        byte[] serializedKey = OBJECT_MAPPER.writeValueAsBytes(initialKey);
 
-        ObjectNode initialValue = mapper.createObjectNode();
+        ObjectNode initialValue = OBJECT_MAPPER.createObjectNode();
         initialValue.put("word", "world");
-        byte[] serializedValue = mapper.writeValueAsBytes(initialValue);
+        byte[] serializedValue = OBJECT_MAPPER.writeValueAsBytes(initialValue);
 
         return new ConsumerRecord<>("topic#1", 3, 4L, serializedKey, serializedValue);
     }
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/JSONKeyValueDeserializationSchemaTest.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/JSONKeyValueDeserializationSchemaTest.java
index 3a052cee2af..ddbcf1c94c8 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/JSONKeyValueDeserializationSchemaTest.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/JSONKeyValueDeserializationSchemaTest.java
@@ -19,6 +19,7 @@ package org.apache.flink.streaming.connectors.kafka;
 
 import org.apache.flink.connector.testutils.formats.DummyInitializationContext;
 import org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema;
+import org.apache.flink.util.jackson.JacksonMapperFactory;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
@@ -31,16 +32,17 @@ import static org.assertj.core.api.Assertions.assertThat;
 /** Tests for the{@link JSONKeyValueDeserializationSchema}. */
 public class JSONKeyValueDeserializationSchemaTest {
 
+    private static final ObjectMapper OBJECT_MAPPER = JacksonMapperFactory.createObjectMapper();
+
     @Test
     public void testDeserializeWithoutMetadata() throws Exception {
-        ObjectMapper mapper = new ObjectMapper();
-        ObjectNode initialKey = mapper.createObjectNode();
+        ObjectNode initialKey = OBJECT_MAPPER.createObjectNode();
         initialKey.put("index", 4);
-        byte[] serializedKey = mapper.writeValueAsBytes(initialKey);
+        byte[] serializedKey = OBJECT_MAPPER.writeValueAsBytes(initialKey);
 
-        ObjectNode initialValue = mapper.createObjectNode();
+        ObjectNode initialValue = OBJECT_MAPPER.createObjectNode();
         initialValue.put("word", "world");
-        byte[] serializedValue = mapper.writeValueAsBytes(initialValue);
+        byte[] serializedValue = OBJECT_MAPPER.writeValueAsBytes(initialValue);
 
         JSONKeyValueDeserializationSchema schema = new JSONKeyValueDeserializationSchema(false);
         schema.open(new DummyInitializationContext());
@@ -54,12 +56,11 @@ public class JSONKeyValueDeserializationSchemaTest {
 
     @Test
     public void testDeserializeWithoutKey() throws Exception {
-        ObjectMapper mapper = new ObjectMapper();
         byte[] serializedKey = null;
 
-        ObjectNode initialValue = mapper.createObjectNode();
+        ObjectNode initialValue = OBJECT_MAPPER.createObjectNode();
         initialValue.put("word", "world");
-        byte[] serializedValue = mapper.writeValueAsBytes(initialValue);
+        byte[] serializedValue = OBJECT_MAPPER.writeValueAsBytes(initialValue);
 
         JSONKeyValueDeserializationSchema schema = new JSONKeyValueDeserializationSchema(false);
         schema.open(new DummyInitializationContext());
@@ -88,10 +89,9 @@ public class JSONKeyValueDeserializationSchemaTest {
 
     @Test
     public void testDeserializeWithoutValue() throws Exception {
-        ObjectMapper mapper = new ObjectMapper();
-        ObjectNode initialKey = mapper.createObjectNode();
+        ObjectNode initialKey = OBJECT_MAPPER.createObjectNode();
         initialKey.put("index", 4);
-        byte[] serializedKey = mapper.writeValueAsBytes(initialKey);
+        byte[] serializedKey = OBJECT_MAPPER.writeValueAsBytes(initialKey);
 
         byte[] serializedValue = null;
 
@@ -107,14 +107,13 @@ public class JSONKeyValueDeserializationSchemaTest {
 
     @Test
     public void testDeserializeWithMetadata() throws Exception {
-        ObjectMapper mapper = new ObjectMapper();
-        ObjectNode initialKey = mapper.createObjectNode();
+        ObjectNode initialKey = OBJECT_MAPPER.createObjectNode();
         initialKey.put("index", 4);
-        byte[] serializedKey = mapper.writeValueAsBytes(initialKey);
+        byte[] serializedKey = OBJECT_MAPPER.writeValueAsBytes(initialKey);
 
-        ObjectNode initialValue = mapper.createObjectNode();
+        ObjectNode initialValue = OBJECT_MAPPER.createObjectNode();
         initialValue.put("word", "world");
-        byte[] serializedValue = mapper.writeValueAsBytes(initialValue);
+        byte[] serializedValue = OBJECT_MAPPER.writeValueAsBytes(initialValue);
 
         JSONKeyValueDeserializationSchema schema = new JSONKeyValueDeserializationSchema(true);
         schema.open(new DummyInitializationContext());
diff --git a/flink-core/pom.xml b/flink-core/pom.xml
index ef3061f1ac5..45be24efc05 100644
--- a/flink-core/pom.xml
+++ b/flink-core/pom.xml
@@ -51,6 +51,11 @@ under the License.
 			<artifactId>flink-shaded-asm-9</artifactId>
 		</dependency>
 
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-shaded-jackson</artifactId>
+		</dependency>
+
 		<!-- standard utilities -->
 		<dependency>
 			<groupId>org.apache.commons</groupId>
@@ -114,12 +119,6 @@ under the License.
 			<scope>test</scope>
 		</dependency>
 
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-shaded-jackson</artifactId>
-			<scope>test</scope>
-		</dependency>
-
 		<dependency>
 			<groupId>org.projectlombok</groupId>
 			<artifactId>lombok</artifactId>
diff --git a/flink-core/src/main/java/org/apache/flink/util/jackson/JacksonMapperFactory.java b/flink-core/src/main/java/org/apache/flink/util/jackson/JacksonMapperFactory.java
new file mode 100644
index 00000000000..2c0b57f0471
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/util/jackson/JacksonMapperFactory.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util.jackson;
+
+import org.apache.flink.annotation.Experimental;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvMapper;
+
+/** Factory for Jackson mappers. */
+@Experimental
+public final class JacksonMapperFactory {
+
+    public static ObjectMapper createObjectMapper() {
+        final ObjectMapper objectMapper = new ObjectMapper();
+        return objectMapper;
+    }
+
+    public static CsvMapper createCsvMapper() {
+        final CsvMapper csvMapper = new CsvMapper();
+        return csvMapper;
+    }
+
+    private JacksonMapperFactory() {}
+}
diff --git a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonNodeDeserializationSchemaTest.java b/flink-core/src/test/java/org/apache/flink/util/jackson/JacksonMapperFactoryTest.java
similarity index 52%
copy from flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonNodeDeserializationSchemaTest.java
copy to flink-core/src/test/java/org/apache/flink/util/jackson/JacksonMapperFactoryTest.java
index 90751525feb..61f2057ca82 100644
--- a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonNodeDeserializationSchemaTest.java
+++ b/flink-core/src/test/java/org/apache/flink/util/jackson/JacksonMapperFactoryTest.java
@@ -15,35 +15,30 @@
  * limitations under the License.
  */
 
-package org.apache.flink.formats.json;
-
-import org.apache.flink.connector.testutils.formats.DummyInitializationContext;
+package org.apache.flink.util.jackson;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvMapper;
 
 import org.junit.jupiter.api.Test;
 
-import java.io.IOException;
-
 import static org.assertj.core.api.Assertions.assertThat;
 
-/** Tests for the {@link JsonNodeDeserializationSchema}. */
-@SuppressWarnings("deprecation")
-class JsonNodeDeserializationSchemaTest {
+class JacksonMapperFactoryTest {
 
     @Test
-    void testDeserialize() throws IOException {
-        ObjectMapper mapper = new ObjectMapper();
-        ObjectNode initialValue = mapper.createObjectNode();
-        initialValue.put("key", 4).put("value", "world");
-        byte[] serializedValue = mapper.writeValueAsBytes(initialValue);
-
-        JsonNodeDeserializationSchema schema = new JsonNodeDeserializationSchema();
-        schema.open(new DummyInitializationContext());
-        ObjectNode deserializedValue = schema.deserialize(serializedValue);
-
-        assertThat(deserializedValue.get("key").asInt()).isEqualTo(4);
-        assertThat(deserializedValue.get("value").asText()).isEqualTo("world");
+    void testCreateObjectMapperReturnDistinctMappers() {
+        final ObjectMapper mapper1 = JacksonMapperFactory.createObjectMapper();
+        final ObjectMapper mapper2 = JacksonMapperFactory.createObjectMapper();
+
+        assertThat(mapper1).isNotSameAs(mapper2);
+    }
+
+    @Test
+    void testCreateCsvMapperReturnDistinctMappers() {
+        final CsvMapper mapper1 = JacksonMapperFactory.createCsvMapper();
+        final CsvMapper mapper2 = JacksonMapperFactory.createCsvMapper();
+
+        assertThat(mapper1).isNotSameAs(mapper2);
     }
 }
diff --git a/flink-docs/src/main/java/org/apache/flink/docs/rest/OpenApiSpecGenerator.java b/flink-docs/src/main/java/org/apache/flink/docs/rest/OpenApiSpecGenerator.java
index 0265ff2c957..fe17e807f8f 100644
--- a/flink-docs/src/main/java/org/apache/flink/docs/rest/OpenApiSpecGenerator.java
+++ b/flink-docs/src/main/java/org/apache/flink/docs/rest/OpenApiSpecGenerator.java
@@ -44,6 +44,7 @@ import org.apache.flink.runtime.util.EnvironmentInformation;
 import org.apache.flink.runtime.webmonitor.handlers.JarUploadHeaders;
 import org.apache.flink.util.ConfigurationException;
 import org.apache.flink.util.SerializedThrowable;
+import org.apache.flink.util.jackson.JacksonMapperFactory;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.SerializationFeature;
@@ -100,7 +101,8 @@ public class OpenApiSpecGenerator {
     static {
         ModelResolver.enumsAsRef = true;
         final ObjectMapper mapper =
-                new ObjectMapper().configure(SerializationFeature.ORDER_MAP_ENTRIES_BY_KEYS, true);
+                JacksonMapperFactory.createObjectMapper()
+                        .configure(SerializationFeature.ORDER_MAP_ENTRIES_BY_KEYS, true);
         modelConverterContext =
                 new ModelConverterContextImpl(Collections.singletonList(new ModelResolver(mapper)));
     }
diff --git a/flink-docs/src/main/java/org/apache/flink/docs/rest/RestAPIDocGenerator.java b/flink-docs/src/main/java/org/apache/flink/docs/rest/RestAPIDocGenerator.java
index 489bc6415ed..a21ec24cc3e 100644
--- a/flink-docs/src/main/java/org/apache/flink/docs/rest/RestAPIDocGenerator.java
+++ b/flink-docs/src/main/java/org/apache/flink/docs/rest/RestAPIDocGenerator.java
@@ -32,6 +32,7 @@ import org.apache.flink.runtime.rest.util.DocumentingDispatcherRestEndpoint;
 import org.apache.flink.runtime.rest.util.DocumentingRestEndpoint;
 import org.apache.flink.runtime.rest.versioning.RuntimeRestAPIVersion;
 import org.apache.flink.util.ConfigurationException;
+import org.apache.flink.util.jackson.JacksonMapperFactory;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.SerializableString;
@@ -99,7 +100,7 @@ public class RestAPIDocGenerator {
     private static final JsonSchemaGenerator schemaGen;
 
     static {
-        mapper = new ObjectMapper();
+        mapper = JacksonMapperFactory.createObjectMapper();
         mapper.getFactory().setCharacterEscapes(new HTMLCharacterEscapes());
         mapper.configure(SerializationFeature.ORDER_MAP_ENTRIES_BY_KEYS, true);
         schemaGen = new JsonSchemaGenerator(mapper);
diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/table/test/KinesisFirehoseTableITTest.java b/flink-end-to-end-tests/flink-end-to-end-tests-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/table/test/KinesisFirehoseTableITTest.java
index 172693c5ac6..ce67a707aec 100644
--- a/flink-end-to-end-tests/flink-end-to-end-tests-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/table/test/KinesisFirehoseTableITTest.java
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/table/test/KinesisFirehoseTableITTest.java
@@ -26,10 +26,13 @@ import org.apache.flink.test.util.SQLJobSubmission;
 import org.apache.flink.tests.util.TestUtils;
 import org.apache.flink.util.DockerImageVersions;
 import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.jackson.JacksonMapperFactory;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
 
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
 import org.assertj.core.api.Assertions;
 import org.junit.After;
 import org.junit.AfterClass;
@@ -79,6 +82,8 @@ public class KinesisFirehoseTableITTest extends TestLogger {
     private static final String BUCKET_NAME = "s3-firehose";
     private static final String STREAM_NAME = "s3-stream";
 
+    private static final ObjectMapper OBJECT_MAPPER = JacksonMapperFactory.createObjectMapper();
+
     private final Path sqlConnectorFirehoseJar = TestUtils.getResource(".*firehose.jar");
 
     private SdkHttpClient httpClient;
@@ -207,7 +212,7 @@ public class KinesisFirehoseTableITTest extends TestLogger {
 
     private <T> T fromJson(final String json, final Class<T> type) {
         try {
-            return new ObjectMapper().readValue(json, type);
+            return OBJECT_MAPPER.readValue(json, type);
         } catch (JsonProcessingException e) {
             throw new RuntimeException(String.format("Failed to deserialize json: %s", json), e);
         }
@@ -218,6 +223,7 @@ public class KinesisFirehoseTableITTest extends TestLogger {
         private final String code;
         private final int quantity;
 
+        @JsonCreator
         public Order(
                 @JsonProperty("code") final String code, @JsonProperty("quantity") int quantity) {
             this.code = code;
diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/table/test/KinesisStreamsTableApiIT.java b/flink-end-to-end-tests/flink-end-to-end-tests-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/table/test/KinesisStreamsTableApiIT.java
index 1f415c4bcec..d52ca2cb83d 100644
--- a/flink-end-to-end-tests/flink-end-to-end-tests-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/table/test/KinesisStreamsTableApiIT.java
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/table/test/KinesisStreamsTableApiIT.java
@@ -27,12 +27,13 @@ import org.apache.flink.connectors.kinesis.testutils.KinesaliteContainer;
 import org.apache.flink.test.util.SQLJobSubmission;
 import org.apache.flink.tests.util.TestUtils;
 import org.apache.flink.util.DockerImageVersions;
+import org.apache.flink.util.jackson.JacksonMapperFactory;
 
 import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableList;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
 
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
 import org.assertj.core.api.Assertions;
 import org.junit.After;
 import org.junit.AfterClass;
@@ -78,6 +79,8 @@ public class KinesisStreamsTableApiIT {
     private static final String ORDERS_STREAM = "orders";
     private static final String INTER_CONTAINER_KINESALITE_ALIAS = "kinesalite";
     private static final String DEFAULT_FIRST_SHARD_NAME = "shardId-000000000000";
+    private static final ObjectMapper OBJECT_MAPPER = JacksonMapperFactory.createObjectMapper();
+
     private SdkHttpClient httpClient;
     private KinesisClient kinesisClient;
 
@@ -202,7 +205,7 @@ public class KinesisStreamsTableApiIT {
 
     private <T> T fromJson(final String json, final Class<T> type) {
         try {
-            return new ObjectMapper().readValue(json, type);
+            return OBJECT_MAPPER.readValue(json, type);
         } catch (JsonProcessingException e) {
             throw new RuntimeException("Test Failure.", e);
         }
diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/FlinkDistribution.java b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/FlinkDistribution.java
index e9b939adf4d..05f11f820b8 100644
--- a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/FlinkDistribution.java
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/FlinkDistribution.java
@@ -29,6 +29,7 @@ import org.apache.flink.tests.util.AutoClosableProcess;
 import org.apache.flink.tests.util.TestUtils;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.function.FutureTaskWithException;
+import org.apache.flink.util.jackson.JacksonMapperFactory;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
@@ -75,7 +76,7 @@ public final class FlinkDistribution {
 
     private static final Logger LOG = LoggerFactory.getLogger(FlinkDistribution.class);
 
-    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+    private static final ObjectMapper OBJECT_MAPPER = JacksonMapperFactory.createObjectMapper();
 
     private static final Pattern ROOT_LOGGER_PATTERN = Pattern.compile("(rootLogger.level =).*");
     private static final String HIVE_DRIVER = "org.apache.hive.jdbc.HiveDriver";
diff --git a/flink-end-to-end-tests/flink-streaming-kinesis-test/src/test/java/org/apache/flink/streaming/kinesis/test/KinesisTableApiITCase.java b/flink-end-to-end-tests/flink-streaming-kinesis-test/src/test/java/org/apache/flink/streaming/kinesis/test/KinesisTableApiITCase.java
index 4cd75934ed7..b637108e5db 100644
--- a/flink-end-to-end-tests/flink-streaming-kinesis-test/src/test/java/org/apache/flink/streaming/kinesis/test/KinesisTableApiITCase.java
+++ b/flink-end-to-end-tests/flink-streaming-kinesis-test/src/test/java/org/apache/flink/streaming/kinesis/test/KinesisTableApiITCase.java
@@ -28,11 +28,12 @@ import org.apache.flink.test.util.SQLJobSubmission;
 import org.apache.flink.tests.util.TestUtils;
 import org.apache.flink.util.DockerImageVersions;
 import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.jackson.JacksonMapperFactory;
 
 import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableList;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
 
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
 import org.hamcrest.Matchers;
 import org.junit.After;
 import org.junit.AfterClass;
@@ -67,6 +68,8 @@ public class KinesisTableApiITCase extends TestLogger {
     private static final String LARGE_ORDERS_STREAM = "large_orders";
     private static final String INTER_CONTAINER_KINESALITE_ALIAS = "kinesalite";
 
+    private static final ObjectMapper OBJECT_MAPPER = JacksonMapperFactory.createObjectMapper();
+
     private final Path sqlConnectorKinesisJar = TestUtils.getResource(".*kinesis.jar");
     private static final Network network = Network.newNetwork();
 
@@ -166,7 +169,7 @@ public class KinesisTableApiITCase extends TestLogger {
 
     private <T> String toJson(final T object) {
         try {
-            return new ObjectMapper().writeValueAsString(object);
+            return OBJECT_MAPPER.writeValueAsString(object);
         } catch (JsonProcessingException e) {
             throw new RuntimeException("Test Failure.", e);
         }
@@ -174,7 +177,7 @@ public class KinesisTableApiITCase extends TestLogger {
 
     private <T> T fromJson(final String json, final Class<T> type) {
         try {
-            return new ObjectMapper().readValue(json, type);
+            return OBJECT_MAPPER.readValue(json, type);
         } catch (JsonProcessingException e) {
             throw new RuntimeException("Test Failure.", e);
         }
diff --git a/flink-end-to-end-tests/flink-streaming-kinesis-test/src/test/java/org/apache/flink/streaming/kinesis/test/model/Order.java b/flink-end-to-end-tests/flink-streaming-kinesis-test/src/test/java/org/apache/flink/streaming/kinesis/test/model/Order.java
index 15c158c4c8f..58cec308eae 100644
--- a/flink-end-to-end-tests/flink-streaming-kinesis-test/src/test/java/org/apache/flink/streaming/kinesis/test/model/Order.java
+++ b/flink-end-to-end-tests/flink-streaming-kinesis-test/src/test/java/org/apache/flink/streaming/kinesis/test/model/Order.java
@@ -17,7 +17,7 @@
 
 package org.apache.flink.streaming.kinesis.test.model;
 
-import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
 
 import java.util.Objects;
 
diff --git a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvBulkWriter.java b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvBulkWriter.java
index 69b3f9bce28..f9a8e01bb07 100644
--- a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvBulkWriter.java
+++ b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvBulkWriter.java
@@ -21,6 +21,7 @@ package org.apache.flink.formats.csv;
 import org.apache.flink.api.common.serialization.BulkWriter;
 import org.apache.flink.core.fs.FSDataOutputStream;
 import org.apache.flink.formats.common.Converter;
+import org.apache.flink.util.jackson.JacksonMapperFactory;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectWriter;
@@ -89,7 +90,7 @@ class CsvBulkWriter<T, R, C> implements BulkWriter<T> {
      */
     static <T> CsvBulkWriter<T, T, Void> forPojo(Class<T> pojoClass, FSDataOutputStream stream) {
         final Converter<T, T, Void> converter = (value, context) -> value;
-        final CsvMapper csvMapper = new CsvMapper();
+        final CsvMapper csvMapper = JacksonMapperFactory.createCsvMapper();
         final CsvSchema schema = csvMapper.schemaFor(pojoClass).withoutQuoteChar();
         return new CsvBulkWriter<>(csvMapper, schema, converter, null, stream);
     }
diff --git a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvFileFormatFactory.java b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvFileFormatFactory.java
index b0a0b27d5e7..f977acd5b4f 100644
--- a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvFileFormatFactory.java
+++ b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvFileFormatFactory.java
@@ -46,6 +46,7 @@ import org.apache.flink.table.factories.DynamicTableFactory;
 import org.apache.flink.table.plan.stats.TableStats;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.jackson.JacksonMapperFactory;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
@@ -132,7 +133,7 @@ public class CsvFileFormatFactory implements BulkReaderFormatFactory, BulkWriter
                                     .createRowConverter(projectedRowType, true);
             CsvReaderFormat<RowData> csvReaderFormat =
                     new CsvReaderFormat<>(
-                            () -> new CsvMapper(),
+                            () -> JacksonMapperFactory.createCsvMapper(),
                             ignored -> schema,
                             JsonNode.class,
                             converter,
@@ -178,7 +179,7 @@ public class CsvFileFormatFactory implements BulkReaderFormatFactory, BulkWriter
         final RowDataToCsvConverter converter = RowDataToCsvConverters.createRowConverter(rowType);
 
         return out -> {
-            final CsvMapper mapper = new CsvMapper();
+            final CsvMapper mapper = JacksonMapperFactory.createCsvMapper();
             final ObjectNode container = mapper.createObjectNode();
 
             final RowDataToCsvConverter.RowDataToCsvFormatConverterContext converterContext =
diff --git a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvReaderFormat.java b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvReaderFormat.java
index c2d14008cb9..02f3afbe43e 100644
--- a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvReaderFormat.java
+++ b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvReaderFormat.java
@@ -27,6 +27,7 @@ import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.formats.common.Converter;
 import org.apache.flink.util.function.SerializableFunction;
 import org.apache.flink.util.function.SerializableSupplier;
+import org.apache.flink.util.jackson.JacksonMapperFactory;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.MappingIterator;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvMapper;
@@ -108,7 +109,7 @@ public class CsvReaderFormat<T> extends SimpleStreamFormat<T> {
      */
     public static <T> CsvReaderFormat<T> forSchema(
             CsvSchema schema, TypeInformation<T> typeInformation) {
-        return forSchema(() -> new CsvMapper(), ignored -> schema, typeInformation);
+        return forSchema(JacksonMapperFactory::createCsvMapper, ignored -> schema, typeInformation);
     }
 
     /**
@@ -163,7 +164,7 @@ public class CsvReaderFormat<T> extends SimpleStreamFormat<T> {
      */
     public static <T> CsvReaderFormat<T> forPojo(Class<T> pojoType) {
         return forSchema(
-                () -> new CsvMapper(),
+                () -> JacksonMapperFactory.createCsvMapper(),
                 mapper -> mapper.schemaFor(pojoType).withoutQuoteChar(),
                 TypeInformation.of(pojoType));
     }
diff --git a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDataDeserializationSchema.java b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDataDeserializationSchema.java
index 0c3f68e97f8..b77312d41b8 100644
--- a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDataDeserializationSchema.java
+++ b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDataDeserializationSchema.java
@@ -24,10 +24,10 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.jackson.JacksonMapperFactory;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectReader;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvMapper;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvSchema;
 
 import javax.annotation.Nullable;
@@ -77,7 +77,8 @@ public final class CsvRowDataDeserializationSchema implements DeserializationSch
 
     @Override
     public void open(InitializationContext context) {
-        this.objectReader = new CsvMapper().readerFor(JsonNode.class).with(csvSchema);
+        this.objectReader =
+                JacksonMapperFactory.createCsvMapper().readerFor(JsonNode.class).with(csvSchema);
     }
 
     /** A builder for creating a {@link CsvRowDataDeserializationSchema}. */
diff --git a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDataSerializationSchema.java b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDataSerializationSchema.java
index f7fd810e691..737b74eda20 100644
--- a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDataSerializationSchema.java
+++ b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDataSerializationSchema.java
@@ -24,6 +24,7 @@ import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.function.SerializableSupplier;
+import org.apache.flink.util.jackson.JacksonMapperFactory;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
@@ -152,7 +153,7 @@ public final class CsvRowDataSerializationSchema implements SerializationSchema<
                     rowType,
                     csvSchema,
                     () -> {
-                        final CsvMapper csvMapper = new CsvMapper();
+                        final CsvMapper csvMapper = JacksonMapperFactory.createCsvMapper();
                         csvMapper.configure(
                                 JsonGenerator.Feature.WRITE_BIGDECIMAL_AS_PLAIN,
                                 !isScientificNotation);
diff --git a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDeserializationSchema.java b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDeserializationSchema.java
index 24153b4de46..b2a91a9fb34 100644
--- a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDeserializationSchema.java
+++ b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDeserializationSchema.java
@@ -28,10 +28,10 @@ import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.jackson.JacksonMapperFactory;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectReader;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvMapper;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvSchema;
 
 import java.io.IOException;
@@ -93,7 +93,8 @@ public final class CsvRowDeserializationSchema implements DeserializationSchema<
 
     @Override
     public void open(InitializationContext context) throws Exception {
-        objectReader = new CsvMapper().readerFor(JsonNode.class).with(csvSchema);
+        objectReader =
+                JacksonMapperFactory.createCsvMapper().readerFor(JsonNode.class).with(csvSchema);
     }
 
     /** A builder for creating a {@link CsvRowDeserializationSchema}. */
diff --git a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowSerializationSchema.java b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowSerializationSchema.java
index eca56958d96..53f4edf16e3 100644
--- a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowSerializationSchema.java
+++ b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowSerializationSchema.java
@@ -28,6 +28,7 @@ import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.jackson.JacksonMapperFactory;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectWriter;
@@ -93,7 +94,7 @@ public final class CsvRowSerializationSchema implements SerializationSchema<Row>
 
     @Override
     public void open(InitializationContext context) throws Exception {
-        this.csvMapper = new CsvMapper();
+        this.csvMapper = JacksonMapperFactory.createCsvMapper();
         this.objectWriter = csvMapper.writer(csvSchema);
     }
 
diff --git a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/RowCsvInputFormat.java b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/RowCsvInputFormat.java
index 75a22cb2dd6..53634c5cc83 100644
--- a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/RowCsvInputFormat.java
+++ b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/RowCsvInputFormat.java
@@ -24,10 +24,10 @@ import org.apache.flink.core.fs.FileInputSplit;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.formats.csv.CsvRowDeserializationSchema.RuntimeConverter;
 import org.apache.flink.types.Row;
+import org.apache.flink.util.jackson.JacksonMapperFactory;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.MappingIterator;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvMapper;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvSchema;
 
 import java.io.IOException;
@@ -90,7 +90,7 @@ public class RowCsvInputFormat extends AbstractCsvInputFormat<Row> {
         super.open(split);
         prepareConverter();
         this.iterator =
-                new CsvMapper()
+                JacksonMapperFactory.createCsvMapper()
                         .readerFor(JsonNode.class)
                         .with(csvSchema)
                         .readValues(csvInputStream);
diff --git a/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/DataStreamCsvITCase.java b/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/DataStreamCsvITCase.java
index ea073432116..363a17afdef 100644
--- a/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/DataStreamCsvITCase.java
+++ b/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/DataStreamCsvITCase.java
@@ -37,6 +37,7 @@ import org.apache.flink.streaming.api.operators.collect.ClientAndIterator;
 import org.apache.flink.test.junit5.MiniClusterExtension;
 import org.apache.flink.util.TestLoggerExtension;
 import org.apache.flink.util.function.FunctionWithException;
+import org.apache.flink.util.jackson.JacksonMapperFactory;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonPropertyOrder;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvMapper;
@@ -74,6 +75,8 @@ import static org.assertj.core.api.Assertions.assertThat;
 @ExtendWith({TestLoggerExtension.class})
 public class DataStreamCsvITCase {
 
+    private static final CsvMapper CSV_MAPPER = JacksonMapperFactory.createCsvMapper();
+
     private static final int PARALLELISM = 4;
 
     @TempDir File outDir;
@@ -163,7 +166,7 @@ public class DataStreamCsvITCase {
 
         final CsvReaderFormat<CityPojo> csvFormat =
                 CsvReaderFormat.forSchema(
-                        () -> new CsvMapper(),
+                        () -> CSV_MAPPER,
                         mapper ->
                                 mapper.schemaFor(CityPojo.class)
                                         .withoutQuoteChar()
@@ -229,9 +232,8 @@ public class DataStreamCsvITCase {
 
     private static <T> BulkWriter.Factory<T> factoryForPojo(Class<T> pojoClass) {
         final Converter<T, T, Void> converter = (value, context) -> value;
-        final CsvMapper csvMapper = new CsvMapper();
-        final CsvSchema schema = csvMapper.schemaFor(pojoClass).withoutQuoteChar();
-        return (out) -> new CsvBulkWriter<>(csvMapper, schema, converter, null, out);
+        final CsvSchema schema = CSV_MAPPER.schemaFor(pojoClass).withoutQuoteChar();
+        return (out) -> new CsvBulkWriter<>(CSV_MAPPER, schema, converter, null, out);
     }
 
     private static Map<File, String> getFileContentByPath(File directory) throws IOException {
diff --git a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonDeserializationSchema.java b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonDeserializationSchema.java
index fd28712d56a..cc244b0a0c2 100644
--- a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonDeserializationSchema.java
+++ b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonDeserializationSchema.java
@@ -21,6 +21,7 @@ import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.serialization.AbstractDeserializationSchema;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.util.function.SerializableSupplier;
+import org.apache.flink.util.jackson.JacksonMapperFactory;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
 
@@ -37,11 +38,11 @@ public class JsonDeserializationSchema<T> extends AbstractDeserializationSchema<
     protected transient ObjectMapper mapper;
 
     public JsonDeserializationSchema(Class<T> clazz) {
-        this(clazz, () -> new ObjectMapper());
+        this(clazz, JacksonMapperFactory::createObjectMapper);
     }
 
     public JsonDeserializationSchema(TypeInformation<T> typeInformation) {
-        this(typeInformation, () -> new ObjectMapper());
+        this(typeInformation, JacksonMapperFactory::createObjectMapper);
     }
 
     public JsonDeserializationSchema(
diff --git a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDataDeserializationSchema.java b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDataDeserializationSchema.java
index 805b299c11c..9a57bac203b 100644
--- a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDataDeserializationSchema.java
+++ b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDataDeserializationSchema.java
@@ -26,6 +26,7 @@ import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.types.logical.DecimalType;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.table.types.logical.utils.LogicalTypeChecks;
+import org.apache.flink.util.jackson.JacksonMapperFactory;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.json.JsonReadFeature;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationFeature;
@@ -97,7 +98,7 @@ public class JsonRowDataDeserializationSchema implements DeserializationSchema<R
     @Override
     public void open(InitializationContext context) throws Exception {
         objectMapper =
-                new ObjectMapper()
+                JacksonMapperFactory.createObjectMapper()
                         .configure(
                                 JsonReadFeature.ALLOW_UNESCAPED_CONTROL_CHARS.mappedFeature(),
                                 true);
diff --git a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDataSerializationSchema.java b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDataSerializationSchema.java
index 6a8c619eeac..c8b7f73b64d 100644
--- a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDataSerializationSchema.java
+++ b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDataSerializationSchema.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.formats.common.TimestampFormat;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.jackson.JacksonMapperFactory;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
@@ -86,7 +87,7 @@ public class JsonRowDataSerializationSchema implements SerializationSchema<RowDa
     @Override
     public void open(InitializationContext context) throws Exception {
         mapper =
-                new ObjectMapper()
+                JacksonMapperFactory.createObjectMapper()
                         .configure(
                                 JsonGenerator.Feature.WRITE_BIGDECIMAL_AS_PLAIN,
                                 encodeDecimalAsPlainNumber);
diff --git a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDeserializationSchema.java b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDeserializationSchema.java
index b2b7e6dada5..dd4a9bb9f99 100644
--- a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDeserializationSchema.java
+++ b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDeserializationSchema.java
@@ -30,6 +30,7 @@ import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.table.types.logical.utils.LogicalTypeChecks;
 import org.apache.flink.types.Row;
+import org.apache.flink.util.jackson.JacksonMapperFactory;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationFeature;
@@ -121,7 +122,7 @@ public class JsonRowDeserializationSchema implements DeserializationSchema<Row>
 
     @Override
     public void open(InitializationContext context) throws Exception {
-        objectMapper = new ObjectMapper();
+        objectMapper = JacksonMapperFactory.createObjectMapper();
         if (hasDecimalType) {
             objectMapper.enable(DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS);
         }
diff --git a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowSchemaConverter.java b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowSchemaConverter.java
index fe412320456..d7761851ee4 100644
--- a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowSchemaConverter.java
+++ b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowSchemaConverter.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeinfo.Types;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.jackson.JacksonMapperFactory;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
@@ -97,7 +98,7 @@ public final class JsonRowSchemaConverter {
     @SuppressWarnings("unchecked")
     public static <T> TypeInformation<T> convert(String jsonSchema) {
         Preconditions.checkNotNull(jsonSchema, "JSON schema");
-        final ObjectMapper mapper = new ObjectMapper();
+        final ObjectMapper mapper = JacksonMapperFactory.createObjectMapper();
         mapper.getFactory()
                 .enable(JsonParser.Feature.ALLOW_COMMENTS)
                 .enable(JsonParser.Feature.ALLOW_UNQUOTED_FIELD_NAMES)
diff --git a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowSerializationSchema.java b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowSerializationSchema.java
index e789307967e..f185d211bf9 100644
--- a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowSerializationSchema.java
+++ b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowSerializationSchema.java
@@ -28,6 +28,7 @@ import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.WrappingRuntimeException;
+import org.apache.flink.util.jackson.JacksonMapperFactory;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
@@ -96,7 +97,7 @@ public class JsonRowSerializationSchema implements SerializationSchema<Row> {
 
     @Override
     public void open(InitializationContext context) throws Exception {
-        mapper = new ObjectMapper();
+        mapper = JacksonMapperFactory.createObjectMapper();
     }
 
     /** Builder for {@link JsonRowSerializationSchema}. */
diff --git a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonNodeDeserializationSchemaTest.java b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonNodeDeserializationSchemaTest.java
index 90751525feb..e6b2a3e05cb 100644
--- a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonNodeDeserializationSchemaTest.java
+++ b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonNodeDeserializationSchemaTest.java
@@ -18,6 +18,7 @@
 package org.apache.flink.formats.json;
 
 import org.apache.flink.connector.testutils.formats.DummyInitializationContext;
+import org.apache.flink.util.jackson.JacksonMapperFactory;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
@@ -34,7 +35,7 @@ class JsonNodeDeserializationSchemaTest {
 
     @Test
     void testDeserialize() throws IOException {
-        ObjectMapper mapper = new ObjectMapper();
+        ObjectMapper mapper = JacksonMapperFactory.createObjectMapper();
         ObjectNode initialValue = mapper.createObjectNode();
         initialValue.put("key", 4).put("value", "world");
         byte[] serializedValue = mapper.writeValueAsBytes(initialValue);
diff --git a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowDataSerDeSchemaTest.java b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowDataSerDeSchemaTest.java
index 888be3b9fe7..883c3f09134 100644
--- a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowDataSerDeSchemaTest.java
+++ b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowDataSerDeSchemaTest.java
@@ -31,6 +31,7 @@ import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.types.Row;
+import org.apache.flink.util.jackson.JacksonMapperFactory;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ArrayNode;
@@ -83,6 +84,8 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy;
  */
 class JsonRowDataSerDeSchemaTest {
 
+    private static final ObjectMapper OBJECT_MAPPER = JacksonMapperFactory.createObjectMapper();
+
     @Test
     void testSerDe() throws Exception {
         byte tinyint = 'c';
@@ -115,11 +118,10 @@ class JsonRowDataSerDeSchemaTest {
         innerMap.put("key", 234);
         nestedMap.put("inner_map", innerMap);
 
-        ObjectMapper objectMapper = new ObjectMapper();
-        ArrayNode doubleNode = objectMapper.createArrayNode().add(1.1D).add(2.2D).add(3.3D);
+        ArrayNode doubleNode = OBJECT_MAPPER.createArrayNode().add(1.1D).add(2.2D).add(3.3D);
 
         // Root
-        ObjectNode root = objectMapper.createObjectNode();
+        ObjectNode root = OBJECT_MAPPER.createObjectNode();
         root.put("bool", true);
         root.put("tinyint", tinyint);
         root.put("smallint", smallint);
@@ -139,7 +141,7 @@ class JsonRowDataSerDeSchemaTest {
         root.putObject("multiSet").put("element", 2);
         root.putObject("map2map").putObject("inner_map").put("key", 234);
 
-        byte[] serializedJson = objectMapper.writeValueAsBytes(root);
+        byte[] serializedJson = OBJECT_MAPPER.writeValueAsBytes(root);
 
         DataType dataType =
                 ROW(
@@ -220,8 +222,7 @@ class JsonRowDataSerDeSchemaTest {
         double doubleValue = random.nextDouble();
         float floatValue = random.nextFloat();
 
-        ObjectMapper objectMapper = new ObjectMapper();
-        ObjectNode root = objectMapper.createObjectNode();
+        ObjectNode root = OBJECT_MAPPER.createObjectNode();
         root.put("bool", String.valueOf(bool));
         root.put("int", String.valueOf(integer));
         root.put("bigint", String.valueOf(bigint));
@@ -230,7 +231,7 @@ class JsonRowDataSerDeSchemaTest {
         root.put("float1", String.valueOf(floatValue));
         root.put("float2", new BigDecimal(floatValue));
 
-        byte[] serializedJson = objectMapper.writeValueAsBytes(root);
+        byte[] serializedJson = OBJECT_MAPPER.writeValueAsBytes(root);
 
         DataType dataType =
                 ROW(
@@ -296,11 +297,9 @@ class JsonRowDataSerDeSchemaTest {
                         true);
         open(serializationSchema);
 
-        ObjectMapper objectMapper = new ObjectMapper();
-
         // the first row
         {
-            ObjectNode root = objectMapper.createObjectNode();
+            ObjectNode root = OBJECT_MAPPER.createObjectNode();
             root.put("f1", 1);
             root.put("f2", true);
             root.put("f3", "str");
@@ -312,7 +311,7 @@ class JsonRowDataSerDeSchemaTest {
             ObjectNode row = root.putObject("f6");
             row.put("f1", "this is row1");
             row.put("f2", 12);
-            byte[] serializedJson = objectMapper.writeValueAsBytes(root);
+            byte[] serializedJson = OBJECT_MAPPER.writeValueAsBytes(root);
             RowData rowData = deserializationSchema.deserialize(serializedJson);
             byte[] actual = serializationSchema.serialize(rowData);
             assertThat(serializedJson).containsExactly(actual);
@@ -320,7 +319,7 @@ class JsonRowDataSerDeSchemaTest {
 
         // the second row
         {
-            ObjectNode root = objectMapper.createObjectNode();
+            ObjectNode root = OBJECT_MAPPER.createObjectNode();
             root.put("f1", 10);
             root.put("f2", false);
             root.put("f3", "newStr");
@@ -332,7 +331,7 @@ class JsonRowDataSerDeSchemaTest {
             ObjectNode row = root.putObject("f6");
             row.put("f1", "this is row2");
             row.putNull("f2");
-            byte[] serializedJson = objectMapper.writeValueAsBytes(root);
+            byte[] serializedJson = OBJECT_MAPPER.writeValueAsBytes(root);
             RowData rowData = deserializationSchema.deserialize(serializedJson);
             byte[] actual = serializationSchema.serialize(rowData);
             assertThat(serializedJson).containsExactly(actual);
@@ -419,12 +418,10 @@ class JsonRowDataSerDeSchemaTest {
 
     @Test
     void testDeserializationMissingField() throws Exception {
-        ObjectMapper objectMapper = new ObjectMapper();
-
         // Root
-        ObjectNode root = objectMapper.createObjectNode();
+        ObjectNode root = OBJECT_MAPPER.createObjectNode();
         root.put("id", 123123123);
-        byte[] serializedJson = objectMapper.writeValueAsBytes(root);
+        byte[] serializedJson = OBJECT_MAPPER.writeValueAsBytes(root);
 
         DataType dataType = ROW(FIELD("name", STRING()));
         RowType schema = (RowType) dataType.getLogicalType();
@@ -504,14 +501,12 @@ class JsonRowDataSerDeSchemaTest {
                         true);
         open(serializationSchema);
 
-        ObjectMapper objectMapper = new ObjectMapper();
-
-        ObjectNode root = objectMapper.createObjectNode();
+        ObjectNode root = OBJECT_MAPPER.createObjectNode();
         root.put("timestamp3", "1990-10-14 12:12:43.123");
         root.put("timestamp9", "1990-10-14 12:12:43.123456789");
         root.put("timestamp_with_local_timezone3", "1990-10-14 12:12:43.123Z");
         root.put("timestamp_with_local_timezone9", "1990-10-14 12:12:43.123456789Z");
-        byte[] serializedJson = objectMapper.writeValueAsBytes(root);
+        byte[] serializedJson = OBJECT_MAPPER.writeValueAsBytes(root);
         RowData rowData = deserializationSchema.deserialize(serializedJson);
         byte[] actual = serializationSchema.serialize(rowData);
         assertThat(serializedJson).containsExactly(actual);
diff --git a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowDeserializationSchemaTest.java b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowDeserializationSchemaTest.java
index deee53ae011..81e370c0fd3 100644
--- a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowDeserializationSchemaTest.java
+++ b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowDeserializationSchemaTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.formats.json;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeinfo.Types;
 import org.apache.flink.types.Row;
+import org.apache.flink.util.jackson.JacksonMapperFactory;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
@@ -52,6 +53,8 @@ import static org.junit.internal.matchers.ThrowableMessageMatcher.hasMessage;
 /** Tests for the {@link JsonRowDeserializationSchema}. */
 public class JsonRowDeserializationSchemaTest {
 
+    private static final ObjectMapper OBJECT_MAPPER = JacksonMapperFactory.createObjectMapper();
+
     @Rule public ExpectedException thrown = ExpectedException.none();
 
     /** Tests simple deserialization using type information. */
@@ -73,10 +76,8 @@ public class JsonRowDeserializationSchemaTest {
         innerMap.put("key", 234);
         nestedMap.put("inner_map", innerMap);
 
-        ObjectMapper objectMapper = new ObjectMapper();
-
         // Root
-        ObjectNode root = objectMapper.createObjectNode();
+        ObjectNode root = OBJECT_MAPPER.createObjectNode();
         root.put("id", id);
         root.put("name", name);
         root.put("bytes", bytes);
@@ -89,7 +90,7 @@ public class JsonRowDeserializationSchemaTest {
         root.putObject("map").put("flink", 123);
         root.putObject("map2map").putObject("inner_map").put("key", 234);
 
-        byte[] serializedJson = objectMapper.writeValueAsBytes(root);
+        byte[] serializedJson = OBJECT_MAPPER.writeValueAsBytes(root);
 
         JsonRowDeserializationSchema deserializationSchema =
                 new JsonRowDeserializationSchema.Builder(
@@ -149,10 +150,8 @@ public class JsonRowDeserializationSchemaTest {
                 };
         final String[] strings = new String[] {"one", "two", "three"};
 
-        final ObjectMapper objectMapper = new ObjectMapper();
-
         // Root
-        ObjectNode root = objectMapper.createObjectNode();
+        ObjectNode root = OBJECT_MAPPER.createObjectNode();
         root.put("id", id.longValue());
         root.putNull("idOrNull");
         root.put("name", name);
@@ -164,7 +163,7 @@ public class JsonRowDeserializationSchemaTest {
         root.putArray("strings").add("one").add("two").add("three");
         root.putObject("nested").put("booleanField", true).put("decimalField", 12);
 
-        final byte[] serializedJson = objectMapper.writeValueAsBytes(root);
+        final byte[] serializedJson = OBJECT_MAPPER.writeValueAsBytes(root);
 
         JsonRowDeserializationSchema deserializationSchema =
                 new JsonRowDeserializationSchema.Builder(
@@ -212,12 +211,10 @@ public class JsonRowDeserializationSchemaTest {
     /** Tests deserialization with non-existing field name. */
     @Test
     public void testMissingNode() throws Exception {
-        ObjectMapper objectMapper = new ObjectMapper();
-
         // Root
-        ObjectNode root = objectMapper.createObjectNode();
+        ObjectNode root = OBJECT_MAPPER.createObjectNode();
         root.put("id", 123123123);
-        byte[] serializedJson = objectMapper.writeValueAsBytes(root);
+        byte[] serializedJson = OBJECT_MAPPER.writeValueAsBytes(root);
 
         TypeInformation<Row> rowTypeInformation =
                 Types.ROW_NAMED(new String[] {"name"}, Types.STRING);
diff --git a/flink-python/src/main/java/org/apache/flink/python/metric/FlinkMetricContainer.java b/flink-python/src/main/java/org/apache/flink/python/metric/FlinkMetricContainer.java
index a184c39c621..df2f84a283b 100644
--- a/flink-python/src/main/java/org/apache/flink/python/metric/FlinkMetricContainer.java
+++ b/flink-python/src/main/java/org/apache/flink/python/metric/FlinkMetricContainer.java
@@ -26,6 +26,7 @@ import org.apache.flink.metrics.Gauge;
 import org.apache.flink.metrics.Meter;
 import org.apache.flink.metrics.MeterView;
 import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.util.jackson.JacksonMapperFactory;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
@@ -56,6 +57,8 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
 @Internal
 public final class FlinkMetricContainer {
 
+    private static final ObjectMapper OBJECT_MAPPER = JacksonMapperFactory.createObjectMapper();
+
     private static final String METRIC_KEY_SEPARATOR =
             GlobalConfiguration.loadConfiguration().getString(MetricOptions.SCOPE_DELIMITER);
 
@@ -206,7 +209,7 @@ public final class FlinkMetricContainer {
     static ArrayList getNameSpaceArray(MetricKey metricKey) {
         MetricName metricName = metricKey.metricName();
         try {
-            return new ObjectMapper().readValue(metricName.getNamespace(), ArrayList.class);
+            return OBJECT_MAPPER.readValue(metricName.getNamespace(), ArrayList.class);
         } catch (JsonProcessingException e) {
             throw new RuntimeException(
                     String.format("Parse namespace[%s] error. ", metricName.getNamespace()), e);
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java
index c7c779c4e93..fff6d42306f 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java
@@ -49,6 +49,7 @@ import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.ShutdownHookUtil;
 import org.apache.flink.util.concurrent.ExecutorThreadFactory;
+import org.apache.flink.util.jackson.JacksonMapperFactory;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
 
@@ -100,7 +101,7 @@ import java.util.function.Consumer;
 public class HistoryServer {
 
     private static final Logger LOG = LoggerFactory.getLogger(HistoryServer.class);
-    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+    private static final ObjectMapper OBJECT_MAPPER = JacksonMapperFactory.createObjectMapper();
 
     private final Configuration config;
 
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java
index 8e5aee9c633..a8d782354a0 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java
@@ -30,6 +30,7 @@ import org.apache.flink.runtime.messages.webmonitor.JobDetails;
 import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
 import org.apache.flink.runtime.rest.messages.JobsOverviewHeaders;
 import org.apache.flink.util.FileUtils;
+import org.apache.flink.util.jackson.JacksonMapperFactory;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonFactory;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
@@ -104,7 +105,7 @@ class HistoryServerArchiveFetcher {
     private static final Logger LOG = LoggerFactory.getLogger(HistoryServerArchiveFetcher.class);
 
     private static final JsonFactory jacksonFactory = new JsonFactory();
-    private static final ObjectMapper mapper = new ObjectMapper();
+    private static final ObjectMapper mapper = JacksonMapperFactory.createObjectMapper();
 
     private static final String JSON_FILE_ENDING = ".json";
 
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/rest/compatibility/CompatibilityRoutines.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/rest/compatibility/CompatibilityRoutines.java
index 1bf3b8f99cb..20bbc04501c 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/rest/compatibility/CompatibilityRoutines.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/rest/compatibility/CompatibilityRoutines.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.rest.compatibility;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
 import org.apache.flink.runtime.rest.messages.UntypedResponseMessageHeaders;
+import org.apache.flink.util.jackson.JacksonMapperFactory;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
@@ -137,7 +138,7 @@ enum CompatibilityRoutines {
                             REQUEST_ROUTINE,
                             RESPONSE_ROUTINE));
 
-    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+    private static final ObjectMapper OBJECT_MAPPER = JacksonMapperFactory.createObjectMapper();
     private static final JsonSchemaGenerator SCHEMA_GENERATOR =
             new JsonSchemaGenerator(OBJECT_MAPPER);
 
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/rest/compatibility/RestAPIStabilityTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/rest/compatibility/RestAPIStabilityTest.java
index 0dd5704b1dd..fec3204ebea 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/rest/compatibility/RestAPIStabilityTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/rest/compatibility/RestAPIStabilityTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.runtime.rest.util.DocumentingDispatcherRestEndpoint;
 import org.apache.flink.runtime.rest.util.DocumentingRestEndpoint;
 import org.apache.flink.runtime.rest.versioning.RuntimeRestAPIVersion;
 import org.apache.flink.util.ConfigurationException;
+import org.apache.flink.util.jackson.JacksonMapperFactory;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.util.DefaultIndenter;
@@ -56,7 +57,7 @@ final class RestAPIStabilityTest {
 
     private static final String SNAPSHOT_RESOURCE_PATTERN = "rest_api_%s.snapshot";
 
-    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+    private static final ObjectMapper OBJECT_MAPPER = JacksonMapperFactory.createObjectMapper();
 
     private static class StableRestApiVersionProvider implements ArgumentsProvider {
 
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java
index d23a0cc1898..b677a094d54 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java
@@ -38,6 +38,7 @@ import org.apache.flink.runtime.webmonitor.testutils.HttpTestClient;
 import org.apache.flink.test.junit5.InjectClusterClient;
 import org.apache.flink.test.junit5.InjectClusterRESTAddress;
 import org.apache.flink.test.junit5.MiniClusterExtension;
+import org.apache.flink.util.jackson.JacksonMapperFactory;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
@@ -81,6 +82,7 @@ class WebFrontendITCase {
     private static final int NUM_TASK_MANAGERS = 2;
     private static final int NUM_SLOTS = 4;
 
+    private static final ObjectMapper OBJECT_MAPPER = JacksonMapperFactory.createObjectMapper();
     private static final Configuration CLUSTER_CONFIGURATION = getClusterConfiguration();
 
     @RegisterExtension
@@ -170,8 +172,7 @@ class WebFrontendITCase {
     void getNumberOfTaskManagers(@InjectClusterRESTAddress URI restAddress) throws Exception {
         String json = getFromHTTP("http://localhost:" + restAddress.getPort() + "/taskmanagers/");
 
-        ObjectMapper mapper = new ObjectMapper();
-        JsonNode response = mapper.readTree(json);
+        JsonNode response = OBJECT_MAPPER.readTree(json);
         ArrayNode taskManagers = (ArrayNode) response.get("taskmanagers");
 
         assertThat(taskManagers).hasSize(NUM_TASK_MANAGERS);
@@ -181,8 +182,7 @@ class WebFrontendITCase {
     void getTaskManagers(@InjectClusterRESTAddress URI restAddress) throws Exception {
         String json = getFromHTTP("http://localhost:" + restAddress.getPort() + "/taskmanagers/");
 
-        ObjectMapper mapper = new ObjectMapper();
-        JsonNode parsed = mapper.readTree(json);
+        JsonNode parsed = OBJECT_MAPPER.readTree(json);
         ArrayNode taskManagers = (ArrayNode) parsed.get("taskmanagers");
 
         assertThat(taskManagers).hasSize(NUM_TASK_MANAGERS);
@@ -231,8 +231,7 @@ class WebFrontendITCase {
             throws Exception {
         String json = getFromHTTP("http://localhost:" + restAddress.getPort() + "/taskmanagers/");
 
-        ObjectMapper mapper = new ObjectMapper();
-        JsonNode parsed = mapper.readTree(json);
+        JsonNode parsed = OBJECT_MAPPER.readTree(json);
         ArrayNode taskManagers = (ArrayNode) parsed.get("taskmanagers");
         JsonNode taskManager = taskManagers.get(0);
         String id = taskManager.get("id").asText();
@@ -278,8 +277,7 @@ class WebFrontendITCase {
     private static Map<String, String> fromKeyValueJsonArray(String jsonString) {
         try {
             Map<String, String> map = new HashMap<>();
-            ObjectMapper m = new ObjectMapper();
-            ArrayNode array = (ArrayNode) m.readTree(jsonString);
+            ArrayNode array = (ArrayNode) OBJECT_MAPPER.readTree(jsonString);
 
             Iterator<JsonNode> elements = array.elements();
             while (elements.hasNext()) {
@@ -394,8 +392,7 @@ class WebFrontendITCase {
 
         String json = getFromHTTP("http://localhost:" + restAddress.getPort() + "/jobs/overview");
 
-        ObjectMapper mapper = new ObjectMapper();
-        JsonNode parsed = mapper.readTree(json);
+        JsonNode parsed = OBJECT_MAPPER.readTree(json);
         ArrayNode jsonJobs = (ArrayNode) parsed.get("jobs");
         assertThat(jsonJobs.size()).isEqualTo(1);
         assertThat(jsonJobs.get(0).get("duration").asInt()).isGreaterThan(0);
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java
index 4c5f7ea2645..8ec80e33358 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java
@@ -37,6 +37,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
 import org.apache.flink.test.util.MiniClusterWithClientResource;
 import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.jackson.JacksonMapperFactory;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonFactory;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
@@ -78,7 +79,8 @@ class HistoryServerTest {
                     .enable(JsonGenerator.Feature.AUTO_CLOSE_TARGET)
                     .disable(JsonGenerator.Feature.AUTO_CLOSE_JSON_CONTENT);
     private static final ObjectMapper OBJECT_MAPPER =
-            new ObjectMapper().enable(DeserializationFeature.FAIL_ON_MISSING_CREATOR_PROPERTIES);
+            JacksonMapperFactory.createObjectMapper()
+                    .enable(DeserializationFeature.FAIL_ON_MISSING_CREATOR_PROPERTIES);
 
     private MiniClusterWithClientResource cluster;
     private File jmDirectory;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/FileSystemJobResultStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/FileSystemJobResultStore.java
index 010ce77e74c..986b7a03343 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/FileSystemJobResultStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/FileSystemJobResultStore.java
@@ -30,6 +30,7 @@ import org.apache.flink.runtime.rest.messages.json.JobResultDeserializer;
 import org.apache.flink.runtime.rest.messages.json.JobResultSerializer;
 import org.apache.flink.runtime.util.NonClosingOutputStreamDecorator;
 import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.jackson.JacksonMapperFactory;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore;
@@ -66,7 +67,7 @@ public class FileSystemJobResultStore extends AbstractThreadsafeJobResultStore {
         return filename.endsWith(FILE_EXTENSION);
     }
 
-    private final ObjectMapper mapper = new ObjectMapper();
+    private final ObjectMapper mapper = JacksonMapperFactory.createObjectMapper();
 
     private final FileSystem fileSystem;
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/history/FsJobArchivist.java b/flink-runtime/src/main/java/org/apache/flink/runtime/history/FsJobArchivist.java
index 67c9c2b7655..65c76fdefaa 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/history/FsJobArchivist.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/history/FsJobArchivist.java
@@ -26,6 +26,7 @@ import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
 import org.apache.flink.util.IOUtils;
+import org.apache.flink.util.jackson.JacksonMapperFactory;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonEncoding;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonFactory;
@@ -47,7 +48,7 @@ public class FsJobArchivist {
 
     private static final Logger LOG = LoggerFactory.getLogger(FsJobArchivist.class);
     private static final JsonFactory jacksonFactory = new JsonFactory();
-    private static final ObjectMapper mapper = new ObjectMapper();
+    private static final ObjectMapper mapper = JacksonMapperFactory.createObjectMapper();
 
     private static final String ARCHIVE = "archive";
     private static final String PATH = "path";
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/util/RestMapperUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/util/RestMapperUtils.java
index 9a6bfba2f6e..eb42f41fabe 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/util/RestMapperUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/util/RestMapperUtils.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.runtime.rest.util;
 
+import org.apache.flink.util.jackson.JacksonMapperFactory;
+
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationFeature;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.SerializationFeature;
@@ -27,7 +29,7 @@ public class RestMapperUtils {
     private static final ObjectMapper objectMapper;
 
     static {
-        objectMapper = new ObjectMapper();
+        objectMapper = JacksonMapperFactory.createObjectMapper();
         objectMapper.enable(
                 DeserializationFeature.FAIL_ON_IGNORED_PROPERTIES,
                 DeserializationFeature.FAIL_ON_NULL_FOR_PRIMITIVES,
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/FileSystemJobResultStoreTestInternal.java b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/FileSystemJobResultStoreTestInternal.java
index 20d6f8a4c29..c952e5d4342 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/FileSystemJobResultStoreTestInternal.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/FileSystemJobResultStoreTestInternal.java
@@ -23,6 +23,7 @@ import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.jobmaster.JobResult;
 import org.apache.flink.util.FileUtils;
 import org.apache.flink.util.TestLoggerExtension;
+import org.apache.flink.util.jackson.JacksonMapperFactory;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
 
@@ -45,7 +46,7 @@ import static org.assertj.core.api.Assertions.assertThat;
 @ExtendWith(TestLoggerExtension.class)
 public class FileSystemJobResultStoreTestInternal {
 
-    private static final ObjectMapper MAPPER = new ObjectMapper();
+    private static final ObjectMapper MAPPER = JacksonMapperFactory.createObjectMapper();
 
     private FileSystemJobResultStore fileSystemJobResultStore;
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/jsonplan/JsonGeneratorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/jsonplan/JsonGeneratorTest.java
index a1a37b1d2cc..a7a3b942e2b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/jsonplan/JsonGeneratorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/jsonplan/JsonGeneratorTest.java
@@ -25,6 +25,7 @@ import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.operators.testutils.DummyInvokable;
+import org.apache.flink.util.jackson.JacksonMapperFactory;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
@@ -96,7 +97,7 @@ public class JsonGeneratorTest {
             assertNotNull(plan);
 
             // validate the produced JSON
-            ObjectMapper m = new ObjectMapper();
+            ObjectMapper m = JacksonMapperFactory.createObjectMapper();
             JsonNode rootNode = m.readTree(plan);
 
             // core fields
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/taskmanager/TaskManagerDetailsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/taskmanager/TaskManagerDetailsHandlerTest.java
index 5e2e14864d8..6d1b58fa3d8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/taskmanager/TaskManagerDetailsHandlerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/taskmanager/TaskManagerDetailsHandlerTest.java
@@ -39,6 +39,7 @@ import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerInfo;
 import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerMetricsInfo;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorMemoryConfiguration;
 import org.apache.flink.testutils.TestingUtils;
+import org.apache.flink.util.jackson.JacksonMapperFactory;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
@@ -118,7 +119,7 @@ class TaskManagerDetailsHandlerTest {
                         20L,
                         Collections.emptyList());
 
-        ObjectMapper objectMapper = new ObjectMapper();
+        ObjectMapper objectMapper = JacksonMapperFactory.createObjectMapper();
         String actualJson = objectMapper.writeValueAsString(actual);
         String expectedJson = objectMapper.writeValueAsString(expected);
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/json/JobResultDeserializerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/json/JobResultDeserializerTest.java
index 962b02c99f8..c9752f9fc34 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/json/JobResultDeserializerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/json/JobResultDeserializerTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.rest.messages.json;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.jobmaster.JobResult;
 import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.jackson.JacksonMapperFactory;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonMappingException;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
@@ -43,7 +44,7 @@ public class JobResultDeserializerTest extends TestLogger {
         final SimpleModule simpleModule = new SimpleModule();
         simpleModule.addDeserializer(JobResult.class, new JobResultDeserializer());
 
-        objectMapper = new ObjectMapper();
+        objectMapper = JacksonMapperFactory.createObjectMapper();
         objectMapper.registerModule(simpleModule);
     }
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/json/SerializedThrowableSerializerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/json/SerializedThrowableSerializerTest.java
index ac66aa314c4..b167fa7fbcb 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/json/SerializedThrowableSerializerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/json/SerializedThrowableSerializerTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.rest.messages.json;
 
 import org.apache.flink.util.SerializedThrowable;
 import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.jackson.JacksonMapperFactory;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.module.SimpleModule;
@@ -33,7 +34,7 @@ import static org.junit.Assert.assertTrue;
 /** Tests for {@link SerializedThrowableSerializer} and {@link SerializedThrowableDeserializer}. */
 public class SerializedThrowableSerializerTest extends TestLogger {
 
-    private ObjectMapper objectMapper = new ObjectMapper();
+    private ObjectMapper objectMapper;
 
     @Before
     public void setUp() {
@@ -42,7 +43,7 @@ public class SerializedThrowableSerializerTest extends TestLogger {
                 SerializedThrowable.class, new SerializedThrowableDeserializer());
         simpleModule.addSerializer(SerializedThrowable.class, new SerializedThrowableSerializer());
 
-        objectMapper = new ObjectMapper();
+        objectMapper = JacksonMapperFactory.createObjectMapper();
         objectMapper.registerModule(simpleModule);
     }
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/json/SerializedValueSerializerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/json/SerializedValueSerializerTest.java
index ee3d57c00b4..6e6ec96cc64 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/json/SerializedValueSerializerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/json/SerializedValueSerializerTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.rest.messages.json;
 
 import org.apache.flink.util.SerializedValue;
 import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.jackson.JacksonMapperFactory;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.type.TypeReference;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JavaType;
@@ -40,7 +41,7 @@ public class SerializedValueSerializerTest extends TestLogger {
 
     @Before
     public void setUp() {
-        objectMapper = new ObjectMapper();
+        objectMapper = JacksonMapperFactory.createObjectMapper();
         final SimpleModule simpleModule = new SimpleModule();
         final JavaType serializedValueWildcardType =
                 objectMapper
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/JSONGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/JSONGenerator.java
index 7047fd01e4a..b39243f27eb 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/JSONGenerator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/JSONGenerator.java
@@ -18,6 +18,7 @@
 package org.apache.flink.streaming.api.graph;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.util.jackson.JacksonMapperFactory;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ArrayNode;
@@ -45,7 +46,7 @@ public class JSONGenerator {
     public static final String PARALLELISM = "parallelism";
 
     private StreamGraph streamGraph;
-    private final ObjectMapper mapper = new ObjectMapper();
+    private final ObjectMapper mapper = JacksonMapperFactory.createObjectMapper();
 
     public JSONGenerator(StreamGraph streamGraph) {
         this.streamGraph = streamGraph;
diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/connector/source/CompactPartitions.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/connector/source/CompactPartitions.java
index 333a3480b1a..57ce1970720 100644
--- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/connector/source/CompactPartitions.java
+++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/connector/source/CompactPartitions.java
@@ -20,6 +20,7 @@ package org.apache.flink.table.connector.source;
 
 import org.apache.flink.table.catalog.CatalogPartitionSpec;
 import org.apache.flink.table.factories.DynamicTableFactory;
+import org.apache.flink.util.jackson.JacksonMapperFactory;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser;
@@ -50,7 +51,7 @@ public class CompactPartitions implements Serializable {
 
     private static final long serialVersionUID = 1L;
     private static final String FIELD_NAME_COMPACT_PARTITIONS = "compact-partitions";
-    private static final ObjectMapper MAPPER = new ObjectMapper();
+    private static final ObjectMapper MAPPER = JacksonMapperFactory.createObjectMapper();
 
     private final List<CompactPartition> compactPartitions;
 
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/JsonSerdeUtil.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/JsonSerdeUtil.java
index 8a601601d3f..46fd71036b2 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/JsonSerdeUtil.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/JsonSerdeUtil.java
@@ -40,6 +40,7 @@ import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.extraction.ExtractionUtils;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.jackson.JacksonMapperFactory;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
@@ -100,7 +101,7 @@ public class JsonSerdeUtil {
     private static final ObjectMapper OBJECT_MAPPER_INSTANCE;
 
     static {
-        OBJECT_MAPPER_INSTANCE = new ObjectMapper();
+        OBJECT_MAPPER_INSTANCE = JacksonMapperFactory.createObjectMapper();
 
         OBJECT_MAPPER_INSTANCE.setTypeFactory(
                 // Make sure to register the classloader of the planner
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/LogicalTypeJsonSerdeTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/LogicalTypeJsonSerdeTest.java
index d27dbb5abe3..d571d753885 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/LogicalTypeJsonSerdeTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/LogicalTypeJsonSerdeTest.java
@@ -64,9 +64,9 @@ import org.apache.flink.table.types.logical.ZonedTimestampType;
 import org.apache.flink.table.types.utils.DataTypeFactoryMock;
 import org.apache.flink.table.utils.CatalogManagerMocks;
 import org.apache.flink.types.Row;
+import org.apache.flink.util.jackson.JacksonMapperFactory;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
 
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.parallel.Execution;
@@ -139,8 +139,8 @@ public class LogicalTypeJsonSerdeTest {
         // maximum plan content
         tableConfig.set(TableConfigOptions.PLAN_COMPILE_CATALOG_OBJECTS, ALL);
         final String maximumJson = toJson(serdeContext, STRUCTURED_TYPE);
-        final ObjectMapper mapper = new ObjectMapper();
-        final JsonNode maximumJsonNode = mapper.readTree(maximumJson);
+        final JsonNode maximumJsonNode =
+                JacksonMapperFactory.createObjectMapper().readTree(maximumJson);
         assertThat(maximumJsonNode.get(LogicalTypeJsonSerializer.FIELD_NAME_ATTRIBUTES))
                 .isNotNull();
         assertThat(maximumJsonNode.get(LogicalTypeJsonSerializer.FIELD_NAME_DESCRIPTION).asText())
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/PartitionSpecSerdeTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/PartitionSpecSerdeTest.java
index 159ef2c6ab6..b35f3b808fd 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/PartitionSpecSerdeTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/PartitionSpecSerdeTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.table.planner.plan.nodes.exec.serde;
 
 import org.apache.flink.table.planner.plan.nodes.exec.spec.PartitionSpec;
+import org.apache.flink.util.jackson.JacksonMapperFactory;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
@@ -30,20 +31,22 @@ import static org.assertj.core.api.Assertions.assertThat;
 /** Test PartitionSpec json ser/de. */
 public class PartitionSpecSerdeTest {
 
+    private static final ObjectMapper OBJECT_MAPPER = JacksonMapperFactory.createObjectMapper();
+
     @Test
     public void testPartitionSpec() throws JsonProcessingException {
         PartitionSpec spec = new PartitionSpec(new int[] {1, 2, 3});
-        ObjectMapper mapper = new ObjectMapper();
-        assertThat(mapper.readValue(mapper.writeValueAsString(spec), PartitionSpec.class))
+        assertThat(
+                        OBJECT_MAPPER.readValue(
+                                OBJECT_MAPPER.writeValueAsString(spec), PartitionSpec.class))
                 .isEqualTo(spec);
     }
 
     @Test
     public void testAllInOne() throws JsonProcessingException {
-        ObjectMapper mapper = new ObjectMapper();
         assertThat(
-                        mapper.readValue(
-                                mapper.writeValueAsString(PartitionSpec.ALL_IN_ONE),
+                        OBJECT_MAPPER.readValue(
+                                OBJECT_MAPPER.writeValueAsString(PartitionSpec.ALL_IN_ONE),
                                 PartitionSpec.class))
                 .isEqualTo(PartitionSpec.ALL_IN_ONE);
     }
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RankProcessStrategySerdeTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RankProcessStrategySerdeTest.java
index 82c1b3849fa..69c9e553d77 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RankProcessStrategySerdeTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RankProcessStrategySerdeTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.table.planner.plan.nodes.exec.serde;
 
 import org.apache.flink.table.planner.plan.utils.RankProcessStrategy;
+import org.apache.flink.util.jackson.JacksonMapperFactory;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
@@ -32,7 +33,7 @@ public class RankProcessStrategySerdeTest {
 
     @Test
     public void testRankRange() throws JsonProcessingException {
-        ObjectMapper mapper = new ObjectMapper();
+        ObjectMapper mapper = JacksonMapperFactory.createObjectMapper();
         RankProcessStrategy[] strategies =
                 new RankProcessStrategy[] {
                     RankProcessStrategy.UNDEFINED_STRATEGY,
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RankRangeSerdeTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RankRangeSerdeTest.java
index 90abd497f54..bc5e6a246ba 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RankRangeSerdeTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RankRangeSerdeTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.table.runtime.operators.rank.ConstantRankRange;
 import org.apache.flink.table.runtime.operators.rank.ConstantRankRangeWithoutEnd;
 import org.apache.flink.table.runtime.operators.rank.RankRange;
 import org.apache.flink.table.runtime.operators.rank.VariableRankRange;
+import org.apache.flink.util.jackson.JacksonMapperFactory;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
@@ -35,7 +36,7 @@ public class RankRangeSerdeTest {
 
     @Test
     public void testRankRange() throws JsonProcessingException {
-        ObjectMapper mapper = new ObjectMapper();
+        ObjectMapper mapper = JacksonMapperFactory.createObjectMapper();
         RankRange[] ranges =
                 new RankRange[] {
                     new ConstantRankRange(1, 2),
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RankTypeSerdeTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RankTypeSerdeTest.java
index e1daa9d03e3..04fa1552512 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RankTypeSerdeTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RankTypeSerdeTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.table.planner.plan.nodes.exec.serde;
 
 import org.apache.flink.table.runtime.operators.rank.RankType;
+import org.apache.flink.util.jackson.JacksonMapperFactory;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
@@ -32,7 +33,7 @@ public class RankTypeSerdeTest {
 
     @Test
     public void testRankType() throws JsonProcessingException {
-        ObjectMapper mapper = new ObjectMapper();
+        ObjectMapper mapper = JacksonMapperFactory.createObjectMapper();
         for (RankType type : RankType.values()) {
             RankType result = mapper.readValue(mapper.writeValueAsString(type), RankType.class);
             assertThat(result).isEqualTo(type);
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/SortSpecSerdeTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/SortSpecSerdeTest.java
index 2b5600980dd..af2190203b5 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/SortSpecSerdeTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/SortSpecSerdeTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.table.planner.plan.nodes.exec.serde;
 
 import org.apache.flink.table.planner.plan.nodes.exec.spec.SortSpec;
+import org.apache.flink.util.jackson.JacksonMapperFactory;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
@@ -29,6 +30,7 @@ import static org.assertj.core.api.Assertions.assertThat;
 
 /** Test SortSpec json ser/de. */
 public class SortSpecSerdeTest {
+    private static final ObjectMapper OBJECT_MAPPER = JacksonMapperFactory.createObjectMapper();
 
     @Test
     public void testSortSpec() throws JsonProcessingException {
@@ -39,15 +41,17 @@ public class SortSpecSerdeTest {
                         .addField(3, false, true)
                         .addField(4, false, false)
                         .build();
-        ObjectMapper mapper = new ObjectMapper();
-        assertThat(mapper.readValue(mapper.writeValueAsString(sortSpec), SortSpec.class))
+        assertThat(
+                        OBJECT_MAPPER.readValue(
+                                OBJECT_MAPPER.writeValueAsString(sortSpec), SortSpec.class))
                 .isEqualTo(sortSpec);
     }
 
     @Test
     public void testAny() throws JsonProcessingException {
-        ObjectMapper mapper = new ObjectMapper();
-        assertThat(mapper.readValue(mapper.writeValueAsString(SortSpec.ANY), SortSpec.class))
+        assertThat(
+                        OBJECT_MAPPER.readValue(
+                                OBJECT_MAPPER.writeValueAsString(SortSpec.ANY), SortSpec.class))
                 .isEqualTo(SortSpec.ANY);
     }
 }
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/utils/JsonTestUtils.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/utils/JsonTestUtils.java
index 682f3b3e656..6aa9f96c7d0 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/utils/JsonTestUtils.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/utils/JsonTestUtils.java
@@ -19,6 +19,7 @@
 package org.apache.flink.table.planner.utils;
 
 import org.apache.flink.FlinkVersion;
+import org.apache.flink.util.jackson.JacksonMapperFactory;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
@@ -29,7 +30,8 @@ import java.io.IOException;
 /** This class contains a collection of generic utilities to deal with JSON in tests. */
 public final class JsonTestUtils {
 
-    private static final ObjectMapper OBJECT_MAPPER_INSTANCE = new ObjectMapper();
+    private static final ObjectMapper OBJECT_MAPPER_INSTANCE =
+            JacksonMapperFactory.createObjectMapper();
 
     private JsonTestUtils() {}
 
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala
index fd2fd085519..35499ba9439 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala
@@ -66,6 +66,7 @@ import org.apache.flink.table.types.utils.TypeConversions
 import org.apache.flink.table.typeutils.FieldInfoUtils
 import org.apache.flink.types.Row
 import org.apache.flink.util.{FlinkUserCodeClassLoaders, MutableURLClassLoader}
+import org.apache.flink.util.jackson.JacksonMapperFactory
 
 import _root_.java.math.{BigDecimal => JBigDecimal}
 import _root_.java.util
@@ -1612,6 +1613,8 @@ object PlanKind extends Enumeration {
 
 object TableTestUtil {
 
+  private val objectMapper = JacksonMapperFactory.createObjectMapper()
+
   val STREAM_SETTING: EnvironmentSettings =
     EnvironmentSettings.newInstance().inStreamingMode().build()
   val BATCH_SETTING: EnvironmentSettings = EnvironmentSettings.newInstance().inBatchMode().build()
@@ -1706,14 +1709,14 @@ object TableTestUtil {
 
   @throws[IOException]
   def getFormattedJson(json: String): String = {
-    val parser = new ObjectMapper().getFactory.createParser(json)
+    val parser = objectMapper.getFactory.createParser(json)
     val jsonNode: JsonNode = parser.readValueAsTree[JsonNode]
     jsonNode.toString
   }
 
   @throws[IOException]
   def getPrettyJson(json: String): String = {
-    val parser = new ObjectMapper().getFactory.createParser(json)
+    val parser = objectMapper.getFactory.createParser(json)
     val jsonNode: JsonNode = parser.readValueAsTree[JsonNode]
     jsonNode.toPrettyString
   }
diff --git a/flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/JsonJobGraphGenerationTest.java b/flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/JsonJobGraphGenerationTest.java
index 1bbf8d35c42..3e7a589f779 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/JsonJobGraphGenerationTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/JsonJobGraphGenerationTest.java
@@ -32,6 +32,7 @@ import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.jsonplan.JsonPlanGenerator;
+import org.apache.flink.util.jackson.JacksonMapperFactory;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonFactory;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser;
@@ -245,6 +246,8 @@ public class JsonJobGraphGenerationTest {
 
     private static class GenericValidator implements JsonValidator {
 
+        private static final ObjectMapper OBJECT_MAPPER = JacksonMapperFactory.createObjectMapper();
+
         private final int expectedParallelism;
         private final int numNodes;
 
@@ -258,8 +261,7 @@ public class JsonJobGraphGenerationTest {
             final Map<String, JsonNode> idToNode = new HashMap<>();
 
             // validate the produced JSON
-            ObjectMapper m = new ObjectMapper();
-            JsonNode rootNode = m.readTree(json);
+            JsonNode rootNode = OBJECT_MAPPER.readTree(json);
 
             JsonNode idField = rootNode.get("jid");
             JsonNode nameField = rootNode.get("name");