You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/12/05 15:29:46 UTC

[GitHub] [kafka] C0urante commented on a diff in pull request #12800: KAFKA-14342: KafkaOffsetBackingStore should clear offsets for source partitions on tombstone messages

C0urante commented on code in PR #12800:
URL: https://github.com/apache/kafka/pull/12800#discussion_r1039745954


##########
connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStoreTest.java:
##########
@@ -329,6 +329,60 @@ public void testGetSetNull() throws Exception {
         PowerMock.verifyAll();
     }
 
+    @Test
+    public void testTombstoneOffset() throws Exception {
+        expectConfigure();
+        expectStart(Collections.singletonList(new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0, 0, TP0_KEY.array(), TP0_VALUE.array(),
+                new RecordHeaders(), Optional.empty())));
+
+        Capture<org.apache.kafka.clients.producer.Callback> producerCallback = EasyMock.newCapture();
+        storeLog.send(EasyMock.aryEq(TP0_KEY.array()), EasyMock.isNull(byte[].class), EasyMock.capture(producerCallback));
+        PowerMock.expectLastCall();
+
+        final Capture<Callback<Void>> readToEndCallback = EasyMock.newCapture();
+        storeLog.readToEnd(EasyMock.capture(readToEndCallback));
+        PowerMock.expectLastCall().andAnswer(() -> {
+            capturedConsumedCallback.getValue().onCompletion(null,
+                    new ConsumerRecord<>(TOPIC, 1, 0, 0L, TimestampType.CREATE_TIME, 0, 0, TP0_KEY.array(), null,
+                            new RecordHeaders(), Optional.empty()));
+            readToEndCallback.getValue().onCompletion(null, null);
+            return null;
+        });
+
+        expectStop();
+        expectClusterId();
+
+        PowerMock.replayAll();
+
+        store.configure(DEFAULT_DISTRIBUTED_CONFIG);
+        store.start();
+
+        // Write tombstone offset
+        Map<ByteBuffer, ByteBuffer> toSet = new HashMap<>();
+        toSet.put(TP0_KEY, null);
+
+        final AtomicBoolean invoked = new AtomicBoolean(false);
+        Future<Void> setFuture = store.set(toSet, (error, result) -> invoked.set(true));
+        assertFalse(setFuture.isDone());
+        producerCallback.getValue().onCompletion(null, null);
+        setFuture.get(10000, TimeUnit.MILLISECONDS);
+        assertTrue(invoked.get());
+
+        // Getting data should read to end of our published data and return it
+        Map<ByteBuffer, ByteBuffer> offsets = store.get(Collections.singletonList(TP0_KEY)).get(10000, TimeUnit.MILLISECONDS);
+        assertNull(offsets.get(TP0_KEY));
+
+        // Just verifying that KafkaOffsetBackingStore::get returns null isn't enough, we also need to verify that the mapping for the source partition key is removed.
+        // This is because KafkaOffsetBackingStore::get returns null if either there is no existing offset for the source partition or if there is an offset with null value.
+        // We need to make sure that tombstoned offsets are removed completely (i.e. that the mapping for the corresponding source partition is removed).
+        HashMap<ByteBuffer, ByteBuffer> data = Whitebox.getInternalState(store, "data");
+        assertFalse(data.containsKey(TP0_KEY));

Review Comment:
   Sorry, what exactly is clearer about writing a new test case instead of modifying the existing one?
   
   The logic we're testing here is directly related to handling `null` values and the cognitive overhead of another test case. Plus, we're going to have to add logic for writing a non-null value with a given key no matter what, but with a new test case, we also have to duplicate the existing logic for writing a null value for that same key. Finally, considering how much overhead is involved in the setup of every test case in this class, we should be trying to minimize the number of test cases where possible.



##########
connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java:
##########
@@ -325,11 +325,12 @@ public Future<Void> set(final Map<ByteBuffer, ByteBuffer> values, final Callback
         return producerCallback;
     }
 
-    protected final Callback<ConsumerRecord<byte[], byte[]>> consumedCallback = new Callback<ConsumerRecord<byte[], byte[]>>() {
-        @Override
-        public void onCompletion(Throwable error, ConsumerRecord<byte[], byte[]> record) {
-            ByteBuffer key = record.key() != null ? ByteBuffer.wrap(record.key()) : null;
-            ByteBuffer value = record.value() != null ? ByteBuffer.wrap(record.value()) : null;
+    protected final Callback<ConsumerRecord<byte[], byte[]>> consumedCallback = (error, record) -> {
+        ByteBuffer key = record.key() != null ? ByteBuffer.wrap(record.key()) : null;

Review Comment:
   Yes, I noticed that too; still, if the API we're using (even if it's internal-only) has a signature that can accept an error, we should handle those errors. An alternative is to refactor things to use a `java.util.function.Consumer` instead of a `Callback`, but it's not worth the impact that that would have on the `KafkaBasedLog` class.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org