You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by lz...@apache.org on 2022/07/26 07:35:49 UTC
[flink-table-store] branch release-0.2 updated: [FLINK-28677] Primary key should not be null when creating Spark table
This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch release-0.2
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/release-0.2 by this push:
new 746f1e28 [FLINK-28677] Primary key should not be null when creating Spark table
746f1e28 is described below
commit 746f1e28695d74bf29649f93b4cdb5cc127a6586
Author: Jane Chan <55...@users.noreply.github.com>
AuthorDate: Tue Jul 26 15:34:14 2022 +0800
[FLINK-28677] Primary key should not be null when creating Spark table
This closes #237
---
.../table/store/file/schema/SchemaManager.java | 6 ++
.../table/store/file/schema/UpdateSchema.java | 46 +++++++++++-
.../table/store/file/schema/SchemaManagerTest.java | 4 +-
.../flink/table/store/spark/SparkReadITCase.java | 86 ++++++++++++++++++++++
4 files changed, 139 insertions(+), 3 deletions(-)
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/SchemaManager.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/SchemaManager.java
index 7dcec7f9..fe3c8f9b 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/SchemaManager.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/SchemaManager.java
@@ -199,6 +199,12 @@ public class SchemaManager implements Serializable {
});
} else if (change instanceof UpdateColumnNullability) {
UpdateColumnNullability update = (UpdateColumnNullability) change;
+ if (update.fieldNames().length == 1
+ && update.newNullability()
+ && schema.primaryKeys().contains(update.fieldNames()[0])) {
+ throw new UnsupportedOperationException(
+ "Cannot change nullability of primary key");
+ }
updateNestedColumn(
newFields,
update.fieldNames(),
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/UpdateSchema.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/UpdateSchema.java
index 29aa4840..87477589 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/UpdateSchema.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/UpdateSchema.java
@@ -24,12 +24,15 @@ import org.apache.flink.table.catalog.CatalogTableImpl;
import org.apache.flink.table.descriptors.DescriptorProperties;
import org.apache.flink.table.descriptors.Schema;
import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.Preconditions;
import java.util.ArrayList;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.Set;
import static org.apache.flink.table.descriptors.Schema.SCHEMA;
import static org.apache.flink.table.types.utils.TypeConversions.fromLogicalToDataType;
@@ -53,7 +56,7 @@ public class UpdateSchema {
List<String> primaryKeys,
Map<String, String> options,
String comment) {
- this.rowType = rowType;
+ this.rowType = validateRowType(rowType, primaryKeys, partitionKeys);
this.partitionKeys = partitionKeys;
this.primaryKeys = primaryKeys;
this.options = new HashMap<>(options);
@@ -80,6 +83,47 @@ public class UpdateSchema {
return comment;
}
+ private RowType validateRowType(
+ RowType rowType, List<String> primaryKeys, List<String> partitionKeys) {
+ List<String> fieldNames = rowType.getFieldNames();
+ Set<String> allFields = new HashSet<>(fieldNames);
+ Preconditions.checkState(
+ allFields.containsAll(partitionKeys),
+ "Table column %s should include all partition fields %s",
+ fieldNames,
+ partitionKeys);
+
+ if (primaryKeys.isEmpty()) {
+ return rowType;
+ }
+ Preconditions.checkState(
+ allFields.containsAll(primaryKeys),
+ "Table column %s should include all primary key constraint %s",
+ fieldNames,
+ primaryKeys);
+ Set<String> pkSet = new HashSet<>(primaryKeys);
+ Preconditions.checkState(
+ pkSet.containsAll(partitionKeys),
+ "Primary key constraint %s should include all partition fields %s",
+ primaryKeys,
+ partitionKeys);
+
+ // primary key fields must not be nullable
+ List<RowType.RowField> fields = new ArrayList<>();
+ for (RowType.RowField field : rowType.getFields()) {
+ if (pkSet.contains(field.getName()) && field.getType().isNullable()) {
+ fields.add(
+ new RowType.RowField(
+ field.getName(),
+ field.getType().copy(false),
+ field.getDescription().orElse(null)));
+ } else {
+ fields.add(field);
+ }
+ }
+ return new RowType(false, fields);
+ }
+
@Override
public String toString() {
return "UpdateSchema{"
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/schema/SchemaManagerTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/schema/SchemaManagerTest.java
index 59f3b959..20cd4acd 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/schema/SchemaManagerTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/schema/SchemaManagerTest.java
@@ -93,8 +93,8 @@ public class SchemaManagerTest {
List<DataField> fields =
Arrays.asList(
- new DataField(0, "f0", new AtomicDataType(new IntType())),
- new DataField(1, "f1", new AtomicDataType(new BigIntType())),
+ new DataField(0, "f0", new AtomicDataType(new IntType(false))),
+ new DataField(1, "f1", new AtomicDataType(new BigIntType(false))),
new DataField(2, "f2", new AtomicDataType(new VarCharType())));
assertThat(latest.isPresent()).isTrue();
diff --git a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkReadITCase.java b/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkReadITCase.java
index 95fcfafa..1326ae27 100644
--- a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkReadITCase.java
+++ b/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkReadITCase.java
@@ -40,6 +40,7 @@ import org.apache.flink.table.types.logical.VarCharType;
import org.apache.flink.types.RowKind;
import org.apache.commons.io.FileUtils;
+import org.apache.spark.sql.AnalysisException;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
@@ -316,6 +317,24 @@ public class SparkReadITCase {
.isTrue();
}
+ @Test
+ public void testAlterPrimaryKeyNullability() {
+ spark.sql("USE table_store");
+ spark.sql(
+ "CREATE TABLE default.testAlterPkNullability (\n"
+ + "a BIGINT,\n"
+ + "b STRING) USING table_store\n"
+ + "COMMENT 'table comment'\n"
+ + "TBLPROPERTIES ('primary-key' = 'a')");
+ assertThatThrownBy(
+ () ->
+ spark.sql(
+ "ALTER TABLE default.testAlterPkNullability ALTER COLUMN a DROP NOT NULL"))
+ .getRootCause()
+ .isInstanceOf(UnsupportedOperationException.class)
+ .hasMessageContaining("Cannot change nullability of primary key");
+ }
+
@Test
public void testAlterTableColumnComment() {
assertThat(getField(schema1(), 0).description()).isNull();
@@ -356,6 +375,73 @@ public class SparkReadITCase {
.isEqualTo("a boolean array");
}
+ @Test
+ public void testCreateTableWithNullablePk() {
+ spark.sql("USE table_store");
+ spark.sql(
+ "CREATE TABLE default.PkTable (\n"
+ + "a BIGINT,\n"
+ + "b STRING) USING table_store\n"
+ + "COMMENT 'table comment'\n"
+ + "TBLPROPERTIES ('primary-key' = 'a')");
+ Path tablePath = new Path(warehousePath, "default.db/PkTable");
+ TableSchema schema = FileStoreTableFactory.create(tablePath).schema();
+ assertThat(schema.logicalRowType().getTypeAt(0).isNullable()).isFalse();
+ }
+
+ @Test
+ public void testCreateTableWithInvalidPk() {
+ spark.sql("USE table_store");
+ assertThatThrownBy(
+ () ->
+ spark.sql(
+ "CREATE TABLE default.PartitionedPkTable (\n"
+ + "a BIGINT,\n"
+ + "b STRING,\n"
+ + "c DOUBLE) USING table_store\n"
+ + "COMMENT 'table comment'\n"
+ + "PARTITIONED BY (b)"
+ + "TBLPROPERTIES ('primary-key' = 'a')"))
+ .isInstanceOf(IllegalStateException.class)
+ .hasMessageContaining(
+ "Primary key constraint [a] should include all partition fields [b]");
+ }
+
+ @Test
+ public void testCreateTableWithNonexistentPk() {
+ spark.sql("USE table_store");
+ assertThatThrownBy(
+ () ->
+ spark.sql(
+ "CREATE TABLE default.PartitionedPkTable (\n"
+ + "a BIGINT,\n"
+ + "b STRING,\n"
+ + "c DOUBLE) USING table_store\n"
+ + "COMMENT 'table comment'\n"
+ + "PARTITIONED BY (b)"
+ + "TBLPROPERTIES ('primary-key' = 'd')"))
+ .isInstanceOf(IllegalStateException.class)
+ .hasMessageContaining(
+ "Table column [a, b, c] should include all primary key constraint [d]");
+ }
+
+ @Test
+ public void testCreateTableWithNonexistentPartition() {
+ spark.sql("USE table_store");
+ assertThatThrownBy(
+ () ->
+ spark.sql(
+ "CREATE TABLE default.PartitionedPkTable (\n"
+ + "a BIGINT,\n"
+ + "b STRING,\n"
+ + "c DOUBLE) USING table_store\n"
+ + "COMMENT 'table comment'\n"
+ + "PARTITIONED BY (d)"
+ + "TBLPROPERTIES ('primary-key' = 'a')"))
+ .isInstanceOf(AnalysisException.class)
+ .hasMessageContaining("Couldn't find column d");
+ }
+
@Test
public void testCreateAndDropTable() throws Exception {
innerTest("MyTable1", true, true, false);