You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by et...@apache.org on 2023/05/02 07:25:13 UTC
[iceberg] branch master updated: Flink: Remove deprecated AssertHelpers (#7481)
This is an automated email from the ASF dual-hosted git repository.
etudenhoefner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new f9f85e7f26 Flink: Remove deprecated AssertHelpers (#7481)
f9f85e7f26 is described below
commit f9f85e7f2647020ef7e9571f76027c8e55075690
Author: Liu Xiao <42...@users.noreply.github.com>
AuthorDate: Tue May 2 15:25:08 2023 +0800
Flink: Remove deprecated AssertHelpers (#7481)
---
.../iceberg/flink/TestFlinkCatalogDatabase.java | 24 +++++-----
.../iceberg/flink/TestFlinkCatalogFactory.java | 20 ++++----
.../iceberg/flink/TestFlinkCatalogTable.java | 29 +++++-------
.../flink/TestFlinkCatalogTablePartitions.java | 10 ++--
.../apache/iceberg/flink/TestFlinkSchemaUtil.java | 12 ++---
.../apache/iceberg/flink/TestIcebergConnector.java | 32 +++++++------
.../iceberg/flink/data/TestRowDataProjection.java | 29 +++++-------
.../iceberg/flink/sink/TestFlinkIcebergSink.java | 41 ++++------------
.../iceberg/flink/sink/TestFlinkIcebergSinkV2.java | 29 +++++++-----
.../flink/sink/TestFlinkIcebergSinkV2Base.java | 55 ++++++++++------------
.../apache/iceberg/flink/source/TestFlinkScan.java | 44 ++++++++---------
.../flink/source/TestFlinkSourceConfig.java | 14 ++----
.../iceberg/flink/source/TestFlinkTableSource.java | 8 ++--
.../iceberg/flink/source/TestStreamScanSql.java | 34 +++++++------
.../flink/source/TestStreamingMonitorFunction.java | 24 +++-------
.../enumerator/TestContinuousSplitPlannerImpl.java | 34 +++++--------
...estContinuousSplitPlannerImplStartStrategy.java | 38 +++++++--------
17 files changed, 213 insertions(+), 264 deletions(-)
diff --git a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogDatabase.java b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogDatabase.java
index d4de12c623..0e767fdc81 100644
--- a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogDatabase.java
+++ b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogDatabase.java
@@ -24,12 +24,12 @@ import java.util.Map;
import java.util.Objects;
import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
import org.apache.flink.types.Row;
-import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.Schema;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.Types;
+import org.assertj.core.api.Assertions;
import org.junit.After;
import org.junit.Assert;
import org.junit.Assume;
@@ -126,11 +126,11 @@ public class TestFlinkCatalogDatabase extends FlinkCatalogTestBase {
"Table should exist",
validationCatalog.tableExists(TableIdentifier.of(icebergNamespace, "tl")));
- AssertHelpers.assertThrowsCause(
- "Should fail if trying to delete a non-empty database",
- DatabaseNotEmptyException.class,
- String.format("Database %s in catalog %s is not empty.", DATABASE, catalogName),
- () -> sql("DROP DATABASE %s", flinkDatabase));
+ Assertions.assertThatThrownBy(() -> sql("DROP DATABASE %s", flinkDatabase))
+ .cause()
+ .isInstanceOf(DatabaseNotEmptyException.class)
+ .hasMessage(
+ String.format("Database %s in catalog %s is not empty.", DATABASE, catalogName));
sql("DROP TABLE %s.tl", flinkDatabase);
}
@@ -301,10 +301,12 @@ public class TestFlinkCatalogDatabase extends FlinkCatalogTestBase {
"Namespace should not already exist",
validationNamespaceCatalog.namespaceExists(icebergNamespace));
- AssertHelpers.assertThrowsCause(
- "Should fail if trying to create database with location in hadoop catalog.",
- UnsupportedOperationException.class,
- String.format("Cannot create namespace %s: metadata is not supported", icebergNamespace),
- () -> sql("CREATE DATABASE %s WITH ('prop'='value')", flinkDatabase));
+ Assertions.assertThatThrownBy(
+ () -> sql("CREATE DATABASE %s WITH ('prop'='value')", flinkDatabase))
+ .cause()
+ .isInstanceOf(UnsupportedOperationException.class)
+ .hasMessage(
+ String.format(
+ "Cannot create namespace %s: metadata is not supported", icebergNamespace));
}
}
diff --git a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogFactory.java b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogFactory.java
index f7edd5653e..184223847c 100644
--- a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogFactory.java
+++ b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogFactory.java
@@ -20,7 +20,6 @@ package org.apache.iceberg.flink;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
-import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.hadoop.HadoopCatalog;
@@ -87,11 +86,11 @@ public class TestFlinkCatalogFactory {
props.put(
FlinkCatalogFactory.ICEBERG_CATALOG_TYPE, FlinkCatalogFactory.ICEBERG_CATALOG_TYPE_HIVE);
- AssertHelpers.assertThrows(
- "Should throw when both catalog-type and catalog-impl are set",
- IllegalArgumentException.class,
- "both catalog-type and catalog-impl are set",
- () -> FlinkCatalogFactory.createCatalogLoader(catalogName, props, new Configuration()));
+ Assertions.assertThatThrownBy(
+ () -> FlinkCatalogFactory.createCatalogLoader(catalogName, props, new Configuration()))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageStartingWith(
+ "Cannot create catalog customCatalog, both catalog-type and catalog-impl are set");
}
@Test
@@ -99,11 +98,10 @@ public class TestFlinkCatalogFactory {
String catalogName = "unknownCatalog";
props.put(FlinkCatalogFactory.ICEBERG_CATALOG_TYPE, "fooType");
- AssertHelpers.assertThrows(
- "Should throw when an unregistered / unknown catalog is set as the catalog factor's`type` setting",
- UnsupportedOperationException.class,
- "Unknown catalog-type",
- () -> FlinkCatalogFactory.createCatalogLoader(catalogName, props, new Configuration()));
+ Assertions.assertThatThrownBy(
+ () -> FlinkCatalogFactory.createCatalogLoader(catalogName, props, new Configuration()))
+ .isInstanceOf(UnsupportedOperationException.class)
+ .hasMessageStartingWith("Unknown catalog-type: fooType");
}
public static class CustomHadoopCatalog extends HadoopCatalog {
diff --git a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java
index 45b3da5fe6..c6c26833c7 100644
--- a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java
+++ b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java
@@ -32,7 +32,6 @@ import org.apache.flink.table.api.constraints.UniqueConstraint;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.exceptions.TableNotExistException;
-import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.BaseTable;
import org.apache.iceberg.ContentFile;
import org.apache.iceberg.DataFile;
@@ -54,6 +53,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.Types;
+import org.assertj.core.api.Assertions;
import org.junit.After;
import org.junit.Assert;
import org.junit.Assume;
@@ -104,11 +104,11 @@ public class TestFlinkCatalogTable extends FlinkCatalogTestBase {
new Schema(Types.NestedField.optional(0, "id", Types.LongType.get()));
validationCatalog.createTable(TableIdentifier.of(icebergNamespace, "tl"), tableSchema);
sql("ALTER TABLE tl RENAME TO tl2");
- AssertHelpers.assertThrows(
- "Should fail if trying to get a nonexistent table",
- ValidationException.class,
- "Table `tl` was not found.",
- () -> getTableEnv().from("tl"));
+
+ Assertions.assertThatThrownBy(() -> getTableEnv().from("tl"))
+ .isInstanceOf(ValidationException.class)
+ .hasMessage("Table `tl` was not found.");
+
Schema actualSchema = FlinkSchemaUtil.convert(getTableEnv().from("tl2").getSchema());
Assert.assertEquals(tableSchema.asStruct(), actualSchema.asStruct());
}
@@ -179,11 +179,9 @@ public class TestFlinkCatalogTable extends FlinkCatalogTestBase {
Assert.assertEquals(Maps.newHashMap(), table("tl").properties());
sql("DROP TABLE tl");
- AssertHelpers.assertThrows(
- "Table 'tl' should be dropped",
- NoSuchTableException.class,
- "Table does not exist: " + getFullQualifiedTableName("tl"),
- () -> table("tl"));
+ Assertions.assertThatThrownBy(() -> table("tl"))
+ .isInstanceOf(NoSuchTableException.class)
+ .hasMessage("Table does not exist: " + getFullQualifiedTableName("tl"));
sql("CREATE TABLE IF NOT EXISTS tl(id BIGINT)");
Assert.assertEquals(Maps.newHashMap(), table("tl").properties());
@@ -286,11 +284,10 @@ public class TestFlinkCatalogTable extends FlinkCatalogTestBase {
TableOperations ops = ((BaseTable) table).operations();
Assert.assertEquals("should create table using format v2", 2, ops.refresh().formatVersion());
- AssertHelpers.assertThrowsRootCause(
- "should fail to downgrade to v1",
- IllegalArgumentException.class,
- "Cannot downgrade v2 table to v1",
- () -> sql("ALTER TABLE tl SET('format-version'='1')"));
+ Assertions.assertThatThrownBy(() -> sql("ALTER TABLE tl SET('format-version'='1')"))
+ .rootCause()
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessage("Cannot downgrade v2 table to v1");
}
@Test
diff --git a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTablePartitions.java b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTablePartitions.java
index e5332e8f30..0008e4320c 100644
--- a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTablePartitions.java
+++ b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTablePartitions.java
@@ -23,12 +23,12 @@ import org.apache.flink.table.catalog.CatalogPartitionSpec;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.exceptions.TableNotExistException;
import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
-import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.assertj.core.api.Assertions;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@@ -90,10 +90,10 @@ public class TestFlinkCatalogTablePartitions extends FlinkCatalogTestBase {
ObjectPath objectPath = new ObjectPath(DATABASE, tableName);
FlinkCatalog flinkCatalog = (FlinkCatalog) getTableEnv().getCatalog(catalogName).get();
- AssertHelpers.assertThrows(
- "Should not list partitions for unpartitioned table.",
- TableNotPartitionedException.class,
- () -> flinkCatalog.listPartitions(objectPath));
+ Assertions.assertThatThrownBy(() -> flinkCatalog.listPartitions(objectPath))
+ .isInstanceOf(TableNotPartitionedException.class)
+ .hasMessageStartingWith("Table db.test_table in catalog")
+ .hasMessageEndingWith("is not partitioned.");
}
@Test
diff --git a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/TestFlinkSchemaUtil.java b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/TestFlinkSchemaUtil.java
index b5dfb9cb2f..4ac32c08eb 100644
--- a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/TestFlinkSchemaUtil.java
+++ b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/TestFlinkSchemaUtil.java
@@ -30,13 +30,13 @@ import org.apache.flink.table.types.logical.TimeType;
import org.apache.flink.table.types.logical.TimestampType;
import org.apache.flink.table.types.logical.VarBinaryType;
import org.apache.flink.table.types.logical.VarCharType;
-import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.Schema;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
+import org.assertj.core.api.Assertions;
import org.junit.Assert;
import org.junit.Test;
@@ -407,10 +407,10 @@ public class TestFlinkSchemaUtil {
Types.StructType.of(
Types.NestedField.required(2, "inner", Types.IntegerType.get())))),
Sets.newHashSet(2));
- AssertHelpers.assertThrows(
- "Does not support the nested columns in flink schema's primary keys",
- ValidationException.class,
- "Column 'struct.inner' does not exist",
- () -> FlinkSchemaUtil.toSchema(icebergSchema));
+
+ Assertions.assertThatThrownBy(() -> FlinkSchemaUtil.toSchema(icebergSchema))
+ .isInstanceOf(ValidationException.class)
+ .hasMessageStartingWith("Could not create a PRIMARY KEY")
+ .hasMessageContaining("Column 'struct.inner' does not exist.");
}
}
diff --git a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/TestIcebergConnector.java b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/TestIcebergConnector.java
index a12fe2507f..4f71b5fe8d 100644
--- a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/TestIcebergConnector.java
+++ b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/TestIcebergConnector.java
@@ -31,13 +31,13 @@ import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.types.Row;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
-import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.thrift.TException;
+import org.assertj.core.api.Assertions;
import org.junit.After;
import org.junit.Assert;
import org.junit.ClassRule;
@@ -261,11 +261,10 @@ public class TestIcebergConnector extends FlinkTestBase {
try {
testCreateConnectorTable();
// Ensure that the table was created under the specific database.
- AssertHelpers.assertThrows(
- "Table should already exists",
- org.apache.flink.table.api.TableException.class,
- "Could not execute CreateTable in path",
- () -> sql("CREATE TABLE `default_catalog`.`%s`.`%s`", databaseName(), TABLE_NAME));
+ Assertions.assertThatThrownBy(
+ () -> sql("CREATE TABLE `default_catalog`.`%s`.`%s`", databaseName(), TABLE_NAME))
+ .isInstanceOf(org.apache.flink.table.api.TableException.class)
+ .hasMessageStartingWith("Could not execute CreateTable in path");
} finally {
sql("DROP TABLE IF EXISTS `%s`.`%s`", databaseName(), TABLE_NAME);
if (!isDefaultDatabaseName()) {
@@ -293,14 +292,19 @@ public class TestIcebergConnector extends FlinkTestBase {
// Create a connector table in an iceberg catalog.
sql("CREATE CATALOG `test_catalog` WITH %s", toWithClause(catalogProps));
try {
- AssertHelpers.assertThrowsCause(
- "Cannot create the iceberg connector table in iceberg catalog",
- IllegalArgumentException.class,
- "Cannot create the table with 'connector'='iceberg' table property in an iceberg catalog",
- () ->
- sql(
- "CREATE TABLE `test_catalog`.`%s`.`%s` (id BIGINT, data STRING) WITH %s",
- FlinkCatalogFactory.DEFAULT_DATABASE_NAME, TABLE_NAME, toWithClause(tableProps)));
+ Assertions.assertThatThrownBy(
+ () ->
+ sql(
+ "CREATE TABLE `test_catalog`.`%s`.`%s` (id BIGINT, data STRING) WITH %s",
+ FlinkCatalogFactory.DEFAULT_DATABASE_NAME,
+ TABLE_NAME,
+ toWithClause(tableProps)))
+ .cause()
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessage(
+ "Cannot create the table with 'connector'='iceberg' table property in an iceberg catalog, "
+ + "Please create table with 'connector'='iceberg' property in a non-iceberg catalog or "
+ + "create table without 'connector'='iceberg' related properties in an iceberg table.");
} finally {
sql("DROP CATALOG IF EXISTS `test_catalog`");
}
diff --git a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/data/TestRowDataProjection.java b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/data/TestRowDataProjection.java
index 4cb77b11fd..a3bbb9ae25 100644
--- a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/data/TestRowDataProjection.java
+++ b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/data/TestRowDataProjection.java
@@ -20,7 +20,6 @@ package org.apache.iceberg.flink.data;
import java.util.Iterator;
import org.apache.flink.table.data.RowData;
-import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.Schema;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.data.RandomGenericData;
@@ -28,6 +27,7 @@ import org.apache.iceberg.data.Record;
import org.apache.iceberg.flink.TestHelpers;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.StructProjection;
+import org.assertj.core.api.Assertions;
import org.junit.Assert;
import org.junit.Test;
@@ -223,11 +223,10 @@ public class TestRowDataProjection {
Types.StructType.of(
Types.NestedField.required(3, "value", Types.LongType.get()),
Types.NestedField.required(4, "valueData", Types.LongType.get())))));
- AssertHelpers.assertThrows(
- "Should not allow to project a partial map key with non-primitive type.",
- IllegalArgumentException.class,
- "Cannot project a partial map key or value",
- () -> generateAndValidate(schema, partialMapKey));
+
+ Assertions.assertThatThrownBy(() -> generateAndValidate(schema, partialMapKey))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageStartingWith("Cannot project a partial map key or value struct.");
// Project partial map key.
Schema partialMapValue =
@@ -243,11 +242,10 @@ public class TestRowDataProjection {
Types.NestedField.required(2, "keyData", Types.LongType.get())),
Types.StructType.of(
Types.NestedField.required(3, "value", Types.LongType.get())))));
- AssertHelpers.assertThrows(
- "Should not allow to project a partial map value with non-primitive type.",
- IllegalArgumentException.class,
- "Cannot project a partial map key or value",
- () -> generateAndValidate(schema, partialMapValue));
+
+ Assertions.assertThatThrownBy(() -> generateAndValidate(schema, partialMapValue))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageStartingWith("Cannot project a partial map key or value struct.");
}
@Test
@@ -306,11 +304,10 @@ public class TestRowDataProjection {
4,
Types.StructType.of(
Types.NestedField.required(2, "nestedListField2", Types.LongType.get())))));
- AssertHelpers.assertThrows(
- "Should not allow to project a partial list element with non-primitive type.",
- IllegalArgumentException.class,
- "Cannot project a partial list element",
- () -> generateAndValidate(schema, partialList));
+
+ Assertions.assertThatThrownBy(() -> generateAndValidate(schema, partialList))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageStartingWith("Cannot project a partial list element struct.");
}
private void generateAndValidate(Schema schema, Schema projectSchema) {
diff --git a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSink.java b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSink.java
index 23beb19a72..dc9e8991ed 100644
--- a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSink.java
+++ b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSink.java
@@ -28,7 +28,6 @@ import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.data.RowData;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.types.Row;
-import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.DistributionMode;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.PartitionSpec;
@@ -45,6 +44,7 @@ import org.apache.iceberg.flink.util.FlinkCompatibilityUtil;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.assertj.core.api.Assertions;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
@@ -197,14 +197,9 @@ public class TestFlinkIcebergSink extends TestFlinkIcebergSinkBase {
.set(TableProperties.WRITE_DISTRIBUTION_MODE, DistributionMode.HASH.modeName())
.commit();
- AssertHelpers.assertThrows(
- "Does not support range distribution-mode now.",
- IllegalArgumentException.class,
- "Flink does not support 'range' write distribution mode now.",
- () -> {
- testWriteRow(null, DistributionMode.RANGE);
- return null;
- });
+ Assertions.assertThatThrownBy(() -> testWriteRow(null, DistributionMode.RANGE))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessage("Flink does not support 'range' write distribution mode now.");
}
@Test
@@ -350,17 +345,9 @@ public class TestFlinkIcebergSink extends TestFlinkIcebergSinkBase {
.writeParallelism(parallelism)
.setAll(newProps);
- AssertHelpers.assertThrows(
- "Should fail with invalid distribution mode.",
- IllegalArgumentException.class,
- "Invalid distribution mode: UNRECOGNIZED",
- () -> {
- builder.append();
-
- // Execute the program.
- env.execute("Test Iceberg DataStream.");
- return null;
- });
+ Assertions.assertThatThrownBy(builder::append)
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessage("Invalid distribution mode: UNRECOGNIZED");
}
@Test
@@ -378,16 +365,8 @@ public class TestFlinkIcebergSink extends TestFlinkIcebergSinkBase {
.writeParallelism(parallelism)
.setAll(newProps);
- AssertHelpers.assertThrows(
- "Should fail with invalid file format.",
- IllegalArgumentException.class,
- "Invalid file format: UNRECOGNIZED",
- () -> {
- builder.append();
-
- // Execute the program.
- env.execute("Test Iceberg DataStream.");
- return null;
- });
+ Assertions.assertThatThrownBy(builder::append)
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessage("Invalid file format: UNRECOGNIZED");
}
}
diff --git a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2.java b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2.java
index 6552fe834c..b5c3bcf417 100644
--- a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2.java
+++ b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2.java
@@ -23,7 +23,6 @@ import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.types.Row;
-import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.SnapshotRef;
@@ -39,6 +38,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.Types;
+import org.assertj.core.api.Assertions;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
@@ -202,18 +202,21 @@ public class TestFlinkIcebergSinkV2 extends TestFlinkIcebergSinkV2Base {
.writeParallelism(parallelism)
.upsert(true);
- AssertHelpers.assertThrows(
- "Should be error because upsert mode and overwrite mode enable at the same time.",
- IllegalStateException.class,
- "OVERWRITE mode shouldn't be enable",
- () ->
- builder.equalityFieldColumns(ImmutableList.of("id", "data")).overwrite(true).append());
-
- AssertHelpers.assertThrows(
- "Should be error because equality field columns are empty.",
- IllegalStateException.class,
- "Equality field columns shouldn't be empty",
- () -> builder.equalityFieldColumns(ImmutableList.of()).overwrite(false).append());
+ Assertions.assertThatThrownBy(
+ () ->
+ builder
+ .equalityFieldColumns(ImmutableList.of("id", "data"))
+ .overwrite(true)
+ .append())
+ .isInstanceOf(IllegalStateException.class)
+ .hasMessage(
+ "OVERWRITE mode shouldn't be enable when configuring to use UPSERT data stream.");
+
+ Assertions.assertThatThrownBy(
+ () -> builder.equalityFieldColumns(ImmutableList.of()).overwrite(false).append())
+ .isInstanceOf(IllegalStateException.class)
+ .hasMessage(
+ "Equality field columns shouldn't be empty when configuring to use UPSERT data stream.");
}
@Test
diff --git a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2Base.java b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2Base.java
index 15380408e4..0b403756ce 100644
--- a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2Base.java
+++ b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2Base.java
@@ -28,7 +28,6 @@ import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
-import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
@@ -43,6 +42,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.util.StructLikeSet;
+import org.assertj.core.api.Assertions;
import org.junit.Assert;
public class TestFlinkIcebergSinkV2Base {
@@ -231,20 +231,20 @@ public class TestFlinkIcebergSinkV2Base {
ImmutableList.of(record(1, "ddd"), record(2, "ddd")));
if (partitioned && writeDistributionMode.equals(TableProperties.WRITE_DISTRIBUTION_MODE_HASH)) {
- AssertHelpers.assertThrows(
- "Should be error because equality field columns don't include all partition keys",
- IllegalStateException.class,
- "should be included in equality fields",
- () -> {
- testChangeLogs(
- ImmutableList.of("id"),
- row -> row.getField(ROW_ID_POS),
- false,
- elementsPerCheckpoint,
- expectedRecords,
- branch);
- return null;
- });
+ Assertions.assertThatThrownBy(
+ () ->
+ testChangeLogs(
+ ImmutableList.of("id"),
+ row -> row.getField(ROW_ID_POS),
+ false,
+ elementsPerCheckpoint,
+ expectedRecords,
+ branch))
+ .isInstanceOf(IllegalStateException.class)
+ .hasMessageStartingWith(
+ "In 'hash' distribution mode with equality fields set, partition field")
+ .hasMessageContaining("should be included in equality fields:");
+
} else {
testChangeLogs(
ImmutableList.of("id"),
@@ -278,20 +278,17 @@ public class TestFlinkIcebergSinkV2Base {
expectedRecords,
branch);
} else {
- AssertHelpers.assertThrows(
- "Should be error because equality field columns don't include all partition keys",
- IllegalStateException.class,
- "should be included in equality fields",
- () -> {
- testChangeLogs(
- ImmutableList.of("id"),
- row -> row.getField(ROW_ID_POS),
- true,
- elementsPerCheckpoint,
- expectedRecords,
- branch);
- return null;
- });
+ Assertions.assertThatThrownBy(
+ () ->
+ testChangeLogs(
+ ImmutableList.of("id"),
+ row -> row.getField(ROW_ID_POS),
+ true,
+ elementsPerCheckpoint,
+ expectedRecords,
+ branch))
+ .isInstanceOf(IllegalStateException.class)
+ .hasMessageContaining("should be included in equality fields:");
}
}
diff --git a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkScan.java b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkScan.java
index aa5b51eddf..b537efa727 100644
--- a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkScan.java
+++ b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkScan.java
@@ -28,7 +28,6 @@ import java.util.Map;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.types.Row;
import org.apache.iceberg.AppendFiles;
-import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.PartitionSpec;
@@ -49,6 +48,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.DateTimeUtil;
+import org.assertj.core.api.Assertions;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Rule;
@@ -355,27 +355,27 @@ public abstract class TestFlinkScan {
expected,
TestFixtures.SCHEMA);
- AssertHelpers.assertThrows(
- "START_SNAPSHOT_ID and START_TAG cannot both be set.",
- Exception.class,
- () ->
- runWithOptions(
- ImmutableMap.<String, String>builder()
- .put("start-tag", startTag)
- .put("end-tag", endTag)
- .put("start-snapshot-id", Long.toString(snapshotId1))
- .buildOrThrow()));
-
- AssertHelpers.assertThrows(
- "END_SNAPSHOT_ID and END_TAG cannot both be set.",
- Exception.class,
- () ->
- runWithOptions(
- ImmutableMap.<String, String>builder()
- .put("start-tag", startTag)
- .put("end-tag", endTag)
- .put("end-snapshot-id", Long.toString(snapshotId3))
- .buildOrThrow()));
+ Assertions.assertThatThrownBy(
+ () ->
+ runWithOptions(
+ ImmutableMap.<String, String>builder()
+ .put("start-tag", startTag)
+ .put("end-tag", endTag)
+ .put("start-snapshot-id", Long.toString(snapshotId1))
+ .buildOrThrow()))
+ .isInstanceOf(Exception.class)
+ .hasMessage("START_SNAPSHOT_ID and START_TAG cannot both be set.");
+
+ Assertions.assertThatThrownBy(
+ () ->
+ runWithOptions(
+ ImmutableMap.<String, String>builder()
+ .put("start-tag", startTag)
+ .put("end-tag", endTag)
+ .put("end-snapshot-id", Long.toString(snapshotId3))
+ .buildOrThrow()))
+ .isInstanceOf(Exception.class)
+ .hasMessage("END_SNAPSHOT_ID and END_TAG cannot both be set.");
}
@Test
diff --git a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkSourceConfig.java b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkSourceConfig.java
index 974b8539b3..1814ff8f85 100644
--- a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkSourceConfig.java
+++ b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkSourceConfig.java
@@ -20,8 +20,8 @@ package org.apache.iceberg.flink.source;
import java.util.List;
import org.apache.flink.types.Row;
-import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.flink.FlinkReadOptions;
+import org.assertj.core.api.Assertions;
import org.junit.Assert;
import org.junit.Test;
@@ -31,14 +31,10 @@ public class TestFlinkSourceConfig extends TestFlinkTableSource {
@Test
public void testFlinkSessionConfig() {
getTableEnv().getConfig().set(FlinkReadOptions.STREAMING_OPTION, true);
- AssertHelpers.assertThrows(
- "Should throw exception because of cannot set snapshot-id option for streaming reader",
- IllegalArgumentException.class,
- "Cannot set as-of-timestamp option for streaming reader",
- () -> {
- sql("SELECT * FROM %s /*+ OPTIONS('as-of-timestamp'='1')*/", TABLE);
- return null;
- });
+ Assertions.assertThatThrownBy(
+ () -> sql("SELECT * FROM %s /*+ OPTIONS('as-of-timestamp'='1')*/", TABLE))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessage("Cannot set as-of-timestamp option for streaming reader");
}
@Test
diff --git a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkTableSource.java b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkTableSource.java
index d72f57dcea..ff14bc4062 100644
--- a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkTableSource.java
+++ b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkTableSource.java
@@ -25,7 +25,6 @@ import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.table.api.SqlParserException;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.types.Row;
-import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.events.Listeners;
import org.apache.iceberg.events.ScanEvent;
@@ -103,10 +102,9 @@ public class TestFlinkTableSource extends FlinkTestBase {
@Test
public void testLimitPushDown() {
- AssertHelpers.assertThrows(
- "Invalid limit number: -1 ",
- SqlParserException.class,
- () -> sql("SELECT * FROM %s LIMIT -1", TABLE_NAME));
+ Assertions.assertThatThrownBy(() -> sql("SELECT * FROM %s LIMIT -1", TABLE_NAME))
+ .isInstanceOf(SqlParserException.class)
+ .hasMessageStartingWith("SQL parse failed.");
Assert.assertEquals(
"Should have 0 record", 0, sql("SELECT * FROM %s LIMIT 0", TABLE_NAME).size());
diff --git a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamScanSql.java b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamScanSql.java
index abcce11e36..633e11718b 100644
--- a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamScanSql.java
+++ b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamScanSql.java
@@ -30,7 +30,6 @@ import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.api.config.TableConfigOptions;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;
-import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Table;
import org.apache.iceberg.TestHelpers;
@@ -43,6 +42,7 @@ import org.apache.iceberg.flink.FlinkCatalogTestBase;
import org.apache.iceberg.flink.MiniClusterResource;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.assertj.core.api.Assertions;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@@ -220,14 +220,13 @@ public class TestStreamScanSql extends FlinkCatalogTestBase {
Row row2 = Row.of(2, "bbb", "2021-01-01");
insertRows(table, row1, row2);
- AssertHelpers.assertThrows(
- "Cannot scan table using ref for stream yet",
- IllegalArgumentException.class,
- "Cannot scan table using ref",
- () ->
- exec(
- "SELECT * FROM %s /*+ OPTIONS('streaming'='true', 'monitor-interval'='1s', 'branch'='b1')*/",
- TABLE));
+ Assertions.assertThatThrownBy(
+ () ->
+ exec(
+ "SELECT * FROM %s /*+ OPTIONS('streaming'='true', 'monitor-interval'='1s', 'branch'='b1')*/",
+ TABLE))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessage("Cannot scan table using ref b1 configured for streaming reader yet");
}
@Test
@@ -307,14 +306,13 @@ public class TestStreamScanSql extends FlinkCatalogTestBase {
}
result.getJobClient().ifPresent(JobClient::cancel);
- AssertHelpers.assertThrows(
- "START_SNAPSHOT_ID and START_TAG cannot both be set.",
- IllegalArgumentException.class,
- "START_SNAPSHOT_ID and START_TAG cannot both be set.",
- () ->
- exec(
- "SELECT * FROM %s /*+ OPTIONS('streaming'='true', 'monitor-interval'='1s', 'start-tag'='%s', "
- + "'start-snapshot-id'='%d' )*/",
- TABLE, tagName, startSnapshotId));
+ Assertions.assertThatThrownBy(
+ () ->
+ exec(
+ "SELECT * FROM %s /*+ OPTIONS('streaming'='true', 'monitor-interval'='1s', 'start-tag'='%s', "
+ + "'start-snapshot-id'='%d' )*/",
+ TABLE, tagName, startSnapshotId))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessage("START_SNAPSHOT_ID and START_TAG cannot both be set.");
}
}
diff --git a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamingMonitorFunction.java b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamingMonitorFunction.java
index a161645979..1b9049f1bb 100644
--- a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamingMonitorFunction.java
+++ b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamingMonitorFunction.java
@@ -31,7 +31,6 @@ import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
import org.apache.flink.table.data.RowData;
import org.apache.flink.types.Row;
-import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
@@ -47,6 +46,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.SnapshotUtil;
import org.apache.iceberg.util.ThreadPools;
+import org.assertj.core.api.Assertions;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@@ -259,14 +259,9 @@ public class TestStreamingMonitorFunction extends TableTestBase {
.maxPlanningSnapshotCount(0)
.build();
- AssertHelpers.assertThrows(
- "Should throw exception because of invalid config",
- IllegalArgumentException.class,
- "must be greater than zero",
- () -> {
- createFunction(scanContext1);
- return null;
- });
+ Assertions.assertThatThrownBy(() -> createFunction(scanContext1))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessage("The max-planning-snapshot-count must be greater than zero");
ScanContext scanContext2 =
ScanContext.builder()
@@ -274,14 +269,9 @@ public class TestStreamingMonitorFunction extends TableTestBase {
.maxPlanningSnapshotCount(-10)
.build();
- AssertHelpers.assertThrows(
- "Should throw exception because of invalid config",
- IllegalArgumentException.class,
- "must be greater than zero",
- () -> {
- createFunction(scanContext2);
- return null;
- });
+ Assertions.assertThatThrownBy(() -> createFunction(scanContext2))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessage("The max-planning-snapshot-count must be greater than zero");
}
@Test
diff --git a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/TestContinuousSplitPlannerImpl.java b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/TestContinuousSplitPlannerImpl.java
index fef61bc048..2fa921b1f1 100644
--- a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/TestContinuousSplitPlannerImpl.java
+++ b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/TestContinuousSplitPlannerImpl.java
@@ -23,7 +23,6 @@ import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
-import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Snapshot;
@@ -37,6 +36,7 @@ import org.apache.iceberg.flink.source.StreamingStartingStrategy;
import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.assertj.core.api.Assertions;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
@@ -338,11 +338,9 @@ public class TestContinuousSplitPlannerImpl {
new ContinuousSplitPlannerImpl(
tableResource.tableLoader().clone(), scanContextWithInvalidSnapshotId, null);
- AssertHelpers.assertThrows(
- "Should detect invalid starting snapshot id",
- IllegalArgumentException.class,
- "Start snapshot id not found in history: 1",
- () -> splitPlanner.planSplits(null));
+ Assertions.assertThatThrownBy(() -> splitPlanner.planSplits(null))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessage("Start snapshot id not found in history: 1");
}
@Test
@@ -366,11 +364,9 @@ public class TestContinuousSplitPlannerImpl {
new ContinuousSplitPlannerImpl(
tableResource.tableLoader().clone(), scanContextWithInvalidSnapshotId, null);
- AssertHelpers.assertThrows(
- "Should detect invalid starting snapshot id",
- IllegalArgumentException.class,
- "Start snapshot id not found in history: " + invalidSnapshotId,
- () -> splitPlanner.planSplits(null));
+ Assertions.assertThatThrownBy(() -> splitPlanner.planSplits(null))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessage("Start snapshot id not found in history: " + invalidSnapshotId);
}
@Test
@@ -430,11 +426,9 @@ public class TestContinuousSplitPlannerImpl {
new ContinuousSplitPlannerImpl(
tableResource.tableLoader().clone(), scanContextWithInvalidSnapshotId, null);
- AssertHelpers.assertThrows(
- "Should detect invalid starting snapshot timestamp",
- IllegalArgumentException.class,
- "Cannot find a snapshot after: ",
- () -> splitPlanner.planSplits(null));
+ Assertions.assertThatThrownBy(() -> splitPlanner.planSplits(null))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessage("Cannot find a snapshot after: 1");
}
@Test
@@ -453,11 +447,9 @@ public class TestContinuousSplitPlannerImpl {
new ContinuousSplitPlannerImpl(
tableResource.tableLoader().clone(), scanContextWithInvalidSnapshotId, null);
- AssertHelpers.assertThrows(
- "Should detect invalid starting snapshot timestamp",
- IllegalArgumentException.class,
- "Cannot find a snapshot after: ",
- () -> splitPlanner.planSplits(null));
+ Assertions.assertThatThrownBy(() -> splitPlanner.planSplits(null))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageStartingWith("Cannot find a snapshot after:");
}
@Test
diff --git a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/TestContinuousSplitPlannerImplStartStrategy.java b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/TestContinuousSplitPlannerImplStartStrategy.java
index 2df2846f7e..2c94f21590 100644
--- a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/TestContinuousSplitPlannerImplStartStrategy.java
+++ b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/TestContinuousSplitPlannerImplStartStrategy.java
@@ -20,7 +20,6 @@ package org.apache.iceberg.flink.source.enumerator;
import java.io.IOException;
import java.util.List;
-import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.data.GenericAppenderHelper;
@@ -30,6 +29,7 @@ import org.apache.iceberg.flink.HadoopTableResource;
import org.apache.iceberg.flink.TestFixtures;
import org.apache.iceberg.flink.source.ScanContext;
import org.apache.iceberg.flink.source.StreamingStartingStrategy;
+import org.assertj.core.api.Assertions;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
@@ -79,7 +79,7 @@ public class TestContinuousSplitPlannerImplStartStrategy {
.startingStrategy(StreamingStartingStrategy.TABLE_SCAN_THEN_INCREMENTAL)
.build();
- // emtpy table
+ // empty table
Assert.assertFalse(
ContinuousSplitPlannerImpl.startSnapshot(tableResource.table(), scanContext).isPresent());
@@ -97,7 +97,7 @@ public class TestContinuousSplitPlannerImplStartStrategy {
.startingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_LATEST_SNAPSHOT)
.build();
- // emtpy table
+ // empty table
Assert.assertFalse(
ContinuousSplitPlannerImpl.startSnapshot(tableResource.table(), scanContext).isPresent());
@@ -115,7 +115,7 @@ public class TestContinuousSplitPlannerImplStartStrategy {
.startingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_EARLIEST_SNAPSHOT)
.build();
- // emtpy table
+ // empty table
Assert.assertFalse(
ContinuousSplitPlannerImpl.startSnapshot(tableResource.table(), scanContext).isPresent());
@@ -134,14 +134,13 @@ public class TestContinuousSplitPlannerImplStartStrategy {
.startSnapshotId(1L)
.build();
- // emtpy table
- AssertHelpers.assertThrows(
- "Should detect invalid starting snapshot id",
- IllegalArgumentException.class,
- "Start snapshot id not found in history: 1",
- () ->
- ContinuousSplitPlannerImpl.startSnapshot(
- tableResource.table(), scanContextInvalidSnapshotId));
+ // empty table
+ Assertions.assertThatThrownBy(
+ () ->
+ ContinuousSplitPlannerImpl.startSnapshot(
+ tableResource.table(), scanContextInvalidSnapshotId))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessage("Start snapshot id not found in history: 1");
appendThreeSnapshots();
@@ -166,14 +165,13 @@ public class TestContinuousSplitPlannerImplStartStrategy {
.startSnapshotTimestamp(1L)
.build();
- // emtpy table
- AssertHelpers.assertThrows(
- "Should detect invalid starting snapshot timestamp",
- IllegalArgumentException.class,
- "Cannot find a snapshot after: ",
- () ->
- ContinuousSplitPlannerImpl.startSnapshot(
- tableResource.table(), scanContextInvalidSnapshotTimestamp));
+ // empty table
+ Assertions.assertThatThrownBy(
+ () ->
+ ContinuousSplitPlannerImpl.startSnapshot(
+ tableResource.table(), scanContextInvalidSnapshotTimestamp))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageStartingWith("Cannot find a snapshot after: ");
appendThreeSnapshots();