You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@ignite.apache.org by "tkalkirill (via GitHub)" <gi...@apache.org> on 2023/06/08 08:36:48 UTC

[GitHub] [ignite-3] tkalkirill commented on a diff in pull request #2161: IGNITE-19422 Fixed "get" method in index storages.

tkalkirill commented on code in PR #2161:
URL: https://github.com/apache/ignite-3/pull/2161#discussion_r1222612347


##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/AbstractPageMemoryIndexStorage.java:
##########
@@ -192,6 +196,123 @@ public void finishCleanup() {
         state.compareAndSet(StorageState.CLEANUP, StorageState.RUNNABLE);
     }
 
+    /** Constant that represents the absence of value in {@link ScanCursor}. */
+    private static final IndexRowKey NO_INDEX_ROW = () -> null;

Review Comment:
   Optional: What about `NULL_INDEX_ROW`? It seems more obvious.



##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/AbstractPageMemoryIndexStorage.java:
##########
@@ -192,6 +196,123 @@ public void finishCleanup() {
         state.compareAndSet(StorageState.CLEANUP, StorageState.RUNNABLE);
     }
 
+    /** Constant that represents the absence of value in {@link ScanCursor}. */
+    private static final IndexRowKey NO_INDEX_ROW = () -> null;
+
+    /**
+     * Cursor that always returns up-to-date next element.
+     *
+     * @param <R> Type of the returned value.
+     */
+    protected abstract class ScanCursor<R> implements PeekCursor<R> {
+        private final BplusTree<K, V> indexTree;
+
+        @Nullable
+        private Boolean hasNext;
+
+        @Nullable
+        private final K lower;

Review Comment:
   ```suggestion
           private final @Nullable K lower;
   ```



##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/AbstractPageMemoryIndexStorage.java:
##########
@@ -192,6 +196,123 @@ public void finishCleanup() {
         state.compareAndSet(StorageState.CLEANUP, StorageState.RUNNABLE);
     }
 
+    /** Constant that represents the absence of value in {@link ScanCursor}. */
+    private static final IndexRowKey NO_INDEX_ROW = () -> null;
+
+    /**
+     * Cursor that always returns up-to-date next element.
+     *
+     * @param <R> Type of the returned value.
+     */
+    protected abstract class ScanCursor<R> implements PeekCursor<R> {
+        private final BplusTree<K, V> indexTree;
+
+        @Nullable
+        private Boolean hasNext;

Review Comment:
   ```suggestion
           private @Nullable Boolean hasNext;
   ```



##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/AbstractPageMemoryIndexStorage.java:
##########
@@ -192,6 +196,123 @@ public void finishCleanup() {
         state.compareAndSet(StorageState.CLEANUP, StorageState.RUNNABLE);
     }
 
+    /** Constant that represents the absence of value in {@link ScanCursor}. */
+    private static final IndexRowKey NO_INDEX_ROW = () -> null;
+
+    /**
+     * Cursor that always returns up-to-date next element.
+     *
+     * @param <R> Type of the returned value.
+     */
+    protected abstract class ScanCursor<R> implements PeekCursor<R> {
+        private final BplusTree<K, V> indexTree;
+
+        @Nullable
+        private Boolean hasNext;
+
+        @Nullable
+        private final K lower;
+
+        @Nullable
+        private V treeRow;
+
+        @Nullable
+        private V peekedRow = (V) NO_INDEX_ROW;

Review Comment:
   ```suggestion
           private @Nullable V peekedRow = (V) NO_INDEX_ROW;
   ```



##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/AbstractPageMemoryIndexStorage.java:
##########
@@ -192,6 +196,123 @@ public void finishCleanup() {
         state.compareAndSet(StorageState.CLEANUP, StorageState.RUNNABLE);
     }
 
+    /** Constant that represents the absence of value in {@link ScanCursor}. */
+    private static final IndexRowKey NO_INDEX_ROW = () -> null;
+
+    /**
+     * Cursor that always returns up-to-date next element.
+     *
+     * @param <R> Type of the returned value.
+     */
+    protected abstract class ScanCursor<R> implements PeekCursor<R> {
+        private final BplusTree<K, V> indexTree;
+
+        @Nullable
+        private Boolean hasNext;
+
+        @Nullable
+        private final K lower;
+
+        @Nullable
+        private V treeRow;

Review Comment:
   ```suggestion
           private @Nullable V treeRow;
   ```



##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/AbstractPageMemoryIndexStorage.java:
##########
@@ -192,6 +196,123 @@ public void finishCleanup() {
         state.compareAndSet(StorageState.CLEANUP, StorageState.RUNNABLE);
     }
 
+    /** Constant that represents the absence of value in {@link ScanCursor}. */
+    private static final IndexRowKey NO_INDEX_ROW = () -> null;
+
+    /**
+     * Cursor that always returns up-to-date next element.
+     *
+     * @param <R> Type of the returned value.
+     */
+    protected abstract class ScanCursor<R> implements PeekCursor<R> {
+        private final BplusTree<K, V> indexTree;
+
+        @Nullable
+        private Boolean hasNext;
+
+        @Nullable
+        private final K lower;
+
+        @Nullable
+        private V treeRow;
+
+        @Nullable
+        private V peekedRow = (V) NO_INDEX_ROW;
+
+        protected ScanCursor(@Nullable K lower, BplusTree<K, V> indexTree) {
+            this.lower = lower;
+            this.indexTree = indexTree;
+        }
+
+        /**
+         * Maps value from the index tree into the required result.
+         */
+        protected abstract R map(V value);
+
+        /**
+         * Check whether the passed value exceeds the upper bound for the scan.
+         */
+        protected abstract boolean halt(V value);

Review Comment:
   Not an obvious name; maybe call it `exceedUpperBound`?



##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/AbstractPageMemoryIndexStorage.java:
##########
@@ -192,6 +196,123 @@ public void finishCleanup() {
         state.compareAndSet(StorageState.CLEANUP, StorageState.RUNNABLE);
     }
 
+    /** Constant that represents the absence of value in {@link ScanCursor}. */
+    private static final IndexRowKey NO_INDEX_ROW = () -> null;
+
+    /**
+     * Cursor that always returns up-to-date next element.
+     *
+     * @param <R> Type of the returned value.
+     */
+    protected abstract class ScanCursor<R> implements PeekCursor<R> {
+        private final BplusTree<K, V> indexTree;
+
+        @Nullable
+        private Boolean hasNext;
+
+        @Nullable
+        private final K lower;
+
+        @Nullable
+        private V treeRow;
+
+        @Nullable
+        private V peekedRow = (V) NO_INDEX_ROW;
+
+        protected ScanCursor(@Nullable K lower, BplusTree<K, V> indexTree) {
+            this.lower = lower;
+            this.indexTree = indexTree;
+        }
+
+        /**
+         * Maps value from the index tree into the required result.
+         */
+        protected abstract R map(V value);
+
+        /**
+         * Check whether the passed value exceeds the upper bound for the scan.
+         */
+        protected abstract boolean halt(V value);
+
+        @Override
+        public void close() {
+            // No-op.
+        }
+
+        @Override
+        public boolean hasNext() {
+            return busy(() -> {
+                try {
+                    return advanceIfNeeded();
+                } catch (IgniteInternalCheckedException e) {
+                    throw new StorageException("Error while advancing the cursor", e);
+                }
+            });
+        }
+
+        @Override
+        public R next() {
+            return busy(() -> {
+                try {
+                    if (!advanceIfNeeded()) {
+                        throw new NoSuchElementException();
+                    }
+
+                    this.hasNext = null;
+
+                    return map(treeRow);
+                } catch (IgniteInternalCheckedException e) {
+                    throw new StorageException("Error while advancing the cursor", e);
+                }
+            });
+        }
+
+        @Override
+        public @Nullable R peek() {
+            return busy(() -> {
+                throwExceptionIfStorageInProgressOfRebalance(state.get(), AbstractPageMemoryIndexStorage.this::createStorageInfo);
+
+                try {
+                    return map(peekBusy());
+                } catch (IgniteInternalCheckedException e) {
+                    throw new StorageException("Error when peeking next element", e);
+                }
+            });
+        }
+
+        private @Nullable V peekBusy() throws IgniteInternalCheckedException {
+            if (hasNext != null) {
+                return treeRow;
+            }
+
+            if (treeRow == null) {
+                peekedRow = lower == null ? indexTree.findFirst() : indexTree.findNext(lower, true);
+            } else {
+                peekedRow = indexTree.findNext(treeRow, false);
+            }
+
+            if (peekedRow != null && halt(peekedRow)) {
+                peekedRow = null;
+            }
+
+            return peekedRow;
+        }
+
+        private boolean advanceIfNeeded() throws IgniteInternalCheckedException {

Review Comment:
   ```suggestion
           private boolean advanceIfNeededBusy() throws IgniteInternalCheckedException {
   ```



##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/AbstractPageMemoryIndexStorage.java:
##########
@@ -192,6 +196,123 @@ public void finishCleanup() {
         state.compareAndSet(StorageState.CLEANUP, StorageState.RUNNABLE);
     }
 
+    /** Constant that represents the absence of value in {@link ScanCursor}. */
+    private static final IndexRowKey NO_INDEX_ROW = () -> null;
+
+    /**
+     * Cursor that always returns up-to-date next element.
+     *
+     * @param <R> Type of the returned value.
+     */
+    protected abstract class ScanCursor<R> implements PeekCursor<R> {
+        private final BplusTree<K, V> indexTree;
+
+        @Nullable
+        private Boolean hasNext;
+
+        @Nullable
+        private final K lower;
+
+        @Nullable
+        private V treeRow;

Review Comment:
   Missing Javadoc for the `treeRow` field.



##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/AbstractRocksDbIndexStorage.java:
##########
@@ -201,4 +214,125 @@ String createStorageInfo() {
      * @throws RocksDBException If failed to delete data.
      */
     abstract void destroyData(WriteBatch writeBatch) throws RocksDBException;
+
+    /**
+     * Cursor that always returns up-to-date next element.
+     */
+    protected final class UpToDatePeekCursor<T> implements PeekCursor<T> {
+        private final Slice upperBoundSlice;
+        private final byte[] lowerBound;
+
+        private final ReadOptions options;
+        private final RocksIterator it;
+        private final Function<ByteBuffer, T> mapper;

Review Comment:
   https://memesmix.net/media/created/250/k5md75.jpg



##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/AbstractRocksDbIndexStorage.java:
##########
@@ -201,4 +214,125 @@ String createStorageInfo() {
      * @throws RocksDBException If failed to delete data.
      */
     abstract void destroyData(WriteBatch writeBatch) throws RocksDBException;
+
+    /**
+     * Cursor that always returns up-to-date next element.
+     */
+    protected final class UpToDatePeekCursor<T> implements PeekCursor<T> {
+        private final Slice upperBoundSlice;
+        private final byte[] lowerBound;
+
+        private final ReadOptions options;
+        private final RocksIterator it;
+        private final Function<ByteBuffer, T> mapper;
+
+        @Nullable
+        private Boolean hasNext;
+
+        private byte @Nullable [] key;

Review Comment:
   Missing Javadoc for the `key` field.



##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/AbstractRocksDbIndexStorage.java:
##########
@@ -201,4 +214,125 @@ String createStorageInfo() {
      * @throws RocksDBException If failed to delete data.
      */
     abstract void destroyData(WriteBatch writeBatch) throws RocksDBException;
+
+    /**
+     * Cursor that always returns up-to-date next element.
+     */
+    protected final class UpToDatePeekCursor<T> implements PeekCursor<T> {
+        private final Slice upperBoundSlice;
+        private final byte[] lowerBound;
+
+        private final ReadOptions options;
+        private final RocksIterator it;
+        private final Function<ByteBuffer, T> mapper;
+
+        @Nullable
+        private Boolean hasNext;

Review Comment:
   ```suggestion
           private @Nullable Boolean hasNext;
   ```



##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/AbstractRocksDbIndexStorage.java:
##########
@@ -201,4 +214,125 @@ String createStorageInfo() {
      * @throws RocksDBException If failed to delete data.
      */
     abstract void destroyData(WriteBatch writeBatch) throws RocksDBException;
+
+    /**
+     * Cursor that always returns up-to-date next element.
+     */
+    protected final class UpToDatePeekCursor<T> implements PeekCursor<T> {
+        private final Slice upperBoundSlice;
+        private final byte[] lowerBound;
+
+        private final ReadOptions options;
+        private final RocksIterator it;
+        private final Function<ByteBuffer, T> mapper;
+
+        @Nullable
+        private Boolean hasNext;
+
+        private byte @Nullable [] key;
+
+        private byte @Nullable [] peekedKey = BYTE_EMPTY_ARRAY;
+
+        UpToDatePeekCursor(byte[] upperBound, ColumnFamily indexCf, Function<ByteBuffer, T> mapper, byte[] lowerBound) {
+            this.lowerBound = lowerBound;
+            upperBoundSlice = new Slice(upperBound);
+            options = new ReadOptions().setIterateUpperBound(upperBoundSlice);
+            it = indexCf.newIterator(options);
+
+            this.mapper = mapper;
+        }
+
+        @Override
+        public void close() {
+            try {
+                closeAll(it, options, upperBoundSlice);
+            } catch (Exception e) {
+                throw new StorageException("Error closing cursor", e);
+            }
+        }
+
+        @Override
+        public boolean hasNext() {
+            return busy(this::advanceIfNeeded);
+        }
+
+        @Override
+        public T next() {
+            return busy(() -> {
+                if (!advanceIfNeeded()) {
+                    throw new NoSuchElementException();
+                }
+
+                this.hasNext = null;
+
+                return mapper.apply(ByteBuffer.wrap(key).order(KEY_BYTE_ORDER));
+            });
+        }
+
+        @Override
+        public @Nullable T peek() {
+            return busy(() -> {
+                throwExceptionIfStorageInProgressOfRebalance(state.get(), AbstractRocksDbIndexStorage.this::createStorageInfo);
+
+                byte[] res = peek0();
+
+                if (res == null) {
+                    return null;
+                } else {
+                    return mapper.apply(ByteBuffer.wrap(res).order(KEY_BYTE_ORDER));
+                }
+            });
+        }
+
+        private byte @Nullable [] peek0() {
+            if (hasNext != null) {
+                return key;
+            }
+
+            refreshAndPrepareRocksIterator();
+
+            if (!it.isValid()) {
+                RocksUtils.checkIterator(it);
+
+                peekedKey = null;
+            } else {
+                peekedKey = it.key();
+            }
+
+            return peekedKey;
+        }
+
+        private boolean advanceIfNeeded() throws StorageException {

Review Comment:
   ```suggestion
           private boolean advanceIfNeededBusy() throws StorageException {
   ```



##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/AbstractRocksDbIndexStorage.java:
##########
@@ -201,4 +214,125 @@ String createStorageInfo() {
      * @throws RocksDBException If failed to delete data.
      */
     abstract void destroyData(WriteBatch writeBatch) throws RocksDBException;
+
+    /**
+     * Cursor that always returns up-to-date next element.
+     */
+    protected final class UpToDatePeekCursor<T> implements PeekCursor<T> {
+        private final Slice upperBoundSlice;
+        private final byte[] lowerBound;
+
+        private final ReadOptions options;
+        private final RocksIterator it;
+        private final Function<ByteBuffer, T> mapper;
+
+        @Nullable
+        private Boolean hasNext;
+
+        private byte @Nullable [] key;
+
+        private byte @Nullable [] peekedKey = BYTE_EMPTY_ARRAY;
+
+        UpToDatePeekCursor(byte[] upperBound, ColumnFamily indexCf, Function<ByteBuffer, T> mapper, byte[] lowerBound) {
+            this.lowerBound = lowerBound;
+            upperBoundSlice = new Slice(upperBound);
+            options = new ReadOptions().setIterateUpperBound(upperBoundSlice);
+            it = indexCf.newIterator(options);
+
+            this.mapper = mapper;
+        }
+
+        @Override
+        public void close() {
+            try {
+                closeAll(it, options, upperBoundSlice);
+            } catch (Exception e) {
+                throw new StorageException("Error closing cursor", e);
+            }
+        }
+
+        @Override
+        public boolean hasNext() {
+            return busy(this::advanceIfNeeded);
+        }
+
+        @Override
+        public T next() {
+            return busy(() -> {
+                if (!advanceIfNeeded()) {
+                    throw new NoSuchElementException();
+                }
+
+                this.hasNext = null;
+
+                return mapper.apply(ByteBuffer.wrap(key).order(KEY_BYTE_ORDER));
+            });
+        }
+
+        @Override
+        public @Nullable T peek() {
+            return busy(() -> {
+                throwExceptionIfStorageInProgressOfRebalance(state.get(), AbstractRocksDbIndexStorage.this::createStorageInfo);
+
+                byte[] res = peek0();
+
+                if (res == null) {
+                    return null;
+                } else {
+                    return mapper.apply(ByteBuffer.wrap(res).order(KEY_BYTE_ORDER));
+                }
+            });
+        }
+
+        private byte @Nullable [] peek0() {
+            if (hasNext != null) {
+                return key;
+            }
+
+            refreshAndPrepareRocksIterator();
+
+            if (!it.isValid()) {
+                RocksUtils.checkIterator(it);
+
+                peekedKey = null;
+            } else {
+                peekedKey = it.key();
+            }
+
+            return peekedKey;
+        }
+
+        private boolean advanceIfNeeded() throws StorageException {
+            throwExceptionIfStorageInProgressOfRebalance(state.get(), AbstractRocksDbIndexStorage.this::createStorageInfo);
+
+            //noinspection ArrayEquality
+            key = (peekedKey == BYTE_EMPTY_ARRAY) ? peek0() : peekedKey;
+            peekedKey = BYTE_EMPTY_ARRAY;
+
+            hasNext = key != null;
+            return hasNext;
+        }
+
+        private void refreshAndPrepareRocksIterator() {

Review Comment:
   ```suggestion
           private void refreshAndPrepareRocksIteratorBusy() {
   ```



##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/AbstractPageMemoryIndexStorage.java:
##########
@@ -192,6 +196,123 @@ public void finishCleanup() {
         state.compareAndSet(StorageState.CLEANUP, StorageState.RUNNABLE);
     }
 
+    /** Constant that represents the absence of value in {@link ScanCursor}. */
+    private static final IndexRowKey NO_INDEX_ROW = () -> null;
+
+    /**
+     * Cursor that always returns up-to-date next element.
+     *
+     * @param <R> Type of the returned value.
+     */
+    protected abstract class ScanCursor<R> implements PeekCursor<R> {
+        private final BplusTree<K, V> indexTree;
+
+        @Nullable
+        private Boolean hasNext;
+
+        @Nullable
+        private final K lower;
+
+        @Nullable
+        private V treeRow;
+
+        @Nullable
+        private V peekedRow = (V) NO_INDEX_ROW;

Review Comment:
   Missing Javadoc for the `peekedRow` field.



##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/AbstractRocksDbIndexStorage.java:
##########
@@ -201,4 +214,125 @@ String createStorageInfo() {
      * @throws RocksDBException If failed to delete data.
      */
     abstract void destroyData(WriteBatch writeBatch) throws RocksDBException;
+
+    /**
+     * Cursor that always returns up-to-date next element.
+     */
+    protected final class UpToDatePeekCursor<T> implements PeekCursor<T> {
+        private final Slice upperBoundSlice;
+        private final byte[] lowerBound;
+
+        private final ReadOptions options;
+        private final RocksIterator it;
+        private final Function<ByteBuffer, T> mapper;
+
+        @Nullable
+        private Boolean hasNext;
+
+        private byte @Nullable [] key;
+
+        private byte @Nullable [] peekedKey = BYTE_EMPTY_ARRAY;

Review Comment:
   Missing Javadoc for the `peekedKey` field.



##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/AbstractRocksDbIndexStorage.java:
##########
@@ -201,4 +214,125 @@ String createStorageInfo() {
      * @throws RocksDBException If failed to delete data.
      */
     abstract void destroyData(WriteBatch writeBatch) throws RocksDBException;
+
+    /**
+     * Cursor that always returns up-to-date next element.
+     */
+    protected final class UpToDatePeekCursor<T> implements PeekCursor<T> {
+        private final Slice upperBoundSlice;
+        private final byte[] lowerBound;
+
+        private final ReadOptions options;
+        private final RocksIterator it;
+        private final Function<ByteBuffer, T> mapper;
+
+        @Nullable
+        private Boolean hasNext;
+
+        private byte @Nullable [] key;
+
+        private byte @Nullable [] peekedKey = BYTE_EMPTY_ARRAY;
+
+        UpToDatePeekCursor(byte[] upperBound, ColumnFamily indexCf, Function<ByteBuffer, T> mapper, byte[] lowerBound) {
+            this.lowerBound = lowerBound;
+            upperBoundSlice = new Slice(upperBound);
+            options = new ReadOptions().setIterateUpperBound(upperBoundSlice);
+            it = indexCf.newIterator(options);
+
+            this.mapper = mapper;
+        }
+
+        @Override
+        public void close() {
+            try {
+                closeAll(it, options, upperBoundSlice);
+            } catch (Exception e) {
+                throw new StorageException("Error closing cursor", e);
+            }
+        }
+
+        @Override
+        public boolean hasNext() {
+            return busy(this::advanceIfNeeded);
+        }
+
+        @Override
+        public T next() {
+            return busy(() -> {
+                if (!advanceIfNeeded()) {
+                    throw new NoSuchElementException();
+                }
+
+                this.hasNext = null;
+
+                return mapper.apply(ByteBuffer.wrap(key).order(KEY_BYTE_ORDER));
+            });
+        }
+
+        @Override
+        public @Nullable T peek() {
+            return busy(() -> {
+                throwExceptionIfStorageInProgressOfRebalance(state.get(), AbstractRocksDbIndexStorage.this::createStorageInfo);
+
+                byte[] res = peek0();
+
+                if (res == null) {
+                    return null;
+                } else {
+                    return mapper.apply(ByteBuffer.wrap(res).order(KEY_BYTE_ORDER));
+                }
+            });
+        }
+
+        private byte @Nullable [] peek0() {

Review Comment:
   ```suggestion
           private byte @Nullable [] peekBusy() {
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org