You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by am...@apache.org on 2023/08/30 15:01:41 UTC
[ignite-3] 01/01: Use sync service instead of configuration.
This is an automated email from the ASF dual-hosted git repository.
amashenkov pushed a commit to branch ignite-19768
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
commit 877b4ed8b842e81a863b9eb1eecd59b2499b554d
Author: amashenkov <an...@gmail.com>
AuthorDate: Thu Aug 24 18:48:29 2023 +0300
Use sync service instead of configuration.
---
.../table/distributed/ConfiguredTablesCache.java | 130 -----------------
.../internal/table/distributed/TableManager.java | 154 ++++++++-------------
.../distributed/ConfiguredTablesCacheTest.java | 85 ------------
3 files changed, 56 insertions(+), 313 deletions(-)
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/ConfiguredTablesCache.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/ConfiguredTablesCache.java
deleted file mode 100644
index c1b0a6e2c9..0000000000
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/ConfiguredTablesCache.java
+++ /dev/null
@@ -1,130 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.table.distributed;
-
-import it.unimi.dsi.fastutil.ints.IntRBTreeSet;
-import it.unimi.dsi.fastutil.ints.IntSet;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.function.IntFunction;
-import org.apache.ignite.internal.schema.configuration.TableView;
-import org.apache.ignite.internal.schema.configuration.TablesConfiguration;
-import org.apache.ignite.internal.schema.configuration.TablesView;
-
-/**
- * Caches IDs of the tables that are currently configured in a way consistent with creation/removal
- * of tables: that is, if the set of configured table IDs changes externally (in the configuration),
- * the cache will not return a stale result.
- */
-// TODO: IGNITE-19226 - remove this
-class ConfiguredTablesCache {
- private static final int NO_GENERATION = Integer.MIN_VALUE;
-
- private final TablesConfiguration tablesConfig;
- private final boolean getMetadataLocallyOnly;
-
- private int cachedGeneration = NO_GENERATION;
- private final IntSet configuredTableIds = new IntRBTreeSet();
-
- private final ReadWriteLock lock = new ReentrantReadWriteLock();
-
- private final IntFunction<Boolean> isTableConfigured = configuredTableIds::contains;
-
- private final IntFunction<List<Integer>> getConfiguredTableIds = unused -> new ArrayList<>(configuredTableIds);
-
- ConfiguredTablesCache(TablesConfiguration tablesConfig, boolean getMetadataLocallyOnly) {
- this.tablesConfig = tablesConfig;
- this.getMetadataLocallyOnly = getMetadataLocallyOnly;
- }
-
- /**
- * Returns whether the table is present in the configuration.
- *
- * @param tableId ID of the table.
- * @return Whether the table is present in the configuration.
- */
- public boolean isTableConfigured(int tableId) {
- return getConsistently(tableId, isTableConfigured);
- }
-
- /**
- * Returns all configured table IDs.
- *
- * @return All configured table IDs.
- */
- public List<Integer> configuredTableIds() {
- return getConsistently(0, getConfiguredTableIds);
- }
-
- private <T> T getConsistently(int intArg, IntFunction<T> getter) {
- int currentGeneration = getCurrentGeneration();
-
- lock.readLock().lock();
-
- try {
- if (cachedGenerationMatches(currentGeneration)) {
- return getter.apply(intArg);
- }
- } finally {
- lock.readLock().unlock();
- }
-
- lock.writeLock().lock();
-
- try {
- // Check again.
- currentGeneration = getCurrentGeneration();
-
- if (cachedGenerationMatches(currentGeneration)) {
- return getter.apply(intArg);
- }
-
- refillCache();
-
- return getter.apply(intArg);
- } finally {
- lock.writeLock().unlock();
- }
- }
-
- private Integer getCurrentGeneration() {
- return tablesConfigOrDirectProxy().tablesGeneration().value();
- }
-
- private TablesConfiguration tablesConfigOrDirectProxy() {
- return getMetadataLocallyOnly ? tablesConfig : tablesConfig.directProxy();
- }
-
- private boolean cachedGenerationMatches(int currentGeneration) {
- return cachedGeneration != NO_GENERATION && cachedGeneration == currentGeneration;
- }
-
- private void refillCache() {
- TablesView tablesView = tablesConfigOrDirectProxy().value();
-
- configuredTableIds.clear();
-
- for (TableView tableView : tablesView.tables()) {
- configuredTableIds.add(tableView.id());
- }
-
- cachedGeneration = tablesView.tablesGeneration();
- }
-}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index 370dd0f725..8ccad0fb7f 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -23,7 +23,6 @@ import static java.util.concurrent.CompletableFuture.allOf;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.concurrent.CompletableFuture.failedFuture;
import static java.util.concurrent.CompletableFuture.runAsync;
-import static java.util.concurrent.CompletableFuture.supplyAsync;
import static java.util.stream.Collectors.toList;
import static org.apache.ignite.internal.causality.IncrementalVersionedValue.dependingOn;
import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.getZoneById;
@@ -82,7 +81,6 @@ import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.ignite.configuration.ConfigurationChangeException;
-import org.apache.ignite.configuration.ConfigurationProperty;
import org.apache.ignite.configuration.notifications.ConfigurationNamedListListener;
import org.apache.ignite.configuration.notifications.ConfigurationNotificationEvent;
import org.apache.ignite.internal.affinity.AffinityUtils;
@@ -190,7 +188,6 @@ import org.apache.ignite.lang.ByteArray;
import org.apache.ignite.lang.IgniteException;
import org.apache.ignite.lang.IgniteInternalException;
import org.apache.ignite.lang.IgniteStringFormatter;
-import org.apache.ignite.lang.IgniteSystemProperties;
import org.apache.ignite.lang.NodeStoppingException;
import org.apache.ignite.lang.TableAlreadyExistsException;
import org.apache.ignite.lang.TableNotFoundException;
@@ -223,14 +220,6 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
private static final int TX_STATE_STORAGE_FLUSH_DELAY = 1000;
private static final IntSupplier TX_STATE_STORAGE_FLUSH_DELAY_SUPPLIER = () -> TX_STATE_STORAGE_FLUSH_DELAY;
- /**
- * If this property is set to {@code true} then an attempt to get the configuration property directly from Meta storage will be skipped,
- * and the local property will be returned.
- * TODO: IGNITE-16774 This property and overall approach, access configuration directly through Meta storage,
- * TODO: will be removed after fix of the issue.
- */
- private final boolean getMetadataLocallyOnly = IgniteSystemProperties.getBoolean("IGNITE_GET_METADATA_LOCALLY_ONLY");
-
/** Tables configuration. */
private final TablesConfiguration tablesCfg;
@@ -379,8 +368,6 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
private final IndexBuilder indexBuilder;
- private final ConfiguredTablesCache configuredTablesCache;
-
private final Marshaller raftCommandsMarshaller;
/**
@@ -509,8 +496,6 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
indexBuilder = new IndexBuilder(nodeName, cpus);
- configuredTablesCache = new ConfiguredTablesCache(tablesCfg, getMetadataLocallyOnly);
-
raftCommandsMarshaller = new ThreadLocalPartitionCommandsMarshaller(clusterService.serializationRegistry());
}
@@ -1832,51 +1817,19 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
* @return Future representing pending completion of the operation.
*/
private CompletableFuture<List<Table>> tablesAsyncInternal() {
- return supplyAsync(() -> inBusyLock(busyLock, this::directTableIds), ioExecutor)
- .thenCompose(tableIds -> inBusyLock(busyLock, () -> {
- var tableFuts = new CompletableFuture[tableIds.size()];
-
- var i = 0;
-
- for (int tblId : tableIds) {
- tableFuts[i++] = tableAsyncInternal(tblId, false);
- }
-
- return allOf(tableFuts).thenApply(unused -> inBusyLock(busyLock, () -> {
- var tables = new ArrayList<Table>(tableIds.size());
-
- for (var fut : tableFuts) {
- var table = fut.join();
-
- if (table != null) {
- tables.add((Table) table);
- }
- }
-
- return tables;
- }));
- }));
+ return schemaSyncService.waitForMetadataCompleteness(clock.now())
+ .thenApply(ignore -> List.copyOf(latestTablesById().values()));
}
/**
- * Collects a list of direct table ids.
+ * Return actual table id by given name or {@code null} if table doesn't exist.
*
- * @return A list of direct table ids.
+ * @param tableName Table name.
+ * @return Table id or {@code null} if not found.
*/
- private List<Integer> directTableIds() {
- return configuredTablesCache.configuredTableIds();
- }
-
- /**
- * Gets direct id of table with {@code tblName}.
- *
- * @param tblName Name of the table.
- * @return Direct id of the table, or {@code null} if the table with the {@code tblName} has not been found.
- */
- @Nullable
- private Integer directTableId(String tblName) {
+ private @Nullable Integer tableNameToId(String tableName) {
try {
- TableConfiguration exTblCfg = directProxy(tablesCfg.tables()).get(tblName);
+ TableConfiguration exTblCfg = tablesCfg.tables().get(tableName);
if (exTblCfg == null) {
return null;
@@ -1973,7 +1926,7 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
throw new IgniteException(new NodeStoppingException());
}
try {
- return tableAsyncInternal(id, true);
+ return tableAsyncInternal(id);
} finally {
busyLock.leaveBusy();
}
@@ -2012,44 +1965,70 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
/**
* Gets a table by name, if it was created before. Doesn't parse canonical name.
*
- * @param name Table name.
+ * @param tableName Table name.
* @return Future representing pending completion of the {@code TableManager#tableAsyncInternal} operation.
*/
- public CompletableFuture<TableImpl> tableAsyncInternal(String name) {
+ public CompletableFuture<TableImpl> tableAsyncInternal(String tableName) {
if (!busyLock.enterBusy()) {
throw new IgniteException(new NodeStoppingException());
}
try {
- return supplyAsync(() -> inBusyLock(busyLock, () -> directTableId(name)), ioExecutor)
- .thenCompose(tableId -> inBusyLock(busyLock, () -> {
+ HybridTimestamp now = clock.now();
+
+ return schemaSyncService.waitForMetadataCompleteness(now)
+ .thenComposeAsync(ignore -> {
+ Integer tableId = tableNameToId(tableName);
+
if (tableId == null) {
return completedFuture(null);
}
- return tableAsyncInternal(tableId, false);
- }));
+ // Table with given name found. Wait for table initialization.
+ return tableReadyFuture(tableId);
+ }, ioExecutor);
} finally {
busyLock.leaveBusy();
}
}
/**
- * Internal method for getting table by id.
+ * Gets a table by id, if it was created before.
+ *
+ * @param tableId Table id.
+ * @return Future representing pending completion of the {@code TableManager#tableAsyncInternal} operation.
+ */
+ public CompletableFuture<TableImpl> tableAsyncInternal(int tableId) {
+ HybridTimestamp now = clock.now();
+
+ return schemaSyncService.waitForMetadataCompleteness(now)
+ .thenComposeAsync(ignore -> {
+ // Here we are sure metadata is actual.
+ if (catalogService.table(tableId, now.longValue()) == null) {
+ return completedFuture(null);
+ }
+
+ // But we may need to wait for table initialization.
+ return tableReadyFuture(tableId);
+ }, ioExecutor);
+ }
+
+ /**
+ * Internal method for getting table by id, without awaiting for actual metadata.
*
* @param id Table id.
- * @param checkConfiguration {@code True} when the method checks a configuration before trying to get a table, {@code false}
- * otherwise.
* @return Future representing pending completion of the operation.
*/
- public CompletableFuture<TableImpl> tableAsyncInternal(int id, boolean checkConfiguration) {
- CompletableFuture<Boolean> tblCfgFut = checkConfiguration
- ? supplyAsync(() -> inBusyLock(busyLock, () -> isTableConfigured(id)), ioExecutor)
- : completedFuture(true);
+ private CompletableFuture<TableImpl> tableReadyFuture(int id) {
+ if (!busyLock.enterBusy()) {
+ throw new IgniteException(new NodeStoppingException());
+ }
- return tblCfgFut.thenCompose(isCfg -> inBusyLock(busyLock, () -> {
- if (!isCfg) {
- return completedFuture(null);
+ try {
+ TableImpl table = tablesByIdVv.latest().get(id);
+
+ if (table != null) {
+ return completedFuture(table);
}
TableImpl tbl = latestTablesById().get(id);
@@ -2068,10 +2047,10 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
if (e != null) {
getTblFut.completeExceptionally(e);
} else {
- TableImpl table = tables.get(id);
+ TableImpl table0 = tables.get(id);
- if (table != null) {
- getTblFut.complete(table);
+ if (table0 != null) {
+ getTblFut.complete(table0);
}
}
});
@@ -2093,17 +2072,9 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
}
return getTblFut.whenComplete((unused, throwable) -> assignmentsUpdatedVv.removeWhenComplete(tablesListener));
- }));
- }
-
- /**
- * Checks that the table is configured with specific id.
- *
- * @param id Table id.
- * @return True when the table is configured into cluster, false otherwise.
- */
- private boolean isTableConfigured(int id) {
- return configuredTablesCache.isTableConfigured(id);
+ } finally {
+ busyLock.leaveBusy();
+ }
}
/**
@@ -2475,19 +2446,6 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
return new PartitionMover(busyLock, () -> internalTable.partitionRaftGroupService(partId));
}
- /**
- * Gets a direct accessor for the configuration distributed property. If the metadata access only locally configured the method will
- * return local property accessor.
- *
- * @param property Distributed configuration property to receive direct access.
- * @param <T> Type of the property accessor.
- * @return An accessor for distributive property.
- * @see #getMetadataLocallyOnly
- */
- private <T extends ConfigurationProperty<?>> T directProxy(T property) {
- return getMetadataLocallyOnly ? property : (T) property.directProxy();
- }
-
private static PeersAndLearners configurationFromAssignments(Collection<Assignment> assignments) {
var peers = new HashSet<String>();
var learners = new HashSet<String>();
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/ConfiguredTablesCacheTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/ConfiguredTablesCacheTest.java
deleted file mode 100644
index 65e8d66f58..0000000000
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/ConfiguredTablesCacheTest.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.table.distributed;
-
-import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.contains;
-import static org.hamcrest.Matchers.empty;
-import static org.hamcrest.Matchers.is;
-import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-
-import java.util.concurrent.CompletableFuture;
-import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
-import org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
-import org.apache.ignite.internal.schema.configuration.TablesConfiguration;
-import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.extension.ExtendWith;
-
-@ExtendWith(ConfigurationExtension.class)
-class ConfiguredTablesCacheTest extends BaseIgniteAbstractTest {
- @InjectConfiguration
- private TablesConfiguration tablesConfig;
-
- private ConfiguredTablesCache cache;
-
- @BeforeEach
- void createCache() {
- cache = new ConfiguredTablesCache(tablesConfig, false);
- }
-
- @Test
- void tableIdCheckIsCoherentWithConfigGenerationUpdate() {
- createTable(1, "t1");
-
- assertTrue(cache.isTableConfigured(1));
- assertFalse(cache.isTableConfigured(2));
-
- createTable(2, "t2");
-
- assertTrue(cache.isTableConfigured(2));
- }
-
- private void createTable(int tableId, String tableName) {
- CompletableFuture<Void> future = tablesConfig.change(tablesChange -> {
- tablesChange.changeTables(ch -> ch.create(tableName, tableChange -> {
- tableChange.changeId(tableId);
- }));
-
- tablesChange.changeTablesGeneration(tablesChange.tablesGeneration() + 1);
- });
-
- assertThat(future, willCompleteSuccessfully());
- }
-
- @Test
- void tableIdsIsCoherentWithConfigGenerationUpdate() {
- assertThat(cache.configuredTableIds(), is(empty()));
-
- createTable(1, "t1");
-
- assertThat(cache.configuredTableIds(), contains(1));
-
- createTable(2, "t2");
-
- assertThat(cache.configuredTableIds(), contains(1, 2));
- }
-}