You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by am...@apache.org on 2023/08/30 15:01:41 UTC

[ignite-3] 01/01: Use sync service instead of configuration.

This is an automated email from the ASF dual-hosted git repository.

amashenkov pushed a commit to branch ignite-19768
in repository https://gitbox.apache.org/repos/asf/ignite-3.git

commit 877b4ed8b842e81a863b9eb1eecd59b2499b554d
Author: amashenkov <an...@gmail.com>
AuthorDate: Thu Aug 24 18:48:29 2023 +0300

    Use sync service instead of configuration.
---
 .../table/distributed/ConfiguredTablesCache.java   | 130 -----------------
 .../internal/table/distributed/TableManager.java   | 154 ++++++++-------------
 .../distributed/ConfiguredTablesCacheTest.java     |  85 ------------
 3 files changed, 56 insertions(+), 313 deletions(-)

diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/ConfiguredTablesCache.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/ConfiguredTablesCache.java
deleted file mode 100644
index c1b0a6e2c9..0000000000
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/ConfiguredTablesCache.java
+++ /dev/null
@@ -1,130 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.table.distributed;
-
-import it.unimi.dsi.fastutil.ints.IntRBTreeSet;
-import it.unimi.dsi.fastutil.ints.IntSet;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.function.IntFunction;
-import org.apache.ignite.internal.schema.configuration.TableView;
-import org.apache.ignite.internal.schema.configuration.TablesConfiguration;
-import org.apache.ignite.internal.schema.configuration.TablesView;
-
-/**
- * Caches IDs of the tables that are currently configured in a way consistent with creation/removal
- * of tables: that is, if the set of configured table IDs changes externally (in the configuration),
- * the cache will not return a stale result.
- */
-// TODO: IGNITE-19226 - remove this
-class ConfiguredTablesCache {
-    private static final int NO_GENERATION = Integer.MIN_VALUE;
-
-    private final TablesConfiguration tablesConfig;
-    private final boolean getMetadataLocallyOnly;
-
-    private int cachedGeneration = NO_GENERATION;
-    private final IntSet configuredTableIds = new IntRBTreeSet();
-
-    private final ReadWriteLock lock = new ReentrantReadWriteLock();
-
-    private final IntFunction<Boolean> isTableConfigured = configuredTableIds::contains;
-
-    private final IntFunction<List<Integer>> getConfiguredTableIds = unused -> new ArrayList<>(configuredTableIds);
-
-    ConfiguredTablesCache(TablesConfiguration tablesConfig, boolean getMetadataLocallyOnly) {
-        this.tablesConfig = tablesConfig;
-        this.getMetadataLocallyOnly = getMetadataLocallyOnly;
-    }
-
-    /**
-     * Returns whether the table is present in the configuration.
-     *
-     * @param tableId ID of the table.
-     * @return Whether the table is present in the configuration.
-     */
-    public boolean isTableConfigured(int tableId) {
-        return getConsistently(tableId, isTableConfigured);
-    }
-
-    /**
-     * Returns all configured table IDs.
-     *
-     * @return All configured table IDs.
-     */
-    public List<Integer> configuredTableIds() {
-        return getConsistently(0, getConfiguredTableIds);
-    }
-
-    private <T> T getConsistently(int intArg, IntFunction<T> getter) {
-        int currentGeneration = getCurrentGeneration();
-
-        lock.readLock().lock();
-
-        try {
-            if (cachedGenerationMatches(currentGeneration)) {
-                return getter.apply(intArg);
-            }
-        } finally {
-            lock.readLock().unlock();
-        }
-
-        lock.writeLock().lock();
-
-        try {
-            // Check again.
-            currentGeneration = getCurrentGeneration();
-
-            if (cachedGenerationMatches(currentGeneration)) {
-                return getter.apply(intArg);
-            }
-
-            refillCache();
-
-            return getter.apply(intArg);
-        } finally {
-            lock.writeLock().unlock();
-        }
-    }
-
-    private Integer getCurrentGeneration() {
-        return tablesConfigOrDirectProxy().tablesGeneration().value();
-    }
-
-    private TablesConfiguration tablesConfigOrDirectProxy() {
-        return getMetadataLocallyOnly ? tablesConfig : tablesConfig.directProxy();
-    }
-
-    private boolean cachedGenerationMatches(int currentGeneration) {
-        return cachedGeneration != NO_GENERATION && cachedGeneration == currentGeneration;
-    }
-
-    private void refillCache() {
-        TablesView tablesView = tablesConfigOrDirectProxy().value();
-
-        configuredTableIds.clear();
-
-        for (TableView tableView : tablesView.tables()) {
-            configuredTableIds.add(tableView.id());
-        }
-
-        cachedGeneration = tablesView.tablesGeneration();
-    }
-}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index 370dd0f725..8ccad0fb7f 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -23,7 +23,6 @@ import static java.util.concurrent.CompletableFuture.allOf;
 import static java.util.concurrent.CompletableFuture.completedFuture;
 import static java.util.concurrent.CompletableFuture.failedFuture;
 import static java.util.concurrent.CompletableFuture.runAsync;
-import static java.util.concurrent.CompletableFuture.supplyAsync;
 import static java.util.stream.Collectors.toList;
 import static org.apache.ignite.internal.causality.IncrementalVersionedValue.dependingOn;
 import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.getZoneById;
@@ -82,7 +81,6 @@ import java.util.function.Supplier;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import org.apache.ignite.configuration.ConfigurationChangeException;
-import org.apache.ignite.configuration.ConfigurationProperty;
 import org.apache.ignite.configuration.notifications.ConfigurationNamedListListener;
 import org.apache.ignite.configuration.notifications.ConfigurationNotificationEvent;
 import org.apache.ignite.internal.affinity.AffinityUtils;
@@ -190,7 +188,6 @@ import org.apache.ignite.lang.ByteArray;
 import org.apache.ignite.lang.IgniteException;
 import org.apache.ignite.lang.IgniteInternalException;
 import org.apache.ignite.lang.IgniteStringFormatter;
-import org.apache.ignite.lang.IgniteSystemProperties;
 import org.apache.ignite.lang.NodeStoppingException;
 import org.apache.ignite.lang.TableAlreadyExistsException;
 import org.apache.ignite.lang.TableNotFoundException;
@@ -223,14 +220,6 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
     private static final int TX_STATE_STORAGE_FLUSH_DELAY = 1000;
     private static final IntSupplier TX_STATE_STORAGE_FLUSH_DELAY_SUPPLIER = () -> TX_STATE_STORAGE_FLUSH_DELAY;
 
-    /**
-     * If this property is set to {@code true} then an attempt to get the configuration property directly from Meta storage will be skipped,
-     * and the local property will be returned.
-     * TODO: IGNITE-16774 This property and overall approach, access configuration directly through Meta storage,
-     * TODO: will be removed after fix of the issue.
-     */
-    private final boolean getMetadataLocallyOnly = IgniteSystemProperties.getBoolean("IGNITE_GET_METADATA_LOCALLY_ONLY");
-
     /** Tables configuration. */
     private final TablesConfiguration tablesCfg;
 
@@ -379,8 +368,6 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
 
     private final IndexBuilder indexBuilder;
 
-    private final ConfiguredTablesCache configuredTablesCache;
-
     private final Marshaller raftCommandsMarshaller;
 
     /**
@@ -509,8 +496,6 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
 
         indexBuilder = new IndexBuilder(nodeName, cpus);
 
-        configuredTablesCache = new ConfiguredTablesCache(tablesCfg, getMetadataLocallyOnly);
-
         raftCommandsMarshaller = new ThreadLocalPartitionCommandsMarshaller(clusterService.serializationRegistry());
     }
 
@@ -1832,51 +1817,19 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
      * @return Future representing pending completion of the operation.
      */
     private CompletableFuture<List<Table>> tablesAsyncInternal() {
-        return supplyAsync(() -> inBusyLock(busyLock, this::directTableIds), ioExecutor)
-                .thenCompose(tableIds -> inBusyLock(busyLock, () -> {
-                    var tableFuts = new CompletableFuture[tableIds.size()];
-
-                    var i = 0;
-
-                    for (int tblId : tableIds) {
-                        tableFuts[i++] = tableAsyncInternal(tblId, false);
-                    }
-
-                    return allOf(tableFuts).thenApply(unused -> inBusyLock(busyLock, () -> {
-                        var tables = new ArrayList<Table>(tableIds.size());
-
-                        for (var fut : tableFuts) {
-                            var table = fut.join();
-
-                            if (table != null) {
-                                tables.add((Table) table);
-                            }
-                        }
-
-                        return tables;
-                    }));
-                }));
+        return schemaSyncService.waitForMetadataCompleteness(clock.now())
+                .thenApply(ignore -> List.copyOf(latestTablesById().values()));
     }
 
     /**
-     * Collects a list of direct table ids.
+     * Return actual table id by given name or {@code null} if table doesn't exist.
      *
-     * @return A list of direct table ids.
+     * @param tableName Table name.
+     * @return Table id or {@code null} if not found.
      */
-    private List<Integer> directTableIds() {
-        return configuredTablesCache.configuredTableIds();
-    }
-
-    /**
-     * Gets direct id of table with {@code tblName}.
-     *
-     * @param tblName Name of the table.
-     * @return Direct id of the table, or {@code null} if the table with the {@code tblName} has not been found.
-     */
-    @Nullable
-    private Integer directTableId(String tblName) {
+    private @Nullable Integer tableNameToId(String tableName) {
         try {
-            TableConfiguration exTblCfg = directProxy(tablesCfg.tables()).get(tblName);
+            TableConfiguration exTblCfg = tablesCfg.tables().get(tableName);
 
             if (exTblCfg == null) {
                 return null;
@@ -1973,7 +1926,7 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
             throw new IgniteException(new NodeStoppingException());
         }
         try {
-            return tableAsyncInternal(id, true);
+            return tableAsyncInternal(id);
         } finally {
             busyLock.leaveBusy();
         }
@@ -2012,44 +1965,70 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
     /**
      * Gets a table by name, if it was created before. Doesn't parse canonical name.
      *
-     * @param name Table name.
+     * @param tableName Table name.
      * @return Future representing pending completion of the {@code TableManager#tableAsyncInternal} operation.
      */
-    public CompletableFuture<TableImpl> tableAsyncInternal(String name) {
+    public CompletableFuture<TableImpl> tableAsyncInternal(String tableName) {
         if (!busyLock.enterBusy()) {
             throw new IgniteException(new NodeStoppingException());
         }
 
         try {
-            return supplyAsync(() -> inBusyLock(busyLock, () -> directTableId(name)), ioExecutor)
-                    .thenCompose(tableId -> inBusyLock(busyLock, () -> {
+            HybridTimestamp now = clock.now();
+
+            return schemaSyncService.waitForMetadataCompleteness(now)
+                    .thenComposeAsync(ignore -> {
+                        Integer tableId = tableNameToId(tableName);
+
                         if (tableId == null) {
                             return completedFuture(null);
                         }
 
-                        return tableAsyncInternal(tableId, false);
-                    }));
+                        // Table with given name found. Wait for table initialization.
+                        return tableReadyFuture(tableId);
+                    }, ioExecutor);
         } finally {
             busyLock.leaveBusy();
         }
     }
 
     /**
-     * Internal method for getting table by id.
+     * Gets a table by id, if it was created before.
+     *
+     * @param tableId Table id.
+     * @return Future representing pending completion of the {@code TableManager#tableAsyncInternal} operation.
+     */
+    public CompletableFuture<TableImpl> tableAsyncInternal(int tableId) {
+        HybridTimestamp now = clock.now();
+
+        return schemaSyncService.waitForMetadataCompleteness(now)
+                .thenComposeAsync(ignore -> {
+                    // Here we are sure metadata is actual.
+                    if (catalogService.table(tableId, now.longValue()) == null) {
+                        return completedFuture(null);
+                    }
+
+                    // But we may need to wait for table initialization.
+                    return tableReadyFuture(tableId);
+                }, ioExecutor);
+    }
+
+    /**
+     * Internal method for getting table by id, without awaiting for actual metadata.
      *
      * @param id Table id.
-     * @param checkConfiguration {@code True} when the method checks a configuration before trying to get a table, {@code false}
-     *         otherwise.
      * @return Future representing pending completion of the operation.
      */
-    public CompletableFuture<TableImpl> tableAsyncInternal(int id, boolean checkConfiguration) {
-        CompletableFuture<Boolean> tblCfgFut = checkConfiguration
-                ? supplyAsync(() -> inBusyLock(busyLock, () -> isTableConfigured(id)), ioExecutor)
-                : completedFuture(true);
+    private CompletableFuture<TableImpl> tableReadyFuture(int id) {
+        if (!busyLock.enterBusy()) {
+            throw new IgniteException(new NodeStoppingException());
+        }
 
-        return tblCfgFut.thenCompose(isCfg -> inBusyLock(busyLock, () -> {
-            if (!isCfg) {
-                return completedFuture(null);
+        try {
+            TableImpl table = tablesByIdVv.latest().get(id);
+
+            if (table != null) {
+                return completedFuture(table);
             }
 
             TableImpl tbl = latestTablesById().get(id);
@@ -2068,10 +2047,10 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
                         if (e != null) {
                             getTblFut.completeExceptionally(e);
                         } else {
-                            TableImpl table = tables.get(id);
+                            TableImpl table0 = tables.get(id);
 
-                            if (table != null) {
-                                getTblFut.complete(table);
+                            if (table0 != null) {
+                                getTblFut.complete(table0);
                             }
                         }
                     });
@@ -2093,17 +2072,9 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
             }
 
             return getTblFut.whenComplete((unused, throwable) -> assignmentsUpdatedVv.removeWhenComplete(tablesListener));
-        }));
-    }
-
-    /**
-     * Checks that the table is configured with specific id.
-     *
-     * @param id Table id.
-     * @return True when the table is configured into cluster, false otherwise.
-     */
-    private boolean isTableConfigured(int id) {
-        return configuredTablesCache.isTableConfigured(id);
+        } finally {
+            busyLock.leaveBusy();
+        }
     }
 
     /**
@@ -2475,19 +2446,6 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
         return new PartitionMover(busyLock, () -> internalTable.partitionRaftGroupService(partId));
     }
 
-    /**
-     * Gets a direct accessor for the configuration distributed property. If the metadata access only locally configured the method will
-     * return local property accessor.
-     *
-     * @param property Distributed configuration property to receive direct access.
-     * @param <T> Type of the property accessor.
-     * @return An accessor for distributive property.
-     * @see #getMetadataLocallyOnly
-     */
-    private <T extends ConfigurationProperty<?>> T directProxy(T property) {
-        return getMetadataLocallyOnly ? property : (T) property.directProxy();
-    }
-
     private static PeersAndLearners configurationFromAssignments(Collection<Assignment> assignments) {
         var peers = new HashSet<String>();
         var learners = new HashSet<String>();
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/ConfiguredTablesCacheTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/ConfiguredTablesCacheTest.java
deleted file mode 100644
index 65e8d66f58..0000000000
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/ConfiguredTablesCacheTest.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.table.distributed;
-
-import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.contains;
-import static org.hamcrest.Matchers.empty;
-import static org.hamcrest.Matchers.is;
-import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-
-import java.util.concurrent.CompletableFuture;
-import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
-import org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
-import org.apache.ignite.internal.schema.configuration.TablesConfiguration;
-import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.extension.ExtendWith;
-
-@ExtendWith(ConfigurationExtension.class)
-class ConfiguredTablesCacheTest extends BaseIgniteAbstractTest {
-    @InjectConfiguration
-    private TablesConfiguration tablesConfig;
-
-    private ConfiguredTablesCache cache;
-
-    @BeforeEach
-    void createCache() {
-        cache = new ConfiguredTablesCache(tablesConfig, false);
-    }
-
-    @Test
-    void tableIdCheckIsCoherentWithConfigGenerationUpdate() {
-        createTable(1, "t1");
-
-        assertTrue(cache.isTableConfigured(1));
-        assertFalse(cache.isTableConfigured(2));
-
-        createTable(2, "t2");
-
-        assertTrue(cache.isTableConfigured(2));
-    }
-
-    private void createTable(int tableId, String tableName) {
-        CompletableFuture<Void> future = tablesConfig.change(tablesChange -> {
-            tablesChange.changeTables(ch -> ch.create(tableName, tableChange -> {
-                tableChange.changeId(tableId);
-            }));
-
-            tablesChange.changeTablesGeneration(tablesChange.tablesGeneration() + 1);
-        });
-
-        assertThat(future, willCompleteSuccessfully());
-    }
-
-    @Test
-    void tableIdsIsCoherentWithConfigGenerationUpdate() {
-        assertThat(cache.configuredTableIds(), is(empty()));
-
-        createTable(1, "t1");
-
-        assertThat(cache.configuredTableIds(), contains(1));
-
-        createTable(2, "t2");
-
-        assertThat(cache.configuredTableIds(), contains(1, 2));
-    }
-}