Posted to commits@paimon.apache.org by lz...@apache.org on 2023/03/21 09:03:38 UTC
[incubator-paimon] branch master updated: [spark] Upgrade spark version to 3.3.2 of spark-common (#675)
This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 9413ab020 [spark] Upgrade spark version to 3.3.2 of spark-common (#675)
9413ab020 is described below
commit 9413ab020f15287de83f14b0c42a15b48d1f1d3a
Author: yuzelin <33...@users.noreply.github.com>
AuthorDate: Tue Mar 21 17:03:31 2023 +0800
[spark] Upgrade spark version to 3.3.2 of spark-common (#675)
---
paimon-spark/paimon-spark-common/pom.xml | 2 +-
.../org/apache/paimon/spark/SparkReadITCase.java | 263 ++++++-----
.../org/apache/paimon/spark/SparkReadTestBase.java | 15 +
.../paimon/spark/SparkSchemaEvolutionITCase.java | 506 ++++++++-------------
4 files changed, 330 insertions(+), 456 deletions(-)
diff --git a/paimon-spark/paimon-spark-common/pom.xml b/paimon-spark/paimon-spark-common/pom.xml
index 91c4d66b0..375f8471c 100644
--- a/paimon-spark/paimon-spark-common/pom.xml
+++ b/paimon-spark/paimon-spark-common/pom.xml
@@ -34,7 +34,7 @@ under the License.
<name>Paimon : Spark : Common</name>
<properties>
- <spark.version>3.2.2</spark.version>
+ <spark.version>3.3.2</spark.version>
</properties>
<dependencies>
diff --git a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkReadITCase.java b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkReadITCase.java
index 79cdc9ea0..16b7fd5cf 100644
--- a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkReadITCase.java
+++ b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkReadITCase.java
@@ -22,6 +22,7 @@ import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.FileStoreTableFactory;
+import org.apache.paimon.testutils.assertj.AssertionUtils;
import org.apache.paimon.types.ArrayType;
import org.apache.paimon.types.BigIntType;
import org.apache.paimon.types.DataField;
@@ -53,41 +54,39 @@ public class SparkReadITCase extends SparkReadTestBase {
@Test
public void testNormal() {
- innerTestSimpleType(spark.table("paimon.default.t1"));
+ innerTestSimpleType(spark.table("t1"));
- innerTestNestedType(spark.table("paimon.default.t2"));
+ innerTestNestedType(spark.table("t2"));
}
@Test
public void testFilterPushDown() {
- innerTestSimpleTypeFilterPushDown(spark.table("paimon.default.t1"));
+ innerTestSimpleTypeFilterPushDown(spark.table("t1"));
- innerTestNestedTypeFilterPushDown(spark.table("paimon.default.t2"));
+ innerTestNestedTypeFilterPushDown(spark.table("t2"));
}
@Test
public void testCatalogNormal() {
- innerTestSimpleType(spark.table("paimon.default.t1"));
- innerTestNestedType(spark.table("paimon.default.t2"));
+ innerTestSimpleType(spark.table("t1"));
+ innerTestNestedType(spark.table("t2"));
}
@Test
public void testSnapshotsTable() {
List<Row> rows =
- spark.table("paimon.default.`t1$snapshots`")
+ spark.table("`t1$snapshots`")
.select("snapshot_id", "schema_id", "commit_user", "commit_kind")
.collectAsList();
assertThat(rows.toString()).isEqualTo("[[1,0,user,APPEND]]");
- spark.sql("USE paimon");
spark.sql(
- "CREATE TABLE default.schemasTable (\n"
+ "CREATE TABLE schemasTable (\n"
+ "a BIGINT,\n"
- + "b STRING) USING paimon\n"
- + "COMMENT 'table comment'\n"
+ + "b STRING)\n"
+ "TBLPROPERTIES ('primary-key' = 'a')");
- spark.sql("ALTER TABLE default.schemasTable ADD COLUMN c STRING");
- List<Row> schemas = spark.table("paimon.default.`schemasTable$schemas`").collectAsList();
+ spark.sql("ALTER TABLE schemasTable ADD COLUMN c STRING");
+ List<Row> schemas = spark.table("`schemasTable$schemas`").collectAsList();
List<?> fieldsList = schemas.stream().map(row -> row.get(1)).collect(Collectors.toList());
assertThat(fieldsList.stream().map(Object::toString).collect(Collectors.toList()))
.containsExactlyInAnyOrder(
@@ -101,7 +100,7 @@ public class SparkReadITCase extends SparkReadTestBase {
@Test
public void testSnapshotsTableWithRecordCount() {
List<Row> rows =
- spark.table("paimon.default.`t1$snapshots`")
+ spark.table("`t1$snapshots`")
.select(
"snapshot_id",
"total_record_count",
@@ -113,14 +112,13 @@ public class SparkReadITCase extends SparkReadTestBase {
@Test
public void testCatalogFilterPushDown() {
- innerTestSimpleTypeFilterPushDown(spark.table("paimon.default.t1"));
+ innerTestSimpleTypeFilterPushDown(spark.table("t1"));
- innerTestNestedTypeFilterPushDown(spark.table("paimon.default.t2"));
+ innerTestNestedTypeFilterPushDown(spark.table("t2"));
}
@Test
public void testDefaultNamespace() {
- spark.sql("USE paimon");
assertThat(spark.sql("SHOW CURRENT NAMESPACE").collectAsList().toString())
.isEqualTo("[[paimon,default]]");
}
@@ -128,12 +126,12 @@ public class SparkReadITCase extends SparkReadTestBase {
@Test
public void testCreateTable() {
spark.sql(
- "CREATE TABLE paimon.default.testCreateTable(\n"
+ "CREATE TABLE testCreateTable(\n"
+ "a BIGINT,\n"
+ "b VARCHAR(10),\n"
+ "c CHAR(10))");
assertThat(
- spark.sql("SELECT fields FROM paimon.default.`testCreateTable$schemas`")
+ spark.sql("SELECT fields FROM `testCreateTable$schemas`")
.collectAsList()
.toString())
.isEqualTo(
@@ -146,101 +144,87 @@ public class SparkReadITCase extends SparkReadTestBase {
@Test
public void testCreateTableAs() {
spark.sql(
- "CREATE TABLE default.testCreateTable(\n"
+ "CREATE TABLE testCreateTable(\n"
+ "a BIGINT,\n"
+ "b VARCHAR(10),\n"
+ "c CHAR(10))");
- spark.sql("INSERT INTO default.testCreateTable VALUES(1,'a','b')");
- spark.sql(
- "CREATE TABLE default.testCreateTableAs AS SELECT * FROM default.testCreateTable");
- List<Row> result = spark.sql("SELECT * FROM default.testCreateTableAs").collectAsList();
+ spark.sql("INSERT INTO testCreateTable VALUES(1,'a','b')");
+ spark.sql("CREATE TABLE testCreateTableAs AS SELECT * FROM testCreateTable");
+ List<Row> result = spark.sql("SELECT * FROM testCreateTableAs").collectAsList();
assertThat(result.stream().map(Row::toString)).containsExactlyInAnyOrder("[1,a,b]");
// partitioned table
spark.sql(
- "CREATE TABLE default.partitionedTable (\n"
+ "CREATE TABLE partitionedTable (\n"
+ "a BIGINT,\n"
- + "b STRING,"
- + "c STRING) USING paimon\n"
- + "COMMENT 'table comment'\n"
+ + "b STRING,\n"
+ + "c STRING)\n"
+ "PARTITIONED BY (a,b)");
- spark.sql("INSERT INTO default.partitionedTable VALUES(1,'aaa','bbb')");
+ spark.sql("INSERT INTO partitionedTable VALUES(1,'aaa','bbb')");
spark.sql(
- "CREATE TABLE default.partitionedTableAs PARTITIONED BY (a) AS SELECT * FROM default.partitionedTable");
- assertThat(
- spark.sql("SHOW CREATE TABLE default.partitionedTableAs")
- .collectAsList()
- .toString())
+ "CREATE TABLE partitionedTableAs PARTITIONED BY (a) AS SELECT * FROM partitionedTable");
+ assertThat(spark.sql("SHOW CREATE TABLE partitionedTableAs").collectAsList().toString())
.isEqualTo(
String.format(
- "[[CREATE TABLE partitionedTableAs (\n"
- + " `a` BIGINT,\n"
- + " `b` STRING,\n"
- + " `c` STRING)\n"
+ "[[%s"
+ "PARTITIONED BY (a)\n"
- + "TBLPROPERTIES(\n"
+ + "TBLPROPERTIES (\n"
+ " 'path' = '%s')\n"
+ "]]",
+ showCreateString(
+ "partitionedTableAs", "a BIGINT", "b STRING", "c STRING"),
new Path(warehousePath, "default.db/partitionedTableAs")));
- List<Row> resultPartition =
- spark.sql("SELECT * FROM default.partitionedTableAs").collectAsList();
+ List<Row> resultPartition = spark.sql("SELECT * FROM partitionedTableAs").collectAsList();
assertThat(resultPartition.stream().map(Row::toString))
.containsExactlyInAnyOrder("[1,aaa,bbb]");
// change TBLPROPERTIES
spark.sql(
- "CREATE TABLE default.testTable(\n"
+ "CREATE TABLE testTable(\n"
+ "a BIGINT,\n"
+ "b VARCHAR(10),\n"
- + "c CHAR(10))"
- + " TBLPROPERTIES("
- + " 'file.format' = 'orc'"
+ + "c CHAR(10))\n"
+ + " TBLPROPERTIES(\n"
+ + " 'file.format' = 'orc'\n"
+ ")");
- spark.sql("INSERT INTO default.testTable VALUES(1,'a','b')");
+ spark.sql("INSERT INTO testTable VALUES(1,'a','b')");
spark.sql(
- "CREATE TABLE default.testTableAs TBLPROPERTIES ('file.format' = 'parquet') AS SELECT * FROM default.testTable");
- assertThat(spark.sql("SHOW CREATE TABLE default.testTableAs").collectAsList().toString())
+ "CREATE TABLE testTableAs TBLPROPERTIES ('file.format' = 'parquet') AS SELECT * FROM testTable");
+ assertThat(spark.sql("SHOW CREATE TABLE testTableAs").collectAsList().toString())
.isEqualTo(
String.format(
- "[[CREATE TABLE testTableAs (\n"
- + " `a` BIGINT,\n"
- + " `b` STRING,\n"
- + " `c` STRING)\n"
- + "TBLPROPERTIES(\n"
+ "[[%s"
+ + "TBLPROPERTIES (\n"
+ " 'file.format' = 'parquet',\n"
+ " 'path' = '%s')\n"
+ "]]",
+ showCreateString("testTableAs", "a BIGINT", "b STRING", "c STRING"),
new Path(warehousePath, "default.db/testTableAs")));
- List<Row> resultProp = spark.sql("SELECT * FROM default.testTableAs").collectAsList();
+ List<Row> resultProp = spark.sql("SELECT * FROM testTableAs").collectAsList();
assertThat(resultProp.stream().map(Row::toString)).containsExactlyInAnyOrder("[1,a,b]");
// primary key
spark.sql(
- "CREATE TABLE default.t_pk (\n"
+ "CREATE TABLE t_pk (\n"
+ "a BIGINT,\n"
+ "b STRING,\n"
+ "c STRING\n"
- + ") TBLPROPERTIES ("
- + " 'primary-key' = 'a,b'"
+ + ") TBLPROPERTIES (\n"
+ + " 'primary-key' = 'a,b'\n"
+ ")\n"
+ "COMMENT 'table comment'");
- spark.sql("INSERT INTO default.t_pk VALUES(1,'aaa','bbb')");
- spark.sql(
- "CREATE TABLE default.t_pk_as TBLPROPERTIES ('primary-key' = 'a') AS SELECT * FROM default.t_pk");
- assertThat(spark.sql("SHOW CREATE TABLE default.t_pk_as").collectAsList().toString())
+ spark.sql("INSERT INTO t_pk VALUES(1,'aaa','bbb')");
+ spark.sql("CREATE TABLE t_pk_as TBLPROPERTIES ('primary-key' = 'a') AS SELECT * FROM t_pk");
+ assertThat(spark.sql("SHOW CREATE TABLE t_pk_as").collectAsList().toString())
.isEqualTo(
String.format(
- "[[CREATE TABLE t_pk_as (\n"
- + " `a` BIGINT NOT NULL,\n"
- + " `b` STRING,\n"
- + " `c` STRING)\n"
- + "TBLPROPERTIES(\n"
- + " 'path' = '%s')\n"
- + "]]",
+ "[[%sTBLPROPERTIES (\n 'path' = '%s')\n]]",
+ showCreateString(
+ "t_pk_as", "a BIGINT NOT NULL", "b STRING", "c STRING"),
new Path(warehousePath, "default.db/t_pk_as")));
- List<Row> resultPk = spark.sql("SELECT * FROM default.t_pk_as").collectAsList();
+ List<Row> resultPk = spark.sql("SELECT * FROM t_pk_as").collectAsList();
assertThat(resultPk.stream().map(Row::toString)).containsExactlyInAnyOrder("[1,aaa,bbb]");
@@ -257,34 +241,34 @@ public class SparkReadITCase extends SparkReadTestBase {
+ ")");
spark.sql("INSERT INTO t_all VALUES(1,2,'bbb','2020-01-01','12')");
spark.sql(
- "CREATE TABLE default.t_all_as PARTITIONED BY (dt) TBLPROPERTIES ('primary-key' = 'dt,hh') AS SELECT * FROM default.t_all");
- assertThat(spark.sql("SHOW CREATE TABLE default.t_all_as").collectAsList().toString())
+ "CREATE TABLE t_all_as PARTITIONED BY (dt) TBLPROPERTIES ('primary-key' = 'dt,hh') AS SELECT * FROM t_all");
+ assertThat(spark.sql("SHOW CREATE TABLE t_all_as").collectAsList().toString())
.isEqualTo(
String.format(
- "[[CREATE TABLE t_all_as (\n"
- + " `user_id` BIGINT,\n"
- + " `item_id` BIGINT,\n"
- + " `behavior` STRING,\n"
- + " `dt` STRING NOT NULL,\n"
- + " `hh` STRING NOT NULL)\n"
+ "[[%s"
+ "PARTITIONED BY (dt)\n"
- + "TBLPROPERTIES(\n"
+ + "TBLPROPERTIES (\n"
+ " 'path' = '%s')\n"
+ "]]",
+ showCreateString(
+ "t_all_as",
+ "user_id BIGINT",
+ "item_id BIGINT",
+ "behavior STRING",
+ "dt STRING NOT NULL",
+ "hh STRING NOT NULL"),
new Path(warehousePath, "default.db/t_all_as")));
- List<Row> resultAll = spark.sql("SELECT * FROM default.t_all_as").collectAsList();
+ List<Row> resultAll = spark.sql("SELECT * FROM t_all_as").collectAsList();
assertThat(resultAll.stream().map(Row::toString))
.containsExactlyInAnyOrder("[1,2,bbb,2020-01-01,12]");
}
@Test
public void testCreateTableWithNullablePk() {
- spark.sql("USE paimon");
spark.sql(
- "CREATE TABLE default.PkTable (\n"
+ "CREATE TABLE PkTable (\n"
+ "a BIGINT,\n"
- + "b STRING) USING paimon\n"
- + "COMMENT 'table comment'\n"
+ + "b STRING)\n"
+ "TBLPROPERTIES ('primary-key' = 'a')");
Path tablePath = new Path(warehousePath, "default.db/PkTable");
TableSchema schema = FileStoreTableFactory.create(LocalFileIO.create(), tablePath).schema();
@@ -293,31 +277,58 @@ public class SparkReadITCase extends SparkReadTestBase {
@Test
public void testDescribeTable() {
- spark.sql("USE paimon");
spark.sql(
- "CREATE TABLE default.PartitionedTable (\n"
+ "CREATE TABLE PartitionedTable (\n"
+ "a BIGINT,\n"
- + "b STRING) USING paimon\n"
- + "COMMENT 'table comment'\n"
- + "PARTITIONED BY (a)\n"
- + "TBLPROPERTIES ('foo' = 'bar')");
- assertThat(spark.sql("DESCRIBE default.PartitionedTable").collectAsList().toString())
+ + "b STRING)\n"
+ + "PARTITIONED BY (a)\n");
+ assertThat(spark.sql("DESCRIBE PartitionedTable").collectAsList().toString())
.isEqualTo("[[a,bigint,], [b,string,], [,,], [# Partitioning,,], [Part 0,a,]]");
}
+ @Test
+ public void testShowCreateTable() {
+ spark.sql(
+ "CREATE TABLE tbl (\n"
+ + " a INT COMMENT 'a comment',\n"
+ + " b STRING\n"
+ + ") USING paimon\n"
+ + "PARTITIONED BY (b)\n"
+ + "COMMENT 'tbl comment'\n"
+ + "TBLPROPERTIES (\n"
+ + " 'primary-key' = 'a,b',\n"
+ + " 'k1' = 'v1'\n"
+ + ")");
+
+ assertThat(spark.sql("SHOW CREATE TABLE tbl").collectAsList().toString())
+ .isEqualTo(
+ String.format(
+ "[[%s"
+ + "USING paimon\n"
+ + "PARTITIONED BY (b)\n"
+ + "COMMENT 'tbl comment'\n"
+ + "TBLPROPERTIES (\n"
+ + " 'k1' = 'v1',\n"
+ + " 'path' = '%s')\n]]",
+ showCreateString(
+ "tbl",
+ "a INT NOT NULL COMMENT 'a comment'",
+ "b STRING NOT NULL"),
+ new Path(warehousePath, "default.db/tbl")));
+ }
+
@Test
public void testShowTableProperties() {
- spark.sql("USE paimon");
spark.sql(
- "CREATE TABLE default.tbl (\n"
- + "a INT\n"
- + ") TBLPROPERTIES (\n"
- + "'k1' = 'v1',\n"
- + "'k2' = 'v2'"
+ "CREATE TABLE tbl (\n"
+ + " a INT)\n"
+ + "TBLPROPERTIES (\n"
+ + " 'k1' = 'v1',\n"
+ + " 'k2' = 'v2'\n"
+ ")");
assertThat(
- spark.sql("SHOW TBLPROPERTIES default.tbl").collectAsList().stream()
+ spark.sql("SHOW TBLPROPERTIES tbl").collectAsList().stream()
.map(Row::toString)
.collect(Collectors.toList()))
.contains("[k1,v1]", "[k2,v2]");
@@ -325,16 +336,14 @@ public class SparkReadITCase extends SparkReadTestBase {
@Test
public void testCreateTableWithInvalidPk() {
- spark.sql("USE paimon");
assertThatThrownBy(
() ->
spark.sql(
- "CREATE TABLE default.PartitionedPkTable (\n"
+ "CREATE TABLE PartitionedPkTable (\n"
+ "a BIGINT,\n"
+ "b STRING,\n"
- + "c DOUBLE) USING paimon\n"
- + "COMMENT 'table comment'\n"
- + "PARTITIONED BY (b)"
+ + "c DOUBLE)\n"
+ + "PARTITIONED BY (b)\n"
+ "TBLPROPERTIES ('primary-key' = 'a')"))
.isInstanceOf(IllegalStateException.class)
.hasMessageContaining(
@@ -352,7 +361,7 @@ public class SparkReadITCase extends SparkReadTestBase {
+ "b STRING,\n"
+ "c DOUBLE) USING paimon\n"
+ "COMMENT 'table comment'\n"
- + "PARTITIONED BY (b)"
+ + "PARTITIONED BY (b)\n"
+ "TBLPROPERTIES ('primary-key' = 'd')"))
.isInstanceOf(IllegalStateException.class)
.hasMessageContaining(
@@ -361,23 +370,21 @@ public class SparkReadITCase extends SparkReadTestBase {
@Test
public void testCreateTableWithNonexistentPartition() {
- spark.sql("USE paimon");
assertThatThrownBy(
() ->
spark.sql(
- "CREATE TABLE default.PartitionedPkTable (\n"
+ "CREATE TABLE PartitionedPkTable (\n"
+ "a BIGINT,\n"
+ "b STRING,\n"
- + "c DOUBLE) USING paimon\n"
- + "COMMENT 'table comment'\n"
- + "PARTITIONED BY (d)"
+ + "c DOUBLE)\n"
+ + "PARTITIONED BY (d)\n"
+ "TBLPROPERTIES ('primary-key' = 'a')"))
.isInstanceOf(AnalysisException.class)
.hasMessageContaining("Couldn't find column d");
}
@Test
- public void testCreateAndDropTable() throws Exception {
+ public void testCreateAndDropTable() {
innerTest("MyTable1", true, true, false);
innerTest("MyTable2", true, false, false);
innerTest("MyTable3", false, false, false);
@@ -388,7 +395,6 @@ public class SparkReadITCase extends SparkReadTestBase {
private void innerTest(
String tableName, boolean hasPk, boolean partitioned, boolean appendOnly) {
- spark.sql("USE paimon");
String ddlTemplate =
"CREATE TABLE default.%s (\n"
+ "order_id BIGINT NOT NULL comment 'order_id',\n"
@@ -396,7 +402,7 @@ public class SparkReadITCase extends SparkReadTestBase {
+ "coupon_info ARRAY<STRING> NOT NULL COMMENT 'coupon_info',\n"
+ "order_amount DOUBLE NOT NULL COMMENT 'order_amount',\n"
+ "dt STRING NOT NULL COMMENT 'dt',\n"
- + "hh STRING NOT NULL COMMENT 'hh') USING paimon\n"
+ + "hh STRING NOT NULL COMMENT 'hh')\n"
+ "COMMENT 'table comment'\n"
+ "%s\n"
+ "TBLPROPERTIES (%s)";
@@ -454,7 +460,7 @@ public class SparkReadITCase extends SparkReadTestBase {
() ->
spark.sql(
String.format(
- "ALTER TABLE default.%s UNSET TBLPROPERTIES('primary-key')",
+ "ALTER TABLE %s UNSET TBLPROPERTIES('primary-key')",
tableName)))
.isInstanceOf(UnsupportedOperationException.class)
.hasMessageContaining("Alter primary key is not supported");
@@ -462,11 +468,12 @@ public class SparkReadITCase extends SparkReadTestBase {
() ->
spark.sql(
String.format(
- "ALTER TABLE default.%s SET TBLPROPERTIES('write-mode' = 'append-only')",
+ "ALTER TABLE %s SET TBLPROPERTIES('write-mode' = 'append-only')",
tableName)))
- .getRootCause()
- .isInstanceOf(UnsupportedOperationException.class)
- .hasMessageContaining("Change 'write-mode' is not supported yet");
+ .satisfies(
+ AssertionUtils.anyCauseMatches(
+ UnsupportedOperationException.class,
+ "Change 'write-mode' is not supported yet"));
Path tablePath = new Path(warehousePath, String.format("default.db/%s", tableName));
TableSchema schema = FileStoreTableFactory.create(LocalFileIO.create(), tablePath).schema();
@@ -520,7 +527,7 @@ public class SparkReadITCase extends SparkReadTestBase {
.toString())
.isEqualTo(String.format("[[default,%s]]", tableName));
- spark.sql(String.format("DROP TABLE paimon.default.%s", tableName));
+ spark.sql(String.format("DROP TABLE %s", tableName));
assertThat(
spark.sql(
@@ -538,7 +545,6 @@ public class SparkReadITCase extends SparkReadTestBase {
@Test
public void testCreateAndDropNamespace() {
// create namespace
- spark.sql("USE paimon");
spark.sql("CREATE NAMESPACE bar");
assertThatThrownBy(() -> spark.sql("CREATE NAMESPACE bar"))
@@ -631,19 +637,12 @@ public class SparkReadITCase extends SparkReadTestBase {
@Test
public void testCreateNestedField() {
spark.sql(
- "CREATE TABLE paimon.default.nested_table ( a INT, b STRUCT<b1: STRUCT<b11: INT, b12 INT>, b2 BIGINT>)");
- assertThat(
- spark.sql("SHOW CREATE TABLE paimon.default.nested_table")
- .collectAsList()
- .toString())
- .isEqualTo(
- String.format(
- "[[CREATE TABLE nested_table (\n"
- + " `a` INT,\n"
- + " `b` STRUCT<`b1`: STRUCT<`b11`: INT, `b12`: INT>, `b2`: BIGINT>)\n"
- + "TBLPROPERTIES(\n"
- + " 'path' = '%s')\n"
- + "]]",
- new Path(warehousePath, "default.db/nested_table")));
+ "CREATE TABLE nested_table ( a INT, b STRUCT<b1: STRUCT<b11: INT, b12 INT>, b2 BIGINT>)");
+ assertThat(spark.sql("SHOW CREATE TABLE nested_table").collectAsList().toString())
+ .contains(
+ showCreateString(
+ "nested_table",
+ "a INT",
+ "b STRUCT<b1: STRUCT<b11: INT, b12: INT>, b2: BIGINT>"));
}
}
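
[Editor's note] Dropping the "USE paimon" statements and the "paimon.default." prefixes in the tests above works because the test session already defaults to the paimon catalog and the default namespace (testDefaultNamespace asserts exactly that). Below is a minimal sketch of such a session; the catalog class and option keys follow Paimon's Spark integration, but the actual setup lives in SparkReadTestBase and is not part of this diff, so treat the details as assumptions:

    import org.apache.spark.sql.SparkSession;

    public class PaimonSessionSketch {
        public static void main(String[] args) {
            // With a default catalog configured, unqualified names such as "t1"
            // resolve to paimon.default.t1, so no "USE paimon" is needed.
            SparkSession spark =
                    SparkSession.builder()
                            .master("local[2]")
                            .config("spark.sql.catalog.paimon", "org.apache.paimon.spark.SparkCatalog")
                            .config("spark.sql.catalog.paimon.warehouse", "file:/tmp/paimon-warehouse")
                            .config("spark.sql.defaultCatalog", "paimon")
                            .getOrCreate();

            spark.sql("SHOW CURRENT NAMESPACE").show(); // expect: paimon, default
            spark.stop();
        }
    }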
diff --git a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkReadTestBase.java b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkReadTestBase.java
index 36454f6f2..0519dfb5b 100644
--- a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkReadTestBase.java
+++ b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkReadTestBase.java
@@ -40,8 +40,10 @@ import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.io.TempDir;
+import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
import static org.assertj.core.api.Assertions.assertThat;
@@ -186,4 +188,17 @@ public abstract class SparkReadTestBase {
"INSERT INTO paimon.default.%s VALUES %s",
tableName, StringUtils.join(values, ",")));
}
+
+ // return of 'SHOW CREATE TABLE' excluding TBLPROPERTIES
+ protected String showCreateString(String table, String... fieldSpec) {
+ return String.format(
+ "CREATE TABLE %s (%s)\n",
+ table,
+ Arrays.stream(fieldSpec).map(s -> "\n " + s).collect(Collectors.joining(",")));
+ }
+
+ // default schema
+ protected String defaultShowCreateString(String table) {
+ return showCreateString(table, "a INT NOT NULL", "b BIGINT", "c STRING");
+ }
}
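
[Editor's note] The new showCreateString helper builds only the column-list prefix of SHOW CREATE TABLE output, so tests can assert with contains(...) instead of comparing the whole statement (note that the expected output under Spark 3.3.2 no longer backtick-quotes column names). A self-contained demonstration using the same format string as the helper added above:

    import java.util.Arrays;
    import java.util.stream.Collectors;

    public class ShowCreateStringDemo {
        // Same format string as the helper added to SparkReadTestBase.
        static String showCreateString(String table, String... fieldSpec) {
            return String.format(
                    "CREATE TABLE %s (%s)\n",
                    table,
                    Arrays.stream(fieldSpec)
                            .map(s -> "\n  " + s)
                            .collect(Collectors.joining(",")));
        }

        public static void main(String[] args) {
            System.out.print(showCreateString("t_pk", "a BIGINT NOT NULL", "b STRING"));
            // Prints:
            // CREATE TABLE t_pk (
            //   a BIGINT NOT NULL,
            //   b STRING)
        }
    }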
diff --git a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkSchemaEvolutionITCase.java b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkSchemaEvolutionITCase.java
index bba270ba3..de6387991 100644
--- a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkSchemaEvolutionITCase.java
+++ b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkSchemaEvolutionITCase.java
@@ -18,6 +18,8 @@
package org.apache.paimon.spark;
+import org.apache.paimon.testutils.assertj.AssertionUtils;
+
import org.apache.commons.lang3.StringUtils;
import org.apache.spark.sql.AnalysisException;
import org.apache.spark.sql.Dataset;
@@ -37,21 +39,18 @@ public class SparkSchemaEvolutionITCase extends SparkReadTestBase {
@Test
public void testSetAndRemoveOption() {
- spark.sql("ALTER TABLE paimon.default.t1 SET TBLPROPERTIES('xyc' 'unknown1')");
+ spark.sql("ALTER TABLE t1 SET TBLPROPERTIES('xyc' 'unknown1')");
Map<String, String> options =
- rowsToMap(spark.sql("SELECT * FROM paimon.default.`t1$options`").collectAsList());
+ rowsToMap(spark.sql("SELECT * FROM `t1$options`").collectAsList());
assertThat(options).containsEntry("xyc", "unknown1");
- spark.sql("ALTER TABLE paimon.default.t1 UNSET TBLPROPERTIES('xyc')");
+ spark.sql("ALTER TABLE t1 UNSET TBLPROPERTIES('xyc')");
- options = rowsToMap(spark.sql("SELECT * FROM paimon.default.`t1$options`").collectAsList());
+ options = rowsToMap(spark.sql("SELECT * FROM `t1$options`").collectAsList());
assertThat(options).doesNotContainKey("xyc");
- assertThatThrownBy(
- () ->
- spark.sql(
- "ALTER TABLE paimon.default.t1 SET TBLPROPERTIES('primary-key' = 'a')"))
+ assertThatThrownBy(() -> spark.sql("ALTER TABLE t1 SET TBLPROPERTIES('primary-key' = 'a')"))
.isInstanceOf(UnsupportedOperationException.class)
.hasMessageContaining("Alter primary key is not supported");
}
@@ -68,38 +67,36 @@ public class SparkSchemaEvolutionITCase extends SparkReadTestBase {
createTable("testAddColumn");
writeTable("testAddColumn", "(1, 2L, '1')", "(5, 6L, '3')");
- spark.sql("ALTER TABLE paimon.default.testAddColumn ADD COLUMN d STRING");
-
- Dataset<Row> table = spark.table("paimon.default.testAddColumn");
- List<Row> results = table.collectAsList();
- assertThat(results.toString()).isEqualTo("[[1,2,1,null], [5,6,3,null]]");
+ List<Row> beforeAdd = spark.sql("SHOW CREATE TABLE testAddColumn").collectAsList();
+ assertThat(beforeAdd.toString()).contains(defaultShowCreateString("testAddColumn"));
- results = table.select("a", "c").collectAsList();
- assertThat(results.toString()).isEqualTo("[[1,1], [5,3]]");
+ spark.sql("ALTER TABLE testAddColumn ADD COLUMN d STRING");
- results = table.groupBy().sum("b").collectAsList();
- assertThat(results.toString()).isEqualTo("[[8]]");
+ List<Row> afterAdd = spark.sql("SHOW CREATE TABLE testAddColumn").collectAsList();
+ assertThat(afterAdd.toString())
+ .contains(
+ showCreateString(
+ "testAddColumn",
+ "a INT NOT NULL",
+ "b BIGINT",
+ "c STRING",
+ "d STRING"));
+
+ assertThat(spark.table("testAddColumn").collectAsList().toString())
+ .isEqualTo("[[1,2,1,null], [5,6,3,null]]");
}
@Test
public void testAddNotNullColumn() {
createTable("testAddNotNullColumn");
- List<Row> beforeAdd =
- spark.sql("SHOW CREATE TABLE paimon.default.testAddNotNullColumn").collectAsList();
- assertThat(beforeAdd.toString())
- .isEqualTo(
- "[[CREATE TABLE testAddNotNullColumn (\n"
- + " `a` INT NOT NULL,\n"
- + " `b` BIGINT,\n"
- + " `c` STRING)\n"
- + buildTableProperties("default.db/testAddNotNullColumn")
- + "]]");
+ List<Row> beforeAdd = spark.sql("SHOW CREATE TABLE testAddNotNullColumn").collectAsList();
+ assertThat(beforeAdd.toString()).contains(defaultShowCreateString("testAddNotNullColumn"));
assertThatThrownBy(
() ->
spark.sql(
- "ALTER TABLE paimon.default.testAddNotNullColumn ADD COLUMNS (d INT NOT NULL)"))
+ "ALTER TABLE testAddNotNullColumn ADD COLUMNS (d INT NOT NULL)"))
.isInstanceOf(RuntimeException.class)
.hasMessage(
"java.lang.IllegalArgumentException: ADD COLUMN cannot specify NOT NULL.");
@@ -113,22 +110,24 @@ public class SparkSchemaEvolutionITCase extends SparkReadTestBase {
spark.sql("SHOW CREATE TABLE testAddColumnPositionFirst").collectAsList();
assertThat(result.toString())
.contains(
- "CREATE TABLE testAddColumnPositionFirst (\n"
- + " `d` INT,\n"
- + " `a` INT NOT NULL,\n"
- + " `b` BIGINT,\n"
- + " `c` STRING)");
+ showCreateString(
+ "testAddColumnPositionFirst",
+ "d INT",
+ "a INT NOT NULL",
+ "b BIGINT",
+ "c STRING"));
createTable("testAddColumnPositionAfter");
spark.sql("ALTER TABLE testAddColumnPositionAfter ADD COLUMN d INT AFTER b");
result = spark.sql("SHOW CREATE TABLE testAddColumnPositionAfter").collectAsList();
assertThat(result.toString())
.contains(
- "CREATE TABLE testAddColumnPositionAfter (\n"
- + " `a` INT NOT NULL,\n"
- + " `b` BIGINT,\n"
- + " `d` INT,\n"
- + " `c` STRING)");
+ showCreateString(
+ "testAddColumnPositionAfter",
+ "a INT NOT NULL",
+ "b BIGINT",
+ "d INT",
+ "c STRING"));
}
@Test
@@ -147,15 +146,8 @@ public class SparkSchemaEvolutionITCase extends SparkReadTestBase {
assertThat(tables.stream().map(Row::toString))
.containsExactlyInAnyOrder("[default,t2,false]", "[default,t3,false]");
- List<Row> afterRename = spark.sql("SHOW CREATE TABLE paimon.default.t3").collectAsList();
- assertThat(afterRename.toString())
- .isEqualTo(
- "[[CREATE TABLE t3 (\n"
- + " `a` INT NOT NULL,\n"
- + " `b` BIGINT,\n"
- + " `c` STRING)\n"
- + buildTableProperties("default.db/t3")
- + "]]");
+ List<Row> afterRename = spark.sql("SHOW CREATE TABLE t3").collectAsList();
+ assertThat(afterRename.toString()).contains(defaultShowCreateString("t3"));
List<Row> data = spark.sql("SELECT * FROM t3").collectAsList();
assertThat(data.toString()).isEqualTo("[[1,2,1], [5,6,3]]");
@@ -166,78 +158,45 @@ public class SparkSchemaEvolutionITCase extends SparkReadTestBase {
createTable("testRenameColumn");
writeTable("testRenameColumn", "(1, 2L, '1')", "(5, 6L, '3')");
- List<Row> beforeRename =
- spark.sql("SHOW CREATE TABLE paimon.default.testRenameColumn").collectAsList();
- assertThat(beforeRename.toString())
- .isEqualTo(
- "[[CREATE TABLE testRenameColumn (\n"
- + " `a` INT NOT NULL,\n"
- + " `b` BIGINT,\n"
- + " `c` STRING)\n"
- + buildTableProperties("default.db/testRenameColumn")
- + "]]");
- Dataset<Row> table1 = spark.table("paimon.default.testRenameColumn");
- List<Row> results = table1.select("a", "c").collectAsList();
+ List<Row> beforeRename = spark.sql("SHOW CREATE TABLE testRenameColumn").collectAsList();
+ assertThat(beforeRename.toString()).contains(defaultShowCreateString("testRenameColumn"));
+ List<Row> results = spark.table("testRenameColumn").select("a", "c").collectAsList();
assertThat(results.toString()).isEqualTo("[[1,1], [5,3]]");
// Rename "a" to "aa"
- spark.sql("ALTER TABLE paimon.default.testRenameColumn RENAME COLUMN a to aa");
- List<Row> afterRename =
- spark.sql("SHOW CREATE TABLE paimon.default.testRenameColumn").collectAsList();
+ spark.sql("ALTER TABLE testRenameColumn RENAME COLUMN a to aa");
+ List<Row> afterRename = spark.sql("SHOW CREATE TABLE testRenameColumn").collectAsList();
assertThat(afterRename.toString())
- .isEqualTo(
- "[[CREATE TABLE testRenameColumn (\n"
- + " `aa` INT NOT NULL,\n"
- + " `b` BIGINT,\n"
- + " `c` STRING)\n"
- + buildTableProperties("default.db/testRenameColumn")
- + "]]");
- Dataset<Row> table2 = spark.table("paimon.default.testRenameColumn");
- results = table2.select("aa", "c").collectAsList();
+ .contains(
+ showCreateString(
+ "testRenameColumn", "aa INT NOT NULL", "b BIGINT", "c STRING"));
+ Dataset<Row> table = spark.table("testRenameColumn");
+ results = table.select("aa", "c").collectAsList();
assertThat(results.toString()).isEqualTo("[[1,1], [5,3]]");
- assertThatThrownBy(() -> table2.select("a", "c").collectAsList())
+ assertThatThrownBy(() -> table.select("a", "c"))
.isInstanceOf(AnalysisException.class)
- .hasMessageContaining("cannot resolve '%s' given input columns", "a");
+ .hasMessageContaining(
+ "Column 'a' does not exist. Did you mean one of the following? "
+ + "[paimon.default.testRenameColumn.b, paimon.default.testRenameColumn.c, paimon.default.testRenameColumn.aa]");
}
@Test
public void testRenamePartitionKey() {
- spark.sql("USE paimon");
spark.sql(
- "CREATE TABLE default.testRenamePartitionKey (\n"
+ "CREATE TABLE testRenamePartitionKey (\n"
+ "a BIGINT,\n"
- + "b STRING) USING paimon\n"
- + "COMMENT 'table comment'\n"
- + "PARTITIONED BY (a)\n"
- + "TBLPROPERTIES ('foo' = 'bar')");
-
+ + "b STRING)\n"
+ + "PARTITIONED BY (a)\n");
List<Row> beforeRename =
- spark.sql("SHOW CREATE TABLE paimon.default.testRenamePartitionKey")
- .collectAsList();
+ spark.sql("SHOW CREATE TABLE testRenamePartitionKey").collectAsList();
assertThat(beforeRename.toString())
- .isEqualTo(
- "[[CREATE TABLE testRenamePartitionKey (\n"
- + " `a` BIGINT,\n"
- + " `b` STRING)\n"
- + "USING paimon\n"
- + "PARTITIONED BY (a)\n"
- + "COMMENT 'table comment'\n"
- + "TBLPROPERTIES(\n"
- + " 'foo' = 'bar',\n"
- + String.format(
- " 'path' = '%s/%s')\n",
- warehousePath, "default.db/testRenamePartitionKey")
- + "]]");
+ .contains(showCreateString("testRenamePartitionKey", "a BIGINT", "b STRING"));
assertThatThrownBy(
- () ->
- spark.sql(
- "ALTER TABLE paimon.default.testRenamePartitionKey RENAME COLUMN a to aa"))
+ () -> spark.sql("ALTER TABLE testRenamePartitionKey RENAME COLUMN a to aa"))
.isInstanceOf(RuntimeException.class)
.hasMessage(
- String.format(
- "java.lang.UnsupportedOperationException: Cannot drop/rename partition key[%s]",
- "a"));
+ "java.lang.UnsupportedOperationException: Cannot drop/rename partition key[a]");
}
@Test
@@ -245,31 +204,16 @@ public class SparkSchemaEvolutionITCase extends SparkReadTestBase {
createTable("testDropSingleColumn");
writeTable("testDropSingleColumn", "(1, 2L, '1')", "(5, 6L, '3')");
- List<Row> beforeDrop =
- spark.sql("SHOW CREATE TABLE paimon.default.testDropSingleColumn").collectAsList();
- assertThat(beforeDrop.toString())
- .isEqualTo(
- "[[CREATE TABLE testDropSingleColumn (\n"
- + " `a` INT NOT NULL,\n"
- + " `b` BIGINT,\n"
- + " `c` STRING)\n"
- + buildTableProperties("default.db/testDropSingleColumn")
- + "]]");
-
- spark.sql("ALTER TABLE paimon.default.testDropSingleColumn DROP COLUMN a");
-
- List<Row> afterDrop =
- spark.sql("SHOW CREATE TABLE paimon.default.testDropSingleColumn").collectAsList();
+ List<Row> beforeDrop = spark.sql("SHOW CREATE TABLE testDropSingleColumn").collectAsList();
+ assertThat(beforeDrop.toString()).contains(defaultShowCreateString("testDropSingleColumn"));
+
+ spark.sql("ALTER TABLE testDropSingleColumn DROP COLUMN a");
+
+ List<Row> afterDrop = spark.sql("SHOW CREATE TABLE testDropSingleColumn").collectAsList();
assertThat(afterDrop.toString())
- .isEqualTo(
- "[[CREATE TABLE testDropSingleColumn (\n"
- + " `b` BIGINT,\n"
- + " `c` STRING)\n"
- + buildTableProperties("default.db/testDropSingleColumn")
- + "]]");
-
- Dataset<Row> table = spark.table("paimon.default.testDropSingleColumn");
- List<Row> results = table.collectAsList();
+ .contains(showCreateString("testDropSingleColumn", "b BIGINT", "c STRING"));
+
+ List<Row> results = spark.table("testDropSingleColumn").collectAsList();
assertThat(results.toString()).isEqualTo("[[2,1], [6,3]]");
}
@@ -277,161 +221,104 @@ public class SparkSchemaEvolutionITCase extends SparkReadTestBase {
public void testDropColumns() {
createTable("testDropColumns");
- List<Row> beforeRename =
- spark.sql("SHOW CREATE TABLE paimon.default.testDropColumns").collectAsList();
- assertThat(beforeRename.toString())
- .isEqualTo(
- "[[CREATE TABLE testDropColumns (\n"
- + " `a` INT NOT NULL,\n"
- + " `b` BIGINT,\n"
- + " `c` STRING)\n"
- + buildTableProperties("default.db/testDropColumns")
- + "]]");
-
- spark.sql("ALTER TABLE paimon.default.testDropColumns DROP COLUMNS a, b");
-
- List<Row> afterRename =
- spark.sql("SHOW CREATE TABLE paimon.default.testDropColumns").collectAsList();
- assertThat(afterRename.toString())
- .isEqualTo(
- "[[CREATE TABLE testDropColumns (\n"
- + " `c` STRING)\n"
- + buildTableProperties("default.db/testDropColumns")
- + "]]");
+ List<Row> beforeDrop = spark.sql("SHOW CREATE TABLE testDropColumns").collectAsList();
+ assertThat(beforeDrop.toString()).contains(defaultShowCreateString("testDropColumns"));
+
+ spark.sql("ALTER TABLE testDropColumns DROP COLUMNS a, b");
+
+ List<Row> afterDrop = spark.sql("SHOW CREATE TABLE testDropColumns").collectAsList();
+ assertThat(afterDrop.toString()).contains(showCreateString("testDropColumns", "c STRING"));
}
@Test
public void testDropPartitionKey() {
- spark.sql("USE paimon");
spark.sql(
- "CREATE TABLE default.testDropPartitionKey (\n"
+ "CREATE TABLE testDropPartitionKey (\n"
+ "a BIGINT,\n"
- + "b STRING) USING paimon\n"
- + "COMMENT 'table comment'\n"
- + "PARTITIONED BY (a)\n"
- + "TBLPROPERTIES ('foo' = 'bar')");
+ + "b STRING) \n"
+ + "PARTITIONED BY (a)");
- List<Row> beforeRename =
- spark.sql("SHOW CREATE TABLE paimon.default.testDropPartitionKey").collectAsList();
- assertThat(beforeRename.toString())
- .isEqualTo(
- "[[CREATE TABLE testDropPartitionKey (\n"
- + " `a` BIGINT,\n"
- + " `b` STRING)\n"
- + "USING paimon\n"
- + "PARTITIONED BY (a)\n"
- + "COMMENT 'table comment'\n"
- + "TBLPROPERTIES(\n"
- + " 'foo' = 'bar',\n"
- + String.format(
- " 'path' = '%s/%s')\n",
- warehousePath, "default.db/testDropPartitionKey")
- + "]]");
+ List<Row> beforeDrop = spark.sql("SHOW CREATE TABLE testDropPartitionKey").collectAsList();
+ assertThat(beforeDrop.toString())
+ .contains(showCreateString("testDropPartitionKey", "a BIGINT", "b STRING"));
- assertThatThrownBy(
- () ->
- spark.sql(
- "ALTER TABLE paimon.default.testDropPartitionKey DROP COLUMN a"))
+ assertThatThrownBy(() -> spark.sql("ALTER TABLE testDropPartitionKey DROP COLUMN a"))
.isInstanceOf(RuntimeException.class)
.hasMessage(
- String.format(
- "java.lang.UnsupportedOperationException: Cannot drop/rename partition key[%s]",
- "a"));
+ "java.lang.UnsupportedOperationException: Cannot drop/rename partition key[a]");
}
@Test
public void testDropPrimaryKey() {
- spark.sql("USE paimon");
spark.sql(
- "CREATE TABLE default.testDropPrimaryKey (\n"
+ "CREATE TABLE testDropPrimaryKey (\n"
+ "a BIGINT,\n"
- + "b STRING) USING paimon\n"
- + "COMMENT 'table comment'\n"
+ + "b STRING)\n"
+ "PARTITIONED BY (a)\n"
+ "TBLPROPERTIES ('primary-key' = 'a, b')");
- List<Row> beforeRename =
- spark.sql("SHOW CREATE TABLE paimon.default.testDropPrimaryKey").collectAsList();
- assertThat(beforeRename.toString())
- .isEqualTo(
- "[[CREATE TABLE testDropPrimaryKey (\n"
- + " `a` BIGINT NOT NULL,\n"
- + " `b` STRING NOT NULL)\n"
- + "USING paimon\n"
- + "PARTITIONED BY (a)\n"
- + "COMMENT 'table comment'\n"
- + "TBLPROPERTIES(\n"
- + String.format(
- " 'path' = '%s/%s')\n",
- warehousePath, "default.db/testDropPrimaryKey")
- + "]]");
+ List<Row> beforeDrop = spark.sql("SHOW CREATE TABLE testDropPrimaryKey").collectAsList();
+ assertThat(beforeDrop.toString())
+ .contains(
+ showCreateString(
+ "testDropPrimaryKey", "a BIGINT NOT NULL", "b STRING NOT NULL"));
- assertThatThrownBy(
- () ->
- spark.sql(
- "ALTER TABLE paimon.default.testDropPrimaryKey DROP COLUMN b"))
+ assertThatThrownBy(() -> spark.sql("ALTER TABLE testDropPrimaryKey DROP COLUMN b"))
.isInstanceOf(RuntimeException.class)
.hasMessage(
- String.format(
- "java.lang.UnsupportedOperationException: Cannot drop/rename primary key[%s]",
- "b"));
+ "java.lang.UnsupportedOperationException: Cannot drop/rename primary key[b]");
}
@Test
public void testUpdateColumnPosition() {
// move first
- spark.sql("CREATE TABLE tableFirst (a INT , b BIGINT, c STRING)");
+ createTable("tableFirst");
spark.sql("ALTER TABLE tableFirst ALTER COLUMN b FIRST");
List<Row> result = spark.sql("SHOW CREATE TABLE tableFirst").collectAsList();
assertThat(result.toString())
- .contains(
- "CREATE TABLE tableFirst (\n"
- + " `b` BIGINT,\n"
- + " `a` INT,\n"
- + " `c` STRING)");
+ .contains(showCreateString("tableFirst", "b BIGINT", "a INT NOT NULL", "c STRING"));
// move after
- spark.sql("CREATE TABLE tableAfter (a INT, b BIGINT, c STRING)");
+ createTable("tableAfter");
spark.sql("ALTER TABLE tableAfter ALTER COLUMN c AFTER a");
result = spark.sql("SHOW CREATE TABLE tableAfter").collectAsList();
assertThat(result.toString())
- .contains(
- "CREATE TABLE tableAfter (\n"
- + " `a` INT,\n"
- + " `c` STRING,\n"
- + " `b` BIGINT)");
+ .contains(showCreateString("tableAfter", "a INT NOT NULL", "c STRING", "b BIGINT"));
- spark.sql("CREATE TABLE tableAfter1 (a INT, b BIGINT, c STRING, d DOUBLE)");
+ spark.sql("CREATE TABLE tableAfter1 (a INT NOT NULL, b BIGINT, c STRING, d DOUBLE)");
spark.sql("ALTER TABLE tableAfter1 ALTER COLUMN b AFTER c");
result = spark.sql("SHOW CREATE TABLE tableAfter1").collectAsList();
assertThat(result.toString())
.contains(
- "CREATE TABLE tableAfter1 (\n"
- + " `a` INT,\n"
- + " `c` STRING,\n"
- + " `b` BIGINT,\n"
- + " `d` DOUBLE)");
-
- // move self for first test
- spark.sql("CREATE TABLE tableFirstSelf (a INT , b BIGINT, c STRING)");
+ showCreateString(
+ "tableAfter1",
+ "a INT NOT NULL",
+ "c STRING",
+ "b BIGINT",
+ "d DOUBLE"));
+
+ // move self to first test
+ createTable("tableFirstSelf");
assertThatThrownBy(() -> spark.sql("ALTER TABLE tableFirstSelf ALTER COLUMN a FIRST"))
- .getRootCause()
- .isInstanceOf(UnsupportedOperationException.class)
- .hasMessageContaining("Cannot move itself for column a");
+ .satisfies(
+ AssertionUtils.anyCauseMatches(
+ UnsupportedOperationException.class,
+ "Cannot move itself for column a"));
- // move self for after test
- spark.sql("CREATE TABLE tableAfterSelf (a INT , b BIGINT, c STRING)");
+ // move self to after test
+ createTable("tableAfterSelf");
assertThatThrownBy(() -> spark.sql("ALTER TABLE tableAfterSelf ALTER COLUMN b AFTER b"))
- .getRootCause()
- .isInstanceOf(UnsupportedOperationException.class)
- .hasMessageContaining("Cannot move itself for column b");
+ .satisfies(
+ AssertionUtils.anyCauseMatches(
+ UnsupportedOperationException.class,
+ "Cannot move itself for column b"));
// missing column
- spark.sql("CREATE TABLE tableMissing (a INT , b BIGINT, c STRING)");
+ createTable("tableMissing");
assertThatThrownBy(() -> spark.sql("ALTER TABLE tableMissing ALTER COLUMN d FIRST"))
.hasMessageContaining("Missing field d in table paimon.default.tableMissing");
- spark.sql("CREATE TABLE tableMissingAfter (a INT , b BIGINT, c STRING)");
+ createTable("tableMissingAfter");
assertThatThrownBy(() -> spark.sql("ALTER TABLE tableMissingAfter ALTER COLUMN a AFTER d"))
.hasMessageContaining("Missing field d in table paimon.default.tableMissingAfter");
}
@@ -441,19 +328,18 @@ public class SparkSchemaEvolutionITCase extends SparkReadTestBase {
createTable("testAlterColumnType");
writeTable("testAlterColumnType", "(1, 2L, '1')", "(5, 6L, '3')");
- spark.sql("ALTER TABLE paimon.default.testAlterColumnType ALTER COLUMN b TYPE DOUBLE");
- assertThat(
- spark.table("paimon.default.testAlterColumnType").collectAsList().stream()
- .map(Row::toString)
- .collect(Collectors.toList()))
- .containsExactlyInAnyOrder("[1,2.0,1]", "[5,6.0,3]");
+ List<Row> beforeAlter = spark.sql("SHOW CREATE TABLE testAlterColumnType").collectAsList();
+ assertThat(beforeAlter.toString()).contains(defaultShowCreateString("testAlterColumnType"));
- spark.sql("ALTER TABLE paimon.default.testAlterColumnType DROP COLUMNS a");
- assertThat(
- spark.table("paimon.default.testAlterColumnType").collectAsList().stream()
- .map(Row::toString)
- .collect(Collectors.toList()))
- .containsExactlyInAnyOrder("[2.0,1]", "[6.0,3]");
+ spark.sql("ALTER TABLE testAlterColumnType ALTER COLUMN b TYPE DOUBLE");
+ assertThat(spark.table("testAlterColumnType").collectAsList().toString())
+ .isEqualTo("[[1,2.0,1], [5,6.0,3]]");
+
+ List<Row> afterAlter = spark.sql("SHOW CREATE TABLE testAlterColumnType").collectAsList();
+ assertThat(afterAlter.toString())
+ .contains(
+ showCreateString(
+ "testAlterColumnType", "a INT NOT NULL", "b DOUBLE", "c STRING"));
}
@Test
@@ -469,39 +355,38 @@ public class SparkSchemaEvolutionITCase extends SparkReadTestBase {
.isFalse();
// note: for Spark, it is illegal to change nullable column to non-nullable
- spark.sql("ALTER TABLE paimon.default.t2 ALTER COLUMN a DROP NOT NULL");
+ spark.sql("ALTER TABLE t2 ALTER COLUMN a DROP NOT NULL");
assertThat(fieldIsNullable(getField(schema2(), 0))).isTrue();
- spark.sql("ALTER TABLE paimon.default.t2 ALTER COLUMN b DROP NOT NULL");
+ spark.sql("ALTER TABLE t2 ALTER COLUMN b DROP NOT NULL");
assertThat(fieldIsNullable(getField(schema2(), 1))).isTrue();
- spark.sql("ALTER TABLE paimon.default.t2 ALTER COLUMN c DROP NOT NULL");
+ spark.sql("ALTER TABLE t2 ALTER COLUMN c DROP NOT NULL");
assertThat(fieldIsNullable(getField(schema2(), 2))).isTrue();
- spark.sql("ALTER TABLE paimon.default.t2 ALTER COLUMN c.c1 DROP NOT NULL");
+ spark.sql("ALTER TABLE t2 ALTER COLUMN c.c1 DROP NOT NULL");
assertThat(fieldIsNullable(getNestedField(getField(schema2(), 2), 0))).isTrue();
- spark.sql("ALTER TABLE paimon.default.t2 ALTER COLUMN c.c1.c12 DROP NOT NULL");
+ spark.sql("ALTER TABLE t2 ALTER COLUMN c.c1.c12 DROP NOT NULL");
assertThat(fieldIsNullable(getNestedField(getNestedField(getField(schema2(), 2), 0), 1)))
.isTrue();
}
@Test
public void testAlterPrimaryKeyNullability() {
- spark.sql("USE paimon");
spark.sql(
- "CREATE TABLE default.testAlterPkNullability (\n"
+ "CREATE TABLE testAlterPkNullability (\n"
+ "a BIGINT,\n"
- + "b STRING) USING paimon\n"
- + "COMMENT 'table comment'\n"
+ + "b STRING)\n"
+ "TBLPROPERTIES ('primary-key' = 'a')");
assertThatThrownBy(
() ->
spark.sql(
- "ALTER TABLE default.testAlterPkNullability ALTER COLUMN a DROP NOT NULL"))
- .getRootCause()
- .isInstanceOf(UnsupportedOperationException.class)
- .hasMessageContaining("Cannot change nullability of primary key");
+ "ALTER TABLE testAlterPkNullability ALTER COLUMN a DROP NOT NULL"))
+ .satisfies(
+ AssertionUtils.anyCauseMatches(
+ UnsupportedOperationException.class,
+ "Cannot change nullability of primary key"));
}
@Test
@@ -509,10 +394,10 @@ public class SparkSchemaEvolutionITCase extends SparkReadTestBase {
createTable("testAlterTableColumnComment");
assertThat(getField(schema1(), 0).description()).isNull();
- spark.sql("ALTER TABLE paimon.default.t1 ALTER COLUMN a COMMENT 'a new comment'");
+ spark.sql("ALTER TABLE t1 ALTER COLUMN a COMMENT 'a new comment'");
assertThat(getField(schema1(), 0).description()).isEqualTo("a new comment");
- spark.sql("ALTER TABLE paimon.default.t1 ALTER COLUMN a COMMENT 'yet another comment'");
+ spark.sql("ALTER TABLE t1 ALTER COLUMN a COMMENT 'yet another comment'");
assertThat(getField(schema1(), 0).description()).isEqualTo("yet another comment");
assertThat(getField(schema2(), 2).description()).isEqualTo("comment about c");
@@ -524,12 +409,11 @@ public class SparkSchemaEvolutionITCase extends SparkReadTestBase {
assertThat(getNestedField(getNestedField(getField(schema2(), 2), 0), 1).description())
.isNull();
- spark.sql(
- "ALTER TABLE paimon.default.t2 ALTER COLUMN c COMMENT 'yet another comment about c'");
- spark.sql("ALTER TABLE paimon.default.t2 ALTER COLUMN c.c1 COMMENT 'a nested type'");
- spark.sql("ALTER TABLE paimon.default.t2 ALTER COLUMN c.c2 COMMENT 'a bigint type'");
- spark.sql("ALTER TABLE paimon.default.t2 ALTER COLUMN c.c1.c11 COMMENT 'a double type'");
- spark.sql("ALTER TABLE paimon.default.t2 ALTER COLUMN c.c1.c12 COMMENT 'a boolean array'");
+ spark.sql("ALTER TABLE t2 ALTER COLUMN c COMMENT 'yet another comment about c'");
+ spark.sql("ALTER TABLE t2 ALTER COLUMN c.c1 COMMENT 'a nested type'");
+ spark.sql("ALTER TABLE t2 ALTER COLUMN c.c2 COMMENT 'a bigint type'");
+ spark.sql("ALTER TABLE t2 ALTER COLUMN c.c1.c11 COMMENT 'a double type'");
+ spark.sql("ALTER TABLE t2 ALTER COLUMN c.c1.c12 COMMENT 'a boolean array'");
assertThat(getField(schema2(), 2).description()).isEqualTo("yet another comment about c");
assertThat(getNestedField(getField(schema2(), 2), 0).description())
@@ -563,7 +447,7 @@ public class SparkSchemaEvolutionITCase extends SparkReadTestBase {
public void testSchemaEvolution() {
// Create table with fields [a, b, c] and insert 2 records
spark.sql(
- "CREATE TABLE paimon.default.testSchemaEvolution(\n"
+ "CREATE TABLE testSchemaEvolution(\n"
+ "a INT NOT NULL, \n"
+ "b BIGINT NOT NULL, \n"
+ "c VARCHAR(10), \n"
@@ -572,20 +456,17 @@ public class SparkSchemaEvolutionITCase extends SparkReadTestBase {
+ "f INT NOT NULL) \n"
+ "TBLPROPERTIES ('file.format'='avro')");
writeTable("testSchemaEvolution", "(1, 2L, '3', 4, 5, 6)", "(7, 8L, '9', 10, 11, 12)");
- assertThat(
- spark.table("paimon.default.testSchemaEvolution").collectAsList().stream()
- .map(Row::toString)
- .collect(Collectors.toList()))
+
+ Dataset<Row> table = spark.table("testSchemaEvolution");
+ assertThat(table.collectAsList().stream().map(Row::toString).collect(Collectors.toList()))
.containsExactlyInAnyOrder("[1,2,3,4,5,6]", "[7,8,9,10,11,12]");
assertThat(
- spark.table("paimon.default.testSchemaEvolution").select("a", "c", "e")
- .collectAsList().stream()
+ table.select("a", "c", "e").collectAsList().stream()
.map(Row::toString)
.collect(Collectors.toList()))
.containsExactlyInAnyOrder("[1,3,5]", "[7,9,11]");
assertThat(
- spark.table("paimon.default.testSchemaEvolution").filter("a>1")
- .collectAsList().stream()
+ table.filter("a>1").collectAsList().stream()
.map(Row::toString)
.collect(Collectors.toList()))
.containsExactlyInAnyOrder("[7,8,9,10,11,12]");
@@ -593,50 +474,45 @@ public class SparkSchemaEvolutionITCase extends SparkReadTestBase {
// Rename (a, int)->(aa, bigint), c->a, b->c, (d, int)->(b, bigint), (f, int)->(ff, float),
// the fields are [(1, aa, bigint), (2, c, bigint), (3, a, string), (4, b, bigint), (5, e,
// int), (6, ff, float)] and insert 2 records
- spark.sql("ALTER TABLE paimon.default.testSchemaEvolution RENAME COLUMN a to aa");
- spark.sql("ALTER TABLE paimon.default.testSchemaEvolution ALTER COLUMN aa TYPE bigint");
- spark.sql("ALTER TABLE paimon.default.testSchemaEvolution RENAME COLUMN c to a");
- spark.sql("ALTER TABLE paimon.default.testSchemaEvolution RENAME COLUMN b to c");
- spark.sql("ALTER TABLE paimon.default.testSchemaEvolution RENAME COLUMN d to b");
- spark.sql("ALTER TABLE paimon.default.testSchemaEvolution ALTER COLUMN b TYPE bigint");
- spark.sql("ALTER TABLE paimon.default.testSchemaEvolution RENAME COLUMN f to ff");
- spark.sql("ALTER TABLE paimon.default.testSchemaEvolution ALTER COLUMN ff TYPE float");
+ spark.sql("ALTER TABLE testSchemaEvolution RENAME COLUMN a to aa");
+ spark.sql("ALTER TABLE testSchemaEvolution ALTER COLUMN aa TYPE bigint");
+ spark.sql("ALTER TABLE testSchemaEvolution RENAME COLUMN c to a");
+ spark.sql("ALTER TABLE testSchemaEvolution RENAME COLUMN b to c");
+ spark.sql("ALTER TABLE testSchemaEvolution RENAME COLUMN d to b");
+ spark.sql("ALTER TABLE testSchemaEvolution ALTER COLUMN b TYPE bigint");
+ spark.sql("ALTER TABLE testSchemaEvolution RENAME COLUMN f to ff");
+ spark.sql("ALTER TABLE testSchemaEvolution ALTER COLUMN ff TYPE float");
writeTable(
"testSchemaEvolution",
"(13L, 14L, '15', 16L, 17, 18.0F)",
"(19L, 20L, '21', 22L, 23, 24.0F)");
- assertThat(
- spark.table("paimon.default.testSchemaEvolution").collectAsList().stream()
- .map(Row::toString)
- .collect(Collectors.toList()))
+
+ table = spark.table("testSchemaEvolution");
+ assertThat(table.collectAsList().stream().map(Row::toString).collect(Collectors.toList()))
.containsExactlyInAnyOrder(
"[1,2,3,4,5,6.0]",
"[7,8,9,10,11,12.0]",
"[13,14,15,16,17,18.0]",
"[19,20,21,22,23,24.0]");
assertThat(
- spark.table("paimon.default.testSchemaEvolution").select("aa", "b", "ff")
- .collectAsList().stream()
+ table.select("aa", "b", "ff").collectAsList().stream()
.map(Row::toString)
.collect(Collectors.toList()))
.containsExactlyInAnyOrder(
"[1,4,6.0]", "[7,10,12.0]", "[13,16,18.0]", "[19,22,24.0]");
assertThat(
- spark.table("paimon.default.testSchemaEvolution").select("aa", "a", "ff")
- .filter("b>10L").collectAsList().stream()
+ table.select("aa", "a", "ff").filter("b>10L").collectAsList().stream()
.map(Row::toString)
.collect(Collectors.toList()))
.containsExactlyInAnyOrder("[13,15,18.0]", "[19,21,24.0]");
// Drop fields aa, c, e, the fields are [(3, a, string), (4, b, bigint), (6, ff, float)] and
// insert 2 records
- spark.sql("ALTER TABLE paimon.default.testSchemaEvolution DROP COLUMNS aa, c, e");
+ spark.sql("ALTER TABLE testSchemaEvolution DROP COLUMNS aa, c, e");
writeTable("testSchemaEvolution", "('25', 26L, 27.0F)", "('28', 29L, 30.0)");
- assertThat(
- spark.table("paimon.default.testSchemaEvolution").collectAsList().stream()
- .map(Row::toString)
- .collect(Collectors.toList()))
+ table = spark.table("testSchemaEvolution");
+ assertThat(table.collectAsList().stream().map(Row::toString).collect(Collectors.toList()))
.containsExactlyInAnyOrder(
"[3,4,6.0]",
"[9,10,12.0]",
@@ -645,25 +521,21 @@ public class SparkSchemaEvolutionITCase extends SparkReadTestBase {
"[25,26,27.0]",
"[28,29,30.0]");
assertThat(
- spark.table("paimon.default.testSchemaEvolution").select("a", "ff")
- .filter("b>10L").collectAsList().stream()
+ table.select("a", "ff").filter("b>10L").collectAsList().stream()
.map(Row::toString)
.collect(Collectors.toList()))
.containsExactlyInAnyOrder("[15,18.0]", "[21,24.0]", "[25,27.0]", "[28,30.0]");
// Add new fields d, c, e, the fields are [(3, a, string), (4, b, bigint), (6, ff, float),
// (7, d, int), (8, c, int), (9, e, int)] insert 2 records
- spark.sql(
- "ALTER TABLE paimon.default.testSchemaEvolution ADD COLUMNS (d INT, c INT, e INT)");
+ spark.sql("ALTER TABLE testSchemaEvolution ADD COLUMNS (d INT, c INT, e INT)");
writeTable(
"testSchemaEvolution",
"('31', 32L, 33.0F, 34, 35, 36)",
"('37', 38L, 39.0F, 40, 41, 42)");
- assertThat(
- spark.table("paimon.default.testSchemaEvolution").collectAsList().stream()
- .map(Row::toString)
- .collect(Collectors.toList()))
+ table = spark.table("testSchemaEvolution");
+ assertThat(table.collectAsList().stream().map(Row::toString).collect(Collectors.toList()))
.containsExactlyInAnyOrder(
"[3,4,6.0,null,null,null]",
"[9,10,12.0,null,null,null]",
@@ -674,8 +546,7 @@ public class SparkSchemaEvolutionITCase extends SparkReadTestBase {
"[31,32,33.0,34,35,36]",
"[37,38,39.0,40,41,42]");
assertThat(
- spark.table("paimon.default.testSchemaEvolution").filter("b>10")
- .collectAsList().stream()
+ table.filter("b>10").collectAsList().stream()
.map(Row::toString)
.collect(Collectors.toList()))
.containsExactlyInAnyOrder(
@@ -686,8 +557,7 @@ public class SparkSchemaEvolutionITCase extends SparkReadTestBase {
"[31,32,33.0,34,35,36]",
"[37,38,39.0,40,41,42]");
assertThat(
- spark.table("paimon.default.testSchemaEvolution")
- .select("e", "a", "ff", "d", "b").filter("b>10L").collectAsList()
+ table.select("e", "a", "ff", "d", "b").filter("b>10L").collectAsList()
.stream()
.map(Row::toString)
.collect(Collectors.toList()))
@@ -699,8 +569,7 @@ public class SparkSchemaEvolutionITCase extends SparkReadTestBase {
"[36,31,33.0,34,32]",
"[42,37,39.0,40,38]");
assertThat(
- spark.table("paimon.default.testSchemaEvolution")
- .select("e", "a", "ff", "d", "b").filter("b>10 and e is not null")
+ table.select("e", "a", "ff", "d", "b").filter("b>10 and e is not null")
.collectAsList().stream()
.map(Row::toString)
.collect(Collectors.toList()))
@@ -714,28 +583,25 @@ public class SparkSchemaEvolutionITCase extends SparkReadTestBase {
writeTable("testFilesTable", "(1, 2L, '3')", "(4, 5L, '6')");
assertThat(
getFieldStatsList(
- spark.sql("SELECT * FROM paimon.default.`testFilesTable$files`")
- .collectAsList()))
+ spark.sql("SELECT * FROM `testFilesTable$files`").collectAsList()))
.containsExactlyInAnyOrder("{a=0, b=0, c=0},{a=1, b=2, c=3},{a=4, b=5, c=6}");
// Add new fields d, e, f and the fields are [a, b, c, d, e, f], insert 2 records
- spark.sql("ALTER TABLE paimon.default.testFilesTable ADD COLUMNS (d INT, e INT, f INT)");
+ spark.sql("ALTER TABLE testFilesTable ADD COLUMNS (d INT, e INT, f INT)");
writeTable("testFilesTable", "(7, 8L, '9', 10, 11, 12)", "(13, 14L, '15', 16, 17, 18)");
assertThat(
getFieldStatsList(
- spark.sql("SELECT * FROM paimon.default.`testFilesTable$files`")
- .collectAsList()))
+ spark.sql("SELECT * FROM `testFilesTable$files`").collectAsList()))
.containsExactlyInAnyOrder(
"{a=0, b=0, c=0, d=2, e=2, f=2},{a=1, b=2, c=3, d=null, e=null, f=null},{a=4, b=5, c=6, d=null, e=null, f=null}",
"{a=0, b=0, c=0, d=0, e=0, f=0},{a=7, b=8, c=15, d=10, e=11, f=12},{a=13, b=14, c=9, d=16, e=17, f=18}");
// Drop fields c, e and the fields are [a, b, d, f], insert 2 records
- spark.sql("ALTER TABLE paimon.default.testFilesTable DROP COLUMNS c, e");
+ spark.sql("ALTER TABLE testFilesTable DROP COLUMNS c, e");
writeTable("testFilesTable", "(19, 20L, 21, 22)", "(23, 24L, 25, 26)");
assertThat(
getFieldStatsList(
- spark.sql("SELECT * FROM paimon.default.`testFilesTable$files`")
- .collectAsList()))
+ spark.sql("SELECT * FROM `testFilesTable$files`").collectAsList()))
.containsExactlyInAnyOrder(
"{a=0, b=0, d=2, f=2},{a=1, b=2, d=null, f=null},{a=4, b=5, d=null, f=null}",
"{a=0, b=0, d=0, f=0},{a=7, b=8, d=10, f=12},{a=13, b=14, d=16, f=18}",
@@ -753,10 +619,4 @@ public class SparkSchemaEvolutionITCase extends SparkReadTestBase {
","))
.collect(Collectors.toList());
}
-
- private String buildTableProperties(String tablePath) {
- return String.format(
- "TBLPROPERTIES(\n" + " 'file.format' = 'avro',\n" + " 'path' = '%s/%s')\n",
- warehousePath, tablePath);
- }
}
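
[Editor's note] A recurring pattern in this commit replaces getRootCause()-based assertions with satisfies(AssertionUtils.anyCauseMatches(...)), presumably because Spark 3.3.2 wraps these errors at a different depth of the cause chain, so pinning the assertion to the root cause became brittle. A hypothetical stand-in showing the idea; the real implementation is org.apache.paimon.testutils.assertj.AssertionUtils and may differ:

    import java.util.function.Consumer;

    public class AnyCauseMatchesSketch {
        // Passes if any link of the cause chain is of the given type and
        // carries the given message fragment; otherwise fails the test.
        static Consumer<Throwable> anyCauseMatches(
                Class<? extends Throwable> type, String messagePart) {
            return t -> {
                for (Throwable cause = t; cause != null; cause = cause.getCause()) {
                    if (type.isInstance(cause)
                            && cause.getMessage() != null
                            && cause.getMessage().contains(messagePart)) {
                        return;
                    }
                }
                throw new AssertionError(
                        "no cause of type " + type.getName()
                                + " containing '" + messagePart + "'");
            };
        }

        public static void main(String[] args) {
            Throwable wrapped =
                    new RuntimeException(
                            new UnsupportedOperationException(
                                    "Change 'write-mode' is not supported yet"));
            anyCauseMatches(UnsupportedOperationException.class, "write-mode").accept(wrapped);
            System.out.println("matched");
        }
    }

Used as assertThatThrownBy(...).satisfies(anyCauseMatches(...)), this fits AssertJ's Consumer overload of satisfies, matching how the tests above invoke the real AssertionUtils helper.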