You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by lz...@apache.org on 2022/07/19 03:44:01 UTC
[flink-table-store] branch master updated: [FLINK-28533] SchemaChange supports updateColumnNullability and updateColumnComment
This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/master by this push:
new d9b3eeef [FLINK-28533] SchemaChange supports updateColumnNullability and updateColumnComment
d9b3eeef is described below
commit d9b3eeef306efa10a8c4f1a5d09be11a33849ead
Author: Jane Chan <55...@users.noreply.github.com>
AuthorDate: Tue Jul 19 11:43:58 2022 +0800
[FLINK-28533] SchemaChange supports updateColumnNullability and updateColumnComment
This closes #215
---
.../table/store/file/schema/ArrayDataType.java | 5 +
.../table/store/file/schema/AtomicDataType.java | 5 +
.../flink/table/store/file/schema/DataType.java | 8 +
.../flink/table/store/file/schema/MapDataType.java | 5 +
.../table/store/file/schema/MultisetDataType.java | 5 +
.../flink/table/store/file/schema/RowDataType.java | 5 +
.../table/store/file/schema/SchemaChange.java | 89 ++++++-
.../table/store/file/schema/SchemaManager.java | 99 ++++++--
.../flink/table/store/file/schema/TableSchema.java | 2 +-
.../flink/table/store/spark/SparkCatalog.java | 8 +
.../flink/table/store/spark/SparkReadITCase.java | 281 +++++++++++++++++++--
11 files changed, 466 insertions(+), 46 deletions(-)
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/ArrayDataType.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/ArrayDataType.java
index ba48fea3..e5e4bbf8 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/ArrayDataType.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/ArrayDataType.java
@@ -38,6 +38,11 @@ public class ArrayDataType extends DataType {
return elementType;
}
+ @Override
+ public DataType copy(boolean isNullable) {
+ return new ArrayDataType(isNullable, elementType);
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) {
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/AtomicDataType.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/AtomicDataType.java
index 6d4db17b..116721e3 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/AtomicDataType.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/AtomicDataType.java
@@ -28,4 +28,9 @@ public final class AtomicDataType extends DataType {
public AtomicDataType(LogicalType logicalType) {
super(logicalType);
}
+
+ @Override
+ public DataType copy(boolean isNullable) {
+ return new AtomicDataType(logicalType.copy(isNullable));
+ }
}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/DataType.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/DataType.java
index 26cbdbee..83bfff0b 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/DataType.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/DataType.java
@@ -45,6 +45,14 @@ public abstract class DataType implements Serializable {
return logicalType;
}
+ /**
+ * Returns a copy of this data type with possibly different nullability.
+ *
+ * @param isNullable the intended nullability of the copied type
+ * @return a copied {@link DataType}
+ */
+ public abstract DataType copy(boolean isNullable);
+
@Override
public boolean equals(Object o) {
if (this == o) {
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/MapDataType.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/MapDataType.java
index 4f27bf8a..b6719d4a 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/MapDataType.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/MapDataType.java
@@ -45,6 +45,11 @@ public class MapDataType extends DataType {
return valueType;
}
+ @Override
+ public DataType copy(boolean isNullable) {
+ return new MapDataType(isNullable, keyType, valueType);
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) {
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/MultisetDataType.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/MultisetDataType.java
index 80e5c27d..6b69cdf8 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/MultisetDataType.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/MultisetDataType.java
@@ -38,6 +38,11 @@ public class MultisetDataType extends DataType {
return elementType;
}
+ @Override
+ public DataType copy(boolean isNullable) {
+ return new MultisetDataType(isNullable, elementType);
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) {
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/RowDataType.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/RowDataType.java
index a0a2bae1..081b4b88 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/RowDataType.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/RowDataType.java
@@ -54,6 +54,11 @@ public class RowDataType extends DataType {
return fields;
}
+ @Override
+ public DataType copy(boolean isNullable) {
+ return new RowDataType(isNullable, fields);
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) {
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/SchemaChange.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/SchemaChange.java
index 58a3a6c6..42674fa9 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/SchemaChange.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/SchemaChange.java
@@ -22,13 +22,12 @@ import org.apache.flink.table.types.logical.LogicalType;
import javax.annotation.Nullable;
+import java.util.Arrays;
import java.util.Objects;
/** Schema change to table. */
public interface SchemaChange {
- // TODO more change support like updateColumnNullability and updateColumnComment.
-
static SchemaChange setOption(String key, String value) {
return new SetOption(key, value);
}
@@ -46,6 +45,14 @@ public interface SchemaChange {
return new UpdateColumnType(fieldName, newLogicalType);
}
+ static SchemaChange updateColumnNullability(String[] fieldNames, boolean newNullability) {
+ return new UpdateColumnNullability(fieldNames, newNullability);
+ }
+
+ static SchemaChange updateColumnComment(String[] fieldNames, String comment) {
+ return new UpdateColumnComment(fieldNames, comment);
+ }
+
/** A SchemaChange to set a table option. */
final class SetOption implements SchemaChange {
private final String key;
@@ -205,4 +212,82 @@ public interface SchemaChange {
return result;
}
}
+
+ /** A SchemaChange to update the (nested) field nullability. */
+ final class UpdateColumnNullability implements SchemaChange {
+ private final String[] fieldNames;
+ private final boolean newNullability;
+
+ public UpdateColumnNullability(String[] fieldNames, boolean newNullability) {
+ this.fieldNames = fieldNames;
+ this.newNullability = newNullability;
+ }
+
+ public String[] fieldNames() {
+ return fieldNames;
+ }
+
+ public boolean newNullability() {
+ return newNullability;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof UpdateColumnNullability)) {
+ return false;
+ }
+ UpdateColumnNullability that = (UpdateColumnNullability) o;
+ return newNullability == that.newNullability
+ && Arrays.equals(fieldNames, that.fieldNames);
+ }
+
+ @Override
+ public int hashCode() {
+ int result = Objects.hash(newNullability);
+ result = 31 * result + Arrays.hashCode(fieldNames);
+ return result;
+ }
+ }
+
+ /** A SchemaChange to update the (nested) field comment. */
+ final class UpdateColumnComment implements SchemaChange {
+ private final String[] fieldNames;
+ private final String newDescription;
+
+ public UpdateColumnComment(String[] fieldNames, String newDescription) {
+ this.fieldNames = fieldNames;
+ this.newDescription = newDescription;
+ }
+
+ public String[] fieldNames() {
+ return fieldNames;
+ }
+
+ public String newDescription() {
+ return newDescription;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof UpdateColumnComment)) {
+ return false;
+ }
+ UpdateColumnComment that = (UpdateColumnComment) o;
+ return Arrays.equals(fieldNames, that.fieldNames)
+ && newDescription.equals(that.newDescription);
+ }
+
+ @Override
+ public int hashCode() {
+ int result = Objects.hash(newDescription);
+ result = 31 * result + Arrays.hashCode(fieldNames);
+ return result;
+ }
+ }
}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/SchemaManager.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/SchemaManager.java
index b5a0c775..8634de77 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/SchemaManager.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/SchemaManager.java
@@ -24,6 +24,8 @@ import org.apache.flink.table.store.file.operation.Lock;
import org.apache.flink.table.store.file.schema.SchemaChange.AddColumn;
import org.apache.flink.table.store.file.schema.SchemaChange.RemoveOption;
import org.apache.flink.table.store.file.schema.SchemaChange.SetOption;
+import org.apache.flink.table.store.file.schema.SchemaChange.UpdateColumnComment;
+import org.apache.flink.table.store.file.schema.SchemaChange.UpdateColumnNullability;
import org.apache.flink.table.store.file.schema.SchemaChange.UpdateColumnType;
import org.apache.flink.table.store.file.utils.AtomicFileWriter;
import org.apache.flink.table.store.file.utils.FileUtils;
@@ -37,12 +39,14 @@ import java.io.IOException;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Function;
import java.util.stream.Collectors;
import static org.apache.flink.table.store.file.utils.FileUtils.listVersionedFiles;
@@ -172,34 +176,46 @@ public class SchemaManager implements Serializable {
id, addColumn.fieldName(), dataType, addColumn.description()));
} else if (change instanceof UpdateColumnType) {
UpdateColumnType update = (UpdateColumnType) change;
- boolean found = false;
- for (int i = 0; i < newFields.size(); i++) {
- DataField field = newFields.get(i);
- if (field.name().equals(update.fieldName())) {
- AtomicInteger dummyId = new AtomicInteger(0);
- DataType newType =
- TableSchema.toDataType(update.newLogicalType(), dummyId);
- if (dummyId.get() != 0) {
- throw new RuntimeException(
- String.format(
- "Update column to nested row type '%s' is not supported.",
- update.newLogicalType()));
- }
- newFields.set(
- i,
+ updateColumn(
+ newFields,
+ update.fieldName(),
+ (field) -> {
+ AtomicInteger dummyId = new AtomicInteger(0);
+ DataType newType =
+ TableSchema.toDataType(
+ update.newLogicalType(), new AtomicInteger(0));
+ if (dummyId.get() != 0) {
+ throw new RuntimeException(
+ String.format(
+ "Update column to nested row type '%s' is not supported.",
+ update.newLogicalType()));
+ }
+ return new DataField(field.id(), field.name(), newType);
+ });
+ } else if (change instanceof UpdateColumnNullability) {
+ UpdateColumnNullability update = (UpdateColumnNullability) change;
+ updateNestedColumn(
+ newFields,
+ update.fieldNames(),
+ 0,
+ (field) ->
new DataField(
field.id(),
field.name(),
- newType,
+ field.type().copy(update.newNullability()),
field.description()));
- found = true;
- break;
- }
- }
-
- if (!found) {
- throw new RuntimeException("Can not find column: " + update.fieldName());
- }
+ } else if (change instanceof UpdateColumnComment) {
+ UpdateColumnComment update = (UpdateColumnComment) change;
+ updateNestedColumn(
+ newFields,
+ update.fieldNames(),
+ 0,
+ (field) ->
+ new DataField(
+ field.id(),
+ field.name(),
+ field.type(),
+ update.newDescription()));
} else {
throw new UnsupportedOperationException(
"Unsupported change: " + change.getClass());
@@ -223,6 +239,41 @@ public class SchemaManager implements Serializable {
}
}
+ private void updateNestedColumn(
+ List<DataField> newFields,
+ String[] updateFieldNames,
+ int index,
+ Function<DataField, DataField> updateFunc) {
+ boolean found = false;
+ for (int i = 0; i < newFields.size(); i++) {
+ DataField field = newFields.get(i);
+ if (field.name().equals(updateFieldNames[index])) {
+ found = true;
+ if (index == updateFieldNames.length - 1) {
+ newFields.set(i, updateFunc.apply(field));
+ break;
+ } else {
+ assert field.type() instanceof RowDataType;
+ updateNestedColumn(
+ ((RowDataType) field.type()).fields(),
+ updateFieldNames,
+ index + 1,
+ updateFunc);
+ }
+ }
+ }
+ if (!found) {
+ throw new RuntimeException("Can not find column: " + Arrays.asList(updateFieldNames));
+ }
+ }
+
+ private void updateColumn(
+ List<DataField> newFields,
+ String updateFieldName,
+ Function<DataField, DataField> updateFunc) {
+ updateNestedColumn(newFields, new String[] {updateFieldName}, 0, updateFunc);
+ }
+
private boolean commit(TableSchema newSchema) throws Exception {
Path schemaPath = toSchemaPath(newSchema.id());
FileSystem fs = schemaPath.getFileSystem();
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/TableSchema.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/TableSchema.java
index 80266343..bed3d8eb 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/TableSchema.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/TableSchema.java
@@ -294,7 +294,7 @@ public class TableSchema implements Serializable {
collectFieldIds(((MultisetDataType) type).elementType(), fieldIds);
} else if (type instanceof MapDataType) {
collectFieldIds(((MapDataType) type).keyType(), fieldIds);
- collectFieldIds(((MapDataType) type).keyType(), fieldIds);
+ collectFieldIds(((MapDataType) type).valueType(), fieldIds);
} else if (type instanceof RowDataType) {
for (DataField field : ((RowDataType) type).fields()) {
if (fieldIds.contains(field.id())) {
diff --git a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkCatalog.java b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkCatalog.java
index 65f1387b..5afedea4 100644
--- a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkCatalog.java
+++ b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkCatalog.java
@@ -36,6 +36,8 @@ import org.apache.spark.sql.connector.catalog.TableChange;
import org.apache.spark.sql.connector.catalog.TableChange.AddColumn;
import org.apache.spark.sql.connector.catalog.TableChange.RemoveProperty;
import org.apache.spark.sql.connector.catalog.TableChange.SetProperty;
+import org.apache.spark.sql.connector.catalog.TableChange.UpdateColumnComment;
+import org.apache.spark.sql.connector.catalog.TableChange.UpdateColumnNullability;
import org.apache.spark.sql.connector.catalog.TableChange.UpdateColumnType;
import org.apache.spark.sql.connector.expressions.Transform;
import org.apache.spark.sql.types.StructType;
@@ -148,6 +150,12 @@ public class SparkCatalog implements TableCatalog, SupportsNamespaces {
validateAlterNestedField(update.fieldNames());
return SchemaChange.updateColumnType(
update.fieldNames()[0], toFlinkType(update.newDataType()));
+ } else if (change instanceof UpdateColumnNullability) {
+ UpdateColumnNullability update = (UpdateColumnNullability) change;
+ return SchemaChange.updateColumnNullability(update.fieldNames(), update.nullable());
+ } else if (change instanceof UpdateColumnComment) {
+ UpdateColumnComment update = (UpdateColumnComment) change;
+ return SchemaChange.updateColumnComment(update.fieldNames(), update.newComment());
} else {
throw new UnsupportedOperationException(
"Change is not supported: " + change.getClass());
diff --git a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkReadITCase.java b/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkReadITCase.java
index 5dbb8cca..cfc15e1e 100644
--- a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkReadITCase.java
+++ b/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkReadITCase.java
@@ -19,11 +19,17 @@
package org.apache.flink.table.store.spark;
import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.data.GenericArrayData;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.store.file.schema.DataField;
+import org.apache.flink.table.store.file.schema.RowDataType;
import org.apache.flink.table.store.file.schema.TableSchema;
import org.apache.flink.table.store.table.FileStoreTableFactory;
+import org.apache.flink.table.types.logical.ArrayType;
import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.BooleanType;
+import org.apache.flink.table.types.logical.DoubleType;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.VarCharType;
@@ -42,7 +48,6 @@ import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
-import java.util.UUID;
import static org.assertj.core.api.Assertions.assertThat;
@@ -70,20 +75,58 @@ public class SparkReadITCase {
// flink sink
tablePath1 = new Path(warehousePath, "default.db/t1");
- SimpleTableTestHelper testHelper1 = createTestHelper(tablePath1);
+ SimpleTableTestHelper testHelper1 = new SimpleTableTestHelper(tablePath1, rowType1());
testHelper1.write(GenericRowData.of(1, 2L, StringData.fromString("1")));
testHelper1.write(GenericRowData.of(3, 4L, StringData.fromString("2")));
testHelper1.write(GenericRowData.of(5, 6L, StringData.fromString("3")));
testHelper1.write(GenericRowData.ofKind(RowKind.DELETE, 3, 4L, StringData.fromString("2")));
testHelper1.commit();
+ // a int not null
+ // b array<varchar> not null
+ // c row<row<double, array<boolean> not null> not null, bigint> not null
tablePath2 = new Path(warehousePath, "default.db/t2");
- SimpleTableTestHelper testHelper2 = createTestHelper(tablePath2);
- testHelper2.write(GenericRowData.of(1, 2L, StringData.fromString("1")));
- testHelper2.write(GenericRowData.of(3, 4L, StringData.fromString("2")));
+ SimpleTableTestHelper testHelper2 = new SimpleTableTestHelper(tablePath2, rowType2());
+ testHelper2.write(
+ GenericRowData.of(
+ 1,
+ new GenericArrayData(
+ new StringData[] {
+ StringData.fromString("AAA"), StringData.fromString("BBB")
+ }),
+ GenericRowData.of(
+ GenericRowData.of(1.0d, new GenericArrayData(new Boolean[] {null})),
+ 1L)));
+ testHelper2.write(
+ GenericRowData.of(
+ 2,
+ new GenericArrayData(
+ new StringData[] {
+ StringData.fromString("CCC"), StringData.fromString("DDD")
+ }),
+ GenericRowData.of(
+ GenericRowData.of(null, new GenericArrayData(new Boolean[] {true})),
+ null)));
testHelper2.commit();
- testHelper2.write(GenericRowData.of(5, 6L, StringData.fromString("3")));
- testHelper2.write(GenericRowData.of(7, 8L, StringData.fromString("4")));
+
+ testHelper2.write(
+ GenericRowData.of(
+ 3,
+ new GenericArrayData(new StringData[] {null, null}),
+ GenericRowData.of(
+ GenericRowData.of(
+ 2.0d, new GenericArrayData(new boolean[] {true, false})),
+ 2L)));
+
+ testHelper2.write(
+ GenericRowData.of(
+ 4,
+ new GenericArrayData(new StringData[] {null, StringData.fromString("EEE")}),
+ GenericRowData.of(
+ GenericRowData.of(
+ 3.0d,
+ new GenericArrayData(new Boolean[] {true, false, true})),
+ 3L)));
testHelper2.commit();
}
@@ -91,12 +134,50 @@ public class SparkReadITCase {
RowType rowType =
new RowType(
Arrays.asList(
- new RowType.RowField("a", new IntType()),
+ new RowType.RowField("a", new IntType(false)),
new RowType.RowField("b", new BigIntType()),
new RowType.RowField("c", new VarCharType())));
return new SimpleTableTestHelper(tablePath, rowType);
}
+ private static RowType rowType1() {
+ return new RowType(
+ Arrays.asList(
+ new RowType.RowField("a", new IntType(false)),
+ new RowType.RowField("b", new BigIntType()),
+ new RowType.RowField("c", new VarCharType())));
+ }
+
+ private static RowType rowType2() {
+ return new RowType(
+ Arrays.asList(
+ new RowType.RowField("a", new IntType(false), "comment about a"),
+ new RowType.RowField("b", new ArrayType(false, new VarCharType())),
+ new RowType.RowField(
+ "c",
+ new RowType(
+ false,
+ Arrays.asList(
+ new RowType.RowField(
+ "c1",
+ new RowType(
+ false,
+ Arrays.asList(
+ new RowType.RowField(
+ "c11",
+ new DoubleType()),
+ new RowType.RowField(
+ "c12",
+ new ArrayType(
+ false,
+ new BooleanType()))))),
+ new RowType.RowField(
+ "c2",
+ new BigIntType(),
+ "comment about c2"))),
+ "comment about c")));
+ }
+
@AfterAll
public static void stopMetastoreAndSpark() throws IOException {
if (warehouse != null && warehouse.exists()) {
@@ -110,24 +191,34 @@ public class SparkReadITCase {
@Test
public void testNormal() {
- innerTestNormal(
+ innerTestSimpleType(
spark.read().format("tablestore").option("path", tablePath1.toString()).load());
+
+ innerTestNestedType(
+ spark.read().format("tablestore").option("path", tablePath2.toString()).load());
}
@Test
public void testFilterPushDown() {
- innerTestFilterPushDown(
+ innerTestSimpleTypeFilterPushDown(
+ spark.read().format("tablestore").option("path", tablePath1.toString()).load());
+
+ innerTestNestedTypeFilterPushDown(
spark.read().format("tablestore").option("path", tablePath2.toString()).load());
}
@Test
public void testCatalogNormal() {
- innerTestNormal(spark.table("table_store.default.t1"));
+ innerTestSimpleType(spark.table("table_store.default.t1"));
+
+ innerTestNestedType(spark.table("table_store.default.t2"));
}
@Test
public void testCatalogFilterPushDown() {
- innerTestFilterPushDown(spark.table("table_store.default.t2"));
+ innerTestSimpleTypeFilterPushDown(spark.table("table_store.default.t1"));
+
+ innerTestNestedTypeFilterPushDown(spark.table("table_store.default.t2"));
}
@Test
@@ -145,7 +236,7 @@ public class SparkReadITCase {
@Test
public void testAddColumn() throws Exception {
- Path tablePath = new Path(warehousePath, "default.db/" + UUID.randomUUID());
+ Path tablePath = new Path(warehousePath, "default.db/testAddColumn");
SimpleTableTestHelper testHelper1 = createTestHelper(tablePath);
testHelper1.write(GenericRowData.of(1, 2L, StringData.fromString("1")));
testHelper1.write(GenericRowData.of(5, 6L, StringData.fromString("3")));
@@ -166,21 +257,95 @@ public class SparkReadITCase {
@Test
public void testAlterColumnType() throws Exception {
- Path tablePath = new Path(warehousePath, "default.db/testAddColumn");
+ Path tablePath = new Path(warehousePath, "default.db/testAlterColumnType");
SimpleTableTestHelper testHelper1 = createTestHelper(tablePath);
testHelper1.write(GenericRowData.of(1, 2L, StringData.fromString("1")));
testHelper1.write(GenericRowData.of(5, 6L, StringData.fromString("3")));
testHelper1.commit();
- spark.sql("ALTER TABLE table_store.default.testAddColumn ALTER COLUMN a TYPE BIGINT");
- innerTestNormal(spark.table("table_store.default.testAddColumn"));
+ spark.sql("ALTER TABLE table_store.default.testAlterColumnType ALTER COLUMN a TYPE BIGINT");
+ innerTestSimpleType(spark.table("table_store.default.testAlterColumnType"));
+ }
+
+ @Test
+ public void testAlterTableColumnNullability() {
+ assertThat(fieldIsNullable(getField(schema2(), 0))).isFalse();
+ assertThat(fieldIsNullable(getField(schema2(), 1))).isFalse();
+ assertThat(fieldIsNullable(getField(schema2(), 2))).isFalse();
+ assertThat(fieldIsNullable(getNestedField(getField(schema2(), 2), 0))).isFalse();
+ assertThat(fieldIsNullable(getNestedField(getField(schema2(), 2), 1))).isTrue();
+ assertThat(fieldIsNullable(getNestedField(getNestedField(getField(schema2(), 2), 0), 0)))
+ .isTrue();
+ assertThat(fieldIsNullable(getNestedField(getNestedField(getField(schema2(), 2), 0), 1)))
+ .isFalse();
+
+ // note: for Spark, it is illegal to change a nullable column to non-nullable
+ spark.sql("ALTER TABLE table_store.default.t2 ALTER COLUMN a DROP NOT NULL");
+ assertThat(fieldIsNullable(getField(schema2(), 0))).isTrue();
+
+ spark.sql("ALTER TABLE table_store.default.t2 ALTER COLUMN b DROP NOT NULL");
+ assertThat(fieldIsNullable(getField(schema2(), 1))).isTrue();
+
+ spark.sql("ALTER TABLE table_store.default.t2 ALTER COLUMN c DROP NOT NULL");
+ assertThat(fieldIsNullable(getField(schema2(), 2))).isTrue();
+
+ spark.sql("ALTER TABLE table_store.default.t2 ALTER COLUMN c.c1 DROP NOT NULL");
+ assertThat(fieldIsNullable(getNestedField(getField(schema2(), 2), 0))).isTrue();
+
+ spark.sql("ALTER TABLE table_store.default.t2 ALTER COLUMN c.c1.c12 DROP NOT NULL");
+ assertThat(fieldIsNullable(getNestedField(getNestedField(getField(schema2(), 2), 0), 1)))
+ .isTrue();
+ }
+
+ @Test
+ public void testAlterTableColumnComment() {
+ assertThat(getField(schema1(), 0).description()).isNull();
+
+ spark.sql("ALTER TABLE table_store.default.t1 ALTER COLUMN a COMMENT 'a new comment'");
+ assertThat(getField(schema1(), 0).description()).isEqualTo("a new comment");
+
+ spark.sql(
+ "ALTER TABLE table_store.default.t1 ALTER COLUMN a COMMENT 'yet another comment'");
+ assertThat(getField(schema1(), 0).description()).isEqualTo("yet another comment");
+
+ assertThat(getField(schema2(), 2).description()).isEqualTo("comment about c");
+ assertThat(getNestedField(getField(schema2(), 2), 0).description()).isNull();
+ assertThat(getNestedField(getField(schema2(), 2), 1).description())
+ .isEqualTo("comment about c2");
+ assertThat(getNestedField(getNestedField(getField(schema2(), 2), 0), 0).description())
+ .isNull();
+ assertThat(getNestedField(getNestedField(getField(schema2(), 2), 0), 1).description())
+ .isNull();
+
+ spark.sql(
+ "ALTER TABLE table_store.default.t2 ALTER COLUMN c COMMENT 'yet another comment about c'");
+ spark.sql("ALTER TABLE table_store.default.t2 ALTER COLUMN c.c1 COMMENT 'a nested type'");
+ spark.sql("ALTER TABLE table_store.default.t2 ALTER COLUMN c.c2 COMMENT 'a bigint type'");
+ spark.sql(
+ "ALTER TABLE table_store.default.t2 ALTER COLUMN c.c1.c11 COMMENT 'a double type'");
+ spark.sql(
+ "ALTER TABLE table_store.default.t2 ALTER COLUMN c.c1.c12 COMMENT 'a boolean array'");
+
+ assertThat(getField(schema2(), 2).description()).isEqualTo("yet another comment about c");
+ assertThat(getNestedField(getField(schema2(), 2), 0).description())
+ .isEqualTo("a nested type");
+ assertThat(getNestedField(getField(schema2(), 2), 1).description())
+ .isEqualTo("a bigint type");
+ assertThat(getNestedField(getNestedField(getField(schema2(), 2), 0), 0).description())
+ .isEqualTo("a double type");
+ assertThat(getNestedField(getNestedField(getField(schema2(), 2), 0), 1).description())
+ .isEqualTo("a boolean array");
}
private TableSchema schema1() {
return FileStoreTableFactory.create(tablePath1).schema();
}
- private void innerTestNormal(Dataset<Row> dataset) {
+ private TableSchema schema2() {
+ return FileStoreTableFactory.create(tablePath2).schema();
+ }
+
+ private void innerTestSimpleType(Dataset<Row> dataset) {
List<Row> results = dataset.collectAsList();
assertThat(results.toString()).isEqualTo("[[1,2,1], [5,6,3]]");
@@ -191,8 +356,86 @@ public class SparkReadITCase {
assertThat(results.toString()).isEqualTo("[[8]]");
}
- private void innerTestFilterPushDown(Dataset<Row> dataset) {
+ private void innerTestNestedType(Dataset<Row> dataset) {
+ List<Row> results = dataset.collectAsList();
+ assertThat(results.toString())
+ .isEqualTo(
+ "[[1,WrappedArray(AAA, BBB),[[1.0,WrappedArray(null)],1]], "
+ + "[2,WrappedArray(CCC, DDD),[[null,WrappedArray(true)],null]], "
+ + "[3,WrappedArray(null, null),[[2.0,WrappedArray(true, false)],2]], "
+ + "[4,WrappedArray(null, EEE),[[3.0,WrappedArray(true, false, true)],3]]]");
+
+ results = dataset.select("a").collectAsList();
+ assertThat(results.toString()).isEqualTo("[[1], [2], [3], [4]]");
+
+ results = dataset.select("c.c1").collectAsList();
+ assertThat(results.toString())
+ .isEqualTo(
+ "[[[1.0,WrappedArray(null)]], [[null,WrappedArray(true)]], "
+ + "[[2.0,WrappedArray(true, false)]], "
+ + "[[3.0,WrappedArray(true, false, true)]]]");
+
+ results = dataset.select("c.c2").collectAsList();
+ assertThat(results.toString()).isEqualTo("[[1], [null], [2], [3]]");
+
+ results = dataset.select("c.c1.c11").collectAsList();
+ assertThat(results.toString()).isEqualTo("[[1.0], [null], [2.0], [3.0]]");
+
+ results = dataset.select("c.c1.c12").collectAsList();
+ assertThat(results.toString())
+ .isEqualTo(
+ "[[WrappedArray(null)], "
+ + "[WrappedArray(true)], "
+ + "[WrappedArray(true, false)], "
+ + "[WrappedArray(true, false, true)]]");
+ }
+
+ private void innerTestSimpleTypeFilterPushDown(Dataset<Row> dataset) {
List<Row> results = dataset.filter("a < 4").select("a", "c").collectAsList();
- assertThat(results.toString()).isEqualTo("[[1,1], [3,2]]");
+ assertThat(results.toString()).isEqualTo("[[1,1]]");
+
+ results = dataset.filter("b = 4").select("a", "c").collectAsList();
+ assertThat(results.toString()).isEqualTo("[]");
+ }
+
+ private void innerTestNestedTypeFilterPushDown(Dataset<Row> dataset) {
+ List<Row> results = dataset.filter("a < 4").select("a").collectAsList();
+ assertThat(results.toString()).isEqualTo("[[1], [2], [3]]");
+
+ results = dataset.filter("array_contains(b, 'AAA')").select("b").collectAsList();
+ assertThat(results.toString()).isEqualTo("[[WrappedArray(AAA, BBB)]]");
+
+ results = dataset.filter("c.c1.c11 is null").select("a", "c").collectAsList();
+ assertThat(results.toString()).isEqualTo("[[2,[[null,WrappedArray(true)],null]]]");
+
+ results = dataset.filter("c.c1.c11 = 1.0").select("a", "c.c1").collectAsList();
+ assertThat(results.toString()).isEqualTo("[[1,[1.0,WrappedArray(null)]]]");
+
+ results = dataset.filter("c.c2 is null").select("a", "c").collectAsList();
+ assertThat(results.toString()).isEqualTo("[[2,[[null,WrappedArray(true)],null]]]");
+
+ results =
+ dataset.filter("array_contains(c.c1.c12, false)")
+ .select("a", "c.c1.c12", "c.c2")
+ .collectAsList();
+ assertThat(results.toString())
+ .isEqualTo(
+ "[[3,WrappedArray(true, false),2], [4,WrappedArray(true, false, true),3]]");
+ }
+
+ private boolean fieldIsNullable(DataField field) {
+ return field.type().logicalType().isNullable();
+ }
+
+ private DataField getField(TableSchema schema, int index) {
+ return schema.fields().get(index);
+ }
+
+ private DataField getNestedField(DataField field, int index) {
+ if (field.type() instanceof RowDataType) {
+ RowDataType rowDataType = (RowDataType) field.type();
+ return rowDataType.fields().get(index);
+ }
+ throw new IllegalArgumentException();
}
}