You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@ignite.apache.org by GitBox <gi...@apache.org> on 2021/04/12 16:38:20 UTC

[GitHub] [ignite-3] AMashenkov opened a new pull request #91: IGNITE-14077: Distributed schema manager.

AMashenkov opened a new pull request #91:
URL: https://github.com/apache/ignite-3/pull/91


   Minor refactoring.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite-3] sanpwc commented on a change in pull request #91: IGNITE-14077: Distributed schema manager.

Posted by GitBox <gi...@apache.org>.
sanpwc commented on a change in pull request #91:
URL: https://github.com/apache/ignite-3/pull/91#discussion_r632527783



##########
File path: modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaManager.java
##########
@@ -17,60 +17,310 @@
 
 package org.apache.ignite.internal.schema;
 
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 import org.apache.ignite.configuration.internal.ConfigurationManager;
+import org.apache.ignite.configuration.schemas.table.ColumnTypeView;
+import org.apache.ignite.configuration.schemas.table.ColumnView;
+import org.apache.ignite.configuration.schemas.table.TableConfiguration;
+import org.apache.ignite.configuration.schemas.table.TableIndexConfiguration;
+import org.apache.ignite.configuration.schemas.table.TablesConfiguration;
+import org.apache.ignite.configuration.tree.NamedListView;
+import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.apache.ignite.internal.schema.registry.SchemaRegistryException;
+import org.apache.ignite.internal.schema.registry.SchemaRegistryImpl;
+import org.apache.ignite.internal.util.ByteUtils;
+import org.apache.ignite.internal.vault.VaultManager;
+import org.apache.ignite.lang.ByteArray;
+import org.apache.ignite.lang.IgniteLogger;
+import org.apache.ignite.metastorage.common.Conditions;
+import org.apache.ignite.metastorage.common.Cursor;
+import org.apache.ignite.metastorage.common.Entry;
+import org.apache.ignite.metastorage.common.Key;
+import org.apache.ignite.metastorage.common.Operations;
+import org.apache.ignite.metastorage.common.WatchEvent;
+import org.apache.ignite.metastorage.common.WatchListener;
+import org.apache.ignite.schema.PrimaryIndex;
+import org.jetbrains.annotations.NotNull;
 
 /**
  * Schema Manager.
  */
-// TODO: IGNITE-14586 Remove @SuppressWarnings when implementation provided.
-@SuppressWarnings({"FieldCanBeLocal", "unused"}) public class SchemaManager {
-    /** Configuration manager in order to handle and listen schema specific configuration.*/
+public class SchemaManager {
+    /** The logger. */
+    private static final IgniteLogger LOG = IgniteLogger.forClass(SchemaManager.class);
+
+    /** Internal prefix for the metasorage. */
+    private static final String INTERNAL_PREFIX = "internal.tables.schema.";
+
+    /** Schema history item key suffix. */
+    protected static final String INTERNAL_VER_SUFFIX = ".ver.";
+
+    /** Configuration manager in order to handle and listen schema specific configuration. */
     private final ConfigurationManager configurationMgr;
 
+    /** Metastorage manager. */
+    private final MetaStorageManager metaStorageMgr;
+
+    /** Vault manager. */
+    private final VaultManager vaultManager;

Review comment:
       Mgr? (i.e. name this field vaultMgr, consistent with configurationMgr and metaStorageMgr.)




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite-3] AMashenkov commented on a change in pull request #91: IGNITE-14077: Distributed schema manager.

Posted by GitBox <gi...@apache.org>.
AMashenkov commented on a change in pull request #91:
URL: https://github.com/apache/ignite-3/pull/91#discussion_r634278035



##########
File path: modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaManager.java
##########
@@ -17,60 +17,268 @@
 
 package org.apache.ignite.internal.schema;
 
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 import org.apache.ignite.configuration.internal.ConfigurationManager;
+import org.apache.ignite.configuration.schemas.table.ColumnTypeView;
+import org.apache.ignite.configuration.schemas.table.ColumnView;
+import org.apache.ignite.configuration.schemas.table.TableConfiguration;
+import org.apache.ignite.configuration.schemas.table.TableIndexConfiguration;
+import org.apache.ignite.configuration.schemas.table.TablesConfiguration;
+import org.apache.ignite.configuration.tree.NamedListView;
+import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.apache.ignite.internal.schema.registry.SchemaRegistryException;
+import org.apache.ignite.internal.schema.registry.SchemaRegistryImpl;
+import org.apache.ignite.internal.util.ByteUtils;
+import org.apache.ignite.internal.vault.VaultManager;
+import org.apache.ignite.lang.ByteArray;
+import org.apache.ignite.lang.IgniteLogger;
+import org.apache.ignite.metastorage.common.Conditions;
+import org.apache.ignite.metastorage.common.Cursor;
+import org.apache.ignite.metastorage.common.Entry;
+import org.apache.ignite.metastorage.common.Key;
+import org.apache.ignite.metastorage.common.Operations;
+import org.apache.ignite.metastorage.common.WatchEvent;
+import org.apache.ignite.metastorage.common.WatchListener;
+import org.apache.ignite.schema.PrimaryIndex;
+import org.jetbrains.annotations.NotNull;
 
 /**
  * Schema Manager.
  */
-// TODO: IGNITE-14586 Remove @SuppressWarnings when implementation provided.
-@SuppressWarnings({"FieldCanBeLocal", "unused"}) public class SchemaManager {
-    /** Configuration manager in order to handle and listen schema specific configuration.*/
+public class SchemaManager {
+    /** The logger. */
+    private static final IgniteLogger LOG = IgniteLogger.forClass(SchemaManager.class);
+
+    /** Internal prefix for the metasorage. */
+    private static final String INTERNAL_PREFIX = "internal.tables.schema.";
+
+    /** Schema history item key suffix. */
+    private static final String INTERNAL_VER_SUFFIX = ".ver.";
+
+    /** Configuration manager in order to handle and listen schema specific configuration. */
     private final ConfigurationManager configurationMgr;
 
+    /** Metastorage manager. */
+    private final MetaStorageManager metaStorageMgr;
+
+    /** Vault manager. */
+    private final VaultManager vaultMgr;
+
     /** Schema. */
-    private final SchemaDescriptor schema;
+    private final Map<UUID, SchemaRegistryImpl> schemes = new ConcurrentHashMap<>();
 
     /**
      * The constructor.
      *
      * @param configurationMgr Configuration manager.
+     * @param metaStorageMgr Metastorage manager.
+     * @param vaultMgr Vault manager.
      */
-    public SchemaManager(ConfigurationManager configurationMgr) {
+    public SchemaManager(
+        ConfigurationManager configurationMgr,
+        MetaStorageManager metaStorageMgr,
+        VaultManager vaultMgr
+    ) {
         this.configurationMgr = configurationMgr;
+        this.metaStorageMgr = metaStorageMgr;
+        this.vaultMgr = vaultMgr;
+
+        metaStorageMgr.registerWatchByPrefix(new Key(INTERNAL_PREFIX), new WatchListener() {
+            /** {@inheritDoc} */
+            @Override public boolean onUpdate(@NotNull Iterable<WatchEvent> events) {
+                for (WatchEvent evt : events) {
+                    String keyTail = evt.newEntry().key().toString().substring(INTERNAL_PREFIX.length() - 1);
+
+                    int verPos = keyTail.indexOf(INTERNAL_VER_SUFFIX);
 
-        this.schema = new SchemaDescriptor(1,
-            new Column[] {
-                new Column("key", NativeType.LONG, false)
-            },
-            new Column[] {
-                new Column("value", NativeType.LONG, false)
+                    // Last table schema version changed.
+                    if (verPos == -1) {
+                        UUID tblId = UUID.fromString(keyTail);
+
+                        if (evt.oldEntry() == null)  // Initial schema added.
+                            schemes.put(tblId, new SchemaRegistryImpl());

Review comment:
       Because the schema itself is written under a separate key that includes the specific version.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite-3] sanpwc commented on a change in pull request #91: IGNITE-14077: Distributed schema manager.

Posted by GitBox <gi...@apache.org>.
sanpwc commented on a change in pull request #91:
URL: https://github.com/apache/ignite-3/pull/91#discussion_r632534089



##########
File path: modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaManager.java
##########
@@ -17,60 +17,310 @@
 
 package org.apache.ignite.internal.schema;
 
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 import org.apache.ignite.configuration.internal.ConfigurationManager;
+import org.apache.ignite.configuration.schemas.table.ColumnTypeView;
+import org.apache.ignite.configuration.schemas.table.ColumnView;
+import org.apache.ignite.configuration.schemas.table.TableConfiguration;
+import org.apache.ignite.configuration.schemas.table.TableIndexConfiguration;
+import org.apache.ignite.configuration.schemas.table.TablesConfiguration;
+import org.apache.ignite.configuration.tree.NamedListView;
+import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.apache.ignite.internal.schema.registry.SchemaRegistryException;
+import org.apache.ignite.internal.schema.registry.SchemaRegistryImpl;
+import org.apache.ignite.internal.util.ByteUtils;
+import org.apache.ignite.internal.vault.VaultManager;
+import org.apache.ignite.lang.ByteArray;
+import org.apache.ignite.lang.IgniteLogger;
+import org.apache.ignite.metastorage.common.Conditions;
+import org.apache.ignite.metastorage.common.Cursor;
+import org.apache.ignite.metastorage.common.Entry;
+import org.apache.ignite.metastorage.common.Key;
+import org.apache.ignite.metastorage.common.Operations;
+import org.apache.ignite.metastorage.common.WatchEvent;
+import org.apache.ignite.metastorage.common.WatchListener;
+import org.apache.ignite.schema.PrimaryIndex;
+import org.jetbrains.annotations.NotNull;
 
 /**
  * Schema Manager.
  */
-// TODO: IGNITE-14586 Remove @SuppressWarnings when implementation provided.
-@SuppressWarnings({"FieldCanBeLocal", "unused"}) public class SchemaManager {
-    /** Configuration manager in order to handle and listen schema specific configuration.*/
+public class SchemaManager {
+    /** The logger. */
+    private static final IgniteLogger LOG = IgniteLogger.forClass(SchemaManager.class);
+
+    /** Internal prefix for the metasorage. */
+    private static final String INTERNAL_PREFIX = "internal.tables.schema.";
+
+    /** Schema history item key suffix. */
+    protected static final String INTERNAL_VER_SUFFIX = ".ver.";
+
+    /** Configuration manager in order to handle and listen schema specific configuration. */
     private final ConfigurationManager configurationMgr;
 
+    /** Metastorage manager. */
+    private final MetaStorageManager metaStorageMgr;
+
+    /** Vault manager. */
+    private final VaultManager vaultManager;
+
+    /** Schema history subscription future. */
+    private CompletableFuture<Long> schemaHistorySubscriptionFut;
+
     /** Schema. */
-    private final SchemaDescriptor schema;
+    private final Map<UUID, SchemaRegistryImpl> schemes = new ConcurrentHashMap<>();
 
     /**
      * The constructor.
      *
      * @param configurationMgr Configuration manager.
+     * @param metaStorageMgr Metastorage manager.
+     * @param vaultManager Vault manager.
      */
-    public SchemaManager(ConfigurationManager configurationMgr) {
+    public SchemaManager(
+        ConfigurationManager configurationMgr,
+        MetaStorageManager metaStorageMgr,
+        VaultManager vaultManager
+    ) {
         this.configurationMgr = configurationMgr;
+        this.metaStorageMgr = metaStorageMgr;
+        this.vaultManager = vaultManager;
+
+        schemaHistorySubscriptionFut = metaStorageMgr.registerWatchByPrefix(new Key(INTERNAL_PREFIX), new WatchListener() {
+            /** {@inheritDoc} */
+            @Override public boolean onUpdate(@NotNull Iterable<WatchEvent> events) {
+                for (WatchEvent evt : events) {
+                    String keyTail = evt.newEntry().key().toString().substring(INTERNAL_PREFIX.length() - 1);
+
+                    int verPos = keyTail.indexOf(INTERNAL_VER_SUFFIX);
+
+                    // Last table schema version changed.
+                    if (verPos == -1) {
+                        UUID tblId = UUID.fromString(keyTail);
 
-        this.schema = new SchemaDescriptor(1,
-            new Column[] {
-                new Column("key", NativeType.LONG, false)
-            },
-            new Column[] {
-                new Column("value", NativeType.LONG, false)
+                        if (evt.oldEntry() == null)  // Initial schema added.
+                            schemes.put(tblId, new SchemaRegistryImpl());
+                        else if (evt.newEntry() == null) // Table Dropped.
+                            schemes.remove(tblId);
+                        else //TODO: https://issues.apache.org/jira/browse/IGNITE-13752
+                            throw new SchemaRegistryException("Schema upgrade is not implemented yet.");
+                    }
+                    else {
+                        UUID tblId = UUID.fromString(keyTail.substring(0, verPos));
+
+                        final SchemaRegistryImpl reg = schemes.get(tblId);
+
+                        assert reg != null : "Table schema was not initialized or table has been dropped: " + tblId;
+
+                        reg.registerSchema((SchemaDescriptor)ByteUtils.fromBytes(evt.newEntry().value()));
+                    }
+                }
+
+                return true;
             }
+
+            /** {@inheritDoc} */
+            @Override public void onError(@NotNull Throwable e) {
+                LOG.error("Metastorage listener issue", e);
+            }
+        });
+    }
+
+    /**
+     * Unsubscribes a listener form the affinity calculation.
+     */
+    private void unsubscribeFromAssignmentCalculation() {

Review comment:
       unsubscribeFromAssignmentCalculation is never used.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite-3] sanpwc commented on a change in pull request #91: IGNITE-14077: Distributed schema manager.

Posted by GitBox <gi...@apache.org>.
sanpwc commented on a change in pull request #91:
URL: https://github.com/apache/ignite-3/pull/91#discussion_r632552429



##########
File path: modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaManager.java
##########
@@ -17,60 +17,310 @@
 
 package org.apache.ignite.internal.schema;
 
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 import org.apache.ignite.configuration.internal.ConfigurationManager;
+import org.apache.ignite.configuration.schemas.table.ColumnTypeView;
+import org.apache.ignite.configuration.schemas.table.ColumnView;
+import org.apache.ignite.configuration.schemas.table.TableConfiguration;
+import org.apache.ignite.configuration.schemas.table.TableIndexConfiguration;
+import org.apache.ignite.configuration.schemas.table.TablesConfiguration;
+import org.apache.ignite.configuration.tree.NamedListView;
+import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.apache.ignite.internal.schema.registry.SchemaRegistryException;
+import org.apache.ignite.internal.schema.registry.SchemaRegistryImpl;
+import org.apache.ignite.internal.util.ByteUtils;
+import org.apache.ignite.internal.vault.VaultManager;
+import org.apache.ignite.lang.ByteArray;
+import org.apache.ignite.lang.IgniteLogger;
+import org.apache.ignite.metastorage.common.Conditions;
+import org.apache.ignite.metastorage.common.Cursor;
+import org.apache.ignite.metastorage.common.Entry;
+import org.apache.ignite.metastorage.common.Key;
+import org.apache.ignite.metastorage.common.Operations;
+import org.apache.ignite.metastorage.common.WatchEvent;
+import org.apache.ignite.metastorage.common.WatchListener;
+import org.apache.ignite.schema.PrimaryIndex;
+import org.jetbrains.annotations.NotNull;
 
 /**
  * Schema Manager.
  */
-// TODO: IGNITE-14586 Remove @SuppressWarnings when implementation provided.
-@SuppressWarnings({"FieldCanBeLocal", "unused"}) public class SchemaManager {
-    /** Configuration manager in order to handle and listen schema specific configuration.*/
+public class SchemaManager {
+    /** The logger. */
+    private static final IgniteLogger LOG = IgniteLogger.forClass(SchemaManager.class);
+
+    /** Internal prefix for the metasorage. */
+    private static final String INTERNAL_PREFIX = "internal.tables.schema.";
+
+    /** Schema history item key suffix. */
+    protected static final String INTERNAL_VER_SUFFIX = ".ver.";
+
+    /** Configuration manager in order to handle and listen schema specific configuration. */
     private final ConfigurationManager configurationMgr;
 
+    /** Metastorage manager. */
+    private final MetaStorageManager metaStorageMgr;
+
+    /** Vault manager. */
+    private final VaultManager vaultManager;
+
+    /** Schema history subscription future. */
+    private CompletableFuture<Long> schemaHistorySubscriptionFut;
+
     /** Schema. */
-    private final SchemaDescriptor schema;
+    private final Map<UUID, SchemaRegistryImpl> schemes = new ConcurrentHashMap<>();
 
     /**
      * The constructor.
      *
      * @param configurationMgr Configuration manager.
+     * @param metaStorageMgr Metastorage manager.
+     * @param vaultManager Vault manager.
      */
-    public SchemaManager(ConfigurationManager configurationMgr) {
+    public SchemaManager(
+        ConfigurationManager configurationMgr,
+        MetaStorageManager metaStorageMgr,
+        VaultManager vaultManager
+    ) {
         this.configurationMgr = configurationMgr;
+        this.metaStorageMgr = metaStorageMgr;
+        this.vaultManager = vaultManager;
+
+        schemaHistorySubscriptionFut = metaStorageMgr.registerWatchByPrefix(new Key(INTERNAL_PREFIX), new WatchListener() {
+            /** {@inheritDoc} */
+            @Override public boolean onUpdate(@NotNull Iterable<WatchEvent> events) {
+                for (WatchEvent evt : events) {
+                    String keyTail = evt.newEntry().key().toString().substring(INTERNAL_PREFIX.length() - 1);
+
+                    int verPos = keyTail.indexOf(INTERNAL_VER_SUFFIX);
+
+                    // Last table schema version changed.
+                    if (verPos == -1) {
+                        UUID tblId = UUID.fromString(keyTail);
 
-        this.schema = new SchemaDescriptor(1,
-            new Column[] {
-                new Column("key", NativeType.LONG, false)
-            },
-            new Column[] {
-                new Column("value", NativeType.LONG, false)
+                        if (evt.oldEntry() == null)  // Initial schema added.
+                            schemes.put(tblId, new SchemaRegistryImpl());
+                        else if (evt.newEntry() == null) // Table Dropped.
+                            schemes.remove(tblId);
+                        else //TODO: https://issues.apache.org/jira/browse/IGNITE-13752
+                            throw new SchemaRegistryException("Schema upgrade is not implemented yet.");
+                    }
+                    else {
+                        UUID tblId = UUID.fromString(keyTail.substring(0, verPos));
+
+                        final SchemaRegistryImpl reg = schemes.get(tblId);
+
+                        assert reg != null : "Table schema was not initialized or table has been dropped: " + tblId;
+
+                        reg.registerSchema((SchemaDescriptor)ByteUtils.fromBytes(evt.newEntry().value()));
+                    }
+                }
+
+                return true;
             }
+
+            /** {@inheritDoc} */
+            @Override public void onError(@NotNull Throwable e) {
+                LOG.error("Metastorage listener issue", e);
+            }
+        });
+    }
+
+    /**
+     * Unsubscribes a listener form the affinity calculation.
+     */
+    private void unsubscribeFromAssignmentCalculation() {
+        if (schemaHistorySubscriptionFut == null)
+            return;
+
+        try {
+            Long subscriptionId = schemaHistorySubscriptionFut.get();
+
+            metaStorageMgr.unregisterWatch(subscriptionId);
+
+            schemaHistorySubscriptionFut = null;
+        }
+        catch (InterruptedException | ExecutionException e) {
+            LOG.error("Couldn't unsubscribe for Metastorage updates", e);
+        }
+    }
+
+    /**
+     * Reads current schema configuration, build schema descriptor,
+     * then add it to history rise up table schema version.
+     *
+     * @param tblId Table id.
+     * @param tblName Table name.
+     * @return Operation future.
+     */
+    public CompletableFuture<Boolean> initNewSchemaForTable(UUID tblId, String tblName) {
+        return vaultManager.get(ByteArray.fromString(INTERNAL_PREFIX + tblId)).
+            thenCompose(entry -> {
+                TableConfiguration tblConfig = configurationMgr.configurationRegistry().getConfiguration(TablesConfiguration.KEY).tables().get(tblName);
+                var key = new Key(INTERNAL_PREFIX + tblId);
+
+                int ver = entry.empty() ? 1 : (int)ByteUtils.bytesToLong(entry.value(), 0) + 1;
+
+                final SchemaDescriptor desc = createSchemaDescriptor(tblConfig, ver);
+
+                return metaStorageMgr.invoke(
+                    Conditions.key(key).value().eq(entry.value()), // Won't to rewrite if the version goes ahead.
+                    List.of(
+                        Operations.put(key, ByteUtils.longToBytes(ver)),
+                        Operations.put(new Key(INTERNAL_PREFIX + tblId + INTERNAL_VER_SUFFIX + ver), ByteUtils.toBytes(desc))
+                    ),
+                    List.of(
+                        Operations.noop(),
+                        Operations.noop()
+                    ));
+            });
+    }
+
+    /**
+     * Creates schema descriptor from config.
+     *
+     * @param tblConfig Table config.
+     * @param ver Schema version.
+     * @return Schema descriptor.
+     */
+    private SchemaDescriptor createSchemaDescriptor(TableConfiguration tblConfig, int ver) {
+        final TableIndexConfiguration pkCfg = tblConfig.indices().get(PrimaryIndex.PRIMARY_KEY_INDEX_NAME);
+
+        assert pkCfg != null;
+
+        final Set<String> keyColNames = Stream.of(pkCfg.colNames().value()).collect(Collectors.toSet());
+        final NamedListView<ColumnView> cols = tblConfig.columns().value();
+
+        final ArrayList<Column> keyCols = new ArrayList<>(keyColNames.size());
+        final ArrayList<Column> valCols = new ArrayList<>(cols.size() - keyColNames.size());
+
+        cols.namedListKeys().stream()
+            .map(cols::get)
+            .map(col -> new Column(col.name(), createType(col.type()), col.nullable()))
+            .forEach(c -> (keyColNames.contains(c.name()) ? keyCols : valCols).add(c));
+
+        return new SchemaDescriptor(
+            ver,
+            keyCols.toArray(Column[]::new),
+            pkCfg.affinityColumns().value(),
+            valCols.toArray(Column[]::new)
         );
     }
 
     /**
-     * Gets a current schema for the table specified.
+     * Create type from config.
      *
-     * @param tableId Table id.
-     * @return Schema.
+     * @param type Type view.
+     * @return Native type.
      */
-    public SchemaDescriptor schema(UUID tableId) {
-        return schema;
+    private NativeType createType(ColumnTypeView type) {
+        switch (type.type().toLowerCase()) {
+            case "byte":
+                return NativeType.BYTE;
+            case "short":
+                return NativeType.SHORT;
+            case "int":
+                return NativeType.INTEGER;
+            case "long":
+                return NativeType.LONG;
+            case "float":
+                return NativeType.FLOAT;
+            case "double":
+                return NativeType.DOUBLE;
+            case "uuid":
+                return NativeType.UUID;
+            case "bitmask":
+                return Bitmask.of(type.length());
+            case "string":
+                return NativeType.STRING;
+            case "bytes":
+                return NativeType.BYTES;
+
+            default:
+                throw new IllegalStateException("Unsupported column type: " + type.type());
+        }
     }
 
     /**
-     * Gets a schema for specific version.
+     * Compares schemas.
      *
+     * @param expected Expected schema.
+     * @param actual Actual schema.
+     * @return {@code True} if schemas are equal, {@code false} otherwise.
+     */
+    public static boolean equalSchemas(SchemaDescriptor expected, SchemaDescriptor actual) {
+        if (expected.keyColumns().length() != actual.keyColumns().length() ||
+            expected.valueColumns().length() != actual.valueColumns().length())
+            return false;
+
+        for (int i = 0; i < expected.length(); i++) {
+            if (!expected.column(i).equals(actual.column(i)))
+                return false;
+        }
+
+        return true;
+    }
+
+    /**
      * @param tableId Table id.
-     * @param ver Schema version.
-     * @return Schema.
+     * @return Schema registry for the table.
+     */
+    public SchemaRegistry schemaRegistryForTable(UUID tableId) {
+        final SchemaRegistry reg = schemes.get(tableId);
+
+        if (reg == null)
+            throw new SchemaRegistryException("No schema was ever registeref for the table: " + tableId);
+
+        return reg;
+    }
+
+    /**
+     * Registers new schema.
+     *
+     * @param tableId Table identifier.
+     * @param desc Schema descriptor.
      */
-    public SchemaDescriptor schema(UUID tableId, long ver) {
-        assert ver >= 0;
+    public CompletableFuture<Boolean> registerSchema(UUID tableId, SchemaDescriptor desc) {

Review comment:
       registerSchema is never used.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite-3] sanpwc commented on a change in pull request #91: IGNITE-14077: Distributed schema manager.

Posted by GitBox <gi...@apache.org>.
sanpwc commented on a change in pull request #91:
URL: https://github.com/apache/ignite-3/pull/91#discussion_r632527096



##########
File path: modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaDescriptor.java
##########
@@ -46,6 +46,19 @@
      * @param valCols Value columns.
      */
     public SchemaDescriptor(int ver, Column[] keyCols, Column[] valCols) {
+        this(ver, keyCols, null, valCols);
+    }
+
+    /**
+     * @param ver Schema version.
+     * @param keyCols Key columns.
+     * @param affCols Affinity column names.
+     * @param valCols Value columns.
+     */
+    public SchemaDescriptor(int ver, Column[] keyCols, @Nullable String[] affCols, Column[] valCols) {

Review comment:
       affCols parameter is never used.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite-3] sanpwc commented on a change in pull request #91: IGNITE-14077: Distributed schema manager.

Posted by GitBox <gi...@apache.org>.
sanpwc commented on a change in pull request #91:
URL: https://github.com/apache/ignite-3/pull/91#discussion_r632529060



##########
File path: modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaManager.java
##########
@@ -17,60 +17,310 @@
 
 package org.apache.ignite.internal.schema;
 
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 import org.apache.ignite.configuration.internal.ConfigurationManager;
+import org.apache.ignite.configuration.schemas.table.ColumnTypeView;
+import org.apache.ignite.configuration.schemas.table.ColumnView;
+import org.apache.ignite.configuration.schemas.table.TableConfiguration;
+import org.apache.ignite.configuration.schemas.table.TableIndexConfiguration;
+import org.apache.ignite.configuration.schemas.table.TablesConfiguration;
+import org.apache.ignite.configuration.tree.NamedListView;
+import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.apache.ignite.internal.schema.registry.SchemaRegistryException;
+import org.apache.ignite.internal.schema.registry.SchemaRegistryImpl;
+import org.apache.ignite.internal.util.ByteUtils;
+import org.apache.ignite.internal.vault.VaultManager;
+import org.apache.ignite.lang.ByteArray;
+import org.apache.ignite.lang.IgniteLogger;
+import org.apache.ignite.metastorage.common.Conditions;
+import org.apache.ignite.metastorage.common.Cursor;
+import org.apache.ignite.metastorage.common.Entry;
+import org.apache.ignite.metastorage.common.Key;
+import org.apache.ignite.metastorage.common.Operations;
+import org.apache.ignite.metastorage.common.WatchEvent;
+import org.apache.ignite.metastorage.common.WatchListener;
+import org.apache.ignite.schema.PrimaryIndex;
+import org.jetbrains.annotations.NotNull;
 
 /**
  * Schema Manager.
  */
-// TODO: IGNITE-14586 Remove @SuppressWarnings when implementation provided.
-@SuppressWarnings({"FieldCanBeLocal", "unused"}) public class SchemaManager {
-    /** Configuration manager in order to handle and listen schema specific configuration.*/
+public class SchemaManager {
+    /** The logger. */
+    private static final IgniteLogger LOG = IgniteLogger.forClass(SchemaManager.class);
+
+    /** Internal prefix for the metasorage. */
+    private static final String INTERNAL_PREFIX = "internal.tables.schema.";
+
+    /** Schema history item key suffix. */
+    protected static final String INTERNAL_VER_SUFFIX = ".ver.";

Review comment:
       It seems that private visibility is enough for this constant for now.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite-3] sanpwc commented on a change in pull request #91: IGNITE-14077: Distributed schema manager.

Posted by GitBox <gi...@apache.org>.
sanpwc commented on a change in pull request #91:
URL: https://github.com/apache/ignite-3/pull/91#discussion_r632554778



##########
File path: modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaManager.java
##########
@@ -17,60 +17,310 @@
 
 package org.apache.ignite.internal.schema;
 
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 import org.apache.ignite.configuration.internal.ConfigurationManager;
+import org.apache.ignite.configuration.schemas.table.ColumnTypeView;
+import org.apache.ignite.configuration.schemas.table.ColumnView;
+import org.apache.ignite.configuration.schemas.table.TableConfiguration;
+import org.apache.ignite.configuration.schemas.table.TableIndexConfiguration;
+import org.apache.ignite.configuration.schemas.table.TablesConfiguration;
+import org.apache.ignite.configuration.tree.NamedListView;
+import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.apache.ignite.internal.schema.registry.SchemaRegistryException;
+import org.apache.ignite.internal.schema.registry.SchemaRegistryImpl;
+import org.apache.ignite.internal.util.ByteUtils;
+import org.apache.ignite.internal.vault.VaultManager;
+import org.apache.ignite.lang.ByteArray;
+import org.apache.ignite.lang.IgniteLogger;
+import org.apache.ignite.metastorage.common.Conditions;
+import org.apache.ignite.metastorage.common.Cursor;
+import org.apache.ignite.metastorage.common.Entry;
+import org.apache.ignite.metastorage.common.Key;
+import org.apache.ignite.metastorage.common.Operations;
+import org.apache.ignite.metastorage.common.WatchEvent;
+import org.apache.ignite.metastorage.common.WatchListener;
+import org.apache.ignite.schema.PrimaryIndex;
+import org.jetbrains.annotations.NotNull;
 
 /**
  * Schema Manager.
  */
-// TODO: IGNITE-14586 Remove @SuppressWarnings when implementation provided.
-@SuppressWarnings({"FieldCanBeLocal", "unused"}) public class SchemaManager {
-    /** Configuration manager in order to handle and listen schema specific configuration.*/
+public class SchemaManager {
+    /** The logger. */
+    private static final IgniteLogger LOG = IgniteLogger.forClass(SchemaManager.class);
+
+    /** Internal prefix for the metasorage. */
+    private static final String INTERNAL_PREFIX = "internal.tables.schema.";
+
+    /** Schema history item key suffix. */
+    protected static final String INTERNAL_VER_SUFFIX = ".ver.";
+
+    /** Configuration manager in order to handle and listen schema specific configuration. */
     private final ConfigurationManager configurationMgr;
 
+    /** Metastorage manager. */
+    private final MetaStorageManager metaStorageMgr;
+
+    /** Vault manager. */
+    private final VaultManager vaultManager;
+
+    /** Schema history subscription future. */
+    private CompletableFuture<Long> schemaHistorySubscriptionFut;
+
     /** Schema. */
-    private final SchemaDescriptor schema;
+    private final Map<UUID, SchemaRegistryImpl> schemes = new ConcurrentHashMap<>();
 
     /**
      * The constructor.
      *
      * @param configurationMgr Configuration manager.
+     * @param metaStorageMgr Metastorage manager.
+     * @param vaultManager Vault manager.
      */
-    public SchemaManager(ConfigurationManager configurationMgr) {
+    public SchemaManager(
+        ConfigurationManager configurationMgr,
+        MetaStorageManager metaStorageMgr,
+        VaultManager vaultManager
+    ) {
         this.configurationMgr = configurationMgr;
+        this.metaStorageMgr = metaStorageMgr;
+        this.vaultManager = vaultManager;
+
+        schemaHistorySubscriptionFut = metaStorageMgr.registerWatchByPrefix(new Key(INTERNAL_PREFIX), new WatchListener() {
+            /** {@inheritDoc} */
+            @Override public boolean onUpdate(@NotNull Iterable<WatchEvent> events) {
+                for (WatchEvent evt : events) {
+                    String keyTail = evt.newEntry().key().toString().substring(INTERNAL_PREFIX.length() - 1);
+
+                    int verPos = keyTail.indexOf(INTERNAL_VER_SUFFIX);
+
+                    // Last table schema version changed.
+                    if (verPos == -1) {
+                        UUID tblId = UUID.fromString(keyTail);
 
-        this.schema = new SchemaDescriptor(1,
-            new Column[] {
-                new Column("key", NativeType.LONG, false)
-            },
-            new Column[] {
-                new Column("value", NativeType.LONG, false)
+                        if (evt.oldEntry() == null)  // Initial schema added.
+                            schemes.put(tblId, new SchemaRegistryImpl());
+                        else if (evt.newEntry() == null) // Table Dropped.
+                            schemes.remove(tblId);
+                        else //TODO: https://issues.apache.org/jira/browse/IGNITE-13752
+                            throw new SchemaRegistryException("Schema upgrade is not implemented yet.");
+                    }
+                    else {
+                        UUID tblId = UUID.fromString(keyTail.substring(0, verPos));
+
+                        final SchemaRegistryImpl reg = schemes.get(tblId);
+
+                        assert reg != null : "Table schema was not initialized or table has been dropped: " + tblId;
+
+                        reg.registerSchema((SchemaDescriptor)ByteUtils.fromBytes(evt.newEntry().value()));
+                    }
+                }
+
+                return true;
             }
+
+            /** {@inheritDoc} */
+            @Override public void onError(@NotNull Throwable e) {
+                LOG.error("Metastorage listener issue", e);
+            }
+        });
+    }
+
+    /**
+     * Unsubscribes a listener form the affinity calculation.
+     */
+    private void unsubscribeFromAssignmentCalculation() {
+        if (schemaHistorySubscriptionFut == null)
+            return;
+
+        try {
+            Long subscriptionId = schemaHistorySubscriptionFut.get();
+
+            metaStorageMgr.unregisterWatch(subscriptionId);
+
+            schemaHistorySubscriptionFut = null;
+        }
+        catch (InterruptedException | ExecutionException e) {
+            LOG.error("Couldn't unsubscribe for Metastorage updates", e);
+        }
+    }
+
+    /**
+     * Reads current schema configuration, build schema descriptor,
+     * then add it to history rise up table schema version.
+     *
+     * @param tblId Table id.
+     * @param tblName Table name.
+     * @return Operation future.
+     */
+    public CompletableFuture<Boolean> initNewSchemaForTable(UUID tblId, String tblName) {
+        return vaultManager.get(ByteArray.fromString(INTERNAL_PREFIX + tblId)).
+            thenCompose(entry -> {
+                TableConfiguration tblConfig = configurationMgr.configurationRegistry().getConfiguration(TablesConfiguration.KEY).tables().get(tblName);
+                var key = new Key(INTERNAL_PREFIX + tblId);
+
+                int ver = entry.empty() ? 1 : (int)ByteUtils.bytesToLong(entry.value(), 0) + 1;
+
+                final SchemaDescriptor desc = createSchemaDescriptor(tblConfig, ver);
+
+                return metaStorageMgr.invoke(
+                    Conditions.key(key).value().eq(entry.value()), // Won't to rewrite if the version goes ahead.
+                    List.of(
+                        Operations.put(key, ByteUtils.longToBytes(ver)),
+                        Operations.put(new Key(INTERNAL_PREFIX + tblId + INTERNAL_VER_SUFFIX + ver), ByteUtils.toBytes(desc))
+                    ),
+                    List.of(
+                        Operations.noop(),
+                        Operations.noop()
+                    ));
+            });
+    }
+
+    /**
+     * Creates schema descriptor from config.
+     *
+     * @param tblConfig Table config.
+     * @param ver Schema version.
+     * @return Schema descriptor.
+     */
+    private SchemaDescriptor createSchemaDescriptor(TableConfiguration tblConfig, int ver) {
+        final TableIndexConfiguration pkCfg = tblConfig.indices().get(PrimaryIndex.PRIMARY_KEY_INDEX_NAME);
+
+        assert pkCfg != null;
+
+        final Set<String> keyColNames = Stream.of(pkCfg.colNames().value()).collect(Collectors.toSet());
+        final NamedListView<ColumnView> cols = tblConfig.columns().value();
+
+        final ArrayList<Column> keyCols = new ArrayList<>(keyColNames.size());
+        final ArrayList<Column> valCols = new ArrayList<>(cols.size() - keyColNames.size());
+
+        cols.namedListKeys().stream()
+            .map(cols::get)
+            .map(col -> new Column(col.name(), createType(col.type()), col.nullable()))
+            .forEach(c -> (keyColNames.contains(c.name()) ? keyCols : valCols).add(c));
+
+        return new SchemaDescriptor(
+            ver,
+            keyCols.toArray(Column[]::new),
+            pkCfg.affinityColumns().value(),
+            valCols.toArray(Column[]::new)
         );
     }
 
     /**
-     * Gets a current schema for the table specified.
+     * Create type from config.
      *
-     * @param tableId Table id.
-     * @return Schema.
+     * @param type Type view.
+     * @return Native type.
      */
-    public SchemaDescriptor schema(UUID tableId) {
-        return schema;
+    private NativeType createType(ColumnTypeView type) {
+        switch (type.type().toLowerCase()) {
+            case "byte":
+                return NativeType.BYTE;
+            case "short":
+                return NativeType.SHORT;
+            case "int":
+                return NativeType.INTEGER;
+            case "long":
+                return NativeType.LONG;
+            case "float":
+                return NativeType.FLOAT;
+            case "double":
+                return NativeType.DOUBLE;
+            case "uuid":
+                return NativeType.UUID;
+            case "bitmask":
+                return Bitmask.of(type.length());
+            case "string":
+                return NativeType.STRING;
+            case "bytes":
+                return NativeType.BYTES;
+
+            default:
+                throw new IllegalStateException("Unsupported column type: " + type.type());
+        }
     }
 
     /**
-     * Gets a schema for specific version.
+     * Compares schemas.
      *
+     * @param expected Expected schema.
+     * @param actual Actual schema.
+     * @return {@code True} if schemas are equal, {@code false} otherwise.
+     */
+    public static boolean equalSchemas(SchemaDescriptor expected, SchemaDescriptor actual) {
+        if (expected.keyColumns().length() != actual.keyColumns().length() ||
+            expected.valueColumns().length() != actual.valueColumns().length())
+            return false;
+
+        for (int i = 0; i < expected.length(); i++) {
+            if (!expected.column(i).equals(actual.column(i)))
+                return false;
+        }
+
+        return true;
+    }
+
+    /**
      * @param tableId Table id.
-     * @param ver Schema version.
-     * @return Schema.
+     * @return Schema registry for the table.
+     */
+    public SchemaRegistry schemaRegistryForTable(UUID tableId) {
+        final SchemaRegistry reg = schemes.get(tableId);
+
+        if (reg == null)
+            throw new SchemaRegistryException("No schema was ever registeref for the table: " + tableId);
+
+        return reg;
+    }
+
+    /**
+     * Registers new schema.
+     *
+     * @param tableId Table identifier.
+     * @param desc Schema descriptor.
      */
-    public SchemaDescriptor schema(UUID tableId, long ver) {
-        assert ver >= 0;
+    public CompletableFuture<Boolean> registerSchema(UUID tableId, SchemaDescriptor desc) {
+        int schemaVersion = desc.version();
+
+        final Key key = new Key(INTERNAL_PREFIX + tableId + '.' + schemaVersion);
+
+        return metaStorageMgr.invoke(
+            Conditions.key(key).value().eq(null),
+            Operations.put(key, ByteUtils.toBytes(desc)), //TODO: IGNITE-14679 Serialize schema.
+            Operations.noop());
+    }
+
+    /**
+     * Unregistered all schemas associated with a table identifier.
+     *
+     * @param tableId Table identifier.
+     * @return Future which will complete when all versions of schema will be unregistered.
+     */
+    public CompletableFuture<Boolean> unregisterSchemas(UUID tableId) {
+        List<CompletableFuture<?>> futs = new ArrayList<>();
+
+        String schemaPrefix = INTERNAL_PREFIX + tableId + INTERNAL_VER_SUFFIX;
 
-        assert schema.version() == ver;
+        try (Cursor<Entry> cursor = metaStorageMgr.range(new Key(schemaPrefix), null)) {

Review comment:
       Is it safe to use range without upperBoundRevision?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite-3] sanpwc commented on a change in pull request #91: IGNITE-14077: Distributed schema manager.

Posted by GitBox <gi...@apache.org>.
sanpwc commented on a change in pull request #91:
URL: https://github.com/apache/ignite-3/pull/91#discussion_r632550277



##########
File path: modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaManager.java
##########
@@ -17,60 +17,310 @@
 
 package org.apache.ignite.internal.schema;
 
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 import org.apache.ignite.configuration.internal.ConfigurationManager;
+import org.apache.ignite.configuration.schemas.table.ColumnTypeView;
+import org.apache.ignite.configuration.schemas.table.ColumnView;
+import org.apache.ignite.configuration.schemas.table.TableConfiguration;
+import org.apache.ignite.configuration.schemas.table.TableIndexConfiguration;
+import org.apache.ignite.configuration.schemas.table.TablesConfiguration;
+import org.apache.ignite.configuration.tree.NamedListView;
+import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.apache.ignite.internal.schema.registry.SchemaRegistryException;
+import org.apache.ignite.internal.schema.registry.SchemaRegistryImpl;
+import org.apache.ignite.internal.util.ByteUtils;
+import org.apache.ignite.internal.vault.VaultManager;
+import org.apache.ignite.lang.ByteArray;
+import org.apache.ignite.lang.IgniteLogger;
+import org.apache.ignite.metastorage.common.Conditions;
+import org.apache.ignite.metastorage.common.Cursor;
+import org.apache.ignite.metastorage.common.Entry;
+import org.apache.ignite.metastorage.common.Key;
+import org.apache.ignite.metastorage.common.Operations;
+import org.apache.ignite.metastorage.common.WatchEvent;
+import org.apache.ignite.metastorage.common.WatchListener;
+import org.apache.ignite.schema.PrimaryIndex;
+import org.jetbrains.annotations.NotNull;
 
 /**
  * Schema Manager.
  */
-// TODO: IGNITE-14586 Remove @SuppressWarnings when implementation provided.
-@SuppressWarnings({"FieldCanBeLocal", "unused"}) public class SchemaManager {
-    /** Configuration manager in order to handle and listen schema specific configuration.*/
+public class SchemaManager {
+    /** The logger. */
+    private static final IgniteLogger LOG = IgniteLogger.forClass(SchemaManager.class);
+
+    /** Internal prefix for the metasorage. */
+    private static final String INTERNAL_PREFIX = "internal.tables.schema.";
+
+    /** Schema history item key suffix. */
+    protected static final String INTERNAL_VER_SUFFIX = ".ver.";
+
+    /** Configuration manager in order to handle and listen schema specific configuration. */
     private final ConfigurationManager configurationMgr;
 
+    /** Metastorage manager. */
+    private final MetaStorageManager metaStorageMgr;
+
+    /** Vault manager. */
+    private final VaultManager vaultManager;
+
+    /** Schema history subscription future. */
+    private CompletableFuture<Long> schemaHistorySubscriptionFut;
+
     /** Schema. */
-    private final SchemaDescriptor schema;
+    private final Map<UUID, SchemaRegistryImpl> schemes = new ConcurrentHashMap<>();
 
     /**
      * The constructor.
      *
      * @param configurationMgr Configuration manager.
+     * @param metaStorageMgr Metastorage manager.
+     * @param vaultManager Vault manager.
      */
-    public SchemaManager(ConfigurationManager configurationMgr) {
+    public SchemaManager(
+        ConfigurationManager configurationMgr,
+        MetaStorageManager metaStorageMgr,
+        VaultManager vaultManager
+    ) {
         this.configurationMgr = configurationMgr;
+        this.metaStorageMgr = metaStorageMgr;
+        this.vaultManager = vaultManager;
+
+        schemaHistorySubscriptionFut = metaStorageMgr.registerWatchByPrefix(new Key(INTERNAL_PREFIX), new WatchListener() {
+            /** {@inheritDoc} */
+            @Override public boolean onUpdate(@NotNull Iterable<WatchEvent> events) {
+                for (WatchEvent evt : events) {
+                    String keyTail = evt.newEntry().key().toString().substring(INTERNAL_PREFIX.length() - 1);
+
+                    int verPos = keyTail.indexOf(INTERNAL_VER_SUFFIX);
+
+                    // Last table schema version changed.
+                    if (verPos == -1) {
+                        UUID tblId = UUID.fromString(keyTail);
 
-        this.schema = new SchemaDescriptor(1,
-            new Column[] {
-                new Column("key", NativeType.LONG, false)
-            },
-            new Column[] {
-                new Column("value", NativeType.LONG, false)
+                        if (evt.oldEntry() == null)  // Initial schema added.
+                            schemes.put(tblId, new SchemaRegistryImpl());
+                        else if (evt.newEntry() == null) // Table Dropped.
+                            schemes.remove(tblId);
+                        else //TODO: https://issues.apache.org/jira/browse/IGNITE-13752
+                            throw new SchemaRegistryException("Schema upgrade is not implemented yet.");
+                    }
+                    else {
+                        UUID tblId = UUID.fromString(keyTail.substring(0, verPos));
+
+                        final SchemaRegistryImpl reg = schemes.get(tblId);
+
+                        assert reg != null : "Table schema was not initialized or table has been dropped: " + tblId;
+
+                        reg.registerSchema((SchemaDescriptor)ByteUtils.fromBytes(evt.newEntry().value()));
+                    }
+                }
+
+                return true;
             }
+
+            /** {@inheritDoc} */
+            @Override public void onError(@NotNull Throwable e) {
+                LOG.error("Metastorage listener issue", e);
+            }
+        });
+    }
+
+    /**
+     * Unsubscribes a listener form the affinity calculation.
+     */
+    private void unsubscribeFromAssignmentCalculation() {
+        if (schemaHistorySubscriptionFut == null)
+            return;
+
+        try {
+            Long subscriptionId = schemaHistorySubscriptionFut.get();
+
+            metaStorageMgr.unregisterWatch(subscriptionId);
+
+            schemaHistorySubscriptionFut = null;
+        }
+        catch (InterruptedException | ExecutionException e) {
+            LOG.error("Couldn't unsubscribe for Metastorage updates", e);
+        }
+    }
+
+    /**
+     * Reads current schema configuration, build schema descriptor,
+     * then add it to history rise up table schema version.
+     *
+     * @param tblId Table id.
+     * @param tblName Table name.
+     * @return Operation future.
+     */
+    public CompletableFuture<Boolean> initNewSchemaForTable(UUID tblId, String tblName) {
+        return vaultManager.get(ByteArray.fromString(INTERNAL_PREFIX + tblId)).
+            thenCompose(entry -> {
+                TableConfiguration tblConfig = configurationMgr.configurationRegistry().getConfiguration(TablesConfiguration.KEY).tables().get(tblName);
+                var key = new Key(INTERNAL_PREFIX + tblId);
+
+                int ver = entry.empty() ? 1 : (int)ByteUtils.bytesToLong(entry.value(), 0) + 1;
+
+                final SchemaDescriptor desc = createSchemaDescriptor(tblConfig, ver);
+
+                return metaStorageMgr.invoke(
+                    Conditions.key(key).value().eq(entry.value()), // Won't to rewrite if the version goes ahead.
+                    List.of(
+                        Operations.put(key, ByteUtils.longToBytes(ver)),
+                        Operations.put(new Key(INTERNAL_PREFIX + tblId + INTERNAL_VER_SUFFIX + ver), ByteUtils.toBytes(desc))
+                    ),
+                    List.of(
+                        Operations.noop(),

Review comment:
       One Operations.noop() is enough; it's not necessary to have the same number of failure operations as success ones.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite-3] vldpyatkov commented on a change in pull request #91: IGNITE-14077: Distributed schema manager.

Posted by GitBox <gi...@apache.org>.
vldpyatkov commented on a change in pull request #91:
URL: https://github.com/apache/ignite-3/pull/91#discussion_r633708364



##########
File path: modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaManager.java
##########
@@ -17,60 +17,268 @@
 
 package org.apache.ignite.internal.schema;
 
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 import org.apache.ignite.configuration.internal.ConfigurationManager;
+import org.apache.ignite.configuration.schemas.table.ColumnTypeView;
+import org.apache.ignite.configuration.schemas.table.ColumnView;
+import org.apache.ignite.configuration.schemas.table.TableConfiguration;
+import org.apache.ignite.configuration.schemas.table.TableIndexConfiguration;
+import org.apache.ignite.configuration.schemas.table.TablesConfiguration;
+import org.apache.ignite.configuration.tree.NamedListView;
+import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.apache.ignite.internal.schema.registry.SchemaRegistryException;
+import org.apache.ignite.internal.schema.registry.SchemaRegistryImpl;
+import org.apache.ignite.internal.util.ByteUtils;
+import org.apache.ignite.internal.vault.VaultManager;
+import org.apache.ignite.lang.ByteArray;
+import org.apache.ignite.lang.IgniteLogger;
+import org.apache.ignite.metastorage.common.Conditions;
+import org.apache.ignite.metastorage.common.Cursor;
+import org.apache.ignite.metastorage.common.Entry;
+import org.apache.ignite.metastorage.common.Key;
+import org.apache.ignite.metastorage.common.Operations;
+import org.apache.ignite.metastorage.common.WatchEvent;
+import org.apache.ignite.metastorage.common.WatchListener;
+import org.apache.ignite.schema.PrimaryIndex;
+import org.jetbrains.annotations.NotNull;
 
 /**
  * Schema Manager.
  */
-// TODO: IGNITE-14586 Remove @SuppressWarnings when implementation provided.
-@SuppressWarnings({"FieldCanBeLocal", "unused"}) public class SchemaManager {
-    /** Configuration manager in order to handle and listen schema specific configuration.*/
+public class SchemaManager {
+    /** The logger. */
+    private static final IgniteLogger LOG = IgniteLogger.forClass(SchemaManager.class);
+
+    /** Internal prefix for the metasorage. */
+    private static final String INTERNAL_PREFIX = "internal.tables.schema.";
+
+    /** Schema history item key suffix. */
+    private static final String INTERNAL_VER_SUFFIX = ".ver.";
+
+    /** Configuration manager in order to handle and listen schema specific configuration. */
     private final ConfigurationManager configurationMgr;
 
+    /** Metastorage manager. */
+    private final MetaStorageManager metaStorageMgr;
+
+    /** Vault manager. */
+    private final VaultManager vaultMgr;
+
     /** Schema. */
-    private final SchemaDescriptor schema;
+    private final Map<UUID, SchemaRegistryImpl> schemes = new ConcurrentHashMap<>();
 
     /**
      * The constructor.
      *
      * @param configurationMgr Configuration manager.
+     * @param metaStorageMgr Metastorage manager.
+     * @param vaultMgr Vault manager.
      */
-    public SchemaManager(ConfigurationManager configurationMgr) {
+    public SchemaManager(
+        ConfigurationManager configurationMgr,
+        MetaStorageManager metaStorageMgr,
+        VaultManager vaultMgr
+    ) {
         this.configurationMgr = configurationMgr;
+        this.metaStorageMgr = metaStorageMgr;
+        this.vaultMgr = vaultMgr;
+
+        metaStorageMgr.registerWatchByPrefix(new Key(INTERNAL_PREFIX), new WatchListener() {
+            /** {@inheritDoc} */
+            @Override public boolean onUpdate(@NotNull Iterable<WatchEvent> events) {

Review comment:
       We don't write comments for methods in anonymous classes.

##########
File path: modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaManager.java
##########
@@ -17,60 +17,268 @@
 
 package org.apache.ignite.internal.schema;
 
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 import org.apache.ignite.configuration.internal.ConfigurationManager;
+import org.apache.ignite.configuration.schemas.table.ColumnTypeView;
+import org.apache.ignite.configuration.schemas.table.ColumnView;
+import org.apache.ignite.configuration.schemas.table.TableConfiguration;
+import org.apache.ignite.configuration.schemas.table.TableIndexConfiguration;
+import org.apache.ignite.configuration.schemas.table.TablesConfiguration;
+import org.apache.ignite.configuration.tree.NamedListView;
+import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.apache.ignite.internal.schema.registry.SchemaRegistryException;
+import org.apache.ignite.internal.schema.registry.SchemaRegistryImpl;
+import org.apache.ignite.internal.util.ByteUtils;
+import org.apache.ignite.internal.vault.VaultManager;
+import org.apache.ignite.lang.ByteArray;
+import org.apache.ignite.lang.IgniteLogger;
+import org.apache.ignite.metastorage.common.Conditions;
+import org.apache.ignite.metastorage.common.Cursor;
+import org.apache.ignite.metastorage.common.Entry;
+import org.apache.ignite.metastorage.common.Key;
+import org.apache.ignite.metastorage.common.Operations;
+import org.apache.ignite.metastorage.common.WatchEvent;
+import org.apache.ignite.metastorage.common.WatchListener;
+import org.apache.ignite.schema.PrimaryIndex;
+import org.jetbrains.annotations.NotNull;
 
 /**
  * Schema Manager.
  */
-// TODO: IGNITE-14586 Remove @SuppressWarnings when implementation provided.
-@SuppressWarnings({"FieldCanBeLocal", "unused"}) public class SchemaManager {
-    /** Configuration manager in order to handle and listen schema specific configuration.*/
+public class SchemaManager {
+    /** The logger. */
+    private static final IgniteLogger LOG = IgniteLogger.forClass(SchemaManager.class);
+
+    /** Internal prefix for the metasorage. */
+    private static final String INTERNAL_PREFIX = "internal.tables.schema.";
+
+    /** Schema history item key suffix. */
+    private static final String INTERNAL_VER_SUFFIX = ".ver.";
+
+    /** Configuration manager in order to handle and listen schema specific configuration. */
     private final ConfigurationManager configurationMgr;
 
+    /** Metastorage manager. */
+    private final MetaStorageManager metaStorageMgr;
+
+    /** Vault manager. */
+    private final VaultManager vaultMgr;
+
     /** Schema. */
-    private final SchemaDescriptor schema;
+    private final Map<UUID, SchemaRegistryImpl> schemes = new ConcurrentHashMap<>();
 
     /**
      * The constructor.
      *
      * @param configurationMgr Configuration manager.
+     * @param metaStorageMgr Metastorage manager.
+     * @param vaultMgr Vault manager.
      */
-    public SchemaManager(ConfigurationManager configurationMgr) {
+    public SchemaManager(
+        ConfigurationManager configurationMgr,
+        MetaStorageManager metaStorageMgr,
+        VaultManager vaultMgr
+    ) {
         this.configurationMgr = configurationMgr;
+        this.metaStorageMgr = metaStorageMgr;
+        this.vaultMgr = vaultMgr;
+
+        metaStorageMgr.registerWatchByPrefix(new Key(INTERNAL_PREFIX), new WatchListener() {
+            /** {@inheritDoc} */
+            @Override public boolean onUpdate(@NotNull Iterable<WatchEvent> events) {
+                for (WatchEvent evt : events) {
+                    String keyTail = evt.newEntry().key().toString().substring(INTERNAL_PREFIX.length() - 1);
+
+                    int verPos = keyTail.indexOf(INTERNAL_VER_SUFFIX);
 
-        this.schema = new SchemaDescriptor(1,
-            new Column[] {
-                new Column("key", NativeType.LONG, false)
-            },
-            new Column[] {
-                new Column("value", NativeType.LONG, false)
+                    // Last table schema version changed.
+                    if (verPos == -1) {
+                        UUID tblId = UUID.fromString(keyTail);
+
+                        if (evt.oldEntry() == null)  // Initial schema added.
+                            schemes.put(tblId, new SchemaRegistryImpl());

Review comment:
       Why isn't the new schema registered in the schema registry?

##########
File path: modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
##########
@@ -200,12 +199,12 @@ private void listenForTableChange() {
 
                 if (hasMetastorageLocally) {
                     var key = new Key(INTERNAL_PREFIX + tblId.toString());
-
                     futs.add(metaStorageMgr.invoke(
                         Conditions.key(key).value().eq(null),
                         Operations.put(key, tableView.name().getBytes(StandardCharsets.UTF_8)),
-                        Operations.noop()).thenCompose(res ->
-                        affMgr.calculateAssignments(tblId)));
+                        Operations.noop())
+                        .thenCompose(res -> schemaMgr.initNewSchemaForTable(tblId, tableView.name()))

Review comment:
       If you do it in the same manner as affinity, you would use a local event INITIALIZED/REMOVED (like affinity's CALCULATED/REMOVED). Otherwise, you may end up in a situation where the metastorage is consistent, but the local structure (schema registry) has not been initialized.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite-3] AMashenkov commented on a change in pull request #91: IGNITE-14077: Distributed schema manager.

Posted by GitBox <gi...@apache.org>.
AMashenkov commented on a change in pull request #91:
URL: https://github.com/apache/ignite-3/pull/91#discussion_r634279782



##########
File path: modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
##########
@@ -200,12 +199,12 @@ private void listenForTableChange() {
 
                 if (hasMetastorageLocally) {
                     var key = new Key(INTERNAL_PREFIX + tblId.toString());
-
                     futs.add(metaStorageMgr.invoke(
                         Conditions.key(key).value().eq(null),
                         Operations.put(key, tableView.name().getBytes(StandardCharsets.UTF_8)),
-                        Operations.noop()).thenCompose(res ->
-                        affMgr.calculateAssignments(tblId)));
+                        Operations.noop())
+                        .thenCompose(res -> schemaMgr.initNewSchemaForTable(tblId, tableView.name()))

Review comment:
       How do you suggest synchronizing 2 independent async events, e.g. Schema.REMOVED and Affinity.REMOVED?
   Why did you decide that Affinity.REMOVED is the terminal event for the Table.drop() operation?
   
   Is there any explanation or decision on how components should communicate?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite-3] AMashenkov merged pull request #91: IGNITE-14077: Distributed schema manager.

Posted by GitBox <gi...@apache.org>.
AMashenkov merged pull request #91:
URL: https://github.com/apache/ignite-3/pull/91


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite-3] AMashenkov commented on pull request #91: IGNITE-14077: Distributed schema manager.

Posted by GitBox <gi...@apache.org>.
AMashenkov commented on pull request #91:
URL: https://github.com/apache/ignite-3/pull/91#issuecomment-824305835


   Implemented schema registry that holds actual schema versions and provides methods to get/register/cleanup schema versions.
   Unit tests added.
   
   Postponed because processing distributed schema updates requires a subscription to config updates.
   Schema update events are not specified, because schema is a part of table config which is not ready yet.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite-3] AMashenkov commented on a change in pull request #91: IGNITE-14077: Distributed schema manager.

Posted by GitBox <gi...@apache.org>.
AMashenkov commented on a change in pull request #91:
URL: https://github.com/apache/ignite-3/pull/91#discussion_r634279782



##########
File path: modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
##########
@@ -200,12 +199,12 @@ private void listenForTableChange() {
 
                 if (hasMetastorageLocally) {
                     var key = new Key(INTERNAL_PREFIX + tblId.toString());
-
                     futs.add(metaStorageMgr.invoke(
                         Conditions.key(key).value().eq(null),
                         Operations.put(key, tableView.name().getBytes(StandardCharsets.UTF_8)),
-                        Operations.noop()).thenCompose(res ->
-                        affMgr.calculateAssignments(tblId)));
+                        Operations.noop())
+                        .thenCompose(res -> schemaMgr.initNewSchemaForTable(tblId, tableView.name()))

Review comment:
       How do you suggest synchronizing 2 independent async events, e.g. Schema.REMOVED and Affinity.REMOVED?
   Why did you decide that Affinity.REMOVED is the terminal event for the Table.drop() operation?
   
   Is there any explanation or decision on how components should communicate?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite-3] sanpwc commented on a change in pull request #91: IGNITE-14077: Distributed schema manager.

Posted by GitBox <gi...@apache.org>.
sanpwc commented on a change in pull request #91:
URL: https://github.com/apache/ignite-3/pull/91#discussion_r632552105



##########
File path: modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaManager.java
##########
@@ -17,60 +17,310 @@
 
 package org.apache.ignite.internal.schema;
 
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 import org.apache.ignite.configuration.internal.ConfigurationManager;
+import org.apache.ignite.configuration.schemas.table.ColumnTypeView;
+import org.apache.ignite.configuration.schemas.table.ColumnView;
+import org.apache.ignite.configuration.schemas.table.TableConfiguration;
+import org.apache.ignite.configuration.schemas.table.TableIndexConfiguration;
+import org.apache.ignite.configuration.schemas.table.TablesConfiguration;
+import org.apache.ignite.configuration.tree.NamedListView;
+import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.apache.ignite.internal.schema.registry.SchemaRegistryException;
+import org.apache.ignite.internal.schema.registry.SchemaRegistryImpl;
+import org.apache.ignite.internal.util.ByteUtils;
+import org.apache.ignite.internal.vault.VaultManager;
+import org.apache.ignite.lang.ByteArray;
+import org.apache.ignite.lang.IgniteLogger;
+import org.apache.ignite.metastorage.common.Conditions;
+import org.apache.ignite.metastorage.common.Cursor;
+import org.apache.ignite.metastorage.common.Entry;
+import org.apache.ignite.metastorage.common.Key;
+import org.apache.ignite.metastorage.common.Operations;
+import org.apache.ignite.metastorage.common.WatchEvent;
+import org.apache.ignite.metastorage.common.WatchListener;
+import org.apache.ignite.schema.PrimaryIndex;
+import org.jetbrains.annotations.NotNull;
 
 /**
  * Schema Manager.
  */
-// TODO: IGNITE-14586 Remove @SuppressWarnings when implementation provided.
-@SuppressWarnings({"FieldCanBeLocal", "unused"}) public class SchemaManager {
-    /** Configuration manager in order to handle and listen schema specific configuration.*/
+public class SchemaManager {
+    /** The logger. */
+    private static final IgniteLogger LOG = IgniteLogger.forClass(SchemaManager.class);
+
+    /** Internal prefix for the metasorage. */
+    private static final String INTERNAL_PREFIX = "internal.tables.schema.";
+
+    /** Schema history item key suffix. */
+    protected static final String INTERNAL_VER_SUFFIX = ".ver.";
+
+    /** Configuration manager in order to handle and listen schema specific configuration. */
     private final ConfigurationManager configurationMgr;
 
+    /** Metastorage manager. */
+    private final MetaStorageManager metaStorageMgr;
+
+    /** Vault manager. */
+    private final VaultManager vaultManager;
+
+    /** Schema history subscription future. */
+    private CompletableFuture<Long> schemaHistorySubscriptionFut;
+
     /** Schema. */
-    private final SchemaDescriptor schema;
+    private final Map<UUID, SchemaRegistryImpl> schemes = new ConcurrentHashMap<>();
 
     /**
      * The constructor.
      *
      * @param configurationMgr Configuration manager.
+     * @param metaStorageMgr Metastorage manager.
+     * @param vaultManager Vault manager.
      */
-    public SchemaManager(ConfigurationManager configurationMgr) {
+    public SchemaManager(
+        ConfigurationManager configurationMgr,
+        MetaStorageManager metaStorageMgr,
+        VaultManager vaultManager
+    ) {
         this.configurationMgr = configurationMgr;
+        this.metaStorageMgr = metaStorageMgr;
+        this.vaultManager = vaultManager;
+
+        schemaHistorySubscriptionFut = metaStorageMgr.registerWatchByPrefix(new Key(INTERNAL_PREFIX), new WatchListener() {
+            /** {@inheritDoc} */
+            @Override public boolean onUpdate(@NotNull Iterable<WatchEvent> events) {
+                for (WatchEvent evt : events) {
+                    String keyTail = evt.newEntry().key().toString().substring(INTERNAL_PREFIX.length() - 1);
+
+                    int verPos = keyTail.indexOf(INTERNAL_VER_SUFFIX);
+
+                    // Last table schema version changed.
+                    if (verPos == -1) {
+                        UUID tblId = UUID.fromString(keyTail);
 
-        this.schema = new SchemaDescriptor(1,
-            new Column[] {
-                new Column("key", NativeType.LONG, false)
-            },
-            new Column[] {
-                new Column("value", NativeType.LONG, false)
+                        if (evt.oldEntry() == null)  // Initial schema added.
+                            schemes.put(tblId, new SchemaRegistryImpl());
+                        else if (evt.newEntry() == null) // Table Dropped.
+                            schemes.remove(tblId);
+                        else //TODO: https://issues.apache.org/jira/browse/IGNITE-13752
+                            throw new SchemaRegistryException("Schema upgrade is not implemented yet.");
+                    }
+                    else {
+                        UUID tblId = UUID.fromString(keyTail.substring(0, verPos));
+
+                        final SchemaRegistryImpl reg = schemes.get(tblId);
+
+                        assert reg != null : "Table schema was not initialized or table has been dropped: " + tblId;
+
+                        reg.registerSchema((SchemaDescriptor)ByteUtils.fromBytes(evt.newEntry().value()));
+                    }
+                }
+
+                return true;
             }
+
+            /** {@inheritDoc} */
+            @Override public void onError(@NotNull Throwable e) {
+                LOG.error("Metastorage listener issue", e);
+            }
+        });
+    }
+
+    /**
+     * Unsubscribes a listener form the affinity calculation.
+     */
+    private void unsubscribeFromAssignmentCalculation() {
+        if (schemaHistorySubscriptionFut == null)
+            return;
+
+        try {
+            Long subscriptionId = schemaHistorySubscriptionFut.get();
+
+            metaStorageMgr.unregisterWatch(subscriptionId);
+
+            schemaHistorySubscriptionFut = null;
+        }
+        catch (InterruptedException | ExecutionException e) {
+            LOG.error("Couldn't unsubscribe for Metastorage updates", e);
+        }
+    }
+
+    /**
+     * Reads current schema configuration, build schema descriptor,
+     * then add it to history rise up table schema version.
+     *
+     * @param tblId Table id.
+     * @param tblName Table name.
+     * @return Operation future.
+     */
+    public CompletableFuture<Boolean> initNewSchemaForTable(UUID tblId, String tblName) {
+        return vaultManager.get(ByteArray.fromString(INTERNAL_PREFIX + tblId)).
+            thenCompose(entry -> {
+                TableConfiguration tblConfig = configurationMgr.configurationRegistry().getConfiguration(TablesConfiguration.KEY).tables().get(tblName);
+                var key = new Key(INTERNAL_PREFIX + tblId);
+
+                int ver = entry.empty() ? 1 : (int)ByteUtils.bytesToLong(entry.value(), 0) + 1;
+
+                final SchemaDescriptor desc = createSchemaDescriptor(tblConfig, ver);
+
+                return metaStorageMgr.invoke(
+                    Conditions.key(key).value().eq(entry.value()), // Won't to rewrite if the version goes ahead.
+                    List.of(
+                        Operations.put(key, ByteUtils.longToBytes(ver)),
+                        Operations.put(new Key(INTERNAL_PREFIX + tblId + INTERNAL_VER_SUFFIX + ver), ByteUtils.toBytes(desc))
+                    ),
+                    List.of(
+                        Operations.noop(),
+                        Operations.noop()
+                    ));
+            });
+    }
+
+    /**
+     * Creates schema descriptor from config.
+     *
+     * @param tblConfig Table config.
+     * @param ver Schema version.
+     * @return Schema descriptor.
+     */
+    private SchemaDescriptor createSchemaDescriptor(TableConfiguration tblConfig, int ver) {
+        final TableIndexConfiguration pkCfg = tblConfig.indices().get(PrimaryIndex.PRIMARY_KEY_INDEX_NAME);
+
+        assert pkCfg != null;
+
+        final Set<String> keyColNames = Stream.of(pkCfg.colNames().value()).collect(Collectors.toSet());
+        final NamedListView<ColumnView> cols = tblConfig.columns().value();
+
+        final ArrayList<Column> keyCols = new ArrayList<>(keyColNames.size());
+        final ArrayList<Column> valCols = new ArrayList<>(cols.size() - keyColNames.size());
+
+        cols.namedListKeys().stream()
+            .map(cols::get)
+            .map(col -> new Column(col.name(), createType(col.type()), col.nullable()))
+            .forEach(c -> (keyColNames.contains(c.name()) ? keyCols : valCols).add(c));
+
+        return new SchemaDescriptor(
+            ver,
+            keyCols.toArray(Column[]::new),
+            pkCfg.affinityColumns().value(),
+            valCols.toArray(Column[]::new)
         );
     }
 
     /**
-     * Gets a current schema for the table specified.
+     * Create type from config.
      *
-     * @param tableId Table id.
-     * @return Schema.
+     * @param type Type view.
+     * @return Native type.
      */
-    public SchemaDescriptor schema(UUID tableId) {
-        return schema;
+    private NativeType createType(ColumnTypeView type) {
+        switch (type.type().toLowerCase()) {
+            case "byte":
+                return NativeType.BYTE;
+            case "short":
+                return NativeType.SHORT;
+            case "int":
+                return NativeType.INTEGER;
+            case "long":
+                return NativeType.LONG;
+            case "float":
+                return NativeType.FLOAT;
+            case "double":
+                return NativeType.DOUBLE;
+            case "uuid":
+                return NativeType.UUID;
+            case "bitmask":
+                return Bitmask.of(type.length());
+            case "string":
+                return NativeType.STRING;
+            case "bytes":
+                return NativeType.BYTES;
+
+            default:
+                throw new IllegalStateException("Unsupported column type: " + type.type());
+        }
     }
 
     /**
-     * Gets a schema for specific version.
+     * Compares schemas.
      *
+     * @param expected Expected schema.
+     * @param actual Actual schema.
+     * @return {@code True} if schemas are equal, {@code false} otherwise.
+     */
+    public static boolean equalSchemas(SchemaDescriptor expected, SchemaDescriptor actual) {
+        if (expected.keyColumns().length() != actual.keyColumns().length() ||
+            expected.valueColumns().length() != actual.valueColumns().length())
+            return false;
+
+        for (int i = 0; i < expected.length(); i++) {
+            if (!expected.column(i).equals(actual.column(i)))
+                return false;
+        }
+
+        return true;
+    }
+
+    /**
      * @param tableId Table id.
-     * @param ver Schema version.
-     * @return Schema.
+     * @return Schema registry for the table.
+     */
+    public SchemaRegistry schemaRegistryForTable(UUID tableId) {
+        final SchemaRegistry reg = schemes.get(tableId);
+
+        if (reg == null)
+            throw new SchemaRegistryException("No schema was ever registeref for the table: " + tableId);

Review comment:
       Typo: "registeref" should be "registered".




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite-3] vldpyatkov commented on a change in pull request #91: IGNITE-14077: Distributed schema manager.

Posted by GitBox <gi...@apache.org>.
vldpyatkov commented on a change in pull request #91:
URL: https://github.com/apache/ignite-3/pull/91#discussion_r633708364



##########
File path: modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaManager.java
##########
@@ -17,60 +17,268 @@
 
 package org.apache.ignite.internal.schema;
 
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 import org.apache.ignite.configuration.internal.ConfigurationManager;
+import org.apache.ignite.configuration.schemas.table.ColumnTypeView;
+import org.apache.ignite.configuration.schemas.table.ColumnView;
+import org.apache.ignite.configuration.schemas.table.TableConfiguration;
+import org.apache.ignite.configuration.schemas.table.TableIndexConfiguration;
+import org.apache.ignite.configuration.schemas.table.TablesConfiguration;
+import org.apache.ignite.configuration.tree.NamedListView;
+import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.apache.ignite.internal.schema.registry.SchemaRegistryException;
+import org.apache.ignite.internal.schema.registry.SchemaRegistryImpl;
+import org.apache.ignite.internal.util.ByteUtils;
+import org.apache.ignite.internal.vault.VaultManager;
+import org.apache.ignite.lang.ByteArray;
+import org.apache.ignite.lang.IgniteLogger;
+import org.apache.ignite.metastorage.common.Conditions;
+import org.apache.ignite.metastorage.common.Cursor;
+import org.apache.ignite.metastorage.common.Entry;
+import org.apache.ignite.metastorage.common.Key;
+import org.apache.ignite.metastorage.common.Operations;
+import org.apache.ignite.metastorage.common.WatchEvent;
+import org.apache.ignite.metastorage.common.WatchListener;
+import org.apache.ignite.schema.PrimaryIndex;
+import org.jetbrains.annotations.NotNull;
 
 /**
  * Schema Manager.
  */
-// TODO: IGNITE-14586 Remove @SuppressWarnings when implementation provided.
-@SuppressWarnings({"FieldCanBeLocal", "unused"}) public class SchemaManager {
-    /** Configuration manager in order to handle and listen schema specific configuration.*/
+public class SchemaManager {
+    /** The logger. */
+    private static final IgniteLogger LOG = IgniteLogger.forClass(SchemaManager.class);
+
+    /** Internal prefix for the metasorage. */
+    private static final String INTERNAL_PREFIX = "internal.tables.schema.";
+
+    /** Schema history item key suffix. */
+    private static final String INTERNAL_VER_SUFFIX = ".ver.";
+
+    /** Configuration manager in order to handle and listen schema specific configuration. */
     private final ConfigurationManager configurationMgr;
 
+    /** Metastorage manager. */
+    private final MetaStorageManager metaStorageMgr;
+
+    /** Vault manager. */
+    private final VaultManager vaultMgr;
+
     /** Schema. */
-    private final SchemaDescriptor schema;
+    private final Map<UUID, SchemaRegistryImpl> schemes = new ConcurrentHashMap<>();
 
     /**
      * The constructor.
      *
      * @param configurationMgr Configuration manager.
+     * @param metaStorageMgr Metastorage manager.
+     * @param vaultMgr Vault manager.
      */
-    public SchemaManager(ConfigurationManager configurationMgr) {
+    public SchemaManager(
+        ConfigurationManager configurationMgr,
+        MetaStorageManager metaStorageMgr,
+        VaultManager vaultMgr
+    ) {
         this.configurationMgr = configurationMgr;
+        this.metaStorageMgr = metaStorageMgr;
+        this.vaultMgr = vaultMgr;
+
+        metaStorageMgr.registerWatchByPrefix(new Key(INTERNAL_PREFIX), new WatchListener() {
+            /** {@inheritDoc} */
+            @Override public boolean onUpdate(@NotNull Iterable<WatchEvent> events) {

Review comment:
       We don't write comments for methods in anonymous classes.

##########
File path: modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaManager.java
##########
@@ -17,60 +17,268 @@
 
 package org.apache.ignite.internal.schema;
 
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 import org.apache.ignite.configuration.internal.ConfigurationManager;
+import org.apache.ignite.configuration.schemas.table.ColumnTypeView;
+import org.apache.ignite.configuration.schemas.table.ColumnView;
+import org.apache.ignite.configuration.schemas.table.TableConfiguration;
+import org.apache.ignite.configuration.schemas.table.TableIndexConfiguration;
+import org.apache.ignite.configuration.schemas.table.TablesConfiguration;
+import org.apache.ignite.configuration.tree.NamedListView;
+import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.apache.ignite.internal.schema.registry.SchemaRegistryException;
+import org.apache.ignite.internal.schema.registry.SchemaRegistryImpl;
+import org.apache.ignite.internal.util.ByteUtils;
+import org.apache.ignite.internal.vault.VaultManager;
+import org.apache.ignite.lang.ByteArray;
+import org.apache.ignite.lang.IgniteLogger;
+import org.apache.ignite.metastorage.common.Conditions;
+import org.apache.ignite.metastorage.common.Cursor;
+import org.apache.ignite.metastorage.common.Entry;
+import org.apache.ignite.metastorage.common.Key;
+import org.apache.ignite.metastorage.common.Operations;
+import org.apache.ignite.metastorage.common.WatchEvent;
+import org.apache.ignite.metastorage.common.WatchListener;
+import org.apache.ignite.schema.PrimaryIndex;
+import org.jetbrains.annotations.NotNull;
 
 /**
  * Schema Manager.
  */
-// TODO: IGNITE-14586 Remove @SuppressWarnings when implementation provided.
-@SuppressWarnings({"FieldCanBeLocal", "unused"}) public class SchemaManager {
-    /** Configuration manager in order to handle and listen schema specific configuration.*/
+public class SchemaManager {
+    /** The logger. */
+    private static final IgniteLogger LOG = IgniteLogger.forClass(SchemaManager.class);
+
+    /** Internal prefix for the metasorage. */
+    private static final String INTERNAL_PREFIX = "internal.tables.schema.";
+
+    /** Schema history item key suffix. */
+    private static final String INTERNAL_VER_SUFFIX = ".ver.";
+
+    /** Configuration manager in order to handle and listen schema specific configuration. */
     private final ConfigurationManager configurationMgr;
 
+    /** Metastorage manager. */
+    private final MetaStorageManager metaStorageMgr;
+
+    /** Vault manager. */
+    private final VaultManager vaultMgr;
+
     /** Schema. */
-    private final SchemaDescriptor schema;
+    private final Map<UUID, SchemaRegistryImpl> schemes = new ConcurrentHashMap<>();
 
     /**
      * The constructor.
      *
      * @param configurationMgr Configuration manager.
+     * @param metaStorageMgr Metastorage manager.
+     * @param vaultMgr Vault manager.
      */
-    public SchemaManager(ConfigurationManager configurationMgr) {
+    public SchemaManager(
+        ConfigurationManager configurationMgr,
+        MetaStorageManager metaStorageMgr,
+        VaultManager vaultMgr
+    ) {
         this.configurationMgr = configurationMgr;
+        this.metaStorageMgr = metaStorageMgr;
+        this.vaultMgr = vaultMgr;
+
+        metaStorageMgr.registerWatchByPrefix(new Key(INTERNAL_PREFIX), new WatchListener() {
+            /** {@inheritDoc} */
+            @Override public boolean onUpdate(@NotNull Iterable<WatchEvent> events) {
+                for (WatchEvent evt : events) {
+                    String keyTail = evt.newEntry().key().toString().substring(INTERNAL_PREFIX.length() - 1);
+
+                    int verPos = keyTail.indexOf(INTERNAL_VER_SUFFIX);
 
-        this.schema = new SchemaDescriptor(1,
-            new Column[] {
-                new Column("key", NativeType.LONG, false)
-            },
-            new Column[] {
-                new Column("value", NativeType.LONG, false)
+                    // Last table schema version changed.
+                    if (verPos == -1) {
+                        UUID tblId = UUID.fromString(keyTail);
+
+                        if (evt.oldEntry() == null)  // Initial schema added.
+                            schemes.put(tblId, new SchemaRegistryImpl());

Review comment:
       Why isn't the new schema put into the schema registry?

##########
File path: modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
##########
@@ -200,12 +199,12 @@ private void listenForTableChange() {
 
                 if (hasMetastorageLocally) {
                     var key = new Key(INTERNAL_PREFIX + tblId.toString());
-
                     futs.add(metaStorageMgr.invoke(
                         Conditions.key(key).value().eq(null),
                         Operations.put(key, tableView.name().getBytes(StandardCharsets.UTF_8)),
-                        Operations.noop()).thenCompose(res ->
-                        affMgr.calculateAssignments(tblId)));
+                        Operations.noop())
+                        .thenCompose(res -> schemaMgr.initNewSchemaForTable(tblId, tableView.name()))

Review comment:
       If you did this in the same manner as affinity, you would use a local event INITIALIZED/REMOVED (like affinity's CALCULATED/REMOVED). Otherwise, you may end up in a situation where the metastorage is consistent, but the local structure (schema registry) has not been initialized.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite-3] sanpwc commented on a change in pull request #91: IGNITE-14077: Distributed schema manager.

Posted by GitBox <gi...@apache.org>.
sanpwc commented on a change in pull request #91:
URL: https://github.com/apache/ignite-3/pull/91#discussion_r632553119



##########
File path: modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaManager.java
##########
@@ -17,60 +17,310 @@
 
 package org.apache.ignite.internal.schema;
 
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 import org.apache.ignite.configuration.internal.ConfigurationManager;
+import org.apache.ignite.configuration.schemas.table.ColumnTypeView;
+import org.apache.ignite.configuration.schemas.table.ColumnView;
+import org.apache.ignite.configuration.schemas.table.TableConfiguration;
+import org.apache.ignite.configuration.schemas.table.TableIndexConfiguration;
+import org.apache.ignite.configuration.schemas.table.TablesConfiguration;
+import org.apache.ignite.configuration.tree.NamedListView;
+import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.apache.ignite.internal.schema.registry.SchemaRegistryException;
+import org.apache.ignite.internal.schema.registry.SchemaRegistryImpl;
+import org.apache.ignite.internal.util.ByteUtils;
+import org.apache.ignite.internal.vault.VaultManager;
+import org.apache.ignite.lang.ByteArray;
+import org.apache.ignite.lang.IgniteLogger;
+import org.apache.ignite.metastorage.common.Conditions;
+import org.apache.ignite.metastorage.common.Cursor;
+import org.apache.ignite.metastorage.common.Entry;
+import org.apache.ignite.metastorage.common.Key;
+import org.apache.ignite.metastorage.common.Operations;
+import org.apache.ignite.metastorage.common.WatchEvent;
+import org.apache.ignite.metastorage.common.WatchListener;
+import org.apache.ignite.schema.PrimaryIndex;
+import org.jetbrains.annotations.NotNull;
 
 /**
  * Schema Manager.
  */
-// TODO: IGNITE-14586 Remove @SuppressWarnings when implementation provided.
-@SuppressWarnings({"FieldCanBeLocal", "unused"}) public class SchemaManager {
-    /** Configuration manager in order to handle and listen schema specific configuration.*/
+public class SchemaManager {
+    /** The logger. */
+    private static final IgniteLogger LOG = IgniteLogger.forClass(SchemaManager.class);
+
+    /** Internal prefix for the metasorage. */
+    private static final String INTERNAL_PREFIX = "internal.tables.schema.";
+
+    /** Schema history item key suffix. */
+    protected static final String INTERNAL_VER_SUFFIX = ".ver.";
+
+    /** Configuration manager in order to handle and listen schema specific configuration. */
     private final ConfigurationManager configurationMgr;
 
+    /** Metastorage manager. */
+    private final MetaStorageManager metaStorageMgr;
+
+    /** Vault manager. */
+    private final VaultManager vaultManager;
+
+    /** Schema history subscription future. */
+    private CompletableFuture<Long> schemaHistorySubscriptionFut;
+
     /** Schema. */
-    private final SchemaDescriptor schema;
+    private final Map<UUID, SchemaRegistryImpl> schemes = new ConcurrentHashMap<>();
 
     /**
      * The constructor.
      *
      * @param configurationMgr Configuration manager.
+     * @param metaStorageMgr Metastorage manager.
+     * @param vaultManager Vault manager.
      */
-    public SchemaManager(ConfigurationManager configurationMgr) {
+    public SchemaManager(
+        ConfigurationManager configurationMgr,
+        MetaStorageManager metaStorageMgr,
+        VaultManager vaultManager
+    ) {
         this.configurationMgr = configurationMgr;
+        this.metaStorageMgr = metaStorageMgr;
+        this.vaultManager = vaultManager;
+
+        schemaHistorySubscriptionFut = metaStorageMgr.registerWatchByPrefix(new Key(INTERNAL_PREFIX), new WatchListener() {
+            /** {@inheritDoc} */
+            @Override public boolean onUpdate(@NotNull Iterable<WatchEvent> events) {
+                for (WatchEvent evt : events) {
+                    String keyTail = evt.newEntry().key().toString().substring(INTERNAL_PREFIX.length() - 1);
+
+                    int verPos = keyTail.indexOf(INTERNAL_VER_SUFFIX);
+
+                    // Last table schema version changed.
+                    if (verPos == -1) {
+                        UUID tblId = UUID.fromString(keyTail);
 
-        this.schema = new SchemaDescriptor(1,
-            new Column[] {
-                new Column("key", NativeType.LONG, false)
-            },
-            new Column[] {
-                new Column("value", NativeType.LONG, false)
+                        if (evt.oldEntry() == null)  // Initial schema added.
+                            schemes.put(tblId, new SchemaRegistryImpl());
+                        else if (evt.newEntry() == null) // Table Dropped.
+                            schemes.remove(tblId);
+                        else //TODO: https://issues.apache.org/jira/browse/IGNITE-13752
+                            throw new SchemaRegistryException("Schema upgrade is not implemented yet.");
+                    }
+                    else {
+                        UUID tblId = UUID.fromString(keyTail.substring(0, verPos));
+
+                        final SchemaRegistryImpl reg = schemes.get(tblId);
+
+                        assert reg != null : "Table schema was not initialized or table has been dropped: " + tblId;
+
+                        reg.registerSchema((SchemaDescriptor)ByteUtils.fromBytes(evt.newEntry().value()));
+                    }
+                }
+
+                return true;
             }
+
+            /** {@inheritDoc} */
+            @Override public void onError(@NotNull Throwable e) {
+                LOG.error("Metastorage listener issue", e);
+            }
+        });
+    }
+
+    /**
+     * Unsubscribes a listener form the affinity calculation.
+     */
+    private void unsubscribeFromAssignmentCalculation() {
+        if (schemaHistorySubscriptionFut == null)
+            return;
+
+        try {
+            Long subscriptionId = schemaHistorySubscriptionFut.get();
+
+            metaStorageMgr.unregisterWatch(subscriptionId);
+
+            schemaHistorySubscriptionFut = null;
+        }
+        catch (InterruptedException | ExecutionException e) {
+            LOG.error("Couldn't unsubscribe for Metastorage updates", e);
+        }
+    }
+
+    /**
+     * Reads current schema configuration, build schema descriptor,
+     * then add it to history rise up table schema version.
+     *
+     * @param tblId Table id.
+     * @param tblName Table name.
+     * @return Operation future.
+     */
+    public CompletableFuture<Boolean> initNewSchemaForTable(UUID tblId, String tblName) {
+        return vaultManager.get(ByteArray.fromString(INTERNAL_PREFIX + tblId)).
+            thenCompose(entry -> {
+                TableConfiguration tblConfig = configurationMgr.configurationRegistry().getConfiguration(TablesConfiguration.KEY).tables().get(tblName);
+                var key = new Key(INTERNAL_PREFIX + tblId);
+
+                int ver = entry.empty() ? 1 : (int)ByteUtils.bytesToLong(entry.value(), 0) + 1;
+
+                final SchemaDescriptor desc = createSchemaDescriptor(tblConfig, ver);
+
+                return metaStorageMgr.invoke(
+                    Conditions.key(key).value().eq(entry.value()), // Won't to rewrite if the version goes ahead.
+                    List.of(
+                        Operations.put(key, ByteUtils.longToBytes(ver)),
+                        Operations.put(new Key(INTERNAL_PREFIX + tblId + INTERNAL_VER_SUFFIX + ver), ByteUtils.toBytes(desc))
+                    ),
+                    List.of(
+                        Operations.noop(),
+                        Operations.noop()
+                    ));
+            });
+    }
+
+    /**
+     * Creates schema descriptor from config.
+     *
+     * @param tblConfig Table config.
+     * @param ver Schema version.
+     * @return Schema descriptor.
+     */
+    private SchemaDescriptor createSchemaDescriptor(TableConfiguration tblConfig, int ver) {
+        final TableIndexConfiguration pkCfg = tblConfig.indices().get(PrimaryIndex.PRIMARY_KEY_INDEX_NAME);
+
+        assert pkCfg != null;
+
+        final Set<String> keyColNames = Stream.of(pkCfg.colNames().value()).collect(Collectors.toSet());
+        final NamedListView<ColumnView> cols = tblConfig.columns().value();
+
+        final ArrayList<Column> keyCols = new ArrayList<>(keyColNames.size());
+        final ArrayList<Column> valCols = new ArrayList<>(cols.size() - keyColNames.size());
+
+        cols.namedListKeys().stream()
+            .map(cols::get)
+            .map(col -> new Column(col.name(), createType(col.type()), col.nullable()))
+            .forEach(c -> (keyColNames.contains(c.name()) ? keyCols : valCols).add(c));
+
+        return new SchemaDescriptor(
+            ver,
+            keyCols.toArray(Column[]::new),
+            pkCfg.affinityColumns().value(),
+            valCols.toArray(Column[]::new)
         );
     }
 
     /**
-     * Gets a current schema for the table specified.
+     * Create type from config.
      *
-     * @param tableId Table id.
-     * @return Schema.
+     * @param type Type view.
+     * @return Native type.
      */
-    public SchemaDescriptor schema(UUID tableId) {
-        return schema;
+    private NativeType createType(ColumnTypeView type) {
+        switch (type.type().toLowerCase()) {
+            case "byte":
+                return NativeType.BYTE;
+            case "short":
+                return NativeType.SHORT;
+            case "int":
+                return NativeType.INTEGER;
+            case "long":
+                return NativeType.LONG;
+            case "float":
+                return NativeType.FLOAT;
+            case "double":
+                return NativeType.DOUBLE;
+            case "uuid":
+                return NativeType.UUID;
+            case "bitmask":
+                return Bitmask.of(type.length());
+            case "string":
+                return NativeType.STRING;
+            case "bytes":
+                return NativeType.BYTES;
+
+            default:
+                throw new IllegalStateException("Unsupported column type: " + type.type());
+        }
     }
 
     /**
-     * Gets a schema for specific version.
+     * Compares schemas.
      *
+     * @param expected Expected schema.
+     * @param actual Actual schema.
+     * @return {@code True} if schemas are equal, {@code false} otherwise.
+     */
+    public static boolean equalSchemas(SchemaDescriptor expected, SchemaDescriptor actual) {
+        if (expected.keyColumns().length() != actual.keyColumns().length() ||
+            expected.valueColumns().length() != actual.valueColumns().length())
+            return false;
+
+        for (int i = 0; i < expected.length(); i++) {
+            if (!expected.column(i).equals(actual.column(i)))
+                return false;
+        }
+
+        return true;
+    }
+
+    /**
      * @param tableId Table id.
-     * @param ver Schema version.
-     * @return Schema.
+     * @return Schema registry for the table.
+     */
+    public SchemaRegistry schemaRegistryForTable(UUID tableId) {
+        final SchemaRegistry reg = schemes.get(tableId);
+
+        if (reg == null)
+            throw new SchemaRegistryException("No schema was ever registeref for the table: " + tableId);
+
+        return reg;
+    }
+
+    /**
+     * Registers new schema.
+     *
+     * @param tableId Table identifier.
+     * @param desc Schema descriptor.
      */
-    public SchemaDescriptor schema(UUID tableId, long ver) {
-        assert ver >= 0;
+    public CompletableFuture<Boolean> registerSchema(UUID tableId, SchemaDescriptor desc) {
+        int schemaVersion = desc.version();
+
+        final Key key = new Key(INTERNAL_PREFIX + tableId + '.' + schemaVersion);
+
+        return metaStorageMgr.invoke(
+            Conditions.key(key).value().eq(null),
+            Operations.put(key, ByteUtils.toBytes(desc)), //TODO: IGNITE-14679 Serialize schema.
+            Operations.noop());
+    }
+
+    /**
+     * Unregistered all schemas associated with a table identifier.
+     *
+     * @param tableId Table identifier.
+     * @return Future which will complete when all versions of schema will be unregistered.
+     */
+    public CompletableFuture<Boolean> unregisterSchemas(UUID tableId) {
+        List<CompletableFuture<?>> futs = new ArrayList<>();
+
+        String schemaPrefix = INTERNAL_PREFIX + tableId + INTERNAL_VER_SUFFIX;
 
-        assert schema.version() == ver;
+        try (Cursor<Entry> cursor = metaStorageMgr.range(new Key(schemaPrefix), null)) {
+            cursor.forEach(entry ->
+                futs.add(metaStorageMgr.remove(entry.key())));
+        }
+        catch (Exception e) {
+            LOG.error("Culdn't remove schemas for the table [tblId=" + tableId + ']');

Review comment:
       Typo: "Culdn't" should be "Couldn't".




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite-3] AMashenkov commented on a change in pull request #91: IGNITE-14077: Distributed schema manager.

Posted by GitBox <gi...@apache.org>.
AMashenkov commented on a change in pull request #91:
URL: https://github.com/apache/ignite-3/pull/91#discussion_r634282374



##########
File path: modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
##########
@@ -200,12 +199,12 @@ private void listenForTableChange() {
 
                 if (hasMetastorageLocally) {
                     var key = new Key(INTERNAL_PREFIX + tblId.toString());
-
                     futs.add(metaStorageMgr.invoke(
                         Conditions.key(key).value().eq(null),
                         Operations.put(key, tableView.name().getBytes(StandardCharsets.UTF_8)),
-                        Operations.noop()).thenCompose(res ->
-                        affMgr.calculateAssignments(tblId)));
+                        Operations.noop())
+                        .thenCompose(res -> schemaMgr.initNewSchemaForTable(tblId, tableView.name()))

Review comment:
       For now I see the construction "affMgr.listen(AffinityEvent.REMOVED, ...)", whose semantics are unclear, and it has a bug.
   If a user starts/stops tables, the affMgr.listeners collection will grow constantly, as no one removes useless listeners from it.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite-3] sanpwc commented on a change in pull request #91: IGNITE-14077: Distributed schema manager.

Posted by GitBox <gi...@apache.org>.
sanpwc commented on a change in pull request #91:
URL: https://github.com/apache/ignite-3/pull/91#discussion_r632561029



##########
File path: modules/schema/src/main/java/org/apache/ignite/internal/schema/registry/SchemaRegistryImpl.java
##########
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.schema.registry;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentSkipListMap;
+import org.apache.ignite.internal.schema.SchemaDescriptor;
+import org.apache.ignite.internal.schema.SchemaRegistry;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Holds schema descriptors for actual schema versions.
+ * <p>
+ * Schemas MUST be registered in a version ascending order incrementing by {@code 1} with NO gaps,
+ * otherwise an exception will be thrown. The version numbering starts from the {@code 1}.
+ * <p>
+ * After some table maintenance process some first versions may become outdated and can be safely cleaned up
+ * if the process guarantees the table no longer has a data of these versions.
+ *
+ * @implSpec The changes in between two arbitrary actual versions MUST NOT be lost.
+ * Thus, schema versions can only be removed from the beginning.
+ * @implSpec Initial schema history MAY be registered without the first outdate versions

Review comment:
       outdate->outdated




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite-3] vldpyatkov commented on a change in pull request #91: IGNITE-14077: Distributed schema manager.

Posted by GitBox <gi...@apache.org>.
vldpyatkov commented on a change in pull request #91:
URL: https://github.com/apache/ignite-3/pull/91#discussion_r622764406



##########
File path: modules/table/src/main/java/org/apache/ignite/internal/table/schema/TableSchemaManagerImpl.java
##########
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.schema;
+
+import java.util.List;
+import org.apache.ignite.internal.schema.Column;
+import org.apache.ignite.internal.schema.SchemaDescriptor;
+import org.apache.ignite.internal.table.TableSchemaManager;
+
+/**
+ * Table schema manager component.
+ */
+public class TableSchemaManagerImpl implements TableSchemaManager {

Review comment:
       It looks like a per-table class.
   But "manager" is a reserved term for a node component.
   In other words, a schema manager should be the component that serves all tables.
   Look at the main branch: it was renamed to `TableSchemaView`:
   https://github.com/apache/ignite-3/blame/main/modules/table/src/main/java/org/apache/ignite/internal/table/TableSchemaView.java




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org