You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2022/07/26 08:35:07 UTC

[flink] branch master updated: [FLINK-28636][tests] Add utility for checking POJO compliance

This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 94c3daac4a9 [FLINK-28636][tests] Add utility for checking POJO compliance
94c3daac4a9 is described below

commit 94c3daac4a96a68009a22825fb1fd4ac0bb92195
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Tue Jul 26 10:34:59 2022 +0200

    [FLINK-28636][tests] Add utility for checking POJO compliance
---
 .../serialization/types_serialization.md           |  2 +
 .../serialization/types_serialization.md           |  2 +
 .../kinesis/FlinkKinesisConsumerTest.java          |  9 +---
 .../java/org/apache/flink/types/PojoTestUtils.java | 55 ++++++++++++++++++++++
 .../org/apache/flink/types/PojoTestUtilsTest.java  | 42 +++++++++++++++++
 5 files changed, 103 insertions(+), 7 deletions(-)

diff --git a/docs/content.zh/docs/dev/datastream/fault-tolerance/serialization/types_serialization.md b/docs/content.zh/docs/dev/datastream/fault-tolerance/serialization/types_serialization.md
index c80f7b46195..072e7ebaf97 100644
--- a/docs/content.zh/docs/dev/datastream/fault-tolerance/serialization/types_serialization.md
+++ b/docs/content.zh/docs/dev/datastream/fault-tolerance/serialization/types_serialization.md
@@ -115,6 +115,8 @@ You can also register your own custom serializer if required; see [Serialization
 
 Flink analyzes the structure of POJO types, i.e., it learns about the fields of a POJO. As a result POJO types are easier to use than general types. Moreover, Flink can process POJOs more efficiently than general types.
 
+You can test whether your class adheres to the POJO requirements via `org.apache.flink.types.PojoTestUtils#assertSerializedAsPojo()` from the `flink-test-utils`.
+
 The following example shows a simple POJO with two public fields.
 
 {{< tabs "0589f3b3-76d8-4913-9595-276da92cbc77" >}}
diff --git a/docs/content/docs/dev/datastream/fault-tolerance/serialization/types_serialization.md b/docs/content/docs/dev/datastream/fault-tolerance/serialization/types_serialization.md
index 47055860efb..d8563c53ac3 100644
--- a/docs/content/docs/dev/datastream/fault-tolerance/serialization/types_serialization.md
+++ b/docs/content/docs/dev/datastream/fault-tolerance/serialization/types_serialization.md
@@ -116,6 +116,8 @@ You can also register your own custom serializer if required; see [Serialization
 
 Flink analyzes the structure of POJO types, i.e., it learns about the fields of a POJO. As a result POJO types are easier to use than general types. Moreover, Flink can process POJOs more efficiently than general types.
 
+You can test whether your class adheres to the POJO requirements via `org.apache.flink.types.PojoTestUtils#assertSerializedAsPojo()` from the `flink-test-utils`.
+
 The following example shows a simple POJO with two public fields.
 
 {{< tabs "0589f3b3-76d8-4913-9595-276da92cbc77" >}}
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
index 364a412c5d2..deef7b38057 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
@@ -17,7 +17,6 @@
 
 package org.apache.flink.streaming.connectors.kinesis;
 
-import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.serialization.DeserializationSchema;
 import org.apache.flink.api.common.serialization.SimpleStringSchema;
@@ -25,9 +24,7 @@ import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.state.OperatorStateStore;
 import org.apache.flink.api.common.time.Deadline;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.runtime.PojoSerializer;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.mock.Whitebox;
 import org.apache.flink.runtime.state.FunctionSnapshotContext;
@@ -58,6 +55,7 @@ import org.apache.flink.streaming.connectors.kinesis.util.RecordEmitter;
 import org.apache.flink.streaming.connectors.kinesis.util.WatermarkTracker;
 import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.CollectingSourceContext;
+import org.apache.flink.types.PojoTestUtils;
 import org.apache.flink.util.TestLogger;
 
 import com.amazonaws.services.kinesis.model.HashKeyRange;
@@ -633,10 +631,7 @@ public class FlinkKinesisConsumerTest extends TestLogger {
 
     @Test
     public void testStreamShardMetadataSerializedUsingPojoSerializer() {
-        TypeInformation<StreamShardMetadata> typeInformation =
-                TypeInformation.of(StreamShardMetadata.class);
-        assertThat(typeInformation.createSerializer(new ExecutionConfig()))
-                .isInstanceOf(PojoSerializer.class);
+        PojoTestUtils.assertSerializedAsPojo(StreamShardMetadata.class);
     }
 
     /**
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/types/PojoTestUtils.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/types/PojoTestUtils.java
new file mode 100644
index 00000000000..dbae4ed3a22
--- /dev/null
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/types/PojoTestUtils.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.types;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.api.java.typeutils.runtime.PojoSerializer;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test utils around POJOs. */
+@PublicEvolving
+public class PojoTestUtils {
+    /**
+     * Verifies that instances of the given class fulfill all conditions to be serialized with the
+     * {@link PojoSerializer}, as documented <a
+     * href="https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/datastream/fault-tolerance/serialization/types_serialization/#pojos">here</a>.
+     *
+     * @param clazz class to analyze
+     * @param <T> class type
+     * @throws AssertionError if instances of the class cannot be serialized as a POJO
+     */
+    public static <T> void assertSerializedAsPojo(Class<T> clazz) throws AssertionError {
+        final TypeInformation<T> typeInformation = TypeInformation.of(clazz);
+        final TypeSerializer<T> actualSerializer =
+                typeInformation.createSerializer(new ExecutionConfig());
+
+        assertThat(actualSerializer)
+                .withFailMessage(
+                        "Instances of the class '%s' cannot be serialized as a POJO, but would use a '%s' instead. %n"
+                                + "Re-run this test with INFO logging enabled and check messages from the '%s' for possible reasons.",
+                        clazz.getSimpleName(),
+                        actualSerializer.getClass().getSimpleName(),
+                        TypeExtractor.class.getCanonicalName())
+                .isInstanceOf(PojoSerializer.class);
+    }
+}
diff --git a/flink-test-utils-parent/flink-test-utils/src/test/java/org/apache/flink/types/PojoTestUtilsTest.java b/flink-test-utils-parent/flink-test-utils/src/test/java/org/apache/flink/types/PojoTestUtilsTest.java
new file mode 100644
index 00000000000..c602eb01f50
--- /dev/null
+++ b/flink-test-utils-parent/flink-test-utils/src/test/java/org/apache/flink/types/PojoTestUtilsTest.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.types;
+
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+class PojoTestUtilsTest {
+
+    @Test
+    void testNonPojoRejected() {
+        assertThatThrownBy(() -> PojoTestUtils.assertSerializedAsPojo(NoPojo.class))
+                .isInstanceOf(AssertionError.class);
+    }
+
+    @Test
+    void testPojoAccepted() {
+        PojoTestUtils.assertSerializedAsPojo(Pojo.class);
+    }
+
+    private static class NoPojo {}
+
+    public static class Pojo {
+        public int x;
+    }
+}