You are viewing a plain-text version of this content. The canonical link for it ("here") was a hyperlink in the original page and is not reproduced in this version.
Posted to notifications@ignite.apache.org by GitBox <gi...@apache.org> on 2022/02/22 11:16:04 UTC

[GitHub] [ignite-3] vldpyatkov commented on a change in pull request #666: IGNITE-16390

vldpyatkov commented on a change in pull request #666:
URL: https://github.com/apache/ignite-3/pull/666#discussion_r811762939



##########
File path: modules/core/src/main/java/org/apache/ignite/internal/causality/VersionedValue.java
##########
@@ -0,0 +1,428 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.causality;
+
+import static java.util.concurrent.CompletableFuture.completedFuture;
+
+import java.util.Map.Entry;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentNavigableMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.BiConsumer;
+import java.util.function.BiFunction;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import org.apache.ignite.lang.IgniteStringFormatter;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Parametrized type to store several versions of the value.
+ * A value can be available through the causality token, which is represented by long.
+ *
+ * @param <T> Type of real value.
+ */
+public class VersionedValue<T> {
+    /** Last applied casualty token. */
+    private volatile long actualToken = -1;
+
+    /** Size of stored history. */
+    private final int historySize;
+
+    /** Closure applied on storage revision update. */
+    private final BiConsumer<VersionedValue<T>, Long> storageRevisionUpdating;
+
+    /** Versioned value storage. */
+    private final ConcurrentNavigableMap<Long, CompletableFuture<T>> history = new ConcurrentSkipListMap<>();
+
+    /** Default value supplier. */
+    private final Supplier<T> defaultVal;
+
+    /**
+     * This lock guarantees that the history is not trimming {@link #trimToSize(long)} during getting a value from versioned storage {@link
+     * #get(long)}.
+     */
+    private final ReadWriteLock trimHistoryLock = new ReentrantReadWriteLock();
+
+    /**
+     * Constructor.
+     *
+     * @param storageRevisionUpdating    Closure applied on storage revision update (see {@link #onStorageRevisionUpdate(long)}).
+     * @param observableRevisionUpdater  A closure intended to connect this VersionedValue with a revision updater, that this VersionedValue
+     *                                   should be able to listen to, for receiving storage revision updates. This closure is called once on
+     *                                   a construction of this VersionedValue and accepts a {@code Consumer<Long>} that should be called
+     *                                   on every update of storage revision as a listener. IMPORTANT: Revision update shouldn't happen
+     *                                   concurrently with {@link #set(long, T)} operations.
+     * @param historySize                Size of the history of changes to store, including last applied token.
+     * @param defaultVal                 Supplier of the default value, that is used on {@link #update(long, BiFunction)} to evaluate
+     *                                   the default value if the value is not initialized yet.
+     */
+    public VersionedValue(
+            @Nullable BiConsumer<VersionedValue<T>, Long> storageRevisionUpdating,
+            Consumer<Consumer<Long>> observableRevisionUpdater,
+            int historySize,
+            Supplier<T> defaultVal
+    ) {
+        this.storageRevisionUpdating = storageRevisionUpdating;
+
+        observableRevisionUpdater.accept(this::onStorageRevisionUpdate);
+
+        this.historySize = historySize;
+
+        this.defaultVal = defaultVal;
+    }
+
+    /**
+     * Constructor with default history size that equals 2. See {@link #VersionedValue(BiConsumer, Consumer, int, Supplier)}.
+     *
+     * @param storageRevisionUpdating   Closure applied on storage revision update (see {@link #onStorageRevisionUpdate(long)}.
+     * @param observableRevisionUpdater A closure intended to connect this VersionedValue with a revision updater, that this VersionedValue
+     *                                  should be able to listen to, for receiving storage revision updates. This closure is called once on
+     *                                  a construction of this VersionedValue and accepts a {@code Consumer<Long>} that should be called
+     *                                  on every update of storage revision as a listener. IMPORTANT: Revision update shouldn't happen
+     *                                  concurrently with {@link #set(long, T)} operations.
+     */
+    public VersionedValue(
+            @Nullable BiConsumer<VersionedValue<T>, Long> storageRevisionUpdating,
+            Consumer<Consumer<Long>> observableRevisionUpdater
+    ) {
+        this(storageRevisionUpdating, observableRevisionUpdater, 2, null);
+    }
+
+    /**
+     * Constructor with default history size that equals 2 and no closure. See {@link #VersionedValue(BiConsumer, Consumer, int, Supplier)}.
+     *
+     * @param observableRevisionUpdater A closure intended to connect this VersionedValue with a revision updater, that this VersionedValue
+     *                                  should be able to listen to, for receiving storage revision updates. This closure is called once on
+     *                                  a construction of this VersionedValue and accepts a {@code Consumer<Long>} that should be called
+     *                                  on every update of storage revision as a listener. IMPORTANT: Revision update shouldn't happen
+     *                                  concurrently with {@link #set(long, T)} operations.
+     */
+    public VersionedValue(Consumer<Consumer<Long>> observableRevisionUpdater) {
+        this(null, observableRevisionUpdater);
+    }
+
+    /**
+     * Creates a future for this value and causality token, or returns it if it already exists.
+     *
+     * <p>The returned future is associated with an update having the given causality token and completes when this update is finished
+     * applying.
+     *
+     * @param causalityToken Causality token. Let's assume that the update associated with token N is already applied to this value. Then,
+     *                       if token N is given as an argument, a completed future will be returned. If token N - 1 is given, this method
+     *                       returns the result in the state that is actual for the given token. If the token is strongly outdated, {@link
+     *                       OutdatedTokenException} is thrown. If token N + 1 is given, this method will return a future that will be
+     *                       completed when the update associated with token N + 1 will have been applied. Tokens that greater than N by
+     *                       more than 1 should never be passed.
+     * @return The future.
+     * @throws OutdatedTokenException If outdated token is passed as an argument.
+     */
+    public CompletableFuture<T> get(long causalityToken) throws OutdatedTokenException {
+        if (causalityToken <= actualToken) {
+            return getValueForPreviousToken(causalityToken);
+        }
+
+        trimHistoryLock.readLock().lock();
+
+        try {
+            if (causalityToken <= actualToken) {
+                return getValueForPreviousToken(causalityToken);
+            }
+
+            var fut = new CompletableFuture<T>();
+
+            CompletableFuture<T> previousFut = history.putIfAbsent(causalityToken, fut);
+
+            return previousFut == null ? fut : previousFut;
+        } finally {
+            trimHistoryLock.readLock().unlock();
+        }
+    }
+
+    /**
+     * Locks the given causality token to prevent its deletion from history of every {@link VersionedValue}
+     * that share the same {@link CausalityTokensLockManager} as this.
+     * This method works like read lock in read-write lock concept, allowing multiple {@link VersionedValue} to
+     * acquire lock on same token.
+     *
+     * @param causalityToken Causality token.
+     * @throws OutdatedTokenException If outdated token is passed as an argument.
+     */
+    public void lock(long causalityToken) throws OutdatedTokenException {
+
+    }
+
+    /**
+     * Locks the token and gets the value (see {@link #lock(long)} and {@link #get(long)}).
+     *
+     * @param causalityToken Causality token,
+     * @return The future, see {@link #get(long)}.
+     * @throws OutdatedTokenException If outdated token is passed as an argument.
+     */
+    public CompletableFuture<T> lockAndGet(long causalityToken) throws OutdatedTokenException {
+        return null;
+    }
+
+    /**
+     * Unlocks the token, previously locked using {@link #lock(long)}.
+     *
+     * @param causalityToken Causality token.
+     * @throws OutdatedTokenException If outdated token is passed as an argument.
+     */
+    public void unlock(long causalityToken) throws OutdatedTokenException {

Review comment:
       Remove all methods connected with token locking.

##########
File path: modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java
##########
@@ -323,6 +330,40 @@ public synchronized void deployWatches() throws NodeStoppingException {
         }
     }
 
+    /**
+     * Retrieves a current revision.
+     *
+     * @return Revision.
+     */
+    public CompletableFuture<Long> revision() {

Review comment:
       I think this method is not related to the issue.

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/causality/VersionedValue.java
##########
@@ -0,0 +1,428 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.causality;
+
+import static java.util.concurrent.CompletableFuture.completedFuture;
+
+import java.util.Map.Entry;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentNavigableMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.BiConsumer;
+import java.util.function.BiFunction;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import org.apache.ignite.lang.IgniteStringFormatter;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Parametrized type to store several versions of the value.
+ * A value can be available through the causality token, which is represented by long.
+ *
+ * @param <T> Type of real value.
+ */
+public class VersionedValue<T> {
+    /** Last applied casualty token. */
+    private volatile long actualToken = -1;
+
+    /** Size of stored history. */
+    private final int historySize;
+
+    /** Closure applied on storage revision update. */
+    private final BiConsumer<VersionedValue<T>, Long> storageRevisionUpdating;
+
+    /** Versioned value storage. */
+    private final ConcurrentNavigableMap<Long, CompletableFuture<T>> history = new ConcurrentSkipListMap<>();
+
+    /** Default value supplier. */
+    private final Supplier<T> defaultVal;
+
+    /**
+     * This lock guarantees that the history is not trimming {@link #trimToSize(long)} during getting a value from versioned storage {@link
+     * #get(long)}.
+     */
+    private final ReadWriteLock trimHistoryLock = new ReentrantReadWriteLock();
+
+    /**
+     * Constructor.
+     *
+     * @param storageRevisionUpdating    Closure applied on storage revision update (see {@link #onStorageRevisionUpdate(long)}).
+     * @param observableRevisionUpdater  A closure intended to connect this VersionedValue with a revision updater, that this VersionedValue
+     *                                   should be able to listen to, for receiving storage revision updates. This closure is called once on
+     *                                   a construction of this VersionedValue and accepts a {@code Consumer<Long>} that should be called
+     *                                   on every update of storage revision as a listener. IMPORTANT: Revision update shouldn't happen
+     *                                   concurrently with {@link #set(long, T)} operations.
+     * @param historySize                Size of the history of changes to store, including last applied token.
+     * @param defaultVal                 Supplier of the default value, that is used on {@link #update(long, BiFunction)} to evaluate
+     *                                   the default value if the value is not initialized yet.
+     */
+    public VersionedValue(
+            @Nullable BiConsumer<VersionedValue<T>, Long> storageRevisionUpdating,
+            Consumer<Consumer<Long>> observableRevisionUpdater,
+            int historySize,
+            Supplier<T> defaultVal
+    ) {
+        this.storageRevisionUpdating = storageRevisionUpdating;
+
+        observableRevisionUpdater.accept(this::onStorageRevisionUpdate);
+
+        this.historySize = historySize;
+
+        this.defaultVal = defaultVal;
+    }
+
+    /**
+     * Constructor with default history size that equals 2. See {@link #VersionedValue(BiConsumer, Consumer, int, Supplier)}.
+     *
+     * @param storageRevisionUpdating   Closure applied on storage revision update (see {@link #onStorageRevisionUpdate(long)}.
+     * @param observableRevisionUpdater A closure intended to connect this VersionedValue with a revision updater, that this VersionedValue
+     *                                  should be able to listen to, for receiving storage revision updates. This closure is called once on
+     *                                  a construction of this VersionedValue and accepts a {@code Consumer<Long>} that should be called
+     *                                  on every update of storage revision as a listener. IMPORTANT: Revision update shouldn't happen
+     *                                  concurrently with {@link #set(long, T)} operations.
+     */
+    public VersionedValue(
+            @Nullable BiConsumer<VersionedValue<T>, Long> storageRevisionUpdating,
+            Consumer<Consumer<Long>> observableRevisionUpdater
+    ) {
+        this(storageRevisionUpdating, observableRevisionUpdater, 2, null);
+    }
+
+    /**
+     * Constructor with default history size that equals 2 and no closure. See {@link #VersionedValue(BiConsumer, Consumer, int, Supplier)}.
+     *
+     * @param observableRevisionUpdater A closure intended to connect this VersionedValue with a revision updater, that this VersionedValue
+     *                                  should be able to listen to, for receiving storage revision updates. This closure is called once on
+     *                                  a construction of this VersionedValue and accepts a {@code Consumer<Long>} that should be called
+     *                                  on every update of storage revision as a listener. IMPORTANT: Revision update shouldn't happen
+     *                                  concurrently with {@link #set(long, T)} operations.
+     */
+    public VersionedValue(Consumer<Consumer<Long>> observableRevisionUpdater) {
+        this(null, observableRevisionUpdater);
+    }
+
+    /**
+     * Creates a future for this value and causality token, or returns it if it already exists.
+     *
+     * <p>The returned future is associated with an update having the given causality token and completes when this update is finished
+     * applying.
+     *
+     * @param causalityToken Causality token. Let's assume that the update associated with token N is already applied to this value. Then,
+     *                       if token N is given as an argument, a completed future will be returned. If token N - 1 is given, this method
+     *                       returns the result in the state that is actual for the given token. If the token is strongly outdated, {@link
+     *                       OutdatedTokenException} is thrown. If token N + 1 is given, this method will return a future that will be
+     *                       completed when the update associated with token N + 1 will have been applied. Tokens that greater than N by
+     *                       more than 1 should never be passed.
+     * @return The future.
+     * @throws OutdatedTokenException If outdated token is passed as an argument.
+     */
+    public CompletableFuture<T> get(long causalityToken) throws OutdatedTokenException {
+        if (causalityToken <= actualToken) {
+            return getValueForPreviousToken(causalityToken);
+        }
+
+        trimHistoryLock.readLock().lock();
+
+        try {
+            if (causalityToken <= actualToken) {
+                return getValueForPreviousToken(causalityToken);
+            }
+
+            var fut = new CompletableFuture<T>();
+
+            CompletableFuture<T> previousFut = history.putIfAbsent(causalityToken, fut);
+
+            return previousFut == null ? fut : previousFut;
+        } finally {
+            trimHistoryLock.readLock().unlock();
+        }
+    }
+
+    /**
+     * Locks the given causality token to prevent its deletion from history of every {@link VersionedValue}
+     * that share the same {@link CausalityTokensLockManager} as this.
+     * This method works like read lock in read-write lock concept, allowing multiple {@link VersionedValue} to
+     * acquire lock on same token.
+     *
+     * @param causalityToken Causality token.
+     * @throws OutdatedTokenException If outdated token is passed as an argument.
+     */
+    public void lock(long causalityToken) throws OutdatedTokenException {

Review comment:
       This method is not required for the task.

##########
File path: modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/SqlSchemaManagerImpl.java
##########
@@ -17,213 +17,376 @@
 
 package org.apache.ignite.internal.sql.engine.schema;
 
-import java.util.Comparator;
+import static java.util.Comparator.comparingInt;
+import static java.util.stream.Collectors.toList;
+
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.stream.Collectors;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
 import org.apache.calcite.schema.Schema;
 import org.apache.calcite.schema.SchemaPlus;
 import org.apache.calcite.schema.Table;
 import org.apache.calcite.schema.impl.AbstractSchema;
 import org.apache.calcite.tools.Frameworks;
+import org.apache.ignite.internal.causality.OutdatedTokenException;
+import org.apache.ignite.internal.causality.VersionedValue;
 import org.apache.ignite.internal.schema.Column;
 import org.apache.ignite.internal.schema.SchemaDescriptor;
 import org.apache.ignite.internal.sql.engine.extension.SqlExtension.ExternalCatalog;
 import org.apache.ignite.internal.sql.engine.extension.SqlExtension.ExternalSchema;
 import org.apache.ignite.internal.table.TableImpl;
-import org.apache.ignite.internal.table.distributed.TableManager;
+import org.apache.ignite.lang.IgniteException;
 import org.apache.ignite.lang.IgniteInternalException;
 import org.apache.ignite.lang.IgniteStringFormatter;
-import org.apache.ignite.lang.NodeStoppingException;
+import org.apache.ignite.lang.PatchedMapView;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 
 /**
  * Holds actual schema and mutates it on schema change, requested by Ignite.
  */
 public class SqlSchemaManagerImpl implements SqlSchemaManager {
-    private final Map<String, IgniteSchema> igniteSchemas = new HashMap<>();
+    private final VersionedValue<Map<String, IgniteSchema>> schemasVv;
 
-    private final Map<UUID, IgniteTable> tablesById = new ConcurrentHashMap<>();
+    private final VersionedValue<Map<UUID, IgniteTable>> tablesVv;
 
-    private final Map<String, Schema> externalCatalogs = new HashMap<>();
+    private final VersionedValue<Map<String, Schema>> externalCatalogsVv;
 
     private final Runnable onSchemaUpdatedCallback;
 
-    private final TableManager tableManager;
+    private final Consumer<Consumer<Long>> storageRevisionUpdater;
+
+    private final VersionedValue<SchemaPlus> calciteSchemaVv;
 
-    private volatile SchemaPlus calciteSchema;
+    private final Supplier<CompletableFuture<Long>> directMsRevision;
 
     /**
      * Constructor.
      * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
      */
-    public SqlSchemaManagerImpl(TableManager tableManager, Runnable onSchemaUpdatedCallback) {
+    public SqlSchemaManagerImpl(
+            Consumer<Consumer<Long>> revisionUpdater,
+            Runnable onSchemaUpdatedCallback,
+            Supplier<CompletableFuture<Long>> directMsRevision
+    ) {
         this.onSchemaUpdatedCallback = onSchemaUpdatedCallback;
-        this.tableManager = tableManager;
-
-        SchemaPlus newCalciteSchema = Frameworks.createRootSchema(false);
-        newCalciteSchema.add("PUBLIC", new IgniteSchema("PUBLIC"));
-        calciteSchema = newCalciteSchema;
+        this.storageRevisionUpdater = revisionUpdater;
+
+        schemasVv = new VersionedValue<>(null, storageRevisionUpdater, 2, HashMap::new);
+        tablesVv = new VersionedValue<>(null, storageRevisionUpdater, 2, HashMap::new);
+        externalCatalogsVv = new VersionedValue<>(null, storageRevisionUpdater, 2, HashMap::new);
+        this.directMsRevision = directMsRevision;
+
+        calciteSchemaVv = new VersionedValue<>(null, storageRevisionUpdater, 2, () -> {
+            SchemaPlus newCalciteSchema = Frameworks.createRootSchema(false);
+            newCalciteSchema.add("PUBLIC", new IgniteSchema("PUBLIC"));
+            return newCalciteSchema;
+        });
     }
 
     /** {@inheritDoc} */
     @Override
     public SchemaPlus schema(@Nullable String schema) {
-        return schema != null ? calciteSchema.getSubSchema(schema) : calciteSchema;
+        CompletableFuture<SchemaPlus> fut = directMsRevision.get().thenCompose(token -> {
+            try {
+                return calciteSchemaVv.get(token);
+            } catch (OutdatedTokenException e) {
+                throw new IgniteInternalException(e);
+            }
+        });
+
+        try {
+            return schema != null ? fut.get().getSubSchema(schema) : fut.get();
+        } catch (InterruptedException | ExecutionException e) {
+            throw new IgniteInternalException(e);
+        }
     }
 
     /** {@inheritDoc} */
     @Override
     @NotNull
     public IgniteTable tableById(UUID id) {
-        IgniteTable table = tablesById.get(id);
+        CompletableFuture<Map<UUID, IgniteTable>> fut = directMsRevision.get().thenCompose(token -> {
+            try {
+                return tablesVv.get(token);
+            } catch (OutdatedTokenException e) {
+                throw new IgniteInternalException(e);
+            }
+        });
 
-        // there is a chance that someone tries to resolve table before
-        // the distributed event of that table creation has been processed
-        // by TableManager, so we need to get in sync with the TableManager
-        if (table == null) {
-            ensureTableStructuresCreated(id);
+        IgniteTable table;
 
-            // at this point the table is either null means no such table
-            // really exists or the table itself
-            table = tablesById.get(id);
+        try {
+            table = fut.get().get(id);
+        } catch (InterruptedException | ExecutionException e) {
+            throw new IgniteException(e);
         }
 
         if (table == null) {
             throw new IgniteInternalException(
-                    IgniteStringFormatter.format("Table not found [tableId={}]", id));
+                IgniteStringFormatter.format("Table not found [tableId={}]", id));
         }
 
         return table;
     }
 
-    private void ensureTableStructuresCreated(UUID id) {
-        try {
-            tableManager.table(id);
-        } catch (NodeStoppingException e) {
-            // Discard the exception
-        }
-    }
-
     /**
      * Register an external catalog under given name.
      *
      * @param name Name of the external catalog.
      * @param catalog Catalog to register.
      */
-    public synchronized void registerExternalCatalog(String name, ExternalCatalog catalog) {
-        catalog.schemaNames().forEach(schemaName -> registerExternalSchema(name, schemaName, catalog.schema(schemaName)));
-
-        rebuild();
+    public void registerExternalCatalog(String name, ExternalCatalog catalog, long causalityToken) {
+        CompletableFuture.allOf(
+                catalog.schemaNames().stream()
+                    .map(schemaName -> registerExternalSchema(name, schemaName, catalog.schema(schemaName), causalityToken))
+                    .toArray(a -> new CompletableFuture[catalog.schemaNames().size()])
+        ).thenCompose(v -> {
+            try {
+                return rebuild(causalityToken);
+            } catch (OutdatedTokenException e) {
+                throw new IgniteInternalException(e);
+            }
+        });
     }
 
-    private void registerExternalSchema(String catalogName, String schemaName, ExternalSchema schema) {
-        Map<String, Table> tables = new HashMap<>();
-
-        for (String name : schema.tableNames()) {
-            IgniteTable table = schema.table(name);
-
-            tables.put(name, table);
-            tablesById.put(table.id(), table);
-        }
-
-        SchemaPlus schemaPlus = (SchemaPlus) externalCatalogs.computeIfAbsent(catalogName, n -> Frameworks.createRootSchema(false));
-
-        schemaPlus.add(schemaName, new ExternalSchemaHolder(tables));
+    private CompletableFuture<?> registerExternalSchema(String catalogName, String schemaName, ExternalSchema schema, long causalityToken) {
+        final Map<String, Table> tables = new HashMap<>();
+
+        return tablesVv
+                .update(causalityToken,
+                    tablesByIds -> {
+                        Map<UUID, IgniteTable> tempTables = PatchedMapView.of(tablesByIds, Integer.MAX_VALUE).map();
+
+                        for (String name : schema.tableNames()) {
+                            IgniteTable table = schema.table(name);
+
+                            tables.put(name, table);
+                            tempTables = PatchedMapView.of(tempTables).put(table.id(), table);
+                        }
+
+                        return PatchedMapView.of(tempTables).map();
+                    },
+                    e -> {
+                        throw new IgniteInternalException(e);
+                    }
+                )
+                .thenApply(t -> tables)
+                .thenCompose(t ->
+                    externalCatalogsVv.update(causalityToken,
+                        catalogs -> {
+                            Map<String, Schema> res = PatchedMapView.of(catalogs)
+                                    .computeIfAbsent(catalogName, n -> Frameworks.createRootSchema(false));
+
+                            SchemaPlus schemaPlus = (SchemaPlus) res.get(catalogName);
+                            schemaPlus.add(schemaName, new ExternalSchemaHolder(t));
+
+                            return res;
+                        },
+                        e -> {
+                            throw new IgniteInternalException(e);
+                        }
+                    )
+                );
     }
 
-    public synchronized void onSchemaCreated(String schemaName) {
-        igniteSchemas.putIfAbsent(schemaName, new IgniteSchema(schemaName));
-        rebuild();
+    /**
+     * Schema creation handler.
+     *
+     * @param schemaName Schema name.
+     * @param causalityToken Causality token.
+     */
+    public CompletableFuture<Void> onSchemaCreated(String schemaName, long causalityToken) {
+        return schemasVv
+                .update(
+                    causalityToken,
+                    schemas -> PatchedMapView.of(schemas).putIfAbsent(schemaName, new IgniteSchema(schemaName)),
+                    e -> {
+                        throw new IgniteInternalException(e);
+                    }
+                )
+                .thenCompose(s -> {
+                    try {
+                        return rebuild(causalityToken);
+                    } catch (OutdatedTokenException e) {
+                        throw new IgniteInternalException(e);
+                    }
+                });
     }
 
-    public synchronized void onSchemaDropped(String schemaName) {
-        igniteSchemas.remove(schemaName);
-        rebuild();
+    /**
+     * Schema drop handler.
+     *
+     * @param schemaName Schema name.
+     * @param causalityToken Causality token.
+     */
+    public CompletableFuture<Void> onSchemaDropped(String schemaName, long causalityToken) {
+        return schemasVv
+                .update(
+                    causalityToken,
+                    schemas -> PatchedMapView.of(schemas).remove(schemaName),
+                    e -> {
+                        throw new IgniteInternalException(e);
+                    }
+                )
+                .thenCompose(s -> {
+                    try {
+                        return rebuild(causalityToken);
+                    } catch (OutdatedTokenException e) {
+                        throw new IgniteInternalException(e);
+                    }
+                });
     }
 
     /**
      * OnSqlTypeCreated.
      * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
      */
-    public synchronized void onTableCreated(
+    public CompletableFuture<Void> onTableCreated(
             String schemaName,
-            TableImpl table
+            TableImpl table,
+            long causalityToken
     ) {
-        IgniteSchema schema = igniteSchemas.computeIfAbsent(schemaName, IgniteSchema::new);
-
-        SchemaDescriptor descriptor = table.schemaView().schema();
-
-        List<ColumnDescriptor> colDescriptors = descriptor.columnNames().stream()
-                .map(descriptor::column)
-                .sorted(Comparator.comparingInt(Column::columnOrder))
-                .map(col -> new ColumnDescriptorImpl(
-                        col.name(),
-                        descriptor.isKeyColumn(col.schemaIndex()),
-                        col.columnOrder(),
-                        col.schemaIndex(),
-                        col.type(),
-                        col::defaultValue
-                ))
-                .collect(Collectors.toList());
-
-        IgniteTableImpl igniteTable = new IgniteTableImpl(
-                new TableDescriptorImpl(colDescriptors),
-                table.internalTable(),
-                table.schemaView()
-        );
-
-        schema.addTable(removeSchema(schemaName, table.name()), igniteTable);
-        tablesById.put(igniteTable.id(), igniteTable);
-
-        rebuild();
+        final AtomicReference<IgniteTableImpl> tableRef = new AtomicReference<>();
+
+        return schemasVv
+                .update(
+                    causalityToken,
+                    schemas -> {
+                        IgniteSchema prevSchema = schemas.computeIfAbsent(schemaName, IgniteSchema::new);
+                        IgniteSchema schema = new IgniteSchema(prevSchema.getName(), prevSchema.getTableMap());
+
+                        SchemaDescriptor descriptor = table.schemaView().schema();
+
+                        List<ColumnDescriptor> colDescriptors = descriptor.columnNames().stream()
+                                .map(descriptor::column)
+                                .sorted(comparingInt(Column::columnOrder))
+                                .map(col -> new ColumnDescriptorImpl(
+                                    col.name(),
+                                    descriptor.isKeyColumn(col.schemaIndex()),
+                                    col.columnOrder(),
+                                    col.schemaIndex(),
+                                    col.type(),
+                                    col::defaultValue
+                                ))
+                                .collect(toList());
+
+                        IgniteTableImpl igniteTable = new IgniteTableImpl(
+                                new TableDescriptorImpl(colDescriptors),
+                                table.internalTable(),
+                                table.schemaView()
+                        );
+
+                        schema.addTable(removeSchema(schemaName, table.name()), igniteTable);
+
+                        tableRef.set(igniteTable);
+
+                        return PatchedMapView.of(schemas).put(schemaName, schema);
+                    },
+                    e -> {
+                        throw new IgniteInternalException(e);
+                    }
+                )
+                .thenApply(s -> tableRef.get())
+                .thenCompose(igniteTable ->
+                    tablesVv
+                        .update(
+                            causalityToken,
+                            tables -> PatchedMapView.of(tables).put(igniteTable.id(), igniteTable),
+                            e -> {
+                                throw new IgniteInternalException(e);
+                            }
+                        )
+                )
+                .thenCompose(t -> {
+                    try {
+                        return rebuild(causalityToken);
+                    } catch (OutdatedTokenException e) {
+                        throw new IgniteInternalException(e);
+                    }
+                });
     }
 
     /**
      * OnSqlTypeUpdated.
      * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
      */
-    public void onTableUpdated(
+    public CompletableFuture<Void> onTableUpdated(
             String schemaName,
-            TableImpl table
+            TableImpl table,
+            long causalityToken
     ) {
-        onTableCreated(schemaName, table);
+        return onTableCreated(schemaName, table, causalityToken);
     }
 
     /**
      * OnSqlTypeDropped.
      * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
      */
-    public synchronized void onTableDropped(
+    public CompletableFuture<Void> onTableDropped(
             String schemaName,
-            String tableName
+            String tableName,
+            long causalityToken
     ) {
-        IgniteSchema schema = igniteSchemas.computeIfAbsent(schemaName, IgniteSchema::new);
-
-        InternalIgniteTable table = (InternalIgniteTable) schema.getTable(tableName);
-        if (table != null) {
-            tablesById.remove(table.id());
-            schema.removeTable(tableName);
-        }
-
-        rebuild();
+        final AtomicReference<InternalIgniteTable> tableRef = new AtomicReference<>();
+
+        return schemasVv
+                .update(causalityToken,
+                    schemas -> {
+                        IgniteSchema prevSchema = schemas.computeIfAbsent(schemaName, IgniteSchema::new);
+                        IgniteSchema schema = new IgniteSchema(prevSchema.getName(), prevSchema.getTableMap());
+
+                        InternalIgniteTable table = (InternalIgniteTable) schema.getTable(tableName);
+
+                        if (table != null) {
+                            schema.removeTable(tableName);
+                        }
+
+                        tableRef.set(table);
+
+                        return PatchedMapView.of(schemas).put(schemaName, schema);
+                    },
+                    e -> {
+                        throw new IgniteInternalException(e);
+                    }
+                )
+                .thenApply(s -> tableRef.get())
+                .thenCompose(table ->
+                    tablesVv.update(causalityToken,
+                        tables -> PatchedMapView.of(tables).remove(table.id()),
+                        e -> {
+                            throw new IgniteInternalException(e);
+                        }
+                    )
+                )
+                .thenCompose(t -> {
+                    try {
+                        return rebuild(causalityToken);
+                    } catch (OutdatedTokenException e) {
+                        throw new IgniteInternalException(e);
+                    }
+                });
     }
 
-    private void rebuild() {
+    private CompletableFuture<Void> rebuild(long causalityToken) throws OutdatedTokenException {
         SchemaPlus newCalciteSchema = Frameworks.createRootSchema(false);
 
         newCalciteSchema.add("PUBLIC", new IgniteSchema("PUBLIC"));
 
-        igniteSchemas.forEach(newCalciteSchema::add);
-        externalCatalogs.forEach(newCalciteSchema::add);
-
-        calciteSchema = newCalciteSchema;
+        return CompletableFuture.allOf(
+            schemasVv.get(causalityToken).thenAccept(schemas -> schemas.forEach(newCalciteSchema::add)),
+            externalCatalogsVv.get(causalityToken).thenAccept(catalogs -> catalogs.forEach(newCalciteSchema::add))

Review comment:
       I think the previous token (the one preceding causalityToken) is required here.

##########
File path: modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java
##########
@@ -323,6 +330,40 @@ public synchronized void deployWatches() throws NodeStoppingException {
         }
     }
 
+    /**
+     * Retrieves a current revision.
+     *
+     * @return Revision.
+     */
+    public CompletableFuture<Long> revision() {
+        if (!busyLock.enterBusy()) {
+            return CompletableFuture.failedFuture(new NodeStoppingException());
+        }
+
+        try {
+            return metaStorageSvcFut.thenCompose(svc -> svc.revision());
+        } finally {
+            busyLock.leaveBusy();
+        }
+    }
+
+    /**
+     * Retrieves the earliest available revision.
+     *
+     * @return Revision.
+     */
+    public CompletableFuture<Long> earliestRevision() {

Review comment:
       I don't think this method is related to this issue.

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/causality/VersionedValue.java
##########
@@ -0,0 +1,428 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.causality;
+
+import static java.util.concurrent.CompletableFuture.completedFuture;
+
+import java.util.Map.Entry;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentNavigableMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.BiConsumer;
+import java.util.function.BiFunction;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import org.apache.ignite.lang.IgniteStringFormatter;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Parametrized type to store several versions of the value.
+ * A value can be available through the causality token, which is represented by long.
+ *
+ * @param <T> Type of real value.
+ */
+public class VersionedValue<T> {
+    /** Last applied casualty token. */
+    private volatile long actualToken = -1;
+
+    /** Size of stored history. */
+    private final int historySize;
+
+    /** Closure applied on storage revision update. */
+    private final BiConsumer<VersionedValue<T>, Long> storageRevisionUpdating;
+
+    /** Versioned value storage. */
+    private final ConcurrentNavigableMap<Long, CompletableFuture<T>> history = new ConcurrentSkipListMap<>();
+
+    /** Default value supplier. */
+    private final Supplier<T> defaultVal;
+
+    /**
+     * This lock guarantees that the history is not trimming {@link #trimToSize(long)} during getting a value from versioned storage {@link
+     * #get(long)}.
+     */
+    private final ReadWriteLock trimHistoryLock = new ReentrantReadWriteLock();
+
+    /**
+     * Constructor.
+     *
+     * @param storageRevisionUpdating    Closure applied on storage revision update (see {@link #onStorageRevisionUpdate(long)}).
+     * @param observableRevisionUpdater  A closure intended to connect this VersionedValue with a revision updater, that this VersionedValue
+     *                                   should be able to listen to, for receiving storage revision updates. This closure is called once on
+     *                                   a construction of this VersionedValue and accepts a {@code Consumer<Long>} that should be called
+     *                                   on every update of storage revision as a listener. IMPORTANT: Revision update shouldn't happen
+     *                                   concurrently with {@link #set(long, T)} operations.
+     * @param historySize                Size of the history of changes to store, including last applied token.
+     * @param defaultVal                 Supplier of the default value, that is used on {@link #update(long, BiFunction)} to evaluate
+     *                                   the default value if the value is not initialized yet.
+     */
+    public VersionedValue(
+            @Nullable BiConsumer<VersionedValue<T>, Long> storageRevisionUpdating,
+            Consumer<Consumer<Long>> observableRevisionUpdater,
+            int historySize,
+            Supplier<T> defaultVal
+    ) {
+        this.storageRevisionUpdating = storageRevisionUpdating;
+
+        observableRevisionUpdater.accept(this::onStorageRevisionUpdate);
+
+        this.historySize = historySize;
+
+        this.defaultVal = defaultVal;
+    }
+
+    /**
+     * Constructor with default history size that equals 2. See {@link #VersionedValue(BiConsumer, Consumer, int, Supplier)}.
+     *
+     * @param storageRevisionUpdating   Closure applied on storage revision update (see {@link #onStorageRevisionUpdate(long)}.
+     * @param observableRevisionUpdater A closure intended to connect this VersionedValue with a revision updater, that this VersionedValue
+     *                                  should be able to listen to, for receiving storage revision updates. This closure is called once on
+     *                                  a construction of this VersionedValue and accepts a {@code Consumer<Long>} that should be called
+     *                                  on every update of storage revision as a listener. IMPORTANT: Revision update shouldn't happen
+     *                                  concurrently with {@link #set(long, T)} operations.
+     */
+    public VersionedValue(
+            @Nullable BiConsumer<VersionedValue<T>, Long> storageRevisionUpdating,
+            Consumer<Consumer<Long>> observableRevisionUpdater
+    ) {
+        this(storageRevisionUpdating, observableRevisionUpdater, 2, null);
+    }
+
+    /**
+     * Constructor with default history size that equals 2 and no closure. See {@link #VersionedValue(BiConsumer, Consumer, int, Supplier)}.
+     *
+     * @param observableRevisionUpdater A closure intended to connect this VersionedValue with a revision updater, that this VersionedValue
+     *                                  should be able to listen to, for receiving storage revision updates. This closure is called once on
+     *                                  a construction of this VersionedValue and accepts a {@code Consumer<Long>} that should be called
+     *                                  on every update of storage revision as a listener. IMPORTANT: Revision update shouldn't happen
+     *                                  concurrently with {@link #set(long, T)} operations.
+     */
+    public VersionedValue(Consumer<Consumer<Long>> observableRevisionUpdater) {
+        this(null, observableRevisionUpdater);
+    }
+
+    /**
+     * Creates a future for this value and causality token, or returns it if it already exists.
+     *
+     * <p>The returned future is associated with an update having the given causality token and completes when this update is finished
+     * applying.
+     *
+     * @param causalityToken Causality token. Let's assume that the update associated with token N is already applied to this value. Then,
+     *                       if token N is given as an argument, a completed future will be returned. If token N - 1 is given, this method
+     *                       returns the result in the state that is actual for the given token. If the token is strongly outdated, {@link
+     *                       OutdatedTokenException} is thrown. If token N + 1 is given, this method will return a future that will be
+     *                       completed when the update associated with token N + 1 will have been applied. Tokens that greater than N by
+     *                       more than 1 should never be passed.
+     * @return The future.
+     * @throws OutdatedTokenException If outdated token is passed as an argument.
+     */
+    public CompletableFuture<T> get(long causalityToken) throws OutdatedTokenException {
+        if (causalityToken <= actualToken) {
+            return getValueForPreviousToken(causalityToken);
+        }
+
+        trimHistoryLock.readLock().lock();
+
+        try {
+            if (causalityToken <= actualToken) {
+                return getValueForPreviousToken(causalityToken);
+            }
+
+            var fut = new CompletableFuture<T>();
+
+            CompletableFuture<T> previousFut = history.putIfAbsent(causalityToken, fut);
+
+            return previousFut == null ? fut : previousFut;
+        } finally {
+            trimHistoryLock.readLock().unlock();
+        }
+    }
+
+    /**
+     * Locks the given causality token to prevent its deletion from history of every {@link VersionedValue}
+     * that share the same {@link CausalityTokensLockManager} as this.
+     * This method works like read lock in read-write lock concept, allowing multiple {@link VersionedValue} to
+     * acquire lock on same token.
+     *
+     * @param causalityToken Causality token.
+     * @throws OutdatedTokenException If outdated token is passed as an argument.
+     */
+    public void lock(long causalityToken) throws OutdatedTokenException {
+
+    }
+
+    /**
+     * Locks the token and gets the value (see {@link #lock(long)} and {@link #get(long)}).
+     *
+     * @param causalityToken Causality token,
+     * @return The future, see {@link #get(long)}.
+     * @throws OutdatedTokenException If outdated token is passed as an argument.
+     */
+    public CompletableFuture<T> lockAndGet(long causalityToken) throws OutdatedTokenException {

Review comment:
       Remove all methods connected with token locking.

##########
File path: modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/IgniteSchema.java
##########
@@ -29,15 +29,25 @@
 public class IgniteSchema extends AbstractSchema {
     private final String schemaName;
 
-    private final Map<String, InternalIgniteTable> tblMap = new ConcurrentHashMap<>();
+    private final Map<String, Table> tblMap;

Review comment:
       Why is Calcite's Table better than our own abstraction (InternalIgniteTable)?

##########
File path: modules/core/src/main/java/org/apache/ignite/lang/PatchedMapView.java
##########
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.lang;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.Function;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * This map view is constructed from a map and a single change applied to it, like put, remove or clear.
+ * It is not thread-safe.
+ */
+public class PatchedMapView<K, V> implements Map<K, V> {

Review comment:
       Why is this structure really needed?
   If it is valuable, I recommend moving it to a separate issue.

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/causality/CausalityTokensLockManager.java
##########
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.causality;
+
+/**
+ * Lock manager that manages locks on causality tokens between multiple {@code VersionedValues}.
+ * When token is locked using some {@link VersionedValue} (see {@link VersionedValue#lock(long)}),
+ * it will be locked on every {@link VersionedValue} that shares the same {@link CausalityTokensLockManager}.
+ */
+// TODO https://issues.apache.org/jira/browse/IGNITE-16544
+public class CausalityTokensLockManager {

Review comment:
       We still don't have a scenario for locking a token.
   I recommend removing it from the PR.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org