Posted to notifications@ignite.apache.org by "tkalkirill (via GitHub)" <gi...@apache.org> on 2023/05/04 09:23:20 UTC

[GitHub] [ignite-3] tkalkirill commented on a diff in pull request #2021: IGNITE-19394 "peek" contract improved to avoid missing data on RW scans.

tkalkirill commented on code in PR #2021:
URL: https://github.com/apache/ignite-3/pull/2021#discussion_r1184741354


##########
modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/impl/TestSortedIndexStorage.java:
##########
@@ -61,8 +54,21 @@ public class TestSortedIndexStorage extends AbstractTestIndexStorage implements
     public TestSortedIndexStorage(int partitionId, SortedIndexDescriptor descriptor) {
         super(partitionId);
 
+        BinaryTupleComparator binaryTupleComparator = new BinaryTupleComparator(descriptor);
+
         this.descriptor = descriptor;
-        this.index = new ConcurrentSkipListMap<>(new BinaryTupleComparator(descriptor));
+        this.index = new ConcurrentSkipListSet<>((indexRow0, indexRow1) -> {

Review Comment:
   A separate class for the `Comparator` would be more useful, I guess.
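
   To illustrate the suggestion, here is a minimal sketch of a named comparator class. The lambda body is not visible in this hunk, so the ordering (indexed key first, then row ID as a tie-breaker) is an assumption, and `Row`, `RowComparator` and `IndexRowComparatorSketch` are hypothetical stand-ins rather than Ignite classes.

```java
import java.util.Comparator;
import java.util.concurrent.ConcurrentSkipListSet;

public class IndexRowComparatorSketch {
    /** Hypothetical stand-in for an index row: an indexed key plus a row ID. */
    public static final class Row {
        final int key;
        final long rowId;

        public Row(int key, long rowId) {
            this.key = key;
            this.rowId = rowId;
        }
    }

    /** Named comparator: orders by indexed key, then by row ID to break ties (assumed ordering). */
    public static final class RowComparator implements Comparator<Row> {
        private final Comparator<Integer> keyComparator;

        public RowComparator(Comparator<Integer> keyComparator) {
            this.keyComparator = keyComparator;
        }

        @Override
        public int compare(Row left, Row right) {
            int cmp = keyComparator.compare(left.key, right.key);

            return cmp != 0 ? cmp : Long.compare(left.rowId, right.rowId);
        }
    }

    public static void main(String[] args) {
        ConcurrentSkipListSet<Row> index = new ConcurrentSkipListSet<>(new RowComparator(Comparator.naturalOrder()));

        index.add(new Row(1, 20L));
        index.add(new Row(1, 10L));
        index.add(new Row(0, 30L));

        // Rows with equal keys stay distinct because the row ID breaks the tie.
        System.out.println(index.size());
    }
}
```

   A named class can be unit-tested on its own and reused wherever the same ordering is needed.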



##########
modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/AbstractSortedIndexStorageTest.java:
##########
@@ -1506,6 +1505,49 @@ void testScanPeekReplaceRow() {
         assertNull(scan.peek());
     }
 
+    @Test
+    void testScanPeekRemoveNext() {
+        SortedIndexDefinition indexDefinition = SchemaBuilders.sortedIndex("TEST_IDX")
+                .addIndexColumn(ColumnType.INT32.typeSpec().name()).asc().done()
+                .build();
+
+        SortedIndexStorage indexStorage = createIndexStorage(indexDefinition);
+
+        BinaryTupleRowSerializer serializer = new BinaryTupleRowSerializer(indexStorage.indexDescriptor());
+
+        PeekCursor<IndexRow> scan = indexStorage.scan(null, null, 0);
+
+        RowId rowId = new RowId(TEST_PARTITION);
+
+        // index  =  [0]
+        // cursor = ^ with no cached row
+        put(indexStorage, serializer.serializeRow(new Object[]{0}, rowId));
+
+        // index  =  [0] [1]

Review Comment:
   I don't think `[1]` is in the index at this point.



##########
modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/impl/TestSortedIndexStorage.java:
##########
@@ -118,23 +122,34 @@ public PeekCursor<IndexRow> scan(
             setEqualityFlag(upperBound);
         }
 
-        NavigableMap<ByteBuffer, NavigableMap<RowId, Object>> navigableMap;
+        NavigableSet<IndexRow> navigableSet;
 
         if (lowerBound == null && upperBound == null) {
-            navigableMap = index;
+            navigableSet = index;
         } else if (lowerBound == null) {
-            navigableMap = index.headMap(upperBound.byteBuffer());
+            navigableSet = index.headSet(prefixToIndexRow(upperBound, highestRowId(partitionId)), true);
         } else if (upperBound == null) {
-            navigableMap = index.tailMap(lowerBound.byteBuffer());
+            navigableSet = index.tailSet(prefixToIndexRow(lowerBound, lowestRowId(partitionId)), true);
         } else {
             try {
-                navigableMap = index.subMap(lowerBound.byteBuffer(), upperBound.byteBuffer());
+                navigableSet = index.subSet(
+                        prefixToIndexRow(lowerBound, lowestRowId(partitionId)),
+                        true,
+                        prefixToIndexRow(upperBound, highestRowId(partitionId)),
+                        true
+                );
             } catch (IllegalArgumentException e) {
-                navigableMap = emptyNavigableMap();
+                navigableSet = emptyNavigableSet();

Review Comment:
   Why is exception handling needed here? What can go wrong?
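
   For context, one case in which `subSet` is documented to throw `IllegalArgumentException` is a lower bound that is greater than the upper bound. The sketch below shows that case with plain integers; whether this is the case the PR guards against is an assumption, and `SubSetRangeSketch` and `range` are hypothetical names.

```java
import java.util.Collections;
import java.util.NavigableSet;
import java.util.concurrent.ConcurrentSkipListSet;

public class SubSetRangeSketch {
    /** Returns the inclusive range [lower, upper], or an empty set when lower is greater than upper. */
    public static NavigableSet<Integer> range(NavigableSet<Integer> index, int lower, int upper) {
        try {
            return index.subSet(lower, true, upper, true);
        } catch (IllegalArgumentException e) {
            // subSet rejects a range whose lower bound is greater than its upper bound.
            return Collections.emptyNavigableSet();
        }
    }

    public static void main(String[] args) {
        NavigableSet<Integer> index = new ConcurrentSkipListSet<>();

        index.add(1);
        index.add(2);
        index.add(3);

        System.out.println(range(index, 1, 2)); // a well-formed range
        System.out.println(range(index, 3, 1)); // an inverted range
    }
}
```

   Comparing the two bounds before calling `subSet` would give the same result without a `try`/`catch`.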



##########
modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/impl/TestSortedIndexStorage.java:
##########
@@ -175,112 +192,54 @@ public void close() {
         public boolean hasNext() {
             checkStorageClosedOrInProcessOfRebalance();
 
-            advanceIfNeeded();
+            if (hasNext != null) {
+                return hasNext;
+            }
+
+            currentRow = peekedRow == NO_PEEKED_ROW ? peek() : peekedRow;
+            peekedRow = NO_PEEKED_ROW;
 
+            hasNext = currentRow != null;
             return hasNext;
         }
 
         @Override
         public IndexRow next() {
-            checkStorageClosedOrInProcessOfRebalance();
-
-            advanceIfNeeded();
-
-            boolean hasNext = this.hasNext;
-
-            if (!hasNext) {
+            if (!hasNext()) {
                 throw new NoSuchElementException();
             }
 
             this.hasNext = null;
 
-            return new IndexRowImpl(new BinaryTuple(descriptor.binaryTupleSchema(), currentEntry.getKey()), rowId);
+            return currentRow;
         }
 
         @Override
         public @Nullable IndexRow peek() {
             checkStorageClosedOrInProcessOfRebalance();
 
             if (hasNext != null) {
-                if (hasNext) {
-                    return new IndexRowImpl(new BinaryTuple(descriptor.binaryTupleSchema(), currentEntry.getKey()), rowId);
-                }
-
-                return null;
+                return currentRow;
             }
 
-            Entry<ByteBuffer, NavigableMap<RowId, Object>> indexMapEntry0 = currentEntry == null ? indexMap.firstEntry() : currentEntry;
-
-            RowId nextRowId = null;
-
-            if (rowId == null) {
-                if (indexMapEntry0 != null) {
-                    nextRowId = getRowId(indexMapEntry0.getValue().firstEntry());
-                }
-            } else {
-                Entry<RowId, Object> nextRowIdEntry = indexMapEntry0.getValue().higherEntry(rowId);
-
-                if (nextRowIdEntry != null) {
-                    nextRowId = nextRowIdEntry.getKey();
-                } else {
-                    indexMapEntry0 = indexMap.higherEntry(indexMapEntry0.getKey());
-
-                    if (indexMapEntry0 != null) {
-                        nextRowId = getRowId(indexMapEntry0.getValue().firstEntry());
-                    }
-                }
-            }
-
-            return nextRowId == null
-                    ? null : new IndexRowImpl(new BinaryTuple(descriptor.binaryTupleSchema(), indexMapEntry0.getKey()), nextRowId);
-        }
-
-        private void advanceIfNeeded() {
-            if (hasNext != null) {
-                return;
-            }
-
-            if (currentEntry == null) {
-                currentEntry = indexMap.firstEntry();
-            }
-
-            if (rowId == null) {
-                if (currentEntry != null) {
-                    rowId = getRowId(currentEntry.getValue().firstEntry());
+            if (currentRow == null) {
+                try {
+                    peekedRow = indexSet.first();
+                } catch (NoSuchElementException e) {
+                    peekedRow = null;

Review Comment:
   Using exceptions for control flow is an anti-pattern; we can avoid it if we use a `NavigableMap`.
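
   A minimal sketch of the difference, using plain integers instead of index rows: `first()` on an empty set throws `NoSuchElementException`, whereas `firstEntry()` on an empty map returns `null`. The class and method names are hypothetical, and the `Boolean` map value is just a placeholder.

```java
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ConcurrentSkipListSet;

public class FirstElementSketch {
    /** Set-based lookup: an empty set is reported through an exception. */
    public static Integer firstOfSet(ConcurrentSkipListSet<Integer> set) {
        try {
            return set.first();
        } catch (NoSuchElementException e) {
            return null;
        }
    }

    /** Map-based lookup: an empty map is reported through a null entry, no exception involved. */
    public static Integer firstOfMap(ConcurrentSkipListMap<Integer, Boolean> map) {
        Map.Entry<Integer, Boolean> entry = map.firstEntry();

        return entry == null ? null : entry.getKey();
    }

    public static void main(String[] args) {
        System.out.println(firstOfSet(new ConcurrentSkipListSet<>()));
        System.out.println(firstOfMap(new ConcurrentSkipListMap<>()));
    }
}
```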



##########
modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/AbstractSortedIndexStorageTest.java:
##########
@@ -1506,6 +1505,49 @@ void testScanPeekReplaceRow() {
         assertNull(scan.peek());
     }
 
+    @Test
+    void testScanPeekRemoveNext() {
+        SortedIndexDefinition indexDefinition = SchemaBuilders.sortedIndex("TEST_IDX")
+                .addIndexColumn(ColumnType.INT32.typeSpec().name()).asc().done()
+                .build();
+
+        SortedIndexStorage indexStorage = createIndexStorage(indexDefinition);
+
+        BinaryTupleRowSerializer serializer = new BinaryTupleRowSerializer(indexStorage.indexDescriptor());
+
+        PeekCursor<IndexRow> scan = indexStorage.scan(null, null, 0);
+
+        RowId rowId = new RowId(TEST_PARTITION);
+
+        // index  =  [0]
+        // cursor = ^ with no cached row
+        put(indexStorage, serializer.serializeRow(new Object[]{0}, rowId));
+
+        // index  =  [0] [1]
+        // cursor = ^ with no cached row
+        assertEquals(SimpleRow.of(0, rowId), SimpleRow.of(scan.peek(), firstColumn(serializer)));
+
+        // index  =
+        // cursor = ^ with no cached row (but it remembers the last peek call)
+        remove(indexStorage, serializer.serializeRow(new Object[]{0}, rowId));
+
+        // "hasNext" and "next" must return the result of last "peek" operation. This is crucial for RW scans.
+        assertTrue(scan.hasNext());
+        assertEquals(SimpleRow.of(0, rowId), SimpleRow.of(scan.next(), firstColumn(serializer)));
+
+        // index  =
+        // cursor = ^ with no cached row
+        assertNull(scan.peek());

Review Comment:
   What happens if we insert a new row and then call `peek()`?



##########
modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/impl/TestSortedIndexStorage.java:
##########
@@ -72,31 +78,29 @@ public SortedIndexDescriptor indexDescriptor() {
 
     @Override
     Iterator<RowId> getRowIdIteratorForGetByBinaryTuple(BinaryTuple key) {
-        return index.getOrDefault(key.byteBuffer(), emptyNavigableMap()).keySet().iterator();
+        // These must be two different instances, because "scan" call messes up headers.
+        BinaryTuplePrefix lowerBound = BinaryTuplePrefix.fromBinaryTuple(key);
+        BinaryTuplePrefix higherBound = BinaryTuplePrefix.fromBinaryTuple(key);
+
+        //noinspection resource
+        return scan(lowerBound, higherBound, GREATER_OR_EQUAL | LESS_OR_EQUAL)
+                .stream()
+                .map(IndexRow::rowId)
+                .iterator();

Review Comment:
   As far as I remember, there may be a problem due to row caching inside the Stream API when an iterator is created from a stream.
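
   A minimal sketch of the caching I have in mind, assuming the cursor's `stream()` is backed by an iterator-based spliterator (I have not verified this for `PeekCursor`). The counting wrapper, class name and method names are hypothetical. Calling `hasNext()` on the iterator obtained from the stream already pulls an element from the source and holds it until `next()` is called.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.Spliterators;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.StreamSupport;

public class StreamIteratorBufferingSketch {
    /** Wraps a source iterator and counts how many elements have been pulled from it. */
    static <T> Iterator<T> counting(Iterator<T> source, AtomicInteger pulled) {
        return new Iterator<T>() {
            @Override
            public boolean hasNext() {
                return source.hasNext();
            }

            @Override
            public T next() {
                pulled.incrementAndGet();

                return source.next();
            }
        };
    }

    /** Returns how many source elements were consumed by a single hasNext() call on the stream's iterator. */
    public static int pulledAfterHasNext() {
        AtomicInteger pulled = new AtomicInteger();

        Iterator<Integer> source = counting(Arrays.asList(1, 2, 3).iterator(), pulled);

        Iterator<Integer> viaStream = StreamSupport
                .stream(Spliterators.spliteratorUnknownSize(source, 0), false)
                .map(x -> x)
                .iterator();

        // Only hasNext() is called here; next() is never invoked on the stream's iterator.
        viaStream.hasNext();

        return pulled.get();
    }

    public static void main(String[] args) {
        System.out.println(pulledAfterHasNext());
    }
}
```

   If the index changes between `hasNext()` and `next()`, the iterator still hands out the element it buffered earlier, which may or may not be what the caller of `getRowIdIteratorForGetByBinaryTuple` expects.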


