You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2022/07/28 12:28:56 UTC

[flink] 01/02: [FLINK-28634][json] Add simple JsonSerDeSchema

This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit ae45bd3c50abf8b2621a5de31410ae381f7ffa04
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Mon Jul 25 15:50:07 2022 +0200

    [FLINK-28634][json] Add simple JsonSerDeSchema
---
 .../docs/connectors/datastream/formats/json.md     |  77 ++++++++++++++
 .../docs/connectors/datastream/formats/json.md     |  77 ++++++++++++++
 flink-formats/flink-json/pom.xml                   |   6 ++
 .../formats/json/JsonDeserializationSchema.java    |  70 +++++++++++++
 .../json/JsonNodeDeserializationSchema.java        |  17 +---
 .../formats/json/JsonSerializationSchema.java      |  59 +++++++++++
 .../json/JsonNodeDeserializationSchemaTest.java    |   3 +
 .../flink/formats/json/JsonSerDeSchemaTest.java    | 111 +++++++++++++++++++++
 .../formats/DummyInitializationContext.java        |  41 ++++++++
 9 files changed, 448 insertions(+), 13 deletions(-)

diff --git a/docs/content.zh/docs/connectors/datastream/formats/json.md b/docs/content.zh/docs/connectors/datastream/formats/json.md
new file mode 100644
index 00000000000..232c45441f3
--- /dev/null
+++ b/docs/content.zh/docs/connectors/datastream/formats/json.md
@@ -0,0 +1,77 @@
+---
+title:  "JSON"
+weight: 4
+type: docs
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# JSON format
+
+To use the JSON format you need to add the Flink JSON dependency to your project:
+
+```xml
+<dependency>
+	<groupId>org.apache.flink</groupId>
+	<artifactId>flink-json</artifactId>
+	<version>{{< version >}}</version>
+	<scope>provided</scope>
+</dependency>
+```
+
+Flink supports reading/writing JSON records via the `JsonSerializationSchema` and `JsonDeserializationSchema`.
+These utilize the [Jackson](https://github.com/FasterXML/jackson) library, and support any type that is supported by Jackson, including, but not limited to, `POJO`s and `ObjectNode`.
+
+The `JsonDeserializationSchema` can be used with any connector that supports the `DeserializationSchema`.
+
+For example, this is how you use it with a `KafkaSource` to deserialize a `POJO`:
+
+```java
+JsonDeserializationSchema<SomePojo> jsonFormat = new JsonDeserializationSchema<>(SomePojo.class);
+KafkaSource<SomePojo> source =
+    KafkaSource.<SomePojo>builder()
+        .setValueOnlyDeserializer(jsonFormat)
+        ...
+```
+
+The `JsonSerializationSchema` can be used with any connector that supports the `SerializationSchema`.
+
+For example, this is how you use it with a `KafkaSink` to serialize a `POJO`:
+
+```java
+JsonSerializationSchema<SomePojo> jsonFormat = new JsonSerializationSchema<>();
+KafkaSink<SomePojo> sink =
+    KafkaSink.<SomePojo>builder()
+        .setRecordSerializer(
+            new KafkaRecordSerializationSchemaBuilder<>()
+                .setValueSerializationSchema(jsonFormat)
+                ...
+```
+
+## Custom Mapper
+
+Both schemas have constructors that accept a `SerializableSupplier<ObjectMapper>`, acting as a factory for object mappers.
+With this factory you gain full control over the created mapper, and can enable/disable various Jackson features or register modules to extend the set of supported types or add additional functionality.
+
+```java
+JsonSerializationSchema<SomeClass> jsonFormat = new JsonSerializationSchema<>(
+    () -> new ObjectMapper()
+        .enable(SerializationFeature.ORDER_MAP_ENTRIES_BY_KEYS)
+        .registerModule(new ParameterNamesModule()));
+```
\ No newline at end of file
diff --git a/docs/content/docs/connectors/datastream/formats/json.md b/docs/content/docs/connectors/datastream/formats/json.md
new file mode 100644
index 00000000000..57c97cb6b20
--- /dev/null
+++ b/docs/content/docs/connectors/datastream/formats/json.md
@@ -0,0 +1,77 @@
+---
+title:  "JSON"
+weight: 4
+type: docs
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# JSON format
+
+To use the JSON format you need to add the Flink JSON dependency to your project:
+
+```xml
+<dependency>
+	<groupId>org.apache.flink</groupId>
+	<artifactId>flink-json</artifactId>
+	<version>{{< version >}}</version>
+	<scope>provided</scope>
+</dependency>
+```
+
+Flink supports reading/writing JSON records via the `JsonSerializationSchema` and `JsonDeserializationSchema`.
+These utilize the [Jackson](https://github.com/FasterXML/jackson) library, and support any type that is supported by Jackson, including, but not limited to, `POJO`s and `ObjectNode`.
+
+The `JsonDeserializationSchema` can be used with any connector that supports the `DeserializationSchema`.
+
+For example, this is how you use it with a `KafkaSource` to deserialize a `POJO`:
+
+```java
+JsonDeserializationSchema<SomePojo> jsonFormat = new JsonDeserializationSchema<>(SomePojo.class);
+KafkaSource<SomePojo> source =
+    KafkaSource.<SomePojo>builder()
+        .setValueOnlyDeserializer(jsonFormat)
+        ...
+```
+
+The `JsonSerializationSchema` can be used with any connector that supports the `SerializationSchema`.
+
+For example, this is how you use it with a `KafkaSink` to serialize a `POJO`:
+
+```java
+JsonSerializationSchema<SomePojo> jsonFormat = new JsonSerializationSchema<>();
+KafkaSink<SomePojo> sink =
+    KafkaSink.<SomePojo>builder()
+        .setRecordSerializer(
+            new KafkaRecordSerializationSchemaBuilder<>()
+                .setValueSerializationSchema(jsonFormat)
+                ...
+```
+
+## Custom Mapper
+
+Both schemas have constructors that accept a `SerializableSupplier<ObjectMapper>`, acting as a factory for object mappers.
+With this factory you gain full control over the created mapper, and can enable/disable various Jackson features or register modules to extend the set of supported types or add additional functionality.
+
+```java
+JsonSerializationSchema<SomeClass> jsonFormat = new JsonSerializationSchema<>(
+    () -> new ObjectMapper()
+        .enable(SerializationFeature.ORDER_MAP_ENTRIES_BY_KEYS)
+        .registerModule(new ParameterNamesModule()));
+```
\ No newline at end of file
diff --git a/flink-formats/flink-json/pom.xml b/flink-formats/flink-json/pom.xml
index 28c7b4629b9..ab697cf7ed9 100644
--- a/flink-formats/flink-json/pom.xml
+++ b/flink-formats/flink-json/pom.xml
@@ -109,6 +109,12 @@ under the License.
 		</dependency>
 
 		<!-- test utils dependency -->
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-connector-test-utils</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+		</dependency>
 		<dependency>
 			<groupId>org.apache.flink</groupId>
 			<artifactId>flink-test-utils</artifactId>
diff --git a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonDeserializationSchema.java b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonDeserializationSchema.java
new file mode 100644
index 00000000000..fd28712d56a
--- /dev/null
+++ b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonDeserializationSchema.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.json;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.serialization.AbstractDeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.util.function.SerializableSupplier;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+
+import java.io.IOException;
+
+/**
+ * DeserializationSchema that deserializes JSON-encoded bytes into instances of {@code T} using a
+ * Jackson {@link ObjectMapper}.
+ *
+ * <p>The mapper is created from the configured {@link SerializableSupplier} in {@link
+ * #open(InitializationContext)}. If {@link #deserialize(byte[])} is called without a prior call
+ * to {@code open}, the mapper is created on first use instead of failing.
+ *
+ * @param <T> type of the deserialized records
+ */
+@PublicEvolving
+public class JsonDeserializationSchema<T> extends AbstractDeserializationSchema<T> {
+
+    private static final long serialVersionUID = 1L;
+
+    /** Class that Jackson reads each message into. */
+    private final Class<T> clazz;
+
+    /** Serializable factory shipped with the schema; used to create the mapper. */
+    private final SerializableSupplier<ObjectMapper> mapperFactory;
+
+    /** Mapper created from {@link #mapperFactory}; transient, so it is not Java-serialized. */
+    protected transient ObjectMapper mapper;
+
+    /**
+     * Creates a schema that deserializes into {@code clazz} using a default {@link ObjectMapper}.
+     *
+     * @param clazz class to deserialize JSON messages into
+     */
+    public JsonDeserializationSchema(Class<T> clazz) {
+        this(clazz, () -> new ObjectMapper());
+    }
+
+    /**
+     * Creates a schema that deserializes into the class described by {@code typeInformation},
+     * using a default {@link ObjectMapper}.
+     *
+     * @param typeInformation produced type; its type class is the deserialization target
+     */
+    public JsonDeserializationSchema(TypeInformation<T> typeInformation) {
+        this(typeInformation, () -> new ObjectMapper());
+    }
+
+    /**
+     * Creates a schema that deserializes into {@code clazz} using mappers created by {@code
+     * mapperFactory}.
+     *
+     * @param clazz class to deserialize JSON messages into
+     * @param mapperFactory serializable factory for the {@link ObjectMapper} used at runtime
+     */
+    public JsonDeserializationSchema(
+            Class<T> clazz, SerializableSupplier<ObjectMapper> mapperFactory) {
+        super(clazz);
+        this.clazz = clazz;
+        this.mapperFactory = mapperFactory;
+    }
+
+    /**
+     * Creates a schema that deserializes into the class described by {@code typeInformation},
+     * using mappers created by {@code mapperFactory}.
+     *
+     * @param typeInformation produced type; its type class is the deserialization target
+     * @param mapperFactory serializable factory for the {@link ObjectMapper} used at runtime
+     */
+    public JsonDeserializationSchema(
+            TypeInformation<T> typeInformation, SerializableSupplier<ObjectMapper> mapperFactory) {
+        super(typeInformation);
+        this.clazz = typeInformation.getTypeClass();
+        this.mapperFactory = mapperFactory;
+    }
+
+    @Override
+    public void open(InitializationContext context) {
+        mapper = mapperFactory.get();
+    }
+
+    @Override
+    public T deserialize(byte[] message) throws IOException {
+        if (mapper == null) {
+            // open() has not been called (e.g. direct use in user code or tests); create the
+            // mapper lazily rather than failing with a NullPointerException.
+            mapper = mapperFactory.get();
+        }
+        return mapper.readValue(message, clazz);
+    }
+}
diff --git a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonNodeDeserializationSchema.java b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonNodeDeserializationSchema.java
index 55c61e1a6ba..36aa4843f79 100644
--- a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonNodeDeserializationSchema.java
+++ b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonNodeDeserializationSchema.java
@@ -17,28 +17,23 @@

 package org.apache.flink.formats.json;

 import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.api.common.serialization.AbstractDeserializationSchema;

-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;

-import java.io.IOException;
-
 /**
  * DeserializationSchema that deserializes a JSON String into an ObjectNode.
  *
- * <p>Fields can be accessed by calling objectNode.get(&lt;name>).as(&lt;type>)
+ * <p>Fields can be accessed by calling {@code objectNode.get(<name>)} and converting the result
+ * with one of its {@code asXxx()} methods, e.g. {@code objectNode.get("key").asInt()}.
  */
 @PublicEvolving
-public class JsonNodeDeserializationSchema extends AbstractDeserializationSchema<ObjectNode> {
-
-    private static final long serialVersionUID = -1699854177598621044L;
+public class JsonNodeDeserializationSchema extends JsonDeserializationSchema<ObjectNode> {

-    private final ObjectMapper mapper = new ObjectMapper();
+    private static final long serialVersionUID = 2L;

-    @Override
-    public ObjectNode deserialize(byte[] message) throws IOException {
-        return mapper.readValue(message, ObjectNode.class);
+    /** Creates a schema that deserializes JSON messages into {@link ObjectNode} instances. */
+    public JsonNodeDeserializationSchema() {
+        super(ObjectNode.class);
     }
 }
diff --git a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonSerializationSchema.java b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonSerializationSchema.java
new file mode 100644
index 00000000000..c029fa1dc9c
--- /dev/null
+++ b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonSerializationSchema.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.json;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.util.function.SerializableSupplier;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+
+/**
+ * SerializationSchema that serializes objects to JSON using a Jackson {@link ObjectMapper}.
+ *
+ * <p>The mapper is created from the configured {@link SerializableSupplier} in {@link
+ * #open(InitializationContext)}. If {@link #serialize(Object)} is called without a prior call to
+ * {@code open}, the mapper is created on first use instead of failing.
+ *
+ * @param <T> type of the serialized records
+ */
+@PublicEvolving
+public class JsonSerializationSchema<T> implements SerializationSchema<T> {
+
+    private static final long serialVersionUID = 1L;
+
+    /** Serializable factory shipped with the schema; used to create the mapper. */
+    private final SerializableSupplier<ObjectMapper> mapperFactory;
+
+    /** Mapper created from {@link #mapperFactory}; transient, so it is not Java-serialized. */
+    protected transient ObjectMapper mapper;
+
+    /** Creates a schema that uses a default {@link ObjectMapper}. */
+    public JsonSerializationSchema() {
+        this(() -> new ObjectMapper());
+    }
+
+    /**
+     * Creates a schema that uses mappers created by {@code mapperFactory}.
+     *
+     * @param mapperFactory serializable factory for the {@link ObjectMapper} used at runtime
+     */
+    public JsonSerializationSchema(SerializableSupplier<ObjectMapper> mapperFactory) {
+        this.mapperFactory = mapperFactory;
+    }
+
+    @Override
+    public void open(InitializationContext context) {
+        mapper = mapperFactory.get();
+    }
+
+    /**
+     * Serializes {@code element} to JSON bytes with the configured mapper.
+     *
+     * @throws RuntimeException wrapping the {@link JsonProcessingException} if Jackson cannot
+     *     serialize the element
+     */
+    @Override
+    public byte[] serialize(T element) {
+        if (mapper == null) {
+            // open() has not been called; create the mapper lazily instead of throwing an NPE.
+            mapper = mapperFactory.get();
+        }
+        try {
+            return mapper.writeValueAsBytes(element);
+        } catch (JsonProcessingException e) {
+            throw new RuntimeException(
+                    String.format("Could not serialize value '%s'.", element), e);
+        }
+    }
+}
diff --git a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonNodeDeserializationSchemaTest.java b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonNodeDeserializationSchemaTest.java
index 741b492dedf..2bd8dc5d68b 100644
--- a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonNodeDeserializationSchemaTest.java
+++ b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonNodeDeserializationSchemaTest.java
@@ -17,6 +17,8 @@
 
 package org.apache.flink.formats.json;
 
+import org.apache.flink.connector.testutils.formats.DummyInitializationContext;
+
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
 
@@ -37,6 +39,7 @@ class JsonNodeDeserializationSchemaTest {
         byte[] serializedValue = mapper.writeValueAsBytes(initialValue);
 
         JsonNodeDeserializationSchema schema = new JsonNodeDeserializationSchema();
+        schema.open(new DummyInitializationContext());
         ObjectNode deserializedValue = schema.deserialize(serializedValue);
 
         assertThat(deserializedValue.get("key").asInt()).isEqualTo(4);
diff --git a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonSerDeSchemaTest.java b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonSerDeSchemaTest.java
new file mode 100644
index 00000000000..5ed992c1eb1
--- /dev/null
+++ b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonSerDeSchemaTest.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.json;
+
+import org.apache.flink.connector.testutils.formats.DummyInitializationContext;
+
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Objects;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link JsonSerializationSchema} and {@link JsonDeserializationSchema}. */
+class JsonSerDeSchemaTest {
+    private static final JsonSerializationSchema<Event> SERIALIZATION_SCHEMA;
+    private static final JsonDeserializationSchema<Event> DESERIALIZATION_SCHEMA;
+    private static final String JSON = "{\"x\":34,\"y\":\"hello\"}";
+
+    static {
+        SERIALIZATION_SCHEMA = new JsonSerializationSchema<>();
+        SERIALIZATION_SCHEMA.open(new DummyInitializationContext());
+        DESERIALIZATION_SCHEMA = new JsonDeserializationSchema<>(Event.class);
+        DESERIALIZATION_SCHEMA.open(new DummyInitializationContext());
+    }
+
+    @Test
+    void testSerialization() {
+        final byte[] serialized = SERIALIZATION_SCHEMA.serialize(new Event(34, "hello"));
+        assertThat(serialized).isEqualTo(JSON.getBytes(StandardCharsets.UTF_8));
+    }
+
+    @Test
+    void testDeserialization() throws IOException {
+        final Event deserialized =
+                DESERIALIZATION_SCHEMA.deserialize(JSON.getBytes(StandardCharsets.UTF_8));
+        assertThat(deserialized).isEqualTo(new Event(34, "hello"));
+    }
+
+    @Test
+    void testRoundTrip() throws IOException {
+        final Event original = new Event(34, "hello");
+
+        final byte[] serialized = SERIALIZATION_SCHEMA.serialize(original);
+
+        final Event deserialized = DESERIALIZATION_SCHEMA.deserialize(serialized);
+
+        assertThat(deserialized).isEqualTo(original);
+    }
+
+    /** Simple bean with getters/setters and a no-arg constructor, as Jackson expects. */
+    private static class Event {
+
+        private int x;
+        private String y;
+
+        public Event() {}
+
+        public Event(int x, String y) {
+            this.x = x;
+            this.y = y;
+        }
+
+        public int getX() {
+            return x;
+        }
+
+        public void setX(int x) {
+            this.x = x;
+        }
+
+        public String getY() {
+            return y;
+        }
+
+        public void setY(String y) {
+            this.y = y;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            Event event = (Event) o;
+            return x == event.x && Objects.equals(y, event.y);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(x, y);
+        }
+    }
+}
diff --git a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testutils/formats/DummyInitializationContext.java b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testutils/formats/DummyInitializationContext.java
new file mode 100644
index 00000000000..6335f0d3fc9
--- /dev/null
+++ b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testutils/formats/DummyInitializationContext.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.testutils.formats;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+import org.apache.flink.util.SimpleUserCodeClassLoader;
+import org.apache.flink.util.UserCodeClassLoader;
+
+/**
+ * Minimal initialization context for tests that call {@code open} on a {@link SerializationSchema}
+ * or {@link DeserializationSchema} directly, without a surrounding runtime.
+ */
+public class DummyInitializationContext
+        implements SerializationSchema.InitializationContext,
+                DeserializationSchema.InitializationContext {
+
+    /** Returns a newly created {@link UnregisteredMetricsGroup} on every invocation. */
+    @Override
+    public MetricGroup getMetricGroup() {
+        final MetricGroup metricGroup = new UnregisteredMetricsGroup();
+        return metricGroup;
+    }
+
+    /** Wraps the class loader that loaded this class in a {@link SimpleUserCodeClassLoader}. */
+    @Override
+    public UserCodeClassLoader getUserCodeClassLoader() {
+        final ClassLoader owningLoader = DummyInitializationContext.class.getClassLoader();
+        return SimpleUserCodeClassLoader.create(owningLoader);
+    }
+}