You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by lz...@apache.org on 2023/01/17 11:16:13 UTC
[flink-table-store] branch master updated: [FLINK-30701] Remove SimpleTableTestHelper and use sql in SparkITCase
This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/master by this push:
new 18123526 [FLINK-30701] Remove SimpleTableTestHelper and use sql in SparkITCase
18123526 is described below
commit 181235267919fdae0e99e43d0c08de8b8e06a275
Author: shammon <zj...@gmail.com>
AuthorDate: Tue Jan 17 19:16:07 2023 +0800
[FLINK-30701] Remove SimpleTableTestHelper and use sql in SparkITCase
This closes #488
---
.../table/store/spark/SimpleTableTestHelper.java | 89 ---------------
.../flink/table/store/spark/SparkReadITCase.java | 45 ++------
.../flink/table/store/spark/SparkReadTestBase.java | 121 ++++++++++-----------
.../store/spark/SparkSchemaEvolutionITCase.java | 87 +++++++--------
4 files changed, 108 insertions(+), 234 deletions(-)
diff --git a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SimpleTableTestHelper.java b/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SimpleTableTestHelper.java
deleted file mode 100644
index 2c88d2b1..00000000
--- a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SimpleTableTestHelper.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.store.spark;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.table.store.CoreOptions;
-import org.apache.flink.table.store.data.InternalRow;
-import org.apache.flink.table.store.file.schema.SchemaManager;
-import org.apache.flink.table.store.file.schema.UpdateSchema;
-import org.apache.flink.table.store.table.FileStoreTable;
-import org.apache.flink.table.store.table.FileStoreTableFactory;
-import org.apache.flink.table.store.table.sink.TableCommit;
-import org.apache.flink.table.store.table.sink.TableWrite;
-import org.apache.flink.table.store.types.RowType;
-
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-/** A simple table test helper to write and commit. */
-public class SimpleTableTestHelper {
-
- private TableWrite writer;
- private TableCommit commit;
-
- private long commitIdentifier;
-
- public SimpleTableTestHelper(Path path) throws Exception {
- Map<String, String> options = new HashMap<>();
- // orc is shaded, can not find shaded classes in ide
- options.put(CoreOptions.FILE_FORMAT.key(), "avro");
- createTable(path, options);
- }
-
- public SimpleTableTestHelper(Path path, RowType rowType) throws Exception {
- this(path, rowType, Collections.emptyList(), Collections.emptyList());
- }
-
- public SimpleTableTestHelper(
- Path path, RowType rowType, List<String> partitionKeys, List<String> primaryKeys)
- throws Exception {
- Map<String, String> options = new HashMap<>();
- // orc is shaded, can not find shaded classes in ide
- options.put(CoreOptions.FILE_FORMAT.key(), "avro");
- new SchemaManager(path)
- .commitNewVersion(
- new UpdateSchema(rowType, partitionKeys, primaryKeys, options, ""));
- createTable(path, options);
- }
-
- private void createTable(Path path, Map<String, String> options) {
- Configuration conf = Configuration.fromMap(options);
- conf.setString("path", path.toString());
- FileStoreTable table = FileStoreTableFactory.create(conf);
-
- String commitUser = "user";
- this.writer = table.newWrite(commitUser);
- this.commit = table.newCommit(commitUser);
-
- this.commitIdentifier = 0;
- }
-
- public void write(InternalRow row) throws Exception {
- writer.write(row);
- }
-
- public void commit() throws Exception {
- commit.commit(commitIdentifier, writer.prepareCommit(true, commitIdentifier));
- commitIdentifier++;
- }
-}
diff --git a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkReadITCase.java b/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkReadITCase.java
index bab448f8..3639bd86 100644
--- a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkReadITCase.java
+++ b/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkReadITCase.java
@@ -19,9 +19,6 @@
package org.apache.flink.table.store.spark;
import org.apache.flink.core.fs.Path;
-import org.apache.flink.table.store.data.BinaryString;
-import org.apache.flink.table.store.data.GenericArray;
-import org.apache.flink.table.store.data.GenericRow;
import org.apache.flink.table.store.file.schema.TableSchema;
import org.apache.flink.table.store.table.FileStoreTableFactory;
import org.apache.flink.table.store.types.ArrayType;
@@ -41,7 +38,6 @@ import org.junit.jupiter.api.Test;
import java.io.File;
import java.util.Arrays;
-import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -56,18 +52,16 @@ public class SparkReadITCase extends SparkReadTestBase {
@Test
public void testNormal() {
- innerTestSimpleType(spark.read().format("tablestore").load(tablePath1.toString()));
+ innerTestSimpleType(spark.table("tablestore.default.t1"));
- innerTestNestedType(spark.read().format("tablestore").load(tablePath2.toString()));
+ innerTestNestedType(spark.table("tablestore.default.t2"));
}
@Test
public void testFilterPushDown() {
- innerTestSimpleTypeFilterPushDown(
- spark.read().format("tablestore").load(tablePath1.toString()));
+ innerTestSimpleTypeFilterPushDown(spark.table("tablestore.default.t1"));
- innerTestNestedTypeFilterPushDown(
- spark.read().format("tablestore").load(tablePath2.toString()));
+ innerTestNestedTypeFilterPushDown(spark.table("tablestore.default.t2"));
}
@Test
@@ -227,8 +221,8 @@ public class SparkReadITCase extends SparkReadTestBase {
innerTest("MyTable6", false, true, true);
}
- private void innerTest(String tableName, boolean hasPk, boolean partitioned, boolean appendOnly)
- throws Exception {
+ private void innerTest(
+ String tableName, boolean hasPk, boolean partitioned, boolean appendOnly) {
spark.sql("USE tablestore");
String ddlTemplate =
"CREATE TABLE default.%s (\n"
@@ -243,6 +237,7 @@ public class SparkReadITCase extends SparkReadTestBase {
+ "TBLPROPERTIES (%s)";
Map<String, String> tableProperties = new HashMap<>();
tableProperties.put("foo", "bar");
+ tableProperties.put("file.format", "avro");
List<String> columns =
Arrays.asList("order_id", "buyer_id", "coupon_info", "order_amount", "dt", "hh");
List<DataType> types =
@@ -339,29 +334,9 @@ public class SparkReadITCase extends SparkReadTestBase {
assertThat(schema.comment()).isEqualTo("table comment");
- SimpleTableTestHelper testHelper =
- new SimpleTableTestHelper(
- tablePath,
- schema.logicalRowType(),
- partitioned ? Arrays.asList("dt", "hh") : Collections.emptyList(),
- hasPk
- ? partitioned
- ? Arrays.asList("order_id", "dt", "hh")
- : Collections.singletonList("order_id")
- : Collections.emptyList());
- testHelper.write(
- GenericRow.of(
- 1L,
- 10L,
- new GenericArray(
- new BinaryString[] {
- BinaryString.fromString("loyalty_discount"),
- BinaryString.fromString("shipping_discount")
- }),
- 199.0d,
- BinaryString.fromString("2022-07-20"),
- BinaryString.fromString("12")));
- testHelper.commit();
+ writeTable(
+ tableName,
+ "(1L, 10L, array('loyalty_discount', 'shipping_discount'), 199.0d, '2022-07-20', '12')");
Dataset<Row> dataset = spark.read().format("tablestore").load(tablePath.toString());
assertThat(dataset.select("order_id", "buyer_id", "dt").collectAsList().toString())
diff --git a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkReadTestBase.java b/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkReadTestBase.java
index 5b2ef480..970860ce 100644
--- a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkReadTestBase.java
+++ b/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkReadTestBase.java
@@ -20,10 +20,12 @@ package org.apache.flink.table.store.spark;
import org.apache.flink.core.fs.Path;
import org.apache.flink.table.store.data.BinaryString;
-import org.apache.flink.table.store.data.GenericArray;
import org.apache.flink.table.store.data.GenericRow;
import org.apache.flink.table.store.file.schema.TableSchema;
+import org.apache.flink.table.store.table.FileStoreTable;
import org.apache.flink.table.store.table.FileStoreTableFactory;
+import org.apache.flink.table.store.table.sink.TableCommit;
+import org.apache.flink.table.store.table.sink.TableWrite;
import org.apache.flink.table.store.types.ArrayType;
import org.apache.flink.table.store.types.BigIntType;
import org.apache.flink.table.store.types.BooleanType;
@@ -35,6 +37,7 @@ import org.apache.flink.table.store.types.RowType;
import org.apache.flink.table.store.types.VarCharType;
import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.StringUtils;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
@@ -46,11 +49,14 @@ import java.io.IOException;
import java.nio.file.Files;
import java.util.Arrays;
import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
import static org.assertj.core.api.Assertions.assertThat;
/** Base tests for spark read. */
public abstract class SparkReadTestBase {
+ private static final String COMMIT_USER = "user";
+ private static final AtomicLong COMMIT_IDENTIFIER = new AtomicLong(0);
private static File warehouse = null;
@@ -70,74 +76,34 @@ public abstract class SparkReadTestBase {
spark = SparkSession.builder().master("local[2]").getOrCreate();
spark.conf().set("spark.sql.catalog.tablestore", SparkCatalog.class.getName());
spark.conf().set("spark.sql.catalog.tablestore.warehouse", warehousePath.toString());
+ spark.sql("USE tablestore");
+ spark.sql("CREATE NAMESPACE default");
// flink sink
tablePath1 = new Path(warehousePath, "default.db/t1");
- SimpleTableTestHelper testHelper1 = new SimpleTableTestHelper(tablePath1, rowType1());
- testHelper1.write(GenericRow.of(1, 2L, BinaryString.fromString("1")));
- testHelper1.write(GenericRow.of(3, 4L, BinaryString.fromString("2")));
- testHelper1.write(GenericRow.of(5, 6L, BinaryString.fromString("3")));
- testHelper1.write(GenericRow.ofKind(RowKind.DELETE, 3, 4L, BinaryString.fromString("2")));
- testHelper1.commit();
+ createTable("t1");
+ writeTable(
+ "t1",
+ GenericRow.of(1, 2L, BinaryString.fromString("1")),
+ GenericRow.of(3, 4L, BinaryString.fromString("2")),
+ GenericRow.of(5, 6L, BinaryString.fromString("3")),
+ GenericRow.ofKind(RowKind.DELETE, 3, 4L, BinaryString.fromString("2")));
// a int not null
// b array<varchar> not null
// c row<row<double, array<boolean> not null> not null, bigint> not null
tablePath2 = new Path(warehousePath, "default.db/t2");
- SimpleTableTestHelper testHelper2 = new SimpleTableTestHelper(tablePath2, rowType2());
- testHelper2.write(
- GenericRow.of(
- 1,
- new GenericArray(
- new BinaryString[] {
- BinaryString.fromString("AAA"), BinaryString.fromString("BBB")
- }),
- GenericRow.of(
- GenericRow.of(1.0d, new GenericArray(new Boolean[] {null})), 1L)));
- testHelper2.write(
- GenericRow.of(
- 2,
- new GenericArray(
- new BinaryString[] {
- BinaryString.fromString("CCC"), BinaryString.fromString("DDD")
- }),
- GenericRow.of(
- GenericRow.of(null, new GenericArray(new Boolean[] {true})),
- null)));
- testHelper2.commit();
-
- testHelper2.write(
- GenericRow.of(
- 3,
- new GenericArray(new BinaryString[] {null, null}),
- GenericRow.of(
- GenericRow.of(2.0d, new GenericArray(new boolean[] {true, false})),
- 2L)));
-
- testHelper2.write(
- GenericRow.of(
- 4,
- new GenericArray(new BinaryString[] {null, BinaryString.fromString("EEE")}),
- GenericRow.of(
- GenericRow.of(
- 3.0d, new GenericArray(new Boolean[] {true, false, true})),
- 3L)));
- testHelper2.commit();
- }
-
- protected static SimpleTableTestHelper createTestHelper(Path tablePath) throws Exception {
- RowType rowType =
- new RowType(
- Arrays.asList(
- new DataField(0, "a", new IntType(false)),
- new DataField(1, "b", new BigIntType()),
- new DataField(2, "c", new VarCharType())));
- return new SimpleTableTestHelper(tablePath, rowType);
- }
-
- protected static SimpleTableTestHelper createTestHelperWithoutDDL(Path tablePath)
- throws Exception {
- return new SimpleTableTestHelper(tablePath);
+ spark.sql(
+ "CREATE TABLE tablestore.default.t2 (a INT NOT NULL COMMENT 'comment about a', b ARRAY<STRING> NOT NULL, c STRUCT<c1: STRUCT<c11: DOUBLE, c12: ARRAY<BOOLEAN> NOT NULL> NOT NULL, c2: BIGINT COMMENT 'comment about c2'> NOT NULL COMMENT 'comment about c') TBLPROPERTIES ('file.format'='avro')");
+ // createTable1("t2");
+ writeTable(
+ "t2",
+ "(1, array('AAA', 'BBB'), struct(struct(1.0d, array(null)), 1L))",
+ "(2, array('CCC', 'DDD'), struct(struct(null, array(true)), null))");
+ writeTable(
+ "t2",
+ "(3, array(null, null), struct(struct(2.0d, array(true, false)), 2L))",
+ "(4, array(null, 'EEE'), struct(struct(3.0d, array(true, false, true)), 3L))");
}
private static RowType rowType1() {
@@ -229,4 +195,37 @@ public abstract class SparkReadTestBase {
}
throw new IllegalArgumentException();
}
+
+ /**
+ * Create table with fields: a->int not null, b->bigint, c->string. orc is shaded, can not find
+ * shaded classes in ide, we use avro here.
+ *
+ * @param tableName the given table name
+ */
+ protected static void createTable(String tableName) {
+ spark.sql(
+ String.format(
+ "CREATE TABLE tablestore.default.%s (a INT NOT NULL, b BIGINT, c STRING) TBLPROPERTIES ('file.format'='avro')",
+ tableName));
+ }
+
+ private static void writeTable(String tableName, GenericRow... rows) throws Exception {
+ FileStoreTable fileStoreTable =
+ FileStoreTableFactory.create(
+ new Path(warehousePath, String.format("default.db/%s", tableName)));
+ TableWrite writer = fileStoreTable.newWrite(COMMIT_USER);
+ TableCommit commit = fileStoreTable.newCommit(COMMIT_USER);
+ for (GenericRow row : rows) {
+ writer.write(row);
+ }
+ long commitIdentifier = COMMIT_IDENTIFIER.getAndIncrement();
+ commit.commit(commitIdentifier, writer.prepareCommit(true, commitIdentifier));
+ }
+
+ protected static void writeTable(String tableName, String... values) {
+ spark.sql(
+ String.format(
+ "INSERT INTO tablestore.default.%s VALUES %s",
+ tableName, StringUtils.join(values, ",")));
+ }
}
diff --git a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkSchemaEvolutionITCase.java b/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkSchemaEvolutionITCase.java
index 62e613eb..4630df6f 100644
--- a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkSchemaEvolutionITCase.java
+++ b/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkSchemaEvolutionITCase.java
@@ -18,9 +18,6 @@
package org.apache.flink.table.store.spark;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.table.store.data.BinaryString;
-import org.apache.flink.table.store.data.GenericRow;
import org.apache.flink.table.store.data.InternalRow;
import org.apache.spark.sql.AnalysisException;
@@ -29,6 +26,7 @@ import org.apache.spark.sql.Row;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -42,12 +40,16 @@ public class SparkSchemaEvolutionITCase extends SparkReadTestBase {
public void testSetAndRemoveOption() {
spark.sql("ALTER TABLE tablestore.default.t1 SET TBLPROPERTIES('xyc' 'unknown1')");
- Map<String, String> options = schema1().options();
+ Map<String, String> options =
+ rowsToMap(
+ spark.sql("SELECT * FROM tablestore.default.`t1$options`").collectAsList());
assertThat(options).containsEntry("xyc", "unknown1");
spark.sql("ALTER TABLE tablestore.default.t1 UNSET TBLPROPERTIES('xyc')");
- options = schema1().options();
+ options =
+ rowsToMap(
+ spark.sql("SELECT * FROM tablestore.default.`t1$options`").collectAsList());
assertThat(options).doesNotContainKey("xyc");
assertThatThrownBy(
@@ -58,13 +60,17 @@ public class SparkSchemaEvolutionITCase extends SparkReadTestBase {
.hasMessageContaining("Alter primary key is not supported");
}
+ private Map<String, String> rowsToMap(List<Row> rows) {
+ Map<String, String> map = new HashMap<>();
+ rows.forEach(r -> map.put(r.getString(0), r.getString(1)));
+
+ return map;
+ }
+
@Test
- public void testAddColumn() throws Exception {
- Path tablePath = new Path(warehousePath, "default.db/testAddColumn");
- SimpleTableTestHelper testHelper1 = createTestHelper(tablePath);
- testHelper1.write(GenericRow.of(1, 2L, BinaryString.fromString("1")));
- testHelper1.write(GenericRow.of(5, 6L, BinaryString.fromString("3")));
- testHelper1.commit();
+ public void testAddColumn() {
+ createTable("testAddColumn");
+ writeTable("testAddColumn", "(1, 2L, '1')", "(5, 6L, '3')");
spark.sql("ALTER TABLE tablestore.default.testAddColumn ADD COLUMN d STRING");
@@ -80,9 +86,8 @@ public class SparkSchemaEvolutionITCase extends SparkReadTestBase {
}
@Test
- public void testAddNotNullColumn() throws Exception {
- Path tablePath = new Path(warehousePath, "default.db/testAddNotNullColumn");
- createTestHelper(tablePath);
+ public void testAddNotNullColumn() {
+ createTable("testAddNotNullColumn");
List<Row> beforeAdd =
spark.sql("SHOW CREATE TABLE tablestore.default.testAddNotNullColumn")
@@ -106,12 +111,9 @@ public class SparkSchemaEvolutionITCase extends SparkReadTestBase {
}
@Test
- public void testRenameColumn() throws Exception {
- Path tablePath = new Path(warehousePath, "default.db/testRenameColumn");
- SimpleTableTestHelper testHelper1 = createTestHelper(tablePath);
- testHelper1.write(GenericRow.of(1, 2L, BinaryString.fromString("1")));
- testHelper1.write(GenericRow.of(5, 6L, BinaryString.fromString("3")));
- testHelper1.commit();
+ public void testRenameColumn() {
+ createTable("testRenameColumn");
+ writeTable("testRenameColumn", "(1, 2L, '1')", "(5, 6L, '3')");
List<Row> beforeRename =
spark.sql("SHOW CREATE TABLE tablestore.default.testRenameColumn").collectAsList();
@@ -189,11 +191,8 @@ public class SparkSchemaEvolutionITCase extends SparkReadTestBase {
@Test
public void testDropSingleColumn() throws Exception {
- Path tablePath = new Path(warehousePath, "default.db/testDropSingleColumn");
- SimpleTableTestHelper testHelper = createTestHelper(tablePath);
- testHelper.write(GenericRow.of(1, 2L, BinaryString.fromString("1")));
- testHelper.write(GenericRow.of(5, 6L, BinaryString.fromString("3")));
- testHelper.commit();
+ createTable("testDropSingleColumn");
+ writeTable("testDropSingleColumn", "(1, 2L, '1')", "(5, 6L, '3')");
List<Row> beforeDrop =
spark.sql("SHOW CREATE TABLE tablestore.default.testDropSingleColumn")
@@ -227,8 +226,7 @@ public class SparkSchemaEvolutionITCase extends SparkReadTestBase {
@Test
public void testDropColumns() throws Exception {
- Path tablePath = new Path(warehousePath, "default.db/testDropColumns");
- createTestHelper(tablePath);
+ createTable("testDropColumns");
List<Row> beforeRename =
spark.sql("SHOW CREATE TABLE tablestore.default.testDropColumns").collectAsList();
@@ -343,11 +341,8 @@ public class SparkSchemaEvolutionITCase extends SparkReadTestBase {
@Disabled
@Test
public void testAlterColumnType() throws Exception {
- Path tablePath = new Path(warehousePath, "default.db/testAlterColumnType");
- SimpleTableTestHelper testHelper1 = createTestHelper(tablePath);
- testHelper1.write(GenericRow.of(1, 2L, BinaryString.fromString("1")));
- testHelper1.write(GenericRow.of(5, 6L, BinaryString.fromString("3")));
- testHelper1.commit();
+ createTable("testAlterColumnType");
+ writeTable("testAlterColumnType", "(1, 2L, '1')", "(5, 6L, '3')");
spark.sql("ALTER TABLE tablestore.default.testAlterColumnType ALTER COLUMN a TYPE BIGINT");
innerTestSimpleType(spark.table("tablestore.default.testAlterColumnType"));
@@ -403,6 +398,12 @@ public class SparkSchemaEvolutionITCase extends SparkReadTestBase {
@Test
public void testAlterTableColumnComment() {
+ createTable("testAlterTableColumnComment");
+ Row row =
+ spark.sql("SHOW CREATE TABLE tablestore.default.`testAlterTableColumnComment`")
+ .collectAsList()
+ .get(0);
+ System.out.println(row);
assertThat(getField(schema1(), 0).description()).isNull();
spark.sql("ALTER TABLE tablestore.default.t1 ALTER COLUMN a COMMENT 'a new comment'");
@@ -455,11 +456,8 @@ public class SparkSchemaEvolutionITCase extends SparkReadTestBase {
@Test
public void testSchemaEvolution() throws Exception {
// Create table with fields [a, b, c] and insert 2 records
- Path tablePath = new Path(warehousePath, "default.db/testSchemaEvolution");
- SimpleTableTestHelper testHelper1 = createTestHelper(tablePath);
- testHelper1.write(GenericRow.of(1, 2L, BinaryString.fromString("3")));
- testHelper1.write(GenericRow.of(4, 5L, BinaryString.fromString("6")));
- testHelper1.commit();
+ createTable("testSchemaEvolution");
+ writeTable("testSchemaEvolution", "(1, 2L, '3')", "(4, 5L, '6')");
assertThat(spark.table("tablestore.default.testSchemaEvolution").collectAsList().toString())
.isEqualTo("[[1,2,3], [4,5,6]]");
assertThat(
@@ -479,10 +477,7 @@ public class SparkSchemaEvolutionITCase extends SparkReadTestBase {
spark.sql("ALTER TABLE tablestore.default.testSchemaEvolution RENAME COLUMN a to aa");
spark.sql("ALTER TABLE tablestore.default.testSchemaEvolution RENAME COLUMN c to a");
spark.sql("ALTER TABLE tablestore.default.testSchemaEvolution RENAME COLUMN b to c");
- SimpleTableTestHelper testHelper2 = createTestHelperWithoutDDL(tablePath);
- testHelper2.write(GenericRow.of(7, 8L, BinaryString.fromString("9")));
- testHelper2.write(GenericRow.of(10, 11L, BinaryString.fromString("12")));
- testHelper2.commit();
+ writeTable("testSchemaEvolution", "(7, 8L, '9')", "(10, 11L, '12')");
assertThat(spark.table("tablestore.default.testSchemaEvolution").collectAsList().toString())
.isEqualTo("[[1,2,3], [4,5,6], [7,8,9], [10,11,12]]");
assertThat(
@@ -501,10 +496,7 @@ public class SparkSchemaEvolutionITCase extends SparkReadTestBase {
// Drop fields "aa", "c" and the fields are [a], insert 2 records
spark.sql("ALTER TABLE tablestore.default.testSchemaEvolution DROP COLUMNS aa, c");
- SimpleTableTestHelper testHelper3 = createTestHelperWithoutDDL(tablePath);
- testHelper3.write(GenericRow.of(BinaryString.fromString("13")));
- testHelper3.write(GenericRow.of(BinaryString.fromString("14")));
- testHelper3.commit();
+ writeTable("testSchemaEvolution", "('13')", "('14')");
assertThat(spark.table("tablestore.default.testSchemaEvolution").collectAsList().toString())
.isEqualTo("[[3], [6], [9], [12], [13], [14]]");
assertThat(
@@ -518,10 +510,7 @@ public class SparkSchemaEvolutionITCase extends SparkReadTestBase {
// Add new fields "d", "c", "b" and the fields are [a, d, c, b], insert 2 records
spark.sql(
"ALTER TABLE tablestore.default.testSchemaEvolution ADD COLUMNS (d INT, c INT, b INT)");
- SimpleTableTestHelper testHelper4 = createTestHelperWithoutDDL(tablePath);
- testHelper4.write(GenericRow.of(BinaryString.fromString("15"), 16, 17, 18));
- testHelper4.write(GenericRow.of(BinaryString.fromString("19"), 20, 21, 22));
- testHelper4.commit();
+ writeTable("testSchemaEvolution", "('15', 16, 17, 18)", "('19', 20, 21, 22)");
assertThat(spark.table("tablestore.default.testSchemaEvolution").collectAsList().toString())
.isEqualTo(
"[[3,null,null,null], "