You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vp...@apache.org on 2023/02/09 17:47:38 UTC
[ignite-3] branch main updated: IGNITE-18497 Fixed read only get returns a first one value getting from primary index (#1624)
This is an automated email from the ASF dual-hosted git repository.
vpyatkov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 5f834c5c7e IGNITE-18497 Fixed read only get returns a first one value getting from primary index (#1624)
5f834c5c7e is described below
commit 5f834c5c7ef87fece68f6af4d5f1329aa77fc19b
Author: Denis Chudov <mo...@gmail.com>
AuthorDate: Thu Feb 9 19:47:31 2023 +0200
IGNITE-18497 Fixed read only get returns a first one value getting from primary index (#1624)
---
.../apache/ignite/internal/util/IgniteUtils.java | 52 +++++
.../ItRaftCommandLeftInLogUntilRestartTest.java | 8 +-
.../replicator/PartitionReplicaListener.java | 139 ++++++++++--
.../replication/PartitionReplicaListenerTest.java | 251 ++++++++++++++++++---
.../internal/tx/message/TxStateReplicaRequest.java | 2 +-
5 files changed, 395 insertions(+), 57 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
index 9d7a6430c9..9bdc8f64b2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
@@ -37,10 +37,12 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.Optional;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
@@ -944,4 +946,54 @@ public class IgniteUtils {
public static ObjectName makeMbeanName(String group, String name) throws MalformedObjectNameException {
return new ObjectName(String.format("%s:group=%s,name=%s", JMX_MBEAN_PACKAGE, group, name));
}
+
+ /**
+  * Collects the elements of the collection that are accepted by the predicate into a new list, keeping iteration order.
+  *
+  * @param collection Collection to take the elements from.
+  * @param predicate Predicate that decides whether an element is kept.
+  * @return New list containing the accepted elements.
+  */
+ public static <T> List<T> filter(Collection<T> collection, Predicate<T> predicate) {
+     List<T> accepted = new ArrayList<>();
+
+     collection.forEach(item -> {
+         if (predicate.test(item)) {
+             accepted.add(item);
+         }
+     });
+
+     return accepted;
+ }
+
+ /**
+  * Find any element in given collection.
+  *
+  * @param collection Collection.
+  * @return Optional containing the first iterated element; empty if the collection has no elements or that element is {@code null}.
+  */
+ public static <T> Optional<T> findAny(Collection<T> collection) {
+     return findAny(collection, null);
+ }
+
+ /**
+  * Find any element in given collection for which the predicate returns {@code true}.
+  *
+  * <p>The collection is scanned in iteration order and the first accepted element is returned. Because the result is wrapped with
+  * {@link Optional#ofNullable(Object)}, an accepted {@code null} element yields an empty Optional.
+  *
+  * @param collection Collection.
+  * @param predicate Predicate, {@code null} means that every element is accepted.
+  * @return Optional containing element (if present).
+  */
+ public static <T> Optional<T> findAny(Collection<T> collection, @Nullable Predicate<T> predicate) {
+     // An empty collection simply produces no iterations, so no separate emptiness check is needed.
+     for (T element : collection) {
+         if (predicate == null || predicate.test(element)) {
+             return Optional.ofNullable(element);
+         }
+     }
+
+     return Optional.empty();
+ }
}
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItRaftCommandLeftInLogUntilRestartTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItRaftCommandLeftInLogUntilRestartTest.java
index dd208ea7f2..e2cdf1a131 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItRaftCommandLeftInLogUntilRestartTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItRaftCommandLeftInLogUntilRestartTest.java
@@ -36,6 +36,7 @@ import org.apache.ignite.internal.hlc.HybridClockImpl;
import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.schema.BinaryRowEx;
import org.apache.ignite.internal.schema.marshaller.TupleMarshallerImpl;
+import org.apache.ignite.internal.schema.row.Row;
import org.apache.ignite.internal.sql.engine.AbstractBasicIntegrationTest;
import org.apache.ignite.internal.table.TableImpl;
import org.apache.ignite.internal.testframework.IgniteTestUtils;
@@ -274,10 +275,9 @@ public class ItRaftCommandLeftInLogUntilRestartTest extends AbstractBasicIntegra
BinaryRow readOnlyRow = table.internalTable().get(testKey, new HybridClockImpl().now(), ignite.node()).get();
- //TODO: IGNITE-18497 Readonly check is possible only when the readonly read will be fixed.
- //assertNotNull(readOnlyRow);
- //assertEquals(row[1], new Row(table.schemaView().schema(), readOnlyRow).stringValue(2));
- //assertEquals(row[2], new Row(table.schemaView().schema(), readOnlyRow).doubleValue(1));
+ assertNotNull(readOnlyRow);
+ assertEquals(row[1], new Row(table.schemaView().schema(), readOnlyRow).stringValue(2));
+ assertEquals(row[2], new Row(table.schemaView().schema(), readOnlyRow).doubleValue(1));
} catch (Exception e) {
new RuntimeException(IgniteStringFormatter.format("Cannot check a row {}", row), e);
}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
index 7c5496fe88..9c0954b0e8 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
@@ -22,6 +22,8 @@ import static java.util.concurrent.CompletableFuture.allOf;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.concurrent.CompletableFuture.failedFuture;
import static org.apache.ignite.internal.util.CollectionUtils.nullOrEmpty;
+import static org.apache.ignite.internal.util.IgniteUtils.filter;
+import static org.apache.ignite.internal.util.IgniteUtils.findAny;
import static org.apache.ignite.lang.ErrorGroups.Transactions.TX_FAILED_READ_WRITE_OPERATION_ERR;
import static org.apache.ignite.lang.IgniteStringFormatter.format;
@@ -351,7 +353,7 @@ public class PartitionReplicaListener implements ReplicaListener {
if (txMeta == null) {
// All future transactions will be committed after the resolution processed.
- hybridClock.update(txStateReq.commitTimestamp());
+ hybridClock.update(txStateReq.readTimestamp());
}
txStateFut.complete(txMeta);
@@ -474,7 +476,7 @@ public class PartitionReplicaListener implements ReplicaListener {
CompletableFuture<Void> safeReadFuture = isPrimary ? completedFuture(null) : safeTime.waitFor(request.readTimestamp());
- return safeReadFuture.thenCompose(unused -> resolveRowByPk(searchRow, readTimestamp));
+ return safeReadFuture.thenCompose(unused -> resolveRowByPkForReadOnly(searchRow, readTimestamp));
}
/**
@@ -502,7 +504,7 @@ public class PartitionReplicaListener implements ReplicaListener {
ArrayList<CompletableFuture<BinaryRow>> resolutionFuts = new ArrayList<>(searchRows.size());
for (BinaryRow searchRow : searchRows) {
- CompletableFuture<BinaryRow> fut = resolveRowByPk(searchRow, readTimestamp);
+ CompletableFuture<BinaryRow> fut = resolveRowByPkForReadOnly(searchRow, readTimestamp);
resolutionFuts.add(fut);
}
@@ -1136,35 +1138,94 @@ public class PartitionReplicaListener implements ReplicaListener {
* @param ts A timestamp regarding which we need to resolve the given row.
* @return Result of the given action.
*/
- private CompletableFuture<BinaryRow> resolveRowByPk(BinaryRow searchKey, HybridTimestamp ts) {
+ private CompletableFuture<BinaryRow> resolveRowByPkForReadOnly(BinaryRow searchKey, HybridTimestamp ts) {
try (Cursor<RowId> cursor = pkIndexStorage.get().get(searchKey)) {
+ List<ReadResult> candidates = new ArrayList<>();
+
for (RowId rowId : cursor) {
ReadResult readResult = mvDataStorage.read(rowId, ts);
- return resolveReadResult(readResult, ts, () -> {
- HybridTimestamp newestCommitTimestamp = readResult.newestCommitTimestamp();
+ if (!readResult.isEmpty() || readResult.isWriteIntent()) {
+ candidates.add(readResult);
+ }
+ }
- if (newestCommitTimestamp == null) {
- return null;
- }
+ if (candidates.isEmpty()) {
+ return completedFuture(null);
+ }
- ReadResult committedReadResult = mvDataStorage.read(rowId, newestCommitTimestamp);
+ // TODO https://issues.apache.org/jira/browse/IGNITE-18767 scan of multiple write intents should not be needed
+ List<ReadResult> writeIntents = filter(candidates, ReadResult::isWriteIntent);
- assert !committedReadResult.isWriteIntent() :
- "The result is not committed [rowId=" + rowId + ", timestamp="
- + newestCommitTimestamp + ']';
+ if (!writeIntents.isEmpty()) {
+ ReadResult writeIntent = writeIntents.get(0);
- return committedReadResult.binaryRow();
- });
- }
+ // Assume that all write intents for the same key belong to the same transaction, as the key should be exclusively locked.
+ // This means that we can just resolve the state of this transaction.
+ checkWriteIntentsBelongSameTx(writeIntents);
- return completedFuture(null);
+ return resolveTxState(
+ new TablePartitionId(writeIntent.commitTableId(), writeIntent.commitPartitionId()),
+ writeIntent.transactionId(),
+ ts)
+ .thenApply(readLastCommitted -> {
+ if (readLastCommitted) {
+ for (ReadResult wi : writeIntents) {
+ HybridTimestamp newestCommitTimestamp = wi.newestCommitTimestamp();
+
+ if (newestCommitTimestamp == null) {
+ continue;
+ }
+
+ ReadResult committedReadResult = mvDataStorage.read(wi.rowId(), newestCommitTimestamp);
+
+ assert !committedReadResult.isWriteIntent() :
+ "The result is not committed [rowId=" + wi.rowId() + ", timestamp="
+ + newestCommitTimestamp + ']';
+
+ return committedReadResult.binaryRow();
+ }
+
+ return findAny(candidates, c -> !c.isWriteIntent() && !c.isEmpty()).map(ReadResult::binaryRow)
+ .orElse(null);
+ } else {
+ return findAny(writeIntents, wi -> !wi.isEmpty()).map(ReadResult::binaryRow)
+ .orElse(null);
+ }
+ });
+ } else {
+ BinaryRow result = findAny(candidates, r -> !r.isEmpty()).map(ReadResult::binaryRow)
+ .orElse(null);
+
+ return completedFuture(result);
+ }
} catch (Exception e) {
throw new IgniteInternalException(Replicator.REPLICA_COMMON_ERR,
format("Unable to close cursor [tableId={}]", tableId), e);
}
}
+ /**
+  * Asserts that all given write intents were produced by one transaction: each of them must have the same transaction id, commit
+  * table id and commit partition id as an arbitrary write intent taken from the collection.
+  *
+  * @param writeIntents Write intents, at least one element is required.
+  */
+ private static void checkWriteIntentsBelongSameTx(Collection<ReadResult> writeIntents) {
+     // Fails with NoSuchElementException when there is nothing to compare against.
+     ReadResult reference = findAny(writeIntents).orElseThrow();
+
+     writeIntents.forEach(candidate -> {
+         assert candidate.transactionId().equals(reference.transactionId())
+                 : "Unexpected write intent, tx1=" + reference.transactionId() + ", tx2=" + candidate.transactionId();
+
+         assert candidate.commitTableId().equals(reference.commitTableId())
+                 : "Unexpected write intent, commitTableId1=" + reference.commitTableId() + ", commitTableId2=" + candidate.commitTableId();
+
+         assert candidate.commitPartitionId() == reference.commitPartitionId()
+                 : "Unexpected write intent, commitPartitionId1=" + reference.commitPartitionId()
+                 + ", commitPartitionId2=" + candidate.commitPartitionId();
+     });
+ }
+
/**
* Tests row values for equality.
*
@@ -1940,7 +2001,7 @@ public class PartitionReplicaListener implements ReplicaListener {
@Nullable Supplier<BinaryRow> lastCommitted
) {
if (readResult == null) {
- return null;
+ return completedFuture(null);
} else {
if (txId != null) {
// RW request.
@@ -1979,22 +2040,50 @@ public class PartitionReplicaListener implements ReplicaListener {
HybridTimestamp timestamp,
Supplier<BinaryRow> lastCommitted
) {
- ReplicationGroupId commitGrpId = new TablePartitionId(readResult.commitTableId(), readResult.commitPartitionId());
+ return resolveTxState(
+ new TablePartitionId(readResult.commitTableId(), readResult.commitPartitionId()),
+ readResult.transactionId(),
+ timestamp)
+ .thenApply(readLastCommitted -> {
+ if (readLastCommitted) {
+ return lastCommitted.get();
+ } else {
+ return readResult.binaryRow();
+ }
+ });
+ }
+ /**
+ * Resolve the actual tx state.
+ *
+ * @param commitGrpId Commit partition id.
+ * @param txId Transaction id.
+ * @param timestamp Timestamp.
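+ * @return Future with boolean value: {@code true} if the last committed value must be read instead of the write intent (the
+ * transaction state is unknown, the transaction was aborted, or it was committed after the timestamp), {@code false} if the
+ * transaction was committed at or before the timestamp.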
+ */
+ private CompletableFuture<Boolean> resolveTxState(
+ ReplicationGroupId commitGrpId,
+ UUID txId,
+ HybridTimestamp timestamp
+ ) {
return placementDriver.sendMetaRequest(commitGrpId, FACTORY.txStateReplicaRequest()
.groupId(commitGrpId)
- .commitTimestamp(timestamp)
- .txId(readResult.transactionId())
+ .readTimestamp(timestamp)
+ .txId(txId)
.build())
.thenApply(txMeta -> {
if (txMeta == null) {
- return lastCommitted.get();
- } else if (txMeta.txState() == TxState.COMMITED && txMeta.commitTimestamp().compareTo(timestamp) <= 0) {
- return readResult.binaryRow();
+ return true;
+ } else if (txMeta.txState() == TxState.COMMITED) {
+ if (txMeta.commitTimestamp().compareTo(timestamp) <= 0) {
+ return false;
+ } else {
+ return true;
+ }
} else {
assert txMeta.txState() == TxState.ABORTED : "Unexpected transaction state [state=" + txMeta.txState() + ']';
- return lastCommitted.get();
+ return true;
}
});
}
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
index 5dcc07baf4..5997e3be94 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
@@ -22,13 +22,13 @@ import static org.apache.ignite.internal.testframework.matchers.CompletableFutur
import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willSucceedFast;
import static org.apache.ignite.internal.util.ArrayUtils.asList;
import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -37,12 +37,15 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
+import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
+import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
@@ -87,6 +90,9 @@ import org.apache.ignite.internal.table.distributed.SortedIndexLocker;
import org.apache.ignite.internal.table.distributed.StorageUpdateHandler;
import org.apache.ignite.internal.table.distributed.TableMessagesFactory;
import org.apache.ignite.internal.table.distributed.TableSchemaAwareIndexStorage;
+import org.apache.ignite.internal.table.distributed.command.PartitionCommand;
+import org.apache.ignite.internal.table.distributed.command.TxCleanupCommand;
+import org.apache.ignite.internal.table.distributed.command.UpdateCommand;
import org.apache.ignite.internal.table.distributed.raft.PartitionDataStorage;
import org.apache.ignite.internal.table.distributed.replication.request.ReadWriteReplicaRequest;
import org.apache.ignite.internal.table.distributed.replicator.LeaderOrTxState;
@@ -121,7 +127,45 @@ import org.mockito.Mock;
/** There are tests for partition replica listener. */
public class PartitionReplicaListenerTest extends IgniteAbstractTest {
- private static final Supplier<CompletableFuture<?>> DEFAULT_MOCK_RAFT_FUTURE_SUPPLIER = () -> completedFuture(null);
+ /** Partition id. */
+ private static final int partId = 0;
+
+ /** Table id. */
+ private static final UUID tblId = UUID.randomUUID();
+
+ /** Row ids recorded from update commands and not yet handled by a cleanup command, keyed by transaction id. */
+ private static final Map<UUID, Set<RowId>> pendingRows = new ConcurrentHashMap<>();
+
+ /** The storage stores partition data. */
+ private static final TestMvPartitionStorage testMvPartitionStorage = new TestMvPartitionStorage(partId);
+
+ /** Lock manager whose locks are released by the default Raft closure when a transaction is cleaned up. */
+ private static LockManager lockManager = new HeapLockManager();
+
+ /**
+  * Default behavior of the mocked Raft client: tracks the rows touched by update commands and, on a cleanup command, commits them
+  * in the test partition storage. The returned future is always already completed.
+  */
+ private static final Function<PartitionCommand, CompletableFuture<?>> DEFAULT_MOCK_RAFT_FUTURE_CLOSURE = cmd -> {
+ if (cmd instanceof TxCleanupCommand) {
+ // Cleanup: commit every row recorded for the transaction, using the commit timestamp carried by the command.
+ Set<RowId> rows = pendingRows.remove(cmd.txId());
+
+ if (rows != null) {
+ for (RowId row : rows) {
+ testMvPartitionStorage.commitWrite(row, ((TxCleanupCommand) cmd).commitTimestamp().asHybridTimestamp());
+ }
+ }
+
+ // Locks of the transaction are released even if it has not recorded any row.
+ lockManager.locks(cmd.txId()).forEachRemaining(lock -> lockManager.release(lock));
+ } else if (cmd instanceof UpdateCommand) {
+ // Update: remember the touched row so that a later cleanup of the same transaction can commit it.
+ pendingRows.compute(cmd.txId(), (txId, v) -> {
+ if (v == null) {
+ v = new HashSet<>();
+ }
+
+ RowId rowId = new RowId(partId, ((UpdateCommand) cmd).rowUuid());
+ v.add(rowId);
+
+ return v;
+ });
+ }
+
+ // Commands of any other type have no effect here.
+ return completedFuture(null);
+ };
/** Tx messages factory. */
private static final TxMessagesFactory TX_MESSAGES_FACTORY = new TxMessagesFactory();
@@ -129,12 +173,6 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
/** Table messages factory. */
private static final TableMessagesFactory TABLE_MESSAGES_FACTORY = new TableMessagesFactory();
- /** Partition id. */
- private static final int partId = 0;
-
- /** Table id. */
- private static final UUID tblId = UUID.randomUUID();
-
/** Replication group id. */
private static final ReplicationGroupId grpId = new TablePartitionId(tblId, partId);
@@ -144,16 +182,13 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
/** The storage stores transaction states. */
private static final TestTxStateStorage txStateStorage = new TestTxStateStorage();
- /** The storage stores partition data. */
- private static final TestMvPartitionStorage testMvPartitionStorage = new TestMvPartitionStorage(partId);
-
/** Local cluster node. */
private static final ClusterNode localNode = new ClusterNode("node1", "node1", NetworkAddress.from("127.0.0.1:127"));
/** Another (not local) cluster node. */
private static final ClusterNode anotherNode = new ClusterNode("node2", "node2", NetworkAddress.from("127.0.0.2:127"));
- private static PlacementDriver placementDriver = mock(PlacementDriver.class);
+ private static final PlacementDriver placementDriver = mock(PlacementDriver.class);
private static PartitionDataStorage partitionDataStorage = new TestPartitionDataStorage(testMvPartitionStorage);
@@ -192,9 +227,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
/** Secondary hash index. */
private static TableSchemaAwareIndexStorage hashIndexStorage;
- private static Supplier<CompletableFuture<?>> raftClientFutureSupplier = DEFAULT_MOCK_RAFT_FUTURE_SUPPLIER;
-
- private static LockManager lockManager = new HeapLockManager();
+ private static Function<PartitionCommand, CompletableFuture<?>> raftClientFutureClosure = DEFAULT_MOCK_RAFT_FUTURE_CLOSURE;
@BeforeAll
private static void beforeAll() {
@@ -206,7 +239,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
return completedFuture(new LeaderWithTerm(new Peer(localNode.name()), 1L));
});
- when(mockRaftClient.run(any())).thenAnswer(invocationOnMock -> raftClientFutureSupplier.get());
+ when(mockRaftClient.run(any())).thenAnswer(invocationOnMock -> raftClientFutureClosure.apply(invocationOnMock.getArgument(0)));
when(topologySrv.getByConsistentId(any())).thenAnswer(invocationOnMock -> {
String consistentId = invocationOnMock.getArgument(0);
@@ -223,7 +256,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
HybridTimestamp txFixedTimestamp = clock.now();
- when(placementDriver.sendMetaRequest(eq(grpId), any())).thenAnswer(invocationOnMock -> {
+ when(placementDriver.sendMetaRequest(any(), any())).thenAnswer(invocationOnMock -> {
TxMeta txMeta;
if (txState == null) {
@@ -319,13 +352,15 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
((TestHashIndexStorage) hashIndexStorage.storage()).clear();
((TestSortedIndexStorage) sortedIndexStorage.storage()).clear();
testMvPartitionStorage.clear();
+ pendingRows.clear();
+ //lockManager.locks(txId).forEachRemaining(lock -> lockManager.release(lock));
}
@Test
public void testTxStateReplicaRequestEmptyState() throws Exception {
CompletableFuture<?> fut = partitionReplicaListener.invoke(TX_MESSAGES_FACTORY.txStateReplicaRequest()
.groupId(grpId)
- .commitTimestamp(clock.now())
+ .readTimestamp(clock.now())
.txId(Timestamp.nextVersion().toUuid())
.build());
@@ -345,7 +380,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
CompletableFuture<?> fut = partitionReplicaListener.invoke(TX_MESSAGES_FACTORY.txStateReplicaRequest()
.groupId(grpId)
- .commitTimestamp(readTimestamp)
+ .readTimestamp(readTimestamp)
.txId(txId)
.build());
@@ -362,7 +397,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
CompletableFuture<?> fut = partitionReplicaListener.invoke(TX_MESSAGES_FACTORY.txStateReplicaRequest()
.groupId(grpId)
- .commitTimestamp(clock.now())
+ .readTimestamp(clock.now())
.txId(Timestamp.nextVersion().toUuid())
.build());
@@ -807,7 +842,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
doSingleRowRequest(txId, binaryRow(0), RequestType.RW_DELETE_EXACT);
checkRowInMvStorage(binaryRow(0), false);
- lockManager.locks(txId).forEachRemaining(lock -> lockManager.release(lock));
+ cleanup(txId);
}
@Test
@@ -843,7 +878,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
checkRowInMvStorage(row0, false);
checkRowInMvStorage(row1, false);
- lockManager.locks(txId).forEachRemaining(lock -> lockManager.release(lock));
+ cleanup(txId);
}
private void doSingleRowRequest(UUID txId, BinaryRow binaryRow, RequestType requestType) {
@@ -889,7 +924,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
() -> checkRowInMvStorage(binaryRow(0), true)
);
- lockManager.locks(txId).forEachRemaining(lock -> lockManager.release(lock));
+ cleanup(txId);
}
@Test
@@ -915,7 +950,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
() -> checkRowInMvStorage(binaryRow(0), true)
);
- lockManager.locks(txId).forEachRemaining(lock -> lockManager.release(lock));
+ cleanup(txId);
}
private void checkRowInMvStorage(BinaryRow binaryRow, boolean shouldBePresent) {
@@ -956,14 +991,14 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
// Check that cleanup request processing awaits all write requests.
CompletableFuture<?> writeFut = new CompletableFuture<>();
- raftClientFutureSupplier = () -> writeFut;
+ raftClientFutureClosure = cmd -> writeFut;
try {
CompletableFuture<?> replicaWriteFut = partitionReplicaListener.invoke(updatingRequestSupplier.get());
assertFalse(replicaWriteFut.isDone());
- raftClientFutureSupplier = DEFAULT_MOCK_RAFT_FUTURE_SUPPLIER;
+ raftClientFutureClosure = DEFAULT_MOCK_RAFT_FUTURE_CLOSURE;
HybridTimestamp now = clock.now();
@@ -984,7 +1019,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
assertThat(replicaCleanupFut, willSucceedFast());
} finally {
- raftClientFutureSupplier = DEFAULT_MOCK_RAFT_FUTURE_SUPPLIER;
+ raftClientFutureClosure = DEFAULT_MOCK_RAFT_FUTURE_CLOSURE;
}
// Check that one more write after cleanup is discarded.
@@ -992,6 +1027,168 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
assertThat(writeAfterCleanupFuture, willFailFast(TransactionException.class));
}
+ /** Runs the single-row read-only scenario for every combination of the three scenario flags. */
+ @Test
+ public void testReadOnlyGetAfterRowRewrite() {
+     boolean[] flagValues = {true, false};
+
+     for (boolean insertFirst : flagValues) {
+         for (boolean upsertAfterDelete : flagValues) {
+             for (boolean committed : flagValues) {
+                 testReadOnlyGetAfterRowRewrite0(insertFirst, upsertAfterDelete, committed, false);
+             }
+         }
+     }
+ }
+
+ /** Runs the multi-row (getAll) read-only scenario for every combination of the three scenario flags. */
+ @Test
+ public void testReadOnlyGetAllAfterRowRewrite() {
+     boolean[] flagValues = {true, false};
+
+     for (boolean insertFirst : flagValues) {
+         for (boolean upsertAfterDelete : flagValues) {
+             for (boolean committed : flagValues) {
+                 testReadOnlyGetAfterRowRewrite0(insertFirst, upsertAfterDelete, committed, true);
+             }
+         }
+     }
+ }
+
+ /**
+  * Puts several records into the storage, optionally leaving them as write intents, alternately deleting and upserting the same row
+  * within the same RW transaction, then checks read correctness via a read-only request.
+  *
+  * @param insertFirst Whether to insert some values before RW transaction.
+  * @param upsertAfterDelete Whether to insert value after delete in RW transaction, so that it would present as non-null write intent.
+  * @param committed Whether to commit RW transaction before doing RO request.
+  * @param multiple Whether to check multiple rows via getAll request.
+  */
+ public void testReadOnlyGetAfterRowRewrite0(boolean insertFirst, boolean upsertAfterDelete, boolean committed, boolean multiple) {
+     beforeTest();
+
+     BinaryRow br1 = binaryRow(1);
+     BinaryRow br2 = binaryRow(2);
+
+     if (insertFirst) {
+         UUID tx0 = beginTx();
+         upsert(tx0, br1);
+         upsert(tx0, br2);
+         cleanup(tx0);
+     }
+
+     txState = null;
+
+     UUID tx1 = beginTx();
+     delete(tx1, br1);
+     upsert(tx1, br1);
+
+     // Keep rewriting the row until the first row id returned by the PK index reads as an empty row.
+     while (true) {
+         delete(tx1, br1);
+
+         if (upsertAfterDelete) {
+             upsert(tx1, br1);
+         }
+
+         // NOTE(review): this cursor is never closed - confirm whether that matters for the test storage.
+         Cursor<RowId> cursor = pkStorage.get().get(br1);
+
+         RowId rowId = cursor.next();
+
+         BinaryRow row = testMvPartitionStorage.read(rowId, HybridTimestamp.MAX_VALUE).binaryRow();
+
+         if (row == null) {
+             break;
+         }
+     }
+
+     if (committed) {
+         cleanup(tx1);
+     }
+
+     if (multiple) {
+         Set<BinaryRow> allRows = insertFirst ? Set.of(br1, br2) : Set.of(br1);
+         Set<BinaryRow> allRowsButModified = insertFirst ? Set.of(br2) : Set.of();
+         Set<BinaryRow> expected = committed
+                 ? (upsertAfterDelete ? allRows : allRowsButModified)
+                 : (insertFirst ? allRows : Set.of());
+         Set<BinaryRow> res = new HashSet<>(roGetAll(allRows, clock.now()));
+
+         assertEquals(expected.size(), res.size());
+
+         // Rows are compared by content, in the same way as in the single-row branch below.
+         for (BinaryRow e : expected) {
+             assertTrue(
+                     res.stream().anyMatch(r -> r != null && Arrays.equals(e.bytes(), r.bytes())),
+                     "Expected row is missing in the getAll result"
+             );
+         }
+     } else {
+         BinaryRow res = roGet(br1, clock.now());
+         BinaryRow expected = committed
+                 ? (upsertAfterDelete ? br1 : null)
+                 : (insertFirst ? br1 : null);
+
+         if (expected == null) {
+             assertNull(res);
+         } else {
+             assertArrayEquals(expected.bytes(), res.bytes());
+         }
+     }
+
+     cleanup(tx1);
+ }
+
+ /** Generates an id for a new transaction from the next timestamp version. */
+ private UUID beginTx() {
+     var version = Timestamp.nextVersion();
+
+     return version.toUuid();
+ }
+
+ /**
+  * Sends an {@code RW_UPSERT} single-row request for the given transaction to the listener; the returned future is not awaited.
+  *
+  * @param txId Transaction id.
+  * @param row Row to upsert.
+  */
+ private void upsert(UUID txId, BinaryRow row) {
+     var request = TABLE_MESSAGES_FACTORY.readWriteSingleRowReplicaRequest()
+             .requestType(RequestType.RW_UPSERT)
+             .transactionId(txId)
+             .binaryRow(row)
+             .term(1L)
+             .commitPartitionId(new TablePartitionId(tblId, partId))
+             .build();
+
+     partitionReplicaListener.invoke(request);
+ }
+
+ /**
+  * Sends an {@code RW_DELETE} single-row request for the given transaction to the listener; the returned future is not awaited.
+  *
+  * @param txId Transaction id.
+  * @param row Row identifying the record to delete.
+  */
+ private void delete(UUID txId, BinaryRow row) {
+     var request = TABLE_MESSAGES_FACTORY.readWriteSingleRowReplicaRequest()
+             .requestType(RequestType.RW_DELETE)
+             .transactionId(txId)
+             .binaryRow(row)
+             .term(1L)
+             .commitPartitionId(new TablePartitionId(tblId, partId))
+             .build();
+
+     partitionReplicaListener.invoke(request);
+ }
+
+ /**
+  * Performs an {@code RO_GET} request and waits for its result.
+  *
+  * @param row Row to look up.
+  * @param readTimestamp Read timestamp of the request.
+  * @return Result of the request cast to a row.
+  */
+ private BinaryRow roGet(BinaryRow row, HybridTimestamp readTimestamp) {
+     var request = TABLE_MESSAGES_FACTORY.readOnlySingleRowReplicaRequest()
+             .requestType(RequestType.RO_GET)
+             .readTimestamp(readTimestamp)
+             .binaryRow(row)
+             .build();
+
+     CompletableFuture<Object> responseFut = partitionReplicaListener.invoke(request);
+
+     return (BinaryRow) responseFut.join();
+ }
+
+ /**
+  * Performs an {@code RO_GET_ALL} request and waits for its result.
+  *
+  * @param rows Rows to look up.
+  * @param readTimestamp Read timestamp of the request.
+  * @return Result of the request cast to a list of rows.
+  */
+ private List<BinaryRow> roGetAll(Collection<BinaryRow> rows, HybridTimestamp readTimestamp) {
+     var request = TABLE_MESSAGES_FACTORY.readOnlyMultiRowReplicaRequest()
+             .requestType(RequestType.RO_GET_ALL)
+             .readTimestamp(readTimestamp)
+             .binaryRows(rows)
+             .build();
+
+     CompletableFuture<Object> responseFut = partitionReplicaListener.invoke(request);
+
+     return (List<BinaryRow>) responseFut.join();
+ }
+
+ /**
+  * Sends a committing cleanup request for the transaction, stamped with the current clock value, and then switches the mocked
+  * transaction state to {@code COMMITED}. The future returned by the listener is not awaited.
+  *
+  * @param txId Transaction id.
+  */
+ private void cleanup(UUID txId) {
+     var request = TX_MESSAGES_FACTORY.txCleanupReplicaRequest()
+             .txId(txId)
+             .commit(true)
+             .commitTimestamp(clock.now())
+             .term(1L)
+             .build();
+
+     partitionReplicaListener.invoke(request);
+
+     txState = TxState.COMMITED;
+ }
+
private static BinaryTuplePrefix toIndexBound(int val) {
ByteBuffer tuple = new BinaryTuplePrefixBuilder(1, 1).appendInt(val).build();
diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxStateReplicaRequest.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxStateReplicaRequest.java
index 983b9133e9..d5bb7aa560 100644
--- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxStateReplicaRequest.java
+++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxStateReplicaRequest.java
@@ -32,5 +32,5 @@ public interface TxStateReplicaRequest extends ReplicaRequest {
UUID txId();
@Marshallable
- HybridTimestamp commitTimestamp();
+ HybridTimestamp readTimestamp();
}