You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by tk...@apache.org on 2022/10/28 12:55:11 UTC
[ignite-3] branch main updated: IGNITE-17302 add the propagation of the maximal storages' lastAppliedIndex on onSnapshotSave (#1263)
This is an automated email from the ASF dual-hosted git repository.
tkalkirill pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 81e46457eb IGNITE-17302 add the propagation of the maximal storages' lastAppliedIndex on onSnapshotSave (#1263)
81e46457eb is described below
commit 81e46457eb19f347a3f9f3f91855295fc71c4c49
Author: Mirza Aliev <al...@gmail.com>
AuthorDate: Fri Oct 28 15:55:06 2022 +0300
IGNITE-17302 add the propagation of the maximal storages' lastAppliedIndex on onSnapshotSave (#1263)
---
.../table/distributed/raft/PartitionListener.java | 69 ++++++++--
.../snapshot/PartitionSnapshotStorageFactory.java | 12 +-
.../snapshot/outgoing/OutgoingSnapshotReader.java | 8 +-
.../raft/PartitionCommandListenerTest.java | 146 +++++++++++++++++++--
.../PartitionSnapshotStorageFactoryTest.java | 81 ++++++++++++
.../outgoing/OutgoingSnapshotReaderTest.java | 73 +++++++++++
.../state/rocksdb/TxStateRocksDbStorage.java | 7 +-
.../tx/storage/state/test/TestTxStateStorage.java | 35 ++---
8 files changed, 389 insertions(+), 42 deletions(-)
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
index c4818d018a..c440e24a88 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
@@ -31,6 +31,7 @@ import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import java.util.function.Supplier;
import org.apache.ignite.internal.lock.AutoLockup;
@@ -97,7 +98,6 @@ public class PartitionListener implements RaftGroupListener {
this.indexes = indexes;
}
- /** {@inheritDoc} */
@Override
public void onRead(Iterator<CommandClosure<ReadCommand>> iterator) {
iterator.forEachRemaining((CommandClosure<? extends ReadCommand> clo) -> {
@@ -107,7 +107,6 @@ public class PartitionListener implements RaftGroupListener {
});
}
- /** {@inheritDoc} */
@Override
public void onWrite(Iterator<CommandClosure<WriteCommand>> iterator) {
iterator.forEachRemaining((CommandClosure<? extends WriteCommand> clo) -> {
@@ -115,11 +114,14 @@ public class PartitionListener implements RaftGroupListener {
long commandIndex = clo.index();
- long storageAppliedIndex = storage.lastAppliedIndex();
+ // We use the minimum applied index here because local recovery also starts from the minimum one, so that no data is lost for
+ // either of the storages.
+ long storagesAppliedIndex = Math.min(storage.lastAppliedIndex(), txStateStorage.lastAppliedIndex());
- assert storageAppliedIndex < commandIndex
- : "Pending write command has a higher index than already processed commands [commandIndex=" + commandIndex
- + ", storageAppliedIndex=" + storageAppliedIndex + ']';
+ assert commandIndex > storagesAppliedIndex :
+ "Write command must have an index greater than that of storages [commandIndex=" + commandIndex
+ + ", mvAppliedIndex=" + storage.lastAppliedIndex()
+ + ", txStateAppliedIndex=" + txStateStorage.lastAppliedIndex() + "]";
try (AutoLockup ignoredPartitionSnapshotsReadLockup = storage.acquirePartitionSnapshotsReadLock()) {
if (command instanceof UpdateCommand) {
@@ -135,6 +137,7 @@ public class PartitionListener implements RaftGroupListener {
} else {
assert false : "Command was not found [cmd=" + command + ']';
}
+
clo.result(null);
} catch (IgniteInternalException e) {
clo.result(e);
@@ -146,8 +149,14 @@ public class PartitionListener implements RaftGroupListener {
* Handler for the {@link UpdateCommand}.
*
* @param cmd Command.
+ * @param commandIndex Index of the RAFT command.
*/
private void handleUpdateCommand(UpdateCommand cmd, long commandIndex) {
+ // Skips the write command because the storage has already executed it.
+ if (commandIndex <= storage.lastAppliedIndex()) {
+ return;
+ }
+
storage.runConsistently(() -> {
BinaryRow row = cmd.getRow();
RowId rowId = cmd.getRowId();
@@ -171,8 +180,14 @@ public class PartitionListener implements RaftGroupListener {
* Handler for the {@link UpdateAllCommand}.
*
* @param cmd Command.
+ * @param commandIndex Index of the RAFT command.
*/
private void handleUpdateAllCommand(UpdateAllCommand cmd, long commandIndex) {
+ // Skips the write command because the storage has already executed it.
+ if (commandIndex <= storage.lastAppliedIndex()) {
+ return;
+ }
+
storage.runConsistently(() -> {
UUID txId = cmd.txId();
Map<RowId, BinaryRow> rowsToUpdate = cmd.getRowsToUpdate();
@@ -191,6 +206,7 @@ public class PartitionListener implements RaftGroupListener {
addToIndexes(row, rowId);
}
}
+
storage.lastAppliedIndex(commandIndex);
return null;
@@ -200,11 +216,16 @@ public class PartitionListener implements RaftGroupListener {
/**
* Handler for the {@link FinishTxCommand}.
*
- * @param cmd Command.
+ * @param cmd Command.
* @param commandIndex Index of the RAFT command.
* @throws IgniteInternalException if an exception occurred during a transaction state change.
*/
private void handleFinishTxCommand(FinishTxCommand cmd, long commandIndex) throws IgniteInternalException {
+ // Skips the write command because the storage has already executed it.
+ if (commandIndex <= txStateStorage.lastAppliedIndex()) {
+ return;
+ }
+
UUID txId = cmd.txId();
TxState stateToSet = cmd.commit() ? TxState.COMMITED : TxState.ABORTED;
@@ -250,8 +271,14 @@ public class PartitionListener implements RaftGroupListener {
* Handler for the {@link TxCleanupCommand}.
*
* @param cmd Command.
+ * @param commandIndex Index of the RAFT command.
*/
private void handleTxCleanupCommand(TxCleanupCommand cmd, long commandIndex) {
+ // Skips the write command because the storage has already executed it.
+ if (commandIndex <= storage.lastAppliedIndex()) {
+ return;
+ }
+
storage.runConsistently(() -> {
UUID txId = cmd.txId();
@@ -283,19 +310,39 @@ public class PartitionListener implements RaftGroupListener {
// No-op.
}
- /** {@inheritDoc} */
@Override
public void onSnapshotSave(Path path, Consumer<Throwable> doneClo) {
- storage.flush();
+ // The max index here is required for local recovery and to avoid a possible scenario
+ // of false node failure when we actually have all required data. This might happen because we use the minimal index
+ // among storages on a node restart.
+ // Let's consider a more detailed example:
+ // 1) Suppose we don't propagate the maximal lastAppliedIndex among storages and onSnapshotSave finishes; this leads to the raft log
+ // being truncated up to the maximal lastAppliedIndex.
+ // 2) An unexpected cluster restart happens.
+ // 3) Local recovery of a node is started, where we request data from the minimal lastAppliedIndex among storages, because
+ // some data for some node might not have been flushed before the unexpected cluster restart.
+ // 4) When we try to restore data starting from the minimal lastAppliedIndex, we come to the situation
+ // that a raft node doesn't have such data, because the truncation up to the maximal lastAppliedIndex from 1) has happened.
+ // 5) The node cannot finish local recovery.
+ long maxLastAppliedIndex = Math.max(storage.lastAppliedIndex(), txStateStorage.lastAppliedIndex());
+
+ storage.runConsistently(() -> {
+ storage.lastAppliedIndex(maxLastAppliedIndex);
+
+ return null;
+ });
+
+ txStateStorage.lastAppliedIndex(maxLastAppliedIndex);
+
+ CompletableFuture.allOf(storage.flush(), txStateStorage.flush())
+ .whenComplete((unused, throwable) -> doneClo.accept(throwable));
}
- /** {@inheritDoc} */
@Override
public boolean onSnapshotLoad(Path path) {
return true;
}
- /** {@inheritDoc} */
@Override
public void onShutdown() {
// TODO: IGNITE-17958 - probably, we should not close the storage here as PartitionListener did not create the storage.
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionSnapshotStorageFactory.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionSnapshotStorageFactory.java
index e0301bf1bf..ea93af3048 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionSnapshotStorageFactory.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionSnapshotStorageFactory.java
@@ -26,7 +26,6 @@ import org.apache.ignite.network.TopologyService;
import org.apache.ignite.raft.jraft.RaftMessagesFactory;
import org.apache.ignite.raft.jraft.entity.RaftOutter.SnapshotMeta;
import org.apache.ignite.raft.jraft.option.RaftOptions;
-import org.apache.ignite.raft.jraft.storage.SnapshotStorage;
import org.apache.ignite.raft.jraft.storage.snapshot.SnapshotReader;
import org.apache.ignite.raft.jraft.storage.snapshot.SnapshotWriter;
@@ -56,7 +55,7 @@ public class PartitionSnapshotStorageFactory implements SnapshotStorageFactory {
/** List of learners. */
private final List<String> learners;
- /** RAFT log index read from {@link MvPartitionStorage#persistedIndex()} during factory instantiation. */
+ /** RAFT log index. */
private final long persistedRaftIndex;
/** Incoming snapshots executor. */
@@ -89,11 +88,16 @@ public class PartitionSnapshotStorageFactory implements SnapshotStorageFactory {
this.learners = learners;
this.incomingSnapshotsExecutor = incomingSnapshotsExecutor;
- persistedRaftIndex = partition.mvPartitionStorage().persistedIndex();
+ // We must choose the minimum applied index for local recovery so that we don't skip the raft commands for the storage with the
+ // lowest applied index and thus no data loss occurs.
+ persistedRaftIndex = Math.min(
+ partition.mvPartitionStorage().persistedIndex(),
+ partition.txStatePartitionStorage().persistedIndex()
+ );
}
@Override
- public SnapshotStorage createSnapshotStorage(String uri, RaftOptions raftOptions) {
+ public PartitionSnapshotStorage createSnapshotStorage(String uri, RaftOptions raftOptions) {
SnapshotMeta snapshotMeta = new RaftMessagesFactory().snapshotMeta()
.lastIncludedIndex(persistedRaftIndex)
// According to the code of org.apache.ignite.raft.jraft.core.NodeImpl.bootstrap, it's "dangerous" to init term with a value
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshotReader.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshotReader.java
index d6201de877..7cc345f054 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshotReader.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshotReader.java
@@ -47,10 +47,16 @@ public class OutgoingSnapshotReader extends SnapshotReader {
public OutgoingSnapshotReader(PartitionSnapshotStorage snapshotStorage) {
this.snapshotStorage = snapshotStorage;
+ // We have to choose the maximum applied index because we have to send a snapshot with all the latest changes in the storages.
+ long lastIncludedIndex = Math.max(
+ snapshotStorage.partition().mvPartitionStorage().persistedIndex(),
+ snapshotStorage.partition().txStatePartitionStorage().persistedIndex()
+ );
+
//TODO https://issues.apache.org/jira/browse/IGNITE-17935
// This meta is wrong, we need a right one.
snapshotMeta = new RaftMessagesFactory().snapshotMeta()
- .lastIncludedIndex(snapshotStorage.partition().mvPartitionStorage().persistedIndex())
+ .lastIncludedIndex(lastIncludedIndex)
.lastIncludedTerm(snapshotStorage.startupSnapshotMeta().lastIncludedTerm())
.peersList(snapshotStorage.startupSnapshotMeta().peersList())
.learnersList(snapshotStorage.startupSnapshotMeta().learnersList())
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java
index c45a5ddd29..4d2e345be6 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java
@@ -17,15 +17,25 @@
package org.apache.ignite.internal.table.distributed.raft;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.mockito.Answers.RETURNS_DEEP_STUBS;
+import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
+import java.io.Serializable;
+import java.nio.file.Path;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
@@ -43,6 +53,7 @@ import org.apache.ignite.internal.hlc.HybridClock;
import org.apache.ignite.internal.hlc.HybridClockImpl;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.replicator.ReplicaService;
+import org.apache.ignite.internal.replicator.command.SafeTimeSyncCommand;
import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.schema.BinaryTuple;
import org.apache.ignite.internal.schema.BinaryTupleSchema;
@@ -53,18 +64,25 @@ import org.apache.ignite.internal.schema.SchemaDescriptor;
import org.apache.ignite.internal.schema.row.Row;
import org.apache.ignite.internal.schema.row.RowAssembler;
import org.apache.ignite.internal.storage.MvPartitionStorage;
+import org.apache.ignite.internal.storage.MvPartitionStorage.WriteClosure;
import org.apache.ignite.internal.storage.ReadResult;
import org.apache.ignite.internal.storage.RowId;
import org.apache.ignite.internal.storage.impl.TestMvPartitionStorage;
import org.apache.ignite.internal.storage.index.impl.TestHashIndexStorage;
import org.apache.ignite.internal.table.distributed.TableSchemaAwareIndexStorage;
+import org.apache.ignite.internal.table.distributed.command.FinishTxCommand;
import org.apache.ignite.internal.table.distributed.command.TxCleanupCommand;
import org.apache.ignite.internal.table.distributed.command.UpdateAllCommand;
import org.apache.ignite.internal.table.distributed.command.UpdateCommand;
import org.apache.ignite.internal.table.distributed.replicator.TablePartitionId;
+import org.apache.ignite.internal.testframework.WorkDirectory;
+import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
import org.apache.ignite.internal.tx.Timestamp;
+import org.apache.ignite.internal.tx.TxMeta;
+import org.apache.ignite.internal.tx.TxState;
import org.apache.ignite.internal.tx.impl.HeapLockManager;
import org.apache.ignite.internal.tx.impl.TxManagerImpl;
+import org.apache.ignite.internal.tx.storage.state.TxStateStorage;
import org.apache.ignite.internal.tx.storage.state.test.TestTxStateStorage;
import org.apache.ignite.internal.util.Cursor;
import org.apache.ignite.network.ClusterService;
@@ -72,14 +90,15 @@ import org.apache.ignite.network.NetworkAddress;
import org.apache.ignite.raft.client.Command;
import org.apache.ignite.raft.client.WriteCommand;
import org.apache.ignite.raft.client.service.CommandClosure;
-import org.jetbrains.annotations.NotNull;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
-import org.mockito.Mockito;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.ArgumentCaptor;
/**
* Tests for the table command listener.
*/
+@ExtendWith(WorkDirectoryExtension.class)
public class PartitionCommandListenerTest {
/** Key count. */
public static final int KEY_COUNT = 100;
@@ -116,22 +135,31 @@ public class PartitionCommandListenerTest {
);
/** Partition storage. */
- private final MvPartitionStorage mvPartitionStorage = new TestMvPartitionStorage(PARTITION_ID);
+ private final MvPartitionStorage mvPartitionStorage = spy(new TestMvPartitionStorage(PARTITION_ID));
+
+ /** Transaction meta storage. */
+ private final TxStateStorage txStateStorage = spy(new TestTxStateStorage());
+
+ /** Work directory. */
+ @WorkDirectory
+ private Path workDir;
/**
* Initializes a table listener before tests.
*/
@BeforeEach
public void before() {
- ClusterService clusterService = Mockito.mock(ClusterService.class, RETURNS_DEEP_STUBS);
NetworkAddress addr = new NetworkAddress("127.0.0.1", 5003);
- Mockito.when(clusterService.topologyService().localMember().address()).thenReturn(addr);
- ReplicaService replicaService = Mockito.mock(ReplicaService.class, RETURNS_DEEP_STUBS);
+ ClusterService clusterService = mock(ClusterService.class, RETURNS_DEEP_STUBS);
+
+ when(clusterService.topologyService().localMember().address()).thenReturn(addr);
+
+ ReplicaService replicaService = mock(ReplicaService.class, RETURNS_DEEP_STUBS);
commandListener = new PartitionListener(
new TestPartitionDataStorage(mvPartitionStorage),
- new TestTxStateStorage(),
+ txStateStorage,
new TxManagerImpl(replicaService, new HeapLockManager(), new HybridClockImpl()),
() -> Map.of(pkStorage.id(), pkStorage)
);
@@ -207,6 +235,108 @@ public class PartitionCommandListenerTest {
readAndCheck(false);
}
+ /**
+ * The test checks that {@link PartitionListener#onSnapshotSave(Path, Consumer)} propagates
+ * the maximal last applied index among storages to all storages.
+ */
+ @Test
+ public void testOnSnapshotSavePropagateLastAppliedIndex() {
+ ReplicaService replicaService = mock(ReplicaService.class, RETURNS_DEEP_STUBS);
+
+ TestPartitionDataStorage partitionDataStorage = new TestPartitionDataStorage(mvPartitionStorage);
+
+ PartitionListener testCommandListener = new PartitionListener(
+ partitionDataStorage,
+ txStateStorage,
+ new TxManagerImpl(replicaService, new HeapLockManager(), new HybridClockImpl()),
+ () -> Map.of(pkStorage.id(), pkStorage)
+ );
+
+ txStateStorage.lastAppliedIndex(3L);
+
+ partitionDataStorage.lastAppliedIndex(5L);
+
+ AtomicLong counter = new AtomicLong(0);
+
+ testCommandListener.onSnapshotSave(workDir, (throwable) -> counter.incrementAndGet());
+
+ assertEquals(1L, counter.get());
+
+ assertEquals(5L, partitionDataStorage.lastAppliedIndex());
+
+ assertEquals(5L, txStateStorage.lastAppliedIndex());
+
+ txStateStorage.lastAppliedIndex(10L);
+
+ partitionDataStorage.lastAppliedIndex(7L);
+
+ testCommandListener.onSnapshotSave(workDir, (throwable) -> counter.incrementAndGet());
+
+ assertEquals(2L, counter.get());
+
+ assertEquals(10L, partitionDataStorage.lastAppliedIndex());
+
+ assertEquals(10L, txStateStorage.lastAppliedIndex());
+ }
+
+ @Test
+ void testSkipWriteCommandByAppliedIndex() {
+ mvPartitionStorage.lastAppliedIndex(10L);
+
+ ArgumentCaptor<Throwable> commandClosureResultCaptor = ArgumentCaptor.forClass(Throwable.class);
+
+ // Checks for MvPartitionStorage.
+ commandListener.onWrite(List.of(
+ writeCommandCommandClosure(3, mock(UpdateCommand.class), commandClosureResultCaptor),
+ writeCommandCommandClosure(10, mock(UpdateCommand.class), commandClosureResultCaptor),
+ writeCommandCommandClosure(4, mock(TxCleanupCommand.class), commandClosureResultCaptor),
+ writeCommandCommandClosure(5, mock(SafeTimeSyncCommand.class), commandClosureResultCaptor)
+ ).iterator());
+
+ verify(mvPartitionStorage, never()).runConsistently(any(WriteClosure.class));
+ verify(mvPartitionStorage, times(1)).lastAppliedIndex(anyLong());
+
+ assertThat(commandClosureResultCaptor.getAllValues(), containsInAnyOrder(new Throwable[]{null, null, null, null}));
+
+ // Checks for TxStateStorage.
+ mvPartitionStorage.lastAppliedIndex(1L);
+ txStateStorage.lastAppliedIndex(10L);
+
+ commandClosureResultCaptor = ArgumentCaptor.forClass(Throwable.class);
+
+ commandListener.onWrite(List.of(
+ writeCommandCommandClosure(2, mock(FinishTxCommand.class), commandClosureResultCaptor),
+ writeCommandCommandClosure(10, mock(FinishTxCommand.class), commandClosureResultCaptor)
+ ).iterator());
+
+ verify(txStateStorage, never()).compareAndSet(any(UUID.class), any(TxState.class), any(TxMeta.class), anyLong());
+ verify(txStateStorage, times(1)).lastAppliedIndex(anyLong());
+
+ assertThat(commandClosureResultCaptor.getAllValues(), containsInAnyOrder(new Throwable[]{null, null}));
+ }
+
+ /**
+ * Create a command closure.
+ *
+ * @param index Index of the RAFT command.
+ * @param writeCommand Write command.
+ * @param resultClosureCaptor Captor for {@link CommandClosure#result(Serializable)}
+ */
+ private static CommandClosure<WriteCommand> writeCommandCommandClosure(
+ long index,
+ WriteCommand writeCommand,
+ ArgumentCaptor<Throwable> resultClosureCaptor
+ ) {
+ CommandClosure<WriteCommand> commandClosure = mock(CommandClosure.class);
+
+ when(commandClosure.index()).thenReturn(index);
+ when(commandClosure.command()).thenReturn(writeCommand);
+
+ doNothing().when(commandClosure).result(resultClosureCaptor.capture());
+
+ return commandClosure;
+ }
+
/**
* Prepares a closure iterator for a specific batch operation.
*
@@ -451,7 +581,6 @@ public class PartitionCommandListenerTest {
*
* @return Row.
*/
- @NotNull
private Row getTestKey(int key) {
RowAssembler rowBuilder = new RowAssembler(SCHEMA, 0, 0);
@@ -465,7 +594,6 @@ public class PartitionCommandListenerTest {
*
* @return Row.
*/
- @NotNull
private Row getTestRow(int key, int val) {
RowAssembler rowBuilder = new RowAssembler(SCHEMA, 0, 0);
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionSnapshotStorageFactoryTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionSnapshotStorageFactoryTest.java
new file mode 100644
index 0000000000..5fe7f4e324
--- /dev/null
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionSnapshotStorageFactoryTest.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.raft.snapshot;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.List;
+import java.util.concurrent.Executor;
+import org.apache.ignite.internal.storage.MvPartitionStorage;
+import org.apache.ignite.internal.storage.impl.TestMvPartitionStorage;
+import org.apache.ignite.internal.table.distributed.raft.snapshot.outgoing.OutgoingSnapshotsManager;
+import org.apache.ignite.internal.tx.storage.state.TxStateStorage;
+import org.apache.ignite.internal.tx.storage.state.test.TestTxStateStorage;
+import org.apache.ignite.network.TopologyService;
+import org.apache.ignite.raft.jraft.option.RaftOptions;
+import org.junit.jupiter.api.Test;
+
+/**
+ * For testing {@link PartitionSnapshotStorageFactory}.
+ */
+public class PartitionSnapshotStorageFactoryTest {
+ @Test
+ void testForChoosingMinimumAppliedIndexForMeta() {
+ MvPartitionStorage mvPartitionStorage = new TestMvPartitionStorage(0);
+ TxStateStorage txStateStorage = new TestTxStateStorage();
+
+ PartitionAccess partitionAccess = mock(PartitionAccess.class);
+
+ when(partitionAccess.mvPartitionStorage()).thenReturn(mvPartitionStorage);
+ when(partitionAccess.txStatePartitionStorage()).thenReturn(txStateStorage);
+
+ mvPartitionStorage.lastAppliedIndex(10L);
+ txStateStorage.lastAppliedIndex(5L);
+
+ PartitionSnapshotStorageFactory partitionSnapshotStorageFactory = new PartitionSnapshotStorageFactory(
+ mock(TopologyService.class),
+ mock(OutgoingSnapshotsManager.class),
+ partitionAccess,
+ List.of(),
+ List.of(),
+ mock(Executor.class)
+ );
+
+ PartitionSnapshotStorage snapshotStorage = partitionSnapshotStorageFactory.createSnapshotStorage("", mock(RaftOptions.class));
+
+ assertEquals(5L, snapshotStorage.startupSnapshotMeta().lastIncludedIndex());
+
+ mvPartitionStorage.lastAppliedIndex(1L);
+ txStateStorage.lastAppliedIndex(2L);
+
+ partitionSnapshotStorageFactory = new PartitionSnapshotStorageFactory(
+ mock(TopologyService.class),
+ mock(OutgoingSnapshotsManager.class),
+ partitionAccess,
+ List.of(),
+ List.of(),
+ mock(Executor.class)
+ );
+
+ snapshotStorage = partitionSnapshotStorageFactory.createSnapshotStorage("", mock(RaftOptions.class));
+
+ assertEquals(1L, snapshotStorage.startupSnapshotMeta().lastIncludedIndex());
+ }
+}
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshotReaderTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshotReaderTest.java
new file mode 100644
index 0000000000..7485acc54c
--- /dev/null
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshotReaderTest.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.raft.snapshot.outgoing;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.UUID;
+import java.util.concurrent.Executor;
+import org.apache.ignite.internal.storage.MvPartitionStorage;
+import org.apache.ignite.internal.storage.impl.TestMvPartitionStorage;
+import org.apache.ignite.internal.table.distributed.raft.snapshot.PartitionAccess;
+import org.apache.ignite.internal.table.distributed.raft.snapshot.PartitionKey;
+import org.apache.ignite.internal.table.distributed.raft.snapshot.PartitionSnapshotStorage;
+import org.apache.ignite.internal.tx.storage.state.TxStateStorage;
+import org.apache.ignite.internal.tx.storage.state.test.TestTxStateStorage;
+import org.apache.ignite.network.TopologyService;
+import org.apache.ignite.raft.jraft.entity.RaftOutter.SnapshotMeta;
+import org.apache.ignite.raft.jraft.option.RaftOptions;
+import org.junit.jupiter.api.Test;
+
+/**
+ * For {@link OutgoingSnapshotReader} testing.
+ */
+public class OutgoingSnapshotReaderTest {
+ @Test
+ void testForChoosingMaximumAppliedIndexForMeta() {
+ MvPartitionStorage mvPartitionStorage = new TestMvPartitionStorage(0);
+ TxStateStorage txStateStorage = new TestTxStateStorage();
+
+ PartitionAccess partitionAccess = mock(PartitionAccess.class);
+
+ when(partitionAccess.mvPartitionStorage()).thenReturn(mvPartitionStorage);
+ when(partitionAccess.txStatePartitionStorage()).thenReturn(txStateStorage);
+ when(partitionAccess.partitionKey()).thenReturn(new PartitionKey(UUID.randomUUID(), 0));
+
+ PartitionSnapshotStorage snapshotStorage = new PartitionSnapshotStorage(
+ mock(TopologyService.class),
+ mock(OutgoingSnapshotsManager.class),
+ "",
+ mock(RaftOptions.class),
+ partitionAccess,
+ mock(SnapshotMeta.class),
+ mock(Executor.class)
+ );
+
+ mvPartitionStorage.lastAppliedIndex(10L);
+ txStateStorage.lastAppliedIndex(5L);
+
+ assertEquals(10L, new OutgoingSnapshotReader(snapshotStorage).load().lastIncludedIndex());
+
+ mvPartitionStorage.lastAppliedIndex(1L);
+ txStateStorage.lastAppliedIndex(2L);
+
+ assertEquals(2L, new OutgoingSnapshotReader(snapshotStorage).load().lastIncludedIndex());
+ }
+}
diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/rocksdb/TxStateRocksDbStorage.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/rocksdb/TxStateRocksDbStorage.java
index dfc5fc0813..39a7214214 100644
--- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/rocksdb/TxStateRocksDbStorage.java
+++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/rocksdb/TxStateRocksDbStorage.java
@@ -44,6 +44,7 @@ import org.apache.ignite.internal.util.IgniteSpinBusyLock;
import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgniteInternalException;
+import org.jetbrains.annotations.Nullable;
import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
@@ -165,7 +166,7 @@ public class TxStateRocksDbStorage implements TxStateStorage {
}
@Override
- public boolean compareAndSet(UUID txId, TxState txStateExpected, TxMeta txMeta, long commandIndex) {
+ public boolean compareAndSet(UUID txId, @Nullable TxState txStateExpected, TxMeta txMeta, long commandIndex) {
requireNonNull(txMeta);
if (!busyLock.enterBusy()) {
@@ -205,6 +206,8 @@ public class TxStateRocksDbStorage implements TxStateStorage {
db.write(writeOptions, writeBatch);
+ lastAppliedIndex = commandIndex;
+
return result;
} catch (RocksDBException e) {
throw new IgniteInternalException(
@@ -310,6 +313,8 @@ public class TxStateRocksDbStorage implements TxStateStorage {
try {
db.put(lastAppliedIndexKey, longToBytes(lastAppliedIndex));
+
+ this.lastAppliedIndex = lastAppliedIndex;
} catch (RocksDBException e) {
throw new IgniteInternalException(
TX_STATE_STORAGE_ERR,
diff --git a/modules/transactions/src/testFixtures/java/org/apache/ignite/internal/tx/storage/state/test/TestTxStateStorage.java b/modules/transactions/src/testFixtures/java/org/apache/ignite/internal/tx/storage/state/test/TestTxStateStorage.java
index 9c119938f9..7091adca01 100644
--- a/modules/transactions/src/testFixtures/java/org/apache/ignite/internal/tx/storage/state/test/TestTxStateStorage.java
+++ b/modules/transactions/src/testFixtures/java/org/apache/ignite/internal/tx/storage/state/test/TestTxStateStorage.java
@@ -31,7 +31,7 @@ import org.apache.ignite.internal.tx.storage.state.TxStateStorage;
import org.apache.ignite.internal.tx.storage.state.UnsignedUuidComparator;
import org.apache.ignite.internal.util.Cursor;
import org.apache.ignite.lang.IgniteBiTuple;
-import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
/**
* Test implementation of {@link TxStateStorage} based on {@link ConcurrentSkipListMap}.
@@ -53,29 +53,32 @@ public class TestTxStateStorage implements TxStateStorage {
}
@Override
- public boolean compareAndSet(UUID txId, TxState txStateExpected, @NotNull TxMeta txMeta, long commandIndex) {
- while (true) {
- TxMeta old = storage.get(txId);
-
- if (old == null && txStateExpected == null) {
- TxMeta oldMeta = storage.putIfAbsent(txId, txMeta);
- if (oldMeta == null) {
- return true;
- } else {
- return false;
- }
- } else if (old != null) {
+ public boolean compareAndSet(UUID txId, @Nullable TxState txStateExpected, TxMeta txMeta, long commandIndex) {
+ TxMeta old = storage.get(txId);
+
+ boolean result;
+
+ if (old == null && txStateExpected == null) {
+ TxMeta oldMeta = storage.putIfAbsent(txId, txMeta);
+
+ result = oldMeta == null;
+ } else {
+ if (old != null) {
if (old.txState() == txStateExpected) {
- if (storage.replace(txId, old, txMeta)) {
- return true;
- }
+ result = storage.replace(txId, old, txMeta);
} else {
return old.txState() == txMeta.txState() && (
(old.commitTimestamp() == null && txMeta.commitTimestamp() == null)
|| old.commitTimestamp().equals(txMeta.commitTimestamp()));
}
+ } else {
+ result = false;
}
}
+
+ lastAppliedIndex = commandIndex;
+
+ return result;
}
@Override