Posted to notifications@ignite.apache.org by "ibessonov (via GitHub)" <gi...@apache.org> on 2023/04/25 08:06:43 UTC

[GitHub] [ignite-3] ibessonov opened a new pull request, #1976: IGNITE-19269 Implemented methods to explicitly lock rows in MV storage.

ibessonov opened a new pull request, #1976:
URL: https://github.com/apache/ignite-3/pull/1976

   https://issues.apache.org/jira/browse/IGNITE-19269


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [ignite-3] rpuch commented on a diff in pull request #1976: IGNITE-19269 Implemented methods to explicitly lock rows in MV storage.

Posted by "rpuch (via GitHub)" <gi...@apache.org>.
rpuch commented on code in PR #1976:
URL: https://github.com/apache/ignite-3/pull/1976#discussion_r1180343848


##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java:
##########
@@ -189,6 +198,8 @@ public void handleUpdateAll(
                     RowId rowId = new RowId(partitionId, entry.getKey());
                     BinaryRow row = entry.getValue() != null ? new ByteBufferRow(entry.getValue()) : null;
 
+                    locker.lock(rowId);

Review Comment:
   I think it's better to sort it before merging the PR. Maybe I'm too wary, but seeing a hanging CI build is scary :)





[GitHub] [ignite-3] ibessonov commented on a diff in pull request #1976: IGNITE-19269 Implemented methods to explicitly lock rows in MV storage.

Posted by "ibessonov (via GitHub)" <gi...@apache.org>.
ibessonov commented on code in PR #1976:
URL: https://github.com/apache/ignite-3/pull/1976#discussion_r1180301698


##########
modules/storage-api/src/main/java/org/apache/ignite/internal/storage/MvPartitionStorage.java:
##########
@@ -217,6 +245,25 @@ public interface MvPartitionStorage extends ManuallyCloseable {
      */
     @Nullable RowId closestRowId(RowId lowerBound) throws StorageException;
 
+    /**
+     * Returns the head of GC queue.
+     *
+     * @param lowWatermark Upper bound for commit timestamp of GC entry.

Review Comment:
   I'll figure it out





[GitHub] [ignite-3] ibessonov commented on a diff in pull request #1976: IGNITE-19269 Implemented methods to explicitly lock rows in MV storage.

Posted by "ibessonov (via GitHub)" <gi...@apache.org>.
ibessonov commented on code in PR #1976:
URL: https://github.com/apache/ignite-3/pull/1976#discussion_r1180310313


##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java:
##########
@@ -853,24 +859,22 @@ protected ReadResult decodeEntry(byte[] key, byte[] value) {
 
                 @Override
                 public boolean hasNext() {
-                    return busy(() -> {
-                        throwExceptionIfStorageInProgressOfRebalance(state.get(), RocksDbMvPartitionStorage.this::createStorageInfo);
+                    assert rowIsLocked(rowId);
 
-                        return super.hasNext();
-                    });
+                    return super.hasNext();
                 }
 
                 @Override
                 public ReadResult next() {
-                    return busy(() -> {
-                        throwExceptionIfStorageInProgressOfRebalance(state.get(), RocksDbMvPartitionStorage.this::createStorageInfo);
+                    assert rowIsLocked(rowId);
 
-                        return super.next();
-                    });
+                    return super.next();
                 }
 
                 @Override
                 public void close() {
+                    assert rowIsLocked(rowId);

Review Comment:
   We don't, that's me being paranoid. Should I remove the assertion?





[GitHub] [ignite-3] rpuch commented on a diff in pull request #1976: IGNITE-19269 Implemented methods to explicitly lock rows in MV storage.

Posted by "rpuch (via GitHub)" <gi...@apache.org>.
rpuch commented on code in PR #1976:
URL: https://github.com/apache/ignite-3/pull/1976#discussion_r1180335501


##########
modules/storage-api/src/main/java/org/apache/ignite/internal/storage/MvPartitionStorage.java:
##########
@@ -226,8 +273,32 @@ public interface MvPartitionStorage extends ManuallyCloseable {
      *      {@code null} if there's no such value.
      * @throws StorageException If failed to poll element for vacuum.
      */
+    //TODO IGNITE-19367 Remove this method and replace its usages with proper batch removes.
+    @Deprecated
     default @Nullable BinaryRowAndRowId pollForVacuum(HybridTimestamp lowWatermark) {
-        throw new UnsupportedOperationException("pollForVacuum");
+        while (true) {
+            BinaryRowAndRowId binaryRowAndRowId = runConsistently(locker -> {
+                GcEntry gcEntry = peek(lowWatermark);
+
+                if (gcEntry == null) {
+                    return null;
+                }
+
+                locker.lock(gcEntry.getRowId());

Review Comment:
   How about adding a TODO? It seems really easy to forget to change lock() to tryLock() after the change is implemented





[GitHub] [ignite-3] ibessonov commented on a diff in pull request #1976: IGNITE-19269 Implemented methods to explicitly lock rows in MV storage.

Posted by "ibessonov (via GitHub)" <gi...@apache.org>.
ibessonov commented on code in PR #1976:
URL: https://github.com/apache/ignite-3/pull/1976#discussion_r1180347444


##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/PersistentPageMemoryMvPartitionStorage.java:
##########
@@ -136,19 +137,31 @@ public void afterCheckpointEnd(CheckpointProgress progress) {
 
     @Override
     public <V> V runConsistently(WriteClosure<V> closure) throws StorageException {
-        return busy(() -> {
-            throwExceptionIfStorageNotInRunnableOrRebalanceState(state.get(), this::createStorageInfo);
+        LocalLocker locker = THREAD_LOCAL_LOCKER.get();
 
-            checkpointTimeoutLock.checkpointReadLock();
+        if (locker != null) {
+            return closure.execute(locker);
+        } else {
+            return busy(() -> {
+                throwExceptionIfStorageNotInRunnableOrRebalanceState(state.get(), this::createStorageInfo);
 
-            try {
-                return closure.execute();
-            } finally {
-                updateVersionChainLockByRowId.releaseAllLockByCurrentThread();
+                LocalLocker locker0 = new LocalLocker(lockByRowId);
 
-                checkpointTimeoutLock.checkpointReadUnlock();
-            }
-        });
+                checkpointTimeoutLock.checkpointReadLock();
+
+                THREAD_LOCAL_LOCKER.set(locker0);
+
+                try {
+                    return closure.execute(locker0);
+                } finally {
+                    THREAD_LOCAL_LOCKER.set(null);
+
+                    locker0.unlockAll();

Review Comment:
   But this will never happen, we never check that exception anywhere in the code





[GitHub] [ignite-3] ibessonov commented on a diff in pull request #1976: IGNITE-19269 Implemented methods to explicitly lock rows in MV storage.

Posted by "ibessonov (via GitHub)" <gi...@apache.org>.
ibessonov commented on code in PR #1976:
URL: https://github.com/apache/ignite-3/pull/1976#discussion_r1182234098


##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java:
##########
@@ -853,24 +859,22 @@ protected ReadResult decodeEntry(byte[] key, byte[] value) {
 
                 @Override
                 public boolean hasNext() {
-                    return busy(() -> {
-                        throwExceptionIfStorageInProgressOfRebalance(state.get(), RocksDbMvPartitionStorage.this::createStorageInfo);
+                    assert rowIsLocked(rowId);
 
-                        return super.hasNext();
-                    });
+                    return super.hasNext();
                 }
 
                 @Override
                 public ReadResult next() {
-                    return busy(() -> {
-                        throwExceptionIfStorageInProgressOfRebalance(state.get(), RocksDbMvPartitionStorage.this::createStorageInfo);
+                    assert rowIsLocked(rowId);
 
-                        return super.next();
-                    });
+                    return super.next();
                 }
 
                 @Override
                 public void close() {
+                    assert rowIsLocked(rowId);

Review Comment:
   This assertion serves as documentation and helps other developers catch problems if they use cursors in unintended ways.
   It should never fail in a production environment, because it depends only on code structure and has no data dependency, so we can be sure about it.





[GitHub] [ignite-3] ibessonov merged pull request #1976: IGNITE-19269 Implemented methods to explicitly lock rows in MV storage.

Posted by "ibessonov (via GitHub)" <gi...@apache.org>.
ibessonov merged PR #1976:
URL: https://github.com/apache/ignite-3/pull/1976




[GitHub] [ignite-3] rpuch commented on a diff in pull request #1976: IGNITE-19269 Implemented methods to explicitly lock rows in MV storage.

Posted by "rpuch (via GitHub)" <gi...@apache.org>.
rpuch commented on code in PR #1976:
URL: https://github.com/apache/ignite-3/pull/1976#discussion_r1178781093


##########
modules/storage-api/src/main/java/org/apache/ignite/internal/storage/MvPartitionStorage.java:
##########
@@ -47,14 +48,41 @@ public interface MvPartitionStorage extends ManuallyCloseable {
     long REBALANCE_IN_PROGRESS = -1;
 
     /**
-     * Closure for executing write operations on the storage.
+     * Closure for executing write operations on the storage. All write operations, such as
+     * {@link #addWrite(RowId, BinaryRow, UUID, UUID, int)} or {@link #commitWrite(RowId, HybridTimestamp)},
+     * as well as {@link #scanVersions(RowId)}, and operations like {@link #committedGroupConfiguration(byte[])}, must be executed inside
+     * of the write closure. Also, each operation that involves modifying rows (and {@link #scanVersions(RowId)} must hold lock on

Review Comment:
   ```suggestion
        * of the write closure. Also, each operation that involves modifying rows (and {@link #scanVersions(RowId)}) must hold lock on
   ```



##########
modules/storage-api/src/main/java/org/apache/ignite/internal/storage/MvPartitionStorage.java:
##########
@@ -47,14 +48,41 @@ public interface MvPartitionStorage extends ManuallyCloseable {
     long REBALANCE_IN_PROGRESS = -1;
 
     /**
-     * Closure for executing write operations on the storage.
+     * Closure for executing write operations on the storage. All write operations, such as
+     * {@link #addWrite(RowId, BinaryRow, UUID, UUID, int)} or {@link #commitWrite(RowId, HybridTimestamp)},
+     * as well as {@link #scanVersions(RowId)}, and operations like {@link #committedGroupConfiguration(byte[])}, must be executed inside
+     * of the write closure. Also, each operation that involves modifying rows (and {@link #scanVersions(RowId)} must hold lock on
+     * corresponding row ID, by either calling {@link Locker#lock(RowId)} or calling {@link Locker#tryLock(RowId)} and checking the

Review Comment:
   ```suggestion
        * the corresponding row ID, by either calling {@link Locker#lock(RowId)} or calling {@link Locker#tryLock(RowId)} and checking the
   ```



##########
modules/storage-api/src/main/java/org/apache/ignite/internal/storage/MvPartitionStorage.java:
##########
@@ -217,6 +245,25 @@ public interface MvPartitionStorage extends ManuallyCloseable {
      */
     @Nullable RowId closestRowId(RowId lowerBound) throws StorageException;
 
+    /**
+     * Returns the head of GC queue.
+     *
+     * @param lowWatermark Upper bound for commit timestamp of GC entry.

Review Comment:
   Is it an inclusive or exclusive bound?
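
A possible reading, based only on the RocksDB `GarbageCollector.peek` hunk quoted later in this thread: the head entry is skipped when `getTimestamp().compareTo(lowWatermark) > 0`, so an entry whose timestamp equals the low watermark would still be returned, which suggests the bound is inclusive. The sketch below restates that check with plain `long` timestamps. `WatermarkBound` and `eligibleForGc` are hypothetical names used for illustration, not Ignite API, and other storage implementations may behave differently.

```java
/** Hypothetical illustration; long values stand in for HybridTimestamp. */
public class WatermarkBound {
    /**
     * Mirrors the quoted check: an entry is skipped only when its timestamp
     * is strictly greater than the low watermark.
     */
    public static boolean eligibleForGc(long commitTimestamp, long lowWatermark) {
        return Long.compare(commitTimestamp, lowWatermark) <= 0;
    }
}
```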



##########
modules/storage-api/src/main/java/org/apache/ignite/internal/storage/MvPartitionStorage.java:
##########
@@ -47,14 +48,41 @@ public interface MvPartitionStorage extends ManuallyCloseable {
     long REBALANCE_IN_PROGRESS = -1;
 
     /**
-     * Closure for executing write operations on the storage.
+     * Closure for executing write operations on the storage. All write operations, such as
+     * {@link #addWrite(RowId, BinaryRow, UUID, UUID, int)} or {@link #commitWrite(RowId, HybridTimestamp)},
+     * as well as {@link #scanVersions(RowId)}, and operations like {@link #committedGroupConfiguration(byte[])}, must be executed inside
+     * of the write closure. Also, each operation that involves modifying rows (and {@link #scanVersions(RowId)} must hold lock on
+     * corresponding row ID, by either calling {@link Locker#lock(RowId)} or calling {@link Locker#tryLock(RowId)} and checking the
+     * result.
      *
      * @param <V> Type of the result returned from the closure.
      */
     @SuppressWarnings("PublicInnerClass")
     @FunctionalInterface
     interface WriteClosure<V> {
-        V execute() throws StorageException;
+        V execute(Locker locker) throws StorageException;
+    }
+
+    /**
+     * Parameter type for {@link WriteClosure#execute(Locker)}. Used to lock row IDs before updating the data. All acquired locks are
+     * released automatically after {@code execute} call is completed.
+     */
+    @SuppressWarnings("PublicInnerClass")
+    interface Locker {
+        /**
+         * Locks passed row ID until the {@link WriteClosure#execute(Locker)} is completed.
+         *
+         * @param rowId Row ID to lock.
+         */
+        void lock(RowId rowId);
+
+        /**
+         * Tries to lock passed row ID. If successful, lock will be released when the {@link WriteClosure#execute(Locker)} is completed.
+         *
+         * @param rowId Row ID to lock.
+         * @return {@code true} if row ID has been locked successfully, or the lock has already been held by current thread.

Review Comment:
   ```suggestion
            * @return {@code true} if row ID has been locked successfully, or the lock has already been held by the current thread.
   ```



##########
modules/storage-api/src/main/java/org/apache/ignite/internal/storage/util/LockHolder.java:
##########
@@ -36,7 +35,7 @@
 class LockHolder<T extends Lock> {
     private final T lock;
 
-    private final AtomicInteger lockHolder = new AtomicInteger();
+    private int lockHoldersCount;

Review Comment:
   Why is a non-volatile field ok now?
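
One plausible answer, judging only from the `LockByRowId` hunk quoted in this thread, is that the counter is touched exclusively inside `ConcurrentHashMap.compute`, which runs the remapping function atomically per key. The sketch below shows that pattern in isolation, assuming every read and write of the plain `int` goes through `compute` or `computeIfPresent`. `ComputeGuardedCounter` and `Holder` are hypothetical stand-ins, not the PR's classes.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Hypothetical stand-in for a holder whose plain int counter is guarded by compute(). */
public class ComputeGuardedCounter {
    /** Plain, non-volatile field: only accessed inside the map's remapping functions. */
    static final class Holder {
        int count;
    }

    private final ConcurrentMap<String, Holder> holders = new ConcurrentHashMap<>();

    /** Increments the counter for the key; compute() serializes updates per key. */
    public void increment(String key) {
        holders.compute(key, (k, holder) -> {
            if (holder == null) {
                holder = new Holder();
            }

            holder.count++;

            return holder;
        });
    }

    /** Reads the counter through the map as well, so the read is ordered after earlier updates. */
    public int count(String key) {
        int[] result = {0};

        holders.computeIfPresent(key, (k, holder) -> {
            result[0] = holder.count;

            return holder;
        });

        return result[0];
    }
}
```

The design only holds while no code reads or writes the field outside the remapping functions; a direct access from another thread would need `volatile` or an atomic again.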



##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/GarbageCollector.java:
##########
@@ -153,69 +154,81 @@ boolean tryAddToGcQueue(WriteBatchWithIndex writeBatch, RowId rowId, HybridTimes
     /**
      * Polls an element for vacuum. See {@link org.apache.ignite.internal.storage.MvPartitionStorage#pollForVacuum(HybridTimestamp)}.
      *
-     * @param batch Write batch.
      * @param lowWatermark Low watermark.
-     * @return Garbage collected element.
+     * @return Garbage collected element descriptor.
      * @throws RocksDBException If failed to collect the garbage.
      */
-    @Nullable BinaryRowAndRowId pollForVacuum(WriteBatchWithIndex batch, HybridTimestamp lowWatermark) throws RocksDBException {
-        ColumnFamilyHandle partCf = helper.partCf;
-
+    @Nullable GcEntry peek(HybridTimestamp lowWatermark) {
         // We retrieve the first element of the GC queue and seek for it in the data CF.
         // However, the element that we need to garbage collect is the next (older one) element.
         // First we check if there's anything to garbage collect. If the element is a tombstone we remove it.
         // If the next element exists, that should be the element that we want to garbage collect.
-        try (RocksIterator gcIt = db.newIterator(gcQueueCf, helper.upperBoundReadOpts)) {
+        try (
+                RocksIterator foo = db.newIterator(gcQueueCf, helper.upperBoundReadOpts);

Review Comment:
   `foo` doesn't seem like a good name, how about `bareIt`?



##########
modules/storage-api/src/test/java/org/apache/ignite/internal/storage/util/LockByRowIdTest.java:
##########
@@ -25,135 +25,97 @@
 import static org.junit.jupiter.api.Assertions.assertThrows;
 
 import java.util.concurrent.CompletableFuture;
+import java.util.function.Supplier;
 import org.apache.ignite.internal.storage.RowId;
-import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
 /**
- * Class for testing {@link ReentrantLockByRowId}.
+ * Class for testing {@link LockByRowId}.
  */
-public class ReentrantLockByRowIdTest {
-    private ReentrantLockByRowId lockByRowId;
+public class LockByRowIdTest {
+    private LockByRowId lockByRowId;
 
     @BeforeEach
     void setUp() {
-        lockByRowId = new ReentrantLockByRowId();
-    }
-
-    @AfterEach
-    void tearDown() {
-        lockByRowId.releaseAllLockByCurrentThread();
+        lockByRowId = new LockByRowId();
     }
 
     @Test
     void testSimple() {
         RowId rowId = new RowId(0);
 
-        lockByRowId.acquireLock(rowId);
-        lockByRowId.releaseLock(rowId);
+        lockByRowId.lock(rowId);
+        lockByRowId.unlockAll(rowId);
 
-        assertEquals(1, lockByRowId.inLock(rowId, () -> 1));
+        assertEquals(1, inLock(rowId, () -> 1));
     }
 
     @Test
     void testSimpleReEnter() {
         RowId rowId = new RowId(0);
 
-        lockByRowId.acquireLock(rowId);
-        lockByRowId.acquireLock(rowId);
-
-        lockByRowId.inLock(rowId, () -> {
-            lockByRowId.acquireLock(rowId);
-
-            lockByRowId.releaseLock(rowId);
-
-            return null;
-        });
+        lockByRowId.lock(rowId);
+        lockByRowId.lock(rowId);
 
-        lockByRowId.releaseLock(rowId);
-        lockByRowId.releaseLock(rowId);
+        lockByRowId.unlockAll(rowId);
     }
 
     @Test
     void testReleaseError() {
-        assertThrows(IllegalStateException.class, () -> lockByRowId.releaseLock(new RowId(0)));
+        assertThrows(IllegalStateException.class, () -> lockByRowId.unlockAll(new RowId(0)));
 
         RowId rowId = new RowId(0);
 
-        assertThat(runAsync(() -> lockByRowId.acquireLock(rowId)), willCompleteSuccessfully());
+        assertThat(runAsync(() -> lockByRowId.lock(rowId)), willCompleteSuccessfully());
 
-        assertThrows(IllegalMonitorStateException.class, () -> lockByRowId.releaseLock(rowId));
+        assertThrows(IllegalMonitorStateException.class, () -> lockByRowId.unlockAll(rowId));
     }
 
     @Test
     void testBlockSimple() {
         RowId rowId = new RowId(0);
 
-        lockByRowId.acquireLock(rowId);
-        lockByRowId.acquireLock(rowId);
+        lockByRowId.lock(rowId);
+        lockByRowId.lock(rowId);
 
         CompletableFuture<?> acquireLockFuture = runAsync(() -> {
-            lockByRowId.acquireLock(rowId);
-            lockByRowId.releaseLock(rowId);
+            lockByRowId.lock(rowId);
+            lockByRowId.unlockAll(rowId);
         });
 
         assertThat(acquireLockFuture, willTimeoutFast());
 
-        lockByRowId.releaseLock(rowId);
-
-        assertThat(acquireLockFuture, willTimeoutFast());
-
-        lockByRowId.releaseLock(rowId);
+        lockByRowId.unlockAll(rowId);
 
         assertThat(acquireLockFuture, willCompleteSuccessfully());
-
-        lockByRowId.acquireLock(rowId);
     }
 
     @Test
     void testBlockSupplier() {

Review Comment:
   Why is this test needed? Now that `inLock()` has been removed from the tested class, it seems to test the same logic as the preceding test.



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java:
##########
@@ -189,6 +198,8 @@ public void handleUpdateAll(
                     RowId rowId = new RowId(partitionId, entry.getKey());
                     BinaryRow row = entry.getValue() != null ? new ByteBufferRow(entry.getValue()) : null;
 
+                    locker.lock(rowId);

Review Comment:
   Shouldn't we explicitly sort the row IDs before taking locks on them?
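
To illustrate the concern: if two threads lock the same set of rows in different orders, each can end up waiting for a lock the other holds. Acquiring locks in one global order avoids that. The sketch below is a minimal illustration, not code from the PR. It uses `Long` keys as a stand-in for `RowId`, and `OrderedLocking`, `lockAll` and `unlockAll` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical helper: takes per-key locks in ascending key order. */
public class OrderedLocking {
    private final ConcurrentMap<Long, ReentrantLock> locks = new ConcurrentHashMap<>();

    /** Sorts and de-duplicates the keys, locks them in that order, and returns the order used. */
    public List<Long> lockAll(Collection<Long> keys) {
        List<Long> ordered = new ArrayList<>(new TreeSet<>(keys));

        for (Long key : ordered) {
            locks.computeIfAbsent(key, k -> new ReentrantLock()).lock();
        }

        return ordered;
    }

    /** Releases the locks in reverse acquisition order. */
    public void unlockAll(List<Long> ordered) {
        for (int i = ordered.size() - 1; i >= 0; i--) {
            locks.get(ordered.get(i)).unlock();
        }
    }
}
```

For example, `lockAll(List.of(3L, 1L, 2L))` would lock keys 1, 2, 3 in that order regardless of the order in which the batch listed them.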



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java:
##########
@@ -451,7 +454,10 @@ void handleBuildIndexCommand(BuildIndexCommand cmd, long commandIndex, long comm
             return;
         }
 
-        storage.runConsistently(() -> {
+        storage.runConsistently(locker -> {
+            //TODO Assert order?

Review Comment:
   Same as above (about TODO and ordering)



##########
modules/storage-api/src/main/java/org/apache/ignite/internal/storage/util/LockByRowId.java:
##########
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.util;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.locks.ReentrantLock;
+import org.apache.ignite.internal.storage.RowId;
+
+/**
+ * {@link ReentrantLock} by row ID.
+ *
+ * <p>Allows synchronization of version chain update operations.
+ */
+public class LockByRowId {
+    private final ConcurrentMap<RowId, LockHolder<ReentrantLock>> lockHolderByRowId = new ConcurrentHashMap<>();
+
+    /**
+     * Acquires the lock by row ID.
+     *
+     * @param rowId Row ID.
+     */
+    public void lock(RowId rowId) {
+        LockHolder<ReentrantLock> lockHolder = lockHolderByRowId.compute(rowId, (id, holder) -> {
+            if (holder == null) {
+                holder = new LockHolder<>(new ReentrantLock());
+            }
+
+            if (!holder.getLock().isHeldByCurrentThread()) {
+                holder.incrementHolders();
+            }
+
+            return holder;
+        });
+
+        if (!lockHolder.getLock().isHeldByCurrentThread()) {
+            lockHolder.getLock().lock();
+        }
+    }
+
+    /**
+     * Tries to acquire the lock by row ID.
+     *
+     * @param rowId Row ID.
+     * @return {@code true} if lock has been acquired successfully.
+     */
+    public boolean tryLock(RowId rowId) {
+        boolean[] result = {false};
+
+        lockHolderByRowId.compute(rowId, (id, holder) -> {
+            if (holder == null) {
+                holder = new LockHolder<>(new ReentrantLock());
+
+                holder.incrementHolders();
+
+                // Locks immediately, because no one else have seen this instance yet.

Review Comment:
   ```suggestion
                   // Locks immediately, because no one else has seen this instance yet.
   ```



##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java:
##########
@@ -853,24 +859,22 @@ protected ReadResult decodeEntry(byte[] key, byte[] value) {
 
                 @Override
                 public boolean hasNext() {
-                    return busy(() -> {
-                        throwExceptionIfStorageInProgressOfRebalance(state.get(), RocksDbMvPartitionStorage.this::createStorageInfo);
+                    assert rowIsLocked(rowId);
 
-                        return super.hasNext();
-                    });
+                    return super.hasNext();
                 }
 
                 @Override
                 public ReadResult next() {
-                    return busy(() -> {
-                        throwExceptionIfStorageInProgressOfRebalance(state.get(), RocksDbMvPartitionStorage.this::createStorageInfo);
+                    assert rowIsLocked(rowId);
 
-                        return super.next();
-                    });
+                    return super.next();
                 }
 
                 @Override
                 public void close() {
+                    assert rowIsLocked(rowId);

Review Comment:
   Do we really need to assert this on close?



##########
modules/storage-api/src/main/java/org/apache/ignite/internal/storage/MvPartitionStorage.java:
##########
@@ -47,14 +48,41 @@ public interface MvPartitionStorage extends ManuallyCloseable {
     long REBALANCE_IN_PROGRESS = -1;
 
     /**
-     * Closure for executing write operations on the storage.
+     * Closure for executing write operations on the storage. All write operations, such as
+     * {@link #addWrite(RowId, BinaryRow, UUID, UUID, int)} or {@link #commitWrite(RowId, HybridTimestamp)},
+     * as well as {@link #scanVersions(RowId)}, and operations like {@link #committedGroupConfiguration(byte[])}, must be executed inside
+     * of the write closure. Also, each operation that involves modifying rows (and {@link #scanVersions(RowId)} must hold lock on
+     * corresponding row ID, by either calling {@link Locker#lock(RowId)} or calling {@link Locker#tryLock(RowId)} and checking the
+     * result.
      *
      * @param <V> Type of the result returned from the closure.
      */
     @SuppressWarnings("PublicInnerClass")
     @FunctionalInterface
     interface WriteClosure<V> {
-        V execute() throws StorageException;
+        V execute(Locker locker) throws StorageException;
+    }
+
+    /**
+     * Parameter type for {@link WriteClosure#execute(Locker)}. Used to lock row IDs before updating the data. All acquired locks are
+     * released automatically after {@code execute} call is completed.
+     */
+    @SuppressWarnings("PublicInnerClass")
+    interface Locker {
+        /**
+         * Locks passed row ID until the {@link WriteClosure#execute(Locker)} is completed.

Review Comment:
   Should it be mentioned that it blocks until it can take a lock?



##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/PersistentPageMemoryMvPartitionStorage.java:
##########
@@ -136,19 +137,31 @@ public void afterCheckpointEnd(CheckpointProgress progress) {
 
     @Override
     public <V> V runConsistently(WriteClosure<V> closure) throws StorageException {
-        return busy(() -> {
-            throwExceptionIfStorageNotInRunnableOrRebalanceState(state.get(), this::createStorageInfo);
+        LocalLocker locker = THREAD_LOCAL_LOCKER.get();
 
-            checkpointTimeoutLock.checkpointReadLock();
+        if (locker != null) {
+            return closure.execute(locker);
+        } else {
+            return busy(() -> {
+                throwExceptionIfStorageNotInRunnableOrRebalanceState(state.get(), this::createStorageInfo);
 
-            try {
-                return closure.execute();
-            } finally {
-                updateVersionChainLockByRowId.releaseAllLockByCurrentThread();
+                LocalLocker locker0 = new LocalLocker(lockByRowId);
 
-                checkpointTimeoutLock.checkpointReadUnlock();
-            }
-        });
+                checkpointTimeoutLock.checkpointReadLock();
+
+                THREAD_LOCAL_LOCKER.set(locker0);
+
+                try {
+                    return closure.execute(locker0);
+                } finally {
+                    THREAD_LOCAL_LOCKER.set(null);
+
+                    locker0.unlockAll();

Review Comment:
   If `unlockAll()` fails for some reason, the checkpoint lock will not be released. How about another `try/finally`?
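
The suggested shape can be sketched as follows. This is a simplified illustration rather than the PR's code: `NestedFinally` and `runGuarded` are hypothetical names, and the two `Runnable` parameters stand in for `locker0.unlockAll()` and `checkpointTimeoutLock.checkpointReadUnlock()`.

```java
import java.util.function.Supplier;

/** Hypothetical illustration of releasing an outer lock even if an inner cleanup step throws. */
public class NestedFinally {
    public static <V> V runGuarded(Supplier<V> closure, Runnable unlockRows, Runnable releaseOuterLock) {
        try {
            return closure.get();
        } finally {
            try {
                // Inner cleanup: may throw.
                unlockRows.run();
            } finally {
                // Runs regardless of whether the closure or the inner cleanup threw.
                releaseOuterLock.run();
            }
        }
    }
}
```

If `unlockRows` throws, its exception still propagates to the caller, but only after `releaseOuterLock` has run.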



##########
modules/storage-api/src/main/java/org/apache/ignite/internal/storage/MvPartitionStorage.java:
##########
@@ -226,8 +273,32 @@ public interface MvPartitionStorage extends ManuallyCloseable {
      *      {@code null} if there's no such value.
      * @throws StorageException If failed to poll element for vacuum.
      */
+    //TODO IGNITE-19367 Remove this method and replace its usages with proper batch removes.
+    @Deprecated
     default @Nullable BinaryRowAndRowId pollForVacuum(HybridTimestamp lowWatermark) {
-        throw new UnsupportedOperationException("pollForVacuum");
+        while (true) {
+            BinaryRowAndRowId binaryRowAndRowId = runConsistently(locker -> {
+                GcEntry gcEntry = peek(lowWatermark);
+
+                if (gcEntry == null) {
+                    return null;
+                }
+
+                locker.lock(gcEntry.getRowId());

Review Comment:
   The schematic code in the Jira ticket suggests using `tryLock()` here; is `lock()` ok?
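
For comparison, a non-blocking variant could look roughly like the sketch below: the caller attempts the lock, and if another thread holds it, returns immediately so that an outer loop (such as the `while (true)` in the quoted `pollForVacuum`) can retry or move on. This uses a plain `ReentrantLock` rather than the PR's `Locker`, and `TryLockSketch` and `runIfFree` are hypothetical names.

```java
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical illustration of tryLock-style acquisition with a plain ReentrantLock. */
public class TryLockSketch {
    /**
     * Runs the action under the lock if the lock can be taken without waiting.
     *
     * @return true if the action ran, false if the lock was held by another thread.
     */
    public static boolean runIfFree(ReentrantLock lock, Runnable action) {
        if (!lock.tryLock()) {
            // Do not block: let the caller decide whether to retry.
            return false;
        }

        try {
            action.run();

            return true;
        } finally {
            lock.unlock();
        }
    }
}
```

With `lock()` the calling thread waits inside the closure until the row is free; with the `tryLock()` shape it never waits, at the cost of a retry loop in the caller.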



##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/GarbageCollector.java:
##########
@@ -153,69 +154,81 @@ boolean tryAddToGcQueue(WriteBatchWithIndex writeBatch, RowId rowId, HybridTimes
     /**
      * Polls an element for vacuum. See {@link org.apache.ignite.internal.storage.MvPartitionStorage#pollForVacuum(HybridTimestamp)}.
      *
-     * @param batch Write batch.
      * @param lowWatermark Low watermark.
-     * @return Garbage collected element.
+     * @return Garbage collected element descriptor.
      * @throws RocksDBException If failed to collect the garbage.
      */
-    @Nullable BinaryRowAndRowId pollForVacuum(WriteBatchWithIndex batch, HybridTimestamp lowWatermark) throws RocksDBException {
-        ColumnFamilyHandle partCf = helper.partCf;
-
+    @Nullable GcEntry peek(HybridTimestamp lowWatermark) {
         // We retrieve the first element of the GC queue and seek for it in the data CF.
         // However, the element that we need to garbage collect is the next (older one) element.
         // First we check if there's anything to garbage collect. If the element is a tombstone we remove it.
         // If the next element exists, that should be the element that we want to garbage collect.
-        try (RocksIterator gcIt = db.newIterator(gcQueueCf, helper.upperBoundReadOpts)) {
+        try (
+                RocksIterator foo = db.newIterator(gcQueueCf, helper.upperBoundReadOpts);
+                RocksIterator gcIt = helper.wrapIterator(foo, gcQueueCf)
+        ) {
             gcIt.seek(helper.partitionStartPrefix());
 
             if (invalid(gcIt)) {
                 // GC queue is empty.
                 return null;
             }
 
-            ByteBuffer gcKeyBuffer = readGcKey(gcIt);
+            ByteBuffer gcKeyBuffer = readGcKey(foo);
 
             GcRowVersion gcRowVersion = toGcRowVersion(gcKeyBuffer);
 
-            while (true) {
-                if (gcRowVersion.getRowTimestamp().compareTo(lowWatermark) > 0) {
-                    // No elements to garbage collect.
-                    return null;
-                }
+            if (gcRowVersion.getTimestamp().compareTo(lowWatermark) > 0) {
+                // No elements to garbage collect.
+                return null;
+            }
 
-                // If no one has processed the head of the gc queue in parallel, then we must release the lock after write batch in
-                // WriteClosure#execute of MvPartitionStorage#runConsistently so that the indexes can be deleted consistently.
-                helper.lockByRowId.acquireLock(gcRowVersion.getRowId());
+            return gcRowVersion;
+        }
+    }
 
-                // We must refresh the iterator to try to read the head of the gc queue again and if someone deleted it in parallel,
-                // then read the new head of the queue.
-                refreshGcIterator(gcIt, gcKeyBuffer);
 
-                if (invalid(gcIt)) {
-                    // GC queue is empty.
-                    return null;
-                }
+    /**
+     * Polls an element for vacuum. See {@link org.apache.ignite.internal.storage.MvPartitionStorage#pollForVacuum(HybridTimestamp)}.
+     *
+     * @param batch Write batch.
+     * @param lowWatermark Low watermark.
+     * @return Garbage collected element.
+     * @throws RocksDBException If failed to collect the garbage.
+     */
+    @Nullable BinaryRow vacuum(WriteBatchWithIndex batch, GcEntry entry) throws RocksDBException {
+        assert entry instanceof GcRowVersion;
 
-                gcKeyBuffer = readGcKey(gcIt);
+        ColumnFamilyHandle partCf = helper.partCf;
 
-                GcRowVersion oldGcRowVersion = gcRowVersion;
+        // We retrieve the first element of the GC queue and seek for it in the data CF.
+        // However, the element that we need to garbage collect is the next (older one) element.
+        // First we check if there's anything to garbage collect. If the element is a tombstone we remove it.
+        // If the next element exists, that should be the element that we want to garbage collect.
+        try (RocksIterator gcIt = db.newIterator(gcQueueCf, helper.upperBoundReadOpts)) {
+            gcIt.seek(helper.partitionStartPrefix());
 
-                gcRowVersion = toGcRowVersion(gcKeyBuffer);
+            if (invalid(gcIt)) {
+                // GC queue is empty.
+                return null;
+            }
 
-                // Someone has processed the element in parallel, so we need to take a new head of the queue.
-                if (!gcRowVersion.equals(oldGcRowVersion)) {
-                    helper.lockByRowId.releaseLock(oldGcRowVersion.getRowId());
+            ByteBuffer gcKeyBuffer = readGcKey(gcIt);
 
-                    continue;
-                }
+            GcRowVersion gcRowVersion = toGcRowVersion(gcKeyBuffer);
 
-                break;
+            // Someone has processed the element in parallel, so we need to take a new head of the queue.
+            if (!gcRowVersion.equals(entry)) {
+                return null;
             }
 
             // Delete element from the GC queue.
             batch.delete(gcQueueCf, gcKeyBuffer);
 
-            try (RocksIterator it = db.newIterator(partCf, helper.upperBoundReadOpts)) {
+            try (
+                    RocksIterator foo = db.newIterator(partCf, helper.upperBoundReadOpts);

Review Comment:
   `foo` again



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java:
##########
@@ -324,7 +324,10 @@ private void handleTxCleanupCommand(TxCleanupCommand cmd, long commandIndex, lon
         Set<RowId> pendingRowIds = txsPendingRowIds.getOrDefault(txId, Collections.emptySet());
 
         if (cmd.commit()) {
-            storage.runConsistently(() -> {
+            storage.runConsistently(locker -> {
+                //TODO Split & sort.

Review Comment:
   1. We should not have a TODO without a reference to a Jira ticket.
   2. Are you sure we can merge this PR without sorting the rowIds? It seems pretty dangerous; we don't want deadlocks.
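
   To illustrate why sorting matters: if every writer acquires its row locks in the same global order, two writers that touch the same rows cannot each hold a lock the other is waiting for. The sketch below uses `Integer` keys as a stand-in for `RowId` and a hypothetical lock table; it is not the storage's actual locking code.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;

public class SortedLockingSketch {
    /** Hypothetical per-row lock table; Integer stands in for RowId. */
    static final Map<Integer, ReentrantLock> LOCKS = new ConcurrentHashMap<>();

    /** Locks all given IDs in ascending order and returns them in that order. */
    static List<Integer> lockInOrder(Collection<Integer> rowIds) {
        List<Integer> acquired = new ArrayList<>();
        for (Integer id : new TreeSet<>(rowIds)) {
            LOCKS.computeIfAbsent(id, k -> new ReentrantLock()).lock();
            acquired.add(id);
        }
        return acquired;
    }

    static void unlockAll(List<Integer> acquired) {
        for (Integer id : acquired) {
            LOCKS.get(id).unlock();
        }
    }

    static void writer(List<Integer> ids) {
        for (int i = 0; i < 1_000; i++) {
            unlockAll(lockInOrder(ids));
        }
    }

    /** Two writers receive the same rows in opposite order; returns true if both finish. */
    static boolean runTwoWriters() {
        Thread a = new Thread(() -> writer(List.of(1, 2, 3)));
        Thread b = new Thread(() -> writer(List.of(3, 2, 1)));
        a.setDaemon(true);
        b.setDaemon(true);
        a.start();
        b.start();
        try {
            a.join(30_000);
            b.join(30_000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        return !a.isAlive() && !b.isAlive();
    }

    public static void main(String[] args) {
        System.out.println("both writers finished: " + runTwoWriters());
    }
}
```

   Without the `TreeSet`, the first writer could hold row 1 while waiting for row 3 and the second could hold row 3 while waiting for row 1.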





[GitHub] [ignite-3] ibessonov commented on a diff in pull request #1976: IGNITE-19269 Implemented methods to explicitly lock rows in MV storage.

Posted by "ibessonov (via GitHub)" <gi...@apache.org>.
ibessonov commented on code in PR #1976:
URL: https://github.com/apache/ignite-3/pull/1976#discussion_r1180308170


##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/PersistentPageMemoryMvPartitionStorage.java:
##########
@@ -136,19 +137,31 @@ public void afterCheckpointEnd(CheckpointProgress progress) {
 
     @Override
     public <V> V runConsistently(WriteClosure<V> closure) throws StorageException {
-        return busy(() -> {
-            throwExceptionIfStorageNotInRunnableOrRebalanceState(state.get(), this::createStorageInfo);
+        LocalLocker locker = THREAD_LOCAL_LOCKER.get();
 
-            checkpointTimeoutLock.checkpointReadLock();
+        if (locker != null) {
+            return closure.execute(locker);
+        } else {
+            return busy(() -> {
+                throwExceptionIfStorageNotInRunnableOrRebalanceState(state.get(), this::createStorageInfo);
 
-            try {
-                return closure.execute();
-            } finally {
-                updateVersionChainLockByRowId.releaseAllLockByCurrentThread();
+                LocalLocker locker0 = new LocalLocker(lockByRowId);
 
-                checkpointTimeoutLock.checkpointReadUnlock();
-            }
-        });
+                checkpointTimeoutLock.checkpointReadLock();
+
+                THREAD_LOCAL_LOCKER.set(locker0);
+
+                try {
+                    return closure.execute(locker0);
+                } finally {
+                    THREAD_LOCAL_LOCKER.set(null);
+
+                    locker0.unlockAll();

Review Comment:
   `unlockAll` can't fail. The only possible exception is `IllegalMonitorStateException`, which we hope will never happen. So the order is deliberate.
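
   For reference, a small sketch of the one failure mode mentioned: `java.util.concurrent.locks.ReentrantLock#unlock()` throws `IllegalMonitorStateException` when the calling thread does not hold the lock. It assumes the row locks behave like `ReentrantLock`, which this thread does not confirm.

```java
import java.util.concurrent.locks.ReentrantLock;

public class UnlockOwnershipSketch {
    /** Returns true if unlocking a lock we never acquired is rejected. */
    static boolean unlockWithoutOwning() {
        ReentrantLock lock = new ReentrantLock();
        try {
            lock.unlock(); // the current thread is not the owner
            return false;
        } catch (IllegalMonitorStateException e) {
            return true;
        }
    }

    /** Returns true if a matched lock/unlock pair completes without an exception. */
    static boolean unlockWhileOwning() {
        ReentrantLock lock = new ReentrantLock();
        lock.lock();
        lock.unlock();
        return !lock.isLocked();
    }

    public static void main(String[] args) {
        System.out.println("unowned unlock rejected: " + unlockWithoutOwning());
        System.out.println("owned unlock succeeded: " + unlockWhileOwning());
    }
}
```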





[GitHub] [ignite-3] ibessonov commented on a diff in pull request #1976: IGNITE-19269 Implemented methods to explicitly lock rows in MV storage.

Posted by "ibessonov (via GitHub)" <gi...@apache.org>.
ibessonov commented on code in PR #1976:
URL: https://github.com/apache/ignite-3/pull/1976#discussion_r1182274790


##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java:
##########
@@ -70,6 +75,9 @@ public class PartitionListener implements RaftGroupListener {
     /** Logger. */
     private static final IgniteLogger LOG = Loggers.forClass(PartitionListener.class);
 
+    /** Empty sorted set. */
+    private static final SortedSet<RowId> EMPTY_SET = unmodifiableSortedSet(new TreeSet<>());

Review Comment:
   Nice





[GitHub] [ignite-3] rpuch commented on a diff in pull request #1976: IGNITE-19269 Implemented methods to explicitly lock rows in MV storage.

Posted by "rpuch (via GitHub)" <gi...@apache.org>.
rpuch commented on code in PR #1976:
URL: https://github.com/apache/ignite-3/pull/1976#discussion_r1180343055


##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java:
##########
@@ -853,24 +859,22 @@ protected ReadResult decodeEntry(byte[] key, byte[] value) {
 
                 @Override
                 public boolean hasNext() {
-                    return busy(() -> {
-                        throwExceptionIfStorageInProgressOfRebalance(state.get(), RocksDbMvPartitionStorage.this::createStorageInfo);
+                    assert rowIsLocked(rowId);
 
-                        return super.hasNext();
-                    });
+                    return super.hasNext();
                 }
 
                 @Override
                 public ReadResult next() {
-                    return busy(() -> {
-                        throwExceptionIfStorageInProgressOfRebalance(state.get(), RocksDbMvPartitionStorage.this::createStorageInfo);
+                    assert rowIsLocked(rowId);
 
-                        return super.next();
-                    });
+                    return super.next();
                 }
 
                 @Override
                 public void close() {
+                    assert rowIsLocked(rowId);

Review Comment:
   I don't have a hard opinion on this, but sometimes an (unrelated) error happening while trying to release a resource might be a problem (like masking a real error)
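
   A sketch of the masking concern, using a hypothetical `failingCleanup()` in place of a failing check inside `close()`. It compares cleanup in a plain `finally` block with cleanup through try-with-resources, which keeps the original exception and attaches the cleanup failure as suppressed.

```java
public class MaskingSketch {
    /** Hypothetical cleanup step that fails, like a failing check inside close(). */
    static void failingCleanup() {
        throw new IllegalStateException("cleanup check failed");
    }

    /** Cleanup in a plain finally block: its exception replaces the real one. */
    static String finallyMasks() {
        try {
            try {
                throw new IllegalStateException("real error");
            } finally {
                failingCleanup();
            }
        } catch (IllegalStateException e) {
            return e.getMessage() + ", suppressed=" + e.getSuppressed().length;
        }
    }

    /** Cleanup via try-with-resources: the real error is kept, the other is suppressed. */
    static String tryWithResourcesKeeps() {
        try (AutoCloseable resource = MaskingSketch::failingCleanup) {
            throw new IllegalStateException("real error");
        } catch (Exception e) {
            return e.getMessage() + ", suppressed=" + e.getSuppressed().length;
        }
    }

    public static void main(String[] args) {
        System.out.println(finallyMasks());
        System.out.println(tryWithResourcesKeeps());
    }
}
```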





[GitHub] [ignite-3] ibessonov commented on a diff in pull request #1976: IGNITE-19269 Implemented methods to explicitly lock rows in MV storage.

Posted by "ibessonov (via GitHub)" <gi...@apache.org>.
ibessonov commented on code in PR #1976:
URL: https://github.com/apache/ignite-3/pull/1976#discussion_r1180308654


##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/GarbageCollector.java:
##########
@@ -153,69 +154,81 @@ boolean tryAddToGcQueue(WriteBatchWithIndex writeBatch, RowId rowId, HybridTimes
     /**
      * Polls an element for vacuum. See {@link org.apache.ignite.internal.storage.MvPartitionStorage#pollForVacuum(HybridTimestamp)}.
      *
-     * @param batch Write batch.
      * @param lowWatermark Low watermark.
-     * @return Garbage collected element.
+     * @return Garbage collected element descriptor.
      * @throws RocksDBException If failed to collect the garbage.
      */
-    @Nullable BinaryRowAndRowId pollForVacuum(WriteBatchWithIndex batch, HybridTimestamp lowWatermark) throws RocksDBException {
-        ColumnFamilyHandle partCf = helper.partCf;
-
+    @Nullable GcEntry peek(HybridTimestamp lowWatermark) {
         // We retrieve the first element of the GC queue and seek for it in the data CF.
         // However, the element that we need to garbage collect is the next (older one) element.
         // First we check if there's anything to garbage collect. If the element is a tombstone we remove it.
         // If the next element exists, that should be the element that we want to garbage collect.
-        try (RocksIterator gcIt = db.newIterator(gcQueueCf, helper.upperBoundReadOpts)) {
+        try (
+                RocksIterator foo = db.newIterator(gcQueueCf, helper.upperBoundReadOpts);

Review Comment:
   Oops





[GitHub] [ignite-3] ibessonov commented on a diff in pull request #1976: IGNITE-19269 Implemented methods to explicitly lock rows in MV storage.

Posted by "ibessonov (via GitHub)" <gi...@apache.org>.
ibessonov commented on code in PR #1976:
URL: https://github.com/apache/ignite-3/pull/1976#discussion_r1180310872


##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java:
##########
@@ -189,6 +198,8 @@ public void handleUpdateAll(
                     RowId rowId = new RowId(partitionId, entry.getKey());
                     BinaryRow row = entry.getValue() != null ? new ByteBufferRow(entry.getValue()) : null;
 
+                    locker.lock(rowId);

Review Comment:
   Yes, we should. I thought I might do that in a separate issue, but forgot to create it and add a TODO. Good catch, thank you!





[GitHub] [ignite-3] ibessonov commented on a diff in pull request #1976: IGNITE-19269 Implemented methods to explicitly lock rows in MV storage.

Posted by "ibessonov (via GitHub)" <gi...@apache.org>.
ibessonov commented on code in PR #1976:
URL: https://github.com/apache/ignite-3/pull/1976#discussion_r1180346606


##########
modules/storage-api/src/main/java/org/apache/ignite/internal/storage/util/LockHolder.java:
##########
@@ -36,7 +35,7 @@
 class LockHolder<T extends Lock> {
     private final T lock;
 
-    private final AtomicInteger lockHolder = new AtomicInteger();
+    private int lockHoldersCount;

Review Comment:
   But why would I make something "volatile" or atomic if it doesn't need to be? This is misleading
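
   A sketch of when a plain `int` counter is enough. It assumes (this thread does not show it) that the counter is only read and written inside `ConcurrentHashMap.compute`, which runs the remapping function atomically per key. `Holder`, `acquire` and `release` are hypothetical names, and `Integer` stands in for `RowId`.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;

public class PlainCounterSketch {
    static final class Holder {
        final ReentrantLock lock = new ReentrantLock();
        int holders; // plain int: only touched inside compute()
    }

    static final ConcurrentHashMap<Integer, Holder> HOLDERS = new ConcurrentHashMap<>();

    /** Plain field guarded by the row lock of row 1 in the stress run below. */
    static int sharedCounter;

    static void acquire(int rowId) {
        Holder h = HOLDERS.compute(rowId, (id, cur) -> {
            Holder holder = cur == null ? new Holder() : cur;
            holder.holders++;
            return holder;
        });
        h.lock.lock();
    }

    static void release(int rowId) {
        HOLDERS.get(rowId).lock.unlock();
        // Returning null removes the entry once nobody uses the holder any more.
        HOLDERS.compute(rowId, (id, cur) -> --cur.holders == 0 ? null : cur);
    }

    /** Four threads contend on one row; returns true if all bookkeeping adds up. */
    static boolean stress() {
        sharedCounter = 0;
        Thread[] threads = new Thread[4];
        for (int t = 0; t < threads.length; t++) {
            threads[t] = new Thread(() -> {
                for (int i = 0; i < 1_000; i++) {
                    acquire(1);
                    sharedCounter++;
                    release(1);
                }
            });
            threads[t].setDaemon(true);
            threads[t].start();
        }
        try {
            for (Thread thread : threads) {
                thread.join(30_000);
                if (thread.isAlive()) {
                    return false;
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        return HOLDERS.isEmpty() && sharedCounter == 4_000;
    }

    public static void main(String[] args) {
        System.out.println("bookkeeping consistent: " + stress());
    }
}
```

   Under that assumption, making the field atomic or volatile would suggest it is accessed outside the guarded section, which is the misleading part.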





[GitHub] [ignite-3] ibessonov commented on a diff in pull request #1976: IGNITE-19269 Implemented methods to explicitly lock rows in MV storage.

Posted by "ibessonov (via GitHub)" <gi...@apache.org>.
ibessonov commented on code in PR #1976:
URL: https://github.com/apache/ignite-3/pull/1976#discussion_r1182229173


##########
modules/storage-api/src/main/java/org/apache/ignite/internal/storage/MvPartitionStorage.java:
##########
@@ -47,14 +48,41 @@ public interface MvPartitionStorage extends ManuallyCloseable {
     long REBALANCE_IN_PROGRESS = -1;
 
     /**
-     * Closure for executing write operations on the storage.
+     * Closure for executing write operations on the storage. All write operations, such as
+     * {@link #addWrite(RowId, BinaryRow, UUID, UUID, int)} or {@link #commitWrite(RowId, HybridTimestamp)},
+     * as well as {@link #scanVersions(RowId)}, and operations like {@link #committedGroupConfiguration(byte[])}, must be executed inside
+     * of the write closure. Also, each operation that involves modifying rows (and {@link #scanVersions(RowId)} must hold lock on
+     * corresponding row ID, by either calling {@link Locker#lock(RowId)} or calling {@link Locker#tryLock(RowId)} and checking the
+     * result.
      *
      * @param <V> Type of the result returned from the closure.
      */
     @SuppressWarnings("PublicInnerClass")
     @FunctionalInterface
     interface WriteClosure<V> {
-        V execute() throws StorageException;
+        V execute(Locker locker) throws StorageException;
+    }
+
+    /**
+     * Parameter type for {@link WriteClosure#execute(Locker)}. Used to lock row IDs before updating the data. All acquired locks are
+     * released automatically after {@code execute} call is completed.
+     */
+    @SuppressWarnings("PublicInnerClass")
+    interface Locker {
+        /**
+         * Locks passed row ID until the {@link WriteClosure#execute(Locker)} is completed.
+         *
+         * @param rowId Row ID to lock.
+         */
+        void lock(RowId rowId);
+
+        /**
+         * Tries to lock passed row ID. If successful, lock will be released when the {@link WriteClosure#execute(Locker)} is completed.
+         *
+         * @param rowId Row ID to lock.
+         * @return {@code true} if row ID has been locked successfully, or the lock has already been held by current thread.

Review Comment:
   "false" is everything else, right?





[GitHub] [ignite-3] ibessonov commented on a diff in pull request #1976: IGNITE-19269 Implemented methods to explicitly lock rows in MV storage.

Posted by "ibessonov (via GitHub)" <gi...@apache.org>.
ibessonov commented on code in PR #1976:
URL: https://github.com/apache/ignite-3/pull/1976#discussion_r1182236083


##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java:
##########
@@ -70,6 +74,9 @@ public class PartitionListener implements RaftGroupListener {
     /** Logger. */
     private static final IgniteLogger LOG = Loggers.forClass(PartitionListener.class);
 
+    /** Empty sorted set. */
+    private static final SortedSet<RowId> EMPTY_SET = new TreeSet<>();

Review Comment:
   I couldn't find such a thing for some reason; maybe I didn't look hard enough (if at all, there's no excuse). Good idea, I'll do it.
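
   The suggestion being answered is not quoted in this message, so the sketch below is an assumption about its intent: a shared "empty" constant should reject writes. It compares a bare `TreeSet` with `Collections.unmodifiableSortedSet` and with `Collections.emptySortedSet()`, using `Integer` in place of `RowId`.

```java
import java.util.Collections;
import java.util.SortedSet;
import java.util.TreeSet;

public class EmptySortedSetSketch {
    /** A shared constant that any caller could accidentally fill. */
    static final SortedSet<Integer> MUTABLE_EMPTY = new TreeSet<>();

    /** Read-only view over an empty TreeSet. */
    static final SortedSet<Integer> READ_ONLY_EMPTY = Collections.unmodifiableSortedSet(new TreeSet<>());

    /** Returns true if the set refuses modification. */
    static boolean rejectsWrites(SortedSet<Integer> set) {
        try {
            set.add(42);
            set.remove(42); // undo the write so the demo leaves no trace
            return false;
        } catch (UnsupportedOperationException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("bare TreeSet rejects writes: " + rejectsWrites(MUTABLE_EMPTY));
        System.out.println("unmodifiable view rejects writes: " + rejectsWrites(READ_ONLY_EMPTY));
        System.out.println("emptySortedSet rejects writes: " + rejectsWrites(Collections.emptySortedSet()));
    }
}
```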





[GitHub] [ignite-3] ibessonov commented on a diff in pull request #1976: IGNITE-19269 Implemented methods to explicitly lock rows in MV storage.

Posted by "ibessonov (via GitHub)" <gi...@apache.org>.
ibessonov commented on code in PR #1976:
URL: https://github.com/apache/ignite-3/pull/1976#discussion_r1180303522


##########
modules/storage-api/src/main/java/org/apache/ignite/internal/storage/MvPartitionStorage.java:
##########
@@ -226,8 +273,32 @@ public interface MvPartitionStorage extends ManuallyCloseable {
      *      {@code null} if there's no such value.
      * @throws StorageException If failed to poll element for vacuum.
      */
+    //TODO IGNITE-19367 Remove this method and replace its usages with proper batch removes.
+    @Deprecated
     default @Nullable BinaryRowAndRowId pollForVacuum(HybridTimestamp lowWatermark) {
-        throw new UnsupportedOperationException("pollForVacuum");
+        while (true) {
+            BinaryRowAndRowId binaryRowAndRowId = runConsistently(locker -> {
+                GcEntry gcEntry = peek(lowWatermark);
+
+                if (gcEntry == null) {
+                    return null;
+                }
+
+                locker.lock(gcEntry.getRowId());

Review Comment:
   Yes, it is correct for now. We'll have to migrate to "tryLock" when we have a loop inside of the "runConsistently" closure.
   When you only have a single lock, there's no chance of a deadlock
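
   A hedged sketch of what such a loop might look like once several rows are handled inside one closure: block on the first lock (nothing is held yet, so waiting cannot deadlock), then use `tryLock()` for the rest and stop at the first contended row. `lockBatch` is a hypothetical helper, not code from this PR.

```java
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.ReentrantLock;

public class TryLockBatchSketch {
    /** Locks a prefix of the queue and returns how many locks were taken. */
    static int lockBatch(List<ReentrantLock> queue) {
        int locked = 0;
        for (ReentrantLock lock : queue) {
            if (locked == 0) {
                lock.lock(); // holding nothing yet, so blocking is safe
            } else if (!lock.tryLock()) {
                break; // contended: process what we have and retry later
            }
            locked++;
        }
        return locked;
    }

    /** Another thread holds the second lock, so only the first one is taken. */
    static int demo() {
        ReentrantLock a = new ReentrantLock();
        ReentrantLock b = new ReentrantLock();
        ReentrantLock c = new ReentrantLock();
        CountDownLatch held = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);

        Thread other = new Thread(() -> {
            b.lock();
            held.countDown();
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                b.unlock();
            }
        });
        other.setDaemon(true);
        other.start();

        try {
            held.await();
            int locked = lockBatch(List.of(a, b, c));
            release.countDown();
            other.join();
            a.unlock();
            return locked;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("locks taken: " + demo());
    }
}
```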





[GitHub] [ignite-3] ibessonov commented on a diff in pull request #1976: IGNITE-19269 Implemented methods to explicitly lock rows in MV storage.

Posted by "ibessonov (via GitHub)" <gi...@apache.org>.
ibessonov commented on code in PR #1976:
URL: https://github.com/apache/ignite-3/pull/1976#discussion_r1182234547


##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java:
##########
@@ -189,6 +198,8 @@ public void handleUpdateAll(
                     RowId rowId = new RowId(partitionId, entry.getKey());
                     BinaryRow row = entry.getValue() != null ? new ByteBufferRow(entry.getValue()) : null;
 
+                    locker.lock(rowId);

Review Comment:
   I agree with you, I added the sorting





[GitHub] [ignite-3] rpuch commented on a diff in pull request #1976: IGNITE-19269 Implemented methods to explicitly lock rows in MV storage.

Posted by "rpuch (via GitHub)" <gi...@apache.org>.
rpuch commented on code in PR #1976:
URL: https://github.com/apache/ignite-3/pull/1976#discussion_r1180334361


##########
modules/storage-api/src/main/java/org/apache/ignite/internal/storage/MvPartitionStorage.java:
##########
@@ -47,14 +48,41 @@ public interface MvPartitionStorage extends ManuallyCloseable {
     long REBALANCE_IN_PROGRESS = -1;
 
     /**
-     * Closure for executing write operations on the storage.
+     * Closure for executing write operations on the storage. All write operations, such as
+     * {@link #addWrite(RowId, BinaryRow, UUID, UUID, int)} or {@link #commitWrite(RowId, HybridTimestamp)},
+     * as well as {@link #scanVersions(RowId)}, and operations like {@link #committedGroupConfiguration(byte[])}, must be executed inside
+     * of the write closure. Also, each operation that involves modifying rows (and {@link #scanVersions(RowId)} must hold lock on
+     * corresponding row ID, by either calling {@link Locker#lock(RowId)} or calling {@link Locker#tryLock(RowId)} and checking the
+     * result.
      *
      * @param <V> Type of the result returned from the closure.
      */
     @SuppressWarnings("PublicInnerClass")
     @FunctionalInterface
     interface WriteClosure<V> {
-        V execute() throws StorageException;
+        V execute(Locker locker) throws StorageException;
+    }
+
+    /**
+     * Parameter type for {@link WriteClosure#execute(Locker)}. Used to lock row IDs before updating the data. All acquired locks are
+     * released automatically after {@code execute} call is completed.
+     */
+    @SuppressWarnings("PublicInnerClass")
+    interface Locker {
+        /**
+         * Locks passed row ID until the {@link WriteClosure#execute(Locker)} is completed.
+         *
+         * @param rowId Row ID to lock.
+         */
+        void lock(RowId rowId);
+
+        /**
+         * Tries to lock passed row ID. If successful, lock will be released when the {@link WriteClosure#execute(Locker)} is completed.
+         *
+         * @param rowId Row ID to lock.
+         * @return {@code true} if row ID has been locked successfully, or the lock has already been held by current thread.

Review Comment:
   Oh, I've misread the second half. Still, it seems that explicit mention of what `false` means should be useful.





[GitHub] [ignite-3] ibessonov commented on a diff in pull request #1976: IGNITE-19269 Implemented methods to explicitly lock rows in MV storage.

Posted by "ibessonov (via GitHub)" <gi...@apache.org>.
ibessonov commented on code in PR #1976:
URL: https://github.com/apache/ignite-3/pull/1976#discussion_r1180345536


##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java:
##########
@@ -853,24 +859,22 @@ protected ReadResult decodeEntry(byte[] key, byte[] value) {
 
                 @Override
                 public boolean hasNext() {
-                    return busy(() -> {
-                        throwExceptionIfStorageInProgressOfRebalance(state.get(), RocksDbMvPartitionStorage.this::createStorageInfo);
+                    assert rowIsLocked(rowId);
 
-                        return super.hasNext();
-                    });
+                    return super.hasNext();
                 }
 
                 @Override
                 public ReadResult next() {
-                    return busy(() -> {
-                        throwExceptionIfStorageInProgressOfRebalance(state.get(), RocksDbMvPartitionStorage.this::createStorageInfo);
+                    assert rowIsLocked(rowId);
 
-                        return super.next();
-                    });
+                    return super.next();
                 }
 
                 @Override
                 public void close() {
+                    assert rowIsLocked(rowId);

Review Comment:
   This assertion should never fail in practice.





[GitHub] [ignite-3] ibessonov commented on a diff in pull request #1976: IGNITE-19269 Implemented methods to explicitly lock rows in MV storage.

Posted by "ibessonov (via GitHub)" <gi...@apache.org>.
ibessonov commented on code in PR #1976:
URL: https://github.com/apache/ignite-3/pull/1976#discussion_r1180349904


##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java:
##########
@@ -189,6 +198,8 @@ public void handleUpdateAll(
                     RowId rowId = new RowId(partitionId, entry.getKey());
                     BinaryRow row = entry.getValue() != null ? new ByteBufferRow(entry.getValue()) : null;
 
+                    locker.lock(rowId);

Review Comment:
   I did that, yes





[GitHub] [ignite-3] ibessonov commented on a diff in pull request #1976: IGNITE-19269 Implemented methods to explicitly lock rows in MV storage.

Posted by "ibessonov (via GitHub)" <gi...@apache.org>.
ibessonov commented on code in PR #1976:
URL: https://github.com/apache/ignite-3/pull/1976#discussion_r1180312623


##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java:
##########
@@ -324,7 +324,10 @@ private void handleTxCleanupCommand(TxCleanupCommand cmd, long commandIndex, lon
         Set<RowId> pendingRowIds = txsPendingRowIds.getOrDefault(txId, Collections.emptySet());
 
         if (cmd.commit()) {
-            storage.runConsistently(() -> {
+            storage.runConsistently(locker -> {
+                //TODO Split & sort.

Review Comment:
   I'll fix it. I guess it's not really dangerous right now, because all insertions are protected by external locks in transactions, but it's worth fixing nonetheless. Code hygiene is never a bad thing.





[GitHub] [ignite-3] ibessonov commented on a diff in pull request #1976: IGNITE-19269 Implemented methods to explicitly lock rows in MV storage.

Posted by "ibessonov (via GitHub)" <gi...@apache.org>.
ibessonov commented on code in PR #1976:
URL: https://github.com/apache/ignite-3/pull/1976#discussion_r1180301523


##########
modules/storage-api/src/main/java/org/apache/ignite/internal/storage/MvPartitionStorage.java:
##########
@@ -47,14 +48,41 @@ public interface MvPartitionStorage extends ManuallyCloseable {
     long REBALANCE_IN_PROGRESS = -1;
 
     /**
-     * Closure for executing write operations on the storage.
+     * Closure for executing write operations on the storage. All write operations, such as
+     * {@link #addWrite(RowId, BinaryRow, UUID, UUID, int)} or {@link #commitWrite(RowId, HybridTimestamp)},
+     * as well as {@link #scanVersions(RowId)}, and operations like {@link #committedGroupConfiguration(byte[])}, must be executed inside
+     * of the write closure. Also, each operation that involves modifying rows (and {@link #scanVersions(RowId)} must hold lock on
+     * corresponding row ID, by either calling {@link Locker#lock(RowId)} or calling {@link Locker#tryLock(RowId)} and checking the
+     * result.
      *
      * @param <V> Type of the result returned from the closure.
      */
     @SuppressWarnings("PublicInnerClass")
     @FunctionalInterface
     interface WriteClosure<V> {
-        V execute() throws StorageException;
+        V execute(Locker locker) throws StorageException;
+    }
+
+    /**
+     * Parameter type for {@link WriteClosure#execute(Locker)}. Used to lock row IDs before updating the data. All acquired locks are
+     * released automatically after {@code execute} call is completed.
+     */
+    @SuppressWarnings("PublicInnerClass")
+    interface Locker {
+        /**
+         * Locks passed row ID until the {@link WriteClosure#execute(Locker)} is completed.
+         *
+         * @param rowId Row ID to lock.
+         */
+        void lock(RowId rowId);
+
+        /**
+         * Tries to lock passed row ID. If successful, lock will be released when the {@link WriteClosure#execute(Locker)} is completed.
+         *
+         * @param rowId Row ID to lock.
+         * @return {@code true} if row ID has been locked successfully, or the lock has already been held by current thread.

Review Comment:
   No, this is correct, we return `true` if current thread already holds the lock
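
The reentrant behaviour described here can be illustrated with the JDK's `ReentrantLock`. This is only a sketch: it assumes that `Locker#tryLock(RowId)` behaves like `ReentrantLock#tryLock()` for the owning thread, and the class `ReentrantTryLockDemo` is hypothetical, not part of the PR.

```java
import java.util.Arrays;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class, not part of the Ignite code base.
class ReentrantTryLockDemo {
    /** Returns {first tryLock, repeated tryLock on the same thread, tryLock from another thread}. */
    static boolean[] demo() {
        ReentrantLock lock = new ReentrantLock();

        boolean first = lock.tryLock();
        // The current thread already holds the lock, so this succeeds as well.
        boolean reentrant = lock.tryLock();

        // A different thread cannot acquire the lock while it is held here.
        boolean other = CompletableFuture.supplyAsync(lock::tryLock).join();

        // One unlock per successful acquisition by this thread.
        lock.unlock();
        lock.unlock();

        return new boolean[] {first, reentrant, other};
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(demo()));
    }
}
```

With these assumptions, only the call from the other thread reports a failed acquisition.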





[GitHub] [ignite-3] ibessonov commented on a diff in pull request #1976: IGNITE-19269 Implemented methods to explicitly lock rows in MV storage.

Posted by "ibessonov (via GitHub)" <gi...@apache.org>.
ibessonov commented on code in PR #1976:
URL: https://github.com/apache/ignite-3/pull/1976#discussion_r1180307344


##########
modules/storage-api/src/test/java/org/apache/ignite/internal/storage/util/LockByRowIdTest.java:
##########
@@ -25,135 +25,97 @@
 import static org.junit.jupiter.api.Assertions.assertThrows;
 
 import java.util.concurrent.CompletableFuture;
+import java.util.function.Supplier;
 import org.apache.ignite.internal.storage.RowId;
-import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
 /**
- * Class for testing {@link ReentrantLockByRowId}.
+ * Class for testing {@link LockByRowId}.
  */
-public class ReentrantLockByRowIdTest {
-    private ReentrantLockByRowId lockByRowId;
+public class LockByRowIdTest {
+    private LockByRowId lockByRowId;
 
     @BeforeEach
     void setUp() {
-        lockByRowId = new ReentrantLockByRowId();
-    }
-
-    @AfterEach
-    void tearDown() {
-        lockByRowId.releaseAllLockByCurrentThread();
+        lockByRowId = new LockByRowId();
     }
 
     @Test
     void testSimple() {
         RowId rowId = new RowId(0);
 
-        lockByRowId.acquireLock(rowId);
-        lockByRowId.releaseLock(rowId);
+        lockByRowId.lock(rowId);
+        lockByRowId.unlockAll(rowId);
 
-        assertEquals(1, lockByRowId.inLock(rowId, () -> 1));
+        assertEquals(1, inLock(rowId, () -> 1));
     }
 
     @Test
     void testSimpleReEnter() {
         RowId rowId = new RowId(0);
 
-        lockByRowId.acquireLock(rowId);
-        lockByRowId.acquireLock(rowId);
-
-        lockByRowId.inLock(rowId, () -> {
-            lockByRowId.acquireLock(rowId);
-
-            lockByRowId.releaseLock(rowId);
-
-            return null;
-        });
+        lockByRowId.lock(rowId);
+        lockByRowId.lock(rowId);
 
-        lockByRowId.releaseLock(rowId);
-        lockByRowId.releaseLock(rowId);
+        lockByRowId.unlockAll(rowId);
     }
 
     @Test
     void testReleaseError() {
-        assertThrows(IllegalStateException.class, () -> lockByRowId.releaseLock(new RowId(0)));
+        assertThrows(IllegalStateException.class, () -> lockByRowId.unlockAll(new RowId(0)));
 
         RowId rowId = new RowId(0);
 
-        assertThat(runAsync(() -> lockByRowId.acquireLock(rowId)), willCompleteSuccessfully());
+        assertThat(runAsync(() -> lockByRowId.lock(rowId)), willCompleteSuccessfully());
 
-        assertThrows(IllegalMonitorStateException.class, () -> lockByRowId.releaseLock(rowId));
+        assertThrows(IllegalMonitorStateException.class, () -> lockByRowId.unlockAll(rowId));
     }
 
     @Test
     void testBlockSimple() {
         RowId rowId = new RowId(0);
 
-        lockByRowId.acquireLock(rowId);
-        lockByRowId.acquireLock(rowId);
+        lockByRowId.lock(rowId);
+        lockByRowId.lock(rowId);
 
         CompletableFuture<?> acquireLockFuture = runAsync(() -> {
-            lockByRowId.acquireLock(rowId);
-            lockByRowId.releaseLock(rowId);
+            lockByRowId.lock(rowId);
+            lockByRowId.unlockAll(rowId);
         });
 
         assertThat(acquireLockFuture, willTimeoutFast());
 
-        lockByRowId.releaseLock(rowId);
-
-        assertThat(acquireLockFuture, willTimeoutFast());
-
-        lockByRowId.releaseLock(rowId);
+        lockByRowId.unlockAll(rowId);
 
         assertThat(acquireLockFuture, willCompleteSuccessfully());
-
-        lockByRowId.acquireLock(rowId);
     }
 
     @Test
     void testBlockSupplier() {

Review Comment:
   I think you're right, I'll remove it





[GitHub] [ignite-3] ibessonov commented on a diff in pull request #1976: IGNITE-19269 Implemented methods to explicitly lock rows in MV storage.

Posted by "ibessonov (via GitHub)" <gi...@apache.org>.
ibessonov commented on code in PR #1976:
URL: https://github.com/apache/ignite-3/pull/1976#discussion_r1180310872


##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java:
##########
@@ -189,6 +198,8 @@ public void handleUpdateAll(
                     RowId rowId = new RowId(partitionId, entry.getKey());
                     BinaryRow row = entry.getValue() != null ? new ByteBufferRow(entry.getValue()) : null;
 
+                    locker.lock(rowId);

Review Comment:
   Yes, we should. I thought I might do that in a separate issue, but forgot to create it and add a TODO. Good catch, thank you!
   On the other hand, the sorting code is not particularly big, so I can easily do it here
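
A minimal sketch of why sorting helps: if every thread acquires row locks in the same global order, two batches that touch the same rows cannot wait on each other in a cycle. Integers stand in for row IDs here, and `SortedLockingDemo`, `LOCKS` and `updateAll` are hypothetical names, not the code from this PR.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Collection;
import java.util.Deque;
import java.util.Map;
import java.util.TreeSet;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class: integers play the role of row IDs.
class SortedLockingDemo {
    private static final Map<Integer, ReentrantLock> LOCKS = new ConcurrentHashMap<>();

    /** Locks all keys in ascending order, then releases them; returns the number of locks taken. */
    static int updateAll(Collection<Integer> keys) {
        Deque<ReentrantLock> acquired = new ArrayDeque<>();

        try {
            // TreeSet removes duplicates and iterates in ascending order.
            for (Integer key : new TreeSet<>(keys)) {
                ReentrantLock lock = LOCKS.computeIfAbsent(key, k -> new ReentrantLock());

                lock.lock();
                acquired.push(lock);
            }

            return acquired.size();
        } finally {
            // Release in reverse order of acquisition.
            while (!acquired.isEmpty()) {
                acquired.pop().unlock();
            }
        }
    }

    public static void main(String[] args) {
        // The two batches list the same keys in opposite orders, but both lock them as 1, 2, 3.
        CompletableFuture<Integer> a = CompletableFuture.supplyAsync(() -> updateAll(Arrays.asList(1, 2, 3)));
        CompletableFuture<Integer> b = CompletableFuture.supplyAsync(() -> updateAll(Arrays.asList(3, 2, 1)));

        System.out.println(a.join() + b.join());
    }
}
```

Without the sort, one thread could hold key 1 while waiting for key 3 and the other could hold key 3 while waiting for key 1.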





[GitHub] [ignite-3] ibessonov commented on a diff in pull request #1976: IGNITE-19269 Implemented methods to explicitly lock rows in MV storage.

Posted by "ibessonov (via GitHub)" <gi...@apache.org>.
ibessonov commented on code in PR #1976:
URL: https://github.com/apache/ignite-3/pull/1976#discussion_r1180340784


##########
modules/storage-api/src/main/java/org/apache/ignite/internal/storage/MvPartitionStorage.java:
##########
@@ -226,8 +273,32 @@ public interface MvPartitionStorage extends ManuallyCloseable {
      *      {@code null} if there's no such value.
      * @throws StorageException If failed to poll element for vacuum.
      */
+    //TODO IGNITE-19367 Remove this method and replace its usages with proper batch removes.
+    @Deprecated
     default @Nullable BinaryRowAndRowId pollForVacuum(HybridTimestamp lowWatermark) {
-        throw new UnsupportedOperationException("pollForVacuum");
+        while (true) {
+            BinaryRowAndRowId binaryRowAndRowId = runConsistently(locker -> {
+                GcEntry gcEntry = peek(lowWatermark);
+
+                if (gcEntry == null) {
+                    return null;
+                }
+
+                locker.lock(gcEntry.getRowId());

Review Comment:
   There is a TODO on the method itself. This entire implementation will be deleted, but OK, it's easy to add a TODO here as well





[GitHub] [ignite-3] rpuch commented on a diff in pull request #1976: IGNITE-19269 Implemented methods to explicitly lock rows in MV storage.

Posted by "rpuch (via GitHub)" <gi...@apache.org>.
rpuch commented on code in PR #1976:
URL: https://github.com/apache/ignite-3/pull/1976#discussion_r1180467747


##########
modules/storage-api/src/main/java/org/apache/ignite/internal/storage/util/LockHolder.java:
##########
@@ -43,6 +43,7 @@
 
     /**
      * Increment the count of lock holders ({@link Thread}).
+     * Not thread-safe, required external synchronization.

Review Comment:
   ```suggestion
        * Not thread-safe, requires external synchronization.
   ```



##########
modules/storage-api/src/main/java/org/apache/ignite/internal/storage/util/LockHolder.java:
##########
@@ -52,6 +53,7 @@ void incrementHolders() {
 
     /**
      * Decrements the count of lock holders ({@link Thread}), returns {@code true} if there are no more lock holders.
+     * Not thread-safe, required external synchronization.

Review Comment:
   ```suggestion
        * Not thread-safe, requires external synchronization.
   ```



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java:
##########
@@ -70,6 +74,9 @@ public class PartitionListener implements RaftGroupListener {
     /** Logger. */
     private static final IgniteLogger LOG = Loggers.forClass(PartitionListener.class);
 
+    /** Empty sorted set. */
+    private static final SortedSet<RowId> EMPTY_SET = new TreeSet<>();

Review Comment:
   Let's wrap it in `unmodifiableSortedSet()` to make sure no one adds anything to it. This should not hit us hard performance-wise.





[GitHub] [ignite-3] rpuch commented on a diff in pull request #1976: IGNITE-19269 Implemented methods to explicitly lock rows in MV storage.

Posted by "rpuch (via GitHub)" <gi...@apache.org>.
rpuch commented on code in PR #1976:
URL: https://github.com/apache/ignite-3/pull/1976#discussion_r1182249152


##########
modules/storage-api/src/main/java/org/apache/ignite/internal/storage/MvPartitionStorage.java:
##########
@@ -47,14 +48,41 @@ public interface MvPartitionStorage extends ManuallyCloseable {
     long REBALANCE_IN_PROGRESS = -1;
 
     /**
-     * Closure for executing write operations on the storage.
+     * Closure for executing write operations on the storage. All write operations, such as
+     * {@link #addWrite(RowId, BinaryRow, UUID, UUID, int)} or {@link #commitWrite(RowId, HybridTimestamp)},
+     * as well as {@link #scanVersions(RowId)}, and operations like {@link #committedGroupConfiguration(byte[])}, must be executed inside
+     * of the write closure. Also, each operation that involves modifying rows (and {@link #scanVersions(RowId)} must hold lock on
+     * corresponding row ID, by either calling {@link Locker#lock(RowId)} or calling {@link Locker#tryLock(RowId)} and checking the
+     * result.
      *
      * @param <V> Type of the result returned from the closure.
      */
     @SuppressWarnings("PublicInnerClass")
     @FunctionalInterface
     interface WriteClosure<V> {
-        V execute() throws StorageException;
+        V execute(Locker locker) throws StorageException;
+    }
+
+    /**
+     * Parameter type for {@link WriteClosure#execute(Locker)}. Used to lock row IDs before updating the data. All acquired locks are
+     * released automatically after {@code execute} call is completed.
+     */
+    @SuppressWarnings("PublicInnerClass")
+    interface Locker {
+        /**
+         * Locks passed row ID until the {@link WriteClosure#execute(Locker)} is completed.
+         *
+         * @param rowId Row ID to lock.
+         */
+        void lock(RowId rowId);
+
+        /**
+         * Tries to lock passed row ID. If successful, lock will be released when the {@link WriteClosure#execute(Locker)} is completed.
+         *
+         * @param rowId Row ID to lock.
+         * @return {@code true} if row ID has been locked successfully, or the lock has already been held by current thread.

Review Comment:
   It's about explicitness. If you think it's already obvious, ok.





[GitHub] [ignite-3] rpuch commented on a diff in pull request #1976: IGNITE-19269 Implemented methods to explicitly lock rows in MV storage.

Posted by "rpuch (via GitHub)" <gi...@apache.org>.
rpuch commented on code in PR #1976:
URL: https://github.com/apache/ignite-3/pull/1976#discussion_r1182255922


##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java:
##########
@@ -70,6 +75,9 @@ public class PartitionListener implements RaftGroupListener {
     /** Logger. */
     private static final IgniteLogger LOG = Loggers.forClass(PartitionListener.class);
 
+    /** Empty sorted set. */
+    private static final SortedSet<RowId> EMPTY_SET = unmodifiableSortedSet(new TreeSet<>());

Review Comment:
   It turns out there is `Collections#emptySortedSet()`, sorry for the misleading comment
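
For illustration, `Collections.emptySortedSet()` (available since Java 8) returns a shared immutable empty set, so no wrapper is needed. The sketch below uses `String` elements instead of `RowId`, and the class name `EmptySortedSetDemo` is hypothetical.

```java
import java.util.Collections;
import java.util.SortedSet;

// Hypothetical demo class; String stands in for RowId.
class EmptySortedSetDemo {
    /** Immutable empty sorted set provided by the JDK. */
    private static final SortedSet<String> EMPTY_SET = Collections.emptySortedSet();

    /** Returns true if an attempt to modify the constant is rejected. */
    static boolean rejectsAdd() {
        try {
            EMPTY_SET.add("x");

            return false;
        } catch (UnsupportedOperationException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(EMPTY_SET.isEmpty() + " " + rejectsAdd());
    }
}
```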





[GitHub] [ignite-3] ibessonov commented on a diff in pull request #1976: IGNITE-19269 Implemented methods to explicitly lock rows in MV storage.

Posted by "ibessonov (via GitHub)" <gi...@apache.org>.
ibessonov commented on code in PR #1976:
URL: https://github.com/apache/ignite-3/pull/1976#discussion_r1180304627


##########
modules/storage-api/src/main/java/org/apache/ignite/internal/storage/util/LockHolder.java:
##########
@@ -36,7 +35,7 @@
 class LockHolder<T extends Lock> {
     private final T lock;
 
-    private final AtomicInteger lockHolder = new AtomicInteger();
+    private int lockHoldersCount;

Review Comment:
   As far as I know, `compute` guarantees mutual exclusion between closures and a happens-before (HB) relation between them. I'll add a comment
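
The argument can be sketched as follows, assuming the counter lives in a value stored in a `ConcurrentHashMap` and is only ever touched inside `compute` for its key. `ConcurrentHashMap#compute` is documented to perform the whole invocation atomically. The `Holder` class and `ComputeCounterDemo` are hypothetical stand-ins for `LockHolder` and its owner.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical demo class; Holder stands in for LockHolder.
class ComputeCounterDemo {
    /** Plain, non-volatile counter that is modified only inside compute(). */
    static final class Holder {
        int count;
    }

    /** Increments one shared counter from several threads and returns the final value. */
    static int run(int threads, int perThread) {
        ConcurrentHashMap<String, Holder> map = new ConcurrentHashMap<>();
        CompletableFuture<?>[] futures = new CompletableFuture<?>[threads];

        for (int i = 0; i < threads; i++) {
            futures[i] = CompletableFuture.runAsync(() -> {
                for (int j = 0; j < perThread; j++) {
                    // Remapping functions for the same key never run concurrently.
                    map.compute("row", (key, holder) -> {
                        if (holder == null) {
                            holder = new Holder();
                        }

                        holder.count++;

                        return holder;
                    });
                }
            });
        }

        // join() makes the workers' updates visible to this thread.
        CompletableFuture.allOf(futures).join();

        return map.get("row").count;
    }

    public static void main(String[] args) {
        System.out.println(run(4, 10_000));
    }
}
```

The sketch stops being safe as soon as the field is read or written outside `compute` while other threads are still running, which is the assumption the added comment is meant to record.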





[GitHub] [ignite-3] ibessonov commented on a diff in pull request #1976: IGNITE-19269 Implemented methods to explicitly lock rows in MV storage.

Posted by "ibessonov (via GitHub)" <gi...@apache.org>.
ibessonov commented on code in PR #1976:
URL: https://github.com/apache/ignite-3/pull/1976#discussion_r1182236083


##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java:
##########
@@ -70,6 +74,9 @@ public class PartitionListener implements RaftGroupListener {
     /** Logger. */
     private static final IgniteLogger LOG = Loggers.forClass(PartitionListener.class);
 
+    /** Empty sorted set. */
+    private static final SortedSet<RowId> EMPTY_SET = new TreeSet<>();

Review Comment:
   I couldn't find such thing for some reason, maybe I didn't look hard enough. Good idea, I'll do it





[GitHub] [ignite-3] ibessonov commented on a diff in pull request #1976: IGNITE-19269 Implemented methods to explicitly lock rows in MV storage.

Posted by "ibessonov (via GitHub)" <gi...@apache.org>.
ibessonov commented on code in PR #1976:
URL: https://github.com/apache/ignite-3/pull/1976#discussion_r1182276007


##########
modules/storage-api/src/main/java/org/apache/ignite/internal/storage/MvPartitionStorage.java:
##########
@@ -47,14 +48,41 @@ public interface MvPartitionStorage extends ManuallyCloseable {
     long REBALANCE_IN_PROGRESS = -1;
 
     /**
-     * Closure for executing write operations on the storage.
+     * Closure for executing write operations on the storage. All write operations, such as
+     * {@link #addWrite(RowId, BinaryRow, UUID, UUID, int)} or {@link #commitWrite(RowId, HybridTimestamp)},
+     * as well as {@link #scanVersions(RowId)}, and operations like {@link #committedGroupConfiguration(byte[])}, must be executed inside
+     * of the write closure. Also, each operation that involves modifying rows (and {@link #scanVersions(RowId)} must hold lock on
+     * corresponding row ID, by either calling {@link Locker#lock(RowId)} or calling {@link Locker#tryLock(RowId)} and checking the
+     * result.
      *
      * @param <V> Type of the result returned from the closure.
      */
     @SuppressWarnings("PublicInnerClass")
     @FunctionalInterface
     interface WriteClosure<V> {
-        V execute() throws StorageException;
+        V execute(Locker locker) throws StorageException;
+    }
+
+    /**
+     * Parameter type for {@link WriteClosure#execute(Locker)}. Used to lock row IDs before updating the data. All acquired locks are
+     * released automatically after {@code execute} call is completed.
+     */
+    @SuppressWarnings("PublicInnerClass")
+    interface Locker {
+        /**
+         * Locks passed row ID until the {@link WriteClosure#execute(Locker)} is completed.
+         *
+         * @param rowId Row ID to lock.
+         */
+        void lock(RowId rowId);
+
+        /**
+         * Tries to lock passed row ID. If successful, lock will be released when the {@link WriteClosure#execute(Locker)} is completed.
+         *
+         * @param rowId Row ID to lock.
+         * @return {@code true} if row ID has been locked successfully, or the lock has already been held by current thread.

Review Comment:
   I added a comment, but yes, I think that writing the explicit negation of previous statements is a bit too much





[GitHub] [ignite-3] ibessonov commented on a diff in pull request #1976: IGNITE-19269 Implemented methods to explicitly lock rows in MV storage.

Posted by "ibessonov (via GitHub)" <gi...@apache.org>.
ibessonov commented on code in PR #1976:
URL: https://github.com/apache/ignite-3/pull/1976#discussion_r1180300482


##########
modules/storage-api/src/main/java/org/apache/ignite/internal/storage/MvPartitionStorage.java:
##########
@@ -47,14 +48,41 @@ public interface MvPartitionStorage extends ManuallyCloseable {
     long REBALANCE_IN_PROGRESS = -1;
 
     /**
-     * Closure for executing write operations on the storage.
+     * Closure for executing write operations on the storage. All write operations, such as
+     * {@link #addWrite(RowId, BinaryRow, UUID, UUID, int)} or {@link #commitWrite(RowId, HybridTimestamp)},
+     * as well as {@link #scanVersions(RowId)}, and operations like {@link #committedGroupConfiguration(byte[])}, must be executed inside
+     * of the write closure. Also, each operation that involves modifying rows (and {@link #scanVersions(RowId)} must hold lock on
+     * corresponding row ID, by either calling {@link Locker#lock(RowId)} or calling {@link Locker#tryLock(RowId)} and checking the
+     * result.
      *
      * @param <V> Type of the result returned from the closure.
      */
     @SuppressWarnings("PublicInnerClass")
     @FunctionalInterface
     interface WriteClosure<V> {
-        V execute() throws StorageException;
+        V execute(Locker locker) throws StorageException;
+    }
+
+    /**
+     * Parameter type for {@link WriteClosure#execute(Locker)}. Used to lock row IDs before updating the data. All acquired locks are
+     * released automatically after {@code execute} call is completed.
+     */
+    @SuppressWarnings("PublicInnerClass")
+    interface Locker {
+        /**
+         * Locks passed row ID until the {@link WriteClosure#execute(Locker)} is completed.

Review Comment:
   Ok





[GitHub] [ignite-3] rpuch commented on a diff in pull request #1976: IGNITE-19269 Implemented methods to explicitly lock rows in MV storage.

Posted by "rpuch (via GitHub)" <gi...@apache.org>.
rpuch commented on code in PR #1976:
URL: https://github.com/apache/ignite-3/pull/1976#discussion_r1180341660


##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/PersistentPageMemoryMvPartitionStorage.java:
##########
@@ -136,19 +137,31 @@ public void afterCheckpointEnd(CheckpointProgress progress) {
 
     @Override
     public <V> V runConsistently(WriteClosure<V> closure) throws StorageException {
-        return busy(() -> {
-            throwExceptionIfStorageNotInRunnableOrRebalanceState(state.get(), this::createStorageInfo);
+        LocalLocker locker = THREAD_LOCAL_LOCKER.get();
 
-            checkpointTimeoutLock.checkpointReadLock();
+        if (locker != null) {
+            return closure.execute(locker);
+        } else {
+            return busy(() -> {
+                throwExceptionIfStorageNotInRunnableOrRebalanceState(state.get(), this::createStorageInfo);
 
-            try {
-                return closure.execute();
-            } finally {
-                updateVersionChainLockByRowId.releaseAllLockByCurrentThread();
+                LocalLocker locker0 = new LocalLocker(lockByRowId);
 
-                checkpointTimeoutLock.checkpointReadUnlock();
-            }
-        });
+                checkpointTimeoutLock.checkpointReadLock();
+
+                THREAD_LOCAL_LOCKER.set(locker0);
+
+                try {
+                    return closure.execute(locker0);
+                } finally {
+                    THREAD_LOCAL_LOCKER.set(null);
+
+                    locker0.unlockAll();

Review Comment:
   This is the exception I kept in mind :) Sometimes there are bugs in code, so 'the impossible' happens. In this case, 'the impossible' will leave a lock locked forever, which may cause other errors/hangs that can make it really difficult to see what's going on while debugging
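
The exact exception under discussion is not visible in this message. The sketch below assumes the concern is that one cleanup step in the `finally` block throws and the following steps never run. Nesting `try`/`finally` makes each step run regardless. The class `NestedFinallyDemo` is hypothetical, and the log entries only borrow names from the diff above.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical demo class; the strings only mimic the cleanup steps from the diff.
class NestedFinallyDemo {
    /** Runs the "closure" and both cleanup steps, optionally failing in the first cleanup step. */
    static List<String> run(boolean failFirstCleanup) {
        List<String> log = new ArrayList<>();

        try {
            try {
                log.add("work");
            } finally {
                try {
                    log.add("unlockAll");

                    if (failFirstCleanup) {
                        throw new IllegalStateException("simulated bug in cleanup");
                    }
                } finally {
                    // Runs even if the previous cleanup step has thrown.
                    log.add("checkpointReadUnlock");
                }
            }
        } catch (IllegalStateException e) {
            log.add("caught");
        }

        return log;
    }

    public static void main(String[] args) {
        System.out.println(run(true));
    }
}
```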





[GitHub] [ignite-3] rpuch commented on a diff in pull request #1976: IGNITE-19269 Implemented methods to explicitly lock rows in MV storage.

Posted by "rpuch (via GitHub)" <gi...@apache.org>.
rpuch commented on code in PR #1976:
URL: https://github.com/apache/ignite-3/pull/1976#discussion_r1180339199


##########
modules/storage-api/src/main/java/org/apache/ignite/internal/storage/util/LockHolder.java:
##########
@@ -36,7 +35,7 @@
 class LockHolder<T extends Lock> {
     private final T lock;
 
-    private final AtomicInteger lockHolder = new AtomicInteger();
+    private int lockHoldersCount;

Review Comment:
   To guarantee this, `compute()` probably does volatile writes, maybe with some `synchronized`. So making the field `volatile` should not cost us anything (everything is already paid), but it will make the code less assumptive (so after a change, it will be more difficult to break it)


