Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/11/17 18:16:49 UTC

[GitHub] [kafka] C0urante commented on a diff in pull request #12800: KAFKA-14342: KafkaOffsetBackingStore should clear offsets for source partitions on tombstone messages

C0urante commented on code in PR #12800:
URL: https://github.com/apache/kafka/pull/12800#discussion_r1025541314


##########
connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStoreTest.java:
##########
@@ -329,6 +329,60 @@ public void testGetSetNull() throws Exception {
         PowerMock.verifyAll();
     }
 
+    @Test
+    public void testTombstoneOffset() throws Exception {
+        expectConfigure();
+        expectStart(Collections.singletonList(new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0, 0, TP0_KEY.array(), TP0_VALUE.array(),
+                new RecordHeaders(), Optional.empty())));
+
+        Capture<org.apache.kafka.clients.producer.Callback> producerCallback = EasyMock.newCapture();
+        storeLog.send(EasyMock.aryEq(TP0_KEY.array()), EasyMock.isNull(byte[].class), EasyMock.capture(producerCallback));
+        PowerMock.expectLastCall();
+
+        final Capture<Callback<Void>> readToEndCallback = EasyMock.newCapture();
+        storeLog.readToEnd(EasyMock.capture(readToEndCallback));
+        PowerMock.expectLastCall().andAnswer(() -> {
+            capturedConsumedCallback.getValue().onCompletion(null,
+                    new ConsumerRecord<>(TOPIC, 1, 0, 0L, TimestampType.CREATE_TIME, 0, 0, TP0_KEY.array(), null,
+                            new RecordHeaders(), Optional.empty()));
+            readToEndCallback.getValue().onCompletion(null, null);
+            return null;
+        });
+
+        expectStop();
+        expectClusterId();
+
+        PowerMock.replayAll();
+
+        store.configure(DEFAULT_DISTRIBUTED_CONFIG);
+        store.start();
+
+        // Write tombstone offset
+        Map<ByteBuffer, ByteBuffer> toSet = new HashMap<>();
+        toSet.put(TP0_KEY, null);
+
+        final AtomicBoolean invoked = new AtomicBoolean(false);
+        Future<Void> setFuture = store.set(toSet, (error, result) -> invoked.set(true));
+        assertFalse(setFuture.isDone());
+        producerCallback.getValue().onCompletion(null, null);
+        setFuture.get(10000, TimeUnit.MILLISECONDS);
+        assertTrue(invoked.get());
+
+        // Getting data should read to end of our published data and return it
+        Map<ByteBuffer, ByteBuffer> offsets = store.get(Collections.singletonList(TP0_KEY)).get(10000, TimeUnit.MILLISECONDS);
+        assertNull(offsets.get(TP0_KEY));
+
+        // Verifying that KafkaOffsetBackingStore::get returns null isn't enough: get also returns null
+        // when there is no offset at all for the source partition, so a null result can't distinguish a
+        // missing mapping from a mapping with a null value. To confirm that the tombstoned offset was
+        // removed completely, we also assert that the mapping for the source partition key itself is gone.
+        HashMap<ByteBuffer, ByteBuffer> data = Whitebox.getInternalState(store, "data");
+        assertFalse(data.containsKey(TP0_KEY));

Review Comment:
   This is going to have to be rewritten to account for the recent switch to Mockito in this test class.
   
   If it helps, though, I don't think we need an entire new test case for this PR. We can probably just add this bit to the end of the existing `testGetSetNull` test case (with whatever comments you feel are appropriate explaining the necessity for the assertion):
   
   ```java
           assertFalse(store.data.containsKey(TP0_KEY));
   ```
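
   For what it's worth, a rough Mockito equivalent of the tombstone expectations could look like the sketch below. This is only a sketch: it assumes the migrated test keeps fields analogous to `storeLog`, `capturedConsumedCallback`, `TP0_KEY`, and `TOPIC`, which may not match the final Mockito version of this class.

   ```java
   import static org.mockito.AdditionalMatchers.aryEq;
   import static org.mockito.ArgumentMatchers.any;
   import static org.mockito.ArgumentMatchers.isNull;
   import static org.mockito.Mockito.doAnswer;
   import static org.mockito.Mockito.verify;

   import org.mockito.ArgumentCaptor;

   // Stub readToEnd() so that a read replays the tombstone record for TP0_KEY
   // and then signals completion.
   doAnswer(invocation -> {
       capturedConsumedCallback.getValue().onCompletion(null,
               new ConsumerRecord<>(TOPIC, 1, 0, 0L, TimestampType.CREATE_TIME, 0, 0,
                       TP0_KEY.array(), null, new RecordHeaders(), Optional.empty()));
       invocation.<Callback<Void>>getArgument(0).onCompletion(null, null);
       return null;
   }).when(storeLog).readToEnd(any());

   // After store.set(...) is invoked, capture the producer callback and fire it
   // so that the returned future can complete.
   ArgumentCaptor<org.apache.kafka.clients.producer.Callback> producerCallback =
           ArgumentCaptor.forClass(org.apache.kafka.clients.producer.Callback.class);
   verify(storeLog).send(aryEq(TP0_KEY.array()), isNull(), producerCallback.capture());
   producerCallback.getValue().onCompletion(null, null);
   ```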



##########
connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java:
##########
@@ -325,11 +325,12 @@ public Future<Void> set(final Map<ByteBuffer, ByteBuffer> values, final Callback
         return producerCallback;
     }
 
-    protected final Callback<ConsumerRecord<byte[], byte[]>> consumedCallback = new Callback<ConsumerRecord<byte[], byte[]>>() {
-        @Override
-        public void onCompletion(Throwable error, ConsumerRecord<byte[], byte[]> record) {
-            ByteBuffer key = record.key() != null ? ByteBuffer.wrap(record.key()) : null;
-            ByteBuffer value = record.value() != null ? ByteBuffer.wrap(record.value()) : null;
+    protected final Callback<ConsumerRecord<byte[], byte[]>> consumedCallback = (error, record) -> {
+        ByteBuffer key = record.key() != null ? ByteBuffer.wrap(record.key()) : null;

Review Comment:
   I know this isn't a new bug introduced by this change, but we should probably add error-handling logic here: as written, a non-null `error` is silently dropped, and if `record` is null in that case, `record.key()` would throw an NPE:
   ```suggestion
           if (error != null) {
               log.error("Failed to read from offsets topic", error);
               return;
           }
           ByteBuffer key = record.key() != null ? ByteBuffer.wrap(record.key()) : null;
   ```



##########
connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java:
##########
@@ -325,11 +325,12 @@ public Future<Void> set(final Map<ByteBuffer, ByteBuffer> values, final Callback
         return producerCallback;
     }
 
-    protected final Callback<ConsumerRecord<byte[], byte[]>> consumedCallback = new Callback<ConsumerRecord<byte[], byte[]>>() {
-        @Override
-        public void onCompletion(Throwable error, ConsumerRecord<byte[], byte[]> record) {
-            ByteBuffer key = record.key() != null ? ByteBuffer.wrap(record.key()) : null;
-            ByteBuffer value = record.value() != null ? ByteBuffer.wrap(record.value()) : null;
+    protected final Callback<ConsumerRecord<byte[], byte[]>> consumedCallback = (error, record) -> {
+        ByteBuffer key = record.key() != null ? ByteBuffer.wrap(record.key()) : null;
+        ByteBuffer value = record.value() != null ? ByteBuffer.wrap(record.value()) : null;
+        if (value == null) {
+            data.remove(key);

Review Comment:
   It's a little strange to keep the `value` initializer as it was, now that the null check makes the local redundant. IMO this would be more readable:
   ```java
           if (record.value() == null)
               data.remove(key);
           else
               data.put(key, ByteBuffer.wrap(record.value()));
   
   ```
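
   Taken together with the error-handling suggestion above, the whole callback could then end up looking roughly like this (again just a sketch; `log` and `data` are the existing fields of `KafkaOffsetBackingStore`):

   ```java
   protected final Callback<ConsumerRecord<byte[], byte[]>> consumedCallback = (error, record) -> {
       if (error != null) {
           log.error("Failed to read from offsets topic", error);
           return;
       }
       ByteBuffer key = record.key() != null ? ByteBuffer.wrap(record.key()) : null;
       if (record.value() == null)
           data.remove(key);   // tombstone: drop the stored offset for this source partition
       else
           data.put(key, ByteBuffer.wrap(record.value()));
   };
   ```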


