You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by tk...@apache.org on 2024/02/28 05:12:09 UTC
(ignite-3) branch main updated: IGNITE-21606 Add tuple upgrade during filtering index scans (#3299)
This is an automated email from the ASF dual-hosted git repository.
tkalkirill pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new e4e11c8b19 IGNITE-21606 Add tuple upgrade during filtering index scans (#3299)
e4e11c8b19 is described below
commit e4e11c8b190c28f8d4c2b4231150fc7bf4c0f503
Author: Kirill Tkalenko <tk...@yandex.ru>
AuthorDate: Wed Feb 28 08:12:04 2024 +0300
IGNITE-21606 Add tuple upgrade during filtering index scans (#3299)
---
.../internal/index/ItBuildIndexOneNodeTest.java | 3 -
...xDistributedTestSingleNodeNoCleanupMessage.java | 7 +-
.../internal/table/distributed/TableManager.java | 3 +-
.../replicator/PartitionReplicaListener.java | 113 +++++++++++++++------
.../PartitionReplicaListenerIndexLockingTest.java | 4 +-
.../replication/PartitionReplicaListenerTest.java | 5 +-
.../apache/ignite/distributed/ItTxTestCluster.java | 11 +-
.../table/impl/DummyInternalTableImpl.java | 4 +-
.../table/impl/DummySchemaManagerImpl.java | 59 ++++-------
9 files changed, 128 insertions(+), 81 deletions(-)
diff --git a/modules/index/src/integrationTest/java/org/apache/ignite/internal/index/ItBuildIndexOneNodeTest.java b/modules/index/src/integrationTest/java/org/apache/ignite/internal/index/ItBuildIndexOneNodeTest.java
index 8cf4826abd..9f134aed0c 100644
--- a/modules/index/src/integrationTest/java/org/apache/ignite/internal/index/ItBuildIndexOneNodeTest.java
+++ b/modules/index/src/integrationTest/java/org/apache/ignite/internal/index/ItBuildIndexOneNodeTest.java
@@ -47,7 +47,6 @@ import org.apache.ignite.tx.Transaction;
import org.apache.ignite.tx.TransactionOptions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
/** Integration test for testing the building of an index in a single node cluster. */
@@ -241,7 +240,6 @@ public class ItBuildIndexOneNodeTest extends BaseSqlIntegrationTest {
.check();
}
- @Disabled("https://issues.apache.org/jira/browse/IGNITE-21606")
@Test
void testBuildingIndexWithUpdateSchema() throws Exception {
createZoneAndTable(ZONE_NAME, TABLE_NAME, 1, 1);
@@ -273,7 +271,6 @@ public class ItBuildIndexOneNodeTest extends BaseSqlIntegrationTest {
.check();
}
- @Disabled("https://issues.apache.org/jira/browse/IGNITE-21606")
@Test
void testBuildingIndexWithUpdateSchemaAfterCreateIndex() throws Exception {
createZoneAndTable(ZONE_NAME, TABLE_NAME, 1, 1);
diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNodeNoCleanupMessage.java b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNodeNoCleanupMessage.java
index c383cc866c..2ea2fe0d74 100644
--- a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNodeNoCleanupMessage.java
+++ b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNodeNoCleanupMessage.java
@@ -42,6 +42,7 @@ import org.apache.ignite.internal.raft.service.RaftGroupService;
import org.apache.ignite.internal.replicator.ReplicaResult;
import org.apache.ignite.internal.replicator.ReplicaService;
import org.apache.ignite.internal.replicator.message.ReplicaRequest;
+import org.apache.ignite.internal.schema.SchemaRegistry;
import org.apache.ignite.internal.storage.MvPartitionStorage;
import org.apache.ignite.internal.table.TxAbstractTest;
import org.apache.ignite.internal.table.distributed.IndexLocker;
@@ -160,7 +161,8 @@ public class ItTxDistributedTestSingleNodeNoCleanupMessage extends TxAbstractTes
CatalogService catalogService,
PlacementDriver placementDriver,
ClusterNodeResolver clusterNodeResolver,
- RemotelyTriggeredResourceRegistry resourcesRegistry
+ RemotelyTriggeredResourceRegistry resourcesRegistry,
+ SchemaRegistry schemaRegistry
) {
return new PartitionReplicaListener(
mvDataStorage,
@@ -184,7 +186,8 @@ public class ItTxDistributedTestSingleNodeNoCleanupMessage extends TxAbstractTes
catalogService,
placementDriver,
clusterNodeResolver,
- resourcesRegistry
+ resourcesRegistry,
+ schemaRegistry
) {
@Override
public CompletableFuture<ReplicaResult> invoke(ReplicaRequest request, String senderId) {
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index 4ec8d85338..8af5a6dd37 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -990,7 +990,8 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent {
catalogService,
executorInclinedPlacementDriver,
clusterService.topologyService(),
- remotelyTriggeredResourceRegistry
+ remotelyTriggeredResourceRegistry,
+ schemaManager.schemaRegistry(tableId)
);
}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
index 02bd232158..d675397819 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
@@ -30,6 +30,7 @@ import static org.apache.ignite.internal.table.distributed.replicator.RemoteReso
import static org.apache.ignite.internal.table.distributed.replicator.ReplicatorUtils.beginRwTxTs;
import static org.apache.ignite.internal.table.distributed.replicator.ReplicatorUtils.latestIndexDescriptorInBuildingStatus;
import static org.apache.ignite.internal.table.distributed.replicator.ReplicatorUtils.rwTxActiveCatalogVersion;
+import static org.apache.ignite.internal.tx.TransactionIds.beginTimestamp;
import static org.apache.ignite.internal.tx.TxState.ABANDONED;
import static org.apache.ignite.internal.tx.TxState.ABORTED;
import static org.apache.ignite.internal.tx.TxState.COMMITTED;
@@ -75,6 +76,7 @@ import org.apache.ignite.internal.binarytuple.BinaryTupleCommon;
import org.apache.ignite.internal.catalog.Catalog;
import org.apache.ignite.internal.catalog.CatalogService;
import org.apache.ignite.internal.catalog.descriptors.CatalogIndexDescriptor;
+import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
import org.apache.ignite.internal.catalog.events.CatalogEvent;
import org.apache.ignite.internal.catalog.events.CatalogEventParameters;
import org.apache.ignite.internal.catalog.events.StartBuildingIndexEventParameters;
@@ -106,6 +108,7 @@ import org.apache.ignite.internal.replicator.message.ReplicaRequest;
import org.apache.ignite.internal.replicator.message.ReplicaSafeTimeSyncRequest;
import org.apache.ignite.internal.replicator.message.SchemaVersionAwareReplicaRequest;
import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.schema.BinaryRowUpgrader;
import org.apache.ignite.internal.schema.BinaryTuple;
import org.apache.ignite.internal.schema.BinaryTuplePrefix;
import org.apache.ignite.internal.schema.NullBinaryRow;
@@ -287,6 +290,8 @@ public class PartitionReplicaListener implements ReplicaListener {
/** Listener for {@link CatalogEvent#INDEX_BUILDING}. */
private final EventListener<CatalogEventParameters> indexBuildingCatalogEventListener = this::onIndexBuilding;
+ private final SchemaRegistry schemaRegistry;
+
/**
* The constructor.
*
@@ -332,7 +337,8 @@ public class PartitionReplicaListener implements ReplicaListener {
CatalogService catalogService,
PlacementDriver placementDriver,
ClusterNodeResolver clusterNodeResolver,
- RemotelyTriggeredResourceRegistry remotelyTriggeredResourceRegistry
+ RemotelyTriggeredResourceRegistry remotelyTriggeredResourceRegistry,
+ SchemaRegistry schemaRegistry
) {
this.mvDataStorage = mvDataStorage;
this.raftClient = raftClient;
@@ -353,6 +359,7 @@ public class PartitionReplicaListener implements ReplicaListener {
this.placementDriver = placementDriver;
this.clusterNodeResolver = clusterNodeResolver;
this.remotelyTriggeredResourceRegistry = remotelyTriggeredResourceRegistry;
+ this.schemaRegistry = schemaRegistry;
this.replicationGroupId = new TablePartitionId(tableId, partId);
@@ -1091,9 +1098,6 @@ public class PartitionReplicaListener implements ReplicaListener {
) {
IndexStorage indexStorage = schemaAwareIndexStorage.storage();
- int batchCount = request.batchSize();
- HybridTimestamp timestamp = request.readTimestamp();
-
FullyQualifiedResourceId cursorId = cursorId(request.transactionId(), request.scanId());
BinaryTuple key = request.exactKey().asBinaryTuple();
@@ -1104,12 +1108,22 @@ public class PartitionReplicaListener implements ReplicaListener {
() -> new CursorResource(indexStorage.get(key))
).cursor();
+ Cursor<IndexRow> indexRowCursor = CursorUtils.map(cursor, rowId -> new IndexRowImpl(key, rowId));
+
+ int batchCount = request.batchSize();
+
var result = new ArrayList<BinaryRow>(batchCount);
- Cursor<IndexRow> indexRowCursor = CursorUtils.map(cursor, rowId -> new IndexRowImpl(key, rowId));
+ HybridTimestamp readTimestamp = request.readTimestamp();
- return continueReadOnlyIndexScan(schemaAwareIndexStorage, indexRowCursor, timestamp, batchCount, result)
- .thenApply(ignore -> closeCursorIfBatchNotFull(result, batchCount, cursorId));
+ return continueReadOnlyIndexScan(
+ schemaAwareIndexStorage,
+ indexRowCursor,
+ readTimestamp,
+ batchCount,
+ result,
+ tableVersionByTs(readTimestamp)
+ ).thenApply(ignore -> closeCursorIfBatchNotFull(result, batchCount, cursorId));
}
private CompletableFuture<List<BinaryRow>> lookupIndex(
@@ -1161,7 +1175,6 @@ public class PartitionReplicaListener implements ReplicaListener {
var indexStorage = (SortedIndexStorage) schemaAwareIndexStorage.storage();
UUID txId = request.transactionId();
- int batchCount = request.batchSize();
FullyQualifiedResourceId cursorId = cursorId(txId, request.scanId());
@@ -1214,11 +1227,20 @@ public class PartitionReplicaListener implements ReplicaListener {
SortedIndexLocker indexLocker = (SortedIndexLocker) indexesLockers.get().get(indexId);
+ int batchCount = request.batchSize();
+
var result = new ArrayList<BinaryRow>(batchCount);
- return continueIndexScan(txId, schemaAwareIndexStorage, indexLocker, cursor, batchCount, result,
- isUpperBoundAchieved)
- .thenApply(ignore -> closeCursorIfBatchNotFull(result, batchCount, cursorId));
+ return continueIndexScan(
+ txId,
+ schemaAwareIndexStorage,
+ indexLocker,
+ cursor,
+ batchCount,
+ result,
+ isUpperBoundAchieved,
+ tableVersionByTs(beginTimestamp(txId))
+ ).thenApply(ignore -> closeCursorIfBatchNotFull(result, batchCount, cursorId));
});
});
}
@@ -1236,11 +1258,7 @@ public class PartitionReplicaListener implements ReplicaListener {
) {
var indexStorage = (SortedIndexStorage) schemaAwareIndexStorage.storage();
- UUID txId = request.transactionId();
- int batchCount = request.batchSize();
- HybridTimestamp timestamp = request.readTimestamp();
-
- FullyQualifiedResourceId cursorId = cursorId(txId, request.scanId());
+ FullyQualifiedResourceId cursorId = cursorId(request.transactionId(), request.scanId());
BinaryTupleMessage lowerBoundMessage = request.lowerBoundPrefix();
BinaryTupleMessage upperBoundMessage = request.upperBoundPrefix();
@@ -1257,18 +1275,29 @@ public class PartitionReplicaListener implements ReplicaListener {
flags
))).cursor();
+ int batchCount = request.batchSize();
+
var result = new ArrayList<BinaryRow>(batchCount);
- return continueReadOnlyIndexScan(schemaAwareIndexStorage, cursor, timestamp, batchCount, result)
- .thenApply(ignore -> closeCursorIfBatchNotFull(result, batchCount, cursorId));
+ HybridTimestamp readTimestamp = request.readTimestamp();
+
+ return continueReadOnlyIndexScan(
+ schemaAwareIndexStorage,
+ cursor,
+ readTimestamp,
+ batchCount,
+ result,
+ tableVersionByTs(readTimestamp)
+ ).thenApply(ignore -> closeCursorIfBatchNotFull(result, batchCount, cursorId));
}
private CompletableFuture<Void> continueReadOnlyIndexScan(
TableSchemaAwareIndexStorage schemaAwareIndexStorage,
Cursor<IndexRow> cursor,
- HybridTimestamp timestamp,
+ HybridTimestamp readTimestamp,
int batchSize,
- List<BinaryRow> result
+ List<BinaryRow> result,
+ int tableVersion
) {
if (result.size() >= batchSize || !cursor.hasNext()) {
return nullCompletedFuture();
@@ -1278,14 +1307,14 @@ public class PartitionReplicaListener implements ReplicaListener {
RowId rowId = indexRow.rowId();
- return resolvePlainReadResult(rowId, null, timestamp).thenComposeAsync(resolvedReadResult -> {
- if (resolvedReadResult != null
- && resolvedReadResult.binaryRow() != null
- && indexRowMatches(indexRow, resolvedReadResult.binaryRow(), schemaAwareIndexStorage)) {
- result.add(resolvedReadResult.binaryRow());
+ return resolvePlainReadResult(rowId, null, readTimestamp).thenComposeAsync(resolvedReadResult -> {
+ BinaryRow binaryRow = upgrage(binaryRow(resolvedReadResult), tableVersion);
+
+ if (binaryRow != null && indexRowMatches(indexRow, binaryRow, schemaAwareIndexStorage)) {
+ result.add(binaryRow);
}
- return continueReadOnlyIndexScan(schemaAwareIndexStorage, cursor, timestamp, batchSize, result);
+ return continueReadOnlyIndexScan(schemaAwareIndexStorage, cursor, readTimestamp, batchSize, result, tableVersion);
}, scanRequestExecutor);
}
@@ -1299,6 +1328,7 @@ public class PartitionReplicaListener implements ReplicaListener {
* @param batchSize Batch size.
* @param result Result collection.
* @param isUpperBoundAchieved Function to stop on upper bound.
+ * @param tableVersion Table schema version at begin timestamp.
* @return Future.
*/
private CompletableFuture<Void> continueIndexScan(
@@ -1308,7 +1338,8 @@ public class PartitionReplicaListener implements ReplicaListener {
Cursor<IndexRow> indexCursor,
int batchSize,
List<BinaryRow> result,
- Predicate<IndexRow> isUpperBoundAchieved
+ Predicate<IndexRow> isUpperBoundAchieved,
+ int tableVersion
) {
if (result.size() == batchSize) { // Batch is full, exit loop.
return nullCompletedFuture();
@@ -1325,9 +1356,9 @@ public class PartitionReplicaListener implements ReplicaListener {
return lockManager.acquire(txId, new LockKey(tableId(), rowId), LockMode.S)
.thenComposeAsync(rowLock -> { // Table row S lock
return resolvePlainReadResult(rowId, txId).thenCompose(resolvedReadResult -> {
- if (resolvedReadResult != null
- && resolvedReadResult.binaryRow() != null
- && indexRowMatches(currentRow, resolvedReadResult.binaryRow(), schemaAwareIndexStorage)) {
+ BinaryRow binaryRow = upgrage(binaryRow(resolvedReadResult), tableVersion);
+
+ if (binaryRow != null && indexRowMatches(currentRow, binaryRow, schemaAwareIndexStorage)) {
result.add(resolvedReadResult.binaryRow());
}
@@ -1339,7 +1370,8 @@ public class PartitionReplicaListener implements ReplicaListener {
indexCursor,
batchSize,
result,
- isUpperBoundAchieved
+ isUpperBoundAchieved,
+ tableVersion
);
});
}, scanRequestExecutor);
@@ -1355,7 +1387,6 @@ public class PartitionReplicaListener implements ReplicaListener {
* @return {@code true} if index row matches the binary row, {@code false} otherwise.
*/
private static boolean indexRowMatches(IndexRow indexRow, BinaryRow binaryRow, TableSchemaAwareIndexStorage schemaAwareIndexStorage) {
- // TODO: IGNITE-21606 It is necessary to upgrade the tuple to the required schema version
BinaryTuple actualIndexRow = schemaAwareIndexStorage.indexRowResolver().extractColumns(binaryRow);
return indexRow.indexColumns().byteBuffer().equals(actualIndexRow.byteBuffer());
@@ -3814,4 +3845,22 @@ public class PartitionReplicaListener implements ReplicaListener {
return hybridTimestamp(catalog.time());
}
+ /**
+ * Returns the version of this listener's table as recorded in the catalog version that is active at the given timestamp.
+ *
+ * <p>The table descriptor is expected to exist in that catalog version; this is only checked with an {@code assert}.
+ *
+ * @param ts Timestamp used to look up the active catalog version.
+ * @return Table version taken from the table descriptor of that catalog version.
+ */
+ private int tableVersionByTs(HybridTimestamp ts) {
+ int activeCatalogVersion = catalogService.activeCatalogVersion(ts.longValue());
+
+ CatalogTableDescriptor table = catalogService.table(tableId(), activeCatalogVersion);
+
+ assert table != null : "tableId=" + tableId() + ", catalogVersion=" + activeCatalogVersion;
+
+ return table.tableVersion();
+ }
+ /**
+ * Null-safe accessor for the binary row of a resolved read result.
+ *
+ * @param timedBinaryRow Resolved read result, may be {@code null}.
+ * @return {@code null} if the argument is {@code null}, otherwise whatever {@code timedBinaryRow.binaryRow()} returns.
+ */
+ private static @Nullable BinaryRow binaryRow(@Nullable TimedBinaryRow timedBinaryRow) {
+ return timedBinaryRow == null ? null : timedBinaryRow.binaryRow();
+ }
+ /**
+ * Upgrades a binary row to the target schema version using a new {@link BinaryRowUpgrader} backed by {@code schemaRegistry}.
+ *
+ * <p>NOTE(review): the method name looks like a typo for "upgrade"; renaming it requires updating the call sites in the index scans.
+ *
+ * <p>NOTE(review): the read-only index scan adds the upgraded row to its result, while the read-write index scan uses the upgraded
+ * row only for index matching and adds the originally resolved row to its result - confirm that this difference is intended.
+ *
+ * @param source Row to upgrade, may be {@code null}.
+ * @param targetSchemaVersion Schema version passed to the upgrader.
+ * @return {@code null} if {@code source} is {@code null}, otherwise the result of {@code BinaryRowUpgrader#upgrade}.
+ */
+ private @Nullable BinaryRow upgrage(@Nullable BinaryRow source, int targetSchemaVersion) {
+ return source == null ? null : new BinaryRowUpgrader(schemaRegistry, targetSchemaVersion).upgrade(source);
+ }
}
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerIndexLockingTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerIndexLockingTest.java
index 09ab21b745..7d3cfcb7b1 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerIndexLockingTest.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerIndexLockingTest.java
@@ -214,6 +214,7 @@ public class PartitionReplicaListenerIndexLockingTest extends IgniteAbstractTest
when(tableDescriptor.tableVersion()).thenReturn(schemaDescriptor.version());
when(catalogService.table(anyInt(), anyLong())).thenReturn(tableDescriptor);
+ when(catalogService.table(anyInt(), anyInt())).thenReturn(tableDescriptor);
CatalogIndexDescriptor indexDescriptor = mock(CatalogIndexDescriptor.class);
when(indexDescriptor.id()).thenReturn(PK_INDEX_ID);
@@ -256,7 +257,8 @@ public class PartitionReplicaListenerIndexLockingTest extends IgniteAbstractTest
catalogService,
new TestPlacementDriver(localNode),
mock(ClusterNodeResolver.class),
- new RemotelyTriggeredResourceRegistry()
+ new RemotelyTriggeredResourceRegistry(),
+ schemaManager
);
kvMarshaller = new ReflectionMarshallerFactory().create(schemaDescriptor, Integer.class, Integer.class);
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
index 0f09e614e1..8ecf0ac46f 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
@@ -173,6 +173,7 @@ import org.apache.ignite.internal.table.distributed.schema.FullTableSchema;
import org.apache.ignite.internal.table.distributed.schema.SchemaSyncService;
import org.apache.ignite.internal.table.distributed.schema.ValidationSchemasSource;
import org.apache.ignite.internal.table.impl.DummyInternalTableImpl;
+import org.apache.ignite.internal.table.impl.DummySchemaManagerImpl;
import org.apache.ignite.internal.testframework.IgniteAbstractTest;
import org.apache.ignite.internal.tostring.IgniteToStringInclude;
import org.apache.ignite.internal.tostring.S;
@@ -449,6 +450,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
when(validationSchemasSource.waitForSchemaAvailability(anyInt(), anyInt())).thenReturn(nullCompletedFuture());
lenient().when(catalogService.table(anyInt(), anyLong())).thenReturn(tableDescriptor);
+ lenient().when(catalogService.table(anyInt(), anyInt())).thenReturn(tableDescriptor);
int pkIndexId = 1;
int sortedIndexId = 2;
@@ -577,7 +579,8 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
catalogService,
placementDriver,
new SingleClusterNodeResolver(localNode),
- new RemotelyTriggeredResourceRegistry()
+ new RemotelyTriggeredResourceRegistry(),
+ new DummySchemaManagerImpl(schemaDescriptor, schemaDescriptorVersion2)
);
kvMarshaller = marshallerFor(schemaDescriptor);
diff --git a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
index e4c6da5eed..fb02f31f39 100644
--- a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
+++ b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
@@ -97,6 +97,7 @@ import org.apache.ignite.internal.replicator.TablePartitionId;
import org.apache.ignite.internal.schema.BinaryRowConverter;
import org.apache.ignite.internal.schema.ColumnsExtractor;
import org.apache.ignite.internal.schema.SchemaDescriptor;
+import org.apache.ignite.internal.schema.SchemaRegistry;
import org.apache.ignite.internal.schema.configuration.StorageUpdateConfiguration;
import org.apache.ignite.internal.storage.MvPartitionStorage;
import org.apache.ignite.internal.storage.engine.MvTableStorage;
@@ -473,6 +474,7 @@ public class ItTxTestCluster {
when(tableDescriptor.tableVersion()).thenReturn(SCHEMA_VERSION);
lenient().when(catalogService.table(eq(tableId), anyLong())).thenReturn(tableDescriptor);
+ lenient().when(catalogService.table(eq(tableId), anyInt())).thenReturn(tableDescriptor);
List<Set<Assignment>> calculatedAssignments = AffinityUtils.calculateAssignments(
cluster.stream().map(node -> node.topologyService().localMember().name()).collect(toList()),
@@ -615,7 +617,8 @@ public class ItTxTestCluster {
catalogService,
placementDriver,
nodeResolver,
- cursorRegistries.get(assignment)
+ cursorRegistries.get(assignment),
+ schemaManager
);
replicaManagers.get(assignment).startReplica(
@@ -714,7 +717,8 @@ public class ItTxTestCluster {
CatalogService catalogService,
PlacementDriver placementDriver,
ClusterNodeResolver clusterNodeResolver,
- RemotelyTriggeredResourceRegistry resourcesRegistry
+ RemotelyTriggeredResourceRegistry resourcesRegistry,
+ SchemaRegistry schemaRegistry
) {
return new PartitionReplicaListener(
mvDataStorage,
@@ -738,7 +742,8 @@ public class ItTxTestCluster {
catalogService,
placementDriver,
clusterNodeResolver,
- resourcesRegistry
+ resourcesRegistry,
+ schemaRegistry
);
}
diff --git a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
index 1ae5e6cc71..472ed405f1 100644
--- a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
+++ b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
@@ -364,6 +364,7 @@ public class DummyInternalTableImpl extends InternalTableImpl {
CatalogTableDescriptor tableDescriptor = mock(CatalogTableDescriptor.class);
lenient().when(catalogService.table(anyInt(), anyLong())).thenReturn(tableDescriptor);
+ lenient().when(catalogService.table(anyInt(), anyInt())).thenReturn(tableDescriptor);
lenient().when(tableDescriptor.tableVersion()).thenReturn(1);
CatalogIndexDescriptor indexDescriptor = mock(CatalogIndexDescriptor.class);
@@ -393,7 +394,8 @@ public class DummyInternalTableImpl extends InternalTableImpl {
catalogService,
new TestPlacementDriver(LOCAL_NODE),
mock(ClusterNodeResolver.class),
- resourcesRegistry
+ resourcesRegistry,
+ schemaManager
);
partitionListener = new PartitionListener(
diff --git a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummySchemaManagerImpl.java b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummySchemaManagerImpl.java
index da412a24a6..4717b88816 100644
--- a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummySchemaManagerImpl.java
+++ b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummySchemaManagerImpl.java
@@ -18,86 +18,71 @@
package org.apache.ignite.internal.table.impl;
import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.function.Function.identity;
import static java.util.stream.Collectors.toList;
+import static java.util.stream.Collectors.toMap;
+import static org.apache.ignite.internal.util.ArrayUtils.nullOrEmpty;
import java.util.Collection;
import java.util.List;
+import java.util.NavigableMap;
+import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
+import java.util.stream.Stream;
import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.schema.SchemaDescriptor;
import org.apache.ignite.internal.schema.SchemaRegistry;
import org.apache.ignite.internal.schema.row.Row;
-/**
- * Dummy schema manager for tests.
- */
+/** Dummy schema manager for tests. */
public class DummySchemaManagerImpl implements SchemaRegistry {
- /** Schema. */
- private final SchemaDescriptor schema;
-
- /**
- * Constructor.
- *
- * @param schema Schema descriptor.
- */
- public DummySchemaManagerImpl(SchemaDescriptor schema) {
- assert schema != null;
-
- this.schema = schema;
+ private final NavigableMap<Integer, SchemaDescriptor> schemaByVersion;
+
+ /**
+ * Constructor.
+ *
+ * <p>Indexes the given descriptors by {@link SchemaDescriptor#version()} in a {@link TreeMap}. If several descriptors report the
+ * same version, the one passed later replaces the earlier one (see the merge function below).
+ *
+ * @param schemas Schema descriptors known to this registry; asserted to pass the {@code !nullOrEmpty} check.
+ */
+ public DummySchemaManagerImpl(SchemaDescriptor... schemas) {
+ assert !nullOrEmpty(schemas);
+
+ schemaByVersion = Stream.of(schemas).collect(toMap(SchemaDescriptor::version, identity(), (o, o2) -> o2, TreeMap::new));
}
- /** {@inheritDoc} */
@Override
public SchemaDescriptor lastKnownSchema() {
- return schema;
+ return schemaByVersion.lastEntry().getValue();
}
- /** {@inheritDoc} */
@Override
public SchemaDescriptor schema(int version) {
- assert version >= 0;
- assert schema.version() == version;
+ assert schemaByVersion.containsKey(version) : version;
- return schema;
+ return schemaByVersion.get(version);
}
@Override
public CompletableFuture<SchemaDescriptor> schemaAsync(int version) {
- assert version >= 0;
- assert schema.version() == version;
-
- return completedFuture(schema);
+ return completedFuture(schema(version));
}
- /** {@inheritDoc} */
@Override
public int lastKnownSchemaVersion() {
- return schema.version();
+ return lastKnownSchema().version();
}
- /** {@inheritDoc} */
@Override
public Row resolve(BinaryRow row, SchemaDescriptor desc) {
return Row.wrapBinaryRow(desc, row);
}
- /** {@inheritDoc} */
@Override
public Row resolve(BinaryRow row, int targetSchemaVersion) {
- assert row.schemaVersion() == schema.version() || row.schemaVersion() == 0;
- assert targetSchemaVersion == row.schemaVersion();
+ assert targetSchemaVersion == row.schemaVersion() : "row=" + row.schemaVersion() + ", target=" + targetSchemaVersion;
- return Row.wrapBinaryRow(schema, row);
+ return Row.wrapBinaryRow(schema(targetSchemaVersion), row);
}
@Override
public List<Row> resolve(Collection<BinaryRow> rows, int targetSchemaVersion) {
- for (BinaryRow row : rows) {
- assert row == null || row.schemaVersion() == targetSchemaVersion;
- }
-
return rows.stream()
- .map(row -> row == null ? null : Row.wrapBinaryRow(schema(row.schemaVersion()), row))
+ .map(row -> row == null ? null : resolve(row, targetSchemaVersion))
.collect(toList());
}