You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@ignite.apache.org by GitBox <gi...@apache.org> on 2022/11/03 08:57:56 UTC

[GitHub] [ignite-3] AMashenkov commented on a diff in pull request #1205: IGNITE-17748 Enrich InternalTable.scan API in order to support index scans.

AMashenkov commented on code in PR #1205:
URL: https://github.com/apache/ignite-3/pull/1205#discussion_r1012636163


##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java:
##########
@@ -569,6 +584,184 @@ private CompletableFuture<ArrayList<BinaryRow>> processScanRetrieveBatchAction(R
         });
     }
 
+    /**
+     * Scans sorted index in RW tx.
+     *
+     * @param request Index scan request.
+     * @param indexStorage Index storage.
+     * @return Opreation future.
+     */
+    private CompletableFuture<List<BinaryRow>> scanSortedIndex(ReadWriteScanRetrieveBatchReplicaRequest request,
+            SortedIndexStorage indexStorage) {
+        UUID txId = request.transactionId();
+        int batchCount = request.batchSize();
+
+        IgniteUuid cursorId = new IgniteUuid(txId, request.scanId());
+
+        UUID indexId = request.indexToUse();
+
+        BinaryTuple lowerBound = request.lowerBound();
+        BinaryTuple upperBound = request.upperBound();
+
+        int flags = request.flags();
+
+        boolean includeUpperBound = (flags & SortedIndexStorage.LESS_OR_EQUAL) != 0;
+
+        return lockManager.acquire(txId, new LockKey(indexId), LockMode.IS).thenCompose(idxLock -> { // Index IS lock
+            return lockManager.acquire(txId, new LockKey(tableId), LockMode.IS).thenCompose(tblLock -> { // Table IS lock
+                @SuppressWarnings("resource") Cursor<IndexRow> cursor = (Cursor<IndexRow>) cursors.computeIfAbsent(cursorId,
+                        id -> {
+                            // TODO https://issues.apache.org/jira/browse/IGNITE-18057
+                            // Fix scan cursor return item closet to lowerbound and <= lowerbound
+                            // to correctly lock range between lowerbound value and the item next to lowerbound.
+                            return indexStorage.scan(
+                                    lowerBound == null ? null : BinaryTuplePrefix.fromBinaryTuple(lowerBound),
+                                    // We need upperBound next value for correct range lock.
+                                    null, //BinaryTuplePrefix.fromBinaryTuple(upperBound),
+                                    flags
+                            );
+                        });
+
+                IndexLocker indexLocker = indexesLockers.get().get(indexId);
+
+                final ArrayList<BinaryRow> result = new ArrayList<>(batchCount);
+
+                return continueIndexScan(txId, indexId, indexLocker, cursor, upperBound, includeUpperBound, batchCount, result)
+                        .thenApply(ignore -> result);
+            });
+        });
+    }
+
+    /**
+     * Scans sorted index in RO tx.
+     *
+     * @param request Index scan request.
+     * @param indexStorage Index storage.
+     * @return Opreation future.
+     */
+    private CompletableFuture<List<BinaryRow>> scanSortedIndex(
+            ReadOnlyScanRetrieveBatchReplicaRequest request,
+            SortedIndexStorage indexStorage
+    ) {
+        UUID txId = request.transactionId();
+        int batchCount = request.batchSize();
+        HybridTimestamp timestamp = request.readTimestamp();
+
+        IgniteUuid cursorId = new IgniteUuid(txId, request.scanId());
+
+        BinaryTuple lowerBound = request.lowerBound();
+        BinaryTuple upperBound = request.upperBound();
+
+        int flags = request.flags();
+
+        @SuppressWarnings("resource") Cursor<IndexRow> cursor = (Cursor<IndexRow>) cursors.computeIfAbsent(cursorId,
+                id -> {
+                    return indexStorage.scan(
+                            lowerBound == null ? null : BinaryTuplePrefix.fromBinaryTuple(lowerBound),
+                            upperBound == null ? null : BinaryTuplePrefix.fromBinaryTuple(upperBound),
+                            flags
+                    );
+                });
+
+        final ArrayList<BinaryRow> result = new ArrayList<>(batchCount);
+
+        return continueReadOnlyIndexScan(cursor, timestamp, batchCount, result)
+                .thenCompose(ignore -> CompletableFuture.completedFuture(result));
+    }
+
+    CompletableFuture<Void> continueReadOnlyIndexScan(
+            Cursor<IndexRow> cursor,
+            HybridTimestamp timestamp,
+            int batchSize,
+            List<BinaryRow> result
+    ) {
+        if (result.size() >= batchSize || !cursor.hasNext()) {
+            return CompletableFuture.completedFuture(null);
+        }
+
+        IndexRow indexRow = cursor.next();
+
+        RowId rowId = indexRow.rowId();
+
+        ReadResult readResult = mvDataStorage.read(rowId, HybridTimestamp.MAX_VALUE);
+
+        return resolveReadResult(readResult, timestamp, () -> {
+            if (readResult.newestCommitTimestamp() == null) {
+                return null;
+            }
+
+            ReadResult committedReadResult = mvDataStorage.read(rowId, readResult.newestCommitTimestamp());
+
+            assert !committedReadResult.isWriteIntent() :
+                    "The result is not committed [rowId=" + rowId + ", timestamp="
+                            + readResult.newestCommitTimestamp() + ']';
+
+            return committedReadResult.binaryRow();
+        }).thenCompose(resolvedReadResult -> {
+            if (resolvedReadResult != null) {
+                result.add(resolvedReadResult);
+            }
+            return continueReadOnlyIndexScan(cursor, timestamp, batchSize, result);

Review Comment:
   I've checked. Java doesn't optimize tail recursion.
   So, wrap recursive call into supplyAsync



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org