You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2022/07/26 08:35:07 UTC
[flink] branch master updated: [FLINK-28636][tests] Add utility for checking POJO compliance
This is an automated email from the ASF dual-hosted git repository.
chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 94c3daac4a9 [FLINK-28636][tests] Add utility for checking POJO compliance
94c3daac4a9 is described below
commit 94c3daac4a96a68009a22825fb1fd4ac0bb92195
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Tue Jul 26 10:34:59 2022 +0200
[FLINK-28636][tests] Add utility for checking POJO compliance
---
.../serialization/types_serialization.md | 2 +
.../serialization/types_serialization.md | 2 +
.../kinesis/FlinkKinesisConsumerTest.java | 9 +---
.../java/org/apache/flink/types/PojoTestUtils.java | 55 ++++++++++++++++++++++
.../org/apache/flink/types/PojoTestUtilsTest.java | 42 +++++++++++++++++
5 files changed, 103 insertions(+), 7 deletions(-)
diff --git a/docs/content.zh/docs/dev/datastream/fault-tolerance/serialization/types_serialization.md b/docs/content.zh/docs/dev/datastream/fault-tolerance/serialization/types_serialization.md
index c80f7b46195..072e7ebaf97 100644
--- a/docs/content.zh/docs/dev/datastream/fault-tolerance/serialization/types_serialization.md
+++ b/docs/content.zh/docs/dev/datastream/fault-tolerance/serialization/types_serialization.md
@@ -115,6 +115,8 @@ You can also register your own custom serializer if required; see [Serialization
Flink analyzes the structure of POJO types, i.e., it learns about the fields of a POJO. As a result POJO types are easier to use than general types. Moreover, Flink can process POJOs more efficiently than general types.
+You can test whether your class adheres to the POJO requirements via `org.apache.flink.types.PojoTestUtils#assertSerializedAsPojo()` from the `flink-test-utils` module.
+
The following example shows a simple POJO with two public fields.
{{< tabs "0589f3b3-76d8-4913-9595-276da92cbc77" >}}
diff --git a/docs/content/docs/dev/datastream/fault-tolerance/serialization/types_serialization.md b/docs/content/docs/dev/datastream/fault-tolerance/serialization/types_serialization.md
index 47055860efb..d8563c53ac3 100644
--- a/docs/content/docs/dev/datastream/fault-tolerance/serialization/types_serialization.md
+++ b/docs/content/docs/dev/datastream/fault-tolerance/serialization/types_serialization.md
@@ -116,6 +116,8 @@ You can also register your own custom serializer if required; see [Serialization
Flink analyzes the structure of POJO types, i.e., it learns about the fields of a POJO. As a result POJO types are easier to use than general types. Moreover, Flink can process POJOs more efficiently than general types.
+You can test whether your class adheres to the POJO requirements via `org.apache.flink.types.PojoTestUtils#assertSerializedAsPojo()` from the `flink-test-utils` module.
+
The following example shows a simple POJO with two public fields.
{{< tabs "0589f3b3-76d8-4913-9595-276da92cbc77" >}}
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
index 364a412c5d2..deef7b38057 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
@@ -17,7 +17,6 @@
package org.apache.flink.streaming.connectors.kinesis;
-import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
@@ -25,9 +24,7 @@ import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.OperatorStateStore;
import org.apache.flink.api.common.time.Deadline;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.runtime.PojoSerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.mock.Whitebox;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
@@ -58,6 +55,7 @@ import org.apache.flink.streaming.connectors.kinesis.util.RecordEmitter;
import org.apache.flink.streaming.connectors.kinesis.util.WatermarkTracker;
import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
import org.apache.flink.streaming.util.CollectingSourceContext;
+import org.apache.flink.types.PojoTestUtils;
import org.apache.flink.util.TestLogger;
import com.amazonaws.services.kinesis.model.HashKeyRange;
@@ -633,10 +631,7 @@ public class FlinkKinesisConsumerTest extends TestLogger {
@Test
public void testStreamShardMetadataSerializedUsingPojoSerializer() {
- TypeInformation<StreamShardMetadata> typeInformation =
- TypeInformation.of(StreamShardMetadata.class);
- assertThat(typeInformation.createSerializer(new ExecutionConfig()))
- .isInstanceOf(PojoSerializer.class);
+ PojoTestUtils.assertSerializedAsPojo(StreamShardMetadata.class);
}
/**
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/types/PojoTestUtils.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/types/PojoTestUtils.java
new file mode 100644
index 00000000000..dbae4ed3a22
--- /dev/null
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/types/PojoTestUtils.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.types;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.api.java.typeutils.runtime.PojoSerializer;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test utilities for checking whether classes are serialized as POJOs by Flink. */
+@PublicEvolving
+public class PojoTestUtils {
+ /**
+ * Verifies that instances of the given class fulfill all conditions to be serialized with the
+ * {@link PojoSerializer}, as documented <a
+ * href="https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/datastream/fault-tolerance/serialization/types_serialization/#pojos">here</a>.
+ *
+ * @param clazz class to analyze
+ * @param <T> class type
+ * @throws AssertionError if instances of the class cannot be serialized as a POJO
+ */
+ public static <T> void assertSerializedAsPojo(Class<T> clazz) throws AssertionError {
+ final TypeInformation<T> typeInformation = TypeInformation.of(clazz);
+ final TypeSerializer<T> actualSerializer =
+ typeInformation.createSerializer(new ExecutionConfig());
+
+ assertThat(actualSerializer)
+ .withFailMessage(
+ "Instances of the class '%s' cannot be serialized as a POJO, but would use a '%s' instead. %n"
+ + "Re-run this test with INFO logging enabled and check messages from the '%s' for possible reasons.",
+ clazz.getSimpleName(),
+ actualSerializer.getClass().getSimpleName(),
+ TypeExtractor.class.getCanonicalName())
+ .isInstanceOf(PojoSerializer.class);
+ }
+}
diff --git a/flink-test-utils-parent/flink-test-utils/src/test/java/org/apache/flink/types/PojoTestUtilsTest.java b/flink-test-utils-parent/flink-test-utils/src/test/java/org/apache/flink/types/PojoTestUtilsTest.java
new file mode 100644
index 00000000000..c602eb01f50
--- /dev/null
+++ b/flink-test-utils-parent/flink-test-utils/src/test/java/org/apache/flink/types/PojoTestUtilsTest.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.types;
+
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+class PojoTestUtilsTest {
+
+ @Test
+ void testNonPojoRejected() {
+ assertThatThrownBy(() -> PojoTestUtils.assertSerializedAsPojo(NoPojo.class))
+ .isInstanceOf(AssertionError.class);
+ }
+
+ @Test
+ void testPojoAccepted() {
+ PojoTestUtils.assertSerializedAsPojo(Pojo.class);
+ }
+
+ private static class NoPojo {}
+
+ public static class Pojo {
+ public int x;
+ }
+}