Posted to commits@iceberg.apache.org by et...@apache.org on 2023/05/02 07:25:13 UTC

[iceberg] branch master updated: Flink: Remove deprecated AssertHelpers (#7481)

This is an automated email from the ASF dual-hosted git repository.

etudenhoefner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new f9f85e7f26 Flink: Remove deprecated AssertHelpers (#7481)
f9f85e7f26 is described below

commit f9f85e7f2647020ef7e9571f76027c8e55075690
Author: Liu Xiao <42...@users.noreply.github.com>
AuthorDate: Tue May 2 15:25:08 2023 +0800

    Flink: Remove deprecated AssertHelpers (#7481)
---
 .../iceberg/flink/TestFlinkCatalogDatabase.java    | 24 +++++-----
 .../iceberg/flink/TestFlinkCatalogFactory.java     | 20 ++++----
 .../iceberg/flink/TestFlinkCatalogTable.java       | 29 +++++-------
 .../flink/TestFlinkCatalogTablePartitions.java     | 10 ++--
 .../apache/iceberg/flink/TestFlinkSchemaUtil.java  | 12 ++---
 .../apache/iceberg/flink/TestIcebergConnector.java | 32 +++++++------
 .../iceberg/flink/data/TestRowDataProjection.java  | 29 +++++-------
 .../iceberg/flink/sink/TestFlinkIcebergSink.java   | 41 ++++------------
 .../iceberg/flink/sink/TestFlinkIcebergSinkV2.java | 29 +++++++-----
 .../flink/sink/TestFlinkIcebergSinkV2Base.java     | 55 ++++++++++------------
 .../apache/iceberg/flink/source/TestFlinkScan.java | 44 ++++++++---------
 .../flink/source/TestFlinkSourceConfig.java        | 14 ++----
 .../iceberg/flink/source/TestFlinkTableSource.java |  8 ++--
 .../iceberg/flink/source/TestStreamScanSql.java    | 34 +++++++------
 .../flink/source/TestStreamingMonitorFunction.java | 24 +++-------
 .../enumerator/TestContinuousSplitPlannerImpl.java | 34 +++++--------
 ...estContinuousSplitPlannerImplStartStrategy.java | 38 +++++++--------
 17 files changed, 213 insertions(+), 264 deletions(-)
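
Every file above follows the same mechanical rewrite: each deprecated AssertHelpers call becomes an AssertJ assertThatThrownBy chain. A minimal sketch of the before/after shape, taken from the TestFlinkCatalogFactory hunk below (catalogName and props are that test's local variables):

    import org.assertj.core.api.Assertions;

    // Before: the deprecated helper took a description, the expected
    // exception type, a substring of the expected message, and a callable.
    AssertHelpers.assertThrows(
        "Should throw when an unknown catalog-type is set",
        UnsupportedOperationException.class,
        "Unknown catalog-type",
        () -> FlinkCatalogFactory.createCatalogLoader(catalogName, props, new Configuration()));

    // After: the AssertJ chain states the same expectation explicitly.
    Assertions.assertThatThrownBy(
            () -> FlinkCatalogFactory.createCatalogLoader(catalogName, props, new Configuration()))
        .isInstanceOf(UnsupportedOperationException.class)
        .hasMessageStartingWith("Unknown catalog-type: fooType");

Where the old helper only matched a substring, the new assertions prefer hasMessage (an exact match on the full text) and fall back to hasMessageStartingWith/hasMessageContaining/hasMessageEndingWith when part of the message is environment-dependent.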

diff --git a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogDatabase.java b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogDatabase.java
index d4de12c623..0e767fdc81 100644
--- a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogDatabase.java
+++ b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogDatabase.java
@@ -24,12 +24,12 @@ import java.util.Map;
 import java.util.Objects;
 import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
 import org.apache.flink.types.Row;
-import org.apache.iceberg.AssertHelpers;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.catalog.Namespace;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.types.Types;
+import org.assertj.core.api.Assertions;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Assume;
@@ -126,11 +126,11 @@ public class TestFlinkCatalogDatabase extends FlinkCatalogTestBase {
         "Table should exist",
         validationCatalog.tableExists(TableIdentifier.of(icebergNamespace, "tl")));
 
-    AssertHelpers.assertThrowsCause(
-        "Should fail if trying to delete a non-empty database",
-        DatabaseNotEmptyException.class,
-        String.format("Database %s in catalog %s is not empty.", DATABASE, catalogName),
-        () -> sql("DROP DATABASE %s", flinkDatabase));
+    Assertions.assertThatThrownBy(() -> sql("DROP DATABASE %s", flinkDatabase))
+        .cause()
+        .isInstanceOf(DatabaseNotEmptyException.class)
+        .hasMessage(
+            String.format("Database %s in catalog %s is not empty.", DATABASE, catalogName));
 
     sql("DROP TABLE %s.tl", flinkDatabase);
   }
@@ -301,10 +301,12 @@ public class TestFlinkCatalogDatabase extends FlinkCatalogTestBase {
         "Namespace should not already exist",
         validationNamespaceCatalog.namespaceExists(icebergNamespace));
 
-    AssertHelpers.assertThrowsCause(
-        "Should fail if trying to create database with location in hadoop catalog.",
-        UnsupportedOperationException.class,
-        String.format("Cannot create namespace %s: metadata is not supported", icebergNamespace),
-        () -> sql("CREATE DATABASE %s WITH ('prop'='value')", flinkDatabase));
+    Assertions.assertThatThrownBy(
+            () -> sql("CREATE DATABASE %s WITH ('prop'='value')", flinkDatabase))
+        .cause()
+        .isInstanceOf(UnsupportedOperationException.class)
+        .hasMessage(
+            String.format(
+                "Cannot create namespace %s: metadata is not supported", icebergNamespace));
   }
 }
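
Note the .cause() navigation above: AssertHelpers had separate assertThrowsCause and assertThrowsRootCause variants, and they map to AssertJ's cause() (the direct cause, one level down) and rootCause() (the innermost cause, used in the TestFlinkCatalogTable hunk further below). A sketch of the distinction against a hypothetical three-level chain, not taken from this commit:

    import org.assertj.core.api.Assertions;

    Throwable error =
        new IllegalStateException(
            "outer", new RuntimeException("middle", new IllegalArgumentException("root")));

    // cause() asserts on the direct cause...
    Assertions.assertThatThrownBy(() -> { throw error; })
        .cause()
        .isInstanceOf(RuntimeException.class)
        .hasMessage("middle");

    // ...while rootCause() unwraps the whole chain.
    Assertions.assertThatThrownBy(() -> { throw error; })
        .rootCause()
        .isInstanceOf(IllegalArgumentException.class)
        .hasMessage("root");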
diff --git a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogFactory.java b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogFactory.java
index f7edd5653e..184223847c 100644
--- a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogFactory.java
+++ b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogFactory.java
@@ -20,7 +20,6 @@ package org.apache.iceberg.flink;
 
 import java.util.Map;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.iceberg.AssertHelpers;
 import org.apache.iceberg.CatalogProperties;
 import org.apache.iceberg.catalog.Catalog;
 import org.apache.iceberg.hadoop.HadoopCatalog;
@@ -87,11 +86,11 @@ public class TestFlinkCatalogFactory {
     props.put(
         FlinkCatalogFactory.ICEBERG_CATALOG_TYPE, FlinkCatalogFactory.ICEBERG_CATALOG_TYPE_HIVE);
 
-    AssertHelpers.assertThrows(
-        "Should throw when both catalog-type and catalog-impl are set",
-        IllegalArgumentException.class,
-        "both catalog-type and catalog-impl are set",
-        () -> FlinkCatalogFactory.createCatalogLoader(catalogName, props, new Configuration()));
+    Assertions.assertThatThrownBy(
+            () -> FlinkCatalogFactory.createCatalogLoader(catalogName, props, new Configuration()))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessageStartingWith(
+            "Cannot create catalog customCatalog, both catalog-type and catalog-impl are set");
   }
 
   @Test
@@ -99,11 +98,10 @@ public class TestFlinkCatalogFactory {
     String catalogName = "unknownCatalog";
     props.put(FlinkCatalogFactory.ICEBERG_CATALOG_TYPE, "fooType");
 
-    AssertHelpers.assertThrows(
-        "Should throw when an unregistered / unknown catalog is set as the catalog factor's`type` setting",
-        UnsupportedOperationException.class,
-        "Unknown catalog-type",
-        () -> FlinkCatalogFactory.createCatalogLoader(catalogName, props, new Configuration()));
+    Assertions.assertThatThrownBy(
+            () -> FlinkCatalogFactory.createCatalogLoader(catalogName, props, new Configuration()))
+        .isInstanceOf(UnsupportedOperationException.class)
+        .hasMessageStartingWith("Unknown catalog-type: fooType");
   }
 
   public static class CustomHadoopCatalog extends HadoopCatalog {
diff --git a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java
index 45b3da5fe6..c6c26833c7 100644
--- a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java
+++ b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java
@@ -32,7 +32,6 @@ import org.apache.flink.table.api.constraints.UniqueConstraint;
 import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.catalog.ObjectPath;
 import org.apache.flink.table.catalog.exceptions.TableNotExistException;
-import org.apache.iceberg.AssertHelpers;
 import org.apache.iceberg.BaseTable;
 import org.apache.iceberg.ContentFile;
 import org.apache.iceberg.DataFile;
@@ -54,6 +53,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.types.Types;
+import org.assertj.core.api.Assertions;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Assume;
@@ -104,11 +104,11 @@ public class TestFlinkCatalogTable extends FlinkCatalogTestBase {
         new Schema(Types.NestedField.optional(0, "id", Types.LongType.get()));
     validationCatalog.createTable(TableIdentifier.of(icebergNamespace, "tl"), tableSchema);
     sql("ALTER TABLE tl RENAME TO tl2");
-    AssertHelpers.assertThrows(
-        "Should fail if trying to get a nonexistent table",
-        ValidationException.class,
-        "Table `tl` was not found.",
-        () -> getTableEnv().from("tl"));
+
+    Assertions.assertThatThrownBy(() -> getTableEnv().from("tl"))
+        .isInstanceOf(ValidationException.class)
+        .hasMessage("Table `tl` was not found.");
+
     Schema actualSchema = FlinkSchemaUtil.convert(getTableEnv().from("tl2").getSchema());
     Assert.assertEquals(tableSchema.asStruct(), actualSchema.asStruct());
   }
@@ -179,11 +179,9 @@ public class TestFlinkCatalogTable extends FlinkCatalogTestBase {
     Assert.assertEquals(Maps.newHashMap(), table("tl").properties());
 
     sql("DROP TABLE tl");
-    AssertHelpers.assertThrows(
-        "Table 'tl' should be dropped",
-        NoSuchTableException.class,
-        "Table does not exist: " + getFullQualifiedTableName("tl"),
-        () -> table("tl"));
+    Assertions.assertThatThrownBy(() -> table("tl"))
+        .isInstanceOf(NoSuchTableException.class)
+        .hasMessage("Table does not exist: " + getFullQualifiedTableName("tl"));
 
     sql("CREATE TABLE IF NOT EXISTS tl(id BIGINT)");
     Assert.assertEquals(Maps.newHashMap(), table("tl").properties());
@@ -286,11 +284,10 @@ public class TestFlinkCatalogTable extends FlinkCatalogTestBase {
     TableOperations ops = ((BaseTable) table).operations();
     Assert.assertEquals("should create table using format v2", 2, ops.refresh().formatVersion());
 
-    AssertHelpers.assertThrowsRootCause(
-        "should fail to downgrade to v1",
-        IllegalArgumentException.class,
-        "Cannot downgrade v2 table to v1",
-        () -> sql("ALTER TABLE tl SET('format-version'='1')"));
+    Assertions.assertThatThrownBy(() -> sql("ALTER TABLE tl SET('format-version'='1')"))
+        .rootCause()
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Cannot downgrade v2 table to v1");
   }
 
   @Test
diff --git a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTablePartitions.java b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTablePartitions.java
index e5332e8f30..0008e4320c 100644
--- a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTablePartitions.java
+++ b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTablePartitions.java
@@ -23,12 +23,12 @@ import org.apache.flink.table.catalog.CatalogPartitionSpec;
 import org.apache.flink.table.catalog.ObjectPath;
 import org.apache.flink.table.catalog.exceptions.TableNotExistException;
 import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
-import org.apache.iceberg.AssertHelpers;
 import org.apache.iceberg.CatalogProperties;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.catalog.Namespace;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.assertj.core.api.Assertions;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -90,10 +90,10 @@ public class TestFlinkCatalogTablePartitions extends FlinkCatalogTestBase {
 
     ObjectPath objectPath = new ObjectPath(DATABASE, tableName);
     FlinkCatalog flinkCatalog = (FlinkCatalog) getTableEnv().getCatalog(catalogName).get();
-    AssertHelpers.assertThrows(
-        "Should not list partitions for unpartitioned table.",
-        TableNotPartitionedException.class,
-        () -> flinkCatalog.listPartitions(objectPath));
+    Assertions.assertThatThrownBy(() -> flinkCatalog.listPartitions(objectPath))
+        .isInstanceOf(TableNotPartitionedException.class)
+        .hasMessageStartingWith("Table db.test_table in catalog")
+        .hasMessageEndingWith("is not partitioned.");
   }
 
   @Test
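
The listPartitions hunk above also shows why an exact hasMessage is not always possible: the middle of the message embeds a per-run catalog name, so the test pins down the stable prefix and suffix instead. A sketch of the partial matchers used throughout this commit, against a hypothetical message:

    // The catalog name in the middle varies per test run, so only the
    // stable parts of the message are asserted.
    Assertions.assertThatThrownBy(
            () -> {
              throw new RuntimeException(
                  "Table db.test_table in catalog test_catalog_0 is not partitioned.");
            })
        .hasMessageStartingWith("Table db.test_table in catalog")
        .hasMessageContaining("db.test_table")
        .hasMessageEndingWith("is not partitioned.");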
diff --git a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/TestFlinkSchemaUtil.java b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/TestFlinkSchemaUtil.java
index b5dfb9cb2f..4ac32c08eb 100644
--- a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/TestFlinkSchemaUtil.java
+++ b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/TestFlinkSchemaUtil.java
@@ -30,13 +30,13 @@ import org.apache.flink.table.types.logical.TimeType;
 import org.apache.flink.table.types.logical.TimestampType;
 import org.apache.flink.table.types.logical.VarBinaryType;
 import org.apache.flink.table.types.logical.VarCharType;
-import org.apache.iceberg.AssertHelpers;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.types.Type;
 import org.apache.iceberg.types.Types;
+import org.assertj.core.api.Assertions;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -407,10 +407,10 @@ public class TestFlinkSchemaUtil {
                     Types.StructType.of(
                         Types.NestedField.required(2, "inner", Types.IntegerType.get())))),
             Sets.newHashSet(2));
-    AssertHelpers.assertThrows(
-        "Does not support the nested columns in flink schema's primary keys",
-        ValidationException.class,
-        "Column 'struct.inner' does not exist",
-        () -> FlinkSchemaUtil.toSchema(icebergSchema));
+
+    Assertions.assertThatThrownBy(() -> FlinkSchemaUtil.toSchema(icebergSchema))
+        .isInstanceOf(ValidationException.class)
+        .hasMessageStartingWith("Could not create a PRIMARY KEY")
+        .hasMessageContaining("Column 'struct.inner' does not exist.");
   }
 }
diff --git a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/TestIcebergConnector.java b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/TestIcebergConnector.java
index a12fe2507f..4f71b5fe8d 100644
--- a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/TestIcebergConnector.java
+++ b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/TestIcebergConnector.java
@@ -31,13 +31,13 @@ import org.apache.flink.table.catalog.ObjectPath;
 import org.apache.flink.types.Row;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
-import org.apache.iceberg.AssertHelpers;
 import org.apache.iceberg.CatalogProperties;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.thrift.TException;
+import org.assertj.core.api.Assertions;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.ClassRule;
@@ -261,11 +261,10 @@ public class TestIcebergConnector extends FlinkTestBase {
     try {
       testCreateConnectorTable();
       // Ensure that the table was created under the specific database.
-      AssertHelpers.assertThrows(
-          "Table should already exists",
-          org.apache.flink.table.api.TableException.class,
-          "Could not execute CreateTable in path",
-          () -> sql("CREATE TABLE `default_catalog`.`%s`.`%s`", databaseName(), TABLE_NAME));
+      Assertions.assertThatThrownBy(
+              () -> sql("CREATE TABLE `default_catalog`.`%s`.`%s`", databaseName(), TABLE_NAME))
+          .isInstanceOf(org.apache.flink.table.api.TableException.class)
+          .hasMessageStartingWith("Could not execute CreateTable in path");
     } finally {
       sql("DROP TABLE IF EXISTS `%s`.`%s`", databaseName(), TABLE_NAME);
       if (!isDefaultDatabaseName()) {
@@ -293,14 +292,19 @@ public class TestIcebergConnector extends FlinkTestBase {
     // Create a connector table in an iceberg catalog.
     sql("CREATE CATALOG `test_catalog` WITH %s", toWithClause(catalogProps));
     try {
-      AssertHelpers.assertThrowsCause(
-          "Cannot create the iceberg connector table in iceberg catalog",
-          IllegalArgumentException.class,
-          "Cannot create the table with 'connector'='iceberg' table property in an iceberg catalog",
-          () ->
-              sql(
-                  "CREATE TABLE `test_catalog`.`%s`.`%s` (id BIGINT, data STRING) WITH %s",
-                  FlinkCatalogFactory.DEFAULT_DATABASE_NAME, TABLE_NAME, toWithClause(tableProps)));
+      Assertions.assertThatThrownBy(
+              () ->
+                  sql(
+                      "CREATE TABLE `test_catalog`.`%s`.`%s` (id BIGINT, data STRING) WITH %s",
+                      FlinkCatalogFactory.DEFAULT_DATABASE_NAME,
+                      TABLE_NAME,
+                      toWithClause(tableProps)))
+          .cause()
+          .isInstanceOf(IllegalArgumentException.class)
+          .hasMessage(
+              "Cannot create the table with 'connector'='iceberg' table property in an iceberg catalog, "
+                  + "Please create table with 'connector'='iceberg' property in a non-iceberg catalog or "
+                  + "create table without 'connector'='iceberg' related properties in an iceberg table.");
     } finally {
       sql("DROP CATALOG IF EXISTS `test_catalog`");
     }
diff --git a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/data/TestRowDataProjection.java b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/data/TestRowDataProjection.java
index 4cb77b11fd..a3bbb9ae25 100644
--- a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/data/TestRowDataProjection.java
+++ b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/data/TestRowDataProjection.java
@@ -20,7 +20,6 @@ package org.apache.iceberg.flink.data;
 
 import java.util.Iterator;
 import org.apache.flink.table.data.RowData;
-import org.apache.iceberg.AssertHelpers;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.StructLike;
 import org.apache.iceberg.data.RandomGenericData;
@@ -28,6 +27,7 @@ import org.apache.iceberg.data.Record;
 import org.apache.iceberg.flink.TestHelpers;
 import org.apache.iceberg.types.Types;
 import org.apache.iceberg.util.StructProjection;
+import org.assertj.core.api.Assertions;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -223,11 +223,10 @@ public class TestRowDataProjection {
                     Types.StructType.of(
                         Types.NestedField.required(3, "value", Types.LongType.get()),
                         Types.NestedField.required(4, "valueData", Types.LongType.get())))));
-    AssertHelpers.assertThrows(
-        "Should not allow to project a partial map key with non-primitive type.",
-        IllegalArgumentException.class,
-        "Cannot project a partial map key or value",
-        () -> generateAndValidate(schema, partialMapKey));
+
+    Assertions.assertThatThrownBy(() -> generateAndValidate(schema, partialMapKey))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessageStartingWith("Cannot project a partial map key or value struct.");
 
     // Project partial map key.
     Schema partialMapValue =
@@ -243,11 +242,10 @@ public class TestRowDataProjection {
                         Types.NestedField.required(2, "keyData", Types.LongType.get())),
                     Types.StructType.of(
                         Types.NestedField.required(3, "value", Types.LongType.get())))));
-    AssertHelpers.assertThrows(
-        "Should not allow to project a partial map value with non-primitive type.",
-        IllegalArgumentException.class,
-        "Cannot project a partial map key or value",
-        () -> generateAndValidate(schema, partialMapValue));
+
+    Assertions.assertThatThrownBy(() -> generateAndValidate(schema, partialMapValue))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessageStartingWith("Cannot project a partial map key or value struct.");
   }
 
   @Test
@@ -306,11 +304,10 @@ public class TestRowDataProjection {
                     4,
                     Types.StructType.of(
                         Types.NestedField.required(2, "nestedListField2", Types.LongType.get())))));
-    AssertHelpers.assertThrows(
-        "Should not allow to project a partial list element with non-primitive type.",
-        IllegalArgumentException.class,
-        "Cannot project a partial list element",
-        () -> generateAndValidate(schema, partialList));
+
+    Assertions.assertThatThrownBy(() -> generateAndValidate(schema, partialList))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessageStartingWith("Cannot project a partial list element struct.");
   }
 
   private void generateAndValidate(Schema schema, Schema projectSchema) {
diff --git a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSink.java b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSink.java
index 23beb19a72..dc9e8991ed 100644
--- a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSink.java
+++ b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSink.java
@@ -28,7 +28,6 @@ import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.test.util.MiniClusterWithClientResource;
 import org.apache.flink.types.Row;
-import org.apache.iceberg.AssertHelpers;
 import org.apache.iceberg.DistributionMode;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.PartitionSpec;
@@ -45,6 +44,7 @@ import org.apache.iceberg.flink.util.FlinkCompatibilityUtil;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.assertj.core.api.Assertions;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.ClassRule;
@@ -197,14 +197,9 @@ public class TestFlinkIcebergSink extends TestFlinkIcebergSinkBase {
         .set(TableProperties.WRITE_DISTRIBUTION_MODE, DistributionMode.HASH.modeName())
         .commit();
 
-    AssertHelpers.assertThrows(
-        "Does not support range distribution-mode now.",
-        IllegalArgumentException.class,
-        "Flink does not support 'range' write distribution mode now.",
-        () -> {
-          testWriteRow(null, DistributionMode.RANGE);
-          return null;
-        });
+    Assertions.assertThatThrownBy(() -> testWriteRow(null, DistributionMode.RANGE))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Flink does not support 'range' write distribution mode now.");
   }
 
   @Test
@@ -350,17 +345,9 @@ public class TestFlinkIcebergSink extends TestFlinkIcebergSinkBase {
             .writeParallelism(parallelism)
             .setAll(newProps);
 
-    AssertHelpers.assertThrows(
-        "Should fail with invalid distribution mode.",
-        IllegalArgumentException.class,
-        "Invalid distribution mode: UNRECOGNIZED",
-        () -> {
-          builder.append();
-
-          // Execute the program.
-          env.execute("Test Iceberg DataStream.");
-          return null;
-        });
+    Assertions.assertThatThrownBy(builder::append)
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Invalid distribution mode: UNRECOGNIZED");
   }
 
   @Test
@@ -378,16 +365,8 @@ public class TestFlinkIcebergSink extends TestFlinkIcebergSinkBase {
             .writeParallelism(parallelism)
             .setAll(newProps);
 
-    AssertHelpers.assertThrows(
-        "Should fail with invalid file format.",
-        IllegalArgumentException.class,
-        "Invalid file format: UNRECOGNIZED",
-        () -> {
-          builder.append();
-
-          // Execute the program.
-          env.execute("Test Iceberg DataStream.");
-          return null;
-        });
+    Assertions.assertThatThrownBy(builder::append)
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Invalid file format: UNRECOGNIZED");
   }
 }
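
The sink tests above show two further cleanups. First, AssertHelpers took a Callable, so multi-statement lambdas had to end with "return null"; AssertJ's assertThatThrownBy takes a void ThrowingCallable, which often reduces to a method reference. Second, the assertion now wraps only builder.append(), which is where the invalid mode is rejected, so the old env.execute(...) step (never reached once append() threw) is dropped. The shape change, copied from the hunk above:

    // Before: Callable<Void> forced the trailing "return null".
    AssertHelpers.assertThrows(
        "Should fail with invalid distribution mode.",
        IllegalArgumentException.class,
        "Invalid distribution mode: UNRECOGNIZED",
        () -> {
          builder.append();

          // Execute the program.
          env.execute("Test Iceberg DataStream.");
          return null;
        });

    // After: a void ThrowingCallable, reduced to a method reference.
    Assertions.assertThatThrownBy(builder::append)
        .isInstanceOf(IllegalArgumentException.class)
        .hasMessage("Invalid distribution mode: UNRECOGNIZED");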
diff --git a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2.java b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2.java
index 6552fe834c..b5c3bcf417 100644
--- a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2.java
+++ b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2.java
@@ -23,7 +23,6 @@ import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.test.util.MiniClusterWithClientResource;
 import org.apache.flink.types.Row;
-import org.apache.iceberg.AssertHelpers;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.SnapshotRef;
@@ -39,6 +38,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.types.Types;
+import org.assertj.core.api.Assertions;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.ClassRule;
@@ -202,18 +202,21 @@ public class TestFlinkIcebergSinkV2 extends TestFlinkIcebergSinkV2Base {
             .writeParallelism(parallelism)
             .upsert(true);
 
-    AssertHelpers.assertThrows(
-        "Should be error because upsert mode and overwrite mode enable at the same time.",
-        IllegalStateException.class,
-        "OVERWRITE mode shouldn't be enable",
-        () ->
-            builder.equalityFieldColumns(ImmutableList.of("id", "data")).overwrite(true).append());
-
-    AssertHelpers.assertThrows(
-        "Should be error because equality field columns are empty.",
-        IllegalStateException.class,
-        "Equality field columns shouldn't be empty",
-        () -> builder.equalityFieldColumns(ImmutableList.of()).overwrite(false).append());
+    Assertions.assertThatThrownBy(
+            () ->
+                builder
+                    .equalityFieldColumns(ImmutableList.of("id", "data"))
+                    .overwrite(true)
+                    .append())
+        .isInstanceOf(IllegalStateException.class)
+        .hasMessage(
+            "OVERWRITE mode shouldn't be enable when configuring to use UPSERT data stream.");
+
+    Assertions.assertThatThrownBy(
+            () -> builder.equalityFieldColumns(ImmutableList.of()).overwrite(false).append())
+        .isInstanceOf(IllegalStateException.class)
+        .hasMessage(
+            "Equality field columns shouldn't be empty when configuring to use UPSERT data stream.");
   }
 
   @Test
diff --git a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2Base.java b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2Base.java
index 15380408e4..0b403756ce 100644
--- a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2Base.java
+++ b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2Base.java
@@ -28,7 +28,6 @@ import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.types.Row;
 import org.apache.flink.types.RowKind;
-import org.apache.iceberg.AssertHelpers;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.Table;
@@ -43,6 +42,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.util.StructLikeSet;
+import org.assertj.core.api.Assertions;
 import org.junit.Assert;
 
 public class TestFlinkIcebergSinkV2Base {
@@ -231,20 +231,20 @@ public class TestFlinkIcebergSinkV2Base {
             ImmutableList.of(record(1, "ddd"), record(2, "ddd")));
 
     if (partitioned && writeDistributionMode.equals(TableProperties.WRITE_DISTRIBUTION_MODE_HASH)) {
-      AssertHelpers.assertThrows(
-          "Should be error because equality field columns don't include all partition keys",
-          IllegalStateException.class,
-          "should be included in equality fields",
-          () -> {
-            testChangeLogs(
-                ImmutableList.of("id"),
-                row -> row.getField(ROW_ID_POS),
-                false,
-                elementsPerCheckpoint,
-                expectedRecords,
-                branch);
-            return null;
-          });
+      Assertions.assertThatThrownBy(
+              () ->
+                  testChangeLogs(
+                      ImmutableList.of("id"),
+                      row -> row.getField(ROW_ID_POS),
+                      false,
+                      elementsPerCheckpoint,
+                      expectedRecords,
+                      branch))
+          .isInstanceOf(IllegalStateException.class)
+          .hasMessageStartingWith(
+              "In 'hash' distribution mode with equality fields set, partition field")
+          .hasMessageContaining("should be included in equality fields:");
+
     } else {
       testChangeLogs(
           ImmutableList.of("id"),
@@ -278,20 +278,17 @@ public class TestFlinkIcebergSinkV2Base {
           expectedRecords,
           branch);
     } else {
-      AssertHelpers.assertThrows(
-          "Should be error because equality field columns don't include all partition keys",
-          IllegalStateException.class,
-          "should be included in equality fields",
-          () -> {
-            testChangeLogs(
-                ImmutableList.of("id"),
-                row -> row.getField(ROW_ID_POS),
-                true,
-                elementsPerCheckpoint,
-                expectedRecords,
-                branch);
-            return null;
-          });
+      Assertions.assertThatThrownBy(
+              () ->
+                  testChangeLogs(
+                      ImmutableList.of("id"),
+                      row -> row.getField(ROW_ID_POS),
+                      true,
+                      elementsPerCheckpoint,
+                      expectedRecords,
+                      branch))
+          .isInstanceOf(IllegalStateException.class)
+          .hasMessageContaining("should be included in equality fields:");
     }
   }
 
diff --git a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkScan.java b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkScan.java
index aa5b51eddf..b537efa727 100644
--- a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkScan.java
+++ b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkScan.java
@@ -28,7 +28,6 @@ import java.util.Map;
 import org.apache.flink.test.util.MiniClusterWithClientResource;
 import org.apache.flink.types.Row;
 import org.apache.iceberg.AppendFiles;
-import org.apache.iceberg.AssertHelpers;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.PartitionSpec;
@@ -49,6 +48,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.types.Types;
 import org.apache.iceberg.util.DateTimeUtil;
+import org.assertj.core.api.Assertions;
 import org.junit.Assert;
 import org.junit.ClassRule;
 import org.junit.Rule;
@@ -355,27 +355,27 @@ public abstract class TestFlinkScan {
         expected,
         TestFixtures.SCHEMA);
 
-    AssertHelpers.assertThrows(
-        "START_SNAPSHOT_ID and START_TAG cannot both be set.",
-        Exception.class,
-        () ->
-            runWithOptions(
-                ImmutableMap.<String, String>builder()
-                    .put("start-tag", startTag)
-                    .put("end-tag", endTag)
-                    .put("start-snapshot-id", Long.toString(snapshotId1))
-                    .buildOrThrow()));
-
-    AssertHelpers.assertThrows(
-        "END_SNAPSHOT_ID and END_TAG cannot both be set.",
-        Exception.class,
-        () ->
-            runWithOptions(
-                ImmutableMap.<String, String>builder()
-                    .put("start-tag", startTag)
-                    .put("end-tag", endTag)
-                    .put("end-snapshot-id", Long.toString(snapshotId3))
-                    .buildOrThrow()));
+    Assertions.assertThatThrownBy(
+            () ->
+                runWithOptions(
+                    ImmutableMap.<String, String>builder()
+                        .put("start-tag", startTag)
+                        .put("end-tag", endTag)
+                        .put("start-snapshot-id", Long.toString(snapshotId1))
+                        .buildOrThrow()))
+        .isInstanceOf(Exception.class)
+        .hasMessage("START_SNAPSHOT_ID and START_TAG cannot both be set.");
+
+    Assertions.assertThatThrownBy(
+            () ->
+                runWithOptions(
+                    ImmutableMap.<String, String>builder()
+                        .put("start-tag", startTag)
+                        .put("end-tag", endTag)
+                        .put("end-snapshot-id", Long.toString(snapshotId3))
+                        .buildOrThrow()))
+        .isInstanceOf(Exception.class)
+        .hasMessage("END_SNAPSHOT_ID and END_TAG cannot both be set.");
   }
 
   @Test
diff --git a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkSourceConfig.java b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkSourceConfig.java
index 974b8539b3..1814ff8f85 100644
--- a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkSourceConfig.java
+++ b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkSourceConfig.java
@@ -20,8 +20,8 @@ package org.apache.iceberg.flink.source;
 
 import java.util.List;
 import org.apache.flink.types.Row;
-import org.apache.iceberg.AssertHelpers;
 import org.apache.iceberg.flink.FlinkReadOptions;
+import org.assertj.core.api.Assertions;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -31,14 +31,10 @@ public class TestFlinkSourceConfig extends TestFlinkTableSource {
   @Test
   public void testFlinkSessionConfig() {
     getTableEnv().getConfig().set(FlinkReadOptions.STREAMING_OPTION, true);
-    AssertHelpers.assertThrows(
-        "Should throw exception because of cannot set snapshot-id option for streaming reader",
-        IllegalArgumentException.class,
-        "Cannot set as-of-timestamp option for streaming reader",
-        () -> {
-          sql("SELECT * FROM %s /*+ OPTIONS('as-of-timestamp'='1')*/", TABLE);
-          return null;
-        });
+    Assertions.assertThatThrownBy(
+            () -> sql("SELECT * FROM %s /*+ OPTIONS('as-of-timestamp'='1')*/", TABLE))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Cannot set as-of-timestamp option for streaming reader");
   }
 
   @Test
diff --git a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkTableSource.java b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkTableSource.java
index d72f57dcea..ff14bc4062 100644
--- a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkTableSource.java
+++ b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkTableSource.java
@@ -25,7 +25,6 @@ import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.table.api.SqlParserException;
 import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.types.Row;
-import org.apache.iceberg.AssertHelpers;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.events.Listeners;
 import org.apache.iceberg.events.ScanEvent;
@@ -103,10 +102,9 @@ public class TestFlinkTableSource extends FlinkTestBase {
   @Test
   public void testLimitPushDown() {
 
-    AssertHelpers.assertThrows(
-        "Invalid limit number: -1 ",
-        SqlParserException.class,
-        () -> sql("SELECT * FROM %s LIMIT -1", TABLE_NAME));
+    Assertions.assertThatThrownBy(() -> sql("SELECT * FROM %s LIMIT -1", TABLE_NAME))
+        .isInstanceOf(SqlParserException.class)
+        .hasMessageStartingWith("SQL parse failed.");
 
     Assert.assertEquals(
         "Should have 0 record", 0, sql("SELECT * FROM %s LIMIT 0", TABLE_NAME).size());
diff --git a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamScanSql.java b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamScanSql.java
index abcce11e36..633e11718b 100644
--- a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamScanSql.java
+++ b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamScanSql.java
@@ -30,7 +30,6 @@ import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
 import org.apache.flink.table.api.config.TableConfigOptions;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.CloseableIterator;
-import org.apache.iceberg.AssertHelpers;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TestHelpers;
@@ -43,6 +42,7 @@ import org.apache.iceberg.flink.FlinkCatalogTestBase;
 import org.apache.iceberg.flink.MiniClusterResource;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.assertj.core.api.Assertions;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -220,14 +220,13 @@ public class TestStreamScanSql extends FlinkCatalogTestBase {
     Row row2 = Row.of(2, "bbb", "2021-01-01");
     insertRows(table, row1, row2);
 
-    AssertHelpers.assertThrows(
-        "Cannot scan table using ref for stream yet",
-        IllegalArgumentException.class,
-        "Cannot scan table using ref",
-        () ->
-            exec(
-                "SELECT * FROM %s /*+ OPTIONS('streaming'='true', 'monitor-interval'='1s', 'branch'='b1')*/",
-                TABLE));
+    Assertions.assertThatThrownBy(
+            () ->
+                exec(
+                    "SELECT * FROM %s /*+ OPTIONS('streaming'='true', 'monitor-interval'='1s', 'branch'='b1')*/",
+                    TABLE))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Cannot scan table using ref b1 configured for streaming reader yet");
   }
 
   @Test
@@ -307,14 +306,13 @@ public class TestStreamScanSql extends FlinkCatalogTestBase {
     }
     result.getJobClient().ifPresent(JobClient::cancel);
 
-    AssertHelpers.assertThrows(
-        "START_SNAPSHOT_ID and START_TAG cannot both be set.",
-        IllegalArgumentException.class,
-        "START_SNAPSHOT_ID and START_TAG cannot both be set.",
-        () ->
-            exec(
-                "SELECT * FROM %s /*+ OPTIONS('streaming'='true', 'monitor-interval'='1s', 'start-tag'='%s', "
-                    + "'start-snapshot-id'='%d' )*/",
-                TABLE, tagName, startSnapshotId));
+    Assertions.assertThatThrownBy(
+            () ->
+                exec(
+                    "SELECT * FROM %s /*+ OPTIONS('streaming'='true', 'monitor-interval'='1s', 'start-tag'='%s', "
+                        + "'start-snapshot-id'='%d' )*/",
+                    TABLE, tagName, startSnapshotId))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("START_SNAPSHOT_ID and START_TAG cannot both be set.");
   }
 }
diff --git a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamingMonitorFunction.java b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamingMonitorFunction.java
index a161645979..1b9049f1bb 100644
--- a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamingMonitorFunction.java
+++ b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamingMonitorFunction.java
@@ -31,7 +31,6 @@ import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.types.Row;
-import org.apache.iceberg.AssertHelpers;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
@@ -47,6 +46,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.types.Types;
 import org.apache.iceberg.util.SnapshotUtil;
 import org.apache.iceberg.util.ThreadPools;
+import org.assertj.core.api.Assertions;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -259,14 +259,9 @@ public class TestStreamingMonitorFunction extends TableTestBase {
             .maxPlanningSnapshotCount(0)
             .build();
 
-    AssertHelpers.assertThrows(
-        "Should throw exception because of invalid config",
-        IllegalArgumentException.class,
-        "must be greater than zero",
-        () -> {
-          createFunction(scanContext1);
-          return null;
-        });
+    Assertions.assertThatThrownBy(() -> createFunction(scanContext1))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("The max-planning-snapshot-count must be greater than zero");
 
     ScanContext scanContext2 =
         ScanContext.builder()
@@ -274,14 +269,9 @@ public class TestStreamingMonitorFunction extends TableTestBase {
             .maxPlanningSnapshotCount(-10)
             .build();
 
-    AssertHelpers.assertThrows(
-        "Should throw exception because of invalid config",
-        IllegalArgumentException.class,
-        "must be greater than zero",
-        () -> {
-          createFunction(scanContext2);
-          return null;
-        });
+    Assertions.assertThatThrownBy(() -> createFunction(scanContext2))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("The max-planning-snapshot-count must be greater than zero");
   }
 
   @Test
diff --git a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/TestContinuousSplitPlannerImpl.java b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/TestContinuousSplitPlannerImpl.java
index fef61bc048..2fa921b1f1 100644
--- a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/TestContinuousSplitPlannerImpl.java
+++ b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/TestContinuousSplitPlannerImpl.java
@@ -23,7 +23,6 @@ import java.util.List;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Collectors;
-import org.apache.iceberg.AssertHelpers;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.Snapshot;
@@ -37,6 +36,7 @@ import org.apache.iceberg.flink.source.StreamingStartingStrategy;
 import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.assertj.core.api.Assertions;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.ClassRule;
@@ -338,11 +338,9 @@ public class TestContinuousSplitPlannerImpl {
         new ContinuousSplitPlannerImpl(
             tableResource.tableLoader().clone(), scanContextWithInvalidSnapshotId, null);
 
-    AssertHelpers.assertThrows(
-        "Should detect invalid starting snapshot id",
-        IllegalArgumentException.class,
-        "Start snapshot id not found in history: 1",
-        () -> splitPlanner.planSplits(null));
+    Assertions.assertThatThrownBy(() -> splitPlanner.planSplits(null))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Start snapshot id not found in history: 1");
   }
 
   @Test
@@ -366,11 +364,9 @@ public class TestContinuousSplitPlannerImpl {
         new ContinuousSplitPlannerImpl(
             tableResource.tableLoader().clone(), scanContextWithInvalidSnapshotId, null);
 
-    AssertHelpers.assertThrows(
-        "Should detect invalid starting snapshot id",
-        IllegalArgumentException.class,
-        "Start snapshot id not found in history: " + invalidSnapshotId,
-        () -> splitPlanner.planSplits(null));
+    Assertions.assertThatThrownBy(() -> splitPlanner.planSplits(null))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Start snapshot id not found in history: " + invalidSnapshotId);
   }
 
   @Test
@@ -430,11 +426,9 @@ public class TestContinuousSplitPlannerImpl {
         new ContinuousSplitPlannerImpl(
             tableResource.tableLoader().clone(), scanContextWithInvalidSnapshotId, null);
 
-    AssertHelpers.assertThrows(
-        "Should detect invalid starting snapshot timestamp",
-        IllegalArgumentException.class,
-        "Cannot find a snapshot after: ",
-        () -> splitPlanner.planSplits(null));
+    Assertions.assertThatThrownBy(() -> splitPlanner.planSplits(null))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Cannot find a snapshot after: 1");
   }
 
   @Test
@@ -453,11 +447,9 @@ public class TestContinuousSplitPlannerImpl {
         new ContinuousSplitPlannerImpl(
             tableResource.tableLoader().clone(), scanContextWithInvalidSnapshotId, null);
 
-    AssertHelpers.assertThrows(
-        "Should detect invalid starting snapshot timestamp",
-        IllegalArgumentException.class,
-        "Cannot find a snapshot after: ",
-        () -> splitPlanner.planSplits(null));
+    Assertions.assertThatThrownBy(() -> splitPlanner.planSplits(null))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessageStartingWith("Cannot find a snapshot after:");
   }
 
   @Test
diff --git a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/TestContinuousSplitPlannerImplStartStrategy.java b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/TestContinuousSplitPlannerImplStartStrategy.java
index 2df2846f7e..2c94f21590 100644
--- a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/TestContinuousSplitPlannerImplStartStrategy.java
+++ b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/TestContinuousSplitPlannerImplStartStrategy.java
@@ -20,7 +20,6 @@ package org.apache.iceberg.flink.source.enumerator;
 
 import java.io.IOException;
 import java.util.List;
-import org.apache.iceberg.AssertHelpers;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.data.GenericAppenderHelper;
@@ -30,6 +29,7 @@ import org.apache.iceberg.flink.HadoopTableResource;
 import org.apache.iceberg.flink.TestFixtures;
 import org.apache.iceberg.flink.source.ScanContext;
 import org.apache.iceberg.flink.source.StreamingStartingStrategy;
+import org.assertj.core.api.Assertions;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Rule;
@@ -79,7 +79,7 @@ public class TestContinuousSplitPlannerImplStartStrategy {
             .startingStrategy(StreamingStartingStrategy.TABLE_SCAN_THEN_INCREMENTAL)
             .build();
 
-    // emtpy table
+    // empty table
     Assert.assertFalse(
         ContinuousSplitPlannerImpl.startSnapshot(tableResource.table(), scanContext).isPresent());
 
@@ -97,7 +97,7 @@ public class TestContinuousSplitPlannerImplStartStrategy {
             .startingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_LATEST_SNAPSHOT)
             .build();
 
-    // emtpy table
+    // empty table
     Assert.assertFalse(
         ContinuousSplitPlannerImpl.startSnapshot(tableResource.table(), scanContext).isPresent());
 
@@ -115,7 +115,7 @@ public class TestContinuousSplitPlannerImplStartStrategy {
             .startingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_EARLIEST_SNAPSHOT)
             .build();
 
-    // emtpy table
+    // empty table
     Assert.assertFalse(
         ContinuousSplitPlannerImpl.startSnapshot(tableResource.table(), scanContext).isPresent());
 
@@ -134,14 +134,13 @@ public class TestContinuousSplitPlannerImplStartStrategy {
             .startSnapshotId(1L)
             .build();
 
-    // emtpy table
-    AssertHelpers.assertThrows(
-        "Should detect invalid starting snapshot id",
-        IllegalArgumentException.class,
-        "Start snapshot id not found in history: 1",
-        () ->
-            ContinuousSplitPlannerImpl.startSnapshot(
-                tableResource.table(), scanContextInvalidSnapshotId));
+    // empty table
+    Assertions.assertThatThrownBy(
+            () ->
+                ContinuousSplitPlannerImpl.startSnapshot(
+                    tableResource.table(), scanContextInvalidSnapshotId))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Start snapshot id not found in history: 1");
 
     appendThreeSnapshots();
 
@@ -166,14 +165,13 @@ public class TestContinuousSplitPlannerImplStartStrategy {
             .startSnapshotTimestamp(1L)
             .build();
 
-    // emtpy table
-    AssertHelpers.assertThrows(
-        "Should detect invalid starting snapshot timestamp",
-        IllegalArgumentException.class,
-        "Cannot find a snapshot after: ",
-        () ->
-            ContinuousSplitPlannerImpl.startSnapshot(
-                tableResource.table(), scanContextInvalidSnapshotTimestamp));
+    // empty table
+    Assertions.assertThatThrownBy(
+            () ->
+                ContinuousSplitPlannerImpl.startSnapshot(
+                    tableResource.table(), scanContextInvalidSnapshotTimestamp))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessageStartingWith("Cannot find a snapshot after: ");
 
     appendThreeSnapshots();