Posted to commits@flink.apache.org by lz...@apache.org on 2022/11/24 03:15:23 UTC

[flink-table-store] branch master updated: [FLINK-30161] Add table schema validation for full compaction

This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/master by this push:
     new 2e06ae5c [FLINK-30161] Add table schema validation for full compaction
2e06ae5c is described below

commit 2e06ae5c81cf14f65ff7bb3619bed4a528fe8e68
Author: shammon <zj...@gmail.com>
AuthorDate: Thu Nov 24 11:15:18 2022 +0800

    [FLINK-30161] Add table schema validation for full compaction
    
    This closes #399
---
 .../org/apache/flink/table/store/CoreOptions.java  | 52 ++++++++++++++++++----
 .../table/store/file/schema/SchemaManager.java     |  2 +
 .../table/store/table/FileStoreTableFactory.java   | 19 +-------
 .../table/store/file/schema/SchemaManagerTest.java | 20 +++++++++
 4 files changed, 67 insertions(+), 26 deletions(-)
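
For context before the diff: the rule this commit enforces can be exercised at the options level. The sketch below is illustrative only; the class name is made up, the accessors (changelogProducer(), writeMode()) come from the patch itself, and it assumes the default write-mode is CHANGE_LOG.

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.table.store.CoreOptions;
    import org.apache.flink.table.store.file.WriteMode;

    // Hypothetical sketch class, not part of the patch.
    public class FullCompactionRuleSketch {
        public static void main(String[] args) {
            Configuration conf = new Configuration();
            // Same option the new test sets: produce the changelog from full compaction.
            conf.setString(
                    CoreOptions.CHANGELOG_PRODUCER.key(),
                    CoreOptions.ChangelogProducer.FULL_COMPACTION.toString());
            CoreOptions options = new CoreOptions(conf);
            // The predicate validateTableSchema checks; when it holds and the
            // schema declares no primary keys, the schema commit fails.
            boolean needsPrimaryKeys =
                    options.changelogProducer() == CoreOptions.ChangelogProducer.FULL_COMPACTION
                            && options.writeMode() == WriteMode.CHANGE_LOG;
            System.out.println(needsPrimaryKeys); // true, assuming default write-mode CHANGE_LOG
        }
    }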

diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java
index 8e4d65dd..584bc2af 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java
@@ -29,6 +29,7 @@ import org.apache.flink.configuration.description.Description;
 import org.apache.flink.configuration.description.InlineElement;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.table.store.file.WriteMode;
+import org.apache.flink.table.store.file.schema.TableSchema;
 import org.apache.flink.table.store.format.FileFormat;
 import org.apache.flink.util.Preconditions;
 
@@ -389,15 +390,6 @@ public class CoreOptions implements Serializable {
 
     public CoreOptions(Configuration options) {
         this.options = options;
-        // TODO validate all keys
-        Preconditions.checkArgument(
-                snapshotNumRetainMin() > 0,
-                SNAPSHOT_NUM_RETAINED_MIN.key() + " should be at least 1");
-        Preconditions.checkArgument(
-                snapshotNumRetainMin() <= snapshotNumRetainMax(),
-                SNAPSHOT_NUM_RETAINED_MIN.key()
-                        + " should not be larger than "
-                        + SNAPSHOT_NUM_RETAINED_MAX.key());
     }
 
     public int bucket() {
@@ -544,6 +536,10 @@ public class CoreOptions implements Serializable {
         return options.getOptional(SEQUENCE_FIELD);
     }
 
+    public WriteMode writeMode() {
+        return options.get(WRITE_MODE);
+    }
+
     public boolean writeCompactionSkip() {
         return options.get(WRITE_COMPACTION_SKIP);
     }
@@ -695,6 +691,44 @@ public class CoreOptions implements Serializable {
         }
     }
 
+    /**
+     * Validate the {@link TableSchema} and {@link CoreOptions}.
+     *
+     * <p>TODO validate all items in schema and all keys in options.
+     *
+     * @param schema the schema to be validated
+     */
+    public static void validateTableSchema(TableSchema schema) {
+        CoreOptions options = new CoreOptions(schema.options());
+        if (options.logStartupMode() == LogStartupMode.FROM_TIMESTAMP) {
+            if (options.logScanTimestampMills() == null) {
+                throw new IllegalArgumentException(
+                        String.format(
+                                "%s cannot be null when you use %s for %s",
+                                CoreOptions.LOG_SCAN_TIMESTAMP_MILLS.key(),
+                                CoreOptions.LogStartupMode.FROM_TIMESTAMP,
+                                CoreOptions.LOG_SCAN.key()));
+            }
+        }
+
+        Preconditions.checkArgument(
+                options.snapshotNumRetainMin() > 0,
+                SNAPSHOT_NUM_RETAINED_MIN.key() + " should be at least 1");
+        Preconditions.checkArgument(
+                options.snapshotNumRetainMin() <= options.snapshotNumRetainMax(),
+                SNAPSHOT_NUM_RETAINED_MIN.key()
+                        + " should not be larger than "
+                        + SNAPSHOT_NUM_RETAINED_MAX.key());
+
+        // Only changelog tables with primary keys support full compaction
+        if (options.changelogProducer() == CoreOptions.ChangelogProducer.FULL_COMPACTION
+                && options.writeMode() == WriteMode.CHANGE_LOG
+                && schema.primaryKeys().isEmpty()) {
+            throw new UnsupportedOperationException(
+                    "Changelog table with full compaction must have primary keys");
+        }
+    }
+
     @Internal
     public static List<ConfigOption<?>> getOptions() {
         final Field[] fields = CoreOptions.class.getFields();
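
Note the behavioral move in this file: the retention checks leave the CoreOptions constructor and run in the static validateTableSchema instead, so merely constructing a CoreOptions from out-of-range options no longer throws. A minimal sketch (class name hypothetical; it assumes SNAPSHOT_NUM_RETAINED_MIN is a public Integer-typed option field, as the getOptions() reflection above suggests):

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.table.store.CoreOptions;

    // Hypothetical sketch class, not part of the patch.
    public class RetentionCheckSketch {
        public static void main(String[] args) {
            Configuration conf = new Configuration();
            conf.set(CoreOptions.SNAPSHOT_NUM_RETAINED_MIN, 0);
            // Before this patch the constructor below threw IllegalArgumentException;
            // the check now runs in CoreOptions.validateTableSchema(schema) when a
            // schema is committed or a table is created.
            CoreOptions options = new CoreOptions(conf);
            System.out.println(options.snapshotNumRetainMin()); // 0, not yet rejected
        }
    }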
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/SchemaManager.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/SchemaManager.java
index 2d020f21..dc6fa535 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/SchemaManager.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/SchemaManager.java
@@ -336,6 +336,8 @@ public class SchemaManager implements Serializable {
     }
 
     private boolean commit(TableSchema newSchema) throws Exception {
+        CoreOptions.validateTableSchema(newSchema);
+
         Path schemaPath = toSchemaPath(newSchema.id());
         FileSystem fs = schemaPath.getFileSystem();
         Callable<Boolean> callable =
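
Because the hook sits in commit, every schema creation or evolution path is validated before a schema file is written. A standalone usage sketch (class name and warehouse path are illustrative; UpdateSchema's constructor and commitNewVersion are taken from the test below):

    import java.util.Collections;
    import java.util.HashMap;
    import java.util.Map;

    import org.apache.flink.core.fs.Path;
    import org.apache.flink.table.store.CoreOptions;
    import org.apache.flink.table.store.file.schema.SchemaManager;
    import org.apache.flink.table.store.file.schema.UpdateSchema;
    import org.apache.flink.table.types.logical.IntType;
    import org.apache.flink.table.types.logical.RowType;

    // Hypothetical sketch class, not part of the patch.
    public class CommitValidationSketch {
        public static void main(String[] args) throws Exception {
            Map<String, String> options = new HashMap<>();
            options.put(
                    CoreOptions.CHANGELOG_PRODUCER.key(),
                    CoreOptions.ChangelogProducer.FULL_COMPACTION.toString());

            // Illustrative table path.
            SchemaManager manager = new SchemaManager(new Path("/tmp/warehouse/t1"));
            // No primary keys: with this patch, the commit fails fast with
            // UnsupportedOperationException before any schema file is written.
            manager.commitNewVersion(
                    new UpdateSchema(
                            RowType.of(new IntType()),
                            Collections.emptyList(),
                            Collections.emptyList(),
                            options,
                            ""));
        }
    }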
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/FileStoreTableFactory.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/FileStoreTableFactory.java
index d33c138e..1f61bb08 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/FileStoreTableFactory.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/FileStoreTableFactory.java
@@ -21,7 +21,6 @@ package org.apache.flink.table.store.table;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.table.store.CoreOptions;
-import org.apache.flink.table.store.CoreOptions.LogStartupMode;
 import org.apache.flink.table.store.file.WriteMode;
 import org.apache.flink.table.store.file.schema.SchemaManager;
 import org.apache.flink.table.store.file.schema.TableSchema;
@@ -62,11 +61,10 @@ public class FileStoreTableFactory {
         dynamicOptions.toMap().forEach(newOptions::setString);
         newOptions.set(PATH, tablePath.toString());
 
-        // validate merged options
-        validateOptions(new CoreOptions(newOptions));
-
         // copy a new table store to contain dynamic options
         tableSchema = tableSchema.copy(newOptions.toMap());
+        // validate schema with new options
+        CoreOptions.validateTableSchema(tableSchema);
 
         SchemaManager schemaManager = new SchemaManager(tablePath);
         if (newOptions.get(CoreOptions.WRITE_MODE) == WriteMode.APPEND_ONLY) {
@@ -79,17 +77,4 @@ public class FileStoreTableFactory {
             }
         }
     }
-
-    private static void validateOptions(CoreOptions options) {
-        if (options.logStartupMode() == LogStartupMode.FROM_TIMESTAMP) {
-            if (options.logScanTimestampMills() == null) {
-                throw new IllegalArgumentException(
-                        String.format(
-                                "%s can not be null when you use %s for %s",
-                                CoreOptions.LOG_SCAN_TIMESTAMP_MILLS.key(),
-                                CoreOptions.LogStartupMode.FROM_TIMESTAMP,
-                                CoreOptions.LOG_SCAN.key()));
-            }
-        }
-    }
 }
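
With validateOptions removed here, the FROM_TIMESTAMP check is centralized in CoreOptions.validateTableSchema and now also covers dynamic options merged at table-creation time. A sketch of the configuration it rejects (class name hypothetical; LOG_SCAN, LogStartupMode, and logScanTimestampMills() come from the patch):

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.table.store.CoreOptions;

    // Hypothetical sketch class, not part of the patch.
    public class FromTimestampSketch {
        public static void main(String[] args) {
            Configuration conf = new Configuration();
            // FROM_TIMESTAMP startup without LOG_SCAN_TIMESTAMP_MILLS set:
            conf.setString(
                    CoreOptions.LOG_SCAN.key(),
                    CoreOptions.LogStartupMode.FROM_TIMESTAMP.toString());
            CoreOptions options = new CoreOptions(conf);
            // validateTableSchema rejects a schema carrying these options, whether
            // they come from the stored schema or from merged dynamic options.
            System.out.println(options.logScanTimestampMills()); // null
        }
    }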
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/schema/SchemaManagerTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/schema/SchemaManagerTest.java
index 1210375f..5d6a4f94 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/schema/SchemaManagerTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/schema/SchemaManagerTest.java
@@ -238,4 +238,24 @@ public class SchemaManagerTest {
         assertThat(tableSchema.primaryKeys()).isEqualTo(primaryKeys);
         assertThat(tableSchema.options()).isEqualTo(options);
     }
+
+    @Test
+    public void testChangelogTableWithFullCompaction() throws Exception {
+        Map<String, String> options = new HashMap<>();
+        options.put("key", "value");
+        options.put(
+                CoreOptions.CHANGELOG_PRODUCER.key(),
+                CoreOptions.ChangelogProducer.FULL_COMPACTION.toString());
+
+        final UpdateSchema schemaWithoutPrimaryKeys =
+                new UpdateSchema(
+                        rowType, Collections.emptyList(), Collections.emptyList(), options, "");
+        assertThatThrownBy(() -> manager.commitNewVersion(schemaWithoutPrimaryKeys))
+                .isInstanceOf(UnsupportedOperationException.class)
+                .hasMessage("Changelog table with full compaction must have primary keys");
+
+        final UpdateSchema schemaWithPrimaryKeys =
+                new UpdateSchema(rowType, partitionKeys, primaryKeys, options, "");
+        retryArtificialException(() -> manager.commitNewVersion(schemaWithPrimaryKeys));
+    }
 }