You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2021/12/07 09:19:11 UTC

[flink] branch master updated (d946302 -> a0c74c1)

This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from d946302  [FLINK-25114][table-runtime] Remove flink-scala dependency and scala suffix
     new 57a6f02  [FLINK-25014][table-api-java] Perform toDataStream projection case-insensitive
     new a0c74c1  [hotfix][table-api-java] Migrate SchemaTranslatorTest to assertj

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../flink/table/catalog/SchemaTranslator.java      |  38 ++-
 .../flink/table/catalog/SchemaTranslatorTest.java  | 372 ++++++++++-----------
 2 files changed, 207 insertions(+), 203 deletions(-)

[flink] 01/02: [FLINK-25014][table-api-java] Perform toDataStream projection case-insensitive

Posted by tw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 57a6f02357fd2eb7e3c59e9903f9ec33a655ead6
Author: Timo Walther <tw...@apache.org>
AuthorDate: Mon Dec 6 15:21:46 2021 +0100

    [FLINK-25014][table-api-java] Perform toDataStream projection case-insensitive
    
    This closes #18029.
---
 .../flink/table/catalog/SchemaTranslator.java      | 38 ++++++++++++++++++----
 .../flink/table/catalog/SchemaTranslatorTest.java  |  4 +--
 2 files changed, 33 insertions(+), 9 deletions(-)

diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/SchemaTranslator.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/SchemaTranslator.java
index 4f9fad4..467840c 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/SchemaTranslator.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/SchemaTranslator.java
@@ -43,6 +43,7 @@ import javax.annotation.Nullable;
 
 import java.util.Collections;
 import java.util.List;
+import java.util.Locale;
 import java.util.Optional;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
@@ -113,19 +114,42 @@ public final class SchemaTranslator {
             ResolvedSchema inputSchema,
             AbstractDataType<?> targetDataType) {
         final List<String> inputFieldNames = inputSchema.getColumnNames();
+        final List<String> inputFieldNamesNormalized =
+                inputFieldNames.stream()
+                        .map(n -> n.toLowerCase(Locale.ROOT))
+                        .collect(Collectors.toList());
         final DataType resolvedDataType = dataTypeFactory.createDataType(targetDataType);
         final List<String> targetFieldNames = flattenToNames(resolvedDataType);
+        final List<String> targetFieldNamesNormalized =
+                targetFieldNames.stream()
+                        .map(n -> n.toLowerCase(Locale.ROOT))
+                        .collect(Collectors.toList());
         final List<DataType> targetFieldDataTypes = flattenToDataTypes(resolvedDataType);
 
         // help in reorder fields for POJOs if all field names are present but out of order,
         // otherwise let the sink validation fail later
-        final List<String> projections;
-        if (targetFieldNames.size() == inputFieldNames.size()
-                && !targetFieldNames.equals(inputFieldNames)
-                && targetFieldNames.containsAll(inputFieldNames)) {
-            projections = targetFieldNames;
-        } else {
-            projections = null;
+        List<String> projections = null;
+        if (targetFieldNames.size() == inputFieldNames.size()) {
+            // reordering by name (case-sensitive)
+            if (targetFieldNames.containsAll(inputFieldNames)) {
+                projections = targetFieldNames;
+            }
+            // reordering by name (case-insensitive) but fields must be unique
+            else if (targetFieldNamesNormalized.containsAll(inputFieldNamesNormalized)
+                    && targetFieldNamesNormalized.stream().distinct().count()
+                            == targetFieldNames.size()
+                    && inputFieldNamesNormalized.stream().distinct().count()
+                            == inputFieldNames.size()) {
+                projections =
+                        targetFieldNamesNormalized.stream()
+                                .map(
+                                        targetName -> {
+                                            final int inputFieldPos =
+                                                    inputFieldNamesNormalized.indexOf(targetName);
+                                            return inputFieldNames.get(inputFieldPos);
+                                        })
+                                .collect(Collectors.toList());
+            }
         }
 
         final Schema schema =
diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/SchemaTranslatorTest.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/SchemaTranslatorTest.java
index 46bab61..c2c2e9f 100644
--- a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/SchemaTranslatorTest.java
+++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/SchemaTranslatorTest.java
@@ -90,7 +90,7 @@ public class SchemaTranslatorTest {
                 ResolvedSchema.of(
                         Column.physical("c", DataTypes.INT()),
                         Column.physical("a", DataTypes.BOOLEAN()),
-                        Column.physical("b", DataTypes.DOUBLE()));
+                        Column.physical("B", DataTypes.DOUBLE())); // case-insensitive mapping
 
         final DataType physicalDataType =
                 DataTypes.ROW(
@@ -102,7 +102,7 @@ public class SchemaTranslatorTest {
                 SchemaTranslator.createProducingResult(
                         dataTypeFactory(), inputSchema, physicalDataType);
 
-        assertEquals(Optional.of(Arrays.asList("a", "b", "c")), result.getProjections());
+        assertEquals(Optional.of(Arrays.asList("a", "B", "c")), result.getProjections());
 
         assertEquals(
                 Schema.newBuilder()

[flink] 02/02: [hotfix][table-api-java] Migrate SchemaTranslatorTest to assertj

Posted by tw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit a0c74c1e07030e7bca0c93e24a1e7643937371a7
Author: Timo Walther <tw...@apache.org>
AuthorDate: Mon Dec 6 17:05:36 2021 +0100

    [hotfix][table-api-java] Migrate SchemaTranslatorTest to assertj
---
 .../flink/table/catalog/SchemaTranslatorTest.java  | 372 ++++++++++-----------
 1 file changed, 176 insertions(+), 196 deletions(-)

diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/SchemaTranslatorTest.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/SchemaTranslatorTest.java
index c2c2e9f..3bfe128 100644
--- a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/SchemaTranslatorTest.java
+++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/SchemaTranslatorTest.java
@@ -20,7 +20,6 @@ package org.apache.flink.table.catalog;
 
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeinfo.Types;
-import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.Schema;
 import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.catalog.SchemaTranslator.ConsumingResult;
@@ -37,12 +36,18 @@ import java.time.DayOfWeek;
 import java.util.Arrays;
 import java.util.Optional;
 
-import static org.apache.flink.core.testutils.FlinkMatchers.containsMessage;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
+import static org.apache.flink.core.testutils.FlinkAssertions.anyCauseMatches;
+import static org.apache.flink.table.api.DataTypes.BIGINT;
+import static org.apache.flink.table.api.DataTypes.BOOLEAN;
+import static org.apache.flink.table.api.DataTypes.DECIMAL;
+import static org.apache.flink.table.api.DataTypes.DOUBLE;
+import static org.apache.flink.table.api.DataTypes.FIELD;
+import static org.apache.flink.table.api.DataTypes.INT;
+import static org.apache.flink.table.api.DataTypes.ROW;
+import static org.apache.flink.table.api.DataTypes.STRING;
+import static org.apache.flink.table.api.DataTypes.TIMESTAMP_LTZ;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** Tests for {@link SchemaTranslator}. */
 public class SchemaTranslatorTest {
@@ -56,63 +61,53 @@ public class SchemaTranslatorTest {
                 SchemaTranslator.createConsumingResult(
                         dataTypeFactoryWithRawType(DayOfWeek.class), inputTypeInfo, null);
 
-        assertEquals(
-                DataTypes.ROW(
-                                DataTypes.FIELD(
-                                        "f0",
-                                        DataTypes.ROW(
-                                                DataTypes.FIELD("f0", DataTypes.INT()),
-                                                DataTypes.FIELD("f1", DataTypes.BOOLEAN()))),
-                                DataTypes.FIELD(
-                                        "f1", DataTypeFactoryMock.dummyRaw(DayOfWeek.class)))
-                        .notNull(),
-                result.getPhysicalDataType());
-
-        assertTrue(result.isTopLevelRecord());
-
-        assertEquals(
-                Schema.newBuilder()
-                        .column(
-                                "f0",
-                                DataTypes.ROW(
-                                        DataTypes.FIELD("f0", DataTypes.INT()),
-                                        DataTypes.FIELD("f1", DataTypes.BOOLEAN())))
-                        .column("f1", DataTypeFactoryMock.dummyRaw(DayOfWeek.class))
-                        .build(),
-                result.getSchema());
-
-        assertNull(result.getProjections());
+        assertThat(result.getPhysicalDataType())
+                .isEqualTo(
+                        ROW(
+                                        FIELD(
+                                                "f0",
+                                                ROW(FIELD("f0", INT()), FIELD("f1", BOOLEAN()))),
+                                        FIELD("f1", DataTypeFactoryMock.dummyRaw(DayOfWeek.class)))
+                                .notNull());
+
+        assertThat(result.isTopLevelRecord()).isTrue();
+
+        assertThat(result.getSchema())
+                .isEqualTo(
+                        Schema.newBuilder()
+                                .column("f0", ROW(FIELD("f0", INT()), FIELD("f1", BOOLEAN())))
+                                .column("f1", DataTypeFactoryMock.dummyRaw(DayOfWeek.class))
+                                .build());
+
+        assertThat(result.getProjections()).isNull();
     }
 
     @Test
     public void testOutputToRowDataType() {
         final ResolvedSchema inputSchema =
                 ResolvedSchema.of(
-                        Column.physical("c", DataTypes.INT()),
-                        Column.physical("a", DataTypes.BOOLEAN()),
-                        Column.physical("B", DataTypes.DOUBLE())); // case-insensitive mapping
+                        Column.physical("c", INT()),
+                        Column.physical("a", BOOLEAN()),
+                        Column.physical("B", DOUBLE())); // case-insensitive mapping
 
         final DataType physicalDataType =
-                DataTypes.ROW(
-                        DataTypes.FIELD("a", DataTypes.BOOLEAN()),
-                        DataTypes.FIELD("b", DataTypes.DOUBLE()),
-                        DataTypes.FIELD("c", DataTypes.INT()));
+                ROW(FIELD("a", BOOLEAN()), FIELD("b", DOUBLE()), FIELD("c", INT()));
 
         final ProducingResult result =
                 SchemaTranslator.createProducingResult(
                         dataTypeFactory(), inputSchema, physicalDataType);
 
-        assertEquals(Optional.of(Arrays.asList("a", "B", "c")), result.getProjections());
+        assertThat(result.getProjections()).hasValue(Arrays.asList("a", "B", "c"));
 
-        assertEquals(
-                Schema.newBuilder()
-                        .column("a", DataTypes.BOOLEAN())
-                        .column("b", DataTypes.DOUBLE())
-                        .column("c", DataTypes.INT())
-                        .build(),
-                result.getSchema());
+        assertThat(result.getSchema())
+                .isEqualTo(
+                        Schema.newBuilder()
+                                .column("a", BOOLEAN())
+                                .column("b", DOUBLE())
+                                .column("c", INT())
+                                .build());
 
-        assertEquals(Optional.of(physicalDataType), result.getPhysicalDataType());
+        assertThat(result.getPhysicalDataType()).hasValue(physicalDataType);
     }
 
     @Test
@@ -123,30 +118,31 @@ public class SchemaTranslatorTest {
                 SchemaTranslator.createConsumingResult(
                         dataTypeFactoryWithRawType(Row.class), inputTypeInfo, null);
 
-        assertEquals(DataTypeFactoryMock.dummyRaw(Row.class), result.getPhysicalDataType());
+        assertThat(result.getPhysicalDataType()).isEqualTo(DataTypeFactoryMock.dummyRaw(Row.class));
 
-        assertFalse(result.isTopLevelRecord());
+        assertThat(result.isTopLevelRecord()).isFalse();
 
-        assertEquals(
-                Schema.newBuilder().column("f0", DataTypeFactoryMock.dummyRaw(Row.class)).build(),
-                result.getSchema());
+        assertThat(result.getSchema())
+                .isEqualTo(
+                        Schema.newBuilder()
+                                .column("f0", DataTypeFactoryMock.dummyRaw(Row.class))
+                                .build());
 
-        assertNull(result.getProjections());
+        assertThat(result.getProjections()).isNull();
     }
 
     @Test
     public void testOutputToAtomicDataType() {
-        final ResolvedSchema inputSchema = ResolvedSchema.of(Column.physical("a", DataTypes.INT()));
+        final ResolvedSchema inputSchema = ResolvedSchema.of(Column.physical("a", INT()));
 
         final ProducingResult result =
-                SchemaTranslator.createProducingResult(
-                        dataTypeFactory(), inputSchema, DataTypes.INT());
+                SchemaTranslator.createProducingResult(dataTypeFactory(), inputSchema, INT());
 
-        assertEquals(Optional.empty(), result.getProjections());
+        assertThat(result.getProjections()).isEmpty();
 
-        assertEquals(Schema.newBuilder().column("f0", DataTypes.INT()).build(), result.getSchema());
+        assertThat(result.getSchema()).isEqualTo(Schema.newBuilder().column("f0", INT()).build());
 
-        assertEquals(Optional.of(DataTypes.INT()), result.getPhysicalDataType());
+        assertThat(result.getPhysicalDataType()).hasValue(INT());
     }
 
     @Test
@@ -163,26 +159,22 @@ public class SchemaTranslatorTest {
                                 .primaryKeyNamed("pk", "f0")
                                 .build());
 
-        assertEquals(
-                DataTypes.ROW(
-                                DataTypes.FIELD("f0", DataTypes.INT()),
-                                DataTypes.FIELD("f1", DataTypes.BIGINT()))
-                        .notNull(),
-                result.getPhysicalDataType());
-
-        assertTrue(result.isTopLevelRecord());
-
-        assertEquals(
-                Schema.newBuilder()
-                        .column("f0", DataTypes.INT().notNull()) // not null due to primary key
-                        .column("f1", DataTypes.BIGINT())
-                        .columnByExpression("computed", "f1 + 42")
-                        .columnByExpression("computed2", "f1 - 1")
-                        .primaryKeyNamed("pk", "f0")
-                        .build(),
-                result.getSchema());
-
-        assertNull(result.getProjections());
+        assertThat(result.getPhysicalDataType())
+                .isEqualTo(ROW(FIELD("f0", INT()), FIELD("f1", BIGINT())).notNull());
+
+        assertThat(result.isTopLevelRecord()).isTrue();
+
+        assertThat(result.getSchema())
+                .isEqualTo(
+                        Schema.newBuilder()
+                                .column("f0", INT().notNull()) // not null due to primary key
+                                .column("f1", BIGINT())
+                                .columnByExpression("computed", "f1 + 42")
+                                .columnByExpression("computed2", "f1 - 1")
+                                .primaryKeyNamed("pk", "f0")
+                                .build());
+
+        assertThat(result.getProjections()).isNull();
     }
 
     @Test
@@ -196,38 +188,38 @@ public class SchemaTranslatorTest {
                         inputTypeInfo,
                         Schema.newBuilder()
                                 .primaryKeyNamed("pk", "f0")
-                                .column("f1", DataTypes.BIGINT()) // reordered
-                                .column("f0", DataTypes.INT())
+                                .column("f1", BIGINT()) // reordered
+                                .column("f0", INT())
                                 .columnByExpression("computed", "f1 + 42")
-                                .column("f2", DataTypes.DECIMAL(10, 2)) // enriches
+                                .column("f2", DECIMAL(10, 2)) // enriches
                                 .columnByExpression("computed2", "f1 - 1")
                                 .build());
 
-        assertEquals(
-                DataTypes.ROW(
-                                DataTypes.FIELD("f0", DataTypes.INT()),
-                                DataTypes.FIELD("f1", DataTypes.BIGINT()),
-                                DataTypes.FIELD("f2", DataTypes.DECIMAL(10, 2)),
-                                DataTypes.FIELD("f3", DataTypes.BOOLEAN()))
-                        .notNull(),
-                result.getPhysicalDataType());
-
-        assertTrue(result.isTopLevelRecord());
-
-        assertEquals(
-                Schema.newBuilder()
-                        .column("f0", DataTypes.INT())
-                        .column("f1", DataTypes.BIGINT())
-                        .column("f2", DataTypes.DECIMAL(10, 2))
-                        .column("f3", DataTypes.BOOLEAN())
-                        .columnByExpression("computed", "f1 + 42")
-                        .columnByExpression("computed2", "f1 - 1")
-                        .primaryKeyNamed("pk", "f0")
-                        .build(),
-                result.getSchema());
-
-        assertEquals(
-                Arrays.asList("f1", "f0", "computed", "f2", "computed2"), result.getProjections());
+        assertThat(result.getPhysicalDataType())
+                .isEqualTo(
+                        ROW(
+                                        FIELD("f0", INT()),
+                                        FIELD("f1", BIGINT()),
+                                        FIELD("f2", DECIMAL(10, 2)),
+                                        FIELD("f3", BOOLEAN()))
+                                .notNull());
+
+        assertThat(result.isTopLevelRecord()).isTrue();
+
+        assertThat(result.getSchema())
+                .isEqualTo(
+                        Schema.newBuilder()
+                                .column("f0", INT())
+                                .column("f1", BIGINT())
+                                .column("f2", DECIMAL(10, 2))
+                                .column("f3", BOOLEAN())
+                                .columnByExpression("computed", "f1 + 42")
+                                .columnByExpression("computed2", "f1 - 1")
+                                .primaryKeyNamed("pk", "f0")
+                                .build());
+
+        assertThat(result.getProjections())
+                .isEqualTo(Arrays.asList("f1", "f0", "computed", "f2", "computed2"));
     }
 
     @Test
@@ -240,153 +232,141 @@ public class SchemaTranslatorTest {
                         inputTypeInfo,
                         Schema.newBuilder()
                                 .columnByExpression("f0_0", "f0.f0_0")
-                                .column(
-                                        "f0",
-                                        DataTypes.ROW(
-                                                DataTypes.FIELD("f0_0", DataTypes.INT()),
-                                                DataTypes.FIELD("f0_1", DataTypes.BOOLEAN())))
+                                .column("f0", ROW(FIELD("f0_0", INT()), FIELD("f0_1", BOOLEAN())))
+                                .columnByExpression("f0_1", "f0.f0_1")
+                                .build());
+
+        assertThat(result.getPhysicalDataType())
+                .isEqualTo(ROW(FIELD("f0_0", INT()), FIELD("f0_1", BOOLEAN())));
+
+        assertThat(result.isTopLevelRecord()).isFalse();
+
+        assertThat(result.getSchema())
+                .isEqualTo(
+                        Schema.newBuilder()
+                                .column("f0", ROW(FIELD("f0_0", INT()), FIELD("f0_1", BOOLEAN())))
+                                .columnByExpression("f0_0", "f0.f0_0")
                                 .columnByExpression("f0_1", "f0.f0_1")
                                 .build());
 
-        assertEquals(
-                DataTypes.ROW(
-                        DataTypes.FIELD("f0_0", DataTypes.INT()),
-                        DataTypes.FIELD("f0_1", DataTypes.BOOLEAN())),
-                result.getPhysicalDataType());
-
-        assertFalse(result.isTopLevelRecord());
-
-        assertEquals(
-                Schema.newBuilder()
-                        .column(
-                                "f0",
-                                DataTypes.ROW(
-                                        DataTypes.FIELD("f0_0", DataTypes.INT()),
-                                        DataTypes.FIELD("f0_1", DataTypes.BOOLEAN())))
-                        .columnByExpression("f0_0", "f0.f0_0")
-                        .columnByExpression("f0_1", "f0.f0_1")
-                        .build(),
-                result.getSchema());
-
-        assertEquals(Arrays.asList("f0_0", "f0", "f0_1"), result.getProjections());
+        assertThat(result.getProjections()).isEqualTo(Arrays.asList("f0_0", "f0", "f0_1"));
     }
 
     @Test
     public void testInvalidDeclaredSchemaColumn() {
         final TypeInformation<?> inputTypeInfo = Types.ROW(Types.INT, Types.LONG);
 
-        try {
-            SchemaTranslator.createConsumingResult(
-                    dataTypeFactory(),
-                    inputTypeInfo,
-                    Schema.newBuilder().column("INVALID", DataTypes.BIGINT()).build());
-        } catch (ValidationException e) {
-            assertThat(
-                    e,
-                    containsMessage(
-                            "Unable to find a field named 'INVALID' in the physical data type"));
-        }
+        assertThatThrownBy(
+                        () ->
+                                SchemaTranslator.createConsumingResult(
+                                        dataTypeFactory(),
+                                        inputTypeInfo,
+                                        Schema.newBuilder().column("INVALID", BIGINT()).build()))
+                .satisfies(
+                        anyCauseMatches(
+                                ValidationException.class,
+                                "Unable to find a field named 'INVALID' in the physical data type"));
     }
 
     @Test
     public void testOutputToNoSchema() {
         final ResolvedSchema tableSchema =
                 ResolvedSchema.of(
-                        Column.physical("id", DataTypes.BIGINT()),
-                        Column.metadata("rowtime", DataTypes.TIMESTAMP_LTZ(3), null, false),
-                        Column.physical("name", DataTypes.STRING()));
+                        Column.physical("id", BIGINT()),
+                        Column.metadata("rowtime", TIMESTAMP_LTZ(3), null, false),
+                        Column.physical("name", STRING()));
 
         final ProducingResult result = SchemaTranslator.createProducingResult(tableSchema, null);
 
-        assertEquals(Optional.empty(), result.getProjections());
+        assertThat(result.getProjections()).isEmpty();
 
-        assertEquals(
-                Schema.newBuilder()
-                        .column("id", DataTypes.BIGINT())
-                        .column("rowtime", DataTypes.TIMESTAMP_LTZ(3)) // becomes physical
-                        .column("name", DataTypes.STRING())
-                        .build(),
-                result.getSchema());
+        assertThat(result.getSchema())
+                .isEqualTo(
+                        Schema.newBuilder()
+                                .column("id", BIGINT())
+                                .column("rowtime", TIMESTAMP_LTZ(3)) // becomes physical
+                                .column("name", STRING())
+                                .build());
 
-        assertEquals(Optional.empty(), result.getPhysicalDataType());
+        assertThat(result.getPhysicalDataType()).isEmpty();
     }
 
     @Test
     public void testOutputToEmptySchema() {
         final ResolvedSchema tableSchema =
                 ResolvedSchema.of(
-                        Column.physical("id", DataTypes.BIGINT()),
-                        Column.metadata("rowtime", DataTypes.TIMESTAMP_LTZ(3), null, false),
-                        Column.physical("name", DataTypes.STRING()));
+                        Column.physical("id", BIGINT()),
+                        Column.metadata("rowtime", TIMESTAMP_LTZ(3), null, false),
+                        Column.physical("name", STRING()));
 
         final ProducingResult result =
                 SchemaTranslator.createProducingResult(tableSchema, Schema.derived());
 
-        assertEquals(Optional.empty(), result.getProjections());
+        assertThat(result.getProjections()).isEmpty();
 
-        assertEquals(
-                Schema.newBuilder()
-                        .column("id", DataTypes.BIGINT())
-                        .column("rowtime", DataTypes.TIMESTAMP_LTZ(3)) // becomes physical
-                        .column("name", DataTypes.STRING())
-                        .build(),
-                result.getSchema());
+        assertThat(result.getSchema())
+                .isEqualTo(
+                        Schema.newBuilder()
+                                .column("id", BIGINT())
+                                .column("rowtime", TIMESTAMP_LTZ(3)) // becomes physical
+                                .column("name", STRING())
+                                .build());
 
-        assertEquals(Optional.empty(), result.getPhysicalDataType());
+        assertThat(result.getPhysicalDataType()).isEmpty();
     }
 
     @Test
     public void testOutputToPartialSchema() {
         final ResolvedSchema tableSchema =
                 ResolvedSchema.of(
-                        Column.physical("id", DataTypes.BIGINT().notNull()),
-                        Column.physical("name", DataTypes.STRING()),
-                        Column.metadata("rowtime", DataTypes.TIMESTAMP_LTZ(3), null, false));
+                        Column.physical("id", BIGINT().notNull()),
+                        Column.physical("name", STRING()),
+                        Column.metadata("rowtime", TIMESTAMP_LTZ(3), null, false));
 
         final ProducingResult result =
                 SchemaTranslator.createProducingResult(
                         tableSchema,
                         Schema.newBuilder()
                                 .columnByExpression("computed", "f1 + 42")
-                                .columnByMetadata("rowtime", DataTypes.TIMESTAMP_LTZ(3))
+                                .columnByMetadata("rowtime", TIMESTAMP_LTZ(3))
                                 .primaryKey("id")
                                 .build());
 
-        assertEquals(
-                Schema.newBuilder()
-                        .column("id", DataTypes.BIGINT().notNull())
-                        .column("name", DataTypes.STRING())
-                        .columnByExpression("computed", "f1 + 42")
-                        .columnByMetadata("rowtime", DataTypes.TIMESTAMP_LTZ(3)) // becomes metadata
-                        .primaryKey("id")
-                        .build(),
-                result.getSchema());
+        assertThat(result.getSchema())
+                .isEqualTo(
+                        Schema.newBuilder()
+                                .column("id", BIGINT().notNull())
+                                .column("name", STRING())
+                                .columnByExpression("computed", "f1 + 42")
+                                .columnByMetadata("rowtime", TIMESTAMP_LTZ(3)) // becomes metadata
+                                .primaryKey("id")
+                                .build());
     }
 
     @Test
     public void testOutputToDeclaredSchema() {
         final ResolvedSchema tableSchema =
                 ResolvedSchema.of(
-                        Column.physical("id", DataTypes.BIGINT()),
-                        Column.physical("rowtime", DataTypes.TIMESTAMP_LTZ(3)),
-                        Column.physical("name", DataTypes.STRING()));
+                        Column.physical("id", BIGINT()),
+                        Column.physical("rowtime", TIMESTAMP_LTZ(3)),
+                        Column.physical("name", STRING()));
 
         final ProducingResult result =
                 SchemaTranslator.createProducingResult(
                         tableSchema,
                         Schema.newBuilder()
-                                .column("id", DataTypes.BIGINT())
-                                .columnByMetadata("rowtime", DataTypes.TIMESTAMP_LTZ(3))
-                                .column("name", DataTypes.STRING().bridgedTo(StringData.class))
+                                .column("id", BIGINT())
+                                .columnByMetadata("rowtime", TIMESTAMP_LTZ(3))
+                                .column("name", STRING().bridgedTo(StringData.class))
                                 .build());
 
-        assertEquals(
-                Schema.newBuilder()
-                        .column("id", DataTypes.BIGINT())
-                        .columnByMetadata("rowtime", DataTypes.TIMESTAMP_LTZ(3))
-                        .column("name", DataTypes.STRING().bridgedTo(StringData.class))
-                        .build(),
-                result.getSchema());
+        assertThat(result.getSchema())
+                .isEqualTo(
+                        Schema.newBuilder()
+                                .column("id", BIGINT())
+                                .columnByMetadata("rowtime", TIMESTAMP_LTZ(3))
+                                .column("name", STRING().bridgedTo(StringData.class))
+                                .build());
     }
 
     private static DataTypeFactory dataTypeFactoryWithRawType(Class<?> rawType) {