You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by "mjsax (via GitHub)" <gi...@apache.org> on 2023/03/28 05:10:24 UTC

[GitHub] [kafka] mjsax commented on a diff in pull request #13431: KAFKA-14491: [19/N] Combine versioned store RocksDB instances into one

mjsax commented on code in PR #13431:
URL: https://github.com/apache/kafka/pull/13431#discussion_r1150018327


##########
streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegment.java:
##########
@@ -176,7 +184,7 @@ public synchronized KeyValueIterator<Bytes, byte[]> range(final Bytes from, fina
         // with empty bytes from the returned iterator. this filtering is accomplished by
         // passing the prefix filter into StrippedPrefixKeyValueIteratorAdapter().
         final Bytes toBound = to == null
-            ? Bytes.increment(prefixKeyFormatter.getPrefix())
+            ? incrementWithoutOverflow(prefixKeyFormatter.getPrefix())

Review Comment:
   Do we need `incrementWithoutOverflow` here, of could we just use `null`?



##########
streams/src/test/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegmentTest.java:
##########
@@ -172,30 +227,77 @@ public void shouldReturnValuesOnRange() {
         final KeyValue<String, String> kv2 = new KeyValue<>("2", "two");
         final KeyValue<String, String> kvOther = new KeyValue<>("1", "other");
 
-        segment2.put(new Bytes(serializeBytes(kv0.key)), serializeBytes(kv0.value));
-        segment2.put(new Bytes(serializeBytes(kv1.key)), serializeBytes(kv1.value));
-        segment2.put(new Bytes(serializeBytes(kv2.key)), serializeBytes(kv2.value));
-        segment1.put(new Bytes(serializeBytes(kvOther.key)), serializeBytes(kvOther.value));
-        segment3.put(new Bytes(serializeBytes(kvOther.key)), serializeBytes(kvOther.value));
+        segment1.put(new Bytes(serializeBytes(kv0.key)), serializeBytes(kv0.value));
+        segment1.put(new Bytes(serializeBytes(kv1.key)), serializeBytes(kv1.value));
+        segment1.put(new Bytes(serializeBytes(kv2.key)), serializeBytes(kv2.value));
+        segment0.put(new Bytes(serializeBytes(kvOther.key)), serializeBytes(kvOther.value));
+        segment2.put(new Bytes(serializeBytes(kvOther.key)), serializeBytes(kvOther.value));
+
+        // non-null bounds
+        try (final KeyValueIterator<Bytes, byte[]> iterator = segment1.range(new Bytes(serializeBytes("1")), new Bytes(serializeBytes("2")))) {
+            final LinkedList<KeyValue<String, String>> expectedContents = new LinkedList<>();
+            expectedContents.add(kv1);
+            expectedContents.add(kv2);
+            assertEquals(expectedContents, getDeserializedList(iterator));
+        }
+
+        // null lower bound
+        try (final KeyValueIterator<Bytes, byte[]> iterator = segment1.range(null, new Bytes(serializeBytes("1")))) {
+            final LinkedList<KeyValue<String, String>> expectedContents = new LinkedList<>();
+            expectedContents.add(kv0);
+            expectedContents.add(kv1);
+            assertEquals(expectedContents, getDeserializedList(iterator));
+        }
+
+        // null upper bound
+        try (final KeyValueIterator<Bytes, byte[]> iterator = segment1.range(new Bytes(serializeBytes("0")), null)) {
+            final LinkedList<KeyValue<String, String>> expectedContents = new LinkedList<>();
+            expectedContents.add(kv0);
+            expectedContents.add(kv1);
+            expectedContents.add(kv2);
+            assertEquals(expectedContents, getDeserializedList(iterator));
+        }
+
+        // null upper and lower bounds
+        try (final KeyValueIterator<Bytes, byte[]> iterator = segment1.range(new Bytes(serializeBytes("0")), null)) {

Review Comment:
   Should this not be `range(null, null)` ?



##########
streams/src/test/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegmentsTest.java:
##########
@@ -101,10 +105,34 @@ public void shouldNotCreateSegmentThatIsAlreadyExpired() {
         assertNull(segments.getOrCreateSegmentIfLive(0, context, streamTime));
     }
 
+    @Test
+    public void shouldCreateReservedSegments() {
+        final LogicalKeyValueSegment reservedSegment1 = segments.createReservedSegment(-1, "reserved-1");
+        final LogicalKeyValueSegment reservedSegment2 = segments.createReservedSegment(-2, "reserved-2");
+
+        final File rocksdbDir = new File(new File(context.stateDir(), DB_FILE_DIR), STORE_NAME);
+        assertTrue(rocksdbDir.isDirectory());
+
+        assertTrue(reservedSegment1.isOpen());
+        assertTrue(reservedSegment2.isOpen());
+    }
+
+    @Test
+    public void shouldNotCreateReservedSegmentWithNonNegativeId() {
+        assertThrows(IllegalArgumentException.class, () -> segments.createReservedSegment(0, "reserved"));
+        assertThrows(IllegalArgumentException.class, () -> segments.createReservedSegment(1, "reserved"));
+    }
+
+    @Test
+    public void shouldNotCreateReservedSegmentFromRegularMethod() {
+        assertThrows(IllegalArgumentException.class, () -> segments.getOrCreateSegmentIfLive(-1, context, 0));
+        assertThrows(IllegalArgumentException.class, () -> segments.getOrCreateSegment(-1, context));
+    }
+
     @Test
     public void shouldCleanupSegmentsThatHaveExpired() {
         final LogicalKeyValueSegment segment1 = segments.getOrCreateSegmentIfLive(0, context, 0);
-        final LogicalKeyValueSegment segment2 = segments.getOrCreateSegmentIfLive(0, context, SEGMENT_INTERVAL * 2L);
+        final LogicalKeyValueSegment segment2 = segments.getOrCreateSegmentIfLive(2, context, SEGMENT_INTERVAL * 2L);

Review Comment:
   Nice side fix.



##########
streams/src/test/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegmentTest.java:
##########
@@ -204,7 +306,7 @@ public void shouldReturnValuesOnRange() {
         }
 
         // null upper and lower bounds
-        try (final KeyValueIterator<Bytes, byte[]> iterator = segment2.range(new Bytes(serializeBytes("0")), null)) {
+        try (final KeyValueIterator<Bytes, byte[]> iterator = negativeIdSegment.range(new Bytes(serializeBytes("0")), null)) {

Review Comment:
   as above: `range(null, null)` ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org