Posted to notifications@ignite.apache.org by "rpuch (via GitHub)" <gi...@apache.org> on 2023/04/06 07:14:47 UTC

[GitHub] [ignite-3] rpuch commented on a diff in pull request #1887: IGNITE-19105 Remove MetaStorage cursor management

rpuch commented on code in PR #1887:
URL: https://github.com/apache/ignite-3/pull/1887#discussion_r1157337143


##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/GetPrefixCommand.java:
##########
@@ -15,20 +15,15 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.metastorage.command.cursor;
+package org.apache.ignite.internal.metastorage.command;
 
-import org.apache.ignite.internal.metastorage.command.MetastorageCommandsMessageGroup;
-import org.apache.ignite.internal.raft.WriteCommand;
-import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.network.annotations.Transferable;
 
 /**
- * Cursor close command for MetaStorageCommandListener that closes cursor with given id.
+ * Range command for MetaStorageCommandListener that retrieves entries for the given key prefix in lexicographic order. Entries will be
+ * filtered out by upper bound of given revision number.
  */
-@Transferable(MetastorageCommandsMessageGroup.CLOSE_CURSOR)
-public interface CloseCursorCommand extends WriteCommand {
-    /**
-     * Returns cursor id.
-     */
-    IgniteUuid cursorId();
+@Transferable(MetastorageCommandsMessageGroup.GET_PREFIX)
+public interface GetPrefixCommand extends PaginationCommand {
+    byte[] prefix();

Review Comment:
   Should we have a javadoc here?
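A possible shape for the missing Javadoc is sketched below. `PaginationCommand` is reduced to a hypothetical stand-in that keeps only the accessors `MetaStorageListener` calls in this PR (`revUpperBound()`, `includeTombstones()`, `batchSize()`, `previousKey()`). Their documented meanings are inferred from how the handler uses them, so treat them as assumptions. The `@Transferable` annotation is omitted so the snippet compiles without Ignite.

```java
/**
 * Hypothetical stand-in for the PR's {@code PaginationCommand}; only the accessors used by
 * {@code MetaStorageListener} are reproduced, with meanings inferred from that usage.
 */
interface PaginationCommand {
    /** Returns the upper bound of revisions to read, or {@code -1} to read at the latest revision. */
    long revUpperBound();

    /** Returns {@code true} if tombstones must be included in the result. */
    boolean includeTombstones();

    /** Returns the maximum number of entries in a single response. */
    int batchSize();

    /** Returns the last key of the previous batch, or {@code null} when requesting the first batch. */
    byte[] previousKey();
}

/**
 * Range command that retrieves entries for the given key prefix in lexicographic order.
 */
interface GetPrefixCommand extends PaginationCommand {
    /**
     * Returns the key prefix; only entries whose keys start with these bytes are returned.
     */
    byte[] prefix();
}
```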



##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/CursorSubscription.java:
##########
@@ -130,7 +126,9 @@ private void processRequest() {
                     demand--;
                 } else {
                     if (cachedResponse.hasNextBatch()) {
-                        requestNextBatch();
+                        byte[] lastProcessedKey = entries.get(entries.size() - 1).key();

Review Comment:
   Is there a guarantee that `entries` is not empty here?
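If the intended invariant is that a batch flagged with `hasNextBatch()` is never empty, one option is to make it explicit. A violation would then fail with a descriptive message instead of an `IndexOutOfBoundsException` from `entries.get(-1)`. The sketch below uses a hypothetical helper `lastProcessedKey` that operates on raw `byte[]` keys rather than `Entry`.

```java
import java.util.List;

/** Hypothetical helper; not part of the PR. */
final class PaginationKeys {
    private PaginationKeys() {
    }

    /**
     * Returns the key to resume the next batch from, failing fast if the batch is empty.
     * An empty batch that still claims to have a continuation has no "last processed key".
     */
    static byte[] lastProcessedKey(List<byte[]> batchKeys) {
        if (batchKeys.isEmpty()) {
            throw new IllegalStateException("A batch with a continuation must not be empty");
        }

        return batchKeys.get(batchKeys.size() - 1);
    }
}
```

Failing fast only documents the assumption. The guarantee itself would have to come from the server, for example by never returning an empty batch marked as having a continuation.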



##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java:
##########
@@ -483,21 +485,17 @@ public Collection<Entry> getAndPutAll(List<byte[]> keys, List<byte[]> values) {
         return res;
     }
 
-    /** {@inheritDoc} */
-    @NotNull
     @Override
     public Entry get(byte[] key) {
         rwLock.readLock().lock();

Review Comment:
   It seems that the read lock is only needed here to make sure `rev` is read without a race. `doGet()` only reads the `index` and `data` fields, which don't seem to need read lock protection. So it looks like we could replace this with just `doGet(key, revision());`.
   
   Is the lock taken here for consistency with other places, or for the sake of safety (in case something changes in the future regarding the fields read), or am I missing something?
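The reasoning above can be checked on a toy multi-version store. This is a simplified in-memory model, not `RocksDbKeyValueStorage`. Writers append a version under the write lock and publish the new revision last. A reader snapshots the revision and then reads the newest version at or below it. Any version a concurrent writer adds has a higher revision and is filtered out, so the data read itself needs no lock. The sketch assumes `rev` is `volatile`, which may not hold in the real class. Without `volatile`, the read lock is what makes the read of `rev` safe.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/** Toy MVCC store illustrating the locking question; all names are hypothetical. */
final class ToyMvccStore {
    private final ReadWriteLock rwLock = new ReentrantReadWriteLock();

    /** key -> (revision -> value). */
    private final Map<String, ConcurrentSkipListMap<Long, String>> versions = new ConcurrentHashMap<>();

    /** Latest applied revision; volatile (an assumption) so it can be read without the lock. */
    private volatile long rev;

    long put(String key, String value) {
        rwLock.writeLock().lock();
        try {
            long newRev = rev + 1;
            versions.computeIfAbsent(key, k -> new ConcurrentSkipListMap<>()).put(newRev, value);
            rev = newRev; // Publish the revision only after its data is written.
            return newRev;
        } finally {
            rwLock.writeLock().unlock();
        }
    }

    /** Mirrors the current code: the read lock is held only while rev is read. */
    String getLocked(String key) {
        long snapshot;
        rwLock.readLock().lock();
        try {
            snapshot = rev;
        } finally {
            rwLock.readLock().unlock();
        }
        return get(key, snapshot);
    }

    /** The suggested variant: a volatile read of rev, no lock at all. */
    String getLatest(String key) {
        return get(key, rev);
    }

    /** Returns the newest value at or below revUpperBound; reads no state guarded by the lock. */
    String get(String key, long revUpperBound) {
        ConcurrentSkipListMap<Long, String> history = versions.get(key);
        if (history == null) {
            return null;
        }
        Map.Entry<Long, String> version = history.floorEntry(revUpperBound);
        return version == null ? null : version.getValue();
    }
}
```

The toy has no compaction or any other operation that removes versions. If the real storage does remove versions concurrently, that could be a reason to keep the lock around the data read as well.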



##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java:
##########
@@ -509,21 +507,28 @@ public Entry get(byte[] key, long revUpperBound) {
         }
     }
 
-    /** {@inheritDoc} */
-    @NotNull
     @Override
     public Collection<Entry> getAll(List<byte[]> keys) {
-        return doGetAll(keys, LATEST_REV);
+        rwLock.readLock().lock();

Review Comment:
   Same as above



##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java:
##########
@@ -758,23 +757,95 @@ private void applyOperations(Collection<Operation> ops) throws RocksDBException
     }
 
     @Override
-    public Cursor<Entry> range(byte[] keyFrom, byte[] keyTo, boolean includeTombstones) {
-        return new RangeCursor(this, keyFrom, keyTo, rev, includeTombstones);
-    }
+    public Cursor<Entry> range(byte[] keyFrom, byte @Nullable [] keyTo) {
+        rwLock.readLock().lock();
 
-    @Override
-    public Cursor<Entry> range(byte[] keyFrom, byte[] keyTo, long revUpperBound, boolean includeTombstones) {
-        return new RangeCursor(this, keyFrom, keyTo, revUpperBound, includeTombstones);
+        try {
+            return range(keyFrom, keyTo, rev);
+        } finally {
+            rwLock.readLock().unlock();
+        }
     }
 
     @Override
-    public Cursor<Entry> prefix(byte[] prefix, boolean includeTombstones) {
-        return prefix(prefix, rev, includeTombstones);
-    }
+    public Cursor<Entry> range(byte[] keyFrom, byte @Nullable [] keyTo, long revUpperBound) {
+        rwLock.readLock().lock();

Review Comment:
   Do we need a read lock here?
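One observation supports the question. The cursor returned by `range(...)` is consumed lazily after the method has returned, so any lock taken inside `range(...)` covers only iterator creation, not entry decoding. The single-threaded toy below has the same shape. It is a hypothetical `ToyRangeCursor` over a `NavigableMap` instead of RocksDB. The key bounds are `[keyFrom, keyTo)`, where `keyTo == null` means unbounded, which mirrors the nullable `keyTo` in the PR. A look-ahead slot skips keys with no revision at or below the bound. What the cursor yields depends only on the caller-supplied `revUpperBound`, not on the storage's current revision.

```java
import java.util.Iterator;
import java.util.Map;
import java.util.NavigableMap;
import java.util.NoSuchElementException;

/** Hypothetical, single-threaded analogue of the range cursor in this PR. */
final class ToyRangeCursor implements Iterator<String> {
    private final Iterator<Map.Entry<String, NavigableMap<Long, String>>> it;

    private final long revUpperBound;

    /** Look-ahead slot, playing the role of the cached "next" entry used to skip empty keys. */
    private String next;

    ToyRangeCursor(NavigableMap<String, NavigableMap<Long, String>> data, String keyFrom, String keyTo, long revUpperBound) {
        NavigableMap<String, NavigableMap<Long, String>> view = keyTo == null
                ? data.tailMap(keyFrom, true)
                : data.subMap(keyFrom, true, keyTo, false);

        this.it = view.entrySet().iterator();
        this.revUpperBound = revUpperBound;
    }

    @Override
    public boolean hasNext() {
        // Advance until a key with a version at or below the bound is found.
        while (next == null && it.hasNext()) {
            Map.Entry<String, NavigableMap<Long, String>> e = it.next();
            Map.Entry<Long, String> version = e.getValue().floorEntry(revUpperBound);

            if (version != null) {
                next = e.getKey() + "=" + version.getValue();
            }
        }

        return next != null;
    }

    @Override
    public String next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }

        String result = next;
        next = null;
        return result;
    }
}
```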



##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageListener.java:
##########
@@ -77,159 +67,89 @@ public void onRead(Iterator<CommandClosure<ReadCommand>> iter) {
 
             ReadCommand command = clo.command();
 
-            if (command instanceof GetCommand) {
-                GetCommand getCmd = (GetCommand) command;
-
-                Entry e;
-
-                if (getCmd.revision() != 0) {
-                    e = storage.get(getCmd.key(), getCmd.revision());
-                } else {
-                    e = storage.get(getCmd.key());
-                }
-
-                clo.result(e);
-            } else if (command instanceof GetAllCommand) {
-                GetAllCommand getAllCmd = (GetAllCommand) command;
-
-                Collection<Entry> entries;
-
-                if (getAllCmd.revision() != 0) {
-                    entries = storage.getAll(getAllCmd.keys(), getAllCmd.revision());
-                } else {
-                    entries = storage.getAll(getAllCmd.keys());
-                }
-
-                clo.result((Serializable) entries);
-            } else {
-                assert false : "Command was not found [cmd=" + command + ']';
-            }
-        }
-    }
-
-    @Override
-    public void onWrite(Iterator<CommandClosure<WriteCommand>> iter) {
-        while (iter.hasNext()) {
-            CommandClosure<WriteCommand> clo = iter.next();
-
-            if (writeHandler.handleWriteCommand(clo)) {
-                continue;
-            }
-
-            WriteCommand command = clo.command();
-
-            if (command instanceof CreateRangeCursorCommand) {
-                var rangeCmd = (CreateRangeCursorCommand) command;
-
-                IgniteUuid cursorId = rangeCmd.cursorId();
-
-                Cursor<Entry> cursor = rangeCmd.revUpperBound() != -1
-                        ? storage.range(rangeCmd.keyFrom(), rangeCmd.keyTo(), rangeCmd.revUpperBound(), rangeCmd.includeTombstones())
-                        : storage.range(rangeCmd.keyFrom(), rangeCmd.keyTo(), rangeCmd.includeTombstones());
-
-                var cursorMeta = new CursorMeta(cursor, rangeCmd.requesterNodeId());
-
-                cursors.put(cursorId, cursorMeta);
-
-                clo.result(cursorId);
-            } else if (command instanceof CreatePrefixCursorCommand) {
-                var prefixCmd = (CreatePrefixCursorCommand) command;
-
-                IgniteUuid cursorId = prefixCmd.cursorId();
-
-                Cursor<Entry> cursor = prefixCmd.revUpperBound() == -1
-                        ? storage.prefix(prefixCmd.prefix(), prefixCmd.includeTombstones())
-                        : storage.prefix(prefixCmd.prefix(), prefixCmd.revUpperBound(), prefixCmd.includeTombstones());
-
-                var cursorMeta = new CursorMeta(cursor, prefixCmd.requesterNodeId());
-
-                cursors.put(cursorId, cursorMeta);
+            try {
+                if (command instanceof GetCommand) {
+                    GetCommand getCmd = (GetCommand) command;
 
-                clo.result(cursorId);
-            } else if (command instanceof NextBatchCommand) {
-                var nextBatchCommand = (NextBatchCommand) command;
+                    Entry e;
 
-                CursorMeta cursorMeta = cursors.get(nextBatchCommand.cursorId());
+                    if (getCmd.revision() != 0) {

Review Comment:
   So the smallest possible revision is 1?
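The handler treats `revision() == 0` as "read at the latest revision", while `handlePaginationCommand` treats `revUpperBound() == -1` the same way, so two different sentinels are in use. The sketch below makes the `0` sentinel explicit. `LATEST_REVISION` and `resolveRevision` are hypothetical names. The rule that real revisions start at 1 is the assumption this comment asks about, not a confirmed fact.

```java
/** Hypothetical helper; not part of the PR. */
final class RevisionSentinels {
    /** "No explicit revision was requested; read at the latest one." */
    static final long LATEST_REVISION = 0;

    private RevisionSentinels() {
    }

    /**
     * Resolves the revision to read at. Assumes real revisions start at 1,
     * so the sentinel can never collide with a stored revision.
     */
    static long resolveRevision(long requested, long currentRevision) {
        if (requested == LATEST_REVISION) {
            return currentRevision;
        }

        if (requested < 1) {
            throw new IllegalArgumentException("Invalid revision: " + requested);
        }

        return requested;
    }
}
```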



##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java:
##########
@@ -483,21 +485,17 @@ public Collection<Entry> getAndPutAll(List<byte[]> keys, List<byte[]> values) {
         return res;
     }
 
-    /** {@inheritDoc} */
-    @NotNull
     @Override
     public Entry get(byte[] key) {
         rwLock.readLock().lock();
 
         try {
-            return doGet(key, LATEST_REV);
+            return doGet(key, rev);
         } finally {
             rwLock.readLock().unlock();
         }
     }
 
-    /** {@inheritDoc} */
-    @NotNull
     @Override
     public Entry get(byte[] key, long revUpperBound) {
         rwLock.readLock().lock();

Review Comment:
   Same question about the read lock as above



##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java:
##########
@@ -758,23 +757,95 @@ private void applyOperations(Collection<Operation> ops) throws RocksDBException
     }
 
     @Override
-    public Cursor<Entry> range(byte[] keyFrom, byte[] keyTo, boolean includeTombstones) {
-        return new RangeCursor(this, keyFrom, keyTo, rev, includeTombstones);
-    }
+    public Cursor<Entry> range(byte[] keyFrom, byte @Nullable [] keyTo) {
+        rwLock.readLock().lock();
 
-    @Override
-    public Cursor<Entry> range(byte[] keyFrom, byte[] keyTo, long revUpperBound, boolean includeTombstones) {
-        return new RangeCursor(this, keyFrom, keyTo, revUpperBound, includeTombstones);
+        try {
+            return range(keyFrom, keyTo, rev);
+        } finally {
+            rwLock.readLock().unlock();
+        }
     }
 
     @Override
-    public Cursor<Entry> prefix(byte[] prefix, boolean includeTombstones) {
-        return prefix(prefix, rev, includeTombstones);
-    }
+    public Cursor<Entry> range(byte[] keyFrom, byte @Nullable [] keyTo, long revUpperBound) {
+        rwLock.readLock().lock();
 
-    @Override
-    public Cursor<Entry> prefix(byte[] prefix, long revUpperBound, boolean includeTombstones) {
-        return new RangeCursor(this, prefix, incrementPrefix(prefix), revUpperBound, includeTombstones);
+        try {
+            var readOpts = new ReadOptions();
+
+            var upperBound = keyTo == null ? null : new Slice(keyTo);
+
+            readOpts.setIterateUpperBound(upperBound);
+
+            RocksIterator iterator = index.newIterator(readOpts);
+
+            iterator.seek(keyFrom);
+
+            return new RocksIteratorAdapter<>(iterator) {
+                /** Cached entry used to filter "empty" values. */
+                @Nullable
+                private Entry next;
+
+                @Override
+                public boolean hasNext() {
+                    if (next != null) {
+                        return true;
+                    }
+
+                    while (next == null && super.hasNext()) {
+                        Entry nextCandidate = decodeEntry(it.key(), it.value());
+
+                        it.next();
+
+                        if (!nextCandidate.empty()) {
+                            next = nextCandidate;
+
+                            return true;
+                        }
+                    }
+
+                    return false;
+                }
+
+                @Override
+                public Entry next() {
+                    if (!hasNext()) {
+                        throw new NoSuchElementException();
+                    }
+
+                    Entry result = next;
+
+                    assert result != null;
+
+                    next = null;
+
+                    return result;
+                }
+
+                @Override
+                protected Entry decodeEntry(byte[] key, byte[] value) {
+                    long[] revisions = getAsLongs(value);
+
+                    long targetRevision = maxRevision(revisions, revUpperBound);
+
+                    if (targetRevision == -1) {
+                        return EntryImpl.empty(key);
+                    }
+
+                    return doGetValue(key, targetRevision);

Review Comment:
   No read lock is taken around `doGetValue()` here, but it is taken for ordinary `get()` and `getAll()`. It does not seem to be necessary (as mentioned in the comments above); I'm just leaving a note: should all `doGetValue()` calls consistently be made either under the lock or without it?



##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageWriteHandler.java:
##########
@@ -63,7 +63,7 @@ class MetaStorageWriteHandler {
      * Tries to process a {@link WriteCommand}, returning {@code true} if the command has been successfully processed or {@code false} if

Review Comment:
   It does not return anything now



##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java:
##########
@@ -509,21 +507,28 @@ public Entry get(byte[] key, long revUpperBound) {
         }
     }
 
-    /** {@inheritDoc} */
-    @NotNull
     @Override
     public Collection<Entry> getAll(List<byte[]> keys) {
-        return doGetAll(keys, LATEST_REV);
+        rwLock.readLock().lock();
+
+        try {
+            return doGetAll(keys, rev);
+        } finally {
+            rwLock.readLock().unlock();
+        }
     }
 
-    /** {@inheritDoc} */
-    @NotNull
     @Override
     public Collection<Entry> getAll(List<byte[]> keys, long revUpperBound) {
-        return doGetAll(keys, revUpperBound);
+        rwLock.readLock().lock();

Review Comment:
   Same as above



##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageListener.java:
##########
@@ -77,159 +67,89 @@ public void onRead(Iterator<CommandClosure<ReadCommand>> iter) {
 
             ReadCommand command = clo.command();
 
-            if (command instanceof GetCommand) {
-                GetCommand getCmd = (GetCommand) command;
-
-                Entry e;
-
-                if (getCmd.revision() != 0) {
-                    e = storage.get(getCmd.key(), getCmd.revision());
-                } else {
-                    e = storage.get(getCmd.key());
-                }
-
-                clo.result(e);
-            } else if (command instanceof GetAllCommand) {
-                GetAllCommand getAllCmd = (GetAllCommand) command;
-
-                Collection<Entry> entries;
-
-                if (getAllCmd.revision() != 0) {
-                    entries = storage.getAll(getAllCmd.keys(), getAllCmd.revision());
-                } else {
-                    entries = storage.getAll(getAllCmd.keys());
-                }
-
-                clo.result((Serializable) entries);
-            } else {
-                assert false : "Command was not found [cmd=" + command + ']';
-            }
-        }
-    }
-
-    @Override
-    public void onWrite(Iterator<CommandClosure<WriteCommand>> iter) {
-        while (iter.hasNext()) {
-            CommandClosure<WriteCommand> clo = iter.next();
-
-            if (writeHandler.handleWriteCommand(clo)) {
-                continue;
-            }
-
-            WriteCommand command = clo.command();
-
-            if (command instanceof CreateRangeCursorCommand) {
-                var rangeCmd = (CreateRangeCursorCommand) command;
-
-                IgniteUuid cursorId = rangeCmd.cursorId();
-
-                Cursor<Entry> cursor = rangeCmd.revUpperBound() != -1
-                        ? storage.range(rangeCmd.keyFrom(), rangeCmd.keyTo(), rangeCmd.revUpperBound(), rangeCmd.includeTombstones())
-                        : storage.range(rangeCmd.keyFrom(), rangeCmd.keyTo(), rangeCmd.includeTombstones());
-
-                var cursorMeta = new CursorMeta(cursor, rangeCmd.requesterNodeId());
-
-                cursors.put(cursorId, cursorMeta);
-
-                clo.result(cursorId);
-            } else if (command instanceof CreatePrefixCursorCommand) {
-                var prefixCmd = (CreatePrefixCursorCommand) command;
-
-                IgniteUuid cursorId = prefixCmd.cursorId();
-
-                Cursor<Entry> cursor = prefixCmd.revUpperBound() == -1
-                        ? storage.prefix(prefixCmd.prefix(), prefixCmd.includeTombstones())
-                        : storage.prefix(prefixCmd.prefix(), prefixCmd.revUpperBound(), prefixCmd.includeTombstones());
-
-                var cursorMeta = new CursorMeta(cursor, prefixCmd.requesterNodeId());
-
-                cursors.put(cursorId, cursorMeta);
+            try {
+                if (command instanceof GetCommand) {
+                    GetCommand getCmd = (GetCommand) command;
 
-                clo.result(cursorId);
-            } else if (command instanceof NextBatchCommand) {
-                var nextBatchCommand = (NextBatchCommand) command;
+                    Entry e;
 
-                CursorMeta cursorMeta = cursors.get(nextBatchCommand.cursorId());
+                    if (getCmd.revision() != 0) {
+                        e = storage.get(getCmd.key(), getCmd.revision());
+                    } else {
+                        e = storage.get(getCmd.key());
+                    }
 
-                if (cursorMeta == null) {
-                    clo.result(new NoSuchElementException("Corresponding cursor on the server side is not found."));
+                    clo.result(e);
+                } else if (command instanceof GetAllCommand) {
+                    GetAllCommand getAllCmd = (GetAllCommand) command;
 
-                    return;
-                }
+                    Collection<Entry> entries;
 
-                try {
-                    var resp = new ArrayList<Entry>(nextBatchCommand.batchSize());
+                    if (getAllCmd.revision() != 0) {
+                        entries = storage.getAll(getAllCmd.keys(), getAllCmd.revision());
+                    } else {
+                        entries = storage.getAll(getAllCmd.keys());
+                    }
 
-                    Cursor<Entry> cursor = cursorMeta.cursor();
+                    clo.result((Serializable) entries);
+                } else if (command instanceof GetRangeCommand) {
+                    var rangeCmd = (GetRangeCommand) command;
 
-                    for (int i = 0; i < nextBatchCommand.batchSize() && cursor.hasNext(); i++) {
-                        resp.add(cursor.next());
-                    }
+                    byte[] keyFrom = rangeCmd.previousKey() == null
+                            ? rangeCmd.keyFrom()
+                            : requireNonNull(storage.nextKey(rangeCmd.previousKey()));
 
-                    if (!cursor.hasNext()) {
-                        closeCursor(nextBatchCommand.cursorId());
-                    }
+                    clo.result(handlePaginationCommand(keyFrom, rangeCmd.keyTo(), rangeCmd));
+                } else if (command instanceof GetPrefixCommand) {
+                    var prefixCmd = (GetPrefixCommand) command;
 
-                    clo.result(new BatchResponse(resp, cursor.hasNext()));
-                } catch (Exception e) {
-                    clo.result(e);
-                }
-            } else if (command instanceof CloseCursorCommand) {
-                var closeCursorCommand = (CloseCursorCommand) command;
+                    byte[] keyFrom = prefixCmd.previousKey() == null
+                            ? prefixCmd.prefix()
+                            : requireNonNull(storage.nextKey(prefixCmd.previousKey()));
 
-                try {
-                    closeCursor(closeCursorCommand.cursorId());
+                    byte[] keyTo = storage.nextKey(prefixCmd.prefix());
 
-                    clo.result(null);
-                } catch (Exception e) {
-                    clo.result(new MetaStorageException(CURSOR_CLOSING_ERR, e));
+                    clo.result(handlePaginationCommand(keyFrom, keyTo, prefixCmd));
+                } else {
+                    assert false : "Command was not found [cmd=" + command + ']';
                 }
-            } else if (command instanceof CloseAllCursorsCommand) {
-                var cursorsCloseCmd = (CloseAllCursorsCommand) command;
-
-                Iterator<CursorMeta> cursorsIter = cursors.values().iterator();
+            } catch (Exception e) {
+                clo.result(e);
+            }
+        }
+    }
 
-                Exception ocurredException = null;
+    private BatchResponse handlePaginationCommand(byte[] keyFrom, byte @Nullable [] keyTo, PaginationCommand command) {
+        Cursor<Entry> cursor = command.revUpperBound() == -1
+                ? storage.range(keyFrom, keyTo)
+                : storage.range(keyFrom, keyTo, command.revUpperBound());
 
-                while (cursorsIter.hasNext()) {
-                    CursorMeta cursorDesc = cursorsIter.next();
+        try (cursor) {
+            var entries = new ArrayList<Entry>();
 
-                    if (cursorDesc.requesterNodeId().equals(cursorsCloseCmd.nodeId())) {
-                        try {
-                            cursorDesc.cursor().close();
-                        } catch (Exception e) {
-                            if (ocurredException == null) {
-                                ocurredException = e;
-                            } else {
-                                ocurredException.addSuppressed(e);
-                            }
-                        }
+            for (Entry entry : cursor) {
+                if (command.includeTombstones() || !entry.tombstone()) {
+                    entries.add(entry);
 
-                        cursorsIter.remove();
+                    if (entries.size() == command.batchSize()) {

Review Comment:
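   Is it possible to send 0 as `batchSize()`? It seems that if zero is sent, we'll operate in an unbounded manner: the `entries.size() == command.batchSize()` check is only made after an entry has been added, so it never fires and the whole range ends up in a single response.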
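A minimal sketch of a guard for this case is shown below, together with the batching loop it protects. `Item` and `Batch` are hypothetical stand-ins for `Entry` and `BatchResponse`, and rejecting non-positive sizes is one possible policy, not necessarily the intended one.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/** Hypothetical sketch of the pagination loop with a batch-size guard. */
final class BatchCollector {
    /** Stand-in for Entry: a key plus a tombstone flag. */
    record Item(String key, boolean tombstone) {}

    /** Stand-in for BatchResponse: the page and whether more entries may follow. */
    record Batch(List<Item> items, boolean hasNextBatch) {}

    private BatchCollector() {
    }

    static Batch collect(Iterator<Item> cursor, int batchSize, boolean includeTombstones) {
        if (batchSize <= 0) {
            // Reject instead of silently collecting the whole range into one response.
            throw new IllegalArgumentException("batchSize must be positive: " + batchSize);
        }

        List<Item> items = new ArrayList<>();

        while (cursor.hasNext()) {
            Item item = cursor.next();

            if (includeTombstones || !item.tombstone()) {
                items.add(item);

                if (items.size() == batchSize) {
                    // The page is full; report whether the cursor has anything left.
                    return new Batch(items, cursor.hasNext());
                }
            }
        }

        return new Batch(items, false);
    }
}
```

In this sketch, a batch returned with `hasNextBatch == true` is always full, and therefore non-empty. That is the kind of guarantee the `CursorSubscription` comment asks about.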


