You are viewing a plain-text version of this content. The canonical link to it (referred to as "here") was not preserved in this plain-text version.
Posted to commits@ignite.apache.org by sa...@apache.org on 2023/08/04 06:00:17 UTC
[ignite-3] branch main updated: IGNITE-20148 Explicit writeIntent cleanup on primary replica (#2405)
This is an automated email from the ASF dual-hosted git repository.
sanpwc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 3e15298821 IGNITE-20148 Explicit writeIntent cleanup on primary replica (#2405)
3e15298821 is described below
commit 3e15298821af06e37d8fc2422cdf35dd6a888099
Author: Alexander Lapin <la...@gmail.com>
AuthorDate: Fri Aug 4 09:00:11 2023 +0300
IGNITE-20148 Explicit writeIntent cleanup on primary replica (#2405)
---
.../storage/impl/TestMvPartitionStorage.java | 10 ++--
.../table/distributed/StorageUpdateHandler.java | 27 +++++------
.../replicator/PartitionReplicaListener.java | 53 +++++++++++++++++++++-
3 files changed, 71 insertions(+), 19 deletions(-)
diff --git a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvPartitionStorage.java b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvPartitionStorage.java
index a89b7f4765..a0ae1201e1 100644
--- a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvPartitionStorage.java
+++ b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvPartitionStorage.java
@@ -252,13 +252,15 @@ public class TestMvPartitionStorage implements MvPartitionStorage {
checkStorageClosed();
map.compute(rowId, (ignored, versionChain) -> {
- assert versionChain != null;
+ if (versionChain != null) {
+ if (!versionChain.isWriteIntent()) {
+ return versionChain;
+ }
- if (!versionChain.isWriteIntent()) {
- return versionChain;
+ return resolveCommittedVersionChain(VersionChain.forCommitted(rowId, timestamp, versionChain));
}
- return resolveCommittedVersionChain(VersionChain.forCommitted(rowId, timestamp, versionChain));
+ return null;
});
}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java
index bbea32ec18..110be7852c 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java
@@ -103,14 +103,14 @@ public class StorageUpdateHandler {
* @param rowUuid Row UUID.
* @param commitPartitionId Commit partition id.
* @param rowBuffer Row buffer.
- * @param onReplication Callback on replication.
+ * @param onApplication Callback on application.
*/
public void handleUpdate(
UUID txId,
UUID rowUuid,
TablePartitionId commitPartitionId,
@Nullable ByteBuffer rowBuffer,
- @Nullable Consumer<RowId> onReplication
+ @Nullable Consumer<RowId> onApplication
) {
indexUpdateHandler.waitIndexes();
@@ -131,8 +131,8 @@ public class StorageUpdateHandler {
indexUpdateHandler.addToIndexes(row, rowId);
- if (onReplication != null) {
- onReplication.accept(rowId);
+ if (onApplication != null) {
+ onApplication.accept(rowId);
}
return null;
@@ -225,9 +225,9 @@ public class StorageUpdateHandler {
* Handles the abortion of a transaction.
*
* @param pendingRowIds Row ids of write-intents to be rolled back.
- * @param onReplication On replication callback.
+ * @param onApplication On application callback.
*/
- public void handleTransactionAbortion(Set<RowId> pendingRowIds, Runnable onReplication) {
+ public void handleTransactionAbortion(Set<RowId> pendingRowIds, Runnable onApplication) {
storage.runConsistently(locker -> {
for (RowId rowId : pendingRowIds) {
locker.lock(rowId);
@@ -239,21 +239,22 @@ public class StorageUpdateHandler {
ReadResult item = cursor.next();
- assert item.isWriteIntent();
+ // TODO: https://issues.apache.org/jira/browse/IGNITE-20124 Prevent double storage updates within primary
+ if (item.isWriteIntent()) {
+ BinaryRow rowToRemove = item.binaryRow();
- BinaryRow rowToRemove = item.binaryRow();
+ if (rowToRemove == null) {
+ continue;
+ }
- if (rowToRemove == null) {
- continue;
+ indexUpdateHandler.tryRemoveFromIndexes(rowToRemove, rowId, cursor);
}
-
- indexUpdateHandler.tryRemoveFromIndexes(rowToRemove, rowId, cursor);
}
}
pendingRowIds.forEach(storage::abortWrite);
- onReplication.run();
+ onApplication.run();
return null;
});
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
index 9064de3b6a..5f44953212 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.table.distributed.replicator;
+import static it.unimi.dsi.fastutil.objects.ObjectSortedSets.EMPTY_SET;
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.CompletableFuture.allOf;
import static java.util.concurrent.CompletableFuture.completedFuture;
@@ -42,6 +43,8 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
@@ -241,6 +244,9 @@ public class PartitionReplicaListener implements ReplicaListener {
private final TablesConfiguration tablesConfig;
+ /** Rows that were inserted, updated or removed. All row IDs are sorted in natural order to prevent deadlocks upon commit/abort. */
+ private final Map<UUID, SortedSet<RowId>> txsPendingRowIds = new ConcurrentHashMap<>();
+
/**
* The constructor.
*
@@ -1229,6 +1235,8 @@ public class PartitionReplicaListener implements ReplicaListener {
.safeTimeLong(hybridClock.nowLong())
.build();
+ cleanupLocally(request.txId(), request.commit(), request.commitTimestamp());
+
return raftClient
.run(txCleanupCmd)
.thenCompose(ignored -> allOffFuturesExceptionIgnored(txReadFutures, request)
@@ -1740,7 +1748,15 @@ public class PartitionReplicaListener implements ReplicaListener {
cmd.rowUuid(),
cmd.tablePartitionId().asTablePartitionId(),
cmd.rowBuffer(),
- null
+ rowId -> txsPendingRowIds.compute(cmd.txId(), (k, v) -> {
+ if (v == null) {
+ v = new TreeSet<>();
+ }
+
+ v.add(rowId);
+
+ return v;
+ })
);
return applyCmdWithExceptionHandling(cmd);
@@ -1753,7 +1769,19 @@ public class PartitionReplicaListener implements ReplicaListener {
* @return Raft future, see {@link #applyCmdWithExceptionHandling(Command)}.
*/
private CompletableFuture<Object> applyUpdateAllCommand(UpdateAllCommand cmd) {
- storageUpdateHandler.handleUpdateAll(cmd.txId(), cmd.rowsToUpdate(), cmd.tablePartitionId().asTablePartitionId(), null);
+ storageUpdateHandler.handleUpdateAll(
+ cmd.txId(),
+ cmd.rowsToUpdate(),
+ cmd.tablePartitionId().asTablePartitionId(),
+ rowIds -> txsPendingRowIds.compute(cmd.txId(), (k, v) -> {
+ if (v == null) {
+ v = new TreeSet<>();
+ }
+
+ v.addAll(rowIds);
+
+ return v;
+ }));
return applyCmdWithExceptionHandling(cmd);
}
@@ -2560,4 +2588,25 @@ public class PartitionReplicaListener implements ReplicaListener {
indexBuilder.stopBuildIndexes(tableId(), partId());
}
+
+ private void cleanupLocally(UUID txId, boolean commit, HybridTimestamp commitTimestamp) {
+ Set<RowId> pendingRowIds = txsPendingRowIds.getOrDefault(txId, EMPTY_SET);
+
+ if (commit) {
+ mvDataStorage.runConsistently(locker -> {
+ pendingRowIds.forEach(locker::lock);
+
+ pendingRowIds.forEach(rowId -> mvDataStorage.commitWrite(rowId, commitTimestamp));
+
+ txsPendingRowIds.remove(txId);
+
+ return null;
+ });
+ } else {
+ storageUpdateHandler.handleTransactionAbortion(pendingRowIds, () -> {
+ // on application callback
+ txsPendingRowIds.remove(txId);
+ });
+ }
+ }
}