Posted to notifications@ignite.apache.org by GitBox <gi...@apache.org> on 2022/06/15 18:48:20 UTC

[GitHub] [ignite-3] sergeyuttsel commented on a diff in pull request #834: IGNITE-16881 Multi version storage was integrated. Transaction ID is a UUID now.

sergeyuttsel commented on code in PR #834:
URL: https://github.com/apache/ignite-3/pull/834#discussion_r897894824


##########
modules/storage-api/src/test/java/org/apache/ignite/internal/storage/basic/TestMvPartitionStorage.java:
##########
@@ -114,6 +114,12 @@ public RowId insert(BinaryRow row, UUID txId) throws StorageException {
     /** {@inheritDoc} */
     @Override
     public @Nullable BinaryRow abortWrite(RowId rowId) {
+        VersionChain chain = map.get(rowId);

Review Comment:
   I moved it to computeIfPresent.



##########
modules/storage-api/src/test/java/org/apache/ignite/internal/storage/basic/TestMvPartitionStorage.java:
##########
@@ -114,6 +114,12 @@ public RowId insert(BinaryRow row, UUID txId) throws StorageException {
     /** {@inheritDoc} */
     @Override
     public @Nullable BinaryRow abortWrite(RowId rowId) {
+        VersionChain chain = map.get(rowId);
+
+        if (chain.begin != null && chain.txId == null) {

Review Comment:
   I created a `notContainsWriteIntent()` method. It's needed because if the chain doesn't contain a write intent, there is nothing to do.
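A minimal sketch of the idea, assuming a heavily simplified chain model: `SketchVersionChain`, `SketchMvStorage`, and the `String`/`long` types are hypothetical stand-ins, not the real `TestMvPartitionStorage` types. It shows `abortWrite` doing the lookup inside `computeIfPresent` and returning early when `notContainsWriteIntent()` is true, so aborting a chain without a write intent leaves it untouched.

```java
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Hypothetical, simplified version chain: a committed row plus an optional write intent. */
final class SketchVersionChain {
    final String committedRow; // last committed value, may be null
    final String writeIntent;  // uncommitted value, may be null
    final UUID txId;           // owner of the write intent, null if there is none

    SketchVersionChain(String committedRow, String writeIntent, UUID txId) {
        this.committedRow = committedRow;
        this.writeIntent = writeIntent;
        this.txId = txId;
    }

    /** Mirrors the intent of notContainsWriteIntent(): no transaction owns this chain. */
    boolean notContainsWriteIntent() {
        return txId == null;
    }
}

/** Hypothetical storage keyed by a simplified row id. */
final class SketchMvStorage {
    private final ConcurrentMap<Long, SketchVersionChain> map = new ConcurrentHashMap<>();

    void put(long rowId, SketchVersionChain chain) {
        map.put(rowId, chain);
    }

    SketchVersionChain get(long rowId) {
        return map.get(rowId);
    }

    /** Drops the write intent, if any, and returns the aborted value (or null). */
    String abortWrite(long rowId) {
        String[] aborted = new String[1];

        map.computeIfPresent(rowId, (id, chain) -> {
            if (chain.notContainsWriteIntent()) {
                return chain; // Nothing to abort: keep the chain unchanged.
            }

            aborted[0] = chain.writeIntent;

            // Returning null removes the entry, which is what we want when the chain
            // consisted of nothing but the write intent.
            return chain.committedRow == null ? null : new SketchVersionChain(chain.committedRow, null, null);
        });

        return aborted[0];
    }
}
```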



##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/PageMemoryMvPartitionStorage.java:
##########
@@ -335,7 +334,7 @@ private VersionChain findVersionChainForModification(RowId rowId) {
         VersionChain currentVersionChain = findVersionChainForModification(rowId);
 
         if (currentVersionChain.transactionId() == null) {
-            throw new NoUncommittedVersionException();

Review Comment:
   It's possible to invoke commit/abort even if there is no write intent.



##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java:
##########
@@ -356,7 +358,7 @@ private Cursor<BinaryRow> scan(Predicate<BinaryRow> keyFilter, @Nullable Timesta
         //  - no one guarantees that there will only be a single cursor;
         //  - no one guarantees that returned cursor will not be used by other threads.
         // The thing is, we need this buffer to preserve its content between invocactions of "hasNext" method.
-        ByteBuffer seekKeyBuf = ByteBuffer.allocate(seekKeyBufSize).order(BIG_ENDIAN);
+        ByteBuffer seekKeyBuf = ByteBuffer.allocate(seekKeyBufSize).order(BIG_ENDIAN).putShort(0, (short) partitionId);

Review Comment:
   Ivan Bessonov proposed this change after I discovered a bug: without it, the scan method returns rows from other partitions. For example, `part2.scan()` returns rows from `part0`, `part1`, and `part2`.
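A minimal sketch of why the partition prefix matters, assuming a hypothetical key layout of `[partitionId: 2 bytes][rowKey: 8 bytes]`, both big-endian, and using a `TreeMap` with an unsigned byte comparator as a stand-in for RocksDB's default bytewise key order. The real key layout and bounds of `RocksDbMvPartitionStorage` are not shown in this diff. Assuming the scan is bounded above by the next partition's prefix, an all-zero seek key makes the range start at partition 0, which reproduces the `part2.scan()` symptom.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.TreeMap;

final class PartitionPrefixScan {
    /** Hypothetical key layout: [partitionId: 2 bytes][rowKey: 8 bytes], big-endian. */
    static final int KEY_SIZE = Short.BYTES + Long.BYTES;

    static byte[] key(int partitionId, long rowKey) {
        return ByteBuffer.allocate(KEY_SIZE).order(ByteOrder.BIG_ENDIAN)
                .putShort(0, (short) partitionId)
                .putLong(Short.BYTES, rowKey)
                .array();
    }

    /** Stand-in for RocksDB's default bytewise (unsigned lexicographic) key order. */
    static TreeMap<byte[], String> newStore() {
        return new TreeMap<byte[], String>((a, b) -> Arrays.compareUnsigned(a, b));
    }

    /** Returns the row keys seen by a scan of one partition. */
    static List<Long> scan(TreeMap<byte[], String> store, int partitionId, boolean prefixSeekKey) {
        // Lower bound: the seek key. Without the prefix it stays all zeros, i.e. partition 0.
        ByteBuffer seekKeyBuf = ByteBuffer.allocate(KEY_SIZE).order(ByteOrder.BIG_ENDIAN);
        if (prefixSeekKey) {
            seekKeyBuf.putShort(0, (short) partitionId);
        }

        // Exclusive upper bound: the first possible key of the next partition.
        byte[] upperBound = ByteBuffer.allocate(KEY_SIZE).order(ByteOrder.BIG_ENDIAN)
                .putShort(0, (short) (partitionId + 1))
                .array();

        List<Long> rowKeys = new ArrayList<>();
        for (byte[] key : store.subMap(seekKeyBuf.array(), true, upperBound, false).keySet()) {
            rowKeys.add(ByteBuffer.wrap(key).getLong(Short.BYTES));
        }
        return rowKeys;
    }
}
```

With three partitions holding two rows each, `scan(store, 2, true)` sees only partition 2's rows, while `scan(store, 2, false)` sees all six.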



##########
modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java:
##########
@@ -112,13 +111,13 @@ public class ItColocationTest {
 
         TxManager txManager = new TxManagerImpl(clusterService, new HeapLockManager()) {
             @Override
-            public CompletableFuture<Void> finishRemote(NetworkAddress addr, Timestamp ts, boolean commit, Set<String> groups) {
+            public CompletableFuture<Void> finishRemote(NetworkAddress addr, UUID id, boolean commit, Set<String> groups) {
                 return CompletableFuture.completedFuture(null);
             }
         };
         txManager.start();
 
-        MessagingService messagingService = MessagingServiceTestUtils.mockMessagingService(txManager);
+        MessagingService messagingService = Mockito.mock(MessagingService.class, RETURNS_DEEP_STUBS);;

Review Comment:
   Ouch. Double semicolon.



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/VersionedRowStore.java:
##########
@@ -48,65 +45,66 @@
  */
 public class VersionedRowStore {

Review Comment:
   Yes. Do you have any ideas for a new name? I couldn't find a corresponding component in the tx design document.



##########
modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java:
##########
@@ -132,6 +133,7 @@
 /**
  * Tests scenarios for table manager.
  */
+@Disabled("https://issues.apache.org/jira/browse/IGNITE-16923")

Review Comment:
   I've changed `TablesConfigurationSchema#defaultDataStorage` to "rocksdb" and added the assertion `assert internalTbl.storage() instanceof MvTableStorage`.
   To fix most of these tests, I need to add `RocksDbDataStorageModule` to the `TableManagerTest#createDataStorageManager` method:
   `new DataStorageModules(List.of(new PageMemoryDataStorageModule(), new RocksDbDataStorageModule()))`
   But to do that, I need to add a dependency on the `ignite-storage-rocksdb` module.



##########
modules/table/src/test/java/org/apache/ignite/internal/table/MessagingServiceTestUtils.java:
##########
@@ -64,4 +89,29 @@ public static MessagingService mockMessagingService(TxManager txManager) {
 
         return messagingService;
     }
+
+    private static <T extends Command> Iterator<CommandClosure<T>> iterator(BiConsumer<Integer, CommandClosure<T>> func) {

Review Comment:
   Thanks :)



##########
modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java:
##########
@@ -95,56 +98,56 @@ public TxManagerImpl(ClusterService clusterService, LockManager lockManager) {
     /** {@inheritDoc} */
     @Override
     public InternalTransaction begin() {
-        Timestamp ts = Timestamp.nextVersion();
+        UUID txId = Timestamp.nextId();
 
-        states.put(ts, TxState.PENDING);
+        states.put(txId, TxState.PENDING);
 
-        return new TransactionImpl(this, ts, clusterService.topologyService().localMember().address());
+        return new TransactionImpl(this, txId, clusterService.topologyService().localMember().address());
     }
 
     /** {@inheritDoc} */
     @Override
-    public TxState state(Timestamp ts) {
-        return states.get(ts);
+    public TxState state(UUID txId) {
+        return states.get(txId);
     }
 
     /** {@inheritDoc} */
     @Override
-    public void forget(Timestamp ts) {
-        states.remove(ts);
+    public void forget(UUID txId) {
+        states.remove(txId);
     }
 
     /** {@inheritDoc} */
     @Override
-    public CompletableFuture<Void> commitAsync(Timestamp ts) {
-        if (changeState(ts, TxState.PENDING, TxState.COMMITED) || state(ts) == TxState.COMMITED) {
-            unlockAll(ts);
+    public CompletableFuture<Void> commitAsync(UUID txId) {
+        if (changeState(txId, TxState.PENDING, TxState.COMMITED) || state(txId) == TxState.COMMITED) {
+            unlockAll(txId);
 
             return completedFuture(null);
         }
 
-        return failedFuture(new TransactionException(format("Failed to commit a transaction [ts={}, state={}]", ts, state(ts))));
+        return failedFuture(new TransactionException(format("Failed to commit a transaction [id={}, state={}]", txId, state(txId))));
     }
 
     /** {@inheritDoc} */
     @Override
-    public CompletableFuture<Void> rollbackAsync(Timestamp ts) {
-        if (changeState(ts, TxState.PENDING, TxState.ABORTED) || state(ts) == TxState.ABORTED) {
-            unlockAll(ts);
+    public CompletableFuture<Void> rollbackAsync(UUID txId) {
+        if (changeState(txId, TxState.PENDING, TxState.ABORTED) || state(txId) == TxState.ABORTED) {
+            unlockAll(txId);
 
             return completedFuture(null);
         }
 
-        return failedFuture(new TransactionException(format("Failed to rollback a transaction [ts={}, state={}]", ts, state(ts))));
+        return failedFuture(new TransactionException(format("Failed to rollback a transaction [id={}, state={}]", txId, state(txId))));

Review Comment:
   fixed
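The `changeState(...) || state(...) == ...` pattern above makes `commitAsync`/`rollbackAsync` idempotent: committing an already committed transaction succeeds again, while committing an aborted one fails. A minimal sketch, assuming `changeState` is an atomic compare-and-set on the states map (its body is not shown in the diff). `SketchTxStates` and `SketchTxState` are hypothetical, and the methods return `boolean` instead of a `CompletableFuture` for brevity.

```java
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

enum SketchTxState { PENDING, COMMITTED, ABORTED }

final class SketchTxStates {
    private final ConcurrentMap<UUID, SketchTxState> states = new ConcurrentHashMap<>();

    UUID begin() {
        UUID txId = UUID.randomUUID(); // Stand-in for Timestamp.nextId().
        states.put(txId, SketchTxState.PENDING);
        return txId;
    }

    /** Atomic transition; true only if this call moved the tx from 'before' to 'after'. */
    private boolean changeState(UUID txId, SketchTxState before, SketchTxState after) {
        return states.replace(txId, before, after);
    }

    /** Succeeds if this call commits the tx or the tx is already committed. */
    boolean commit(UUID txId) {
        return changeState(txId, SketchTxState.PENDING, SketchTxState.COMMITTED)
                || states.get(txId) == SketchTxState.COMMITTED;
    }

    /** Succeeds if this call aborts the tx or the tx is already aborted. */
    boolean rollback(UUID txId) {
        return changeState(txId, SketchTxState.PENDING, SketchTxState.ABORTED)
                || states.get(txId) == SketchTxState.ABORTED;
    }
}
```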



##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/PageMemoryMvPartitionStorage.java:
##########
@@ -371,7 +370,9 @@ public void commitWrite(RowId rowId, Timestamp timestamp) throws StorageExceptio
         VersionChain currentVersionChain = findVersionChainForModification(rowId);
         long chainLink = PartitionlessLinks.addPartitionIdToPartititionlessLink(currentVersionChain.headLink(), partitionId);
 
-        assert currentVersionChain.transactionId() != null;
+        if (currentVersionChain.transactionId() == null) {

Review Comment:
   It isn't an uncommitted chain. This happens when a tx acquires a write lock but doesn't perform a write operation, for example on a `deleteExact` operation. In this case, commit/abort is invoked without a write intent.



##########
modules/storage-api/src/test/java/org/apache/ignite/internal/storage/basic/TestMvPartitionStorage.java:
##########
@@ -154,6 +160,12 @@ private void cleanupIndexesForAbortedRow(VersionChain versionChain, RowId rowId)
     /** {@inheritDoc} */
     @Override
     public void commitWrite(RowId rowId, Timestamp timestamp) {
+        VersionChain chain = map.get(rowId);

Review Comment:
   I moved it to compute.
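Combined with the earlier point that commit/abort can arrive without a write intent (for example after `deleteExact`), a compute-based `commitWrite` might look like the following minimal sketch. `SketchCommitStorage`, its `Chain` class, and the `long` timestamps are hypothetical simplifications of the real `VersionChain`/`Timestamp` types.

```java
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

final class SketchCommitStorage {
    /** Hypothetical chain: last committed value and its timestamp, plus an optional write intent. */
    static final class Chain {
        final String committed;
        final long commitTs;
        final String intent;
        final UUID txId; // null when the chain has no write intent

        Chain(String committed, long commitTs, String intent, UUID txId) {
            this.committed = committed;
            this.commitTs = commitTs;
            this.intent = intent;
            this.txId = txId;
        }
    }

    private final ConcurrentMap<Long, Chain> map = new ConcurrentHashMap<>();

    /** Adds (or replaces) the write intent of txId on top of the committed value. */
    void addWrite(long rowId, String value, UUID txId) {
        map.compute(rowId, (id, chain) -> chain == null
                ? new Chain(null, 0L, value, txId)
                : new Chain(chain.committed, chain.commitTs, value, txId));
    }

    /** Turns the write intent into the committed version; a no-op if there is none. */
    void commitWrite(long rowId, long timestamp) {
        map.compute(rowId, (id, chain) -> {
            // Missing chain or missing write intent: the tx only held a lock (e.g. a
            // deleteExact that wrote nothing), so there is nothing to commit.
            if (chain == null || chain.txId == null) {
                return chain;
            }
            return new Chain(chain.intent, timestamp, null, null);
        });
    }

    String read(long rowId) {
        Chain chain = map.get(rowId);
        return chain == null ? null : chain.committed;
    }

    long commitTs(long rowId) {
        Chain chain = map.get(rowId);
        return chain == null ? -1L : chain.commitTs;
    }
}
```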



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java:
##########
@@ -379,12 +380,18 @@ private void updateAssignmentInternal(ConfigurationNotificationEvent<byte[]> ass
                 InternalTable internalTable = tablesById.get(tblId).internalTable();
 
                 try {
+                    // TODO: IGNITE-16923 Remove assert after the ticket is resolved.
+                    assert internalTable.storage() instanceof MvTableStorage :
+                            "Only multi version storages are supported.";
+
+                    MvTableStorage storage = (MvTableStorage) internalTable.storage();
+
                     futures[partId] = raftMgr.updateRaftGroup(
                             raftGroupName(tblId, partId),
                             newPartitionAssignment,
                             toAdd,
                             () -> new PartitionListener(tblId,
-                                    new VersionedRowStore(internalTable.storage().getOrCreatePartition(partId),
+                                    new VersionedRowStore(storage.createPartition(partId),

Review Comment:
   Hm. I saw that `RocksDbTableStorage.createPartition()` uses `getOrCreatePartition()` internally, but `TestMvTableStorage.createPartition` doesn't. So I've updated `TestMvTableStorage.createPartition` to maintain the same semantics.
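A minimal sketch of the semantics being aligned, assuming a hypothetical `SketchTableStorage` keyed by partition id: `createPartition` simply delegates to `getOrCreatePartition`, so a second call for the same partition returns the existing instance instead of replacing it.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

final class SketchTableStorage {
    /** Hypothetical partition handle. */
    static final class Partition {
        final int id;

        Partition(int id) {
            this.id = id;
        }
    }

    private final ConcurrentMap<Integer, Partition> partitions = new ConcurrentHashMap<>();

    /** Atomically returns the existing partition or creates a new one. */
    Partition getOrCreatePartition(int partId) {
        return partitions.computeIfAbsent(partId, Partition::new);
    }

    /** Same semantics as getOrCreatePartition: repeated calls return the same instance. */
    Partition createPartition(int partId) {
        return getOrCreatePartition(partId);
    }

    Partition getPartition(int partId) {
        return partitions.get(partId);
    }
}
```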



##########
modules/table/src/test/java/org/apache/ignite/internal/table/MessagingServiceTestUtils.java:
##########
@@ -64,4 +89,29 @@ public static MessagingService mockMessagingService(TxManager txManager) {
 
         return messagingService;
     }
+
+    private static <T extends Command> Iterator<CommandClosure<T>> iterator(BiConsumer<Integer, CommandClosure<T>> func) {
+        return new Iterator<>() {
+            /** Iteration. */
+            private int it = 0;
+
+            /** {@inheritDoc} */
+            @Override
+            public boolean hasNext() {
+                return it++ < 1;

Review Comment:
   fixed
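For reference, the problem with `return it++ < 1;` is that `hasNext()` mutates state: a second `hasNext()` call before `next()` already returns `false`. A minimal sketch of a side-effect-free single-element iterator follows. It is generic over `T` and takes a `Supplier<T>`, which is an assumption for illustration, not the `CommandClosure<T>`/`BiConsumer` signature used in `MessagingServiceTestUtils`.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.function.Supplier;

/** Hypothetical iterator that yields exactly one element. */
final class SingleElementIterator<T> implements Iterator<T> {
    private final Supplier<T> supplier;
    private boolean consumed;

    SingleElementIterator(Supplier<T> supplier) {
        this.supplier = supplier;
    }

    @Override
    public boolean hasNext() {
        return !consumed; // Pure query: safe to call any number of times.
    }

    @Override
    public T next() {
        if (consumed) {
            throw new NoSuchElementException();
        }
        consumed = true; // State changes only when an element is actually handed out.
        return supplier.get();
    }
}
```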



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/VersionedRowStore.java:
##########
@@ -48,65 +45,66 @@
  */
 public class VersionedRowStore {
     /** Storage delegate. */
-    private final PartitionStorage storage;
+    private final MvPartitionStorage storage;
 
     /** Transaction manager. */
     private TxManager txManager;
 
+    /** Dummy primary index. */
+    private ConcurrentHashMap<ByteBuffer, RowId> primaryIndex = new ConcurrentHashMap<>();

Review Comment:
   OK



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/FinishTxCommand.java:
##########
@@ -17,35 +17,43 @@
 
 package org.apache.ignite.internal.table.distributed.command;
 
-import org.apache.ignite.internal.tx.Timestamp;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.raft.client.WriteCommand;
 
 /** State machine command to finish a transaction. */
 public class FinishTxCommand implements WriteCommand {
-    /** The timestamp. */
-    private final Timestamp timestamp;
+    /** Transaction id. */
+    private final UUID txId;
 
     /** Commit or rollback state. */
     private final boolean finish;
 
+    /** Keys that are locked by the transaction. */
+    private Map<IgniteUuid, List<byte[]>> lockedKeys;
+
     /**
      * The constructor.
      *
-     * @param timestamp The timestamp.
-     * @param finish    Commit or rollback state {@code True} to commit.
+     * @param txId          The txId.
+     * @param finish        Commit or rollback state {@code True} to commit.
+     * @param lockedKeys    Keys that are locked by the transaction.

Review Comment:
   I added a comment. By the way, the lockId is actually the tableId.



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableTxManagerImpl.java:
##########
@@ -48,8 +48,9 @@ public TableTxManagerImpl(ClusterService clusterService, LockManager lockManager
 
     /** {@inheritDoc} */
     @Override
-    protected CompletableFuture<?> finish(String groupId, Timestamp ts, boolean commit) {
-        ActionRequest req = FACTORY.actionRequest().command(new FinishTxCommand(ts, commit)).groupId(groupId).readOnlySafe(true).build();
+    protected CompletableFuture<?> finish(String groupId, UUID id, boolean commit) {

Review Comment:
   Yes, thanks.



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/VersionedRowStore.java:
##########
@@ -48,65 +45,66 @@
  */
 public class VersionedRowStore {
     /** Storage delegate. */
-    private final PartitionStorage storage;
+    private final MvPartitionStorage storage;
 
     /** Transaction manager. */
     private TxManager txManager;
 
+    /** Dummy primary index. */
+    private ConcurrentHashMap<ByteBuffer, RowId> primaryIndex = new ConcurrentHashMap<>();
+
+    /** Keys that were inserted by the transaction. */
+    private ConcurrentHashMap<UUID, List<ByteBuffer>> txsInsertedKeys = new ConcurrentHashMap<>();

Review Comment:
   It's needed to handle cases like this one:
   A tx puts a new key into the storage and into the primary index, and a new version chain is created.
   On abort, we need to determine which keys were put and remove them from the primary index.
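A minimal sketch of that bookkeeping, assuming `UUID` as a stand-in for `RowId` and a hypothetical `SketchInsertTracking` class that mirrors the `primaryIndex` and `txsInsertedKeys` fields from the diff. Note that a `ByteBuffer` key hashes by its remaining content, so it must not be mutated after insertion.

```java
import java.nio.ByteBuffer;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

final class SketchInsertTracking {
    /** Dummy primary index: key bytes -> row id (row id simplified to UUID). */
    final ConcurrentHashMap<ByteBuffer, UUID> primaryIndex = new ConcurrentHashMap<>();

    /** Keys inserted by each transaction, so that an abort can undo their index entries. */
    final ConcurrentHashMap<UUID, List<ByteBuffer>> txsInsertedKeys = new ConcurrentHashMap<>();

    /** Inserts a brand-new key: creates a row id, indexes it, and records it for txId. */
    UUID insert(ByteBuffer key, UUID txId) {
        UUID rowId = UUID.randomUUID(); // Stand-in for creating a new version chain.
        primaryIndex.put(key, rowId);
        txsInsertedKeys.computeIfAbsent(txId, id -> new CopyOnWriteArrayList<>()).add(key);
        return rowId;
    }

    /** On abort, removes every key the tx inserted from the primary index. */
    void abort(UUID txId) {
        List<ByteBuffer> keys = txsInsertedKeys.remove(txId);
        if (keys != null) {
            keys.forEach(primaryIndex::remove);
        }
    }

    /** On commit, the inserted keys stay indexed; only the bookkeeping is dropped. */
    void commit(UUID txId) {
        txsInsertedKeys.remove(txId);
    }
}
```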



##########
modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TransactionImpl.java:
##########
@@ -164,10 +164,10 @@ private CompletableFuture<Void> finish(boolean commit) {
         for (Map.Entry<NetworkAddress, Set<String>> entry : tmp.entrySet()) {
             boolean local = address.equals(entry.getKey());
 
-            futs[i++] = txManager.finishRemote(entry.getKey(), timestamp, commit, entry.getValue());
+            futs[i++] = txManager.finishRemote(entry.getKey(), id, commit, entry.getValue());
 
-            LOG.debug("finish [addr={}, commit={}, ts={}, local={}, groupIds={}",
-                    address, commit, timestamp, local, entry.getValue());
+            LOG.debug("finish [addr={}, commit={}, id={}, local={}, groupIds={}",

Review Comment:
   fixed



##########
modules/transactions/src/main/java/org/apache/ignite/internal/tx/Timestamp.java:
##########
@@ -151,6 +161,17 @@ public static synchronized Timestamp nextVersion() {
         return new Timestamp(newTime << 16 | cntr, localNodeId);
     }
 
+    /**
+     * Next id.
+     *
+     * @return Next id.
+     */
+    public static UUID nextId() {

Review Comment:
   > Do we really have any logic based on txIds comparison?
   
   Yes, we do. According to the tx design document: `Assign the txId. This will be used as a unique key to find a transaction. Additionally it is used for deadlock prevention during the lock acquisition stage. The classic paper ... suggests using (local timestamp, node id) scheme.`
   
   > Why do we have UUID nextId() method in a Timestamp class?
   Why do we entirely have UUID nextId() method?
   
   I think it is a temporary solution. Maybe we will need to convert `Timestamp` into a utility that creates a `UUID`. And of course we need to implement an HLC (hybrid logical clock).
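A minimal sketch of the (local timestamp, node id) scheme packed into a `UUID`, assuming a hypothetical `SketchTxIds` helper. The high 64 bits follow the `newTime << 16 | cntr` layout of `Timestamp.nextVersion()` shown above, and the low 64 bits carry a `long` node id (an assumption; the real `nextId()` body is not shown). `mayWait` illustrates one classic deadlock-prevention rule, wait-die, purely as an example of how id comparison could be used, not as what Ignite implements.

```java
import java.util.UUID;

final class SketchTxIds {
    private static long lastTime;
    private static long counter;

    /** High bits: (millis << 16 | counter), strictly increasing per node. Low bits: node id. */
    static synchronized UUID nextId(long nodeId) {
        long now = System.currentTimeMillis();

        if (now > lastTime) {
            lastTime = now;
            counter = 0;
        } else if (++counter > 0xFFFF) {
            // Counter overflow within one millisecond (or clock went back): borrow the next millisecond.
            lastTime++;
            counter = 0;
        }

        return new UUID(lastTime << 16 | counter, nodeId);
    }

    /** Wait-die: an older tx (smaller id) may wait for a younger holder; a younger requester aborts. */
    static boolean mayWait(UUID requester, UUID holder) {
        return requester.compareTo(holder) < 0;
    }
}
```

Because current epoch milliseconds shifted left by 16 bits still fit in a positive `long`, `UUID.compareTo` orders these ids by time first and by node id second.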



-- 
This is an automated message from the Apache Git Service.