Posted to commits@flink.apache.org by lz...@apache.org on 2023/01/17 11:16:13 UTC

[flink-table-store] branch master updated: [FLINK-30701] Remove SimpleTableTestHelper and use sql in SparkITCase

This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/master by this push:
     new 18123526 [FLINK-30701] Remove SimpleTableTestHelper and use sql in SparkITCase
18123526 is described below

commit 181235267919fdae0e99e43d0c08de8b8e06a275
Author: shammon <zj...@gmail.com>
AuthorDate: Tue Jan 17 19:16:07 2023 +0800

    [FLINK-30701] Remove SimpleTableTestHelper and use sql in SparkITCase
    
    This closes #488
---
 .../table/store/spark/SimpleTableTestHelper.java   |  89 ---------------
 .../flink/table/store/spark/SparkReadITCase.java   |  45 ++------
 .../flink/table/store/spark/SparkReadTestBase.java | 121 ++++++++++-----------
 .../store/spark/SparkSchemaEvolutionITCase.java    |  87 +++++++--------
 4 files changed, 108 insertions(+), 234 deletions(-)
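
In short, the patch replaces the hand-rolled SimpleTableTestHelper with plain Spark SQL: DDL creates the test tables and INSERT INTO writes the fixtures. A minimal sketch of the new pattern, assembled from the hunks below (session setup abbreviated; this sketch is editorial, not part of the commit itself):

    // Register the Table Store catalog and the test namespace.
    spark.conf().set("spark.sql.catalog.tablestore", SparkCatalog.class.getName());
    spark.conf().set("spark.sql.catalog.tablestore.warehouse", warehousePath.toString());
    spark.sql("USE tablestore");
    spark.sql("CREATE NAMESPACE default");

    // DDL replaces the deleted helper's SchemaManager-based table creation ...
    spark.sql("CREATE TABLE tablestore.default.t1 (a INT NOT NULL, b BIGINT, c STRING) "
            + "TBLPROPERTIES ('file.format'='avro')");
    // ... and INSERT INTO replaces TableWrite#write plus TableCommit#commit.
    spark.sql("INSERT INTO tablestore.default.t1 VALUES (1, 2L, '1'), (5, 6L, '3')");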

diff --git a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SimpleTableTestHelper.java b/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SimpleTableTestHelper.java
deleted file mode 100644
index 2c88d2b1..00000000
--- a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SimpleTableTestHelper.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.store.spark;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.table.store.CoreOptions;
-import org.apache.flink.table.store.data.InternalRow;
-import org.apache.flink.table.store.file.schema.SchemaManager;
-import org.apache.flink.table.store.file.schema.UpdateSchema;
-import org.apache.flink.table.store.table.FileStoreTable;
-import org.apache.flink.table.store.table.FileStoreTableFactory;
-import org.apache.flink.table.store.table.sink.TableCommit;
-import org.apache.flink.table.store.table.sink.TableWrite;
-import org.apache.flink.table.store.types.RowType;
-
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-/** A simple table test helper to write and commit. */
-public class SimpleTableTestHelper {
-
-    private TableWrite writer;
-    private TableCommit commit;
-
-    private long commitIdentifier;
-
-    public SimpleTableTestHelper(Path path) throws Exception {
-        Map<String, String> options = new HashMap<>();
-        // orc is shaded, can not find shaded classes in ide
-        options.put(CoreOptions.FILE_FORMAT.key(), "avro");
-        createTable(path, options);
-    }
-
-    public SimpleTableTestHelper(Path path, RowType rowType) throws Exception {
-        this(path, rowType, Collections.emptyList(), Collections.emptyList());
-    }
-
-    public SimpleTableTestHelper(
-            Path path, RowType rowType, List<String> partitionKeys, List<String> primaryKeys)
-            throws Exception {
-        Map<String, String> options = new HashMap<>();
-        // orc is shaded, can not find shaded classes in ide
-        options.put(CoreOptions.FILE_FORMAT.key(), "avro");
-        new SchemaManager(path)
-                .commitNewVersion(
-                        new UpdateSchema(rowType, partitionKeys, primaryKeys, options, ""));
-        createTable(path, options);
-    }
-
-    private void createTable(Path path, Map<String, String> options) {
-        Configuration conf = Configuration.fromMap(options);
-        conf.setString("path", path.toString());
-        FileStoreTable table = FileStoreTableFactory.create(conf);
-
-        String commitUser = "user";
-        this.writer = table.newWrite(commitUser);
-        this.commit = table.newCommit(commitUser);
-
-        this.commitIdentifier = 0;
-    }
-
-    public void write(InternalRow row) throws Exception {
-        writer.write(row);
-    }
-
-    public void commit() throws Exception {
-        commit.commit(commitIdentifier, writer.prepareCommit(true, commitIdentifier));
-        commitIdentifier++;
-    }
-}
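
The helper's write-then-commit protocol does not disappear with this deletion; it moves into SparkReadTestBase#writeTable(String, GenericRow...) further down, with the commit identifier now drawn from a shared AtomicLong instead of a per-helper counter. Condensed, the flow is (a sketch using names from the replacement code, not a verbatim excerpt):

    TableWrite writer = fileStoreTable.newWrite(COMMIT_USER);
    TableCommit commit = fileStoreTable.newCommit(COMMIT_USER);
    for (GenericRow row : rows) {
        writer.write(row);                             // buffer the rows
    }
    long id = COMMIT_IDENTIFIER.getAndIncrement();     // one identifier per commit
    commit.commit(id, writer.prepareCommit(true, id)); // snapshot the buffered writes
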
diff --git a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkReadITCase.java b/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkReadITCase.java
index bab448f8..3639bd86 100644
--- a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkReadITCase.java
+++ b/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkReadITCase.java
@@ -19,9 +19,6 @@
 package org.apache.flink.table.store.spark;
 
 import org.apache.flink.core.fs.Path;
-import org.apache.flink.table.store.data.BinaryString;
-import org.apache.flink.table.store.data.GenericArray;
-import org.apache.flink.table.store.data.GenericRow;
 import org.apache.flink.table.store.file.schema.TableSchema;
 import org.apache.flink.table.store.table.FileStoreTableFactory;
 import org.apache.flink.table.store.types.ArrayType;
@@ -41,7 +38,6 @@ import org.junit.jupiter.api.Test;
 
 import java.io.File;
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -56,18 +52,16 @@ public class SparkReadITCase extends SparkReadTestBase {
 
     @Test
     public void testNormal() {
-        innerTestSimpleType(spark.read().format("tablestore").load(tablePath1.toString()));
+        innerTestSimpleType(spark.table("tablestore.default.t1"));
 
-        innerTestNestedType(spark.read().format("tablestore").load(tablePath2.toString()));
+        innerTestNestedType(spark.table("tablestore.default.t2"));
     }
 
     @Test
     public void testFilterPushDown() {
-        innerTestSimpleTypeFilterPushDown(
-                spark.read().format("tablestore").load(tablePath1.toString()));
+        innerTestSimpleTypeFilterPushDown(spark.table("tablestore.default.t1"));
 
-        innerTestNestedTypeFilterPushDown(
-                spark.read().format("tablestore").load(tablePath2.toString()));
+        innerTestNestedTypeFilterPushDown(spark.table("tablestore.default.t2"));
     }
 
     @Test
@@ -227,8 +221,8 @@ public class SparkReadITCase extends SparkReadTestBase {
         innerTest("MyTable6", false, true, true);
     }
 
-    private void innerTest(String tableName, boolean hasPk, boolean partitioned, boolean appendOnly)
-            throws Exception {
+    private void innerTest(
+            String tableName, boolean hasPk, boolean partitioned, boolean appendOnly) {
         spark.sql("USE tablestore");
         String ddlTemplate =
                 "CREATE TABLE default.%s (\n"
@@ -243,6 +237,7 @@ public class SparkReadITCase extends SparkReadTestBase {
                         + "TBLPROPERTIES (%s)";
         Map<String, String> tableProperties = new HashMap<>();
         tableProperties.put("foo", "bar");
+        tableProperties.put("file.format", "avro");
         List<String> columns =
                 Arrays.asList("order_id", "buyer_id", "coupon_info", "order_amount", "dt", "hh");
         List<DataType> types =
@@ -339,29 +334,9 @@ public class SparkReadITCase extends SparkReadTestBase {
 
         assertThat(schema.comment()).isEqualTo("table comment");
 
-        SimpleTableTestHelper testHelper =
-                new SimpleTableTestHelper(
-                        tablePath,
-                        schema.logicalRowType(),
-                        partitioned ? Arrays.asList("dt", "hh") : Collections.emptyList(),
-                        hasPk
-                                ? partitioned
-                                        ? Arrays.asList("order_id", "dt", "hh")
-                                        : Collections.singletonList("order_id")
-                                : Collections.emptyList());
-        testHelper.write(
-                GenericRow.of(
-                        1L,
-                        10L,
-                        new GenericArray(
-                                new BinaryString[] {
-                                    BinaryString.fromString("loyalty_discount"),
-                                    BinaryString.fromString("shipping_discount")
-                                }),
-                        199.0d,
-                        BinaryString.fromString("2022-07-20"),
-                        BinaryString.fromString("12")));
-        testHelper.commit();
+        writeTable(
+                tableName,
+                "(1L, 10L, array('loyalty_discount', 'shipping_discount'), 199.0d, '2022-07-20', '12')");
 
         Dataset<Row> dataset = spark.read().format("tablestore").load(tablePath.toString());
         assertThat(dataset.select("order_id", "buyer_id", "dt").collectAsList().toString())
diff --git a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkReadTestBase.java b/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkReadTestBase.java
index 5b2ef480..970860ce 100644
--- a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkReadTestBase.java
+++ b/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkReadTestBase.java
@@ -20,10 +20,12 @@ package org.apache.flink.table.store.spark;
 
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.table.store.data.BinaryString;
-import org.apache.flink.table.store.data.GenericArray;
 import org.apache.flink.table.store.data.GenericRow;
 import org.apache.flink.table.store.file.schema.TableSchema;
+import org.apache.flink.table.store.table.FileStoreTable;
 import org.apache.flink.table.store.table.FileStoreTableFactory;
+import org.apache.flink.table.store.table.sink.TableCommit;
+import org.apache.flink.table.store.table.sink.TableWrite;
 import org.apache.flink.table.store.types.ArrayType;
 import org.apache.flink.table.store.types.BigIntType;
 import org.apache.flink.table.store.types.BooleanType;
@@ -35,6 +37,7 @@ import org.apache.flink.table.store.types.RowType;
 import org.apache.flink.table.store.types.VarCharType;
 
 import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
@@ -46,11 +49,14 @@ import java.io.IOException;
 import java.nio.file.Files;
 import java.util.Arrays;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Base tests for spark read. */
 public abstract class SparkReadTestBase {
+    private static final String COMMIT_USER = "user";
+    private static final AtomicLong COMMIT_IDENTIFIER = new AtomicLong(0);
 
     private static File warehouse = null;
 
@@ -70,74 +76,34 @@ public abstract class SparkReadTestBase {
         spark = SparkSession.builder().master("local[2]").getOrCreate();
         spark.conf().set("spark.sql.catalog.tablestore", SparkCatalog.class.getName());
         spark.conf().set("spark.sql.catalog.tablestore.warehouse", warehousePath.toString());
+        spark.sql("USE tablestore");
+        spark.sql("CREATE NAMESPACE default");
 
         // flink sink
         tablePath1 = new Path(warehousePath, "default.db/t1");
-        SimpleTableTestHelper testHelper1 = new SimpleTableTestHelper(tablePath1, rowType1());
-        testHelper1.write(GenericRow.of(1, 2L, BinaryString.fromString("1")));
-        testHelper1.write(GenericRow.of(3, 4L, BinaryString.fromString("2")));
-        testHelper1.write(GenericRow.of(5, 6L, BinaryString.fromString("3")));
-        testHelper1.write(GenericRow.ofKind(RowKind.DELETE, 3, 4L, BinaryString.fromString("2")));
-        testHelper1.commit();
+        createTable("t1");
+        writeTable(
+                "t1",
+                GenericRow.of(1, 2L, BinaryString.fromString("1")),
+                GenericRow.of(3, 4L, BinaryString.fromString("2")),
+                GenericRow.of(5, 6L, BinaryString.fromString("3")),
+                GenericRow.ofKind(RowKind.DELETE, 3, 4L, BinaryString.fromString("2")));
 
         // a int not null
         // b array<varchar> not null
         // c row<row<double, array<boolean> not null> not null, bigint> not null
         tablePath2 = new Path(warehousePath, "default.db/t2");
-        SimpleTableTestHelper testHelper2 = new SimpleTableTestHelper(tablePath2, rowType2());
-        testHelper2.write(
-                GenericRow.of(
-                        1,
-                        new GenericArray(
-                                new BinaryString[] {
-                                    BinaryString.fromString("AAA"), BinaryString.fromString("BBB")
-                                }),
-                        GenericRow.of(
-                                GenericRow.of(1.0d, new GenericArray(new Boolean[] {null})), 1L)));
-        testHelper2.write(
-                GenericRow.of(
-                        2,
-                        new GenericArray(
-                                new BinaryString[] {
-                                    BinaryString.fromString("CCC"), BinaryString.fromString("DDD")
-                                }),
-                        GenericRow.of(
-                                GenericRow.of(null, new GenericArray(new Boolean[] {true})),
-                                null)));
-        testHelper2.commit();
-
-        testHelper2.write(
-                GenericRow.of(
-                        3,
-                        new GenericArray(new BinaryString[] {null, null}),
-                        GenericRow.of(
-                                GenericRow.of(2.0d, new GenericArray(new boolean[] {true, false})),
-                                2L)));
-
-        testHelper2.write(
-                GenericRow.of(
-                        4,
-                        new GenericArray(new BinaryString[] {null, BinaryString.fromString("EEE")}),
-                        GenericRow.of(
-                                GenericRow.of(
-                                        3.0d, new GenericArray(new Boolean[] {true, false, true})),
-                                3L)));
-        testHelper2.commit();
-    }
-
-    protected static SimpleTableTestHelper createTestHelper(Path tablePath) throws Exception {
-        RowType rowType =
-                new RowType(
-                        Arrays.asList(
-                                new DataField(0, "a", new IntType(false)),
-                                new DataField(1, "b", new BigIntType()),
-                                new DataField(2, "c", new VarCharType())));
-        return new SimpleTableTestHelper(tablePath, rowType);
-    }
-
-    protected static SimpleTableTestHelper createTestHelperWithoutDDL(Path tablePath)
-            throws Exception {
-        return new SimpleTableTestHelper(tablePath);
+        spark.sql(
+                "CREATE TABLE tablestore.default.t2 (a INT NOT NULL COMMENT 'comment about a', b ARRAY<STRING> NOT NULL, c STRUCT<c1: STRUCT<c11: DOUBLE, c12: ARRAY<BOOLEAN> NOT NULL> NOT NULL, c2: BIGINT COMMENT 'comment about c2'> NOT NULL COMMENT 'comment about c') TBLPROPERTIES ('file.format'='avro')");
+        writeTable(
+                "t2",
+                "(1, array('AAA', 'BBB'), struct(struct(1.0d, array(null)), 1L))",
+                "(2, array('CCC', 'DDD'), struct(struct(null, array(true)), null))");
+        writeTable(
+                "t2",
+                "(3, array(null, null), struct(struct(2.0d, array(true, false)), 2L))",
+                "(4, array(null, 'EEE'), struct(struct(3.0d, array(true, false, true)), 3L))");
     }
 
     private static RowType rowType1() {
@@ -229,4 +195,37 @@ public abstract class SparkReadTestBase {
         }
         throw new IllegalArgumentException();
     }
+
+    /**
+     * Create a table with fields: a->int not null, b->bigint, c->string. ORC is shaded and its
+     * shaded classes cannot be found in the IDE, so we use Avro here.
+     *
+     * @param tableName the given table name
+     */
+    protected static void createTable(String tableName) {
+        spark.sql(
+                String.format(
+                        "CREATE TABLE tablestore.default.%s (a INT NOT NULL, b BIGINT, c STRING) TBLPROPERTIES ('file.format'='avro')",
+                        tableName));
+    }
+
+    private static void writeTable(String tableName, GenericRow... rows) throws Exception {
+        FileStoreTable fileStoreTable =
+                FileStoreTableFactory.create(
+                        new Path(warehousePath, String.format("default.db/%s", tableName)));
+        TableWrite writer = fileStoreTable.newWrite(COMMIT_USER);
+        TableCommit commit = fileStoreTable.newCommit(COMMIT_USER);
+        for (GenericRow row : rows) {
+            writer.write(row);
+        }
+        long commitIdentifier = COMMIT_IDENTIFIER.getAndIncrement();
+        commit.commit(commitIdentifier, writer.prepareCommit(true, commitIdentifier));
+    }
+
+    protected static void writeTable(String tableName, String... values) {
+        spark.sql(
+                String.format(
+                        "INSERT INTO tablestore.default.%s VALUES %s",
+                        tableName, StringUtils.join(values, ",")));
+    }
 }
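
Note that two writeTable overloads remain in the base class: the SQL variant covers plain inserts, while the GenericRow variant stays, presumably because fixtures such as t1 commit a RowKind.DELETE record, which INSERT INTO cannot express. For example, the setup above calls (taken from the hunk, shortened):

    writeTable(
            "t1",
            GenericRow.of(1, 2L, BinaryString.fromString("1")),
            GenericRow.ofKind(RowKind.DELETE, 3, 4L, BinaryString.fromString("2")));
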
diff --git a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkSchemaEvolutionITCase.java b/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkSchemaEvolutionITCase.java
index 62e613eb..4630df6f 100644
--- a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkSchemaEvolutionITCase.java
+++ b/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkSchemaEvolutionITCase.java
@@ -18,9 +18,6 @@
 
 package org.apache.flink.table.store.spark;
 
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.table.store.data.BinaryString;
-import org.apache.flink.table.store.data.GenericRow;
 import org.apache.flink.table.store.data.InternalRow;
 
 import org.apache.spark.sql.AnalysisException;
@@ -29,6 +26,7 @@ import org.apache.spark.sql.Row;
 import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -42,12 +40,16 @@ public class SparkSchemaEvolutionITCase extends SparkReadTestBase {
     public void testSetAndRemoveOption() {
         spark.sql("ALTER TABLE tablestore.default.t1 SET TBLPROPERTIES('xyc' 'unknown1')");
 
-        Map<String, String> options = schema1().options();
+        Map<String, String> options =
+                rowsToMap(
+                        spark.sql("SELECT * FROM tablestore.default.`t1$options`").collectAsList());
         assertThat(options).containsEntry("xyc", "unknown1");
 
         spark.sql("ALTER TABLE tablestore.default.t1 UNSET TBLPROPERTIES('xyc')");
 
-        options = schema1().options();
+        options =
+                rowsToMap(
+                        spark.sql("SELECT * FROM tablestore.default.`t1$options`").collectAsList());
         assertThat(options).doesNotContainKey("xyc");
 
         assertThatThrownBy(
@@ -58,13 +60,17 @@ public class SparkSchemaEvolutionITCase extends SparkReadTestBase {
                 .hasMessageContaining("Alter primary key is not supported");
     }
 
+    private Map<String, String> rowsToMap(List<Row> rows) {
+        Map<String, String> map = new HashMap<>();
+        rows.forEach(r -> map.put(r.getString(0), r.getString(1)));
+
+        return map;
+    }
+
     @Test
-    public void testAddColumn() throws Exception {
-        Path tablePath = new Path(warehousePath, "default.db/testAddColumn");
-        SimpleTableTestHelper testHelper1 = createTestHelper(tablePath);
-        testHelper1.write(GenericRow.of(1, 2L, BinaryString.fromString("1")));
-        testHelper1.write(GenericRow.of(5, 6L, BinaryString.fromString("3")));
-        testHelper1.commit();
+    public void testAddColumn() {
+        createTable("testAddColumn");
+        writeTable("testAddColumn", "(1, 2L, '1')", "(5, 6L, '3')");
 
         spark.sql("ALTER TABLE tablestore.default.testAddColumn ADD COLUMN d STRING");
 
@@ -80,9 +86,8 @@ public class SparkSchemaEvolutionITCase extends SparkReadTestBase {
     }
 
     @Test
-    public void testAddNotNullColumn() throws Exception {
-        Path tablePath = new Path(warehousePath, "default.db/testAddNotNullColumn");
-        createTestHelper(tablePath);
+    public void testAddNotNullColumn() {
+        createTable("testAddNotNullColumn");
 
         List<Row> beforeAdd =
                 spark.sql("SHOW CREATE TABLE tablestore.default.testAddNotNullColumn")
@@ -106,12 +111,9 @@ public class SparkSchemaEvolutionITCase extends SparkReadTestBase {
     }
 
     @Test
-    public void testRenameColumn() throws Exception {
-        Path tablePath = new Path(warehousePath, "default.db/testRenameColumn");
-        SimpleTableTestHelper testHelper1 = createTestHelper(tablePath);
-        testHelper1.write(GenericRow.of(1, 2L, BinaryString.fromString("1")));
-        testHelper1.write(GenericRow.of(5, 6L, BinaryString.fromString("3")));
-        testHelper1.commit();
+    public void testRenameColumn() {
+        createTable("testRenameColumn");
+        writeTable("testRenameColumn", "(1, 2L, '1')", "(5, 6L, '3')");
 
         List<Row> beforeRename =
                 spark.sql("SHOW CREATE TABLE tablestore.default.testRenameColumn").collectAsList();
@@ -189,11 +191,8 @@ public class SparkSchemaEvolutionITCase extends SparkReadTestBase {
 
     @Test
     public void testDropSingleColumn() throws Exception {
-        Path tablePath = new Path(warehousePath, "default.db/testDropSingleColumn");
-        SimpleTableTestHelper testHelper = createTestHelper(tablePath);
-        testHelper.write(GenericRow.of(1, 2L, BinaryString.fromString("1")));
-        testHelper.write(GenericRow.of(5, 6L, BinaryString.fromString("3")));
-        testHelper.commit();
+        createTable("testDropSingleColumn");
+        writeTable("testDropSingleColumn", "(1, 2L, '1')", "(5, 6L, '3')");
 
         List<Row> beforeDrop =
                 spark.sql("SHOW CREATE TABLE tablestore.default.testDropSingleColumn")
@@ -227,8 +226,7 @@ public class SparkSchemaEvolutionITCase extends SparkReadTestBase {
 
     @Test
     public void testDropColumns() throws Exception {
-        Path tablePath = new Path(warehousePath, "default.db/testDropColumns");
-        createTestHelper(tablePath);
+        createTable("testDropColumns");
 
         List<Row> beforeRename =
                 spark.sql("SHOW CREATE TABLE tablestore.default.testDropColumns").collectAsList();
@@ -343,11 +341,8 @@ public class SparkSchemaEvolutionITCase extends SparkReadTestBase {
     @Disabled
     @Test
     public void testAlterColumnType() throws Exception {
-        Path tablePath = new Path(warehousePath, "default.db/testAlterColumnType");
-        SimpleTableTestHelper testHelper1 = createTestHelper(tablePath);
-        testHelper1.write(GenericRow.of(1, 2L, BinaryString.fromString("1")));
-        testHelper1.write(GenericRow.of(5, 6L, BinaryString.fromString("3")));
-        testHelper1.commit();
+        createTable("testAlterColumnType");
+        writeTable("testAlterColumnType", "(1, 2L, '1')", "(5, 6L, '3')");
 
         spark.sql("ALTER TABLE tablestore.default.testAlterColumnType ALTER COLUMN a TYPE BIGINT");
         innerTestSimpleType(spark.table("tablestore.default.testAlterColumnType"));
@@ -403,6 +398,12 @@ public class SparkSchemaEvolutionITCase extends SparkReadTestBase {
 
     @Test
     public void testAlterTableColumnComment() {
+        createTable("testAlterTableColumnComment");
+        Row row =
+                spark.sql("SHOW CREATE TABLE tablestore.default.`testAlterTableColumnComment`")
+                        .collectAsList()
+                        .get(0);
+        System.out.println(row);
         assertThat(getField(schema1(), 0).description()).isNull();
 
         spark.sql("ALTER TABLE tablestore.default.t1 ALTER COLUMN a COMMENT 'a new comment'");
@@ -455,11 +456,8 @@ public class SparkSchemaEvolutionITCase extends SparkReadTestBase {
     @Test
     public void testSchemaEvolution() throws Exception {
         // Create table with fields [a, b, c] and insert 2 records
-        Path tablePath = new Path(warehousePath, "default.db/testSchemaEvolution");
-        SimpleTableTestHelper testHelper1 = createTestHelper(tablePath);
-        testHelper1.write(GenericRow.of(1, 2L, BinaryString.fromString("3")));
-        testHelper1.write(GenericRow.of(4, 5L, BinaryString.fromString("6")));
-        testHelper1.commit();
+        createTable("testSchemaEvolution");
+        writeTable("testSchemaEvolution", "(1, 2L, '3')", "(4, 5L, '6')");
         assertThat(spark.table("tablestore.default.testSchemaEvolution").collectAsList().toString())
                 .isEqualTo("[[1,2,3], [4,5,6]]");
         assertThat(
@@ -479,10 +477,7 @@ public class SparkSchemaEvolutionITCase extends SparkReadTestBase {
         spark.sql("ALTER TABLE tablestore.default.testSchemaEvolution RENAME COLUMN a to aa");
         spark.sql("ALTER TABLE tablestore.default.testSchemaEvolution RENAME COLUMN c to a");
         spark.sql("ALTER TABLE tablestore.default.testSchemaEvolution RENAME COLUMN b to c");
-        SimpleTableTestHelper testHelper2 = createTestHelperWithoutDDL(tablePath);
-        testHelper2.write(GenericRow.of(7, 8L, BinaryString.fromString("9")));
-        testHelper2.write(GenericRow.of(10, 11L, BinaryString.fromString("12")));
-        testHelper2.commit();
+        writeTable("testSchemaEvolution", "(7, 8L, '9')", "(10, 11L, '12')");
         assertThat(spark.table("tablestore.default.testSchemaEvolution").collectAsList().toString())
                 .isEqualTo("[[1,2,3], [4,5,6], [7,8,9], [10,11,12]]");
         assertThat(
@@ -501,10 +496,7 @@ public class SparkSchemaEvolutionITCase extends SparkReadTestBase {
 
         // Drop fields "aa", "c" and the fields are [a], insert 2 records
         spark.sql("ALTER TABLE tablestore.default.testSchemaEvolution DROP COLUMNS aa, c");
-        SimpleTableTestHelper testHelper3 = createTestHelperWithoutDDL(tablePath);
-        testHelper3.write(GenericRow.of(BinaryString.fromString("13")));
-        testHelper3.write(GenericRow.of(BinaryString.fromString("14")));
-        testHelper3.commit();
+        writeTable("testSchemaEvolution", "('13')", "('14')");
         assertThat(spark.table("tablestore.default.testSchemaEvolution").collectAsList().toString())
                 .isEqualTo("[[3], [6], [9], [12], [13], [14]]");
         assertThat(
@@ -518,10 +510,7 @@ public class SparkSchemaEvolutionITCase extends SparkReadTestBase {
         // Add new fields "d", "c", "b" and the fields are [a, d, c, b], insert 2 records
         spark.sql(
                 "ALTER TABLE tablestore.default.testSchemaEvolution ADD COLUMNS (d INT, c INT, b INT)");
-        SimpleTableTestHelper testHelper4 = createTestHelperWithoutDDL(tablePath);
-        testHelper4.write(GenericRow.of(BinaryString.fromString("15"), 16, 17, 18));
-        testHelper4.write(GenericRow.of(BinaryString.fromString("19"), 20, 21, 22));
-        testHelper4.commit();
+        writeTable("testSchemaEvolution", "('15', 16, 17, 18)", "('19', 20, 21, 22)");
         assertThat(spark.table("tablestore.default.testSchemaEvolution").collectAsList().toString())
                 .isEqualTo(
                         "[[3,null,null,null], "