You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2022/07/28 12:28:55 UTC

[flink] branch master updated (e122dec43ed -> db4f7781173)

This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


    from e122dec43ed [FLINK-28523][tests] Increase Zookeeper session timeouts
     new ae45bd3c50a [FLINK-28634][json] Add simple JsonSerDeSchema
     new db4f7781173 [FLINK-28634][json] Deprecate JsonNodeDeserializationSchema

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../docs/connectors/datastream/formats/json.md     |  77 ++++++++++++++
 .../docs/connectors/datastream/formats/json.md     |  77 ++++++++++++++
 .../KafkaRecordDeserializationSchemaTest.java      |  13 ++-
 flink-formats/flink-json/pom.xml                   |   6 ++
 .../formats/json/JsonDeserializationSchema.java    |  70 +++++++++++++
 .../json/JsonNodeDeserializationSchema.java        |  21 ++--
 .../formats/json/JsonSerializationSchema.java      |  59 +++++++++++
 .../json/JsonNodeDeserializationSchemaTest.java    |   4 +
 .../flink/formats/json/JsonSerDeSchemaTest.java    | 111 +++++++++++++++++++++
 .../formats/DummyInitializationContext.java        |  33 ++----
 10 files changed, 429 insertions(+), 42 deletions(-)
 create mode 100644 docs/content.zh/docs/connectors/datastream/formats/json.md
 create mode 100644 docs/content/docs/connectors/datastream/formats/json.md
 create mode 100644 flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonDeserializationSchema.java
 create mode 100644 flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonSerializationSchema.java
 create mode 100644 flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonSerDeSchemaTest.java
 copy flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/InitContextInitializationContextAdapter.java => flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testutils/formats/DummyInitializationContext.java (53%)


[flink] 02/02: [FLINK-28634][json] Deprecate JsonNodeDeserializationSchema

Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit db4f7781173bdc5790f7b3e1fd3eeebfa4b31fc9
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Mon Jul 25 15:50:44 2022 +0200

    [FLINK-28634][json] Deprecate JsonNodeDeserializationSchema
    
    Subsumed by more general 'JsonDeserializationSchema'.
---
 .../deserializer/KafkaRecordDeserializationSchemaTest.java  | 13 ++++++++-----
 .../flink/formats/json/JsonNodeDeserializationSchema.java   |  4 +++-
 .../formats/json/JsonNodeDeserializationSchemaTest.java     |  1 +
 3 files changed, 12 insertions(+), 6 deletions(-)

diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/deserializer/KafkaRecordDeserializationSchemaTest.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/deserializer/KafkaRecordDeserializationSchemaTest.java
index b0b898fb5ad..c2e735a0c13 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/deserializer/KafkaRecordDeserializationSchemaTest.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/deserializer/KafkaRecordDeserializationSchemaTest.java
@@ -18,8 +18,9 @@
 
 package org.apache.flink.connector.kafka.source.reader.deserializer;
 
+import org.apache.flink.connector.testutils.formats.DummyInitializationContext;
 import org.apache.flink.connector.testutils.source.deserialization.TestingDeserializationContext;
-import org.apache.flink.formats.json.JsonNodeDeserializationSchema;
+import org.apache.flink.formats.json.JsonDeserializationSchema;
 import org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema;
 import org.apache.flink.util.Collector;
 
@@ -35,7 +36,6 @@ import org.apache.kafka.common.serialization.StringSerializer;
 import org.junit.Before;
 import org.junit.Test;
 
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -58,10 +58,11 @@ public class KafkaRecordDeserializationSchemaTest {
     }
 
     @Test
-    public void testKafkaDeserializationSchemaWrapper() throws IOException {
+    public void testKafkaDeserializationSchemaWrapper() throws Exception {
         final ConsumerRecord<byte[], byte[]> consumerRecord = getConsumerRecord();
         KafkaRecordDeserializationSchema<ObjectNode> schema =
                 KafkaRecordDeserializationSchema.of(new JSONKeyValueDeserializationSchema(true));
+        schema.open(new DummyInitializationContext());
         SimpleCollector<ObjectNode> collector = new SimpleCollector<>();
         schema.deserialize(consumerRecord, collector);
 
@@ -76,10 +77,12 @@ public class KafkaRecordDeserializationSchemaTest {
     }
 
     @Test
-    public void testKafkaValueDeserializationSchemaWrapper() throws IOException {
+    public void testKafkaValueDeserializationSchemaWrapper() throws Exception {
         final ConsumerRecord<byte[], byte[]> consumerRecord = getConsumerRecord();
         KafkaRecordDeserializationSchema<ObjectNode> schema =
-                KafkaRecordDeserializationSchema.valueOnly(new JsonNodeDeserializationSchema());
+                KafkaRecordDeserializationSchema.valueOnly(
+                        new JsonDeserializationSchema<>(ObjectNode.class));
+        schema.open(new DummyInitializationContext());
         SimpleCollector<ObjectNode> collector = new SimpleCollector<>();
         schema.deserialize(consumerRecord, collector);
 
diff --git a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonNodeDeserializationSchema.java b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonNodeDeserializationSchema.java
index 36aa4843f79..928a6f1e59d 100644
--- a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonNodeDeserializationSchema.java
+++ b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonNodeDeserializationSchema.java
@@ -23,8 +23,10 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.Obje
  * DeserializationSchema that deserializes a JSON String into an ObjectNode.
  *
  * <p>Fields can be accessed by calling objectNode.get(&lt;name>).as(&lt;type>)
+ *
+ * @deprecated Use {@code new JsonDeserializationSchema(ObjectNode.class)} instead
  */
-@PublicEvolving
+@Deprecated
 public class JsonNodeDeserializationSchema extends JsonDeserializationSchema<ObjectNode> {
 
     private static final long serialVersionUID = 2L;
diff --git a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonNodeDeserializationSchemaTest.java b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonNodeDeserializationSchemaTest.java
index 2bd8dc5d68b..90751525feb 100644
--- a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonNodeDeserializationSchemaTest.java
+++ b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonNodeDeserializationSchemaTest.java
@@ -29,6 +29,7 @@ import java.io.IOException;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Tests for the {@link JsonNodeDeserializationSchema}. */
+@SuppressWarnings("deprecation")
 class JsonNodeDeserializationSchemaTest {
 
     @Test


[flink] 01/02: [FLINK-28634][json] Add simple JsonSerDeSchema

Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit ae45bd3c50abf8b2621a5de31410ae381f7ffa04
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Mon Jul 25 15:50:07 2022 +0200

    [FLINK-28634][json] Add simple JsonSerDeSchema
---
 .../docs/connectors/datastream/formats/json.md     |  77 ++++++++++++++
 .../docs/connectors/datastream/formats/json.md     |  77 ++++++++++++++
 flink-formats/flink-json/pom.xml                   |   6 ++
 .../formats/json/JsonDeserializationSchema.java    |  70 +++++++++++++
 .../json/JsonNodeDeserializationSchema.java        |  17 +---
 .../formats/json/JsonSerializationSchema.java      |  59 +++++++++++
 .../json/JsonNodeDeserializationSchemaTest.java    |   3 +
 .../flink/formats/json/JsonSerDeSchemaTest.java    | 111 +++++++++++++++++++++
 .../formats/DummyInitializationContext.java        |  41 ++++++++
 9 files changed, 448 insertions(+), 13 deletions(-)

diff --git a/docs/content.zh/docs/connectors/datastream/formats/json.md b/docs/content.zh/docs/connectors/datastream/formats/json.md
new file mode 100644
index 00000000000..232c45441f3
--- /dev/null
+++ b/docs/content.zh/docs/connectors/datastream/formats/json.md
@@ -0,0 +1,77 @@
+---
+title:  "JSON"
+weight: 4
+type: docs
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Json format
+
+To use the JSON format you need to add the Flink JSON dependency to your project:
+
+```xml
+<dependency>
+	<groupId>org.apache.flink</groupId>
+	<artifactId>flink-json</artifactId>
+	<version>{{< version >}}</version>
+	<scope>provided</scope>
+</dependency>
+```
+
+Flink supports reading/writing JSON records via the `JsonSerializationSchema/JsonDeserializationSchema`.
+These utilize the [Jackson](https://github.com/FasterXML/jackson) library, and support any type that is supported by Jackson, including, but not limited to, `POJO`s and `ObjectNode`.
+
+The `JsonDeserializationSchema` can be used with any connector that supports the `DeserializationSchema`.
+
+For example, this is how you use it with a `KafkaSource` to deserialize a `POJO`:
+
+```java
+JsonDeserializationSchema<SomePojo> jsonFormat = new JsonDeserializationSchema<>(SomePojo.class);
+KafkaSource<SomePojo> source =
+    KafkaSource.<SomePojo>builder()
+        .setValueOnlyDeserializer(jsonFormat)
+        ...
+```
+
+The `JsonSerializationSchema` can be used with any connector that supports the `SerializationSchema`.
+
+For example, this is how you use it with a `KafkaSink` to serialize a `POJO`:
+
+```java
+JsonSerializationSchema<SomePojo> jsonFormat = new JsonSerializationSchema<>();
+KafkaSink<SomePojo> source =
+    KafkaSink.<SomePojo>builder()
+        .setRecordSerializer(
+            new KafkaRecordSerializationSchemaBuilder<>()
+                .setValueSerializationSchema(jsonFormat)
+                ...
+```
+
+## Custom Mapper
+
+Both schemas have constructors that accept a `SerializableSupplier<ObjectMapper>`, acting a factory for object mappers.
+With this factory you gain full control over the created mapper, and can enable/disable various Jackson features or register modules to extend the set of supported types or add additional functionality.
+
+```java
+JsonSerializationSchema<SomeClass> jsonFormat = new JsonSerializationSchema<>(
+    () -> new ObjectMapper()
+        .enable(SerializationFeature.ORDER_MAP_ENTRIES_BY_KEYS))
+        .registerModule(new ParameterNamesModule());
+```
\ No newline at end of file
diff --git a/docs/content/docs/connectors/datastream/formats/json.md b/docs/content/docs/connectors/datastream/formats/json.md
new file mode 100644
index 00000000000..57c97cb6b20
--- /dev/null
+++ b/docs/content/docs/connectors/datastream/formats/json.md
@@ -0,0 +1,77 @@
+---
+title:  "JSON"
+weight: 4
+type: docs
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Json format
+
+To use the JSON format you need to add the Flink JSON dependency to your project:
+
+```xml
+<dependency>
+	<groupId>org.apache.flink</groupId>
+	<artifactId>flink-json</artifactId>
+	<version>{{< version >}}</version>
+	<scope>provided</scope>
+</dependency>
+```
+
+Flink supports reading/writing JSON records via the `JsonSerializationSchema/JsonDeserializationSchema`.
+These utilize the [Jackson](https://github.com/FasterXML/jackson) library, and support any type that is supported by Jackson, including, but not limited to, `POJO`s and `ObjectNode`.
+
+The `JsonDeserializationSchema` can be used with any connector that supports the `DeserializationSchema`.
+
+For example, this is how you use it with a `KafkaSource` to deserialize a `POJO`:
+
+```java
+JsonDeserializationSchema<SomePojo> jsonFormat=new JsonDeserializationSchema<>(SomePojo.class);
+KafkaSource<SomePojo> source=
+    KafkaSource.<SomePojo>builder()
+        .setValueOnlyDeserializer(jsonFormat)
+        ...
+```
+
+The `JsonSerializationSchema` can be used with any connector that supports the `SerializationSchema`.
+
+For example, this is how you use it with a `KafkaSink` to serialize a `POJO`:
+
+```java
+JsonSerializationSchema<SomePojo> jsonFormat=new JsonSerializationSchema<>();
+KafkaSink<SomePojo> source  = 
+    KafkaSink.<SomePojo>builder()
+        .setRecordSerializer(
+            new KafkaRecordSerializationSchemaBuilder<>()
+                .setValueSerializationSchema(jsonFormat)
+                ...
+```
+
+## Custom Mapper
+
+Both schemas have constructors that accept a `SerializableSupplier<ObjectMapper>`, acting a factory for object mappers.
+With this factory you gain full control over the created mapper, and can enable/disable various Jackson features or register modules to extend the set of supported types or add additional functionality.
+
+```java
+JsonSerializationSchema<SomeClass> jsonFormat=new JsonSerializationSchema<>(
+    () -> new ObjectMapper()
+        .enable(SerializationFeature.ORDER_MAP_ENTRIES_BY_KEYS))
+        .registerModule(new ParameterNamesModule());
+```
\ No newline at end of file
diff --git a/flink-formats/flink-json/pom.xml b/flink-formats/flink-json/pom.xml
index 28c7b4629b9..ab697cf7ed9 100644
--- a/flink-formats/flink-json/pom.xml
+++ b/flink-formats/flink-json/pom.xml
@@ -109,6 +109,12 @@ under the License.
 		</dependency>
 
 		<!-- test utils dependency -->
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-connector-test-utils</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+		</dependency>
 		<dependency>
 			<groupId>org.apache.flink</groupId>
 			<artifactId>flink-test-utils</artifactId>
diff --git a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonDeserializationSchema.java b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonDeserializationSchema.java
new file mode 100644
index 00000000000..fd28712d56a
--- /dev/null
+++ b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonDeserializationSchema.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.json;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.serialization.AbstractDeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.util.function.SerializableSupplier;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+
+import java.io.IOException;
+
+/** DeserializationSchema that deserializes a JSON String. */
+@PublicEvolving
+public class JsonDeserializationSchema<T> extends AbstractDeserializationSchema<T> {
+
+    private static final long serialVersionUID = 1L;
+
+    private final Class<T> clazz;
+    private final SerializableSupplier<ObjectMapper> mapperFactory;
+    protected transient ObjectMapper mapper;
+
+    public JsonDeserializationSchema(Class<T> clazz) {
+        this(clazz, () -> new ObjectMapper());
+    }
+
+    public JsonDeserializationSchema(TypeInformation<T> typeInformation) {
+        this(typeInformation, () -> new ObjectMapper());
+    }
+
+    public JsonDeserializationSchema(
+            Class<T> clazz, SerializableSupplier<ObjectMapper> mapperFactory) {
+        super(clazz);
+        this.clazz = clazz;
+        this.mapperFactory = mapperFactory;
+    }
+
+    public JsonDeserializationSchema(
+            TypeInformation<T> typeInformation, SerializableSupplier<ObjectMapper> mapperFactory) {
+        super(typeInformation);
+        this.clazz = typeInformation.getTypeClass();
+        this.mapperFactory = mapperFactory;
+    }
+
+    @Override
+    public void open(InitializationContext context) {
+        mapper = mapperFactory.get();
+    }
+
+    @Override
+    public T deserialize(byte[] message) throws IOException {
+        return mapper.readValue(message, clazz);
+    }
+}
diff --git a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonNodeDeserializationSchema.java b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonNodeDeserializationSchema.java
index 55c61e1a6ba..36aa4843f79 100644
--- a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonNodeDeserializationSchema.java
+++ b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonNodeDeserializationSchema.java
@@ -17,28 +17,19 @@
 
 package org.apache.flink.formats.json;
 
-import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.api.common.serialization.AbstractDeserializationSchema;
-
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
 
-import java.io.IOException;
-
 /**
  * DeserializationSchema that deserializes a JSON String into an ObjectNode.
  *
  * <p>Fields can be accessed by calling objectNode.get(&lt;name>).as(&lt;type>)
  */
 @PublicEvolving
-public class JsonNodeDeserializationSchema extends AbstractDeserializationSchema<ObjectNode> {
-
-    private static final long serialVersionUID = -1699854177598621044L;
+public class JsonNodeDeserializationSchema extends JsonDeserializationSchema<ObjectNode> {
 
-    private final ObjectMapper mapper = new ObjectMapper();
+    private static final long serialVersionUID = 2L;
 
-    @Override
-    public ObjectNode deserialize(byte[] message) throws IOException {
-        return mapper.readValue(message, ObjectNode.class);
+    public JsonNodeDeserializationSchema() {
+        super(ObjectNode.class);
     }
 }
diff --git a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonSerializationSchema.java b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonSerializationSchema.java
new file mode 100644
index 00000000000..c029fa1dc9c
--- /dev/null
+++ b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonSerializationSchema.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.json;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.util.function.SerializableSupplier;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+
+/** SerializationSchema that serializes an object to a JSON String. */
+@PublicEvolving
+public class JsonSerializationSchema<T> implements SerializationSchema<T> {
+
+    private static final long serialVersionUID = 1L;
+
+    private final SerializableSupplier<ObjectMapper> mapperFactory;
+
+    protected transient ObjectMapper mapper;
+
+    public JsonSerializationSchema() {
+        this(() -> new ObjectMapper());
+    }
+
+    public JsonSerializationSchema(SerializableSupplier<ObjectMapper> mapperFactory) {
+        this.mapperFactory = mapperFactory;
+    }
+
+    @Override
+    public void open(InitializationContext context) {
+        mapper = mapperFactory.get();
+    }
+
+    @Override
+    public byte[] serialize(T element) {
+        try {
+            return mapper.writeValueAsBytes(element);
+        } catch (JsonProcessingException e) {
+            throw new RuntimeException(
+                    String.format("Could not serialize value '%s'.", element), e);
+        }
+    }
+}
diff --git a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonNodeDeserializationSchemaTest.java b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonNodeDeserializationSchemaTest.java
index 741b492dedf..2bd8dc5d68b 100644
--- a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonNodeDeserializationSchemaTest.java
+++ b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonNodeDeserializationSchemaTest.java
@@ -17,6 +17,8 @@
 
 package org.apache.flink.formats.json;
 
+import org.apache.flink.connector.testutils.formats.DummyInitializationContext;
+
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
 
@@ -37,6 +39,7 @@ class JsonNodeDeserializationSchemaTest {
         byte[] serializedValue = mapper.writeValueAsBytes(initialValue);
 
         JsonNodeDeserializationSchema schema = new JsonNodeDeserializationSchema();
+        schema.open(new DummyInitializationContext());
         ObjectNode deserializedValue = schema.deserialize(serializedValue);
 
         assertThat(deserializedValue.get("key").asInt()).isEqualTo(4);
diff --git a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonSerDeSchemaTest.java b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonSerDeSchemaTest.java
new file mode 100644
index 00000000000..5ed992c1eb1
--- /dev/null
+++ b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonSerDeSchemaTest.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.json;
+
+import org.apache.flink.connector.testutils.formats.DummyInitializationContext;
+
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Objects;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+class JsonSerDeSchemaTest {
+    private static final JsonSerializationSchema<Event> SERIALIZATION_SCHEMA;
+    private static final JsonDeserializationSchema<Event> DESERIALIZATION_SCHEMA;
+    private static final String JSON = "{\"x\":34,\"y\":\"hello\"}";
+
+    static {
+        SERIALIZATION_SCHEMA = new JsonSerializationSchema<>();
+        SERIALIZATION_SCHEMA.open(new DummyInitializationContext());
+        DESERIALIZATION_SCHEMA = new JsonDeserializationSchema<>(Event.class);
+        DESERIALIZATION_SCHEMA.open(new DummyInitializationContext());
+    }
+
+    @Test
+    void testSrialization() throws IOException {
+        final byte[] serialized = SERIALIZATION_SCHEMA.serialize(new Event(34, "hello"));
+        assertThat(serialized).isEqualTo(JSON.getBytes(StandardCharsets.UTF_8));
+    }
+
+    @Test
+    void testDeserialization() throws IOException {
+        final Event deserialized =
+                DESERIALIZATION_SCHEMA.deserialize(JSON.getBytes(StandardCharsets.UTF_8));
+        assertThat(deserialized).isEqualTo(new Event(34, "hello"));
+    }
+
+    @Test
+    void testRoundTrip() throws IOException {
+        final Event original = new Event(34, "hello");
+
+        final byte[] serialized = SERIALIZATION_SCHEMA.serialize(original);
+
+        final Event deserialized = DESERIALIZATION_SCHEMA.deserialize(serialized);
+
+        assertThat(deserialized).isEqualTo(original);
+    }
+
+    private static class Event {
+
+        private int x;
+        private String y = null;
+
+        public Event() {}
+
+        public Event(int x, String y) {
+            this.x = x;
+            this.y = y;
+        }
+
+        public int getX() {
+            return x;
+        }
+
+        public void setX(int x) {
+            this.x = x;
+        }
+
+        public String getY() {
+            return y;
+        }
+
+        public void setY(String y) {
+            this.y = y;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            Event event = (Event) o;
+            return x == event.x && Objects.equals(y, event.y);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(x, y);
+        }
+    }
+}
diff --git a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testutils/formats/DummyInitializationContext.java b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testutils/formats/DummyInitializationContext.java
new file mode 100644
index 00000000000..6335f0d3fc9
--- /dev/null
+++ b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testutils/formats/DummyInitializationContext.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.testutils.formats;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+import org.apache.flink.util.SimpleUserCodeClassLoader;
+import org.apache.flink.util.UserCodeClassLoader;
+
+/** A dummy context for serialization schemas. */
+public class DummyInitializationContext
+        implements SerializationSchema.InitializationContext,
+                DeserializationSchema.InitializationContext {
+
+    @Override
+    public MetricGroup getMetricGroup() {
+        return new UnregisteredMetricsGroup();
+    }
+
+    @Override
+    public UserCodeClassLoader getUserCodeClassLoader() {
+        return SimpleUserCodeClassLoader.create(DummyInitializationContext.class.getClassLoader());
+    }
+}