You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2022/03/25 09:31:37 UTC
[flink] branch master updated: [FLINK-26736][tests] Migrate flink-avro-confluent-registry to JUnit5
This is an automated email from the ASF dual-hosted git repository.
chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new e3b123d [FLINK-26736][tests] Migrate flink-avro-confluent-registry to JUnit5
e3b123d is described below
commit e3b123d7d1e48e7adbb04fb3470d02bb76f5ff73
Author: Ryan Skraba <rs...@apache.org>
AuthorDate: Fri Mar 25 10:31:02 2022 +0100
[FLINK-26736][tests] Migrate flink-avro-confluent-registry to JUnit5
---
.../confluent/CachedSchemaCoderProviderTest.java | 36 ++++++------
.../ConfluentSchemaRegistryCoderTest.java | 26 +++++----
.../confluent/RegistryAvroFormatFactoryTest.java | 54 ++++++++---------
.../RegistryAvroRowDataSeDeSchemaTest.java | 67 ++++++++++------------
.../debezium/DebeziumAvroFormatFactoryTest.java | 23 +++-----
.../debezium/DebeziumAvroSerDeSchemaTest.java | 53 +++++++++++------
.../org.junit.jupiter.api.extension.Extension | 16 ++++++
7 files changed, 143 insertions(+), 132 deletions(-)
diff --git a/flink-formats/flink-avro-confluent-registry/src/test/java/org/apache/flink/formats/avro/registry/confluent/CachedSchemaCoderProviderTest.java b/flink-formats/flink-avro-confluent-registry/src/test/java/org/apache/flink/formats/avro/registry/confluent/CachedSchemaCoderProviderTest.java
index db877d8..cddbf13 100644
--- a/flink-formats/flink-avro-confluent-registry/src/test/java/org/apache/flink/formats/avro/registry/confluent/CachedSchemaCoderProviderTest.java
+++ b/flink-formats/flink-avro-confluent-registry/src/test/java/org/apache/flink/formats/avro/registry/confluent/CachedSchemaCoderProviderTest.java
@@ -22,7 +22,7 @@ import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.rest.RestService;
import io.confluent.kafka.schemaregistry.client.security.basicauth.BasicAuthCredentialProvider;
import io.confluent.kafka.schemaregistry.client.security.bearerauth.BearerAuthCredentialProvider;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
import org.powermock.reflect.Whitebox;
import javax.net.ssl.SSLSocketFactory;
@@ -31,26 +31,24 @@ import java.net.URISyntaxException;
import java.util.HashMap;
import java.util.Map;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
+import static org.assertj.core.api.Assertions.assertThat;
/**
* Tests for properties set by {@link RegistryAvroFormatFactory} in {@link
* CachedSchemaCoderProvider}.
*/
-public class CachedSchemaCoderProviderTest {
+class CachedSchemaCoderProviderTest {
@Test
- public void testThatSslIsNotInitializedForNoSslProperties() {
+ void testThatSslIsNotInitializedForNoSslProperties() {
CachedSchemaCoderProvider provider = initCachedSchemaCoderProvider(new HashMap<>());
SSLSocketFactory sslSocketFactory = getSslSocketFactoryFromProvider(provider);
- assertNull(sslSocketFactory);
+ assertThat(sslSocketFactory).isNull();
}
@Test
- public void testThatSslIsInitializedForSslProperties() throws URISyntaxException {
+ void testThatSslIsInitializedForSslProperties() throws URISyntaxException {
String keystoreFile = getAbsolutePath("/test-keystore.jks");
String keystorePassword = "123456";
Map<String, String> configs = new HashMap<>();
@@ -62,20 +60,20 @@ public class CachedSchemaCoderProviderTest {
CachedSchemaCoderProvider provider = initCachedSchemaCoderProvider(configs);
SSLSocketFactory sslSocketFactory = getSslSocketFactoryFromProvider(provider);
- assertNotNull(sslSocketFactory);
+ assertThat(sslSocketFactory).isNotNull();
}
@Test
- public void testThatBasicAuthIsNotInitializedForNoBasicAuthProperties() {
+ void testThatBasicAuthIsNotInitializedForNoBasicAuthProperties() {
CachedSchemaCoderProvider provider = initCachedSchemaCoderProvider(new HashMap<>());
BasicAuthCredentialProvider basicAuthCredentialProvider =
getBasicAuthFromProvider(provider);
- assertNull(basicAuthCredentialProvider);
+ assertThat(basicAuthCredentialProvider).isNull();
}
@Test
- public void testThatBasicAuthIsInitializedForBasicAuthProperties() {
+ void testThatBasicAuthIsInitializedForBasicAuthProperties() {
String userPassword = "user:pwd";
Map<String, String> configs = new HashMap<>();
configs.put("basic.auth.credentials.source", "USER_INFO");
@@ -85,21 +83,21 @@ public class CachedSchemaCoderProviderTest {
BasicAuthCredentialProvider basicAuthCredentialProvider =
getBasicAuthFromProvider(provider);
- assertNotNull(basicAuthCredentialProvider);
- assertEquals(basicAuthCredentialProvider.getUserInfo(null), userPassword);
+ assertThat(basicAuthCredentialProvider).isNotNull();
+ assertThat(basicAuthCredentialProvider.getUserInfo(null)).isEqualTo(userPassword);
}
@Test
- public void testThatBearerAuthIsNotInitializedForNoBearerAuthProperties() {
+ void testThatBearerAuthIsNotInitializedForNoBearerAuthProperties() {
CachedSchemaCoderProvider provider = initCachedSchemaCoderProvider(new HashMap<>());
BearerAuthCredentialProvider bearerAuthCredentialProvider =
getBearerAuthFromProvider(provider);
- assertNull(bearerAuthCredentialProvider);
+ assertThat(bearerAuthCredentialProvider).isNull();
}
@Test
- public void testThatBearerAuthIsInitializedForBearerAuthProperties() {
+ void testThatBearerAuthIsInitializedForBearerAuthProperties() {
String token = "123456";
Map<String, String> configs = new HashMap<>();
configs.put("bearer.auth.credentials.source", "STATIC_TOKEN");
@@ -109,8 +107,8 @@ public class CachedSchemaCoderProviderTest {
BearerAuthCredentialProvider bearerAuthCredentialProvider =
getBearerAuthFromProvider(provider);
- assertNotNull(bearerAuthCredentialProvider);
- assertEquals(bearerAuthCredentialProvider.getBearerToken(null), token);
+ assertThat(bearerAuthCredentialProvider).isNotNull();
+ assertThat(bearerAuthCredentialProvider.getBearerToken(null)).isEqualTo(token);
}
private String getAbsolutePath(String path) throws URISyntaxException {
diff --git a/flink-formats/flink-avro-confluent-registry/src/test/java/org/apache/flink/formats/avro/registry/confluent/ConfluentSchemaRegistryCoderTest.java b/flink-formats/flink-avro-confluent-registry/src/test/java/org/apache/flink/formats/avro/registry/confluent/ConfluentSchemaRegistryCoderTest.java
index 009ef02..2c93348 100644
--- a/flink-formats/flink-avro-confluent-registry/src/test/java/org/apache/flink/formats/avro/registry/confluent/ConfluentSchemaRegistryCoderTest.java
+++ b/flink-formats/flink-avro-confluent-registry/src/test/java/org/apache/flink/formats/avro/registry/confluent/ConfluentSchemaRegistryCoderTest.java
@@ -21,20 +21,21 @@ package org.apache.flink.formats.avro.registry.confluent;
import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
-import static org.junit.Assert.assertEquals;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
/** Tests for {@link ConfluentSchemaRegistryCoder}. */
-public class ConfluentSchemaRegistryCoderTest {
+class ConfluentSchemaRegistryCoderTest {
@Test
- public void testSpecificRecordWithConfluentSchemaRegistry() throws Exception {
+ void testSpecificRecordWithConfluentSchemaRegistry() throws Exception {
MockSchemaRegistryClient client = new MockSchemaRegistryClient();
Schema schema =
@@ -51,12 +52,12 @@ public class ConfluentSchemaRegistryCoderTest {
ByteArrayInputStream byteInStream = new ByteArrayInputStream(byteOutStream.toByteArray());
Schema readSchema = registryCoder.readSchema(byteInStream);
- assertEquals(schema, readSchema);
- assertEquals(0, byteInStream.available());
+ assertThat(readSchema).isEqualTo(schema);
+ assertThat(byteInStream).isEmpty();
}
- @Test(expected = IOException.class)
- public void testMagicByteVerification() throws Exception {
+ @Test
+ void testMagicByteVerification() throws Exception {
MockSchemaRegistryClient client = new MockSchemaRegistryClient();
int schemaId = client.register("testTopic", Schema.create(Schema.Type.BOOLEAN));
@@ -67,9 +68,10 @@ public class ConfluentSchemaRegistryCoderTest {
dataOutputStream.writeInt(schemaId);
dataOutputStream.flush();
- ByteArrayInputStream byteInStream = new ByteArrayInputStream(byteOutStream.toByteArray());
- coder.readSchema(byteInStream);
-
- // exception is thrown
+ try (ByteArrayInputStream byteInStream =
+ new ByteArrayInputStream(byteOutStream.toByteArray())) {
+ assertThatThrownBy(() -> coder.readSchema(byteInStream))
+ .isInstanceOf(IOException.class);
+ }
}
}
diff --git a/flink-formats/flink-avro-confluent-registry/src/test/java/org/apache/flink/formats/avro/registry/confluent/RegistryAvroFormatFactoryTest.java b/flink-formats/flink-avro-confluent-registry/src/test/java/org/apache/flink/formats/avro/registry/confluent/RegistryAvroFormatFactoryTest.java
index 7c5b426..cf80adf 100644
--- a/flink-formats/flink-avro-confluent-registry/src/test/java/org/apache/flink/formats/avro/registry/confluent/RegistryAvroFormatFactoryTest.java
+++ b/flink-formats/flink-avro-confluent-registry/src/test/java/org/apache/flink/formats/avro/registry/confluent/RegistryAvroFormatFactoryTest.java
@@ -37,23 +37,20 @@ import org.apache.flink.table.runtime.connector.source.ScanRuntimeProviderContex
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.logical.RowType;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
+import org.junit.jupiter.api.Test;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;
-import static org.apache.flink.core.testutils.FlinkMatchers.containsCause;
+import static org.apache.flink.core.testutils.FlinkAssertions.anyCauseMatches;
import static org.apache.flink.table.factories.utils.FactoryMocks.createTableSink;
import static org.apache.flink.table.factories.utils.FactoryMocks.createTableSource;
-import static org.hamcrest.CoreMatchers.instanceOf;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertThat;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
/** Tests for the {@link RegistryAvroFormatFactory}. */
-public class RegistryAvroFormatFactoryTest {
+class RegistryAvroFormatFactoryTest {
private static final ResolvedSchema SCHEMA =
ResolvedSchema.of(
@@ -81,10 +78,8 @@ public class RegistryAvroFormatFactoryTest {
EXPECTED_OPTIONAL_PROPERTIES.put("bearer.auth.token", "CUSTOM");
}
- @Rule public ExpectedException thrown = ExpectedException.none();
-
@Test
- public void testDeserializationSchema() {
+ void testDeserializationSchema() {
final AvroRowDataDeserializationSchema expectedDeser =
new AvroRowDataDeserializationSchema(
ConfluentRegistryAvroDeserializationSchema.forGeneric(
@@ -93,7 +88,7 @@ public class RegistryAvroFormatFactoryTest {
InternalTypeInfo.of(ROW_TYPE));
final DynamicTableSource actualSource = createTableSource(SCHEMA, getDefaultOptions());
- assertThat(actualSource, instanceOf(TestDynamicTableFactory.DynamicTableSourceMock.class));
+ assertThat(actualSource).isInstanceOf(TestDynamicTableFactory.DynamicTableSourceMock.class);
TestDynamicTableFactory.DynamicTableSourceMock scanSourceMock =
(TestDynamicTableFactory.DynamicTableSourceMock) actualSource;
@@ -101,11 +96,11 @@ public class RegistryAvroFormatFactoryTest {
scanSourceMock.valueFormat.createRuntimeDecoder(
ScanRuntimeProviderContext.INSTANCE, SCHEMA.toPhysicalRowDataType());
- assertEquals(expectedDeser, actualDeser);
+ assertThat(actualDeser).isEqualTo(expectedDeser);
}
@Test
- public void testSerializationSchema() {
+ void testSerializationSchema() {
final AvroRowDataSerializationSchema expectedSer =
new AvroRowDataSerializationSchema(
ROW_TYPE,
@@ -116,32 +111,31 @@ public class RegistryAvroFormatFactoryTest {
RowDataToAvroConverters.createConverter(ROW_TYPE));
final DynamicTableSink actualSink = createTableSink(SCHEMA, getDefaultOptions());
- assertThat(actualSink, instanceOf(TestDynamicTableFactory.DynamicTableSinkMock.class));
+ assertThat(actualSink).isInstanceOf(TestDynamicTableFactory.DynamicTableSinkMock.class);
TestDynamicTableFactory.DynamicTableSinkMock sinkMock =
(TestDynamicTableFactory.DynamicTableSinkMock) actualSink;
SerializationSchema<RowData> actualSer =
sinkMock.valueFormat.createRuntimeEncoder(null, SCHEMA.toPhysicalRowDataType());
- assertEquals(expectedSer, actualSer);
+ assertThat(actualSer).isEqualTo(expectedSer);
}
@Test
- public void testMissingSubjectForSink() {
- thrown.expect(ValidationException.class);
- thrown.expect(
- containsCause(
- new ValidationException(
- "Option avro-confluent.subject is required for serialization")));
-
+ void testMissingSubjectForSink() {
final Map<String, String> options =
getModifiedOptions(opts -> opts.remove("avro-confluent.subject"));
- createTableSink(SCHEMA, options);
+ assertThatThrownBy(() -> createTableSink(SCHEMA, options))
+ .isInstanceOf(ValidationException.class)
+ .satisfies(
+ anyCauseMatches(
+ ValidationException.class,
+ "Option avro-confluent.subject is required for serialization"));
}
@Test
- public void testDeserializationSchemaWithOptionalProperties() {
+ void testDeserializationSchemaWithOptionalProperties() {
final AvroRowDataDeserializationSchema expectedDeser =
new AvroRowDataDeserializationSchema(
ConfluentRegistryAvroDeserializationSchema.forGeneric(
@@ -152,7 +146,7 @@ public class RegistryAvroFormatFactoryTest {
InternalTypeInfo.of(ROW_TYPE));
final DynamicTableSource actualSource = createTableSource(SCHEMA, getOptionalProperties());
- assertThat(actualSource, instanceOf(TestDynamicTableFactory.DynamicTableSourceMock.class));
+ assertThat(actualSource).isInstanceOf(TestDynamicTableFactory.DynamicTableSourceMock.class);
TestDynamicTableFactory.DynamicTableSourceMock scanSourceMock =
(TestDynamicTableFactory.DynamicTableSourceMock) actualSource;
@@ -160,11 +154,11 @@ public class RegistryAvroFormatFactoryTest {
scanSourceMock.valueFormat.createRuntimeDecoder(
ScanRuntimeProviderContext.INSTANCE, SCHEMA.toPhysicalRowDataType());
- assertEquals(expectedDeser, actualDeser);
+ assertThat(actualDeser).isEqualTo(expectedDeser);
}
@Test
- public void testSerializationSchemaWithOptionalProperties() {
+ void testSerializationSchemaWithOptionalProperties() {
final AvroRowDataSerializationSchema expectedSer =
new AvroRowDataSerializationSchema(
ROW_TYPE,
@@ -176,14 +170,14 @@ public class RegistryAvroFormatFactoryTest {
RowDataToAvroConverters.createConverter(ROW_TYPE));
final DynamicTableSink actualSink = createTableSink(SCHEMA, getOptionalProperties());
- assertThat(actualSink, instanceOf(TestDynamicTableFactory.DynamicTableSinkMock.class));
+ assertThat(actualSink).isInstanceOf(TestDynamicTableFactory.DynamicTableSinkMock.class);
TestDynamicTableFactory.DynamicTableSinkMock sinkMock =
(TestDynamicTableFactory.DynamicTableSinkMock) actualSink;
SerializationSchema<RowData> actualSer =
sinkMock.valueFormat.createRuntimeEncoder(null, SCHEMA.toPhysicalRowDataType());
- assertEquals(expectedSer, actualSer);
+ assertThat(actualSer).isEqualTo(expectedSer);
}
// ------------------------------------------------------------------------
diff --git a/flink-formats/flink-avro-confluent-registry/src/test/java/org/apache/flink/formats/avro/registry/confluent/RegistryAvroRowDataSeDeSchemaTest.java b/flink-formats/flink-avro-confluent-registry/src/test/java/org/apache/flink/formats/avro/registry/confluent/RegistryAvroRowDataSeDeSchemaTest.java
index 0061e8b..a5046ac 100644
--- a/flink-formats/flink-avro-confluent-registry/src/test/java/org/apache/flink/formats/avro/registry/confluent/RegistryAvroRowDataSeDeSchemaTest.java
+++ b/flink-formats/flink-avro-confluent-registry/src/test/java/org/apache/flink/formats/avro/registry/confluent/RegistryAvroRowDataSeDeSchemaTest.java
@@ -39,29 +39,23 @@ import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.util.Random;
-import static org.apache.flink.core.testutils.FlinkMatchers.containsCause;
import static org.apache.flink.formats.avro.utils.AvroTestUtils.writeRecord;
-import static org.hamcrest.CoreMatchers.equalTo;
-import static org.hamcrest.core.Is.is;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertThat;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
/**
* Tests for {@link AvroRowDataDeserializationSchema} and {@link AvroRowDataSerializationSchema} for
* schema registry avro.
*/
-public class RegistryAvroRowDataSeDeSchemaTest {
+class RegistryAvroRowDataSeDeSchemaTest {
private static final Schema ADDRESS_SCHEMA = Address.getClassSchema();
private static final Schema ADDRESS_SCHEMA_COMPATIBLE =
@@ -83,45 +77,43 @@ public class RegistryAvroRowDataSeDeSchemaTest {
private Address address;
- @Rule public ExpectedException expectedEx = ExpectedException.none();
-
- @BeforeClass
- public static void beforeClass() {
+ @BeforeAll
+ static void beforeClass() {
client = new MockSchemaRegistryClient();
}
- @Before
- public void before() {
+ @BeforeEach
+ void before() {
this.address = TestDataGenerator.generateRandomAddress(new Random());
}
- @After
- public void after() throws IOException, RestClientException {
+ @AfterEach
+ void after() throws IOException, RestClientException {
client.deleteSubject(SUBJECT);
}
@Test
- public void testRowDataWriteReadWithFullSchema() throws Exception {
+ void testRowDataWriteReadWithFullSchema() throws Exception {
testRowDataWriteReadWithSchema(ADDRESS_SCHEMA);
}
@Test
- public void testRowDataWriteReadWithCompatibleSchema() throws Exception {
+ void testRowDataWriteReadWithCompatibleSchema() throws Exception {
testRowDataWriteReadWithSchema(ADDRESS_SCHEMA_COMPATIBLE);
// Validates new schema has been registered.
- assertThat(client.getAllVersions(SUBJECT).size(), is(1));
+ assertThat(client.getAllVersions(SUBJECT)).hasSize(1);
}
@Test
- public void testRowDataWriteReadWithPreRegisteredSchema() throws Exception {
+ void testRowDataWriteReadWithPreRegisteredSchema() throws Exception {
client.register(SUBJECT, ADDRESS_SCHEMA);
testRowDataWriteReadWithSchema(ADDRESS_SCHEMA);
// Validates it does not produce new schema.
- assertThat(client.getAllVersions(SUBJECT).size(), is(1));
+ assertThat(client.getAllVersions(SUBJECT)).hasSize(1);
}
@Test
- public void testRowDataReadWithNonRegistryAvro() throws Exception {
+ void testRowDataReadWithNonRegistryAvro() throws Exception {
DataType dataType = AvroSchemaConverter.convertToDataType(ADDRESS_SCHEMA.toString());
RowType rowType = (RowType) dataType.getLogicalType();
@@ -132,10 +124,9 @@ public class RegistryAvroRowDataSeDeSchemaTest {
client.register(SUBJECT, ADDRESS_SCHEMA);
byte[] oriBytes = writeRecord(address, ADDRESS_SCHEMA);
- expectedEx.expect(IOException.class);
- expectedEx.expect(
- containsCause(new IOException("Unknown data format. Magic number does not match")));
- deserializer.deserialize(oriBytes);
+ assertThatThrownBy(() -> deserializer.deserialize(oriBytes))
+ .isInstanceOf(IOException.class)
+ .hasCause(new IOException("Unknown data format. Magic number does not match"));
}
private void testRowDataWriteReadWithSchema(Schema schema) throws Exception {
@@ -150,18 +141,18 @@ public class RegistryAvroRowDataSeDeSchemaTest {
serializer.open(null);
deserializer.open(null);
- assertNull(deserializer.deserialize(null));
+ assertThat(deserializer.deserialize(null)).isNull();
RowData oriData = address2RowData(address);
byte[] serialized = serializer.serialize(oriData);
RowData rowData = deserializer.deserialize(serialized);
- assertThat(rowData.getArity(), equalTo(schema.getFields().size()));
- assertEquals(address.getNum(), rowData.getInt(0));
- assertEquals(address.getStreet(), rowData.getString(1).toString());
+ assertThat(rowData.getArity()).isEqualTo(schema.getFields().size());
+ assertThat(rowData.getInt(0)).isEqualTo(address.getNum());
+ assertThat(rowData.getString(1).toString()).isEqualTo(address.getStreet());
if (schema != ADDRESS_SCHEMA_COMPATIBLE) {
- assertEquals(address.getCity(), rowData.getString(2).toString());
- assertEquals(address.getState(), rowData.getString(3).toString());
- assertEquals(address.getZip(), rowData.getString(4).toString());
+ assertThat(rowData.getString(2).toString()).isEqualTo(address.getCity());
+ assertThat(rowData.getString(3).toString()).isEqualTo(address.getState());
+ assertThat(rowData.getString(4).toString()).isEqualTo(address.getZip());
}
}
diff --git a/flink-formats/flink-avro-confluent-registry/src/test/java/org/apache/flink/formats/avro/registry/confluent/debezium/DebeziumAvroFormatFactoryTest.java b/flink-formats/flink-avro-confluent-registry/src/test/java/org/apache/flink/formats/avro/registry/confluent/debezium/DebeziumAvroFormatFactoryTest.java
index eb91ccb..50d270b 100644
--- a/flink-formats/flink-avro-confluent-registry/src/test/java/org/apache/flink/formats/avro/registry/confluent/debezium/DebeziumAvroFormatFactoryTest.java
+++ b/flink-formats/flink-avro-confluent-registry/src/test/java/org/apache/flink/formats/avro/registry/confluent/debezium/DebeziumAvroFormatFactoryTest.java
@@ -31,25 +31,18 @@ import org.apache.flink.table.runtime.connector.sink.SinkRuntimeProviderContext;
import org.apache.flink.table.runtime.connector.source.ScanRuntimeProviderContext;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.logical.RowType;
-import org.apache.flink.util.TestLogger;
-import org.junit.Assert;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
+import org.junit.jupiter.api.Test;
import java.util.HashMap;
import java.util.Map;
-import static junit.framework.TestCase.assertEquals;
import static org.apache.flink.table.factories.utils.FactoryMocks.createTableSink;
import static org.apache.flink.table.factories.utils.FactoryMocks.createTableSource;
-import static org.hamcrest.CoreMatchers.instanceOf;
-import static org.junit.Assert.assertThat;
+import static org.assertj.core.api.Assertions.assertThat;
/** Tests for {@link DebeziumAvroFormatFactory}. */
-public class DebeziumAvroFormatFactoryTest extends TestLogger {
- @Rule public ExpectedException thrown = ExpectedException.none();
+class DebeziumAvroFormatFactoryTest {
private static final ResolvedSchema SCHEMA =
ResolvedSchema.of(
@@ -64,7 +57,7 @@ public class DebeziumAvroFormatFactoryTest extends TestLogger {
private static final String REGISTRY_URL = "http://localhost:8081";
@Test
- public void testSeDeSchema() {
+ void testSeDeSchema() {
final Map<String, String> options = getAllOptions();
final Map<String, String> registryConfigs = new HashMap<>();
@@ -75,13 +68,13 @@ public class DebeziumAvroFormatFactoryTest extends TestLogger {
new DebeziumAvroDeserializationSchema(
ROW_TYPE, InternalTypeInfo.of(ROW_TYPE), REGISTRY_URL, registryConfigs);
DeserializationSchema<RowData> actualDeser = createDeserializationSchema(options);
- assertEquals(expectedDeser, actualDeser);
+ assertThat(actualDeser).isEqualTo(expectedDeser);
DebeziumAvroSerializationSchema expectedSer =
new DebeziumAvroSerializationSchema(
ROW_TYPE, REGISTRY_URL, SUBJECT, registryConfigs);
SerializationSchema<RowData> actualSer = createSerializationSchema(options);
- Assert.assertEquals(expectedSer, actualSer);
+ assertThat(actualSer).isEqualTo(expectedSer);
}
private Map<String, String> getAllOptions() {
@@ -101,7 +94,7 @@ public class DebeziumAvroFormatFactoryTest extends TestLogger {
private static DeserializationSchema<RowData> createDeserializationSchema(
Map<String, String> options) {
final DynamicTableSource actualSource = createTableSource(SCHEMA, options);
- assertThat(actualSource, instanceOf(TestDynamicTableFactory.DynamicTableSourceMock.class));
+ assertThat(actualSource).isInstanceOf(TestDynamicTableFactory.DynamicTableSourceMock.class);
TestDynamicTableFactory.DynamicTableSourceMock scanSourceMock =
(TestDynamicTableFactory.DynamicTableSourceMock) actualSource;
@@ -112,7 +105,7 @@ public class DebeziumAvroFormatFactoryTest extends TestLogger {
private static SerializationSchema<RowData> createSerializationSchema(
Map<String, String> options) {
final DynamicTableSink actualSink = createTableSink(SCHEMA, options);
- assertThat(actualSink, instanceOf(TestDynamicTableFactory.DynamicTableSinkMock.class));
+ assertThat(actualSink).isInstanceOf(TestDynamicTableFactory.DynamicTableSinkMock.class);
TestDynamicTableFactory.DynamicTableSinkMock sinkMock =
(TestDynamicTableFactory.DynamicTableSinkMock) actualSink;
diff --git a/flink-formats/flink-avro-confluent-registry/src/test/java/org/apache/flink/formats/avro/registry/confluent/debezium/DebeziumAvroSerDeSchemaTest.java b/flink-formats/flink-avro-confluent-registry/src/test/java/org/apache/flink/formats/avro/registry/confluent/debezium/DebeziumAvroSerDeSchemaTest.java
index a113038..2640b50 100644
--- a/flink-formats/flink-avro-confluent-registry/src/test/java/org/apache/flink/formats/avro/registry/confluent/debezium/DebeziumAvroSerDeSchemaTest.java
+++ b/flink-formats/flink-avro-confluent-registry/src/test/java/org/apache/flink/formats/avro/registry/confluent/debezium/DebeziumAvroSerDeSchemaTest.java
@@ -28,6 +28,8 @@ import org.apache.flink.formats.avro.RegistryAvroSerializationSchema;
import org.apache.flink.formats.avro.RowDataToAvroConverters;
import org.apache.flink.formats.avro.registry.confluent.ConfluentSchemaRegistryCoder;
import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
@@ -35,12 +37,14 @@ import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.Collector;
import org.apache.flink.util.FileUtils;
+import org.apache.flink.util.SimpleUserCodeClassLoader;
+import org.apache.flink.util.UserCodeClassLoader;
import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
import java.io.File;
import java.io.IOException;
@@ -58,11 +62,10 @@ import static org.apache.flink.table.api.DataTypes.FIELD;
import static org.apache.flink.table.api.DataTypes.ROW;
import static org.apache.flink.table.api.DataTypes.STRING;
import static org.apache.flink.table.types.utils.TypeConversions.fromLogicalToDataType;
-import static org.junit.Assert.assertEquals;
-import static org.mockito.Mockito.mock;
+import static org.assertj.core.api.Assertions.assertThat;
/** Tests for {@link DebeziumAvroDeserializationSchema}. */
-public class DebeziumAvroSerDeSchemaTest {
+class DebeziumAvroSerDeSchemaTest {
private static final String SUBJECT = "testDebeziumAvro";
@@ -78,10 +81,10 @@ public class DebeziumAvroSerDeSchemaTest {
private static final Schema DEBEZIUM_SCHEMA_COMPATIBLE_TEST =
new Schema.Parser().parse(new String(readBytesFromFile("debezium-test-schema.json")));
- private SchemaRegistryClient client = new MockSchemaRegistryClient();
+ private final SchemaRegistryClient client = new MockSchemaRegistryClient();
@Test
- public void testSerializationDeserialization() throws Exception {
+ void testSerializationDeserialization() throws Exception {
RowType rowTypeDe =
DebeziumAvroDeserializationSchema.createDebeziumAvroRowType(
@@ -92,7 +95,7 @@ public class DebeziumAvroSerDeSchemaTest {
DebeziumAvroSerializationSchema dbzSerializer =
new DebeziumAvroSerializationSchema(getSerializationSchema(rowTypeSe));
- dbzSerializer.open(mock(SerializationSchema.InitializationContext.class));
+ dbzSerializer.open(new MockInitializationContext());
byte[] serialize = dbzSerializer.serialize(debeziumRow2RowData());
@@ -100,7 +103,7 @@ public class DebeziumAvroSerDeSchemaTest {
DebeziumAvroDeserializationSchema dbzDeserializer =
new DebeziumAvroDeserializationSchema(
InternalTypeInfo.of(rowType), getDeserializationSchema(rowTypeDe));
- dbzDeserializer.open(mock(DeserializationSchema.InitializationContext.class));
+ dbzDeserializer.open(new MockInitializationContext());
SimpleCollector collector = new SimpleCollector();
dbzDeserializer.deserialize(serialize, collector);
@@ -110,37 +113,37 @@ public class DebeziumAvroSerDeSchemaTest {
List<String> expected =
Collections.singletonList("+I(107,rocks,box of assorted rocks,5.3)");
- assertEquals(expected, actual);
+ assertThat(actual).isEqualTo(expected);
}
@Test
- public void testInsertDataDeserialization() throws Exception {
+ void testInsertDataDeserialization() throws Exception {
List<String> actual = testDeserialization("debezium-avro-insert.avro");
List<String> expected =
Collections.singletonList("+I(1,lisi,test debezium avro data,21.799999237060547)");
- assertEquals(expected, actual);
+ assertThat(actual).isEqualTo(expected);
}
@Test
- public void testUpdateDataDeserialization() throws Exception {
+ void testUpdateDataDeserialization() throws Exception {
List<String> actual = testDeserialization("debezium-avro-update.avro");
List<String> expected =
Arrays.asList(
"-U(1,lisi,test debezium avro data,21.799999237060547)",
"+U(1,zhangsan,test debezium avro data,21.799999237060547)");
- assertEquals(expected, actual);
+ assertThat(actual).isEqualTo(expected);
}
@Test
- public void testDeleteDataDeserialization() throws Exception {
+ void testDeleteDataDeserialization() throws Exception {
List<String> actual = testDeserialization("debezium-avro-delete.avro");
List<String> expected =
Collections.singletonList(
"-D(1,zhangsan,test debezium avro data,21.799999237060547)");
- assertEquals(expected, actual);
+ assertThat(actual).isEqualTo(expected);
}
public List<String> testDeserialization(String dataPath) throws Exception {
@@ -153,7 +156,7 @@ public class DebeziumAvroSerDeSchemaTest {
DebeziumAvroDeserializationSchema dbzDeserializer =
new DebeziumAvroDeserializationSchema(
InternalTypeInfo.of(rowType), getDeserializationSchema(rowTypeDe));
- dbzDeserializer.open(mock(DeserializationSchema.InitializationContext.class));
+ dbzDeserializer.open(new MockInitializationContext());
SimpleCollector collector = new SimpleCollector();
dbzDeserializer.deserialize(readBytesFromFile(dataPath), collector);
@@ -200,7 +203,7 @@ public class DebeziumAvroSerDeSchemaTest {
private static byte[] readBytesFromFile(String filePath) {
try {
URL url = DebeziumAvroSerDeSchemaTest.class.getClassLoader().getResource(filePath);
- assert url != null;
+ assertThat(url).isNotNull();
Path path = new File(url.getFile()).toPath();
return FileUtils.readAllBytes(path);
} catch (IOException e) {
@@ -210,7 +213,7 @@ public class DebeziumAvroSerDeSchemaTest {
private static class SimpleCollector implements Collector<RowData> {
- private List<RowData> list = new ArrayList<>();
+ private final List<RowData> list = new ArrayList<>();
@Override
public void collect(RowData record) {
@@ -222,4 +225,18 @@ public class DebeziumAvroSerDeSchemaTest {
// do nothing
}
}
+
+ private static class MockInitializationContext
+ implements DeserializationSchema.InitializationContext,
+ SerializationSchema.InitializationContext {
+ @Override
+ public MetricGroup getMetricGroup() {
+ return new UnregisteredMetricsGroup();
+ }
+
+ @Override
+ public UserCodeClassLoader getUserCodeClassLoader() {
+ return SimpleUserCodeClassLoader.create(getClass().getClassLoader());
+ }
+ }
}
diff --git a/flink-formats/flink-avro-confluent-registry/src/test/resources/META-INF/services/org.junit.jupiter.api.extension.Extension b/flink-formats/flink-avro-confluent-registry/src/test/resources/META-INF/services/org.junit.jupiter.api.extension.Extension
new file mode 100644
index 0000000..2899913
--- /dev/null
+++ b/flink-formats/flink-avro-confluent-registry/src/test/resources/META-INF/services/org.junit.jupiter.api.extension.Extension
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.flink.util.TestLoggerExtension
\ No newline at end of file