Posted to commits@paimon.apache.org by lz...@apache.org on 2023/03/21 09:03:38 UTC

[incubator-paimon] branch master updated: [spark] Upgrade spark version to 3.3.2 of spark-common (#675)

This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 9413ab020 [spark] Upgrade spark version to 3.3.2 of spark-common (#675)
9413ab020 is described below

commit 9413ab020f15287de83f14b0c42a15b48d1f1d3a
Author: yuzelin <33...@users.noreply.github.com>
AuthorDate: Tue Mar 21 17:03:31 2023 +0800

    [spark] Upgrade spark version to 3.3.2 of spark-common (#675)
---
 paimon-spark/paimon-spark-common/pom.xml           |   2 +-
 .../org/apache/paimon/spark/SparkReadITCase.java   | 263 ++++++-----
 .../org/apache/paimon/spark/SparkReadTestBase.java |  15 +
 .../paimon/spark/SparkSchemaEvolutionITCase.java   | 506 ++++++++-------------
 4 files changed, 330 insertions(+), 456 deletions(-)

diff --git a/paimon-spark/paimon-spark-common/pom.xml b/paimon-spark/paimon-spark-common/pom.xml
index 91c4d66b0..375f8471c 100644
--- a/paimon-spark/paimon-spark-common/pom.xml
+++ b/paimon-spark/paimon-spark-common/pom.xml
@@ -34,7 +34,7 @@ under the License.
     <name>Paimon : Spark : Common</name>
 
     <properties>
-        <spark.version>3.2.2</spark.version>
+        <spark.version>3.3.2</spark.version>
     </properties>
 
     <dependencies>
diff --git a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkReadITCase.java b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkReadITCase.java
index 79cdc9ea0..16b7fd5cf 100644
--- a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkReadITCase.java
+++ b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkReadITCase.java
@@ -22,6 +22,7 @@ import org.apache.paimon.fs.Path;
 import org.apache.paimon.fs.local.LocalFileIO;
 import org.apache.paimon.schema.TableSchema;
 import org.apache.paimon.table.FileStoreTableFactory;
+import org.apache.paimon.testutils.assertj.AssertionUtils;
 import org.apache.paimon.types.ArrayType;
 import org.apache.paimon.types.BigIntType;
 import org.apache.paimon.types.DataField;
@@ -53,41 +54,39 @@ public class SparkReadITCase extends SparkReadTestBase {
 
     @Test
     public void testNormal() {
-        innerTestSimpleType(spark.table("paimon.default.t1"));
+        innerTestSimpleType(spark.table("t1"));
 
-        innerTestNestedType(spark.table("paimon.default.t2"));
+        innerTestNestedType(spark.table("t2"));
     }
 
     @Test
     public void testFilterPushDown() {
-        innerTestSimpleTypeFilterPushDown(spark.table("paimon.default.t1"));
+        innerTestSimpleTypeFilterPushDown(spark.table("t1"));
 
-        innerTestNestedTypeFilterPushDown(spark.table("paimon.default.t2"));
+        innerTestNestedTypeFilterPushDown(spark.table("t2"));
     }
 
     @Test
     public void testCatalogNormal() {
-        innerTestSimpleType(spark.table("paimon.default.t1"));
-        innerTestNestedType(spark.table("paimon.default.t2"));
+        innerTestSimpleType(spark.table("t1"));
+        innerTestNestedType(spark.table("t2"));
     }
 
     @Test
     public void testSnapshotsTable() {
         List<Row> rows =
-                spark.table("paimon.default.`t1$snapshots`")
+                spark.table("`t1$snapshots`")
                         .select("snapshot_id", "schema_id", "commit_user", "commit_kind")
                         .collectAsList();
         assertThat(rows.toString()).isEqualTo("[[1,0,user,APPEND]]");
 
-        spark.sql("USE paimon");
         spark.sql(
-                "CREATE TABLE default.schemasTable (\n"
+                "CREATE TABLE schemasTable (\n"
                         + "a BIGINT,\n"
-                        + "b STRING) USING paimon\n"
-                        + "COMMENT 'table comment'\n"
+                        + "b STRING)\n"
                         + "TBLPROPERTIES ('primary-key' = 'a')");
-        spark.sql("ALTER TABLE default.schemasTable ADD COLUMN c STRING");
-        List<Row> schemas = spark.table("paimon.default.`schemasTable$schemas`").collectAsList();
+        spark.sql("ALTER TABLE schemasTable ADD COLUMN c STRING");
+        List<Row> schemas = spark.table("`schemasTable$schemas`").collectAsList();
         List<?> fieldsList = schemas.stream().map(row -> row.get(1)).collect(Collectors.toList());
         assertThat(fieldsList.stream().map(Object::toString).collect(Collectors.toList()))
                 .containsExactlyInAnyOrder(
@@ -101,7 +100,7 @@ public class SparkReadITCase extends SparkReadTestBase {
     @Test
     public void testSnapshotsTableWithRecordCount() {
         List<Row> rows =
-                spark.table("paimon.default.`t1$snapshots`")
+                spark.table("`t1$snapshots`")
                         .select(
                                 "snapshot_id",
                                 "total_record_count",
@@ -113,14 +112,13 @@ public class SparkReadITCase extends SparkReadTestBase {
 
     @Test
     public void testCatalogFilterPushDown() {
-        innerTestSimpleTypeFilterPushDown(spark.table("paimon.default.t1"));
+        innerTestSimpleTypeFilterPushDown(spark.table("t1"));
 
-        innerTestNestedTypeFilterPushDown(spark.table("paimon.default.t2"));
+        innerTestNestedTypeFilterPushDown(spark.table("t2"));
     }
 
     @Test
     public void testDefaultNamespace() {
-        spark.sql("USE paimon");
         assertThat(spark.sql("SHOW CURRENT NAMESPACE").collectAsList().toString())
                 .isEqualTo("[[paimon,default]]");
     }
@@ -128,12 +126,12 @@ public class SparkReadITCase extends SparkReadTestBase {
     @Test
     public void testCreateTable() {
         spark.sql(
-                "CREATE TABLE paimon.default.testCreateTable(\n"
+                "CREATE TABLE testCreateTable(\n"
                         + "a BIGINT,\n"
                         + "b VARCHAR(10),\n"
                         + "c CHAR(10))");
         assertThat(
-                        spark.sql("SELECT fields FROM paimon.default.`testCreateTable$schemas`")
+                        spark.sql("SELECT fields FROM `testCreateTable$schemas`")
                                 .collectAsList()
                                 .toString())
                 .isEqualTo(
@@ -146,101 +144,87 @@ public class SparkReadITCase extends SparkReadTestBase {
     @Test
     public void testCreateTableAs() {
         spark.sql(
-                "CREATE TABLE default.testCreateTable(\n"
+                "CREATE TABLE testCreateTable(\n"
                         + "a BIGINT,\n"
                         + "b VARCHAR(10),\n"
                         + "c CHAR(10))");
-        spark.sql("INSERT INTO default.testCreateTable VALUES(1,'a','b')");
-        spark.sql(
-                "CREATE TABLE default.testCreateTableAs AS SELECT * FROM default.testCreateTable");
-        List<Row> result = spark.sql("SELECT * FROM default.testCreateTableAs").collectAsList();
+        spark.sql("INSERT INTO testCreateTable VALUES(1,'a','b')");
+        spark.sql("CREATE TABLE testCreateTableAs AS SELECT * FROM testCreateTable");
+        List<Row> result = spark.sql("SELECT * FROM testCreateTableAs").collectAsList();
 
         assertThat(result.stream().map(Row::toString)).containsExactlyInAnyOrder("[1,a,b]");
 
         // partitioned table
         spark.sql(
-                "CREATE TABLE default.partitionedTable (\n"
+                "CREATE TABLE partitionedTable (\n"
                         + "a BIGINT,\n"
-                        + "b STRING,"
-                        + "c STRING) USING paimon\n"
-                        + "COMMENT 'table comment'\n"
+                        + "b STRING,\n"
+                        + "c STRING)\n"
                         + "PARTITIONED BY (a,b)");
-        spark.sql("INSERT INTO default.partitionedTable VALUES(1,'aaa','bbb')");
+        spark.sql("INSERT INTO partitionedTable VALUES(1,'aaa','bbb')");
         spark.sql(
-                "CREATE TABLE default.partitionedTableAs PARTITIONED BY (a) AS SELECT * FROM default.partitionedTable");
-        assertThat(
-                        spark.sql("SHOW CREATE TABLE default.partitionedTableAs")
-                                .collectAsList()
-                                .toString())
+                "CREATE TABLE partitionedTableAs PARTITIONED BY (a) AS SELECT * FROM partitionedTable");
+        assertThat(spark.sql("SHOW CREATE TABLE partitionedTableAs").collectAsList().toString())
                 .isEqualTo(
                         String.format(
-                                "[[CREATE TABLE partitionedTableAs (\n"
-                                        + "  `a` BIGINT,\n"
-                                        + "  `b` STRING,\n"
-                                        + "  `c` STRING)\n"
+                                "[[%s"
                                         + "PARTITIONED BY (a)\n"
-                                        + "TBLPROPERTIES(\n"
+                                        + "TBLPROPERTIES (\n"
                                         + "  'path' = '%s')\n"
                                         + "]]",
+                                showCreateString(
+                                        "partitionedTableAs", "a BIGINT", "b STRING", "c STRING"),
                                 new Path(warehousePath, "default.db/partitionedTableAs")));
-        List<Row> resultPartition =
-                spark.sql("SELECT * FROM default.partitionedTableAs").collectAsList();
+        List<Row> resultPartition = spark.sql("SELECT * FROM partitionedTableAs").collectAsList();
         assertThat(resultPartition.stream().map(Row::toString))
                 .containsExactlyInAnyOrder("[1,aaa,bbb]");
 
         // change TBLPROPERTIES
         spark.sql(
-                "CREATE TABLE default.testTable(\n"
+                "CREATE TABLE testTable(\n"
                         + "a BIGINT,\n"
                         + "b VARCHAR(10),\n"
-                        + "c CHAR(10))"
-                        + " TBLPROPERTIES("
-                        + " 'file.format' = 'orc'"
+                        + "c CHAR(10))\n"
+                        + " TBLPROPERTIES(\n"
+                        + " 'file.format' = 'orc'\n"
                         + ")");
-        spark.sql("INSERT INTO default.testTable VALUES(1,'a','b')");
+        spark.sql("INSERT INTO testTable VALUES(1,'a','b')");
         spark.sql(
-                "CREATE TABLE default.testTableAs TBLPROPERTIES ('file.format' = 'parquet') AS SELECT * FROM default.testTable");
-        assertThat(spark.sql("SHOW CREATE TABLE default.testTableAs").collectAsList().toString())
+                "CREATE TABLE testTableAs TBLPROPERTIES ('file.format' = 'parquet') AS SELECT * FROM testTable");
+        assertThat(spark.sql("SHOW CREATE TABLE testTableAs").collectAsList().toString())
                 .isEqualTo(
                         String.format(
-                                "[[CREATE TABLE testTableAs (\n"
-                                        + "  `a` BIGINT,\n"
-                                        + "  `b` STRING,\n"
-                                        + "  `c` STRING)\n"
-                                        + "TBLPROPERTIES(\n"
+                                "[[%s"
+                                        + "TBLPROPERTIES (\n"
                                         + "  'file.format' = 'parquet',\n"
                                         + "  'path' = '%s')\n"
                                         + "]]",
+                                showCreateString("testTableAs", "a BIGINT", "b STRING", "c STRING"),
                                 new Path(warehousePath, "default.db/testTableAs")));
-        List<Row> resultProp = spark.sql("SELECT * FROM default.testTableAs").collectAsList();
+        List<Row> resultProp = spark.sql("SELECT * FROM testTableAs").collectAsList();
 
         assertThat(resultProp.stream().map(Row::toString)).containsExactlyInAnyOrder("[1,a,b]");
 
         // primary key
         spark.sql(
-                "CREATE TABLE default.t_pk (\n"
+                "CREATE TABLE t_pk (\n"
                         + "a BIGINT,\n"
                         + "b STRING,\n"
                         + "c STRING\n"
-                        + ") TBLPROPERTIES ("
-                        + "  'primary-key' = 'a,b'"
+                        + ") TBLPROPERTIES (\n"
+                        + "  'primary-key' = 'a,b'\n"
                         + ")\n"
                         + "COMMENT 'table comment'");
-        spark.sql("INSERT INTO default.t_pk VALUES(1,'aaa','bbb')");
-        spark.sql(
-                "CREATE TABLE default.t_pk_as TBLPROPERTIES ('primary-key' = 'a') AS SELECT * FROM default.t_pk");
-        assertThat(spark.sql("SHOW CREATE TABLE  default.t_pk_as").collectAsList().toString())
+        spark.sql("INSERT INTO t_pk VALUES(1,'aaa','bbb')");
+        spark.sql("CREATE TABLE t_pk_as TBLPROPERTIES ('primary-key' = 'a') AS SELECT * FROM t_pk");
+        assertThat(spark.sql("SHOW CREATE TABLE t_pk_as").collectAsList().toString())
                 .isEqualTo(
                         String.format(
-                                "[[CREATE TABLE t_pk_as (\n"
-                                        + "  `a` BIGINT NOT NULL,\n"
-                                        + "  `b` STRING,\n"
-                                        + "  `c` STRING)\n"
-                                        + "TBLPROPERTIES(\n"
-                                        + "  'path' = '%s')\n"
-                                        + "]]",
+                                "[[%sTBLPROPERTIES (\n  'path' = '%s')\n]]",
+                                showCreateString(
+                                        "t_pk_as", "a BIGINT NOT NULL", "b STRING", "c STRING"),
                                 new Path(warehousePath, "default.db/t_pk_as")));
-        List<Row> resultPk = spark.sql("SELECT * FROM default.t_pk_as").collectAsList();
+        List<Row> resultPk = spark.sql("SELECT * FROM t_pk_as").collectAsList();
 
         assertThat(resultPk.stream().map(Row::toString)).containsExactlyInAnyOrder("[1,aaa,bbb]");
 
@@ -257,34 +241,34 @@ public class SparkReadITCase extends SparkReadTestBase {
                         + ")");
         spark.sql("INSERT INTO t_all VALUES(1,2,'bbb','2020-01-01','12')");
         spark.sql(
-                "CREATE TABLE default.t_all_as PARTITIONED BY (dt) TBLPROPERTIES ('primary-key' = 'dt,hh') AS SELECT * FROM default.t_all");
-        assertThat(spark.sql("SHOW CREATE TABLE default.t_all_as").collectAsList().toString())
+                "CREATE TABLE t_all_as PARTITIONED BY (dt) TBLPROPERTIES ('primary-key' = 'dt,hh') AS SELECT * FROM t_all");
+        assertThat(spark.sql("SHOW CREATE TABLE t_all_as").collectAsList().toString())
                 .isEqualTo(
                         String.format(
-                                "[[CREATE TABLE t_all_as (\n"
-                                        + "  `user_id` BIGINT,\n"
-                                        + "  `item_id` BIGINT,\n"
-                                        + "  `behavior` STRING,\n"
-                                        + "  `dt` STRING NOT NULL,\n"
-                                        + "  `hh` STRING NOT NULL)\n"
+                                "[[%s"
                                         + "PARTITIONED BY (dt)\n"
-                                        + "TBLPROPERTIES(\n"
+                                        + "TBLPROPERTIES (\n"
                                         + "  'path' = '%s')\n"
                                         + "]]",
+                                showCreateString(
+                                        "t_all_as",
+                                        "user_id BIGINT",
+                                        "item_id BIGINT",
+                                        "behavior STRING",
+                                        "dt STRING NOT NULL",
+                                        "hh STRING NOT NULL"),
                                 new Path(warehousePath, "default.db/t_all_as")));
-        List<Row> resultAll = spark.sql("SELECT * FROM default.t_all_as").collectAsList();
+        List<Row> resultAll = spark.sql("SELECT * FROM t_all_as").collectAsList();
         assertThat(resultAll.stream().map(Row::toString))
                 .containsExactlyInAnyOrder("[1,2,bbb,2020-01-01,12]");
     }
 
     @Test
     public void testCreateTableWithNullablePk() {
-        spark.sql("USE paimon");
         spark.sql(
-                "CREATE TABLE default.PkTable (\n"
+                "CREATE TABLE PkTable (\n"
                         + "a BIGINT,\n"
-                        + "b STRING) USING paimon\n"
-                        + "COMMENT 'table comment'\n"
+                        + "b STRING)\n"
                         + "TBLPROPERTIES ('primary-key' = 'a')");
         Path tablePath = new Path(warehousePath, "default.db/PkTable");
         TableSchema schema = FileStoreTableFactory.create(LocalFileIO.create(), tablePath).schema();
@@ -293,31 +277,58 @@ public class SparkReadITCase extends SparkReadTestBase {
 
     @Test
     public void testDescribeTable() {
-        spark.sql("USE paimon");
         spark.sql(
-                "CREATE TABLE default.PartitionedTable (\n"
+                "CREATE TABLE PartitionedTable (\n"
                         + "a BIGINT,\n"
-                        + "b STRING) USING paimon\n"
-                        + "COMMENT 'table comment'\n"
-                        + "PARTITIONED BY (a)\n"
-                        + "TBLPROPERTIES ('foo' = 'bar')");
-        assertThat(spark.sql("DESCRIBE default.PartitionedTable").collectAsList().toString())
+                        + "b STRING)\n"
+                        + "PARTITIONED BY (a)\n");
+        assertThat(spark.sql("DESCRIBE PartitionedTable").collectAsList().toString())
                 .isEqualTo("[[a,bigint,], [b,string,], [,,], [# Partitioning,,], [Part 0,a,]]");
     }
 
+    @Test
+    public void testShowCreateTable() {
+        spark.sql(
+                "CREATE TABLE tbl (\n"
+                        + "  a INT COMMENT 'a comment',\n"
+                        + "  b STRING\n"
+                        + ") USING paimon\n"
+                        + "PARTITIONED BY (b)\n"
+                        + "COMMENT 'tbl comment'\n"
+                        + "TBLPROPERTIES (\n"
+                        + "  'primary-key' = 'a,b',\n"
+                        + "  'k1' = 'v1'\n"
+                        + ")");
+
+        assertThat(spark.sql("SHOW CREATE TABLE tbl").collectAsList().toString())
+                .isEqualTo(
+                        String.format(
+                                "[[%s"
+                                        + "USING paimon\n"
+                                        + "PARTITIONED BY (b)\n"
+                                        + "COMMENT 'tbl comment'\n"
+                                        + "TBLPROPERTIES (\n"
+                                        + "  'k1' = 'v1',\n"
+                                        + "  'path' = '%s')\n]]",
+                                showCreateString(
+                                        "tbl",
+                                        "a INT NOT NULL COMMENT 'a comment'",
+                                        "b STRING NOT NULL"),
+                                new Path(warehousePath, "default.db/tbl")));
+    }
+
     @Test
     public void testShowTableProperties() {
-        spark.sql("USE paimon");
         spark.sql(
-                "CREATE TABLE default.tbl (\n"
-                        + "a INT\n"
-                        + ") TBLPROPERTIES (\n"
-                        + "'k1' = 'v1',\n"
-                        + "'k2' = 'v2'"
+                "CREATE TABLE tbl (\n"
+                        + "  a INT)\n"
+                        + "TBLPROPERTIES (\n"
+                        + "  'k1' = 'v1',\n"
+                        + "  'k2' = 'v2'\n"
                         + ")");
 
         assertThat(
-                        spark.sql("SHOW TBLPROPERTIES default.tbl").collectAsList().stream()
+                        spark.sql("SHOW TBLPROPERTIES tbl").collectAsList().stream()
                                 .map(Row::toString)
                                 .collect(Collectors.toList()))
                 .contains("[k1,v1]", "[k2,v2]");
@@ -325,16 +336,14 @@ public class SparkReadITCase extends SparkReadTestBase {
 
     @Test
     public void testCreateTableWithInvalidPk() {
-        spark.sql("USE paimon");
         assertThatThrownBy(
                         () ->
                                 spark.sql(
-                                        "CREATE TABLE default.PartitionedPkTable (\n"
+                                        "CREATE TABLE PartitionedPkTable (\n"
                                                 + "a BIGINT,\n"
                                                 + "b STRING,\n"
-                                                + "c DOUBLE) USING paimon\n"
-                                                + "COMMENT 'table comment'\n"
-                                                + "PARTITIONED BY (b)"
+                                                + "c DOUBLE)\n"
+                                                + "PARTITIONED BY (b)\n"
                                                 + "TBLPROPERTIES ('primary-key' = 'a')"))
                 .isInstanceOf(IllegalStateException.class)
                 .hasMessageContaining(
@@ -352,7 +361,7 @@ public class SparkReadITCase extends SparkReadTestBase {
                                                 + "b STRING,\n"
                                                 + "c DOUBLE) USING paimon\n"
                                                 + "COMMENT 'table comment'\n"
-                                                + "PARTITIONED BY (b)"
+                                                + "PARTITIONED BY (b)\n"
                                                 + "TBLPROPERTIES ('primary-key' = 'd')"))
                 .isInstanceOf(IllegalStateException.class)
                 .hasMessageContaining(
@@ -361,23 +370,21 @@ public class SparkReadITCase extends SparkReadTestBase {
 
     @Test
     public void testCreateTableWithNonexistentPartition() {
-        spark.sql("USE paimon");
         assertThatThrownBy(
                         () ->
                                 spark.sql(
-                                        "CREATE TABLE default.PartitionedPkTable (\n"
+                                        "CREATE TABLE PartitionedPkTable (\n"
                                                 + "a BIGINT,\n"
                                                 + "b STRING,\n"
-                                                + "c DOUBLE) USING paimon\n"
-                                                + "COMMENT 'table comment'\n"
-                                                + "PARTITIONED BY (d)"
+                                                + "c DOUBLE)\n"
+                                                + "PARTITIONED BY (d)\n"
                                                 + "TBLPROPERTIES ('primary-key' = 'a')"))
                 .isInstanceOf(AnalysisException.class)
                 .hasMessageContaining("Couldn't find column d");
     }
 
     @Test
-    public void testCreateAndDropTable() throws Exception {
+    public void testCreateAndDropTable() {
         innerTest("MyTable1", true, true, false);
         innerTest("MyTable2", true, false, false);
         innerTest("MyTable3", false, false, false);
@@ -388,7 +395,6 @@ public class SparkReadITCase extends SparkReadTestBase {
 
     private void innerTest(
             String tableName, boolean hasPk, boolean partitioned, boolean appendOnly) {
-        spark.sql("USE paimon");
         String ddlTemplate =
                 "CREATE TABLE default.%s (\n"
                         + "order_id BIGINT NOT NULL comment 'order_id',\n"
@@ -396,7 +402,7 @@ public class SparkReadITCase extends SparkReadTestBase {
                         + "coupon_info ARRAY<STRING> NOT NULL COMMENT 'coupon_info',\n"
                         + "order_amount DOUBLE NOT NULL COMMENT 'order_amount',\n"
                         + "dt STRING NOT NULL COMMENT 'dt',\n"
-                        + "hh STRING NOT NULL COMMENT 'hh') USING paimon\n"
+                        + "hh STRING NOT NULL COMMENT 'hh')\n"
                         + "COMMENT 'table comment'\n"
                         + "%s\n"
                         + "TBLPROPERTIES (%s)";
@@ -454,7 +460,7 @@ public class SparkReadITCase extends SparkReadTestBase {
                         () ->
                                 spark.sql(
                                         String.format(
-                                                "ALTER TABLE default.%s UNSET TBLPROPERTIES('primary-key')",
+                                                "ALTER TABLE %s UNSET TBLPROPERTIES('primary-key')",
                                                 tableName)))
                 .isInstanceOf(UnsupportedOperationException.class)
                 .hasMessageContaining("Alter primary key is not supported");
@@ -462,11 +468,12 @@ public class SparkReadITCase extends SparkReadTestBase {
                         () ->
                                 spark.sql(
                                         String.format(
-                                                "ALTER TABLE default.%s SET TBLPROPERTIES('write-mode' = 'append-only')",
+                                                "ALTER TABLE %s SET TBLPROPERTIES('write-mode' = 'append-only')",
                                                 tableName)))
-                .getRootCause()
-                .isInstanceOf(UnsupportedOperationException.class)
-                .hasMessageContaining("Change 'write-mode' is not supported yet");
+                .satisfies(
+                        AssertionUtils.anyCauseMatches(
+                                UnsupportedOperationException.class,
+                                "Change 'write-mode' is not supported yet"));
 
         Path tablePath = new Path(warehousePath, String.format("default.db/%s", tableName));
         TableSchema schema = FileStoreTableFactory.create(LocalFileIO.create(), tablePath).schema();
@@ -520,7 +527,7 @@ public class SparkReadITCase extends SparkReadTestBase {
                                 .toString())
                 .isEqualTo(String.format("[[default,%s]]", tableName));
 
-        spark.sql(String.format("DROP TABLE paimon.default.%s", tableName));
+        spark.sql(String.format("DROP TABLE %s", tableName));
 
         assertThat(
                         spark.sql(
@@ -538,7 +545,6 @@ public class SparkReadITCase extends SparkReadTestBase {
     @Test
     public void testCreateAndDropNamespace() {
         // create namespace
-        spark.sql("USE paimon");
         spark.sql("CREATE NAMESPACE bar");
 
         assertThatThrownBy(() -> spark.sql("CREATE NAMESPACE bar"))
@@ -631,19 +637,12 @@ public class SparkReadITCase extends SparkReadTestBase {
     @Test
     public void testCreateNestedField() {
         spark.sql(
-                "CREATE TABLE paimon.default.nested_table ( a INT, b STRUCT<b1: STRUCT<b11: INT, b12 INT>, b2 BIGINT>)");
-        assertThat(
-                        spark.sql("SHOW CREATE TABLE paimon.default.nested_table")
-                                .collectAsList()
-                                .toString())
-                .isEqualTo(
-                        String.format(
-                                "[[CREATE TABLE nested_table (\n"
-                                        + "  `a` INT,\n"
-                                        + "  `b` STRUCT<`b1`: STRUCT<`b11`: INT, `b12`: INT>, `b2`: BIGINT>)\n"
-                                        + "TBLPROPERTIES(\n"
-                                        + "  'path' = '%s')\n"
-                                        + "]]",
-                                new Path(warehousePath, "default.db/nested_table")));
+                "CREATE TABLE nested_table ( a INT, b STRUCT<b1: STRUCT<b11: INT, b12 INT>, b2 BIGINT>)");
+        assertThat(spark.sql("SHOW CREATE TABLE nested_table").collectAsList().toString())
+                .contains(
+                        showCreateString(
+                                "nested_table",
+                                "a INT",
+                                "b STRUCT<b1: STRUCT<b11: INT, b12: INT>, b2: BIGINT>"));
     }
 }
diff --git a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkReadTestBase.java b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkReadTestBase.java
index 36454f6f2..0519dfb5b 100644
--- a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkReadTestBase.java
+++ b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkReadTestBase.java
@@ -40,8 +40,10 @@ import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.io.TempDir;
 
+import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
@@ -186,4 +188,17 @@ public abstract class SparkReadTestBase {
                         "INSERT INTO paimon.default.%s VALUES %s",
                         tableName, StringUtils.join(values, ",")));
     }
+
+    // return of 'SHOW CREATE TABLE' excluding TBLPROPERTIES
+    protected String showCreateString(String table, String... fieldSpec) {
+        return String.format(
+                "CREATE TABLE %s (%s)\n",
+                table,
+                Arrays.stream(fieldSpec).map(s -> "\n  " + s).collect(Collectors.joining(",")));
+    }
+
+    // default schema
+    protected String defaultShowCreateString(String table) {
+        return showCreateString(table, "a INT NOT NULL", "b BIGINT", "c STRING");
+    }
 }
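
The helper added above builds the leading "CREATE TABLE ..." fragment of Spark's
SHOW CREATE TABLE output (excluding TBLPROPERTIES), so the integration tests can
match it with contains() instead of hard-coding the full output string, which
changes across Spark versions (the removed expected strings in this diff were
written against Spark 3.2). A minimal, self-contained sketch of that helper and
its output, mirroring the code in the diff (the wrapper class and main method
here are illustrative only, not part of the commit):

    import java.util.Arrays;
    import java.util.stream.Collectors;

    public class ShowCreateStringSketch {

        // Same logic as SparkReadTestBase#showCreateString in the diff above:
        // each field spec is placed on its own indented line inside the
        // "CREATE TABLE <name> (...)" prefix.
        static String showCreateString(String table, String... fieldSpec) {
            return String.format(
                    "CREATE TABLE %s (%s)\n",
                    table,
                    Arrays.stream(fieldSpec)
                            .map(s -> "\n  " + s)
                            .collect(Collectors.joining(",")));
        }

        public static void main(String[] args) {
            // Prints:
            // CREATE TABLE t1 (
            //   a INT NOT NULL,
            //   b BIGINT,
            //   c STRING)
            System.out.print(
                    showCreateString("t1", "a INT NOT NULL", "b BIGINT", "c STRING"));
        }
    }
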
diff --git a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkSchemaEvolutionITCase.java b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkSchemaEvolutionITCase.java
index bba270ba3..de6387991 100644
--- a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkSchemaEvolutionITCase.java
+++ b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkSchemaEvolutionITCase.java
@@ -18,6 +18,8 @@
 
 package org.apache.paimon.spark;
 
+import org.apache.paimon.testutils.assertj.AssertionUtils;
+
 import org.apache.commons.lang3.StringUtils;
 import org.apache.spark.sql.AnalysisException;
 import org.apache.spark.sql.Dataset;
@@ -37,21 +39,18 @@ public class SparkSchemaEvolutionITCase extends SparkReadTestBase {
 
     @Test
     public void testSetAndRemoveOption() {
-        spark.sql("ALTER TABLE paimon.default.t1 SET TBLPROPERTIES('xyc' 'unknown1')");
+        spark.sql("ALTER TABLE t1 SET TBLPROPERTIES('xyc' 'unknown1')");
 
         Map<String, String> options =
-                rowsToMap(spark.sql("SELECT * FROM paimon.default.`t1$options`").collectAsList());
+                rowsToMap(spark.sql("SELECT * FROM `t1$options`").collectAsList());
         assertThat(options).containsEntry("xyc", "unknown1");
 
-        spark.sql("ALTER TABLE paimon.default.t1 UNSET TBLPROPERTIES('xyc')");
+        spark.sql("ALTER TABLE t1 UNSET TBLPROPERTIES('xyc')");
 
-        options = rowsToMap(spark.sql("SELECT * FROM paimon.default.`t1$options`").collectAsList());
+        options = rowsToMap(spark.sql("SELECT * FROM `t1$options`").collectAsList());
         assertThat(options).doesNotContainKey("xyc");
 
-        assertThatThrownBy(
-                        () ->
-                                spark.sql(
-                                        "ALTER TABLE paimon.default.t1 SET TBLPROPERTIES('primary-key' = 'a')"))
+        assertThatThrownBy(() -> spark.sql("ALTER TABLE t1 SET TBLPROPERTIES('primary-key' = 'a')"))
                 .isInstanceOf(UnsupportedOperationException.class)
                 .hasMessageContaining("Alter primary key is not supported");
     }
@@ -68,38 +67,36 @@ public class SparkSchemaEvolutionITCase extends SparkReadTestBase {
         createTable("testAddColumn");
         writeTable("testAddColumn", "(1, 2L, '1')", "(5, 6L, '3')");
 
-        spark.sql("ALTER TABLE paimon.default.testAddColumn ADD COLUMN d STRING");
-
-        Dataset<Row> table = spark.table("paimon.default.testAddColumn");
-        List<Row> results = table.collectAsList();
-        assertThat(results.toString()).isEqualTo("[[1,2,1,null], [5,6,3,null]]");
+        List<Row> beforeAdd = spark.sql("SHOW CREATE TABLE testAddColumn").collectAsList();
+        assertThat(beforeAdd.toString()).contains(defaultShowCreateString("testAddColumn"));
 
-        results = table.select("a", "c").collectAsList();
-        assertThat(results.toString()).isEqualTo("[[1,1], [5,3]]");
+        spark.sql("ALTER TABLE testAddColumn ADD COLUMN d STRING");
 
-        results = table.groupBy().sum("b").collectAsList();
-        assertThat(results.toString()).isEqualTo("[[8]]");
+        List<Row> afterAdd = spark.sql("SHOW CREATE TABLE testAddColumn").collectAsList();
+        assertThat(afterAdd.toString())
+                .contains(
+                        showCreateString(
+                                "testAddColumn",
+                                "a INT NOT NULL",
+                                "b BIGINT",
+                                "c STRING",
+                                "d STRING"));
+
+        assertThat(spark.table("testAddColumn").collectAsList().toString())
+                .isEqualTo("[[1,2,1,null], [5,6,3,null]]");
     }
 
     @Test
     public void testAddNotNullColumn() {
         createTable("testAddNotNullColumn");
 
-        List<Row> beforeAdd =
-                spark.sql("SHOW CREATE TABLE paimon.default.testAddNotNullColumn").collectAsList();
-        assertThat(beforeAdd.toString())
-                .isEqualTo(
-                        "[[CREATE TABLE testAddNotNullColumn (\n"
-                                + "  `a` INT NOT NULL,\n"
-                                + "  `b` BIGINT,\n"
-                                + "  `c` STRING)\n"
-                                + buildTableProperties("default.db/testAddNotNullColumn")
-                                + "]]");
+        List<Row> beforeAdd = spark.sql("SHOW CREATE TABLE testAddNotNullColumn").collectAsList();
+        assertThat(beforeAdd.toString()).contains(defaultShowCreateString("testAddNotNullColumn"));
 
         assertThatThrownBy(
                         () ->
                                 spark.sql(
-                                        "ALTER TABLE paimon.default.testAddNotNullColumn ADD COLUMNS (d INT NOT NULL)"))
+                                        "ALTER TABLE testAddNotNullColumn ADD COLUMNS (d INT NOT NULL)"))
                 .isInstanceOf(RuntimeException.class)
                 .hasMessage(
                         "java.lang.IllegalArgumentException: ADD COLUMN cannot specify NOT NULL.");
@@ -113,22 +110,24 @@ public class SparkSchemaEvolutionITCase extends SparkReadTestBase {
                 spark.sql("SHOW CREATE TABLE testAddColumnPositionFirst").collectAsList();
         assertThat(result.toString())
                 .contains(
-                        "CREATE TABLE testAddColumnPositionFirst (\n"
-                                + "  `d` INT,\n"
-                                + "  `a` INT NOT NULL,\n"
-                                + "  `b` BIGINT,\n"
-                                + "  `c` STRING)");
+                        showCreateString(
+                                "testAddColumnPositionFirst",
+                                "d INT",
+                                "a INT NOT NULL",
+                                "b BIGINT",
+                                "c STRING"));
 
         createTable("testAddColumnPositionAfter");
         spark.sql("ALTER TABLE testAddColumnPositionAfter ADD COLUMN d INT AFTER b");
         result = spark.sql("SHOW CREATE TABLE testAddColumnPositionAfter").collectAsList();
         assertThat(result.toString())
                 .contains(
-                        "CREATE TABLE testAddColumnPositionAfter (\n"
-                                + "  `a` INT NOT NULL,\n"
-                                + "  `b` BIGINT,\n"
-                                + "  `d` INT,\n"
-                                + "  `c` STRING)");
+                        showCreateString(
+                                "testAddColumnPositionAfter",
+                                "a INT NOT NULL",
+                                "b BIGINT",
+                                "d INT",
+                                "c STRING"));
     }
 
     @Test
@@ -147,15 +146,8 @@ public class SparkSchemaEvolutionITCase extends SparkReadTestBase {
         assertThat(tables.stream().map(Row::toString))
                 .containsExactlyInAnyOrder("[default,t2,false]", "[default,t3,false]");
 
-        List<Row> afterRename = spark.sql("SHOW CREATE TABLE paimon.default.t3").collectAsList();
-        assertThat(afterRename.toString())
-                .isEqualTo(
-                        "[[CREATE TABLE t3 (\n"
-                                + "  `a` INT NOT NULL,\n"
-                                + "  `b` BIGINT,\n"
-                                + "  `c` STRING)\n"
-                                + buildTableProperties("default.db/t3")
-                                + "]]");
+        List<Row> afterRename = spark.sql("SHOW CREATE TABLE t3").collectAsList();
+        assertThat(afterRename.toString()).contains(defaultShowCreateString("t3"));
 
         List<Row> data = spark.sql("SELECT * FROM t3").collectAsList();
         assertThat(data.toString()).isEqualTo("[[1,2,1], [5,6,3]]");
@@ -166,78 +158,45 @@ public class SparkSchemaEvolutionITCase extends SparkReadTestBase {
         createTable("testRenameColumn");
         writeTable("testRenameColumn", "(1, 2L, '1')", "(5, 6L, '3')");
 
-        List<Row> beforeRename =
-                spark.sql("SHOW CREATE TABLE paimon.default.testRenameColumn").collectAsList();
-        assertThat(beforeRename.toString())
-                .isEqualTo(
-                        "[[CREATE TABLE testRenameColumn (\n"
-                                + "  `a` INT NOT NULL,\n"
-                                + "  `b` BIGINT,\n"
-                                + "  `c` STRING)\n"
-                                + buildTableProperties("default.db/testRenameColumn")
-                                + "]]");
-        Dataset<Row> table1 = spark.table("paimon.default.testRenameColumn");
-        List<Row> results = table1.select("a", "c").collectAsList();
+        List<Row> beforeRename = spark.sql("SHOW CREATE TABLE testRenameColumn").collectAsList();
+        assertThat(beforeRename.toString()).contains(defaultShowCreateString("testRenameColumn"));
+        List<Row> results = spark.table("testRenameColumn").select("a", "c").collectAsList();
         assertThat(results.toString()).isEqualTo("[[1,1], [5,3]]");
 
         // Rename "a" to "aa"
-        spark.sql("ALTER TABLE paimon.default.testRenameColumn RENAME COLUMN a to aa");
-        List<Row> afterRename =
-                spark.sql("SHOW CREATE TABLE paimon.default.testRenameColumn").collectAsList();
+        spark.sql("ALTER TABLE testRenameColumn RENAME COLUMN a to aa");
+        List<Row> afterRename = spark.sql("SHOW CREATE TABLE testRenameColumn").collectAsList();
         assertThat(afterRename.toString())
-                .isEqualTo(
-                        "[[CREATE TABLE testRenameColumn (\n"
-                                + "  `aa` INT NOT NULL,\n"
-                                + "  `b` BIGINT,\n"
-                                + "  `c` STRING)\n"
-                                + buildTableProperties("default.db/testRenameColumn")
-                                + "]]");
-        Dataset<Row> table2 = spark.table("paimon.default.testRenameColumn");
-        results = table2.select("aa", "c").collectAsList();
+                .contains(
+                        showCreateString(
+                                "testRenameColumn", "aa INT NOT NULL", "b BIGINT", "c STRING"));
+        Dataset<Row> table = spark.table("testRenameColumn");
+        results = table.select("aa", "c").collectAsList();
         assertThat(results.toString()).isEqualTo("[[1,1], [5,3]]");
-        assertThatThrownBy(() -> table2.select("a", "c").collectAsList())
+        assertThatThrownBy(() -> table.select("a", "c"))
                 .isInstanceOf(AnalysisException.class)
-                .hasMessageContaining("cannot resolve '%s' given input columns", "a");
+                .hasMessageContaining(
+                        "Column 'a' does not exist. Did you mean one of the following? "
+                                + "[paimon.default.testRenameColumn.b, paimon.default.testRenameColumn.c, paimon.default.testRenameColumn.aa]");
     }
 
     @Test
     public void testRenamePartitionKey() {
-        spark.sql("USE paimon");
         spark.sql(
-                "CREATE TABLE default.testRenamePartitionKey (\n"
+                "CREATE TABLE testRenamePartitionKey (\n"
                         + "a BIGINT,\n"
-                        + "b STRING) USING paimon\n"
-                        + "COMMENT 'table comment'\n"
-                        + "PARTITIONED BY (a)\n"
-                        + "TBLPROPERTIES ('foo' = 'bar')");
-
+                        + "b STRING)\n"
+                        + "PARTITIONED BY (a)\n");
         List<Row> beforeRename =
-                spark.sql("SHOW CREATE TABLE paimon.default.testRenamePartitionKey")
-                        .collectAsList();
+                spark.sql("SHOW CREATE TABLE testRenamePartitionKey").collectAsList();
         assertThat(beforeRename.toString())
-                .isEqualTo(
-                        "[[CREATE TABLE testRenamePartitionKey (\n"
-                                + "  `a` BIGINT,\n"
-                                + "  `b` STRING)\n"
-                                + "USING paimon\n"
-                                + "PARTITIONED BY (a)\n"
-                                + "COMMENT 'table comment'\n"
-                                + "TBLPROPERTIES(\n"
-                                + "  'foo' = 'bar',\n"
-                                + String.format(
-                                        "  'path' = '%s/%s')\n",
-                                        warehousePath, "default.db/testRenamePartitionKey")
-                                + "]]");
+                .contains(showCreateString("testRenamePartitionKey", "a BIGINT", "b STRING"));
 
         assertThatThrownBy(
-                        () ->
-                                spark.sql(
-                                        "ALTER TABLE paimon.default.testRenamePartitionKey RENAME COLUMN a to aa"))
+                        () -> spark.sql("ALTER TABLE testRenamePartitionKey RENAME COLUMN a to aa"))
                 .isInstanceOf(RuntimeException.class)
                 .hasMessage(
-                        String.format(
-                                "java.lang.UnsupportedOperationException: Cannot drop/rename partition key[%s]",
-                                "a"));
+                        "java.lang.UnsupportedOperationException: Cannot drop/rename partition key[a]");
     }
 
     @Test
@@ -245,31 +204,16 @@ public class SparkSchemaEvolutionITCase extends SparkReadTestBase {
         createTable("testDropSingleColumn");
         writeTable("testDropSingleColumn", "(1, 2L, '1')", "(5, 6L, '3')");
 
-        List<Row> beforeDrop =
-                spark.sql("SHOW CREATE TABLE paimon.default.testDropSingleColumn").collectAsList();
-        assertThat(beforeDrop.toString())
-                .isEqualTo(
-                        "[[CREATE TABLE testDropSingleColumn (\n"
-                                + "  `a` INT NOT NULL,\n"
-                                + "  `b` BIGINT,\n"
-                                + "  `c` STRING)\n"
-                                + buildTableProperties("default.db/testDropSingleColumn")
-                                + "]]");
-
-        spark.sql("ALTER TABLE paimon.default.testDropSingleColumn DROP COLUMN a");
-
-        List<Row> afterDrop =
-                spark.sql("SHOW CREATE TABLE paimon.default.testDropSingleColumn").collectAsList();
+        List<Row> beforeDrop = spark.sql("SHOW CREATE TABLE testDropSingleColumn").collectAsList();
+        assertThat(beforeDrop.toString()).contains(defaultShowCreateString("testDropSingleColumn"));
+
+        spark.sql("ALTER TABLE testDropSingleColumn DROP COLUMN a");
+
+        List<Row> afterDrop = spark.sql("SHOW CREATE TABLE testDropSingleColumn").collectAsList();
         assertThat(afterDrop.toString())
-                .isEqualTo(
-                        "[[CREATE TABLE testDropSingleColumn (\n"
-                                + "  `b` BIGINT,\n"
-                                + "  `c` STRING)\n"
-                                + buildTableProperties("default.db/testDropSingleColumn")
-                                + "]]");
-
-        Dataset<Row> table = spark.table("paimon.default.testDropSingleColumn");
-        List<Row> results = table.collectAsList();
+                .contains(showCreateString("testDropSingleColumn", "b BIGINT", "c STRING"));
+
+        List<Row> results = spark.table("testDropSingleColumn").collectAsList();
         assertThat(results.toString()).isEqualTo("[[2,1], [6,3]]");
     }
 
@@ -277,161 +221,104 @@ public class SparkSchemaEvolutionITCase extends SparkReadTestBase {
     public void testDropColumns() {
         createTable("testDropColumns");
 
-        List<Row> beforeRename =
-                spark.sql("SHOW CREATE TABLE paimon.default.testDropColumns").collectAsList();
-        assertThat(beforeRename.toString())
-                .isEqualTo(
-                        "[[CREATE TABLE testDropColumns (\n"
-                                + "  `a` INT NOT NULL,\n"
-                                + "  `b` BIGINT,\n"
-                                + "  `c` STRING)\n"
-                                + buildTableProperties("default.db/testDropColumns")
-                                + "]]");
-
-        spark.sql("ALTER TABLE paimon.default.testDropColumns DROP COLUMNS a, b");
-
-        List<Row> afterRename =
-                spark.sql("SHOW CREATE TABLE paimon.default.testDropColumns").collectAsList();
-        assertThat(afterRename.toString())
-                .isEqualTo(
-                        "[[CREATE TABLE testDropColumns (\n"
-                                + "  `c` STRING)\n"
-                                + buildTableProperties("default.db/testDropColumns")
-                                + "]]");
+        List<Row> beforeDrop = spark.sql("SHOW CREATE TABLE testDropColumns").collectAsList();
+        assertThat(beforeDrop.toString()).contains(defaultShowCreateString("testDropColumns"));
+
+        spark.sql("ALTER TABLE testDropColumns DROP COLUMNS a, b");
+
+        List<Row> afterDrop = spark.sql("SHOW CREATE TABLE testDropColumns").collectAsList();
+        assertThat(afterDrop.toString()).contains(showCreateString("testDropColumns", "c STRING"));
     }
 
     @Test
     public void testDropPartitionKey() {
-        spark.sql("USE paimon");
         spark.sql(
-                "CREATE TABLE default.testDropPartitionKey (\n"
+                "CREATE TABLE testDropPartitionKey (\n"
                         + "a BIGINT,\n"
-                        + "b STRING) USING paimon\n"
-                        + "COMMENT 'table comment'\n"
-                        + "PARTITIONED BY (a)\n"
-                        + "TBLPROPERTIES ('foo' = 'bar')");
+                        + "b STRING) \n"
+                        + "PARTITIONED BY (a)");
 
-        List<Row> beforeRename =
-                spark.sql("SHOW CREATE TABLE paimon.default.testDropPartitionKey").collectAsList();
-        assertThat(beforeRename.toString())
-                .isEqualTo(
-                        "[[CREATE TABLE testDropPartitionKey (\n"
-                                + "  `a` BIGINT,\n"
-                                + "  `b` STRING)\n"
-                                + "USING paimon\n"
-                                + "PARTITIONED BY (a)\n"
-                                + "COMMENT 'table comment'\n"
-                                + "TBLPROPERTIES(\n"
-                                + "  'foo' = 'bar',\n"
-                                + String.format(
-                                        "  'path' = '%s/%s')\n",
-                                        warehousePath, "default.db/testDropPartitionKey")
-                                + "]]");
+        List<Row> beforeDrop = spark.sql("SHOW CREATE TABLE testDropPartitionKey").collectAsList();
+        assertThat(beforeDrop.toString())
+                .contains(showCreateString("testDropPartitionKey", "a BIGINT", "b STRING"));
 
-        assertThatThrownBy(
-                        () ->
-                                spark.sql(
-                                        "ALTER TABLE paimon.default.testDropPartitionKey DROP COLUMN a"))
+        assertThatThrownBy(() -> spark.sql("ALTER TABLE testDropPartitionKey DROP COLUMN a"))
                 .isInstanceOf(RuntimeException.class)
                 .hasMessage(
-                        String.format(
-                                "java.lang.UnsupportedOperationException: Cannot drop/rename partition key[%s]",
-                                "a"));
+                        "java.lang.UnsupportedOperationException: Cannot drop/rename partition key[a]");
     }
 
     @Test
     public void testDropPrimaryKey() {
-        spark.sql("USE paimon");
         spark.sql(
-                "CREATE TABLE default.testDropPrimaryKey (\n"
+                "CREATE TABLE testDropPrimaryKey (\n"
                         + "a BIGINT,\n"
-                        + "b STRING) USING paimon\n"
-                        + "COMMENT 'table comment'\n"
+                        + "b STRING)\n"
                         + "PARTITIONED BY (a)\n"
                         + "TBLPROPERTIES ('primary-key' = 'a, b')");
 
-        List<Row> beforeRename =
-                spark.sql("SHOW CREATE TABLE paimon.default.testDropPrimaryKey").collectAsList();
-        assertThat(beforeRename.toString())
-                .isEqualTo(
-                        "[[CREATE TABLE testDropPrimaryKey (\n"
-                                + "  `a` BIGINT NOT NULL,\n"
-                                + "  `b` STRING NOT NULL)\n"
-                                + "USING paimon\n"
-                                + "PARTITIONED BY (a)\n"
-                                + "COMMENT 'table comment'\n"
-                                + "TBLPROPERTIES(\n"
-                                + String.format(
-                                        "  'path' = '%s/%s')\n",
-                                        warehousePath, "default.db/testDropPrimaryKey")
-                                + "]]");
+        List<Row> beforeDrop = spark.sql("SHOW CREATE TABLE testDropPrimaryKey").collectAsList();
+        assertThat(beforeDrop.toString())
+                .contains(
+                        showCreateString(
+                                "testDropPrimaryKey", "a BIGINT NOT NULL", "b STRING NOT NULL"));
 
-        assertThatThrownBy(
-                        () ->
-                                spark.sql(
-                                        "ALTER TABLE paimon.default.testDropPrimaryKey DROP COLUMN b"))
+        assertThatThrownBy(() -> spark.sql("ALTER TABLE testDropPrimaryKey DROP COLUMN b"))
                 .isInstanceOf(RuntimeException.class)
                 .hasMessage(
-                        String.format(
-                                "java.lang.UnsupportedOperationException: Cannot drop/rename primary key[%s]",
-                                "b"));
+                        "java.lang.UnsupportedOperationException: Cannot drop/rename primary key[b]");
     }
 
     @Test
     public void testUpdateColumnPosition() {
         // move first
-        spark.sql("CREATE TABLE tableFirst (a INT , b BIGINT, c STRING)");
+        createTable("tableFirst");
         spark.sql("ALTER TABLE tableFirst ALTER COLUMN b FIRST");
         List<Row> result = spark.sql("SHOW CREATE TABLE tableFirst").collectAsList();
         assertThat(result.toString())
-                .contains(
-                        "CREATE TABLE tableFirst (\n"
-                                + "  `b` BIGINT,\n"
-                                + "  `a` INT,\n"
-                                + "  `c` STRING)");
+                .contains(showCreateString("tableFirst", "b BIGINT", "a INT NOT NULL", "c STRING"));
 
         // move after
-        spark.sql("CREATE TABLE tableAfter (a INT, b BIGINT, c STRING)");
+        createTable("tableAfter");
         spark.sql("ALTER TABLE tableAfter ALTER COLUMN c AFTER a");
         result = spark.sql("SHOW CREATE TABLE tableAfter").collectAsList();
         assertThat(result.toString())
-                .contains(
-                        "CREATE TABLE tableAfter (\n"
-                                + "  `a` INT,\n"
-                                + "  `c` STRING,\n"
-                                + "  `b` BIGINT)");
+                .contains(showCreateString("tableAfter", "a INT NOT NULL", "c STRING", "b BIGINT"));
 
-        spark.sql("CREATE TABLE tableAfter1 (a INT, b BIGINT, c STRING, d DOUBLE)");
+        spark.sql("CREATE TABLE tableAfter1 (a INT NOT NULL, b BIGINT, c STRING, d DOUBLE)");
         spark.sql("ALTER TABLE tableAfter1 ALTER COLUMN b AFTER c");
         result = spark.sql("SHOW CREATE TABLE tableAfter1").collectAsList();
         assertThat(result.toString())
                 .contains(
-                        "CREATE TABLE tableAfter1 (\n"
-                                + "  `a` INT,\n"
-                                + "  `c` STRING,\n"
-                                + "  `b` BIGINT,\n"
-                                + "  `d` DOUBLE)");
-
-        //  move self for first test
-        spark.sql("CREATE TABLE tableFirstSelf (a INT , b BIGINT, c STRING)");
+                        showCreateString(
+                                "tableAfter1",
+                                "a INT NOT NULL",
+                                "c STRING",
+                                "b BIGINT",
+                                "d DOUBLE"));
+
+        //  move self to first test
+        createTable("tableFirstSelf");
         assertThatThrownBy(() -> spark.sql("ALTER TABLE tableFirstSelf ALTER COLUMN a FIRST"))
-                .getRootCause()
-                .isInstanceOf(UnsupportedOperationException.class)
-                .hasMessageContaining("Cannot move itself for column a");
+                .satisfies(
+                        AssertionUtils.anyCauseMatches(
+                                UnsupportedOperationException.class,
+                                "Cannot move itself for column a"));
 
-        //  move self for after test
-        spark.sql("CREATE TABLE tableAfterSelf (a INT , b BIGINT, c STRING)");
+        //  move self to after test
+        createTable("tableAfterSelf");
         assertThatThrownBy(() -> spark.sql("ALTER TABLE tableAfterSelf ALTER COLUMN b AFTER b"))
-                .getRootCause()
-                .isInstanceOf(UnsupportedOperationException.class)
-                .hasMessageContaining("Cannot move itself for column b");
+                .satisfies(
+                        AssertionUtils.anyCauseMatches(
+                                UnsupportedOperationException.class,
+                                "Cannot move itself for column b"));
 
         // missing column
-        spark.sql("CREATE TABLE tableMissing (a INT , b BIGINT, c STRING)");
+        createTable("tableMissing");
         assertThatThrownBy(() -> spark.sql("ALTER TABLE tableMissing ALTER COLUMN d FIRST"))
                 .hasMessageContaining("Missing field d in table paimon.default.tableMissing");
 
-        spark.sql("CREATE TABLE tableMissingAfter (a INT , b BIGINT, c STRING)");
+        createTable("tableMissingAfter");
         assertThatThrownBy(() -> spark.sql("ALTER TABLE tableMissingAfter ALTER COLUMN a AFTER d"))
                 .hasMessageContaining("Missing field d in table paimon.default.tableMissingAfter");
     }
@@ -441,19 +328,18 @@ public class SparkSchemaEvolutionITCase extends SparkReadTestBase {
         createTable("testAlterColumnType");
         writeTable("testAlterColumnType", "(1, 2L, '1')", "(5, 6L, '3')");
 
-        spark.sql("ALTER TABLE paimon.default.testAlterColumnType ALTER COLUMN b TYPE DOUBLE");
-        assertThat(
-                        spark.table("paimon.default.testAlterColumnType").collectAsList().stream()
-                                .map(Row::toString)
-                                .collect(Collectors.toList()))
-                .containsExactlyInAnyOrder("[1,2.0,1]", "[5,6.0,3]");
+        List<Row> beforeAlter = spark.sql("SHOW CREATE TABLE testAlterColumnType").collectAsList();
+        assertThat(beforeAlter.toString()).contains(defaultShowCreateString("testAlterColumnType"));
 
-        spark.sql("ALTER TABLE paimon.default.testAlterColumnType DROP COLUMNS a");
-        assertThat(
-                        spark.table("paimon.default.testAlterColumnType").collectAsList().stream()
-                                .map(Row::toString)
-                                .collect(Collectors.toList()))
-                .containsExactlyInAnyOrder("[2.0,1]", "[6.0,3]");
+        spark.sql("ALTER TABLE testAlterColumnType ALTER COLUMN b TYPE DOUBLE");
+        assertThat(spark.table("testAlterColumnType").collectAsList().toString())
+                .isEqualTo("[[1,2.0,1], [5,6.0,3]]");
+
+        List<Row> afterAlter = spark.sql("SHOW CREATE TABLE testAlterColumnType").collectAsList();
+        assertThat(afterAlter.toString())
+                .contains(
+                        showCreateString(
+                                "testAlterColumnType", "a INT NOT NULL", "b DOUBLE", "c STRING"));
     }
 
     @Test
@@ -469,39 +355,38 @@ public class SparkSchemaEvolutionITCase extends SparkReadTestBase {
                 .isFalse();
 
         // note: for Spark, it is illegal to change a nullable column to non-nullable
-        spark.sql("ALTER TABLE paimon.default.t2 ALTER COLUMN a DROP NOT NULL");
+        spark.sql("ALTER TABLE t2 ALTER COLUMN a DROP NOT NULL");
         assertThat(fieldIsNullable(getField(schema2(), 0))).isTrue();
 
-        spark.sql("ALTER TABLE paimon.default.t2 ALTER COLUMN b DROP NOT NULL");
+        spark.sql("ALTER TABLE t2 ALTER COLUMN b DROP NOT NULL");
         assertThat(fieldIsNullable(getField(schema2(), 1))).isTrue();
 
-        spark.sql("ALTER TABLE paimon.default.t2 ALTER COLUMN c DROP NOT NULL");
+        spark.sql("ALTER TABLE t2 ALTER COLUMN c DROP NOT NULL");
         assertThat(fieldIsNullable(getField(schema2(), 2))).isTrue();
 
-        spark.sql("ALTER TABLE paimon.default.t2 ALTER COLUMN c.c1 DROP NOT NULL");
+        spark.sql("ALTER TABLE t2 ALTER COLUMN c.c1 DROP NOT NULL");
         assertThat(fieldIsNullable(getNestedField(getField(schema2(), 2), 0))).isTrue();
 
-        spark.sql("ALTER TABLE paimon.default.t2 ALTER COLUMN c.c1.c12 DROP NOT NULL");
+        spark.sql("ALTER TABLE t2 ALTER COLUMN c.c1.c12 DROP NOT NULL");
         assertThat(fieldIsNullable(getNestedField(getNestedField(getField(schema2(), 2), 0), 1)))
                 .isTrue();
     }
 
     @Test
     public void testAlterPrimaryKeyNullability() {
-        spark.sql("USE paimon");
         spark.sql(
-                "CREATE TABLE default.testAlterPkNullability (\n"
+                "CREATE TABLE testAlterPkNullability (\n"
                         + "a BIGINT,\n"
-                        + "b STRING) USING paimon\n"
-                        + "COMMENT 'table comment'\n"
+                        + "b STRING)\n"
                         + "TBLPROPERTIES ('primary-key' = 'a')");
         assertThatThrownBy(
                         () ->
                                 spark.sql(
-                                        "ALTER TABLE default.testAlterPkNullability ALTER COLUMN a DROP NOT NULL"))
-                .getRootCause()
-                .isInstanceOf(UnsupportedOperationException.class)
-                .hasMessageContaining("Cannot change nullability of primary key");
+                                        "ALTER TABLE testAlterPkNullability ALTER COLUMN a DROP NOT NULL"))
+                .satisfies(
+                        AssertionUtils.anyCauseMatches(
+                                UnsupportedOperationException.class,
+                                "Cannot change nullability of primary key"));
     }
 
     @Test
@@ -509,10 +394,10 @@ public class SparkSchemaEvolutionITCase extends SparkReadTestBase {
         createTable("testAlterTableColumnComment");
         assertThat(getField(schema1(), 0).description()).isNull();
 
-        spark.sql("ALTER TABLE paimon.default.t1 ALTER COLUMN a COMMENT 'a new comment'");
+        spark.sql("ALTER TABLE t1 ALTER COLUMN a COMMENT 'a new comment'");
         assertThat(getField(schema1(), 0).description()).isEqualTo("a new comment");
 
-        spark.sql("ALTER TABLE paimon.default.t1 ALTER COLUMN a COMMENT 'yet another comment'");
+        spark.sql("ALTER TABLE t1 ALTER COLUMN a COMMENT 'yet another comment'");
         assertThat(getField(schema1(), 0).description()).isEqualTo("yet another comment");
 
         assertThat(getField(schema2(), 2).description()).isEqualTo("comment about c");
@@ -524,12 +409,11 @@ public class SparkSchemaEvolutionITCase extends SparkReadTestBase {
         assertThat(getNestedField(getNestedField(getField(schema2(), 2), 0), 1).description())
                 .isNull();
 
-        spark.sql(
-                "ALTER TABLE paimon.default.t2 ALTER COLUMN c COMMENT 'yet another comment about c'");
-        spark.sql("ALTER TABLE paimon.default.t2 ALTER COLUMN c.c1 COMMENT 'a nested type'");
-        spark.sql("ALTER TABLE paimon.default.t2 ALTER COLUMN c.c2 COMMENT 'a bigint type'");
-        spark.sql("ALTER TABLE paimon.default.t2 ALTER COLUMN c.c1.c11 COMMENT 'a double type'");
-        spark.sql("ALTER TABLE paimon.default.t2 ALTER COLUMN c.c1.c12 COMMENT 'a boolean array'");
+        spark.sql("ALTER TABLE t2 ALTER COLUMN c COMMENT 'yet another comment about c'");
+        spark.sql("ALTER TABLE t2 ALTER COLUMN c.c1 COMMENT 'a nested type'");
+        spark.sql("ALTER TABLE t2 ALTER COLUMN c.c2 COMMENT 'a bigint type'");
+        spark.sql("ALTER TABLE t2 ALTER COLUMN c.c1.c11 COMMENT 'a double type'");
+        spark.sql("ALTER TABLE t2 ALTER COLUMN c.c1.c12 COMMENT 'a boolean array'");
 
         assertThat(getField(schema2(), 2).description()).isEqualTo("yet another comment about c");
         assertThat(getNestedField(getField(schema2(), 2), 0).description())
@@ -563,7 +447,7 @@ public class SparkSchemaEvolutionITCase extends SparkReadTestBase {
     public void testSchemaEvolution() {
         // Create table with fields [a, b, c] and insert 2 records
         spark.sql(
-                "CREATE TABLE paimon.default.testSchemaEvolution(\n"
+                "CREATE TABLE testSchemaEvolution(\n"
                         + "a INT NOT NULL, \n"
                         + "b BIGINT NOT NULL, \n"
                         + "c VARCHAR(10), \n"
@@ -572,20 +456,17 @@ public class SparkSchemaEvolutionITCase extends SparkReadTestBase {
                         + "f INT NOT NULL) \n"
                         + "TBLPROPERTIES ('file.format'='avro')");
         writeTable("testSchemaEvolution", "(1, 2L, '3', 4, 5, 6)", "(7, 8L, '9', 10, 11, 12)");
-        assertThat(
-                        spark.table("paimon.default.testSchemaEvolution").collectAsList().stream()
-                                .map(Row::toString)
-                                .collect(Collectors.toList()))
+
+        Dataset<Row> table = spark.table("testSchemaEvolution");
+        assertThat(table.collectAsList().stream().map(Row::toString).collect(Collectors.toList()))
                 .containsExactlyInAnyOrder("[1,2,3,4,5,6]", "[7,8,9,10,11,12]");
         assertThat(
-                        spark.table("paimon.default.testSchemaEvolution").select("a", "c", "e")
-                                .collectAsList().stream()
+                        table.select("a", "c", "e").collectAsList().stream()
                                 .map(Row::toString)
                                 .collect(Collectors.toList()))
                 .containsExactlyInAnyOrder("[1,3,5]", "[7,9,11]");
         assertThat(
-                        spark.table("paimon.default.testSchemaEvolution").filter("a>1")
-                                .collectAsList().stream()
+                        table.filter("a>1").collectAsList().stream()
                                 .map(Row::toString)
                                 .collect(Collectors.toList()))
                 .containsExactlyInAnyOrder("[7,8,9,10,11,12]");
@@ -593,50 +474,45 @@ public class SparkSchemaEvolutionITCase extends SparkReadTestBase {
         // Rename (a, int)->(aa, bigint), c->a, b->c, (d, int)->(b, bigint), (f, int)->(ff, float),
         // the fields are [(1, aa, bigint), (2, c, bigint), (3, a, string), (4, b, bigint), (5, e,
         // int), (6, ff, float)] and insert 2 records
-        spark.sql("ALTER TABLE paimon.default.testSchemaEvolution RENAME COLUMN a to aa");
-        spark.sql("ALTER TABLE paimon.default.testSchemaEvolution ALTER COLUMN aa TYPE bigint");
-        spark.sql("ALTER TABLE paimon.default.testSchemaEvolution RENAME COLUMN c to a");
-        spark.sql("ALTER TABLE paimon.default.testSchemaEvolution RENAME COLUMN b to c");
-        spark.sql("ALTER TABLE paimon.default.testSchemaEvolution RENAME COLUMN d to b");
-        spark.sql("ALTER TABLE paimon.default.testSchemaEvolution ALTER COLUMN b TYPE bigint");
-        spark.sql("ALTER TABLE paimon.default.testSchemaEvolution RENAME COLUMN f to ff");
-        spark.sql("ALTER TABLE paimon.default.testSchemaEvolution ALTER COLUMN ff TYPE float");
+        spark.sql("ALTER TABLE testSchemaEvolution RENAME COLUMN a to aa");
+        spark.sql("ALTER TABLE testSchemaEvolution ALTER COLUMN aa TYPE bigint");
+        spark.sql("ALTER TABLE testSchemaEvolution RENAME COLUMN c to a");
+        spark.sql("ALTER TABLE testSchemaEvolution RENAME COLUMN b to c");
+        spark.sql("ALTER TABLE testSchemaEvolution RENAME COLUMN d to b");
+        spark.sql("ALTER TABLE testSchemaEvolution ALTER COLUMN b TYPE bigint");
+        spark.sql("ALTER TABLE testSchemaEvolution RENAME COLUMN f to ff");
+        spark.sql("ALTER TABLE testSchemaEvolution ALTER COLUMN ff TYPE float");
         writeTable(
                 "testSchemaEvolution",
                 "(13L, 14L, '15', 16L, 17, 18.0F)",
                 "(19L, 20L, '21', 22L, 23, 24.0F)");
-        assertThat(
-                        spark.table("paimon.default.testSchemaEvolution").collectAsList().stream()
-                                .map(Row::toString)
-                                .collect(Collectors.toList()))
+
+        table = spark.table("testSchemaEvolution");
+        assertThat(table.collectAsList().stream().map(Row::toString).collect(Collectors.toList()))
                 .containsExactlyInAnyOrder(
                         "[1,2,3,4,5,6.0]",
                         "[7,8,9,10,11,12.0]",
                         "[13,14,15,16,17,18.0]",
                         "[19,20,21,22,23,24.0]");
         assertThat(
-                        spark.table("paimon.default.testSchemaEvolution").select("aa", "b", "ff")
-                                .collectAsList().stream()
+                        table.select("aa", "b", "ff").collectAsList().stream()
                                 .map(Row::toString)
                                 .collect(Collectors.toList()))
                 .containsExactlyInAnyOrder(
                         "[1,4,6.0]", "[7,10,12.0]", "[13,16,18.0]", "[19,22,24.0]");
         assertThat(
-                        spark.table("paimon.default.testSchemaEvolution").select("aa", "a", "ff")
-                                .filter("b>10L").collectAsList().stream()
+                        table.select("aa", "a", "ff").filter("b>10L").collectAsList().stream()
                                 .map(Row::toString)
                                 .collect(Collectors.toList()))
                 .containsExactlyInAnyOrder("[13,15,18.0]", "[19,21,24.0]");
 
         // Drop fields aa, c, e, the fields are [(3, a, string), (4, b, bigint), (6, ff, float)] and
         // insert 2 records
-        spark.sql("ALTER TABLE paimon.default.testSchemaEvolution DROP COLUMNS aa, c, e");
+        spark.sql("ALTER TABLE testSchemaEvolution DROP COLUMNS aa, c, e");
         writeTable("testSchemaEvolution", "('25', 26L, 27.0F)", "('28', 29L, 30.0)");
 
-        assertThat(
-                        spark.table("paimon.default.testSchemaEvolution").collectAsList().stream()
-                                .map(Row::toString)
-                                .collect(Collectors.toList()))
+        table = spark.table("testSchemaEvolution");
+        assertThat(table.collectAsList().stream().map(Row::toString).collect(Collectors.toList()))
                 .containsExactlyInAnyOrder(
                         "[3,4,6.0]",
                         "[9,10,12.0]",
@@ -645,25 +521,21 @@ public class SparkSchemaEvolutionITCase extends SparkReadTestBase {
                         "[25,26,27.0]",
                         "[28,29,30.0]");
         assertThat(
-                        spark.table("paimon.default.testSchemaEvolution").select("a", "ff")
-                                .filter("b>10L").collectAsList().stream()
+                        table.select("a", "ff").filter("b>10L").collectAsList().stream()
                                 .map(Row::toString)
                                 .collect(Collectors.toList()))
                 .containsExactlyInAnyOrder("[15,18.0]", "[21,24.0]", "[25,27.0]", "[28,30.0]");
 
         // Add new fields d, c, e, the fields are [(3, a, string), (4, b, bigint), (6, ff, float),
         // (7, d, int), (8, c, int), (9, e, int)] and insert 2 records
-        spark.sql(
-                "ALTER TABLE paimon.default.testSchemaEvolution ADD COLUMNS (d INT, c INT, e INT)");
+        spark.sql("ALTER TABLE testSchemaEvolution ADD COLUMNS (d INT, c INT, e INT)");
         writeTable(
                 "testSchemaEvolution",
                 "('31', 32L, 33.0F, 34, 35, 36)",
                 "('37', 38L, 39.0F, 40, 41, 42)");
 
-        assertThat(
-                        spark.table("paimon.default.testSchemaEvolution").collectAsList().stream()
-                                .map(Row::toString)
-                                .collect(Collectors.toList()))
+        table = spark.table("testSchemaEvolution");
+        assertThat(table.collectAsList().stream().map(Row::toString).collect(Collectors.toList()))
                 .containsExactlyInAnyOrder(
                         "[3,4,6.0,null,null,null]",
                         "[9,10,12.0,null,null,null]",
@@ -674,8 +546,7 @@ public class SparkSchemaEvolutionITCase extends SparkReadTestBase {
                         "[31,32,33.0,34,35,36]",
                         "[37,38,39.0,40,41,42]");
         assertThat(
-                        spark.table("paimon.default.testSchemaEvolution").filter("b>10")
-                                .collectAsList().stream()
+                        table.filter("b>10").collectAsList().stream()
                                 .map(Row::toString)
                                 .collect(Collectors.toList()))
                 .containsExactlyInAnyOrder(
@@ -686,8 +557,7 @@ public class SparkSchemaEvolutionITCase extends SparkReadTestBase {
                         "[31,32,33.0,34,35,36]",
                         "[37,38,39.0,40,41,42]");
         assertThat(
-                        spark.table("paimon.default.testSchemaEvolution")
-                                .select("e", "a", "ff", "d", "b").filter("b>10L").collectAsList()
+                        table.select("e", "a", "ff", "d", "b").filter("b>10L").collectAsList()
                                 .stream()
                                 .map(Row::toString)
                                 .collect(Collectors.toList()))
@@ -699,8 +569,7 @@ public class SparkSchemaEvolutionITCase extends SparkReadTestBase {
                         "[36,31,33.0,34,32]",
                         "[42,37,39.0,40,38]");
         assertThat(
-                        spark.table("paimon.default.testSchemaEvolution")
-                                .select("e", "a", "ff", "d", "b").filter("b>10 and e is not null")
+                        table.select("e", "a", "ff", "d", "b").filter("b>10 and e is not null")
                                 .collectAsList().stream()
                                 .map(Row::toString)
                                 .collect(Collectors.toList()))
@@ -714,28 +583,25 @@ public class SparkSchemaEvolutionITCase extends SparkReadTestBase {
         writeTable("testFilesTable", "(1, 2L, '3')", "(4, 5L, '6')");
         assertThat(
                         getFieldStatsList(
-                                spark.sql("SELECT * FROM paimon.default.`testFilesTable$files`")
-                                        .collectAsList()))
+                                spark.sql("SELECT * FROM `testFilesTable$files`").collectAsList()))
                 .containsExactlyInAnyOrder("{a=0, b=0, c=0},{a=1, b=2, c=3},{a=4, b=5, c=6}");
 
         // Add new fields d, e, f and the fields are [a, b, c, d, e, f], insert 2 records
-        spark.sql("ALTER TABLE paimon.default.testFilesTable ADD COLUMNS (d INT, e INT, f INT)");
+        spark.sql("ALTER TABLE testFilesTable ADD COLUMNS (d INT, e INT, f INT)");
         writeTable("testFilesTable", "(7, 8L, '9', 10, 11, 12)", "(13, 14L, '15', 16, 17, 18)");
         assertThat(
                         getFieldStatsList(
-                                spark.sql("SELECT * FROM paimon.default.`testFilesTable$files`")
-                                        .collectAsList()))
+                                spark.sql("SELECT * FROM `testFilesTable$files`").collectAsList()))
                 .containsExactlyInAnyOrder(
                         "{a=0, b=0, c=0, d=2, e=2, f=2},{a=1, b=2, c=3, d=null, e=null, f=null},{a=4, b=5, c=6, d=null, e=null, f=null}",
                         "{a=0, b=0, c=0, d=0, e=0, f=0},{a=7, b=8, c=15, d=10, e=11, f=12},{a=13, b=14, c=9, d=16, e=17, f=18}");
 
         // Drop fields c, e and the fields are [a, b, d, f], insert 2 records
-        spark.sql("ALTER TABLE paimon.default.testFilesTable DROP COLUMNS c, e");
+        spark.sql("ALTER TABLE testFilesTable DROP COLUMNS c, e");
         writeTable("testFilesTable", "(19, 20L, 21, 22)", "(23, 24L, 25, 26)");
         assertThat(
                         getFieldStatsList(
-                                spark.sql("SELECT * FROM paimon.default.`testFilesTable$files`")
-                                        .collectAsList()))
+                                spark.sql("SELECT * FROM `testFilesTable$files`").collectAsList()))
                 .containsExactlyInAnyOrder(
                         "{a=0, b=0, d=2, f=2},{a=1, b=2, d=null, f=null},{a=4, b=5, d=null, f=null}",
                         "{a=0, b=0, d=0, f=0},{a=7, b=8, d=10, f=12},{a=13, b=14, d=16, f=18}",
@@ -753,10 +619,4 @@ public class SparkSchemaEvolutionITCase extends SparkReadTestBase {
                                         ","))
                 .collect(Collectors.toList());
     }
-
-    private String buildTableProperties(String tablePath) {
-        return String.format(
-                "TBLPROPERTIES(\n" + "  'file.format' = 'avro',\n" + "  'path' = '%s/%s')\n",
-                warehousePath, tablePath);
-    }
 }
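
Note for anyone who wants to try the ALTER COLUMN statements exercised by these tests outside the test harness: the sketch below shows one way to issue them from a local SparkSession against a Paimon catalog. The SQL is lifted from the tests in this commit; the session setup (a catalog named 'paimon', a local /tmp warehouse, a local[2] master) and the throwaway table name 'demo' are illustrative assumptions, not part of the change.

import org.apache.spark.sql.SparkSession;

public class AlterColumnSketch {
    public static void main(String[] args) {
        // Hypothetical local session; catalog class and warehouse key as documented
        // for the Paimon Spark connector.
        SparkSession spark =
                SparkSession.builder()
                        .master("local[2]")
                        .config("spark.sql.catalog.paimon", "org.apache.paimon.spark.SparkCatalog")
                        .config("spark.sql.catalog.paimon.warehouse", "file:/tmp/paimon-warehouse")
                        .getOrCreate();

        // A throwaway table mirroring the (a, b, c) layout used by the tests above.
        spark.sql(
                "CREATE TABLE paimon.default.demo (a INT NOT NULL, b BIGINT NOT NULL, c STRING) "
                        + "TBLPROPERTIES ('file.format'='avro')");

        // Column repositioning, as in testAlterColumnPosition.
        spark.sql("ALTER TABLE paimon.default.demo ALTER COLUMN b FIRST");
        spark.sql("ALTER TABLE paimon.default.demo ALTER COLUMN c AFTER b");

        // Nullability, type widening and column comments, as in the other tests in this class.
        spark.sql("ALTER TABLE paimon.default.demo ALTER COLUMN b DROP NOT NULL");
        spark.sql("ALTER TABLE paimon.default.demo ALTER COLUMN b TYPE DOUBLE");
        spark.sql("ALTER TABLE paimon.default.demo ALTER COLUMN a COMMENT 'a new comment'");

        // Inspect the resulting schema, then shut down.
        spark.sql("SHOW CREATE TABLE paimon.default.demo").show(false);
        spark.stop();
    }
}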