You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@ignite.apache.org by GitBox <gi...@apache.org> on 2021/05/18 11:12:01 UTC

[GitHub] [ignite-3] AMashenkov commented on a change in pull request #91: IGNITE-14077: Distributed schema manager.

AMashenkov commented on a change in pull request #91:
URL: https://github.com/apache/ignite-3/pull/91#discussion_r634278035



##########
File path: modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaManager.java
##########
@@ -17,60 +17,268 @@
 
 package org.apache.ignite.internal.schema;
 
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 import org.apache.ignite.configuration.internal.ConfigurationManager;
+import org.apache.ignite.configuration.schemas.table.ColumnTypeView;
+import org.apache.ignite.configuration.schemas.table.ColumnView;
+import org.apache.ignite.configuration.schemas.table.TableConfiguration;
+import org.apache.ignite.configuration.schemas.table.TableIndexConfiguration;
+import org.apache.ignite.configuration.schemas.table.TablesConfiguration;
+import org.apache.ignite.configuration.tree.NamedListView;
+import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.apache.ignite.internal.schema.registry.SchemaRegistryException;
+import org.apache.ignite.internal.schema.registry.SchemaRegistryImpl;
+import org.apache.ignite.internal.util.ByteUtils;
+import org.apache.ignite.internal.vault.VaultManager;
+import org.apache.ignite.lang.ByteArray;
+import org.apache.ignite.lang.IgniteLogger;
+import org.apache.ignite.metastorage.common.Conditions;
+import org.apache.ignite.metastorage.common.Cursor;
+import org.apache.ignite.metastorage.common.Entry;
+import org.apache.ignite.metastorage.common.Key;
+import org.apache.ignite.metastorage.common.Operations;
+import org.apache.ignite.metastorage.common.WatchEvent;
+import org.apache.ignite.metastorage.common.WatchListener;
+import org.apache.ignite.schema.PrimaryIndex;
+import org.jetbrains.annotations.NotNull;
 
 /**
  * Schema Manager.
  */
-// TODO: IGNITE-14586 Remove @SuppressWarnings when implementation provided.
-@SuppressWarnings({"FieldCanBeLocal", "unused"}) public class SchemaManager {
-    /** Configuration manager in order to handle and listen schema specific configuration.*/
+public class SchemaManager {
+    /** The logger. */
+    private static final IgniteLogger LOG = IgniteLogger.forClass(SchemaManager.class);
+
+    /** Internal prefix for the metasorage. */
+    private static final String INTERNAL_PREFIX = "internal.tables.schema.";
+
+    /** Schema history item key suffix. */
+    private static final String INTERNAL_VER_SUFFIX = ".ver.";
+
+    /** Configuration manager in order to handle and listen schema specific configuration. */
     private final ConfigurationManager configurationMgr;
 
+    /** Metastorage manager. */
+    private final MetaStorageManager metaStorageMgr;
+
+    /** Vault manager. */
+    private final VaultManager vaultMgr;
+
     /** Schema. */
-    private final SchemaDescriptor schema;
+    private final Map<UUID, SchemaRegistryImpl> schemes = new ConcurrentHashMap<>();
 
     /**
      * The constructor.
      *
      * @param configurationMgr Configuration manager.
+     * @param metaStorageMgr Metastorage manager.
+     * @param vaultMgr Vault manager.
      */
-    public SchemaManager(ConfigurationManager configurationMgr) {
+    public SchemaManager(
+        ConfigurationManager configurationMgr,
+        MetaStorageManager metaStorageMgr,
+        VaultManager vaultMgr
+    ) {
         this.configurationMgr = configurationMgr;
+        this.metaStorageMgr = metaStorageMgr;
+        this.vaultMgr = vaultMgr;
+
+        metaStorageMgr.registerWatchByPrefix(new Key(INTERNAL_PREFIX), new WatchListener() {
+            /** {@inheritDoc} */
+            @Override public boolean onUpdate(@NotNull Iterable<WatchEvent> events) {
+                for (WatchEvent evt : events) {
+                    String keyTail = evt.newEntry().key().toString().substring(INTERNAL_PREFIX.length() - 1);
+
+                    int verPos = keyTail.indexOf(INTERNAL_VER_SUFFIX);
 
-        this.schema = new SchemaDescriptor(1,
-            new Column[] {
-                new Column("key", NativeType.LONG, false)
-            },
-            new Column[] {
-                new Column("value", NativeType.LONG, false)
+                    // Last table schema version changed.
+                    if (verPos == -1) {
+                        UUID tblId = UUID.fromString(keyTail);
+
+                        if (evt.oldEntry() == null)  // Initial schema added.
+                            schemes.put(tblId, new SchemaRegistryImpl());

Review comment:
       Because schema itself is written under another key with the specific version.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org