Posted to commits@flink.apache.org by lz...@apache.org on 2022/07/19 03:44:01 UTC

[flink-table-store] branch master updated: [FLINK-28533] SchemaChange supports updateColumnNullability and updateColumnComment

This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/master by this push:
     new d9b3eeef [FLINK-28533] SchemaChange supports updateColumnNullability and updateColumnComment
d9b3eeef is described below

commit d9b3eeef306efa10a8c4f1a5d09be11a33849ead
Author: Jane Chan <55...@users.noreply.github.com>
AuthorDate: Tue Jul 19 11:43:58 2022 +0800

    [FLINK-28533] SchemaChange supports updateColumnNullability and updateColumnComment
    
    This closes #215
---
 .../table/store/file/schema/ArrayDataType.java     |   5 +
 .../table/store/file/schema/AtomicDataType.java    |   5 +
 .../flink/table/store/file/schema/DataType.java    |   8 +
 .../flink/table/store/file/schema/MapDataType.java |   5 +
 .../table/store/file/schema/MultisetDataType.java  |   5 +
 .../flink/table/store/file/schema/RowDataType.java |   5 +
 .../table/store/file/schema/SchemaChange.java      |  89 ++++++-
 .../table/store/file/schema/SchemaManager.java     |  99 ++++++--
 .../flink/table/store/file/schema/TableSchema.java |   2 +-
 .../flink/table/store/spark/SparkCatalog.java      |   8 +
 .../flink/table/store/spark/SparkReadITCase.java   | 281 +++++++++++++++++++--
 11 files changed, 466 insertions(+), 46 deletions(-)
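
For context on what the patch enables end to end: the Spark catalog can now
apply nullability and comment DDL to table store tables. The statements below
are excerpts from the new SparkReadITCase further down (they assume the
table_store catalog and the t1/t2 tables that the test sets up):

    // Excerpts from SparkReadITCase below; `spark` is the test's SparkSession.
    spark.sql("ALTER TABLE table_store.default.t2 ALTER COLUMN a DROP NOT NULL");
    spark.sql("ALTER TABLE table_store.default.t1 ALTER COLUMN a COMMENT 'a new comment'");
    spark.sql("ALTER TABLE table_store.default.t2 ALTER COLUMN c.c1 COMMENT 'a nested type'");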

diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/ArrayDataType.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/ArrayDataType.java
index ba48fea3..e5e4bbf8 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/ArrayDataType.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/ArrayDataType.java
@@ -38,6 +38,11 @@ public class ArrayDataType extends DataType {
         return elementType;
     }
 
+    @Override
+    public DataType copy(boolean isNullable) {
+        return new ArrayDataType(isNullable, elementType);
+    }
+
     @Override
     public boolean equals(Object o) {
         if (this == o) {
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/AtomicDataType.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/AtomicDataType.java
index 6d4db17b..116721e3 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/AtomicDataType.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/AtomicDataType.java
@@ -28,4 +28,9 @@ public final class AtomicDataType extends DataType {
     public AtomicDataType(LogicalType logicalType) {
         super(logicalType);
     }
+
+    @Override
+    public DataType copy(boolean isNullable) {
+        return new AtomicDataType(logicalType.copy(isNullable));
+    }
 }
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/DataType.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/DataType.java
index 26cbdbee..83bfff0b 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/DataType.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/DataType.java
@@ -45,6 +45,14 @@ public abstract class DataType implements Serializable {
         return logicalType;
     }
 
+    /**
+     * Returns a copy of this data type with possibly different nullability.
+     *
+     * @param isNullable the intended nullability of the copied type
+     * @return a copied {@link DataType}
+     */
+    public abstract DataType copy(boolean isNullable);
+
     @Override
     public boolean equals(Object o) {
         if (this == o) {
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/MapDataType.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/MapDataType.java
index 4f27bf8a..b6719d4a 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/MapDataType.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/MapDataType.java
@@ -45,6 +45,11 @@ public class MapDataType extends DataType {
         return valueType;
     }
 
+    @Override
+    public DataType copy(boolean isNullable) {
+        return new MapDataType(isNullable, keyType, valueType);
+    }
+
     @Override
     public boolean equals(Object o) {
         if (this == o) {
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/MultisetDataType.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/MultisetDataType.java
index 80e5c27d..6b69cdf8 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/MultisetDataType.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/MultisetDataType.java
@@ -38,6 +38,11 @@ public class MultisetDataType extends DataType {
         return elementType;
     }
 
+    @Override
+    public DataType copy(boolean isNullable) {
+        return new MultisetDataType(isNullable, elementType);
+    }
+
     @Override
     public boolean equals(Object o) {
         if (this == o) {
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/RowDataType.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/RowDataType.java
index a0a2bae1..081b4b88 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/RowDataType.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/RowDataType.java
@@ -54,6 +54,11 @@ public class RowDataType extends DataType {
         return fields;
     }
 
+    @Override
+    public DataType copy(boolean isNullable) {
+        return new RowDataType(isNullable, fields);
+    }
+
     @Override
     public boolean equals(Object o) {
         if (this == o) {
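
Every DataType subclass now implements copy(boolean isNullable), mirroring
Flink's LogicalType.copy(boolean): the composite types rebuild themselves with
the new outer nullability and reuse their element/field types, while the atomic
case delegates to the wrapped logical type. A minimal sketch of the atomic case
(the class name CopySketch is hypothetical; the rest is the patched API):

    import org.apache.flink.table.store.file.schema.AtomicDataType;
    import org.apache.flink.table.store.file.schema.DataType;
    import org.apache.flink.table.types.logical.IntType;

    class CopySketch {
        public static void main(String[] args) {
            // AtomicDataType delegates to LogicalType.copy(boolean), so the
            // requested nullability lands on the wrapped logical type.
            DataType notNullInt = new AtomicDataType(new IntType(false));
            DataType nullableInt = notNullInt.copy(true);
            System.out.println(nullableInt.logicalType().isNullable()); // true
        }
    }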
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/SchemaChange.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/SchemaChange.java
index 58a3a6c6..42674fa9 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/SchemaChange.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/SchemaChange.java
@@ -22,13 +22,12 @@ import org.apache.flink.table.types.logical.LogicalType;
 
 import javax.annotation.Nullable;
 
+import java.util.Arrays;
 import java.util.Objects;
 
 /** Schema change to table. */
 public interface SchemaChange {
 
-    // TODO more change support like updateColumnNullability and updateColumnComment.
-
     static SchemaChange setOption(String key, String value) {
         return new SetOption(key, value);
     }
@@ -46,6 +45,14 @@ public interface SchemaChange {
         return new UpdateColumnType(fieldName, newLogicalType);
     }
 
+    static SchemaChange updateColumnNullability(String[] fieldNames, boolean newNullability) {
+        return new UpdateColumnNullability(fieldNames, newNullability);
+    }
+
+    static SchemaChange updateColumnComment(String[] fieldNames, String comment) {
+        return new UpdateColumnComment(fieldNames, comment);
+    }
+
     /** A SchemaChange to set a table option. */
     final class SetOption implements SchemaChange {
         private final String key;
@@ -205,4 +212,82 @@ public interface SchemaChange {
             return result;
         }
     }
+
+    /** A SchemaChange to update the (nested) field nullability. */
+    final class UpdateColumnNullability implements SchemaChange {
+        private final String[] fieldNames;
+        private final boolean newNullability;
+
+        public UpdateColumnNullability(String[] fieldNames, boolean newNullability) {
+            this.fieldNames = fieldNames;
+            this.newNullability = newNullability;
+        }
+
+        public String[] fieldNames() {
+            return fieldNames;
+        }
+
+        public boolean newNullability() {
+            return newNullability;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (!(o instanceof UpdateColumnNullability)) {
+                return false;
+            }
+            UpdateColumnNullability that = (UpdateColumnNullability) o;
+            return newNullability == that.newNullability
+                    && Arrays.equals(fieldNames, that.fieldNames);
+        }
+
+        @Override
+        public int hashCode() {
+            int result = Objects.hash(newNullability);
+            result = 31 * result + Arrays.hashCode(fieldNames);
+            return result;
+        }
+    }
+
+    /** A SchemaChange to update the (nested) field comment. */
+    final class UpdateColumnComment implements SchemaChange {
+        private final String[] fieldNames;
+        private final String newDescription;
+
+        public UpdateColumnComment(String[] fieldNames, String newDescription) {
+            this.fieldNames = fieldNames;
+            this.newDescription = newDescription;
+        }
+
+        public String[] fieldNames() {
+            return fieldNames;
+        }
+
+        public String newDescription() {
+            return newDescription;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (!(o instanceof UpdateColumnComment)) {
+                return false;
+            }
+            UpdateColumnComment that = (UpdateColumnComment) o;
+            return Arrays.equals(fieldNames, that.fieldNames)
+                    && newDescription.equals(that.newDescription);
+        }
+
+        @Override
+        public int hashCode() {
+            int result = Objects.hash(newDescription);
+            result = 31 * result + Arrays.hashCode(fieldNames);
+            return result;
+        }
+    }
 }
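
Unlike the existing single-name changes, both new changes carry a String[]
path so nested columns can be addressed outer-to-inner: {"c", "c1"} means
column c.c1. A minimal sketch of building them through the factory methods
added above (the class name SchemaChangeSketch is hypothetical):

    import org.apache.flink.table.store.file.schema.SchemaChange;

    class SchemaChangeSketch {
        public static void main(String[] args) {
            // Make nested column c.c1 nullable and re-comment c.c2.
            SchemaChange dropNotNull =
                    SchemaChange.updateColumnNullability(new String[] {"c", "c1"}, true);
            SchemaChange newComment =
                    SchemaChange.updateColumnComment(new String[] {"c", "c2"}, "a bigint type");
            System.out.println(dropNotNull + ", " + newComment);
        }
    }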
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/SchemaManager.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/SchemaManager.java
index b5a0c775..8634de77 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/SchemaManager.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/SchemaManager.java
@@ -24,6 +24,8 @@ import org.apache.flink.table.store.file.operation.Lock;
 import org.apache.flink.table.store.file.schema.SchemaChange.AddColumn;
 import org.apache.flink.table.store.file.schema.SchemaChange.RemoveOption;
 import org.apache.flink.table.store.file.schema.SchemaChange.SetOption;
+import org.apache.flink.table.store.file.schema.SchemaChange.UpdateColumnComment;
+import org.apache.flink.table.store.file.schema.SchemaChange.UpdateColumnNullability;
 import org.apache.flink.table.store.file.schema.SchemaChange.UpdateColumnType;
 import org.apache.flink.table.store.file.utils.AtomicFileWriter;
 import org.apache.flink.table.store.file.utils.FileUtils;
@@ -37,12 +39,14 @@ import java.io.IOException;
 import java.io.Serializable;
 import java.io.UncheckedIOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.Callable;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Function;
 import java.util.stream.Collectors;
 
 import static org.apache.flink.table.store.file.utils.FileUtils.listVersionedFiles;
@@ -172,34 +176,46 @@ public class SchemaManager implements Serializable {
                                     id, addColumn.fieldName(), dataType, addColumn.description()));
                 } else if (change instanceof UpdateColumnType) {
                     UpdateColumnType update = (UpdateColumnType) change;
-                    boolean found = false;
-                    for (int i = 0; i < newFields.size(); i++) {
-                        DataField field = newFields.get(i);
-                        if (field.name().equals(update.fieldName())) {
-                            AtomicInteger dummyId = new AtomicInteger(0);
-                            DataType newType =
-                                    TableSchema.toDataType(update.newLogicalType(), dummyId);
-                            if (dummyId.get() != 0) {
-                                throw new RuntimeException(
-                                        String.format(
-                                                "Update column to nested row type '%s' is not supported.",
-                                                update.newLogicalType()));
-                            }
-                            newFields.set(
-                                    i,
+                    updateColumn(
+                            newFields,
+                            update.fieldName(),
+                            (field) -> {
+                                AtomicInteger dummyId = new AtomicInteger(0);
+                                DataType newType =
+                                        TableSchema.toDataType(
+                                                update.newLogicalType(), dummyId);
+                                if (dummyId.get() != 0) {
+                                    throw new RuntimeException(
+                                            String.format(
+                                                    "Update column to nested row type '%s' is not supported.",
+                                                    update.newLogicalType()));
+                                }
+                                return new DataField(
+                                        field.id(), field.name(), newType, field.description());
+                            });
+                } else if (change instanceof UpdateColumnNullability) {
+                    UpdateColumnNullability update = (UpdateColumnNullability) change;
+                    updateNestedColumn(
+                            newFields,
+                            update.fieldNames(),
+                            0,
+                            (field) ->
                                     new DataField(
                                             field.id(),
                                             field.name(),
-                                            newType,
+                                            field.type().copy(update.newNullability()),
                                             field.description()));
-                            found = true;
-                            break;
-                        }
-                    }
-
-                    if (!found) {
-                        throw new RuntimeException("Can not find column: " + update.fieldName());
-                    }
+                } else if (change instanceof UpdateColumnComment) {
+                    UpdateColumnComment update = (UpdateColumnComment) change;
+                    updateNestedColumn(
+                            newFields,
+                            update.fieldNames(),
+                            0,
+                            (field) ->
+                                    new DataField(
+                                            field.id(),
+                                            field.name(),
+                                            field.type(),
+                                            update.newDescription()));
                 } else {
                     throw new UnsupportedOperationException(
                             "Unsupported change: " + change.getClass());
@@ -223,6 +239,41 @@ public class SchemaManager implements Serializable {
         }
     }
 
+    private void updateNestedColumn(
+            List<DataField> newFields,
+            String[] updateFieldNames,
+            int index,
+            Function<DataField, DataField> updateFunc) {
+        boolean found = false;
+        for (int i = 0; i < newFields.size(); i++) {
+            DataField field = newFields.get(i);
+            if (field.name().equals(updateFieldNames[index])) {
+                found = true;
+                if (index == updateFieldNames.length - 1) {
+                    newFields.set(i, updateFunc.apply(field));
+                    break;
+                } else {
+                    assert field.type() instanceof RowDataType;
+                    updateNestedColumn(
+                            ((RowDataType) field.type()).fields(),
+                            updateFieldNames,
+                            index + 1,
+                            updateFunc);
+                }
+            }
+        }
+        if (!found) {
+            throw new RuntimeException("Cannot find column: " + Arrays.asList(updateFieldNames));
+        }
+    }
+
+    private void updateColumn(
+            List<DataField> newFields,
+            String updateFieldName,
+            Function<DataField, DataField> updateFunc) {
+        updateNestedColumn(newFields, new String[] {updateFieldName}, 0, updateFunc);
+    }
+
     private boolean commit(TableSchema newSchema) throws Exception {
         Path schemaPath = toSchemaPath(newSchema.id());
         FileSystem fs = schemaPath.getFileSystem();
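
updateNestedColumn walks the field-name path one level per recursive call and
replaces the leaf field in place; every intermediate segment must resolve to a
row type. A standalone sketch of the same walk over a hypothetical simplified
Field type (illustration only; the real code descends into
RowDataType.fields()):

    import java.util.List;
    import java.util.function.UnaryOperator;

    class NestedUpdateSketch {
        // Hypothetical field: children is non-null only for row types.
        static final class Field {
            final String name;
            final List<Field> children;
            Field(String name, List<Field> children) {
                this.name = name;
                this.children = children;
            }
        }

        static void updateNested(
                List<Field> fields, String[] path, int index, UnaryOperator<Field> fn) {
            for (int i = 0; i < fields.size(); i++) {
                Field f = fields.get(i);
                if (f.name.equals(path[index])) {
                    if (index == path.length - 1) {
                        fields.set(i, fn.apply(f)); // replace the leaf in place
                    } else {
                        updateNested(f.children, path, index + 1, fn); // one level down
                    }
                    return;
                }
            }
            throw new RuntimeException("Cannot find column: " + String.join(".", path));
        }
    }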
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/TableSchema.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/TableSchema.java
index 80266343..bed3d8eb 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/TableSchema.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/TableSchema.java
@@ -294,7 +294,7 @@ public class TableSchema implements Serializable {
             collectFieldIds(((MultisetDataType) type).elementType(), fieldIds);
         } else if (type instanceof MapDataType) {
             collectFieldIds(((MapDataType) type).keyType(), fieldIds);
-            collectFieldIds(((MapDataType) type).keyType(), fieldIds);
+            collectFieldIds(((MapDataType) type).valueType(), fieldIds);
         } else if (type instanceof RowDataType) {
             for (DataField field : ((RowDataType) type).fields()) {
                 if (fieldIds.contains(field.id())) {
diff --git a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkCatalog.java b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkCatalog.java
index 65f1387b..5afedea4 100644
--- a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkCatalog.java
+++ b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkCatalog.java
@@ -36,6 +36,8 @@ import org.apache.spark.sql.connector.catalog.TableChange;
 import org.apache.spark.sql.connector.catalog.TableChange.AddColumn;
 import org.apache.spark.sql.connector.catalog.TableChange.RemoveProperty;
 import org.apache.spark.sql.connector.catalog.TableChange.SetProperty;
+import org.apache.spark.sql.connector.catalog.TableChange.UpdateColumnComment;
+import org.apache.spark.sql.connector.catalog.TableChange.UpdateColumnNullability;
 import org.apache.spark.sql.connector.catalog.TableChange.UpdateColumnType;
 import org.apache.spark.sql.connector.expressions.Transform;
 import org.apache.spark.sql.types.StructType;
@@ -148,6 +150,12 @@ public class SparkCatalog implements TableCatalog, SupportsNamespaces {
             validateAlterNestedField(update.fieldNames());
             return SchemaChange.updateColumnType(
                     update.fieldNames()[0], toFlinkType(update.newDataType()));
+        } else if (change instanceof UpdateColumnNullability) {
+            UpdateColumnNullability update = (UpdateColumnNullability) change;
+            return SchemaChange.updateColumnNullability(update.fieldNames(), update.nullable());
+        } else if (change instanceof UpdateColumnComment) {
+            UpdateColumnComment update = (UpdateColumnComment) change;
+            return SchemaChange.updateColumnComment(update.fieldNames(), update.newComment());
         } else {
             throw new UnsupportedOperationException(
                     "Change is not supported: " + change.getClass());
diff --git a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkReadITCase.java b/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkReadITCase.java
index 5dbb8cca..cfc15e1e 100644
--- a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkReadITCase.java
+++ b/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkReadITCase.java
@@ -19,11 +19,17 @@
 package org.apache.flink.table.store.spark;
 
 import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.data.GenericArrayData;
 import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.store.file.schema.DataField;
+import org.apache.flink.table.store.file.schema.RowDataType;
 import org.apache.flink.table.store.file.schema.TableSchema;
 import org.apache.flink.table.store.table.FileStoreTableFactory;
+import org.apache.flink.table.types.logical.ArrayType;
 import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.BooleanType;
+import org.apache.flink.table.types.logical.DoubleType;
 import org.apache.flink.table.types.logical.IntType;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.table.types.logical.VarCharType;
@@ -42,7 +48,6 @@ import java.io.IOException;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
-import java.util.UUID;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
@@ -70,20 +75,58 @@ public class SparkReadITCase {
 
         // flink sink
         tablePath1 = new Path(warehousePath, "default.db/t1");
-        SimpleTableTestHelper testHelper1 = createTestHelper(tablePath1);
+        SimpleTableTestHelper testHelper1 = new SimpleTableTestHelper(tablePath1, rowType1());
         testHelper1.write(GenericRowData.of(1, 2L, StringData.fromString("1")));
         testHelper1.write(GenericRowData.of(3, 4L, StringData.fromString("2")));
         testHelper1.write(GenericRowData.of(5, 6L, StringData.fromString("3")));
         testHelper1.write(GenericRowData.ofKind(RowKind.DELETE, 3, 4L, StringData.fromString("2")));
         testHelper1.commit();
 
+        // a: int not null
+        // b: array<varchar> not null
+        // c: row<c1: row<c11: double, c12: array<boolean> not null> not null, c2: bigint> not null
         tablePath2 = new Path(warehousePath, "default.db/t2");
-        SimpleTableTestHelper testHelper2 = createTestHelper(tablePath2);
-        testHelper2.write(GenericRowData.of(1, 2L, StringData.fromString("1")));
-        testHelper2.write(GenericRowData.of(3, 4L, StringData.fromString("2")));
+        SimpleTableTestHelper testHelper2 = new SimpleTableTestHelper(tablePath2, rowType2());
+        testHelper2.write(
+                GenericRowData.of(
+                        1,
+                        new GenericArrayData(
+                                new StringData[] {
+                                    StringData.fromString("AAA"), StringData.fromString("BBB")
+                                }),
+                        GenericRowData.of(
+                                GenericRowData.of(1.0d, new GenericArrayData(new Boolean[] {null})),
+                                1L)));
+        testHelper2.write(
+                GenericRowData.of(
+                        2,
+                        new GenericArrayData(
+                                new StringData[] {
+                                    StringData.fromString("CCC"), StringData.fromString("DDD")
+                                }),
+                        GenericRowData.of(
+                                GenericRowData.of(null, new GenericArrayData(new Boolean[] {true})),
+                                null)));
         testHelper2.commit();
-        testHelper2.write(GenericRowData.of(5, 6L, StringData.fromString("3")));
-        testHelper2.write(GenericRowData.of(7, 8L, StringData.fromString("4")));
+
+        testHelper2.write(
+                GenericRowData.of(
+                        3,
+                        new GenericArrayData(new StringData[] {null, null}),
+                        GenericRowData.of(
+                                GenericRowData.of(
+                                        2.0d, new GenericArrayData(new boolean[] {true, false})),
+                                2L)));
+
+        testHelper2.write(
+                GenericRowData.of(
+                        4,
+                        new GenericArrayData(new StringData[] {null, StringData.fromString("EEE")}),
+                        GenericRowData.of(
+                                GenericRowData.of(
+                                        3.0d,
+                                        new GenericArrayData(new Boolean[] {true, false, true})),
+                                3L)));
         testHelper2.commit();
     }
 
@@ -91,12 +134,50 @@ public class SparkReadITCase {
         RowType rowType =
                 new RowType(
                         Arrays.asList(
-                                new RowType.RowField("a", new IntType()),
+                                new RowType.RowField("a", new IntType(false)),
                                 new RowType.RowField("b", new BigIntType()),
                                 new RowType.RowField("c", new VarCharType())));
         return new SimpleTableTestHelper(tablePath, rowType);
     }
 
+    private static RowType rowType1() {
+        return new RowType(
+                Arrays.asList(
+                        new RowType.RowField("a", new IntType(false)),
+                        new RowType.RowField("b", new BigIntType()),
+                        new RowType.RowField("c", new VarCharType())));
+    }
+
+    private static RowType rowType2() {
+        return new RowType(
+                Arrays.asList(
+                        new RowType.RowField("a", new IntType(false), "comment about a"),
+                        new RowType.RowField("b", new ArrayType(false, new VarCharType())),
+                        new RowType.RowField(
+                                "c",
+                                new RowType(
+                                        false,
+                                        Arrays.asList(
+                                                new RowType.RowField(
+                                                        "c1",
+                                                        new RowType(
+                                                                false,
+                                                                Arrays.asList(
+                                                                        new RowType.RowField(
+                                                                                "c11",
+                                                                                new DoubleType()),
+                                                                        new RowType.RowField(
+                                                                                "c12",
+                                                                                new ArrayType(
+                                                                                        false,
+                                                                                        new BooleanType()))))),
+                                                new RowType.RowField(
+                                                        "c2",
+                                                        new BigIntType(),
+                                                        "comment about c2"))),
+                                "comment about c")));
+    }
+
     @AfterAll
     public static void stopMetastoreAndSpark() throws IOException {
         if (warehouse != null && warehouse.exists()) {
@@ -110,24 +191,34 @@ public class SparkReadITCase {
 
     @Test
     public void testNormal() {
-        innerTestNormal(
+        innerTestSimpleType(
                 spark.read().format("tablestore").option("path", tablePath1.toString()).load());
+
+        innerTestNestedType(
+                spark.read().format("tablestore").option("path", tablePath2.toString()).load());
     }
 
     @Test
     public void testFilterPushDown() {
-        innerTestFilterPushDown(
+        innerTestSimpleTypeFilterPushDown(
+                spark.read().format("tablestore").option("path", tablePath1.toString()).load());
+
+        innerTestNestedTypeFilterPushDown(
                 spark.read().format("tablestore").option("path", tablePath2.toString()).load());
     }
 
     @Test
     public void testCatalogNormal() {
-        innerTestNormal(spark.table("table_store.default.t1"));
+        innerTestSimpleType(spark.table("table_store.default.t1"));
+
+        innerTestNestedType(spark.table("table_store.default.t2"));
     }
 
     @Test
     public void testCatalogFilterPushDown() {
-        innerTestFilterPushDown(spark.table("table_store.default.t2"));
+        innerTestSimpleTypeFilterPushDown(spark.table("table_store.default.t1"));
+
+        innerTestNestedTypeFilterPushDown(spark.table("table_store.default.t2"));
     }
 
     @Test
@@ -145,7 +236,7 @@ public class SparkReadITCase {
 
     @Test
     public void testAddColumn() throws Exception {
-        Path tablePath = new Path(warehousePath, "default.db/" + UUID.randomUUID());
+        Path tablePath = new Path(warehousePath, "default.db/testAddColumn");
         SimpleTableTestHelper testHelper1 = createTestHelper(tablePath);
         testHelper1.write(GenericRowData.of(1, 2L, StringData.fromString("1")));
         testHelper1.write(GenericRowData.of(5, 6L, StringData.fromString("3")));
@@ -166,21 +257,95 @@ public class SparkReadITCase {
 
     @Test
     public void testAlterColumnType() throws Exception {
-        Path tablePath = new Path(warehousePath, "default.db/testAddColumn");
+        Path tablePath = new Path(warehousePath, "default.db/testAlterColumnType");
         SimpleTableTestHelper testHelper1 = createTestHelper(tablePath);
         testHelper1.write(GenericRowData.of(1, 2L, StringData.fromString("1")));
         testHelper1.write(GenericRowData.of(5, 6L, StringData.fromString("3")));
         testHelper1.commit();
 
-        spark.sql("ALTER TABLE table_store.default.testAddColumn ALTER COLUMN a TYPE BIGINT");
-        innerTestNormal(spark.table("table_store.default.testAddColumn"));
+        spark.sql("ALTER TABLE table_store.default.testAlterColumnType ALTER COLUMN a TYPE BIGINT");
+        innerTestSimpleType(spark.table("table_store.default.testAlterColumnType"));
+    }
+
+    @Test
+    public void testAlterTableColumnNullability() {
+        assertThat(fieldIsNullable(getField(schema2(), 0))).isFalse();
+        assertThat(fieldIsNullable(getField(schema2(), 1))).isFalse();
+        assertThat(fieldIsNullable(getField(schema2(), 2))).isFalse();
+        assertThat(fieldIsNullable(getNestedField(getField(schema2(), 2), 0))).isFalse();
+        assertThat(fieldIsNullable(getNestedField(getField(schema2(), 2), 1))).isTrue();
+        assertThat(fieldIsNullable(getNestedField(getNestedField(getField(schema2(), 2), 0), 0)))
+                .isTrue();
+        assertThat(fieldIsNullable(getNestedField(getNestedField(getField(schema2(), 2), 0), 1)))
+                .isFalse();
+
+        // note: Spark does not allow changing a nullable column back to non-nullable
+        spark.sql("ALTER TABLE table_store.default.t2 ALTER COLUMN a DROP NOT NULL");
+        assertThat(fieldIsNullable(getField(schema2(), 0))).isTrue();
+
+        spark.sql("ALTER TABLE table_store.default.t2 ALTER COLUMN b DROP NOT NULL");
+        assertThat(fieldIsNullable(getField(schema2(), 1))).isTrue();
+
+        spark.sql("ALTER TABLE table_store.default.t2 ALTER COLUMN c DROP NOT NULL");
+        assertThat(fieldIsNullable(getField(schema2(), 2))).isTrue();
+
+        spark.sql("ALTER TABLE table_store.default.t2 ALTER COLUMN c.c1 DROP NOT NULL");
+        assertThat(fieldIsNullable(getNestedField(getField(schema2(), 2), 0))).isTrue();
+
+        spark.sql("ALTER TABLE table_store.default.t2 ALTER COLUMN c.c1.c12 DROP NOT NULL");
+        assertThat(fieldIsNullable(getNestedField(getNestedField(getField(schema2(), 2), 0), 1)))
+                .isTrue();
+    }
+
+    @Test
+    public void testAlterTableColumnComment() {
+        assertThat(getField(schema1(), 0).description()).isNull();
+
+        spark.sql("ALTER TABLE table_store.default.t1 ALTER COLUMN a COMMENT 'a new comment'");
+        assertThat(getField(schema1(), 0).description()).isEqualTo("a new comment");
+
+        spark.sql(
+                "ALTER TABLE table_store.default.t1 ALTER COLUMN a COMMENT 'yet another comment'");
+        assertThat(getField(schema1(), 0).description()).isEqualTo("yet another comment");
+
+        assertThat(getField(schema2(), 2).description()).isEqualTo("comment about c");
+        assertThat(getNestedField(getField(schema2(), 2), 0).description()).isNull();
+        assertThat(getNestedField(getField(schema2(), 2), 1).description())
+                .isEqualTo("comment about c2");
+        assertThat(getNestedField(getNestedField(getField(schema2(), 2), 0), 0).description())
+                .isNull();
+        assertThat(getNestedField(getNestedField(getField(schema2(), 2), 0), 1).description())
+                .isNull();
+
+        spark.sql(
+                "ALTER TABLE table_store.default.t2 ALTER COLUMN c COMMENT 'yet another comment about c'");
+        spark.sql("ALTER TABLE table_store.default.t2 ALTER COLUMN c.c1 COMMENT 'a nested type'");
+        spark.sql("ALTER TABLE table_store.default.t2 ALTER COLUMN c.c2 COMMENT 'a bigint type'");
+        spark.sql(
+                "ALTER TABLE table_store.default.t2 ALTER COLUMN c.c1.c11 COMMENT 'a double type'");
+        spark.sql(
+                "ALTER TABLE table_store.default.t2 ALTER COLUMN c.c1.c12 COMMENT 'a boolean array'");
+
+        assertThat(getField(schema2(), 2).description()).isEqualTo("yet another comment about c");
+        assertThat(getNestedField(getField(schema2(), 2), 0).description())
+                .isEqualTo("a nested type");
+        assertThat(getNestedField(getField(schema2(), 2), 1).description())
+                .isEqualTo("a bigint type");
+        assertThat(getNestedField(getNestedField(getField(schema2(), 2), 0), 0).description())
+                .isEqualTo("a double type");
+        assertThat(getNestedField(getNestedField(getField(schema2(), 2), 0), 1).description())
+                .isEqualTo("a boolean array");
     }
 
     private TableSchema schema1() {
         return FileStoreTableFactory.create(tablePath1).schema();
     }
 
-    private void innerTestNormal(Dataset<Row> dataset) {
+    private TableSchema schema2() {
+        return FileStoreTableFactory.create(tablePath2).schema();
+    }
+
+    private void innerTestSimpleType(Dataset<Row> dataset) {
         List<Row> results = dataset.collectAsList();
         assertThat(results.toString()).isEqualTo("[[1,2,1], [5,6,3]]");
 
@@ -191,8 +356,86 @@ public class SparkReadITCase {
         assertThat(results.toString()).isEqualTo("[[8]]");
     }
 
-    private void innerTestFilterPushDown(Dataset<Row> dataset) {
+    private void innerTestNestedType(Dataset<Row> dataset) {
+        List<Row> results = dataset.collectAsList();
+        assertThat(results.toString())
+                .isEqualTo(
+                        "[[1,WrappedArray(AAA, BBB),[[1.0,WrappedArray(null)],1]], "
+                                + "[2,WrappedArray(CCC, DDD),[[null,WrappedArray(true)],null]], "
+                                + "[3,WrappedArray(null, null),[[2.0,WrappedArray(true, false)],2]], "
+                                + "[4,WrappedArray(null, EEE),[[3.0,WrappedArray(true, false, true)],3]]]");
+
+        results = dataset.select("a").collectAsList();
+        assertThat(results.toString()).isEqualTo("[[1], [2], [3], [4]]");
+
+        results = dataset.select("c.c1").collectAsList();
+        assertThat(results.toString())
+                .isEqualTo(
+                        "[[[1.0,WrappedArray(null)]], [[null,WrappedArray(true)]], "
+                                + "[[2.0,WrappedArray(true, false)]], "
+                                + "[[3.0,WrappedArray(true, false, true)]]]");
+
+        results = dataset.select("c.c2").collectAsList();
+        assertThat(results.toString()).isEqualTo("[[1], [null], [2], [3]]");
+
+        results = dataset.select("c.c1.c11").collectAsList();
+        assertThat(results.toString()).isEqualTo("[[1.0], [null], [2.0], [3.0]]");
+
+        results = dataset.select("c.c1.c12").collectAsList();
+        assertThat(results.toString())
+                .isEqualTo(
+                        "[[WrappedArray(null)], "
+                                + "[WrappedArray(true)], "
+                                + "[WrappedArray(true, false)], "
+                                + "[WrappedArray(true, false, true)]]");
+    }
+
+    private void innerTestSimpleTypeFilterPushDown(Dataset<Row> dataset) {
         List<Row> results = dataset.filter("a < 4").select("a", "c").collectAsList();
-        assertThat(results.toString()).isEqualTo("[[1,1], [3,2]]");
+        assertThat(results.toString()).isEqualTo("[[1,1]]");
+
+        results = dataset.filter("b = 4").select("a", "c").collectAsList();
+        assertThat(results.toString()).isEqualTo("[]");
+    }
+
+    private void innerTestNestedTypeFilterPushDown(Dataset<Row> dataset) {
+        List<Row> results = dataset.filter("a < 4").select("a").collectAsList();
+        assertThat(results.toString()).isEqualTo("[[1], [2], [3]]");
+
+        results = dataset.filter("array_contains(b, 'AAA')").select("b").collectAsList();
+        assertThat(results.toString()).isEqualTo("[[WrappedArray(AAA, BBB)]]");
+
+        results = dataset.filter("c.c1.c11 is null").select("a", "c").collectAsList();
+        assertThat(results.toString()).isEqualTo("[[2,[[null,WrappedArray(true)],null]]]");
+
+        results = dataset.filter("c.c1.c11 = 1.0").select("a", "c.c1").collectAsList();
+        assertThat(results.toString()).isEqualTo("[[1,[1.0,WrappedArray(null)]]]");
+
+        results = dataset.filter("c.c2 is null").select("a", "c").collectAsList();
+        assertThat(results.toString()).isEqualTo("[[2,[[null,WrappedArray(true)],null]]]");
+
+        results =
+                dataset.filter("array_contains(c.c1.c12, false)")
+                        .select("a", "c.c1.c12", "c.c2")
+                        .collectAsList();
+        assertThat(results.toString())
+                .isEqualTo(
+                        "[[3,WrappedArray(true, false),2], [4,WrappedArray(true, false, true),3]]");
+    }
+
+    private boolean fieldIsNullable(DataField field) {
+        return field.type().logicalType().isNullable();
+    }
+
+    private DataField getField(TableSchema schema, int index) {
+        return schema.fields().get(index);
+    }
+
+    private DataField getNestedField(DataField field, int index) {
+        if (field.type() instanceof RowDataType) {
+            RowDataType rowDataType = (RowDataType) field.type();
+            return rowDataType.fields().get(index);
+        }
+        throw new IllegalArgumentException("Field " + field.name() + " is not a row type");
     }
 }