You are viewing a plain-text version of this content; the hyperlink to the canonical version was not preserved in this copy.
Posted to commits@ignite.apache.org by sa...@apache.org on 2023/08/04 06:00:17 UTC

[ignite-3] branch main updated: IGNITE-20148 Explicit writeIntent cleanup on primary replica (#2405)

This is an automated email from the ASF dual-hosted git repository.

sanpwc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 3e15298821 IGNITE-20148 Explicit writeIntent cleanup on primary replica (#2405)
3e15298821 is described below

commit 3e15298821af06e37d8fc2422cdf35dd6a888099
Author: Alexander Lapin <la...@gmail.com>
AuthorDate: Fri Aug 4 09:00:11 2023 +0300

    IGNITE-20148 Explicit writeIntent cleanup on primary replica (#2405)
---
 .../storage/impl/TestMvPartitionStorage.java       | 10 ++--
 .../table/distributed/StorageUpdateHandler.java    | 27 +++++------
 .../replicator/PartitionReplicaListener.java       | 53 +++++++++++++++++++++-
 3 files changed, 71 insertions(+), 19 deletions(-)

diff --git a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvPartitionStorage.java b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvPartitionStorage.java
index a89b7f4765..a0ae1201e1 100644
--- a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvPartitionStorage.java
+++ b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvPartitionStorage.java
@@ -252,13 +252,15 @@ public class TestMvPartitionStorage implements MvPartitionStorage {
         checkStorageClosed();
 
         map.compute(rowId, (ignored, versionChain) -> {
-            assert versionChain != null;
+            if (versionChain != null) {
+                if (!versionChain.isWriteIntent()) {
+                    return versionChain;
+                }
 
-            if (!versionChain.isWriteIntent()) {
-                return versionChain;
+                return resolveCommittedVersionChain(VersionChain.forCommitted(rowId, timestamp, versionChain));
             }
 
-            return resolveCommittedVersionChain(VersionChain.forCommitted(rowId, timestamp, versionChain));
+            return null;
         });
     }
 
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java
index bbea32ec18..110be7852c 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java
@@ -103,14 +103,14 @@ public class StorageUpdateHandler {
      * @param rowUuid Row UUID.
      * @param commitPartitionId Commit partition id.
      * @param rowBuffer Row buffer.
-     * @param onReplication Callback on replication.
+     * @param onApplication Callback on application.
      */
     public void handleUpdate(
             UUID txId,
             UUID rowUuid,
             TablePartitionId commitPartitionId,
             @Nullable ByteBuffer rowBuffer,
-            @Nullable Consumer<RowId> onReplication
+            @Nullable Consumer<RowId> onApplication
     ) {
         indexUpdateHandler.waitIndexes();
 
@@ -131,8 +131,8 @@ public class StorageUpdateHandler {
 
             indexUpdateHandler.addToIndexes(row, rowId);
 
-            if (onReplication != null) {
-                onReplication.accept(rowId);
+            if (onApplication != null) {
+                onApplication.accept(rowId);
             }
 
             return null;
@@ -225,9 +225,9 @@ public class StorageUpdateHandler {
      * Handles the abortion of a transaction.
      *
      * @param pendingRowIds Row ids of write-intents to be rolled back.
-     * @param onReplication On replication callback.
+     * @param onApplication On application callback.
      */
-    public void handleTransactionAbortion(Set<RowId> pendingRowIds, Runnable onReplication) {
+    public void handleTransactionAbortion(Set<RowId> pendingRowIds, Runnable onApplication) {
         storage.runConsistently(locker -> {
             for (RowId rowId : pendingRowIds) {
                 locker.lock(rowId);
@@ -239,21 +239,22 @@ public class StorageUpdateHandler {
 
                     ReadResult item = cursor.next();
 
-                    assert item.isWriteIntent();
+                    // TODO: https://issues.apache.org/jira/browse/IGNITE-20124 Prevent double storage updates within primary
+                    if (item.isWriteIntent()) {
+                        BinaryRow rowToRemove = item.binaryRow();
 
-                    BinaryRow rowToRemove = item.binaryRow();
+                        if (rowToRemove == null) {
+                            continue;
+                        }
 
-                    if (rowToRemove == null) {
-                        continue;
+                        indexUpdateHandler.tryRemoveFromIndexes(rowToRemove, rowId, cursor);
                     }
-
-                    indexUpdateHandler.tryRemoveFromIndexes(rowToRemove, rowId, cursor);
                 }
             }
 
             pendingRowIds.forEach(storage::abortWrite);
 
-            onReplication.run();
+            onApplication.run();
 
             return null;
         });
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
index 9064de3b6a..5f44953212 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.table.distributed.replicator;
 
+import static it.unimi.dsi.fastutil.objects.ObjectSortedSets.EMPTY_SET;
 import static java.util.Objects.requireNonNull;
 import static java.util.concurrent.CompletableFuture.allOf;
 import static java.util.concurrent.CompletableFuture.completedFuture;
@@ -42,6 +43,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
@@ -241,6 +244,9 @@ public class PartitionReplicaListener implements ReplicaListener {
 
     private final TablesConfiguration tablesConfig;
 
+    /** Rows that were inserted, updated or removed. All row IDs are sorted in natural order to prevent deadlocks upon commit/abort. */
+    private final Map<UUID, SortedSet<RowId>> txsPendingRowIds = new ConcurrentHashMap<>();
+
     /**
      * The constructor.
      *
@@ -1229,6 +1235,8 @@ public class PartitionReplicaListener implements ReplicaListener {
                     .safeTimeLong(hybridClock.nowLong())
                     .build();
 
+            cleanupLocally(request.txId(), request.commit(), request.commitTimestamp());
+
             return raftClient
                     .run(txCleanupCmd)
                     .thenCompose(ignored -> allOffFuturesExceptionIgnored(txReadFutures, request)
@@ -1740,7 +1748,15 @@ public class PartitionReplicaListener implements ReplicaListener {
                 cmd.rowUuid(),
                 cmd.tablePartitionId().asTablePartitionId(),
                 cmd.rowBuffer(),
-                null
+                rowId -> txsPendingRowIds.compute(cmd.txId(), (k, v) -> {
+                    if (v == null) {
+                        v = new TreeSet<>();
+                    }
+
+                    v.add(rowId);
+
+                    return v;
+                })
         );
 
         return applyCmdWithExceptionHandling(cmd);
@@ -1753,7 +1769,19 @@ public class PartitionReplicaListener implements ReplicaListener {
      * @return Raft future, see {@link #applyCmdWithExceptionHandling(Command)}.
      */
     private CompletableFuture<Object> applyUpdateAllCommand(UpdateAllCommand cmd) {
-        storageUpdateHandler.handleUpdateAll(cmd.txId(), cmd.rowsToUpdate(), cmd.tablePartitionId().asTablePartitionId(), null);
+        storageUpdateHandler.handleUpdateAll(
+                cmd.txId(),
+                cmd.rowsToUpdate(),
+                cmd.tablePartitionId().asTablePartitionId(),
+                rowIds -> txsPendingRowIds.compute(cmd.txId(), (k, v) -> {
+                    if (v == null) {
+                        v = new TreeSet<>();
+                    }
+
+                    v.addAll(rowIds);
+
+                    return v;
+                }));
 
         return applyCmdWithExceptionHandling(cmd);
     }
@@ -2560,4 +2588,25 @@ public class PartitionReplicaListener implements ReplicaListener {
 
         indexBuilder.stopBuildIndexes(tableId(), partId());
     }
+
+    private void cleanupLocally(UUID txId, boolean commit, HybridTimestamp commitTimestamp) {
+        Set<RowId> pendingRowIds = txsPendingRowIds.getOrDefault(txId, EMPTY_SET);
+
+        if (commit) {
+            mvDataStorage.runConsistently(locker -> {
+                pendingRowIds.forEach(locker::lock);
+
+                pendingRowIds.forEach(rowId -> mvDataStorage.commitWrite(rowId, commitTimestamp));
+
+                txsPendingRowIds.remove(txId);
+
+                return null;
+            });
+        } else {
+            storageUpdateHandler.handleTransactionAbortion(pendingRowIds, () -> {
+                // on application callback
+                txsPendingRowIds.remove(txId);
+            });
+        }
+    }
 }