You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2022/07/28 12:28:56 UTC
[flink] 01/02: [FLINK-28634][json] Add simple JsonSerDeSchema
This is an automated email from the ASF dual-hosted git repository.
chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit ae45bd3c50abf8b2621a5de31410ae381f7ffa04
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Mon Jul 25 15:50:07 2022 +0200
[FLINK-28634][json] Add simple JsonSerDeSchema
---
.../docs/connectors/datastream/formats/json.md | 77 ++++++++++++++
.../docs/connectors/datastream/formats/json.md | 77 ++++++++++++++
flink-formats/flink-json/pom.xml | 6 ++
.../formats/json/JsonDeserializationSchema.java | 70 +++++++++++++
.../json/JsonNodeDeserializationSchema.java | 17 +---
.../formats/json/JsonSerializationSchema.java | 59 +++++++++++
.../json/JsonNodeDeserializationSchemaTest.java | 3 +
.../flink/formats/json/JsonSerDeSchemaTest.java | 111 +++++++++++++++++++++
.../formats/DummyInitializationContext.java | 41 ++++++++
9 files changed, 448 insertions(+), 13 deletions(-)
diff --git a/docs/content.zh/docs/connectors/datastream/formats/json.md b/docs/content.zh/docs/connectors/datastream/formats/json.md
new file mode 100644
index 00000000000..232c45441f3
--- /dev/null
+++ b/docs/content.zh/docs/connectors/datastream/formats/json.md
@@ -0,0 +1,77 @@
+---
+title: "JSON"
+weight: 4
+type: docs
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# JSON format
+
+To use the JSON format you need to add the Flink JSON dependency to your project:
+
+```xml
+<dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-json</artifactId>
+ <version>{{< version >}}</version>
+ <scope>provided</scope>
+</dependency>
+```
+
+Flink supports reading/writing JSON records via the `JsonSerializationSchema` and `JsonDeserializationSchema` classes.
+These utilize the [Jackson](https://github.com/FasterXML/jackson) library, and support any type that is supported by Jackson, including, but not limited to, `POJO`s and `ObjectNode`.
+
+The `JsonDeserializationSchema` can be used with any connector that supports the `DeserializationSchema`.
+
+For example, this is how you use it with a `KafkaSource` to deserialize a `POJO`:
+
+```java
+JsonDeserializationSchema<SomePojo> jsonFormat = new JsonDeserializationSchema<>(SomePojo.class);
+KafkaSource<SomePojo> source =
+ KafkaSource.<SomePojo>builder()
+ .setValueOnlyDeserializer(jsonFormat)
+ ...
+```
+
+The `JsonSerializationSchema` can be used with any connector that supports the `SerializationSchema`.
+
+For example, this is how you use it with a `KafkaSink` to serialize a `POJO`:
+
+```java
+JsonSerializationSchema<SomePojo> jsonFormat = new JsonSerializationSchema<>();
+KafkaSink<SomePojo> sink =
+ KafkaSink.<SomePojo>builder()
+ .setRecordSerializer(
+ new KafkaRecordSerializationSchemaBuilder<>()
+ .setValueSerializationSchema(jsonFormat)
+ ...
+```
+
+## Custom Mapper
+
+Both schemas have constructors that accept a `SerializableSupplier<ObjectMapper>`, acting as a factory for object mappers.
+With this factory you gain full control over the created mapper, and can enable/disable various Jackson features or register modules to extend the set of supported types or add additional functionality.
+
+```java
+JsonSerializationSchema<SomeClass> jsonFormat = new JsonSerializationSchema<>(
+ () -> new ObjectMapper()
+ .enable(SerializationFeature.ORDER_MAP_ENTRIES_BY_KEYS)
+ .registerModule(new ParameterNamesModule()));
+```
\ No newline at end of file
diff --git a/docs/content/docs/connectors/datastream/formats/json.md b/docs/content/docs/connectors/datastream/formats/json.md
new file mode 100644
index 00000000000..57c97cb6b20
--- /dev/null
+++ b/docs/content/docs/connectors/datastream/formats/json.md
@@ -0,0 +1,77 @@
+---
+title: "JSON"
+weight: 4
+type: docs
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# JSON format
+
+To use the JSON format you need to add the Flink JSON dependency to your project:
+
+```xml
+<dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-json</artifactId>
+ <version>{{< version >}}</version>
+ <scope>provided</scope>
+</dependency>
+```
+
+Flink supports reading/writing JSON records via the `JsonSerializationSchema` and `JsonDeserializationSchema` classes.
+These utilize the [Jackson](https://github.com/FasterXML/jackson) library, and support any type that is supported by Jackson, including, but not limited to, `POJO`s and `ObjectNode`.
+
+The `JsonDeserializationSchema` can be used with any connector that supports the `DeserializationSchema`.
+
+For example, this is how you use it with a `KafkaSource` to deserialize a `POJO`:
+
+```java
+JsonDeserializationSchema<SomePojo> jsonFormat = new JsonDeserializationSchema<>(SomePojo.class);
+KafkaSource<SomePojo> source =
+ KafkaSource.<SomePojo>builder()
+ .setValueOnlyDeserializer(jsonFormat)
+ ...
+```
+
+The `JsonSerializationSchema` can be used with any connector that supports the `SerializationSchema`.
+
+For example, this is how you use it with a `KafkaSink` to serialize a `POJO`:
+
+```java
+JsonSerializationSchema<SomePojo> jsonFormat = new JsonSerializationSchema<>();
+KafkaSink<SomePojo> sink =
+ KafkaSink.<SomePojo>builder()
+ .setRecordSerializer(
+ new KafkaRecordSerializationSchemaBuilder<>()
+ .setValueSerializationSchema(jsonFormat)
+ ...
+```
+
+## Custom Mapper
+
+Both schemas have constructors that accept a `SerializableSupplier<ObjectMapper>`, acting as a factory for object mappers.
+With this factory you gain full control over the created mapper, and can enable/disable various Jackson features or register modules to extend the set of supported types or add additional functionality.
+
+```java
+JsonSerializationSchema<SomeClass> jsonFormat = new JsonSerializationSchema<>(
+ () -> new ObjectMapper()
+ .enable(SerializationFeature.ORDER_MAP_ENTRIES_BY_KEYS)
+ .registerModule(new ParameterNamesModule()));
+```
\ No newline at end of file
diff --git a/flink-formats/flink-json/pom.xml b/flink-formats/flink-json/pom.xml
index 28c7b4629b9..ab697cf7ed9 100644
--- a/flink-formats/flink-json/pom.xml
+++ b/flink-formats/flink-json/pom.xml
@@ -109,6 +109,12 @@ under the License.
</dependency>
<!-- test utils dependency -->
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-connector-test-utils</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-test-utils</artifactId>
diff --git a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonDeserializationSchema.java b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonDeserializationSchema.java
new file mode 100644
index 00000000000..fd28712d56a
--- /dev/null
+++ b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonDeserializationSchema.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.json;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.serialization.AbstractDeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.util.function.SerializableSupplier;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+
+import java.io.IOException;
+
+/**
+ * DeserializationSchema that deserializes a JSON String.
+ *
+ * <p>Every message is parsed with a Jackson {@link ObjectMapper} into an instance of the target
+ * class. The mapper is created from a serializable factory in {@link
+ * #open(InitializationContext)}, or on the first call to {@link #deserialize(byte[])} if the schema
+ * was never opened.
+ *
+ * @param <T> type of the records produced by this schema
+ */
+@PublicEvolving
+public class JsonDeserializationSchema<T> extends AbstractDeserializationSchema<T> {
+
+    private static final long serialVersionUID = 1L;
+
+    /** Class that every message is deserialized into. */
+    private final Class<T> clazz;
+
+    /** Serializable factory for the mapper; retains custom mapper configuration with the schema. */
+    private final SerializableSupplier<ObjectMapper> mapperFactory;
+
+    /** Mapper created from {@link #mapperFactory}; transient, so it is rebuilt after serialization. */
+    protected transient ObjectMapper mapper;
+
+    /**
+     * Creates a schema that deserializes into {@code clazz} using a default {@link ObjectMapper}.
+     *
+     * @param clazz class of the deserialized records
+     */
+    public JsonDeserializationSchema(Class<T> clazz) {
+        this(clazz, () -> new ObjectMapper());
+    }
+
+    /**
+     * Creates a schema that deserializes into the type class of {@code typeInformation} using a
+     * default {@link ObjectMapper}.
+     *
+     * @param typeInformation type information of the deserialized records
+     */
+    public JsonDeserializationSchema(TypeInformation<T> typeInformation) {
+        this(typeInformation, () -> new ObjectMapper());
+    }
+
+    /**
+     * Creates a schema that deserializes into {@code clazz} using a mapper from {@code
+     * mapperFactory}.
+     *
+     * @param clazz class of the deserialized records
+     * @param mapperFactory serializable factory creating the {@link ObjectMapper} to use
+     */
+    public JsonDeserializationSchema(
+            Class<T> clazz, SerializableSupplier<ObjectMapper> mapperFactory) {
+        super(clazz);
+        this.clazz = clazz;
+        this.mapperFactory = mapperFactory;
+    }
+
+    /**
+     * Creates a schema that deserializes into the type class of {@code typeInformation} using a
+     * mapper from {@code mapperFactory}.
+     *
+     * @param typeInformation type information of the deserialized records
+     * @param mapperFactory serializable factory creating the {@link ObjectMapper} to use
+     */
+    public JsonDeserializationSchema(
+            TypeInformation<T> typeInformation, SerializableSupplier<ObjectMapper> mapperFactory) {
+        super(typeInformation);
+        this.clazz = typeInformation.getTypeClass();
+        this.mapperFactory = mapperFactory;
+    }
+
+    /** Creates the mapper from the configured factory. */
+    @Override
+    public void open(InitializationContext context) {
+        mapper = mapperFactory.get();
+    }
+
+    @Override
+    public T deserialize(byte[] message) throws IOException {
+        return getOrCreateMapper().readValue(message, clazz);
+    }
+
+    /**
+     * Returns the mapper, creating it from the factory if {@link #open(InitializationContext)} has
+     * not been called. Callers that invoke {@link #deserialize(byte[])} directly on an unopened
+     * schema would otherwise hit a NullPointerException.
+     */
+    private ObjectMapper getOrCreateMapper() {
+        if (mapper == null) {
+            mapper = mapperFactory.get();
+        }
+        return mapper;
+    }
+}
diff --git a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonNodeDeserializationSchema.java b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonNodeDeserializationSchema.java
index 55c61e1a6ba..36aa4843f79 100644
--- a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonNodeDeserializationSchema.java
+++ b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonNodeDeserializationSchema.java
@@ -17,28 +17,19 @@
package org.apache.flink.formats.json;
-import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.api.common.serialization.AbstractDeserializationSchema;
-
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
-import java.io.IOException;
-
/**
* DeserializationSchema that deserializes a JSON String into an ObjectNode.
*
* <p>Fields can be accessed by calling objectNode.get(<name>).as(<type>)
*/
@PublicEvolving
-public class JsonNodeDeserializationSchema extends AbstractDeserializationSchema<ObjectNode> {
-
- private static final long serialVersionUID = -1699854177598621044L;
+public class JsonNodeDeserializationSchema extends JsonDeserializationSchema<ObjectNode> {
- private final ObjectMapper mapper = new ObjectMapper();
+ // Differs from the previous value (-1699854177598621044L): the serialized form changed, since
+ // the class now extends JsonDeserializationSchema and no longer declares its own mapper field.
+ private static final long serialVersionUID = 2L;
- @Override
- public ObjectNode deserialize(byte[] message) throws IOException {
- return mapper.readValue(message, ObjectNode.class);
+ /** Creates a schema that deserializes each JSON message into an {@link ObjectNode}. */
+ public JsonNodeDeserializationSchema() {
+ super(ObjectNode.class);
}
}
diff --git a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonSerializationSchema.java b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonSerializationSchema.java
new file mode 100644
index 00000000000..c029fa1dc9c
--- /dev/null
+++ b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonSerializationSchema.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.json;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.util.function.SerializableSupplier;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+
+/**
+ * SerializationSchema that serializes an object to a JSON String.
+ *
+ * <p>Elements are written with {@link ObjectMapper#writeValueAsBytes(Object)}. The mapper is
+ * created from a serializable factory in {@link #open(InitializationContext)}, or on the first
+ * call to {@link #serialize(Object)} if the schema was never opened.
+ *
+ * @param <T> type of the records consumed by this schema
+ */
+@PublicEvolving
+public class JsonSerializationSchema<T> implements SerializationSchema<T> {
+
+    private static final long serialVersionUID = 1L;
+
+    /** Serializable factory for the mapper; retains custom mapper configuration with the schema. */
+    private final SerializableSupplier<ObjectMapper> mapperFactory;
+
+    /** Mapper created from {@link #mapperFactory}; transient, so it is rebuilt after serialization. */
+    protected transient ObjectMapper mapper;
+
+    /** Creates a schema that serializes elements using a default {@link ObjectMapper}. */
+    public JsonSerializationSchema() {
+        this(() -> new ObjectMapper());
+    }
+
+    /**
+     * Creates a schema that serializes elements using a mapper from {@code mapperFactory}.
+     *
+     * @param mapperFactory serializable factory creating the {@link ObjectMapper} to use
+     */
+    public JsonSerializationSchema(SerializableSupplier<ObjectMapper> mapperFactory) {
+        this.mapperFactory = mapperFactory;
+    }
+
+    /** Creates the mapper from the configured factory. */
+    @Override
+    public void open(InitializationContext context) {
+        mapper = mapperFactory.get();
+    }
+
+    /**
+     * Serializes {@code element} to JSON bytes.
+     *
+     * @throws RuntimeException wrapping the Jackson {@link JsonProcessingException} if the element
+     *     cannot be serialized
+     */
+    @Override
+    public byte[] serialize(T element) {
+        try {
+            return getOrCreateMapper().writeValueAsBytes(element);
+        } catch (JsonProcessingException e) {
+            throw new RuntimeException(
+                    String.format("Could not serialize value '%s'.", element), e);
+        }
+    }
+
+    /**
+     * Returns the mapper, creating it from the factory if {@link #open(InitializationContext)} has
+     * not been called. Callers that invoke {@link #serialize(Object)} directly on an unopened schema
+     * would otherwise hit a NullPointerException.
+     */
+    private ObjectMapper getOrCreateMapper() {
+        if (mapper == null) {
+            mapper = mapperFactory.get();
+        }
+        return mapper;
+    }
+}
diff --git a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonNodeDeserializationSchemaTest.java b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonNodeDeserializationSchemaTest.java
index 741b492dedf..2bd8dc5d68b 100644
--- a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonNodeDeserializationSchemaTest.java
+++ b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonNodeDeserializationSchemaTest.java
@@ -17,6 +17,8 @@
package org.apache.flink.formats.json;
+import org.apache.flink.connector.testutils.formats.DummyInitializationContext;
+
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
@@ -37,6 +39,7 @@ class JsonNodeDeserializationSchemaTest {
byte[] serializedValue = mapper.writeValueAsBytes(initialValue);
JsonNodeDeserializationSchema schema = new JsonNodeDeserializationSchema();
+ schema.open(new DummyInitializationContext());
ObjectNode deserializedValue = schema.deserialize(serializedValue);
assertThat(deserializedValue.get("key").asInt()).isEqualTo(4);
diff --git a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonSerDeSchemaTest.java b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonSerDeSchemaTest.java
new file mode 100644
index 00000000000..5ed992c1eb1
--- /dev/null
+++ b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonSerDeSchemaTest.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.json;
+
+import org.apache.flink.connector.testutils.formats.DummyInitializationContext;
+
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Objects;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link JsonSerializationSchema} and {@link JsonDeserializationSchema}. */
+class JsonSerDeSchemaTest {
+
+    private static final JsonSerializationSchema<Event> SERIALIZATION_SCHEMA;
+    private static final JsonDeserializationSchema<Event> DESERIALIZATION_SCHEMA;
+
+    /** Expected JSON form of {@code new Event(34, "hello")}. */
+    private static final String JSON = "{\"x\":34,\"y\":\"hello\"}";
+
+    static {
+        SERIALIZATION_SCHEMA = new JsonSerializationSchema<>();
+        SERIALIZATION_SCHEMA.open(new DummyInitializationContext());
+        DESERIALIZATION_SCHEMA = new JsonDeserializationSchema<>(Event.class);
+        DESERIALIZATION_SCHEMA.open(new DummyInitializationContext());
+    }
+
+    @Test
+    void testSerialization() {
+        final byte[] serialized = SERIALIZATION_SCHEMA.serialize(new Event(34, "hello"));
+        assertThat(serialized).isEqualTo(JSON.getBytes(StandardCharsets.UTF_8));
+    }
+
+    @Test
+    void testDeserialization() throws IOException {
+        final Event deserialized =
+                DESERIALIZATION_SCHEMA.deserialize(JSON.getBytes(StandardCharsets.UTF_8));
+        assertThat(deserialized).isEqualTo(new Event(34, "hello"));
+    }
+
+    @Test
+    void testRoundTrip() throws IOException {
+        final Event original = new Event(34, "hello");
+
+        final byte[] serialized = SERIALIZATION_SCHEMA.serialize(original);
+
+        final Event deserialized = DESERIALIZATION_SCHEMA.deserialize(serialized);
+
+        assertThat(deserialized).isEqualTo(original);
+    }
+
+    /** Simple Jackson-compatible bean (no-arg constructor plus getters/setters) used as payload. */
+    private static class Event {
+
+        private int x;
+        private String y;
+
+        public Event() {}
+
+        public Event(int x, String y) {
+            this.x = x;
+            this.y = y;
+        }
+
+        public int getX() {
+            return x;
+        }
+
+        public void setX(int x) {
+            this.x = x;
+        }
+
+        public String getY() {
+            return y;
+        }
+
+        public void setY(String y) {
+            this.y = y;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            Event event = (Event) o;
+            return x == event.x && Objects.equals(y, event.y);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(x, y);
+        }
+
+        // Makes assertion failures readable instead of printing Event@<hash>.
+        @Override
+        public String toString() {
+            return "Event{x=" + x + ", y='" + y + "'}";
+        }
+    }
+}
diff --git a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testutils/formats/DummyInitializationContext.java b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testutils/formats/DummyInitializationContext.java
new file mode 100644
index 00000000000..6335f0d3fc9
--- /dev/null
+++ b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testutils/formats/DummyInitializationContext.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.testutils.formats;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+import org.apache.flink.util.SimpleUserCodeClassLoader;
+import org.apache.flink.util.UserCodeClassLoader;
+
+/**
+ * A dummy context for serialization schemas.
+ *
+ * <p>Implements both {@link SerializationSchema.InitializationContext} and {@link
+ * DeserializationSchema.InitializationContext}, so one instance can be passed to {@code open} of
+ * either kind of schema in tests.
+ */
+public class DummyInitializationContext
+        implements SerializationSchema.InitializationContext,
+                DeserializationSchema.InitializationContext {
+
+    /** Returns a new {@link UnregisteredMetricsGroup} on every call. */
+    @Override
+    public MetricGroup getMetricGroup() {
+        final MetricGroup unregistered = new UnregisteredMetricsGroup();
+        return unregistered;
+    }
+
+    /** Returns a new wrapper around the class loader that loaded this class. */
+    @Override
+    public UserCodeClassLoader getUserCodeClassLoader() {
+        final ClassLoader ownLoader = DummyInitializationContext.class.getClassLoader();
+        return SimpleUserCodeClassLoader.create(ownLoader);
+    }
+}