You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2022/08/08 17:08:22 UTC
[flink] 03/04: [FLINK-28621][core] Add central Jackson mapper factory methods
This is an automated email from the ASF dual-hosted git repository.
chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit c0bf0ac3fb1fe4814bff09807ed2040bb13da052
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Thu Jul 28 16:02:26 2022 +0200
[FLINK-28621][core] Add central Jackson mapper factory methods
---
.../sink/testutils/KinesisFirehoseTestUtils.java | 6 ++--
.../kinesis/sink/examples/SinkIntoKinesis.java | 9 +++--
.../JSONKeyValueDeserializationSchema.java | 3 +-
.../KafkaRecordDeserializationSchemaTest.java | 12 ++++---
.../JSONKeyValueDeserializationSchemaTest.java | 31 ++++++++---------
flink-core/pom.xml | 11 +++---
.../flink/util/jackson/JacksonMapperFactory.java | 40 ++++++++++++++++++++++
.../util/jackson/JacksonMapperFactoryTest.java | 37 +++++++++-----------
.../flink/docs/rest/OpenApiSpecGenerator.java | 4 ++-
.../flink/docs/rest/RestAPIDocGenerator.java | 3 +-
.../table/test/KinesisFirehoseTableITTest.java | 14 +++++---
.../table/test/KinesisStreamsTableApiIT.java | 11 +++---
.../flink/tests/util/flink/FlinkDistribution.java | 3 +-
.../kinesis/test/KinesisTableApiITCase.java | 11 +++---
.../flink/streaming/kinesis/test/model/Order.java | 2 +-
.../apache/flink/formats/csv/CsvBulkWriter.java | 3 +-
.../flink/formats/csv/CsvFileFormatFactory.java | 5 +--
.../apache/flink/formats/csv/CsvReaderFormat.java | 5 +--
.../csv/CsvRowDataDeserializationSchema.java | 5 +--
.../formats/csv/CsvRowDataSerializationSchema.java | 3 +-
.../formats/csv/CsvRowDeserializationSchema.java | 5 +--
.../formats/csv/CsvRowSerializationSchema.java | 3 +-
.../flink/formats/csv/RowCsvInputFormat.java | 4 +--
.../flink/formats/csv/DataStreamCsvITCase.java | 10 +++---
.../formats/json/JsonDeserializationSchema.java | 5 +--
.../json/JsonRowDataDeserializationSchema.java | 3 +-
.../json/JsonRowDataSerializationSchema.java | 3 +-
.../formats/json/JsonRowDeserializationSchema.java | 3 +-
.../flink/formats/json/JsonRowSchemaConverter.java | 3 +-
.../formats/json/JsonRowSerializationSchema.java | 3 +-
.../json/JsonNodeDeserializationSchemaTest.java | 3 +-
.../formats/json/JsonRowDataSerDeSchemaTest.java | 37 +++++++++-----------
.../json/JsonRowDeserializationSchemaTest.java | 21 +++++-------
.../flink/python/metric/FlinkMetricContainer.java | 5 ++-
.../runtime/webmonitor/history/HistoryServer.java | 3 +-
.../history/HistoryServerArchiveFetcher.java | 3 +-
.../rest/compatibility/CompatibilityRoutines.java | 3 +-
.../rest/compatibility/RestAPIStabilityTest.java | 3 +-
.../runtime/webmonitor/WebFrontendITCase.java | 17 ++++-----
.../webmonitor/history/HistoryServerTest.java | 4 ++-
.../highavailability/FileSystemJobResultStore.java | 3 +-
.../flink/runtime/history/FsJobArchivist.java | 3 +-
.../flink/runtime/rest/util/RestMapperUtils.java | 4 ++-
.../FileSystemJobResultStoreTestInternal.java | 3 +-
.../jobgraph/jsonplan/JsonGeneratorTest.java | 3 +-
.../taskmanager/TaskManagerDetailsHandlerTest.java | 3 +-
.../messages/json/JobResultDeserializerTest.java | 3 +-
.../json/SerializedThrowableSerializerTest.java | 5 +--
.../json/SerializedValueSerializerTest.java | 3 +-
.../flink/streaming/api/graph/JSONGenerator.java | 3 +-
.../table/connector/source/CompactPartitions.java | 3 +-
.../plan/nodes/exec/serde/JsonSerdeUtil.java | 3 +-
.../nodes/exec/serde/LogicalTypeJsonSerdeTest.java | 6 ++--
.../nodes/exec/serde/PartitionSpecSerdeTest.java | 13 ++++---
.../exec/serde/RankProcessStrategySerdeTest.java | 3 +-
.../plan/nodes/exec/serde/RankRangeSerdeTest.java | 3 +-
.../plan/nodes/exec/serde/RankTypeSerdeTest.java | 3 +-
.../plan/nodes/exec/serde/SortSpecSerdeTest.java | 12 ++++---
.../flink/table/planner/utils/JsonTestUtils.java | 4 ++-
.../flink/table/planner/utils/TableTestBase.scala | 7 ++--
.../jsonplan/JsonJobGraphGenerationTest.java | 6 ++--
61 files changed, 278 insertions(+), 177 deletions(-)
diff --git a/flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/testutils/KinesisFirehoseTestUtils.java b/flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/testutils/KinesisFirehoseTestUtils.java
index 129c4a43e7f..f4dee623b64 100644
--- a/flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/testutils/KinesisFirehoseTestUtils.java
+++ b/flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/testutils/KinesisFirehoseTestUtils.java
@@ -20,6 +20,7 @@ package org.apache.flink.connector.firehose.sink.testutils;
import org.apache.flink.connector.aws.testutils.AWSServicesTestUtils;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.util.jackson.JacksonMapperFactory;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
@@ -39,7 +40,7 @@ import java.util.List;
*/
public class KinesisFirehoseTestUtils {
- private static final ObjectMapper MAPPER = new ObjectMapper();
+ private static final ObjectMapper MAPPER = JacksonMapperFactory.createObjectMapper();
public static FirehoseClient createFirehoseClient(String endpoint, SdkHttpClient httpClient) {
return AWSServicesTestUtils.createAwsSyncClient(
@@ -68,11 +69,10 @@ public class KinesisFirehoseTestUtils {
public static DataStream<String> getSampleDataGenerator(
StreamExecutionEnvironment env, int endValue) {
- ObjectMapper mapper = new ObjectMapper();
return env.fromSequence(1, endValue)
.map(Object::toString)
.returns(String.class)
- .map(data -> mapper.writeValueAsString(ImmutableMap.of("data", data)));
+ .map(data -> MAPPER.writeValueAsString(ImmutableMap.of("data", data)));
}
public static List<String> getSampleData(int endValue) throws JsonProcessingException {
diff --git a/flink-connectors/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/sink/examples/SinkIntoKinesis.java b/flink-connectors/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/sink/examples/SinkIntoKinesis.java
index 362e25fda77..c861f44fbdf 100644
--- a/flink-connectors/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/sink/examples/SinkIntoKinesis.java
+++ b/flink-connectors/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/sink/examples/SinkIntoKinesis.java
@@ -22,6 +22,7 @@ import org.apache.flink.connector.aws.config.AWSConfigConstants;
import org.apache.flink.connector.kinesis.sink.KinesisStreamsSink;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.util.jackson.JacksonMapperFactory;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
@@ -39,8 +40,9 @@ import java.util.Properties;
*/
public class SinkIntoKinesis {
+ private static final ObjectMapper OBJECT_MAPPER = JacksonMapperFactory.createObjectMapper();
+
public static void main(String[] args) throws Exception {
- ObjectMapper mapper = new ObjectMapper();
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(10_000);
@@ -48,7 +50,10 @@ public class SinkIntoKinesis {
env.fromSequence(1, 10_000_000L)
.map(Object::toString)
.returns(String.class)
- .map(data -> mapper.writeValueAsString(ImmutableMap.of("data", data)));
+ .map(
+ data ->
+ OBJECT_MAPPER.writeValueAsString(
+ ImmutableMap.of("data", data)));
Properties sinkProperties = new Properties();
sinkProperties.put(AWSConfigConstants.AWS_REGION, "your-region-here");
diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/util/serialization/JSONKeyValueDeserializationSchema.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/util/serialization/JSONKeyValueDeserializationSchema.java
index 86817433a6f..e2b428eec1e 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/util/serialization/JSONKeyValueDeserializationSchema.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/util/serialization/JSONKeyValueDeserializationSchema.java
@@ -21,6 +21,7 @@ import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
+import org.apache.flink.util.jackson.JacksonMapperFactory;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
@@ -55,7 +56,7 @@ public class JSONKeyValueDeserializationSchema implements KafkaDeserializationSc
@Override
public void open(DeserializationSchema.InitializationContext context) throws Exception {
- mapper = new ObjectMapper();
+ mapper = JacksonMapperFactory.createObjectMapper();
}
@Override
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/deserializer/KafkaRecordDeserializationSchemaTest.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/deserializer/KafkaRecordDeserializationSchemaTest.java
index c2e735a0c13..8766719a0c1 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/deserializer/KafkaRecordDeserializationSchemaTest.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/deserializer/KafkaRecordDeserializationSchemaTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.connector.testutils.source.deserialization.TestingDeseri
import org.apache.flink.formats.json.JsonDeserializationSchema;
import org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema;
import org.apache.flink.util.Collector;
+import org.apache.flink.util.jackson.JacksonMapperFactory;
import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableMap;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
@@ -46,6 +47,8 @@ import static org.assertj.core.api.Assertions.assertThat;
/** Unit tests for KafkaRecordDeserializationSchema. */
public class KafkaRecordDeserializationSchemaTest {
+ private static final ObjectMapper OBJECT_MAPPER = JacksonMapperFactory.createObjectMapper();
+
private static Map<String, ?> configurableConfiguration;
private static Map<String, ?> configuration;
private static boolean isKeyDeserializer;
@@ -135,14 +138,13 @@ public class KafkaRecordDeserializationSchemaTest {
}
private ConsumerRecord<byte[], byte[]> getConsumerRecord() throws JsonProcessingException {
- ObjectMapper mapper = new ObjectMapper();
- ObjectNode initialKey = mapper.createObjectNode();
+ ObjectNode initialKey = OBJECT_MAPPER.createObjectNode();
initialKey.put("index", 4);
- byte[] serializedKey = mapper.writeValueAsBytes(initialKey);
+ byte[] serializedKey = OBJECT_MAPPER.writeValueAsBytes(initialKey);
- ObjectNode initialValue = mapper.createObjectNode();
+ ObjectNode initialValue = OBJECT_MAPPER.createObjectNode();
initialValue.put("word", "world");
- byte[] serializedValue = mapper.writeValueAsBytes(initialValue);
+ byte[] serializedValue = OBJECT_MAPPER.writeValueAsBytes(initialValue);
return new ConsumerRecord<>("topic#1", 3, 4L, serializedKey, serializedValue);
}
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/JSONKeyValueDeserializationSchemaTest.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/JSONKeyValueDeserializationSchemaTest.java
index 3a052cee2af..ddbcf1c94c8 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/JSONKeyValueDeserializationSchemaTest.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/JSONKeyValueDeserializationSchemaTest.java
@@ -19,6 +19,7 @@ package org.apache.flink.streaming.connectors.kafka;
import org.apache.flink.connector.testutils.formats.DummyInitializationContext;
import org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema;
+import org.apache.flink.util.jackson.JacksonMapperFactory;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
@@ -31,16 +32,17 @@ import static org.assertj.core.api.Assertions.assertThat;
/** Tests for the{@link JSONKeyValueDeserializationSchema}. */
public class JSONKeyValueDeserializationSchemaTest {
+ private static final ObjectMapper OBJECT_MAPPER = JacksonMapperFactory.createObjectMapper();
+
@Test
public void testDeserializeWithoutMetadata() throws Exception {
- ObjectMapper mapper = new ObjectMapper();
- ObjectNode initialKey = mapper.createObjectNode();
+ ObjectNode initialKey = OBJECT_MAPPER.createObjectNode();
initialKey.put("index", 4);
- byte[] serializedKey = mapper.writeValueAsBytes(initialKey);
+ byte[] serializedKey = OBJECT_MAPPER.writeValueAsBytes(initialKey);
- ObjectNode initialValue = mapper.createObjectNode();
+ ObjectNode initialValue = OBJECT_MAPPER.createObjectNode();
initialValue.put("word", "world");
- byte[] serializedValue = mapper.writeValueAsBytes(initialValue);
+ byte[] serializedValue = OBJECT_MAPPER.writeValueAsBytes(initialValue);
JSONKeyValueDeserializationSchema schema = new JSONKeyValueDeserializationSchema(false);
schema.open(new DummyInitializationContext());
@@ -54,12 +56,11 @@ public class JSONKeyValueDeserializationSchemaTest {
@Test
public void testDeserializeWithoutKey() throws Exception {
- ObjectMapper mapper = new ObjectMapper();
byte[] serializedKey = null;
- ObjectNode initialValue = mapper.createObjectNode();
+ ObjectNode initialValue = OBJECT_MAPPER.createObjectNode();
initialValue.put("word", "world");
- byte[] serializedValue = mapper.writeValueAsBytes(initialValue);
+ byte[] serializedValue = OBJECT_MAPPER.writeValueAsBytes(initialValue);
JSONKeyValueDeserializationSchema schema = new JSONKeyValueDeserializationSchema(false);
schema.open(new DummyInitializationContext());
@@ -88,10 +89,9 @@ public class JSONKeyValueDeserializationSchemaTest {
@Test
public void testDeserializeWithoutValue() throws Exception {
- ObjectMapper mapper = new ObjectMapper();
- ObjectNode initialKey = mapper.createObjectNode();
+ ObjectNode initialKey = OBJECT_MAPPER.createObjectNode();
initialKey.put("index", 4);
- byte[] serializedKey = mapper.writeValueAsBytes(initialKey);
+ byte[] serializedKey = OBJECT_MAPPER.writeValueAsBytes(initialKey);
byte[] serializedValue = null;
@@ -107,14 +107,13 @@ public class JSONKeyValueDeserializationSchemaTest {
@Test
public void testDeserializeWithMetadata() throws Exception {
- ObjectMapper mapper = new ObjectMapper();
- ObjectNode initialKey = mapper.createObjectNode();
+ ObjectNode initialKey = OBJECT_MAPPER.createObjectNode();
initialKey.put("index", 4);
- byte[] serializedKey = mapper.writeValueAsBytes(initialKey);
+ byte[] serializedKey = OBJECT_MAPPER.writeValueAsBytes(initialKey);
- ObjectNode initialValue = mapper.createObjectNode();
+ ObjectNode initialValue = OBJECT_MAPPER.createObjectNode();
initialValue.put("word", "world");
- byte[] serializedValue = mapper.writeValueAsBytes(initialValue);
+ byte[] serializedValue = OBJECT_MAPPER.writeValueAsBytes(initialValue);
JSONKeyValueDeserializationSchema schema = new JSONKeyValueDeserializationSchema(true);
schema.open(new DummyInitializationContext());
diff --git a/flink-core/pom.xml b/flink-core/pom.xml
index ef3061f1ac5..45be24efc05 100644
--- a/flink-core/pom.xml
+++ b/flink-core/pom.xml
@@ -51,6 +51,11 @@ under the License.
<artifactId>flink-shaded-asm-9</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-shaded-jackson</artifactId>
+ </dependency>
+
<!-- standard utilities -->
<dependency>
<groupId>org.apache.commons</groupId>
@@ -114,12 +119,6 @@ under the License.
<scope>test</scope>
</dependency>
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-shaded-jackson</artifactId>
- <scope>test</scope>
- </dependency>
-
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
diff --git a/flink-core/src/main/java/org/apache/flink/util/jackson/JacksonMapperFactory.java b/flink-core/src/main/java/org/apache/flink/util/jackson/JacksonMapperFactory.java
new file mode 100644
index 00000000000..2c0b57f0471
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/util/jackson/JacksonMapperFactory.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util.jackson;
+
+import org.apache.flink.annotation.Experimental;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvMapper;
+
+/** Factory for Jackson mappers. */
+@Experimental
+public final class JacksonMapperFactory {
+
+ public static ObjectMapper createObjectMapper() {
+ final ObjectMapper objectMapper = new ObjectMapper();
+ return objectMapper;
+ }
+
+ public static CsvMapper createCsvMapper() {
+ final CsvMapper csvMapper = new CsvMapper();
+ return csvMapper;
+ }
+
+ private JacksonMapperFactory() {}
+}
diff --git a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonNodeDeserializationSchemaTest.java b/flink-core/src/test/java/org/apache/flink/util/jackson/JacksonMapperFactoryTest.java
similarity index 52%
copy from flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonNodeDeserializationSchemaTest.java
copy to flink-core/src/test/java/org/apache/flink/util/jackson/JacksonMapperFactoryTest.java
index 90751525feb..61f2057ca82 100644
--- a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonNodeDeserializationSchemaTest.java
+++ b/flink-core/src/test/java/org/apache/flink/util/jackson/JacksonMapperFactoryTest.java
@@ -15,35 +15,30 @@
* limitations under the License.
*/
-package org.apache.flink.formats.json;
-
-import org.apache.flink.connector.testutils.formats.DummyInitializationContext;
+package org.apache.flink.util.jackson;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvMapper;
import org.junit.jupiter.api.Test;
-import java.io.IOException;
-
import static org.assertj.core.api.Assertions.assertThat;
-/** Tests for the {@link JsonNodeDeserializationSchema}. */
-@SuppressWarnings("deprecation")
-class JsonNodeDeserializationSchemaTest {
+class JacksonMapperFactoryTest {
@Test
- void testDeserialize() throws IOException {
- ObjectMapper mapper = new ObjectMapper();
- ObjectNode initialValue = mapper.createObjectNode();
- initialValue.put("key", 4).put("value", "world");
- byte[] serializedValue = mapper.writeValueAsBytes(initialValue);
-
- JsonNodeDeserializationSchema schema = new JsonNodeDeserializationSchema();
- schema.open(new DummyInitializationContext());
- ObjectNode deserializedValue = schema.deserialize(serializedValue);
-
- assertThat(deserializedValue.get("key").asInt()).isEqualTo(4);
- assertThat(deserializedValue.get("value").asText()).isEqualTo("world");
+ void testCreateObjectMapperReturnDistinctMappers() {
+ final ObjectMapper mapper1 = JacksonMapperFactory.createObjectMapper();
+ final ObjectMapper mapper2 = JacksonMapperFactory.createObjectMapper();
+
+ assertThat(mapper1).isNotSameAs(mapper2);
+ }
+
+ @Test
+ void testCreateCsvMapperReturnDistinctMappers() {
+ final CsvMapper mapper1 = JacksonMapperFactory.createCsvMapper();
+ final CsvMapper mapper2 = JacksonMapperFactory.createCsvMapper();
+
+ assertThat(mapper1).isNotSameAs(mapper2);
}
}
diff --git a/flink-docs/src/main/java/org/apache/flink/docs/rest/OpenApiSpecGenerator.java b/flink-docs/src/main/java/org/apache/flink/docs/rest/OpenApiSpecGenerator.java
index 0265ff2c957..fe17e807f8f 100644
--- a/flink-docs/src/main/java/org/apache/flink/docs/rest/OpenApiSpecGenerator.java
+++ b/flink-docs/src/main/java/org/apache/flink/docs/rest/OpenApiSpecGenerator.java
@@ -44,6 +44,7 @@ import org.apache.flink.runtime.util.EnvironmentInformation;
import org.apache.flink.runtime.webmonitor.handlers.JarUploadHeaders;
import org.apache.flink.util.ConfigurationException;
import org.apache.flink.util.SerializedThrowable;
+import org.apache.flink.util.jackson.JacksonMapperFactory;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.SerializationFeature;
@@ -100,7 +101,8 @@ public class OpenApiSpecGenerator {
static {
ModelResolver.enumsAsRef = true;
final ObjectMapper mapper =
- new ObjectMapper().configure(SerializationFeature.ORDER_MAP_ENTRIES_BY_KEYS, true);
+ JacksonMapperFactory.createObjectMapper()
+ .configure(SerializationFeature.ORDER_MAP_ENTRIES_BY_KEYS, true);
modelConverterContext =
new ModelConverterContextImpl(Collections.singletonList(new ModelResolver(mapper)));
}
diff --git a/flink-docs/src/main/java/org/apache/flink/docs/rest/RestAPIDocGenerator.java b/flink-docs/src/main/java/org/apache/flink/docs/rest/RestAPIDocGenerator.java
index 489bc6415ed..a21ec24cc3e 100644
--- a/flink-docs/src/main/java/org/apache/flink/docs/rest/RestAPIDocGenerator.java
+++ b/flink-docs/src/main/java/org/apache/flink/docs/rest/RestAPIDocGenerator.java
@@ -32,6 +32,7 @@ import org.apache.flink.runtime.rest.util.DocumentingDispatcherRestEndpoint;
import org.apache.flink.runtime.rest.util.DocumentingRestEndpoint;
import org.apache.flink.runtime.rest.versioning.RuntimeRestAPIVersion;
import org.apache.flink.util.ConfigurationException;
+import org.apache.flink.util.jackson.JacksonMapperFactory;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.SerializableString;
@@ -99,7 +100,7 @@ public class RestAPIDocGenerator {
private static final JsonSchemaGenerator schemaGen;
static {
- mapper = new ObjectMapper();
+ mapper = JacksonMapperFactory.createObjectMapper();
mapper.getFactory().setCharacterEscapes(new HTMLCharacterEscapes());
mapper.configure(SerializationFeature.ORDER_MAP_ENTRIES_BY_KEYS, true);
schemaGen = new JsonSchemaGenerator(mapper);
diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/table/test/KinesisFirehoseTableITTest.java b/flink-end-to-end-tests/flink-end-to-end-tests-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/table/test/KinesisFirehoseTableITTest.java
index 172693c5ac6..ce67a707aec 100644
--- a/flink-end-to-end-tests/flink-end-to-end-tests-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/table/test/KinesisFirehoseTableITTest.java
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/table/test/KinesisFirehoseTableITTest.java
@@ -26,10 +26,13 @@ import org.apache.flink.test.util.SQLJobSubmission;
import org.apache.flink.tests.util.TestUtils;
import org.apache.flink.util.DockerImageVersions;
import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.jackson.JacksonMapperFactory;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
import org.assertj.core.api.Assertions;
import org.junit.After;
import org.junit.AfterClass;
@@ -79,6 +82,8 @@ public class KinesisFirehoseTableITTest extends TestLogger {
private static final String BUCKET_NAME = "s3-firehose";
private static final String STREAM_NAME = "s3-stream";
+ private static final ObjectMapper OBJECT_MAPPER = JacksonMapperFactory.createObjectMapper();
+
private final Path sqlConnectorFirehoseJar = TestUtils.getResource(".*firehose.jar");
private SdkHttpClient httpClient;
@@ -207,7 +212,7 @@ public class KinesisFirehoseTableITTest extends TestLogger {
private <T> T fromJson(final String json, final Class<T> type) {
try {
- return new ObjectMapper().readValue(json, type);
+ return OBJECT_MAPPER.readValue(json, type);
} catch (JsonProcessingException e) {
throw new RuntimeException(String.format("Failed to deserialize json: %s", json), e);
}
@@ -218,6 +223,7 @@ public class KinesisFirehoseTableITTest extends TestLogger {
private final String code;
private final int quantity;
+ @JsonCreator
public Order(
@JsonProperty("code") final String code, @JsonProperty("quantity") int quantity) {
this.code = code;
diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/table/test/KinesisStreamsTableApiIT.java b/flink-end-to-end-tests/flink-end-to-end-tests-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/table/test/KinesisStreamsTableApiIT.java
index 1f415c4bcec..d52ca2cb83d 100644
--- a/flink-end-to-end-tests/flink-end-to-end-tests-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/table/test/KinesisStreamsTableApiIT.java
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/table/test/KinesisStreamsTableApiIT.java
@@ -27,12 +27,13 @@ import org.apache.flink.connectors.kinesis.testutils.KinesaliteContainer;
import org.apache.flink.test.util.SQLJobSubmission;
import org.apache.flink.tests.util.TestUtils;
import org.apache.flink.util.DockerImageVersions;
+import org.apache.flink.util.jackson.JacksonMapperFactory;
import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableList;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
import org.assertj.core.api.Assertions;
import org.junit.After;
import org.junit.AfterClass;
@@ -78,6 +79,8 @@ public class KinesisStreamsTableApiIT {
private static final String ORDERS_STREAM = "orders";
private static final String INTER_CONTAINER_KINESALITE_ALIAS = "kinesalite";
private static final String DEFAULT_FIRST_SHARD_NAME = "shardId-000000000000";
+ private static final ObjectMapper OBJECT_MAPPER = JacksonMapperFactory.createObjectMapper();
+
private SdkHttpClient httpClient;
private KinesisClient kinesisClient;
@@ -202,7 +205,7 @@ public class KinesisStreamsTableApiIT {
private <T> T fromJson(final String json, final Class<T> type) {
try {
- return new ObjectMapper().readValue(json, type);
+ return OBJECT_MAPPER.readValue(json, type);
} catch (JsonProcessingException e) {
throw new RuntimeException("Test Failure.", e);
}
diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/FlinkDistribution.java b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/FlinkDistribution.java
index e9b939adf4d..05f11f820b8 100644
--- a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/FlinkDistribution.java
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/FlinkDistribution.java
@@ -29,6 +29,7 @@ import org.apache.flink.tests.util.AutoClosableProcess;
import org.apache.flink.tests.util.TestUtils;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.function.FutureTaskWithException;
+import org.apache.flink.util.jackson.JacksonMapperFactory;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
@@ -75,7 +76,7 @@ public final class FlinkDistribution {
private static final Logger LOG = LoggerFactory.getLogger(FlinkDistribution.class);
- private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+ private static final ObjectMapper OBJECT_MAPPER = JacksonMapperFactory.createObjectMapper();
private static final Pattern ROOT_LOGGER_PATTERN = Pattern.compile("(rootLogger.level =).*");
private static final String HIVE_DRIVER = "org.apache.hive.jdbc.HiveDriver";
diff --git a/flink-end-to-end-tests/flink-streaming-kinesis-test/src/test/java/org/apache/flink/streaming/kinesis/test/KinesisTableApiITCase.java b/flink-end-to-end-tests/flink-streaming-kinesis-test/src/test/java/org/apache/flink/streaming/kinesis/test/KinesisTableApiITCase.java
index 4cd75934ed7..b637108e5db 100644
--- a/flink-end-to-end-tests/flink-streaming-kinesis-test/src/test/java/org/apache/flink/streaming/kinesis/test/KinesisTableApiITCase.java
+++ b/flink-end-to-end-tests/flink-streaming-kinesis-test/src/test/java/org/apache/flink/streaming/kinesis/test/KinesisTableApiITCase.java
@@ -28,11 +28,12 @@ import org.apache.flink.test.util.SQLJobSubmission;
import org.apache.flink.tests.util.TestUtils;
import org.apache.flink.util.DockerImageVersions;
import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.jackson.JacksonMapperFactory;
import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableList;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.AfterClass;
@@ -67,6 +68,8 @@ public class KinesisTableApiITCase extends TestLogger {
private static final String LARGE_ORDERS_STREAM = "large_orders";
private static final String INTER_CONTAINER_KINESALITE_ALIAS = "kinesalite";
+ private static final ObjectMapper OBJECT_MAPPER = JacksonMapperFactory.createObjectMapper();
+
private final Path sqlConnectorKinesisJar = TestUtils.getResource(".*kinesis.jar");
private static final Network network = Network.newNetwork();
@@ -166,7 +169,7 @@ public class KinesisTableApiITCase extends TestLogger {
private <T> String toJson(final T object) {
try {
- return new ObjectMapper().writeValueAsString(object);
+ return OBJECT_MAPPER.writeValueAsString(object);
} catch (JsonProcessingException e) {
throw new RuntimeException("Test Failure.", e);
}
@@ -174,7 +177,7 @@ public class KinesisTableApiITCase extends TestLogger {
private <T> T fromJson(final String json, final Class<T> type) {
try {
- return new ObjectMapper().readValue(json, type);
+ return OBJECT_MAPPER.readValue(json, type);
} catch (JsonProcessingException e) {
throw new RuntimeException("Test Failure.", e);
}
diff --git a/flink-end-to-end-tests/flink-streaming-kinesis-test/src/test/java/org/apache/flink/streaming/kinesis/test/model/Order.java b/flink-end-to-end-tests/flink-streaming-kinesis-test/src/test/java/org/apache/flink/streaming/kinesis/test/model/Order.java
index 15c158c4c8f..58cec308eae 100644
--- a/flink-end-to-end-tests/flink-streaming-kinesis-test/src/test/java/org/apache/flink/streaming/kinesis/test/model/Order.java
+++ b/flink-end-to-end-tests/flink-streaming-kinesis-test/src/test/java/org/apache/flink/streaming/kinesis/test/model/Order.java
@@ -17,7 +17,7 @@
package org.apache.flink.streaming.kinesis.test.model;
-import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
import java.util.Objects;
diff --git a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvBulkWriter.java b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvBulkWriter.java
index 69b3f9bce28..f9a8e01bb07 100644
--- a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvBulkWriter.java
+++ b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvBulkWriter.java
@@ -21,6 +21,7 @@ package org.apache.flink.formats.csv;
import org.apache.flink.api.common.serialization.BulkWriter;
import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.formats.common.Converter;
+import org.apache.flink.util.jackson.JacksonMapperFactory;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectWriter;
@@ -89,7 +90,7 @@ class CsvBulkWriter<T, R, C> implements BulkWriter<T> {
*/
static <T> CsvBulkWriter<T, T, Void> forPojo(Class<T> pojoClass, FSDataOutputStream stream) {
final Converter<T, T, Void> converter = (value, context) -> value;
- final CsvMapper csvMapper = new CsvMapper();
+ final CsvMapper csvMapper = JacksonMapperFactory.createCsvMapper();
final CsvSchema schema = csvMapper.schemaFor(pojoClass).withoutQuoteChar();
return new CsvBulkWriter<>(csvMapper, schema, converter, null, stream);
}
diff --git a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvFileFormatFactory.java b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvFileFormatFactory.java
index b0a0b27d5e7..f977acd5b4f 100644
--- a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvFileFormatFactory.java
+++ b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvFileFormatFactory.java
@@ -46,6 +46,7 @@ import org.apache.flink.table.factories.DynamicTableFactory;
import org.apache.flink.table.plan.stats.TableStats;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.jackson.JacksonMapperFactory;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
@@ -132,7 +133,7 @@ public class CsvFileFormatFactory implements BulkReaderFormatFactory, BulkWriter
.createRowConverter(projectedRowType, true);
CsvReaderFormat<RowData> csvReaderFormat =
new CsvReaderFormat<>(
- () -> new CsvMapper(),
+ () -> JacksonMapperFactory.createCsvMapper(),
ignored -> schema,
JsonNode.class,
converter,
@@ -178,7 +179,7 @@ public class CsvFileFormatFactory implements BulkReaderFormatFactory, BulkWriter
final RowDataToCsvConverter converter = RowDataToCsvConverters.createRowConverter(rowType);
return out -> {
- final CsvMapper mapper = new CsvMapper();
+ final CsvMapper mapper = JacksonMapperFactory.createCsvMapper();
final ObjectNode container = mapper.createObjectNode();
final RowDataToCsvConverter.RowDataToCsvFormatConverterContext converterContext =
diff --git a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvReaderFormat.java b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvReaderFormat.java
index c2d14008cb9..02f3afbe43e 100644
--- a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvReaderFormat.java
+++ b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvReaderFormat.java
@@ -27,6 +27,7 @@ import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.formats.common.Converter;
import org.apache.flink.util.function.SerializableFunction;
import org.apache.flink.util.function.SerializableSupplier;
+import org.apache.flink.util.jackson.JacksonMapperFactory;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.MappingIterator;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvMapper;
@@ -108,7 +109,7 @@ public class CsvReaderFormat<T> extends SimpleStreamFormat<T> {
*/
public static <T> CsvReaderFormat<T> forSchema(
CsvSchema schema, TypeInformation<T> typeInformation) {
- return forSchema(() -> new CsvMapper(), ignored -> schema, typeInformation);
+ return forSchema(JacksonMapperFactory::createCsvMapper, ignored -> schema, typeInformation);
}
/**
@@ -163,7 +164,7 @@ public class CsvReaderFormat<T> extends SimpleStreamFormat<T> {
*/
public static <T> CsvReaderFormat<T> forPojo(Class<T> pojoType) {
return forSchema(
- () -> new CsvMapper(),
+ () -> JacksonMapperFactory.createCsvMapper(),
mapper -> mapper.schemaFor(pojoType).withoutQuoteChar(),
TypeInformation.of(pojoType));
}
diff --git a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDataDeserializationSchema.java b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDataDeserializationSchema.java
index 0c3f68e97f8..b77312d41b8 100644
--- a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDataDeserializationSchema.java
+++ b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDataDeserializationSchema.java
@@ -24,10 +24,10 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.jackson.JacksonMapperFactory;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectReader;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvMapper;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvSchema;
import javax.annotation.Nullable;
@@ -77,7 +77,8 @@ public final class CsvRowDataDeserializationSchema implements DeserializationSch
@Override
public void open(InitializationContext context) {
- this.objectReader = new CsvMapper().readerFor(JsonNode.class).with(csvSchema);
+ this.objectReader =
+ JacksonMapperFactory.createCsvMapper().readerFor(JsonNode.class).with(csvSchema);
}
/** A builder for creating a {@link CsvRowDataDeserializationSchema}. */
diff --git a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDataSerializationSchema.java b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDataSerializationSchema.java
index f7fd810e691..737b74eda20 100644
--- a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDataSerializationSchema.java
+++ b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDataSerializationSchema.java
@@ -24,6 +24,7 @@ import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.SerializableSupplier;
+import org.apache.flink.util.jackson.JacksonMapperFactory;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
@@ -152,7 +153,7 @@ public final class CsvRowDataSerializationSchema implements SerializationSchema<
rowType,
csvSchema,
() -> {
- final CsvMapper csvMapper = new CsvMapper();
+ final CsvMapper csvMapper = JacksonMapperFactory.createCsvMapper();
csvMapper.configure(
JsonGenerator.Feature.WRITE_BIGDECIMAL_AS_PLAIN,
!isScientificNotation);
diff --git a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDeserializationSchema.java b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDeserializationSchema.java
index 24153b4de46..b2a91a9fb34 100644
--- a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDeserializationSchema.java
+++ b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDeserializationSchema.java
@@ -28,10 +28,10 @@ import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.types.Row;
import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.jackson.JacksonMapperFactory;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectReader;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvMapper;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvSchema;
import java.io.IOException;
@@ -93,7 +93,8 @@ public final class CsvRowDeserializationSchema implements DeserializationSchema<
@Override
public void open(InitializationContext context) throws Exception {
- objectReader = new CsvMapper().readerFor(JsonNode.class).with(csvSchema);
+ objectReader =
+ JacksonMapperFactory.createCsvMapper().readerFor(JsonNode.class).with(csvSchema);
}
/** A builder for creating a {@link CsvRowDeserializationSchema}. */
diff --git a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowSerializationSchema.java b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowSerializationSchema.java
index eca56958d96..53f4edf16e3 100644
--- a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowSerializationSchema.java
+++ b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowSerializationSchema.java
@@ -28,6 +28,7 @@ import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.types.Row;
import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.jackson.JacksonMapperFactory;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectWriter;
@@ -93,7 +94,7 @@ public final class CsvRowSerializationSchema implements SerializationSchema<Row>
@Override
public void open(InitializationContext context) throws Exception {
- this.csvMapper = new CsvMapper();
+ this.csvMapper = JacksonMapperFactory.createCsvMapper();
this.objectWriter = csvMapper.writer(csvSchema);
}
diff --git a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/RowCsvInputFormat.java b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/RowCsvInputFormat.java
index 75a22cb2dd6..53634c5cc83 100644
--- a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/RowCsvInputFormat.java
+++ b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/RowCsvInputFormat.java
@@ -24,10 +24,10 @@ import org.apache.flink.core.fs.FileInputSplit;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.csv.CsvRowDeserializationSchema.RuntimeConverter;
import org.apache.flink.types.Row;
+import org.apache.flink.util.jackson.JacksonMapperFactory;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.MappingIterator;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvMapper;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvSchema;
import java.io.IOException;
@@ -90,7 +90,7 @@ public class RowCsvInputFormat extends AbstractCsvInputFormat<Row> {
super.open(split);
prepareConverter();
this.iterator =
- new CsvMapper()
+ JacksonMapperFactory.createCsvMapper()
.readerFor(JsonNode.class)
.with(csvSchema)
.readValues(csvInputStream);
diff --git a/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/DataStreamCsvITCase.java b/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/DataStreamCsvITCase.java
index ea073432116..363a17afdef 100644
--- a/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/DataStreamCsvITCase.java
+++ b/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/DataStreamCsvITCase.java
@@ -37,6 +37,7 @@ import org.apache.flink.streaming.api.operators.collect.ClientAndIterator;
import org.apache.flink.test.junit5.MiniClusterExtension;
import org.apache.flink.util.TestLoggerExtension;
import org.apache.flink.util.function.FunctionWithException;
+import org.apache.flink.util.jackson.JacksonMapperFactory;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonPropertyOrder;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvMapper;
@@ -74,6 +75,8 @@ import static org.assertj.core.api.Assertions.assertThat;
@ExtendWith({TestLoggerExtension.class})
public class DataStreamCsvITCase {
+ private static final CsvMapper CSV_MAPPER = JacksonMapperFactory.createCsvMapper();
+
private static final int PARALLELISM = 4;
@TempDir File outDir;
@@ -163,7 +166,7 @@ public class DataStreamCsvITCase {
final CsvReaderFormat<CityPojo> csvFormat =
CsvReaderFormat.forSchema(
- () -> new CsvMapper(),
+ () -> CSV_MAPPER,
mapper ->
mapper.schemaFor(CityPojo.class)
.withoutQuoteChar()
@@ -229,9 +232,8 @@ public class DataStreamCsvITCase {
private static <T> BulkWriter.Factory<T> factoryForPojo(Class<T> pojoClass) {
final Converter<T, T, Void> converter = (value, context) -> value;
- final CsvMapper csvMapper = new CsvMapper();
- final CsvSchema schema = csvMapper.schemaFor(pojoClass).withoutQuoteChar();
- return (out) -> new CsvBulkWriter<>(csvMapper, schema, converter, null, out);
+ final CsvSchema schema = CSV_MAPPER.schemaFor(pojoClass).withoutQuoteChar();
+ return (out) -> new CsvBulkWriter<>(CSV_MAPPER, schema, converter, null, out);
}
private static Map<File, String> getFileContentByPath(File directory) throws IOException {
diff --git a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonDeserializationSchema.java b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonDeserializationSchema.java
index fd28712d56a..cc244b0a0c2 100644
--- a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonDeserializationSchema.java
+++ b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonDeserializationSchema.java
@@ -21,6 +21,7 @@ import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.serialization.AbstractDeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.function.SerializableSupplier;
+import org.apache.flink.util.jackson.JacksonMapperFactory;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
@@ -37,11 +38,11 @@ public class JsonDeserializationSchema<T> extends AbstractDeserializationSchema<
protected transient ObjectMapper mapper;
public JsonDeserializationSchema(Class<T> clazz) {
- this(clazz, () -> new ObjectMapper());
+ this(clazz, JacksonMapperFactory::createObjectMapper);
}
public JsonDeserializationSchema(TypeInformation<T> typeInformation) {
- this(typeInformation, () -> new ObjectMapper());
+ this(typeInformation, JacksonMapperFactory::createObjectMapper);
}
public JsonDeserializationSchema(
diff --git a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDataDeserializationSchema.java b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDataDeserializationSchema.java
index 805b299c11c..9a57bac203b 100644
--- a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDataDeserializationSchema.java
+++ b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDataDeserializationSchema.java
@@ -26,6 +26,7 @@ import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.DecimalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.utils.LogicalTypeChecks;
+import org.apache.flink.util.jackson.JacksonMapperFactory;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.json.JsonReadFeature;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationFeature;
@@ -97,7 +98,7 @@ public class JsonRowDataDeserializationSchema implements DeserializationSchema<R
@Override
public void open(InitializationContext context) throws Exception {
objectMapper =
- new ObjectMapper()
+ JacksonMapperFactory.createObjectMapper()
.configure(
JsonReadFeature.ALLOW_UNESCAPED_CONTROL_CHARS.mappedFeature(),
true);
diff --git a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDataSerializationSchema.java b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDataSerializationSchema.java
index 6a8c619eeac..c8b7f73b64d 100644
--- a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDataSerializationSchema.java
+++ b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDataSerializationSchema.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.formats.common.TimestampFormat;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.jackson.JacksonMapperFactory;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
@@ -86,7 +87,7 @@ public class JsonRowDataSerializationSchema implements SerializationSchema<RowDa
@Override
public void open(InitializationContext context) throws Exception {
mapper =
- new ObjectMapper()
+ JacksonMapperFactory.createObjectMapper()
.configure(
JsonGenerator.Feature.WRITE_BIGDECIMAL_AS_PLAIN,
encodeDecimalAsPlainNumber);
diff --git a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDeserializationSchema.java b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDeserializationSchema.java
index b2b7e6dada5..dd4a9bb9f99 100644
--- a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDeserializationSchema.java
+++ b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDeserializationSchema.java
@@ -30,6 +30,7 @@ import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.utils.LogicalTypeChecks;
import org.apache.flink.types.Row;
+import org.apache.flink.util.jackson.JacksonMapperFactory;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationFeature;
@@ -121,7 +122,7 @@ public class JsonRowDeserializationSchema implements DeserializationSchema<Row>
@Override
public void open(InitializationContext context) throws Exception {
- objectMapper = new ObjectMapper();
+ objectMapper = JacksonMapperFactory.createObjectMapper();
if (hasDecimalType) {
objectMapper.enable(DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS);
}
diff --git a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowSchemaConverter.java b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowSchemaConverter.java
index fe412320456..d7761851ee4 100644
--- a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowSchemaConverter.java
+++ b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowSchemaConverter.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.types.Row;
import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.jackson.JacksonMapperFactory;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
@@ -97,7 +98,7 @@ public final class JsonRowSchemaConverter {
@SuppressWarnings("unchecked")
public static <T> TypeInformation<T> convert(String jsonSchema) {
Preconditions.checkNotNull(jsonSchema, "JSON schema");
- final ObjectMapper mapper = new ObjectMapper();
+ final ObjectMapper mapper = JacksonMapperFactory.createObjectMapper();
mapper.getFactory()
.enable(JsonParser.Feature.ALLOW_COMMENTS)
.enable(JsonParser.Feature.ALLOW_UNQUOTED_FIELD_NAMES)
diff --git a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowSerializationSchema.java b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowSerializationSchema.java
index e789307967e..f185d211bf9 100644
--- a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowSerializationSchema.java
+++ b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowSerializationSchema.java
@@ -28,6 +28,7 @@ import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.types.Row;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.WrappingRuntimeException;
+import org.apache.flink.util.jackson.JacksonMapperFactory;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
@@ -96,7 +97,7 @@ public class JsonRowSerializationSchema implements SerializationSchema<Row> {
@Override
public void open(InitializationContext context) throws Exception {
- mapper = new ObjectMapper();
+ mapper = JacksonMapperFactory.createObjectMapper();
}
/** Builder for {@link JsonRowSerializationSchema}. */
diff --git a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonNodeDeserializationSchemaTest.java b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonNodeDeserializationSchemaTest.java
index 90751525feb..e6b2a3e05cb 100644
--- a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonNodeDeserializationSchemaTest.java
+++ b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonNodeDeserializationSchemaTest.java
@@ -18,6 +18,7 @@
package org.apache.flink.formats.json;
import org.apache.flink.connector.testutils.formats.DummyInitializationContext;
+import org.apache.flink.util.jackson.JacksonMapperFactory;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
@@ -34,7 +35,7 @@ class JsonNodeDeserializationSchemaTest {
@Test
void testDeserialize() throws IOException {
- ObjectMapper mapper = new ObjectMapper();
+ ObjectMapper mapper = JacksonMapperFactory.createObjectMapper();
ObjectNode initialValue = mapper.createObjectNode();
initialValue.put("key", 4).put("value", "world");
byte[] serializedValue = mapper.writeValueAsBytes(initialValue);
diff --git a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowDataSerDeSchemaTest.java b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowDataSerDeSchemaTest.java
index 888be3b9fe7..883c3f09134 100644
--- a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowDataSerDeSchemaTest.java
+++ b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowDataSerDeSchemaTest.java
@@ -31,6 +31,7 @@ import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.types.Row;
+import org.apache.flink.util.jackson.JacksonMapperFactory;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ArrayNode;
@@ -83,6 +84,8 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy;
*/
class JsonRowDataSerDeSchemaTest {
+ private static final ObjectMapper OBJECT_MAPPER = JacksonMapperFactory.createObjectMapper();
+
@Test
void testSerDe() throws Exception {
byte tinyint = 'c';
@@ -115,11 +118,10 @@ class JsonRowDataSerDeSchemaTest {
innerMap.put("key", 234);
nestedMap.put("inner_map", innerMap);
- ObjectMapper objectMapper = new ObjectMapper();
- ArrayNode doubleNode = objectMapper.createArrayNode().add(1.1D).add(2.2D).add(3.3D);
+ ArrayNode doubleNode = OBJECT_MAPPER.createArrayNode().add(1.1D).add(2.2D).add(3.3D);
// Root
- ObjectNode root = objectMapper.createObjectNode();
+ ObjectNode root = OBJECT_MAPPER.createObjectNode();
root.put("bool", true);
root.put("tinyint", tinyint);
root.put("smallint", smallint);
@@ -139,7 +141,7 @@ class JsonRowDataSerDeSchemaTest {
root.putObject("multiSet").put("element", 2);
root.putObject("map2map").putObject("inner_map").put("key", 234);
- byte[] serializedJson = objectMapper.writeValueAsBytes(root);
+ byte[] serializedJson = OBJECT_MAPPER.writeValueAsBytes(root);
DataType dataType =
ROW(
@@ -220,8 +222,7 @@ class JsonRowDataSerDeSchemaTest {
double doubleValue = random.nextDouble();
float floatValue = random.nextFloat();
- ObjectMapper objectMapper = new ObjectMapper();
- ObjectNode root = objectMapper.createObjectNode();
+ ObjectNode root = OBJECT_MAPPER.createObjectNode();
root.put("bool", String.valueOf(bool));
root.put("int", String.valueOf(integer));
root.put("bigint", String.valueOf(bigint));
@@ -230,7 +231,7 @@ class JsonRowDataSerDeSchemaTest {
root.put("float1", String.valueOf(floatValue));
root.put("float2", new BigDecimal(floatValue));
- byte[] serializedJson = objectMapper.writeValueAsBytes(root);
+ byte[] serializedJson = OBJECT_MAPPER.writeValueAsBytes(root);
DataType dataType =
ROW(
@@ -296,11 +297,9 @@ class JsonRowDataSerDeSchemaTest {
true);
open(serializationSchema);
- ObjectMapper objectMapper = new ObjectMapper();
-
// the first row
{
- ObjectNode root = objectMapper.createObjectNode();
+ ObjectNode root = OBJECT_MAPPER.createObjectNode();
root.put("f1", 1);
root.put("f2", true);
root.put("f3", "str");
@@ -312,7 +311,7 @@ class JsonRowDataSerDeSchemaTest {
ObjectNode row = root.putObject("f6");
row.put("f1", "this is row1");
row.put("f2", 12);
- byte[] serializedJson = objectMapper.writeValueAsBytes(root);
+ byte[] serializedJson = OBJECT_MAPPER.writeValueAsBytes(root);
RowData rowData = deserializationSchema.deserialize(serializedJson);
byte[] actual = serializationSchema.serialize(rowData);
assertThat(serializedJson).containsExactly(actual);
@@ -320,7 +319,7 @@ class JsonRowDataSerDeSchemaTest {
// the second row
{
- ObjectNode root = objectMapper.createObjectNode();
+ ObjectNode root = OBJECT_MAPPER.createObjectNode();
root.put("f1", 10);
root.put("f2", false);
root.put("f3", "newStr");
@@ -332,7 +331,7 @@ class JsonRowDataSerDeSchemaTest {
ObjectNode row = root.putObject("f6");
row.put("f1", "this is row2");
row.putNull("f2");
- byte[] serializedJson = objectMapper.writeValueAsBytes(root);
+ byte[] serializedJson = OBJECT_MAPPER.writeValueAsBytes(root);
RowData rowData = deserializationSchema.deserialize(serializedJson);
byte[] actual = serializationSchema.serialize(rowData);
assertThat(serializedJson).containsExactly(actual);
@@ -419,12 +418,10 @@ class JsonRowDataSerDeSchemaTest {
@Test
void testDeserializationMissingField() throws Exception {
- ObjectMapper objectMapper = new ObjectMapper();
-
// Root
- ObjectNode root = objectMapper.createObjectNode();
+ ObjectNode root = OBJECT_MAPPER.createObjectNode();
root.put("id", 123123123);
- byte[] serializedJson = objectMapper.writeValueAsBytes(root);
+ byte[] serializedJson = OBJECT_MAPPER.writeValueAsBytes(root);
DataType dataType = ROW(FIELD("name", STRING()));
RowType schema = (RowType) dataType.getLogicalType();
@@ -504,14 +501,12 @@ class JsonRowDataSerDeSchemaTest {
true);
open(serializationSchema);
- ObjectMapper objectMapper = new ObjectMapper();
-
- ObjectNode root = objectMapper.createObjectNode();
+ ObjectNode root = OBJECT_MAPPER.createObjectNode();
root.put("timestamp3", "1990-10-14 12:12:43.123");
root.put("timestamp9", "1990-10-14 12:12:43.123456789");
root.put("timestamp_with_local_timezone3", "1990-10-14 12:12:43.123Z");
root.put("timestamp_with_local_timezone9", "1990-10-14 12:12:43.123456789Z");
- byte[] serializedJson = objectMapper.writeValueAsBytes(root);
+ byte[] serializedJson = OBJECT_MAPPER.writeValueAsBytes(root);
RowData rowData = deserializationSchema.deserialize(serializedJson);
byte[] actual = serializationSchema.serialize(rowData);
assertThat(serializedJson).containsExactly(actual);
diff --git a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowDeserializationSchemaTest.java b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowDeserializationSchemaTest.java
index deee53ae011..81e370c0fd3 100644
--- a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowDeserializationSchemaTest.java
+++ b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowDeserializationSchemaTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.formats.json;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.types.Row;
+import org.apache.flink.util.jackson.JacksonMapperFactory;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
@@ -52,6 +53,8 @@ import static org.junit.internal.matchers.ThrowableMessageMatcher.hasMessage;
/** Tests for the {@link JsonRowDeserializationSchema}. */
public class JsonRowDeserializationSchemaTest {
+ private static final ObjectMapper OBJECT_MAPPER = JacksonMapperFactory.createObjectMapper();
+
@Rule public ExpectedException thrown = ExpectedException.none();
/** Tests simple deserialization using type information. */
@@ -73,10 +76,8 @@ public class JsonRowDeserializationSchemaTest {
innerMap.put("key", 234);
nestedMap.put("inner_map", innerMap);
- ObjectMapper objectMapper = new ObjectMapper();
-
// Root
- ObjectNode root = objectMapper.createObjectNode();
+ ObjectNode root = OBJECT_MAPPER.createObjectNode();
root.put("id", id);
root.put("name", name);
root.put("bytes", bytes);
@@ -89,7 +90,7 @@ public class JsonRowDeserializationSchemaTest {
root.putObject("map").put("flink", 123);
root.putObject("map2map").putObject("inner_map").put("key", 234);
- byte[] serializedJson = objectMapper.writeValueAsBytes(root);
+ byte[] serializedJson = OBJECT_MAPPER.writeValueAsBytes(root);
JsonRowDeserializationSchema deserializationSchema =
new JsonRowDeserializationSchema.Builder(
@@ -149,10 +150,8 @@ public class JsonRowDeserializationSchemaTest {
};
final String[] strings = new String[] {"one", "two", "three"};
- final ObjectMapper objectMapper = new ObjectMapper();
-
// Root
- ObjectNode root = objectMapper.createObjectNode();
+ ObjectNode root = OBJECT_MAPPER.createObjectNode();
root.put("id", id.longValue());
root.putNull("idOrNull");
root.put("name", name);
@@ -164,7 +163,7 @@ public class JsonRowDeserializationSchemaTest {
root.putArray("strings").add("one").add("two").add("three");
root.putObject("nested").put("booleanField", true).put("decimalField", 12);
- final byte[] serializedJson = objectMapper.writeValueAsBytes(root);
+ final byte[] serializedJson = OBJECT_MAPPER.writeValueAsBytes(root);
JsonRowDeserializationSchema deserializationSchema =
new JsonRowDeserializationSchema.Builder(
@@ -212,12 +211,10 @@ public class JsonRowDeserializationSchemaTest {
/** Tests deserialization with non-existing field name. */
@Test
public void testMissingNode() throws Exception {
- ObjectMapper objectMapper = new ObjectMapper();
-
// Root
- ObjectNode root = objectMapper.createObjectNode();
+ ObjectNode root = OBJECT_MAPPER.createObjectNode();
root.put("id", 123123123);
- byte[] serializedJson = objectMapper.writeValueAsBytes(root);
+ byte[] serializedJson = OBJECT_MAPPER.writeValueAsBytes(root);
TypeInformation<Row> rowTypeInformation =
Types.ROW_NAMED(new String[] {"name"}, Types.STRING);
diff --git a/flink-python/src/main/java/org/apache/flink/python/metric/FlinkMetricContainer.java b/flink-python/src/main/java/org/apache/flink/python/metric/FlinkMetricContainer.java
index a184c39c621..df2f84a283b 100644
--- a/flink-python/src/main/java/org/apache/flink/python/metric/FlinkMetricContainer.java
+++ b/flink-python/src/main/java/org/apache/flink/python/metric/FlinkMetricContainer.java
@@ -26,6 +26,7 @@ import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.Meter;
import org.apache.flink.metrics.MeterView;
import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.util.jackson.JacksonMapperFactory;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
@@ -56,6 +57,8 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
@Internal
public final class FlinkMetricContainer {
+ private static final ObjectMapper OBJECT_MAPPER = JacksonMapperFactory.createObjectMapper();
+
private static final String METRIC_KEY_SEPARATOR =
GlobalConfiguration.loadConfiguration().getString(MetricOptions.SCOPE_DELIMITER);
@@ -206,7 +209,7 @@ public final class FlinkMetricContainer {
static ArrayList getNameSpaceArray(MetricKey metricKey) {
MetricName metricName = metricKey.metricName();
try {
- return new ObjectMapper().readValue(metricName.getNamespace(), ArrayList.class);
+ return OBJECT_MAPPER.readValue(metricName.getNamespace(), ArrayList.class);
} catch (JsonProcessingException e) {
throw new RuntimeException(
String.format("Parse namespace[%s] error. ", metricName.getNamespace()), e);
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java
index c7c779c4e93..fff6d42306f 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java
@@ -49,6 +49,7 @@ import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.ShutdownHookUtil;
import org.apache.flink.util.concurrent.ExecutorThreadFactory;
+import org.apache.flink.util.jackson.JacksonMapperFactory;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
@@ -100,7 +101,7 @@ import java.util.function.Consumer;
public class HistoryServer {
private static final Logger LOG = LoggerFactory.getLogger(HistoryServer.class);
- private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+ private static final ObjectMapper OBJECT_MAPPER = JacksonMapperFactory.createObjectMapper();
private final Configuration config;
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java
index 8e5aee9c633..a8d782354a0 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java
@@ -30,6 +30,7 @@ import org.apache.flink.runtime.messages.webmonitor.JobDetails;
import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
import org.apache.flink.runtime.rest.messages.JobsOverviewHeaders;
import org.apache.flink.util.FileUtils;
+import org.apache.flink.util.jackson.JacksonMapperFactory;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonFactory;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
@@ -104,7 +105,7 @@ class HistoryServerArchiveFetcher {
private static final Logger LOG = LoggerFactory.getLogger(HistoryServerArchiveFetcher.class);
private static final JsonFactory jacksonFactory = new JsonFactory();
- private static final ObjectMapper mapper = new ObjectMapper();
+ private static final ObjectMapper mapper = JacksonMapperFactory.createObjectMapper();
private static final String JSON_FILE_ENDING = ".json";
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/rest/compatibility/CompatibilityRoutines.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/rest/compatibility/CompatibilityRoutines.java
index 1bf3b8f99cb..20bbc04501c 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/rest/compatibility/CompatibilityRoutines.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/rest/compatibility/CompatibilityRoutines.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.rest.compatibility;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
import org.apache.flink.runtime.rest.messages.UntypedResponseMessageHeaders;
+import org.apache.flink.util.jackson.JacksonMapperFactory;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
@@ -137,7 +138,7 @@ enum CompatibilityRoutines {
REQUEST_ROUTINE,
RESPONSE_ROUTINE));
- private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+ private static final ObjectMapper OBJECT_MAPPER = JacksonMapperFactory.createObjectMapper();
private static final JsonSchemaGenerator SCHEMA_GENERATOR =
new JsonSchemaGenerator(OBJECT_MAPPER);
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/rest/compatibility/RestAPIStabilityTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/rest/compatibility/RestAPIStabilityTest.java
index 0dd5704b1dd..fec3204ebea 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/rest/compatibility/RestAPIStabilityTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/rest/compatibility/RestAPIStabilityTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.runtime.rest.util.DocumentingDispatcherRestEndpoint;
import org.apache.flink.runtime.rest.util.DocumentingRestEndpoint;
import org.apache.flink.runtime.rest.versioning.RuntimeRestAPIVersion;
import org.apache.flink.util.ConfigurationException;
+import org.apache.flink.util.jackson.JacksonMapperFactory;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.util.DefaultIndenter;
@@ -56,7 +57,7 @@ final class RestAPIStabilityTest {
private static final String SNAPSHOT_RESOURCE_PATTERN = "rest_api_%s.snapshot";
- private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+ private static final ObjectMapper OBJECT_MAPPER = JacksonMapperFactory.createObjectMapper();
private static class StableRestApiVersionProvider implements ArgumentsProvider {
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java
index d23a0cc1898..b677a094d54 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java
@@ -38,6 +38,7 @@ import org.apache.flink.runtime.webmonitor.testutils.HttpTestClient;
import org.apache.flink.test.junit5.InjectClusterClient;
import org.apache.flink.test.junit5.InjectClusterRESTAddress;
import org.apache.flink.test.junit5.MiniClusterExtension;
+import org.apache.flink.util.jackson.JacksonMapperFactory;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
@@ -81,6 +82,7 @@ class WebFrontendITCase {
private static final int NUM_TASK_MANAGERS = 2;
private static final int NUM_SLOTS = 4;
+ private static final ObjectMapper OBJECT_MAPPER = JacksonMapperFactory.createObjectMapper();
private static final Configuration CLUSTER_CONFIGURATION = getClusterConfiguration();
@RegisterExtension
@@ -170,8 +172,7 @@ class WebFrontendITCase {
void getNumberOfTaskManagers(@InjectClusterRESTAddress URI restAddress) throws Exception {
String json = getFromHTTP("http://localhost:" + restAddress.getPort() + "/taskmanagers/");
- ObjectMapper mapper = new ObjectMapper();
- JsonNode response = mapper.readTree(json);
+ JsonNode response = OBJECT_MAPPER.readTree(json);
ArrayNode taskManagers = (ArrayNode) response.get("taskmanagers");
assertThat(taskManagers).hasSize(NUM_TASK_MANAGERS);
@@ -181,8 +182,7 @@ class WebFrontendITCase {
void getTaskManagers(@InjectClusterRESTAddress URI restAddress) throws Exception {
String json = getFromHTTP("http://localhost:" + restAddress.getPort() + "/taskmanagers/");
- ObjectMapper mapper = new ObjectMapper();
- JsonNode parsed = mapper.readTree(json);
+ JsonNode parsed = OBJECT_MAPPER.readTree(json);
ArrayNode taskManagers = (ArrayNode) parsed.get("taskmanagers");
assertThat(taskManagers).hasSize(NUM_TASK_MANAGERS);
@@ -231,8 +231,7 @@ class WebFrontendITCase {
throws Exception {
String json = getFromHTTP("http://localhost:" + restAddress.getPort() + "/taskmanagers/");
- ObjectMapper mapper = new ObjectMapper();
- JsonNode parsed = mapper.readTree(json);
+ JsonNode parsed = OBJECT_MAPPER.readTree(json);
ArrayNode taskManagers = (ArrayNode) parsed.get("taskmanagers");
JsonNode taskManager = taskManagers.get(0);
String id = taskManager.get("id").asText();
@@ -278,8 +277,7 @@ class WebFrontendITCase {
private static Map<String, String> fromKeyValueJsonArray(String jsonString) {
try {
Map<String, String> map = new HashMap<>();
- ObjectMapper m = new ObjectMapper();
- ArrayNode array = (ArrayNode) m.readTree(jsonString);
+ ArrayNode array = (ArrayNode) OBJECT_MAPPER.readTree(jsonString);
Iterator<JsonNode> elements = array.elements();
while (elements.hasNext()) {
@@ -394,8 +392,7 @@ class WebFrontendITCase {
String json = getFromHTTP("http://localhost:" + restAddress.getPort() + "/jobs/overview");
- ObjectMapper mapper = new ObjectMapper();
- JsonNode parsed = mapper.readTree(json);
+ JsonNode parsed = OBJECT_MAPPER.readTree(json);
ArrayNode jsonJobs = (ArrayNode) parsed.get("jobs");
assertThat(jsonJobs.size()).isEqualTo(1);
assertThat(jsonJobs.get(0).get("duration").asInt()).isGreaterThan(0);
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java
index 4c5f7ea2645..8ec80e33358 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java
@@ -37,6 +37,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.jackson.JacksonMapperFactory;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonFactory;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
@@ -78,7 +79,8 @@ class HistoryServerTest {
.enable(JsonGenerator.Feature.AUTO_CLOSE_TARGET)
.disable(JsonGenerator.Feature.AUTO_CLOSE_JSON_CONTENT);
private static final ObjectMapper OBJECT_MAPPER =
- new ObjectMapper().enable(DeserializationFeature.FAIL_ON_MISSING_CREATOR_PROPERTIES);
+ JacksonMapperFactory.createObjectMapper()
+ .enable(DeserializationFeature.FAIL_ON_MISSING_CREATOR_PROPERTIES);
private MiniClusterWithClientResource cluster;
private File jmDirectory;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/FileSystemJobResultStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/FileSystemJobResultStore.java
index 010ce77e74c..986b7a03343 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/FileSystemJobResultStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/FileSystemJobResultStore.java
@@ -30,6 +30,7 @@ import org.apache.flink.runtime.rest.messages.json.JobResultDeserializer;
import org.apache.flink.runtime.rest.messages.json.JobResultSerializer;
import org.apache.flink.runtime.util.NonClosingOutputStreamDecorator;
import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.jackson.JacksonMapperFactory;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore;
@@ -66,7 +67,7 @@ public class FileSystemJobResultStore extends AbstractThreadsafeJobResultStore {
return filename.endsWith(FILE_EXTENSION);
}
- private final ObjectMapper mapper = new ObjectMapper();
+ private final ObjectMapper mapper = JacksonMapperFactory.createObjectMapper();
private final FileSystem fileSystem;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/history/FsJobArchivist.java b/flink-runtime/src/main/java/org/apache/flink/runtime/history/FsJobArchivist.java
index 67c9c2b7655..65c76fdefaa 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/history/FsJobArchivist.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/history/FsJobArchivist.java
@@ -26,6 +26,7 @@ import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
import org.apache.flink.util.IOUtils;
+import org.apache.flink.util.jackson.JacksonMapperFactory;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonEncoding;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonFactory;
@@ -47,7 +48,7 @@ public class FsJobArchivist {
private static final Logger LOG = LoggerFactory.getLogger(FsJobArchivist.class);
private static final JsonFactory jacksonFactory = new JsonFactory();
- private static final ObjectMapper mapper = new ObjectMapper();
+ private static final ObjectMapper mapper = JacksonMapperFactory.createObjectMapper();
private static final String ARCHIVE = "archive";
private static final String PATH = "path";
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/util/RestMapperUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/util/RestMapperUtils.java
index 9a6bfba2f6e..eb42f41fabe 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/util/RestMapperUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/util/RestMapperUtils.java
@@ -18,6 +18,8 @@
package org.apache.flink.runtime.rest.util;
+import org.apache.flink.util.jackson.JacksonMapperFactory;
+
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationFeature;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.SerializationFeature;
@@ -27,7 +29,7 @@ public class RestMapperUtils {
private static final ObjectMapper objectMapper;
static {
- objectMapper = new ObjectMapper();
+ objectMapper = JacksonMapperFactory.createObjectMapper();
objectMapper.enable(
DeserializationFeature.FAIL_ON_IGNORED_PROPERTIES,
DeserializationFeature.FAIL_ON_NULL_FOR_PRIMITIVES,
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/FileSystemJobResultStoreTestInternal.java b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/FileSystemJobResultStoreTestInternal.java
index 20d6f8a4c29..c952e5d4342 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/FileSystemJobResultStoreTestInternal.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/FileSystemJobResultStoreTestInternal.java
@@ -23,6 +23,7 @@ import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.util.FileUtils;
import org.apache.flink.util.TestLoggerExtension;
+import org.apache.flink.util.jackson.JacksonMapperFactory;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
@@ -45,7 +46,7 @@ import static org.assertj.core.api.Assertions.assertThat;
@ExtendWith(TestLoggerExtension.class)
public class FileSystemJobResultStoreTestInternal {
- private static final ObjectMapper MAPPER = new ObjectMapper();
+ private static final ObjectMapper MAPPER = JacksonMapperFactory.createObjectMapper();
private FileSystemJobResultStore fileSystemJobResultStore;
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/jsonplan/JsonGeneratorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/jsonplan/JsonGeneratorTest.java
index a1a37b1d2cc..a7a3b942e2b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/jsonplan/JsonGeneratorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/jsonplan/JsonGeneratorTest.java
@@ -25,6 +25,7 @@ import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.operators.testutils.DummyInvokable;
+import org.apache.flink.util.jackson.JacksonMapperFactory;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
@@ -96,7 +97,7 @@ public class JsonGeneratorTest {
assertNotNull(plan);
// validate the produced JSON
- ObjectMapper m = new ObjectMapper();
+ ObjectMapper m = JacksonMapperFactory.createObjectMapper();
JsonNode rootNode = m.readTree(plan);
// core fields
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/taskmanager/TaskManagerDetailsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/taskmanager/TaskManagerDetailsHandlerTest.java
index 5e2e14864d8..6d1b58fa3d8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/taskmanager/TaskManagerDetailsHandlerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/taskmanager/TaskManagerDetailsHandlerTest.java
@@ -39,6 +39,7 @@ import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerInfo;
import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerMetricsInfo;
import org.apache.flink.runtime.taskexecutor.TaskExecutorMemoryConfiguration;
import org.apache.flink.testutils.TestingUtils;
+import org.apache.flink.util.jackson.JacksonMapperFactory;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
@@ -118,7 +119,7 @@ class TaskManagerDetailsHandlerTest {
20L,
Collections.emptyList());
- ObjectMapper objectMapper = new ObjectMapper();
+ ObjectMapper objectMapper = JacksonMapperFactory.createObjectMapper();
String actualJson = objectMapper.writeValueAsString(actual);
String expectedJson = objectMapper.writeValueAsString(expected);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/json/JobResultDeserializerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/json/JobResultDeserializerTest.java
index 962b02c99f8..c9752f9fc34 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/json/JobResultDeserializerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/json/JobResultDeserializerTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.rest.messages.json;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.jackson.JacksonMapperFactory;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonMappingException;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
@@ -43,7 +44,7 @@ public class JobResultDeserializerTest extends TestLogger {
final SimpleModule simpleModule = new SimpleModule();
simpleModule.addDeserializer(JobResult.class, new JobResultDeserializer());
- objectMapper = new ObjectMapper();
+ objectMapper = JacksonMapperFactory.createObjectMapper();
objectMapper.registerModule(simpleModule);
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/json/SerializedThrowableSerializerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/json/SerializedThrowableSerializerTest.java
index ac66aa314c4..b167fa7fbcb 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/json/SerializedThrowableSerializerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/json/SerializedThrowableSerializerTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.rest.messages.json;
import org.apache.flink.util.SerializedThrowable;
import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.jackson.JacksonMapperFactory;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.module.SimpleModule;
@@ -33,7 +34,7 @@ import static org.junit.Assert.assertTrue;
/** Tests for {@link SerializedThrowableSerializer} and {@link SerializedThrowableDeserializer}. */
public class SerializedThrowableSerializerTest extends TestLogger {
- private ObjectMapper objectMapper = new ObjectMapper();
+ private ObjectMapper objectMapper;
@Before
public void setUp() {
@@ -42,7 +43,7 @@ public class SerializedThrowableSerializerTest extends TestLogger {
SerializedThrowable.class, new SerializedThrowableDeserializer());
simpleModule.addSerializer(SerializedThrowable.class, new SerializedThrowableSerializer());
- objectMapper = new ObjectMapper();
+ objectMapper = JacksonMapperFactory.createObjectMapper();
objectMapper.registerModule(simpleModule);
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/json/SerializedValueSerializerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/json/SerializedValueSerializerTest.java
index ee3d57c00b4..6e6ec96cc64 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/json/SerializedValueSerializerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/json/SerializedValueSerializerTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.rest.messages.json;
import org.apache.flink.util.SerializedValue;
import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.jackson.JacksonMapperFactory;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.type.TypeReference;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JavaType;
@@ -40,7 +41,7 @@ public class SerializedValueSerializerTest extends TestLogger {
@Before
public void setUp() {
- objectMapper = new ObjectMapper();
+ objectMapper = JacksonMapperFactory.createObjectMapper();
final SimpleModule simpleModule = new SimpleModule();
final JavaType serializedValueWildcardType =
objectMapper
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/JSONGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/JSONGenerator.java
index 7047fd01e4a..b39243f27eb 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/JSONGenerator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/JSONGenerator.java
@@ -18,6 +18,7 @@
package org.apache.flink.streaming.api.graph;
import org.apache.flink.annotation.Internal;
+import org.apache.flink.util.jackson.JacksonMapperFactory;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ArrayNode;
@@ -45,7 +46,7 @@ public class JSONGenerator {
public static final String PARALLELISM = "parallelism";
private StreamGraph streamGraph;
- private final ObjectMapper mapper = new ObjectMapper();
+ private final ObjectMapper mapper = JacksonMapperFactory.createObjectMapper();
public JSONGenerator(StreamGraph streamGraph) {
this.streamGraph = streamGraph;
diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/connector/source/CompactPartitions.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/connector/source/CompactPartitions.java
index 333a3480b1a..57ce1970720 100644
--- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/connector/source/CompactPartitions.java
+++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/connector/source/CompactPartitions.java
@@ -20,6 +20,7 @@ package org.apache.flink.table.connector.source;
import org.apache.flink.table.catalog.CatalogPartitionSpec;
import org.apache.flink.table.factories.DynamicTableFactory;
+import org.apache.flink.util.jackson.JacksonMapperFactory;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser;
@@ -50,7 +51,7 @@ public class CompactPartitions implements Serializable {
private static final long serialVersionUID = 1L;
private static final String FIELD_NAME_COMPACT_PARTITIONS = "compact-partitions";
- private static final ObjectMapper MAPPER = new ObjectMapper();
+ private static final ObjectMapper MAPPER = JacksonMapperFactory.createObjectMapper();
private final List<CompactPartition> compactPartitions;
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/JsonSerdeUtil.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/JsonSerdeUtil.java
index 8a601601d3f..46fd71036b2 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/JsonSerdeUtil.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/JsonSerdeUtil.java
@@ -40,6 +40,7 @@ import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.extraction.ExtractionUtils;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.jackson.JacksonMapperFactory;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
@@ -100,7 +101,7 @@ public class JsonSerdeUtil {
private static final ObjectMapper OBJECT_MAPPER_INSTANCE;
static {
- OBJECT_MAPPER_INSTANCE = new ObjectMapper();
+ OBJECT_MAPPER_INSTANCE = JacksonMapperFactory.createObjectMapper();
OBJECT_MAPPER_INSTANCE.setTypeFactory(
// Make sure to register the classloader of the planner
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/LogicalTypeJsonSerdeTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/LogicalTypeJsonSerdeTest.java
index d27dbb5abe3..d571d753885 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/LogicalTypeJsonSerdeTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/LogicalTypeJsonSerdeTest.java
@@ -64,9 +64,9 @@ import org.apache.flink.table.types.logical.ZonedTimestampType;
import org.apache.flink.table.types.utils.DataTypeFactoryMock;
import org.apache.flink.table.utils.CatalogManagerMocks;
import org.apache.flink.types.Row;
+import org.apache.flink.util.jackson.JacksonMapperFactory;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.parallel.Execution;
@@ -139,8 +139,8 @@ public class LogicalTypeJsonSerdeTest {
// maximum plan content
tableConfig.set(TableConfigOptions.PLAN_COMPILE_CATALOG_OBJECTS, ALL);
final String maximumJson = toJson(serdeContext, STRUCTURED_TYPE);
- final ObjectMapper mapper = new ObjectMapper();
- final JsonNode maximumJsonNode = mapper.readTree(maximumJson);
+ final JsonNode maximumJsonNode =
+ JacksonMapperFactory.createObjectMapper().readTree(maximumJson);
assertThat(maximumJsonNode.get(LogicalTypeJsonSerializer.FIELD_NAME_ATTRIBUTES))
.isNotNull();
assertThat(maximumJsonNode.get(LogicalTypeJsonSerializer.FIELD_NAME_DESCRIPTION).asText())
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/PartitionSpecSerdeTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/PartitionSpecSerdeTest.java
index 159ef2c6ab6..b35f3b808fd 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/PartitionSpecSerdeTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/PartitionSpecSerdeTest.java
@@ -19,6 +19,7 @@
package org.apache.flink.table.planner.plan.nodes.exec.serde;
import org.apache.flink.table.planner.plan.nodes.exec.spec.PartitionSpec;
+import org.apache.flink.util.jackson.JacksonMapperFactory;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
@@ -30,20 +31,22 @@ import static org.assertj.core.api.Assertions.assertThat;
/** Test PartitionSpec json ser/de. */
public class PartitionSpecSerdeTest {
+ private static final ObjectMapper OBJECT_MAPPER = JacksonMapperFactory.createObjectMapper();
+
@Test
public void testPartitionSpec() throws JsonProcessingException {
PartitionSpec spec = new PartitionSpec(new int[] {1, 2, 3});
- ObjectMapper mapper = new ObjectMapper();
- assertThat(mapper.readValue(mapper.writeValueAsString(spec), PartitionSpec.class))
+ assertThat(
+ OBJECT_MAPPER.readValue(
+ OBJECT_MAPPER.writeValueAsString(spec), PartitionSpec.class))
.isEqualTo(spec);
}
@Test
public void testAllInOne() throws JsonProcessingException {
- ObjectMapper mapper = new ObjectMapper();
assertThat(
- mapper.readValue(
- mapper.writeValueAsString(PartitionSpec.ALL_IN_ONE),
+ OBJECT_MAPPER.readValue(
+ OBJECT_MAPPER.writeValueAsString(PartitionSpec.ALL_IN_ONE),
PartitionSpec.class))
.isEqualTo(PartitionSpec.ALL_IN_ONE);
}
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RankProcessStrategySerdeTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RankProcessStrategySerdeTest.java
index 82c1b3849fa..69c9e553d77 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RankProcessStrategySerdeTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RankProcessStrategySerdeTest.java
@@ -19,6 +19,7 @@
package org.apache.flink.table.planner.plan.nodes.exec.serde;
import org.apache.flink.table.planner.plan.utils.RankProcessStrategy;
+import org.apache.flink.util.jackson.JacksonMapperFactory;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
@@ -32,7 +33,7 @@ public class RankProcessStrategySerdeTest {
@Test
public void testRankRange() throws JsonProcessingException {
- ObjectMapper mapper = new ObjectMapper();
+ ObjectMapper mapper = JacksonMapperFactory.createObjectMapper();
RankProcessStrategy[] strategies =
new RankProcessStrategy[] {
RankProcessStrategy.UNDEFINED_STRATEGY,
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RankRangeSerdeTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RankRangeSerdeTest.java
index 90abd497f54..bc5e6a246ba 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RankRangeSerdeTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RankRangeSerdeTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.table.runtime.operators.rank.ConstantRankRange;
import org.apache.flink.table.runtime.operators.rank.ConstantRankRangeWithoutEnd;
import org.apache.flink.table.runtime.operators.rank.RankRange;
import org.apache.flink.table.runtime.operators.rank.VariableRankRange;
+import org.apache.flink.util.jackson.JacksonMapperFactory;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
@@ -35,7 +36,7 @@ public class RankRangeSerdeTest {
@Test
public void testRankRange() throws JsonProcessingException {
- ObjectMapper mapper = new ObjectMapper();
+ ObjectMapper mapper = JacksonMapperFactory.createObjectMapper();
RankRange[] ranges =
new RankRange[] {
new ConstantRankRange(1, 2),
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RankTypeSerdeTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RankTypeSerdeTest.java
index e1daa9d03e3..04fa1552512 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RankTypeSerdeTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RankTypeSerdeTest.java
@@ -19,6 +19,7 @@
package org.apache.flink.table.planner.plan.nodes.exec.serde;
import org.apache.flink.table.runtime.operators.rank.RankType;
+import org.apache.flink.util.jackson.JacksonMapperFactory;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
@@ -32,7 +33,7 @@ public class RankTypeSerdeTest {
@Test
public void testRankType() throws JsonProcessingException {
- ObjectMapper mapper = new ObjectMapper();
+ ObjectMapper mapper = JacksonMapperFactory.createObjectMapper();
for (RankType type : RankType.values()) {
RankType result = mapper.readValue(mapper.writeValueAsString(type), RankType.class);
assertThat(result).isEqualTo(type);
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/SortSpecSerdeTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/SortSpecSerdeTest.java
index 2b5600980dd..af2190203b5 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/SortSpecSerdeTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/SortSpecSerdeTest.java
@@ -19,6 +19,7 @@
package org.apache.flink.table.planner.plan.nodes.exec.serde;
import org.apache.flink.table.planner.plan.nodes.exec.spec.SortSpec;
+import org.apache.flink.util.jackson.JacksonMapperFactory;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
@@ -29,6 +30,7 @@ import static org.assertj.core.api.Assertions.assertThat;
/** Test SortSpec json ser/de. */
public class SortSpecSerdeTest {
+ private static final ObjectMapper OBJECT_MAPPER = JacksonMapperFactory.createObjectMapper();
@Test
public void testSortSpec() throws JsonProcessingException {
@@ -39,15 +41,17 @@ public class SortSpecSerdeTest {
.addField(3, false, true)
.addField(4, false, false)
.build();
- ObjectMapper mapper = new ObjectMapper();
- assertThat(mapper.readValue(mapper.writeValueAsString(sortSpec), SortSpec.class))
+ assertThat(
+ OBJECT_MAPPER.readValue(
+ OBJECT_MAPPER.writeValueAsString(sortSpec), SortSpec.class))
.isEqualTo(sortSpec);
}
@Test
public void testAny() throws JsonProcessingException {
- ObjectMapper mapper = new ObjectMapper();
- assertThat(mapper.readValue(mapper.writeValueAsString(SortSpec.ANY), SortSpec.class))
+ assertThat(
+ OBJECT_MAPPER.readValue(
+ OBJECT_MAPPER.writeValueAsString(SortSpec.ANY), SortSpec.class))
.isEqualTo(SortSpec.ANY);
}
}
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/utils/JsonTestUtils.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/utils/JsonTestUtils.java
index 682f3b3e656..6aa9f96c7d0 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/utils/JsonTestUtils.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/utils/JsonTestUtils.java
@@ -19,6 +19,7 @@
package org.apache.flink.table.planner.utils;
import org.apache.flink.FlinkVersion;
+import org.apache.flink.util.jackson.JacksonMapperFactory;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
@@ -29,7 +30,8 @@ import java.io.IOException;
/** This class contains a collection of generic utilities to deal with JSON in tests. */
public final class JsonTestUtils {
- private static final ObjectMapper OBJECT_MAPPER_INSTANCE = new ObjectMapper();
+ private static final ObjectMapper OBJECT_MAPPER_INSTANCE =
+ JacksonMapperFactory.createObjectMapper();
private JsonTestUtils() {}
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala
index fd2fd085519..35499ba9439 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala
@@ -66,6 +66,7 @@ import org.apache.flink.table.types.utils.TypeConversions
import org.apache.flink.table.typeutils.FieldInfoUtils
import org.apache.flink.types.Row
import org.apache.flink.util.{FlinkUserCodeClassLoaders, MutableURLClassLoader}
+import org.apache.flink.util.jackson.JacksonMapperFactory
import _root_.java.math.{BigDecimal => JBigDecimal}
import _root_.java.util
@@ -1612,6 +1613,8 @@ object PlanKind extends Enumeration {
object TableTestUtil {
+ private val objectMapper = JacksonMapperFactory.createObjectMapper()
+
val STREAM_SETTING: EnvironmentSettings =
EnvironmentSettings.newInstance().inStreamingMode().build()
val BATCH_SETTING: EnvironmentSettings = EnvironmentSettings.newInstance().inBatchMode().build()
@@ -1706,14 +1709,14 @@ object TableTestUtil {
@throws[IOException]
def getFormattedJson(json: String): String = {
- val parser = new ObjectMapper().getFactory.createParser(json)
+ val parser = objectMapper.getFactory.createParser(json)
val jsonNode: JsonNode = parser.readValueAsTree[JsonNode]
jsonNode.toString
}
@throws[IOException]
def getPrettyJson(json: String): String = {
- val parser = new ObjectMapper().getFactory.createParser(json)
+ val parser = objectMapper.getFactory.createParser(json)
val jsonNode: JsonNode = parser.readValueAsTree[JsonNode]
jsonNode.toPrettyString
}
diff --git a/flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/JsonJobGraphGenerationTest.java b/flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/JsonJobGraphGenerationTest.java
index 1bbf8d35c42..3e7a589f779 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/JsonJobGraphGenerationTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/JsonJobGraphGenerationTest.java
@@ -32,6 +32,7 @@ import org.apache.flink.optimizer.plan.OptimizedPlan;
import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.jsonplan.JsonPlanGenerator;
+import org.apache.flink.util.jackson.JacksonMapperFactory;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonFactory;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser;
@@ -245,6 +246,8 @@ public class JsonJobGraphGenerationTest {
private static class GenericValidator implements JsonValidator {
+ private static final ObjectMapper OBJECT_MAPPER = JacksonMapperFactory.createObjectMapper();
+
private final int expectedParallelism;
private final int numNodes;
@@ -258,8 +261,7 @@ public class JsonJobGraphGenerationTest {
final Map<String, JsonNode> idToNode = new HashMap<>();
// validate the produced JSON
- ObjectMapper m = new ObjectMapper();
- JsonNode rootNode = m.readTree(json);
+ JsonNode rootNode = OBJECT_MAPPER.readTree(json);
JsonNode idField = rootNode.get("jid");
JsonNode nameField = rootNode.get("name");