Posted to commits@flink.apache.org by ch...@apache.org on 2022/03/25 09:31:37 UTC

[flink] branch master updated: [FLINK-26736][tests] Migrate flink-avro-confluent-registry to JUnit5

This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new e3b123d  [FLINK-26736][tests] Migrate flink-avro-confluent-registry to JUnit5
e3b123d is described below

commit e3b123d7d1e48e7adbb04fb3470d02bb76f5ff73
Author: Ryan Skraba <rs...@apache.org>
AuthorDate: Fri Mar 25 10:31:02 2022 +0100

    [FLINK-26736][tests] Migrate flink-avro-confluent-registry to JUnit5
---
 .../confluent/CachedSchemaCoderProviderTest.java   | 36 ++++++------
 .../ConfluentSchemaRegistryCoderTest.java          | 26 +++++----
 .../confluent/RegistryAvroFormatFactoryTest.java   | 54 ++++++++---------
 .../RegistryAvroRowDataSeDeSchemaTest.java         | 67 ++++++++++------------
 .../debezium/DebeziumAvroFormatFactoryTest.java    | 23 +++-----
 .../debezium/DebeziumAvroSerDeSchemaTest.java      | 53 +++++++++++------
 .../org.junit.jupiter.api.extension.Extension      | 16 ++++++
 7 files changed, 143 insertions(+), 132 deletions(-)
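
At a glance, the migration applies the same mechanical pattern to every file:
test classes and methods drop the public modifier, org.junit.Test becomes
org.junit.jupiter.api.Test, and JUnit 4/Hamcrest assertions are rewritten as
AssertJ assertions. A minimal sketch of the target style (hypothetical class
name, not part of this commit):

    import org.junit.jupiter.api.Test;

    import static org.assertj.core.api.Assertions.assertThat;

    class ExampleMigratedTest {

        @Test
        void testAddition() {
            // JUnit 5 classes and methods can be package-private; AssertJ
            // reads actual-first: assertThat(actual).isEqualTo(expected).
            assertThat(1 + 1).isEqualTo(2);
        }
    }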

diff --git a/flink-formats/flink-avro-confluent-registry/src/test/java/org/apache/flink/formats/avro/registry/confluent/CachedSchemaCoderProviderTest.java b/flink-formats/flink-avro-confluent-registry/src/test/java/org/apache/flink/formats/avro/registry/confluent/CachedSchemaCoderProviderTest.java
index db877d8..cddbf13 100644
--- a/flink-formats/flink-avro-confluent-registry/src/test/java/org/apache/flink/formats/avro/registry/confluent/CachedSchemaCoderProviderTest.java
+++ b/flink-formats/flink-avro-confluent-registry/src/test/java/org/apache/flink/formats/avro/registry/confluent/CachedSchemaCoderProviderTest.java
@@ -22,7 +22,7 @@ import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
 import io.confluent.kafka.schemaregistry.client.rest.RestService;
 import io.confluent.kafka.schemaregistry.client.security.basicauth.BasicAuthCredentialProvider;
 import io.confluent.kafka.schemaregistry.client.security.bearerauth.BearerAuthCredentialProvider;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 import org.powermock.reflect.Whitebox;
 
 import javax.net.ssl.SSLSocketFactory;
@@ -31,26 +31,24 @@ import java.net.URISyntaxException;
 import java.util.HashMap;
 import java.util.Map;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
+import static org.assertj.core.api.Assertions.assertThat;
 
 /**
  * Tests for properties set by {@link RegistryAvroFormatFactory} in {@link
  * CachedSchemaCoderProvider}.
  */
-public class CachedSchemaCoderProviderTest {
+class CachedSchemaCoderProviderTest {
 
     @Test
-    public void testThatSslIsNotInitializedForNoSslProperties() {
+    void testThatSslIsNotInitializedForNoSslProperties() {
         CachedSchemaCoderProvider provider = initCachedSchemaCoderProvider(new HashMap<>());
         SSLSocketFactory sslSocketFactory = getSslSocketFactoryFromProvider(provider);
 
-        assertNull(sslSocketFactory);
+        assertThat(sslSocketFactory).isNull();
     }
 
     @Test
-    public void testThatSslIsInitializedForSslProperties() throws URISyntaxException {
+    void testThatSslIsInitializedForSslProperties() throws URISyntaxException {
         String keystoreFile = getAbsolutePath("/test-keystore.jks");
         String keystorePassword = "123456";
         Map<String, String> configs = new HashMap<>();
@@ -62,20 +60,20 @@ public class CachedSchemaCoderProviderTest {
         CachedSchemaCoderProvider provider = initCachedSchemaCoderProvider(configs);
         SSLSocketFactory sslSocketFactory = getSslSocketFactoryFromProvider(provider);
 
-        assertNotNull(sslSocketFactory);
+        assertThat(sslSocketFactory).isNotNull();
     }
 
     @Test
-    public void testThatBasicAuthIsNotInitializedForNoBasicAuthProperties() {
+    void testThatBasicAuthIsNotInitializedForNoBasicAuthProperties() {
         CachedSchemaCoderProvider provider = initCachedSchemaCoderProvider(new HashMap<>());
         BasicAuthCredentialProvider basicAuthCredentialProvider =
                 getBasicAuthFromProvider(provider);
 
-        assertNull(basicAuthCredentialProvider);
+        assertThat(basicAuthCredentialProvider).isNull();
     }
 
     @Test
-    public void testThatBasicAuthIsInitializedForBasicAuthProperties() {
+    void testThatBasicAuthIsInitializedForBasicAuthProperties() {
         String userPassword = "user:pwd";
         Map<String, String> configs = new HashMap<>();
         configs.put("basic.auth.credentials.source", "USER_INFO");
@@ -85,21 +83,21 @@ public class CachedSchemaCoderProviderTest {
         BasicAuthCredentialProvider basicAuthCredentialProvider =
                 getBasicAuthFromProvider(provider);
 
-        assertNotNull(basicAuthCredentialProvider);
-        assertEquals(basicAuthCredentialProvider.getUserInfo(null), userPassword);
+        assertThat(basicAuthCredentialProvider).isNotNull();
+        assertThat(basicAuthCredentialProvider.getUserInfo(null)).isEqualTo(userPassword);
     }
 
     @Test
-    public void testThatBearerAuthIsNotInitializedForNoBearerAuthProperties() {
+    void testThatBearerAuthIsNotInitializedForNoBearerAuthProperties() {
         CachedSchemaCoderProvider provider = initCachedSchemaCoderProvider(new HashMap<>());
         BearerAuthCredentialProvider bearerAuthCredentialProvider =
                 getBearerAuthFromProvider(provider);
 
-        assertNull(bearerAuthCredentialProvider);
+        assertThat(bearerAuthCredentialProvider).isNull();
     }
 
     @Test
-    public void testThatBearerAuthIsInitializedForBearerAuthProperties() {
+    void testThatBearerAuthIsInitializedForBearerAuthProperties() {
         String token = "123456";
         Map<String, String> configs = new HashMap<>();
         configs.put("bearer.auth.credentials.source", "STATIC_TOKEN");
@@ -109,8 +107,8 @@ public class CachedSchemaCoderProviderTest {
         BearerAuthCredentialProvider bearerAuthCredentialProvider =
                 getBearerAuthFromProvider(provider);
 
-        assertNotNull(bearerAuthCredentialProvider);
-        assertEquals(bearerAuthCredentialProvider.getBearerToken(null), token);
+        assertThat(bearerAuthCredentialProvider).isNotNull();
+        assertThat(bearerAuthCredentialProvider.getBearerToken(null)).isEqualTo(token);
     }
 
     private String getAbsolutePath(String path) throws URISyntaxException {
diff --git a/flink-formats/flink-avro-confluent-registry/src/test/java/org/apache/flink/formats/avro/registry/confluent/ConfluentSchemaRegistryCoderTest.java b/flink-formats/flink-avro-confluent-registry/src/test/java/org/apache/flink/formats/avro/registry/confluent/ConfluentSchemaRegistryCoderTest.java
index 009ef02..2c93348 100644
--- a/flink-formats/flink-avro-confluent-registry/src/test/java/org/apache/flink/formats/avro/registry/confluent/ConfluentSchemaRegistryCoderTest.java
+++ b/flink-formats/flink-avro-confluent-registry/src/test/java/org/apache/flink/formats/avro/registry/confluent/ConfluentSchemaRegistryCoderTest.java
@@ -21,20 +21,21 @@ package org.apache.flink.formats.avro.registry.confluent;
 import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient;
 import org.apache.avro.Schema;
 import org.apache.avro.SchemaBuilder;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
 
-import static org.junit.Assert.assertEquals;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** Tests for {@link ConfluentSchemaRegistryCoder}. */
-public class ConfluentSchemaRegistryCoderTest {
+class ConfluentSchemaRegistryCoderTest {
 
     @Test
-    public void testSpecificRecordWithConfluentSchemaRegistry() throws Exception {
+    void testSpecificRecordWithConfluentSchemaRegistry() throws Exception {
         MockSchemaRegistryClient client = new MockSchemaRegistryClient();
 
         Schema schema =
@@ -51,12 +52,12 @@ public class ConfluentSchemaRegistryCoderTest {
         ByteArrayInputStream byteInStream = new ByteArrayInputStream(byteOutStream.toByteArray());
         Schema readSchema = registryCoder.readSchema(byteInStream);
 
-        assertEquals(schema, readSchema);
-        assertEquals(0, byteInStream.available());
+        assertThat(readSchema).isEqualTo(schema);
+        assertThat(byteInStream).isEmpty();
     }
 
-    @Test(expected = IOException.class)
-    public void testMagicByteVerification() throws Exception {
+    @Test
+    void testMagicByteVerification() throws Exception {
         MockSchemaRegistryClient client = new MockSchemaRegistryClient();
         int schemaId = client.register("testTopic", Schema.create(Schema.Type.BOOLEAN));
 
@@ -67,9 +68,10 @@ public class ConfluentSchemaRegistryCoderTest {
         dataOutputStream.writeInt(schemaId);
         dataOutputStream.flush();
 
-        ByteArrayInputStream byteInStream = new ByteArrayInputStream(byteOutStream.toByteArray());
-        coder.readSchema(byteInStream);
-
-        // exception is thrown
+        try (ByteArrayInputStream byteInStream =
+                new ByteArrayInputStream(byteOutStream.toByteArray())) {
+            assertThatThrownBy(() -> coder.readSchema(byteInStream))
+                    .isInstanceOf(IOException.class);
+        }
     }
 }
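
The testMagicByteVerification change above shows the exception-testing idiom:
JUnit 4's @Test(expected = ...) passes if the exception escapes from anywhere
in the method, while assertThatThrownBy pins it to a single call and allows
further chained checks. A self-contained sketch (hypothetical test, not from
this commit):

    import java.io.IOException;

    import org.junit.jupiter.api.Test;

    import static org.assertj.core.api.Assertions.assertThatThrownBy;

    class ThrownByStyleTest {

        @Test
        void exceptionIsPinnedToOneStatement() {
            // The lambda may throw checked exceptions; no throws clause needed.
            assertThatThrownBy(() -> readMagicByte(new byte[] {1}))
                    .isInstanceOf(IOException.class)
                    .hasMessageContaining("magic byte");
        }

        private static byte readMagicByte(byte[] bytes) throws IOException {
            if (bytes.length == 0 || bytes[0] != 0) {
                throw new IOException("Unexpected magic byte");
            }
            return bytes[0];
        }
    }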
diff --git a/flink-formats/flink-avro-confluent-registry/src/test/java/org/apache/flink/formats/avro/registry/confluent/RegistryAvroFormatFactoryTest.java b/flink-formats/flink-avro-confluent-registry/src/test/java/org/apache/flink/formats/avro/registry/confluent/RegistryAvroFormatFactoryTest.java
index 7c5b426..cf80adf 100644
--- a/flink-formats/flink-avro-confluent-registry/src/test/java/org/apache/flink/formats/avro/registry/confluent/RegistryAvroFormatFactoryTest.java
+++ b/flink-formats/flink-avro-confluent-registry/src/test/java/org/apache/flink/formats/avro/registry/confluent/RegistryAvroFormatFactoryTest.java
@@ -37,23 +37,20 @@ import org.apache.flink.table.runtime.connector.source.ScanRuntimeProviderContex
 import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
 import org.apache.flink.table.types.logical.RowType;
 
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
+import org.junit.jupiter.api.Test;
 
 import java.util.HashMap;
 import java.util.Map;
 import java.util.function.Consumer;
 
-import static org.apache.flink.core.testutils.FlinkMatchers.containsCause;
+import static org.apache.flink.core.testutils.FlinkAssertions.anyCauseMatches;
 import static org.apache.flink.table.factories.utils.FactoryMocks.createTableSink;
 import static org.apache.flink.table.factories.utils.FactoryMocks.createTableSource;
-import static org.hamcrest.CoreMatchers.instanceOf;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertThat;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** Tests for the {@link RegistryAvroFormatFactory}. */
-public class RegistryAvroFormatFactoryTest {
+class RegistryAvroFormatFactoryTest {
 
     private static final ResolvedSchema SCHEMA =
             ResolvedSchema.of(
@@ -81,10 +78,8 @@ public class RegistryAvroFormatFactoryTest {
         EXPECTED_OPTIONAL_PROPERTIES.put("bearer.auth.token", "CUSTOM");
     }
 
-    @Rule public ExpectedException thrown = ExpectedException.none();
-
     @Test
-    public void testDeserializationSchema() {
+    void testDeserializationSchema() {
         final AvroRowDataDeserializationSchema expectedDeser =
                 new AvroRowDataDeserializationSchema(
                         ConfluentRegistryAvroDeserializationSchema.forGeneric(
@@ -93,7 +88,7 @@ public class RegistryAvroFormatFactoryTest {
                         InternalTypeInfo.of(ROW_TYPE));
 
         final DynamicTableSource actualSource = createTableSource(SCHEMA, getDefaultOptions());
-        assertThat(actualSource, instanceOf(TestDynamicTableFactory.DynamicTableSourceMock.class));
+        assertThat(actualSource).isInstanceOf(TestDynamicTableFactory.DynamicTableSourceMock.class);
         TestDynamicTableFactory.DynamicTableSourceMock scanSourceMock =
                 (TestDynamicTableFactory.DynamicTableSourceMock) actualSource;
 
@@ -101,11 +96,11 @@ public class RegistryAvroFormatFactoryTest {
                 scanSourceMock.valueFormat.createRuntimeDecoder(
                         ScanRuntimeProviderContext.INSTANCE, SCHEMA.toPhysicalRowDataType());
 
-        assertEquals(expectedDeser, actualDeser);
+        assertThat(actualDeser).isEqualTo(expectedDeser);
     }
 
     @Test
-    public void testSerializationSchema() {
+    void testSerializationSchema() {
         final AvroRowDataSerializationSchema expectedSer =
                 new AvroRowDataSerializationSchema(
                         ROW_TYPE,
@@ -116,32 +111,31 @@ public class RegistryAvroFormatFactoryTest {
                         RowDataToAvroConverters.createConverter(ROW_TYPE));
 
         final DynamicTableSink actualSink = createTableSink(SCHEMA, getDefaultOptions());
-        assertThat(actualSink, instanceOf(TestDynamicTableFactory.DynamicTableSinkMock.class));
+        assertThat(actualSink).isInstanceOf(TestDynamicTableFactory.DynamicTableSinkMock.class);
         TestDynamicTableFactory.DynamicTableSinkMock sinkMock =
                 (TestDynamicTableFactory.DynamicTableSinkMock) actualSink;
 
         SerializationSchema<RowData> actualSer =
                 sinkMock.valueFormat.createRuntimeEncoder(null, SCHEMA.toPhysicalRowDataType());
 
-        assertEquals(expectedSer, actualSer);
+        assertThat(actualSer).isEqualTo(expectedSer);
     }
 
     @Test
-    public void testMissingSubjectForSink() {
-        thrown.expect(ValidationException.class);
-        thrown.expect(
-                containsCause(
-                        new ValidationException(
-                                "Option avro-confluent.subject is required for serialization")));
-
+    void testMissingSubjectForSink() {
         final Map<String, String> options =
                 getModifiedOptions(opts -> opts.remove("avro-confluent.subject"));
 
-        createTableSink(SCHEMA, options);
+        assertThatThrownBy(() -> createTableSink(SCHEMA, options))
+                .isInstanceOf(ValidationException.class)
+                .satisfies(
+                        anyCauseMatches(
+                                ValidationException.class,
+                                "Option avro-confluent.subject is required for serialization"));
     }
 
     @Test
-    public void testDeserializationSchemaWithOptionalProperties() {
+    void testDeserializationSchemaWithOptionalProperties() {
         final AvroRowDataDeserializationSchema expectedDeser =
                 new AvroRowDataDeserializationSchema(
                         ConfluentRegistryAvroDeserializationSchema.forGeneric(
@@ -152,7 +146,7 @@ public class RegistryAvroFormatFactoryTest {
                         InternalTypeInfo.of(ROW_TYPE));
 
         final DynamicTableSource actualSource = createTableSource(SCHEMA, getOptionalProperties());
-        assertThat(actualSource, instanceOf(TestDynamicTableFactory.DynamicTableSourceMock.class));
+        assertThat(actualSource).isInstanceOf(TestDynamicTableFactory.DynamicTableSourceMock.class);
         TestDynamicTableFactory.DynamicTableSourceMock scanSourceMock =
                 (TestDynamicTableFactory.DynamicTableSourceMock) actualSource;
 
@@ -160,11 +154,11 @@ public class RegistryAvroFormatFactoryTest {
                 scanSourceMock.valueFormat.createRuntimeDecoder(
                         ScanRuntimeProviderContext.INSTANCE, SCHEMA.toPhysicalRowDataType());
 
-        assertEquals(expectedDeser, actualDeser);
+        assertThat(actualDeser).isEqualTo(expectedDeser);
     }
 
     @Test
-    public void testSerializationSchemaWithOptionalProperties() {
+    void testSerializationSchemaWithOptionalProperties() {
         final AvroRowDataSerializationSchema expectedSer =
                 new AvroRowDataSerializationSchema(
                         ROW_TYPE,
@@ -176,14 +170,14 @@ public class RegistryAvroFormatFactoryTest {
                         RowDataToAvroConverters.createConverter(ROW_TYPE));
 
         final DynamicTableSink actualSink = createTableSink(SCHEMA, getOptionalProperties());
-        assertThat(actualSink, instanceOf(TestDynamicTableFactory.DynamicTableSinkMock.class));
+        assertThat(actualSink).isInstanceOf(TestDynamicTableFactory.DynamicTableSinkMock.class);
         TestDynamicTableFactory.DynamicTableSinkMock sinkMock =
                 (TestDynamicTableFactory.DynamicTableSinkMock) actualSink;
 
         SerializationSchema<RowData> actualSer =
                 sinkMock.valueFormat.createRuntimeEncoder(null, SCHEMA.toPhysicalRowDataType());
 
-        assertEquals(expectedSer, actualSer);
+        assertThat(actualSer).isEqualTo(expectedSer);
     }
 
     // ------------------------------------------------------------------------
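
testMissingSubjectForSink above relies on FlinkAssertions.anyCauseMatches,
which scans the whole cause chain for a throwable of the given type whose
message contains the given string. Plain AssertJ can express a similar check
without Flink's test utilities; a sketch with hypothetical messages:

    import org.junit.jupiter.api.Test;

    import static org.assertj.core.api.Assertions.assertThatThrownBy;

    class CauseChainSketchTest {

        @Test
        void validationErrorSurfacesInCauseChain() {
            assertThatThrownBy(CauseChainSketchTest::failDeepInside)
                    .isInstanceOf(RuntimeException.class)
                    // hasRootCause* only inspects the deepest cause, whereas
                    // anyCauseMatches accepts a match anywhere in the chain.
                    .hasRootCauseInstanceOf(IllegalStateException.class)
                    .hasRootCauseMessage("Option some.option is required");
        }

        private static void failDeepInside() {
            throw new RuntimeException(
                    "factory lookup failed",
                    new IllegalStateException("Option some.option is required"));
        }
    }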
diff --git a/flink-formats/flink-avro-confluent-registry/src/test/java/org/apache/flink/formats/avro/registry/confluent/RegistryAvroRowDataSeDeSchemaTest.java b/flink-formats/flink-avro-confluent-registry/src/test/java/org/apache/flink/formats/avro/registry/confluent/RegistryAvroRowDataSeDeSchemaTest.java
index 0061e8b..a5046ac 100644
--- a/flink-formats/flink-avro-confluent-registry/src/test/java/org/apache/flink/formats/avro/registry/confluent/RegistryAvroRowDataSeDeSchemaTest.java
+++ b/flink-formats/flink-avro-confluent-registry/src/test/java/org/apache/flink/formats/avro/registry/confluent/RegistryAvroRowDataSeDeSchemaTest.java
@@ -39,29 +39,23 @@ import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
 import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
 
 import java.io.IOException;
 import java.util.Random;
 
-import static org.apache.flink.core.testutils.FlinkMatchers.containsCause;
 import static org.apache.flink.formats.avro.utils.AvroTestUtils.writeRecord;
-import static org.hamcrest.CoreMatchers.equalTo;
-import static org.hamcrest.core.Is.is;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertThat;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /**
  * Tests for {@link AvroRowDataDeserializationSchema} and {@link AvroRowDataSerializationSchema} for
  * schema registry avro.
  */
-public class RegistryAvroRowDataSeDeSchemaTest {
+class RegistryAvroRowDataSeDeSchemaTest {
     private static final Schema ADDRESS_SCHEMA = Address.getClassSchema();
 
     private static final Schema ADDRESS_SCHEMA_COMPATIBLE =
@@ -83,45 +77,43 @@ public class RegistryAvroRowDataSeDeSchemaTest {
 
     private Address address;
 
-    @Rule public ExpectedException expectedEx = ExpectedException.none();
-
-    @BeforeClass
-    public static void beforeClass() {
+    @BeforeAll
+    static void beforeClass() {
         client = new MockSchemaRegistryClient();
     }
 
-    @Before
-    public void before() {
+    @BeforeEach
+    void before() {
         this.address = TestDataGenerator.generateRandomAddress(new Random());
     }
 
-    @After
-    public void after() throws IOException, RestClientException {
+    @AfterEach
+    void after() throws IOException, RestClientException {
         client.deleteSubject(SUBJECT);
     }
 
     @Test
-    public void testRowDataWriteReadWithFullSchema() throws Exception {
+    void testRowDataWriteReadWithFullSchema() throws Exception {
         testRowDataWriteReadWithSchema(ADDRESS_SCHEMA);
     }
 
     @Test
-    public void testRowDataWriteReadWithCompatibleSchema() throws Exception {
+    void testRowDataWriteReadWithCompatibleSchema() throws Exception {
         testRowDataWriteReadWithSchema(ADDRESS_SCHEMA_COMPATIBLE);
         // Validates new schema has been registered.
-        assertThat(client.getAllVersions(SUBJECT).size(), is(1));
+        assertThat(client.getAllVersions(SUBJECT)).hasSize(1);
     }
 
     @Test
-    public void testRowDataWriteReadWithPreRegisteredSchema() throws Exception {
+    void testRowDataWriteReadWithPreRegisteredSchema() throws Exception {
         client.register(SUBJECT, ADDRESS_SCHEMA);
         testRowDataWriteReadWithSchema(ADDRESS_SCHEMA);
         // Validates it does not produce new schema.
-        assertThat(client.getAllVersions(SUBJECT).size(), is(1));
+        assertThat(client.getAllVersions(SUBJECT)).hasSize(1);
     }
 
     @Test
-    public void testRowDataReadWithNonRegistryAvro() throws Exception {
+    void testRowDataReadWithNonRegistryAvro() throws Exception {
         DataType dataType = AvroSchemaConverter.convertToDataType(ADDRESS_SCHEMA.toString());
         RowType rowType = (RowType) dataType.getLogicalType();
 
@@ -132,10 +124,9 @@ public class RegistryAvroRowDataSeDeSchemaTest {
 
         client.register(SUBJECT, ADDRESS_SCHEMA);
         byte[] oriBytes = writeRecord(address, ADDRESS_SCHEMA);
-        expectedEx.expect(IOException.class);
-        expectedEx.expect(
-                containsCause(new IOException("Unknown data format. Magic number does not match")));
-        deserializer.deserialize(oriBytes);
+        assertThatThrownBy(() -> deserializer.deserialize(oriBytes))
+                .isInstanceOf(IOException.class)
+                .hasCause(new IOException("Unknown data format. Magic number does not match"));
     }
 
     private void testRowDataWriteReadWithSchema(Schema schema) throws Exception {
@@ -150,18 +141,18 @@ public class RegistryAvroRowDataSeDeSchemaTest {
         serializer.open(null);
         deserializer.open(null);
 
-        assertNull(deserializer.deserialize(null));
+        assertThat(deserializer.deserialize(null)).isNull();
 
         RowData oriData = address2RowData(address);
         byte[] serialized = serializer.serialize(oriData);
         RowData rowData = deserializer.deserialize(serialized);
-        assertThat(rowData.getArity(), equalTo(schema.getFields().size()));
-        assertEquals(address.getNum(), rowData.getInt(0));
-        assertEquals(address.getStreet(), rowData.getString(1).toString());
+        assertThat(rowData.getArity()).isEqualTo(schema.getFields().size());
+        assertThat(rowData.getInt(0)).isEqualTo(address.getNum());
+        assertThat(rowData.getString(1).toString()).isEqualTo(address.getStreet());
         if (schema != ADDRESS_SCHEMA_COMPATIBLE) {
-            assertEquals(address.getCity(), rowData.getString(2).toString());
-            assertEquals(address.getState(), rowData.getString(3).toString());
-            assertEquals(address.getZip(), rowData.getString(4).toString());
+            assertThat(rowData.getString(2).toString()).isEqualTo(address.getCity());
+            assertThat(rowData.getString(3).toString()).isEqualTo(address.getState());
+            assertThat(rowData.getString(4).toString()).isEqualTo(address.getZip());
         }
     }
 
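
The lifecycle annotations in this file map one-to-one: @BeforeClass/@AfterClass
become @BeforeAll/@AfterAll (still static), and @Before/@After become
@BeforeEach/@AfterEach. A compact sketch of the JUnit 5 lifecycle (hypothetical
class, not from this commit):

    import org.junit.jupiter.api.AfterEach;
    import org.junit.jupiter.api.BeforeAll;
    import org.junit.jupiter.api.BeforeEach;
    import org.junit.jupiter.api.Test;

    class LifecycleSketchTest {

        @BeforeAll
        static void beforeClass() {
            // Runs once before all tests; must be static in the default
            // per-method test lifecycle.
        }

        @BeforeEach
        void before() {
            // Runs before every @Test.
        }

        @AfterEach
        void after() {
            // Runs after every @Test.
        }

        @Test
        void example() {}
    }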
diff --git a/flink-formats/flink-avro-confluent-registry/src/test/java/org/apache/flink/formats/avro/registry/confluent/debezium/DebeziumAvroFormatFactoryTest.java b/flink-formats/flink-avro-confluent-registry/src/test/java/org/apache/flink/formats/avro/registry/confluent/debezium/DebeziumAvroFormatFactoryTest.java
index eb91ccb..50d270b 100644
--- a/flink-formats/flink-avro-confluent-registry/src/test/java/org/apache/flink/formats/avro/registry/confluent/debezium/DebeziumAvroFormatFactoryTest.java
+++ b/flink-formats/flink-avro-confluent-registry/src/test/java/org/apache/flink/formats/avro/registry/confluent/debezium/DebeziumAvroFormatFactoryTest.java
@@ -31,25 +31,18 @@ import org.apache.flink.table.runtime.connector.sink.SinkRuntimeProviderContext;
 import org.apache.flink.table.runtime.connector.source.ScanRuntimeProviderContext;
 import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
 import org.apache.flink.table.types.logical.RowType;
-import org.apache.flink.util.TestLogger;
 
-import org.junit.Assert;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
+import org.junit.jupiter.api.Test;
 
 import java.util.HashMap;
 import java.util.Map;
 
-import static junit.framework.TestCase.assertEquals;
 import static org.apache.flink.table.factories.utils.FactoryMocks.createTableSink;
 import static org.apache.flink.table.factories.utils.FactoryMocks.createTableSource;
-import static org.hamcrest.CoreMatchers.instanceOf;
-import static org.junit.Assert.assertThat;
+import static org.assertj.core.api.Assertions.assertThat;
 
 /** Tests for {@link DebeziumAvroFormatFactory}. */
-public class DebeziumAvroFormatFactoryTest extends TestLogger {
-    @Rule public ExpectedException thrown = ExpectedException.none();
+class DebeziumAvroFormatFactoryTest {
 
     private static final ResolvedSchema SCHEMA =
             ResolvedSchema.of(
@@ -64,7 +57,7 @@ public class DebeziumAvroFormatFactoryTest extends TestLogger {
     private static final String REGISTRY_URL = "http://localhost:8081";
 
     @Test
-    public void testSeDeSchema() {
+    void testSeDeSchema() {
         final Map<String, String> options = getAllOptions();
 
         final Map<String, String> registryConfigs = new HashMap<>();
@@ -75,13 +68,13 @@ public class DebeziumAvroFormatFactoryTest extends TestLogger {
                 new DebeziumAvroDeserializationSchema(
                         ROW_TYPE, InternalTypeInfo.of(ROW_TYPE), REGISTRY_URL, registryConfigs);
         DeserializationSchema<RowData> actualDeser = createDeserializationSchema(options);
-        assertEquals(expectedDeser, actualDeser);
+        assertThat(actualDeser).isEqualTo(expectedDeser);
 
         DebeziumAvroSerializationSchema expectedSer =
                 new DebeziumAvroSerializationSchema(
                         ROW_TYPE, REGISTRY_URL, SUBJECT, registryConfigs);
         SerializationSchema<RowData> actualSer = createSerializationSchema(options);
-        Assert.assertEquals(expectedSer, actualSer);
+        assertThat(actualSer).isEqualTo(expectedSer);
     }
 
     private Map<String, String> getAllOptions() {
@@ -101,7 +94,7 @@ public class DebeziumAvroFormatFactoryTest extends TestLogger {
     private static DeserializationSchema<RowData> createDeserializationSchema(
             Map<String, String> options) {
         final DynamicTableSource actualSource = createTableSource(SCHEMA, options);
-        assertThat(actualSource, instanceOf(TestDynamicTableFactory.DynamicTableSourceMock.class));
+        assertThat(actualSource).isInstanceOf(TestDynamicTableFactory.DynamicTableSourceMock.class);
         TestDynamicTableFactory.DynamicTableSourceMock scanSourceMock =
                 (TestDynamicTableFactory.DynamicTableSourceMock) actualSource;
 
@@ -112,7 +105,7 @@ public class DebeziumAvroFormatFactoryTest extends TestLogger {
     private static SerializationSchema<RowData> createSerializationSchema(
             Map<String, String> options) {
         final DynamicTableSink actualSink = createTableSink(SCHEMA, options);
-        assertThat(actualSink, instanceOf(TestDynamicTableFactory.DynamicTableSinkMock.class));
+        assertThat(actualSink).isInstanceOf(TestDynamicTableFactory.DynamicTableSinkMock.class);
         TestDynamicTableFactory.DynamicTableSinkMock sinkMock =
                 (TestDynamicTableFactory.DynamicTableSinkMock) actualSink;
 
diff --git a/flink-formats/flink-avro-confluent-registry/src/test/java/org/apache/flink/formats/avro/registry/confluent/debezium/DebeziumAvroSerDeSchemaTest.java b/flink-formats/flink-avro-confluent-registry/src/test/java/org/apache/flink/formats/avro/registry/confluent/debezium/DebeziumAvroSerDeSchemaTest.java
index a113038..2640b50 100644
--- a/flink-formats/flink-avro-confluent-registry/src/test/java/org/apache/flink/formats/avro/registry/confluent/debezium/DebeziumAvroSerDeSchemaTest.java
+++ b/flink-formats/flink-avro-confluent-registry/src/test/java/org/apache/flink/formats/avro/registry/confluent/debezium/DebeziumAvroSerDeSchemaTest.java
@@ -28,6 +28,8 @@ import org.apache.flink.formats.avro.RegistryAvroSerializationSchema;
 import org.apache.flink.formats.avro.RowDataToAvroConverters;
 import org.apache.flink.formats.avro.registry.confluent.ConfluentSchemaRegistryCoder;
 import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.StringData;
@@ -35,12 +37,14 @@ import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.FileUtils;
+import org.apache.flink.util.SimpleUserCodeClassLoader;
+import org.apache.flink.util.UserCodeClassLoader;
 
 import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient;
 import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 
 import java.io.File;
 import java.io.IOException;
@@ -58,11 +62,10 @@ import static org.apache.flink.table.api.DataTypes.FIELD;
 import static org.apache.flink.table.api.DataTypes.ROW;
 import static org.apache.flink.table.api.DataTypes.STRING;
 import static org.apache.flink.table.types.utils.TypeConversions.fromLogicalToDataType;
-import static org.junit.Assert.assertEquals;
-import static org.mockito.Mockito.mock;
+import static org.assertj.core.api.Assertions.assertThat;
 
 /** Tests for {@link DebeziumAvroDeserializationSchema}. */
-public class DebeziumAvroSerDeSchemaTest {
+class DebeziumAvroSerDeSchemaTest {
 
     private static final String SUBJECT = "testDebeziumAvro";
 
@@ -78,10 +81,10 @@ public class DebeziumAvroSerDeSchemaTest {
     private static final Schema DEBEZIUM_SCHEMA_COMPATIBLE_TEST =
             new Schema.Parser().parse(new String(readBytesFromFile("debezium-test-schema.json")));
 
-    private SchemaRegistryClient client = new MockSchemaRegistryClient();
+    private final SchemaRegistryClient client = new MockSchemaRegistryClient();
 
     @Test
-    public void testSerializationDeserialization() throws Exception {
+    void testSerializationDeserialization() throws Exception {
 
         RowType rowTypeDe =
                 DebeziumAvroDeserializationSchema.createDebeziumAvroRowType(
@@ -92,7 +95,7 @@ public class DebeziumAvroSerDeSchemaTest {
 
         DebeziumAvroSerializationSchema dbzSerializer =
                 new DebeziumAvroSerializationSchema(getSerializationSchema(rowTypeSe));
-        dbzSerializer.open(mock(SerializationSchema.InitializationContext.class));
+        dbzSerializer.open(new MockInitializationContext());
 
         byte[] serialize = dbzSerializer.serialize(debeziumRow2RowData());
 
@@ -100,7 +103,7 @@ public class DebeziumAvroSerDeSchemaTest {
         DebeziumAvroDeserializationSchema dbzDeserializer =
                 new DebeziumAvroDeserializationSchema(
                         InternalTypeInfo.of(rowType), getDeserializationSchema(rowTypeDe));
-        dbzDeserializer.open(mock(DeserializationSchema.InitializationContext.class));
+        dbzDeserializer.open(new MockInitializationContext());
 
         SimpleCollector collector = new SimpleCollector();
         dbzDeserializer.deserialize(serialize, collector);
@@ -110,37 +113,37 @@ public class DebeziumAvroSerDeSchemaTest {
 
         List<String> expected =
                 Collections.singletonList("+I(107,rocks,box of assorted rocks,5.3)");
-        assertEquals(expected, actual);
+        assertThat(actual).isEqualTo(expected);
     }
 
     @Test
-    public void testInsertDataDeserialization() throws Exception {
+    void testInsertDataDeserialization() throws Exception {
         List<String> actual = testDeserialization("debezium-avro-insert.avro");
 
         List<String> expected =
                 Collections.singletonList("+I(1,lisi,test debezium avro data,21.799999237060547)");
-        assertEquals(expected, actual);
+        assertThat(actual).isEqualTo(expected);
     }
 
     @Test
-    public void testUpdateDataDeserialization() throws Exception {
+    void testUpdateDataDeserialization() throws Exception {
         List<String> actual = testDeserialization("debezium-avro-update.avro");
 
         List<String> expected =
                 Arrays.asList(
                         "-U(1,lisi,test debezium avro data,21.799999237060547)",
                         "+U(1,zhangsan,test debezium avro data,21.799999237060547)");
-        assertEquals(expected, actual);
+        assertThat(actual).isEqualTo(expected);
     }
 
     @Test
-    public void testDeleteDataDeserialization() throws Exception {
+    void testDeleteDataDeserialization() throws Exception {
         List<String> actual = testDeserialization("debezium-avro-delete.avro");
 
         List<String> expected =
                 Collections.singletonList(
                         "-D(1,zhangsan,test debezium avro data,21.799999237060547)");
-        assertEquals(expected, actual);
+        assertThat(actual).isEqualTo(expected);
     }
 
     public List<String> testDeserialization(String dataPath) throws Exception {
@@ -153,7 +156,7 @@ public class DebeziumAvroSerDeSchemaTest {
         DebeziumAvroDeserializationSchema dbzDeserializer =
                 new DebeziumAvroDeserializationSchema(
                         InternalTypeInfo.of(rowType), getDeserializationSchema(rowTypeDe));
-        dbzDeserializer.open(mock(DeserializationSchema.InitializationContext.class));
+        dbzDeserializer.open(new MockInitializationContext());
 
         SimpleCollector collector = new SimpleCollector();
         dbzDeserializer.deserialize(readBytesFromFile(dataPath), collector);
@@ -200,7 +203,7 @@ public class DebeziumAvroSerDeSchemaTest {
     private static byte[] readBytesFromFile(String filePath) {
         try {
             URL url = DebeziumAvroSerDeSchemaTest.class.getClassLoader().getResource(filePath);
-            assert url != null;
+            assertThat(url).isNotNull();
             Path path = new File(url.getFile()).toPath();
             return FileUtils.readAllBytes(path);
         } catch (IOException e) {
@@ -210,7 +213,7 @@ public class DebeziumAvroSerDeSchemaTest {
 
     private static class SimpleCollector implements Collector<RowData> {
 
-        private List<RowData> list = new ArrayList<>();
+        private final List<RowData> list = new ArrayList<>();
 
         @Override
         public void collect(RowData record) {
@@ -222,4 +225,18 @@ public class DebeziumAvroSerDeSchemaTest {
             // do nothing
         }
     }
+
+    private static class MockInitializationContext
+            implements DeserializationSchema.InitializationContext,
+                    SerializationSchema.InitializationContext {
+        @Override
+        public MetricGroup getMetricGroup() {
+            return new UnregisteredMetricsGroup();
+        }
+
+        @Override
+        public UserCodeClassLoader getUserCodeClassLoader() {
+            return SimpleUserCodeClassLoader.create(getClass().getClassLoader());
+        }
+    }
 }
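
Replacing mock(...InitializationContext.class) with the hand-written
MockInitializationContext above is more than cosmetic: a default Mockito mock
returns null from getMetricGroup(), so a schema that touches metrics inside
open() would fail with a NullPointerException, while UnregisteredMetricsGroup
is a safe no-op. Because the class implements both InitializationContext
interfaces, one instance serves serializer and deserializer alike; usage stays
a one-liner, as in the hunks above:

    // One context object for both directions (excerpt from this commit):
    dbzSerializer.open(new MockInitializationContext());
    dbzDeserializer.open(new MockInitializationContext());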
diff --git a/flink-formats/flink-avro-confluent-registry/src/test/resources/META-INF/services/org.junit.jupiter.api.extension.Extension b/flink-formats/flink-avro-confluent-registry/src/test/resources/META-INF/services/org.junit.jupiter.api.extension.Extension
new file mode 100644
index 0000000..2899913
--- /dev/null
+++ b/flink-formats/flink-avro-confluent-registry/src/test/resources/META-INF/services/org.junit.jupiter.api.extension.Extension
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.flink.util.TestLoggerExtension
\ No newline at end of file
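
The new META-INF/services entry registers org.apache.flink.util.TestLoggerExtension
through the ServiceLoader mechanism, which is why DebeziumAvroFormatFactoryTest
no longer extends TestLogger: with JUnit Platform extension auto-detection
enabled (the junit.jupiter.extensions.autodetection.enabled flag), the extension
applies to every test class in the module. A sketch of the equivalent per-class
registration, if auto-detection were not in play (hypothetical class name):

    import org.apache.flink.util.TestLoggerExtension;

    import org.junit.jupiter.api.Test;
    import org.junit.jupiter.api.extension.ExtendWith;

    // Explicit per-class alternative to the ServiceLoader registration.
    @ExtendWith(TestLoggerExtension.class)
    class SomeLoggedTest {

        @Test
        void example() {}
    }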