You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by lz...@apache.org on 2022/07/12 09:45:13 UTC

[flink-table-store] branch master updated: [FLINK-28483] Basic schema evolution for table store

This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/master by this push:
     new a73350c3 [FLINK-28483] Basic schema evolution for table store
a73350c3 is described below

commit a73350c3694d78f3bff4900c8be7f3f02983b6e1
Author: Jingsong Lee <ji...@gmail.com>
AuthorDate: Tue Jul 12 17:45:07 2022 +0800

    [FLINK-28483] Basic schema evolution for table store
    
    This closes #208
---
 docs/content/docs/engines/spark.md                 |  26 +++
 .../flink/table/store/connector/FlinkCatalog.java  | 115 +++++++++---
 .../table/store/connector/sink/StoreCommitter.java |  20 +-
 .../table/store/connector/sink/StoreSink.java      |  24 +--
 ...PredicateITCase.java => CatalogITCaseBase.java} |  47 ++---
 .../table/store/connector/PredicateITCase.java     |  31 +--
 .../table/store/connector/SchemaChangeITCase.java  |  47 +++++
 .../flink/table/store/file/catalog/Catalog.java    |   7 +-
 .../store/file/catalog/FileSystemCatalog.java      |  20 +-
 .../flink/table/store/file/operation/Lock.java     |  47 ++++-
 .../table/store/file/schema/SchemaChange.java      | 208 +++++++++++++++++++++
 .../table/store/file/schema/SchemaManager.java     | 127 +++++++++++--
 .../flink/table/store/file/schema/TableSchema.java |   2 +-
 .../flink/table/store/table/sink/TableCommit.java  |  11 +-
 .../apache/flink/table/store/hive/HiveCatalog.java |  61 +++---
 .../flink/table/store/hive/HiveCatalogLock.java    |  32 ++--
 .../flink/table/store/spark/SparkCatalog.java      |  72 +++++--
 .../flink/table/store/spark/SparkTypeUtils.java    | 132 +++++++++++--
 .../table/store/spark/SparkInternalRowTest.java    |  23 +--
 .../flink/table/store/spark/SparkReadITCase.java   |  58 +++++-
 .../flink/table/store/spark/SparkTypeTest.java     |  33 ++--
 21 files changed, 884 insertions(+), 259 deletions(-)

diff --git a/docs/content/docs/engines/spark.md b/docs/content/docs/engines/spark.md
index b831a67f..7657f118 100644
--- a/docs/content/docs/engines/spark.md
+++ b/docs/content/docs/engines/spark.md
@@ -70,3 +70,29 @@ OPTIONS (
 ```sql
 SELECT * FROM table_store.default.myTable;
 ```
+
+## DDL
+
+`ALTER TABLE ... SET TBLPROPERTIES`
+```sql
+ALTER TABLE table_store.default.myTable SET TBLPROPERTIES (
+    'write-buffer-size'='256 MB'
+)
+```
+
+`ALTER TABLE ... UNSET TBLPROPERTIES`
+```sql
+ALTER TABLE table_store.default.myTable UNSET TBLPROPERTIES ('write-buffer-size')
+```
+
+`ALTER TABLE ... ADD COLUMN`
+```sql
+ALTER TABLE table_store.default.myTable
+ADD COLUMNS (new_column STRING)
+```
+
+`ALTER TABLE ... ALTER COLUMN ... TYPE`
+```sql
+ALTER TABLE table_store.default.myTable
+ALTER COLUMN column_name TYPE BIGINT
+```
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/FlinkCatalog.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/FlinkCatalog.java
index 15fefe42..e4dcefb7 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/FlinkCatalog.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/FlinkCatalog.java
@@ -41,12 +41,15 @@ import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
 import org.apache.flink.table.expressions.Expression;
 import org.apache.flink.table.factories.Factory;
 import org.apache.flink.table.store.file.catalog.Catalog;
+import org.apache.flink.table.store.file.schema.SchemaChange;
 import org.apache.flink.table.store.file.schema.TableSchema;
 import org.apache.flink.table.store.file.schema.UpdateSchema;
 
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Optional;
 
 import static org.apache.flink.table.factories.FactoryUtil.CONNECTOR;
@@ -132,7 +135,7 @@ public class FlinkCatalog extends AbstractCatalog {
     }
 
     @Override
-    public CatalogBaseTable getTable(ObjectPath tablePath)
+    public CatalogTable getTable(ObjectPath tablePath)
             throws TableNotExistException, CatalogException {
         TableSchema schema;
         try {
@@ -165,8 +168,33 @@ public class FlinkCatalog extends AbstractCatalog {
     @Override
     public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists)
             throws TableAlreadyExistException, DatabaseNotExistException, CatalogException {
+        if (!(table instanceof CatalogTable)) {
+            throw new UnsupportedOperationException(
+                    "Only support CatalogTable, but is: " + table.getClass());
+        }
+        CatalogTable catalogTable = (CatalogTable) table;
+        Map<String, String> options = table.getOptions();
+        if (options.containsKey(CONNECTOR.key())) {
+            throw new CatalogException(
+                    String.format(
+                            "Table Store Catalog only supports table store tables, not '%s' connector."
+                                    + " You can create TEMPORARY table instead.",
+                            options.get(CONNECTOR.key())));
+        }
+
+        // remove table path
+        String specific = options.remove(PATH.key());
+        if (specific != null) {
+            if (!catalog.getTableLocation(tablePath).equals(new Path(specific))) {
+                throw new IllegalArgumentException(
+                        "Illegal table path in table options: " + specific);
+            }
+            catalogTable = catalogTable.copy(options);
+        }
+
         try {
-            catalog.createTable(tablePath, convertTableToSchema(tablePath, table), ignoreIfExists);
+            catalog.createTable(
+                    tablePath, UpdateSchema.fromCatalogTable(catalogTable), ignoreIfExists);
         } catch (Catalog.TableAlreadyExistException e) {
             throw new TableAlreadyExistException(getName(), e.tablePath());
         } catch (Catalog.DatabaseNotExistException e) {
@@ -178,40 +206,75 @@ public class FlinkCatalog extends AbstractCatalog {
     public void alterTable(
             ObjectPath tablePath, CatalogBaseTable newTable, boolean ignoreIfNotExists)
             throws TableNotExistException, CatalogException {
+        if (ignoreIfNotExists && !tableExists(tablePath)) {
+            return;
+        }
+
+        CatalogTable table = getTable(tablePath);
+
+        // Currently, Flink SQL only support altering table properties.
+        validateAlterTable(table, (CatalogTable) newTable);
+
+        List<SchemaChange> changes = new ArrayList<>();
+        Map<String, String> oldProperties = table.getOptions();
+        for (Map.Entry<String, String> entry : newTable.getOptions().entrySet()) {
+            String key = entry.getKey();
+            String value = entry.getValue();
+
+            if (Objects.equals(value, oldProperties.get(key))) {
+                continue;
+            }
+
+            if (PATH.key().equalsIgnoreCase(key)) {
+                throw new IllegalArgumentException("Illegal table path in table options: " + value);
+            }
+
+            changes.add(SchemaChange.setOption(key, value));
+        }
+
+        oldProperties
+                .keySet()
+                .forEach(
+                        k -> {
+                            if (!newTable.getOptions().containsKey(k)) {
+                                changes.add(SchemaChange.removeOption(k));
+                            }
+                        });
+
         try {
-            catalog.alterTable(
-                    tablePath, convertTableToSchema(tablePath, newTable), ignoreIfNotExists);
+            catalog.alterTable(tablePath, changes, ignoreIfNotExists);
         } catch (Catalog.TableNotExistException e) {
             throw new TableNotExistException(getName(), e.tablePath());
         }
     }
 
-    private UpdateSchema convertTableToSchema(ObjectPath tablePath, CatalogBaseTable baseTable) {
-        if (!(baseTable instanceof CatalogTable)) {
-            throw new UnsupportedOperationException(
-                    "Only support CatalogTable, but is: " + baseTable.getClass());
-        }
-        CatalogTable table = (CatalogTable) baseTable;
-        Map<String, String> options = table.getOptions();
-        if (options.containsKey(CONNECTOR.key())) {
-            throw new CatalogException(
-                    String.format(
-                            "Table Store Catalog only supports table store tables, not '%s' connector."
-                                    + " You can create TEMPORARY table instead.",
-                            options.get(CONNECTOR.key())));
+    private static void validateAlterTable(CatalogTable ct1, CatalogTable ct2) {
+        org.apache.flink.table.api.TableSchema ts1 = ct1.getSchema();
+        org.apache.flink.table.api.TableSchema ts2 = ct2.getSchema();
+        boolean pkEquality = false;
+
+        if (ts1.getPrimaryKey().isPresent() && ts2.getPrimaryKey().isPresent()) {
+            pkEquality =
+                    Objects.equals(
+                                    ts1.getPrimaryKey().get().getType(),
+                                    ts2.getPrimaryKey().get().getType())
+                            && Objects.equals(
+                                    ts1.getPrimaryKey().get().getColumns(),
+                                    ts2.getPrimaryKey().get().getColumns());
+        } else if (!ts1.getPrimaryKey().isPresent() && !ts2.getPrimaryKey().isPresent()) {
+            pkEquality = true;
         }
 
-        // remove table path
-        String specific = options.remove(PATH.key());
-        if (specific != null) {
-            if (!catalog.getTableLocation(tablePath).equals(new Path(specific))) {
-                throw new IllegalArgumentException(
-                        "Illegal table path in table options: " + specific);
-            }
-            table = table.copy(options);
+        if (!(Objects.equals(ts1.getTableColumns(), ts2.getTableColumns())
+                && Objects.equals(ts1.getWatermarkSpecs(), ts2.getWatermarkSpecs())
+                && pkEquality)) {
+            throw new UnsupportedOperationException("Altering schema is not supported yet.");
         }
 
-        return UpdateSchema.fromCatalogTable(table);
+        if (!ct1.getPartitionKeys().equals(ct2.getPartitionKeys())) {
+            throw new UnsupportedOperationException(
+                    "Altering partition keys is not supported yet.");
+        }
     }
 
     @Override
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreCommitter.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreCommitter.java
index d06e09fd..cfe0551b 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreCommitter.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreCommitter.java
@@ -18,13 +18,10 @@
 
 package org.apache.flink.table.store.connector.sink;
 
-import org.apache.flink.table.store.file.catalog.CatalogLock;
 import org.apache.flink.table.store.file.manifest.ManifestCommittable;
 import org.apache.flink.table.store.table.sink.FileCommittable;
 import org.apache.flink.table.store.table.sink.TableCommit;
 
-import javax.annotation.Nullable;
-
 import java.io.IOException;
 import java.util.List;
 
@@ -33,18 +30,8 @@ public class StoreCommitter implements Committer {
 
     private final TableCommit commit;
 
-    @Nullable private final CatalogLock lock;
-
-    public StoreCommitter(TableCommit commit, @Nullable CatalogLock lock) {
+    public StoreCommitter(TableCommit commit) {
         this.commit = commit;
-        this.lock = lock;
-    }
-
-    @Override
-    public void close() throws Exception {
-        if (lock != null) {
-            lock.close();
-        }
     }
 
     @Override
@@ -78,4 +65,9 @@ public class StoreCommitter implements Committer {
             throws IOException, InterruptedException {
         commit.commit(committables);
     }
+
+    @Override
+    public void close() throws Exception {
+        commit.close();
+    }
 }
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSink.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSink.java
index 544675a6..3f205871 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSink.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSink.java
@@ -38,7 +38,6 @@ import javax.annotation.Nullable;
 
 import java.io.Serializable;
 import java.util.Map;
-import java.util.concurrent.Callable;
 
 /** Sink of dynamic store. */
 public class StoreSink implements Serializable {
@@ -88,28 +87,9 @@ public class StoreSink implements Serializable {
     }
 
     private StoreCommitter createCommitter(String user) {
-        CatalogLock catalogLock;
-        Lock lock;
-        if (lockFactory == null) {
-            catalogLock = null;
-            lock = null;
-        } else {
-            catalogLock = lockFactory.create();
-            lock =
-                    new Lock() {
-                        @Override
-                        public <T> T runWithLock(Callable<T> callable) throws Exception {
-                            return catalogLock.runWithLock(
-                                    tableIdentifier.getDatabaseName(),
-                                    tableIdentifier.getObjectName(),
-                                    callable);
-                        }
-                    };
-        }
-
+        Lock lock = Lock.fromCatalog(lockFactory, tableIdentifier.toObjectPath());
         return new StoreCommitter(
-                table.newCommit(user).withOverwritePartition(overwritePartition).withLock(lock),
-                catalogLock);
+                table.newCommit(user).withOverwritePartition(overwritePartition).withLock(lock));
     }
 
     public DataStreamSink<?> sinkTo(DataStream<RowData> input) {
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/PredicateITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/CatalogITCaseBase.java
similarity index 63%
copy from flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/PredicateITCase.java
copy to flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/CatalogITCaseBase.java
index 4e56fec8..feffad76 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/PredicateITCase.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/CatalogITCaseBase.java
@@ -20,6 +20,11 @@ package org.apache.flink.table.store.connector;
 
 import org.apache.flink.table.api.EnvironmentSettings;
 import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.catalog.Catalog;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
 import org.apache.flink.test.util.AbstractTestBase;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.CloseableIterator;
@@ -27,17 +32,14 @@ import org.apache.flink.util.CloseableIterator;
 import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableList;
 
 import org.junit.Before;
-import org.junit.Test;
 
 import java.io.IOException;
 import java.util.List;
 
-import static org.assertj.core.api.Assertions.assertThat;
+/** ITCase for catalog. */
+public abstract class CatalogITCaseBase extends AbstractTestBase {
 
-/** Predicate ITCase. */
-public class PredicateITCase extends AbstractTestBase {
-
-    private TableEnvironment tEnv;
+    protected TableEnvironment tEnv;
 
     @Before
     public void before() throws IOException {
@@ -50,33 +52,16 @@ public class PredicateITCase extends AbstractTestBase {
         tEnv.useCatalog("TABLE_STORE");
     }
 
-    @Test
-    public void testPkFilterBucket() throws Exception {
-        sql("CREATE TABLE T (a INT PRIMARY KEY NOT ENFORCED, b INT) WITH ('bucket' = '5')");
-        innerTest();
-    }
-
-    @Test
-    public void testNoPkFilterBucket() throws Exception {
-        sql("CREATE TABLE T (a INT, b INT) WITH ('bucket' = '5', 'bucket-key'='a')");
-        innerTest();
-    }
-
-    @Test
-    public void testAppendFilterBucket() throws Exception {
-        sql(
-                "CREATE TABLE T (a INT, b INT) WITH ('bucket' = '5', 'bucket-key'='a', 'write-mode'='append-only')");
-        innerTest();
-    }
-
-    private void innerTest() throws Exception {
-        sql("INSERT INTO T VALUES (1, 2), (3, 4), (5, 6), (7, 8), (9, 10)");
-        assertThat(sql("SELECT * FROM T WHERE a = 5")).containsExactlyInAnyOrder(Row.of(5, 6));
-    }
-
-    private List<Row> sql(String query, Object... args) throws Exception {
+    protected List<Row> sql(String query, Object... args) throws Exception {
         try (CloseableIterator<Row> iter = tEnv.executeSql(String.format(query, args)).collect()) {
             return ImmutableList.copyOf(iter);
         }
     }
+
+    protected CatalogTable table(String tableName) throws TableNotExistException {
+        Catalog catalog = tEnv.getCatalog(tEnv.getCurrentCatalog()).get();
+        CatalogBaseTable table =
+                catalog.getTable(new ObjectPath(catalog.getDefaultDatabase(), tableName));
+        return (CatalogTable) table;
+    }
 }
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/PredicateITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/PredicateITCase.java
index 4e56fec8..ab122791 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/PredicateITCase.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/PredicateITCase.java
@@ -18,37 +18,14 @@
 
 package org.apache.flink.table.store.connector;
 
-import org.apache.flink.table.api.EnvironmentSettings;
-import org.apache.flink.table.api.TableEnvironment;
-import org.apache.flink.test.util.AbstractTestBase;
 import org.apache.flink.types.Row;
-import org.apache.flink.util.CloseableIterator;
 
-import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableList;
-
-import org.junit.Before;
 import org.junit.Test;
 
-import java.io.IOException;
-import java.util.List;
-
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Predicate ITCase. */
-public class PredicateITCase extends AbstractTestBase {
-
-    private TableEnvironment tEnv;
-
-    @Before
-    public void before() throws IOException {
-        tEnv = TableEnvironment.create(EnvironmentSettings.newInstance().inBatchMode().build());
-        tEnv.executeSql(
-                String.format(
-                        "CREATE CATALOG TABLE_STORE WITH ("
-                                + "'type'='table-store', 'warehouse'='%s')",
-                        TEMPORARY_FOLDER.newFolder().toURI()));
-        tEnv.useCatalog("TABLE_STORE");
-    }
+public class PredicateITCase extends CatalogITCaseBase {
 
     @Test
     public void testPkFilterBucket() throws Exception {
@@ -73,10 +50,4 @@ public class PredicateITCase extends AbstractTestBase {
         sql("INSERT INTO T VALUES (1, 2), (3, 4), (5, 6), (7, 8), (9, 10)");
         assertThat(sql("SELECT * FROM T WHERE a = 5")).containsExactlyInAnyOrder(Row.of(5, 6));
     }
-
-    private List<Row> sql(String query, Object... args) throws Exception {
-        try (CloseableIterator<Row> iter = tEnv.executeSql(String.format(query, args)).collect()) {
-            return ImmutableList.copyOf(iter);
-        }
-    }
 }
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/SchemaChangeITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/SchemaChangeITCase.java
new file mode 100644
index 00000000..378957a5
--- /dev/null
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/SchemaChangeITCase.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector;
+
+import org.junit.Test;
+
+import java.util.Map;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** ITCase for schema changes. */
+public class SchemaChangeITCase extends CatalogITCaseBase {
+
+    // TODO cover more cases once Flink supports more ALTER operations.
+
+    @Test
+    public void testSetAndRemoveOption() throws Exception {
+        sql("CREATE TABLE T (a STRING, b STRING, c STRING)");
+        sql("ALTER TABLE T SET ('xyc'='unknown1', 'abc'='unknown2')");
+
+        Map<String, String> options = table("T").getOptions();
+        assertThat(options).containsEntry("xyc", "unknown1");
+        assertThat(options).containsEntry("abc", "unknown2");
+
+        sql("ALTER TABLE T RESET ('xyc', 'abc')");
+
+        options = table("T").getOptions();
+        assertThat(options).doesNotContainKey("xyc");
+        assertThat(options).doesNotContainKey("abc");
+    }
+}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/Catalog.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/Catalog.java
index 1642d4ff..5d46aa07 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/Catalog.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/Catalog.java
@@ -20,6 +20,7 @@ package org.apache.flink.table.store.file.catalog;
 
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.store.file.schema.SchemaChange;
 import org.apache.flink.table.store.file.schema.TableSchema;
 import org.apache.flink.table.store.file.schema.UpdateSchema;
 import org.apache.flink.table.store.table.FileStoreTable;
@@ -150,15 +151,15 @@ public interface Catalog extends AutoCloseable {
             throws TableAlreadyExistException, DatabaseNotExistException;
 
     /**
-     * Modify an existing table.
+     * Modify an existing table from {@link SchemaChange}s.
      *
      * @param tablePath path of the table to be modified
-     * @param newTableSchema the new table definition
+     * @param changes the schema changes
      * @param ignoreIfNotExists flag to specify behavior when the table does not exist: if set to
      *     false, throw an exception, if set to true, do nothing.
      * @throws TableNotExistException if the table does not exist
      */
-    void alterTable(ObjectPath tablePath, UpdateSchema newTableSchema, boolean ignoreIfNotExists)
+    void alterTable(ObjectPath tablePath, List<SchemaChange> changes, boolean ignoreIfNotExists)
             throws TableNotExistException;
 
     /** Exception for trying to drop on a database that is not empty. */
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/FileSystemCatalog.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/FileSystemCatalog.java
index 23c55131..6dae077e 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/FileSystemCatalog.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/FileSystemCatalog.java
@@ -22,6 +22,7 @@ import org.apache.flink.core.fs.FileStatus;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.store.file.schema.SchemaChange;
 import org.apache.flink.table.store.file.schema.SchemaManager;
 import org.apache.flink.table.store.file.schema.TableSchema;
 import org.apache.flink.table.store.file.schema.UpdateSchema;
@@ -159,22 +160,17 @@ public class FileSystemCatalog extends AbstractCatalog {
             throw new TableAlreadyExistException(tablePath);
         }
 
-        commitTableChange(path, table);
+        uncheck(() -> new SchemaManager(path).commitNewVersion(table));
     }
 
     @Override
-    public void alterTable(ObjectPath tablePath, UpdateSchema newTable, boolean ignoreIfNotExists)
+    public void alterTable(
+            ObjectPath tablePath, List<SchemaChange> changes, boolean ignoreIfNotExists)
             throws TableNotExistException {
-        Path path = getTableLocation(tablePath);
-        if (!tableExists(path)) {
-            if (ignoreIfNotExists) {
-                return;
-            }
-
+        if (!tableExists(tablePath)) {
             throw new TableNotExistException(tablePath);
         }
-
-        commitTableChange(path, newTable);
+        uncheck(() -> new SchemaManager(getTableLocation(tablePath)).commitChanges(changes));
     }
 
     private static <T> T uncheck(Callable<T> callable) {
@@ -194,10 +190,6 @@ public class FileSystemCatalog extends AbstractCatalog {
         return name.substring(0, name.length() - DB_SUFFIX.length());
     }
 
-    private void commitTableChange(Path tablePath, UpdateSchema table) {
-        uncheck(() -> new SchemaManager(tablePath).commitNewVersion(table));
-    }
-
     @Override
     public void close() throws Exception {}
 
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/Lock.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/Lock.java
index 9f28f833..9f28769c 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/Lock.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/Lock.java
@@ -18,11 +18,56 @@
 
 package org.apache.flink.table.store.file.operation;
 
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.store.file.catalog.CatalogLock;
+
+import javax.annotation.Nullable;
+
 import java.util.concurrent.Callable;
 
 /** An interface that allows file store to use global lock to some transaction-related things. */
-public interface Lock {
+public interface Lock extends AutoCloseable {
 
     /** Run with lock. */
     <T> T runWithLock(Callable<T> callable) throws Exception;
+
+    @Nullable
+    static Lock fromCatalog(CatalogLock.Factory lockFactory, ObjectPath tablePath) {
+        if (lockFactory == null) {
+            return null;
+        }
+
+        return fromCatalog(lockFactory.create(), tablePath);
+    }
+
+    @Nullable
+    static Lock fromCatalog(CatalogLock lock, ObjectPath tablePath) {
+        if (lock == null) {
+            return null;
+        }
+        return new CatalogLockImpl(lock, tablePath);
+    }
+
+    /** A {@link Lock} to wrap {@link CatalogLock}. */
+    class CatalogLockImpl implements Lock {
+
+        private final CatalogLock catalogLock;
+        private final ObjectPath tablePath;
+
+        private CatalogLockImpl(CatalogLock catalogLock, ObjectPath tablePath) {
+            this.catalogLock = catalogLock;
+            this.tablePath = tablePath;
+        }
+
+        @Override
+        public <T> T runWithLock(Callable<T> callable) throws Exception {
+            return catalogLock.runWithLock(
+                    tablePath.getDatabaseName(), tablePath.getObjectName(), callable);
+        }
+
+        @Override
+        public void close() throws Exception {
+            this.catalogLock.close();
+        }
+    }
 }
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/SchemaChange.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/SchemaChange.java
new file mode 100644
index 00000000..58a3a6c6
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/SchemaChange.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.schema;
+
+import org.apache.flink.table.types.logical.LogicalType;
+
+import javax.annotation.Nullable;
+
+import java.util.Objects;
+
+/** Schema change to table. */
+public interface SchemaChange {
+
+    // TODO more change support like updateColumnNullability and updateColumnComment.
+
+    static SchemaChange setOption(String key, String value) {
+        return new SetOption(key, value);
+    }
+
+    static SchemaChange removeOption(String key) {
+        return new RemoveOption(key);
+    }
+
+    static SchemaChange addColumn(
+            String fieldName, LogicalType logicalType, boolean isNullable, String comment) {
+        return new AddColumn(fieldName, logicalType, isNullable, comment);
+    }
+
+    static SchemaChange updateColumnType(String fieldName, LogicalType newLogicalType) {
+        return new UpdateColumnType(fieldName, newLogicalType);
+    }
+
+    /** A SchemaChange to set a table option. */
+    final class SetOption implements SchemaChange {
+        private final String key;
+        private final String value;
+
+        private SetOption(String key, String value) {
+            this.key = key;
+            this.value = value;
+        }
+
+        public String key() {
+            return key;
+        }
+
+        public String value() {
+            return value;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            SetOption that = (SetOption) o;
+            return key.equals(that.key) && value.equals(that.value);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(key, value);
+        }
+    }
+
+    /** A SchemaChange to remove a table option. */
+    final class RemoveOption implements SchemaChange {
+        private final String key;
+
+        private RemoveOption(String key) {
+            this.key = key;
+        }
+
+        public String key() {
+            return key;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            RemoveOption that = (RemoveOption) o;
+            return key.equals(that.key);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(key);
+        }
+    }
+
+    /** A SchemaChange to add a field. */
+    final class AddColumn implements SchemaChange {
+        private final String fieldName;
+        private final LogicalType logicalType;
+        private final boolean isNullable;
+        private final String description;
+
+        private AddColumn(
+                String fieldName, LogicalType logicalType, boolean isNullable, String description) {
+            this.fieldName = fieldName;
+            this.logicalType = logicalType;
+            this.isNullable = isNullable;
+            this.description = description;
+        }
+
+        public String fieldName() {
+            return fieldName;
+        }
+
+        public LogicalType logicalType() {
+            return logicalType;
+        }
+
+        public boolean isNullable() {
+            return isNullable;
+        }
+
+        @Nullable
+        public String description() {
+            return description;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            AddColumn addColumn = (AddColumn) o;
+            return isNullable == addColumn.isNullable
+                    && Objects.equals(fieldName, addColumn.fieldName)
+                    && logicalType.equals(addColumn.logicalType)
+                    && Objects.equals(description, addColumn.description);
+        }
+
+        @Override
+        public int hashCode() {
+            int result = Objects.hash(logicalType, isNullable, description);
+            result = 31 * result + Objects.hashCode(fieldName);
+            return result;
+        }
+    }
+
+    /** A SchemaChange to update the field type. */
+    final class UpdateColumnType implements SchemaChange {
+        private final String fieldName;
+        private final LogicalType newLogicalType;
+
+        private UpdateColumnType(String fieldName, LogicalType newLogicalType) {
+            this.fieldName = fieldName;
+            this.newLogicalType = newLogicalType;
+        }
+
+        public String fieldName() {
+            return fieldName;
+        }
+
+        public LogicalType newLogicalType() {
+            return newLogicalType;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            UpdateColumnType that = (UpdateColumnType) o;
+            return Objects.equals(fieldName, that.fieldName)
+                    && newLogicalType.equals(that.newLogicalType);
+        }
+
+        @Override
+        public int hashCode() {
+            int result = Objects.hash(newLogicalType);
+            result = 31 * result + Objects.hashCode(fieldName);
+            return result;
+        }
+    }
+}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/SchemaManager.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/SchemaManager.java
index ad4a13c5..b5a0c775 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/SchemaManager.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/SchemaManager.java
@@ -21,19 +21,28 @@ package org.apache.flink.table.store.file.schema;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.table.store.file.operation.Lock;
+import org.apache.flink.table.store.file.schema.SchemaChange.AddColumn;
+import org.apache.flink.table.store.file.schema.SchemaChange.RemoveOption;
+import org.apache.flink.table.store.file.schema.SchemaChange.SetOption;
+import org.apache.flink.table.store.file.schema.SchemaChange.UpdateColumnType;
 import org.apache.flink.table.store.file.utils.AtomicFileWriter;
 import org.apache.flink.table.store.file.utils.FileUtils;
 import org.apache.flink.table.store.file.utils.JsonSerdeUtil;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.util.Preconditions;
 
+import javax.annotation.Nullable;
+
 import java.io.IOException;
 import java.io.Serializable;
 import java.io.UncheckedIOException;
+import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.Callable;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 
 import static org.apache.flink.table.store.file.utils.FileUtils.listVersionedFiles;
@@ -45,10 +54,17 @@ public class SchemaManager implements Serializable {
 
     private final Path tableRoot;
 
+    @Nullable private transient Lock lock;
+
     public SchemaManager(Path tableRoot) {
         this.tableRoot = tableRoot;
     }
 
+    public SchemaManager withLock(@Nullable Lock lock) {
+        this.lock = lock;
+        return this;
+    }
+
     /** @return latest schema. */
     public Optional<TableSchema> latest() {
         try {
@@ -77,11 +93,6 @@ public class SchemaManager implements Serializable {
 
     /** Create a new schema from {@link UpdateSchema}. */
     public TableSchema commitNewVersion(UpdateSchema updateSchema) throws Exception {
-        return commitNewVersion(Callable::call, updateSchema);
-    }
-
-    /** Create a new schema from {@link UpdateSchema}. */
-    public TableSchema commitNewVersion(Lock lock, UpdateSchema updateSchema) throws Exception {
         RowType rowType = updateSchema.rowType();
         List<String> partitionKeys = updateSchema.partitionKeys();
         List<String> primaryKeys = updateSchema.primaryKeys();
@@ -119,7 +130,7 @@ public class SchemaManager implements Serializable {
                 id = 0;
             }
 
-            TableSchema tableSchema =
+            TableSchema newSchema =
                     new TableSchema(
                             id,
                             fields,
@@ -129,25 +140,105 @@ public class SchemaManager implements Serializable {
                             options,
                             updateSchema.comment());
 
-            Path schemaPath = toSchemaPath(id);
+            boolean success = commit(newSchema);
+            if (success) {
+                return newSchema;
+            }
+        }
+    }
 
-            FileSystem fs = schemaPath.getFileSystem();
-            boolean success =
-                    lock.runWithLock(
-                            () -> {
-                                if (fs.exists(schemaPath)) {
-                                    return false;
-                                }
+    /** Create {@link SchemaChange}s. */
+    public TableSchema commitChanges(List<SchemaChange> changes) throws Exception {
+        while (true) {
+            TableSchema schema =
+                    latest().orElseThrow(
+                                    () -> new RuntimeException("Table not exists: " + tableRoot));
+            Map<String, String> newOptions = new HashMap<>(schema.options());
+            List<DataField> newFields = new ArrayList<>(schema.fields());
+            AtomicInteger highestFieldId = new AtomicInteger(schema.highestFieldId());
+            for (SchemaChange change : changes) {
+                if (change instanceof SetOption) {
+                    SetOption setOption = (SetOption) change;
+                    newOptions.put(setOption.key(), setOption.value());
+                } else if (change instanceof RemoveOption) {
+                    newOptions.remove(((RemoveOption) change).key());
+                } else if (change instanceof AddColumn) {
+                    AddColumn addColumn = (AddColumn) change;
+                    int id = highestFieldId.incrementAndGet();
+                    DataType dataType =
+                            TableSchema.toDataType(addColumn.logicalType(), highestFieldId);
+                    newFields.add(
+                            new DataField(
+                                    id, addColumn.fieldName(), dataType, addColumn.description()));
+                } else if (change instanceof UpdateColumnType) {
+                    UpdateColumnType update = (UpdateColumnType) change;
+                    boolean found = false;
+                    for (int i = 0; i < newFields.size(); i++) {
+                        DataField field = newFields.get(i);
+                        if (field.name().equals(update.fieldName())) {
+                            AtomicInteger dummyId = new AtomicInteger(0);
+                            DataType newType =
+                                    TableSchema.toDataType(update.newLogicalType(), dummyId);
+                            if (dummyId.get() != 0) {
+                                throw new RuntimeException(
+                                        String.format(
+                                                "Update column to nested row type '%s' is not supported.",
+                                                update.newLogicalType()));
+                            }
+                            newFields.set(
+                                    i,
+                                    new DataField(
+                                            field.id(),
+                                            field.name(),
+                                            newType,
+                                            field.description()));
+                            found = true;
+                            break;
+                        }
+                    }
+
+                    if (!found) {
+                        throw new RuntimeException("Can not find column: " + update.fieldName());
+                    }
+                } else {
+                    throw new UnsupportedOperationException(
+                            "Unsupported change: " + change.getClass());
+                }
+            }
 
-                                return AtomicFileWriter.writeFileUtf8(
-                                        schemaPath, tableSchema.toString());
-                            });
+            TableSchema newSchema =
+                    new TableSchema(
+                            schema.id() + 1,
+                            newFields,
+                            highestFieldId.get(),
+                            schema.partitionKeys(),
+                            schema.primaryKeys(),
+                            newOptions,
+                            schema.comment());
+
+            boolean success = commit(newSchema);
             if (success) {
-                return tableSchema;
+                return newSchema;
             }
         }
     }
 
+    private boolean commit(TableSchema newSchema) throws Exception {
+        Path schemaPath = toSchemaPath(newSchema.id());
+        FileSystem fs = schemaPath.getFileSystem();
+        Callable<Boolean> callable =
+                () -> {
+                    if (fs.exists(schemaPath)) {
+                        return false;
+                    }
+                    return AtomicFileWriter.writeFileUtf8(schemaPath, newSchema.toString());
+                };
+        if (lock == null) {
+            return callable.call();
+        }
+        return lock.runWithLock(callable);
+    }
+
     /** Read schema for schema id. */
     public TableSchema schema(long id) {
         try {
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/TableSchema.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/TableSchema.java
index bf7d0068..dbdeccf1 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/TableSchema.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/TableSchema.java
@@ -249,7 +249,7 @@ public class TableSchema implements Serializable {
         return ((RowDataType) toDataType(rowType, new AtomicInteger(-1))).fields();
     }
 
-    private static DataType toDataType(LogicalType type, AtomicInteger currentHighestFieldId) {
+    public static DataType toDataType(LogicalType type, AtomicInteger currentHighestFieldId) {
         if (type instanceof ArrayType) {
             DataType element =
                     toDataType(((ArrayType) type).getElementType(), currentHighestFieldId);
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/TableCommit.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/TableCommit.java
index 7fe87165..78c318cc 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/TableCommit.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/TableCommit.java
@@ -34,12 +34,13 @@ import java.util.Map;
  * An abstraction layer above {@link FileStoreCommit} and {@link FileStoreExpire} to provide
  * snapshot commit and expiration.
  */
-public class TableCommit {
+public class TableCommit implements AutoCloseable {
 
     private final FileStoreCommit commit;
     private final FileStoreExpire expire;
 
     @Nullable private Map<String, String> overwritePartition = null;
+    @Nullable private Lock lock;
 
     public TableCommit(FileStoreCommit commit, FileStoreExpire expire) {
         this.commit = commit;
@@ -54,6 +55,7 @@ public class TableCommit {
     public TableCommit withLock(Lock lock) {
         commit.withLock(lock);
         expire.withLock(lock);
+        this.lock = lock;
         return this;
     }
 
@@ -84,4 +86,11 @@ public class TableCommit {
         }
         expire.expire();
     }
+
+    @Override
+    public void close() throws Exception {
+        if (lock != null) {
+            lock.close();
+        }
+    }
 }
diff --git a/flink-table-store-hive/flink-table-store-hive-catalog/src/main/java/org/apache/flink/table/store/hive/HiveCatalog.java b/flink-table-store-hive/flink-table-store-hive-catalog/src/main/java/org/apache/flink/table/store/hive/HiveCatalog.java
index f6a49057..b649c872 100644
--- a/flink-table-store-hive/flink-table-store-hive-catalog/src/main/java/org/apache/flink/table/store/hive/HiveCatalog.java
+++ b/flink-table-store-hive/flink-table-store-hive-catalog/src/main/java/org/apache/flink/table/store/hive/HiveCatalog.java
@@ -22,7 +22,9 @@ import org.apache.flink.core.fs.Path;
 import org.apache.flink.table.catalog.ObjectPath;
 import org.apache.flink.table.store.file.catalog.AbstractCatalog;
 import org.apache.flink.table.store.file.catalog.CatalogLock;
+import org.apache.flink.table.store.file.operation.Lock;
 import org.apache.flink.table.store.file.schema.DataField;
+import org.apache.flink.table.store.file.schema.SchemaChange;
 import org.apache.flink.table.store.file.schema.SchemaManager;
 import org.apache.flink.table.store.file.schema.TableSchema;
 import org.apache.flink.table.store.file.schema.UpdateSchema;
@@ -53,6 +55,8 @@ import java.util.Optional;
 import java.util.stream.Collectors;
 
 import static org.apache.flink.table.store.CatalogOptions.LOCK_ENABLED;
+import static org.apache.flink.table.store.hive.HiveCatalogLock.acquireTimeout;
+import static org.apache.flink.table.store.hive.HiveCatalogLock.checkMaxSleep;
 
 /** A catalog implementation for Hive. */
 public class HiveCatalog extends AbstractCatalog {
@@ -78,14 +82,16 @@ public class HiveCatalog extends AbstractCatalog {
 
     @Override
     public Optional<CatalogLock.Factory> lockFactory() {
-        boolean lockEnabled =
-                Boolean.parseBoolean(
-                        hiveConf.get(LOCK_ENABLED.key(), LOCK_ENABLED.defaultValue().toString()));
-        return lockEnabled
+        return lockEnabled()
                 ? Optional.of(HiveCatalogLock.createFactory(hiveConf))
                 : Optional.empty();
     }
 
+    private boolean lockEnabled() {
+        return Boolean.parseBoolean(
+                hiveConf.get(LOCK_ENABLED.key(), LOCK_ENABLED.defaultValue().toString()));
+    }
+
     @Override
     public List<String> listDatabases() {
         try {
@@ -211,7 +217,16 @@ public class HiveCatalog extends AbstractCatalog {
 
         // first commit changes to underlying files
         // if changes on Hive fails there is no harm to perform the same changes to files again
-        TableSchema schema = commitToUnderlyingFiles(tablePath, updateSchema);
+        TableSchema schema;
+        try {
+            schema = schemaManager(tablePath).commitNewVersion(updateSchema);
+        } catch (Exception e) {
+            throw new RuntimeException(
+                    "Failed to commit changes of table "
+                            + tablePath.getFullName()
+                            + " to underlying files",
+                    e);
+        }
         Table table = newHmsTable(tablePath);
         updateHmsTable(table, tablePath, schema);
         try {
@@ -223,7 +238,7 @@ public class HiveCatalog extends AbstractCatalog {
 
     @Override
     public void alterTable(
-            ObjectPath tablePath, UpdateSchema updateSchema, boolean ignoreIfNotExists)
+            ObjectPath tablePath, List<SchemaChange> changes, boolean ignoreIfNotExists)
             throws TableNotExistException {
         if (isTableStoreTableNotExisted(tablePath)) {
             if (ignoreIfNotExists) {
@@ -233,20 +248,21 @@ public class HiveCatalog extends AbstractCatalog {
             }
         }
 
-        // first commit changes to underlying files
-        // if changes on Hive fails there is no harm to perform the same changes to files again
-        TableSchema schema = commitToUnderlyingFiles(tablePath, updateSchema);
         try {
+            // first commit changes to underlying files
+            TableSchema schema = schemaManager(tablePath).commitChanges(changes);
+
+            // sync to hive hms
             Table table = client.getTable(tablePath.getDatabaseName(), tablePath.getObjectName());
             updateHmsTable(table, tablePath, schema);
             client.alter_table(tablePath.getDatabaseName(), tablePath.getObjectName(), table);
-        } catch (TException e) {
-            throw new RuntimeException("Failed to alter table " + tablePath.getFullName(), e);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
         }
     }
 
     @Override
-    public void close() throws Exception {
+    public void close() {
         client.close();
     }
 
@@ -343,17 +359,18 @@ public class HiveCatalog extends AbstractCatalog {
         return false;
     }
 
-    private TableSchema commitToUnderlyingFiles(ObjectPath tablePath, UpdateSchema schema) {
-        Path path = getTableLocation(tablePath);
-        try {
-            return new SchemaManager(path).commitNewVersion(schema);
-        } catch (Exception e) {
-            throw new RuntimeException(
-                    "Failed to commit changes of table "
-                            + tablePath.getFullName()
-                            + " to underlying files",
-                    e);
+    private SchemaManager schemaManager(ObjectPath tablePath) {
+        return new SchemaManager(getTableLocation(tablePath)).withLock(lock(tablePath));
+    }
+
+    private Lock lock(ObjectPath tablePath) {
+        if (!lockEnabled()) {
+            return null;
         }
+
+        HiveCatalogLock lock =
+                new HiveCatalogLock(client, checkMaxSleep(hiveConf), acquireTimeout(hiveConf));
+        return Lock.fromCatalog(lock, tablePath);
     }
 
     static IMetaStoreClient createClient(HiveConf hiveConf) {
diff --git a/flink-table-store-hive/flink-table-store-hive-catalog/src/main/java/org/apache/flink/table/store/hive/HiveCatalogLock.java b/flink-table-store-hive/flink-table-store-hive-catalog/src/main/java/org/apache/flink/table/store/hive/HiveCatalogLock.java
index 7dd8232c..64352278 100644
--- a/flink-table-store-hive/flink-table-store-hive-catalog/src/main/java/org/apache/flink/table/store/hive/HiveCatalogLock.java
+++ b/flink-table-store-hive/flink-table-store-hive-catalog/src/main/java/org/apache/flink/table/store/hive/HiveCatalogLock.java
@@ -129,22 +129,24 @@ public class HiveCatalogLock implements CatalogLock {
         @Override
         public CatalogLock create() {
             HiveConf conf = hiveConf.conf();
-            long checkMaxSleep =
-                    TimeUtils.parseDuration(
-                                    conf.get(
-                                            LOCK_CHECK_MAX_SLEEP.key(),
-                                            TimeUtils.getStringInMillis(
-                                                    LOCK_CHECK_MAX_SLEEP.defaultValue())))
-                            .toMillis();
-            long acquireTimeout =
-                    TimeUtils.parseDuration(
-                                    conf.get(
-                                            LOCK_ACQUIRE_TIMEOUT.key(),
-                                            TimeUtils.getStringInMillis(
-                                                    LOCK_ACQUIRE_TIMEOUT.defaultValue())))
-                            .toMillis();
             return new HiveCatalogLock(
-                    HiveCatalog.createClient(conf), checkMaxSleep, acquireTimeout);
+                    HiveCatalog.createClient(conf), checkMaxSleep(conf), acquireTimeout(conf));
         }
     }
+
+    public static long checkMaxSleep(HiveConf conf) {
+        return TimeUtils.parseDuration(
+                        conf.get(
+                                LOCK_CHECK_MAX_SLEEP.key(),
+                                TimeUtils.getStringInMillis(LOCK_CHECK_MAX_SLEEP.defaultValue())))
+                .toMillis();
+    }
+
+    public static long acquireTimeout(HiveConf conf) {
+        return TimeUtils.parseDuration(
+                        conf.get(
+                                LOCK_ACQUIRE_TIMEOUT.key(),
+                                TimeUtils.getStringInMillis(LOCK_ACQUIRE_TIMEOUT.defaultValue())))
+                .toMillis();
+    }
 }
diff --git a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkCatalog.java b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkCatalog.java
index ae8e2f51..b53cb33f 100644
--- a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkCatalog.java
+++ b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkCatalog.java
@@ -22,6 +22,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.table.catalog.ObjectPath;
 import org.apache.flink.table.store.file.catalog.Catalog;
 import org.apache.flink.table.store.file.catalog.CatalogFactory;
+import org.apache.flink.table.store.file.schema.SchemaChange;
 import org.apache.flink.util.Preconditions;
 
 import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException;
@@ -32,6 +33,10 @@ import org.apache.spark.sql.connector.catalog.SupportsNamespaces;
 import org.apache.spark.sql.connector.catalog.Table;
 import org.apache.spark.sql.connector.catalog.TableCatalog;
 import org.apache.spark.sql.connector.catalog.TableChange;
+import org.apache.spark.sql.connector.catalog.TableChange.AddColumn;
+import org.apache.spark.sql.connector.catalog.TableChange.RemoveProperty;
+import org.apache.spark.sql.connector.catalog.TableChange.SetProperty;
+import org.apache.spark.sql.connector.catalog.TableChange.UpdateColumnType;
 import org.apache.spark.sql.connector.expressions.Transform;
 import org.apache.spark.sql.types.StructType;
 import org.apache.spark.sql.util.CaseInsensitiveStringMap;
@@ -40,6 +45,9 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.table.store.spark.SparkTypeUtils.toFlinkType;
 
 /** Spark {@link TableCatalog} for table store. */
 public class SparkCatalog implements TableCatalog, SupportsNamespaces {
@@ -101,28 +109,70 @@ public class SparkCatalog implements TableCatalog, SupportsNamespaces {
     }
 
     @Override
-    public Table loadTable(Identifier ident) throws NoSuchTableException {
-        if (!isValidateNamespace(ident.namespace())) {
+    public SparkTable loadTable(Identifier ident) throws NoSuchTableException {
+        try {
+            return new SparkTable(catalog.getTable(objectPath(ident)));
+        } catch (Catalog.TableNotExistException e) {
             throw new NoSuchTableException(ident);
         }
+    }
 
+    @Override
+    public Table alterTable(Identifier ident, TableChange... changes) throws NoSuchTableException {
+        List<SchemaChange> schemaChanges =
+                Arrays.stream(changes).map(this::toSchemaChange).collect(Collectors.toList());
         try {
-            return new SparkTable(
-                    catalog.getTable(new ObjectPath(ident.namespace()[0], ident.name())));
+            catalog.alterTable(objectPath(ident), schemaChanges, false);
+            return loadTable(ident);
         } catch (Catalog.TableNotExistException e) {
             throw new NoSuchTableException(ident);
         }
     }
 
-    @Override
-    public void renameTable(Identifier oldIdent, Identifier newIdent) {
-        throw new UnsupportedOperationException();
+    private SchemaChange toSchemaChange(TableChange change) {
+        if (change instanceof SetProperty) {
+            SetProperty set = (SetProperty) change;
+            return SchemaChange.setOption(set.property(), set.value());
+        } else if (change instanceof RemoveProperty) {
+            return SchemaChange.removeOption(((RemoveProperty) change).property());
+        } else if (change instanceof AddColumn) {
+            AddColumn add = (AddColumn) change;
+            validateAlterNestedField(add.fieldNames());
+            return SchemaChange.addColumn(
+                    add.fieldNames()[0],
+                    toFlinkType(add.dataType()),
+                    add.isNullable(),
+                    add.comment());
+        } else if (change instanceof UpdateColumnType) {
+            UpdateColumnType update = (UpdateColumnType) change;
+            validateAlterNestedField(update.fieldNames());
+            return SchemaChange.updateColumnType(
+                    update.fieldNames()[0], toFlinkType(update.newDataType()));
+        } else {
+            throw new UnsupportedOperationException(
+                    "Change is not supported: " + change.getClass());
+        }
+    }
+
+    private void validateAlterNestedField(String[] fieldNames) {
+        if (fieldNames.length > 1) {
+            throw new UnsupportedOperationException(
+                    "Alter nested column is not supported: " + Arrays.toString(fieldNames));
+        }
     }
 
     private boolean isValidateNamespace(String[] namespace) {
         return namespace.length == 1;
     }
 
+    private ObjectPath objectPath(Identifier ident) throws NoSuchTableException {
+        if (!isValidateNamespace(ident.namespace())) {
+            throw new NoSuchTableException(ident);
+        }
+
+        return new ObjectPath(ident.namespace()[0], ident.name());
+    }
+
     // --------------------- unsupported methods ----------------------------
 
     @Override
@@ -150,12 +200,12 @@ public class SparkCatalog implements TableCatalog, SupportsNamespaces {
     }
 
     @Override
-    public Table alterTable(Identifier ident, TableChange... changes) {
-        throw new UnsupportedOperationException("Alter table in Spark is not supported yet.");
+    public boolean dropTable(Identifier ident) {
+        throw new UnsupportedOperationException("Drop table in Spark is not supported yet.");
     }
 
     @Override
-    public boolean dropTable(Identifier ident) {
-        throw new UnsupportedOperationException("Drop table in Spark is not supported yet.");
+    public void renameTable(Identifier oldIdent, Identifier newIdent) {
+        throw new UnsupportedOperationException();
     }
 }
diff --git a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkTypeUtils.java b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkTypeUtils.java
index 65481c21..db607fa3 100644
--- a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkTypeUtils.java
+++ b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkTypeUtils.java
@@ -33,6 +33,7 @@ import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.MapType;
 import org.apache.flink.table.types.logical.MultisetType;
 import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.RowType.RowField;
 import org.apache.flink.table.types.logical.SmallIntType;
 import org.apache.flink.table.types.logical.TimestampType;
 import org.apache.flink.table.types.logical.TinyIntType;
@@ -42,11 +43,13 @@ import org.apache.flink.table.types.logical.utils.LogicalTypeDefaultVisitor;
 
 import org.apache.spark.sql.types.DataType;
 import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.LongType;
 import org.apache.spark.sql.types.StructField;
 import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.types.UserDefinedType;
 
+import java.util.ArrayList;
 import java.util.List;
-import java.util.stream.Collectors;
 
 /** Utils for spark {@link DataType}. */
 public class SparkTypeUtils {
@@ -58,12 +61,16 @@ public class SparkTypeUtils {
     }
 
     public static DataType fromFlinkType(LogicalType type) {
-        return type.accept(FlinkToSparkTypeVistor.INSTANCE);
+        return type.accept(FlinkToSparkTypeVisitor.INSTANCE);
     }
 
-    private static class FlinkToSparkTypeVistor extends LogicalTypeDefaultVisitor<DataType> {
+    public static LogicalType toFlinkType(DataType dataType) {
+        return SparkToFlinkTypeVisitor.visit(dataType);
+    }
+
+    private static class FlinkToSparkTypeVisitor extends LogicalTypeDefaultVisitor<DataType> {
 
-        private static final FlinkToSparkTypeVistor INSTANCE = new FlinkToSparkTypeVistor();
+        private static final FlinkToSparkTypeVisitor INSTANCE = new FlinkToSparkTypeVisitor();
 
         @Override
         public DataType visit(CharType charType) {
@@ -162,15 +169,17 @@ public class SparkTypeUtils {
 
         @Override
         public DataType visit(RowType rowType) {
-            List<StructField> fields =
-                    rowType.getFields().stream()
-                            .map(
-                                    field ->
-                                            DataTypes.createStructField(
-                                                    field.getName(),
-                                                    field.getType().accept(this),
-                                                    field.getType().isNullable()))
-                            .collect(Collectors.toList());
+            List<StructField> fields = new ArrayList<>(rowType.getFieldCount());
+            for (RowField field : rowType.getFields()) {
+                StructField structField =
+                        DataTypes.createStructField(
+                                field.getName(),
+                                field.getType().accept(this),
+                                field.getType().isNullable());
+                structField =
+                        field.getDescription().map(structField::withComment).orElse(structField);
+                fields.add(structField);
+            }
             return DataTypes.createStructType(fields);
         }
 
@@ -179,4 +188,101 @@ public class SparkTypeUtils {
             throw new UnsupportedOperationException("Unsupported type: " + logicalType);
         }
     }
+
+    private static class SparkToFlinkTypeVisitor {
+
+        static LogicalType visit(DataType type) {
+            return visit(type, new SparkToFlinkTypeVisitor());
+        }
+
+        static LogicalType visit(DataType type, SparkToFlinkTypeVisitor visitor) {
+            if (type instanceof StructType) {
+                StructField[] fields = ((StructType) type).fields();
+                List<LogicalType> fieldResults = new ArrayList<>(fields.length);
+
+                for (StructField field : fields) {
+                    fieldResults.add(visit(field.dataType(), visitor));
+                }
+
+                return visitor.struct((StructType) type, fieldResults);
+
+            } else if (type instanceof org.apache.spark.sql.types.MapType) {
+                return visitor.map(
+                        (org.apache.spark.sql.types.MapType) type,
+                        visit(((org.apache.spark.sql.types.MapType) type).keyType(), visitor),
+                        visit(((org.apache.spark.sql.types.MapType) type).valueType(), visitor));
+
+            } else if (type instanceof org.apache.spark.sql.types.ArrayType) {
+                return visitor.array(
+                        (org.apache.spark.sql.types.ArrayType) type,
+                        visit(
+                                ((org.apache.spark.sql.types.ArrayType) type).elementType(),
+                                visitor));
+
+            } else if (type instanceof UserDefinedType) {
+                throw new UnsupportedOperationException("User-defined types are not supported");
+
+            } else {
+                return visitor.atomic(type);
+            }
+        }
+
+        public LogicalType struct(StructType struct, List<LogicalType> fieldResults) {
+            StructField[] fields = struct.fields();
+            List<RowField> newFields = new ArrayList<>(fields.length);
+            for (int i = 0; i < fields.length; i += 1) {
+                StructField field = fields[i];
+                LogicalType fieldType = fieldResults.get(i).copy(field.nullable());
+                String comment = field.getComment().getOrElse(() -> null);
+                newFields.add(new RowField(field.name(), fieldType, comment));
+            }
+
+            return new RowType(newFields);
+        }
+
+        public LogicalType array(
+                org.apache.spark.sql.types.ArrayType array, LogicalType elementResult) {
+            return new ArrayType(elementResult.copy(array.containsNull()));
+        }
+
+        public LogicalType map(
+                org.apache.spark.sql.types.MapType map,
+                LogicalType keyResult,
+                LogicalType valueResult) {
+            return new MapType(keyResult.copy(false), valueResult.copy(map.valueContainsNull()));
+        }
+
+        public LogicalType atomic(DataType atomic) {
+            if (atomic instanceof org.apache.spark.sql.types.BooleanType) {
+                return new BooleanType();
+            } else if (atomic instanceof org.apache.spark.sql.types.ByteType) {
+                return new TinyIntType();
+            } else if (atomic instanceof org.apache.spark.sql.types.ShortType) {
+                return new SmallIntType();
+            } else if (atomic instanceof org.apache.spark.sql.types.IntegerType) {
+                return new IntType();
+            } else if (atomic instanceof LongType) {
+                return new BigIntType();
+            } else if (atomic instanceof org.apache.spark.sql.types.FloatType) {
+                return new FloatType();
+            } else if (atomic instanceof org.apache.spark.sql.types.DoubleType) {
+                return new DoubleType();
+            } else if (atomic instanceof org.apache.spark.sql.types.StringType) {
+                return new VarCharType(VarCharType.MAX_LENGTH);
+            } else if (atomic instanceof org.apache.spark.sql.types.DateType) {
+                return new DateType();
+            } else if (atomic instanceof org.apache.spark.sql.types.TimestampType) {
+                return new TimestampType();
+            } else if (atomic instanceof org.apache.spark.sql.types.DecimalType) {
+                return new DecimalType(
+                        ((org.apache.spark.sql.types.DecimalType) atomic).precision(),
+                        ((org.apache.spark.sql.types.DecimalType) atomic).scale());
+            } else if (atomic instanceof org.apache.spark.sql.types.BinaryType) {
+                return new VarBinaryType(VarBinaryType.MAX_LENGTH);
+            }
+
+            throw new UnsupportedOperationException(
+                    "Not a supported type: " + atomic.catalogString());
+        }
+    }
 }
diff --git a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkInternalRowTest.java b/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkInternalRowTest.java
index f1311dd0..ba8b5ddb 100644
--- a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkInternalRowTest.java
+++ b/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkInternalRowTest.java
@@ -28,7 +28,6 @@ import org.junit.jupiter.api.Test;
 
 import java.math.BigDecimal;
 import java.nio.charset.StandardCharsets;
-import java.time.Instant;
 import java.time.LocalDate;
 import java.time.LocalDateTime;
 import java.util.AbstractMap;
@@ -57,24 +56,16 @@ public class SparkInternalRowTest {
                                 .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)),
                         new String[] {"v1", "v5"},
                         new Integer[] {10, 30},
-                        "char_v",
-                        "varchar_v",
                         true,
                         (byte) 22,
                         (short) 356,
                         23567222L,
-                        "binary_v".getBytes(StandardCharsets.UTF_8),
                         "varbinary_v".getBytes(StandardCharsets.UTF_8),
                         LocalDateTime.parse("2007-12-03T10:15:30"),
-                        Instant.parse("2007-12-03T10:15:30.00Z"),
                         LocalDate.parse("2022-05-02"),
                         BigDecimal.valueOf(0.21),
                         BigDecimal.valueOf(65782123123.01),
-                        BigDecimal.valueOf(62123123.5),
-                        Stream.of(
-                                        new AbstractMap.SimpleEntry<>("key1", 5),
-                                        new AbstractMap.SimpleEntry<>("key2", 2))
-                                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)));
+                        BigDecimal.valueOf(62123123.5));
 
         RowRowConverter flinkConverter =
                 RowRowConverter.create(TypeConversions.fromLogicalToDataType(ALL_TYPES));
@@ -96,21 +87,17 @@ public class SparkInternalRowTest {
                         + "\"locations\":{\"key1\":{\"posX\":1.2,\"posY\":2.3},\"key2\":{\"posX\":2.4,\"posY\":3.5}},"
                         + "\"strArray\":[\"v1\",\"v5\"],"
                         + "\"intArray\":[10,30],"
-                        + "\"char\":\"char_v\","
-                        + "\"varchar\":\"varchar_v\","
                         + "\"boolean\":true,"
                         + "\"tinyint\":22,"
                         + "\"smallint\":356,"
                         + "\"bigint\":23567222,"
-                        + "\"varbinary\":\"YmluYXJ5X3Y=\","
-                        + "\"binary\":\"dmFyYmluYXJ5X3Y=\","
-                        + "\"timestampWithoutZone\":\"2007-12-03 10:15:30\","
-                        + "\"timestampWithZone\":\"2007-12-03 10:15:30\","
+                        + "\"bytes\":\"dmFyYmluYXJ5X3Y=\","
+                        + "\"timestamp\":\"2007-12-03 10:15:30\","
                         + "\"date\":\"2022-05-02\","
                         + "\"decimal\":0.21,"
                         + "\"decimal2\":65782123123.01,"
-                        + "\"decimal3\":62123123.5,"
-                        + "\"multiset\":{\"key1\":5,\"key2\":2}}";
+                        + "\"decimal3\":62123123.5"
+                        + "}";
         assertThat(sparkRow.json()).isEqualTo(expected);
     }
 }
diff --git a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkReadITCase.java b/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkReadITCase.java
index 26567405..b7b2ffae 100644
--- a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkReadITCase.java
+++ b/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkReadITCase.java
@@ -21,6 +21,8 @@ package org.apache.flink.table.store.spark;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.store.file.schema.TableSchema;
+import org.apache.flink.table.store.table.FileStoreTableFactory;
 import org.apache.flink.table.types.logical.BigIntType;
 import org.apache.flink.table.types.logical.IntType;
 import org.apache.flink.table.types.logical.RowType;
@@ -39,6 +41,8 @@ import java.io.File;
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.List;
+import java.util.Map;
+import java.util.UUID;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
@@ -49,6 +53,8 @@ public class SparkReadITCase {
 
     private static SparkSession spark = null;
 
+    private static Path warehousePath = null;
+
     private static Path tablePath1;
 
     private static Path tablePath2;
@@ -57,7 +63,7 @@ public class SparkReadITCase {
     public static void startMetastoreAndSpark() throws Exception {
         warehouse = File.createTempFile("warehouse", null);
         assertThat(warehouse.delete()).isTrue();
-        Path warehousePath = new Path("file:" + warehouse);
+        warehousePath = new Path("file:" + warehouse);
         spark = SparkSession.builder().master("local[2]").getOrCreate();
         spark.conf().set("spark.sql.catalog.table_store", SparkCatalog.class.getName());
         spark.conf().set("spark.sql.catalog.table_store.warehouse", warehousePath.toString());
@@ -122,6 +128,56 @@ public class SparkReadITCase {
         innerTestFilterPushDown(spark.table("table_store.default.t2"));
     }
 
+    @Test
+    public void testSetAndRemoveOption() {
+        spark.sql("ALTER TABLE table_store.default.t1 SET TBLPROPERTIES('xyc' 'unknown1')");
+
+        Map<String, String> options = schema1().options();
+        assertThat(options).containsEntry("xyc", "unknown1");
+
+        spark.sql("ALTER TABLE table_store.default.t1 UNSET TBLPROPERTIES('xyc')");
+
+        options = schema1().options();
+        assertThat(options).doesNotContainKey("xyc");
+    }
+
+    @Test
+    public void testAddColumn() throws Exception {
+        Path tablePath = new Path(warehousePath, "default.db/" + UUID.randomUUID());
+        SimpleTableTestHelper testHelper1 = createTestHelper(tablePath);
+        testHelper1.write(GenericRowData.of(1, 2L, StringData.fromString("1")));
+        testHelper1.write(GenericRowData.of(5, 6L, StringData.fromString("3")));
+        testHelper1.commit();
+
+        spark.sql("ALTER TABLE table_store.default.testAddColumn ADD COLUMN d STRING");
+
+        Dataset<Row> table = spark.table("table_store.default.testAddColumn");
+        List<Row> results = table.collectAsList();
+        assertThat(results.toString()).isEqualTo("[[1,2,1,null], [5,6,3,null]]");
+
+        results = table.select("a", "c").collectAsList();
+        assertThat(results.toString()).isEqualTo("[[1,1], [5,3]]");
+
+        results = table.groupBy().sum("b").collectAsList();
+        assertThat(results.toString()).isEqualTo("[[8]]");
+    }
+
+    @Test
+    public void testAlterColumnType() throws Exception {
+        Path tablePath = new Path(warehousePath, "default.db/testAddColumn");
+        SimpleTableTestHelper testHelper1 = createTestHelper(tablePath);
+        testHelper1.write(GenericRowData.of(1, 2L, StringData.fromString("1")));
+        testHelper1.write(GenericRowData.of(5, 6L, StringData.fromString("3")));
+        testHelper1.commit();
+
+        spark.sql("ALTER TABLE table_store.default.testAddColumn ALTER COLUMN a TYPE BIGINT");
+        innerTestNormal(spark.table("table_store.default.testAddColumn"));
+    }
+
+    private TableSchema schema1() {
+        return FileStoreTableFactory.create(tablePath1).schema();
+    }
+
     private void innerTestNormal(Dataset<Row> dataset) {
         List<Row> results = dataset.collectAsList();
         assertThat(results.toString()).isEqualTo("[[1,2,1], [5,6,3]]");
diff --git a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkTypeTest.java b/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkTypeTest.java
index 12c8694a..abae4e2d 100644
--- a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkTypeTest.java
+++ b/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkTypeTest.java
@@ -22,8 +22,11 @@ import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.types.logical.RowType;
 
+import org.apache.spark.sql.types.StructType;
 import org.junit.jupiter.api.Test;
 
+import static org.apache.flink.table.store.spark.SparkTypeUtils.fromFlinkRowType;
+import static org.apache.flink.table.store.spark.SparkTypeUtils.toFlinkType;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Test for {@link SparkTypeUtils}. */
@@ -38,7 +41,7 @@ public class SparkTypeTest {
                             .field(
                                     "locations",
                                     DataTypes.MAP(
-                                            DataTypes.STRING(),
+                                            DataTypes.STRING().notNull(),
                                             DataTypes.ROW(
                                                     DataTypes.FIELD(
                                                             "posX",
@@ -50,24 +53,20 @@ public class SparkTypeTest {
                                                             "Y field"))))
                             .field("strArray", DataTypes.ARRAY(DataTypes.STRING()).nullable())
                             .field("intArray", DataTypes.ARRAY(DataTypes.INT()).nullable())
-                            .field("char", DataTypes.CHAR(10).notNull())
-                            .field("varchar", DataTypes.VARCHAR(10).notNull())
                             .field("boolean", DataTypes.BOOLEAN().nullable())
                             .field("tinyint", DataTypes.TINYINT())
                             .field("smallint", DataTypes.SMALLINT())
                             .field("bigint", DataTypes.BIGINT())
-                            .field("varbinary", DataTypes.VARBINARY(10))
-                            .field("binary", DataTypes.BINARY(10))
-                            .field("timestampWithoutZone", DataTypes.TIMESTAMP())
-                            .field("timestampWithZone", DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE())
+                            .field("bytes", DataTypes.BYTES())
+                            .field("timestamp", DataTypes.TIMESTAMP())
                             .field("date", DataTypes.DATE())
                             .field("decimal", DataTypes.DECIMAL(2, 2))
                             .field("decimal2", DataTypes.DECIMAL(38, 2))
                             .field("decimal3", DataTypes.DECIMAL(10, 1))
-                            .field("multiset", DataTypes.MULTISET(DataTypes.STRING().notNull()))
                             .build()
                             .toRowDataType()
-                            .getLogicalType();
+                            .getLogicalType()
+                            .copy(true);
 
     @Test
     public void testAllTypes() {
@@ -84,22 +83,20 @@ public class SparkTypeTest {
                         + ", "
                         + "StructField(strArray,ArrayType(StringType,true),true), "
                         + "StructField(intArray,ArrayType(IntegerType,true),true), "
-                        + "StructField(char,StringType,false), "
-                        + "StructField(varchar,StringType,false), "
                         + "StructField(boolean,BooleanType,true), "
                         + "StructField(tinyint,ByteType,true), "
                         + "StructField(smallint,ShortType,true), "
                         + "StructField(bigint,LongType,true), "
-                        + "StructField(varbinary,BinaryType,true), "
-                        + "StructField(binary,BinaryType,true), "
-                        + "StructField(timestampWithoutZone,TimestampType,true), "
-                        + "StructField(timestampWithZone,TimestampType,true), "
+                        + "StructField(bytes,BinaryType,true), "
+                        + "StructField(timestamp,TimestampType,true), "
                         + "StructField(date,DateType,true), "
                         + "StructField(decimal,DecimalType(2,2),true), "
                         + "StructField(decimal2,DecimalType(38,2),true), "
-                        + "StructField(decimal3,DecimalType(10,1),true), "
-                        + "StructField(multiset,MapType(StringType,IntegerType,false),true))";
+                        + "StructField(decimal3,DecimalType(10,1),true))";
 
-        assertThat(SparkTypeUtils.fromFlinkRowType(ALL_TYPES).toString()).isEqualTo(expected);
+        StructType sparkType = fromFlinkRowType(ALL_TYPES);
+        assertThat(sparkType.toString()).isEqualTo(expected);
+
+        assertThat(toFlinkType(sparkType)).isEqualTo(ALL_TYPES);
     }
 }