You are viewing a plain-text version of this content. The canonical link to it (a hyperlink in the original page) is not preserved in this version.
Posted to commits@flink.apache.org by lz...@apache.org on 2022/11/24 03:15:23 UTC
[flink-table-store] branch master updated: [FLINK-30161] Add table schema validation for full compaction
This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/master by this push:
new 2e06ae5c [FLINK-30161] Add table schema validation for full compaction
2e06ae5c is described below
commit 2e06ae5c81cf14f65ff7bb3619bed4a528fe8e68
Author: shammon <zj...@gmail.com>
AuthorDate: Thu Nov 24 11:15:18 2022 +0800
[FLINK-30161] Add table schema validation for full compaction
This closes #399
---
.../org/apache/flink/table/store/CoreOptions.java | 52 ++++++++++++++++++----
.../table/store/file/schema/SchemaManager.java | 2 +
.../table/store/table/FileStoreTableFactory.java | 19 +-------
.../table/store/file/schema/SchemaManagerTest.java | 20 +++++++++
4 files changed, 67 insertions(+), 26 deletions(-)
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java
index 8e4d65dd..584bc2af 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java
@@ -29,6 +29,7 @@ import org.apache.flink.configuration.description.Description;
import org.apache.flink.configuration.description.InlineElement;
import org.apache.flink.core.fs.Path;
import org.apache.flink.table.store.file.WriteMode;
+import org.apache.flink.table.store.file.schema.TableSchema;
import org.apache.flink.table.store.format.FileFormat;
import org.apache.flink.util.Preconditions;
@@ -389,15 +390,6 @@ public class CoreOptions implements Serializable {
public CoreOptions(Configuration options) {
this.options = options;
- // TODO validate all keys
- Preconditions.checkArgument(
- snapshotNumRetainMin() > 0,
- SNAPSHOT_NUM_RETAINED_MIN.key() + " should be at least 1");
- Preconditions.checkArgument(
- snapshotNumRetainMin() <= snapshotNumRetainMax(),
- SNAPSHOT_NUM_RETAINED_MIN.key()
- + " should not be larger than "
- + SNAPSHOT_NUM_RETAINED_MAX.key());
}
public int bucket() {
@@ -544,6 +536,10 @@ public class CoreOptions implements Serializable {
return options.getOptional(SEQUENCE_FIELD);
}
+ public WriteMode writeMode() {
+ return options.get(WRITE_MODE);
+ }
+
public boolean writeCompactionSkip() {
return options.get(WRITE_COMPACTION_SKIP);
}
@@ -695,6 +691,44 @@ public class CoreOptions implements Serializable {
}
}
+ /**
+ * Validate the {@link TableSchema} and {@link CoreOptions}.
+ *
+ * <p>TODO validate all items in schema and all keys in options.
+ *
+ * @param schema the schema to be validated
+ */
+ public static void validateTableSchema(TableSchema schema) {
+ CoreOptions options = new CoreOptions(schema.options());
+ if (options.logStartupMode() == LogStartupMode.FROM_TIMESTAMP) {
+ if (options.logScanTimestampMills() == null) {
+ throw new IllegalArgumentException(
+ String.format(
+ "%s can not be null when you use %s for %s",
+ CoreOptions.LOG_SCAN_TIMESTAMP_MILLS.key(),
+ CoreOptions.LogStartupMode.FROM_TIMESTAMP,
+ CoreOptions.LOG_SCAN.key()));
+ }
+ }
+
+ Preconditions.checkArgument(
+ options.snapshotNumRetainMin() > 0,
+ SNAPSHOT_NUM_RETAINED_MIN.key() + " should be at least 1");
+ Preconditions.checkArgument(
+ options.snapshotNumRetainMin() <= options.snapshotNumRetainMax(),
+ SNAPSHOT_NUM_RETAINED_MIN.key()
+ + " should not be larger than "
+ + SNAPSHOT_NUM_RETAINED_MAX.key());
+
+ // In change-log write mode, the full-compaction changelog producer requires primary keys
+ if (options.changelogProducer() == CoreOptions.ChangelogProducer.FULL_COMPACTION
+ && options.writeMode() == WriteMode.CHANGE_LOG
+ && schema.primaryKeys().isEmpty()) {
+ throw new UnsupportedOperationException(
+ "Changelog table with full compaction must have primary keys");
+ }
+ }
+
@Internal
public static List<ConfigOption<?>> getOptions() {
final Field[] fields = CoreOptions.class.getFields();
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/SchemaManager.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/SchemaManager.java
index 2d020f21..dc6fa535 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/SchemaManager.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/SchemaManager.java
@@ -336,6 +336,8 @@ public class SchemaManager implements Serializable {
}
private boolean commit(TableSchema newSchema) throws Exception {
+ CoreOptions.validateTableSchema(newSchema);
+
Path schemaPath = toSchemaPath(newSchema.id());
FileSystem fs = schemaPath.getFileSystem();
Callable<Boolean> callable =
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/FileStoreTableFactory.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/FileStoreTableFactory.java
index d33c138e..1f61bb08 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/FileStoreTableFactory.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/FileStoreTableFactory.java
@@ -21,7 +21,6 @@ package org.apache.flink.table.store.table;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.table.store.CoreOptions;
-import org.apache.flink.table.store.CoreOptions.LogStartupMode;
import org.apache.flink.table.store.file.WriteMode;
import org.apache.flink.table.store.file.schema.SchemaManager;
import org.apache.flink.table.store.file.schema.TableSchema;
@@ -62,11 +61,10 @@ public class FileStoreTableFactory {
dynamicOptions.toMap().forEach(newOptions::setString);
newOptions.set(PATH, tablePath.toString());
- // validate merged options
- validateOptions(new CoreOptions(newOptions));
-
// copy a new table schema to contain dynamic options
tableSchema = tableSchema.copy(newOptions.toMap());
+ // validate schema with new options
+ CoreOptions.validateTableSchema(tableSchema);
SchemaManager schemaManager = new SchemaManager(tablePath);
if (newOptions.get(CoreOptions.WRITE_MODE) == WriteMode.APPEND_ONLY) {
@@ -79,17 +77,4 @@ public class FileStoreTableFactory {
}
}
}
-
- private static void validateOptions(CoreOptions options) {
- if (options.logStartupMode() == LogStartupMode.FROM_TIMESTAMP) {
- if (options.logScanTimestampMills() == null) {
- throw new IllegalArgumentException(
- String.format(
- "%s can not be null when you use %s for %s",
- CoreOptions.LOG_SCAN_TIMESTAMP_MILLS.key(),
- CoreOptions.LogStartupMode.FROM_TIMESTAMP,
- CoreOptions.LOG_SCAN.key()));
- }
- }
- }
}
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/schema/SchemaManagerTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/schema/SchemaManagerTest.java
index 1210375f..5d6a4f94 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/schema/SchemaManagerTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/schema/SchemaManagerTest.java
@@ -238,4 +238,24 @@ public class SchemaManagerTest {
assertThat(tableSchema.primaryKeys()).isEqualTo(primaryKeys);
assertThat(tableSchema.options()).isEqualTo(options);
}
+
+ @Test
+ public void testChangelogTableWithFullCompaction() throws Exception {
+ Map<String, String> options = new HashMap<>();
+ options.put("key", "value");
+ options.put(
+ CoreOptions.CHANGELOG_PRODUCER.key(),
+ CoreOptions.ChangelogProducer.FULL_COMPACTION.toString());
+
+ final UpdateSchema schemaWithoutPrimaryKeys =
+ new UpdateSchema(
+ rowType, Collections.emptyList(), Collections.emptyList(), options, "");
+ assertThatThrownBy(() -> manager.commitNewVersion(schemaWithoutPrimaryKeys))
+ .isInstanceOf(UnsupportedOperationException.class)
+ .hasMessage("Changelog table with full compaction must have primary keys");
+
+ final UpdateSchema schemaWithPrimaryKeys =
+ new UpdateSchema(rowType, partitionKeys, primaryKeys, options, "");
+ retryArtificialException(() -> manager.commitNewVersion(schemaWithPrimaryKeys));
+ }
}