Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/12/03 09:48:08 UTC

[GitHub] [kafka] yashmayya commented on a diff in pull request #12800: KAFKA-14342: KafkaOffsetBackingStore should clear offsets for source partitions on tombstone messages

yashmayya commented on code in PR #12800:
URL: https://github.com/apache/kafka/pull/12800#discussion_r1038756498


##########
connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStoreTest.java:
##########
@@ -329,6 +329,60 @@ public void testGetSetNull() throws Exception {
         PowerMock.verifyAll();
     }
 
+    @Test
+    public void testTombstoneOffset() throws Exception {
+        expectConfigure();
+        expectStart(Collections.singletonList(new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0, 0, TP0_KEY.array(), TP0_VALUE.array(),
+                new RecordHeaders(), Optional.empty())));
+
+        Capture<org.apache.kafka.clients.producer.Callback> producerCallback = EasyMock.newCapture();
+        storeLog.send(EasyMock.aryEq(TP0_KEY.array()), EasyMock.isNull(byte[].class), EasyMock.capture(producerCallback));
+        PowerMock.expectLastCall();
+
+        final Capture<Callback<Void>> readToEndCallback = EasyMock.newCapture();
+        storeLog.readToEnd(EasyMock.capture(readToEndCallback));
+        PowerMock.expectLastCall().andAnswer(() -> {
+            capturedConsumedCallback.getValue().onCompletion(null,
+                    new ConsumerRecord<>(TOPIC, 1, 0, 0L, TimestampType.CREATE_TIME, 0, 0, TP0_KEY.array(), null,
+                            new RecordHeaders(), Optional.empty()));
+            readToEndCallback.getValue().onCompletion(null, null);
+            return null;
+        });
+
+        expectStop();
+        expectClusterId();
+
+        PowerMock.replayAll();
+
+        store.configure(DEFAULT_DISTRIBUTED_CONFIG);
+        store.start();
+
+        // Write tombstone offset
+        Map<ByteBuffer, ByteBuffer> toSet = new HashMap<>();
+        toSet.put(TP0_KEY, null);
+
+        final AtomicBoolean invoked = new AtomicBoolean(false);
+        Future<Void> setFuture = store.set(toSet, (error, result) -> invoked.set(true));
+        assertFalse(setFuture.isDone());
+        producerCallback.getValue().onCompletion(null, null);
+        setFuture.get(10000, TimeUnit.MILLISECONDS);
+        assertTrue(invoked.get());
+
+        // Getting data should read to end of our published data and return it
+        Map<ByteBuffer, ByteBuffer> offsets = store.get(Collections.singletonList(TP0_KEY)).get(10000, TimeUnit.MILLISECONDS);
+        assertNull(offsets.get(TP0_KEY));
+
+        // Just verifying that KafkaOffsetBackingStore::get returns null isn't enough; we also need to verify that the mapping for the source partition key is removed.
+        // This is because KafkaOffsetBackingStore::get returns null both when there is no existing offset for the source partition and when there is an offset with a null value.
+        // We need to make sure that tombstoned offsets are removed completely (i.e. that the mapping for the corresponding source partition is removed).
+        HashMap<ByteBuffer, ByteBuffer> data = Whitebox.getInternalState(store, "data");
+        assertFalse(data.containsKey(TP0_KEY));

Review Comment:
   I've done a rebase and rewritten the test. IMO just adding the `store.data.containsKey` check to the end of `testGetSetNull` wouldn't test the case that we're actually trying to cover with this change - i.e. that existing offsets are cleared from the in-memory map on tombstone offsets. `testGetSetNull` only checks that null keys and null values can be set without issues, and instead of rewriting it to first write a non-null value and then attempt to clear it with a tombstone, I felt a separate test case would be better.
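
   To illustrate the distinction this relies on, here is a minimal editorial sketch using plain `HashMap` semantics (not code from the PR): `get` alone cannot tell an absent mapping apart from a mapping to a null value, which is why the `containsKey` check matters:

   ```java
   import java.nio.ByteBuffer;
   import java.util.HashMap;
   import java.util.Map;

   public class TombstoneSemanticsSketch {
       public static void main(String[] args) {
           ByteBuffer key = ByteBuffer.wrap(new byte[] {0});
           Map<ByteBuffer, ByteBuffer> data = new HashMap<>();

           data.put(key, null);                        // offset present, but with a null value
           System.out.println(data.get(key));          // null
           System.out.println(data.containsKey(key));  // true - mapping still present

           data.remove(key);                           // what a tombstone should trigger
           System.out.println(data.get(key));          // still null - get() can't tell the difference
           System.out.println(data.containsKey(key));  // false - mapping fully cleared
       }
   }
   ```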



##########
connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java:
##########
@@ -325,11 +325,12 @@ public Future<Void> set(final Map<ByteBuffer, ByteBuffer> values, final Callback
         return producerCallback;
     }
 
-    protected final Callback<ConsumerRecord<byte[], byte[]>> consumedCallback = new Callback<ConsumerRecord<byte[], byte[]>>() {
-        @Override
-        public void onCompletion(Throwable error, ConsumerRecord<byte[], byte[]> record) {
-            ByteBuffer key = record.key() != null ? ByteBuffer.wrap(record.key()) : null;
-            ByteBuffer value = record.value() != null ? ByteBuffer.wrap(record.value()) : null;
+    protected final Callback<ConsumerRecord<byte[], byte[]>> consumedCallback = (error, record) -> {
+        ByteBuffer key = record.key() != null ? ByteBuffer.wrap(record.key()) : null;

Review Comment:
   While the suggestion makes sense and I've added it, it looks like the `error` will never be non-null as of now.
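
   For reference, a minimal sketch of what the lambda could look like with the suggested guard in place (an editorial sketch, not the exact PR code; the `data` map and `log` field are assumed from the surrounding class):

   ```java
   protected final Callback<ConsumerRecord<byte[], byte[]>> consumedCallback = (error, record) -> {
       if (error != null) {
           // As noted above, KafkaBasedLog currently never invokes this callback with a
           // non-null error, but guarding against it keeps the callback safe if that changes.
           log.error("Failed to read from the offsets topic", error);
           return;
       }
       ByteBuffer key = record.key() != null ? ByteBuffer.wrap(record.key()) : null;
       ByteBuffer value = record.value() != null ? ByteBuffer.wrap(record.value()) : null;
       if (value != null) {
           data.put(key, value);
       } else {
           // Tombstone record: remove the stored offset for this source partition entirely
           // rather than keeping a mapping to null.
           data.remove(key);
       }
   };
   ```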



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org