You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@ignite.apache.org by GitBox <gi...@apache.org> on 2021/04/29 21:12:22 UTC

[GitHub] [ignite-3] vldpyatkov opened a new pull request #115: IGNITE-14236 Provide a new version of cache API

vldpyatkov opened a new pull request #115:
URL: https://github.com/apache/ignite-3/pull/115


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite-3] vldpyatkov commented on a change in pull request #115: IGNITE-14236 Provide a new version of cache API

Posted by GitBox <gi...@apache.org>.
vldpyatkov commented on a change in pull request #115:
URL: https://github.com/apache/ignite-3/pull/115#discussion_r623815232



##########
File path: modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaManager.java
##########
@@ -17,36 +17,138 @@
 
 package org.apache.ignite.internal.schema;
 
+import java.util.ArrayList;
+import java.util.Map;
 import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
 import org.apache.ignite.configuration.internal.ConfigurationManager;
+import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.apache.ignite.internal.util.ByteUtils;
+import org.apache.ignite.internal.vault.VaultManager;
+import org.apache.ignite.lang.ByteArray;
+import org.apache.ignite.lang.IgniteLogger;
+import org.apache.ignite.metastorage.common.Conditions;
+import org.apache.ignite.metastorage.common.Cursor;
+import org.apache.ignite.metastorage.common.Entry;
+import org.apache.ignite.metastorage.common.Key;
+import org.apache.ignite.metastorage.common.Operations;
+import org.apache.ignite.metastorage.common.WatchEvent;
+import org.apache.ignite.metastorage.common.WatchListener;
+import org.jetbrains.annotations.NotNull;
 
 /**
  * Schema Manager.
  */
-// TODO: IGNITE-14586 Remove @SuppressWarnings when implementation provided.
-@SuppressWarnings({"FieldCanBeLocal", "unused"}) public class SchemaManager {
-    /** Configuration manager in order to handle and listen schema specific configuration.*/
+public class SchemaManager {
+    /** The logger. */
+    private static final IgniteLogger LOG = IgniteLogger.forClass(SchemaManager.class);
+
+    /** Internal prefix for the metasorage. */
+    private static final String INTERNAL_PREFIX = "internal.tables.schema.";
+
+    /** Configuration manager in order to handle and listen schema specific configuration. */
     private final ConfigurationManager configurationMgr;
 
+    /** Metastorage manager. */
+    private final MetaStorageManager metaStorageManager;
+
+    /** Vault manager. */
+    private final VaultManager vaultManager;
+
     /** Schema. */
-    private final SchemaDescriptor schema;
+    private final Map<UUID, SchemaDescriptor> schemes = new ConcurrentHashMap<>();
 
     /**
      * The constructor.
      *
      * @param configurationMgr Configuration manager.
+     * @param metaStorageManager Metastorage manager.
+     * @param vaultManager Vault manager.
      */
-    public SchemaManager(ConfigurationManager configurationMgr) {
+    public SchemaManager(
+        ConfigurationManager configurationMgr,
+        MetaStorageManager metaStorageManager,
+        VaultManager vaultManager
+    ) {
         this.configurationMgr = configurationMgr;
+        this.metaStorageManager = metaStorageManager;
+        this.vaultManager = vaultManager;
+
+//        this.schema = new SchemaDescriptor(1,
+//            new Column[] {
+//                new Column("key", NativeType.LONG, false)
+//            },
+//            new Column[] {
+//                new Column("value", NativeType.LONG, false)
+//            }
+//        );
+
+        metaStorageManager.registerWatchByPrefix(new Key(INTERNAL_PREFIX), new WatchListener() {
+            @Override public boolean onUpdate(@NotNull Iterable<WatchEvent> events) {
+                for (WatchEvent evt : events) {
+                    String keyTail = evt.newEntry().key().toString().substring(INTERNAL_PREFIX.length());
+
+                    UUID tblId = UUID.fromString(keyTail.substring(0, keyTail.indexOf('.')));
 
-        this.schema = new SchemaDescriptor(1,
-            new Column[] {
-                new Column("key", NativeType.LONG, false)
-            },
-            new Column[] {
-                new Column("value", NativeType.LONG, false)
+                    int schemaIdVal = Integer.parseInt(keyTail.substring(keyTail.indexOf('.'), keyTail.length() - 1));
+
+                    if (evt.newEntry().value() == null)
+                        schemes.computeIfPresent(tblId, (key, val) -> val.version() == schemaIdVal ? null : val);
+                    else {
+                        schemes.compute(tblId, (key, val) -> val == null || schemaIdVal > val.version() ?
+                            (SchemaDescriptor)ByteUtils.fromBytes(evt.newEntry().value()) : val);
+                    }
+                }
+
+                return true;
+            }
+
+            @Override public void onError(@NotNull Throwable e) {
+                LOG.error("Faled to notyfy Schema manager.", e);
             }
-        );
+        });
+    }
+
+    /**
+     * Registers new schema.
+     *
+     * @param tableId Table identifier.
+     * @param desc Schema descriptor.
+     */
+    public CompletableFuture<Boolean> registerSchema(UUID tableId, SchemaDescriptor desc) {
+        int schemaVersion = desc.version();
+
+        return metaStorageManager.invoke(new Key(INTERNAL_PREFIX
+                //Tbale id
+                + tableId + '.'
+                //Schema version
+                + schemaVersion),
+            Conditions.value().eq(null),
+            Operations.put(ByteUtils.toBytes(desc)),
+            Operations.noop());
+    }
+
+    /**
+     * Unregistered all schemas associated with a table identifier.
+     *
+     * @param tableId Table identifier.
+     * @return Future which will complete when all versions of schema will be unregistered.

Review comment:
       Done.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite-3] alievmirza commented on a change in pull request #115: IGNITE-14236 Provide a new version of cache API

Posted by GitBox <gi...@apache.org>.
alievmirza commented on a change in pull request #115:
URL: https://github.com/apache/ignite-3/pull/115#discussion_r623806563



##########
File path: modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaManager.java
##########
@@ -17,36 +17,138 @@
 
 package org.apache.ignite.internal.schema;
 
+import java.util.ArrayList;
+import java.util.Map;
 import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
 import org.apache.ignite.configuration.internal.ConfigurationManager;
+import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.apache.ignite.internal.util.ByteUtils;
+import org.apache.ignite.internal.vault.VaultManager;
+import org.apache.ignite.lang.ByteArray;
+import org.apache.ignite.lang.IgniteLogger;
+import org.apache.ignite.metastorage.common.Conditions;
+import org.apache.ignite.metastorage.common.Cursor;
+import org.apache.ignite.metastorage.common.Entry;
+import org.apache.ignite.metastorage.common.Key;
+import org.apache.ignite.metastorage.common.Operations;
+import org.apache.ignite.metastorage.common.WatchEvent;
+import org.apache.ignite.metastorage.common.WatchListener;
+import org.jetbrains.annotations.NotNull;
 
 /**
  * Schema Manager.
  */
-// TODO: IGNITE-14586 Remove @SuppressWarnings when implementation provided.
-@SuppressWarnings({"FieldCanBeLocal", "unused"}) public class SchemaManager {
-    /** Configuration manager in order to handle and listen schema specific configuration.*/
+public class SchemaManager {
+    /** The logger. */
+    private static final IgniteLogger LOG = IgniteLogger.forClass(SchemaManager.class);
+
+    /** Internal prefix for the metasorage. */
+    private static final String INTERNAL_PREFIX = "internal.tables.schema.";
+
+    /** Configuration manager in order to handle and listen schema specific configuration. */
     private final ConfigurationManager configurationMgr;
 
+    /** Metastorage manager. */
+    private final MetaStorageManager metaStorageManager;
+
+    /** Vault manager. */
+    private final VaultManager vaultManager;
+
     /** Schema. */
-    private final SchemaDescriptor schema;
+    private final Map<UUID, SchemaDescriptor> schemes = new ConcurrentHashMap<>();
 
     /**
      * The constructor.
      *
      * @param configurationMgr Configuration manager.
+     * @param metaStorageManager Metastorage manager.
+     * @param vaultManager Vault manager.
      */
-    public SchemaManager(ConfigurationManager configurationMgr) {
+    public SchemaManager(
+        ConfigurationManager configurationMgr,
+        MetaStorageManager metaStorageManager,
+        VaultManager vaultManager
+    ) {
         this.configurationMgr = configurationMgr;
+        this.metaStorageManager = metaStorageManager;
+        this.vaultManager = vaultManager;
+
+//        this.schema = new SchemaDescriptor(1,
+//            new Column[] {
+//                new Column("key", NativeType.LONG, false)
+//            },
+//            new Column[] {
+//                new Column("value", NativeType.LONG, false)
+//            }
+//        );
+
+        metaStorageManager.registerWatchByPrefix(new Key(INTERNAL_PREFIX), new WatchListener() {
+            @Override public boolean onUpdate(@NotNull Iterable<WatchEvent> events) {
+                for (WatchEvent evt : events) {
+                    String keyTail = evt.newEntry().key().toString().substring(INTERNAL_PREFIX.length());

Review comment:
       I would introduce utility method which takes `Key` and returns list of parsed values like `parseKey("abc.def.xyz") -> {"abc", "def", "xyz"}`. This approach hides machinery with string. Or at least we can use split(".") instead of doing substring-indexOf twice.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite-3] alievmirza commented on a change in pull request #115: IGNITE-14236 Provide a new version of cache API

Posted by GitBox <gi...@apache.org>.
alievmirza commented on a change in pull request #115:
URL: https://github.com/apache/ignite-3/pull/115#discussion_r623793240



##########
File path: modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaManager.java
##########
@@ -17,36 +17,138 @@
 
 package org.apache.ignite.internal.schema;
 
+import java.util.ArrayList;
+import java.util.Map;
 import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
 import org.apache.ignite.configuration.internal.ConfigurationManager;
+import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.apache.ignite.internal.util.ByteUtils;
+import org.apache.ignite.internal.vault.VaultManager;
+import org.apache.ignite.lang.ByteArray;
+import org.apache.ignite.lang.IgniteLogger;
+import org.apache.ignite.metastorage.common.Conditions;
+import org.apache.ignite.metastorage.common.Cursor;
+import org.apache.ignite.metastorage.common.Entry;
+import org.apache.ignite.metastorage.common.Key;
+import org.apache.ignite.metastorage.common.Operations;
+import org.apache.ignite.metastorage.common.WatchEvent;
+import org.apache.ignite.metastorage.common.WatchListener;
+import org.jetbrains.annotations.NotNull;
 
 /**
  * Schema Manager.
  */
-// TODO: IGNITE-14586 Remove @SuppressWarnings when implementation provided.
-@SuppressWarnings({"FieldCanBeLocal", "unused"}) public class SchemaManager {
-    /** Configuration manager in order to handle and listen schema specific configuration.*/
+public class SchemaManager {
+    /** The logger. */
+    private static final IgniteLogger LOG = IgniteLogger.forClass(SchemaManager.class);
+
+    /** Internal prefix for the metasorage. */
+    private static final String INTERNAL_PREFIX = "internal.tables.schema.";
+
+    /** Configuration manager in order to handle and listen schema specific configuration. */
     private final ConfigurationManager configurationMgr;
 
+    /** Metastorage manager. */
+    private final MetaStorageManager metaStorageManager;
+
+    /** Vault manager. */
+    private final VaultManager vaultManager;
+
     /** Schema. */
-    private final SchemaDescriptor schema;
+    private final Map<UUID, SchemaDescriptor> schemes = new ConcurrentHashMap<>();
 
     /**
      * The constructor.
      *
      * @param configurationMgr Configuration manager.
+     * @param metaStorageManager Metastorage manager.
+     * @param vaultManager Vault manager.
      */
-    public SchemaManager(ConfigurationManager configurationMgr) {
+    public SchemaManager(
+        ConfigurationManager configurationMgr,
+        MetaStorageManager metaStorageManager,
+        VaultManager vaultManager
+    ) {
         this.configurationMgr = configurationMgr;
+        this.metaStorageManager = metaStorageManager;
+        this.vaultManager = vaultManager;
+
+//        this.schema = new SchemaDescriptor(1,
+//            new Column[] {
+//                new Column("key", NativeType.LONG, false)
+//            },
+//            new Column[] {
+//                new Column("value", NativeType.LONG, false)
+//            }
+//        );
+
+        metaStorageManager.registerWatchByPrefix(new Key(INTERNAL_PREFIX), new WatchListener() {
+            @Override public boolean onUpdate(@NotNull Iterable<WatchEvent> events) {
+                for (WatchEvent evt : events) {
+                    String keyTail = evt.newEntry().key().toString().substring(INTERNAL_PREFIX.length());
+
+                    UUID tblId = UUID.fromString(keyTail.substring(0, keyTail.indexOf('.')));
 
-        this.schema = new SchemaDescriptor(1,
-            new Column[] {
-                new Column("key", NativeType.LONG, false)
-            },
-            new Column[] {
-                new Column("value", NativeType.LONG, false)
+                    int schemaIdVal = Integer.parseInt(keyTail.substring(keyTail.indexOf('.'), keyTail.length() - 1));
+
+                    if (evt.newEntry().value() == null)
+                        schemes.computeIfPresent(tblId, (key, val) -> val.version() == schemaIdVal ? null : val);
+                    else {
+                        schemes.compute(tblId, (key, val) -> val == null || schemaIdVal > val.version() ?
+                            (SchemaDescriptor)ByteUtils.fromBytes(evt.newEntry().value()) : val);
+                    }
+                }
+
+                return true;
+            }
+
+            @Override public void onError(@NotNull Throwable e) {
+                LOG.error("Faled to notyfy Schema manager.", e);
             }
-        );
+        });
+    }
+
+    /**
+     * Registers new schema.
+     *
+     * @param tableId Table identifier.
+     * @param desc Schema descriptor.
+     */
+    public CompletableFuture<Boolean> registerSchema(UUID tableId, SchemaDescriptor desc) {
+        int schemaVersion = desc.version();
+
+        return metaStorageManager.invoke(new Key(INTERNAL_PREFIX
+                //Tbale id

Review comment:
       Comments are useless IMHO, they just duplicate variable's name

##########
File path: modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaManager.java
##########
@@ -17,36 +17,138 @@
 
 package org.apache.ignite.internal.schema;
 
+import java.util.ArrayList;
+import java.util.Map;
 import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
 import org.apache.ignite.configuration.internal.ConfigurationManager;
+import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.apache.ignite.internal.util.ByteUtils;
+import org.apache.ignite.internal.vault.VaultManager;
+import org.apache.ignite.lang.ByteArray;
+import org.apache.ignite.lang.IgniteLogger;
+import org.apache.ignite.metastorage.common.Conditions;
+import org.apache.ignite.metastorage.common.Cursor;
+import org.apache.ignite.metastorage.common.Entry;
+import org.apache.ignite.metastorage.common.Key;
+import org.apache.ignite.metastorage.common.Operations;
+import org.apache.ignite.metastorage.common.WatchEvent;
+import org.apache.ignite.metastorage.common.WatchListener;
+import org.jetbrains.annotations.NotNull;
 
 /**
  * Schema Manager.
  */
-// TODO: IGNITE-14586 Remove @SuppressWarnings when implementation provided.
-@SuppressWarnings({"FieldCanBeLocal", "unused"}) public class SchemaManager {
-    /** Configuration manager in order to handle and listen schema specific configuration.*/
+public class SchemaManager {
+    /** The logger. */
+    private static final IgniteLogger LOG = IgniteLogger.forClass(SchemaManager.class);
+
+    /** Internal prefix for the metasorage. */
+    private static final String INTERNAL_PREFIX = "internal.tables.schema.";
+
+    /** Configuration manager in order to handle and listen schema specific configuration. */
     private final ConfigurationManager configurationMgr;
 
+    /** Metastorage manager. */
+    private final MetaStorageManager metaStorageManager;
+
+    /** Vault manager. */
+    private final VaultManager vaultManager;
+
     /** Schema. */
-    private final SchemaDescriptor schema;
+    private final Map<UUID, SchemaDescriptor> schemes = new ConcurrentHashMap<>();
 
     /**
      * The constructor.
      *
      * @param configurationMgr Configuration manager.
+     * @param metaStorageManager Metastorage manager.
+     * @param vaultManager Vault manager.
      */
-    public SchemaManager(ConfigurationManager configurationMgr) {
+    public SchemaManager(
+        ConfigurationManager configurationMgr,
+        MetaStorageManager metaStorageManager,
+        VaultManager vaultManager
+    ) {
         this.configurationMgr = configurationMgr;
+        this.metaStorageManager = metaStorageManager;
+        this.vaultManager = vaultManager;
+
+//        this.schema = new SchemaDescriptor(1,
+//            new Column[] {
+//                new Column("key", NativeType.LONG, false)
+//            },
+//            new Column[] {
+//                new Column("value", NativeType.LONG, false)
+//            }
+//        );
+
+        metaStorageManager.registerWatchByPrefix(new Key(INTERNAL_PREFIX), new WatchListener() {
+            @Override public boolean onUpdate(@NotNull Iterable<WatchEvent> events) {
+                for (WatchEvent evt : events) {
+                    String keyTail = evt.newEntry().key().toString().substring(INTERNAL_PREFIX.length());
+
+                    UUID tblId = UUID.fromString(keyTail.substring(0, keyTail.indexOf('.')));
 
-        this.schema = new SchemaDescriptor(1,
-            new Column[] {
-                new Column("key", NativeType.LONG, false)
-            },
-            new Column[] {
-                new Column("value", NativeType.LONG, false)
+                    int schemaIdVal = Integer.parseInt(keyTail.substring(keyTail.indexOf('.'), keyTail.length() - 1));
+
+                    if (evt.newEntry().value() == null)
+                        schemes.computeIfPresent(tblId, (key, val) -> val.version() == schemaIdVal ? null : val);
+                    else {
+                        schemes.compute(tblId, (key, val) -> val == null || schemaIdVal > val.version() ?
+                            (SchemaDescriptor)ByteUtils.fromBytes(evt.newEntry().value()) : val);
+                    }
+                }
+
+                return true;
+            }
+
+            @Override public void onError(@NotNull Throwable e) {
+                LOG.error("Faled to notyfy Schema manager.", e);
             }
-        );
+        });
+    }
+
+    /**
+     * Registers new schema.
+     *
+     * @param tableId Table identifier.
+     * @param desc Schema descriptor.
+     */
+    public CompletableFuture<Boolean> registerSchema(UUID tableId, SchemaDescriptor desc) {
+        int schemaVersion = desc.version();
+
+        return metaStorageManager.invoke(new Key(INTERNAL_PREFIX
+                //Tbale id
+                + tableId + '.'
+                //Schema version
+                + schemaVersion),
+            Conditions.value().eq(null),
+            Operations.put(ByteUtils.toBytes(desc)),
+            Operations.noop());
+    }
+
+    /**
+     * Unregistered all schemas associated with a table identifier.
+     *
+     * @param tableId Table identifier.
+     * @return Future which will complete when all versions of schema will be unregistered.

Review comment:
       ```suggestion
        * @return Future which will be completed when all versions of schema will be unregistered.
   ```

##########
File path: modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaManager.java
##########
@@ -17,36 +17,138 @@
 
 package org.apache.ignite.internal.schema;
 
+import java.util.ArrayList;
+import java.util.Map;
 import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
 import org.apache.ignite.configuration.internal.ConfigurationManager;
+import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.apache.ignite.internal.util.ByteUtils;
+import org.apache.ignite.internal.vault.VaultManager;
+import org.apache.ignite.lang.ByteArray;
+import org.apache.ignite.lang.IgniteLogger;
+import org.apache.ignite.metastorage.common.Conditions;
+import org.apache.ignite.metastorage.common.Cursor;
+import org.apache.ignite.metastorage.common.Entry;
+import org.apache.ignite.metastorage.common.Key;
+import org.apache.ignite.metastorage.common.Operations;
+import org.apache.ignite.metastorage.common.WatchEvent;
+import org.apache.ignite.metastorage.common.WatchListener;
+import org.jetbrains.annotations.NotNull;
 
 /**
  * Schema Manager.
  */
-// TODO: IGNITE-14586 Remove @SuppressWarnings when implementation provided.
-@SuppressWarnings({"FieldCanBeLocal", "unused"}) public class SchemaManager {
-    /** Configuration manager in order to handle and listen schema specific configuration.*/
+public class SchemaManager {
+    /** The logger. */
+    private static final IgniteLogger LOG = IgniteLogger.forClass(SchemaManager.class);
+
+    /** Internal prefix for the metasorage. */
+    private static final String INTERNAL_PREFIX = "internal.tables.schema.";
+
+    /** Configuration manager in order to handle and listen schema specific configuration. */
     private final ConfigurationManager configurationMgr;
 
+    /** Metastorage manager. */
+    private final MetaStorageManager metaStorageManager;
+
+    /** Vault manager. */
+    private final VaultManager vaultManager;
+
     /** Schema. */
-    private final SchemaDescriptor schema;
+    private final Map<UUID, SchemaDescriptor> schemes = new ConcurrentHashMap<>();
 
     /**
      * The constructor.
      *
      * @param configurationMgr Configuration manager.
+     * @param metaStorageManager Metastorage manager.
+     * @param vaultManager Vault manager.
      */
-    public SchemaManager(ConfigurationManager configurationMgr) {
+    public SchemaManager(
+        ConfigurationManager configurationMgr,
+        MetaStorageManager metaStorageManager,
+        VaultManager vaultManager
+    ) {
         this.configurationMgr = configurationMgr;
+        this.metaStorageManager = metaStorageManager;
+        this.vaultManager = vaultManager;
+
+//        this.schema = new SchemaDescriptor(1,

Review comment:
       Do we need this commented code?

##########
File path: modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaManager.java
##########
@@ -69,8 +171,23 @@ public SchemaDescriptor schema(UUID tableId) {
     public SchemaDescriptor schema(UUID tableId, long ver) {
         assert ver >= 0;
 
-        assert schema.version() == ver;
+        SchemaDescriptor schema = schemes.get(tableId);
+
+        if (schema != null) {
+            assert ver <= schema.version();
+
+            if (ver == schema.version())
+                return schema;
+
+            return (SchemaDescriptor)ByteUtils.fromBytes(
+                vaultManager.get(ByteArray.fromString(INTERNAL_PREFIX
+                    //Tbale id

Review comment:
       The same as I said above about comments




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite-3] vldpyatkov commented on a change in pull request #115: IGNITE-14236 Provide a new version of cache API

Posted by GitBox <gi...@apache.org>.
vldpyatkov commented on a change in pull request #115:
URL: https://github.com/apache/ignite-3/pull/115#discussion_r623815383



##########
File path: modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaManager.java
##########
@@ -69,8 +171,23 @@ public SchemaDescriptor schema(UUID tableId) {
     public SchemaDescriptor schema(UUID tableId, long ver) {
         assert ver >= 0;
 
-        assert schema.version() == ver;
+        SchemaDescriptor schema = schemes.get(tableId);
+
+        if (schema != null) {
+            assert ver <= schema.version();
+
+            if (ver == schema.version())
+                return schema;
+
+            return (SchemaDescriptor)ByteUtils.fromBytes(
+                vaultManager.get(ByteArray.fromString(INTERNAL_PREFIX
+                    //Tbale id

Review comment:
       Removed this comment.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite-3] vldpyatkov closed pull request #115: IGNITE-14236 Provide a new version of cache API

Posted by GitBox <gi...@apache.org>.
vldpyatkov closed pull request #115:
URL: https://github.com/apache/ignite-3/pull/115


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite-3] vldpyatkov commented on a change in pull request #115: IGNITE-14236 Provide a new version of cache API

Posted by GitBox <gi...@apache.org>.
vldpyatkov commented on a change in pull request #115:
URL: https://github.com/apache/ignite-3/pull/115#discussion_r623815554



##########
File path: modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaManager.java
##########
@@ -17,36 +17,138 @@
 
 package org.apache.ignite.internal.schema;
 
+import java.util.ArrayList;
+import java.util.Map;
 import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
 import org.apache.ignite.configuration.internal.ConfigurationManager;
+import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.apache.ignite.internal.util.ByteUtils;
+import org.apache.ignite.internal.vault.VaultManager;
+import org.apache.ignite.lang.ByteArray;
+import org.apache.ignite.lang.IgniteLogger;
+import org.apache.ignite.metastorage.common.Conditions;
+import org.apache.ignite.metastorage.common.Cursor;
+import org.apache.ignite.metastorage.common.Entry;
+import org.apache.ignite.metastorage.common.Key;
+import org.apache.ignite.metastorage.common.Operations;
+import org.apache.ignite.metastorage.common.WatchEvent;
+import org.apache.ignite.metastorage.common.WatchListener;
+import org.jetbrains.annotations.NotNull;
 
 /**
  * Schema Manager.
  */
-// TODO: IGNITE-14586 Remove @SuppressWarnings when implementation provided.
-@SuppressWarnings({"FieldCanBeLocal", "unused"}) public class SchemaManager {
-    /** Configuration manager in order to handle and listen schema specific configuration.*/
+public class SchemaManager {
+    /** The logger. */
+    private static final IgniteLogger LOG = IgniteLogger.forClass(SchemaManager.class);
+
+    /** Internal prefix for the metasorage. */
+    private static final String INTERNAL_PREFIX = "internal.tables.schema.";
+
+    /** Configuration manager in order to handle and listen schema specific configuration. */
     private final ConfigurationManager configurationMgr;
 
+    /** Metastorage manager. */
+    private final MetaStorageManager metaStorageManager;
+
+    /** Vault manager. */
+    private final VaultManager vaultManager;
+
     /** Schema. */
-    private final SchemaDescriptor schema;
+    private final Map<UUID, SchemaDescriptor> schemes = new ConcurrentHashMap<>();
 
     /**
      * The constructor.
      *
      * @param configurationMgr Configuration manager.
+     * @param metaStorageManager Metastorage manager.
+     * @param vaultManager Vault manager.
      */
-    public SchemaManager(ConfigurationManager configurationMgr) {
+    public SchemaManager(
+        ConfigurationManager configurationMgr,
+        MetaStorageManager metaStorageManager,
+        VaultManager vaultManager
+    ) {
         this.configurationMgr = configurationMgr;
+        this.metaStorageManager = metaStorageManager;
+        this.vaultManager = vaultManager;
+
+//        this.schema = new SchemaDescriptor(1,
+//            new Column[] {
+//                new Column("key", NativeType.LONG, false)
+//            },
+//            new Column[] {
+//                new Column("value", NativeType.LONG, false)
+//            }
+//        );
+
+        metaStorageManager.registerWatchByPrefix(new Key(INTERNAL_PREFIX), new WatchListener() {
+            @Override public boolean onUpdate(@NotNull Iterable<WatchEvent> events) {
+                for (WatchEvent evt : events) {
+                    String keyTail = evt.newEntry().key().toString().substring(INTERNAL_PREFIX.length());
+
+                    UUID tblId = UUID.fromString(keyTail.substring(0, keyTail.indexOf('.')));
 
-        this.schema = new SchemaDescriptor(1,
-            new Column[] {
-                new Column("key", NativeType.LONG, false)
-            },
-            new Column[] {
-                new Column("value", NativeType.LONG, false)
+                    int schemaIdVal = Integer.parseInt(keyTail.substring(keyTail.indexOf('.'), keyTail.length() - 1));
+
+                    if (evt.newEntry().value() == null)
+                        schemes.computeIfPresent(tblId, (key, val) -> val.version() == schemaIdVal ? null : val);
+                    else {
+                        schemes.compute(tblId, (key, val) -> val == null || schemaIdVal > val.version() ?
+                            (SchemaDescriptor)ByteUtils.fromBytes(evt.newEntry().value()) : val);
+                    }
+                }
+
+                return true;
+            }
+
+            @Override public void onError(@NotNull Throwable e) {
+                LOG.error("Faled to notyfy Schema manager.", e);
             }
-        );
+        });
+    }
+
+    /**
+     * Registers new schema.
+     *
+     * @param tableId Table identifier.
+     * @param desc Schema descriptor.
+     */
+    public CompletableFuture<Boolean> registerSchema(UUID tableId, SchemaDescriptor desc) {
+        int schemaVersion = desc.version();
+
+        return metaStorageManager.invoke(new Key(INTERNAL_PREFIX
+                //Tbale id

Review comment:
       Removed.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org